Commit 8cecbf0d authored by Vladimír Čunát's avatar Vladimír Čunát

simplify approach to bind()

The complication is that we need to work with addresses and
just file-descriptors passed from some parent process.
The former approach lead to logical duplication of some steps;
now we add a step converting addresses to file-descriptors.
Thanks to that we always do bind() without touching libuv,
so the problem with forking disappears :-)
parent 19c43771
......@@ -104,40 +104,48 @@ void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
mp_flush(worker->pkt_pool.ctx);
}
static int udp_bind_finalize(uv_handle_t *handle)
int io_bind(const struct sockaddr *addr, int type)
{
check_bufsize(handle);
/* Handle is already created, just create context. */
struct session *s = session_new(handle, false);
assert(s);
session_flags(s)->outgoing = false;
return io_start_read(handle);
}
const int fd = socket(addr->sa_family, type, 0);
if (fd < 0) return kr_error(errno);
int yes = 1;
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)))
return kr_error(errno);
#ifdef SO_REUSEPORT
if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes)))
return kr_error(errno);
#endif
#ifdef IPV6_V6ONLY
if (addr->sa_family == AF_INET6
&& setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &yes, sizeof(yes)))
return kr_error(errno);
#endif
int udp_bind(uv_udp_t *handle, const struct sockaddr *addr)
{
unsigned flags = UV_UDP_REUSEADDR;
if (addr->sa_family == AF_INET6) {
flags |= UV_UDP_IPV6ONLY;
}
int ret = uv_udp_bind(handle, addr, flags);
if (ret != 0) {
return ret;
}
return udp_bind_finalize((uv_handle_t *)handle);
if (bind(fd, addr, kr_sockaddr_len(addr)))
return kr_error(errno);
return fd;
}
int udp_bindfd(uv_udp_t *handle, int fd)
int io_listen_udp(uv_loop_t *loop, uv_udp_t *handle, int fd)
{
if (!handle) {
return kr_error(EINVAL);
}
int ret = uv_udp_init(loop, handle);
if (ret) return ret;
int ret = uv_udp_open(handle, (uv_os_sock_t) fd);
if (ret != 0) {
return ret;
}
return udp_bind_finalize((uv_handle_t *)handle);
ret = uv_udp_open(handle, fd);
if (ret) return ret;
uv_handle_t *h = (uv_handle_t *)handle;
check_bufsize(h);
/* Handle is already created, just create context. */
struct session *s = session_new(h, false);
assert(s);
session_flags(s)->outgoing = false;
return io_start_read(h);
}
void tcp_timeout_trigger(uv_timer_t *timer)
......@@ -348,47 +356,24 @@ static void tls_accept(uv_stream_t *master, int status)
_tcp_accept(master, status, true);
}
static int set_tcp_option(uv_handle_t *handle, int option, int val)
int io_listen_tcp(uv_loop_t *loop, uv_tcp_t *handle, int fd, int tcp_backlog, bool has_tls)
{
uv_os_fd_t fd = 0;
if (uv_fileno(handle, &fd) == 0) {
return setsockopt(fd, IPPROTO_TCP, option, &val, sizeof(val));
const uv_connection_cb connection = has_tls ? tls_accept : tcp_accept;
if (!handle) {
return kr_error(EINVAL);
}
return 0; /* N/A */
}
int ret = uv_tcp_init(loop, handle);
if (ret) return ret;
static int tcp_bind_finalize(uv_handle_t *handle)
{
/* TCP_FASTOPEN enables 1 RTT connection resumptions. */
#ifdef TCP_FASTOPEN
# ifdef __linux__
(void) set_tcp_option(handle, TCP_FASTOPEN, 16); /* Accepts queue length hint */
# else
(void) set_tcp_option(handle, TCP_FASTOPEN, 1); /* Accepts on/off */
# endif
#endif
handle->data = NULL;
return 0;
}
static int _tcp_bind(uv_tcp_t *handle, const struct sockaddr *addr,
uv_connection_cb connection, int tcp_backlog)
{
unsigned flags = 0;
if (addr->sa_family == AF_INET6) {
flags |= UV_TCP_IPV6ONLY;
}
int ret = uv_tcp_bind(handle, addr, flags);
if (ret != 0) {
return ret;
}
ret = uv_tcp_open(handle, (uv_os_sock_t) fd);
if (ret) return ret;
int val; (void)val;
/* TCP_DEFER_ACCEPT delays accepting connections until there is readable data. */
#ifdef TCP_DEFER_ACCEPT
if (set_tcp_option((uv_handle_t *)handle, TCP_DEFER_ACCEPT, KR_CONN_RTT_MAX/1000) != 0) {
kr_log_info("[ io ] tcp_bind (defer_accept): %s\n", strerror(errno));
val = KR_CONN_RTT_MAX/1000;
if (setsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &val, sizeof(val))) {
kr_log_error("[ io ] tcp_bind (defer_accept): %s\n", strerror(errno));
}
#endif
......@@ -397,45 +382,20 @@ static int _tcp_bind(uv_tcp_t *handle, const struct sockaddr *addr,
return ret;
}
return tcp_bind_finalize((uv_handle_t *)handle);
}
int tcp_bind(uv_tcp_t *handle, const struct sockaddr *addr, int tcp_backlog)
{
return _tcp_bind(handle, addr, tcp_accept, tcp_backlog);
}
int tcp_bind_tls(uv_tcp_t *handle, const struct sockaddr *addr, int tcp_backlog)
{
return _tcp_bind(handle, addr, tls_accept, tcp_backlog);
}
static int _tcp_bindfd(uv_tcp_t *handle, int fd, uv_connection_cb connection, int tcp_backlog)
{
if (!handle) {
return kr_error(EINVAL);
}
int ret = uv_tcp_open(handle, (uv_os_sock_t) fd);
if (ret != 0) {
return ret;
}
ret = uv_listen((uv_stream_t *)handle, tcp_backlog, connection);
if (ret != 0) {
return ret;
/* TCP_FASTOPEN enables 1 RTT connection resumptions. */
#ifdef TCP_FASTOPEN
#ifdef __linux__
val = 16; /* Accepts queue length hint */
#else
val = 1; /* Accepts on/off */
#endif
if (setsockopt(fd, IPPROTO_TCP, TCP_FASTOPEN, &val, sizeof(val))) {
kr_log_error("[ io ] tcp_bind (fastopen): %s\n", strerror(errno));
}
return tcp_bind_finalize((uv_handle_t *)handle);
}
int tcp_bindfd(uv_tcp_t *handle, int fd, int tcp_backlog)
{
return _tcp_bindfd(handle, fd, tcp_accept, tcp_backlog);
}
#endif
int tcp_bindfd_tls(uv_tcp_t *handle, int fd, int tcp_backlog)
{
return _tcp_bindfd(handle, fd, tls_accept, tcp_backlog);
handle->data = NULL;
return 0;
}
int io_create(uv_loop_t *loop, uv_handle_t *handle, int type, unsigned family, bool has_tls)
......
......@@ -25,12 +25,13 @@
struct tls_ctx_t;
struct tls_client_ctx_t;
int udp_bind(uv_udp_t *handle, const struct sockaddr *addr);
int udp_bindfd(uv_udp_t *handle, int fd);
int tcp_bind(uv_tcp_t *handle, const struct sockaddr *addr, int tcp_backlog);
int tcp_bind_tls(uv_tcp_t *handle, const struct sockaddr *addr, int tcp_backlog);
int tcp_bindfd(uv_tcp_t *handle, int fd, int tcp_backlog);
int tcp_bindfd_tls(uv_tcp_t *handle, int fd, int tcp_backlog);
/** Bind address into a file-descriptor (only, no libuv). type is e.g. SOCK_DGRAM */
int io_bind(const struct sockaddr *addr, int type);
/** Initialize a UDP handle and start listening. */
int io_listen_udp(uv_loop_t *loop, uv_udp_t *handle, int fd);
/** Initialize a TCP handle and start listening. */
int io_listen_tcp(uv_loop_t *loop, uv_tcp_t *handle, int fd, int tcp_backlog, bool has_tls);
void tcp_timeout_trigger(uv_timer_t *timer);
/** Initialize the handle, incl. ->data = struct session * instance.
......
......@@ -38,17 +38,13 @@
#include "lib/defines.h"
#include "lib/resolve.h"
#include "lib/dnssec.h"
#include "daemon/io.h"
#include "daemon/network.h"
#include "daemon/worker.h"
#include "daemon/engine.h"
#include "daemon/tls.h"
#include "lib/dnssec/ta.h"
/* We can fork early on Linux 3.9+ and do SO_REUSEPORT for better performance. */
#if defined(UV_VERSION_HEX) && defined(SO_REUSEPORT) && defined(__linux__)
#define CAN_FORK_EARLY 1
#endif
/* @internal Array of ip address shorthand. */
typedef array_t(char*) addr_array_t;
......@@ -564,52 +560,68 @@ static int parse_args(int argc, char **argv, struct args *args)
return -1;
}
static int bind_fds(struct network *net, fd_array_t *fd_set, bool tls) {
int ret = 0;
for (size_t i = 0; i < fd_set->len; ++i) {
ret = network_listen_fd(net, fd_set->at[i], tls);
if (ret != 0) {
kr_log_error("[system] %slisten on fd=%d %s\n",
tls ? "TLS " : "", fd_set->at[i], kr_strerror(ret));
break;
}
}
return ret;
}
static int bind_sockets(struct network *net, addr_array_t *addr_set, bool tls) {
endpoint_flags_t flags = { .tls = tls };
for (size_t i = 0; i < addr_set->len; ++i) {
/** Just convert addresses to file-descriptors.
* @return zero or exit code for main()
*/
static int bind_sockets(addr_array_t *addrs, bool tls, fd_array_t *fds)
{
for (size_t i = 0; i < addrs->len; ++i) {
uint16_t port = tls ? KR_DNS_TLS_PORT : KR_DNS_PORT;
char addr_str[INET6_ADDRSTRLEN + 1];
int ret = kr_straddr_split(addr_set->at[i], addr_str, &port);
int ret = kr_straddr_split(addrs->at[i], addr_str, &port);
struct sockaddr *sa = NULL;
if (ret == 0) {
sa = kr_straddr_socket(addr_str, port, NULL);
if (!sa) ret = kr_error(EINVAL); /* could be ENOMEM but unlikely */
}
if (ret == 0 && !tls) {
flags.sock_type = SOCK_DGRAM;
ret = network_listen(net, addr_str, port, flags);
const int fd = io_bind(sa, SOCK_DGRAM);
if (fd < 0)
ret = fd;
else if (array_push(*fds, fd) < 0)
ret = kr_error(ENOMEM);
}
if (ret == 0) { /* common for TCP and TLS */
flags.sock_type = SOCK_STREAM;
ret = network_listen(net, addr_str, port, flags);
const int fd = io_bind(sa, SOCK_STREAM);
if (fd < 0)
ret = fd;
else if (array_push(*fds, fd) < 0)
ret = kr_error(ENOMEM);
}
free(sa);
if (ret != 0) {
kr_log_error("[system] bind to '%s' %s%s\n",
addr_set->at[i], tls ? "(TLS) " : "", kr_strerror(ret));
return ret;
addrs->at[i], tls ? "(TLS) " : "", kr_strerror(ret));
return EXIT_FAILURE;
}
}
return kr_ok();
}
static int bind_fds(struct network *net, fd_array_t *fd_set, bool tls) {
int ret = 0;
for (size_t i = 0; i < fd_set->len; ++i) {
ret = network_listen_fd(net, fd_set->at[i], tls);
if (ret != 0) {
kr_log_error("[system] %slisten on fd=%d %s\n",
tls ? "TLS " : "", fd_set->at[i], kr_strerror(ret));
break;
}
}
return ret;
}
int main(int argc, char **argv)
{
int ret = 0;
struct args args;
args_init(&args);
if ((ret = parse_args(argc, argv, &args)) >= 0) {
return ret;
}
int ret = parse_args(argc, argv, &args);
if (ret >= 0) goto cleanup_args;
ret = bind_sockets(&args.addr_set, false, &args.fd_set);
if (ret) goto cleanup_args;
ret = bind_sockets(&args.tls_set, true, &args.tls_fd_set);
if (ret) goto cleanup_args;
#ifdef HAS_SYSTEMD
/* Accept passed sockets from systemd supervisor. */
......@@ -658,17 +670,6 @@ int main(int argc, char **argv)
args.config = "config";
}
#ifndef CAN_FORK_EARLY
/* Forking is currently broken with libuv. We need libuv to bind to
* sockets etc. before forking, but at the same time can't touch it before
* forking otherwise it crashes, so it's a chicken and egg problem.
* Disabling until https://github.com/libuv/libuv/pull/846 is done. */
if (args.forks > 1 && args.fd_set.len == 0 && args.tls_fd_set.len == 0) {
kr_log_error("[system] forking >1 workers supported only on Linux 3.9+ or with supervisor\n");
return EXIT_FAILURE;
}
#endif
/* Connect forks with local socket */
fd_array_t ipc_set;
array_init(ipc_set);
......@@ -730,9 +731,7 @@ int main(int argc, char **argv)
/* Bind to passed fds and sockets*/
if (bind_fds(&engine.net, &args.fd_set, false) != 0 ||
bind_fds(&engine.net, &args.tls_fd_set, true) != 0 ||
bind_sockets(&engine.net, &args.addr_set, false) != 0 ||
bind_sockets(&engine.net, &args.tls_set, true) != 0
bind_fds(&engine.net, &args.tls_fd_set, true) != 0
) {
ret = EXIT_FAILURE;
goto cleanup;
......@@ -771,8 +770,11 @@ int main(int argc, char **argv)
uv_loop_close(loop);
}
mp_delete(pool.ctx);
cleanup_args:
array_clear(args.addr_set);
array_clear(args.tls_set);
array_clear(args.fd_set);
array_clear(args.tls_fd_set);
kr_crypto_cleanup();
return ret;
}
......@@ -21,31 +21,6 @@
#include "daemon/io.h"
#include "daemon/tls.h"
/* libuv 1.7.0+ is able to support SO_REUSEPORT for loadbalancing */
#if defined(UV_VERSION_HEX)
#if (__linux__ && SO_REUSEPORT)
#define handle_init(type, loop, handle, family) do { \
uv_ ## type ## _init_ex((loop), (uv_ ## type ## _t *)(handle), (family)); \
uv_os_fd_t hi_fd = 0; \
if (uv_fileno((handle), &hi_fd) == 0) { \
int hi_on = 1; \
int hi_ret = setsockopt(hi_fd, SOL_SOCKET, SO_REUSEPORT, &hi_on, sizeof(hi_on)); \
if (hi_ret) { \
return kr_error(errno); \
} \
} \
} while (0)
/* libuv 1.7.0+ is able to assign fd immediately */
#else
#define handle_init(type, loop, handle, family) do { \
uv_ ## type ## _init_ex((loop), (uv_ ## type ## _t *)(handle), (family)); \
} while (0)
#endif
#else
#define handle_init(type, loop, handle, family) \
uv_ ## type ## _init((loop), (uv_ ## type ## _t *)(handle))
#endif
void network_init(struct network *net, uv_loop_t *loop, int tcp_backlog)
{
if (net != NULL) {
......@@ -143,52 +118,31 @@ static int open_endpoint(struct network *net, struct endpoint *ep,
return kr_error(EEXIST);
}
if (sa) {
fd = io_bind(sa, ep->flags.sock_type);
if (fd < 0) return fd;
}
if (ep->flags.sock_type == SOCK_DGRAM) {
if (ep->flags.tls) {
assert(!EINVAL);
return kr_error(EINVAL);
}
uv_udp_t *ep_handle = calloc(1, sizeof(uv_udp_t));
uv_udp_t *ep_handle = malloc(sizeof(uv_udp_t));
ep->handle = (uv_handle_t *)ep_handle;
if (!ep->handle) {
return kr_error(ENOMEM);
}
if (sa) {
handle_init(udp, net->loop, ep->handle, sa->sa_family);
/*^^ can return! */
return udp_bind(ep_handle, sa);
} else {
int ret = uv_udp_init(net->loop, ep_handle);
if (ret == 0) {
ret = udp_bindfd(ep_handle, fd);
}
return ret;
}
return io_listen_udp(net->loop, ep_handle, fd);
} /* else */
if (ep->flags.sock_type == SOCK_STREAM) {
uv_tcp_t *ep_handle = calloc(1, sizeof(uv_tcp_t));
uv_tcp_t *ep_handle = malloc(sizeof(uv_tcp_t));
ep->handle = (uv_handle_t *)ep_handle;
if (!ep->handle) {
return kr_error(ENOMEM);
}
if (sa) {
handle_init(tcp, net->loop, ep->handle, sa->sa_family); /* can return! */
} else {
int ret = uv_tcp_init(net->loop, ep_handle);
if (ret) {
return ret;
}
}
if (ep->flags.tls) {
return sa
? tcp_bind_tls (ep_handle, sa, net->tcp_backlog)
: tcp_bindfd_tls(ep_handle, fd, net->tcp_backlog);
} else {
return sa
? tcp_bind (ep_handle, sa, net->tcp_backlog)
: tcp_bindfd(ep_handle, fd, net->tcp_backlog);
}
return io_listen_tcp(net->loop, ep_handle, fd, net->tcp_backlog, ep->flags.tls);
} /* else */
assert(!EINVAL);
......
......@@ -477,6 +477,7 @@ struct sockaddr * kr_straddr_socket(const char *addr, int port, knot_mm_t *pool)
}
}
default:
assert(!EINVAL);
return NULL;
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment