Commit 7c8fc998 authored by Petr Špaček

msgdiff: read answers from LMDB environment

Answers are stored in the LMDB sub-database 'answers'.

One key in the DB holds one set of answers; the key is the internal query ID
(not the same as the ID in the DNS message).

The value under the key is binary data in Python pickle format.
Unpickling it produces a dictionary with the structure
{'name of the resolver': <answer in wire format>}

Wire format is used instead of DNS message objects for two reasons:
1. it allows us to store invalid answers exactly as received from the resolver
2. it saves storage space
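
For illustration, a minimal sketch of reading one answer set back out of the
'answers' sub-database in the format described above; the environment path,
the max_dbs setting and the example query ID are placeholders, not values
taken from this commit:

    import pickle
    import lmdb
    import dns.message

    lenv = lmdb.Environment('/path/to/lmdb-env', readonly=True, max_dbs=1)  # placeholder path
    adb = lenv.open_db(key=b'answers', create=False)
    with lenv.begin(adb) as txn:
        blob = txn.get(b'00000001')  # internal query ID, placeholder value
    wire_by_resolver = pickle.loads(blob)  # {'name of the resolver': <answer in wire format>}
    for resolver, wire in wire_by_resolver.items():
        try:
            print(resolver, dns.message.from_wire(wire))
        except Exception:
            print(resolver, 'malformed answer')  # invalid answers are stored too, so decoding may fail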
parent 98e924e8
@@ -3,11 +3,15 @@ from IPython.core.debugger import Tracer
import collections
import cProfile
import json
import pickle
from pprint import pprint
import sys
import dns.message
import dns.exception
import lmdb
import lmdbcfg
#m1 = dns.message.from_wire(open(sys.argv[1], 'rb').read())
#print('--- m1 ---')
@@ -144,6 +148,32 @@ def find_querydirs(workdir):
        #print('yield %s' % root)
        yield root

def find_answer_qids(lenv, db):
    with lenv.begin(db) as txn:
        with txn.cursor(db) as cur:
            cont = cur.first()
            while cont:
                yield cur.key()
                cont = cur.next()

def read_answers_lmdb(lenv, db, qid):
    answers = {}
    with lenv.begin(db) as txn:
        blob = txn.get(qid)
    assert blob
    blob_dict = pickle.loads(blob)
    assert isinstance(blob_dict, dict)
    for k, v in blob_dict.items():
        # decode bytes to dns.message objects
        #if isinstance(v, bytes):
        try:
            answers[k] = dns.message.from_wire(v)
        except Exception as ex:
            #answers[k] = ex # decoding failed, record it!
            continue
    return answers
def read_answers(workdir):
    answers = {}
    for filename in os.listdir(workdir):
@@ -176,7 +206,6 @@ def read_answer_file(filename):
    answers = {}
    while sidx < set_len:
        #Tracer()()
        name, wire_len = struct.unpack('10p H', set_binary[sidx:sidx+12])
        sidx += 12
@@ -207,21 +236,20 @@ def transitive_equality(answers, criteria, resolvers):
        res_others))

def compare(target, workdir, criteria):
def compare(target, qid, criteria):
    global lenv
    global answers_db
    #print('compare: %s %s %s' %(target, workdir, criteria))
    #answers = read_answers(workdir)
    # convert from wire format to DNS message object
    try:
        answers = read_answers(workdir)
    except (ValueError, dns.exception.DNSException):
        return (None, False, None) # malformed reply!
    answers = read_answers_lmdb(lenv, answers_db, qid)
    others = list(answers.keys())
    try:
        others.remove(target)
    except ValueError:
        return (None, False, None) # HACK, target did not reply
    #qid = str(answers[target].question[0])
    qid = (workdir, answers[target].question[0].name, answers[target].question[0].rdtype)
    #qid = (workdir, answers[target].question[0].name, answers[target].question[0].rdtype)
    if len(others) <= 1:
        return (qid, False, None) # HACK, not enough targets to compare
    random_other = others[0]
@@ -259,6 +287,19 @@ def worker_init(criteria_arg, target_arg):
    global target
    global prof
    global i
    global lenv
    global answers_db

    config = lmdbcfg.env_open.copy()
    config.update({
        'path': sys.argv[1],
        'readonly': False,
        'create': False
        })
    lenv = lmdb.Environment(**config)
    answers_db = lenv.open_db(key=b'answers', create=True, **lmdbcfg.db_open)

    i = 0
    #prof = cProfile.Profile()
    #prof.enable()
@@ -267,14 +308,14 @@ def worker_init(criteria_arg, target_arg):
    target = target_arg
    #print('criteria: %s target: %s' % (criteria, target))

def compare_wrapper(workdir):
def compare_wrapper(qid):
    global criteria
    global target
    #global result
    global i
    #global prof
    #return compare(target, workdir, criteria)
    result = compare(target, workdir, criteria)
    result = compare(target, qid, criteria)
    #i += 1
    #if i == 10000:
    # prof.disable()
@@ -301,7 +342,8 @@ def process_results(diff_generator):
        if target_diff:
            stats['target_disagrees'] += 1
            print('("%s", "%s", "%s"): ' % qid)
            #print('("%s", "%s", "%s"): ' % qid)
            print('"%s": ' % qid)
            pprint(target_diff)
            print(',')
            diff_fields = list(target_diff.keys())
@@ -309,10 +351,10 @@ def process_results(diff_generator):
            for field, value in target_diff.items():
                if field == 'answer':
                    continue
                question = qid[1:]
                queries.update([question])
                print(type(question))
                print(question)
                #question = qid[1:]
                #queries.update([question])
                #print(type(question))
                #print(question)
                uniq.setdefault(field, collections.Counter()).update([value])
    print('}')
@@ -324,11 +366,11 @@ def process_results(diff_generator):
    #for field in uniq:
    # uniq[field] = collections.OrderedDict(uniq[field].most_common(100))
    pprint(uniq)
    print('most common mismatches (not counting answer section):')
    for query, count in queries.most_common(100):
        qname, qtype = query
        qtype = dns.rdatatype.to_text(qtype)
        print("%s %s: %s mismatches" % (qname, qtype, count))
    #print('most common mismatches (not counting answer section):')
    #for query, count in queries.most_common(100):
    # qname, qtype = query
    # qtype = dns.rdatatype.to_text(qtype)
    # print("%s %s: %s mismatches" % (qname, qtype, count))
    #pprint(collections.OrderedDict(queries.most_common(100)))
@@ -338,24 +380,37 @@ def main():
    #ccriteria = ['opcode', 'rcode', 'flags', 'question', 'qname', 'qtype', 'answer', 'authority', 'additional', 'edns', 'nsid']
    #answers_stream = itertools.islice(read_answer_file('/tmp/all.dns2'), 100000)
    #answers_stream = itertools.islice(find_querydirs(sys.argv[1]), 90140, 90141)
    answers_stream = itertools.islice(find_querydirs(sys.argv[1]), 300000)
    #answers_stream = itertools.islice(find_querydirs(sys.argv[1]), 300000)
    #answers_stream = find_querydirs(sys.argv[1])
    #for a in answers_stream:
    # print(a)
    #answers_stream = itertools.islice(find_querydirs(sys.argv[1]), 300000)

    config = lmdbcfg.env_open.copy()
    config.update({
        'path': sys.argv[1],
        'readonly': True,
        'create': False
        })
    lenv = lmdb.Environment(**config)
    db = lenv.open_db(key=b'answers', create=False, **lmdbcfg.db_open)
    #qid_stream = itertools.islice(find_answer_qids(lenv, db), 300000)
    qid_stream = find_answer_qids(lenv, db)

    print('diffs = {')
    serial = False
    if serial:
        worker_init(ccriteria, target)
        process_results(map(compare_wrapper, answers_stream))
        process_results(map(compare_wrapper, qid_stream))
    else:
        with pool.Pool(
                processes=4,
                #processes=4,
                initializer=worker_init,
                initargs=(ccriteria, target)
                ) as p:
            process_results(p.imap_unordered(compare_wrapper, answers_stream, chunksize=100))
            process_results(p.imap_unordered(compare_wrapper, qid_stream, chunksize=10))

if __name__ == '__main__':
    main()
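
For context, the reworked script reads everything from the LMDB environment whose
directory is passed as the first argument (sys.argv[1]), so an invocation would look
roughly like this (the msgdiff.py file name is assumed from the commit title):

    python3 msgdiff.py /path/to/lmdb-env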