Commit 67f36494 authored by Tomas Krizek

diffrepro: use new dataformat

parent d9ab5f37
......@@ -61,6 +61,7 @@ _CFGFMT_SERVER = {
'port': (int, True),
'transport': (transport_opt, True),
'graph_color': (str, False),
'restart_script': (str, False),
}
......
......@@ -63,11 +63,6 @@ class DataMismatch(Exception):
return hash(self.key)
def read_json(filename):
    """Load and return the JSON document stored in *filename*.

    Args:
        filename: Path of the JSON file to read.

    Returns:
        The deserialized content, as produced by ``json.load``.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the content is not valid JSON.
    """
    # Decode explicitly as UTF-8 rather than relying on the locale's
    # default encoding, which breaks non-ASCII content on some systems.
    with open(filename, 'r', encoding='utf-8') as f:
        return json.load(f)
class JSONDataObject:
"""Object class for (de)serialization into JSON-compatible dictionary."""
_ATTRIBUTES = {} # type: Mapping[str, Tuple[RestoreFunction, SaveFunction]]
......
......@@ -18,7 +18,6 @@ class LMDB:
ANSWERS = b'answers'
DIFFS = b'diffs'
QUERIES = b'queries'
REPROSTATS = b'reprostats'
ENV_DEFAULTS = {
'map_size': 10 * 1024**3, # 10 G
......
import pickle
#!/usr/bin/env python3
import argparse
import logging
import subprocess
from typing import Any, Mapping
import sys
import cfg
from dbhelper import LMDB
import diffsum
from msgdiff import DataMismatch # noqa: needed for unpickling
from dataformat import Diff, DiffReport, ReproData, ResolverID
import msgdiff
import orchestrator
from orchestrator import get_resolvers
import sendrecv
def load_stats(lmdb, qid):
"""(count, others_agreed, diff_matched)"""
reprodb = lmdb.get_db(LMDB.REPROSTATS)
with lmdb.env.begin(reprodb) as txn:
stats_bin = txn.get(qid)
if stats_bin:
stats = pickle.loads(stats_bin)
else:
stats = (0, 0, 0)
def restart_resolver(script_path: str) -> None:
    """Run a resolver restart script, treating failure as non-fatal.

    A non-zero exit status, a permission error, or a missing script is
    reported as a warning through ``logging`` instead of being raised.

    Args:
        script_path: Path of the executable to run. It is handed to
            ``subprocess.check_call`` unchanged, without extra arguments.
    """
    try:
        subprocess.check_call(script_path)
    except subprocess.CalledProcessError as exc:
        logging.warning('Resolver restart failed (exit code %d): %s',
                        exc.returncode, script_path)
    except PermissionError:
        logging.warning('Resolver restart failed (permission error): %s',
                        script_path)
    except FileNotFoundError:
        # A missing script is handled like a non-executable one: warn and
        # carry on rather than abort the whole run.
        logging.warning('Resolver restart failed (script not found): %s',
                        script_path)
assert len(stats) == 3
assert stats[0] >= stats[1] >= stats[2]
return stats[0], stats[1], stats[2]
def get_restart_scripts(config: Mapping[str, Any]) -> Mapping[ResolverID, str]:
    """Collect the ``restart_script`` setting of every configured server.

    Iterates over ``config['servers']['names']`` and looks up
    ``config[name]['restart_script']`` for each name. When that lookup
    raises ``KeyError`` a warning is logged and the name is left out of
    the result.

    Returns:
        Dictionary mapping each server name to its restart script.
    """
    scripts = {}
    for name in config['servers']['names']:
        try:
            script = config[name]['restart_script']
        except KeyError:
            logging.warning('No restart script available for "%s"!', name)
            continue
        scripts[name] = script
    return scripts
def save_stats(lmdb, qid, stats):
    """Pickle *stats* and store it under *qid* in the REPROSTATS database.

    *stats* must hold exactly three items in non-increasing order; both
    conditions are checked with ``assert``. The write happens inside a
    write transaction opened on ``lmdb.env``.
    """
    assert len(stats) == 3
    retries, upstream_stable, diff_matches = stats[0], stats[1], stats[2]
    assert retries >= upstream_stable >= diff_matches

    payload = pickle.dumps(stats)
    database = lmdb.get_db(LMDB.REPROSTATS)
    with lmdb.env.begin(database, write=True) as transaction:
        transaction.put(qid, payload)
def main():
logging.basicConfig(format='%(levelname)s %(message)s', level=logging.INFO)
parser = argparse.ArgumentParser(
description='attempt to reproduce original diffs from JSON report')
parser.add_argument('-d', '--datafile', default='report.json',
help='JSON report file (default: report.json)')
parser.add_argument('-c', '--config', default='respdiff.cfg', dest='cfgpath',
help='config file (default: respdiff.cfg)')
parser.add_argument('envdir', type=str,
help='LMDB environment to read queries and answers from')
args = parser.parse_args()
config = cfg.read_cfg(args.cfgpath)
report = DiffReport.from_json(args.datafile)
criteria = config['diff']['criteria']
timeout = config['sendrecv']['timeout']
selector, sockets = sendrecv.sock_init(get_resolvers(config))
restart_scripts = get_restart_scripts(config)
if len(sockets) < len(config['servers']['names']):
logging.critical("Couldn't create sockets for all resolvers.")
sys.exit(1)
if report.reprodata is None:
report.reprodata = ReproData()
with LMDB(args.envdir, readonly=True) as lmdb:
lmdb.open_db(LMDB.QUERIES)
queries = diffsum.get_query_iterator(lmdb, report.target_disagreements)
for qid, qwire in queries:
diff = report.target_disagreements[qid]
reprocounter = report.reprodata[qid]
# verify if answers are stable
if reprocounter.retries != reprocounter.upstream_stable:
logging.debug('Skipping QID %d: unstable upstream', diff.qid)
continue
def main():
criteria = [
'opcode', 'rcode', 'flags', 'question', 'qname', 'qtype', 'answertypes', 'answerrrsigs'
] # FIXME
selector, sockets = sendrecv.sock_init(getattr(orchestrator, 'resolvers'))
for script in restart_scripts.values():
restart_resolver(script)
with LMDB(sys.argv[1]) as lmdb:
lmdb.open_db(LMDB.QUERIES)
lmdb.open_db(LMDB.DIFFS)
lmdb.open_db(LMDB.REPROSTATS, create=True)
diff_stream = diffsum.read_diffs_lmdb(lmdb)
processed = 0
verified = 0
for qid, qwire, orig_others_agree, orig_diffs in diff_stream:
if not orig_others_agree:
continue # others do not agree, nothing to verify
# others agree, verify if answers are stable and the diff is reproducible
retries, upstream_stable, diff_matches = load_stats(lmdb, qid)
if retries > 0:
if retries != upstream_stable or upstream_stable != diff_matches:
continue # either unstable upstream or diff is not 100 % reproducible, skip it
processed += 1
# it might be reproducible, restart everything
if len(sys.argv) == 3:
subprocess.check_call([sys.argv[2]])
wire_blobs = sendrecv.send_recv_parallel(qwire, selector, sockets, orchestrator.timeout)
wire_blobs, _ = sendrecv.send_recv_parallel(qwire, selector, sockets, timeout)
answers = msgdiff.decode_wire_dict(wire_blobs)
new_others_agree, new_diffs = msgdiff.compare(answers, criteria, 'kresd') # FIXME
retries += 1
if orig_others_agree == new_others_agree:
upstream_stable += 1
if orig_diffs == new_diffs:
diff_matches += 1
print(qid, (retries, upstream_stable, diff_matches))
save_stats(lmdb, qid, (retries, upstream_stable, diff_matches))
if retries == upstream_stable == diff_matches:
verified += 1
print('processed :', processed)
print('verified :', verified)
print('falzified : {} {:6.2f} %'.format(
processed - verified, 100.0 * (processed - verified) / processed))
others_agree, mismatches = msgdiff.compare(answers, criteria, config['diff']['target'])
reprocounter.retries += 1
if others_agree:
reprocounter.upstream_stable += 1
if diff == Diff(diff.qid, mismatches):
reprocounter.verified += 1
report.export_json(args.datafile)
if __name__ == '__main__':
......
......@@ -8,11 +8,11 @@ import pickle
import random
import threading
import time
from typing import List, Tuple, Dict, Any # noqa: type hints
from typing import List, Tuple, Dict, Any, Mapping, Sequence # noqa: type hints
import sys
import cfg
from dataformat import DiffReport
from dataformat import DiffReport, ResolverID
from dbhelper import LMDB
import sendrecv
......@@ -109,9 +109,18 @@ def export_statistics(lmdb, datafile, start_time):
report.export_json(datafile)
def get_resolvers(config: Mapping[str, Any]) -> Sequence[Tuple[ResolverID, str, str, int]]:
    """Build the list of resolver descriptions from the configuration.

    For every name in ``config['servers']['names']`` the section
    ``config[name]`` is read and turned into a
    ``(name, ip, transport, port)`` tuple, keeping the configured order.
    """
    def describe(name):
        # Look the section up once, then pick the connection settings.
        section = config[name]
        return (name, section['ip'], section['transport'], section['port'])

    return [describe(name) for name in config['servers']['names']]
def main():
global ignore_timeout
global max_timeouts
global resolvers
global timeout
global time_delay_min
global time_delay_max
......@@ -131,10 +140,7 @@ def main():
help='LMDB environment to read queries from and to write answers to')
args = parser.parse_args()
for resname in args.cfg['servers']['names']:
rescfg = args.cfg[resname]
resolvers.append((resname, rescfg['ip'], rescfg['transport'], rescfg['port']))
resolvers = get_resolvers(args.cfg)
ignore_timeout = args.ignore_timeout
timeout = args.cfg['sendrecv']['timeout']
time_delay_min = args.cfg['sendrecv']['time_delay_min']
......
......@@ -22,6 +22,8 @@ port = 53
transport = tcp
# optional graph color: common names or hex (#00FFFF) allowed
graph_color = cyan
# optional restart script to clean cache and restart resolver, used by diffrepro
# restart_script = /usr/local/bin/restart-kresd
[surfnet]
ip = 145.100.185.15
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment