Commit f5bbac8e authored by Petr Špaček's avatar Petr Špaček

orchestrator: add proper command-line interface and help texts

Configuration options are read from file respdiff.cfg.
See file example.
parent 44342fab
"""
Configuration parser for respdiff suite.
Read-only.
"""
import configparser
import dns.inet
def ipaddr_check(addr):
"""
Verify that string addr can be parsed as a IP address and return it.
Raise otherwise.
"""
dns.inet.af_for_address(addr) # raises ValueError if the address is bogus
return addr
# declarative config format description for always-present sections
# dict structure: dict[section name][key name] = type
_CFGFMT = {
'sendrecv': {
'timeout': float,
'jobs': int,
},
'servers': {
'names': lambda s: [n.strip() for n in s.split(',')]
},
'diff': {
'target': str,
}
}
# declarative config format description for per-server section
# dict structure: dict[key name] = type
_CFGFMT_SERVER = {
'ip': ipaddr_check,
'port': int
}
def cfg2dict_convert(fmt, cparser):
"""
Convert values from ConfigParser into dict with proper data types.
Raises ValueError if a mandatory section or key is missing
and if an extra key is detected.
"""
cdict = {}
for sectname, sectfmt in fmt.items():
sectdict = cdict.setdefault(sectname, {})
for valname, valfmt in sectfmt.items():
try:
if not cparser[sectname][valname].strip():
raise ValueError('empty values are not allowed')
sectdict[valname] = valfmt(cparser[sectname][valname])
except ValueError as ex:
raise ValueError('config section [{}] key "{}" has invalid format: '
'{}; expected format: {}'.format(
sectname, valname, ex, valfmt))
except KeyError:
raise KeyError('config section [{}] key "{}" not found'.format(
sectname, valname))
unsupported_keys = set(cparser[sectname].keys()) - set(sectfmt.keys())
if unsupported_keys:
raise ValueError('unexpected keys {} in section [{}]'.format(
unsupported_keys, sectname))
return cdict
def cfg2dict_check_sect(fmt, cfg):
"""
Check non-existence of unhandled config sections.
"""
supported_sections = set(fmt.keys())
present_sections = set(cfg.keys()) - {'DEFAULT'}
unsupported_sections = present_sections - supported_sections
if unsupported_sections:
raise ValueError('unexpected config sections {}'.format(
', '.join('[{}]'.format(sn) for sn in unsupported_sections)))
def cfg2dict_check_diff(cdict):
"""
Check if diff target is listed among servers.
"""
if cdict['diff']['target'] not in cdict['servers']['names']:
raise ValueError('[diff] target value "{}" must be listed in [servers] names'.format(
cdict['diff']['target']))
def read_cfg(filename):
"""
Read config file, convert values, validate data and return dict[section][key] = value.
"""
# verify the file exists (ConfigParser does not do it)
with open(filename, 'r') as cfile:
pass
parser = configparser.ConfigParser(
delimiters='=',
comment_prefixes='#',
interpolation=None,
empty_lines_in_values=False)
parser.read(filename)
# parse things which must be present
cdict = cfg2dict_convert(_CFGFMT, parser)
# parse variable server-specific data
cfgfmt_servers = _CFGFMT.copy()
for server in cdict['servers']['names']:
cfgfmt_servers[server] = _CFGFMT_SERVER
cdict = cfg2dict_convert(cfgfmt_servers, parser)
# check existence of undefined extra sections
cfg2dict_check_sect(cfgfmt_servers, parser)
cfg2dict_check_diff(cdict)
return cdict
if __name__ == '__main__':
from pprint import pprint
import sys
pprint(read_cfg(sys.argv[1]))
import lmdb
ANSWERS_DB_NAME = b'answers'
QUERIES_DB_NAME = b'queries'
......
#!/usr/bin/env python3
import argparse
import multiprocessing.pool as pool
import pickle
import sys
......@@ -5,17 +8,10 @@ import threading
import lmdb
import cfg
import dbhelper
import sendrecv
timeout = 5
resolvers = [
('kresd', '127.0.0.1', 5353),
('unbound', '127.0.0.1', 53535),
('bind', '127.0.0.1', 53533)
]
# find query files
global worker_state
worker_state = {} # shared by all workers
......@@ -40,7 +36,7 @@ def worker_init(envdir, resolvers, init_timeout):
'readonly': False
})
lenv = lmdb.Environment(**config)
adb = lenv.open_db(key=b'answers', create=True, **dbhelper.db_open)
adb = lenv.open_db(key=dbhelper.ANSWERS_DB_NAME, create=True, **dbhelper.db_open)
worker_state[tid] = (lenv, adb, selector, sockets)
......@@ -74,14 +70,42 @@ def reader_init(envdir):
def main():
envdir = sys.argv[1]
lenv, qdb = reader_init(envdir)
parser = argparse.ArgumentParser(
description='read queries from LMDB, send them in parallel to servers '
'listed in configuration file, and record answers into LMDB')
parser.add_argument('-c', '--config', type=cfg.read_cfg, default='respdiff.cfg', dest='cfg',
help='config file (default: respdiff.cfg)')
parser.add_argument('envdir', type=str,
help='LMDB environment to read queries from and to write answers to')
args = parser.parse_args()
resolvers = []
for resname in args.cfg['servers']['names']:
rescfg = args.cfg[resname]
resolvers.append((resname, rescfg['ip'], rescfg['port']))
if not dbhelper.db_exists(args.envdir, dbhelper.QUERIES_DB_NAME):
logging.critical(
'LMDB environment "%s does not contain DB %s! '
'Use qprep to prepare queries.',
args.envpath, dbhelper.ANSWERS_DB_NAME)
sys.exit(1)
if dbhelper.db_exists(args.envdir, dbhelper.ANSWERS_DB_NAME):
logging.critical(
'LMDB environment "%s" already contains DB %s! '
'Overwritting it would invalidate data in the environment, '
'terminating.',
args.envpath, dbhelper.ANSWERS_DB_NAME)
sys.exit(1)
lenv, qdb = reader_init(args.envdir)
qstream = dbhelper.key_value_stream(lenv, qdb)
with pool.Pool(
processes=64,
processes=args.cfg['sendrecv']['jobs'],
initializer=worker_init,
initargs=[envdir, resolvers, timeout]) as p:
initargs=[args.envdir, resolvers, args.cfg['sendrecv']['timeout']]) as p:
for _ in p.imap_unordered(worker_query_lmdb_wrapper, qstream, chunksize=100):
pass
......
[sendrecv]
# in seconds
timeout = 5
# number of queries to run simultaneously
jobs = 64
[servers]
names = kresd, bind, unbound
# symbolic names of DNS servers under test
# separate multiple values by ,
# each symbolic name in [servers] section refers to config section
# containing IP address and port of particular server
[kresd]
ip = ::1
port = 5353
[bind]
ip = 127.0.0.1
port = 53533
[unbound]
ip = 127.0.0.1
port = 53535
[diff]
# symbolic name of server under test
target = kresd
# other servers are used as reference when comparing answers from the target
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment