worker_ctx: migrate leaders from map_t to trie_t

parent 80b833eb
Pipeline #35405 passed with stages
in 9 minutes and 6 seconds
......@@ -484,13 +484,6 @@ static inline void pool_release(struct worker_ctx *worker, struct mempool *mp)
}
}
/** @internal Get key from current outgoing subrequest. */
static int subreq_key(char *dst, knot_pkt_t *pkt)
{
assert(pkt);
return kr_rrkey(dst, knot_pkt_qname(pkt), knot_pkt_qtype(pkt), knot_pkt_qclass(pkt));
}
/** Create and initialize a request_ctx (on a fresh mempool).
*
* handle and addr point to the source of the request, and they are NULL
......@@ -1410,6 +1403,35 @@ static int timer_start(struct session *session, uv_timer_cb cb,
return 0;
}
/** Create a key for an outgoing subrequest: qname, qclass, qtype.
* @param key Destination buffer for key size, MUST be SUBREQ_KEY_LEN or larger.
* @return key length if successful or an error
*/
static int subreq_key(char *dst, knot_pkt_t *pkt)
{
assert(dst && pkt);
const char * const dst_begin = dst;
int ret = knot_dname_to_wire((uint8_t *)dst, knot_pkt_qname(pkt), KNOT_DNAME_MAXLEN);
if (ret <= 0) {
assert(false); /*EINVAL*/
return kr_error(ret);
}
knot_dname_to_lower((knot_dname_t *)dst);
dst += ret;
const uint16_t qclass = knot_pkt_qclass(pkt);
memcpy(dst, &qclass, sizeof(qclass));
dst += sizeof(qclass);
const uint16_t qtype = knot_pkt_qtype(pkt);
memcpy(dst, &qtype, sizeof(qtype));
dst += sizeof(qtype);
return dst - dst_begin;
}
static const size_t SUBREQ_KEY_LEN = KNOT_DNAME_MAXLEN + 2 * sizeof(uint16_t);
static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *pkt)
{
/* Close pending timer */
......@@ -1417,11 +1439,12 @@ static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_
/* Clear from outgoing table. */
if (!task->leading)
return;
char key[KR_RRKEY_LEN];
int ret = subreq_key(key, task->pktbuf);
if (ret > 0) {
assert(map_get(&task->ctx->worker->outgoing, key) == task);
map_del(&task->ctx->worker->outgoing, key);
char key[SUBREQ_KEY_LEN];
const int klen = subreq_key(key, task->pktbuf);
if (klen > 0) {
void *val_deleted;
int ret = trie_del(task->ctx->worker->subreq_out, key, klen, &val_deleted);
assert(ret == KNOT_EOK && val_deleted == task);
}
/* Notify waiting tasks. */
struct kr_query *leader_qry = array_tail(task->ctx->req.rplan.pending);
......@@ -1444,34 +1467,43 @@ static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_
static void subreq_lead(struct qr_task *task)
{
assert(task);
char key[KR_RRKEY_LEN];
if (subreq_key(key, task->pktbuf) > 0) {
assert(map_contains(&task->ctx->worker->outgoing, key) == false);
map_set(&task->ctx->worker->outgoing, key, task);
task->leading = true;
char key[SUBREQ_KEY_LEN];
const int klen = subreq_key(key, task->pktbuf);
if (klen < 0)
return;
struct qr_task **tvp = (struct qr_task **)
trie_get_ins(task->ctx->worker->subreq_out, key, klen);
if (unlikely(!tvp))
return; /*ENOMEM*/
if (unlikely(*tvp != NULL)) {
assert(false);
return;
}
*tvp = task;
task->leading = true;
}
static bool subreq_enqueue(struct qr_task *task)
{
assert(task);
char key[KR_RRKEY_LEN];
if (subreq_key(key, task->pktbuf) > 0) {
struct qr_task *leader = map_get(&task->ctx->worker->outgoing, key);
if (leader) {
/* Enqueue itself to leader for this subrequest. */
int ret = array_reserve_mm(leader->waiting, leader->waiting.len + 1,
kr_memreserve, &leader->ctx->req.pool);
if (ret == 0) {
array_push(leader->waiting, task);
qr_task_ref(task);
return true;
}
}
}
return false;
char key[SUBREQ_KEY_LEN];
const int klen = subreq_key(key, task->pktbuf);
if (klen < 0)
return false;
struct qr_task **leader = (struct qr_task **)
trie_get_try(task->ctx->worker->subreq_out, key, klen);
if (!leader /*ENOMEM*/ || !*leader)
return false;
/* Enqueue itself to leader for this subrequest. */
int ret = array_push_mm((*leader)->waiting, task,
kr_memreserve, &(*leader)->ctx->req.pool);
if (unlikely(ret < 0)) /*ENOMEM*/
return false;
qr_task_ref(task);
return true;
}
static int qr_task_finalize(struct qr_task *task, int state)
{
assert(task && task->leading == false);
......@@ -2378,7 +2410,7 @@ static int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen)
memset(&worker->pkt_pool, 0, sizeof(worker->pkt_pool));
worker->pkt_pool.ctx = mp_new (4 * sizeof(knot_pkt_t));
worker->pkt_pool.alloc = (knot_mm_alloc_t) mp_alloc;
worker->outgoing = map_make(NULL);
worker->subreq_out = trie_create(NULL);
worker->tcp_connected = map_make(NULL);
worker->tcp_waiting = map_make(NULL);
worker->tcp_pipeline_max = MAX_PIPELINED;
......@@ -2402,7 +2434,8 @@ void worker_reclaim(struct worker_ctx *worker)
reclaim_freelist(worker->pool_sessions, struct session, session_free);
mp_delete(worker->pkt_pool.ctx);
worker->pkt_pool.ctx = NULL;
map_clear(&worker->outgoing);
trie_free(worker->subreq_out);
worker->subreq_out = NULL;
map_clear(&worker->tcp_connected);
map_clear(&worker->tcp_waiting);
}
......
......@@ -142,11 +142,12 @@ struct worker_ctx {
bool too_many_open;
size_t rconcurrent_highwatermark;
/* List of active outbound TCP sessions */
/** List of active outbound TCP sessions */
map_t tcp_connected;
/* List of outbound TCP sessions waiting to be accepted */
/** List of outbound TCP sessions waiting to be accepted */
map_t tcp_waiting;
map_t outgoing;
/** Subrequest leaders (struct qr_task*), indexed by qname+qtype+qclass. */
trie_t *subreq_out;
mp_freelist_t pool_mp;
mp_freelist_t pool_ioreqs;
mp_freelist_t pool_sessions;
......
......@@ -45,21 +45,27 @@ typedef struct trie trie_t;
typedef struct trie_it trie_it_t;
/*! \brief Create a trie instance. */
KR_EXPORT
trie_t* trie_create(knot_mm_t *mm);
/*! \brief Free a trie instance. */
KR_EXPORT
void trie_free(trie_t *tbl);
/*! \brief Clear a trie instance (make it empty). */
KR_EXPORT
void trie_clear(trie_t *tbl);
/*! \brief Return the number of keys in the trie. */
KR_EXPORT
size_t trie_weight(const trie_t *tbl);
/*! \brief Search the trie, returning NULL on failure. */
KR_EXPORT
trie_val_t* trie_get_try(trie_t *tbl, const char *key, uint32_t len);
/*! \brief Search the trie, inserting NULL trie_val_t on failure. */
KR_EXPORT
trie_val_t* trie_get_ins(trie_t *tbl, const char *key, uint32_t len);
/*!
......@@ -87,6 +93,7 @@ int trie_apply(trie_t *tbl, int (*f)(trie_val_t *, void *), void *d);
*
* If val!=NULL and deletion succeeded, the deleted value is set.
*/
KR_EXPORT
int trie_del(trie_t *tbl, const char *key, uint32_t len, trie_val_t *val);
/*! \brief Create a new iterator pointing to the first element (if any). */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment