Source code for angr.analyses.propagator.engine_vex

from typing import TYPE_CHECKING
import logging

import claripy
import pyvex
import archinfo

from angr.knowledge_plugins.propagations.states import RegisterAnnotation, RegisterComparisonAnnotation
from ...engines.light import SimEngineLightVEXMixin
from ...calling_conventions import DEFAULT_CC, SYSCALL_CC, default_cc, SimRegArg
from .values import Top, Bottom
from .engine_base import SimEnginePropagatorBase
from .top_checker_mixin import TopCheckerMixin
from .vex_vars import VEXReg, VEXTmp, VEXMemVar

if TYPE_CHECKING:
    from angr.analyses.propagator.propagator import PropagatorVEXState


_l = logging.getLogger(name=__name__)


class SimEnginePropagatorVEX(
    TopCheckerMixin,
    SimEngineLightVEXMixin,
    SimEnginePropagatorBase,
):
    """
    Propagator engine that processes VEX IR statements and expressions.

    While a block is processed, register, temporary and memory replacements are recorded on ``self.state`` via
    ``add_replacement``; register and stack-variable values are read from and written to ``self.state``.
    """

    state: "PropagatorVEXState"

    #
    # Private methods
    #

    def _process_block_end(self):
        """
        Finish processing the current block.

        For blocks ending in a call on architectures where the call pushes the return address, the stack pointer is
        moved back by one word. For calls and syscalls, the effects of returning from the callee are then applied.
        """
        super()._process_block_end()

        jumpkind = self.block.vex.jumpkind
        if jumpkind == "Ijk_Call" and self.arch.call_pushes_ret:
            # pop ret from the stack
            sp_offset = self.arch.sp_offset
            sp_value = self.state.load_register(sp_offset, self.arch.bytes)
            if sp_value is not None:
                self.state.store_register(sp_offset, self.arch.bytes, sp_value + self.arch.bytes)

        if jumpkind == "Ijk_Call" or jumpkind.startswith("Ijk_Sys"):
            self._handle_return_from_call()

    def _allow_loading(self, addr, size):
        """
        Decide whether memory at ``addr`` may be loaded.

        :param addr:    The address to load from.
        :param size:    Number of bytes to load.
        :return:        False if ``addr`` is a Top or Bottom value; True if no load callback is set; otherwise the
                        result of the load callback.
        """
        if type(addr) in (Top, Bottom):
            return False
        if self._load_callback is None:
            return True
        return self._load_callback(addr, size)

    def _expr(self, expr):
        """
        Evaluate a VEX expression and record a register replacement where applicable.

        A replacement is recorded only for ``Get`` expressions that evaluate to a value that is not None, Top or
        Bottom, and whose register is neither the stack pointer nor the instruction pointer.

        :param expr:    The VEX expression to evaluate.
        :return:        The value produced by the parent engine.
        """
        v = super()._expr(expr)

        if v is None or type(v) in {Bottom, Top} or v is expr:
            return v

        # Record the replacement
        if type(expr) is pyvex.IRExpr.Get and expr.offset not in (self.arch.sp_offset, self.arch.ip_offset):
            # use the architecture's byte width, like the other handlers in this class
            size = expr.result_size(self.tyenv) // self.arch.byte_width
            self.state.add_replacement(self._codeloc(block_only=False), VEXReg(expr.offset, size), v)
        return v

    def _load_data(self, addr, size, endness):
        """
        Load ``size`` bytes from ``addr``.

        Stack-pointer-relative addresses are read as local variables from the state. Concrete (BVV) addresses are
        read from ``self.base_state`` when one is available, otherwise from the loader memory.

        :param addr:    The address expression.
        :param size:    Number of bytes to load.
        :param endness: Endianness of the load.
        :return:        The loaded value, or None if nothing could be loaded.
        """
        if not isinstance(addr, claripy.ast.Base):
            return None

        sp_offset = self.extract_offset_to_sp(addr)
        if sp_offset is not None:
            # Local variable
            return self.state.load_local_variable(sp_offset, size, endness)

        if addr.op != "BVV":
            return None

        concrete_addr = addr.args[0]
        if not self._allow_loading(concrete_addr, size):
            return None

        # Try loading from the state
        if self.base_state is not None:
            _l.debug("Loading %d bytes from %x.", size, concrete_addr)
            data = self.base_state.memory.load(concrete_addr, size, endness=endness)
            # symbolic data is not propagated
            return None if data.symbolic else data

        try:
            val = self.project.loader.memory.unpack_word(concrete_addr, size=size, endness=endness)
        except KeyError:
            return None
        return claripy.BVV(val, size * self.arch.byte_width)

    #
    # Function handlers
    #

    def _handle_function(self, addr):
        """
        Handle a call to the function at ``addr``.

        Only the x86 ``getpc`` thunk (``mov ebx, [esp]; ret``) is recognized: ebx is set to the address following the
        current block.

        :param addr:    The call target.
        """
        if self.arch.name != "X86":
            return
        if not (isinstance(addr, claripy.ast.Base) and addr.op == "BVV"):
            return

        try:
            b = self._project.loader.memory.load(addr.args[0], 4)
        except (KeyError, TypeError):
            return

        if b == b"\x8b\x1c\x24\xc3":
            # getpc:
            #   mov ebx, [esp]
            #   ret
            ebx_offset = self.arch.registers["ebx"][0]
            self.state.store_register(ebx_offset, 4, claripy.BVV(self.block.addr + self.block.size, 32))

    def _handle_return_from_call(self):
        """
        Apply the effects of returning from a call or syscall.

        Using the default calling convention for the architecture, the return-value register (when it is a register
        argument) and all caller-saved registers are overwritten with Top values.
        """
        # FIXME: Handle the specific function calling convention when known
        syscall = self.block.vex.jumpkind.startswith("Ijk_Sys")
        cc_map = SYSCALL_CC if syscall else DEFAULT_CC
        if self.arch.name not in cc_map:
            return

        cc = default_cc(
            self.arch.name,
            platform=self.project.simos.name if self.project.simos is not None else None,
            syscall=syscall,
        )  # don't instantiate the class for speed

        if isinstance(cc.RETURN_VAL, SimRegArg):
            offset, size = self.arch.registers[cc.RETURN_VAL.reg_name]
            self.state.store_register(offset, size, self.state.top(size * self.arch.byte_width))

        if cc.CALLER_SAVED_REGS:
            for reg_name in cc.CALLER_SAVED_REGS:
                offset, size = self.arch.registers[reg_name]
                self.state.store_register(offset, size, self.state.top(size * self.arch.byte_width))

    #
    # VEX statement handlers
    #

    def _handle_Dirty(self, stmt):
        """
        Handle a Dirty statement.

        On RISC-V, Dirty statements are skipped; on every other architecture a warning is logged.

        :param stmt:    The Dirty statement.
        """
        # For RISCV CSR and mret operations, the Dirty statement is skipped.
        # NOTE(review): the ``else`` is paired with the architecture check, which matches the warning text; confirm
        # against upstream that it was not meant to pair with the helper-name check.
        if archinfo.arch_riscv64.is_riscv_arch(self.project.arch):
            helper = str(stmt.cee)
            if helper in (
                "riscv_dirtyhelper_CSR_rw",
                "riscv_dirtyhelper_CSR_s",
                "riscv_dirtyhelper_CSR_c",
                "riscv_dirtyhelper_mret",
            ):
                pass
        else:
            self.l.warning("Unimplemented Dirty node for current architecture.")

    def _handle_WrTmp(self, stmt):
        """
        Handle a WrTmp statement and record a replacement for the written temporary, if it was set.

        :param stmt:    The WrTmp statement.
        """
        super()._handle_WrTmp(stmt)

        if stmt.tmp in self.tmps:
            self.state.add_replacement(self._codeloc(block_only=True), VEXTmp(stmt.tmp), self.tmps[stmt.tmp])

    def _handle_Put(self, stmt):
        """
        Handle a register write.

        The value is stored and a replacement recorded when it is known (neither None nor Top), or unconditionally
        when the state is configured to store Top values.

        :param stmt:    The Put statement.
        """
        size = stmt.data.result_size(self.tyenv) // self.arch.byte_width
        data = self._expr(stmt.data)

        if not (data is None or self.state.is_top(data)) or self.state._store_tops:
            if data is None:
                # make sure it's a top
                data = self.state.top(size * self.arch.byte_width)
            self.state.store_register(stmt.offset, size, data)
            self.state.add_replacement(self._codeloc(block_only=False), VEXReg(stmt.offset, size), data)

    def _handle_PutI(self, stmt):
        """
        Handle an indexed register write. Only the data expression is evaluated; nothing is stored.

        :param stmt:    The PutI statement.
        """
        self._expr(stmt.data)

    def _store_data(self, addr, data, size, endness):  # pylint: disable=unused-argument,no-self-use
        """
        Store ``data`` at ``addr``.

        Stack-pointer-relative addresses are stored as local variables in the state. For concrete (BVV) addresses,
        only a memory-variable replacement is recorded.

        :param addr:    The address expression.
        :param data:    The value to store.
        :param size:    Number of bytes to store.
        :param endness: Endianness of the store.
        """
        if not isinstance(addr, claripy.ast.Base):
            return

        sp_offset = self.extract_offset_to_sp(addr)
        if sp_offset is not None:
            # Local variables
            self.state.store_local_variable(sp_offset, size, data, endness)
        elif addr.op == "BVV":
            # a memory address
            variable = VEXMemVar(addr.args[0], size)
            self.state.add_replacement(self._codeloc(block_only=False), variable, data)

    def _handle_Store(self, stmt):
        """
        Handle a memory store.

        Nothing is stored when the address is Top. Otherwise the same known-value / store-tops rule as for register
        writes applies.

        :param stmt:    The Store statement.
        """
        addr = self._expr(stmt.addr)
        if self.state.is_top(addr):
            return

        size = stmt.data.result_size(self.tyenv) // self.arch.byte_width
        data = self._expr(stmt.data)

        if not (data is None or self.state.is_top(data)) or self.state._store_tops:
            if data is None:
                # make sure it's a top
                data = self.state.top(size * self.arch.byte_width)
            self._store_data(addr, data, size, self.arch.memory_endness)

    def _handle_LoadG(self, stmt):
        """
        Handle a guarded load.

        When the guard is literally True the load is performed; when it is literally False the alternative value is
        used; otherwise the destination temporary is set to None.

        :param stmt:    The LoadG statement.
        """
        guard = self._expr(stmt.guard)
        if guard is True:
            addr = self._expr(stmt.addr)
            if addr is not None:
                size = stmt.alt.result_size(self.tyenv) // self.arch.byte_width
                self.tmps[stmt.dst] = self._load_data(addr, size, self.arch.memory_endness)
        elif guard is False:
            self.tmps[stmt.dst] = self._expr(stmt.alt)
        else:
            self.tmps[stmt.dst] = None

        # add replacement
        # Compare against None explicitly: claripy ASTs cannot be tested for truthiness.
        if stmt.dst in self.tmps and self.tmps[stmt.dst] is not None:
            self.state.add_replacement(self._codeloc(block_only=True), VEXTmp(stmt.dst), self.tmps[stmt.dst])

    def _handle_StoreG(self, stmt):
        """
        Handle a guarded store.

        The guard and data expressions are always evaluated; the store is only performed when the guard is literally
        True and the address evaluates to a value.

        :param stmt:    The StoreG statement.
        """
        guard = self._expr(stmt.guard)
        data = self._expr(stmt.data)
        if guard is True:
            addr = self._expr(stmt.addr)
            if addr is not None:
                size = stmt.data.result_size(self.tyenv) // self.arch.byte_width
                self._store_data(addr, data, size, self.arch.memory_endness)

    def _handle_LLSC(self, stmt: pyvex.IRStmt.LLSC):
        """
        Handle a load-linked / store-conditional statement.

        A statement without store data is a load-link; otherwise it is a store-conditional, whose result temporary is
        always set to 1.

        :param stmt:    The LLSC statement.
        """
        if stmt.storedata is None:
            # load-link
            addr = self._expr(stmt.addr)
            size = self.tyenv.sizeof(stmt.result) // self.arch.byte_width
            data = self._load_data(addr, size, stmt.endness)
            if data is not None:
                self.tmps[stmt.result] = data
            if stmt.result in self.tmps:
                self.state.add_replacement(self._codeloc(block_only=True), VEXTmp(stmt.result), self.tmps[stmt.result])
        else:
            # store-conditional
            storedata = self._expr(stmt.storedata)
            if storedata is not None:
                addr = self._expr(stmt.addr)
                size = storedata.size() // self.arch.byte_width
                self._store_data(addr, storedata, size, stmt.endness)

            self.tmps[stmt.result] = 1
            self.state.add_replacement(self._codeloc(block_only=True), VEXTmp(stmt.result), self.tmps[stmt.result])

    def _handle_CmpEQ(self, expr):
        """
        Handle an equality comparison.

        When the first operand carries exactly one annotation that is a ``RegisterAnnotation`` and the second operand
        is concrete, a Top value annotated with a ``RegisterComparisonAnnotation`` is returned. Otherwise the parent
        handler is used.

        :param expr:    The comparison expression.
        :return:        The comparison result.
        """
        arg0, arg1 = self._expr(expr.args[0]), self._expr(expr.args[1])
        if arg1 is not None and arg1.concrete and arg0 is not None and len(arg0.annotations) == 1:
            anno = arg0.annotations[0]
            if isinstance(anno, RegisterAnnotation):
                cmp_anno = RegisterComparisonAnnotation(anno.offset, anno.size, "eq", arg1.concrete_value)
                bits = expr.result_size(self.tyenv)
                return self.state.top(bits).annotate(cmp_anno)
        return super()._handle_CmpEQ(expr)

    #
    # Expression handlers
    #

    def _handle_Get(self, expr):
        """
        Read a register from the state.

        :param expr:    The Get expression.
        :return:        The value returned by ``self.state.load_register``.
        """
        size = expr.result_size(self.tyenv) // self.arch.byte_width
        return self.state.load_register(expr.offset, size)

    def _handle_GetI(self, expr):
        """
        Handle an indexed register read; always yields a Top value of the expression's size.

        :param expr:    The GetI expression.
        :return:        A Top value.
        """
        return self.state.top(expr.result_size(self.tyenv))

    def _handle_Load(self, expr):
        """
        Handle a memory load.

        :param expr:    The Load expression.
        :return:        The loaded value, or None if the address is None, Top or Bottom, or nothing could be loaded.
        """
        addr = self._expr(expr.addr)
        if addr is None or type(addr) in (Top, Bottom):
            return None
        size = expr.result_size(self.tyenv) // self.arch.byte_width
        return self._load_data(addr, size, expr.endness)

    def _handle_CCall(self, expr):
        """
        Handle a clean helper call; no value is produced.

        :param expr:    The CCall expression.
        :return:        None.
        """
        return None

    def _handle_Binop(self, expr: pyvex.IRExpr.Binop):
        """
        Handle a binary operation.

        :param expr:    The Binop expression.
        :return:        A Top value when binary operations are disabled on the state, otherwise the parent handler's
                        result.
        """
        if not self.state.do_binops:
            return self.state.top(expr.result_size(self.tyenv))

        return super()._handle_Binop(expr)

    def _handle_Triop(self, expr: pyvex.IRExpr.Triop):
        """
        Handle a ternary operation.

        :param expr:    The Triop expression.
        :return:        A Top value when binary operations are disabled on the state, otherwise the parent handler's
                        result.
        """
        if not self.state.do_binops:
            return self.state.top(expr.result_size(self.tyenv))

        return super()._handle_Triop(expr)

    def _handle_Conversion(self, expr):
        """
        Handle a size conversion.

        Unknown operands yield Top; Top operands yield a Top that keeps the operand's annotations; concrete (BVV)
        operands are truncated or zero-extended to the target size. Everything else yields Top.

        :param expr:    The conversion expression.
        :return:        The converted value.
        """
        expr_ = self._expr(expr.args[0])
        to_size = expr.result_size(self.tyenv)
        if expr_ is None:
            return self._top(to_size)
        if self._is_top(expr_):
            return self._top(to_size).annotate(*expr_.annotations)

        if isinstance(expr_, claripy.ast.Base) and expr_.op == "BVV":
            if expr_.size() > to_size:
                # truncation
                return expr_[to_size - 1 : 0]
            if expr_.size() < to_size:
                # extension
                # NOTE(review): always a zero-extension; confirm signed widening is not routed to this handler.
                return claripy.ZeroExt(to_size - expr_.size(), expr_)
            return expr_

        return self._top(to_size)

    def _handle_Exit(self, stmt):
        """
        Handle a conditional exit.

        When the guard carries exactly one annotation that is an "eq" ``RegisterComparisonAnnotation`` and the exit
        target is concrete, the ``(offset, size, value)`` triple is recorded in
        ``self.state.block_initial_reg_values`` under the key ``(current block address, target address)``.

        :param stmt:    The Exit statement.
        """
        guard = self._expr(stmt.guard)
        if guard is not None and len(guard.annotations) == 1:
            dst = self._expr(stmt.dst)
            if dst is not None and dst.concrete:
                anno = guard.annotations[0]
                if isinstance(anno, RegisterComparisonAnnotation) and anno.cmp_op == "eq":
                    v = (anno.offset, anno.size, anno.value)
                    known_values = self.state.block_initial_reg_values[self.block.addr, dst.concrete_value]
                    if v not in known_values:
                        known_values.append(v)

        super()._handle_Exit(stmt)

    # CmpF is handled exactly like CmpEQ
    _handle_CmpF = _handle_CmpEQ