Commit 07f71578 authored by Marek Vavruša's avatar Marek Vavruša

daemon/bindings: worker.resolve() can now call callback on completion

example:

worker.resolve('cz', kres.type.NS, kres.class.IN, 0,
function (pkt)
	local answer = kres.pkt_t(pkt)
	print (answer:rcode())
end)
parent 899e447d
...@@ -433,23 +433,28 @@ static void event_free(uv_timer_t *timer) ...@@ -433,23 +433,28 @@ static void event_free(uv_timer_t *timer)
free(timer); free(timer);
} }
static int execute_callback(lua_State *L, int argc)
{
int ret = engine_pcall(L, argc);
if (ret != 0) {
fprintf(stderr, "error: %s\n", lua_tostring(L, -1));
}
/* Clear the stack, there may be event a/o enything returned */
lua_settop(L, 0);
lua_gc(L, LUA_GCCOLLECT, 0);
return ret;
}
static void event_callback(uv_timer_t *timer) static void event_callback(uv_timer_t *timer)
{ {
struct worker_ctx *worker = timer->loop->data; struct worker_ctx *worker = timer->loop->data;
lua_State *L = worker->engine->L; lua_State *L = worker->engine->L;
/* Retrieve callback and execute */ /* Retrieve callback and execute */
int top = lua_gettop(L);
lua_rawgeti(L, LUA_REGISTRYINDEX, (intptr_t) timer->data); lua_rawgeti(L, LUA_REGISTRYINDEX, (intptr_t) timer->data);
lua_rawgeti(L, -1, 1); lua_rawgeti(L, -1, 1);
lua_pushinteger(L, (intptr_t) timer->data); lua_pushinteger(L, (intptr_t) timer->data);
int ret = engine_pcall(L, 1); int ret = execute_callback(L, 1);
if (ret != 0) {
fprintf(stderr, "error: %s\n", lua_tostring(L, -1));
}
/* Clear the stack, there may be event a/o enything returned */
lua_settop(L, top);
lua_gc(L, LUA_GCCOLLECT, 0);
/* Free callback if not recurrent or an error */ /* Free callback if not recurrent or an error */
if (ret != 0 || uv_timer_get_repeat(timer) == 0) { if (ret != 0 || uv_timer_get_repeat(timer) == 0) {
uv_close((uv_handle_t *)timer, (uv_close_cb) event_free); uv_close((uv_handle_t *)timer, (uv_close_cb) event_free);
...@@ -553,6 +558,20 @@ static inline struct worker_ctx *wrk_luaget(lua_State *L) { ...@@ -553,6 +558,20 @@ static inline struct worker_ctx *wrk_luaget(lua_State *L) {
return worker; return worker;
} }
/* @internal Call the Lua callback stored in baton. */
static void resolve_callback(struct worker_ctx *worker, struct kr_request *req, void *baton)
{
assert(worker);
assert(req);
assert(baton);
lua_State *L = worker->engine->L;
intptr_t cb_ref = (intptr_t) baton;
lua_rawgeti(L, LUA_REGISTRYINDEX, cb_ref);
luaL_unref(L, LUA_REGISTRYINDEX, cb_ref);
lua_pushlightuserdata(L, req->answer);
(void) execute_callback(L, 1);
}
static int wrk_resolve(lua_State *L) static int wrk_resolve(lua_State *L)
{ {
struct worker_ctx *worker = wrk_luaget(L); struct worker_ctx *worker = wrk_luaget(L);
...@@ -589,9 +608,17 @@ static int wrk_resolve(lua_State *L) ...@@ -589,9 +608,17 @@ static int wrk_resolve(lua_State *L)
knot_pkt_free(&pkt); knot_pkt_free(&pkt);
return 0; return 0;
} }
/* Resolve it */ /* Add completion callback */
unsigned options = lua_tointeger(L, 4); unsigned options = lua_tointeger(L, 4);
ret = worker_resolve(worker, pkt, options); if (lua_isfunction(L, 5)) {
/* Store callback in registry */
lua_pushvalue(L, 5);
int cb = luaL_ref(L, LUA_REGISTRYINDEX);
ret = worker_resolve(worker, pkt, options, resolve_callback, (void *) (intptr_t)cb);
} else {
ret = worker_resolve(worker, pkt, options, NULL, NULL);
}
knot_pkt_free(&pkt); knot_pkt_free(&pkt);
lua_pushboolean(L, ret == 0); lua_pushboolean(L, ret == 0);
return 1; return 1;
......
...@@ -7,6 +7,9 @@ sec = 1000 ...@@ -7,6 +7,9 @@ sec = 1000
minute = 60 * sec minute = 60 * sec
hour = 60 * minute hour = 60 * minute
-- Resolver bindings
kres = require('kres')
-- Function aliases -- Function aliases
-- `env.VAR returns os.getenv(VAR)` -- `env.VAR returns os.getenv(VAR)`
env = {} env = {}
......
...@@ -69,6 +69,8 @@ struct qr_task ...@@ -69,6 +69,8 @@ struct qr_task
uv_req_t *ioreq; uv_req_t *ioreq;
uv_handle_t *iohandle; uv_handle_t *iohandle;
uv_timer_t timeout; uv_timer_t timeout;
worker_cb_t on_complete;
void *baton;
struct { struct {
union { union {
struct sockaddr_in ip4; struct sockaddr_in ip4;
...@@ -138,6 +140,7 @@ static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *ha ...@@ -138,6 +140,7 @@ static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *ha
task->source.handle = handle; task->source.handle = handle;
uv_timer_init(worker->loop, &task->timeout); uv_timer_init(worker->loop, &task->timeout);
task->timeout.data = task; task->timeout.data = task;
task->on_complete = NULL;
/* Remember query source addr */ /* Remember query source addr */
if (addr) { if (addr) {
memcpy(&task->source.addr, addr, sockaddr_len(addr)); memcpy(&task->source.addr, addr, sockaddr_len(addr));
...@@ -159,6 +162,11 @@ static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *ha ...@@ -159,6 +162,11 @@ static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *ha
static void qr_task_free(uv_handle_t *handle) static void qr_task_free(uv_handle_t *handle)
{ {
struct qr_task *task = handle->data; struct qr_task *task = handle->data;
struct worker_ctx *worker = task->worker;
/* Run the completion callback. */
if (task->on_complete) {
task->on_complete(worker, &task->req, task->baton);
}
/* Return handle to the event loop in case /* Return handle to the event loop in case
* it was exclusively taken by this task. */ * it was exclusively taken by this task. */
if (task->source.handle && !uv_has_ref(task->source.handle)) { if (task->source.handle && !uv_has_ref(task->source.handle)) {
...@@ -166,7 +174,6 @@ static void qr_task_free(uv_handle_t *handle) ...@@ -166,7 +174,6 @@ static void qr_task_free(uv_handle_t *handle)
io_start_read(task->source.handle); io_start_read(task->source.handle);
} }
/* Return mempool to ring or free it if it's full */ /* Return mempool to ring or free it if it's full */
struct worker_ctx *worker = task->worker;
void *mp_context = task->req.pool.ctx; void *mp_context = task->req.pool.ctx;
if (worker->pools.len < MP_FREELIST_SIZE) { if (worker->pools.len < MP_FREELIST_SIZE) {
mp_flush(mp_context); mp_flush(mp_context);
...@@ -416,9 +423,9 @@ int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *quer ...@@ -416,9 +423,9 @@ int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *quer
return qr_task_step(task, query); return qr_task_step(task, query);
} }
int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query, unsigned options) int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query, unsigned options, worker_cb_t on_complete, void *baton)
{ {
if (!worker) { if (!worker || !query) {
return kr_error(EINVAL); return kr_error(EINVAL);
} }
...@@ -427,6 +434,8 @@ int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query, unsigned option ...@@ -427,6 +434,8 @@ int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query, unsigned option
if (!task) { if (!task) {
return kr_error(ENOMEM); return kr_error(ENOMEM);
} }
task->baton = baton;
task->on_complete = on_complete;
task->req.options |= options; task->req.options |= options;
return qr_task_step(task, query); return qr_task_step(task, query);
} }
......
...@@ -48,6 +48,9 @@ struct worker_ctx { ...@@ -48,6 +48,9 @@ struct worker_ctx {
mm_ctx_t pkt_pool; mm_ctx_t pkt_pool;
}; };
/* Worker callback */
typedef void (*worker_cb_t)(struct worker_ctx *worker, struct kr_request *req, void *baton);
/** /**
* Process incoming packet (query or answer to subrequest). * Process incoming packet (query or answer to subrequest).
* @return 0 or an error code * @return 0 or an error code
...@@ -58,7 +61,7 @@ int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *quer ...@@ -58,7 +61,7 @@ int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *quer
* Schedule query for resolution. * Schedule query for resolution.
* @return 0 or an error code * @return 0 or an error code
*/ */
int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query, unsigned options); int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query, unsigned options, worker_cb_t on_complete, void *baton);
/** Reserve worker buffers */ /** Reserve worker buffers */
int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen); int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment