Commit d91e49b1 authored by Marek Vavrusa's avatar Marek Vavrusa

Ported server to DThreads instead of dispatcher.

parent 115ad1aa
......@@ -4,7 +4,6 @@
#include "zone-database.h"
#include "name-server.h"
#include "zone-parser.h"
#include "dthreads.h"
#include <unistd.h>
cute_server *cute_create()
......@@ -47,17 +46,25 @@ cute_server *cute_create()
int sock = socket_create(PF_INET, SOCK_STREAM);
socket_bind(sock, "0.0.0.0", DEFAULT_PORT);
socket_listen(sock, TCP_BACKLOG_SIZE);
cute_create_handler(server, sock, &tcp_master, 1);
// Create threading unit
dt_unit_t *unit = dt_create(thr_count);
dt_repurpose(unit->threads[0], &tcp_master, 0);
cute_create_handler(server, sock, unit);
// Create UDP socket
sock = socket_create(PF_INET, SOCK_DGRAM);
socket_bind(sock, "0.0.0.0", DEFAULT_PORT);
cute_create_handler(server, sock, &udp_worker, thr_count);
// Create threading unit
unit = dt_create_coherent(thr_count, &udp_master, 0);
cute_create_handler(server, sock, unit);
debug_server("Done\n\n");
return server;
}
int cute_create_handler(cute_server *server, int fd, thr_routine routine, int threads)
int cute_create_handler(cute_server *server, int fd, dt_unit_t* unit)
{
// Create new worker
iohandler_t* handler = malloc(sizeof(iohandler_t));
......@@ -69,10 +76,12 @@ int cute_create_handler(cute_server *server, int fd, thr_routine routine, int th
handler->state = Idle;
handler->next = server->handlers;
handler->server = server;
handler->threads = dpt_create(threads, routine, handler);
if(handler->threads == NULL) {
free(handler);
return -2;
handler->unit = unit;
// Update unit data object
for(int i = 0; i < unit->size; ++i) {
dthread_t *thread = unit->threads[i];
dt_repurpose(thread, thread->run, handler);
}
// Update list
......@@ -80,7 +89,7 @@ int cute_create_handler(cute_server *server, int fd, thr_routine routine, int th
// Run if server is online
if(server->state & Running) {
dpt_start(handler->threads);
dt_start(handler->unit);
}
return handler->fd;
......@@ -114,15 +123,15 @@ int cute_remove_handler(cute_server *server, int fd)
// Wait for dispatcher to finish
if(w->state & Running) {
w->state = Idle;
dpt_notify(w->threads, SIGALRM);
dpt_wait(w->threads);
dt_stop(w->unit);
dt_join(w->unit);
}
// Close socket
socket_close(w->fd);
// Destroy dispatcher and worker
dpt_destroy(&w->threads);
dt_delete(&w->unit);
free(w);
return 0;
}
......@@ -146,7 +155,7 @@ int cute_start( cute_server *server, char **filenames, uint zones )
server->state |= Running;
for(iohandler_t* w = server->handlers; w != NULL; w = w->next) {
w->state = Running;
ret += dpt_start(w->threads);
ret += dt_start(w->unit);
}
return ret;
......@@ -157,8 +166,10 @@ int cute_wait(cute_server *server)
// Wait for dispatchers to finish
int ret = 0;
while(server->handlers != NULL) {
ret += dpt_wait(server->handlers->threads);
debug_server("server: [%p] joining threading unit\n", server->handlers);
ret += dt_join(server->handlers->unit);
cute_remove_handler(server, server->handlers->fd);
debug_server("server: joined threading unit\n", p);
}
return ret;
......@@ -170,7 +181,7 @@ void cute_stop( cute_server *server )
server->state &= ~Running;
for(iohandler_t* w = server->handlers; w != NULL; w = w->next) {
w->state = Idle;
dpt_notify(w->threads, SIGALRM);
dt_stop(w->unit);
}
}
......
......@@ -13,21 +13,21 @@
#ifndef SERVER_H
#define SERVER_H
#include "dispatcher.h"
#include "zone-database.h"
#include "name-server.h"
#include "common.h"
#include "socket.h"
#include "dthreads.h"
/** I/O handler structure.
*/
typedef struct iohandler_t {
int fd; /* I/O filedescripto r */
unsigned state; /* Handler state */
struct iohandler_t* next; /* Next handler */
dpt_dispatcher* threads; /* Handler threads */
struct cute_server* server; /* Reference to server */
int fd; /* I/O filedescripto r */
unsigned state; /* Handler state */
struct iohandler_t* next; /* Next handler */
dt_unit_t* unit; /* Threading unit */
struct cute_server* server; /* Reference to server */
} iohandler_t;
......@@ -39,8 +39,8 @@ typedef struct iohandler_t {
/*! Server state flags.
*/
typedef enum {
Idle = 0x00,
Running = 0x01
Idle = 0 << 0,
Running = 1 << 0
} server_state;
struct cute_server;
......@@ -76,11 +76,10 @@ cute_server *cute_create();
/** Create and bind handler to given filedescriptor.
* \param fd I/O filedescriptor.
* \param routine Handler routine.
* \param threads Number of threads to spawn.
* \param unit Threading unit to serve given filedescriptor.
* \return handler identifier or -1
*/
int cute_create_handler(cute_server *server, int fd, thr_routine routine, int threads);
int cute_create_handler(cute_server *server, int fd, dt_unit_t* unit);
/** Delete handler.
* \param fd I/O handler filedescriptor.
......
This diff is collapsed.
......@@ -13,8 +13,8 @@
#include "socket.h"
#include "server.h"
#include "dthreads.h"
void *tcp_master( void *obj );
void *tcp_worker( void *obj );
int tcp_master (dthread_t* thread);
#endif
......@@ -3,66 +3,86 @@
#include <unistd.h>
#include <errno.h>
#include "udp-handler.h"
#include "name-server.h"
void* udp_worker( void* obj )
int udp_master (dthread_t *thread)
{
iohandler_t* worker = (iohandler_t*) obj;
iohandler_t* handler = (iohandler_t *)thread->data;
ns_nameserver* ns = handler->server->nameserver;
int sock = handler->fd;
// Check socket
if(worker->fd < 0) {
if (sock < 0) {
debug_net("udp_worker: null socket recevied, finishing.\n");
return NULL;
return 0;
}
int sock = worker->fd;
ns_nameserver* ns = worker->server->nameserver;
/*
* \todo: Make sure stack size is big enough.
* Although this is much cheaper,
* 16kB worth of buffers *may* pose a problem.
*/
uint8_t inbuf[SOCKET_MTU_SZ];
uint8_t outbuf[SOCKET_MTU_SZ];
struct sockaddr_in faddr;
int addrsize = sizeof(faddr);
// Loop until all data is read
debug_net("udp_worker: thread started (worker #%d).\n", worker->id);
debug_net("udp: thread started (worker %p).\n", thread);
int n = 0;
while(n >= 0) {
while (n >= 0) {
// Receive data
n = socket_recvfrom(sock, inbuf, SOCKET_MTU_SZ, 0, (struct sockaddr *)&faddr, (socklen_t *)&addrsize);
n = socket_recvfrom(sock, inbuf, SOCKET_MTU_SZ, 0,
(struct sockaddr *)&faddr, (socklen_t *)&addrsize);
// Cancellation point
if (dt_is_cancelled(thread)) {
break;
}
// Error and interrupt handling
//fprintf(stderr, "recvfrom(): thread %p ret %d errno %s.\n", (void*)pthread_self(), n, strerror(errno));
if(n <= 0) {
if(errno != EINTR && errno != 0) {
log_error("udp_worker: reading data from the socket failed: %d - %s\n", errno, strerror(errno));
}
if(!(worker->state & Running))
break;
else
continue;
if (n <= 0) {
if (errno != EINTR && errno != 0) {
log_error("udp: socket_recfrom() failed: %d - %s\n",
errno, strerror(errno));
}
if (!(handler->state & Running)) {
debug_net("udp: bailing out, iohandler is not running");
break;
} else {
continue;
}
}
debug_net("udp_worker: received %d bytes.\n", n);
// Answer request
debug_net("udp: received %d bytes.\n", n);
size_t answer_size = SOCKET_MTU_SZ;
int res = ns_answer_request(ns, inbuf, n, outbuf,
&answer_size);
&answer_size);
debug_net("udp_worker: got answer of size %u.\n", (unsigned) answer_size);
debug_net("udp: got answer of size %u.\n", (unsigned) answer_size);
// Send answer
if (res == 0) {
assert(answer_size > 0);
debug_net("udp: answer wire format (size %u):\n",
(unsigned) answer_size);
debug_net("udp_worker: answer wire format (size %u):\n", (unsigned) answer_size);
debug_net_hex((const char*) outbuf, answer_size);
for(;;) {
// Send datagram
for (;;) {
res = socket_sendto(sock, outbuf, answer_size, 0,
(struct sockaddr *) &faddr,
(socklen_t) addrsize);
(struct sockaddr *) &faddr,
(socklen_t) addrsize);
//fprintf(stderr, "sendto() in %p: written %d bytes to %d.\n", (void*)pthread_self(), res, ev->fd);
if(res != answer_size) {
log_error("udp_worker: failed to send datagram (errno %d): %s.\n", res, strerror(res));
// Check result
if (res != answer_size) {
log_error("udp: socket_sendto() failed: %d - %s.\n",
res, strerror(res));
continue;
}
......@@ -71,6 +91,6 @@ void* udp_worker( void* obj )
}
}
debug_net("udp_worker: worker #%d finished.\n", worker->id);
return NULL;
debug_net("udp: worker %p finished.\n", thread);
return 0;
}
......@@ -13,8 +13,8 @@
#include "socket.h"
#include "server.h"
#include "dthreads.h"
void *udp_master( void *obj );
void *udp_worker( void *obj );
int udp_master (dthread_t* thread);
#endif
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment