Commit 5169ffaa authored by Tomas Krizek's avatar Tomas Krizek Committed by Petr Špaček

orchestrator: use globals properly

parent beee150c
......@@ -15,20 +15,15 @@ import dbhelper
import sendrecv
global worker_state
worker_state = {} # shared by all workers
resolvers = []
timeout = None
def worker_init(init_resolvers, init_timeout):
def worker_init():
"""
make sure it works with distincts processes and threads as well
"""
global worker_state # initialized to empty dict
global resolvers
global timeout
resolvers = init_resolvers
timeout = init_timeout
tid = threading.current_thread().ident
selector, sockets = sendrecv.sock_init(resolvers)
worker_state[tid] = (selector, sockets)
......@@ -44,8 +39,6 @@ def worker_deinit(selector, sockets):
def worker_query_lmdb_wrapper(args):
global worker_state # initialized in worker_init
global timeout
qid, qwire = args
tid = threading.current_thread().ident
......@@ -55,7 +48,7 @@ def worker_query_lmdb_wrapper(args):
if reinit: # a connection is broken or something
# TODO: log this?
worker_deinit(selector, sockets)
worker_init(resolvers, timeout)
worker_init()
blob = pickle.dumps(replies)
return (qid, blob)
......@@ -81,6 +74,8 @@ def lmdb_init(envdir):
def main():
global timeout
parser = argparse.ArgumentParser(
description='read queries from LMDB, send them in parallel to servers '
'listed in configuration file, and record answers into LMDB')
......@@ -90,11 +85,12 @@ def main():
help='LMDB environment to read queries from and to write answers to')
args = parser.parse_args()
resolvers = []
for resname in args.cfg['servers']['names']:
rescfg = args.cfg[resname]
resolvers.append((resname, rescfg['ip'], rescfg['transport'], rescfg['port']))
timeout = args.cfg['sendrecv']['timeout']
if not dbhelper.db_exists(args.envdir, dbhelper.QUERIES_DB_NAME):
logging.critical(
'LMDB environment "%s does not contain DB %s! '
......@@ -120,8 +116,7 @@ def main():
with lenv.begin(adb, write=True) as txn:
with pool.Pool(
processes=args.cfg['sendrecv']['jobs'],
initializer=worker_init,
initargs=[resolvers, args.cfg['sendrecv']['timeout']]) as p:
initializer=worker_init) as p:
for qid, blob in p.imap(worker_query_lmdb_wrapper, qstream, chunksize=100):
txn.put(qid, blob)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment