Commit dde46712 authored by Petr Špaček's avatar Petr Špaček

orchestrator: optimize write performance at cost of crash-safety

These tests are usually quite easy to repeat so we do not care that much
about crash-proofness.
parent b2e63679
import multiprocessing.dummy as pool
import multiprocessing.pool as pool
import os
import pickle
import sys
......@@ -35,6 +35,8 @@ def worker_init(envdir, resolvers, init_timeout):
config.update({
'path': envdir,
'writemap': True,
'sync': False,
'map_async': True,
'readonly': False
})
lenv = lmdb.Environment(**config)
......@@ -42,6 +44,7 @@ def worker_init(envdir, resolvers, init_timeout):
worker_state[tid] = (lenv, adb, selector, sockets)
def worker_query_lmdb_wrapper(args):
global worker_state # initialized in worker_init
global timeout
......@@ -51,6 +54,7 @@ def worker_query_lmdb_wrapper(args):
replies = sendrecv.send_recv_parallel(qwire, selector, sockets, timeout)
blob = pickle.dumps(replies)
with lenv.begin(adb, write=True) as txn:
txn.put(qid, blob)
......@@ -79,10 +83,10 @@ def main():
qstream = read_queries_lmdb(lenv, qdb)
with pool.Pool(
processes=4,
processes=64,
initializer=worker_init,
initargs=[envdir, resolvers, timeout]) as p:
for i in p.imap_unordered(worker_query_lmdb_wrapper, qstream, chunksize=10):
for i in p.imap_unordered(worker_query_lmdb_wrapper, qstream, chunksize=100):
pass
if __name__ == "__main__":
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment