Commit e0520387 authored by Petr Špaček's avatar Petr Špaček

Merge branch 'answer_checker.py' into 'master'

tools: network and forwarder checks

See merge request !152
parents 9e31e7b6 bbf10b35
Pipeline #46284 failed with stage
in 2 minutes and 22 seconds
...@@ -37,13 +37,14 @@ def recv_n_bytes_from_tcp(stream: socket.socket, n: int, deadline: float) -> byt ...@@ -37,13 +37,14 @@ def recv_n_bytes_from_tcp(stream: socket.socket, n: int, deadline: float) -> byt
return data return data
def recvfrom_blob(sock: socket.socket) -> Tuple[bytes, str]: def recvfrom_blob(sock: socket.socket,
timeout: int = SOCKET_OPERATION_TIMEOUT) -> Tuple[bytes, str]:
""" """
Receive DNS message from TCP/UDP socket. Receive DNS message from TCP/UDP socket.
""" """
# deadline is always time.monotonic # deadline is always time.monotonic
deadline = time.monotonic() + SOCKET_OPERATION_TIMEOUT deadline = time.monotonic() + timeout
while True: while True:
try: try:
...@@ -67,8 +68,9 @@ def recvfrom_blob(sock: socket.socket) -> Tuple[bytes, str]: ...@@ -67,8 +68,9 @@ def recvfrom_blob(sock: socket.socket) -> Tuple[bytes, str]:
raise raise
def recvfrom_msg(sock: socket.socket) -> Tuple[dns.message.Message, str]: def recvfrom_msg(sock: socket.socket,
data, addr = recvfrom_blob(sock) timeout: int = SOCKET_OPERATION_TIMEOUT) -> Tuple[dns.message.Message, str]:
data, addr = recvfrom_blob(sock, timeout=timeout)
msg = dns.message.from_wire(data, one_rr_per_rrset=True) msg = dns.message.from_wire(data, one_rr_per_rrset=True)
return msg, addr return msg, addr
...@@ -118,11 +120,12 @@ def send_query(sock: socket.socket, query: Union[dns.message.Message, bytes]) -> ...@@ -118,11 +120,12 @@ def send_query(sock: socket.socket, query: Union[dns.message.Message, bytes]) ->
raise raise
def get_answer(sock: socket.socket) -> bytes: def get_answer(sock: socket.socket, timeout: int = SOCKET_OPERATION_TIMEOUT) -> bytes:
""" Compatibility function """ """ Compatibility function """
answer, _ = recvfrom_blob(sock) answer, _ = recvfrom_blob(sock, timeout=timeout)
return answer return answer
def get_dns_message(sock: socket.socket) -> dns.message.Message: def get_dns_message(sock: socket.socket,
return dns.message.from_wire(get_answer(sock)) timeout: int = SOCKET_OPERATION_TIMEOUT) -> dns.message.Message:
return dns.message.from_wire(get_answer(sock, timeout=timeout))
"""Functions for sending DNS queries and checking recieved answers checking"""
# pylint: disable=C0301
# flake8: noqa
from ipaddress import IPv4Address, IPv6Address
import random
from typing import Iterable, Optional, Set, Union
import dns.message
import dns.flags
import pydnstest.matchpart
import pydnstest.mock_client
def unset_flag(message: dns.message.Message, flag: int) -> dns.message.Message:
"""Unsets given flag in given DNS message."""
assert flag in dns.flags._by_value, "This is not a DNS flag" # pylint: disable=W0212
message.flags &= ~flag
return message
def send_and_check(question: Union[dns.message.Message, bytes], # pylint: disable=R0913
expected: dns.message.Message,
server: Union[IPv4Address, IPv6Address],
match_fields: Set[str],
port: int = 53,
tcp: bool = False,
timeout: int = pydnstest.mock_client.SOCKET_OPERATION_TIMEOUT,
unset_flags: Iterable[int] = tuple()) -> bool:
"""Checks if DNS answer recieved for a question from a server matches expected one in specified
field. See pydnstest.matchpart for more information on match fields
Returns True on success, raises an exceptions on failure.
"""
print("Sending query:\n%s\n" % question)
answer = get_answer(question, server, port, tcp, timeout=timeout)
for flag in unset_flags:
answer = unset_flag(answer, flag)
print("Got answer:\n%s\n" % answer)
print("Matching:\n%s\n%s\n" % (match_fields, expected))
for field in match_fields:
pydnstest.matchpart.match_part(expected, answer, field)
return True
def get_answer(question: Union[dns.message.Message, bytes],
server: Union[IPv4Address, IPv6Address],
port: int = 53,
tcp: bool = False,
timeout: int = pydnstest.mock_client.SOCKET_OPERATION_TIMEOUT) -> dns.message.Message:
"""Get an DNS message with answer with specific query"""
sock = pydnstest.mock_client.setup_socket(str(server), port, tcp=tcp)
pydnstest.mock_client.send_query(sock, question)
return pydnstest.mock_client.get_dns_message(sock, timeout=timeout)
def string_answer(question: Union[dns.message.Message, bytes],
server: Union[IPv4Address, IPv6Address],
port: int = 53,
tcp: bool = False) -> str:
"""Prints answer of a server. Good for generating tests."""
return get_answer(question, server, port, tcp).to_text()
def randomize_case(label: bytes) -> bytes:
"""Randomize case in a DNS name label"""
output = []
for byte in label:
if random.randint(0, 1):
output.append(bytes([byte]).swapcase())
else:
output.append(bytes([byte]))
return b''.join(output)
def make_random_case_query(name: str, *args, **kwargs) -> dns.message.Message:
"""Proxy for dns.message.make_query with rANdoM-cASe"""
query = dns.message.make_query(name, *args, **kwargs)
for label in query.question[0].name.labels:
label = randomize_case(label)
return query
import ipaddress
# These are IPs of a.ns.nic.cz
AUTHORITATIVE_SERVERS = [ipaddress.IPv4Address("194.0.12.1"),
ipaddress.IPv6Address("2001:678:f::1")]
def pytest_addoption(parser):
parser.addoption("--forwarder", action="append", help="IP of forwarder to test")
def pytest_generate_tests(metafunc):
if 'forwarder' in metafunc.fixturenames:
forwarder = metafunc.config.option.forwarder
metafunc.parametrize("forwarder", [ipaddress.ip_address(f) for f in forwarder], ids=str)
if 'tcp' in metafunc.fixturenames:
metafunc.parametrize("tcp", [False, True], ids=lambda x: "TCP" if x else "UDP")
if 'server' in metafunc.fixturenames:
metafunc.parametrize("server", AUTHORITATIVE_SERVERS, ids=str)
This diff is collapsed.
"""Simple answer generator using local forwarder"""
# pylint: disable=C0301,C0111,C0103
# flake8: noqa
import ipaddress
import answer_checker
d = {"SIMPLE_ANSWER" : answer_checker.make_random_case_query("good-a.test.knot-resolver.cz", "A"),
"EDNS_ANSWER" : answer_checker.make_random_case_query("good-a.test.knot-resolver.cz", "A", use_edns=0),
"DO_ANSWER" : answer_checker.make_random_case_query("good-a.test.knot-resolver.cz", "A", want_dnssec=True),
"CD_ANSWER" : answer_checker.make_random_case_query("good-a.test.knot-resolver.cz", "A", want_dnssec=True),
"RRSIG_ANSWER" : answer_checker.make_random_case_query("good-a.test.knot-resolver.cz", "A", want_dnssec=True),
"DNSKEY_ANSWER" : answer_checker.make_random_case_query("test.knot-resolver.cz", "DNSKEY", want_dnssec=True),
"DS_ANSWER" : answer_checker.make_random_case_query("cz", "DS", want_dnssec=True),
"NSEC_NEGATIVE_ANSWER" : answer_checker.make_random_case_query("nonexistent.nsec.test.knot-resolver.cz", "A", want_dnssec=True),
"NSEC3_NEGATIVE_ANSWER" : answer_checker.make_random_case_query("nonexistent.nsec3.test.knot-resolver.cz", "A", want_dnssec=True),
"UNKNOWN_TYPE_ANSWER" : answer_checker.make_random_case_query("weird-type.test.knot-resolver.cz", "TYPE20025"),
"NONEXISTENT_DS_DELEGATION_NSEC_ANSWER" : answer_checker.make_random_case_query("unsigned.nsec.test.knot-resolver.cz", "DS", want_dnssec=True),
"NONEXISTENT_DS_DELEGATION_NSEC3_ANSWER" : answer_checker.make_random_case_query("unsigned.nsec3.test.knot-resolver.cz", "DS", want_dnssec=True),
"NONEXISTENT_DELEGATION_FROM_NSEC_ANSWER" : answer_checker.make_random_case_query("nonexistent.nsec.test.knot-resolver.cz", "DS", want_dnssec=True),
"NONEXISTENT_DELEGATION_FROM_NSEC3_ANSWER" : answer_checker.make_random_case_query("nonexistent.nsec3.test.knot-resolver.cz", "DS", want_dnssec=True),
"NONEXISTENT_TYPE_NSEC3_ANSWER" : answer_checker.make_random_case_query("nsec3.test.knot-resolver.cz", "TYPE65281", want_dnssec=True),
"NONEXISTENT_TYPE_NSEC_ANSWER" : answer_checker.make_random_case_query("nsec.test.knot-resolver.cz", "TYPE65281", want_dnssec=True)}
for k, v in d.items():
print('%s = dns.message.from_text("""%s""")\n' % (k, answer_checker.string_answer(v, ipaddress.IPv4Address("127.0.0.1"))))
"""Test suite to determine conditions of current network in regards to DNS(SEC) traffic.
Invoke with `python3 -m pytest network_check.py`. """
# pylint: disable=C0301,C0111
# flake8: noqa
import ipaddress
import socket
import pytest
import dns.message
import answer_checker
ALL = {"opcode", "qtype", "qname", "flags", "rcode", "answer", "authority", "additional"}
VERSION_QUERY = dns.message.make_query("_version.test.knot-resolver.cz", "TXT")
# dnspython's `makequery` function sets RD bit in the messsages.
# This is undesirable for query to authoritative servers since they
# may or may not copy RD flag to the response.
answer_checker.unset_flag(VERSION_QUERY, dns.flags.RD)
VERSION_ANSWER = dns.message.from_text(""";
opcode QUERY
rcode NOERROR
flags QR AA
;QUESTION
_version.test.knot-resolver.cz. IN TXT
;ANSWER
_version.test.knot-resolver.cz. 3600 IN TXT "1"
;AUTHORITY
;ADDITIONAL
""")
def test_zone_version(server):
return answer_checker.send_and_check(VERSION_QUERY,
VERSION_ANSWER,
server,
ALL - {"authority"},
unset_flags=[dns.flags.AD])
# Since AD bit may or may not be set by authoritative server
# (see https://tools.ietf.org/html/rfc4035#section-3.1.6) we normalise the answers
# by unsetting the AD bit.
QUERY = answer_checker.make_random_case_query("test.knot-resolver.cz", "A", want_dnssec=True, payload=4096)
answer_checker.unset_flag(QUERY, dns.flags.RD)
ANSWER = dns.message.from_text(""";
opcode QUERY
rcode NOERROR
flags QR AA
edns 0
eflags DO
payload 4096
;QUESTION
test.knot-resolver.cz. IN A
;ANSWER
test.knot-resolver.cz. 3600 IN A 217.31.192.130
test.knot-resolver.cz. 3600 IN RRSIG A 13 3 3600 20370119135450 20190205122450 58 test.knot-resolver.cz. G9DTWRE8QKe0MKyHn+PZcgf+ggIR9Sk+ E9qtd8IlpEt3+y28qPp0lgDQojpQL9sv lqgC0g5e2ZIsZWg1T5ICNQ==
;AUTHORITY
;ADDITIONAL
""")
def test_remote_udp_53(server):
return answer_checker.send_and_check(QUERY,
ANSWER,
server,
ALL - {"authority"},
unset_flags=[dns.flags.AD])
def test_remote_tcp_53(server):
return answer_checker.send_and_check(QUERY,
ANSWER,
server,
ALL - {"authority"},
tcp=True,
unset_flags=[dns.flags.AD])
@pytest.mark.parametrize("non_existent_server", [ipaddress.ip_address("192.0.2.1"), ipaddress.ip_address("2001:db::1")])
def test_nonexistent_addres(non_existent_server):
try:
answer_checker.get_answer(QUERY, non_existent_server, timeout=1)
except socket.timeout:
return True
return False
LONG_QUERY = answer_checker.make_random_case_query("test.knot-resolver.cz", "TXT", use_edns=0, payload=4096, want_dnssec=True)
answer_checker.unset_flag(LONG_QUERY, dns.flags.RD)
LONG_ANSWER = dns.message.from_text(""";
opcode QUERY
rcode NOERROR
flags QR AA
edns 0
payload 4096
;QUESTION
test.knot-resolver.cz. IN TXT
;ANSWER
test.knot-resolver.cz. 3600 IN TXT "Davku ve me o pln uvitani stari s tvuj neda? Tik kufr u traslo uf tabuli znaky mesity bimbal vyrvat vydelal pobezi zahajil a tajnosti. By 77 dusic ach prazdna kasari k zac platne potiz. O hon 30 otazek jiz post i rad zeleninu vyhrknu bhutanu nezdalo. I tr" "ida, ptat lzes vypadla newton, utal hm je bas samymi u sobe ukaz kazne medove u placeny ke ah jo zpola o ocich. Sul trimesicni kontrolujte v predstirana po nej mit za devetadevadesati eh mi lezu slava vuz v me smery. Tri akt dlazbu dal lamu, kavkam on zas" "luhy, sad muzikant vek. Paty neme radili trunil docist tech obou zari. My ze 11 tlusti jsemvidel. Podivej i prs kralik at ted o vynahradit ti si ma charakterni nehybny tulak poskytl rad! Muz ztuhla, ci ah propatral misce! Slz eh at? Zenou dilo intuici. Le" "su pud povesti, i jamou tej. V az vdanou zrejmo za ctil 81 kolika u ustrnule malicherny holemi nekradla jinych morfia: pocasi poplaseny zpovidat az dne vyjimecna zidovskem stejny sluzek tajemny hlidat u ruzovym pry jestli vyslychalo zem nerozbrecte farare" " strhla v mem tabule pije a odkraglujem otisky nebot. Ex povidani pusta duse eh zvalel, o pak ma bryle luzka: u posluhovi neudela 30 ze ctverce brovninku. 411 se vi rypaje nova to per ba zchoulostivil remenem. Vaze to lujzou styky, te? Ne me by pazeni tro" "ubil i srovnala dejoveho a prvnich me o zime hlasy nevsimnou. Jejim zajiste za porotam valka sekt. Oni vuli co ryb pruvod. Ode jehoz od lasce ve slouzilo co jektal hryzal lamparny. Zvlastnich ne vybil brejlil uz ah? Husa trit mu straze s zivaje abys chute" " pane ci nepochybujte ubiha k babe ach okoli zle okna deji dverim! Vymyslim do falesne. Pokaceneho leti oka krk. Nohy stejny u vykaslu, rinkotem ondyno, laureat z zabije."
test.knot-resolver.cz. 3600 IN RRSIG TXT 13 3 3600 20370119135450 20190205122450 58 test.knot-resolver.cz. YYzbiOgNyIe2YcUHUbA8LNrqUYPSHEUA U7tAOLJx54kSlTMYDB5VrnqsAIgp2PtV C1gELBVK4Xtwxrx3ajeLhA==
;AUTHORITY
;ADDITIONAL
""")
@pytest.mark.skip(reason="This doesn't work since a.ns.nic.cz (and all CZ.NIC's nameservers in that "
"matter) have internal EDNS buffer size of 1232 bytes. The answer is "
"unfortunately bigger. :(")
def test_udp_fragmentation(server):
return answer_checker.send_and_check(LONG_QUERY,
LONG_ANSWER,
server,
ALL - {"authority"},
unset_flags=[dns.flags.AD])
QUERY_WITH_SMALL_PAYLOAD = answer_checker.make_random_case_query("test.knot-resolver.cz", "TXT", use_edns=0, payload=1280, want_dnssec=True)
answer_checker.unset_flag(QUERY_WITH_SMALL_PAYLOAD, dns.flags.RD)
TRUNCATED_ANSWER = dns.message.from_text(""";
opcode QUERY
rcode NOERROR
flags QR AA TC
edns 0
payload 4096
;QUESTION
test.knot-resolver.cz. IN TXT
;ANSWER
;AUTHORITY
;ADDITIONAL
""")
def test_udp_fragmentation_truncated(server):
return answer_checker.send_and_check(QUERY_WITH_SMALL_PAYLOAD,
TRUNCATED_ANSWER,
server,
ALL - {"authority"},
unset_flags=[dns.flags.AD])
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment