import logging
import networkx
from . import Analysis
# Module-level logger, named after this module; used for the warnings/infos emitted during loop recovery.
l = logging.getLogger(name=__name__)
class Loop:
    """
    A single loop recovered from a control-flow graph.

    :ivar entry:          The node through which the loop is entered.
    :ivar entry_edges:    Edges, as ``(source, destination)`` pairs, leading from outside the loop into it.
    :ivar break_edges:    Edges, as ``(source, destination)`` pairs, leading from a loop node out of the loop.
    :ivar continue_edges: Edges, as ``(source, destination)`` pairs, leading from a loop node back to ``entry``.
    :ivar body_nodes:     The nodes that make up the loop body.
    :ivar graph:          The graph describing the loop; must provide ``edges(data=True)``.
    :ivar subloops:       The ``Loop`` objects nested directly inside this loop.
    :ivar has_calls:      ``True`` if this loop, or any of its subloops, contains a function call.
    """

    def __init__(self, entry, entry_edges, break_edges, continue_edges, body_nodes, graph, subloops):
        self.entry = entry
        self.entry_edges = entry_edges
        self.break_edges = break_edges
        self.continue_edges = continue_edges
        self.body_nodes = body_nodes
        self.graph = graph
        self.subloops = subloops

        # A loop contains a call if one of its subloops does, or if its own graph holds an edge
        # whose data marks it as a "fake_return" edge (this is a function call).
        # The graph is only scanned when no subloop already settled the question.
        self.has_calls = any(loop.has_calls for loop in subloops) or any(
            data.get("type") == "fake_return" for _, _, data in self.graph.edges(data=True)
        )

    def __repr__(self):
        return "<Loop @ %s, %d blocks>" % (self.entry.addr, len(self.body_nodes))
class LoopFinder(Analysis):
    """
    Extracts all the loops from all the functions in a binary.

    After construction the results are available in two attributes:

    - ``loops``: a flat list of every :class:`Loop` that was found, nested loops included.
    - ``loops_hierarchy``: a dict mapping a function address to the list of that function's top-level loops.
      Nested loops are reachable through ``Loop.subloops``.
    """

    def __init__(self, functions=None, normalize=True):
        """
        :param functions: An iterable of functions to analyze. If None, every function in the knowledge base
                          (``self.kb.functions``) is analyzed.
        :param normalize: Whether to call ``normalize()`` on each function before its graph is analyzed.
        """
        if functions is None:
            functions = self.kb.functions.values()

        found_any = False
        self.loops = []
        self.loops_hierarchy = {}
        for function in functions:
            if self.project.is_hooked(function.addr) or self.project.simos.is_syscall_addr(function.addr):
                # skip SimProcedures and syscalls
                continue
            found_any = True
            with self._resilience():
                if normalize:
                    function.normalize()
                tops, alls = self._parse_loops_from_graph(function.graph)
                self.loops += alls
                self.loops_hierarchy[function.addr] = tops

        if not found_any:
            l.error("No knowledge of functions is present. Did you forget to construct a CFG?")

    def _parse_loop_graph(self, subg: networkx.DiGraph, bigg: networkx.DiGraph):
        """
        Create a Loop object for a strongly connected graph, and any strongly
        connected subgraphs, if possible.

        Note that ``subg`` is modified in place: the edges entering and leaving the loop are added to it, and
        edges around nested loops are rewired to point at the nested Loop objects.

        :param subg: A strongly connected subgraph.
        :param bigg: The graph which subg is a subgraph of.
        :return:     A tuple ``(loop, all_loops)``. ``loop`` is the Loop object for ``subg`` and ``all_loops`` is a
                     list holding ``loop`` followed by every loop nested inside it, at any depth. If the loop has
                     more than one entry point, ``(None, [])`` is returned instead.
        """
        loop_body_nodes = list(subg.nodes())
        # The list is what gets stored on the Loop object; the set only makes the membership tests below cheap.
        body_node_set = set(loop_body_nodes)

        entry_edges = []
        break_edges = []
        continue_edges = []
        entry_node = None
        for node in loop_body_nodes:
            for pred_node in bigg.predecessors(node):
                if pred_node not in body_node_set:
                    if entry_node is not None and entry_node != node:
                        l.warning("Bad loop: more than one entry point (%s, %s)", entry_node, node)
                        return None, []
                    entry_node = node
                    entry_edges.append((pred_node, node))
                    subg.add_edge(pred_node, node)
            for succ_node in bigg.successors(node):
                if succ_node not in body_node_set:
                    break_edges.append((node, succ_node))
                    subg.add_edge(node, succ_node)

        if entry_node is None:
            entry_node = min(loop_body_nodes, key=lambda n: n.addr)
            l.info("Couldn't find entry point, assuming it's the first by address (%s)", entry_node)

        # Cut every back edge to the entry node. Whatever cycles survive in the copy are nested loops.
        acyclic_subg = subg.copy()
        for pred_node in subg.predecessors(entry_node):
            if pred_node in body_node_set:
                continue_edge = (pred_node, entry_node)
                acyclic_subg.remove_edge(*continue_edge)
                continue_edges.append(continue_edge)

        # Both dicts map an edge that has been cut out of subg to the subloop it was cut for.
        removed_exits = {}
        removed_entries = {}

        tops, alls = self._parse_loops_from_graph(acyclic_subg)
        for subloop in tops:
            if subloop.entry not in body_node_set:
                continue

            # break existing entry edges, exit edges
            # re-link in loop object
            # the exception logic is to handle when you have two loops adjacent to each other
            # you gotta link the two loops together and remove the dangling edge
            for entry_edge in subloop.entry_edges:
                try:
                    subg.remove_edge(*entry_edge)
                except networkx.NetworkXError:
                    # The edge is already gone. That is only legitimate if an adjacent subloop cut it earlier
                    # as one of *its exit* edges, so the bookkeeping to consult is removed_exits.
                    if entry_edge not in removed_exits:
                        raise
                    neighbour = removed_exits[entry_edge]
                    subg.add_edge(neighbour, subloop)
                    try:
                        # drop the dangling "neighbour -> raw entry node" edge added for the neighbour
                        subg.remove_edge(neighbour, entry_edge[1])
                    except networkx.NetworkXError:
                        pass
                else:
                    subg.add_edge(entry_edge[0], subloop)
                    removed_entries[entry_edge] = subloop

            for exit_edge in subloop.break_edges:
                try:
                    subg.remove_edge(*exit_edge)
                except networkx.NetworkXError:
                    # Mirror image of the case above: an adjacent subloop cut this edge earlier as one of
                    # *its entry* edges.
                    if exit_edge not in removed_entries:
                        raise
                    neighbour = removed_entries[exit_edge]
                    subg.add_edge(subloop, neighbour)
                    try:
                        # drop the dangling "raw exit node -> neighbour" edge added for the neighbour
                        subg.remove_edge(exit_edge[0], neighbour)
                    except networkx.NetworkXError:
                        pass
                else:
                    subg.add_edge(subloop, exit_edge[1])
                    removed_exits[exit_edge] = subloop

        # Cutting the edges around the subloops may leave parts of subg disconnected from the rest.
        # Only the component that holds the entry node is kept as this loop's graph.
        entry_component = next(
            nodes for nodes in networkx.weakly_connected_components(subg) if entry_node in nodes
        )
        subg = networkx.induced_subgraph(subg, entry_component).copy()

        me = Loop(entry_node, entry_edges, break_edges, continue_edges, loop_body_nodes, subg, list(tops))
        return me, [me] + alls

    def _parse_loops_from_graph(self, graph: networkx.DiGraph):
        """
        Return all Loop instances that can be extracted from a graph.

        :param graph: The graph to analyze.
        :return:      A tuple ``(top_loops, all_loops)``. ``top_loops`` lists the outermost loops found in
                      ``graph``; ``all_loops`` lists every loop that was found, nested ones included.
        """
        outtop = []
        outall = []
        subg: networkx.DiGraph
        for component in networkx.strongly_connected_components(graph):
            subg = networkx.induced_subgraph(graph, component).copy()
            if len(subg.nodes()) == 1:
                # A single node only forms a loop if it has an edge to itself.
                sole_node = next(iter(subg.nodes()))
                if not list(subg.successors(sole_node)):
                    continue
            thisloop, allloops = self._parse_loop_graph(subg, graph)
            if thisloop is not None:
                outall += allloops
                outtop.append(thisloop)
        return outtop, outall
# NOTE(review): this import sits at the bottom of the module rather than the top, presumably to avoid a
# circular import with angr.analyses — confirm before moving it.
from angr.analyses import AnalysesHub
# Register the LoopFinder class with the analyses hub under the name "LoopFinder".
AnalysesHub.register_default("LoopFinder", LoopFinder)