Commit 20cbf329 authored by Tomas Krizek's avatar Tomas Krizek

qprep: commit LMDB transaction on SIGINT

Closes #17
parent fe6fa360
Pipeline #36846 passed with stage
in 1 minute and 25 seconds
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
import argparse import argparse
import logging import logging
import multiprocessing.pool as pool import multiprocessing.pool as pool
import signal
import struct import struct
import sys import sys
from typing import Optional, Tuple from typing import Optional, Tuple
...@@ -139,8 +140,11 @@ def main(): ...@@ -139,8 +140,11 @@ def main():
with LMDB(args.envdir) as lmdb: with LMDB(args.envdir) as lmdb:
qdb = lmdb.open_db(LMDB.QUERIES, create=True, check_notexists=True) qdb = lmdb.open_db(LMDB.QUERIES, create=True, check_notexists=True)
with lmdb.env.begin(qdb, write=True) as txn: txn = lmdb.env.begin(qdb, write=True)
with pool.Pool() as workers: try:
with pool.Pool(
initializer=lambda: signal.signal(signal.SIGINT, signal.SIG_IGN)
) as workers:
if args.in_format == 'text': if args.in_format == 'text':
data_stream = read_lines(sys.stdin) data_stream = read_lines(sys.stdin)
method = wrk_process_line method = wrk_process_line
...@@ -153,6 +157,16 @@ def main(): ...@@ -153,6 +157,16 @@ def main():
for key, wire in workers.imap(method, data_stream, chunksize=1000): for key, wire in workers.imap(method, data_stream, chunksize=1000):
if key is not None: if key is not None:
txn.put(key, wire) txn.put(key, wire)
except KeyboardInterrupt as err:
logging.info('SIGINT received, exiting...')
sys.exit(130)
except RuntimeError as err:
logging.error(err)
sys.exit(1)
finally:
# attempt to preserve data if something went wrong (or not)
logging.debug('Comitting LMDB transaction...')
txn.commit()
if __name__ == '__main__': if __name__ == '__main__':
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment