Commit fa19a979 authored by Petr Špaček's avatar Petr Špaček Committed by Tomas Krizek

blacklist: rework + PCAP fixes

parent 84a7f9f6
...@@ -23,14 +23,34 @@ def read_lines(instream): ...@@ -23,14 +23,34 @@ def read_lines(instream):
""" """
Yield (line number, stripped line text, representation for logs). Skip empty lines. Yield (line number, stripped line text, representation for logs). Skip empty lines.
""" """
i = 0 i = 1
for line in instream: for line in instream:
if i % REPORT_CHUNKS == 0:
logging.info('Read %d lines', i)
line = line.strip() line = line.strip()
if line: if line:
i += 1
yield (i, line, line) yield (i, line, line)
if i % REPORT_CHUNKS == 0: i += 1
logging.info('Read %d queries', i)
def extract_wire(packet: bytes) -> bytes:
    """
    Extract the DNS message in wire format from a PCAP packet.

    The packet is decoded as an Ethernet frame and the payload of the
    transport layer found inside it is returned:

    - a TCP payload has its first two bytes (the length prefix) removed,
      unless it is shorter than two bytes, in which case it is returned
      unchanged;
    - any other payload (e.g. UDP) is returned as it was.

    No validation is done here: the caller must verify that the return
    value is a valid DNS message and decide what to do with invalid ones.
    """
    segment = dpkt.ethernet.Ethernet(packet).data.data
    payload = segment.data
    is_tcp = isinstance(segment, dpkt.tcp.TCP)
    # Only TCP carries the two-byte length prefix; TCP payloads too short
    # to hold it are handed back untouched so the caller can handle them.
    if is_tcp and len(payload) >= 2:
        return payload[2:]
    return payload
def parse_pcap(pcap_file): def parse_pcap(pcap_file):
...@@ -38,44 +58,51 @@ def parse_pcap(pcap_file): ...@@ -38,44 +58,51 @@ def parse_pcap(pcap_file):
Filters dns query packets from pcap_file Filters dns query packets from pcap_file
Yield (packet number, packet as wire, representation for logs) Yield (packet number, packet as wire, representation for logs)
""" """
i = 0 i = 1
pcap_file = dpkt.pcap.Reader(pcap_file) pcap_file = dpkt.pcap.Reader(pcap_file)
for _, wire in pcap_file: for _, frame in pcap_file:
if i % REPORT_CHUNKS == 0:
logging.info('Read %d frames', i)
yield (i, frame, 'frame no. {}'.format(i))
i += 1 i += 1
yield (i, wire, '')
def wrk_process_line( def wrk_process_line(
args: Tuple[int, str, str] args: Tuple[int, str, str]
) -> Tuple[Optional[bytes], Optional[bytes]]: ) -> Tuple[Optional[int], Optional[bytes]]:
""" """
Worker: parse input line, creates a packet in binary format Worker: parse input line, creates a packet in binary format
Skips over empty lines, raises for malformed inputs. Skips over malformed inputs.
""" """
qid, line, _ = args qid, line, log_repr = args
try: try:
wire = wire_from_text(line) msg = msg_from_text(line)
if blacklist.is_blacklisted(msg):
logging.debug('Blacklisted query "%s", skipping QID %d',
log_repr, qid)
return None, None
return qid, msg.to_wire()
except (ValueError, struct.error, dns.exception.DNSException) as ex: except (ValueError, struct.error, dns.exception.DNSException) as ex:
logging.error('Invalid query "%s": %s (skipping query ID %d)', line, ex, qid) logging.error('Invalid query specification "%s": %s, skipping QID %d', line, ex, qid)
return None, None return None, None
return wrk_process_wire_packet(qid, wire, line)
def wrk_process_packet(args: Tuple[int, bytes, str]): def wrk_process_frame(args: Tuple[int, bytes, str]):
""" """
Worker: convert packet from pcap to binary data Worker: convert packet from pcap to binary data
""" """
qid, wire, log_repr = args qid, frame, log_repr = args
wrk_process_wire_packet(qid, wire, log_repr) wire = extract_wire(frame)
return wrk_process_wire_packet(qid, wire, log_repr)
def wrk_process_wire_packet( def wrk_process_wire_packet(
qid: int, qid: int,
wire_packet: bytes, wire_packet: bytes,
log_repr: str log_repr: str
) -> Tuple[Optional[bytes], Optional[bytes]]: ) -> Tuple[Optional[int], Optional[bytes]]:
""" """
Worker: Return packet's data if it's not blacklisted Worker: Return packet's data if it's not blacklisted
...@@ -83,15 +110,16 @@ def wrk_process_wire_packet( ...@@ -83,15 +110,16 @@ def wrk_process_wire_packet(
:arg wire_packet packet in binary data :arg wire_packet packet in binary data
:arg log_repr representation of packet for logs :arg log_repr representation of packet for logs
""" """
if not blacklist.is_blacklisted(wire_packet): try:
key = qid2key(qid) msg = dns.message.from_wire(wire_packet)
return key, wire_packet if blacklist.is_blacklisted(msg):
logging.debug('Blacklisted query "%s", skipping QID %d',
logging.debug('Query "%s" blacklisted (skipping query ID %d)', log_repr, qid)
log_repr if log_repr else repr(blacklist.extract_packet(wire_packet)), return None, None
qid) except dns.exception.DNSException:
return None, None # pass invalid blobs to LMDB (for testing non-standard states)
pass
return qid, wire_packet
def int_or_fromtext(value, fromtext): def int_or_fromtext(value, fromtext):
try: try:
...@@ -100,7 +128,7 @@ def int_or_fromtext(value, fromtext): ...@@ -100,7 +128,7 @@ def int_or_fromtext(value, fromtext):
return fromtext(value) return fromtext(value)
def wire_from_text(text): def msg_from_text(text):
""" """
Convert line from <qname> <RR type> to DNS query in IN class. Convert line from <qname> <RR type> to DNS query in IN class.
...@@ -112,7 +140,7 @@ def wire_from_text(text): ...@@ -112,7 +140,7 @@ def wire_from_text(text):
qtype = int_or_fromtext(qtype, dns.rdatatype.from_text) qtype = int_or_fromtext(qtype, dns.rdatatype.from_text)
msg = dns.message.make_query(qname, qtype, dns.rdataclass.IN, msg = dns.message.make_query(qname, qtype, dns.rdataclass.IN,
want_dnssec=True, payload=4096) want_dnssec=True, payload=4096)
return msg.to_wire() return msg
def main(): def main():
...@@ -150,12 +178,13 @@ def main(): ...@@ -150,12 +178,13 @@ def main():
method = wrk_process_line method = wrk_process_line
elif args.in_format == 'pcap': elif args.in_format == 'pcap':
data_stream = parse_pcap(args.pcap_file) data_stream = parse_pcap(args.pcap_file)
method = wrk_process_packet method = wrk_process_frame
else: else:
logging.error('unknown in-format, use "text" or "pcap"') logging.error('unknown in-format, use "text" or "pcap"')
sys.exit(1) sys.exit(1)
for key, wire in workers.imap(method, data_stream, chunksize=1000): for qid, wire in workers.imap(method, data_stream, chunksize=1000):
if key is not None: if qid is not None:
key = qid2key(qid)
txn.put(key, wire) txn.put(key, wire)
except KeyboardInterrupt as err: except KeyboardInterrupt as err:
logging.info('SIGINT received, exiting...') logging.info('SIGINT received, exiting...')
......
import dpkt
import dns import dns
from dns.message import Message, from_wire from dns.message import Message
# dotnxdomain.net and dashnxdomain.net are used by APNIC for ephemeral # dotnxdomain.net and dashnxdomain.net are used by APNIC for ephemeral
# single-query tests so there is no point in asking these repeatedly # single-query tests so there is no point in asking these repeatedly
...@@ -9,29 +8,11 @@ _BLACKLIST_SUBDOMAINS = [dns.name.from_text(name) for name in ...@@ -9,29 +8,11 @@ _BLACKLIST_SUBDOMAINS = [dns.name.from_text(name) for name in
['dotnxdomain.net.', 'dashnxdomain.net.']] ['dotnxdomain.net.', 'dashnxdomain.net.']]
def extract_packet(packet: bytes) -> Message: def is_blacklisted(dnsmsg: Message) -> bool:
"""
Extract packet from bytes. Return dns.Message
"""
frame = dpkt.ethernet.Ethernet(packet)
ip = frame.data
transport = ip.data
if transport.data == b'':
return True
if isinstance(transport, dpkt.tcp.TCP):
wire = transport.data[2:]
else:
wire = transport.data
dnsmsg = from_wire(wire)
return dnsmsg
def is_blacklisted(packet: bytes) -> bool:
""" """
Detect if given packet is blacklisted or not. Detect if given packet is blacklisted or not.
""" """
try: try:
dnsmsg = extract_packet(packet)
flags = dns.flags.to_text(dnsmsg.flags).split() flags = dns.flags.to_text(dnsmsg.flags).split()
if 'QR' in flags: # not a query if 'QR' in flags: # not a query
return True return True
......
import binascii
import pytest
from qprep import wrk_process_frame, wrk_process_wire_packet
@pytest.mark.parametrize('wire', [b'', b'x', b'xx'])
def test_wire_input_invalid(wire):
    """
    Blobs which are not parseable DNS messages must be passed through
    unchanged together with their query ID.
    """
    expected = (1, wire)

    # NOTE(review): the identical call is made twice, exactly as before;
    # presumably to check that the result is repeatable -- confirm.
    first = wrk_process_wire_packet(1, wire, 'invalid')
    assert first == expected
    second = wrk_process_wire_packet(1, wire, 'invalid')
    assert second == expected
@pytest.mark.parametrize('wire_hex', [
    # www.audioweb.cz A
    'ed21010000010000000000010377777708617564696f77656202637a00000100010000291000000080000000',
])
def test_wire_input_valid(wire_hex):
    """
    A valid query must come back byte-for-byte identical and keep the
    query ID it was submitted with.
    """
    original = binascii.unhexlify(wire_hex)

    result = wrk_process_wire_packet(1, original, 'qid 1')
    returned_qid, returned_wire = result

    assert original == returned_wire
    assert returned_qid == 1
@pytest.mark.parametrize('wire_hex', [
    # test.dotnxdomain.net. A
    ('ce970120000100000000000104746573740b646f746e78646f6d61696e036e657400000'
     '10001000029100000000000000c000a00084a69fef0f174d87e'),
    # 0es-u2af5c077-c56-s1492621913-i00000000.eue.dotnxdomain.net A
    ('d72f01000001000000000001273065732d7532616635633037372d6335362d733134393'
     '23632313931332d693030303030303030036575650b646f746e78646f6d61696e036e65'
     '7400000100010000291000000080000000'),
])
def test_pcap_input_blacklist(wire_hex):
    """
    Queries for names under a blacklisted domain must be dropped, which
    the worker signals by returning (None, None).
    """
    query = binascii.unhexlify(wire_hex)
    dropped = (None, None)

    outcome = wrk_process_wire_packet(1, query, 'qid 1')

    assert outcome == dropped
@pytest.mark.parametrize('frame_hex, wire_hex', [
    # UDP nic.cz A
    ('deadbeefcafecafebeefbeef08004500004bf9d000004011940d0202020201010101b533003500375520',
     'b90001200001000000000001036e696302637a0000010001000029100000000000000c000a00081491f8'
     '93b0c90b2f'),
    # TCP nic.cz A
    ('deadbeefcafebeefbeefcafe080045000059e2f2400040066ae80202020201010101ace7003557b51707'
     '47583400501800e5568c0000002f', '49e501200001000000000001036e696302637a00000100010000'
     '29100000000000000c000a0008a1db546e1d6fa39f'),
])
def test_wrk_process_frame(frame_hex, wire_hex):
    """
    The frame worker must return only the DNS message from a captured frame.

    frame_hex holds everything which precedes the DNS message in the
    captured frame, wire_hex is the DNS message itself; the frame fed to
    the worker is the concatenation of both.
    """
    expected_wire = binascii.unhexlify(wire_hex)
    whole_frame = binascii.unhexlify(frame_hex + wire_hex)
    task = (1, whole_frame, 'qid 1')

    assert wrk_process_frame(task) == (1, expected_wire)
import dns.message
import dns.rdataclass
import dns.rdatatype
import dns.rrset
import pytest
from qprep import wrk_process_line
# NOTE(review): '\123' and '\321' below are octal escapes in non-raw string
# literals, i.e. the characters 'S' and 'Ñ', not a backslash followed by
# digits -- confirm this is the intended input.
@pytest.mark.parametrize('line', [
    '',
    'x'*256 + ' A',
    ('\123x.test. 65536'),
    ('\321.test. 1'),
])
def test_text_input_invalid(line):
    """
    Malformed text lines must be skipped, which the worker signals by
    returning (None, None).
    """
    task = (1, line, line)
    skipped = (None, None)

    assert wrk_process_line(task) == skipped
@pytest.mark.parametrize('qname, qtype', [
    ('x', 'A'),
    ('x', 1),
    ('blabla.test.', 'TSIG'),
])
def test_text_input_valid(qname, qtype):
    """
    A "<qname> <qtype>" line must produce a wire-format message whose
    question section holds exactly that name and type in class IN.

    The type may be given either as a mnemonic or as a number.
    """
    # numeric types are used directly, mnemonics are translated first
    rdtype = qtype if isinstance(qtype, int) else dns.rdatatype.from_text(qtype)
    question = dns.rrset.RRset(dns.name.from_text(qname), dns.rdataclass.IN, rdtype)
    text = '{} {}'.format(qname, qtype)

    qid, wire = wrk_process_line((1, text, text))

    parsed = dns.message.from_wire(wire)
    assert parsed.question == [question]
    assert qid == 1
@pytest.mark.parametrize('line', [
    'test. ANY',
    'test. RRSIG',
    'something.dotnxdomain.net. A',
    'something.dashnxdomain.net. AAAA',
])
def test_text_input_blacklist(line):
    """
    Well-formed lines which describe a blacklisted query must be dropped,
    which the worker signals by returning (None, None).
    """
    task = (1, line, line)
    dropped = (None, None)

    assert wrk_process_line(task) == dropped
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment