Source code for angr.storage.pcap

import socket
import logging

import dpkt

l = logging.getLogger(name=__name__)


[docs]class PCAP:
[docs] def __init__(self, path, ip_port_tup, init=True): self.path = path self.packet_num = 0 self.pos = 0 self.in_streams = [] self.out_streams = [] self.ip = ip_port_tup[0] self.port = ip_port_tup[1] if init: self.initialize(self.path)
[docs] def initialize(self, path): f = open(path) pcap = dpkt.pcap.Reader(f) for _, buf in pcap: ip = dpkt.ethernet.Ethernet(buf).ip tcp = ip.data myip = socket.inet_ntoa(ip.dst) if myip is self.ip and tcp.dport is self.port and len(tcp.data) != 0: self.out_streams.append((len(tcp.data), tcp.data)) elif len(tcp.data) != 0: self.in_streams.append((len(tcp.data), tcp.data)) f.close()
[docs] def recv(self, length): temp = 0 initial_packet = self.packet_num plength, pdata = self.in_streams[self.packet_num] length = min(length, plength) if self.pos == 0: if plength > length: temp = length else: self.packet_num += 1 packet_data = pdata[self.pos : length] self.pos += temp else: if (self.pos + length) >= plength: rest = plength - self.pos length = rest self.packet_num += 1 packet_data = pdata[self.pos : plength] if self.packet_num is not initial_packet: self.pos = 0 return packet_data, length
[docs] def copy(self): new_pcap = PCAP(self.path, (self.ip, self.port), init=False) new_pcap.packet_num = self.packet_num new_pcap.pos = self.pos new_pcap.in_streams = self.in_streams new_pcap.out_streams = self.out_streams return new_pcap