Commit 3a9205cc authored by Robin Obůrka's avatar Robin Obůrka Committed by Robin Obůrka

msgloop: Provide object oriented alternative for sn_main()

parent 81a309d5
......@@ -2,5 +2,5 @@ from sn.exceptions import *
from sn.messages import *
from sn.argparser import *
from sn.network import *
from sn.msgloop import sn_main
from sn.msgloop import SNPipelineBox, SNGeneratorBox, SNTerminationBox
import sn.logging
......@@ -13,9 +13,6 @@ from .messages import encode_msg, parse_msg
from .exceptions import *
logger = logging.getLogger("sn_main")
class LoopHardFail(Exception):
pass
......@@ -24,179 +21,200 @@ class LoopFail(Exception):
pass
def sn_main(box_name, process, setup=None, teardown=None, before_first_request=None, argparser=None):
sn_ctx = SN(zmq.Context.instance(), argparser or get_arg_parser())
class SNBox():
def __init__(self, box_name, argparser=None):
# Important provided values into box
self.name = box_name
self.logger = logging.getLogger(box_name)
# Internal context values
self.zmq_ctx = zmq.Context.instance()
self.sn_ctx = SN(self.zmq_ctx, argparser or get_arg_parser())
self.args = self.sn_ctx.args
# User data
self.context = None
# Error management
self.loop_continue = True
self.errors_in_row = 0
# Core methods - Will be implemented in non-abstract boxes
def check_configuration(self):
raise NotImplementedError("check_configuration")
def get_processed_message(self):
raise NotImplementedError("get_processed_message")
def process_result(self, result):
raise NotImplementedError("process_result")
# Public API for boxes - will be optionally implemented in final boxes
def setup(self):
return {}
def teardown(self):
pass
context = None
try:
user_data = get_user_data(setup, sn_ctx.args)
def before_first_request(self):
pass
context = build_context(box_name, sn_ctx, user_data)
check_configuration(context, process)
def process(self, msg_type, payload):
raise NotImplementedError("process")
logger.info("SN main starting loop for %s box", box_name)
register_signals(context)
# Provided functionality - should be final implementation
def run(self):
self.check_configuration()
try:
self.context = self.get_user_data()
_sn_main_loop(context, before_first_request, process)
self.logger.info("SN main starting loop for %s box", self.name)
self.register_signals()
except LoopHardFail as e:
logger.error("Hard Fail of box: %s", context.name)
logger.exception(e)
# Finally will be called, because sys.exit() raises exception that will be uncaught.
sys.exit(1)
self.before_first_request()
self.run_loop()
except KeyboardInterrupt:
pass
except LoopHardFail as e:
self.logger.error("Hard Fail of box: %s", self.name)
self.logger.exception(e)
# Finally will be called, because sys.exit() raises exception that will be uncaught.
sys.exit(1)
finally:
if context:
# Is possible that context wasn't built yet (e.g. error in setup callback)
if teardown:
teardown(context)
teardown_context(context)
except KeyboardInterrupt:
pass
finally:
self.teardown_box()
self.teardown()
def get_user_data(setup, args):
if setup:
number_of_arguments = len((inspect.signature(setup).parameters))
if number_of_arguments == 1:
user_data = setup(args)
else:
# Let the function fail in implicit way in case that setup callback
# has more than 1 argument
user_data = setup()
def get_user_data(self):
user_data = self.setup()
if isinstance(user_data, dict):
return user_data
return SimpleNamespace(**user_data)
else:
raise SetupError("Setup function didn't return a dictionary")
return {}
def register_signals(self):
def signal_handler(signum, frame):
self.logger.info("Signal %s received", signum)
self.loop_continue = False
for sig in [ signal.SIGHUP, signal.SIGTERM, signal.SIGQUIT, signal.SIGABRT ]:
signal.signal(sig, signal_handler)
def build_context(box_name, sn_ctx, user_data):
socket_recv = get_socket(sn_ctx, "in")
socket_send = get_socket(sn_ctx, "out")
def teardown_box(self):
self.zmq_ctx.destroy()
ctx = {
"name": box_name,
"logger": logging.getLogger(box_name),
"loop_continue": True,
"errors_in_row": 0,
"sn_ctx": sn_ctx,
"zmq_ctx": sn_ctx.context,
"args": sn_ctx.args,
"socket_recv": socket_recv,
"socket_send": socket_send,
}
def run_loop(self):
while self.loop_continue:
try:
result = self.get_processed_message()
self.process_result(result)
self.errors_in_row = 0
for k, v in user_data.items():
if k in ctx:
raise SetupError("Used reserved word in user_data: %s", k)
else:
ctx[k] = v
except StopIteration:
self.logger.info("Box %s raised StopIteration", self.name)
break
return SimpleNamespace(**ctx)
except (SetupError, NotImplementedError) as e:
raise e
except Exception as e:
self.logger.error("Uncaught exception from loop: %s", type(e).__name__)
self.logger.exception(e)
def get_socket(context, sock_name):
socket = None
try:
socket = context.get_socket(sock_name)
self.errors_in_row += 1
if self.errors_in_row > 10:
raise LoopHardFail("Many errors in row.")
except UndefinedSocketError as e:
pass
# Helper methods
def get_socket(self, sock_name):
socket = None
try:
socket = self.sn_ctx.get_socket(sock_name)
return socket
except UndefinedSocketError as e:
pass
return socket
def check_configuration(context, process):
if not context.socket_recv and not context.socket_send:
raise SetupError("Neither input nor output socket provided")
if not context.socket_recv and not inspect.isgeneratorfunction(process):
raise SetupError("Generator is expected for output-only box")
class SNPipelineBox(SNBox):
def __init__(self, box_name, argparser=None):
super().__init__(box_name, argparser)
self.socket_recv = self.get_socket("in")
self.socket_send = self.get_socket("out")
def teardown_context(context):
if context.socket_recv:
context.socket_recv.close()
if context.socket_send:
context.socket_send.close()
context.zmq_ctx.destroy()
def check_configuration(self):
if not self.socket_recv and not self.socket_send:
raise SetupError("Neither input nor output socket provided")
def get_processed_message(self):
msg = self.socket_recv.recv_multipart()
msg_type, payload = parse_msg(msg)
def _sn_main_loop(context, before_first_request, process):
if inspect.isgeneratorfunction(process):
get_processed_message = processed_message_from_generator(context, process)
else:
get_processed_message = processed_message_from_function(context, process)
return self.process(msg_type, payload)
if before_first_request:
result = before_first_request(context)
if result:
process_result(context.socket_send, result)
def process_result(self, result):
if not result:
# The box hasn't any reasonable answer
return
while context.loop_continue:
try:
result = get_processed_message()
process_result(context.socket_send, result)
context.errors_in_row = 0
except StopIteration:
context.logger.info("Box %s raised StopIteration", context.name)
break
msg_type, payload = result
msg_out = encode_msg(msg_type, payload)
self.socket_send.send_multipart(msg_out)
except SetupError as e:
raise e
except (ValueError, InvalidMsgError):
raise LoopFail("Generated broken output message. Possibly bug in box.")
except Exception as e:
logger.error("Uncaught exception from loop: %s", type(e).__name__)
logger.exception(e)
context.errors_in_row += 1
if context.errors_in_row > 10:
raise LoopHardFail("Many errors in row.")
class SNGeneratorBox(SNBox):
def __init__(self, box_name, argparser=None):
super().__init__(box_name, argparser)
self.socket_send = self.get_socket("out")
# Ensure about process() method before try to get iterator
self.check_configuration()
def processed_message_from_generator(context, process):
iterator = process(context)
def get_from_generator():
return next(iterator)
self.process_iterator = self.process()
return get_from_generator
def check_configuration(self):
if not self.socket_send:
raise SetupError("Output socket wasn't provided")
if not inspect.isgeneratorfunction(self.process):
raise SetupError("Generator is expected for output-only box")
def get_processed_message(self):
return next(self.process_iterator)
def processed_message_from_function(context, process):
def get_from_function():
msg = context.socket_recv.recv_multipart()
msg_type, payload = parse_msg(msg)
return process(context, msg_type, payload)
return get_from_function
def process_result(self, result):
if not result:
# The box hasn't any reasonable answer
return
try:
msg_type, payload = result
msg_out = encode_msg(msg_type, payload)
self.socket_send.send_multipart(msg_out)
def process_result(socket_send, result):
if not result:
# The box is output-only or it hasn't any reasonable answer
return
except (ValueError, InvalidMsgError):
raise LoopFail("Generated broken output message. Possibly bug in box.")
if not socket_send:
raise SetupError("Box generated output but there is any output socket. Bad configuration?")
try:
msg_type, payload = result
msg_out = encode_msg(msg_type, payload)
socket_send.send_multipart(msg_out)
class SNTerminationBox(SNBox):
def __init__(self, box_name, argparser=None):
super().__init__(box_name, argparser)
self.socket_recv = self.get_socket("in")
except (ValueError, InvalidMsgError):
raise LoopFail("Generated broken output message. Possibly bug in box.")
def check_configuration(self):
if not self.socket_recv:
raise SetupError("Input socket wasn't provided")
def get_processed_message(self):
msg = self.socket_recv.recv_multipart()
msg_type, payload = parse_msg(msg)
def register_signals(context):
def signal_handler(signum, frame):
context.logger.info("Signal %s received", signum)
context.loop_continue = False
return self.process(msg_type, payload)
for sig in [ signal.SIGHUP, signal.SIGTERM, signal.SIGQUIT, signal.SIGABRT ]:
signal.signal(sig, signal_handler)
def process_result(self, result):
if result:
raise LoopFail("Input-only box generated output message. Possibly bug in box.")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment