Commit 4398e04d authored by Petr Špaček

orchestrator: experiment with threads

There is a question whether the overhead of 64 processes is not unnecessarily
big for a task as simple as sending and receiving blobs over the network.

This commit adds the ability to run the orchestrator with threads instead
of worker processes. It will be the subject of further tuning.
parent f12dc5e9
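The hypothesis in the commit message can be checked with a crude benchmark. The sketch below is not part of the commit; it only illustrates that multiprocessing.dummy exposes the same Pool API as multiprocessing.pool while backing it with threads, which is what makes the import swap in the diff below a one-line change. blob_io is a hypothetical stand-in for sendrecv.query_resolvers.

    import time
    import multiprocessing.pool
    import multiprocessing.dummy

    def blob_io(i):
        # hypothetical stand-in for one network send/receive round trip
        time.sleep(0.01)
        return i

    if __name__ == '__main__':
        for mod in (multiprocessing.pool, multiprocessing.dummy):
            start = time.perf_counter()
            with mod.Pool(processes=4) as p:
                list(p.imap_unordered(blob_io, range(100), chunksize=10))
            print(mod.__name__, round(time.perf_counter() - start, 3), 's')

For I/O-bound work like this, both flavours should finish in roughly the same wall time, but the process pool additionally pays for fork/spawn and for pickling every argument and result, which is exactly the overhead questioned above.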
-import multiprocessing.pool as pool
+import multiprocessing.dummy as pool
 import os
 import sendrecv

@@ -27,7 +27,7 @@ def find_querydirs(workdir):
 #selector.close() # TODO
 with pool.Pool(
-        processes=64,
+        processes=4,
         initializer=sendrecv.worker_init,
         initargs=[resolvers, timeout]) as p:
     for i in p.imap_unordered(sendrecv.query_resolvers, find_querydirs('.'), chunksize=1000):
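A note on the unchanged arguments (the full surrounding code is not shown here): initializer/initargs run sendrecv.worker_init once per worker, per process before this commit and per thread after it, and chunksize=1000 already hands each worker batches of 1000 query directories per dispatch, so per-task dispatch overhead is amortized under either pool flavour.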
--- a/sendrecv.py
+++ b/sendrecv.py
 import os
 import selectors
 import socket
+import threading
 import dns.inet
 import dns.message

@@ -46,17 +47,24 @@ def send_recv_parallel(what, selector, sockets, timeout):
     return replies

+global network_state
+network_state = {}  # shared by all workers
+
 def worker_init(resolvers, init_timeout):
-    global selector
-    global sockets
+    """
+    make sure it works with distinct processes and threads as well
+    """
+    global network_state  # initialized to empty dict
     global timeout
     timeout = init_timeout
-    selector, sockets = sock_init(resolvers)
+    tid = threading.current_thread().ident
+    network_state[tid] = sock_init(resolvers)

 def query_resolvers(workdir):
-    global selector
-    global sockets
+    global network_state  # initialized in worker_init
     global timeout
+    tid = threading.current_thread().ident
+    selector, sockets = network_state[tid]
     qfilename = os.path.join(workdir, 'q.dns')
     #print(qfilename)
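The pattern above, a module-level dict keyed by threading.current_thread().ident, is what lets worker_init/query_resolvers run under both pool flavours: with multiprocessing.dummy each worker thread stores its state under its own key in the shared dict, while with real processes each worker simply ends up with a private copy of the dict. A minimal sketch, not part of the commit, with init and job as hypothetical stand-ins for worker_init and query_resolvers:

    import threading
    import multiprocessing.dummy as pool

    network_state = {}  # one entry per worker thread

    def init(value):
        # runs once in every worker thread, like worker_init above
        tid = threading.current_thread().ident
        network_state[tid] = value  # stands in for sock_init(resolvers)

    def job(n):
        # each task reads only the state created by its own thread
        tid = threading.current_thread().ident
        return (tid, network_state[tid] + n)

    if __name__ == '__main__':
        with pool.Pool(processes=2, initializer=init, initargs=[100]) as p:
            tids = {tid for tid, _ in p.imap_unordered(job, range(8))}
        print(len(tids), 'worker threads used')  # at most 2

CPython's GIL makes the concurrent inserts into network_state safe here; with real processes the same code still works because worker_init repopulates the (then per-process) dict in each child.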