Commit 4680ff31 authored by Petr Špaček's avatar Petr Špaček

msgdiff experiment: read answers from one huge file

Storage on the filesystem is causing problems with hundreds of thousands
of queries (and thus subdirectories and files).

This is an experiment to see if the situation improves significantly
if we replace many small files with one big file.
parent feac83ce
from IPython.core.debugger import Tracer
import collections
import cProfile
import json
......@@ -158,6 +160,33 @@ def read_answers(workdir):
answers[name] = msg
return answers
import struct
def read_answer_file(filename):
    """
    Lazily read sets of DNS answers from one big binary file.

    File layout as parsed by this function:

        file   := set*
        set    := set_len ('H') followed by set_len bytes of records
        record := name ('10p'), wire_len ('H'), then wire_len bytes of
                  DNS wire data

    All fields are unpacked with the struct module's native byte order and
    alignment (no byte-order prefix in the format strings), so the file has
    to come from a machine with the same native layout.

    Only one set is held in memory at a time; the file is read incrementally
    and stays open until the generator is exhausted or closed.

    Yields: dict {name (str): message parsed by dns.message.from_wire()},
            one dict per set, in file order
    Raises: ValueError if the file ends in the middle of a set header
            or in the middle of a set body
    """
    set_header = struct.Struct('H')
    record_header = struct.Struct('10p H')

    with open(filename, 'rb') as af:
        while True:
            raw_len = af.read(set_header.size)
            if not raw_len:
                return  # clean end of file, no more sets
            if len(raw_len) < set_header.size:
                raise ValueError(
                    '%s: truncated set header (%d of %d bytes)'
                    % (filename, len(raw_len), set_header.size))
            set_len, = set_header.unpack(raw_len)

            set_binary = af.read(set_len)
            if len(set_binary) < set_len:
                raise ValueError(
                    '%s: truncated answer set (%d of %d bytes)'
                    % (filename, len(set_binary), set_len))

            answers = {}
            sidx = 0
            while sidx < set_len:
                # unpack_from() reads the header in place, without copying
                name, wire_len = record_header.unpack_from(set_binary, sidx)
                sidx += record_header.size
                wire_end = sidx + wire_len
                msg = dns.message.from_wire(set_binary[sidx:wire_end])
                answers[name.decode('ascii')] = msg
                sidx = wire_end
            yield answers
def diff_pair(answers, criteria, name1, name2):
"""
Returns: sequence of (field, DataMismatch())
......@@ -177,23 +206,22 @@ def transitive_equality(answers, criteria, resolvers):
res_others))
def compare(target, workdir, criteria):
def compare(target, answers, criteria):
#print('compare: %s %s %s' %(target, workdir, criteria))
answers = read_answers(workdir)
#answers = read_answers(workdir)
others = list(answers.keys())
others.remove(target)
random_other = others[0]
qid = str(answers[target].question[0])
assert len(others) >= 1
# do others agree on the answer?
from IPython.core.debugger import Tracer
#Tracer()()
others_agree = transitive_equality(answers, criteria, others)
if not others_agree:
return (workdir, False, None)
return (qid, False, None)
target_diffs = dict(diff_pair(answers, criteria, random_other, target))
return (workdir, others_agree, target_diffs)
return (qid, others_agree, target_diffs)
#target_agree = not any(target_diffs.values())
#if not target_agree:
# print('target:')
......@@ -227,14 +255,14 @@ def worker_init(criteria_arg, target_arg):
target = target_arg
#print('criteria: %s target: %s' % (criteria, target))
def compare_wrapper(workdir):
def compare_wrapper(answers):
global criteria
global target
#global result
global i
#global prof
#return compare(target, workdir, criteria)
result = compare(target, workdir, criteria)
result = compare(target, answers, criteria)
#i += 1
#if i == 10000:
# prof.disable()
......@@ -277,22 +305,18 @@ def process_results(diff_generator):
uniq[field] = collections.OrderedDict(uniq[field].most_common(20))
pprint(uniq)
def main():
target = 'kresd'
ccriteria = ['opcode', 'rcode', 'flags', 'question', 'qname', 'qtype', 'answer'] #'authority', 'additional', 'edns']
#ccriteria = ['opcode', 'rcode', 'flags', 'question', 'qname', 'qtype', 'answer', 'authority', 'additional', 'edns', 'nsid']
if False:
dir_names = itertools.tee(find_querydirs(sys.argv[1]), 2)
for d in dir_names:
print(d)
workdirs = itertools.islice(find_querydirs(sys.argv[1]), 100000)
answers_stream = itertools.islice(read_answer_file('/tmp/all.dns2'), 100000)
print('diffs = {')
serial = False
if serial:
worker_init(ccriteria, target)
process_results(map(compare_wrapper, workdirs))
process_results(map(compare_wrapper, answers_stream))
else:
with pool.Pool(
......@@ -300,7 +324,7 @@ def main():
initializer=worker_init,
initargs=(ccriteria, target)
) as p:
process_results(p.imap_unordered(compare_wrapper, workdirs, chunksize=100))
process_results(p.imap_unordered(compare_wrapper, answers_stream, chunksize=100))
if __name__ == '__main__':
main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment