Commit 9058115a authored by Marek Vavrusa's avatar Marek Vavrusa

Updated event queue with callbacks.

parent 5bff3e5f
...@@ -50,38 +50,32 @@ int evqueue_poll(evqueue_t *q, const sigset_t *sigmask) ...@@ -50,38 +50,32 @@ int evqueue_poll(evqueue_t *q, const sigset_t *sigmask)
} }
void *evqueue_get(evqueue_t *q) int evqueue_get(evqueue_t *q, event_t *ev)
{ {
/* Check. */ /* Check. */
if (!q) { if (!q || !ev) {
return 0; return -1;
} }
/* Prepare msg. */
event_t ev;
/* Read data. */ /* Read data. */
if (read(q->fds[EVQUEUE_READFD], &ev, sizeof(ev)) != sizeof(ev)) { int ret = read(q->fds[EVQUEUE_READFD], ev, sizeof(event_t));
return 0; if (ret != sizeof(event_t)) {
return -2;
} }
return ev.data; return 0;
} }
int evqueue_add(evqueue_t *q, void *item) int evqueue_add(evqueue_t *q, const event_t *ev)
{ {
/* Check. */ /* Check. */
if (!q) { if (!q || !ev) {
return -1; return -1;
} }
/* Prepare msg. */
event_t ev;
ev.data = item;
/* Write data. */ /* Write data. */
int ret = write(q->fds[EVQUEUE_WRITEFD], &ev, sizeof(ev)); int ret = write(q->fds[EVQUEUE_WRITEFD], ev, sizeof(event_t));
if (ret != sizeof(ev)) { if (ret != sizeof(event_t)) {
return -2; return -2;
} }
......
...@@ -16,11 +16,20 @@ ...@@ -16,11 +16,20 @@
#include "common.h" #include "common.h"
#include "lib/lists.h" #include "lib/lists.h"
struct event_t;
/*!
* \brief Event callback.
*/
typedef int (*eventcb_t)(struct event_t *);
/*! /*!
* \brief Event structure. * \brief Event structure.
*/ */
typedef struct { typedef struct event_t {
void *data; /*!< Usable data ptr. */ int code; /*!< Event code. */
void *data; /*!< Usable data ptr. */
eventcb_t cb; /*!< Event callback. */
} event_t; } event_t;
/*! /*!
...@@ -77,20 +86,22 @@ int evqueue_poll(evqueue_t *q, const sigset_t *sigmask); ...@@ -77,20 +86,22 @@ int evqueue_poll(evqueue_t *q, const sigset_t *sigmask);
* \brief Read event from event queue. * \brief Read event from event queue.
* *
* \param q Event queue. * \param q Event queue.
* \retval Event data on success. * \param ev Event structure for writing.
* \retval NULL on error. *
* \retval 0 on success.
* \retval <0 on error.
*/ */
void *evqueue_get(evqueue_t *q); int evqueue_get(evqueue_t *q, event_t *ev);
/*! /*!
* \brief Add event to queue. * \brief Add event to queue.
* *
* \param q Event queue. * \param q Event queue.
* \param item Pointer to event-related data. * \param ev Event structure to read.
* \retval 0 on success. * \retval 0 on success.
* \retval <0 on error. * \retval <0 on error.
*/ */
int evqueue_add(evqueue_t *q, void *item); int evqueue_add(evqueue_t *q, const event_t *ev);
/* Singleton event queue pointer. */ /* Singleton event queue pointer. */
extern evqueue_t *s_evqueue; extern evqueue_t *s_evqueue;
......
...@@ -9,31 +9,29 @@ ...@@ -9,31 +9,29 @@
#include "ctl/process.h" #include "ctl/process.h"
#include "conf/conf.h" #include "conf/conf.h"
#include "conf/logconf.h" #include "conf/logconf.h"
#include "lib/evqueue.h"
/*----------------------------------------------------------------------------*/ /*----------------------------------------------------------------------------*/
static volatile short s_stopping = 0; /* Signal flags. */
static server_t *s_server = NULL; static volatile short sig_req_stop = 0;
static volatile short sig_req_reload = 0;
static volatile short sig_stopping = 0;
// SIGINT signal handler // SIGINT signal handler
void interrupt_handle(int s) void interrupt_handle(int s)
{ {
// Omit other signals
if (s_server == NULL) {
return;
}
// Reload configuration // Reload configuration
if (s == SIGHUP) { if (s == SIGHUP) {
log_server_info("server: fixme: reload configuration.\n"); sig_req_reload = 1;
/// \todo Reload configuration? return;
} }
// Stop server // Stop server
if (s == SIGINT || s == SIGTERM) { if (s == SIGINT || s == SIGTERM) {
if (s_stopping == 0) { if (sig_stopping == 0) {
s_stopping = 1; sig_req_stop = 1;
server_stop(s_server); sig_stopping = 1;
} else { } else {
log_server_error("server: \nOK! OK! Exiting immediately.\n"); log_server_error("server: \nOK! OK! Exiting immediately.\n");
exit(1); exit(1);
...@@ -86,6 +84,9 @@ int main(int argc, char **argv) ...@@ -86,6 +84,9 @@ int main(int argc, char **argv)
} }
} }
// Setup event queue
evqueue_set(evqueue_new());
// Initialize log // Initialize log
log_init(); log_init();
...@@ -139,11 +140,11 @@ int main(int argc, char **argv) ...@@ -139,11 +140,11 @@ int main(int argc, char **argv)
// Create server instance // Create server instance
const char* pidfile = pid_filename(); const char* pidfile = pid_filename();
s_server = server_create(); server_t *server = server_create();
// Run server // Run server
int res = 0; int res = 0;
if ((res = server_start(s_server, zfs, zfs_count)) == 0) { if ((res = server_start(server, zfs, zfs_count)) == 0) {
// Save PID // Save PID
if (daemonize) { if (daemonize) {
...@@ -158,15 +159,25 @@ int main(int argc, char **argv) ...@@ -158,15 +159,25 @@ int main(int argc, char **argv)
} }
} }
// Register service and signal handler // Setup signal blocking
sigset_t emptyset, blockset;
sigemptyset(&emptyset);
sigemptyset(&blockset);
sigaddset(&blockset, SIGINT);
sigaddset(&blockset, SIGTERM);
sigaddset(&blockset, SIGHUP);
sigaddset(&blockset, SIGALRM); // Interrupt
sigprocmask(SIG_BLOCK, &blockset, NULL);
// Setup signal handler
struct sigaction sa; struct sigaction sa;
sa.sa_handler = interrupt_handle; sa.sa_handler = interrupt_handle;
sigemptyset(&sa.sa_mask); sigemptyset(&sa.sa_mask);
sa.sa_flags = 0;
sigaction(SIGINT, &sa, NULL); sigaction(SIGINT, &sa, NULL);
sigaction(SIGTERM, &sa, NULL); sigaction(SIGTERM, &sa, NULL);
sigaction(SIGHUP, &sa, NULL); sigaction(SIGHUP, &sa, NULL);
sigaction(SIGALRM, &sa, NULL); // Interrupt sigaction(SIGALRM, &sa, NULL); // Interrupt
sa.sa_flags = 0;
// Change directory if daemonized // Change directory if daemonized
log_server_info("server: Started.\n"); log_server_info("server: Started.\n");
...@@ -176,19 +187,50 @@ int main(int argc, char **argv) ...@@ -176,19 +187,50 @@ int main(int argc, char **argv)
} }
/* Run event loop. */ /* Run event loop. */
fprintf(stderr, "<<in event loop>>\n");
for(;;) {
int ret = evqueue_poll(evqueue(), &emptyset);
/* Interrupts. */
if (ret == -1) {
if (sig_req_stop) {
sig_req_stop = 0;
server_stop(server);
fprintf(stderr, "<<stop req>>\n");
}
if (sig_req_reload) {
sig_req_reload = 0;
//! \todo Reload config.
fprintf(stderr, "<<reload config>>\n");
}
}
/* Events. */
if (ret > 0) {
event_t ev;
if (evqueue_get(evqueue(), &ev) == 0) {
fprintf(stderr, "received new event\n");
if (ev.cb) {
ev.cb(&ev);
}
}
}
}
if ((res = server_wait(s_server)) != 0) { fprintf(stderr, "<<out of event loop>>\n");
if ((res = server_wait(server)) != 0) {
log_server_error("server: An error occured while " log_server_error("server: An error occured while "
"waiting for server to finish.\n"); "waiting for server to finish.\n");
} }
} else { } else {
log_server_fatal("server: An error occured while " log_server_fatal("server: An error occured while "
"starting the server.\n"); "starting the server.\n");
} }
// Stop server and close log // Stop server and close log
server_destroy(&s_server); server_destroy(&server);
// Remove PID file if daemonized // Remove PID file if daemonized
if (daemonize) { if (daemonize) {
...@@ -203,5 +245,9 @@ int main(int argc, char **argv) ...@@ -203,5 +245,9 @@ int main(int argc, char **argv)
log_server_info("server: Shut down.\n"); log_server_info("server: Shut down.\n");
log_close(); log_close();
// Destroy event loop
evqueue_t *q = evqueue();
evqueue_free(&q);
return res; return res;
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment