# Source code for angr.analyses.variable_recovery.variable_recovery_base

import weakref
from typing import List, Generator, Iterable, Tuple, Union, Set, Optional, Dict, Any, TYPE_CHECKING
import logging
from collections import defaultdict

import claripy
from claripy.annotation import Annotation
from archinfo import Arch
from ailment.expression import BinaryOp, StackBaseOffset

from ...utils.cowdict import DefaultChainMapCOW
from ...engines.light import SpOffset
from ...sim_variable import SimVariable
from ...storage.memory_mixins import MultiValuedMemory
from ..analysis import Analysis
from ..typehoon.typevars import TypeVariables, TypeVariable

if TYPE_CHECKING:
    from angr.storage import SimMemoryObject


l = logging.getLogger(name=__name__)


def parse_stack_pointer(sp):
    """
    Convert multiple supported forms of stack pointer representations into stack offsets.

    Supported forms:

    - a plain ``int``, returned unchanged;
    - a ``StackBaseOffset``, whose ``offset`` attribute is returned;
    - a ``BinaryOp`` whose ``op`` is ``"Add"`` or ``"Sub"``; both operands are parsed recursively and combined.

    :param sp:  A stack pointer representation.
    :return:    A stack pointer offset.
    :rtype:     int
    :raises NotImplementedError: If ``sp`` (or one of its operands) is of an unsupported type, or if a ``BinaryOp``
                                 uses an operator other than ``"Add"`` / ``"Sub"``.
    """
    if isinstance(sp, int):
        return sp

    if isinstance(sp, StackBaseOffset):
        return sp.offset

    if isinstance(sp, BinaryOp):
        # Operands are parsed before the operator is inspected, so an unsupported operand is reported first.
        op0, op1 = sp.operands
        off0 = parse_stack_pointer(op0)
        off1 = parse_stack_pointer(op1)
        if sp.op == "Sub":
            return off0 - off1
        if sp.op == "Add":
            return off0 + off1
        # The type is supported but the operator is not; name the operator so the failure is easier to diagnose.
        raise NotImplementedError("Unsupported stack pointer operation %s." % sp.op)

    raise NotImplementedError("Unsupported stack pointer representation type %s." % type(sp))
class VariableAnnotation(Annotation):
    """
    An annotation that carries a list of ``(address, variable)`` pairs.

    Equality and hashing are both derived from ``addr_and_variables``, so two annotations holding equal lists compare
    equal and hash identically.
    """

    __slots__ = ("addr_and_variables",)

    def __init__(self, addr_and_variables: List[Tuple[int, SimVariable]]):
        """
        :param addr_and_variables:  The ``(address, variable)`` pairs to attach. The list is stored as-is (no copy).
        """
        self.addr_and_variables = addr_and_variables

    @property
    def relocatable(self):
        # Always True for this annotation type.
        return True

    @property
    def eliminatable(self):
        # Always False for this annotation type.
        return False

    def __eq__(self, other):
        # Exact type match on purpose: instances of subclasses never compare equal to a VariableAnnotation.
        if type(other) is VariableAnnotation:
            return self.addr_and_variables == other.addr_and_variables
        return False

    def __hash__(self):
        # The list is converted to a tuple because lists are unhashable; "Va" tags the hash with the annotation kind.
        return hash(("Va", tuple(self.addr_and_variables)))

    def __repr__(self):
        return f"<VariableAnnotation: {self.addr_and_variables}>"
class VariableRecoveryBase(Analysis):
    """
    The base class for VariableRecovery and VariableRecoveryFast.
    """

    def __init__(self, func, max_iterations, store_live_variables: bool):
        """
        :param func:                    The function to analyze; stored as ``self.function``.
        :param max_iterations:          Stored as ``self._max_iterations``; not used by this base class itself.
        :param store_live_variables:    Stored as ``self._store_live_variables``; not used by this base class itself.
        """
        self.function = func
        # NOTE(review): self.kb is not set in this class; presumably provided by the Analysis base - confirm.
        self.variable_manager = self.kb.variables

        self._max_iterations = max_iterations
        self._store_live_variables = store_live_variables

        # Per-block states; _outstates is keyed by block address (see get_variable_definitions).
        self._outstates = {}
        self._instates: Dict[Any, VariableRecoveryStateBase] = {}
        # Populated by initialize_dominance_frontiers().
        self._dominance_frontiers = None

    #
    # Public methods
    #

    def get_variable_definitions(self, block_addr):
        """
        Get variables that are defined at the specified block.

        :param int block_addr:  Address of the block.
        :return:                The ``variables`` attribute of the out-state recorded for ``block_addr``, or an empty
                                set if no out-state is recorded for that address.
        """
        if block_addr in self._outstates:
            return self._outstates[block_addr].variables
        return set()

    #
    # Private methods
    #

    def initialize_dominance_frontiers(self):
        """
        Run the DominanceFrontier analysis on ``self.function`` and store the result in inverted form.

        After this call, ``self._dominance_frontiers[addr]`` is the set of addresses of all blocks whose dominance
        frontier contains the block at ``addr``.

        :return: None
        """
        # Compute the dominance frontier for each node in the graph
        df = self.project.analyses.DominanceFrontier(self.function)
        self._dominance_frontiers = defaultdict(set)
        for b0, domfront in df.frontiers.items():
            for d in domfront:
                self._dominance_frontiers[d.addr].add(b0.addr)
class VariableRecoveryStateBase:
    """
    The base abstract state for variable recovery analysis.
    """

    # Cache of the placeholder values created by top(), keyed by bit width. It is a class attribute, so the cache is
    # shared by all state instances.
    _tops = {}

    def __init__(
        self,
        block_addr,
        analysis,
        arch,
        func,
        stack_region=None,
        register_region=None,
        global_region=None,
        typevars=None,
        type_constraints=None,
        delayed_type_constraints=None,
        stack_offset_typevars=None,
        project=None,
    ):
        """
        :param block_addr:                  Stored as ``self.block_addr``.
        :param analysis:                    The owning analysis object; dominance frontiers, the variable manager and
                                            variable definitions are looked up through it.
        :param arch:                        The architecture; its ``bits`` attribute determines pointer width.
        :param func:                        The function being analyzed; stored as ``self.function``.
        :param stack_region:                An existing region to reuse for the stack, or None to create a new one.
        :param register_region:             An existing region to reuse for registers, or None to create a new one.
        :param global_region:               An existing region to reuse for globals, or None to create a new one.
        :param typevars:                    Existing TypeVariables to reuse, or None for a fresh instance.
        :param type_constraints:            An existing set of type constraints to reuse, or None for an empty set.
        :param delayed_type_constraints:    An existing mapping to reuse, or None for a fresh DefaultChainMapCOW.
        :param stack_offset_typevars:       An existing dict to reuse, or None for an empty dict.
        :param project:                     The project; only needed by is_global_variable_address().
        """
        self.block_addr = block_addr
        self._analysis = analysis
        self.arch: Arch = arch
        self.function = func
        self.project = project

        # A region passed in by the caller is reused, but its phi maker is re-bound to *this* state. In both cases
        # the region is attached to this state through set_state().
        if stack_region is not None:
            self.stack_region: MultiValuedMemory = stack_region
            self.stack_region._phi_maker = self._make_phi_variable
        else:
            self.stack_region: MultiValuedMemory = MultiValuedMemory(
                memory_id="mem",
                top_func=self.top,
                phi_maker=self._make_phi_variable,
                skip_missing_values_during_merging=True,
                page_kwargs={"mo_cmp": self._mo_cmp},
            )
        self.stack_region.set_state(self)

        if register_region is not None:
            self.register_region: MultiValuedMemory = register_region
            self.register_region._phi_maker = self._make_phi_variable
        else:
            self.register_region: MultiValuedMemory = MultiValuedMemory(
                memory_id="reg",
                top_func=self.top,
                phi_maker=self._make_phi_variable,
                skip_missing_values_during_merging=True,
                page_kwargs={"mo_cmp": self._mo_cmp},
            )
        self.register_region.set_state(self)

        if global_region is not None:
            self.global_region: MultiValuedMemory = global_region
            self.global_region._phi_maker = self._make_phi_variable
        else:
            self.global_region: MultiValuedMemory = MultiValuedMemory(
                memory_id="mem",
                top_func=self.top,
                phi_maker=self._make_phi_variable,
                skip_missing_values_during_merging=True,
                page_kwargs={"mo_cmp": self._mo_cmp},
            )
        self.global_region.set_state(self)

        # Used during merging
        self.successor_block_addr: Optional[int] = None
        # Maps each variable that was merged into a phi node to that phi variable (filled by _make_phi_variable).
        self.phi_variables: Dict[SimVariable, SimVariable] = {}

        self.typevars = TypeVariables() if typevars is None else typevars
        self.type_constraints = set() if type_constraints is None else type_constraints
        self.delayed_type_constraints = (
            DefaultChainMapCOW(set, collapse_threshold=25)
            if delayed_type_constraints is None
            else delayed_type_constraints
        )
        self.stack_offset_typevars: Dict[int, TypeVariable] = (
            {} if stack_offset_typevars is None else stack_offset_typevars
        )

    def _get_weakref(self):
        """
        Return a weak proxy to this state.

        NOTE(review): nothing in this class calls it; presumably the memory regions use it in set_state() - confirm.
        """
        return weakref.proxy(self)

    @staticmethod
    def top(bits) -> claripy.ast.BV:
        """
        Get the placeholder ("top") value of the given width.

        The value is a symbolic bitvector literally named ``"top"``. One value is created per bit width and cached in
        the class-level ``_tops`` dict, so repeated calls with the same width return the same object.

        :param bits:    Width of the value in bits.
        :return:        The cached top value of that width.
        """
        if bits in VariableRecoveryStateBase._tops:
            return VariableRecoveryStateBase._tops[bits]
        r = claripy.BVS("top", bits, explicit_name=True)
        VariableRecoveryStateBase._tops[bits] = r
        return r

    @staticmethod
    def is_top(thing) -> bool:
        """
        Check whether ``thing`` is a top value, i.e., a bitvector symbol whose name is exactly ``"top"``.

        :param thing:   Any object.
        :return:        True if ``thing`` is a top value, False otherwise.
        """
        if isinstance(thing, claripy.ast.BV) and thing.op == "BVS" and thing.args[0] == "top":
            return True
        return False

    @staticmethod
    def extract_variables(expr: claripy.ast.Base) -> Generator[Tuple[int, Union[SimVariable, SpOffset]], None, None]:
        """
        Yield every entry stored in the VariableAnnotation instances attached to ``expr``. Annotations of other types
        are skipped.

        :param expr:    The expression to inspect.
        :return:        A generator of the annotated entries.
        """
        for anno in expr.annotations:
            if isinstance(anno, VariableAnnotation):
                yield from anno.addr_and_variables

    @staticmethod
    def annotate_with_variables(
        expr: claripy.ast.Base, addr_and_variables: Iterable[Tuple[int, Union[SimVariable, SpOffset]]]
    ) -> claripy.ast.Base:
        """
        Return ``expr`` with its annotations replaced by a single VariableAnnotation holding ``addr_and_variables``.

        :param expr:                The expression to annotate.
        :param addr_and_variables:  The entries to attach; the iterable is materialized into a list.
        :return:                    The annotated expression.
        """
        expr = expr.replace_annotations((VariableAnnotation(list(addr_and_variables)),))
        return expr

    def stack_address(self, offset: int) -> claripy.ast.Base:
        """
        Build a symbolic stack address: the ``"stack_base"`` symbol plus ``offset``.

        :param offset:  Offset from the stack base. A zero offset yields the bare symbol.
        :return:        The symbolic address, ``self.arch.bits`` wide.
        """
        base = claripy.BVS("stack_base", self.arch.bits, explicit_name=True)
        if offset:
            return base + offset
        return base

    @staticmethod
    def is_stack_address(addr: claripy.ast.Base) -> bool:
        """
        Check whether ``addr`` depends on the ``"stack_base"`` symbol.

        :param addr:    The address expression.
        :return:        True if ``"stack_base"`` is among the variables of ``addr``.
        """
        return "stack_base" in addr.variables

    def is_global_variable_address(self, addr: claripy.ast.Base) -> bool:
        """
        Check whether ``addr`` is a concrete address that lies inside an object known to the project's loader.

        ``self.project`` must be set when ``addr`` is concrete.

        :param addr:    The address expression.
        :return:        True if ``addr`` is a concrete value contained in a loaded object, False otherwise.
        """
        if addr.op == "BVV":
            addr_v = addr._model_concrete.value
            # make sure it is within a mapped region
            obj = self.project.loader.find_object_containing(addr_v)
            if obj is not None:
                return True
        return False

    @staticmethod
    def extract_stack_offset_from_addr(addr: claripy.ast.Base) -> claripy.ast.Base:
        """
        Reduce a stack address expression to its offset by treating every symbol as zero.

        Symbols contribute 0, concrete values contribute themselves, and additions / subtractions are evaluated
        recursively over their arguments. Any other operation contributes 0.

        :param addr:    The address expression.
        :return:        The offset as an expression.
        """
        r = None
        if addr.op == "BVS":
            r = claripy.BVV(0, addr.size())
        elif addr.op == "BVV":
            r = addr
        elif addr.op == "__add__":
            r = sum(VariableRecoveryStateBase.extract_stack_offset_from_addr(arg) for arg in addr.args)
        elif addr.op == "__sub__":
            r1 = VariableRecoveryStateBase.extract_stack_offset_from_addr(addr.args[0])
            r2 = VariableRecoveryStateBase.extract_stack_offset_from_addr(addr.args[1])
            r = r1 - r2
        else:
            # NOTE: The original code here didn't support mul or
            # anything like that, so let's specify it as 0
            r = claripy.BVV(0, addr.size())
        return r

    def get_stack_offset(self, addr: claripy.ast.Base) -> Optional[int]:
        """
        Get the signed stack offset encoded in ``addr``.

        :param addr:    The address expression.
        :return:        The offset as a signed integer of ``self.arch.bits`` bits, or None if ``addr`` does not
                        involve the ``"stack_base"`` symbol.
        """
        if "stack_base" in addr.variables:
            r = VariableRecoveryStateBase.extract_stack_offset_from_addr(addr)

            # extract_stack_offset_from_addr should ensure that r is a BVV
            assert r.concrete

            val = r._model_concrete.value
            # convert it to a signed integer
            if val >= 2 ** (self.arch.bits - 1):
                return val - 2**self.arch.bits
            if val < -(2 ** (self.arch.bits - 1)):
                return 2**self.arch.bits + val
            return val

        return None

    def stack_addr_from_offset(self, offset: int) -> int:
        """
        Map a stack offset to a concrete address by adding it to a fixed, per-width base address.

        :param offset:  The stack offset.
        :return:        The concrete address, truncated to the architecture's pointer width.
        :raises RuntimeError: If ``self.arch.bits`` is neither 32 nor 64.
        """
        if self.arch.bits == 32:
            base = 0x7FFF_FE00
            mask = 0xFFFF_FFFF
        elif self.arch.bits == 64:
            base = 0x7F_FFFF_FFFE_0000
            mask = 0xFFFF_FFFF_FFFF_FFFF
        else:
            raise RuntimeError("Unsupported bits %d" % self.arch.bits)
        return (offset + base) & mask

    @property
    def func_addr(self):
        """
        The address of the function being analyzed.
        """
        return self.function.addr

    @property
    def dominance_frontiers(self):
        """
        The dominance frontier map held by the owning analysis.
        """
        return self._analysis._dominance_frontiers

    @property
    def variable_manager(self):
        """
        The variable manager held by the owning analysis.
        """
        return self._analysis.variable_manager

    @property
    def variables(self):
        """
        A generator over the ``internal_objects`` of every item in the stack region, followed by those of the
        register region. The global region is not included.
        """
        for ro in self.stack_region:
            yield from ro.internal_objects
        for ro in self.register_region:
            yield from ro.internal_objects

    def get_variable_definitions(self, block_addr):
        """
        Get variables that are defined at the specified block. The lookup is delegated to the owning analysis.

        :param int block_addr:  Address of the block.
        :return:                Whatever the owning analysis returns for ``block_addr``.
        """
        return self._analysis.get_variable_definitions(block_addr)

    def add_type_constraint(self, constraint):
        """
        Add a new type constraint.

        :param constraint:  The constraint to add to ``self.type_constraints``.
        :return:            None
        """
        self.type_constraints.add(constraint)

    def downsize(self) -> None:
        """
        Remove unnecessary members. Currently this replaces ``self.type_constraints`` with a new empty set.

        :return:    None
        """
        self.type_constraints = set()

    @staticmethod
    def downsize_region(region: MultiValuedMemory) -> MultiValuedMemory:
        """
        Get rid of unnecessary references in region so that it won't prevent garbage collection of the referenced
        objects. The region is modified in place: its phi maker is cleared.

        :param region:  A MultiValuedMemory region.
        :return:        The same region object that was passed in.
        """
        region._phi_maker = None
        return region

    #
    # Private methods
    #

    @staticmethod
    def _mo_cmp(
        mos_self: Set["SimMemoryObject"], mos_other: Set["SimMemoryObject"], addr: int, size: int
    ):  # pylint:disable=unused-argument
        """
        Compare two sets of memory objects; handed to the memory regions as their ``mo_cmp`` page option.

        :param mos_self:    The first set of memory objects.
        :param mos_other:   The second set of memory objects.
        :param addr:        Unused.
        :param size:        Unused.
        :return:            True if both sets are equal.
        """
        # comparing bytes from two sets of memory objects
        # we don't need to resort to byte-level comparison. object-level is good enough.
        return mos_self == mos_other

    def _make_phi_variable(self, values: Set[claripy.ast.Base]) -> Optional[claripy.ast.Base]:
        """
        Create a phi variable that stands for all variables annotated on ``values``; handed to the memory regions as
        their phi maker.

        ``self.successor_block_addr`` must be set before a phi variable can be created.

        :param values:  The values being merged.
        :return:        A top value annotated with the phi variable, or None if ``values`` carry fewer than two
                        distinct variables.
        """
        # we only create a new phi variable if more than one distinct variable is involved
        variables = set()
        bits: Optional[int] = None
        for v in values:
            # the width of the last value visited is used for the resulting top value
            bits = v.size()
            for _, var in self.extract_variables(v):
                variables.add(var)

        if len(variables) <= 1:
            return None

        assert self.successor_block_addr is not None

        # find existing phi variables
        phi_var = self.variable_manager[self.function.addr].make_phi_node(self.successor_block_addr, *variables)
        for var in variables:
            # make_phi_node may hand back one of the input variables; never map a variable to itself
            if var is not phi_var:
                self.phi_variables[var] = phi_var

        r = self.top(bits)
        r = self.annotate_with_variables(r, [(0, phi_var)])
        return r

    def _phi_node_contains(self, phi_variable, variable):
        """
        Checks if `phi_variable` is a phi variable, and if it contains `variable` as a sub-variable.

        :param phi_variable:    The candidate phi variable.
        :param variable:        The variable to look for among the sub-variables of `phi_variable`.
        :return:                True if `phi_variable` is a phi variable that contains `variable`, False otherwise.
        """
        if self.variable_manager[self.function.addr].is_phi_variable(phi_variable):
            return variable in self.variable_manager[self.function.addr].get_phi_subvariables(phi_variable)
        return False