Commit a4ab0e3b authored by Tomas Krizek's avatar Tomas Krizek

orchestrator: exit when resolvers aren't responding

If one of the resolvers doesn't respond to N queries in a row, exit
the script. This should help to avoid wasting resources when resolvers
weren't started up properly (or crashed).

Related knot/resolver-benchmarking#17
parent 9a0578cf
......@@ -40,6 +40,7 @@ _CFGFMT = {
'jobs': (int, True),
'time_delay_min': (float, True),
'time_delay_max': (float, True),
'max_timeouts': (int, False),
},
'servers': {
'names': (comma_list, True),
......
......@@ -7,15 +7,19 @@ import pickle
import random
import threading
import time
from typing import List, Tuple # noqa: type hints
from typing import List, Tuple, Dict, Any # noqa: type hints
import sys
import cfg
from dbhelper import LMDB
import sendrecv
# Per-thread worker state (selector, sockets, consecutive-timeout counters).
# Each worker thread fills in its own copy via worker_init().
worker_state = threading.local()

resolvers = []  # type: List[Tuple[str, str, str, int]]

# When True, consecutive timeouts never abort the run (--ignore-timeout flag).
ignore_timeout = False
# Abort when this many consecutive timeouts are received from a single
# resolver; may be overridden by the 'max_timeouts' option in [sendrecv].
max_timeouts = 10
# Per-query timeout in seconds; filled in from the config file by main().
timeout = None
# Optional artificial per-query delay bounds in seconds (testing aid);
# a max of 0 disables the delay.
time_delay_min = 0
time_delay_max = 0
......@@ -25,9 +29,14 @@ def worker_init():
"""
make sure it works with distincts processes and threads as well
"""
tid = threading.current_thread().ident
worker_state.timeouts = {}
worker_reinit()
def worker_reinit():
    """(Re)open sockets to all configured resolvers.

    Stores the selector and socket list in this thread's worker_state so
    worker_query_lmdb_wrapper() can reuse them for subsequent queries.
    """
    selector, sockets = sendrecv.sock_init(resolvers)
    worker_state.selector = selector
    worker_state.sockets = sockets
def worker_deinit(selector, sockets):
......@@ -42,35 +51,56 @@ def worker_deinit(selector, sockets):
def worker_query_lmdb_wrapper(args):
    """Send one query to all resolvers and return (qid, pickled replies).

    args is a (qid, qwire) tuple: the LMDB key and the raw DNS query wire
    data. Raises RuntimeError (via check_timeout) when a resolver has timed
    out too many times in a row and --ignore-timeout was not given.
    """
    qid, qwire = args
    selector = worker_state.selector
    sockets = worker_state.sockets
    # optional artificial delay for testing
    if time_delay_max > 0:
        time.sleep(random.uniform(time_delay_min, time_delay_max))
    replies, reinit = sendrecv.send_recv_parallel(qwire, selector, sockets, timeout)
    if not ignore_timeout:
        check_timeout(replies)
    if reinit:  # a connection is broken or something
        # TODO: log this?
        worker_deinit(selector, sockets)
        worker_reinit()
    blob = pickle.dumps(replies)
    return (qid, blob)
def check_timeout(replies):
    """Track consecutive timeouts per resolver; abort after max_timeouts.

    A reply carrying wire data resets that resolver's counter; a timed-out
    reply (wire is None) increments it. Raises RuntimeError once any single
    resolver has timed out max_timeouts times in a row.
    """
    # hoisted out of the loop: the same thread-local dict serves all resolvers
    timeouts = worker_state.timeouts
    for resolver, reply in replies.items():
        if reply.wire is not None:
            timeouts[resolver] = 0
        else:
            timeouts[resolver] = timeouts.get(resolver, 0) + 1
            if timeouts[resolver] >= max_timeouts:
                raise RuntimeError(
                    "Resolver '{}' timed-out {:d} times in a row. "
                    "Use '--ignore-timeout' to suppress this error.".format(
                        resolver, max_timeouts))
def main():
global ignore_timeout
global max_timeouts
global timeout
global time_delay_min
global time_delay_max
logging.basicConfig(level=logging.INFO)
logging.basicConfig(format='%(levelname)s %(message)s', level=logging.INFO)
parser = argparse.ArgumentParser(
description='read queries from LMDB, send them in parallel to servers '
'listed in configuration file, and record answers into LMDB')
parser.add_argument('-c', '--config', type=cfg.read_cfg, default='respdiff.cfg', dest='cfg',
help='config file (default: respdiff.cfg)')
parser.add_argument('--ignore-timeout', action="store_true",
help='continue despite consecutive timeouts from resolvers')
parser.add_argument('envdir', type=str,
help='LMDB environment to read queries from and to write answers to')
args = parser.parse_args()
......@@ -79,9 +109,14 @@ def main():
rescfg = args.cfg[resname]
resolvers.append((resname, rescfg['ip'], rescfg['transport'], rescfg['port']))
ignore_timeout = args.ignore_timeout
timeout = args.cfg['sendrecv']['timeout']
time_delay_min = args.cfg['sendrecv']['time_delay_min']
time_delay_max = args.cfg['sendrecv']['time_delay_max']
try:
max_timeouts = args.cfg['sendrecv']['max_timeouts']
except KeyError:
pass
stats = {
'start_time': time.time(),
'end_time': None,
......@@ -95,6 +130,7 @@ def main():
qstream = lmdb.key_value_stream(LMDB.QUERIES)
txn = lmdb.env.begin(adb, write=True)
try:
# process queries in parallel
with pool.Pool(
processes=args.cfg['sendrecv']['jobs'],
initializer=worker_init) as p:
......@@ -105,6 +141,9 @@ def main():
if i % 10000 == 0:
logging.info('Received {:d} answers'.format(i))
txn.put(qid, blob)
except RuntimeError as err:
logging.error(err)
sys.exit(1)
finally:
# attempt to preserve data if something went wrong (or not)
txn.commit()
......
......@@ -6,6 +6,8 @@ jobs = 16
# in seconds (float); delay each query by a random time (uniformly distributed) between min and max; set max to 0 to disable
time_delay_min = 0
time_delay_max = 0
# maximum number of consecutive timeouts received from a single resolver before exiting
max_timeouts = 10
[servers]
names = google, surfnet, cznic
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment