Commit 0dca21f4 authored by Marek Vavrusa's avatar Marek Vavrusa

scenario: PCAP='myfile' for capturing test in pcap

if the PCAP env variable is present during the test,
all test queries/responses are captured in a pcap
parent b912aaaf
......@@ -5,13 +5,35 @@ import dns.dnssec
import dns.tsigkeyring
import binascii
import socket, struct
import os
import os, sys
import itertools
import time
from datetime import datetime
from dprint import dprint
from testserver import recvfrom_msg, sendto_msg
# If PCAP is pointed to a file, queries/responses from the test are captured
g_pcap = None
if 'PCAP' in os.environ:
import dpkt
g_pcap = dpkt.pcap.Writer(open(os.environ['PCAP'], 'wb'))
def log_packet(sock, buf, query = True):
""" Fake underlying layers and store packet in a pcap. """
if not g_pcap:
src, dst = (sock.getpeername()[0], 53), sock.getsockname()
if query:
src, dst = sock.getsockname(), (sock.getpeername()[0], 53)
# Synthesise IP/UDP/Eth layers
transport = dpkt.udp.UDP(data = buf, dport = dst[1], sport = src[1])
transport.ulen = len(transport)
ip = dpkt.ip.IP(src = socket.inet_pton(, src[0]),
dst = socket.inet_pton(, dst[0]), p = dpkt.ip.IP_PROTO_UDP) = transport
ip.len = len(ip)
eth = dpkt.ethernet.Ethernet(data = ip)
# Element comparators
......@@ -386,12 +408,12 @@ class Step:
raise Exception("response definition required")
expected =[0]
if expected.is_raw_data_entry is True:
dprint("[ __check_answer ]", ctx.last_raw_answer.to_text())
dprint("", ctx.last_raw_answer.to_text())
if ctx.last_answer is None:
raise Exception("no answer from preceding query")
dprint("[ __check_answer ]", ctx.last_answer.to_text())
dprint("", ctx.last_answer.to_text())
def __query(self, ctx, tcp = False, choice = None, source = None):
......@@ -422,6 +444,7 @@ class Step:
if source: sock.bind((source, 0))
# Send query to client and wait for response
log_packet(sock, data_to_wire, query = True)
while True:
sendto_msg(sock, data_to_wire)
......@@ -445,6 +468,7 @@ class Step:
ctx.last_raw_answer = answer
if self.raw_answer is not None:
self.answer = dns.message.from_wire(self.raw_answer)
log_packet(sock, answer, query = False)
self.answer = None
ctx.last_answer = self.answer
