Commit 5bff3e5f authored by Marek Vavrusa's avatar Marek Vavrusa

Event queue implementation based on fifo and pselect().

Commit refs #541.
parent b140737b
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include "common.h"
#include "lib/evqueue.h"
static inline int evqueue_lock(evqueue_t *q)
{
return pthread_mutex_lock(&q->mx);
}
static inline int evqueue_unlock(evqueue_t *q)
{
return pthread_mutex_unlock(&q->mx);
}
evqueue_t *s_evqueue = 0;
evqueue_t *evqueue_new()
{
evqueue_t* q = malloc(sizeof(evqueue_t));
if (evqueue_init(q) < 0) {
/* Initialize fds. */
if (pipe(q->fds) < 0) {
free(q);
q = 0;
}
......@@ -25,105 +20,71 @@ evqueue_t *evqueue_new()
return q;
}
int evqueue_init(evqueue_t *q)
void evqueue_free(evqueue_t **q)
{
/* Initialize queue. */
init_list(&q->q);
/* Initialize synchronisation. */
if (pthread_mutex_init(&q->mx, 0) != 0) {
return -1;
}
if (pthread_cond_init(&q->notify, 0) != 0) {
pthread_mutex_destroy(&q->mx);
return -1;
}
/* Invalidate pointer to queue. */
evqueue_t *eq = *q;
*q = 0;
return 0;
/* Deinitialize. */
close(eq->fds[EVQUEUE_READFD]);
close(eq->fds[EVQUEUE_WRITEFD]);
free(eq);
}
int evqueue_clear(evqueue_t *q)
int evqueue_poll(evqueue_t *q, const sigset_t *sigmask)
{
/* Check. */
if (!q) {
return -1;
}
if (evqueue_lock(q) != 0) {
return -2;
}
int i = 0;
node *n = 0, *nxt = 0;
WALK_LIST_DELSAFE (n, nxt, q->q) {
free(n);
++i;
}
evqueue_unlock(q);
return i;
}
void evqueue_free(evqueue_t **q)
{
/* Invalidate pointer to queue. */
evqueue_t *eq = *q;
*q = 0;
/* Prepare fd set. */
fd_set rfds;
FD_ZERO(&rfds);
FD_SET(q->fds[EVQUEUE_READFD], &rfds);
/* Clear queue. */
evqueue_clear(eq);
/* Wait for events. */
return pselect(q->fds[EVQUEUE_READFD] + 1, &rfds,
0, 0, 0, sigmask);
/* Deinitialize. */
pthread_mutex_destroy(&eq->mx);
pthread_cond_destroy(&eq->notify);
free(eq);
}
void *evqueue_get(evqueue_t *q)
{
void *ret = 0;
/* Lock event queue. */
/* Check. */
if (!q) {
return ret;
return 0;
}
if (evqueue_lock(q) != 0) {
return ret;
}
/* Prepare msg. */
event_t ev;
/* Take first event. */
event_t *ev = (event_t*)HEAD(q->q);
if (ev) {
rem_node((node *)ev);
ret = ev->data;
free(ev);
/* Read data. */
if (read(q->fds[EVQUEUE_READFD], &ev, sizeof(ev)) != sizeof(ev)) {
return 0;
}
/* Unlock and return. */
evqueue_unlock(q);
return ret;
return ev.data;
}
int evqueue_add(evqueue_t *q, void *item)
{
/* Check. */
if (!q) {
return -1;
}
/* Create item. */
event_t *ev = malloc(sizeof(event_t));
ev->data = item;
/* Prepare msg. */
event_t ev;
ev.data = item;
/* Lock event queue. */
if (evqueue_lock(q) != 0) {
free(ev);
return -1;
/* Write data. */
int ret = write(q->fds[EVQUEUE_WRITEFD], &ev, sizeof(ev));
if (ret != sizeof(ev)) {
return -2;
}
/* Insert into queue. */
add_tail(&q->q, (node *)ev);
evqueue_unlock(q);
return 0;
}
......@@ -20,27 +20,94 @@
* \brief Event structure.
*/
typedef struct {
struct node *next, *prev; /* Compatibility with node */
void *data; /*!< Usable data ptr. */
} event_t;
/*!
* \brief Event queue constants.
*/
enum {
EVQUEUE_READFD = 0,
EVQUEUE_WRITEFD = 1
};
/*!
* \brief Event queue structure.
*/
typedef struct {
pthread_mutex_t mx; /*!< Notification mutex. */
pthread_cond_t notify; /*!< Notification condition. */
list q; /*!< Event queue using list. */
int fds[2]; /*!< Read and Write fds. */
} evqueue_t;
/*!
* \brief Create new event queue.
*
* Event queue is thread-safe and POSIX signal-safe.
* It uses piped fds for queueing and pselect(2) to
* wait for events.
*
* \retval New instance on success.
* \retval NULL on error.
*/
evqueue_t *evqueue_new();
int evqueue_init(evqueue_t *q);
/*!
* \brief Deinitialize and free event queue.
*
* \param q Pointer to queue instance.
* \note *q is set to 0.
*/
void evqueue_free(evqueue_t **q);
int evqueue_clear(evqueue_t *q);
/*!
* \brief Poll for new events.
*
* Unblocked signals during polling are specified
* in a sigmask.
*
* \param q Event queue.
* \param timeout Specified timeout. Use NULL for infinite.
* \param sigmask Bitmask of signals to receive.
*
* \retval Number of polled events on success.
* \retval -1 On error or signal interrupt.
*/
int evqueue_poll(evqueue_t *q, const sigset_t *sigmask);
/*!
* \brief Read event from event queue.
*
* \param q Event queue.
* \retval Event data on success.
* \retval NULL on error.
*/
void *evqueue_get(evqueue_t *q);
/*!
* \brief Add event to queue.
*
* \param q Event queue.
* \param item Pointer to event-related data.
* \retval 0 on success.
* \retval <0 on error.
*/
int evqueue_add(evqueue_t *q, void *item);
/* Singleton event queue pointer. */
extern evqueue_t *s_evqueue;
/*!
* \brief Event queue singleton.
*/
static inline evqueue_t *evqueue() {
return s_evqueue;
}
/*!
* \brief Set event queue singleton.
*/
static inline void evqueue_set(evqueue_t *q) {
s_evqueue = q;
}
#endif /* _CUTEDNS_EVQUEUE_H_ */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment