Commit f52231b6 authored by Marek Vavruša, committed by Grigorii Demidov

daemon/worker: fixes error handling from TLS writes

The error handling loop for uncorking TLS data was wrong, as the
underlying push function is asynchronous and there's no relationship
between completed DNS packet writes and the number of TLS message writes.
With an asynchronous push function, the buffered data must remain valid
until the write is complete. Currently this is not guaranteed, and
loading the resolver with pipelined requests results in memory errors:

```
$ getdns_query @127.0.0.1#853 -s -a -s -l L -B -F queries -q
...
==47111==ERROR: AddressSanitizer: heap-use-after-free on address 0x6290040a1253 at pc 0x00010da960d3 bp 0x7ffee2628b30 sp 0x7ffee26282e0
READ of size 499 at 0x6290040a1253 thread T0
    #0 0x10da960d2 in wrap_write (libclang_rt.asan_osx_dynamic.dylib:x86_64h+0x1f0d2)
    #1 0x10d855971 in uv__write (libuv.1.dylib:x86_64+0xf971)
    #2 0x10d85422e in uv__stream_io (libuv.1.dylib:x86_64+0xe22e)
    #3 0x10d85b35a in uv__io_poll (libuv.1.dylib:x86_64+0x1535a)
    #4 0x10d84c644 in uv_run (libuv.1.dylib:x86_64+0x6644)
    #5 0x10d602ddf in main main.c:422
    #6 0x7fff6a28a014 in start (libdyld.dylib:x86_64+0x1014)

0x6290040a1253 is located 83 bytes inside of 16895-byte region [0x6290040a1200,0x6290040a53ff)
freed by thread T0 here:
    #0 0x10dacdfdd in wrap_free (libclang_rt.asan_osx_dynamic.dylib:x86_64h+0x56fdd)
    #1 0x10d913c2e in _mbuffer_head_remove_bytes (libgnutls.30.dylib:x86_64+0xbc2e)
    #2 0x10d915080 in _gnutls_io_write_flush (libgnutls.30.dylib:x86_64+0xd080)
    #3 0x10d90ca18 in _gnutls_send_tlen_int (libgnutls.30.dylib:x86_64+0x4a18)
    #4 0x10d90edde in gnutls_record_send2 (libgnutls.30.dylib:x86_64+0x6dde)
    #5 0x10d90f085 in gnutls_record_uncork (libgnutls.30.dylib:x86_64+0x7085)
    #6 0x10d5f6569 in tls_push tls.c:238
    #7 0x10d5e5b2a in qr_task_send worker.c:1002
    #8 0x10d5e2ea6 in qr_task_finalize worker.c:1562
    #9 0x10d5dab99 in qr_task_step worker.c
    #10 0x10d5e12fe in worker_process_tcp worker.c:2410
```

The current implementation adds an opportunistic uv_try_write, which
either writes the requested data or returns UV_EAGAIN or an error;
in that case it falls back to a slower asynchronous write that copies
the buffered data.

The function signature is changed from simple write to vectorized write.

This also enables TLS False Start to save 1RTT when possible.
parent 88e78c66
......@@ -793,37 +793,53 @@ static int net_outgoing(lua_State *L, int family)
static int net_outgoing_v4(lua_State *L) { return net_outgoing(L, AF_INET); }
static int net_outgoing_v6(lua_State *L) { return net_outgoing(L, AF_INET6); }
static int net_tcp_in_idle(lua_State *L)
/**
 * Shared getter/setter behind the Lua-visible timeout functions.
 *
 * Called with no Lua arguments, it pushes the current value of *timeout.
 * Called with exactly one argument, it requires a positive number, stores it
 * into *timeout and pushes `true`. Any other usage raises a Lua error whose
 * message starts with `name`.
 *
 * @param L        Lua state of the caller.
 * @param timeout  Storage of the timeout that is read or updated.
 * @param name     Lua-visible function name, used as error message prefix.
 * @return Number of values pushed onto the Lua stack (always 1).
 */
static int net_update_timeout(lua_State *L, uint64_t *timeout, const char *name)
{
	/* No argument: only report the current value. */
	if (lua_gettop(L) == 0) {
		lua_pushnumber(L, *timeout);
		return 1;
	}

	if (lua_gettop(L) != 1) {
		lua_pushstring(L, name);
		lua_pushstring(L, " takes one parameter: (\"idle timeout\")");
		/* lua_error() raises only the top stack value, so join both parts. */
		lua_concat(L, 2);
		return lua_error(L);
	}

	/* Keep the full lua_Integer width; an int could truncate large values
	 * before they reach the 64-bit destination. Non-numbers map to 0 and
	 * are rejected together with non-positive numbers. */
	const lua_Integer value = lua_isnumber(L, 1) ? lua_tointeger(L, 1) : 0;
	if (value <= 0) {
		lua_pushstring(L, name);
		lua_pushstring(L, " parameter has to be positive number");
		lua_concat(L, 2);
		return lua_error(L);
	}

	*timeout = (uint64_t)value;
	lua_pushboolean(L, true);
	return 1;
}
/** Lua binding "net.tcp_in_idle": get or set net->tcp.in_idle_timeout of the engine bound to L. */
static int net_tcp_in_idle(lua_State *L)
{
	struct network *network = &engine_luaget(L)->net;
	uint64_t *slot = &network->tcp.in_idle_timeout;
	return net_update_timeout(L, slot, "net.tcp_in_idle");
}
/** Lua binding "net.tls_handshake_timeout": get or set net->tcp.tls_handshake_timeout of the engine bound to L. */
static int net_tls_handshake_timeout(lua_State *L)
{
	struct network *network = &engine_luaget(L)->net;
	uint64_t *slot = &network->tcp.tls_handshake_timeout;
	return net_update_timeout(L, slot, "net.tls_handshake_timeout");
}
int lib_net(lua_State *L)
{
static const luaL_Reg lib[] = {
......@@ -842,6 +858,7 @@ int lib_net(lua_State *L)
{ "outgoing_v4", net_outgoing_v4 },
{ "outgoing_v6", net_outgoing_v6 },
{ "tcp_in_idle", net_tcp_in_idle },
{ "tls_handshake_timeout", net_tls_handshake_timeout },
{ NULL, NULL }
};
register_lib(L, "net", lib);
......
......@@ -108,17 +108,6 @@ static void session_release(struct worker_ctx *worker, uv_handle_t *handle)
}
}
/** Borrow an I/O handle from the worker stored in loop->data; yields NULL when the worker has none to give. */
static uv_stream_t *handle_borrow(uv_loop_t *loop)
{
	struct worker_ctx *owner = loop->data;
	/* A NULL result converts to a NULL stream pointer, so no separate branch is needed. */
	return (uv_stream_t *)worker_iohandle_borrow(owner);
}
static void handle_getbuf(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
{
/* Worker has single buffer which is reused for all incoming
......@@ -276,12 +265,21 @@ static void _tcp_accept(uv_stream_t *master, int status, bool tls)
return;
}
uv_stream_t *client = handle_borrow(master->loop);
struct worker_ctx *worker = (struct worker_ctx *)master->loop->data;
uv_stream_t *client = worker_iohandle_borrow(worker);
if (!client) {
return;
}
memset(client, 0, sizeof(*client));
io_create(master->loop, (uv_handle_t *)client, SOCK_STREAM);
int res = io_create(master->loop, (uv_handle_t *)client, SOCK_STREAM, 0);
if (res) {
if (res == UV_EMFILE) {
worker->too_many_open = true;
worker->rconcurrent_highwatermark = worker->stats.rconcurrent;
}
worker_iohandle_release(worker, client);
return;
}
if (uv_accept(master, client) != 0) {
uv_close((uv_handle_t *)client, io_release);
return;
......@@ -297,11 +295,11 @@ static void _tcp_accept(uv_stream_t *master, int status, bool tls)
int addr_len = sizeof(union inaddr);
int ret = uv_tcp_getpeername((uv_tcp_t *)client, addr, &addr_len);
if (ret || addr->sa_family == AF_UNSPEC) {
worker_iohandle_release(worker, client);
worker_session_close(session);
return;
}
const struct worker_ctx *worker = (struct worker_ctx *)master->loop->data;
const struct engine *engine = worker->engine;
const struct network *net = &engine->net;
uint64_t idle_in_timeout = net->tcp.in_idle_timeout;
......@@ -424,16 +422,18 @@ int tcp_bindfd_tls(uv_tcp_t *handle, int fd)
return _tcp_bindfd(handle, fd, tls_accept);
}
void io_create(uv_loop_t *loop, uv_handle_t *handle, int type)
int io_create(uv_loop_t *loop, uv_handle_t *handle, int type, unsigned family)
{
int ret = -1;
int ret = 0;
if (type == SOCK_DGRAM) {
ret = uv_udp_init(loop, (uv_udp_t *)handle);
} else if (type == SOCK_STREAM) {
ret = uv_tcp_init(loop, (uv_tcp_t *)handle);
ret = uv_tcp_init_ex(loop, (uv_tcp_t *)handle, family);
uv_tcp_nodelay((uv_tcp_t *)handle, 1);
}
assert(ret == 0);
if (ret != 0) {
return ret;
}
struct worker_ctx *worker = loop->data;
struct session *session = session_borrow(worker);
assert(session);
......@@ -441,6 +441,7 @@ void io_create(uv_loop_t *loop, uv_handle_t *handle, int type)
handle->data = session;
session->timeout.data = session;
uv_timer_init(worker->loop, &session->timeout);
return ret;
}
void io_deinit(uv_handle_t *handle)
......
......@@ -60,7 +60,7 @@ int tcp_bindfd(uv_tcp_t *handle, int fd);
int tcp_bindfd_tls(uv_tcp_t *handle, int fd);
/** Initialize the handle, incl. ->data = struct session * instance. type = SOCK_* */
void io_create(uv_loop_t *loop, uv_handle_t *handle, int type);
int io_create(uv_loop_t *loop, uv_handle_t *handle, int type, unsigned family);
void io_deinit(uv_handle_t *handle);
void io_free(uv_handle_t *handle);
......
......@@ -53,8 +53,9 @@ void network_init(struct network *net, uv_loop_t *loop)
net->endpoints = map_make(NULL);
net->tls_client_params = map_make(NULL);
net->tls_session_ticket_ctx = /* unsync. random, by default */
tls_session_ticket_ctx_create(loop, NULL, 0);
tls_session_ticket_ctx_create(loop, NULL, 0);
net->tcp.in_idle_timeout = 10000;
net->tcp.tls_handshake_timeout = TLS_MAX_HANDSHAKE_TIME;
}
}
......
......@@ -44,6 +44,7 @@ typedef array_t(struct endpoint*) endpoint_array_t;
/** Timeouts of the network layer's TCP/TLS handling; both are adjustable from Lua. */
struct net_tcp_param {
/* Set to 10000 by network_init(); exposed as net.tcp_in_idle.
 * NOTE(review): unit is presumably milliseconds — confirm. */
uint64_t in_idle_timeout;
/* Set to TLS_MAX_HANDSHAKE_TIME by network_init(); exposed as net.tls_handshake_timeout. */
uint64_t tls_handshake_timeout;
};
struct tls_session_ticket_ctx;
......
This diff is collapsed.
......@@ -42,6 +42,9 @@
*/
#define TLS_MAX_HANDSHAKE_TIME (KR_CONN_RTT_MAX * 3)
/** Transport session (opaque). */
struct session;
struct tls_ctx_t;
struct tls_client_ctx_t;
struct tls_credentials {
......@@ -59,6 +62,7 @@ struct tls_client_paramlist_entry {
array_t(const char *) pins;
gnutls_certificate_credentials_t credentials;
gnutls_datum_t session_data;
uint32_t refs;
};
struct worker_ctx;
......@@ -93,7 +97,6 @@ struct tls_common_ctx {
uint8_t recv_buf[4096];
tls_handshake_cb handshake_cb;
struct worker_ctx *worker;
struct qr_task *task;
};
struct tls_ctx_t {
......@@ -126,7 +129,7 @@ void tls_close(struct tls_common_ctx *ctx);
void tls_free(struct tls_ctx_t* tls);
/*! Push new data to TLS context for sending */
int tls_push(struct qr_task *task, uv_handle_t* handle, knot_pkt_t * pkt);
int tls_write(uv_write_t *req, uv_handle_t* handle, knot_pkt_t * pkt, uv_write_cb cb);
/*! Unwrap incoming data from a TLS stream and pass them to TCP session.
* @return the number of newly-completed requests (>=0) or an error code
......@@ -158,6 +161,15 @@ tls_hs_state_t tls_get_hs_state(const struct tls_common_ctx *ctx);
/*! Set TLS handshake state. */
int tls_set_hs_state(struct tls_common_ctx *ctx, tls_hs_state_t state);
/*! Find TLS parameters for given address. Attempt opportunistic upgrade for port 53 to 853,
* if the address is configured with a working DoT on port 853.
*/
struct tls_client_paramlist_entry *tls_client_try_upgrade(map_t *tls_client_paramlist,
const struct sockaddr *addr);
/*! Clear (remove) TLS parameters for given address. */
int tls_client_params_clear(map_t *tls_client_paramlist, const char *addr, uint16_t port);
/*! Set TLS authentication parameters for given address.
* Note: hostnames must be imported before ca files,
* otherwise ca files will not be imported at all.
......@@ -170,7 +182,7 @@ int tls_client_params_set(map_t *tls_client_paramlist,
int tls_client_params_free(map_t *tls_client_paramlist);
/*! Allocate new client TLS context */
struct tls_client_ctx_t *tls_client_ctx_new(const struct tls_client_paramlist_entry *entry,
struct tls_client_ctx_t *tls_client_ctx_new(struct tls_client_paramlist_entry *entry,
struct worker_ctx *worker);
/*! Free client TLS context */
......@@ -180,9 +192,7 @@ int tls_client_connect_start(struct tls_client_ctx_t *client_ctx,
struct session *session,
tls_handshake_cb handshake_cb);
int tls_client_ctx_set_params(struct tls_client_ctx_t *ctx,
struct tls_client_paramlist_entry *entry,
struct session *session);
int tls_client_ctx_set_session(struct tls_client_ctx_t *ctx, struct session *session);
/* Session tickets, server side. Implementation in ./tls_session_ticket-srv.c */
......
......@@ -217,7 +217,15 @@ static uv_handle_t *ioreq_spawn(struct qr_task *task, int socktype, sa_family_t
if (!handle) {
return NULL;
}
io_create(worker->loop, handle, socktype);
int ret = io_create(worker->loop, handle, socktype, family);
if (ret) {
if (ret == UV_EMFILE) {
worker->too_many_open = true;
worker->rconcurrent_highwatermark = worker->stats.rconcurrent;
}
iohandle_release(worker, h);
return NULL;
}
/* Bind to outgoing address, according to IP v4/v6. */
union inaddr *addr;
......@@ -226,7 +234,6 @@ static uv_handle_t *ioreq_spawn(struct qr_task *task, int socktype, sa_family_t
} else {
addr = (union inaddr *)&worker->out_addr6;
}
int ret = 0;
if (addr->ip.sa_family != AF_UNSPEC) {
assert(addr->ip.sa_family == family);
if (socktype == SOCK_DGRAM) {
......@@ -900,86 +907,6 @@ static void on_task_write(uv_write_t *req, int status)
iorequest_release(worker, req);
}
/** Write-completion callback for requests that carry no task: just hand the request back to the worker. */
static void on_nontask_write(uv_write_t *req, int status)
{
	/* The worker is reached through the loop that owns the written-to handle. */
	struct worker_ctx *owner = ((uv_handle_t *)(req->handle))->loop->data;
	assert(owner == get_worker());
	iorequest_release(owner, req);
}
/**
 * Transport push callback: hand `len` bytes at `buf` to libuv for writing
 * on the TCP handle of the TLS session behind `h`.
 *
 * @param h    Transport pointer; interpreted as struct tls_common_ctx *.
 * @param buf  Data to send.
 * @param len  Number of bytes in buf.
 * @return len when uv_write() accepted the request; -1 otherwise, with errno
 *         set to EFAULT (NULL context, or no request could be borrowed) or
 *         EIO (uv_write() failed).
 *
 * NOTE(review): buf is passed to uv_write() without being copied and the
 * write completes asynchronously, so the memory must stay valid until the
 * write callback runs — confirm that the caller guarantees this.
 */
ssize_t worker_gnutls_push(gnutls_transport_ptr_t h, const void *buf, size_t len)
{
struct tls_common_ctx *t = (struct tls_common_ctx *)h;
/* Single-element buffer list that points at the caller's memory. */
const uv_buf_t uv_buf[1] = {
{ (char *)buf, len }
};
if (t == NULL) {
errno = EFAULT;
return -1;
}
assert(t->session && t->session->handle &&
t->session->handle->type == UV_TCP);
VERBOSE_MSG(NULL,"[%s] push %zu <%p>\n",
t->client_side ? "tls_client" : "tls", len, h);
struct worker_ctx *worker = t->worker;
assert(worker);
/* NOTE(review): borrowed via worker_iohandle_borrow() but released below via
 * iorequest_release() — confirm both refer to the same pool. */
void *ioreq = worker_iohandle_borrow(worker);
if (!ioreq) {
errno = EFAULT;
return -1;
}
uv_write_t *write_req = (uv_write_t *)ioreq;
/* After the handshake the write is attributed to the context's task;
 * before that there is no task and a task-less callback is used. */
struct qr_task *task = t->task;
uv_write_cb write_cb = on_task_write;
if (t->handshake_state == TLS_HS_DONE) {
assert(task);
} else {
task = NULL;
write_cb = on_nontask_write;
}
write_req->data = task;
ssize_t ret = -1;
int res = uv_write(write_req, (uv_stream_t *)t->session->handle, uv_buf, 1, write_cb);
if (res == 0) {
if (task) {
qr_task_ref(task); /* Pending ioreq on current task */
struct request_ctx *ctx = task->ctx;
/* Update counters only when writing to a handle other than the request's source session. */
if (ctx && ctx->source.session &&
t->session->handle != ctx->source.session->handle) {
struct sockaddr *addr = &t->session->peer.ip;
worker->stats.tls += 1;
if (addr->sa_family == AF_INET6)
worker->stats.ipv6 += 1;
else if (addr->sa_family == AF_INET)
worker->stats.ipv4 += 1;
}
}
/* Clear the too_many_open flag once concurrency is more than 10 below the recorded high watermark. */
if (worker->too_many_open &&
worker->stats.rconcurrent <
worker->rconcurrent_highwatermark - 10) {
worker->too_many_open = false;
}
ret = len;
} else {
VERBOSE_MSG(NULL,"[%s] uv_write: %s\n",
t->client_side ? "tls_client" : "tls", uv_strerror(res));
iorequest_release(worker, ioreq);
errno = EIO;
}
return ret;
}
static int qr_task_send(struct qr_task *task, uv_handle_t *handle,
struct sockaddr *addr, knot_pkt_t *pkt)
{
......@@ -987,21 +914,6 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle,
return qr_task_on_send(task, handle, kr_error(EIO));
}
/* Synchronous push to TLS context, bypassing event loop. */
struct session *session = handle->data;
assert(session->closing == false);
if (session->has_tls) {
struct kr_request *req = &task->ctx->req;
if (session->outgoing) {
int ret = kr_resolve_checkout(req, NULL, addr,
SOCK_STREAM, pkt);
if (ret != kr_ok()) {
return ret;
}
}
return tls_push(task, handle, pkt);
}
int ret = 0;
struct request_ctx *ctx = task->ctx;
struct worker_ctx *worker = ctx->worker;
......@@ -1031,8 +943,18 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle,
return ret;
}
}
/* Pending ioreq on current task */
qr_task_ref(task);
/* Send using given protocol */
if (handle->type == UV_UDP) {
struct session *session = handle->data;
assert(session->closing == false);
if (session->has_tls) {
uv_write_t *write_req = (uv_write_t *)ioreq;
write_req->data = task;
ret = tls_write(write_req, handle, pkt, &on_task_write);
} else if (handle->type == UV_UDP) {
uv_udp_send_t *send_req = (uv_udp_send_t *)ioreq;
uv_buf_t buf = { (char *)pkt->wire, pkt->size };
send_req->data = task;
......@@ -1051,7 +973,6 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle,
}
if (ret == 0) {
qr_task_ref(task); /* Pending ioreq on current task */
if (worker->too_many_open &&
worker->stats.rconcurrent <
worker->rconcurrent_highwatermark - 10) {
......@@ -1059,6 +980,7 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle,
}
} else {
iorequest_release(worker, ioreq);
qr_task_unref(task);
if (ret == UV_EMFILE) {
worker->too_many_open = true;
worker->rconcurrent_highwatermark = worker->stats.rconcurrent;
......@@ -1069,15 +991,19 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle,
if (ctx->source.session &&
handle != ctx->source.session->handle &&
addr) {
if (handle->type == UV_UDP)
if (session->has_tls)
worker->stats.tls += 1;
else if (handle->type == UV_UDP)
worker->stats.udp += 1;
else
worker->stats.tcp += 1;
if (addr->sa_family == AF_INET6)
worker->stats.ipv6 += 1;
else if (addr->sa_family == AF_INET)
worker->stats.ipv4 += 1;
}
return ret;
}
......@@ -1544,12 +1470,16 @@ static int qr_task_finalize(struct qr_task *task, int state)
}
struct request_ctx *ctx = task->ctx;
kr_resolve_finish(&ctx->req, state);
task->finished = true;
if (ctx->source.session == NULL) {
(void) qr_task_on_send(task, NULL, kr_error(EIO));
return state == KR_STATE_DONE ? 0 : kr_error(EIO);
}
/* Reference task as the callback handler can close it */
qr_task_ref(task);
/* Send back answer */
struct session *source_session = ctx->source.session;
uv_handle_t *handle = source_session->handle;
......@@ -1573,12 +1503,14 @@ static int qr_task_finalize(struct qr_task *task, int state)
session_del_tasks(source_session, t);
}
session_close(source_session);
} else if (handle->type == UV_TCP) {
} else if (handle->type == UV_TCP && ctx->source.session) {
/* Don't try to close source session at least
* retry_interval_for_timeout_timer milliseconds */
uv_timer_again(&ctx->source.session->timeout);
}
qr_task_unref(task);
return state == KR_STATE_DONE ? 0 : kr_error(EIO);
}
......@@ -1820,7 +1752,7 @@ static int qr_task_step(struct qr_task *task,
subreq_finalize(task, packet_source, packet);
return qr_task_step(task, NULL, NULL);
}
tls_client_ctx_set_params(tls_ctx, entry, session);
tls_client_ctx_set_session(tls_ctx, session);
session->tls_client_ctx = tls_ctx;
session->has_tls = true;
}
......
......@@ -16,8 +16,6 @@
#pragma once
#include <gnutls/gnutls.h>
#include "daemon/engine.h"
#include "lib/generic/array.h"
#include "lib/generic/map.h"
......@@ -92,10 +90,6 @@ void *worker_iohandle_borrow(struct worker_ctx *worker);
void worker_iohandle_release(struct worker_ctx *worker, void *h);
ssize_t worker_gnutls_push(gnutls_transport_ptr_t h, const void *buf, size_t len);
ssize_t worker_gnutls_client_push(gnutls_transport_ptr_t h, const void *buf, size_t len);
/** Finalize given task */
int worker_task_finalize(struct qr_task *task, int state);
......
......@@ -410,6 +410,18 @@ uint16_t kr_inaddr_port(const struct sockaddr *addr)
}
}
/**
 * Set the port of an IPv4/IPv6 socket address.
 *
 * @param addr  Address to modify; NULL is ignored, and addresses whose family
 *              is neither AF_INET nor AF_INET6 are left untouched.
 * @param port  Port number in host byte order (stored via htons()).
 */
void kr_inaddr_set_port(struct sockaddr *addr, uint16_t port)
{
	if (!addr) {
		return;
	}
	switch (addr->sa_family) {
	case AF_INET:
		((struct sockaddr_in *)addr)->sin_port = htons(port);
		break; /* must not fall through and write via a sockaddr_in6 pointer */
	case AF_INET6:
		((struct sockaddr_in6 *)addr)->sin6_port = htons(port);
		break;
	default:
		break;
	}
}
int kr_inaddr_str(const struct sockaddr *addr, char *buf, size_t *buflen)
{
int ret = kr_ok();
......
......@@ -247,6 +247,9 @@ int kr_sockaddr_cmp(const struct sockaddr *left, const struct sockaddr *right);
/** Port. */
KR_EXPORT KR_PURE
uint16_t kr_inaddr_port(const struct sockaddr *addr);
/** Set port (given in host byte order) of an AF_INET/AF_INET6 address;
 *  a NULL address or any other address family is left untouched. */
KR_EXPORT
void kr_inaddr_set_port(struct sockaddr *addr, uint16_t port);
/** String representation for given address as "<addr>#<port>" */
KR_EXPORT
int kr_inaddr_str(const struct sockaddr *addr, char *buf, size_t *buflen);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment