Commit a7c3778d authored by Štěpán Kotek's avatar Štěpán Kotek

qprep: parse cap/pcap files for dns queries. Used as alternative source data for qprep

parent af23b77c
......@@ -7,7 +7,7 @@ do
cd "`dirname "${FILE}"`"
echo "${FILE}"
python3 -m pylint -E "`basename \"${FILE}\"`" 2>/dev/null || RET_CODE=1
python3 -m pylint -E "`basename \"${FILE}\"`" || RET_CODE=1
exit $RET_CODE
\ No newline at end of file
import dns.rdatatype
import dpkt
import dns
from dns.message import Message, from_wire
def obj_blacklisted(msg):
def extract_packet(packet: bytes) -> Message:
Extract packet from bytes. Return dns.Message
frame = dpkt.ethernet.Ethernet(packet)
ip =
transport =
if == b'':
return True
if isinstance(transport, dpkt.tcp.TCP):
wire =[2:]
wire =
dnsmsg = from_wire(wire)
return dnsmsg
def is_blacklisted(packet: bytes) -> bool:
Detect blacklisted DNS message objects.
Detect if given packet is blacklisted or not.
if len(msg.question) >= 1:
if msg.question[0].rdtype == dns.rdatatype.ANY:
dnsmsg = extract_packet(packet)
flags = dns.flags.to_text(dnsmsg.flags).split()
if 'QR' in flags: # not a query
return True
dnspacket = dnsmsg.question[0]
if dnspacket.rdtype == dns.rdatatype.ANY:
return True
return False
return False
except Exception:
return False
......@@ -6,32 +6,84 @@ import multiprocessing.pool as pool
import struct
import sys
import dpkt
import blacklist
import dbhelper
import dns.exception
import dns.message
import dns.rdatatype
import lmdb
import blacklist
import dbhelper
def read_lines(instream):
Yield (line number, stripped line text). Skip empty lines.
Yield (line number, stripped line text, representation for logs). Skip empty lines.
i = 0
for line in instream:
line = line.strip()
if line:
i += 1
yield (i, line)
yield (i, line, line)
if i % REPORT_CHUNKS == 0:'Read %d queries', i)
def parse_pcap(pcap_file):
Filters dns query packets from pcap_file
Yield (packet number, packet as wire, representation for logs)
i = 0
pcap_file = dpkt.pcap.Reader(pcap_file)
for ts, wire in pcap_file:
i += 1
yield (i, wire, '')
def wrk_process_line(args: (int, str, str)):
Worker: parse input line, creates a packet in binary format
Skips over empty lines, raises for malformed inputs.
qid, line, log_repr = args
wire = wire_from_text(line)
except (ValueError, struct.error, dns.exception.DNSException) as ex:
logging.error('Invalid query "%s": %s (skipping query ID %d)', line, ex, qid)
wrk_process_wire_packet(qid, wire, line)
def wrk_process_packet(args: (int, bytes, str)):
Worker: convert packet from pcap to binary data
qid, wire, log_repr = args
wrk_process_wire_packet(qid, wire, log_repr)
def wrk_process_wire_packet(qid: int, wire_packet: bytes, log_repr: str):
Worker: Check if given packet is blacklisted and save it into lmdb db
:arg qid number of packet
:arg wire_packet packet in binary data
:arg log_repr representation of packet for logs
if not blacklist.is_blacklisted(wire_packet):
wrk_lmdb_write(qid, wire_packet)
logging.debug('Query "%s" blacklisted (skipping query ID %d)',
log_repr if log_repr else repr(blacklist.extract_packet(wire_packet)),
def int_or_fromtext(value, fromtext):
return int(value)
......@@ -39,18 +91,19 @@ def int_or_fromtext(value, fromtext):
return fromtext(value)
def q_fromtext(line):
def wire_from_text(text):
Convert line from <qname> <RR type> to DNS query in IN class.
Returns: DNS message object
Returns: DNS packet in binary form
Raises: ValueError or dns.exception.Exception on invalid input
qname, qtype = line.rsplit(None, 1)
qname, qtype = text.rsplit(None, 1)
qname =
qtype = int_or_fromtext(qtype, dns.rdatatype.from_text)
return dns.message.make_query(qname, qtype, dns.rdataclass.IN,
want_dnssec=True, payload=4096)
msg = dns.message.make_query(qname, qtype, dns.rdataclass.IN,
want_dnssec=True, payload=4096)
return msg.to_wire()
def wrk_lmdb_init(envdir):
......@@ -63,14 +116,14 @@ def wrk_lmdb_init(envdir):
config = dbhelper.env_open.copy()
'path': envdir,
'sync': False, # unsafe but fast
'sync': False, # unsafe but fast
'writemap': True # we do not care, this is a new database
env = lmdb.Environment(**config)
db = env.open_db(key=b'queries', **dbhelper.db_open)
def wrk_lmdb_write(qid, wire):
def wrk_lmdb_write(qid: int, wire: bytes):
Worker: write query wire format into database
......@@ -82,35 +135,28 @@ def wrk_lmdb_write(qid, wire):
txn.put(key, wire)
def wrk_process_line(args):
Worker: parse input line and write (qid, wire formt) to LMDB queries DB
Skips over empty lines, raises for malformed inputs.
qid, line = args
msg = q_fromtext(line)
if not blacklist.obj_blacklisted(msg):
wrk_lmdb_write(qid, msg.to_wire())
logging.debug('Query "%s" blacklisted (skipping query ID %d)', line, qid)
except (ValueError, struct.error, dns.exception.DNSException) as ex:
logging.error('Invalid query "%s": %s (skipping query ID %d)', line, ex, qid)
def main():
logging.basicConfig(format='%(levelname)s %(message)s', level=logging.DEBUG)
parser = argparse.ArgumentParser(
description='Convert text list of queries from standard input '
'and store wire format into LMDB "queries" DB. '
'Expected query format is: "<qname> <RR type>", '
'one query per line.')
parser.add_argument('envpath', type=str,
help='path where to create LMDB environment')
description='Convert queries data from standard input and store '
'wire format into LMDB "queries" DB.')
parser.add_argument('envpath', type=str, help='path where to create LMDB environment')
parser.add_argument('-f', '--informat', type=str, choices=['text', 'pcap'], default='text',
help='define format for input data, default value is text\n'
'Expected input for "text" is: "<qname> <RR type>", '
'one query per line.\n'
'Expected input for "pcap" is content of the pcap file.')
parser.add_argument('--pcap-file', type=argparse.FileType('rb'))
args = parser.parse_args()
if args.informat == 'text' and args.pcap_file:
logging.critical("Argument --pcap-file can be use only in combination with -f pcap")
if args.informat == 'pcap' and not args.pcap_file:
logging.critical("Missing path to pcap file, use argument --pcap-file")
if dbhelper.db_exists(args.envpath, dbhelper.QUERIES_DB_NAME):
'LMDB environment "%s" already contains DB %s! '
......@@ -118,10 +164,17 @@ def main():
args.envpath, dbhelper.QUERIES_DB_NAME)
qstream = read_lines(sys.stdin)
with pool.Pool(initializer=wrk_lmdb_init, initargs=(args.envpath,)) as workers:
for _ in workers.imap_unordered(wrk_process_line, qstream, chunksize=1000):
if args.informat == 'text':
data_stream = read_lines(sys.stdin)
method = wrk_process_line
elif args.informat == 'pcap':
data_stream = parse_pcap(args.pcap_file)
method = wrk_process_packet
logging.error('unknown informat, use "text" or "pcap"')
for _ in workers.imap_unordered(method, data_stream, chunksize=1000):
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment