network.py 6.92 KB
Newer Older
1
import re
2
import socket
Martin Prudek's avatar
Martin Prudek committed
3

4
import zmq
5

6
from .argparser import get_arg_parser
7
from .exceptions import *
Martin Prudek's avatar
Martin Prudek committed
8 9


10 11
class Resource:
    """Validated description of one endpoint of a named socket.

    A resource carries the socket name it belongs to, the operation to perform
    (``bind`` or ``connect``), the ZMQ socket type, and the TCP address and
    port. Every value is validated by the constructor.
    """

    # Characters admissible in a resource name.
    NAME = re.compile("[a-z0-9_-]+")
    # Characters a plain host name address has to start with.
    SIMPLE_ADDRESS = re.compile("[a-z0-9_-]+")
    DIRECTIONS = [
        "connect",
        "bind"
    ]
    SOCK_TYPES = [
        "REQ",
        "REP",
        "DEALER",
        "ROUTER",
        "PUB",
        "SUB",
        "PUSH",
        "PULL",
        "PAIR",
    ]

    def __init__(self, name, direction, sock_type, address, port):
        """Validate and store the resource description.

        Args:
            name: Socket name; may contain only characters from ``NAME``.
            direction: One of ``DIRECTIONS``.
            sock_type: One of ``SOCK_TYPES``.
            address: IPv4/IPv6 literal, ``*`` or a host name.
            port: Port number (anything ``int()`` accepts) in 1-65535.

        Raises:
            SockConfigError: If any of the values is not acceptable.
        """
        # fullmatch() so that the *whole* name is checked; match() would
        # accept any string that merely starts with an admissible character.
        if not Resource.NAME.fullmatch(name):
            raise SockConfigError("Inadmissible characters in resource name")

        if direction not in Resource.DIRECTIONS:
            raise SockConfigError("Inadmissible or empty value for direction (use connect or bind)")

        if sock_type not in Resource.SOCK_TYPES:
            raise SockConfigError("Inadmissible or empty socket type")

        if not self.check_address(address):
            raise SockConfigError("Inadmissible characters in resource address")

        try:
            port_number = int(port)
        except (TypeError, ValueError) as err:
            raise SockConfigError("Port must be a number") from err

        if port_number < 1 or port_number > 65535:
            raise SockConfigError("Port number is out of range (1-65535)")

        # This is a little bit higher logic
        if address == "*" and direction != "bind":
            raise SockConfigError("On '*' is only bind operation permitted")

        self.name = name
        self.direction = direction
        self.sock_type = sock_type
        self.address = address
        self.port = port_number

    def check_address(self, address):
        """Return True when ``address`` is acceptable for a resource.

        Accepted are an IPv4 literal, an IPv6 literal (surrounding square
        brackets are stripped before parsing), the wildcard ``*`` and any
        string beginning with characters from ``SIMPLE_ADDRESS``.
        """
        candidates = (
            (socket.AF_INET, address),
            (socket.AF_INET6, address.strip("[]")),
        )
        for family, candidate in candidates:
            try:
                socket.inet_pton(family, candidate)
            except OSError:
                # Not a literal of this family, try the next form.
                continue
            return True

        if address == "*":
            return True

        # NOTE(review): this only checks the beginning of the address, which
        # is what lets dotted host names through. Anchoring the pattern would
        # reject them - confirm the intended set of host names before
        # tightening this check.
        if Resource.SIMPLE_ADDRESS.match(address):
            return True

        return False

    def get_connection_string(self):
        """Return the ``tcp://address:port`` endpoint of this resource."""
        return "tcp://{}:{}".format(self.address, self.port)

    @classmethod
    def from_string(cls, arg):
        """Build a resource from ``name,direction,sock_type,address,port``.

        Raises:
            SockConfigError: If ``arg`` does not have exactly five
                comma-separated items or any item is invalid.
        """
        items = arg.split(",")

        if len(items) != 5:
            raise SockConfigError("Bad count of resource string items")

        return cls(*items)

    def _as_tuple(self):
        """Return all identifying fields, used for comparison."""
        return (self.name, self.direction, self.sock_type, self.address, self.port)

    def __eq__(self, other):
        """Two resources are equal when all their fields are equal."""
        if not isinstance(other, Resource):
            # Let Python try the reflected comparison instead of failing
            # with AttributeError on foreign objects.
            return NotImplemented

        return self._as_tuple() == other._as_tuple()

    def __ne__(self, other):
        """Inverse of ``__eq__``; defers for objects of another type."""
        equal = self.__eq__(other)
        if equal is NotImplemented:
            return equal

        return not equal


class Socket:
    """Configuration of one named ZMQ socket, assembled from resources.

    Resources sharing the same name are collected with ``add_resource()``;
    ``build()`` then creates the real ZMQ socket and binds or connects it.
    """

    # Translation of textual socket types to pyzmq constants.
    SOCKET_TYPE_MAP = {
        "REQ": zmq.REQ,
        "REP": zmq.REP,
        "DEALER": zmq.DEALER,
        "ROUTER": zmq.ROUTER,
        "PUB": zmq.PUB,
        "SUB": zmq.SUB,
        "PUSH": zmq.PUSH,
        "PULL": zmq.PULL,
        "PAIR": zmq.PAIR,
    }

    def __init__(self, name, **configuration):
        """Create an empty socket description.

        Args:
            name: Name shared by all resources of this socket.
            **configuration: Options applied in ``configure()``; only the
                ``ipv6`` key is read there.
        """
        self.name = name
        self.resources = []
        # Type and direction are taken over from the first accepted resource.
        self.my_type = None
        self.my_direction = None
        self.configuration = configuration
        # True once at least one resource has been added.
        self.setup_done = False

    def check_resource(self, resource):
        """Verify that ``resource`` may be added to this socket.

        The first checked resource determines the socket type and direction.

        Raises:
            SockConfigError: If the resource has another name, type or
                direction than the socket, would be a second bind, or is a
                duplicate of an already added resource.
        """
        if self.name != resource.name:
            raise SockConfigError("Putting bad resource to socket")

        if not self.my_type:
            self.my_type = resource.sock_type

        if self.my_type != resource.sock_type:
            raise SockConfigError("New resource is different type than current Socket type")

        if not self.my_direction:
            self.my_direction = resource.direction

        if self.setup_done and self.my_direction == "bind":
            raise SockConfigError("Socket can have only one bind operation")

        # Without this check a "bind" resource added after a "connect" one
        # would be accepted and later silently connected instead of bound.
        if self.my_direction != resource.direction:
            raise SockConfigError("New resource has different direction than current Socket direction")

        if resource in self.resources:
            raise SockConfigError("Resource duplication")

    def add_resource(self, resource):
        """Validate ``resource`` and append it to this socket.

        Raises:
            SockConfigError: See ``check_resource()``.
        """
        self.check_resource(resource)

        self.resources.append(resource)

        self.setup_done = True

    def build(self, ctx, name, sock_type=None):
        """Create, configure and bind/connect the ZMQ socket.

        Args:
            ctx: ZMQ context used to create the socket.
            name: Requested socket name; must equal this socket's name.
            sock_type: Optional textual socket type the caller expects.

        Returns:
            The ZMQ socket, bound to the first resource or connected to all
            resources, depending on the socket direction.

        Raises:
            SockConfigError: If the name or type does not match, or no
                resource has been added yet.
        """
        if self.name != name:
            raise SockConfigError("Name of requested resource is invalid")

        if sock_type and self.my_type != sock_type:
            raise SockConfigError("Unmatched socket type with requested one")

        if not self.resources:
            raise SockConfigError("Socket has no resources to build from")

        sock = ctx.socket(Socket.SOCKET_TYPE_MAP[self.my_type])
        try:
            self.configure(sock)

            if self.my_direction == "bind":
                sock.bind(self.resources[0].get_connection_string())
            else:
                for resource in self.resources:
                    sock.connect(resource.get_connection_string())
        except Exception:
            # Do not leave a half-initialised socket open in the context.
            sock.close()
            raise

        return sock

    def configure(self, socket):
        """Apply the stored configuration and linger period to ``socket``."""
        if "ipv6" in self.configuration:
            socket.ipv6 = self.configuration["ipv6"]
        socket.setsockopt(zmq.LINGER, 1*1000)  # In msec
186 187 188 189 190 191 192


class SN:
    """ This class serves as a container for all resources. This class provides
    an API-like interface for requesting ZMQ sockets based on available
    resources.
    """
    def __init__(self, ctx, argparser=None):
        """ Parses command line arguments and prepares all socket
        descriptions. 'ctx' is the ZMQ context used for building sockets,
        'argparser' is an optional parser used instead of the one returned
        by 'get_arg_parser()'.
        """
        ## Gather data
        self.context = ctx

        parser = argparser if argparser else get_arg_parser()
        self.args = parser.parse_args()

        ## Build all necessary configuration
        self.build_global_configuration()
        self.parse_resources()
        self.build_sockets()

    def build_global_configuration(self):
        """ Collects options shared by all sockets. """
        self.global_configuration = {
            "ipv6": not self.args.disable_ipv6,
        }

    def parse_resources(self):
        """ Converts all resource strings from arguments to Resource
        objects.
        """
        # NOTE(review): 'resource' is presumably None when the option was
        # not given at all - treat that as "no resources" instead of failing
        # with TypeError; confirm against the argument parser definition.
        resource_strings = self.args.resource or []
        self.resources = [Resource.from_string(res) for res in resource_strings]

    def build_sockets(self):
        """ Groups parsed resources by name into Socket descriptions. """
        self.sockets = {}

        for resource in self.resources:
            if resource.name not in self.sockets:
                self.sockets[resource.name] = Socket(resource.name, **self.global_configuration)

            self.sockets[resource.name].add_resource(resource)

    def get_socket(self, *requested_sockets):
        """ Gets multiple socket names in 'get_socket(name1, name2,...)'
        or 'get_socket((name1, TYPE1), name2, (name3,TYPE3),...)' or any of
        their combinations. Returns list of all available ZMQ sockets with the
        required names (or the socket itself when exactly one was requested).
        Exception is raised when there is no socket with the desired name or
        when the socket is of another type; sockets already built by the
        failing call are closed before the exception propagates.
        """
        built = []

        try:
            for request in requested_sockets:
                # isinstance() also accepts tuple subclasses (namedtuples)
                if isinstance(request, tuple):
                    sock_name, sock_type = request
                else:
                    sock_name, sock_type = request, None

                if sock_name not in self.sockets:
                    raise UndefinedSocketError("Requesting undefined socket")

                description = self.sockets[sock_name]
                built.append(description.build(self.context, sock_name, sock_type))
        except Exception:
            # The caller never gets these sockets, so nobody else can close
            # them.
            for sock in built:
                sock.close()
            raise

        if len(built) == 1:
            return built[0]

        return built