orchestrator.py 2.04 KB
Newer Older
1 2 3
#!/usr/bin/env python3

import argparse
4
import logging
5
from multiprocessing import pool
6
import sys
7

8
from respdiff import cli, sendrecv
9
from respdiff.database import LMDB, MetaDatabase
10 11


12
def main():
13
    cli.setup_logging()
14 15 16
    parser = argparse.ArgumentParser(
        description='read queries from LMDB, send them in parallel to servers '
                    'listed in configuration file, and record answers into LMDB')
17 18
    cli.add_arg_envdir(parser)
    cli.add_arg_config(parser)
19 20
    parser.add_argument('--ignore-timeout', action="store_true",
                        help='continue despite consecutive timeouts from resolvers')
21

22
    args = parser.parse_args()
23
    sendrecv.module_init(args)
24

25
    with LMDB(args.envdir) as lmdb:
26
        meta = MetaDatabase(lmdb, args.cfg['servers']['names'], create=True)
27 28 29
        meta.write_version()
        meta.write_start_time()

30
        lmdb.open_db(LMDB.QUERIES)
31 32 33
        adb = lmdb.open_db(LMDB.ANSWERS, create=True, check_notexists=True)

        qstream = lmdb.key_value_stream(LMDB.QUERIES)
34 35
        txn = lmdb.env.begin(adb, write=True)
        try:
36
            # process queries in parallel
37 38
            with pool.Pool(
                    processes=args.cfg['sendrecv']['jobs'],
39
                    initializer=sendrecv.worker_init) as p:
40
                i = 0
Tomas Krizek's avatar
Tomas Krizek committed
41 42
                for qkey, blob in p.imap(sendrecv.worker_perform_query, qstream,
                                         chunksize=100):
43 44 45
                    i += 1
                    if i % 10000 == 0:
                        logging.info('Received {:d} answers'.format(i))
Tomas Krizek's avatar
Tomas Krizek committed
46
                    txn.put(qkey, blob)
47 48 49
        except KeyboardInterrupt as err:
            logging.info('SIGINT received, exiting...')
            sys.exit(130)
50 51 52
        except RuntimeError as err:
            logging.error(err)
            sys.exit(1)
53 54
        finally:
            # attempt to preserve data if something went wrong (or not)
55
            logging.debug('Comitting LMDB transaction...')
56
            txn.commit()
57
            meta.write_end_time()
58

59

60 61
if __name__ == "__main__":
    main()