Commit 2dcd8511 authored by Marek Vavrusa's avatar Marek Vavrusa

Implemented SOA/NOTIFY timeout using fdset watchdogs.

This prevents potential racing conditions and doesn't require locking,
as it's per worker.

refs #1608
parent bfeeeede
......@@ -325,6 +325,6 @@ int evsched_cancel(evsched_t *s, event_t *ev)
/* Enable running events. */
pthread_mutex_unlock(&s->rl);
return 0;
return found;
}
......@@ -212,7 +212,8 @@ event_t* evsched_schedule_term(evsched_t *s, uint32_t dt);
* \param s Event scheduler.
* \param ev Scheduled event.
*
* \retval 0 on success.
* \retval 0 if already ran.
* \retval 1 if found and cancelled.
* \retval <0 on error.
*/
int evsched_cancel(evsched_t *s, event_t *ev);
......
......@@ -173,7 +173,7 @@ int fdset_set_watchdog(fdset_t* fdset, int fd, int interval)
return 0;
}
int fdset_sweep(fdset_t* fdset, void(*cb)(fdset_t*, int))
int fdset_sweep(fdset_t* fdset, void(*cb)(fdset_t*, int, void*), void *data)
{
fdset_base_t *base = (fdset_base_t*)fdset;
if (base == NULL || base->atimes == NULL) {
......@@ -195,7 +195,7 @@ int fdset_sweep(fdset_t* fdset, void(*cb)(fdset_t*, int))
/* Evaluate */
timev_t *ts = (timev_t*)n->value;
if (ts->tv_sec <= now.tv_sec) {
cb(fdset, (int)(((ssize_t)n->key)));
cb(fdset, (int)(((ssize_t)n->key)), data);
++sweeped;
}
n = pnext;
......
......@@ -224,11 +224,12 @@ int fdset_set_watchdog(fdset_t* fdset, int fd, int interval);
*
* \param fdset Target set.
* \param cb Callback for sweeped descriptors.
* \param data Custom data for sweep operation.
*
* \retval number of sweeped descriptors.
* \retval -1 on errors.
*/
int fdset_sweep(fdset_t* fdset, void(*cb)(fdset_t* set, int fd));
int fdset_sweep(fdset_t* fdset, void(*cb)(fdset_t*, int, void*), void *data);
#endif /* _KNOTD_FDSET_H_ */
......
......@@ -128,6 +128,17 @@ typedef unsigned int uint; /*!< \brief Unsigned. */
} while (0)
#endif
/* Workarounds for clock_gettime() not available on some platforms. */
#ifdef HAVE_CLOCK_GETTIME
#define time_now(x) clock_gettime(CLOCK_MONOTONIC, (x))
typedef struct timespec timev_t;
#elif HAVE_GETTIMEOFDAY
#define time_now(x) gettimeofday((x), NULL)
typedef struct timeval timev_t;
#else
#error Neither clock_gettime() nor gettimeofday() found. At least one is required.
#endif
#endif /* _KNOTD_COMMON_H_ */
/*! @} */
......@@ -40,18 +40,6 @@
#include "libknot/util/error.h"
#include "libknot/util/wire.h"
/* Workarounds for clock_gettime() not available on some platforms. */
#ifdef HAVE_CLOCK_GETTIME
#define time_now(x) clock_gettime(CLOCK_MONOTONIC, (x))
typedef struct timespec timev_t;
#elif HAVE_GETTIMEOFDAY
#define time_now(x) gettimeofday((x), NULL)
typedef struct timeval timev_t;
#else
#error Neither clock_gettime() nor gettimeofday() found. At least one is required.
#endif
/* Defines */
#define TCP_BUFFER_SIZE 65535 /*! Do not change, as it is used for maximum DNS/TCP packet size. */
......@@ -108,7 +96,10 @@ static int tcp_reply(int fd, uint8_t *qbuf, size_t resp_len)
}
/*! \brief Sweep TCP connection. */
static void tcp_sweep(fdset_t *set, int fd) {
static void tcp_sweep(fdset_t *set, int fd, void* data)
{
UNUSED(data);
char r_addr[SOCKADDR_STRLEN] = { '\0' };
int r_port = 0;
struct sockaddr_storage addr;
......@@ -591,7 +582,7 @@ int tcp_loop_worker(dthread_t *thread)
timev_t now;
if (time_now(&now) == 0) {
if (now.tv_sec >= next_sweep.tv_sec) {
fdset_sweep(w->fdset, &tcp_sweep);
fdset_sweep(w->fdset, &tcp_sweep, NULL);
memcpy(&next_sweep, &now, sizeof(next_sweep));
next_sweep.tv_sec += TCP_SWEEP_INTERVAL;
}
......
......@@ -42,6 +42,8 @@
#include "common/prng.h"
/* Constants */
#define XFR_QUERY_WD 10 /*!< SOA/NOTIFY query timeout [s]. */
#define XFR_SWEEP_INTERVAL 2 /*! [seconds] between sweeps. */
#define XFR_BUFFER_SIZE 65535 /*! Do not change this - maximum value for UDP packet length. */
void xfr_interrupt(xfrhandler_t *h)
......@@ -59,24 +61,72 @@ static void xfr_request_deinit(knot_ns_xfr_t *r)
}
}
/*!
* \brief SOA query timeout handler.
*/
static int xfr_udp_timeout(event_t *e)
/*! \todo Document me (issue #1586) */
static void xfr_free_task(knot_ns_xfr_t *task)
{
knot_ns_xfr_t *data = (knot_ns_xfr_t *)e->data;
if (!data) {
return KNOTD_EINVAL;
if (!task) {
return;
}
xfrworker_t *w = (xfrworker_t *)task->owner;
if (!w) {
free(task);
return;
}
/* Remove reference to this event. */
if (data->zone != NULL) {
zonedata_t *zd = (zonedata_t *)knot_zone_data(data->zone);
if (zd != NULL && zd->soa_pending == e) {
zd->soa_pending = NULL;
/* Remove from fdset. */
if (w->fdset) {
dbg_xfr("xfr_free_task: freeing fd=%d.\n", task->session);
fdset_remove(w->fdset, task->session);
}
/* Unlock if XFR/IN.*/
if (task->type == XFR_TYPE_AIN || task->type == XFR_TYPE_IIN) {
knot_zone_t *zone = task->zone;
zonedata_t *zd = (zonedata_t *)knot_zone_data(zone);
if (zd) {
zd->xfr_in.wrkr = 0;
pthread_mutex_unlock(&zd->xfr_in.lock);
}
}
/* Remove fd-related data. */
xfrhandler_t *h = w->master;
pthread_mutex_lock(&h->tasks_mx);
skip_remove(h->tasks, (void*)((size_t)task->session), 0, 0);
pthread_mutex_unlock(&h->tasks_mx);
/* Deinitialize */
xfr_request_deinit(task);
close(task->session);
free(task);
}
static knot_ns_xfr_t *xfr_handler_task(xfrworker_t *w, int fd)
{
xfrhandler_t *h = w->master;
pthread_mutex_lock(&h->tasks_mx);
knot_ns_xfr_t *data = skip_find(h->tasks, (void*)((size_t)fd));
pthread_mutex_unlock(&h->tasks_mx);
if (data == NULL) {
dbg_xfr_verb("xfr: worker=%p processing event on "
"fd=%d got empty data.\n",
w, fd);
fdset_remove(w->fdset, fd);
close(fd); /* Always dup()'d or created. */
return NULL;
}
return data;
}
/*!
* \brief SOA query timeout handler.
*/
static int xfr_udp_timeout(knot_ns_xfr_t *data)
{
/* Close socket. */
knot_zone_t *z = data->zone;
if (z && knot_zone_get_contents(z) && knot_zone_data(z)) {
......@@ -84,21 +134,13 @@ static int xfr_udp_timeout(event_t *e)
data->msgpref);
}
knot_ns_xfr_t cr = {};
cr.type = XFR_TYPE_CLOSE;
cr.session = data->session;
cr.data = data;
cr.zone = data->zone;
xfrworker_t *w = (xfrworker_t *)data->owner;
if (w) {
evqueue_write(w->q, &cr, sizeof(knot_ns_xfr_t));
}
/* Invalidate pending query. */
xfr_free_task(data);
return KNOTD_EOK;
}
/*!
* \brief Query reponse event handler function.
* \brief Query response event handler function.
*
* Handle single query response event.
*
......@@ -106,7 +148,7 @@ static int xfr_udp_timeout(event_t *e)
* \param w Associated socket watcher.
* \param revents Returned events.
*/
static int xfr_process_udp_query(xfrworker_t *w, int fd, knot_ns_xfr_t *data)
static int xfr_process_udp_resp(xfrworker_t *w, int fd, knot_ns_xfr_t *data)
{
/* Receive msg. */
ssize_t n = recvfrom(data->session, data->wire, data->wire_size, 0, data->addr.ptr, &data->addr.len);
......@@ -114,77 +156,35 @@ static int xfr_process_udp_query(xfrworker_t *w, int fd, knot_ns_xfr_t *data)
if (n > 0) {
udp_handle(fd, data->wire, n, &resp_len, &data->addr, w->ns);
}
if(data->type == XFR_TYPE_SOA && data->zone) {
zonedata_t * zd = (zonedata_t*)knot_zone_data(data->zone);
zd->soa_pending = NULL;
}
/* Disable timeout. */
evsched_t *sched =
((server_t *)knot_ns_get_data(w->ns))->sched;
event_t *ev = (event_t *)data->data;
if (ev) {
dbg_xfr("xfr: cancelling UDP query timeout\n");
evsched_cancel(sched, ev);
ev = (event_t *)data->data;
if (ev) {
evsched_event_free(sched, ev);
data->data = 0;
}
/* Close after receiving response. */
knot_ns_xfr_t cr = {};
cr.type = XFR_TYPE_CLOSE;
cr.session = data->session;
cr.data = data;
cr.zone = data->zone;
evqueue_write(w->q, &cr, sizeof(knot_ns_xfr_t));
}
xfr_free_task(data);
return KNOTD_EOK;
}
/*! \todo Document me (issue #1586) */
static void xfr_free_task(knot_ns_xfr_t *task)
/*! \brief Sweep non-replied connection. */
static void xfr_sweep(fdset_t *set, int fd, void *data)
{
if (!task) {
return;
}
dbg_xfr("xfr: sweeping fd=%d\n", fd);
xfrworker_t *w = (xfrworker_t *)task->owner;
if (!w) {
free(task);
if (!set || !data) {
dbg_xfr("xfr: invalid sweep operation on NULL worker or set\n");
return;
}
/* Remove from fdset. */
if (w->fdset) {
dbg_xfr("xfr_free_task: freeing fd=%d.\n", task->session);
fdset_remove(w->fdset, task->session);
knot_ns_xfr_t *t = xfr_handler_task((xfrworker_t *)data, fd);
if (!t) {
dbg_xfr("xfr: NULL data to sweep\n");
}
/* Unlock if XFR/IN.*/
if (task->type == XFR_TYPE_AIN || task->type == XFR_TYPE_IIN) {
knot_zone_t *zone = task->zone;
zonedata_t *zd = (zonedata_t *)knot_zone_data(zone);
if (zd) {
zd->xfr_in.wrkr = 0;
pthread_mutex_unlock(&zd->xfr_in.lock);
}
/* Skip non-sweepable types. */
switch(t->type) {
case XFR_TYPE_SOA:
case XFR_TYPE_NOTIFY:
xfr_udp_timeout(t);
break;
default:
dbg_xfr("xfr: sweep request on unsupported type\n");
break;
}
/* Remove fd-related data. */
xfrhandler_t *h = w->master;
pthread_mutex_lock(&h->tasks_mx);
skip_remove(h->tasks, (void*)((size_t)task->session), 0, 0);
pthread_mutex_unlock(&h->tasks_mx);
/* Deinitialize */
xfr_request_deinit(task);
close(task->session);
free(task);
}
/*! \todo Document me (issue #1586) */
......@@ -593,7 +593,7 @@ int xfr_process_event(xfrworker_t *w, int fd, knot_ns_xfr_t *data, uint8_t *buf,
/* Handle SOA/NOTIFY responses. */
if (data->type == XFR_TYPE_NOTIFY || data->type == XFR_TYPE_SOA) {
return xfr_process_udp_query(w, fd, data);
return xfr_process_udp_resp(w, fd, data);
}
/* Read DNS/TCP packet. */
......@@ -1353,12 +1353,7 @@ static int xfr_process_request(xfrworker_t *w, uint8_t *buf, size_t buflen)
conf_read_lock();
/* Handle request. */
zonedata_t *zd = NULL;
if(xfr.zone != NULL) {
zd = (zonedata_t *)knot_zone_data(xfr.zone);
}
knot_ns_xfr_t *task = NULL;
evsched_t *sch = NULL;
dbg_xfr_verb("xfr: processing request type '%d'\n", xfr.type);
dbg_xfr_verb("xfr: query ptr: %p\n", xfr.query);
switch(xfr.type) {
......@@ -1383,12 +1378,7 @@ static int xfr_process_request(xfrworker_t *w, uint8_t *buf, size_t buflen)
}
/* Add timeout. */
sch = ((server_t *)knot_ns_get_data(w->ns))->sched;
task->data = evsched_schedule_cb(sch, xfr_udp_timeout,
task, SOA_QRY_TIMEOUT);
if (zd && xfr.type == XFR_TYPE_SOA) {
zd->soa_pending = (event_t*)task->data;
}
fdset_set_watchdog(w->fdset, task->session, XFR_QUERY_WD);
log_server_info("%s Query issued.\n", xfr.msgpref);
ret = KNOTD_EOK;
break;
......@@ -1430,6 +1420,10 @@ int xfr_worker(dthread_t *thread)
return KNOTD_ENOMEM;
}
/* Next sweep time. */
timev_t next_sweep;
time_now(&next_sweep);
next_sweep.tv_sec += XFR_SWEEP_INTERVAL;
/* Accept requests. */
int ret = 0;
......@@ -1442,8 +1436,8 @@ int xfr_worker(dthread_t *thread)
}
/* Poll fdset. */
int nfds = fdset_wait(w->fdset, OS_EV_FOREVER);
if (nfds <= 0) {
int nfds = fdset_wait(w->fdset, (XFR_SWEEP_INTERVAL * 1000)/2);
if (nfds < 0) {
continue;
}
......@@ -1453,12 +1447,11 @@ int xfr_worker(dthread_t *thread)
}
/* Iterate fdset. */
xfrhandler_t *h = w->master;
knot_ns_xfr_t *data = 0;
int rfd = evqueue_pollfd(w->q);
fdset_it_t it;
fdset_begin(w->fdset, &it);
while(1) {
while(nfds > 0) {
/* Check if it request. */
if (it.fd == rfd) {
......@@ -1470,16 +1463,8 @@ int xfr_worker(dthread_t *thread)
}
} else {
/* Find data. */
pthread_mutex_lock(&h->tasks_mx);
data = skip_find(h->tasks, (void*)((size_t)it.fd));
pthread_mutex_unlock(&h->tasks_mx);
data = xfr_handler_task(w, it.fd);
if (data == NULL) {
dbg_xfr_verb("xfr: worker=%p processing event on "
"fd=%d got empty data.\n",
w, it.fd);
fdset_remove(w->fdset, it.fd);
close(it.fd); /* Always dup()'d or created. */
/* Next fd. */
if (fdset_next(w->fdset, &it) < 0) {
break;
......@@ -1501,9 +1486,18 @@ int xfr_worker(dthread_t *thread)
break;
}
}
/* Sweep inactive. */
timev_t now;
if (time_now(&now) == 0) {
if (now.tv_sec >= next_sweep.tv_sec) {
fdset_sweep(w->fdset, &xfr_sweep, w);
memcpy(&next_sweep, &now, sizeof(next_sweep));
next_sweep.tv_sec += XFR_SWEEP_INTERVAL;
}
}
}
/* Stop whole unit. */
free(buf);
dbg_xfr_verb("xfr: worker=%p finished.\n", w);
......
......@@ -425,9 +425,11 @@ static int zones_refresh_ev(event_t *e)
pthread_mutex_unlock(&zd->xfr_in.lock);
}
/* Invalidate pending SOA query. */
event_t *soa_pending = zd->soa_pending;
zd->soa_pending = NULL;
/*! \todo Invalidate pending SOA query. */
// if(zd->soa_pending >= 0) {
// dbg_xfr("xfr: closing previous SOA qry fd=%d\n", zd->soa_pending);
// close(zd->soa_pending);
// }
/*! \todo [TSIG] CHANGE!!! only for compatibility now. */
knot_ns_xfr_t xfr_req;
......@@ -470,6 +472,7 @@ static int zones_refresh_ev(event_t *e)
/* Check result. */
if (ret == KNOTD_EOK) {
zd->xfr_in.next_id = knot_wire_get_id(qbuf);
zd->soa_pending = sock;
dbg_zones("zones: expecting SOA response "
"ID=%d for '%s'\n",
zd->xfr_in.next_id, zd->conf->name);
......@@ -512,13 +515,7 @@ static int zones_refresh_ev(event_t *e)
/* Unlock RCU. */
rcu_read_unlock();
/* Close invalidated SOA query. */
evsched_event_finished(e->parent);
if (soa_pending != NULL) {
/* Execute */
evsched_schedule(e->parent, soa_pending, 0);
}
return ret;
}
......
......@@ -42,7 +42,6 @@
#include "libknot/updates/xfr-in.h"
/* Constants. */
#define SOA_QRY_TIMEOUT 10000 /*!< SOA query timeout (ms). */
#define IXFR_DBSYNC_TIMEOUT (60*1000) /*!< Database sync timeout = 60s. */
#define AXFR_BOOTSTRAP_RETRY (60*1000) /*!< Interval between AXFR BS retries. */
......@@ -82,8 +81,8 @@ typedef struct zonedata_t
/*! \brief List of pending NOTIFY events. */
list notify_pending;
/*! \brief List of pending SOA queries. */
struct event_t* soa_pending;
/*! \brief List of fds with pending SOA queries. */
int soa_pending;
/*! \brief Zone IXFR history. */
journal_t *ixfr_db;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment