Commit e860fc4a authored by Marek Vavrusa

Set affinity of the UDP threads to specific cores on Linux.

Partial core overlap was implemented to prevent stalls and to allow
better utilization of free cores by the scheduler. Not portable.
parent 703aa3cb
......@@ -164,7 +164,7 @@ AC_DEFINE([DSFMT_MEXP], [521], [DSFMT parameters.])
# Checks for library functions.
AC_FUNC_FORK
AC_FUNC_MMAP
AC_CHECK_FUNCS([gethostbyname gettimeofday clock_gettime memalign memmove memset munmap regcomp pselect select socket sqrt strcasecmp strchr strdup strerror strncasecmp strtol strtoul poll epoll_wait kqueue setgroups sendmmsg madvise])
AC_CHECK_FUNCS([gethostbyname gettimeofday clock_gettime memalign memmove memset munmap regcomp pselect select socket sqrt strcasecmp strchr strdup strerror strncasecmp strtol strtoul poll epoll_wait kqueue setgroups sendmmsg madvise pthread_setaffinity_np])
AC_CONFIG_FILES([Makefile
samples/Makefile
......
......@@ -854,6 +854,25 @@ int dt_stop(dt_unit_t *unit)
// return KNOTD_EOK;
//}
/*
 * Restrict execution to the CPUs selected by 'mask'.
 *
 * 'thread' is only validated against NULL; the mask is applied to the
 * thread that calls this function (pthread_self()).
 * NOTE(review): callers are presumably expected to invoke this from within
 * the target thread - confirm.
 *
 * Returns KNOTD_EOK on success, KNOTD_EINVAL when an argument is NULL,
 * KNOTD_ERROR when pthread_setaffinity_np() fails and KNOTD_ENOTSUP when
 * the build has no pthread_setaffinity_np().
 */
int dt_setaffinity(dthread_t *thread, cpu_set_t *mask)
{
	if (thread == NULL || mask == NULL) {
		return KNOTD_EINVAL;
	}

#ifdef HAVE_PTHREAD_SETAFFINITY_NP
	/* pthread calls report failure with a non-zero (positive) error
	 * number, never with a negative value, so compare against zero. */
	pthread_t tid = pthread_self();
	int ret = pthread_setaffinity_np(tid, sizeof(cpu_set_t), mask);
	if (ret != 0) {
		return KNOTD_ERROR;
	}

	return KNOTD_EOK;
#else
	return KNOTD_ENOTSUP;
#endif
}
int dt_repurpose(dthread_t *thread, runnable_t runnable, void *data)
{
// Check
......@@ -963,14 +982,22 @@ int dt_compact(dt_unit_t *unit)
return KNOTD_EOK;
}
/*
 * Return the number of processors currently online.
 *
 * The value comes from sysconf(_SC_NPROCESSORS_ONLN) when the platform
 * defines that key; the sysconf() result is passed through as an int.
 * When the key is not defined, -1 is returned.
 */
int dt_online_cpus()
{
	int ret = -1;
#ifdef _SC_NPROCESSORS_ONLN
	ret = (int) sysconf(_SC_NPROCESSORS_ONLN);
#endif
	return ret;
}
/*
 * Estimate a suitable number of threads.
 *
 * When the online CPU count is known (> 0) the result is that count plus
 * CPU_ESTIMATE_MAGIC; otherwise a debug message is emitted and
 * DEFAULT_THR_COUNT is returned.
 */
int dt_optimal_size()
{
	int ret = dt_online_cpus();
	if (ret > 0) {
		return ret + CPU_ESTIMATE_MAGIC;
	}

	/* CPU count unavailable: fall back to the compiled-in default. */
	dbg_dt("dthreads: failed to fetch the number of online CPUs.");
	return DEFAULT_THR_COUNT;
}
......@@ -993,6 +1020,22 @@ int dt_is_cancelled(dthread_t *thread)
return ret;
}
/*
 * Look up the position of 'thread' inside its owning unit.
 *
 * Yields 0 when 'thread' is NULL, has no unit, or is not present in the
 * unit's thread array; otherwise the index of the matching slot.
 */
unsigned dt_get_id(dthread_t *thread)
{
	if (thread == NULL) {
		return 0;
	}

	dt_unit_t *owner = thread->unit;
	if (owner == NULL) {
		return 0;
	}

	/* Scan the unit's slots; the first match wins. */
	unsigned found = 0;
	for (unsigned slot = 0; slot < owner->size; ++slot) {
		if (owner->threads[slot] == thread) {
			found = slot;
			break;
		}
	}

	return found;
}
int dt_unit_lock(dt_unit_t *unit)
{
// Check input
......
......@@ -250,6 +250,17 @@ int dt_stop(dt_unit_t *unit);
*/
//int dt_setprio(dthread_t *thread, int prio);
/*!
* \brief Set thread affinity to the masked CPUs.
*
* \note The implementation applies the mask to the calling thread
*       (pthread_self()); 'thread' is only checked for NULL.
*
* \param thread Target thread instance.
* \param mask CPU mask.
*
* \retval KNOTD_EOK on success.
* \retval KNOTD_EINVAL on invalid parameters.
* \retval KNOTD_ERROR when the affinity could not be set.
* \retval KNOTD_ENOTSUP when pthread_setaffinity_np() is not available.
*/
int dt_setaffinity(dthread_t *thread, cpu_set_t *mask);
/*!
* \brief Set thread to execute another runnable.
*
......@@ -306,10 +317,18 @@ int dt_cancel(dthread_t *thread);
*/
int dt_compact(dt_unit_t *unit);
/*!
* \brief Return number of online processors.
*
* \retval Number of online CPUs on success.
* \retval <0 on failure, or when _SC_NPROCESSORS_ONLN is not defined.
*/
int dt_online_cpus();
/*!
* \brief Return optimal number of threads for instance.
*
* It is estimated as NUM_CPUs + 1.
* It is estimated as NUM_CPUs + CONSTANT.
* Fallback is DEFAULT_THR_COUNT (\see common.h).
*
* \return Number of threads.
......@@ -328,6 +347,18 @@ int dt_optimal_size();
*/
int dt_is_cancelled(dthread_t *thread);
/*!
* \brief Return thread index in threading unit.
*
* \note Returns 0 when thread is NULL, doesn't have a unit, or is not
*       found among the unit's threads. This cannot be told apart from
*       a valid index 0.
*
* \param thread Target thread instance.
*
* \return Thread index.
*/
unsigned dt_get_id(dthread_t *thread);
/*!
* \brief Lock unit to prevent parallel operations which could alter unit
* at the same time.
......
......@@ -201,6 +201,19 @@ static inline int udp_master_recvfrom(dthread_t *thread, stat_t *thread_stat)
return KNOTD_EINVAL;
}
/* Set CPU affinity to improve load distribution on multicore systems.
* Partial overlapping mask to be nice to scheduler.
*/
int cpcount = dt_online_cpus();
if (cpcount > 0) {
unsigned tid = dt_get_id(thread);
cpu_set_t cpus;
CPU_ZERO(&cpus);
CPU_SET(tid % cpcount, &cpus);
CPU_SET((tid + 1) % cpcount, &cpus);
dt_setaffinity(thread, &cpus);
}
knot_nameserver_t *ns = h->server->nameserver;
/* Initialize remote party address. */
......@@ -388,6 +401,19 @@ static inline int udp_master_recvmmsg(dthread_t *thread, stat_t *thread_stat)
msgs[i].msg_hdr.msg_name = addrs[i].ptr;
msgs[i].msg_hdr.msg_namelen = addrs[i].len;
}
/* Set CPU affinity to improve load distribution on multicore systems.
* Partial overlapping mask to be nice to scheduler.
*/
int cpcount = dt_online_cpus();
if (cpcount > 0) {
unsigned tid = dt_get_id(thread);
cpu_set_t cpus;
CPU_ZERO(&cpus);
CPU_SET(tid % cpcount, &cpus);
CPU_SET((tid + 1) % cpcount, &cpus);
dt_setaffinity(thread, &cpus);
}
/* Loop until all data is read. */
ssize_t n = 0;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment