Source code for angr.knowledge_plugins.cfg.cfg_model

# pylint:disable=no-member
import pickle
import logging
from typing import Optional, List, Dict, Tuple, DefaultDict, Callable, TYPE_CHECKING
from collections import defaultdict
import bisect
import string

import networkx

import cle

from ...engines.vex.lifter import VEX_IRSB_MAX_SIZE
from ...misc.ux import once
from ...protos import cfg_pb2, primitives_pb2
from ...serializable import Serializable
from ...utils.enums_conv import cfg_jumpkind_to_pb, cfg_jumpkind_from_pb
from ...errors import AngrCFGError
from .cfg_node import CFGNode
from .memory_data import MemoryData, MemoryDataSort
from .indirect_jump import IndirectJump

if TYPE_CHECKING:
    from angr.knowledge_plugins.xrefs import XRefManager, XRef
    from angr.analyses.cfg.segment_list import SegmentList


l = logging.getLogger(name=__name__)

_PRINTABLES = string.printable.replace("\x0b", "").replace("\x0c", "").encode()


class CFGModel(Serializable):
    """
    This class describes a Control Flow Graph for a specific range of code.
    """

    __slots__ = (
        "ident",
        "graph",
        "jump_tables",
        "memory_data",
        "insn_addr_to_memory_data",
        "_nodes_by_addr",
        "_nodes",
        "_cfg_manager",
        "_iropt_level",
        "_node_addrs",
        "is_arm",
        "normalized",
    )

    def __init__(self, ident, cfg_manager=None, is_arm=False):
        self.ident = ident
        self._cfg_manager = cfg_manager
        self.is_arm = is_arm

        # Necessary settings
        self._iropt_level = None

        # The graph
        self.graph = networkx.DiGraph()

        # Jump tables
        self.jump_tables: Dict[int, IndirectJump] = {}

        # Memory references
        # A mapping between address and the actual data in memory
        self.memory_data: Dict[int, MemoryData] = {}
        # A mapping between address of the instruction that's referencing the memory data and the memory data itself
        self.insn_addr_to_memory_data: Dict[int, MemoryData] = {}

        # Lists of CFGNodes indexed by the address of each block. Don't serialize
        self._nodes_by_addr: DefaultDict[int, List[CFGNode]] = defaultdict(list)
        # CFGNodes dict indexed by block ID. Don't serialize
        self._nodes: Dict[int, CFGNode] = {}
        # Addresses of CFGNodes to speed up get_any_node(..., anyaddr=True). Don't serialize
        self._node_addrs: List[int] = []

        self.normalized = False
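
    # Usage sketch (illustrative; the binary path and variable names below are placeholders).
    # A CFGModel is normally created and populated by a CFG analysis rather than instantiated
    # directly:
    #
    #     import angr
    #     proj = angr.Project("/bin/true", auto_load_libs=False)
    #     cfg = proj.analyses.CFGFast()   # builds the graph and the memory-data map
    #     model = cfg.model               # a CFGModel instance like the one defined here
    #     # alternatively, register an empty model with the knowledge base:
    #     empty_model = proj.kb.cfgs.new_model("CFGFast")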

    #
    # Properties
    #

    @property
    def project(self):
        if self._cfg_manager is None:
            return None
        return self._cfg_manager._kb._project

    #
    # Serialization
    #

    def __getstate__(self):
        state = dict(map(lambda x: (x, self.__getattribute__(x)), self.__slots__))
        return state

    def __setstate__(self, state):
        for attribute, value in state.items():
            self.__setattr__(attribute, value)
        for addr in self._nodes:
            node = self._nodes[addr]
            node._cfg_model = self

    @classmethod
    def _get_cmsg(cls):
        return cfg_pb2.CFG()

    def serialize_to_cmessage(self):
        if "Emulated" in self.ident:
            raise NotImplementedError("Serializing a CFGEmulated instance is currently not supported.")
        cmsg = self._get_cmsg()
        cmsg.ident = self.ident

        # nodes
        nodes = []
        for n in self.graph.nodes():
            nodes.append(n.serialize_to_cmessage())
        cmsg.nodes.extend(nodes)

        # edges
        edges = []
        for src, dst, data in self.graph.edges(data=True):
            edge = primitives_pb2.Edge()
            edge.src_ea = src.addr
            edge.dst_ea = dst.addr
            for k, v in data.items():
                if k == "jumpkind":
                    edge.jumpkind = cfg_jumpkind_to_pb(v)
                elif k == "ins_addr":
                    edge.ins_addr = v if v is not None else 0xFFFF_FFFF_FFFF_FFFF
                elif k == "stmt_idx":
                    edge.stmt_idx = v if v is not None else -1
                else:
                    edge.data[k] = pickle.dumps(v)
            edges.append(edge)
        cmsg.edges.extend(edges)

        # memory data
        memory_data = []
        for data in self.memory_data.values():
            memory_data.append(data.serialize_to_cmessage())
        cmsg.memory_data.extend(memory_data)

        cmsg.normalized = self.normalized

        return cmsg

    @classmethod
    def parse_from_cmessage(cls, cmsg, cfg_manager=None, loader=None):  # pylint:disable=arguments-differ
        if cfg_manager is None:
            # create a new model unassociated from any project
            model = cls(cmsg.ident)
        else:
            model = cfg_manager.new_model(cmsg.ident)

        # nodes
        for node_pb2 in cmsg.nodes:
            node = CFGNode.parse_from_cmessage(node_pb2, cfg=model)
            model._nodes[node.block_id] = node
            model._nodes_by_addr[node.addr].append(node)
            model.graph.add_node(node)
            if len(model._nodes_by_addr[node.block_id]) > 1:
                if once("cfg_model_parse_from_cmessage many nodes at addr"):
                    l.warning(
                        "Importing a CFG with more than one node for a given address is currently unsupported. "
                        "The resulting graph may be broken."
                    )
        model._node_addrs = list(sorted(model._nodes_by_addr.keys()))

        # edges
        for edge_pb2 in cmsg.edges:
            # more than one node at a given address is unsupported, grab the first one
            src = model._nodes_by_addr[edge_pb2.src_ea][0]
            dst = model._nodes_by_addr[edge_pb2.dst_ea][0]
            data = {}
            for k, v in edge_pb2.data.items():
                data[k] = pickle.loads(v)
            data["jumpkind"] = cfg_jumpkind_from_pb(edge_pb2.jumpkind)
            data["ins_addr"] = edge_pb2.ins_addr if edge_pb2.ins_addr != 0xFFFF_FFFF_FFFF_FFFF else None
            data["stmt_idx"] = edge_pb2.stmt_idx if edge_pb2.stmt_idx != -1 else None
            model.graph.add_edge(src, dst, **data)

        # memory data
        for data_pb2 in cmsg.memory_data:
            md = MemoryData.parse_from_cmessage(data_pb2)
            if loader is not None and md.content is None:
                # fill in the content
                md.fill_content(loader)
            model.memory_data[md.addr] = md

        model.normalized = cmsg.normalized

        return model
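
    # Serialization round-trip sketch (assumes `model` is a populated CFGModel and `proj` is
    # the owning angr project; SerializeToString/ParseFromString are standard protobuf message
    # methods):
    #
    #     cmsg = model.serialize_to_cmessage()
    #     blob = cmsg.SerializeToString()
    #     restored_cmsg = CFGModel._get_cmsg()
    #     restored_cmsg.ParseFromString(blob)
    #     restored = CFGModel.parse_from_cmessage(restored_cmsg, loader=proj.loader)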

    #
    # Other methods
    #

    def copy(self):
        model = CFGModel(self.ident, cfg_manager=self._cfg_manager, is_arm=self.is_arm)
        model.graph = networkx.DiGraph(self.graph)
        model.jump_tables = self.jump_tables.copy()
        model.memory_data = self.memory_data.copy()
        model.insn_addr_to_memory_data = self.insn_addr_to_memory_data.copy()
        model._nodes_by_addr = self._nodes_by_addr.copy()
        model._nodes = self._nodes.copy()
        return model

    #
    # Node insertion and removal
    #

    def add_node(self, block_id: int, node: CFGNode) -> None:
        self._nodes[block_id] = node
        self._nodes_by_addr[node.addr].append(node)

        if isinstance(node.addr, int):
            pos = bisect.bisect_left(self._node_addrs, node.addr)
            if pos >= len(self._node_addrs):
                self._node_addrs.append(node.addr)
            elif self._node_addrs[pos] != node.addr:
                self._node_addrs.insert(pos, node.addr)

    def remove_node(self, block_id: int, node: CFGNode) -> None:
        """
        Remove the given CFGNode instance. Note that this method *does not* remove the node from the graph.

        :param block_id:    The Unique ID of the CFGNode.
        :param node:        The CFGNode instance to remove.
        :return:            None
        """
        if block_id in self._nodes:
            del self._nodes[block_id]

        if node.addr in self._nodes_by_addr and node in self._nodes_by_addr[node.addr]:
            self._nodes_by_addr[node.addr].remove(node)
            if not self._nodes_by_addr[node.addr]:
                del self._nodes_by_addr[node.addr]

        if isinstance(node.addr, int):
            pos = bisect.bisect_left(self._node_addrs, node.addr)
            if pos < len(self._node_addrs) and self._node_addrs[pos] == node.addr:
                self._node_addrs.pop(pos)
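
    # Re-indexing sketch (assumes `node` is a CFGNode already present in the model). Note that
    # add_node/remove_node only maintain the lookup tables; the graph itself must be updated
    # separately:
    #
    #     model.remove_node(node.block_id, node)
    #     model.graph.remove_node(node)
    #     model.add_node(node.block_id, node)   # keeps _node_addrs sorted via bisect
    #     model.graph.add_node(node)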

    #
    # CFG View
    #

    def get_node(self, block_id):
        """
        Get a single node from node key.

        :param BlockID block_id: Block ID of the node.
        :return: The CFGNode
        :rtype: CFGNode
        """
        if block_id in self._nodes:
            return self._nodes[block_id]
        return None

    def get_any_node(
        self, addr: int, is_syscall: Optional[bool] = None, anyaddr: bool = False, force_fastpath: bool = False
    ) -> Optional[CFGNode]:
        """
        Get an arbitrary CFGNode (without considering their contexts) from our graph.

        :param addr:            Address of the beginning of the basic block. Set anyaddr to True to support an
                                arbitrary address.
        :param is_syscall:      Whether you want to get the syscall node or any other node. This is because syscall
                                SimProcedures have the same address as the target they return to. None means get
                                either, True means get a syscall node, False means get something that isn't a
                                syscall node.
        :param anyaddr:         If anyaddr is True, then addr doesn't have to be the beginning address of a basic
                                block; the sorted list of node addresses is searched for a node that covers the given
                                address, which is slower than the exact-address lookup.
        :param force_fastpath:  If force_fastpath is True, it will only perform a dict lookup in the _nodes_by_addr
                                dict.
        :return:                A CFGNode if there is any that satisfies given conditions, or None otherwise.
        """
        # fastpath: directly look in the nodes list
        if not anyaddr or addr in self._nodes_by_addr:
            try:
                return self._nodes_by_addr[addr][0]
            except (KeyError, IndexError):
                pass

        if force_fastpath:
            return None

        if isinstance(addr, int):
            # slower path
            # find all potential addresses that the block may cover
            pos = bisect.bisect_left(self._node_addrs, max(addr - VEX_IRSB_MAX_SIZE, 0))
            is_cfgemulated = self.ident == "CFGEmulated"

            while pos < len(self._node_addrs):
                n = self._nodes_by_addr[self._node_addrs[pos]][0]
                actual_addr = n.addr if not self.is_arm else n.addr & 0xFFFF_FFFE
                if actual_addr > addr:
                    break

                if is_cfgemulated:
                    cond = n.looping_times == 0
                else:
                    cond = True
                if anyaddr and n.size is not None:
                    cond = cond and (addr == actual_addr or actual_addr <= addr < actual_addr + n.size)
                else:
                    cond = cond and (addr == actual_addr)
                if cond:
                    if is_syscall is None:
                        return n
                    if n.is_syscall == is_syscall:
                        return n

                pos += 1

        return None
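
    # Lookup examples (addresses are hypothetical):
    #
    #     n = model.get_any_node(0x401000)                        # block starting at this address
    #     n = model.get_any_node(0x401003, anyaddr=True)          # any block covering this address
    #     n = model.get_any_node(0x401000, force_fastpath=True)   # dict lookup only; may return None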

    def get_all_nodes(self, addr: int, is_syscall: Optional[bool] = None, anyaddr: bool = False) -> List[CFGNode]:
        """
        Get all CFGNodes whose address is the specified one.

        :param addr:       Address of the node.
        :param is_syscall: True returns the syscall node, False returns the normal CFGNode, None returns both.
        :param anyaddr:    If True, also return nodes whose address range covers the given address.
        :return:           All matching CFGNodes.
        """
        results = []

        for cfg_node in self.graph.nodes():
            if cfg_node.addr == addr or (
                anyaddr and cfg_node.size is not None and cfg_node.addr <= addr < (cfg_node.addr + cfg_node.size)
            ):
                if is_syscall is None or is_syscall == cfg_node.is_syscall:
                    results.append(cfg_node)

        return results

    def nodes(self):
        """
        An iterator of all nodes in the graph.

        :return: The iterator.
        :rtype: iterator
        """
        return self.graph.nodes()

    def get_predecessors(
        self, cfgnode: CFGNode, excluding_fakeret: bool = True, jumpkind: Optional[str] = None
    ) -> List[CFGNode]:
        """
        Get predecessors of a node in the control flow graph.

        :param cfgnode:             The node.
        :param excluding_fakeret:   True if you want to exclude all predecessors that are connected to the node with
                                    a fakeret edge.
        :param jumpkind:            Only return predecessors with the specified jumpkind. This argument will be
                                    ignored if set to None.
        :return:                    A list of predecessors.
        """
        if excluding_fakeret and jumpkind == "Ijk_FakeRet":
            return []

        if not excluding_fakeret and jumpkind is None:
            # fast path
            if cfgnode in self.graph:
                return list(self.graph.predecessors(cfgnode))
            return []

        predecessors = []
        for pred, _, data in self.graph.in_edges([cfgnode], data=True):
            jk = data["jumpkind"]
            if jumpkind is not None:
                if jk == jumpkind:
                    predecessors.append(pred)
            elif excluding_fakeret:
                if jk != "Ijk_FakeRet":
                    predecessors.append(pred)
            else:
                predecessors.append(pred)
        return predecessors

    def get_successors(
        self, node: CFGNode, excluding_fakeret: bool = True, jumpkind: Optional[str] = None
    ) -> List[CFGNode]:
        """
        Get successors of a node in the control flow graph.

        :param CFGNode node:                The node.
        :param bool excluding_fakeret:      True if you want to exclude all successors that are connected to the node
                                            with a fakeret edge.
        :param str or None jumpkind:        Only return successors with the specified jumpkind. This argument will be
                                            ignored if set to None.
        :return:                            A list of successors.
        :rtype:                             list
        """
        if jumpkind is not None:
            if excluding_fakeret and jumpkind == "Ijk_FakeRet":
                return []

        if not excluding_fakeret and jumpkind is None:
            # fast path
            if node in self.graph:
                return list(self.graph.successors(node))
            return []

        successors = []
        for _, suc, data in self.graph.out_edges([node], data=True):
            jk = data["jumpkind"]
            if jumpkind is not None:
                if jumpkind == jk:
                    successors.append(suc)
            elif excluding_fakeret:
                if jk != "Ijk_FakeRet":
                    successors.append(suc)
            else:
                successors.append(suc)
        return successors
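
    # Edge-filtering examples (assumes `node` is a CFGNode in this model):
    #
    #     call_targets = model.get_successors(node, jumpkind="Ijk_Call")
    #     succs_with_fallthrough = model.get_successors(node, excluding_fakeret=False)
    #     jump_preds = model.get_predecessors(node, jumpkind="Ijk_Boring")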

    def get_successors_and_jumpkinds(self, node, excluding_fakeret=True):
        """
        Get a list of tuples where the first element is the successor of the CFG node and the second element is the
        jumpkind of the successor.

        :param CFGNode node:            The node.
        :param bool excluding_fakeret:  True if you want to exclude all successors that are fall-through successors.
        :return:                        A list of successors and their corresponding jumpkinds.
        :rtype:                         list
        """
        successors = []
        for _, suc, data in self.graph.out_edges([node], data=True):
            if not excluding_fakeret or data["jumpkind"] != "Ijk_FakeRet":
                successors.append((suc, data["jumpkind"]))
        return successors

    get_successors_and_jumpkind = get_successors_and_jumpkinds

    def get_predecessors_and_jumpkinds(
        self, node: CFGNode, excluding_fakeret: bool = True
    ) -> List[Tuple[CFGNode, str]]:
        """
        Get a list of tuples where the first element is the predecessor of the CFG node and the second element is the
        jumpkind of the predecessor.

        :param node:                The node.
        :param excluding_fakeret:   True if you want to exclude all predecessors that are fall-through predecessors.
        :return:                    A list of predecessors and their corresponding jumpkinds.
        """
        predecessors = []
        for pred, _, data in self.graph.in_edges([node], data=True):
            if not excluding_fakeret or data["jumpkind"] != "Ijk_FakeRet":
                predecessors.append((pred, data["jumpkind"]))
        return predecessors

    get_predecessors_and_jumpkind = get_predecessors_and_jumpkinds

    def get_all_predecessors(self, cfgnode, depth_limit=None):
        """
        Get all predecessors of a specific node on the control flow graph.

        :param CFGNode cfgnode: The CFGNode object
        :param int depth_limit: Optional depth limit for the depth-first search
        :return:                A list of predecessors in the CFG
        :rtype:                 list
        """
        # use the reverse graph and query for successors (networkx.dfs_predecessors is misleading)
        # dfs_successors returns a dict of (node, [predecessors]). We ignore the keyset and use the values
        predecessors = set().union(*networkx.dfs_successors(self.graph.reverse(), cfgnode, depth_limit).values())
        return list(predecessors)

    def get_all_successors(self, cfgnode, depth_limit=None):
        """
        Get all successors of a specific node on the control flow graph.

        :param CFGNode cfgnode: The CFGNode object
        :param int depth_limit: Optional depth limit for the depth-first search
        :return:                A list of successors in the CFG
        :rtype:                 list
        """
        # dfs_successors returns a dict of (node, [successors]). We ignore the keyset and use the values
        successors = set().union(*networkx.dfs_successors(self.graph, cfgnode, depth_limit).values())
        return list(successors)
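
    # Reachability examples (assumes `node` is a CFGNode in this model):
    #
    #     reachable = model.get_all_successors(node)               # everything reachable from node
    #     nearby = model.get_all_successors(node, depth_limit=2)   # at most two edges away
    #     ancestors = model.get_all_predecessors(node)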

    def get_branching_nodes(self):
        """
        Returns all nodes that have an out-degree of 2 or more.
        """
        nodes = set()
        for n in self.graph.nodes():
            if self.graph.out_degree(n) >= 2:
                nodes.add(n)
        return nodes

    def get_exit_stmt_idx(self, src_block, dst_block):
        """
        Get the corresponding exit statement ID for control flow to reach destination block from source block. The
        exit statement ID was put on the edge when creating the CFG. Note that there must be a direct edge between
        the two blocks, otherwise an exception will be raised.

        :return: The exit statement ID
        """
        if not self.graph.has_edge(src_block, dst_block):
            raise AngrCFGError(f"Edge ({src_block}, {dst_block}) does not exist in CFG")

        return self.graph[src_block][dst_block]["stmt_idx"]
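
    # Example (assumes a direct edge src -> dst exists in the graph; the statement index was
    # stored on the edge when the CFG was built):
    #
    #     stmt_idx = model.get_exit_stmt_idx(src, dst)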

    #
    # Memory data
    #

    def add_memory_data(
        self, data_addr: int, data_type: Optional[MemoryDataSort], data_size: Optional[int] = None
    ) -> bool:
        """
        Add a MemoryData entry to self.memory_data.

        :param data_addr:   Address of the data
        :param data_type:   Type of the memory data
        :param data_size:   Size of the memory data, or None if unknown for now.
        :return:            True if a new memory data entry is added, False otherwise.
        """
        if data_addr not in self.memory_data:
            if data_type is not None and data_size is not None:
                data = MemoryData(data_addr, data_size, data_type, max_size=data_size)
            else:
                data = MemoryData(data_addr, 0, MemoryDataSort.Unknown)
            self.memory_data[data_addr] = data
            return True
        return False
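
    # Examples (hypothetical addresses and sizes):
    #
    #     model.add_memory_data(0x601000, MemoryDataSort.String, data_size=12)   # known type and size
    #     model.add_memory_data(0x601040, None)   # placeholder; refined later by tidy_data_references()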

    def tidy_data_references(
        self,
        memory_data_addrs: Optional[List[int]] = None,
        exec_mem_regions: Optional[List[Tuple[int, int]]] = None,
        xrefs: Optional["XRefManager"] = None,
        seg_list: Optional["SegmentList"] = None,
        data_type_guessing_handlers: Optional[List[Callable]] = None,
    ) -> bool:
        """
        Go through all data references (or the ones as specified by memory_data_addrs) and determine their sizes and
        types if possible.

        :param memory_data_addrs:           A list of addresses of memory data, or None if tidying all known memory
                                            data entries.
        :param exec_mem_regions:            A list of start and end addresses of executable memory regions.
        :param xrefs:                       The XRefManager that holds cross-references to memory data.
        :param seg_list:                    The segment list that CFGFast uses during CFG recovery.
        :param data_type_guessing_handlers: A list of Python functions that will guess data types. They will be
                                            called in sequence to determine data types for memory data whose type is
                                            unknown.
        :return:                            True if new data entries are found, False otherwise.
        """
        # Make sure all memory data entries cover all data sections
        keys = sorted(memory_data_addrs) if memory_data_addrs is not None else sorted(self.memory_data.keys())

        for i, data_addr in enumerate(keys):
            data = self.memory_data[data_addr]
            if exec_mem_regions and self._addr_in_exec_memory_regions(data.address, exec_mem_regions):
                # TODO: Handle data in code regions (or executable regions)
                pass
            else:
                if i + 1 != len(keys):
                    next_data_addr = keys[i + 1]
                else:
                    next_data_addr = None

                # goes until the end of the section/segment
                # TODO: the logic needs more testing
                sec = self.project.loader.find_section_containing(data_addr)
                if sec is None:
                    sec = self.project.loader.find_section_containing(data_addr - 1)
                next_sec_addr = None
                if sec is not None:
                    last_addr = sec.vaddr + sec.memsize
                else:
                    # it does not belong to any section. what's the next adjacent section? memory data does not go
                    # beyond section boundaries
                    next_sec = self.project.loader.find_section_next_to(data_addr)
                    if next_sec is not None:
                        next_sec_addr = next_sec.vaddr

                    seg = self.project.loader.find_segment_containing(data_addr)
                    if seg is None:
                        seg = self.project.loader.find_segment_containing(data_addr - 1)
                    if seg is not None:
                        last_addr = seg.vaddr + seg.memsize
                    else:
                        # We got an address that is not inside the current binary...
                        l.warning(
                            "tidy_data_references() sees an address %#08x that does not belong to any "
                            "section or segment.",
                            data_addr,
                        )
                        last_addr = None

                if next_data_addr is None:
                    boundary = last_addr
                elif last_addr is None:
                    boundary = next_data_addr
                else:
                    boundary = min(last_addr, next_data_addr)

                if next_sec_addr is not None:
                    boundary = min(boundary, next_sec_addr)

                if boundary is not None:
                    data.max_size = boundary - data_addr

                assert data.max_size is not None

        keys = sorted(self.memory_data.keys())

        new_data_found = False

        i = 0
        # pylint:disable=too-many-nested-blocks
        while i < len(keys):
            data_addr = keys[i]
            i += 1

            memory_data = self.memory_data[data_addr]

            if memory_data.sort == MemoryDataSort.SegmentBoundary:
                continue

            content_holder = []

            # let's see what sort of data it is
            if memory_data.sort in (MemoryDataSort.Unknown, MemoryDataSort.Unspecified) or (
                memory_data.sort == MemoryDataSort.Integer and memory_data.size == self.project.arch.bytes
            ):
                data_type, data_size = self._guess_data_type(
                    data_addr,
                    memory_data.max_size,
                    content_holder=content_holder,
                    xrefs=xrefs,
                    seg_list=seg_list,
                    data_type_guessing_handlers=data_type_guessing_handlers,
                )
            else:
                data_type, data_size = memory_data.sort, memory_data.size

            if data_type is not None:
                memory_data.size = data_size
                memory_data.sort = data_type

                if len(content_holder) == 1:
                    memory_data.content = content_holder[0]

                if memory_data.max_size is not None and (0 < memory_data.size < memory_data.max_size):
                    # Create another memory_data object to fill the gap
                    new_addr = data_addr + memory_data.size
                    new_md = MemoryData(new_addr, None, None, max_size=memory_data.max_size - memory_data.size)
                    self.memory_data[new_addr] = new_md
                    if xrefs is not None:
                        # Make a copy of all old references
                        old_crs = xrefs.get_xrefs_by_dst(data_addr)
                        crs = []
                        for old_cr in old_crs:
                            cr = old_cr.copy()
                            cr.memory_data = new_md
                            crs.append(cr)
                        xrefs.add_xrefs(crs)
                    keys.insert(i, new_addr)

                if data_type == MemoryDataSort.PointerArray:
                    # make sure all pointers are identified
                    pointer_size = self.project.arch.bytes
                    if xrefs is not None:
                        old_crs = xrefs.get_xrefs_by_dst(data_addr)
                    else:
                        old_crs = []

                    for j in range(0, data_size, pointer_size):
                        ptr = self.project.loader.fast_memory_load_pointer(data_addr + j)

                        # is this pointer coming from the current binary?
                        obj = self.project.loader.find_object_containing(ptr, membership_check=False)
                        if obj is not self.project.loader.main_object:
                            # the pointer does not come from current binary. skip.
                            continue

                        if seg_list is not None and seg_list.is_occupied(ptr):
                            sort = seg_list.occupied_by_sort(ptr)
                            if sort == "code":
                                continue
                            if sort == "pointer-array":
                                continue
                            # TODO: other types
                        if ptr not in self.memory_data:
                            new_md = MemoryData(ptr, 0, MemoryDataSort.Unknown, pointer_addr=data_addr + j)
                            self.memory_data[ptr] = new_md
                            if xrefs is not None:
                                # Make a copy of the old reference
                                crs = []
                                for old_cr in old_crs:
                                    cr = old_cr.copy()
                                    cr.memory_data = new_md
                                    crs.append(cr)
                                xrefs.add_xrefs(crs)
                            new_data_found = True

            else:
                memory_data.size = memory_data.max_size

            if seg_list is not None:
                seg_list.occupy(data_addr, memory_data.size, memory_data.sort)

        return new_data_found
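
    # Usage sketch: this method is normally driven by CFGFast after data references have been
    # collected; the executable region and segment list below are assumed to come from that
    # analysis rather than being constructed by hand:
    #
    #     changed = model.tidy_data_references(
    #         exec_mem_regions=[(0x400000, 0x401000)],   # hypothetical executable range
    #         xrefs=proj.kb.xrefs,
    #         seg_list=seg_list,                         # the SegmentList maintained by CFGFast
    #     )
    #     # `changed` is True if new MemoryData entries were discovered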

    def _guess_data_type(
        self,
        data_addr,
        max_size,
        content_holder=None,
        xrefs: Optional["XRefManager"] = None,
        seg_list: Optional["SegmentList"] = None,
        data_type_guessing_handlers: Optional[List[Callable]] = None,
        extra_memory_regions: Optional[List[Tuple[int, int]]] = None,
    ):
        """
        Make a guess at the data type.

        Users can provide their own data type guessing code when initializing a CFGFast instance, and each guessing
        handler will be called if this method fails to determine what the data is.

        :param int data_addr:   Address of the data.
        :param int max_size:    The maximum size this data entry can be.
        :return:                A tuple of (data type, size). (None, None) if we fail to determine the type or the
                                size.
        :rtype:                 tuple
        """
        if max_size is None:
            max_size = 0

        # quick check: if it's at the beginning of a binary, it might be the ELF header
        elfheader_sort, elfheader_size = self._guess_data_type_elfheader(data_addr, max_size)
        if elfheader_sort:
            return elfheader_sort, elfheader_size

        irsb_addr, stmt_idx = None, None
        if xrefs is not None and seg_list is not None:
            try:
                ref: "XRef" = next(iter(xrefs.get_xrefs_by_dst(data_addr)))
                irsb_addr = ref.block_addr
                stmt_idx = ref.stmt_idx
            except StopIteration:
                pass

            if seg_list.is_occupied(data_addr) and seg_list.occupied_by_sort(data_addr) == "code":
                # it's a code reference
                # TODO: Further check if it's the beginning of an instruction
                return MemoryDataSort.CodeReference, 0

        pointer_size = self.project.arch.bytes

        # who's using it?
        if irsb_addr is not None and isinstance(self.project.loader.main_object, cle.MetaELF):
            plt_entry = self.project.loader.main_object.reverse_plt.get(irsb_addr, None)
            if plt_entry is not None:
                # IRSB is owned by plt!
                return MemoryDataSort.GOTPLTEntry, pointer_size

        # is it in a section with zero bytes, like .bss?
        obj = self.project.loader.find_object_containing(data_addr)
        if obj is None:
            return None, None
        section = obj.find_section_containing(data_addr)
        if section is not None and section.only_contains_uninitialized_data:
            # Nothing much you can do
            return None, None

        r = self._guess_data_type_pointer_array(
            data_addr, pointer_size, max_size, extra_memory_regions=extra_memory_regions
        )
        if r is not None:
            return r

        try:
            data = self.project.loader.memory.load(data_addr, min(1024, max_size))
        except KeyError:
            data = b""

        # Is it a Unicode string?
        # TODO: Support unicode string longer than the max length
        if len(data) >= 4 and data[1] == 0 and data[2] != 0 and data[3] == 0 and data[0] in _PRINTABLES:

            def can_decode(n):
                try:
                    data[: n * 2].decode("utf_16_le")
                except UnicodeDecodeError:
                    return False
                return True

            if can_decode(4) or can_decode(5) or can_decode(6):
                running_failures = 0
                last_success = 4
                for i in range(4, len(data) // 2):
                    if can_decode(i):
                        last_success = i
                        running_failures = 0
                        if data[i * 2 - 2] == 0 and data[i * 2 - 1] == 0:
                            break
                    else:
                        running_failures += 1
                        if running_failures > 3:
                            break

                if content_holder is not None:
                    string_data = data[: last_success * 2]
                    if string_data.endswith(b"\x00\x00"):
                        string_data = string_data[:-2]
                    content_holder.append(string_data)
                return MemoryDataSort.UnicodeString, last_success

        if data:
            try:
                zero_pos = data.index(0)
            except ValueError:
                zero_pos = None
            if (zero_pos is not None and zero_pos > 0 and all(c in _PRINTABLES for c in data[:zero_pos])) or all(
                c in _PRINTABLES for c in data
            ):
                # it's a string
                # however, it may not be terminated
                string_data = data if zero_pos is None else data[:zero_pos]
                if content_holder is not None:
                    content_holder.append(string_data)
                string_len = len(string_data)
                if zero_pos:
                    string_len += 1
                return MemoryDataSort.String, min(string_len, 1024)

        if data_type_guessing_handlers:
            for handler in data_type_guessing_handlers:
                irsb = None if irsb_addr is None else self.get_any_node(irsb_addr).block.vex
                sort, size = handler(self, irsb, irsb_addr, stmt_idx, data_addr, max_size)
                if sort is not None:
                    return sort, size

        return None, None

    def _guess_data_type_pointer_array(
        self,
        data_addr: int,
        pointer_size: int,
        max_size: int,
        extra_memory_regions: Optional[List[Tuple[int, int]]] = None,
    ):
        pointers_count = 0

        max_pointer_array_size = min(512 * pointer_size, max_size)
        for i in range(0, max_pointer_array_size, pointer_size):
            ptr = self.project.loader.fast_memory_load_pointer(data_addr + i)

            if ptr is not None:
                # if self._seg_list.is_occupied(ptr) and self._seg_list.occupied_by_sort(ptr) == 'code':
                #     # it's a code reference
                #     # TODO: Further check if it's the beginning of an instruction
                #     pass
                if (
                    self.project.loader.find_section_containing(ptr) is not None
                    or self.project.loader.find_segment_containing(ptr) is not None
                    or (extra_memory_regions and next(((a < ptr < b) for (a, b) in extra_memory_regions), None))
                ):
                    # it's a pointer of some sort
                    # TODO: Determine what sort of pointer it is
                    pointers_count += 1
                else:
                    break

        if pointers_count:
            return MemoryDataSort.PointerArray, pointer_size * pointers_count

        return None

    def _guess_data_type_elfheader(self, data_addr, max_size):
        """
        Is the specified data chunk an ELF header?

        :param int data_addr:   Address of the data chunk
        :param int max_size:    Size of the data chunk.
        :return:                A tuple of ('elf-header', size) if it is, or (None, None) if it is not.
        :rtype:                 tuple
        """
        obj = self.project.loader.find_object_containing(data_addr)
        if obj is None:
            # it's not mapped
            return None, None

        if data_addr == obj.min_addr and 4 < max_size < 1000:
            # Does it start with the ELF magic bytes?
            try:
                data = self.project.loader.memory.load(data_addr, 4)
            except KeyError:
                return None, None
            if data == b"\x7fELF":
                # yes!
                return MemoryDataSort.ELFHeader, max_size

        return None, None

    #
    # Util methods
    #

    def _addr_in_exec_memory_regions(self, addr: int, exec_mem_regions: List[Tuple[int, int]]) -> bool:
        for start, end in exec_mem_regions:
            if start <= addr < end:
                return True
        return False