from typing import Optional, Iterable
import claripy
import logging
from angr.calling_conventions import DEFAULT_CC, SimRegArg
from angr.engines.engine import SuccessorsMixin, SimSuccessors
from angr.misc.ux import once
from ...utils.constants import DEFAULT_STATEMENT
from ... import sim_options as o
from ... import errors
from .lifter import PcodeLifterEngineMixin, IRSB
from .emulate import PcodeEmulatorMixin
l = logging.getLogger(__name__)
# pylint:disable=abstract-method
[docs]class HeavyPcodeMixin(
SuccessorsMixin,
PcodeLifterEngineMixin,
PcodeEmulatorMixin,
):
"""
Execution engine based on P-code, Ghidra's IR.
Responds to the following parameters to the step stack:
- irsb: The P-Code IRSB object to use for execution. If not provided one will be lifted.
- skip_stmts: The number of statements to skip in processing
- last_stmt: Do not execute any statements after this statement
- thumb: Whether the block should be force to be lifted in ARM's THUMB mode. (FIXME)
- extra_stop_points:
An extra set of points at which to break basic blocks
- insn_bytes: A string of bytes to use for the block instead of the project.
- size: The maximum size of the block, in bytes.
- num_inst: The maximum number of instructions.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._addr = None
self._insn_bytes = None
self._thumb = None
self._size = None
self._num_inst = None
self._extra_stop_points = None
[docs] def process_successors(
self,
successors: SimSuccessors,
irsb: Optional[IRSB] = None,
insn_text: Optional[str] = None,
insn_bytes: Optional[bytes] = None,
thumb: bool = False,
size: Optional[int] = None,
num_inst: Optional[int] = None,
extra_stop_points: Optional[Iterable[int]] = None,
**kwargs,
) -> None:
# pylint:disable=arguments-differ
if type(successors.addr) is not int:
return super().process_successors(
successors,
extra_stop_points=extra_stop_points,
num_inst=num_inst,
size=size,
insn_text=insn_text,
insn_bytes=insn_bytes,
**kwargs,
)
if insn_text is not None:
if insn_bytes is not None:
raise errors.SimEngineError("You cannot provide both 'insn_bytes' and 'insn_text'!")
insn_bytes = self.project.arch.asm(insn_text, addr=successors.addr, thumb=thumb)
if insn_bytes is None:
raise errors.AngrAssemblyError(
"Assembling failed. Please make sure keystone is installed, and the assembly string is correct."
)
successors.sort = "IRSB"
successors.description = "IRSB"
self.state.history.recent_block_count = 1
self.state.scratch.guard = claripy.true
self.state.scratch.sim_procedure = None
addr = successors.addr
self.state.scratch.bbl_addr = addr
self.state.scratch.irsb = irsb
self._addr = addr
self._insn_bytes = insn_bytes
self._thumb = thumb
self._size = size
self._num_inst = num_inst
self._extra_stop_points = extra_stop_points
# Lift and process the block to get successors, retry if necessary
finished = False
while not finished:
self._lift_irsb()
self._probe_access()
self._store_successor_artifacts(successors)
finished = self._process_irsb()
self._process_successor_exits(successors)
successors.processed = True
def _lift_irsb(self):
irsb = self.state.scratch.irsb
if irsb is None:
irsb = self.lift_pcode(
addr=self._addr,
state=self.state,
insn_bytes=self._insn_bytes,
thumb=self._thumb,
size=self._size,
num_inst=self._num_inst,
extra_stop_points=self._extra_stop_points,
)
if irsb.size == 0:
if irsb.jumpkind == "Ijk_NoDecode" and not self.state.project.is_hooked(irsb.addr):
raise errors.SimIRSBNoDecodeError(
"IR decoding error at %#x. You can hook this instruction with "
"a python replacement using project.hook"
"(%#x, your_function, length=length_of_instruction)." % (self._addr, self._addr)
)
raise errors.SimIRSBError("Empty IRSB passed to HeavyPcodeMixin.")
self.state.scratch.irsb = irsb
def _store_successor_artifacts(self, successors: SimSuccessors) -> None:
"""
Update successors.artifacts with IRSB details.
"""
irsb = self.state.scratch.irsb
successors.artifacts["irsb"] = irsb
successors.artifacts["irsb_size"] = irsb.size
successors.artifacts["irsb_direct_next"] = irsb.direct_next
successors.artifacts["irsb_default_jumpkind"] = irsb.jumpkind
successors.artifacts["insn_addrs"] = []
def _probe_access(self) -> None:
"""
Check permissions, are we allowed to execute here? Do we care?
"""
if o.STRICT_PAGE_ACCESS not in self.state.options:
return
try:
perms = self.state.memory.permissions(self._addr)
except errors.SimMemoryError:
raise errors.SimSegfaultError(self._addr, "exec-miss") from errors.SimMemoryError
else:
if not perms.symbolic:
perms = self.state.solver.eval(perms)
if not perms & 4 and o.ENABLE_NX in self.state.options:
raise errors.SimSegfaultError(self._addr, "non-executable")
def _process_irsb(self) -> bool:
"""
Execute the IRSB. Returns True if successfully processed.
"""
try:
self.handle_pcode_block(self.state.scratch.irsb)
return True
except errors.SimReliftException as e:
self.state = e.state
if self._insn_bytes is not None:
raise errors.SimEngineError("You cannot pass self-modifying code as insn_bytes!!!")
new_ip = self.state.scratch.ins_addr
if self._size is not None:
self._size -= new_ip - self._addr
if self._num_inst is not None:
self._num_inst -= self.state.scratch.num_insns
self._addr = new_ip
# clear the stage before creating the new IRSB
self.state.scratch.dirty_addrs.clear()
self.state.scratch.irsb = None
except errors.SimError as ex:
ex.record_state(self.state)
raise
# FIXME:
# except VEXEarlyExit:
# break
return False
def _process_successor_exits(self, successors: SimSuccessors) -> None:
"""
Do return emulation and call-less stuff.
"""
for exit_state in list(successors.all_successors):
exit_jumpkind = exit_state.history.jumpkind
if exit_jumpkind is None:
exit_jumpkind = ""
if o.CALLLESS in self.state.options and exit_jumpkind == "Ijk_Call":
# get the default calling convention for the architecture and retrieve the return value offset
if exit_state.arch.name in DEFAULT_CC:
cc = DEFAULT_CC[exit_state.arch.name]
ret_reg = cc.RETURN_VAL
if isinstance(ret_reg, SimRegArg):
ret_offset = exit_state.arch.registers[ret_reg.reg_name][0]
exit_state.registers.store(
ret_offset,
exit_state.solver.Unconstrained("fake_ret_value", exit_state.arch.bits),
)
else:
if once("return_val_is_not_reg"):
l.warning(
"The return value of the default calling convention for architecture %s is not "
"stored in a register. We cannot set the fake return value in Call-less mode. "
"Please report to GitHub.",
exit_state.arch.name,
)
else:
if once("missing_default_cc"):
l.warning(
"Default calling convention is not set for architecture %s. We cannot set the fake "
"return value in Call-less mode.",
exit_state.arch.name,
)
exit_state.scratch.target = exit_state.solver.BVV(
successors.addr + self.state.scratch.irsb.size, exit_state.arch.bits
)
exit_state.history.jumpkind = "Ijk_Ret"
exit_state.regs.ip = exit_state.scratch.target
if exit_state.arch.call_pushes_ret:
exit_state.regs.sp = exit_state.regs.sp + exit_state.arch.bytes
elif o.DO_RET_EMULATION in exit_state.options and (
exit_jumpkind == "Ijk_Call" or exit_jumpkind.startswith("Ijk_Sys")
):
l.debug("%s adding postcall exit.", self)
ret_state = exit_state.copy()
guard = (
ret_state.solver.true
if o.TRUE_RET_EMULATION_GUARD in self.state.options
else ret_state.solver.false
)
ret_target = ret_state.solver.BVV(successors.addr + self.state.scratch.irsb.size, ret_state.arch.bits)
if ret_state.arch.call_pushes_ret and not exit_jumpkind.startswith("Ijk_Sys"):
ret_state.regs.sp = ret_state.regs.sp + ret_state.arch.bytes
successors.add_successor(
ret_state,
ret_target,
guard,
"Ijk_FakeRet",
exit_stmt_idx=DEFAULT_STATEMENT,
exit_ins_addr=self.state.scratch.ins_addr,
)