import re

import zmq

from .argparser import get_arg_parser
from .exceptions import *


class Resource:
    """Validated description of a single socket endpoint.

    A resource ties a logical socket name to a direction (``connect`` or
    ``bind``), a socket type name, an address and a TCP port. All values
    are validated by the constructor, which raises ``SockConfigError`` on
    the first problem found.
    """

    NAME = re.compile("[a-z0-9_-]+")
    ADDRESS = re.compile("[a-z0-9_-]+") #TODO Use something better
    DIRECTIONS = [
        "connect",
        "bind"
    ]
    SOCK_TYPES = [
        "REQ",
        "REP",
        "DEALER",
        "ROUTER",
        "PUB",
        "SUB",
        "PUSH",
        "PULL",
        "PAIR",
    ]

    def __init__(self, name, direction, sock_type, address, port):
        """Validate and store the endpoint description.

        Args:
            name: Socket name; lowercase letters, digits, ``_`` and ``-``.
            direction: One of ``Resource.DIRECTIONS``.
            sock_type: One of ``Resource.SOCK_TYPES``.
            address: Host part of the endpoint, or ``"*"`` (bind only).
            port: TCP port as an int or numeric string, 1-65535.

        Raises:
            SockConfigError: If any value is invalid.
        """
        # fullmatch: the whole name must consist of admissible characters,
        # not just its first character.
        if not Resource.NAME.fullmatch(name):
            raise SockConfigError("Inadmissible characters in resource name")

        if direction not in Resource.DIRECTIONS:
            raise SockConfigError("Inadmissible or empty value for direction (use connect or bind)")

        if sock_type not in Resource.SOCK_TYPES:
            raise SockConfigError("Inadmissible or empty socket type")

        # NOTE: deliberately a prefix match. The ADDRESS pattern has no '.'
        # or ':' in it, so dotted host names and IP addresses are accepted
        # only because match() anchors at the start. Tightening this needs
        # a better pattern first (see the TODO on ADDRESS).
        if not Resource.ADDRESS.match(address) and address != "*":
            raise SockConfigError("Inadmissible characters in resource address")

        try:
            port_number = int(port)
        except (TypeError, ValueError):
            # TypeError covers non-numeric objects such as None.
            raise SockConfigError("Port must be a number")

        if port_number < 1 or port_number > 65535:
            raise SockConfigError("Port number is out of range (1-65535)")

        # This is a little bit higher logic
        if address == "*" and direction == "connect":
            raise SockConfigError("On '*' is only bind operation permitted")

        self.name = name
        self.direction = direction
        self.sock_type = sock_type
        self.address = address
        self.port = port_number

    def get_connection_string(self):
        """Return the endpoint as a ``tcp://address:port`` string."""
        return "tcp://{}:{}".format(self.address, self.port)

    @classmethod
    def from_string(cls, arg):
        """Build a resource from ``name,direction,sock_type,address,port``.

        Raises:
            SockConfigError: If the string does not have exactly five
                comma-separated items, or if any item is invalid.
        """
        splitted = arg.split(",")

        if len(splitted) != 5:
            raise SockConfigError("Bad count of resource string items")

        return cls(*splitted)

    def __eq__(self, other):
        """Two resources are equal when all five fields are equal."""
        if not isinstance(other, Resource):
            # Let Python try the reflected comparison instead of failing
            # with AttributeError on unrelated objects.
            return NotImplemented

        return (
            self.name == other.name
            and self.direction == other.direction
            and self.sock_type == other.sock_type
            and self.address == other.address
            and self.port == other.port
        )

    def __ne__(self, other):
        """Inverse of ``__eq__``; defers to Python for foreign types."""
        result = self.__eq__(other)
        if result is NotImplemented:
            return NotImplemented

        return not result


class Socket:
    """Collects the resources of one named socket and builds the ZMQ socket.

    Resources are added one at a time with ``add_resource``; every resource
    must share the socket's name, socket type and direction. ``build`` then
    creates a socket from a ZMQ context and binds it (single resource) or
    connects it to every resource.
    """

    SOCKET_TYPE_MAP = {
        "REQ": zmq.REQ,
        "REP": zmq.REP,
        "DEALER": zmq.DEALER,
        "ROUTER": zmq.ROUTER,
        "PUB": zmq.PUB,
        "SUB": zmq.SUB,
        "PUSH": zmq.PUSH,
        "PULL": zmq.PULL,
        "PAIR": zmq.PAIR,
    }

    def __init__(self, name, **configuration):
        """Create an empty socket description.

        Args:
            name: Name that every added resource must carry.
            **configuration: Socket options; ``configure`` currently reads
                only the ``ipv6`` key.
        """
        self.name = name
        self.resources = []
        # Type and direction are adopted from the first resource seen.
        self.my_type = None
        self.my_direction = None
        self.configuration = configuration
        # True once at least one resource has been added.
        self.setup_done = False

    def check_resource(self, resource):
        """Verify that ``resource`` may be added to this socket.

        When the socket has no type or direction yet, they are taken from
        ``resource``.

        Raises:
            SockConfigError: On a name, type or direction mismatch, on a
                second resource for a binding socket, or on a duplicate.
        """
        if self.name != resource.name:
            raise SockConfigError("Putting bad resource to socket")

        if not self.my_type:
            self.my_type = resource.sock_type

        if self.my_type != resource.sock_type:
            raise SockConfigError("New resource is different type than current Socket type")

        if not self.my_direction:
            self.my_direction = resource.direction

        # build() either binds or connects, never both, so a resource with
        # the other direction would be used the wrong way.
        if self.my_direction != resource.direction:
            raise SockConfigError("New resource is different direction than current Socket direction")

        if self.setup_done and self.my_direction == "bind":
            raise SockConfigError("Socket can have only one bind operation")

        if resource in self.resources:
            raise SockConfigError("Resource duplication")

    def add_resource(self, resource):
        """Validate ``resource`` and append it to this socket's resources.

        Raises:
            SockConfigError: If ``check_resource`` rejects the resource.
        """
        self.check_resource(resource)

        self.resources.append(resource)
        self.setup_done = True

    def build(self, ctx, name, sock_type=None):
        """Create, configure and bind/connect the ZMQ socket.

        Args:
            ctx: ZMQ context used to create the socket.
            name: Requested socket name; must equal this socket's name.
            sock_type: Optional expected type name (e.g. ``"REQ"``).

        Returns:
            The socket created from ``ctx``.

        Raises:
            SockConfigError: If the name or type does not match, or if no
                resource has been added yet.
        """
        if self.name != name:
            raise SockConfigError("Name of requested resource is invalid")

        if sock_type and self.my_type != sock_type:
            raise SockConfigError("Unmatched socket type with requested one")

        if not self.resources:
            raise SockConfigError("Socket has no resources")

        socket = ctx.socket(Socket.SOCKET_TYPE_MAP[self.my_type])

        try:
            # Options must be applied first: most ZMQ socket options only
            # take effect for subsequent bind/connect calls.
            self.configure(socket)

            if self.my_direction == "bind":
                socket.bind(self.resources[0].get_connection_string())

            else:
                for resource in self.resources:
                    socket.connect(resource.get_connection_string())

        except Exception:
            # Do not leak a half-initialised socket.
            socket.close()
            raise

        return socket

    def configure(self, socket):
        """Apply the stored configuration options to ``socket``."""
        if "ipv6" in self.configuration:
            socket.ipv6 = self.configuration["ipv6"]


class SN:
    """ This class serves as a container for all resources. This class provides
    an API-like interface for requesting ZMQ sockets based on available
    resources.
    """

    def __init__(self, ctx, argparser=None):
        """Parse command-line arguments and prepare all socket descriptions.

        Args:
            ctx: ZMQ context used later to build the requested sockets.
            argparser: Optional argument parser; when omitted, the one
                returned by ``get_arg_parser()`` is used.
        """
        ## Gather data
        self.context = ctx

        parser = argparser if argparser else get_arg_parser()
        self.args = parser.parse_args()

        ## Build all necessary configuration
        self.build_global_configuration()
        self.parse_resources()
        self.build_sockets()

    def build_global_configuration(self):
        """Derive the options shared by every socket from the arguments."""
        self.global_configuration = {
            "ipv6": not self.args.disable_ipv6,
        }

    def parse_resources(self):
        """Convert the resource arguments into ``Resource`` objects."""
        ## Each entry is itself a one-item list holding the resource string
        ## (presumably the option is declared with nargs=1 and
        ## action="append" -- confirm against the arg parser).
        ## `or []` tolerates the option being absent (None).
        self.resources = [
            Resource.from_string(res[0]) for res in self.args.resource or []
        ]

    def build_sockets(self):
        """Group the parsed resources into ``Socket`` objects by name."""
        self.sockets = {}

        for resource in self.resources:
            if resource.name not in self.sockets:
                self.sockets[resource.name] = Socket(resource.name, **self.global_configuration)

            self.sockets[resource.name].add_resource(resource)

    def get_socket(self, *requested_sockets):
        """ Gets multiple socket names in 'get_socket(name1, name2,...)'
        or 'get_socket((name1, TYPE1), name2, (name3,TYPE3),...)' or any of
        their combinations. Returns a single ZMQ socket when exactly one
        socket is requested, otherwise a list of the built ZMQ sockets in
        request order. An exception is raised when there is no socket with
        the desired name or when the socket is of another type.
        """
        ret = []

        for request in requested_sockets:
            # isinstance also accepts tuple subclasses such as namedtuples.
            if isinstance(request, tuple):
                sock_name, sock_type = request
            else:
                sock_name, sock_type = request, None

            if sock_name not in self.sockets:
                raise UndefinedSocketError("Requesting undefined socket")

            socket = self.sockets[sock_name]
            ret.append(socket.build(self.context, sock_name, sock_type))

        if len(ret) == 1:
            return ret[0]

        return ret