/*  Copyright (C) 2014-2017 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

#include <uv.h>
#include <lua.h>
#include <libknot/packet/pkt.h>
#include <libknot/descriptor.h>
#include <contrib/ucw/lib.h>
#include <contrib/ucw/mempool.h>
#include <contrib/wire.h>
#if defined(__GLIBC__) && defined(_GNU_SOURCE)
#include <malloc.h>
#endif
#include <assert.h>
#include <sys/types.h>
#include <unistd.h>
#include <gnutls/gnutls.h>
#include "lib/utils.h"
#include "lib/layer.h"
#include "daemon/worker.h"
#include "daemon/bindings.h"
#include "daemon/engine.h"
#include "daemon/io.h"
#include "daemon/tls.h"
#include "daemon/zimport.h"
#include "daemon/session.h"


/* Magic defaults for the worker. */
#ifndef MP_FREELIST_SIZE
# ifdef __clang_analyzer__
#  define MP_FREELIST_SIZE 0
# else
#  define MP_FREELIST_SIZE 64 /**< Maximum length of the worker mempool freelist */
# endif
#endif
#ifndef QUERY_RATE_THRESHOLD
#define QUERY_RATE_THRESHOLD (2 * MP_FREELIST_SIZE) /**< Nr of parallel queries considered as high rate */
#endif
#ifndef MAX_PIPELINED
#define MAX_PIPELINED 100
#endif

#define VERBOSE_MSG(qry, fmt...) QRVERBOSE(qry, "wrkr", fmt)

/** Client request state. */
struct request_ctx
{
	struct kr_request req;
	struct {
		union inaddr addr;
		union inaddr dst_addr;
		/* uv_handle_t *handle; */

		/** NULL if the request didn't come over the network. */
		struct session *session;
	} source;
	struct worker_ctx *worker;
	qr_tasklist_t tasks;
};

/** Query resolution task. */
struct qr_task
{
	struct request_ctx *ctx;
	knot_pkt_t *pktbuf;
	qr_tasklist_t waiting;
	uv_handle_t *pending[MAX_PENDING];
	uint16_t pending_count;
	uint16_t addrlist_count;
	uint16_t addrlist_turn;
	uint16_t timeouts;
	uint16_t iter_count;
	uint16_t bytes_remaining;
	struct sockaddr *addrlist;
	uint32_t refs;
	bool finished : 1;
	bool leading  : 1;
};


/* Convenience macros */
#define qr_task_ref(task) \
	do { ++(task)->refs; } while(0)
#define qr_task_unref(task) \
	do { if (task && --(task)->refs == 0) { qr_task_free(task); } } while (0)
#define qr_valid_handle(task, checked) \
	(!uv_is_closing((checked)) || (task)->ctx->source.session->handle == (checked))

/** @internal Get the key for a TCP session.
 *  @note kr_straddr() returns a pointer to a static string.
 */
#define tcpsess_key(addr) kr_straddr(addr)

/* Forward decls */
static void qr_task_free(struct qr_task *task);
static int qr_task_step(struct qr_task *task,
			const struct sockaddr *packet_source,
			knot_pkt_t *packet);
static int qr_task_send(struct qr_task *task, struct session *session,
			struct sockaddr *addr, knot_pkt_t *pkt);
static int qr_task_finalize(struct qr_task *task, int state);
static void qr_task_complete(struct qr_task *task);
static struct session* worker_find_tcp_connected(struct worker_ctx *worker,
						 const struct sockaddr *addr);
static int worker_add_tcp_waiting(struct worker_ctx *worker,
				  const struct sockaddr *addr,
				  struct session *session);
static int worker_del_tcp_waiting(struct worker_ctx *worker,
				  const struct sockaddr *addr);
static struct session* worker_find_tcp_waiting(struct worker_ctx *worker,
					       const struct sockaddr *addr);
static void on_tcp_connect_timeout(uv_timer_t *timer);
static void on_tcp_watchdog_timeout(uv_timer_t *timer);

/** @internal Get singleton worker. */
static inline struct worker_ctx *get_worker(void)
{
	return uv_default_loop()->data;
}

/*! @internal Create a UDP/TCP handle for an outgoing AF_INET* connection.
 *  socktype is SOCK_* */
static uv_handle_t *ioreq_spawn(struct worker_ctx *worker, int socktype, sa_family_t family)
{
	bool precond = (socktype == SOCK_DGRAM || socktype == SOCK_STREAM)
			&& (family == AF_INET  || family == AF_INET6);
	if (!precond) {
		/* assert(false); see #245 */
		kr_log_verbose("[work] ioreq_spawn: pre-condition failed\n");
		return NULL;
	}

	/* Create connection for iterative query */
	uv_handle_t *handle = malloc(socktype == SOCK_DGRAM
					? sizeof(uv_udp_t) : sizeof(uv_tcp_t));
	if (!handle) {
		return NULL;
	}
	int ret = io_create(worker->loop, handle, socktype, family);
	if (ret) {
		if (ret == UV_EMFILE) {
			worker->too_many_open = true;
			worker->rconcurrent_highwatermark = worker->stats.rconcurrent;
		}
		free(handle);
		return NULL;
	}

	/* Bind to outgoing address, according to IP v4/v6. */
	union inaddr *addr;
	if (family == AF_INET) {
		addr = (union inaddr *)&worker->out_addr4;
	} else {
		addr = (union inaddr *)&worker->out_addr6;
	}
	if (addr->ip.sa_family != AF_UNSPEC) {
		assert(addr->ip.sa_family == family);
		if (socktype == SOCK_DGRAM) {
			uv_udp_t *udp = (uv_udp_t *)handle;
			ret = uv_udp_bind(udp, &addr->ip, 0);
		} else if (socktype == SOCK_STREAM) {
			uv_tcp_t *tcp = (uv_tcp_t *)handle;
			ret = uv_tcp_bind(tcp, &addr->ip, 0);
		}
	}

	if (ret != 0) {
		io_deinit(handle);
		free(handle);
		return NULL;
	}

	/* Set current handle as a subrequest type. */
	struct session *session = handle->data;
	session_flags(session)->outgoing = true;
	/* Connect or issue query datagram */
	return handle;
}

static void ioreq_kill_pending(struct qr_task *task)
{
	for (uint16_t i = 0; i < task->pending_count; ++i) {
		session_kill_ioreq(task->pending[i]->data, task);
	}
	task->pending_count = 0;
}

/** @cond This memory layout is internal to mempool.c, use only for debugging. */
#if defined(__SANITIZE_ADDRESS__)
struct mempool_chunk {
  struct mempool_chunk *next;
  size_t size;
};
static void mp_poison(struct mempool *mp, bool poison)
{
	if (!poison) { /* @note mempool is part of the first chunk, unpoison it first */
		kr_asan_unpoison(mp, sizeof(*mp));
	}
	struct mempool_chunk *chunk = mp->state.last[0];
	void *chunk_off = (void *)chunk - chunk->size;
	if (poison) {
		kr_asan_poison(chunk_off, chunk->size);
	} else {
		kr_asan_unpoison(chunk_off, chunk->size);
	}
}
#else
#define mp_poison(mp, enable)
#endif
/** @endcond */

/** Get a mempool.  (Recycle if possible.)  */
static inline struct mempool *pool_borrow(struct worker_ctx *worker)
{
	struct mempool *mp = NULL;
	if (worker->pool_mp.len > 0) {
		mp = array_tail(worker->pool_mp);
		array_pop(worker->pool_mp);
		mp_poison(mp, 0);
	} else { /* No mempool on the freelist, create new one */
		mp = mp_new (4 * CPU_PAGE_SIZE);
	}
	return mp;
}

/** Return a mempool.  (Cache them up to some count.) */
static inline void pool_release(struct worker_ctx *worker, struct mempool *mp)
{
	if (worker->pool_mp.len < MP_FREELIST_SIZE) {
		mp_flush(mp);
		array_push(worker->pool_mp, mp);
		mp_poison(mp, 1);
	} else {
		mp_delete(mp);
	}
}

/** Create a key for an outgoing subrequest: qname, qclass, qtype.
 * @param dst Destination buffer for the key; its size MUST be SUBREQ_KEY_LEN or larger.
 * @return key length if successful, or an error code
 */
static const size_t SUBREQ_KEY_LEN = KR_RRKEY_LEN;
static int subreq_key(char *dst, knot_pkt_t *pkt)
{
	assert(pkt);
	return kr_rrkey(dst, knot_pkt_qclass(pkt), knot_pkt_qname(pkt),
			knot_pkt_qtype(pkt), knot_pkt_qtype(pkt));
}

/** Create and initialize a request_ctx (on a fresh mempool).
 *
 * handle and addr point to the source of the request, and they are NULL
 * in case the request didn't come from network.
 */
static struct request_ctx *request_create(struct worker_ctx *worker,
					  uv_handle_t *handle,
					  const struct sockaddr *addr)
{
	knot_mm_t pool = {
		.ctx = pool_borrow(worker),
		.alloc = (knot_mm_alloc_t) mp_alloc
	};

	/* Create request context */
	struct request_ctx *ctx = mm_alloc(&pool, sizeof(*ctx));
	if (!ctx) {
		pool_release(worker, pool.ctx);
		return NULL;
	}

	memset(ctx, 0, sizeof(*ctx));

	/* TODO Relocate pool to struct request */
	ctx->worker = worker;
	array_init(ctx->tasks);
	struct session *s = handle ? handle->data : NULL;
	if (s) {
		assert(session_flags(s)->outgoing == false);
	}
	ctx->source.session = s;

	struct kr_request *req = &ctx->req;
	req->pool = pool;
	req->vars_ref = LUA_NOREF;

	/* Remember query source addr */
	if (!addr || (addr->sa_family != AF_INET && addr->sa_family != AF_INET6)) {
		ctx->source.addr.ip.sa_family = AF_UNSPEC;
	} else {
		size_t addr_len = sizeof(struct sockaddr_in);
		if (addr->sa_family == AF_INET6)
			addr_len = sizeof(struct sockaddr_in6);
		memcpy(&ctx->source.addr.ip, addr, addr_len);
		ctx->req.qsource.addr = &ctx->source.addr.ip;
	}

	worker->stats.rconcurrent += 1;

	if (!handle) {
		return ctx;
	}

	/* Remember the destination address. */
	int addr_len = sizeof(ctx->source.dst_addr);
	struct sockaddr *dst_addr = &ctx->source.dst_addr.ip;
	ctx->source.dst_addr.ip.sa_family = AF_UNSPEC;
	if (handle->type == UV_UDP) {
		if (uv_udp_getsockname((uv_udp_t *)handle, dst_addr, &addr_len) == 0) {
			req->qsource.dst_addr = dst_addr;
		}
		req->qsource.tcp = false;
	} else if (handle->type == UV_TCP) {
		if (uv_tcp_getsockname((uv_tcp_t *)handle, dst_addr, &addr_len) == 0) {
			req->qsource.dst_addr = dst_addr;
		}
		req->qsource.tcp = true;
	}

	return ctx;
}

/** More initialization, related to the particular incoming query/packet. */
static int request_start(struct request_ctx *ctx, knot_pkt_t *query)
{
	assert(query && ctx);
	size_t answer_max = KNOT_WIRE_MIN_PKTSIZE;
	struct kr_request *req = &ctx->req;

	/* source.session can be empty if request was generated by kresd itself */
	struct session *s = ctx->source.session;
	if (!s || session_get_handle(s)->type == UV_TCP) {
		answer_max = KNOT_WIRE_MAX_PKTSIZE;
	} else if (knot_pkt_has_edns(query)) { /* EDNS */
		answer_max = MAX(knot_edns_get_payload(query->opt_rr),
				 KNOT_WIRE_MIN_PKTSIZE);
	}
	req->qsource.size = query->size;

	req->answer = knot_pkt_new(NULL, answer_max, &req->pool);
	if (!req->answer) {
		return kr_error(ENOMEM);
	}

	/* Remember query source TSIG key */
	if (query->tsig_rr) {
		req->qsource.key = knot_rrset_copy(query->tsig_rr, &req->pool);
	}

	/* Remember query source EDNS data */
	if (query->opt_rr) {
		req->qsource.opt = knot_rrset_copy(query->opt_rr, &req->pool);
	}
	/* Start resolution */
	struct worker_ctx *worker = ctx->worker;
	struct engine *engine = worker->engine;
	kr_resolve_begin(req, &engine->resolver, req->answer);
	worker->stats.queries += 1;
	/* Throttle outbound queries only when high pressure */
	if (worker->stats.concurrent < QUERY_RATE_THRESHOLD) {
		req->options.NO_THROTTLE = true;
	}
	return kr_ok();
}

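/** Free a request context: drop its Lua variables reference, return the
 * mempool to the worker and update stats. */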
static void request_free(struct request_ctx *ctx)
{
	struct worker_ctx *worker = ctx->worker;
	/* Dereference any Lua vars table if exists */
	if (ctx->req.vars_ref != LUA_NOREF) {
		lua_State *L = worker->engine->L;
		/* Get worker variables table */
		lua_rawgeti(L, LUA_REGISTRYINDEX, worker->vars_table_ref);
		/* Get next free element (position 0) and store it under current reference (forming a list) */
		lua_rawgeti(L, -1, 0);
		lua_rawseti(L, -2, ctx->req.vars_ref);
		/* Set current reference as the next free element */
		lua_pushinteger(L, ctx->req.vars_ref);
		lua_rawseti(L, -2, 0);
		lua_pop(L, 1);
		ctx->req.vars_ref = LUA_NOREF;
	}
	/* Return mempool to ring or free it if it's full */
	pool_release(worker, ctx->req.pool.ctx);
	/* @note The 'task' is invalidated from now on. */
	/* Decommit memory every once in a while */
	static int mp_delete_count = 0;
	if (++mp_delete_count == 100000) {
		lua_gc(worker->engine->L, LUA_GCCOLLECT, 0);
#if defined(__GLIBC__) && defined(_GNU_SOURCE)
		malloc_trim(0);
#endif
		mp_delete_count = 0;
	}
	worker->stats.rconcurrent -= 1;
}

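/** Add a task to the request's task list (no-op if already present) and take
 * a reference to it. */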
static int request_add_tasks(struct request_ctx *ctx, struct qr_task *task)
{
	for (int i = 0; i < ctx->tasks.len; ++i) {
		if (ctx->tasks.at[i] == task) {
			return i;
		}
	}
	int ret = array_push(ctx->tasks, task);
	if (ret >= 0) {
		qr_task_ref(task);
	}
	return ret;
}

static int request_del_tasks(struct request_ctx *ctx, struct qr_task *task)
{
	int ret = kr_error(ENOENT);
	for (int i = 0; i < ctx->tasks.len; ++i) {
		if (ctx->tasks.at[i] == task) {
			array_del(ctx->tasks, i);
			qr_task_unref(task);
			ret = kr_ok();
			break;
		}
	}
	return ret;
}

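/** Create a new task for the request, with a packet buffer sized according to
 * the configured EDNS payload. */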
static struct qr_task *qr_task_create(struct request_ctx *ctx)
{
	/* How much can client handle? */
	struct engine *engine = ctx->worker->engine;
	size_t pktbuf_max = KR_EDNS_PAYLOAD;
	if (engine->resolver.opt_rr) {
		pktbuf_max = MAX(knot_edns_get_payload(engine->resolver.opt_rr),
				 pktbuf_max);
	}

	/* Create resolution task */
	struct qr_task *task = mm_alloc(&ctx->req.pool, sizeof(*task));
	if (!task) {
		return NULL;
	}
	memset(task, 0, sizeof(*task)); /* avoid accidentally uninitialized fields */

	/* Create packet buffers for answer and subrequests */
	knot_pkt_t *pktbuf = knot_pkt_new(NULL, pktbuf_max, &ctx->req.pool);
	if (!pktbuf) {
		mm_free(&ctx->req.pool, task);
		return NULL;
	}
	pktbuf->size = 0;

	task->ctx = ctx;
	task->pktbuf = pktbuf;
	array_init(task->waiting);
	task->refs = 0;
	int ret = request_add_tasks(ctx, task);
	if (ret < 0) {
		mm_free(&ctx->req.pool, task);
		mm_free(&ctx->req.pool, pktbuf);
		return NULL;
	}
	ctx->worker->stats.concurrent += 1;
	return task;
}

/* This is called when the task refcount is zero, free memory. */
static void qr_task_free(struct qr_task *task)
{
	struct request_ctx *ctx = task->ctx;

	assert(ctx);

	/* Process outbound session. */
	struct session *s = ctx->source.session;
	struct worker_ctx *worker = ctx->worker;

	/* Process source session. */
	if (s && session_tasklist_get_len(s) < worker->tcp_pipeline_max/2 &&
	    !session_flags(s)->closing && session_flags(s)->throttled) {
		uv_handle_t *handle = session_get_handle(s);
		/* Start reading again if the session is throttled and
		 * the number of outgoing requests is below watermark. */
		if (handle) {
			io_start_read(handle);
			session_flags(s)->throttled = false;
		}
	}

	if (ctx->tasks.len == 0) {
		array_clear(ctx->tasks);
		request_free(ctx);
	}

	/* Update stats */
	worker->stats.concurrent -= 1;
}

/** Register a new qr_task within a session. */
static int qr_task_register(struct qr_task *task, struct session *session)
{
	assert(!session_flags(session)->outgoing && session_get_handle(session)->type == UV_TCP);

	session_tasklist_add(session, task);

	struct request_ctx *ctx = task->ctx;
	assert(ctx && (ctx->source.session == NULL || ctx->source.session == session));
	ctx->source.session = session;
	/* Soft-limit on parallel queries.  There is no "slow down" RCODE
	 * that we could use to signal the client, but we can stop reading,
	 * and in effect shrink the TCP window size.  To get more precise
	 * throttling, we would need to copy the remainder of the unread
	 * buffer and reassemble it when resuming reading.  This is NYI.  */
	if (session_tasklist_get_len(session) >= task->ctx->worker->tcp_pipeline_max &&
	    !session_flags(session)->throttled && !session_flags(session)->closing) {
		session_stop_read(session);
		session_flags(session)->throttled = true;
	}

	return 0;
}

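/* Complete a task: kill its pending I/O, detach it from the source session
 * and drop the request's reference to it. */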
static void qr_task_complete(struct qr_task *task)
{
	struct request_ctx *ctx = task->ctx;

	/* Kill pending I/O requests */
	ioreq_kill_pending(task);
	assert(task->waiting.len == 0);
	assert(task->leading == false);

	struct session *s = ctx->source.session;
	if (s) {
		assert(!session_flags(s)->outgoing && session_waitinglist_is_empty(s));
		session_tasklist_del(s, task);
	}

	/* Release primary reference to task. */
	request_del_tasks(ctx, task);
}

/* This is called when we send a subrequest or an answer */
static int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status)
{

	if (task->finished) {
		assert(task->leading == false);
		qr_task_complete(task);
	}

	if (!handle || handle->type != UV_TCP) {
		return status;
	}

	struct session* s = handle->data;
	assert(s);
	if (status != 0) {
		session_tasklist_del(s, task);
	}

	if (session_flags(s)->outgoing || session_flags(s)->closing) {
		return status;
	}

	struct worker_ctx *worker = task->ctx->worker;
	if (session_flags(s)->throttled &&
	    session_tasklist_get_len(s) < worker->tcp_pipeline_max/2) {
	   /* Start reading again if the session is throttled and
	    * the number of outgoing requests is below watermark. */
		session_start_read(s);
		session_flags(s)->throttled = false;
	}

	return status;
}

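/* libuv completion callbacks for UDP sends and stream writes; both forward
 * the status to qr_task_on_send() and drop the reference taken before sending. */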
static void on_send(uv_udp_send_t *req, int status)
{
	struct qr_task *task = req->data;
	uv_handle_t *h = (uv_handle_t *)req->handle;
	qr_task_on_send(task, h, status);
	qr_task_unref(task);
	free(req);
}

static void on_write(uv_write_t *req, int status)
{
	struct qr_task *task = req->data;
	uv_handle_t *h = (uv_handle_t *)req->handle;
	qr_task_on_send(task, h, status);
	qr_task_unref(task);
	free(req);
}

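/* Send a packet within the given session.  For outgoing sessions a free
 * message ID is picked first; queries (QR=0) are finalized via
 * kr_resolve_checkout() before being written over UDP, TCP or TLS. */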
static int qr_task_send(struct qr_task *task, struct session *session,
			struct sockaddr *addr, knot_pkt_t *pkt)
{
	if (!session) {
		return qr_task_on_send(task, NULL, kr_error(EIO));
	}

	int ret = 0;
	struct request_ctx *ctx = task->ctx;
	struct kr_request *req = &ctx->req;

	uv_handle_t *handle = session_get_handle(session);
	assert(handle && handle->data == session);
	const bool is_stream = handle->type == UV_TCP;
	if (!is_stream && handle->type != UV_UDP) abort();

	if (addr == NULL) {
		addr = session_get_peer(session);
	}

	if (pkt == NULL) {
		pkt = worker_task_get_pktbuf(task);
	}

	if (session_flags(session)->outgoing) {
		size_t try_limit = session_tasklist_get_len(session) + 1;
		uint16_t msg_id = knot_wire_get_id(pkt->wire);
		size_t try_count = 0;
		while (session_tasklist_find_msgid(session, msg_id) &&
		       try_count <= try_limit) {
			++msg_id;
			++try_count;
		}
		if (try_count > try_limit) {
			return kr_error(ENOENT);
		}
		worker_task_pkt_set_msgid(task, msg_id);
	}

	if (knot_wire_get_qr(pkt->wire) == 0) {
		/*
		 * Query must be finalised using destination address before
		 * sending.
		 *
		 * Libuv does not offer a convenient way how to obtain a source
		 * IP address from a UDP handle that has been initialised using
		 * uv_udp_init(). The uv_udp_getsockname() fails because of the
		 * lazy socket initialisation.
		 *
		 * @note -- A solution might be opening a separate socket and
		 * trying to obtain the IP address from it.
		 */
		ret = kr_resolve_checkout(req, NULL, addr,
		                          is_stream ? SOCK_STREAM : SOCK_DGRAM,
		                          pkt);
		if (ret != 0) {
			return ret;
		}
	}

	uv_handle_t *ioreq = malloc(is_stream ? sizeof(uv_write_t) : sizeof(uv_udp_send_t));
	if (!ioreq) {
		return qr_task_on_send(task, handle, kr_error(ENOMEM));
	}

	/* Pending ioreq on current task */
	qr_task_ref(task);

	struct worker_ctx *worker = ctx->worker;
	/* Send using given protocol */
	assert(!session_flags(session)->closing);
	if (session_flags(session)->has_tls) {
		uv_write_t *write_req = (uv_write_t *)ioreq;
		write_req->data = task;
		ret = tls_write(write_req, handle, pkt, &on_write);
	} else if (handle->type == UV_UDP) {
		uv_udp_send_t *send_req = (uv_udp_send_t *)ioreq;
		uv_buf_t buf = { (char *)pkt->wire, pkt->size };
		send_req->data = task;
		ret = uv_udp_send(send_req, (uv_udp_t *)handle, &buf, 1, addr, &on_send);
	} else if (handle->type == UV_TCP) {
		uv_write_t *write_req = (uv_write_t *)ioreq;
		uint16_t pkt_size = htons(pkt->size);
		uv_buf_t buf[2] = {
			{ (char *)&pkt_size, sizeof(pkt_size) },
			{ (char *)pkt->wire, pkt->size }
		};
		write_req->data = task;
		ret = uv_write(write_req, (uv_stream_t *)handle, buf, 2, &on_write);
	} else {
		assert(false);
	}

	if (ret == 0) {
		if (session_flags(session)->outgoing) {
			session_tasklist_add(session, task);
		}
		if (worker->too_many_open &&
		    worker->stats.rconcurrent <
			worker->rconcurrent_highwatermark - 10) {
			worker->too_many_open = false;
		}
	} else {
		free(ioreq);
		qr_task_unref(task);
		if (ret == UV_EMFILE) {
			worker->too_many_open = true;
			worker->rconcurrent_highwatermark = worker->stats.rconcurrent;
			ret = kr_error(UV_EMFILE);
		}
	}

	/* Update statistics */
	if (session_flags(session)->outgoing && addr) {
		if (session_flags(session)->has_tls)
			worker->stats.tls += 1;
		else if (handle->type == UV_UDP)
			worker->stats.udp += 1;
		else
			worker->stats.tcp += 1;

		if (addr->sa_family == AF_INET6)
			worker->stats.ipv6 += 1;
		else if (addr->sa_family == AF_INET)
			worker->stats.ipv4 += 1;
	}
	return ret;
}

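/* TLS client handshake callback: on success move the session from the waiting
 * map to the connected map, keep the session data for resumption and flush the
 * waiting tasks; on failure penalize the peer. */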
static int session_tls_hs_cb(struct session *session, int status)
{
	assert(session_flags(session)->outgoing);
	uv_handle_t *handle = session_get_handle(session);
	uv_loop_t *loop = handle->loop;
	struct worker_ctx *worker = loop->data;
	struct sockaddr *peer = session_get_peer(session);
	int deletion_res = worker_del_tcp_waiting(worker, peer);
	int ret = kr_ok();

	if (status) {
		kr_nsrep_update_rtt(NULL, peer, KR_NS_DEAD,
				    worker->engine->resolver.cache_rtt,
				    KR_NS_UPDATE_NORESET);
		return ret;
	}

	/* handshake was completed successfully */
	struct tls_client_ctx_t *tls_client_ctx = session_tls_get_client_ctx(session);
	struct tls_client_paramlist_entry *tls_params = tls_client_ctx->params;
	gnutls_session_t tls_session = tls_client_ctx->c.tls_session;
	if (gnutls_session_is_resumed(tls_session) != 0) {
		kr_log_verbose("[tls_client] TLS session has resumed\n");
	} else {
		kr_log_verbose("[tls_client] TLS session has not resumed\n");
		/* session wasn't resumed, delete old session data ... */
		if (tls_params->session_data.data != NULL) {
			gnutls_free(tls_params->session_data.data);
			tls_params->session_data.data = NULL;
			tls_params->session_data.size = 0;
		}
		/* ... and get the new session data */
		gnutls_datum_t tls_session_data = { NULL, 0 };
		ret = gnutls_session_get_data2(tls_session, &tls_session_data);
		if (ret == 0) {
			tls_params->session_data = tls_session_data;
		}
	}

	ret = worker_add_tcp_connected(worker, peer, session);
	if (deletion_res == kr_ok() && ret == kr_ok()) {
		while (!session_waitinglist_is_empty(session)) {
			struct qr_task *t = session_waitinglist_get(session);
			ret = qr_task_send(t, session, NULL, NULL);
			if (ret != 0) {
				break;
			}
			session_waitinglist_pop(session, true);
		}
	} else {
		ret = kr_error(EINVAL);
	}

	if (ret != kr_ok()) {
		/* Something went wrong.
		 * Session isn't in the list of waiting sessions,
		 * or addition to the list of connected sessions failed,
		 * or write to upstream failed. */
		worker_del_tcp_connected(worker, peer);
		session_waitinglist_finalize(session, KR_STATE_FAIL);
		assert(session_tasklist_is_empty(session));
		session_close(session);
	} else {
		session_timer_stop(session);
		session_timer_start(session, on_tcp_watchdog_timeout,
				    MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
	}
	return kr_ok();
}


static struct kr_query *task_get_last_pending_query(struct qr_task *task)
{
	if (!task || task->ctx->req.rplan.pending.len == 0) {
		return NULL;
	}

	return array_tail(task->ctx->req.rplan.pending);
}


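/* TCP connect callback: on failure the session is closed and its waiting tasks
 * are retried or finalized; on success reading starts and either the TLS
 * handshake begins or the queued tasks are sent to the upstream. */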
static void on_connect(uv_connect_t *req, int status)
{
	struct worker_ctx *worker = get_worker();
	uv_stream_t *handle = req->handle;
	struct session *session = handle->data;
	struct sockaddr *peer = session_get_peer(session);
	free(req);

	assert(session_flags(session)->outgoing);

	if (status == UV_ECANCELED) {
		worker_del_tcp_waiting(worker, peer);
		assert(session_is_empty(session) && session_flags(session)->closing);
		return;
	}

	if (session_flags(session)->closing) {
		worker_del_tcp_waiting(worker, peer);
		assert(session_is_empty(session));
		return;
	}

	session_timer_stop(session);

	if (status != 0) {
		worker_del_tcp_waiting(worker, peer);
		assert(session_tasklist_is_empty(session));
		session_waitinglist_retry(session, false);
		session_close(session);
		return;
	}

	if (!session_flags(session)->has_tls) {
		/* If TLS is used, the session is still waiting for the handshake;
		 * otherwise remove it from the waiting list. */
		if (worker_del_tcp_waiting(worker, peer) != 0) {
			/* Session isn't in the list of waiting queries;
			 * something went wrong. */
			session_waitinglist_finalize(session, KR_STATE_FAIL);
			assert(session_tasklist_is_empty(session));
			session_close(session);
			return;
		}
	}

	struct qr_task *task = session_waitinglist_get(session);
	struct kr_query *qry = task_get_last_pending_query(task);
	WITH_VERBOSE (qry) {
		struct sockaddr *peer = session_get_peer(session);
		char peer_str[INET6_ADDRSTRLEN];
		inet_ntop(peer->sa_family, kr_inaddr(peer), peer_str, sizeof(peer_str));
		VERBOSE_MSG(qry, "=> connected to '%s'\n", peer_str);
	}

	session_flags(session)->connected = true;
	session_start_read(session);

	int ret = kr_ok();
	if (session_flags(session)->has_tls) {
		struct tls_client_ctx_t *tls_ctx = session_tls_get_client_ctx(session);
		ret = tls_client_connect_start(tls_ctx, session, session_tls_hs_cb);
		if (ret == kr_error(EAGAIN)) {
			session_timer_start(session, on_tcp_watchdog_timeout,
					    MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
			return;
		}
	} else {
		worker_add_tcp_connected(worker, peer, session);
	}
	while (!session_waitinglist_is_empty(session)) {
		struct qr_task *t = session_waitinglist_get(session);
		ret = qr_task_send(t, session, NULL, NULL);
		if (ret != 0) {
			assert(session_tasklist_is_empty(session));
			worker_del_tcp_connected(worker, peer);
			session_waitinglist_finalize(session, KR_STATE_FAIL);
			session_close(session);
			return;
		}
		session_waitinglist_pop(session, true);
	}
	session_timer_start(session, on_tcp_watchdog_timeout,
			    MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
}

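/* The TCP connection attempt took too long: penalize the upstream address,
 * retry the waiting tasks elsewhere and close the session. */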
static void on_tcp_connect_timeout(uv_timer_t *timer)
{
	struct session *session = timer->data;

	uv_timer_stop(timer);
	struct worker_ctx *worker = get_worker();

	assert (session_tasklist_is_empty(session));

	struct sockaddr *peer = session_get_peer(session);
	worker_del_tcp_waiting(worker, peer);

	struct qr_task *task = session_waitinglist_get(session);
	struct kr_query *qry = task_get_last_pending_query(task);
	WITH_VERBOSE (qry) {
		char peer_str[INET6_ADDRSTRLEN];
		inet_ntop(peer->sa_family, kr_inaddr(peer), peer_str, sizeof(peer_str));
		VERBOSE_MSG(qry, "=> connection to '%s' failed\n", peer_str);
	}

	kr_nsrep_update_rtt(NULL, peer, KR_NS_DEAD,
			    worker->engine->resolver.cache_rtt,
			    KR_NS_UPDATE_NORESET);

	worker->stats.timeout += session_waitinglist_get_len(session);
	session_waitinglist_retry(session, true);
	assert (session_tasklist_is_empty(session));
	session_close(session);
}

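/* Inactivity watchdog for outgoing TCP/TLS sessions: time out all waiting and
 * in-flight tasks and close the session. */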
static void on_tcp_watchdog_timeout(uv_timer_t *timer)
{
	struct session *session = timer->data;

	assert(session_flags(session)->outgoing);
	assert(!session_flags(session)->closing);

	struct worker_ctx *worker =  timer->loop->data;
	struct sockaddr *peer = session_get_peer(session);

	uv_timer_stop(timer);

	if (session_flags(session)->has_tls) {
		worker_del_tcp_waiting(worker, peer);
	}

	worker_del_tcp_connected(worker, peer);
	worker->stats.timeout += session_waitinglist_get_len(session);
	session_waitinglist_finalize(session, KR_STATE_FAIL);
	worker->stats.timeout += session_tasklist_get_len(session);
	session_tasklist_finalize(session, KR_STATE_FAIL);
	session_close(session);
}

/* This is called when I/O times out */
static void on_udp_timeout(uv_timer_t *timer)
{
	struct session *session = timer->data;
	assert(session_get_handle(session)->data == session);
	assert(session_tasklist_get_len(session) == 1);
	assert(session_waitinglist_is_empty(session));

	uv_timer_stop(timer);

	/* Penalize all tried nameservers with a timeout. */
	struct qr_task *task = session_tasklist_get_first(session);
	struct worker_ctx *worker = task->ctx->worker;
	if (task->leading && task->pending_count > 0) {
		struct kr_query *qry = array_tail(task->ctx->req.rplan.pending);
		struct sockaddr_in6 *addrlist = (struct sockaddr_in6 *)task->addrlist;
		for (uint16_t i = 0; i < MIN(task->pending_count, task->addrlist_count); ++i) {
			struct sockaddr *choice = (struct sockaddr *)(&addrlist[i]);
			WITH_VERBOSE(qry) {
				char addr_str[INET6_ADDRSTRLEN];
				inet_ntop(choice->sa_family, kr_inaddr(choice), addr_str, sizeof(addr_str));
				VERBOSE_MSG(qry, "=> server: '%s' flagged as 'bad'\n", addr_str);
			}
			kr_nsrep_update_rtt(&qry->ns, choice, KR_NS_DEAD,
					    worker->engine->resolver.cache_rtt,
					    KR_NS_UPDATE_NORESET);
		}
	}
	task->timeouts += 1;
	worker->stats.timeout += 1;
	qr_task_step(task, NULL, NULL);
}

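/* Send the query to the next address from the task's address list (round
 * robin) over a fresh UDP handle; returns the handle or NULL. */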
static uv_handle_t *retransmit(struct qr_task *task)
{
	uv_handle_t *ret = NULL;
	if (task && task->addrlist && task->addrlist_count > 0) {
		struct sockaddr_in6 *choice = &((struct sockaddr_in6 *)task->addrlist)[task->addrlist_turn];
		if (!choice) {
			return ret;
		}
		if (task->pending_count >= MAX_PENDING) {
			return ret;
		}
		ret = ioreq_spawn(task->ctx->worker, SOCK_DGRAM, choice->sin6_family);
		if (!ret) {
			return ret;
		}
		struct sockaddr *addr = (struct sockaddr *)choice;
		struct session *session = ret->data;
		struct sockaddr *peer = session_get_peer(session);
		assert (peer->sa_family == AF_UNSPEC && session_flags(session)->outgoing);
		memcpy(peer, addr, kr_sockaddr_len(addr));
		if (qr_task_send(task, session, (struct sockaddr *)choice,
				 task->pktbuf) != 0) {
			session_close(session);
			ret = NULL;
		} else {
			task->pending[task->pending_count] = session_get_handle(session);
			task->pending_count += 1;
			task->addrlist_turn = (task->addrlist_turn + 1) %
					      task->addrlist_count; /* Round robin */
			session_start_read(session); /* Start reading answer */
		}
	}
	return ret;
}

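/* Retransmit timer tick: try the next server; when no more requests can be
 * spawned, switch to the UDP timeout with the remaining deadline. */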
static void on_retransmit(uv_timer_t *req)
{
	struct session *session = req->data;
	assert(session_tasklist_get_len(session) == 1);

	uv_timer_stop(req);
	struct qr_task *task = session_tasklist_get_first(session);
	if (retransmit(task) == NULL) {
		/* Not possible to spawn request, start timeout timer with remaining deadline. */
		uint64_t timeout = KR_CONN_RTT_MAX - task->pending_count * KR_CONN_RETRY;
		uv_timer_start(req, on_udp_timeout, timeout, 0);
	} else {
		uv_timer_start(req, on_retransmit, KR_CONN_RETRY, 0);
	}
}

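/* A leading task finished its subrequest: remove it from the subrequest table
 * and step all follower tasks with the received packet (they reuse the
 * leader's msgid and 0x20 secret). */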
static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *pkt)
{
	/* Close pending timer */
	ioreq_kill_pending(task);
	/* Clear from outgoing table. */
	if (!task->leading)
		return;
	char key[SUBREQ_KEY_LEN];
	const int klen = subreq_key(key, task->pktbuf);
	if (klen > 0) {
		void *val_deleted;
		int ret = trie_del(task->ctx->worker->subreq_out, key, klen, &val_deleted);
		assert(ret == KNOT_EOK && val_deleted == task); (void)ret;
	}
	/* Notify waiting tasks. */
	struct kr_query *leader_qry = array_tail(task->ctx->req.rplan.pending);
	for (size_t i = task->waiting.len; i > 0; i--) {
		struct qr_task *follower = task->waiting.at[i - 1];
		/* Reuse MSGID and 0x20 secret */
		if (follower->ctx->req.rplan.pending.len > 0) {
			struct kr_query *qry = array_tail(follower->ctx->req.rplan.pending);
			qry->id = leader_qry->id;
			qry->secret = leader_qry->secret;
			leader_qry->secret = 0; /* Next will be already decoded */
		}
		qr_task_step(follower, packet_source, pkt);
		qr_task_unref(follower);
	}
	task->waiting.len = 0;
	task->leading = false;
}

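/* Mark the task as the leader for its (qclass, qname, qtype) subrequest so
 * that identical outgoing queries can be deduplicated. */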
static void subreq_lead(struct qr_task *task)
{
	assert(task);
	char key[SUBREQ_KEY_LEN];
	const int klen = subreq_key(key, task->pktbuf);
	if (klen < 0)
		return;
	struct qr_task **tvp = (struct qr_task **)
		trie_get_ins(task->ctx->worker->subreq_out, key, klen);
	if (unlikely(!tvp))
		return; /*ENOMEM*/
	if (unlikely(*tvp != NULL)) {
		assert(false);
		return;
	}
	*tvp = task;
	task->leading = true;
}

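/* If an identical subrequest already has a leader, append this task to the
 * leader's waiting list; returns true when enqueued. */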
static bool subreq_enqueue(struct qr_task *task)
{
	assert(task);
	char key[SUBREQ_KEY_LEN];
	const int klen = subreq_key(key, task->pktbuf);
	if (klen < 0)
		return false;
	struct qr_task **leader = (struct qr_task **)
		trie_get_try(task->ctx->worker->subreq_out, key, klen);
	if (!leader /*ENOMEM*/ || !*leader)
		return false;
	/* Enqueue itself to leader for this subrequest. */
	int ret = array_push_mm((*leader)->waiting, task,
				kr_memreserve, &(*leader)->ctx->req.pool);
	if (unlikely(ret < 0)) /*ENOMEM*/
		return false;
	qr_task_ref(task);
	return true;
}

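/* Resolution is done: finish the kr_request and send the answer back to the
 * source session, detaching everything from that session if the send fails. */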
static int qr_task_finalize(struct qr_task *task, int state)
{
	assert(task && task->leading == false);
	if (task->finished) {
		return 0;
	}
	struct request_ctx *ctx = task->ctx;
	kr_resolve_finish(&ctx->req, state);

	task->finished = true;
	if (ctx->source.session == NULL) {
		(void) qr_task_on_send(task, NULL, kr_error(EIO));
		return state == KR_STATE_DONE ? 0 : kr_error(EIO);
	}

	/* Reference task as the callback handler can close it */
	qr_task_ref(task);

	/* Send back answer */
	struct session *source_session = ctx->source.session;
	assert(!session_flags(source_session)->closing);
	assert(ctx->source.addr.ip.sa_family != AF_UNSPEC);
	int res = qr_task_send(task, source_session,
			       (struct sockaddr *)&ctx->source.addr,
			        ctx->req.answer);
	if (res != kr_ok()) {
		(void) qr_task_on_send(task, NULL, kr_error(EIO));
		/* Since the source session is erroneous, detach all tasks. */
		while (!session_tasklist_is_empty(source_session)) {
			struct qr_task *t = session_tasklist_del_first(source_session, false);
			struct request_ctx *c = t->ctx;
			assert(c->source.session == source_session);
			c->source.session = NULL;
			/* Don't finalize them as there can be other tasks
			 * waiting for answer to this particular task.
			 * (ie. task->leading is true) */
			worker_task_unref(t);
		}
		session_close(source_session);
	} else if (session_get_handle(source_session)->type == UV_TCP) {
		/* Don't try to close the source session for at least
		 * retry_interval_for_timeout_timer milliseconds. */
		session_timer_restart(source_session);
	}

	qr_task_unref(task);

	return state == KR_STATE_DONE ? 0 : kr_error(EIO);
}

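/* Advance the task: close pending sub-queries, consume the incoming packet
 * and plan the next outgoing query. */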
static int qr_task_step(struct qr_task *task,
			const struct sockaddr *packet_source, knot_pkt_t *packet)
{
	/* No more steps after we're finished. */
	if (!task || task->finished) {
		return kr_error(ESTALE);
	}

	/* Close pending I/O requests */
	subreq_finalize(task, packet_source, packet);
	/* Consume input and produce next query */
	struct request_ctx *ctx = task->ctx;
	assert(ctx);
	struct kr_request *req = &ctx->req;
	struct worker_ctx *worker = ctx->worker;
	int sock_type = -1;
	task->addrlist = NULL;
	task->addrlist_count = 0;
	task->addrlist_turn = 0;
	req->has_tls = (ctx->source.session && session_flags(ctx->source.session)->has_tls);

	if (worker->too_many_open) {
		struct kr_rplan *rplan = &req->rplan;
		if (worker->stats.rconcurrent <
			worker->rconcurrent_highwatermark - 10) {
			worker->too_many_open = false