Source code for angr.exploration_techniques.loop_seer

import logging

from . import ExplorationTechnique
from ..knowledge_base import KnowledgeBase
from ..knowledge_plugins.functions import Function


# Module-level logger, named after this module's import path.
l = logging.getLogger(__name__)


class LoopSeer(ExplorationTechnique):
    """
    This exploration technique monitors exploration and maintains all
    loop-related data (well, currently it is just the loop trip counts, but feel
    free to add something else).
    """

    def __init__(
        self,
        cfg=None,
        functions=None,
        loops=None,
        use_header=False,
        bound=None,
        bound_reached=None,
        discard_stash="spinning",
        limit_concrete_loops=True,
    ):
        """
        :param cfg:                     Normalized CFG is required.
        :param functions:               Function(s) containing the loop(s) to be analyzed.
        :param loops:                   Specific group of Loop(s) to be analyzed, if this is None we run the
                                        LoopFinder analysis.
        :param use_header:              Whether to use header based trip counter to compare with the bound limit.
        :param bound:                   Limit the number of iterations a loop may be executed.
        :param bound_reached:           If provided, should be a function that takes the LoopSeer and the
                                        succ_state. Will be called when loop execution reach the given bound.
                                        Default to moving states that exceed the loop limit to a discard stash.
        :param discard_stash:           Name of the stash containing states exceeding the loop limit.
        :param limit_concrete_loops:    If False, do not limit a loop back-edge if it is the only successor
                                        (Defaults to True to maintain the original behavior)
        :raises TypeError:              If ``loops`` is not None, a Loop, or a list/tuple of Loops.
        """
        super().__init__()
        self.cfg = cfg
        self.functions = functions
        self.bound = bound
        self.bound_reached = bound_reached
        self.discard_stash = discard_stash
        self.use_header = use_header
        self.limit_concrete_loops = limit_concrete_loops
        # Maps the address of a loop's first entry-edge target to the Loop object.
        self.loops = {}
        # States that exceeded the bound and are waiting to be routed by filter().
        self.cut_succs = []

        # Accept a single Loop as shorthand for a one-element list.
        if isinstance(loops, Loop):
            loops = [loops]

        if isinstance(loops, (list, tuple)) and all(isinstance(candidate, Loop) for candidate in loops):
            for loop in loops:
                # Loops without entry edges cannot be keyed and are ignored.
                if loop.entry_edges:
                    self.loops[loop.entry_edges[0][1].addr] = loop
        elif loops is not None:
            raise TypeError("Invalid type for 'loops' parameter!")

    def setup(self, simgr):
        """
        Prepare the CFG and the loop table before exploration starts.

        Builds a normalized CFGFast when no CFG was given (or normalizes the given
        one), resolves ``self.functions`` to Function objects, and, when no loops
        were supplied to the constructor, runs LoopFinder to populate ``self.loops``.

        :param simgr:       The simulation manager this technique is attached to (unused here).
        :raises TypeError:  If ``self.functions`` is not None, a str/int/Function, or a list/tuple of those.
        """
        # NOTE(review): self.project is not set in this class; presumably the
        # ExplorationTechnique base class provides it - confirm.
        if self.cfg is None:
            cfg_kb = KnowledgeBase(self.project)
            self.cfg = self.project.analyses.CFGFast(kb=cfg_kb, normalize=True)
        elif not self.cfg.normalized:
            l.warning("LoopSeer must use a normalized CFG. Normalizing the provided CFG...")
            self.cfg.normalize()

        single_kinds = (str, int, Function)
        if isinstance(self.functions, single_kinds):
            requested = [self.functions]
        elif isinstance(self.functions, (list, tuple)) and all(
            isinstance(f, single_kinds) for f in self.functions
        ):
            requested = self.functions
        elif self.functions is not None:
            raise TypeError("Invalid type for 'functions' parameter!")
        else:
            requested = ()

        # Unresolvable entries are dropped (a warning was already logged by
        # _get_function). If nothing resolves, pass None to LoopFinder instead
        # of an empty list or a list containing None.
        resolved = [f for f in map(self._get_function, requested) if f is not None]
        funcs = resolved or None

        if not self.loops:
            loop_finder = self.project.analyses.LoopFinder(kb=self.cfg.kb, normalize=True, functions=funcs)
            for loop in loop_finder.loops:
                if loop.entry_edges:
                    entry = loop.entry_edges[0][1]
                    self.loops[entry.addr] = loop

    def filter(self, simgr, state, **kwargs):
        """
        Route states that exceeded the loop bound to the discard stash.

        :param simgr:   The simulation manager; used to delegate filtering of all other states.
        :param state:   The state being filtered.
        :return:        ``self.discard_stash`` if the state was marked in ``successors``,
                        otherwise whatever ``simgr.filter`` returns.
        """
        if state in self.cut_succs:
            self.cut_succs.remove(state)
            return self.discard_stash
        else:
            return simgr.filter(state, **kwargs)

    def successors(self, simgr, state, **kwargs):
        """
        Step ``state`` and update the loop bookkeeping of every successor.

        For each successor this updates ``loop_data`` (active loops, back-edge and
        header trip counts), applies the configured bound, and activates a loop
        when the successor lands on a known loop entry.

        :param simgr:   The simulation manager; used to compute the successors.
        :param state:   The state to step.
        :return:        The successors object returned by ``simgr.successors``.
        """
        node = self.cfg.model.get_any_node(state.addr)
        if node is not None:
            # Never step further than the number of instructions in the CFG node.
            kwargs["num_inst"] = min(kwargs.get("num_inst", float("inf")), len(node.instruction_addrs))
        succs = simgr.successors(state, **kwargs)

        # Edge case: When limiting concrete loops, we may not want to do so
        # via the header. If there is a way out of the loop, and we can
        # choose not to take it (e.g., the loop is not concrete), increase the trip count
        at_loop_exit = False
        for succ_state in succs.successors:
            if succ_state.loop_data.current_loop:
                if succ_state.addr in succ_state.loop_data.current_loop[-1][1]:
                    l.debug(
                        "One of the successors: %s is at the exit of the current loop %s",
                        hex(succ_state.addr),
                        succ_state.loop_data.current_loop[-1][0],
                    )
                    at_loop_exit = True

        for succ_state in succs.successors:
            # Processing a currently running loop
            if succ_state.loop_data.current_loop:
                l.debug("Loops currently active are %s", succ_state.loop_data.current_loop)
                # Extract info about the loop ([-1] takes the last active loop and [0] the loop object)
                loop = succ_state.loop_data.current_loop[-1][0]
                header = loop.entry.addr
                l.debug("Loop currently active is %s with entry %s", loop, hex(header))

                if succ_state.addr == header:
                    continue_addrs = [e[0].addr for e in loop.continue_edges]

                    # If there's only one successor, the loop is "concrete"
                    # We may wish not to cut loops that are concrete, as this can dead-end
                    # the path prematurely, even when there won't be state explosion.
                    if self.limit_concrete_loops or len(succs.successors) > 1:
                        # If the previous state contains an address inside the continue_addrs, a.k.a "we have
                        # traversed the continue edge" we did an iteration over the back edge.
                        if succ_state.history.addr in continue_addrs:
                            l.debug(
                                "Continue edge traversed, incrementing back_edge_trip_counts for addr at %s",
                                hex(succ_state.addr),
                            )
                            # This is an iteration on the back edge.
                            succ_state.loop_data.back_edge_trip_counts[succ_state.addr][-1] += 1

                        # NOTE(review): nesting here was reconstructed from a
                        # whitespace-collapsed listing; confirm that the header
                        # counter is meant to advance on every header visit and
                        # not only after a continue edge.
                        l.debug(
                            "Continue edge traversed, incrementing header_trip_counts for addr at %s",
                            hex(succ_state.addr),
                        )
                        # This is also an iteration over the loop's header
                        succ_state.loop_data.header_trip_counts[succ_state.addr][-1] += 1

                # current_loop[-1][1] is the list of exit addresses of the current loop.
                elif succ_state.addr in succ_state.loop_data.current_loop[-1][1]:
                    # We have terminated the loop, so let's pop it out from the current active.
                    l.debug("Deactivating loop at %s because hits the exit node", hex(succ_state.addr))
                    succ_state.loop_data.current_loop.pop()
                elif at_loop_exit:
                    # We're not at the header, but we're where we exit the loop
                    # NOTE: this only matters if you want to not limit concrete loops
                    if not self.limit_concrete_loops and len(succs.successors) > 1:
                        l.debug("At loop exit, incrementing back_edge_trip_counts for addr at %s", hex(succ_state.addr))
                        succ_state.loop_data.back_edge_trip_counts[succ_state.addr][-1] += 1

                # If we have set a bound for symbolic/concrete loops we want to handle it here
                if self.bound is not None and succ_state.loop_data.current_loop:
                    counts = 0
                    # Decide how we should check the bounds
                    if self.use_header:
                        # NOTE(review): `header` belongs to the loop that was on top
                        # of the stack before any pop above; after leaving an inner
                        # loop it is not the header of the now-current outer loop.
                        counts = succ_state.loop_data.header_trip_counts[header][-1]
                    else:
                        if succ_state.addr in succ_state.loop_data.back_edge_trip_counts:
                            counts = succ_state.loop_data.back_edge_trip_counts[succ_state.addr][-1]
                    if counts > self.bound:
                        if self.bound_reached is not None:
                            # We want to pass self to modify the LoopSeer state if needed
                            # Users can modify succ_state in the handler to implement their own logic
                            # or edit the state of the LoopSeer.
                            self.bound_reached(self, succ_state)
                        else:
                            # Remember the state; it is moved to the discard stash
                            # by self.filter.
                            self.cut_succs.append(succ_state)

                # NOTE(review): these log the stepped (parent) state's counters,
                # not the successor's - confirm this is intended.
                l.debug("%s back edge based trip counts %s", state, state.loop_data.back_edge_trip_counts)
                l.debug("%s header based trip counts %s", state, state.loop_data.header_trip_counts)
            else:
                l.debug("No loop are currently active at %s", hex(succ_state.addr))

            # Loop entry detected. This test is put here because in case of
            # nested loops, we want to handle the outer loop before proceeding
            # the inner loop.
            if succ_state.addr in self.loops and not self._inside_current_loops(succ_state):
                loop = self.loops[succ_state.addr]
                header = loop.entry.addr
                l.debug("Activating loop %s for state at %s", loop, hex(succ_state.addr))
                exits = [e[1].addr for e in loop.break_edges]

                # NOTE(review): appending to a possibly-missing key assumes the
                # trip-count mappings create empty lists on demand - confirm.
                succ_state.loop_data.back_edge_trip_counts[header].append(0)
                # If we are not limiting concrete loops, we also consider
                # trip counts at the possible exits
                if not self.limit_concrete_loops:
                    for node in loop.body_nodes:
                        succ_state.loop_data.back_edge_trip_counts[node.addr].append(0)
                # save info about current active loop for the succ state
                succ_state.loop_data.header_trip_counts[header].append(1)
                succ_state.loop_data.current_loop.append((loop, exits))
        return succs

    # pylint: disable=R0201
    def _inside_current_loops(self, succ_state):
        """
        Tell whether ``succ_state`` sits on the entry address of a loop that is
        already active for it.

        :param succ_state:  The successor state to check.
        :return:            True if the state's address equals the entry address of
                            any loop in ``succ_state.loop_data.current_loop``.
        """
        return succ_state.addr in [active[0].entry.addr for active in succ_state.loop_data.current_loop]

    def _get_function(self, func):
        """
        Resolve a function specifier against the CFG's knowledge base.

        :param func:    A function name (str), a function address (int), or a Function.
        :return:        The matching Function; None if the lookup finds nothing (a
                        warning is logged) or ``func`` is of an unsupported type.
        """
        f = None
        if isinstance(func, str):
            f = self.cfg.kb.functions.function(name=func)
            if f is None:
                l.warning("Function '%s' doesn't exist in the CFG. Skipping...", func)
        elif isinstance(func, int):
            f = self.cfg.kb.functions.function(addr=func)
            if f is None:
                l.warning("Function at 0x%x doesn't exist in the CFG. Skipping...", func)
        elif isinstance(func, Function):
            f = func
        return f
from ..analyses.loopfinder import Loop