Commit eddaeb61 authored by Tomas Krizek's avatar Tomas Krizek

orchestrator: keep answers data in case of crash / kdb interrupt

parent 30a00c74
......@@ -82,20 +82,25 @@ def main():
sdb = lmdb.open_db(LMDB.STATS, create=True)
qstream = lmdb.key_value_stream(LMDB.QUERIES)
with lmdb.env.begin(adb, write=True) as txn:
txn = lmdb.env.begin(adb, write=True)
try:
with pool.Pool(
processes=args.cfg['sendrecv']['jobs'],
initializer=worker_init) as p:
i = 0
for qid, blob in p.imap(worker_query_lmdb_wrapper, qstream, chunksize=100):
for qid, blob in p.imap(worker_query_lmdb_wrapper, qstream,
chunksize=100):
i += 1
if i % 10000 == 0:
logging.info('Received {:d} answers'.format(i))
txn.put(qid, blob)
finally:
# attempt to preserve data if something went wrong (or not)
txn.commit()
stats['end_time'] = time.time()
with lmdb.env.begin(sdb, write=True) as txn:
txn.put(b'global_stats', pickle.dumps(stats))
stats['end_time'] = time.time()
with lmdb.env.begin(sdb, write=True) as txn:
txn.put(b'global_stats', pickle.dumps(stats))
if __name__ == "__main__":
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment