Commit 6defebb9 authored by Daniel Salzman's avatar Daniel Salzman

func-test: initial commit

parent 9ab7f4c8
$ORIGIN .
$TTL 3601 ; 1 hour
example.com IN SOA dns1.example.com. hostmaster.example.com. (
2010111217 ; serial
21600 ; refresh (6 hours)
3600 ; retry (1 hour)
604800 ; expire (1 week)
86400 ; minimum (1 day)
)
NS dns1.example.com.
NS dns2.example.com.
MX 10 mail.example.com.
$ORIGIN example.com.
12345 A 1.2.3.5
dns1 A 192.0.2.1
AAAA 2001:db8::1
dns2 A 192.0.2.2
AAAA 2001:db8::2
mail A 192.0.2.3
AAAA 2001:db8::3
$ORIGIN example2.com.
$TTL 3600
@ SOA dns1.example2.com. hostmaster.example2.com. (
2010111213 ; serial
6h ; refresh
1h ; retry
1w ; expire
1d ) ; minimum
NS dns1
NS dns2
MX 10 mail
dns1 A 192.0.2.1
AAAA 2001:DB8::1
dns2 A 192.0.2.2
AAAA 2001:DB8::2
mail A 192.0.2.3
AAAA 2001:DB8::3
#!/usr/bin/env python3
import os, sys
test_dir = os.path.dirname(os.path.realpath(__file__))
sys.path.append(test_dir + "/../../tools")
from dnstest import *
################################################################################
t = DnsTest(test_dir, sys.argv[1])
master1 = t.server("knot", nsid="nsid", ident=True, version="Knot XXX")
master2 = t.server("knot", ident="ahoj")
slave = t.server("knot", ident="0xabcd")
#z1 = t.zone_rnd(10)
z1 = t.zone("example.com.", "example.com.zone")
z2 = t.zone("example2.com.", "example2.com.zone")
t.link(z1, master1, slave, ddns=True)
t.link(z2, master2, slave)
t.start()
t.stop()
'''
t.link(z2, master, slave2)
t.link(z2, master, slave1)
t.dig()
t.stop()
t.end()
'''
#!/usr/bin/env python3
import os, sys, tempfile, time
import subprocess
def run_test(test_file, out_dir):
return subprocess.call([test_file, out_dir])
test_cnt = 0
fail_cnt = 0
tests_dir = "./cases"
outs_dir = tempfile.mkdtemp(prefix="knottest-%s-" % int(time.time()))
print("Starting Knot test suite %s" % outs_dir)
for test in sorted(os.listdir(tests_dir)):
test_dir = tests_dir + "/" + test
if not os.path.isdir(test_dir):
continue
test_file = test_dir + "/test.py"
if not os.path.isfile(test_file):
print("Missing test file %s" % test_file)
continue
try:
out_dir = outs_dir + "/" + test
os.mkdir(out_dir)
print("Test %s: " % test, end="")
test_cnt = test_cnt + 1
if run_test(test_file, out_dir):
print("failed")
fail_cnt = fail_cnt + 1
else:
print("ok")
except (OSError, Exception) as err:
print(format(err))
exit(1)
except:
print("Unexpected error:", sys.exc_info()[0])
if fail_cnt:
print("Failed tests: %i/%i" % (fail_cnt, test_cnt))
exit(1)
else:
print("All tests passed")
exit(0)
#!/usr/bin/env python3
import base64
import os
import random
import shutil
import socket
import string
import sys
import time
from subprocess import Popen, PIPE
knot_vars = [
["KNOT_TEST_KNOT", "knotd"],
["KNOT_TEST_KNOTC", "knotc"]
]
bind_vars = [
["KNOT_TEST_BIND", "named"],
["KNOT_TEST_BINDC", "rndc"]
]
nsd_vars = [
["KNOT_TEST_NSD", "nsd"],
["KNOT_TEST_NSDC", "nsdc"]
]
class Tsig(object):
'''TSIG key generator'''
algs = {
"hmac-md5": 16,
"hmac-sha1": 20,
"hmac-sha224": 28,
"hmac-sha256": 32,
"hmac-sha384": 48,
"hmac-sha512": 64
}
vocabulary = string.ascii_uppercase + string.ascii_lowercase + \
string.digits
def __init__(self):
nlabels = random.randint(1, 10)
self.name = ""
for i in range(nlabels):
label_len = random.randint(1, 63)
# Check for maximal dname length (255 B = max fqdn in wire).
if len(self.name) + 1 + label_len >= 255:
break
# Add label separator.
if i > 0:
self.name += "."
self.name += "".join(random.choice(Tsig.vocabulary)
for x in range(label_len))
self.alg = random.choice(list(Tsig.algs.keys()))
self.key = base64.b64encode(os.urandom(Tsig.algs[self.alg])).decode('ascii')
self.tsig = self.alg + ":" + self.name + ":" + self.key
class KnotConf(object):
'''Knot server config generator'''
def __init__(self):
self.conf = ""
self.indent = ""
def sub(self):
self.indent += "\t"
def unsub(self):
self.indent = self.indent[:-1]
def begin(self, name):
self.conf += "%s%s {\n" % (self.indent, name)
self.sub()
def end(self):
self.unsub()
self.conf += "%s}\n" % (self.indent)
if not self.indent:
self.conf += "\n"
def item(self, name, value):
self.conf += "%s%s %s;\n" % (self.indent, name, value)
def item_str(self, name, value):
self.conf += "%s%s \"%s\";\n" % (self.indent, name, value)
class BindConf(object):
'''Bind server config generator'''
def __init__(self):
self.conf = ""
self.indent = ""
def sub(self):
self.indent += "\t"
def unsub(self):
self.indent = self.indent[:-1]
def begin(self, name, string=None):
if string:
self.conf += "%s%s \"%s\" {\n" % (self.indent, name, string)
else:
self.conf += "%s%s {\n" % (self.indent, name)
self.sub()
def end(self):
self.unsub()
self.conf += "%s};\n" % (self.indent)
if not self.indent:
self.conf += "\n"
def item(self, name, value=None):
if value:
self.conf += "%s%s %s;\n" % (self.indent, name, value)
else:
self.conf += "%s%s;\n" % (self.indent, name)
def item_str(self, name, value):
self.conf += "%s%s \"%s\";\n" % (self.indent, name, value)
class Zone(object):
''' DNS zone description'''
def __init__(self, name, filename, ddns=None):
self.name = name
self.filename = filename
self.master = None
self.slaves = set()
# ddns: True - ddns, False - ixfrFromDiff, None - empty
self.ddns = ddns
class DnsServer(object):
'''Specification of DNS server'''
START_WAIT = 1
STOP_TIMEOUT = 10
COMPILE_TIMEOUT = 60
# Instance counter.
count = 0
def __init__(self):
self.start_params = None
self.compile_params = None
self.run_prefix = None
self.nsid = None
self.ident = None
self.version = None
self.ip = None
self.addr = None
self.port = None
self.ctlport = None
self.tsig = None
self.zones = dict()
# Working directory.
self.dir = None
# Name of server instance.
self.name = None
self.fout = None
self.ferr = None
self.conffile = None
def zone_master(self, name, file, slave=None, ddns=False):
if name in self.zones:
self.zones[name].slaves.add(slave)
else:
z = Zone(name, file, ddns)
z.slaves.add(slave)
self.zones[name] = z
def zone_slave(self, name, file, master):
if name in self.zones:
raise Exception("Can't set zone %s as a slave" % name)
else:
slave_file = self.dir + "/" + name + "slave"
z = Zone(name, slave_file, ddns=None)
z.master = master
self.zones[name] = z
def find_binary(self, desc):
explicit = os.environ.get(desc[0])
if explicit:
path = shutil.which(explicit)
else:
path = shutil.which(desc[1])
if not path:
raise Exception("No binary found")
return path
def set_paths(self, vars):
self.daemon_bin = self.find_binary(vars[0])
self.control_bin = self.find_binary(vars[1])
def compile(self):
try:
p = Popen([self.control_bin] + self.compile_params,
stdout=self.fout_, stderr=self.ferr)
p.communicate(timeout=DnsServer.COMPILE_TIMEOUT)
except:
print("Compile error")
def start(self):
try:
self.fout = open(self.fout, mode="w")
self.ferr = open(self.ferr, mode="w")
if self.compile_params:
self.compile()
self.proc = Popen(self.run_prefix + [self.daemon_bin] + self.start_params,
stdout=self.fout, stderr=self.ferr)
time.sleep(DnsServer.START_WAIT)
except OSError:
print("fout error")
def stop(self):
try:
self.proc.terminate()
self.proc.wait(DnsServer.STOP_TIMEOUT)
except TimeoutError:
print("killing")
self.proc.kill()
self.fout.close()
self.ferr.close()
def gen_confile(self):
f = open(self.confile, "w")
f.write(self.get_config())
f.close
class Bind(DnsServer):
def __init__(self):
super().__init__()
super().set_paths(bind_vars)
def get_config(self):
s = BindConf()
s.begin("options")
s.item_str("directory", self.dir)
s.item_str("pid-file", "bind.pid")
if self.ip == 4:
s.item("listen-on port", "%i { %s; }" % (self.port, self.addr))
else:
s.item("listen-on-v6 port", "%i { %s; }" % (self.port, self.addr))
s.item("auth-nxdomain", "no")
s.item("recursion", "no")
s.end()
for zone in self.zones:
z = self.zones[zone]
s.begin("zone", z.name)
s.item_str("file", z.filename)
if z.master:
s.item("type", "slave")
else:
s.item("type", "master")
s.item("notify", "explicit")
if z.ddns == True:
pass
elif z.ddns == False:
s.item("ixfr-from-differences", "yes")
s.end()
self.start_params = ["-c", self.confile, "-g"]
return s.conf
class Knot(DnsServer):
def __init__(self):
super().__init__()
super().set_paths(knot_vars)
def _on_str_hex(self, conf, name, value):
if value == True:
conf.item(name, "on")
elif value:
if value[:2] == "0x":
conf.item(name, value)
else:
conf.item_str(name, value)
def get_config(self):
s = KnotConf()
s.begin("system")
self._on_str_hex(s, "identity", self.ident)
self._on_str_hex(s, "version", self.version)
self._on_str_hex(s, "nsid", self.nsid)
s.item_str("storage", self.dir)
s.item_str("rundir", self.dir)
s.end()
s.begin("control")
s.item_str("listen-on", "knot.sock")
s.end()
s.begin("interfaces")
if self.ip == 4:
s.begin("ipv4")
else:
s.begin("ipv6")
s.item("address", self.addr)
s.item("port", self.port)
s.end()
s.end()
s.begin("keys")
if self.tsig:
t = self.tsig
s.item_str("%s %s" % (t.name, t.alg), t.key)
# Duplicy check.
keys = set()
for zone in self.zones:
z = self.zones[zone]
if z.master and z.master.tsig and z.master.tsig.name not in keys:
t = z.master.tsig
s.item_str("%s %s" % (t.name, t.alg), t.key)
keys.add(t.name)
for slave in z.slaves:
if slave.tsig and slave.tsig.name not in keys:
t = slave.tsig
s.item_str("%s %s" % (t.name, t.alg), t.key)
keys.add(t.name)
s.end()
s.begin("remotes")
s.begin("local")
s.item("address", self.addr)
if self.tsig:
s.item("key", self.tsig.name)
s.end()
# Duplicity check.
servers = set()
for zone in self.zones:
z = self.zones[zone]
if z.master and z.master.name not in servers:
s.begin(z.master.name)
s.item("address", z.master.addr)
s.item("port", z.master.port)
if z.master.tsig:
s.item("key", z.master.tsig.name)
s.end()
servers.add(z.master.name)
for slave in z.slaves:
if slave.name not in servers:
s.begin(slave.name)
s.item("address", slave.addr)
s.item("port", slave.port)
if slave.tsig:
s.item("key", slave.tsig.name)
s.end()
servers.add(slave.name)
s.end()
s.begin("zones")
s.item("zonefile-sync", "5s")
s.item("notify-timeout", "5")
s.item("notify-retries", "5")
for zone in self.zones:
z = self.zones[zone]
s.begin(z.name)
s.item_str("file", z.filename)
if z.master:
s.item("notify-in", z.master.name)
s.item("xfr-in", z.master.name)
slaves = ""
if z.slaves:
for slave in z.slaves:
slaves += slave.name + " "
s.item("notify-out", slaves.strip())
s.item("xfr-out", "local")
if z.ddns == True:
s.item("update-in", "local")
elif z.ddns == False:
s.item("ixfr-from-differences", "on")
s.end()
s.end()
s.begin("log")
s.begin("stdout")
s.item("any", "all")
s.end()
s.begin("stderr")
s.end()
s.begin("syslog")
s.end()
s.end()
self.start_params = ["-c", self.confile]
return s.conf
class Nsd(DnsServer):
def __init__(self):
super().__init__()
super().set_paths(nsd_vars)
def get_config(self):
self.start_params = ["-c", self.confile, "-d"]
self.compile_params = ["-c", self.confile, "rebuild"]
def check_socket(ip, proto, addr, port, pid):
iface = "%i%s@%s:%i" % (ip, proto, addr, port)
proc = Popen(["lsof", "-t", "-i", iface],
stdout=PIPE, stderr=PIPE, universal_newlines=True)
(out, err) = proc.communicate()
# Create list of pids excluding last empty line.
pids = list(filter(None, out.split("\n")))
# Check for successful bind.
if str(pid) not in pids:
print("x")
return False
# More binded processes is not acceptable too.
if len(pids) > 1:
return False
return True
#print(check_socket(4, "tcp", "127.0.0.1", 5354, 7930))
class DnsTest(object):
'''Specification of DNS test topology'''
LOCAL_ADDR = {4: "127.0.0.1", 6: "::1"}
# Value of the last generated port.
last_port = None
def __init__(self, test_dir, out_dir, ip=None, tsig=None):
if not os.path.exists(out_dir):
raise Exception("Output directory doesn't exist")
self.out_dir = str(out_dir)
self.data_dir = str(test_dir) + "/data/"
self.zones_dir = self.out_dir + "/zones/"
try:
os.mkdir(self.zones_dir)
except:
raise Exception("Can't create directory %s" % self.zones_dir)
self.ip = ip if ip else random.choice([4, 6])
if self.ip not in [4, 6]:
raise Exception("Invalid IP version")
self.tsig = bool(tsig) if tsig else random.choice([True, False])
self.servers = set()
def _check_port(self, port):
if not port:
return False
proto = socket.AF_INET if self.ip == 4 else socket.AF_INET6
try:
s = socket.socket(proto, socket.SOCK_DGRAM)
s.bind((DnsTest.LOCAL_ADDR[self.ip], port))
s.close
s = socket.socket(proto, socket.SOCK_STREAM)
s.bind((DnsTest.LOCAL_ADDR[self.ip], port))
s.close
except:
return False
return True
def _gen_port(self):
min_port = 10000
max_port = 50000
port = DnsTest.last_port
if port:
port = port + 1 if port < max_port else min_port
while not self._check_port(port):
port = random.randint(min_port, max_port)
DnsTest.last_port = port
return port
def server(self, server, nsid=None, ident=None, version=None, prefix=None):
if server == "knot":
srv = Knot()
elif server == "bind":
srv = Bind()
elif server == "nsd":
srv = Nsd()
else:
raise Exception("Usupported server %s" % server)
type(srv).count += 1
if prefix:
srv.run_prefix = prefix
elif server == "knot":
srv.run_prefix = ["valgrind", "--leak-check=full"]
else:
srv.run_prefix = []
srv.nsid = nsid
srv.ident = ident
srv.version = version
srv.ip = self.ip
srv.addr = DnsTest.LOCAL_ADDR[self.ip]
srv.port = self._gen_port()
srv.ctlport = self._gen_port()
srv.tsig = Tsig() if self.tsig else None
srv.name = "%s%s" % (server, srv.count)
srv.dir = self.out_dir + "/" + srv.name
srv.fout = srv.dir + "/stdout"
srv.ferr = srv.dir + "/stderr"
srv.confile = srv.dir + "/%s.conf" % srv.name
try:
os.mkdir(srv.dir)
except:
raise Exception("Can't create directory %s" % srv.dir)
self.servers.add(srv)
return srv
def _generate_conf(self):
for server in self.servers:
server.gen_confile()
def start(self):
self._generate_conf()
def srv_sort(server):
masters = 0
for z in server.zones:
if server.zones[z].master: masters += 1
return masters
# Sort server list by number of masters. I.e. masters are prefered.
for server in sorted(self.servers, key=srv_sort):
server.start()
time.sleep(5)
def stop(self):
for server in self.servers:
server.stop()
def end(self):
pass
def zone(self, name, filename):
try:
src_file = self.data_dir + filename
dst_file = self.zones_dir + filename
shutil.copyfile(src_file, dst_file)
except:
raise Exception("Can't use zone file %s" % filename)
zone_name = name
if zone_name[-1] != ".":
zone_name += "."