msgloop.py 3.57 KB
Newer Older
1 2
import logging
import inspect
3
import signal
4 5 6 7 8

from collections import namedtuple

import zmq

Robin Obůrka's avatar
Robin Obůrka committed
9 10 11 12
from .network import SN
from .network import get_arg_parser
from .messages import encode_msg, parse_msg
from .exceptions import *
13 14 15 16 17 18 19 20 21 22


# Logger used by the message-loop machinery itself (not by individual boxes).
logger = logging.getLogger("sn_main")

# Per-box environment handed to the user's ``process`` callable:
#   name   - the box name
#   logger - a logger dedicated to that box
EnvData = namedtuple("EnvData", ("name", "logger"))


23 24 25 26 27 28 29 30
class SignalReceived(Exception):
    """Raised from the installed signal handler to break out of the running loop."""


def signal_handler(signum, frame):
    """Signal callback: interrupt whatever is executing by raising SignalReceived.

    ``signum`` and ``frame`` are accepted because the ``signal`` module passes
    them to every handler; neither is used.
    """
    raise SignalReceived


31
def sn_main(box_name, process, setup=None, teardown=None, argparser=None, args=None):
    """Run the message loop of one box until it finishes, fails or is signalled.

    Args:
        box_name: Name of the box; used in log messages and as the name of the
            box's own logger inside the ``EnvData`` passed to ``process``.
        process: With an input socket, a callable invoked for every received
            message as ``process(env_data, user_data, msg_type, payload)``.
            Without an input socket it must be a generator function called as
            ``process(env_data, user_data)``.
        setup: Optional zero-argument callable; its return value becomes
            ``user_data``. When omitted, ``user_data`` is ``None``.
        teardown: Optional callable invoked as ``teardown(user_data)`` when
            the loop ends, provided the setup phase completed.
        argparser: Argument parser handed to ``SN``; defaults to
            ``get_arg_parser()``.
        args: Forwarded to ``SN`` as ``args``.

    Raises:
        LoopError: When neither socket is configured, when an output-only box
            is not given a generator function, or when the loop itself raises
            it.
        AssertionError: Propagated unchanged (for pytest).

    Any other exception raised by ``setup`` or by the loop is logged and
    suppressed; termination by one of the handled signals is logged at info
    level.
    """
    ctx = SN(zmq.Context.instance(), argparser or get_arg_parser(), args=args)
    socket_recv, socket_send = detect_and_get_sockets(ctx)

    if not socket_recv and not socket_send:
        raise LoopError("Neither input nor output socket provided")
    if not socket_recv and not inspect.isgeneratorfunction(process):
        raise LoopError("Generator is expected for output-only box")

    logger.info("SN main starting loop for %s box", box_name)

    env_data = init_env_data(box_name)

    for sig in (signal.SIGHUP, signal.SIGTERM, signal.SIGQUIT, signal.SIGABRT):
        signal.signal(sig, signal_handler)

    # Bound before the try block so the cleanup code below never touches an
    # unassigned name when setup() itself fails or is interrupted by a signal.
    user_data = None
    setup_done = False

    try:
        user_data = setup() if setup else None
        setup_done = True
        _sn_main_loop(env_data, user_data, socket_recv, socket_send, setup, process, teardown)

    except SignalReceived:
        logger.info("Box %s stopped by signal", box_name)

    except (LoopError, AssertionError):
        # LoopError is a configuration/programmer error the caller must see;
        # AssertionError is passed through for pytest.
        raise

    except Exception as e:
        logger.error("Uncaught exception from loop")
        logger.exception(e)

    finally:
        try:
            # Nothing to tear down when setup never completed.
            if teardown and setup_done:
                teardown(user_data)
        finally:
            # Release the ZeroMQ context even if teardown raises.
            ctx.context.destroy()
69 70


71 72 73 74 75 76
def init_env_data(box_name):
    """Build the EnvData record for a box: its name and a logger named after it."""
    box_logger = logging.getLogger(box_name)
    return EnvData(name=box_name, logger=box_logger)

77

78
def _sn_main_loop(env_data, user_data, socket_recv, socket_send, setup=None, process=None, teardown=None):
79
    if socket_recv:
80
        try:
81
            while True:
82
                msg_in = socket_recv.recv_multipart()
Robin Obůrka's avatar
Robin Obůrka committed
83
                msg_type, payload = parse_msg(msg_in)
84 85 86 87

                result = process(env_data, user_data, msg_type, payload)
                process_result(socket_send, result)

Robin Obůrka's avatar
Robin Obůrka committed
88
        except InvalidMsgError as e:
89 90
            logger.error("Received broken message")

91 92 93 94
    else:
        for result in process(env_data, user_data):
            process_result(socket_send, result)

95 96

def process_result(socket_send, result):
    """Encode and send one result produced by a box.

    Args:
        socket_send: Output socket, or a falsy value when the box has none.
        result: A ``(msg_type, payload)`` pair, or any falsy value when there
            is nothing to send.

    Raises:
        LoopError: If there is a result but no output socket, or if the result
            cannot be unpacked, encoded or sent as a message.
    """
    if not result:
        # The box is output-only or it hasn't any reasonable answer
        return

    if not socket_send:
        raise LoopError("Box generated output but there isn't any output socket. Bad configuration?")

    try:
        msg_type, payload = result
        msg_out = encode_msg(msg_type, payload)
        socket_send.send_multipart(msg_out)

    except (TypeError, ValueError, InvalidMsgError) as e:
        # An invalid message on input means the box received some bad message
        # and should simply not fail. An invalid message on output is a
        # programmer error of the box author, so it is reported as a distinct
        # LoopError. TypeError covers a result that is not unpackable at all.
        raise LoopError("Box generates broken messages") from e
115 116 117 118 119 120 121 122


def detect_and_get_sockets(context):
    """Return the ``(receive, send)`` socket pair available in ``context``.

    The sockets are requested under the names "in" and "out", in that order;
    a socket that raises ``UndefinedSocketError`` is reported as ``None``.
    """
    found = {}
    for name in ("in", "out"):
        try:
            found[name] = context.get_socket(name)
        except UndefinedSocketError:
            found[name] = None

    return found["in"], found["out"]