Commit 20e9e676 authored by Daniel Salzman's avatar Daniel Salzman

func-test: add dns quering using pythondns

parent d6412c1e
#!/usr/bin/env python3
import argparse, importlib, os, sys, tempfile, time
import argparse, importlib, os, sys, tempfile, time, traceback
sys.path.append(os.path.dirname(os.path.realpath(__file__)) + "/tools")
from dnstest import log, err
import params
parser = argparse.ArgumentParser()
parser.add_argument("-d", "--debug", help="enable exception traceback", action='store_true')
parser.add_argument("-d", "--debug", \
help="enable exception traceback on stdout", \
action='store_true')
args = parser.parse_args()
if args.debug:
params.debug = True
......@@ -18,6 +20,11 @@ fail_cnt = 0
log("Starting Knot test suite %s" % outs_dir)
def save_traceback(outdir):
file = open(params.out_dir + "/traceback", mode="w")
traceback.print_exc(file=file)
file.close()
for test in sorted(os.listdir("./" + tests_dir)):
test_dir = "./%s/%s" % (tests_dir, test)
if not os.path.isdir(test_dir):
......@@ -51,12 +58,14 @@ for test in sorted(os.listdir("./" + tests_dir)):
except Exception as exc:
params.err = True
params.errmsg = format(exc)
save_traceback(params.out_dir)
if params.debug:
traceback.print_exc(exc)
traceback.print_exc()
except BaseException as exc:
log("Interrupted")
save_traceback(params.out_dir)
if params.debug:
traceback.print_exc(exc)
traceback.print_exc()
exit(1)
# Stop servers if still running.
......
#!/usr/bin/env python3
'''Test for AXFR from Bind to Knot'''
import dnstest
t = dnstest.DnsTest(tsig=True)
master = t.server("knot")
slave = t.server("bind")
#zones = t.zone_rnd(2)
zones = t.zone("mysqb8.fluid", "mysqb8.fluid.zone")
t.link(zones, master, slave)
t.start()
master.zones_wait(zones)
slave.zones_wait(zones)
t.xfr_diff(master, slave, zones)
t.stop()
$ORIGIN example2.com.
$TTL 3600
@ SOA dns1.example2.com. hostmaster.example2.com. (
2010111213 ; serial
6h ; refresh
1h ; retry
1w ; expire
1d ) ; minimum
NS dns1
NS dns2
MX 10 mail
dns1 A 192.0.2.1
AAAA 2001:DB8::1
dns2 A 192.0.2.2
AAAA 2001:DB8::2
mail A 192.0.2.3
AAAA 2001:DB8::3
#!/usr/bin/env python3
import dnstest
t = dnstest.DnsTest()
master1 = t.server("knot", nsid="nsid", ident=True, version="Knot XXX")
master2 = t.server("bind", ident="ahoj", version="xx")
slave = t.server("bind", nsid="0xabcd")
z1 = t.zone("example.com.", "example.com.zone")
z2 = t.zone("example2.com.", "example2.com.zone")
z3 = t.zone_rnd(5)
t.link(z1, master1, slave, ddns=True)
t.link(z2, master2, slave)
t.link(z3, master2, slave)
t.link(z3, slave, master1)
t.start()
t.end()
......@@ -9,6 +9,9 @@ import socket
import string
import sys
import time
import dns.message
import dns.query
import dns.tsigkeyring
from subprocess import Popen, PIPE, DEVNULL, check_call
import zone_generate, params
......@@ -71,9 +74,19 @@ class Tsig(object):
self.alg = alg if alg else random.choice(list(Tsig.algs.keys()))
self.key = base64.b64encode(os.urandom(Tsig.algs[self.alg])).decode('ascii')
self.key = base64.b64encode(os.urandom(Tsig.algs[self.alg])). \
decode('ascii')
self.tsig = self.alg + ":" + self.name + ":" + self.key
# TSIG preparation for pythondns utils.
if self.alg == "hmac-md5":
alg = "hmac-md5.sig-alg.reg.int"
else:
alg = self.alg
key = dns.tsigkeyring.from_text({
self.name: self.key
})
self.key_params = dict(keyname=self.name, keyalgorithm=alg, keyring=key)
def dump(self, filename):
s = BindConf()
......@@ -169,6 +182,7 @@ class DnsServer(object):
START_WAIT_VALGRIND = 5
STOP_TIMEOUT = 60
COMPILE_TIMEOUT = 60
DIG_TIMEOUT = 5
# Instance counter.
count = 0
......@@ -277,8 +291,8 @@ class DnsServer(object):
if self.compile_params:
self.compile()
self.proc = Popen(self.valgrind + [self.daemon_bin] + self.start_params,
stdout=fout, stderr=ferr)
self.proc = Popen(self.valgrind + [self.daemon_bin] + \
self.start_params, stdout=fout, stderr=ferr)
if self.valgrind:
time.sleep(DnsServer.START_WAIT_VALGRIND)
......@@ -322,12 +336,12 @@ class DnsServer(object):
lost_line = re.search("lost:", line)
if lost_line:
lost += int(line[lost_line.end():].lstrip().\
lost += int(line[lost_line.end():].lstrip(). \
split(" ")[0].replace(",", ""))
reach_line = re.search("reachable:", line)
if reach_line:
reachable += int(line[reach_line.end():].lstrip().\
reachable += int(line[reach_line.end():].lstrip(). \
split(" ")[0].replace(",", ""))
f.close()
......@@ -350,6 +364,52 @@ class DnsServer(object):
f.write(self.get_config())
f.close
def dig(self, rname, rtype, rclass="IN", use_udp=None, serial=None, \
timeout=DIG_TIMEOUT):
key_params = self.tsig.key_params if self.tsig else dict()
try:
if rtype.upper() == "AXFR":
# Always use TCP.
resp = dns.query.xfr(self.addr, rname, rtype, rclass, \
port=self.port, lifetime=timeout, \
use_udp=False, **key_params)
elif rtype.upper() == "IXFR":
# Use TCP if not specified.
use_udp = use_udp if use_udp != None else False
resp = dns.query.xfr(self.addr, rname, rtype, rclass, \
port=self.port, lifetime=timeout, \
use_udp=use_udp, serial=serial, \
**key_params)
else:
# Use TCP or UDP at random if not specified.
use_udp = use_udp if use_udp != None else \
random.choice([True, False])
query = dns.message.make_query(rname, rtype, rclass)
if use_udp:
resp = dns.query.udp(query, self.addr, port=self.port, \
timeout=timeout)
else:
resp = dns.query.tcp(query, self.addr, port=self.port, \
timeout=timeout)
return resp
except:
return None
def zones_wait(self, zones):
for zone in zones:
for attempt in range(10):
resp = self.dig(zone, "SOA", use_udp=True)
if resp and resp.answer:
break
time.sleep(DnsServer.DIG_TIMEOUT)
else:
raise Exception("Can't get %s SOA from %s." % \
(zone, self.name))
class Bind(DnsServer):
def __init__(self):
......@@ -686,7 +746,8 @@ class DnsTest(object):
DnsTest.last_port = port
return port
def server(self, server, nsid=None, ident=None, version=None, valgrind=None):
def server(self, server, nsid=None, ident=None, version=None, \
valgrind=None):
if server == "knot":
srv = Knot()
elif server == "bind":
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment