import logging
import re
from typing import TYPE_CHECKING
from collections import defaultdict
from angr.knowledge_base import KnowledgeBase
from angr.codenode import HookNode
from angr.sim_variable import SimConstantVariable, SimRegisterVariable, SimMemoryVariable, SimStackVariable
from angr import SIM_PROCEDURES
from . import Analysis, CFGEmulated, DDG
if TYPE_CHECKING:
from angr.knowledge_plugins import Function
l = logging.getLogger(name=__name__)
[docs]class ConstantPropagation:
[docs] def __init__(self, constant, constant_assignment_loc, constant_consuming_loc):
self.constant = constant
self.constant_assignment_loc = constant_assignment_loc
self.constant_consuming_loc = constant_consuming_loc
def __repr__(self):
s = "<Constant {:#x} propagates from {:#x} to {:#x}>".format(
self.constant, self.constant_assignment_loc.ins_addr, self.constant_consuming_loc.ins_addr
)
return s
[docs]class RedundantStackVariable:
[docs] def __init__(self, argument, stack_variable, stack_variable_consuming_locs):
self.argument = argument
self.stack_variable = stack_variable
self.stack_variable_consuming_locs = stack_variable_consuming_locs
self.argument_register_as_retval = False
def __repr__(self):
s = "<StackVar %s for %s at %d locations%s>" % (
self.stack_variable,
self.argument,
len(self.stack_variable_consuming_locs),
" - retval" if self.argument_register_as_retval else "",
)
return s
[docs]class RegisterReallocation:
[docs] def __init__(
self,
stack_variable,
register_variable,
stack_variable_sources,
stack_variable_consumers,
prologue_addr,
prologue_size,
epilogue_addr,
epilogue_size,
):
"""
Constructor.
:param SimStackVariable stack_variable:
:param SimRegisterVariable register_variable:
:param list stack_variable_sources:
:param list stack_variable_consumers:
:param int prologue_addr:
:param int prologue_size:
:param int epilogue_addr:
:param int epilogue_size:
"""
self.stack_variable = stack_variable
self.register_variable = register_variable
self.stack_variable_sources = stack_variable_sources
self.stack_variable_consumers = stack_variable_consumers
self.prologue_addr = prologue_addr
self.prologue_size = prologue_size
self.epilogue_addr = epilogue_addr
self.epilogue_size = epilogue_size
def __repr__(self):
s = "<RegisterReallocation %s for %s with %d sources and %d consumers>" % (
self.register_variable,
self.stack_variable,
len(self.stack_variable_sources),
len(self.stack_variable_consumers),
)
return s
[docs]class DeadAssignment:
[docs] def __init__(self, pv):
"""
Constructor.
:param angr.analyses.ddg.ProgramVariable pv: The assignment to remove.
"""
self.pv = pv
def __repr__(self):
s = "<DeadAssignmentElimination %s>" % self.pv
return s
[docs]class BinaryOptimizer(Analysis):
"""
This is a collection of binary optimization techniques we used in Mechanical Phish during the finals of Cyber Grand
Challange. It focuses on dealing with some serious speed-impacting code constructs, and *sort of* worked on *some*
CGC binaries compiled with O0. Use this analysis as a reference of how to use data dependency graph and such.
There is no guarantee that BinaryOptimizer will ever work on non-CGC binaries. Feel free to give us PR or MR, but
please *do not* ask for support of non-CGC binaries.
"""
BLOCKS_THRESHOLD = 500 # do not optimize a function if it has more than this number of blocks
[docs] def __init__(self, cfg, techniques):
self.cfg = cfg
if techniques is None:
raise Exception("At least one optimization technique must be specified.")
supported_techniques = {
"constant_propagation",
"redundant_stack_variable_removal",
"register_reallocation",
"dead_assignment_elimination",
}
if techniques - supported_techniques:
raise Exception("At least one optimization technique specified is not supported.")
self._techniques = techniques.copy()
self.constant_propagations = []
self.redundant_stack_variables = []
self.register_reallocations = []
self.dead_assignments = []
self.optimize()
[docs] def optimize(self):
f: Function
for f in self.kb.functions.values():
# if there are unresolved targets in this function, we do not try to optimize it
unresolvable_targets = (
SIM_PROCEDURES["stubs"]["UnresolvableJumpTarget"],
SIM_PROCEDURES["stubs"]["UnresolvableCallTarget"],
)
if any([n.sim_procedure in unresolvable_targets for n in f.graph.nodes() if isinstance(n, HookNode)]):
continue
if len(f.block_addrs_set) > self.BLOCKS_THRESHOLD:
continue
self._optimize_function(f)
def _optimize_function(self, function):
"""
:param Function function:
:return:
"""
# if function.addr != 0x8048250:
# return
func_kb = KnowledgeBase(self.project)
# switch to non-optimized IR, since optimized IR will optimize away register reads/writes
# for example,
# .text:08048285 add eax, [ebp+var_8]
# .text:08048288 mov [ebp+var_C], eax
# becomes
# 06 | ------ IMark(0x8048285, 3, 0) ------
# 07 | t25 = Add32(t24,0xfffffff8)
# 08 | t5 = LDle:I32(t25)
# 09 | t4 = Add32(t2,t5)
# 10 | PUT(eip) = 0x08048288
# 11 | ------ IMark(0x8048288, 3, 0) ------
# 12 | t27 = Add32(t24,0xfffffff4)
# 13 | STle(t27) = t4
# 14 | PUT(eip) = 0x0804828b
# there is no write to or read from eax
cfg = self.project.analyses[CFGEmulated].prep(kb=func_kb)(
call_depth=1,
base_graph=function.graph,
keep_state=True,
starts=(function.addr,),
iropt_level=0,
)
ddg = self.project.analyses[DDG].prep(kb=func_kb)(cfg=cfg)
if "constant_propagation" in self._techniques:
self._constant_propagation(function, ddg.simplified_data_graph)
if "redundant_stack_variable_removal" in self._techniques:
self._redundant_stack_variable_removal(function, ddg.simplified_data_graph)
if "register_reallocation" in self._techniques:
self._register_reallocation(function, ddg.simplified_data_graph)
if "dead_assignment_elimination" in self._techniques:
self._dead_assignment_elimination(function, ddg.simplified_data_graph)
def _constant_propagation(self, function, data_graph): # pylint:disable=unused-argument
"""
:param function:
:param networkx.MultiDiGraph data_graph:
:return:
"""
# find all edge sequences that looks like const->reg->memory
for n0 in data_graph.nodes():
if not isinstance(n0.variable, SimConstantVariable):
continue
n1s = list(data_graph.successors(n0))
if len(n1s) != 1:
continue
n1 = n1s[0]
if not isinstance(n1.variable, SimRegisterVariable):
continue
if len(list(data_graph.predecessors(n1))) != 1:
continue
n2s = list(data_graph.successors(n1))
if len(n2s) != 1:
continue
n2 = n2s[0]
if not isinstance(n2.variable, SimMemoryVariable):
continue
n2_inedges = data_graph.in_edges(n2, data=True)
if len([0 for _, _, data in n2_inedges if "type" in data and data["type"] == "mem_data"]) != 1:
continue
cp = ConstantPropagation(n0.variable.value, n0.location, n2.location)
self.constant_propagations.append(cp)
# print n0, n1, n2
def _redundant_stack_variable_removal(self, function, data_graph):
"""
If an argument passed from the stack (i.e. dword ptr [ebp+4h]) is saved to a local variable on the stack at the
beginning of the function, and this local variable was never modified anywhere in this function, and no pointer
of any stack variable is saved in any register, then we can replace all references to this local variable to
that argument instead.
:param function:
:param networkx.MultiDiGraph data_graph:
:return:
"""
# check if there is any stack pointer being stored into any register other than esp
# basically check all consumers of stack pointers
stack_ptrs = []
sp_offset = self.project.arch.registers["esp"][0]
bp_offset = self.project.arch.registers["ebp"][0]
for n in data_graph.nodes():
if isinstance(n.variable, SimRegisterVariable) and n.variable.reg in (sp_offset, bp_offset):
stack_ptrs.append(n)
# for each stack pointer variable, make sure none of its consumers is a general purpose register
for stack_ptr in stack_ptrs:
out_edges = data_graph.out_edges(stack_ptr, data=True)
for _, dst, data in out_edges:
if "type" in data and data["type"] == "kill":
# we don't care about killing edges
continue
if (
isinstance(dst.variable, SimRegisterVariable)
and dst.variable.reg < 40
and dst.variable.reg not in (sp_offset, bp_offset)
):
# oops
l.debug(
"Function %s does not satisfy requirements of redundant stack variable removal.", repr(function)
)
return
argument_variables = []
for n in data_graph.nodes():
if isinstance(n.variable, SimStackVariable) and n.variable.base == "bp" and n.variable.offset >= 0:
argument_variables.append(n)
if not argument_variables:
return
# print function
# print argument_variables
argument_to_local = {}
argument_register_as_retval = set()
# for each argument, find its correspondence on the local stack frame
for argument_variable in argument_variables:
# is it copied to the stack?
successors0 = list(data_graph.successors(argument_variable))
if not successors0:
continue
if len(successors0) != 1:
continue
if isinstance(successors0[0].variable, SimRegisterVariable):
# argument -> register -> stack
out_edges = data_graph.out_edges(successors0[0], data=True)
successors1 = [s for _, s, data in out_edges if "type" not in data or data["type"] != "kill"]
if len(successors1) == 1:
successor1 = successors1[0]
if isinstance(successor1.variable, SimStackVariable):
if (successor1.variable.base == "sp" and successor1.variable.offset > 0) or (
successor1.variable.base == "bp" and successor1.variable.offset < 0
):
# yes it's copied onto the stack!
argument_to_local[argument_variable] = successor1
# if the register is eax, and it's not killed later, it might be the return value of this function
# in that case, we cannot eliminate the instruction that moves stack argument to that register
if successors0[0].variable.reg == self.project.arch.registers["eax"][0]:
killers = [s for _, s, data in out_edges if "type" in data and data["type"] == "kill"]
if not killers:
# it might be the return value
argument_register_as_retval.add(argument_variable)
else:
raise NotImplementedError() # TODO:
# import pprint
# pprint.pprint(argument_to_local, width=160)
# find local correspondence that are not modified throughout this function
redundant_stack_variables = []
for argument, local_var in argument_to_local.items():
# local_var cannot be killed anywhere
out_edges = data_graph.out_edges(local_var, data=True)
consuming_locs = []
for _, consumer, data in out_edges:
consuming_locs.append(consumer.location)
if "type" in data and data["type"] == "kill":
break
else:
# no killing edges. the value is not changed!
rsv = RedundantStackVariable(argument, local_var, consuming_locs)
if argument in argument_register_as_retval:
rsv.argument_register_as_retval = True
redundant_stack_variables.append(rsv)
self.redundant_stack_variables.extend(redundant_stack_variables)
def _register_reallocation(self, function, data_graph):
"""
Find unused registers throughout the function, and use those registers to replace stack variables.
Only functions that satisfy the following criteria can be optimized in this way:
- The function does not call any other function.
- The function does not use esp to index any stack variable.
- Prologue and epilogue of the function is identifiable.
- At least one register is not used in the entire function.
:param Function function:
:param networkx.MultiDiGraph data_graph:
:return: None
"""
# make sure this function does not call other functions
if function.callout_sites:
return
if len(function.endpoints) != 1:
return
# identify function prologue and epilogue
startpoint_block = self.project.factory.block(function.startpoint.addr).capstone
startpoint_insns = startpoint_block.insns
# supported function prologues:
#
# push ebp
# mov ebp, esp
# sub esp, [0-9a-f]+h
#
# push ebp
# mov ebp, esp
# push eax
if len(startpoint_insns) < 3:
return
insn0, insn1, insn2 = startpoint_insns[:3]
if not (insn0.mnemonic == "push" and insn0.op_str == "ebp"):
return
if not (insn1.mnemonic == "mov" and insn1.op_str == "ebp, esp"):
return
if not (insn2.mnemonic == "sub" and re.match(r"esp, [0-9a-fx]+", insn2.op_str)) and not (
insn2.mnemonic == "push" and insn2.op_str == "eax"
):
return
endpoint_block = self.project.factory.block(function.endpoints[0].addr).capstone
endpoint_insns = endpoint_block.insns
# supported function epilogues:
#
# add esp, [0-9a-f]+h
# pop ebp
# ret
if len(endpoint_insns) < 3:
return
insn3, insn4, insn5 = endpoint_insns[-3:]
if not (insn3.mnemonic == "add" and re.match(r"esp, [0-9a-fx]+", insn3.op_str)):
return
if not (insn4.mnemonic == "pop" and insn4.op_str == "ebp"):
return
if not insn5.mnemonic == "ret":
return
# make sure esp is not used anywhere else - all stack variables must be indexed using ebp
esp_offset = self.project.arch.registers["esp"][0]
ebp_offset = self.project.arch.registers["ebp"][0]
esp_variables = []
for n in data_graph.nodes():
if isinstance(n.variable, SimRegisterVariable) and n.variable.reg == esp_offset:
esp_variables.append(n)
# find out all call instructions
call_insns = set()
for src, dst, data in function.transition_graph.edges(data=True):
if "type" in data and data["type"] == "call":
src_block = function._get_block(src.addr)
call_insns.add(src_block.instruction_addrs[-1])
# there should be six esp variables + all call sites
# push ebp (insn0 - read, insn0 - write) ; sub esp, 0xXX (insn2) ;
# add esp, 0xXX (insn3) ; pop ebp (insn4) ; ret (insn5)
esp_insns = {n.location.ins_addr for n in esp_variables}
if esp_insns != {insn0.address, insn2.address, insn3.address, insn4.address, insn5.address} | call_insns:
return
prologue_addr = insn0.address
prologue_size = insn0.size + insn1.size + insn2.size
epilogue_addr = insn3.address
epilogue_size = insn3.size + insn4.size + insn5.size
# look at consumer of those esp variables. no other instruction should be consuming them
# esp_consumer_insns = { insn0.address, insn1.address, insn2.address, insn3.address, insn4.address,
# insn5.address} | esp_insns
# esp_variable: angr.analyses.ddg.ProgramVariable
# for esp_variable in esp_variables:
# consumers = data_graph.successors(esp_variable)
# if any([ consumer.location.ins_addr not in esp_consumer_insns for consumer in consumers ]):
# return
# make sure we never gets the address of those stack variables into any register
# say, lea edx, [ebp-0x4] is forbidden
# check all edges in data graph
for src, dst, data in data_graph.edges(data=True):
if (
isinstance(dst.variable, SimRegisterVariable)
and dst.variable.reg != ebp_offset
and dst.variable.reg < 40
):
# to a register other than ebp
if isinstance(src.variable, SimRegisterVariable) and src.variable.reg == ebp_offset:
# from ebp
l.debug(
"Found a lea operation from ebp at %#x. Function %s cannot be optimized.",
dst.location.ins_addr,
repr(function),
)
return
# we definitely don't want to mess with fp or sse operations
for node in data_graph.nodes():
if (
isinstance(node.variable, SimRegisterVariable) and 72 <= node.variable.reg < 288
): # offset(mm0) <= node.variable.reg < offset(cs)
l.debug(
"Found a float-point/SSE register access at %#x. Function %s cannot be optimized.",
node.location.ins_addr,
repr(function),
)
return
l.debug("RegisterReallocation: function %s satisfies the criteria.", repr(function))
# nice. let's see if we can optimize this function
# do we have free registers?
used_general_registers = set()
for n in data_graph.nodes():
if isinstance(n.variable, SimRegisterVariable):
if n.variable.reg < 40: # this is a hardcoded limit - we only care about general registers
used_general_registers.add(n.variable.reg)
registers = self.project.arch.registers
all_general_registers = { # registers['eax'][0], registers['ecx'][0], registers['edx'][0],
registers["ebx"][0],
registers["edi"][0],
registers["esi"][0],
registers["esp"][0],
registers["ebp"][0],
}
unused_general_registers = all_general_registers - used_general_registers
if not unused_general_registers:
l.debug("RegisterReallocation: function %s does not have any free register.", repr(function))
return
l.debug(
"RegisterReallocation: function %s has %d free register(s): %s",
repr(function),
len(unused_general_registers),
", ".join([self.project.arch.register_names[u] for u in unused_general_registers]),
)
# find local stack variables of size 4
stack_variables = set()
for n in data_graph.nodes():
if (
isinstance(n.variable, SimStackVariable)
and n.variable.base == "bp"
and n.variable.size == 4
and n.variable.offset < 0
):
stack_variables.add(n)
# alright, now we need to make sure that stack variables are never accessed by indexes
# in other words, they must be accessed directly in forms of 'dword ptr [ebp+x]'
# it's easy to do this: we get mem_addr predecessors of each stack variable, and make sure there are only two of
# them: one is ebp, the other one is a constant
#
# ah, also, since we do not want to mess with crazy fp registers, we further require none of the stack variable
# sources and consumers is a FP register.
filtered_stack_variables = set()
for stack_variable in stack_variables:
failed = False
# check how they are accessed
in_edges = data_graph.in_edges(stack_variable, data=True)
for src, _, data in in_edges:
if "type" in data and data["type"] == "mem_addr":
if isinstance(src.variable, SimRegisterVariable) and src.variable.reg == ebp_offset:
# ebp
pass
elif isinstance(src.variable, SimConstantVariable):
# the constant
pass
else:
# ouch
failed = True
break
if isinstance(src.variable, SimRegisterVariable) and src.variable.reg >= 72:
# it comes from a FP register
failed = True
break
if failed:
continue
# check consumers
out_edges = data_graph.out_edges(stack_variable, data=True)
for _, dst, data in out_edges:
if "type" in data and data["type"] == "kill":
continue
if isinstance(dst.variable, SimRegisterVariable) and dst.variable.reg >= 72:
# an FP register is the consumer
failed = True
break
if failed:
continue
filtered_stack_variables.add(stack_variable)
# order the stack variables by the sum of their in and out degrees.
stack_variable_to_degree = defaultdict(int)
stack_variable_sources = defaultdict(list)
for sv in filtered_stack_variables:
stack_variable_to_degree[sv.variable] += data_graph.in_degree(sv)
stack_variable_to_degree[sv.variable] += data_graph.out_degree(sv)
stack_variable_sources[sv.variable].append(sv)
sorted_stack_variables = sorted(
stack_variable_to_degree.keys(), key=lambda sv: stack_variable_to_degree[sv], reverse=True
)
# aha these are the ones that we can replace!
for reg, sv in zip(unused_general_registers, sorted_stack_variables):
non_initial_sources = [src for src in stack_variable_sources[sv] if not src.initial]
if not non_initial_sources:
# we failed to find any source for it, which indicates a failure in our dependence analysis
# skip
continue
# get consumers
consumers = set()
for src in stack_variable_sources[sv]:
out_edges = data_graph.out_edges(src, data=True)
for _, dst, data in out_edges:
if "type" not in data or data["type"] != "kill":
consumers.add(dst)
rr = RegisterReallocation(
sv,
SimRegisterVariable(reg, 4),
non_initial_sources,
list(consumers),
prologue_addr,
prologue_size,
epilogue_addr,
epilogue_size,
)
self.register_reallocations.append(rr)
l.debug(
"RegisterReallocation: %s will replace %s in function %s.",
rr.register_variable,
rr.stack_variable,
repr(function),
)
def _dead_assignment_elimination(self, function, data_graph): # pylint:disable=unused-argument
"""
Remove assignments to registers that has no consumers, but immediately killed.
BROKEN - DO NOT USE IT
:param Function function:
:param networkx.MultiDiGraph data_graph:
:return: None
"""
register_pvs = set()
for node in data_graph.nodes():
if (
isinstance(node.variable, SimRegisterVariable)
and node.variable.reg is not None
and node.variable.reg < 40
):
register_pvs.add(node)
for reg in register_pvs:
# does it have a consumer?
out_edges = data_graph.out_edges(reg, data=True)
consumers = []
killers = []
for _, _, data in out_edges:
if "type" in data and data["type"] == "kill":
killers.append(data)
else:
consumers.append(data)
if not consumers and killers:
# we can remove the assignment!
da = DeadAssignment(reg)
self.dead_assignments.append(da)
from angr.analyses import AnalysesHub
AnalysesHub.register_default("BinaryOptimizer", BinaryOptimizer)