Source code for angr.analyses.analysis

import functools
import sys
import contextlib
from collections import defaultdict
from inspect import Signature
from typing import TYPE_CHECKING, TypeVar, Type, Generic, Callable, Optional

import progressbar
import logging
import time

from ..misc.plugins import PluginVendor, VendorPreset
from ..misc.ux import deprecated

if TYPE_CHECKING:
    from ..knowledge_base import KnowledgeBase
    from ..project import Project
    from typing_extensions import ParamSpec

    AnalysisParams = ParamSpec("AnalysisParams")

l = logging.getLogger(name=__name__)


class AnalysisLogEntry:
    """
    A single log record produced by an analysis, optionally carrying the exception that was being handled when the
    record was created.

    :ivar message:        The logged message.
    :ivar exc_type:       The exception class, or None if no exception was captured. After a pickle round-trip this
                          is the string form of the class.
    :ivar exc_value:      The exception instance (its string form after a pickle round-trip), or None.
    :ivar exc_traceback:  The traceback object (its string form after a pickle round-trip), or None.
    """

    def __init__(self, message, exc_info=False):
        """
        :param message:   The message to record.
        :param exc_info:  If true, capture the exception currently being handled via ``sys.exc_info()``.
        """
        if exc_info:
            self.exc_type, self.exc_value, self.exc_traceback = sys.exc_info()
        else:
            self.exc_type = self.exc_value = self.exc_traceback = None

        self.message = message

    def __getstate__(self):
        """
        Build a picklable state tuple ``(exc_type, exc_value, exc_traceback, message)``.

        The exception fields are stringified, but None is kept as None so that an entry without an exception is
        still recognized as such after it is restored.
        """

        def _as_text(value):
            return None if value is None else str(value)

        attrs = self.__dict__
        return (
            _as_text(attrs.get("exc_type")),
            _as_text(attrs.get("exc_value")),
            _as_text(attrs.get("exc_traceback")),
            self.message,
        )

    def __setstate__(self, s):
        """
        Restore the entry from a state tuple produced by :meth:`__getstate__`.

        :param s: A 4-tuple ``(exc_type, exc_value, exc_traceback, message)``.
        """
        exc_type, exc_value, exc_traceback, self.message = s
        if exc_type == "None":
            # States written by the previous implementation stringified None. str() of a real class never yields
            # "None", so this value unambiguously marks an entry that carried no exception.
            exc_type = exc_value = exc_traceback = None
        self.exc_type = exc_type
        self.exc_value = exc_value
        self.exc_traceback = exc_traceback

    @staticmethod
    def _shorten(text, limit):
        """
        Truncate ``text`` to roughly ``limit`` characters, appending "..." and re-closing a leading quote.
        """
        if len(text) > limit:
            text = text[: limit - 4] + "..."
            if text[0] in ('"', "'"):
                text += text[0]
        return text

    def __repr__(self):
        msg_str = repr(self.message)
        if self.exc_type is None:
            return "<AnalysisLogEntry %s>" % self._shorten(msg_str, 70)

        # exc_type is a class on a fresh entry but a plain string once the entry has been unpickled
        type_name = getattr(self.exc_type, "__name__", self.exc_type)
        return f"<AnalysisLogEntry {self._shorten(msg_str, 40)} with {type_name}: {self.exc_value}>"
# Type variable used by the generic hub/factory annotations below; it is bound to "Analysis" (given as a forward
# reference because the class is defined later in this module).
A = TypeVar("A", bound="Analysis")
class AnalysesHub(PluginVendor):
    """
    Plugin hub through which the registered, runnable analyses of a project are reached.

    Analysis classes are handed out wrapped in an :class:`AnalysisFactory` bound to this hub's project.
    """

    def __init__(self, project):
        """
        :param project: The project every produced factory will be bound to.
        """
        super().__init__()
        self.project = project

    @deprecated()
    def reload_analyses(self):  # pylint: disable=no-self-use
        """
        Deprecated no-op kept for backward compatibility.

        :return: None
        """
        return None

    def _init_plugin(self, plugin_cls: Type[A]) -> "AnalysisFactory[A]":
        """
        Wrap an analysis class into a factory bound to this hub's project.
        """
        return AnalysisFactory(self.project, plugin_cls)

    def __getitem__(self, plugin_cls: "Type[A]") -> "AnalysisFactory[A]":
        """
        Subscript access: build a factory for ``plugin_cls`` bound to this hub's project.
        """
        return AnalysisFactory(self.project, plugin_cls)

    def __getstate__(self):
        """
        Pair the parent class's state with the project so both can be restored together.
        """
        return (super().__getstate__(), self.project)

    def __setstate__(self, sd):
        """
        Restore the project, then hand the remaining state back to the parent class.

        :param sd: The ``(parent_state, project)`` pair produced by :meth:`__getstate__`.
        """
        parent_state, project = sd
        self.project = project
        super().__setstate__(parent_state)
class AnalysisFactory(Generic[A]):
    """
    Callable wrapper that builds instances of one analysis class for one project.
    """

    def __init__(self, project: "Project", analysis_cls: Type[A]):
        """
        :param project:      The project that every created analysis will be attached to.
        :param analysis_cls: The analysis class this factory instantiates.
        """
        self._project = project
        self._analysis_cls = analysis_cls

        # Expose the analysis' own documentation on the factory; missing docstrings contribute nothing.
        self.__doc__ = (analysis_cls.__doc__ or "") + (analysis_cls.__init__.__doc__ or "")

        # Advertise the analysis constructor's signature on __call__. Note that __func__ is the plain function
        # object stored on the class, so this assignment is visible through every factory instance.
        init_signature = Signature.from_callable(analysis_cls.__init__)
        self.__call__.__func__.__signature__ = init_signature

    def prep(
        self,
        fail_fast=False,
        kb: Optional["KnowledgeBase"] = None,
        progress_callback: Optional[Callable] = None,
        show_progressbar: bool = False,
    ) -> Type[A]:
        """
        Return a callable that creates the analysis with the given framework options already applied.

        :param fail_fast:         Stored on the analysis as ``_fail_fast``.
        :param kb:                Knowledge base for the analysis; the project's knowledge base is used when this
                                  is falsy.
        :param progress_callback: Stored on the analysis as ``_progress_callback``.
        :param show_progressbar:  Stored on the analysis as ``_show_progressbar``.
        :return:                  A function taking the analysis' own constructor arguments and returning the
                                  initialized analysis instance.
        """

        @functools.wraps(self._analysis_cls.__init__)
        def build(*args, **kwargs):
            # Allocate without running __init__ so the bookkeeping attributes exist before the analysis starts.
            instance = object.__new__(self._analysis_cls)
            instance.named_errors = defaultdict(list)
            instance.errors = []
            instance.log = []
            instance._fail_fast = fail_fast
            instance._name = self._analysis_cls.__name__
            instance.project = self._project
            instance.kb = kb if kb else self._project.kb
            instance._progress_callback = progress_callback
            instance._show_progressbar = show_progressbar

            instance.__init__(*args, **kwargs)
            return instance

        return build  # type: ignore

    def __call__(self, *args, **kwargs) -> A:
        """
        Create and run the analysis.

        The keyword arguments ``fail_fast``, ``kb``, ``progress_callback`` and ``show_progressbar`` are consumed
        here and forwarded to :meth:`prep`; everything else goes to the analysis constructor.

        :return: The initialized analysis instance.
        """
        prep_options = {
            "fail_fast": kwargs.pop("fail_fast", False),
            "kb": kwargs.pop("kb", self._project.kb),
            "progress_callback": kwargs.pop("progress_callback", None),
            "show_progressbar": kwargs.pop("show_progressbar", False),
        }
        analysis = self.prep(**prep_options)(*args, **kwargs)

        # clean up so that it's always pickleable
        analysis._progressbar = None
        return analysis
class StatusBar(progressbar.widgets.WidgetBase):
    """
    Progressbar widget that renders a raw status string.
    """

    def __init__(self, width: Optional[int] = 40):
        """
        :param width: Number of characters the widget occupies, or None to render the status text as-is.
        """
        super().__init__()
        self.status: str = ""
        self.width = width

    def __call__(self, progress, data, **kwargs):  # pylint:disable=unused-argument
        """
        Render the current status text.

        :return: The status unchanged when no width is set; otherwise the status cut to ``width`` characters and
                 right-padded with spaces up to ``width``.
        """
        text, width = self.status, self.width
        if width is None:
            return text
        # Slicing caps the length, ljust fills up whatever is missing.
        return text[:width].ljust(width, " ")
class Analysis:
    """
    Base class representing one analysis performed on the program.

    :ivar project:  The project for this analysis.
    :type project:  angr.Project
    :ivar KnowledgeBase kb: The knowledgebase object.
    :ivar _progress_callback: A callback function for receiving the progress of this analysis. It only takes one
                              argument, which is a float number from 0.0 to 100.0 indicating the current progress.
    :ivar bool _show_progressbar: If a progressbar should be shown during the analysis. It's independent from
                                  _progress_callback.
    :ivar progressbar.ProgressBar _progressbar: The progress bar object.
    """

    project: "Project" = None
    kb: "KnowledgeBase" = None
    _fail_fast = None
    _name = None
    errors = []
    named_errors = defaultdict(list)
    _progress_callback = None
    _show_progressbar = False
    _progressbar = None
    _statusbar: Optional[StatusBar] = None

    # Widget layout handed to every progress bar; the trailing StatusBar carries the free-form status text.
    _PROGRESS_WIDGETS = [
        progressbar.Percentage(),
        " ",
        progressbar.Bar(),
        " ",
        progressbar.Timer(),
        " ",
        progressbar.ETA(),
        " ",
        StatusBar(),
    ]

    @contextlib.contextmanager
    def _resilience(self, name=None, exception=Exception):
        """
        Context manager that keeps the analysis going when the wrapped code raises.

        :param name:      If given, the error is filed under ``named_errors[name]``; otherwise it is appended to
                          ``errors``.
        :param exception: The exception class (or tuple of classes) to intercept.
        :raises:          Re-raises the intercepted exception when ``_fail_fast`` is set.
        """
        try:
            yield
        except exception:  # pylint:disable=broad-except
            if self._fail_fast:
                raise

            entry = AnalysisLogEntry("exception occurred", exc_info=True)
            l.error("Caught and logged %s with resilience: %s", entry.exc_type.__name__, entry.exc_value)
            bucket = self.errors if name is None else self.named_errors[name]
            bucket.append(entry)

    def _initialize_progressbar(self):
        """
        Create and start the progressbar, and remember its last widget as the status bar.

        :return: None
        """
        total_steps = 10000 * 100
        bar = progressbar.ProgressBar(widgets=Analysis._PROGRESS_WIDGETS, max_value=total_steps)
        self._progressbar = bar.start()
        self._statusbar = self._progressbar.widgets[-1]

    def _update_progress(self, percentage, text=None, **kwargs):
        """
        Report progress: refresh the progressbar (if enabled), update the status text, and invoke the progress
        callback (if any).

        :param float percentage: Percentage of the progressbar. from 0.0 to 100.0.
        :param text:             Optional status text for the status bar; also passed to the callback as ``text``.
        :param kwargs:           Other parameters that will be passed to the progress_callback handler.
        :return: None
        """
        if self._show_progressbar:
            if self._progressbar is None:
                self._initialize_progressbar()

            self._progressbar.update(percentage * 10000)

        statusbar = self._statusbar
        if not (text is None or statusbar is None):
            statusbar.status = text

        callback = self._progress_callback
        if callback is not None:
            callback(percentage, text=text, **kwargs)  # pylint:disable=not-callable

    def _finish_progress(self):
        """
        Mark the progressbar as finished and notify the progress callback with 100.0.

        :return: None
        """
        if self._show_progressbar:
            if self._progressbar is None:
                self._initialize_progressbar()

            bar = self._progressbar
            if bar is not None:
                bar.finish()
                # Remove the progressbar object so it will not be pickled
                self._progressbar = None

        callback = self._progress_callback
        if callback is not None:
            callback(100.0)  # pylint:disable=not-callable

    @staticmethod
    def _release_gil(ctr, freq, sleep_time=0.001):
        """
        Periodically calls time.sleep() and releases the GIL so other threads (like, GUI threads) have a much
        better chance to be scheduled, and other critical components (like the GUI) can be kept responsive.

        This is, of course, a hack before we move all computational intensive tasks to pure C++ implementations.

        :param int ctr:     A number provided by the caller.
        :param int freq:    How frequently time.sleep() should be called. time.sleep() is called when ctr is
                            non-zero and ctr % freq == 0.
        :param sleep_time:  Number (or fraction) of seconds to sleep.
        :return:            None
        """
        if ctr == 0:
            return
        if ctr % freq == 0:
            time.sleep(sleep_time)

    def __getstate__(self):
        """
        Return a copy of the instance dictionary without the progress-reporting attributes.
        """
        state = dict(self.__dict__)
        for transient in ("_progressbar", "_progress_callback", "_statusbar"):
            state.pop(transient, None)
        return state

    def __setstate__(self, state):
        """
        Restore the instance attributes from ``state``.
        """
        self.__dict__.update(state)

    def __repr__(self):
        return f"<{self._name} Analysis Result at {hex(id(self))}>"
# Create the preset object and register it on the analyses hub under the name "default".
default_analyses = VendorPreset()
AnalysesHub.register_preset("default", default_analyses)