dthreads.c 16.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*  Copyright (C) 2011 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

17 18 19
#include <signal.h>
#include <stdlib.h>
#include <string.h>
20 21
#include <stdio.h>
#include <unistd.h>
22
#include <errno.h>
23 24
#include <urcu.h>

25 26 27
#ifdef HAVE_CAP_NG_H
#include <cap-ng.h>
#endif /* HAVE_CAP_NG_H */
28 29 30
#ifdef HAVE_PTHREAD_NP_H
#include <pthread_np.h>
#endif /* HAVE_PTHREAD_NP_H */
31

32
#include "knot/server/dthreads.h"
Daniel Salzman's avatar
Daniel Salzman committed
33
#include "libknot/libknot.h"
34

35 36 37 38 39
/* BSD cpu set compatibility. */
#if defined(HAVE_CPUSET_BSD)
typedef cpuset_t cpu_set_t;
#endif

40
/*! \brief Lock thread state for R/W. */
41
static inline void lock_thread_rw(dthread_t *thread)
42
{
Marek Vavrusa's avatar
Marek Vavrusa committed
43
	pthread_mutex_lock(&thread->_mx);
44
}
45
/*! \brief Unlock thread state for R/W. */
46 47
static inline void unlock_thread_rw(dthread_t *thread)
{
Marek Vavrusa's avatar
Marek Vavrusa committed
48
	pthread_mutex_unlock(&thread->_mx);
49 50
}

51
/*! \brief Signalize thread state change. */
52
static inline void unit_signalize_change(dt_unit_t *unit)
53
{
Marek Vavrusa's avatar
Marek Vavrusa committed
54 55 56
	pthread_mutex_lock(&unit->_report_mx);
	pthread_cond_signal(&unit->_report);
	pthread_mutex_unlock(&unit->_report_mx);
57 58
}

59 60 61 62 63
/*!
 * \brief Update thread state with notification.
 * \param thread Given thread.
 * \param state New state for thread.
 * \retval 0 on success.
64
 * \retval <0 on error (EINVAL, ENOTSUP).
65
 */
66
static inline int dt_update_thread(dthread_t *thread, int state)
67
{
Marek Vavrusa's avatar
Marek Vavrusa committed
68
	// Check
Marek Vavrusa's avatar
Marek Vavrusa committed
69
	if (thread == 0) {
Marek Vavrusa's avatar
Marek Vavrusa committed
70
		return KNOT_EINVAL;
Marek Vavrusa's avatar
Marek Vavrusa committed
71
	}
Marek Vavrusa's avatar
Marek Vavrusa committed
72 73

	// Cancel with lone thread
Marek Vavrusa's avatar
Marek Vavrusa committed
74 75
	dt_unit_t *unit = thread->unit;
	if (unit == 0) {
Marek Vavrusa's avatar
Marek Vavrusa committed
76
		return KNOT_ENOTSUP;
Marek Vavrusa's avatar
Marek Vavrusa committed
77
	}
Marek Vavrusa's avatar
Marek Vavrusa committed
78 79 80 81 82 83 84 85 86 87 88 89 90 91

	// Cancel current runnable if running
	pthread_mutex_lock(&unit->_notify_mx);
	lock_thread_rw(thread);
	if (thread->state & (ThreadIdle | ThreadActive)) {

		// Update state
		thread->state = state;
		unlock_thread_rw(thread);

		// Notify thread
		pthread_cond_broadcast(&unit->_notify);
		pthread_mutex_unlock(&unit->_notify_mx);
	} else {
92
		/* Unable to update thread, it is already dead. */
Marek Vavrusa's avatar
Marek Vavrusa committed
93
		unlock_thread_rw(thread);
94
		pthread_mutex_unlock(&unit->_notify_mx);
Marek Vavrusa's avatar
Marek Vavrusa committed
95
		return KNOT_EINVAL;
Marek Vavrusa's avatar
Marek Vavrusa committed
96 97
	}

Marek Vavrusa's avatar
Marek Vavrusa committed
98
	return KNOT_EOK;
99 100
}

101 102 103 104 105 106 107 108 109 110
/*!
 * \brief Thread entrypoint function.
 *
 * When a thread is created and started, it immediately enters this function.
 * Depending on thread state, it either enters runnable or
 * blocks until it is awakened.
 *
 * This function also handles "ThreadIdle" state to quickly suspend and resume
 * threads and mitigate thread creation costs. Also, thread runnable may
 * be changed to alter the thread behavior on runtime
111
 */
112
static void *thread_ep(void *data)
113
{
Marek Vavrusa's avatar
Marek Vavrusa committed
114 115 116 117 118 119 120
	// Check data
	dthread_t *thread = (dthread_t *)data;
	if (thread == 0) {
		return 0;
	}

	// Check if is a member of unit
Marek Vavrusa's avatar
Marek Vavrusa committed
121
	dt_unit_t *unit = thread->unit;
Marek Vavrusa's avatar
Marek Vavrusa committed
122 123 124 125
	if (unit == 0) {
		return 0;
	}

126
	// Unblock SIGALRM for synchronization
127
	sigset_t mask;
128
	(void)sigemptyset(&mask);
129 130
	sigaddset(&mask, SIGALRM);
	pthread_sigmask(SIG_UNBLOCK, &mask, NULL);
Marek Vavrusa's avatar
Marek Vavrusa committed
131

132
	rcu_register_thread();
Jan Včelák's avatar
Jan Včelák committed
133

134
	/* Drop capabilities except FS access. */
135
#ifdef HAVE_CAP_NG_H
136
	if (capng_have_capability(CAPNG_EFFECTIVE, CAP_SETPCAP)) {
137
		capng_type_t tp = CAPNG_EFFECTIVE|CAPNG_PERMITTED;
138
		capng_clear(CAPNG_SELECT_BOTH);
139
		capng_update(CAPNG_ADD, tp, CAP_DAC_OVERRIDE);
140 141
		capng_apply(CAPNG_SELECT_BOTH);
	}
142
#endif /* HAVE_CAP_NG_H */
Marek Vavrusa's avatar
Marek Vavrusa committed
143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177

	// Run loop
	for (;;) {

		// Check thread state
		lock_thread_rw(thread);
		if (thread->state == ThreadDead) {
			unlock_thread_rw(thread);
			break;
		}

		// Update data
		thread->data = thread->_adata;
		runnable_t _run = thread->run;

		// Start runnable if thread is marked Active
		if ((thread->state == ThreadActive) && (thread->run != 0)) {
			unlock_thread_rw(thread);
			_run(thread);
		} else {
			unlock_thread_rw(thread);
		}

		// If the runnable was cancelled, start new iteration
		lock_thread_rw(thread);
		if (thread->state & ThreadCancelled) {
			thread->state &= ~ThreadCancelled;
			unlock_thread_rw(thread);
			continue;
		}
		unlock_thread_rw(thread);

		// Runnable finished without interruption, mark as Idle
		pthread_mutex_lock(&unit->_notify_mx);
		lock_thread_rw(thread);
Marek Vavrusa's avatar
Marek Vavrusa committed
178
		if (thread->state & ThreadActive) {
Marek Vavrusa's avatar
Marek Vavrusa committed
179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198
			thread->state &= ~ThreadActive;
			thread->state |= ThreadIdle;
		}

		// Go to sleep if idle
		if (thread->state & ThreadIdle) {
			unlock_thread_rw(thread);

			// Signalize state change
			unit_signalize_change(unit);

			// Wait for notification from unit
			pthread_cond_wait(&unit->_notify, &unit->_notify_mx);
			pthread_mutex_unlock(&unit->_notify_mx);
		} else {
			unlock_thread_rw(thread);
			pthread_mutex_unlock(&unit->_notify_mx);
		}
	}

199 200 201 202 203
	// Thread destructor
	if (thread->destruct) {
		thread->destruct(thread);
	}

Marek Vavrusa's avatar
Marek Vavrusa committed
204 205 206 207 208
	// Report thread state change
	unit_signalize_change(unit);
	lock_thread_rw(thread);
	thread->state |= ThreadJoinable;
	unlock_thread_rw(thread);
209
	rcu_unregister_thread();
Marek Vavrusa's avatar
Marek Vavrusa committed
210 211 212

	// Return
	return 0;
213 214
}

215 216
/*!
 * \brief Create single thread.
217 218
 * \retval New thread instance on success.
 * \retval NULL on error.
219
 */
220
static dthread_t *dt_create_thread(dt_unit_t *unit)
221
{
Marek Vavrusa's avatar
Marek Vavrusa committed
222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239
	// Alloc thread
	dthread_t *thread = malloc(sizeof(dthread_t));
	if (thread == 0) {
		return 0;
	}

	memset(thread, 0, sizeof(dthread_t));

	// Blank thread state
	thread->state = ThreadJoined;
	pthread_mutex_init(&thread->_mx, 0);

	// Set membership in unit
	thread->unit = unit;

	// Initialize attribute
	pthread_attr_t *attr = &thread->_attr;
	pthread_attr_init(attr);
240 241 242
	//pthread_attr_setinheritsched(attr, PTHREAD_INHERIT_SCHED);
	//pthread_attr_setschedpolicy(attr, SCHED_OTHER);
	pthread_attr_setstacksize(attr, 1024*1024);
Marek Vavrusa's avatar
Marek Vavrusa committed
243
	return thread;
244 245
}

246
/*! \brief Delete single thread. */
247
static void dt_delete_thread(dthread_t **thread)
248
{
Marek Vavrusa's avatar
Marek Vavrusa committed
249
	// Check
250
	if (!thread || !*thread) {
Marek Vavrusa's avatar
Marek Vavrusa committed
251
		return;
Marek Vavrusa's avatar
Marek Vavrusa committed
252
	}
253

Marek Vavrusa's avatar
Marek Vavrusa committed
254 255 256 257
	dthread_t* thr = *thread;
	thr->unit = 0;
	*thread = 0;

Marek Vavrusa's avatar
Marek Vavrusa committed
258
	// Delete attribute
Marek Vavrusa's avatar
Marek Vavrusa committed
259
	pthread_attr_destroy(&(thr)->_attr);
260

Marek Vavrusa's avatar
Marek Vavrusa committed
261
	// Delete mutex
Marek Vavrusa's avatar
Marek Vavrusa committed
262
	pthread_mutex_destroy(&(thr)->_mx);
263

Marek Vavrusa's avatar
Marek Vavrusa committed
264
	// Free memory
Marek Vavrusa's avatar
Marek Vavrusa committed
265
	free(thr);
266 267
}

268 269 270 271
/*
 * Public APIs.
 */

Marek Vavrusa's avatar
Marek Vavrusa committed
272
static dt_unit_t *dt_create_unit(int count)
273
{
Marek Vavrusa's avatar
Marek Vavrusa committed
274
	// Check count
Marek Vavrusa's avatar
Marek Vavrusa committed
275
	if (count <= 0) {
Marek Vavrusa's avatar
Marek Vavrusa committed
276
		return 0;
Marek Vavrusa's avatar
Marek Vavrusa committed
277
	}
Marek Vavrusa's avatar
Marek Vavrusa committed
278 279

	dt_unit_t *unit = malloc(sizeof(dt_unit_t));
Marek Vavrusa's avatar
Marek Vavrusa committed
280
	if (unit == 0) {
Marek Vavrusa's avatar
Marek Vavrusa committed
281
		return 0;
Marek Vavrusa's avatar
Marek Vavrusa committed
282
	}
Marek Vavrusa's avatar
Marek Vavrusa committed
283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321

	// Initialize conditions
	if (pthread_cond_init(&unit->_notify, 0) != 0) {
		free(unit);
		return 0;
	}
	if (pthread_cond_init(&unit->_report, 0) != 0) {
		pthread_cond_destroy(&unit->_notify);
		free(unit);
		return 0;
	}

	// Initialize mutexes
	if (pthread_mutex_init(&unit->_notify_mx, 0) != 0) {
		pthread_cond_destroy(&unit->_notify);
		pthread_cond_destroy(&unit->_report);
		free(unit);
		return 0;
	}
	if (pthread_mutex_init(&unit->_report_mx, 0) != 0) {
		pthread_cond_destroy(&unit->_notify);
		pthread_cond_destroy(&unit->_report);
		pthread_mutex_destroy(&unit->_notify_mx);
		free(unit);
		return 0;
	}
	if (pthread_mutex_init(&unit->_mx, 0) != 0) {
		pthread_cond_destroy(&unit->_notify);
		pthread_cond_destroy(&unit->_report);
		pthread_mutex_destroy(&unit->_notify_mx);
		pthread_mutex_destroy(&unit->_report_mx);
		free(unit);
		return 0;
	}

	// Save unit size
	unit->size = count;

	// Alloc threads
322
	unit->threads = calloc(count, sizeof(dthread_t *));
Marek Vavrusa's avatar
Marek Vavrusa committed
323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362
	if (unit->threads == 0) {
		pthread_cond_destroy(&unit->_notify);
		pthread_cond_destroy(&unit->_report);
		pthread_mutex_destroy(&unit->_notify_mx);
		pthread_mutex_destroy(&unit->_report_mx);
		pthread_mutex_destroy(&unit->_mx);
		free(unit);
		return 0;
	}

	// Initialize threads
	int init_success = 1;
	for (int i = 0; i < count; ++i) {
		unit->threads[i] = dt_create_thread(unit);
		if (unit->threads[i] == 0) {
			init_success = 0;
			break;
		}
	}

	// Check thread initialization
	if (!init_success) {

		// Delete created threads
		for (int i = 0; i < count; ++i) {
			dt_delete_thread(&unit->threads[i]);
		}

		// Free rest of the unit
		pthread_cond_destroy(&unit->_notify);
		pthread_cond_destroy(&unit->_report);
		pthread_mutex_destroy(&unit->_notify_mx);
		pthread_mutex_destroy(&unit->_report_mx);
		pthread_mutex_destroy(&unit->_mx);
		free(unit->threads);
		free(unit);
		return 0;
	}

	return unit;
363 364
}

Marek Vavrusa's avatar
Marek Vavrusa committed
365
dt_unit_t *dt_create(int count, runnable_t runnable, runnable_t destructor, void *data)
366
{
Marek Vavrusa's avatar
Marek Vavrusa committed
367
	// Check count
Marek Vavrusa's avatar
Marek Vavrusa committed
368
	if (count <= 0) {
Marek Vavrusa's avatar
Marek Vavrusa committed
369
		return 0;
Marek Vavrusa's avatar
Marek Vavrusa committed
370
	}
Marek Vavrusa's avatar
Marek Vavrusa committed
371 372

	// Create unit
Marek Vavrusa's avatar
Marek Vavrusa committed
373
	dt_unit_t *unit = dt_create_unit(count);
Marek Vavrusa's avatar
Marek Vavrusa committed
374
	if (unit == 0) {
Marek Vavrusa's avatar
Marek Vavrusa committed
375
		return 0;
Marek Vavrusa's avatar
Marek Vavrusa committed
376
	}
Marek Vavrusa's avatar
Marek Vavrusa committed
377 378 379 380

	// Set threads common purpose
	pthread_mutex_lock(&unit->_notify_mx);
	dt_unit_lock(unit);
Marek Vavrusa's avatar
Marek Vavrusa committed
381

Marek Vavrusa's avatar
Marek Vavrusa committed
382 383 384 385
	for (int i = 0; i < count; ++i) {
		dthread_t *thread = unit->threads[i];
		lock_thread_rw(thread);
		thread->run = runnable;
386
		thread->destruct = destructor;
Marek Vavrusa's avatar
Marek Vavrusa committed
387 388 389
		thread->_adata = data;
		unlock_thread_rw(thread);
	}
Marek Vavrusa's avatar
Marek Vavrusa committed
390

Marek Vavrusa's avatar
Marek Vavrusa committed
391 392 393 394
	dt_unit_unlock(unit);
	pthread_mutex_unlock(&unit->_notify_mx);

	return unit;
395 396
}

397
void dt_delete(dt_unit_t **unit)
398
{
Marek Vavrusa's avatar
Marek Vavrusa committed
399 400 401 402 403 404 405
	/*
	 *  All threads must be stopped or idle at this point,
	 *  or else the behavior is undefined.
	 *  Sorry.
	 */

	// Check
Marek Vavrusa's avatar
Marek Vavrusa committed
406
	if (unit == 0) {
Marek Vavrusa's avatar
Marek Vavrusa committed
407
		return;
Marek Vavrusa's avatar
Marek Vavrusa committed
408 409
	}
	if (*unit == 0) {
Marek Vavrusa's avatar
Marek Vavrusa committed
410
		return;
Marek Vavrusa's avatar
Marek Vavrusa committed
411
	}
Marek Vavrusa's avatar
Marek Vavrusa committed
412 413 414 415 416 417 418 419 420 421 422 423 424

	// Compact and reclaim idle threads
	dt_unit_t *d_unit = *unit;
	dt_compact(d_unit);

	// Delete threads
	for (int i = 0; i < d_unit->size; ++i) {
		dt_delete_thread(&d_unit->threads[i]);
	}

	// Deinit mutexes
	pthread_mutex_destroy(&d_unit->_notify_mx);
	pthread_mutex_destroy(&d_unit->_report_mx);
425
	pthread_mutex_destroy(&d_unit->_mx);
Marek Vavrusa's avatar
Marek Vavrusa committed
426 427 428 429 430 431 432 433 434

	// Deinit conditions
	pthread_cond_destroy(&d_unit->_notify);
	pthread_cond_destroy(&d_unit->_report);

	// Free memory
	free(d_unit->threads);
	free(d_unit);
	*unit = 0;
435 436
}

Marek Vavrusa's avatar
Marek Vavrusa committed
437
static int dt_start_id(dthread_t *thread)
438
{
Marek Vavrusa's avatar
Marek Vavrusa committed
439
	// Check input
440
	if (thread == 0) {
Marek Vavrusa's avatar
Marek Vavrusa committed
441
		return KNOT_EINVAL;
Marek Vavrusa's avatar
Marek Vavrusa committed
442
	}
Marek Vavrusa's avatar
Marek Vavrusa committed
443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460

	lock_thread_rw(thread);

	// Update state
	int prev_state = thread->state;
	thread->state |= ThreadActive;
	thread->state &= ~ThreadIdle;
	thread->state &= ~ThreadDead;
	thread->state &= ~ThreadJoined;
	thread->state &= ~ThreadJoinable;

	// Do not re-create running threads
	if (prev_state != ThreadJoined) {
		unlock_thread_rw(thread);
		return 0;
	}

	// Start thread
461 462
	sigset_t mask_all, mask_old;
	sigfillset(&mask_all);
463
	sigdelset(&mask_all, SIGPROF);
464
	pthread_sigmask(SIG_SETMASK, &mask_all, &mask_old);
Marek Vavrusa's avatar
Marek Vavrusa committed
465
	int res = pthread_create(&thread->_thr,  /* pthread_t */
466 467 468
	                         &thread->_attr, /* pthread_attr_t */
	                         thread_ep,      /* routine: thread_ep */
	                         thread);        /* passed object: dthread_t */
469
	pthread_sigmask(SIG_SETMASK, &mask_old, NULL);
Marek Vavrusa's avatar
Marek Vavrusa committed
470 471 472 473

	// Unlock thread
	unlock_thread_rw(thread);
	return res;
474 475
}

Marek Vavrusa's avatar
Marek Vavrusa committed
476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503
int dt_start(dt_unit_t *unit)
{
	// Check input
	if (unit == 0) {
		return KNOT_EINVAL;
	}

	// Lock unit
	pthread_mutex_lock(&unit->_notify_mx);
	dt_unit_lock(unit);
	for (int i = 0; i < unit->size; ++i) {

		dthread_t *thread = unit->threads[i];
		int res = dt_start_id(thread);
		if (res != 0) {
			dt_unit_unlock(unit);
			pthread_mutex_unlock(&unit->_notify_mx);
			return res;
		}
	}

	// Unlock unit
	dt_unit_unlock(unit);
	pthread_cond_broadcast(&unit->_notify);
	pthread_mutex_unlock(&unit->_notify_mx);
	return KNOT_EOK;
}

504
int dt_signalize(dthread_t *thread, int signum)
505
{
Marek Vavrusa's avatar
Marek Vavrusa committed
506
	// Check input
Marek Vavrusa's avatar
Marek Vavrusa committed
507
	if (thread == 0) {
Marek Vavrusa's avatar
Marek Vavrusa committed
508
		return KNOT_EINVAL;
Marek Vavrusa's avatar
Marek Vavrusa committed
509
	}
510

511 512 513 514
	int ret = pthread_kill(thread->_thr, signum);

	/* Not thread id found or invalid signum. */
	if (ret == EINVAL || ret == ESRCH) {
Marek Vavrusa's avatar
Marek Vavrusa committed
515
		return KNOT_EINVAL;
516 517 518 519
	}

	/* Generic error. */
	if (ret < 0) {
Marek Vavrusa's avatar
Marek Vavrusa committed
520
		return KNOT_ERROR;
521 522
	}

Marek Vavrusa's avatar
Marek Vavrusa committed
523
	return KNOT_EOK;
524 525
}

Marek Vavrusa's avatar
Marek Vavrusa committed
526
int dt_join(dt_unit_t *unit)
527
{
Marek Vavrusa's avatar
Marek Vavrusa committed
528
	// Check input
Marek Vavrusa's avatar
Marek Vavrusa committed
529
	if (unit == 0) {
Marek Vavrusa's avatar
Marek Vavrusa committed
530
		return KNOT_EINVAL;
Marek Vavrusa's avatar
Marek Vavrusa committed
531
	}
Marek Vavrusa's avatar
Marek Vavrusa committed
532

Marek Vavrusa's avatar
Marek Vavrusa committed
533
	for (;;) {
Marek Vavrusa's avatar
Marek Vavrusa committed
534 535 536 537 538 539 540 541 542

		// Lock unit
		pthread_mutex_lock(&unit->_report_mx);
		dt_unit_lock(unit);

		// Browse threads
		int active_threads = 0;
		for (int i = 0; i < unit->size; ++i) {

543
			// Count active or cancelled but pending threads
Marek Vavrusa's avatar
Marek Vavrusa committed
544 545
			dthread_t *thread = unit->threads[i];
			lock_thread_rw(thread);
546
			if (thread->state & (ThreadActive|ThreadCancelled)) {
Marek Vavrusa's avatar
Marek Vavrusa committed
547 548 549 550
				++active_threads;
			}

			// Reclaim dead threads, but only fast
Marek Vavrusa's avatar
Marek Vavrusa committed
551
			if (thread->state & ThreadJoinable) {
Marek Vavrusa's avatar
Marek Vavrusa committed
552 553
				unlock_thread_rw(thread);
				pthread_join(thread->_thr, 0);
Marek Vavrusa's avatar
Marek Vavrusa committed
554
				lock_thread_rw(thread);
Marek Vavrusa's avatar
Marek Vavrusa committed
555
				thread->state = ThreadJoined;
Marek Vavrusa's avatar
Marek Vavrusa committed
556
				unlock_thread_rw(thread);
Marek Vavrusa's avatar
Marek Vavrusa committed
557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575
			} else {
				unlock_thread_rw(thread);
			}
		}

		// Unlock unit
		dt_unit_unlock(unit);

		// Check result
		if (active_threads == 0) {
			pthread_mutex_unlock(&unit->_report_mx);
			break;
		}

		// Wait for a thread to finish
		pthread_cond_wait(&unit->_report, &unit->_report_mx);
		pthread_mutex_unlock(&unit->_report_mx);
	}

Marek Vavrusa's avatar
Marek Vavrusa committed
576
	return KNOT_EOK;
577 578
}

579
int dt_stop(dt_unit_t *unit)
580
{
Marek Vavrusa's avatar
Marek Vavrusa committed
581
	// Check unit
Marek Vavrusa's avatar
Marek Vavrusa committed
582
	if (unit == 0) {
Marek Vavrusa's avatar
Marek Vavrusa committed
583
		return KNOT_EINVAL;
Marek Vavrusa's avatar
Marek Vavrusa committed
584
	}
Marek Vavrusa's avatar
Marek Vavrusa committed
585 586 587 588 589 590 591 592 593 594 595

	// Lock unit
	pthread_mutex_lock(&unit->_notify_mx);
	dt_unit_lock(unit);

	// Signalize all threads to stop
	for (int i = 0; i < unit->size; ++i) {

		// Lock thread
		dthread_t *thread = unit->threads[i];
		lock_thread_rw(thread);
Marek Vavrusa's avatar
Marek Vavrusa committed
596
		if (thread->state & (ThreadIdle | ThreadActive)) {
Marek Vavrusa's avatar
Marek Vavrusa committed
597 598 599 600 601 602 603 604 605 606 607 608 609
			thread->state = ThreadDead | ThreadCancelled;
			dt_signalize(thread, SIGALRM);
		}
		unlock_thread_rw(thread);
	}

	// Unlock unit
	dt_unit_unlock(unit);

	// Broadcast notification
	pthread_cond_broadcast(&unit->_notify);
	pthread_mutex_unlock(&unit->_notify_mx);

Marek Vavrusa's avatar
Marek Vavrusa committed
610
	return KNOT_EOK;
611 612
}

613
int dt_setaffinity(dthread_t *thread, unsigned* cpu_id, size_t cpu_count)
614
{
615
	if (thread == NULL) {
Marek Vavrusa's avatar
Marek Vavrusa committed
616
		return KNOT_EINVAL;
617
	}
Jan Včelák's avatar
Jan Včelák committed
618

619
#ifdef HAVE_PTHREAD_SETAFFINITY_NP
620
	int ret = -1;
Jan Včelák's avatar
Jan Včelák committed
621

622 623 624 625 626 627 628 629 630 631 632 633
/* Linux, FreeBSD interface. */
#if defined(HAVE_CPUSET_LINUX) || defined(HAVE_CPUSET_BSD)
	cpu_set_t set;
	CPU_ZERO(&set);
	for (unsigned i = 0; i < cpu_count; ++i) {
		CPU_SET(cpu_id[i], &set);
	}
	ret = pthread_setaffinity_np(thread->_thr, sizeof(cpu_set_t), &set);
/* NetBSD interface. */
#elif defined(HAVE_CPUSET_NETBSD)
	cpuset_t *set = cpuset_create();
	if (set == NULL) {
Marek Vavrusa's avatar
Marek Vavrusa committed
634
		return KNOT_ENOMEM;
635 636 637
	}
	cpuset_zero(set);
	for (unsigned i = 0; i < cpu_count; ++i) {
638
		cpuset_set(cpu_id[i], set);
639
	}
640 641 642 643
	ret = pthread_setaffinity_np(thread->_thr, cpuset_size(set), set);
	cpuset_destroy(set);
#endif /* interface */

644
	if (ret < 0) {
Marek Vavrusa's avatar
Marek Vavrusa committed
645
		return KNOT_ERROR;
646
	}
647 648

#else /* HAVE_PTHREAD_SETAFFINITY_NP */
Marek Vavrusa's avatar
Marek Vavrusa committed
649
	return KNOT_ENOTSUP;
650
#endif
Jan Včelák's avatar
Jan Včelák committed
651

Marek Vavrusa's avatar
Marek Vavrusa committed
652
	return KNOT_EOK;
653 654
}

655
int dt_activate(dthread_t *thread)
656
{
Marek Vavrusa's avatar
Marek Vavrusa committed
657
	return dt_update_thread(thread, ThreadActive);
658
}
659

660
int dt_cancel(dthread_t *thread)
661
{
Marek Vavrusa's avatar
Marek Vavrusa committed
662
	return dt_update_thread(thread, ThreadIdle | ThreadCancelled);
663 664
}

665
int dt_compact(dt_unit_t *unit)
666
{
Marek Vavrusa's avatar
Marek Vavrusa committed
667
	// Check input
Marek Vavrusa's avatar
Marek Vavrusa committed
668
	if (unit == 0) {
Marek Vavrusa's avatar
Marek Vavrusa committed
669
		return KNOT_EINVAL;
Marek Vavrusa's avatar
Marek Vavrusa committed
670
	}
Marek Vavrusa's avatar
Marek Vavrusa committed
671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701

	// Lock unit
	pthread_mutex_lock(&unit->_notify_mx);
	dt_unit_lock(unit);

	// Reclaim all Idle threads
	for (int i = 0; i < unit->size; ++i) {

		// Locked state update
		dthread_t *thread = unit->threads[i];
		lock_thread_rw(thread);
		if (thread->state & (ThreadIdle)) {
			thread->state = ThreadDead | ThreadCancelled;
			dt_signalize(thread, SIGALRM);
		}
		unlock_thread_rw(thread);
	}

	// Notify all threads
	pthread_cond_broadcast(&unit->_notify);
	pthread_mutex_unlock(&unit->_notify_mx);

	// Join all threads
	for (int i = 0; i < unit->size; ++i) {

		// Reclaim all dead threads
		dthread_t *thread = unit->threads[i];
		lock_thread_rw(thread);
		if (thread->state & (ThreadDead)) {
			unlock_thread_rw(thread);
			pthread_join(thread->_thr, 0);
Marek Vavrusa's avatar
Marek Vavrusa committed
702
			lock_thread_rw(thread);
Marek Vavrusa's avatar
Marek Vavrusa committed
703
			thread->state = ThreadJoined;
Marek Vavrusa's avatar
Marek Vavrusa committed
704
			unlock_thread_rw(thread);
Marek Vavrusa's avatar
Marek Vavrusa committed
705 706 707 708 709 710 711 712
		} else {
			unlock_thread_rw(thread);
		}
	}

	// Unlock unit
	dt_unit_unlock(unit);

Marek Vavrusa's avatar
Marek Vavrusa committed
713
	return KNOT_EOK;
714 715
}

716
int dt_online_cpus(void)
717
{
718
	int ret = -1;
719
/* Linux, Solaris, OS X 10.4+ */
720
#ifdef _SC_NPROCESSORS_ONLN
721
	ret = (int) sysconf(_SC_NPROCESSORS_ONLN);
722 723 724 725 726 727 728 729
#else
/* FreeBSD, NetBSD, OpenBSD, OS X < 10.4 */
#if HAVE_SYSCTLBYNAME
	size_t rlen = sizeof(int);
	if (sysctlbyname("hw.ncpu", &ret, &rlen, NULL, 0) < 0) {
		ret = -1;
	}
#endif
730 731 732 733
#endif
	return ret;
}

734
int dt_optimal_size(void)
735 736
{
	int ret = dt_online_cpus();
737
	if (ret > 1) {
738
		return ret;
Marek Vavrusa's avatar
Marek Vavrusa committed
739
	}
Jan Včelák's avatar
Jan Včelák committed
740

Marek Vavrusa's avatar
Marek Vavrusa committed
741
	return DEFAULT_THR_COUNT;
742 743
}

744
int dt_is_cancelled(dthread_t *thread)
745
{
Marek Vavrusa's avatar
Marek Vavrusa committed
746
	// Check input
Marek Vavrusa's avatar
Marek Vavrusa committed
747
	if (thread == 0) {
Marek Vavrusa's avatar
Marek Vavrusa committed
748
		return 0;
Marek Vavrusa's avatar
Marek Vavrusa committed
749
	}
Marek Vavrusa's avatar
Marek Vavrusa committed
750

751
	return thread->state & ThreadCancelled; /* No need to be locked. */
752
}
753

754 755 756 757 758 759 760
unsigned dt_get_id(dthread_t *thread)
{
	if (thread == NULL || thread->unit == NULL) {
		return 0;
	}

	dt_unit_t *unit = thread->unit;
761
	for(int tid = 0; tid < unit->size; ++tid) {
762 763 764 765
		if (thread == unit->threads[tid]) {
			return tid;
		}
	}
Jan Včelák's avatar
Jan Včelák committed
766

767 768 769
	return 0;
}

770 771
int dt_unit_lock(dt_unit_t *unit)
{
Marek Vavrusa's avatar
Marek Vavrusa committed
772
	// Check input
Marek Vavrusa's avatar
Marek Vavrusa committed
773
	if (unit == 0) {
Marek Vavrusa's avatar
Marek Vavrusa committed
774
		return KNOT_EINVAL;
Marek Vavrusa's avatar
Marek Vavrusa committed
775
	}
776

777 778 779
	int ret = pthread_mutex_lock(&unit->_mx);

	/* Map errors. */
780
	if (ret < 0) {
781
		return knot_map_errno();
782 783
	}

Marek Vavrusa's avatar
Marek Vavrusa committed
784
	return KNOT_EOK;
785 786 787 788
}

int dt_unit_unlock(dt_unit_t *unit)
{
Marek Vavrusa's avatar
Marek Vavrusa committed
789
	// Check input
Marek Vavrusa's avatar
Marek Vavrusa committed
790
	if (unit == 0) {
Marek Vavrusa's avatar
Marek Vavrusa committed
791
		return KNOT_EINVAL;
Marek Vavrusa's avatar
Marek Vavrusa committed
792
	}
793

794 795 796
	int ret = pthread_mutex_unlock(&unit->_mx);

	/* Map errors. */
797
	if (ret < 0) {
798
		return knot_map_errno();
799 800
	}

Marek Vavrusa's avatar
Marek Vavrusa committed
801
	return KNOT_EOK;
802
}