Commit d944e311 authored by Marek Vavrusa's avatar Marek Vavrusa

daemon: support running in supervised mode (--fd=X)

daemon can accept existing fds on command line,
thus supporting process managers like circus or
upstart. a tiny supervisor script is attached
parent bb0c707f
......@@ -124,6 +124,22 @@ of running processes, and you can test the process for liveliness by connecting
.. warning:: This is very basic way to orchestrate multi-core deployments and doesn't scale in multi-node clusters. Keep an eye on the prepared ``hive`` module that is going to automate everything from service discovery to deployment and consistent configuration.
Running supervised
==================
Knot Resolver can run under a supervisor to allow for graceful restarts, watchdog process and socket activation. This way the supervisor binds to sockets and lends them to resolver daemon. Thus if the resolver terminates or is killed, the sockets are still active and no queries are dropped.
The watchdog process must notify kresd about active file descriptors, and kresd will automatically determine the socket type and bound address, thus it will appear as any other address. There's a tiny supervisor script for convenience, but you should have a look at [real process managers](http://blog.crocodoc.com/post/48703468992/process-managers-the-good-the-bad-and-the-ugly).
.. code-block:: bash
$ python scripts/supervisor.py ./daemon/kresd 127.0.0.1@53
$ [system] interactive mode
> quit()
> [2016-03-28 16:06:36.795879] process finished, pid = 99342, status = 0, uptime = 0:00:01.720612
[system] interactive mode
>
Configuration
=============
......
......@@ -148,6 +148,7 @@ static void help(int argc, char *argv[])
printf("Usage: %s [parameters] [rundir]\n", argv[0]);
printf("\nParameters:\n"
" -a, --addr=[addr] Server address (default: localhost#53).\n"
" -S, --fd=[fd] Listen on given fd (handed out by supervisor).\n"
" -c, --config=[path] Config file path (relative to [rundir]) (default: config).\n"
" -k, --keyfile=[path] File containing trust anchors (DS or DNSKEY).\n"
" -f, --forks=N Start N forks sharing the configuration.\n"
......@@ -225,7 +226,9 @@ int main(int argc, char **argv)
{
int forks = 1;
array_t(char*) addr_set;
array_t(int) fd_set;
array_init(addr_set);
array_init(fd_set);
char *keyfile = NULL;
const char *config = NULL;
char *keyfile_buf = NULL;
......@@ -234,6 +237,7 @@ int main(int argc, char **argv)
int c = 0, li = 0, ret = 0;
struct option opts[] = {
{"addr", required_argument, 0, 'a'},
{"fd", required_argument, 0, 'S'},
{"config", required_argument, 0, 'c'},
{"keyfile",required_argument, 0, 'k'},
{"forks",required_argument, 0, 'f'},
......@@ -243,12 +247,15 @@ int main(int argc, char **argv)
{"help", no_argument, 0, 'h'},
{0, 0, 0, 0}
};
while ((c = getopt_long(argc, argv, "a:c:f:k:vqVh", opts, &li)) != -1) {
while ((c = getopt_long(argc, argv, "a:S:c:f:k:vqVh", opts, &li)) != -1) {
switch (c)
{
case 'a':
array_push(addr_set, optarg);
break;
case 'S':
array_push(fd_set, atoi(optarg));
break;
case 'c':
config = optarg;
break;
......@@ -372,13 +379,21 @@ int main(int argc, char **argv)
kr_log_error("[system] not enough memory\n");
return EXIT_FAILURE;
}
/* Bind to passed fds and run */
for (size_t i = 0; i < fd_set.len; ++i) {
ret = network_listen_fd(&engine.net, fd_set.at[i]);
if (ret != 0) {
kr_log_error("[system] listen on fd=%d %s\n", fd_set.at[i], kr_strerror(ret));
ret = EXIT_FAILURE;
}
}
/* Bind to sockets and run */
for (size_t i = 0; i < addr_set.len; ++i) {
int port = 53;
const char *addr = set_addr(addr_set.at[i], &port);
ret = network_listen(&engine.net, addr, (uint16_t)port, NET_UDP|NET_TCP);
if (ret != 0) {
kr_log_error("[system] bind to '%s#%d' %s\n", addr, port, knot_strerror(ret));
kr_log_error("[system] bind to '%s#%d' %s\n", addr, port, kr_strerror(ret));
ret = EXIT_FAILURE;
}
}
......
......@@ -15,6 +15,7 @@
*/
#include <unistd.h>
#include <assert.h>
#include "daemon/network.h"
#include "daemon/worker.h"
#include "daemon/io.h"
......@@ -153,6 +154,44 @@ static int open_endpoint(struct network *net, struct endpoint *ep, struct sockad
return kr_ok();
}
/** Open fd as endpoint. */
static int open_endpoint_fd(struct network *net, struct endpoint *ep, int fd, int sock_type)
{
if (sock_type == SOCK_DGRAM) {
if (ep->udp) {
return kr_error(EEXIST);
}
ep->udp = malloc(sizeof(*ep->udp));
if (!ep->udp) {
return kr_error(ENOMEM);
}
uv_udp_init(net->loop, ep->udp);
int ret = uv_udp_open(ep->udp, (uv_os_sock_t) fd);
if (ret != 0) {
close_handle((uv_handle_t *)ep->udp, false);
return ret;
}
ep->flags |= NET_UDP;
}
if (sock_type == SOCK_STREAM) {
if (ep->tcp) {
return kr_error(EEXIST);
}
ep->tcp = malloc(sizeof(*ep->tcp));
if (!ep->tcp) {
return kr_error(ENOMEM);
}
uv_tcp_init(net->loop, ep->tcp);
int ret = uv_tcp_open(ep->tcp, (uv_os_sock_t) fd);
if (ret != 0) {
close_handle((uv_handle_t *)ep->tcp, false);
return ret;
}
ep->flags |= NET_TCP;
}
return kr_ok();
}
/** @internal Fetch endpoint array and offset of the address/port query. */
static endpoint_array_t *network_get(struct network *net, const char *addr, uint16_t port, size_t *index)
{
......@@ -169,6 +208,56 @@ static endpoint_array_t *network_get(struct network *net, const char *addr, uint
return NULL;
}
int network_listen_fd(struct network *net, int fd)
{
/* Extract local address and socket type. */
int sock_type = SOCK_DGRAM;
socklen_t len = sizeof(sock_type);
int ret = getsockopt(fd, SOL_SOCKET, SO_TYPE, &sock_type, &len);
if (ret != 0) {
return kr_error(EBADF);
}
/* Extract local address for this socket. */
struct sockaddr_storage ss;
socklen_t addr_len = sizeof(ss);
ret = getsockname(fd, (struct sockaddr *)&ss, &addr_len);
if (ret != 0) {
return kr_error(EBADF);
}
int port = 0;
char addr_str[INET6_ADDRSTRLEN]; /* http://tools.ietf.org/html/rfc4291 */
if (ss.ss_family == AF_INET) {
uv_ip4_name((const struct sockaddr_in*)&ss, addr_str, sizeof(addr_str));
port = ntohs(((struct sockaddr_in *)&ss)->sin_port);
} else if (ss.ss_family == AF_INET6) {
uv_ip6_name((const struct sockaddr_in6*)&ss, addr_str, sizeof(addr_str));
port = ntohs(((struct sockaddr_in6 *)&ss)->sin6_port);
} else {
uv_ip4_name((const struct sockaddr_in*)&ss, addr_str, sizeof(addr_str));
port = ntohs(((struct sockaddr_in *)&ss)->sin_port);
return kr_error(EAFNOSUPPORT);
}
/* Fetch or create endpoint for this fd */
size_t index = 0;
endpoint_array_t *ep_array = network_get(net, addr_str, port, &index);
if (!ep_array) {
struct endpoint *ep = malloc(sizeof(*ep));
memset(ep, 0, sizeof(*ep));
ep->flags = NET_DOWN;
ep->port = port;
ret = insert_endpoint(net, addr_str, ep);
if (ret != 0) {
return ret;
}
ep_array = network_get(net, addr_str, port, &index);
}
/* Open fd in found/created endpoint. */
struct endpoint *ep = ep_array->at[index];
assert(ep != NULL);
/* Create a libuv struct for this socket. */
return open_endpoint_fd(net, ep, fd, sock_type);
}
int network_listen(struct network *net, const char *addr, uint16_t port, uint32_t flags)
{
if (net == NULL || addr == 0 || port == 0) {
......
......@@ -45,5 +45,6 @@ struct network {
void network_init(struct network *net, uv_loop_t *loop);
void network_deinit(struct network *net);
int network_listen_fd(struct network *net, int fd);
int network_listen(struct network *net, const char *addr, uint16_t port, uint32_t flags);
int network_close(struct network *net, const char *addr, uint16_t port);
#!/usr/bin/env python
#
# This is an example of simple supervisor process owning bound sockets and
# handing them over to supervised process, allowing for graceful restarts.
#
import time, datetime
import socket
import os, sys
# Help
def help():
print('Usage: %s <bin> addr@port ...' % sys.argv[0])
print('Example: python scripts/supervisor.py ./daemon/kresd 127.0.0.1')
sys.exit(1)
if len(sys.argv) < 3:
help()
# Bind to sockets
daemon = sys.argv[1]
sockets = []
for addr in sys.argv[2:]:
try:
if '@' in addr:
addr, port = addr.split('@')
port = int(port)
else:
port = 53
except: help()
# Open TCP socket
tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
tcp.bind((addr, port))
tcp.listen(5)
sockets.append(tcp)
# Open UDP socket
udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
udp.bind((addr, port))
sockets.append(udp)
while True: # Fork forever
pid = os.fork()
if pid == 0:
args = ['kresd'] + ['--fd=%d' % s.fileno() for s in sockets]
os.execv('./daemon/kresd', args)
else: # Wait for fork to die
start = datetime.datetime.now()
_, status = os.waitpid(pid, 0)
end = datetime.datetime.now()
print('[%s] process finished, pid = %d, status = %d, uptime = %s' % \
(start, pid, status, end - start))
time.sleep(0.5)
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment