Commit 3a921252 authored by Marek Vavrusa's avatar Marek Vavrusa

Updated to the new fdset API and implemented precise pending xfer counter.

refs #71, #65
parent b856c6b1
......@@ -17,6 +17,7 @@
#include <config.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "common/fdset.h"
#include "common.h"
......@@ -50,12 +51,20 @@ static int fdset_resize(fdset_t *set, unsigned size)
int fdset_init(fdset_t *set, unsigned size)
{
if (set == NULL) {
return KNOT_EINVAL;
}
memset(set, 0, sizeof(fdset_t));
return fdset_resize(set, size);
}
int fdset_clear(fdset_t* set)
{
if (set == NULL) {
return KNOT_EINVAL;
}
free(set->ctx);
free(set->pfd);
free(set->tmout);
......@@ -79,8 +88,10 @@ int fdset_add(fdset_t *set, int fd, unsigned events, void *ctx)
set->pfd[i].events = events;
set->pfd[i].revents = 0;
set->ctx[i] = ctx;
set->tmout[i] = 0;
return KNOT_EOK;
/* Return index to this descriptor. */
return i;
}
int fdset_remove(fdset_t *set, unsigned i)
......@@ -94,8 +105,8 @@ int fdset_remove(fdset_t *set, unsigned i)
/* Nothing else if it is the last one.
* Move last -> i if some remain. */
if (set->n > 0) {
unsigned last = set->n; /* Already decremented */
unsigned last = set->n; /* Already decremented */
if (i < last) {
set->pfd[i] = set->pfd[last];
set->tmout[i] = set->tmout[last];
set->ctx[i] = set->ctx[last];
......
......@@ -31,6 +31,7 @@
#include <stddef.h>
#include <poll.h>
#include <sys/time.h>
#include <signal.h>
#define FDSET_INIT_SIZE 256 /* Resize step. */
......
......@@ -327,7 +327,7 @@ int main(int argc, char **argv)
pthread_sigmask(SIG_BLOCK, &sa.sa_mask, NULL);
/* Bind to control interface. */
uint8_t buf[65535]; /*! \todo #2035 should be on heap */
uint8_t buf[SOCKET_MTU_SZ];
size_t buflen = sizeof(buf);
int remote = -1;
if (conf()->ctl.iface != NULL) {
......
......@@ -629,18 +629,15 @@ int server_conf_hook(const struct conf_t *conf, void *data)
return ret;
}
ref_t *server_set_ifaces(server_t *s, fdset_t **fds, int *count, int type)
ref_t *server_set_ifaces(server_t *s, fdset_t *fds, int type)
{
iface_t *i = NULL;
*count = 0;
rcu_read_lock();
fdset_destroy(*fds);
*fds = fdset_new();
fdset_clear(fds);
if (s->ifaces) {
WALK_LIST(i, s->ifaces->l) {
fdset_add(*fds, i->fd[type], OS_EV_READ);
*count += 1;
fdset_add(fds, i->fd[type], POLLIN, NULL);
}
}
......
......@@ -236,11 +236,10 @@ int server_conf_hook(const struct conf_t *conf, void *data);
* \brief Update fdsets from current interfaces list.
* \param s Server.
* \param fds Filedescriptor set.
* \param count Number of ifaces (will be set to N).
* \param type I/O type (UDP/TCP).
* \return new interface list
*/
ref_t *server_set_ifaces(server_t *s, fdset_t **fds, int *count, int type);
ref_t *server_set_ifaces(server_t *s, fdset_t *fds, int type);
#endif // _KNOTD_SERVER_H_
......
......@@ -39,9 +39,7 @@
#include "common/sockaddr.h"
/*! \brief Socket-related constants. */
typedef enum {
SOCKET_MTU_SZ = 65535, /*!< Maximum MTU size. */
} socket_const_t;
#define SOCKET_MTU_SZ 65535 /*!< Maximum MTU size. */
/*!
* \brief Create socket.
......
......@@ -39,13 +39,10 @@
#include "libknot/nameserver/name-server.h"
#include "libknot/util/wire.h"
/* Defines */
#define TCP_BUFFER_SIZE 65535 /*! Do not change, as it is used for maximum DNS/TCP packet size. */
/*! \brief TCP worker data. */
typedef struct tcp_worker_t {
iohandler_t *ioh; /*!< Shortcut to I/O handler. */
fdset_t *fdset; /*!< File descriptor set. */
fdset_t set; /*!< File descriptor set. */
int pipe[2]; /*!< Master-worker signalization pipes. */
} tcp_worker_t;
......@@ -83,17 +80,19 @@ static int tcp_reply(int fd, uint8_t *qbuf, size_t resp_len)
}
/*! \brief Sweep TCP connection. */
static void tcp_sweep(fdset_t *set, int fd, void* data)
static enum fdset_sweep_state tcp_sweep(fdset_t *set, int i, void *data)
{
UNUSED(data);
assert(set && i < set->n && i >= 0);
int fd = set->pfd[i].fd;
char r_addr[SOCKADDR_STRLEN] = { '\0' };
int r_port = 0;
struct sockaddr_storage addr;
socklen_t len = sizeof(addr);
if (getpeername(fd, (struct sockaddr*)&addr, &len) < 0) {
dbg_net("tcp: sweep getpeername() on invalid socket=%d\n", fd);
return;
return FDSET_SWEEP;
}
/* Translate */
......@@ -111,8 +110,8 @@ static void tcp_sweep(fdset_t *set, int fd, void* data)
log_server_notice("Connection with '%s@%d' was terminated due to "
"inactivity.\n", r_addr, r_port);
fdset_remove(set, fd);
close(fd);
return FDSET_SWEEP;
}
/*!
......@@ -327,30 +326,28 @@ int tcp_accept(int fd)
tcp_worker_t* tcp_worker_create()
{
tcp_worker_t *w = malloc(sizeof(tcp_worker_t));
if (w == NULL) {
dbg_net("tcp: out of memory when creating worker\n");
return NULL;
}
if (w == NULL)
goto cleanup;
/* Create signal pipes. */
memset(w, 0, sizeof(tcp_worker_t));
if (pipe(w->pipe) < 0) {
free(w);
return NULL;
}
if (pipe(w->pipe) < 0)
goto cleanup;
/* Create fdset. */
w->fdset = fdset_new();
if (!w->fdset) {
if (fdset_init(&w->set, FDSET_INIT_SIZE) != KNOT_EOK) {
close(w->pipe[0]);
close(w->pipe[1]);
free(w);
return NULL;
goto cleanup;
}
fdset_add(w->fdset, w->pipe[0], OS_EV_READ);
fdset_add(&w->set, w->pipe[0], POLLIN, NULL);
return w;
/* Cleanup */
cleanup:
free(w);
return NULL;
}
void tcp_worker_free(tcp_worker_t* w)
......@@ -359,8 +356,8 @@ void tcp_worker_free(tcp_worker_t* w)
return;
}
/* Destroy fdset. */
fdset_destroy(w->fdset);
/* Clear fdset. */
fdset_clear(&w->set);
/* Close pipe write end and worker. */
close(w->pipe[0]);
......@@ -493,23 +490,22 @@ int tcp_loop_master(dthread_t *thread)
tcp_worker_t **workers = h->data;
/* Prepare structures for bound sockets. */
fdset_it_t it;
fdset_t *fds = NULL;
ref_t *ref = NULL;
int if_cnt = 0;
fdset_t set;
fdset_init(&set, conf()->ifaces_count);
/* Accept connections. */
int id = 0;
dbg_net("tcp: created 1 master with %d workers, backend is '%s' \n",
unit->size - 1, fdset_method());
int id = 0, ret = 0;
dbg_net("tcp: created 1 master with %d workers\n", unit->size - 1);
for(;;) {
/* Check handler state. */
if (knot_unlikely(st->s & ServerReload)) {
st->s &= ~ServerReload;
ref_release(ref);
ref = server_set_ifaces(h->server, &fds, &if_cnt, IO_TCP);
if (if_cnt == 0) break;
ref = server_set_ifaces(h->server, &set, IO_TCP);
if (set.n == 0) /* Terminate on zero interfaces. */
break;
}
/* Check for cancellation. */
......@@ -518,35 +514,34 @@ int tcp_loop_master(dthread_t *thread)
}
/* Wait for events. */
int nfds = fdset_wait(fds, OS_EV_FOREVER);
int nfds = poll(set.pfd, set.n, -1);
if (nfds <= 0) {
if (errno == EINTR) continue;
if (errno == EINTR)
continue;
break;
}
fdset_begin(fds, &it);
while(nfds > 0) {
/* Accept client. */
int client = tcp_accept(it.fd);
if (client > -1) {
/* Add to worker in RR fashion. */
if (write(workers[id]->pipe[1], &client, sizeof(int)) < 0) {
dbg_net("tcp: failed to register fd=%d to worker=%d\n",
client, id);
close(client);
continue;
}
id = get_next_rr(id, unit->size - 1);
}
for (unsigned i = 0; nfds > 0 && i < set.n; ++i) {
/* Skip inactive. */
if (!(set.pfd[i].revents & POLLIN))
continue;
if (fdset_next(fds, &it) != 0) {
break;
}
/* Accept client. */
--nfds; /* One less active event. */
int client = tcp_accept(set.pfd[i].fd);
if (client < 0)
continue;
/* Add to worker in RR fashion. */
id = get_next_rr(id, unit->size - 1);
ret = write(workers[id]->pipe[1], &client, sizeof(int));
if (ret < 0)
close(client);
}
}
dbg_net("tcp: master thread finished\n");
fdset_destroy(fds);
fdset_clear(&set);
ref_release(ref);
return KNOT_EOK;
......@@ -554,18 +549,6 @@ int tcp_loop_master(dthread_t *thread)
int tcp_loop_worker(dthread_t *thread)
{
tcp_worker_t *w = thread->data;
if (!w) {
return KNOT_EINVAL;
}
/* Allocate buffer for requests. */
uint8_t *qbuf = malloc(TCP_BUFFER_SIZE);
if (qbuf == NULL) {
dbg_net("tcp: failed to allocate buffers for TCP worker\n");
return KNOT_EINVAL;
}
/* Drop all capabilities on workers. */
#ifdef HAVE_CAP_NG_H
if (capng_have_capability(CAPNG_EFFECTIVE, CAP_SETPCAP)) {
......@@ -574,25 +557,29 @@ int tcp_loop_worker(dthread_t *thread)
}
#endif /* HAVE_CAP_NG_H */
/* Next sweep time. */
uint8_t *qbuf = malloc(SOCKET_MTU_SZ);
tcp_worker_t *w = thread->data;
if (w == NULL || qbuf == NULL) {
free(qbuf);
return KNOT_EINVAL;
}
/* Accept clients. */
dbg_net("tcp: worker %p started\n", w);
fdset_t *set = &w->set;
timev_t next_sweep;
time_now(&next_sweep);
next_sweep.tv_sec += TCP_SWEEP_INTERVAL;
/* Accept clients. */
dbg_net_verb("tcp: worker %p started\n", w);
for (;;) {
/* Cancellation point. */
if (dt_is_cancelled(thread)) {
if (dt_is_cancelled(thread))
break;
}
/* Wait for events. */
int nfds = fdset_wait(w->fdset, (TCP_SWEEP_INTERVAL * 1000)/2);
if (nfds < 0) {
int nfds = poll(set->pfd, set->n, TCP_SWEEP_INTERVAL * 1000);
if (nfds < 0)
continue;
}
/* Establish timeouts. */
rcu_read_lock();
......@@ -601,57 +588,49 @@ int tcp_loop_worker(dthread_t *thread)
rcu_read_unlock();
/* Process incoming events. */
dbg_net_verb("tcp: worker %p registered %d events\n",
w, nfds);
fdset_it_t it;
fdset_begin(w->fdset, &it);
while(nfds > 0) {
/* Handle incoming clients. */
if (it.fd == w->pipe[0]) {
int client = 0;
if (read(it.fd, &client, sizeof(int)) < 0) {
continue;
}
unsigned i = 0;
while (nfds > 0 && i < set->n) {
if (!(set->pfd[i].revents & set->pfd[i].events)) {
/* Skip inactive. */
++i;
continue;
} else {
/* One less active event. */
--nfds;
}
dbg_net_verb("tcp: worker %p registered "
"client %d\n",
w, client);
fdset_add(w->fdset, client, OS_EV_READ);
fdset_set_watchdog(w->fdset, client, max_hs);
dbg_net("tcp: watchdog for fd=%d set to %ds\n",
client, max_hs);
int fd = set->pfd[i].fd;
if (fd == w->pipe[0]) {
/* Register incoming TCP connection. */
int client, next_id;
if (read(fd, &client, sizeof(int)) == sizeof(int)) {
next_id = fdset_add(set, client, POLLIN, NULL);
fdset_set_tmout(set, next_id, max_hs);
}
} else {
/* Handle other events. */
int ret = tcp_handle(w, it.fd, qbuf,
TCP_BUFFER_SIZE);
/* Process query over TCP. */
int ret = tcp_handle(w, fd, qbuf, SOCKET_MTU_SZ);
if (ret == KNOT_EOK) {
fdset_set_watchdog(w->fdset, it.fd,
max_idle);
dbg_net("tcp: watchdog for fd=%d "
"set to %ds\n",
it.fd, max_idle);
/* Update socket activity timer. */
fdset_set_tmout(set, i, max_idle);
}
/*! \todo Refactor to allow erase on iterator.*/
if (ret == KNOT_ECONNREFUSED) {
fdset_remove(w->fdset, it.fd);
close(it.fd);
break;
fdset_remove(set, i);
close(fd);
continue; /* Stay on the same index. */
}
}
/* Check if next exists. */
if (fdset_next(w->fdset, &it) != 0) {
break;
}
/* Next active. */
++i;
}
/* Sweep inactive. */
timev_t now;
if (time_now(&now) == 0) {
if (now.tv_sec >= next_sweep.tv_sec) {
fdset_sweep(w->fdset, &tcp_sweep, NULL);
fdset_sweep(set, &tcp_sweep, NULL);
memcpy(&next_sweep, &now, sizeof(next_sweep));
next_sweep.tv_sec += TCP_SWEEP_INTERVAL;
}
......@@ -660,7 +639,7 @@ int tcp_loop_worker(dthread_t *thread)
/* Stop whole unit. */
free(qbuf);
dbg_net_verb("tcp: worker %p finished\n", w);
dbg_net("tcp: worker %p finished\n", w);
return KNOT_EOK;
}
......
This diff is collapsed.
......@@ -48,11 +48,6 @@ enum xfrstate_t {
*/
typedef struct xfrworker_t
{
struct {
ahtable_t *t;
fdset_t *fds;
} pool;
unsigned pending;
struct xfrhandler_t *master; /*! \brief Worker master. */
} xfrworker_t;
......@@ -62,6 +57,8 @@ typedef struct xfrworker_t
typedef struct xfrhandler_t
{
list queue;
unsigned pending; /*!< \brief Pending transfers. */
pthread_mutex_t pending_mx;
pthread_mutex_t mx; /*!< \brief Tasks synchronisation. */
knot_nameserver_t *ns;
dt_unit_t *unit; /*!< \brief Threading unit. */
......
......@@ -2495,7 +2495,7 @@ int zones_process_response(knot_nameserver_t *nameserver,
/* No updates available. */
if (ret == 0) {
zones_schedule_refresh(zone, 0);
zones_schedule_refresh(zone, REFRESH_DEFAULT);
rcu_read_unlock();
return KNOT_EUPTODATE;
}
......@@ -2698,8 +2698,7 @@ int zones_ns_conf_hook(const struct conf_t *conf, void *data)
/* REFRESH zones. */
for (unsigned i = 0; i < knot_zonedb_zone_count(ns->zone_db); ++i) {
/* Refresh new slave zones (almost) immediately. */
zones_schedule_refresh(zones[i], tls_rand() * 500 + i/2);
zones_schedule_refresh(zones[i], 0); /* Now. */
zones_schedule_notify(zones[i]);
}
......@@ -3177,7 +3176,7 @@ int zones_schedule_notify(knot_zone_t *zone)
return KNOT_EOK;
}
int zones_schedule_refresh(knot_zone_t *zone, unsigned time)
int zones_schedule_refresh(knot_zone_t *zone, int time)
{
if (!zone || !zone->data) {
return KNOT_EINVAL;
......@@ -3206,18 +3205,17 @@ int zones_schedule_refresh(knot_zone_t *zone, unsigned time)
if (zd->xfr_in.has_master) {
/* Schedule REFRESH timer. */
uint32_t refresh_tmr = time;
if (refresh_tmr == 0) {
if (knot_zone_contents(zone)) {
refresh_tmr = zones_jitter(zones_soa_refresh(zone));
} else {
refresh_tmr = zd->xfr_in.bootstrap_retry;
}
if (time < 0) {
if (knot_zone_contents(zone))
time = zones_jitter(zones_soa_refresh(zone));
else
time = zd->xfr_in.bootstrap_retry;
}
zd->xfr_in.timer = evsched_schedule_cb(sch, zones_refresh_ev,
zone, refresh_tmr);
zone, time);
dbg_zones("zone: REFRESH '%s' set to %u\n",
zd->conf->name, refresh_tmr);
zd->conf->name, time);
zd->xfr_in.state = XFR_SCHED;
}
rcu_read_unlock();
......
......@@ -46,6 +46,10 @@
#define IXFR_DBSYNC_TIMEOUT (60*1000) /*!< Database sync timeout = 60s. */
#define AXFR_BOOTSTRAP_RETRY (30*1000) /*!< Interval between AXFR BS retries. */
enum {
REFRESH_DEFAULT = -1 /* Use time value from zone structure. */
};
/*!
* \brief Zone-related data.
*/
......@@ -302,13 +306,13 @@ int zones_store_and_apply_chgsets(knot_changesets_t *chs,
* REFRESH/RETRY/EXPIRE timers are updated according to SOA.
*
* \param zone Related zone.
* \param time Specific time or 0 for default.
* \param time Specific time or REFRESH_DEFAULT for default.
*
* \retval KNOT_EOK
* \retval KNOT_EINVAL
* \retval KNOT_ERROR
*/
int zones_schedule_refresh(knot_zone_t *zone, unsigned time);
int zones_schedule_refresh(knot_zone_t *zone, int time);
/*!
* \brief Schedule NOTIFY after zone update.
......
......@@ -174,8 +174,8 @@ typedef enum knot_ns_transport {
typedef enum knot_ns_xfr_type_t {
/* DNS events. */
XFR_TYPE_AIN = 0, /*!< AXFR-IN request (start transfer). */
XFR_TYPE_AOUT, /*!< AXFR-OUT request (incoming transfer). */
XFR_TYPE_IIN, /*!< IXFR-IN request (start transfer). */
XFR_TYPE_AOUT, /*!< AXFR-OUT request (incoming transfer). */
XFR_TYPE_IOUT, /*!< IXFR-OUT request (incoming transfer). */
XFR_TYPE_SOA, /*!< Pending SOA request. */
XFR_TYPE_NOTIFY, /*!< Pending NOTIFY query. */
......
......@@ -98,22 +98,21 @@ static int fdset_tests_count(int argc, char *argv[])
static int fdset_tests_run(int argc, char *argv[])
{
diag("fdset: implements '%s'", fdset_method());
/* 1. Create fdset. */
fdset_t *set = fdset_new();
ok(set != 0, "fdset: new");
fdset_t set;
int ret = fdset_init(&set, 32);
ok(ret == 0, "fdset: init");
/* 2. Create pipe. */
int fds[2], tmpfds[2];
int ret = pipe(fds);
ret = pipe(fds);
ok(ret >= 0, "fdset: pipe() works");
ret = pipe(tmpfds);
/* 3. Add fd to set. */
ret = fdset_add(set, fds[0], OS_EV_READ);
ret = fdset_add(&set, fds[0], POLLIN, NULL);
ok(ret == 0, "fdset: add to set works");
fdset_add(set, tmpfds[0], OS_EV_READ);
fdset_add(&set, tmpfds[0], POLLIN, NULL);
/* Schedule write. */
struct timeval ts, te;
......@@ -122,54 +121,44 @@ static int fdset_tests_run(int argc, char *argv[])
pthread_create(&t, 0, thr_action, &fds[1]);
/* 4. Watch fdset. */
ret = fdset_wait(set, OS_EV_FOREVER);
int nfds = poll(set.pfd, set.n, 2000);
gettimeofday(&te, 0);
size_t diff = timeval_diff(&ts, &te);
ok(ret > 0 && diff > 99 && diff < 10000,
"fdset: poll returned events in %zu ms", diff);
ok(nfds > 0 && diff > 99 && diff < 10000,
"fdset: poll returned %d events in %zu ms", nfds, diff);
/* 5. Prepare event set. */
fdset_it_t it;
ret = fdset_begin(set, &it);
ok(ret == 0 && it.fd == fds[0], "fdset: begin is valid, ret=%d", ret);
ok(set.pfd[0].revents & POLLIN, "fdset: pipe is active");
/* 6. Receive data. */
char buf = 0x00;
ret = read(it.fd, &buf, WRITE_PATTERN_LEN);
ok(ret >= 0 && buf == WRITE_PATTERN, "fdset: contains valid data, fd=%d", it.fd);
/* 7. Iterate event set. */
ret = fdset_next(set, &it);
ok(ret < 0, "fdset: boundary check works");
ret = read(set.pfd[0].fd, &buf, WRITE_PATTERN_LEN);
ok(ret >= 0 && buf == WRITE_PATTERN, "fdset: contains valid data");
/* 8. Remove from event set. */
ret = fdset_remove(set, fds[0]);
/* 7-9. Remove from event set. */
ret = fdset_remove(&set, 0);
ok(ret == 0, "fdset: remove from fdset works");
close(fds[0]);
close(fds[1]);
ret = fdset_remove(set, tmpfds[0]);
ret = fdset_remove(&set, 0);
close(tmpfds[1]);
close(tmpfds[1]);
/* 9. Poll empty fdset. */
ret = fdset_wait(set, OS_EV_FOREVER);
ok(ret <= 0, "fdset: polling empty fdset returns -1 (ret=%d)", ret);
ok(ret == 0, "fdset: remove from fdset works (2)");
ret = fdset_remove(&set, 0);
ok(ret != 0, "fdset: removing nonexistent item");
/* 10. Crash test. */
lives_ok({
fdset_destroy(0);
fdset_add(0, -1, 0);
fdset_remove(0, -1);
fdset_wait(0, OS_EV_NOWAIT);
fdset_begin(0, 0);
fdset_end(0, 0);
fdset_next(0, 0);
fdset_method();
fdset_init(0, 0);
fdset_add(0, 1, 1, 0);
fdset_add(0, 0, 1, 0);
fdset_remove(0, 1);
fdset_remove(0, 0);
}, "fdset: crash test successful");
/* 11. Destroy fdset. */
ret = fdset_destroy(set);
ret = fdset_clear(&set);
ok(ret == 0, "fdset: destroyed");
/* Cleanup. */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment