/*  Copyright (C) 2014 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

#include <string.h>
#include <libknot/errcode.h>

#include <contrib/ucw/lib.h>
#include <contrib/ucw/mempool.h>
#include <assert.h>

#include "daemon/io.h"
#include "daemon/network.h"
#include "daemon/worker.h"

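/* Grow a socket buffer to at least bufsize_want, using the given uv buffer-size getter/setter. */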
#define negotiate_bufsize(func, handle, bufsize_want) do { \
	int bufsize = 0; \
	func(handle, &bufsize); \
	if (bufsize < bufsize_want) { \
		bufsize = bufsize_want; \
		func(handle, &bufsize); \
	} \
} while (0)

static void check_bufsize(uv_handle_t* handle)
{
	/* We want to buffer at least N waves in advance.
	 * This is magic, presuming we can pull in a whole recvmmsg batch in one wave.
	 * Linux will double the requested buffer size.
	 */
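	/* Room for RECVMMSG_BATCH datagrams of up to 64 KiB each, times two waves. */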
	const int bufsize_want = RECVMMSG_BATCH * 65535 * 2;
	negotiate_bufsize(uv_recv_buffer_size, handle, bufsize_want);
	negotiate_bufsize(uv_send_buffer_size, handle, bufsize_want);
}

#undef negotiate_bufsize

static void session_clear(struct session *s)
{
	assert(s->outgoing || s->tasks.len == 0);
	array_clear(s->tasks);
	memset(s, 0, sizeof(*s));
}

void session_free(struct session *s)
{
	if (s) {
		session_clear(s);
		free(s);
	}
}

struct session *session_new(void)
{
	return calloc(1, sizeof(struct session));
}

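/* Take a cleared session from the worker's freelist, or allocate a new one. */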
static struct session *session_borrow(struct worker_ctx *worker)
{
	struct session *s = NULL;
	if (worker->pool_sessions.len > 0) {
		s = array_tail(worker->pool_sessions);
		array_pop(worker->pool_sessions);
		kr_asan_unpoison(s, sizeof(*s));
	} else {
		s = session_new();
	}
	return s;
}

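/* Return a session to the worker's freelist (poisoned under ASan), or free it if the freelist is full. */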
static void session_release(struct worker_ctx *worker, struct session *s)
{
	if (!s) {
		return;
	}
	if (worker->pool_sessions.len < MP_FREELIST_SIZE) {
		session_clear(s);
		array_push(worker->pool_sessions, s);
		kr_asan_poison(s, sizeof(*s));
	} else {
		session_free(s);
	}
}

static uv_stream_t *handle_alloc(uv_loop_t *loop)
{
	uv_stream_t *handle = calloc(1, sizeof(*handle));
	if (!handle) {
		return NULL;
	}

	return handle;
}

static void handle_getbuf(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
{
	/* The worker has a single buffer which is reused for all incoming
	 * datagrams / stream reads; its content is guaranteed to be unchanged
	 * only for the duration of udp_recv() and tcp_recv().
	 */
	struct session *session = handle->data;
	uv_loop_t *loop = handle->loop;
	struct worker_ctx *worker = loop->data;
	buf->base = (char *)worker->wire_buf;
	/* Limit TCP stream buffer size to 4K for granularity in batches of incoming queries. */
	if (handle->type == UV_TCP) {
		buf->len = MIN(suggested_size, 4096);
	/* Regular buffer size for subrequests. */
	} else if (session->outgoing) {
		buf->len = suggested_size;
	/* Use recvmmsg() on master sockets if possible. */
	} else {
		buf->len = sizeof(worker->wire_buf);
	}
}

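/* uv_udp_recv_cb: hand each received datagram to the worker and flush the packet pool. */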
void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
	const struct sockaddr *addr, unsigned flags)
{
	uv_loop_t *loop = handle->loop;
	struct worker_ctx *worker = loop->data;
	if (nread <= 0) {
		if (nread < 0) { /* Error response, notify resolver */
			worker_submit(worker, (uv_handle_t *)handle, NULL, addr);
		} /* nread == 0 only means the buffer can be released; ours is static, so nothing to do. */
		return;
	}

	knot_pkt_t *query = knot_pkt_new(buf->base, nread, &worker->pkt_pool);
	if (query) {
		query->max_size = KNOT_WIRE_MAX_PKTSIZE;
		worker_submit(worker, (uv_handle_t *)handle, query, addr);
	}
	mp_flush(worker->pkt_pool.ctx);
}

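/* Shared tail of udp_bind() and udp_bindfd(): tune buffers, attach a session, start reading. */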
static int udp_bind_finalize(uv_handle_t *handle)
{
	check_bufsize((uv_handle_t *)handle);
	/* Handle is already created, just create context. */
	handle->data = session_new();
	assert(handle->data);
	return io_start_read((uv_handle_t *)handle);
}

int udp_bind(uv_udp_t *handle, struct sockaddr *addr)
{
	unsigned flags = UV_UDP_REUSEADDR;
	if (addr->sa_family == AF_INET6) {
		flags |= UV_UDP_IPV6ONLY;
	}
	int ret = uv_udp_bind(handle, addr, flags);
	if (ret != 0) {
		return ret;
	}
	return udp_bind_finalize((uv_handle_t *)handle);
}

int udp_bindfd(uv_udp_t *handle, int fd)
{
	if (!handle) {
		return kr_error(EINVAL);
	}

	int ret = uv_udp_open(handle, (uv_os_sock_t) fd);
	if (ret != 0) {
		return ret;
	}
	return udp_bind_finalize((uv_handle_t *)handle);
}

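/* Timer close callback: close the connection handle owned by the expired timer. */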
static void tcp_timeout(uv_handle_t *timer)
{
	uv_handle_t *handle = timer->data;
	uv_close(handle, io_free);
}

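/* Idle timer tick: keep the connection alive while tasks are pending, otherwise close it. */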
static void tcp_timeout_trigger(uv_timer_t *timer)
{
	uv_handle_t *handle = timer->data;
	struct session *session = handle->data;
	if (session->tasks.len > 0) {
		uv_timer_again(timer);
	} else {
		uv_close((uv_handle_t *)timer, tcp_timeout);
	}
}

static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
{
	uv_loop_t *loop = handle->loop;
	struct session *s = handle->data;
	struct worker_ctx *worker = loop->data;
	/* TCP pipelining is rather complicated and requires cooperation from the worker,
	 * so the whole message reassembly and demuxing logic lives inside the worker. */
	int ret = worker_process_tcp(worker, (uv_handle_t *)handle, (const uint8_t *)buf->base, nread);
	if (ret < 0) {
		worker_end_tcp(worker, (uv_handle_t *)handle);
		/* Exceeded the per-connection quota for outstanding requests;
		 * stop reading from the stream and close after the last message is processed. */
		if (!s->outgoing && !uv_is_closing((uv_handle_t *)&s->timeout)) {
			uv_timer_stop(&s->timeout);
			if (s->tasks.len == 0) {
				uv_close((uv_handle_t *)&s->timeout, tcp_timeout);
			} else { /* If there are tasks running, defer until they finish. */
				uv_timer_start(&s->timeout, tcp_timeout_trigger, 1, KR_CONN_RTT_MAX/2);
			}
		}
	/* Connection spawned more than one request, reset its deadline for the next query. */
	} else if (ret > 0 && !s->outgoing) {
		uv_timer_again(&s->timeout);
	}
	mp_flush(worker->pkt_pool.ctx);
}

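/* uv_connection_cb: accept a client connection, attach a session and idle timer, start reading. */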
static void tcp_accept(uv_stream_t *master, int status)
{
	if (status != 0) {
		return;
	}
	uv_stream_t *client = handle_alloc(master->loop);
	if (!client) {
		return;
	}
	memset(client, 0, sizeof(*client));
	io_create(master->loop, (uv_handle_t *)client, SOCK_STREAM);
	if (uv_accept(master, client) != 0) {
		uv_close((uv_handle_t *)client, io_free);
		return;
	}

	/* Set deadlines for the TCP connection and start reading.
	 * Every half of the request time limit it re-checks whether the connection
	 * is idle and should be terminated; this is an educated guess. */
	struct session *session = client->data;
	uv_timer_t *timer = &session->timeout;
	uv_timer_init(master->loop, timer);
	timer->data = client;
	uv_timer_start(timer, tcp_timeout_trigger, KR_CONN_RTT_MAX/2, KR_CONN_RTT_MAX/2);
	io_start_read((uv_handle_t *)client);
}

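/* Set a TCP-level socket option directly on the handle's file descriptor; no-op if unavailable. */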
static int set_tcp_option(uv_handle_t *handle, int option, int val)
{
	uv_os_fd_t fd = 0;
	if (uv_fileno(handle, &fd) == 0) {
		return setsockopt(fd, IPPROTO_TCP, option, &val, sizeof(val));
	}
	return 0; /* N/A */
}

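/* Common post-bind setup for TCP listening handles: enable TCP Fast Open where available. */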
static int tcp_bind_finalize(uv_handle_t *handle)
{
	/* TCP_FASTOPEN saves one RTT on repeat connections by allowing data in the SYN. */
#ifdef TCP_FASTOPEN
# ifdef __linux__
	(void) set_tcp_option(handle, TCP_FASTOPEN, 16); /* Linux: queue length hint for pending TFO requests */
# else
	(void) set_tcp_option(handle, TCP_FASTOPEN, 1);  /* Other platforms: on/off flag */
# endif
#endif

	handle->data = NULL;
	return 0;
}

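/* Bind a TCP handle, enable deferred accept where available, and start listening. */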
static int _tcp_bind(uv_tcp_t *handle, struct sockaddr *addr, uv_connection_cb connection)
{
	unsigned flags = 0;
	if (addr->sa_family == AF_INET6) {
		flags |= UV_TCP_IPV6ONLY;
	}

	int ret = uv_tcp_bind(handle, addr, flags);
	if (ret != 0) {
		return ret;
	}

	/* TCP_DEFER_ACCEPT delays accepting connections until there is readable data. */
#ifdef TCP_DEFER_ACCEPT
	if (set_tcp_option((uv_handle_t *)handle, TCP_DEFER_ACCEPT, KR_CONN_RTT_MAX/1000) != 0) {
		kr_log_info("[ io ] tcp_bind (defer_accept): %s\n", strerror(errno));
	}
#endif

	ret = uv_listen((uv_stream_t *)handle, 16, connection);
	if (ret != 0) {
		return ret;
	}

	return tcp_bind_finalize((uv_handle_t *)handle);
}

int tcp_bind(uv_tcp_t *handle, struct sockaddr *addr)
{
	return _tcp_bind(handle, addr, tcp_accept);
}

int tcp_bindfd(uv_tcp_t *handle, int fd)
{
	if (!handle) {
		return kr_error(EINVAL);
	}

	int ret = uv_tcp_open(handle, (uv_os_sock_t) fd);
	if (ret != 0) {
		return ret;
	}

	ret = uv_listen((uv_stream_t *)handle, 16, tcp_accept);
	if (ret != 0) {
		return ret;
	}
	return tcp_bind_finalize((uv_handle_t *)handle);
}

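/* Initialize a libuv handle of the given socket type and attach a session from the worker pool. */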
void io_create(uv_loop_t *loop, uv_handle_t *handle, int type)
{
	if (type == SOCK_DGRAM) {
		uv_udp_init(loop, (uv_udp_t *)handle);
	} else {
		uv_tcp_init(loop, (uv_tcp_t *)handle);
		uv_tcp_nodelay((uv_tcp_t *)handle, 1);
	}

	struct worker_ctx *worker = loop->data;
	handle->data = session_borrow(worker);
	assert(handle->data);
}

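/* Detach the handle's session and return it to the worker pool when possible. */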
void io_deinit(uv_handle_t *handle)
{
	if (!handle) {
		return;
	}
	uv_loop_t *loop = handle->loop;
	if (loop && loop->data) {
		struct worker_ctx *worker = loop->data;
		session_release(worker, handle->data);
	} else {
		session_free(handle->data);
	}
	handle->data = NULL;
}

void io_free(uv_handle_t *handle)
{
	if (!handle) {
		return;
	}
	io_deinit(handle);
	free(handle);
}

int io_start_read(uv_handle_t *handle)
{
	if (handle->type == UV_UDP) {
		return uv_udp_recv_start((uv_udp_t *)handle, &handle_getbuf, &udp_recv);
	} else {
		return uv_read_start((uv_stream_t *)handle, &handle_getbuf, &tcp_recv);
	}
}

int io_stop_read(uv_handle_t *handle)
{
	if (handle->type == UV_UDP) {
		return uv_udp_recv_stop((uv_udp_t *)handle);
	} else {
		return uv_read_stop((uv_stream_t *)handle);
	}
}