Commit a550a150 authored by Vladimír Čunát's avatar Vladimír Čunát

worker: convert to a proper singleton

On many places we've been assuming that there's only a single worker,
but we still often didn't utilize the property well.  To get the pointer
we used various ways, all even untyped:
 - __worker global variable in lua
 - uv_default_loop()->data
 - kr_request::daemon_context

Now we instead simply define a global typed pointer the_worker.

Nitpick: also worker_{init,deinit}() are reordered to correspond
to the order of the fields, etc.
parent 750c76ee
......@@ -390,7 +390,7 @@ static int cache_zone_import(lua_State *L)
int ret = -1;
char msg[128];
struct worker_ctx *worker = wrk_luaget(L);
struct worker_ctx *worker = the_worker;
if (!worker) {
strncpy(msg, "internal error, empty worker pointer", sizeof(msg));
goto finish;
......
......@@ -84,13 +84,6 @@ static inline void lua_error_maybe(lua_State *L, int err)
if (err) lua_error_p(L, "%s", kr_strerror(err));
}
static inline struct worker_ctx *wrk_luaget(lua_State *L) {
lua_getglobal(L, "__worker");
struct worker_ctx *worker = lua_touserdata(L, -1);
lua_pop(L, 1);
return worker;
}
static inline int execute_callback(lua_State *L, int argc)
{
int ret = engine_pcall(L, argc);
......
......@@ -319,7 +319,7 @@ static int net_bufsize(lua_State *L)
/** Set TCP pipelining size. */
static int net_pipeline(lua_State *L)
{
struct worker_ctx *worker = wrk_luaget(L);
struct worker_ctx *worker = the_worker;
if (!worker) {
return 0;
}
......@@ -824,12 +824,11 @@ static int net_tls_sticket_secret_file(lua_State *L)
static int net_outgoing(lua_State *L, int family)
{
struct worker_ctx *worker = wrk_luaget(L);
union inaddr *addr;
if (family == AF_INET)
addr = (union inaddr*)&worker->out_addr4;
addr = (union inaddr*)&the_worker->out_addr4;
else
addr = (union inaddr*)&worker->out_addr6;
addr = (union inaddr*)&the_worker->out_addr6;
if (lua_gettop(L) == 0) { /* Return the current value. */
if (addr->ip.sa_family == AF_UNSPEC) {
......
......@@ -21,8 +21,7 @@
/** resolve_pkt(pkt, options, init_cb) */
static int wrk_resolve_pkt(lua_State *L)
{
struct worker_ctx *worker = wrk_luaget(L);
if (!worker) {
if (!the_worker) {
return 0;
}
......@@ -36,7 +35,7 @@ static int wrk_resolve_pkt(lua_State *L)
lua_error_p(L, "invalid options");
/* Create task and start with a first question */
struct qr_task *task = worker_resolve_start(worker, pkt, *options);
struct qr_task *task = worker_resolve_start(the_worker, pkt, *options);
if (!task) {
lua_error_p(L, "couldn't create a resolution request");
}
......@@ -57,7 +56,7 @@ static int wrk_resolve_pkt(lua_State *L)
/** resolve(qname, qtype, qclass, options, init_cb) */
static int wrk_resolve(lua_State *L)
{
struct worker_ctx *worker = wrk_luaget(L);
struct worker_ctx *worker = the_worker;
if (!worker) {
return 0;
}
......@@ -122,7 +121,7 @@ static inline double getseconds(uv_timeval_t *tv)
/** Return worker statistics. */
static int wrk_stats(lua_State *L)
{
struct worker_ctx *worker = wrk_luaget(L);
struct worker_ctx *worker = the_worker;
if (!worker) {
return 0;
}
......
......@@ -281,7 +281,7 @@ static void _tcp_accept(uv_stream_t *master, int status, bool tls)
return;
}
struct worker_ctx *worker = (struct worker_ctx *)master->loop->data;
struct worker_ctx *worker = the_worker;
uv_tcp_t *client = malloc(sizeof(uv_tcp_t));
if (!client) {
return;
......@@ -323,8 +323,7 @@ static void _tcp_accept(uv_stream_t *master, int status, bool tls)
return;
}
const struct engine *engine = worker->engine;
const struct network *net = &engine->net;
const struct network *net = &worker->engine->net;
uint64_t idle_in_timeout = net->tcp.in_idle_timeout;
uint64_t timeout = KR_CONN_RTT_MAX / 2;
......
......@@ -198,7 +198,6 @@ struct kr_request {
int vars_ref;
knot_mm_t pool;
unsigned int uid;
void *daemon_context;
};
enum kr_rank {KR_RANK_INITIAL, KR_RANK_OMIT, KR_RANK_TRY, KR_RANK_INDET = 4, KR_RANK_BOGUS, KR_RANK_MISMATCH, KR_RANK_MISSING, KR_RANK_INSECURE, KR_RANK_AUTH = 16, KR_RANK_SECURE = 32};
struct kr_cdb_stats {
......
......@@ -122,8 +122,7 @@ static void tty_process_input(uv_stream_t *stream, ssize_t nread, const uv_buf_t
goto finish;
}
struct engine *engine = ((struct worker_ctx *)stream->loop->data)->engine;
lua_State *L = engine->L;
lua_State *L = the_worker->engine->L;
int ret = engine_cmd(L, cmd, false);
const char *message = "";
if (lua_gettop(L) > 0) {
......@@ -764,17 +763,14 @@ int main(int argc, char **argv)
kr_log_error("[system] failed to initialize engine: %s\n", kr_strerror(ret));
return EXIT_FAILURE;
}
/* Create worker */
struct worker_ctx *worker = worker_create(&engine, &pool, fork_id, args.forks);
if (!worker) {
kr_log_error("[system] not enough memory\n");
/* Initialize the worker */
ret = worker_init(&engine, fork_id, args.forks);
if (ret != 0) {
kr_log_error("[system] failed to initialize worker: %s\n", kr_strerror(ret));
return EXIT_FAILURE;
}
uv_loop_t *loop = uv_default_loop();
worker->loop = loop;
loop->data = worker;
/* Catch some signals. */
uv_signal_t sigint, sigterm;
if (true) ret = uv_signal_init(loop, &sigint);
......@@ -840,7 +836,7 @@ int main(int argc, char **argv)
cleanup:/* Cleanup. */
engine_deinit(&engine);
worker_reclaim(worker);
worker_deinit();
if (loop != NULL) {
uv_loop_close(loop);
}
......
......@@ -51,8 +51,7 @@ static int endpoint_open_lua_cb(struct network *net, struct endpoint *ep,
return kr_error(EINVAL);
}
/* First find callback in the endpoint registry. */
struct worker_ctx *worker = net->loop->data; // LATER: the_worker
lua_State *L = worker->engine->L;
lua_State *L = the_worker->engine->L;
void **pp = trie_get_try(net->endpoint_kinds, ep->flags.kind,
strlen(ep->flags.kind));
if (!pp && net->missing_kind_is_error) {
......@@ -115,8 +114,7 @@ int network_engage_endpoints(struct network *net)
/** Notify the registered function about endpoint about to be closed. */
static void endpoint_close_lua_cb(struct network *net, struct endpoint *ep)
{
struct worker_ctx *worker = net->loop->data; // LATER: the_worker
lua_State *L = worker->engine->L;
lua_State *L = the_worker->engine->L;
void **pp = trie_get_try(net->endpoint_kinds, ep->flags.kind,
strlen(ep->flags.kind));
if (!pp && net->missing_kind_is_error) {
......@@ -215,8 +213,7 @@ void network_deinit(struct network *net)
{
if (net != NULL) {
network_close_force(net);
struct worker_ctx *worker = net->loop->data; // LATER: the_worker
trie_apply(net->endpoint_kinds, kind_unregister, worker->engine->L);
trie_apply(net->endpoint_kinds, kind_unregister, the_worker->engine->L);
trie_free(net->endpoint_kinds);
tls_credentials_free(net->tls_credentials);
......
......@@ -122,11 +122,8 @@ static struct session* worker_find_tcp_waiting(struct worker_ctx *worker,
const struct sockaddr *addr);
static void on_tcp_connect_timeout(uv_timer_t *timer);
/** @internal Get singleton worker. */
static inline struct worker_ctx *get_worker(void)
{
return uv_default_loop()->data;
}
struct worker_ctx the_worker_value; /**< Static allocation is suitable for the singleton. */
struct worker_ctx *the_worker = NULL;
/*! @internal Create a UDP/TCP handle for an outgoing AF_INET* connection.
* socktype is SOCK_* */
......@@ -294,7 +291,6 @@ static struct request_ctx *request_create(struct worker_ctx *worker,
req->pool = pool;
req->vars_ref = LUA_NOREF;
req->uid = uid;
req->daemon_context = worker;
/* Remember query source addr */
if (!addr || (addr->sa_family != AF_INET && addr->sa_family != AF_INET6)) {
......@@ -829,7 +825,8 @@ static int send_waiting(struct session *session)
static void on_connect(uv_connect_t *req, int status)
{
struct worker_ctx *worker = get_worker();
struct worker_ctx *worker = the_worker;
assert(worker);
uv_stream_t *handle = req->handle;
struct session *session = handle->data;
struct sockaddr *peer = session_get_peer(session);
......@@ -954,7 +951,8 @@ static void on_tcp_connect_timeout(uv_timer_t *timer)
struct session *session = timer->data;
uv_timer_stop(timer);
struct worker_ctx *worker = get_worker();
struct worker_ctx *worker = the_worker;
assert(worker);
assert (session_tasklist_is_empty(session));
......@@ -1939,21 +1937,22 @@ bool worker_task_finished(struct qr_task *task)
{
return task->finished;
}
/** Reserve worker buffers */
/** Reserve worker buffers. We assume worker's been zeroed. */
static int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen)
{
worker->tcp_connected = map_make(NULL);
worker->tcp_waiting = map_make(NULL);
worker->subreq_out = trie_create(NULL);
array_init(worker->pool_mp);
if (array_reserve(worker->pool_mp, ring_maxlen)) {
return kr_error(ENOMEM);
}
memset(&worker->pkt_pool, 0, sizeof(worker->pkt_pool));
worker->pkt_pool.ctx = mp_new (4 * sizeof(knot_pkt_t));
worker->pkt_pool.alloc = (knot_mm_alloc_t) mp_alloc;
worker->subreq_out = trie_create(NULL);
worker->tcp_connected = map_make(NULL);
worker->tcp_waiting = map_make(NULL);
worker->tcp_pipeline_max = MAX_PIPELINED;
memset(&worker->stats, 0, sizeof(worker->stats));
return kr_ok();
}
......@@ -1967,43 +1966,59 @@ static inline void reclaim_mp_freelist(mp_freelist_t *list)
array_clear(*list);
}
void worker_reclaim(struct worker_ctx *worker)
void worker_deinit(void)
{
reclaim_mp_freelist(&worker->pool_mp);
mp_delete(worker->pkt_pool.ctx);
worker->pkt_pool.ctx = NULL;
trie_free(worker->subreq_out);
worker->subreq_out = NULL;
map_clear(&worker->tcp_connected);
map_clear(&worker->tcp_waiting);
struct worker_ctx *worker = the_worker;
assert(worker);
if (worker->z_import != NULL) {
zi_free(worker->z_import);
worker->z_import = NULL;
}
map_clear(&worker->tcp_connected);
map_clear(&worker->tcp_waiting);
trie_free(worker->subreq_out);
worker->subreq_out = NULL;
reclaim_mp_freelist(&worker->pool_mp);
mp_delete(worker->pkt_pool.ctx);
worker->pkt_pool.ctx = NULL;
the_worker = NULL;
}
struct worker_ctx *worker_create(struct engine *engine, knot_mm_t *pool,
int worker_id, int worker_count)
int worker_init(struct engine *engine, int worker_id, int worker_count)
{
assert(engine && engine->L);
assert(the_worker == NULL);
kr_bindings_register(engine->L);
/* Create main worker. */
struct worker_ctx *worker = mm_alloc(pool, sizeof(*worker));
if (!worker) {
return NULL;
}
struct worker_ctx *worker = &the_worker_value;
memset(worker, 0, sizeof(*worker));
worker->engine = engine;
uv_loop_t *loop = uv_default_loop();
worker->loop = loop;
worker->id = worker_id;
worker->count = worker_count;
worker->engine = engine;
worker->next_request_uid = UINT16_MAX + 1;
worker_reserve(worker, MP_FREELIST_SIZE);
/* Register table for worker per-request variables */
lua_newtable(engine->L);
lua_setfield(engine->L, -2, "vars");
lua_getfield(engine->L, -1, "vars");
worker->vars_table_ref = luaL_ref(engine->L, LUA_REGISTRYINDEX);
lua_pop(engine->L, 1);
worker->tcp_pipeline_max = MAX_PIPELINED;
worker->out_addr4.sin_family = AF_UNSPEC;
worker->out_addr6.sin6_family = AF_UNSPEC;
/* Register worker in Lua thread */
lua_pushlightuserdata(engine->L, worker);
lua_setglobal(engine->L, "__worker");
int ret = worker_reserve(worker, MP_FREELIST_SIZE);
if (ret) return ret;
worker->next_request_uid = UINT16_MAX + 1;
/* Set some worker.* fields in Lua */
lua_getglobal(engine->L, "worker");
lua_pushnumber(engine->L, worker_id);
lua_setfield(engine->L, -2, "id");
......@@ -2011,13 +2026,11 @@ struct worker_ctx *worker_create(struct engine *engine, knot_mm_t *pool,
lua_setfield(engine->L, -2, "pid");
lua_pushnumber(engine->L, worker_count);
lua_setfield(engine->L, -2, "count");
/* Register table for worker per-request variables */
lua_newtable(engine->L);
lua_setfield(engine->L, -2, "vars");
lua_getfield(engine->L, -1, "vars");
worker->vars_table_ref = luaL_ref(engine->L, LUA_REGISTRYINDEX);
lua_pop(engine->L, 1);
return worker;
the_worker = worker;
loop->data = the_worker;
/* ^^^^ This shouldn't be used anymore, but it's hard to be 100% sure. */
return kr_ok();
}
#undef VERBOSE_MSG
......@@ -30,9 +30,15 @@ struct session;
/** Zone import context (opaque). */
struct zone_import_ctx;
/** Create and initialize the worker. */
struct worker_ctx *worker_create(struct engine *engine, knot_mm_t *pool,
int worker_id, int worker_count);
/** Pointer to the singleton worker. NULL if not initialized. */
KR_EXPORT extern struct worker_ctx *the_worker;
/** Create and initialize the worker.
* \return error code (ENOMEM) */
int worker_init(struct engine *engine, int worker_id, int worker_count);
/** Destroy the worker (free memory). */
void worker_deinit(void);
/**
* Process an incoming packet (query from a client or answer from upstream).
......@@ -67,9 +73,6 @@ int worker_resolve_exec(struct qr_task *task, knot_pkt_t *query);
/** @return struct kr_request associated with opaque task */
struct kr_request *worker_task_request(struct qr_task *task);
/** Collect worker mempools */
void worker_reclaim(struct worker_ctx *worker);
int worker_task_step(struct qr_task *task, const struct sockaddr *packet_source,
knot_pkt_t *packet);
......
......@@ -233,7 +233,6 @@ struct kr_request {
int vars_ref; /**< Reference to per-request variable table. LUA_NOREF if not set. */
knot_mm_t pool;
unsigned int uid; /** for logging purposes only */
void *daemon_context; /** pointer to worker from daemon. Can be used in modules. */
};
/** Initializer for an array of *_selected. */
......
......@@ -41,9 +41,7 @@ static int edns_keepalive_finalize(kr_layer_t *ctx)
if (!ka_want) {
return ctx->state;
}
const struct worker_ctx *worker = (const struct worker_ctx *)req->daemon_context;
assert(worker);
const struct network *net = &worker->engine->net;
const struct network *net = &the_worker->engine->net;
uint64_t timeout = net->tcp.in_idle_timeout / 100;
if (timeout > UINT16_MAX) {
timeout = UINT16_MAX;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment