Commit c56efb1b authored by Petr Špaček's avatar Petr Špaček

WIP: fixup

parent 2adcdd26
Pipeline #48039 passed with stage
in 1 minute and 2 seconds
#!/usr/bin/python3
import collections
import ipaddress
import logging
import pickle
import re
import sys
from typing import Counter, Dict, List, Tuple
from ednsevalzone import EDNSResult, AnIPAddress
# hichina.com. @2400:3200:2000:59::1 (ns2.hichina.com.): tcp=failed do=timeout signed=timeout ednstcp=failed
# hichina.com. @106.11.211.54 (ns2.hichina.com.): tcp=connection-refused do=ok signed=ok ednstcp=connection-refused
# nic.cz. @193.29.206.1 (d.ns.nic.cz.): tcp=ok do=ok signed=ok,yes ednstcp=ok
# test. @127.0.0.10 (test.): tcp=nosoa,noaa do=nosoa,noaa signed=nosoa,noaa ednstcp=timeout
def parse_nsip_line(line: str) -> Tuple[AnIPAddress, Dict[str, List[str]]]:
"""parse one line from genreport log"""
matches = re.match('^[^ ]*\\. @(?P<ip>[^ ]+) \\([^)]+\\): (?P<results>tcp=.*)$', line)
if not matches:
raise ValueError('line "{}" does not have expected format, skipping'.format(line))
tests_list = matches.group('results').split()
try:
tests_results = {test.split('=')[0]:
test.split('=')[1].split(',')
for test in tests_list}
except IndexError:
raise ValueError('skipping nonsense test results "{}"'.format(line))
tests_results.pop('signed', None) # ignore DNSSEC things
return ipaddress.ip_address(matches.group("ip")), tests_results
def eval_tcp_strict(tcp_results: Dict[str, List[str]]) -> EDNSResult:
"""
Evaluate impact of TCP failures on clients with small EDNS buffer size.
Failure will prevent clients with small buffer size from retrieving data.
"""
return EDNSResult.dead
def eval_tcp_permissive(tcp_results: Dict[str, List[str]]) -> EDNSResult:
"""
Evaluate impact of TCP failures on clients with big EDNS buffer size.
Timeouts force retries elsewhere and add high latency.
"""
if 'ok' in tcp_results['do']:
return EDNSResult.high_latency # EDNS over UDP works but TCP has issues
else:
return EDNSResult.dead
def eval_tcp(tests_results: Dict[str, List[str]], timeout_evaluator) -> EDNSResult:
"""
Combine individual tests into overall result for a single IP address.
"""
if all('ok' in results for results in tests_results.values()):
return EDNSResult.ok
# it is not 100% compliant but answers -> it is kind of "compatible" but does not get highest grade
if (all('timeout' not in results for results in tests_results.values())
and all('failed' not in results for results in tests_results.values())
and all('connection-refused' not in results for results in tests_results.values())):
return EDNSResult.compatible
else: # impact of not working TCP is different before and after the flag day
return timeout_evaluator(tests_results)
def collect_server_stats(eval_tcp_func, edns_infns: str) -> Dict[AnIPAddress, Counter[EDNSResult]]:
"""
Combine results from all files with genreport output and summarize stats
"""
server_stats = {} # type: Dict[AnIPAddress, Counter[EDNSResult]]
i = 1
for infilename in edns_infns:
logging.info('processed file no. {}, file name "{}"'.format(i, infilename))
with open(infilename) as infile:
for line in infile:
line = line.strip()
try:
ip, edns_results = parse_nsip_line(line)
combined_result = eval_tcp(edns_results, eval_tcp_func)
server = server_stats.setdefault(ip, collections.Counter())
server[combined_result] += 1
except ValueError as ex:
logging.warning('%s', ex)
#raise
i += 1
return server_stats
def save(nsstats, criteria: str) -> None:
"""
param criteria: name of criteria - strict / permissive
"""
filename = 'tcpstats_{}.pickle'.format(criteria)
logging.info('saving TCP results into {}'.format(filename))
pickle.dump(nsstats, open(filename, 'wb'))
def main(infiles):
"""
infiles - names of files with output from ISC genreport
"""
logging.info('processing input in EDNS strict mode')
nsstats_strict = collect_server_stats(eval_tcp_strict, infiles)
save(nsstats_strict, 'strict')
logging.info('processing input in EDNS permissive mode')
nsstats_permissive = collect_server_stats(eval_tcp_permissive, infiles)
save(nsstats_permissive, 'permissive')
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(message)s')
if len(sys.argv) < 2:
print('Usage: {} tcpcomp_output1 [tcpcomp_output2] ...'.format(sys.argv[0]))
sys.exit(1)
main(sys.argv[1:])
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment