Source code for angr.analyses.propagator.engine_vex

from typing import TYPE_CHECKING
import logging

import claripy
import pyvex

from ...engines.light import SimEngineLightVEXMixin
from ...calling_conventions import DEFAULT_CC, SimRegArg
from .values import Top, Bottom
from .engine_base import SimEnginePropagatorBase
from .top_checker_mixin import TopCheckerMixin
from .vex_vars import VEXReg, VEXTmp, VEXMemVar

if TYPE_CHECKING:
    from angr.analyses.propagator.propagator import PropagatorVEXState


_l = logging.getLogger(name=__name__)


[docs]class SimEnginePropagatorVEX( TopCheckerMixin, SimEngineLightVEXMixin, SimEnginePropagatorBase, ): state: "PropagatorVEXState" # # Private methods # def _process(self, state, successors, block=None, whitelist=None, **kwargs): # pylint:disable=arguments-differ super()._process(state, successors, block=block, whitelist=whitelist, **kwargs) if self.block.vex.jumpkind == "Ijk_Call": if self.arch.call_pushes_ret: # pop ret from the stack sp_offset = self.arch.sp_offset sp_value = state.load_register(sp_offset, self.arch.bytes) if sp_value is not None: state.store_register(sp_offset, self.arch.bytes, sp_value + self.arch.bytes) return state def _allow_loading(self, addr, size): if type(addr) in (Top, Bottom): return False if self._load_callback is None: return True return self._load_callback(addr, size) def _expr(self, expr): v = super()._expr(expr) if v is not None and type(v) not in {Bottom, Top} and v is not expr: # Record the replacement if type(expr) is pyvex.IRExpr.Get: if expr.offset not in ( self.arch.sp_offset, self.arch.ip_offset, ): self.state.add_replacement( self._codeloc(block_only=False), VEXReg(expr.offset, expr.result_size(self.tyenv) // 8), v ) return v def _load_data(self, addr, size, endness): if isinstance(addr, claripy.ast.Base): sp_offset = self.extract_offset_to_sp(addr) if sp_offset is not None: # Local variable v = self.state.load_local_variable(sp_offset, size, endness) return v elif addr.op == "BVV": addr = addr.args[0] # Try loading from the state if self._allow_loading(addr, size): if self.base_state is not None: _l.debug("Loading %d bytes from %x.", size, addr) data = self.base_state.memory.load(addr, size, endness=endness) if not data.symbolic: return data else: try: val = self.project.loader.memory.unpack_word(addr, size=size, endness=endness) return claripy.BVV(val, size * self.arch.byte_width) except KeyError: return None return None # # Function handlers # def _handle_function(self, addr): if self.arch.name == "X86": if isinstance(addr, claripy.ast.Base) and addr.op == "BVV": try: b = self._project.loader.memory.load(addr.args[0], 4) except KeyError: return except TypeError: return if b == b"\x8b\x1c\x24\xc3": # getpc: # mov ebx, [esp] # ret ebx_offset = self.arch.registers["ebx"][0] self.state.store_register(ebx_offset, 4, claripy.BVV(self.block.addr + self.block.size, 32)) if self.arch.name in DEFAULT_CC: cc = DEFAULT_CC[self.arch.name] # don't instantiate the class for speed if isinstance(cc.RETURN_VAL, SimRegArg): offset, size = self.arch.registers[cc.RETURN_VAL.reg_name] self.state.store_register(offset, size, self.state.top(size * self.arch.byte_width)) if cc.CALLER_SAVED_REGS: for reg_name in cc.CALLER_SAVED_REGS: offset, size = self.arch.registers[reg_name] self.state.store_register(offset, size, self.state.top(size * self.arch.byte_width)) # # VEX statement handlers # def _handle_WrTmp(self, stmt): super()._handle_WrTmp(stmt) if stmt.tmp in self.tmps: self.state.add_replacement(self._codeloc(block_only=True), VEXTmp(stmt.tmp), self.tmps[stmt.tmp]) def _handle_Put(self, stmt): size = stmt.data.result_size(self.tyenv) // self.arch.byte_width data = self._expr(stmt.data) if not (data is None or self.state.is_top(data)) or self.state._store_tops: if data is None: # make sure it's a top data = self.state.top(size * self.arch.byte_width) self.state.store_register(stmt.offset, size, data) self.state.add_replacement(self._codeloc(block_only=False), VEXReg(stmt.offset, size), data) def _store_data(self, addr, data, size, endness): # pylint: disable=unused-argument,no-self-use if isinstance(addr, claripy.ast.Base): sp_offset = self.extract_offset_to_sp(addr) if sp_offset is not None: # Local variables self.state.store_local_variable(sp_offset, size, data, endness) elif addr.op == "BVV": # a memory address addr = addr.args[0] variable = VEXMemVar(addr, size) self.state.add_replacement(self._codeloc(block_only=False), variable, data) def _handle_Store(self, stmt): addr = self._expr(stmt.addr) if self.state.is_top(addr): return size = stmt.data.result_size(self.tyenv) // self.arch.byte_width data = self._expr(stmt.data) if not (data is None or self.state.is_top(data)) or self.state._store_tops: if data is None: # make sure it's a top data = self.state.top(size * self.arch.byte_width) self._store_data(addr, data, size, self.arch.memory_endness) def _handle_LoadG(self, stmt): guard = self._expr(stmt.guard) if guard is True: addr = self._expr(stmt.addr) if addr is not None: self.tmps[stmt.dst] = self._load_data( addr, stmt.alt.result_size(self.tyenv) // 8, self.arch.memory_endness ) elif guard is False: data = self._expr(stmt.alt) self.tmps[stmt.dst] = data else: self.tmps[stmt.dst] = None # add replacement if stmt.dst in self.tmps and self.tmps[stmt.dst]: self.state.add_replacement(self._codeloc(block_only=True), VEXTmp(stmt.dst), self.tmps[stmt.dst]) def _handle_StoreG(self, stmt): guard = self._expr(stmt.guard) data = self._expr(stmt.data) if guard is True: addr = self._expr(stmt.addr) if addr is not None: self._store_data(addr, data, stmt.data.result_size(self.tyenv) // 8, self.arch.memory_endness) # elif guard is False: # data = self._expr(stmt.alt) # self.tmps[stmt.dst] = data # else: # self.tmps[stmt.dst] = None def _handle_LLSC(self, stmt: pyvex.IRStmt.LLSC): if stmt.storedata is None: # load-link addr = self._expr(stmt.addr) size = self.tyenv.sizeof(stmt.result) // self.arch.byte_width data = self._load_data(addr, size, stmt.endness) if data is not None: self.tmps[stmt.result] = data if stmt.result in self.tmps: self.state.add_replacement(self._codeloc(block_only=True), VEXTmp(stmt.result), self.tmps[stmt.result]) else: # store-conditional storedata = self._expr(stmt.storedata) if storedata is not None: addr = self._expr(stmt.addr) size = storedata.size() // self.arch.byte_width self._store_data(addr, storedata, size, stmt.endness) self.tmps[stmt.result] = 1 self.state.add_replacement(self._codeloc(block_only=True), VEXTmp(stmt.result), self.tmps[stmt.result]) # # Expression handlers # def _handle_Get(self, expr): size = expr.result_size(self.tyenv) // self.arch.byte_width return self.state.load_register(expr.offset, size) def _handle_Load(self, expr): addr = self._expr(expr.addr) if addr is None or type(addr) in (Top, Bottom): return None size = expr.result_size(self.tyenv) // self.arch.byte_width return self._load_data(addr, size, expr.endness) def _handle_CCall(self, expr): return None def _handle_Binop(self, expr: pyvex.IRExpr.Binop): if not self.state.do_binops: return self.state.top(expr.result_size(self.tyenv)) r = super()._handle_Binop(expr) # print(expr.op, r) return r