/*  Copyright (C) 2014-2017 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

#include <uv.h>
#include <lua.h>
#include <libknot/packet/pkt.h>
#include <libknot/descriptor.h>
#include <contrib/ucw/lib.h>
#include <contrib/ucw/mempool.h>
#include <contrib/wire.h>
#if defined(__GLIBC__) && defined(_GNU_SOURCE)
#include <malloc.h>
#endif
#include <assert.h>
#include <sys/types.h>
#include <unistd.h>
#include <gnutls/gnutls.h>
#include "lib/utils.h"
#include "lib/layer.h"
#include "daemon/worker.h"
#include "daemon/bindings.h"
#include "daemon/engine.h"
#include "daemon/io.h"
#include "daemon/tls.h"

#define VERBOSE_MSG(qry, fmt...) QRVERBOSE(qry, "wrkr", fmt)

/** Client request state. */
struct request_ctx
{
	struct kr_request req;
	struct {
		union inaddr addr;
		union inaddr dst_addr;
		/* uv_handle_t *handle; */

		/** NULL if the request didn't come over network. */
		struct session *session;
	} source;
	struct worker_ctx *worker;
	qr_tasklist_t tasks;
};

/** Query resolution task. */
struct qr_task
{
	struct request_ctx *ctx;
	knot_pkt_t *pktbuf;
	qr_tasklist_t waiting;
	uv_handle_t *pending[MAX_PENDING];
	uint16_t pending_count;
	uint16_t addrlist_count;
	uint16_t addrlist_turn;
	uint16_t timeouts;
	uint16_t iter_count;
	uint16_t bytes_remaining;
	struct sockaddr *addrlist;
	uint32_t refs;
	bool finished : 1;
	bool leading  : 1;
};


/* Convenience macros */
#define qr_task_ref(task) \
	do { ++(task)->refs; } while(0)
#define qr_task_unref(task) \
	do { if (task && --(task)->refs == 0) { qr_task_free(task); } } while (0)
#define qr_valid_handle(task, checked) \
	(!uv_is_closing((checked)) || (task)->ctx->source.session->handle == (checked))

/** @internal Get key for a TCP session.
 *  @note kr_straddr() returns a pointer to a static string.
 */
#define tcpsess_key(addr) kr_straddr(addr)

/* Forward decls */
static void qr_task_free(struct qr_task *task);
static int qr_task_step(struct qr_task *task,
			const struct sockaddr *packet_source,
			knot_pkt_t *packet);
static int qr_task_send(struct qr_task *task, uv_handle_t *handle,
			struct sockaddr *addr, knot_pkt_t *pkt);
static int qr_task_finalize(struct qr_task *task, int state);
static void qr_task_complete(struct qr_task *task);
static int worker_add_tcp_connected(struct worker_ctx *worker,
				    const struct sockaddr *addr,
				    struct session *session);
static int worker_del_tcp_connected(struct worker_ctx *worker,
				    const struct sockaddr *addr);
static struct session* worker_find_tcp_connected(struct worker_ctx *worker,
						 const struct sockaddr *addr);
static int worker_add_tcp_waiting(struct worker_ctx *worker,
				  const struct sockaddr *addr,
				  struct session *session);
static int worker_del_tcp_waiting(struct worker_ctx *worker,
				  const struct sockaddr *addr);
static struct session* worker_find_tcp_waiting(struct worker_ctx *worker,
					       const struct sockaddr *addr);
static int session_add_waiting(struct session *session, struct qr_task *task);
static int session_del_waiting(struct session *session, struct qr_task *task);
static int session_add_tasks(struct session *session, struct qr_task *task);
static int session_del_tasks(struct session *session, struct qr_task *task);
static void session_close(struct session *session);
static void on_session_idle_timeout(uv_timer_t *timer);
static int timer_start(struct session *session, uv_timer_cb cb,
		       uint64_t timeout, uint64_t repeat);
static void on_tcp_connect_timeout(uv_timer_t *timer);
static void on_tcp_watchdog_timeout(uv_timer_t *timer);

/** @internal Get singleton worker. */
static inline struct worker_ctx *get_worker(void)
{
	return uv_default_loop()->data;
}

static inline void *iohandle_borrow(struct worker_ctx *worker)
{
	void *h = NULL;

	const size_t size = sizeof(uv_handles_t);
	if (worker->pool_iohandles.len > 0) {
		h = array_tail(worker->pool_iohandles);
		array_pop(worker->pool_iohandles);
		kr_asan_unpoison(h, size);
	} else {
		h = malloc(size);
	}

	return h;
}

static inline void iohandle_release(struct worker_ctx *worker, void *h)
{
	assert(h);

	if (worker->pool_iohandles.len < MP_FREELIST_SIZE) {
		array_push(worker->pool_iohandles, h);
		kr_asan_poison(h, sizeof(uv_handles_t));
	} else {
		free(h);
	}
}

void *worker_iohandle_borrow(struct worker_ctx *worker)
{
	return iohandle_borrow(worker);
}

void worker_iohandle_release(struct worker_ctx *worker, void *h)
{
	iohandle_release(worker, h);
}

static inline void *iorequest_borrow(struct worker_ctx *worker)
{
	void *r = NULL;

	const size_t size = sizeof(uv_reqs_t);
	if (worker->pool_ioreqs.len > 0) {
		r = array_tail(worker->pool_ioreqs);
		array_pop(worker->pool_ioreqs);
		kr_asan_unpoison(r, size);
	} else {
		r = malloc(size);
	}

	return r;
}

static inline void iorequest_release(struct worker_ctx *worker, void *r)
{
	assert(r);

	if (worker->pool_ioreqs.len < MP_FREELIST_SIZE) {
		array_push(worker->pool_ioreqs, r);
		kr_asan_poison(r, sizeof(uv_reqs_t));
	} else {
		free(r);
	}
}


/*! @internal Create a UDP/TCP handle for an outgoing AF_INET* connection.
 *  socktype is SOCK_* */
static uv_handle_t *ioreq_spawn(struct qr_task *task, int socktype, sa_family_t family)
{
	bool precond = (socktype == SOCK_DGRAM || socktype == SOCK_STREAM)
			&& (family == AF_INET  || family == AF_INET6);
	if (!precond) {
		/* assert(false); see #245 */
		kr_log_verbose("[work] ioreq_spawn: pre-condition failed\n");
		return NULL;
	}

	if (task->pending_count >= MAX_PENDING) {
		return NULL;
	}
	/* Create connection for iterative query */
	struct worker_ctx *worker = task->ctx->worker;
	void *h = iohandle_borrow(worker);
	uv_handle_t *handle = (uv_handle_t *)h;
	if (!handle) {
		return NULL;
	}
	io_create(worker->loop, handle, socktype);

	/* Bind to outgoing address, according to IP v4/v6. */
	union inaddr *addr;
	if (family == AF_INET) {
		addr = (union inaddr *)&worker->out_addr4;
	} else {
		addr = (union inaddr *)&worker->out_addr6;
	}
	int ret = 0;
	if (addr->ip.sa_family != AF_UNSPEC) {
		assert(addr->ip.sa_family == family);
		if (socktype == SOCK_DGRAM) {
			uv_udp_t *udp = (uv_udp_t *)handle;
			ret = uv_udp_bind(udp, &addr->ip, 0);
		} else if (socktype == SOCK_STREAM){
			uv_tcp_t *tcp = (uv_tcp_t *)handle;
			ret = uv_tcp_bind(tcp, &addr->ip, 0);
		}
	}

	/* Set current handle as a subrequest type. */
	struct session *session = handle->data;
	if (ret == 0) {
		session->outgoing = true;
		ret = session_add_tasks(session, task);
	}
	if (ret < 0) {
		io_deinit(handle);
		iohandle_release(worker, h);
		return NULL;
	}
	/* Connect or issue query datagram */
	task->pending[task->pending_count] = handle;
	task->pending_count += 1;
	return handle;
}

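/** @internal Close callback for a session handle:
 *  deinit the I/O context and return the handle to the worker's freelist. */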
static void on_session_close(uv_handle_t *handle)
{
	uv_loop_t *loop = handle->loop;
	struct worker_ctx *worker = loop->data;
	struct session *session = handle->data;
	assert(session->handle == handle);
	session->handle = NULL;
	io_deinit(handle);
	iohandle_release(worker, handle);
}

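/** @internal Close callback for a session's timer; once the timer is closed,
 *  close the session's main handle as well (unless it is already closing). */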
static void on_session_timer_close(uv_handle_t *timer)
{
	struct session *session = timer->data;
	uv_handle_t *handle = session->handle;
	assert(handle && handle->data == session);
	assert(session->outgoing || handle->type == UV_TCP);
	if (!uv_is_closing(handle)) {
		uv_close(handle, on_session_close);
	}
}

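/** @internal Kill a pending UDP subrequest:
 *  stop its timer, detach the task and close the (single-task) session. */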
static void ioreq_kill_udp(uv_handle_t *req, struct qr_task *task)
{
	assert(req);
	struct session *session = req->data;
	assert(session->outgoing);
	if (session->closing) {
		return;
	}
	uv_timer_stop(&session->timeout);
	session_del_tasks(session, task);
	assert(session->tasks.len == 0);
	session_close(session);
}

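/** @internal Kill a pending TCP subrequest.  A connected outgoing session is
 *  kept around for reuse and closed only after an idle timeout; otherwise
 *  the session is closed immediately. */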
static void ioreq_kill_tcp(uv_handle_t *req, struct qr_task *task)
{
	assert(req);
	struct session *session = req->data;
	assert(session->outgoing);
	if (session->closing) {
		return;
	}

	session_del_waiting(session, task);
	session_del_tasks(session, task);

	int res = 0;

	if (session->outgoing && session->peer.ip.sa_family != AF_UNSPEC &&
	    session->tasks.len == 0 && session->waiting.len == 0 && !session->closing) {
		assert(session->peer.ip.sa_family == AF_INET ||
		       session->peer.ip.sa_family == AF_INET6);
		res = 1;
		if (session->connected) {
			/* This is an outbound TCP connection which can be reused.
			 * Close it after a timeout. */
			uv_timer_t *timer = &session->timeout;
			timer->data = session;
			uv_timer_stop(timer);
			res = uv_timer_start(timer, on_session_idle_timeout,
					     KR_CONN_RTT_MAX, 0);
		}
	}

	if (res != 0) {
		/* If there was any error, close the session immediately. */
		session_close(session);
	}
}

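/** @internal Kill all pending subrequests of a task
 *  (see ioreq_kill_udp() and ioreq_kill_tcp()). */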
static void ioreq_kill_pending(struct qr_task *task)
{
	for (uint16_t i = 0; i < task->pending_count; ++i) {
		if (task->pending[i]->type == UV_UDP) {
			ioreq_kill_udp(task->pending[i], task);
		} else if (task->pending[i]->type == UV_TCP) {
			ioreq_kill_tcp(task->pending[i], task);
		} else {
			assert(false);
		}
	}
	task->pending_count = 0;
}

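/** Close a session: stop reading, unregister an outgoing session from the
 *  set of connected sessions, and close the timer, which in turn closes
 *  the main handle (see on_session_timer_close()). */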
static void session_close(struct session *session)
{
	assert(session->tasks.len == 0 && session->waiting.len == 0);

	if (session->closing) {
		return;
	}

	if (!session->outgoing && session->buffering != NULL) {
		qr_task_complete(session->buffering);
	}
	session->buffering = NULL;

	uv_handle_t *handle = session->handle;
	io_stop_read(handle);
	session->closing = true;
	if (session->outgoing &&
	    session->peer.ip.sa_family != AF_UNSPEC) {
		struct worker_ctx *worker = get_worker();
		struct sockaddr *peer = &session->peer.ip;
		worker_del_tcp_connected(worker, peer);
		session->connected = false;
	}

	if (!uv_is_closing((uv_handle_t *)&session->timeout)) {
		uv_timer_stop(&session->timeout);
		if (session->tls_client_ctx) {
			tls_client_close(session->tls_client_ctx);
		}
		session->timeout.data = session;
		uv_close((uv_handle_t *)&session->timeout, on_session_timer_close);
	}
}

static int session_add_waiting(struct session *session, struct qr_task *task)
{
	for (int i = 0; i < session->waiting.len; ++i) {
		if (session->waiting.at[i] == task) {
			return i;
		}
	}
	int ret = array_push(session->waiting, task);
	if (ret >= 0) {
		qr_task_ref(task);
	}
	return ret;
}

static int session_del_waiting(struct session *session, struct qr_task *task)
{
	int ret = kr_error(ENOENT);
	for (int i = 0; i < session->waiting.len; ++i) {
		if (session->waiting.at[i] == task) {
			array_del(session->waiting, i);
			qr_task_unref(task);
			ret = kr_ok();
			break;
		}
	}
	return ret;
}

static int session_add_tasks(struct session *session, struct qr_task *task)
{
	for (int i = 0; i < session->tasks.len; ++i) {
		if (session->tasks.at[i] == task) {
			return i;
		}
	}
	int ret = array_push(session->tasks, task);
	if (ret >= 0) {
		qr_task_ref(task);
	}
	return ret;
}

static int session_del_tasks(struct session *session, struct qr_task *task)
{
	int ret = kr_error(ENOENT);
	for (int i = 0; i < session->tasks.len; ++i) {
		if (session->tasks.at[i] == task) {
			array_del(session->tasks, i);
			qr_task_unref(task);
			ret = kr_ok();
			break;
		}
	}
	return ret;
}

/** @cond This memory layout is internal to mempool.c, use only for debugging. */
#if defined(__SANITIZE_ADDRESS__)
struct mempool_chunk {
  struct mempool_chunk *next;
  size_t size;
};
static void mp_poison(struct mempool *mp, bool poison)
{
	if (!poison) { /* @note mempool is part of the first chunk, unpoison it first */
		kr_asan_unpoison(mp, sizeof(*mp));
	}
	struct mempool_chunk *chunk = mp->state.last[0];
	void *chunk_off = (void *)chunk - chunk->size;
	if (poison) {
		kr_asan_poison(chunk_off, chunk->size);
	} else {
		kr_asan_unpoison(chunk_off, chunk->size);
	}
}
#else
#define mp_poison(mp, enable)
#endif
/** @endcond */

/** Get a mempool.  (Recycle if possible.)  */
static inline struct mempool *pool_borrow(struct worker_ctx *worker)
{
	struct mempool *mp = NULL;
	if (worker->pool_mp.len > 0) {
		mp = array_tail(worker->pool_mp);
		array_pop(worker->pool_mp);
		mp_poison(mp, 0);
	} else { /* No mempool on the freelist, create new one */
		mp = mp_new(4 * CPU_PAGE_SIZE);
	}
	return mp;
}

/** Return a mempool.  (Cache them up to some count.) */
static inline void pool_release(struct worker_ctx *worker, struct mempool *mp)
{
	if (worker->pool_mp.len < MP_FREELIST_SIZE) {
		mp_flush(mp);
		array_push(worker->pool_mp, mp);
		mp_poison(mp, 1);
	} else {
		mp_delete(mp);
	}
}

/** @internal Get key from current outgoing subrequest. */
static int subreq_key(char *dst, knot_pkt_t *pkt)
{
	assert(pkt);
	return kr_rrkey(dst, knot_pkt_qname(pkt), knot_pkt_qtype(pkt), knot_pkt_qclass(pkt));
}

/** Create and initialize a request_ctx (on a fresh mempool).
 *
 * handle and addr point to the source of the request, and they are NULL
 * in case the request didn't come from network.
 */
static struct request_ctx *request_create(struct worker_ctx *worker,
					  uv_handle_t *handle,
					  const struct sockaddr *addr)
{
	knot_mm_t pool = {
		.ctx = pool_borrow(worker),
		.alloc = (knot_mm_alloc_t) mp_alloc
	};

	/* Create request context */
	struct request_ctx *ctx = mm_alloc(&pool, sizeof(*ctx));
	if (!ctx) {
		pool_release(worker, pool.ctx);
		return NULL;
	}

	memset(ctx, 0, sizeof(*ctx));

	/* TODO Relocate pool to struct request */
	ctx->worker = worker;
	array_init(ctx->tasks);
	struct session *session = handle ? handle->data : NULL;
	if (session) {
		assert(session->outgoing == false);
	}
	ctx->source.session = session;

	struct kr_request *req = &ctx->req;
	req->pool = pool;

	/* Remember query source addr */
	if (!addr || (addr->sa_family != AF_INET && addr->sa_family != AF_INET6)) {
		ctx->source.addr.ip.sa_family = AF_UNSPEC;
	} else {
		size_t addr_len = sizeof(struct sockaddr_in);
		if (addr->sa_family == AF_INET6)
			addr_len = sizeof(struct sockaddr_in6);
		memcpy(&ctx->source.addr.ip, addr, addr_len);
		ctx->req.qsource.addr = &ctx->source.addr.ip;
	}

	worker->stats.rconcurrent += 1;

	if (!handle) {
		return ctx;
	}

	/* Remember the destination address. */
	int addr_len = sizeof(ctx->source.dst_addr);
	struct sockaddr *dst_addr = &ctx->source.dst_addr.ip;
	ctx->source.dst_addr.ip.sa_family = AF_UNSPEC;
	if (handle->type == UV_UDP) {
		if (uv_udp_getsockname((uv_udp_t *)handle, dst_addr, &addr_len) == 0) {
			req->qsource.dst_addr = dst_addr;
		}
		req->qsource.tcp = false;
	} else if (handle->type == UV_TCP) {
		if (uv_tcp_getsockname((uv_tcp_t *)handle, dst_addr, &addr_len) == 0) {
			req->qsource.dst_addr = dst_addr;
		}
		req->qsource.tcp = true;
	}

	return ctx;
}

/** More initialization, related to the particular incoming query/packet. */
static int request_start(struct request_ctx *ctx, knot_pkt_t *query)
{
	assert(query && ctx);
	size_t answer_max = KNOT_WIRE_MIN_PKTSIZE;
	struct kr_request *req = &ctx->req;

	/* source.session can be empty if request was generated by kresd itself */
	if (!ctx->source.session ||
	     ctx->source.session->handle->type == UV_TCP) {
		answer_max = KNOT_WIRE_MAX_PKTSIZE;
	} else if (knot_pkt_has_edns(query)) { /* EDNS */
		answer_max = MAX(knot_edns_get_payload(query->opt_rr),
				 KNOT_WIRE_MIN_PKTSIZE);
	}
	req->qsource.size = query->size;

	req->answer = knot_pkt_new(NULL, answer_max, &req->pool);
	if (!req->answer) {
		return kr_error(ENOMEM);
	}

	/* Remember query source TSIG key */
	if (query->tsig_rr) {
		req->qsource.key = knot_rrset_copy(query->tsig_rr, &req->pool);
	}

	/* Remember query source EDNS data */
	if (query->opt_rr) {
		req->qsource.opt = knot_rrset_copy(query->opt_rr, &req->pool);
	}
	/* Start resolution */
	struct worker_ctx *worker = ctx->worker;
	struct engine *engine = worker->engine;
	kr_resolve_begin(req, &engine->resolver, req->answer);
	worker->stats.queries += 1;
	/* Throttle outbound queries only when high pressure */
	if (worker->stats.concurrent < QUERY_RATE_THRESHOLD) {
		req->options.NO_THROTTLE = true;
	}
	return kr_ok();
}

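/** Release a request context: return its mempool to the freelist and
 *  occasionally prod Lua GC (and glibc) to give memory back to the OS. */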
static void request_free(struct request_ctx *ctx)
{
	struct worker_ctx *worker = ctx->worker;
	/* Return mempool to ring or free it if it's full */
	pool_release(worker, ctx->req.pool.ctx);
	/* @note The request context is invalidated from now on. */
	/* Decommit memory every once in a while */
	static int mp_delete_count = 0;
	if (++mp_delete_count == 100000) {
		lua_gc(worker->engine->L, LUA_GCCOLLECT, 0);
#if defined(__GLIBC__) && defined(_GNU_SOURCE)
		malloc_trim(0);
#endif
		mp_delete_count = 0;
	}
	worker->stats.rconcurrent -= 1;
}

static int request_add_tasks(struct request_ctx *ctx, struct qr_task *task)
{
	for (int i = 0; i < ctx->tasks.len; ++i) {
		if (ctx->tasks.at[i] == task) {
			return i;
		}
	}
	int ret = array_push(ctx->tasks, task);
	if (ret >= 0) {
		qr_task_ref(task);
	}
	return ret;
}

static int request_del_tasks(struct request_ctx *ctx, struct qr_task *task)
{
	int ret = kr_error(ENOENT);
	for (int i = 0; i < ctx->tasks.len; ++i) {
		if (ctx->tasks.at[i] == task) {
			array_del(ctx->tasks, i);
			qr_task_unref(task);
			ret = kr_ok();
			break;
		}
	}
	return ret;
}


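/** Create a new resolution task within a request context; the task and its
 *  packet buffer are allocated from the request's mempool. */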
static struct qr_task *qr_task_create(struct request_ctx *ctx)
{
	/* How much can client handle? */
	struct engine *engine = ctx->worker->engine;
	size_t pktbuf_max = KR_EDNS_PAYLOAD;
	if (engine->resolver.opt_rr) {
		pktbuf_max = MAX(knot_edns_get_payload(engine->resolver.opt_rr),
				 pktbuf_max);
	}

	/* Create resolution task */
	struct qr_task *task = mm_alloc(&ctx->req.pool, sizeof(*task));
	if (!task) {
		return NULL;
	}
	memset(task, 0, sizeof(*task)); /* avoid accidentally uninitialized fields */

	/* Create packet buffers for answer and subrequests */
	knot_pkt_t *pktbuf = knot_pkt_new(NULL, pktbuf_max, &ctx->req.pool);
	if (!pktbuf) {
		mm_free(&ctx->req.pool, task);
		return NULL;
	}
	pktbuf->size = 0;

	task->ctx = ctx;
	task->pktbuf = pktbuf;
	array_init(task->waiting);
	task->refs = 0;
	int ret = request_add_tasks(ctx, task);
	if (ret < 0) {
		mm_free(&ctx->req.pool, task);
		mm_free(&ctx->req.pool, pktbuf);
		return NULL;
	}
	ctx->worker->stats.concurrent += 1;
	return task;
}

/* This is called when the task refcount reaches zero; free its memory. */
static void qr_task_free(struct qr_task *task)
{
	struct request_ctx *ctx = task->ctx;

	assert(ctx);

	struct session *source_session = ctx->source.session;
	struct worker_ctx *worker = ctx->worker;

	/* Process the source session. */
	if (source_session &&
	    source_session->tasks.len < worker->tcp_pipeline_max/2 &&
	    !source_session->closing && source_session->throttled) {
		uv_handle_t *handle = source_session->handle;
		/* Start reading again if the session is throttled and
		 * the number of outgoing requests is below watermark. */
		if (handle) {
			io_start_read(handle);
			source_session->throttled = false;
		}
	}

	if (ctx->tasks.len == 0) {
		array_clear(ctx->tasks);
		request_free(ctx);
	}

	/* Update stats */
	worker->stats.concurrent -= 1;
}

/** Register a new qr_task within the session. */
static int qr_task_register(struct qr_task *task, struct session *session)
{
	assert(session->outgoing == false && session->handle->type == UV_TCP);

	int ret = array_reserve(session->tasks, session->tasks.len + 1);
	if (ret != 0) {
		return kr_error(ENOMEM);
	}

	session_add_tasks(session, task);

	struct request_ctx *ctx = task->ctx;
	assert(ctx && (ctx->source.session == NULL || ctx->source.session == session));
	ctx->source.session = session;
	/* Soft-limit on parallel queries.  There is no "slow down" RCODE
	 * that we could use to signal the client, but we can stop reading,
	 * and in effect shrink the TCP window size.  To get more precise
	 * throttling, we would need to copy the remainder of the unread
	 * buffer and reassemble it when resuming reading.  This is NYI. */
	if (session->tasks.len >= task->ctx->worker->tcp_pipeline_max) {
		uv_handle_t *handle = session->handle;
		if (handle && !session->throttled && !session->closing) {
			io_stop_read(handle);
			session->throttled = true;
		}
	}

	return 0;
}

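/** Complete a finished task: kill its pending subrequests, detach it from
 *  the source session and drop the primary reference held by the request. */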
static void qr_task_complete(struct qr_task *task)
{
	struct request_ctx *ctx = task->ctx;

	/* Kill pending I/O requests */
	ioreq_kill_pending(task);
	assert(task->waiting.len == 0);
	assert(task->leading == false);

	struct session *source_session = ctx->source.session;
	if (source_session) {
		assert(source_session->outgoing == false &&
		       source_session->waiting.len == 0);
		session_del_tasks(source_session, task);
	}

	/* Release primary reference to task. */
	request_del_tasks(ctx, task);
}

/* This is called when we send a subrequest or an answer. */
static int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status)
{
	if (task->finished) {
		assert(task->leading == false);
		qr_task_complete(task);
		if (!handle || handle->type != UV_TCP) {
			return status;
		}
		struct session* session = handle->data;
		assert(session);
		if (!session->outgoing ||
		    session->waiting.len == 0) {
			return status;
		}
	}

	if (handle) {
		struct session* session = handle->data;
		if (!session->outgoing && task->ctx->source.session) {
			assert(task->ctx->source.session->handle == handle);
		}
		if (handle->type == UV_TCP && session->outgoing &&
		    session->waiting.len > 0) {
			session_del_waiting(session, task);
			if (session->closing) {
				return status;
			}
			/* Finalize the task if there was an error.
			 * We can't re-add it to the end of the waiting list for
			 * retrying, since that may lead to an endless loop in some
			 * circumstances (for instance: TLS; send -> tls_push ->
			 * too many non-critical errors -> on_send with nonzero
			 * status -> re-add to waiting -> send -> etc.). */
			if (status != 0) {
				if (session->outgoing) {
					qr_task_finalize(task, KR_STATE_FAIL);
				} else {
					assert(task->ctx->source.session == session);
					task->ctx->source.session = NULL;
				}
				session_del_tasks(session, task);
			}
			if (session->waiting.len > 0) {
				struct qr_task *t = session->waiting.at[0];
				int ret = qr_task_send(t, handle, &session->peer.ip, t->pktbuf);
				if (ret == kr_ok()) {
					uv_timer_t *timer = &session->timeout;
					uv_timer_stop(timer);
					session->timeout.data = session;
					timer_start(session, on_tcp_watchdog_timeout, MAX_TCP_INACTIVITY, 0);
				} else {
					uv_timer_t *timer = &session->timeout;
					uv_timer_stop(timer);
					while (session->waiting.len > 0) {
						struct qr_task *t = session->waiting.at[0];
						if (session->outgoing) {
							qr_task_finalize(t, KR_STATE_FAIL);
						} else {
							assert(t->ctx->source.session == session);
							t->ctx->source.session = NULL;
						}
						array_del(session->waiting, 0);
						session_del_tasks(session, t);
						qr_task_unref(t);
					}
					while (session->tasks.len > 0) {
						struct qr_task *t = session->tasks.at[0];
						if (session->outgoing) {
							qr_task_finalize(t, KR_STATE_FAIL);
						} else {
							assert(t->ctx->source.session == session);
							t->ctx->source.session = NULL;
						}
						session_del_tasks(session, t);
					}
					session_close(session);
					return status;
				}
			}
		}
		if (!session->closing) {
			io_start_read(handle); /* Start reading new query */
		}
	}
	return status;
}

static void on_send(uv_udp_send_t *req, int status)
{
	uv_handle_t *handle = (uv_handle_t *)(req->handle);
	uv_loop_t *loop = handle->loop;
	struct worker_ctx *worker = loop->data;
	assert(worker == get_worker());
	struct qr_task *task = req->data;
	qr_task_on_send(task, handle, status);
	qr_task_unref(task);
	iorequest_release(worker, req);
}

static void on_write(uv_write_t *req, int status)
{
875 876 877 878
	uv_handle_t *handle = (uv_handle_t *)(req->handle);
	uv_loop_t *loop = handle->loop;
	struct worker_ctx *worker = loop->data;
	assert(worker == get_worker());
	struct qr_task *task = req->data;
	qr_task_on_send(task, handle, status);
	qr_task_unref(task);
	iorequest_release(worker, req);
}

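/** Send a packet over the given handle.  Outgoing queries are finalized
 *  via kr_resolve_checkout() first; TLS data is pushed synchronously,
 *  UDP/TCP through the event loop (on_send()/on_write()). */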
static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockaddr *addr, knot_pkt_t *pkt)
{
	if (!handle) {
		return qr_task_on_send(task, handle, kr_error(EIO));
	}

	/* Synchronous push to TLS context, bypassing event loop. */
	struct session *session = handle->data;
	assert(session->closing == false);
	if (session->has_tls) {
		struct kr_request *req = &task->ctx->req;
		int ret = kr_ok();
		if (!session->outgoing) {
			ret = tls_push(task, handle, pkt);
		} else {
			ret = kr_resolve_checkout(req, NULL, addr,
					          SOCK_STREAM, pkt);
			if (ret != kr_ok()) {
				return ret;
			}
			ret = tls_client_push(task, handle, pkt);
		}
		return qr_task_on_send(task, handle, ret);
	}

	int ret = 0;
	struct request_ctx *ctx = task->ctx;
	struct worker_ctx *worker = ctx->worker;
	struct kr_request *req = &ctx->req;
	void *ioreq = iorequest_borrow(worker);
	if (!ioreq) {
		return qr_task_on_send(task, handle, kr_error(ENOMEM));
	}
	if (knot_wire_get_qr(pkt->wire) == 0) {
		/*
		 * Query must be finalised using destination address before
		 * sending.
		 *
		 * Libuv does not offer a convenient way how to obtain a source
		 * IP address from a UDP handle that has been initialised using
		 * uv_udp_init(). The uv_udp_getsockname() fails because of the
		 * lazy socket initialisation.
		 *
		 * @note -- A solution might be opening a separate socket and
		 * trying to obtain the IP address from it.
		 */
		ret = kr_resolve_checkout(req, NULL, addr,
		                          handle->type == UV_UDP ? SOCK_DGRAM : SOCK_STREAM,
		                          pkt);
		if (ret != 0) {
			iorequest_release(worker, ioreq);
			return ret;
		}
	}
	/* Send using given protocol */
	if (handle->type == UV_UDP) {
		uv_udp_send_t *send_req = (uv_udp_send_t *)ioreq;
		uv_buf_t buf = { (char *)pkt->wire, pkt->size };
		send_req->data = task;
		ret = uv_udp_send(send_req, (uv_udp_t *)handle, &buf, 1, addr, &on_send);
	} else if (handle->type == UV_TCP) {
		uv_write_t *write_req = (uv_write_t *)ioreq;
		uint16_t pkt_size = htons(pkt->size);
		uv_buf_t buf[2] = {
			{ (char *)&pkt_size, sizeof(pkt_size) },
			{ (char *)pkt->wire, pkt->size }
		};
		write_req->data = task;
		ret = uv_write(write_req, (uv_stream_t *)handle, buf, 2, &on_write);
	} else {
		assert(false);
	}

	if (ret == 0) {
		qr_task_ref(task); /* Pending ioreq on current task */
		if (worker->too_many_open &&
		    worker->stats.rconcurrent <
			worker->rconcurrent_highwatermark - 10) {
			worker->too_many_open = false;
		}
	} else {
		iorequest_release(worker, ioreq);
		if (ret == UV_EMFILE) {
			worker->too_many_open = true;
			worker->rconcurrent_highwatermark = worker->stats.rconcurrent;
		}
	}

	/* Update statistics */
	if (ctx->source.session &&
	    handle != ctx->source.session->handle &&
	    addr) {
		if (handle->type == UV_UDP)
			worker->stats.udp += 1;
		else
			worker->stats.tcp += 1;
		if (addr->sa_family == AF_INET6)
			worker->stats.ipv6 += 1;
		else if (addr->sa_family == AF_INET)
			worker->stats.ipv4 += 1;
	}
	return ret;
}

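/** Send the first task in the session's waiting list (if any)
 *  and re-arm the TCP inactivity watchdog. */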
static int session_next_waiting_send(struct session *session)
{
	union inaddr *peer = &session->peer;
	int ret = kr_ok();
	if (session->waiting.len > 0) {
		struct qr_task *task = session->waiting.at[0];
		ret = qr_task_send(task, session->handle, &peer->ip, task->pktbuf);
	}
	session->timeout.data = session;
	timer_start(session, on_tcp_watchdog_timeout, MAX_TCP_INACTIVITY, 0);
	return ret;
}

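/** TLS handshake completion callback: on failure penalize the peer's RTT
 *  estimate; on success start sending the waiting queries and register the
 *  session as connected. */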
static int session_tls_hs_cb(struct session *session, int status)
{
	VERBOSE_MSG(NULL, "=> server: '%s' TLS handshake has %s\n",
		    kr_straddr(&session->peer.ip), status ? "failed" : "completed");

	struct worker_ctx *worker = get_worker();
	union inaddr *peer = &session->peer;
	int deletion_res = worker_del_tcp_waiting(worker, &peer->ip);

	if (status) {
		for (size_t i = 0; i < session->waiting.len; ++i) {
			struct qr_task *task = session->waiting.at[0];
			struct kr_query *qry = array_tail(task->ctx->req.rplan.pending);
			kr_nsrep_update_rtt(&qry->ns, &peer->ip, KR_NS_TIMEOUT,
					    worker->engine->resolver.cache_rtt, KR_NS_UPDATE);
		}
	} else {
		if (deletion_res != 0) {
			/* Session isn't in the list of waiting queries;
			 * something went wrong. */
			while (session->waiting.len > 0) {
				struct qr_task *task = session->waiting.at[0];
				session_del_tasks(session, task);
				array_del(session->waiting, 0);
				qr_task_finalize(task, KR_STATE_FAIL);
				qr_task_unref(task);
			}
			assert(session->tasks.len == 0);
			session_close(session);
			return kr_ok();
		}

		int ret = session_next_waiting_send(session);
		if (ret == kr_ok()) {
			struct worker_ctx *worker = get_worker();
			union inaddr *peer = &session->peer;
			int ret = worker_add_tcp_connected(worker, &peer->ip, session);
			assert(ret == 0);
		}
	}
	return kr_ok();
}

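/** Get the query currently being processed over the session, i.e. the last
 *  pending query of the first waiting task, or NULL if there is none. */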
static struct kr_query *session_current_query(struct session *session)
{
	if (session->waiting.len == 0) {
		return NULL;
	}

	struct qr_task *task = session->waiting.at[0];
	if (task->ctx->req.rplan.pending.len == 0) {
		return NULL;
	}

	return array_tail(task->ctx->req.rplan.pending);
}

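/** An outgoing TCP connection attempt has finished.  On error, step the
 *  waiting tasks so they can retry elsewhere; on success, start the TLS
 *  handshake or send the first waiting query. */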
static void on_connect(uv_connect_t *req, int status)
{
	struct worker_ctx *worker = get_worker();
	uv_stream_t *handle = req->handle;
	struct session *session = handle->data;

	union inaddr *peer = &session->peer;
	uv_timer_stop(&session->timeout);

	if (status == UV_ECANCELED) {
		worker_del_tcp_waiting(worker, &peer->ip);
		assert(session->closing && session->waiting.len == 0 && session->tasks.len == 0);
		iorequest_release(worker, req);
		return;
	}

	if (session->closing) {
		worker_del_tcp_waiting(worker, &peer->ip);
		assert(session->waiting.len == 0 && session->tasks.len == 0);
		iorequest_release(worker, req);
		return;
	}

	if (status != 0) {
		worker_del_tcp_waiting(worker, &peer->ip);
		while (session->waiting.len > 0) {
			struct qr_task *task = session->waiting.at[0];
			session_del_tasks(session, task);
			array_del(session->waiting, 0);
			assert(task->refs > 1);
			qr_task_unref(task);
			qr_task_step(task, NULL, NULL);
		}
		assert(session->tasks.len == 0);
		iorequest_release(worker, req);
		session_close(session);
		return;
	}

	if (!session->has_tls) {
		/* if there is a TLS, session still waiting for handshake,
		 * otherwise remove it from waiting list */
		if (worker_del_tcp_waiting(worker, &peer->ip) != 0) {
			/* Session isn't in the list of waiting queries;
			 * something went wrong. */
			while (session->waiting.len > 0) {
				struct qr_task *task = session->waiting.at[0];
				session_del_tasks(session, task);
				array_del(session->waiting, 0);
				qr_task_finalize(task, KR_STATE_FAIL);
				qr_task_unref(task);
			}
			assert(session->tasks.len == 0);
			iorequest_release(worker, req);
			session_close(session);
			return;
		}
	}

	struct kr_query *qry = session_current_query(session);
	WITH_VERBOSE (qry) {
		char addr_str[INET6_ADDRSTRLEN];
		inet_ntop(session->peer.ip.sa_family, kr_inaddr(&session->peer.ip),
			  addr_str, sizeof(addr_str));
		VERBOSE_MSG(qry, "=> connected to '%s'\n", addr_str);
	}

	session->connected = true;
	session->handle = (uv_handle_t *)handle;

	int ret = kr_ok();
	if (session->has_tls) {
		ret = tls_client_connect_start(session->tls_client_ctx,
					       session, session_tls_hs_cb);
		if (ret == kr_error(EAGAIN)) {
			iorequest_release(worker, req);
			io_start_read(session->handle);
			timer_start(session, on_tcp_watchdog_timeout, MAX_TCP_INACTIVITY, 0);
			return;
		}
	}

	if (ret == kr_ok()) {
		ret = session_next_waiting_send(session);
		if (ret == kr_ok()) {
			worker_add_tcp_connected(worker, &session->peer.ip, session);
			iorequest_release(worker, req);
			return;
		}
	}

	while (session->waiting.len > 0) {
		struct qr_task *task = session->waiting.at[0];
		session_del_tasks(session, task);
		array_del(session->waiting, 0);
		qr_task_finalize(task, KR_STATE_FAIL);
		qr_task_unref(task);
	}

	assert(session->tasks.len == 0);

	iorequest_release(worker, req);
	session_close(session);
}

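/** An outgoing TCP connection took too long to establish: penalize the
 *  waiting tasks with a timeout and step them so they can retry. */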
static void on_tcp_connect_timeout(uv_timer_t *timer)
{
	struct session *session = timer->data;

	uv_timer_stop(timer);
	struct worker_ctx *worker = get_worker();

	assert(session->waiting.len == session->tasks.len);

	union inaddr *peer = &session->peer;
	worker_del_tcp_waiting(worker, &peer->ip);

	struct kr_query *qry = session_current_query(session);
	WITH_VERBOSE (qry) {
		char addr_str[INET6_ADDRSTRLEN];
		inet_ntop(peer->ip.sa_family, kr_inaddr(&peer->ip), addr_str, sizeof(addr_str));
		VERBOSE_MSG(qry, "=> connection to '%s' failed\n", addr_str);
	}

	while (session->waiting.len > 0) {
		struct qr_task *task = session->waiting.at[0];
		struct request_ctx *ctx = task->ctx;
		assert(ctx);
		task->timeouts += 1;
		worker->stats.timeout += 1;
		session_del_tasks(session, task);
		array_del(session->waiting, 0);
		assert(task->refs > 1);
		qr_task_unref(task);
		qr_task_step(task, NULL, NULL);
	}

	assert(session->tasks.len == 0);
	session_close(session);
}

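/** The TCP connection has been inactive for too long:
 *  fail all waiting and attached tasks and close the session. */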
static void on_tcp_watchdog_timeout(uv_timer_t *timer)
{
	struct session *session = timer->data;

	assert(session->outgoing);
	uv_timer_stop(timer);
	struct worker_ctx *worker = get_worker();

	if (session->outgoing) {
		if (session->has_tls) {
			worker_del_tcp_waiting(worker, &session->peer.ip);
		}
		worker_del_tcp_connected(worker, &session->peer.ip);

		while (session->waiting.len > 0) {
			struct qr_task *task = session->waiting.at[0];
			task->timeouts += 1;
			worker->stats.timeout += 1;
			array_del(session->waiting, 0);
			session_del_tasks(session, task);
			qr_task_finalize(task, KR_STATE_FAIL);
			qr_task_unref(task);
		}
	}

	while (session->tasks.len > 0) {
		struct qr_task *task = session->tasks.at[0];
		task->timeouts += 1;
		worker->stats.timeout += 1;
		assert(task->refs > 1);
		array_del(session->tasks, 0);
		qr_task_finalize(task, KR_STATE_FAIL);
		qr_task_unref(task);
	}

	session_close(session);
}

/* This is called when I/O times out. */
static void on_udp_timeout(uv_timer_t *timer)
{
	struct session *session = timer->data;

	uv_handle_t *handle = session->handle;
	assert(handle->data == session);

	uv_timer_stop(timer);
	assert(session->tasks.len == 1);
	assert(session->waiting.len == 0);

	/* Penalize all tried nameservers with a timeout. */
	struct qr_task *task = session->tasks.at[0];
	struct worker_ctx *worker = task->ctx->worker;
	if (task->leading && task->pending_count > 0) {
		struct kr_query *qry = array_tail(task->ctx->req.rplan.pending);
		struct sockaddr_in6 *addrlist = (struct sockaddr_in6 *)task->addrlist;
		for (uint16_t i = 0; i < MIN(task->pending_count, task->addrlist_count); ++i) {
			struct sockaddr *choice = (struct sockaddr *)(&addrlist[i]);
			WITH_VERBOSE(qry) {
				char addr_str[INET6_ADDRSTRLEN];
				inet_ntop(choice->sa_family, kr_inaddr(choice), addr_str, sizeof(addr_str));
				VERBOSE_MSG(qry, "=> server: '%s' flagged as 'bad'\n", addr_str);
			}
			kr_nsrep_update_rtt(&qry->ns, choice, KR_NS_TIMEOUT,
					    worker->engine->resolver.cache_rtt, KR_NS_UPDATE);
		}
	}
	task->timeouts += 1;
	worker->stats.timeout += 1;
	qr_task_step(task, NULL, NULL);
}

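/** A kept-alive outgoing connection was not reused before the idle timeout:
 *  close it. */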
static void on_session_idle_timeout(uv_timer_t *timer)
{
	struct session *s = timer->data;
	assert(s);
	uv_timer_stop(timer);
	if (s->closing) {
		return;
	}
	/* The session was not in use when the idle timer fired;
	 * remove it from the connection list and close it. */
	assert(s->tasks.len == 0 && s->waiting.len == 0);
	session_close(s);
}

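/** (Re)transmit the task's query datagram to the next address in its
 *  address list (round robin).  Returns the new UDP handle, or NULL. */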
static uv_handle_t *retransmit(struct qr_task *task)
{
	uv_handle_t *ret = NULL;
	if (task && task->addrlist && task->addrlist_count > 0) {
		struct sockaddr_in6 *choice = &((struct sockaddr_in6 *)task->addrlist)[task->addrlist_turn];
		if (!choice) {
			return ret;
		}
		ret = ioreq_spawn(task, SOCK_DGRAM, choice->sin6_family);
		if (ret &&
		    qr_task_send(task, ret, (struct sockaddr *)choice,
				 task->pktbuf) == 0) {
			task->addrlist_turn = (task->addrlist_turn + 1) %
					      task->addrlist_count; /* Round robin */
		}
	}
	return ret;
}

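/** UDP retransmit timer: try the next server; if no new subrequest can be
 *  spawned, wait out the remaining deadline before timing out. */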
static void on_retransmit(uv_timer_t *req)
{
	struct session *session = req->data;
	assert(session->tasks.len == 1);

	uv_timer_stop(req);
	struct qr_task *task = session->tasks.at[0];
	if (retransmit(task) == NULL) {
		/* Not possible to spawn a request; start the timeout timer with the remaining deadline. */
		uint64_t timeout = KR_CONN_RTT_MAX - task->pending_count * KR_CONN_RETRY;
		uv_timer_start(req, on_udp_timeout, timeout, 0);