Source code for angr.analyses.cfg.segment_list

# pylint:disable=no-else-break
import logging
from typing import Optional, Tuple, List

from ...errors import AngrCFGError


l = logging.getLogger(name=__name__)


[docs]class Segment: """ Representing a memory block. This is not the "Segment" in ELF memory model """ __slots__ = ["start", "end", "sort"]
[docs] def __init__(self, start, end, sort): """ :param int start: Start address. :param int end: End address. :param str sort: Type of the segment, can be code, data, etc. :return: None """ self.start = start self.end = end self.sort = sort
def __repr__(self): s = f"[{self.start:#x}-{self.end:#x}, {self.sort}]" return s @property def size(self): """ Calculate the size of the Segment. :return: Size of the Segment. :rtype: int """ return self.end - self.start
[docs] def copy(self): """ Make a copy of the Segment. :return: A copy of the Segment instance. :rtype: angr.analyses.cfg_fast.Segment """ return Segment(self.start, self.end, self.sort)
[docs]class SegmentList: """ SegmentList describes a series of segmented memory blocks. You may query whether an address belongs to any of the blocks or not, and obtain the exact block(segment) that the address belongs to. """ __slots__ = ["_list", "_bytes_occupied"]
[docs] def __init__(self): self._list: List[Segment] = [] self._bytes_occupied = 0
# # Overridden methods # def __len__(self): return len(self._list) def __getitem__(self, idx: int) -> Segment: return self._list[idx] # # Private methods # def _insert_and_merge(self, address: int, size: int, sort: str, idx: int) -> None: """ Determines whether the block specified by (address, size) should be merged with adjacent blocks. :param address: Starting address of the block to be merged. :param size: Size of the block to be merged. :param sort: Type of the block. :param idx: ID of the address. """ # sanity check if idx > 0 and address + size <= self._list[idx - 1].start: # There is a bug, since _list[idx] must be the closest one that is less than the current segment l.warning("BUG FOUND: new segment should always be greater than _list[idx].") # Anyways, let's fix it. self._insert_and_merge(address, size, sort, idx - 1) return # Insert the block first # The new block might be overlapping with other blocks. _insert_and_merge_core will fix the overlapping. if idx == len(self._list): self._list.append(Segment(address, address + size, sort)) else: self._list.insert(idx, Segment(address, address + size, sort)) # Apparently _bytes_occupied will be wrong if the new block overlaps with any existing block. We will fix it # later self._bytes_occupied += size # Search forward to merge blocks if necessary pos = idx while pos < len(self._list): merged, pos, bytes_change = self._insert_and_merge_core(pos, "forward") if not merged: break self._bytes_occupied += bytes_change # Search backward to merge blocks if necessary pos = idx while pos > 0: merged, pos, bytes_change = self._insert_and_merge_core(pos, "backward") if not merged: break self._bytes_occupied += bytes_change def _insert_and_merge_core(self, pos: int, direction: str): """ The core part of method _insert_and_merge. :param pos: The starting position. :param direction: If we are traversing forwards or backwards in the list. It determines where the "sort" of the overlapping memory block comes from. If everything works as expected, "sort" of the overlapping block is always equal to the segment occupied most recently. :return: A tuple of (merged (bool), new position to begin searching (int), change in total bytes (int) :rtype: tuple """ bytes_changed = 0 if direction == "forward": if pos == len(self._list) - 1: return False, pos, 0 previous_segment = self._list[pos] previous_segment_pos = pos segment = self._list[pos + 1] segment_pos = pos + 1 else: # if direction == "backward": if pos == 0: return False, pos, 0 segment = self._list[pos] segment_pos = pos previous_segment = self._list[pos - 1] previous_segment_pos = pos - 1 merged = False new_pos = pos if segment.start <= previous_segment.end: # we should always have new_start+new_size >= segment.start if segment.sort == previous_segment.sort: # They are of the same sort - we should merge them! new_end = max(previous_segment.end, segment.start + segment.size) new_start = min(previous_segment.start, segment.start) new_size = new_end - new_start self._list[segment_pos] = Segment(new_start, new_end, segment.sort) self._list.pop(previous_segment_pos) bytes_changed = -(segment.size + previous_segment.size - new_size) merged = True new_pos = previous_segment_pos else: # Different sorts. It's a bit trickier. if segment.start == previous_segment.end: # They are adjacent. Just don't merge. pass else: # They are overlapping. We will create one, two, or three different blocks based on how they are # overlapping new_segments = [] if segment.start < previous_segment.start: new_segments.append(Segment(segment.start, previous_segment.start, segment.sort)) sort = previous_segment.sort if direction == "forward" else segment.sort new_segments.append(Segment(previous_segment.start, previous_segment.end, sort)) if segment.end < previous_segment.end: new_segments.append(Segment(segment.end, previous_segment.end, previous_segment.sort)) elif segment.end > previous_segment.end: new_segments.append(Segment(previous_segment.end, segment.end, segment.sort)) else: # segment.start >= previous_segment.start if segment.start > previous_segment.start: new_segments.append(Segment(previous_segment.start, segment.start, previous_segment.sort)) sort = previous_segment.sort if direction == "forward" else segment.sort if segment.end > previous_segment.end: new_segments.append(Segment(segment.start, previous_segment.end, sort)) new_segments.append(Segment(previous_segment.end, segment.end, segment.sort)) elif segment.end < previous_segment.end: new_segments.append(Segment(segment.start, segment.end, sort)) new_segments.append(Segment(segment.end, previous_segment.end, previous_segment.sort)) else: new_segments.append(Segment(segment.start, segment.end, sort)) # merge segments in new_segments array if they are of the same sort i = 0 while len(new_segments) > 1 and i < len(new_segments) - 1: s0 = new_segments[i] s1 = new_segments[i + 1] if s0.sort == s1.sort: new_segments = ( new_segments[:i] + [Segment(s0.start, s1.end, s0.sort)] + new_segments[i + 2 :] ) else: i += 1 # Put new segments into self._list old_size = sum([seg.size for seg in self._list[previous_segment_pos : segment_pos + 1]]) new_size = sum([seg.size for seg in new_segments]) bytes_changed = new_size - old_size self._list = self._list[:previous_segment_pos] + new_segments + self._list[segment_pos + 1 :] merged = True if direction == "forward": new_pos = previous_segment_pos + len(new_segments) - 1 else: new_pos = previous_segment_pos return merged, new_pos, bytes_changed def _remove(self, init_address: int, init_size: int, init_idx: int) -> None: address = init_address size = init_size idx = init_idx while idx < len(self._list): segment = self._list[idx] if segment.start <= address: if address < segment.start + segment.size < address + size: # |---segment---| # |---address + size---| # shrink segment segment.end = address # adjust address new_address = segment.start + segment.size # adjust size size = address + size - new_address address = new_address # update idx idx = self.search(address) elif address < segment.start + segment.size and address + size <= segment.start + segment.size: # |--------segment--------| # |--address + size--| # break segment seg0 = Segment(segment.start, address, segment.sort) seg1 = Segment(address + size, segment.start + segment.size, segment.sort) # remove the current segment self._list.remove(segment) if seg1.size > 0: self._list.insert(idx, seg1) if seg0.size > 0: self._list.insert(idx, seg0) # done break else: raise RuntimeError("Unreachable reached") else: # if segment.start > address if address + size <= segment.start: # |--- segment ---| # |-- address + size --| # no overlap break elif segment.start < address + size <= segment.start + segment.size: # |---- segment ----| # |-- address + size --| # # update the start of the segment segment.start = address + size if segment.size == 0: # remove the segment self._list.remove(segment) break elif address + size > segment.start + segment.size: # |---- segment ----| # |--------- address + size ----------| self._list.remove(segment) new_address = segment.end size = address + size - new_address address = new_address idx = self.search(address) else: raise RuntimeError("Unreachable reached") def _dbg_output(self): """ Returns a string representation of the segments that form this SegmentList :return: String representation of contents :rtype: str """ s = "[" lst = [] for segment in self._list: lst.append(repr(segment)) s += ", ".join(lst) s += "]" return s def _debug_check(self): """ Iterates over list checking segments with same sort do not overlap :raise: Exception: if segments overlap space with same sort """ # old_start = 0 old_end = 0 old_sort = "" for segment in self._list: if segment.start <= old_end and segment.sort == old_sort: raise AngrCFGError("Error in SegmentList: blocks are not merged") # old_start = start old_end = segment.end old_sort = segment.sort # # Public methods #
[docs] def search(self, addr: int) -> int: """ Checks which segment that the address `addr` should belong to, and, returns the offset of that segment. Note that the address may not actually belong to the block. :param addr: The address to search :return: The offset of the segment. """ start = 0 end = len(self._list) while start != end: mid = (start + end) // 2 segment = self._list[mid] if addr < segment.start: end = mid elif addr >= segment.end: start = mid + 1 else: # Overlapped :( start = mid break return start
[docs] def next_free_pos(self, address): """ Returns the next free position with respect to an address, including that address itself :param address: The address to begin the search with (including itself) :return: The next free position """ idx = self.search(address) if idx < len(self._list) and self._list[idx].start <= address < self._list[idx].end: # Occupied i = idx while i + 1 < len(self._list) and self._list[i].end == self._list[i + 1].start: i += 1 if i == len(self._list): return self._list[-1].end return self._list[i].end return address
[docs] def next_pos_with_sort_not_in(self, address, sorts, max_distance=None): """ Returns the address of the next occupied block whose sort is not one of the specified ones. :param int address: The address to begin the search with (including itself). :param sorts: A collection of sort strings. :param max_distance: The maximum distance between `address` and the next position. Search will stop after we come across an occupied position that is beyond `address` + max_distance. This check will be disabled if `max_distance` is set to None. :return: The next occupied position whose sort is not one of the specified ones, or None if no such position exists. :rtype: int or None """ list_length = len(self._list) idx = self.search(address) if idx < list_length: # Occupied block = self._list[idx] if max_distance is not None and address + max_distance < block.start: return None if block.start <= address < block.end: # the address is inside the current block if block.sort not in sorts: return address # tick the idx forward by 1 idx += 1 i = idx while i < list_length: if max_distance is not None and address + max_distance < self._list[i].start: return None if self._list[i].sort not in sorts: return self._list[i].start i += 1 return None
[docs] def is_occupied(self, address): """ Check if an address belongs to any segment :param address: The address to check :return: True if this address belongs to a segment, False otherwise """ idx = self.search(address) if len(self._list) <= idx: return False if self._list[idx].start <= address < self._list[idx].end: return True if idx > 0 and address < self._list[idx - 1].end: # TODO: It seems that this branch is never reached. Should it be removed? return True return False
[docs] def occupied_by_sort(self, address: int) -> Optional[str]: """ Check if an address belongs to any segment, and if yes, returns the sort of the segment :param address: The address to check :return: Sort of the segment that occupies this address """ idx = self.search(address) if len(self._list) <= idx: return None if self._list[idx].start <= address < self._list[idx].end: return self._list[idx].sort if idx > 0 and address < self._list[idx - 1].end: # TODO: It seems that this branch is never reached. Should it be removed? return self._list[idx - 1].sort return None
[docs] def occupied_by(self, address: int) -> Optional[Tuple[int, int, str]]: """ Check if an address belongs to any segment, and if yes, returns the beginning, the size, and the sort of the segment. :param address: The address to check """ idx = self.search(address) if len(self._list) <= idx: return None if self._list[idx].start <= address < self._list[idx].end: block = self._list[idx] return block.start, block.size, block.sort if idx > 0 and address < self._list[idx - 1].end: # TODO: It seems that this branch is never reached. Should it be removed? block = self._list[idx - 1] return block.start, block.size, block.sort return None
[docs] def occupy(self, address, size, sort): """ Include a block, specified by (address, size), in this segment list. :param int address: The starting address of the block. :param int size: Size of the block. :param str sort: Type of the block. :return: None """ if size is None or size <= 0: # Cannot occupy a non-existent block return # l.debug("Occpuying 0x%08x-0x%08x", address, address + size) if not self._list: self._list.append(Segment(address, address + size, sort)) self._bytes_occupied += size return # Find adjacent element in our list idx = self.search(address) self._insert_and_merge(address, size, sort, idx)
# self._debug_check()
[docs] def release(self, address: int, size: int) -> None: """ Remove a block, specified by (address, size), in this segment list. :param address: The starting address of the block. :param size: Size of the block. """ if size is None or size <= 0: # cannot release a non-existent block return if not self._list: return idx = self.search(address) if idx < len(self._list): self._remove(address, size, idx)
# self._debug_check()
[docs] def copy(self): """ Make a copy of the SegmentList. :return: A copy of the SegmentList instance. :rtype: angr.analyses.cfg_fast.SegmentList """ n = SegmentList() n._list = [a.copy() for a in self._list] n._bytes_occupied = self._bytes_occupied return n
# # Properties # @property def occupied_size(self): """ The sum of sizes of all blocks :return: An integer """ return self._bytes_occupied @property def has_blocks(self): """ Returns if this segment list has any block or not. !is_empty :return: True if it's not empty, False otherwise """ return len(self._list) > 0