import itertools
import logging
import math
import os
import struct
import weakref
from collections import OrderedDict, deque
from itertools import chain
from typing import Optional, Generic, TypeVar, overload, TYPE_CHECKING, List, Iterable, Iterator, Tuple, NoReturn
if TYPE_CHECKING:
from .bool import Bool
from .fp import FP
from ..annotation import Annotation
try:
import cPickle as pickle
except ImportError:
import pickle
try:
# Python's build-in MD5 is about 2x faster than hashlib.md5 on short bytestrings
import _md5 as md5
except ImportError:
import hashlib as md5
l = logging.getLogger("claripy.ast")
WORKER = bool(os.environ.get("WORKER", False))
md5_unpacker = struct.Struct("2Q")
from_iterable = chain.from_iterable
# pylint:enable=unused-argument
# pylint:disable=unidiomatic-typecheck
T = TypeVar("T", bound="Base")
[docs]class ASTCacheKey(Generic[T]):
[docs] def __init__(self, a: T):
self.ast: T = a
def __hash__(self):
return hash(self.ast)
def __eq__(self, other):
return type(self) is type(other) and self.ast._hash == other.ast._hash
def __repr__(self):
return f"<Key {self.ast._type_name()} {self.ast.__repr__(inner=True)}>"
#
# AST variable naming
#
var_counter = itertools.count()
_unique_names = True
def _make_name(name: str, size: int, explicit_name: bool = False, prefix: str = "") -> str:
if _unique_names and not explicit_name:
return "%s%s_%d_%d" % (prefix, name, next(var_counter), size)
else:
return name
def _d(h, cls, state):
"""
This function is the deserializer for ASTs.
It exists to work around the fact that pickle will (normally) call __new__() with no arguments during
deserialization. For ASTs, this does not work.
"""
op, args, length, variables, symbolic, annotations = state
return cls.__new__(
cls, op, args, length=length, variables=variables, symbolic=symbolic, annotations=annotations, hash=h
)
[docs]class Base:
"""
This is the base class of all claripy ASTs. An AST tracks a tree of operations on arguments.
This class should not be instanciated directly - instead, use one of the constructor functions (BVS, BVV, FPS,
FPV...) to construct a leaf node and then build more complicated expressions using operations.
AST objects have *hash identity*. This means that an AST that has the same hash as another AST will be the *same*
object. This is critical for efficient memory usage. As an example, the following is true::
a, b = two different ASTs
c = b + a
d = b + a
assert c is d
:ivar op: The operation that is being done on the arguments
:ivar args: The arguments that are being used
"""
__slots__ = [
"op",
"args",
"variables",
"symbolic",
"_hash",
"_simplified",
"_cached_encoded_name",
"_cache_key",
"_errored",
"_eager_backends",
"length",
"_excavated",
"_burrowed",
"_uninitialized",
"_uc_alloc_depth",
"annotations",
"simplifiable",
"_uneliminatable_annotations",
"_relocatable_annotations",
"depth",
"__weakref__",
]
_hash_cache = weakref.WeakValueDictionary()
_leaf_cache = weakref.WeakValueDictionary()
FULL_SIMPLIFY = 1
LITE_SIMPLIFY = 2
UNSIMPLIFIED = 0
LITE_REPR = 0
MID_REPR = 1
FULL_REPR = 2
def __new__(cls, op, args, add_variables=None, hash=None, **kwargs): # pylint:disable=redefined-builtin
"""
This is called when you create a new Base object, whether directly or through an operation.
It finalizes the arguments (see the _finalize function, above) and then computes
a hash. If an AST of this hash already exists, it returns that AST. Otherwise,
it creates, initializes, and returns the AST.
:param op: The AST operation ('__add__', 'Or', etc)
:param args: The arguments to the AST operation (i.e., the objects to add)
:param variables: The symbolic variables present in the AST (default: empty set)
:param symbolic: A flag saying whether or not the AST is symbolic (default: False)
:param length: An integer specifying the bit length of this AST (default: None)
:param simplified: A measure of how simplified this AST is. 0 means unsimplified,
1 means fast-simplified (basically, just undoing the Reverse
op), and 2 means simplified through z3.
:param errored: A set of backends that are known to be unable to handle this AST.
:param eager_backends: A list of backends with which to attempt eager evaluation
:param annotations: A frozenset of annotations applied onto this AST.
"""
# if any(isinstance(a, BackendObject) for a in args):
# raise Exception('asdf')
a_args = args if type(args) is tuple else tuple(args)
# initialize the following properties: symbolic, variables and errored
need_symbolic = "symbolic" not in kwargs
need_variables = "variables" not in kwargs
need_errored = "errored" not in kwargs
args_have_annotations = None
# Note that `args_have_annotations` may not be set if we don't need to set any of the above variables, in which
# case it will stay as None, and will be passed to __a_init__() "as is". __a_init__() will properly handle it
# there.
arg_max_depth = 0
if need_symbolic or need_variables or need_errored:
symbolic_flag = False
variables_set = set()
errored_set = set()
for a in a_args:
if not isinstance(a, Base):
continue
if need_symbolic and not symbolic_flag:
symbolic_flag |= a.symbolic
if need_variables:
variables_set |= a.variables
if need_errored:
errored_set |= a._errored
if args_have_annotations is not True:
args_have_annotations = args_have_annotations or bool(a.annotations)
if arg_max_depth < a.depth:
arg_max_depth = a.depth
if need_symbolic:
kwargs["symbolic"] = symbolic_flag
if need_variables:
kwargs["variables"] = frozenset(variables_set)
if need_errored:
kwargs["errored"] = errored_set
if type(kwargs["variables"]) is not frozenset: # pylint:disable=unidiomatic-typecheck
kwargs["variables"] = frozenset(kwargs["variables"])
if add_variables:
kwargs["variables"] = kwargs["variables"] | add_variables
eager_backends = list(backends._eager_backends) if "eager_backends" not in kwargs else kwargs["eager_backends"]
if not kwargs["symbolic"] and eager_backends is not None and op not in operations.leaf_operations:
for eb in eager_backends:
try:
r = operations._handle_annotations(eb._abstract(eb.call(op, args)), args)
if r is not None:
return r
else:
eager_backends.remove(eb)
except BackendError:
eager_backends.remove(eb)
# if we can't be eager anymore, null out the eagerness
kwargs["eager_backends"] = None
# whether this guy is initialized or not
if "uninitialized" not in kwargs:
kwargs["uninitialized"] = None
if "uc_alloc_depth" not in kwargs:
kwargs["uc_alloc_depth"] = None
if "annotations" not in kwargs or kwargs["annotations"] is None:
annotations = ()
else:
annotations = kwargs["annotations"]
# process annotations
if "skip_child_annotations" in kwargs:
skip_child_annotations = kwargs.pop("skip_child_annotations")
else:
skip_child_annotations = False
if not annotations and not args_have_annotations:
uneliminatable_annotations = frozenset()
relocatable_annotations = frozenset()
else:
ast_args = tuple(a for a in a_args if isinstance(a, Base))
uneliminatable_annotations = frozenset(
chain(
(
from_iterable(a._uneliminatable_annotations for a in ast_args)
if not skip_child_annotations
else tuple()
),
tuple(a for a in annotations if not a.eliminatable and not a.relocatable),
)
)
relocatable_annotations = tuple(
OrderedDict(
(e, True)
for e in tuple(
chain(
(
from_iterable(a._relocatable_annotations for a in ast_args)
if not skip_child_annotations
else tuple()
),
tuple(a for a in annotations if not a.eliminatable and a.relocatable),
)
)
).keys()
)
annotations = tuple(
chain(
(
from_iterable(a._relocatable_annotations for a in ast_args)
if not skip_child_annotations
else tuple()
),
tuple(a for a in annotations),
)
)
kwargs["annotations"] = annotations
cache = cls._hash_cache
if hash is not None:
h = hash
elif op in {"BVS", "BVV", "BoolS", "BoolV", "FPS", "FPV"} and not annotations:
if op == "FPV" and a_args[0] == 0.0 and math.copysign(1, a_args[0]) < 0:
# Python does not distinguish between +0.0 and -0.0 so we add sign to tuple to distinguish
h = (op, kwargs.get("length", None), ("-",) + a_args)
elif op == "FPV" and math.isnan(a_args[0]):
# cannot compare nans
h = (op, kwargs.get("length", None), ("nan",) + a_args[1:])
else:
h = (op, kwargs.get("length", None), a_args)
cache = cls._leaf_cache
else:
h = Base._calc_hash(op, a_args, kwargs) if hash is None else hash
self = cache.get(h, None)
if self is None:
self = super().__new__(cls)
depth = arg_max_depth + 1
self.__a_init__(
op,
a_args,
depth=depth,
uneliminatable_annotations=uneliminatable_annotations,
relocatable_annotations=relocatable_annotations,
**kwargs,
)
self._hash = h
cache[h] = self
# else:
# if self.args != a_args or self.op != op or self.variables != kwargs['variables']:
# raise Exception("CRAP -- hash collision")
return self
@classmethod
def __init_with_annotations__(
cls, op, a_args, depth=None, uneliminatable_annotations=None, relocatable_annotations=None, **kwargs
):
cache = cls._hash_cache
h = Base._calc_hash(op, a_args, kwargs)
self = cache.get(h, None)
if self is not None:
return self
self = super().__new__(cls)
self.__a_init__(
op,
a_args,
depth=depth,
uneliminatable_annotations=uneliminatable_annotations,
relocatable_annotations=relocatable_annotations,
**kwargs,
)
self._hash = h
cache[h] = self
return self
def __reduce__(self):
# HASHCONS: these attributes key the cache
# BEFORE CHANGING THIS, SEE ALL OTHER INSTANCES OF "HASHCONS" IN THIS FILE
return _d, (
self._hash,
self.__class__,
(self.op, self.args, self.length, self.variables, self.symbolic, self.annotations),
)
[docs] def __init__(self, *args, **kwargs):
pass
@staticmethod
def _calc_hash(op, args, keywords):
"""
Calculates the hash of an AST, given the operation, args, and kwargs.
:param op: The operation.
:param args: The arguments to the operation.
:param keywords: A dict including the 'symbolic', 'variables', and 'length' items.
:returns: a hash.
We do it using md5 to avoid hash collisions.
(hash(-1) == hash(-2), for example)
"""
args_tup = tuple(a if type(a) in (int, float) else getattr(a, "_hash", hash(a)) for a in args)
# HASHCONS: these attributes key the cache
# BEFORE CHANGING THIS, SEE ALL OTHER INSTANCES OF "HASHCONS" IN THIS FILE
to_hash = Base._ast_serialize(op, args_tup, keywords)
if to_hash is None:
# fall back to pickle.dumps
to_hash = (
op,
args_tup,
str(keywords.get("length", None)),
hash(keywords["variables"]),
keywords["symbolic"],
hash(keywords.get("annotations", None)),
)
to_hash = pickle.dumps(to_hash, -1)
# Why do we use md5 when it's broken? Because speed is more important
# than cryptographic integrity here. Then again, look at all those
# allocations we're doing here... fast python is painful.
hd = md5.md5(to_hash).digest()
return md5_unpacker.unpack(hd)[0] # 64 bits
@staticmethod
def _arg_serialize(arg) -> Optional[bytes]:
if arg is None:
return b"\x0f"
elif arg is True:
return b"\x1f"
elif arg is False:
return b"\x2e"
elif type(arg) is int:
if arg < 0:
if arg >= -0x7FFF:
return b"-" + struct.pack("<h", arg)
elif arg >= -0x7FFF_FFFF:
return b"-" + struct.pack("<i", arg)
elif arg >= -0x7FFF_FFFF_FFFF_FFFF:
return b"-" + struct.pack("<q", arg)
return None
else:
if arg <= 0xFFFF:
return struct.pack("<H", arg)
elif arg <= 0xFFFF_FFFF:
return struct.pack("<I", arg)
elif arg <= 0xFFFF_FFFF_FFFF_FFFF:
return struct.pack("<Q", arg)
return None
elif type(arg) is str:
return arg.encode()
elif type(arg) is float:
return struct.pack("f", arg)
elif type(arg) is tuple:
arr = []
for elem in arg:
b = Base._arg_serialize(elem)
if b is None:
return None
arr.append(b)
return b"".join(arr)
return None
@staticmethod
def _ast_serialize(op: str, args_tup, keywords) -> Optional[bytes]:
"""
Serialize the AST and get a bytestring for hashing.
:param op: The operator.
:param args_tup: A tuple of arguments.
:param keywords: A dict of keywords.
:return: The serialized bytestring.
"""
serialized_args = Base._arg_serialize(args_tup)
if serialized_args is None:
return None
if "length" in keywords:
length = Base._arg_serialize(keywords["length"])
if length is None:
return None
else:
length = b"none"
variables = struct.pack("<Q", hash(keywords["variables"]) & 0xFFFF_FFFF_FFFF_FFFF)
symbolic = b"\x01" if keywords["symbolic"] else b"\x00"
if "annotations" in keywords:
annotations = struct.pack("<Q", hash(keywords["annotations"]) & 0xFFFF_FFFF_FFFF_FFFF)
else:
annotations = b"\xf9"
return op.encode() + serialized_args + length + variables + symbolic + annotations
# pylint:disable=attribute-defined-outside-init
def __a_init__(
self,
op,
args,
variables=None,
symbolic=None,
length=None,
simplified=0,
errored=None,
eager_backends=None,
uninitialized=None,
uc_alloc_depth=None,
annotations=None,
encoded_name=None,
depth=None,
uneliminatable_annotations=None,
relocatable_annotations=None,
): # pylint:disable=unused-argument
"""
Initializes an AST. Takes the same arguments as ``Base.__new__()``
We use this instead of ``__init__`` due to python's undesirable behavior w.r.t. automatically calling it on
return from ``__new__``.
"""
# HASHCONS: these attributes key the cache
# BEFORE CHANGING THIS, SEE ALL OTHER INSTANCES OF "HASHCONS" IN THIS FILE
self.op = op
self.args = args if type(args) is tuple else tuple(args)
self.length = length
self.variables = frozenset(variables) if type(variables) is not frozenset else variables
self.symbolic = symbolic
self.annotations: Tuple[Annotation] = annotations
self._uneliminatable_annotations = uneliminatable_annotations
self._relocatable_annotations = relocatable_annotations
self.depth = depth if depth is not None else 1
self._eager_backends = eager_backends
self._cached_encoded_name = encoded_name
self._errored = errored if errored is not None else set()
self._simplified = simplified
self._cache_key = ASTCacheKey(self)
self._excavated = None
self._burrowed = None
self._uninitialized = uninitialized
self._uc_alloc_depth = uc_alloc_depth
if len(self.args) == 0:
raise ClaripyOperationError("AST with no arguments!")
# pylint:enable=attribute-defined-outside-init
def __hash__(self):
res = self._hash
if type(self._hash) is not int:
res = hash(self._hash)
return res
@property
def cache_key(self: T) -> ASTCacheKey[T]:
"""
A key that refers to this AST - this value is appropriate for usage as a key in dictionaries.
"""
return self._cache_key
@property
def _encoded_name(self):
if self._cached_encoded_name is None:
self._cached_encoded_name = self.args[0].encode() # pylint: disable=attribute-defined-outside-init
return self._cached_encoded_name
#
# Collapsing and simplification
#
# def _models_for(self, backend):
# for a in self.args:
# backend.convert_expr(a)
# else:
# yield backend.convert(a)
[docs] def make_like(self: T, op: str, args: Iterable, **kwargs) -> T:
if kwargs.pop("simplify", False) is True:
# Try to simplify the expression again
simplified = simplifications.simpleton.simplify(op, args)
else:
simplified = None
if simplified is not None:
op = simplified.op
if (
simplified is None
and len(kwargs) == 3
and "annotations" in kwargs
and kwargs["annotations"]
and "skip_child_annotations" in kwargs
and kwargs["skip_child_annotations"] is True
and "length" in kwargs
):
# fast path
annotations = tuple(kwargs["annotations"])
uneliminatable_annotations = frozenset(
anno for anno in annotations if not anno.eliminatable and not anno.relocatable
)
relocatable_annotations = tuple(anno for anno in annotations if not anno.eliminatable and anno.relocatable)
return type(self).__init_with_annotations__(
op,
args,
uneliminatable_annotations=uneliminatable_annotations,
relocatable_annotations=relocatable_annotations,
annotations=annotations,
variables=self.variables,
uninitialized=self._uninitialized,
symbolic=self.symbolic,
length=kwargs["length"],
depth=self.depth,
eager_backends=self._eager_backends,
uc_alloc_depth=self._uc_alloc_depth,
)
all_operations = operations.leaf_operations_symbolic_with_union
if "annotations" not in kwargs:
# special case: if self is one of the args, we do not copy annotations over from self since child
# annotations will be re-processed during AST creation.
if not args or not any(self is arg for arg in args):
kwargs["annotations"] = self.annotations
if "variables" not in kwargs and op in all_operations:
kwargs["variables"] = self.variables
if "uninitialized" not in kwargs:
kwargs["uninitialized"] = self._uninitialized
if "symbolic" not in kwargs and op in all_operations:
kwargs["symbolic"] = self.symbolic
if simplified is None:
# Cannot simplify the expression anymore
return type(self)(op, args, **kwargs)
else:
# The expression is simplified
r = type(self)(op, simplified.args, **kwargs)
return r
def _rename(self, new_name):
if self.op not in {"BVS", "BoolS", "FPS"}:
raise ClaripyOperationError("rename is only supported on leaf nodes")
new_args = (new_name,) + self.args[1:]
return self.make_like(self.op, new_args, length=self.length, variables={new_name})
#
# Annotations
#
def _apply_to_annotations(self, f):
return self.make_like(self.op, self.args, annotations=f(self.annotations), skip_child_annotations=True)
[docs] def append_annotation(self: T, a: "Annotation") -> T:
"""
Appends an annotation to this AST.
:param a: the annotation to append
:returns: a new AST, with the annotation added
"""
return self._apply_to_annotations(lambda alist: alist + (a,))
[docs] def append_annotations(self: T, new_tuple: Tuple["Annotation", ...]) -> T:
"""
Appends several annotations to this AST.
:param new_tuple: the tuple of annotations to append
:returns: a new AST, with the annotations added
"""
return self._apply_to_annotations(lambda alist: alist + new_tuple)
[docs] def annotate(self: T, *args: "Annotation", remove_annotations: Optional[Iterable["Annotation"]] = None) -> T:
"""
Appends annotations to this AST.
:param args: the tuple of annotations to append (variadic positional args)
:param remove_annotations: annotations to remove
:returns: a new AST, with the annotations added
"""
if not remove_annotations:
return self._apply_to_annotations(lambda alist: alist + args)
else:
return self._apply_to_annotations(
lambda alist: tuple(arg for arg in alist if arg not in remove_annotations) + args
)
[docs] def insert_annotation(self: T, a: "Annotation") -> T:
"""
Inserts an annotation to this AST.
:param a: the annotation to insert
:returns: a new AST, with the annotation added
"""
return self._apply_to_annotations(lambda alist: (a,) + alist)
[docs] def insert_annotations(self: T, new_tuple: Tuple["Annotation", ...]) -> T:
"""
Inserts several annotations to this AST.
:param new_tuple: the tuple of annotations to insert
:returns: a new AST, with the annotations added
"""
return self._apply_to_annotations(lambda alist: new_tuple + alist)
[docs] def replace_annotations(self: T, new_tuple: Tuple["Annotation", ...]) -> T:
"""
Replaces annotations on this AST.
:param new_tuple: the tuple of annotations to replace the old annotations with
:returns: a new AST, with the annotations added
"""
return self._apply_to_annotations(lambda alist: new_tuple)
[docs] def remove_annotation(self: T, a: "Annotation") -> T:
"""
Removes an annotation from this AST.
:param a: the annotation to remove
:returns: a new AST, with the annotation removed
"""
return self._apply_to_annotations(lambda alist: tuple(oa for oa in alist if oa != a))
[docs] def remove_annotations(self: T, remove_sequence: Iterable["Annotation"]) -> T:
"""
Removes several annotations from this AST.
:param remove_sequence: a sequence/set of the annotations to remove
:returns: a new AST, with the annotations removed
"""
return self._apply_to_annotations(lambda alist: tuple(oa for oa in alist if oa not in remove_sequence))
#
# Viewing and debugging
#
[docs] def dbg_repr(self, prefix=None) -> str: # pylint:disable=unused-argument
"""
Returns a debug representation of this AST.
"""
return self.shallow_repr(max_depth=None, details=Base.FULL_REPR)
def _type_name(self):
return self.__class__.__name__
def __repr__(self, inner=False, max_depth=None, explicit_length=False):
if WORKER:
return "<AST something>"
else:
return self.shallow_repr(max_depth=max_depth, explicit_length=explicit_length, inner=inner)
[docs] def shallow_repr(
self, max_depth=8, explicit_length=False, details=LITE_REPR, inner=False, parent_prec=15, left=True
) -> str:
"""
Returns a string representation of this AST, but with a maximum depth to
prevent floods of text being printed.
:param max_depth: The maximum depth to print.
:param explicit_length: Print lengths of BVV arguments.
:param details: An integer value specifying how detailed the output should be:
LITE_REPR - print short repr for both operations and BVs,
MID_REPR - print full repr for operations and short for BVs,
FULL_REPR - print full repr of both operations and BVs.
:param inner: whether or not it is an inner AST
:param parent_prec: parent operation precedence level
:param left: whether or not it is a left AST
:returns: A string representing the AST
"""
if max_depth is not None and max_depth <= 0:
return "<...>"
elif self.op in operations.reversed_ops:
op = operations.reversed_ops[self.op]
args = reversed(self.args)
else:
op = self.op
args = self.args
next_max_depth = max_depth - 1 if max_depth is not None else None
length = self.length if explicit_length else None
# if operation is not in op_precedence, assign the "least operation precedence"
op_prec = operations.op_precedence[op] if op in operations.op_precedence else 15
args = [
(
arg.shallow_repr(next_max_depth, explicit_length, details, True, op_prec, idx == 0)
if isinstance(arg, Base)
else arg
)
for idx, arg in enumerate(args)
]
prec_diff = parent_prec - op_prec
inner_infix_use_par = prec_diff < 0 or prec_diff == 0 and not left
inner_repr = self._op_repr(op, args, inner, length, details, inner_infix_use_par)
if not inner:
return f"<{self._type_name()} {inner_repr}>"
else:
return inner_repr
@staticmethod
def _op_repr(op, args, inner, length, details, inner_infix_use_par):
if details < Base.FULL_REPR:
if op == "BVS":
extras = []
if args[1] is not None:
fmt = "%#x" if type(args[1]) is int else "%s"
extras.append("min=%s" % (fmt % args[1]))
if args[2] is not None:
fmt = "%#x" if type(args[2]) is int else "%s"
extras.append("max=%s" % (fmt % args[2]))
if args[3] is not None:
fmt = "%#x" if type(args[3]) is int else "%s"
extras.append("stride=%s" % (fmt % args[3]))
if args[4] is True:
extras.append("UNINITIALIZED")
return "{}{}".format(args[0], "{%s}" % ", ".join(extras) if extras else "")
elif op == "BoolV":
return str(args[0])
elif op == "BVV":
if args[0] is None:
value = "!"
elif args[1] < 10:
value = format(args[0], "")
else:
value = format(args[0], "#x")
return value + "#%d" % length if length is not None else value
if details < Base.MID_REPR:
if op == "If":
value = f"if {args[0]} then {args[1]} else {args[2]}"
return f"({value})" if inner else value
elif op == "Not":
return f"!{args[0]}"
elif op == "Extract":
return f"{args[2]}[{args[0]}:{args[1]}]"
elif op == "ZeroExt":
value = f"0#{args[0]} .. {args[1]}"
return f"({value})" if inner else value
elif op in operations.prefix:
assert len(args) == 1
value = f"{operations.prefix[op]}{args[0]}"
return f"({value})" if inner and inner_infix_use_par else value
elif op in operations.infix:
value = f" {operations.infix[op]} ".join(args)
return f"({value})" if inner and inner_infix_use_par else value
return "{}({})".format(op, ", ".join(str(arg) for arg in args))
[docs] def children_asts(self) -> Iterator["Base"]:
"""
Return an iterator over the nested children ASTs.
"""
ast_queue = deque([iter(self.args)])
while ast_queue:
try:
ast = next(ast_queue[-1])
except StopIteration:
ast_queue.pop()
continue
if isinstance(ast, Base):
ast_queue.append(iter(ast.args))
l.debug("Yielding AST %s with hash %s with %d children", ast, hash(ast), len(ast.args))
yield ast
[docs] def leaf_asts(self) -> Iterator["Base"]:
"""
Return an iterator over the leaf ASTs.
"""
seen = set()
ast_queue = deque([self])
while ast_queue:
ast = ast_queue.pop()
if isinstance(ast, Base) and id(ast.cache_key) not in seen:
seen.add(id(ast.cache_key))
if ast.depth == 1:
yield ast
continue
ast_queue.extend(ast.args)
continue
# TODO: Deprecate this property
@property
def recursive_children_asts(self):
"""
DEPRECATED: Use children_asts() instead.
"""
return self.children_asts()
# TODO: Deprecate this property
@property
def recursive_leaf_asts(self):
"""
DEPRECATED: Use leaf_asts() instead.
"""
return self.leaf_asts()
[docs] def dbg_is_looped(self):
l.debug("Checking AST with hash %s for looping", hash(self))
seen = set()
for child_ast in self.children_asts():
if hash(child_ast) in seen:
return child_ast
seen.add(hash(child_ast))
return False
#
# Various AST modifications (replacements)
#
[docs] def swap_args(self: T, new_args, new_length=None, **kwargs) -> T:
"""
This returns the same AST, with the arguments swapped out for new_args.
"""
if len(self.args) == len(new_args) and all(a is b for a, b in zip(self.args, new_args)):
return self
# symbolic = any(a.symbolic for a in new_args if isinstance(a, Base))
# variables = frozenset.union(frozenset(), *(a.variables for a in new_args if isinstance(a, Base)))
length = self.length if new_length is None else new_length
a = self.make_like(self.op, new_args, length=length, **kwargs)
# if a.op != self.op or a.symbolic != self.symbolic or a.variables != self.variables:
# raise ClaripyOperationError("major bug in swap_args()")
return a
#
# Other helper functions
#
[docs] def split(self, split_on: Iterable[str]) -> List:
"""
Splits the AST if its operation is `split_on` (i.e., return all the arguments). Otherwise, return a list with
just the AST.
"""
if self.op in split_on:
return list(self.args)
else:
return [self]
# we don't support iterating over Base objects
def __iter__(self) -> NoReturn:
"""
This prevents people from iterating over ASTs.
"""
raise ClaripyOperationError("Please don't iterate over, or split, AST nodes!")
def __bool__(self) -> NoReturn:
"""
This prevents people from accidentally using an AST as a condition. For
example, the following was previously common::
a,b = two ASTs
if a == b:
do something
The problem is that `a == b` would return an AST, because an AST can be symbolic
and there could be no way to actually know the value of that without a
constraint solve. This caused tons of issues.
"""
raise ClaripyOperationError(
"testing Expressions for truthiness does not do what you want, as these expressions can be symbolic"
)
[docs] def structurally_match(self: T, o: T) -> bool:
"""
Structurally compares two A objects, and check if their corresponding leaves are definitely the same A object
(name-wise or hash-identity wise).
:param o: the other claripy A object
:returns: True/False
"""
# TODO: Convert a and b into canonical forms
if self.op != o.op:
return False
if len(self.args) != len(o.args):
return False
for arg_a, arg_b in zip(self.args, o.args):
if not isinstance(arg_a, Base):
if type(arg_a) != type(arg_b):
return False
# They are not ASTs
if arg_a != arg_b:
return False
else:
continue
if arg_a.op in operations.leaf_operations:
if arg_a is not arg_b:
return False
else:
if not arg_a.structurally_match(arg_b):
return False
return True
[docs] def replace_dict(self: T, replacements, variable_set=None, leaf_operation=None) -> T:
"""
Returns this AST with subexpressions replaced by those that can be found in `replacements`
dict.
:param variable_set: For optimization, ast's without these variables are not checked
for replacing.
:param replacements: A dictionary of hashes to their replacements.
:param leaf_operation: An operation that should be applied to the leaf nodes.
:returns: An AST with all instances of ast's in replacements.
"""
if variable_set is None:
variable_set = set()
if leaf_operation is None:
leaf_operation = lambda x: x
arg_queue = [iter([self])]
rep_queue = []
ast_queue = []
while arg_queue:
try:
ast = next(arg_queue[-1])
repl = ast
if not isinstance(ast, Base):
rep_queue.append(repl)
continue
elif ast.cache_key in replacements:
repl = replacements[ast.cache_key]
elif ast.variables >= variable_set:
if ast.op in operations.leaf_operations:
repl = leaf_operation(ast)
if repl is not ast:
replacements[ast.cache_key] = repl
elif ast.depth > 1:
arg_queue.append(iter(ast.args))
ast_queue.append(ast)
continue
rep_queue.append(repl)
continue
except StopIteration:
arg_queue.pop()
if ast_queue:
ast = ast_queue.pop()
repl = ast
args = rep_queue[-len(ast.args) :]
del rep_queue[-len(ast.args) :]
# Check if replacement occurred.
if any((a is not b for a, b in zip(ast.args, args))):
repl = ast.make_like(ast.op, tuple(args))
replacements[ast.cache_key] = repl
rep_queue.append(repl)
assert len(arg_queue) == 0, "arg_queue is not empty"
assert len(ast_queue) == 0, "ast_queue is not empty"
assert len(rep_queue) == 1, ("rep_queue has unexpected length", len(rep_queue))
return rep_queue.pop()
[docs] def replace(self: T, old, new, variable_set=None, leaf_operation=None) -> T: # pylint:disable=unused-argument
"""
Returns this AST but with the AST 'old' replaced with AST 'new' in its subexpressions.
"""
self._check_replaceability(old, new)
replacements = {old.cache_key: new}
return self.replace_dict(replacements, variable_set=old.variables)
@staticmethod
def _check_replaceability(old, new):
if not isinstance(old, Base) or not isinstance(new, Base):
raise ClaripyReplacementError("replacements must be AST nodes")
if type(old) is not type(new):
raise ClaripyReplacementError(f"cannot replace type {type(old)} ast with type {type(new)} ast")
def _identify_vars(self, all_vars, counter):
if self.op == "BVS":
if self.args not in all_vars:
all_vars[self.args] = BV("BVS", self.args, length=self.length, explicit_name=True)
elif self.op == "BoolS":
if self.args not in all_vars:
all_vars[self.args] = BoolS("var_" + str(next(counter)))
else:
for arg in self.args:
if isinstance(arg, Base):
arg._identify_vars(all_vars, counter)
[docs] def canonicalize(self: T, var_map=None, counter=None) -> T:
counter = itertools.count() if counter is None else counter
var_map = {} if var_map is None else var_map
for v in self.leaf_asts():
if v.cache_key not in var_map and v.op in {"BVS", "BoolS", "FPS"}:
new_name = "canonical_%d" % next(counter)
var_map[v.cache_key] = v._rename(new_name)
return var_map, counter, self.replace_dict(var_map)
#
# This code handles burrowing ITEs deeper into the ast and excavating
# them to shallower levels.
#
def _burrow_ite(self):
if self.op != "If":
# print("i'm not an if")
return self.swap_args([(a.ite_burrowed if isinstance(a, Base) else a) for a in self.args])
if not all(isinstance(a, Base) for a in self.args):
# print("not all my args are bases")
return self
old_true = self.args[1]
old_false = self.args[2]
if old_true.op != old_false.op or len(old_true.args) != len(old_false.args):
return self
if old_true.op == "If":
# let's no go into this right now
return self
if any(a.op in operations.leaf_operations for a in self.args):
# burrowing through these is pretty funny
return self
matches = [old_true.args[i] is old_false.args[i] for i in range(len(old_true.args))]
if matches.count(True) != 1 or all(matches):
# TODO: handle multiple differences for multi-arg ast nodes
# print("wrong number of matches:",matches,old_true,old_false)
return self
different_idx = matches.index(False)
inner_if = If(self.args[0], old_true.args[different_idx], old_false.args[different_idx])
new_args = list(old_true.args)
new_args[different_idx] = inner_if.ite_burrowed
# print("replaced the",different_idx,"arg:",new_args)
return old_true.__class__(old_true.op, new_args, length=self.length)
def _excavate_ite(self):
ast_queue = [iter([self])]
arg_queue = []
op_queue = []
while ast_queue:
try:
ast = next(ast_queue[-1])
if not isinstance(ast, Base):
arg_queue.append(ast)
continue
if ast.op in operations.leaf_operations:
arg_queue.append(ast)
continue
if ast.annotations:
arg_queue.append(ast)
continue
op_queue.append(ast)
ast_queue.append(iter(ast.args))
except StopIteration:
ast_queue.pop()
if op_queue:
op = op_queue.pop()
args = arg_queue[-len(op.args) :]
del arg_queue[-len(op.args) :]
ite_args = [isinstance(a, Base) and a.op == "If" for a in args]
if op.op == "If":
# if we are an If, call the If handler so that we can take advantage of its simplifiers
excavated = If(*args)
elif ite_args.count(True) == 0:
# if there are no ifs that came to the surface, there's nothing more to do
excavated = op.swap_args(args, simplify=True)
else:
# this gets called when we're *not* in an If, but there are Ifs in the args.
# it pulls those Ifs out to the surface.
cond = args[ite_args.index(True)].args[0]
new_true_args = []
new_false_args = []
for a in args:
if not isinstance(a, Base) or a.op != "If":
new_true_args.append(a)
new_false_args.append(a)
elif a.args[0] is cond:
new_true_args.append(a.args[1])
new_false_args.append(a.args[2])
elif a.args[0] is Not(cond):
new_true_args.append(a.args[2])
new_false_args.append(a.args[1])
else:
# weird conditions -- giving up!
excavated = op.swap_args(args, simplify=True)
break
else:
excavated = If(
cond,
op.swap_args(new_true_args, simplify=True),
op.swap_args(new_false_args, simplify=True),
)
# continue
arg_queue.append(excavated)
assert len(op_queue) == 0, "op_queue is not empty"
assert len(ast_queue) == 0, "ast_queue is not empty"
assert len(arg_queue) == 1, ("arg_queue has unexpected length", len(arg_queue))
return arg_queue.pop()
@property
def ite_burrowed(self: T) -> T:
"""
Returns an equivalent AST that "burrows" the ITE expressions as deep as possible into the ast, for simpler
printing.
"""
if self._burrowed is None:
self._burrowed = self._burrow_ite() # pylint:disable=attribute-defined-outside-init
self._burrowed._burrowed = self._burrowed # pylint:disable=attribute-defined-outside-init
return self._burrowed
@property
def ite_excavated(self: T) -> T:
"""
Returns an equivalent AST that "excavates" the ITE expressions out as far as possible toward the root of the
AST, for processing in static analyses.
"""
if self._excavated is None:
self._excavated = self._excavate_ite() # pylint:disable=attribute-defined-outside-init
# we set the flag for the children so that we avoid re-excavating during
# VSA backend evaluation (since the backend evaluation recursively works on
# the excavated ASTs)
self._excavated._excavated = self._excavated
return self._excavated
#
# these are convenience operations
#
def _first_backend(self, what):
for b in backends._all_backends:
if b in self._errored or b.is_smt_backend:
continue
try:
return getattr(b, what)(self)
except BackendError:
pass
return None
@property
def concrete_value(self):
return self._model_concrete.value
# yan I'm gonna kill you
@property
def cv(self):
return self._model_concrete.value
@property
def v(self):
return self._model_concrete.value
@property
def singlevalued(self) -> bool:
return self._first_backend("singlevalued")
@property
def multivalued(self) -> bool:
return self._first_backend("multivalued")
@property
def cardinality(self) -> int:
return self._first_backend("cardinality")
@property
def concrete(self) -> bool:
# fast path
if self.op in {"BVV", "BoolV", "FPV"}:
return True
if self.op in {"BVS", "BoolS", "FPS"}:
return False
if self.variables:
return False
return backends.concrete.handles(self)
@property
def uninitialized(self) -> bool:
"""
Whether this AST comes from an uninitialized dereference or not. It's only used in under-constrained symbolic
execution mode.
:returns: True/False/None (unspecified).
"""
# TODO: It should definitely be moved to the proposed Annotation backend.
return self._uninitialized
@property
def uc_alloc_depth(self) -> int:
"""
The depth of allocation by lazy-initialization. It's only used in under-constrained symbolic execution mode.
:returns: An integer indicating the allocation depth, or None if it's not from
lazy-initialization.
"""
# TODO: It should definitely be moved to the proposed Annotation backend.
return self._uc_alloc_depth
[docs] def to_claripy(self: T) -> T:
"""
Returns itself. Provides compatibility with other classes (such as SimActionObject) which provide a similar
method to unwrap to an AST.
"""
return self
#
# Backwards compatibility crap
#
def __getattr__(self, a):
if not a.startswith("_model_"):
raise AttributeError(a)
model_name = a[7:]
if not hasattr(backends, model_name):
raise AttributeError(a)
try:
return getattr(backends, model_name).convert(self)
except BackendError:
return self
[docs]def simplify(e: T) -> T:
if isinstance(e, Base) and e.op in operations.leaf_operations:
return e
s = e._first_backend("simplify")
if s is None:
l.debug("Unable to simplify expression")
return e
else:
# Copy some parameters (that should really go to the Annotation backend)
s._uninitialized = e.uninitialized
s._uc_alloc_depth = e._uc_alloc_depth
s._simplified = Base.FULL_SIMPLIFY
# dealing with annotations
if e.annotations:
ast_args = tuple(a for a in e.args if isinstance(a, Base))
annotations = tuple(
set(chain(from_iterable(a._relocatable_annotations for a in ast_args), tuple(a for a in e.annotations)))
)
if annotations != s.annotations:
s = s.remove_annotations(s.annotations)
s = s.annotate(*annotations)
return s
from ..errors import BackendError, ClaripyOperationError, ClaripyReplacementError
from .. import operations
from ..backend_manager import backends
from ..ast.bool import If, Not, BoolS
from ..ast.bv import BV
from .. import simplifications