Commit 1ff02d03 authored by Robin Obůrka's avatar Robin Obůrka

network: Improve IP/address check

parent 7b8def9a
import re import re
import socket
import zmq import zmq
...@@ -8,7 +9,7 @@ from .exceptions import * ...@@ -8,7 +9,7 @@ from .exceptions import *
class Resource: class Resource:
NAME = re.compile("[a-z0-9_-]+") NAME = re.compile("[a-z0-9_-]+")
ADDRESS = re.compile("[a-z0-9_-]+") #TODO Use something better SIMPLE_ADDRESS = re.compile("[a-z0-9_-]+")
DIRECTIONS = [ DIRECTIONS = [
"connect", "connect",
"bind" "bind"
...@@ -35,7 +36,7 @@ class Resource: ...@@ -35,7 +36,7 @@ class Resource:
if sock_type not in Resource.SOCK_TYPES: if sock_type not in Resource.SOCK_TYPES:
raise SockConfigError("Inadmissible or empty socket type") raise SockConfigError("Inadmissible or empty socket type")
if not Resource.ADDRESS.match(address) and address != "*": if not self.check_address(address):
raise SockConfigError("Inadmissible characters in resource address") raise SockConfigError("Inadmissible characters in resource address")
try: try:
...@@ -57,6 +58,28 @@ class Resource: ...@@ -57,6 +58,28 @@ class Resource:
self.port = port_number self.port = port_number
def check_address(self, address):
if address == "*":
return True
if Resource.SIMPLE_ADDRESS.match(address):
return True
try:
if socket.inet_pton(socket.AF_INET, address):
return True
except OSError:
pass
try:
if socket.inet_pton(socket.AF_INET6, address.strip("[]")):
return True
except OSError:
pass
return False
def get_connection_string(self): def get_connection_string(self):
return "tcp://{}:{}".format(self.address, self.port) return "tcp://{}:{}".format(self.address, self.port)
......
...@@ -51,6 +51,7 @@ def bad_resources_mock(request): ...@@ -51,6 +51,7 @@ def bad_resources_mock(request):
@pytest.fixture(params=[ @pytest.fixture(params=[
"--resource res,connect,PUSH,127.0.0.1,8800", "--resource res,connect,PUSH,127.0.0.1,8800",
"--resource res,connect,PUSH,setinel.turris.cz,8800", "--resource res,connect,PUSH,setinel.turris.cz,8800",
"--resource res,connect,PUSH,[::1],8800",
]) ])
def connect_resources_mock(request): def connect_resources_mock(request):
with patch("sys.argv", args_from_string(request.param)) as m: with patch("sys.argv", args_from_string(request.param)) as m:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment