Source code for angr.knowledge_plugins.cfg.cfg_node

import traceback
import logging
from typing import TYPE_CHECKING, Union, Optional

from archinfo.arch_soot import SootAddressDescriptor
import archinfo

from angr.codenode import BlockNode, HookNode, SyscallNode
from angr.engines.successors import SimSuccessors
from angr.serializable import Serializable
from angr.protos import cfg_pb2

if TYPE_CHECKING:
    from .cfg_model import CFGModel
    import angr

_l = logging.getLogger(__name__)


class CFGNodeCreationFailure:
    """
    This class contains additional information for whenever creating a CFGNode failed. It includes a full traceback
    and the exception messages.

    :ivar short_reason: ``str()`` of the exception type.
    :ivar long_reason:  ``repr()`` of the exception instance.
    :ivar traceback:    The list of strings produced by ``traceback.format_exception``.
    """

    __slots__ = ["short_reason", "long_reason", "traceback"]

    def __init__(self, exc_info=None, to_copy=None):
        """
        Build the failure record either from exception information or by copying another record.

        :param exc_info: A ``(type, value, traceback)`` triple, unpacked and formatted when ``to_copy`` is None.
        :param to_copy:  An existing CFGNodeCreationFailure whose fields are copied. Takes precedence over
                         ``exc_info`` when provided.
        :raises TypeError: If neither ``exc_info`` nor ``to_copy`` is given.
        """
        if to_copy is not None:
            self.short_reason = to_copy.short_reason
            self.long_reason = to_copy.long_reason
            # Copy the list so that the two records do not share one mutable traceback.
            self.traceback = list(to_copy.traceback)
            return

        if exc_info is None:
            # Unpacking None would raise an opaque TypeError; keep the exception type but say what is wrong.
            raise TypeError("Either exc_info or to_copy must be provided.")

        e_type, e, e_traceback = exc_info
        self.short_reason = str(e_type)
        self.long_reason = repr(e)
        self.traceback = traceback.format_exception(e_type, e, e_traceback)

    def __hash__(self):
        # self.traceback is a list, which is unhashable; freeze it into a tuple for hashing purposes only.
        return hash((self.short_reason, self.long_reason, tuple(self.traceback)))
class CFGNode(Serializable):
    """
    This class stands for each single node in CFG.
    """

    __slots__ = (
        "addr",
        "simprocedure_name",
        "syscall_name",
        "size",
        "no_ret",
        "is_syscall",
        "function_address",
        "block_id",
        "thumb",
        "byte_string",
        "_name",
        "instruction_addrs",
        "irsb",
        "has_return",
        "_cfg_model",
        "_hash",
        "soot_block",
    )

    def __init__(
        self,
        addr,
        size,
        cfg,
        simprocedure_name=None,
        no_ret=False,
        function_address=None,
        block_id=None,
        irsb=None,
        soot_block=None,
        instruction_addrs=None,
        thumb=False,
        byte_string=None,
        is_syscall=None,
        name=None,
    ):
        """
        Note: simprocedure_name is not used to recreate the SimProcedure object. It's only there for better
        __repr__.

        :param addr:              Address of the node.
        :param size:              Size of the node.
        :param cfg:               The CFG model this node belongs to; stored as ``_cfg_model``.
        :param simprocedure_name: Name of the SimProcedure, if any. Also serves as the fallback node name.
        :param block_id:          Block ID. For plain CFGNode instances it defaults to ``addr`` (with a warning).
        :param irsb:              Only consulted for its ``instruction_addresses`` when no instruction addresses
                                  are given and the node is not a SimProcedure. The IRSB itself is not kept.
        :param instruction_addrs: Iterable of instruction addresses; copied into a list.
        :param is_syscall:        Explicit syscall flag. When None, it is derived from the SimOS of the project.
        :param name:              Explicit node name. Takes precedence over every other naming source.
        """
        self.addr = addr
        self.size = size
        self.simprocedure_name = simprocedure_name
        self.no_ret = no_ret
        self._cfg_model: "CFGModel" = cfg
        self.function_address = function_address
        self.block_id: Union["angr.analyses.cfg.cfg_job_base.BlockID", int] = block_id
        self.thumb = thumb
        self.byte_string: Optional[bytes] = byte_string

        # Plain defaults that do not depend on any other argument
        self.irsb = None
        self.soot_block = soot_block
        self.has_return = False
        self._hash = None

        # Pick the initial name: explicit name > repr of a Soot address > SimProcedure name (may be None)
        if name is not None:
            initial_name = name
        elif isinstance(addr, SootAddressDescriptor):
            initial_name = repr(addr)
        else:
            initial_name = simprocedure_name
        self._name = initial_name

        self.instruction_addrs = [] if instruction_addrs is None else list(instruction_addrs)

        if is_syscall is None:
            is_syscall = bool(self.simprocedure_name and self._cfg_model.project.simos.is_syscall_addr(addr))
        self.is_syscall = is_syscall

        # We have to collect instruction addresses by ourselves
        if not instruction_addrs and not self.is_simprocedure and irsb is not None:
            self.instruction_addrs = irsb.instruction_addresses

        # Sanity check
        if self.block_id is None and type(self) is CFGNode:  # pylint: disable=unidiomatic-typecheck
            _l.warning("block_id is unspecified for %s. Default to its address %#x.", str(self), self.addr)
            self.block_id = self.addr

    @property
    def name(self):
        """
        The name of this node, resolved on first access and cached in ``_name``.

        Symbols are looked up, in order, at the node address, at ``addr - 1`` (ARM only, when the lowest address
        bit is set), and at the function address. In the last case the offset from the function address is
        appended to the name.
        """
        if self._name is not None:
            return self._name

        project = self._cfg_model.project
        find_symbol = project.loader.find_symbol

        symbol = find_symbol(self.addr)
        if symbol is not None:
            self._name = symbol.name

        if self._name is None and isinstance(project.arch, archinfo.ArchARM) and self.addr & 1:
            symbol = find_symbol(self.addr - 1)
            if symbol is not None:
                self._name = symbol.name

        if self.function_address and self._name is None:
            symbol = find_symbol(self.function_address)
            if symbol is not None:
                self._name = symbol.name
            if self._name is not None:
                delta = self.addr - self.function_address
                self._name = f"{self._name}{delta:+#x}"

        return self._name

    @property
    def successors(self):
        """Successors of this node, as reported by the CFG model."""
        return self._cfg_model.get_successors(self)

    @property
    def predecessors(self):
        """Predecessors of this node, as reported by the CFG model."""
        return self._cfg_model.get_predecessors(self)

    def successors_and_jumpkinds(self, excluding_fakeret=True):
        """Forward to ``get_successors_and_jumpkinds`` of the CFG model for this node."""
        return self._cfg_model.get_successors_and_jumpkinds(self, excluding_fakeret=excluding_fakeret)

    def predecessors_and_jumpkinds(self, excluding_fakeret=True):
        """Forward to ``get_predecessors_and_jumpkinds`` of the CFG model for this node."""
        return self._cfg_model.get_predecessors_and_jumpkinds(self, excluding_fakeret=excluding_fakeret)

    def get_data_references(self, kb=None):
        """
        Get the known data references for this CFGNode via the knowledge base.

        :param kb:  Which knowledge base to use; uses the knowledge base of the project if none is provided
        :return:    Generator yielding xrefs to this CFGNode's block.
        :rtype:     iter
        :raises ValueError: If the CFG model is not a CFGFast one, or if no knowledge base is available. Since
                            this is a generator, the error surfaces when iteration starts.
        """
        if not self._cfg_model.ident.startswith("CFGFast"):
            raise ValueError("Memory data is currently only supported in CFGFast.")

        kb = kb or self._cfg_model.project.kb
        if not kb:
            raise ValueError("The Knowledge Base does not exist!")

        for ins_addr in self.instruction_addrs:
            # Materialize the xrefs of one instruction before handing them out
            yield from list(kb.xrefs.get_xrefs_by_ins_addr(ins_addr))

    @property
    def accessed_data_references(self):
        """
        Property providing a view of all the known data references for this CFGNode via the default knowledge base

        :return:    Generator yielding xrefs to this CFGNode's block.
        :rtype:     iter
        """
        return self.get_data_references()

    @property
    def is_simprocedure(self):
        """True if a SimProcedure name is attached to this node."""
        return self.simprocedure_name is not None

    @property
    def callstack_key(self):
        # A dummy stub for the future support of context sensitivity in CFGFast
        return None

    #
    # Serialization
    #

    @classmethod
    def _get_cmsg(cls):
        return cfg_pb2.CFGNode()

    def serialize_to_cmessage(self):
        """
        Serialize this node into a ``cfg_pb2.CFGNode`` message.

        :raises NotImplementedError: For CFGENode instances, or when the block ID is not a plain int.
        """
        if isinstance(self, CFGENode):
            raise NotImplementedError("CFGEmulated instances are not currently serializable")

        cmsg = self._get_cmsg()
        cmsg.ea = self.addr
        cmsg.size = self.size
        cmsg.instr_addrs.extend(self.instruction_addrs)

        if self.block_id is None:
            return cmsg
        if type(self.block_id) is not int:
            # should be a BlockID
            raise NotImplementedError("CFGEmulated instances are not currently serializable")

        cmsg.block_id.append(self.block_id)  # pylint:disable=no-member
        return cmsg

    @classmethod
    def parse_from_cmessage(cls, cmsg, cfg=None):  # pylint:disable=arguments-differ
        """
        Create a node from a ``cfg_pb2.CFGNode`` message.

        :param cmsg: The message to read ``ea``, ``size``, ``block_id`` and ``instr_addrs`` from.
        :param cfg:  The CFG model handed to the constructor.
        """
        block_id = cmsg.block_id[0] if len(cmsg.block_id) != 0 else None
        instruction_addrs = list(cmsg.instr_addrs) if cmsg.instr_addrs else None
        return cls(
            cmsg.ea,
            cmsg.size,
            cfg=cfg,
            block_id=block_id,
            instruction_addrs=instruction_addrs,
        )

    #
    # Pickling
    #

    def __getstate__(self):
        # The CFG model and the cached hash are deliberately not part of the pickled state.
        pickled_attrs = (
            "addr",
            "size",
            "simprocedure_name",
            "no_ret",
            "function_address",
            "block_id",
            "thumb",
            "byte_string",
            "_name",
            "instruction_addrs",
            "is_syscall",
            "has_return",
        )
        return {attr: getattr(self, attr) for attr in pickled_attrs}

    def __setstate__(self, state):
        # Re-run the constructor without a CFG model (None)
        self.__init__(
            state["addr"],
            state["size"],
            None,
            simprocedure_name=state["simprocedure_name"],
            no_ret=state["no_ret"],
            function_address=state["function_address"],
            block_id=state["block_id"],
            thumb=state["thumb"],
            byte_string=state["byte_string"],
            name=state["_name"],
            instruction_addrs=state["instruction_addrs"],
            is_syscall=state["is_syscall"],
        )
        self.has_return = state["has_return"]

    #
    # Methods
    #

    def copy(self):
        """Create a new CFGNode from the constructor-level attributes of this node."""
        return CFGNode(
            self.addr,
            self.size,
            self._cfg_model,
            simprocedure_name=self.simprocedure_name,
            no_ret=self.no_ret,
            function_address=self.function_address,
            block_id=self.block_id,
            irsb=self.irsb,
            instruction_addrs=self.instruction_addrs,
            thumb=self.thumb,
            byte_string=self.byte_string,
            is_syscall=self.is_syscall,
            name=self._name,
        )

    def merge(self, other):
        """
        Merges this node with the other, returning a new node that spans the both.
        """
        merged = self.copy()
        merged.size += other.size
        merged.instruction_addrs += other.instruction_addrs
        # FIXME: byte_string should never be none, but it is sometimes
        # like, for example, patcherex test_cfg.py:test_fullcfg_properties
        if merged.byte_string is not None and other.byte_string is not None:
            merged.byte_string += other.byte_string
        else:
            merged.byte_string = None
        return merged

    def __repr__(self):
        pieces = ["<CFGNode "]
        node_name = self.name
        if node_name is not None:
            pieces.append(node_name + " ")
        elif not isinstance(self.addr, SootAddressDescriptor):
            pieces.append(hex(self.addr))
        if self.size is not None:
            pieces.append("[%d]" % self.size)
        pieces.append(">")
        return "".join(pieces)

    def __eq__(self, other):
        if isinstance(other, SimSuccessors):
            raise ValueError("You do not want to be comparing a SimSuccessors instance to a CFGNode.")
        # Only exact CFGNode instances can be equal; subclasses never compare equal here.
        if type(other) is not CFGNode:
            return False
        return (
            self.addr == other.addr
            and self.size == other.size
            and self.simprocedure_name == other.simprocedure_name
        )

    def __hash__(self):
        # The hash is computed once and cached in _hash.
        cached = self._hash
        if cached is None:
            cached = hash((self.addr, self.simprocedure_name))
            self._hash = cached
        return cached

    def to_codenode(self):
        """Convert this node into a SyscallNode, HookNode or BlockNode, checked in that order."""
        if self.is_syscall:
            return SyscallNode(self.addr, self.size, self.simprocedure_name)
        if self.is_simprocedure:
            return HookNode(self.addr, self.size, self.simprocedure_name)
        return BlockNode(self.addr, self.size, thumb=self.thumb)

    @property
    def block(self):
        """The block created by the project factory for this node, or None for SimProcedures and syscalls."""
        if self.is_simprocedure or self.is_syscall:
            return None
        model = self._cfg_model  # everything in angr is connected with everything...
        return model.project.factory.block(self.addr, size=self.size, opt_level=model._iropt_level)
class CFGENode(CFGNode):
    """
    The CFGNode that is used in CFGEmulated.
    """

    __slots__ = [
        "input_state",
        "looping_times",
        "depth",
        "final_states",
        "creation_failure_info",
        "return_target",
        "syscall",
        "_callstack_key",
    ]

    def __init__(
        self,
        addr,
        size,
        cfg,
        simprocedure_name=None,
        no_ret=False,
        function_address=None,
        block_id=None,
        irsb=None,
        instruction_addrs=None,
        thumb=False,
        byte_string=None,
        is_syscall=None,
        name=None,
        # CFGENode specific
        input_state=None,
        final_states=None,
        syscall_name=None,
        looping_times=0,
        depth=None,
        callstack_key=None,
        creation_failure_info=None,
    ):
        """
        All parameters before ``input_state`` are forwarded unchanged to ``CFGNode.__init__``.

        :param input_state:           Stored as ``input_state``.
        :param final_states:          Stored as ``final_states``; a new empty list is used when None.
        :param syscall_name:          Stored as ``syscall_name``.
        :param looping_times:         Stored as ``looping_times``; part of equality and hashing.
        :param depth:                 Stored as ``depth``.
        :param callstack_key:         Exposed through the ``callstack_key`` property.
        :param creation_failure_info: Either an exception-info triple from which a CFGNodeCreationFailure is
                                      built, or an existing CFGNodeCreationFailure (e.g. when unpickling or
                                      copying), which is duplicated.
        """
        super().__init__(
            addr,
            size,
            cfg,
            simprocedure_name=simprocedure_name,
            no_ret=no_ret,
            function_address=function_address,
            block_id=block_id,
            irsb=irsb,
            instruction_addrs=instruction_addrs,
            thumb=thumb,
            byte_string=byte_string,
            is_syscall=is_syscall,
            name=name,
        )

        self.input_state = input_state
        self.syscall_name = syscall_name
        self.looping_times = looping_times
        self.depth = depth

        self.creation_failure_info = None
        if creation_failure_info is not None:
            if isinstance(creation_failure_info, CFGNodeCreationFailure):
                # __setstate__ and copy() hand over an already-built record; it cannot be unpacked like an
                # exception-info triple, so duplicate it instead.
                self.creation_failure_info = CFGNodeCreationFailure(to_copy=creation_failure_info)
            else:
                self.creation_failure_info = CFGNodeCreationFailure(creation_failure_info)

        self._callstack_key = callstack_key

        self.final_states = [] if final_states is None else final_states

        # If this CFG contains an Ijk_Call, `return_target` stores the returning site.
        # Note: this is regardless of whether the call returns or not. You should always check the `no_ret` property if
        # you are using `return_target` to do some serious stuff.
        self.return_target = None

    @property
    def callstack_key(self):
        return self._callstack_key

    @property
    def creation_failed(self):
        """True if creation failure information is attached to this node."""
        return self.creation_failure_info is not None

    def downsize(self):
        """
        Drop saved states.
        """
        self.input_state = None
        self.final_states = []

    def __repr__(self):
        s = "<CFGENode "
        if self.name is not None:
            s += self.name + " "
        s += hex(self.addr)
        if self.size is not None:
            s += "[%d]" % self.size
        if self.looping_times > 0:
            s += " - %d" % self.looping_times
        if self.creation_failure_info is not None:
            s += f" - creation failed: {self.creation_failure_info.long_reason}"
        s += ">"
        return s

    def __eq__(self, other):
        if isinstance(other, SimSuccessors):
            raise ValueError("You do not want to be comparing a SimSuccessors instance to a CFGNode.")
        if not isinstance(other, CFGENode):
            return False
        return (
            self.callstack_key == other.callstack_key
            and self.addr == other.addr
            and self.size == other.size
            and self.looping_times == other.looping_times
            and self.simprocedure_name == other.simprocedure_name
        )

    def __hash__(self):
        # NOTE(review): the hash includes creation_failure_info although __eq__ does not compare it, and it leaves
        # out size although __eq__ does compare it. Kept as-is to avoid changing container behavior.
        return hash(
            (self.callstack_key, self.addr, self.looping_times, self.simprocedure_name, self.creation_failure_info)
        )

    #
    # Pickling
    #

    def __getstate__(self):
        # input_state and final_states are not part of the pickled state.
        s = super().__getstate__()
        s["syscall_name"] = self.syscall_name
        s["looping_times"] = self.looping_times
        s["depth"] = self.depth
        s["creation_failure_info"] = self.creation_failure_info
        s["_callstack_key"] = self.callstack_key
        s["return_target"] = self.return_target
        return s

    def __setstate__(self, state):
        # Re-run the constructor without a CFG model (None)
        self.__init__(
            state["addr"],
            state["size"],
            None,
            simprocedure_name=state["simprocedure_name"],
            no_ret=state["no_ret"],
            function_address=state["function_address"],
            block_id=state["block_id"],
            instruction_addrs=state["instruction_addrs"],
            thumb=state["thumb"],
            byte_string=state["byte_string"],
            is_syscall=state["is_syscall"],
            name=state["_name"],
            syscall_name=state["syscall_name"],
            looping_times=state["looping_times"],
            depth=state["depth"],
            callstack_key=state["_callstack_key"],
            creation_failure_info=state["creation_failure_info"],
        )
        # These two are saved by __getstate__ but are not constructor arguments, so restore them explicitly.
        self.has_return = state["has_return"]
        self.return_target = state["return_target"]

    def copy(self):
        """
        Create a new CFGENode from the constructor-level attributes of this node. The list of final states is
        shallow-copied.
        """
        return CFGENode(
            self.addr,
            self.size,
            self._cfg_model,
            simprocedure_name=self.simprocedure_name,
            no_ret=self.no_ret,
            function_address=self.function_address,
            block_id=self.block_id,
            irsb=self.irsb,
            instruction_addrs=self.instruction_addrs,
            thumb=self.thumb,
            byte_string=self.byte_string,
            is_syscall=self.is_syscall,
            # Keep an explicitly assigned or already resolved name, as CFGNode.copy() does
            name=self._name,
            input_state=self.input_state,
            final_states=self.final_states[::],
            syscall_name=self.syscall_name,
            looping_times=self.looping_times,
            depth=self.depth,
            callstack_key=self.callstack_key,
            # creation_failure_info takes part in __hash__, so a copy must carry it along
            creation_failure_info=self.creation_failure_info,
        )