Commit f355a274 authored by Jan Včelák's avatar Jan Včelák 🚀

tests-extra: rewrite RRL tests, cover all options

parent c56a2918
......@@ -2,7 +2,10 @@
'''Basic RRL functionality test'''
import dns.name
import dns.exception
import dns.message
import dns.query
import time
from dnstest.test import Test
from dnstest.utils import *
......@@ -10,33 +13,80 @@ from dnstest.utils import *
t = Test(stress=False)
knot = t.server("knot")
zone = t.zone("example.com.")
t.link(zone, knot)
# Enable RRL.
knot.ratelimit = 2
def send_queries(server, run_time=1.0, query_time=0.05):
"""
Send UDP queries to the server for certain time and get replies statistics.
"""
replied, truncated, dropped = 0, 0, 0
start = time.time()
while time.time() < start + run_time:
try:
query = dns.message.make_query("example.com", "SOA", want_dnssec=True)
response = dns.query.udp(query, server.addr, port=server.port, timeout=query_time)
except dns.exception.Timeout:
response = None
t.start()
if response is None:
dropped += 1
elif response.flags & dns.flags.TC:
truncated += 1
else:
replied += 1
return dict(replied=replied, truncated=truncated, dropped=dropped)
def rrl_result(name, stats, success):
detail_log("RRL %s" % name)
detail_log(", ".join(["%s %d" % (s, stats[s]) for s in ["replied", "truncated", "dropped"]]))
if success:
detail_log("success")
else:
detail_log("error")
set_err("RRL ERROR")
t.start()
knot.zone_wait(zone)
t.sleep(1)
tc_bit = False
#
# We cannot send queries in parallel. And we have to give the server some time
# to respond, especially under valgrind. Therefore we have to be tolerant when
# counting responses when packets are being dropped.
#
stats = send_queries(knot)
ok = stats["replied"] >= 100 and stats["truncated"] == 0 and stats["dropped"] == 0
rrl_result("RRL disabled", stats, ok)
knot.ratelimit = 5
knot.gen_confile()
knot.reload()
stats = send_queries(knot)
ok = stats["replied"] > 0 and stats["replied"] < 100 and stats["truncated"] >= 100 and stats["dropped"] == 0
rrl_result("RRL enabled, all slips", stats, ok)
time.sleep(5)
def have_flag(response, flag):
flag_val = dns.flags.from_text(flag)
return (response.resp.flags & flag_val) != 0
knot.ratelimit_slip = 0
knot.gen_confile()
knot.reload()
stats = send_queries(knot)
ok = stats["replied"] > 0 and stats["replied"] < 100 and stats["truncated"] == 0 and stats["dropped"] >= 5
rrl_result("RRL enabled, no slips", stats, ok)
for i in range(20):
resp = knot.dig("example.com", "SOA", udp=True, timeout=0.05, tries=1)
resp.check(rcode="NOERROR")
knot.ratelimit_slip = 2
knot.gen_confile()
knot.reload()
stats = send_queries(knot)
ok = stats["replied"] > 0 and stats["replied"] < 100 and stats["truncated"] >= 5 and stats["dropped"] >= 5
rrl_result("RRL enabled, 50% slips", stats, ok)
# Check for proper flags after and before active RRL.
if i < knot.ratelimit and have_flag(resp, "TC"):
set_err("CHECK NO TC FLAG")
check_log("ERROR: CHECK TC FLAG ABSENCE")
elif i > 10 and not have_flag(resp, "TC"):
set_err("CHECK TC FLAG")
check_log("ERROR: CHECK TC FLAG PRESENCE")
knot.ratelimit_whitelist = knot.addr
knot.gen_confile()
knot.reload()
stats = send_queries(knot)
ok = stats["replied"] >= 100 and stats["truncated"] == 0 and stats["dropped"] == 0
rrl_result("RRL enabled, whitelist effective", stats, ok)
t.end()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment