# pylint:disable=missing-class-docstring,no-self-use
import logging
from angr.analyses import ForwardAnalysis, visitors
from ..block import SootBlockNode
from ..errors import AngrLoopAnalysisError
from . import register_analysis
from .analysis import Analysis
l = logging.getLogger(name=__name__)
[docs]class VariableTypes:
Iterator = "Iterator"
HasNext = "HasNext"
Next = "Next"
[docs]class AnnotatedVariable:
__slots__ = ["variable", "type"]
[docs] def __init__(self, variable, type_):
self.variable = variable
self.type = type_
def __repr__(self):
return f"{self.variable}//{self.type}"
def __eq__(self, other):
return type(other) is AnnotatedVariable and other.type == self.type and other.variable == self.variable
[docs]class Condition:
Equal = "=="
NotEqual = "!="
[docs] def __init__(self, op, val0, val1):
self.op = op
self.val0 = val0
self.val1 = val1
def __repr__(self):
return f"{self.val0} {self.op} {self.val1}"
[docs] @classmethod
def from_opstr(cls, opstr):
mapping = {
"eq": cls.Equal,
"==": cls.Equal,
"ne": cls.NotEqual,
"!=": cls.NotEqual,
}
return mapping.get(opstr, None)
[docs]class SootBlockProcessor:
[docs] def __init__(self, state, block, loop, defuse):
self.state = state
self.block = block
self.loop = loop
self.defuse = defuse
[docs] def process(self):
if not isinstance(self.block, SootBlockNode):
raise AngrLoopAnalysisError("Got an unexpected type of block %s." % type(self.block))
if not self.block.stmts:
return None
for stmt in self.block.stmts:
func_name = "_handle_%s" % (stmt.__class__.__name__)
if hasattr(self, func_name):
getattr(self, func_name)(stmt)
return self.state
def _stmt_inside_loop(self, stmt_idx):
"""
Test whether a statement is inside the loop body or not.
:param stmt_idx:
:return:
"""
# TODO: This is slow. Fix the performance issue
for node in self.loop.body_nodes:
if node.addr.stmt_idx <= stmt_idx < node.addr.stmt_idx + node.size:
return True
return False
def _expr(self, expr):
func_name = "_handle_%s" % (expr.__class__.__name__)
if hasattr(self, func_name):
return getattr(self, func_name)(expr)
return expr
#
# Statement handlers
#
def _handle_AssignStmt(self, stmt):
left_op, right_op = stmt.left_op, stmt.right_op
expr = self._expr(right_op)
if expr is not None:
try:
from pysoot.sootir.soot_value import SootLocal
except ImportError:
l.error("Please install PySoot before analyzing Java byte code.")
raise
if isinstance(left_op, SootLocal):
# Log a def for the local variable
self.state.locals[left_op.name] = expr
def _handle_IfStmt(self, stmt):
target = stmt.target
# is it jumping outside the loop?
if not self._stmt_inside_loop(target):
cond = Condition(
Condition.from_opstr(stmt.condition.op),
self._expr(stmt.condition.value1),
self._expr(stmt.condition.value2),
)
self.state.add_loop_exit_stmt(stmt.label, condition=cond)
#
# Expression handlers
#
def _handle_SootLocal(self, expr):
local_name = expr.name
# First try state.locals
try:
return self.state.locals[local_name]
except KeyError:
pass
# Then try defuse graph
try:
def_ = self.defuse.defs[local_name]
expr = def_.evaluated
except KeyError:
pass
return expr
def _handle_SootIntConstant(self, expr):
return expr.value
def _handle_SootInterfaceInvokeExpr(self, expr):
full_method = expr.class_name + "." + expr.method_name
mapping = {
"java.util.Set.iterator": VariableTypes.Iterator,
"java.util.Iterator.hasNext": VariableTypes.HasNext,
"java.util.Iterator.next": VariableTypes.Next,
}
base_var = self._expr(expr.base)
if base_var is None:
return None
# Try to annotate the variable when applicable
try:
try:
from pysoot.sootir.soot_value import SootValue
except ImportError:
l.error("Please install PySoot before analyzing Java byte code.")
raise
var_type = mapping[full_method]
if isinstance(base_var, (SootValue, AnnotatedVariable)):
annotated_var = AnnotatedVariable(base_var, var_type)
return annotated_var
except KeyError:
pass
return None
[docs]class LoopAnalysisState:
[docs] def __init__(self, block):
self.block = block
self.induction_variables = {}
self.locals = {}
self.loop_exit_stmts = set()
def __repr__(self):
return "<LoopAnalysisState %s>" % self.block.addr
[docs] def copy(self):
state = LoopAnalysisState(block=self.block)
state.induction_variables = self.induction_variables.copy()
state.locals = self.locals.copy()
state.loop_exit_stmts = self.loop_exit_stmts.copy()
return state
[docs] def merge(self, state):
s = self.copy()
# TODO: Induction variables
s.locals.update(state.locals)
s.loop_exit_stmts |= state.loop_exit_stmts
return s
[docs] def add_loop_exit_stmt(self, stmt_idx, condition=None):
self.loop_exit_stmts.add((condition, stmt_idx))
[docs]class LoopAnalysis(ForwardAnalysis, Analysis):
"""
Analyze a loop and recover important information about the loop (e.g., invariants, induction variables) in a static
manner.
"""
[docs] def __init__(self, loop, defuse):
visitor = visitors.LoopVisitor(loop)
ForwardAnalysis.__init__(self, order_jobs=True, allow_merging=True, allow_widening=False, graph_visitor=visitor)
self.loop = loop
self.defuse = defuse
self._traversed = set()
self._last_state = None
self.loop_exit_stmts = None
self.locals = None
self.bounded = None # Whether this loop is bounded
self._analyze()
#
# Main analysis routines
#
def _pre_analysis(self):
pass
def _pre_job_handling(self, job):
pass
def _initial_abstract_state(self, node):
state = LoopAnalysisState(node)
return state
def _merge_states(self, node, *states):
merged = states[0]
for other in states[1:]:
if other is not None:
merged = merged.merge(other)
return merged
def _run_on_node(self, node, state):
if node in self._traversed:
return False, state
self._traversed.add(node)
state = state.copy()
processor = SootBlockProcessor(state, node, self.loop, self.defuse)
state = processor.process()
self._last_state = state
return True, state
def _intra_analysis(self):
pass
def _post_analysis(self):
self.loop_exit_stmts = self._last_state.loop_exit_stmts
self.locals = self._last_state.locals
self._last_state = None
# Is it bounded?
self.bounded = self._is_bounded()
def _is_bounded(self):
"""
Checks whether this loop is bounded. We basically does a bunch of pattern matching.
:return: True if this loop is bounded, False is this loop is not bounded, None otherwise (undetermined).
:rtype: bool or None
"""
b = self._is_bounded_iterator_based()
if b is not None:
return b
# Cannot determine
return None
def _is_bounded_iterator_based(self):
"""
Iterator based check.
With respect to a certain variable/value A,
- there must be at least one exit condition being A//Iterator//HasNext == 0
- there must be at least one local that ticks the iterator next: A//Iterator//Next
"""
# Condition 0
def check_0(cond):
return (
isinstance(cond, Condition)
and cond.op == Condition.Equal
and cond.val1 == 0
and isinstance(cond.val0, AnnotatedVariable)
and cond.val0.type == VariableTypes.HasNext
)
check_0_results = [(check_0(stmt[0]), stmt[0]) for stmt in self.loop_exit_stmts]
check_0_conds = [cond for r, cond in check_0_results if r] # remove all False ones
if not check_0_conds:
return None
the_iterator = check_0_conds[0].val0.variable
# Condition 1
def check_1(local):
return (
isinstance(local, AnnotatedVariable)
and local.type == VariableTypes.Next
and local.variable == the_iterator
)
if not any([check_1(local) for local in self.locals.values()]):
return None
return True
register_analysis(LoopAnalysis, "LoopAnalysis")