Commit c23edd06 authored by Marek Vavrusa

daemon: out-of-order processing for TCP

* the daemon now processes messages from a TCP stream
  out-of-order and concurrently
* support for TCP_DEFER_ACCEPT
* support for TCP Fast Open
* TCP streams now have deadlines for idle/slow
  connections (slowloris protection; pruning)
* there is now a per-request limit on timeouts
  (each request is allowed 4 timeouts before bailing)
* faster request closing, unified retry/timeout timers
* fixed a rare race condition when closing timers
parent e61c48ef
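
The out-of-order processing hinges on DNS/TCP framing: RFC 1035 (section 4.2.2) prefixes every message on a TCP stream with a two-byte, network-order length field, so a reader can slice the stream into self-contained messages and start resolving each one immediately instead of serializing on the first. A minimal sketch of that demuxing step (the callback type and names are illustrative, not kresd API):

```c
#include <stdint.h>
#include <stddef.h>

/* Hypothetical callback invoked once per complete DNS message. */
typedef void (*on_message_cb)(const uint8_t *msg, uint16_t len, void *ctx);

/* Consume as many complete 2-byte-length-prefixed messages as possible.
 * Returns the number of bytes consumed; the caller keeps the remainder
 * buffered until the next read completes the partial message. */
static size_t demux_stream(const uint8_t *buf, size_t avail,
                           on_message_cb cb, void *ctx)
{
	size_t off = 0;
	while (avail - off >= 2) {
		uint16_t len = (uint16_t)((buf[off] << 8) | buf[off + 1]);
		if (avail - off - 2 < len) {
			break; /* Partial message, wait for more data. */
		}
		/* Each message can start resolving immediately and
		 * independently, which is what enables out-of-order answers. */
		cb(buf + off + 2, len, ctx);
		off += 2 + (size_t)len;
	}
	return off;
}
```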
@@ -520,6 +520,18 @@ For when listening on ``localhost`` just doesn't cut it.
    > net.bufsize()
    4096
 
+.. function:: net.tcp_pipeline([len])
+
+   Get/set the per-client TCP pipeline limit, i.e. the number of outstanding
+   queries that a single client connection may make in parallel. Default is 50.
+
+   Example output:
+
+   .. code-block:: lua
+
+      > net.tcp_pipeline()
+      50
+      > net.tcp_pipeline(100)
+
 Trust anchors and DNSSEC
 ^^^^^^^^^^^^^^^^^^^^^^^^
@@ -48,6 +48,13 @@ static int format_error(lua_State* L, const char *err)
 	return 1;
 }
 
+static inline struct worker_ctx *wrk_luaget(lua_State *L) {
+	lua_getglobal(L, "__worker");
+	struct worker_ctx *worker = lua_touserdata(L, -1);
+	lua_pop(L, 1);
+	return worker;
+}
+
 /** List loaded modules */
 static int mod_list(lua_State *L)
 {
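
wrk_luaget() above fetches the worker pointer back out of the Lua state; the other half of the pattern (not part of this hunk) is stashing the pointer as a light userdata under a well-known global. A sketch of that setter side, with illustrative names:

```c
#include <lua.h>

struct worker_ctx; /* Opaque here; defined by the daemon. */

/* Sketch of the stash side of the wrk_luaget() pattern; the actual
 * kresd call site lives in the engine and may differ in detail. */
static void stash_worker(lua_State *L, struct worker_ctx *worker)
{
	lua_pushlightuserdata(L, worker); /* Bare pointer, not managed by the GC. */
	lua_setglobal(L, "__worker");     /* wrk_luaget() reads it back via lua_getglobal(). */
}
```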
@@ -302,14 +309,36 @@ static int net_bufsize(lua_State *L)
 	return 0;
 }
 
+/** Get/set TCP pipeline size. */
+static int net_pipeline(lua_State *L)
+{
+	struct worker_ctx *worker = wrk_luaget(L);
+	if (!worker) {
+		return 0;
+	}
+	if (!lua_isnumber(L, 1)) {
+		lua_pushnumber(L, worker->tcp_pipeline_max);
+		return 1;
+	}
+	int len = lua_tointeger(L, 1);
+	if (len < 0 || len > 4096) {
+		format_error(L, "tcp_pipeline must be within <0, 4096>");
+		lua_error(L);
+	}
+	worker->tcp_pipeline_max = len;
+	lua_pushnumber(L, len);
+	return 1;
+}
+
 int lib_net(lua_State *L)
 {
 	static const luaL_Reg lib[] = {
-		{ "list",       net_list },
-		{ "listen",     net_listen },
-		{ "close",      net_close },
-		{ "interfaces", net_interfaces },
-		{ "bufsize",    net_bufsize },
+		{ "list",         net_list },
+		{ "listen",       net_listen },
+		{ "close",        net_close },
+		{ "interfaces",   net_interfaces },
+		{ "bufsize",      net_bufsize },
+		{ "tcp_pipeline", net_pipeline },
 		{ NULL, NULL }
 	};
 	register_lib(L, "net", lib);
@@ -599,13 +628,6 @@ int lib_event(lua_State *L)
 	return 1;
 }
 
-static inline struct worker_ctx *wrk_luaget(lua_State *L) {
-	lua_getglobal(L, "__worker");
-	struct worker_ctx *worker = lua_touserdata(L, -1);
-	lua_pop(L, 1);
-	return worker;
-}
-
 /* @internal Call the Lua callback stored in baton. */
 static void resolve_callback(struct worker_ctx *worker, struct kr_request *req, void *baton)
 {
@@ -32,6 +32,9 @@
 #ifndef QUERY_RATE_THRESHOLD
 #define QUERY_RATE_THRESHOLD (2 * MP_FREELIST_SIZE) /**< Nr of parallel queries considered as high rate */
 #endif
+#ifndef MAX_PIPELINED
+#define MAX_PIPELINED 100
+#endif
 
 /*
  * @internal These are forward decls to allow building modules with engine but without Lua.
@@ -18,6 +18,7 @@
 #include <libknot/errcode.h>
 #include <contrib/ucw/lib.h>
 #include <contrib/ucw/mempool.h>
+#include <assert.h>
 #include "daemon/io.h"
 #include "daemon/network.h"
@@ -44,14 +45,56 @@ static void check_bufsize(uv_handle_t* handle)
 #undef negotiate_bufsize
 
-static void *handle_alloc(uv_loop_t *loop, size_t size)
+static void session_clear(struct session *s)
 {
-	return malloc(size);
+	assert(s->is_subreq || s->tasks.len == 0);
+	array_clear(s->tasks);
+	memset(s, 0, sizeof(*s));
 }
 
-static void handle_free(uv_handle_t *handle)
+void session_free(struct session *s)
 {
-	free(handle);
+	session_clear(s);
+	free(s);
+}
+
+struct session *session_new(void)
+{
+	return calloc(1, sizeof(struct session));
+}
+
+static struct session *session_borrow(struct worker_ctx *worker)
+{
+	struct session *s = NULL;
+	if (worker->pool_sessions.len > 0) {
+		s = array_tail(worker->pool_sessions);
+		array_pop(worker->pool_sessions);
+		kr_asan_unpoison(s, sizeof(*s));
+	} else {
+		s = session_new();
+	}
+	return s;
+}
+
+static void session_release(struct worker_ctx *worker, struct session *s)
+{
+	if (worker->pool_sessions.len < MP_FREELIST_SIZE) {
+		session_clear(s);
+		array_push(worker->pool_sessions, s);
+		kr_asan_poison(s, sizeof(*s));
+	} else {
+		session_free(s);
+	}
+}
+
+static uv_stream_t *handle_alloc(uv_loop_t *loop)
+{
+	uv_stream_t *handle = calloc(1, sizeof(*handle));
+	if (!handle) {
+		return NULL;
+	}
+	return handle;
 }
 
 static void handle_getbuf(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
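
session_borrow()/session_release() recycle session structs through a per-worker freelist and poison pooled memory, so AddressSanitizer reports any use-after-release; kr_asan_poison()/kr_asan_unpoison() presumably compile to no-ops in non-ASan builds. A standalone sketch of the same idea using ASan's public interface directly (illustrative names, fixed-size freelist):

```c
#include <stdlib.h>
#include <string.h>

#ifndef __has_feature
#define __has_feature(x) 0 /* GCC defines __SANITIZE_ADDRESS__ instead. */
#endif
#if __has_feature(address_sanitizer) || defined(__SANITIZE_ADDRESS__)
#include <sanitizer/asan_interface.h>
#define mem_poison(p, sz)   ASAN_POISON_MEMORY_REGION((p), (sz))
#define mem_unpoison(p, sz) ASAN_UNPOISON_MEMORY_REGION((p), (sz))
#else
#define mem_poison(p, sz)   ((void)0)
#define mem_unpoison(p, sz) ((void)0)
#endif

/* Illustrative freelist mirroring session_borrow()/session_release():
 * released objects stay poisoned, so any stale pointer dereference trips ASan. */
struct freelist { void *items[64]; size_t len; };

static void *pool_borrow(struct freelist *fl, size_t obj_size)
{
	if (fl->len > 0) {
		void *obj = fl->items[--fl->len];
		mem_unpoison(obj, obj_size); /* Make it addressable again. */
		return obj;
	}
	return calloc(1, obj_size);
}

static void pool_release(struct freelist *fl, void *obj, size_t obj_size)
{
	if (fl->len < 64) {
		memset(obj, 0, obj_size); /* Clear before pooling, like session_clear(). */
		fl->items[fl->len++] = obj;
		mem_poison(obj, obj_size); /* Flag any later touch as a bug. */
	} else {
		free(obj);
	}
}
```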
@@ -61,14 +104,20 @@ static void handle_getbuf(uv_handle_t* handle, size_t suggested_size, uv_buf_t*
 	 * guaranteed to be unchanged only for the duration of
 	 * udp_read() and tcp_read().
 	 */
+	struct session *session = handle->data;
 	uv_loop_t *loop = handle->loop;
 	struct worker_ctx *worker = loop->data;
 	buf->base = (char *)worker->wire_buf;
-	/* Use recvmmsg() on master sockets if possible. */
-	if (handle->data)
+	/* Limit TCP stream buffer size to 4K for granularity in batches of incoming queries. */
+	if (handle->type == UV_TCP) {
+		buf->len = MIN(suggested_size, 4096);
+	/* Regular buffer size for subrequests. */
+	} else if (session->is_subreq) {
 		buf->len = suggested_size;
-	else
+	/* Use recvmmsg() on master sockets if possible. */
+	} else {
 		buf->len = sizeof(worker->wire_buf);
+	}
 }
 
 void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
@@ -78,7 +127,7 @@ void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
 	struct worker_ctx *worker = loop->data;
 	if (nread <= 0) {
 		if (nread < 0) { /* Error response, notify resolver */
-			worker_exec(worker, (uv_handle_t *)handle, NULL, addr);
+			worker_submit(worker, (uv_handle_t *)handle, NULL, addr);
 		} /* nread == 0 is for freeing buffers, we don't need to do this */
 		return;
 	}
@@ -86,7 +135,7 @@ void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
 	knot_pkt_t *query = knot_pkt_new(buf->base, nread, &worker->pkt_pool);
 	if (query) {
 		query->max_size = KNOT_WIRE_MAX_PKTSIZE;
-		worker_exec(worker, (uv_handle_t *)handle, query, addr);
+		worker_submit(worker, (uv_handle_t *)handle, query, addr);
 	}
 	mp_flush(worker->pkt_pool.ctx);
 }
@@ -101,35 +150,53 @@ int udp_bind(uv_udp_t *handle, struct sockaddr *addr)
 	if (ret != 0) {
 		return ret;
 	}
-	handle->data = NULL;
 	check_bufsize((uv_handle_t *)handle);
 	/* Handle is already created, just create context. */
+	handle->data = session_new();
+	assert(handle->data);
 	return io_start_read((uv_handle_t *)handle);
 }
 
+static void tcp_timeout(uv_handle_t *timer)
+{
+	uv_handle_t *handle = timer->data;
+	uv_close(handle, io_free);
+}
+
+static void tcp_timeout_trigger(uv_timer_t *timer)
+{
+	uv_handle_t *handle = timer->data;
+	struct session *session = handle->data;
+	if (session->tasks.len > 0) {
+		uv_timer_again(timer);
+	} else {
+		uv_close((uv_handle_t *)timer, tcp_timeout);
+	}
+}
+
 static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
 {
 	uv_loop_t *loop = handle->loop;
+	struct session *s = handle->data;
 	struct worker_ctx *worker = loop->data;
-	/* Check for originator connection close. */
-	if (nread <= 0) {
-		if (handle->data) {
-			worker_exec(worker, (uv_handle_t *)handle, NULL, NULL);
-		}
-		if (!uv_is_closing((uv_handle_t *)handle)) {
-			uv_close((uv_handle_t *)handle, handle_free);
-		}
-		return;
-	}
 
+	/* TCP pipelining is rather complicated and requires cooperation from the worker,
+	 * so the whole message reassembly and demuxing logic lives inside the worker. */
 	int ret = worker_process_tcp(worker, (uv_handle_t *)handle, (const uint8_t *)buf->base, nread);
-	if (ret == 0) {
-		/* Push - pull, stop reading from this handle until
-		 * the task is finished. Since the handle keeps no track of the
-		 * pending tasks, it might be freed before the task finishes,
-		 * leading to various errors. */
-		uv_unref((uv_handle_t *)handle);
-		io_stop_read((uv_handle_t *)handle);
+	if (ret < 0) {
+		worker_end_tcp(worker, (uv_handle_t *)handle);
+		/* Exceeded the per-connection quota for outstanding requests;
+		 * stop reading from the stream and close after the last message is processed. */
+		if (!s->is_subreq && !uv_is_closing((uv_handle_t *)&s->timeout)) {
+			uv_timer_stop(&s->timeout);
+			if (s->tasks.len == 0) {
+				uv_close((uv_handle_t *)&s->timeout, tcp_timeout);
+			} else { /* If there are tasks running, defer until they finish. */
+				uv_timer_start(&s->timeout, tcp_timeout_trigger, 1, KR_CONN_RTT_MAX/2);
+			}
+		}
+	/* Connection spawned more than one request, reset its deadline for the next query. */
+	} else if (ret > 0 && !s->is_subreq) {
+		uv_timer_again(&s->timeout);
 	}
 	mp_flush(worker->pkt_pool.ctx);
 }
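
The deadline mechanics above lean on three libuv calls: uv_timer_start() arms a repeating timer, uv_timer_again() pushes the deadline back whenever the stream produces another query, and the trigger callback either re-arms (tasks still in flight) or closes the connection (idle). A self-contained sketch of that pattern, with a hypothetical 1-second deadline standing in for KR_CONN_RTT_MAX/2:

```c
#include <uv.h>
#include <stdio.h>

/* Illustrative idle-deadline check, mirroring tcp_timeout_trigger(). */
static void on_idle_deadline(uv_timer_t *timer)
{
	int *active_tasks = timer->data;
	if (*active_tasks > 0) {
		uv_timer_again(timer); /* Work in flight: push the deadline back. */
	} else {
		printf("idle connection, would close it now\n");
		uv_timer_stop(timer);
	}
}

int main(void)
{
	uv_loop_t *loop = uv_default_loop();
	int active_tasks = 0; /* Stand-in for session->tasks.len. */
	uv_timer_t timer;
	uv_timer_init(loop, &timer);
	timer.data = &active_tasks;
	/* First check after 1s, then every 1s; uv_timer_again() restarts the repeat. */
	uv_timer_start(&timer, on_idle_deadline, 1000, 1000);
	return uv_run(loop, UV_RUN_DEFAULT);
}
```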
@@ -139,41 +206,79 @@ static void tcp_accept(uv_stream_t *master, int status)
 	if (status != 0) {
 		return;
 	}
-	uv_stream_t *client = handle_alloc(master->loop, sizeof(*client));
+	uv_stream_t *client = handle_alloc(master->loop);
 	if (!client) {
 		return;
 	}
-	memset(client, 0, sizeof(*client));
 	io_create(master->loop, (uv_handle_t *)client, SOCK_STREAM);
 	if (uv_accept(master, client) != 0) {
-		handle_free((uv_handle_t *)client);
+		io_free((uv_handle_t *)client);
 		return;
 	}
+
+	/* Set deadlines for the TCP connection and start reading.
+	 * Every half of the request time limit it re-checks whether the connection
+	 * is idle and should be terminated; this interval is an educated guess. */
+	struct session *session = client->data;
+	uv_timer_t *timer = &session->timeout;
+	uv_timer_init(master->loop, timer);
+	timer->data = client;
+	uv_timer_start(timer, tcp_timeout_trigger, KR_CONN_RTT_MAX/2, KR_CONN_RTT_MAX/2);
 	io_start_read((uv_handle_t *)client);
 }
 
-int tcp_bind(uv_tcp_t *handle, struct sockaddr *addr)
+static int set_tcp_option(uv_tcp_t *handle, int option, int val)
 {
-	unsigned flags = UV_UDP_REUSEADDR;
+	uv_os_fd_t fd = 0;
+	if (uv_fileno((uv_handle_t *)handle, &fd) == 0) {
+		return setsockopt(fd, IPPROTO_TCP, option, &val, sizeof(val));
+	}
+	return 0; /* N/A */
+}
+
+static int _tcp_bind(uv_tcp_t *handle, struct sockaddr *addr, uv_connection_cb connection)
+{
+	unsigned flags = 0;
 	if (addr->sa_family == AF_INET6) {
-		flags |= UV_UDP_IPV6ONLY;
+		flags |= UV_TCP_IPV6ONLY;
 	}
 	int ret = uv_tcp_bind(handle, addr, flags);
 	if (ret != 0) {
 		return ret;
 	}
-	ret = uv_listen((uv_stream_t *)handle, 16, tcp_accept);
+
+	/* TCP_DEFER_ACCEPT delays accepting connections until there is readable data. */
+#ifdef TCP_DEFER_ACCEPT
+	if (set_tcp_option(handle, TCP_DEFER_ACCEPT, KR_CONN_RTT_MAX/1000) != 0) {
+		kr_log_info("[ io ] tcp_bind (defer_accept): %s\n", strerror(errno));
+	}
+#endif
+
+	ret = uv_listen((uv_stream_t *)handle, 16, connection);
 	if (ret != 0) {
 		return ret;
 	}
+
+	/* TCP_FASTOPEN enables 1 RTT connection resumptions. */
+#ifdef TCP_FASTOPEN
+# ifdef __linux__
+	(void) set_tcp_option(handle, TCP_FASTOPEN, 16); /* Accepts a queue length hint */
+# else
+	(void) set_tcp_option(handle, TCP_FASTOPEN, 1);  /* Accepts on/off */
+# endif
+#endif
+
 	handle->data = NULL;
 	return 0;
 }
 
+int tcp_bind(uv_tcp_t *handle, struct sockaddr *addr)
+{
+	return _tcp_bind(handle, addr, tcp_accept);
+}
+
 void io_create(uv_loop_t *loop, uv_handle_t *handle, int type)
 {
 	if (type == SOCK_DGRAM) {
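
Both listener knobs are ordinary setsockopt() calls, which is all set_tcp_option() wraps: TCP_DEFER_ACCEPT takes a timeout in seconds (hence KR_CONN_RTT_MAX/1000), while TCP_FASTOPEN takes a pending-queue length on Linux and acts as an on/off toggle elsewhere. The same tuning on a raw listening socket, as a sketch with the diff's values:

```c
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>

/* Illustrative listener tuning for a bound, pre-listen socket fd. */
static void tune_listener(int fd)
{
#ifdef TCP_DEFER_ACCEPT
	int secs = 3; /* Wake accept() only once data arrives, or after ~3s. */
	if (setsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &secs, sizeof(secs)) != 0)
		fprintf(stderr, "defer_accept: %s\n", strerror(errno));
#endif
#ifdef TCP_FASTOPEN
#ifdef __linux__
	int qlen = 16; /* Pending-TFO queue length hint on Linux. */
#else
	int qlen = 1;  /* Elsewhere the option is an on/off toggle. */
#endif
	if (setsockopt(fd, IPPROTO_TCP, TCP_FASTOPEN, &qlen, sizeof(qlen)) != 0)
		fprintf(stderr, "fastopen: %s\n", strerror(errno));
#endif
}
```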
@@ -182,6 +287,34 @@ void io_create(uv_loop_t *loop, uv_handle_t *handle, int type)
 		uv_tcp_init(loop, (uv_tcp_t *)handle);
 		uv_tcp_nodelay((uv_tcp_t *)handle, 1);
 	}
+	struct worker_ctx *worker = loop->data;
+	handle->data = session_borrow(worker);
+	assert(handle->data);
 }
 
+void io_deinit(uv_handle_t *handle)
+{
+	if (!handle) {
+		return;
+	}
+	uv_loop_t *loop = handle->loop;
+	if (loop && loop->data) {
+		struct worker_ctx *worker = loop->data;
+		session_release(worker, handle->data);
+	} else {
+		session_free(handle->data);
+	}
+	handle->data = NULL;
+}
+
+void io_free(uv_handle_t *handle)
+{
+	if (!handle) {
+		return;
+	}
+	io_deinit(handle);
+	free(handle);
+}
+
 int io_start_read(uv_handle_t *handle)
@@ -18,9 +18,30 @@
 #include <uv.h>
 #include <libknot/packet/pkt.h>
+#include "lib/generic/array.h"
+
+struct qr_task;
+
+/* Per-session (TCP or UDP) persistent structure that exists
+ * between a remote counterpart and a local socket. */
+struct session {
+	bool is_subreq;
+	bool throttled;
+	uv_timer_t timeout;
+	struct qr_task *buffering;
+	array_t(struct qr_task *) tasks;
+};
+
+void session_free(struct session *s);
+struct session *session_new(void);
 
 int udp_bind(uv_udp_t *handle, struct sockaddr *addr);
 int tcp_bind(uv_tcp_t *handle, struct sockaddr *addr);
 void io_create(uv_loop_t *loop, uv_handle_t *handle, int type);
+void io_deinit(uv_handle_t *handle);
+void io_free(uv_handle_t *handle);
 int io_start_read(uv_handle_t *handle);
-int io_stop_read(uv_handle_t *handle);
\ No newline at end of file
+int io_stop_read(uv_handle_t *handle);
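
The session struct is the keystone of the change: tasks holds every in-flight query spawned by the connection, buffering points at a partially reassembled message, and is_subreq/throttled steer the read path. The task list uses the array_t generics from lib/generic/array.h; a sketch of the operations this diff relies on (assumes building inside the knot-resolver tree, error handling elided):

```c
#include <stddef.h>
#include "lib/generic/array.h" /* knot-resolver generic array, as used above. */

struct qr_task;
typedef array_t(struct qr_task *) task_array_t; /* Same shape as session->tasks. */

/* Append a task to the session's list (backing buffer grows as needed). */
static void track_task(task_array_t *tasks, struct qr_task *t)
{
	array_push(*tasks, t);
}

/* Remove and return the most recently tracked task, or NULL when empty. */
static struct qr_task *pop_last(task_array_t *tasks)
{
	if (tasks->len == 0) {
		return NULL;
	}
	struct qr_task *t = array_tail(*tasks); /* Peek the last element... */
	array_pop(*tasks);                      /* ...then drop it. */
	return t;
}
```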
@@ -21,7 +21,8 @@
 #include "daemon/io.h"
 
 /* libuv 1.7.0+ is able to support SO_REUSEPORT for loadbalancing */
-#if (defined(ENABLE_REUSEPORT) || defined(UV_VERSION_HEX)) && (__linux__ && SO_REUSEPORT)
+#if defined(UV_VERSION_HEX)
+#if (__linux__ && SO_REUSEPORT)
 #define handle_init(type, loop, handle, family) do { \
 	uv_ ## type ## _init_ex((loop), (handle), (family)); \
 	uv_os_fd_t fd = 0; \
@@ -30,6 +31,12 @@
 		setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on)); \
 	} \
 } while (0)
+/* libuv 1.7.0+ is able to assign the fd immediately */
+#else
+#define handle_init(type, loop, handle, family) do { \
+	uv_ ## type ## _init_ex((loop), (handle), (family)); \
+} while (0)
+#endif
 #else
 #define handle_init(type, loop, handle, family) \
 	uv_ ## type ## _init((loop), (handle))
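
The branch added here distinguishes libuv >= 1.7, where uv_*_init_ex() creates the file descriptor immediately; that makes it possible to set SO_REUSEPORT before bind, so several kresd processes can bind the same address and let the kernel spread incoming packets across them. A sketch of the UDP case, assuming libuv >= 1.7 on Linux:

```c
#include <uv.h>
#include <sys/socket.h>

/* Illustrative expansion of the handle_init() macro for UDP:
 * init_ex() creates the fd up front, so the option lands before bind. */
static int udp_init_reuseport(uv_loop_t *loop, uv_udp_t *handle, int family)
{
	int ret = uv_udp_init_ex(loop, handle, family);
	if (ret != 0) {
		return ret;
	}
#if defined(__linux__) && defined(SO_REUSEPORT)
	uv_os_fd_t fd = 0;
	if (uv_fileno((uv_handle_t *)handle, &fd) == 0) {
		int on = 1; /* Kernel load-balances datagrams across the sharers. */
		setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on));
	}
#endif
	return 0;
}
```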
@@ -24,7 +24,7 @@
 enum endpoint_flag {
 	NET_DOWN = 0 << 0,
 	NET_UDP  = 1 << 0,
-	NET_TCP  = 1 << 1
+	NET_TCP  = 1 << 1,
 };
 
 struct endpoint {
@@ -20,9 +20,11 @@
 #include "lib/generic/array.h"
 #include "lib/generic/map.h"
 
+/** @internal Number of requests within the timeout window. */
+#define MAX_PENDING (KR_NSREP_MAXADDR + (KR_NSREP_MAXADDR / 2))
 /** @cond internal Freelist of available mempools. */
 typedef array_t(void *) mp_freelist_t;
 /* @endcond */
 
 /**
  * Query resolution worker.
@@ -32,6 +34,7 @@ struct worker_ctx {
 	uv_loop_t *loop;
 	int id;
 	int count;
+	unsigned tcp_pipeline_max;
 #if __linux__
 	uint8_t wire_buf[RECVMMSG_BATCH * KNOT_WIRE_MAX_PKTSIZE];
 #else
@@ -48,27 +51,67 @@ struct worker_ctx {
 		size_t timeout;
 	} stats;
 	map_t outstanding;
-	mp_freelist_t pools;
-	mp_freelist_t ioreqs;
+	mp_freelist_t pool_mp;
+	mp_freelist_t pool_ioreq;
+	mp_freelist_t pool_sessions;
 	knot_mm_t pkt_pool;
 };
 
 /* Worker callback */
 typedef void (*worker_cb_t)(struct worker_ctx *worker, struct kr_request *req, void *baton);
 
+/** @internal Query resolution task. */
+struct qr_task
+{
+	struct kr_request req;
+	struct worker_ctx *worker;
+	struct session *session;
+	knot_pkt_t *pktbuf;
+	array_t(struct qr_task *) waiting;
+	uv_handle_t *pending[MAX_PENDING];
+	uint16_t pending_count;
+	uint16_t addrlist_count;
+	uint16_t addrlist_turn;
+	uint16_t timeouts;
+	uint16_t iter_count;
+	uint16_t bytes_remaining;
+	struct sockaddr *addrlist;
+	uv_timer_t *timeout;
+	worker_cb_t on_complete;
+	void *baton;
+	struct {
+		union {
+			struct sockaddr_in ip4;
+			struct sockaddr_in6 ip6;
+		} addr;
+		uv_handle_t *handle;
+	} source;
+	uint32_t refs;
+	bool finished : 1;
+	bool leading : 1;
+};
+/* @endcond */
+
 /**
  * Process incoming packet (query or answer to subrequest).
  * @return 0 or an error code
 */
-int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query, const struct sockaddr* addr);
+int worker_submit(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query, const struct sockaddr* addr);
 
 /**
- * Process incoming DNS/TCP message fragment.
- * @return 0, number of bytes remaining to assemble, or an error code
+ * Process incoming DNS/TCP message fragment(s).
+ * If the fragment contains only a partial message, it is buffered.
+ * If the fragment contains a complete query or completes the current fragment, it is executed.
+ * @return 0 or an error code
 */
-int worker_process_tcp(struct worker_ctx *worker, uv_handle_t *handle, const uint8_t *msg, size_t len);
+int worker_process_tcp(struct worker_ctx *worker, uv_handle_t *handle, const uint8_t *msg, ssize_t len);
+
+/**
+ * End the current DNS/TCP session; this disassociates pending tasks from the session,
+ * which may then be freely closed.
+ */
+int worker_end_tcp(struct worker_ctx *worker, uv_handle_t *handle);
 
 /**
  * Schedule query for resolution.
@@ -54,6 +54,7 @@ static inline int __attribute__((__cold__)) kr_error(int x) {
 #define KR_CONN_RETRY 300 /* Retry interval for network activity */
 #define KR_ITER_LIMIT 50 /* Built-in iterator limit */
 #define KR_CNAME_CHAIN_LIMIT 40 /* Built-in maximum CNAME chain length */
+#define KR_TIMEOUT_LIMIT 4 /* Maximum number of retries after timeout. */
 
 /*
  * Defines.
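
KR_TIMEOUT_LIMIT backs the per-request timeout cap from the commit message: each timed-out subrequest spends one unit of the budget (tracked in qr_task's `timeouts` counter above), and the request bails once the budget is exhausted, which bounds worst-case latency. Schematically:

```c
/* Illustrative per-request timeout budget in the spirit of KR_TIMEOUT_LIMIT;
 * names here are a sketch, not the worker's actual control flow. */
#define TIMEOUT_LIMIT 4

struct request {
	unsigned timeouts; /* Incremented on every timed-out subrequest. */
};

/* Returns 1 if the request may retry another server, 0 if it should bail. */
static int on_timeout(struct request *req)
{
	if (++req->timeouts >= TIMEOUT_LIMIT) {
		return 0; /* Budget exhausted: fail fast instead of hanging. */
	}
	return 1; /* Retry against the next candidate address. */
}
```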
@@ -114,6 +114,7 @@ void kr_zonecut_deinit(struct kr_zonecut *cut)
 	map_clear(&cut->nsset);
 	knot_rrset_free(&cut->key, cut->pool);
 	knot_rrset_free(&cut->trust_anchor, cut->pool);
+	cut->name = NULL;
 }
 
 void kr_zonecut_set(struct kr_zonecut *cut, const knot_dname_t *name)
@@ -10,8 +10,9 @@ deckard_DIR := tests/deckard
 TESTS := sets/resolver
 TEMPLATE := template/kresd.j2
 
 $(deckard_DIR)/Makefile:
-	@git submodule update --init
+	@git submodule update --init --recursive
 
 check-integration: $(deckard_DIR)/Makefile
+	@mkdir -p $(deckard_DIR)/contrib/libswrap/obj
 	@$(MAKE) -s -C $(deckard_DIR) TESTS=$(TESTS) DAEMON=$(abspath daemon/kresd) TEMPLATE=$(TEMPLATE) DYLD_LIBRARY_PATH=$(DYLD_LIBRARY_PATH)
 
 deckard: check-integration