worker.c 2.16 KB
Newer Older
1 2 3
#include <uv.h>

#include <libknot/packet/pkt.h>
4
#include <libknot/internal/net.h>
5 6 7 8

#include "daemon/worker.h"
#include "daemon/layer/query.h"

9
static void buf_alloc(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
10 11 12 13 14 15
{
	struct worker_ctx *worker = handle->data;
	buf->base = mm_alloc(worker->pool, suggested_size);
	buf->len = suggested_size;
}

16 17 18 19 20 21
static void buf_free(uv_handle_t* handle, const uv_buf_t* buf)
{
	struct worker_ctx *worker = handle->data;
	mm_free(worker->pool, buf->base);
}

22 23 24 25 26 27 28 29 30 31 32 33 34
static void worker_send(uv_udp_t *handle, knot_pkt_t *answer, const struct sockaddr *addr)
{
	uv_buf_t sendbuf = uv_buf_init((char *)answer->wire, answer->size);
	uv_udp_try_send(handle, &sendbuf, 1, addr);
}

static void worker_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
                        const struct sockaddr *addr, unsigned flags)
{
	struct worker_ctx *ctx = handle->data;
	assert(ctx->pool);

	if (nread < KNOT_WIRE_HEADER_SIZE) {
35
		buf_free((uv_handle_t *)handle, buf);
36 37 38 39 40 41
		return;
	}

	struct kr_result result;

	/* Create query processing context. */
42
	struct kr_layer_param param;
43 44 45 46
	param.ctx = &ctx->resolve;
	param.result = &result;

	/* Process query packet. */
47 48 49
	knot_layer_t proc;
	memset(&proc, 0, sizeof(knot_layer_t));
	proc.mm = ctx->pool;
50 51 52 53 54
	knot_layer_begin(&proc, LAYER_QUERY, &param);

	knot_pkt_t *query = knot_pkt_new((uint8_t *)buf->base, nread, ctx->pool);
	knot_pkt_parse(query, 0);
	int state = knot_layer_in(&proc, query);
55
	if (state & (KNOT_NS_PROC_DONE|KNOT_NS_PROC_FAIL)) {
56 57 58 59
		worker_send(handle, result.ans, addr);
	}

	/* Cleanup. */
60
	knot_layer_finish(&proc);
61 62
	kr_result_deinit(&result);
	kr_context_reset(&ctx->resolve);
63 64

	buf_free((uv_handle_t *)handle, buf);
65
	knot_pkt_free(&query);
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
}

void worker_init(struct worker_ctx *worker, mm_ctx_t *mm)
{
	memset(worker, 0, sizeof(struct worker_ctx));
	worker->pool = mm;

	kr_context_init(&worker->resolve, mm);
}

void worker_deinit(struct worker_ctx *worker)
{
	kr_context_deinit(&worker->resolve);
}

void worker_start(uv_udp_t *handle, struct worker_ctx *worker)
{

	handle->data = worker;
85
	uv_udp_recv_start(handle, &buf_alloc, &worker_recv);
86 87 88 89 90 91
}

void worker_stop(uv_udp_t *handle)
{
	uv_udp_recv_stop(handle);
}