Commit 556e91ac authored by Marek Vavrusa's avatar Marek Vavrusa

More DThreads threading fixes.

parent 28ac2b38
......@@ -3,6 +3,7 @@
#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include "dthreads.h"
#include "common.h"
#include "log.h"
......@@ -112,7 +113,7 @@ static void *thread_ep (void *data)
// Check thread state
lock_thread_rw(thread);
if (thread->state == ThreadDead) {
debug_dt(stderr, "dthreads: [%p] marked dead, finishing\n", thread);
debug_dt("dthreads: [%p] marked dead, finishing\n", thread);
unlock_thread_rw(thread);
break;
}
......@@ -134,7 +135,7 @@ static void *thread_ep (void *data)
// If the runnable was cancelled, start new iteration
lock_thread_rw(thread);
if (thread->state & ThreadCancelled) {
debug_dt(stderr, "dthreads: [%p] cancelled\n", thread);
debug_dt("dthreads: [%p] cancelled\n", thread);
thread->state &= ~ThreadCancelled;
unlock_thread_rw(thread);
continue;
......@@ -142,6 +143,7 @@ static void *thread_ep (void *data)
unlock_thread_rw(thread);
// Runnable finished without interruption, mark as Idle
pthread_mutex_lock(&unit->_notify_mx);
lock_thread_rw(thread);
if(thread->state & ThreadActive) {
thread->state &= ~ThreadActive;
......@@ -154,18 +156,24 @@ static void *thread_ep (void *data)
if (thread->state & ThreadIdle) {
unlock_thread_rw(thread);
// Signalize state change
unit_signalize_change(unit);
// Wait for notification from unit
pthread_mutex_lock(&unit->_notify_mx);
pthread_cond_wait(&unit->_notify, &unit->_notify_mx);
pthread_mutex_unlock(&unit->_notify_mx);
} else {
unlock_thread_rw(thread);
pthread_mutex_unlock(&unit->_notify_mx);
}
}
// Report thread state change
debug_dt(stderr, "dthreads: [%p] thread finished\n", thread);
debug_dt("dthreads: [%p] thread finished\n", thread);
pthread_mutex_lock(&unit->_notify_mx);
unit_signalize_change(unit);
pthread_mutex_unlock(&unit->_notify_mx);
debug_dt("dthreads: [%p] thread exited runnable\n", thread);
// Return
return 0;
......@@ -556,6 +564,7 @@ int dt_join (dt_unit_t *unit)
for(;;) {
// Lock unit
pthread_mutex_lock(&unit->_notify_mx);
lock_unit_rw(unit);
// Browse threads
......@@ -570,15 +579,16 @@ int dt_join (dt_unit_t *unit)
}
// Reclaim dead threads, but only fast
if(thread->state == ThreadDead) {
/*if(thread->state == ThreadDead) {
unlock_thread_rw(thread);
debug_dt(stderr, "dthreads: [%p] join: reclaiming thread\n", thread);
debug_dt("dthreads: [%p] join: reclaiming thread\n", thread);
pthread_join(thread->_thr, 0);
debug_dt(stderr, "dthreads: [%p] join: thread reclaimed\n", thread);
debug_dt("dthreads: [%p] join: thread reclaimed\n", thread);
thread->state = ThreadJoined;
} else {
unlock_thread_rw(thread);
}
}*/
unlock_thread_rw(thread);
}
// Unlock unit
......@@ -586,11 +596,13 @@ int dt_join (dt_unit_t *unit)
// Check result
if (active_threads == 0) {
pthread_mutex_unlock(&unit->_notify_mx);
break;
}
// Wait for a thread to finish
pthread_mutex_lock(&unit->_report_mx);
pthread_mutex_unlock(&unit->_notify_mx);
pthread_cond_wait(&unit->_report, &unit->_report_mx);
pthread_mutex_unlock(&unit->_report_mx);
}
......@@ -633,7 +645,7 @@ int dt_stop (dt_unit_t *unit)
if(thread->state & (ThreadIdle | ThreadActive)) {
thread->state = ThreadDead | ThreadCancelled;
dt_signalize(thread, SIGALRM);
debug_dt(stderr, "dthreads: [%p] signalizing to stop\n", thread);
debug_dt("dthreads: [%p] signalizing to stop\n", thread);
}
unlock_thread_rw(thread);
}
......@@ -756,17 +768,17 @@ int dt_compact (dt_unit_t *unit)
dthread_t *thread = unit->threads[i];
lock_thread_rw(thread);
if (thread->state & (ThreadDead)) {
debug_dt(stderr, "dthreads: [%p] compact: reclaiming thread\n", thread);
debug_dt("dthreads: [%p] compact: reclaiming thread\n", thread);
unlock_thread_rw(thread);
pthread_join(thread->_thr, 0);
debug_dt(stderr, "dthreads: [%p] compact: thread reclaimed\n", thread);
debug_dt("dthreads: [%p] compact: thread reclaimed\n", thread);
thread->state = ThreadJoined;
} else {
unlock_thread_rw(thread);
}
}
debug_dt(stderr, "dthreads: compact: joined all threads\n");
debug_dt("dthreads: compact: joined all threads\n");
// Unlock unit
unlock_unit_rw(unit);
......
......@@ -96,6 +96,9 @@ typedef struct dt_unit_t {
/*!
* \brief Create a set of threads with no initial runnable.
*
* \note All threads are created with Dead state.
* This means, they're not physically created unit dt_start() is called.
*
* \param count Requested thread count.
* \return On success: new instance, else 0
*/
......@@ -207,8 +210,11 @@ int dt_repurpose (dthread_t* thread, runnable_t runnable, void *data);
/*!
* \brief Wake up thread from idle state.
*
* Thread is awoken from idle state and enters runnable.
* This function has no effect on running threads.
* Thread is awoken from idle state and reenters runnable.
* This function only affects idle threads.
*
* \note Unit needs to be started with dt_start() first, as the function
* doesn't affect dead threads.
*/
int dt_activate (dthread_t *thread);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment