Commit bbf10b35 authored by Štěpán Balážik's avatar Štěpán Balážik

tools: network and forwarder checks

Inspired by RFC8027 we implement simple checks that diagnose the sanity of network enviroment (or a forwarding resolver) for use of DNSSEC.
parent 9e31e7b6
Pipeline #46283 failed with stage
in 1 minute and 43 seconds
......@@ -37,13 +37,14 @@ def recv_n_bytes_from_tcp(stream: socket.socket, n: int, deadline: float) -> byt
return data
def recvfrom_blob(sock: socket.socket) -> Tuple[bytes, str]:
def recvfrom_blob(sock: socket.socket,
timeout: int = SOCKET_OPERATION_TIMEOUT) -> Tuple[bytes, str]:
"""
Receive DNS message from TCP/UDP socket.
"""
# deadline is always time.monotonic
deadline = time.monotonic() + SOCKET_OPERATION_TIMEOUT
deadline = time.monotonic() + timeout
while True:
try:
......@@ -67,8 +68,9 @@ def recvfrom_blob(sock: socket.socket) -> Tuple[bytes, str]:
raise
def recvfrom_msg(sock: socket.socket) -> Tuple[dns.message.Message, str]:
data, addr = recvfrom_blob(sock)
def recvfrom_msg(sock: socket.socket,
timeout: int = SOCKET_OPERATION_TIMEOUT) -> Tuple[dns.message.Message, str]:
data, addr = recvfrom_blob(sock, timeout=timeout)
msg = dns.message.from_wire(data, one_rr_per_rrset=True)
return msg, addr
......@@ -118,11 +120,12 @@ def send_query(sock: socket.socket, query: Union[dns.message.Message, bytes]) ->
raise
def get_answer(sock: socket.socket) -> bytes:
def get_answer(sock: socket.socket, timeout: int = SOCKET_OPERATION_TIMEOUT) -> bytes:
""" Compatibility function """
answer, _ = recvfrom_blob(sock)
answer, _ = recvfrom_blob(sock, timeout=timeout)
return answer
def get_dns_message(sock: socket.socket) -> dns.message.Message:
return dns.message.from_wire(get_answer(sock))
def get_dns_message(sock: socket.socket,
timeout: int = SOCKET_OPERATION_TIMEOUT) -> dns.message.Message:
return dns.message.from_wire(get_answer(sock, timeout=timeout))
"""Functions for sending DNS queries and checking recieved answers checking"""
# pylint: disable=C0301
# flake8: noqa
from ipaddress import IPv4Address, IPv6Address
import random
from typing import Iterable, Optional, Set, Union
import dns.message
import dns.flags
import pydnstest.matchpart
import pydnstest.mock_client
def unset_flag(message: dns.message.Message, flag: int) -> dns.message.Message:
"""Unsets given flag in given DNS message."""
assert flag in dns.flags._by_value, "This is not a DNS flag" # pylint: disable=W0212
message.flags &= ~flag
return message
def send_and_check(question: Union[dns.message.Message, bytes], # pylint: disable=R0913
expected: dns.message.Message,
server: Union[IPv4Address, IPv6Address],
match_fields: Set[str],
port: int = 53,
tcp: bool = False,
timeout: int = pydnstest.mock_client.SOCKET_OPERATION_TIMEOUT,
unset_flags: Iterable[int] = tuple()) -> bool:
"""Checks if DNS answer recieved for a question from a server matches expected one in specified
field. See pydnstest.matchpart for more information on match fields
Returns True on success, raises an exceptions on failure.
"""
print("Sending query:\n%s\n" % question)
answer = get_answer(question, server, port, tcp, timeout=timeout)
for flag in unset_flags:
answer = unset_flag(answer, flag)
print("Got answer:\n%s\n" % answer)
print("Matching:\n%s\n%s\n" % (match_fields, expected))
for field in match_fields:
pydnstest.matchpart.match_part(expected, answer, field)
return True
def get_answer(question: Union[dns.message.Message, bytes],
server: Union[IPv4Address, IPv6Address],
port: int = 53,
tcp: bool = False,
timeout: int = pydnstest.mock_client.SOCKET_OPERATION_TIMEOUT) -> dns.message.Message:
"""Get an DNS message with answer with specific query"""
sock = pydnstest.mock_client.setup_socket(str(server), port, tcp=tcp)
pydnstest.mock_client.send_query(sock, question)
return pydnstest.mock_client.get_dns_message(sock, timeout=timeout)
def string_answer(question: Union[dns.message.Message, bytes],
server: Union[IPv4Address, IPv6Address],
port: int = 53,
tcp: bool = False) -> str:
"""Prints answer of a server. Good for generating tests."""
return get_answer(question, server, port, tcp).to_text()
def randomize_case(label: bytes) -> bytes:
"""Randomize case in a DNS name label"""
output = []
for byte in label:
if random.randint(0, 1):
output.append(bytes([byte]).swapcase())
else:
output.append(bytes([byte]))
return b''.join(output)
def make_random_case_query(name: str, *args, **kwargs) -> dns.message.Message:
"""Proxy for dns.message.make_query with rANdoM-cASe"""
query = dns.message.make_query(name, *args, **kwargs)
for label in query.question[0].name.labels:
label = randomize_case(label)
return query
import ipaddress
# These are IPs of a.ns.nic.cz
AUTHORITATIVE_SERVERS = [ipaddress.IPv4Address("194.0.12.1"),
ipaddress.IPv6Address("2001:678:f::1")]
def pytest_addoption(parser):
parser.addoption("--forwarder", action="append", help="IP of forwarder to test")
def pytest_generate_tests(metafunc):
if 'forwarder' in metafunc.fixturenames:
forwarder = metafunc.config.option.forwarder
metafunc.parametrize("forwarder", [ipaddress.ip_address(f) for f in forwarder], ids=str)
if 'tcp' in metafunc.fixturenames:
metafunc.parametrize("tcp", [False, True], ids=lambda x: "TCP" if x else "UDP")
if 'server' in metafunc.fixturenames:
metafunc.parametrize("server", AUTHORITATIVE_SERVERS, ids=str)
"""Test suite to test forwarders
Invoke with `python3 -m pytest forwarder_check.py --forwarder [IP of forwarder]`"""
# pylint: disable=C0301,C0111,C0103
# flake8: noqa
import ipaddress
import dns.message
import pytest
import answer_checker
ALL = {"opcode", "qtype", "qname", "flags", "rcode", "answer", "authority", "additional"}
HEADER = {"opcode", "qtype", "qname", "flags", "rcode"}
VERSION_QUERY = dns.message.make_query("_version.test.knot-resolver.cz", "TXT")
VERSION_ANSWER = dns.message.from_text(""";
opcode QUERY
rcode NOERROR
flags QR RA RD
;QUESTION
_version.test.knot-resolver.cz. IN TXT
;ANSWER
_version.test.knot-resolver.cz. 3600 IN TXT "1"
;AUTHORITY
;ADDITIONAL
""")
def test_zone_version(forwarder):
return answer_checker.send_and_check(VERSION_QUERY,
VERSION_ANSWER,
forwarder,
ALL - {"additional", "authority"})
SIMPLE_QUERY = answer_checker.make_random_case_query("good-a.test.knot-resolver.cz", "A")
SIMPLE_ANSWER = dns.message.from_text(""";
opcode QUERY
rcode NOERROR
flags QR RD RA
;QUESTION
good-a.test.knot-resolver.cz. IN A
;ANSWER
good-a.test.knot-resolver.cz. 3600 IN A 217.31.192.130
;AUTHORITY
;ADDITIONAL""")
def test_supports_simple_answers(forwarder, tcp):
return answer_checker.send_and_check(SIMPLE_QUERY,
SIMPLE_ANSWER,
forwarder,
ALL - {"additional", "authority"},
tcp=tcp)
EDNS_QUERY = answer_checker.make_random_case_query("good-a.test.knot-resolver.cz", "A", use_edns=0)
def test_supports_EDNS0(forwarder, tcp):
answer = answer_checker.get_answer(EDNS_QUERY, forwarder, tcp=tcp)
if answer.edns != 0:
raise ValueError("EDNS0 not supported")
DO_QUERY = answer_checker.make_random_case_query("good-a.test.knot-resolver.cz", "A", want_dnssec=True)
def test_supports_DO(forwarder, tcp):
answer = answer_checker.get_answer(DO_QUERY, forwarder, tcp=tcp)
if not answer.flags & dns.flags.DO:
raise ValueError("DO bit sent, but not recieved")
CD_QUERY = answer_checker.make_random_case_query("good-a.test.knot-resolver.cz", "A", want_dnssec=True)
CD_QUERY.flags += dns.flags.CD
def test_supports_CD(forwarder, tcp):
answer = answer_checker.get_answer(CD_QUERY, forwarder, tcp=tcp)
if not answer.flags & dns.flags.CD:
raise ValueError("CD bit sent, but not recieved")
RRSIG_QUERY = answer_checker.make_random_case_query("good-a.test.knot-resolver.cz", "A", want_dnssec=True)
RRSIG_ANSWER = dns.message.from_text(""";
opcode QUERY
rcode NOERROR
flags QR RD RA AD
edns 0
eflags DO
payload 4096
;QUESTION
good-a.test.knot-resolver.cz. IN A
;ANSWER
good-a.test.knot-resolver.cz. 3600 IN A 217.31.192.130
good-a.test.knot-resolver.cz. 3600 IN RRSIG A 13 4 3600 20370119135450 20190205122450 58 test.knot-resolver.cz. n7BfrYwvRztj8khwefZxnVUSBm6vvIWH 3HGTfswPSUKqNrg6yqMIxm0dpLVPSIna hPnTnP3CP6G4SEfvAGk33w==
;AUTHORITY
;ADDITIONAL""")
def test_returns_RRSIG(forwarder, tcp):
return answer_checker.send_and_check(RRSIG_QUERY,
RRSIG_ANSWER,
forwarder,
HEADER | {"answerrrsigs"},
tcp=tcp)
DNSKEY_QUERY = answer_checker.make_random_case_query("test.knot-resolver.cz", "DNSKEY", want_dnssec=True)
DNSKEY_ANSWER = dns.message.from_text(""";
opcode QUERY
rcode NOERROR
flags QR RD RA AD
edns 0
eflags DO
payload 4096
;QUESTION
test.knot-resolver.cz. IN DNSKEY
;ANSWER
test.knot-resolver.cz. 3600 IN DNSKEY 256 3 13 b5ZQUzN5iD9ercgxPeeEh9qI8UzazMa6 vo8GCART4iQNzAcsB6xPYVopHKcjyssH MUiDoQgrjVd6hOLWQqnCtg==
test.knot-resolver.cz. 3600 IN DNSKEY 257 3 13 xrbuMAmJy3GlxUF46tJgP64cmExKWQBg iRGeLhfub9x3DV69D+2m1zom+CyqHsYY VDIjYOueGzj/8XFucg1bDw==
test.knot-resolver.cz. 3600 IN RRSIG DNSKEY 13 3 3600 20370119141532 20190205124532 60526 test.knot-resolver.cz. TCJGKcojvwe5cQYJaj+vMS5/lW2xLDVi cABjowFhQ3ttTIfjNINBK1sAJgybmdtd 5GcBlgXOPz+QWRFJUnRU2g==
;AUTHORITY
;ADDITIONAL""")
def test_supports_DNSKEY(forwarder, tcp):
return answer_checker.send_and_check(DNSKEY_QUERY,
DNSKEY_ANSWER,
forwarder,
ALL - {"additional", "authority"},
tcp=tcp)
DS_QUERY = answer_checker.make_random_case_query("test.knot-resolver.cz", "DS", want_dnssec=True)
DS_ANSWER = dns.message.from_text(""";
opcode QUERY
rcode NOERROR
flags QR RD RA AD
edns 0
eflags DO
payload 4096
;QUESTION
test.knot-resolver.cz. IN DS
;ANSWER
test.knot-resolver.cz. 1800 IN DS 0 8 2 0000000000000baff1ed10ca1beefc0111ded1cedeadadd011c0feecaca0b011
test.knot-resolver.cz. 1800 IN DS 60526 13 2 9E526A3D1D1D3F78BD11ABDCE8DE5A6CF9212CD2575D28FC10EBC046 F001AEA8
test.knot-resolver.cz. 1800 IN RRSIG DS 13 3 1800 20190227092958 20190213075958 23292 knot-resolver.cz. 9yBl60FpEgGt5R5JAKWWK1n1AGLSoeQDsX3nfLz/gQtljhKgnKgkM10T MZKIPUUY9jczh89ChoqCYFr+4MzURw==
;AUTHORITY
;ADDITIONAL""")
# DS signature with tag 0 is left dangling in the zone to trigger a bug in building of
# chain of trust in older versions of Unbound
def test_supports_DS(forwarder, tcp):
return answer_checker.send_and_check(DS_QUERY,
DS_ANSWER,
forwarder,
HEADER | {"answerrrsigs"},
tcp=tcp)
NSEC_NEGATIVE_QUERY = answer_checker.make_random_case_query("nonexistent.nsec.test.knot-resolver.cz", "A", want_dnssec=True)
NSEC_NEGATIVE_ANSWER = dns.message.from_text(""";
opcode QUERY
rcode NXDOMAIN
flags QR RD RA AD
edns 0
eflags DO
payload 4096
;QUESTION
nonexistent.nsec.test.knot-resolver.cz. IN A
;ANSWER
;AUTHORITY
nsec.test.knot-resolver.cz. 3600 IN SOA knot-s-01.nic.cz. hostmaster.nic.cz. 2018042476 10800 3600 1209600 7200
nsec.test.knot-resolver.cz. 7200 IN NSEC unsigned.nsec.test.knot-resolver.cz. A NS SOA RRSIG NSEC DNSKEY CDS CDNSKEY
nsec.test.knot-resolver.cz. 3600 IN RRSIG SOA 13 4 3600 20370126162631 20190212145631 25023 nsec.test.knot-resolver.cz. Nwpe3F7+fiCeGgyP+0WgyGYC5N8MY4Pc bipFKsHBxgkwkdEyV395VvYCbhz5YuJb SyXsv9tXOVN+XSb5Sac8uQ==
nsec.test.knot-resolver.cz. 7200 IN RRSIG NSEC 13 4 7200 20370126162631 20190212145631 25023 nsec.test.knot-resolver.cz. ugmndbqwWjM5Zc/ZCEt/FeGSuw70sasu jylUhFljwdalhRNNlLNcQY9Tlr8A8Vnc YJCwI36LrwAp9m/W2ysZxQ==
;ADDITIONAL""")
def test_negative_nsec_answers(forwarder, tcp):
return answer_checker.send_and_check(NSEC_NEGATIVE_QUERY,
NSEC_NEGATIVE_ANSWER,
forwarder,
HEADER | {"authority"}, tcp=tcp)
NSEC3_NEGATIVE_QUERY = answer_checker.make_random_case_query("nonexistent.nsec3.test.knot-resolver.cz", "A", want_dnssec=True)
NSEC3_NEGATIVE_ANSWER = dns.message.from_text(""";
opcode QUERY
rcode NXDOMAIN
flags QR RD RA AD
edns 0
eflags DO
payload 4096
;QUESTION
nonexistent.nsec3.test.knot-resolver.cz. IN A
;ANSWER
;AUTHORITY
nsec3.test.knot-resolver.cz. 3600 IN SOA knot-s-01.nic.cz. hostmaster.nic.cz. 2018042476 10800 3600 1209600 7200
mn71vn3kbnse5hkqqs7kc062nf9jna3u.nsec3.test.knot-resolver.cz. 7200 IN NSEC3 1 0 10 9b987e46196cd181 6j18444t948b3ij9dlakm317q132ccii A NS SOA RRSIG DNSKEY NSEC3PARAM CDS CDNSKEY
af4kdouqgq3k3j0boq2bqlf4hi14c8qa.nsec3.test.knot-resolver.cz. 7200 IN NSEC3 1 0 10 9b987e46196cd181 druje9e1goigmosgk4m6iv7gbktg143a CNAME RRSIG
nsec3.test.knot-resolver.cz. 3600 IN RRSIG SOA 13 4 3600 20370126162631 20190212145631 52462 nsec3.test.knot-resolver.cz. 9Ne2jUhyILPa5r0lAUdqkHtbkggSiRbt yqRaH3ENGlYcIIA3Rib6U2js+wEQpYVs SdQPcuzwAkYGmsqroSnDIw==
mn71vn3kbnse5hkqqs7kc062nf9jna3u.nsec3.test.knot-resolver.cz. 7200 IN RRSIG NSEC3 13 5 7200 20370126162631 20190212145631 52462 nsec3.test.knot-resolver.cz. r7DbpNp4KXvV2a4TDoV3whUPpI6mmjKA bk5TQZnA/z1AwFMtzJDQJ7b9RCv2C9Es CbwKEa+/bLNH4N2Ed8RVPQ==
af4kdouqgq3k3j0boq2bqlf4hi14c8qa.nsec3.test.knot-resolver.cz. 7200 IN RRSIG NSEC3 13 5 7200 20370119135450 20190205122450 52462 nsec3.test.knot-resolver.cz. NXEa3JxBpufEqBDEUNQhH2kQpPQbXYDX /b1soMKA4CwSaRVgiMkw41vevUZ/XtPj SFl0D6ov88QEDLG2RzYy9g==
;ADDITIONAL""")
def test_negative_nsec3_answers(forwarder, tcp):
return answer_checker.send_and_check(NSEC3_NEGATIVE_QUERY,
NSEC3_NEGATIVE_ANSWER,
forwarder,
HEADER | {"authority"}, tcp=tcp)
UNKNOWN_TYPE_QUERY = answer_checker.make_random_case_query("weird-type.test.knot-resolver.cz", "TYPE20025", want_dnssec=True)
UNKNOWN_TYPE_ANSWER = dns.message.from_text(r""";
opcode QUERY
rcode NOERROR
flags QR RD RA AD
edns 0
eflags DO
payload 512
;QUESTION
weird-type.test.knot-resolver.cz. IN TYPE20025
;ANSWER
weird-type.test.knot-resolver.cz. 3506 IN TYPE20025 \# 4 deadbeef
weird-type.test.knot-resolver.cz. 3506 IN RRSIG TYPE20025 13 4 3600 20370119135450 20190205122450 58 test.knot-resolver.cz. eHON73HpRyhIalC4xHwu/zWcZWuyVC3T fpBaOQU1MabzitXBUy4dKoAMVXhcpj62 Pqiz2FxMMg6nXRQJupQDAA==
;AUTHORITY
;ADDITIONAL
""")
def test_unknown_rrtype(forwarder, tcp):
return answer_checker.send_and_check(UNKNOWN_TYPE_QUERY,
UNKNOWN_TYPE_ANSWER,
forwarder,
ALL - {"additional", "authority"},
tcp=tcp)
NONEXISTENT_DS_DELEGATION_NSEC_QUERY = answer_checker.make_random_case_query("unsigned.nsec.test.knot-resolver.cz", "DS", want_dnssec=True)
NONEXISTENT_DS_DELEGATION_NSEC_ANSWER = dns.message.from_text(""";
opcode QUERY
rcode NOERROR
flags QR RD RA AD
edns 0
eflags DO
payload 4096
;QUESTION
unsigned.nsec.test.knot-resolver.cz. IN DS
;ANSWER
;AUTHORITY
nsec.test.knot-resolver.cz. 3600 IN SOA knot-s-01.nic.cz. hostmaster.nic.cz. 2018042476 10800 3600 1209600 7200
unsigned.nsec.test.knot-resolver.cz. 7200 IN NSEC *.wild.nsec.test.knot-resolver.cz. NS RRSIG NSEC
nsec.test.knot-resolver.cz. 3600 IN RRSIG SOA 13 4 3600 20370126162631 20190212145631 25023 nsec.test.knot-resolver.cz. Nwpe3F7+fiCeGgyP+0WgyGYC5N8MY4Pc bipFKsHBxgkwkdEyV395VvYCbhz5YuJb SyXsv9tXOVN+XSb5Sac8uQ==
unsigned.nsec.test.knot-resolver.cz. 7200 IN RRSIG NSEC 13 5 7200 20370119135450 20190205122450 25023 nsec.test.knot-resolver.cz. SWIzKCXTRQMz1n7myOioFrfbTljjR4jG NVRV43NWKtXQ6ftIR68wSVZ+6xsATHeG GXYYJxqaoviY+mLrJdJa/g==
;ADDITIONAL""")
def test_delegation_from_nsec_to_unsigned_zone(forwarder, tcp):
return answer_checker.send_and_check(NONEXISTENT_DS_DELEGATION_NSEC_QUERY,
NONEXISTENT_DS_DELEGATION_NSEC_ANSWER,
forwarder,
ALL, tcp=tcp)
NONEXISTENT_DS_DELEGATION_NSEC3_QUERY = answer_checker.make_random_case_query("unsigned.nsec3.test.knot-resolver.cz", "DS", want_dnssec=True)
NONEXISTENT_DS_DELEGATION_NSEC3_ANSWER = dns.message.from_text(""";
opcode QUERY
rcode NOERROR
flags QR RD RA AD
edns 0
eflags DO
payload 4096
;QUESTION
unsigned.nsec3.test.knot-resolver.cz. IN DS
;ANSWER
;AUTHORITY
nsec3.test.knot-resolver.cz. 3600 IN SOA knot-s-01.nic.cz. hostmaster.nic.cz. 2018042476 10800 3600 1209600 7200
gk65ucsupb4m139fn027ci6pl01fk5gs.nsec3.test.knot-resolver.cz. 7200 IN NSEC3 1 0 10 9b987e46196cd181 mn71vn3kbnse5hkqqs7kc062nf9jna3u NS
nsec3.test.knot-resolver.cz. 3600 IN RRSIG SOA 13 4 3600 20370126162631 20190212145631 52462 nsec3.test.knot-resolver.cz. 9Ne2jUhyILPa5r0lAUdqkHtbkggSiRbt yqRaH3ENGlYcIIA3Rib6U2js+wEQpYVs SdQPcuzwAkYGmsqroSnDIw==
gk65ucsupb4m139fn027ci6pl01fk5gs.nsec3.test.knot-resolver.cz. 7200 IN RRSIG NSEC3 13 5 7200 20370119135450 20190205122450 52462 nsec3.test.knot-resolver.cz. WjWrhgoRmw8+xMuzcGLqPx76xEvPTQjN OaJOEXzK7409Jc7tVHgpolbNxsDdI0u+ h6s5Du78yx4z0QOCq2VEzg==
;ADDITIONAL""")
def test_delegation_from_nsec3_to_unsigned_zone(forwarder, tcp):
return answer_checker.send_and_check(NONEXISTENT_DS_DELEGATION_NSEC3_QUERY,
NONEXISTENT_DS_DELEGATION_NSEC3_ANSWER,
forwarder,
ALL, tcp=tcp)
NONEXISTENT_DELEGATION_FROM_NSEC_QUERY = answer_checker.make_random_case_query("nonexistent.nsec.test.knot-resolver.cz", "DS", want_dnssec=True)
NONEXISTENT_DELEGATION_FROM_NSEC_ANSWER = dns.message.from_text(""";
opcode QUERY
rcode NXDOMAIN
flags QR RD RA AD
edns 0
eflags DO
payload 4096
;QUESTION
nonexistent.nsec.test.knot-resolver.cz. IN DS
;ANSWER
;AUTHORITY
nsec.test.knot-resolver.cz. 3600 IN SOA knot-s-01.nic.cz. hostmaster.nic.cz. 2018042476 10800 3600 1209600 7200
nsec.test.knot-resolver.cz. 7200 IN NSEC unsigned.nsec.test.knot-resolver.cz. A NS SOA RRSIG NSEC DNSKEY CDS CDNSKEY
nsec.test.knot-resolver.cz. 3600 IN RRSIG SOA 13 4 3600 20370126162631 20190212145631 25023 nsec.test.knot-resolver.cz. Nwpe3F7+fiCeGgyP+0WgyGYC5N8MY4Pc bipFKsHBxgkwkdEyV395VvYCbhz5YuJb SyXsv9tXOVN+XSb5Sac8uQ==
nsec.test.knot-resolver.cz. 7200 IN RRSIG NSEC 13 4 7200 20370126162631 20190212145631 25023 nsec.test.knot-resolver.cz. ugmndbqwWjM5Zc/ZCEt/FeGSuw70sasu jylUhFljwdalhRNNlLNcQY9Tlr8A8Vnc YJCwI36LrwAp9m/W2ysZxQ==
;ADDITIONAL""")
def test_nonexistent_delegation_from_nsec(forwarder, tcp):
return answer_checker.send_and_check(NONEXISTENT_DELEGATION_FROM_NSEC_QUERY,
NONEXISTENT_DELEGATION_FROM_NSEC_ANSWER,
forwarder,
ALL, tcp=tcp,
unset_flags=[dns.flags.AA])
# Some resolvers treat generated proof of non-existence as authoritative data
# and set AA flag in this kind of answer, we have to normalize this by unsetting
# it.
NONEXISTENT_DELEGATION_FROM_NSEC3_QUERY = answer_checker.make_random_case_query("nonexistent.nsec3.test.knot-resolver.cz", "DS", want_dnssec=True)
NONEXISTENT_DELEGATION_FROM_NSEC3_ANSWER = dns.message.from_text(""";
opcode QUERY
rcode NXDOMAIN
flags QR RD RA AD
edns 0
eflags DO
payload 4096
;QUESTION
nonexistent.nsec3.test.knot-resolver.cz. IN DS
;ANSWER
;AUTHORITY
nsec3.test.knot-resolver.cz. 3600 IN SOA knot-s-01.nic.cz. hostmaster.nic.cz. 2018042476 10800 3600 1209600 7200
mn71vn3kbnse5hkqqs7kc062nf9jna3u.nsec3.test.knot-resolver.cz. 7200 IN NSEC3 1 0 10 9b987e46196cd181 6j18444t948b3ij9dlakm317q132ccii A NS SOA RRSIG DNSKEY NSEC3PARAM CDS CDNSKEY
af4kdouqgq3k3j0boq2bqlf4hi14c8qa.nsec3.test.knot-resolver.cz. 7200 IN NSEC3 1 0 10 9b987e46196cd181 druje9e1goigmosgk4m6iv7gbktg143a CNAME RRSIG
nsec3.test.knot-resolver.cz. 3600 IN RRSIG SOA 13 4 3600 20370126162631 20190212145631 52462 nsec3.test.knot-resolver.cz. 9Ne2jUhyILPa5r0lAUdqkHtbkggSiRbt yqRaH3ENGlYcIIA3Rib6U2js+wEQpYVs SdQPcuzwAkYGmsqroSnDIw==
mn71vn3kbnse5hkqqs7kc062nf9jna3u.nsec3.test.knot-resolver.cz. 7200 IN RRSIG NSEC3 13 5 7200 20370126162631 20190212145631 52462 nsec3.test.knot-resolver.cz. r7DbpNp4KXvV2a4TDoV3whUPpI6mmjKA bk5TQZnA/z1AwFMtzJDQJ7b9RCv2C9Es CbwKEa+/bLNH4N2Ed8RVPQ==
af4kdouqgq3k3j0boq2bqlf4hi14c8qa.nsec3.test.knot-resolver.cz. 7200 IN RRSIG NSEC3 13 5 7200 20370119135450 20190205122450 52462 nsec3.test.knot-resolver.cz. NXEa3JxBpufEqBDEUNQhH2kQpPQbXYDX /b1soMKA4CwSaRVgiMkw41vevUZ/XtPj SFl0D6ov88QEDLG2RzYy9g==
;ADDITIONAL""")
def test_nonexistent_delegation_from_nsec3(forwarder, tcp):
return answer_checker.send_and_check(NONEXISTENT_DELEGATION_FROM_NSEC3_QUERY,
NONEXISTENT_DELEGATION_FROM_NSEC3_ANSWER,
forwarder,
ALL, tcp=tcp,
unset_flags=[dns.flags.AA])
NONEXISTENT_TYPE_NSEC3_QUERY = answer_checker.make_random_case_query("nsec3.test.knot-resolver.cz", "TYPE65281", want_dnssec=True)
NONEXISTENT_TYPE_NSEC3_ANSWER = dns.message.from_text(""";
opcode QUERY
rcode NOERROR
flags QR RD RA AD
edns 0
eflags DO
payload 4096
;QUESTION
nsec3.test.knot-resolver.cz. IN TYPE65281
;ANSWER
;AUTHORITY
nsec3.test.knot-resolver.cz. 3600 IN SOA knot-s-01.nic.cz. hostmaster.nic.cz. 2018042476 10800 3600 1209600 7200
mn71vn3kbnse5hkqqs7kc062nf9jna3u.nsec3.test.knot-resolver.cz. 7200 IN NSEC3 1 0 10 9b987e46196cd181 6j18444t948b3ij9dlakm317q132ccii A NS SOA RRSIG DNSKEY NSEC3PARAM CDS CDNSKEY
nsec3.test.knot-resolver.cz. 3600 IN RRSIG SOA 13 4 3600 20370126162631 20190212145631 52462 nsec3.test.knot-resolver.cz. 9Ne2jUhyILPa5r0lAUdqkHtbkggSiRbt yqRaH3ENGlYcIIA3Rib6U2js+wEQpYVs SdQPcuzwAkYGmsqroSnDIw==
mn71vn3kbnse5hkqqs7kc062nf9jna3u.nsec3.test.knot-resolver.cz. 7200 IN RRSIG NSEC3 13 5 7200 20370126162631 20190212145631 52462 nsec3.test.knot-resolver.cz. r7DbpNp4KXvV2a4TDoV3whUPpI6mmjKA bk5TQZnA/z1AwFMtzJDQJ7b9RCv2C9Es CbwKEa+/bLNH4N2Ed8RVPQ==
;ADDITIONAL""")
def test_nonexistent_type_nsec3(forwarder, tcp):
return answer_checker.send_and_check(NONEXISTENT_TYPE_NSEC3_QUERY,
NONEXISTENT_TYPE_NSEC3_ANSWER,
forwarder,
ALL, tcp=tcp)
NONEXISTENT_TYPE_NSEC_QUERY = answer_checker.make_random_case_query("nsec.test.knot-resolver.cz", "TYPE65281", want_dnssec=True)
NONEXISTENT_TYPE_NSEC_ANSWER = dns.message.from_text(""";
opcode QUERY
rcode NOERROR
flags QR RD RA AD
edns 0
eflags DO
payload 4096
;QUESTION
nsec.test.knot-resolver.cz. IN TYPE65281
;ANSWER
;AUTHORITY
nsec.test.knot-resolver.cz. 3600 IN SOA knot-s-01.nic.cz. hostmaster.nic.cz. 2018042476 10800 3600 1209600 7200
nsec.test.knot-resolver.cz. 7200 IN NSEC unsigned.nsec.test.knot-resolver.cz. A NS SOA RRSIG NSEC DNSKEY CDS CDNSKEY
nsec.test.knot-resolver.cz. 3600 IN RRSIG SOA 13 4 3600 20370126162631 20190212145631 25023 nsec.test.knot-resolver.cz. Nwpe3F7+fiCeGgyP+0WgyGYC5N8MY4Pc bipFKsHBxgkwkdEyV395VvYCbhz5YuJb SyXsv9tXOVN+XSb5Sac8uQ==
nsec.test.knot-resolver.cz. 7200 IN RRSIG NSEC 13 4 7200 20370126162631 20190212145631 25023 nsec.test.knot-resolver.cz. ugmndbqwWjM5Zc/ZCEt/FeGSuw70sasu jylUhFljwdalhRNNlLNcQY9Tlr8A8Vnc YJCwI36LrwAp9m/W2ysZxQ==
;ADDITIONAL""")
def test_nonexistent_type_nsec(forwarder, tcp):
return answer_checker.send_and_check(NONEXISTENT_TYPE_NSEC_QUERY,
NONEXISTENT_TYPE_NSEC_ANSWER,
forwarder,
ALL, tcp=tcp)
"""Simple answer generator using local forwarder"""
# pylint: disable=C0301,C0111,C0103
# flake8: noqa
import ipaddress
import answer_checker
d = {"SIMPLE_ANSWER" : answer_checker.make_random_case_query("good-a.test.knot-resolver.cz", "A"),
"EDNS_ANSWER" : answer_checker.make_random_case_query("good-a.test.knot-resolver.cz", "A", use_edns=0),
"DO_ANSWER" : answer_checker.make_random_case_query("good-a.test.knot-resolver.cz", "A", want_dnssec=True),
"CD_ANSWER" : answer_checker.make_random_case_query("good-a.test.knot-resolver.cz", "A", want_dnssec=True),
"RRSIG_ANSWER" : answer_checker.make_random_case_query("good-a.test.knot-resolver.cz", "A", want_dnssec=True),
"DNSKEY_ANSWER" : answer_checker.make_random_case_query("test.knot-resolver.cz", "DNSKEY", want_dnssec=True),
"DS_ANSWER" : answer_checker.make_random_case_query("cz", "DS", want_dnssec=True),
"NSEC_NEGATIVE_ANSWER" : answer_checker.make_random_case_query("nonexistent.nsec.test.knot-resolver.cz", "A", want_dnssec=True),
"NSEC3_NEGATIVE_ANSWER" : answer_checker.make_random_case_query("nonexistent.nsec3.test.knot-resolver.cz", "A", want_dnssec=True),
"UNKNOWN_TYPE_ANSWER" : answer_checker.make_random_case_query("weird-type.test.knot-resolver.cz", "TYPE20025"),
"NONEXISTENT_DS_DELEGATION_NSEC_ANSWER" : answer_checker.make_random_case_query("unsigned.nsec.test.knot-resolver.cz", "DS", want_dnssec=True),
"NONEXISTENT_DS_DELEGATION_NSEC3_ANSWER" : answer_checker.make_random_case_query("unsigned.nsec3.test.knot-resolver.cz", "DS", want_dnssec=True),
"NONEXISTENT_DELEGATION_FROM_NSEC_ANSWER" : answer_checker.make_random_case_query("nonexistent.nsec.test.knot-resolver.cz", "DS", want_dnssec=True),
"NONEXISTENT_DELEGATION_FROM_NSEC3_ANSWER" : answer_checker.make_random_case_query("nonexistent.nsec3.test.knot-resolver.cz", "DS", want_dnssec=True),
"NONEXISTENT_TYPE_NSEC3_ANSWER" : answer_checker.make_random_case_query("nsec3.test.knot-resolver.cz", "TYPE65281", want_dnssec=True),
"NONEXISTENT_TYPE_NSEC_ANSWER" : answer_checker.make_random_case_query("nsec.test.knot-resolver.cz", "TYPE65281", want_dnssec=True)}
for k, v in d.items():
print('%s = dns.message.from_text("""%s""")\n' % (k, answer_checker.string_answer(v, ipaddress.IPv4Address("127.0.0.1"))))
"""Test suite to determine conditions of current network in regards to DNS(SEC) traffic.
Invoke with `python3 -m pytest network_check.py`. """
# pylint: disable=C0301,C0111
# flake8: noqa
import ipaddress
import socket
import pytest
import dns.message
import answer_checker
ALL = {"opcode", "qtype", "qname", "flags", "rcode", "answer", "authority", "additional"}
VERSION_QUERY = dns.message.make_query("_version.test.knot-resolver.cz", "TXT")
# dnspython's `makequery` function sets RD bit in the messsages.
# This is undesirable for query to authoritative servers since they
# may or may not copy RD flag to the response.
answer_checker.unset_flag(VERSION_QUERY, dns.flags.RD)
VERSION_ANSWER = dns.message.from_text(""";
opcode QUERY
rcode NOERROR
flags QR AA
;QUESTION
_version.test.knot-resolver.cz. IN TXT
;ANSWER
_version.test.knot-resolver.cz. 3600 IN TXT "1"
;AUTHORITY
;ADDITIONAL
""")
def test_zone_version(server):
return answer_checker.send_and_check(VERSION_QUERY,
VERSION_ANSWER,
server,
ALL - {"authority"},
unset_flags=[dns.flags.AD])
# Since AD bit may or may not be set by authoritative server
# (see https://tools.ietf.org/html/rfc4035#section-3.1.6) we normalise the answers
# by unsetting the AD bit.
QUERY = answer_checker.make_random_case_query("test.knot-resolver.cz", "A", want_dnssec=True, payload=4096)
answer_checker.unset_flag(QUERY, dns.flags.RD)
ANSWER = dns.message.from_text(""";
opcode QUERY
rcode NOERROR
flags QR AA
edns 0
eflags DO
payload 4096
;QUESTION
test.knot-resolver.cz. IN A
;ANSWER
test.knot-resolver.cz. 3600 IN A 217.31.192.130
test.knot-resolver.cz. 3600 IN RRSIG A 13 3 3600 20370119135450 20190205122450 58 test.knot-resolver.cz. G9DTWRE8QKe0MKyHn+PZcgf+ggIR9Sk+ E9qtd8IlpEt3+y28qPp0lgDQojpQL9sv lqgC0g5e2ZIsZWg1T5ICNQ==
;AUTHORITY
;ADDITIONAL
""")
def test_remote_udp_53(server):
return answer_checker.send_and_check(QUERY,
ANSWER,
server,
ALL - {"authority"},
unset_flags=[dns.flags.AD])
def test_remote_tcp_53(server):
return answer_checker.send_and_check(QUERY,
ANSWER,
server,
ALL - {"authority"},
tcp=True,
unset_flags=[dns.flags.AD])
@pytest.mark.parametrize("non_existent_server", [ipaddress.ip_address("192.0.2.1"), ipaddress.ip_address("2001:db::1")])
def test_nonexistent_addres(non_existent_server):
try:
answer_checker.get_answer(QUERY, non_existent_server, timeout=1)
except socket.timeout:
return True
return False
LONG_QUERY = answer_checker.make_random_case_query("test.knot-resolver.cz", "TXT", use_edns=0, payload=4096, want_dnssec=True)
answer_checker.unset_flag(LONG_QUERY, dns.flags.RD)
LONG_ANSWER = dns.message.from_text(""";
opcode QUERY
rcode NOERROR
flags QR AA
edns 0
payload 4096
;QUESTION
test.knot-resolver.cz. IN TXT
;ANSWER
test.knot-resolver.cz. 3600 IN TXT "Davku ve me o pln uvitani stari s tvuj neda? Tik kufr u traslo uf tabuli znaky mesity bimbal vyrvat vydelal pobezi zahajil a tajnosti. By 77 dusic ach prazdna kasari k zac platne potiz. O hon 30 otazek jiz post i rad zeleninu vyhrknu bhutanu nezdalo. I tr" "ida, ptat lzes vypadla newton, utal hm je bas samymi u sobe ukaz kazne medove u placeny ke ah jo zpola o ocich. Sul trimesicni kontrolujte v predstirana po nej mit za devetadevadesati eh mi lezu slava vuz v me smery. Tri akt dlazbu dal lamu, kavkam on zas" "luhy, sad muzikant vek. Paty neme radili trunil docist tech obou zari. My ze 11 tlusti jsemvidel. Podivej i prs kralik at ted o vynahradit ti si ma charakterni nehybny tulak poskytl rad! Muz ztuhla, ci ah propatral misce! Slz eh at? Zenou dilo intuici. Le" "su pud povesti, i jamou tej. V az vdanou zrejmo za ctil 81 kolika u ustrnule malicherny holemi nekradla jinych morfia: pocasi poplaseny zpovidat az dne vyjimecna zidovskem stejny sluzek tajemny hlidat u ruzovym pry jestli vyslychalo zem nerozbrecte farare" " strhla v mem tabule pije a odkraglujem otisky nebot. Ex povidani pusta duse eh zvalel, o pak ma bryle luzka: u posluhovi neudela 30 ze ctverce brovninku. 411 se vi rypaje nova to per ba zchoulostivil remenem. Vaze to lujzou styky, te? Ne me by pazeni tro" "ubil i srovnala dejoveho a prvnich me o zime hlasy nevsimnou. Jejim zajiste za porotam valka sekt. Oni vuli co ryb pruvod. Ode jehoz od lasce ve slouzilo co jektal hryzal lamparny. Zvlastnich ne vybil brejlil uz ah? Husa trit mu straze s zivaje abys chute" " pane ci nepochybujte ubiha k babe ach okoli zle okna deji dverim! Vymyslim do falesne. Pokaceneho leti oka krk. Nohy stejny u vykaslu, rinkotem ondyno, laureat z zabije."
test.knot-resolver.cz. 3600 IN RRSIG TXT 13 3 3600 20370119135450 20190205122450 58 test.knot-resolver.cz. YYzbiOgNyIe2YcUHUbA8LNrqUYPSHEUA U7tAOLJx54kSlTMYDB5VrnqsAIgp2PtV C1gELBVK4Xtwxrx3ajeLhA==
;AUTHORITY
;ADDITIONAL
""")
@pytest.mark.skip(reason="This doesn't work since a.ns.nic.cz (and all CZ.NIC's nameservers in that "
"matter) have internal EDNS buffer size of 1232 bytes. The answer is "
"unfortunately bigger. :(")
def test_udp_fragmentation(server):
return answer_checker.send_and_check(LONG_QUERY,
LONG_ANSWER,
server,
ALL - {"authority"},
unset_flags=[dns.flags.AD])
QUERY_WITH_SMALL_PAYLOAD = answer_checker.make_random_case_query("test.knot-resolver.cz", "TXT", use_edns=0, payload=1280, want_dnssec=True)
answer_checker.unset_flag(QUERY_WITH_SMALL_PAYLOAD, dns.flags.RD)
TRUNCATED_ANSWER = dns.message.from_text(""";
opcode QUERY
rcode NOERROR
flags QR AA TC
edns 0
payload 4096
;QUESTION
test.knot-resolver.cz. IN TXT
;ANSWER
;AUTHORITY
;ADDITIONAL
""")
def test_udp_fragmentation_truncated(server):
return answer_checker.send_and_check(QUERY_WITH_SMALL_PAYLOAD,
TRUNCATED_ANSWER,
server,
ALL - {"authority"},
unset_flags=[dns.flags.AD])
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment