Source code for pyvex.block

import copy
import itertools
import logging
from typing import List, Optional, Tuple

from . import expr, stmt
from .const import get_type_size
from .data_ref import DataRef
from .enums import VEXObject
from .errors import SkipStatementsError
from .expr import RdTmp
from .native import pvc
from .stmt import (
    CAS,
    LLSC,
    Dirty,
    Exit,
    IMark,
    IRExpr,
    IRStmt,
    LoadG,
    WrTmp,
    get_enum_from_int,
    get_int_from_enum,
)
from .types import Arch

log = logging.getLogger("pyvex.block")


[docs]class IRSB(VEXObject): """ The IRSB is the primary interface to pyvex. Constructing one of these will make a call into LibVEX to perform a translation. IRSB stands for *Intermediate Representation Super-Block*. An IRSB in VEX is a single-entry, multiple-exit code block. :ivar arch: The architecture this block is lifted under. Must duck-type as :class:`archinfo.arch.Arch` :ivar statements: The statements in this block :vartype statements: list of :class:`IRStmt` :ivar next: The expression for the default exit target of this block :vartype next: :class:`IRExpr` :ivar int offsIP: The offset of the instruction pointer in the VEX guest state :ivar int stmts_used: The number of statements in this IRSB :ivar str jumpkind: The type of this block's default jump (call, boring, syscall, etc) as a VEX enum string :ivar bool direct_next: Whether this block ends with a direct (not indirect) jump or branch :ivar int size: The size of this block in bytes :ivar int addr: The address of this basic block, i.e. the address in the first IMark """ __slots__ = ( "addr", "arch", "statements", "next", "_tyenv", "jumpkind", "_direct_next", "_size", "_instructions", "_exit_statements", "default_exit_target", "_instruction_addresses", "data_refs", ) # The following constants shall match the defs in pyvex.h MAX_EXITS = 400 MAX_DATA_REFS = 2000
[docs] def __init__( self, data, mem_addr, arch: Arch, max_inst=None, max_bytes=None, bytes_offset=0, traceflags=0, opt_level=1, num_inst=None, num_bytes=None, strict_block_end=False, skip_stmts=False, collect_data_refs=False, cross_insn_opt=True, ): """ :param data: The bytes to lift. Can be either a string of bytes or a cffi buffer object. You may also pass None to initialize an empty IRSB. :type data: str or bytes or cffi.FFI.CData or None :param int mem_addr: The address to lift the data at. :param arch: The architecture to lift the data as. :param max_inst: The maximum number of instructions to lift. (See note below) :param max_bytes: The maximum number of bytes to use. :param num_inst: Replaces max_inst if max_inst is None. If set to None as well, no instruction limit is used. :param num_bytes: Replaces max_bytes if max_bytes is None. If set to None as well, no byte limit is used. :param bytes_offset: The offset into `data` to start lifting at. Note that for ARM THUMB mode, both `mem_addr` and `bytes_offset` must be odd (typically `bytes_offset` is set to 1). :param traceflags: The libVEX traceflags, controlling VEX debug prints. :param opt_level: The level of optimization to apply to the IR, -1 through 2. -1 is the strictest unoptimized level, 0 is unoptimized but will perform some lookahead/lookbehind optimizations, 1 performs constant propogation, and 2 performs loop unrolling, which honestly doesn't make much sense in the context of pyvex. The default is 1. :param strict_block_end: Should the LibVEX arm-thumb split block at some instructions, for example CB{N}Z. .. note:: Explicitly specifying the number of instructions to lift (`max_inst`) may not always work exactly as expected. For example, on MIPS, it is meaningless to lift a branch or jump instruction without its delay slot. VEX attempts to Do The Right Thing by possibly decoding fewer instructions than requested. Specifically, this means that lifting a branch or jump on MIPS as a single instruction (`max_inst=1`) will result in an empty IRSB, and subsequent attempts to run this block will raise `SimIRSBError('Empty IRSB passed to SimIRSB.')`. .. note:: If no instruction and byte limit is used, pyvex will continue lifting the block until the block ends properly or until it runs out of data to lift. """ if max_inst is None: max_inst = num_inst if max_bytes is None: max_bytes = num_bytes VEXObject.__init__(self) self.addr = mem_addr self.arch: Arch = arch self.statements: List[IRStmt] = [] self.next: Optional[IRExpr] = None self._tyenv: Optional["IRTypeEnv"] = None self.jumpkind: Optional[str] = None self._direct_next: Optional[bool] = None self._size: Optional[int] = None self._instructions: Optional[int] = None self._exit_statements: Optional[Tuple[Tuple[int, int, IRStmt], ...]] = None self.default_exit_target = None self.data_refs = () self._instruction_addresses: Tuple[int, ...] = () if data is not None: # This is the slower path (because we need to call _from_py() to copy the content in the returned IRSB to # the current IRSB instance. You should always call `lift()` directly. This method is kept for compatibility # concerns. from pyvex.lifting import lift irsb = lift( data, mem_addr, arch, max_bytes=max_bytes, max_inst=max_inst, bytes_offset=bytes_offset, opt_level=opt_level, traceflags=traceflags, strict_block_end=strict_block_end, skip_stmts=skip_stmts, collect_data_refs=collect_data_refs, cross_insn_opt=cross_insn_opt, ) self._from_py(irsb)
[docs] @staticmethod def empty_block(arch, addr, statements=None, nxt=None, tyenv=None, jumpkind=None, direct_next=None, size=None): block = IRSB(None, addr, arch) block._set_attributes(statements, nxt, tyenv, jumpkind, direct_next, size=size) return block
@property def tyenv(self) -> "IRTypeEnv": if self._tyenv is None: self._tyenv = IRTypeEnv(self.arch) return self._tyenv @tyenv.setter def tyenv(self, v): self._tyenv = v @property def has_statements(self) -> bool: return self.statements is not None and bool(self.statements) @property def exit_statements(self) -> Tuple[Tuple[int, int, IRStmt], ...]: if self._exit_statements is not None: return self._exit_statements # Delayed process if not self.has_statements: return () exit_statements = [] ins_addr = None for idx, stmt_ in enumerate(self.statements): if type(stmt_) is IMark: ins_addr = stmt_.addr + stmt_.delta elif type(stmt_) is Exit: assert ins_addr is not None exit_statements.append((ins_addr, idx, stmt_)) self._exit_statements = tuple(exit_statements) return self._exit_statements
[docs] def copy(self) -> "IRSB": return copy.deepcopy(self)
[docs] def extend(self, extendwith) -> None: """ Appends an irsb to the current irsb. The irsb that is appended is invalidated. The appended irsb's jumpkind and default exit are used. :param extendwith: The IRSB to append to this IRSB :vartype extendwith: :class:`IRSB` """ if self.stmts_used == 0: self._from_py(extendwith) return conversion_dict = {} invalid_vals = (0xFFFFFFFF, -1) new_size = self.size + extendwith.size new_instructions = self.instructions + extendwith.instructions new_direct_next = extendwith.direct_next def convert_tmp(tmp): """ Converts a tmp from the appended-block into one in the appended-to-block. Creates a new tmp if it does not already exist. Prevents collisions in tmp numbers between the two blocks. :param tmp: The tmp number to convert """ if tmp not in conversion_dict: tmp_type = extendwith.tyenv.lookup(tmp) conversion_dict[tmp] = self.tyenv.add(tmp_type) return conversion_dict[tmp] def convert_expr(expr_): """ Converts a VEX expression to use tmps in the appended-block instead of the appended-to-block. Used to prevent collisions in tmp numbers between the two blocks. :param tmp: The VEX expression to convert :vartype expr: :class:`IRExpr` """ if type(expr_) is RdTmp: return RdTmp.get_instance(convert_tmp(expr_.tmp)) return expr_ for stmt_ in extendwith.statements: stmttype = type(stmt_) if stmttype is WrTmp: stmt_.tmp = convert_tmp(stmt_.tmp) elif stmttype is LoadG: stmt_.dst = convert_tmp(stmt_.dst) elif stmttype is LLSC: stmt_.result = convert_tmp(stmt_.result) elif stmttype is Dirty: if stmt_.tmp not in invalid_vals: stmt_.tmp = convert_tmp(stmt_.tmp) for e in stmt_.args: convert_expr(e) elif stmttype is CAS: if stmt_.oldLo not in invalid_vals: stmt_.oldLo = convert_tmp(stmt_.oldLo) if stmt_.oldHi not in invalid_vals: stmt_.oldHi = convert_tmp(stmt_.oldHi) # Convert all expressions to_replace = {} for expr_ in stmt_.expressions: replacement = convert_expr(expr_) if replacement is not expr_: to_replace[expr_] = replacement stmt_.replace_expression(to_replace) # Add the converted statement to self.statements self.statements.append(stmt_) extendwith.next = convert_expr(extendwith.next) self.next = extendwith.next self.jumpkind = extendwith.jumpkind self._size = new_size self._instructions = new_instructions self._direct_next = new_direct_next
# TODO: Change exit_statements, data_references, etc.
[docs] def invalidate_direct_next(self) -> None: self._direct_next = None
[docs] def pp(self) -> None: """ Pretty-print the IRSB to stdout. """ print(self._pp_str())
def __repr__(self): return f"IRSB <0x{self.size:x} bytes, {self.instructions} ins., {str(self.arch)}> at 0x{self.addr:x}" def __str__(self): return self._pp_str() def __eq__(self, other): return ( isinstance(other, IRSB) and self.addr == other.addr and self.arch.name == other.arch.name and self.statements == other.statements and self.next == other.next and self.jumpkind == other.jumpkind ) def __hash__(self): return hash((IRSB, self.addr, self.arch.name, tuple(self.statements), self.next, self.jumpkind))
[docs] def typecheck(self) -> bool: try: # existence assertions assert self.next is not None, "Missing next expression" assert self.jumpkind is not None, "Missing jumpkind" # Type assertions assert isinstance(self.next, expr.IRExpr), "Next expression is not an expression" assert type(self.jumpkind is str), "Jumpkind is not a string" assert self.jumpkind.startswith("Ijk_"), "Jumpkind is not a jumpkind enum" assert self.tyenv.typecheck(), "Type environment contains invalid types" # statement assertions last_imark = None for i, st in enumerate(self.statements): assert isinstance(st, stmt.IRStmt), "Statement %d is not an IRStmt" % i try: assert st.typecheck(self.tyenv), "Statement %d failed to typecheck" % i except Exception: # pylint: disable=bare-except assert False, "Statement %d errored in typechecking" % i if type(st) is stmt.NoOp: continue elif type(st) is stmt.IMark: if last_imark is not None: # pylint: disable=unsubscriptable-object assert last_imark[0] + last_imark[1] == st.addr, "IMarks sizes overlap or have gaps" last_imark = (st.addr, st.len) else: assert last_imark is not None, "Operation statement appears before IMark" assert last_imark is not None, "No IMarks present in block" except AssertionError as e: log.debug(e.args[0]) return False return True
# # alternate constructors #
[docs] @staticmethod def from_c(c_irsb, mem_addr, arch) -> "IRSB": irsb = IRSB(None, mem_addr, arch) irsb._from_c(c_irsb) return irsb
[docs] @staticmethod def from_py(tyenv, stmts, next_expr, jumpkind, mem_addr, arch) -> "IRSB": irsb = IRSB(None, mem_addr, arch) irsb.tyenv = tyenv irsb.statements = stmts irsb.next = next_expr irsb.jumpkind = jumpkind irsb._direct_next = irsb._is_defaultexit_direct_jump() return irsb
# # simple properties useful for analysis # @property def stmts_used(self) -> int: if self.statements is None: return 0 return len(self.statements) @property def offsIP(self) -> int: return self.arch.ip_offset @property def direct_next(self): if self._direct_next is None: self._direct_next = self._is_defaultexit_direct_jump() return self._direct_next @property def expressions(self): """ Return an iterator of all expressions contained in the IRSB. """ for s in self.statements: yield from s.expressions yield self.next @property def instructions(self): """ The number of instructions in this block """ if self._instructions is None: if self.statements is None: self._instructions = 0 else: self._instructions = len([s for s in self.statements if type(s) is stmt.IMark]) return self._instructions @property def instruction_addresses(self) -> Tuple[int, ...]: """ Addresses of instructions in this block. """ if self._instruction_addresses is None: if self.statements is None: self._instruction_addresses = () else: self._instruction_addresses = tuple( (s.addr + s.delta) for s in self.statements if type(s) is stmt.IMark ) return self._instruction_addresses @property def size(self): """ The size of this block, in bytes """ if self._size is None: self._size = sum(s.len for s in self.statements if type(s) is stmt.IMark) return self._size @property def operations(self): """ A list of all operations done by the IRSB, as libVEX enum names """ ops = [] for e in self.expressions: if hasattr(e, "op"): ops.append(e.op) return ops @property def all_constants(self): """ Returns all constants in the block (including incrementing of the program counter) as :class:`pyvex.const.IRConst`. """ return sum((e.constants for e in self.expressions), []) @property def constants(self): """ The constants (excluding updates of the program counter) in the IRSB as :class:`pyvex.const.IRConst`. """ return sum((s.constants for s in self.statements if not (type(s) is stmt.Put and s.offset == self.offsIP)), []) @property def constant_jump_targets(self): """ A set of the static jump targets of the basic block. """ exits = set() if self.exit_statements: for _, _, stmt_ in self.exit_statements: exits.add(stmt_.dst.value) default_target = self.default_exit_target if default_target is not None: exits.add(default_target) return exits @property def constant_jump_targets_and_jumpkinds(self): """ A dict of the static jump targets of the basic block to their jumpkind. """ exits = {} if self.exit_statements: for _, _, stmt_ in self.exit_statements: exits[stmt_.dst.value] = stmt_.jumpkind default_target = self.default_exit_target if default_target is not None: exits[default_target] = self.jumpkind return exits # # private methods # def _pp_str(self) -> str: """ Return the pretty-printed IRSB. """ sa = [] sa.append("IRSB {") if self.statements is not None: sa.append(" %s" % self.tyenv) sa.append("") if self.statements is not None: for i, s in enumerate(self.statements): if isinstance(s, stmt.Put): stmt_str = s.pp_str( reg_name=self.arch.translate_register_name(s.offset, s.data.result_size(self.tyenv) // 8) ) elif isinstance(s, stmt.WrTmp) and isinstance(s.data, expr.Get): stmt_str = s.pp_str( reg_name=self.arch.translate_register_name(s.data.offset, s.data.result_size(self.tyenv) // 8) ) elif isinstance(s, stmt.Exit): stmt_str = s.pp_str(reg_name=self.arch.translate_register_name(s.offsIP, self.arch.bits // 8)) else: stmt_str = s.pp_str() sa.append(" %02d | %s" % (i, stmt_str)) else: sa.append(" Statements are omitted.") sa.append(f" NEXT: PUT({self.arch.translate_register_name(self.offsIP)}) = {self.next}; {self.jumpkind}") sa.append("}") return "\n".join(sa) def _is_defaultexit_direct_jump(self): """ Checks if the default of this IRSB a direct jump or not. """ if not (self.jumpkind == "Ijk_InvalICache" or self.jumpkind == "Ijk_Boring" or self.jumpkind == "Ijk_Call"): return False target = self.default_exit_target return target is not None # # internal "constructors" to fill this block out with data from various sources # def _from_c(self, lift_r, skip_stmts=False): c_irsb = lift_r.irsb if not skip_stmts: self.statements = [stmt.IRStmt._from_c(c_irsb.stmts[i]) for i in range(c_irsb.stmts_used)] self.tyenv = IRTypeEnv._from_c(self.arch, c_irsb.tyenv) else: self.statements = None self.tyenv = None self.next = expr.IRExpr._from_c(c_irsb.next) self.jumpkind = get_enum_from_int(c_irsb.jumpkind) self._size = lift_r.size self._instructions = lift_r.insts self._instruction_addresses = tuple(itertools.islice(lift_r.inst_addrs, lift_r.insts)) # Conditional exits exit_statements = [] if skip_stmts: if lift_r.exit_count > self.MAX_EXITS: # There are more exits than the default size of the exits array. We will need all statements raise SkipStatementsError("exit_count exceeded MAX_EXITS (%d)" % self.MAX_EXITS) for i in range(lift_r.exit_count): ex = lift_r.exits[i] exit_stmt = stmt.IRStmt._from_c(ex.stmt) exit_statements.append((ex.ins_addr, ex.stmt_idx, exit_stmt)) self._exit_statements = tuple(exit_statements) else: self._exit_statements = None # It will be generated when self.exit_statements is called # The default exit if lift_r.is_default_exit_constant == 1: self.default_exit_target = lift_r.default_exit else: self.default_exit_target = None # Data references self.data_refs = None if lift_r.data_ref_count > 0: if lift_r.data_ref_count > self.MAX_DATA_REFS: raise SkipStatementsError("data_ref_count exceeded MAX_DATA_REFS (%d)" % self.MAX_DATA_REFS) self.data_refs = [DataRef.from_c(lift_r.data_refs[i]) for i in range(lift_r.data_ref_count)] def _set_attributes( self, statements=None, nxt=None, tyenv=None, jumpkind=None, direct_next=None, size=None, instructions=None, instruction_addresses=None, exit_statements=None, default_exit_target=None, ): self.statements = statements if statements is not None else [] self.next = nxt if tyenv is not None: self.tyenv = tyenv self.jumpkind = jumpkind self._direct_next = direct_next self._size = size self._instructions = instructions self._instruction_addresses = instruction_addresses self._exit_statements = exit_statements self.default_exit_target = default_exit_target def _from_py(self, irsb): self._set_attributes( irsb.statements, irsb.next, irsb.tyenv, irsb.jumpkind, irsb.direct_next, irsb.size, instructions=irsb._instructions, instruction_addresses=irsb._instruction_addresses, exit_statements=irsb.exit_statements, default_exit_target=irsb.default_exit_target, )
[docs]class IRTypeEnv(VEXObject): """ An IR type environment. :ivar types: A list of the types of all the temporaries in this block as VEX enum strings. `types[3]` is the type of t3. :vartype types: list of str """ __slots__ = ["types", "wordty"]
[docs] def __init__(self, arch, types=None): VEXObject.__init__(self) self.types = [] if types is None else types self.wordty = "Ity_I%d" % arch.bits
def __str__(self): return " ".join(("t%d:%s" % (i, t)) for i, t in enumerate(self.types))
[docs] def lookup(self, tmp): """ Return the type of temporary variable `tmp` as an enum string """ if tmp < 0 or tmp > self.types_used: log.debug("Invalid temporary number %d", tmp) raise IndexError(tmp) return self.types[tmp]
[docs] def sizeof(self, tmp): return get_type_size(self.lookup(tmp))
[docs] def add(self, ty): """ Add a new tmp of type `ty` to the environment. Returns the number of the new tmp. """ self.types.append(ty) return self.types_used - 1
@property def types_used(self): return len(self.types) @staticmethod def _from_c(arch, c_tyenv): return IRTypeEnv(arch, [get_enum_from_int(c_tyenv.types[t]) for t in range(c_tyenv.types_used)]) @staticmethod def _to_c(tyenv): c_tyenv = pvc.emptyIRTypeEnv() for ty in tyenv.types: pvc.newIRTemp(c_tyenv, get_int_from_enum(ty)) return c_tyenv
[docs] def typecheck(self): for ty in self.types: try: get_type_size(ty) except ValueError: return False return True