Commit 20d115c5 authored by Marek Vavrusa's avatar Marek Vavrusa

Lock unit while working with dt_unit_t instance.

This makes dt_resize() and others thread-safe.
parent 4c88e0b4
......@@ -19,6 +19,19 @@ static inline void unlock_thread_rw(dthread_t *thread)
pthread_mutex_unlock(&thread->_mx);
}
/* Lock unit state for R/W. */
static inline void lock_unit_rw(dt_unit_t *unit)
{
pthread_mutex_lock(&unit->_mx);
}
/* Unlock unit state for R/W. */
static inline void unlock_unit_rw(dt_unit_t *unit)
{
pthread_mutex_unlock(&unit->_mx);
}
/* Signalize thread state change. */
static inline void unit_signalize_change(dt_unit_t *unit)
{
......@@ -75,11 +88,12 @@ static void *thread_ep(void *data)
// Update data
lock_thread_rw(thread);
thread->data = thread->_adata;
runnable_t _run = thread->run;
// Start runnable if thread is marked Active
if ((thread->state == ThreadActive) && (thread->run != 0)) {
unlock_thread_rw(thread);
thread->run(thread);
_run(thread);
} else {
unlock_thread_rw(thread);
}
......@@ -209,6 +223,14 @@ dt_unit_t *dt_create (int count)
free(unit);
return 0;
}
if (pthread_mutex_init(&unit->_mx, 0) != 0) {
pthread_cond_destroy(&unit->_notify);
pthread_cond_destroy(&unit->_report);
pthread_mutex_destroy(&unit->_notify_mx);
pthread_mutex_destroy(&unit->_report_mx);
free(unit);
return 0;
}
// Save unit size
unit->size = count;
......@@ -220,6 +242,7 @@ dt_unit_t *dt_create (int count)
pthread_cond_destroy(&unit->_report);
pthread_mutex_destroy(&unit->_notify_mx);
pthread_mutex_destroy(&unit->_report_mx);
pthread_mutex_destroy(&unit->_mx);
free(unit);
return 0;
}
......@@ -247,6 +270,7 @@ dt_unit_t *dt_create (int count)
pthread_cond_destroy(&unit->_report);
pthread_mutex_destroy(&unit->_notify_mx);
pthread_mutex_destroy(&unit->_report_mx);
pthread_mutex_destroy(&unit->_mx);
free(unit->threads);
free(unit);
return 0;
......@@ -264,8 +288,11 @@ dt_unit_t *dt_create_coherent (int count, runnable_t runnable, void *data)
// Set threads common purpose
for (int i = 0; i < count; ++i) {
unit->threads[i]->run = runnable;
unit->threads[i]->_adata = data;
dthread_t *thread = unit->threads[i];
lock_thread_rw(thread);
thread->run = runnable;
thread->_adata = data;
unlock_thread_rw(thread);
}
return unit;
......@@ -325,6 +352,9 @@ int dt_resize(dt_unit_t *unit, int size)
if (threads == 0)
return -1;
// Lock unit
lock_unit_rw(unit);
// Reassign
unit->threads = threads;
......@@ -335,6 +365,7 @@ int dt_resize(dt_unit_t *unit, int size)
// Update unit
unit->size = size;
unlock_unit_rw(unit);
return 0;
}
......@@ -347,6 +378,9 @@ int dt_resize(dt_unit_t *unit, int size)
if (threads == 0)
return -1;
// Lock unit
lock_unit_rw(unit);
// Iterate while there is space in new unit
memset(threads, 0, size * sizeof(dthread_t*));
int threshold = ThreadActive;
......@@ -431,11 +465,16 @@ int dt_resize(dt_unit_t *unit, int size)
free(unit->threads);
unit->threads = threads;
// Unlock unit
unlock_unit_rw(unit);
return 0;
}
int dt_start (dt_unit_t *unit)
{
// Lock unit
lock_unit_rw(unit);
for (int i = 0; i < unit->size; ++i)
{
dthread_t* thread = unit->threads[i];
......@@ -464,10 +503,14 @@ int dt_start (dt_unit_t *unit)
unlock_thread_rw(thread);
if (res != 0) {
log_error("%s: failed to create thread %d", __func__, i);
unlock_unit_rw(unit);
return res;
}
}
// Unlock unit
unlock_unit_rw(unit);
return 0;
}
......@@ -483,6 +526,9 @@ int dt_join (dt_unit_t *unit)
// Lock threads state
pthread_mutex_lock(&unit->_report_mx);
// Lock unit
lock_unit_rw(unit);
// Browse threads
int active_threads = 0;
for (int i = 0; i < unit->size; ++i) {
......@@ -501,6 +547,9 @@ int dt_join (dt_unit_t *unit)
unlock_thread_rw(thread);
}
// Unlock unit
unlock_unit_rw(unit);
// Check result
if (active_threads == 0) {
pthread_mutex_unlock(&unit->_report_mx);
......@@ -634,6 +683,9 @@ int dt_cancel (dthread_t *thread)
int dt_compact (dt_unit_t *unit)
{
// Lock unit
lock_unit_rw(unit);
// Reclaim all Idle threads
for (int i = 0; i < unit->size; ++i) {
......@@ -646,11 +698,17 @@ int dt_compact (dt_unit_t *unit)
unlock_thread_rw(thread);
}
// Unlock unit
unlock_unit_rw(unit);
// Notify all threads
pthread_mutex_lock(&unit->_notify_mx);
pthread_cond_broadcast(&unit->_notify);
pthread_mutex_unlock(&unit->_notify_mx);
// Lock unit
lock_unit_rw(unit);
// Join all threads
for (int i = 0; i < unit->size; ++i) {
......@@ -662,6 +720,9 @@ int dt_compact (dt_unit_t *unit)
}
}
// Unlock unit
unlock_unit_rw(unit);
return 0;
}
......
......@@ -90,11 +90,9 @@ typedef struct dt_unit_t {
pthread_mutex_t _notify_mx; /* Condition mutex */
pthread_cond_t _report; /* Report thread state */
pthread_mutex_t _report_mx; /* Condition mutex */
pthread_mutex_t _mx; /* Unit lock */
} dt_unit_t;
/*! \brief Accessor to threads in unit. */
#define dt_get(p_unit, id) (p_unit->threads + (id))
/*!
* \brief Create a set of threads with no initial runnable.
*
......
......@@ -19,7 +19,7 @@ unit_api dthreads_tests_api = {
/*
* Unit implementation.
*/
static const int DT_TEST_COUNT = 16;
static const int DT_TEST_COUNT = 17;
/* Unit runnable data. */
static pthread_mutex_t _runnable_mx;
......@@ -162,6 +162,50 @@ static inline int dt_test_resize(dt_unit_t *unit, int size)
return ret == 0;
}
/*! \brief Resize unit while threads are active. */
static inline int dt_test_liveresize(dt_unit_t *unit)
{
// Size
int size = unit->size;
int size_hi = size + 2;
int size_lo = size - 1;
// Expand
int ret = 0;
ret = dt_resize(unit, size_hi);
if (ret < 0) {
return 0;
}
// Repurpose all
for (int i = 0; i < unit->size; ++i) {
ret += dt_repurpose(unit->threads[i], &runnable, 0);
}
// Restart
_runnable_i = 0;
ret += dt_start(unit);
// Shrink
ret += dt_resize(unit, size_lo);
// Wait for finish
ret += dt_join(unit);
// Verify
int expected_hi = size_hi * _runnable_cycles;
int expected_lo = size_lo * _runnable_cycles;
note("resize test: %d->%d->%d threads, %d ticks, <%d,%d> expected",
size, size_hi, size_lo, _runnable_i, expected_lo, expected_hi);
if(_runnable_i > expected_hi || _runnable_i < expected_lo) {
return 0;
}
// Check return codes
return ret == 0;
}
/*! \brief Start unit. */
static inline int dt_test_start(dt_unit_t *unit)
{
......@@ -266,7 +310,10 @@ static int dt_tests_run(int argc, char * argv[])
ok(dt_test_resize(unit, size),
"dthreads: shrinking unit to size / 2 (%d threads)", size);
/* Test 16: Deinitialize */
/* Test 16: Resize while threads are active. */
ok(dt_test_liveresize(unit), "dthreads: resizing unit while active");
/* Test 17: Deinitialize */
dt_delete(&unit);
ok(unit == 0, "dthreads: delete unit");
endskip;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment