Commit 98c5efd5 authored by Petr Špaček's avatar Petr Špaček

Merge branch 'tcp-rst' into 'master'

respdiff/orchestrator: improve TCP conn error handling

See merge request !37
parents 8f08dd31 3f87f740
Pipeline #41492 passed with stage
in 1 minute and 28 seconds
......@@ -49,6 +49,13 @@ __timeout_reply = DNSReply(None) # optimization: create only one timeout_reply
__dnsreplies_factory = None
CONN_RESET_RETRIES = 2
class TcpDnsLengthError(ConnectionError):
pass
def module_init(args: Namespace) -> None:
global __resolvers
global __max_timeouts
......@@ -104,25 +111,15 @@ def worker_perform_query(args: Tuple[QKey, WireFormat]) -> Tuple[QKey, RepliesBl
"""DNS query performed by orchestrator"""
qkey, qwire = args
try:
selector = __worker_state.selector
sockets = __worker_state.sockets
except AttributeError:
# handle improper initialization
raise __worker_state.exception
# optional artificial delay for testing
if __time_delay_max > 0:
time.sleep(random.uniform(__time_delay_min, __time_delay_max))
replies, reinit = send_recv_parallel(qwire, selector, sockets, __timeout)
replies = send_recv_parallel(qwire, __timeout)
if not __ignore_timeout:
_check_timeout(replies)
if reinit: # a connection is broken or something
worker_deinit()
worker_reinit()
assert __dnsreplies_factory is not None, "Module wasn't initilized!"
blob = __dnsreplies_factory.serialize(replies)
return qkey, blob
......@@ -132,10 +129,8 @@ def worker_perform_single_query(args: Tuple[QKey, WireFormat]) -> Tuple[QKey, Re
"""Perform a single DNS query with setup and teardown of sockets. Used by diffrepro."""
qkey, qwire = args
worker_reinit()
selector = __worker_state.selector
sockets = __worker_state.sockets
replies, _ = send_recv_parallel(qwire, selector, sockets, __timeout)
replies = send_recv_parallel(qwire, __timeout, reinit_on_tcpfin=False)
worker_deinit()
......@@ -224,16 +219,16 @@ def sock_init(retry: int = 3) -> Tuple[Selector, Sequence[Tuple[ResolverID, Sock
def _recv_msg(sock: Socket, isstream: IsStreamFlag) -> WireFormat:
"""Receive DNS message from socket and remove preambule (if present)."""
if isstream: # parse preambule
blength = sock.recv(2) # TODO: does not work with TLS: , socket.MSG_WAITALL)
if not blength: # stream closed
raise ConnectionError('TCP recv length == 0')
blength = sock.recv(2)
if len(blength) != 2: # FIN / RST
raise TcpDnsLengthError('failed to recv DNS packet length')
(length, ) = struct.unpack('!H', blength)
else:
length = 65535 # max. UDP message size, no IPv6 jumbograms
return sock.recv(length)
def send_recv_parallel(
def _send_recv_parallel(
dgram: WireFormat, # DNS message suitable for UDP transport
selector: Selector,
sockets: ResolverSockets,
......@@ -265,10 +260,13 @@ def send_recv_parallel(
sock = key.fileobj
try:
wire = _recv_msg(sock, isstream)
except ConnectionError:
reinit = True
selector.unregister(sock)
continue # receive answers from other parties
except TcpDnsLengthError:
if name in replies: # we have a reply already, most likely TCP FIN
reinit = True
selector.unregister(sock)
continue # receive answers from other parties
else: # no reply -> raise error
raise
# assert len(wire) > 14
if dgram[0:2] != wire[0:2]:
continue # wrong msgid, this might be a delayed answer - ignore it
......@@ -280,3 +278,30 @@ def send_recv_parallel(
replies[resolver] = __timeout_reply
return replies, reinit
def send_recv_parallel(
dgram: WireFormat, # DNS message suitable for UDP transport
timeout: float,
reinit_on_tcpfin: bool = True
) -> Mapping[ResolverID, DNSReply]:
for _ in range(CONN_RESET_RETRIES + 1):
try: # get sockets and selector
selector = __worker_state.selector
sockets = __worker_state.sockets
except AttributeError:
# handle improper initialization
raise __worker_state.exception
try:
replies, reinit = _send_recv_parallel(dgram, selector, sockets, timeout)
if reinit_on_tcpfin and reinit: # a connection is closed
worker_deinit()
worker_reinit()
return replies
except (TcpDnsLengthError, ConnectionError): # most likely TCP RST
worker_deinit() # re-establish connection
worker_reinit()
raise RuntimeError(
'ConnectionError received {} times in a row, exiting!'.format(
CONN_RESET_RETRIES + 1))
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment