pytests: fix prefix tests

parent 5daaab15
......@@ -8,13 +8,6 @@ length, excluding the two byte length field.
The following test suite focuses on edge cases for the prefix - when it
is either too short or too long, instead of matching the length of DNS
message exactly.
The tests with invalid prefix attempt to sequentially send the invalid
message. After a certain period of time (affected by net.tcp_in_idle,
TCP_DEFER_ACCEPT, ...), kresd should close the connection. There are
three variants of these tests - either no valid query is sent, or one
valid query is sent along with the invalid buffer at once, or one valid
query is sent and afterwards the invalid buffer is sent.
"""
import time
......@@ -24,85 +17,75 @@ import pytest
import utils
def send_invalid_repeatedly(sock, buff, delay=1):
"""Utility function to keep sending the buffer until MAX_TIMEOUT is reached.
It is expected kresd will close the connection, since the buffer
contains invalid prefix of the message.
If the connection remains open, test is failed.
"""
end_time = time.time() + utils.MAX_TIMEOUT
with utils.expect_kresd_close():
while time.time() < end_time:
sock.sendall(buff)
time.sleep(delay)
@pytest.fixture(params=[
'no_query_before',
'send_query_before_invalid',
'send_query_before_invalid_single_buffer',
'query_before',
'query_before_in_single_buffer',
])
def send_query_before(request):
"""This either performs no query, or sends a query along with invalid buffer at once, or
sends a query and then the invalid buffer."""
def send_query(request):
"""Function sends a buffer, either by itself, or with a valid query before.
If a valid query is sent before, it can be sent either in a separate buffer, or
along with the provided buffer."""
# pylint: disable=possibly-unused-variable
def no_query_before(*args, **kwargs): # pylint: disable=unused-argument
pass
def no_query_before(sock, buff): # pylint: disable=unused-argument
sock.sendall(buff)
def send_query_before_invalid(sock, invalid_buff, single_buffer=False):
def query_before(sock, buff, single_buffer=False):
"""Send an initial query and expect a response."""
msg_buff, msgid = utils.get_msgbuff()
if single_buffer:
sock.sendall(msg_buff + invalid_buff)
sock.sendall(msg_buff + buff)
else:
sock.sendall(msg_buff)
sock.sendall(invalid_buff)
sock.sendall(buff)
answer = utils.receive_parse_answer(sock)
assert answer.id == msgid
def send_query_before_invalid_single_buffer(sock, invalid_buff):
return send_query_before_invalid(sock, invalid_buff, single_buffer=True)
def query_before_in_single_buffer(sock, buff):
return query_before(sock, buff, single_buffer=True)
return locals()[request.param]
def test_prefix_less_than_header(kresd_sock, send_query_before):
"""Prefix is less than the length of the DNS message header."""
@pytest.mark.parametrize('datalen', [
1, # just one byte of DNS header
11, # DNS header size minus 1
14, # DNS Header size plus 2
])
def test_prefix_cuts_message(kresd_sock, datalen, send_query):
"""Prefix is shorter than the DNS message."""
wire, _ = utils.prepare_wire()
datalen = 11 # DNS header size minus 1
assert datalen < len(wire)
invalid_buff = utils.prepare_buffer(wire, datalen)
send_query_before(kresd_sock, invalid_buff)
send_invalid_repeatedly(kresd_sock, invalid_buff)
send_query(kresd_sock, invalid_buff) # buffer breaks parsing of TCP stream
with utils.expect_kresd_close():
utils.ping_alive(kresd_sock)
def test_prefix_greater_than_message(kresd_sock, send_query_before):
def test_prefix_greater_than_message(kresd_sock, send_query):
"""Prefix is greater than the length of the entire DNS message."""
wire, _ = utils.prepare_wire()
wire, invalid_msgid = utils.prepare_wire()
datalen = len(wire) + 16
invalid_buff = utils.prepare_buffer(wire, datalen)
send_query_before(kresd_sock, invalid_buff)
send_invalid_repeatedly(kresd_sock, invalid_buff)
send_query(kresd_sock, invalid_buff)
valid_buff, _ = utils.get_msgbuff()
kresd_sock.sendall(valid_buff)
def test_prefix_cuts_message(kresd_sock, send_query_before):
"""Prefix is greater than the length of the DNS message header, but shorter than
the entire DNS message."""
wire, _ = utils.prepare_wire()
datalen = 14 # DNS Header size plus 2
assert datalen < len(wire)
invalid_buff = utils.prepare_buffer(wire, datalen)
# invalid_buff is answered (treats additional data as trailing garbage)
answer = utils.receive_parse_answer(kresd_sock)
assert answer.id == invalid_msgid
send_query_before(kresd_sock, invalid_buff)
send_invalid_repeatedly(kresd_sock, invalid_buff)
# parsing stream is broken by the invalid_buff, valid query is never answered
with utils.expect_kresd_close():
utils.receive_parse_answer(kresd_sock)
@pytest.mark.parametrize('glength', [
......
from contextlib import contextmanager
import random
import ssl
import struct
import random
import time
import dns
import dns.message
......@@ -99,6 +100,7 @@ def ping_alive(sock, msgid=None):
def expect_kresd_close(rst_ok=False):
with pytest.raises(BrokenPipeError, message="kresd didn't close the connection"):
try:
time.sleep(0.05) # give kresd time to close connection with TCP FIN
yield
except ConnectionResetError:
if rst_ok:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment