import logging
from collections import defaultdict
from typing import DefaultDict
from pyvex import const
from pyvex.block import IRSB
from pyvex.const import vex_int_class
from pyvex.errors import LiftingException, NeedStatementsNotification, PyVEXError, SkipStatementsError
from pyvex.expr import Const
from pyvex.native import ffi
from pyvex.types import LiftSource, PyLiftSource
from .lifter import Lifter
from .post_processor import Postprocessor
log = logging.getLogger(__name__)
lifters: DefaultDict[str, list[type[Lifter]]] = defaultdict(list)
postprocessors: DefaultDict[str, list[type[Postprocessor]]] = defaultdict(list)
[docs]
def lift(
data: LiftSource,
addr,
arch,
max_bytes=None,
max_inst=None,
bytes_offset=0,
opt_level=1,
traceflags=0,
strict_block_end=True,
inner=False,
skip_stmts=False,
collect_data_refs=False,
cross_insn_opt=True,
load_from_ro_regions=False,
const_prop=False,
):
"""
Recursively lifts blocks using the registered lifters and postprocessors. Tries each lifter in the order in
which they are registered on the data to lift.
If a lifter raises a LiftingException on the data, it is skipped.
If it succeeds and returns a block with a jumpkind of Ijk_NoDecode, all of the lifters are tried on the rest
of the data and if they work, their output is appended to the first block.
:param arch: The arch to lift the data as.
:param addr: The starting address of the block. Effects the IMarks.
:param data: The bytes to lift as either a python string of bytes or a cffi buffer object.
:param max_bytes: The maximum number of bytes to lift. If set to None, no byte limit is used.
:param max_inst: The maximum number of instructions to lift. If set to None, no instruction limit is used.
:param bytes_offset: The offset into `data` to start lifting at.
:param opt_level: The level of optimization to apply to the IR, -1 through 2. -1 is the strictest
unoptimized level, 0 is unoptimized but will perform some lookahead/lookbehind
optimizations, 1 performs constant propogation, and 2 performs loop unrolling,
which honestly doesn't make much sense in the context of pyvex. The default is 1.
:param traceflags: The libVEX traceflags, controlling VEX debug prints.
.. note:: Explicitly specifying the number of instructions to lift (`max_inst`) may not always work
exactly as expected. For example, on MIPS, it is meaningless to lift a branch or jump
instruction without its delay slot. VEX attempts to Do The Right Thing by possibly decoding
fewer instructions than requested. Specifically, this means that lifting a branch or jump
on MIPS as a single instruction (`max_inst=1`) will result in an empty IRSB, and subsequent
attempts to run this block will raise `SimIRSBError('Empty IRSB passed to SimIRSB.')`.
.. note:: If no instruction and byte limit is used, pyvex will continue lifting the block until the block
ends properly or until it runs out of data to lift.
"""
if max_bytes is not None and max_bytes <= 0:
raise PyVEXError("Cannot lift block with no data (max_bytes <= 0)")
if not data:
raise PyVEXError("Cannot lift block with no data (data is empty)")
if isinstance(data, str):
raise TypeError("Cannot pass unicode string as data to lifter")
py_data: PyLiftSource | None
if isinstance(data, (bytes, bytearray, memoryview)):
py_data = data
c_data = None
allow_arch_optimizations = False
else:
if max_bytes is None:
raise PyVEXError("Cannot lift block with ffi pointer and no size (max_bytes is None)")
c_data = data
py_data = None
allow_arch_optimizations = True
# In order to attempt to preserve the property that
# VEX lifts the same bytes to the same IR at all times when optimizations are disabled
# we hack off all of VEX's non-IROpt optimizations when opt_level == -1.
# This is intended to enable comparisons of the lifted IR between code that happens to be
# found in different contexts.
if opt_level < 0:
allow_arch_optimizations = False
opt_level = 0
for lifter in lifters[arch.name]:
try:
u_data: LiftSource = data
if lifter.REQUIRE_DATA_C:
if c_data is None:
assert py_data is not None
if isinstance(py_data, (bytearray, memoryview)):
u_data = ffi.from_buffer(ffi.BVoidP, py_data)
else:
u_data = ffi.from_buffer(ffi.BVoidP, py_data + b"\0" * 8)
max_bytes = min(len(py_data), max_bytes) if max_bytes is not None else len(py_data)
else:
u_data = c_data
skip = 0
elif lifter.REQUIRE_DATA_PY:
if bytes_offset and arch.name.startswith("ARM") and (addr & 1) == 1:
skip = bytes_offset - 1
else:
skip = bytes_offset
if py_data is None:
assert c_data is not None
if max_bytes is None:
log.debug("Cannot create py_data from c_data when no max length is given")
continue
u_data = ffi.buffer(c_data + skip, max_bytes)[:]
else:
if max_bytes is None:
u_data = py_data[skip:]
else:
u_data = py_data[skip : skip + max_bytes]
else:
raise RuntimeError(
"Incorrect lifter configuration. What type of data does %s expect?" % lifter.__class__
)
try:
final_irsb = lifter(arch, addr).lift(
u_data,
bytes_offset - skip,
max_bytes,
max_inst,
opt_level,
traceflags,
allow_arch_optimizations,
strict_block_end,
skip_stmts,
collect_data_refs=collect_data_refs,
cross_insn_opt=cross_insn_opt,
load_from_ro_regions=load_from_ro_regions,
const_prop=const_prop,
)
except SkipStatementsError:
assert skip_stmts is True
final_irsb = lifter(arch, addr).lift(
u_data,
bytes_offset - skip,
max_bytes,
max_inst,
opt_level,
traceflags,
allow_arch_optimizations,
strict_block_end,
skip_stmts=False,
collect_data_refs=collect_data_refs,
cross_insn_opt=cross_insn_opt,
load_from_ro_regions=load_from_ro_regions,
const_prop=const_prop,
)
break
except LiftingException as ex:
log.debug("Lifting Exception: %s", str(ex))
continue
else:
final_irsb = IRSB.empty_block(
arch,
addr,
size=0,
nxt=Const(const.vex_int_class(arch.bits)(addr)),
jumpkind="Ijk_NoDecode",
)
final_irsb.invalidate_direct_next()
return final_irsb
if final_irsb.size > 0 and final_irsb.jumpkind == "Ijk_NoDecode":
# We have decoded a few bytes before we hit an undecodeable instruction.
# Determine if this is an intentional NoDecode, like the ud2 instruction on AMD64
nodecode_addr_expr = final_irsb.next
if type(nodecode_addr_expr) is Const:
nodecode_addr = nodecode_addr_expr.con.value
next_irsb_start_addr = addr + final_irsb.size
if nodecode_addr != next_irsb_start_addr:
# The last instruction of the IRSB has a non-zero length. This is an intentional NoDecode.
# The very last instruction has been decoded
final_irsb.jumpkind = "Ijk_NoDecode"
final_irsb.next = final_irsb.next
final_irsb.invalidate_direct_next()
return final_irsb
# Decode more bytes
if skip_stmts:
# When gymrat will be invoked, we will merge future basic blocks to the current basic block. In this case,
# statements are usually required.
# TODO: In the future, we may further optimize it to handle cases where getting statements in gymrat is not
# TODO: required.
return lift(
data,
addr,
arch,
max_bytes=max_bytes,
max_inst=max_inst,
bytes_offset=bytes_offset,
opt_level=opt_level,
traceflags=traceflags,
strict_block_end=strict_block_end,
skip_stmts=False,
collect_data_refs=collect_data_refs,
load_from_ro_regions=load_from_ro_regions,
const_prop=const_prop,
)
next_addr = addr + final_irsb.size
if max_bytes is not None:
max_bytes -= final_irsb.size
if isinstance(data, (bytes, bytearray, memoryview)):
data_left = data[final_irsb.size :]
else:
data_left = data + final_irsb.size
if max_inst is not None:
max_inst -= final_irsb.instructions
if (max_bytes is None or max_bytes > 0) and (max_inst is None or max_inst > 0) and data_left:
more_irsb = lift(
data_left,
next_addr,
arch,
max_bytes=max_bytes,
max_inst=max_inst,
bytes_offset=bytes_offset,
opt_level=opt_level,
traceflags=traceflags,
strict_block_end=strict_block_end,
inner=True,
skip_stmts=False,
collect_data_refs=collect_data_refs,
load_from_ro_regions=load_from_ro_regions,
const_prop=const_prop,
)
if more_irsb.size:
# Successfully decoded more bytes
final_irsb.extend(more_irsb)
elif max_bytes == 0:
# We have no more bytes left. Mark the jumpkind of the IRSB as Ijk_Boring
if final_irsb.size > 0 and final_irsb.jumpkind == "Ijk_NoDecode":
final_irsb.jumpkind = "Ijk_Boring"
final_irsb.next = Const(vex_int_class(arch.bits)(final_irsb.addr + final_irsb.size))
if not inner:
for postprocessor in postprocessors[arch.name]:
try:
postprocessor(final_irsb).postprocess()
except NeedStatementsNotification as e:
# The post-processor cannot work without statements. Re-lift the current block with skip_stmts=False
if not skip_stmts:
# sanity check
# Why does the post-processor raise NeedStatementsNotification when skip_stmts is False?
raise TypeError(
"Bad post-processor %s: "
"NeedStatementsNotification is raised when statements are available." % postprocessor.__class__
) from e
# Re-lift the current IRSB
return lift(
data,
addr,
arch,
max_bytes=max_bytes,
max_inst=max_inst,
bytes_offset=bytes_offset,
opt_level=opt_level,
traceflags=traceflags,
strict_block_end=strict_block_end,
inner=inner,
skip_stmts=False,
collect_data_refs=collect_data_refs,
load_from_ro_regions=load_from_ro_regions,
const_prop=const_prop,
)
except LiftingException:
continue
return final_irsb
[docs]
def register(lifter, arch_name):
"""
Registers a Lifter or Postprocessor to be used by pyvex. Lifters are are given priority based on the order
in which they are registered. Postprocessors will be run in registration order.
:param lifter: The Lifter or Postprocessor to register
:vartype lifter: :class:`Lifter` or :class:`Postprocessor`
"""
if issubclass(lifter, Lifter):
log.debug("Registering lifter %s for architecture %s.", lifter.__name__, arch_name)
lifters[arch_name].append(lifter)
if issubclass(lifter, Postprocessor):
log.debug("Registering postprocessor %s for architecture %s.", lifter.__name__, arch_name)
postprocessors[arch_name].append(lifter)