Source code for angr.engines.pcode.emulate

import logging
from typing import Union

from pypcode import OpCode, Varnode, PcodeOp, Translation
import claripy
from claripy.ast.bv import BV

from ..engine import SimEngineBase
from ...utils.constants import DEFAULT_STATEMENT
from .lifter import IRSB
from .behavior import OpBehavior
from ...errors import AngrError
from ...state_plugins.inspect import BP_BEFORE, BP_AFTER


l = logging.getLogger(__name__)


[docs]class PcodeEmulatorMixin(SimEngineBase): """ Mixin for p-code execution. """ _current_ins: Union[Translation, None] _current_op: Union[PcodeOp, None] _current_behavior: Union[OpBehavior, None]
[docs] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._current_ins = None self._current_op = None self._current_behavior = None self._special_op_handlers = { OpCode.LOAD: self._execute_load, OpCode.STORE: self._execute_store, OpCode.BRANCH: self._execute_branch, OpCode.CBRANCH: self._execute_cbranch, OpCode.BRANCHIND: self._execute_branchind, OpCode.CALL: self._execute_call, OpCode.CALLIND: self._execute_callind, OpCode.CALLOTHER: self._execute_callother, OpCode.RETURN: self._execute_ret, OpCode.MULTIEQUAL: self._execute_multiequal, OpCode.INDIRECT: self._execute_indirect, OpCode.SEGMENTOP: self._execute_segment_op, OpCode.CPOOLREF: self._execute_cpool_ref, OpCode.NEW: self._execute_new, }
[docs] def handle_pcode_block(self, irsb: IRSB) -> None: """ Execute a single IRSB. :param irsb: Block to be executed. """ self.irsb = irsb # Hack on a handler here to track whether exit has been handled or not # FIXME: Vex models this as a known exit statement, which we should also # do here. For now, handle it this way. self.state.scratch.exit_handled = False fallthru_addr = None for i, ins in enumerate(irsb._instructions): l.debug( "Executing machine instruction @ %#x (%d of %d)", ins.address.offset, i + 1, len(irsb._instructions) ) # Execute a single instruction of the emulated machine self._current_ins = ins self.state.scratch.ins_addr = self._current_ins.address.offset # FIXME: Hacking this on here but ideally should use "scratch". self._pcode_tmps = {} # FIXME: Consider alignment requirements self.state._inspect("instruction", BP_BEFORE, instruction=self._current_ins.address.offset) offset = self.state.scratch.statement_offset self.state.scratch.statement_offset = 0 for op in self._current_ins.ops[offset:]: self._current_op = op self._current_behavior = irsb.behaviors.get_behavior_for_opcode(self._current_op.opcode) l.debug("Executing p-code op: %s", self._current_op) self._execute_current_op() self.state._inspect("instruction", BP_AFTER) self._current_op = None self._current_behavior = None fallthru_addr = ins.address.offset + ins.length if not self.state.scratch.exit_handled: self.successors.add_successor( self.state, fallthru_addr, self.state.scratch.guard, "Ijk_Boring", exit_stmt_idx=DEFAULT_STATEMENT, exit_ins_addr=self.state.scratch.ins_addr, )
def _execute_current_op(self) -> None: """ Execute the current p-code operation. """ assert self._current_behavior is not None if self._current_behavior.is_special: self._special_op_handlers[self._current_behavior.opcode]() elif self._current_behavior.is_unary: self._execute_unary() else: self._execute_binary() def _map_register_name(self, varnode: Varnode) -> int: """ Map SLEIGH register offset to ArchInfo register offset based on name. :param varnode: Varnode to translate. :return: Register file offset. """ # FIXME: Will need performance optimization # FIXME: Should not get trans object this way. There should be a faster mapping method than going through trans reg_name = varnode.get_register_name() try: reg_offset = self.state.project.arch.get_register_offset(reg_name.lower()) l.debug("Mapped register '%s' to offset %x", reg_name, reg_offset) except ValueError: reg_offset = varnode.offset + 0x100000 l.debug("Could not map register '%s' from archinfo. Mapping to %x", reg_name, reg_offset) return reg_offset @staticmethod def _adjust_value_size(num_bits: int, v_in: BV) -> BV: """ Ensure given bv is num_bits bits long by either zero extending or truncating. """ if v_in.size() > num_bits: v_out = v_in[num_bits - 1 : 0] l.debug("Truncating value %s (%d bits) to %s (%d bits)", v_in, v_in.size(), v_out, num_bits) return v_out elif v_in.size() < num_bits: v_out = v_in.zero_extend(num_bits - v_in.size()) l.debug("Extending value %s (%d bits) to %s (%d bits)", v_in, v_in.size(), v_out, num_bits) return v_out else: return v_in def _set_value(self, varnode: Varnode, value: BV) -> None: """ Store a value for a given varnode. This method stores to the appropriate register, or unique space, depending on the space indicated by the varnode. :param varnode: Varnode to store into. :param value: Value to store. """ space_name = varnode.space.name # FIXME: Consider moving into behavior.py value = self._adjust_value_size(varnode.size * 8, value) assert varnode.size * 8 == value.size() l.debug("Storing %s %x %s %d", space_name, varnode.offset, value, varnode.size) if space_name == "register": self.state.registers.store( self._map_register_name(varnode), value, size=varnode.size, endness=self.project.arch.register_endness ) elif space_name == "unique": self._pcode_tmps[varnode.offset] = value elif space_name in ("ram", "mem"): l.debug("Storing %s to offset %s", value, varnode.offset) self.state.memory.store(varnode.offset, value, endness=self.project.arch.memory_endness) else: raise AngrError(f"Attempted write to unhandled address space '{space_name}'") def _get_value(self, varnode: Varnode) -> BV: """ Get a value for a given varnode. This method loads from the appropriate const, register, unique, or RAM space, depending on the space indicated by the varnode. :param varnode: Varnode to load from. :return: Value loaded. """ space_name = varnode.space.name size = varnode.size l.debug("Loading %s - %x x %d", space_name, varnode.offset, size) if space_name == "const": return claripy.BVV(varnode.offset, size * 8) elif space_name == "register": return self.state.registers.load( self._map_register_name(varnode), size=size, endness=self.project.arch.register_endness ) elif space_name == "unique": # FIXME: Support loading data of different sizes. For now, assume # size of values read are same as size written. try: assert self._pcode_tmps[varnode.offset].size() == size * 8 except KeyError: # FIXME: Add unique space to state tracking? l.warning("Uninitialized read from unique space offset %x", varnode.offset) self._pcode_tmps[varnode.offset] = claripy.BVV(0, size * 8) return self._pcode_tmps[varnode.offset] elif space_name in ("ram", "mem"): val = self.state.memory.load(varnode.offset, endness=self.project.arch.memory_endness, size=size) l.debug("Loaded %s from offset %s", val, varnode.offset) return val else: raise AngrError(f"Attempted read from unhandled address space '{space_name}'") def _execute_unary(self) -> None: """ Execute the unary behavior of the current op. """ in1 = self._get_value(self._current_op.inputs[0]) l.debug("in1 = %s", in1) out = self._current_behavior.evaluate_unary(self._current_op.output.size, self._current_op.inputs[0].size, in1) l.debug("out unary = %s", out) self._set_value(self._current_op.output, out) def _execute_binary(self) -> None: """ Execute the binary behavior of the current op. """ in1 = self._get_value(self._current_op.inputs[0]) in2 = self._get_value(self._current_op.inputs[1]) l.debug("in1 = %s", in1) l.debug("in2 = %s", in2) out = self._current_behavior.evaluate_binary( self._current_op.output.size, self._current_op.inputs[0].size, in1, in2 ) l.debug("out binary = %s", out) self._set_value(self._current_op.output, out) def _execute_load(self) -> None: """ Execute a p-code load operation. """ spc = self._current_op.inputs[0].get_space_from_const() off = self._get_value(self._current_op.inputs[1]) out = self._current_op.output if spc.name in ("ram", "mem"): res = self.state.memory.load(off, out.size, endness=self.project.arch.memory_endness) elif spc.name in "register": res = self.state.registers.load(off, size=out.size, endness=self.project.arch.register_endness) else: raise AngrError("Load from unhandled address space") l.debug("Loaded %s from offset %s", res, off) self._set_value(out, res) def _execute_store(self) -> None: """ Execute a p-code store operation. """ spc = self._current_op.inputs[0].get_space_from_const() off = self._get_value(self._current_op.inputs[1]) data = self._get_value(self._current_op.inputs[2]) l.debug("Storing %s at offset %s", data, off) if spc.name in ("ram", "mem"): self.state.memory.store(off, data, endness=self.project.arch.memory_endness) elif spc.name == "register": self.state.registers.store(off, data, endness=self.project.arch.register_endness) else: raise AngrError("Store to unhandled address space") def _execute_branch(self) -> None: """ Execute a p-code branch operation. """ dest_addr = self._current_op.inputs[0].get_addr() if dest_addr.is_constant: expr = self.state.scratch.ins_addr self.state.scratch.statement_offset = dest_addr.offset + self._current_op.seq.uniq else: expr = dest_addr.offset self.successors.add_successor( self.state, expr, self.state.scratch.guard, "Ijk_Boring", exit_stmt_idx=DEFAULT_STATEMENT, exit_ins_addr=self.state.scratch.ins_addr, ) self.state.scratch.exit_handled = True def _execute_cbranch(self) -> None: """ Execute a p-code conditional branch operation. """ exit_state = self.state.copy() cond = self._get_value(self._current_op.inputs[1]) dest_addr = self._current_op.inputs[0].get_addr() if dest_addr.is_constant: expr = exit_state.scratch.ins_addr exit_state.scratch.statement_offset = dest_addr.offset + self._current_op.seq.uniq else: expr = dest_addr.offset self.successors.add_successor( exit_state, expr, cond != 0, "Ijk_Boring", exit_stmt_idx=DEFAULT_STATEMENT, exit_ins_addr=self.state.scratch.ins_addr, ) cont_state = self.state cont_condition = cond == 0 cont_state.add_constraints(cont_condition) cont_state.scratch.guard = claripy.And(cont_state.scratch.guard, cont_condition) def _execute_ret(self) -> None: """ Execute a p-code return operation. """ ret_addr = self._get_value(self._current_op.inputs[0]) self.successors.add_successor( self.state, ret_addr, self.state.scratch.guard, "Ijk_Ret", exit_stmt_idx=DEFAULT_STATEMENT, exit_ins_addr=self.state.scratch.ins_addr, ) self.state.scratch.exit_handled = True def _execute_branchind(self) -> None: """ Execute a p-code indirect branch operation. """ expr = self._get_value(self._current_op.inputs[0]) self.successors.add_successor( self.state, expr, self.state.scratch.guard, "Ijk_Boring", exit_stmt_idx=DEFAULT_STATEMENT, exit_ins_addr=self.state.scratch.ins_addr, ) self.state.scratch.exit_handled = True def _execute_call(self) -> None: """ Execute a p-code call operation. """ expr = self._current_op.inputs[0].get_addr().offset self.successors.add_successor( self.state.copy(), # FIXME: Check extra processing after call expr, self.state.scratch.guard, "Ijk_Call", exit_stmt_idx=DEFAULT_STATEMENT, exit_ins_addr=self.state.scratch.ins_addr, ) self.state.scratch.exit_handled = True def _execute_callind(self) -> None: """ Execute a p-code indirect call operation. """ expr = self._get_value(self._current_op.inputs[0]) self.successors.add_successor( self.state, expr, self.state.scratch.guard, "Ijk_Call", exit_stmt_idx=DEFAULT_STATEMENT, exit_ins_addr=self.state.scratch.ins_addr, ) self.state.scratch.exit_handled = True def _execute_callother(self) -> None: # pylint:disable=no-self-use raise AngrError("CALLOTHER emulation not currently supported") def _execute_multiequal(self) -> None: # pylint:disable=no-self-use raise AngrError("MULTIEQUAL appearing in unheritaged code?") def _execute_indirect(self) -> None: # pylint:disable=no-self-use raise AngrError("INDIRECT appearing in unheritaged code?") def _execute_segment_op(self) -> None: # pylint:disable=no-self-use raise AngrError("SEGMENTOP emulation not currently supported") def _execute_cpool_ref(self) -> None: # pylint:disable=no-self-use raise AngrError("Cannot currently emulate cpool operator") def _execute_new(self) -> None: # pylint:disable=no-self-use raise AngrError("Cannot currently emulate new operator")