Commit 3060ee6d authored by Vladimír Čunát's avatar Vladimír Čunát

Merge !45: various nitpicks

Assortment of unimportant changes that I created during trying to understand how the whole resolution process works. Best read each commit separately.
parents 1a568228 4179a5ca
......@@ -86,7 +86,7 @@ Scaling out
The server can clone itself into multiple processes upon startup, this enables you to scale it on multiple cores.
Multiple processes can serve different addresses, but still share the same working directory and cache.
You can add start and stop processes on runtime based on the load.
You can add, start and stop processes during runtime based on the load.
.. code-block:: bash
......@@ -102,7 +102,7 @@ You can add start and stop processes on runtime based on the load.
bash(3533)─┬─kresd(19212)─┬─kresd(19212)
│ ├─kresd(19212)
│ └─kresd(19212)
└─pstree(19460)
└─pstree(19460)
.. _daemon-reuseport:
......
......@@ -259,7 +259,6 @@ static int fork_workers(fd_array_t *ipc_set, int forks)
array_clear(*ipc_set);
array_push(*ipc_set, sv[0]);
close(sv[1]);
kr_crypto_reinit();
return forks;
/* Parent process */
} else {
......@@ -291,39 +290,6 @@ static void help(int argc, char *argv[])
" [rundir] Path to the working directory (default: .)\n");
}
static struct worker_ctx *init_worker(struct engine *engine, knot_mm_t *pool, int worker_id, int worker_count)
{
/* Load bindings */
engine_lualib(engine, "modules", lib_modules);
engine_lualib(engine, "net", lib_net);
engine_lualib(engine, "cache", lib_cache);
engine_lualib(engine, "event", lib_event);
engine_lualib(engine, "worker", lib_worker);
/* Create main worker. */
struct worker_ctx *worker = mm_alloc(pool, sizeof(*worker));
if(!worker) {
return NULL;
}
memset(worker, 0, sizeof(*worker));
worker->id = worker_id;
worker->count = worker_count;
worker->engine = engine;
worker_reserve(worker, MP_FREELIST_SIZE);
/* Register worker in Lua thread */
lua_pushlightuserdata(engine->L, worker);
lua_setglobal(engine->L, "__worker");
lua_getglobal(engine->L, "worker");
lua_pushnumber(engine->L, worker_id);
lua_setfield(engine->L, -2, "id");
lua_pushnumber(engine->L, getpid());
lua_setfield(engine->L, -2, "pid");
lua_pushnumber(engine->L, worker_count);
lua_setfield(engine->L, -2, "count");
lua_pop(engine->L, 1);
return worker;
}
static int run_worker(uv_loop_t *loop, struct engine *engine, fd_array_t *ipc_set, bool leader, int control_fd)
{
/* Control sockets or TTY */
......@@ -436,14 +402,18 @@ int main(int argc, char **argv)
case 'f':
g_interactive = false;
forks = atoi(optarg);
if (forks == 0) {
kr_log_error("[system] error '-f' requires number, not '%s'\n", optarg);
if (forks <= 0) {
kr_log_error("[system] error '-f' requires a positive"
" number, not '%s'\n", optarg);
return EXIT_FAILURE;
}
break;
case 'k':
keyfile_buf = malloc(PATH_MAX);
assert(keyfile_buf);
if (!keyfile_buf) {
kr_log_error("[system] not enough memory\n");
return EXIT_FAILURE;
}
/* Check if the path is absolute */
if (optarg[0] == '/') {
keyfile = strdup(optarg);
......@@ -464,7 +434,8 @@ int main(int argc, char **argv)
}
free(keyfile_buf);
if (!keyfile) {
kr_log_error("[system] keyfile '%s': not writeable\n", optarg);
kr_log_error("[system] keyfile '%s':"
"failed to construct absolute path\n", optarg);
return EXIT_FAILURE;
}
break;
......@@ -541,8 +512,6 @@ int main(int argc, char **argv)
}
#endif
kr_crypto_init();
/* Connect forks with local socket */
fd_array_t ipc_set;
array_init(ipc_set);
......@@ -552,6 +521,8 @@ int main(int argc, char **argv)
return EXIT_FAILURE;
}
kr_crypto_init();
/* Create a server engine. */
knot_mm_t pool = {
.ctx = mp_new (4096),
......@@ -564,7 +535,7 @@ int main(int argc, char **argv)
return EXIT_FAILURE;
}
/* Create worker */
struct worker_ctx *worker = init_worker(&engine, &pool, fork_id, forks);
struct worker_ctx *worker = worker_create(&engine, &pool, fork_id, forks);
if (!worker) {
kr_log_error("[system] not enough memory\n");
return EXIT_FAILURE;
......@@ -639,10 +610,10 @@ int main(int argc, char **argv)
/* Run the event loop */
ret = run_worker(loop, &engine, &ipc_set, fork_id == 0, control_fd);
}
}
if (ret != 0) {
perror("[system] worker failed");
ret = EXIT_FAILURE;
if (ret != 0) {
perror("[system] worker failed");
ret = EXIT_FAILURE;
}
}
/* Cleanup. */
engine_deinit(&engine);
......
......@@ -25,9 +25,12 @@
#include <malloc.h>
#endif
#include <assert.h>
#include <sys/types.h>
#include <unistd.h>
#include "lib/utils.h"
#include "lib/layer.h"
#include "daemon/worker.h"
#include "daemon/bindings.h"
#include "daemon/engine.h"
#include "daemon/io.h"
#include "daemon/tls.h"
......@@ -1022,7 +1025,8 @@ int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query, unsigned option
return qr_task_step(task, NULL, query);
}
int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen)
/** Reserve worker buffers */
static int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen)
{
array_init(worker->pool_mp);
array_init(worker->pool_ioreq);
......@@ -1057,4 +1061,38 @@ void worker_reclaim(struct worker_ctx *worker)
map_clear(&worker->outgoing);
}
struct worker_ctx *worker_create(struct engine *engine, knot_mm_t *pool,
int worker_id, int worker_count)
{
/* Load bindings */
engine_lualib(engine, "modules", lib_modules);
engine_lualib(engine, "net", lib_net);
engine_lualib(engine, "cache", lib_cache);
engine_lualib(engine, "event", lib_event);
engine_lualib(engine, "worker", lib_worker);
/* Create main worker. */
struct worker_ctx *worker = mm_alloc(pool, sizeof(*worker));
if (!worker) {
return NULL;
}
memset(worker, 0, sizeof(*worker));
worker->id = worker_id;
worker->count = worker_count;
worker->engine = engine;
worker_reserve(worker, MP_FREELIST_SIZE);
/* Register worker in Lua thread */
lua_pushlightuserdata(engine->L, worker);
lua_setglobal(engine->L, "__worker");
lua_getglobal(engine->L, "worker");
lua_pushnumber(engine->L, worker_id);
lua_setfield(engine->L, -2, "id");
lua_pushnumber(engine->L, getpid());
lua_setfield(engine->L, -2, "pid");
lua_pushnumber(engine->L, worker_count);
lua_setfield(engine->L, -2, "count");
lua_pop(engine->L, 1);
return worker;
}
#undef DEBUG_MSG
......@@ -20,15 +20,58 @@
#include "lib/generic/array.h"
#include "lib/generic/map.h"
/** @internal Number of request within timeout window. */
#define MAX_PENDING KR_NSREP_MAXADDR
/** @cond internal Freelist of available mempools. */
typedef array_t(void *) mp_freelist_t;
/** Worker state (opaque). */
struct worker_ctx;
/** Worker callback */
typedef void (*worker_cb_t)(struct worker_ctx *worker, struct kr_request *req, void *baton);
/** Create and initialize the worker. */
struct worker_ctx *worker_create(struct engine *engine, knot_mm_t *pool,
int worker_id, int worker_count);
/**
* Process incoming packet (query or answer to subrequest).
* @return 0 or an error code
*/
int worker_submit(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query,
const struct sockaddr* addr);
/**
* Process incoming DNS/TCP message fragment(s).
* If the fragment contains only a partial message, it is buffered.
* If the fragment contains a complete query or completes current fragment, execute it.
* @return 0 or an error code
*/
int worker_process_tcp(struct worker_ctx *worker, uv_stream_t *handle,
const uint8_t *msg, ssize_t len);
/**
* Query resolution worker.
* End current DNS/TCP session, this disassociates pending tasks from this session
* which may be freely closed afterwards.
*/
int worker_end_tcp(struct worker_ctx *worker, uv_handle_t *handle);
/**
* Schedule query for resolution.
* @return 0 or an error code
*/
int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query, unsigned options,
worker_cb_t on_complete, void *baton);
/** Collect worker mempools */
void worker_reclaim(struct worker_ctx *worker);
/** @cond internal */
/** Number of request within timeout window. */
#define MAX_PENDING KR_NSREP_MAXADDR
/** Freelist of available mempools. */
typedef array_t(void *) mp_freelist_t;
/** \details Worker state is meant to persist during the whole life of daemon. */
struct worker_ctx {
struct engine *engine;
uv_loop_t *loop;
......@@ -57,10 +100,7 @@ struct worker_ctx {
knot_mm_t pkt_pool;
};
/* Worker callback */
typedef void (*worker_cb_t)(struct worker_ctx *worker, struct kr_request *req, void *baton);
/** @internal Query resolution task. */
/** Query resolution task. */
struct qr_task
{
struct kr_request req;
......@@ -94,40 +134,6 @@ struct qr_task
bool finished : 1;
bool leading : 1;
};
/* @endcond */
/**
* Process incoming packet (query or answer to subrequest).
* @return 0 or an error code
*/
int worker_submit(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query, const struct sockaddr* addr);
/**
* Process incoming DNS/TCP message fragment(s).
* If the fragment contains only a partial message, it is buffered.
* If the fragment contains a complete query or completes current fragment, execute it.
* @return 0 or an error code
*/
int worker_process_tcp(struct worker_ctx *worker, uv_stream_t *handle, const uint8_t *msg, ssize_t len);
/**
* End current DNS/TCP session, this disassociates pending tasks from this session
* which may be freely closed afterwards.
*/
int worker_end_tcp(struct worker_ctx *worker, uv_handle_t *handle);
/**
* Schedule query for resolution.
* @return 0 or an error code
*/
int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query, unsigned options, worker_cb_t on_complete, void *baton);
/** Reserve worker buffers */
int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen);
/** Collect worker mempools */
void worker_reclaim(struct worker_ctx *worker);
/** @endcond */
struct qr_task;
typedef int (*qr_task_send_cb)(struct qr_task *task, uv_handle_t *handle, int status);
......@@ -111,7 +111,7 @@ This is only passive processing of the incoming answer. If you want to change th
if (can_satisfy(cur)) {
/* This flag makes the resolver move the query
* to the "resolved" list. */
query->flags |= QUERY_RESOLVED;
cur->flags |= QUERY_RESOLVED;
return KNOT_STATE_DONE;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment