Commit f49e55e4 authored by Marek Vavruša's avatar Marek Vavruša

daemon/worker: cherry-picked refcounting for worker tasks

This is going to be needed for issuing multiple timed queries and fast
retry.
parent 9db469c9
......@@ -66,7 +66,6 @@ struct qr_task
struct kr_request req;
struct worker_ctx *worker;
knot_pkt_t *pktbuf;
uv_req_t *ioreq;
uv_handle_t *iohandle;
uv_timer_t timeout;
worker_cb_t on_complete;
......@@ -79,11 +78,19 @@ struct qr_task
uv_handle_t *handle;
} source;
uint16_t iter_count;
uint16_t flags;
uint16_t refs;
};
/* Convenience macros */
/* Take one reference on a task (plain non-atomic increment; presumably
 * only touched from the single event-loop thread — TODO confirm). */
#define qr_task_ref(task) \
do { ++(task)->refs; } while(0)
/* Drop one reference; frees the task when the count reaches zero.
 * NOTE: evaluates `task` more than once — do not pass an expression
 * with side effects. */
#define qr_task_unref(task) \
do { if (--(task)->refs == 0) { qr_task_free(task); } } while (0)
/* True if `checked` is one of the handles this task currently owns:
 * either the outgoing subrequest handle or the original source handle.
 * Used by IO callbacks to reject completions for stale/closed handles. */
#define qr_valid_handle(task, checked) \
((task)->iohandle == (checked) || (task)->source.handle == (checked))
/* Forward decls */
static int qr_task_step(struct qr_task *task, knot_pkt_t *packet);
static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *packet);
/** @internal Get singleton worker. */
static inline struct worker_ctx *get_worker(void)
......@@ -135,10 +142,9 @@ static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *ha
}
task->req.answer = answer;
task->pktbuf = pktbuf;
task->ioreq = NULL;
task->iohandle = NULL;
task->iter_count = 0;
task->flags = 0;
task->refs = 1;
task->worker = worker;
task->source.handle = handle;
uv_timer_init(worker->loop, &task->timeout);
......@@ -162,21 +168,10 @@ static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *ha
return task;
}
static void qr_task_free(uv_handle_t *handle)
static void qr_task_free(struct qr_task *task)
{
struct qr_task *task = handle->data;
struct worker_ctx *worker = task->worker;
/* Run the completion callback. */
if (task->on_complete) {
task->on_complete(worker, &task->req, task->baton);
}
/* Return handle to the event loop in case
* it was exclusively taken by this task. */
if (task->source.handle && !uv_has_ref(task->source.handle)) {
uv_ref(task->source.handle);
io_start_read(task->source.handle);
}
/* Return mempool to ring or free it if it's full */
struct worker_ctx *worker = task->worker;
void *mp_context = task->req.pool.ctx;
if (worker->pools.len < MP_FREELIST_SIZE) {
mp_flush(mp_context);
......@@ -193,7 +188,24 @@ static void qr_task_free(uv_handle_t *handle)
#endif
mp_delete_count = 0;
}
}
/** Finalize a task after its timeout handle has been closed (passed as
 *  uv_close() callback): run the user completion callback, return the
 *  source handle to the event loop, and drop this task's reference —
 *  which may free the task via qr_task_unref()/qr_task_free(). */
static void qr_task_complete(uv_handle_t *handle)
{
struct qr_task *task = handle->data;
/* Saved before qr_task_unref() below, which may free `task`. */
struct worker_ctx *worker = task->worker;
/* Run the completion callback. */
if (task->on_complete) {
task->on_complete(worker, &task->req, task->baton);
}
/* Return handle to the event loop in case
* it was exclusively taken by this task. */
if (task->source.handle && !uv_has_ref(task->source.handle)) {
uv_ref(task->source.handle);
io_start_read(task->source.handle);
}
/* Release task */
qr_task_unref(task);
/* Update stats */
worker->stats.concurrent -= 1;
}
......@@ -202,10 +214,7 @@ static void qr_task_timeout(uv_timer_t *req)
{
struct qr_task *task = req->data;
if (!uv_is_closing((uv_handle_t *)req)) {
if (task->ioreq) { /* Invalidate pending IO request. */
task->ioreq->data = NULL;
}
qr_task_step(task, NULL);
qr_task_step(task, NULL, NULL);
}
}
......@@ -217,7 +226,7 @@ static int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status
}
} else { /* Finalize task */
uv_timer_stop(&task->timeout);
uv_close((uv_handle_t *)&task->timeout, qr_task_free);
uv_close((uv_handle_t *)&task->timeout, qr_task_complete);
}
return status;
}
......@@ -232,10 +241,10 @@ static void on_send(uv_udp_send_t *req, int status)
{
struct worker_ctx *worker = get_worker();
struct qr_task *task = req->data;
if (task) {
if (qr_valid_handle(task, (uv_handle_t *)req->handle)) {
qr_task_on_send(task, (uv_handle_t *)req->handle, status);
task->ioreq = NULL;
}
qr_task_unref(task);
ioreq_release(worker, (struct ioreq *)req);
}
......@@ -243,16 +252,15 @@ static void on_write(uv_write_t *req, int status)
{
struct worker_ctx *worker = get_worker();
struct qr_task *task = req->data;
if (task) {
if (qr_valid_handle(task, (uv_handle_t *)req->handle)) {
qr_task_on_send(task, (uv_handle_t *)req->handle, status);
task->ioreq = NULL;
}
qr_task_unref(task);
ioreq_release(worker, (struct ioreq *)req);
}
static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockaddr *addr, knot_pkt_t *pkt)
{
int ret = 0;
if (!handle) {
return qr_task_on_send(task, handle, kr_error(EIO));
}
......@@ -262,6 +270,7 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockad
}
/* Send using given protocol */
int ret = 0;
if (handle->type == UV_UDP) {
uv_buf_t buf = { (char *)pkt->wire, pkt->size };
send_req->as.send.data = task;
......@@ -276,7 +285,7 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockad
ret = uv_write(&send_req->as.write, (uv_stream_t *)handle, buf, 2, &on_write);
}
if (ret == 0) {
task->ioreq = (uv_req_t *)send_req;
qr_task_ref(task); /* Pending ioreq on current task */
} else {
ioreq_release(task->worker, send_req);
}
......@@ -299,18 +308,18 @@ static void on_connect(uv_connect_t *req, int status)
{
struct worker_ctx *worker = get_worker();
struct qr_task *task = req->data;
if (task) {
task->ioreq = NULL;
uv_stream_t *handle = req->handle;
if (qr_valid_handle(task, (uv_handle_t *)req->handle)) {
struct sockaddr_in6 addr;
int addrlen = sizeof(addr); /* Retrieve endpoint IP for statistics */
uv_tcp_getpeername((uv_tcp_t *)handle, (struct sockaddr *)&addr, &addrlen);
if (status == 0) {
struct sockaddr_in6 addr;
int addrlen = sizeof(addr); /* Retrieve endpoint IP for statistics */
uv_stream_t *handle = req->handle;
uv_tcp_getpeername((uv_tcp_t *)handle, (struct sockaddr *)&addr, &addrlen);
qr_task_send(task, (uv_handle_t *)handle, (struct sockaddr *)&addr, task->pktbuf);
} else {
qr_task_step(task, NULL);
qr_task_step(task, (struct sockaddr *)&addr, NULL);
}
}
qr_task_unref(task);
ioreq_release(worker, (struct ioreq *)req);
}
......@@ -322,7 +331,7 @@ static int qr_task_finalize(struct qr_task *task, int state)
return state == KNOT_STATE_DONE ? 0 : kr_error(EIO);
}
static int qr_task_step(struct qr_task *task, knot_pkt_t *packet)
static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *packet)
{
/* Close subrequest handle. */
uv_timer_stop(&task->timeout);
......@@ -336,7 +345,7 @@ static int qr_task_step(struct qr_task *task, knot_pkt_t *packet)
int sock_type = -1;
struct sockaddr *addr = NULL;
knot_pkt_t *pktbuf = task->pktbuf;
int state = kr_resolve_consume(&task->req, NULL, packet);
int state = kr_resolve_consume(&task->req, packet_source, packet);
while (state == KNOT_STATE_PRODUCE) {
state = kr_resolve_produce(&task->req, &addr, &sock_type, pktbuf);
if (unlikely(++task->iter_count > KR_ITER_LIMIT)) {
......@@ -348,7 +357,7 @@ static int qr_task_step(struct qr_task *task, knot_pkt_t *packet)
if (state & (KNOT_STATE_DONE|KNOT_STATE_FAIL)) {
return qr_task_finalize(task, state);
} else if (!addr || sock_type < 0) {
return qr_task_step(task, NULL);
return qr_task_step(task, NULL, NULL);
}
/* Create connection for iterative query */
......@@ -363,19 +372,19 @@ static int qr_task_step(struct qr_task *task, knot_pkt_t *packet)
task->iohandle = subreq;
if (sock_type == SOCK_DGRAM) {
if (qr_task_send(task, subreq, addr, pktbuf) != 0) {
return qr_task_step(task, NULL);
return qr_task_step(task, NULL, NULL);
}
} else {
struct ioreq *conn_req = ioreq_take(task->worker);
if (!conn_req) {
return qr_task_step(task, NULL);
return qr_task_step(task, NULL, NULL);
}
conn_req->as.connect.data = task;
task->ioreq = (uv_req_t *)conn_req;
if (uv_tcp_connect(&conn_req->as.connect, (uv_tcp_t *)subreq, addr, on_connect) != 0) {
ioreq_release(task->worker, conn_req);
return qr_task_step(task, NULL);
return qr_task_step(task, NULL, NULL);
}
qr_task_ref(task);
}
/* Start next step with timeout */
......@@ -423,7 +432,7 @@ int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *quer
}
/* Consume input and produce next query */
return qr_task_step(task, query);
return qr_task_step(task, addr, query);
}
int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query, unsigned options, worker_cb_t on_complete, void *baton)
......@@ -440,7 +449,7 @@ int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query, unsigned option
task->baton = baton;
task->on_complete = on_complete;
task->req.options |= options;
return qr_task_step(task, query);
return qr_task_step(task, NULL, query);
}
int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment