Commit 3e6e2d44 authored by Robin Obůrka's avatar Robin Obůrka

msgloop: Change style of context allocation and its management

parent fa6920bb
...@@ -20,35 +20,20 @@ class LoopHardFail(Exception): ...@@ -20,35 +20,20 @@ class LoopHardFail(Exception):
pass pass
def sn_main(box_name, process, setup=None, teardown=None, argparser=None, args=None):
zmq_ctx = zmq.Context.instance()
sn_ctx = SN(zmq_ctx, argparser or get_arg_parser(), args=args)
socket_recv = get_socket(sn_ctx, "in")
socket_send = get_socket(sn_ctx, "out")
if not socket_recv and not socket_send:
raise LoopError("Neither input nor output socket provided")
if not socket_recv and not inspect.isgeneratorfunction(process):
raise LoopError("Generator is expected for output-only box")
logger.info("SN main starting loop for %s box", box_name) def sn_main(box_name, process, setup=None, teardown=None, argparser=None, args=None):
sn_ctx = SN(zmq.Context.instance(), argparser or get_arg_parser(), args=args)
data_for_context = {
"zmq_ctx": zmq_ctx,
"sn_ctx": sn_ctx,
"args": sn_ctx.args,
"socket_recv": socket_recv,
"socket_send": socket_send,
}
context = None context = None
try: try:
user_data = get_user_data(setup) user_data = get_user_data(setup)
context = build_context(box_name, data_for_context, user_data)
register_signals(context) context = build_context(box_name, sn_ctx, user_data)
check_configuration(context, process)
logger.info("SN main starting loop for %s box", box_name)
register_signals(context)
_sn_main_loop(context, process) _sn_main_loop(context, process)
except LoopHardFail as e: except LoopHardFail as e:
...@@ -66,53 +51,66 @@ def sn_main(box_name, process, setup=None, teardown=None, argparser=None, args=N ...@@ -66,53 +51,66 @@ def sn_main(box_name, process, setup=None, teardown=None, argparser=None, args=N
teardown_context(context) teardown_context(context)
def get_socket(context, sock_name):
socket = None
try:
socket = context.get_socket(sock_name)
except UndefinedSocketError as e:
pass
return socket
def get_user_data(setup): def get_user_data(setup):
if setup: if setup:
user_data = setup() user_data = setup()
if isinstance(user_data, dict): if isinstance(user_data, dict):
return user_data return user_data
else: else:
raise LoopError("Setup function didn't return a dictionary") raise SetupError("Setup function didn't return a dictionary")
return {} return {}
def build_context(box_name, context_data, user_data): def build_context(box_name, sn_ctx, user_data):
socket_recv = get_socket(sn_ctx, "in")
socket_send = get_socket(sn_ctx, "out")
ctx = { ctx = {
"name": box_name, "name": box_name,
"logger": logging.getLogger(box_name), "logger": logging.getLogger(box_name),
"loop_continue": True, "loop_continue": True,
"errors_in_row": 0, "errors_in_row": 0,
"sn_ctx": sn_ctx,
"zmq_ctx": sn_ctx.context,
"args": sn_ctx.args,
"socket_recv": socket_recv,
"socket_send": socket_send,
} }
ctx.update(context_data)
for k, v in user_data.items(): for k, v in user_data.items():
if k in ctx: if k in ctx:
raise LoopError("Used reserved word in user_data: %s", k) raise SetupError("Used reserved word in user_data: %s", k)
else: else:
ctx[k] = v ctx[k] = v
return SimpleNamespace(**ctx) return SimpleNamespace(**ctx)
def teardown_context(ctx): def get_socket(context, sock_name):
if ctx.socket_recv: socket = None
ctx.socket_recv.close() try:
if ctx.socket_send: socket = context.get_socket(sock_name)
ctx.socket_send.close()
ctx.zmq_ctx.destroy() except UndefinedSocketError as e:
pass
return socket
def check_configuration(context, process):
if not context.socket_recv and not context.socket_send:
raise SetupError("Neither input nor output socket provided")
if not context.socket_recv and not inspect.isgeneratorfunction(process):
raise SetupError("Generator is expected for output-only box")
def teardown_context(context):
if context.socket_recv:
context.socket_recv.close()
if context.socket_send:
context.socket_send.close()
context.zmq_ctx.destroy()
def _sn_main_loop(context, process): def _sn_main_loop(context, process):
...@@ -165,10 +163,10 @@ def process_result(socket_send, result): ...@@ -165,10 +163,10 @@ def process_result(socket_send, result):
# TODO: Hard fail on InvalidMsgError in box output? # TODO: Hard fail on InvalidMsgError in box output?
def register_signals(ctx): def register_signals(context):
def signal_handler(signum, frame): def signal_handler(signum, frame):
ctx.logger.info("Signal %s received", signum) context.logger.info("Signal %s received", signum)
ctx.loop_continue = False context.loop_continue = False
for sig in [ signal.SIGHUP, signal.SIGTERM, signal.SIGQUIT, signal.SIGABRT ]: for sig in [ signal.SIGHUP, signal.SIGTERM, signal.SIGQUIT, signal.SIGABRT ]:
signal.signal(sig, signal_handler) signal.signal(sig, signal_handler)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment