Commit e823060f authored by Petr Špaček's avatar Petr Špaček

orchestrator: minor code cleanup

Shared database code is being slowly moved to dbhelper module.
parent 082d357b
import lmdb
QUERIES_DB_NAME = b'queries'
env_open = {
'map_size': 1024**4,
'max_readers': 64,
......@@ -11,6 +15,7 @@ db_open = {
'reverse_key': True
}
def key_stream(lenv, db):
"""
yield all keys from given db
......@@ -23,6 +28,16 @@ def key_stream(lenv, db):
cont = cur.next()
def key_value_stream(lenv, db):
"""
yield all (key, value) pairs from given db
"""
with lenv.begin(db) as txn:
cur = txn.cursor(db)
for key, blob in cur:
yield (key, blob)
def qid2key(qid):
"""Encode query ID to database key"""
return str(qid).encode('ascii')
......
import multiprocessing.pool as pool
import os
import pickle
import sys
import threading
......@@ -21,6 +20,7 @@ resolvers = [
global worker_state
worker_state = {} # shared by all workers
def worker_init(envdir, resolvers, init_timeout):
"""
make sure it works with distincts processes and threads as well
......@@ -58,35 +58,31 @@ def worker_query_lmdb_wrapper(args):
with lenv.begin(adb, write=True) as txn:
txn.put(qid, blob)
def read_queries_lmdb(lenv, qdb):
with lenv.begin(qdb) as txn:
cur = txn.cursor(qdb)
for qid, qwire in cur:
yield (qid, qwire)
#selector.close() # TODO
# init LMDB
def reader_init(envdir):
"""Open LMDB environment and database in read-only mode."""
config = dbhelper.env_open.copy()
config.update({
'path': envdir,
'readonly': True
})
lenv = lmdb.Environment(**config)
qdb = lenv.open_db(key=b'queries', **dbhelper.db_open, create=False)
qdb = lenv.open_db(key=dbhelper.QUERIES_DB_NAME,
create=False,
**dbhelper.db_open)
return (lenv, qdb)
def main():
envdir = sys.argv[1]
lenv, qdb = reader_init(envdir)
qstream = read_queries_lmdb(lenv, qdb)
qstream = dbhelper.key_value_stream(lenv, qdb)
with pool.Pool(
processes=64,
initializer=worker_init,
initargs=[envdir, resolvers, timeout]) as p:
for i in p.imap_unordered(worker_query_lmdb_wrapper, qstream, chunksize=100):
for _ in p.imap_unordered(worker_query_lmdb_wrapper, qstream, chunksize=100):
pass
if __name__ == "__main__":
......
......@@ -15,7 +15,6 @@ import blacklist
import dbhelper
QUERIES_DB_NAME = b'queries'
REPORT_CHUNKS = 10000
......@@ -112,12 +111,12 @@ def main():
help='path where to create LMDB environment')
args = parser.parse_args()
if dbhelper.db_exists(args.envpath, QUERIES_DB_NAME):
if dbhelper.db_exists(args.envpath, dbhelper.QUERIES_DB_NAME):
logging.critical(
'LMDB environment "%s" already contains DB %s! '
'Overwritting it would invalidate data in the environment, '
'terminating.',
args.envpath, QUERIES_DB_NAME)
args.envpath, dbhelper.QUERIES_DB_NAME)
sys.exit(1)
qstream = read_lines(sys.stdin)
......
......@@ -24,7 +24,7 @@ def sock_init(resolvers):
sock.setblocking(False)
sockets.append((name, sock, destination))
selector.register(sock, selectors.EVENT_READ, name)
#print(sockets)
# selector.close() ? # TODO
return selector, sockets
def send_recv_parallel(what, selector, sockets, timeout):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment