Verified Commit c55cbb02 authored by Grigorii Demidov's avatar Grigorii Demidov Committed by Petr Špaček

daemon: improved reliability under heavy load; bugfixing & minor refactoring

parent e7c5c102
......@@ -33,6 +33,8 @@
} \
} while (0)
void io_release(uv_handle_t *handle);
static void check_bufsize(uv_handle_t* handle)
{
/* We want to buffer at least N waves in advance.
......@@ -108,7 +110,7 @@ static void session_release(struct worker_ctx *worker, uv_handle_t *handle)
static uv_stream_t *handle_alloc(uv_loop_t *loop)
{
uv_stream_t *handle = calloc(1, sizeof(*handle));
uv_stream_t *handle = calloc(1, sizeof(union uv_handles));
if (!handle) {
return NULL;
}
......@@ -116,6 +118,17 @@ static uv_stream_t *handle_alloc(uv_loop_t *loop)
return handle;
}
static uv_stream_t *handle_borrow(uv_loop_t *loop)
{
struct worker_ctx *worker = loop->data;
void *req = worker_iohandle_borrow(worker);
if (!req) {
return NULL;
}
return (uv_stream_t *)req;
}
static void handle_getbuf(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
{
/* Worker has single buffer which is reused for all incoming
......@@ -144,6 +157,10 @@ void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
{
uv_loop_t *loop = handle->loop;
struct worker_ctx *worker = loop->data;
struct session *s = handle->data;
if (s->closing) {
return;
}
if (nread <= 0) {
if (nread < 0) { /* Error response, notify resolver */
worker_submit(worker, (uv_handle_t *)handle, NULL, addr);
......@@ -165,6 +182,7 @@ static int udp_bind_finalize(uv_handle_t *handle)
/* Handle is already created, just create context. */
struct session *session = session_new();
assert(session);
session->outgoing = false;
session->handle = handle;
handle->data = session;
return io_start_read((uv_handle_t *)handle);
......@@ -249,14 +267,14 @@ static void _tcp_accept(uv_stream_t *master, int status, bool tls)
if (status != 0) {
return;
}
uv_stream_t *client = handle_alloc(master->loop);
uv_stream_t *client = handle_borrow(master->loop);
if (!client) {
return;
}
memset(client, 0, sizeof(*client));
io_create(master->loop, (uv_handle_t *)client, SOCK_STREAM);
if (uv_accept(master, client) != 0) {
uv_close((uv_handle_t *)client, io_free);
uv_close((uv_handle_t *)client, io_release);
return;
}
......@@ -264,13 +282,12 @@ static void _tcp_accept(uv_stream_t *master, int status, bool tls)
* It will re-check every half of a request time limit if the connection
* is idle and should be terminated, this is an educated guess. */
struct session *session = client->data;
assert(session->outgoing == false);
session->has_tls = tls;
if (tls && !session->tls_ctx) {
session->tls_ctx = tls_new(master->loop->data);
}
uv_timer_t *timer = &session->timeout;
uv_timer_init(master->loop, timer);
timer->data = session;
uv_timer_start(timer, tcp_timeout_trigger, KR_CONN_RTT_MAX/2, KR_CONN_RTT_MAX/2);
io_start_read((uv_handle_t *)client);
}
......@@ -376,13 +393,14 @@ int tcp_bindfd_tls(uv_tcp_t *handle, int fd)
void io_create(uv_loop_t *loop, uv_handle_t *handle, int type)
{
int ret = -1;
if (type == SOCK_DGRAM) {
uv_udp_init(loop, (uv_udp_t *)handle);
} else {
uv_tcp_init(loop, (uv_tcp_t *)handle);
ret = uv_udp_init(loop, (uv_udp_t *)handle);
} else if (type == SOCK_STREAM) {
ret = uv_tcp_init(loop, (uv_tcp_t *)handle);
uv_tcp_nodelay((uv_tcp_t *)handle, 1);
}
assert(ret == 0);
struct worker_ctx *worker = loop->data;
struct session *session = session_borrow(worker);
assert(session);
......@@ -417,13 +435,25 @@ void io_free(uv_handle_t *handle)
free(handle);
}
void io_release(uv_handle_t *handle)
{
if (!handle) {
return;
}
uv_loop_t *loop = handle->loop;
struct worker_ctx *worker = loop->data;
io_deinit(handle);
worker_iohandle_release(worker, handle);
}
int io_start_read(uv_handle_t *handle)
{
if (handle->type == UV_UDP) {
return uv_udp_recv_start((uv_udp_t *)handle, &handle_getbuf, &udp_recv);
} else {
} else if (handle->type == UV_TCP) {
return uv_read_start((uv_stream_t *)handle, &handle_getbuf, &tcp_recv);
}
assert(false);
}
int io_stop_read(uv_handle_t *handle)
......
......@@ -140,7 +140,7 @@ static int open_endpoint(struct network *net, struct endpoint *ep, struct sockad
{
int ret = 0;
if (flags & NET_UDP) {
ep->udp = malloc(sizeof(*ep->udp));
ep->udp = malloc(sizeof(union uv_handles));
if (!ep->udp) {
return kr_error(ENOMEM);
}
......@@ -153,7 +153,7 @@ static int open_endpoint(struct network *net, struct endpoint *ep, struct sockad
ep->flags |= NET_UDP;
}
if (flags & NET_TCP) {
ep->tcp = malloc(sizeof(*ep->tcp));
ep->tcp = malloc(sizeof(union uv_handles));
if (!ep->tcp) {
return kr_error(ENOMEM);
}
......@@ -185,7 +185,7 @@ static int open_endpoint_fd(struct network *net, struct endpoint *ep, int fd, in
if (ep->udp) {
return kr_error(EEXIST);
}
ep->udp = malloc(sizeof(*ep->udp));
ep->udp = malloc(sizeof(union uv_handles));// malloc(sizeof(*ep->udp));
if (!ep->udp) {
return kr_error(ENOMEM);
}
......@@ -197,12 +197,11 @@ static int open_endpoint_fd(struct network *net, struct endpoint *ep, int fd, in
}
ep->flags |= NET_UDP;
return kr_ok();
}
if (sock_type == SOCK_STREAM) {
} else if (sock_type == SOCK_STREAM) {
if (ep->tcp) {
return kr_error(EEXIST);
}
ep->tcp = malloc(sizeof(*ep->tcp));
ep->tcp = malloc(sizeof(union uv_handles));
if (!ep->tcp) {
return kr_error(ENOMEM);
}
......
This diff is collapsed.
......@@ -27,6 +27,7 @@ struct qr_task;
struct worker_ctx;
/** Transport session (opaque). */
struct session;
/** Union of various libuv objects for freelist. */
/** Worker callback */
typedef void (*worker_cb_t)(struct worker_ctx *worker, struct kr_request *req, void *baton);
......@@ -80,6 +81,11 @@ void worker_reclaim(struct worker_ctx *worker);
/** Closes given session */
void worker_session_close(struct session *session);
void *worker_iohandle_borrow(struct worker_ctx *worker);
void worker_iohandle_release(struct worker_ctx *worker, void *h);
/** @cond internal */
......@@ -135,11 +141,31 @@ struct worker_ctx {
map_t tcp_waiting;
map_t outgoing;
mp_freelist_t pool_mp;
mp_freelist_t pool_ioreq;
mp_freelist_t pool_ioreqs;
mp_freelist_t pool_sessions;
mp_freelist_t pool_iohandles;
knot_mm_t pkt_pool;
};
/* @internal Union of derivatives from libuv uv_handle_t for freelist.
* These have session as their `handle->data` and own it. */
union uv_handles {
uv_handle_t handle;
uv_stream_t stream;
uv_udp_t udp;
uv_tcp_t tcp;
uv_timer_t timer;
};
/* @internal Union of derivatives from uv_req_t libuv request handles for freelist.
* These have only a reference to the task they're operating on. */
union uv_reqs {
uv_req_t req;
uv_shutdown_t sdown;
uv_write_t write;
uv_connect_t connect;
uv_udp_send_t send;
};
/** @endcond */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment