Commit 47fd7c7b authored by Petr Špaček's avatar Petr Špaček

orchestrator: reads queries and writes answers from/to LMDB

parent 7c8fc998
import multiprocessing.dummy as pool
import os
import pickle
import sys
import threading
import lmdb
import lmdbcfg
import sendrecv
timeout = 5
......@@ -12,23 +18,72 @@ resolvers = [
# find query files
def find_querydirs(workdir):
i = 0
for root, dirs, files in os.walk('.'):
dirs.sort()
if not 'q.dns' in files:
continue
#print('yield %s' % root)
yield root
i += 1
if i % 10000 == 0:
print(i)
# Per-worker state keyed by thread ident; populated by worker_init() and
# read by worker_query_lmdb_wrapper().  Shared dict works for both thread
# and process pools (each process gets its own copy after fork).
global worker_state
worker_state = {}  # shared by all workers
def worker_init(envdir, resolvers, init_timeout):
    """Pool initializer: open resolver sockets and an LMDB answers DB.

    Stores everything under the current thread's ident in the module-level
    worker_state dict, so it works with both process and thread pools.
    """
    global worker_state
    global timeout
    timeout = init_timeout
    selector, sockets = sendrecv.sock_init(resolvers)
    cfg = dict(lmdbcfg.env_open)
    cfg['path'] = envdir
    cfg['writemap'] = True
    cfg['readonly'] = False
    env = lmdb.Environment(**cfg)
    answers_db = env.open_db(key=b'answers', create=True, **lmdbcfg.db_open)
    ident = threading.current_thread().ident
    worker_state[ident] = (env, answers_db, selector, sockets)
def worker_query_lmdb_wrapper(args):
    """Resolve one query and persist the pickled replies under its qid.

    *args* is a (qid, qwire) pair as produced by read_queries_lmdb().
    """
    global worker_state  # filled in by worker_init
    global timeout
    qid, qwire = args
    ident = threading.current_thread().ident
    lenv, adb, selector, sockets = worker_state[ident]
    answers = sendrecv.send_recv_parallel(qwire, selector, sockets, timeout)
    with lenv.begin(adb, write=True) as txn:
        txn.put(qid, pickle.dumps(answers))
def read_queries_lmdb(lenv, qdb):
    """Yield (qid, qwire) pairs from the 'queries' sub-database.

    Keeps a single read transaction open for the whole iteration.
    """
    with lenv.begin(qdb) as txn:
        # an LMDB cursor iterates as (key, value) pairs
        yield from txn.cursor(qdb)
    # TODO: the selector/sockets opened in worker_init are never closed
# NOTE(review): this top-level pool run looks like leftover pre-commit code
# from the scraped diff — it duplicates main() below and depends on a
# module-level `resolvers` list plus sendrecv.worker_init/query_resolvers
# (the file-based code path).  Confirm whether it should be deleted.
with pool.Pool(
        processes=4,
        initializer=sendrecv.worker_init,
        initargs=[resolvers, timeout]) as p:
    # drain the iterator; the workers write answer files as a side effect
    for i in p.imap_unordered(sendrecv.query_resolvers, find_querydirs('.'), chunksize=1000):
        pass
# init LMDB
def reader_init(envdir):
    """Open the LMDB environment at *envdir* read-only.

    Returns a (environment, queries-db-handle) pair; the 'queries' DB must
    already exist (create=False).
    """
    cfg = dict(lmdbcfg.env_open)
    cfg['path'] = envdir
    cfg['readonly'] = True
    env = lmdb.Environment(**cfg)
    queries_db = env.open_db(key=b'queries', create=False, **lmdbcfg.db_open)
    return (env, queries_db)
def main():
    """Stream queries out of LMDB (argv[1]) and fan them out to workers.

    Each worker sends the query wire to all resolvers and stores the
    pickled replies back into the 'answers' DB.
    """
    envdir = sys.argv[1]
    lenv, qdb = reader_init(envdir)
    query_stream = read_queries_lmdb(lenv, qdb)
    with pool.Pool(processes=4,
                   initializer=worker_init,
                   initargs=[envdir, resolvers, timeout]) as p:
        # drain the iterator; workers write answers as a side effect
        for _ in p.imap_unordered(worker_query_lmdb_wrapper, query_stream,
                                  chunksize=10):
            pass
# Script entry point.
if __name__ == "__main__":
    main()
import os
import selectors
import socket
import threading
import dns.inet
import dns.message
......@@ -29,11 +28,11 @@ def sock_init(resolvers):
return selector, sockets
# NOTE(review): this region is a scraped diff, not valid Python — it
# interleaves pre- and post-commit lines of send_recv_parallel, contains a
# raw hunk-marker line, and at least one line (where `name` is bound,
# presumably from key.data — TODO confirm against sock_init) is hidden
# between hunks.  Reconstruct exactly one version before running.
def send_recv_parallel(what, selector, sockets, timeout):
# old version accumulated a list of (name, wire) tuples:
replies = []
# new version keys replies by resolver name:
replies = {}
for _, sock, destination in sockets:
sock.sendto(what, destination)
# receive replies
# receive replies
# loop until every resolver answered or select() times out
while len(replies) != len(sockets):
events = selector.select(timeout=timeout) # BLEH! timeout shortening
for key, _ in events:
......@@ -41,38 +40,9 @@ def send_recv_parallel(what, selector, sockets, timeout):
sock = key.fileobj
(wire, from_address) = sock.recvfrom(65535)
#assert len(wire) > 14
# old version:
replies.append((name, wire))
# TODO: check msgid to detect delayed answers
# new version:
replies[name] = wire
if not events:
break # TIMEOUT
return replies
# Per-worker (selector, sockets) tuples keyed by thread ident; populated
# by worker_init() below and consumed by query_resolvers().
global network_state
network_state = {} # shared by all workers
def worker_init(resolvers, init_timeout):
    """Pool initializer: open one socket set per worker.

    Stores the (selector, sockets) pair under the current thread's ident,
    so this works with both process and thread pools.
    """
    global network_state
    global timeout
    timeout = init_timeout
    ident = threading.current_thread().ident
    network_state[ident] = sock_init(resolvers)
def query_resolvers(workdir):
    """Send <workdir>/q.dns to all resolvers; write each reply to
    <workdir>/<name>.dns."""
    global network_state  # initialized in worker_init
    global timeout
    tid = threading.current_thread().ident
    selector, sockets = network_state[tid]
    qfilename = os.path.join(workdir, 'q.dns')
    #print(qfilename)
    with open(qfilename, 'rb') as qfile:
        qwire = qfile.read()
    replies = send_recv_parallel(qwire, selector, sockets, timeout)
    # NOTE(review): answer[0]/answer[1] assumes replies is a sequence of
    # (name, wire) tuples.  The diff above suggests send_recv_parallel now
    # returns a {name: wire} dict; iterating a dict yields bare name
    # strings, so this indexing would pick single characters.  Confirm the
    # current return type — if it is a dict, iterate replies.items().
    for answer in replies:
        afilename = os.path.join(workdir, "%s.dns" % answer[0])
        with open(afilename, 'wb') as afile:
            afile.write(answer[1])
    #print('%s DONE' % qfilename)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment