Commit 6d74a924 authored by Marek Vavrusa's avatar Marek Vavrusa

Transition to extensible socket manager.

API consists of compulsory master thread routine and non-compulsory worker routine prototypes.
parent 093da59c
......@@ -41,3 +41,7 @@ src/other/dynamic-array.c
src/other/skip-list.h
src/other/skip-list.c
tests/querytcp.c
src/server/udp-handler.h
src/server/tcp-handler.h
src/server/udp-handler.c
src/server/tcp-handler.c
#include "server.h"
#include "dispatcher.h"
#include "socket-manager.h"
#include "udp-handler.h"
#include "tcp-handler.h"
#include "zone-database.h"
#include "name-server.h"
#include "zone-parser.h"
......@@ -43,8 +43,12 @@ cute_server *cute_create()
debug_server("Done\n\n");
debug_server("Creating Socket Manager structure..\n");
// Create socket handlers
server->manager[UDP] = sm_create(server->nameserver, &udp_master, &udp_worker, DEFAULT_THR_COUNT);
server->manager[TCP] = sm_create(server->nameserver, &tcp_master, &tcp_worker, DEFAULT_THR_COUNT);
// Check socket handlers
for(int i = 0; i < 2; i++) {
server->manager[i] = sm_create(server->nameserver, DEFAULT_THR_COUNT);
if (server->manager[i] == NULL ) {
if(i == 1) {
......@@ -58,10 +62,6 @@ cute_server *cute_create()
}
}
// Register socket handlers
sm_register_handler(server->manager[UDP], &sm_udp_handler);
sm_register_handler(server->manager[TCP], &sm_tcp_handler);
debug_server("Done\n\n");
return server;
......
This diff is collapsed.
......@@ -15,10 +15,13 @@
#include "name-server.h"
#include "dispatcher.h"
//const uint SOCKET_BUFF_SIZE;
/*----------------------------------------------------------------------------*/
typedef enum {
SOCKET_BUFF_SIZE = 4096, /// \todo <= MTU size
DEFAULT_EVENTS_COUNT = 1,
} smconst_t;
typedef enum {
UDP = 0x00,
TCP = 0x01,
......@@ -48,7 +51,7 @@ typedef struct sm_event {
} sm_event;
/** Handler functio proto. */
typedef void (*iohandler_t) (sm_event*);
typedef void* (*sockhandler_t) (void*);
/** Workers descriptor. */
typedef struct sm_worker {
......@@ -61,46 +64,39 @@ typedef struct sm_worker {
pthread_cond_t wakeup;
} sm_worker;
#define next_worker(current, mgr) \
(((current) + 1) % (mgr)->workers_dpt->thread_count)
/** \todo Implement notification via Linux eventfd() instead of is_running.
*/
typedef struct sm_manager {
int epfd;
int fd_count;
volatile short is_running;
iohandler_t handler;
sm_socket *sockets;
sm_worker *workers;
ns_nameserver *nameserver;
dpt_dispatcher *listener;
dpt_dispatcher *master;
dpt_dispatcher *workers_dpt;
pthread_mutex_t sockets_mutex;
} sm_manager;
/*----------------------------------------------------------------------------*/
sm_manager *sm_create( ns_nameserver *nameserver, int thread_count );
sm_manager *sm_create( ns_nameserver *nameserver, sockhandler_t pmaster, sockhandler_t pworker, int thread_count);
int sm_start( sm_manager* manager );
int sm_wait( sm_manager* manager );
void sm_stop( sm_manager *manager );
void sm_destroy( sm_manager **manager );
// TODO: another parameter: type - in / out / something else
int sm_open_socket( sm_manager *manager, unsigned short port, socket_t type);
int sm_close_socket( sm_manager *manager, unsigned short port);
// Handlers
static inline void sm_register_handler(sm_manager* manager, iohandler_t handler) {
manager->handler = handler;
}
static inline iohandler_t sm_handler(sm_manager* manager) {
return manager->handler;
}
void sm_tcp_handler(sm_event *ev);
void sm_udp_handler(sm_event *ev);
/** \todo Temporary APIs.
* Socket manager should only accept Master prototype + non-compulsory Worker prototype.
*/
int sm_reserve_events( sm_worker *worker, uint size );
int sm_remove_event( sm_manager *manager, int socket );
int sm_add_event( sm_manager *manager, int socket, uint32_t events );
#endif
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <errno.h>
#include "tcp-handler.h"
void tcp_handler(sm_event *ev)
{
struct sockaddr_in faddr;
int addrsize = sizeof(faddr);
int incoming = 0;
// Master socket
/// \todo Lock per-socket.
if(ev->fd == ev->manager->sockets->socket) {
// Accept on master socket
while(incoming >= 0) {
pthread_mutex_lock(&ev->manager->sockets_mutex);
incoming = accept(ev->fd, (struct sockaddr *)&faddr, (socklen_t *)&addrsize);
// Register to epoll
if(incoming < 0) {
//log_error("cannot accept incoming TCP connection (errno %d): %s.\n", errno, strerror(errno));
}
else {
sm_add_event(ev->manager, incoming, EPOLLIN);
debug_sm("tcp accept: accepted %d\n", incoming);
}
pthread_mutex_unlock(&ev->manager->sockets_mutex);
}
return;
}
// Receive size
unsigned short pktsize = 0;
pthread_mutex_lock(&ev->manager->sockets_mutex);
int n = recv(ev->fd, &pktsize, sizeof(unsigned short), 0);
pktsize = ntohs(pktsize);
debug_sm("Incoming packet size on %d: %u buffer size: %u\n", ev->fd, (unsigned) pktsize, (unsigned) ev->size_in);
// Receive payload
if(n > 0 && pktsize > 0) {
if(pktsize <= ev->size_in)
n = recv(ev->fd, ev->inbuf, pktsize, 0); /// \todo Check buffer overflow.
else
n = 0;
}
// Check read result
pthread_mutex_unlock(&ev->manager->sockets_mutex);
if(n > 0) {
// Send answer
size_t answer_size = ev->size_out;
int res = ns_answer_request(ev->manager->nameserver, ev->inbuf, n, ev->outbuf + sizeof(short),
&answer_size);
debug_sm("Answer wire format (size %u, result %d).\n", (unsigned) answer_size, res);
if(res >= 0) {
// Copy header
pktsize = htons(answer_size);
memcpy(ev->outbuf, &pktsize, sizeof(unsigned short));
int sent = send(ev->fd, ev->outbuf, answer_size + sizeof(unsigned short), 0);
if (sent < 0) {
log_error("tcp send failed (errno %d): %s\n", errno, strerror(errno));
}
debug_sm("Sent answer to %d\n", ev->fd);
}
}
// Evaluate
/// \todo Do not close if there is a pending write in another thread.
if(n <= 0) {
// Zero read or error other than would-block
debug_sm("tcp disconnected: %d\n", ev->fd);
pthread_mutex_lock(&ev->manager->sockets_mutex);
sm_remove_event(ev->manager, ev->fd);
pthread_mutex_unlock(&ev->manager->sockets_mutex);
close(ev->fd);
}
}
void *tcp_master( void *obj )
{
int worker_id = 0, nfds = 0;
sm_manager* manager = (sm_manager *)obj;
while (manager->is_running) {
// Select next worker
sm_worker* worker = &manager->workers[worker_id];
pthread_mutex_lock(&worker->mutex);
// Reserve backing-store and wait
pthread_mutex_lock(&manager->sockets_mutex);
int current_fds = manager->fd_count;
sm_reserve_events(worker, current_fds * 2);
pthread_mutex_unlock(&manager->sockets_mutex);
nfds = epoll_wait(manager->epfd, worker->events, current_fds, 1000);
if (nfds < 0) {
debug_server("epoll_wait: %s\n", strerror(errno));
worker->events_count = 0;
pthread_cond_signal(&worker->wakeup);
pthread_mutex_unlock(&worker->mutex);
continue; // Keep the same worker
}
// Signalize
worker->events_count = nfds;
pthread_cond_signal(&worker->wakeup);
pthread_mutex_unlock(&worker->mutex);
// Next worker
worker_id = next_worker(worker_id, manager);
}
// Wake up all workers
int last_wrkr = worker_id;
for(;;) {
sm_worker* worker = &manager->workers[worker_id];
pthread_mutex_lock(&worker->mutex);
worker->events_count = -1; // Shut down worker
pthread_cond_signal(&worker->wakeup);
pthread_mutex_unlock(&worker->mutex);
worker_id = next_worker(worker_id, manager);
// Finish with the starting worker
if(worker_id == last_wrkr)
break;
}
return NULL;
}
void *tcp_worker( void *obj )
{
sm_worker* worker = (sm_worker *)obj;
char buf[SOCKET_BUFF_SIZE];
char answer[SOCKET_BUFF_SIZE];
sm_event event;
event.manager = worker->mgr;
event.fd = 0;
event.events = 0;
event.inbuf = buf;
event.outbuf = answer;
event.size_in = event.size_out = SOCKET_BUFF_SIZE;
for(;;) {
pthread_mutex_lock(&worker->mutex);
pthread_cond_wait(&worker->wakeup, &worker->mutex);
// Check
if(worker->events_count < 0) {
pthread_mutex_unlock(&worker->mutex);
break;
}
// Evaluate
//fprintf(stderr, "Worker [%d] wakeup %d events.\n", worker->id, worker->events_count);
for(int i = 0; i < worker->events_count; ++i) {
event.fd = worker->events[i].data.fd;
event.events = worker->events[i].events;
tcp_handler(&event);
}
pthread_mutex_unlock(&worker->mutex);
}
debug_server("Worker %d finished.\n", worker->id);
return NULL;
}
#ifndef TCPHANDLER_H
#define TCPHANDLER_H
#include "socket-manager.h"
void *tcp_master( void *obj );
void *tcp_worker( void *obj );
#endif
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <errno.h>
#include "udp-handler.h"
void udp_handler(sm_event *ev)
{
struct sockaddr_in faddr;
int addrsize = sizeof(faddr);
int n = 0;
// Loop until all data is read
while(n >= 0) {
// Receive data
// \todo Global I/O lock means ~ 8% overhead; recvfrom() should be thread-safe
n = recvfrom(ev->fd, ev->inbuf, ev->size_in, MSG_DONTWAIT, (struct sockaddr *)&faddr, (socklen_t *)&addrsize);
//char _str[INET_ADDRSTRLEN];
//inet_ntop(AF_INET, &(faddr.sin_addr), _str, INET_ADDRSTRLEN);
//fprintf(stderr, "recvfrom() in %p: received %d bytes from %s:%d.\n", (void*)pthread_self(), n, _str, faddr.sin_port);
// Socket not ready
if(n == -1 && errno == EWOULDBLOCK) {
return;
}
// Error
if(n <= 0) {
log_error("reading data from UDP socket failed: %d - %s\n", errno, strerror(errno));
return;
}
debug_sm("Received %d bytes.\n", n);
size_t answer_size = ev->size_out;
int res = ns_answer_request(ev->manager->nameserver, ev->inbuf, n, ev->outbuf,
&answer_size);
debug_sm("Got answer of size %u.\n", (unsigned) answer_size);
if (res == 0) {
assert(answer_size > 0);
debug_sm("Answer wire format (size %u):\n", answer_size);
debug_sm_hex(answer, answer_size);
for(;;) {
res = sendto(ev->fd, ev->outbuf, answer_size, MSG_DONTWAIT,
(struct sockaddr *) &faddr,
(socklen_t) addrsize);
//fprintf(stderr, "sendto() in %p: written %d bytes to %d.\n", (void*)pthread_self(), res, ev->fd);
if(res != answer_size) {
log_error("failed to send datagram (errno %d): %s.\n", res, strerror(res));
continue;
}
break;
}
}
}
}
void *udp_master( void *obj )
{
int worker_id = 0, nfds = 0;
sm_manager* manager = (sm_manager *)obj;
while (manager->is_running) {
// Select next worker
sm_worker* worker = &manager->workers[worker_id];
pthread_mutex_lock(&worker->mutex);
// Reserve backing-store and wait
pthread_mutex_lock(&manager->sockets_mutex);
int current_fds = manager->fd_count;
sm_reserve_events(worker, current_fds * 2);
pthread_mutex_unlock(&manager->sockets_mutex);
nfds = epoll_wait(manager->epfd, worker->events, current_fds, 1000);
if (nfds < 0) {
debug_server("epoll_wait: %s\n", strerror(errno));
worker->events_count = 0;
pthread_cond_signal(&worker->wakeup);
pthread_mutex_unlock(&worker->mutex);
continue; // Keep the same worker
}
// Signalize
worker->events_count = nfds;
pthread_cond_signal(&worker->wakeup);
pthread_mutex_unlock(&worker->mutex);
// Next worker
worker_id = next_worker(worker_id, manager);
}
// Wake up all workers
int last_wrkr = worker_id;
for(;;) {
sm_worker* worker = &manager->workers[worker_id];
pthread_mutex_lock(&worker->mutex);
worker->events_count = -1; // Shut down worker
pthread_cond_signal(&worker->wakeup);
pthread_mutex_unlock(&worker->mutex);
worker_id = next_worker(worker_id, manager);
// Finish with the starting worker
if(worker_id == last_wrkr)
break;
}
return NULL;
}
void *udp_worker( void *obj )
{
sm_worker* worker = (sm_worker *)obj;
char buf[SOCKET_BUFF_SIZE];
char answer[SOCKET_BUFF_SIZE];
sm_event event;
event.manager = worker->mgr;
event.fd = 0;
event.events = 0;
event.inbuf = buf;
event.outbuf = answer;
event.size_in = event.size_out = SOCKET_BUFF_SIZE;
for(;;) {
pthread_mutex_lock(&worker->mutex);
pthread_cond_wait(&worker->wakeup, &worker->mutex);
// Check
if(worker->events_count < 0) {
pthread_mutex_unlock(&worker->mutex);
break;
}
// Evaluate
//fprintf(stderr, "Worker [%d] wakeup %d events.\n", worker->id, worker->events_count);
for(int i = 0; i < worker->events_count; ++i) {
event.fd = worker->events[i].data.fd;
event.events = worker->events[i].events;
udp_handler(&event);
}
pthread_mutex_unlock(&worker->mutex);
}
debug_server("Worker %d finished.\n", worker->id);
return NULL;
}
#ifndef UDPHANDLER_H
#define UDPHANDLER_H
#include "socket-manager.h"
void *udp_master( void *obj );
void *udp_worker( void *obj );
#endif
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment