Commit 4c88e0b4 authored by Marek Vavrusa's avatar Marek Vavrusa

Added unit expanding/shrinking via dt_resize().

Beware though, it is not thread-safe so make sure
you are not in dt_join() while silently killing them.
The Earth might explode in the process.
parent bd1513f0
This diff is collapsed.
......@@ -85,7 +85,7 @@ typedef struct dthread_t {
*/
typedef struct dt_unit_t {
int size; /*!< Unit width (number of allocated threads) */
struct dthread_t *threads; /*!< Array of threads */
struct dthread_t **threads; /*!< Array of threads */
pthread_cond_t _notify; /* Notify thread */
pthread_mutex_t _notify_mx; /* Condition mutex */
pthread_cond_t _report; /* Report thread state */
......@@ -121,6 +121,26 @@ dt_unit_t *dt_create_coherent (int count, runnable_t runnable, void *data);
*/
void dt_delete (dt_unit_t **unit);
/*!
* \brief Resize unit to given number.
*
* \note Newly created dthreads will have
* no runnable or data, their state
* will be ThreadJoined (that means
* no thread will be physically created until
* next dt_start()).
*
* \warning Be careful when shrinking unit,
* joined and idle threads are reclaimed first,
* but it may kill your active threads as a last resort.
* However, threads will stop at their cancellation point,
* so this is potentially an expensive operation.
*
* \param size New unit size.
* \return On success: 0, else <0
*/
int dt_resize(dt_unit_t *unit, int size);
/*!
* \brief Start all threads in selected unit.
*
......@@ -148,14 +168,14 @@ int dt_signalize (dthread_t *thread, int signum);
int dt_join (dt_unit_t *unit);
/*!
* \brief Stop all threads from running.
* \brief Stop thread from running.
*
* Active threads are interrupted at the nearest
* Active thread is interrupted at the nearest
* runnable cancellation point.
*
* \return Number of affected threads.
* \return On success: 0, else <0
*/
int dt_stop (dt_unit_t *unit);
int dt_stop (dthread_t* thread);
/*!
* \brief Modify thread priority.
......
......@@ -19,7 +19,7 @@ unit_api dthreads_tests_api = {
/*
* Unit implementation.
*/
static const int DT_TEST_COUNT = 14;
static const int DT_TEST_COUNT = 16;
/* Unit runnable data. */
static pthread_mutex_t _runnable_mx;
......@@ -65,7 +65,7 @@ static inline dt_unit_t *dt_test_create(int size)
/*! \brief Assign a task. */
static inline int dt_test_single(dt_unit_t *unit)
{
return dt_repurpose(unit->threads + 0, &runnable, NULL) == 0;
return dt_repurpose(unit->threads[0], &runnable, NULL) == 0;
}
/*! \brief Assign task to all unit threads. */
......@@ -73,7 +73,7 @@ static inline int dt_test_coherent(dt_unit_t *unit)
{
int ret = 0;
for (int i = 0; i < unit->size; ++i) {
ret += dt_repurpose(unit->threads + i, &runnable, NULL);
ret += dt_repurpose(unit->threads[i], &runnable, NULL);
}
return ret == 0;
......@@ -82,13 +82,13 @@ static inline int dt_test_coherent(dt_unit_t *unit)
/*! \brief Repurpose single thread. */
static inline int dt_test_repurpose(dt_unit_t *unit, int id)
{
return dt_repurpose(unit->threads + id, &runnable_simio, NULL) == 0;
return dt_repurpose(unit->threads[id], &runnable_simio, NULL) == 0;
}
/*! \brief Cancel single thread. */
static inline int dt_test_cancel(dt_unit_t *unit, int id)
{
return dt_cancel(unit->threads + id) == 0;
return dt_cancel(unit->threads[id]) == 0;
}
/*! \brief Reanimate dead threads. */
......@@ -100,11 +100,11 @@ static inline int dt_test_reanimate(dt_unit_t *unit)
// Remove purpose from all
for (int i = 0; i < unit->size; ++i) {
ret += dt_repurpose(unit->threads + i, 0, 0);
ret += dt_repurpose(unit->threads[i], 0, 0);
}
// Set single thread to purpose
ret += dt_repurpose(unit->threads, &runnable, 0);
ret += dt_repurpose(unit->threads[0], &runnable, 0);
// Restart
_runnable_i = 0;
......@@ -123,6 +123,45 @@ static inline int dt_test_reanimate(dt_unit_t *unit)
return ret == 0;
}
/*! \brief Resize unit. */
static inline int dt_test_resize(dt_unit_t *unit, int size)
{
// Resize
int ret = 0;
ret = dt_resize(unit, size);
if (ret < 0) {
return 0;
}
// Check outcome
if (unit->size != size) {
return 0;
}
// Repurpose all
for (int i = 0; i < size; ++i) {
ret += dt_repurpose(unit->threads[i], &runnable, 0);
}
// Restart
_runnable_i = 0;
ret += dt_start(unit);
// Wait for finish
ret += dt_join(unit);
// Verify
int expected = size * _runnable_cycles;
note("resize test: %d threads, %d ticks, %d expected",
size, _runnable_i, expected);
if(_runnable_i != expected) {
return 0;
}
// Check return codes
return ret == 0;
}
/*! \brief Start unit. */
static inline int dt_test_start(dt_unit_t *unit)
{
......@@ -132,7 +171,11 @@ static inline int dt_test_start(dt_unit_t *unit)
/*! \brief Stop unit. */
static inline int dt_test_stop(dt_unit_t *unit)
{
return dt_stop(unit) == 0;
int ret = 0;
for (int i = 0; i < unit->size; ++i)
ret += dt_stop(unit->threads[i]);
return ret;
}
/*! \brief Join unit. */
......@@ -213,7 +256,17 @@ static int dt_tests_run(int argc, char * argv[])
/* Test 13: Reanimate dead threads. */
ok(dt_test_reanimate(unit), "dthreads: reanimate dead threads");
/* Test 14: Deinitialize */
/* Test 14: Expand unit by 100%. */
int size = unit->size * 2;
ok(dt_test_resize(unit, size),
"dthreads: expanding unit to size * 2 (%d threads)", size);
/* Test 15: Shrink unit to half. */
size = unit->size / 2;
ok(dt_test_resize(unit, size),
"dthreads: shrinking unit to size / 2 (%d threads)", size);
/* Test 16: Deinitialize */
dt_delete(&unit);
ok(unit == 0, "dthreads: delete unit");
endskip;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment