network.py 6.72 KB
Newer Older
1
import re
Martin Prudek's avatar
Martin Prudek committed
2

3
import zmq
4

Robin Obůrka's avatar
Robin Obůrka committed
5
from .argparser import get_arg_parser
6
from .exceptions import *
Martin Prudek's avatar
Martin Prudek committed
7 8


9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
class Resource:
    NAME = re.compile("[a-z0-9_-]+")
    ADDRESS = re.compile("[a-z0-9_-]+") #TODO Use something better
    DIRECTIONS = [
        "connect",
        "bind"
    ]
    SOCK_TYPES = [
        "REQ",
        "REP",
        "DEALER",
        "ROUTER",
        "PUB",
        "SUB",
        "PUSH",
        "PULL",
        "PAIR",
    ]
Martin Prudek's avatar
Martin Prudek committed
27

28 29 30
    def __init__(self, name, direction, sock_type, address, port):
        if not Resource.NAME.match(name):
            raise SockConfigError("Inadmissible characters in resource name")
31

32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
        if direction not in Resource.DIRECTIONS:
            raise SockConfigError("Inadmissible or empty value for direction (use connect or bind)")

        if sock_type not in Resource.SOCK_TYPES:
            raise SockConfigError("Inadmissible or empty socket type")

        if not Resource.ADDRESS.match(address) and address != "*":
            raise SockConfigError("Inadmissible characters in resource address")

        try:
            port_number = int(port)
        except ValueError:
            raise SockConfigError("Port must be a number")

        if port_number < 1 or port_number > 65535:
            raise SockConfigError("Port number is out of range (0-65535)")

        # This is a little bit higher logic
        if address == "*" and direction == "connect":
            raise SockConfigError("On '*' is only bind operation permitted")

        self.name = name
        self.direction = direction
        self.sock_type = sock_type
        self.address = address
        self.port = port_number
58 59


60 61
    def get_connection_string(self):
        return "tcp://{}:{}".format(self.address, self.port)
Martin Prudek's avatar
Martin Prudek committed
62 63


64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
    @classmethod
    def from_string(cls, arg):
        splitted = arg.split(",")

        if len(splitted) != 5:
            raise SockConfigError("Bad count of resource string items")

        return cls(*splitted)


    def __eq__(self, other):
        if self.name == other.name and \
               self.direction == other.direction and \
               self.sock_type == other.sock_type and \
               self.address == other.address and \
               self.port == other.port:
            return True

        return False


    def __ne__(self, other):
        return not self.__eq__(other)


class Socket:
Martin Prudek's avatar
Martin Prudek committed
90 91 92 93 94 95 96 97 98 99 100 101
    SOCKET_TYPE_MAP = {
        "REQ": zmq.REQ,
        "REP": zmq.REP,
        "DEALER": zmq.DEALER,
        "ROUTER": zmq.ROUTER,
        "PUB": zmq.PUB,
        "SUB": zmq.SUB,
        "PUSH": zmq.PUSH,
        "PULL": zmq.PULL,
        "PAIR": zmq.PAIR,
    }

102 103 104 105 106 107 108
    def __init__(self, name, **configuration):
        self.name = name
        self.resources = []
        self.my_type = None
        self.my_direction = None
        self.configuration = configuration
        self.setup_done = False
Martin Prudek's avatar
Martin Prudek committed
109 110


111
    def check_resource(self, resource):
112 113
        if self.name != resource.name:
            raise SockConfigError("Putting bad resource to socket")
Martin Prudek's avatar
Martin Prudek committed
114

115 116
        if not self.my_type:
            self.my_type = resource.sock_type
Martin Prudek's avatar
Martin Prudek committed
117

118 119
        if self.my_type != resource.sock_type:
            raise SockConfigError("New resource is different type than current Socket type")
Martin Prudek's avatar
Martin Prudek committed
120

121 122
        if not self.my_direction:
            self.my_direction = resource.direction
Martin Prudek's avatar
Martin Prudek committed
123

124 125
        if self.setup_done and self.my_direction == "bind":
            raise SockConfigError("Socket can have only one bind operation")
Martin Prudek's avatar
Martin Prudek committed
126

127 128
        if resource in self.resources:
            raise SockConfigError("Resource duplication")
Martin Prudek's avatar
Martin Prudek committed
129

130 131 132 133

    def add_resource(self, resource):
        self.check_resource(resource)

134
        self.resources.append(resource)
Martin Prudek's avatar
Martin Prudek committed
135

136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173
        self.setup_done = True


    def build(self, ctx, name, sock_type=None):
        if self.name != name:
            raise SockConfigError("Name of requested resource is invalid")

        if sock_type and self.my_type != sock_type:
            raise SockConfigError("Unmatched socket type with requested one")

        socket = ctx.socket(Socket.SOCKET_TYPE_MAP[self.my_type])

        if self.my_direction == "bind":
            socket.bind(self.resources[0].get_connection_string())

        else:
            for resource in self.resources:
                socket.connect(resource.get_connection_string())

        self.configure(socket)

        return socket


    def configure(self, socket):
        if "ipv6" in self.configuration:
            socket.ipv6 = self.configuration["ipv6"]


class SN:
    """ This class serves as a container for all resources. This class provides
    an API-like interface for requesting ZMQ sockets based on available
    resources.
    """
    def __init__(self, ctx, argparser=None, **options):
        ## Gather data
        self.context = ctx

174 175 176 177 178
        ## This is small hack to make the whole module ready for black-box tests
        args_to_parse = None
        if "args" in options:
            args_to_parse = options["args"]

179
        if argparser:
180
            self.args = argparser.parse_args(args_to_parse)
181
        else:
182
            self.args = get_arg_parser().parse_args(args_to_parse)
183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216

        ## Build all necessary configuration
        self.build_global_configuration()
        self.parse_resources()
        self.build_sockets()


    def build_global_configuration(self):
        self.global_configuration = {
            "ipv6": not self.args.disable_ipv6,
        }


    def parse_resources(self):
        ## Currently I don't know why but resources is array of arrays
        self.resources = [ Resource.from_string(res[0]) for res in self.args.resource ]


    def build_sockets(self):
        self.sockets = {}

        for resource in self.resources:
            if resource.name not in self.sockets:
                self.sockets[resource.name] = Socket(resource.name, **self.global_configuration)

            self.sockets[resource.name].add_resource(resource)


    def get_socket(self, *requested_sockets):
        """ Gets multiple socket names in 'get_socket(name1, name2,...)'
        or 'get_socket((name1, TYPE1), name2, (name3,TYPE3),...)' or any of
        their combinations. Returns list of all available ZMQ sockets with the
        required names. Exception is risen when there is no socket with the
        desired name or when the socket is of another type.
Martin Prudek's avatar
Martin Prudek committed
217
        """
218 219 220 221 222 223 224 225
        ret = []

        for request in requested_sockets:
            if type(request) == tuple:
                sock_name, sock_type = request
            else:
                sock_name, sock_type = request, None

226
            if sock_name not in self.sockets:
227
                raise UndefinedSocketError("Requesting undefined socket")
228

229 230 231 232 233
            socket = self.sockets[sock_name]
            ret.append(socket.build(self.context, sock_name, sock_type))

        if len(ret) == 1:
            return ret[0]
234
        else:
235
            return ret