def get_memory_rss(pid: int) -> int:
    """
    Returns the physical memory used by a process.

    The value is the ``rss`` field returned by
    ``psutil.Process(pid).memory_info()``.

    :param pid: process id, the current one is `os.getpid()`
    :return: physical memory

    It relies on the module :epkg:`psutil`.
    """
    # Imported lazily, only when the function is called.
    import psutil

    return psutil.Process(pid).memory_info().rss
class Monitor:
    """
    Accumulates statistics over successive memory measures.

    Every call to `update` refreshes the highest value seen so far,
    the running sum of the values, the number of values and the first
    and last values.
    """

    def __init__(self):
        self.max_peak = 0
        # Running sum of the measures, the mean is computed in `to_dict`.
        self.average = 0
        self.n_measures = 0
        self.begin = 0
        self.end = 0

    def to_dict(self, unit: int = 1):
        """
        Returns the statistics as a dictionary.

        :param unit: every value is divided by this number
        :return: dictionary with keys `peak`, `mean`, `n`, `begin`, `end`

        The mean is 0 when no measure was recorded.
        """
        funit = float(unit)
        if self.n_measures == 0:
            # No measure yet: avoid dividing by zero.
            mean = 0.0
        else:
            mean = self.average * 1.0 / self.n_measures / funit
        return dict(
            peak=self.max_peak / funit,
            mean=mean,
            # NOTE(review): the number of measures is divided by unit as
            # well, kept as is for backward compatibility.
            n=self.n_measures / funit,
            begin=self.begin / funit,
            end=self.end / funit,
        )

    def __repr__(self):
        return (
            f"{self.__class__.__name__}(peak={self.max_peak}, "
            f"average={self.average}, n={self.n_measures})"
        )

    def update(self, mem):
        """
        Registers a new measure.

        :param mem: measured value
        """
        if self.n_measures == 0:
            # The first measure is remembered as the starting point.
            self.begin = mem
        self.max_peak = max(mem, self.max_peak)
        self.average += mem
        self.end = mem
        self.n_measures += 1

    def send(self, conn):
        """
        Sends the five statistics through a connection,
        one value after the other.

        :param conn: connection, the values must be read in the same
            order by `recv`
        """
        conn.send(self.max_peak)
        conn.send(self.average)
        conn.send(self.n_measures)
        conn.send(self.begin)
        conn.send(self.end)

    @classmethod
    def recv(cls, conn):
        """
        Builds an instance from the values sent by `send`.

        :param conn: connection to read from
        :return: a new instance holding the received statistics
        """
        m = cls()
        # Same order as in method `send`.
        m.max_peak = conn.recv()
        m.average = conn.recv()
        m.n_measures = conn.recv()
        m.begin = conn.recv()
        m.end = conn.recv()
        return m


def _process_memory_spy(conn):
    """
    Measures the memory of another process until it is told to stop.

    The function exchanges the following messages through *conn*:

    * it sends -2,
    * it receives the process id to spy on, the delay between
      two measures and a flag enabling CUDA monitoring,
    * it sends -2 again once everything is initialized,
    * it measures the memory in a loop until it receives -3,
    * it does a final measure, then sends the CPU statistics,
      the number of GPU monitors and the statistics of each of them.

    :param conn: connection used to talk to the process which
        requested the measures
    """
    # Sends the value it started.
    conn.send(-2)

    # process id to spy on
    pid = conn.recv()
    # delay between two measures
    timeout = conn.recv()
    # do CUDA
    cuda = conn.recv()

    import psutil

    process = psutil.Process(pid)

    if cuda:
        from onnx_extended.validation.cuda.cuda_monitor import (
            nvml_device_get_count,
            nvml_device_get_memory_info,
            nvml_init,
            nvml_shutdown,
        )

        nvml_init()
        n_gpus = nvml_device_get_count()

        def gpu_used():
            # NOTE(review): index 1 is presumably the used memory,
            # confirm against nvml_device_get_memory_info.
            return [nvml_device_get_memory_info(i)[1] for i in range(n_gpus)]

        gpus = [Monitor() for _ in range(n_gpus)]
    else:
        gpus = []

    cpu = Monitor()

    def measure():
        # One measure: CPU first, then every GPU if enabled.
        cpu.update(process.memory_info().rss)
        if cuda:
            for used, monitor in zip(gpu_used(), gpus):
                monitor.update(used)

    conn.send(-2)

    # loop
    while True:
        measure()
        # Waits up to `timeout` seconds for a message, so this also
        # paces the measures.
        if conn.poll(timeout=timeout):
            code = conn.recv()
            if code == -3:
                break

    # final iteration
    measure()

    # send
    cpu.send(conn)
    conn.send(len(gpus))
    for g in gpus:
        g.send(conn)
    if cuda:
        nvml_shutdown()
    conn.close()
class MemorySpy:
    """
    Information about the spy. The constructor calls method `start`.
    Method `stop` can be called to end the measure.

    :param pid: process id of the process to spy on
    :param delay: spy on every delay seconds
    :param cuda: enable cuda monitoring
    """

    def __init__(self, pid: int, delay: float = 0.01, cuda: bool = False):
        self.pid = pid
        self.delay = delay
        self.cuda = cuda
        self.start()

    def start(self) -> "MemorySpy":
        """
        Starts another process and tells it to spy.

        :return: self

        The child process is expected to send -2 once it has started
        and -2 again after it has received the process id, the delay
        and the cuda flag. An assertion fails otherwise.
        """
        self.parent_conn, self.child_conn = multiprocessing.Pipe()
        self.child_process = multiprocessing.Process(
            target=_process_memory_spy, args=(self.child_conn,)
        )
        self.child_process.start()
        data = self.parent_conn.recv()
        assert data == -2, f"The child processing is supposed to send -2 not {data}."
        self.parent_conn.send(self.pid)
        self.parent_conn.send(self.delay)
        # The flag is sent as an integer.
        self.parent_conn.send(1 if self.cuda else 0)
        data = self.parent_conn.recv()
        # The message needs the f prefix to display the received value.
        assert (
            data == -2
        ), f"The child processing is supposed to send -2 again not {data}."
        return self

    def stop(self) -> dict:
        """
        Stops spying on.

        :return: dictionary with key `cpu` mapped to the CPU statistics
            and, only if cuda monitoring was enabled, key `gpus` mapped
            to the list of statistics, one per GPU monitor
        """
        # -3 tells the child process to leave its loop.
        self.parent_conn.send(-3)

        cpu = Monitor.recv(self.parent_conn)

        # The child always sends the number of GPU monitors,
        # it is 0 when cuda monitoring is disabled.
        n_gpus = self.parent_conn.recv()
        gpus = [Monitor.recv(self.parent_conn) for _ in range(n_gpus)]

        self.parent_conn.close()
        self.child_process.join()
        res = dict(cpu=cpu)
        if self.cuda:
            res["gpus"] = gpus
        return res
def start_spying_on(
    pid: Optional[int] = None, delay: float = 0.01, cuda: bool = False
) -> MemorySpy:
    """
    Starts the memory spy.
    The function creates a :class:`MemorySpy` which starts another
    process spying on the one sent as an argument.

    :param pid: process id to spy on, or None for the current process
    :param delay: delay between two measures
    :param cuda: True or False to get memory for cuda devices
    :return: the spy, call its method `stop` to get the measures

    Example::

    .. code-block:: python

        from onnx_extended.memory_peak import start_spying_on

        p = start_spying_on()
        # ...
        # code to measure
        # ...
        stat = p.stop()
        print(stat)
    """
    # Falls back on the calling process when no pid is given.
    target_pid = os.getpid() if pid is None else pid
    return MemorySpy(target_pid, delay, cuda)