Commit e0446f2c authored by Tomas Krizek's avatar Tomas Krizek

qprep: refactor lmdb handling

Instead of relying on global objects, handle all transactions in the
main process, similarly to orcherstrator.py.
parent 97972aa2
......@@ -57,8 +57,8 @@ def wrk_process_line(args: Tuple[int, str, str]) -> Tuple[bytes, bytes]:
wire = wire_from_text(line)
except (ValueError, struct.error, dns.exception.DNSException) as ex:
logging.error('Invalid query "%s": %s (skipping query ID %d)', line, ex, qid)
return
wrk_process_wire_packet(qid, wire, line)
return None, None
return wrk_process_wire_packet(qid, wire, line)
def wrk_process_packet(args: Tuple[int, bytes, str]):
......@@ -69,20 +69,22 @@ def wrk_process_packet(args: Tuple[int, bytes, str]):
wrk_process_wire_packet(qid, wire, log_repr)
def wrk_process_wire_packet(qid: int, wire_packet: bytes, log_repr: str):
def wrk_process_wire_packet(qid: int, wire_packet: bytes, log_repr: str) -> Tuple[bytes, bytes]:
"""
Worker: Check if given packet is blacklisted and save it into lmdb db
Worker: Return packet's data if it's not blacklisted
:arg qid number of packet
:arg wire_packet packet in binary data
:arg log_repr representation of packet for logs
"""
if not blacklist.is_blacklisted(wire_packet):
wrk_lmdb_write(qid, wire_packet)
key = dbhelper.qid2key(qid)
return key, wire_packet
else:
logging.debug('Query "%s" blacklisted (skipping query ID %d)',
log_repr if log_repr else repr(blacklist.extract_packet(wire_packet)),
qid)
return None, None
def int_or_fromtext(value, fromtext):
......@@ -107,33 +109,17 @@ def wire_from_text(text):
return msg.to_wire()
def wrk_lmdb_init(envdir):
"""
Worker: initialize LMDB env and open 'queries' database
"""
global env
global db
def lmdb_init(envdir):
"""Open LMDB environment and database"""
config = dbhelper.env_open.copy()
config.update({
'path': envdir,
'sync': False, # unsafe but fast
'writemap': True # we do not care, this is a new database
})
env = lmdb.Environment(**config)
db = env.open_db(key=b'queries', **dbhelper.db_open)
def wrk_lmdb_write(qid: int, wire: bytes):
"""
Worker: write query wire format into database
"""
global env
global db
key = dbhelper.qid2key(qid)
with env.begin(db, write=True) as txn:
txn.put(key, wire)
lenv = lmdb.Environment(**config)
qdb = lenv.open_db(key=dbhelper.QUERIES_DB_NAME, **dbhelper.db_open)
return (lenv, qdb)
def main():
......@@ -165,18 +151,23 @@ def main():
'terminating.',
args.envpath, dbhelper.QUERIES_DB_NAME)
sys.exit(1)
with pool.Pool(initializer=wrk_lmdb_init, initargs=(args.envpath,)) as workers:
if args.in_format == 'text':
data_stream = read_lines(sys.stdin)
method = wrk_process_line
elif args.in_format == 'pcap':
data_stream = parse_pcap(args.pcap_file)
method = wrk_process_packet
else:
logging.error('unknown in-format, use "text" or "pcap"')
sys.exit(1)
for _ in workers.imap_unordered(method, data_stream, chunksize=1000):
pass
lenv, qdb = lmdb_init(args.envpath)
with lenv.begin(qdb, write=True) as txn:
with pool.Pool() as workers:
if args.in_format == 'text':
data_stream = read_lines(sys.stdin)
method = wrk_process_line
elif args.in_format == 'pcap':
data_stream = parse_pcap(args.pcap_file)
method = wrk_process_packet
else:
logging.error('unknown in-format, use "text" or "pcap"')
sys.exit(1)
for key, wire in workers.imap(method, data_stream, chunksize=1000):
if key is not None:
txn.put(key, wire)
if __name__ == '__main__':
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment