Commit 20759f6a authored by Marek Vavrusa's avatar Marek Vavrusa

Fixed signal handling on BSD, I/O h. interrupts, bugfixes.

Commit refs #831.
parent 0c7b11fd
......@@ -51,8 +51,7 @@ int evqueue_poll(evqueue_t *q, const struct timespec *ts,
FD_SET(q->fds[EVQUEUE_READFD], &rfds);
/* Wait for events. */
int ret = pselect(q->fds[EVQUEUE_READFD] + 1, &rfds,
0, 0, ts, sigmask);
int ret = pselect(q->fds[EVQUEUE_READFD] + 1, &rfds, 0, 0, ts, sigmask);
if (ret < 0) {
return -1;
}
......
......@@ -159,6 +159,14 @@ int main(int argc, char **argv)
}
log_server_info("\n");
// Setup signal blocking
struct sigaction sa_empty;
sa_empty.sa_flags = 0;
sa_empty.sa_handler = interrupt_handle;
sigemptyset(&sa_empty.sa_mask);
sigaction(SIGALRM, &sa_empty, 0); // Use for interrupting I/O
pthread_sigmask(SIG_BLOCK, &sa_empty.sa_mask, NULL);
// Create server instance
char* pidfile = pid_filename();
......@@ -185,10 +193,6 @@ int main(int argc, char **argv)
}
log_server_info("PID stored in %s\n", pidfile);
// Setup signal blocking
sigset_t emptyset;
sigemptyset(&emptyset);
// Setup signal handler
struct sigaction sa;
memset(&sa, 0, sizeof(sa));
......@@ -197,14 +201,12 @@ int main(int argc, char **argv)
sigaction(SIGINT, &sa, NULL);
sigaction(SIGTERM, &sa, NULL);
sigaction(SIGHUP, &sa, NULL);
sigaction(SIGALRM, &sa, NULL); // Interrupt
sa.sa_flags = 0;
sigprocmask(SIG_BLOCK, &sa.sa_mask, NULL);
pthread_sigmask(SIG_BLOCK, &sa.sa_mask, NULL);
/* Run event loop. */
for(;;) {
int ret = evqueue_poll(evqueue(), 0, &emptyset);
int ret = evqueue_poll(evqueue(), 0, &sa_empty.sa_mask);
/* Interrupts. */
/*! \todo More robust way to exit evloop.
......
......@@ -76,7 +76,7 @@ int log_init()
emask |= LOG_MASK(LOG_DEBUG);
ret = log_setup(0);
log_levels_set(LOGT_SYSLOG, LOG_ANY, imask|emask);
log_levels_set(LOGT_SYSLOG, LOG_ANY, emask);
log_levels_set(LOGT_STDERR, LOG_ANY, emask);
log_levels_set(LOGT_STDOUT, LOG_ANY, imask);
......
......@@ -73,16 +73,6 @@ static inline int dt_update_thread(dthread_t *thread, int state)
return KNOT_EOK;
}
/*!
* \brief Thread entrypoint interrupt handler.
*
* Threads shouldn't use global interrupt handler, so this no-op function
* is provided.
*/
static void thread_ep_intr(int s)
{
}
/*!
* \brief Thread entrypoint function.
*
......@@ -108,12 +98,13 @@ static void *thread_ep(void *data)
return 0;
}
// Register service and signal handler
struct sigaction sa;
sa.sa_handler = thread_ep_intr;
sigemptyset(&sa.sa_mask);
sa.sa_flags = 0;
sigaction(SIGALRM, &sa, 0);
// Ignore specific signals (except SIGALRM)
sigset_t ignset;
sigemptyset(&ignset);
sigaddset(&ignset, SIGINT);
sigaddset(&ignset, SIGTERM);
sigaddset(&ignset, SIGHUP);
pthread_sigmask(SIG_IGN, &ignset, 0);
debug_dt("dthreads: [%p] entered ep\n", thread);
......@@ -169,7 +160,7 @@ static void *thread_ep(void *data)
// Wait for notification from unit
debug_dt("dthreads: [%p] going idle\n", thread);
pthread_cond_wait(&unit->_notify, &unit->_notify_mx);
int ret = pthread_cond_wait(&unit->_notify, &unit->_notify_mx);
pthread_mutex_unlock(&unit->_notify_mx);
debug_dt("dthreads: [%p] resumed from idle\n", thread);
} else {
......
......@@ -136,13 +136,13 @@ static int server_init_iface(iface_t *new_if, conf_iface_t *cfg_if)
new_if->fd[UDP_ID] = sock;
new_if->type[UDP_ID] = cfg_if->family;
/* Set socket options. */
/* Set socket options - voluntary. */
if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, &snd_opt, sizeof(snd_opt)) < 0) {
log_server_warning("Failed to configure socket "
"write buffers.\n");
// log_server_warning("Failed to configure socket "
// "write buffers.\n");
}
if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &opt, sizeof(opt)) < 0) {
log_server_warning("Failed to configure socket read buffers.\n");
// log_server_warning("Failed to configure socket read buffers.\n");
}
/* Create TCP socket. */
......@@ -419,6 +419,7 @@ iohandler_t *server_create_handler(server_t *server, int fd, dt_unit_t *unit)
handler->unit = unit;
handler->iface = 0;
handler->data = 0;
handler->interrupt = 0;
// Update unit data object
for (int i = 0; i < unit->size; ++i) {
......@@ -585,6 +586,11 @@ void server_stop(server_t *server)
WALK_LIST(h, server->handlers) {
h->state = ServerIdle;
dt_stop(h->unit);
// Call interrupt handler
if (h->interrupt) {
h->interrupt(h);
}
}
/* Unlock RCU. */
......
......@@ -47,6 +47,7 @@ typedef struct iohandler_t {
struct iface_t *iface; /*!< Reference to associated interface. */
struct server_t *server; /*!< Reference to server */
void *data; /*!< Persistent data for I/O handler. */
void (*interrupt)(struct iohandler_t *h); /*!< Interrupt handler. */
} iohandler_t;
......
......@@ -23,10 +23,10 @@
/*! \brief TCP connection. */
typedef struct tcp_io_t {
ev_io io;
ns_nameserver_t *ns; /* reference to name server */
iohandler_t *io_h; /* master I/O handler */
xfrhandler_t *xfr_h; /* XFR handler */
stat_t *stat; /* statistics gatherer */
ns_nameserver_t *ns; /*!< Name server */
iohandler_t *io_h; /*!< Master I/O handler. */
xfrhandler_t *xfr_h; /*!< XFR handler. */
stat_t *stat; /*!< Statistics gatherer */
} tcp_io_t;
/*
......@@ -123,32 +123,38 @@ static inline int tcp_recv(int fd, uint8_t *buf, size_t len, sockaddr_t *addr)
return n;
}
#if 0
/*!
* \brief TCP event handler function.
*
* Handle single TCP event.
*
* \param pool Associated connection pool.
* \param fd Associated socket.
* \param qbuf Buffer for a query wireformat.
* \param qbuf_maxlen Buffer maximum size.
* \param w Associated I/O event.
* \param revents Returned events.
*/
static inline int tcp_handle(tcp_pool_t *pool, int fd,
uint8_t *qbuf, size_t qbuf_maxlen)
static void tcp_handle(struct ev_loop *loop, ev_io *w, int revents)
{
tcp_io_t *tcp_w = (tcp_io_t *)w;
/* Check address type. */
sockaddr_t addr;
if (sockaddr_init(&addr, pool->io_h->type) != KNOT_EOK) {
if (sockaddr_init(&addr, tcp_w->io_h->type) != KNOT_EOK) {
log_server_error("Socket type %d is not supported, "
"IPv6 support is probably disabled.\n",
pool->io_h->type);
return KNOT_ENOTSUP;
tcp_w->io_h->type);
//! return KNOT_ENOTSUP;
return;
}
/* Receive data. */
int n = tcp_recv(fd, qbuf, qbuf_maxlen, &addr);
uint8_t qbuf[65535]; /*! \todo This may be problematic. */
size_t qbuf_maxlen = sizeof(qbuf);
int n = tcp_recv(w->fd, qbuf, qbuf_maxlen, &addr);
if (n <= 0) {
return KNOT_ERROR;
//! return KNOT_ERROR;
debug_net("tcp: client disconnected\n");
ev_io_stop(loop, w);
free(tcp_w);
return;
}
/* Parse query. */
......@@ -162,9 +168,10 @@ static inline int tcp_handle(tcp_pool_t *pool, int fd,
dnslib_packet_new(DNSLIB_PACKET_PREALLOC_QUERY);
if (packet == NULL) {
uint16_t pkt_id = dnslib_wire_get_id(qbuf);
ns_error_response(pool->ns, pkt_id, DNSLIB_RCODE_SERVFAIL,
ns_error_response(tcp_w->ns, pkt_id, DNSLIB_RCODE_SERVFAIL,
qbuf, &resp_len);
return KNOT_ENOMEM;
//! return KNOT_ENOMEM;
return;
}
int res = ns_parse_packet(qbuf, n, packet, &qtype);
......@@ -173,32 +180,34 @@ static inline int tcp_handle(tcp_pool_t *pool, int fd,
/* Send error response on dnslib RCODE. */
if (res > 0) {
uint16_t pkt_id = dnslib_wire_get_id(qbuf);
ns_error_response(pool->ns, pkt_id, res,
ns_error_response(tcp_w->ns, pkt_id, res,
qbuf, &resp_len);
}
// dnslib_response_free(&resp);
dnslib_packet_free(&packet);
return res;
//! return res;
return;
}
/* Handle query. */
ns_xfr_t xfr;
switch(qtype) {
case DNSLIB_QUERY_NORMAL:
res = ns_answer_normal(pool->ns, packet, qbuf, &resp_len);
res = ns_answer_normal(tcp_w->ns, packet, qbuf, &resp_len);
break;
case DNSLIB_QUERY_AXFR:
xfr.query = packet;
xfr.send = tcp_send;
xfr.session = fd;
xfr.session = w->fd;
xfr.response_wire = 0;
xfr.rsize = 0;
memcpy(&xfr.from, &addr, sizeof(sockaddr_t));
xfr_request(pool->xfr_h, &xfr);
xfr_request(tcp_w->xfr_h, &xfr);
debug_net("tcp: enqueued AXFR request size %zd.\n",
resp_len);
return KNOT_EOK;
//! return KNOT_EOK;
return;
case DNSLIB_QUERY_IXFR:
case DNSLIB_QUERY_NOTIFY:
case DNSLIB_QUERY_UPDATE:
......@@ -213,7 +222,7 @@ static inline int tcp_handle(tcp_pool_t *pool, int fd,
/* Send answer. */
if (res == KNOT_EOK) {
assert(resp_len > 0);
res = tcp_send(fd, qbuf, resp_len);
res = tcp_send(w->fd, qbuf, resp_len);
/* Check result. */
if (res != (int)resp_len) {
......@@ -223,9 +232,10 @@ static inline int tcp_handle(tcp_pool_t *pool, int fd,
}
}
return res;
//! return res;
return;
}
#if 0
/*!
* \brief Create new TCP pool.
*
......@@ -465,9 +475,43 @@ static int tcp_pool(dthread_t *thread)
}
#endif
static void tcp_accept(EV_P_ ev_io *w, int revents)
static void tcp_accept(struct ev_loop *loop, ev_io *w, int revents)
{
tcp_io_t *tcp_w = (tcp_io_t *)w;
/* Accept incoming connection. */
debug_net("tcp: accepting connection on fd = %d\n", w->fd);
int incoming = accept(w->fd, 0, 0);
/* Evaluate connection. */
if (incoming < 0) {
if (errno != EINTR) {
log_server_error("Cannot accept connection "
"(%d).\n", errno);
}
} else {
/*! \todo Store references to pending connections! */
tcp_io_t *conn = malloc(sizeof(tcp_io_t));
conn->ns = tcp_w->ns;
conn->stat = tcp_w->stat;
conn->xfr_h = tcp_w->xfr_h;
conn->io_h = tcp_w->io_h;
/* Register connection. */
ev_io_init((ev_io *)conn, tcp_handle, incoming, EV_READ);
ev_io_start(loop, (ev_io *)conn);
}
}
static void tcp_interrupt(iohandler_t *h)
{
/*! \todo Using default loop is sub-optimal solution. */
ev_io *w = (ev_io *)h->data;
struct ev_loop *loop = ev_default_loop(0);
/* Stop master socket watcher. */
ev_io_stop(loop, w);
ev_unloop(loop, EVUNLOOP_ALL);
}
/*
......@@ -478,6 +522,7 @@ int tcp_master(dthread_t *thread)
{
dt_unit_t *unit = thread->unit;
iohandler_t *handler = (iohandler_t *)thread->data;
int master_sock = handler->fd;
/* Check socket. */
......@@ -489,18 +534,31 @@ int tcp_master(dthread_t *thread)
debug_dt("dthreads: [%p] is TCP master, state: %d\n",
thread, thread->state);
/* Trim other threads. */
/*! \todo Multithreaded event-loop handling. */
if (unit->size > 1) {
dt_resize(unit, 1);
}
/* Create event loop. */
struct ev_loop *loop = ev_default_loop(0);
/* Install interrupt handler. */
handler->interrupt = tcp_interrupt;
/* Watch bound socket for incoming connections. */
tcp_io_t tcp_w;
ev_io_init((ev_io *)&tcp_w, tcp_accept, master_sock, EV_READ);
ev_io_start(loop, (ev_io *)&tcp_w);
tcp_io_t *tcp_w = malloc(sizeof(tcp_io_t));
tcp_w->io_h = handler;
tcp_w->ns = handler->server->nameserver;
tcp_w->stat = 0; //!< \todo Implement stat.
tcp_w->xfr_h = handler->server->xfr_h;
ev_io_init((ev_io *)tcp_w, tcp_accept, master_sock, EV_READ);
ev_io_start(loop, (ev_io *)tcp_w);
handler->data = tcp_w;
/* Accept clients. */
debug_net("tcp: running 1 master with %d pools\n", unit->size - 1);
for (;;) {
// Cancellation point
......@@ -513,51 +571,12 @@ int tcp_master(dthread_t *thread)
/*! \bug Implement cancellation point somehow. */
ev_loop(loop, 0);
}
#if 0
// Accept on master socket
int incoming = accept(master_sock, 0, 0);
// Register to worker
if (incoming < 0) {
if (errno != EINTR) {
log_server_error("Cannot accept connection "
"(%d).\n", errno);
}
} else {
// Select next pool (Round-Robin)
dt_unit_lock(unit);
int pool_count = unit->size - 1;
pool_id = get_next_rr(pool_id, pool_count);
dthread_t *t = unit->threads[pool_id + 1];
// Allocate new pool if needed
if (t->run != &tcp_pool) {
dt_repurpose(t, &tcp_pool, tcp_pool_new(handler));
debug_dt("dthreads: [%p] repurposed "
"as TCP pool\n", t);
}
// Add incoming socket to selected pool
tcp_pool_t *pool = (tcp_pool_t *)t->_adata;
tcp_pool_lock(pool);
debug_net("tcp_master: accept: assigned socket %d "
"to pool #%d\n",
incoming, pool_id);
if (tcp_pool_add(pool, incoming, EPOLLIN) == 0) {
++pool->evcount;
}
// Activate pool
dt_activate(t);
tcp_pool_unlock(pool);
dt_unit_unlock(unit);
}
}
#endif
// Stop whole unit
debug_net("tcp: stopping (%d master, %d pools)\n", 1, unit->size - 1);
handler->data = 0;
free(tcp_w);
return KNOT_EOK;
}
......@@ -16,7 +16,8 @@
int main(int argc, char *argv[])
{
// Open log
log_init(LOG_UPTO(LOG_ERR), LOG_MASK(LOG_ERR) | LOG_MASK(LOG_WARNING));
log_init();
log_levels_set(LOGT_SYSLOG, LOG_ANY, 0);
// Build test set
unit_api *tests[] = {
......
......@@ -51,10 +51,14 @@
#endif
/* Eliminate compiler warning with unused parameters. */
#ifndef UNUSED
#define UNUSED(param) (param) = (param)
#endif
#ifndef ERR_ALLOC_FAILED
#define ERR_ALLOC_FAILED fprintf(stderr, "Allocation failed at %s:%d\n", \
__FILE__, __LINE__)
#endif
/*!
* \brief Return data of raw data item.
......@@ -1216,8 +1220,9 @@ uint16_t * zparser_conv_apl_rdata(char *str)
return NULL;
} else if (rc == -1) {
char ebuf[256];
strerror_r(errno, ebuf, sizeof(ebuf));
fprintf(stderr, "inet_pton failed: %s",
strerror_r(errno, ebuf, sizeof(ebuf)));
ebuf);
return NULL;
}
......@@ -1815,8 +1820,9 @@ int zone_read(const char *name, const char *zonefile, const char *outfile,
assert(origin_node->parent == NULL);
if (!zone_open(zonefile, 3600, DNSLIB_CLASS_IN, origin_node)) {
strerror_r(errno, ebuf, sizeof(ebuf));
fprintf(stderr, "Cannot open '%s': %s.",
zonefile, strerror_r(errno, ebuf, sizeof(ebuf)));
zonefile, ebuf);
return KNOT_ZCOMPILE_EZONEINVAL;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment