Commit 6addcded authored by Marek Vavruša's avatar Marek Vavruša

daemon/worker: deduplicate outbound queries

The worker now tracks outbound requests: if N resolutions want the same
subrequest, only one of them leads it, and the others are notified when it
finishes.

This massively reduces the number of outbound requests for
slow, unresponsive, or low-TTL requests.
parent 6170374f
......@@ -56,6 +56,7 @@ struct qr_task
struct kr_request req;
struct worker_ctx *worker;
knot_pkt_t *pktbuf;
array_t(struct qr_task *) waiting;
uv_handle_t *pending[MAX_PENDING];
uint16_t pending_count;
uint16_t addrlist_count;
......@@ -74,7 +75,8 @@ struct qr_task
uint16_t iter_count;
uint16_t refs;
uint16_t bytes_remaining;
uint16_t finished;
bool finished;
bool leading;
};
/* Convenience macros */
......@@ -246,12 +248,14 @@ static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *ha
}
task->req.answer = answer;
task->pktbuf = pktbuf;
array_init(task->waiting);
task->addrlist = NULL;
task->pending_count = 0;
task->bytes_remaining = 0;
task->iter_count = 0;
task->refs = 1;
task->finished = false;
task->leading = false;
task->worker = worker;
task->source.handle = handle;
uv_timer_init(worker->loop, &task->retry);
......@@ -310,6 +314,8 @@ static void qr_task_complete(uv_handle_t *handle)
struct worker_ctx *worker = task->worker;
/* Kill pending I/O requests */
ioreq_killall(task);
assert(task->waiting.len == 0);
assert(task->leading == false);
/* Run the completion callback. */
if (task->on_complete) {
task->on_complete(worker, &task->req, task->baton);
......@@ -475,16 +481,16 @@ static void on_retransmit(uv_timer_t *req)
}
}
static int qr_task_finalize(struct qr_task *task, int state)
/** @internal Get key from current outstanding subrequest. */
static int subreq_key(char *dst, struct qr_task *task)
{
kr_resolve_finish(&task->req, state);
task->finished = true;
/* Send back answer */
(void) qr_task_send(task, task->source.handle, (struct sockaddr *)&task->source.addr, task->req.answer);
return state == KNOT_STATE_DONE ? 0 : kr_error(EIO);
assert(task);
knot_pkt_t *pkt = task->pktbuf;
assert(knot_wire_get_qr(pkt->wire) == false);
return kr_rrmap_key(dst, knot_pkt_qname(pkt), knot_pkt_qtype(pkt), knot_pkt_qclass(pkt));
}
static void cancel_subrequests(struct qr_task *task)
static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *pkt)
{
/* Close pending I/O requests */
if (uv_is_active((uv_handle_t *)&task->retry))
......@@ -492,6 +498,71 @@ static void cancel_subrequests(struct qr_task *task)
if (uv_is_active((uv_handle_t *)&task->timeout))
uv_timer_stop(&task->timeout);
ioreq_killall(task);
/* Clear from outstanding table. */
if (!task->leading)
return;
char key[RRMAP_KEYSIZE];
int ret = subreq_key(key, task);
if (ret > 0) {
assert(map_get(&task->worker->outstanding, key) == task);
map_del(&task->worker->outstanding, key);
}
/* Notify waiting tasks. */
struct kr_query *leader_qry = TAIL(task->req.rplan.pending);
for (size_t i = task->waiting.len; i --> 0;) {
struct qr_task *follower = task->waiting.at[i];
struct kr_query *qry = TAIL(follower->req.rplan.pending);
/* Reuse MSGID and 0x20 secret */
if (qry) {
qry->id = leader_qry->id;
qry->secret = leader_qry->secret;
leader_qry->secret = 0; /* Next will be already decoded */
}
qr_task_step(follower, packet_source, pkt);
qr_task_unref(follower);
}
task->waiting.len = 0;
task->leading = false;
}
static void subreq_lead(struct qr_task *task)
{
assert(task);
char key[RRMAP_KEYSIZE];
if (subreq_key(key, task) > 0) {
assert(map_contains(&task->worker->outstanding, key) == false);
map_set(&task->worker->outstanding, key, task);
task->leading = true;
}
}
/** @internal Try to attach the task to an identical subrequest that is already outstanding.
 *
 * @param task task about to issue a subrequest (must not be NULL)
 * @return true when the task was appended to a leader's waiting list (a task
 *         reference is taken for it); false when no key could be built, no
 *         task leads this subrequest, or the waiting list could not be grown
 */
static bool subreq_enqueue(struct qr_task *task)
{
	assert(task);
	char key[RRMAP_KEYSIZE];
	if (subreq_key(key, task) <= 0) {
		return false;
	}
	struct qr_task *leader = map_get(&task->worker->outstanding, key);
	if (!leader) {
		return false;
	}
	/* Make room for one more follower; the leader's request pool is handed
	 * to the reserve callback. */
	if (array_reserve_mm(leader->waiting, leader->waiting.len + 1, mm_reserve, &leader->req.pool) != 0) {
		return false;
	}
	/* Enqueue itself to the leader and hold a reference while waiting. */
	array_push(leader->waiting, task);
	qr_task_ref(task);
	return true;
}
/** @internal Finish the resolution and send the answer back to the task's source.
 *
 * The task must not be leading a subrequest when this is called.
 *
 * @param task  task to finalize (must not be NULL)
 * @param state final state passed on to kr_resolve_finish()
 * @return 0 when state is KNOT_STATE_DONE, kr_error(EIO) otherwise
 */
static int qr_task_finalize(struct qr_task *task, int state)
{
	assert(task && !task->leading);
	kr_resolve_finish(&task->req, state);
	task->finished = true;
	/* Send back the answer; the send result is intentionally ignored. */
	struct sockaddr *dst = (struct sockaddr *)&task->source.addr;
	(void) qr_task_send(task, task->source.handle, dst, task->req.answer);
	if (state != KNOT_STATE_DONE) {
		return kr_error(EIO);
	}
	return 0;
}
static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *packet)
......@@ -501,7 +572,7 @@ static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_sour
return kr_error(ESTALE);
}
/* Close pending I/O requests */
cancel_subrequests(task);
subreq_finalize(task, packet_source, packet);
/* Consume input and produce next query */
int sock_type = -1;
task->addrlist = NULL;
......@@ -532,11 +603,20 @@ static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_sour
/* Start fast retransmit with UDP, otherwise connect. */
if (sock_type == SOCK_DGRAM) {
/* If such subrequest is outstanding, enqueue to it. */
if (subreq_enqueue(task)) {
return kr_ok(); /* Will be notified when outstanding subrequest finishes. */
}
/* Start transmitting */
if (retransmit(task)) {
uv_timer_start(&task->retry, on_retransmit, KR_CONN_RETRY, KR_CONN_RETRY);
} else {
return qr_task_step(task, NULL, NULL);
}
/* Announce and start subrequest.
* @note Only UDP can lead I/O as it doesn't touch 'task->pktbuf' for reassembly.
*/
subreq_lead(task);
} else {
struct ioreq *conn = ioreq_take(task->worker);
if (!conn) {
......@@ -549,7 +629,6 @@ static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_sour
}
conn->as.connect.data = task;
if (uv_tcp_connect(&conn->as.connect, (uv_tcp_t *)client, task->addrlist, on_connect) != 0) {
DEBUG_MSG("task conn_start %p => failed\n", task);
ioreq_release(task->worker, conn);
return qr_task_step(task, NULL, NULL);
}
......@@ -559,9 +638,12 @@ static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_sour
/* Start next step with timeout, fatal if can't start a timer. */
int ret = uv_timer_start(&task->timeout, on_timeout, KR_CONN_RTT_MAX, 0);
if (ret != 0)
if (ret != 0) {
subreq_finalize(task, packet_source, packet);
return qr_task_finalize(task, KNOT_STATE_FAIL);
return kr_ok();
}
return ret;
}
static int parse_packet(knot_pkt_t *query)
......@@ -695,6 +777,7 @@ int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen)
memset(&worker->pkt_pool, 0, sizeof(worker->pkt_pool));
worker->pkt_pool.ctx = mp_new (4 * sizeof(knot_pkt_t));
worker->pkt_pool.alloc = (mm_alloc_t) mp_alloc;
worker->outstanding = map_make();
return kr_ok();
}
......@@ -712,6 +795,7 @@ void worker_reclaim(struct worker_ctx *worker)
reclaim_freelist(worker->ioreqs, struct ioreq, free);
mp_delete(worker->pkt_pool.ctx);
worker->pkt_pool.ctx = NULL;
map_clear(&worker->outstanding);
}
#undef DEBUG_MSG
......@@ -20,6 +20,7 @@
#include "daemon/engine.h"
#include "lib/generic/array.h"
#include "lib/generic/map.h"
/* @cond internal Freelist of available mempools. */
typedef array_t(void *) mp_freelist_t;
......@@ -46,6 +47,7 @@ struct worker_ctx {
size_t dropped;
size_t timeout;
} stats;
map_t outstanding;
mp_freelist_t pools;
mp_freelist_t ioreqs;
mm_ctx_t pkt_pool;
......
......@@ -307,7 +307,6 @@ int kr_bitcmp(const char *a, const char *b, int bits)
int kr_rrmap_key(char *key, const knot_dname_t *owner, uint16_t type, uint8_t rank)
{
if (!key || !owner) {
printf("key owner %p %p\n", key, owner);
return kr_error(EINVAL);
}
key[0] = (rank << 2) | 0x01; /* Must be non-zero */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment