from typing import Optional, Iterable, Set, Tuple, Any, TYPE_CHECKING, Iterator, Union
import logging
import archinfo
import claripy
from ...analyses.reaching_definitions.call_trace import CallTrace
from ...storage.memory_mixins.paged_memory.pages.multi_values import MultiValues
from ...storage.memory_mixins import MultiValuedMemory
from ...knowledge_plugins.key_definitions import LiveDefinitions
from ...knowledge_plugins.key_definitions.atoms import (
Atom,
GuardUse,
Register,
MemoryLocation,
ConstantSrc,
)
from ...knowledge_plugins.functions.function import Function
from ...knowledge_plugins.key_definitions.definition import Definition
from ...knowledge_plugins.key_definitions.environment import Environment
from ...knowledge_plugins.key_definitions.tag import InitialValueTag, ParameterTag, Tag
from ...knowledge_plugins.key_definitions.heap_address import HeapAddress
from ...calling_conventions import SimCC, SimRegArg, SimStackArg
from ...engines.light import SpOffset
from ...code_location import CodeLocation
from .external_codeloc import ExternalCodeLocation
from .heap_allocator import HeapAllocator
from .subject import Subject, SubjectType
if TYPE_CHECKING:
from .reaching_definitions import ReachingDefinitionsAnalysis
l = logging.getLogger(name=__name__)
#
# Reaching definitions state
#
[docs]class ReachingDefinitionsState:
"""
Represents the internal state of the ReachingDefinitionsAnalysis.
It contains a data class LiveDefinitions, which stores both definitions and uses for register, stack, memory, and
temporary variables, uncovered during the analysis.
:param subject: The subject being analyzed.
:ivar arch: The architecture targeted by the program.
:param track_tmps: Only tells whether or not temporary variables should be taken into consideration when
representing the state of the analysis.
Should be set to true when the analysis has counted uses and definitions for temporary
variables, false otherwise.
:param analysis: The analysis that generated the state represented by this object.
:param rtoc_value: When the targeted architecture is ppc64, the initial function needs to know the `rtoc_value`.
:param live_definitions:
:param canonical_size:
The sizes (in bytes) that objects with an UNKNOWN_SIZE are treated as for operations where sizes are necessary.
:param heap_allocator: Mechanism to model the management of heap memory.
:param environment: Representation of the environment of the analyzed program.
"""
__slots__ = (
"arch",
"_subject",
"_track_tmps",
"analysis",
"codeloc",
"codeloc_uses",
"live_definitions",
"all_definitions",
"_canonical_size",
"heap_allocator",
"_environment",
"_track_consts",
"_sp_adjusted",
"exit_observed",
)
[docs] def __init__(
self,
codeloc: CodeLocation,
arch: archinfo.Arch,
subject: Subject,
track_tmps: bool = False,
track_consts: bool = False,
analysis: Optional["ReachingDefinitionsAnalysis"] = None,
rtoc_value=None,
live_definitions: Optional[LiveDefinitions] = None,
canonical_size: int = 8,
heap_allocator: HeapAllocator = None,
environment: Environment = None,
sp_adjusted: bool = False,
all_definitions: Optional[Set[Definition]] = None,
):
# handy short-hands
self.codeloc = codeloc
self.arch: archinfo.Arch = arch
self._subject = subject
self._track_tmps = track_tmps
self._track_consts = track_consts
self.analysis = analysis
self._canonical_size: int = canonical_size
self._sp_adjusted: bool = sp_adjusted
self.all_definitions: Set[Definition] = set() if all_definitions is None else all_definitions
if live_definitions is None:
# the first time this state is created. initialize it
self.live_definitions = LiveDefinitions(
self.arch, track_tmps=self._track_tmps, canonical_size=canonical_size
)
self._set_initialization_values(subject, rtoc_value)
else:
# this state is a copy from a previous state. skip the initialization
self.live_definitions = live_definitions
self.heap_allocator = heap_allocator or HeapAllocator(canonical_size)
self._environment: Environment = environment or Environment()
self.codeloc_uses: Set[Definition] = set()
# have we observed an exit statement or not during the analysis of the *last instruction* of a block? we should
# not perform any sp updates if it is the case. this is for handling conditional returns in ARM binaries.
# this variable is not copied to new states because it only tracks if an exit statement is observed in a single
# block and is always set to False at the beginning of the analysis of each block.
self.exit_observed: bool = False
#
# Util methods for working with the memory model
#
[docs] def top(self, bits: int):
return self.live_definitions.top(bits)
[docs] def is_top(self, *args):
return self.live_definitions.is_top(*args)
[docs] def heap_address(self, offset: int) -> claripy.ast.Base:
base = claripy.BVS("heap_base", self.arch.bits, explicit_name=True)
if offset:
return base + offset
return base
[docs] @staticmethod
def is_heap_address(addr: claripy.ast.Base) -> bool:
return "heap_base" in addr.variables
[docs] @staticmethod
def get_heap_offset(addr: claripy.ast.Base) -> Optional[int]:
if "heap_base" in addr.variables:
if addr.op == "BVS":
return 0
elif addr.op == "__add__" and len(addr.args) == 2 and addr.args[1].op == "BVV":
return addr.args[1]._model_concrete.value
return None
[docs] def stack_address(self, offset: int) -> claripy.ast.Base:
return self.live_definitions.stack_address(offset)
[docs] def is_stack_address(self, addr: claripy.ast.Base) -> bool:
return self.live_definitions.is_stack_address(addr)
[docs] def get_stack_offset(self, addr: claripy.ast.Base) -> Optional[int]:
offset = self.live_definitions.get_stack_offset(addr)
if offset is not None:
return self._to_signed(offset)
return None
def _initial_stack_pointer(self):
if self.arch.bits == 32:
return claripy.BVS("stack_base", 32, explicit_name=True)
elif self.arch.bits == 64:
return claripy.BVS("stack_base", 64, explicit_name=True)
else:
raise ValueError("Unsupported architecture word size %d" % self.arch.bits)
def _to_signed(self, n):
if n >= 2 ** (self.arch.bits - 1):
# convert it to a negative number
return n - 2**self.arch.bits
return n
[docs] def annotate_with_def(self, symvar: claripy.ast.Base, definition: Definition) -> claripy.ast.Base:
"""
:param symvar:
:param definition:
:return:
"""
return self.live_definitions.annotate_with_def(symvar, definition)
[docs] def annotate_mv_with_def(self, mv: MultiValues, definition: Definition) -> MultiValues:
return MultiValues(
offset_to_values={
offset: {self.annotate_with_def(value, definition) for value in values} for offset, values in mv.items()
}
)
#
# Other methods
#
@property
def tmp_definitions(self):
return self.live_definitions.tmps
@property
def tmp_uses(self):
return self.live_definitions.tmp_uses
@property
def register_uses(self):
return self.live_definitions.register_uses
@property
def register_definitions(self) -> MultiValuedMemory:
return self.live_definitions.register_definitions
@property
def stack_definitions(self) -> MultiValuedMemory:
return self.live_definitions.stack_definitions
@property
def stack_uses(self):
return self.live_definitions.stack_uses
@property
def heap_definitions(self) -> MultiValuedMemory:
return self.live_definitions.heap_definitions
@property
def heap_uses(self):
return self.live_definitions.heap_uses
@property
def memory_uses(self):
return self.live_definitions.memory_uses
@property
def memory_definitions(self) -> MultiValuedMemory:
return self.live_definitions.memory_definitions
@property
def uses_by_codeloc(self):
return self.live_definitions.uses_by_codeloc
[docs] def get_sp(self) -> int:
return self.live_definitions.get_sp()
[docs] def get_stack_address(self, offset: claripy.ast.Base) -> int:
return self.live_definitions.get_stack_address(offset)
@property
def environment(self):
return self._environment
@property
def _dep_graph(self):
return self.analysis._dep_graph
@property
def dep_graph(self):
return self.analysis.dep_graph
def __repr__(self):
ctnt = "RDState-%r" % (self.live_definitions)
return "{%s}" % ctnt
def _set_initialization_values(self, subject: Subject, rtoc_value: Optional[int] = None):
if subject.type == SubjectType.Function:
if isinstance(self.arch, archinfo.arch_ppc64.ArchPPC64) and not rtoc_value:
raise ValueError("The architecture being ppc64, the parameter `rtoc_value` should be provided.")
self._initialize_function(
subject.cc,
subject.content.addr,
rtoc_value,
)
elif subject.type == SubjectType.CallTrace:
if isinstance(self.arch, archinfo.arch_ppc64.ArchPPC64) and not rtoc_value:
raise ValueError("The architecture being ppc64, the parameter `rtoc_value` should be provided.")
self._initialize_function(
subject.cc,
subject.content.current_function_address(),
rtoc_value,
)
elif subject.type == SubjectType.Block:
pass
return self
def _generate_call_string(self, current_address: int) -> Tuple[int, ...]:
if isinstance(self._subject.content, Function):
return (self._subject.content.addr,)
elif isinstance(self._subject.content, CallTrace):
if any(current_address == x.caller_func_addr for x in self._subject.content.callsites):
callsites = iter(reversed([x.caller_func_addr for x in self._subject.content.callsites]))
for call_addr in callsites:
if current_address == call_addr:
break
return tuple(callsites)
else:
return tuple(x.caller_func_addr for x in self._subject.content.callsites)
else:
l.warning("Subject with unknown content-type")
return None
def _initialize_function(self, cc: SimCC, func_addr: int, rtoc_value: Optional[int] = None):
# initialize stack pointer
call_string = self._generate_call_string(func_addr)
sp_atom = Register(self.arch.sp_offset, self.arch.bytes)
sp_def = Definition(sp_atom, ExternalCodeLocation(call_string), tags={InitialValueTag()})
sp = self.annotate_with_def(self._initial_stack_pointer(), sp_def)
self.register_definitions.store(self.arch.sp_offset, sp)
if cc is not None:
prototype = self.analysis.kb.functions[func_addr].prototype
if prototype is not None:
for loc in cc.arg_locs(prototype):
for arg in loc.get_footprint():
# initialize register parameters
if isinstance(arg, SimRegArg):
# FIXME: implement reg_offset handling in SimRegArg
reg_offset = self.arch.registers[arg.reg_name][0]
reg_atom = Register(reg_offset, self.arch.bytes)
reg_def = Definition(
reg_atom, ExternalCodeLocation(call_string), tags={ParameterTag(function=func_addr)}
)
self.all_definitions.add(reg_def)
reg = self.annotate_with_def(self.top(self.arch.bits), reg_def)
self.register_definitions.store(reg_offset, reg)
# initialize stack parameters
elif isinstance(arg, SimStackArg):
ml_atom = MemoryLocation(SpOffset(self.arch.bits, arg.stack_offset), arg.size)
ml_def = Definition(
ml_atom, ExternalCodeLocation(call_string), tags={ParameterTag(function=func_addr)}
)
self.all_definitions.add(ml_def)
ml = self.annotate_with_def(self.top(self.arch.bits), ml_def)
stack_address = self.get_stack_address(self.stack_address(arg.stack_offset))
self.stack_definitions.store(stack_address, ml, endness=self.arch.memory_endness)
else:
raise TypeError("Unsupported parameter type %s." % type(arg).__name__)
# architecture dependent initialization
if self.arch.name.startswith("PPC64"):
if rtoc_value is None:
raise TypeError("rtoc_value must be provided on PPC64.")
offset, size = self.arch.registers["rtoc"]
rtoc_atom = Register(offset, size)
rtoc_def = Definition(rtoc_atom, ExternalCodeLocation(call_string), tags={InitialValueTag()})
self.all_definitions.add(rtoc_def)
rtoc = self.annotate_with_def(claripy.BVV(rtoc_value, self.arch.bits), rtoc_def)
self.register_definitions.store(offset, rtoc)
elif self.arch.name.startswith("MIPS64"):
offset, size = self.arch.registers["t9"]
t9_atom = Register(offset, size)
t9_def = Definition(t9_atom, ExternalCodeLocation(call_string), tags={InitialValueTag()})
self.all_definitions.add(t9_def)
t9 = self.annotate_with_def(claripy.BVV(func_addr, self.arch.bits), t9_def)
self.register_definitions.store(offset, t9)
elif self.arch.name.startswith("MIPS"):
if func_addr is None:
l.warning("func_addr must not be None to initialize a function in mips")
t9_offset = self.arch.registers["t9"][0]
t9_atom = Register(t9_offset, self.arch.bytes)
t9_def = Definition(t9_atom, ExternalCodeLocation(call_string), tags={InitialValueTag()})
self.all_definitions.add(t9_def)
t9 = self.annotate_with_def(claripy.BVV(func_addr, self.arch.bits), t9_def)
self.register_definitions.store(t9_offset, t9)
[docs] def copy(self) -> "ReachingDefinitionsState":
rd = ReachingDefinitionsState(
self.codeloc,
self.arch,
self._subject,
track_tmps=self._track_tmps,
track_consts=self._track_consts,
analysis=self.analysis,
live_definitions=self.live_definitions.copy(),
canonical_size=self._canonical_size,
heap_allocator=self.heap_allocator,
environment=self._environment,
sp_adjusted=self._sp_adjusted,
all_definitions=self.all_definitions.copy(),
)
return rd
[docs] def merge(self, *others) -> Tuple["ReachingDefinitionsState", bool]:
state = self.copy()
others: Iterable["ReachingDefinitionsState"]
state.live_definitions, merged_0 = state.live_definitions.merge(*[other.live_definitions for other in others])
state._environment, merged_1 = state.environment.merge(*[other.environment for other in others])
return state, merged_0 or merged_1
[docs] def move_codelocs(self, new_codeloc: CodeLocation) -> None:
if self.codeloc != new_codeloc:
self.codeloc = new_codeloc
self.codeloc_uses = set()
[docs] def kill_definitions(self, atom: Atom) -> None:
"""
Overwrite existing definitions w.r.t 'atom' with a dummy definition instance. A dummy definition will not be
removed during simplification.
"""
self.live_definitions.kill_definitions(atom)
[docs] def kill_and_add_definition(
self,
atom: Atom,
data: MultiValues,
dummy=False,
tags: Set[Tag] = None,
endness=None, # XXX destroy
annotated: bool = False,
uses: Optional[Set[Definition]] = None,
override_codeloc: Optional[CodeLocation] = None,
) -> Tuple[Optional[MultiValues], Set[Definition]]:
codeloc = override_codeloc or self.codeloc
mv = self.live_definitions.kill_and_add_definition(
atom, codeloc, data, dummy=dummy, tags=tags, endness=endness, annotated=annotated
)
if mv is not None:
defs = set(LiveDefinitions.extract_defs_from_mv(mv))
self.all_definitions |= defs
if self._dep_graph is not None:
stack_use = {u for u in self.codeloc_uses if isinstance(u.atom, MemoryLocation) and u.atom.is_on_stack}
sp_offset = self.arch.sp_offset
bp_offset = self.arch.bp_offset
values = set()
for vs in mv.values():
for v in vs:
values.add(v)
if uses is None:
uses = self.codeloc_uses
for used in uses:
# sp is always used as a stack pointer, and we do not track dependencies against stack pointers.
# bp is sometimes used as a base pointer. we recognize such cases by checking if there is a use to
# the stack variable.
#
# There are two cases for which it is superfluous to report a dependency on (a use of) stack/base
# pointers:
# - The `Definition` *uses* a `MemoryLocation` pointing to the stack;
# - The `Definition` *is* a `MemoryLocation` pointing to the stack.
is_using_spbp_while_memory_address_on_stack_is_used = (
isinstance(used.atom, Register)
and used.atom.reg_offset in (sp_offset, bp_offset)
and len(stack_use) > 0
)
is_using_spbp_to_define_memory_location_on_stack = (
isinstance(atom, MemoryLocation)
and (
atom.is_on_stack
or (isinstance(atom.addr, claripy.ast.Base) and self.is_stack_address(atom.addr))
)
and isinstance(used.atom, Register)
and used.atom.reg_offset in (sp_offset, bp_offset)
)
if not (
is_using_spbp_while_memory_address_on_stack_is_used
or is_using_spbp_to_define_memory_location_on_stack
):
# Moderately confusing misnomers. This is an edge from a def to a use, since the
# "uses" are actually the definitions that we're using and the "definition" is the
# new definition; i.e. The def that the old def is used to construct so this is
# really a graph where nodes are defs and edges are uses.
self._dep_graph.add_node(used)
for def_ in defs:
if not def_.dummy:
self._dep_graph.add_edge(used, def_)
self._dep_graph.add_dependencies_for_concrete_pointers_of(
values,
used,
self.analysis.project.kb.cfgs.get_most_accurate(),
self.analysis.project.loader,
)
else:
defs = set()
return mv, defs
[docs] def add_use(self, atom: Atom, expr: Optional[Any] = None) -> None:
self.codeloc_uses.update(self.get_definitions(atom))
self.live_definitions.add_use(atom, self.codeloc, expr=expr)
[docs] def add_use_by_def(self, definition: Definition, expr: Optional[Any] = None) -> None:
self.codeloc_uses.add(definition)
self.live_definitions.add_use_by_def(definition, self.codeloc, expr=expr)
[docs] def add_tmp_use(self, tmp: int, expr: Optional[Any] = None) -> None:
defs = self.live_definitions.get_tmp_definitions(tmp)
self.add_tmp_use_by_defs(defs, expr=expr)
[docs] def add_tmp_use_by_defs(
self, defs: Iterable[Definition], expr: Optional[Any] = None
) -> None: # pylint:disable=unused-argument
for definition in defs:
self.codeloc_uses.add(definition)
# if track_tmps is False, definitions may not be Tmp definitions
self.live_definitions.add_use_by_def(definition, self.codeloc, expr=expr)
[docs] def add_register_use(self, reg_offset: int, size: int, expr: Optional[Any] = None) -> None:
defs = self.live_definitions.get_register_definitions(reg_offset, size)
self.add_register_use_by_defs(defs, expr=expr)
[docs] def add_register_use_by_defs(self, defs: Iterable[Definition], expr: Optional[Any] = None) -> None:
for definition in defs:
self.codeloc_uses.add(definition)
self.live_definitions.add_register_use_by_def(definition, self.codeloc, expr=expr)
[docs] def add_stack_use(self, stack_offset: int, size: int, endness, expr: Optional[Any] = None) -> None:
defs = self.live_definitions.get_stack_definitions(stack_offset, size, endness)
self.add_stack_use_by_defs(defs, expr=expr)
[docs] def add_stack_use_by_defs(self, defs: Iterable[Definition], expr: Optional[Any] = None):
for definition in defs:
self.codeloc_uses.add(definition)
self.live_definitions.add_stack_use_by_def(definition, self.codeloc, expr=expr)
[docs] def add_heap_use(self, heap_offset: int, size: int, endness, expr: Optional[Any] = None) -> None:
defs = self.live_definitions.get_heap_definitions(heap_offset, size, endness)
self.add_heap_use_by_defs(defs, expr=expr)
[docs] def add_heap_use_by_defs(self, defs: Iterable[Definition], expr: Optional[Any] = None):
for definition in defs:
self.codeloc_uses.add(definition)
self.live_definitions.add_heap_use_by_def(definition, self.codeloc, expr=expr)
[docs] def add_memory_use_by_def(self, definition: Definition, expr: Optional[Any] = None):
self.codeloc_uses.add(definition)
self.live_definitions.add_memory_use_by_def(definition, self.codeloc, expr=expr)
[docs] def add_memory_use_by_defs(self, defs: Iterable[Definition], expr: Optional[Any] = None):
for definition in defs:
self.codeloc_uses.add(definition)
self.live_definitions.add_memory_use_by_def(definition, self.codeloc, expr=expr)
[docs] def get_definitions(self, atom: Atom) -> Iterable[Definition]:
yield from self.live_definitions.get_definitions(atom)
[docs] def get_values(self, spec: Union[Atom, Definition]) -> Optional[MultiValues]:
return self.live_definitions.get_values(spec)
[docs] def get_one_value(self, spec: Union[Atom, Definition]) -> Optional[claripy.ast.base.Base]:
return self.live_definitions.get_one_value(spec)
[docs] def get_concrete_value(self, spec: Union[Atom, Definition]) -> Optional[int]:
return self.live_definitions.get_concrete_value(spec)
[docs] def mark_guard(self, target):
atom = GuardUse(target)
kinda_definition = Definition(atom, self.codeloc)
if self._dep_graph is not None:
self._dep_graph.add_node(kinda_definition)
for used in self.codeloc_uses:
self._dep_graph.add_edge(used, kinda_definition)
[docs] def mark_const(self, value: int, size: int):
atom = ConstantSrc(value, size)
kinda_definition = Definition(atom, self.codeloc)
if self._dep_graph is not None and self._track_consts:
self._dep_graph.add_node(kinda_definition)
self.codeloc_uses.add(kinda_definition)
self.live_definitions.uses_by_codeloc[self.codeloc].add(kinda_definition)
[docs] def downsize(self):
self.all_definitions = set()
[docs] def pointer_to_atoms(self, pointer: MultiValues, size: int, endness: str) -> Set[MemoryLocation]:
"""
Given a MultiValues, return the set of atoms that loading or storing to the pointer with that value
could define or use.
"""
result = set()
for vs in pointer.values():
for value in vs:
atom = self.pointer_to_atom(value, size, endness)
if atom is not None:
result.add(atom)
return result
[docs] def pointer_to_atom(self, value: claripy.ast.base.Base, size: int, endness: str) -> Optional[MemoryLocation]:
if self.is_top(value):
return None
# TODO this can be simplified with the walrus operator
stack_offset = self.get_stack_offset(value)
if stack_offset is not None:
addr = SpOffset(len(value), stack_offset)
else:
heap_offset = self.get_heap_offset(value)
if heap_offset is not None:
addr = HeapAddress(heap_offset)
elif value.op == "BVV":
addr = value.args[0]
else:
# cannot resolve
return None
return MemoryLocation(addr, size, endness)