# Source code for angr.analyses.init_finder

from collections import defaultdict

from cle.loader import MetaELF
from cle.backends import Section, Segment
import pyvex
import claripy

from angr.analyses import visitors, ForwardAnalysis
from ..engines.light import SimEngineLight, SimEngineLightVEXMixin
from . import register_analysis, PropagatorAnalysis
from .analysis import Analysis
from .propagator.vex_vars import VEXTmp


class SimEngineInitFinderVEX(
    SimEngineLightVEXMixin,
    SimEngineLight,
):
    """
    The VEX engine class for InitFinder.

    For every Store/StoreG statement whose address and data are both known concrete values (according to the
    supplied replacements) and whose address falls into an uninitialized global region, the written value is
    recorded in the overlay.
    """

    def __init__(self, project, replacements, overlay, pointers_only=False):
        """
        :param project:         The project; its loader and architecture are consulted by this engine.
        :param replacements:    Mapping indexed first by block-level code location and then by VEXTmp, yielding
                                the known value of that tmp.
        :param overlay:         Object whose ``store(addr, data, endness=...)`` method receives discovered writes.
        :param pointers_only:   When True, only values accepted by ``_is_pointer`` are recorded.
        """
        super().__init__()
        self.project = project
        self.replacements = replacements
        self.overlay = overlay
        self.pointers_only = pointers_only

    #
    # Utils
    #

    @staticmethod
    def is_concrete(expr) -> bool:
        """
        Tell whether an expression is a concrete value.

        :param expr:    The value to test.
        :return:        True if ``expr`` is a Python int or a claripy AST whose op is "BVV", False otherwise.
        """
        if isinstance(expr, int):
            return True
        return isinstance(expr, claripy.ast.Base) and expr.op == "BVV"

    def _is_addr_uninitialized(self, addr):
        """
        Check whether an address is inside a global, uninitialized region of a loaded object.

        :param addr:    The address, either an int or a claripy AST (in which case its first argument is used).
        :return:        True if the address is considered uninitialized, False otherwise.
        """
        # is it writing to a global, uninitialized region?
        if isinstance(addr, claripy.ast.Base):
            addr = addr.args[0]

        obj = self.project.loader.find_object_containing(addr)
        if obj is not None:
            if not obj.has_memory:
                # Objects without memory are definitely uninitialized
                return True
            section: Section = obj.find_section_containing(addr)
            if section is not None:
                return section.name in {
                    ".bss",
                }

            if isinstance(obj, MetaELF):
                # for ELFs, if p_memsz >= p_filesz, the extra bytes are considered NOBITS
                # https://docs.oracle.com/cd/E19120-01/open.solaris/819-0690/gjpww/index.html
                segment: Segment = obj.find_segment_containing(addr)
                if segment is not None and segment.memsize > segment.filesize:
                    return segment.vaddr + segment.filesize <= addr < segment.vaddr + segment.memsize
        return False

    def _is_pointer(self, addr):
        """
        Check whether a value looks like a pointer into a loaded object.

        :param addr:    The value, either an int or a claripy AST (in which case its first argument is used).
        :return:        True if the value is an int greater than 0x400 that lies inside some loaded object.
        """
        if isinstance(addr, claripy.ast.Base):
            addr = addr.args[0]

        if isinstance(addr, int) and addr > 0x400:
            return self.project.loader.find_object_containing(addr) is not None
        return False

    def _record_store(self, stmt, blockloc):
        """
        Shared implementation of the Store and StoreG handlers: record the store in the overlay if both its
        address and its data are known concrete values and the address is in an uninitialized region.

        :param stmt:        The VEX store statement; its ``addr`` and ``data`` expressions are inspected.
        :param blockloc:    The block-level code location used to index ``self.replacements``.
        :return:            None
        """
        if not isinstance(stmt.addr, pyvex.IRExpr.RdTmp):
            return

        # The replacement table is only consulted once we know the address is a tmp.
        repl = self.replacements[blockloc]
        addr_tmp = VEXTmp(stmt.addr.tmp)
        if addr_tmp not in repl:
            return
        addr_v = repl[addr_tmp]
        if not (self.is_concrete(addr_v) and self._is_addr_uninitialized(addr_v)):
            return

        # do we know what it is writing?
        if not isinstance(stmt.data, pyvex.IRExpr.RdTmp):
            return
        data_v = self._expr(stmt.data)
        if not self.is_concrete(data_v):
            return

        if isinstance(data_v, int):
            # Wrap plain ints into a bitvector sized after the data tmp
            data_size = self.tyenv.sizeof(stmt.data.tmp)
            data_v = claripy.BVV(data_v, data_size)
        if not self.pointers_only or self._is_pointer(data_v):
            self.overlay.store(addr_v, data_v, endness=self.project.arch.memory_endness)

    #
    # Statement handlers
    #

    def _handle_function(self, *args, **kwargs):
        pass

    def _handle_WrTmp(self, stmt):
        # Don't do anything since constant propagation has already processed it
        return

    def _handle_Put(self, stmt):
        # Don't do anything since constant propagation has already processed it
        return

    def _handle_Store(self, stmt):
        """Record an unconditional store if its address and data are known (see ``_record_store``)."""
        blockloc = self._codeloc(block_only=True)
        self._record_store(stmt, blockloc)

    def _handle_StoreG(self, stmt):
        """
        Record a guarded store if its guard is known to be true and its address and data are known
        (see ``_record_store``).
        """
        blockloc = self._codeloc(block_only=True)
        repl = self.replacements[blockloc]

        if isinstance(stmt.guard, pyvex.IRExpr.RdTmp):
            # check if guard is true
            # NOTE(review): the identity test only accepts the Python singleton True; confirm that the
            # replacements store guard values in that form.
            guard_tmp = VEXTmp(stmt.guard.tmp)
            if guard_tmp not in repl or repl[guard_tmp] is not True:
                return

        self._record_store(stmt, blockloc)

    #
    # Expression handlers
    #

    def _handle_Get(self, expr):
        return None

    def _handle_Load(self, expr):
        return None

    def _handle_LoadG(self, stmt):
        return None

    def _handle_RdTmp(self, expr):
        """Return the known replacement value of a tmp in the current block, or None if there is none."""
        blockloc = self._codeloc(block_only=True)

        tmp = VEXTmp(expr.tmp)
        if tmp in self.replacements[blockloc]:
            return self.replacements[blockloc][tmp]
        return None
class InitializationFinder(ForwardAnalysis, Analysis):  # pylint:disable=abstract-method
    """
    Finds possible initializations for global data sections and generate an overlay to be used in other analyses
    later on.

    The analysis target is either a function (``func``, optionally with ``func_graph``) or a single ``block``.
    Discovered writes end up in ``self.overlay``.
    """

    def __init__(
        self,
        func=None,
        func_graph=None,
        block=None,
        max_iterations=1,
        replacements=None,
        overlay=None,
        pointers_only=False,
    ):
        """
        :param func:            The function to traverse. Mutually exclusive with ``block``.
        :param func_graph:      Graph handed to the function graph visitor and the propagator along with ``func``.
        :param block:           The single block to traverse. Mutually exclusive with ``func``.
        :param max_iterations:  Upper bound on how many times each node is processed.
        :param replacements:    Pre-computed replacements; when None, a PropagatorAnalysis is run to obtain them.
        :param overlay:         Memory object to store into; when None, the memory of a fresh blank state is used.
        :param pointers_only:   Forwarded to the VEX engine.
        :raises ValueError:     If both or neither of ``func`` and ``block`` are given.
        """
        self.pointers_only = pointers_only

        # Validate the analysis target up front.
        if func is not None and block is not None:
            raise ValueError('You cannot specify both "func" and "block".')
        if func is None and block is None:
            raise ValueError("Unsupported analysis target.")

        if func is not None:
            # traversing a function
            graph_visitor = visitors.FunctionGraphVisitor(func, func_graph)
            if replacements is None:
                run_propagator = self.project.analyses[PropagatorAnalysis].prep()
                replacements = run_propagator(
                    func=func, func_graph=func_graph, base_state=self.project.factory.blank_state()
                ).replacements
        else:
            # traversing a block
            graph_visitor = visitors.SingleNodeGraphVisitor(block)
            if replacements is None:
                run_propagator = self.project.analyses[PropagatorAnalysis].prep()
                replacements = run_propagator(
                    block=block, base_state=self.project.factory.blank_state()
                ).replacements

        ForwardAnalysis.__init__(
            self, order_jobs=True, allow_merging=True, allow_widening=False, graph_visitor=graph_visitor
        )

        self._function = func
        self._max_iterations = max_iterations
        self._replacements = replacements
        self._node_iterations = defaultdict(int)

        # Without a caller-supplied overlay, back it by the memory of a private blank state.
        if overlay is None:
            self.overlay_state = self.project.factory.blank_state()
            overlay = self.overlay_state.memory
        else:
            self.overlay_state = None
        self.overlay = overlay

        self._engine_vex = SimEngineInitFinderVEX(
            self.project, replacements, self.overlay, pointers_only=self.pointers_only
        )
        self._engine_ail = None

        self._analyze()

    #
    # Main analysis routines
    #

    def _pre_analysis(self):
        pass

    def _pre_job_handling(self, job):
        pass

    def _initial_abstract_state(self, node):
        return None

    def _merge_states(self, node, *states):
        return None

    def _run_on_node(self, node, state):
        """
        Lift the node into a VEX block, run the VEX engine over it, and count the visit.

        :param node:    The node to process; its ``addr`` and ``size`` are used to lift the block.
        :param state:   Unused.
        :return:        A tuple ``(flag, None)``; ``flag`` is True while the node has been processed fewer than
                        ``max_iterations`` times. (Presumably ForwardAnalysis reads the flag as "keep iterating"
                        -- confirm against ForwardAnalysis.)
        """
        vex_block = self.project.factory.block(node.addr, node.size, opt_level=1, cross_insn_opt=False)
        key = node.addr

        self._engine_vex.process(None, block=vex_block, fail_fast=self._fail_fast)

        visits = self._node_iterations[key] + 1
        self._node_iterations[key] = visits
        return visits < self._max_iterations, None

    def _intra_analysis(self):
        pass

    def _post_analysis(self):
        pass
# Register the analysis under its full name and under the shorter "InitFinder" name.
register_analysis(InitializationFinder, "InitializationFinder")
register_analysis(InitializationFinder, "InitFinder")