/*  Copyright (C) 2014-2017 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

#include <assert.h>
#include <errno.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/tcp.h>
#include <libknot/errcode.h>
#include <contrib/ucw/lib.h>
#include <contrib/ucw/mempool.h>

#include "daemon/io.h"
#include "daemon/network.h"
#include "daemon/worker.h"
#include "daemon/tls.h"

#define negotiate_bufsize(func, handle, bufsize_want) do { \
	int bufsize = 0; \
	func(handle, &bufsize); \
	if (bufsize < bufsize_want) { \
		bufsize = bufsize_want; \
		func(handle, &bufsize); \
	} \
} while (0)

static void check_bufsize(uv_handle_t* handle)
{
	/* We want to buffer at least N waves in advance.
	 * This is magic, presuming we can pull in a whole recvmmsg width in one wave.
	 * Note that Linux will double the requested buffer size.
	 */
	const int bufsize_want = RECVMMSG_BATCH * 65535 * 2;
	negotiate_bufsize(uv_recv_buffer_size, handle, bufsize_want);
	negotiate_bufsize(uv_send_buffer_size, handle, bufsize_want);
}

#undef negotiate_bufsize
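
/* For illustration only: with func = uv_recv_buffer_size, one round of the
 * negotiation above expands to roughly the following (libuv writes the
 * current size into *value when it is 0, and applies it as the new size
 * otherwise):
 *
 *	int bufsize = 0;
 *	uv_recv_buffer_size(handle, &bufsize);     // query current size
 *	if (bufsize < bufsize_want) {
 *		bufsize = bufsize_want;
 *		uv_recv_buffer_size(handle, &bufsize); // request the larger size
 *	}
 */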

static void session_clear(struct session *s)
{
	assert(s->outgoing || s->tasks.len == 0);
	array_clear(s->tasks);
	tls_free(s->tls_ctx);
	memset(s, 0, sizeof(*s));
}

void session_free(struct session *s)
{
	if (s) {
		session_clear(s);
		free(s);
	}
}

struct session *session_new(void)
{
	return calloc(1, sizeof(struct session));
}

/* Take a session from the worker's freelist if one is available,
 * otherwise allocate a fresh one. Freelisted sessions are kept
 * poisoned for AddressSanitizer while unused. */
static struct session *session_borrow(struct worker_ctx *worker)
{
	struct session *s = NULL;
	if (worker->pool_sessions.len > 0) {
		s = array_tail(worker->pool_sessions);
		array_pop(worker->pool_sessions);
		kr_asan_unpoison(s, sizeof(*s));
	} else {
		s = session_new();
	}
	return s;
}

/* Return the handle's session to the worker's freelist (re-poisoning it
 * for AddressSanitizer), or free it if the freelist is already full. */
static void session_release(struct worker_ctx *worker, uv_handle_t *handle)
{
	if (!worker || !handle) {
		return;
	}
	struct session *s = handle->data;
	if (!s) {
		return;
	}
	if (!s->outgoing && handle->type == UV_TCP) {
		worker_end_tcp(worker, handle); /* to free the buffering task */
	}
	if (worker->pool_sessions.len < MP_FREELIST_SIZE) {
		session_clear(s);
		array_push(worker->pool_sessions, s);
		kr_asan_poison(s, sizeof(*s));
	} else {
		session_free(s);
	}
}

static uv_stream_t *handle_alloc(uv_loop_t *loop)
{
	uv_stream_t *handle = calloc(1, sizeof(*handle));
	if (!handle) {
		return NULL;
	}

	return handle;
}

static void handle_getbuf(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
{
	/* The worker has a single buffer which is reused for all incoming
	 * datagrams / stream reads; the content of the buffer is
	 * guaranteed to be unchanged only for the duration of
	 * udp_recv() and tcp_recv().
	 */
	struct session *session = handle->data;
	uv_loop_t *loop = handle->loop;
	struct worker_ctx *worker = loop->data;
	buf->base = (char *)worker->wire_buf;
	/* Limit TCP stream buffer size to 4K for granularity in batches of incoming queries. */
	if (handle->type == UV_TCP) {
		buf->len = MIN(suggested_size, 4096);
	/* Regular buffer size for subrequests. */
	} else if (session->outgoing) {
		buf->len = suggested_size;
	/* Use recvmmsg() on master sockets if possible. */
	} else {
		buf->len = sizeof(worker->wire_buf);
	}
}
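
/* A consequence of the shared wire buffer above: the knot_pkt_t created in
 * udp_recv() below only wraps worker->wire_buf without copying, so anything
 * that must outlive a single read callback has to be copied out by the worker. */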

void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
	const struct sockaddr *addr, unsigned flags)
{
	uv_loop_t *loop = handle->loop;
	struct worker_ctx *worker = loop->data;
	if (nread <= 0) {
		if (nread < 0) { /* Error response, notify resolver */
			worker_submit(worker, (uv_handle_t *)handle, NULL, addr);
		} /* nread == 0 only means the buffer can be released; nothing to do */
		return;
	}

	knot_pkt_t *query = knot_pkt_new(buf->base, nread, &worker->pkt_pool);
	if (query) {
		query->max_size = KNOT_WIRE_MAX_PKTSIZE;
		worker_submit(worker, (uv_handle_t *)handle, query, addr);
	}
	mp_flush(worker->pkt_pool.ctx);
}
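
/* Note: mp_flush() above recycles the whole packet memory pool, so the
 * knot_pkt_t must not be referenced after udp_recv() returns; worker_submit()
 * is expected to have consumed the query by then. */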

/* Common tail of udp_bind() and udp_bindfd(): tune socket buffers,
 * attach a fresh session and start reading. */
static int udp_bind_finalize(uv_handle_t *handle)
{
	check_bufsize((uv_handle_t *)handle);
	/* Handle is already created, just create context. */
	handle->data = session_new();
	assert(handle->data);
	return io_start_read((uv_handle_t *)handle);
}

int udp_bind(uv_udp_t *handle, struct sockaddr *addr)
{
	unsigned flags = UV_UDP_REUSEADDR;
	if (addr->sa_family == AF_INET6) {
		flags |= UV_UDP_IPV6ONLY;
	}
	int ret = uv_udp_bind(handle, addr, flags);
	if (ret != 0) {
		return ret;
	}
	return udp_bind_finalize((uv_handle_t *)handle);
}

int udp_bindfd(uv_udp_t *handle, int fd)
{
	if (!handle) {
		return kr_error(EINVAL);
	}

	int ret = uv_udp_open(handle, (uv_os_sock_t) fd);
	if (ret != 0) {
		return ret;
	}
	return udp_bind_finalize((uv_handle_t *)handle);
}
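
/* A minimal caller sketch (hypothetical; the daemon's real listener setup
 * lives elsewhere), assuming an initialised `loop` and a prepared `addr`:
 *
 *	uv_udp_t *udp = malloc(sizeof(*udp));
 *	io_create(loop, (uv_handle_t *)udp, SOCK_DGRAM);
 *	if (udp_bind(udp, (struct sockaddr *)addr) != 0) {
 *		uv_close((uv_handle_t *)udp, io_free); // releases the session and the handle
 *	}
 */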

/* Second phase of teardown: the idle timer has been closed,
 * now close the connection handle it was guarding. */
static void tcp_timeout(uv_handle_t *timer)
{
	uv_handle_t *handle = timer->data;
	uv_close(handle, io_free);
}

/* Idle timer fired: re-arm it while tasks are still in flight,
 * otherwise begin closing the connection (timer first, handle second). */
static void tcp_timeout_trigger(uv_timer_t *timer)
{
	uv_handle_t *handle = timer->data;
	struct session *session = handle->data;
	if (session->tasks.len > 0) {
		uv_timer_again(timer);
	} else {
		uv_close((uv_handle_t *)timer, tcp_timeout);
	}
}
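
/* Timer lifecycle sketch: _tcp_accept() below arms the idle timer with both
 * the initial timeout and the repeat interval set to KR_CONN_RTT_MAX/2;
 * tcp_timeout_trigger() then either re-arms it (tasks still running) or starts
 * the two-phase close, while uv_timer_again() in tcp_recv() pushes the
 * deadline back whenever the connection produces another request. */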

static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
{
	uv_loop_t *loop = handle->loop;
	struct session *s = handle->data;
	struct worker_ctx *worker = loop->data;
	/* TCP pipelining is rather complicated and requires cooperation from the worker,
	 * so the whole message reassembly and demuxing logic lives inside the worker. */
	int ret = 0;
	if (s->has_tls) {
		ret = tls_process(worker, handle, (const uint8_t *)buf->base, nread);
	} else {
		ret = worker_process_tcp(worker, handle, (const uint8_t *)buf->base, nread);
	}
	if (ret < 0) {
		worker_end_tcp(worker, (uv_handle_t *)handle);
		/* Exceeded the per-connection quota for outstanding requests;
		 * stop reading from the stream and close it after the last message is processed. */
		if (!s->outgoing && !uv_is_closing((uv_handle_t *)&s->timeout)) {
			uv_timer_stop(&s->timeout);
			if (s->tasks.len == 0) {
				uv_close((uv_handle_t *)&s->timeout, tcp_timeout);
			} else { /* If there are tasks running, defer until they finish. */
				uv_timer_start(&s->timeout, tcp_timeout_trigger, 1, KR_CONN_RTT_MAX/2);
			}
		}
	/* The connection spawned at least one request; reset its deadline for the next query.
	 * https://tools.ietf.org/html/rfc7766#section-6.2.3 */
	} else if (ret > 0 && !s->outgoing) {
		uv_timer_again(&s->timeout);
	}
	mp_flush(worker->pkt_pool.ctx);
}

static void _tcp_accept(uv_stream_t *master, int status, bool tls)
{
	if (status != 0) {
		return;
	}
	uv_stream_t *client = handle_alloc(master->loop);
	if (!client) {
		return;
	}
	memset(client, 0, sizeof(*client));
	io_create(master->loop, (uv_handle_t *)client, SOCK_STREAM);
	if (uv_accept(master, client) != 0) {
		uv_close((uv_handle_t *)client, io_free);
		return;
	}

	/* Set deadlines for the TCP connection and start reading.
	 * Every half of the request time limit we re-check whether the connection
	 * is idle and should be terminated; the interval is an educated guess. */
	struct session *session = client->data;
	session->has_tls = tls;
	if (tls && !session->tls_ctx) {
		session->tls_ctx = tls_new(master->loop->data);
	}
	uv_timer_t *timer = &session->timeout;
	uv_timer_init(master->loop, timer);
	timer->data = client;
	uv_timer_start(timer, tcp_timeout_trigger, KR_CONN_RTT_MAX/2, KR_CONN_RTT_MAX/2);
	io_start_read((uv_handle_t *)client);
}

static void tcp_accept(uv_stream_t *master, int status)
{
	_tcp_accept(master, status, false);
}

static void tls_accept(uv_stream_t *master, int status)
{
	_tcp_accept(master, status, true);
}

/* Set a TCP-level socket option on the handle's underlying fd.
 * Returns 0 when the handle has no usable fd (option not applicable). */
static int set_tcp_option(uv_handle_t *handle, int option, int val)
{
	uv_os_fd_t fd = 0;
	if (uv_fileno(handle, &fd) == 0) {
		return setsockopt(fd, IPPROTO_TCP, option, &val, sizeof(val));
	}
	return 0; /* N/A */
}

static int tcp_bind_finalize(uv_handle_t *handle)
{
	/* TCP_FASTOPEN lets a resumed connection carry data in the SYN,
	 * saving one RTT on connection setup. */
#ifdef TCP_FASTOPEN
# ifdef __linux__
	(void) set_tcp_option(handle, TCP_FASTOPEN, 16); /* Value is a queue length hint */
# else
	(void) set_tcp_option(handle, TCP_FASTOPEN, 1);  /* Value is on/off */
# endif
#endif

	handle->data = NULL;
	return 0;
}

static int _tcp_bind(uv_tcp_t *handle, struct sockaddr *addr, uv_connection_cb connection)
{
	unsigned flags = 0;
	if (addr->sa_family == AF_INET6) {
		flags |= UV_TCP_IPV6ONLY;
	}

	int ret = uv_tcp_bind(handle, addr, flags);
	if (ret != 0) {
		return ret;
	}

	/* TCP_DEFER_ACCEPT delays accepting connections until there is readable data. */
#ifdef TCP_DEFER_ACCEPT
	if (set_tcp_option((uv_handle_t *)handle, TCP_DEFER_ACCEPT, KR_CONN_RTT_MAX/1000) != 0) {
		kr_log_info("[ io ] tcp_bind (defer_accept): %s\n", strerror(errno));
	}
#endif

	ret = uv_listen((uv_stream_t *)handle, 16, connection);
	if (ret != 0) {
		return ret;
	}

	return tcp_bind_finalize((uv_handle_t *)handle);
}

int tcp_bind(uv_tcp_t *handle, struct sockaddr *addr)
{
	return _tcp_bind(handle, addr, tcp_accept);
}

int tcp_bind_tls(uv_tcp_t *handle, struct sockaddr *addr)
{
	return _tcp_bind(handle, addr, tls_accept);
}

static int _tcp_bindfd(uv_tcp_t *handle, int fd, uv_connection_cb connection)
{
	if (!handle) {
		return kr_error(EINVAL);
	}

	int ret = uv_tcp_open(handle, (uv_os_sock_t) fd);
	if (ret != 0) {
		return ret;
	}

	ret = uv_listen((uv_stream_t *)handle, 16, connection);
	if (ret != 0) {
		return ret;
	}
	return tcp_bind_finalize((uv_handle_t *)handle);
}

int tcp_bindfd(uv_tcp_t *handle, int fd)
{
	return _tcp_bindfd(handle, fd, tcp_accept);
}

int tcp_bindfd_tls(uv_tcp_t *handle, int fd)
{
	return _tcp_bindfd(handle, fd, tls_accept);
}
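
/* A minimal caller sketch (hypothetical; real setup lives in the daemon's
 * network code), assuming `fd` is an inherited, bound socket (e.g. from
 * systemd socket activation):
 *
 *	uv_tcp_t *tcp = malloc(sizeof(*tcp));
 *	io_create(loop, (uv_handle_t *)tcp, SOCK_STREAM);
 *	if (tcp_bindfd_tls(tcp, fd) != 0) {
 *		uv_close((uv_handle_t *)tcp, io_free); // releases the session and the handle
 *	}
 */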

/* Initialise a libuv handle of the given socket type (SOCK_DGRAM or
 * SOCK_STREAM) and attach a session to it. */
void io_create(uv_loop_t *loop, uv_handle_t *handle, int type)
{
	if (type == SOCK_DGRAM) {
		uv_udp_init(loop, (uv_udp_t *)handle);
	} else {
		uv_tcp_init(loop, (uv_tcp_t *)handle);
		uv_tcp_nodelay((uv_tcp_t *)handle, 1);
	}

	struct worker_ctx *worker = loop->data;
	handle->data = session_borrow(worker);
	assert(handle->data);
}

void io_deinit(uv_handle_t *handle)
{
	if (!handle) {
		return;
	}
	uv_loop_t *loop = handle->loop;
	if (loop && loop->data) {
		struct worker_ctx *worker = loop->data;
		session_release(worker, handle);
	} else {
		session_free(handle->data);
	}
	handle->data = NULL;
}

void io_free(uv_handle_t *handle)
{
	if (!handle) {
		return;
	}
	io_deinit(handle);
	free(handle);
}

int io_start_read(uv_handle_t *handle)
{
	if (handle->type == UV_UDP) {
		return uv_udp_recv_start((uv_udp_t *)handle, &handle_getbuf, &udp_recv);
	} else {
		return uv_read_start((uv_stream_t *)handle, &handle_getbuf, &tcp_recv);
	}
}

int io_stop_read(uv_handle_t *handle)
{
	if (handle->type == UV_UDP) {
		return uv_udp_recv_stop((uv_udp_t *)handle);
	} else {
		return uv_read_stop((uv_stream_t *)handle);
	}
}