Commit 4ab28cb4 authored by Daniel Salzman's avatar Daniel Salzman

func-tests: change zone type from dict to list

parent 13d3d811
......@@ -10,7 +10,6 @@ import dns.message
import dns.query
import dns.update
from subprocess import Popen, PIPE, DEVNULL, check_call
from dnstest.utils import *
import dnstest.params as params
import dnstest.keys
......@@ -84,14 +83,17 @@ class BindConf(object):
class Zone(object):
'''DNS zone description'''
def __init__(self, name, filename, ddns=False):
self.name = name
self.filename = filename
def __init__(self, zone_file=None, ddns=False):
self.zfile = zone_file
self.master = None
self.slaves = set()
# ddns: True - ddns, False(master) - ixfrFromDiff, False(slave) - empty
# True: DDNS, False on master: ixfrFromDiff, False on slave: empty
self.ddns = ddns
@property
def name(self):
return self.zfile.name
class Server(object):
'''Specification of DNS server'''
......@@ -163,24 +165,29 @@ class Server(object):
return True
def zone_master(self, name, file, slave=None, ddns=False):
if name in self.zones:
if slave:
self.zones[name].slaves.add(slave)
def set_master(self, zone, slave=None, ddns=False):
'''Set the server as a master for the zone'''
if zone.name not in self.zones:
master_file = zone.clone(self.dir + "/master")
z = Zone(master_file, ddns)
self.zones[zone.name] = z
else:
z = Zone(name, file, ddns)
if slave:
z.slaves.add(slave)
self.zones[name] = z
z = self.zones[zone.name]
def zone_slave(self, name, file, master, ddns=False):
if name in self.zones:
if slave:
z.slaves.add(slave)
def set_slave(self, zone, master, ddns=False):
'''Set the server as a slave for the zone'''
if zone.name in self.zones:
raise Exception("Can't set zone %s as a slave" % name)
else:
slave_file = self.dir + "/__" + name + "slave"
z = Zone(name, slave_file, ddns)
z.master = master
self.zones[name] = z
slave_file = zone.clone(self.dir + "/slave", exists=False)
z = Zone(slave_file, ddns)
z.master = master
self.zones[zone.name] = z
def compile(self):
try:
......@@ -370,7 +377,7 @@ class Server(object):
_serial = 0
for t in range(20):
resp = self.dig(zone, "SOA", udp=True, tries=1)
resp = self.dig(zone.name, "SOA", udp=True, tries=1)
if resp.resp.rcode() == 0:
soa = str((resp.resp.answer[0]).to_rdataset())
_serial = int(soa.split()[5])
......@@ -381,8 +388,8 @@ class Server(object):
break
time.sleep(2)
else:
raise Exception("Can't get %s SOA%s from %s." % \
(zone, ">%i" % serial if serial else "", self.name))
raise Exception("Can't get %s SOA%s from %s." % (zone.name,
">%i" % serial if serial else "", self.name))
return _serial
......@@ -393,24 +400,11 @@ class Server(object):
def update(self, zone):
if len(zone) != 1:
raise Exception("One zone required.")
zname = list(zone.keys())[0]
key_params = self.tsig.key_params if self.tsig else dict()
return dnstest.update.Update(self, dns.update.Update(zname, **key_params))
def zone_update(self, zone_name, file_name):
# Add trailing dot if missing.
if zone_name[-1] != ".":
zone_name += "."
src_file = self.data_dir + file_name
dst_file = self.zones[zone_name].filename
try:
shutil.copyfile(src_file, dst_file)
except:
raise Exception("Can't use zone file %s" % src_file)
return dnstest.update.Update(self, dns.update.Update(zone[0].name,
**key_params))
class Bind(Server):
......@@ -472,7 +466,7 @@ class Bind(Server):
s.end()
keys = set() # Duplicy check.
for zone in self.zones:
for zone in sorted(self.zones):
z = self.zones[zone]
if z.master and z.master.tsig.name not in keys:
t = z.master.tsig
......@@ -490,10 +484,10 @@ class Bind(Server):
s.end()
keys.add(t.name)
for zone in self.zones:
for zone in sorted(self.zones):
z = self.zones[zone]
s.begin("zone", z.name)
s.item_str("file", z.filename)
s.item_str("file", z.zfile.path)
s.item("check-names", "warn")
if z.master:
s.item("type", "slave")
......@@ -609,7 +603,7 @@ class Knot(Server):
s.item_str("\"%s\" %s" % (t.name, t.alg), t.key)
keys = set() # Duplicy check.
for zone in self.zones:
for zone in sorted(self.zones):
z = self.zones[zone]
if z.master and z.master.tsig.name not in keys:
t = z.master.tsig
......@@ -630,7 +624,7 @@ class Knot(Server):
s.end()
servers = set() # Duplicity check.
for zone in self.zones:
for zone in sorted(self.zones):
z = self.zones[zone]
if z.master and z.master.name not in servers:
s.begin(z.master.name)
......@@ -659,10 +653,10 @@ class Knot(Server):
if self.dnssec_enable:
s.item_str("dnssec-keydir", self.keydir)
s.item("dnssec-enable", "on")
for zone in self.zones:
for zone in sorted(self.zones):
z = self.zones[zone]
s.begin(z.name)
s.item_str("file", z.filename)
s.item_str("file", z.zfile.path)
if z.master:
s.item("notify-in", z.master.name)
......
......@@ -7,11 +7,11 @@ import socket
import time
import dns.zone
import zone_generate
from dnstest.utils import *
import dnstest.params as params
import dnstest.server
import dnstest.keys
import dnstest.zonefile
class Test(object):
'''Specification of DNS test topology'''
......@@ -32,10 +32,6 @@ class Test(object):
self.out_dir = params.out_dir
self.data_dir = params.test_dir + "/data/"
self.zones_dir = self.out_dir + "/zones/"
try:
os.mkdir(self.zones_dir)
except:
raise Exception("Can't create directory %s" % self.zones_dir)
self.ip = ip if ip else random.choice([4, 6])
if self.ip not in [4, 6]:
......@@ -181,74 +177,52 @@ class Test(object):
def sleep(self, seconds):
time.sleep(seconds)
def zone(self, zone_name, file_name=None, exists=True):
# Add trailing dot if missing.
if zone_name[-1] != ".":
zone_name += "."
def zone(self, name, file_name=None, local=False, dnssec=None, serial=None,
exists=True):
if file_name:
src_file = self.data_dir + file_name
dst_file = self.zones_dir + file_name
else:
if zone_name == ".":
file_name = "rootzone.zone"
else:
file_name = zone_name + "zone"
zone = dnstest.zonefile.ZoneFile(self.zones_dir)
zone.set_name(name)
src_file = params.common_data_dir + '/' + file_name
dst_file = self.zones_dir + file_name
if local:
src_dir = self.data_dir
else:
src_dir = params.common_data_dir
try:
if exists is True:
shutil.copyfile(src_file, dst_file)
except:
raise Exception("Can't use zone file %s" % src_file)
zone.set_file(file_name=file_name, storage=src_dir, dnssec=dnssec,
exists=exists)
return {zone_name: dst_file}
return [zone]
def zone_rnd(self, number, dnssec=None, records=None):
zones = dict()
def zone_rnd(self, number, dnssec=None, records=None, serial=None):
zones = list()
# Generate unique zone names.
names = zone_generate.main(["-n", number]).split()
for name in names:
if dnssec == None:
sign = random.choice([True, False])
else:
sign = True if dnssec else False
serial = random.randint(1, 4294967295)
items = records if records else random.randint(1, 1000)
filename = self.zones_dir + name + ".rndzone"
try:
params = ["-i", serial, "-o", filename, name, items]
if sign:
params = ["-s"] + params
zone = zone_generate.main(params)
except OSError:
err("Can't create zone file %s" % filename)
zones[name + "."] = filename
zone = dnstest.zonefile.ZoneFile(self.zones_dir)
zone.set_name(name)
zone.gen_file(dnssec=dnssec, records=records, serial=serial)
zones.append(zone)
return zones
def link(self, zones, master, slave=None, ddns=False):
for zone in zones:
if master not in self.servers:
raise Exception("Uncovered server in test")
master.zone_master(zone, zones[zone], slave, ddns)
raise Exception("Server is out of testing scope")
master.set_master(zone, slave, ddns)
if slave:
if slave not in self.servers:
raise Exception("Uncovered server in test")
slave.zone_slave(zone, zones[zone], master, ddns)
raise Exception("Server is out of testing scope")
slave.set_slave(zone, master, ddns)
def xfr_diff(self, server1, server2, zones):
check_log("CHECK AXFR DIFF")
for zone in zones:
detail_log("Zone %s %s-%s:" % (zone, server1.name, server2.name))
z1 = dns.zone.from_xfr(server1.dig(zone, "AXFR").resp)
z2 = dns.zone.from_xfr(server2.dig(zone, "AXFR").resp)
detail_log("Zone %s %s-%s:" % (zone.name, server1.name, server2.name))
z1 = dns.zone.from_xfr(server1.dig(zone.name, "AXFR").resp)
z2 = dns.zone.from_xfr(server2.dig(zone.name, "AXFR").resp)
z1_keys = set(z1.nodes.keys())
z2_keys = set(z2.nodes.keys())
......
......@@ -40,7 +40,7 @@ class ZoneFile(object):
self.name = zone_generate.main(["-n", 1]).strip()
def set_file(self, file_name=None, storage=None, dnssec=None, serial=None,
empty=False):
exists=True):
'''Make a copy of an existing zone file. If no file name is specified,
the file name is constructed from the zone name (zname.zone).
The storage is a directory containg the zone file.'''
......@@ -54,7 +54,7 @@ class ZoneFile(object):
else:
src_file = os.path.join(storage, self.file_name)
if empty:
if not exists:
return
try:
......@@ -110,11 +110,12 @@ class ZoneFile(object):
detail_log(SEP)
def clone(self, file_dir, empty=False):
def clone(self, file_dir, exists=True):
'''Make a copy of the zone file'''
new = ZoneFile(file_dir)
new.set_name(self.name)
new.set_file(file_name=self.path, dnssec=self.dnssec,
serial=self.serial, empty=empty)
serial=self.serial,
exists=exists and os.path.isfile(self.path))
return new
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment