Commit ef9af85c authored by Marek Vavrusa

All thread units are now coherent; dthreads no longer allow repurposing a thread after its unit is created.

This makes the design straightforward and simple to understand.
parent 92ac648c
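The change collapses the old two-step unit setup (dt_create_coherent(), or a bare dt_create(n) followed by per-thread dt_repurpose()) into a single dt_create() call that binds the runnable, the destructor, and the thread data at creation time. The sketch below illustrates that pattern using only the signatures visible in this diff; the header path, the my_worker/spawn_unit names, and the error handling are illustrative assumptions, not verbatim Knot code.

/* Minimal sketch of the coherent-unit pattern; header path is assumed. */
#include "knot/server/dthreads.h"

static int my_worker(dthread_t *thread)
{
	/* Thread data is bound once at creation; no later dt_repurpose(). */
	void *ctx = thread->data;
	(void)ctx;
	return 0; /* KNOT_EOK in real code */
}

static int my_worker_destruct(dthread_t *thread)
{
	/* Per-thread cleanup hook, also fixed at creation time. */
	(void)thread;
	return 0;
}

static int spawn_unit(void *shared_ctx, int thread_count)
{
	/* One coherent unit: every thread runs my_worker over shared_ctx. */
	dt_unit_t *unit = dt_create(thread_count, my_worker,
	                            my_worker_destruct, shared_ctx);
	if (unit == NULL) {
		return -1; /* KNOT_ENOMEM in real code */
	}

	int ret = dt_start(unit);

	/* ... later, once the threads have been stopped (details elided) ... */
	dt_delete(&unit);
	return ret;
}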
......@@ -307,10 +307,10 @@ static int server_bind_sockets(server_t *s)
/* Update TCP+UDP ifacelist (reload all threads). */
for (unsigned proto = IO_UDP; proto <= IO_TCP; ++proto) {
dt_unit_t *tu = s->h[proto].unit;
dt_unit_t *tu = s->handler[proto].unit;
for (unsigned i = 0; i < tu->size; ++i) {
ref_retain((ref_t *)newlist);
s->h[proto].state[i].s |= ServerReload;
s->handler[proto].thread_state[i] |= ServerReload;
if (s->state & ServerRunning) {
dt_activate(tu->threads[i]);
dt_signalize(tu->threads[i], SIGALRM);
......@@ -336,7 +336,7 @@ server_t *server_create()
// Create event scheduler
dbg_server("server: creating event scheduler\n");
server->sched = evsched_new();
server->iosched = dt_create_coherent(1, evsched_run, evsched_destruct,
server->iosched = dt_create(1, evsched_run, evsched_destruct,
server->sched);
// Create name server
......@@ -360,29 +360,29 @@ server_t *server_create()
return server;
}
int server_init_handler(iohandler_t * h, server_t *s, dt_unit_t *tu, void *d)
static int server_init_handler(server_t *server, int index, int thread_count,
runnable_t runnable, runnable_t destructor)
{
/* Initialize */
iohandler_t *h = &server->handler[index];
memset(h, 0, sizeof(iohandler_t));
h->server = s;
h->unit = tu;
h->data = d;
h->state = malloc(tu->size * sizeof(iostate_t));
/* Update unit data object */
for (int i = 0; i < tu->size; ++i) {
dthread_t *thread = tu->threads[i];
h->state[i].h = h;
h->state[i].s = 0;
if (thread->run) {
dt_repurpose(thread, thread->run, h->state + i);
}
h->server = server;
h->unit = dt_create(thread_count, runnable, destructor, h);
if (h->unit == NULL) {
return KNOT_ENOMEM;
}
h->thread_state = malloc(thread_count * sizeof(unsigned));
if (h->thread_state == NULL) {
dt_delete(&h->unit);
return KNOT_ENOMEM;
}
memset(h->thread_state, 0, thread_count * sizeof(unsigned));
return KNOT_EOK;
}
int server_free_handler(iohandler_t *h)
static int server_free_handler(iohandler_t *h)
{
if (!h || !h->server) {
return KNOT_EINVAL;
......@@ -395,12 +395,8 @@ int server_free_handler(iohandler_t *h)
}
/* Destroy worker context. */
if (h->dtor) {
h->dtor(h->data);
h->data = NULL;
}
dt_delete(&h->unit);
free(h->state);
free(h->thread_state);
memset(h, 0, sizeof(iohandler_t));
return KNOT_EOK;
}
......@@ -425,7 +421,7 @@ int server_start(server_t *s)
s->state |= ServerRunning;
if (s->tu_size > 0) {
for (unsigned i = 0; i < IO_COUNT; ++i) {
ret = dt_start(s->h[i].unit);
ret = dt_start(s->handler[i].unit);
}
}
......@@ -447,7 +443,7 @@ int server_wait(server_t *s)
int ret = KNOT_EOK;
for (unsigned i = 0; i < IO_COUNT; ++i) {
if ((ret = server_free_handler(s->h + i)) != KNOT_EOK) {
if ((ret = server_free_handler(s->handler + i)) != KNOT_EOK) {
break;
}
}
......@@ -585,25 +581,34 @@ int server_conf_hook(const struct conf_t *conf, void *data)
/* Free old handlers */
if (server->tu_size > 0) {
for (unsigned i = 0; i < IO_COUNT; ++i) {
ret = server_free_handler(server->h + i);
ret = server_free_handler(server->handler + i);
}
}
/* Initialize I/O handlers. */
dt_unit_t *tu = dt_create_coherent(tu_size, &udp_master,
&udp_master_destruct, NULL);
server_init_handler(server->h + IO_UDP, server, tu, NULL);
ret = server_init_handler(server, IO_UDP, tu_size,
&udp_master, &udp_master_destruct);
if (ret != KNOT_EOK) {
log_server_error("Failed to create UDP threads: %s\n",
knot_strerror(ret));
return ret;
}
/* Create at least CONFIG_XFERS threads for TCP for faster
* processing of massive bootstrap queries. */
tu = dt_create_coherent(MAX(tu_size * 2, CONFIG_XFERS),
&tcp_master, &tcp_master_destruct, NULL);
server_init_handler(server->h + IO_TCP, server, tu, NULL);
ret = server_init_handler(server, IO_TCP, MAX(tu_size * 2, CONFIG_XFERS),
&tcp_master, &tcp_master_destruct);
if (ret != KNOT_EOK) {
log_server_error("Failed to create TCP threads: %s\n",
knot_strerror(ret));
return ret;
}
/* Start if server is running. */
if (server->state & ServerRunning) {
for (unsigned i = 0; i < IO_COUNT; ++i) {
ret = dt_start(server->h[i].unit);
ret = dt_start(server->handler[i].unit);
}
}
server->tu_size = tu_size;
......
......@@ -50,20 +50,13 @@ struct iface_t;
struct server_t;
struct conf_t;
typedef struct iostate {
volatile unsigned s;
struct iohandler* h;
} iostate_t;
/*! \brief I/O handler structure.
*/
typedef struct iohandler {
struct node n;
dt_unit_t *unit; /*!< Threading unit */
struct server_t *server; /*!< Reference to server */
void *data; /*!< Persistent data for I/O handler. */
iostate_t *state;
void (*dtor)(void *data); /*!< Data destructor. */
dt_unit_t *unit; /*!< Threading unit */
unsigned *thread_state; /*!< Thread state */
} iohandler_t;
/*! \brief Round-robin mechanism of switching.
......@@ -119,7 +112,7 @@ typedef struct server_t {
/*! \brief I/O handlers. */
unsigned tu_size;
xfrhandler_t *xfr;
iohandler_t h[IO_COUNT];
iohandler_t handler[IO_COUNT];
/*! \brief Event scheduler. */
dt_unit_t *iosched;
......@@ -143,29 +136,6 @@ typedef struct server_t {
*/
server_t *server_create();
/*!
* \brief Create I/O handler.
*
* \param h Initialized handler.
* \param s Server structure to be used for operation.
* \param u Threading unit to serve given filedescriptor.
* \param d Handler data.
*
* \retval Handler instance if successful.
* \retval NULL if an error occurred.
*/
int server_init_handler(iohandler_t * h, server_t *s, dt_unit_t *u, void *d);
/*!
* \brief Delete handler.
*
* \param ref I/O handler instance.
*
* \retval KNOT_EOK on success.
* \retval KNOT_EINVAL on invalid parameters.
*/
int server_free_handler(iohandler_t *h);
/*!
* \brief Starts the server.
*
......
......@@ -366,16 +366,17 @@ int tcp_master(dthread_t *thread)
return KNOT_EINVAL;
}
iohandler_t *handler = (iohandler_t *)thread->data;
unsigned *iostate = &handler->thread_state[dt_get_id(thread)];
int ret = KNOT_EOK;
iostate_t *st = (iostate_t *)thread->data;
server_t *server = st->h->server;
ref_t *ref = NULL;
tcp_context_t tcp;
memset(&tcp, 0, sizeof(tcp_context_t));
/* Create TCP answering context. */
memset(&tcp.query_ctx, 0, sizeof(tcp.query_ctx));
tcp.query_ctx.ns = server->nameserver;
tcp.query_ctx.ns = handler->server->nameserver;
/* Create big enough memory cushion. */
mm_ctx_mempool(&tcp.query_ctx.mm, 4 * sizeof(knot_pkt_t));
......@@ -404,8 +405,8 @@ int tcp_master(dthread_t *thread)
for(;;) {
/* Check handler state. */
if (knot_unlikely(st->s & ServerReload)) {
st->s &= ~ServerReload;
if (knot_unlikely(*iostate & ServerReload)) {
*iostate &= ~ServerReload;
/* Cancel client connections. */
for (unsigned i = tcp.client_threshold; i < tcp.set.n; ++i) {
......@@ -413,7 +414,7 @@ int tcp_master(dthread_t *thread)
}
ref_release(ref);
ref = server_set_ifaces(server, &tcp.set, IO_TCP);
ref = server_set_ifaces(handler->server, &tcp.set, IO_TCP);
if (tcp.set.n == 0) {
break; /* Terminate on zero interfaces. */
}
......
......@@ -455,20 +455,35 @@ void __attribute__ ((constructor)) udp_master_init()
#endif /* HAVE_RECVMMSG */
}
int udp_reader(iohandler_t *h, dthread_t *thread)
int udp_master(dthread_t *thread)
{
unsigned cpu = dt_online_cpus();
if (cpu > 1) {
unsigned cpu_mask[2];
cpu_mask[0] = dt_get_id(thread) % cpu;
cpu_mask[1] = (cpu_mask[0] + 2) % cpu;
dt_setaffinity(thread, cpu_mask, 2);
}
iostate_t *st = (iostate_t *)thread->data;
/* Drop all capabilities on all workers. */
#ifdef HAVE_CAP_NG_H
if (capng_have_capability(CAPNG_EFFECTIVE, CAP_SETPCAP)) {
capng_clear(CAPNG_SELECT_BOTH);
capng_apply(CAPNG_SELECT_BOTH);
}
#endif /* HAVE_CAP_NG_H */
/* Prepare structures for bound sockets. */
unsigned thr_id = dt_get_id(thread);
iohandler_t *handler = (iohandler_t *)thread->data;
unsigned *iostate = &handler->thread_state[thr_id];
void *rq = _udp_init();
ifacelist_t *ref = NULL;
/* Create UDP answering context. */
ns_proc_context_t query_ctx;
memset(&query_ctx, 0, sizeof(query_ctx));
query_ctx.ns = h->server->nameserver;
query_ctx.ns = handler->server->nameserver;
/* Create big enough memory cushion. */
mm_ctx_mempool(&query_ctx.mm, 4 * sizeof(knot_pkt_t));
......@@ -490,15 +505,15 @@ int udp_reader(iohandler_t *h, dthread_t *thread)
for (;;) {
/* Check handler state. */
if (knot_unlikely(st->s & ServerReload)) {
st->s &= ~ServerReload;
if (knot_unlikely(*iostate & ServerReload)) {
*iostate &= ~ServerReload;
maxfd = 0;
minfd = INT_MAX;
FD_ZERO(&fds);
rcu_read_lock();
ref_release((ref_t *)ref);
ref = h->server->ifaces;
ref = handler->server->ifaces;
if (ref) {
iface_t *i = NULL;
WALK_LIST(i, ref->l) {
......@@ -544,30 +559,6 @@ int udp_reader(iohandler_t *h, dthread_t *thread)
return KNOT_EOK;
}
int udp_master(dthread_t *thread)
{
unsigned cpu = dt_online_cpus();
if (cpu > 1) {
unsigned cpu_mask[2];
cpu_mask[0] = dt_get_id(thread) % cpu;
cpu_mask[1] = (cpu_mask[0] + 2) % cpu;
dt_setaffinity(thread, cpu_mask, 2);
}
/* Drop all capabilities on all workers. */
#ifdef HAVE_CAP_NG_H
if (capng_have_capability(CAPNG_EFFECTIVE, CAP_SETPCAP)) {
capng_clear(CAPNG_SELECT_BOTH);
capng_apply(CAPNG_SELECT_BOTH);
}
#endif /* HAVE_CAP_NG_H */
iostate_t *st = (iostate_t *)thread->data;
if (!st) return KNOT_EINVAL;
iohandler_t *h = st->h;
return udp_reader(h, thread);
}
int udp_master_destruct(dthread_t *thread)
{
knot_crypto_cleanup_thread();
......
......@@ -511,11 +511,11 @@ static int xfr_async_finish(fdset_t *set, unsigned id)
}
/*! \brief Finalize XFR/IN transfer. */
static int xfr_task_finalize(xfrworker_t *w, knot_ns_xfr_t *rq)
static int xfr_task_finalize(xfrhandler_t *xfr, knot_ns_xfr_t *rq)
{
int ret = KNOT_EINVAL;
rcu_read_lock();
knot_nameserver_t *ns = w->master->ns;
knot_nameserver_t *ns = xfr->ns;
if (rq->type == XFR_TYPE_AIN) {
ret = zones_save_zone(rq);
......@@ -558,9 +558,9 @@ static int xfr_task_finalize(xfrworker_t *w, knot_ns_xfr_t *rq)
}
/*! \brief Query response event handler function. */
static int xfr_task_resp(xfrworker_t *w, knot_ns_xfr_t *rq)
static int xfr_task_resp(xfrhandler_t *xfr, knot_ns_xfr_t *rq)
{
knot_nameserver_t *ns = w->master->ns;
knot_nameserver_t *ns = xfr->ns;
knot_pkt_t *re = knot_pkt_new(rq->wire, rq->wire_size, NULL);
if (re == NULL) {
return KNOT_ENOMEM;
......@@ -641,7 +641,7 @@ static int xfr_task_resp(xfrworker_t *w, knot_ns_xfr_t *rq)
/*! \brief This will fall back to AXFR on active connection.
* \note The active connection is expected to be force shut.
*/
static int xfr_start_axfr(xfrworker_t *w, knot_ns_xfr_t *rq, const char *reason)
static int xfr_start_axfr(xfrhandler_t *xfr, knot_ns_xfr_t *rq, const char *reason)
{
log_zone_notice("%s %s\n", rq->msg, reason);
......@@ -657,7 +657,7 @@ static int xfr_start_axfr(xfrworker_t *w, knot_ns_xfr_t *rq, const char *reason)
/* Enqueue new request and close the original. */
log_server_notice("%s Retrying with AXFR.\n", rq->msg);
xfr_enqueue(w->master, axfr);
xfr_enqueue(xfr, axfr);
return KNOT_ECONNREFUSED;
}
......@@ -685,11 +685,11 @@ static int xfr_fallback_axfr(knot_ns_xfr_t *rq)
return ret;
}
static int xfr_task_xfer(xfrworker_t *w, knot_ns_xfr_t *rq)
static int xfr_task_xfer(xfrhandler_t *xfr, knot_ns_xfr_t *rq)
{
/* Process incoming packet. */
int ret = KNOT_EOK;
knot_nameserver_t *ns = w->master->ns;
knot_nameserver_t *ns = xfr->ns;
switch(rq->type) {
case XFR_TYPE_AIN:
ret = knot_ns_process_axfrin(ns, rq);
......@@ -719,7 +719,7 @@ static int xfr_task_xfer(xfrworker_t *w, knot_ns_xfr_t *rq)
if (rq->type == XFR_TYPE_IIN) {
switch(ret) {
case KNOT_ESPACE: /* Fallthrough */
return xfr_start_axfr(w, rq, diff_nospace_msg);
return xfr_start_axfr(xfr, rq, diff_nospace_msg);
case KNOT_EXFRREFUSED:
return xfr_fallback_axfr(rq);
default:
......@@ -736,7 +736,7 @@ static int xfr_task_xfer(xfrworker_t *w, knot_ns_xfr_t *rq)
/* Only for successful xfers. */
if (ret > 0) {
ret = xfr_task_finalize(w, rq);
ret = xfr_task_finalize(xfr, rq);
/* EBUSY on incremental transfer has a special meaning and
* is caused by a journal not able to free up space for incoming
......@@ -750,9 +750,9 @@ static int xfr_task_xfer(xfrworker_t *w, knot_ns_xfr_t *rq)
* zone transfer in this case. */
if (ret == KNOT_EBUSY && rq->type == XFR_TYPE_IIN) {
return xfr_start_axfr(w, rq, diff_nospace_msg);
return xfr_start_axfr(xfr, rq, diff_nospace_msg);
} else if (ret == KNOT_EINVAL && rq->type == XFR_TYPE_IIN) {
return xfr_start_axfr(w, rq, diff_invalid_msg);
return xfr_start_axfr(xfr, rq, diff_invalid_msg);
} else {
/* Passed, schedule NOTIFYs. */
......@@ -776,7 +776,7 @@ static int xfr_task_xfer(xfrworker_t *w, knot_ns_xfr_t *rq)
}
/*! \brief Incoming packet handling function. */
static int xfr_process_event(xfrworker_t *w, knot_ns_xfr_t *rq)
static int xfr_process_event(xfrhandler_t *xfr, knot_ns_xfr_t *rq)
{
/* Check if zone is valid. */
if (knot_zone_flags(rq->zone) & KNOT_ZONE_DISCARDED) {
......@@ -800,9 +800,9 @@ static int xfr_process_event(xfrworker_t *w, knot_ns_xfr_t *rq)
case XFR_TYPE_NOTIFY:
case XFR_TYPE_SOA:
case XFR_TYPE_FORWARD:
return xfr_task_resp(w, rq);
return xfr_task_resp(xfr, rq);
default:
return xfr_task_xfer(w, rq);
return xfr_task_xfer(xfr, rq);
}
}
......@@ -841,8 +841,7 @@ static enum fdset_sweep_state xfr_sweep(fdset_t *set, int i, void *data)
int xfr_worker(dthread_t *thread)
{
assert(thread != NULL && thread->data != NULL);
xfrworker_t *w = (xfrworker_t *)thread->data;
xfrhandler_t *xfr = w->master;
xfrhandler_t *xfr = (xfrhandler_t *)thread->data;
/* Buffer for answering. */
size_t buflen = SOCKET_MTU_SZ;
......@@ -858,7 +857,7 @@ int xfr_worker(dthread_t *thread)
next_sweep.tv_sec += XFR_SWEEP_INTERVAL;
/* Approximate thread capacity limits. */
unsigned threads = w->master->unit->size;
unsigned threads = xfr->unit->size;
unsigned thread_capacity = XFR_MAX_TASKS / threads;
/* Set of connections. */
......@@ -940,7 +939,7 @@ int xfr_worker(dthread_t *thread)
if (rq->flags & XFR_FLAG_CONNECTING)
ret = xfr_async_finish(&set, i);
else
ret = xfr_process_event(w, rq);
ret = xfr_process_event(xfr, rq);
} else {
/* Inactive connection. */
++i;
......@@ -1001,38 +1000,25 @@ int xfr_worker(dthread_t *thread)
xfrhandler_t *xfr_create(size_t thrcount, knot_nameserver_t *ns)
{
/* Create XFR handler data. */
const size_t total_size = sizeof(xfrhandler_t) + thrcount * sizeof(xfrworker_t);
xfrhandler_t *xfr = malloc(total_size);
xfrhandler_t *xfr = malloc(sizeof(xfrhandler_t));
if (xfr == NULL) {
return NULL;
}
memset(xfr, 0, total_size);
memset(xfr, 0, sizeof(xfrhandler_t));
xfr->ns = ns;
/* Create threading unit. */
xfr->unit = dt_create(thrcount);
xfr->unit = dt_create(thrcount, xfr_worker, NULL, xfr);
if (xfr->unit == NULL) {
free(xfr);
return NULL;
}
/* Create worker threads. */
for (unsigned i = 0; i < thrcount; ++i) {
xfrworker_t *w = xfr->workers + i;
w->master = xfr;
}
/* Create tasks structure and mutex. */
pthread_mutex_init(&xfr->mx, 0);
pthread_mutex_init(&xfr->pending_mx, 0);
init_list(&xfr->queue);
/* Assign worker threads. */
dthread_t **threads = xfr->unit->threads;
for (unsigned i = 0; i < thrcount; ++i) {
dt_repurpose(threads[i], xfr_worker, xfr->workers + i);
}
return xfr;
}
......
......@@ -41,14 +41,6 @@ enum xfrstate_t {
XFR_PENDING,
};
/*!
* \brief XFR worker structure.
*/
typedef struct xfrworker_t
{
struct xfrhandler_t *master; /*! \brief Worker master. */
} xfrworker_t;
/*!
* \brief XFR handler structure.
*/
......@@ -60,7 +52,6 @@ typedef struct xfrhandler_t
pthread_mutex_t mx; /*!< \brief Tasks synchronisation. */
knot_nameserver_t *ns;
dt_unit_t *unit; /*!< \brief Threading unit. */
xfrworker_t workers[]; /*!< \brief Workers. */
} xfrhandler_t;
/*!
......
......@@ -803,7 +803,7 @@ static knot_zonedb_t *load_zonedb(knot_nameserver_t *ns, const conf_t *conf)
/* Initialize threads. */
size_t thread_count = MIN(conf->zones_count, dt_optimal_size());
dt_unit_t *unit = NULL;
unit = dt_create_coherent(thread_count, &zone_loader_thread,
unit = dt_create(thread_count, &zone_loader_thread,
&zone_loader_destruct, &ctx);
if (unit != NULL) {
/* Start loading. */
......