/*  Copyright (C) 2014 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include <string.h>
#include <libknot/errcode.h>

#include <contrib/ucw/lib.h>
#include <contrib/ucw/mempool.h>

#include <assert.h>

#include "daemon/io.h"
#include "daemon/network.h"
#include "daemon/worker.h"

#define negotiate_bufsize(func, handle, bufsize_want) do { \
	int bufsize = 0; func(handle, &bufsize); \
	if (bufsize < bufsize_want) { \
		bufsize = bufsize_want; \
		func(handle, &bufsize); \
	} \
} while (0)

static void check_bufsize(uv_handle_t* handle)
{
	/* We want to buffer at least N waves in advance.
	 * This is magic presuming we can pull in a whole recvmmsg width in one wave.
	 * Linux will double the requested buffer size.
	 */
	const int bufsize_want = RECVMMSG_BATCH * 65535 * 2;
	negotiate_bufsize(uv_recv_buffer_size, handle, bufsize_want);
	negotiate_bufsize(uv_send_buffer_size, handle, bufsize_want);
}

#undef negotiate_bufsize
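
/* Illustrative expansion (a sketch, not part of the build): for the receive
 * direction the macro above behaves like the following, relying on the libuv
 * convention that uv_recv_buffer_size() reads the current size when *value
 * is 0 and sets it otherwise:
 *
 *	int bufsize = 0;
 *	uv_recv_buffer_size(handle, &bufsize);     // query the current size
 *	if (bufsize < bufsize_want) {
 *		bufsize = bufsize_want;
 *		uv_recv_buffer_size(handle, &bufsize); // request the larger size
 *	}
 */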

static void session_clear(struct session *s)
{
	assert(s->outgoing || s->tasks.len == 0);
	array_clear(s->tasks);
	memset(s, 0, sizeof(*s));
}

void session_free(struct session *s)
{
	session_clear(s);
	free(s);
}

struct session *session_new(void)
{
	return calloc(1, sizeof(struct session));
}

static struct session *session_borrow(struct worker_ctx *worker)
{
	struct session *s = NULL;
	if (worker->pool_sessions.len > 0) {
		s = array_tail(worker->pool_sessions);
		array_pop(worker->pool_sessions);
		kr_asan_unpoison(s, sizeof(*s));
	} else {
		s = session_new();
	}
	return s;
}

static void session_release(struct worker_ctx *worker, struct session *s)
{
	if (worker->pool_sessions.len < MP_FREELIST_SIZE) {
		session_clear(s);
		array_push(worker->pool_sessions, s);
		kr_asan_poison(s, sizeof(*s));
	} else {
		session_free(s);
	}
}
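
/* A sketch of the intended lifecycle (hypothetical caller, not in this file):
 *
 *	struct session *s = session_borrow(worker);  // reuses a pooled session if available
 *	... attach s to a handle, serve the connection ...
 *	session_release(worker, s);                  // returns s to the pool or frees it
 *
 * While a session sits on the freelist it is poisoned for AddressSanitizer,
 * so a use-after-release is reported instead of silently corrupting the pool.
 */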

static uv_stream_t *handle_alloc(uv_loop_t *loop)
{
	uv_stream_t *handle = calloc(1, sizeof(*handle));
	if (!handle) {
		return NULL;
	}

	return handle;
}

static void handle_getbuf(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
{
	/* The worker has a single buffer that is reused for all incoming
	 * datagrams / stream reads; its content is guaranteed to stay
	 * unchanged only for the duration of udp_read() and tcp_read().
	 */
	struct session *session = handle->data;
	uv_loop_t *loop = handle->loop;
	struct worker_ctx *worker = loop->data;
	buf->base = (char *)worker->wire_buf;
	/* Limit the TCP stream buffer to 4K for better granularity when batching incoming queries. */
	if (handle->type == UV_TCP) {
		buf->len = MIN(suggested_size, 4096);
	/* Regular buffer size for subrequests. */
	} else if (session->outgoing) {
		buf->len = suggested_size;
	/* Use recvmmsg() on master sockets if possible. */
	} else {
		buf->len = sizeof(worker->wire_buf);
	}
}

void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
	const struct sockaddr *addr, unsigned flags)
{
	uv_loop_t *loop = handle->loop;
	struct worker_ctx *worker = loop->data;
	if (nread <= 0) {
		if (nread < 0) { /* Error, notify the resolver. */
			worker_submit(worker, (uv_handle_t *)handle, NULL, addr);
		} /* nread == 0 only means the buffer may be released; nothing to do. */
		return;
	}

	knot_pkt_t *query = knot_pkt_new(buf->base, nread, &worker->pkt_pool);
	if (query) {
		query->max_size = KNOT_WIRE_MAX_PKTSIZE;
		worker_submit(worker, (uv_handle_t *)handle, query, addr);
	}
	mp_flush(worker->pkt_pool.ctx);
}

int udp_bind(uv_udp_t *handle, struct sockaddr *addr)
{
	unsigned flags = UV_UDP_REUSEADDR;
	if (addr->sa_family == AF_INET6) {
		flags |= UV_UDP_IPV6ONLY;
	}
	int ret = uv_udp_bind(handle, addr, flags);
	if (ret != 0) {
		return ret;
	}
	check_bufsize((uv_handle_t *)handle);
	/* The handle is already created, just create its context. */
	handle->data = session_new();
	assert(handle->data);
	return io_start_read((uv_handle_t *)handle);
}
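
/* Hypothetical usage sketch (illustrative, not part of this file): bind a
 * UDP master socket on the loopback and start serving. uv_udp_init() and
 * uv_ip4_addr() are standard libuv calls; the address and port are examples.
 *
 *	uv_udp_t udp;
 *	uv_udp_init(loop, &udp);
 *	struct sockaddr_in addr;
 *	uv_ip4_addr("127.0.0.1", 53, &addr);
 *	udp_bind(&udp, (struct sockaddr *)&addr);
 */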

static void tcp_timeout(uv_handle_t *timer)
{
	uv_handle_t *handle = timer->data;
	uv_close(handle, io_free);
}

static void tcp_timeout_trigger(uv_timer_t *timer)
{
	uv_handle_t *handle = timer->data;
	struct session *session = handle->data;
	if (session->tasks.len > 0) {
		uv_timer_again(timer);
	} else {
		uv_close((uv_handle_t *)timer, tcp_timeout);
	}
}
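
/* Note on the two-step teardown above: a libuv handle may only be freed from
 * its close callback, so tcp_timeout_trigger() closes the idle timer first,
 * the timer's close callback (tcp_timeout) then closes the TCP handle, and
 * the TCP handle's close callback (io_free) releases the session and memory. */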

static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
{
	uv_loop_t *loop = handle->loop;
	struct session *s = handle->data;
	struct worker_ctx *worker = loop->data;
	/* TCP pipelining is rather complicated and requires cooperation from the
	 * worker, so the whole message reassembly and demuxing logic lives there. */
	int ret = worker_process_tcp(worker, (uv_handle_t *)handle, (const uint8_t *)buf->base, nread);
	if (ret < 0) {
		worker_end_tcp(worker, (uv_handle_t *)handle);
		/* The connection exceeded its per-connection quota for outstanding
		 * requests: stop reading from the stream and close it after the
		 * last message is processed. */
		if (!s->outgoing && !uv_is_closing((uv_handle_t *)&s->timeout)) {
			uv_timer_stop(&s->timeout);
			if (s->tasks.len == 0) {
				uv_close((uv_handle_t *)&s->timeout, tcp_timeout);
			} else { /* If there are tasks running, defer until they finish. */
				uv_timer_start(&s->timeout, tcp_timeout_trigger, 1, KR_CONN_RTT_MAX/2);
			}
		}
	/* The connection carried more than one request; reset its deadline for the next query. */
	} else if (ret > 0 && !s->outgoing) {
		uv_timer_again(&s->timeout);
	}
	mp_flush(worker->pkt_pool.ctx);
}

static void tcp_accept(uv_stream_t *master, int status)
{
	if (status != 0) {
		return;
	}
	uv_stream_t *client = handle_alloc(master->loop);
	if (!client) {
		return;
	}
	memset(client, 0, sizeof(*client));
	io_create(master->loop, (uv_handle_t *)client, SOCK_STREAM);
	if (uv_accept(master, client) != 0) {
		io_free((uv_handle_t *)client);
		return;
	}

	/* Set deadlines for the TCP connection and start reading.
	 * Every half of the request time limit we re-check whether the
	 * connection is idle and should be terminated; this is an educated guess. */
	struct session *session = client->data;
	uv_timer_t *timer = &session->timeout;
	uv_timer_init(master->loop, timer);
	timer->data = client;
	uv_timer_start(timer, tcp_timeout_trigger, KR_CONN_RTT_MAX/2, KR_CONN_RTT_MAX/2);
	io_start_read((uv_handle_t *)client);
}

static int set_tcp_option(uv_tcp_t *handle, int option, int val)
{
	uv_os_fd_t fd = 0;
	if (uv_fileno((uv_handle_t *)handle, &fd) == 0) {
		return setsockopt(fd, IPPROTO_TCP, option, &val, sizeof(val));
	}
	return 0; /* N/A */
}

static int _tcp_bind(uv_tcp_t *handle, struct sockaddr *addr, uv_connection_cb connection)
{
	unsigned flags = 0;
	if (addr->sa_family == AF_INET6) {
		flags |= UV_TCP_IPV6ONLY;
	}

	int ret = uv_tcp_bind(handle, addr, flags);
	if (ret != 0) {
		return ret;
	}

	/* TCP_DEFER_ACCEPT delays accepting connections until there is readable data. */
#ifdef TCP_DEFER_ACCEPT
	if (set_tcp_option(handle, TCP_DEFER_ACCEPT, KR_CONN_RTT_MAX/1000) != 0) {
		kr_log_info("[ io ] tcp_bind (defer_accept): %s\n", strerror(errno));
	}
#endif

	ret = uv_listen((uv_stream_t *)handle, 16, connection);
	if (ret != 0) {
		return ret;
	}

	/* TCP_FASTOPEN saves one RTT on repeat connections by allowing data
	 * to be carried in the SYN packet. */
#ifdef TCP_FASTOPEN
# ifdef __linux__
	(void) set_tcp_option(handle, TCP_FASTOPEN, 16); /* Queue length hint for pending TFO requests. */
# else
	(void) set_tcp_option(handle, TCP_FASTOPEN, 1);  /* Boolean on/off. */
# endif
#endif

	handle->data = NULL;
	return 0;
}
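
/* Background note (informational): on Linux the TCP_FASTOPEN socket option
 * takes the maximum length of the queue of not-yet-accepted Fast Open
 * connections, while BSD-derived systems interpret the value as a boolean,
 * which is why _tcp_bind() above passes 16 and 1 respectively. */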

int tcp_bind(uv_tcp_t *handle, struct sockaddr *addr)
{
	return _tcp_bind(handle, addr, tcp_accept);
}

void io_create(uv_loop_t *loop, uv_handle_t *handle, int type)
{
	if (type == SOCK_DGRAM) {
		uv_udp_init(loop, (uv_udp_t *)handle);
	} else {
		uv_tcp_init(loop, (uv_tcp_t *)handle);
		uv_tcp_nodelay((uv_tcp_t *)handle, 1);
	}

	struct worker_ctx *worker = loop->data;
	handle->data = session_borrow(worker);
	assert(handle->data);
}

void io_deinit(uv_handle_t *handle)
{
	if (!handle) {
		return;
	}
	uv_loop_t *loop = handle->loop;
	if (loop && loop->data) {
		struct worker_ctx *worker = loop->data;
		session_release(worker, handle->data);
	} else {
		session_free(handle->data);
	}
	handle->data = NULL;
}
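
/* Note: io_free() below doubles as a libuv close callback (uv_close_cb), as
 * used by tcp_timeout() and tcp_accept(), so it both recycles the session via
 * io_deinit() and frees the handle memory allocated in handle_alloc(). */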

void io_free(uv_handle_t *handle)
{
	if (!handle) {
		return;
	}
	io_deinit(handle);
	free(handle);
}

int io_start_read(uv_handle_t *handle)
{
	if (handle->type == UV_UDP) {
		return uv_udp_recv_start((uv_udp_t *)handle, &handle_getbuf, &udp_recv);
	} else {
		return uv_read_start((uv_stream_t *)handle, &handle_getbuf, &tcp_recv);
	}
}

int io_stop_read(uv_handle_t *handle)
{
	if (handle->type == UV_UDP) {
		return uv_udp_recv_stop((uv_udp_t *)handle);
	} else {
		return uv_read_stop((uv_stream_t *)handle);
	}
}