Source code for angr.analyses.decompiler.optimization_passes.optimization_pass

# pylint:disable=unused-argument
from typing import Optional, Dict, Set, Tuple, Generator, TYPE_CHECKING
from enum import Enum

import networkx  # pylint:disable=unused-import

import ailment


if TYPE_CHECKING:
    from angr.knowledge_plugins.functions import Function


[docs]class MultipleBlocksException(Exception): """ An exception that is raised in _get_block() where multiple blocks satisfy the criteria but only one block was requested. """ pass
[docs]class OptimizationPassStage(Enum): """ Enums about optimization pass stages. Note that the region identification pass (RegionIdentifier) may modify existing AIL blocks *without updating the topology of the original AIL graph*. For example, loop successor refinement may modify create a new AIL block with an artificial address, and alter existing jump targets of jump statements and conditional jump statements to point to this new block. However, loop successor refinement does not update the topology of the original AIL graph, which means this new AIL block does not exist in the original AIL graph. As a result, until this behavior of RegionIdentifier changes in the future, DURING_REGION_IDENTIFICATION optimization passes should not modify existing jump targets. """ AFTER_AIL_GRAPH_CREATION = 0 AFTER_SINGLE_BLOCK_SIMPLIFICATION = 1 AFTER_GLOBAL_SIMPLIFICATION = 2 AFTER_VARIABLE_RECOVERY = 3 BEFORE_REGION_IDENTIFICATION = 4 DURING_REGION_IDENTIFICATION = 5 AFTER_STRUCTURING = 6
[docs]class BaseOptimizationPass: """ The base class for any optimization pass. """ ARCHES = [] # strings of supported architectures PLATFORMS = [] # strings of supported platforms. Can be one of the following: "win32", "linux" STAGE: int = None # Specifies when this optimization pass should be executed STRUCTURING: Optional[ str ] = None # specifies if this optimization pass is specific to a certain structuring algorithm
[docs] def __init__(self, func): self._func: "Function" = func
@property def project(self): return self._func.project @property def kb(self): return self.project.kb
[docs] def analyze(self): ret, cache = self._check() if ret: self._analyze(cache=cache)
def _check(self): """ Check if this optimization applies to this function. :returns: a tuple of (does_apply, cache) where cache is a way to pass information to _analyze so it does not have to be recalculated """ raise NotImplementedError() def _analyze(self, cache=None): """ Run the analysis. :param cache: information passed from _check so it does not have to be recalculated :returns: None """ raise NotImplementedError()
[docs]class OptimizationPass(BaseOptimizationPass): """ The base class for any function-level graph optimization pass. """
[docs] def __init__( self, func, blocks_by_addr=None, blocks_by_addr_and_idx=None, graph=None, variable_kb=None, region_identifier=None, reaching_definitions=None, **kwargs, ): super().__init__(func) # self._blocks is just a cache self._blocks_by_addr: Dict[int, Set[ailment.Block]] = blocks_by_addr self._blocks_by_addr_and_idx: Dict[Tuple[int, Optional[int]], ailment.Block] = blocks_by_addr_and_idx self._graph: Optional[networkx.DiGraph] = graph self._variable_kb = variable_kb self._ri = region_identifier self._rd = reaching_definitions # output self.out_graph: Optional[networkx.DiGraph] = None
@property def blocks_by_addr(self) -> Dict[int, Set[ailment.Block]]: return self._blocks_by_addr @property def blocks_by_addr_and_idx(self) -> Dict[Tuple[int, Optional[int]], ailment.Block]: return self._blocks_by_addr_and_idx # # Util methods # def _get_block(self, addr, idx=None) -> Optional[ailment.Block]: if not self._blocks_by_addr: return None else: if idx is None: blocks = self._blocks_by_addr.get(addr, None) else: blocks = [self._blocks_by_addr_and_idx.get((addr, idx), None)] if not blocks: return None if len(blocks) == 1: return next(iter(blocks)) raise MultipleBlocksException( "There are %d blocks at address %#x.%s but only one is requested." % (len(blocks), addr, idx) ) def _get_blocks(self, addr, idx=None) -> Generator[ailment.Block, None, None]: if not self._blocks_by_addr: return else: if idx is None: blocks = self._blocks_by_addr.get(addr, None) if blocks is not None: yield from blocks else: block = self._blocks_by_addr_and_idx.get((addr, idx), None) if block is not None: yield block def _update_block(self, old_block, new_block): if self.out_graph is None: self.out_graph = self._graph # we do not make copy here for performance reasons. we can change it if needed if old_block not in self.out_graph: return in_edges = list(self.out_graph.in_edges(old_block, data=True)) out_edges = list(self.out_graph.out_edges(old_block, data=True)) self._remove_block(old_block) self.out_graph.add_node(new_block) self._blocks_by_addr[new_block.addr].add(new_block) self._blocks_by_addr_and_idx[(new_block.addr, new_block.idx)] = new_block for src, _, data in in_edges: if src is old_block: src = new_block self.out_graph.add_edge(src, new_block, **data) for _, dst, data in out_edges: if dst is old_block: dst = new_block self.out_graph.add_edge(new_block, dst, **data) def _remove_block(self, block): if self.out_graph is None: self.out_graph = self._graph if block in self.out_graph: self.out_graph.remove_node(block) if block.addr in self._blocks_by_addr and block in self._blocks_by_addr[block.addr]: self._blocks_by_addr[block.addr].remove(block) del self._blocks_by_addr_and_idx[(block.addr, block.idx)] @staticmethod def _is_add(expr): return isinstance(expr, ailment.Expr.BinaryOp) and expr.op == "Add" @staticmethod def _is_sub(expr): return isinstance(expr, ailment.Expr.BinaryOp) and expr.op == "Sub"
[docs]class SequenceOptimizationPass(BaseOptimizationPass): """ The base class for any sequence node optimization pass. """ ARCHES = [] # strings of supported architectures PLATFORMS = [] # strings of supported platforms. Can be one of the following: "win32", "linux" STAGE: int = None # Specifies when this optimization pass should be executed
[docs] def __init__(self, func, seq=None, **kwargs): super().__init__(func) self.seq = seq self.out_seq = None