Source code for angr.analyses.propagator.propagator

# pylint:disable=isinstance-second-argument-not-valid-type
from typing import Optional, Any, Tuple, Union, TYPE_CHECKING
import logging
import time

import claripy
import ailment
import pyvex

from angr.code_location import CodeLocation
from angr.analyses import ForwardAnalysis, visitors
from angr.knowledge_plugins.propagations.propagation_model import PropagationModel
from angr.knowledge_plugins.propagations.prop_value import PropValue, Detail
from angr.knowledge_plugins.propagations.states import PropagatorAILState, PropagatorVEXState, PropagatorState
from ... import sim_options
from .. import register_analysis
from ..analysis import Analysis
from .engine_vex import SimEnginePropagatorVEX
from .engine_ail import SimEnginePropagatorAIL

if TYPE_CHECKING:
    from angr.analyses.reaching_definitions.reaching_definitions import ReachingDefinitionsModel


_l = logging.getLogger(name=__name__)


[docs]class PropagatorAnalysis(ForwardAnalysis, Analysis): # pylint:disable=abstract-method """ PropagatorAnalysis implements copy propagation. It propagates values (either constant values or variables) and expressions inside a block or across a function. PropagatorAnalysis supports both VEX and AIL. The VEX propagator only performs constant propagation. The AIL propagator performs both constant propagation and copy propagation of depth-N expressions. PropagatorAnalysis performs certain arithmetic operations between constants, including but are not limited to: - addition - subtraction - multiplication - division - xor It also performs the following memory operations: - Loading values from a known address - Writing values to a stack variable """
[docs] def __init__( self, func=None, block=None, func_graph=None, base_state=None, max_iterations=3, load_callback=None, stack_pointer_tracker=None, only_consts=False, completed_funcs=None, do_binops=True, store_tops=True, vex_cross_insn_opt=False, func_addr: Optional[int] = None, gp: Optional[int] = None, cache_results: bool = False, key_prefix: Optional[str] = None, reaching_definitions: Optional["ReachingDefinitionsModel"] = None, profiling: bool = False, ): if block is None and func is not None: # only func is specified. traversing a function self.flavor = "function" elif block is not None: # traversing a block (but func might be specified at the same time to provide extra information, e.g., the # value for register t9 for MIPS32/64 binaries) self.flavor = "block" else: raise ValueError("Unsupported analysis target.") start = time.perf_counter_ns() / 1000000 self._base_state = base_state self._function = func self._func_addr = func_addr if func_addr is not None else (None if func is None else func.addr) self._block = block self._block_addr = block.addr if block is not None else None self._max_iterations = max_iterations self._load_callback = load_callback self._stack_pointer_tracker = stack_pointer_tracker # only used when analyzing AIL functions self._only_consts = only_consts self._completed_funcs = completed_funcs self._do_binops = do_binops self._store_tops = store_tops self._vex_cross_insn_opt = vex_cross_insn_opt self._gp = gp self._prop_key_prefix = key_prefix self._cache_results = cache_results self._reaching_definitions = reaching_definitions self._initial_codeloc: CodeLocation if self.flavor == "function": self._initial_codeloc = CodeLocation(self._func_addr, stmt_idx=0, ins_addr=self._func_addr) else: # flavor == "block" self._initial_codeloc = CodeLocation(self._block_addr, stmt_idx=0, ins_addr=self._block_addr) self.model: PropagationModel = None if self._cache_results: # Resume the analysis from the previously unfinished result self.model = self.kb.propagations.get(self.prop_key, None) if self.model is None: self.model = PropagationModel( self.prop_key, function=self._function, ) cache_used = False else: cache_used = True graph_visitor: Union[visitors.SingleNodeGraphVisitor, visitors.FunctionGraphVisitor] if self.flavor == "block": graph_visitor = None if self._cache_results: graph_visitor: Optional[visitors.SingleNodeGraphVisitor] = self.model.graph_visitor if graph_visitor is None: graph_visitor = visitors.SingleNodeGraphVisitor(block) elif self.flavor == "function": graph_visitor = None if self._cache_results: graph_visitor: Optional[visitors.FunctionGraphVisitor] = self.model.graph_visitor if graph_visitor is not None: # resume resumed = graph_visitor.resume_with_new_graph(func_graph if func_graph is not None else func.graph) if not resumed: # clean up... self.model = PropagationModel(self.prop_key, function=self._function) if graph_visitor is None: graph_visitor = visitors.FunctionGraphVisitor(func, func_graph) self.model.graph_visitor = graph_visitor else: raise TypeError(f"Unsupported flavor {self.flavor}") ForwardAnalysis.__init__( self, order_jobs=True, allow_merging=True, allow_widening=False, graph_visitor=graph_visitor ) self._engine_vex = SimEnginePropagatorVEX( project=self.project, arch=self.project.arch, reaching_definitions=self._reaching_definitions ) self._engine_ail = SimEnginePropagatorAIL( arch=self.project.arch, stack_pointer_tracker=self._stack_pointer_tracker, # We only propagate tmps within the same block. This is because the lifetime of tmps is one block only. propagate_tmps=block is not None, reaching_definitions=self._reaching_definitions, ) # optimization: skip state copying for the initial state self._initial_state = None # performance counters self._analyzed_states: int = 0 self._analyzed_statements: int = 0 self._analyze() if self._cache_results: # update the cache self.kb.propagations.update(self.prop_key, self.model) if profiling: elapsed = time.perf_counter_ns() / 1000000 - start if self.flavor == "function": _l.warning("%r:", self._function) else: _l.warning("%r:", self._block) _l.warning(" Time elapsed: %s milliseconds", elapsed) _l.warning(" Cache used: %s", cache_used) _l.warning(" Analyzed states: %d", self._analyzed_states) _l.warning(" Analyzed statements: %d", self._analyzed_statements)
@property def prop_key(self) -> Tuple[Optional[str], str, int, bool, bool, bool]: """ Gets a key that represents the function and the "flavor" of the propagation result. """ addr = self._func_addr if self._func_addr is not None else self._block_addr return self._prop_key_prefix, self.flavor, addr, self._do_binops, self._only_consts, self._vex_cross_insn_opt @property def replacements(self): return self.model.replacements @replacements.setter def replacements(self, v): self.model.replacements = v # # Main analysis routines # def _node_key(self, node: Union[ailment.Block, pyvex.IRSB]) -> Any: if type(node) is ailment.Block: return node.addr, node.idx elif type(node) is pyvex.IRSB: return node.addr # fallback return node def _pre_analysis(self): pass def _pre_job_handling(self, job): pass def _initial_abstract_state(self, node): cls = PropagatorAILState if isinstance(node, ailment.Block) else PropagatorVEXState self._initial_state = cls.initial_state( self.project, only_consts=self._only_consts, gp=self._gp, do_binops=self._do_binops, store_tops=self._store_tops, func_addr=self._func_addr, max_prop_expr_occurrence=1 if self.flavor == "function" else 0, initial_codeloc=self._initial_codeloc, ) return self._initial_state def _merge_states(self, node, *states: PropagatorState): merged_state, merge_occurred = states[0].merge(*states[1:]) return merged_state, not merge_occurred def _run_on_node(self, node, state): self._analyzed_states += 1 if isinstance(node, ailment.Block): block = node block_key = (node.addr, node.idx) engine = self._engine_ail else: block = self.project.factory.block( node.addr, node.size, opt_level=1, cross_insn_opt=self._vex_cross_insn_opt ) block_key = node.addr engine = self._engine_vex if block.size == 0: # maybe the block is not decodeable return False, state if state is not self._initial_state: # make a copy of the state if it's not the initial state state = state.copy() state._equivalence.clear() else: # clear self._initial_state so that we *do not* run this optimization again! self._initial_state = None # Suppress spurious output if self._base_state is not None: self._base_state.options.add(sim_options.SYMBOL_FILL_UNCONSTRAINED_REGISTERS) self._base_state.options.add(sim_options.SYMBOL_FILL_UNCONSTRAINED_MEMORY) state = engine.process( state, block=block, project=self.project, base_state=self._base_state, load_callback=self._load_callback, fail_fast=self._fail_fast, ) state.filter_replacements() self.model.node_iterations[block_key] += 1 self.model.states[block_key] = state if isinstance(state, PropagatorAILState): self.model.block_initial_reg_values.update(state.block_initial_reg_values) if self.model.replacements is None: self.model.replacements = state._replacements else: PropagatorState.merge_replacements(self.model.replacements, state._replacements) self.model.equivalence |= state._equivalence # TODO: Clear registers according to calling conventions if self.model.node_iterations[block_key] < self._max_iterations: return True, state else: return False, state def _process_input_state_for_successor( self, node, successor, input_state: Union[PropagatorAILState, PropagatorVEXState] ): if self._only_consts and isinstance(input_state, PropagatorAILState): key = node.addr, successor.addr if key in self.model.block_initial_reg_values: input_state: PropagatorAILState = input_state.copy() for reg_atom, reg_value in self.model.block_initial_reg_values[key]: input_state.store_register( reg_atom, PropValue( claripy.BVV(reg_value.value, reg_value.bits), offset_and_details={0: Detail(reg_atom.size, reg_value, self._initial_codeloc)}, ), ) return input_state return input_state def _intra_analysis(self): pass def _check_func_complete(self, func): """ Checks if a function is completely created by the CFG. Completed functions are passed to the Propagator at initialization. Defaults to being empty if no pass is initiated. :param func: Function to check (knowledge_plugins.functions.function.Function) :return: Bool """ complete = False if self._completed_funcs is None: return complete if func.addr in self._completed_funcs: complete = True return complete def _post_analysis(self): """ Post Analysis of Propagation(). We add the current propagation replacements result to the kb if the function has already been completed in cfg creation. """ # Filter replacements and remove all TOP values if self.model.replacements is not None: for codeloc in list(self.model.replacements.keys()): filtered_rep = {} for k, v in self.model.replacements[codeloc].items(): if isinstance(v, claripy.ast.Base): # claripy expressions if not PropagatorState.is_top(v): filtered_rep[k] = v else: # AIL expressions if not PropagatorAILState.is_top(v): filtered_rep[k] = v self.model.replacements[codeloc] = filtered_rep if self._cache_results: self.kb.propagations.update(self.prop_key, self.model) def _analyze(self): """ The main analysis for Propagator. Overwritten to include an optimization to stop analysis if we have already analyzed the entire function once. """ self._pre_analysis() # normal analysis execution if self._graph_visitor is None: # There is no base graph that we can rely on. The analysis itself should generate successors for the # current job. # An example is the CFG recovery. self._analysis_core_baremetal() else: # We have a base graph to follow. Just handle the current job. self._analysis_core_graph() self._post_analysis()
register_analysis(PropagatorAnalysis, "Propagator")