Commit f3737eab authored by Daniel Salzman's avatar Daniel Salzman

server: unify function arguments, separate TCP and UDP workers reconfiguration

parent 465e18fa
/* Copyright (C) 2011 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
/* Copyright (C) 2015 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
......@@ -14,24 +14,18 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <urcu.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <errno.h>
#include <assert.h>
#include <urcu.h>
#include "libknot/errcode.h"
#include "knot/common/log.h"
#include "knot/server/server.h"
#include "knot/server/udp-handler.h"
#include "knot/server/tcp-handler.h"
#include "knot/conf/conf.h"
#include "knot/worker/pool.h"
#include "knot/zone/timers.h"
#include "knot/zone/zonedb-load.h"
#include "knot/worker/pool.h"
#include "contrib/net.h"
#include "contrib/sockaddr.h"
#include "contrib/trim.h"
......@@ -272,7 +266,8 @@ static int reconfigure_sockets(conf_t *conf, server_t *s)
/* Create new interface. */
m = malloc(sizeof(iface_t));
if (server_init_iface(m, &addr, s->handler[IO_UDP].unit->size) < 0) {
unsigned size = s->handlers[IO_UDP].handler.unit->size;
if (server_init_iface(m, &addr, size) < 0) {
free(m);
m = 0;
}
......@@ -290,9 +285,9 @@ static int reconfigure_sockets(conf_t *conf, server_t *s)
/* Wait for readers that are reconfiguring right now. */
/*! \note This subsystem will be reworked in #239 */
for (unsigned proto = IO_UDP; proto <= IO_TCP; ++proto) {
dt_unit_t *tu = s->handler[proto].unit;
iohandler_t *ioh = &s->handler[proto];
for (unsigned i = IO_UDP; i <= IO_TCP; ++i) {
dt_unit_t *tu = s->handlers[i].handler.unit;
iohandler_t *ioh = &s->handlers[i].handler;
for (unsigned i = 0; i < tu->size; ++i) {
while (ioh->thread_state[i] & ServerReload) {
sleep(1);
......@@ -305,12 +300,12 @@ static int reconfigure_sockets(conf_t *conf, server_t *s)
/* Update TCP+UDP ifacelist (reload all threads). */
unsigned thread_count = 0;
for (unsigned proto = IO_UDP; proto <= IO_TCP; ++proto) {
dt_unit_t *tu = s->handler[proto].unit;
for (unsigned i = IO_UDP; i <= IO_TCP; ++i) {
dt_unit_t *tu = s->handlers[i].handler.unit;
for (unsigned i = 0; i < tu->size; ++i) {
ref_retain((ref_t *)newlist);
s->handler[proto].thread_state[i] |= ServerReload;
s->handler[proto].thread_id[i] = thread_count++;
s->handlers[i].handler.thread_state[i] |= ServerReload;
s->handlers[i].handler.thread_id[i] = thread_count++;
if (s->state & ServerRunning) {
dt_activate(tu->threads[i]);
dt_signalize(tu->threads[i], SIGALRM);
......@@ -384,7 +379,7 @@ static int server_init_handler(server_t *server, int index, int thread_count,
runnable_t runnable, runnable_t destructor)
{
/* Initialize */
iohandler_t *h = &server->handler[index];
iohandler_t *h = &server->handlers[index].handler;
memset(h, 0, sizeof(iohandler_t));
h->server = server;
h->unit = dt_create(thread_count, runnable, destructor, h);
......@@ -427,50 +422,50 @@ static void server_free_handler(iohandler_t *h)
memset(h, 0, sizeof(iohandler_t));
}
int server_start(server_t *s, bool async)
int server_start(server_t *server, bool async)
{
if (s == 0) {
if (server == NULL) {
return KNOT_EINVAL;
}
/* Start workers. */
worker_pool_start(s->workers);
worker_pool_start(server->workers);
/* Wait for enqueued events if not asynchronous. */
if (!async) {
worker_pool_wait(s->workers);
worker_pool_wait(server->workers);
}
/* Start evsched handler. */
evsched_start(&s->sched);
evsched_start(&server->sched);
/* Start I/O handlers. */
int ret = KNOT_EOK;
s->state |= ServerRunning;
if (s->tu_size > 0) {
for (unsigned i = 0; i < IO_COUNT; ++i) {
ret = dt_start(s->handler[i].unit);
server->state |= ServerRunning;
for (int i = IO_UDP; i <= IO_TCP; ++i) {
if (server->handlers[i].size > 0) {
int ret = dt_start(server->handlers[i].handler.unit);
if (ret != KNOT_EOK) {
return ret;
}
}
}
return ret;
return KNOT_EOK;
}
void server_wait(server_t *s)
void server_wait(server_t *server)
{
if (s == NULL) {
if (server == NULL) {
return;
}
evsched_join(&s->sched);
worker_pool_join(s->workers);
evsched_join(&server->sched);
worker_pool_join(server->workers);
if (s->tu_size == 0) {
return;
}
for (unsigned i = 0; i < IO_COUNT; ++i) {
server_free_handler(s->handler + i);
for (int i = IO_UDP; i <= IO_TCP; ++i) {
if (server->handlers[i].size > 0) {
server_free_handler(&server->handlers[i].handler);
}
}
}
......@@ -543,49 +538,42 @@ void server_stop(server_t *server)
server->state &= ~ServerRunning;
}
/*! \brief Reconfigure UDP and TCP query processing threads. */
static int reconfigure_threads(conf_t *conf, server_t *server)
static int reset_handler(server_t *server, int index, unsigned size, runnable_t run)
{
/* Estimate number of threads/manager. */
int ret = KNOT_EOK;
int tu_size = conf_udp_threads(conf);
if ((unsigned)tu_size != server->tu_size) {
if (server->handlers[index].size != size) {
/* Free old handlers */
if (server->tu_size > 0) {
for (unsigned i = 0; i < IO_COUNT; ++i) {
server_free_handler(server->handler + i);
}
if (server->handlers[index].size > 0) {
server_free_handler(&server->handlers[index].handler);
}
/* Initialize I/O handlers. */
ret = server_init_handler(server, IO_UDP, conf_udp_threads(conf),
&udp_master, NULL);
if (ret != KNOT_EOK) {
log_error("failed to create UDP threads (%s)",
knot_strerror(ret));
return ret;
}
/* Create at least CONFIG_XFERS threads for TCP for faster
* processing of massive bootstrap queries. */
ret = server_init_handler(server, IO_TCP, conf_tcp_threads(conf),
&tcp_master, NULL);
int ret = server_init_handler(server, index, size, run, NULL);
if (ret != KNOT_EOK) {
log_error("failed to create TCP threads (%s)",
knot_strerror(ret));
return ret;
}
/* Start if server is running. */
if (server->state & ServerRunning) {
for (unsigned i = 0; i < IO_COUNT; ++i) {
ret = dt_start(server->handler[i].unit);
ret = dt_start(server->handlers[index].handler.unit);
if (ret != KNOT_EOK) {
return ret;
}
}
server->tu_size = tu_size;
server->handlers[index].size = size;
}
return ret;
return KNOT_EOK;
}
/*! \brief Reconfigure UDP and TCP query processing threads. */
static int reconfigure_threads(conf_t *conf, server_t *server)
{
int ret = reset_handler(server, IO_UDP, conf_udp_threads(conf), udp_master);
if (ret != KNOT_EOK) {
return ret;
}
return reset_handler(server, IO_TCP, conf_tcp_threads(conf), tcp_master);
}
static int reconfigure_rate_limits(conf_t *conf, server_t *server)
......@@ -674,10 +662,8 @@ static void reopen_timers_database(conf_t *conf, server_t *server)
}
}
int server_update_zones(conf_t *conf, void *data)
int server_update_zones(conf_t *conf, server_t *server)
{
server_t *server = (server_t *)data;
/* Prevent emitting of new zone events. */
if (server->zone_db) {
knot_zonedb_foreach(server->zone_db, zone_events_freeze);
......@@ -704,33 +690,34 @@ int server_update_zones(conf_t *conf, void *data)
return ret;
}
ref_t *server_set_ifaces(server_t *s, fdset_t *fds, int type, int thread_id)
ref_t *server_set_ifaces(server_t *server, fdset_t *fds, int index, int thread_id)
{
iface_t *i = NULL;
if (server == NULL || server->ifaces == NULL || fds == NULL) {
return NULL;
}
rcu_read_lock();
fdset_clear(fds);
if (s->ifaces) {
WALK_LIST(i, s->ifaces->l) {
iface_t *i = NULL;
WALK_LIST(i, server->ifaces->l) {
#ifdef ENABLE_REUSEPORT
int udp_id = thread_id % i->fd_udp_count;
int udp_id = thread_id % i->fd_udp_count;
#else
int udp_id = 0;
int udp_id = 0;
#endif
switch(type) {
case IO_TCP:
fdset_add(fds, i->fd_tcp, POLLIN, NULL);
break;
case IO_UDP:
fdset_add(fds, i->fd_udp[udp_id], POLLIN, NULL);
break;
default:
assert(0);
}
switch(index) {
case IO_TCP:
fdset_add(fds, i->fd_tcp, POLLIN, NULL);
break;
case IO_UDP:
fdset_add(fds, i->fd_udp[udp_id], POLLIN, NULL);
break;
default:
assert(0);
}
}
rcu_read_unlock();
return (ref_t *)s->ifaces;
return &server->ifaces->ref;
}
/* Copyright (C) 2011 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
/* Copyright (C) 2015 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
......@@ -14,9 +14,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/*!
* \file server.h
*
* \author Lubos Slovak <lubos.slovak@nic.cz>
* \file
*
* \brief Core server functions.
*
......@@ -31,7 +29,7 @@
#include "sys/socket.h"
#include "libknot/libknot.h"
#include "knot/conf/conf.h"
#include "knot/common/evsched.h"
#include "knot/common/fdset.h"
#include "knot/server/dthreads.h"
......@@ -42,9 +40,7 @@
#include "contrib/ucw/lists.h"
/* Forwad declarations. */
struct iface;
struct server;
struct conf;
/*! \brief I/O handler structure.
*/
......@@ -75,11 +71,10 @@ typedef struct iface {
struct sockaddr_storage addr;
} iface_t;
/* Handler types. */
/* Handler indexes. */
enum {
IO_UDP = 0,
IO_TCP,
IO_COUNT
IO_TCP = 1
};
typedef struct ifacelist {
......@@ -103,8 +98,10 @@ typedef struct server {
knot_db_t *timers_db;
/*! \brief I/O handlers. */
unsigned tu_size;
iohandler_t handler[IO_COUNT];
struct {
unsigned size;
iohandler_t handler;
} handlers[2];
/*! \brief Background jobs. */
worker_pool_t *workers;
......@@ -190,15 +187,17 @@ int server_reconfigure(conf_t *conf, void *data);
*
* \return KNOT_EOK on success or KNOT_ error
*/
int server_update_zones(conf_t *conf, void *data);
int server_update_zones(conf_t *conf, server_t *server);
/*!
* \brief Update fdsets from current interfaces list.
* \param s Server.
* \param fds Filedescriptor set.
* \param type I/O type (UDP/TCP).
*
* \param server Server.
* \param fds File descriptor set.
* \param index I/O index (UDP/TCP).
*
* \return new interface list
*/
ref_t *server_set_ifaces(server_t *s, fdset_t *fds, int type, int thread_id);
ref_t *server_set_ifaces(server_t *server, fdset_t *fds, int index, int thread_id);
/*! @} */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment