Source code for angr.analyses.disassembly

import logging
from collections import defaultdict
from typing import Union, Optional, Sequence, Tuple, Any

import pyvex
import archinfo
from angr.knowledge_plugins import Function

from . import Analysis

from ..utils.library import get_cpp_function_name
from ..utils.formatting import ansi_color_enabled, ansi_color, add_edge_to_buffer
from ..block import DisassemblerInsn, CapstoneInsn, SootBlockNode
from ..codenode import BlockNode
from .disassembly_utils import decode_instruction

try:
    from ..engines import pcode
    import pypcode

    IRSBType = Union[pyvex.IRSB, pcode.lifter.IRSB]
    IROpObjType = Union[pyvex.stmt.IRStmt, pypcode.PcodeOp]
except ImportError:
    pcode = None
    IRSBType = pyvex.IRSB
    IROpObjType = pyvex.stmt

l = logging.getLogger(name=__name__)

# pylint: disable=unidiomatic-typecheck


[docs]class DisassemblyPiece: addr = None ident = float("nan")
[docs] def render(self, formatting=None): x = self._render(formatting) if len(x) == 1: return [self.highlight(x[0], formatting)] else: return x
def _render(self, formatting): raise NotImplementedError
[docs] def getpiece(self, formatting, column): # pylint:disable=unused-argument return self
[docs] def width(self, formatting): r = self._render(formatting) if not r: return 0 return max(len(x) for x in r)
[docs] def height(self, formatting): return len(self._render(formatting))
[docs] @staticmethod def color(string, coloring, formatting): try: return "{}{}{}".format(formatting["colors"][coloring][0], string, formatting["colors"][coloring][1]) except KeyError: return string
[docs] def highlight(self, string, formatting=None): try: if formatting is not None: if "format_callback" in formatting: return formatting["format_callback"](self, string) if self in formatting["highlight"]: return self.color(string, "highlight", formatting) except KeyError: pass return string
def __eq__(self, other): return False
[docs]class FunctionStart(DisassemblyPiece):
[docs] def __init__(self, func): """ Constructor. :param angr.knowledge.Function func: The function instance. """ self.addr = func.addr self.vars = [] self.name = func.name self.is_simprocedure = func.is_simprocedure self.sim_procedure = None if func.is_syscall: self.sim_procedure = func._project.simos.syscall_from_addr(self.addr) elif func.is_simprocedure: self.sim_procedure = func._project.hooked_by(self.addr)
def _render(self, formatting): # TODO: Make the individual elements be individual Pieces return [f"{name} = {offset:#x}" for offset, name in self.vars]
[docs] def height(self, formatting): return len(self.vars)
[docs]class Label(DisassemblyPiece):
[docs] def __init__(self, addr, name): self.addr = addr self.name = name
def _render(self, formatting): # pylint:disable=unused-argument return [self.name + ":"]
[docs]class IROp(DisassemblyPiece): __slots__ = ( "addr", "seq", "obj", "irsb", ) addr: int seq: int obj: IROpObjType irsb: IRSBType
[docs] def __init__(self, addr: int, seq: int, obj: IROpObjType, irsb: IRSBType): self.addr = addr self.seq = seq self.obj = obj self.irsb = irsb
def __str__(self): return str(self.obj) def _render(self, formatting): # pylint:disable=unused-argument return [str(self)]
[docs]class BlockStart(DisassemblyPiece):
[docs] def __init__(self, block, parentfunc, project): self.addr = block.addr self.size = block.size self.parentfunc = parentfunc self.project = project
def _render(self, formatting): return []
[docs]class Hook(DisassemblyPiece):
[docs] def __init__(self, block): self.addr = block.addr simproc_name = str(block.sim_procedure) self.name = simproc_name.split()[-1].strip("'<>") self.short_name = simproc_name.strip("'<>").split(".")[-1]
def _render(self, formatting): return ["SimProcedure " + self.short_name] def __eq__(self, other): return type(other) is Hook and self.name == other.name
[docs]class Instruction(DisassemblyPiece):
[docs] def __init__(self, insn, parentblock, project=None): self.addr = insn.address self.size = insn.size self.insn = insn self.parentblock = parentblock self.project = parentblock.project if parentblock is not None else project self.arch = self.project.arch self.format = "" self.components = () self.opcode = None self.operands = [] # the following members will be filled in after dissecting the instruction self.type = None self.branch_type = None self.branch_target_operand = None self.dissect_instruction() if isinstance(insn, CapstoneInsn): decode_instruction(self.arch, self)
@property def mnemonic(self): return self.opcode
[docs] def reload_format(self): self.insn = CapstoneInsn(next(self.arch.capstone.disasm(self.insn.bytes, self.addr))) self.dissect_instruction()
[docs] def dissect_instruction(self): if isinstance( self.arch, (archinfo.ArchAArch64, archinfo.ArchARM, archinfo.ArchARMEL, archinfo.ArchARMHF, archinfo.ArchARMCortexM), ): self.dissect_instruction_for_arm() else: # the default one works well for x86, add more arch-specific # code when you find it doesn't meet your need. self.dissect_instruction_by_default()
[docs] def dissect_instruction_for_arm(self): self.opcode = Opcode(self) self.operands = [] # We use capstone for arm64 disassembly, so this assertion must success assert hasattr(self.insn, "operands") op_str = self.insn.op_str # NOTE: the size of dummy_operands is not necessarily equal to # operands count of capstone disasm result. If we check # len(self.operands) == len(self.insn.operands) # special cases in arm disassembly will mess up the code dummy_operands = self.split_arm_op_string(op_str) for operand in dummy_operands: opr_pieces = self.split_op_string(operand) cur_operand = [] if opr_pieces[0][0].isalpha() and opr_pieces[0] in self.arch.registers: cur_operand.append(Register(opr_pieces[0])) # handle register's suffix (e.g. "sp!", "d0[1]", "v0.16b") cur_operand.extend(opr_pieces[1:]) self.operands.append(cur_operand) continue for i, p in enumerate(opr_pieces): if p[0].isnumeric(): if any( ( i > 0 and opr_pieces[i - 1] == ".", i > 1 and ( opr_pieces[i - 2] in ["lsl", "lsr", "asr", "ror", "msl"] or opr_pieces[i - 2][:3] in ("uxt", "sxt") ), ) ): cur_operand.append(p) continue # Always set False. I don't see any '+' sign appear # in capstone's arm disasm result with_sign = False try: v = int(p, 0) except ValueError: l.error("Failed to parse operand %s at %016x. Please report.", p, self.addr) cur_operand.append(p) continue if i > 0 and opr_pieces[i - 1] == "-": v = -v cur_operand.pop() cur_operand.append(Value(v, with_sign)) elif p[0].isalpha() and p in self.arch.registers: cur_operand.append(Register(p)) else: cur_operand.append(p) self.operands.append(cur_operand) for i, opr in enumerate(self.operands): if i < len(self.insn.operands): op_type = self.insn.operands[i].type else: # set extra dummy operand type to default 0 op_type = 0 self.operands[i] = Operand.build(op_type, i, opr, self) if len(self.operands) == 0 and len(self.insn.operands) != 0: l.error("Operand parsing failed for instruction %s at address %x", str(self.insn), self.insn.address) self.operands = [] return
[docs] @staticmethod def split_arm_op_string(op_str: str): # Split arm operand string by comma outside brackets pieces = [] outside_brackets = True cur_opr = "" for c in op_str: if c == "[" or c == "{": outside_brackets = False if c == "]" or c == "}": outside_brackets = True if c == "," and outside_brackets: pieces.append(cur_opr) cur_opr = "" continue if c == " ": continue cur_opr += c if cur_opr: pieces.append(cur_opr) return pieces
[docs] def dissect_instruction_by_default(self): # perform a "smart split" of an operands string into smaller pieces insn_pieces = self.split_op_string(self.insn.op_str) self.operands = [] cur_operand = None i = len(insn_pieces) - 1 cs_op_num = -1 nested_mem = False # iterate over operands in reverse order while i >= 0: c = insn_pieces[i] if c == "": i -= 1 continue if cur_operand is None: cur_operand = [] self.operands.append(cur_operand) # Check if this is a number or an identifier. ordc = ord(c[0]) # pylint:disable=too-many-boolean-expressions if 0x30 <= ordc <= 0x39 or 0x41 <= ordc <= 0x5A or 0x61 <= ordc <= 0x7A: # perform some basic classification intc = None reg = False try: intc = int(c, 0) except ValueError: reg = c in self.arch.registers # if this is a "live" piece, liven it up! # special considerations: # - registers should consolidate with a $ or % prefix # - integers should consolidate with a sign prefix if reg: prefix = "" if i > 0 and insn_pieces[i - 1] in ("$", "%"): prefix = insn_pieces[i - 1] insn_pieces[i - 1] = "" cur_operand.append(Register(c, prefix)) elif intc is not None: with_sign = False if i > 0 and insn_pieces[i - 1] in ("+", "-"): with_sign = True if insn_pieces[i - 1] == "-": intc = -intc # pylint: disable=invalid-unary-operand-type insn_pieces[i - 1] = "" cur_operand.append(Value(intc, with_sign)) else: cur_operand.append(c) elif c == "," and not nested_mem: cs_op_num -= 1 cur_operand = None elif c == ":": # XXX this is a hack! fix this later insn_pieces[i - 1] += ":" else: # Check if we are inside braces or parentheses. Do not forget # that we are iterating in reverse order! if c == "]" or c == ")": nested_mem = True elif c == "[" or c == "(": nested_mem = False if cur_operand is None: cur_operand = [c] self.operands.append(cur_operand) else: cur_operand.append(c if c[0] != "," else c + " ") i -= 1 self.opcode = Opcode(self) self.operands.reverse() if not hasattr(self.insn, "operands"): # Not all disassemblers provide operands. Just use our smart split for i, o in enumerate(self.operands): o.reverse() self.operands[i] = Operand.build(1, i, o, self) return if len(self.operands) != len(self.insn.operands): l.error( "Operand parsing failed for instruction %s. %d operands are parsed, while %d are expected.", str(self.insn), len(self.operands), len(self.insn.operands), ) self.operands = [] return for i, o in enumerate(self.operands): o.reverse() self.operands[i] = Operand.build(self.insn.operands[i].type, i, o, self)
[docs] @staticmethod def split_op_string(insn_str): pieces = [] in_word = False for c in insn_str: if c.isspace(): in_word = False continue if c.isalnum(): if in_word: pieces[-1] += c else: in_word = True pieces.append(c) else: in_word = False pieces.append(c) return pieces
def _render(self, formatting=None): return [ "{} {}".format(self.opcode.render(formatting)[0], ", ".join(o.render(formatting)[0] for o in self.operands)) ]
[docs]class SootExpression(DisassemblyPiece):
[docs] def __init__(self, expr): self.expr = expr
def _render(self, formatting=None): return [self.expr]
[docs]class SootExpressionTarget(SootExpression):
[docs] def __init__(self, target_stmt_idx): super().__init__(target_stmt_idx) self.target_stmt_idx = target_stmt_idx
def _render(self, formatting=None): return ["Goto %d" % self.target_stmt_idx]
[docs]class SootExpressionStaticFieldRef(SootExpression):
[docs] def __init__(self, field): field_str = ".".join(field) super().__init__(field_str) self.field = field self.field_str = field_str
def _render(self, formatting=None): return [self.field_str]
[docs]class SootExpressionInvoke(SootExpression): Virtual = "virtual" Static = "static" Special = "special"
[docs] def __init__(self, invoke_type, expr): super().__init__(str(expr)) self.invoke_type = invoke_type self.base = str(expr.base) if self.invoke_type in (self.Virtual, self.Special) else "" self.method_name = expr.method_name self.arg_str = expr.list_to_arg_str(expr.args)
def _render(self, formatting=None): return [ "{}{}({}) [{}]".format( self.base + "." if self.base else "", self.method_name, self.arg_str, self.invoke_type ) ]
[docs]class SootStatement(DisassemblyPiece):
[docs] def __init__(self, block_addr, raw_stmt): self.addr = block_addr.copy() self.addr.stmt_idx = raw_stmt.label self.raw_stmt = raw_stmt self.components = [] self._parse()
@property def stmt_idx(self): return self.addr.stmt_idx def _parse(self): func = "_parse_%s" % self.raw_stmt.__class__.__name__ if hasattr(self, func): getattr(self, func)() else: # print func self.components += ["NotImplemented: %s" % func] def _expr(self, expr): func = "_handle_%s" % expr.__class__.__name__ if hasattr(self, func): return getattr(self, func)(expr) else: # print func return SootExpression(str(expr)) def _render(self, formatting=None): return [ " ".join( [ component if type(component) is str else component.render(formatting=formatting)[0] for component in self.components ] ) ] # # Statement parsers # def _parse_AssignStmt(self): self.components += [ SootExpression(str(self.raw_stmt.left_op)), "=", self._expr(self.raw_stmt.right_op), ] def _parse_InvokeStmt(self): self.components += [ self._expr(self.raw_stmt.invoke_expr), ] def _parse_GotoStmt(self): self.components += [ SootExpressionTarget(self.raw_stmt.target), ] def _parse_IfStmt(self): self.components += [ "if (", SootExpression(str(self.raw_stmt.condition)), ")", SootExpressionTarget(self.raw_stmt.target), ] def _parse_ReturnVoidStmt(self): self.components += [ "return", ] def _parse_IdentityStmt(self): self.components += [ SootExpression(str(self.raw_stmt.left_op)), "<-", SootExpression(str(self.raw_stmt.right_op)), ] # # Expression handlers # def _handle_SootStaticFieldRef(self, expr): return SootExpressionStaticFieldRef(expr.field[::-1]) def _handle_SootVirtualInvokeExpr(self, expr): return SootExpressionInvoke(SootExpressionInvoke.Virtual, expr) def _handle_SootStaticInvokeExpr(self, expr): return SootExpressionInvoke(SootExpressionInvoke.Static, expr) def _handle_SootSpecialInvokeExpr(self, expr): return SootExpressionInvoke(SootExpressionInvoke.Special, expr)
[docs]class Opcode(DisassemblyPiece):
[docs] def __init__(self, parentinsn): self.addr = parentinsn.addr self.insn = parentinsn.insn self.parentinsn = parentinsn self.opcode_string = self.insn.mnemonic self.ident = (self.addr, "opcode")
def _render(self, formatting=None): return [self.opcode_string.ljust(7)] def __eq__(self, other): return type(other) is Opcode and self.opcode_string == other.opcode_string
[docs]class Operand(DisassemblyPiece):
[docs] def __init__(self, op_num, children, parentinsn): self.addr = parentinsn.addr self.children = children self.parentinsn = parentinsn self.op_num = op_num self.ident = (self.addr, "operand", self.op_num) for i, c in enumerate(self.children): if type(c) not in (bytes, str): c.ident = (self.addr, "operand piece", self.op_num, i) c.parentop = self
@property def cs_operand(self): return self.parentinsn.insn.operands[self.op_num] def _render(self, formatting): return [ "".join( x if type(x) is str else x.decode() if type(x) is bytes else x.render(formatting)[0] for x in self.children ) ]
[docs] @staticmethod def build(operand_type, op_num, children, parentinsn): # Maps capstone operand types to operand classes MAPPING = { 0: Operand, # default type for operand that haven't been fully implemented 1: RegisterOperand, 2: ConstantOperand, 3: MemoryOperand, 4: Operand, # ARM FP 64: Operand, # ARM CIMM 65: Operand, # ARM PIMM | ARM64 REG_MRS 66: Operand, # ARM SETEND | ARM64 REG_MSR 67: Operand, # ARM SYSREG | ARM64 PSTATE 68: Operand, # ARM64 SYS 69: Operand, # ARM64 PREFETCH 70: Operand, # ARM64 BARRIER } cls = MAPPING.get(operand_type, None) if cls is None: raise ValueError("Unknown capstone operand type %s." % operand_type) operand = cls(op_num, children, parentinsn) # Post-processing if cls is MemoryOperand and parentinsn.arch.name in {"AMD64"} and len(operand.values) == 2: op0, op1 = operand.values if type(op0) is Register and op0.is_ip and type(op1) is Value: # Indirect addressing in x86_64 # 400520 push [rip+0x200782] ==> 400520 push [0x600ca8] absolute_addr = parentinsn.addr + parentinsn.size + op1.val return MemoryOperand(1, operand.prefix + ["[", Value(absolute_addr, False), "]"], parentinsn) return operand
[docs]class ConstantOperand(Operand): pass
[docs]class RegisterOperand(Operand): @property def register(self): return next((child for child in self.children if isinstance(child, Register)), None) def _render(self, formatting): custom_value_str = None if formatting is not None: try: custom_value_str = formatting["custom_values_str"][self.ident] except KeyError: pass if custom_value_str: return [custom_value_str] else: return super()._render(formatting)
[docs]class MemoryOperand(Operand):
[docs] def __init__(self, op_num, children, parentinsn): super().__init__(op_num, children, parentinsn) # a typical "children" looks like the following: # [ 'dword', 'ptr', '[', Register, Value, ']' ] # or # [ '[', Register, ']' ] # or # [ Value, '(', Regsiter, ')' ] # it will be converted into more meaningful and Pythonic properties self.segment_selector = None self.prefix = [] self.suffix_str = "" # could be arm pre index mark "!" self.values = [] self.offset = [] # offset_location # - prefix: -0xff00($gp) # - before_value: 0xff00+rax # - after_value: rax+0xff00 self.offset_location = "after_value" # values_style # - square: [rax+0x10] # - curly: {rax+0x10} # - paren: (rax+0x10) self.values_style = "square" try: if "[" in self.children: self._parse_memop_squarebracket() elif "(" in self.children: self._parse_memop_paren() else: raise ValueError() except ValueError: l.error("Failed to parse operand children %s. Please report to Fish.", self.children) # setup all dummy properties self.prefix = None self.values = None
def _parse_memop_squarebracket(self): if self.children[0] != "[": try: square_bracket_pos = self.children.index("[") except ValueError: # pylint: disable=try-except-raise raise self.prefix = self.children[:square_bracket_pos] # take out segment selector if len(self.prefix) == 3: self.segment_selector = self.prefix[-1] self.prefix = self.prefix[:-1] else: self.segment_selector = None else: # empty square_bracket_pos = 0 self.prefix = [] self.segment_selector = None close_square_pos = len(self.children) - 1 if self.children[-1] != "]": if self.children[-1] == "!" and self.children[-2] == "]": # arm64 pre index self.suffix_str = "!" close_square_pos -= 1 else: raise ValueError() self.values = self.children[square_bracket_pos + 1 : close_square_pos] def _parse_memop_paren(self): offset = [] self.values_style = "paren" if self.children[0] != "(": try: paren_pos = self.children.index("(") except ValueError: # pylint: disable=try-except-raise raise if all(isinstance(item, str) for item in self.children[:paren_pos]): # parse prefix self.prefix = self.children[:paren_pos] elif all(isinstance(item, Value) for item in self.children[:paren_pos]): # parse offset # force each piece to be rendered with its sign (+/-) offset += self.children[:paren_pos] # offset appears before the left parenthesis self.offset_location = "prefix" else: paren_pos = 0 self.prefix = [] self.segment_selector = None self.values = self.children[paren_pos + 1 : len(self.children) - 1] self.offset = offset def _render(self, formatting): if self.prefix is None: # we failed in parsing. use the default rendering return super()._render(formatting) else: values_style = self.values_style show_prefix = True custom_values_str = None if formatting is not None: try: values_style = formatting["values_style"][self.ident] except KeyError: pass try: show_prefix_str = formatting["show_prefix"][self.ident] if show_prefix_str in ("false", "False"): show_prefix = False except KeyError: pass try: custom_values_str = formatting["custom_values_str"][self.ident] except KeyError: pass prefix_str = " ".join(self.prefix) + " " if show_prefix and self.prefix else "" if custom_values_str is not None: value_str = custom_values_str else: value_str = "".join( x.render(formatting)[0] if not isinstance(x, (bytes, str)) else x for x in self.values ) if values_style == "curly": left_paren, right_paren = "{", "}" elif values_style == "paren": left_paren, right_paren = "(", ")" else: # square left_paren, right_paren = "[", "]" if self.offset: offset_str = "".join( x.render(formatting)[0] if not isinstance(x, (bytes, str)) else x for x in self.offset ) # combine values and offsets according to self.offset_location if self.offset_location == "prefix": value_str = "".join([offset_str, left_paren, value_str, right_paren]) elif self.offset_location == "before_value": value_str = "".join([left_paren, offset_str, value_str, right_paren]) else: # after_value value_str = "".join([left_paren, value_str, offset_str, right_paren]) else: value_str = left_paren + value_str + right_paren segment_selector_str = "" if self.segment_selector is None else self.segment_selector if segment_selector_str and prefix_str: prefix_str += " " return [f"{prefix_str}{segment_selector_str}{value_str}{self.suffix_str}"]
[docs]class OperandPiece(DisassemblyPiece): # pylint: disable=abstract-method # These get filled in later... addr = None parentop = None ident = None
[docs]class Register(OperandPiece):
[docs] def __init__(self, reg, prefix=""): self.reg = reg self.prefix = prefix self.is_ip = self.reg in {"eip", "rip", "pc"} # TODO: Support more architectures
def _render(self, formatting): # TODO: register renaming return [self.prefix + self.reg] def __eq__(self, other): return type(other) is Register and self.reg == other.reg
[docs]class Value(OperandPiece):
[docs] def __init__(self, val, render_with_sign): self.val = val self.render_with_sign = render_with_sign
@property def project(self): return self.parentop.parentinsn.project def __eq__(self, other): return type(other) is Value and self.val == other.val def _render(self, formatting): if formatting is not None: try: style = formatting["int_styles"][self.ident] if style[0] == "hex": if self.render_with_sign: return ["%#+x" % self.val] else: return ["%#x" % self.val] elif style[0] == "dec": if self.render_with_sign: return ["%+d" % self.val] else: return [str(self.val)] elif style[0] == "label": labeloffset = style[1] if labeloffset == 0: lbl = self.project.kb.labels[self.val] return [lbl] return [ "{}{}{:#+x}".format( "+" if self.render_with_sign else "", self.project.kb.labels[self.val + labeloffset], labeloffset, ) ] except KeyError: pass # default case try: func = self.project.kb.functions.get_by_addr(self.val) except KeyError: func = None if self.val in self.project.kb.labels: lbl = self.project.kb.labels[self.val] if func is not None: # see if lbl == func.name and func.demangled_name != func.name. if so, we prioritize the # demangled name if lbl == func.name and func.name != func.demangled_name: normalized_name = get_cpp_function_name(func.demangled_name, specialized=False, qualified=True) return [normalized_name] return [("+" if self.render_with_sign else "") + lbl] elif func is not None: return [func.demangled_name] else: if self.render_with_sign: return ["%#+x" % self.val] else: return ["%#x" % self.val]
[docs]class Comment(DisassemblyPiece):
[docs] def __init__(self, addr, text): self.addr = addr self.text = text.split("\n")
def _render(self, formatting=None): return [self.text]
[docs] def height(self, formatting): lines = len(self.text) return 0 if lines == 1 else lines
[docs]class FuncComment(DisassemblyPiece):
[docs] def __init__(self, func): self.func = func
def _render(self, formatting=None): return ["##", "## Function " + self.func.name, "##"]
[docs]class Disassembly(Analysis): """ Produce formatted machine code disassembly. """
[docs] def __init__( self, function: Optional[Function] = None, ranges: Optional[Sequence[Tuple[int, int]]] = None, thumb: bool = False, include_ir: bool = False, block_bytes: Optional[bytes] = None, ): self.raw_result = [] self.raw_result_map = { "block_starts": {}, "comments": {}, "labels": {}, "instructions": {}, "hooks": {}, "ir": defaultdict(list), } self.block_to_insn_addrs = defaultdict(list) self._func_cache = {} self._include_ir = include_ir self._block_bytes = block_bytes self._graph = None if function is not None: # sort them by address, put hooks before nonhooks self._graph = function.graph blocks = sorted(function.graph.nodes(), key=lambda node: (node.addr, not node.is_hook)) for block in blocks: self.parse_block(block) elif ranges is not None: cfg = self.project.kb.cfgs.get_most_accurate() fallback = True if cfg is not None: try: self._graph = cfg.graph for start, end in ranges: if start == end: continue assert start < end # Grab all blocks that intersect target range blocks = sorted( [ n.to_codenode() for n in self._graph.nodes() if not (n.addr + (n.size or 1) <= start or n.addr >= end) ], key=lambda node: (node.addr, not node.is_hook), ) # Trim blocks that are not within range for i, block in enumerate(blocks): if block.size and block.addr < start: delta = start - block.addr block_bytes = block.bytestr[delta:] if block.bytestr else None blocks[i] = BlockNode(block.addr + delta, block.size - delta, block_bytes) for i, block in enumerate(blocks): real_block_addr = block.addr if not block.thumb else block.addr - 1 if block.size and real_block_addr + block.size > end: delta = real_block_addr + block.size - end block_bytes = block.bytestr[0:-delta] if block.bytestr else None blocks[i] = BlockNode(block.addr, block.size - delta, block_bytes) for block in blocks: self.parse_block(block) fallback = False except KeyError: pass if fallback: # CFG not available, or the block cannot be found in the CFG (e.g., the block is dynamically # generated). Simply disassemble the code in the given regions. In the future we may want to handle # this case by automatically running CFG analysis on given ranges. for start, end in ranges: self.parse_block( BlockNode( start, end - start, thumb=thumb, bytestr=self._block_bytes if len(ranges) == 1 else None, ) )
[docs] def func_lookup(self, block): try: return self._func_cache[block.function.addr] except AttributeError: return None except KeyError: f = FunctionStart(block.function) self._func_cache[f.addr] = f return f
def _add_instruction_to_results(self, block: BlockNode, insn: DisassemblerInsn, bs: BlockStart) -> None: """ Add instruction to analysis results with associated labels and comments """ if insn.address in self.kb.labels: label = Label(insn.address, self.kb.labels[insn.address]) self.raw_result.append(label) self.raw_result_map["labels"][label.addr] = label if insn.address in self.kb.comments: comment = Comment(insn.address, self.kb.comments[insn.address]) self.raw_result.append(comment) self.raw_result_map["comments"][comment.addr] = comment instruction = Instruction(insn, bs) self.raw_result.append(instruction) self.raw_result_map["instructions"][instruction.addr] = instruction self.block_to_insn_addrs[block.addr].append(insn.address) def _add_block_ir_to_results(self, block: BlockNode, irsb: IRSBType) -> None: """ Add lifter IR for this block """ addr_to_ops_map = self.raw_result_map["ir"] addr = block.addr ops = addr_to_ops_map[addr] if irsb.statements is not None: if pcode is not None and isinstance(self.project.factory.default_engine, pcode.HeavyPcodeMixin): for ins in irsb._instructions: addr = ins.address.offset addr_to_ops_map[addr].extend([IROp(addr, op.seq.uniq, op, irsb) for op in ins.ops]) else: for seq, stmt in enumerate(irsb.statements): if isinstance(stmt, pyvex.stmt.IMark): addr = stmt.addr ops = addr_to_ops_map[addr] else: ops.append(IROp(addr, seq, stmt, irsb))
[docs] def parse_block(self, block: BlockNode) -> None: """ Parse instructions for a given block node """ func = self.func_lookup(block) if func and func.addr == block.addr: self.raw_result.append(FuncComment(block.function)) self.raw_result.append(func) bs = BlockStart(block, func, self.project) self.raw_result.append(bs) if block.is_hook: hook = Hook(block) self.raw_result.append(hook) self.raw_result_map["hooks"][block.addr] = hook elif self.project.arch.capstone_support: # Prefer Capstone first, where we are able to extract a bit more # about the operands if block.thumb: aligned_block_addr = (block.addr >> 1) << 1 cs = self.project.arch.capstone_thumb else: aligned_block_addr = block.addr cs = self.project.arch.capstone if block.bytestr is None: bytestr = self.project.loader.memory.load(aligned_block_addr, block.size) else: bytestr = block.bytestr self.block_to_insn_addrs[block.addr] = [] for cs_insn in cs.disasm(bytestr, block.addr): self._add_instruction_to_results(block, CapstoneInsn(cs_insn), bs) elif pcode is not None and isinstance(self.project.factory.default_engine, pcode.HeavyPcodeMixin): # When using the P-code engine, we can fall back on its disassembly # in the event that Capstone does not support it self.block_to_insn_addrs[block.addr] = [] b = self.project.factory.block(block.addr, size=block.size) for insn in b.disassembly.insns: self._add_instruction_to_results(block, insn, bs) elif type(block) is SootBlockNode: for raw_stmt in block.stmts: stmt = SootStatement(block.addr, raw_stmt) self.raw_result.append(stmt) self.raw_result_map["instructions"][stmt.addr] = stmt self.block_to_insn_addrs[block.addr].append(stmt.addr) else: raise TypeError("") if self._include_ir: b = self.project.factory.block(block.addr, size=block.size) self._add_block_ir_to_results(block, b.vex)
[docs] def render( self, formatting=None, show_edges: bool = True, show_addresses: bool = True, show_bytes: bool = False, ascii_only: Optional[bool] = None, color: bool = True, ) -> str: """ Render the disassembly to a string, with optional edges and addresses. Color will be added by default, if enabled. To disable color pass an empty formatting dict. """ max_bytes_per_line = 5 bytes_width = max_bytes_per_line * 3 + 1 a2ln = defaultdict(list) buf = [] if formatting is None: formatting = { "colors": { "address": "gray", "bytes": "cyan", "edge": "yellow", Label: "bright_yellow", ConstantOperand: "cyan", MemoryOperand: "yellow", Comment: "gray", Hook: "green", } if ansi_color_enabled and color else {}, "format_callback": lambda item, s: ansi_color(s, formatting["colors"].get(type(item), None)), } def col(item: Any) -> Optional[str]: try: return formatting["colors"][item] except KeyError: return None def format_address(addr: int, color: bool = True) -> str: if not show_addresses: return "" a, pad = f"{addr:x}", " " return (ansi_color(a, col("address")) if color else a) + pad def format_bytes(data: bytes, color: bool = True) -> str: s = " ".join(f"{x:02x}" for x in data).ljust(bytes_width) return ansi_color(s, col("bytes")) if color else s def format_comment(text: str, color: bool = True) -> str: s = " ; " + text return ansi_color(s, col(Comment)) if color else s comment = None for item in self.raw_result: if isinstance(item, BlockStart): if len(buf) > 0: buf.append("") elif isinstance(item, Label): pad = len(format_address(item.addr, False)) * " " if show_bytes: pad += bytes_width * " " buf.append(pad + item.render(formatting)[0]) elif isinstance(item, Comment): comment = item elif isinstance(item, Hook): a2ln[item.addr].append(len(buf)) buf.append(format_address(item.addr) + item.render(formatting)[0]) elif isinstance(item, Instruction): a2ln[item.addr].append(len(buf)) lines = [] # Chop instruction bytes into line segments p, insn_bytes = 0, [] while show_bytes and p < len(item.insn.bytes): s = item.insn.bytes[p : p + min(len(item.insn.bytes) - p, max_bytes_per_line)] p += len(s) insn_bytes.append(s) # Format the instruction's address, bytes, disassembly, and comment s_plain = format_address(item.addr, False) s = format_address(item.addr) if show_bytes: bytes_column = len(s_plain) s_plain += format_bytes(insn_bytes[0], False) s += format_bytes(insn_bytes[0]) s_plain += item.render()[0] s += item.render(formatting)[0] if comment is not None: comment_column = len(s_plain) s += format_comment(comment.text[0]) lines.append(s) # Add additional lines of instruction bytes for i in range(1, len(insn_bytes)): lines.append(" " * bytes_column + format_bytes(insn_bytes[i])) # Add additional lines of comments if comment is not None: for i in range(1, len(comment.text)): if len(lines) <= i: lines.append(" " * comment_column) lines[i] += format_comment(comment.text[i]) comment = None buf.extend(lines) else: buf.append("".join(item.render(formatting))) if self._graph is not None and show_edges and buf: edges_by_line = set() for edge in self._graph.edges.items(): from_block, to_block = edge[0] if from_block.size is None: continue if to_block.addr != from_block.addr + from_block.size: from_addr = edge[1]["ins_addr"] to_addr = to_block.addr if not (from_addr in a2ln and to_addr in a2ln): continue for f in a2ln[from_addr]: for t in a2ln[to_addr]: edges_by_line.add((f, t)) # Render block edges, to a reference buffer for tracking and output buffer for display edge_buf = ["" for _ in buf] ref_buf = ["" for _ in buf] edge_col = col("edge") for f, t in sorted(edges_by_line, key=lambda e: abs(e[0] - e[1])): add_edge_to_buffer(edge_buf, ref_buf, f, t, lambda s: ansi_color(s, edge_col), ascii_only=ascii_only) add_edge_to_buffer(ref_buf, ref_buf, f, t, ascii_only=ascii_only) max_edge_depth = max(map(len, ref_buf)) # Justify edge and combine with disassembly for i, line in enumerate(buf): buf[i] = " " * (max_edge_depth - len(ref_buf[i])) + edge_buf[i] + line return "\n".join(buf)
from angr.analyses import AnalysesHub AnalysesHub.register_default("Disassembly", Disassembly)