Commit d422292d authored by Petr Špaček's avatar Petr Špaček

orchestrator: optimize storage of answers

Previously each orchestrator worker wrote its dataset in its own LMDB
transaction. For some reason this led to huge database growth.

Now workers submit their data for writing back to the main process,
which reduces storage usage by a factor of 25.

Related: knot/resolver-benchmarking#18
parent c818a8dc
......@@ -27,19 +27,7 @@ def worker_init(envdir, resolvers, init_timeout):
timeout = init_timeout
tid = threading.current_thread().ident
selector, sockets = sendrecv.sock_init(resolvers)
config = dbhelper.env_open.copy()
config.update({
'path': envdir,
'writemap': True,
'sync': False,
'map_async': True,
'readonly': False
})
lenv = lmdb.Environment(**config)
adb = lenv.open_db(key=dbhelper.ANSWERS_DB_NAME, create=True, **dbhelper.db_open)
worker_state[tid] = (lenv, adb, selector, sockets)
worker_state[tid] = (selector, sockets)
def worker_query_lmdb_wrapper(args):
......@@ -47,27 +35,30 @@ def worker_query_lmdb_wrapper(args):
global timeout
qid, qwire = args
tid = threading.current_thread().ident
lenv, adb, selector, sockets = worker_state[tid]
selector, sockets = worker_state[tid]
replies = sendrecv.send_recv_parallel(qwire, selector, sockets, timeout)
blob = pickle.dumps(replies)
with lenv.begin(adb, write=True) as txn:
txn.put(qid, blob)
return (qid, blob)
def reader_init(envdir):
"""Open LMDB environment and database in read-only mode."""
def lmdb_init(envdir):
    """Open LMDB environment and database for writing."""
config = dbhelper.env_open.copy()
config.update({
'path': envdir,
'readonly': True
'writemap': True,
'sync': False,
'map_async': True,
'readonly': False
})
lenv = lmdb.Environment(**config)
qdb = lenv.open_db(key=dbhelper.QUERIES_DB_NAME,
create=False,
**dbhelper.db_open)
return (lenv, qdb)
adb = lenv.open_db(key=dbhelper.ANSWERS_DB_NAME, create=True, **dbhelper.db_open)
return (lenv, qdb, adb)
def main():
......@@ -100,15 +91,16 @@ def main():
args.envdir, dbhelper.ANSWERS_DB_NAME)
sys.exit(1)
lenv, qdb = reader_init(args.envdir)
lenv, qdb, adb = lmdb_init(args.envdir)
qstream = dbhelper.key_value_stream(lenv, qdb)
with pool.Pool(
processes=args.cfg['sendrecv']['jobs'],
initializer=worker_init,
initargs=[args.envdir, resolvers, args.cfg['sendrecv']['timeout']]) as p:
for _ in p.imap_unordered(worker_query_lmdb_wrapper, qstream, chunksize=100):
pass
with lenv.begin(adb, write=True) as txn:
with pool.Pool(
processes=args.cfg['sendrecv']['jobs'],
initializer=worker_init,
initargs=[args.envdir, resolvers, args.cfg['sendrecv']['timeout']]) as p:
for qid, blob in p.imap(worker_query_lmdb_wrapper, qstream, chunksize=100):
txn.put(qid, blob)
if __name__ == "__main__":
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment