Source code for angr.exploration_techniques.threading

# pylint: disable=cell-var-from-loop
import concurrent.futures
import logging

from . import ExplorationTechnique
from ..engines.engine import TLSMixin
from ..misc.ux import once

l = logging.getLogger(__name__)


[docs]class Threading(ExplorationTechnique): """ Enable multithreading. This is only useful in paths where a lot of time is taken inside z3, doing constraint solving. This is because of python's GIL, which says that only one thread at a time may be executing python code. """
[docs] def __init__(self, threads=8, local_stash="thread_local"): super().__init__() self.threads = threads self.queued = set() self.tasks = set() self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=threads) self.local_stash = local_stash
[docs] def step(self, simgr, stash="active", error_list=None, target_stash=None, **kwargs): target_stash = target_stash or stash if error_list is not None: raise ValueError("Can't pass error_list to step with threading enabled. Did you install threading twice?") l.info("Thread-stepping %s of %s", stash, simgr) for state in simgr.stashes[stash]: if state in self.queued: continue # construct new simgr with new lists # this means that threads won't trample each other's hook stacks # but can still negotiate over shared resources tsimgr = simgr.copy() tsimgr._stashes = {self.local_stash: [state]} tsimgr._errored = [] self.tasks.add(self.executor.submit(self.inner_step, state, tsimgr, target_stash=target_stash, **kwargs)) self.queued.add(state) timeout = None while True: done, self.tasks = concurrent.futures.wait( self.tasks, timeout=timeout, return_when=concurrent.futures.FIRST_COMPLETED ) if not done: break for done_future in done: done_future: concurrent.futures.Future state, error_list, tsimgr = done_future.result() simgr.absorb(tsimgr) simgr.errored.extend(error_list) simgr.stashes[stash].remove(state) self.queued.remove(state) timeout = 0 return simgr
[docs] def inner_step(self, state, simgr, **kwargs): error_list = [] simgr.step(stash=self.local_stash, error_list=error_list, **kwargs) return state, error_list, simgr
[docs] def successors(self, simgr, state, engine=None, **kwargs): engine = engine or self.project.factory.default_engine if not isinstance(engine, TLSMixin) and once("tls_engine"): l.error("Using Threading exploration technique but your engine is not thread-safe.") l.error("Do you want to add the TLSMixin to your engine?") return simgr.successors(state, engine=engine, **kwargs)