Commit a167361f authored by Tomas Krizek's avatar Tomas Krizek

clarify QID/QKey usage

parent b7c726b9
from typing import Dict, Any, Tuple, Generator # NOQA: needed for type hint in comment
import os
import struct
from typing import Any, Dict, Iterator, Tuple # NOQA: needed for type hint in comment
import lmdb
from dataformat import QID
def qid2key(qid):
QKey = bytes
def qid2key(qid: QID) -> QKey:
"""Encode query ID to database key"""
return struct.pack('@I', qid) # native integer
def key2qid(key):
def key2qid(key: QKey) -> QID:
return struct.unpack('@I', key)[0]
......@@ -104,7 +109,7 @@ class LMDB:
except KeyError:
raise RuntimeError("Database {} isn't open!".format(dbname.decode('utf-8')))
def key_stream(self, dbname: bytes):
def key_stream(self, dbname: bytes) -> Iterator[bytes]:
"""yield all keys from given db"""
db = self.get_db(dbname)
with self.env.begin(db) as txn:
......@@ -112,7 +117,7 @@ class LMDB:
for key in cur.iternext(keys=True, values=False):
yield key
def key_value_stream(self, dbname: bytes):
def key_value_stream(self, dbname: bytes) -> Iterator[Tuple[bytes, bytes]]:
"""yield all (key, value) pairs from given db"""
db = self.get_db(dbname)
with self.env.begin(db) as txn:
......
......@@ -63,12 +63,12 @@ def main():
processes=args.cfg['sendrecv']['jobs'],
initializer=sendrecv.worker_init) as p:
i = 0
for qid, blob in p.imap(sendrecv.worker_perform_query, qstream,
chunksize=100):
for qkey, blob in p.imap(sendrecv.worker_perform_query, qstream,
chunksize=100):
i += 1
if i % 10000 == 0:
logging.info('Received {:d} answers'.format(i))
txn.put(qid, blob)
txn.put(qkey, blob)
except RuntimeError as err:
logging.error(err)
sys.exit(1)
......
......@@ -23,7 +23,8 @@ from typing import Any, Dict, List, Mapping, Sequence, Tuple # noqa: type hints
import dns.inet
import dns.message
from dataformat import Reply, QID, WireFormat
from dataformat import Reply, WireFormat
from dbhelper import QKey
ResolverID = str
......@@ -91,9 +92,9 @@ def worker_deinit() -> None:
sck.close()
def worker_perform_query(args: Tuple[QID, WireFormat]) -> Tuple[QID, RepliesBlob]:
def worker_perform_query(args: Tuple[QKey, WireFormat]) -> Tuple[QKey, RepliesBlob]:
"""DNS query performed by orchestrator"""
qid, qwire = args
qkey, qwire = args
selector = __worker_state.selector
sockets = __worker_state.sockets
......@@ -111,7 +112,7 @@ def worker_perform_query(args: Tuple[QID, WireFormat]) -> Tuple[QID, RepliesBlob
worker_reinit()
blob = pickle.dumps(replies)
return qid, blob
return qkey, blob
def get_resolvers(
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment