import os
import binascii
import logging
from .... import concretization_strategies
from ....errors import SimUnsatError, SimMemoryAddressError
from ....engines.soot.values import (
SimSootValue_ArrayRef,
SimSootValue_ArrayBaseRef,
SimSootValue_InstanceFieldRef,
SimSootValue_Local,
SimSootValue_ParamRef,
SimSootValue_StaticFieldRef,
SimSootValue_StringRef,
)
from .. import MemoryMixin
l = logging.getLogger(name=__name__)
[docs]class JavaVmMemoryMixin(MemoryMixin):
def __init__(
self,
memory_id="mem",
stack=None,
heap=None,
vm_static_table=None,
load_strategies=None,
store_strategies=None,
max_array_size=1000,
**kwargs,
):
super().__init__(memory_id=memory_id, **kwargs)
self._stack = [] if stack is None else stack
# delayed import
from .. import KeyValueMemory
self.heap = KeyValueMemory("mem") if heap is None else heap
self.vm_static_table = KeyValueMemory("mem") if vm_static_table is None else vm_static_table
# Heap helper
# TODO: ask someone how we want to manage this
# TODO: Manage out of memory allocation
# self._heap_allocation_id = 0
self.max_array_size = max_array_size
# concretizing strategies
self.load_strategies = load_strategies if load_strategies else []
self.store_strategies = store_strategies if store_strategies else []
[docs] @staticmethod
def get_new_uuid():
"""
Generate a unique id within the scope of the JavaVM memory. This, for
example, is used for distinguishing memory objects of the same type
(e.g. multiple instances of the same class).
"""
# self._heap_allocation_id += 1
# return str(self._heap_allocation_id)
return binascii.hexlify(os.urandom(4))
[docs] def store(self, addr, data, frame=0): # pylint: disable=arguments-differ
if type(addr) is SimSootValue_Local:
cstack = self._stack[-1 + (-1 * frame)]
cstack.store(addr.id, data, type_=addr.type)
elif type(addr) is SimSootValue_ParamRef:
cstack = self._stack[-1 + (-1 * frame)]
cstack.store(addr.id, data, type_=addr.type)
elif type(addr) is SimSootValue_ArrayRef:
self.store_array_element(addr.base, addr.index, data)
elif type(addr) is SimSootValue_StaticFieldRef:
self.vm_static_table.store(addr.id, data, type_=addr.type)
elif type(addr) is SimSootValue_InstanceFieldRef:
self.heap.store(addr.id, data, type_=addr.type)
elif type(addr) is SimSootValue_StringRef:
self.heap.store(addr.id, data, type_=addr.type)
else:
l.error("Unknown addr type %s", addr)
[docs] def load(self, addr, frame=0, none_if_missing=False): # pylint: disable=arguments-differ
if type(addr) is SimSootValue_Local:
cstack = self._stack[-1 + (-1 * frame)]
return cstack.load(addr.id, none_if_missing=none_if_missing)
elif type(addr) is SimSootValue_ArrayRef:
return self.load_array_element(addr.base, addr.index)
elif type(addr) is SimSootValue_ParamRef:
cstack = self._stack[-1 + (-1 * frame)]
return cstack.load(addr.id, none_if_missing=none_if_missing)
elif type(addr) is SimSootValue_StaticFieldRef:
value = self.vm_static_table.load(addr.id, none_if_missing=none_if_missing)
if value is None:
# initialize field
value = self.state.project.simos.get_default_value_by_type(addr.type, state=self.state)
l.debug("Initializing static field %s with %s.", addr, value)
self.store(addr, value)
return value
elif type(addr) is SimSootValue_InstanceFieldRef:
value = self.heap.load(addr.id, none_if_missing=none_if_missing)
if value is None:
# initialize field
value = self.state.project.simos.get_default_value_by_type(addr.type, state=self.state)
l.debug("Initializing field %s with %s.", addr, value)
self.store(addr, value)
return value
elif type(addr) is SimSootValue_StringRef:
return self.heap.load(addr.id, none_if_missing=none_if_missing)
else:
l.error("Unknown addr type %s", addr)
return None
[docs] def push_stack_frame(self):
from .. import KeyValueMemory
self._stack.append(KeyValueMemory("mem"))
[docs] def pop_stack_frame(self):
self._stack = self._stack[:-1]
@property
def stack(self):
return self._stack[-1]
#
# Array // Store
#
[docs] def store_array_element(self, array, idx, value):
self.store_array_elements(array, idx, value)
[docs] def store_array_elements(self, array, start_idx, data):
"""
Stores either a single element or a range of elements in the array.
:param array: Reference to the array.
:param start_idx: Starting index for the store.
:param data: Either a single value or a list of values.
"""
# we process data as a list of elements
# => if there is only a single element, wrap it in a list
data = data if isinstance(data, list) else [data]
# concretize start index
concrete_start_idxes = self.concretize_store_idx(start_idx)
if len(concrete_start_idxes) == 1:
# only one start index
# => concrete store
concrete_start_idx = concrete_start_idxes[0]
for i, value in enumerate(data):
self._store_array_element_on_heap(
array=array, idx=concrete_start_idx + i, value=value, value_type=array.element_type
)
# if the index was symbolic before concretization, this
# constraint it to concrete start idx
self.state.add_constraints(concrete_start_idx == start_idx)
else:
# multiple start indexes
# => symbolic store
start_idx_options = []
for concrete_start_idx in concrete_start_idxes:
start_idx_options.append(concrete_start_idx == start_idx)
# we store elements condtioned with the start index:
# => if concrete_start_idx == start_idx
# then store the value
# else keep the current value
for i, value in enumerate(data):
self._store_array_element_on_heap(
array=array,
idx=concrete_start_idx + i,
value=value,
value_type=array.element_type,
store_condition=start_idx_options[-1],
)
# constraint start_idx, s.t. it evals to one of the concretized indexes
constraint_on_start_idx = self.state.solver.Or(*start_idx_options)
self.state.add_constraints(constraint_on_start_idx)
def _store_array_element_on_heap(self, array, idx, value, value_type, store_condition=None):
heap_elem_id = "%s[%d]" % (array.id, idx)
l.debug("Set %s to %s with condition %s", heap_elem_id, value, store_condition)
if store_condition is not None:
current_value = self._load_array_element_from_heap(array, idx)
new_value = value
value = self.state.solver.If(store_condition, new_value, current_value)
self.heap.store(heap_elem_id, value, value_type)
#
# Array // Load
#
[docs] def load_array_element(self, array, idx):
return self.load_array_elements(array, idx, 1)[0]
[docs] def load_array_elements(self, array, start_idx, no_of_elements):
"""
Loads either a single element or a range of elements from the array.
:param array: Reference to the array.
:param start_idx: Starting index for the load.
:param no_of_elements: Number of elements to load.
"""
# concretize start index
concrete_start_idxes = self.concretize_load_idx(start_idx)
if len(concrete_start_idxes) == 1:
# only one start index
# => concrete load
concrete_start_idx = concrete_start_idxes[0]
load_values = [
self._load_array_element_from_heap(array, idx)
for idx in range(concrete_start_idx, concrete_start_idx + no_of_elements)
]
# if the index was symbolic before concretization, this
# constraint it to concrete start idx
self.state.add_constraints(start_idx == concrete_start_idx)
else:
# multiple start indexes
# => symbolic load
# start with load values for the first concrete index
concrete_start_idx = concrete_start_idxes[0]
load_values = [
self._load_array_element_from_heap(array, idx)
for idx in range(concrete_start_idx, concrete_start_idx + no_of_elements)
]
start_idx_options = [concrete_start_idx == start_idx]
# update load values with all remaining start indexes
for concrete_start_idx in concrete_start_idxes[1:]:
# load values for this start index
values = [
self._load_array_element_from_heap(array, idx)
for idx in range(concrete_start_idx, concrete_start_idx + no_of_elements)
]
# update load values with the new ones
for i, value in enumerate(values):
# condition every value with the start idx
# => if concrete_start_idx == start_idx
# then use new value
# else use the current value
load_values[i] = self.state.solver.If(concrete_start_idx == start_idx, value, load_values[i])
start_idx_options.append(start_idx == concrete_start_idx)
# constraint start_idx, s.t. it evals to one of the concretized indexes
constraint_on_start_idx = self.state.solver.Or(*start_idx_options)
self.state.add_constraints(constraint_on_start_idx)
return load_values
def _load_array_element_from_heap(self, array: SimSootValue_ArrayBaseRef, idx):
# try to load the element
heap_elem_id = "%s[%d]" % (array.id, idx)
value = self.heap.load(heap_elem_id, none_if_missing=True)
# if it's not available, initialize it
if value is None:
value = array.get_default_value(self.state)
l.debug("Init %s with %s", heap_elem_id, value)
element_type = array.element_type if hasattr(array, "element_type") else None
self.heap.store(heap_elem_id, value, type_=element_type)
else:
l.debug("Load %s from %s", heap_elem_id, value)
return value
#
# Concretization strategies
#
def _apply_concretization_strategies(self, idx, strategies, action): # pylint: disable=unused-argument
"""
Applies concretization strategies on the index, until one of them succeeds.
"""
for s in strategies:
try:
idxes = s.concretize(self, idx)
except SimUnsatError:
idxes = None
if idxes:
return idxes
raise SimMemoryAddressError("Unable to concretize index %s" % idx)
[docs] def concretize_store_idx(self, idx, strategies=None):
"""
Concretizes a store index.
:param idx: An expression for the index.
:param strategies: A list of concretization strategies (to override the default).
:param min_idx: Minimum value for a concretized index (inclusive).
:param max_idx: Maximum value for a concretized index (exclusive).
:returns: A list of concrete indexes.
"""
if isinstance(idx, int):
return [idx]
elif not self.state.solver.symbolic(idx):
return [self.state.solver.eval(idx)]
strategies = self.store_strategies if strategies is None else strategies
return self._apply_concretization_strategies(idx, strategies, "store")
[docs] def concretize_load_idx(self, idx, strategies=None):
"""
Concretizes a load index.
:param idx: An expression for the index.
:param strategies: A list of concretization strategies (to override the default).
:param min_idx: Minimum value for a concretized index (inclusive).
:param max_idx: Maximum value for a concretized index (exclusive).
:returns: A list of concrete indexes.
"""
if isinstance(idx, int):
return [idx]
elif not self.state.solver.symbolic(idx):
return [self.state.solver.eval(idx)]
strategies = self.load_strategies if strategies is None else strategies
return self._apply_concretization_strategies(idx, strategies, "load")
def _create_default_load_strategies(self):
# reset dict
self.load_strategies = []
# symbolically read up to 1024 elements
s = concretization_strategies.SimConcretizationStrategyRange(1024)
self.load_strategies.append(s)
# if range is too big, fallback to load only one arbitrary element
s = concretization_strategies.SimConcretizationStrategyAny()
self.load_strategies.append(s)
def _create_default_store_strategies(self):
# reset dict
self.store_strategies = []
# symbolically write up to 256 elements
s = concretization_strategies.SimConcretizationStrategyRange(256)
self.store_strategies.append(s)
# if range is too big, fallback to store only the last element
s = concretization_strategies.SimConcretizationStrategyMax()
self.store_strategies.append(s)
#
# MISC
#
[docs] def set_state(self, state):
super().set_state(state)
if not self.load_strategies:
self._create_default_load_strategies()
if not self.store_strategies:
self._create_default_store_strategies()
@MemoryMixin.memo
def copy(self, memo):
o: "JavaVmMemoryMixin" = super().copy(memo)
o._stack = [stack_frame.copy() for stack_frame in self._stack]
o.heap = self.heap.copy()
o.vm_static_table = self.vm_static_table.copy()
o.load_strategies = [s.copy() for s in self.load_strategies]
o.store_strategies = [s.copy() for s in self.store_strategies]
o.max_array_size = self.max_array_size
return o
[docs] def merge(self, others, merge_conditions, common_ancestor=None): # pylint: disable=unused-argument
l.warning("Merging is not implemented for JavaVM memory!")
[docs] def widen(self, others): # pylint: disable=unused-argument
l.warning("Widening is not implemented for JavaVM memory!")
def _find(
self, addr, what, max_search=None, max_symbolic_bytes=None, default=None
): # pylint: disable=unused-argument
l.warning("Find is not implemented for JavaVM memory!")
return None