rplint.py 11.9 KB
Newer Older
1 2 3 4 5
#!/usr/bin/env python3

import itertools
import os
import sys
Tomas Krizek's avatar
Tomas Krizek committed
6
from typing import Any, Callable, Iterable, Iterator, Optional, List, Union, Set  # noqa
7 8 9 10 11

import pydnstest.augwrap
import pydnstest.matchpart
import pydnstest.scenario

12 13
Element = Union["Entry", "Step", pydnstest.scenario.Range]

14 15 16 17 18 19 20
RCODES = {"NOERROR", "FORMERR", "SERVFAIL", "NXDOMAIN", "NOTIMP", "REFUSED", "YXDOMAIN", "YXRRSET",
          "NXRRSET", "NOTAUTH", "NOTZONE", "BADVERS", "BADSIG", "BADKEY", "BADTIME", "BADMODE",
          "BADNAME", "BADALG", "BADTRUNC", "BADCOOKIE"}
FLAGS = {"QR", "AA", "TC", "RD", "RA", "AD", "CD"}
SECTIONS = {"question", "answer", "authority", "additional"}


21
class RplintError(ValueError):
22 23 24 25 26
    def __init__(self, fails):
        msg = ""
        for fail in fails:
            msg += str(fail) + "\n"
        super().__init__(msg)
27 28


29
def get_line_number(file: str, char_number: int) -> int:
30 31 32 33 34
    pos = 0
    for number, line in enumerate(open(file)):
        pos += len(line)
        if pos >= char_number:
            return number + 2
35
    return 0
36 37


38
def is_empty(iterable: Iterator[Any]) -> bool:
39 40 41 42 43 44 45 46
    try:
        next(iterable)
    except StopIteration:
        return True
    return False


class Entry:
47
    def __init__(self, node: pydnstest.augwrap.AugeasNode) -> None:
48 49 50 51 52 53 54 55 56
        self.match = {m.value for m in node.match("/match")}
        self.adjust = {a.value for a in node.match("/adjust")}
        self.authority = list(node.match("/section/authority/record"))
        self.reply = {r.value for r in node.match("/reply")}
        self.records = list(node.match("/section/*/record"))
        self.node = node


class Step:
57
    def __init__(self, node: pydnstest.augwrap.AugeasNode) -> None:
58 59 60
        self.node = node
        self.type = node["/type"].value
        try:
61
            self.entry = Entry(node["/entry"])  # type: Optional[Entry]
62 63 64
        except KeyError:
            self.entry = None

65

66
class RplintFail:
67 68 69
    def __init__(self, test: "RplintTest",
                 element: Optional[Element] = None,
                 etc: str = "") -> None:
70
        self.path = test.path
71
        self.element = element  # type: Optional[Element]
72 73
        self.line = get_line_number(self.path, element.node.char if element is not None else 0)
        self.etc = etc
74
        self.check = None  # type: Optional[Callable[[RplintTest], List[RplintFail]]]
75 76 77 78 79 80 81 82

    def __str__(self):
        if self.etc:
            return "{}:{} {}: {} ({})".format(os.path.basename(self.path), self.line,
                                              self.check.__name__, self.check.__doc__, self.etc)
        return "{}:{} {}: {}".format(os.path.basename(self.path), self.line, self.check.__name__,
                                     self.check.__doc__)

83

84
class RplintTest:
85
    def __init__(self, path: str) -> None:
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
        aug = pydnstest.augwrap.AugeasWrapper(confpath=os.path.realpath(path),
                                              lens='Deckard',
                                              loadpath=os.path.join(os.path.dirname(__file__),
                                                                    'pydnstest'))
        self.node = aug.tree
        self.name = os.path.basename(path)
        self.path = path

        _, self.config = pydnstest.scenario.parse_file(os.path.realpath(path))
        self.range_entries = [Entry(node) for node in self.node.match("/scenario/range/entry")]
        self.steps = [Step(node) for node in self.node.match("/scenario/step")]
        self.step_entries = [step.entry for step in self.steps if step.entry is not None]
        self.entries = self.range_entries + self.step_entries

        self.ranges = [pydnstest.scenario.Range(n) for n in self.node.match("/scenario/range")]

102
        self.fails = None  # type: Optional[List[RplintFail]]
103
        self.checks = [
Štěpán Balážik's avatar
Štěpán Balážik committed
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
            entry_more_than_one_rcode,
            entry_no_qname_qtype_copy_query,
            # Commented out for now until we implement selective turning off of checks
            # entry_ns_in_authority,
            range_overlapping_ips,
            range_shadowing_match_rules,
            step_check_answer_no_match,
            step_query_match,
            step_section_unchecked,
            step_unchecked_match,
            step_unchecked_rcode,
            scenario_ad_or_rrsig_no_ta,
            scenario_timestamp,
            config_trust_anchor_trailing_period_missing,
            step_duplicate_id,
        ]
120

121
    def run_checks(self) -> bool:
122
        """returns True iff all tests passed"""
123
        self.fails = []
124 125 126
        for check in self.checks:
            fails = check(self)
            for fail in fails:
127 128
                fail.check = check
            self.fails += fails
129

130
        if self.fails == []:
131 132 133
            return True
        return False

134 135 136
    def print_fails(self) -> None:
        if self.fails is None:
            raise RuntimeError("Maybe you should run some test first…")
137 138
        for fail in self.fails:
            print(fail)
139 140


141
def config_trust_anchor_trailing_period_missing(test: RplintTest) -> List[RplintFail]:
142 143 144 145
    """Trust-anchor option in configuration contains domain without trailing period"""
    for conf in test.config:
        if conf[0] == "trust-anchor":
            if conf[1].split()[0][-1] != ".":
146
                return [RplintFail(test, etc=conf[1])]
147 148 149
    return []


150
def scenario_timestamp(test: RplintTest) -> List[RplintFail]:
151 152 153 154 155
    """RRSSIG record present in test but no val-override-date or val-override-timestamp in config"""
    rrsigs = []
    for entry in test.entries:
        for record in entry.records:
            if record["/type"].value == "RRSIG":
156
                rrsigs.append(RplintFail(test, entry))
157 158 159 160 161 162 163
    if rrsigs:
        for k in test.config:
            if k[0] == "val-override-date" or k[0] == "val-override-timestamp":
                return []
    return rrsigs


164
def entry_no_qname_qtype_copy_query(test: RplintTest) -> List[RplintFail]:
165 166 167
    """ENTRY without qname and qtype in MATCH and without copy_query in ADJUST"""
    fails = []
    for entry in test.range_entries:
Štěpán Balážik's avatar
Štěpán Balážik committed
168 169
        if "question" not in entry.match and ("qname" not in entry.match or
                                              "qtype" not in entry.match):
170
            if "copy_query" not in entry.adjust:
171
                fails.append(RplintFail(test, entry))
172 173 174
    return fails


175
def entry_ns_in_authority(test: RplintTest) -> List[RplintFail]:
176 177 178 179 180 181
    """ENTRY has authority section with NS records, consider using MATCH subdomain"""
    fails = []
    for entry in test.range_entries:
        if entry.authority and "subdomain" not in entry.match:
            for record in entry.authority:
                if record["/type"].value == "NS":
182
                    fails.append(RplintFail(test, entry))
183 184 185
    return fails


186
def entry_more_than_one_rcode(test: RplintTest) -> List[RplintFail]:
187 188 189 190
    """ENTRY has more than one rcode in MATCH"""
    fails = []
    for entry in test.entries:
        if len(RCODES & entry.reply) > 1:
191
            fails.append(RplintFail(test, entry))
192 193 194
    return fails


195
def scenario_ad_or_rrsig_no_ta(test: RplintTest) -> List[RplintFail]:
196 197 198 199
    """AD or RRSIG present in test but no trust-anchor present in config"""
    dnssec = []
    for entry in test.entries:
        if "AD" in entry.reply or "AD" in entry.match:
200
            dnssec.append(RplintFail(test, entry))
201 202 203
        else:
            for record in entry.records:
                if record["/type"].value == "RRSIG":
204
                    dnssec.append(RplintFail(test, entry))
205 206 207 208 209 210 211 212

    if dnssec:
        for k in test.config:
            if k[0] == "trust-anchor":
                return []
    return dnssec


213
def step_query_match(test: RplintTest) -> List[RplintFail]:
214
    """STEP QUERY has a MATCH rule"""
215 216
    return [RplintFail(test, step) for step in test.steps if step.type == "QUERY" and
            step.entry and step.entry.match]
217 218


219
def step_check_answer_no_match(test: RplintTest) -> List[RplintFail]:
220
    """ENTRY in STEP CHECK_ANSWER has no MATCH rule"""
221
    return [RplintFail(test, step) for step in test.steps if step.type == "CHECK_ANSWER" and
222
            step.entry and not step.entry.match]
223 224


225
def step_unchecked_rcode(test: RplintTest) -> List[RplintFail]:
226 227 228
    """ENTRY specifies rcode but STEP MATCH does not check for it."""
    fails = []
    for step in test.steps:
229
        if step.type == "CHECK_ANSWER" and step.entry and "all" not in step.entry.match:
230
            if step.entry.reply & RCODES and "rcode" not in step.entry.match:
231
                fails.append(RplintFail(test, step.entry))
232 233 234
    return fails


235
def step_unchecked_match(test: RplintTest) -> List[RplintFail]:
236 237 238 239 240
    """ENTRY specifies flags but MATCH does not check for them"""
    fails = []
    for step in test.steps:
        if step.type == "CHECK_ANSWER":
            entry = step.entry
241 242
            if entry and "all" not in entry.match and entry.reply - RCODES and \
               "flags" not in entry.match:
243
                fails.append(RplintFail(test, entry, str(entry.reply - RCODES)))
244 245 246
    return fails


247
def step_section_unchecked(test: RplintTest) -> List[RplintFail]:
248 249 250
    """ENTRY has non-empty sections but MATCH does not check for all of them"""
    fails = []
    for step in test.steps:
251
        if step.type == "CHECK_ANSWER" and step.entry and "all" not in step.entry.match:
252 253 254
            for section in SECTIONS:
                if not is_empty(step.node.match("/entry/section/" + section + "/*")):
                    if section not in step.entry.match:
255
                        fails.append(RplintFail(test, step.entry, section))
256 257 258
    return fails


259
def range_overlapping_ips(test: RplintTest) -> List[RplintFail]:
260 261 262 263 264 265
    """RANGE has common IPs with some previous overlapping RANGE"""
    fails = []
    for r1, r2 in itertools.combinations(test.ranges, 2):
        # If the ranges overlap
        if min(r1.b, r2.b) >= max(r1.a, r2.a):
            if r1.addresses & r2.addresses:
266 267
                info = "previous range on line %d" % get_line_number(test.path, r1.node.char)
                fails.append(RplintFail(test, r2, info))
268 269 270
    return fails


271
def range_shadowing_match_rules(test: RplintTest) -> List[RplintFail]:
272
    """ENTRY has no effect since one of previous entries has the same or broader match rules"""
273 274 275
    fails = []
    for r in test.ranges:
        for e1, e2 in itertools.combinations(r.stored, 2):
276 277
            try:
                e1.match(e2.message)
278
            except ValueError:
279 280
                pass
            else:
281
                info = "previous entry on line %d" % get_line_number(test.path, e1.node.char)
282 283
                if e1.match_fields > e2.match_fields:
                    continue
284 285
                if "subdomain" not in e1.match_fields and "subdomain" in e2.match_fields:
                    continue
286
                fails.append(RplintFail(test, e2, info))
287 288 289
    return fails


290
def step_duplicate_id(test: RplintTest) -> List[RplintFail]:
291 292
    """STEP has the same ID as one of previous ones"""
    fails = []
293
    step_numbers = set()  # type: Set[int]
294 295
    for step in test.steps:
        if step.node.value in step_numbers:
296
            fails.append(RplintFail(test, step))
297 298
        else:
            step_numbers.add(step.node.value)
299 300 301 302 303 304 305 306
    return fails


# TODO: This will make sense after we fix how we handle defaults in deckard.aug and scenario.py
# We might just not use defaults altogether as testbound does
# if "copy_id" not in adjust:
#    entry_error(test, entry, "copy_id should be in ADJUST")

307
def test_run_rplint(rpl_path: str) -> None:
308
    t = RplintTest(rpl_path)
309 310
    passed = t.run_checks()
    if not passed:
311
        raise RplintError(t.fails)
312

313

314
def main():
315 316 317 318 319
    try:
        test_path = sys.argv[1]
    except IndexError:
        print("usage: %s <path to rpl file>" % sys.argv[0])
        sys.exit(2)
320 321 322 323
    if not os.path.isfile(test_path):
        print("rplint.py works on single file only.")
        print("Use rplint.sh with --scenarios=<directory with rpls> to run on rpls.")
        sys.exit(2)
324 325 326
    print("Linting %s" % test_path)
    t = RplintTest(test_path)
    passed = t.run_checks()
327
    t.print_fails()
328 329 330 331

    if passed:
        sys.exit(0)
    sys.exit(1)
332 333 334 335


if __name__ == '__main__':
    main()