Commit c7d62055 authored by Marek Vavruša

daemon/worker: reworked multiplexed worker

* each query is assigned a task
* each task contains the request, some primitives and a mempool
* the worker can process multiple tasks at once and
  offload I/O to the event loop (see the sketch below)

Not finished:

* it depends on ICMP/system timeouts, #22
* TCP reads will misbehave if the messages
  arrive fragmented, #21
parent fadc042d
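
The design can be summarized as a per-query state machine: a task owns the request and a dedicated memory pool, the worker repeatedly consumes the last answer and produces the next outgoing query, and socket I/O for the produced query is handed off to the libuv event loop. What follows is a minimal, self-contained sketch of that lifecycle, not the actual kresd code: task_create, task_step, task_free and the TASK_* states are simplified stand-ins for the qr_task_* / kr_resolve_* API in the diff below, and the real mempool (mm_ctx_mempool / mp_delete) is replaced by plain malloc/free.

#include <stdio.h>
#include <stdlib.h>

enum task_state { TASK_PRODUCE, TASK_DONE, TASK_FAIL };

struct task {
	char  *pool;      /* stand-in for the per-task mempool */
	size_t pool_size;
	int    steps;     /* completed consume/produce rounds */
};

static struct task *task_create(size_t pool_size)
{
	struct task *t = calloc(1, sizeof(*t));
	if (!t)
		return NULL;
	t->pool = malloc(pool_size);
	if (!t->pool) {
		free(t);
		return NULL;
	}
	t->pool_size = pool_size;
	return t;
}

static void task_free(struct task *t)
{
	if (!t)
		return;
	free(t->pool); /* the real code frees everything with one mp_delete() */
	free(t);
}

/* One step of the state machine: consume the last answer, produce the next
 * outgoing query; in the daemon the produced query is handed to the event
 * loop and this function is re-entered from the socket callback. */
static enum task_state task_step(struct task *t)
{
	if (++t->steps >= 3) /* pretend the third upstream answer completes resolution */
		return TASK_DONE;
	return TASK_PRODUCE;
}

int main(void)
{
	struct task *t = task_create(4096);
	if (!t)
		return 1;
	enum task_state state;
	do {
		state = task_step(t); /* driven synchronously here, by I/O callbacks in kresd */
	} while (state == TASK_PRODUCE);
	printf("task finished after %d steps: %s\n", t->steps,
	       state == TASK_DONE ? "done" : "fail");
	task_free(t);
	return 0;
}

The point of the per-task pool is that finishing a task needs only a single pool teardown (mp_delete in the diff) instead of tracking every packet allocated along the way.
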
@@ -21,62 +21,50 @@
#include "daemon/network.h"
#include "daemon/worker.h"
#define ENDPOINT_BUFSIZE 512 /**< This is an artificial limit for DNS query. */
static void buf_get(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
static void *handle_alloc(uv_loop_t *loop, size_t size)
{
#warning TODO: freelist from worker allocation
buf->base = malloc(ENDPOINT_BUFSIZE);
if (buf->base) {
buf->len = ENDPOINT_BUFSIZE;
} else {
buf->len = 0;
uv_handle_t *handle = malloc(size);
if (handle) {
memset(handle, 0, size);
}
return handle;
}
static void handle_free(uv_handle_t *handle)
{
free(handle);
}
int udp_send(uv_udp_t *handle, knot_pkt_t *answer, const struct sockaddr *addr)
static void handle_getbuf(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
{
uv_buf_t sendbuf = uv_buf_init((char *)answer->wire, answer->size);
return uv_udp_try_send(handle, &sendbuf, 1, addr);
/* Worker has single buffer which is reused for all incoming
* datagrams / stream reads, the content of the buffer is
* guaranteed to be unchanged only for the duration of
* udp_read() and tcp_read().
*/
uv_loop_t *loop = handle->loop;
struct worker_ctx *worker = loop->data;
buf->base = (char *)worker->bufs.wire;
buf->len = sizeof(worker->bufs.wire);
}
static void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
const struct sockaddr *addr, unsigned flags)
{
uv_loop_t *loop = handle->loop;
struct worker_ctx *worker = loop->data;
/* Check the incoming wire length. */
if (nread < KNOT_WIRE_HEADER_SIZE) {
return;
if (nread > KNOT_WIRE_HEADER_SIZE) {
knot_pkt_t *query = knot_pkt_new(buf->base, nread, worker->mm);
worker_exec(worker, (uv_handle_t *)handle, query, addr);
knot_pkt_free(&query);
}
/* Create packets */
knot_pkt_t *query = knot_pkt_new(buf->base, nread, worker->mm);
knot_pkt_t *answer = knot_pkt_new(NULL, KNOT_WIRE_MAX_PKTSIZE, worker->mm);
/* Resolve */
int ret = worker_exec(worker, (uv_handle_t *)handle, answer, query);
if (ret == KNOT_EOK && answer->size > 0) {
udp_send(handle, answer, addr);
/* UDP requests are oneshot, always close afterwards */
if (handle->data) { /* Do not free master socket */
uv_close((uv_handle_t *)handle, handle_free);
}
/* Cleanup */
knot_pkt_free(&query);
knot_pkt_free(&answer);
free(buf->base);
}
static uv_udp_t *udp_create(uv_loop_t *loop)
{
uv_udp_t *handle = malloc(sizeof(uv_udp_t));
if (!handle) {
return handle;
}
uv_udp_init(loop, handle);
return handle;
}
int udp_bind(struct endpoint *ep, struct sockaddr *addr)
@@ -87,7 +75,8 @@ int udp_bind(struct endpoint *ep, struct sockaddr *addr)
return ret;
}
return uv_udp_recv_start(handle, &buf_get, &udp_recv);
handle->data = NULL;
return uv_udp_recv_start(handle, &handle_getbuf, &udp_recv);
}
void udp_unbind(struct endpoint *ep)
@@ -102,58 +91,31 @@ static void tcp_unbind_handle(uv_handle_t *handle)
uv_read_stop((uv_stream_t *)handle);
}
static void tcp_send(uv_handle_t *handle, const knot_pkt_t *answer)
{
uint16_t pkt_size = htons(answer->size);
uv_buf_t buf[2];
buf[0].base = (char *)&pkt_size;
buf[0].len = sizeof(pkt_size);
buf[1].base = (char *)answer->wire;
buf[1].len = answer->size;
uv_try_write((uv_stream_t *)handle, buf, 2);
}
static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
{
uv_loop_t *loop = handle->loop;
struct worker_ctx *worker = loop->data;
/* Check the incoming wire length (malformed, EOF or error). */
if (nread < (ssize_t) sizeof(uint16_t)) {
tcp_unbind_handle((uv_handle_t *)handle);
uv_close((uv_handle_t *)handle, (uv_close_cb) free);
/* Check for connection close */
if (nread <= 0) {
uv_close((uv_handle_t *)handle, handle_free);
return;
} else if (nread < 2) {
/* Not enough bytes to read length */
return;
}
/* Set packet size */
nread = wire_read_u16((const uint8_t *)buf->base);
/* Create packets */
knot_pkt_t *query = knot_pkt_new(buf->base + sizeof(uint16_t), nread, worker->mm);
knot_pkt_t *answer = knot_pkt_new(NULL, KNOT_WIRE_MAX_PKTSIZE, worker->mm);
/** @todo This is not going to work if the packet is fragmented in the stream ! */
uint16_t nbytes = wire_read_u16((const uint8_t *)buf->base);
/* Resolve */
int ret = worker_exec(worker, (uv_handle_t *)handle, answer, query);
if (ret == KNOT_EOK && answer->size > 0) {
tcp_send((uv_handle_t *)handle, answer);
/* Check if there's enough data and execute */
if (nbytes + 2 < nread) {
return;
}
/* Cleanup */
knot_pkt_t *query = knot_pkt_new(buf->base + 2, nbytes, worker->mm);
worker_exec(worker, (uv_handle_t *)handle, query, NULL);
knot_pkt_free(&query);
knot_pkt_free(&answer);
free(buf->base);
}
static uv_tcp_t *tcp_create(uv_loop_t *loop)
{
uv_tcp_t *handle = malloc(sizeof(uv_tcp_t));
if (!handle) {
return handle;
}
uv_tcp_init(loop, handle);
return handle;
}
static void tcp_accept(uv_stream_t *master, int status)
@@ -162,13 +124,13 @@ static void tcp_accept(uv_stream_t *master, int status)
return;
}
uv_tcp_t *client = tcp_create(master->loop);
if (!client || uv_accept(master, (uv_stream_t*)client) != 0) {
free(client);
uv_stream_t *client = (uv_stream_t *)io_create(master->loop, SOCK_STREAM);
if (!client || uv_accept(master, client) != 0) {
handle_free((uv_handle_t *)client);
return;
}
uv_read_start((uv_stream_t*)client, buf_get, tcp_recv);
uv_read_start(client, handle_getbuf, tcp_recv);
}
int tcp_bind(struct endpoint *ep, struct sockaddr *addr)
@@ -185,6 +147,7 @@ int tcp_bind(struct endpoint *ep, struct sockaddr *addr)
return ret;
}
handle->data = NULL;
return 0;
}
@@ -196,28 +159,18 @@ void tcp_unbind(struct endpoint *ep)
uv_handle_t *io_create(uv_loop_t *loop, int type)
{
uv_handle_t *handle = NULL;
if (type == SOCK_DGRAM) {
handle = (uv_handle_t *)udp_create(loop);
uv_udp_t *handle = handle_alloc(loop, sizeof(*handle));
if (handle) {
uv_udp_recv_start((uv_udp_t *)handle, &buf_get, &udp_recv);
uv_udp_init(loop, handle);
uv_udp_recv_start(handle, &handle_getbuf, &udp_recv);
}
return (uv_handle_t *)handle;
} else {
handle = (uv_handle_t *)tcp_create(loop);
uv_tcp_t *handle = handle_alloc(loop, sizeof(*handle));
if (handle) {
uv_read_start((uv_stream_t*)handle, buf_get, tcp_recv);
uv_tcp_init(loop, handle);
}
return (uv_handle_t *)handle;
}
return handle;
}
uv_connect_t *io_connect(uv_handle_t *handle, struct sockaddr *addr, uv_connect_cb on_connect)
{
uv_connect_t* connect = malloc(sizeof(uv_connect_t));
if (uv_tcp_connect(connect, (uv_tcp_t *)handle, addr, on_connect) != 0) {
free(connect);
return NULL;
}
return connect;
}
@@ -20,11 +20,8 @@
#include <libknot/packet/pkt.h>
struct endpoint;
int udp_send(uv_udp_t *handle, knot_pkt_t *answer, const struct sockaddr *addr);
int udp_bind(struct endpoint *ep, struct sockaddr *addr);
void udp_unbind(struct endpoint *ep);
int tcp_bind(struct endpoint *ep, struct sockaddr *addr);
void tcp_unbind(struct endpoint *ep);
uv_handle_t *io_create(uv_loop_t *loop, int type);
uv_connect_t *io_connect(uv_handle_t *handle, struct sockaddr *addr, uv_connect_cb on_connect);
\ No newline at end of file
@@ -28,10 +28,23 @@
struct qr_task
{
struct kr_request req;
knot_pkt_t *pending;
uv_handle_t *handle;
knot_pkt_t *next_query;
union {
uv_write_t tcp_send;
uv_udp_send_t udp_send;
uv_connect_t connect;
} ioreq;
struct {
union {
struct sockaddr_in ip4;
struct sockaddr_in6 ip6;
} addr;
uv_handle_t *handle;
} source;
};
static int qr_task_step(struct qr_task *task, knot_pkt_t *packet);
static int parse_query(knot_pkt_t *query)
{
/* Parse query packet. */
@@ -45,15 +58,10 @@ static int parse_query(knot_pkt_t *query)
return kr_error(EMSGSIZE);
}
/* Accept only queries, no authoritative service. */
if (knot_wire_get_qr(query->wire) || !knot_wire_get_rd(query->wire)) {
return kr_error(EINVAL); /* Ignore. */
}
return kr_ok();
}
static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *handle)
static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *handle, const struct sockaddr *addr)
{
mm_ctx_t pool;
mm_ctx_mempool(&pool, MM_DEFAULT_BLKSIZE);
@@ -66,42 +74,121 @@ static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *ha
return NULL;
}
task->req.pool = pool;
task->handle = handle;
task->source.handle = handle;
if (addr) {
memcpy(&task->source.addr, addr, sockaddr_len(addr));
}
#warning TODO: devise a better scheme to manage answer buffer, it needs copy each time now
/* Create buffers */
knot_pkt_t *pending = knot_pkt_new(NULL, KNOT_WIRE_MIN_PKTSIZE, &task->req.pool);
knot_pkt_t *next_query = knot_pkt_new(NULL, KNOT_WIRE_MIN_PKTSIZE, &task->req.pool);
knot_pkt_t *answer = knot_pkt_new(NULL, KNOT_WIRE_MAX_PKTSIZE, &task->req.pool);
if (!pending || !answer) {
if (!next_query || !answer) {
mp_delete(pool.ctx);
return NULL;
}
task->req.answer = answer;
task->pending = pending;
task->next_query = next_query;
/* Start resolution */
kr_resolve_begin(&task->req, &engine->resolver, answer);
return task;
}
static int qr_task_finalize(struct qr_task *task, knot_pkt_t *dst, int state)
static void qr_task_on_send(uv_req_t* req, int status)
{
struct qr_task *task = req->data;
if (task) {
/* Failed to send, invalidate */
if (status != 0) {
qr_task_step(task, NULL);
}
if (task->req.overlay.state == KNOT_STATE_NOOP) {
mp_delete(task->req.pool.ctx);
}
}
}
static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockaddr *addr, knot_pkt_t *pkt)
{
if (handle->type == UV_UDP) {
uv_buf_t buf = { (char *)pkt->wire, pkt->size };
uv_udp_send_t *req = &task->ioreq.udp_send;
req->data = task;
return uv_udp_send(req, (uv_udp_t *)handle, &buf, 1, addr, (uv_udp_send_cb)qr_task_on_send);
} else {
uint16_t pkt_size = htons(pkt->size);
uv_buf_t buf[2] = {
{ (char *)&pkt_size, sizeof(pkt_size) },
{ (char *)pkt->wire, pkt->size }
};
uv_write_t *req = &task->ioreq.tcp_send;
req->data = task;
return uv_write(req, (uv_stream_t *)handle, buf, 2, (uv_write_cb)qr_task_on_send);
}
}
static void qr_task_on_connect(uv_connect_t *connect, int status)
{
uv_stream_t *handle = connect->handle;
struct qr_task *task = connect->data;
if (status != 0) { /* Failed to connect */
qr_task_step(task, NULL);
} else {
qr_task_send(task, (uv_handle_t *)handle, NULL, task->next_query);
}
}
static int qr_task_finalize(struct qr_task *task, int state)
{
knot_pkt_t *answer = task->req.answer;
kr_resolve_finish(&task->req, state);
memcpy(dst->wire, answer->wire, answer->size);
dst->size = answer->size;
#warning TODO: send answer asynchronously
mp_delete(task->req.pool.ctx);
qr_task_send(task, task->source.handle, (struct sockaddr *)&task->source.addr, task->req.answer);
return state == KNOT_STATE_DONE ? 0 : kr_error(EIO);
}
static void qr_task_on_connect(uv_connect_t *connect, int status)
static int qr_task_step(struct qr_task *task, knot_pkt_t *packet)
{
#warning TODO: if not connected, retry
#warning TODO: if connected, send pending query
/* Consume input and produce next query */
assert(task);
int sock_type = -1;
struct sockaddr *addr = NULL;
knot_pkt_t *next_query = task->next_query;
int state = kr_resolve_consume(&task->req, packet);
while (state == KNOT_STATE_PRODUCE) {
state = kr_resolve_produce(&task->req, &addr, &sock_type, next_query);
}
/* We're done, no more iterations needed */
if (state & (KNOT_STATE_DONE|KNOT_STATE_FAIL)) {
return qr_task_finalize(task, state);
}
/* Create connection for iterative query */
uv_handle_t *source_handle = task->source.handle;
uv_handle_t *next_handle = io_create(source_handle->loop, sock_type);
if (next_handle == NULL) {
return qr_task_finalize(task, KNOT_STATE_FAIL);
}
/* Connect or issue query datagram */
next_handle->data = task;
if (sock_type == SOCK_STREAM) {
uv_connect_t *connect = &task->ioreq.connect;
if (uv_tcp_connect(connect, (uv_tcp_t *)next_handle, addr, qr_task_on_connect) != 0) {
uv_close(next_handle, (uv_close_cb) free);
return qr_task_step(task, NULL);
}
connect->data = task;
} else {
if (qr_task_send(task, next_handle, addr, next_query) != 0) {
uv_close(next_handle, (uv_close_cb) free);
return qr_task_step(task, NULL);
}
}
return kr_ok();
}
int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *answer, knot_pkt_t *query)
int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query, const struct sockaddr* addr)
{
if (!worker) {
return kr_error(EINVAL);
@@ -113,43 +200,20 @@ int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *answ
return ret;
}
/* Get pending request or start new */
/* Start new task on master sockets, or resume existing */
struct qr_task *task = handle->data;
if (!task) {
task = qr_task_create(worker, handle);
bool is_master_socket = (!task);
if (is_master_socket) {
/* Accept only queries */
if (knot_wire_get_qr(query->wire)) {
return kr_error(EINVAL); /* Ignore. */
}
task = qr_task_create(worker, handle, addr);
if (!task) {
return kr_error(ENOMEM);
}
}
/* Consume input and produce next query */
int proto = 0;
struct sockaddr *addr = NULL;
#warning TODO: it shouldnt be needed to provide NULL answer if I/O fails
int state = kr_resolve_consume(&task->req, query);
while (state == KNOT_STATE_PRODUCE) {
state = kr_resolve_produce(&task->req, &addr, &proto, task->pending);
}
if (state & (KNOT_STATE_DONE|KNOT_STATE_FAIL)) {
return qr_task_finalize(task, answer, state);
}
/* Create connection for iterative query */
uv_handle_t *next_handle = io_create(handle->loop, proto);
#warning TODO: improve error checking
next_handle->data = task;
if (proto == SOCK_STREAM) {
uv_connect_t *connect = io_connect(next_handle, addr, qr_task_on_connect);
if (!connect) {
#warning TODO: close next_handle
return kr_error(ENOMEM);
}
} else {
/* Fake connection as libuv doesn't support connected UDP */
uv_connect_t fake_connect;
fake_connect.handle = (uv_stream_t *)next_handle;
qr_task_on_connect(&fake_connect, 0);
}
return kr_ok();
return qr_task_step(task, query);
}
@@ -27,6 +27,9 @@ struct worker_ctx {
struct engine *engine;
uv_loop_t *loop;
mm_ctx_t *mm;
struct {
uint8_t wire[KNOT_WIRE_MAX_PKTSIZE];
} bufs;
};
/**
@@ -36,6 +39,7 @@ struct worker_ctx {
* @param handle
* @param answer
* @param query
* @param addr
* @return 0, error code
*/
int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *answer, knot_pkt_t *query);
int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query, const struct sockaddr* addr);
@@ -351,8 +351,8 @@ int kr_resolve_finish(struct kr_request *request, int state)
}
/* Clean up. */
knot_overlay_reset(&request->overlay);
knot_overlay_deinit(&request->overlay);
request->overlay.state = KNOT_STATE_NOOP;
kr_rplan_deinit(&request->rplan);
return KNOT_STATE_DONE;
}