/* test_worker_pool.c -- worker pool test. */
/*  Copyright (C) 2014 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include <tap/basic.h>

#include <errno.h>
#include <pthread.h>
#include <sched.h>
#include <signal.h>
#include <time.h>

#include "knot/worker/pool.h"
#include "knot/worker/queue.h"

#define THREADS 4
#define TASKS_BATCH 40

/*!
 * Shared record of how many tasks have been executed.
 */
typedef struct task_log {
	pthread_mutex_t mx;    /*!< Mutex taken around accesses to the counter. */
	unsigned int executed; /*!< Number of executed tasks. */
} task_log_t;

/*!
 * Read the executed-task counter and zero it while holding the log mutex.
 *
 * \param log  Task log to read and reset.
 *
 * \return Value the counter held just before it was cleared.
 */
static unsigned executed_reset(task_log_t *log)
{
	unsigned count;

	pthread_mutex_lock(&log->mx);
	count = log->executed;
	log->executed = 0;
	pthread_mutex_unlock(&log->mx);

	return count;
}

/*!
 * Counting task: adds one to the executed counter of the log given as
 * the task context.
 *
 * \param task  Task being run; its ctx is used as a task_log_t pointer.
 */
static void task_counting(task_t *task)
{
	task_log_t *counter = task->ctx;

	pthread_mutex_lock(&counter->mx);
	counter->executed++;
	pthread_mutex_unlock(&counter->mx);
}

/*!
 * Signal handler that intentionally does nothing.
 *
 * \param s  Signal number (unused).
 */
static void interrupt_handle(int s)
{
	(void)s;
}

int main(void)
{
	plan_lazy();

	struct sigaction sa;
	sa.sa_handler = interrupt_handle;
	sigemptyset(&sa.sa_mask);
	sa.sa_flags = 0;
	sigaction(SIGALRM, &sa, NULL); // Interrupt

	// create pool

	worker_pool_t *pool = worker_pool_create(THREADS);
	ok(pool != NULL, "create worker pool");
	if (!pool) {
		return 1;
	}

	task_log_t log = {
		.mx = PTHREAD_MUTEX_INITIALIZER,
	};

	// schedule jobs while pool is stopped

	task_t task = { .run = task_counting, .ctx = &log };
	for (int i = 0; i < TASKS_BATCH; i++) {
		worker_pool_assign(pool, &task);
	}

	sched_yield();
	ok(executed_reset(&log) == 0, "executed count before start");

	// start and wait for finish

	worker_pool_start(pool);
	worker_pool_wait(pool);
	ok(executed_reset(&log) == TASKS_BATCH, "executed count after start");

	// add additional jobs while pool is running

	for (int i = 0; i < TASKS_BATCH; i++) {
		worker_pool_assign(pool, &task);
	}

	worker_pool_wait(pool);
	ok(executed_reset(&log) == TASKS_BATCH, "executed count after add");

115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
	// temporary suspension

	worker_pool_suspend(pool);

	for (int i = 0; i < TASKS_BATCH; i++) {
		worker_pool_assign(pool, &task);
	}

	sched_yield();
	ok(executed_reset(&log) == 0, "executed count after suspend");

	worker_pool_resume(pool);
	worker_pool_wait(pool);
	ok(executed_reset(&log) == TASKS_BATCH, "executed count after resume");

Jan Včelák's avatar
Jan Včelák committed
130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
	// try clean

	pthread_mutex_lock(&log.mx);
	for (int i = 0; i < THREADS + TASKS_BATCH; i++) {
		worker_pool_assign(pool, &task);
	}
	sched_yield();
	worker_pool_clear(pool);
	pthread_mutex_unlock(&log.mx);

	worker_pool_wait(pool);
	ok(executed_reset(&log) <= THREADS, "executed count after clear");

	// cleanup

	worker_pool_stop(pool);
	worker_pool_join(pool);
	worker_pool_destroy(pool);

	pthread_mutex_destroy(&log.mx);

	return 0;
}