Commit bce4e7b5 authored by Štěpán Balážik's avatar Štěpán Balážik Committed by Petr Špaček

rplint: refactor rplint.py for parallel use

run `./rplint.sh -n <number of processors> --scenarios=<path to rpls>`
parent 9d6cdec8
......@@ -89,6 +89,16 @@ def scenarios(paths, configs):
return scenario_list
def rpls(paths):
for path in paths:
if os.path.isfile(path):
filelist = [path] # path to single file, accept it
else:
filelist = sorted(glob.glob(os.path.join(path, "*.rpl")))
return filelist
def pytest_addoption(parser):
parser.addoption("--config", action="append", help="path to Deckard configuration .yaml file")
parser.addoption("--scenarios", action="append", help="directory with .rpl files")
......@@ -108,3 +118,6 @@ def pytest_generate_tests(metafunc):
paths = metafunc.config.option.scenarios
metafunc.parametrize("scenario", scenarios(paths, configs), ids=str)
if 'rpl' in metafunc.fixturenames:
paths = metafunc.config.option.scenarios
metafunc.parametrize("rpl", rpls(paths), ids=str)
......@@ -19,6 +19,10 @@ FLAGS = {"QR", "AA", "TC", "RD", "RA", "AD", "CD"}
SECTIONS = {"question", "answer", "authority", "additional"}
class RplintError(ValueError):
pass
def get_line_number(file, char_number):
pos = 0
for number, line in enumerate(open(file)):
......@@ -55,7 +59,7 @@ class Step:
self.entry = None
class Test:
class RplintTest:
def __init__(self, path):
aug = pydnstest.augwrap.AugeasWrapper(confpath=os.path.realpath(path),
lens='Deckard',
......@@ -73,26 +77,33 @@ class Test:
self.ranges = [pydnstest.scenario.Range(n) for n in self.node.match("/scenario/range")]
self.results = None
self.checks = [entry_more_than_one_rcode, entry_no_qname_qtype_copy_query,
entry_ns_in_authority, range_overlapping_ips, range_shadowing_match_rules,
step_check_answer_no_match, step_query_match, step_section_unchecked,
step_unchecked_match, step_unchecked_rcode, test_ad_or_rrsig_no_ta,
test_timestamp, test_trust_anchor_trailing_period_missing,
step_unchecked_match, step_unchecked_rcode, scenario_ad_or_rrsig_no_ta,
scenario_timestamp, config_trust_anchor_trailing_period_missing,
step_duplicate_id]
def print_results(self):
def run_checks(self):
"""returns True iff all tests passed"""
self.results = ""
failed = False
for check in self.checks:
fails = check(self)
if fails and not failed:
print(self.path)
failed = True
for fail in fails:
pos = get_line_number(self.path, fail)
print("\t line " + str(pos), check.__doc__)
self.results += " ".join(["line", str(pos), check.__name__, check.__doc__, "\n"])
if self.results == "":
return True
return False
def print_results(self):
print(self.results)
def test_trust_anchor_trailing_period_missing(test):
def config_trust_anchor_trailing_period_missing(test):
"""Trust-anchor option in configuration contains domain without trailing period"""
for conf in test.config:
if conf[0] == "trust-anchor":
......@@ -101,7 +112,7 @@ def test_trust_anchor_trailing_period_missing(test):
return []
def test_timestamp(test):
def scenario_timestamp(test):
"""RRSSIG record present in test but no val-override-date or val-override-timestamp in config"""
rrsigs = []
for entry in test.entries:
......@@ -145,7 +156,7 @@ def entry_more_than_one_rcode(test):
return fails
def test_ad_or_rrsig_no_ta(test):
def scenario_ad_or_rrsig_no_ta(test):
"""AD or RRSIG present in test but no trust-anchor present in config"""
dnssec = []
for entry in test.entries:
......@@ -247,9 +258,12 @@ def range_shadowing_match_rules(test):
def step_duplicate_id(test):
"""STEP has the same ID as one of previous ones"""
fails = []
for step1, step2 in itertools.combinations(test.steps, 2):
if step1.node.value == step2.node.value:
fails.append(step2.node.char)
step_numbers = set()
for step in test.steps:
if step.node.value in step_numbers:
fails.append(step.node.char)
else:
step_numbers.add(step.node.value)
return fails
......@@ -258,12 +272,23 @@ def step_duplicate_id(test):
# if "copy_id" not in adjust:
# entry_error(test, entry, "copy_id should be in ADJUST")
def test_run_rplint(rpl):
t = RplintTest(rpl)
passed = t.run_checks()
if not passed:
raise RplintError(t.results)
if __name__ == '__main__':
tests_path = sys.argv[1]
if tests_path.endswith(".rpl"):
t = Test(tests_path)
t.print_results()
else:
for file_path in sorted(glob.glob(os.path.join(tests_path, "*.rpl"))):
t = Test(file_path)
try:
test_path = sys.argv[1]
except IndexError:
print("usage: %s <path to rpl file>" % sys.argv[0])
sys.exit(2)
print("Linting %s" % test_path)
t = RplintTest(test_path)
passed = t.run_checks()
t.print_results()
if passed:
sys.exit(0)
sys.exit(1)
#!/bin/bash
set -x
set -o errexit -o nounset
MAKEDIR="$(dirname "$0")"
python3 -m pytest -c "${MAKEDIR}/rplint_pytest.ini" ${TESTS:+"--scenarios=${TESTS}"} "$@"
[pytest]
log_print = False
python_files=rplint.py
norecursedirs=*
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment