Commit 92ac648c authored by Marek Vavrusa's avatar Marek Vavrusa

Simplified dthreads.

parent 688c1ce0
......@@ -286,7 +286,7 @@ static void dt_delete_thread(dthread_t **thread)
* Public APIs.
*/
dt_unit_t *dt_create(int count)
static dt_unit_t *dt_create_unit(int count)
{
// Check count
if (count <= 0) {
......@@ -379,8 +379,7 @@ dt_unit_t *dt_create(int count)
return unit;
}
dt_unit_t *dt_create_coherent(int count, runnable_t runnable,
runnable_t destructor, void *data)
dt_unit_t *dt_create(int count, runnable_t runnable, runnable_t destructor, void *data)
{
// Check count
if (count <= 0) {
......@@ -388,7 +387,7 @@ dt_unit_t *dt_create_coherent(int count, runnable_t runnable,
}
// Create unit
dt_unit_t *unit = dt_create(count);
dt_unit_t *unit = dt_create_unit(count);
if (unit == 0) {
return 0;
}
......@@ -452,39 +451,7 @@ void dt_delete(dt_unit_t **unit)
*unit = 0;
}
int dt_start(dt_unit_t *unit)
{
// Check input
if (unit == 0) {
return KNOT_EINVAL;
}
// Lock unit
pthread_mutex_lock(&unit->_notify_mx);
dt_unit_lock(unit);
for (int i = 0; i < unit->size; ++i) {
dthread_t *thread = unit->threads[i];
int res = dt_start_id(thread);
if (res != 0) {
dbg_dt("dthreads: failed to create thread '%d'.", i);
dt_unit_unlock(unit);
pthread_mutex_unlock(&unit->_notify_mx);
return res;
}
dbg_dt("dthreads: [%p] %s: thread started\n",
thread, __func__);
}
// Unlock unit
dt_unit_unlock(unit);
pthread_cond_broadcast(&unit->_notify);
pthread_mutex_unlock(&unit->_notify_mx);
return KNOT_EOK;
}
int dt_start_id(dthread_t *thread)
static int dt_start_id(dthread_t *thread)
{
// Check input
if (thread == 0) {
......@@ -520,6 +487,38 @@ int dt_start_id(dthread_t *thread)
return res;
}
int dt_start(dt_unit_t *unit)
{
// Check input
if (unit == 0) {
return KNOT_EINVAL;
}
// Lock unit
pthread_mutex_lock(&unit->_notify_mx);
dt_unit_lock(unit);
for (int i = 0; i < unit->size; ++i) {
dthread_t *thread = unit->threads[i];
int res = dt_start_id(thread);
if (res != 0) {
dbg_dt("dthreads: failed to create thread '%d'.", i);
dt_unit_unlock(unit);
pthread_mutex_unlock(&unit->_notify_mx);
return res;
}
dbg_dt("dthreads: [%p] %s: thread started\n",
thread, __func__);
}
// Unlock unit
dt_unit_unlock(unit);
pthread_cond_broadcast(&unit->_notify);
pthread_mutex_unlock(&unit->_notify_mx);
return KNOT_EOK;
}
int dt_signalize(dthread_t *thread, int signum)
{
// Check input
......@@ -599,32 +598,6 @@ int dt_join(dt_unit_t *unit)
return KNOT_EOK;
}
int dt_stop_id(dthread_t *thread)
{
// Check input
if (thread == 0) {
return KNOT_EINVAL;
}
// Signalize active thread to stop
lock_thread_rw(thread);
if (thread->state & (ThreadIdle | ThreadActive)) {
thread->state = ThreadDead | ThreadCancelled;
dt_signalize(thread, SIGALRM);
}
unlock_thread_rw(thread);
// Broadcast notification
dt_unit_t *unit = thread->unit;
if (unit != 0) {
pthread_mutex_lock(&unit->_notify_mx);
pthread_cond_broadcast(&unit->_notify);
pthread_mutex_unlock(&unit->_notify_mx);
}
return KNOT_EOK;
}
int dt_stop(dt_unit_t *unit)
{
// Check unit
......@@ -704,48 +677,6 @@ int dt_setaffinity(dthread_t *thread, unsigned* cpu_id, size_t cpu_count)
return KNOT_EOK;
}
int dt_repurpose(dthread_t *thread, runnable_t runnable, void *data)
{
// Check
if (thread == 0) {
return KNOT_EINVAL;
}
// Stop here if thread isn't a member of a unit
dt_unit_t *unit = thread->unit;
if (unit == 0) {
lock_thread_rw(thread);
thread->state = ThreadActive | ThreadCancelled;
unlock_thread_rw(thread);
return KNOT_ENOTSUP;
}
// Lock thread state changes
pthread_mutex_lock(&unit->_notify_mx);
lock_thread_rw(thread);
// Repurpose it's object and runnable
thread->run = runnable;
thread->_adata = data;
// Cancel current runnable if running
if (thread->state & (ThreadIdle | ThreadActive)) {
// Update state
thread->state = ThreadActive | ThreadCancelled;
unlock_thread_rw(thread);
// Notify thread
pthread_cond_broadcast(&unit->_notify);
pthread_mutex_unlock(&unit->_notify_mx);
} else {
unlock_thread_rw(thread);
pthread_mutex_unlock(&unit->_notify_mx);
}
return KNOT_EOK;
}
int dt_activate(dthread_t *thread)
{
return dt_update_thread(thread, ThreadActive);
......
......@@ -104,19 +104,6 @@ typedef struct dt_unit_t {
pthread_mutex_t _mx; /* Unit lock */
} dt_unit_t;
/*!
* \brief Create a set of threads with no initial runnable.
*
* \note All threads are created with Dead state.
* This means, they're not physically created unit dt_start() is called.
*
* \param count Requested thread count.
*
* \retval New instance if successful
* \retval NULL on error
*/
dt_unit_t *dt_create(int count);
/*!
* \brief Create a set of coherent threads.
*
......@@ -130,8 +117,7 @@ dt_unit_t *dt_create(int count);
* \retval New instance if successful
* \retval NULL on error
*/
dt_unit_t *dt_create_coherent(int count, runnable_t runnable,
runnable_t destructor, void *data);
dt_unit_t *dt_create(int count, runnable_t runnable, runnable_t destructor, void *data);
/*!
* \brief Free unit.
......@@ -153,16 +139,6 @@ void dt_delete(dt_unit_t **unit);
*/
int dt_start(dt_unit_t *unit);
/*!
* \brief Start given thread.
*
* \param thread Target thread instance.
*
* \retval KNOT_EOK on success.
* \retval KNOT_EINVAL on invalid parameters.
*/
int dt_start_id(dthread_t *thread);
/*!
* \brief Send given signal to thread.
*
......@@ -189,18 +165,6 @@ int dt_signalize(dthread_t *thread, int signum);
*/
int dt_join(dt_unit_t *unit);
/*!
* \brief Stop thread from running.
*
* Active thread is interrupted at the nearest runnable cancellation point.
*
* \param thread Target thread instance.
*
* \retval KNOT_EOK on success.
* \retval KNOT_EINVAL on invalid parameters.
*/
int dt_stop_id(dthread_t *thread);
/*!
* \brief Stop all threads in unit.
*
......@@ -225,19 +189,6 @@ int dt_stop(dt_unit_t *unit);
*/
int dt_setaffinity(dthread_t *thread, unsigned* cpu_id, size_t cpu_count);
/*!
* \brief Set thread to execute another runnable.
*
* \param thread Target thread instance.
* \param runnable Runnable function for target thread.
* \param data Data passed to target thread.
*
* \retval KNOT_EOK on success.
* \retval KNOT_EINVAL on invalid parameters.
* \retval KNOT_ENOTSUP operation not supported.
*/
int dt_repurpose(dthread_t *thread, runnable_t runnable, void *data);
/*!
* \brief Wake up thread from idle state.
*
......
......@@ -52,7 +52,7 @@ int runnable(struct dthread_t *thread)
}
/* Destructor data. */
static volatile int _destructor_data;
static volatile int _destructor_data = 0;
static pthread_mutex_t _destructor_mx;
/*! \brief Thread destructor. */
......@@ -65,47 +65,6 @@ int destruct(struct dthread_t *thread)
return 0;
}
/*! \brief Create unit. */
static inline dt_unit_t *dt_test_create(int size)
{
return dt_create(size);
}
/*! \brief Assign a task. */
static inline int dt_test_single(dt_unit_t *unit)
{
return dt_repurpose(unit->threads[0], &runnable, NULL) == 0;
}
/*! \brief Assign task to all unit threads. */
static inline int dt_test_coherent(dt_unit_t *unit)
{
int ret = 0;
for (int i = 0; i < unit->size; ++i) {
ret += dt_repurpose(unit->threads[i], &runnable, NULL);
}
return ret == 0;
}
/*! \brief Start unit. */
static inline int dt_test_start(dt_unit_t *unit)
{
return dt_start(unit) == 0;
}
/*! \brief Stop unit. */
static inline int dt_test_stop(dt_unit_t *unit)
{
return dt_stop(unit);
}
/*! \brief Join unit. */
static inline int dt_test_join(dt_unit_t *unit)
{
return dt_join(unit) == 0;
}
// Signal handler
static void interrupt_handle(int s)
{
......@@ -114,7 +73,7 @@ static void interrupt_handle(int s)
/*! API: run tests. */
int main(int argc, char *argv[])
{
plan(15);
plan(8);
// Register service and signal handler
struct sigaction sa;
......@@ -129,93 +88,56 @@ int main(int argc, char *argv[])
pthread_mutex_init(&_destructor_mx, NULL);
/* Test 1: Create unit */
dt_unit_t *unit = dt_test_create(2);
ok(unit != 0, "dthreads: create unit (optimal size %d)", unit->size);
if (unit == 0) {
skip_block(17, "No dthreads unit");
dt_unit_t *unit = dt_create(2, &runnable, NULL, NULL);
ok(unit != NULL, "dthreads: create unit (size %d)", unit->size);
if (unit == NULL) {
skip_block(7, "No dthreads unit");
goto skip_all;
}
/* Test 2: Assign a single task. */
ok(dt_test_single(unit), "dthreads: assign single task");
/* Test 3: Start tasks. */
/* Test 2: Start tasks. */
_runnable_i = 0;
ok(dt_test_start(unit), "dthreads: start single task");
ok(dt_start(unit) == 0, "dthreads: start single task");
/* Test 4: Wait for tasks. */
ok(dt_test_join(unit), "dthreads: join threads");
/* Test 3: Wait for tasks. */
ok(dt_join(unit) == 0, "dthreads: join threads");
/* Test 5: Compare counter. */
int expected = _runnable_cycles * 1;
/* Test 4: Compare counter. */
int expected = _runnable_cycles * 2;
is_int(expected, _runnable_i, "dthreads: result ok");
/* Test 6: Repurpose threads. */
_runnable_i = 0;
ok(dt_test_coherent(unit), "dthreads: repurpose to coherent");
/* Test 7: Restart threads. */
ok(dt_test_start(unit), "dthreads: start coherent unit");
/* Test 8: Wait for tasks. */
ok(dt_test_join(unit), "dthreads: join threads");
/* Test 9: Deinitialize */
/* Test 5: Deinitialize */
dt_delete(&unit);
ok(unit == NULL, "dthreads: delete unit");
/* Test 10: Wrong values. */
unit = dt_create(-1);
/* Test 6: Wrong values. */
unit = dt_create(-1, NULL, NULL, NULL);
ok(unit == NULL, "dthreads: create with negative count");
unit = dt_create_coherent(dt_optimal_size(), 0, 0, 0);
/* Test 11: NULL runnable. */
is_int(0, dt_start(unit), "dthreads: start with NULL runnable");
/* Test 12: NULL operations crashing. */
int op_count = 14;
int expected_min = op_count * -1;
// All functions must return -1 at least
/* Test 7: NULL operations crashing. */
int ret = 0;
ret += dt_activate(0); // -1
ret += dt_cancel(0); // -1
ret += dt_compact(0); // -1
dt_delete(0); //
ret += dt_is_cancelled(0); // 0
ret += dt_join(0); // -1
ret += dt_repurpose(0, 0, 0); // -1
ret += dt_signalize(0, SIGALRM); // -1
ret += dt_start(0); // -1
ret += dt_start_id(0); // -1
ret += dt_stop(0); // -1
ret += dt_stop_id(0); // -1
ret += dt_unit_lock(0); // -1
ret += dt_unit_unlock(0); // -1
is_int(-1464, ret, "dthreads: not crashed while executing functions on NULL context");
/* Test 13: expected results. */
ok(ret <= expected_min,
"dthreads: correct values when passed NULL context "
"(%d, min: %d)", ret, expected_min);
/* Test 14: Thread destructor. */
ret += dt_activate(0);
ret += dt_cancel(0);
ret += dt_compact(0);
dt_delete(0);
ret += dt_is_cancelled(0);
ret += dt_join(0);
ret += dt_signalize(0, SIGALRM);
ret += dt_start(0);
ret += dt_stop(0);
ret += dt_unit_lock(0);
ret += dt_unit_unlock(0);
is_int(-1098, ret, "dthreads: correct values when passed NULL context");
/* Test 8: Thread destructor. */
_destructor_data = 0;
unit = dt_create_coherent(2, 0, destruct, 0);
unit = dt_create(2, 0, destruct, 0);
dt_start(unit);
dt_stop(unit);
dt_join(unit);
is_int(2, _destructor_data, "dthreads: destructor with dt_create_coherent()");
dt_delete(&unit);
/* Test 15: Thread destructor setter. */
unit = dt_create(1);
dt_set_desctructor(unit->threads[0], destruct);
dt_start(unit);
dt_stop(unit);
dt_join(unit);
is_int(3, _destructor_data, "dthreads: destructor with dt_set_desctructor()");
dt_delete(&unit);
skip_all:
pthread_mutex_destroy(&_runnable_mx);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment