Source code for angr.exploration_techniques.suggestions

import logging
import claripy

from . import ExplorationTechnique
from ..misc.ux import once
from ..misc.picklable_lock import PicklableLock
from ..state_plugins.sim_action import SimActionConstraint
from ..state_plugins.sim_action_object import SimActionObject

l = logging.getLogger(__name__)


def ast_weight(ast, memo=None):
    """
    Compute a size metric for a claripy AST: one for the node itself plus the weight of each of its arguments.

    A subtree that appears several times is counted once per appearance; the memo only avoids recomputing it.

    :param ast:     The object to weigh. A ``SimActionObject`` is unwrapped to its ``.ast`` first. Anything that is
                    not a ``claripy.ast.Base`` after unwrapping weighs 0.
    :param memo:    Optional dict mapping ``cache_key`` to an already computed weight. It is shared across the
                    recursive calls and filled in as weights are computed; a fresh dict is used when omitted.
    :return:        The integer weight of ``ast``.
    """
    if isinstance(ast, SimActionObject):
        ast = ast.ast
    if not isinstance(ast, claripy.ast.Base):
        return 0

    cache = {} if memo is None else memo
    key = ast.cache_key

    known = cache.get(key)
    if known is not None:
        return known

    # This node counts for one; every argument adds its own (possibly cached) weight.
    weight = 1
    for child in ast.args:
        weight += ast_weight(child, cache)

    cache[key] = weight
    return weight
class Suggestions(ExplorationTechnique):
    """
    An exploration technique which analyzes failure cases and logs suggestions for how to mitigate them in future
    analyses.
    """

    def __init__(self):
        super().__init__()
        # id() of every interrupted state that has already been looked at, so each one is handled only once
        self.suggested = set()
        # serializes report() calls so that the log lines of two reports are not interleaved
        self.lock = PicklableLock()

    def step(self, simgr, stash="active", **kwargs):
        """
        Step the simulation manager, then report on newly interrupted states.

        Every state in the ``interrupted`` stash is examined at most once. A report is logged when the last event
        in the state's history has type ``insufficient_resources``.

        :param simgr:   The simulation manager to step.
        :param stash:   The stash to step.
        :param kwargs:  Passed through to ``simgr.step``.
        :return:        The simulation manager.
        """
        simgr.step(stash=stash, **kwargs)

        for state in simgr.stashes.get("interrupted", []):
            if id(state) in self.suggested:
                continue
            self.suggested.add(id(state))

            try:
                event = state.history.events[-1]
            except IndexError:
                # no events recorded at all, nothing to analyze
                continue
            if event.type != "insufficient_resources":
                continue

            with self.lock:  # do not interleave logs
                self.report(state, event)

        return simgr

    @staticmethod
    def _complexity_cutoff(weights, min_drop=2**10, min_weight=2**16):
        """
        Decide how many of the heaviest constraints count as abnormally complex.

        :param weights:     Constraint weights sorted in descending order.
        :param min_drop:    Smallest drop between two neighbouring weights that is treated as a meaningful gap.
        :param min_weight:  Weight from which a constraint is considered complex on its own.
        :return:            Index of the last abnormally complex entry of ``weights``, or None if there is none.
        """
        cutoff = None

        if len(weights) > 1:
            # The weights are descending, so the drop is heavier minus lighter and never negative.
            drops = [heavier - lighter for heavier, lighter in zip(weights, weights[1:])]
            # Select by drop size; taking max() over (index, drop) pairs would just return the last index.
            cutoff = max(range(len(drops)), key=drops.__getitem__)
            if drops[cutoff] < min_drop:
                cutoff = None

        if cutoff is None and weights and weights[0] >= min_weight:
            # No clear gap: fall back to an absolute threshold. If nothing is below it, flag everything.
            cutoff = len(weights) - 1
            for idx, weight in enumerate(weights):
                if weight < min_weight:
                    cutoff = idx - 1
                    break

        return cutoff

    @staticmethod
    def report(state, event):
        """
        Log suggestions for a state that ran into a resource limit.

        For a claripy solver interrupt this logs which limit was hit, whether the configured limit is below the
        recommended minimum, which constraints in the state's history are abnormally complex, and where the
        variables in those constraints come from. All suggestions are logged at INFO level.

        :param state:   The interrupted state.
        :param event:   The ``insufficient_resources`` history event; its ``objects`` mapping is read for the
                        ``type`` and ``reason`` entries.
        """
        # NOTE(review): `once` presumably returns True only on its first call for a given key - confirm
        if once("suggestion_technique"):
            l.warning("Some of your states hit a resource limit. set logger %s to INFO for suggestions.", __name__)
            l.info("Create your simulation manager with `suggestions=False` to disable this.")

        if event.objects["type"] is not claripy.errors.ClaripySolverInterruptError:
            return

        reason = event.objects["reason"][0]
        if reason == "timeout":
            limit_number = state.solver._solver.timeout
            limit_kind = "hit a solver timeout of %s ms." % limit_number
            limit_minimum = 60 * 1000
        elif reason == "max. memory exceeded":
            limit_number = state.solver._solver.max_memory
            limit_kind = "hit a solver memory limit of %s MB." % limit_number
            limit_minimum = 1024
        else:
            limit_number = None
            limit_kind = "hit an unknown resource limit. are you manually mucking with the z3 backend?"
            limit_minimum = None

        l.info("%s %s", state, limit_kind)
        if limit_number is not None and limit_minimum is not None and limit_number < limit_minimum:
            l.info("The minimum recommended limit is %s. Consider turning it up?", limit_minimum)

        # One entry per constraint added along the state's history:
        # (constraint, weight, source address, destination address, transition type, chronological index)
        log = []
        for history in state.history.lineage:
            for constraint_event in history.recent_events:
                if not isinstance(constraint_event, SimActionConstraint):
                    continue

                constraint = constraint_event.constraint.ast
                if constraint is history.jump_guard:
                    src_addr = history.jump_source
                    dst_addr = history.jump_target
                    transition_type = "jumping"
                else:
                    if constraint_event.sim_procedure is None:
                        src_addr = constraint_event.ins_addr
                    else:
                        src_addr = constraint_event.sim_procedure.addr
                    # lift two instructions so the address of the following instruction is available
                    block = state.block(src_addr, num_inst=2)
                    try:
                        dst_addr = block.instruction_addrs[1]
                    except IndexError:
                        dst_addr = history.jump_target
                    transition_type = "stepping"

                # normalize concrete destinations to BVVs so they can be told apart from symbolic ones below
                if isinstance(dst_addr, int):
                    dst_addr = claripy.BVV(dst_addr, state.arch.bits)
                log.append((constraint, ast_weight(constraint), src_addr, dst_addr, transition_type, len(log)))

        # heaviest constraints first
        log.sort(key=lambda t: t[1], reverse=True)
        max_delta_idx = Suggestions._complexity_cutoff([t[1] for t in log])
        if max_delta_idx is None:
            return

        l.info("%d constraint%s abnormally complex.", max_delta_idx + 1, "s are" if max_delta_idx > 0 else " is")

        # only list the individual constraints when there are few of them
        list_each = max_delta_idx < 10
        describe_addr = state.project.loader.describe_addr
        descriptions = set()
        # present the flagged constraints in the order in which they were created
        for constraint, _, src_addr, dst_addr, transition_type, _ in sorted(
            log[: max_delta_idx + 1], key=lambda t: t[5]
        ):
            if list_each:
                l.info(
                    "...generated %s from %s to %s",
                    transition_type,
                    describe_addr(src_addr),
                    describe_addr(dst_addr.args[0]) if dst_addr.op == "BVV" else "<symbol>",
                )
            descriptions.update(state.solver.describe_variables(constraint))

        # report every variable origin only once
        seen_apis = set()
        for description in descriptions:
            category = description[0]
            if category == "api":
                if description[1] not in seen_apis:
                    seen_apis.add(description[1])
                    l.info("...using variables originating in %s (hook it?)", description[1])
            elif category == "mem":
                if "##mem" not in seen_apis:
                    seen_apis.add("##mem")
                    l.info("...using unconstrained memory (add ZERO_FILL_UNCONSTRAINED_MEMORY?)")
            elif category == "reg":
                if "##reg" not in seen_apis:
                    seen_apis.add("##reg")
                    l.info("...using unconstrained registers (add ZERO_FILL_UNCONSTRAINED_REGISTERS?)")
            elif category == "file":
                api = "file##" + description[1]
                if api not in seen_apis:
                    seen_apis.add(api)
                    l.info("...using variables from file %s", description[1])
            else:
                l.info("...using uncategorized variable %s", description)