Commit 264c08a0 authored by Marek Vavrusa's avatar Marek Vavrusa

modules/stats: stats.upstreams()

the new function returns a list of upstream
authoritative servers that resolver contacted
recently and the RTT information for them,
this is useful for sampling information about
the quality of outbound connections for speculative
keepalive and other purposes
parent 5fb8050c
......@@ -39,6 +39,15 @@ in new ones.
-- Fetch most common queries (sorted by frequency)
> table.sort(stats.frequent(), function (a, b) return a.count > b.count end)
-- Show recently contacted authoritative servers
> stats.upstreams()
[2a01:618:404::1] => {
[1] => 26 -- RTT
}
[128.241.220.33] => {
[1] => 31 - RTT
}
Properties
^^^^^^^^^^
......@@ -62,6 +71,12 @@ Set nominal value of given metric.
Outputs collected metrics as a JSON dictionary.
.. function:: stats.upstreams()
Outputs a list of recent upstreams and their RTT. It is sorted by time and stored in a ring buffer of
a fixed size. This means it's not aggregated and readable by multiple consumers, but also that
you may lose entries if you don't read quickly enough. The default ring size is 512 entries, and may be overriden on compile time by ``-DUPSTREAMS_COUNT=X``.
.. function:: stats.frequent()
Outputs list of most frequent iterative queries as a JSON array. The queries are sampled probabilistically,
......
......@@ -26,6 +26,7 @@
#include <libknot/descriptor.h>
#include <ccan/json/json.h>
#include <contrib/cleanup.h>
#include <arpa/inet.h>
#include "lib/layer/iterate.h"
#include "lib/rplan.h"
......@@ -39,8 +40,15 @@
/* Defaults */
#define DEBUG_MSG(qry, fmt...) QRDEBUG(qry, "stat", fmt)
#define FREQUENT_COUNT 5000 /* Size of frequent tables */
#define FREQUENT_PSAMPLE 10 /* Sampling rate, 1 in N */
#define FREQUENT_PSAMPLE 10 /* Sampling rate, 1 in N */
#ifdef LRU_REP_SIZE
#define FREQUENT_COUNT LRU_REP_SIZE /* Size of frequent tables */
#else
#define FREQUENT_COUNT 5000 /* Size of frequent tables */
#endif
#ifndef UPSTREAMS_COUNT
#define UPSTREAMS_COUNT 512 /* Size of recent upstreams */
#endif
/** @cond internal Fixed-size map of predefined metrics. */
#define CONST_METRICS(X) \
......@@ -68,6 +76,7 @@ static struct const_metric_elm const_metrics[] = {
/** @internal LRU hash of most frequent names. */
typedef lru_hash(unsigned) namehash_t;
typedef array_t(struct sockaddr_in6) addrlist_t;
/** @internal Stats data structure. */
struct stat_data {
......@@ -76,8 +85,15 @@ struct stat_data {
namehash_t *frequent;
namehash_t *expiring;
} queries;
struct {
addrlist_t q;
size_t head;
} upstreams;
};
/** @internal We don't store/publish port, repurpose it for RTT instead. */
#define sin6_rtt sin6_port
/** @internal Add to const map counter */
static inline void stat_const_add(struct stat_data *data, enum const_metric key, ssize_t incr)
{
......@@ -137,6 +153,35 @@ static void collect_sample(struct stat_data *data, struct kr_rplan *rplan, knot_
}
}
static int collect_rtt(knot_layer_t *ctx, knot_pkt_t *pkt)
{
struct kr_request *req = ctx->data;
struct kr_query *qry = req->current_query;
if (qry->flags & QUERY_CACHED || !req->upstream.addr) {
return ctx->state;
}
/* Push address and RTT to the ring buffer head */
struct kr_module *module = ctx->api->data;
struct stat_data *data = module->data;
/* Socket address is encoded into sockaddr_in6 struct that
* unions with sockaddr_in and differ in sa_family */
struct sockaddr_in6 *e = &data->upstreams.q.at[data->upstreams.head];
const struct sockaddr *src = req->upstream.addr;
switch (src->sa_family) {
case AF_INET: memcpy(e, src, sizeof(struct sockaddr_in)); break;
case AF_INET6: memcpy(e, src, sizeof(struct sockaddr_in6)); break;
default: return ctx->state;
}
/* Replace port number with the RTT information (cap is UINT16_MAX milliseconds) */
e->sin6_rtt = req->upstream.rtt;
/* Advance ring buffer head */
data->upstreams.head = (data->upstreams.head + 1) % UPSTREAMS_COUNT;
return ctx->state;
}
static int collect(knot_layer_t *ctx)
{
struct kr_request *param = ctx->data;
......@@ -341,6 +386,43 @@ static char* clear_expiring(void *env, struct kr_module *module, const char *arg
return NULL;
}
static char* dump_upstreams(void *env, struct kr_module *module, const char *args)
{
struct stat_data *data = module->data;
if (!data) {
return NULL;
}
/* Walk the ring backwards until AF_UNSPEC or we hit head. */
JsonNode *root = json_mkobject();
size_t head = data->upstreams.head;
for (size_t i = 1; i < UPSTREAMS_COUNT; ++i) {
size_t h = (UPSTREAMS_COUNT + head - i) % UPSTREAMS_COUNT;
struct sockaddr_in6 *e = &data->upstreams.q.at[h];
if (e->sin6_family == AF_UNSPEC) {
break;
}
/* Convert address to string */
char addr_str[INET6_ADDRSTRLEN];
const char *ret = inet_ntop(e->sin6_family, kr_inaddr((const struct sockaddr *)e), addr_str, sizeof(addr_str));
if (!ret) {
break;
}
/* Append to map with an array encoding RTTs */
JsonNode *json_val = json_find_member(root, addr_str);
if (!json_val) {
json_val = json_mkarray();
json_append_member(root, addr_str, json_val);
}
json_append_element(json_val, json_mknumber(e->sin6_rtt));
}
/* Encode and return */
char *ret = json_encode(root);
json_delete(root);
return ret;
}
/*
* Module implementation.
*/
......@@ -349,6 +431,7 @@ KR_EXPORT
const knot_layer_api_t *stats_layer(struct kr_module *module)
{
static knot_layer_api_t _layer = {
.consume = &collect_rtt,
.finish = &collect,
};
/* Store module reference */
......@@ -363,6 +446,7 @@ int stats_init(struct kr_module *module)
if (!data) {
return kr_error(ENOMEM);
}
memset(data, 0, sizeof(*data));
data->map = map_make();
module->data = data;
data->queries.frequent = malloc(lru_size(namehash_t, FREQUENT_COUNT));
......@@ -373,6 +457,15 @@ int stats_init(struct kr_module *module)
if (data->queries.expiring) {
lru_init(data->queries.expiring, FREQUENT_COUNT);
}
/* Initialize ring buffer of recently visited upstreams */
array_init(data->upstreams.q);
if (array_reserve(data->upstreams.q, UPSTREAMS_COUNT) != 0) {
return kr_error(ENOMEM);
}
for (size_t i = 0; i < UPSTREAMS_COUNT; ++i) {
struct sockaddr *sa = (struct sockaddr *)&data->upstreams.q.at[i];
sa->sa_family = AF_UNSPEC;
}
return kr_ok();
}
......@@ -386,6 +479,7 @@ int stats_deinit(struct kr_module *module)
lru_deinit(data->queries.expiring);
free(data->queries.frequent);
free(data->queries.expiring);
array_clear(data->upstreams.q);
free(data);
}
return kr_ok();
......@@ -402,6 +496,7 @@ struct kr_prop *stats_props(void)
{ &clear_frequent,"clear_frequent", "Clear frequent queries log.", },
{ &dump_expiring, "expiring", "List expiring records.", },
{ &clear_expiring,"clear_expiring", "Clear expiring records log.", },
{ &dump_upstreams, "upstreams", "List recently seen authoritatives.", },
{ NULL, NULL, NULL }
};
return prop_list;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment