msgdiff.py 4.6 KB
Newer Older
1 2 3
#!/usr/bin/env python3

import argparse
4
from functools import partial
5
import logging
6
from multiprocessing import pool
7
import os
8
import pickle
9
from typing import Any, Dict, Iterator, Mapping, Optional, Sequence, Tuple  # noqa
10

11 12
from respdiff import cli
from respdiff.dataformat import (
13
    DiffReport, Disagreements, DisagreementsCounter, FieldLabel, QID)
14
from respdiff.database import DNSRepliesFactory, DNSReply, key2qid, LMDB, MetaDatabase
Tomas Krizek's avatar
Tomas Krizek committed
15
from respdiff.match import compare
16
from respdiff.typing import ResolverID
17 18


19
# Module-level LMDB handle.  main() assigns it before the worker pool is
# created; the helper functions below assert that it is set before use.
lmdb = None
20 21


22 23 24
def read_answers_lmdb(
            dnsreplies_factory: DNSRepliesFactory,
            qid: QID
        ) -> Mapping[ResolverID, DNSReply]:
    """Fetch the answers blob stored under *qid* and parse it.

    The blob is read from the ANSWERS database of the module-level ``lmdb``
    handle and handed to ``dnsreplies_factory.parse()``, whose result is
    returned.

    Raises:
        AssertionError: if ``lmdb`` has not been initialized, or if nothing
            (or an empty value) is stored under *qid*.
    """
    assert lmdb is not None, "LMDB wasn't initialized!"
    answers_db = lmdb.get_db(LMDB.ANSWERS)
    with lmdb.env.begin(answers_db) as txn:
        blob = txn.get(qid)
    # The transaction is already closed here; only the fetched value is used.
    assert blob
    return dnsreplies_factory.parse(blob)
32 33


34 35 36 37 38 39 40 41
def compare_lmdb_wrapper(
            criteria: Sequence[FieldLabel],
            target: ResolverID,
            dnsreplies_factory: DNSRepliesFactory,
            qid: QID
        ) -> None:
    """Compare the answers stored for *qid* and persist any difference.

    The answers are loaded with read_answers_lmdb() and passed to compare()
    together with *criteria* and *target*.  Unless the others agree and the
    target shows no differences, the ``(others_agree, target_diffs)`` pair is
    pickled and stored under *qid* in the DIFFS database.

    The parameter order (with *qid* last) lets main() bind everything else
    with functools.partial and map the result over a stream of QIDs.
    """
    assert lmdb is not None, "LMDB wasn't initialized!"
    replies = read_answers_lmdb(dnsreplies_factory, qid)
    others_agree, target_diffs = compare(replies, criteria, target)

    # Only write when there is something to report.
    if not others_agree or target_diffs:
        payload = pickle.dumps((others_agree, target_diffs))
        diffs_db = lmdb.get_db(LMDB.DIFFS)
        with lmdb.env.begin(diffs_db, write=True) as txn:
            txn.put(qid, payload)
49

50

51
def export_json(filename: str, report: DiffReport):
    """Fill *report* with the diffs stored in LMDB and export it as JSON.

    Every record of the DIFFS database is unpickled into an
    ``(others_agree, diff)`` pair.  When ``others_agree`` is false, the
    record's QID is added to ``report.other_disagreements``; otherwise each
    ``(field, mismatch)`` item of ``diff`` is registered in
    ``report.target_disagreements``.  Both report attributes are reset
    before the scan.

    If *filename* already exists, it is moved to ``filename + '.bak'``
    (replacing any older backup) and a warning is logged before the new
    report is written with ``report.export_json()``.

    Raises:
        AssertionError: if the module-level ``lmdb`` handle is not set.
    """
    assert lmdb is not None, "LMDB wasn't initialized!"
    report.other_disagreements = DisagreementsCounter()
    report.target_disagreements = Disagreements()

    # get diff data
    # NOTE: unpickling is acceptable only because these blobs are produced by
    #       compare_lmdb_wrapper(); do not run this against an LMDB
    #       environment from an untrusted source.
    ddb = lmdb.get_db(LMDB.DIFFS)
    with lmdb.env.begin(ddb) as txn:
        with txn.cursor() as diffcur:
            for key, diffblob in diffcur:
                qid = key2qid(key)
                others_agree, diff = pickle.loads(diffblob)
                if not others_agree:
                    report.other_disagreements.queries.add(qid)
                else:
                    for field, mismatch in diff.items():
                        report.target_disagreements.add_mismatch(field, mismatch, qid)

    # NOTE: msgdiff is the first tool in the toolchain to generate report.json
    #       thus it doesn't make sense to re-use existing report.json file
    if os.path.exists(filename):
        backup_filename = filename + '.bak'
        # os.replace() overwrites a stale backup on every platform, whereas
        # os.rename() fails on Windows when the destination already exists.
        os.replace(filename, backup_filename)
        logging.warning(
            'JSON report already exists, overwriting file. Original '
            'file backed up as %s', backup_filename)
    report.export_json(filename)


80
def prepare_report(lmdb_, servers: Sequence[ResolverID]) -> DiffReport:
    """Build a DiffReport from the entry counts and metadata in *lmdb_*.

    The QUERIES and ANSWERS databases are opened first and their ``entries``
    statistics are read within a single transaction.  The start and end
    times come from a MetaDatabase created for *servers*.
    """
    # Open the databases before MetaDatabase() is instantiated.
    queries_db = lmdb_.open_db(LMDB.QUERIES)
    answers_db = lmdb_.open_db(LMDB.ANSWERS)
    with lmdb_.env.begin() as txn:
        query_count = txn.stat(queries_db)['entries']
        answer_count = txn.stat(answers_db)['entries']

    metadb = MetaDatabase(lmdb_, servers)
    return DiffReport(
        metadb.read_start_time(),
        metadb.read_end_time(),
        query_count,
        answer_count)


98
def main():
    """Command-line entry point: diff stored answers and export the report.

    Steps:
      1. Parse the command line (envdir, config, datafile).
      2. Open the LMDB environment once to build the DiffReport skeleton and
         check the metadata database against the configured servers.
      3. Re-open the environment (``fast=True``), publish it through the
         module-level ``lmdb`` handle, open DIFFS with
         ``create=True, drop=True`` and run compare_lmdb_wrapper() over
         every key of ANSWERS in a multiprocessing pool.
      4. Export the collected diffs as JSON to the datafile.
    """
    global lmdb

    cli.setup_logging()
    argparser = argparse.ArgumentParser(
        description='compute diff from answers stored in LMDB and write diffs to LMDB')
    for register_arg in (cli.add_arg_envdir, cli.add_arg_config, cli.add_arg_datafile):
        register_arg(argparser)

    args = argparser.parse_args()
    report_path = cli.get_datafile(args, check_exists=False)
    diff_cfg = args.cfg['diff']
    criteria = diff_cfg['criteria']
    target = diff_cfg['target']
    servers = args.cfg['servers']['names']

    with LMDB(args.envdir) as meta_env:
        # NOTE: To avoid an lmdb.BadRslotError, probably caused by weird
        # interaction when using multiple transaction / processes, open a separate
        # environment. Also, any dbs have to be opened before using MetaDatabase().
        report = prepare_report(meta_env, servers)
        cli.check_metadb_servers_version(meta_env, servers)

    with LMDB(args.envdir, fast=True) as diff_env:
        # Publish the handle before the pool is created so that the helper
        # functions find it through the module-level name.
        # NOTE(review): workers appear to rely on inheriting this global from
        # the parent process -- confirm behaviour under non-fork start methods.
        lmdb = diff_env
        diff_env.open_db(LMDB.ANSWERS)
        diff_env.open_db(LMDB.DIFFS, create=True, drop=True)
        qids = diff_env.key_stream(LMDB.ANSWERS)

        compare_qid = partial(
            compare_lmdb_wrapper, criteria, target, DNSRepliesFactory(servers))
        with pool.Pool() as workers:
            # Results are None; iterate only to drive the pool to completion.
            for _ in workers.imap_unordered(compare_qid, qids, chunksize=10):
                pass
        export_json(report_path, report)
134

135

136 137
# Run the tool only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()