/*  Copyright (C) 2014-2017 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

#include <uv.h>
#include <lua.h>
#include <libknot/packet/pkt.h>
#include <libknot/descriptor.h>
#include <contrib/ucw/lib.h>
#include <contrib/ucw/mempool.h>
#include <contrib/wire.h>
#if defined(__GLIBC__) && defined(_GNU_SOURCE)
#include <malloc.h>
#endif
#include <assert.h>
#include <sys/types.h>
#include <unistd.h>
#include <gnutls/gnutls.h>
#include "lib/utils.h"
#include "lib/layer.h"
#include "daemon/worker.h"
#include "daemon/bindings.h"
#include "daemon/engine.h"
#include "daemon/io.h"
#include "daemon/tls.h"
#include "daemon/zimport.h"
#include "daemon/session.h"


/* Magic defaults for the worker. */
#ifndef MP_FREELIST_SIZE
# ifdef __clang_analyzer__
#  define MP_FREELIST_SIZE 0
# else
#  define MP_FREELIST_SIZE 64 /**< Maximum length of the worker mempool freelist */
# endif
#endif
#ifndef QUERY_RATE_THRESHOLD
#define QUERY_RATE_THRESHOLD (2 * MP_FREELIST_SIZE) /**< Nr of parallel queries considered as high rate */
#endif
#ifndef MAX_PIPELINED
#define MAX_PIPELINED 100
#endif

#define VERBOSE_MSG(qry, fmt...) QRVERBOSE(qry, "wrkr", fmt)

/** Client request state. */
struct request_ctx
{
	struct kr_request req;
	struct {
		union inaddr addr;
		union inaddr dst_addr;
		/* uv_handle_t *handle; */

		/** NULL if the request didn't come over network. */
		struct session *session;
	} source;
	struct worker_ctx *worker;
	qr_tasklist_t tasks;
};

/** Query resolution task. */
struct qr_task
{
	struct request_ctx *ctx;
	knot_pkt_t *pktbuf;
	qr_tasklist_t waiting;
	uv_handle_t *pending[MAX_PENDING];
	uint16_t pending_count;
	uint16_t addrlist_count;
	uint16_t addrlist_turn;
	uint16_t timeouts;
	uint16_t iter_count;
	uint16_t bytes_remaining;
	struct sockaddr *addrlist;
	uint32_t refs;
	bool finished : 1;
	bool leading  : 1;
};


/* Convenience macros */
#define qr_task_ref(task) \
	do { ++(task)->refs; } while(0)
#define qr_task_unref(task) \
	do { if (task && --(task)->refs == 0) { qr_task_free(task); } } while (0)
#define qr_valid_handle(task, checked) \
	(!uv_is_closing((checked)) || (task)->ctx->source.session->handle == (checked))

/** @internal Get the key for a TCP session.
 *  @note kr_straddr() returns a pointer to a static string.
 */
#define tcpsess_key(addr) kr_straddr(addr)

/* Forward decls */
static void qr_task_free(struct qr_task *task);
static int qr_task_step(struct qr_task *task,
			const struct sockaddr *packet_source,
			knot_pkt_t *packet);
static int qr_task_send(struct qr_task *task, uv_handle_t *handle,
			struct sockaddr *addr, knot_pkt_t *pkt);
static int qr_task_finalize(struct qr_task *task, int state);
static void qr_task_complete(struct qr_task *task);
static struct session* worker_find_tcp_connected(struct worker_ctx *worker,
						 const struct sockaddr *addr);
static int worker_add_tcp_waiting(struct worker_ctx *worker,
				  const struct sockaddr *addr,
				  struct session *session);
static int worker_del_tcp_waiting(struct worker_ctx *worker,
				  const struct sockaddr *addr);
static struct session* worker_find_tcp_waiting(struct worker_ctx *worker,
					       const struct sockaddr *addr);
static void on_tcp_connect_timeout(uv_timer_t *timer);
static void on_tcp_watchdog_timeout(uv_timer_t *timer);

/** @internal Get singleton worker. */
static inline struct worker_ctx *get_worker(void)
{
	return uv_default_loop()->data;
}

/*! @internal Create a UDP/TCP handle for an outgoing AF_INET* connection.
 *  socktype is SOCK_* */
static uv_handle_t *ioreq_spawn(struct qr_task *task, int socktype, sa_family_t family)
{
	bool precond = (socktype == SOCK_DGRAM || socktype == SOCK_STREAM)
			&& (family == AF_INET  || family == AF_INET6);
	if (!precond) {
		/* assert(false); see #245 */
		kr_log_verbose("[work] ioreq_spawn: pre-condition failed\n");
		return NULL;
	}

	if (task->pending_count >= MAX_PENDING) {
		return NULL;
	}
	/* Create connection for iterative query */
	struct worker_ctx *worker = task->ctx->worker;
	uv_handle_t *handle = malloc(socktype == SOCK_DGRAM
					? sizeof(uv_udp_t) : sizeof(uv_tcp_t));
	if (!handle) {
		return NULL;
	}
	int ret = io_create(worker->loop, handle, socktype, family);
	if (ret) {
		if (ret == UV_EMFILE) {
			worker->too_many_open = true;
			worker->rconcurrent_highwatermark = worker->stats.rconcurrent;
		}
		free(handle);
		return NULL;
	}

	/* Bind to outgoing address, according to IP v4/v6. */
	union inaddr *addr;
	if (family == AF_INET) {
		addr = (union inaddr *)&worker->out_addr4;
	} else {
		addr = (union inaddr *)&worker->out_addr6;
	}
	if (addr->ip.sa_family != AF_UNSPEC) {
		assert(addr->ip.sa_family == family);
		if (socktype == SOCK_DGRAM) {
			uv_udp_t *udp = (uv_udp_t *)handle;
			ret = uv_udp_bind(udp, &addr->ip, 0);
		} else if (socktype == SOCK_STREAM){
			uv_tcp_t *tcp = (uv_tcp_t *)handle;
			ret = uv_tcp_bind(tcp, &addr->ip, 0);
		}
	}

	/* Set current handle as a subrequest type. */
	struct session *session = handle->data;
	if (ret == 0) {
		session_flags(session)->outgoing = true;
		ret = session_tasklist_add(session, task);
	}
	if (ret < 0) {
		io_deinit(handle);
		free(handle);
		return NULL;
	}
	/* Connect or issue query datagram */
	task->pending[task->pending_count] = handle;
	task->pending_count += 1;
	return handle;
}

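/** Cancel all pending subrequest I/O owned by the task. */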
static void ioreq_kill_pending(struct qr_task *task)
{
	for (uint16_t i = 0; i < task->pending_count; ++i) {
		session_kill_ioreq(task->pending[i]->data, task);
	}
	task->pending_count = 0;
}

/** @cond This memory layout is internal to mempool.c, use only for debugging. */
#if defined(__SANITIZE_ADDRESS__)
struct mempool_chunk {
  struct mempool_chunk *next;
  size_t size;
};
static void mp_poison(struct mempool *mp, bool poison)
{
	if (!poison) { /* @note mempool is part of the first chunk, unpoison it first */
		kr_asan_unpoison(mp, sizeof(*mp));
	}
	struct mempool_chunk *chunk = mp->state.last[0];
	void *chunk_off = (void *)chunk - chunk->size;
	if (poison) {
		kr_asan_poison(chunk_off, chunk->size);
	} else {
		kr_asan_unpoison(chunk_off, chunk->size);
	}
}
#else
#define mp_poison(mp, enable)
#endif
/** @endcond */

/** Get a mempool.  (Recycle if possible.)  */
static inline struct mempool *pool_borrow(struct worker_ctx *worker)
{
	struct mempool *mp = NULL;
	if (worker->pool_mp.len > 0) {
		mp = array_tail(worker->pool_mp);
		array_pop(worker->pool_mp);
		mp_poison(mp, 0);
	} else { /* No mempool on the freelist, create new one */
		mp = mp_new (4 * CPU_PAGE_SIZE);
	}
	return mp;
}

/** Return a mempool.  (Cache them up to some count.) */
static inline void pool_release(struct worker_ctx *worker, struct mempool *mp)
{
	if (worker->pool_mp.len < MP_FREELIST_SIZE) {
		mp_flush(mp);
		array_push(worker->pool_mp, mp);
		mp_poison(mp, 1);
	} else {
		mp_delete(mp);
	}
}

/** Create a key for an outgoing subrequest: qname, qclass, qtype.
 * @param key Destination buffer for the key; MUST be SUBREQ_KEY_LEN bytes or larger.
 * @return key length on success, or an error code
 */
static const size_t SUBREQ_KEY_LEN = KR_RRKEY_LEN;
static int subreq_key(char *dst, knot_pkt_t *pkt)
{
	assert(pkt);
	return kr_rrkey(dst, knot_pkt_qclass(pkt), knot_pkt_qname(pkt),
			knot_pkt_qtype(pkt), knot_pkt_qtype(pkt));
}

/** Create and initialize a request_ctx (on a fresh mempool).
 *
 * handle and addr point to the source of the request, and they are NULL
 * in case the request didn't come from network.
 */
static struct request_ctx *request_create(struct worker_ctx *worker,
					  uv_handle_t *handle,
					  const struct sockaddr *addr)
{
	knot_mm_t pool = {
		.ctx = pool_borrow(worker),
		.alloc = (knot_mm_alloc_t) mp_alloc
	};

	/* Create request context */
	struct request_ctx *ctx = mm_alloc(&pool, sizeof(*ctx));
	if (!ctx) {
		pool_release(worker, pool.ctx);
		return NULL;
	}

	memset(ctx, 0, sizeof(*ctx));

	/* TODO Relocate pool to struct request */
	ctx->worker = worker;
	array_init(ctx->tasks);
	struct session *s = handle ? handle->data : NULL;
	if (s) {
		assert(session_flags(s)->outgoing == false);
	}
	ctx->source.session = s;

	struct kr_request *req = &ctx->req;
	req->pool = pool;
	req->vars_ref = LUA_NOREF;

	/* Remember query source addr */
	if (!addr || (addr->sa_family != AF_INET && addr->sa_family != AF_INET6)) {
		ctx->source.addr.ip.sa_family = AF_UNSPEC;
	} else {
		size_t addr_len = sizeof(struct sockaddr_in);
		if (addr->sa_family == AF_INET6)
			addr_len = sizeof(struct sockaddr_in6);
		memcpy(&ctx->source.addr.ip, addr, addr_len);
		ctx->req.qsource.addr = &ctx->source.addr.ip;
	}

	worker->stats.rconcurrent += 1;

	if (!handle) {
		return ctx;
	}

	/* Remember the destination address. */
	int addr_len = sizeof(ctx->source.dst_addr);
	struct sockaddr *dst_addr = &ctx->source.dst_addr.ip;
	ctx->source.dst_addr.ip.sa_family = AF_UNSPEC;
	if (handle->type == UV_UDP) {
		if (uv_udp_getsockname((uv_udp_t *)handle, dst_addr, &addr_len) == 0) {
			req->qsource.dst_addr = dst_addr;
		}
		req->qsource.tcp = false;
	} else if (handle->type == UV_TCP) {
		if (uv_tcp_getsockname((uv_tcp_t *)handle, dst_addr, &addr_len) == 0) {
			req->qsource.dst_addr = dst_addr;
		}
		req->qsource.tcp = true;
	}

	return ctx;
}

/** More initialization, related to the particular incoming query/packet. */
static int request_start(struct request_ctx *ctx, knot_pkt_t *query)
{
	assert(query && ctx);
	size_t answer_max = KNOT_WIRE_MIN_PKTSIZE;
	struct kr_request *req = &ctx->req;

	/* source.session can be NULL if the request was generated by kresd itself. */
	struct session *s = ctx->source.session;
	if (!s || session_get_handle(s)->type == UV_TCP) {
		answer_max = KNOT_WIRE_MAX_PKTSIZE;
	} else if (knot_pkt_has_edns(query)) { /* EDNS */
		answer_max = MAX(knot_edns_get_payload(query->opt_rr),
				 KNOT_WIRE_MIN_PKTSIZE);
	}
	req->qsource.size = query->size;

	req->answer = knot_pkt_new(NULL, answer_max, &req->pool);
	if (!req->answer) {
		return kr_error(ENOMEM);
	}

	/* Remember query source TSIG key */
	if (query->tsig_rr) {
		req->qsource.key = knot_rrset_copy(query->tsig_rr, &req->pool);
	}

	/* Remember query source EDNS data */
	if (query->opt_rr) {
		req->qsource.opt = knot_rrset_copy(query->opt_rr, &req->pool);
	}
	/* Start resolution */
	struct worker_ctx *worker = ctx->worker;
	struct engine *engine = worker->engine;
	kr_resolve_begin(req, &engine->resolver, req->answer);
	worker->stats.queries += 1;
	/* Throttle outbound queries only when high pressure */
	if (worker->stats.concurrent < QUERY_RATE_THRESHOLD) {
		req->options.NO_THROTTLE = true;
	}
	return kr_ok();
}

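/** Tear down a request: release its Lua vars reference, return the mempool and update stats. */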
static void request_free(struct request_ctx *ctx)
{
	struct worker_ctx *worker = ctx->worker;
	/* Dereference any Lua vars table if exists */
	if (ctx->req.vars_ref != LUA_NOREF) {
		lua_State *L = worker->engine->L;
		/* Get worker variables table */
		lua_rawgeti(L, LUA_REGISTRYINDEX, worker->vars_table_ref);
		/* Get next free element (position 0) and store it under current reference (forming a list) */
		lua_rawgeti(L, -1, 0);
		lua_rawseti(L, -2, ctx->req.vars_ref);
		/* Set current reference as the next free element */
		lua_pushinteger(L, ctx->req.vars_ref);
		lua_rawseti(L, -2, 0);
		lua_pop(L, 1);
		ctx->req.vars_ref = LUA_NOREF;
	}
	/* Return mempool to ring or free it if it's full */
	pool_release(worker, ctx->req.pool.ctx);
	/* @note The 'task' is invalidated from now on. */
	/* Decommit memory every once in a while */
	static int mp_delete_count = 0;
	if (++mp_delete_count == 100000) {
		lua_gc(worker->engine->L, LUA_GCCOLLECT, 0);
#if defined(__GLIBC__) && defined(_GNU_SOURCE)
		malloc_trim(0);
#endif
		mp_delete_count = 0;
	}
	worker->stats.rconcurrent -= 1;
}

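/** Add the task to the request's task list (no duplicates) and take a reference; returns its index or an error. */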
static int request_add_tasks(struct request_ctx *ctx, struct qr_task *task)
{
	for (int i = 0; i < ctx->tasks.len; ++i) {
		if (ctx->tasks.at[i] == task) {
			return i;
		}
	}
	int ret = array_push(ctx->tasks, task);
	if (ret >= 0) {
		qr_task_ref(task);
	}
	return ret;
}

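/** Remove the task from the request's task list and drop its reference. */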
static int request_del_tasks(struct request_ctx *ctx, struct qr_task *task)
{
	int ret = kr_error(ENOENT);
	for (int i = 0; i < ctx->tasks.len; ++i) {
		if (ctx->tasks.at[i] == task) {
			array_del(ctx->tasks, i);
			qr_task_unref(task);
			ret = kr_ok();
			break;
		}
	}
	return ret;
}

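/** Create a new task on the request's mempool, including its packet buffer, and register it within the request. */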
static struct qr_task *qr_task_create(struct request_ctx *ctx)
{
	/* How much can client handle? */
	struct engine *engine = ctx->worker->engine;
	size_t pktbuf_max = KR_EDNS_PAYLOAD;
	if (engine->resolver.opt_rr) {
		pktbuf_max = MAX(knot_edns_get_payload(engine->resolver.opt_rr),
				 pktbuf_max);
	}

	/* Create resolution task */
	struct qr_task *task = mm_alloc(&ctx->req.pool, sizeof(*task));
	if (!task) {
		return NULL;
	}
	memset(task, 0, sizeof(*task)); /* avoid accidentally uninitialized fields */

	/* Create packet buffers for answer and subrequests */
	knot_pkt_t *pktbuf = knot_pkt_new(NULL, pktbuf_max, &ctx->req.pool);
	if (!pktbuf) {
		mm_free(&ctx->req.pool, task);
		return NULL;
	}
	pktbuf->size = 0;

	task->ctx = ctx;
	task->pktbuf = pktbuf;
	array_init(task->waiting);
	task->refs = 0;
	int ret = request_add_tasks(ctx, task);
	if (ret < 0) {
		mm_free(&ctx->req.pool, task);
		mm_free(&ctx->req.pool, pktbuf);
		return NULL;
	}
	ctx->worker->stats.concurrent += 1;
	return task;
}

/* This is called when the task refcount reaches zero: free its memory. */
static void qr_task_free(struct qr_task *task)
{
	struct request_ctx *ctx = task->ctx;

	assert(ctx);

	/* Process outbound session. */
	struct session *s = ctx->source.session;
	struct worker_ctx *worker = ctx->worker;

	/* Process source session. */
	if (s && session_tasklist_get_len(s) < worker->tcp_pipeline_max/2 &&
	    !session_flags(s)->closing && !session_flags(s)->throttled) {
		uv_handle_t *handle = session_get_handle(s);
		/* Start reading again if the session is throttled and
		 * the number of outgoing requests is below watermark. */
		if (handle) {
			io_start_read(handle);
			session_flags(s)->throttled = false;
		}
	}

	if (ctx->tasks.len == 0) {
		array_clear(ctx->tasks);
		request_free(ctx);
	}

	/* Update stats */
	worker->stats.concurrent -= 1;
}

/** Register a new qr_task within the session. */
static int qr_task_register(struct qr_task *task, struct session *session)
{
	assert(!session_flags(session)->outgoing && session_get_handle(session)->type == UV_TCP);

	session_tasklist_add(session, task);

	struct request_ctx *ctx = task->ctx;
	assert(ctx && (ctx->source.session == NULL || ctx->source.session == session));
	ctx->source.session = session;
	/* Soft-limit on parallel queries: there is no "slow down" RCODE
	 * that we could use to signal the client, but we can stop reading
	 * and in effect shrink the TCP window size. To get more precise throttling,
	 * we would need to copy the remainder of the unread buffer and reassemble
	 * it when resuming reading. This is NYI. */
	if (session_tasklist_get_len(session) >= task->ctx->worker->tcp_pipeline_max) {
		uv_handle_t *handle = session_get_handle(session);
		if (handle && !session_flags(session)->throttled && !session_flags(session)->closing) {
			io_stop_read(handle);
			session_flags(session)->throttled = true;
		}
	}

	return 0;
}

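/* Tear down a finished task: kill its pending I/O, detach it from the source session and drop the primary reference. */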
static void qr_task_complete(struct qr_task *task)
{
	struct request_ctx *ctx = task->ctx;

	/* Kill pending I/O requests */
	ioreq_kill_pending(task);
	assert(task->waiting.len == 0);
	assert(task->leading == false);

	struct session *s = ctx->source.session;
	if (s) {
		assert(!session_flags(s)->outgoing && session_waitinglist_is_empty(s));
		session_tasklist_del(s, task);
	}

	/* Release primary reference to task. */
	request_del_tasks(ctx, task);
}

/* This is called when we send a subrequest or an answer. */
static int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status)
{
	if (task->finished) {
		assert(task->leading == false);
		qr_task_complete(task);
		if (!handle || handle->type != UV_TCP) {
			return status;
		}
		struct session* s = handle->data;
		assert(s);
		if (!session_flags(s)->outgoing || session_waitinglist_is_empty(s)) {
			return status;
		}
	}

	if (handle) {
		struct session* s = handle->data;
		bool outgoing = session_flags(s)->outgoing;
		if (!outgoing) {
			struct session* source_s = task->ctx->source.session;
			if (source_s) {
				assert (session_get_handle(source_s) == handle);
			}
		}
		if (handle->type == UV_TCP && outgoing && !session_waitinglist_is_empty(s)) {
			session_waitinglist_del(s, task);
			if (session_flags(s)->closing) {
				return status;
			}
			/* Finalize the task if there were any errors.
			 * We can't re-add it to the end of the waiting list for retrying,
			 * since that may lead to an endless loop in some circumstances
			 * (for instance with TLS: send->tls_push->too many non-critical errors->
			 * on_send with nonzero status->re-add to waiting->send->etc.). */
			if (status != 0) {
				if (outgoing) {
					qr_task_finalize(task, KR_STATE_FAIL);
				} else {
					assert(task->ctx->source.session == s);
					task->ctx->source.session = NULL;
				}
				session_tasklist_del(s, task);
			}
			struct qr_task *waiting_task = session_waitinglist_get_first(s);
			if (waiting_task) {
				struct sockaddr *peer = session_get_peer(s);
				knot_pkt_t *pkt = waiting_task->pktbuf;
				int ret = qr_task_send(waiting_task, handle, peer, pkt);
				if (ret != kr_ok()) {
					session_tasks_finalize(s, KR_STATE_FAIL);
					session_close(s);
					return status;
				}
			}
		}
		if (!session_flags(s)->closing) {
			io_start_read(handle); /* Start reading new query */
		}
	}
	return status;
}

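/* uv_udp_send() callback: notify the task and drop the reference taken before sending. */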
static void on_send(uv_udp_send_t *req, int status)
{
	uv_handle_t *handle = (uv_handle_t *)(req->handle);
	uv_loop_t *loop = handle->loop;
	struct qr_task *task = req->data;
	qr_task_on_send(task, handle, status);
	qr_task_unref(task);
	free(req);
}
// TODO: unify these two
static void on_task_write(uv_write_t *req, int status)
{
	uv_handle_t *handle = (uv_handle_t *)(req->handle);
	uv_loop_t *loop = handle->loop;
	struct qr_task *task = req->data;
	qr_task_on_send(task, handle, status);
	qr_task_unref(task);
	free(req);
}

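/* Send pkt to addr over the given handle (UDP, TCP or TLS); outgoing queries are finalized via kr_resolve_checkout() first. */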
static int qr_task_send(struct qr_task *task, uv_handle_t *handle,
			struct sockaddr *addr, knot_pkt_t *pkt)
{
	if (!handle) {
		return qr_task_on_send(task, handle, kr_error(EIO));
	}

	int ret = 0;
	struct request_ctx *ctx = task->ctx;
	struct kr_request *req = &ctx->req;

	const bool is_stream = handle->type == UV_TCP;
	if (!is_stream && handle->type != UV_UDP) abort();

	if (knot_wire_get_qr(pkt->wire) == 0) {
		/*
		 * Query must be finalised using destination address before
		 * sending.
		 *
		 * Libuv does not offer a convenient way how to obtain a source
		 * IP address from a UDP handle that has been initialised using
		 * uv_udp_init(). The uv_udp_getsockname() fails because of the
		 * lazy socket initialisation.
		 *
		 * @note -- A solution might be opening a separate socket and
		 * trying to obtain the IP address from it.
		 */
		ret = kr_resolve_checkout(req, NULL, addr,
		                          is_stream ? SOCK_STREAM : SOCK_DGRAM,
		                          pkt);
		if (ret != 0) {
			return ret;
		}
	}

	uv_handle_t *ioreq = malloc(is_stream ? sizeof(uv_write_t) : sizeof(uv_udp_send_t));
	if (!ioreq) {
		return qr_task_on_send(task, handle, kr_error(ENOMEM));
	}

	/* Pending ioreq on current task */
	qr_task_ref(task);

	/* Send using given protocol */
	struct session *session = handle->data;
	assert(!session_flags(session)->closing);
	if (session_flags(session)->has_tls) {
		uv_write_t *write_req = (uv_write_t *)ioreq;
		write_req->data = task;
		ret = tls_write(write_req, handle, pkt, &on_task_write);
	} else if (handle->type == UV_UDP) {
		uv_udp_send_t *send_req = (uv_udp_send_t *)ioreq;
		uv_buf_t buf = { (char *)pkt->wire, pkt->size };
		send_req->data = task;
		ret = uv_udp_send(send_req, (uv_udp_t *)handle, &buf, 1, addr, &on_send);
	} else if (handle->type == UV_TCP) {
		uv_write_t *write_req = (uv_write_t *)ioreq;
		uint16_t pkt_size = htons(pkt->size);
		uv_buf_t buf[2] = {
			{ (char *)&pkt_size, sizeof(pkt_size) },
			{ (char *)pkt->wire, pkt->size }
		};
		write_req->data = task;
		ret = uv_write(write_req, (uv_stream_t *)handle, buf, 2, &on_task_write);
	} else {
		assert(false);
	}

	struct worker_ctx *worker = ctx->worker;
	if (ret == 0) {
		if (worker->too_many_open &&
		    worker->stats.rconcurrent <
			worker->rconcurrent_highwatermark - 10) {
			worker->too_many_open = false;
		}
	} else {
		free(ioreq);
		qr_task_unref(task);
		if (ret == UV_EMFILE) {
			worker->too_many_open = true;
			worker->rconcurrent_highwatermark = worker->stats.rconcurrent;
		}
	}

	/* Update statistics */
	if (session_flags(session)->outgoing && addr) {
		if (session_flags(session)->has_tls)
			worker->stats.tls += 1;
		else if (handle->type == UV_UDP)
			worker->stats.udp += 1;
		else
			worker->stats.tcp += 1;

		if (addr->sa_family == AF_INET6)
			worker->stats.ipv6 += 1;
		else if (addr->sa_family == AF_INET)
			worker->stats.ipv4 += 1;
	}
	return ret;
}

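/* Send the first task from the session's waiting list, if there is one. */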
static int session_next_waiting_send(struct session *session)
{
	int ret = kr_ok();
	if (!session_waitinglist_is_empty(session)) {
		struct sockaddr *peer = session_get_peer(session);
		struct qr_task *task = session_waitinglist_get_first(session);
		uv_handle_t *handle = session_get_handle(session);
		ret = qr_task_send(task, handle, peer, task->pktbuf);
	}
	return ret;
}

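/* TLS client handshake callback: cache session data for resumption, mark the session as connected and send the first waiting task. */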
static int session_tls_hs_cb(struct session *session, int status)
{
	assert(session_flags(session)->outgoing);
	uv_handle_t *handle = session_get_handle(session);
	uv_loop_t *loop = handle->loop;
	struct worker_ctx *worker = loop->data;
	struct sockaddr *peer = session_get_peer(session);
	int deletion_res = worker_del_tcp_waiting(worker, peer);
	int ret = kr_ok();

	if (status) {
		kr_nsrep_update_rtt(NULL, peer, KR_NS_DEAD,
				    worker->engine->resolver.cache_rtt,
				    KR_NS_UPDATE_NORESET);
		return ret;
	}

	/* handshake was completed successfully */
	struct tls_client_ctx_t *tls_client_ctx = session_tls_get_client_ctx(session);
	struct tls_client_paramlist_entry *tls_params = tls_client_ctx->params;
	gnutls_session_t tls_session = tls_client_ctx->c.tls_session;
	if (gnutls_session_is_resumed(tls_session) != 0) {
		kr_log_verbose("[tls_client] TLS session has resumed\n");
	} else {
		kr_log_verbose("[tls_client] TLS session has not resumed\n");
		/* session wasn't resumed, delete old session data ... */
		if (tls_params->session_data.data != NULL) {
			gnutls_free(tls_params->session_data.data);
			tls_params->session_data.data = NULL;
			tls_params->session_data.size = 0;
		}
		/* ... and get the new session data */
		gnutls_datum_t tls_session_data = { NULL, 0 };
		ret = gnutls_session_get_data2(tls_session, &tls_session_data);
		if (ret == 0) {
			tls_params->session_data = tls_session_data;
		}
	}

	ret = worker_add_tcp_connected(worker, peer, session);
	if (deletion_res == kr_ok() && ret == kr_ok()) {
		ret = session_next_waiting_send(session);
	} else {
		ret = kr_error(EINVAL);
	}

	if (ret != kr_ok()) {
		/* Something went wrong.
		 * Session isn't in the list of waiting sessions,
		 * or addition to the list of connected sessions failed,
		 * or write to upstream failed. */
		worker_del_tcp_connected(worker, peer);
		session_waitinglist_finalize(session, KR_STATE_FAIL);
		assert(session_tasklist_is_empty(session));
		session_close(session);
	} else {
		uv_timer_t *t = session_get_timer(session);
		uv_timer_stop(t);
		t->data = session;
		session_timer_start(session, on_tcp_watchdog_timeout,
				    MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
	}
	return kr_ok();
}


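/* Return the last planned (still pending) query of the task, or NULL. */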
static struct kr_query *task_get_last_pending_query(struct qr_task *task)
{
	if (!task || task->ctx->req.rplan.pending.len == 0) {
828 829 830 831 832 833
		return NULL;
	}

	return array_tail(task->ctx->req.rplan.pending);
}


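/* Outgoing TCP connect callback: on success start the TLS handshake or send the first waiting task. */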
static void on_connect(uv_connect_t *req, int status)
{
	struct worker_ctx *worker = get_worker();
	uv_stream_t *handle = req->handle;
	struct session *session = handle->data;
	struct sockaddr *peer = session_get_peer(session);

	assert(session_flags(session)->outgoing);

	if (status == UV_ECANCELED) {
		worker_del_tcp_waiting(worker, peer);
		assert(session_is_empty(session) && session_flags(session)->closing);
		free(req);
		return;
	}

	if (session_flags(session)->closing) {
		worker_del_tcp_waiting(worker, peer);
		assert(session_is_empty(session));
		free(req);
		return;
	}

	uv_timer_t *t = session_get_timer(session);
	uv_timer_stop(t);

	if (status != 0) {
		worker_del_tcp_waiting(worker, peer);
		session_waitinglist_retry(session, false);
		assert(session_tasklist_is_empty(session));
		free(req);
		session_close(session);
		return;
	}

	if (!session_flags(session)->has_tls) {
		/* With TLS the session is still waiting for the handshake;
		 * otherwise remove it from the waiting list. */
		if (worker_del_tcp_waiting(worker, peer) != 0) {
			/* The session isn't in the list of waiting queries;
			 * something has gone wrong. */
			session_waitinglist_finalize(session, KR_STATE_FAIL);
			assert(session_tasklist_is_empty(session));
			free(req);
			session_close(session);
			return;
		}
	}

	struct qr_task *task = session_waitinglist_get_first(session);
	struct kr_query *qry = task_get_last_pending_query(task);
	WITH_VERBOSE (qry) {
		struct sockaddr *peer = session_get_peer(session);
		char peer_str[INET6_ADDRSTRLEN];
		inet_ntop(peer->sa_family, kr_inaddr(peer), peer_str, sizeof(peer_str));
		VERBOSE_MSG(qry, "=> connected to '%s'\n", peer_str);
	}

	session_flags(session)->connected = true;

	int ret = kr_ok();
	if (session_flags(session)->has_tls) {
		struct tls_client_ctx_t *tls_ctx = session_tls_get_client_ctx(session);
		ret = tls_client_connect_start(tls_ctx, session, session_tls_hs_cb);
		if (ret == kr_error(EAGAIN)) {
			free(req);
			session_start_read(session);
			session_timer_start(session, on_tcp_watchdog_timeout,
					    MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
			return;
		}
	} else {
		worker_add_tcp_connected(worker, peer, session);
	}

	if (ret == kr_ok()) {
		ret = session_next_waiting_send(session);
		if (ret == kr_ok()) {
			session_timer_start(session, on_tcp_watchdog_timeout,
					    MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
			free(req);
			return;
		}
	}

	session_waitinglist_finalize(session, KR_STATE_FAIL);
	assert(session_tasklist_is_empty(session));
	free(req);
	session_close(session);
}

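/* The TCP connection attempt took too long: penalize the server and retry the waiting tasks elsewhere. */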
static void on_tcp_connect_timeout(uv_timer_t *timer)
{
	struct session *session = timer->data;

	uv_timer_stop(timer);
	struct worker_ctx *worker = get_worker();

	assert (session_waitinglist_get_len(session) == session_tasklist_get_len(session));

	struct sockaddr *peer = session_get_peer(session);
	worker_del_tcp_waiting(worker, peer);

	struct qr_task *task = session_waitinglist_get_first(session);
	struct kr_query *qry = task_get_last_pending_query(task);
	WITH_VERBOSE (qry) {
		char peer_str[INET6_ADDRSTRLEN];
		inet_ntop(peer->sa_family, kr_inaddr(peer), peer_str, sizeof(peer_str));
		VERBOSE_MSG(qry, "=> connection to '%s' failed\n", peer_str);
	}

	kr_nsrep_update_rtt(NULL, peer, KR_NS_DEAD,
			    worker->engine->resolver.cache_rtt,
			    KR_NS_UPDATE_NORESET);

	worker->stats.timeout += session_waitinglist_get_len(session);
	session_waitinglist_retry(session, true);
	assert (session_tasklist_is_empty(session));
	session_close(session);
}

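/* Inactivity timeout on an outgoing TCP/TLS session: fail all attached tasks and close the session. */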
static void on_tcp_watchdog_timeout(uv_timer_t *timer)
{
	struct session *session = timer->data;
	struct worker_ctx *worker =  timer->loop->data;
	struct sockaddr *peer = session_get_peer(session);

	assert(session_flags(session)->outgoing);

	uv_timer_stop(timer);

	if (session_flags(session)->has_tls) {
		worker_del_tcp_waiting(worker, peer);
	}

	worker_del_tcp_connected(worker, peer);
	worker->stats.timeout += session_waitinglist_get_len(session);
	session_waitinglist_finalize(session, KR_STATE_FAIL);
	worker->stats.timeout += session_tasklist_get_len(session);
	session_tasklist_finalize(session, KR_STATE_FAIL);
	session_close(session);
}

/* This is called when I/O times out. */
static void on_udp_timeout(uv_timer_t *timer)
{
	struct session *session = timer->data;
	assert(session_get_handle(session)->data == session);
	assert(session_tasklist_get_len(session) == 1);
	assert(session_waitinglist_is_empty(session));

	uv_timer_stop(timer);

	/* Penalize all tried nameservers with a timeout. */
	struct qr_task *task = session_tasklist_get_first(session);
	struct worker_ctx *worker = task->ctx->worker;
	if (task->leading && task->pending_count > 0) {
		struct kr_query *qry = array_tail(task->ctx->req.rplan.pending);
		struct sockaddr_in6 *addrlist = (struct sockaddr_in6 *)task->addrlist;
		for (uint16_t i = 0; i < MIN(task->pending_count, task->addrlist_count); ++i) {
			struct sockaddr *choice = (struct sockaddr *)(&addrlist[i]);
			WITH_VERBOSE(qry) {
				char addr_str[INET6_ADDRSTRLEN];
				inet_ntop(choice->sa_family, kr_inaddr(choice), addr_str, sizeof(addr_str));
				VERBOSE_MSG(qry, "=> server: '%s' flagged as 'bad'\n", addr_str);
			}
			kr_nsrep_update_rtt(&qry->ns, choice, KR_NS_DEAD,
					    worker->engine->resolver.cache_rtt,
					    KR_NS_UPDATE_NORESET);
		}
	}
	task->timeouts += 1;
	worker->stats.timeout += 1;
	qr_task_step(task, NULL, NULL);
}

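/* Send the query datagram to the next address from the task's address list (round robin). */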
static uv_handle_t *retransmit(struct qr_task *task)
{
	uv_handle_t *ret = NULL;
	if (task && task->addrlist && task->addrlist_count > 0) {
		struct sockaddr_in6 *choice = &((struct sockaddr_in6 *)task->addrlist)[task->addrlist_turn];
		if (!choice) {
			return ret;
		}
		ret = ioreq_spawn(task, SOCK_DGRAM, choice->sin6_family);
		if (!ret) {
			return ret;
		}
		struct sockaddr *addr = (struct sockaddr *)choice;
		struct session *session = ret->data;
		struct sockaddr *peer = session_get_peer(session);
		assert (peer->sa_family == AF_UNSPEC && session_flags(session)->outgoing);
		memcpy(peer, addr, kr_sockaddr_len(addr));
		if (qr_task_send(task, ret, (struct sockaddr *)choice,
				 task->pktbuf) == 0) {
			task->addrlist_turn = (task->addrlist_turn + 1) %
					      task->addrlist_count; /* Round robin */
		}
	}
	return ret;
}

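/* UDP retry timer: retransmit to another server, or arm the timeout handler with the remaining deadline. */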
static void on_retransmit(uv_timer_t *req)
{
	struct session *session = req->data;
	assert(session_tasklist_get_len(session) == 1);

	uv_timer_stop(req);
	struct qr_task *task = session_tasklist_get_first(session);
	if (retransmit(task) == NULL) {
		/* Not possible to spawn a request; start the timeout timer with the remaining deadline. */
		uint64_t timeout = KR_CONN_RTT_MAX - task->pending_count * KR_CONN_RETRY;
		uv_timer_start(req, on_udp_timeout, timeout, 0);
	} else {
		uv_timer_start(req, on_retransmit, KR_CONN_RETRY, 0);
	}
}

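/* A leading subrequest has finished: remove it from the outgoing table and step all followers with the result. */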
static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *pkt)
{
	/* Cancel pending I/O requests. */
	ioreq_kill_pending(task);
	/* Clear from outgoing table. */
	if (!task->leading)
		return;
	char key[SUBREQ_KEY_LEN];
	const int klen = subreq_key(key, task->pktbuf);
	if (klen > 0) {
		void *val_deleted;
		int ret = trie_del(task->ctx->worker->subreq_out, key, klen, &val_deleted);
		assert(ret == KNOT_EOK && val_deleted == task); (void)ret;
	}
	/* Notify waiting tasks. */
	struct kr_query *leader_qry = array_tail(task->ctx->req.rplan.pending);
	for (size_t i = task->waiting.len; i > 0; i--) {
		struct qr_task *follower = task->waiting.at[i - 1];
		/* Reuse MSGID and 0x20 secret */
		if (follower->ctx->req.rplan.pending.len > 0) {
			struct kr_query *qry = array_tail(follower->ctx->req.rplan.pending);
			qry->id = leader_qry->id;
			qry->secret = leader_qry->secret;
			leader_qry->secret = 0; /* Next will be already decoded */
		}
		qr_task_step(follower, packet_source, pkt);
		qr_task_unref(follower);
	}
	task->waiting.len = 0;
	task->leading = false;
}

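/* Insert the task into the table of outgoing subrequests so that duplicates can attach to it as followers. */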
static void subreq_lead(struct qr_task *task)
{
	assert(task);
	char key[SUBREQ_KEY_LEN];
	const int klen = subreq_key(key, task->pktbuf);
	if (klen < 0)
		return;
	struct qr_task **tvp = (struct qr_task **)
		trie_get_ins(task->ctx->worker->subreq_out, key, klen);
	if (unlikely(!tvp))
		return; /*ENOMEM*/
	if (unlikely(*tvp != NULL)) {
		assert(false);
		return;
	}
	*tvp = task;
	task->leading = true;
}

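/* Attach the task to an existing leader of the same subrequest; returns true if it was enqueued. */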
static bool subreq_enqueue(struct qr_task *task)
{
	assert(task);
	char key[SUBREQ_KEY_LEN];
	const int klen = subreq_key(key, task->pktbuf);
	if (klen < 0)
		return false;
	struct qr_task **leader = (struct qr_task **)
		trie_get_try(task->ctx->worker->subreq_out, key, klen);
	if (!leader /*ENOMEM*/ || !*leader)
		return false;
	/* Enqueue itself to leader for this subrequest. */
	int ret = array_push_mm((*leader)->waiting, task,
				kr_memreserve, &(*leader)->ctx->req.pool);
	if (unlikely(ret < 0)) /*ENOMEM*/
		return false;
	qr_task_ref(task);
	return true;
}

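/* Finish the resolution and send the answer back to the query source, if there is one. */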
static int qr_task_finalize(struct qr_task *task, int state)
{
	assert(task && task->leading == false);
	if (task->finished) {
		return 0;
	}
	struct request_ctx *ctx = task->ctx;
	kr_resolve_finish(&ctx->req, state);

	task->finished = true;
	if (ctx->source.session == NULL) {
		(void) qr_task_on_send(task, NULL, kr_error(EIO));
		return state == KR_STATE_DONE ? 0 : kr_error(EIO);
	}

	/* Reference task as the callback handler can close it */
	qr_task_ref(task);

	/* Send back answer */
	struct session *source_session = ctx->source.session;
	uv_handle_t *handle = session_get_handle(source_session);
	assert(!session_flags(source_session)->closing);
	assert(handle && handle->data == ctx->source.session);
	assert(ctx->source.addr.ip.sa_family != AF_UNSPEC);
	int res = qr_task_send(task, handle,
			       (struct sockaddr *)&ctx->source.addr,
			        ctx->req.answer);
	if (res != kr_ok()) {
		(void) qr_task_on_send(task, NULL, kr_error(EIO));
		/* Since the source session is erroneous, detach all tasks. */
		while (!session_tasklist_is_empty(source_session)) {
			struct qr_task *t = session_tasklist_get_first(source_session);
			struct request_ctx *c = t->ctx;
			assert(c->source.session == source_session);
			c->source.session = NULL;
			/* Don't finalize them as there can be other tasks
			 * waiting for answer to this particular task.
			 * (ie. task->leading is true) */
			session_tasklist_del_index(source_session, 0);
		}
		session_close(source_session);
	} else if (handle->type == UV_TCP && ctx->source.session) {
		/* Don't try to close the source session for at least
		 * retry_interval_for_timeout_timer milliseconds. */
		session_timer_restart(ctx->source.session);
	}

	qr_task_unref(task);

	return state == KR_STATE_DONE ? 0 : kr_error(EIO);
}