Commit 1093ce48 authored by Marek Vavruša's avatar Marek Vavruša

reworked query resolution

- cache is now fully optional, resolution works without cache as well
- proper mempool per each request, not mixed
- zone cut resolution doesn't need any per-context memory
- now it's possible to do qname minimization
- rplan keeps tabs on resolved queries
- added documentation etc.
parent fc2b00f6
......@@ -13,6 +13,8 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
#include <libknot/errcode.h>
#include "daemon/layer/query.h"
#include "lib/resolve.h"
......@@ -30,7 +32,6 @@ static int begin(knot_layer_t *ctx, void *module_param)
static int input_query(knot_layer_t *ctx, knot_pkt_t *pkt)
{
assert(pkt && ctx);
struct kr_layer_param *param = ctx->data;
/* Check if at least header is parsed. */
if (pkt->parsed < pkt->size) {
......@@ -42,21 +43,24 @@ static int input_query(knot_layer_t *ctx, knot_pkt_t *pkt)
return KNOT_NS_PROC_NOOP; /* Ignore. */
}
return KNOT_NS_PROC_FULL;
}
static int output_answer(knot_layer_t *ctx, knot_pkt_t *pkt)
{
assert(pkt && ctx);
/* Prepare for query processing. */
int ret = kr_resolve(param->ctx, param->result,
int ret = kr_resolve(ctx->data, pkt,
knot_pkt_qname(pkt),
knot_pkt_qclass(pkt),
knot_pkt_qtype(pkt));
/* Set correct message ID. */
knot_pkt_t *answer = param->result->ans;
knot_wire_set_id(answer->wire, knot_wire_get_id(pkt->wire));
if (ret != KNOT_EOK) {
return KNOT_NS_PROC_FAIL;
} else {
return KNOT_NS_PROC_DONE;
}
return KNOT_NS_PROC_DONE;
}
/*! \brief Module implementation. */
......@@ -65,7 +69,7 @@ static const knot_layer_api_t LAYER_QUERY_MODULE = {
NULL,
&reset,
&input_query,
NULL,
&output_answer,
NULL
};
......
......@@ -2,6 +2,7 @@
#include <libknot/packet/pkt.h>
#include <libknot/internal/net.h>
#include <libknot/errcode.h>
#include "daemon/worker.h"
#include "daemon/layer/query.h"
......@@ -28,41 +29,45 @@ static void worker_send(uv_udp_t *handle, knot_pkt_t *answer, const struct socka
static void worker_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
const struct sockaddr *addr, unsigned flags)
{
struct worker_ctx *ctx = handle->data;
assert(ctx->pool);
struct worker_ctx *worker = handle->data;
assert(worker->pool);
if (nread < KNOT_WIRE_HEADER_SIZE) {
buf_free((uv_handle_t *)handle, buf);
return;
}
struct kr_result result;
/* Create query processing context. */
struct kr_layer_param param;
param.ctx = &ctx->resolve;
param.result = &result;
/* Parse query packet. */
knot_pkt_t *query = knot_pkt_new((uint8_t *)buf->base, nread, worker->pool);
int ret = knot_pkt_parse(query, 0);
if (ret != KNOT_EOK) {
knot_pkt_free(&query);
buf_free((uv_handle_t *)handle, buf);
return; /* Ignore malformed query. */
}
/* Process query packet. */
knot_layer_t proc;
memset(&proc, 0, sizeof(knot_layer_t));
proc.mm = ctx->pool;
knot_layer_begin(&proc, LAYER_QUERY, &param);
knot_pkt_t *query = knot_pkt_new((uint8_t *)buf->base, nread, ctx->pool);
knot_pkt_parse(query, 0);
proc.mm = worker->pool;
knot_layer_begin(&proc, LAYER_QUERY, &worker->resolve);
int state = knot_layer_in(&proc, query);
if (state & (KNOT_NS_PROC_DONE|KNOT_NS_PROC_FAIL)) {
worker_send(handle, result.ans, addr);
/* Build an answer. */
knot_pkt_t *answer = knot_pkt_new(NULL, KNOT_WIRE_MAX_PKTSIZE, worker->pool);
while (state == KNOT_NS_PROC_FULL) {
knot_pkt_init_response(answer, query);
state = knot_layer_out(&proc, answer);
if (answer->size > 0) {
worker_send(handle, answer, addr);
}
}
/* Cleanup. */
knot_layer_finish(&proc);
kr_result_deinit(&result);
kr_context_reset(&ctx->resolve);
buf_free((uv_handle_t *)handle, buf);
knot_pkt_free(&query);
knot_pkt_free(&answer);
}
void worker_init(struct worker_ctx *worker, mm_ctx_t *mm)
......
#include <string.h>
#include <sys/time.h>
#include <libknot/errcode.h>
#include <libknot/internal/sockaddr.h>
#include "lib/context.h"
#include "lib/rplan.h"
......@@ -11,110 +11,18 @@ int kr_context_init(struct kr_context *ctx, mm_ctx_t *mm)
ctx->pool = mm;
kr_rplan_init(&ctx->rplan, mm);
kr_zonecut_init(&ctx->dp_map, mm);
ctx->cache = kr_cache_open("/tmp/kresolved", 0, mm);
ctx->cache = kr_cache_open("/tmp/kresolved", mm);
if (ctx->cache == NULL) {
fprintf(stderr, "Cache directory '/tmp/kresolved' not exists, exitting.\n");
assert(ctx->cache);
}
return 0;
}
int kr_context_reset(struct kr_context *ctx)
{
/* Finalize transactions. */
int ret = kr_context_txn_commit(ctx);
if (ret != KNOT_EOK) {
return ret;
return KNOT_ERROR;
}
ctx->state = 0;
ctx->resolved_qry = NULL;
ctx->current_ns = NULL;
ctx->query = NULL;
ctx->txn.read = NULL;
ctx->txn.write = NULL;
kr_rplan_clear(&ctx->rplan);
return KNOT_EOK;
}
int kr_context_deinit(struct kr_context *ctx)
{
kr_zonecut_deinit(&ctx->dp_map);
kr_cache_close(ctx->cache);
return KNOT_EOK;
}
struct kr_txn *kr_context_txn_acquire(struct kr_context *ctx, unsigned flags)
{
struct kr_txn **txn = &ctx->txn.write;
if (flags & KR_CACHE_RDONLY) {
txn = &ctx->txn.read;
}
if (*txn != NULL) {
return *txn;
}
return *txn = kr_cache_txn_begin(ctx->cache, NULL, flags, ctx->pool);
}
void kr_context_txn_release(struct kr_txn *txn)
{
/*! \note Transactions are reused and commited on checkpoints only. */
}
int kr_context_txn_commit(struct kr_context *ctx)
{
if (ctx == NULL) {
return KNOT_EINVAL;
}
int ret = KNOT_EOK;
if (ctx->txn.write) {
ret = kr_cache_txn_commit(ctx->txn.write);
}
if (ctx->txn.read) {
kr_cache_txn_abort(ctx->txn.read);
}
ctx->txn.read = ctx->txn.write = NULL;
return ret;
}
int kr_result_init(struct kr_context *ctx, struct kr_result *result)
{
memset(result, 0, sizeof(struct kr_result));
/* Initialize answer packet. */
knot_pkt_t *ans = knot_pkt_new(NULL, KNOT_WIRE_MAX_PKTSIZE, ctx->pool);
if (ans == NULL) {
return -1;
}
struct kr_query *qry = kr_rplan_next(&ctx->rplan);
if (qry == NULL) {
knot_pkt_free(&ans);
return -1;
}
knot_pkt_put_question(ans, qry->sname, qry->sclass, qry->stype);
knot_wire_set_rcode(ans->wire, KNOT_RCODE_SERVFAIL);
knot_wire_set_qr(ans->wire);
result->ans = ans;
return 0;
}
int kr_result_deinit(struct kr_result *result)
{
knot_pkt_free(&result->ans);
return 0;
}
......@@ -15,50 +15,38 @@ limitations under the License.
#pragma once
#include <stdint.h>
#include <libknot/internal/mempattern.h>
#include <libknot/internal/sockaddr.h>
#include <libknot/internal/lists.h>
#include "lib/zonecut.h"
#include "lib/rplan.h"
#include "lib/cache.h"
/*! \brief Name resolution result. */
struct kr_result {
knot_pkt_t *ans;
unsigned flags;
struct timeval t_start, t_end;
unsigned total_rtt;
unsigned nr_queries;
};
/*! \brief Name resolution context. */
/*!
* \brief Name resolution context.
*
* Resolution context provides basic services like cache, configuration and options.
*
* \note This structure is persistent between name resolutions and may
* be shared between threads.
*/
struct kr_context
{
struct kr_ns *current_ns;
struct kr_zonecut *zone_cut;
struct kr_query *resolved_qry;
const knot_pkt_t *query;
struct kr_rplan rplan;
struct kr_zonecut_map dp_map;
struct kr_cache *cache;
struct {
struct kr_txn *read;
struct kr_txn *write;
} txn;
mm_ctx_t *pool;
unsigned state;
list_t layers;
unsigned options;
mm_ctx_t *pool;
};
/*!
* \brief Initialize query resolution context.
* \param ctx context to be initialized
* \param mm memory context
* \return KNOT_E*
*/
int kr_context_init(struct kr_context *ctx, mm_ctx_t *mm);
int kr_context_reset(struct kr_context *ctx);
int kr_context_deinit(struct kr_context *ctx);
struct kr_txn *kr_context_txn_acquire(struct kr_context *ctx, unsigned flags);
void kr_context_txn_release(struct kr_txn *txn);
int kr_context_txn_commit(struct kr_context *ctx);
int kr_result_init(struct kr_context *ctx, struct kr_result *result);
int kr_result_deinit(struct kr_result *result);
/*!
* \brief Deinitialize query resolution context.
* \param ctx context to be deinitialized
* \return KNOT_E*
*/
int kr_context_deinit(struct kr_context *ctx);
......@@ -15,6 +15,10 @@ limitations under the License.
#pragma once
#include <libknot/errcode.h>
#include <libknot/dname.h>
#include <libknot/rrset.h>
/*
* Connection limits.
*/
......@@ -24,3 +28,9 @@ limitations under the License.
* Timers.
*/
#define KR_TTL_GRACE ((KR_CONN_RTT_MAX) / 1000) /* TTL expire grace period. */
/*
* Defines.
*/
#define KR_DNS_PORT 53
#define KR_DNAME_ROOT ((const knot_dname_t*)"")
......@@ -16,13 +16,18 @@ limitations under the License.
#pragma once
#include <libknot/processing/layer.h>
#include <libknot/packet/pkt.h>
#include "lib/context.h"
#include "lib/rplan.h"
/*!
* \brief Processing module parameters.
*
* \note These parameters are passed to each processing layer.
*/
struct kr_layer_param {
struct kr_context *ctx;
struct kr_result *result;
struct kr_rplan *rplan;
knot_pkt_t *answer;
};
This diff is collapsed.
......@@ -20,43 +20,14 @@ limitations under the License.
#define DEBUG_MSG(fmt, ...) fprintf(stderr, "[stats] " fmt, ## __VA_ARGS__)
static void update_ns_preference(struct kr_ns *ns, struct kr_ns *next)
{
assert(ns);
assert(next);
/* Push down if next has better score. */
if (next->stat.M < ns->stat.M) {
rem_node(&ns->node);
insert_node(&ns->node, &next->node);
}
}
static void update_ns_preference_list(struct kr_ns *cur)
{
assert(cur);
struct kr_ns *next = (struct kr_ns *)cur->node.next;
/* O(n), walk the list (shouldn't be too large). */
/* TODO: cut on first swap? random swaps? */
while (next && next->node.next != NULL) {
update_ns_preference(cur, next);
cur = next;
next = (struct kr_ns *)cur->node.next;
}
}
static void update_stats(struct kr_ns *ns, double rtt)
{
/* Knuth, TAOCP, p.232 (Welford running variance/mean). */
double d_mean = (rtt - ns->stat.M);
ns->stat.n += 1;
ns->stat.M += d_mean / ns->stat.n;
ns->stat.S += d_mean * (rtt - ns->stat.M);
/* Update NS position in preference list. */
update_ns_preference_list(ns);
}
//static void update_stats(struct kr_ns *ns, double rtt)
//{
// /* Knuth, TAOCP, p.232 (Welford running variance/mean). */
// double d_mean = (rtt - ns->stat.M);
// ns->stat.n += 1;
// ns->stat.M += d_mean / ns->stat.n;
// ns->stat.S += d_mean * (rtt - ns->stat.M);
//}
static int begin(knot_layer_t *ctx, void *param)
{
......@@ -67,55 +38,23 @@ static int begin(knot_layer_t *ctx, void *param)
static int finish(knot_layer_t *ctx)
{
struct kr_layer_param *param = ctx->data;
struct kr_result *result = param->result;
struct kr_rplan *rplan = param->rplan;
const knot_pkt_t *answer = param->answer;
#ifndef NDEBUG
char qnamestr[KNOT_DNAME_MAXLEN] = { '\0' };
knot_dname_to_str(qnamestr, knot_pkt_qname(result->ans), sizeof(qnamestr) - 1);
DEBUG_MSG("resolution of %s\n", qnamestr);
DEBUG_MSG("rcode: %d (%u RRs)\n", knot_wire_get_rcode(result->ans->wire), result->ans->rrset_count);
DEBUG_MSG("queries: %u\n", result->nr_queries);
DEBUG_MSG("total time: %u msecs\n", result->total_rtt);
#endif
return ctx->state;
}
static int query(knot_layer_t *ctx, knot_pkt_t *pkt)
{
struct kr_layer_param *param = ctx->data;
struct kr_result *result = param->result;
/* Store stats. */
gettimeofday(&result->t_start, NULL);
return ctx->state;
}
static int answer(knot_layer_t *ctx, knot_pkt_t *pkt)
{
assert(pkt && ctx);
struct kr_layer_param *param = ctx->data;
struct kr_context* resolve = param->ctx;
struct kr_result *result = param->result;
struct kr_ns *ns = resolve->current_ns;
/* Store stats. */
gettimeofday(&result->t_end, NULL);
/* Update NS statistics. */
double rtt = time_diff(&result->t_start, &result->t_end);
if (rtt > 0.0) {
update_stats(ns, rtt);
result->total_rtt += rtt;
/* Calculate total RTT and number of queries. */
double total_rtt = 0.0;
size_t nr_queries = list_size(&rplan->resolved);
if (nr_queries > 0) {
struct kr_query *query_first = HEAD(rplan->resolved);
struct timeval t_end;
gettimeofday(&t_end, NULL);
total_rtt = time_diff(&query_first->timestamp, &t_end);
}
#ifndef NDEBUG
char ns_name[KNOT_DNAME_MAXLEN] = { '\0' };
knot_dname_to_str(ns_name, ns->name, sizeof(ns_name));
DEBUG_MSG("answer from '%s' RC=%d, AA=%d, RTT: %.02f msecs\n",
ns_name, knot_wire_get_rcode(pkt->wire),
knot_wire_get_aa(pkt->wire) != 0, rtt);
lookup_table_t *rcode = lookup_by_id(knot_rcode_names, knot_wire_get_rcode(answer->wire));
DEBUG_MSG("result => %s [%u records]\n", rcode ? rcode->name : "??", answer->rrset_count);
DEBUG_MSG("rtt => %.02lf [ms]\n", total_rtt);
#endif
return ctx->state;
......@@ -126,8 +65,8 @@ static const knot_layer_api_t LAYER_STATS_MODULE = {
&begin,
NULL,
&finish,
&answer,
&query,
NULL,
NULL,
NULL
};
......
This diff is collapsed.
......@@ -15,7 +15,18 @@ limitations under the License.
#pragma once
#include <libknot/packet/pkt.h>
#include "context.h"
int kr_resolve(struct kr_context* ctx, struct kr_result* result,
/*!
* \brief Resolve an input query and produce a packet with an answer.
* \note The function doesn't change the packet question or message ID.
* \param ctx resolution context
* \param answer answer packet to be written
* \param qname resolved query name
* \param qclass resolved query class
* \param qtype resolved query type
* \return KNOT_E*
*/
int kr_resolve(struct kr_context* ctx, knot_pkt_t *answer,
const knot_dname_t *qname, uint16_t qclass, uint16_t qtype);
#include <sys/time.h>
#include <libknot/descriptor.h>
#include <libknot/processing/layer.h>
#include <libknot/errcode.h>
#include "lib/rplan.h"
#include "lib/context.h"
#include "lib/cache.h"
#define DEBUG_MSG(fmt, ...) fprintf(stderr, "[rplan] " fmt, ## __VA_ARGS__)
......@@ -27,20 +33,38 @@ static void query_free(mm_ctx_t *pool, struct kr_query *qry)
mm_free(pool, qry);
}
void kr_rplan_init(struct kr_rplan *rplan, mm_ctx_t *pool)
void kr_rplan_init(struct kr_rplan *rplan, struct kr_context *context, mm_ctx_t *pool)
{
memset(rplan, 0, sizeof(struct kr_rplan));
rplan->state = KNOT_NS_PROC_MORE;
rplan->pool = pool;
init_list(&rplan->q);
rplan->context = context;
init_list(&rplan->pending);
init_list(&rplan->resolved);
}
void kr_rplan_clear(struct kr_rplan *rplan)
void kr_rplan_deinit(struct kr_rplan *rplan)
{
struct kr_query *qry = NULL, *next = NULL;
WALK_LIST_DELSAFE(qry, next, rplan->q) {
WALK_LIST_DELSAFE(qry, next, rplan->pending) {
query_free(rplan->pool, qry);
}
WALK_LIST_DELSAFE(qry, next, rplan->resolved) {
query_free(rplan->pool, qry);
}
kr_rplan_init(rplan, rplan->pool);
/* Abort any pending transactions. */
if (rplan->txn.db != NULL) {
kr_cache_txn_abort(&rplan->txn);
}
kr_rplan_init(rplan, rplan->context, rplan->pool);
}
bool kr_rplan_empty(struct kr_rplan *rplan)
{
return EMPTY_LIST(rplan->pending);
}
struct kr_query *kr_rplan_push(struct kr_rplan *rplan, const knot_dname_t *name,
......@@ -53,9 +77,9 @@ struct kr_query *kr_rplan_push(struct kr_rplan *rplan, const knot_dname_t *name,
qry->sclass = cls;
qry->stype = type;
qry->flags = RESOLVE_QUERY;
gettimeofday(&qry->timestamp, NULL);
add_head(&rplan->q, &qry->node);
add_tail(&rplan->pending, &qry->node);
#ifndef NDEBUG
char name_str[KNOT_DNAME_MAXLEN], type_str[16];
......@@ -70,14 +94,70 @@ struct kr_query *kr_rplan_push(struct kr_rplan *rplan, const knot_dname_t *name,
int kr_rplan_pop(struct kr_rplan *rplan, struct kr_query *qry)
{
rem_node(&qry->node);
query_free(rplan->pool, qry);
return 0;
add_tail(&rplan->resolved, &qry->node);
return KNOT_EOK;
}
struct kr_query *kr_rplan_next(struct kr_rplan *rplan)
struct kr_query *kr_rplan_current(struct kr_rplan *rplan)
{
if (EMPTY_LIST(rplan->q)) {
if (EMPTY_LIST(rplan->pending)) {
return NULL;
}
return HEAD(rplan->q);
return TAIL(rplan->pending);
}
struct kr_query *kr_rplan_last(struct kr_rplan *rplan)
{
if (EMPTY_LIST(rplan->pending)) {
return NULL;
}
return HEAD(rplan->pending);
}
namedb_txn_t *kr_rplan_txn_acquire(struct kr_rplan *rplan, unsigned flags)
{
if (rplan == NULL) {
return NULL;
}
/* Discard current transaction if RDONLY, but WR is requested. */
if ((rplan->txn_flags & NAMEDB_RDONLY) && !(flags & NAMEDB_RDONLY)) {
kr_cache_txn_abort(&rplan->txn);
rplan->txn.db = NULL;
}
/* Reuse transaction if exists. */
if (rplan->txn.db != NULL) {
return &rplan->txn;
}
/* Transaction doesn't exist, start new one. */
int ret = kr_cache_txn_begin(rplan->context->cache, &rplan->txn, flags);
if (ret != KNOT_EOK) {
rplan->txn.db = NULL;
return NULL;
}
rplan->txn_flags = flags;
return &rplan->txn;
}
int kr_rplan_txn_commit(struct kr_rplan *rplan)
{
if (rplan == NULL) {
return KNOT_EINVAL;
}
/* Just discard RDONLY transactions. */
int ret = KNOT_EOK;
if (rplan->txn_flags & NAMEDB_RDONLY) {
kr_cache_txn_abort(&rplan->txn);
} else {
/* Commit write transactions. */
ret = kr_cache_txn_commit(&rplan->txn);
}
rplan->txn.db = NULL;
return ret;
}
......@@ -17,31 +17,111 @@ limitations under the License.
#include <libknot/dname.h>
#include <libknot/internal/lists.h>
#include <libknot/internal/namedb/namedb.h>
#include <libknot/internal/sockaddr.h>
enum {
RESOLVE_QUERY = 0 << 0,
RESOLVE_DELEG = 1 << 0,
};
#include "lib/context.h"
#include "lib/zonecut.h"
/*!
* \brief Single query representation.
*/
struct kr_query {
node_t node;
struct timeval timestamp;
knot_dname_t *sname;
uint16_t stype;
uint16_t sclass;
uint16_t id;
uint16_t flags;
void *ext;
};
/*!
* \brief Query resolution plan structure.
*
* The structure most importantly holds the original query, answer and the
* list of pending queries required to resolve the original query.
* It also keeps a notion of current zone cut.
*/
struct kr_rplan {
list_t q;
mm_ctx_t *pool;
unsigned state; /*!< Query resolution state. */
struct kr_zonecut zone_cut;
unsigned txn_flags; /*!< Current transaction flags. */
namedb_txn_t txn; /*!< Current transaction (may be r/o). */
list_t pending; /*!< List of pending queries. */
list_t resolved; /*!< List of resolved queries. */
struct kr_context *context; /*!< Parent resolution context. */
mm_ctx_t *pool; /*!< Temporary memory pool. */
};
void kr_rplan_init(struct kr_rplan *rplan, mm_ctx_t *pool);
void kr_rplan_clear(struct kr_rplan *rplan);
/*!
* \brief Initialize resolution plan (empty).
* \param rplan plan instance
* \param context resolution context
* \param pool ephemeral memory pool for whole resolution
*/
void kr_rplan_init(struct kr_rplan *rplan, struct kr_context *context, mm_ctx_t *pool);
/*!
* \brief Deinitialize resolution plan, aborting any uncommited transactions.
* \param rplan plan instance
*/
void kr_rplan_deinit(struct kr_rplan *rplan);
/*!
* \brief Return true if the resolution plan is empty (i.e. finished or initialized)
* \param rplan plan instance
* \return true or false
*/
bool kr_rplan_empty(struct kr_rplan *rplan);
/*!
* \brief Acquire rplan transaction (read or write only).
* \note The transaction is shared during the whole resolution, read only transactions
* may be promoted to write-enabled transactions if requested, but never demoted.
* \param rplan plan instance
* \param flags transaction flags
* \return transaction instance or NULL
*/
namedb_txn_t *kr_rplan_txn_acquire(struct kr_rplan *rplan, unsigned flags);
struct kr_query *kr_rplan_push(struct kr_rplan *rplan, const knot_dname_t *name, uint16_t cls,
uint16_t type);
/*!
* \brief Commit any existing transaction, read-only transactions may be just aborted.
* \param rplan plan instance
* \return KNOT_E*