Commit 13dcfdc1 authored by Marek Vavruša's avatar Marek Vavruša

daemon/worker: keep freelist of mempools for recycling

parent f2b028e7
......@@ -23,6 +23,9 @@
#ifndef LRU_REP_SIZE
#define LRU_REP_SIZE (LRU_RTT_SIZE / 2) /**< NS reputation cache size */
#endif
#ifndef MP_FREELIST_SIZE
#define MP_FREELIST_SIZE 32 /**< Maximum length of the worker mempool freelist */
#endif
/*
* @internal These are forward decls to allow building modules with engine but without Lua.
......
......@@ -44,8 +44,8 @@ static void handle_getbuf(uv_handle_t* handle, size_t suggested_size, uv_buf_t*
*/
uv_loop_t *loop = handle->loop;
struct worker_ctx *worker = loop->data;
buf->base = (char *)worker->bufs.wire;
buf->len = sizeof(worker->bufs.wire);
buf->base = (char *)worker->wire_buf;
buf->len = sizeof(worker->wire_buf);
}
void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
......@@ -59,7 +59,7 @@ void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
}
knot_pkt_t *query = knot_pkt_new(buf->base, nread, worker->mm);
query->max_size = sizeof(worker->bufs.wire);
query->max_size = sizeof(worker->wire_buf);
worker_exec(worker, (uv_handle_t *)handle, query, addr);
knot_pkt_free(&query);
}
......@@ -109,7 +109,7 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
}
knot_pkt_t *query = knot_pkt_new(buf->base + 2, nbytes, worker->mm);
query->max_size = sizeof(worker->bufs.wire);
query->max_size = sizeof(worker->wire_buf);
int ret = worker_exec(worker, (uv_handle_t *)handle, query, NULL);
if (ret == 0) {
/* Push - pull, stop reading from this handle until
......
......@@ -153,6 +153,7 @@ int main(int argc, char **argv)
.mm = NULL,
};
loop->data = &worker;
worker_reserve(&worker, MP_FREELIST_SIZE);
/* Bind to sockets. */
if (addr != NULL) {
......@@ -185,6 +186,7 @@ int main(int argc, char **argv)
/* Cleanup. */
fprintf(stderr, "\n[system] quitting\n");
engine_deinit(&engine);
worker_reclaim(&worker);
if (ret != 0) {
ret = EXIT_FAILURE;
......
......@@ -27,6 +27,7 @@
struct qr_task
{
struct kr_request req;
struct worker_ctx *worker;
knot_pkt_t *next_query;
uv_handle_t *next_handle;
uv_timer_t timeout;
......@@ -62,10 +63,17 @@ static int parse_query(knot_pkt_t *query)
static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query, const struct sockaddr *addr)
{
/* Recycle available mempool if possible */
mm_ctx_t pool = {
.ctx = mp_new (4096),
.ctx = NULL,
.alloc = (mm_alloc_t) mp_alloc
};
if (worker->pools.len > 0) {
pool.ctx = array_tail(worker->pools);
array_pop(worker->pools);
} else { /* No mempool on the freelist, create new one */
pool.ctx = mp_new (KNOT_WIRE_MAX_PKTSIZE);
}
/* Create worker task */
struct engine *engine = worker->engine;
......@@ -75,6 +83,7 @@ static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *ha
mp_delete(pool.ctx);
return NULL;
}
task->worker = worker;
task->req.pool = pool;
task->source.handle = handle;
if (addr) {
......@@ -120,7 +129,15 @@ static void qr_task_free(uv_handle_t *handle)
uv_ref(task->source.handle);
io_start_read(task->source.handle);
}
mp_delete(task->req.pool.ctx);
/* Return mempool to ring or free it if it's full */
struct worker_ctx *worker = task->worker;
void *mp_context = task->req.pool.ctx;
if (worker->pools.len < MP_FREELIST_SIZE) {
mp_flush(mp_context);
array_push(worker->pools, mp_context);
} else {
mp_delete(mp_context);
}
}
static void qr_task_timeout(uv_timer_t *req)
......@@ -264,3 +281,18 @@ int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *quer
/* Consume input and produce next query */
return qr_task_step(task, query);
}
int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen)
{
array_init(worker->pools);
return array_reserve(worker->pools, ring_maxlen);
}
void worker_reclaim(struct worker_ctx *worker)
{
mp_freelist_t *pools = &worker->pools;
for (unsigned i = 0; i < pools->len; ++i) {
mp_delete(pools->at[i]);
}
array_clear(*pools);
}
......@@ -19,6 +19,11 @@
#include <libknot/internal/mempattern.h>
#include "daemon/engine.h"
#include "lib/generic/array.h"
/* @cond internal Freelist of available mempools. */
typedef array_t(void *) mp_freelist_t;
/* @endcond */
/**
* Query resolution worker.
......@@ -27,9 +32,8 @@ struct worker_ctx {
struct engine *engine;
uv_loop_t *loop;
mm_ctx_t *mm;
struct {
uint8_t wire[KNOT_WIRE_MAX_PKTSIZE];
} bufs;
uint8_t wire_buf[KNOT_WIRE_MAX_PKTSIZE];
mp_freelist_t pools;
};
/**
......@@ -43,3 +47,9 @@ struct worker_ctx {
* @return 0, error code
*/
int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query, const struct sockaddr* addr);
/** Reserve worker buffers */
int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen);
/** Collect worker mempools */
void worker_reclaim(struct worker_ctx *worker);
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment