Source code for pyvex.block

import copy
import itertools
import logging
from typing import List, Optional

from . import expr, stmt
from .const import get_type_size
from .data_ref import DataRef
from .enums import VEXObject
from .errors import SkipStatementsError
from .expr import RdTmp
from .native import pvc
from .stmt import CAS, LLSC, Dirty, Exit, IMark, IRExpr, IRStmt, LoadG, WrTmp, get_enum_from_int, get_int_from_enum

log = logging.getLogger("pyvex.block")


class IRSB(VEXObject):
    """
    The IRSB is the primary interface to pyvex. Constructing one of these will make a call into LibVEX to perform a
    translation.

    IRSB stands for *Intermediate Representation Super-Block*. An IRSB in VEX is a single-entry, multiple-exit code
    block.

    :ivar arch:             The architecture this block is lifted under
    :vartype arch:          :class:`archinfo.Arch`
    :ivar statements:       The statements in this block
    :vartype statements:    list of :class:`IRStmt`
    :ivar next:             The expression for the default exit target of this block
    :vartype next:          :class:`IRExpr`
    :ivar int offsIP:       The offset of the instruction pointer in the VEX guest state
    :ivar int stmts_used:   The number of statements in this IRSB
    :ivar str jumpkind:     The type of this block's default jump (call, boring, syscall, etc) as a VEX enum string
    :ivar bool direct_next: Whether this block ends with a direct (not indirect) jump or branch
    :ivar int size:         The size of this block in bytes
    :ivar int addr:         The address of this basic block, i.e. the address in the first IMark
    """

    __slots__ = (
        "addr",
        "arch",
        "statements",
        "next",
        "_tyenv",
        "jumpkind",
        "_direct_next",
        "_size",
        "_instructions",
        "_exit_statements",
        "default_exit_target",
        "_instruction_addresses",
        "data_refs",
    )

    # The following constants shall match the defs in pyvex.h
    MAX_EXITS = 400
    MAX_DATA_REFS = 2000

    def __init__(
        self,
        data,
        mem_addr,
        arch,
        max_inst=None,
        max_bytes=None,
        bytes_offset=0,
        traceflags=0,
        opt_level=1,
        num_inst=None,
        num_bytes=None,
        strict_block_end=False,
        skip_stmts=False,
        collect_data_refs=False,
        cross_insn_opt=True,
    ):
        """
        :param data:                The bytes to lift. Can be either a string of bytes or a cffi buffer object.
                                    You may also pass None to initialize an empty IRSB.
        :type data:                 str or bytes or cffi.FFI.CData or None
        :param int mem_addr:        The address to lift the data at.
        :param arch:                The architecture to lift the data as.
        :type arch:                 :class:`archinfo.Arch`
        :param max_inst:            The maximum number of instructions to lift. (See note below)
        :param max_bytes:           The maximum number of bytes to use.
        :param num_inst:            Replaces max_inst if max_inst is None. If set to None as well, no instruction limit
                                    is used.
        :param num_bytes:           Replaces max_bytes if max_bytes is None. If set to None as well, no byte limit is
                                    used.
        :param bytes_offset:        The offset into `data` to start lifting at. Note that for ARM THUMB mode, both
                                    `mem_addr` and `bytes_offset` must be odd (typically `bytes_offset` is set to 1).
        :param traceflags:          The libVEX traceflags, controlling VEX debug prints.
        :param opt_level:           The level of optimization to apply to the IR, -1 through 2. -1 is the strictest
                                    unoptimized level, 0 is unoptimized but will perform some lookahead/lookbehind
                                    optimizations, 1 performs constant propagation, and 2 performs loop unrolling,
                                    which honestly doesn't make much sense in the context of pyvex. The default is 1.
        :param strict_block_end:    Should the LibVEX arm-thumb split block at some instructions, for example CB{N}Z.
        :param skip_stmts:          Forwarded unchanged to :func:`pyvex.lifting.lift`.
        :param collect_data_refs:   Forwarded unchanged to :func:`pyvex.lifting.lift`.
        :param cross_insn_opt:      Forwarded unchanged to :func:`pyvex.lifting.lift`.

        .. note:: Explicitly specifying the number of instructions to lift (`max_inst`) may not always work
                  exactly as expected. For example, on MIPS, it is meaningless to lift a branch or jump
                  instruction without its delay slot. VEX attempts to Do The Right Thing by possibly decoding
                  fewer instructions than requested. Specifically, this means that lifting a branch or jump
                  on MIPS as a single instruction (`max_inst=1`) will result in an empty IRSB, and subsequent
                  attempts to run this block will raise `SimIRSBError('Empty IRSB passed to SimIRSB.')`.

        .. note:: If no instruction and byte limit is used, pyvex will continue lifting the block until the block
                  ends properly or until it runs out of data to lift.
        """
        # num_inst / num_bytes are fallbacks for max_inst / max_bytes.
        if max_inst is None:
            max_inst = num_inst
        if max_bytes is None:
            max_bytes = num_bytes

        VEXObject.__init__(self)

        self.addr = mem_addr
        self.arch = arch

        self.statements: List[IRStmt] = []
        self.next: Optional[IRExpr] = None
        self._tyenv = None
        self.jumpkind: Optional[str] = None
        self._direct_next = None
        self._size = None
        self._instructions = None
        self._exit_statements = None
        self.default_exit_target = None
        self.data_refs = ()
        self._instruction_addresses = ()

        if data is not None:
            # This is the slower path (because we need to call _from_py() to copy the content in the returned IRSB to
            # the current IRSB instance. You should always call `lift()` directly. This method is kept for
            # compatibility concerns.
            from pyvex.lifting import lift

            irsb = lift(
                data,
                mem_addr,
                arch,
                max_bytes=max_bytes,
                max_inst=max_inst,
                bytes_offset=bytes_offset,
                opt_level=opt_level,
                traceflags=traceflags,
                strict_block_end=strict_block_end,
                skip_stmts=skip_stmts,
                collect_data_refs=collect_data_refs,
                cross_insn_opt=cross_insn_opt,
            )
            self._from_py(irsb)

    @staticmethod
    def empty_block(arch, addr, statements=None, nxt=None, tyenv=None, jumpkind=None, direct_next=None, size=None):
        """
        Build an IRSB without lifting any bytes and fill it with the given attributes.

        :param arch:        The architecture of the new block.
        :param addr:        The address of the new block.
        :param statements:  The statements of the block; an empty list is used when None.
        :param nxt:         The default-exit expression.
        :param tyenv:       The type environment; left to be created lazily when None.
        :param jumpkind:    The jumpkind of the default exit.
        :param direct_next: Cached value for :attr:`direct_next`.
        :param size:        Cached value for :attr:`size`.
        :return:            The new block.
        :rtype:             IRSB
        """
        block = IRSB(None, addr, arch)
        block._set_attributes(statements, nxt, tyenv, jumpkind, direct_next, size=size)
        return block

    @property
    def tyenv(self):
        """
        The type environment of this block. An empty :class:`IRTypeEnv` is created on first access if none is set.
        """
        if self._tyenv is None:
            self._tyenv = IRTypeEnv(self.arch)
        return self._tyenv

    @tyenv.setter
    def tyenv(self, v):
        self._tyenv = v

    @property
    def has_statements(self):
        """
        True if the statement list is present and non-empty.
        """
        return bool(self.statements)

    @property
    def exit_statements(self):
        """
        The Exit statements of this block as a tuple of `(ins_addr, stmt_idx, stmt)`. Computed from
        :attr:`statements` on first access and cached afterwards. An empty list is returned (and nothing is cached)
        when the block has no statements.
        """
        if self._exit_statements is not None:
            return self._exit_statements

        # Delayed process
        if not self.has_statements:
            return []

        collected = []
        ins_addr = None
        for idx, stmt_ in enumerate(self.statements):
            if type(stmt_) is IMark:
                # Remember the address of the instruction the following statements belong to.
                ins_addr = stmt_.addr + stmt_.delta
            elif type(stmt_) is Exit:
                collected.append((ins_addr, idx, stmt_))

        self._exit_statements = tuple(collected)
        return self._exit_statements

    def copy(self):
        """
        Return a deep copy of this block.
        """
        return copy.deepcopy(self)

    def extend(self, extendwith):
        """
        Appends an irsb to the current irsb. The irsb that is appended is invalidated. The appended irsb's jumpkind and
        default exit are used.

        :param extendwith:     The IRSB to append to this IRSB
        :vartype extendwith:   :class:`IRSB`
        """
        if self.stmts_used == 0:
            self._from_py(extendwith)
            return

        conversion_dict = {}
        # Tmp numbers that are treated as "no tmp" and therefore must not be renumbered.
        invalid_vals = (0xFFFFFFFF, -1)

        # Compute the merged cached values before self.statements is mutated below.
        new_size = self.size + extendwith.size
        new_instructions = self.instructions + extendwith.instructions
        new_instruction_addresses = tuple(self.instruction_addresses) + tuple(extendwith.instruction_addresses)
        new_direct_next = extendwith.direct_next

        def convert_tmp(tmp):
            """
            Converts a tmp from the appended-block into one in the appended-to-block. Creates a new tmp if it does not
            already exist. Prevents collisions in tmp numbers between the two blocks.

            :param tmp:       The tmp number to convert
            """
            if tmp not in conversion_dict:
                tmp_type = extendwith.tyenv.lookup(tmp)
                conversion_dict[tmp] = self.tyenv.add(tmp_type)
            return conversion_dict[tmp]

        def convert_expr(expr_):
            """
            Converts a VEX expression of the appended-block to use tmps of the appended-to-block. Used to prevent
            collisions in tmp numbers between the two blocks. Expressions other than RdTmp are returned unchanged.

            :param expr_:     The VEX expression to convert
            :vartype expr_:   :class:`IRExpr`
            """
            if type(expr_) is RdTmp:
                return RdTmp.get_instance(convert_tmp(expr_.tmp))
            return expr_

        for stmt_ in extendwith.statements:
            stmttype = type(stmt_)
            if stmttype is WrTmp:
                stmt_.tmp = convert_tmp(stmt_.tmp)
            elif stmttype is LoadG:
                stmt_.dst = convert_tmp(stmt_.dst)
            elif stmttype is LLSC:
                stmt_.result = convert_tmp(stmt_.result)
            elif stmttype is Dirty:
                if stmt_.tmp not in invalid_vals:
                    stmt_.tmp = convert_tmp(stmt_.tmp)
                # The return value is intentionally unused: this only registers the tmps read by the arguments (in
                # argument order); the substitution itself is done by replace_expression() below.
                for e in stmt_.args:
                    convert_expr(e)
            elif stmttype is CAS:
                if stmt_.oldLo not in invalid_vals:
                    stmt_.oldLo = convert_tmp(stmt_.oldLo)
                if stmt_.oldHi not in invalid_vals:
                    stmt_.oldHi = convert_tmp(stmt_.oldHi)

            # Convert all expressions
            to_replace = {}
            for expr_ in stmt_.expressions:
                replacement = convert_expr(expr_)
                if replacement is not expr_:
                    to_replace[expr_] = replacement
            stmt_.replace_expression(to_replace)

            # Add the converted statement to self.statements
            self.statements.append(stmt_)

        extendwith.next = convert_expr(extendwith.next)
        self.next = extendwith.next
        self.jumpkind = extendwith.jumpkind
        # The default exit now is the appended block's default exit, so its constant target must follow as well;
        # otherwise direct_next and default_exit_target would describe two different exits.
        self.default_exit_target = extendwith.default_exit_target
        self._size = new_size
        self._instructions = new_instructions
        self._instruction_addresses = new_instruction_addresses
        self._direct_next = new_direct_next
        # Drop the cached exits; they are rebuilt from the merged statement list on the next access.
        self._exit_statements = None

        # TODO: Merge data_refs of the appended block as well.

    def invalidate_direct_next(self):
        """
        Drop the cached :attr:`direct_next` value so that it is recomputed on the next access.
        """
        self._direct_next = None

    def pp(self):
        """
        Pretty-print the IRSB to stdout.
        """
        print(self._pp_str())

    def __repr__(self):
        return f"IRSB <0x{self.size:x} bytes, {self.instructions} ins., {str(self.arch)}> at 0x{self.addr:x}"

    def __str__(self):
        return self._pp_str()

    def __eq__(self, other):
        return (
            isinstance(other, IRSB)
            and self.addr == other.addr
            and self.arch.name == other.arch.name
            and self.statements == other.statements
            and self.next == other.next
            and self.jumpkind == other.jumpkind
        )

    def __hash__(self):
        return hash((IRSB, self.addr, self.arch.name, tuple(self.statements), self.next, self.jumpkind))

    def typecheck(self):
        """
        Check this block for structural problems.

        The first problem found is logged at debug level.

        :return: True if no problem was found, False otherwise.
        :rtype:  bool
        """
        failure = self._typecheck_failure()
        if failure is None:
            return True
        log.debug(failure)
        return False

    def _typecheck_failure(self):
        """
        Find the first typechecking problem of this block.

        Explicit checks are used instead of `assert` statements so that the result does not depend on whether Python
        runs with assertions enabled.

        :return: A description of the first problem found, or None if there is none.
        :rtype:  str or None
        """
        # existence checks
        if self.next is None:
            return "Missing next expression"
        if self.jumpkind is None:
            return "Missing jumpkind"

        # type checks
        if not isinstance(self.next, expr.IRExpr):
            return "Next expression is not an expression"
        if not isinstance(self.jumpkind, str):
            return "Jumpkind is not a string"
        if not self.jumpkind.startswith("Ijk_"):
            return "Jumpkind is not a jumpkind enum"
        if not self.tyenv.typecheck():
            return "Type environment contains invalid types"

        # statement checks
        last_imark = None  # (addr, len) of the most recent IMark
        for i, st in enumerate(self.statements):
            if not isinstance(st, stmt.IRStmt):
                return "Statement %d is not an IRStmt" % i
            try:
                statement_ok = st.typecheck(self.tyenv)
            except Exception:  # pylint: disable=broad-except
                return "Statement %d errored in typechecking" % i
            if not statement_ok:
                return "Statement %d failed to typecheck" % i

            if type(st) is stmt.NoOp:
                continue
            if type(st) is stmt.IMark:
                # Consecutive IMarks must describe adjacent byte ranges.
                if last_imark is not None and last_imark[0] + last_imark[1] != st.addr:
                    return "IMarks sizes overlap or have gaps"
                last_imark = (st.addr, st.len)
            elif last_imark is None:
                return "Operation statement appears before IMark"

        if last_imark is None:
            return "No IMarks present in block"
        return None

    #
    # alternate constructors
    #

    @staticmethod
    def from_c(c_irsb, mem_addr, arch):
        """
        Build an IRSB at `mem_addr` for `arch` and fill it through :meth:`_from_c` from `c_irsb`.
        """
        irsb = IRSB(None, mem_addr, arch)
        irsb._from_c(c_irsb)
        return irsb

    @staticmethod
    def from_py(tyenv, stmts, next_expr, jumpkind, mem_addr, arch):
        """
        Build an IRSB from already constructed Python objects.

        :param tyenv:     The type environment of the block.
        :param stmts:     The statements of the block.
        :param next_expr: The default-exit expression.
        :param jumpkind:  The jumpkind of the default exit.
        :param mem_addr:  The address of the block.
        :param arch:      The architecture of the block.
        :return:          The new block.
        :rtype:           IRSB
        """
        irsb = IRSB(None, mem_addr, arch)
        irsb.tyenv = tyenv
        irsb.statements = stmts
        irsb.next = next_expr
        irsb.jumpkind = jumpkind
        # __init__ leaves an empty tuple here, which instruction_addresses would return as-is. Reset it so that the
        # addresses are derived from the IMarks in `stmts` on first access.
        irsb._instruction_addresses = None
        irsb._direct_next = irsb._is_defaultexit_direct_jump()
        return irsb

    #
    # simple properties useful for analysis
    #

    @property
    def stmts_used(self):
        """
        The number of statements in this block (0 if the statement list is None).
        """
        if self.statements is None:
            return 0
        return len(self.statements)

    @property
    def offsIP(self):
        """
        The instruction pointer offset as reported by `arch.ip_offset`.
        """
        return self.arch.ip_offset

    @property
    def direct_next(self):
        """
        Whether the default exit is a direct jump. Computed on first access and cached.
        """
        if self._direct_next is None:
            self._direct_next = self._is_defaultexit_direct_jump()
        return self._direct_next

    @property
    def expressions(self):
        """
        Return an iterator of all expressions contained in the IRSB.
        """
        for s in self.statements:
            yield from s.expressions
        yield self.next

    @property
    def instructions(self):
        """
        The number of instructions in this block
        """
        if self._instructions is None:
            if self.statements is None:
                self._instructions = 0
            else:
                self._instructions = sum(1 for s in self.statements if type(s) is stmt.IMark)
        return self._instructions

    @property
    def instruction_addresses(self):
        """
        Addresses of instructions in this block.
        """
        if self._instruction_addresses is None:
            if self.statements is None:
                self._instruction_addresses = []
            else:
                self._instruction_addresses = [(s.addr + s.delta) for s in self.statements if type(s) is stmt.IMark]
        return self._instruction_addresses

    @property
    def size(self):
        """
        The size of this block, in bytes
        """
        if self._size is None:
            self._size = sum(s.len for s in self.statements if type(s) is stmt.IMark)
        return self._size

    @property
    def operations(self):
        """
        A list of all operations done by the IRSB, as libVEX enum names
        """
        return [e.op for e in self.expressions if hasattr(e, "op")]

    @property
    def all_constants(self):
        """
        Returns all constants in the block (including incrementing of the program counter) as
        :class:`pyvex.const.IRConst`.
        """
        return list(itertools.chain.from_iterable(e.constants for e in self.expressions))

    @property
    def constants(self):
        """
        The constants (excluding updates of the program counter) in the IRSB as :class:`pyvex.const.IRConst`.
        """
        return list(
            itertools.chain.from_iterable(
                s.constants for s in self.statements if not (type(s) is stmt.Put and s.offset == self.offsIP)
            )
        )

    @property
    def constant_jump_targets(self):
        """
        A set of the static jump targets of the basic block.
        """
        exits = set()

        if self.exit_statements:
            for _, _, stmt_ in self.exit_statements:
                exits.add(stmt_.dst.value)

        default_target = self.default_exit_target
        if default_target is not None:
            exits.add(default_target)

        return exits

    @property
    def constant_jump_targets_and_jumpkinds(self):
        """
        A dict of the static jump targets of the basic block to their jumpkind.
        """
        exits = {}

        if self.exit_statements:
            for _, _, stmt_ in self.exit_statements:
                exits[stmt_.dst.value] = stmt_.jumpkind

        default_target = self.default_exit_target
        if default_target is not None:
            exits[default_target] = self.jumpkind

        return exits

    #
    # private methods
    #

    def _pp_str(self):
        """
        Return the pretty-printed IRSB.

        :rtype: str
        """
        sa = []
        sa.append("IRSB {")
        if self.statements is not None:
            sa.append(" %s" % self.tyenv)
        sa.append("")
        if self.statements is not None:
            for i, s in enumerate(self.statements):
                # Statements that touch a register get the register's name passed to their __str__.
                if isinstance(s, stmt.Put):
                    stmt_str = s.__str__(
                        reg_name=self.arch.translate_register_name(s.offset, s.data.result_size(self.tyenv) // 8)
                    )
                elif isinstance(s, stmt.WrTmp) and isinstance(s.data, expr.Get):
                    stmt_str = s.__str__(
                        reg_name=self.arch.translate_register_name(s.data.offset, s.data.result_size(self.tyenv) // 8)
                    )
                elif isinstance(s, stmt.Exit):
                    stmt_str = s.__str__(reg_name=self.arch.translate_register_name(s.offsIP, self.arch.bits // 8))
                else:
                    stmt_str = s.__str__()
                sa.append(" %02d | %s" % (i, stmt_str))
        else:
            sa.append(" Statements are omitted.")
        sa.append(f" NEXT: PUT({self.arch.translate_register_name(self.offsIP)}) = {self.next}; {self.jumpkind}")
        sa.append("}")
        return "\n".join(sa)

    def _is_defaultexit_direct_jump(self):
        """
        Checks if the default exit of this IRSB is a direct jump or not: the jumpkind must be one of
        Ijk_InvalICache, Ijk_Boring or Ijk_Call, and a constant default exit target must be known.
        """
        if self.jumpkind not in ("Ijk_InvalICache", "Ijk_Boring", "Ijk_Call"):
            return False

        return self.default_exit_target is not None

    #
    # internal "constructors" to fill this block out with data from various sources
    #

    def _from_c(self, lift_r, skip_stmts=False):
        """
        Fill this block from a lift result.

        :param lift_r:      The lift result; its `irsb`, `size`, `insts`, `inst_addrs`, exit, default-exit and
                            data-ref fields are read.
        :param skip_stmts:  If True, statements and the type environment are not converted; the conditional exits
                            are taken from `lift_r.exits` instead.
        :raises SkipStatementsError: If statements are skipped and `lift_r.exit_count` exceeds MAX_EXITS, or if
                                     `lift_r.data_ref_count` exceeds MAX_DATA_REFS.
        """
        c_irsb = lift_r.irsb
        if not skip_stmts:
            self.statements = [stmt.IRStmt._from_c(c_irsb.stmts[i]) for i in range(c_irsb.stmts_used)]
            self.tyenv = IRTypeEnv._from_c(self.arch, c_irsb.tyenv)
        else:
            self.statements = None
            self.tyenv = None

        self.next = expr.IRExpr._from_c(c_irsb.next)
        self.jumpkind = get_enum_from_int(c_irsb.jumpkind)
        self._size = lift_r.size
        self._instructions = lift_r.insts
        self._instruction_addresses = tuple(itertools.islice(lift_r.inst_addrs, lift_r.insts))

        # Conditional exits
        if skip_stmts:
            if lift_r.exit_count > self.MAX_EXITS:
                # There are more exits than the default size of the exits array. We will need all statements
                raise SkipStatementsError("exit_count exceeded MAX_EXITS (%d)" % self.MAX_EXITS)
            exit_statements = []
            for i in range(lift_r.exit_count):
                ex = lift_r.exits[i]
                exit_stmt = stmt.IRStmt._from_c(ex.stmt)
                exit_statements.append((ex.ins_addr, ex.stmt_idx, exit_stmt))
            self._exit_statements = tuple(exit_statements)
        else:
            self._exit_statements = None  # It will be generated when self.exit_statements is called

        # The default exit
        if lift_r.is_default_exit_constant == 1:
            self.default_exit_target = lift_r.default_exit
        else:
            self.default_exit_target = None

        # Data references
        self.data_refs = None
        if lift_r.data_ref_count > 0:
            if lift_r.data_ref_count > self.MAX_DATA_REFS:
                raise SkipStatementsError("data_ref_count exceeded MAX_DATA_REFS (%d)" % self.MAX_DATA_REFS)
            self.data_refs = [DataRef.from_c(lift_r.data_refs[i]) for i in range(lift_r.data_ref_count)]

    def _set_attributes(
        self,
        statements=None,
        nxt=None,
        tyenv=None,
        jumpkind=None,
        direct_next=None,
        size=None,
        instructions=None,
        instruction_addresses=None,
        exit_statements=None,
        default_exit_target=None,
    ):
        """
        Overwrite the content of this block with the given values.

        `statements` falls back to a new empty list when None, and `tyenv` is only assigned when it is not None.
        All other values are stored as given, including None.
        """
        self.statements = statements if statements is not None else []
        self.next = nxt
        if tyenv is not None:
            self.tyenv = tyenv
        self.jumpkind = jumpkind
        self._direct_next = direct_next
        self._size = size
        self._instructions = instructions
        self._instruction_addresses = instruction_addresses
        self._exit_statements = exit_statements
        self.default_exit_target = default_exit_target

    def _from_py(self, irsb):
        """
        Copy the content of another IRSB into this one. Containers such as the statement list are shared with
        `irsb`, not duplicated.

        :param irsb: The block to take the content from.
        :type irsb:  IRSB
        """
        self._set_attributes(
            irsb.statements,
            irsb.next,
            irsb.tyenv,
            irsb.jumpkind,
            irsb.direct_next,
            irsb.size,
            instructions=irsb._instructions,
            instruction_addresses=irsb._instruction_addresses,
            exit_statements=irsb.exit_statements,
            default_exit_target=irsb.default_exit_target,
        )
class IRTypeEnv(VEXObject):
    """
    An IR type environment.

    :ivar types:     A list of the types of all the temporaries in this block as VEX enum strings.
                     `types[3]` is the type of t3.
    :vartype types:  list of str
    """

    __slots__ = ["types", "wordty"]

    def __init__(self, arch, types=None):
        """
        :param arch:  The architecture; `arch.bits` determines the word type `wordty`.
        :param types: The initial list of tmp types. The list is stored as given, not copied. A new empty list is
                      used when None.
        """
        VEXObject.__init__(self)
        self.types = [] if types is None else types
        self.wordty = "Ity_I%d" % arch.bits

    def __str__(self):
        return " ".join(("t%d:%s" % (i, t)) for i, t in enumerate(self.types))

    def lookup(self, tmp):
        """
        Return the type of temporary variable `tmp` as an enum string

        :param int tmp:     The number of the tmp to look up.
        :raises IndexError: If `tmp` is negative or not smaller than the number of tmps in this environment.
        """
        # Valid tmp numbers are 0 .. types_used - 1, so types_used itself is already out of range.
        if tmp < 0 or tmp >= self.types_used:
            log.debug("Invalid temporary number %d", tmp)
            raise IndexError(tmp)
        return self.types[tmp]

    def sizeof(self, tmp):
        """
        Return `get_type_size` of the type of temporary variable `tmp`.
        """
        return get_type_size(self.lookup(tmp))

    def add(self, ty):
        """
        Add a new tmp of type `ty` to the environment. Returns the number of the new tmp.
        """
        self.types.append(ty)
        return self.types_used - 1

    @property
    def types_used(self):
        """
        The number of tmps in this environment.
        """
        return len(self.types)

    @staticmethod
    def _from_c(arch, c_tyenv):
        """
        Build an IRTypeEnv from a C type environment by converting its first `types_used` entries to enum strings.
        """
        return IRTypeEnv(arch, [get_enum_from_int(c_tyenv.types[t]) for t in range(c_tyenv.types_used)])

    @staticmethod
    def _to_c(tyenv):
        """
        Build a C type environment holding one tmp per entry of `tyenv.types`, in order.
        """
        c_tyenv = pvc.emptyIRTypeEnv()
        for ty in tyenv.types:
            pvc.newIRTemp(c_tyenv, get_int_from_enum(ty))
        return c_tyenv

    def typecheck(self):
        """
        Return True if `get_type_size` accepts every type in this environment, i.e. does not raise ValueError.
        """
        for ty in self.types:
            try:
                get_type_size(ty)
            except ValueError:
                return False
        return True