Source code for angr.engines.successors

import logging

import claripy

from archinfo.arch_soot import ArchSoot


l = logging.getLogger(name=__name__)


[docs]class SimSuccessors: """ This class serves as a categorization of all the kinds of result states that can come from a SimEngine run. :ivar int addr: The address at which execution is taking place, as a python int :ivar initial_state: The initial state for which execution produced these successors :ivar engine: The engine that produced these successors :ivar sort: A string identifying the type of engine that produced these successors :ivar bool processed: Whether or not the processing succeeded :ivar str description: A textual description of the execution step The successor states produced by this run are categorized into several lists: :ivar dict artifacts: Any analysis byproducts (for example, an IRSB) that were produced during execution :ivar successors: The "normal" successors. IP may be symbolic, but must have reasonable number of solutions :ivar unsat_successors: Any successor which is unsatisfiable after its guard condition is added. :ivar all_successors: successors + unsat_successors :ivar flat_successors: The normal successors, but any symbolic IPs have been concretized. There is one state in this list for each possible value an IP may be concretized to for each successor state. :ivar unconstrained_successors: Any state for which during the flattening process we find too many solutions. A more detailed description of the successor lists may be found here: https://docs.angr.io/core-concepts/simulation#simsuccessors """
[docs] def __init__(self, addr, initial_state): self.addr = addr self.initial_state = initial_state self.successors = [] self.all_successors = [] self.flat_successors = [] self.unsat_successors = [] self.unconstrained_successors = [] # the engine that should process or did process this request self.engine = None self.processed = False self.description = "SimSuccessors" self.sort = None self.artifacts = {}
[docs] @classmethod def failure(cls): return cls(None, None)
def __repr__(self): if self.processed: successor_strings = [] if len(self.flat_successors) != 0: successor_strings.append(f"{len(self.flat_successors)} sat") if len(self.unsat_successors) != 0: successor_strings.append(f"{len(self.unsat_successors)} unsat") if len(self.unconstrained_successors) != 0: successor_strings.append(f"{len(self.unconstrained_successors)} unconstrained") if len(successor_strings) == 0: result = "empty" else: result = " ".join(successor_strings) else: result = "failure" if isinstance(self.addr, int): return f"<{self.description} from {self.addr:#x}: {result}>" else: return f"<{self.description} from {self.addr}: {result}>" @property def is_empty(self): return ( not self.all_successors and not self.flat_successors and not self.unsat_successors and not self.unconstrained_successors ) def __getitem__(self, k): return self.flat_successors[k] def __iter__(self): return iter(self.flat_successors)
[docs] def add_successor( self, state, target, guard, jumpkind, add_guard=True, exit_stmt_idx=None, exit_ins_addr=None, source=None ): """ Add a successor state of the SimRun. This procedure stores method parameters into state.scratch, does some housekeeping, and calls out to helper functions to prepare the state and categorize it into the appropriate successor lists. :param SimState state: The successor state. :param target: The target (of the jump/call/ret). :param guard: The guard expression. :param str jumpkind: The jumpkind (call, ret, jump, or whatnot). :param bool add_guard: Whether to add the guard constraint (default: True). :param int exit_stmt_idx: The ID of the exit statement, an integer by default. 'default' stands for the default exit, and None means it's not from a statement (for example, from a SimProcedure). :param int exit_ins_addr: The instruction pointer of this exit, which is an integer by default. :param int source: The source of the jump (i.e., the address of the basic block). """ # First, trigger the SimInspect breakpoint state._inspect("exit", BP_BEFORE, exit_target=target, exit_guard=guard, exit_jumpkind=jumpkind) state.scratch.target = state._inspect_getattr("exit_target", target) state.scratch.guard = state._inspect_getattr("exit_guard", guard) state.history.jumpkind = state._inspect_getattr("exit_jumpkind", jumpkind) state.history.jump_target = state.scratch.target state.history.jump_guard = state.scratch.guard # track some vex-specific stuff here for now state.scratch.source = source if source is not None else self.addr state.scratch.exit_stmt_idx = exit_stmt_idx state.scratch.exit_ins_addr = exit_ins_addr state.history.jump_source = state.scratch.exit_ins_addr self._preprocess_successor(state, add_guard=add_guard) if state.history.jumpkind == "Ijk_SigFPE_IntDiv" and o.PRODUCE_ZERODIV_SUCCESSORS not in state.options: return self._categorize_successor(state) state._inspect("exit", BP_AFTER, exit_target=target, exit_guard=guard, exit_jumpkind=jumpkind) if state.supports_inspect: state.inspect.downsize()
# # Successor management # def _preprocess_successor(self, state, add_guard=True): # pylint:disable=unused-argument """ Preprocesses the successor state. :param state: the successor state """ # Next, simplify what needs to be simplified if o.SIMPLIFY_EXIT_STATE in state.options: state.solver.simplify() if o.SIMPLIFY_EXIT_GUARD in state.options: state.scratch.guard = state.solver.simplify(state.scratch.guard) if o.SIMPLIFY_EXIT_TARGET in state.options: state.scratch.target = state.solver.simplify(state.scratch.target) # unwrap stuff from SimActionObjects state.scratch.target = _raw_ast(state.scratch.target) state.scratch.guard = _raw_ast(state.scratch.guard) # apply the guard constraint and new program counter to the state if add_guard: state.add_constraints(state.scratch.guard) # trigger inspect breakpoints here since this statement technically shows up in the IRSB as the "next" state.regs.ip = state.scratch.target # For architectures with no stack pointer, we can't manage a callstack. This has the side effect of breaking # SimProcedures that call out to binary code self.call. if self.initial_state.arch.sp_offset is not None and not isinstance(state.arch, ArchSoot): self._manage_callstack(state) if len(self.successors) != 0: # This is a fork! state._inspect("fork", BP_AFTER) # clean up the state state.options.discard(o.AST_DEPS) state.options.discard(o.AUTO_REFS) @staticmethod def _manage_callstack(state): # condition for call = Ijk_Call # condition for ret = stack pointer drops below call point if state.history.jumpkind == "Ijk_Call": state._inspect("call", BP_BEFORE, function_address=state.regs._ip) new_func_addr = state._inspect_getattr("function_address", None) if new_func_addr is not None and not claripy.is_true(new_func_addr == state.regs._ip): state.regs._ip = new_func_addr try: if state.arch.call_pushes_ret: ret_addr = state.mem[state.regs._sp].long.concrete else: ret_addr = state.solver.eval(state.regs._lr) except (SimSolverModeError, SimUnsatError, AttributeError, KeyError): # Use the address for UnresolvableCallTarget instead. ret_addr = state.project.simos.unresolvable_call_target try: state_addr = state.addr except (SimValueError, SimSolverModeError): state_addr = None try: stack_ptr = state.solver.eval(state.regs._sp) except (SimSolverModeError, SimUnsatError): stack_ptr = 0 new_frame = CallStack( call_site_addr=state.history.recent_bbl_addrs[-1], func_addr=state_addr, stack_ptr=stack_ptr, ret_addr=ret_addr, jumpkind="Ijk_Call", ) state.callstack.push(new_frame) state._inspect("call", BP_AFTER) else: while True: cur_sp = state.solver.max(state.regs._sp) if state.has_plugin("symbolizer") else state.regs._sp if not state.solver.is_true(cur_sp > state.callstack.top.stack_ptr): break state._inspect("return", BP_BEFORE, function_address=state.callstack.top.func_addr) state.callstack.pop() state._inspect("return", BP_AFTER) if ( not state.arch.call_pushes_ret and claripy.is_true(state.regs._ip == state.callstack.ret_addr) and claripy.is_true(state.regs._sp == state.callstack.stack_ptr) ): # very weird edge case that's not actually weird or on the edge at all: # if we use a link register for the return address, the stack pointer will be the same # before and after the call. therefore we have to check for equality with the marker # along with this other check with the instruction pointer to guess whether it's time # to pop a callframe. Still better than relying on Ijk_Ret. state._inspect("return", BP_BEFORE, function_address=state.callstack.top.func_addr) state.callstack.pop() state._inspect("return", BP_AFTER) def _categorize_successor(self, state): """ Append state into successor lists. :param state: a SimState instance :param target: The target (of the jump/call/ret) :return: The state """ self.all_successors.append(state) target = state.scratch.target # categorize the state if o.APPROXIMATE_GUARDS in state.options and state.solver.is_false(state.scratch.guard, exact=False): if o.VALIDATE_APPROXIMATIONS in state.options: if state.satisfiable(): raise Exception("WTF") self.unsat_successors.append(state) elif o.APPROXIMATE_SATISFIABILITY in state.options and not state.solver.satisfiable(exact=False): if o.VALIDATE_APPROXIMATIONS in state.options: if state.solver.satisfiable(): raise Exception("WTF") self.unsat_successors.append(state) elif not state.scratch.guard.symbolic and state.solver.is_false(state.scratch.guard): self.unsat_successors.append(state) elif o.LAZY_SOLVES not in state.options and not state.satisfiable(): self.unsat_successors.append(state) elif o.NO_SYMBOLIC_JUMP_RESOLUTION in state.options and state.solver.symbolic(target): self.unconstrained_successors.append(state) elif not state.solver.symbolic(target) and not state.history.jumpkind.startswith("Ijk_Sys"): # a successor with a concrete IP, and it's not a syscall self.successors.append(state) self.flat_successors.append(state) elif state.history.jumpkind.startswith("Ijk_Sys"): # syscall self.successors.append(state) # Misuse the ip_at_syscall register to save the return address for this syscall # state.ip *might be* changed to be the real address of syscall SimProcedures by syscall handling code in # angr state.regs.ip_at_syscall = state.ip try: symbolic_syscall_num, concrete_syscall_nums = self._resolve_syscall(state) if concrete_syscall_nums is not None: for i, n in enumerate(concrete_syscall_nums): split_state = state if i == len(concrete_syscall_nums) - 1 else state.copy() split_state.add_constraints(symbolic_syscall_num == n) if split_state.supports_inspect: split_state.inspect.downsize() self._fix_syscall_ip(split_state) self.flat_successors.append(split_state) else: # We cannot resolve the syscall number # However, we still put it to the flat_successors list, and angr.SimOS.handle_syscall will pick it # up, and create a "unknown syscall" stub for it. self._fix_syscall_ip(state) self.flat_successors.append(state) except (AngrUnsupportedSyscallError, AngrSyscallError): self.unsat_successors.append(state) else: # a successor with a symbolic IP _max_targets = state.options.symbolic_ip_max_targets _max_jumptable_targets = state.options.jumptable_symbolic_ip_max_targets try: skip_max_targets_warning = False if o.NO_IP_CONCRETIZATION in state.options: # Don't try to concretize the IP cond_and_targets = [(claripy.true, target)] max_targets = 0 skip_max_targets_warning = True # don't warn elif o.KEEP_IP_SYMBOLIC in state.options: s = claripy.Solver() addrs = s.eval(target, _max_targets + 1, extra_constraints=tuple(state.ip_constraints)) if len(addrs) > _max_targets: # It is not a library l.debug("It is not a Library") addrs = state.solver.eval_upto(target, _max_targets + 1) l.debug("addrs :%s", addrs) cond_and_targets = [(target == addr, addr) for addr in addrs] max_targets = _max_targets else: cond_and_targets = self._eval_target_jumptable(state, target, _max_jumptable_targets + 1) if cond_and_targets is None: # Fallback to the traditional and slow method cond_and_targets = self._eval_target_brutal(state, target, _max_targets + 1) max_targets = _max_targets else: max_targets = _max_jumptable_targets if len(cond_and_targets) > max_targets: if not skip_max_targets_warning: l.warning( "Exit state has over %d possible solutions. Likely unconstrained; skipping. %s", max_targets, target.shallow_repr(), ) self.unconstrained_successors.append(state) else: for cond, a in cond_and_targets: split_state = state.copy() if o.KEEP_IP_SYMBOLIC in split_state.options: split_state.regs.ip = target else: split_state.add_constraints(cond, action=True) split_state.regs.ip = a if split_state.supports_inspect: split_state.inspect.downsize() self.flat_successors.append(split_state) self.successors.append(state) except SimSolverModeError: self.unsat_successors.append(state) return state # misc stuff @staticmethod def _resolve_syscall(state): if state.os_name in SYSCALL_CC[state.arch.name]: cc = SYSCALL_CC[state.arch.name][state.os_name](state.arch) else: # Use the default syscall calling convention - it may bring problems cc = SYSCALL_CC[state.arch.name]["default"](state.arch) syscall_num = cc.syscall_num(state) if syscall_num.symbolic and o.NO_SYMBOLIC_SYSCALL_RESOLUTION in state.options: l.debug("Not resolving symbolic syscall number") return syscall_num, None maximum = state.posix.maximum_symbolic_syscalls possible = state.solver.eval_upto(syscall_num, maximum + 1) if len(possible) == 0: raise AngrUnsupportedSyscallError("Unsatisfiable state attempting to do a syscall") if len(possible) > maximum: l.warning("Too many possible syscalls. Concretizing to 1.") possible = possible[:1] l.debug("Possible syscall values: %s", possible) return syscall_num, possible @staticmethod def _fix_syscall_ip(state): """ Resolve syscall information from the state, get the IP address of the syscall SimProcedure, and set the IP of the state accordingly. Don't do anything if the resolution fails. :param SimState state: the program state. :return: None """ stub = state.project.simos.syscall(state, allow_unsupported=True) if stub: # can be None if simos is not a subclass of SimUserspace state.ip = stub.addr # fix the IP def _finalize(self): """ Finalizes the request. """ if len(self.all_successors) == 0: return # do some cleanup if o.DOWNSIZE_Z3 in self.all_successors[0].options: for s in self.all_successors: s.downsize() # record if the exit is unavoidable if len(self.flat_successors) == 1 and len(self.unconstrained_successors) == 0: self.flat_successors[0].scratch.avoidable = False @staticmethod def _eval_target_jumptable(state, ip, limit): """ A *very* fast method to evaluate symbolic jump targets if they are a) concrete targets, or b) targets coming from jump tables. :param state: A SimState instance. :param ip: The AST of the instruction pointer to evaluate. :param limit: The maximum number of concrete IPs. :return: A list of conditions and the corresponding concrete IPs, or None which indicates fallback is necessary. :rtype: list or None """ if ip.symbolic is False: return [(claripy.ast.bool.true, ip)] # concrete # Detect whether ip is in the form of "if a == 1 then addr_0 else if a == 2 then addr_1 else ..." cond_and_targets = [] # tuple of (condition, target) ip_ = ip # Handle the outer Reverse outer_reverse = False if ip_.op == "Reverse": ip_ = ip_.args[0] outer_reverse = True fallback = False target_variable = None concretes = set() reached_sentinel = False for cond, target in claripy.reverse_ite_cases(ip_): # We must fully unpack the entire AST to make sure it indeed complies with the form above if reached_sentinel: # We should not have any other value beyond the sentinel - maybe one of the possible targets happens to # be the same as the sentinel value? fallback = True break if target.symbolic is False and state.solver.eval(target) == DUMMY_SYMBOLIC_READ_VALUE: # Ignore the dummy value, which acts as the sentinel of this ITE tree reached_sentinel = True continue if cond.op != "__eq__": # We only support equivalence right now. Fallback fallback = True break if cond.args[0].symbolic is True and cond.args[1].symbolic is False: variable, value = cond.args elif cond.args[0].symbolic is False and cond.args[1].symbolic is True: value, variable = cond.args else: # Cannot determine variable and value. Fallback fallback = True break if target_variable is None: target_variable = variable elif target_variable is not variable: # it's checking a different variable. Fallback fallback = True break # Make sure the conditions are mutually exclusive value_concrete = state.solver.eval(value) if value_concrete in concretes: # oops... the conditions are not mutually exclusive fallback = True break concretes.add(value_concrete) if target.symbolic is True: # Cannot handle symbolic targets. Fallback fallback = True break cond_and_targets.append((cond, target if not outer_reverse else state.solver.Reverse(target))) if reached_sentinel is False: # huh? fallback = True if fallback: return None else: return cond_and_targets[:limit] @staticmethod def _eval_target_brutal(state, ip, limit): """ The traditional way of evaluating symbolic jump targets. :param state: A SimState instance. :param ip: The AST of the instruction pointer to evaluate. :param limit: The maximum number of concrete IPs. :return: A list of conditions and the corresponding concrete IPs. :rtype: list """ addrs = state.solver.eval_upto(ip, limit) return [(ip == addr, addr) for addr in addrs]
# pylint: disable=wrong-import-position from ..state_plugins.inspect import BP_BEFORE, BP_AFTER from ..errors import SimSolverModeError, AngrUnsupportedSyscallError, AngrSyscallError, SimValueError, SimUnsatError from ..calling_conventions import SYSCALL_CC from ..state_plugins.sim_action_object import _raw_ast from ..state_plugins.callstack import CallStack from ..storage import DUMMY_SYMBOLIC_READ_VALUE from .. import sim_options as o