# Source code for angr.engines.pcode.emulate

import logging
from typing import Union

from pypcode import OpCode, Varnode, PcodeOp, Translation
import claripy
from claripy.ast.bv import BV

from ..engine import SimEngineBase
from ...utils.constants import DEFAULT_STATEMENT
from .lifter import IRSB
from .behavior import OpBehavior
from ...errors import AngrError
from ...state_plugins.inspect import BP_BEFORE, BP_AFTER

l = logging.getLogger(__name__)

class PcodeEmulatorMixin(SimEngineBase):
    """
    Mixin for p-code execution.

    Walks the machine instructions of an IRSB, executes each p-code op against
    ``self.state`` and records control-flow exits on ``self.successors``.
    """

    # Machine instruction, p-code op and op behavior currently being executed.
    # All three are None whenever no op is in flight.
    _current_ins: Union[Translation, None]
    _current_op: Union[PcodeOp, None]
    _current_behavior: Union[OpBehavior, None]

    def __init__(self, *args, **kwargs):
        """
        Initialize the per-op bookkeeping and the dispatch table for special ops.

        All arguments are forwarded unchanged to the parent initializer.
        """
        super().__init__(*args, **kwargs)
        self._current_ins = None
        self._current_op = None
        self._current_behavior = None
        # Ops flagged ``is_special`` by their behavior are dispatched through
        # this table instead of the generic unary/binary evaluators.
        self._special_op_handlers = {
            OpCode.LOAD: self._execute_load,
            OpCode.STORE: self._execute_store,
            OpCode.BRANCH: self._execute_branch,
            OpCode.CBRANCH: self._execute_cbranch,
            OpCode.BRANCHIND: self._execute_branchind,
            OpCode.CALL: self._execute_call,
            OpCode.CALLIND: self._execute_callind,
            OpCode.CALLOTHER: self._execute_callother,
            OpCode.RETURN: self._execute_ret,
            OpCode.MULTIEQUAL: self._execute_multiequal,
            OpCode.INDIRECT: self._execute_indirect,
            OpCode.SEGMENTOP: self._execute_segment_op,
            OpCode.CPOOLREF: self._execute_cpool_ref,
            OpCode.NEW: self._execute_new,
        }

    def handle_pcode_block(self, irsb: IRSB) -> None:
        """
        Execute a single IRSB.

        Every instruction of the block is executed in order. If no op marked the
        exit as handled, a default ``Ijk_Boring`` successor to the address after
        the last executed instruction is added.

        :param irsb: Block to be executed.
        """
        self.irsb = irsb
        # Hack on a handler here to track whether exit has been handled or not
        # FIXME: Vex models this as a known exit statement, which we should also
        #        do here. For now, handle it this way.
        self.state.scratch.exit_handled = False

        fallthru_addr = None
        for i, ins in enumerate(irsb._instructions):
            l.debug(
                "Executing machine instruction @ %#x (%d of %d)", ins.address.offset, i + 1, len(irsb._instructions)
            )

            # Execute a single instruction of the emulated machine
            self._current_ins = ins
            self.state.scratch.ins_addr = self._current_ins.address.offset

            # FIXME: Hacking this on here but ideally should use "scratch".
            self._pcode_tmps = {}  # FIXME: Consider alignment requirements

            self.state._inspect("instruction", BP_BEFORE, instruction=self._current_ins.address.offset)

            # A pending statement offset (set by a p-code relative branch) makes
            # execution resume part-way through the instruction's op list; it is
            # consumed here so later instructions start at their first op.
            offset = self.state.scratch.statement_offset
            self.state.scratch.statement_offset = 0
            for op in self._current_ins.ops[offset:]:
                self._current_op = op
                self._current_behavior = irsb.behaviors.get_behavior_for_opcode(self._current_op.opcode)
                l.debug("Executing p-code op: %s", self._current_op)
                self._execute_current_op()

            self.state._inspect("instruction", BP_AFTER)

            self._current_op = None
            self._current_behavior = None
            fallthru_addr = ins.address.offset + ins.length

        if not self.state.scratch.exit_handled:
            self.successors.add_successor(
                self.state,
                fallthru_addr,
                self.state.scratch.guard,
                "Ijk_Boring",
                exit_stmt_idx=DEFAULT_STATEMENT,
                exit_ins_addr=self.state.scratch.ins_addr,
            )

    def _execute_current_op(self) -> None:
        """
        Execute the current p-code operation.

        Special ops go through ``_special_op_handlers``; everything else is
        evaluated as a unary or binary op according to its behavior.
        """
        assert self._current_behavior is not None

        if self._current_behavior.is_special:
            self._special_op_handlers[self._current_behavior.opcode]()
        elif self._current_behavior.is_unary:
            self._execute_unary()
        else:
            self._execute_binary()

    def _map_register_name(self, varnode: Varnode) -> int:
        """
        Map SLEIGH register offset to ArchInfo register offset based on name.

        If the architecture does not know the register name, the varnode offset
        plus ``0x100000`` is used instead.

        :param varnode: Varnode to translate.
        :return: Register file offset.
        """
        # FIXME: Will need performance optimization
        # FIXME: Should not get trans object this way. There should be a faster mapping method than going through trans
        reg_name = varnode.get_register_name()
        try:
            reg_offset = self.state.project.arch.get_register_offset(reg_name.lower())
            l.debug("Mapped register '%s' to offset %x", reg_name, reg_offset)
        except ValueError:
            reg_offset = varnode.offset + 0x100000
            l.debug("Could not map register '%s' from archinfo. Mapping to %x", reg_name, reg_offset)
        return reg_offset

    @staticmethod
    def _adjust_value_size(num_bits: int, v_in: BV) -> BV:
        """
        Ensure given bv is num_bits bits long by either zero extending or truncating.

        :param num_bits: Desired width in bits.
        :param v_in: Value to resize.
        :return: ``v_in`` itself when it already has the desired width, otherwise
                 its low ``num_bits`` bits or a zero-extended copy.
        """
        if v_in.size() > num_bits:
            v_out = v_in[num_bits - 1 : 0]
            l.debug("Truncating value %s (%d bits) to %s (%d bits)", v_in, v_in.size(), v_out, num_bits)
            return v_out
        elif v_in.size() < num_bits:
            v_out = v_in.zero_extend(num_bits - v_in.size())
            l.debug("Extending value %s (%d bits) to %s (%d bits)", v_in, v_in.size(), v_out, num_bits)
            return v_out
        else:
            return v_in

    def _set_value(self, varnode: Varnode, value: BV) -> None:
        """
        Store a value for a given varnode.

        This method stores to the appropriate register, or unique space,
        depending on the space indicated by the varnode.

        :param varnode: Varnode to store into.
        :param value: Value to store. It is truncated or zero-extended to the
                      varnode's size first.
        :raises AngrError: If the varnode lives in an unhandled address space.
        """
        space_name = varnode.space.name

        # FIXME: Consider moving into behavior.py
        value = self._adjust_value_size(varnode.size * 8, value)
        assert varnode.size * 8 == value.size()

        l.debug("Storing %s %x %s %d", space_name, varnode.offset, value, varnode.size)
        if space_name == "register":
            self.state.registers.store(
                self._map_register_name(varnode), value, size=varnode.size, endness=self.project.arch.register_endness
            )

        elif space_name == "unique":
            self._pcode_tmps[varnode.offset] = value

        elif space_name in ("ram", "mem"):
            l.debug("Storing %s to offset %s", value, varnode.offset)
            self.state.memory.store(varnode.offset, value, endness=self.project.arch.memory_endness)

        else:
            raise AngrError(f"Attempted write to unhandled address space '{space_name}'")

    def _get_value(self, varnode: Varnode) -> BV:
        """
        Get a value for a given varnode.

        This method loads from the appropriate const, register, unique, or RAM
        space, depending on the space indicated by the varnode.

        :param varnode: Varnode to load from.
        :return: Value loaded.
        :raises AngrError: If the varnode lives in an unhandled address space.
        """
        space_name = varnode.space.name
        size = varnode.size
        l.debug("Loading %s - %x x %d", space_name, varnode.offset, size)
        if space_name == "const":
            return claripy.BVV(varnode.offset, size * 8)

        elif space_name == "register":
            return self.state.registers.load(
                self._map_register_name(varnode), size=size, endness=self.project.arch.register_endness
            )

        elif space_name == "unique":
            # FIXME: Support loading data of different sizes. For now, assume
            #        size of values read are same as size written.
            # The lookup is kept outside the assert so that an uninitialized
            # read is still detected when asserts are stripped (python -O).
            try:
                tmp = self._pcode_tmps[varnode.offset]
            except KeyError:
                # FIXME: Add unique space to state tracking?
                l.warning("Uninitialized read from unique space offset %x", varnode.offset)
                tmp = claripy.BVV(0, size * 8)
                self._pcode_tmps[varnode.offset] = tmp
            else:
                assert tmp.size() == size * 8
            return tmp

        elif space_name in ("ram", "mem"):
            val = self.state.memory.load(varnode.offset, endness=self.project.arch.memory_endness, size=size)
            l.debug("Loaded %s from offset %s", val, varnode.offset)
            return val

        else:
            raise AngrError(f"Attempted read from unhandled address space '{space_name}'")

    def _execute_unary(self) -> None:
        """
        Execute the unary behavior of the current op.

        Reads the single input, evaluates the current behavior on it and writes
        the result to the op's output varnode.
        """
        in1 = self._get_value(self._current_op.inputs[0])
        l.debug("in1 = %s", in1)
        out = self._current_behavior.evaluate_unary(self._current_op.output.size, self._current_op.inputs[0].size, in1)
        l.debug("out unary = %s", out)
        self._set_value(self._current_op.output, out)

    def _execute_binary(self) -> None:
        """
        Execute the binary behavior of the current op.

        Reads both inputs, evaluates the current behavior on them and writes the
        result to the op's output varnode.
        """
        in1 = self._get_value(self._current_op.inputs[0])
        in2 = self._get_value(self._current_op.inputs[1])
        l.debug("in1 = %s", in1)
        l.debug("in2 = %s", in2)
        out = self._current_behavior.evaluate_binary(
            self._current_op.output.size, self._current_op.inputs[0].size, in1, in2
        )
        l.debug("out binary = %s", out)
        self._set_value(self._current_op.output, out)

    def _execute_load(self) -> None:
        """
        Execute a p-code load operation.

        Input 0 names the address space, input 1 holds the offset to read from;
        the loaded value is written to the op's output varnode.

        :raises AngrError: If the address space is neither RAM nor register.
        """
        spc = self._current_op.inputs[0].get_space_from_const()
        off = self._get_value(self._current_op.inputs[1])
        out = self._current_op.output
        if spc.name in ("ram", "mem"):
            res = self.state.memory.load(off, out.size, endness=self.project.arch.memory_endness)
        elif spc.name == "register":
            # Exact comparison: a substring test would also accept names such
            # as "" or "reg".
            res = self.state.registers.load(off, size=out.size, endness=self.project.arch.register_endness)
        else:
            raise AngrError("Load from unhandled address space")
        l.debug("Loaded %s from offset %s", res, off)
        self._set_value(out, res)

    def _execute_store(self) -> None:
        """
        Execute a p-code store operation.

        Input 0 names the address space, input 1 holds the offset to write to
        and input 2 the data to write.

        :raises AngrError: If the address space is neither RAM nor register.
        """
        spc = self._current_op.inputs[0].get_space_from_const()
        off = self._get_value(self._current_op.inputs[1])
        data = self._get_value(self._current_op.inputs[2])
        l.debug("Storing %s at offset %s", data, off)
        if spc.name in ("ram", "mem"):
            self.state.memory.store(off, data, endness=self.project.arch.memory_endness)
        elif spc.name == "register":
            self.state.registers.store(off, data, endness=self.project.arch.register_endness)
        else:
            raise AngrError("Store to unhandled address space")

    def _execute_branch(self) -> None:
        """
        Execute a p-code branch operation.

        A constant destination is a branch within the current instruction: the
        successor stays at the current instruction address and the statement
        offset selects the op to resume at. Otherwise the destination offset is
        the successor address.
        """
        dest_addr = self._current_op.inputs[0].get_addr()
        if dest_addr.is_constant:
            expr = self.state.scratch.ins_addr
            self.state.scratch.statement_offset = dest_addr.offset + self._current_op.seq.uniq
        else:
            expr = dest_addr.offset

        self.successors.add_successor(
            self.state,
            expr,
            self.state.scratch.guard,
            "Ijk_Boring",
            exit_stmt_idx=DEFAULT_STATEMENT,
            exit_ins_addr=self.state.scratch.ins_addr,
        )
        self.state.scratch.exit_handled = True

    def _execute_cbranch(self) -> None:
        """
        Execute a p-code conditional branch operation.

        A copy of the state takes the branch under ``cond != 0``; the current
        state continues with ``cond == 0`` added to its constraints and guard.
        The exit is not marked as handled, so execution falls through.
        """
        exit_state = self.state.copy()
        cond = self._get_value(self._current_op.inputs[1])
        dest_addr = self._current_op.inputs[0].get_addr()
        if dest_addr.is_constant:
            expr = exit_state.scratch.ins_addr
            exit_state.scratch.statement_offset = dest_addr.offset + self._current_op.seq.uniq
        else:
            expr = dest_addr.offset

        self.successors.add_successor(
            exit_state,
            expr,
            cond != 0,
            "Ijk_Boring",
            exit_stmt_idx=DEFAULT_STATEMENT,
            exit_ins_addr=self.state.scratch.ins_addr,
        )

        cont_state = self.state
        cont_condition = cond == 0
        cont_state.add_constraints(cont_condition)
        cont_state.scratch.guard = claripy.And(cont_state.scratch.guard, cont_condition)

    def _execute_ret(self) -> None:
        """
        Execute a p-code return operation.

        Adds an ``Ijk_Ret`` successor to the value of input 0 and marks the exit
        as handled.
        """
        ret_addr = self._get_value(self._current_op.inputs[0])

        self.successors.add_successor(
            self.state,
            ret_addr,
            self.state.scratch.guard,
            "Ijk_Ret",
            exit_stmt_idx=DEFAULT_STATEMENT,
            exit_ins_addr=self.state.scratch.ins_addr,
        )
        self.state.scratch.exit_handled = True

    def _execute_branchind(self) -> None:
        """
        Execute a p-code indirect branch operation.

        Adds an ``Ijk_Boring`` successor to the value of input 0 and marks the
        exit as handled.
        """
        expr = self._get_value(self._current_op.inputs[0])

        self.successors.add_successor(
            self.state,
            expr,
            self.state.scratch.guard,
            "Ijk_Boring",
            exit_stmt_idx=DEFAULT_STATEMENT,
            exit_ins_addr=self.state.scratch.ins_addr,
        )
        self.state.scratch.exit_handled = True

    def _execute_call(self) -> None:
        """
        Execute a p-code call operation.

        Adds an ``Ijk_Call`` successor, on a copy of the state, to the address
        offset of input 0 and marks the exit as handled.
        """
        expr = self._current_op.inputs[0].get_addr().offset

        self.successors.add_successor(
            self.state.copy(),  # FIXME: Check extra processing after call
            expr,
            self.state.scratch.guard,
            "Ijk_Call",
            exit_stmt_idx=DEFAULT_STATEMENT,
            exit_ins_addr=self.state.scratch.ins_addr,
        )
        self.state.scratch.exit_handled = True

    def _execute_callind(self) -> None:
        """
        Execute a p-code indirect call operation.

        Adds an ``Ijk_Call`` successor to the value of input 0 and marks the
        exit as handled.
        """
        expr = self._get_value(self._current_op.inputs[0])

        self.successors.add_successor(
            self.state,
            expr,
            self.state.scratch.guard,
            "Ijk_Call",
            exit_stmt_idx=DEFAULT_STATEMENT,
            exit_ins_addr=self.state.scratch.ins_addr,
        )
        self.state.scratch.exit_handled = True

    def _execute_callother(self) -> None:  # pylint:disable=no-self-use
        """
        Reject CALLOTHER ops.

        :raises AngrError: Always.
        """
        raise AngrError("CALLOTHER emulation not currently supported")

    def _execute_multiequal(self) -> None:  # pylint:disable=no-self-use
        """
        Reject MULTIEQUAL ops.

        :raises AngrError: Always.
        """
        raise AngrError("MULTIEQUAL appearing in unheritaged code?")

    def _execute_indirect(self) -> None:  # pylint:disable=no-self-use
        """
        Reject INDIRECT ops.

        :raises AngrError: Always.
        """
        raise AngrError("INDIRECT appearing in unheritaged code?")

    def _execute_segment_op(self) -> None:  # pylint:disable=no-self-use
        """
        Reject SEGMENTOP ops.

        :raises AngrError: Always.
        """
        raise AngrError("SEGMENTOP emulation not currently supported")

    def _execute_cpool_ref(self) -> None:  # pylint:disable=no-self-use
        """
        Reject CPOOLREF ops.

        :raises AngrError: Always.
        """
        raise AngrError("Cannot currently emulate cpool operator")

    def _execute_new(self) -> None:  # pylint:disable=no-self-use
        """
        Reject NEW ops.

        :raises AngrError: Always.
        """
        raise AngrError("Cannot currently emulate new operator")