More sockets can be got from one get_socket call

A few more changes:
	no need to init argparser explicitely
	better exception formating
	sn.parse() now returns all arguments
	it is possible to specify enforced socket type
parent 5b184268
...@@ -12,15 +12,20 @@ Instalation and test instructions: ...@@ -12,15 +12,20 @@ Instalation and test instructions:
3) Install the package 3) Install the package
All the dependencies are installed automatically. All the dependencies are installed automatically.
Using tarball:
pip install sn-<version>.tar.gz pip install sn-<version>.tar.gz
(or)Cloned from git:
pip install sn/ (where 'sn' is cloned folder containing 'setup.py')
4) Test it 4) Test it
tar xzf sn-<version>.tar.gz Using tarball:
cd sn-<version>/tests/ tar xzf sn-<version>.tar.gz
./run_example.sh cd sn-<version>/tests/
./run_example.sh
Package build - produces sn-<version>.tar.gz: (or)Cloned from git:
cd sn/tests
./run_example.sh
Package build (tarball creation)- produces sn-<version>.tar.gz:
python setup.py sdist python setup.py sdist
......
...@@ -26,6 +26,9 @@ def parse_msg(data): ...@@ -26,6 +26,9 @@ def parse_msg(data):
def encode_msg(msg_type, data): def encode_msg(msg_type, data):
""" Gets string message type and its's string data. Then, both of them are
packed to be prepared for zmg.send_multipart().
"""
b = bytes(msg_type, encoding="UTF-8") b = bytes(msg_type, encoding="UTF-8")
msg = msgpack.packb(data) msg = msgpack.packb(data)
...@@ -33,14 +36,15 @@ def encode_msg(msg_type, data): ...@@ -33,14 +36,15 @@ def encode_msg(msg_type, data):
def get_arg_parser(): def get_arg_parser():
""" Creates own arguments parser and return it as an object.
"""
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument('--resource', nargs=1, action='append') parser.add_argument('--resource', nargs=1, action='append')
return parser return parser
def parse(aparser): def parse(aparser):
args = aparser.parse_args() return aparser.parse_args()
return args.resource
def resource_parser(config_list): def resource_parser(config_list):
...@@ -50,9 +54,9 @@ def resource_parser(config_list): ...@@ -50,9 +54,9 @@ def resource_parser(config_list):
{name:[connection1, connection2,...]} as each ZMQ socket can handle {name:[connection1, connection2,...]} as each ZMQ socket can handle
multiple connections. Each connection is a namedtuple. multiple connections. Each connection is a namedtuple.
""" """
Connection_t = namedtuple( Connection = namedtuple(
'Connection_t', 'Connection',
'direction sock_type address port' ['direction', 'sock_type', 'address', 'port']
) )
resources = dict() resources = dict()
for config in config_list: for config in config_list:
...@@ -61,9 +65,9 @@ def resource_parser(config_list): ...@@ -61,9 +65,9 @@ def resource_parser(config_list):
if len(splitted) == 5: if len(splitted) == 5:
if not splitted[0] in resources: if not splitted[0] in resources:
resources[splitted[0]] = list() resources[splitted[0]] = list()
resources[splitted[0]].append(Connection_t(*splitted[1:])) resources[splitted[0]].append(Connection(*splitted[1:]))
else: else:
raise SockConfigError("Invalid resource: " + config) raise SockConfigError("Resource {arg} is invalid.".format(arg=config))
return resources return resources
...@@ -76,18 +80,19 @@ class Resources: ...@@ -76,18 +80,19 @@ class Resources:
an API-like interface for requesting ZMQ sockets based on available an API-like interface for requesting ZMQ sockets based on available
resources. resources.
""" """
def __init__(self, ctx, resources): def __init__(self, ctx, args=None):
self.context = ctx
""" Gets a list of command line arguments - each for one socket """ Gets a list of command line arguments - each for one socket
connection and creates a dict of ZMQ socket configs. connection and creates a dict of ZMQ socket configs.
""" """
if not args:
args = get_arg_parser().parse_args()
self.context = ctx
self.sock_configs = dict() self.sock_configs = dict()
res_avail = resource_parser(resources) res_avail = resource_parser(args.resource)
for res in res_avail: for res in res_avail:
sc = None sc = None
for connection in res_avail[res]: for connection in res_avail[res]:
print("connection="+str(connection))
if sc: if sc:
sc.add_connection( sc.add_connection(
connection.sock_type, connection.sock_type,
...@@ -105,13 +110,25 @@ class Resources: ...@@ -105,13 +110,25 @@ class Resources:
) )
self.sock_configs[res] = sc self.sock_configs[res] = sc
def get_socket(self, name): def get_socket(self, *args):
if name in self.sock_configs: ret = list()
if not self.sock_configs[name].socket: for arg in args:
self.sock_configs[name].connect() if type(arg) == tuple:
return self.sock_configs[name].socket name = arg[0]
else:
name = arg
if name in self.sock_configs:
if type(arg) == tuple and not self.sock_configs[name].isType(arg[1]):
raise SockConfigError("Socket type does not match required value!")
if not self.sock_configs[name].socket:
self.sock_configs[name].connect()
ret.append(self.sock_configs[name].socket)
else:
raise SockConfigError("Resource {arg} not provided.".format(arg=name))
if len(ret) == 1:
return ret[0]
else: else:
raise SockConfigError("Resource not provided: " + name) return ret
class SockConfig: class SockConfig:
...@@ -143,11 +160,10 @@ class SockConfig: ...@@ -143,11 +160,10 @@ class SockConfig:
] ]
def __init__(self, context, socktype, direction, addr, port): def __init__(self, context, socktype, direction, addr, port):
""" Initilizes ZMQ Context, Socket and its first connection. List """ Adds socket configuruation. List
of all connection is stored for further checking of duplicate of all connection is stored for further checking of duplicate
connections. connections.
""" """
print("addr="+str(addr)+", port="+str(port))
self.check_params_validity(socktype, direction, addr, port) self.check_params_validity(socktype, direction, addr, port)
zmq_connection = self.ZMQConnection(addr, port) zmq_connection = self.ZMQConnection(addr, port)
...@@ -198,6 +214,8 @@ class SockConfig: ...@@ -198,6 +214,8 @@ class SockConfig:
if int(port) < 1 or int(port) > 65535: if int(port) < 1 or int(port) > 65535:
raise SockConfigError("Port number out of range", port) raise SockConfigError("Port number out of range", port)
def isType(self, socktype):
return (socktype in SockConfig.SOCKET_TYPE_MAP and self.socktype == SockConfig.SOCKET_TYPE_MAP[socktype])
def connect(self): def connect(self):
if not self.socket: if not self.socket:
...@@ -209,5 +227,7 @@ class SockConfig: ...@@ -209,5 +227,7 @@ class SockConfig:
self.socket.bind(zmq_connection.connection) self.socket.bind(zmq_connection.connection)
elif self.direction == "connect": elif self.direction == "connect":
self.socket.connect(zmq_connection.connection) self.socket.connect(zmq_connection.connection)
else:
raise SockConfigError("Wrong socket direction")
else: else:
raise SockConfigError("Socket already connected") raise SockConfigError("Socket already connected")
...@@ -7,14 +7,15 @@ import time ...@@ -7,14 +7,15 @@ import time
from random import randint from random import randint
aparser = sn.get_arg_parser()
args = sn.parse(aparser)
print(args)
ctx = zmq.Context.instance() ctx = zmq.Context.instance()
sctx = sn.Resources(ctx, args) sctx = sn.Resources(ctx)
sock_cli = sctx.get_socket("sock_cli") # Resources are passed using internal argument parser:
# Socket "sock_cli" is enforced to be "REQ" type:
sock_cli = sctx.get_socket(("sock_cli","REQ"))
# Some work:
rand_ID = randint(10,99) rand_ID = randint(10,99)
print("client ID (randomly generated)="+str(rand_ID)) print("client ID (randomly generated)="+str(rand_ID))
...@@ -22,12 +23,8 @@ for request in range(1, 4): ...@@ -22,12 +23,8 @@ for request in range(1, 4):
message = randint(100, 999) message = randint(100, 999)
print("(Client " + str(rand_ID) + "): Sending request["+str(message)+"]") print("(Client " + str(rand_ID) + "): Sending request["+str(message)+"]")
#sock_cli.send(str(rand_ID) + ":" + str(message))
sock_cli.send_multipart(sn.encode_msg("sn/test", str(rand_ID) + ":" + str(message))) sock_cli.send_multipart(sn.encode_msg("sn/test", str(rand_ID) + ":" + str(message)))
msg_type, message = sn.parse_msg(sock_cli.recv_multipart()) msg_type, message = sn.parse_msg(sock_cli.recv_multipart())
#message = sock_cli.recv()
message = message.split(":") message = message.split(":")
print("(Client " + str(rand_ID) + "): Received reply[" + message[1] + "] from server " + message[0]) print("(Client " + str(rand_ID) + "): Received reply[" + message[1] + "] from server " + message[0])
......
...@@ -3,10 +3,17 @@ ...@@ -3,10 +3,17 @@
import sys import sys
import sn import sn
import time import time
import zmq
from random import randint from random import randint
(sock_cli, sock_cli2) = sn.socket_builder(("sock_cli", "sock_cli2"), sys.argv[1:]) # Custom argument parser is created here
aparser = sn.get_arg_parser()
args = sn.parse(aparser)
ctx = zmq.Context.instance()
sctx = sn.Resources(ctx, args)
sock_cli, sock_cli2 = sctx.get_socket("sock_cli", ("sock_cli2", "REQ"))
rand_ID = randint(10,99) rand_ID = randint(10,99)
print("client ID (randomly generated)="+str(rand_ID)) print("client ID (randomly generated)="+str(rand_ID))
...@@ -15,12 +22,8 @@ for request in range(1, 4): ...@@ -15,12 +22,8 @@ for request in range(1, 4):
message = randint(100, 999) message = randint(100, 999)
print("(Client " + str(rand_ID) + "): Sending request["+str(message)+"]") print("(Client " + str(rand_ID) + "): Sending request["+str(message)+"]")
#sock_cli.send(str(rand_ID) + ":" + str(message))
sock_cli.send_multipart(sn.encode_msg("sn/test", str(rand_ID) + ":" + str(message))) sock_cli.send_multipart(sn.encode_msg("sn/test", str(rand_ID) + ":" + str(message)))
msg_type, message = sn.parse_msg(sock_cli.recv_multipart()) msg_type, message = sn.parse_msg(sock_cli.recv_multipart())
#message = sock_cli.recv()
message = message.split(":") message = message.split(":")
print("(Client " + str(rand_ID) + "): Received reply[" + message[1] + "] from server " + message[0]) print("(Client " + str(rand_ID) + "): Received reply[" + message[1] + "] from server " + message[0])
...@@ -28,14 +31,9 @@ for request in range(1, 4): ...@@ -28,14 +31,9 @@ for request in range(1, 4):
message = randint(100, 999) message = randint(100, 999)
print("(Client " + str(rand_ID) + "): Sending request["+str(message)+"]") print("(Client " + str(rand_ID) + "): Sending request["+str(message)+"]")
#sock_cli.send(str(rand_ID) + ":" + str(message))
sock_cli2.send_multipart(sn.encode_msg("sn/test", str(rand_ID) + ":" + str(message))) sock_cli2.send_multipart(sn.encode_msg("sn/test", str(rand_ID) + ":" + str(message)))
msg_type, message = sn.parse_msg(sock_cli2.recv_multipart()) msg_type, message = sn.parse_msg(sock_cli2.recv_multipart())
#message = sock_cli.recv()
message = message.split(":") message = message.split(":")
print("(Client " + str(rand_ID) + "): Received reply[" + message[1] + "] from server " + message[0]) print("(Client " + str(rand_ID) + "): Received reply[" + message[1] + "] from server " + message[0])
time.sleep(1) time.sleep(1)
...@@ -2,9 +2,11 @@ ...@@ -2,9 +2,11 @@
xterm -geometry 93x20+100+200 -hold -e ./client.py \ xterm -geometry 93x20+100+200 -hold -e ./client.py \
--resource "sock_cli,connect,REQ,127.0.0.1,9000" \ --resource "sock_cli,connect,REQ,127.0.0.1,9000" \
--resource "sock_cli,connect,REQ,127.0.0.1,9001" & --resource "sock_cli,connect,REQ,127.0.0.1,9001" &
#xterm -geometry 93x20+100+500 -hold -e ./client2.py "sock_cli,connect,REQ,127.0.0.1,9000" "sock_cli2,connect,REQ,127.0.0.1,9000" & xterm -geometry 93x20+100+500 -hold -e ./client2.py \
--resource "sock_cli,connect,REQ,127.0.0.1,9000" \
--resource "sock_cli2,connect,REQ,127.0.0.1,9000" &
xterm -geometry 93x20+100+800 -hold -e ./client.py \ xterm -geometry 93x20+100+800 -hold -e ./client.py \
--resource "sock_cli,connect,REQ,127.0.0.1,9000" & --resource "sock_cli,connect,REQ,127.0.0.1,9001" &
xterm -geometry 93x31+650+200 -hold -e ./server.py \ xterm -geometry 93x31+650+200 -hold -e ./server.py \
......
...@@ -7,21 +7,18 @@ from random import randint ...@@ -7,21 +7,18 @@ from random import randint
import sn import sn
aparser = sn.get_arg_parser()
args = sn.parse(aparser)
print(args)
ctx = zmq.Context.instance() ctx = zmq.Context.instance()
sctx = sn.Resources(ctx, args) # Resources are passed using internal argument parser:
sctx = sn.Resources(ctx)
sock_srv = sctx.get_socket("sock_srv") sock_srv = sctx.get_socket("sock_srv")
# Some work:
rand_ID = randint(10,99) rand_ID = randint(10,99)
print("server ID (randomly generated)="+str(rand_ID)) print("server ID (randomly generated)="+str(rand_ID))
while True: while True:
# Wait for next request from client # Wait for next request from client
msg_type, message = sn.parse_msg(sock_srv.recv_multipart()) msg_type, message = sn.parse_msg(sock_srv.recv_multipart())
#message = sock_srv.recv()
message = message.split(":") message = message.split(":")
print("(Server " + str(rand_ID) + "): Received request[" + message[1] print("(Server " + str(rand_ID) + "): Received request[" + message[1]
+ "] from client " + message[0]) + "] from client " + message[0])
...@@ -29,4 +26,3 @@ while True: ...@@ -29,4 +26,3 @@ while True:
sock_srv.send_multipart(sn.encode_msg("sn/test", str(rand_ID) + ":" sock_srv.send_multipart(sn.encode_msg("sn/test", str(rand_ID) + ":"
+ str(message[1]))) + str(message[1])))
#sock_srv.send(str(rand_ID) + ":" + str(message[1]))
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment