Source code for angr.engines.pcode.emulate

import logging
from typing import Optional

import claripy
from claripy.ast.bv import BV

from ..engine import SimEngineBase
from ...utils.constants import DEFAULT_STATEMENT
from .lifter import IRSB
from .behavior import OpBehavior
from ...errors import AngrError
from ...state_plugins.inspect import BP_BEFORE, BP_AFTER

try:
    from pypcode import OpCode, Varnode, PcodeOp
except ImportError:
    pass


# Module-level logger, named after this module's import path.
l = logging.getLogger(__name__)


class PcodeEmulatorMixin(SimEngineBase):
    """
    Mixin for p-code execution.

    Walks the ops of a p-code IRSB one at a time, reading and writing varnodes through the current state's
    registers, memory, and a per-instruction dictionary of "unique" temporaries, and registering successors for
    control-flow ops.
    """

    # Op currently being executed; None outside of handle_pcode_block's loop body.
    _current_op: Optional["PcodeOp"]
    # Index of the current op relative to the most recent IMARK op.
    _current_op_idx: int
    # Behavior object looked up for the current op; None when no op is being executed.
    _current_behavior: Optional[OpBehavior]

    def __init__(self, *args, **kwargs):
        """
        Forward all arguments to the next class in the MRO and clear the per-op tracking attributes.
        """
        super().__init__(*args, **kwargs)
        self._current_op = None
        self._current_behavior = None

    def handle_pcode_block(self, irsb: IRSB) -> None:
        """
        Execute a single P-Code IRSB.

        Ops are executed in order. An IMARK op is not executed itself; it fires the "instruction" breakpoints,
        resets the unique-space temporaries, and updates the current instruction address and the fallthrough
        address. If no executed op marked the exit as handled, a fallthrough ``Ijk_Boring`` successor is added
        at the end.

        :param irsb: Block to be executed.
        """
        self.irsb = irsb
        # Hack on a handler here to track whether exit has been handled or not
        # FIXME: Vex models this as a known exit statement, which we should also
        #        do here. For now, handle it this way.
        self.state.scratch.exit_handled = False

        self._pcode_tmps = {}
        fallthru_addr = self.irsb.addr
        self.state.scratch.ins_addr = self.irsb.addr
        last_imark_op_idx = 0

        # Note: start_op_idx is instruction relative
        start_op_idx = self.state.scratch.statement_offset
        self.state.scratch.statement_offset = 0
        assert start_op_idx == 0, "FIXME: Test statement_offset behavior"

        # FIXME: Shouldn't use protected members of IRSB
        for op_idx, op in enumerate(irsb._ops[start_op_idx:], start=start_op_idx):
            if op.opcode == OpCode.IMARK:
                if op_idx > 0:
                    # Trigger BP for previous instruction once we reach next IMARK
                    self.state._inspect("instruction", BP_AFTER)

                decode_addr = op.inputs[0].offset
                last_imark_op_idx = op_idx

                # Note: instruction BP will not be triggered on p-code-relative jumps
                l.debug("Executing machine instruction @ %#x", decode_addr)
                for vn in op.inputs:
                    self.state._inspect("instruction", BP_BEFORE, instruction=vn.offset)

                # FIXME: Hacking this on here but ideally should use "scratch".
                self._pcode_tmps = {}  # FIXME: Consider alignment requirements

                self.state.scratch.ins_addr = decode_addr
                # Fallthrough is the first byte after the last input varnode of this IMARK.
                fallthru_addr = op.inputs[-1].offset + op.inputs[-1].size
                continue

            self._current_op = op
            self._current_op_idx = op_idx - last_imark_op_idx
            l.debug("Executing P-Code op: %s", self._current_op)
            self._execute_current_op()
            self._current_op = None

        # statement_offset is only set non-zero by a p-code-relative branch.
        if self.state.scratch.statement_offset == 0:
            self.state._inspect("instruction", BP_AFTER)

        if not self.state.scratch.exit_handled:
            self.successors.add_successor(
                self.state,
                fallthru_addr,
                self.state.scratch.guard,
                "Ijk_Boring",
                exit_stmt_idx=DEFAULT_STATEMENT,
                exit_ins_addr=self.state.scratch.ins_addr,
            )

    def _execute_current_op(self) -> None:
        """
        Execute the current p-code operation.

        Looks up the behavior for ``self._current_op`` and dispatches to a dedicated handler for "special" ops,
        or to the generic unary/binary evaluators otherwise. ``self._current_behavior`` is cleared afterwards.
        """
        self._current_behavior = self.irsb.behaviors.get_behavior_for_opcode(self._current_op.opcode)

        if self._current_behavior.is_special:
            handlers = {
                OpCode.LOAD: self._execute_load,
                OpCode.STORE: self._execute_store,
                OpCode.BRANCH: self._execute_branch,
                OpCode.CBRANCH: self._execute_cbranch,
                OpCode.BRANCHIND: self._execute_branchind,
                OpCode.CALL: self._execute_call,
                OpCode.CALLIND: self._execute_callind,
                OpCode.CALLOTHER: self._execute_callother,
                OpCode.RETURN: self._execute_ret,
                OpCode.MULTIEQUAL: self._execute_multiequal,
                OpCode.INDIRECT: self._execute_indirect,
                OpCode.SEGMENTOP: self._execute_segment_op,
                OpCode.CPOOLREF: self._execute_cpool_ref,
                OpCode.NEW: self._execute_new,
            }
            handlers[self._current_behavior.opcode]()
        elif self._current_behavior.is_unary:
            self._execute_unary()
        else:
            self._execute_binary()

        self._current_behavior = None

    def _map_register_name(self, varnode: "Varnode") -> int:
        """
        Map SLEIGH register offset to ArchInfo register offset based on name.

        If the architecture does not know the register name (``ValueError``), the varnode's own offset plus
        ``0x100000`` is used instead.

        :param varnode: Varnode to translate.
        :return:        Register file offset.
        """
        # FIXME: Will need performance optimization
        # FIXME: Should not get trans object this way. There should be a faster mapping method than going through trans
        reg_name = varnode.getRegisterName()
        try:
            reg_offset = self.state.project.arch.get_register_offset(reg_name.lower())
            l.debug("Mapped register '%s' to offset %x", reg_name, reg_offset)
        except ValueError:
            reg_offset = varnode.offset + 0x100000
            l.debug("Could not map register '%s' from archinfo. Mapping to %x", reg_name, reg_offset)
        return reg_offset

    @staticmethod
    def _adjust_value_size(num_bits: int, v_in: BV) -> BV:
        """
        Ensure given bv is num_bits bits long by either zero extending or truncating.

        :param num_bits: Desired width in bits.
        :param v_in:     Value to resize.
        :return:         ``v_in`` unchanged if already ``num_bits`` wide, otherwise its low ``num_bits`` bits or
                         a zero-extended copy.
        """
        if v_in.size() > num_bits:
            v_out = v_in[num_bits - 1 : 0]
            l.debug("Truncating value %s (%d bits) to %s (%d bits)", v_in, v_in.size(), v_out, num_bits)
            return v_out
        if v_in.size() < num_bits:
            v_out = v_in.zero_extend(num_bits - v_in.size())
            l.debug("Extending value %s (%d bits) to %s (%d bits)", v_in, v_in.size(), v_out, num_bits)
            return v_out
        return v_in

    def _set_value(self, varnode: "Varnode", value: BV) -> None:
        """
        Store a value for a given varnode.

        This method stores to the appropriate register, unique, or RAM space, depending on the space indicated
        by the varnode. The value is first truncated or zero-extended to the varnode's size.

        :param varnode: Varnode to store into.
        :param value:   Value to store.
        :raises AngrError: If the varnode's address space is not handled.
        """
        # FIXME: Consider moving into behavior.py
        value = self._adjust_value_size(varnode.size * 8, value)
        assert varnode.size * 8 == value.size()

        space = varnode.space
        l.debug("Storing %s %x %s %d", space.name, varnode.offset, value, varnode.size)
        if space.name == "register":
            self.state.registers.store(
                self._map_register_name(varnode), value, size=varnode.size, endness=self.project.arch.register_endness
            )

        elif space.name == "unique":
            self._pcode_tmps[varnode.offset] = value

        elif space.name in ("ram", "mem"):
            l.debug("Storing %s to offset %s", value, varnode.offset)
            self.state.memory.store(varnode.offset, value, endness=self.project.arch.memory_endness)

        else:
            raise AngrError(f"Attempted write to unhandled address space '{space.name}'")

    def _get_value(self, varnode: "Varnode") -> BV:
        """
        Get a value for a given varnode.

        This method loads from the appropriate const, register, unique, or RAM space, depending on the space
        indicated by the varnode. A read from a unique-space offset that was never written logs a warning and
        yields (and records) a zero value of the requested size.

        :param varnode: Varnode to load from.
        :return:        Value loaded.
        :raises AngrError: If the varnode's address space is not handled.
        """
        space_name = varnode.space.name
        size = varnode.size
        l.debug("Loading %s - %x x %d", space_name, varnode.offset, size)
        if space_name == "const":
            return claripy.BVV(varnode.offset, size * 8)

        elif space_name == "register":
            return self.state.registers.load(
                self._map_register_name(varnode), size=size, endness=self.project.arch.register_endness
            )

        elif space_name == "unique":
            # FIXME: Support loading data of different sizes. For now, assume
            # size of values read are same as size written.
            # The lookup is done outside of any assert so that the uninitialized-read fallback still works when
            # assertions are stripped (python -O).
            try:
                tmp = self._pcode_tmps[varnode.offset]
            except KeyError:
                # FIXME: Add unique space to state tracking?
                l.warning("Uninitialized read from unique space offset %x", varnode.offset)
                tmp = claripy.BVV(0, size * 8)
                self._pcode_tmps[varnode.offset] = tmp
            else:
                assert tmp.size() == size * 8
            return tmp

        elif space_name in ("ram", "mem"):
            val = self.state.memory.load(varnode.offset, endness=self.project.arch.memory_endness, size=size)
            l.debug("Loaded %s from offset %s", val, varnode.offset)
            return val

        else:
            raise AngrError(f"Attempted read from unhandled address space '{space_name}'")

    def _execute_unary(self) -> None:
        """
        Execute the unary behavior of the current op.

        Reads the op's first input, evaluates the current behavior on it, and stores the result to the op's
        output varnode.
        """
        in0 = self._get_value(self._current_op.inputs[0])
        out = self._current_behavior.evaluate_unary(self._current_op.output.size, self._current_op.inputs[0].size, in0)
        self._set_value(self._current_op.output, out)

    def _execute_binary(self) -> None:
        """
        Execute the binary behavior of the current op.

        Reads the op's first two inputs, evaluates the current behavior on them, and stores the result to the
        op's output varnode. A warning is logged if a comparison op has an output size other than 1.
        """
        # Validate output
        assert self._current_op.output is not None
        comparison_opcodes = (
            OpCode.INT_LESS,
            OpCode.INT_SLESS,
            OpCode.INT_LESSEQUAL,
            OpCode.INT_SLESSEQUAL,
            OpCode.INT_EQUAL,
            OpCode.INT_NOTEQUAL,
        )
        if self._current_op.opcode in comparison_opcodes and self._current_op.output.size != 1:
            # NOTE(review): relies on the opcode object exposing `__name__` - confirm against the pypcode
            # version in use.
            l.warning(
                "SLEIGH spec states output size for op %s must be 1, but op has %d",
                self._current_op.opcode.__name__,
                self._current_op.output.size,
            )

        # Validate ops that mandate inputs of equal sizes
        # Validate ops that mandate output of greater size
        # Validate inputs
        in0 = self._get_value(self._current_op.inputs[0])
        in1 = self._get_value(self._current_op.inputs[1])
        out = self._current_behavior.evaluate_binary(
            self._current_op.output.size, self._current_op.inputs[0].size, in0, in1
        )
        self._set_value(self._current_op.output, out)

    def _execute_load(self) -> None:
        """
        Execute a p-code load operation.

        The first input selects the address space, the second supplies the offset; the loaded value is stored
        to the op's output varnode.

        :raises AngrError: If the selected address space is not handled.
        """
        space = self._current_op.inputs[0].getSpaceFromConst()
        offset = self._get_value(self._current_op.inputs[1])
        out = self._current_op.output
        if space.name in ("ram", "mem"):
            res = self.state.memory.load(offset, out.size, endness=self.project.arch.memory_endness)
        elif space.name == "register":
            # Exact comparison: a substring test (`in "register"`) would also match names such as "" or "reg".
            res = self.state.registers.load(offset, size=out.size, endness=self.project.arch.register_endness)
        else:
            raise AngrError("Load from unhandled address space")
        l.debug("Loaded %s from offset %s", res, offset)
        self._set_value(out, res)

        # CHECKME: wordsize condition in cpuid load

    def _execute_store(self) -> None:
        """
        Execute a p-code store operation.

        The first input selects the address space, the second supplies the offset, and the third the data to
        write.

        :raises AngrError: If the selected address space is not handled.
        """
        space = self._current_op.inputs[0].getSpaceFromConst()
        offset = self._get_value(self._current_op.inputs[1])
        data = self._get_value(self._current_op.inputs[2])
        l.debug("Storing %s at offset %s", data, offset)
        if space.name in ("ram", "mem"):
            self.state.memory.store(offset, data, endness=self.project.arch.memory_endness)
        elif space.name == "register":
            self.state.registers.store(offset, data, endness=self.project.arch.register_endness)
        else:
            raise AngrError("Store to unhandled address space")

    def _execute_branch(self) -> None:
        """
        Execute a p-code branch operation.

        A destination in the "const" space is treated as a p-code-relative branch: the successor targets the
        current instruction address and the scratch statement offset is set to the destination op index.
        Otherwise the destination varnode's offset is the target address. The exit is marked as handled.
        """
        dest = self._current_op.inputs[0]
        if dest.space.name == "const":
            # P-Code-relative branch
            expr = self.state.scratch.ins_addr
            self.state.scratch.statement_offset = self._current_op_idx + dest.offset
        else:
            expr = dest.offset

        self.successors.add_successor(
            self.state,
            expr,
            self.state.scratch.guard,
            "Ijk_Boring",
            exit_stmt_idx=DEFAULT_STATEMENT,
            exit_ins_addr=self.state.scratch.ins_addr,
        )
        self.state.scratch.exit_handled = True

    def _execute_cbranch(self) -> None:
        """
        Execute a p-code conditional branch operation.

        A copy of the state is registered as the taken successor, guarded by ``cond != 0``. The current state
        continues with ``cond == 0`` added as a constraint and folded into its scratch guard. The exit is not
        marked as handled here.
        """
        exit_state = self.state.copy()
        cond = self._get_value(self._current_op.inputs[1])
        dest = self._current_op.inputs[0]
        if dest.space.name == "const":
            # P-Code-relative branch
            expr = exit_state.scratch.ins_addr
            exit_state.scratch.statement_offset = self._current_op_idx + dest.offset
        else:
            expr = dest.offset

        self.successors.add_successor(
            exit_state,
            expr,
            cond != 0,
            "Ijk_Boring",
            exit_stmt_idx=DEFAULT_STATEMENT,
            exit_ins_addr=self.state.scratch.ins_addr,
        )

        cont_state = self.state
        cont_condition = cond == 0
        cont_state.add_constraints(cont_condition)
        cont_state.scratch.guard = claripy.And(cont_state.scratch.guard, cont_condition)

    def _execute_ret(self) -> None:
        """
        Execute a p-code return operation.

        Adds an ``Ijk_Ret`` successor targeting the value of the op's first input and marks the exit as handled.
        """
        self.successors.add_successor(
            self.state,
            self._get_value(self._current_op.inputs[0]),
            self.state.scratch.guard,
            "Ijk_Ret",
            exit_stmt_idx=DEFAULT_STATEMENT,
            exit_ins_addr=self.state.scratch.ins_addr,
        )
        self.state.scratch.exit_handled = True

    def _execute_branchind(self) -> None:
        """
        Execute a p-code indirect branch operation.

        Adds an ``Ijk_Boring`` successor targeting the value of the op's first input and marks the exit as
        handled.
        """
        self.successors.add_successor(
            self.state,
            self._get_value(self._current_op.inputs[0]),
            self.state.scratch.guard,
            "Ijk_Boring",
            exit_stmt_idx=DEFAULT_STATEMENT,
            exit_ins_addr=self.state.scratch.ins_addr,
        )
        self.state.scratch.exit_handled = True

    def _execute_call(self) -> None:
        """
        Execute a p-code call operation.

        Adds an ``Ijk_Call`` successor, on a copy of the state, targeting the offset of the op's first input and
        marks the exit as handled.
        """
        # FIXME: Spec claims CALL is semantically equivalent to BRANCH. But are p-code relative calls allowed? We assume
        #        not.
        self.successors.add_successor(
            self.state.copy(),  # FIXME: Check extra processing after call
            self._current_op.inputs[0].offset,
            self.state.scratch.guard,
            "Ijk_Call",
            exit_stmt_idx=DEFAULT_STATEMENT,
            exit_ins_addr=self.state.scratch.ins_addr,
        )
        self.state.scratch.exit_handled = True

    def _execute_callind(self) -> None:
        """
        Execute a p-code indirect call operation.

        Adds an ``Ijk_Call`` successor targeting the value of the op's first input and marks the exit as
        handled.
        """
        self.successors.add_successor(
            self.state,
            self._get_value(self._current_op.inputs[0]),
            self.state.scratch.guard,
            "Ijk_Call",
            exit_stmt_idx=DEFAULT_STATEMENT,
            exit_ins_addr=self.state.scratch.ins_addr,
        )
        self.state.scratch.exit_handled = True

    def _execute_callother(self) -> None:  # pylint:disable=no-self-use
        """
        Placeholder for CALLOTHER; always raises.

        :raises AngrError: Always.
        """
        raise AngrError("CALLOTHER emulation not currently supported")

    def _execute_multiequal(self) -> None:  # pylint:disable=no-self-use
        """
        Placeholder for MULTIEQUAL; always raises.

        :raises AngrError: Always.
        """
        raise AngrError("MULTIEQUAL appearing in unheritaged code?")

    def _execute_indirect(self) -> None:  # pylint:disable=no-self-use
        """
        Placeholder for INDIRECT; always raises.

        :raises AngrError: Always.
        """
        raise AngrError("INDIRECT appearing in unheritaged code?")

    def _execute_segment_op(self) -> None:  # pylint:disable=no-self-use
        """
        Placeholder for SEGMENTOP; always raises.

        :raises AngrError: Always.
        """
        raise AngrError("SEGMENTOP emulation not currently supported")

    def _execute_cpool_ref(self) -> None:  # pylint:disable=no-self-use
        """
        Placeholder for CPOOLREF; always raises.

        :raises AngrError: Always.
        """
        raise AngrError("Cannot currently emulate cpool operator")

    def _execute_new(self) -> None:  # pylint:disable=no-self-use
        """
        Placeholder for NEW; always raises.

        :raises AngrError: Always.
        """
        raise AngrError("Cannot currently emulate new operator")