from typing import TYPE_CHECKING
import logging
import re
import string
import struct
from collections import defaultdict
from itertools import count
import capstone
import cle
import networkx
import pyvex
from . import Analysis
from .cfg.cfg_emulated import CFGEmulated
from .ddg import DDG
from .cfg.cfg_fast import CFGFast
from ..codenode import CodeNode
from ..knowledge_plugins.cfg.memory_data import MemoryDataSort
from ..knowledge_plugins.functions import Function
from ..knowledge_base import KnowledgeBase
from ..sim_variable import SimMemoryVariable, SimTemporaryVariable
if TYPE_CHECKING:
    # Needed by type annotations only; the guard keeps it from being imported at runtime.
    from .cfg import CFGNode

# Module-level logger, named after this module.
l = logging.getLogger(name=__name__)
#
# Exceptions
#
class BinaryError(Exception):
    """Base class of the reassembler-specific exceptions defined in this module."""


class InstructionError(BinaryError):
    """Raised when an instruction cannot be initialized (e.g. it has no address)."""


class ReassemblerFailureNotice(BinaryError):
    """Error type for reporting that reassembling failed."""
#
# Constants
#
# Architecture-independent operand kinds used throughout this module.
OP_TYPE_REG, OP_TYPE_IMM, OP_TYPE_MEM, OP_TYPE_RAW = 1, 2, 3, 4

# Human-readable name of each operand kind.
OP_TYPE_MAP = dict(
    [
        (OP_TYPE_REG, "REG"),
        (OP_TYPE_IMM, "IMM"),
        (OP_TYPE_MEM, "MEM"),
        (OP_TYPE_RAW, "RAW"),
    ]
)

# Per-architecture translation from capstone operand types to the operand kinds above.
# X86 and AMD64 share the same capstone constants but each gets its own dict.
CAPSTONE_OP_TYPE_MAP = {
    arch_name: {
        capstone.x86.X86_OP_REG: OP_TYPE_REG,
        capstone.x86.X86_OP_IMM: OP_TYPE_IMM,
        capstone.x86.X86_OP_MEM: OP_TYPE_MEM,
    }
    for arch_name in ("X86", "AMD64")
}

# Per-architecture map of capstone register id -> lower-case register name.
# will be filled up by fill_reg_map()
CAPSTONE_REG_MAP = {arch_name: {} for arch_name in ("X86", "AMD64")}
# Utils
def string_escape(s):
    """
    Escape a string (or bytes) so that it only contains printable ASCII with backslash escapes.

    Bytes are first mapped one-to-one onto the code points 0-255. The text is then encoded with Python's
    ``unicode_escape`` codec, any ``\\'`` sequence is reduced to a plain ``'``, and every double quote is
    prefixed with a backslash.

    :param s:   The text to escape.
    :type s:    str or bytes
    :return:    The escaped text.
    :rtype:     str
    """
    if isinstance(s, bytes):
        # latin-1 maps every byte value to the code point with the same number
        s = s.decode("latin-1")
    escaped = s.encode("unicode_escape").decode("utf-8")
    escaped = escaped.replace("\\'", "'")
    return escaped.replace('"', '\\"')
def fill_reg_map():
    """
    Populate ``CAPSTONE_REG_MAP`` for X86 and AMD64.

    Every ``X86_REG_*`` constant exposed by ``capstone.x86`` is recorded as
    ``register id -> lower-case register name`` in both architecture tables.

    :return: None
    """
    # TODO: Support more architectures
    prefix = "X86_REG_"
    for attr in dir(capstone.x86):
        if not attr.startswith(prefix):
            continue
        reg_name = attr[len(prefix) :].lower()
        reg_id = getattr(capstone.x86, attr)
        # both architectures use the same capstone register constants
        for arch_name in ("X86", "AMD64"):
            CAPSTONE_REG_MAP[arch_name][reg_id] = reg_name
def split_operands(s):
    """
    Split an operand string into individual operands.

    A comma separates two operands only when it is outside parentheses and is either the last character
    or followed by a space. The comma itself is dropped; the space after it is kept, so every operand
    but the first starts with a space. Empty trailing operands are not reported.

    :param str s:   The operand string of an instruction.
    :return:        The operands, in order.
    :rtype:         list
    """
    operands = []
    current = []
    in_parenthesis = False
    last_index = len(s) - 1

    for i, c in enumerate(s):
        if in_parenthesis and c == ")":
            in_parenthesis = False
        if c == "(":
            in_parenthesis = True
        if not in_parenthesis and c == "," and (i == last_index or s[i + 1] == " "):
            operands.append("".join(current))
            current = []
            continue
        current.append(c)

    if current:
        operands.append("".join(current))
    return operands
def is_hex(s):
    """
    Test whether a string can be parsed as a base-16 integer.

    :param str s:   The string to test.
    :return:        True if ``int(s, 16)`` succeeds, False if it raises ValueError.
    :rtype:         bool
    """
    try:
        int(s, 16)
    except ValueError:
        return False
    return True
# Populate CAPSTONE_REG_MAP once at import time; Operand looks register names up in it.
fill_reg_map()
class Label:
    """
    A label in the reassembled output.

    Two labels are equal (and hash equally) when they have the same name.
    """

    # Shared counter used to generate names for anonymous labels.
    g_label_ctr = count()

    def __init__(self, binary, name, original_addr=None):
        """
        :param binary:              The Binary analysis this label belongs to.
        :param name:                Name of the label. If None, a unique ``label_<n>`` name is generated.
        :param original_addr:       Address the label refers to in the original binary, if any.
        """
        self.binary = binary
        self.name = name
        self.assigned = False
        self.var_size = None

        if self.name is None:
            self.name = "label_%d" % next(Label.g_label_ctr)

        self.original_addr = original_addr
        # When set, the label is emitted as an offset relative to this address.
        self.base_addr = None

    #
    # Overridden predefined methods
    #

    def __str__(self):
        """
        :return: The label definition line, i.e. ``.<name>:``.
        """
        return f".{self.name}:"

    def __hash__(self):
        return hash(self.name)

    def __eq__(self, other):
        # Comparison is name-based. Objects without a name are simply "not equal" instead of
        # triggering an AttributeError.
        if not hasattr(other, "name"):
            return NotImplemented
        return self.name == other.name

    #
    # Properties
    #

    @property
    def operand_str(self):
        """
        The text used when this label appears as an operand: ``.<name>``, followed by a signed decimal
        offset when ``base_addr`` is set.
        """
        if self.base_addr is None:
            return ".%s" % self.name

        offset = self.offset
        sign = "+" if offset >= 0 else "-"
        return ".%s%s%d" % (self.name, sign, abs(offset))

    @property
    def offset(self):
        """
        Distance of ``original_addr`` from ``base_addr``; 0 when no base address is set.
        """
        if self.base_addr is None:
            return 0
        return self.original_addr - self.base_addr

    #
    # Static methods
    #

    @staticmethod
    def new_label(binary, name=None, function_name=None, original_addr=None, data_label=False):
        """
        Create a label of the appropriate class.

        :param binary:          The Binary analysis.
        :param name:            Name of a plain label (only used when a plain Label is created).
        :param function_name:   If not None, a FunctionLabel with this name is created.
        :param original_addr:   Address the label refers to.
        :param data_label:      If True (and no function name is given), a DataLabel is created.
        :return:                The new label.
        """
        if function_name is not None:
            return FunctionLabel(binary, function_name, original_addr)
        if data_label:
            return DataLabel(binary, original_addr)
        return Label(binary, name, original_addr=original_addr)
class DataLabel(Label):
    """
    A label that refers to data. Unlike a plain Label it is emitted without the leading dot.
    """

    def __init__(self, binary, original_addr, name=None):
        """
        :param binary:          The Binary analysis.
        :param original_addr:   Address of the data in the original binary.
        :param name:            Name of the label; generated by Label when None.
        """
        Label.__init__(self, binary, name, original_addr=original_addr)

    @property
    def operand_str(self):
        """
        The bare label name, or ``(<name><sign><offset>)`` when ``base_addr`` is set.
        """
        if self.base_addr is None:
            return self.name

        offset = self.offset
        sign = "+" if offset >= 0 else "-"
        return f"({self.name}{sign}{abs(offset)})"

    def __str__(self):
        """
        :return: The label definition line, i.e. ``<name>:``.
        """
        return "%s:" % (self.name)
class FunctionLabel(Label):
    """
    A label that marks the beginning of a function.
    """

    def __init__(self, binary, function_name, original_addr, plt=False):
        """
        :param binary:          The Binary analysis.
        :param function_name:   Name of the function; used as the label name.
        :param original_addr:   Address of the function in the original binary.
        :param bool plt:        Whether the label was created for a PLT entry.
        """
        Label.__init__(self, binary, function_name, original_addr=original_addr)
        self.plt = plt

    @property
    def function_name(self):
        return self.name

    @property
    def operand_str(self):
        return self.name

    def __str__(self):
        """
        :return: The ``.globl`` / ``.type ..., @function`` directives followed by the label line.
        """
        func_name = self.function_name
        return f"\t.globl {func_name}\n\t.type {func_name}, @function\n{func_name}:"
class ObjectLabel(Label):
    """
    A label for a symbol of object type.
    """

    def __init__(self, binary, symbol_name, original_addr, plt=False):
        """
        :param binary:          The Binary analysis.
        :param symbol_name:     Name of the symbol; used as the label name.
        :param original_addr:   Address of the symbol in the original binary.
        :param bool plt:        Stored on the label as ``plt``.
        """
        Label.__init__(self, binary, symbol_name, original_addr=original_addr)
        self.plt = plt

    @property
    def symbol_name(self):
        return self.name

    @property
    def operand_str(self):
        return self.name

    def __str__(self):
        """
        :return: The ``.globl`` / ``.type ..., @object`` directives followed by the label line.
        """
        symbol_name = self.symbol_name
        return f"\t.globl {symbol_name}\n\t.type {symbol_name}, @object\n{symbol_name}:"
class NotypeLabel(Label):
    """
    A label for a symbol that has no type.
    """

    def __init__(self, binary, symbol_name, original_addr, plt=False):
        """
        :param binary:          The Binary analysis.
        :param symbol_name:     Name of the symbol; used as the label name.
        :param original_addr:   Address of the symbol in the original binary.
        :param bool plt:        Stored on the label as ``plt``.
        """
        Label.__init__(self, binary, symbol_name, original_addr=original_addr)
        self.plt = plt

    @property
    def symbol_name(self):
        return self.name

    @property
    def operand_str(self):
        return self.name

    def __str__(self):
        """
        :return: The ``.globl`` / ``.type ..., @notype`` directives followed by the label line.
        """
        symbol_name = self.symbol_name
        return f"\t.globl {symbol_name}\n\t.type {symbol_name}, @notype\n{symbol_name}:"
class SymbolManager:
    """
    SymbolManager manages all symbols in the binary.
    """

    def __init__(self, binary, cfg):
        """
        Constructor.

        :param Reassembler binary: The Binary analysis instance.
        :param angr.analyses.CFG cfg: The CFG analysis instance.
        :return: None
        """
        self.binary = binary
        self.project = binary.project
        self.cfg = cfg

        # address -> list of labels created for that address
        self.addr_to_label = defaultdict(list)
        self.symbol_names = set()  # deduplicate symbol names

    def get_unique_symbol_name(self, symbol_name):
        """
        Reserve and return a symbol name that has not been handed out before.

        :param str symbol_name: The preferred name.
        :return:                ``symbol_name`` itself if unused, otherwise ``<symbol_name>_<i>`` with the
                                smallest unused ``i >= 0``.
        :rtype:                 str
        """
        if symbol_name not in self.symbol_names:
            self.symbol_names.add(symbol_name)
            return symbol_name

        i = 0
        while True:
            name = "%s_%d" % (symbol_name, i)
            if name not in self.symbol_names:
                self.symbol_names.add(name)
                return name
            i += 1

    def new_label(self, addr, name=None, is_function=None, force=False):
        """
        Get the label for an address, creating one if necessary.

        Unless ``force`` is set, an existing label of ``addr`` is reused. Otherwise the kind of label is
        decided in this order: PLT entry, symbol known to the loader, function (found in the CFG or
        flagged by ``is_function``), address inside a non-executable region, plain label.

        :param addr:            The address to label. May be None for labels without an address.
        :param name:            Name for the label where the label kind honours it.
        :param is_function:     Treat the label as a function label.
        :param bool force:      Always create a new label (a DataLabel if ``addr`` is inside a
                                non-executable region, a plain Label otherwise), even if one exists.
        :return:                The label.
        :raises Exception:      If the symbol at ``addr`` has an unsupported type or is not owned by the
                                main object.
        """
        if force:
            if self.binary.main_nonexecutable_regions_contain(addr):
                label = DataLabel(self.binary, addr, name=name)
            else:
                label = Label.new_label(self.binary, name=name, original_addr=addr)
            self.addr_to_label[addr].append(label)
            return label

        # Use .get() and a truthiness test: an address may be present with an empty list, and
        # indexing [0] would then raise IndexError.
        existing = self.addr_to_label.get(addr)
        if existing:
            return existing[0]

        label = self._create_label(addr, name, is_function)

        if addr is not None:
            self.addr_to_label[addr].append(label)

        return label

    def label_got(self, addr, label):
        """
        Mark a certain label as assigned (to an instruction or a block of data).

        :param int addr: The address of the label.
        :param angr.analyses.reassembler.Label label:
                         The label that is just assigned.
        :return: None
        """
        # .get() avoids inserting an empty entry into the defaultdict for unknown addresses
        if label in self.addr_to_label.get(addr, ()):
            label.assigned = True

    #
    # Private methods
    #

    def _create_label(self, addr, name, is_function):
        """Create (but do not register) a new label of the kind that fits `addr`."""

        # Check if the address points to a function by checking the plt of main binary
        reverse_plt = self.project.loader.main_object.reverse_plt
        if addr in reverse_plt:
            # It's a PLT entry!
            return FunctionLabel(self.binary, reverse_plt[addr], addr, plt=True)

        symbol = self.project.loader.find_symbol(addr) if addr is not None else None
        if symbol is not None:
            # It's an extern symbol
            return self._label_from_symbol(symbol, addr, name)

        if (addr is not None and addr in self.cfg.functions) or is_function:
            # It's a function identified by angr's CFG recovery
            if is_function and name is not None:
                function_name = name
            else:
                function_name = self.cfg.functions[addr].name

            # special function name for entry point
            if addr == self.project.entry:
                function_name = "_start"

            return FunctionLabel(self.binary, function_name, addr)

        if addr is not None and self.binary.main_nonexecutable_regions_contain(addr):
            return DataLabel(self.binary, addr)

        return Label.new_label(self.binary, name=name, original_addr=addr)

    def _label_from_symbol(self, symbol, addr, name):
        """Create a label for a loader symbol, choosing the label class from the symbol type."""

        if symbol.owner is not self.project.loader.main_object:
            raise Exception(
                "the symbol %s is not owned by the main object. Try reload the project with "
                '"auto_load_libs=False". If that does not solve the issue, please report to GitHub.' % symbol.name
            )

        # drop everything from the first "@" on
        symbol_name = symbol.name.partition("@")[0]

        # check the type...
        typed_labels = (
            (cle.SymbolType.TYPE_FUNCTION, FunctionLabel),
            (cle.SymbolType.TYPE_OBJECT, ObjectLabel),
            (cle.SymbolType.TYPE_NONE, NotypeLabel),
        )
        for symbol_type, label_cls in typed_labels:
            if symbol.type == symbol_type:
                return label_cls(self.binary, self.get_unique_symbol_name(symbol_name), addr)

        if symbol.type == cle.SymbolType.TYPE_SECTION:
            # section label: use a normal label instead. Empty names are turned into None so that
            # a name gets generated.
            return Label.new_label(self.binary, name=name or None, original_addr=addr)

        raise Exception("Unsupported symbol type %s. Bug Fish about it!" % symbol.type)
class Operand:
    """
    An operand of an instruction, classified as register, immediate, memory or raw assembly, together
    with the labels that its immediate value or memory displacement refers to.
    """

    # Intel-syntax size keyword for each supported memory operand size (in bytes)
    _INTEL_SIZE_KEYWORDS = {
        16: "xmmword",
        10: "xword",
        8: "qword",
        4: "dword",
        2: "word",
        1: "byte",
    }

    def __init__(
        self, binary, insn_addr, insn_size, capstone_operand, operand_str, mnemonic, operand_offset, syntax=None
    ):
        """
        Constructor.

        :param Reassembler binary: The Binary analysis.
        :param int insn_addr: Address of the instruction.
        :param int insn_size: Size of the instruction.
        :param capstone_operand: The capstone operand object.
        :param str operand_str: the string representation of this operand
        :param str mnemonic: Mnemonic of the instruction that this operand belongs to.
        :param int operand_offset: offset of the operand into the instruction.
        :param str syntax: Provide a way to override the default syntax coming from `binary`.
        :return: None
        """
        self.binary = binary
        self.project = binary.project
        self.insn_addr = insn_addr
        self.insn_size = insn_size
        self.operand_str = operand_str
        self.mnemonic = mnemonic
        self.operand_offset = operand_offset
        self.syntax = self.binary.syntax if syntax is None else syntax
        self.type = None
        self.size = capstone_operand.size

        # IMM
        self.is_coderef = None
        self.is_dataref = None
        self.label = None
        self.label_offset = 0

        # MEM
        self.base = None
        self.index = None
        self.scale = None
        self.disp = None

        # RAW
        self.raw_asm = None

        self.disp_is_coderef = None
        self.disp_is_dataref = None
        self.disp_label = None
        self.disp_label_offset = 0

        self._initialize(capstone_operand)

    #
    # Public methods
    #

    def assembly(self):
        """
        Get the symbolized assembly text of this operand.

        :return:                The operand text, or None if there is nothing special about this operand
                                (a register, or an immediate without a label).
        :raises BinaryError:    If a memory operand in Intel syntax has an unsupported size.
        """
        if self.type == OP_TYPE_IMM and self.label:
            return self._label_expression(self.label, self.label_offset)
        if self.type == OP_TYPE_MEM:
            if self.syntax == "at&t":
                return self._att_memory_assembly()
            return self._intel_memory_assembly()
        if self.type == OP_TYPE_RAW:
            return self.raw_asm
        # Nothing special
        return None

    #
    # Overridden predefined methods
    #

    def __str__(self):
        """
        :return: The operand kind, followed by ``<CODEREF>`` or ``<DATAREF>`` if the operand is one.
        """
        op_type = OP_TYPE_MAP[self.type]
        if self.is_coderef:
            return f"{op_type} <CODEREF>"
        if self.is_dataref:
            return f"{op_type} <DATAREF>"
        return op_type

    #
    # Properties
    #

    @property
    def is_immediate(self):
        return self.type == OP_TYPE_IMM

    @property
    def symbolized(self):
        return self.label is not None or self.disp_label is not None

    #
    # Private methods
    #

    @staticmethod
    def _label_expression(label, offset):
        """Format a label reference with an optional ``+ n`` / ``- n`` suffix."""
        if offset > 0:
            return "%s + %d" % (label.operand_str, offset)
        if offset < 0:
            return "%s - %d" % (label.operand_str, abs(offset))
        return label.operand_str

    def _register_name(self, reg_id):
        """Translate a capstone register id into its name for the project's architecture."""
        return CAPSTONE_REG_MAP[self.project.arch.name][reg_id]

    def _displacement_str(self):
        """Text of the memory displacement: a label expression, a decimal number, or empty."""
        if not self.disp:
            return ""
        if self.disp_label:
            return self._label_expression(self.disp_label, self.disp_label_offset)
        return "%d" % self.disp

    def _att_memory_assembly(self):
        """Format a memory operand as AT&T ``displacement(base, index, scale)``."""
        disp = self._displacement_str()
        base = "%%%s" % self._register_name(self.base) if self.base else ""

        if "*" in self.operand_str and disp:
            # absolute memory address
            disp = "*" + disp

        if self.index:
            return "%s(%s, %%%s, %d)" % (disp, base, self._register_name(self.index), self.scale)
        if self.base:  # not self.index
            return f"{disp}({base})"
        return disp

    def _intel_memory_assembly(self):
        """Format a memory operand as Intel ``<size> ptr [base + (index * scale) + disp]``."""
        parts = []
        if self.base:
            parts.append(self._register_name(self.base))

        if self.index and self.scale:
            if parts:
                parts.append("+")
            parts.append("(%s * %d)" % (self._register_name(self.index), self.scale))

        disp = self._displacement_str()
        if disp:
            if disp.startswith("-"):
                parts.append("-")
                parts.append(disp[1:])
            else:
                if parts:
                    parts.append("+")
                parts.append(disp)

        # we need to specify the size here
        size_keyword = self._INTEL_SIZE_KEYWORDS.get(self.size)
        if size_keyword is None:
            raise BinaryError('Unsupported memory operand size for operand "%s"' % self.operand_str)

        return "%s ptr [%s]" % (size_keyword, " ".join(parts))

    def _initialize(self, capstone_operand):
        """
        Classify the operand and, if its immediate or displacement looks like a pointer, attach a label
        to it and register the reference with the binary.
        """
        arch_name = self.project.arch.name
        self.type = CAPSTONE_OP_TYPE_MAP[arch_name][capstone_operand.type]

        if self.type == OP_TYPE_IMM:
            # Check if this is a reference to code
            imm = capstone_operand.imm

            self.is_coderef, self.is_dataref, baseaddr = self._imm_to_ptr(imm, self.type, self.mnemonic)

            if self.is_coderef or self.is_dataref:
                self.label = self.binary.symbol_manager.new_label(addr=baseaddr)
                self.label_offset = imm - baseaddr

                if self.mnemonic.startswith("j") or self.mnemonic.startswith("loop"):
                    sort = "jump"
                elif self.mnemonic.startswith("call"):
                    sort = "call"
                else:
                    sort = "absolute"
                self.binary.register_instruction_reference(self.insn_addr, imm, sort, self.operand_offset)

        elif self.type == OP_TYPE_MEM:
            self.base = capstone_operand.mem.base
            self.index = capstone_operand.mem.index
            self.scale = capstone_operand.mem.scale
            self.disp = capstone_operand.mem.disp

            if self.binary.project.arch.name == "AMD64" and CAPSTONE_REG_MAP["AMD64"].get(self.base) == "rip":
                # rip-relative addressing: turn the displacement into an absolute address
                self.disp += self.insn_addr + self.insn_size

            self.disp_is_coderef, self.disp_is_dataref, baseaddr = self._imm_to_ptr(
                self.disp, self.type, self.mnemonic
            )

            if self.disp_is_coderef or self.disp_is_dataref:
                self.disp_label = self.binary.symbol_manager.new_label(addr=baseaddr)
                self.disp_label_offset = self.disp - baseaddr

                self.binary.register_instruction_reference(
                    self.insn_addr, self.disp, "absolute", self.operand_offset
                )

    def _imm_to_ptr(self, imm, operand_type, mnemonic):  # pylint:disable=no-self-use,unused-argument
        """
        Try to classify an immediate as a pointer.

        The checks are tried in order: start of an instruction inside an executable region, address
        inside a non-executable region, address in the "limbo" around a non-executable region, address
        in the "limbo" around an executable region.

        :param int imm: The immediate to test.
        :param int operand_type: Operand type of this operand, can either be IMM or MEM.
        :param str mnemonic: Mnemonic of the instruction that this operand belongs to.
        :return: A tuple of (is code reference, is data reference, base address). The base address is
                 None when the immediate is neither kind of reference.
        :rtype: tuple
        """
        binary = self.binary

        # does it point to the beginning of an instruction?
        if binary.main_executable_regions_contain(imm) and imm in binary.all_insn_addrs:
            return (True, False, imm)

        if binary.main_nonexecutable_regions_contain(imm):
            return (False, True, imm)

        tolerance_before = 1024 if operand_type == OP_TYPE_MEM else 64
        contains_, baseaddr_ = binary.main_nonexecutable_region_limbos_contain(
            imm, tolerance_before=tolerance_before, tolerance_after=1024
        )
        if contains_:
            return (False, True, baseaddr_)

        contains_, baseaddr_ = binary.main_executable_region_limbos_contain(imm)
        if contains_:
            return (True, False, baseaddr_)

        return (False, False, None)
class Instruction:
    """
    High-level representation of an instruction in the binary
    """

    def __init__(self, binary, addr, size, insn_bytes, capstone_instr):
        """
        :param Reassembler binary: The Binary analysis
        :param int addr: Address of the instruction
        :param int size: Size of the instruction
        :param str insn_bytes: Instruction bytes
        :param capstone_instr: Capstone Instr object.
        :return: None
        """
        self.binary = binary
        self.project = binary.project
        self.addr = addr
        self.size = size
        self.bytes = insn_bytes

        self.mnemonic = capstone_instr.mnemonic
        self.op_str = capstone_instr.op_str
        self.capstone_operand_types = [operand.type for operand in capstone_instr.operands]

        self.operands = []
        self.labels = []

        # offset into the instruction of each operand's immediate/displacement; 0 for other operands
        operand_offsets = []
        for operand in capstone_instr.operands:
            if operand.type == capstone.CS_OP_IMM:
                operand_offsets.append(capstone_instr.imm_offset)
            elif operand.type == capstone.CS_OP_MEM:
                operand_offsets.append(capstone_instr.disp_offset)
            else:
                operand_offsets.append(0)

        if self.addr is not None:
            self._initialize(capstone_instr.operands, operand_offsets)

    #
    # Overridden predefined instructions
    #

    def __str__(self):
        """
        :return: The non-symbolized assembly of this instruction, with debugging comments.
        """
        return self.assembly(comments=True, symbolized=False)

    #
    # Public methods
    #

    def assign_labels(self):
        """
        Attach to this instruction every label that the symbol manager holds for its address.
        """
        for label in self.binary.symbol_manager.addr_to_label.get(self.addr, ()):
            if label not in self.labels:
                self.labels.append(label)

    def dbg_comments(self):
        """
        Build the debugging comment that `assembly()` emits when comments are requested.

        :return: A comment line with the address, the original disassembly and the operand kinds.
        :rtype:  str
        """
        operands = ", ".join(str(operand) for operand in self.operands)
        capstone_str = "%#08x:\t%s\t%s" % (self.addr, self.mnemonic, self.op_str)
        return "\t# %s [%s]" % (capstone_str, operands)

    def assembly(self, comments=False, symbolized=True):
        """
        Get the assembly text of this instruction, including its labels and any assembly code that was
        inserted before or after the labels.

        :param bool comments:   Whether debugging comments should be emitted.
        :param bool symbolized: Whether operands should be replaced with their labels.
        :return:                The assembly text. The instruction itself is left out if it was removed.
        :rtype:                 str
        :raises BinaryError:    If a symbolized operand has an unsupported type.
        """
        dbg_comments = self.dbg_comments() if comments else ""

        labels = "\n".join(str(lbl) for lbl in self.labels)

        inserted_asm_before_label = self._inserted_asm(
            self.binary.inserted_asm_before_label, "\t# Inserted assembly code (before label):\n", comments
        )
        inserted_asm_after_label = self._inserted_asm(
            self.binary.inserted_asm_after_label, "\t# Inserted assembly code (after label):\n", comments
        )

        not_symbolized = f"\t{self.mnemonic}\t{self.op_str}"
        if not symbolized:
            asm = not_symbolized
        elif not any((operand.symbolized or operand.type == OP_TYPE_RAW) for operand in self.operands):
            # No label is involved (this also covers instructions without operands)
            asm = not_symbolized
        else:
            asm = self._symbolized_assembly()

        contents = [dbg_comments, inserted_asm_before_label, labels, inserted_asm_after_label]
        if self.addr not in self.binary._removed_instructions:
            contents.append(asm)
        return "\n".join(a for a in contents if a)

    #
    # Private methods
    #

    def _inserted_asm(self, inserted, banner, comments):
        """
        Join the assembly lines inserted at this instruction's address, preceded by `banner` when
        comments are enabled. Returns an empty string if nothing was inserted.
        """
        if self.addr not in inserted:
            return ""
        # put all assembly code there
        text = banner if comments else ""
        text += "\n".join(inserted[self.addr])
        return text + "\n"

    def _symbolized_assembly(self):
        """Rebuild the instruction text with symbolized operands swapped in."""

        # Now it's the tricky part. capstone doesn't give us anyway to print individual operand. We gotta
        # parse it by ourselves
        all_operands = [operand.operand_str for operand in self.operands]
        mnemonic = self.mnemonic
        is_control_flow = mnemonic.startswith(("j", "call", "loop"))

        # self.operands holds the *last* operands reported by capstone (see _initialize_operands),
        # so indices into capstone_operand_types must be shifted accordingly.
        type_shift = len(self.capstone_operand_types) - len(self.operands)

        for i, op in enumerate(self.operands):
            op_asm = op.assembly()
            if op_asm is None:
                continue

            if op.type not in (OP_TYPE_IMM, OP_TYPE_MEM, OP_TYPE_RAW):
                raise BinaryError("Unsupported operand type %d." % op.type)
            all_operands[i] = op_asm

            if op.type == OP_TYPE_RAW or self.capstone_operand_types[type_shift + i] != capstone.CS_OP_IMM:
                continue
            if is_control_flow:
                # branch targets are written as bare labels
                continue

            # mark the size of the variable
            if op.is_dataref:
                op.label.var_size = op.size
            if self.binary.syntax == "at&t":
                all_operands[i] = "$" + all_operands[i]
            else:
                all_operands[i] = "OFFSET FLAT:" + all_operands[i]

        return "\t{}{}".format(mnemonic, "\t" + ", ".join(all_operands))

    def _initialize(self, capstone_operands, operand_offsets):
        """
        Initialize this object

        :raises InstructionError: If the instruction has no address.
        :return: None
        """
        if self.addr is None:
            raise InstructionError("self.addr must be specified")

        self._initialize_operands(capstone_operands, operand_offsets)

    def _initialize_operands(self, capstone_operands, operand_offsets):
        """
        Create an Operand for every operand found in the operand string.

        :return: None
        """
        all_operands = split_operands(self.op_str)
        operand_count = len(all_operands)

        # sometimes there are more operands than expected... keep the trailing ones
        capstone_operands = capstone_operands[-operand_count:]
        operand_offsets = operand_offsets[-operand_count:]

        for operand, operand_str, offset in zip(capstone_operands, all_operands, operand_offsets):
            self.operands.append(
                Operand(self.binary, self.addr, self.size, operand, operand_str, self.mnemonic, offset)
            )
class BasicBlock:
    """
    BasicBlock represents a basic block in the binary.
    """

    def __init__(self, binary, addr, size, x86_getpc_retsite: bool = False):
        """
        Constructor.

        :param Reassembler binary: The Binary analysis.
        :param int addr: Address of the block
        :param int size: Size of the block
        :param bool x86_getpc_retsite: True if this block starts right after a call to an x86 get_pc
                                       function, in which case its first instruction may need fixing.
        :return: None
        """
        self.binary = binary
        self.project = binary.project
        self.addr = addr
        self.size = size
        self.x86_getpc_retsite = x86_getpc_retsite

        self.instructions = []

        self._initialize()

    #
    # Overridden predefined methods
    #

    def __str__(self):
        """
        Return a linear representation of all instructions in this block.

        :return: The non-symbolized assembly of the block.
        """
        return self.assembly(symbolized=False)

    def __repr__(self):
        return "<BasicBlock %#08x>" % self.addr

    #
    # Public methods
    #

    def assign_labels(self):
        """Assign labels to every instruction in this block."""
        for ins in self.instructions:
            ins.assign_labels()

    def assembly(self, comments=False, symbolized=True):
        """
        :param bool comments:   Whether debugging comments should be emitted.
        :param bool symbolized: Whether operands should be replaced with their labels.
        :return:                The assembly of all instructions, one per line, in list order.
        :rtype:                 str
        """
        return "\n".join(ins.assembly(comments=comments, symbolized=symbolized) for ins in self.instructions)

    def instruction_addresses(self):
        """
        :return: A list of (address, size) tuples of all instructions, sorted by address.
        :rtype:  list
        """
        return sorted(((ins.addr, ins.size) for ins in self.instructions), key=lambda x: x[0])

    #
    # Private methods
    #

    def _initialize(self):
        """
        Re-lift the block and create an Instruction for each of its capstone instructions.

        :return: None
        """
        # re-lifting
        block = self.project.factory.fresh_block(self.addr, self.size)
        capstone_obj = block.capstone

        # Fill in instructions
        for idx, instr in enumerate(capstone_obj.insns):
            instruction = Instruction(self.binary, instr.address, instr.size, None, instr)

            # special handling for X86 PIE binaries: the "add reg, imm" right after the get_pc call
            # adds the offset of the GOT, so the immediate is replaced with the GOT symbol
            if self.x86_getpc_retsite and idx == 0:
                if (
                    self.binary.syntax == "at&t"
                    and instr.mnemonic == "addl"
                    and instr.operands[1].type == capstone.CS_OP_REG
                    and instr.operands[0].type == capstone.CS_OP_IMM
                ):
                    instruction.operands[0].type = OP_TYPE_RAW
                    instruction.operands[0].raw_asm = "$_GLOBAL_OFFSET_TABLE_"
                elif (
                    self.binary.syntax == "intel"
                    and instr.mnemonic == "add"
                    and instr.operands[0].type == capstone.CS_OP_REG
                    and instr.operands[1].type == capstone.CS_OP_IMM
                ):
                    # the operand must be *assigned* the RAW type, otherwise raw_asm is never used
                    instruction.operands[1].type = OP_TYPE_RAW
                    instruction.operands[1].raw_asm = "OFFSET FLAT:_GLOBAL_OFFSET_TABLE_"

            self.instructions.append(instruction)

        self.instructions = sorted(self.instructions, key=lambda x: x.addr)
class Procedure:
    """
    Procedure in the binary.
    """

    def __init__(self, binary, function=None, addr=None, size=None, name=None, section=".text", asm_code=None):
        """
        Constructor.

        :param Reassembler binary: The Binary analysis.
        :param angr.knowledge.Function function: The function it represents
        :param int addr: Address of the function. Not required if `function` is provided.
        :param int size: Size of the function. Not required if `function` is provided.
        :param str name: Name of the procedure. Ignored if `function` is provided.
        :param str section: Which section this function comes from.
        :param str asm_code: Assembly code of the procedure. Required if `function` is not provided.
        :raises BinaryError: If neither `function` nor `asm_code` is provided.
        :return: None
        """
        self.binary = binary
        self.project = binary.project

        if function is None:
            self.addr = addr
            self.size = size
            self.function = None
            self._name = name
        else:
            self.addr = function.addr
            self.size = None  # FIXME:
            self.function = function
            self._name = function.name

        self.asm_code = asm_code
        self.section = section

        self.blocks = []

        self._initialize()

    #
    # Attributes
    #

    @property
    def name(self):
        """
        Get the name of this procedure: the explicit name if there is one, otherwise the function name
        from the first label of the very first block.

        :return: Function name if there is any, None otherwise
        :rtype: string
        """
        if self._name is not None:
            return self._name

        if not self.blocks:
            return None
        first_block = self.blocks[0]
        if not first_block.instructions:
            return None
        first_labels = first_block.instructions[0].labels
        if not first_labels:
            return None

        lbl = first_labels[0]
        if isinstance(lbl, FunctionLabel):
            return lbl.function_name

        return None

    @property
    def is_plt(self):
        """
        If this function is a PLT entry or not.

        :return: True if this function is a PLT entry, False otherwise
        :rtype: bool
        """
        if self.section == ".plt":
            return True

        labels = self._entry_labels()
        if not labels:
            return False

        lbl = labels[0]
        if isinstance(lbl, FunctionLabel):
            return lbl.plt

        return False

    #
    # Overridden predefined methods
    #

    def __str__(self):
        """
        Output all instructions of the current procedure

        :return: Whatever `assembly(symbolized=False)` returns.
        """
        return self.assembly(symbolized=False)

    #
    # Public methods
    #

    def assign_labels(self):
        """Assign labels to every block of this procedure."""
        for block in self.blocks:
            block.assign_labels()

    def assembly(self, comments=False, symbolized=True):
        """
        Get the assembly manifest of the procedure.

        :param comments: Whether debugging comments should be emitted.
        :param symbolized: Whether operands should be replaced with their labels.
        :return: A list of tuples (address, assembly). The first tuple holds the procedure header; it is
                 followed by either the custom assembly code or the basic blocks ordered by address.
        :rtype: list
        """
        assembly = []

        header = "\t.section\t{section}\n\t.align\t{alignment}\n".format(
            section=self.section, alignment=self.binary.section_alignment(self.section)
        )
        if self.addr is not None:
            procedure_name = "%#x" % self.addr
        else:
            procedure_name = self._name
        header += "\t#Procedure %s\n" % procedure_name

        if self._output_function_label:
            # test against None: 0 is an address too
            if self.addr is not None:
                function_label = self.binary.symbol_manager.new_label(self.addr)
            else:
                function_label = self.binary.symbol_manager.new_label(None, name=procedure_name, is_function=True)
            header += str(function_label) + "\n"

        assembly.append((self.addr, header))

        if self.asm_code:
            assembly.append((self.addr, self.asm_code))
        elif self.blocks:
            b: BasicBlock
            for b in sorted(self.blocks, key=lambda x: x.addr):
                assembly.append((b.addr, b.assembly(comments=comments, symbolized=symbolized)))

        return assembly

    def instruction_addresses(self):
        """
        Get all instruction addresses in the binary.

        :return: A list of (address, size) tuples of all instructions, sorted by address.
        :rtype: list
        """
        addrs = set()
        for b in self.blocks:
            addrs.update(b.instruction_addresses())

        return sorted(addrs, key=lambda x: x[0])

    #
    # Private methods
    #

    def _entry_labels(self):
        """
        Labels of the first instruction of the block that starts at the procedure address, or None if
        there is no such block or the block has no instructions.
        """
        entry_block = next((b for b in self.blocks if b.addr == self.addr), None)
        if entry_block is None or not entry_block.instructions:
            return None
        return entry_block.instructions[0].labels

    def _initialize(self):
        """
        Create the basic blocks of the function, or make sure assembly code was supplied instead.

        :raises BinaryError: If there is neither a function nor assembly code.
        :return: None
        """
        if self.function is None:
            if not self.asm_code:
                raise BinaryError(
                    "Unsupported procedure type. You must either specify a angr.knowledge.Function "
                    "object, or specify assembly code."
                )
            return

        x86_getpc_retsites = set()
        if self.project.arch.name == "X86" and "pc_reg" in self.function.info:
            # this is an x86-PIC function that calls a get_pc thunk
            # we need to fix the "add e{a,b,c}x, offset" instruction right after the get_pc call
            # first let's identify which function is the get_pc function
            for src, dst, _ in self.function.transition_graph.edges(data=True):
                if isinstance(src, CodeNode) and isinstance(dst, Function) and "get_pc" in dst.info:
                    # found it! the block right after the calling block is the return site
                    x86_getpc_retsites.add(src.addr + src.size)

        for block_addr in self.function.block_addrs:
            self.blocks.append(
                BasicBlock(
                    self.binary,
                    block_addr,
                    self.function._block_sizes[block_addr],
                    x86_getpc_retsite=block_addr in x86_getpc_retsites,
                )
            )

        self.blocks = sorted(self.blocks, key=lambda x: x.addr)

    @property
    def _output_function_label(self):
        """
        Determines if we want to output the function label in assembly. We output the function label only when the
        original instruction does not output the function label.

        :return: True if we should output the function label, False otherwise.
        :rtype: bool
        """
        if self.asm_code:
            return True

        # no entry block, no instructions in it, or no labels on its first instruction
        return not self._entry_labels()
class ProcedureChunk(Procedure):
    """
    Procedure chunk.
    """

    def __init__(self, project, addr, size):
        """
        Constructor.

        NOTE(review): neither a function nor assembly code is forwarded to Procedure, whose initializer
        raises BinaryError in that case - confirm whether this class is still meant to be usable.

        :param project: Passed to Procedure as its `binary` argument.
        :param addr: Address of the chunk.
        :param size: Size of the chunk.
        :return: None
        """
        Procedure.__init__(self, project, addr=addr, size=size)
class Data:
    """
    A piece of data that is emitted into the reassembled binary.

    An instance is created in one of two ways:

    - from a memory data entry (``memory_data`` is given): address, size and sort are copied from it and the content
      is loaded through ``binary.fast_memory_load``;
    - from scratch (``memory_data`` is None): the caller supplies ``size`` and ``sort`` and, depending on the sort,
      ``initial_content``. ``addr`` may be None for such entries.
    """

    # size in bytes -> (assembler directive, struct format string) for integers that can be emitted as one value
    _INTEGER_FORMATS = {
        1: (".byte", "B"),
        2: (".short", "<H"),
        4: (".long", "<I"),
        8: (".quad", "<Q"),
    }

    def __init__(
        self,
        binary,
        memory_data=None,
        section=None,
        section_name=None,
        name=None,
        size=None,
        sort=None,
        addr=None,
        initial_content=None,
    ):
        """
        :param binary:          The owning object; it must provide ``project``, ``symbol_manager`` and the memory
                                loading / region query helpers used by this class.
        :param memory_data:     The memory data entry to build this object from, or None.
        :param section:         The section this data lives in. Its name takes precedence over `section_name`.
        :param section_name:    Name of the section, used when `section` is not given.
        :param name:            Name of the data entry.
        :param size:            Size in bytes. Overwritten when `memory_data` is given.
        :param sort:            Sort of the data. Overwritten when `memory_data` is given.
        :param addr:            Address of the data. Overwritten when `memory_data` is given.
        :param initial_content: Initial content, only used when `memory_data` is None.
        """
        self.binary = binary
        self.project = binary.project
        self.memory_data = memory_data
        self.section = section
        self.section_name = section.name if section else section_name
        self.addr = addr
        self.name = name
        self.size = size
        self.sort = sort
        self._initial_content = initial_content  # only used by patcherex
        self._content = None
        self.labels = []  # a list of tuples like (address, label)
        self.end_labels = []  # a list of labels only show up at the end of this memory data entry. mostly because the
        # data block after this one is removed for some reason. only assigned by other methods.
        self.null_terminated = None
        self.skip = False
        self._initialize()

    def __repr__(self):
        # addr is None for entries that were not created from the original binary; "%#08x" cannot format None
        addr_str = "None" if self.addr is None else "%#08x" % self.addr
        return "<DataItem %s@%s, %s bytes>" % (self.sort, addr_str, self.size)

    @property
    def content(self):
        """
        The content of this data entry: a list of bytes objects, or a list of labels/integers for pointer arrays.
        """
        return self._content

    @content.setter
    def content(self, v):
        self._content = v

    def shrink(self, new_size):
        """
        Reduce the size of this block

        :param int new_size: The new size
        :return: None
        :raises BinaryError: If this is a pointer array and the new size is not a multiple of the pointer size.
        """
        self.size = new_size
        if self.sort == MemoryDataSort.String:
            self.null_terminated = False  # string without the null byte terminator
            self._content[0] = self._content[0][: self.size]
        elif self.sort == MemoryDataSort.PointerArray:
            pointer_size = self.binary.project.arch.bytes
            if self.size % pointer_size != 0:
                # it's not aligned?
                raise BinaryError("Fails at Data.shrink()")
            pointers = self.size // pointer_size
            self._content = self._content[:pointers]
        else:
            # unknown
            self._content = [self._content[0][: self.size]]

    def desymbolize(self):
        """
        We believe this was a pointer and symbolized it before. Now we want to desymbolize it.

        The following actions are performed:
        - Reload content from memory
        - Mark the sort as 'unknown'

        :return: None
        """
        self.sort = MemoryDataSort.Unknown
        content = self.binary.fast_memory_load(self.addr, self.size, bytes)
        self.content = [content]

    def assign_labels(self):
        """
        Collect every label of the symbol manager that falls inside this data entry into `self.labels`.

        For pointer arrays, a label pointing into the middle of a pointer is rebased to the beginning of that
        pointer. Entries without an address are left untouched.

        :return: None
        """
        # TODO: What if it's not aligned for some sort of data, like pointer array?
        if self.addr is None:
            # this piece of data comes from a patch, not from the original binary
            return

        # Put labels to self.labels
        for offset in range(self.size):
            addr = self.addr + offset
            if addr not in self.binary.symbol_manager.addr_to_label:
                continue
            for label in self.binary.symbol_manager.addr_to_label[addr]:
                if self.sort == MemoryDataSort.PointerArray and addr % (self.project.arch.bytes) != 0:
                    # we need to modify the base address of the label
                    base_addr = addr - (addr % (self.project.arch.bytes))
                    label.base_addr = base_addr
                    tpl = (base_addr, label)
                else:
                    tpl = (addr, label)
                if tpl not in self.labels:
                    self.labels.append(tpl)

    def assembly(self, comments=False, symbolized=True):
        """
        Generate the assembly text of this data entry.

        :param bool comments:   Whether to prepend a comment line describing the entry.
        :param bool symbolized: Whether labels should be emitted.
        :return:                The assembly text. Leading/trailing newlines are stripped unless the entry is
                                skipped, in which case only the optional comment line is returned.
        :rtype:                 str
        :raises BinaryError:    If a pointer array is emitted for an architecture that is neither 32 nor 64 bits.
        """
        s = ""
        if comments:
            if self.addr is not None:
                s += "\t# data @ %#08x\n" % self.addr
            else:
                s += "\t# data (%s)\n" % self.name

        if self.skip:
            return s

        if self.sort == MemoryDataSort.String:
            s += self._assembly_string(symbolized)
        elif self.sort == MemoryDataSort.PointerArray:
            s += self._assembly_pointer_array(symbolized)
        elif self.sort == MemoryDataSort.SegmentBoundary:
            if symbolized:
                for _, label in self.labels:
                    s += "\t%s\n" % str(label)
        elif self.sort == MemoryDataSort.Integer:
            s += "\n".join(self._assembly_integer(symbolized))
            s += "\n"
        else:
            # floating point numbers and everything else: we have to display it as bytes...
            # TODO: switch to "ten bytes" for floating point numbers whenever time permits
            s += "\n".join(self._assembly_bytes(symbolized))
            s += "\n"

        if self.end_labels:
            for label in self.end_labels:
                s += "%s\n" % label

        return s.strip("\n")

    #
    # Private methods
    #

    def _labels_by_address(self):
        """Group `self.labels` into a dict mapping each address to its labels, preserving label order."""
        grouped = {}
        for addr, label in self.labels:
            grouped.setdefault(addr, []).append(label)
        return grouped

    def _assembly_bytes(self, symbolized):
        """Return the lines that dump the content byte by byte, interleaved with labels if `symbolized`."""
        lines = []
        if symbolized:
            addr_to_labels = self._labels_by_address()
            addr = self.addr if self.addr is not None else 0
            for piece in self.content:
                for c in piece:
                    if addr in addr_to_labels:
                        for label in addr_to_labels[addr]:
                            lines.append("%s" % str(label))
                    addr += 1
                    lines.append("\t.byte %d" % c)
        else:
            for piece in self.content:
                lines.extend("\t.byte %d" % c for c in piece)
        return lines

    def _assembly_string(self, symbolized):
        """Return the assembly text (newline-terminated) of a string entry."""
        if not symbolized:
            # the directive already carries its leading dot
            directive = ".ascii" if self.null_terminated is False else ".asciz"
            return f'\t{directive} "{string_escape(self.content[0])}"\n'

        # labels of entries without an address are stored relative to 0
        base_addr = self.addr if self.addr is not None else 0
        ss = []
        last_pos = 0
        last_index = len(self.labels) - 1
        for i, (addr, lbl) in enumerate(self.labels):
            # split the string
            pos = addr - base_addr
            string_piece = self.content[0][last_pos:pos]
            last_pos = pos

            if i == last_index and pos == self.size:
                directive = ".asciz"  # null at the end
            else:
                directive = ".ascii"

            if string_piece:
                ss.append(
                    '\t{directive} "{str}"'.format(
                        str=string_escape(string_piece),
                        directive=directive,
                    )
                )
            ss.append("%s" % str(lbl))

        if last_pos <= self.size - 1:
            string_piece = self.content[0][last_pos:]
            directive = ".ascii" if self.null_terminated is False else ".asciz"
            ss.append(
                '\t{directive} "{str}"'.format(
                    str=string_escape(string_piece),
                    directive=directive,
                )
            )

        return "\n".join(ss) + "\n"

    @staticmethod
    def _pointer_line(directive, pointer):
        """Return the line for one pointer array element, which is either a plain integer or a label."""
        if isinstance(pointer, int):
            return "\t%s %d\n" % (directive, pointer)
        return f"\t{directive} {pointer.operand_str}\n"

    def _assembly_pointer_array(self, symbolized):
        """Return the assembly text of a pointer array entry."""
        bits = self.binary.project.arch.bits
        if bits == 32:
            directive = ".long"
        elif bits == 64:
            directive = ".quad"
        else:
            raise BinaryError("Unsupported pointer size %d" % bits)

        s = ""
        if symbolized:
            addr_to_labels = self._labels_by_address()
            offset = 0
            if self.name is not None:
                s += "%s:\n" % self.name
            for symbolized_label in self.content:
                if self.addr is not None and (self.addr + offset) in addr_to_labels:
                    for label in addr_to_labels[self.addr + offset]:
                        s += "%s\n" % str(label)
                elif self.addr is not None and (self.addr + offset) in self.binary.symbol_manager.addr_to_label:
                    for label in self.binary.symbol_manager.addr_to_label[self.addr + offset]:
                        s += "%s\n" % str(label)
                offset += self.project.arch.bytes
                s += self._pointer_line(directive, symbolized_label)
        else:
            # the content may hold plain integers as well as labels
            for pointer in self.content:
                s += self._pointer_line(directive, pointer)
        return s

    def _assembly_integer(self, symbolized):
        """Return the lines of an integer entry."""
        # directive is None when we'll have to display it as a bunch of bytes
        directive, fmt_str = self._INTEGER_FORMATS.get(self.size, (None, None))

        if directive is None:
            return self._assembly_bytes(symbolized)

        content = []
        if symbolized:
            # display it as bytes only when there are references pointing to the middle
            addr_to_labels = self._labels_by_address()
            show_integer = False
            if len(addr_to_labels) == 0:
                show_integer = True
            elif len(addr_to_labels) == 1:
                only_addr = next(iter(addr_to_labels.keys()))
                if self.addr is not None and only_addr == self.addr:
                    show_integer = True
                elif self.addr is None and only_addr == 0:
                    show_integer = True

            if not show_integer:
                return self._assembly_bytes(symbolized)

            # nice, we should display it as an integer
            if addr_to_labels:
                for label in next(iter(addr_to_labels.values())):
                    content.append("%s" % str(label))

        integer = struct.unpack(fmt_str, self.content[0])[0]
        content.append(
            "\t{directive} {integer}".format(
                directive=directive,
                integer="%#x" % integer,
            )
        )
        return content

    def _initialize(self):
        """
        Populate content (and, for some sorts, labels) of this entry.

        :return: None
        :raises BinaryError: If the entry is created without memory data and the required arguments are missing or
                             the sort is not supported.
        """
        if self.memory_data is None:
            self._initialize_from_arguments()
        else:
            self._initialize_from_memory_data()

    def _initialize_from_arguments(self):
        """Initialize an entry that was created without a memory data entry."""
        if self.size is None or (self._initial_content is None and self.sort is None):
            raise BinaryError("You must at least specify size, initial_content, and sort.")

        if self.sort == MemoryDataSort.PointerArray:
            lbl = DataLabel(self.binary, -1, name=self.name)
            self.labels.append((0, lbl))

            # symbolize the pointer array
            self._content = []

            fmt_str = "<" if self.project.arch.memory_endness == "Iend_LE" else ">"
            if self.project.arch.bits == 32:
                fmt_str += "I"
                pointer_size = 4
            else:
                fmt_str += "Q"
                pointer_size = 8

            for i in range(0, len(self._initial_content), pointer_size):
                addr_str = self._initial_content[i : i + pointer_size]
                addr = struct.unpack(fmt_str, addr_str)[0]
                if addr != 0 and (
                    self.binary.main_executable_regions_contain(addr)
                    or self.binary.main_nonexecutable_regions_contain(addr)
                ):
                    label = self.binary.symbol_manager.new_label(addr)
                else:
                    # it might be a pointer pointing to the binary base address or something
                    # just keep it as it is
                    # TODO: some more delicate logic should be applied here. For example, if the pointer is very
                    # TODO: close to the beginning of .text, but after reassembling, it might be pointing to
                    # TODO: somewhere inside .text. In this case we'd like to fix up the reference and make it
                    # TODO: point to the beginning of .text minus an offset, instead of keeping the original header.
                    label = addr
                self._content.append(label)

        elif self.sort in {MemoryDataSort.String, MemoryDataSort.Unknown, MemoryDataSort.Integer}:
            lbl = DataLabel(self.binary, -1, name=self.name)
            self.labels.append((0, lbl))
            self._content = [self._initial_content]

        elif self.sort == MemoryDataSort.SegmentBoundary:
            label = self.binary.symbol_manager.new_label(self.addr)
            self.labels.append((self.addr, label))
            self._content = []

        else:
            raise BinaryError('Unsupported data sort "%s"' % self.sort)

    def _initialize_from_memory_data(self):
        """Initialize an entry from `self.memory_data`, loading its content from memory."""
        self.addr = self.memory_data.address
        self.size = self.memory_data.size
        self.sort = self.memory_data.sort

        # Symbolize the content
        if self.sort == MemoryDataSort.PointerArray:
            # read out the address
            pointer_size = self.project.arch.bytes
            pointers = self.size // pointer_size

            self._content = []
            for i in range(pointers):
                addr = self.binary.fast_memory_load(
                    self.addr + i * pointer_size, pointer_size, int, endness=self.project.arch.memory_endness
                )
                if addr is None:
                    continue
                obj = self.project.loader.find_object_containing(addr)
                if obj is self.project.loader.main_object:
                    # a dynamic pointer
                    if self.binary.main_executable_regions_contain(
                        addr
                    ) or self.binary.main_nonexecutable_regions_contain(addr):
                        label = self.binary.symbol_manager.new_label(addr)
                        self._content.append(label)
                        self.binary.register_data_reference(self.addr + i * pointer_size, addr)
                    else:
                        # it's a pointer pointing to a segment, but not any section. keep it as it is
                        self._content.append(addr)
                else:
                    # it's a static pointer. we should use the original pointer value.
                    self._content.append(addr)

        elif self.sort == MemoryDataSort.String:
            data = self.binary.fast_memory_load(self.addr, self.size, bytes)
            # guard against empty data: indexing the last byte would fail
            if data and data[-1] == 0:
                self.null_terminated = True
                data = data[:-1]  # remove the null-byte. we'll use .asciz for it instead.
            else:
                self.null_terminated = False
            self._content = [data]

        elif self.sort == MemoryDataSort.Integer:
            data = self.binary.fast_memory_load(self.addr, self.size, bytes)
            self._content = [data]

        elif self.sort == MemoryDataSort.SegmentBoundary:
            label = self.binary.symbol_manager.new_label(self.addr)
            self.labels.append((self.addr, label))
            self._content = []

        elif self.sort == MemoryDataSort.FloatingPoint:
            # floating-point integers
            # Python has some trouble in dealing with floating point numbers
            # just store them as bytes
            data = self.binary.fast_memory_load(self.addr, self.size, bytes)
            self._content = [data]

        else:
            # other sorts
            content = self.binary.fast_memory_load(self.addr, self.size, bytes)
            if content is not None:
                self._content = [content]
            else:
                self._content = []
class Relocation:
    """
    A single relocation record: the address where a reference is stored, the address being referenced, and the
    sort of the reference.
    """

    def __init__(self, addr, ref_addr, sort):
        """
        :param int addr:     Address of the reference itself.
        :param int ref_addr: The address being referenced.
        :param sort:         Sort of the relocation.
        """
        self.addr, self.ref_addr, self.sort = addr, ref_addr, sort

    def __repr__(self):
        return "<Reloc {} {:#x} ({:#x})>".format(self.sort, self.addr, self.ref_addr)
[docs]class Reassembler(Analysis):
"""
High-level representation of a binary with a linear representation of all instructions and data regions. After
calling "symbolize", it essentially acts as a binary reassembler.
Tested on CGC, x86 and x86-64 binaries.
Disclaimer: The reassembler is an empirical solution. Don't be surprised if it does not work on some binaries.
"""
def __init__(self, syntax="intel", remove_cgc_attachments=True, log_relocations=True):
    """
    Set up the (empty) state of the reassembler and then run `_initialize()`.

    :param str syntax:                  Assembly syntax. "intel" and "at&t" are the values checked elsewhere in
                                        this class.
    :param bool remove_cgc_attachments: Whether `assembly()` should try to remove CGC attachments.
    :param bool log_relocations:        Whether references registered through `register_*_reference()` are
                                        recorded as relocations.
    """
    # user options
    self.syntax = syntax
    self.log_relocations = log_relocations
    self._remove_cgc_attachments = remove_cgc_attachments
    self._cgc_attachments_removed = False

    # analysis results, filled in by _initialize()
    self.symbol_manager = None
    self.cfg = None

    # everything that will end up in the output
    self.procedures = []
    self.data = []
    self.extra_rodata = []
    self.extra_data = []

    # lazily computed region lists
    self._main_executable_regions = None
    self._main_nonexecutable_regions = None

    self._symbolization_needed = True

    # section names to alignments
    self._section_alignments = {}

    # all instruction addresses
    self.all_insn_addrs = set()

    self._relocations = []

    # user modifications
    self._inserted_asm_before_label = defaultdict(list)
    self._inserted_asm_after_label = defaultdict(list)
    self._removed_instructions = set()

    # one extra page-sized region that is handed to the CFG recovery as additional memory
    extra_region_start = 0x4347C000
    self._extra_memory_regions = [(extra_region_start, extra_region_start + 0x1000)]

    self._initialize()
#
# Overridden predefined methods
#
def __str__(self):
    """
    Return a linear representation of all instructions in the binary

    :return: The string forms of all procedures, joined by newlines.
    """
    return "\n".join(str(procedure) for procedure in self.procedures)
#
# Properties
#
@property
def instructions(self):
    """
    Get a list of all instructions in the binary. Not implemented yet.

    :return: A list of (address, instruction)
    :rtype: tuple
    :raises NotImplementedError: Always.
    """
    raise NotImplementedError
@property
def relocations(self):
    """
    :return: The list of relocations recorded so far.
    """
    recorded = self._relocations
    return recorded
@property
def inserted_asm_before_label(self):
    """
    :return: The mapping from addresses to the assembly snippets inserted before the label at that address.
    """
    snippets = self._inserted_asm_before_label
    return snippets
@property
def inserted_asm_after_label(self):
    """
    :return: The mapping from addresses to the assembly snippets inserted after the label at that address.
    """
    snippets = self._inserted_asm_after_label
    return snippets
@property
def main_executable_regions(self):
    """
    The executable address ranges of the main object, computed on first access and cached afterwards.

    Sections are used when the main object has any (sections that start at address 0 or that are empty are
    ignored); otherwise segments are used.

    :return: A list of (start, end) tuples, where end is exclusive.
    :rtype: list
    """
    if self._main_executable_regions is not None:
        return self._main_executable_regions

    regions = []
    self._main_executable_regions = regions

    main_object = self.project.loader.main_object
    if main_object.sections:
        for section in main_object.sections:
            if not section.is_executable:
                continue
            start, end = section.min_addr, section.max_addr + 1
            if end <= start or start == 0:
                continue
            regions.append((start, end))
    else:
        for segment in main_object.segments:
            if segment.is_executable:
                regions.append((segment.min_addr, segment.max_addr + 1))

    return self._main_executable_regions
@property
def main_nonexecutable_regions(self):
    """
    The non-executable address ranges of the main object, computed on first access and cached afterwards.

    Sections are used when the main object has any (.eh_frame/.eh_frame_hdr, sections that start at address 0 and
    empty sections are ignored); otherwise segments are used.

    :return: A list of (start, end) tuples, where end is exclusive.
    :rtype: list
    """
    if self._main_nonexecutable_regions is not None:
        return self._main_nonexecutable_regions

    regions = []
    self._main_nonexecutable_regions = regions

    main_object = self.project.loader.main_object
    if main_object.sections:
        for section in main_object.sections:
            # hack for ELF binaries: exception handling frames are never treated as data regions
            if section.name in {".eh_frame", ".eh_frame_hdr"} or section.is_executable:
                continue
            start, end = section.min_addr, section.max_addr + 1
            if end <= start or start == 0:
                continue
            regions.append((start, end))
    else:
        for segment in main_object.segments:
            if not segment.is_executable:
                regions.append((segment.min_addr, segment.max_addr + 1))

    return self._main_nonexecutable_regions
#
# Public methods
#
def section_alignment(self, section_name):
    """
    Look up the alignment recorded for a section, falling back to 16 for sections without a record.

    :param str section_name: The section.
    :return: The alignment in bytes.
    :rtype: int
    """
    try:
        return self._section_alignments[section_name]
    except KeyError:
        return 16
def main_executable_regions_contain(self, addr):
    """
    Check whether an address falls into any executable region of the main object.

    :param int addr: The address to check.
    :return: True if the address is inside an executable region, False otherwise.
    :rtype: bool
    """
    return any(start <= addr < end for start, end in self.main_executable_regions)
def main_executable_region_limbos_contain(self, addr, tolerance_before=64, tolerance_after=64):
    """
    Sometimes there exists a pointer that points to a few bytes before the beginning of a section, or a few bytes
    after the end of the section. We take care of that here.

    :param int addr:             The address to check.
    :param int tolerance_before: How many bytes before the beginning of a region still count as limbo.
    :param int tolerance_after:  How many bytes after the end of a region still count as limbo.
    :return: A 2-tuple of (bool, the closest base address). The base address is the start of the region when
             the address lies before it, and the end of the region when the address lies after it.
    :rtype: tuple
    """
    closest_region = None
    least_limbo = None

    for start, end in self.main_executable_regions:
        if start - tolerance_before <= addr < start:
            if least_limbo is None or start - addr < least_limbo:
                closest_region = (True, start)
                least_limbo = start - addr
        if end <= addr < end + tolerance_after:
            if least_limbo is None or addr - end < least_limbo:
                closest_region = (True, end)
                least_limbo = addr - end

    if closest_region is not None:
        return closest_region
    return False, None
def main_nonexecutable_regions_contain(self, addr):
    """
    Check whether an address falls into any non-executable region of the main object.

    :param int addr: The address to check.
    :return: True if the address is inside a non-executable region, False otherwise.
    :rtype: bool
    """
    return any(start <= addr < end for start, end in self.main_nonexecutable_regions)
def main_nonexecutable_region_limbos_contain(self, addr, tolerance_before=64, tolerance_after=64):
    """
    Find the non-executable region boundary that an out-of-region pointer most likely belongs to: pointers may
    point to a few bytes before the beginning of a region, or a few bytes past its end.

    :param int addr:             The address to check.
    :param int tolerance_before: How many bytes before the beginning of a region still count as limbo.
    :param int tolerance_after:  How many bytes after the end of a region still count as limbo.
    :return: A 2-tuple of (bool, the closest base address)
    :rtype: tuple
    """
    # (distance, boundary) pairs, in the order in which the regions are visited
    candidates = []
    for start, end in self.main_nonexecutable_regions:
        if start - tolerance_before <= addr < start:
            candidates.append((start - addr, start))
        if end <= addr < end + tolerance_after:
            candidates.append((addr - end, end))

    if not candidates:
        return False, None

    # min() returns the first of several equally close candidates
    _, boundary = min(candidates, key=lambda candidate: candidate[0])
    return True, boundary
def register_instruction_reference(self, insn_addr, ref_addr, sort, operand_offset):
    """
    Record a relocation for a reference made by an instruction operand. Does nothing when relocation logging is
    disabled.

    :param int insn_addr:      Address of the instruction.
    :param int ref_addr:       The address being referenced.
    :param sort:               Sort of the relocation.
    :param int operand_offset: Offset of the operand inside the instruction; added to `insn_addr` to form the
                               address of the relocation.
    :return: None
    """
    if self.log_relocations:
        self._relocations.append(Relocation(insn_addr + operand_offset, ref_addr, sort))
def register_data_reference(self, data_addr, ref_addr):
    """
    Record an "absolute" relocation for a pointer stored in data. Does nothing when relocation logging is
    disabled.

    :param int data_addr: Address where the pointer is stored.
    :param int ref_addr:  The address being referenced.
    :return: None
    """
    if self.log_relocations:
        self._relocations.append(Relocation(data_addr, ref_addr, "absolute"))
def add_label(self, name, addr):
    """
    Force-create a named label in the symbol manager and flag the binary as needing symbolization again.

    :param str name: Name of the label.
    :param int addr: Address of the label.
    :return: None
    """
    # labels changed, so the next symbolized assembly() call has to re-run symbolize()
    self._symbolization_needed = True
    manager = self.symbol_manager
    manager.new_label(addr, name=name, force=True)
def insert_asm(self, addr, asm_code, before_label=False):
    """
    Insert some assembly code at the specific address. There must be an instruction starting at that address.

    :param int addr:          Address of insertion
    :param str asm_code:      The assembly code to insert
    :param bool before_label: Record the code in the "before label" table instead of the "after label" table.
    :return: None
    """
    table = self._inserted_asm_before_label if before_label else self._inserted_asm_after_label
    table[addr].append(asm_code)
def append_procedure(self, name, asm_code):
    """
    Create a procedure from a name and raw assembly code, and put it at the end of the procedure list.

    :param str name:     The name of the new procedure.
    :param str asm_code: The assembly code of the procedure
    :return: None
    """
    self.procedures.append(Procedure(self, name=name, asm_code=asm_code))
def append_data(
    self, name, initial_content, size, readonly=False, sort="unknown"
):  # pylint:disable=unused-argument
    """
    Append a new data entry into the binary with specific name, content, and size.

    The content is right-padded with null bytes up to `size`; content longer than `size` is not truncated.

    :param str name:              Name of the data entry. Will be used as the label.
    :param bytes initial_content: The initial content of the data entry. None is treated as empty content.
    :param int size:              Size of the data entry.
    :param bool readonly:         If the data entry belongs to the readonly region (".rodata" instead of ".data").
    :param str sort:              Type of the data.
    :return: None
    """
    section_name = ".rodata" if readonly else ".data"
    padded_content = (b"" if initial_content is None else initial_content).ljust(size, b"\x00")

    entry = Data(
        self,
        memory_data=None,
        section_name=section_name,
        name=name,
        initial_content=padded_content,
        size=size,
        sort=sort,
    )

    destination = self.extra_rodata if readonly else self.extra_data
    destination.append(entry)
def remove_instruction(self, ins_addr):
    """
    Mark the instruction at the given address as removed.

    :param int ins_addr: Address of the instruction.
    :return: None
    """
    self._removed_instructions.update((ins_addr,))
def randomize_procedures(self):
    """
    Not implemented yet.

    :return: None
    :raises NotImplementedError: Always.
    """
    raise NotImplementedError
def symbolize(self):
    """
    Assign labels to all procedures and data entries, and rebase labels that point into the middle of an
    instruction.

    A label whose address lies strictly inside an instruction gets the address of the following instruction (in
    sorted order) as its base address, and is moved to that address in the label table of the symbol manager.
    When any label was moved, the labels of all procedures are assigned once more.

    :return: None
    """
    # clear the flag
    self._symbolization_needed = False

    # sanity checks
    # if self._has_integer_used_as_pointers():
    #    raise ReassemblerFailureNotice('Integer-used-as-pointer detected. Reassembler will not work safely on '
    #                                   'this binary. Ping Fish if you believe the detection is wrong.'
    #                                   )

    for procedure in self.procedures:
        procedure.assign_labels()
    for data_entry in self.data:
        data_entry.assign_labels()

    # Get all instruction addresses, and modify those labels pointing to the middle of an instruction
    collected = []
    for procedure in self.procedures:
        collected.extend(procedure.instruction_addresses())
    # just to be safe: deduplicate, then order the (address, size) pairs by address
    insn_spans = sorted(set(collected), key=lambda span: span[0])
    span_count = len(insn_spans)

    addr_to_label = self.symbol_manager.addr_to_label
    cursor = 0
    moved_labels = []

    for label_addr in sorted(addr_to_label.keys()):
        # advance to the first instruction that does not start before the label
        while cursor < span_count and label_addr > insn_spans[cursor][0]:
            cursor += 1
        if cursor >= span_count:
            break
        if cursor == 0:
            continue

        prev_addr, prev_size = insn_spans[cursor - 1]
        if not prev_addr < label_addr < prev_addr + prev_size:
            continue

        # this label should be converted to something like 0x8000040+1
        new_base = insn_spans[cursor][0]
        for label in self.symbol_manager.addr_to_label[label_addr]:
            label.base_addr = new_base
            moved_labels.append(label)

    for label in moved_labels:
        old_bucket = self.symbol_manager.addr_to_label[label.original_addr]
        old_bucket.remove(label)
        if not self.symbol_manager.addr_to_label[label.original_addr]:
            del self.symbol_manager.addr_to_label[label.original_addr]
        self.symbol_manager.addr_to_label[label.base_addr].append(label)

    if moved_labels:
        for procedure in self.procedures:
            procedure.assign_labels()
def assembly(self, comments=False, symbolized=True):
    """
    Generate the assembly text of the whole binary: all procedures first (ordered by address), followed by all
    data entries grouped into sections.

    For the "intel" syntax, the output starts with the `.intel_syntax noprefix` directive.

    :param bool comments:   Whether comments should be emitted.
    :param bool symbolized: Whether labels should be emitted. Symbolization is (re-)run first if it is needed.
    :return:                The assembly text.
    :rtype:                 str
    """
    if symbolized and self._symbolization_needed:
        self.symbolize()

    if self._remove_cgc_attachments:
        self._cgc_attachments_removed = self.remove_cgc_attachments()

    # the header used to be built and then overwritten by the joined lines; it is now really part of the output
    header = "\t.intel_syntax noprefix\n" if self.syntax == "intel" else ""

    addr_and_assembly = []
    for proc in self.procedures:
        addr_and_assembly.extend(proc.assembly(comments=comments, symbolized=symbolized))
    # sort it by the address - must be a stable sort! lines without an address go first
    addr_and_assembly.sort(key=lambda x: x[0] if x[0] is not None else -1)
    all_assembly_lines = [line for _, line in addr_and_assembly]

    if self._cgc_attachments_removed:
        all_data = self.data + self.extra_rodata + self.extra_data
    else:
        # to reduce memory usage, we put extra data in front of the original data in binary
        all_data = self.extra_data + self.data + self.extra_rodata

    last_section = None
    for data in all_data:
        if last_section is None or data.section_name != last_section:
            last_section = data.section_name
            all_assembly_lines.append(
                "\t.section {section}\n\t.align {alignment}".format(
                    # data of .init_array is emitted into .data
                    section=(last_section if last_section != ".init_array" else ".data"),
                    alignment=self.section_alignment(last_section),
                )
            )
        all_assembly_lines.append(data.assembly(comments=comments, symbolized=symbolized))

    return header + "\n".join(all_assembly_lines)
def remove_cgc_attachments(self):
    """
    Remove CGC attachments.

    The two data entries of sort "cgc-package-list" and "cgc-extended-application" are marked as skipped, and
    the single function referencing them, as well as the single function it calls, are replaced by a plain
    `ret`. Nothing is modified unless every check along the way succeeds.

    :return: True if CGC attachments are found and removed, False otherwise
    :rtype: bool
    """
    package_list = None
    extended_app = None
    for entry in self.data:
        if entry.sort == "cgc-package-list":
            package_list = entry
        elif entry.sort == "cgc-extended-application":
            extended_app = entry

    if not package_list or not extended_app:
        return False

    if package_list.skip or extended_app.skip:
        # they have already been removed
        # so we still return True to indicate that CGC attachments have been removed
        return True

    # there is a single function referencing them
    package_list_md = self.cfg.memory_data.get(package_list.addr, None)
    extended_app_md = self.cfg.memory_data.get(extended_app.addr, None)
    xrefs = self.cfg.kb.xrefs

    if package_list_md is None or extended_app_md is None:
        return False

    # each of them must be referenced exactly once
    if len(xrefs.get_xrefs_by_dst(package_list_md.addr)) != 1:
        return False
    if len(xrefs.get_xrefs_by_dst(extended_app_md.addr)) != 1:
        return False

    # check if the irsb addresses are the same
    package_list_block = next(iter(xrefs.get_xrefs_by_dst(package_list_md.addr))).block_addr
    extended_app_block = next(iter(xrefs.get_xrefs_by_dst(extended_app_md.addr))).block_addr
    if package_list_block != extended_app_block:
        return False

    insn_addr = next(iter(xrefs.get_xrefs_by_dst(package_list_md.addr))).ins_addr
    # get the basic block
    cfg_node = self.cfg.model.get_any_node(insn_addr, anyaddr=True)
    if not cfg_node:
        return False

    func_addr = cfg_node.function_address

    # this function should be calling another function
    if func_addr not in self.cfg.functions:
        return False
    function = self.cfg.functions[func_addr]

    # traverse the graph and make sure there is only one call edge
    calling_targets = [
        dst.addr
        for _, dst, edge_data in function.transition_graph.edges(data=True)
        if "type" in edge_data and edge_data["type"] == "call"
    ]
    if len(calling_targets) != 1:
        return False
    sub_func_addr = calling_targets[0]

    # alright. We want to nop this function, as well as the subfunction
    proc = next((p for p in self.procedures if p.addr == func_addr), None)
    if proc is None:
        return False

    subproc = next((p for p in self.procedures if p.addr == sub_func_addr), None)
    if subproc is None:
        return False

    # if those two data entries have any label, we should properly modify them
    # at this point, we are fairly confident that none of those labels are direct data references to either package
    # list or extended application
    lowest_address = min(package_list.addr, extended_app.addr)
    for attachment in (package_list, extended_app):
        for addr, label in attachment.labels:
            if addr != lowest_address:
                label.base_addr = lowest_address

    # is there any memory data entry that ends right at the lowest address?
    # if there is none: since there is no gap between memory data entries (we guarantee that), this can only be
    # that no other data resides in the same memory region that CGC attachments are in
    preceding = next((d for d in self.data if d.addr is not None and d.addr + d.size == lowest_address), None)
    if preceding is not None:
        lbl = self.symbol_manager.addr_to_label[lowest_address][0]
        if lbl not in preceding.end_labels:
            preceding.end_labels.append(lbl)

    # practically nop the function
    proc.asm_code = "\tret\n"
    subproc.asm_code = "\tret\n"

    # remove those two data entries
    package_list.skip = True
    extended_app.skip = True

    l.info("CGC attachments are removed.")

    return True
def remove_unnecessary_stuff(self):
    """
    Remove unnecessary functions and data

    Only binaries that list "libc.so.6" or "libc.so" (compared case-insensitively) among the dependencies of the
    main object are handled; nothing happens for other binaries.

    :return: None
    """
    libc_names = {"libc.so.6", "libc.so"}

    # determine if the binary is compiled against glibc
    dependencies = self.project.loader.main_object.deps
    if any(dep.lower() in libc_names for dep in dependencies):
        self.remove_unnecessary_stuff_glibc()
def remove_unnecessary_stuff_glibc(self):
    """
    Remove the functions and data entries that are named in the blacklists below, plus everything that refers
    to them:

    - blacklisted procedures and PLT procedures are dropped;
    - if the function "init" calls exactly one function besides get-pc helpers, that callee is dropped as well;
    - data entries carrying a blacklisted label are dropped;
    - pointer array elements referring to a blacklisted label are replaced by 0;
    - labels of segment boundaries that belong to a blacklisted function are dropped.

    :return: None
    """
    glibc_functions_blacklist = {
        "_start",
        "init",
        "_init",
        "fini",
        "_fini",
        "__gmon_start__",
        "__do_global_dtors_aux",
        "frame_dummy",
        "atexit",
        "deregister_tm_clones",
        "register_tm_clones",
        "__x86.get_pc_thunk.bx",
        "__libc_csu_init",
        "__libc_csu_fini",
    }

    glibc_data_blacklist = {
        "__TMC_END__",
        "_GLOBAL_OFFSET_TABLE_",
        "__JCR_END__",
        "__dso_handle",
        "__init_array_start",
        "__init_array_end",
        #
        "stdout",
        "stderr",
        "stdin",
        "program_invocation_short_",
        "program_invocation_short_name",
        "program_invocation_name",
        "__progname_full",
        "_IO_stdin_used",
        "obstack_alloc_failed_hand",
        "optind",
        "optarg",
        "__progname",
        "_environ",
        "environ",
        "__environ",
    }

    glibc_references_blacklist = {
        "frame_dummy",
        "__do_global_dtors_aux",
    }

    kept_procedures = []
    for procedure in self.procedures:
        if procedure.name not in glibc_functions_blacklist and not procedure.is_plt:
            kept_procedures.append(procedure)
    self.procedures = kept_procedures

    # special handling for _init_proc
    try:
        init_func = self.cfg.functions["init"]
        unresolvable = self.cfg._unresolvable_call_target_addr
        callees = []
        for node in init_func.transition_graph.nodes():
            if isinstance(node, Function) and node.addr != unresolvable:
                callees.append(node)
        # special handling for GCC-generated X86 PIE binaries
        non_getpc_callees = [callee for callee in callees if "get_pc" not in callee.info]
        if len(non_getpc_callees) == 1:
            # we found the _init_proc
            init_proc_addr = non_getpc_callees[0].addr
            self.procedures = [p for p in self.procedures if p.addr != init_proc_addr]
    except KeyError:
        # there is no function called "init"
        pass

    kept_data = []
    for entry in self.data:
        if not any(lbl.name in glibc_data_blacklist for _, lbl in entry.labels):
            kept_data.append(entry)
    self.data = kept_data

    for entry in self.data:
        if entry.sort == MemoryDataSort.PointerArray:
            for index in range(len(entry.content)):
                pointer = entry.content[index]
                if isinstance(pointer, Label) and pointer.name in glibc_references_blacklist:
                    entry.content[index] = 0
        elif entry.sort == MemoryDataSort.SegmentBoundary and entry.labels:
            # drop the labels that belong to a removed function
            entry.labels = [
                (rebased_addr, label)
                for rebased_addr, label in entry.labels
                if not (
                    self.cfg.functions.contains_addr(rebased_addr)
                    and self.cfg.functions[rebased_addr].name in glibc_functions_blacklist
                )
            ]
#
# Private methods
#
def _initialize(self):
    """
    Initialize the binary.

    The steps are, in order:

    - derive an alignment for every section that is mapped inside a segment;
    - generate a CFG with CFGFast and keep it in `self.cfg`;
    - create the symbol manager and collect all instruction addresses;
    - create a Procedure for every function that is neither hooked nor a syscall and does not live in one of
      the ignored sections;
    - create a Data entry for every memory data entry of the CFG, add missing section boundaries, resolve
      overlapping entries, and drop data that belongs to ignored sections.

    The capstone X86 syntax of the architecture is switched to AT&T for the duration of the initialization if
    `self.syntax` is "at&t", and restored at the end.

    :return: None
    """

    # figure out section alignments
    for section in self.project.loader.main_object.sections:
        in_segment = False
        for segment in self.project.loader.main_object.segments:
            segment_addr = segment.vaddr
            if segment_addr <= section.vaddr < segment_addr + segment.memsize:
                in_segment = True
                break
        if not in_segment:
            # sections outside of all segments get no alignment record
            continue

        # calculate alignments: the largest of 0x20/0x10/0x8/0x4 that divides the section address, else 2
        if section.vaddr % 0x20 == 0:
            alignment = 0x20
        elif section.vaddr % 0x10 == 0:
            alignment = 0x10
        elif section.vaddr % 0x8 == 0:
            alignment = 0x8
        elif section.vaddr % 0x4 == 0:
            alignment = 0x4
        else:
            alignment = 2

        self._section_alignments[section.name] = alignment

    l.debug("Generating CFG...")
    cfg = self.project.analyses[CFGFast].prep()(
        normalize=True,
        resolve_indirect_jumps=True,
        data_references=True,
        extra_memory_regions=self._extra_memory_regions,
        data_type_guessing_handlers=[
            self._sequence_handler,
            self._cgc_extended_application_handler,
            self._unknown_data_size_handler,
        ],
    )

    self.cfg = cfg

    # remember the current capstone syntax so that it can be restored at the end of this method
    old_capstone_syntax = self.project.arch.capstone_x86_syntax
    if old_capstone_syntax is None:
        old_capstone_syntax = "intel"

    if self.syntax == "at&t":
        # switch capstone to AT&T style
        self.project.arch.capstone_x86_syntax = "at&t"
        # clear the block cache in lifter!
        self.project.factory.default_engine.clear_cache()

    # initialize symbol manager
    self.symbol_manager = SymbolManager(self, cfg)

    # collect address of all instructions
    l.debug("Collecting instruction addresses...")
    for cfg_node in self.cfg.nodes():
        self.all_insn_addrs |= set(cfg_node.instruction_addrs)

    # Functions

    l.debug("Creating functions...")
    for f in cfg.kb.functions.values():
        # Skip all SimProcedures
        if self.project.is_hooked(f.addr):
            continue
        elif self.project.simos.is_syscall_addr(f.addr):
            continue

        # Check which section the start address belongs to
        # (".text" is assumed when no section contains the address)
        section = next(
            iter(
                sec.name
                for sec in self.project.loader.main_object.sections
                if f.addr >= sec.vaddr and f.addr < sec.vaddr + sec.memsize
            ),
            ".text",
        )

        if section in {".got", ".plt", "init", "fini", ".init", ".fini"}:
            continue

        procedure = Procedure(self, function=f, section=section)
        self.procedures.append(procedure)

    self.procedures = sorted(self.procedures, key=lambda x: x.addr)

    # Data

    has_sections = len(self.project.loader.main_object.sections) > 0

    l.debug("Creating data entries...")
    for addr, memory_data in cfg._memory_data.items():
        if memory_data.sort in ("code reference",):
            continue

        if memory_data.sort == "string":
            # it might be the CGC package list
            new_sort, new_size = self._cgc_package_list_identifier(memory_data.address, memory_data.size)
            if new_sort is not None:
                # oh we got it!
                # work on a copy so that the entry stored in the CFG keeps its original sort
                memory_data = memory_data.copy()
                memory_data.sort = new_sort

        if has_sections:
            # Check which section the start address belongs to
            section = next(
                iter(
                    sec
                    for sec in self.project.loader.main_object.sections
                    if sec.vaddr <= addr < sec.vaddr + sec.memsize
                ),
                None,
            )

            if section is not None and section.name not in (".note.gnu.build-id",):  # ignore certain section names
                data = Data(self, memory_data, section=section)
                self.data.append(data)
            elif memory_data.sort == "segment-boundary":
                # it just points to the end of the segment or a section
                section = next(
                    iter(
                        sec for sec in self.project.loader.main_object.sections if addr == sec.vaddr + sec.memsize
                    ),
                    None,
                )
                if section is not None:
                    data = Data(self, memory_data, section=section)
                    self.data.append(data)

            else:
                # data = Data(self, memory_data, section_name='.data')
                # the data is not really within any existing section. weird. ignored it.
                pass
        else:
            # the binary does not have any section
            # we use segment information instead
            # TODO: this logic needs reviewing
            # NOTE(review): the upper bound is inclusive here, unlike the section lookup above - confirm
            segment = next(
                iter(
                    seg
                    for seg in self.project.loader.main_object.segments
                    if seg.vaddr <= addr <= seg.vaddr + seg.memsize
                ),
                None,
            )

            if segment is not None:
                data = Data(self, memory_data, section_name=".data")
                self.data.append(data)

    # remove all data that belong to GCC-specific sections
    section_names_to_ignore = {
        ".init",
        ".fini",
        ".fini_array",
        ".jcr",
        ".dynamic",
        ".got",
        ".got.plt",
        ".eh_frame_hdr",
        ".eh_frame",
        ".rel.dyn",
        ".rel.plt",
        ".rela.dyn",
        ".rela.plt",
        ".dynstr",
        ".dynsym",
        ".interp",
        ".note.ABI-tag",
        ".note.gnu.build-id",
        ".gnu.hash",
        ".gnu.version",
        ".gnu.version_r",
    }

    # make sure there are always memory data entries pointing at the end of sections
    all_data_addrs = {d.addr for d in self.data}
    all_procedure_addrs = {f.addr for f in self.procedures}
    all_addrs = all_data_addrs | all_procedure_addrs

    if has_sections:
        for section in self.project.loader.main_object.sections:
            if section.name in section_names_to_ignore:
                # skip all sections that are in the ignore list above
                continue

            # make sure this section is not empty
            if section.memsize == 0:
                continue

            # make sure this section is inside a segment
            for segment in self.project.loader.main_object.segments:
                segment_start = segment.vaddr
                segment_end = segment_start + segment.memsize
                if segment_start <= section.vaddr < segment_end:
                    break
            else:
                # this section is not mapped into memory
                continue

            section_boundary_addr = section.vaddr + section.memsize
            if section_boundary_addr not in all_addrs:
                data = Data(
                    self, addr=section_boundary_addr, size=0, sort="segment-boundary", section_name=section.name
                )
                self.data.append(data)
                # add the address to all_data_addrs so we don't end up adding another boundary in
                # NOTE(review): the membership test above reads all_addrs, which is a separate set built
                # before this loop, so this add() does not seem to prevent a second boundary at the same
                # address - confirm whether all_addrs should be updated instead
                all_data_addrs.add(section_boundary_addr)

    self.data = sorted(self.data, key=lambda x: x.addr)

    # indices into self.data of the entries that are dropped once the loop below is done
    data_indices_to_remove = set()

    # Go through data entry list and refine them
    for i, data in enumerate(self.data):
        if i in data_indices_to_remove:
            continue

        # process the overlapping ones
        if i < len(self.data) - 1:
            if data.addr + data.size > self.data[i + 1].addr:
                # they are overlapping :-(

                # TODO: make sure new_size makes sense
                new_size = self.data[i + 1].addr - data.addr

                # there are cases that legit data is misclassified as pointers
                # we are able to detect some of them here
                if data.sort == "pointer-array":
                    pointer_size = self.project.arch.bytes
                    if new_size % pointer_size != 0:
                        # the self.data[i+1] cannot be pointed to by a pointer
                        # remove that guy later
                        data_indices_to_remove.add(i + 1)
                        # mark the source as a non-pointer
                        # apparently the original Reassembleable Disassembler paper cannot get this case
                        source_addr = self.data[i + 1].memory_data.pointer_addr
                        if source_addr is not None:
                            # find the original data
                            original_data = next(
                                (d for d in self.data if d.addr <= source_addr < d.addr + d.size), None
                            )
                            if original_data is not None:
                                original_data.desymbolize()

                        # the pointer array itself keeps its size in this case
                        continue

                data.shrink(new_size)

        # process those ones whose type is unknown
        if data.sort == "unknown" and data.size == 0:
            # increase its size until reaching the next item

            if i + 1 == len(self.data):
                if data.section is None:
                    continue
                # the last entry extends to the end of its section
                data.size = data.section.vaddr + data.section.memsize - data.addr
            else:
                data.size = self.data[i + 1].addr - data.addr

    # remove from the highest index down so that the remaining indices stay valid
    for i in sorted(data_indices_to_remove, reverse=True):
        self.data = self.data[:i] + self.data[i + 1 :]

    # CGC-specific data filtering
    self.data = [d for d in self.data if d.section_name not in section_names_to_ignore]

    # restore capstone X86 syntax at the end
    if self.project.arch.capstone_x86_syntax != old_capstone_syntax:
        self.project.arch.capstone_x86_syntax = old_capstone_syntax
        self.project.factory.default_engine.clear_cache()

    l.debug("Initialized.")
def _is_sequence(self, cfg, addr, size):
    """
    Test whether the bytes at an address form an arithmetic progression.

    :param cfg:         The control flow graph. It is only forwarded to ``_is_pointer``.
    :param int addr:    Address of the first byte to examine.
    :param int size:    Number of bytes to examine.
    :return:            True if the difference between every two adjacent bytes is identical and the word at the
                        end of the run does not look like a pointer. False otherwise, including when the memory
                        cannot be loaded.
    :rtype:             bool
    """
    data = self.fast_memory_load(addr, size, bytes)
    if data is None:
        return False
    byte_values = list(data)
    deltas = {i - j for i, j in zip(byte_values, byte_values[1:])}
    if len(deltas) != 1:
        # not an arithmetic progression (a run shorter than two bytes yields no delta at all)
        return False
    # backoff: it should not be ending with a pointer
    # Clear only the two low bits. A fixed 32-bit mask would also wipe the upper half of a 64-bit address.
    closest_aligned_addr = (addr + size - 1) & ~0x3
    ptr = self.fast_memory_load(closest_aligned_addr, 4, int, endness=self.project.arch.memory_endness)
    if ptr is None:
        return False
    return not self._is_pointer(cfg, ptr)
def _is_pointer(self, cfg, ptr):
    """
    Test whether a value could be a pointer into known memory.

    A value counts as a pointer when the loader reports a section or a segment containing it, or when it lies
    strictly between the bounds of any entry in ``self._extra_memory_regions``.

    :param cfg: The control flow graph; its ``project.loader`` is used for the section and segment lookups.
    :param ptr: The value to test. ``None`` (e.g. a failed memory load) is never a pointer.
    :return:    True if the value looks like a pointer, False otherwise.
    :rtype:     bool
    """
    if ptr is None:
        return False
    loader = cfg.project.loader
    if loader.find_section_containing(ptr) is not None:
        return True
    if loader.find_segment_containing(ptr) is not None:
        return True
    regions = self._extra_memory_regions
    if not regions:
        return False
    # every region must be considered, not just the first one
    return any(a < ptr < b for a, b in regions)
def _sequence_handler(self, cfg, irsb, irsb_addr, stmt_idx, data_addr, max_size):  # pylint:disable=unused-argument
    """
    Find sequences in binary data.

    :param angr.analyses.CFG cfg:   The control flow graph.
    :param pyvex.IRSB irsb:         The IRSB object.
    :param int irsb_addr:           Address of the block.
    :param int stmt_idx:            Statement ID.
    :param int data_addr:           Address of the data in memory.
    :param int max_size:            Maximum size possible.
    :return:                        A 2-tuple of data type and size. ``(None, None)`` if the first five bytes do
                                    not form a sequence, otherwise ``("sequence", size)`` where size is capped at
                                    ``min(256, max_size)``.
    :rtype:                         tuple
    """
    if not self._is_sequence(cfg, data_addr, 5):
        # fail-fast
        return None, None
    sequence_max_size = min(256, max_size)
    # Five bytes are already confirmed above. Keep growing up to and including the cap so that the size
    # returned after the loop has actually been checked.
    for size in range(6, sequence_max_size + 1):
        if not self._is_sequence(cfg, data_addr, size):
            return "sequence", size - 1
    return "sequence", sequence_max_size
def _cgc_package_list_identifier(self, data_addr, data_size):
    """
    Identifies the CGC package list associated with the CGC binary.

    :param int data_addr:   Address of the data in memory.
    :param int data_size:   Maximum size possible.
    :return:                A 2-tuple of data type and size. ``(None, None)`` if the data is shorter than 100
                            bytes, cannot be loaded, contains non-printable characters, or does not start with
                            the package list banner.
    :rtype:                 tuple
    """
    if data_size < 100:
        return None, None
    data = self.fast_memory_load(data_addr, data_size, str)
    if data is None:
        # the loader has no memory backing this address
        return None, None
    # cheap prefix test before scanning the whole buffer
    if data[:10] != "The DECREE":
        return None, None
    printable = set(string.printable)
    if not all(ch in printable for ch in data):
        return None, None
    if not re.match(r"The DECREE packages used in the creation of this challenge binary were:", data):
        return None, None
    return "cgc-package-list", data_size
def _cgc_extended_application_handler(
    self, cfg, irsb, irsb_addr, stmt_idx, data_addr, max_size
):  # pylint:disable=unused-argument
    """
    Identifies the extended application (a PDF file) associated with the CGC binary.

    :param angr.analyses.CFG cfg:   The control flow graph.
    :param pyvex.IRSB irsb:         The IRSB object.
    :param int irsb_addr:           Address of the block.
    :param int stmt_idx:            Statement ID.
    :param int data_addr:           Address of the data in memory.
    :param int max_size:            Maximum size possible.
    :return:                        A 2-tuple of data type and size. ``(None, None)`` if the data is shorter
                                    than 100 bytes, cannot be loaded, or does not match the expected layout.
    :rtype:                         tuple
    """
    if max_size < 100:
        return None, None
    # peek at a short prefix first so that the full read only happens for plausible candidates
    prefix = self.fast_memory_load(data_addr, 20, bytes)
    if prefix is None or prefix[:4] != b"The ":
        return None, None
    # read everything in
    data = self.fast_memory_load(data_addr, max_size, str)
    if data is None:
        return None, None
    m = re.match(r"The ([\d]+) byte CGC Extended Application follows.", data)
    if not m:
        return None, None
    pdf_size = int(m.group(1))
    if "%PDF" not in data:
        return None, None
    if "%%EOF" not in data:
        return None, None
    # "%%EOF" is five characters long; the slice keeps one more character after it
    # NOTE(review): presumably the newline that terminates the PDF -- confirm
    pdf_data = data[data.index("%PDF") : data.index("%%EOF") + 6]
    if len(pdf_data) != pdf_size:
        # the size announced in the banner must agree with the embedded document
        return None, None
    return "cgc-extended-application", max_size
def _unknown_data_size_handler(
    self, cfg, irsb, irsb_addr, stmt_idx, data_addr, max_size
):  # pylint:disable=unused-argument
    """
    Return the maximum number of bytes until a potential pointer or a potential sequence is found.

    :param angr.analyses.CFG cfg:   The control flow graph.
    :param pyvex.IRSB irsb:         The IRSB object.
    :param int irsb_addr:           Address of the block.
    :param int stmt_idx:            Statement ID.
    :param int data_addr:           Address of the data in memory.
    :param int max_size:            Maximum size possible.
    :return:                        A 2-tuple of data type and size. ``("unknown", size)`` where size is the
                                    offset of the first potential pointer or, failing that, of the first
                                    potential sequence. ``(None, None)`` if neither is found.
    :rtype:                         tuple
    """
    sequence_offset = None
    for offset in range(1, max_size):
        if self._is_sequence(cfg, data_addr + offset, 5):
            # a potential sequence is found
            sequence_offset = offset
            break
    if sequence_offset is not None:
        # there is no point in searching for pointers beyond the start of the sequence
        if self.project.arch.bits == 32:
            max_size = min(max_size, sequence_offset)
        elif self.project.arch.bits == 64:
            max_size = min(max_size, sequence_offset + 5)  # high 5 bytes might be all zeros...
    ptr_size = cfg.project.arch.bytes
    endness = cfg.project.arch.memory_endness
    for offset in range(1, max_size - ptr_size + 1):
        ptr = self.fast_memory_load(data_addr + offset, ptr_size, int, endness=endness)
        if ptr is None:
            # nothing is mapped at this offset, so there is no value to classify
            continue
        if self._is_pointer(cfg, ptr):
            return "unknown", offset
    if sequence_offset is not None:
        return "unknown", sequence_offset
    return None, None
def _has_integer_used_as_pointers(self):
    """
    Test if there is any (suspicious) pointer decryption in the code.

    Each candidate is re-analysed in isolation: its referencing block is run through CFGEmulated, a DDG is built
    on top of that, and the data flowing out of the memory read is followed to see whether it reaches the jump
    target of the block.

    :return: True if there is any pointer decryption, False otherwise.
    :rtype: bool
    """
    # check all integer accesses and see if there is any integer being used as a pointer later, but it wasn't
    # classified as a pointer reference
    # we only care about unknown or integer memory data whose size equals self.project.arch.bytes and which is
    # directly referenced from an IRSB
    candidates = [
        i
        for i in self.cfg.memory_data.values()
        if i.sort in ("unknown", "integer") and i.size == self.project.arch.bytes and i.irsb_addr is not None
    ]
    if not candidates:
        return False
    for candidate in candidates:
        # if the candidate is in .bss, we don't care about it
        sec = self.cfg.project.loader.find_section_containing(candidate.address)
        # the lookup may find no section at all; such a candidate is still analysed
        if sec is not None and sec.name in (".bss", ".got.plt"):
            continue
        # execute the single basic block and see how the value is used
        base_graph = networkx.DiGraph()
        candidate_node: CFGNode = self.cfg.model.get_any_node(candidate.irsb_addr)
        if candidate_node is None:
            continue
        base_graph.add_node(candidate_node)
        # a throw-away knowledge base is handed to both analyses below
        tmp_kb = KnowledgeBase(self.project)
        cfg = self.project.analyses[CFGEmulated].prep(kb=tmp_kb)(
            starts=(candidate.irsb_addr,), keep_state=True, base_graph=base_graph
        )
        candidate_irsb = cfg.get_any_irsb(candidate.irsb_addr)
        ddg = self.project.analyses[DDG].prep(kb=tmp_kb)(cfg=cfg)
        mem_var_node = None
        for node in ddg.simplified_data_graph.nodes():
            if isinstance(node.variable, SimMemoryVariable) and node.location.ins_addr == candidate.insn_addr:
                # found it!
                mem_var_node = node
                break
        else:
            # mem_var_node is not found
            continue
        # get a sub graph
        subgraph = ddg.data_sub_graph(
            mem_var_node,
            simplified=False,
            killing_edges=False,
            excluding_types={"mem_addr"},
        )
        # is it used as a memory address anywhere?
        # TODO:
        # is it used as a jump target?
        next_tmp = None
        if isinstance(candidate_irsb.irsb.next, pyvex.IRExpr.RdTmp):
            next_tmp = candidate_irsb.irsb.next.tmp
        if next_tmp is not None:
            next_tmp_node = next(
                (
                    node
                    for node in subgraph.nodes()
                    if isinstance(node.variable, SimTemporaryVariable) and node.variable.tmp_id == next_tmp
                ),
                None,
            )
            if next_tmp_node is not None:
                # ouch it's used as a jump target
                return True
    return False
def fast_memory_load(self, addr, size, data_type, endness="Iend_LE"):
    """
    Read data straight from the loader's memory backend.

    :param int addr:    The address to begin memory loading.
    :param int size:    Size in bytes.
    :param data_type:   Requested result type: ``int`` unpacks a word, ``str`` maps every loaded byte to the
                        character with the same code point, and anything else returns the loaded data as is.
    :param str endness: Endianness of this memory load. Only used when ``data_type`` is ``int``.
    :return:            Data read out of the memory, or None if the backend raises KeyError.
    :rtype:             int or bytes or str or None
    """
    try:
        memory = self.project.loader.memory
        if data_type is int:
            return memory.unpack_word(addr, size=size, endness=endness)
        raw = memory.load(addr, size)
        return "".join(map(chr, raw)) if data_type is str else raw
    except KeyError:
        return None
# This import sits at the bottom of the module instead of with the other imports.
# NOTE(review): presumably to avoid a circular import with angr.analyses -- confirm.
from angr.analyses import AnalysesHub
# Register the Reassembler class with AnalysesHub under the name "Reassembler".
AnalysesHub.register_default("Reassembler", Reassembler)