pytests/proxy: Python wrapper

parent 0c792781
from contextlib import contextmanager, ContextDecorator
import os
import subprocess
from typing import Any, Dict, Optional
import dns
import dns.rcode
import pytest
from kresd import CERTS_DIR, Forward, Kresd, make_kresd, make_port, PYTESTS_DIR
import utils
HINTS = {
'0.foo.': '127.0.0.1',
'1.foo.': '127.0.0.1',
'2.foo.': '127.0.0.1',
'3.foo.': '127.0.0.1',
}
def resolve_hint(sock, qname):
buff, msgid = utils.get_msgbuff(qname)
sock.sendall(buff)
answer = utils.receive_parse_answer(sock)
assert answer.id == msgid
assert answer.rcode() == dns.rcode.NOERROR
assert answer.answer[0][0].address == HINTS[qname]
class Proxy(ContextDecorator):
PATH = ''
def __init__(
self,
local_ip: str = '127.0.0.1',
local_port: Optional[int] = None,
upstream_ip: str = '127.0.0.1',
upstream_port: Optional[int] = None
) -> None:
self.local_ip = local_ip
self.local_port = local_port
self.upstream_ip = upstream_ip
self.upstream_port = upstream_port
self.proxy = None
def get_args(self):
args = []
args.append('--local')
args.append(self.local_ip)
if self.local_port is not None:
args.append('--lport')
args.append(str(self.local_port))
args.append('--upstream')
args.append(self.upstream_ip)
if self.upstream_port is not None:
args.append('--uport')
args.append(str(self.upstream_port))
return args
def __enter__(self):
if not os.path.exists(self.PATH):
pytest.skip("proxy executable '{}' not found (did you compile it?)".format(self.PATH))
cwd, cmd = os.path.split(self.PATH)
cmd = './' + cmd
args = [cmd] + self.get_args()
print(' '.join(args))
self.proxy = subprocess.Popen(
args, cwd=cwd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
return self
def __exit__(self, exc_type, exc_value, traceback):
if self.proxy is not None:
self.proxy.terminate()
self.proxy = None
class TLSProxy(Proxy):
PATH = os.path.join(PYTESTS_DIR, 'proxy', 'tlsproxy')
def __init__(
self,
local_ip: str = '127.0.0.1',
local_port: Optional[int] = None,
upstream_ip: str = '127.0.0.1',
upstream_port: Optional[int] = None,
certname: Optional[str] = 'tt',
close: Optional[int] = None,
rehandshake: bool = False
) -> None:
super().__init__(local_ip, local_port, upstream_ip, upstream_port)
if certname is not None:
self.cert_path = os.path.join(CERTS_DIR, certname + '.cert.pem')
self.key_path = os.path.join(CERTS_DIR, certname + '.key.pem')
else:
self.cert_path = None
self.key_path = None
self.close = close
self.rehandshake = rehandshake
def get_args(self):
args = super().get_args()
if self.cert_path is not None:
args.append('--cert')
args.append(self.cert_path)
if self.key_path is not None:
args.append('--key')
args.append(self.key_path)
if self.close is not None:
args.append('--close')
args.append(str(self.close))
if self.rehandshake:
args.append('--rehandshake')
return args
@contextmanager
def kresd_tls_client(
workdir: str,
proxy: TLSProxy,
kresd_tls_client_kwargs: Optional[Dict[Any, Any]] = None,
kresd_fwd_target_kwargs: Optional[Dict[Any, Any]] = None
) -> Kresd:
"""kresd_tls_client --(tls)--> tlsproxy --(tcp)--> kresd_fwd_target"""
ALLOWED_IPS = {'127.0.0.1', '::1'}
assert proxy.local_ip in ALLOWED_IPS, "only localhost IPs supported for proxy"
assert proxy.upstream_ip in ALLOWED_IPS, "only localhost IPs are supported for proxy"
if kresd_tls_client_kwargs is None:
kresd_tls_client_kwargs = dict()
if kresd_fwd_target_kwargs is None:
kresd_fwd_target_kwargs = dict()
# run forward target instance
dir1 = os.path.join(workdir, 'kresd_fwd_target')
os.makedirs(dir1)
with make_kresd(dir1, hints=HINTS, **kresd_fwd_target_kwargs) as kresd_fwd_target:
sock = kresd_fwd_target.ip_tcp_socket()
resolve_hint(sock, list(HINTS.keys())[0])
proxy.local_port = make_port('127.0.0.1', '::1')
proxy.upstream_port = kresd_fwd_target.port
with proxy:
# run test kresd instance
dir2 = os.path.join(workdir, 'kresd_tls_client')
os.makedirs(dir2)
forward = Forward(
proto='tls', ip=proxy.local_ip, port=proxy.local_port,
hostname='transport-test-server.com', ca_file=proxy.cert_path)
with make_kresd(dir2, forward=forward, **kresd_tls_client_kwargs) as kresd:
yield kresd
......@@ -2,6 +2,7 @@
disable=
missing-docstring,
too-few-public-methods,
too-many-arguments,
too-many-instance-attributes,
fixme,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment