# Source code for angr.analyses.forward_analysis.forward_analysis

from collections import defaultdict
from typing import Any, Dict, List, Callable, Optional, Generic, Type, TypeVar, Tuple, Set, TYPE_CHECKING, Union

import networkx

from .visitors.graph import NodeType
from ...sim_state import SimState
from ...errors import AngrForwardAnalysisError
from ...errors import AngrSkipJobNotice, AngrDelayJobNotice, AngrJobMergingFailureNotice, AngrJobWideningFailureNotice
from ...utils.algo import binary_insert
from .job_info import JobInfo, JobType, JobKey

if TYPE_CHECKING:
    from .visitors.graph import GraphVisitor

AnalysisState = TypeVar("AnalysisState")


class ForwardAnalysis(Generic[AnalysisState, NodeType, JobType, JobKey]):
    """
    A static forward analysis framework that serves as the base of multiple static analyses in angr, including CFG
    analysis, VFG analysis, DDG, etc.

    In short, ForwardAnalysis performs a forward data-flow analysis by traversing a graph, computing on abstract
    values, and storing results in abstract states. The user can specify what graph to traverse, how a graph should
    be traversed, how abstract values and abstract states are defined, etc.

    ForwardAnalysis has a few options to toggle, making it suitable to be the base class of several different styles
    of forward data-flow analysis implementations.

    ForwardAnalysis supports a special mode when no graph is available for traversal (for example, when a CFG is
    being initialized and constructed, no other graph can be used). In that case, the graph traversal functionality
    is disabled, and the optimal graph traversal order is not guaranteed. The user can provide a job sorting method
    to sort the jobs in queue and optimize traversal order.

    Feel free to discuss with me (Fish) if you have any suggestions or complaints.
    """

    def __init__(
        self,
        order_jobs=False,
        allow_merging=False,
        allow_widening=False,
        status_callback: Optional[Callable[["ForwardAnalysis"], Any]] = None,
        graph_visitor: "Optional[GraphVisitor[NodeType]]" = None,
    ):
        """
        Constructor

        :param bool order_jobs:     If all jobs should be ordered or not.
        :param bool allow_merging:  If job merging is allowed.
        :param bool allow_widening: If job widening is allowed. Requires `allow_merging`.
        :param status_callback:     An optional callable that is invoked with this analysis instance at the
                                    beginning of every iteration of the job loop in non-graph-traversal mode.
        :param graph_visitor:       A graph visitor to provide successors. When it is None, the analysis runs in
                                    non-graph-traversal mode.
        :type graph_visitor: GraphVisitor or None
        :raises AngrForwardAnalysisError: If widening is allowed while merging is not.
        :return: None
        """

        self._order_jobs = order_jobs
        self._allow_merging = allow_merging
        self._allow_widening = allow_widening
        self._status_callback = status_callback
        self._graph_visitor = graph_visitor

        # sanity checks
        if self._allow_widening and not self._allow_merging:
            raise AngrForwardAnalysisError("Merging must be allowed if widening is allowed.")

        # Analysis progress control
        self._should_abort = False

        # All remaining jobs
        self._job_info_queue: List[JobInfo[JobType, JobKey]] = []

        # A map between job key to job. Jobs with the same key will be merged by calling _merge_jobs()
        self._job_map: Dict[JobKey, JobInfo[JobType, JobKey]] = {}

        # A mapping between node and its input states
        self._input_states: Dict[NodeType, List[AnalysisState]] = defaultdict(list)

        # A mapping between node and its output state
        self._output_state: Dict[NodeType, AnalysisState] = {}

        # The graph!
        # Analysis results (nodes) are stored here
        self._graph = networkx.DiGraph()

    #
    # Properties
    #

    @property
    def should_abort(self):
        """
        Should the analysis be terminated.

        :return: True/False
        """

        return self._should_abort

    @property
    def graph(self) -> networkx.DiGraph:
        """
        The graph in which analysis results (nodes) are stored.
        """

        return self._graph

    @property
    def jobs(self):
        """
        Iterate over the jobs that are currently in the job queue, in queue order.
        """

        for job_info in self._job_info_queue:
            yield job_info.job

    #
    # Public methods
    #

    def abort(self):
        """
        Abort the analysis

        :return: None
        """

        self._should_abort = True

    def has_job(self, job: JobType) -> bool:
        """
        Checks whether there exists another job which has the same job key.

        :param job: The job to check.
        :return:    True if there exists another job with the same key, False otherwise.
        """

        job_key = self._job_key(job)
        return job_key in self._job_map

    def downsize(self):
        """
        Drop all recorded input states and output states by resetting both maps to empty containers.

        :return: None
        """

        self._input_states = defaultdict(list)
        self._output_state = {}

    #
    # Abstract interfaces
    #

    # Common interfaces

    def _pre_analysis(self) -> None:
        raise NotImplementedError("_pre_analysis() is not implemented.")

    def _intra_analysis(self) -> None:
        raise NotImplementedError("_intra_analysis() is not implemented.")

    def _post_analysis(self) -> None:
        raise NotImplementedError("_post_analysis() is not implemented.")

    def _job_key(self, job: JobType) -> JobKey:
        raise NotImplementedError("_job_key() is not implemented.")

    def _get_successors(self, job: JobType) -> Union[List[SimState], List[JobType]]:
        raise NotImplementedError("_get_successors() is not implemented.")

    def _pre_job_handling(self, job: JobType) -> None:
        raise NotImplementedError("_pre_job_handling() is not implemented.")

    def _post_job_handling(self, job: JobType, new_jobs, successors: List[SimState]) -> None:
        raise NotImplementedError("_post_job_handling() is not implemented.")

    def _handle_successor(self, job: JobType, successor: SimState, successors: List[SimState]) -> List[JobType]:
        raise NotImplementedError("_handle_successor() is not implemented.")

    def _job_queue_empty(self) -> None:
        raise NotImplementedError("_job_queue_empty() is not implemented.")

    def _initial_abstract_state(self, node: NodeType) -> AnalysisState:
        raise NotImplementedError("_initial_abstract_state() is not implemented.")

    def _process_input_state_for_successor(
        self, node: NodeType, successor: NodeType, input_state: AnalysisState
    ) -> AnalysisState:
        """
        Hook that may transform `input_state` before it is recorded as an input state of `successor`. The default
        implementation returns the state unchanged. Can be overwritten.
        """

        return input_state

    def _node_key(self, node: NodeType) -> NodeType:  # pylint:disable=no-self-use
        """
        Override this method if hash(node) is slow for the type of node that are in use.
        """

        return node

    def _run_on_node(self, node: NodeType, state: AnalysisState) -> Tuple[Optional[bool], AnalysisState]:
        """
        The analysis routine that runs on each node in the graph.

        For compatibility reasons, the variable `changed` in the returning tuple can have three values:

        - True, means a change has occurred, the output state is not the same as the input state, and a fixed point
          is not reached. Usually used if the analysis performs weak updates.
        - False, means no change has occurred. Usually used if the analysis performs weak updates.
        - None, means no change detection is performed during this process (e.g., if the analysis requires strong
          updates), and change detection will be performed later during _merge_states().

        :param node:    A node in the graph.
        :param state:   An abstract state that acts as the initial abstract state of this analysis routine.
        :return:        A tuple: (changed, output abstract state)
        """

        raise NotImplementedError("_run_on_node() is not implemented.")

    def _merge_states(self, node: NodeType, *states: AnalysisState) -> Tuple[AnalysisState, bool]:
        """
        Merge multiple abstract states into one.

        :param node:   A node in the graph.
        :param states: Abstract states to merge.
        :return:       A merged abstract state, and a boolean variable indicating if a local fixed-point has been
                       reached (i.e., union(state0, state1) == state0), in which case, its successors will not be
                       revisited.
        """

        raise NotImplementedError("_merge_states() is not implemented.")

    def _widen_states(self, *states: AnalysisState) -> AnalysisState:
        raise NotImplementedError("_widen_states() is not implemented.")

    # Special interfaces for non-graph-traversal mode

    def _merge_jobs(self, *jobs: JobType):
        raise NotImplementedError("_merge_jobs() is not implemented.")

    def _should_widen_jobs(self, *jobs: JobType):
        raise NotImplementedError("_should_widen_jobs() is not implemented.")

    def _widen_jobs(self, *jobs: JobType):
        raise NotImplementedError("_widen_jobs() is not implemented.")

    def _job_sorting_key(self, job: JobType) -> int:
        raise NotImplementedError("_job_sorting_key() is not implemented.")

    #
    # Private methods
    #

    def _analyze(self) -> None:
        """
        The main analysis routine. Runs _pre_analysis(), then one of the two analysis cores depending on whether a
        graph visitor is available, then _post_analysis().

        :return: None
        """

        self._pre_analysis()

        if self._graph_visitor is None:
            # There is no base graph that we can rely on. The analysis itself should generate successors for the
            # current job.
            # An example is the CFG recovery.
            self._analysis_core_baremetal()
        else:
            # We have a base graph to follow. Just handle the current job.
            self._analysis_core_graph()

        self._post_analysis()

    def _analysis_core_graph(self) -> None:
        """
        The analysis core for graph-traversal mode: keep pulling nodes from the graph visitor and running
        _run_on_node() on them until the visitor has no more nodes or the analysis is aborted.

        :return: None
        """

        while not self.should_abort:
            self._intra_analysis()

            n: Optional[NodeType] = self._graph_visitor.next_node()
            if n is None:
                # the visitor has nothing left to visit
                break

            job_state = self._get_and_update_input_state(n)
            if job_state is None:
                job_state = self._initial_abstract_state(n)

            changed, output_state = self._run_on_node(n, job_state)

            if changed is False:
                # no change is detected
                self._output_state[self._node_key(n)] = output_state
                continue

            if changed is True:
                # changes detected

                # output state of node n is input state for successors to node n
                self._add_input_state(n, output_state)

                # revisit all its successors
                self._graph_visitor.revisit_successors(n, include_self=False)
            else:
                # the change of states are determined during state merging (_add_input_state()) instead of during
                # simulated execution (_run_on_node()).
                key = self._node_key(n)

                if key not in self._output_state:
                    reached_fixedpoint = False
                else:
                    # is the output state the same as the old one?
                    _, reached_fixedpoint = self._merge_states(n, self._output_state[key], output_state)
                self._output_state[key] = output_state

                if not reached_fixedpoint:
                    successors_to_visit = self._add_input_state(n, output_state)
                    # revisit all successors in the `successors_to_visit` list
                    for succ in successors_to_visit:
                        self._graph_visitor.revisit_node(succ)

    def _add_input_state(self, node: NodeType, input_state: AnalysisState) -> Set[NodeType]:
        """
        Add the input state to all successors of the given node.

        :param node:        The node whose successors' input states will be touched.
        :param input_state: The state that will be added to successors of the node.
        :return:            The set of all successors of `node`, as reported by the graph visitor.
        """

        successors = set(self._graph_visitor.successors(node))

        for succ in successors:
            # if a node has only one predecessor, we overwrite existing input states
            # otherwise, we add the state as a new input state
            # this is an approximation for removing input states for all nodes that `node` dominates
            new_input_state = self._process_input_state_for_successor(node, succ, input_state)
            if sum(1 for _ in self._graph_visitor.predecessors(succ)) == 1:
                self._input_states[self._node_key(succ)] = [new_input_state]
            else:
                self._input_states[self._node_key(succ)].append(new_input_state)

        return successors

    def _get_and_update_input_state(self, node: NodeType) -> Optional[AnalysisState]:
        """
        Get the input abstract state for this node, and replace the recorded input states of the node with that
        single (merged) state.

        :param node: The node in graph.
        :return:     A merged state, or None if there is no input state for this node available.
        """

        key = self._node_key(node)
        if key not in self._input_states:
            return None

        input_state = self._get_input_state(node)
        # collapse all recorded input states into the one that is about to be used
        self._input_states[key] = [input_state]
        return input_state

    def _get_input_state(self, node: NodeType) -> Optional[AnalysisState]:
        """
        Get the input abstract state for this node.

        :param node: The node in graph.
        :return:     A merged state, or None if there is no input state for this node available.
        """

        key = self._node_key(node)
        if key not in self._input_states:
            return None

        all_input_states = self._input_states[key]
        if len(all_input_states) == 1:
            # nothing to merge
            return all_input_states[0]

        merged_state, _ = self._merge_states(node, *all_input_states)
        return merged_state

    def _analysis_core_baremetal(self) -> None:
        """
        The analysis core for non-graph-traversal mode: keep taking the first job in the job queue and processing
        it until the queue stays empty after _job_queue_empty() or the analysis is aborted.

        :return: None
        """

        if not self._job_info_queue:
            self._job_queue_empty()

        while not self.should_abort:
            if self._status_callback is not None:
                self._status_callback(self)

            # should_abort might be changed by the status callback function
            if self.should_abort:
                return

            if not self._job_info_queue:
                self._job_queue_empty()

            if not self._job_info_queue:
                # still no job available
                break

            job_info = self._job_info_queue[0]

            try:
                self._pre_job_handling(job_info.job)
            except AngrDelayJobNotice:
                # delay the handling of this job; it is left in the queue and the loop starts over
                continue
            except AngrSkipJobNotice:
                # consume and skip this job
                self._job_info_queue = self._job_info_queue[1:]
                self._job_map.pop(self._job_key(job_info.job), None)
                continue

            # remove the job info from the map
            self._job_map.pop(self._job_key(job_info.job), None)

            self._job_info_queue = self._job_info_queue[1:]

            self._process_job_and_get_successors(job_info)

            # Short-cut for aborting the analysis
            if self.should_abort:
                break

            self._intra_analysis()

    def _process_job_and_get_successors(self, job_info: JobInfo[JobType, JobKey]) -> None:
        """
        Process a job, get all successors of this job, and call _handle_successor() to handle each successor.

        :param JobInfo job_info: The JobInfo instance
        :return: None
        """

        job = job_info.job

        successors = self._get_successors(job)

        all_new_jobs = []

        for successor in successors:
            new_jobs = self._handle_successor(job, successor, successors)

            if new_jobs:
                all_new_jobs.extend(new_jobs)

                for new_job in new_jobs:
                    self._insert_job(new_job)

        self._post_job_handling(job, all_new_jobs, successors)

    def _insert_job(self, job: JobType) -> None:
        """
        Insert a new job into the job queue. If the job queue is ordered, this job will be inserted at the correct
        position.

        When merging is allowed and a job with the same key is already registered, the new job is widened or merged
        with the existing one. If merging fails, the new job is registered on its own.

        :param job: The job to insert
        :return:    None
        """

        key = self._job_key(job)

        if self._allow_merging and key in self._job_map:
            job_info = self._job_map[key]

            # decide if we want to trigger a widening
            # if not, we'll simply do the merge
            # TODO: save all previous jobs for the sake of widening
            job_added = False
            if self._allow_widening and self._should_widen_jobs(job_info.job, job):
                try:
                    widened_job = self._widen_jobs(job_info.job, job)
                    # remove the old job since now we have a widened one
                    if job_info in self._job_info_queue:
                        self._job_info_queue.remove(job_info)
                    job_info.add_job(widened_job, widened=True)
                    job_added = True
                except AngrJobWideningFailureNotice:
                    # widening failed
                    # fall back to merging...
                    pass

            if not job_added:
                try:
                    merged_job = self._merge_jobs(job_info.job, job)
                    # remove the old job since now we have a merged one
                    if job_info in self._job_info_queue:
                        self._job_info_queue.remove(job_info)
                    job_info.add_job(merged_job, merged=True)
                except AngrJobMergingFailureNotice:
                    # merging failed
                    job_info = JobInfo(key, job)
                    # update the job map
                    self._job_map[key] = job_info
        else:
            # either merging is disabled or this is the first job with this key
            job_info = JobInfo(key, job)
            self._job_map[key] = job_info

        if self._order_jobs:
            binary_insert(self._job_info_queue, job_info, lambda elem: self._job_sorting_key(elem.job))
        else:
            self._job_info_queue.append(job_info)

    def _peek_job(self, pos: int) -> JobType:
        """
        Return the job currently at position `pos`, but still keep it in the job queue. An IndexError will be raised
        if that position does not currently exist in the job list.

        :param int pos: Position of the job to get.
        :return:        The job
        """

        if pos < len(self._job_info_queue):
            return self._job_info_queue[pos].job

        raise IndexError()

    def _remove_job(self, predicate: Callable) -> None:
        """
        Remove jobs that satisfy certain criteria from job_info_queue.

        :param predicate: A method that determines if a job should be removed or not. It is called with each job
                          in the queue and should return a true value for jobs that should be removed.
        :return:          None
        """

        # collect first so that the queue is not modified while it is being iterated
        to_remove = [job_info for job_info in self._job_info_queue if predicate(job_info.job)]

        for job_info in to_remove:
            self._job_info_queue.remove(job_info)
            self._job_map.pop(self._job_key(job_info.job), None)