def get_memory_rss(pid: int) -> int:
    """
    Returns the physical memory used by a process.

    :param pid: process id, the current one is `os.getpid()`
    :return: physical memory (attribute ``rss`` of ``memory_info()``)

    It relies on the module :epkg:`psutil`, imported when the function
    is called.
    """
    import psutil

    return psutil.Process(pid).memory_info().rss
class Monitor:
    """
    Accumulates successive memory measures.

    Attributes:

    * ``max_peak``: highest value received so far
    * ``average``: **sum** of all the values received so far,
      it must be divided by ``n_measures`` to get the mean
    * ``n_measures``: number of values received
    * ``begin``: first value received
    * ``end``: last value received
    """

    def __init__(self):
        self.max_peak = 0
        self.average = 0
        self.n_measures = 0
        self.begin = 0
        self.end = 0

    def to_dict(self, unit: int = 1):
        """
        Returns the statistics as a dictionary.

        :param unit: every memory value is divided by this number
        :return: dictionary with keys ``peak``, ``mean``, ``n``,
            ``begin``, ``end``, ``mean`` is 0 if nothing was measured
        """
        funit = float(unit)
        if self.n_measures:
            mean = self.average * 1.0 / self.n_measures / funit
        else:
            # No measure yet: avoids dividing by zero.
            mean = 0.0
        return dict(
            peak=self.max_peak / funit,
            mean=mean,
            n=self.n_measures,
            begin=self.begin / funit,
            end=self.end / funit,
        )

    @property
    def delta_peak(self):
        "Difference between the highest value and the first one."
        return self.max_peak - self.begin

    @property
    def delta_end(self):
        "Difference between the last value and the first one."
        return self.end - self.begin

    @property
    def delta_avg(self):
        "Difference between the mean and the first value, 0 if nothing was measured."
        if self.n_measures == 0:
            # No measure yet: avoids dividing by zero (also used by __repr__).
            return 0.0
        return self.average / self.n_measures - self.begin

    def __repr__(self):
        return (
            f"{self.__class__.__name__}(begin={self.begin}, end={self.end}, "
            f"peak={self.max_peak}, average={self.average}, n={self.n_measures}, "
            f"d_end={self.delta_end}, d_peak={self.delta_peak}, d_avg={self.delta_avg}"
            f")"
        )

    def update(self, mem):
        """
        Registers a new measure.

        :param mem: measured value
        """
        if self.n_measures == 0:
            # The first measure is the reference for all the deltas.
            self.begin = mem
        self.max_peak = max(mem, self.max_peak)
        self.average += mem
        self.end = mem
        self.n_measures += 1

    def send(self, conn):
        """
        Sends the five attributes through a connection, one value at a time.
        Method :meth:`recv` reads them in the same order.

        :param conn: object with a method ``send``
        """
        conn.send(self.max_peak)
        conn.send(self.average)
        conn.send(self.n_measures)
        conn.send(self.begin)
        conn.send(self.end)

    @classmethod
    def recv(cls, conn):
        """
        Builds an instance from the five values sent by method :meth:`send`.

        :param conn: object with a method ``recv``
        :return: a new instance of the class
        """
        m = cls()
        m.max_peak = conn.recv()
        m.average = conn.recv()
        m.n_measures = conn.recv()
        m.begin = conn.recv()
        m.end = conn.recv()
        return m


def _process_memory_spy(conn):
    """
    Function run by the spying process.

    The exchange through *conn* follows these steps:

    1. it sends ``-2`` to tell it started,
    2. it receives the process id to spy on, the delay between
       two measures and a flag enabling CUDA monitoring,
    3. it sends ``-2`` again once everything is initialized,
    4. it measures the memory until it receives ``-3``,
    5. it does a final measure and sends the CPU monitor,
       the number of GPU monitors, then every GPU monitor
       (see :meth:`Monitor.send`).

    :param conn: connection used to communicate with the process
        which started this one
    """
    # Sends the value it started.
    conn.send(-2)

    # process id to spy on
    pid = conn.recv()

    # delay between two measures
    timeout = conn.recv()

    # do CUDA
    cuda = conn.recv()

    import psutil

    process = psutil.Process(pid)

    if cuda:
        from pynvml import (
            nvmlDeviceGetCount,
            nvmlDeviceGetHandleByIndex,
            nvmlDeviceGetMemoryInfo,
            nvmlInit,
            nvmlShutdown,
        )

        nvmlInit()
        n_gpus = nvmlDeviceGetCount()
        handles = [nvmlDeviceGetHandleByIndex(i) for i in range(n_gpus)]

        def gpu_used():
            return [nvmlDeviceGetMemoryInfo(h).used for h in handles]

        gpus = [Monitor() for _ in range(n_gpus)]
    else:
        gpus = []

    cpu = Monitor()

    def measure():
        # One measure for the CPU and one for every GPU if enabled.
        cpu.update(process.memory_info().rss)
        if cuda:
            for used, monitor in zip(gpu_used(), gpus):
                monitor.update(used)

    conn.send(-2)

    # loop
    while True:
        measure()
        # poll waits at most `timeout` seconds, it is the delay
        # between two measures as well as the way to receive the stop code.
        if conn.poll(timeout=timeout):
            code = conn.recv()
            if code == -3:
                break

    # final iteration
    measure()

    # send
    cpu.send(conn)
    conn.send(len(gpus))
    for g in gpus:
        g.send(conn)
    if cuda:
        nvmlShutdown()
    conn.close()
class MemorySpy:
    """
    Information about the spy. The constructor calls method `start`.
    Method `stop` can be called to end the measure.

    :param pid: process id of the process to spy on
    :param delay: spy on every delay seconds
    :param cuda: enable cuda monitoring
    """

    def __init__(self, pid: int, delay: float = 0.01, cuda: bool = False):
        self.pid = pid
        self.delay = delay
        self.cuda = cuda
        self.start()

    def start(self) -> "MemorySpy":
        """
        Starts another process and tells it to spy.

        The spying process sends ``-2`` once it is started, receives
        the process id, the delay and the cuda flag (as 1 or 0),
        then sends ``-2`` again when it is ready to measure.

        :return: self
        :raises RuntimeError: if the spying process answers something
            different from ``-2`` or stops before answering, the process
            and the pipe are released before the exception is raised
        """
        self.parent_conn, self.child_conn = multiprocessing.Pipe()
        self.child_process = multiprocessing.Process(
            target=_process_memory_spy, args=(self.child_conn,)
        )
        self.child_process.start()
        try:
            data = self._recv_from_child()
            if data != -2:
                raise RuntimeError(
                    f"The child processing is supposed to send -2 not {data}."
                )
            self.parent_conn.send(self.pid)
            self.parent_conn.send(self.delay)
            self.parent_conn.send(1 if self.cuda else 0)
            data = self._recv_from_child()
            if data != -2:
                raise RuntimeError(
                    f"The child processing is supposed to send -2 again not {data}."
                )
        except BaseException:
            # The handshake failed: nothing should be left running
            # or open, the exception is raised again unchanged.
            self._abort()
            raise
        return self

    def _recv_from_child(self, poll_interval: float = 0.1):
        """
        Receives one value from the spying process.

        This process keeps ``self.child_conn`` open, a plain ``recv()``
        would therefore wait forever if the spying process stops before
        answering (a failing import for example). The method polls
        the connection and checks the process is still alive in between.

        :param poll_interval: maximum time in seconds between two checks
        :return: the received value
        :raises RuntimeError: if the spying process stopped without answering
        """
        while not self.parent_conn.poll(poll_interval):
            if self.child_process.is_alive():
                continue
            # The answer may have been sent right before the process stopped.
            if self.parent_conn.poll(0):
                break
            raise RuntimeError(
                f"The spying process stopped before answering "
                f"(exitcode={self.child_process.exitcode})."
            )
        return self.parent_conn.recv()

    def _abort(self):
        """Stops the spying process and closes both ends of the pipe."""
        if self.child_process.is_alive():
            self.child_process.terminate()
        self.child_process.join()
        self.parent_conn.close()
        self.child_conn.close()
def start_spying_on(
    pid: Optional[int] = None, delay: float = 0.01, cuda: bool = False
) -> MemorySpy:
    """
    Starts the memory spy.
    The function starts another process spying on the one sent as an argument.

    :param pid: process id to spy on, the current process if None
    :param delay: delay between two measures
    :param cuda: True or False to get memory for cuda devices
    :return: the spy, an instance of `MemorySpy`

    Example:

    .. code-block:: python

        from experimental_experiment.memory_peak import start_spying_on, flatten

        p = start_spying_on()
        # ...
        # code to measure
        # ...
        stat = p.stop()
        print(stat)
        print(flatten(stat))
    """
    target_pid = os.getpid() if pid is None else pid
    return MemorySpy(target_pid, delay, cuda)