Source code for angr.knowledge_plugins.key_definitions.live_definitions

from typing import Optional, Iterable, Dict, Set, Generator, Tuple, Union, Any, TYPE_CHECKING, overload, Type
import weakref
import logging
from enum import Enum, auto

from collections import defaultdict

import claripy
from claripy.annotation import Annotation
import archinfo

from angr.misc.ux import deprecated
from angr.errors import SimMemoryMissingError, SimMemoryError
from angr.storage.memory_mixins import MultiValuedMemory
from angr.storage.memory_mixins.paged_memory.pages.multi_values import MultiValues
from angr.engines.light import SpOffset
from angr.code_location import CodeLocation, ExternalCodeLocation
from .atoms import Atom, Register, MemoryLocation, Tmp, ConstantSrc
from .definition import Definition, Tag
from .heap_address import HeapAddress
from .uses import Uses

if TYPE_CHECKING:
    from angr.project import Project
    from angr.storage import SimMemoryObject


l = logging.getLogger(name=__name__)


[docs]class DerefSize(Enum): """ An enum for specialized kinds of dereferences NULL_TERMINATE - Dereference until the first byte which could be a literal null. Return a value including the terminator. """ NULL_TERMINATE = auto()
# # Annotations #
[docs]class DefinitionAnnotation(Annotation): """ An annotation that attaches a `Definition` to an AST. """ __slots__ = ("definition", "_hash")
[docs] def __init__(self, definition): super().__init__() self.definition = definition self._hash = hash((DefinitionAnnotation, self.definition))
@property def relocatable(self): return True @property def eliminatable(self): return False def __hash__(self): return self._hash def __eq__(self, other: "object"): if type(other) is DefinitionAnnotation: return self.definition == other.definition else: return False def __repr__(self): return f"<{self.__class__.__name__}({repr(self.definition)})"
# pylint: disable=W1116
[docs]class LiveDefinitions: """ A LiveDefinitions instance contains definitions and uses for register, stack, memory, and temporary variables, uncovered during the analysis. """ INITIAL_SP_32BIT = 0x7FFF0000 INITIAL_SP_64BIT = 0x7FFFFFFF0000 _tops = {} __slots__ = ( "project", "arch", "track_tmps", "registers", "stack", "heap", "memory", "tmps", "others", "other_uses", "register_uses", "stack_uses", "heap_uses", "memory_uses", "uses_by_codeloc", "tmp_uses", "_canonical_size", "__weakref__", )
[docs] def __init__( self, arch: archinfo.Arch, track_tmps: bool = False, canonical_size=8, registers=None, stack=None, memory=None, heap=None, tmps=None, others=None, register_uses=None, stack_uses=None, heap_uses=None, memory_uses=None, tmp_uses=None, other_uses=None, element_limit=5, ): self.project: Optional["Project"] = None self.arch = arch self.track_tmps = track_tmps self._canonical_size: int = canonical_size # TODO: Drop canonical_size self.registers: MultiValuedMemory = ( MultiValuedMemory( memory_id="reg", top_func=self.top, is_top_func=self.is_top, skip_missing_values_during_merging=False, page_kwargs={"mo_cmp": self._mo_cmp}, endness=self.arch.register_endness, element_limit=element_limit, ) if registers is None else registers ) self.stack: MultiValuedMemory = ( MultiValuedMemory( memory_id="mem", top_func=self.top, is_top_func=self.is_top, skip_missing_values_during_merging=False, page_kwargs={"mo_cmp": self._mo_cmp}, element_limit=element_limit, ) if stack is None else stack ) self.memory: MultiValuedMemory = ( MultiValuedMemory( memory_id="mem", top_func=self.top, is_top_func=self.is_top, skip_missing_values_during_merging=False, page_kwargs={"mo_cmp": self._mo_cmp}, element_limit=element_limit, ) if memory is None else memory ) self.heap: MultiValuedMemory = ( MultiValuedMemory( memory_id="mem", top_func=self.top, is_top_func=self.is_top, skip_missing_values_during_merging=False, page_kwargs={"mo_cmp": self._mo_cmp}, element_limit=element_limit, ) if heap is None else heap ) self.tmps: Dict[int, Set[Definition]] = {} if tmps is None else tmps self.others: Dict[Atom, MultiValues] = others if others is not None else {} # set state self.registers.set_state(self) self.stack.set_state(self) self.memory.set_state(self) self.heap.set_state(self) self.register_uses = Uses() if register_uses is None else register_uses self.stack_uses = Uses() if stack_uses is None else stack_uses self.heap_uses = Uses() if heap_uses is None else heap_uses self.memory_uses = Uses() if memory_uses is None else memory_uses self.tmp_uses: Dict[int, Set[CodeLocation]] = defaultdict(set) if tmp_uses is None else tmp_uses self.other_uses = Uses() if other_uses is None else other_uses self.uses_by_codeloc: Dict[CodeLocation, Set[Definition]] = defaultdict(set)
@property @deprecated("registers") def register_definitions(self) -> MultiValuedMemory: return self.registers @property @deprecated("stack") def stack_definitions(self) -> MultiValuedMemory: return self.stack @property @deprecated("memory") def memory_definitions(self) -> MultiValuedMemory: return self.memory @property @deprecated("heap") def heap_definitions(self) -> MultiValuedMemory: return self.heap def __repr__(self): ctnt = "LiveDefs" if self.tmps: ctnt += ", %d tmpdefs" % len(self.tmps) return "<%s>" % ctnt
[docs] def copy(self, discard_tmpdefs=False) -> "LiveDefinitions": rd = LiveDefinitions( self.arch, track_tmps=self.track_tmps, canonical_size=self._canonical_size, registers=self.registers.copy(), stack=self.stack.copy(), heap=self.heap.copy(), memory=self.memory.copy(), tmps=self.tmps.copy() if not discard_tmpdefs else None, others=dict(self.others), register_uses=self.register_uses.copy(), stack_uses=self.stack_uses.copy(), heap_uses=self.heap_uses.copy(), memory_uses=self.memory_uses.copy(), tmp_uses=self.tmp_uses.copy() if not discard_tmpdefs else None, other_uses=self.other_uses.copy(), ) rd.project = self.project return rd
[docs] def reset_uses(self): self.stack_uses = Uses() self.register_uses = Uses() self.memory_uses = Uses() self.heap_uses = Uses() self.other_uses = Uses()
def _get_weakref(self): return weakref.proxy(self) @staticmethod def _mo_cmp( mo_self: Union["SimMemoryObject", Set["SimMemoryObject"]], mo_other: Union["SimMemoryObject", Set["SimMemoryObject"]], addr: int, size: int, ): # pylint:disable=unused-argument # comparing bytes from two sets of memory objects # we don't need to resort to byte-level comparison. object-level is good enough. if type(mo_self) is set and type(mo_other) is set and len(mo_self) == 1 and len(mo_other) == 1: a = next(iter(mo_self)) b = next(iter(mo_other)) return a.object is b.object and a.endness == b.endness values_self = set() values_other = set() if type(mo_self) is set: for mo in mo_self: values_self.add(mo.object) else: values_self.add(mo_self) if type(mo_other) is set: for mo in mo_other: values_other.add(mo.object) else: values_other.add(mo_other) return values_self == values_other
[docs] @staticmethod def top(bits: int): """ Get a TOP value. :param bits: Width of the TOP value (in bits). :return: The TOP value. """ if bits in LiveDefinitions._tops: return LiveDefinitions._tops[bits] r = claripy.BVS("TOP", bits, explicit_name=True) LiveDefinitions._tops[bits] = r return r
[docs] @staticmethod def is_top(expr) -> bool: """ Check if the given expression is a TOP value. :param expr: The given expression. :return: True if the expression is TOP, False otherwise. """ if isinstance(expr, claripy.ast.Base): if expr.op == "BVS" and expr.args[0] == "TOP": return True if "TOP" in expr.variables: return True return False
[docs] def stack_address(self, offset: int) -> Optional[claripy.ast.bv.BV]: base = claripy.BVS("stack_base", self.arch.bits, explicit_name=True) if offset: return base + offset return base
[docs] @staticmethod def is_stack_address(addr: claripy.ast.Base) -> bool: return "stack_base" in addr.variables
[docs] @staticmethod def get_stack_offset(addr: claripy.ast.Base, had_stack_base=False) -> Optional[int]: if had_stack_base and addr.op == "BVV": assert isinstance(addr, claripy.ast.BV) return addr.concrete_value if "TOP" in addr.variables: return None if "stack_base" in addr.variables: if addr.op == "BVS": return 0 elif addr.op == "__add__": if len(addr.args) == 2: off0 = LiveDefinitions.get_stack_offset(addr.args[0], had_stack_base=True) off1 = LiveDefinitions.get_stack_offset(addr.args[1], had_stack_base=True) if off0 is not None and off1 is not None: return off0 + off1 elif len(addr.args) == 1: return 0 elif addr.op == "__sub__" and len(addr.args) == 2: off0 = LiveDefinitions.get_stack_offset(addr.args[0], had_stack_base=True) off1 = LiveDefinitions.get_stack_offset(addr.args[1], had_stack_base=True) if off0 is not None and off1 is not None: return off0 - off1 return None
[docs] @staticmethod def annotate_with_def(symvar: claripy.ast.BV, definition: Definition) -> claripy.ast.BV: """ :param symvar: :param definition: :return: """ # strip existing definition annotations annotations_to_remove = [] for anno in symvar.annotations: if isinstance(anno, DefinitionAnnotation): annotations_to_remove.append(anno) # annotate with the new definition annotation return symvar.annotate(DefinitionAnnotation(definition), remove_annotations=annotations_to_remove)
[docs] @staticmethod def extract_defs(symvar: claripy.ast.Base) -> Generator[Definition, None, None]: for anno in symvar.annotations: if isinstance(anno, DefinitionAnnotation): yield anno.definition
[docs] @staticmethod def extract_defs_from_annotations(annos: Iterable["Annotation"]) -> Set[Definition]: defs = set() for anno in annos: if isinstance(anno, DefinitionAnnotation): defs.add(anno.definition) return defs
[docs] @staticmethod def extract_defs_from_mv(mv: MultiValues) -> Generator[Definition, None, None]: for vs in mv.values(): for v in vs: yield from LiveDefinitions.extract_defs(v)
[docs] def get_sp(self) -> int: """ Return the concrete value contained by the stack pointer. """ assert self.arch.sp_offset is not None sp_values: MultiValues = self.registers.load(self.arch.sp_offset, size=self.arch.bytes) sp_v = sp_values.one_value() if sp_v is None: # multiple values of sp exists. still return a value if there is only one value (maybe with different # definitions) values = [v for v in next(iter(sp_values.values())) if self.get_stack_offset(v) is not None] assert len({self.get_stack_offset(v) for v in values}) == 1 result = self.get_stack_address(values[0]) assert result is not None return result result = self.get_stack_address(sp_v) assert result is not None return result
[docs] def get_sp_offset(self) -> Optional[int]: """ Return the offset of the stack pointer. """ assert self.arch.sp_offset is not None sp_values: MultiValues = self.registers.load(self.arch.sp_offset, size=self.arch.bytes) sp_v = sp_values.one_value() if sp_v is None: values = [v for v in next(iter(sp_values.values())) if self.get_stack_offset(v) is not None] assert len({self.get_stack_offset(v) for v in values}) == 1 result = self.get_stack_offset(values[0]) return result result = self.get_stack_offset(sp_v) return result
[docs] def get_stack_address(self, offset: claripy.ast.Base) -> Optional[int]: offset_int = self.get_stack_offset(offset) if offset_int is None: return None return self.stack_offset_to_stack_addr(offset_int)
[docs] def stack_offset_to_stack_addr(self, offset) -> int: if self.arch.bits == 32: base_v = self.INITIAL_SP_32BIT mask = 0xFFFF_FFFF elif self.arch.bits == 64: base_v = self.INITIAL_SP_64BIT mask = 0xFFFF_FFFF_FFFF_FFFF else: raise ValueError("Unsupported architecture word size %d" % self.arch.bits) return (base_v + offset) & mask
[docs] def merge(self, *others: "LiveDefinitions") -> Tuple["LiveDefinitions", bool]: state = self.copy() merge_occurred = state.registers.merge([other.registers for other in others], None) merge_occurred |= state.heap.merge([other.heap for other in others], None) merge_occurred |= state.memory.merge([other.memory for other in others], None) merge_occurred |= state.stack.merge([other.stack for other in others], None) for other in others: for k in other.others: if k in self.others: thing = self.others[k].merge(other.others[k]) if thing != self.others[k]: merge_occurred = True self.others[k] = thing else: self.others[k] = other.others[k] merge_occurred = True merge_occurred |= state.register_uses.merge(other.register_uses) merge_occurred |= state.stack_uses.merge(other.stack_uses) merge_occurred |= state.heap_uses.merge(other.heap_uses) merge_occurred |= state.memory_uses.merge(other.memory_uses) merge_occurred |= state.other_uses.merge(other.other_uses) return state, merge_occurred
[docs] def compare(self, other: "LiveDefinitions") -> bool: r0 = self.registers.compare(other.registers) if r0 is False: return False r1 = self.heap.compare(other.heap) if r1 is False: return False r2 = self.memory.compare(other.memory) if r2 is False: return False r3 = self.stack.compare(other.stack) if r3 is False: return False r4 = True for k in other.others: if k in self.others: thing = self.others[k].merge(other.others[k]) if thing != self.others[k]: r4 = False break else: r4 = False break return r0 and r1 and r2 and r3 and r4
[docs] def kill_definitions(self, atom: Atom) -> None: """ Overwrite existing definitions w.r.t 'atom' with a dummy definition instance. A dummy definition will not be removed during simplification. :param atom: :return: None """ if isinstance(atom, Register): self.registers.erase(atom.reg_offset, size=atom.size) elif isinstance(atom, MemoryLocation): if isinstance(atom.addr, SpOffset): if atom.addr.offset is not None: stack_addr = self.stack_offset_to_stack_addr(atom.addr.offset) self.stack.erase(stack_addr, size=atom.size) else: l.warning("Skip stack storing since the stack offset is None.") elif isinstance(atom.addr, HeapAddress): self.heap.erase(atom.addr.value, size=atom.size) elif isinstance(atom.addr, int): self.memory.erase(atom.addr, size=atom.size) elif isinstance(atom.addr, claripy.ast.Base): if atom.addr.concrete: self.memory.erase(atom.addr.concrete_value, size=atom.size) elif self.is_stack_address(atom.addr): stack_addr = self.get_stack_address(atom.addr) if stack_addr is None: l.warning( "Failed to convert stack address %s to a concrete stack address. Skip the store.", atom.addr ) else: self.stack.erase(stack_addr, size=atom.size) else: return else: return elif isinstance(atom, Tmp): del self.tmps[atom.tmp_idx] else: del self.others[atom]
[docs] def kill_and_add_definition( self, atom: Atom, code_loc: CodeLocation, data: MultiValues, dummy=False, tags: Optional[Set[Tag]] = None, endness=None, annotated=False, ) -> Optional[MultiValues]: if data is None: raise TypeError("kill_and_add_definition() does not take None as data.") if annotated: d = data else: definition: Definition = Definition(atom, code_loc, dummy=dummy, tags=tags) d = MultiValues() for offset, vs in data.items(): for v in vs: d.add_value(offset, self.annotate_with_def(v, definition)) # set_object() replaces kill (not implemented) and add (add) in one step if isinstance(atom, Register): try: self.registers.store( atom.reg_offset, d, size=atom.size, endness=endness, ) except SimMemoryError: l.warning("Failed to store register definition %s at %d.", d, atom.reg_offset, exc_info=True) elif isinstance(atom, MemoryLocation): if endness is None: endness = atom.endness if isinstance(atom.addr, SpOffset): if atom.addr.offset is not None: stack_addr = self.stack_offset_to_stack_addr(atom.addr.offset) self.stack.store(stack_addr, d, size=atom.size, endness=endness) else: l.warning("Skip stack storing since the stack offset is None.") elif isinstance(atom.addr, HeapAddress): self.heap.store(atom.addr.value, d, size=atom.size, endness=endness) elif isinstance(atom.addr, int): self.memory.store(atom.addr, d, size=atom.size, endness=endness) elif isinstance(atom.addr, claripy.ast.Base): if atom.addr.concrete: self.memory.store(atom.addr.concrete_value, d, size=atom.size, endness=endness) elif self.is_stack_address(atom.addr): stack_addr = self.get_stack_address(atom.addr) if stack_addr is None: l.warning( "Failed to convert stack address %s to a concrete stack address. Skip the store.", atom.addr ) else: self.stack.store(stack_addr, d, size=atom.size, endness=endness) else: return None else: return None elif isinstance(atom, Tmp): if self.track_tmps: self.tmps[atom.tmp_idx] = {definition} else: self.tmps[atom.tmp_idx] = self.uses_by_codeloc[code_loc] return None else: self.others[atom] = d return d
[docs] def add_use(self, atom: Atom, code_loc: CodeLocation, expr: Optional[Any] = None) -> None: if isinstance(atom, Register): self.add_register_use(atom.reg_offset, atom.size, code_loc, expr=expr) elif isinstance(atom, MemoryLocation): if isinstance(atom.addr, SpOffset): self.add_stack_use(atom, code_loc, expr=expr) elif isinstance(atom.addr, HeapAddress): self.add_heap_use(atom, code_loc, expr=expr) elif isinstance(atom.addr, int): self.add_memory_use(atom, code_loc, expr=expr) else: # ignore RegisterOffset pass elif isinstance(atom, Tmp): self.add_tmp_use(atom, code_loc) else: for defn in self.get_definitions(atom): self.other_uses.add_use(defn, code_loc, expr) self.uses_by_codeloc[code_loc].add(defn)
[docs] def add_use_by_def(self, definition: Definition, code_loc: CodeLocation, expr: Any = None) -> None: if isinstance(definition.atom, Register): self.add_register_use_by_def(definition, code_loc, expr=expr) elif isinstance(definition.atom, MemoryLocation): if isinstance(definition.atom.addr, SpOffset): self.add_stack_use_by_def(definition, code_loc, expr=expr) elif isinstance(definition.atom.addr, HeapAddress): self.add_heap_use_by_def(definition, code_loc, expr=expr) elif isinstance(definition.atom.addr, int): self.add_memory_use_by_def(definition, code_loc, expr=expr) else: # ignore RegisterOffset pass elif type(definition.atom) is Tmp: self.add_tmp_use_by_def(definition, code_loc) elif type(definition.atom) is ConstantSrc: # ignore constants pass else: self.other_uses.add_use(definition, code_loc, expr)
[docs] def get_definitions( self, thing: Union[Atom, Definition[Atom], Iterable[Atom], Iterable[Definition[Atom]], MultiValues] ) -> Set[Definition[Atom]]: if isinstance(thing, MultiValues): defs = set() for vs in thing.values(): for v in vs: defs.update(LiveDefinitions.extract_defs_from_annotations(v.annotations)) return defs elif isinstance(thing, Atom): pass elif isinstance(thing, Definition): thing = thing.atom else: defs = set() for atom2 in thing: defs |= self.get_definitions(atom2) return defs if isinstance(thing, Register): return self.get_register_definitions(thing.reg_offset, thing.size) elif isinstance(thing, MemoryLocation): if isinstance(thing.addr, SpOffset): return self.get_stack_definitions(thing.addr.offset, thing.size) elif isinstance(thing.addr, HeapAddress): return self.get_heap_definitions(thing.addr.value, size=thing.size) elif isinstance(thing.addr, int): return self.get_memory_definitions(thing.addr, thing.size) else: return set() elif isinstance(thing, Tmp): return self.get_tmp_definitions(thing.tmp_idx) else: defs = set() for mvs in self.others.get(thing, {}).values(): for mv in mvs: defs |= self.get_definitions(mv) return defs
[docs] def get_tmp_definitions(self, tmp_idx: int) -> Set[Definition]: if tmp_idx in self.tmps: return self.tmps[tmp_idx] else: return set()
[docs] def get_register_definitions(self, reg_offset: int, size: int) -> Set[Definition]: try: annotations = self.registers.load_annotations(reg_offset, size) except SimMemoryMissingError as ex: if ex.missing_addr > reg_offset: annotations = self.registers.load_annotations(reg_offset, ex.missing_addr - reg_offset) else: # nothing we can do return set() return LiveDefinitions.extract_defs_from_annotations(annotations)
[docs] def get_stack_values(self, stack_offset: int, size: int, endness: str) -> Optional[MultiValues]: stack_addr = self.stack_offset_to_stack_addr(stack_offset) try: return self.stack.load(stack_addr, size=size, endness=endness) except SimMemoryMissingError: return None
[docs] def get_stack_definitions(self, stack_offset: int, size: int) -> Set[Definition]: try: stack_addr = self.stack_offset_to_stack_addr(stack_offset) annotations = self.stack.load_annotations(stack_addr, size) except SimMemoryMissingError: return set() return LiveDefinitions.extract_defs_from_annotations(annotations)
[docs] def get_heap_definitions(self, heap_addr: int, size: int) -> Set[Definition]: try: annotations = self.heap.load_annotations(heap_addr, size) except SimMemoryMissingError: return set() return LiveDefinitions.extract_defs_from_annotations(annotations)
[docs] def get_memory_definitions(self, addr: int, size: int) -> Set[Definition]: try: annotations = self.memory.load_annotations(addr, size) except SimMemoryMissingError: return set() return LiveDefinitions.extract_defs_from_annotations(annotations)
@deprecated("get_definitions") def get_definitions_from_atoms(self, atoms: Iterable[Atom]) -> Iterable[Definition]: result = set() for atom in atoms: result |= self.get_definitions(atom) return result @deprecated("get_values") def get_value_from_definition(self, definition: Definition) -> Optional[MultiValues]: return self.get_value_from_atom(definition.atom) @deprecated("get_one_value") def get_one_value_from_definition(self, definition: Definition) -> Optional[claripy.ast.bv.BV]: return self.get_one_value_from_atom(definition.atom) @deprecated("get_concrete_value") def get_concrete_value_from_definition(self, definition: Definition) -> Optional[int]: return self.get_concrete_value_from_atom(definition.atom) @deprecated("get_values") def get_value_from_atom(self, atom: Atom) -> Optional[MultiValues]: if isinstance(atom, Register): try: return self.registers.load(atom.reg_offset, size=atom.size) except SimMemoryMissingError: return None elif isinstance(atom, MemoryLocation): if isinstance(atom.addr, SpOffset): stack_addr = self.stack_offset_to_stack_addr(atom.addr.offset) try: return self.stack.load(stack_addr, size=atom.size, endness=atom.endness) except SimMemoryMissingError: return None elif isinstance(atom.addr, HeapAddress): try: return self.heap.load(atom.addr.value, size=atom.size, endness=atom.endness) except SimMemoryMissingError: return None elif isinstance(atom.addr, int): try: return self.memory.load(atom.addr, size=atom.size, endness=atom.endness) except SimMemoryMissingError: return None else: # ignore RegisterOffset return None else: return None @deprecated("get_one_value") def get_one_value_from_atom(self, atom: Atom) -> Optional[claripy.ast.bv.BV]: r = self.get_value_from_atom(atom) if r is None: return None return r.one_value() @deprecated("get_concrete_value") def get_concrete_value_from_atom(self, atom: Atom) -> Optional[int]: r = self.get_one_value_from_atom(atom) if r is None: return None if r.symbolic: return None return r.concrete_value
[docs] def get_values( self, spec: Union[Atom, Definition[Atom], Iterable[Atom], Iterable[Definition[Atom]]] ) -> Optional[MultiValues]: if isinstance(spec, Definition): atom = spec.atom elif isinstance(spec, Atom): atom = spec else: result = None for atom in spec: r = self.get_values(atom) if r is None: continue if result is None: result = r else: result = result.merge(r) return result if isinstance(atom, Register): try: return self.registers.load(atom.reg_offset, size=atom.size) except SimMemoryMissingError: return None elif isinstance(atom, MemoryLocation): if isinstance(atom.addr, SpOffset): stack_addr = self.stack_offset_to_stack_addr(atom.addr.offset) try: return self.stack.load(stack_addr, size=atom.size, endness=atom.endness) except SimMemoryMissingError: return None elif isinstance(atom.addr, HeapAddress): try: return self.heap.load(atom.addr.value, size=atom.size, endness=atom.endness) except SimMemoryMissingError: return None elif isinstance(atom.addr, int): try: return self.memory.load(atom.addr, size=atom.size, endness=atom.endness) except SimMemoryMissingError: pass if self.project is not None: try: bytestring = self.project.loader.memory.load(atom.addr, atom.size) if atom.endness == archinfo.Endness.LE: bytestring = bytes(reversed(bytestring)) mv = MultiValues( self.annotate_with_def(claripy.BVV(bytestring), Definition(atom, ExternalCodeLocation())) ) return mv except KeyError: pass return None else: # ignore RegisterOffset return None else: return self.others.get(atom, None)
[docs] def get_one_value( self, spec: Union[Atom, Definition, Iterable[Atom], Iterable[Definition[Atom]]], strip_annotations: bool = False, ) -> Optional[claripy.ast.bv.BV]: r = self.get_values(spec) if r is None: return None return r.one_value(strip_annotations=strip_annotations)
@overload def get_concrete_value( self, spec: Union[Atom, Definition[Atom], Iterable[Atom], Iterable[Definition[Atom]]], cast_to: Type[int] = ... ) -> Optional[int]: ... @overload def get_concrete_value( self, spec: Union[Atom, Definition[Atom], Iterable[Atom], Iterable[Definition[Atom]]], cast_to: Type[bytes] = ..., ) -> Optional[bytes]: ...
[docs] def get_concrete_value( self, spec: Union[Atom, Definition[Atom], Iterable[Atom], Iterable[Definition[Atom]]], cast_to: Union[Type[int], Type[bytes]] = int, ) -> Union[int, bytes, None]: r = self.get_one_value(spec, strip_annotations=True) if r is None: return None if r.symbolic: return None result = r.concrete_value if issubclass(cast_to, bytes): return result.to_bytes(len(r) // 8, "big") return result
[docs] def add_register_use(self, reg_offset: int, size: int, code_loc: CodeLocation, expr: Optional[Any] = None) -> None: # get all current definitions try: mvs: MultiValues = self.registers.load(reg_offset, size=size) except SimMemoryMissingError: return for vs in mvs.values(): for v in vs: for def_ in self.extract_defs(v): self.add_register_use_by_def(def_, code_loc, expr=expr)
[docs] def add_register_use_by_def(self, def_: Definition, code_loc: CodeLocation, expr: Optional[Any] = None) -> None: self.register_uses.add_use(def_, code_loc, expr=expr) self.uses_by_codeloc[code_loc].add(def_)
[docs] def add_stack_use(self, atom: MemoryLocation, code_loc: CodeLocation, expr: Optional[Any] = None) -> None: if not isinstance(atom.addr, SpOffset): raise TypeError("Atom %r is not a stack location atom." % atom) for current_def in self.get_definitions(atom): self.add_stack_use_by_def(current_def, code_loc, expr=expr)
[docs] def add_stack_use_by_def(self, def_: Definition, code_loc: CodeLocation, expr: Optional[Any] = None) -> None: self.stack_uses.add_use(def_, code_loc, expr=expr) self.uses_by_codeloc[code_loc].add(def_)
[docs] def add_heap_use(self, atom: MemoryLocation, code_loc: CodeLocation, expr: Optional[Any] = None) -> None: if not isinstance(atom.addr, HeapAddress): raise TypeError("Atom %r is not a heap location atom." % atom) current_defs = self.get_definitions(atom) for current_def in current_defs: self.add_heap_use_by_def(current_def, code_loc, expr=expr)
[docs] def add_heap_use_by_def(self, def_: Definition, code_loc: CodeLocation, expr: Optional[Any] = None) -> None: self.heap_uses.add_use(def_, code_loc, expr=expr) self.uses_by_codeloc[code_loc].add(def_)
[docs] def add_memory_use(self, atom: MemoryLocation, code_loc: CodeLocation, expr: Optional[Any] = None) -> None: # get all current definitions current_defs: Set[Definition] = self.get_definitions(atom) for current_def in current_defs: self.add_memory_use_by_def(current_def, code_loc, expr=expr)
[docs] def add_memory_use_by_def(self, def_: Definition, code_loc: CodeLocation, expr: Optional[Any] = None) -> None: self.memory_uses.add_use(def_, code_loc, expr=expr) self.uses_by_codeloc[code_loc].add(def_)
[docs] def add_tmp_use(self, atom: Tmp, code_loc: CodeLocation) -> None: if self.track_tmps: if atom.tmp_idx in self.tmps: defs = self.tmps[atom.tmp_idx] for def_ in defs: self.add_tmp_use_by_def(def_, code_loc) else: if atom.tmp_idx in self.tmps: defs = self.tmps[atom.tmp_idx] for d in defs: assert type(d.atom) is not Tmp self.add_use_by_def(d, code_loc)
[docs] def add_tmp_use_by_def(self, def_: Definition, code_loc: CodeLocation) -> None: if not isinstance(def_.atom, Tmp): raise TypeError("Atom %r is not a Tmp atom." % def_.atom) self.tmp_uses[def_.atom.tmp_idx].add(code_loc) self.uses_by_codeloc[code_loc].add(def_)
@overload def deref( self, pointer: Union[MultiValues, Atom, Definition, Iterable[Atom], Iterable[Definition]], size: Union[int, DerefSize], endness: archinfo.Endness = ..., ) -> Set[MemoryLocation]: ... @overload def deref( self, pointer: Union[int, claripy.ast.BV, HeapAddress, SpOffset], size: Union[int, DerefSize], endness: archinfo.Endness = ..., ) -> Optional[MemoryLocation]: ...
[docs] def deref(self, pointer, size, endness=archinfo.Endness.BE): if isinstance(pointer, (Atom, Definition)): pointer = self.get_values(pointer) if pointer is None: return set() if isinstance(pointer, set): result = set() for ptr_atom in pointer: result.update(self.deref(ptr_atom, size, endness)) return result if isinstance(pointer, MultiValues): result = set() for vs in pointer.values(): for value in vs: atom = self.deref(value, size, endness) if atom is not None: result.add(atom) return result if isinstance(pointer, (HeapAddress, SpOffset, int)): addr = pointer else: assert isinstance(pointer, claripy.ast.BV) if self.is_top(pointer): return None # TODO this can be simplified with the walrus operator stack_offset = self.get_stack_offset(pointer) if stack_offset is not None: addr = SpOffset(len(pointer), stack_offset) else: heap_offset = self.get_heap_offset(pointer) if heap_offset is not None: addr = HeapAddress(heap_offset) elif pointer.op == "BVV": addr = pointer.args[0] else: # cannot resolve return None if isinstance(size, DerefSize): assert size == DerefSize.NULL_TERMINATE for sz in range(4096): # arbitrary # truly evil that this is an abstraction we have to contend with mv = self.get_values(MemoryLocation(addr + sz, 1, endness)) if mv is not None and 0 in mv and any(one.op == "BVV" and one.args[0] == 0 for one in mv[0]): size = sz + 1 break else: l.warning( "Could not resolve cstring dereference at %s to a concrete size", hex(addr) if isinstance(addr, int) else addr, ) size = 4096 return MemoryLocation(addr, size, endness)
[docs] @staticmethod def is_heap_address(addr: claripy.ast.Base) -> bool: return "heap_base" in addr.variables
[docs] @staticmethod def get_heap_offset(addr: claripy.ast.Base) -> Optional[int]: if "heap_base" in addr.variables: if addr.op == "BVS": return 0 elif addr.op == "__add__" and len(addr.args) == 2 and addr.args[1].op == "BVV": return addr.args[1].concrete_value return None
[docs] def heap_address(self, offset: Union[int, HeapAddress]) -> claripy.ast.BV: if isinstance(offset, HeapAddress): if not isinstance(offset.value, int): raise TypeError("TODO: what's this? contact @rhelmot") offset = offset.value base = claripy.BVS("heap_base", self.arch.bits, explicit_name=True) if offset: return base + offset return base