Commit 9af26c60 authored by Daniel Salzman's avatar Daniel Salzman

func-test: split dnstest to submodules

parent 8770ca93
......@@ -3,7 +3,8 @@
import argparse, importlib, logging, os, re, sys, tempfile, time, traceback
current_dir = os.path.dirname(os.path.realpath(__file__))
sys.path.append(current_dir + "/tools")
import params, dnstest
import dnstest.params as params
import dnstest.utils
TESTS_DIR = "tests"
COMMON_DATA_DIR = "data"
......@@ -137,7 +138,7 @@ def main(args):
try:
importlib.import_module("%s.%s.%s.test" % (TESTS_DIR, test, case))
except dnstest.Skip as exc:
except dnstest.utils.Skip as exc:
log.error(" * case \'%s\':\tSKIPPED (%s)" % (case, format(exc)))
skip_cnt += 1
except Exception as exc:
......
......@@ -2,9 +2,9 @@
'''Test for AXFR from Bind to Knot'''
import dnstest
from dnstest.test import Test
t = dnstest.DnsTest()
t = Test()
master = t.server("bind")
slave = t.server("knot")
......
......@@ -2,9 +2,10 @@
'''Test for Knot clean-up after interruption of AXFR from Bind'''
import dnstest
from dnstest.test import Test
from dnstest.utils import *
t = dnstest.DnsTest()
t = Test()
master = t.server("bind")
slave = t.server("knot")
......@@ -15,7 +16,7 @@ t.link(zones, master, slave)
t.start()
t.sleep(2)
dnstest.check_log("Killing master %s" % master.name)
check_log("Killing master %s" % master.name)
master.proc.kill()
t.sleep(5)
......
......@@ -2,9 +2,9 @@
'''Test for AXFR from Knot to Bind'''
import dnstest
from dnstest.test import Test
t = dnstest.DnsTest()
t = Test()
master = t.server("knot")
slave = t.server("bind")
......
......@@ -2,9 +2,9 @@
'''Test for AXFR from Knot to Knot'''
import dnstest
from dnstest.test import Test
t = dnstest.DnsTest()
t = Test()
master = t.server("knot")
slave = t.server("knot")
......
......@@ -2,9 +2,9 @@
'''Test for header flags in response'''
import dnstest
from dnstest.test import Test
t = dnstest.DnsTest()
t = Test()
knot = t.server("knot")
bind = t.server("bind")
......
......@@ -2,9 +2,9 @@
'''Test for response rcodes'''
import dnstest
from dnstest.test import Test
t = dnstest.DnsTest()
t = Test()
knot = t.server("knot")
bind = t.server("bind")
......
......@@ -2,10 +2,10 @@
'''Test for server identification over CH/TXT'''
import dnstest
from dnstest.test import Test
import socket
t = dnstest.DnsTest()
t = Test()
name = "Knot DNS server name"
server1 = t.server("knot", ident=name)
......
......@@ -2,9 +2,9 @@
'''Test for server version over CH/TXT'''
import dnstest
from dnstest.test import Test
t = dnstest.DnsTest()
t = Test()
ver = "ver. 1.3.1-p3"
server1 = t.server("knot", version=ver)
......
......@@ -2,9 +2,9 @@
'''Test for DDNS forwarding'''
import dnstest
from dnstest.test import Test
t = dnstest.DnsTest()
t = Test()
master = t.server("bind")
slave = t.server("knot")
......
......@@ -2,9 +2,9 @@
'''Test for DDNS prerequisites'''
import dnstest
from dnstest.test import Test
t = dnstest.DnsTest()
t = Test()
srv = t.server("knot")
zone = t.zone("ddns.", "ddns.zone")
......
......@@ -4,17 +4,19 @@
Check if DNSKEY lifetime timestamps are proccessed correctly by Knot.
"""
import dnstest
import dns
import collections
import os
import re
import shutil
import sys
import dns
from dnstest.utils import *
from dnstest.test import Test
import dnstest.server
# patched Knot class, enabling DNSSEC
class DnssecEnabledKnot(dnstest.Knot):
class DnssecEnabledKnot(dnstest.server.Knot):
@property
def keydir(self):
return os.path.join(self.dir, "keys")
......@@ -66,17 +68,17 @@ def check_zone(server, expect_dnskey, expect_rrsig):
expect_dnskeys = 2 if expect_dnskey else 1
expect_rrsigs = 2 if expect_rrsig else 1
dnstest.detail_log("DNSKEYs: %d (expected %d) RRSIGs: %d (expected %d)" % (
detail_log("DNSKEYs: %d (expected %d) RRSIGs: %d (expected %d)" % (
found_dnskeys, expect_dnskeys, found_rrsigs, expect_rrsigs));
if found_dnskeys != expect_dnskeys or found_rrsigs != expect_rrsigs:
dnstest.err("Expectations do not match.")
dnstest.set_err("DNSKEYs not published and activated as expected.")
err("Expectations do not match.")
set_err("DNSKEYs not published and activated as expected.")
# Ugly Monkey patch
dnstest.Knot = DnssecEnabledKnot
dnstest.server.Knot = DnssecEnabledKnot
t = dnstest.DnsTest()
t = Test()
knot = t.server("knot")
zone = t.zone("example.com.", "example.com.zone")
......
......@@ -2,10 +2,10 @@
'''Test for EDNS0/NSID identification'''
import dnstest
from dnstest.test import Test
import socket
t = dnstest.DnsTest()
t = Test()
name = "Knot DNS server"
hex_name = "0x01020304"
......
......@@ -2,9 +2,9 @@
'''Test for EDNS0 UDP payload size'''
import dnstest
from dnstest.test import Test
t = dnstest.DnsTest()
t = Test()
knot = t.server("knot")
bind = t.server("bind")
......
......@@ -2,9 +2,9 @@
'''Test for support of obsolete records over XFR'''
import dnstest
from dnstest.test import Test
t = dnstest.DnsTest()
t = Test()
master = t.server("bind")
slave = t.server("knot")
......
......@@ -2,9 +2,9 @@
'''Test for dropping of out of zone records during reading of zone file'''
import dnstest
from dnstest.test import Test
t = dnstest.DnsTest()
t = Test()
master = t.server("knot")
slave = t.server("bind")
......
......@@ -2,9 +2,9 @@
'''Test for dropping of out of zone records during incoming XFR'''
import dnstest
from dnstest.test import Test
t = dnstest.DnsTest()
t = Test()
master = t.server("bind")
slave = t.server("knot")
......
......@@ -3,9 +3,9 @@
'''DNS packet header parsing tests. '''
import socket
import dnstest
from dnstest.test import Test
t = dnstest.DnsTest()
t = Test()
knot = t.server("knot")
zone = t.zone("example.com.")
t.link(zone, knot)
......
......@@ -3,9 +3,9 @@
'''DNS packet header parsing tests. '''
import shutil
from subprocess import check_call
import dnstest
from params import get_binary
from subprocess import check_call
from dnstest.test import Test
from dnstest.params import get_binary
# Find PROTOS binaries
protos_java_bin = get_binary("PROTOS_JAVA_BIN", "java")
......@@ -14,10 +14,10 @@ protos_zonetransfer_bin = get_binary("PROTOS_ZONETRANSFER_BIN", "c09-dns-zonetra
if not protos_java_bin:
raise Exception("Java not found, skipping test.")
t = dnstest.DnsTest(ip = 4) # PROTOS works on IPv4
t = Test(ip = 4) # PROTOS works on IPv4
master = t.server("dummy")
slave = t.server("knot")
zone = t.zone("protos.invalid.", exists=False)
zone = t.zone("protos.invalid.", exists=False)
t.link(zone, master, slave)
# Update configuration
......
__all__ = [
"keys",
"params",
"response",
"server",
"test",
"update",
"utils"
]
#!/usr/bin/env python3
import base64
import os
import random
import string
import dns.tsigkeyring
import dnstest.server
class Tsig(object):
'''TSIG key generator'''
algs = {
"hmac-md5": 16,
"hmac-sha1": 20,
"hmac-sha224": 28,
"hmac-sha256": 32,
"hmac-sha384": 48,
"hmac-sha512": 64
}
vocabulary = string.ascii_uppercase + string.ascii_lowercase + \
string.digits
def __init__(self, alg=None):
nlabels = random.randint(1, 10)
self.name = ""
for i in range(nlabels):
label_len = random.randint(1, 63)
# Check for maximal dname length (255 B = max fqdn in wire).
# 255 = 1 leading byte + 253 + 1 trailing byte.
if len(self.name) + 1 + label_len > 253:
break
# Add label separator.
if i > 0:
self.name += "."
self.name += "".join(random.choice(Tsig.vocabulary)
for x in range(label_len))
if alg and alg not in Tsig.algs:
raise Exception("Unsupported TSIG algorithm %s" % alg)
self.alg = alg if alg else random.choice(list(Tsig.algs.keys()))
self.key = base64.b64encode(os.urandom(Tsig.algs[self.alg])). \
decode('ascii')
# TSIG preparation for pythondns utils.
if self.alg == "hmac-md5":
alg = "hmac-md5.sig-alg.reg.int"
else:
alg = self.alg
key = dns.tsigkeyring.from_text({
self.name: self.key
})
self.key_params = dict(keyname=self.name, keyalgorithm=alg, keyring=key)
def dump(self, filename):
s = dnstest.server.BindConf()
s.begin("key", self.name)
s.item("algorithm", self.alg)
s.item_str("secret", self.key)
s.end()
file = open(filename, mode="w")
file.write(s.conf)
file.close()
#!/usr/bin/env python3
import binascii
import dns.name
from dnstest.utils import *
class Response(object):
'''Dig output context'''
def __init__(self, server, response, args):
self.resp = response
self.args = args
self.srv = server
self.rname = dns.name.from_text(self.args["rname"])
if type(self.args["rtype"]) is str:
self.rtype = dns.rdatatype.from_text(self.args["rtype"])
else:
self.rtype = self.args["rtype"]
if type(self.args["rclass"]) is str:
self.rclass = dns.rdataclass.from_text(self.args["rclass"])
else:
self.rclass = self.args["rclass"]
def _check_question(self):
question = self.resp.question.pop()
compare(question.name, self.rname, "question.name")
compare(question.rdclass, self.rclass, "question.class")
compare(question.rdtype, self.rtype, "question.type")
def _check_flags(self, flags, noflags):
flag_names = flags.split()
for flag in flag_names:
flag_val = dns.flags.from_text(flag)
isset(self.resp.flags & flag_val, "%s flag" % flag)
flag_names = noflags.split()
for flag in flag_names:
flag_val = dns.flags.from_text(flag)
isset(not(self.resp.flags & flag_val), "no %s flag" % flag)
def check(self, rdata=None, ttl=None, rcode="NOERROR", flags="", \
noflags=""):
'''Flags are text strings separated by whitespace character'''
self._check_flags(flags, noflags)
self._check_question()
# Check rcode.
if type(rcode) is str:
rc = dns.rcode.from_text(rcode)
else:
rc = rcode
compare(self.resp.rcode(), rc, "RCODE")
# Check rdata only if NOERROR.
if rc != 0 or rdata == None:
return
# We work with just one rdata with TTL=0 (this TTL is not used).
ref = list(dns.rdataset.from_text(self.rclass, self.rtype, 0, rdata))[0]
# Check answer section if contains reference rdata.
for data in self.resp.answer:
for rdata in data.to_rdataset():
# Compare Rdataset instances.
if rdata == ref:
# Check CLASS.
compare(data.rdclass, self.rclass, "CLASS")
# Check TYPE.
compare(data.rdtype, self.rtype, "TYPE")
# Check TTL if specified.
if ttl != None:
compare(data.ttl, int(ttl), "TTL")
return
else:
err("RDATA (" + str(rdata) + ") not in ANSWER section")
set_err("CHECK rdata")
def check_edns(self, nsid=None, buff_size=None):
compare(self.resp.edns, 0, "EDNS version")
options = 1 if nsid != None else 0
compare(len(self.resp.options), options, "number of EDNS0 options")
if options > 0:
option = list(self.resp.options)[0]
compare(option.otype, dns.edns.NSID, "option type")
if nsid[:2] == "0x":
compare(binascii.hexlify(option.data).decode('ascii'), \
nsid[2:], "hex NSID")
else:
compare(option.data.decode('ascii'), nsid, "txt NSID")
def diff(self, resp, flags=True, answer=True, authority=True, \
additional=False):
'''Compares specified response sections against another response'''
if flags:
compare(dns.flags.to_text(self.resp.flags), \
dns.flags.to_text(resp.resp.flags), "FLAGS")
compare(dns.flags.edns_to_text(self.resp.ednsflags), \
dns.flags.edns_to_text(resp.resp.ednsflags), "EDNS FLAGS")
if answer:
compare_sections(self.resp.answer, self.srv.name, \
resp.resp.answer, resp.srv.name, \
"ANSWER")
if authority:
compare_sections(self.resp.answer, self.srv.name, \
resp.resp.answer, resp.srv.name, \
"AUTHORITY")
if additional:
compare_sections(self.resp.answer, self.srv.name, \
resp.resp.answer, resp.srv.name, \
"ADDITIONAL")
def cmp(self, server, flags=True, answer=True, authority=True, \
additional=False):
'''Asks server for the same question an compares specified sections'''
resp = server.dig(**self.args)
self.diff(resp, flags, answer, authority, additional)
#!/usr/bin/env python3
import os
import random
import shutil
import socket
import time
import dns.zone
import zone_generate
from dnstest.utils import *
import dnstest.params as params
import dnstest.server
import dnstest.keys
class Test(object):
'''Specification of DNS test topology'''
MAX_START_TRIES = 10
LOCAL_ADDR = {4: "127.0.0.1", 6: "::1"}
# Value of the last generated port.
last_port = None
# Number of unsuccessful starts of servers. Recursion protection.
start_tries = 0
def __init__(self, ip=None, tsig=None):
if not os.path.exists(params.out_dir):
raise Exception("Output directory doesn't exist")
self.out_dir = params.out_dir
self.data_dir = params.test_dir + "/data/"
self.zones_dir = self.out_dir + "/zones/"
try:
os.mkdir(self.zones_dir)
except:
raise Exception("Can't create directory %s" % self.zones_dir)
self.ip = ip if ip else random.choice([4, 6])
if self.ip not in [4, 6]:
raise Exception("Invalid IP version")
self.tsig = bool(tsig) if tsig != None else random.choice([True, False])
self.servers = set()
dnstest.server.Knot.count = 0
dnstest.server.Bind.count = 0
dnstest.server.Nsd.count = 0
dnstest.server.Dummy.count = 0
def _check_port(self, port):
if not port:
return False
proto = socket.AF_INET if self.ip == 4 else socket.AF_INET6
try:
s = socket.socket(proto, socket.SOCK_DGRAM)
s.bind((Test.LOCAL_ADDR[self.ip], port))
s.close
s = socket.socket(proto, socket.SOCK_STREAM)
s.bind((Test.LOCAL_ADDR[self.ip], port))
s.close
except:
return False
return True
def _gen_port(self):
min_port = 10000
max_port = 50000
port = Test.last_port
if port:
port = port + 1 if port < max_port else min_port
while not self._check_port(port):
port = random.randint(min_port, max_port)
Test.last_port = port
return port
def server(self, server, nsid=None, ident=None, version=None, \
valgrind=None):
if server == "knot":
srv = dnstest.server.Knot()
elif server == "bind":
srv = dnstest.server.Bind()
elif server == "nsd":
srv = dnstest.server.Nsd()
elif server == "dummy":
srv = dnstest.server.Dummy()
else:
raise Exception("Usupported server %s" % server)
type(srv).count += 1
if params.valgrind_bin and \
(valgrind or (valgrind == None and server == "knot")):
srv.valgrind = [params.valgrind_bin, params.valgrind_flags]
srv.data_dir = self.data_dir
srv.nsid = nsid
srv.ident = ident
srv.version = version
srv.ip = self.ip
srv.addr = Test.LOCAL_ADDR[self.ip]
srv.tsig = dnstest.keys.Tsig() if self.tsig else None
srv.name = "%s%s" % (server, srv.count)
srv.dir = self.out_dir + "/" + srv.name
srv.fout = srv.dir + "/stdout"
srv.ferr = srv.dir + "/stderr"
srv.confile = srv.dir + "/%s.conf" % srv.name
try:
os.mkdir(srv.dir)
except:
raise Exception("Can't create directory %s" % srv.dir)
if srv.ctlkey:
srv.ctlkeyfile = srv.dir + "/%s.ctlkey" % srv.name
srv.ctlkey.dump(srv.ctlkeyfile)
self.servers.add(srv)
return srv
def _generate_conf(self):
# Next two loops can't be merged!
for server in self.servers:
server.port = self._gen_port()
server.ctlport = self._gen_port()
for server in self.servers:
server.gen_confile()
def start(self):
'''Start all test servers'''
if self.start_tries > Test.MAX_START_TRIES:
raise Exception("Can't start all servers")
self.start_tries += 1
self._generate_conf()
def srv_sort(server):
masters = 0
for z in server.zones:
if server.zones[z].master: masters += 1
return masters
# Sort server list by number of masters. I.e. masters are prefered.
for server in sorted(self.servers, key=srv_sort):
server.start()
if not server.running():
self.stop()
self.start()
params.test = self
self.start_tries = 0
def stop(self):
'''Stop all servers'''
for server in self.servers:
server.stop()
params.test = None
def end(self):
'''Finish testing'''
<