# pylint:disable=too-many-boolean-expressions,global-statement
from typing import Dict, Optional, Tuple, TYPE_CHECKING
import logging
import pyvex
import archinfo
from .... import options, BP_BEFORE
from ....blade import Blade
from ....annocfg import AnnotatedCFG
from ....exploration_techniques import Slicecutor
from ....utils.constants import DEFAULT_STATEMENT
from .resolver import IndirectJumpResolver
if TYPE_CHECKING:
from angr.block import Block
# Module-level logger used by the resolver in this file.
l = logging.getLogger(__name__)

# Profiling switch; toggled through enable_profiling() / disable_profiling().
PROFILING = False
# Counters that MipsElfFastResolver._resolve bumps while PROFILING is on:
# one per syntactic fast path that produced a target, plus one for every
# attempt that had to fall through to the slice-execution path.
HITS_CASE_0 = 0
HITS_CASE_1 = 0
MISSES = 0
def enable_profiling():
    """
    Turn on hit/miss profiling for the resolver and reset every counter to zero.

    Rebinds the module-level names ``PROFILING``, ``HITS_CASE_0``, ``HITS_CASE_1`` and ``MISSES``.
    """
    global PROFILING, HITS_CASE_0, HITS_CASE_1, MISSES
    PROFILING = True
    HITS_CASE_0 = 0
    HITS_CASE_1 = 0
    MISSES = 0
def disable_profiling():
    """
    Turn off hit/miss profiling for the resolver.

    Only the module-level ``PROFILING`` flag is rebound; the counters are left as they are.
    """
    global PROFILING
    PROFILING = False
class OverwriteTmpValueCallback:
    """
    Overwrites temporary values during resolution.

    An instance remembers one concrete value; its :meth:`overwrite_tmp_value` method is passed as the ``action`` of
    the ``tmp_write`` breakpoint registered by ``MipsElfFastResolver._set_gp_load_callback``.
    """

    def __init__(self, gp_value):
        """
        :param gp_value: The value that the temporary being written should be forced to.
        """
        self.gp_value = gp_value

    def overwrite_tmp_value(self, state):
        """
        Replace the expression about to be written with a constant holding ``gp_value``.

        :param state: The state the breakpoint fired on. Its ``inspect.tmp_write_expr`` is replaced by
                      ``state.solver.BVV(gp_value, state.arch.bits)``.
        """
        state.inspect.tmp_write_expr = state.solver.BVV(self.gp_value, state.arch.bits)
class MipsElfFastResolver(IndirectJumpResolver):
    """
    A timeless indirect jump resolver for R9-based indirect function calls in MIPS ELFs.

    Resolution is attempted in two stages. When the value of ``gp`` is known for the enclosing function, two purely
    syntactic patterns over the backward slice are tried first (see :meth:`_try_handle_simple_case_0` and
    :meth:`_try_handle_simple_case_1`). If neither matches, the backward slice is executed with a ``Slicecutor`` and
    the address of the state that is cut right after the jump site is used as the target.
    """

    def __init__(self, project):
        """
        :param project: The angr project this resolver works on.
        """
        super().__init__(project, timeless=True)

    def filter(self, cfg, addr, func_addr, block, jumpkind):
        """
        Decide whether this resolver applies: only when the project architecture is MIPS32 or MIPS64.

        :return: True if the architecture is supported, False otherwise.
        :rtype:  bool
        """
        return isinstance(self.project.arch, (archinfo.ArchMIPS32, archinfo.ArchMIPS64))

    def resolve(  # pylint:disable=unused-argument
        self, cfg, addr, func_addr, block, jumpkind, func_graph_complete: bool = True, **kwargs
    ):
        """
        Wrapper for _resolve that slowly increments the max_level used by Blade for finding sources. Levels 2 and 3
        are tried in that order and the first successful resolution is returned.

        :param cfg:             A CFG instance.
        :param int addr:        IRSB address.
        :param int func_addr:   The function address.
        :param pyvex.IRSB block: The IRSB.
        :param str jumpkind:    The jumpkind.
        :param bool func_graph_complete: Unused by this resolver.
        :return:                If it was resolved and targets alongside it
        :rtype:                 tuple
        """
        for max_level in range(2, 4):
            resolved, resolved_targets = self._resolve(cfg, addr, func_addr, block, jumpkind, max_level=max_level)
            if resolved:
                return resolved, resolved_targets
        return False, []

    def _resolve(self, cfg, addr, func_addr, block, jumpkind, max_level):  # pylint:disable=unused-argument
        """
        Resolves the indirect jump in MIPS ELF binaries where all external function calls are indexed using gp.

        :param cfg:             A CFG instance.
        :param int addr:        IRSB address.
        :param int func_addr:   The function address.
        :param pyvex.IRSB block: The IRSB.
        :param str jumpkind:    The jumpkind.
        :param int max_level:   maximum level for Blade to resolve when looking for sources
        :return:                If it was resolved and targets alongside it
        :rtype:                 tuple
        """
        global HITS_CASE_0, HITS_CASE_1, MISSES

        project = self.project

        b = Blade(
            cfg.graph,
            addr,
            -1,
            cfg=cfg,
            project=project,
            ignore_sp=True,
            ignore_bp=True,
            ignored_regs=("gp",),
            cross_insn_opt=False,
            stop_at_calls=True,
            max_level=max_level,
            include_imarks=False,
        )

        func = cfg.kb.functions.function(addr=func_addr)
        # The lookup may yield None when no function is registered at func_addr; treat that as "gp unknown"
        # instead of failing on the attribute access.
        gp_value = func.info.get("gp", None) if func is not None else None

        # see if gp is used on this slice at all
        gp_used = self._is_gp_used_on_slice(project, b)

        if gp_used and gp_value is None:
            # this might be a special case: gp is only used once in this function, and it can be initialized right
            # before its use site.
            # however, it should have been determined in CFGFast
            # cannot determine the value of gp. quit
            l.warning("Failed to determine value of register gp for function %#x.", func_addr)
            return False, []

        if gp_value is not None:
            # cheap syntactic fast paths; no execution needed when one of them matches
            target = self._try_handle_simple_case_0(gp_value, b)
            if target is not None:
                if PROFILING:
                    HITS_CASE_0 += 1
                return True, [target]

            target = self._try_handle_simple_case_1(gp_value, b)
            if target is not None:
                if PROFILING:
                    HITS_CASE_1 += 1
                return True, [target]

        if PROFILING:
            MISSES += 1

        # fall back to executing the slice, starting from a node that has no predecessor on the slice
        sources = [n for n in b.slice.nodes() if b.slice.in_degree(n) == 0]
        if not sources:
            return False, []
        source_addr = sources[0][0]

        annotated_cfg = AnnotatedCFG(project, None, detect_loops=False)
        annotated_cfg.from_digraph(b.slice)

        state = project.factory.blank_state(
            addr=source_addr,
            mode="fastpath",
            remove_options=options.refs,
            # suppress unconstrained stack reads for `gp`
            add_options={
                options.SYMBOL_FILL_UNCONSTRAINED_REGISTERS,
                options.SYMBOL_FILL_UNCONSTRAINED_MEMORY,
                options.NO_CROSS_INSN_OPT,
            },
        )
        state.regs._t9 = func_addr
        if gp_used:
            # Special handling for cases where `gp` is stored on the stack
            gp_offset = project.arch.registers["gp"][0]
            self._set_gp_load_callback(state, b, project, gp_offset, gp_value)
            state.regs._gp = gp_value

        simgr = self.project.factory.simulation_manager(state)
        simgr.use_technique(Slicecutor(annotated_cfg, force_sat=True))
        simgr.run()

        if not simgr.cut:
            l.info("Indirect jump at %#x cannot be resolved by %s.", addr, repr(self))
            return False, []

        # pick the successor that is cut right after executing `addr`
        target_state = next((cut for cut in simgr.cut if cut.history.addr == addr), None)
        if target_state is None:
            l.info("Indirect jump at %#x cannot be resolved by %s.", addr, repr(self))
            return False, []

        target = target_state.addr
        if self._is_target_valid(cfg, target) and target != func_addr:
            l.debug("Indirect jump at %#x is resolved to target %#x.", addr, target)
            return True, [target]

        l.info("Indirect jump at %#x is resolved to target %#x, which seems to be invalid.", addr, target)
        return False, []

    def _try_handle_simple_case_0(self, gp: int, blade: Blade) -> Optional[int]:
        """
        Resolve the jump without execution when the slice has exactly this shape (all within one block)::

            + A | t37 = GET:I32(gp)
            + B | t36 = Add32(t37,0xffff8624)
            + C | t38 = LDbe:I32(t36)
            + D | PUT(t9) = t38
            + E | t8 = GET:I32(t9)
            Next: t8

        :param gp:      The known value of the gp register.
        :param blade:   The backward slice of the jump.
        :return:        The 4-byte word read from ``(gp + const) & 0xFFFFFFFF``, or None if the pattern does not
                        match or that address is not backed by the loader's memory.
        """
        end = self._slice_end(blade)
        if end is None:
            return None
        end_node, end_block = end

        # step backward
        # E
        node = self._match_t9_read(blade, end_block, end_node, end_block.next.tmp)
        if node is None:
            return None

        # D, C, B, A
        add_const = self._match_gp_table_load(blade, end_block, node)
        if add_const is None:
            return None

        # matching complete
        return self._read_gp_relative_word(gp, add_const)

    def _try_handle_simple_case_1(self, gp: int, blade: Blade) -> Optional[int]:
        """
        Resolve the jump without execution when the slice has exactly this shape (all within one block)::

            + A | t22 = GET:I32(gp)
            + B | t21 = Add32(t22,0xffff8020)
            + C | t23 = LDbe:I32(t21)
            + D | PUT(t9) = t23
            + E | t27 = GET:I32(t9)
            + F | t26 = Add32(t27,0x00007cec)
            + G | PUT(t9) = t26
            + H | t4 = GET:I32(t9)
            + Next: t4

        :param gp:      The known value of the gp register.
        :param blade:   The backward slice of the jump.
        :return:        The word read from ``(gp + const_B) & 0xFFFFFFFF`` plus ``const_F``, truncated to 32 bits,
                        or None if the pattern does not match or the address is not backed by the loader's memory.
        """
        end = self._slice_end(blade)
        if end is None:
            return None
        end_node, end_block = end

        # step backward
        # H
        node = self._match_t9_read(blade, end_block, end_node, end_block.next.tmp)
        if node is None:
            return None

        # G
        put = self._match_t9_write(blade, end_block, node)
        if put is None:
            return None
        node, t9_tmp = put

        # F
        add = self._match_add32_const(blade, end_block, node, t9_tmp)
        if add is None:
            return None
        node, t9_src_tmp, t9_add_const = add

        # E
        node = self._match_t9_read(blade, end_block, node, t9_src_tmp)
        if node is None:
            return None

        # D, C, B, A
        add_const = self._match_gp_table_load(blade, end_block, node)
        if add_const is None:
            return None

        # matching complete
        loaded = self._read_gp_relative_word(gp, add_const)
        if loaded is None:
            return None
        return (loaded + t9_add_const) & 0xFFFF_FFFF

    def _slice_end(self, blade: Blade):
        """
        Find the single sink of the slice and lift its block.

        :return: ``(end_node, vex_block)`` when the slice has exactly one node without outgoing edges, that node is
                 the block's default exit, and the block's ``next`` expression is a temporary; None otherwise.
        """
        sinks = [node for node in blade.slice.nodes() if blade.slice.out_degree(node) == 0]
        if len(sinks) != 1:
            return None
        end_node = sinks[0]
        if end_node[-1] != DEFAULT_STATEMENT:
            return None
        end_block = self.project.factory.block(end_node[0], cross_insn_opt=blade._cross_insn_opt).vex
        if not isinstance(end_block.next, pyvex.IRExpr.RdTmp):
            return None
        return end_node, end_block

    def _stmt_before(self, blade: Blade, vex_block, node: Tuple[int, int]):
        """
        Step one node backward on the slice.

        :return: ``(previous_node, statement)`` or None when :meth:`_previous_node` rejects the step.
        """
        previous_node = self._previous_node(blade, node)
        if previous_node is None:
            return None
        return previous_node, vex_block.statements[previous_node[1]]

    def _match_t9_read(self, blade: Blade, vex_block, node: Tuple[int, int], tmp: int) -> Optional[Tuple[int, int]]:
        """
        Match ``t<tmp> = GET(t9)`` as the slice predecessor of ``node``.

        :return: The matched predecessor node, or None.
        """
        step = self._stmt_before(blade, vex_block, node)
        if step is None:
            return None
        previous_node, stmt = step
        if not isinstance(stmt, pyvex.IRStmt.WrTmp) or not isinstance(stmt.data, pyvex.IRExpr.Get):
            return None
        if stmt.tmp != tmp:
            return None
        if stmt.data.offset != self.project.arch.registers["t9"][0]:
            return None
        return previous_node

    def _match_t9_write(self, blade: Blade, vex_block, node: Tuple[int, int]):
        """
        Match ``PUT(t9) = tX`` as the slice predecessor of ``node``.

        :return: ``(matched_node, X)`` or None.
        """
        step = self._stmt_before(blade, vex_block, node)
        if step is None:
            return None
        previous_node, stmt = step
        if not isinstance(stmt, pyvex.IRStmt.Put) or not isinstance(stmt.data, pyvex.IRExpr.RdTmp):
            return None
        if stmt.offset != self.project.arch.registers["t9"][0]:
            return None
        return previous_node, stmt.data.tmp

    def _match_add32_const(self, blade: Blade, vex_block, node: Tuple[int, int], tmp: int):
        """
        Match ``t<tmp> = Add32(tX, const)`` as the slice predecessor of ``node``.

        :return: ``(matched_node, X, const)`` or None.
        """
        step = self._stmt_before(blade, vex_block, node)
        if step is None:
            return None
        previous_node, stmt = step
        if (
            not isinstance(stmt, pyvex.IRStmt.WrTmp)
            or stmt.tmp != tmp
            or not isinstance(stmt.data, pyvex.IRExpr.Binop)
            or stmt.data.op != "Iop_Add32"
            or not isinstance(stmt.data.args[0], pyvex.IRExpr.RdTmp)
            or not isinstance(stmt.data.args[1], pyvex.IRExpr.Const)
        ):
            return None
        return previous_node, stmt.data.args[0].tmp, stmt.data.args[1].con.value

    def _match_gp_table_load(self, blade: Blade, vex_block, node: Tuple[int, int]) -> Optional[int]:
        """
        Walking backward from ``node``, match the four statements shared by both simple cases::

            A | tA = GET(gp)
            B | tB = Add32(tA, const)
            C | tC = LD(tB)
            D | PUT(t9) = tC

        :return: ``const`` from statement B, or None if any statement does not match.
        """
        # D
        put = self._match_t9_write(blade, vex_block, node)
        if put is None:
            return None
        node, data_tmp = put

        # C
        step = self._stmt_before(blade, vex_block, node)
        if step is None:
            return None
        node, stmt = step
        if (
            not isinstance(stmt, pyvex.IRStmt.WrTmp)
            or not isinstance(stmt.data, pyvex.IRExpr.Load)
            or not isinstance(stmt.data.addr, pyvex.IRExpr.RdTmp)
        ):
            return None
        if stmt.tmp != data_tmp:
            return None
        addr_tmp = stmt.data.addr.tmp

        # B
        add = self._match_add32_const(blade, vex_block, node, addr_tmp)
        if add is None:
            return None
        node, base_tmp, add_const = add

        # A
        step = self._stmt_before(blade, vex_block, node)
        if step is None:
            return None
        _, stmt = step
        if (
            not isinstance(stmt, pyvex.IRStmt.WrTmp)
            or stmt.tmp != base_tmp
            or not isinstance(stmt.data, pyvex.IRExpr.Get)
        ):
            return None
        if stmt.data.offset != self.project.arch.registers["gp"][0]:
            return None

        return add_const

    def _read_gp_relative_word(self, gp: int, add_const: int) -> Optional[int]:
        """
        Read the 4-byte word stored at ``(gp + add_const) & 0xFFFFFFFF`` from the loader's memory.

        :return: The word, or None when the lookup raises KeyError.
        """
        addr = (gp + add_const) & 0xFFFF_FFFF
        try:
            return self.project.loader.memory.unpack_word(addr, size=4)
        except KeyError:
            return None

    @staticmethod
    def _previous_node(blade: Blade, curr_node: Tuple[int, int]) -> Optional[Tuple[int, int]]:
        """
        Return the only predecessor of ``curr_node`` on the slice.

        :return: The predecessor, or None when ``curr_node`` does not have exactly one predecessor or when the
                 predecessor belongs to a different block (its first element differs).
        """
        if blade.slice.in_degree(curr_node) != 1:
            return None
        nn = next(iter(blade.slice.predecessors(curr_node)))
        if nn[0] != curr_node[0]:
            return None
        return nn

    @staticmethod
    def _set_gp_load_callback(state, blade, project, gp_offset, gp_value):
        """
        Register ``tmp_write`` breakpoints on ``state`` so that a gp value reloaded from memory is forced to
        ``gp_value``.

        Every block on the slice is scanned in statement order. A temporary defined by a memory load is remembered;
        when such a temporary is later PUT into the gp register, a breakpoint is installed that fires before that
        temporary is written in that block and replaces the written expression with ``gp_value``. Scanning of a
        block stops after its first such breakpoint.

        :param state:       The state to install breakpoints on.
        :param blade:       The backward slice whose blocks are scanned.
        :param project:     The angr project, used for lifting blocks.
        :param gp_offset:   Register-file offset of gp.
        :param gp_value:    The value gp must hold.
        """
        # NOTE(review): this record is shared across blocks and never reset, although temporaries are scanned
        # per block -- kept as in the original; confirm whether a per-block reset was intended.
        load_defined_tmps = set()

        for block_addr_in_slice in {slice_node[0] for slice_node in blade.slice.nodes()}:
            for stmt in project.factory.block(block_addr_in_slice, cross_insn_opt=False).vex.statements:
                if isinstance(stmt, pyvex.IRStmt.WrTmp) and isinstance(stmt.data, pyvex.IRExpr.Load):
                    # Load from memory to a tmp - assuming it's loading from the stack
                    load_defined_tmps.add(stmt.tmp)
                    continue
                if not (isinstance(stmt, pyvex.IRStmt.Put) and stmt.offset == gp_offset):
                    continue
                if not isinstance(stmt.data, pyvex.IRExpr.RdTmp):
                    continue
                tmp_offset = stmt.data.tmp  # pylint:disable=cell-var-from-loop
                if tmp_offset not in load_defined_tmps:
                    continue
                # found the load from stack
                # we must make sure value of that temporary variable equals to the correct gp value
                state.inspect.make_breakpoint(
                    "tmp_write",
                    when=BP_BEFORE,
                    # defaults bind the current loop values; a plain closure would see the last ones
                    condition=(
                        lambda s, bbl_addr_=block_addr_in_slice, tmp_offset_=tmp_offset: s.scratch.bbl_addr
                        == bbl_addr_
                        and s.inspect.tmp_write_num == tmp_offset_
                    ),
                    action=OverwriteTmpValueCallback(gp_value).overwrite_tmp_value,
                )
                break

    @staticmethod
    def _is_gp_used_on_slice(project, b: Blade) -> bool:
        """
        Tell whether any node of the slice reads the gp register.

        A default-exit node counts when the block's ``next`` expression is a GET of gp; any other node counts when
        its statement is ``tX = GET(gp)``.

        :param project: The angr project, used for lifting blocks.
        :param b:       The backward slice to inspect.
        :return:        True if gp is read on the slice, False otherwise.
        """
        gp_offset = project.arch.registers["gp"][0]
        # lift each block once, however many of its statements are on the slice
        blocks_on_slice: Dict[int, "Block"] = {}

        for block_addr, block_stmt_idx in b.slice.nodes():
            if block_addr not in blocks_on_slice:
                blocks_on_slice[block_addr] = project.factory.block(block_addr, cross_insn_opt=False)
            block = blocks_on_slice[block_addr]

            if block_stmt_idx == DEFAULT_STATEMENT:
                next_expr = block.vex.next
                if isinstance(next_expr, pyvex.IRExpr.Get) and next_expr.offset == gp_offset:
                    return True
                continue

            stmt = block.vex.statements[block_stmt_idx]
            if (
                isinstance(stmt, pyvex.IRStmt.WrTmp)
                and isinstance(stmt.data, pyvex.IRExpr.Get)
                and stmt.data.offset == gp_offset
            ):
                return True

        return False