Commit 9fcc226c authored by Tomas Hlavacek's avatar Tomas Hlavacek Committed by Jan Včelák

Add support for binding multiple UDP sockets

Add support for binding multiple UDP sockets with the SO_REUSEPORT
flag in order to overcome the performance hit caused by sharing one
socket and waiting on a single lock in the kernel.
parent 6645878c
......@@ -213,6 +213,10 @@ AC_ARG_ENABLE([systemd],
AS_HELP_STRING([--enable-systemd=auto|yes|no], [enable systemd integration [default=auto]]),
[enable_systemd="$enableval"], [enable_systemd=auto])
dnl Command-line switch --enable-reuseport=auto|yes|no. The value given by
dnl the user is stored in enable_reuseport; when the switch is absent the
dnl variable is set to "auto".
AC_ARG_ENABLE([reuseport],
AS_HELP_STRING([--enable-reuseport=auto|yes|no], [enable modern Linux networking with SO_REUSEPORT [default=auto]]),
[enable_reuseport="$enableval"], [enable_reuseport=auto])
AS_IF([test "$enable_daemon" = "yes"],[
AS_IF([test "$enable_systemd" != "no"],[
......@@ -226,6 +230,29 @@ AS_IF([test "$enable_systemd" != "no"],[
AS_IF([test "$enable_systemd" = "yes"],[
AC_DEFINE([ENABLE_SYSTEMD], [1], [Use systemd integration.])])
dnl Decide whether to build with SO_REUSEPORT support.
dnl   auto  - compile a small probe that references SO_REUSEPORT and enable
dnl           the feature only when the probe compiles
dnl   yes   - enable the feature without probing
dnl   no    - skip this block entirely
dnl   other - abort configure with an error
dnl ENABLE_REUSEPORT is defined through AC_DEFINE whenever the feature ends
dnl up enabled, and enable_reuseport is rewritten to yes or no in auto mode
dnl so that later code sees the final decision.
AS_IF([test "$enable_reuseport" != "no"],[
  AS_CASE([$enable_reuseport],
    [auto], [
      AC_MSG_CHECKING([for socket REUSEPORT])
      AC_TRY_COMPILE(
        [
#include <sys/socket.h>
        ], [
          int val = 1;
          setsockopt(0, SOL_SOCKET, SO_REUSEPORT, &val, sizeof(val));
        ], [
          enable_reuseport="yes"
          AC_DEFINE([ENABLE_REUSEPORT], [1], [Use modern networking.])
          AC_MSG_RESULT([yes])
        ], [
          enable_reuseport="no"
          AC_MSG_RESULT([no])
        ])
    ],
    [yes],[AC_DEFINE([ENABLE_REUSEPORT], [1], [Use modern networking.])],
    [*],[AC_MSG_ERROR([Invalid value of --enable-reuseport.])])
])
])
dnl Check for userspace-rcu library
......@@ -479,6 +506,7 @@ AC_MSG_RESULT([
GnuTLS: ${gnutls_LIBS} ${gnutls_CFLAGS}
Jansson: ${jansson_LIBS} ${jansson_CFLAGS}
LMDB: ${enable_lmdb} ${lmdb_LIBS} ${lmdb_CFLAGS}
SO_REUSEPORT: ${enable_reuseport}
Prefix: ${prefix}
Run dir: ${run_dir}
......
......@@ -78,13 +78,21 @@ static int evsched_run(dthread_t *thread)
static void server_remove_iface(iface_t *iface)
{
/* Free UDP handler. */
if (iface->fd[IO_UDP] > -1) {
close(iface->fd[IO_UDP]);
#ifdef ENABLE_REUSEPORT
for (int i = 0; i < iface->fd_udp_count; i++) {
if (iface->fd_udp[i] > -1) {
close(iface->fd_udp[i]);
}
}
#else
if (iface->fd_udp[0] > -1) {
close(iface->fd_udp[0]);
}
#endif
/* Free TCP handler. */
if (iface->fd[IO_TCP] > -1) {
close(iface->fd[IO_TCP]);
if (iface->fd_tcp > -1) {
close(iface->fd_tcp);
}
/* Free interface. */
......@@ -129,7 +137,7 @@ static bool enlarge_net_buffers(int sock, int min_recvsize, int min_sndsize)
* \retval 0 if successful (EOK).
* \retval <0 on errors (EACCES, EINVAL, ENOMEM, EADDRINUSE).
*/
static int server_init_iface(iface_t *new_if, struct sockaddr_storage *addr)
static int server_init_iface(iface_t *new_if, struct sockaddr_storage *addr, int udp_thread_count)
{
/* Initialize interface. */
int ret = 0;
......@@ -140,8 +148,41 @@ static int server_init_iface(iface_t *new_if, struct sockaddr_storage *addr)
char addr_str[SOCKADDR_STRLEN] = { 0 };
sockaddr_tostr(addr_str, sizeof(addr_str), addr);
/* Create bound UDP socket. */
new_if->fd_udp = malloc(udp_thread_count * sizeof(int));
if (!new_if->fd_udp)
return KNOT_ENOMEM;
int bind_flags = 0;
#ifdef ENABLE_REUSEPORT
for (int i = 0; i < udp_thread_count; i++ ) {
/* Create bound UDP socket. */
int sock = net_bound_socket(SOCK_DGRAM, addr, bind_flags | NET_REUSEPORT);
if (sock == KNOT_EADDRNOTAVAIL) {
bind_flags |= NET_BIND_NONLOCAL;
sock = net_bound_socket(SOCK_DGRAM, addr, bind_flags | NET_REUSEPORT);
if (sock >= 0) {
log_warning("address '%s' is not available", addr_str);
}
}
if (sock < 0) {
log_error("cannot bind address '%s' (%s)", addr_str, knot_strerror(sock));
for (int i = 0; i < new_if->fd_udp_count; i++ )
close(new_if->fd_udp[i]);
return sock;
}
if (!enlarge_net_buffers(sock, UDP_MIN_RCVSIZE, UDP_MIN_SNDSIZE)) {
log_warning("failed to set network buffer sizes for UDP");
}
/* Set UDP as non-blocking. */
fcntl(sock, F_SETFL, O_NONBLOCK);
new_if->fd_udp[new_if->fd_udp_count++] = sock;
}
#else
/* Create bound UDP socket. */
int sock = net_bound_socket(SOCK_DGRAM, addr, bind_flags);
if (sock == KNOT_EADDRNOTAVAIL) {
bind_flags |= NET_BIND_NONLOCAL;
......@@ -153,6 +194,8 @@ static int server_init_iface(iface_t *new_if, struct sockaddr_storage *addr)
if (sock < 0) {
log_error("cannot bind address '%s' (%s)", addr_str, knot_strerror(sock));
for (int i = 0; i < new_if->fd_udp_count; i++ )
close(new_if->fd_udp[i]);
return sock;
}
......@@ -163,34 +206,51 @@ static int server_init_iface(iface_t *new_if, struct sockaddr_storage *addr)
/* Set UDP as non-blocking. */
fcntl(sock, F_SETFL, O_NONBLOCK);
new_if->fd[IO_UDP] = sock;
new_if->fd_udp_count = 1;
new_if->fd_udp[0] = sock;
#endif
/* Create bound TCP socket. */
sock = net_bound_socket(SOCK_STREAM, addr, bind_flags);
if (sock < 0) {
close(new_if->fd[IO_UDP]);
return sock;
}
if (!enlarge_net_buffers(sock, TCP_MIN_RCVSIZE, TCP_MIN_SNDSIZE)) {
int tsock = net_bound_socket(SOCK_STREAM, addr, bind_flags);
if (tsock < 0) {
#ifdef ENABLE_REUSEPORT
for (int i = 0; i < new_if->fd_udp_count; i++ )
close(new_if->fd_udp[i]);
#else
close(new_if->fd_udp[0]);
#endif
return tsock;
}
if (!enlarge_net_buffers(tsock, TCP_MIN_RCVSIZE, TCP_MIN_SNDSIZE)) {
log_warning("failed to set network buffer sizes for TCP");
}
new_if->fd[IO_TCP] = sock;
new_if->fd_tcp = tsock;
/* Listen for incoming connections. */
ret = listen(sock, TCP_BACKLOG_SIZE);
ret = listen(tsock, TCP_BACKLOG_SIZE);
if (ret < 0) {
close(new_if->fd[IO_UDP]);
close(new_if->fd[IO_TCP]);
#ifdef ENABLE_REUSEPORT
for (int i = 0; i < new_if->fd_udp_count; i++)
close(new_if->fd_udp[i]);
#else
close(new_if->fd_udp[0]);
#endif
close(new_if->fd_tcp);
log_error("failed to listen on TCP interface '%s'", addr_str);
return KNOT_ERROR;
}
/* accept() must not block */
if (fcntl(sock, F_SETFL, O_NONBLOCK) < 0) {
close(new_if->fd[IO_UDP]);
close(new_if->fd[IO_TCP]);
if (fcntl(tsock, F_SETFL, O_NONBLOCK) < 0) {
#ifdef ENABLE_REUSEPORT
for (int i = 0; i < new_if->fd_udp_count; i++ )
close(new_if->fd_udp[i]);
#else
close(new_if->fd_udp[0]);
#endif
close(new_if->fd_tcp);
log_error("failed to listen on '%s' in non-blocking mode",
addr_str);
return KNOT_ERROR;
......@@ -271,7 +331,7 @@ static int reconfigure_sockets(conf_t *conf, server_t *s)
/* Create new interface. */
m = malloc(sizeof(iface_t));
if (server_init_iface(m, &addr) < 0) {
if (server_init_iface(m, &addr, s->handler[IO_UDP].unit->size) < 0) {
free(m);
m = 0;
}
......@@ -706,7 +766,7 @@ int server_update_zones(conf_t *conf, void *data)
return ret;
}
ref_t *server_set_ifaces(server_t *s, fdset_t *fds, int type)
ref_t *server_set_ifaces(server_t *s, fdset_t *fds, int type, int thread_id)
{
iface_t *i = NULL;
......@@ -714,7 +774,21 @@ ref_t *server_set_ifaces(server_t *s, fdset_t *fds, int type)
fdset_clear(fds);
if (s->ifaces) {
WALK_LIST(i, s->ifaces->l) {
fdset_add(fds, i->fd[type], POLLIN, NULL);
switch(type) {
case IO_TCP:
fdset_add(fds, i->fd_tcp, POLLIN, NULL);
break;
case IO_UDP:
#ifdef ENABLE_REUSEPORT
fdset_add(fds, i->fd_udp[thread_id %
i->fd_udp_count], POLLIN, NULL);
#else
fdset_add(fds, i->fd_udp[0], POLLIN, NULL);
#endif
break;
default:
assert(0);
}
}
}
......
......@@ -67,7 +67,9 @@ typedef enum {
*/
typedef struct iface {
/* NOTE(review): presumably the list linkage used when walking the
 * server's interface list with WALK_LIST - confirm. */
struct node n;
/* Socket pair indexed by IO_UDP / IO_TCP.
 * NOTE(review): this looks like the pre-change field that the three
 * members below replace; it is shown here only because old and new
 * lines are interleaved - confirm against the real header. */
int fd[2];
/* Heap-allocated array of bound UDP sockets; server_init_iface()
 * allocates one slot per UDP thread. */
int *fd_udp;
/* Number of sockets currently stored in fd_udp. */
int fd_udp_count;
/* Bound TCP socket; server_init_iface() calls listen() on it. */
int fd_tcp;
/* NOTE(review): presumably the address the sockets above are bound
 * to - confirm against the code that fills it in. */
struct sockaddr_storage addr;
} iface_t;
......@@ -196,6 +198,6 @@ int server_update_zones(conf_t *conf, void *data);
* \param type I/O type (UDP/TCP).
* \return new interface list
*/
ref_t *server_set_ifaces(server_t *s, fdset_t *fds, int type);
ref_t *server_set_ifaces(server_t *s, fdset_t *fds, int type, int thread_id);
/*! @} */
......@@ -357,7 +357,7 @@ int tcp_master(dthread_t *thread)
}
ref_release(ref);
ref = server_set_ifaces(handler->server, &tcp.set, IO_TCP);
ref = server_set_ifaces(handler->server, &tcp.set, IO_TCP, tcp.thread_id);
if (tcp.set.n == 0) {
break; /* Terminate on zero interfaces. */
}
......
......@@ -449,7 +449,7 @@ static void forget_ifaces(ifacelist_t *ifaces, fd_set *set, int maxfd)
}
/*! \brief Add interface sockets to the watched fdset. */
static int track_ifaces(ifacelist_t *ifaces, fd_set *set, int *maxfd, int *minfd)
static int track_ifaces(ifacelist_t *ifaces, fd_set *set, int *maxfd, int *minfd, int thrid)
{
FD_ZERO(set);
*maxfd = -1;
......@@ -461,7 +461,11 @@ static int track_ifaces(ifacelist_t *ifaces, fd_set *set, int *maxfd, int *minfd
iface_t *iface = NULL;
WALK_LIST(iface, ifaces->l) {
int fd = iface->fd[IO_UDP];
#ifdef ENABLE_REUSEPORT
int fd = iface->fd_udp[thrid];
#else
int fd = iface->fd_udp[0];
#endif
*maxfd = MAX(fd, *maxfd);
*minfd = MIN(fd, *minfd);
FD_SET(fd, set);
......@@ -524,7 +528,7 @@ int udp_master(dthread_t *thread)
rcu_read_lock();
forget_ifaces(ref, &fds, maxfd);
ref = handler->server->ifaces;
track_ifaces(ref, &fds, &maxfd, &minfd);
track_ifaces(ref, &fds, &maxfd, &minfd, udp.thread_id);
rcu_read_unlock();
}
......
......@@ -125,6 +125,13 @@ int net_bound_socket(int type, const struct sockaddr_storage *ss,
int flag = 1;
(void) setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag));
#ifdef ENABLE_REUSEPORT
/* Reuse ports for UDP server sockets in order to create one socket for each thread. */
if (flags & NET_REUSEPORT) {
(void) setsockopt(socket, SOL_SOCKET, SO_REUSEPORT, &flag, sizeof(flag));
}
#endif
/* Unlink UNIX socket if exists. */
if (ss->ss_family == AF_UNIX) {
unlink(addr_str);
......
......@@ -35,7 +35,8 @@
* \brief Network interface flags.
*/
enum net_flags {
/* Passed to net_bound_socket() by server_init_iface() on the retry after
 * a bind attempt returned KNOT_EADDRNOTAVAIL. */
NET_BIND_NONLOCAL = (1 << 0)
NET_BIND_NONLOCAL = (1 << 0),
/* Asks net_bound_socket() to set SO_REUSEPORT on the new socket; only
 * honoured when the build defines ENABLE_REUSEPORT. */
NET_REUSEPORT = (1 << 1)
};
/*!
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment