Source code for angr.exploration_techniques.explorer

import logging
import claripy

from . import ExplorationTechnique
from .common import condition_to_lambda
from .. import sim_options
from ..state_plugins.sim_event import resource_event

l = logging.getLogger(name=__name__)


[docs]class Explorer(ExplorationTechnique): """ Search for up to "num_find" paths that satisfy condition "find", avoiding condition "avoid". Stashes found paths into "find_stash' and avoided paths into "avoid_stash". The "find" and "avoid" parameters may be any of: - An address to find - A set or list of addresses to find - A function that takes a path and returns whether or not it matches. If an angr CFG is passed in as the "cfg" parameter and "find" is either a number or a list or a set, then any paths which cannot possibly reach a success state without going through a failure state will be preemptively avoided. If either the "find" or "avoid" parameter is a function returning a boolean, and a path triggers both conditions, it will be added to the find stash, unless "avoid_priority" is set to True. """
[docs] def __init__( self, find=None, avoid=None, find_stash="found", avoid_stash="avoid", cfg=None, num_find=1, avoid_priority=False ): super().__init__() self.find, static_find = condition_to_lambda(find) self.avoid, static_avoid = condition_to_lambda(avoid) self.find_stash = find_stash self.avoid_stash = avoid_stash self.cfg = cfg self.ok_blocks = set() self.num_find = num_find self.avoid_priority = avoid_priority # even if avoid or find addresses are not statically known, stop on those that we do know self._extra_stop_points = (static_find or set()) | (static_avoid or set()) self._unknown_stop_points = static_find is None or static_avoid is None self._warned_unicorn = False # TODO: This is a hack for while CFGFast doesn't handle procedure continuations from .. import analyses # pylint: disable=import-outside-toplevel if isinstance(cfg, analyses.CFGFast): l.error("CFGFast is currently inappropriate for use with Explorer.") l.error("Usage of the CFG has been disabled for this explorer.") self.cfg = None if self.cfg is not None: avoid = static_avoid or set() # we need the find addresses to be determined statically if not static_find: l.error("You must provide at least one numeric 'find' address if you provide a CFG.") l.error("Usage of the CFG has been disabled for this explorer.") self.cfg = None return for a in avoid: if cfg.model.get_any_node(a) is None: l.warning("'Avoid' address %#x not present in CFG...", a) # not a queue but a stack... it's just a worklist! queue = [] for f in static_find: nodes = cfg.model.get_all_nodes(f) if len(nodes) == 0: l.warning("'Find' address %#x not present in CFG...", f) else: queue.extend(nodes) seen_nodes = set() while len(queue) > 0: n = queue.pop() if id(n) in seen_nodes: continue if n.addr in avoid: continue self.ok_blocks.add(n.addr) seen_nodes.add(id(n)) queue.extend(n.predecessors) if len(self.ok_blocks) == 0: l.error("No addresses could be validated by the provided CFG!") l.error("Usage of the CFG has been disabled for this explorer.") self.cfg = None return l.warning("Please be sure that the CFG you have passed in is complete.") l.warning("Providing an incomplete CFG can cause viable paths to be discarded!")
[docs] def setup(self, simgr): if self.find_stash not in simgr.stashes: simgr.stashes[self.find_stash] = [] if self.avoid_stash not in simgr.stashes: simgr.stashes[self.avoid_stash] = []
[docs] def step(self, simgr, stash="active", **kwargs): base_extra_stop_points = set(kwargs.pop("extra_stop_points", [])) return simgr.step(stash=stash, extra_stop_points=base_extra_stop_points | self._extra_stop_points, **kwargs)
# make it more natural to deal with the intended dataflow
[docs] def filter(self, simgr, state, **kwargs): stash = self._filter_inner(state) if stash is None: return simgr.filter(state, **kwargs) return stash
def _filter_inner(self, state): if self._unknown_stop_points and sim_options.UNICORN in state.options and not self._warned_unicorn: l.warning("Using unicorn with find/avoid conditions that are a lambda (not a number, set, tuple or list)") l.warning("Unicorn may step over states that match the condition (find or avoid) without stopping.") self._warned_unicorn = True try: if self.avoid_priority: avoidable = self.avoid(state) if avoidable and (avoidable is True or state.addr in avoidable): return self.avoid_stash findable = self.find(state) if findable and (findable is True or state.addr in findable): return self.find_stash if not self.avoid_priority: avoidable = self.avoid(state) if avoidable and (avoidable is True or state.addr in avoidable): return self.avoid_stash except claripy.errors.ClaripySolverInterruptError as e: resource_event(state, e) return "interrupted" if self.cfg is not None and self.cfg.model.get_any_node(state.addr) is not None: if state.addr not in self.ok_blocks: return self.avoid_stash return None
[docs] def complete(self, simgr): return len(simgr.stashes[self.find_stash]) >= self.num_find