network.py 8.39 KB
Newer Older
Martin Prudek's avatar
Martin Prudek committed
1 2 3 4 5 6
#!/usr/bin/env python3
import argparse

import msgpack
import zmq

7 8
from collections import namedtuple

Martin Prudek's avatar
Martin Prudek committed
9 10 11 12

class InvalidMsgError(Exception):
    """ Raised when a received message does not have the expected parts.
    """

13

Martin Prudek's avatar
Martin Prudek committed
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
def parse_msg(data):
    """ Gets a Sentinel-type multipart ZMQ message and parses its message
    type and payload.

    data -- indexable sequence of message frames; data[0] holds the UTF-8
            encoded message type and data[1] the msgpack-encoded payload.
            Any further frames are ignored.

    Returns a (msg_type, payload) tuple where msg_type is a str and payload
    is the unpacked msgpack object.

    Raises InvalidMsgError when the message has fewer than two frames.
    """
    # Only the frame access is guarded, so that an unrelated error raised
    # while decoding is not reported as a missing frame.
    try:
        type_frame = data[0]
        payload_frame = data[1]
    except IndexError as exc:
        raise InvalidMsgError("Not enough parts in message") from exc

    msg_type = str(type_frame, encoding="UTF-8")
    # 'raw=False' decodes msgpack strings to str. It replaces the 'encoding'
    # keyword, which was removed from msgpack in version 1.0.
    payload = msgpack.unpackb(payload_frame, raw=False)

    return msg_type, payload


def encode_msg(msg_type, data):
    """ Gets a string message type and its data. Both of them are packed so
    that they are prepared for zmq's send_multipart().

    msg_type -- message type as a str; it is encoded to UTF-8 bytes.
    data -- payload; it is serialized by msgpack.packb().

    Returns a (type_bytes, packed_payload) tuple.
    """
    b = bytes(msg_type, encoding="UTF-8")
    msg = msgpack.packb(data)

    return (b, msg)

37 38

def get_arg_parser():
    """ Creates the argument parser with the options understood by this
    module and returns it as an object, so that a caller can add its own
    arguments before parsing.

    Options:
    --resource -- one socket connection description; may be repeated. Each
                  occurrence is stored as a one-element list (nargs=1) and
                  all occurrences are collected in a list (action='append').
                  When the option is never given, the value is None.
    --disable-ipv6 -- boolean flag, False unless given.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--resource',
        nargs=1,
        action='append',
        help='socket connection in the form '
             'NAME,DIRECTION,SOCK_TYPE,ADDRESS,PORT; may be repeated',
    )
    parser.add_argument(
        '--disable-ipv6',
        action='store_true',
        help='do not enable IPv6 on the created sockets',
    )
    return parser


def parse(aparser):
    """ Parses the command line arguments of the process with the given
    parser and returns the resulting namespace.

    aparser -- parser object providing parse_args(), e.g. the one returned
               by get_arg_parser().
    """
    return aparser.parse_args()
Martin Prudek's avatar
Martin Prudek committed
49 50 51 52 53 54 55


def resource_parser(config_list):
    """ Gets a list of command line arguments - each for one socket
    connection in the form 'sockname,[connect/bind],SOCK_TYPE,IP,PORT'.
    Each item of the list is itself a one-element list holding that string
    (the shape argparse produces for nargs=1 with action='append').

    Returns a dictionary filled with zmq socket configs in the form
    {name:[connection1, connection2,...]} as each ZMQ socket can handle
    multiple connections. Each connection is a namedtuple with the fields
    direction, sock_type, address and port; all of them are kept as strings
    and are not validated here.

    config_list may be None or empty (no resource given on the command
    line); an empty dictionary is returned in that case.

    Raises SockConfigError when a resource does not consist of exactly five
    comma-separated fields.
    """
    Connection = namedtuple(
            'Connection',
            ['direction', 'sock_type', 'address', 'port']
    )
    resources = dict()
    # argparse leaves an 'append' option set to None when it is never given
    for config in config_list or ():
        config = config[0]
        splitted = config.split(",")
        if len(splitted) != 5:
            raise SockConfigError(
                "Resource {arg} is invalid.".format(arg=config)
            )
        name = splitted[0]
        resources.setdefault(name, list()).append(Connection(*splitted[1:]))
    return resources


class SockConfigError(Exception):
    """ Raised when a socket resource configuration is invalid or when a
    requested socket resource cannot be provided.
    """


80 81 82 83 84
class Resources:
    """ This class serves as a container for all resources. This class provides
    an API-like interface for requesting ZMQ sockets based on available
    resources.
    """
    def __init__(self, ctx, args=None):
        """ Gets parsed command line arguments and creates a dict of ZMQ
        socket configs, one SockConfig per resource name.

        ctx -- ZMQ context that is handed over to every SockConfig.
        args -- parsed arguments namespace providing the 'resource' and
                'disable_ipv6' attributes. When omitted, the command line is
                parsed with get_arg_parser() at construction time.

        Raises SockConfigError when a resource description is invalid.
        """
        # The command line is parsed here and not in the default value of
        # 'args': a default value is evaluated once, when the class is
        # defined, which would parse sys.argv on mere import of the module.
        if args is None:
            args = get_arg_parser().parse_args()

        self.context = ctx
        self.sock_configs = dict()
        # 'resource' is None when no --resource option was given
        res_avail = resource_parser(args.resource or [])

        for res, connections in res_avail.items():
            sc = None
            for connection in connections:
                if sc is None:
                    # the first connection creates the socket config
                    sc = SockConfig(
                        self.context,
                        connection.sock_type,
                        connection.direction,
                        connection.address,
                        connection.port,
                        ipv6=not args.disable_ipv6
                    )
                else:
                    # further connections are attached to the same config
                    sc.add_connection(
                        connection.sock_type,
                        connection.direction,
                        connection.address,
                        connection.port
                    )
            self.sock_configs[res] = sc

    def get_socket(self, *sockets):
        """ Gets multiple socket names in 'get_socket(name1, name2,...)'
        or 'get_socket((name1, TYPE1), name2, (name3,TYPE3),...)' or any of
        their combinations. A socket that has not been created yet is
        connected/bound on its first request.

        Returns the ZMQ socket itself when exactly one name is requested,
        otherwise a list of the ZMQ sockets in the order of the requested
        names (an empty list when no name is given).

        Raises SockConfigError when there is no resource with the desired
        name or when the socket is of another type than required.
        """
        ret = list()
        for socket in sockets:
            typed = isinstance(socket, tuple)
            sock_name = socket[0] if typed else socket

            if sock_name not in self.sock_configs:
                raise SockConfigError(
                    "Resource {arg} not provided.".format(arg=sock_name)
                )
            sock_config = self.sock_configs[sock_name]

            if typed and not sock_config.is_type(socket[1]):
                raise SockConfigError(
                    "Socket type does not match required value!"
                )

            if sock_config.socket is None:
                sock_config.connect()
            ret.append(sock_config.socket)

        if len(ret) == 1:
            return ret[0]
        return ret
144 145


Martin Prudek's avatar
Martin Prudek committed
146 147 148 149 150 151
class SockConfig:
    """ Configuration of one ZMQ socket together with all its connections.
    The ZMQ socket itself is not created until connect() is called; until
    then the 'socket' attribute is None.
    """

    # a ZMQ feature: one socket can have multiple connections
    class ZMQConnection:
        """ One endpoint (address and port) of a ZMQ socket.
        """
        def __init__(self, addr, port):
            self.addr = addr
            self.port = port
            self.connection = self.get_connection_string()

        def get_connection_string(self):
            """ Returns the endpoint in the form 'tcp://addr:port'.
            """
            return "tcp://{}:{}".format(self.addr, self.port)

    # socket type names accepted in the configuration -> zmq constants
    SOCKET_TYPE_MAP = {
        "REQ": zmq.REQ,
        "REP": zmq.REP,
        "DEALER": zmq.DEALER,
        "ROUTER": zmq.ROUTER,
        "PUB": zmq.PUB,
        "SUB": zmq.SUB,
        "PUSH": zmq.PUSH,
        "PULL": zmq.PULL,
        "PAIR": zmq.PAIR,
    }

    # direction names accepted in the configuration
    DIRECTIONS = [
        "connect",
        "bind",
    ]

    def __init__(self, context, socktype, direction, addr, port, ipv6):
        """ Adds socket configuration. List of all connections is stored
        for further checking of duplicate connections.

        context -- ZMQ context used later by connect() to create the socket.
        socktype -- socket type name, one of the SOCKET_TYPE_MAP keys.
        direction -- one of DIRECTIONS ("connect" or "bind").
        addr, port -- endpoint of the first connection.
        ipv6 -- value assigned to the 'ipv6' option of the created socket.

        Raises SockConfigError when a parameter is missing or invalid.
        """
        self.check_params_validity(socktype, direction, addr, port)

        self.socktype = SockConfig.SOCKET_TYPE_MAP[socktype]
        self.direction = direction

        self.connections = [self.ZMQConnection(addr, port)]
        self.context = context
        self.socket = None
        self.ipv6 = ipv6

    def add_connection(self, socktype, direction, addr, port):
        """ Adds another ZMQ connection to an existing ZMQ socket.

        Raises SockConfigError when a parameter is missing or invalid, when
        the socket type differs from the type of this socket, when either
        this socket or the new connection uses the "bind" direction, or
        when the same address and port have already been added.
        """
        self.check_params_validity(socktype, direction, addr, port)

        if self.socktype != SockConfig.SOCKET_TYPE_MAP[socktype]:
            raise SockConfigError("Socket type does not match")

        # only sockets using "connect" for all connections may have more
        # than one connection
        if self.direction == "bind" or direction == "bind":
            raise SockConfigError("Socket direction mismatch")

        # addresses and ports are compared exactly as they were given
        for con in self.connections:
            if con.addr == addr and con.port == port:
                raise SockConfigError("Creating duplicate connection")

        self.connections.append(self.ZMQConnection(addr, port))

    def check_params_validity(self, socktype, direction, addr, port):
        """ Checks whether all the params are present and ZMQ-compliant.

        Raises SockConfigError when a parameter is missing (falsy), when
        the socket type or direction is unknown, or when the port is not a
        number in the range 1-65535.
        """
        if not socktype:
            raise SockConfigError("Missing socket type")
        if not direction:
            raise SockConfigError("Missing socket direction")
        if not addr:
            raise SockConfigError("Missing address")
        if not port:
            raise SockConfigError("Missing port")

        if socktype not in SockConfig.SOCKET_TYPE_MAP:
            raise SockConfigError("Unknown socket option", socktype)

        if direction not in SockConfig.DIRECTIONS:
            raise SockConfigError("Unknown direction option", direction)

        # a non-numeric port is reported as a configuration error as well,
        # instead of leaking the ValueError raised by int()
        try:
            port_number = int(port)
        except (TypeError, ValueError) as exc:
            raise SockConfigError("Port is not a number", port) from exc

        if port_number < 1 or port_number > 65535:
            raise SockConfigError("Port number out of range", port)

    def is_type(self, socktype):
        """ Checks whether the socket type of this socket is equal to
        'socktype' string argument. Returns False for an unknown type name.
        """
        return (
            socktype in SockConfig.SOCKET_TYPE_MAP
            and self.socktype == SockConfig.SOCKET_TYPE_MAP[socktype]
        )

    def connect(self):
        """ Creates the ZMQ socket and connects or binds it to all of the
        configured connections. The created socket is stored in the
        'socket' attribute.

        Raises SockConfigError when the socket has already been created.
        """
        if self.socket is not None:
            raise SockConfigError("Socket already connected")

        self.socket = self.context.socket(self.socktype)
        self.socket.ipv6 = self.ipv6

        for zmq_connection in self.connections:
            if self.direction == "bind":
                self.socket.bind(zmq_connection.connection)
            elif self.direction == "connect":
                self.socket.connect(zmq_connection.connection)
            else:
                raise SockConfigError("Wrong socket direction")