Commit 808d0f3d authored by Petr Špaček's avatar Petr Špaček

Deckard: PEP8 whitespace fixes

Cheap re-indentation using python-autopep8-1.2.1-3.fc25 with few manual
tweaks for very long lines.

This costs nothing and will avoid PEP8 complaints about whitespace in CI.
parent ed51c163
This diff is collapsed.
......@@ -5,6 +5,7 @@ import threading
dprint_lock = threading.Lock()
def dprint(tag, msg):
""" Verbose logging (if enabled). """
if 'VERBOSE' in os.environ:
......
This diff is collapsed.
......@@ -3,6 +3,7 @@ import os
import traceback
import time
class Test:
""" Small library to imitate CMocka output. """
......
......@@ -13,7 +13,8 @@ import struct
import binascii
from pydnstest.dprint import dprint
def recvfrom_msg(stream, raw = False):
def recvfrom_msg(stream, raw=False):
"""
Receive DNS message from TCP/UDP socket.
......@@ -27,7 +28,7 @@ def recvfrom_msg(stream, raw = False):
data = stream.recv(2)
if len(data) == 0:
return None, None
msg_len = struct.unpack_from("!H",data)[0]
msg_len = struct.unpack_from("!H", data)[0]
data = b""
received = 0
while received < msg_len:
......@@ -53,13 +54,14 @@ def sendto_msg(stream, message, addr=None):
else:
stream.sendto(message, addr)
elif stream.type & socket.SOCK_STREAM:
data = struct.pack("!H",len(message)) + message
data = struct.pack("!H", len(message)) + message
stream.send(data)
else:
assert False, "[sendto_msg]: unknown socket type '%i'" % stream.type
except: # Failure to respond is OK, resolver should recover
pass
def get_local_addr_str(family, iface):
""" Returns pattern string for localhost address """
if family == socket.AF_INET:
......@@ -70,13 +72,16 @@ def get_local_addr_str(family, iface):
raise NotImplementedError("[get_local_addr_str] family not supported '%i'" % family)
return addr_local_pattern.format(iface)
class AddrMapInfo:
""" Saves mapping info between adresses from rpl and cwrap adresses """
def __init__(self, family, local, external):
self.family = family
self.local = local
self.external = external
class TestServer:
""" This simulates UDP DNS server returning scripted or mirror DNS responses. """
......@@ -102,7 +107,7 @@ class TestServer:
if self.active is True:
self.stop()
def start(self, port = 53):
def start(self, port=53):
""" Synchronous start """
if self.active is True:
raise Exception('TestServer already started')
......@@ -126,7 +131,7 @@ class TestServer:
self.connections = []
self.scenario = None
def check_family (self, addr, family):
def check_family(self, addr, family):
""" Determines if address matches family """
test_addr = None
try:
......@@ -148,10 +153,10 @@ class TestServer:
if k == 'stub-addr':
kroot_addr = v
if kroot_addr is not None:
if self.check_family (kroot_addr, socket.AF_INET):
if self.check_family(kroot_addr, socket.AF_INET):
self.addr_family = socket.AF_INET
self.kroot_local = kroot_addr
elif self.check_family (kroot_addr, socket.AF_INET6):
elif self.check_family(kroot_addr, socket.AF_INET6):
self.addr_family = socket.AF_INET6
self.kroot_local = kroot_addr
else:
......@@ -160,10 +165,10 @@ class TestServer:
def address(self):
""" Returns opened sockets list """
addrlist = [];
addrlist = []
for s in self.srv_socks:
addrlist.append(s.getsockname());
return addrlist;
addrlist.append(s.getsockname())
return addrlist
def handle_query(self, client):
"""
......@@ -177,24 +182,23 @@ class TestServer:
query, addr = recvfrom_msg(client)
if query is None:
return False
dprint ("[ handle_query ]", "%s incoming query from %s\n%s" % (client_address, addr, query))
dprint("[ handle_query ]", "%s incoming query from %s\n%s" % (client_address, addr, query))
response = dns.message.make_response(query)
is_raw_data = False
if self.scenario is not None:
response, is_raw_data = self.scenario.reply(query, client_address)
if response:
if is_raw_data is False:
data_to_wire = response.to_wire(max_size = 65535)
dprint ("[ handle_query ]", "response\n%s" % response)
data_to_wire = response.to_wire(max_size=65535)
dprint("[ handle_query ]", "response\n%s" % response)
else:
data_to_wire = response
dprint ("[ handle_query ]", "raw response found")
dprint("[ handle_query ]", "raw response found")
else:
response = dns.message.make_response(query)
response.set_rcode(dns.rcode.SERVFAIL)
data_to_wire = response.to_wire()
dprint ("[ handle_query ]", "response failed, SERVFAIL")
dprint("[ handle_query ]", "response failed, SERVFAIL")
sendto_msg(client, data_to_wire, addr)
return True
......@@ -218,11 +222,12 @@ class TestServer:
sock.close()
self.connections.remove(sock)
else:
raise Exception("[query_io] Socket IO internal error {}, exit".format(sock.getsockname()))
raise Exception(
"[query_io] Socket IO internal error {}, exit".format(sock.getsockname()))
for sock in to_error:
raise Exception("[query_io] Socket IO error {}, exit".format(sock.getsockname()))
def start_srv(self, address = None, family = socket.AF_INET, proto = socket.IPPROTO_UDP):
def start_srv(self, address=None, family=socket.AF_INET, proto=socket.IPPROTO_UDP):
""" Starts listening thread if necessary """
if family == None:
family = socket.AF_INET
......@@ -236,7 +241,7 @@ class TestServer:
if address[0] is None:
address = (get_local_addr_str(family, self.default_iface), 53)
else:
raise Exception("[start_srv] unsupported protocol family {family}".format(family=family))
raise NotImplementedError("[start_srv] unsupported protocol family {0}".format(family))
if proto == None:
proto = socket.IPPROTO_UDP
......@@ -252,7 +257,9 @@ class TestServer:
self.thread.start()
for srv_sock in self.srv_socks:
if srv_sock.family == family and srv_sock.getsockname() == address and srv_sock.proto == proto:
if (srv_sock.family == family
and srv_sock.getsockname() == address
and srv_sock.proto == proto):
return srv_sock.getsockname()
sock = socket.socket(family, socktype, proto)
......@@ -278,11 +285,11 @@ if __name__ == '__main__':
CHILD_IFACE = 0
if "SOCKET_WRAPPER_DEFAULT_IFACE" in os.environ:
DEFAULT_IFACE = int(os.environ["SOCKET_WRAPPER_DEFAULT_IFACE"])
if DEFAULT_IFACE < 2 or DEFAULT_IFACE > 254 :
if DEFAULT_IFACE < 2 or DEFAULT_IFACE > 254:
DEFAULT_IFACE = 10
os.environ["SOCKET_WRAPPER_DEFAULT_IFACE"]="{}".format(DEFAULT_IFACE)
os.environ["SOCKET_WRAPPER_DEFAULT_IFACE"] = "{}".format(DEFAULT_IFACE)
# Mirror server
server = TestServer(None,None,DEFAULT_IFACE)
server = TestServer(None, None, DEFAULT_IFACE)
server.start()
print("[==========] Mirror server running at", server.address())
try:
......
......@@ -5,25 +5,26 @@ from distutils.core import setup
version = '0.1.2'
kwargs = {
'name' : 'pydnstest',
'version' : version,
'description' : 'DNS toolkit',
'long_description' : \
"""pydnstest is a DNS software testing library. It supports parsing and running Unbound-like test scenarios,
'name': 'pydnstest',
'version': version,
'description': 'DNS toolkit',
'long_description':
"""pydnstest is a DNS software testing library.
It supports parsing and running Unbound-like test scenarios,
and setting up a mock DNS server. It's based on dnspython.""",
'author' : 'Marek Vavrusa',
'author_email' : 'marek@vavrusa.com',
'license' : 'BSD',
'url' : 'https://github.com/CZ-NIC/deckard',
'packages' : ['pydnstest'],
'classifiers' : [
'author': 'Marek Vavrusa',
'author_email': 'marek@vavrusa.com',
'license': 'BSD',
'url': 'https://github.com/CZ-NIC/deckard',
'packages': ['pydnstest'],
'classifiers': [
"Intended Audience :: Developers",
"Intended Audience :: System Administrators",
"Programming Language :: Python",
"Topic :: Internet :: Name Service (DNS)",
"Topic :: Software Development :: Libraries :: Python Modules",
],
}
}
if sys.hexversion >= 0x02050000:
kwargs['requires'] = ['dns']
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment