Commit 12076fb7 authored by Jan Včelák's avatar Jan Včelák 🚀

initial experiments for background workers

parent 1666cd1f
......@@ -149,6 +149,10 @@ src/knot/updates/ddns.c
src/knot/updates/ddns.h
src/knot/updates/xfr-in.c
src/knot/updates/xfr-in.h
src/knot/worker/queue.c
src/knot/worker/queue.h
src/knot/worker/task.c
src/knot/worker/task.h
src/knot/zone/estimator.c
src/knot/zone/estimator.h
src/knot/zone/node.c
......
......@@ -232,8 +232,8 @@ libknotd_la_SOURCES = \
knot/server/rrl.h \
knot/server/server.c \
knot/server/server.h \
knot/server/net.c \
knot/server/net.h \
knot/server/net.c \
knot/server/net.h \
knot/server/tcp-handler.c \
knot/server/tcp-handler.h \
knot/server/udp-handler.c \
......@@ -252,20 +252,28 @@ libknotd_la_SOURCES = \
knot/updates/ddns.h \
knot/updates/xfr-in.c \
knot/updates/xfr-in.h \
knot/worker/pool.c \
knot/worker/pool.h \
knot/worker/queue.c \
knot/worker/queue.h \
knot/worker/task.c \
knot/worker/task.h \
knot/zone/estimator.c \
knot/zone/estimator.h \
knot/zone/events.c \
knot/zone/events.h \
knot/zone/node.c \
knot/zone/node.h \
knot/zone/semantic-check.c \
knot/zone/semantic-check.h \
knot/zone/zone-contents.c \
knot/zone/zone-contents.h \
knot/zone/zone-create.c \
knot/zone/zone-create.h \
knot/zone/zone-diff.c \
knot/zone/zone-diff.h \
knot/zone/zone-dump.c \
knot/zone/zone-dump.h \
knot/zone/zone-create.c \
knot/zone/zone-create.h \
knot/zone/zone-tree.c \
knot/zone/zone-tree.h \
knot/zone/zone.c \
......
......@@ -264,9 +264,6 @@ int main(int argc, char **argv)
log_levels_add(LOGT_STDOUT, LOG_ANY, mask);
}
// Initialize pseudorandom number generator
srand(time(NULL));
/* POSIX 1003.1e capabilities. */
setup_capabilities();
......
/* Copyright (C) 2014 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <config.h>
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>
#include "common/errcode.h"
#include "knot/server/dthreads.h"
#include "knot/worker/pool.h"
#include "knot/worker/queue.h"
/*!
* \brief Worker pool state.
*/
struct worker_pool {
dt_unit_t *threads;
pthread_mutex_t lock;
pthread_cond_t wake;
bool suspended;
bool terminating;
worker_queue_t tasks;
};
/*!
* \brief Worker thread.
*
* The thread takes a task from the tasks queue and runs it, while checking
* if the dispatching of new tasks is allowed by the thread pool.
*
* An execution of a running thread cannot be enforced.
*
*/
static int worker_main(dthread_t *thread)
{
assert(thread);
worker_pool_t *pool = thread->data;
pthread_mutex_lock(&pool->lock);
for (;;) {
if (pool->terminating) {
break;
}
if (pool->suspended) {
pthread_cond_wait(&pool->wake, &pool->lock);
continue;
}
task_t *task = worker_queue_dequeue(&pool->tasks);
if (task == NULL) {
pthread_cond_wait(&pool->wake, &pool->lock);
continue;
}
assert(task);
assert(task->run);
pthread_mutex_unlock(&pool->lock);
task->run(task);
pthread_mutex_lock(&pool->lock);
}
pthread_mutex_unlock(&pool->lock);
return KNOT_EOK;
}
/* -- public API ------------------------------------------------------------ */
worker_pool_t *worker_pool_create(unsigned threads)
{
worker_pool_t *pool = malloc(sizeof(worker_pool_t));
if (pool == NULL) {
return NULL;
}
memset(pool, 0, sizeof(worker_pool_t));
pool->threads = dt_create(threads, worker_main, NULL, pool);
if (pool->threads == NULL) {
free(pool);
return NULL;
}
pthread_mutex_init(&pool->lock, NULL);
pthread_cond_init(&pool->wake, NULL);
worker_queue_init(&pool->tasks);
return pool;
}
void worker_pool_destroy(worker_pool_t *pool)
{
if (!pool) {
return;
}
dt_delete(&pool->threads);
pthread_mutex_destroy(&pool->lock);
pthread_cond_destroy(&pool->wake);
worker_queue_deinit(&pool->tasks);
free(pool);
}
void worker_pool_start(worker_pool_t *pool)
{
if (!pool) {
return;
}
dt_start(pool->threads);
}
void worker_pool_stop(worker_pool_t *pool)
{
if (!pool) {
return;
}
pthread_mutex_lock(&pool->lock);
pool->terminating = true;
pthread_cond_broadcast(&pool->wake);
pthread_mutex_unlock(&pool->lock);
dt_stop(pool->threads);
}
void worker_pool_join(worker_pool_t *pool)
{
if (!pool) {
return;
}
dt_join(pool->threads);
}
void worker_pool_suspend(worker_pool_t *pool)
{
if (!pool) {
return;
}
pthread_mutex_lock(&pool->lock);
pool->suspended = true;
pthread_mutex_unlock(&pool->lock);
}
void worker_pool_continue(worker_pool_t *pool)
{
if (!pool) {
return;
}
pthread_mutex_lock(&pool->lock);
pool->suspended = false;
pthread_cond_broadcast(&pool->wake);
pthread_mutex_unlock(&pool->lock);
}
void worker_pool_assign(worker_pool_t *pool, task_t *task)
{
if (!pool || !task) {
return;
}
pthread_mutex_lock(&pool->lock);
worker_queue_enqueue(&pool->tasks, task);
if (!pool->suspended) {
pthread_cond_signal(&pool->wake);
}
pthread_mutex_unlock(&pool->lock);
}
void worker_pool_clear(worker_pool_t *pool)
{
if (!pool) {
return;
}
pthread_mutex_lock(&pool->lock);
worker_queue_deinit(&pool->tasks);
worker_queue_init(&pool->tasks);
pthread_mutex_unlock(&pool->lock);
}
/* Copyright (C) 2014 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "knot/worker/task.h"
struct worker_pool;
typedef struct worker_pool worker_pool_t;
/*!
* \brief Initialize worker pool.
*
* \param threads Number of threads to be created.
*
* \return Thread pool or NULL in case of error.
*/
worker_pool_t *worker_pool_create(unsigned threads);
/*!
* \brief Destroy the worker pool.
*/
void worker_pool_destroy(worker_pool_t *pool);
/*!
* \brief Start all threads in the worker pool.
*/
void worker_pool_start(worker_pool_t *pool);
/*!
* \brief Stop processing of new tasks, start stopping worker threads when possible.
*/
void worker_pool_stop(worker_pool_t *pool);
/*!
* \brief Wait for all threads to terminate.
*/
void worker_pool_join(worker_pool_t *pool);
/*!
* \brief Suspend execution of new tasks, existing task are not terminated.
*/
void worker_pool_suspend(worker_pool_t *pool);
/*!
* \brief Continue execution of new tasks.
*/
void worker_pool_continue(worker_pool_t *pool);
/*!
* \brief Assign a task to be performed by a worker in the pool.
*/
void worker_pool_assign(worker_pool_t *pool, task_t *task);
/*!
* \brief Clear all tasks enqueued in pool processing queue.
*/
void worker_pool_clear(worker_pool_t *pool);
/* Copyright (C) 2014 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <config.h>
#include "common/lists.h"
#include "knot/worker/queue.h"
#include "knot/worker/task.h"
void worker_queue_init(worker_queue_t *queue)
{
if (!queue) {
return;
}
memset(queue, 0, sizeof(worker_queue_t));
init_list(&queue->list);
mm_ctx_init(&queue->mm_ctx);
}
void worker_queue_deinit(worker_queue_t *queue)
{
ptrlist_free(&queue->list, &queue->mm_ctx);
}
void worker_queue_enqueue(worker_queue_t *queue, task_t *task)
{
if (!queue || !task) {
return;
}
ptrlist_add(&queue->list, task, &queue->mm_ctx);
}
task_t *worker_queue_dequeue(worker_queue_t *queue)
{
if (!queue) {
return NULL;
}
task_t *task = NULL;
if (!EMPTY_LIST(queue->list)) {
ptrnode_t *node = HEAD(queue->list);
task = (void *)node->d;
rem_node(&node->n);
queue->mm_ctx.free(&node->n);
}
return task;
}
/* Copyright (C) 2014 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "common/lists.h"
#include "knot/worker/task.h"
/*!
* \brief Worker queue.
*/
typedef struct worker_queue {
mm_ctx_t mm_ctx;
list_t list;
} worker_queue_t;
/*!
* \brief Initialize worker queue.
*/
void worker_queue_init(worker_queue_t *queue);
/*!
* \brief Deinitialize worker queue.
*/
void worker_queue_deinit(worker_queue_t *queue);
/*!
* \brief Insert new item into the queue.
*/
void worker_queue_enqueue(worker_queue_t *queue, task_t *task);
/*!
* \brief Remove item from the queue.
*
* \return The item or NULL if the queue is empty.
*/
task_t *worker_queue_dequeue(worker_queue_t *queue);
/* Copyright (C) 2014 CZ.NIC, z.s.p.o. <knot-dns@task.nic.cz>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <config.h>
#include <stdlib.h>
#include <string.h>
#include "knot/worker/task.h"
task_t *task_create(void *ctx, task_cb run)
{
if (!ctx || !run) {
return NULL;
}
task_t *task = malloc(sizeof(task_t *));
if (!task) {
return NULL;
}
memset(task, 0, sizeof(task_t));
task->ctx = ctx;
task->run = run;
return task;
}
void task_free(task_t *task)
{
if (!task) {
return;
}
free(task);
}
/* Copyright (C) 2014 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <time.h>
struct task;
typedef struct task task_t;
typedef void (*task_cb)(task_t *);
struct task {
void *ctx;
task_cb run;
};
task_t *task_create(void *ctx, task_cb run);
void task_free(task_t *task);
/* Copyright (C) 2014 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <time.h>
#include "common/evsched.h"
#include "knot/server/server.h"
#include "knot/worker/pool.h"
#include "knot/worker/task.h"
#include "knot/zone/events.h"
#include "knot/zone/zone.h"
/* -- zone events handling callbacks --------------------------------------- */
typedef void (*zone_event_cb)(zone_t *zone);
#include <stdio.h>
static void event_reload(zone_t *zone)
{
assert(zone);
fprintf(stderr, "LOADING ZONE %p\n", zone);
}
static void event_refresh(zone_t *zone)
{
assert(zone);
fprintf(stderr, "REFRESHING ZONE %p\n", zone);
// zone_schedule_event(zone, ZONE_EVENT_REFRESH, time);
}
static void event_expire(zone_t *zone)
{
assert(zone);
fprintf(stderr, "EXPIRING ZONE %p\n", zone);
}
static void event_dnssec(zone_t *zone)
{
assert(zone);
fprintf(stderr, "RESIGNING ZONE %p\n", zone);
// zone_schedule_event(zone, ZONE_EVENT_REFRESH, time);
}
/* -- internal API --------------------------------------------------------- */
static bool valid_event(zone_event_type_t type)
{
return (type > 0 && type < ZONE_EVENT_COUNT);
}
/*!
* \brief Find next scheduled zone event.
*
* \param events Zone events.
*
* \return Zone event type, or ZONE_EVENT_INVALID if no event is scheduled.
*/
static zone_event_type_t get_next_event(zone_events_t *events)
{
if (!events) {
return ZONE_EVENT_INVALID;
}
zone_event_type_t next_type = ZONE_EVENT_INVALID;
time_t next = 0;
for (int i = 0; i < ZONE_EVENT_COUNT; i++) {
time_t current = events->time[i];
if (current == 0) {
continue;
}
if (next == 0 || current < next) {
next = current;
next_type = i;
}
}
return next_type;
}
/*!
* \brief Set time of a given event type.
*/
static void event_set_time(zone_events_t *events, zone_event_type_t type, time_t time)
{
assert(events);
assert(valid_event(type));
events->time[type] = time;
}
/*!
* \brief Cancel scheduled item, schedule first enqueued item.
*
* Make sure this is not called multiple times simultaneously.
*/
static void reschedule(zone_events_t *events)
{
assert(events);
assert(events->event);
evsched_cancel(events->event);
zone_event_type_t type = get_next_event(events);
if (!valid_event) {
return;
}
time_t now = time(NULL);
time_t planned = events->time[type];
time_t diff = now < planned ? (planned - now) : 0;
evsched_schedule(events->event, diff * 1000);
}
/* -- callbacks control ---------------------------------------------------- */
/*!
* \brief Get callback for given type of event.
*/
static zone_event_cb get_event_callback(zone_event_type_t type)
{
switch (type) {
case ZONE_EVENT_RELOAD: return event_reload;
case ZONE_EVENT_REFRESH: return event_refresh;
case ZONE_EVENT_EXPIRE: return event_expire;
case ZONE_EVENT_DNSSEC: return event_dnssec;
default: return NULL;
}
}
/*!
* \brief Zone event wrapper, expected to be called from worker thread.
*
* 1. Takes the next planned event.
* 2. Resets the event's scheduled time.
* 3. Perform the event's callback.
* 4. Schedule next event planned event.
*/
static void event_wrap(task_t *task)
{
assert(task);
assert(task->ctx);
zone_t *zone = task->ctx;
zone_events_t *events = &zone->events;
zone_event_type_t type = get_next_event(events);
if (!valid_event(type)) {
return;
}
zone_event_cb run = get_event_callback(type);
assert(run);
event_set_time(events, type, 0);
run(zone);
reschedule(events);
}
/*!
* \brief Called by scheduler thread if the event occurs.
*/
static int event_dispatch(event_t *event)
{
assert(event);
assert(event->data);
zone_events_t *events = event->data;
worker_pool_assign(events->pool, &events->task);
return KNOT_EOK;
}
/* -- public API ----------------------------------------------------------- */
int zone_events_init(zone_t *zone, server_t *server)
{
if (!zone || !server) {
return KNOT_EINVAL;
}
event_t *event = evsched_event_create(&server->sched, event_dispatch,
&zone->events);
if (!event) {
return KNOT_ENOMEM;
}
memset(&zone->events, 0, sizeof(zone->events));
zone->events.event = event;
zone->events.pool = server->workers;
zone->events.task.ctx = zone;
zone->events.task.run = event_wrap;
return KNOT_EOK;
}