Source code for angr.exploration_techniques.spiller

# pylint:disable=no-member,import-outside-toplevel
import logging

from . import ExplorationTechnique


# Module-level logger, named after this module; used for the debug and warning messages below.
l = logging.getLogger(name=__name__)


class PickledStatesBase:
    """
    Interface for a container that keeps track of pickled states.

    Every method here only raises :class:`NotImplementedError`; subclasses are expected to override all of them.
    """

    def sort(self):
        """
        Put the stored entries in order.

        :return: None
        """
        raise NotImplementedError

    def add(self, prio, sid):
        """
        Record a state that has just been pickled.

        :param int prio:    Priority of the state.
        :param str sid:     Persistent ID of the state.
        :return:            None
        """
        raise NotImplementedError

    def pop_n(self, n):
        """
        Remove the top N entries and hand them back to the caller.

        :param int n:   Number of states to take.
        :return:        A list of the popped entries.
        """
        raise NotImplementedError
class PickledStatesList(PickledStatesBase):
    """
    List-backed pickled state storage.

    Entries are kept in a plain Python list as ``(priority, sid)`` tuples.
    """

    def __init__(self):
        # NOTE: the attribute name is spelled "picked" (sic); it is left unchanged so that any existing code
        # reaching into this attribute keeps working.
        self._picked_states = []

    def sort(self):
        """
        Sort the stored ``(priority, sid)`` tuples in place, in ascending tuple order.

        :return: None
        """
        self._picked_states.sort()

    def add(self, prio, sid):
        """
        Append a newly pickled state to the end of the list.

        :param prio:    Priority of the state.
        :param sid:     Persistent ID of the state.
        :return:        None
        """
        self._picked_states.append((prio, sid))

    def pop_n(self, n):
        """
        Remove and return the first N entries of the list.

        Call :meth:`sort` first if the entries must come out in priority order; this method does not sort.

        :param n:   Number of entries to take. A value of zero or less takes nothing. ``None`` takes everything.
        :return:    A list of ``(priority, sid)`` tuples; shorter than N if fewer entries are stored.
        """
        # A negative n must not reach the slice below: `[:n]` would then mean "everything except the last |n|
        # entries" and silently drain almost the whole list.
        if n is not None and n <= 0:
            return []

        popped = self._picked_states[:n]
        del self._picked_states[:n]
        return popped
class PickledStatesDb(PickledStatesBase):
    """
    Database-backed pickled state storage.

    Records are stored through SQLAlchemy as ``PickledState`` rows (declared in ``spiller_db``). Every method opens
    its own session and always closes it, even when the database operation raises.
    """

    def __init__(self, db_str="sqlite:///:memory:"):
        """
        :param str db_str:  Database URL handed to ``create_engine``. Defaults to an in-memory SQLite database.
        :raises ImportError: If SQLAlchemy is not available.
        """
        from .spiller_db import sqlalchemy, create_engine, Base, OperationalError, sessionmaker

        if sqlalchemy is None:
            raise ImportError(
                f"Cannot import SQLAlchemy. Please install SQLAlchemy before using "
                f"{self.__class__.__name__}."
            )

        # ORM declarations
        engine = create_engine(db_str)

        # create table
        try:
            Base.metadata.create_all(engine, checkfirst=True)
        except OperationalError:
            # table already exists
            pass

        # Session factory bound to the engine; each method below creates a short-lived session from it.
        self.Session = sessionmaker(bind=engine)

    def sort(self):
        """
        No-op: ordering is done by the query in :meth:`pop_n`.
        """

    def add(self, prio, sid, taken=False, stash="spilled"):  # pylint:disable=arguments-differ
        """
        Insert a record for a newly pickled state.

        :param prio:        Priority of the state.
        :param sid:         Persistent ID of the state; used as the record's ``id``.
        :param bool taken:  Initial value of the record's ``taken`` flag.
        :param str stash:   Name of the stash the record is filed under.
        :return:            None
        """
        from .spiller_db import PickledState

        record = PickledState(id=sid, priority=prio, taken=taken, stash=stash)
        session = self.Session()
        try:
            session.add(record)
            session.commit()
        finally:
            # Close unconditionally so a failed insert/commit does not leak the session.
            session.close()

    def pop_n(self, n, stash="spilled"):  # pylint:disable=arguments-differ
        """
        Take up to N not-yet-taken records of a stash, ordered by priority, and mark them as taken.

        :param int n:       Maximum number of records to take.
        :param str stash:   Only records filed under this stash are considered.
        :return:            A list of ``(priority, id)`` tuples.
        """
        from .spiller_db import PickledState

        session = self.Session()
        try:
            rows = (
                session.query(PickledState)
                .filter_by(taken=False)
                .filter_by(stash=stash)
                .order_by(PickledState.priority)
                .limit(n)
                .all()
            )

            ss = []
            for r in rows:
                r.taken = True
                # Read the attributes before the commit below, while they are still loaded.
                ss.append((r.priority, r.id))
            session.commit()
        finally:
            session.close()
        return ss

    def get_recent_n(self, n, stash="spilled"):
        """
        Return up to N records of a stash, newest timestamp first. Records are not modified.

        :param int n:       Maximum number of records to return.
        :param str stash:   Only records filed under this stash are considered.
        :return:            A list of ``(timestamp, id)`` tuples.
        """
        from .spiller_db import PickledState

        session = self.Session()
        try:
            rows = (
                session.query(PickledState)
                .filter_by(stash=stash)
                .order_by(PickledState.timestamp.desc())
                .limit(n)
                .all()
            )
            return [(r.timestamp, r.id) for r in rows]
        finally:
            session.close()

    def count(self):
        """
        Count all ``PickledState`` records. No filter is applied, so taken records are included.

        :return: The number of records.
        """
        from .spiller_db import PickledState

        session = self.Session()
        try:
            return session.query(PickledState).count()
        finally:
            session.close()
class Spiller(ExplorationTechnique):
    """
    Automatically spill states out. It can spill out states to a different stash (the staging stash), spill them out
    to a vault, or first do the former and then (after enough states) the latter.
    """

    def __init__(
        self,
        src_stash="active",
        min=5,
        max=10,  # pylint:disable=redefined-builtin
        staging_stash="spill_stage",
        staging_min=10,
        staging_max=20,
        pickle_callback=None,
        unpickle_callback=None,
        post_pickle_callback=None,
        priority_key=None,
        vault=None,
        states_collection=None,
    ):
        """
        Initializes the spiller.

        :param src_stash:       the stash from which to spill states (default: active)
        :param min:             if the source stash holds fewer states than this after a step, it is refilled up to
                                halfway between min and max
        :param max:             the number of states that are *not* spilled
        :param staging_stash:   the stash *to* which to spill states (default: "spill_stage"). If falsy, no staging
                                stash is used and states are spilled to, and reloaded from, the vault directly.
        :param staging_min:     if the staging stash holds fewer states than this, states are unpickled into it up
                                to halfway between staging_min and staging_max
        :param staging_max:     the number of states that can be in the staging stash before things get pickled
                                into the vault (default: 20)
        :param pickle_callback: if set, called with each state right before the states are stored
        :param unpickle_callback:   if set, called with (state ID, state) for each state loaded back
        :param post_pickle_callback:    if set, called with (state, priority, state ID) after a state was stored
        :param priority_key:    a function that takes a state and returns its numerical priority (MAX_INT is lowest
                                priority). By default, self.state_priority will be used, which prioritizes by
                                object ID.
        :param vault:           an angr.Vault object to handle storing and loading of states. If not provided, an
                                angr.vaults.VaultShelf will be created.
        :param states_collection:   the container that tracks pickled states (see PickledStatesBase). If not
                                provided, a PickledStatesList is used.
        """
        super().__init__()
        self.max = max
        self.min = min
        self.src_stash = src_stash
        self.staging_stash = staging_stash
        self.staging_max = staging_max
        self.staging_min = staging_min

        # various callbacks
        self.priority_key = priority_key
        self.unpickle_callback = unpickle_callback
        self.pickle_callback = pickle_callback
        self.post_pickle_callback = post_pickle_callback

        # tracking of pickled stuff
        self._pickled_states = PickledStatesList() if states_collection is None else states_collection
        self._ever_pickled = 0
        self._ever_unpickled = 0
        self._vault = vaults.VaultShelf() if vault is None else vault

    def _unpickle(self, n):
        """
        Load up to `n` states back from the vault, best priority first.

        :param int n:   Number of states to load.
        :return:        A list of the loaded states.
        """
        self._pickled_states.sort()
        unpickled = [(sid, self._load_state(sid)) for _, sid in self._pickled_states.pop_n(n)]
        self._ever_unpickled += len(unpickled)
        if self.unpickle_callback:
            for sid, u in unpickled:
                self.unpickle_callback(sid, u)
        return [u for _, u in unpickled]

    def _get_priority(self, state):
        """
        Compute the priority of a state with `priority_key`, falling back to `state_priority`.
        """
        return (self.priority_key or self.state_priority)(state)

    def _pickle(self, states):
        """
        Store the given states in the vault and register them in the pickled-states collection.

        A state whose storing raises RecursionError is skipped with a warning; it is not registered.

        :param states:  The states to store.
        :return:        None
        """
        if self.pickle_callback:
            for s in states:
                self.pickle_callback(s)

        # This counts every state handed in, including any whose storing fails below.
        self._ever_pickled += len(states)

        for state in states:
            try:
                state_oid = self._store_state(state)
            except RecursionError:
                l.warning(
                    "Couldn't store the state because of a recursion error. This is most likely to be pickle's "
                    "fault. You may try to increase the recursion limit using sys.setrecursionlimit()."
                )
                continue
            prio = self._get_priority(state)
            if self.post_pickle_callback:
                self.post_pickle_callback(state, prio, state_oid)
            self._pickled_states.add(prio, state_oid)

    def _store_state(self, state):
        """
        Store a state in the vault and return the ID the vault assigned to it.
        """
        return self._vault.store(state)

    def _load_state(self, sid):
        """
        Load the state with the given ID from the vault.
        """
        return self._vault.load(sid)

    def step(self, simgr, stash="active", **kwargs):
        """
        Step the simulation manager, then rebalance the source stash, the staging stash and the vault.

        With a staging stash, states move between source stash and staging stash, and between staging stash and
        vault. Without one (falsy `staging_stash`), states move between source stash and vault directly.

        :param simgr:   The simulation manager to step.
        :param stash:   The stash to step; forwarded to `simgr.step`.
        :param kwargs:  Forwarded to `simgr.step`.
        :return:        The stepped simulation manager.
        """
        simgr = simgr.step(stash=stash, **kwargs)

        prio_key = self.priority_key or self.state_priority
        states = simgr.stashes[self.src_stash]
        staged_states = simgr.stashes.setdefault(self.staging_stash, []) if self.staging_stash else []

        l.debug(
            "STASH STATUS: active: %d, staging: %d",
            len(states),
            len(staged_states),
        )

        if len(states) < self.min:
            missing = (self.max + self.min) // 2 - len(states)
            l.debug("Too few states (%d/%d) in stash %s.", len(states), self.min, self.src_stash)
            if self.staging_stash:
                l.debug("... retrieving states from staging stash (%s)", self.staging_stash)
                staged_states.sort(key=prio_key)
                states += staged_states[:missing]
                staged_states[:missing] = []
            else:
                l.debug("... staging stash disabled; unpickling states")
                states += self._unpickle(missing)

        if len(states) > self.max:
            l.debug("Too many states (%d/%d) in stash %s", len(states), self.max, self.src_stash)
            states.sort(key=prio_key)
            overflow = states[self.max :]
            states[self.max :] = []
            if self.staging_stash:
                staged_states += overflow
            else:
                # There is no staging stash to park the overflow in. Spill it straight to the vault; keeping it in
                # a local list would lose these states once this call returns.
                self._pickle(overflow)

        simgr.stashes[self.src_stash] = states

        # The staging stash is only rebalanced (and written back) when one is configured. Otherwise states would
        # be pulled out of the vault into a list that nobody ever reads again.
        if self.staging_stash:
            # if we have too few staged states, unpickle up to halfway between max and min
            if len(staged_states) < self.staging_min:
                l.debug("Too few states in staging stash (%s)", self.staging_stash)
                staged_states += self._unpickle((self.staging_min + self.staging_max) // 2 - len(staged_states))

            if len(staged_states) > self.staging_max:
                l.debug("Too many states in staging stash (%s)", self.staging_stash)
                self._pickle(staged_states[self.staging_max :])
                staged_states[self.staging_max :] = []

            simgr.stashes[self.staging_stash] = staged_states

        return simgr

    @staticmethod
    def state_priority(state):
        """
        Default priority function: the object ID of the state.
        """
        return id(state)
from .. import vaults