Source code for angr.knowledge_plugins.functions.function_parser

# pylint:disable=no-member,raise-missing-from
import logging
import pickle

from collections import defaultdict

from ...codenode import BlockNode, HookNode
from ...utils.enums_conv import func_edge_type_to_pb, func_edge_type_from_pb
from ...protos import primitives_pb2, function_pb2

l = logging.getLogger(name=__name__)


class FunctionParser:
    """
    The implementation of the serialization methods for the <Function> class.
    """

    @staticmethod
    def serialize(function):
        """
        Convert a Function into its protobuf message.

        Scalar attributes are copied one by one, every block is serialized through
        ``serialize_to_cmessage()``, and every edge of ``function.transition_graph`` becomes an ``Edge`` message.
        Addresses of edge endpoints that are not in ``function.block_addrs_set`` are stored in
        ``external_functions``.

        :param function:    The Function instance to serialize.
        :return:            The populated message obtained from ``Function._get_cmsg()``.
        :raises ValueError: If ``function.from_signature`` is set to anything other than "flirt".
        """
        # delayed import
        from .function import Function  # pylint:disable=import-outside-toplevel

        obj = Function._get_cmsg()
        obj.ea = function.addr
        obj.is_entrypoint = False  # TODO: Set this up accordingly
        obj.name = function.name
        obj.is_plt = function.is_plt
        obj.is_syscall = function.is_syscall
        obj.is_simprocedure = function.is_simprocedure
        obj.returning = function.returning
        obj.alignment = function.alignment
        obj.binary_name = function.binary_name
        obj.normalized = function.normalized

        # signature matched?
        if not function.from_signature:
            obj.matched_from = function_pb2.Function.UNMATCHED
        elif function.from_signature == "flirt":
            obj.matched_from = function_pb2.Function.FLIRT
        else:
            raise ValueError(
                f"Cannot convert from_signature {function.from_signature} into a SignatureSource " f"enum."
            )

        # blocks
        blocks_list = [b.serialize_to_cmessage() for b in function.blocks]
        obj.blocks.extend(blocks_list)  # pylint:disable=no-member

        block_addrs_set = function.block_addrs_set
        # graph
        edges = []
        external_addrs = set()
        TRANSITION_JK = func_edge_type_to_pb("transition")  # default edge type
        for src, dst, data in function.transition_graph.edges(data=True):
            edge = primitives_pb2.Edge()
            edge.src_ea = src.addr
            edge.dst_ea = dst.addr
            if src.addr not in block_addrs_set:
                # this is a Block in another function, or just another Function instance.
                external_addrs.add(src.addr)
            if dst.addr not in block_addrs_set:
                external_addrs.add(dst.addr)
            # used unless the edge data carries an explicit "type"
            edge.jumpkind = TRANSITION_JK
            for key, value in data.items():
                if key == "type":
                    edge.jumpkind = func_edge_type_to_pb(value)
                elif key == "ins_addr":
                    if value is not None:
                        edge.ins_addr = value
                elif key == "stmt_idx":
                    if value is not None:
                        edge.stmt_idx = value
                elif key == "outside":
                    edge.is_outside = value
                else:
                    # any other attribute is stored as an opaque pickled blob
                    edge.data[key] = pickle.dumps(value)  # pylint:disable=no-member
            edges.append(edge)
        obj.graph.edges.extend(edges)  # pylint:disable=no-member
        # referenced functions
        obj.external_functions.extend(external_addrs)  # pylint:disable=no-member

        return obj

    @staticmethod
    def parse_from_cmsg(cmsg, function_manager=None, project=None, all_func_addrs=None):
        """
        Rebuild a Function from its protobuf message.

        :param cmsg:                The data to instantiate the <Function> from.
        :param function_manager:    Passed to the Function constructor and used to get or create the functions
                                    that edges refer to. May be None.
        :param project:             Stored on the new Function as ``_project``; also used to detect hooked
                                    addresses and to lift blocks that are outside this function. May be None.
        :param all_func_addrs:      Optional container of addresses; an address found in it is treated as the
                                    address of a function.
        :return:                    The reconstructed Function.
        :raises ValueError:         If ``cmsg.matched_from`` is not a known enum value, or if an edge endpoint is
                                    neither a local block nor listed in ``cmsg.external_functions``.
        :raises KeyError:           If resolving an edge source or destination raises a KeyError; it is re-raised
                                    with the offending address in the message.
        """
        # delayed import
        from .function import Function  # pylint:disable=import-outside-toplevel

        obj = Function(
            function_manager,
            cmsg.ea,
            name=cmsg.name,
            is_plt=cmsg.is_plt,
            syscall=cmsg.is_syscall,
            is_simprocedure=cmsg.is_simprocedure,
            returning=cmsg.returning,
            alignment=cmsg.alignment,
            binary_name=cmsg.binary_name,
        )
        obj._project = project
        obj.normalized = cmsg.normalized

        # signature matched?
        if cmsg.matched_from == function_pb2.Function.UNMATCHED:
            obj.from_signature = None
        elif cmsg.matched_from == function_pb2.Function.FLIRT:
            obj.from_signature = "flirt"
        else:
            raise ValueError(f"Cannot convert SignatureSource enum {cmsg.matched_from} to Function.from_signature.")

        # blocks
        blocks = {}
        for b in cmsg.blocks:
            if project is not None and project.is_hooked(b.ea):
                # create a HookNode
                block = HookNode(b.ea, 0, project.hooked_by(b.ea))
            else:
                block = BlockNode(b.ea, b.size, bytestr=b.bytes)
            blocks[block.addr] = block
        # addresses of referenced blocks or nodes that are not inside the current function
        external_addrs = set(cmsg.external_functions)

        # edges
        edges = {}
        # fake-return edges are kept aside, grouped by source address, and attached to their call edge later
        fake_return_edges = defaultdict(list)
        for edge_cmsg in cmsg.graph.edges:
            try:
                src = FunctionParser._get_block_or_func(
                    edge_cmsg.src_ea,
                    blocks,
                    external_addrs,
                    function_manager,
                    project,
                    all_func_addrs=all_func_addrs,
                )
            except KeyError as ex:
                raise KeyError("Address of the edge source %#x is not found." % edge_cmsg.src_ea) from ex

            edge_type = func_edge_type_from_pb(edge_cmsg.jumpkind)
            assert edge_type is not None

            dst = None
            dst_addr = edge_cmsg.dst_ea
            if dst_addr not in blocks:
                if edge_type == "call" or (  # call has to go to either a HookNode or a function
                    all_func_addrs is not None and dst_addr in all_func_addrs
                ):  # jumps to another function
                    if function_manager is not None:
                        # get a function
                        dst = FunctionParser._get_func(dst_addr, function_manager)
                    else:
                        l.warning(
                            "About to get or create a function at %#x, but function_manager is not provided. "
                            "Will create a block instead.",
                            dst_addr,
                        )

            if dst is None:
                # create a block instead
                try:
                    dst = FunctionParser._get_block_or_func(
                        dst_addr,
                        blocks,
                        external_addrs,
                        function_manager,
                        project,
                        all_func_addrs=all_func_addrs,
                    )
                except KeyError as ex:
                    raise KeyError("Address of the edge destination %#x is not found." % edge_cmsg.dst_ea) from ex

            # NOTE(security): pickle.loads executes arbitrary code embedded in the payload; only parse messages
            # that come from a trusted source.
            data = {k: pickle.loads(v) for k, v in edge_cmsg.data.items()}
            data["outside"] = edge_cmsg.is_outside
            data["ins_addr"] = edge_cmsg.ins_addr
            data["stmt_idx"] = edge_cmsg.stmt_idx
            if edge_type == "fake_return":
                fake_return_edges[edge_cmsg.src_ea].append((src, dst, data))
            else:
                edges[(edge_cmsg.src_ea, dst_addr, edge_type)] = (src, dst, data)

        added_nodes = set()
        # "fake_return" edges never appear in `edges`; they are consumed from fake_return_edges by the call
        # edge that shares their source address.
        for (src_addr, dst_addr, edge_type), (src, dst, data) in edges.items():
            outside = data.get("outside", False)
            ins_addr = data.get("ins_addr", None)
            stmt_idx = data.get("stmt_idx", None)
            added_nodes.add(src)
            added_nodes.add(dst)
            if edge_type in ("transition", "exception"):
                obj._transit_to(
                    src,
                    dst,
                    outside=outside,
                    ins_addr=ins_addr,
                    stmt_idx=stmt_idx,
                    is_exception=edge_type == "exception",
                )
            elif edge_type == "call":
                # find the corresponding fake_ret edge: the one that lands right after the calling block
                fake_ret_edge = next(
                    (edge_ for edge_ in fake_return_edges.get(src_addr, ()) if edge_[1].addr == src.addr + src.size),
                    None,
                )
                if dst is None:
                    l.warning(
                        "The destination function %#x does not exist, and it cannot be created since function "
                        "manager is not provided. Please consider passing in a function manager to rebuild this "
                        "graph.",
                        dst_addr,
                    )
                else:
                    if isinstance(dst, Function):
                        obj._call_to(
                            src,
                            dst,
                            None if fake_ret_edge is None else fake_ret_edge[1],
                            stmt_idx=stmt_idx,
                            ins_addr=ins_addr,
                            return_to_outside=fake_ret_edge is None,
                        )
                    if fake_ret_edge is not None:
                        fakeret_src, fakeret_dst, fakeret_data = fake_ret_edge
                        added_nodes.add(fakeret_dst)
                        obj._fakeret_to(
                            fakeret_src,
                            fakeret_dst,
                            confirmed=fakeret_data.get("confirmed"),
                            to_outside=fakeret_data.get("outside", None),
                        )

        # add leftover blocks
        for block in blocks.values():
            if block not in added_nodes:
                obj._register_nodes(True, block)

        return obj

    @staticmethod
    def _get_block_or_func(addr, blocks, external_addrs, function_manager, project, all_func_addrs=None):
        """
        Resolve an address to the node that represents it in a function graph.

        Lookup order: a local block from ``blocks``; otherwise, if the address is in ``external_addrs``, a
        HookNode when the project hooks it, a function from ``function_manager`` when the address is in
        ``all_func_addrs``, or a new BlockNode as the last resort.

        :param addr:                The address to resolve.
        :param blocks:              Mapping from address to the nodes that belong to the current function.
        :param external_addrs:      Addresses referenced by the current function but located outside of it.
        :param function_manager:    Used to get or create functions. When it is None, a block node is created
                                    even for addresses in ``all_func_addrs``, which is the fallback that
                                    parse_from_cmsg announces in its warning.
        :param project:             Used to query hooks and to lift the block at ``addr``. May be None, in which
                                    case a dummy one-byte block is created.
        :param all_func_addrs:      Optional container of addresses that are treated as function addresses.
        :return:                    A node from ``blocks``, a HookNode, a function, or a BlockNode.
        :raises ValueError:         If ``addr`` is neither in ``blocks`` nor in ``external_addrs``.
        """
        # should we get a block or a function?
        try:
            # it's a block. just return it
            return blocks[addr]
        except KeyError:
            pass

        if addr not in external_addrs:
            raise ValueError(
                "Unsupported case: The block %#x is not in external_addrs and is not in local blocks. "
                "This probably indicates a bug in angrdb generation." % addr
            )

        if project is not None and project.is_hooked(addr):
            # get a hook node instead
            return HookNode(addr, 0, project.hooked_by(addr))

        if function_manager is not None and all_func_addrs is not None and addr in all_func_addrs:
            # get a function (which is yet to be created in the function manager)
            return function_manager.function(addr=addr, create=True)

        # create a block
        # TODO: We are deciding the size by re-lifting the block from project. This is usually fine except for
        # TODO: the cases where the block does not exist in project (e.g., when the block was dynamically
        # TODO: created). The correct solution is to store the size and bytes of the block, too.
        if project is not None:
            block = project.factory.block(addr)
            block_size = block.size
            bytestr = block.bytes
        else:
            l.warning("The Project instance is not specified. Use a dummy block size of 1 byte for block %#x.", addr)
            block_size = 1
            bytestr = b"\x00"
        return BlockNode(addr, block_size, bytestr=bytestr)

    @staticmethod
    def _get_func(addr, function_manager):
        """
        Get the function at ``addr`` from the function manager, asking the manager to create it if needed.

        :param addr:                Address of the function.
        :param function_manager:    The function manager to query; must not be None.
        :return:                    Whatever ``function_manager.function(addr=addr, create=True)`` returns.
        """
        return function_manager.function(addr=addr, create=True)