Source code for angr.storage.memory_mixins.address_concretization_mixin

from typing import List

import claripy

from . import MemoryMixin
from ... import sim_options as options
from ... import concretization_strategies
from ...sim_state_options import SimStateOptions
from ...state_plugins.inspect import BP_BEFORE, BP_AFTER
from ...errors import SimMergeError, SimUnsatError, SimMemoryAddressError, SimMemoryError
from ...storage import DUMMY_SYMBOLIC_READ_VALUE


[docs]class MultiwriteAnnotation(claripy.Annotation): @property def eliminatable(self): return False @property def relocateable(self): return True
def _multiwrite_filter(mem, ast): # pylint:disable=unused-argument # this is a huge hack, but so is the whole multiwrite crap return any(isinstance(a, MultiwriteAnnotation) for a in ast._uneliminatable_annotations) SimStateOptions.register_option( "symbolic_ip_max_targets", int, default=256, description="The maximum number of concrete addresses a symbolic instruction pointer can be concretized to.", ) SimStateOptions.register_option( "jumptable_symbolic_ip_max_targets", int, default=16384, description="The maximum number of concrete addresses a symbolic instruction pointer " "can be concretized to if it is part of a jump table.", )
[docs]class AddressConcretizationMixin(MemoryMixin): """ The address concretization mixin allows symbolic reads and writes to be handled sanely by dispatching them as a number of conditional concrete reads/writes. It provides a "concretization strategies" interface allowing the process of serializing symbolic addresses into concrete ones to be specified. """
[docs] def __init__(self, read_strategies=None, write_strategies=None, **kwargs): super().__init__(**kwargs) self.read_strategies = read_strategies self.write_strategies = write_strategies
[docs] def set_state(self, state): super().set_state(state) if self.state is not None: if self.read_strategies is None: self._create_default_read_strategies() if self.write_strategies is None: self._create_default_write_strategies()
@MemoryMixin.memo def copy(self, memo): o = super().copy(memo) o.read_strategies = list(self.read_strategies) o.write_strategies = list(self.write_strategies) return o
[docs] def merge(self, others, merge_conditions, common_ancestor=None) -> bool: r = super().merge(others, merge_conditions, common_ancestor=common_ancestor) self.read_strategies = self._merge_strategies(self.read_strategies, *[o.read_strategies for o in others]) self.write_strategies = self._merge_strategies(self.write_strategies, *[o.write_strategies for o in others]) return r
def _create_default_read_strategies(self): """ This function is used to populate `self.read_strategies` if by set-state time none have been provided It uses state options to pick defaults. """ self.read_strategies = [] if options.APPROXIMATE_MEMORY_INDICES in self.state.options: # first, we try to resolve the read address by approximation self.read_strategies.append( concretization_strategies.SimConcretizationStrategyRange(1024, exact=False), ) # then, we try symbolic reads, with a maximum width of a kilobyte self.read_strategies.append(concretization_strategies.SimConcretizationStrategyRange(1024)) if options.CONSERVATIVE_READ_STRATEGY not in self.state.options: # finally, we concretize to any one solution self.read_strategies.append( concretization_strategies.SimConcretizationStrategyAny(), ) def _create_default_write_strategies(self): """ This function is used to populate `self.write_strategies` if by set-state time none have been provided. It uses state options to pick defaults. """ self.write_strategies = [] if options.APPROXIMATE_MEMORY_INDICES in self.state.options: if options.SYMBOLIC_WRITE_ADDRESSES not in self.state.options: # we try to resolve a unique solution by approximation self.write_strategies.append( concretization_strategies.SimConcretizationStrategySingle(exact=False), ) else: # we try a solution range by approximation self.write_strategies.append(concretization_strategies.SimConcretizationStrategyRange(128, exact=False)) if options.SYMBOLIC_WRITE_ADDRESSES in self.state.options: # we try to find a range of values self.write_strategies.append(concretization_strategies.SimConcretizationStrategyRange(128)) else: # we try to find a range of values, but only for ASTs annotated with the multiwrite annotation self.write_strategies.append( concretization_strategies.SimConcretizationStrategyRange(128, filter=_multiwrite_filter) ) # finally, we just grab the maximum solution if options.CONSERVATIVE_WRITE_STRATEGY not in self.state.options: self.write_strategies.append(concretization_strategies.SimConcretizationStrategyMax()) @staticmethod def _merge_strategies(*strategy_lists): """ Utility function for merging. Does the merge operation on lists of strategies """ if len({len(sl) for sl in strategy_lists}) != 1: raise SimMergeError("unable to merge memories with amounts of strategies") merged_strategies = [] for strategies in zip(*strategy_lists): if len({s.__class__ for s in strategies}) != 1: raise SimMergeError("unable to merge memories with different types of strategies") unique = list(set(strategies)) if len(unique) > 1: unique[0].merge(unique[1:]) merged_strategies.append(unique[0]) return merged_strategies def _apply_concretization_strategies(self, addr, strategies, action, condition): """ Applies concretization strategies on the address until one of them succeeds. """ # we try all the strategies in order for s in strategies: # first, we trigger the SimInspect breakpoint and give it a chance to intervene e = addr self.state._inspect( "address_concretization", BP_BEFORE, address_concretization_strategy=s, address_concretization_action=action, address_concretization_memory=self, address_concretization_expr=e, address_concretization_add_constraints=True, ) s = self.state._inspect_getattr("address_concretization_strategy", s) e = self.state._inspect_getattr("address_concretization_expr", addr) # if the breakpoint None'd out the strategy, we skip it if s is None: continue # let's try to apply it! try: a = s.concretize(self, e, extra_constraints=(condition,) if condition is not None else ()) except SimUnsatError: a = None # trigger the AFTER breakpoint and give it a chance to intervene self.state._inspect("address_concretization", BP_AFTER, address_concretization_result=a) a = self.state._inspect_getattr("address_concretization_result", a) # return the result if not None! if a is not None: return a # well, we tried raise SimMemoryAddressError("Unable to concretize address for %s with the provided strategies." % action)
[docs] def concretize_write_addr(self, addr, strategies=None, condition=None): """ Concretizes an address meant for writing. :param addr: An expression for the address. :param strategies: A list of concretization strategies (to override the default). :param condition: Any extra constraints that should be observed when determining address satisfiability :returns: A list of concrete addresses. """ if isinstance(addr, int): return [addr] elif not self.state.solver.symbolic(addr): return [self.state.solver.eval(addr)] strategies = self.write_strategies if strategies is None else strategies return self._apply_concretization_strategies(addr, strategies, "store", condition)
[docs] def concretize_read_addr(self, addr, strategies=None, condition=None): """ Concretizes an address meant for reading. :param addr: An expression for the address. :param strategies: A list of concretization strategies (to override the default). :returns: A list of concrete addresses. """ if isinstance(addr, int): return [addr] elif not self.state.solver.symbolic(addr): return [self.state.solver.eval(addr)] strategies = self.read_strategies if strategies is None else strategies return self._apply_concretization_strategies(addr, strategies, "load", condition)
# # Real shit # @staticmethod def _interleave_ints(addrs: List[int]) -> List[int]: """ Take a list of integers and return a new list of integers where front and back integers interleave. """ lst = [None] * len(addrs) front, back = 0, len(addrs) - 1 i = 0 while front <= back: lst[i] = addrs[front] i += 1 front += 1 if front < back: lst[i] = addrs[back] i += 1 back -= 1 return lst def _load_one_addr(self, concrete_addr: int, trivial: bool, addr, condition, size, read_value=None, **kwargs): if trivial: sub_condition = condition else: sub_condition = addr == concrete_addr if condition is not None: sub_condition = condition & sub_condition sub_value = super().load(concrete_addr, size=size, condition=sub_condition, **kwargs) if read_value is None: return sub_value else: return self.state.solver.If(addr == concrete_addr, sub_value, read_value)
[docs] def load(self, addr, size=None, condition=None, **kwargs): if type(size) is not int: raise TypeError("Size must have been specified as an int before reaching address concretization") # Fast path if type(addr) is int: return self._load_one_addr(addr, True, addr, condition, size, read_value=None, **kwargs) elif not self.state.solver.symbolic(addr): return self._load_one_addr( self.state.solver.eval(addr), True, addr, condition, size, read_value=None, **kwargs ) if self.state.solver.symbolic(addr) and options.AVOID_MULTIVALUED_READS in self.state.options: return self._default_value(None, size, name="symbolic_read_unconstrained", **kwargs) try: concrete_addrs = self._interleave_ints(sorted(self.concretize_read_addr(addr, condition=condition))) except SimMemoryError: if options.CONSERVATIVE_READ_STRATEGY in self.state.options: return self._default_value(None, size, name="symbolic_read_unconstrained", **kwargs) else: raise # quick optimization so as to not involve the solver if not necessary trivial = len(concrete_addrs) == 1 and (addr == concrete_addrs[0]).is_true() if not trivial: # apply the concretization results to the state constraint_options = [addr == concrete_addr for concrete_addr in concrete_addrs] conditional_constraint = self.state.solver.Or(*constraint_options) self._add_constraints(conditional_constraint, condition=condition, **kwargs) # quick optimization to not introduce the DUMMY value if there's only one loop if len(concrete_addrs) == 1: read_value = None else: read_value = DUMMY_SYMBOLIC_READ_VALUE # this is a sentinel value and should never be touched for concrete_addr in concrete_addrs: # perform each of the loads # the implementation of the "fallback" value ought to be implemented above this in the stack!! read_value = self._load_one_addr( concrete_addr, trivial, addr, condition, size, read_value=read_value, **kwargs ) return read_value
def _store_one_addr(self, concrete_addr: int, data, trivial: bool, addr, condition, size, **kwargs): if trivial: sub_condition = condition else: sub_condition = addr == concrete_addr if condition is not None: sub_condition = condition & sub_condition super().store(concrete_addr, data, size=size, condition=sub_condition, **kwargs)
[docs] def store(self, addr, data, size=None, condition=None, **kwargs): # Fast path if type(addr) is int: self._store_one_addr(addr, data, True, addr, condition, size, **kwargs) return elif not self.state.solver.symbolic(addr): self._store_one_addr(self.state.solver.eval(addr), data, True, addr, condition, size, **kwargs) return if self.state.solver.symbolic(addr) and options.AVOID_MULTIVALUED_WRITES in self.state.options: # not completed return try: concrete_addrs = self._interleave_ints(sorted(self.concretize_write_addr(addr))) except SimMemoryError: if options.CONSERVATIVE_WRITE_STRATEGY in self.state.options: return # not completed else: raise # quick optimization so as to not involve the solver if not necessary trivial = len(concrete_addrs) == 1 and (addr == concrete_addrs[0]).is_true() if not trivial: # apply the concretization results to the state constraint_options = [addr == concrete_addr for concrete_addr in concrete_addrs] conditional_constraint = self.state.solver.Or(*constraint_options) self._add_constraints(conditional_constraint, condition=condition, **kwargs) if len(concrete_addrs) == 1: # simple case: avoid conditional write since the address has been concretized to one solution super().store(concrete_addrs[0], data, size=size, **kwargs) return for concrete_addr in concrete_addrs: # perform each of the stores as conditional # the implementation of conditionality must be at the bottom of the stack self._store_one_addr(concrete_addr, data, trivial, addr, condition, size, **kwargs)
[docs] def permissions(self, addr, permissions=None, **kwargs): if type(addr) is int: pass elif getattr(addr, "op", None) == "BVV": addr = addr.args[0] else: raise SimMemoryAddressError("Cannot get/set permissions for a symbolic address") return super().permissions(addr, permissions=permissions, **kwargs)
[docs] def map_region(self, addr, length, permissions, **kwargs): if type(addr) is int: pass elif getattr(addr, "op", None) == "BVV": addr = addr.args[0] else: raise SimMemoryAddressError("Cannot map a region for a symbolic address") return super().map_region(addr, length, permissions, **kwargs)
[docs] def unmap_region(self, addr, length, **kwargs): if type(addr) is int: pass elif getattr(addr, "op", None) == "BVV": addr = addr.args[0] else: raise SimMemoryAddressError("Cannot unmap a region for a symbolic address") return super().unmap_region(addr, length, **kwargs)
[docs] def concrete_load(self, addr, size, *args, **kwargs): if type(addr) is int: pass elif getattr(addr, "op", None) == "BVV": addr = addr.args[0] else: raise SimMemoryAddressError("Cannot unmap a region for a symbolic address") return super().concrete_load(addr, size, *args, **kwargs)