Commit 7afffaa9 authored by Marek Vavruša's avatar Marek Vavruša Committed by Marek Vavrusa

daemon: unified query completion callback with trace callback in resolve

This is a followup on addition of trace callbacks in the resolver library,
to get rid of the Lua/C interfacing in daemon and unify it with the log tracing.
All modules can now install completion callback on the kr_request object that
will be called after the resolution is done.
parent e25358d4
......@@ -435,14 +435,13 @@ Environment
> user('root')
Operation not permitted
.. function:: resolve(name, type[, class = kres.class.IN, options = 0, finish = nil, begin = nil])
.. function:: resolve(name, type[, class = kres.class.IN, options = 0, finish = nil])
:param string name: Query name (e.g. 'com.')
:param number type: Query type (e.g. ``kres.type.NS``)
:param number class: Query class *(optional)* (e.g. ``kres.class.IN``)
:param number options: Resolution options (see query flags)
:param function finish: Callback to be executed when resolution completes (e.g. `function cb (pkt, req) end`). The callback gets a packet containing the final answer and doesn't have to return anything.
:param function begin: Callback to be executed when the request is created.
:return: boolean
The function can also be executed with a table of arguments instead. This is useful if you'd like to skip some arguments, for example:
......@@ -452,8 +451,7 @@ Environment
resolve {
name = 'example.com',
type = kres.type.AAAA,
finish = function ()
print('done')
init = function (req)
end,
}
......
......@@ -1185,21 +1185,6 @@ int lib_event(lua_State *L)
return 1;
}
/* @internal Call the Lua callback stored in baton. */
static void resolve_callback(struct worker_ctx *worker, struct kr_request *req, void *baton)
{
assert(worker);
assert(req);
assert(baton);
lua_State *L = worker->engine->L;
intptr_t cb_ref = (intptr_t) baton;
lua_rawgeti(L, LUA_REGISTRYINDEX, cb_ref);
luaL_unref(L, LUA_REGISTRYINDEX, cb_ref);
lua_pushlightuserdata(L, req->answer);
lua_pushlightuserdata(L, req);
(void) execute_callback(L, 2);
}
static int wrk_resolve(lua_State *L)
{
struct worker_ctx *worker = wrk_luaget(L);
......@@ -1264,17 +1249,9 @@ static int wrk_resolve(lua_State *L)
lua_error(L);
}
/* Store completion callback in registry */
/* Add initialisation callback */
if (lua_isfunction(L, 5)) {
lua_pushvalue(L, 5);
int cb = luaL_ref(L, LUA_REGISTRYINDEX);
task->on_complete = resolve_callback;
task->baton = (void *) (intptr_t)cb;
}
/* Add initialisation callback */
if (lua_isfunction(L, 6)) {
lua_pushvalue(L, 6);
lua_pushlightuserdata(L, &task->req);
(void) execute_callback(L, 1);
}
......
......@@ -11,6 +11,7 @@ typedef struct knot_mm {
typedef void *(*map_alloc_f)(void *, size_t);
typedef void (*map_free_f)(void *baton, void *ptr);
typedef void (*trace_log_f) (const struct kr_query *, const char *, const char *);
typedef void (*trace_callback_f)(struct kr_request *);
typedef enum {KNOT_ANSWER, KNOT_AUTHORITY, KNOT_ADDITIONAL} knot_section_t;
typedef struct {
uint16_t pos;
......@@ -161,6 +162,7 @@ struct kr_request {
struct kr_rplan rplan;
int has_tls;
trace_log_f trace_log;
trace_callback_f trace_finish;
knot_mm_t pool;
};
enum kr_rank {KR_RANK_INITIAL, KR_RANK_OMIT, KR_RANK_TRY, KR_RANK_INDET = 4, KR_RANK_BOGUS, KR_RANK_MISMATCH, KR_RANK_MISSING, KR_RANK_INSECURE, KR_RANK_AUTH = 16, KR_RANK_SECURE = 32};
......
......@@ -33,6 +33,7 @@ typedef struct knot_mm {
typedef void *(*map_alloc_f)(void *, size_t);
typedef void (*map_free_f)(void *baton, void *ptr);
typedef void (*trace_log_f) (const struct kr_query *, const char *, const char *);
typedef void (*trace_callback_f)(struct kr_request *);
"
./scripts/gen-cdefs.sh libkres types <<-EOF
......
local ffi = require('ffi')
-- Units
kB = 1024
MB = 1024*kB
......@@ -28,7 +30,7 @@ if rawget(kres, 'str2dname') ~= nil then
end
-- Compatibility wrapper for query flags.
worker.resolve = function (qname, qtype, qclass, options, finish, begin)
worker.resolve = function (qname, qtype, qclass, options, finish, init)
-- Alternatively use named arguments
if type(qname) == 'table' then
local t = qname
......@@ -37,11 +39,28 @@ worker.resolve = function (qname, qtype, qclass, options, finish, begin)
qclass = t.class or kres.class.IN
options = t.options
finish = t.finish
begin = t.begin
init = t.init
end
local init_cb, finish_cb = init, nil
if finish then
-- Create callback for finalization
finish_cb = ffi.cast('trace_callback_f', function (req)
req = kres.request_t(req)
finish(req.answer, req)
finish_cb:free()
end)
-- Wrap initialiser to install finish callback
init_cb = function (req)
req = kres.request_t(req)
if init then init(req) end
req.trace_finish = finish_cb
end
end
-- Translate options and resolve
options = kres.mk_qflags(options)
return worker.resolve_unwrapped(qname, qtype, qclass, options, finish, begin)
return worker.resolve_unwrapped(qname, qtype, qclass, options, init_cb)
end
resolve = worker.resolve
......@@ -280,7 +299,6 @@ function table_print (tt, indent, done)
return result
end
--
-- This extends the worker module to allow asynchronous execution of functions and nonblocking I/O.
-- The current implementation combines cqueues for Lua interface, and event.socket() in order to not
-- block resolver engine while waiting for I/O or timers.
......
......@@ -256,6 +256,8 @@ static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *ha
mp_delete(pool.ctx);
return NULL;
}
memset(&task->req, 0, sizeof(task->req));
/* Create packet buffers for answer and subrequests */
task->req.pool = pool;
knot_pkt_t *pktbuf = knot_pkt_new(NULL, pktbuf_max, &task->req.pool);
......@@ -264,7 +266,6 @@ static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *ha
return NULL;
}
pktbuf->size = 0;
task->req.answer = NULL;
task->pktbuf = pktbuf;
array_init(task->waiting);
task->addrlist = NULL;
......@@ -279,13 +280,6 @@ static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *ha
task->session = NULL;
task->source.handle = handle;
task->timeout = NULL;
task->on_complete = NULL;
task->baton = NULL;
task->req.qsource.key = NULL;
task->req.qsource.addr = NULL;
task->req.qsource.dst_addr = NULL;
task->req.qsource.packet = NULL;
task->req.qsource.opt = NULL;
/* Remember query source addr */
if (addr) {
size_t addr_len = sizeof(struct sockaddr_in);
......@@ -420,15 +414,10 @@ static int qr_task_register(struct qr_task *task, struct session *session)
static void qr_task_complete(struct qr_task *task)
{
struct worker_ctx *worker = task->worker;
/* Kill pending I/O requests */
ioreq_killall(task);
assert(task->waiting.len == 0);
assert(task->leading == false);
/* Run the completion callback. */
if (task->on_complete) {
task->on_complete(worker, &task->req, task->baton);
}
/* Release primary reference to task. */
qr_task_unref(task);
}
......@@ -1077,8 +1066,7 @@ int worker_resolve_exec(struct qr_task *task, knot_pkt_t *query)
return qr_task_step(task, NULL, query);
}
int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query, struct kr_qflags options,
worker_cb_t on_complete, void *baton)
int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query, struct kr_qflags options)
{
if (!worker || !query) {
return kr_error(EINVAL);
......@@ -1090,9 +1078,6 @@ int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query, struct kr_qflag
return kr_error(ENOMEM);
}
/* Install completion handler */
task->baton = baton;
task->on_complete = on_complete;
return worker_resolve_exec(task, query);
}
......
......@@ -24,8 +24,6 @@
/** Worker state (opaque). */
struct worker_ctx;
struct qr_task;
/** Worker callback */
typedef void (*worker_cb_t)(struct worker_ctx *worker, struct kr_request *req, void *baton);
/** Create and initialize the worker. */
struct worker_ctx *worker_create(struct engine *engine, knot_mm_t *pool,
......@@ -76,8 +74,7 @@ int worker_resolve_exec(struct qr_task *task, knot_pkt_t *query);
* @note the options passed are |-combined with struct kr_context::options
* @todo maybe better semantics for this?
*/
int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query, struct kr_qflags options,
worker_cb_t on_complete, void *baton);
int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query, struct kr_qflags options);
/** Collect worker mempools */
void worker_reclaim(struct worker_ctx *worker);
......@@ -143,8 +140,6 @@ struct qr_task
uint16_t bytes_remaining;
struct sockaddr *addrlist;
uv_timer_t *timeout;
worker_cb_t on_complete;
void *baton;
struct {
union inaddr addr;
union inaddr dst_addr;
......
......@@ -720,6 +720,7 @@ int kr_resolve_begin(struct kr_request *request, struct kr_context *ctx, knot_pk
request->answ_validated = false;
request->auth_validated = false;
request->trace_log = NULL;
request->trace_finish = NULL;
/* Expect first query */
kr_rplan_init(&request->rplan, request, &request->pool);
......@@ -1567,6 +1568,16 @@ int kr_resolve_finish(struct kr_request *request, int state)
struct kr_query *last = rplan->resolved.len > 0 ? array_tail(rplan->resolved) : NULL;
VERBOSE_MSG(last, "finished: %d, queries: %zu, mempool: %zu B\n",
request->state, rplan->resolved.len, (size_t) mp_total_size(request->pool.ctx));
/* Trace request finish */
if (request->trace_finish) {
request->trace_finish(request);
}
/* Uninstall all tracepoints */
request->trace_finish = NULL;
request->trace_log = NULL;
return KR_STATE_DONE;
}
......
......@@ -204,6 +204,7 @@ struct kr_request {
struct kr_rplan rplan;
int has_tls;
trace_log_f trace_log; /**< Logging tracepoint */
trace_callback_f trace_finish; /**< Request finish tracepoint */
knot_mm_t pool;
};
......
......@@ -32,13 +32,16 @@
#include "lib/defines.h"
struct kr_query;
struct kr_request;
/*
* Logging and debugging.
*/
/** @brief Callback for request events. */
typedef void (*trace_callback_f)(struct kr_request *request);
/** @brief Callback for request logging handler. */
typedef void *(*trace_log_f)(const struct kr_query *query, const char *source, const char *msg);
typedef void (*trace_log_f)(const struct kr_query *query, const char *source, const char *msg);
#define kr_log_info(fmt, ...) do { printf((fmt), ## __VA_ARGS__); fflush(stdout); } while(0)
#define kr_log_error(fmt, ...) fprintf(stderr, (fmt), ## __VA_ARGS__)
......
......@@ -48,43 +48,47 @@ local function serve_trace(h, _)
-- Create logging handler callback
local buffer = {}
local buffer_log_cb = ffi.cast('trace_log_f', function (query, source, msg)
local message = string.format('[%5u] [%s] %s',
tonumber(query.id), ffi.string(source), ffi.string(msg))
local message = string.format('[%5s] [%s] %s',
query.id, ffi.string(source), ffi.string(msg))
table.insert(buffer, message)
end)
-- Wait for the result of the query
-- Note: We can't do non-blocking write to stream directly from resolve callbacks
-- because they don't run inside cqueue.
local answers, authority = {}, {}
local cond = condition.new()
local done = false
local waiting, done = false, false
local finish_cb = ffi.cast('trace_callback_f', function (req)
req = kres.request_t(req)
add_selected_records(answers, req.answ_selected)
add_selected_records(authority, req.auth_selected)
if waiting then
cond:signal()
end
done = true
end)
-- Resolve query and buffer logs into table
local answers, authority = {}, {}
resolve {
name = qname,
type = qtype,
options = {'TRACE'},
begin = function (req)
init = function (req)
req = kres.request_t(req)
req.trace_log = buffer_log_cb
end,
finish = function (_, req)
req = kres.request_t(req)
add_selected_records(answers, req.answ_selected)
add_selected_records(authority, req.auth_selected)
cond:signal()
done = true
req.trace_finish = finish_cb
end
}
-- Wait for asynchronous query and free callbacks
if done then
cond:wait(0) -- Must pick up the signal
else
if not done then
waiting = true
cond:wait()
end
buffer_log_cb:free()
finish_cb:free()
-- Build the result
local result = table.concat(buffer, '') .. '\n'
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment