network.py 6.56 KB
Newer Older
1
import re
Martin Prudek's avatar
Martin Prudek committed
2

3
import zmq
4

5
from .argparser import get_arg_parser
6
from .exceptions import *
Martin Prudek's avatar
Martin Prudek committed
7 8


9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
class Resource:
    """Validated description of one endpoint belonging to a named socket.

    A resource consists of five values: the socket name, the direction
    ("connect" or "bind"), the socket type (one of ``SOCK_TYPES``), the
    address and the TCP port. Every value is validated in ``__init__`` and
    ``SockConfigError`` is raised for the first invalid one.
    """

    # The whole name has to consist of these characters (see __init__).
    NAME = re.compile("[a-z0-9_-]+")
    # NOTE: only the beginning of the address is checked against this pattern
    # (``match``, not ``fullmatch``), so dotted host names and IP addresses
    # are still accepted.
    ADDRESS = re.compile("[a-z0-9_-]+") #TODO Use something better
    DIRECTIONS = [
        "connect",
        "bind"
    ]
    SOCK_TYPES = [
        "REQ",
        "REP",
        "DEALER",
        "ROUTER",
        "PUB",
        "SUB",
        "PUSH",
        "PULL",
        "PAIR",
    ]


    def __init__(self, name, direction, sock_type, address, port):
        """ Validates and stores all parts of the resource.

        name: socket name; must consist only of characters allowed by NAME.
        direction: one of DIRECTIONS.
        sock_type: one of SOCK_TYPES.
        address: host part of the endpoint or "*" (permitted only for bind).
        port: anything accepted by int(); must be in range 1-65535.

        Raises SockConfigError when any of the values is invalid.
        """
        # fullmatch: the whole name must be admissible, not only its prefix
        if not Resource.NAME.fullmatch(name):
            raise SockConfigError("Inadmissible characters in resource name")

        if direction not in Resource.DIRECTIONS:
            raise SockConfigError("Inadmissible or empty value for direction (use connect or bind)")

        if sock_type not in Resource.SOCK_TYPES:
            raise SockConfigError("Inadmissible or empty socket type")

        if not Resource.ADDRESS.match(address) and address != "*":
            raise SockConfigError("Inadmissible characters in resource address")

        try:
            port_number = int(port)
        except (TypeError, ValueError):
            # TypeError covers values like None that int() cannot convert
            raise SockConfigError("Port must be a number")

        if port_number < 1 or port_number > 65535:
            raise SockConfigError("Port number is out of range (1-65535)")

        # This is a little bit higher logic
        if address == "*" and direction == "connect":
            raise SockConfigError("On '*' is only bind operation permitted")

        self.name = name
        self.direction = direction
        self.sock_type = sock_type
        self.address = address
        self.port = port_number


    def get_connection_string(self):
        """ Returns the endpoint in the form 'tcp://<address>:<port>'. """
        return "tcp://{}:{}".format(self.address, self.port)


    @classmethod
    def from_string(cls, arg):
        """ Creates a resource from 'name,direction,sock_type,address,port'.

        Raises SockConfigError when the string does not have exactly five
        comma-separated items or when any item is invalid.
        """
        splitted = arg.split(",")

        if len(splitted) != 5:
            raise SockConfigError("Bad count of resource string items")

        return cls(*splitted)


    def _as_tuple(self):
        """ All values that define the identity of the resource. """
        return (self.name, self.direction, self.sock_type, self.address, self.port)


    def __eq__(self, other):
        """ Two resources are equal when all five of their values are equal. """
        if not isinstance(other, Resource):
            # Let Python try the reflected comparison instead of failing on
            # a missing attribute of an unrelated object.
            return NotImplemented

        return self._as_tuple() == other._as_tuple()


    def __ne__(self, other):
        result = self.__eq__(other)

        if result is NotImplemented:
            return result

        return not result


class Socket:
    """ Collects all resources that share one socket name and builds a ZMQ
    socket out of them.

    All resources of a socket must have the same socket type and the same
    direction. A socket can hold either exactly one "bind" resource or any
    number of "connect" resources.
    """
    # Translation of the textual socket type to the ZMQ constant
    SOCKET_TYPE_MAP = {
        "REQ": zmq.REQ,
        "REP": zmq.REP,
        "DEALER": zmq.DEALER,
        "ROUTER": zmq.ROUTER,
        "PUB": zmq.PUB,
        "SUB": zmq.SUB,
        "PUSH": zmq.PUSH,
        "PULL": zmq.PULL,
        "PAIR": zmq.PAIR,
    }


    def __init__(self, name, **configuration):
        """ name: name shared by all resources of this socket.
        configuration: options applied in configure(); the keys "ipv6" and
        "linger" (in msec) are recognized.
        """
        self.name = name
        self.resources = []
        self.my_type = None
        self.my_direction = None
        self.configuration = configuration
        self.setup_done = False


    def check_resource(self, resource):
        """ Checks that the resource can be added to this socket.

        The socket type and direction of the first checked resource are
        adopted as the type and direction of the whole socket.

        Raises SockConfigError when the resource has a different name, type
        or direction than the socket, when it would be a second bind or when
        it is a duplicate of an already added resource.
        """
        if self.name != resource.name:
            raise SockConfigError("Putting bad resource to socket")

        if not self.my_type:
            self.my_type = resource.sock_type

        if self.my_type != resource.sock_type:
            raise SockConfigError("New resource is different type than current Socket type")

        if not self.my_direction:
            self.my_direction = resource.direction

        if self.setup_done and self.my_direction == "bind":
            raise SockConfigError("Socket can have only one bind operation")

        # build() uses only the direction of the socket, so a "bind" resource
        # added to a "connect" socket would be silently connected.
        if self.my_direction != resource.direction:
            raise SockConfigError("New resource is different direction than current Socket direction")

        if resource in self.resources:
            raise SockConfigError("Resource duplication")


    def add_resource(self, resource):
        """ Validates the resource (see check_resource) and stores it. """
        self.check_resource(resource)

        self.resources.append(resource)

        self.setup_done = True


    def build(self, ctx, name, sock_type=None):
        """ Creates, configures and binds/connects the ZMQ socket.

        ctx: object providing socket(type) (ZMQ context).
        name: requested socket name; must be equal to the name of the socket.
        sock_type: optional textual socket type; when given it must be equal
        to the type of the socket.

        Returns the created socket. Raises SockConfigError when the name or
        type does not match or when the socket has no resources.
        """
        if self.name != name:
            raise SockConfigError("Name of requested resource is invalid")

        if sock_type and self.my_type != sock_type:
            raise SockConfigError("Unmatched socket type with requested one")

        if not self.resources:
            raise SockConfigError("Socket has no resources to build from")

        socket = ctx.socket(Socket.SOCKET_TYPE_MAP[self.my_type])

        try:
            # Options must be set before bind/connect; most ZMQ options
            # (including ipv6) affect only subsequent binds and connects.
            self.configure(socket)

            if self.my_direction == "bind":
                socket.bind(self.resources[0].get_connection_string())

            else:
                for resource in self.resources:
                    socket.connect(resource.get_connection_string())

        except Exception:
            # Do not leak a half-initialized socket
            socket.close(linger=0)
            raise

        return socket


    def configure(self, socket):
        """ Applies the stored configuration to the given ZMQ socket.

        The "ipv6" option is set only when present in the configuration.
        Linger is always set; the default is 1 second unless the
        configuration contains a "linger" key.
        """
        if "ipv6" in self.configuration:
            socket.ipv6 = self.configuration["ipv6"]

        socket.setsockopt(zmq.LINGER, self.configuration.get("linger", 1*1000))  # In msec
164 165 166 167 168 169 170


class SN:
    """ This class serves as a container for all resources. This class provides
    an API-like interface for requesting ZMQ sockets based on available
    resources.
    """
    def __init__(self, ctx, argparser=None):
        """ ctx: ZMQ context passed to every built socket.
        argparser: optional parser used instead of the one returned by
        get_arg_parser(); its parse_args() result must provide 'resource'
        and 'disable_ipv6' attributes.
        """
        ## Gather data
        self.context = ctx

        if argparser:
            self.args = argparser.parse_args()
        else:
            self.args = get_arg_parser().parse_args()

        ## Build all necessary configuration
        self.build_global_configuration()
        self.parse_resources()
        self.build_sockets()


    def build_global_configuration(self):
        """ Prepares configuration that is passed to every Socket. """
        self.global_configuration = {
            "ipv6": not self.args.disable_ipv6,
        }


    def parse_resources(self):
        """ Converts resource strings from arguments to Resource objects. """
        ## Every item of args.resource is a sequence with the resource string
        ## at index 0 (presumably argparse "append" combined with nargs=1 -
        ## confirm against the argument parser). None (no resource given) is
        ## handled as an empty list.
        self.resources = [ Resource.from_string(res[0]) for res in (self.args.resource or []) ]


    def build_sockets(self):
        """ Groups parsed resources by name into Socket objects. """
        self.sockets = {}

        for resource in self.resources:
            if resource.name not in self.sockets:
                self.sockets[resource.name] = Socket(resource.name, **self.global_configuration)

            self.sockets[resource.name].add_resource(resource)


    def get_socket(self, *requested_sockets):
        """ Gets multiple socket names in 'get_socket(name1, name2,...)'
        or 'get_socket((name1, TYPE1), name2, (name3,TYPE3),...)' or any of
        their combinations. Returns list of all available ZMQ sockets with the
        required names (a single socket is returned directly, not in a list).
        Exception is raised when there is no socket with the desired name or
        when the socket is of another type. No socket is left open in such
        a case.
        """
        ## Resolve and validate all requests before anything is built
        resolved = []

        for request in requested_sockets:
            if isinstance(request, tuple):
                sock_name, sock_type = request
            else:
                sock_name, sock_type = request, None

            if sock_name not in self.sockets:
                raise UndefinedSocketError("Requesting undefined socket")

            resolved.append((sock_name, sock_type))

        ret = []

        try:
            for sock_name, sock_type in resolved:
                socket = self.sockets[sock_name]
                ret.append(socket.build(self.context, sock_name, sock_type))

        except Exception:
            ## Caller never gets the already built sockets, so close them
            for built in ret:
                built.close()
            raise

        if len(ret) == 1:
            return ret[0]

        return ret