# pylint:disable=isinstance-second-argument-not-valid-type
import weakref
from typing import Set, Optional, Any, Tuple, Union, List, Dict, DefaultDict, TYPE_CHECKING
from collections import defaultdict
import logging
import time
import claripy
import ailment
import pyvex
from archinfo.arch_arm import is_arm_arch
from ... import sim_options
from ...knowledge_plugins.propagations import PropagationModel
from ...storage.memory_mixins import LabeledMemory
from ...errors import SimMemoryMissingError
from ...code_location import CodeLocation
from ...storage.memory_object import SimMemoryObject, SimLabeledMemoryObject
from .. import register_analysis
from ..analysis import Analysis
from ..forward_analysis import ForwardAnalysis, FunctionGraphVisitor, SingleNodeGraphVisitor
from .engine_vex import SimEnginePropagatorVEX
from .engine_ail import SimEnginePropagatorAIL
from .prop_value import PropValue, Detail
if TYPE_CHECKING:
from archinfo import Arch
from angr.project import Project
from angr.analyses.reaching_definitions.reaching_definitions import ReachingDefinitionsModel
_l = logging.getLogger(name=__name__)
[docs]class PropagatorState:
"""
Describes the base state used in Propagator.
:ivar arch: Architecture of the binary.
:ivar gp: alue of the global pointer for MIPS binaries.
:ivar _replacements: Stores expressions to replace, keyed by CodeLocation instances
:ivar _equivalence: Stores equivalence constraints that Propagator discovers during the analysis.
:ivar _only_consts: Only track constants.
:ivar _expr_used_locs: A dict keyed by expressions and valued by CodeLocations where the expression is used.
:ivar _max_prop_expr_occurrence: The upperbound for the number of occurrences of an expression for Propagator
to propagate that expression to new locations (and replace the original expression).
Setting it to 0 disables this limit, which means Propagator will always propagate
expressions regardless of how many times it has been propagated.
"""
__slots__ = (
"arch",
"gpr_size",
"_expr_used_locs",
"_only_consts",
"_replacements",
"_equivalence",
"project",
"_store_tops",
"_gp",
"_max_prop_expr_occurrence",
"__weakref__",
)
_tops = {}
def __init__(
self,
arch: "Arch",
project: Optional["Project"] = None,
replacements: Optional[DefaultDict[CodeLocation, Dict]] = None,
only_consts: bool = False,
expr_used_locs: Optional[DefaultDict[Any, Set[CodeLocation]]] = None,
equivalence: Optional[Set["Equivalence"]] = None,
store_tops: bool = True,
gp: Optional[int] = None,
max_prop_expr_occurrence: int = 1,
):
self.arch = arch
self.gpr_size = arch.bits // arch.byte_width # size of the general-purpose registers
# propagation count of each expression
self._expr_used_locs = defaultdict(set) if expr_used_locs is None else expr_used_locs
self._only_consts = only_consts
self._replacements = defaultdict(dict) if replacements is None else replacements
self._equivalence: Set[Equivalence] = equivalence if equivalence is not None else set()
self._store_tops = store_tops
self._max_prop_expr_occurrence = max_prop_expr_occurrence
# architecture-specific information
self._gp: Optional[int] = gp # Value of gp for MIPS32 and 64 binaries
self.project = project
def __repr__(self):
return "<PropagatorState>"
def _get_weakref(self):
return weakref.proxy(self)
@staticmethod
def _is_const(v) -> bool:
if isinstance(v, (int, ailment.Expr.Const)):
return True
if isinstance(v, claripy.ast.BV) and v.op == "BVV":
return True
if isinstance(v, claripy.ast.FP) and v.op == "FPV":
return True
if isinstance(v, claripy.ast.Bool) and v.op == "BoolV":
return True
return False
@staticmethod
def _mo_cmp(
mo_self: Union["SimMemoryObject", "SimLabeledMemoryObject"],
mo_other: Union["SimMemoryObject", "SimLabeledMemoryObject"],
addr: int,
size: int,
): # pylint:disable=unused-argument
# comparing bytes from two sets of memory objects
# we don't need to resort to byte-level comparison. object-level is good enough.
if mo_self.object.symbolic or mo_other.object.symbolic:
if type(mo_self) is SimLabeledMemoryObject and type(mo_other) is SimLabeledMemoryObject:
return mo_self.label == mo_other.label and mo_self.object is mo_other.object
if type(mo_self) is SimMemoryObject and type(mo_other) is SimMemoryObject:
return mo_self.object is mo_other.object
# SimMemoryObject vs SimLabeledMemoryObject -> the label must be different
return False
return None
[docs] @staticmethod
def top(bits: int) -> claripy.ast.Bits:
"""
Get a TOP value.
:param size: Width of the TOP value (in bits).
:return: The TOP value.
"""
if bits in PropagatorState._tops:
return PropagatorState._tops[bits]
r = claripy.BVS("TOP", bits, explicit_name=True)
PropagatorState._tops[bits] = r
return r
[docs] @staticmethod
def is_top(expr) -> bool:
"""
Check if the given expression is a TOP value.
:param expr: The given expression.
:return: True if the expression is TOP, False otherwise.
"""
if isinstance(expr, claripy.ast.Base):
if expr.op == "BVS" and expr.args[0] == "TOP":
return True
if "TOP" in expr.variables:
return True
return False
[docs] def copy(self) -> "PropagatorState":
raise NotImplementedError()
[docs] def merge(self, *others):
state = self.copy()
merge_occurred = False
for o in others:
merge_occurred |= PropagatorAnalysis.merge_replacements(state._replacements, o._replacements)
if state._equivalence != o._equivalence:
merge_occurred = True
state._equivalence |= o._equivalence
return state, merge_occurred
[docs] def add_replacement(self, codeloc, old, new):
"""
Add a replacement record: Replacing expression `old` with `new` at program location `codeloc`.
If the self._only_consts flag is set to true, only constant values will be set.
:param CodeLocation codeloc: The code location.
:param old: The expression to be replaced.
:param new: The expression to replace with.
:return: None
"""
if self.is_top(new):
return
if self._only_consts:
if self._is_const(new) or self.is_top(new):
self._replacements[codeloc][old] = new
else:
self._replacements[codeloc][old] = new
[docs] def filter_replacements(self):
pass
# VEX state
[docs]class PropagatorVEXState(PropagatorState):
"""
Describes the state used in the VEX engine of Propagator.
"""
__slots__ = (
"_registers",
"_stack_variables",
"do_binops",
)
def __init__(
self,
arch,
project=None,
registers=None,
local_variables=None,
replacements=None,
only_consts=False,
expr_used_locs=None,
do_binops=True,
store_tops=True,
gp=None,
max_prop_expr_occurrence: int = 1,
):
super().__init__(
arch,
project=project,
replacements=replacements,
only_consts=only_consts,
expr_used_locs=expr_used_locs,
store_tops=store_tops,
gp=gp,
max_prop_expr_occurrence=max_prop_expr_occurrence,
)
self.do_binops = do_binops
self._registers = (
LabeledMemory(memory_id="reg", top_func=self.top, page_kwargs={"mo_cmp": self._mo_cmp})
if registers is None
else registers
)
self._stack_variables = (
LabeledMemory(memory_id="mem", top_func=self.top, page_kwargs={"mo_cmp": self._mo_cmp})
if local_variables is None
else local_variables
)
self._registers.set_state(self)
self._stack_variables.set_state(self)
def __repr__(self):
return "<PropagatorVEXState>"
[docs] def copy(self) -> "PropagatorVEXState":
cp = PropagatorVEXState(
self.arch,
project=self.project,
registers=self._registers.copy(),
local_variables=self._stack_variables.copy(),
replacements=self._replacements.copy(),
expr_used_locs=self._expr_used_locs.copy(),
only_consts=self._only_consts,
do_binops=self.do_binops,
store_tops=self._store_tops,
gp=self._gp,
max_prop_expr_occurrence=self._max_prop_expr_occurrence,
)
return cp
[docs] def merge(self, *others: "PropagatorVEXState") -> Tuple["PropagatorVEXState", bool]:
state = self.copy()
merge_occurred = state._registers.merge([o._registers for o in others], None)
merge_occurred |= state._stack_variables.merge([o._stack_variables for o in others], None)
return state, merge_occurred
[docs] def store_local_variable(self, offset, size, value, endness): # pylint:disable=unused-argument
# TODO: Handle size
self._stack_variables.store(offset, value, size=size, endness=endness)
[docs] def load_local_variable(self, offset, size, endness): # pylint:disable=unused-argument
# TODO: Handle size
try:
return self._stack_variables.load(offset, size=size, endness=endness)
except SimMemoryMissingError:
return self.top(size * self.arch.byte_width)
[docs] def store_register(self, offset, size, value):
self._registers.store(offset, value, size=size)
[docs] def load_register(self, offset, size):
# TODO: Fix me
if size != self.gpr_size:
return self.top(size * self.arch.byte_width)
try:
return self._registers.load(offset, size=size)
except SimMemoryMissingError:
return self.top(size * self.arch.byte_width)
# AIL state
[docs]class Equivalence:
"""
Describes an equivalence relationship between two atoms.
"""
__slots__ = (
"codeloc",
"atom0",
"atom1",
)
def __init__(self, codeloc, atom0, atom1):
self.codeloc = codeloc
self.atom0 = atom0
self.atom1 = atom1
def __repr__(self):
return f"<Eq@{self.codeloc!r}: {self.atom0!r}=={self.atom1!r}>"
def __eq__(self, other):
return (
type(other) is Equivalence
and other.codeloc == self.codeloc
and other.atom0 == self.atom0
and other.atom1 == self.atom1
)
def __hash__(self):
return hash((Equivalence, self.codeloc, self.atom0, self.atom1))
[docs]class PropagatorAILState(PropagatorState):
"""
Describes the state used in the AIL engine of Propagator.
"""
__slots__ = (
"_registers",
"_stack_variables",
"_tmps",
"temp_expressions",
"register_expressions",
"last_stack_store",
"global_stores",
"block_initial_reg_values",
"_sp_adjusted",
)
def __init__(
self,
arch,
project=None,
replacements=None,
only_consts=False,
expr_used_locs=None,
equivalence=None,
stack_variables=None,
registers=None,
gp=None,
block_initial_reg_values=None,
max_prop_expr_occurrence: int = 1,
sp_adjusted: bool = False,
):
super().__init__(
arch,
project=project,
replacements=replacements,
only_consts=only_consts,
expr_used_locs=expr_used_locs,
equivalence=equivalence,
gp=gp,
max_prop_expr_occurrence=max_prop_expr_occurrence,
)
self._stack_variables = (
LabeledMemory(memory_id="mem", top_func=self.top, page_kwargs={"mo_cmp": self._mo_cmp})
if stack_variables is None
else stack_variables
)
self._registers = (
LabeledMemory(memory_id="reg", top_func=self.top, page_kwargs={"mo_cmp": self._mo_cmp})
if registers is None
else registers
)
self._tmps = {}
self.temp_expressions = {}
self.register_expressions = {}
self.block_initial_reg_values: DefaultDict[
Tuple[int, int], List[Tuple[ailment.Expr.Register, ailment.Expr.Const]]
] = (defaultdict(list) if block_initial_reg_values is None else block_initial_reg_values)
self._sp_adjusted: bool = sp_adjusted
self._registers.set_state(self)
self._stack_variables.set_state(self)
# last_stack_store stores the most recent stack store statement with a non-concrete or unresolvable address. we
# use this information to determine if stack reads after this store can be safely resolved to definitions prior
# to the stack read.
self.last_stack_store: Optional[Tuple[int, int, ailment.Stmt.Store]] = None
self.global_stores: List[Tuple[int, int, Any, ailment.Stmt.Store]] = []
def __repr__(self):
return "<PropagatorAILState>"
[docs] def copy(self) -> "PropagatorAILState":
rd = PropagatorAILState(
self.arch,
project=self.project,
replacements=self._replacements.copy(),
expr_used_locs=self._expr_used_locs.copy(),
only_consts=self._only_consts,
equivalence=self._equivalence.copy(),
stack_variables=self._stack_variables.copy(),
registers=self._registers.copy(),
block_initial_reg_values=self.block_initial_reg_values.copy(),
# drop tmps
gp=self._gp,
max_prop_expr_occurrence=self._max_prop_expr_occurrence,
sp_adjusted=self._sp_adjusted,
)
return rd
[docs] @staticmethod
def is_const_or_register(value: Optional[Union[ailment.Expr.Expression, claripy.ast.Bits]]) -> bool:
if value is None:
return False
if isinstance(value, claripy.ast.BV):
return not value.symbolic
if isinstance(value, ailment.Expr.Register):
return True
if isinstance(value, ailment.Expr.Const) or (isinstance(value, int) and value == 0):
return True
# more hacks: also store the eq comparisons
if isinstance(value, ailment.Expr.BinaryOp) and value.op == "CmpEQ":
if all(isinstance(arg, (ailment.Expr.Const, ailment.Expr.Tmp)) for arg in value.operands):
return True
# more hacks: also store the conversions
if isinstance(value, ailment.Expr.Convert) and PropagatorAILState.is_const_or_register(value.operand):
return True
return False
[docs] def merge(self, *others) -> Tuple["PropagatorAILState", bool]:
state, merge_occurred = super().merge(*others)
state: "PropagatorAILState"
merge_occurred |= state._registers.merge([o._registers for o in others], None)
merge_occurred |= state._stack_variables.merge([o._stack_variables for o in others], None)
return state, merge_occurred
[docs] def store_temp(self, tmp_idx: int, value: PropValue):
self._tmps[tmp_idx] = value
[docs] def load_tmp(self, tmp_idx: int) -> Optional[PropValue]:
return self._tmps.get(tmp_idx, None)
[docs] def store_register(self, reg: ailment.Expr.Register, value: PropValue) -> None:
if isinstance(value, ailment.Expr.Expression) and value.has_atom(reg, identity=False):
return
for offset, chopped_value, size, label in value.value_and_labels():
self._registers.store(
reg.reg_offset + offset,
chopped_value,
size=size,
label=label,
endness=self.project.arch.register_endness,
)
[docs] def store_stack_variable(
self, sp_offset: int, new: PropValue, endness=None
) -> None: # pylint:disable=unused-argument
# normalize sp_offset to handle negative offsets
sp_offset += 0x65536
sp_offset &= (1 << self.arch.bits) - 1
for offset, value, size, label in new.value_and_labels():
self._stack_variables.store(sp_offset + offset, value, size=size, endness=endness, label=label)
[docs] def load_register(self, reg: ailment.Expr.Register) -> Optional[PropValue]:
try:
value, labels = self._registers.load_with_labels(
reg.reg_offset, size=reg.size, endness=self.project.arch.register_endness
)
except SimMemoryMissingError:
# value does not exist
return None
prop_value = PropValue.from_value_and_labels(value, labels)
return prop_value
[docs] def load_stack_variable(self, sp_offset: int, size, endness=None) -> Optional[PropValue]:
# normalize sp_offset to handle negative offsets
sp_offset += 0x65536
sp_offset &= (1 << self.arch.bits) - 1
try:
value, labels = self._stack_variables.load_with_labels(sp_offset, size=size, endness=endness)
except SimMemoryMissingError as ex:
# the stack variable does not exist - however, maybe some portion of it exists!
if ex.missing_addr > sp_offset:
# some data exist. load again
try:
value, labels = self._stack_variables.load_with_labels(
sp_offset, size=ex.missing_addr - sp_offset, endness=endness
)
# then we zero-extend both the value and labels
if value is not None and len(labels) == 1 and labels[0][0] == 0:
value = claripy.ZeroExt(ex.missing_size * self.arch.byte_width, value)
offset, offset_in_expr, size, label = labels[0]
labels = ((offset, offset_in_expr, size + ex.missing_size, label),)
except SimMemoryMissingError:
# failed again... welp
return None
else:
return None
prop_value = PropValue.from_value_and_labels(value, labels)
return prop_value
[docs] def add_replacement(self, codeloc, old, new):
if self._only_consts:
if self.is_const_or_register(new) or self.is_top(new):
pass
else:
new = self.top(1)
# do not replace anything with a call expression
if isinstance(new, ailment.statement.Call):
return
else:
from .call_expr_finder import CallExprFinder # pylint:disable=import-outside-toplevel
callexpr_finder = CallExprFinder()
callexpr_finder.walk_expression(new)
if callexpr_finder.has_call:
return
if self.is_top(new):
# eliminate the past propagation of this expression
self._replacements[codeloc][old] = self.top(1) # placeholder
return
# count-based propagation rule only matters when we are performing a full-function copy propagation
if self._max_prop_expr_occurrence == 0:
if (
isinstance(old, ailment.Expr.Tmp)
or isinstance(old, ailment.Expr.Register)
and old.reg_offset in {self.arch.sp_offset, self.arch.bp_offset}
):
self._replacements[codeloc][old] = new
else:
prop_count = 0
if (
not isinstance(old, ailment.Expr.Tmp)
and isinstance(new, ailment.Expr.Expression)
and not isinstance(new, ailment.Expr.Const)
):
# FIXME: We should find the definition in the RDA result and use the definition as the key
self._expr_used_locs[new].add(codeloc)
prop_count = len(self._expr_used_locs[new])
if ( # pylint:disable=too-many-boolean-expressions
prop_count <= self._max_prop_expr_occurrence
or isinstance(new, ailment.Expr.StackBaseOffset)
or isinstance(new, ailment.Expr.Convert)
and isinstance(new.operand, ailment.Expr.StackBaseOffset)
or (
isinstance(old, ailment.Expr.Register)
and self.arch.is_artificial_register(old.reg_offset, old.size)
)
):
# we can propagate this expression
self._replacements[codeloc][old] = new
else:
# eliminate the past propagation of this expression
for codeloc_ in self._replacements:
if old in self._replacements[codeloc_]:
self._replacements[codeloc_][old] = self.top(1)
[docs] def add_equivalence(self, codeloc, old, new):
eq = Equivalence(codeloc, old, new)
self._equivalence.add(eq)
[docs]class PropagatorAnalysis(ForwardAnalysis, Analysis): # pylint:disable=abstract-method
"""
PropagatorAnalysis implements copy propagation. It propagates values (either constant values or variables) and
expressions inside a block or across a function.
PropagatorAnalysis supports both VEX and AIL. The VEX propagator only performs constant propagation. The AIL
propagator performs both constant propagation and copy propagation of depth-N expressions.
PropagatorAnalysis performs certain arithmetic operations between constants, including but are not limited to:
- addition
- subtraction
- multiplication
- division
- xor
It also performs the following memory operations:
- Loading values from a known address
- Writing values to a stack variable
"""
def __init__(
self,
func=None,
block=None,
func_graph=None,
base_state=None,
max_iterations=3,
load_callback=None,
stack_pointer_tracker=None,
only_consts=False,
completed_funcs=None,
do_binops=True,
store_tops=True,
vex_cross_insn_opt=False,
func_addr: Optional[int] = None,
gp: Optional[int] = None,
cache_results: bool = False,
key_prefix: Optional[str] = None,
reaching_definitions: Optional["ReachingDefinitionsModel"] = None,
profiling: bool = False,
):
if block is None and func is not None:
# only func is specified. traversing a function
self.flavor = "function"
elif block is not None:
# traversing a block (but func might be specified at the same time to provide extra information, e.g., the
# value for register t9 for MIPS32/64 binaries)
self.flavor = "block"
else:
raise ValueError("Unsupported analysis target.")
start = time.perf_counter_ns() / 1000000
self._base_state = base_state
self._function = func
self._func_addr = func_addr if func_addr is not None else (None if func is None else func.addr)
self._block = block
self._block_addr = block.addr if block is not None else None
self._max_iterations = max_iterations
self._load_callback = load_callback
self._stack_pointer_tracker = stack_pointer_tracker # only used when analyzing AIL functions
self._only_consts = only_consts
self._completed_funcs = completed_funcs
self._do_binops = do_binops
self._store_tops = store_tops
self._vex_cross_insn_opt = vex_cross_insn_opt
self._gp = gp
self._prop_key_prefix = key_prefix
self._cache_results = cache_results
self._reaching_definitions = reaching_definitions
self._initial_codeloc: CodeLocation
if self.flavor == "function":
self._initial_codeloc = CodeLocation(self._func_addr, stmt_idx=0, ins_addr=self._func_addr)
else: # flavor == "block"
self._initial_codeloc = CodeLocation(self._block_addr, stmt_idx=0, ins_addr=self._block_addr)
self.model: PropagationModel = None
if self._cache_results:
# Resume the analysis from the previously unfinished result
self.model = self.kb.propagations.get(self.prop_key, None)
if self.model is None:
self.model = PropagationModel(
self.prop_key,
)
cache_used = False
else:
cache_used = True
graph_visitor: Union[SingleNodeGraphVisitor, FunctionGraphVisitor]
if self.flavor == "block":
graph_visitor = None
if self._cache_results:
graph_visitor: Optional[SingleNodeGraphVisitor] = self.model.graph_visitor
if graph_visitor is None:
graph_visitor = SingleNodeGraphVisitor(block)
elif self.flavor == "function":
graph_visitor = None
if self._cache_results:
graph_visitor: Optional[FunctionGraphVisitor] = self.model.graph_visitor
if graph_visitor is not None:
# resume
resumed = graph_visitor.resume_with_new_graph(func_graph if func_graph is not None else func.graph)
if not resumed:
# clean up...
self.model = PropagationModel(self.prop_key)
if graph_visitor is None:
graph_visitor = FunctionGraphVisitor(func, func_graph)
self.model.graph_visitor = graph_visitor
else:
raise TypeError(f"Unsupported flavor {self.flavor}")
ForwardAnalysis.__init__(
self, order_jobs=True, allow_merging=True, allow_widening=False, graph_visitor=graph_visitor
)
self._engine_vex = SimEnginePropagatorVEX(
project=self.project, arch=self.project.arch, reaching_definitions=self._reaching_definitions
)
self._engine_ail = SimEnginePropagatorAIL(
arch=self.project.arch,
stack_pointer_tracker=self._stack_pointer_tracker,
# We only propagate tmps within the same block. This is because the lifetime of tmps is one block only.
propagate_tmps=block is not None,
reaching_definitions=self._reaching_definitions,
)
# optimization: skip state copying for the initial state
self._initial_state = None
# performance counters
self._analyzed_states: int = 0
self._analyzed_statements: int = 0
self._analyze()
if self._cache_results:
# update the cache
self.kb.propagations.update(self.prop_key, self.model)
if profiling:
elapsed = time.perf_counter_ns() / 1000000 - start
if self.flavor == "function":
_l.warning("%r:", self._function)
else:
_l.warning("%r:", self._block)
_l.warning(" Time elapsed: %s milliseconds", elapsed)
_l.warning(" Cache used: %s", cache_used)
_l.warning(" Analyzed states: %d", self._analyzed_states)
_l.warning(" Analyzed statements: %d", self._analyzed_statements)
@property
def prop_key(self) -> Tuple[Optional[str], str, int, bool, bool, bool]:
"""
Gets a key that represents the function and the "flavor" of the propagation result.
"""
addr = self._func_addr if self._func_addr is not None else self._block_addr
return self._prop_key_prefix, self.flavor, addr, self._do_binops, self._only_consts, self._vex_cross_insn_opt
@property
def replacements(self):
return self.model.replacements
@replacements.setter
def replacements(self, v):
self.model.replacements = v
#
# Main analysis routines
#
def _node_key(self, node: Union[ailment.Block, pyvex.IRSB]) -> Any:
if type(node) is ailment.Block:
return node.addr, node.idx
elif type(node) is pyvex.IRSB:
return node.addr
# fallback
return node
def _pre_analysis(self):
pass
def _pre_job_handling(self, job):
pass
def _initial_abstract_state(self, node):
if isinstance(node, ailment.Block):
# AIL
state = PropagatorAILState(
self.project.arch,
project=self.project,
only_consts=self._only_consts,
gp=self._gp,
max_prop_expr_occurrence=1 if self.flavor == "function" else 0,
)
ail = True
spoffset_var = ailment.Expr.StackBaseOffset(None, self.project.arch.bits, 0)
sp_value = PropValue(
claripy.BVV(0x7FFF_FF00, self.project.arch.bits),
offset_and_details={0: Detail(self.project.arch.bytes, spoffset_var, self._initial_codeloc)},
)
state.store_register(
ailment.Expr.Register(None, None, self.project.arch.sp_offset, self.project.arch.bits),
sp_value,
)
else:
# VEX
state = PropagatorVEXState(
self.project.arch,
project=self.project,
only_consts=self._only_consts,
do_binops=self._do_binops,
store_tops=self._store_tops,
gp=self._gp,
max_prop_expr_occurrence=1 if self.flavor == "function" else 0,
)
spoffset_var = self._engine_vex.sp_offset(0)
ail = False
state.store_register(
self.project.arch.sp_offset,
self.project.arch.bytes,
spoffset_var,
)
if self.project.arch.name == "MIPS64":
if self._func_addr is not None:
if ail:
reg_expr = ailment.Expr.Register(
None, None, self.project.arch.registers["t9"][0], self.project.arch.registers["t9"][1]
)
reg_value = ailment.Expr.Const(None, None, self._func_addr, 64)
state.store_register(
reg_expr,
PropValue(
claripy.BVV(self._func_addr, 64),
offset_and_details={0: Detail(8, reg_value, self._initial_codeloc)},
),
)
else:
state.store_register( # pylint:disable=too-many-function-args
self.project.arch.registers["t9"][0],
self.project.arch.registers["t9"][1],
claripy.BVV(self._func_addr, 64),
)
elif self.project.arch.name == "MIPS32":
if self._func_addr is not None:
if ail:
reg_expr = ailment.Expr.Register(
None, None, self.project.arch.registers["t9"][0], self.project.arch.registers["t9"][1]
)
reg_value = ailment.Expr.Const(None, None, self._func_addr, 32)
state.store_register(
reg_expr,
PropValue(
claripy.BVV(self._func_addr, 32),
offset_and_details={0: Detail(4, reg_value, self._initial_codeloc)},
),
)
else:
state.store_register( # pylint:disable=too-many-function-args
self.project.arch.registers["t9"][0],
self.project.arch.registers["t9"][1],
claripy.BVV(self._func_addr, 32),
)
elif is_arm_arch(self.project.arch):
if ail:
# clear fpscr
reg_expr = ailment.Expr.Register(None, None, *self.project.arch.registers["fpscr"])
reg_value = ailment.Expr.Const(None, None, 0, 32)
state.store_register(
reg_expr,
PropValue(claripy.BVV(0, 32), offset_and_details={0: Detail(4, reg_value, self._initial_codeloc)}),
)
else:
state.store_register( # pylint:disable=too-many-function-args
self.project.arch.registers["fpscr"][0],
self.project.arch.registers["fpscr"][1],
claripy.BVV(0, 32),
)
self._initial_state = state
return state
def _merge_states(self, node, *states: PropagatorState):
merged_state, merge_occurred = states[0].merge(*states[1:])
return merged_state, not merge_occurred
def _run_on_node(self, node, state):
self._analyzed_states += 1
if isinstance(node, ailment.Block):
block = node
block_key = (node.addr, node.idx)
engine = self._engine_ail
else:
block = self.project.factory.block(
node.addr, node.size, opt_level=1, cross_insn_opt=self._vex_cross_insn_opt
)
block_key = node.addr
engine = self._engine_vex
if block.size == 0:
# maybe the block is not decodeable
return False, state
if state is not self._initial_state:
# make a copy of the state if it's not the initial state
state = state.copy()
state._equivalence.clear()
else:
# clear self._initial_state so that we *do not* run this optimization again!
self._initial_state = None
# Suppress spurious output
if self._base_state is not None:
self._base_state.options.add(sim_options.SYMBOL_FILL_UNCONSTRAINED_REGISTERS)
self._base_state.options.add(sim_options.SYMBOL_FILL_UNCONSTRAINED_MEMORY)
state = engine.process(
state,
block=block,
project=self.project,
base_state=self._base_state,
load_callback=self._load_callback,
fail_fast=self._fail_fast,
)
state.filter_replacements()
self.model.node_iterations[block_key] += 1
self.model.states[block_key] = state
if isinstance(state, PropagatorAILState):
self.model.block_initial_reg_values.update(state.block_initial_reg_values)
if self.model.replacements is None:
self.model.replacements = state._replacements
else:
self.merge_replacements(self.model.replacements, state._replacements)
self.model.equivalence |= state._equivalence
# TODO: Clear registers according to calling conventions
if self.model.node_iterations[block_key] < self._max_iterations:
return True, state
else:
return False, state
def _process_input_state_for_successor(
self, node, successor, input_state: Union[PropagatorAILState, PropagatorVEXState]
):
if self._only_consts and isinstance(input_state, PropagatorAILState):
key = node.addr, successor.addr
if key in self.model.block_initial_reg_values:
input_state: PropagatorAILState = input_state.copy()
for reg_atom, reg_value in self.model.block_initial_reg_values[key]:
input_state.store_register(
reg_atom,
PropValue(
claripy.BVV(reg_value.value, reg_value.bits),
offset_and_details={0: Detail(reg_atom.size, reg_value, self._initial_codeloc)},
),
)
return input_state
return input_state
def _intra_analysis(self):
pass
def _check_func_complete(self, func):
"""
Checks if a function is completely created by the CFG. Completed
functions are passed to the Propagator at initialization. Defaults to
being empty if no pass is initiated.
:param func: Function to check (knowledge_plugins.functions.function.Function)
:return: Bool
"""
complete = False
if self._completed_funcs is None:
return complete
if func.addr in self._completed_funcs:
complete = True
return complete
def _post_analysis(self):
"""
Post Analysis of Propagation().
We add the current propagation replacements result to the kb if the
function has already been completed in cfg creation.
"""
# Filter replacements and remove all TOP values
if self.model.replacements is not None:
for codeloc in list(self.model.replacements.keys()):
filtered_rep = {}
for k, v in self.model.replacements[codeloc].items():
if isinstance(v, claripy.ast.Base):
# claripy expressions
if not PropagatorState.is_top(v):
filtered_rep[k] = v
else:
# AIL expressions
if not PropagatorAILState.is_top(v):
filtered_rep[k] = v
self.model.replacements[codeloc] = filtered_rep
if self._cache_results:
self.kb.propagations.update(self.prop_key, self.model)
def _analyze(self):
"""
The main analysis for Propagator. Overwritten to include an optimization to stop
analysis if we have already analyzed the entire function once.
"""
self._pre_analysis()
# normal analysis execution
if self._graph_visitor is None:
# There is no base graph that we can rely on. The analysis itself should generate successors for the
# current job.
# An example is the CFG recovery.
self._analysis_core_baremetal()
else:
# We have a base graph to follow. Just handle the current job.
self._analysis_core_graph()
self._post_analysis()
[docs] @staticmethod
def merge_replacements(replacements_0, replacements_1) -> bool:
"""
The replacement merging logic is special: replacements_1 is the newer replacement result and replacement_0 is
the older result waiting to be updated. When both replacements_1 and replacement_0 have a non-top value for the
same variable and code location, we will update the slot in replacement_0 with the value from replacement_1.
:return: Whether merging has happened or not.
"""
merge_occurred = False
for loc, vars_ in replacements_1.items():
if loc not in replacements_0:
replacements_0[loc] = vars_.copy()
merge_occurred = True
else:
for var, repl in vars_.items():
if var not in replacements_0[loc]:
replacements_0[loc][var] = repl
merge_occurred = True
else:
if PropagatorState.is_top(repl) or PropagatorState.is_top(replacements_0[loc][var]):
t = PropagatorState.top(repl.bits if isinstance(repl, ailment.Expression) else repl.size())
replacements_0[loc][var] = t
merge_occurred = True
elif (
isinstance(replacements_0[loc][var], claripy.ast.Base) or isinstance(repl, claripy.ast.Base)
) and replacements_0[loc][var] is not repl:
replacements_0[loc][var] = repl
merge_occurred = True
elif (
not isinstance(replacements_0[loc][var], claripy.ast.Base)
and not isinstance(repl, claripy.ast.Base)
and replacements_0[loc][var] != repl
):
replacements_0[loc][var] = repl
merge_occurred = True
return merge_occurred
register_analysis(PropagatorAnalysis, "Propagator")