class Server:
    """
    Server implements the analysis server with a series of control interfaces exposed.

    :ivar project:          An instance of angr.Project.
    :ivar str spill_yard:   A directory to store spilled states.
    :ivar str db:           Path of the database that stores information about spilled states.
    :ivar int max_workers:  Maximum number of workers. Each worker starts a new process.
    :ivar int max_states:   Maximum number of active states for each worker.
    :ivar int staging_max:  Maximum number of inactive states that are kept into memory before
                            spilled onto the disk and potentially be picked up by another worker.
    :ivar bool bucketizer:  Use the Bucketizer exploration strategy.
    :ivar _worker_exit_callback: A method that will be called upon the exit of each worker.
    """
    def __init__(
        self,
        project,
        spill_yard=None,
        db=None,
        max_workers=None,
        max_states=10,
        staging_max=10,
        bucketizer=True,
        recursion_limit=1000,
        worker_exit_callback=None,
        techniques=None,
        add_options=None,
        remove_options=None,
    ):
        """
        Initialize the analysis server.

        :param project:         The angr.Project instance shared with all workers.
        :param spill_yard:      Directory for spilled states; a temporary directory is created when None.
        :param db:              Database connection string; a temporary SQLite database is created when None.
        :param max_workers:     Number of worker processes; defaults to the CPU count when None.
        :param max_states:      Maximum number of active states per worker.
        :param staging_max:     Maximum number of inactive in-memory states before spilling to disk.
        :param bucketizer:      Whether to use the Bucketizer exploration strategy.
        :param recursion_limit: Recursion limit value handed to each Worker.
        :param worker_exit_callback: Callable invoked with (worker_id, stashes) for each exited worker.
        :param techniques:      Exploration techniques handed to each Worker.
        :param add_options:     State options handed to each Worker to add.
        :param remove_options:  State options handed to each Worker to remove.
        """
        self.project = project
        # Fall back to fresh temporary locations when the caller did not provide them,
        # and log the generated paths so they can be located after the run.
        self.spill_yard = spill_yard if spill_yard else tempfile.mkdtemp(suffix="angr_spill_yard")
        if not spill_yard:
            _l.info("Temporary spill yard: %s", self.spill_yard)
        self.db_str = db if db else "sqlite:///" + os.path.join(tempfile.mkdtemp(suffix="angr_server_db"), "db.sqlite3")
        if not db:
            _l.info("Database: %s", self.db_str)
        self.max_workers = max_workers if max_workers is not None else multiprocessing.cpu_count()
        self.max_states = max_states
        self.staging_max = staging_max
        self.bucketizer = bucketizer
        self.techniques = techniques
        self.add_options = add_options
        self.remove_options = remove_options
        self._recursion_limit = recursion_limit
        # Both are replaced by manager-backed proxies in run() when an exit callback is registered;
        # until then they stay None so on_worker_exit() is a no-op.
        self._worker_exit_args_lock = None
        self._worker_exit_args: dict[int, tuple] | None = None
        # the following will not be pickled
        self._worker_exit_callback = worker_exit_callback
        self._workers = []
        self._stopped = False
        # Shared counter of currently-running workers ("i" = C signed int, guarded by its own lock).
        self._active_workers = multiprocessing.Value("i", lock=True)
[docs]defon_worker_exit(self,worker_id,stashes):ifself._worker_exit_args_lockisnotNone:# callback is enabled# we add this check since passing Python objects between processes is definitely not fastwithself._worker_exit_args_lock:self._worker_exit_args[worker_id]=(worker_id,stashes,)
    #
    # Public methods
    #
    def run(self):
        """
        Create and start all workers, then poll until the server is stopped and no
        worker is active, dispatching any recorded worker-exit callbacks along the way.
        Finally joins (and kills, if necessary) every worker process.

        NOTE(review): relies on `self.stopped` and `self.active_workers`, which are not
        defined in this chunk — presumably properties wrapping `_stopped` and
        `_active_workers`; confirm elsewhere in the file.
        """
        # create workers
        with multiprocessing.Manager() as manager:
            # Manager-backed dict shared with every worker process to broadcast the stop flag.
            server_state = manager.dict()
            server_state["stopped"] = self.stopped
            if self._worker_exit_callback:
                # Do not initialize the lock if no callback is provided
                self._worker_exit_args_lock = manager.Lock()  # pylint:disable=no-member
                self._worker_exit_args = manager.dict()
            for i in range(self.max_workers):
                _l.info("### Creating worker %d", i)
                worker = Worker(
                    i,
                    self,
                    server_state,
                    recursion_limit=self._recursion_limit,
                    techniques=self.techniques,
                    add_options=self.add_options,
                    remove_options=self.remove_options,
                )
                self._workers.append(worker)
            # start them
            for w in self._workers:
                w.start()
            # should be enough for at least one child process to start
            time.sleep(3)
            i = 0  # NOTE(review): appears unused below — possibly leftover
            while not self.stopped or self.active_workers > 0:
                server_state["stopped"] = self.stopped
                time.sleep(1)
                if self._worker_exit_callback and self._worker_exit_args:
                    # Drain the recorded exit arguments under the shared lock and
                    # invoke the callback for each exited worker.
                    with self._worker_exit_args_lock:
                        for _, args in self._worker_exit_args.items():
                            self._worker_exit_callback(*args)
            # Publish the final stop flag so remaining children observe it.
            server_state["stopped"] = self.stopped
            for worker in self._workers:
                # wait for 10 seconds then kill the process
                _l.info("Joining worker %d.", worker.worker_id)
                worker._proc.join(10)  # NOTE(review): reaches into Worker's private _proc
                if worker._proc.is_alive():
                    _l.info("Worker %d is still running. Kill it", worker.worker_id)
                    worker._proc.kill()
        self._workers = []