network.py 7.41 KB
Newer Older
Martin Prudek's avatar
Martin Prudek committed
1 2
import zmq

3 4
from collections import namedtuple

5 6
from .argparser import get_arg_parser, parse

7 8
class SentinelError(Exception):
    pass
Martin Prudek's avatar
Martin Prudek committed
9

10

11
class InvalidMsgError(SentinelError):
Martin Prudek's avatar
Martin Prudek committed
12 13
    pass

14

15 16
class SockConfigError(Exception):
    pass
Martin Prudek's avatar
Martin Prudek committed
17 18 19 20 21 22 23


def resource_parser(config_list):
    """ Gets a tuple of command line arguments - each for one socket connection
    in the form {sockname,[conn/bind],SOCK_TYPE,IP,PORT}.
    Returns a dictionary filled with zmq socket configs in the form
    {name:[connection1, connection2,...]} as each ZMQ socket can handle
24
    multiple connections. Each connection is a namedtuple.
Martin Prudek's avatar
Martin Prudek committed
25
    """
26 27 28
    Connection = namedtuple(
            'Connection',
            ['direction', 'sock_type', 'address', 'port']
29
    )
Martin Prudek's avatar
Martin Prudek committed
30 31
    resources = dict()
    for config in config_list:
32
        config = config[0]
Martin Prudek's avatar
Martin Prudek committed
33 34 35 36
        splitted = config.split(",")
        if len(splitted) == 5:
            if not splitted[0] in resources:
                resources[splitted[0]] = list()
37
            resources[splitted[0]].append(Connection(*splitted[1:]))
Martin Prudek's avatar
Martin Prudek committed
38
        else:
39
            raise SockConfigError("Resource {} is invalid.".format(config))
Martin Prudek's avatar
Martin Prudek committed
40 41 42
    return resources


43
class SN:
44 45 46 47
    """ This class serves as a container for all resources. This class provides
    an API-like interface for requesting ZMQ sockets based on available
    resources.
    """
48
    def __init__(self, ctx, args=get_arg_parser().parse_args()):
49 50 51
        """ Gets a list of command line arguments - each for one socket
        connection and creates a dict of ZMQ socket configs.
        """
52
        self.context = ctx
53
        self.sock_configs = dict()
54
        res_avail = resource_parser(args.resource)
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71

        for res in res_avail:
            sc = None
            for connection in res_avail[res]:
                if sc:
                    sc.add_connection(
                        connection.sock_type,
                        connection.direction,
                        connection.address,
                        connection.port
                    )
                else:
                    sc = SockConfig(
                        self.context,
                        connection.sock_type,
                        connection.direction,
                        connection.address,
72 73
                        connection.port,
                        ipv6=not args.disable_ipv6
74 75 76
                    )
            self.sock_configs[res] = sc

77 78 79 80 81 82 83
    def get_socket(self, *sockets):
        """ Gets multiple socket names in 'get_socket(name1, name2,...)'
        or 'get_socket((name1, TYPE1), name2, (name3,TYPE3),...)' or any of
        their combinations. Returns list of all available ZMQ sockets with the
        required names. Exception is risen when there is no socket with the
        desired name or when the socket is of another type.
        """
84
        ret = list()
85 86 87
        for socket in sockets:
            if type(socket) == tuple:
                sock_name = socket[0]
88
            else:
89 90 91 92
                sock_name = socket
            if sock_name in self.sock_configs:
                if (
                    type(socket) == tuple
93
                    and not self.sock_configs[sock_name].is_type(socket[1])
94
                ):
95
                    raise SockConfigError("Socket type does not match required value!")
96 97 98
                if not self.sock_configs[sock_name].socket:
                    self.sock_configs[sock_name].connect()
                ret.append(self.sock_configs[sock_name].socket)
99
            else:
100
                raise SockConfigError("Resource {} not provided.".format(sock_name))
101 102
        if len(ret) == 1:
            return ret[0]
103
        else:
104
            return ret
105 106


Martin Prudek's avatar
Martin Prudek committed
107 108 109 110 111 112
class SockConfig:
    # a ZMQ feature: one socket can have a multiple connections
    class ZMQConnection:
        def __init__(self, addr, port):
            self.addr = addr
            self.port = port
113
            self.connection = self.get_connection_string()
Martin Prudek's avatar
Martin Prudek committed
114

115 116
        def get_connection_string(self):
            return "tcp://{}:{}".format(self.addr, self.port)
Martin Prudek's avatar
Martin Prudek committed
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134

    SOCKET_TYPE_MAP = {
        "REQ": zmq.REQ,
        "REP": zmq.REP,
        "DEALER": zmq.DEALER,
        "ROUTER": zmq.ROUTER,
        "PUB": zmq.PUB,
        "SUB": zmq.SUB,
        "PUSH": zmq.PUSH,
        "PULL": zmq.PULL,
        "PAIR": zmq.PAIR,
    }

    DIRECTIONS = [
        "connect",
        "bind",
    ]

135
    def __init__(self, context, socktype, direction, addr, port, ipv6):
136
        """ Adds socket configuruation. List
Martin Prudek's avatar
Martin Prudek committed
137 138 139 140 141
        of all connection is stored for further checking of duplicate
        connections.
        """
        self.check_params_validity(socktype, direction, addr, port)

142 143 144
        self.socktype = SockConfig.SOCKET_TYPE_MAP[socktype]
        self.direction = direction

Martin Prudek's avatar
Martin Prudek committed
145 146 147
        zmq_connection = self.ZMQConnection(addr, port)
        self.connections = list()
        self.connections.append(zmq_connection)
148 149
        self.context = context
        self.socket = None
150
        self.ipv6 = ipv6
Martin Prudek's avatar
Martin Prudek committed
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181

    def add_connection(self, socktype, direction, addr, port):
        """ Adds another ZMQ connection to an existing ZMQ socket.
        """
        self.check_params_validity(socktype, direction, addr, port)

        if self.socktype != SockConfig.SOCKET_TYPE_MAP[socktype]:
            raise SockConfigError("Socket type does not match")

        if self.direction == "bind" or direction == "bind":
            raise SockConfigError("Socket direction mismatch")

        for con in self.connections:
            if con.addr == addr and con.port == port:
                raise SockConfigError("Creating duplicate connection")

        zmq_connection = self.ZMQConnection(addr, port)
        self.connections.append(zmq_connection)

    def check_params_validity(self, socktype, direction, addr, port):
        """ Checks whether all the params are present and ZMQ-compliant
        """
        if not socktype:
            raise SockConfigError("Missing socket type")
        if not direction:
            raise SockConfigError("Missing socket direction")
        if not addr:
            raise SockConfigError("Missing address")
        if not port:
            raise SockConfigError("Missing port")

182
        if socktype not in SockConfig.SOCKET_TYPE_MAP:
Martin Prudek's avatar
Martin Prudek committed
183 184
            raise SockConfigError("Unknown socket option", socktype)

185
        if direction not in SockConfig.DIRECTIONS:
Martin Prudek's avatar
Martin Prudek committed
186 187
            raise SockConfigError("Unknown direction option", direction)

188 189
        if int(port) < 1 or int(port) > 65535:
            raise SockConfigError("Port number out of range", port)
190

191
    def is_type(self, socktype):
192 193 194 195 196 197 198
        """ Checks whether the socket type of this socket is equal to
        'socktype' string argument.
        """
        return (
            socktype in SockConfig.SOCKET_TYPE_MAP
            and self.socktype == SockConfig.SOCKET_TYPE_MAP[socktype]
        )
199 200

    def connect(self):
201 202 203
        """ Connects or binds unconnected/unbound zmq socket. An exception
        is risen when the socket is already connected.
        """
204 205
        if not self.socket:
            self.socket = self.context.socket(self.socktype)
206
            self.socket.ipv6 = self.ipv6
Martin Prudek's avatar
Martin Prudek committed
207

208 209 210 211 212
            for zmq_connection in self.connections:
                if self.direction == "bind":
                    self.socket.bind(zmq_connection.connection)
                elif self.direction == "connect":
                    self.socket.connect(zmq_connection.connection)
213 214
                else:
                    raise SockConfigError("Wrong socket direction")
215 216
        else:
            raise SockConfigError("Socket already connected")