Commit 5c374b15 authored by Marek Vavruša's avatar Marek Vavruša

dnstap: module implementation + thread tracking

parent 88586ee4
......@@ -14,6 +14,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <sys/stat.h>
#include "knot/modules/dnstap.h"
#include "knot/nameserver/query_module.h"
#include "knot/nameserver/process_query.h"
......@@ -26,80 +28,186 @@
/* Defines. */
#define MODULE_ERR(msg...) log_zone_error("Module 'dnstap': " msg)
static int log_message(int state, const knot_pkt_t *pkt, struct query_data *qdata, void *ctx, const Dnstap__Message__Type msgtype)
static int log_message(int state, const knot_pkt_t *pkt, struct query_data *qdata, void *ctx)
{
if (pkt == NULL || qdata == NULL || ctx == NULL) {
return NS_PROC_FAIL;
}
char *qname = knot_dname_to_str(knot_pkt_qname(pkt));
MODULE_ERR("answer_log: %s (%u ancount)\n", qname, knot_wire_get_ancount(pkt->wire));
free(qname);
int ret = KNOT_ERROR;
struct fstrm_iothr* iothread = ctx;
struct fstrm_iothr_queue *ioq = fstrm_iothr_get_input_queue_idx(iothread, 0);
struct fstrm_iothr_queue *ioq = fstrm_iothr_get_input_queue_idx(iothread, qdata->param->thread_id);
/* Unless we want to measure the time it takes to process each query,
* we can treat Q/R times the same. */
struct timeval tv;
gettimeofday(&tv, NULL);
Dnstap__Message msg;
int ret = dt_message_fill(&msg, msgtype,
(const struct sockaddr *)qdata->param->query_source, IPPROTO_UDP,
pkt->wire, pkt->size, NULL, &tv);
assert(ret == KNOT_EOK);
/* deal with it later (ret) */
/* Determine query / response. */
Dnstap__Message__Type msgtype = DNSTAP__MESSAGE__TYPE__AUTH_QUERY;
if (knot_wire_get_qr(pkt->wire)) {
msgtype = DNSTAP__MESSAGE__TYPE__AUTH_RESPONSE;
}
/* Determine whether we run on UDP/TCP. */
int protocol = IPPROTO_TCP;
if (qdata->param->proc_flags & NS_QUERY_LIMIT_SIZE) {
protocol = IPPROTO_UDP;
}
/* Create a dnstap message. */
Dnstap__Message msg;
ret = dt_message_fill(&msg, msgtype,
qdata->param->query_source, protocol,
pkt->wire, pkt->size, &tv, &tv);
if (ret != KNOT_EOK) {
return NS_PROC_FAIL;
}
Dnstap__Dnstap dnstap = DNSTAP__DNSTAP__INIT;
dnstap.type = DNSTAP__DNSTAP__TYPE__MESSAGE;
dnstap.message = (Dnstap__Message *)&msg;
/* Pack the message. */
uint8_t *frame = NULL;
size_t size = 0;
dt_pack(&dnstap, &frame, &size);
assert(size > 0);
assert(frame);
/* return value */
if (frame == NULL) {
return NS_PROC_FAIL;
}
/* Submit a request. */
fstrm_res res = fstrm_iothr_submit(iothread, ioq, frame, size, fstrm_free_wrapper, NULL);
if (res != fstrm_res_success) {
free(frame);
assert(0);
state = NS_PROC_FAIL;
}
/* return value */
return state;
}
static int dnstap_answer_log(int state, knot_pkt_t *pkt, struct query_data *qdata, void *ctx)
/*! \brief Submit message. */
static int dnstap_message_log(int state, knot_pkt_t *pkt, struct query_data *qdata, void *ctx)
{
if (pkt == NULL || qdata == NULL || ctx == NULL) {
return NS_PROC_FAIL;
}
log_message(state, qdata->query, qdata, ctx, DNSTAP__MESSAGE__TYPE__AUTH_QUERY);
log_message(state, pkt, qdata, ctx, DNSTAP__MESSAGE__TYPE__AUTH_RESPONSE);
return state;
return log_message(state, pkt, qdata, ctx);
}
/*! \brief Create a UNIX socket sink. */
static struct fstrm_writer* dnstap_unix_writer(const char *path)
{
struct fstrm_unix_writer_options *opt = NULL;
struct fstrm_writer_options *wopt = NULL;
struct fstrm_writer *writer = NULL;
opt = fstrm_unix_writer_options_init();
if (opt == NULL) {
goto finish;
}
fstrm_unix_writer_options_set_socket_path(opt, path);
wopt = fstrm_writer_options_init();
if (wopt == NULL) {
goto finish;
}
fstrm_writer_options_add_content_type(wopt,
(const uint8_t *) DNSTAP_CONTENT_TYPE,
strlen(DNSTAP_CONTENT_TYPE));
writer = fstrm_unix_writer_init(opt, wopt);
finish:
fstrm_unix_writer_options_destroy(&opt);
fstrm_writer_options_destroy(&wopt);
return writer;
}
/*! \brief Create a basic file writer sink. */
static struct fstrm_writer* dnstap_file_writer(const char *path)
{
struct fstrm_file_options *fopt = NULL;
struct fstrm_writer_options *wopt = NULL;
struct fstrm_writer *writer = NULL;
fopt = fstrm_file_options_init();
if (fopt == NULL) {
goto finish;
}
fstrm_file_options_set_file_path(fopt, path);
wopt = fstrm_writer_options_init();
if (wopt == NULL) {
goto finish;
}
fstrm_writer_options_add_content_type(wopt,
(const uint8_t *) DNSTAP_CONTENT_TYPE,
strlen(DNSTAP_CONTENT_TYPE));
writer = fstrm_file_writer_init(fopt, wopt);
finish:
fstrm_file_options_destroy(&fopt);
fstrm_writer_options_destroy(&wopt);
return writer;
}
/*! \brief Create a log sink according to the path string. */
static struct fstrm_writer* dnstap_writer(const char *path)
{
const char *prefix = "unix:";
const size_t prefix_len = strlen(prefix);
/* UNIX socket prefix. */
if (strlen(path) > prefix_len && strncmp(path, prefix, prefix_len) == 0) {
return dnstap_unix_writer(path + prefix_len);
}
return dnstap_file_writer(path);
}
int dnstap_load(struct query_plan *plan, struct query_module *self)
{
/* Initialize the writer and the options. */
int ret = KNOT_ENOMEM;
struct fstrm_writer *writer = dnstap_writer(self->param);
if (writer == NULL) {
goto fail;
}
struct fstrm_iothr_options *opt = fstrm_iothr_options_init();
if (opt == NULL) {
fstrm_writer_destroy(&writer);
goto fail;
}
/* Initialize queues. */
size_t qcount = conf_udp_threads(conf()) + conf_tcp_threads(conf());
fstrm_iothr_options_set_num_input_queues(opt, qcount);
/* Create the I/O thread. */
struct fstrm_iothr* iothread = fstrm_iothr_init(opt, &writer);
fstrm_iothr_options_destroy(&opt);
/* Save in query module, it takes ownership from now on. */
dt_writer_t *writer = dt_writer_create(self->param, "something");
assert(writer);
struct fstrm_iothr_options* opt = fstrm_iothr_options_init();
assert(opt);
fstrm_iothr_options_set_queue_model(opt, FSTRM_IOTHR_QUEUE_MODEL_MPSC);
struct fstrm_iothr* iothread = fstrm_iothr_init(opt, &writer->fw);
if (iothread == NULL) {
fstrm_writer_destroy(&writer);
goto fail;
}
self->ctx = iothread;
assert(iothread);
query_plan_step(plan, QPLAN_END, dnstap_answer_log, self->ctx);
/* Hook to the query plan. */
query_plan_step(plan, QPLAN_BEGIN, dnstap_message_log, self->ctx);
query_plan_step(plan, QPLAN_END, dnstap_message_log, self->ctx);
return KNOT_EOK;
fail:
MODULE_ERR("init(\"%s\") failed - %s\n", self->param, knot_strerror(ret));
return ret;
}
int dnstap_unload(struct query_module *self)
{
struct fstrm_iothr* iothread = self->ctx;
fstrm_iothr_destroy(&iothread);
self->ctx = NULL;
return KNOT_EOK;
}
......@@ -69,6 +69,7 @@ struct process_query_param {
int query_socket;
struct sockaddr_storage *query_source;
server_t *server;
unsigned thread_id;
};
/*! \brief Query processing intermediate data. */
......
......@@ -242,11 +242,13 @@ static int reconfigure_sockets(const struct conf_t *conf, server_t *s)
s->ifaces = newlist;
/* Update TCP+UDP ifacelist (reload all threads). */
unsigned thread_count = 0;
for (unsigned proto = IO_UDP; proto <= IO_TCP; ++proto) {
dt_unit_t *tu = s->handler[proto].unit;
for (unsigned i = 0; i < tu->size; ++i) {
ref_retain((ref_t *)newlist);
s->handler[proto].thread_state[i] |= ServerReload;
s->handler[proto].thread_id[i] = thread_count++;
if (s->state & ServerRunning) {
dt_activate(tu->threads[i]);
dt_signalize(tu->threads[i], SIGALRM);
......@@ -342,6 +344,13 @@ static int server_init_handler(server_t *server, int index, int thread_count,
return KNOT_ENOMEM;
}
h->thread_id = calloc(thread_count, sizeof(unsigned));
if (h->thread_id == NULL) {
free(h->thread_id);
dt_delete(&h->unit);
return KNOT_ENOMEM;
}
return KNOT_EOK;
}
......@@ -360,6 +369,7 @@ static void server_free_handler(iohandler_t *h)
/* Destroy worker context. */
dt_delete(&h->unit);
free(h->thread_state);
free(h->thread_id);
memset(h, 0, sizeof(iohandler_t));
}
......
......@@ -55,7 +55,8 @@ typedef struct iohandler {
struct node n;
struct server_t *server; /*!< Reference to server */
dt_unit_t *unit; /*!< Threading unit */
unsigned *thread_state; /*< Thread state */
unsigned *thread_state; /*!< Thread state */
unsigned *thread_id; /*!< Thread identifier. */
} iohandler_t;
/*! \brief Round-robin mechanism of switching.
......
......@@ -51,6 +51,7 @@ typedef struct tcp_context {
unsigned client_threshold; /*!< Index of first TCP client. */
timev_t last_poll_time; /*!< Time of the last socket poll. */
fdset_t set; /*!< Set of server/client sockets. */
unsigned thread_id; /*!< Thread identifier. */
} tcp_context_t;
/*
......@@ -104,6 +105,7 @@ static int tcp_handle(tcp_context_t *tcp, int fd,
param.query_socket = fd;
param.query_source = &ss;
param.server = tcp->server;
param.thread_id = tcp->thread_id;
rx->iov_len = KNOT_WIRE_MAX_PKTSIZE;
tx->iov_len = KNOT_WIRE_MAX_PKTSIZE;
......@@ -371,6 +373,7 @@ int tcp_master(dthread_t *thread)
/* Create TCP answering context. */
tcp.server = handler->server;
tcp.thread_id = handler->thread_id[dt_get_id(thread)];
/* Create big enough memory cushion. */
mm_ctx_mempool(&tcp.query_ctx.mm, 4 * sizeof(knot_pkt_t));
......
......@@ -61,6 +61,7 @@ enum {
typedef struct udp_context {
knot_process_t query_ctx; /*!< Query processing context. */
server_t *server; /*!< Name server structure. */
unsigned thread_id; /*!< Thread identifier. */
} udp_context_t;
/* FD_COPY macro compat. */
......@@ -478,6 +479,7 @@ int udp_master(dthread_t *thread)
udp_context_t udp;
memset(&udp, 0, sizeof(udp_context_t));
udp.server = handler->server;
udp.thread_id = handler->thread_id[thr_id];
/* Create big enough memory cushion. */
mm_ctx_mempool(&udp.query_ctx.mm, 4 * sizeof(knot_pkt_t));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment