Commit 2e2fefe0 authored by Jan Včelák

server: replace select() with poll() for UDP

parent 076c6701
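
For orientation: the heart of the change is swapping a select()-based wait (fixed-size fd_set, FD_SETSIZE ceiling, set copied before every call) for a poll()-based wait over a heap-allocated array of struct pollfd. The following minimal, self-contained sketch shows that pattern in isolation; the loopback sockets, test ports and the helper name bind_udp() are illustrative only and not taken from the Knot source.

/* Illustrative sketch only: poll() over a small set of UDP sockets.
 * Build the pollfd array once, block in poll(), and service just the
 * descriptors that reported POLLIN -- the same shape as the reworked
 * udp_master() loop, minus the Knot-specific plumbing. */
#include <arpa/inet.h>
#include <errno.h>
#include <netinet/in.h>
#include <poll.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

#define NSOCK 2

static int bind_udp(uint16_t port)
{
	int fd = socket(AF_INET, SOCK_DGRAM, 0);
	struct sockaddr_in sa;
	memset(&sa, 0, sizeof(sa));
	sa.sin_family = AF_INET;
	sa.sin_port = htons(port);
	sa.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
	if (fd < 0 || bind(fd, (struct sockaddr *)&sa, sizeof(sa)) != 0) {
		perror("bind");
		exit(EXIT_FAILURE);
	}
	return fd;
}

int main(void)
{
	struct pollfd fds[NSOCK];
	for (int i = 0; i < NSOCK; i++) {
		fds[i].fd = bind_udp(5300 + i);    /* arbitrary test ports */
		fds[i].events = POLLIN;
		fds[i].revents = 0;
	}

	for (;;) {
		int events = poll(fds, NSOCK, -1); /* block; no fd_set copy needed */
		if (events <= 0) {
			if (errno == EINTR) {
				continue;          /* interrupted by a signal: retry */
			}
			break;
		}
		for (int i = 0; i < NSOCK && events > 0; i++) {
			if (fds[i].revents == 0) {
				continue;          /* no event on this socket */
			}
			events -= 1;
			char buf[512];
			ssize_t n = recv(fds[i].fd, buf, sizeof(buf), 0);
			if (n > 0) {
				printf("socket %d: received %zd bytes\n", i, n);
			}
		}
	}

	for (int i = 0; i < NSOCK; i++) {
		close(fds[i].fd);
	}
	return 0;
}

Compiled standalone, the sketch can be poked with, for example, echo hi | nc -u 127.0.0.1 5300.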
@@ -439,34 +439,57 @@ void __attribute__ ((constructor)) udp_master_init()
#endif /* HAVE_RECVMMSG */
}
/*! \brief Release the reference on the interface list and clear watched fdset. */
static void forget_ifaces(ifacelist_t *ifaces, fd_set *set, int maxfd)
/*! \brief Get interface UDP descriptor for a given thread. */
static int iface_udp_fd(const iface_t *iface, int thread_id)
{
#ifdef ENABLE_REUSEPORT
	return iface->fd_udp[thread_id % iface->fd_udp_count];
#else
	return iface->fd_udp[0];
#endif
}

/*! \brief Release the interface list reference and free watched descriptor set. */
static void forget_ifaces(ifacelist_t *ifaces, struct pollfd **fds_ptr)
{
	ref_release((ref_t *)ifaces);
	FD_ZERO(set);
	free(*fds_ptr);
	*fds_ptr = NULL;
}

/*! \brief Add interface sockets to the watched fdset. */
static void track_ifaces(ifacelist_t *ifaces, fd_set *set,
                         int *maxfd, int *minfd, int thrid)
/*!
 * \brief Make a set of watched descriptors based on the interface list.
 *
 * \param[in]  ifaces   New interface list.
 * \param[in]  thrid    Thread ID.
 * \param[out] fds_ptr  Allocated set of descriptors.
 *
 * \return Number of watched descriptors, zero on error.
 */
static nfds_t track_ifaces(const ifacelist_t *ifaces, int thrid,
                           struct pollfd **fds_ptr)
{
	assert(ifaces && set && maxfd && minfd);
	assert(ifaces && fds_ptr);
	FD_ZERO(set);
	*maxfd = 0;
	*minfd = FD_SETSIZE - 1;
	nfds_t nfds = list_size(&ifaces->l);
	struct pollfd *fds = malloc(nfds * sizeof(*fds));
	if (!fds) {
		*fds_ptr = NULL;
		return 0;
	}
	iface_t *iface = NULL;
	int i = 0;
	WALK_LIST(iface, ifaces->l) {
#ifdef ENABLE_REUSEPORT
		int fd = iface->fd_udp[thrid % iface->fd_udp_count];
#else
		int fd = iface->fd_udp[0];
#endif
		*maxfd = MAX(fd, *maxfd);
		*minfd = MIN(fd, *minfd);
		FD_SET(fd, set);
		fds[i].fd = iface_udp_fd(iface, thrid);
		fds[i].events = POLLIN;
		fds[i].revents = 0;
		i += 1;
	}
	assert(i == nfds);
	*fds_ptr = fds;
	return nfds;
}
int udp_master(dthread_t *thread)
@@ -503,12 +526,9 @@ int udp_master(dthread_t *thread)
	mm_ctx_mempool(&mm, 16 * MM_DEFAULT_BLKSIZE);
	udp.overlay.mm = &mm;
	/* Chose select as epoll/kqueue has larger overhead for a
	 * single or handful of sockets. */
	fd_set fds;
	FD_ZERO(&fds);
	int minfd = 0, maxfd = 0;
	int rcvd = 0;
	/* Event source. */
	struct pollfd *fds = NULL;
	nfds_t nfds = 0;
	udp_pps_begin();
@@ -521,10 +541,13 @@ int udp_master(dthread_t *thread)
			udp.thread_id = handler->thread_id[thr_id];
			rcu_read_lock();
			forget_ifaces(ref, &fds, maxfd);
			forget_ifaces(ref, &fds);
			ref = handler->server->ifaces;
			track_ifaces(ref, &fds, &maxfd, &minfd, udp.thread_id);
			nfds = track_ifaces(ref, udp.thread_id, &fds);
			rcu_read_unlock();
			if (nfds == 0) {
				break;
			}
		}
		/* Cancellation point. */
@@ -533,29 +556,31 @@ int udp_master(dthread_t *thread)
		}
		/* Wait for events. */
		fd_set rfds;
		FD_COPY(&fds, &rfds);
		int nfds = select(maxfd + 1, &rfds, NULL, NULL, NULL);
		if (nfds <= 0) {
		int events = poll(fds, nfds, -1);
		if (events <= 0) {
			if (errno == EINTR) continue;
			break;
		}
		/* Bound sockets will be usually closely coupled. */
		for (unsigned fd = minfd; fd <= maxfd; ++fd) {
			if (FD_ISSET(fd, &rfds)) {
				if ((rcvd = _udp_recv(fd, rq)) > 0) {
					_udp_handle(&udp, rq);
					/* Flush allocated memory. */
					mp_flush(mm.ctx);
					_udp_send(rq);
					udp_pps_sample(rcvd, thr_id);
				}
		/* Process the events. */
		for (nfds_t i = 0; i < nfds && events > 0; i++) {
			if (fds[i].revents == 0) {
				continue;
			}
			events -= 1;
			int rcvd = 0;
			if ((rcvd = _udp_recv(fds[i].fd, rq)) > 0) {
				_udp_handle(&udp, rq);
				/* Flush allocated memory. */
				mp_flush(mm.ctx);
				_udp_send(rq);
				udp_pps_sample(rcvd, thr_id);
			}
		}
	}
	_udp_deinit(rq);
	forget_ifaces(ref, &fds, maxfd);
	forget_ifaces(ref, &fds);
	mp_delete(mm.ctx);
	return KNOT_EOK;
}
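
The other half of the change is the lifecycle of that descriptor array: on an interface reload the old pollfd set is freed (forget_ifaces()) and a fresh one is built from the current interface list (track_ifaces()), and a zero return makes the worker leave its loop. A rough, self-contained approximation of that pair follows; the plain int array and the helper names rebuild_fds()/release_fds() are hypothetical stand-ins for Knot's ifacelist_t walk.

#include <poll.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical stand-ins for the track_ifaces()/forget_ifaces() pair:
 * (re)build the watched pollfd array from a plain list of descriptors
 * and free it again once the socket set changes. */
static nfds_t rebuild_fds(const int *socks, size_t count, struct pollfd **fds_ptr)
{
	struct pollfd *fds = malloc(count * sizeof(*fds));
	if (fds == NULL) {
		*fds_ptr = NULL;
		return 0;                /* caller treats zero as "nothing to watch" */
	}
	for (size_t i = 0; i < count; i++) {
		fds[i].fd = socks[i];
		fds[i].events = POLLIN;  /* a UDP responder only cares about readability */
		fds[i].revents = 0;
	}
	*fds_ptr = fds;
	return (nfds_t)count;
}

static void release_fds(struct pollfd **fds_ptr)
{
	free(*fds_ptr);
	*fds_ptr = NULL;                 /* leave no dangling pointer, as forget_ifaces() does */
}

Compared with the removed select() version, the watched set lives in an array that is rebuilt only when the interface list actually changes, instead of a fd_set that is bounded by FD_SETSIZE and copied with FD_COPY on every loop iteration.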