Commit f8ebb5ba authored by Martin Straka's avatar Martin Straka

Added worker pool

parent 724dceb0
......@@ -102,9 +102,15 @@ SOURCES += \
src/models/filemodel.cpp \
src/models/list_sort_filter_proxy_model.cpp \
src/models/messagemodel.cpp \
src/net/db_wrapper.cpp \
src/net/isds_session.cpp \
src/net/isds_wrapper.cpp \
src/net/net_layer.cpp \
src/net/xml_layer.cpp \
src/qml_interaction/interaction_zfo_file.cpp \
src/qml_interaction/message_info.cpp \
src/settings.cpp \
src/setwrapper.cpp \
src/sqlite/account_db.cpp \
src/sqlite/dbs.cpp \
src/sqlite/db_tables.cpp \
......@@ -112,12 +118,7 @@ SOURCES += \
src/sqlite/file_db.cpp \
src/sqlite/message_db_container.cpp \
src/sqlite/message_db.cpp \
src/net/xml_layer.cpp \
src/net/net_layer.cpp \
src/net/isds_wrapper.cpp \
src/net/isds_session.cpp \
src/net/db_wrapper.cpp \
src/setwrapper.cpp
src/worker/pool.cpp
HEADERS += \
src/accounts.h \
......@@ -139,9 +140,16 @@ HEADERS += \
src/models/filemodel.h \
src/models/list_sort_filter_proxy_model.h \
src/models/messagemodel.h \
src/net/db_wrapper.h \
src/net/isds_const.h \
src/net/isds_session.h \
src/net/isds_wrapper.h \
src/net/net_layer.h \
src/net/xml_layer.h \
src/qml_interaction/interaction_zfo_file.h \
src/qml_interaction/message_info.h \
src/settings.h \
src/setwrapper.h \
src/sqlite/account_db.h \
src/sqlite/dbs.h \
src/sqlite/db_tables.h \
......@@ -149,13 +157,7 @@ HEADERS += \
src/sqlite/file_db.h \
src/sqlite/message_db_container.h \
src/sqlite/message_db.h \
src/net/xml_layer.h \
src/net/isds_const.h \
src/net/net_layer.h \
src/net/isds_wrapper.h \
src/net/isds_session.h \
src/net/db_wrapper.h \
src/setwrapper.h
src/worker/pool.h
android {
SOURCES += \
......
......@@ -30,12 +30,22 @@
#include "src/settings.h"
IsdsWrapper::IsdsWrapper(QObject *parent)
: QObject(parent)
: QObject(parent),
m_workPool(1)
/*
* TODO -- To be able to run multiple therads in the pool a locking mechanism
* over libisds context structures must be implemented.
* Also, per-context queueing ought to be implemented to avoid unnecessary
* waiting.
*/
{
m_workPool.start();
}
IsdsWrapper::~IsdsWrapper(void)
{
m_workPool.wait();
m_workPool.stop();
}
bool IsdsWrapper::syncAllAccounts(const QVariant &acntModelVariant)
......
......@@ -28,6 +28,7 @@
#include "src/net/isds_session.h"
#include "src/net/xml_layer.h"
#include "src/worker/pool.h"
class MessageListModel; /* Forward declaration. */
......@@ -262,6 +263,11 @@ private:
* Class IsdsSession holds and manages isds contexts of accounts.
*/
IsdsSession m_isdsSession;
/*!
* @brief Worker pool instance.
*/
WorkerPool m_workPool;
};
#endif // _ISDS_WRAPPER_H_
/*
* Copyright (C) 2014-2017 CZ.NIC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* In addition, as a special exception, the copyright holders give
* permission to link the code of portions of this program with the
* OpenSSL library under certain conditions as described in each
* individual source file, and distribute linked combinations including
* the two.
*/
#include "src/worker/pool.h"
WorkerThread::WorkerThread(WorkerPool *pool)
: m_pool(pool)
{
}
void WorkerThread::run(void)
{
WorkerPool::run(m_pool);
}
WorkerPool::WorkerPool(unsigned threads, QObject *parent)
: QObject(parent),
m_threadPtrs(threads),
m_lock(QMutex::NonRecursive),
m_wake(),
m_terminating(false),
m_suspended(false),
m_running(0),
m_singleTask(0),
m_tasksHi(),
m_tasksLo(),
m_dequeuedRunning(),
m_singleState(FINISHED)
{
for (unsigned i = 0; i < threads; ++i) {
m_threadPtrs[i] = new (std::nothrow) WorkerThread(this);
}
}
WorkerPool::~WorkerPool(void)
{
for (int i = 0; i < m_threadPtrs.size(); ++i) {
delete m_threadPtrs[i]; m_threadPtrs[i] = 0;
}
}
void WorkerPool::start(void)
{
m_terminating = false;
m_suspended = false;
for (int i = 0; i < m_threadPtrs.size(); ++i) {
m_threadPtrs[i]->start();
}
}
void WorkerPool::stop(void)
{
m_lock.lock();
m_terminating = true;
m_wake.wakeAll();
m_lock.unlock();
for (int i = 0; i < m_threadPtrs.size(); ++i) {
m_threadPtrs[i]->wait();
}
}
void WorkerPool::suspend(void)
{
m_lock.lock();
m_suspended = true;
m_lock.unlock();
}
void WorkerPool::resume(void)
{
m_lock.lock();
m_suspended = false;
m_wake.wakeAll();
m_lock.unlock();
}
void WorkerPool::wait(void)
{
m_lock.lock();
while ((0 != m_singleTask) || !m_tasksHi.isEmpty() ||
!m_tasksLo.isEmpty() || (m_running > 0)) {
m_wake.wait(&m_lock);
}
m_lock.unlock();
}
void WorkerPool::assignLo(QRunnable *task, enum WorkerPool::EnqueueOrder order)
{
if (0 == task) {
return;
}
m_lock.lock();
if (APPEND == order) {
m_tasksLo.enqueue(task);
} else {
m_tasksLo.prepend(task);
}
m_wake.wakeAll();
m_lock.unlock();
}
void WorkerPool::assignHi(QRunnable *task, enum WorkerPool::EnqueueOrder order)
{
if (0 == task) {
return;
}
m_lock.lock();
if (APPEND == order) {
m_tasksHi.enqueue(task);
} else {
m_tasksHi.prepend(task);
}
m_wake.wakeAll();
m_lock.unlock();
}
void WorkerPool::runSingle(QRunnable *task)
{
if (0 == task) {
return;
}
m_lock.lock();
while (0 != m_singleTask) {
m_wake.wait(&m_lock);
}
m_singleTask = task;
m_singleState = PENDING;
m_wake.wakeAll();
m_lock.unlock();
m_lock.lock();
while (FINISHED != m_singleState) {
m_wake.wait(&m_lock);
}
m_singleTask = 0; /* Leave in FINISHED. */
m_wake.wakeAll();
m_lock.unlock();
}
/*!
* @brief Empties task queues.
*
* @param[in,out] taskQueue Task queue to be emptied.
*/
static
void clearTaskQueue(QQueue<QRunnable *> &taskQueue)
{
while (!taskQueue.isEmpty()) {
QRunnable *task = taskQueue.dequeue();
if (task->autoDelete()) {
delete task;
}
}
}
void WorkerPool::clear(void)
{
m_lock.lock();
clearTaskQueue(m_tasksHi);
clearTaskQueue(m_tasksLo);
m_lock.unlock();
}
bool WorkerPool::working(void)
{
bool isWorking = false;
m_lock.lock();
isWorking = !((0 == m_running) && (0 == m_singleTask) &&
m_tasksHi.isEmpty() && m_tasksLo.isEmpty());
m_lock.unlock();
return isWorking;
}
void WorkerPool::run(WorkerPool *pool)
{
Q_ASSERT(0 != pool);
pool->m_lock.lock();
forever {
if (pool->m_terminating) {
break;
}
QRunnable *task = 0;
if (!pool->m_suspended) {
if ((0 != pool->m_singleTask) && (PENDING == pool->m_singleState)) {
task = pool->m_singleTask;
pool->m_singleState = EXECUTING;
} else if (!pool->m_tasksHi.isEmpty()) {
task = pool->m_tasksHi.dequeue();
pool->m_dequeuedRunning.insert(task);
} else if (!pool->m_tasksLo.isEmpty()) {
task = pool->m_tasksLo.dequeue();
pool->m_dequeuedRunning.insert(task);
}
}
if (0 == task) {
pool->m_wake.wait(&pool->m_lock);
continue;
}
++pool->m_running;
pool->m_lock.unlock();
task->run();
if (task->autoDelete()) {
QRunnable *deletedTask = task;
delete deletedTask;
}
pool->m_lock.lock();
--pool->m_running;
if (task == pool->m_singleTask) {
Q_ASSERT(EXECUTING == pool->m_singleState);
pool->m_singleState = FINISHED;
} else {
Q_ASSERT(pool->m_dequeuedRunning.contains(task));
pool->m_dequeuedRunning.remove(task);
}
if (pool->m_dequeuedRunning.isEmpty() &&
pool->m_tasksHi.isEmpty() &&
pool->m_tasksLo.isEmpty()) {
if (task == pool->m_singleTask) {
Q_ASSERT(0 == pool->m_running);
emit pool->finished();
} else {
Q_ASSERT(1 >= pool->m_running);
emit pool->assignedFinished();
}
}
pool->m_wake.wakeAll();
}
pool->m_lock.unlock();
}
/*
* Copyright (C) 2014-2017 CZ.NIC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* In addition, as a special exception, the copyright holders give
* permission to link the code of portions of this program with the
* OpenSSL library under certain conditions as described in each
* individual source file, and distribute linked combinations including
* the two.
*/
#ifndef _POOL_H_
#define _POOL_H_
#include <QMutex>
#include <QQueue>
#include <QObject>
#include <QRunnable>
#include <QSet>
#include <QThread>
#include <QVector>
#include <QWaitCondition>
/*
* QThreadPool is not be best choice.
* Worker objects are unnecessary.
*/
class WorkerPool;
/*!
* @brief Worker.
*/
class WorkerThread : public QThread {
Q_OBJECT
public:
/*!
* @brief Constructor.
*
* @param[in] pool Pointer to be stored.
*/
WorkerThread(WorkerPool *pool);
protected:
/*!
* @brief Runs the worker code.
*/
virtual
void run(void) Q_DECL_OVERRIDE;
friend class WorkerPool;
private:
WorkerPool *m_pool; /* Pointer to worker pool, must be non-null. */
};
/*!
* @brief Pool of workers.
*/
class WorkerPool : public QObject {
Q_OBJECT
public:
/*!
* @brief Whether to enqueue at the front or the rear of the queue.
*/
enum EnqueueOrder {
APPEND = 0,
PREPEND
};
/*!
* @brief Constructor.
*
* @param[in] threads Number of threads to create.
* @param[in] parent Object parent.
*/
WorkerPool(unsigned threads, QObject *parent = 0);
/*!
* @brief Destructor.
*/
~WorkerPool(void);
/*!
* @brief Start all threads in the worker pool.
*/
void start(void);
/*!
* @brief Stop processing of new tasks, start stopping worker threads
* when possible.
*/
void stop(void);
/*!
* @brief Temporarily suspend the execution of worker pool.
*/
void suspend(void);
/*!
* @brief Resume the execution of worker pool.
*/
void resume(void);
/*!
* @brief Wait till the number of pending tasks is zero.
*/
void wait(void);
/*!
* @brief Assign a low priority task to be performed by a worker in
* the pool.
*
* @param[in] task Task to be performed by the worker.
* @param[in] order Whether to prepend a task, default is append.
*/
void assignLo(QRunnable *task, enum EnqueueOrder order = APPEND);
/*!
* @brief Assign a high priority task to be performed by a worker in
* the pool.
*
* @param[in] task Task to be performed by the worker.
* @param[in] order Whether to prepend a task, default is append.
*/
void assignHi(QRunnable *task, enum EnqueueOrder order = APPEND);
/*!
* @brief Run a single task to be performed by a worker in the pool.
*
* @note Blocks until it can be enqueued and waits for being finished.
*
* @param[in] task Task to be performed by the worker.
*/
void runSingle(QRunnable *task);
/*!
* @brief Clear all tasks enqueued in pool processing queue.
*/
void clear(void);
/*!
* @brief Return true if some workers have jobs to do.
*/
bool working(void);
signals:
/*!
* @brief Emitted when all jobs finished and queues are empty.
*/
void finished(void);
/*!
* @brief Emitted when all enqueued jobs finished and queues are empty.
*/
void assignedFinished(void);
protected:
/*!
* @brief Worker code.
*/
static
void run(WorkerPool *pool);
friend class WorkerThread;
private:
Q_DISABLE_COPY(WorkerPool)
QVector<WorkerThread *> m_threadPtrs; /*!< Pool of threads. */
QMutex m_lock;
QWaitCondition m_wake;
bool m_terminating; /*!< Is the pool terminating? */
bool m_suspended; /*!< Is the execution temporarily suspended? */
int m_running; /*!< Number of running threads. */
/*
* Single task has the highest priority. It is used when the code
* has to directly wait for the results -- it may block the event loop.
* Single tasks are advised to have the auto delete property disabled.
*
* Tasks in high priority queue take precedence over all low priority
* tasks. High priority tasks are mainly used for sending messages,
* whereas low priority tasks are prevalently used for downloading
* data. Tasks in these queues should have their auto delete properties
* enabled as there is currently no other mechanism how to delete them.
*/
QRunnable *m_singleTask; /*!< Single task. */
QQueue<QRunnable *> m_tasksHi; /*!< Queue of high priority tasks. */
QQueue<QRunnable *> m_tasksLo; /*!< Queue of low priority tasks. */
QSet<QRunnable *> m_dequeuedRunning; /*!<
* Set of running tasks except
* the single task.
*/
/*
* Single task has the highest priority. The runSingle() method
* blocks when a single task has already been assigned but didn't
* finish.
*
* The single task is meant a as a synchronous blocking supplement of
* direct worker calls such as download single message.
*/
enum ExecutionState {
PENDING, /*!< Task waiting to be executed. */
EXECUTING, /*!< Task currently being executed. */
FINISHED /*!< Task execution finished. */
};
enum ExecutionState m_singleState; /*!< Single execution state. */
};
#endif /* _POOL_H_ */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment