Commit 474fa41b authored by Petr Špaček's avatar Petr Špaček

Merge branch 'rplint' into 'master'

Rplint

See merge request !84
parents ca6336af ba3544ef
Pipeline #34210 passed with stage
in 1 minute and 15 seconds
......@@ -33,6 +33,17 @@ test:pylint:
- linux
- amd64
test:rplint:
stage: test
script:
- cp ci/common.sh /tmp
- cp ci/compare-rplint.sh /tmp
- /tmp/compare-rplint.sh
tags:
- docker
- linux
- amd64
test:unittests:
stage: test
script:
......
#!/bin/bash
set -o nounset -o xtrace -o errexit
source "$(dirname "$(readlink -f "$0")")/common.sh"
function find_new_tests {
: detect tests affected by current merge request
: store list of modified tests in ${NEW_TESTS_FILE}
git diff --name-only --diff-filter=A ${MERGEBASE} ${HEAD} | fgrep .rpl > "${NEW_TESTS_FILE}" || : no new tests detected
}
NEW_TESTS_FILE="/tmp/new_tests"
find_new_tests
truncate -s0 /tmp/rplint_fails
for test in $(cat ${NEW_TESTS_FILE})
do
${PYTHON} -m rplint $test >> /tmp/rplint_fails
done
cat /tmp/rplint_fails
test "!" -s /tmp/rplint_fails
......@@ -19,6 +19,10 @@ test -n "${STATUS}" && echo "Working tree is dirty, commit your changes now." &&
trap checkout_back EXIT
trap "{ FAILURE_DETECTED=1; }" ERR
"${CIDIR}"/compare-rplint.sh
checkout_back
git clean -xdf
"${CIDIR}"/compare-pylint.sh
checkout_back
git clean -xdf
......
......@@ -161,6 +161,10 @@ class AugeasNode(collections.MutableMapping):
self._span = "char position %s" % self._aug.span(self._path)[5]
return self._span
@property
def char(self):
return self._aug.span(self._path)[5]
@value.setter
def value(self, value):
"""
......
......@@ -33,7 +33,7 @@ let hex_re = /[0-9a-fA-F]+/
let match_option = "opcode" | "qtype" | "qcase" | "qname" | "subdomain" | "flags" | "rcode" | "question" | "answer" | "authority" | "additional" | "all" | "TCP" | "ttl"
let adjust_option = "copy_id" | "copy_query"
let reply_option = "QR" | "TC" | "AA" | "AD" | "RD" | "RA" | "CD" | "DO" | "NOERROR" | "FORMERR" | "SERVFAIL" | "NXDOMAIN" | "NOTIMP" | "REFUSED" | "YXDOMAIN" | "YXRRSET" | "NXRRSET" | "NOTAUTH" | "NOTZONE" | "BADVERS" | "QUERY" | "IQUERY" | "STATUS" | "NOTIFY" | "UPDATE"
let reply_option = "QR" | "TC" | "AA" | "AD" | "RD" | "RA" | "CD" | "DO" | "NOERROR" | "FORMERR" | "SERVFAIL" | "NXDOMAIN" | "NOTIMP" | "REFUSED" | "YXDOMAIN" | "YXRRSET" | "NXRRSET" | "NOTAUTH" | "NOTZONE" | "BADVERS" | "BADSIG" | "BADKEY" | "BADTIME" | "BADMODE" | "BADNAME" | "BADALG" | "BADTRUNC" | "BADCOOKIE"
let step_option = "REPLY" | "QUERY" | "CHECK_ANSWER" | "CHECK_OUT_QUERY" | /TIME_PASSES[ \t]+ELAPSE/
let mandatory = [del_str "MANDATORY" . label "mandatory" . value "true" . comment_or_eol]
......
"""matchpart is used to compare two DNS messages using a single criterion"""
import dns.rcode
import dns.edns
class DataMismatch(Exception):
def __init__(self, exp_val, got_val):
super().__init__()
self.exp_val = exp_val
self.got_val = got_val
def __str__(self):
return 'expected "{0.exp_val}" got "{0.got_val}"'.format(self)
def __hash__(self):
return hash((self.exp_val, self.got_val))
def __eq__(self, other):
return (isinstance(other, DataMismatch)
and self.exp_val == other.exp_val
and self.got_val == other.got_val)
def __ne__(self, other):
return not self.__eq__(other)
def compare_val(exp, got):
"""Compare arbitraty objects, throw exception if different. """
if exp != got:
raise DataMismatch(exp, got)
return True
def compare_rrs(expected, got):
""" Compare lists of RR sets, throw exception if different. """
for rr in expected:
if rr not in got:
raise DataMismatch(expected, got)
for rr in got:
if rr not in expected:
raise DataMismatch(expected, got)
if len(expected) != len(got):
raise DataMismatch(expected, got)
return True
def compare_rrs_types(exp_val, got_val, skip_rrsigs):
"""sets of RR types in both sections must match"""
def rr_ordering_key(rrset):
if rrset.covers:
return rrset.covers, 1 # RRSIGs go to the end of RRtype list
else:
return rrset.rdtype, 0
def key_to_text(rrtype, rrsig):
if not rrsig:
return dns.rdatatype.to_text(rrtype)
else:
return 'RRSIG(%s)' % dns.rdatatype.to_text(rrtype)
if skip_rrsigs:
exp_val = (rrset for rrset in exp_val
if rrset.rdtype != dns.rdatatype.RRSIG)
got_val = (rrset for rrset in got_val
if rrset.rdtype != dns.rdatatype.RRSIG)
exp_types = frozenset(rr_ordering_key(rrset) for rrset in exp_val)
got_types = frozenset(rr_ordering_key(rrset) for rrset in got_val)
if exp_types != got_types:
exp_types = tuple(key_to_text(*i) for i in sorted(exp_types))
got_types = tuple(key_to_text(*i) for i in sorted(got_types))
raise DataMismatch(exp_types, got_types)
def match_opcode(exp, got):
return compare_val(exp.opcode(),
got.opcode())
def match_qtype(exp, got):
if not exp.question:
return True
return compare_val(exp.question[0].rdtype,
got.question[0].rdtype)
def match_qname(exp, got):
if not exp.question:
return True
return compare_val(exp.question[0].name,
got.question[0].name)
def match_qcase(exp, got):
return compare_val(exp.question[0].name.labels,
got.question[0].name.labels)
def match_subdomain(exp, got):
if not exp.question:
return True
qname = dns.name.from_text(got.question[0].name.to_text().lower())
if exp.question[0].name.is_superdomain(qname):
return True
raise DataMismatch(exp, got)
def match_flags(exp, got):
return compare_val(dns.flags.to_text(exp.flags),
dns.flags.to_text(got.flags))
def match_rcode(exp, got):
return compare_val(dns.rcode.to_text(exp.rcode()),
dns.rcode.to_text(got.rcode()))
def match_question(exp, got):
return compare_rrs(exp.question,
got.question)
def match_answer(exp, got):
return compare_rrs(exp.answer,
got.answer)
def match_ttl(exp, got):
return compare_rrs(exp.answer,
got.answer)
def match_answertypes(exp, got):
return compare_rrs_types(exp.answer,
got.answer, skip_rrsigs=True)
def match_answerrrsigs(exp, got):
return compare_rrs_types(exp.answer,
got.answer, skip_rrsigs=False)
def match_authority(exp, got):
return compare_rrs(exp.authority,
got.authority)
def match_additional(exp, got):
return compare_rrs(exp.additional,
got.additional)
def match_edns(exp, got):
if got.edns != exp.edns:
raise DataMismatch(exp.edns,
got.edns)
if got.payload != exp.payload:
raise DataMismatch(exp.payload,
got.payload)
def match_nsid(exp, got):
nsid_opt = None
for opt in exp.options:
if opt.otype == dns.edns.NSID:
nsid_opt = opt
break
# Find matching NSID
for opt in got.options:
if opt.otype == dns.edns.NSID:
if not nsid_opt:
raise DataMismatch(None, opt.data)
if opt == nsid_opt:
return True
else:
raise DataMismatch(nsid_opt.data, opt.data)
if nsid_opt:
raise DataMismatch(nsid_opt.data, None)
return True
MATCH = {"opcode": match_opcode, "qtype": match_qtype, "qname": match_qname, "qcase": match_qcase,
"subdomain": match_subdomain, "flags": match_flags, "rcode": match_rcode,
"question": match_question, "answer": match_answer, "ttl": match_ttl,
"answertypes": match_answertypes, "answerrrsigs": match_answerrrsigs,
"authority": match_authority, "additional": match_additional, "edns": match_edns,
"nsid": match_nsid}
def match_part(exp, got, code):
try:
return MATCH[code](exp, got)
except KeyError:
raise NotImplementedError('unknown match request "%s"' % code)
......@@ -21,6 +21,7 @@ import dns.rrset
import dns.tsigkeyring
import pydnstest.augwrap
import pydnstest.matchpart
def str2bool(v):
......@@ -32,39 +33,6 @@ def str2bool(v):
g_rtt = 0.0
g_nqueries = 0
#
# Element comparators
#
def compare_rrs(expected, got):
""" Compare lists of RR sets, throw exception if different. """
for rr in expected:
if rr not in got:
raise ValueError("expected record '%s'" % rr.to_text())
for rr in got:
if rr not in expected:
raise ValueError("unexpected record '%s'" % rr.to_text())
if len(expected) != len(got):
raise ValueError("expected %s records but got %s records "
"(a duplicate RR somewhere?)"
% (len(expected), len(got)))
return True
def compare_val(expected, got):
""" Compare values, throw exception if different. """
if expected != got:
raise ValueError("expected '%s', got '%s'" % (expected, got))
return True
def compare_sub(got, expected):
""" Check if got subdomain of expected, throw exception if different. """
if not expected.is_subdomain(got):
raise ValueError("expected subdomain of '%s', got '%s'" % (expected, got))
return True
def recvfrom_msg(stream, raw=False):
"""
......@@ -352,67 +320,6 @@ class Entry:
return None
return opcodes[0]
def match_part(self, code, msg):
""" Compare scripted reply to given message using single criteria. """
if code not in self.match_fields and 'all' not in self.match_fields:
return True
expected = self.message
if code == 'opcode':
return compare_val(expected.opcode(), msg.opcode())
elif code == 'qtype':
if not expected.question:
return True
return compare_val(expected.question[0].rdtype, msg.question[0].rdtype)
elif code == 'qname':
if not expected.question:
return True
qname = dns.name.from_text(msg.question[0].name.to_text().lower())
return compare_val(expected.question[0].name, qname)
elif code == 'qcase':
return compare_val(msg.question[0].name.labels, expected.question[0].name.labels)
elif code == 'subdomain':
if not expected.question:
return True
qname = dns.name.from_text(msg.question[0].name.to_text().lower())
return compare_sub(expected.question[0].name, qname)
elif code == 'flags':
return compare_val(dns.flags.to_text(expected.flags), dns.flags.to_text(msg.flags))
elif code == 'rcode':
return compare_val(dns.rcode.to_text(expected.rcode()), dns.rcode.to_text(msg.rcode()))
elif code == 'question':
return compare_rrs(expected.question, msg.question)
elif code == 'answer' or code == 'ttl':
return compare_rrs(expected.answer, msg.answer)
elif code == 'authority':
return compare_rrs(expected.authority, msg.authority)
elif code == 'additional':
return compare_rrs(expected.additional, msg.additional)
elif code == 'edns':
if msg.edns != expected.edns:
raise ValueError('expected EDNS %d, got %d' % (expected.edns, msg.edns))
if msg.payload != expected.payload:
raise ValueError('expected EDNS bufsize %d, got %d'
% (expected.payload, msg.payload))
elif code == 'nsid':
nsid_opt = None
for opt in expected.options:
if opt.otype == dns.edns.NSID:
nsid_opt = opt
break
# Find matching NSID
for opt in msg.options:
if opt.otype == dns.edns.NSID:
if not nsid_opt:
raise ValueError('unexpected NSID value "%s"' % opt.data)
if opt == nsid_opt:
return True
else:
raise ValueError('expected NSID "%s", got "%s"' % (nsid_opt.data, opt.data))
if nsid_opt:
raise ValueError('expected NSID "%s"' % nsid_opt.data)
else:
raise ValueError('unknown match request "%s"' % code)
def match(self, msg):
""" Compare scripted reply to given message based on match criteria. """
match_fields = self.match_fields
......@@ -421,8 +328,8 @@ class Entry:
match_fields += ['flags'] + ['rcode'] + self.sections
for code in match_fields:
try:
self.match_part(code, msg)
except ValueError as ex:
pydnstest.matchpart.match_part(self.message, msg, code)
except pydnstest.matchpart.DataMismatch as ex:
errstr = '%s in the response:\n%s' % (str(ex), msg.to_text())
# TODO: cisla radku
raise ValueError("%s, \"%s\": %s" % (self.node.span, code, errstr))
......@@ -503,6 +410,7 @@ class Range:
self.node = node
self.a = int(node['/from'].value)
self.b = int(node['/to'].value)
assert self.a <= self.b
address = node["/address"].value
self.addresses = {address} if address is not None else set()
......
#!/usr/bin/env python3
from contextlib import suppress
import glob
import itertools
import os
import sys
import dns.name
import pydnstest.augwrap
import pydnstest.matchpart
import pydnstest.scenario
RCODES = {"NOERROR", "FORMERR", "SERVFAIL", "NXDOMAIN", "NOTIMP", "REFUSED", "YXDOMAIN", "YXRRSET",
"NXRRSET", "NOTAUTH", "NOTZONE", "BADVERS", "BADSIG", "BADKEY", "BADTIME", "BADMODE",
"BADNAME", "BADALG", "BADTRUNC", "BADCOOKIE"}
FLAGS = {"QR", "AA", "TC", "RD", "RA", "AD", "CD"}
SECTIONS = {"question", "answer", "authority", "additional"}
def get_line_number(file, char_number):
pos = 0
for number, line in enumerate(open(file)):
pos += len(line)
if pos >= char_number:
return number + 2
def is_empty(iterable):
try:
next(iterable)
except StopIteration:
return True
return False
class Entry:
def __init__(self, node):
self.match = {m.value for m in node.match("/match")}
self.adjust = {a.value for a in node.match("/adjust")}
self.authority = list(node.match("/section/authority/record"))
self.reply = {r.value for r in node.match("/reply")}
self.records = list(node.match("/section/*/record"))
self.node = node
class Step:
def __init__(self, node):
self.node = node
self.type = node["/type"].value
try:
self.entry = Entry(node["/entry"])
except KeyError:
self.entry = None
class Test:
def __init__(self, path):
aug = pydnstest.augwrap.AugeasWrapper(confpath=os.path.realpath(path),
lens='Deckard',
loadpath=os.path.join(os.path.dirname(__file__),
'pydnstest'))
self.node = aug.tree
self.name = os.path.basename(path)
self.path = path
_, self.config = pydnstest.scenario.parse_file(os.path.realpath(path))
self.range_entries = [Entry(node) for node in self.node.match("/scenario/range/entry")]
self.steps = [Step(node) for node in self.node.match("/scenario/step")]
self.step_entries = [step.entry for step in self.steps if step.entry is not None]
self.entries = self.range_entries + self.step_entries
self.ranges = [pydnstest.scenario.Range(n) for n in self.node.match("/scenario/range")]
self.checks = [entry_more_than_one_rcode, entry_no_qname_qtype_copy_query,
entry_ns_in_authority, range_overlapping_ips, range_shadowing_match_rules,
step_check_answer_no_match, step_query_match, step_section_unchecked,
step_unchecked_match, step_unchecked_rcode, test_ad_or_rrsig_no_ta,
test_timestamp, test_trust_anchor_trailing_period_missing,
step_duplicate_id]
def print_results(self):
failed = False
for check in self.checks:
fails = check(self)
if fails and not failed:
print(self.path)
failed = True
for fail in fails:
pos = get_line_number(self.path, fail)
print("\t line " + str(pos), check.__doc__)
def test_trust_anchor_trailing_period_missing(test):
"""Trust-anchor option in configuration contains domain without trailing period"""
for conf in test.config:
if conf[0] == "trust-anchor":
if conf[1].split()[0][-1] != ".":
return [0]
return []
def test_timestamp(test):
"""RRSSIG record present in test but no val-override-date or val-override-timestamp in config"""
rrsigs = []
for entry in test.entries:
for record in entry.records:
if record["/type"].value == "RRSIG":
rrsigs.append(record.char)
if rrsigs:
for k in test.config:
if k[0] == "val-override-date" or k[0] == "val-override-timestamp":
return []
return rrsigs
def entry_no_qname_qtype_copy_query(test):
"""ENTRY without qname and qtype in MATCH and without copy_query in ADJUST"""
fails = []
for entry in test.range_entries:
if "qname" not in entry.match or "qtype" not in entry.match:
if "copy_query" not in entry.adjust:
fails.append(entry.node.char)
return fails
def entry_ns_in_authority(test):
"""ENTRY has authority section with NS records, consider using MATCH subdomain"""
fails = []
for entry in test.range_entries:
if entry.authority and "subdomain" not in entry.match:
for record in entry.authority:
if record["/type"].value == "NS":
fails.append(entry.node.char)
return fails
def entry_more_than_one_rcode(test):
"""ENTRY has more than one rcode in MATCH"""
fails = []
for entry in test.entries:
if len(RCODES & entry.reply) > 1:
fails.append(entry.node.char)
return fails
def test_ad_or_rrsig_no_ta(test):
"""AD or RRSIG present in test but no trust-anchor present in config"""
dnssec = []
for entry in test.entries:
if "AD" in entry.reply or "AD" in entry.match:
dnssec.append(entry.node.char)
else:
for record in entry.records:
if record["/type"].value == "RRSIG":
dnssec.append(entry.node.char)
if dnssec:
for k in test.config:
if k[0] == "trust-anchor":
return []
return dnssec
def step_query_match(test):
"""STEP QUERY has a MATCH rule"""
return [step.node.char for step in test.steps if step.type == "QUERY" and step.entry.match]
def step_check_answer_no_match(test):
"""ENTRY in STEP CHECK_ANSWER has no MATCH rule"""
return [step.entry.node.char for step in test.steps if step.type == "CHECK_ANSWER"
and not step.entry.match]
def step_unchecked_rcode(test):
"""ENTRY specifies rcode but STEP MATCH does not check for it."""
fails = []
for step in test.steps:
if step.type == "CHECK_ANSWER" and "all" not in step.entry.match:
if step.entry.reply & RCODES and "rcode" not in step.entry.match:
fails.append(step.entry.node.char)
return fails
def step_unchecked_match(test):
"""ENTRY specifies flags but MATCH does not check for them"""
fails = []
for step in test.steps:
if step.type == "CHECK_ANSWER":
entry = step.entry
if "all" not in entry.match and entry.reply - RCODES and "flags" not in entry.match:
fails.append(entry.node.char)
return fails
def step_section_unchecked(test):
"""ENTRY has non-empty sections but MATCH does not check for all of them"""
fails = []
for step in test.steps:
if step.type == "CHECK_ANSWER" and "all" not in step.entry.match:
for section in SECTIONS:
if not is_empty(step.node.match("/entry/section/" + section + "/*")):
if section not in step.entry.match:
fails.append(step.entry.node.char)
return fails
def range_overlapping_ips(test):
"""RANGE has common IPs with some previous overlapping RANGE"""
fails = []
for r1, r2 in itertools.combinations(test.ranges, 2):
# If the ranges overlap
if min(r1.b, r2.b) >= max(r1.a, r2.a):
if r1.addresses & r2.addresses:
fails.append(r2.node.char)
return fails
def range_shadowing_match_rules(test):
"""ENTRY has no effect since one of previous entries has broader match rules"""
fails = []
for r in test.ranges:
for e1, e2 in itertools.combinations(r.stored, 2):
match1 = set(e1.match_fields)
match2 = set(e2.match_fields)
msg1 = e1.message
msg2 = e2.message
if match1 >= match2:
with suppress(pydnstest.matchpart.DataMismatch):
if pydnstest.matchpart.compare_rrs(msg1.question, msg2.question):
fails.append(e2.node.char)
if "subdomain" in match1:
if msg1.question[0].name.is_superdomain(msg2.question[0].name):
match1.discard("subdomain")
match2.discard("subdomain")
if match1 >= match2:
msg1.question[0].name = dns.name.Name("")
msg2.question[0].name = dns.name.Name("")
with suppress(pydnstest.matchpart.DataMismatch):
if pydnstest.matchpart.compare_rrs(msg1.question, msg2.question):
fails.append(e2.node.char)
return fails
def step_duplicate_id(test):
"""STEP has the same ID as one of previous ones"""
fails = []
for step1, step2 in itertools.combinations(test.steps, 2):
if step1.node.value == step2.node.value:
fails.append(step2.node.char)
return fails
# TODO: This will make sense after we fix how we handle defaults in deckard.aug and scenario.py
# We might just not use defaults altogether as testbound does
# if "copy_id" not in adjust:
# entry_error(test, entry, "copy_id should be in ADJUST")
if __name__ == '__main__':
tests_path = sys.argv[1]
if tests_path.endswith(".rpl"):
t = Test(tests_path)
t.print_results()
else:
for file_path in sorted(glob.glob(os.path.join(tests_path, "*.rpl"))):
t = Test(file_path)
t.print_results()
......@@ -346,7 +346,6 @@ ENTRY_END
; explicitly defined records gets properly validated even with cached wildcard
STEP 40 QUERY
ENTRY_BEGIN
MATCH all
REPLY RD DO AD
SECTION QUESTION
explicita.nsec.example. IN A
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment