#!/usr/bin/env python3

import argparse
import logging
from multiprocessing import pool
import signal
import struct
import sys
from typing import Optional, Tuple

import dns.exception
import dns.message
import dns.name
import dns.rdataclass
import dns.rdatatype
import dpkt

from respdiff import blacklist, cli
from respdiff.database import LMDB, qid2key
# Number of input items (lines or frames) between progress log messages
REPORT_CHUNKS = 10000


def read_lines(instream):
    """
    Yield (line number, stripped line text, representation for logs).

    Lines are numbered from 1 and every input line is counted, including
    the skipped ones, so the yielded number is the position in the input.
    Lines which are empty after stripping whitespace are skipped.
    The representation for logs is the stripped line itself.

    :arg instream iterable of text lines (e.g. an open text file)
    """
    for lineno, raw_line in enumerate(instream, start=1):
        # progress report counts all lines read, not only the yielded ones
        if lineno % REPORT_CHUNKS == 0:
            logging.info('Read %d lines', lineno)
        text = raw_line.strip()
        if text:
            yield (lineno, text, text)


def extract_wire(packet: bytes) -> bytes:
    """
    Return DNS message wire data carried by one Ethernet frame from PCAP.

    The frame is decoded as Ethernet -> network layer -> transport layer.
    Payload of a TCP segment is returned without its first two bytes
    (DNS-over-TCP length prefix), unless the payload is shorter than
    two bytes, in which case it is returned untouched.
    Payload of any other transport is returned untouched.

    The result is not validated in any way: it is up to the caller to
    check whether it is a valid DNS message and to decide what to do
    with invalid ones.
    """
    segment = dpkt.ethernet.Ethernet(packet).data.data
    payload = segment.data
    # only TCP payload long enough to hold the length prefix gets trimmed
    if isinstance(segment, dpkt.tcp.TCP) and len(payload) >= 2:
        return payload[2:]
    return payload


def parse_pcap(pcap_file):
    """
    Read frames from pcap_file.

    Yield (frame number, raw frame data, representation for logs).
    Frames are numbered from 1. Every frame in the file is yielded as it
    was read; nothing is filtered or decoded here.

    :arg pcap_file binary file object with pcap data
    """
    reader = dpkt.pcap.Reader(pcap_file)
    # the reader yields (timestamp, frame) pairs; timestamp is not needed
    for frameno, (_, frame) in enumerate(reader, start=1):
        if frameno % REPORT_CHUNKS == 0:
            logging.info('Read %d frames', frameno)
        yield (frameno, frame, 'frame no. {}'.format(frameno))


def wrk_process_line(
            args: Tuple[int, str, str]
        ) -> Tuple[Optional[int], Optional[bytes]]:
    """
    Worker: parse input line, creates a packet in binary format

    Skips over malformed inputs.

    :arg args tuple (query ID, query text in form "<qname> <RR type>",
                     representation of the query for logs)
    :return (query ID, query in wire format), or (None, None) if the query
            is blacklisted or the line cannot be converted to a DNS query
    """
    qid, line, log_repr = args

    try:
        msg = msg_from_text(line)
        if blacklist.is_blacklisted(msg):
            logging.debug('Blacklisted query "%s", skipping QID %d',
                          log_repr, qid)
            return None, None
        return qid, msg.to_wire()
    except (ValueError, struct.error, dns.exception.DNSException) as ex:
        # use the same representation of the query as the debug message above
        logging.error('Invalid query specification "%s": %s, skipping QID %d',
                      log_repr, ex, qid)
        return None, None


def wrk_process_frame(args: Tuple[int, bytes, str]) -> Tuple[Optional[int], Optional[bytes]]:
    """
    Worker: convert packet from pcap to binary data

    Skips over frames from which no transport-layer payload can be extracted,
    so a single unusable frame does not abort processing of the whole pcap.

    :arg args tuple (query ID, raw frame from pcap, representation for logs)
    :return (query ID, DNS message in wire format), or (None, None) if the
            frame was skipped or the query is blacklisted
    """
    qid, frame, log_repr = args
    try:
        wire = extract_wire(frame)
    except (dpkt.dpkt.UnpackError, AttributeError) as ex:
        # UnpackError: dpkt was not able to decode the frame at all
        # AttributeError: the frame has no network/transport layer objects
        # (dpkt leaves payload it cannot decode as plain bytes)
        logging.error('Unable to extract DNS message from %s: %s, skipping QID %d',
                      log_repr, ex, qid)
        return None, None
    return wrk_process_wire_packet(qid, wire, log_repr)


def wrk_process_wire_packet(
            qid: int,
            wire_packet: bytes,
            log_repr: str
        ) -> Tuple[Optional[int], Optional[bytes]]:
    """
    Worker: Return packet's data if it's not blacklisted

    Data which cannot be parsed as a DNS message are never blacklisted
    and are returned unchanged.

    :arg qid number of packet
    :arg wire_packet packet in binary data
    :arg log_repr representation of packet for logs
    :return (qid, wire_packet), or (None, None) for a blacklisted query
    """
    try:
        msg = dns.message.from_wire(wire_packet)
    except dns.exception.DNSException:
        # pass invalid blobs to LMDB (for testing non-standard states)
        return qid, wire_packet

    if blacklist.is_blacklisted(msg):
        logging.debug('Blacklisted query "%s", skipping QID %d',
                      log_repr, qid)
        return None, None
    return qid, wire_packet


def int_or_fromtext(value, fromtext):
    """
    Convert value to a number, with a fallback for non-numeric text.

    :arg value text to convert
    :arg fromtext callable which is given value if int(value) fails
                  with ValueError
    :return int(value) if value is an integer literal,
            result of fromtext(value) otherwise
    """
    try:
        return int(value)
    except ValueError:
        return fromtext(value)


def msg_from_text(text):
    """
    Convert line from <qname> <RR type> to DNS query in IN class.

    RR type is either a number or a text accepted by
    dns.rdatatype.from_text. The query is created with want_dnssec=True
    and payload=4096.

    Returns: DNS query as a message object (use its to_wire() method
             to get the binary form)
    Raises: ValueError or dns.exception.DNSException on invalid input
    """
    try:
        qname, qtype = text.split()
    except ValueError as ex:
        # text does not consist of exactly two whitespace-separated fields
        raise ValueError(
            'space is only allowed as separator between qname qtype') from ex
    # non-ASCII qname is refused here: UnicodeEncodeError is a ValueError
    qname = dns.name.from_text(qname.encode('ascii'))
    qtype = int_or_fromtext(qtype, dns.rdatatype.from_text)
    msg = dns.message.make_query(qname, qtype, dns.rdataclass.IN,
                                 want_dnssec=True, payload=4096)
    return msg


def main():
    """
    Command line entry point: convert queries and store them into LMDB.

    Queries are read either as text lines from standard input (default)
    or as frames from the file given by --pcap-file (with -f pcap).
    They are converted to wire format by a pool of worker processes and
    stored into the LMDB "queries" database, keyed by query ID.

    Exit code is 1 for invalid combination of arguments or RuntimeError
    during processing, and 130 when interrupted by SIGINT. The LMDB
    transaction is committed in all of these cases.
    """
    cli.setup_logging()
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawTextHelpFormatter,
        description='Convert queries data from standard input and store '
                    'wire format into LMDB "queries" DB.')
    cli.add_arg_envdir(parser)
    parser.add_argument('-f', '--in-format', type=str, choices=['text', 'pcap'], default='text',
                        help='define format for input data, default value is text\n'
                             'Expected input for "text" is: "<qname> <RR type>", '
                             'one query per line.\n'
                             'Expected input for "pcap" is content of the pcap file.')
    parser.add_argument('--pcap-file', type=argparse.FileType('rb'))

    args = parser.parse_args()

    if args.in_format == 'text' and args.pcap_file:
        logging.critical("Argument --pcap-file can be use only in combination with -f pcap")
        sys.exit(1)
    if args.in_format == 'pcap' and not args.pcap_file:
        logging.critical("Missing path to pcap file, use argument --pcap-file")
        sys.exit(1)

    with LMDB(args.envdir) as lmdb:
        qdb = lmdb.open_db(LMDB.QUERIES, create=True, check_notexists=True)
        txn = lmdb.env.begin(qdb, write=True)
        try:
            # Workers ignore SIGINT, it is handled only by the parent process.
            # signal.signal is passed directly instead of being wrapped in
            # a lambda: lambda cannot be pickled, which breaks the pool with
            # "spawn" start method.
            with pool.Pool(
                    initializer=signal.signal,
                    initargs=(signal.SIGINT, signal.SIG_IGN)
                    ) as workers:
                if args.in_format == 'text':
                    data_stream = read_lines(sys.stdin)
                    method = wrk_process_line
                elif args.in_format == 'pcap':
                    data_stream = parse_pcap(args.pcap_file)
                    method = wrk_process_frame
                else:
                    logging.error('unknown in-format, use "text" or "pcap"')
                    sys.exit(1)
                for qid, wire in workers.imap(method, data_stream, chunksize=1000):
                    # workers return qid None for skipped queries
                    if qid is not None:
                        key = qid2key(qid)
                        txn.put(key, wire)
        except KeyboardInterrupt:
            logging.info('SIGINT received, exiting...')
            sys.exit(130)
        except RuntimeError as err:
            logging.error(err)
            sys.exit(1)
        finally:
            # attempt to preserve data if something went wrong (or not)
            logging.debug('Comitting LMDB transaction...')
            txn.commit()


# Run the conversion only when executed as a script
if __name__ == '__main__':
    main()