Commit 7180a80a authored by Marek Vavrusa's avatar Marek Vavrusa

Extended Sockets API.

 * socket_create()
 * socket_connect()
 * socket_send()/socket_sendto()
 * socket_recv()/socket_recvfrom()
 * Preliminary \addtogroup Doxygen group "network"
parent f64b9dc6
......@@ -43,12 +43,12 @@ cute_server *cute_create()
debug_server("Estimated number of threads per handler: %d\n", thr_count);
// Create socket handlers
int sock = socket(AF_INET, SOCK_STREAM, 0);
int sock = socket_create(PF_INET, SOCK_STREAM);
socket_bind(sock, "0.0.0.0", DEFAULT_PORT);
socket_listen(sock, TCP_BACKLOG_SIZE);
cute_create_handler(server, sock, &tcp_master, 1);
sock = socket(AF_INET, SOCK_DGRAM, 0);
sock = socket_create(PF_INET, SOCK_DGRAM);
socket_bind(sock, "0.0.0.0", DEFAULT_PORT);
cute_create_handler(server, sock, &udp_worker, thr_count);
debug_server("Done\n\n");
......
......@@ -5,26 +5,60 @@
#include <string.h>
#include <errno.h>
#include <stdio.h>
#include <sys/epoll.h>
#include <netdb.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
int socket_create( int family, int type )
{
// Create socket
int sock = socket(family, type, 0);
// Reuse open socket
int on = 1;
if(setsockopt(s, SOL_SOCKET, SO_REUSEADDR,
(const char*)&on, sizeof(on)) < 0) {
return socket_close(sock);
}
return sock;
}
int socket_connect( int socket, const char* addr, unsigned short port )
{
// Create socket
struct hostent* hent = gethostbyname(addr);
if(hent == 0)
return -1;
// Prepare host address
struct sockaddr_in saddr;
socklen_t addrlen = sizeof(struct sockaddr_in);
saddr.sin_family = AF_INET;
saddr.sin_port = htons(port);
memcpy(&saddr.sin_addr, hent->h_addr, hent->h_length);
// Connect to host
return connect(socket, (struct sockaddr *)&saddr, addrlen);
}
int socket_bind( int socket, const char* addr, unsigned short port )
{
// Initialize socket address
struct sockaddr_in saddr;
socklen_t addrlen = sizeof(struct sockaddr_in);
if(getsockname(socket, &saddr, &addrlen) < 0) {
return -1;
return -1;
}
// Set address and port
saddr.sin_port = htons(port);
saddr.sin_addr.s_addr = inet_addr(addr);
if(saddr.sin_addr.s_addr == INADDR_NONE) {
log_error("socket_listen: address %s is invalid, using 0.0.0.0 instead", addr);
saddr.sin_addr.s_addr = htonl(INADDR_ANY);
log_error("socket_listen: address %s is invalid, using 0.0.0.0 instead",
addr);
saddr.sin_addr.s_addr = htonl(INADDR_ANY);
}
// Reuse old address if taken
......@@ -36,7 +70,8 @@ int socket_bind( int socket, const char* addr, unsigned short port )
// Bind to specified address
int res = bind(socket, (struct sockaddr *)& saddr, sizeof(saddr));
if (res == -1) {
log_error("cannot bind socket (errno %d): %s.\n", errno, strerror(errno));
log_error("cannot bind socket (errno %d): %s.\n",
errno, strerror(errno));
return -3;
}
......@@ -48,49 +83,29 @@ int socket_listen( int socket, int backlog_size )
return listen(socket, backlog_size);
}
int socket_close( int socket )
ssize_t socket_recv( int socket, void *buf, size_t len, int flags )
{
return close(socket);
return recv(socket, buf, len, flags);
}
int socket_poll_create( int events_count )
ssize_t socket_recvfrom( int socket, void *buf, size_t len, int flags,
struct sockaddr *from, socklen_t *fromlen )
{
return epoll_create(events_count);
return recvfrom(socket, buf, len, flags, from, fromlen);
}
int socket_poll_remove( int epfd, int socket )
ssize_t socket_send( int socket, const void *buf, size_t len, int flags )
{
// Compatibility with kernels < 2.6.9, require non-NULL ptr.
struct epoll_event ev;
// find socket ptr
if(epoll_ctl(epfd, EPOLL_CTL_DEL, socket, &ev) != 0) {
perror ("epoll_ctl");
return -1;
}
return 0;
return send(socket, buf, len, flags);
}
int socket_poll_add( int epfd, int socket, uint32_t events )
ssize_t socket_sendto( int socket, const void *buf, size_t len, int flags,
const struct sockaddr *to, socklen_t tolen )
{
struct epoll_event ev;
memset(&ev, 0, sizeof(struct epoll_event));
// All polled events should use non-blocking mode.
int old_flag = fcntl(socket, F_GETFL, 0);
if (fcntl(socket, F_SETFL, old_flag | O_NONBLOCK) == -1) {
log_error("error setting non-blocking mode on the socket.\n");
return -1;
}
// Register to epoll
ev.data.fd = socket;
ev.events = events;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, socket, &ev) != 0) {
log_error("failed to add socket to event set (errno %d): %s.\n", errno, strerror(errno));
return -1;
}
return sendto(socket, buf, len, flags, to, tolen);
}
return 0;
int socket_close( int socket )
{
return close(socket);
}
/*!
* \file socket.h
* \date 1.11.2010
* \author Marek Vavrusa <marek.vavrusa@nic.cz>
* \group Server
*
* \brief Generic sockets APIs.
*
* This file provides platform-independent sockets.
* Functions work on sockets created via system socket(2) functions.
*
* \addtogroup network
* @{
*/
#ifndef CUTE_SOCKET_H
......@@ -22,6 +23,25 @@ enum {
SOCKET_MTU_SZ = 8192, //!< \todo <= Determine UDP MTU size.
} socket_const_t;
/*!
* \brief Create socket.
*
* \param family Socket family (PF_INET, PF_IPX, PF_PACKET, PF_UNIX).
* \param type Socket type (SOCK_STREAM, SOCK_DGRAM, SOCK_RAW).
* \return On success: >=0, on failure: <0.
*/
int socket_create( int family, int type );
/*!
* \brief Connect to remote host.
*
* \param socket Socket filedescriptor.
* \param addr Requested address.
* \param port Requested port.
* \return On success: 0, on failure: <0.
*/
int socket_connect( int socket, const char *addr, unsigned short port );
/*!
* \brief Listen on given socket.
*
......@@ -30,7 +50,7 @@ enum {
* \param port Requested port.
* \return On success: 0, on failure: <0.
*/
int socket_bind( int socket, const char* addr, unsigned short port );
int socket_bind( int socket, const char *addr, unsigned short port );
/*!
* \brief Listen on given TCP socket.
......@@ -42,44 +62,63 @@ int socket_bind( int socket, const char* addr, unsigned short port );
int socket_listen( int socket, int backlog_size );
/*!
* \brief Close and deinitialize socket.
* \brief Receive data from connection-mode socket.
*
* \param socket Socket filedescriptor.
* \param buf Destination buffer.
* \param len Maximum data length.
* \param flags Additional flags.
* \return On success: 0, on failure: <0.
*/
int socket_close( int socket );
ssize_t socket_recv( int socket, void *buf, size_t len, int flags );
/*!
* \brief Create fd for polling.
*
* \deprecated Use libevent http://monkey.org/~provos/libevent/
* \brief Receive data from datagram-mode socket.
*
* \param events_count Initial events backing store size.
* \param socket Socket filedescriptor.
* \param buf Destination buffer.
* \param len Maximum data length.
* \param flags Additional flags.
* \param from Datagram source address.
* \param fromlen Address length.
* \return On success: 0, on failure: <0.
*/
int socket_poll_create( int events_count );
ssize_t socket_recvfrom( int socket, void *buf, size_t len, int flags,
struct sockaddr *from, socklen_t *fromlen );
/*!
* \brief Add socket to poll set.
* \brief Send data to connection-mode socket.
*
* \deprecated Use libevent http://monkey.org/~provos/libevent/
*
* \param epfd Poll set filedescriptor.
* \param socket Socket waiting to be added to set.
* \param events Requested poll flags.
* \param socket Socket filedescriptor.
* \param buf Source buffer.
* \param len Data length.
* \param flags Additional flags.
* \return On success: 0, on failure: <0.
*/
int socket_poll_add( int epfd, int socket, uint32_t events );
ssize_t socket_send( int socket, const void *buf, size_t len, int flags );
/*!
* \brief Remove socket from poll set.
* \brief Send data to datagram-mode socket.
*
* \deprecated Use libevent http://monkey.org/~provos/libevent/
* \param socket Socket filedescriptor.
* \param buf Source buffer.
* \param len Data length.
* \param flags Additional flags.
* \param to Datagram source address.
* \param tolen Address length.
* \return On success: 0, on failure: <0.
*/
ssize_t socket_sendto( int socket, const void *buf, size_t len, int flags,
const struct sockaddr *to, socklen_t tolen );
/*!
* \brief Close and deinitialize socket.
*
* \param epfd Poll set filedescriptor.
* \param socket Socket waiting to be removed from set.
* \param socket Socket filedescriptor.
* \return On success: 0, on failure: <0.
*/
int socket_poll_remove( int epfd, int socket );
int socket_close( int socket );
/** @} */
#endif
#endif // CUTE_SOCKET_H
......@@ -3,6 +3,7 @@
#include <netinet/tcp.h>
#include <netinet/in.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include "tcp-handler.h"
......@@ -18,7 +19,48 @@ typedef struct tcp_worker_t {
pthread_cond_t wakeup;
} tcp_worker_t;
/*! \todo Make generic in socket.h interface. */
int tcp_epoll_create( int size )
{
return epoll_create(size);
}
int tcp_epoll_add( int epfd, int socket, uint32_t events )
{
struct epoll_event ev;
memset(&ev, 0, sizeof(struct epoll_event));
// All polled events should use non-blocking mode.
int old_flag = fcntl(socket, F_GETFL, 0);
if (fcntl(socket, F_SETFL, old_flag | O_NONBLOCK) == -1) {
log_error("error setting non-blocking mode on the socket.\n");
return -1;
}
// Register to epoll
ev.data.fd = socket;
ev.events = events;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, socket, &ev) != 0) {
log_error("failed to add socket to event set (errno %d): %s.\n", errno, strerror(errno));
return -1;
}
return 0;
}
int tcp_epoll_remove( int epfd, int socket )
{
// Compatibility with kernels < 2.6.9, require non-NULL ptr.
struct epoll_event ev;
// find socket ptr
if(epoll_ctl(epfd, EPOLL_CTL_DEL, socket, &ev) != 0) {
perror ("epoll_ctl");
return -1;
}
return 0;
}
static int tcp_reserve_bs(tcp_worker_t* worker, uint size)
{
if(worker->events_size >= size)
......@@ -43,7 +85,7 @@ tcp_worker_t* tcp_worker_create(cute_server* server)
return NULL;
// Create epoll
worker->epfd = socket_poll_create(1);
worker->epfd = tcp_epoll_create(1);
if (worker->epfd == -1) {
free(worker);
return NULL;
......@@ -142,7 +184,7 @@ void *tcp_master( void *obj )
tcp_worker_t* tcp_worker = tcp_workers[worker_id];
pthread_mutex_lock(&tcp_worker->mutex);
debug_net("tcp_master: accept: assigned socket %d to worker #%d\n", incoming, worker_id);
if(socket_poll_add(tcp_worker->epfd, incoming, EPOLLIN) == 0)
if(tcp_epoll_add(tcp_worker->epfd, incoming, EPOLLIN) == 0)
++tcp_worker->events_count;
// Run worker
......@@ -193,14 +235,14 @@ static inline void tcp_handler(int fd, uint8_t* inbuf, int inbuf_sz, uint8_t* ou
{
// Receive size
unsigned short pktsize = 0;
int n = recv(fd, &pktsize, sizeof(unsigned short), 0);
int n = socket_recv(fd, &pktsize, sizeof(unsigned short), 0);
pktsize = ntohs(pktsize);
debug_net("tcp: incoming packet size on %d: %u buffer size: %u\n", fd, (unsigned) pktsize, (unsigned) inbuf_sz);
// Receive payload
if(n > 0 && pktsize > 0) {
if(pktsize <= inbuf_sz)
n = recv(fd, inbuf, pktsize, 0); /// \todo Check buffer overflow.
n = socket_recv(fd, inbuf, pktsize, 0); /// \todo Check buffer overflow.
else
n = 0;
}
......@@ -223,7 +265,7 @@ static inline void tcp_handler(int fd, uint8_t* inbuf, int inbuf_sz, uint8_t* ou
((unsigned short*) outbuf)[0] = htons(answer_size);
int sent = -1;
while(sent < 0) {
sent = send(fd, outbuf, answer_size + sizeof(unsigned short), 0);
sent = socket_send(fd, outbuf, answer_size + sizeof(unsigned short), 0);
}
// Uncork
......@@ -272,9 +314,9 @@ void *tcp_worker( void *obj )
// Disconnect
debug_net("tcp: disconnected: %d\n", fd);
pthread_mutex_lock(&worker->mutex);
socket_poll_remove(worker->epfd, fd);
tcp_epoll_remove(worker->epfd, fd);
--worker->events_count;
close(fd);
socket_close(fd);
pthread_mutex_unlock(&worker->mutex);
}
}
......
......@@ -30,7 +30,7 @@ static inline void udp_epoll_handler(sm_event *ev)
// Receive data
// \todo Global I/O lock means ~ 8% overhead; recvfrom() should be thread-safe
n = recvfrom(ev->fd, ev->inbuf, ev->size_in, MSG_DONTWAIT, (struct sockaddr *)&faddr, (socklen_t *)&addrsize);
n = socket_recvfrom(ev->fd, ev->inbuf, ev->size_in, MSG_DONTWAIT, (struct sockaddr *)&faddr, (socklen_t *)&addrsize);
//char _str[INET_ADDRSTRLEN];
//inet_ntop(AF_INET, &(faddr.sin_addr), _str, INET_ADDRSTRLEN);
//fprintf(stderr, "recvfrom() in %p: received %d bytes from %s:%d.\n", (void*)pthread_self(), n, _str, faddr.sin_port);
......@@ -60,7 +60,7 @@ static inline void udp_epoll_handler(sm_event *ev)
debug_sm_hex(answer, answer_size);
for(;;) {
res = sendto(ev->fd, ev->outbuf, answer_size, MSG_DONTWAIT,
res = socket_sendto(ev->fd, ev->outbuf, answer_size, MSG_DONTWAIT,
(struct sockaddr *) &faddr,
(socklen_t) addrsize);
......
......@@ -27,7 +27,7 @@ void* udp_worker( void* obj )
while(n >= 0) {
// Receive data
n = recvfrom(sock, inbuf, SOCKET_MTU_SZ, 0, (struct sockaddr *)&faddr, (socklen_t *)&addrsize);
n = socket_recvfrom(sock, inbuf, SOCKET_MTU_SZ, 0, (struct sockaddr *)&faddr, (socklen_t *)&addrsize);
// Error and interrupt handling
//fprintf(stderr, "recvfrom(): thread %p ret %d errno %s.\n", (void*)pthread_self(), n, strerror(errno));
......@@ -56,7 +56,7 @@ void* udp_worker( void* obj )
debug_net_hex((const char*) outbuf, answer_size);
for(;;) {
res = sendto(sock, outbuf, answer_size, 0,
res = socket_sendto(sock, outbuf, answer_size, 0,
(struct sockaddr *) &faddr,
(socklen_t) addrsize);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment