pytests: import test_tls_session_resumption (test18)

parent a3091007
from contextlib import contextmanager
import random
import socket
import pytest
from kresd import Kresd
def is_port_free(port, ip=None, ip6=None):
def check(family, type_, dest):
sock = socket.socket(family, type_)
sock.bind(dest)
sock.close()
try:
if ip is not None:
check(socket.AF_INET, socket.SOCK_STREAM, (ip, port))
check(socket.AF_INET, socket.SOCK_DGRAM, (ip, port))
if ip6 is not None:
check(socket.AF_INET6, socket.SOCK_STREAM, (ip6, port, 0, 0))
check(socket.AF_INET6, socket.SOCK_DGRAM, (ip6, port, 0, 0))
except OSError as exc:
if exc.errno == 98: # address alrady in use
return False
else:
raise
return True
@contextmanager
def make_kresd(workdir, certname=None):
ip = '127.0.0.1'
ip6 = '::1'
def make_port():
for _ in range(10): # max attempts
port = random.randint(1024, 65535)
if is_port_free(port, ip, ip6):
return port
raise RuntimeError("No available port found!")
port = make_port()
tls_port = make_port()
with Kresd(workdir, port, tls_port, ip, ip6, certname) as kresd:
yield kresd
# TODO: add verbose option?
# with open(kresd.logfile_path) as log:
# print(log.read()) # display log for debugging purposes
from kresd import make_kresd
@pytest.fixture
......
from contextlib import ContextDecorator
from contextlib import ContextDecorator, contextmanager
import os
import random
import re
import socket
import subprocess
......@@ -170,3 +171,43 @@ class Kresd(ContextDecorator):
def ip6_tls_socket(self):
return self._tls_socket_with_retry(socket.AF_INET6)
def is_port_free(port, ip=None, ip6=None):
def check(family, type_, dest):
sock = socket.socket(family, type_)
sock.bind(dest)
sock.close()
try:
if ip is not None:
check(socket.AF_INET, socket.SOCK_STREAM, (ip, port))
check(socket.AF_INET, socket.SOCK_DGRAM, (ip, port))
if ip6 is not None:
check(socket.AF_INET6, socket.SOCK_STREAM, (ip6, port, 0, 0))
check(socket.AF_INET6, socket.SOCK_DGRAM, (ip6, port, 0, 0))
except OSError as exc:
if exc.errno == 98: # address alrady in use
return False
else:
raise
return True
def make_port(ip=None, ip6=None):
for _ in range(10): # max attempts
port = random.randint(1024, 65535)
if is_port_free(port, ip, ip6):
return port
raise RuntimeError("No available port found!")
@contextmanager
def make_kresd(workdir, certname=None, ip='127.0.0.1', ip6='::1'):
port = make_port(ip, ip6)
tls_port = make_port(ip, ip6)
with Kresd(workdir, port, tls_port, ip, ip6, certname) as kresd:
yield kresd
# TODO: add verbose option?
# with open(kresd.logfile_path) as log:
# print(log.read()) # display log for debugging purposes
"""TLS-specific tests"""
import itertools
import os
from socket import AF_INET, AF_INET6
import ssl
import sys
import pytest
from kresd import make_kresd
import utils
......@@ -45,3 +50,38 @@ def test_tls_cert_expired(kresd_tt_expired, sock_family):
with pytest.raises(ssl.SSLError):
ssock.connect(dest)
@pytest.mark.skipif(sys.version_info < (3, 6),
reason="requires python3.6 or higher")
@pytest.mark.parametrize('sf1, sf2, sf3', itertools.product(
[AF_INET, AF_INET6], [AF_INET, AF_INET6], [AF_INET, AF_INET6]))
def test_tls_session_resumption(tmpdir, sf1, sf2, sf3):
"""Attempt TLS session resumption against the same kresd instance and a different one."""
# TODO ensure that session can't be resumed after session ticket key regeneration
# at the first kresd instance
def connect(kresd, ctx, sf, session=None):
sock, dest = kresd.stream_socket(sf, tls=True)
ssock = ctx.wrap_socket(
sock, server_hostname='transport-test-server.com', session=session)
ssock.connect(dest)
new_session = ssock.session
assert new_session.has_ticket
assert ssock.session_reused == (session is not None)
utils.ping_alive(ssock)
ssock.close()
return new_session
workdir = os.path.join(str(tmpdir), 'kresd')
os.makedirs(workdir)
with make_kresd(workdir, 'tt') as kresd:
ctx = utils.make_ssl_context(verify_location=kresd.tls_cert_path)
session = connect(kresd, ctx, sf1) # initial conn
connect(kresd, ctx, sf2, session) # resume session on the same instance
workdir2 = os.path.join(str(tmpdir), 'kresd2')
os.makedirs(workdir2)
with make_kresd(workdir2, 'tt') as kresd2:
connect(kresd2, ctx, sf3, session) # resume session on a different instance
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment