worker.h 5.69 KB
Newer Older
1
/*  Copyright (C) 2014-2017 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */
16 17 18

#pragma once

19
#include "daemon/engine.h"
20
#include "lib/generic/array.h"
21
#include "lib/generic/map.h"
22

23

24 25
/** Query resolution task (opaque). */
struct qr_task;
26 27
/** Worker state (opaque). */
struct worker_ctx;
28 29
/** Transport session (opaque). */
struct session;
30 31
/** Zone import context (opaque). */
struct zone_import_ctx;
32

33 34 35 36
/**
 * Create and initialize the worker.
 *
 * @param engine        engine to create the worker for
 * @param pool          memory context for the worker
 *                      (NOTE(review): ownership/lifetime is not visible in this header — confirm)
 * @param worker_id     presumably stored as worker_ctx::id — confirm
 * @param worker_count  presumably stored as worker_ctx::count — confirm
 * @return the new worker (NOTE(review): failure value, likely NULL, to be confirmed)
 */
struct worker_ctx *worker_create(struct engine *engine, knot_mm_t *pool,
		int worker_id, int worker_count);

37
/**
 * Process an incoming packet (query from a client or answer from upstream).
 *
 * @param session  the session where the packet came from
 * @param query    the packet, or NULL on an error from the transport layer
 * @return 0 or an error code
 */
44
int worker_submit(struct session *session, knot_pkt_t *query);
45 46 47 48 49

/**
 * End the current DNS/TCP session.  This disassociates pending tasks from the
 * session, which may then be freely closed.
 */
50
int worker_end_tcp(struct session *session);
51

52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
/**
 * Start query resolution with given query.
 *
 * The returned task is meant to be passed on to worker_resolve_exec().
 *
 * @param worker   worker context to start the resolution in
 * @param query    the query packet
 * @param options  query flags (struct kr_qflags) for this resolution
 * @return task or NULL
 */
struct qr_task *worker_resolve_start(struct worker_ctx *worker, knot_pkt_t *query, struct kr_qflags options);

/**
 * Execute a request with given query.
 * It expects task to be created with worker_resolve_start().
 *
 * @param task   task obtained from worker_resolve_start()
 * @param query  the query packet
 * @return 0 or an error code
 */
int worker_resolve_exec(struct qr_task *task, knot_pkt_t *query);

67 68
/**
 * Access the resolver request behind an opaque task.
 *
 * @param task  the task to inspect
 * @return struct kr_request associated with opaque task
 */
struct kr_request *worker_task_request(struct qr_task *task);
69 70 71 72

/**
 * Collect worker mempools.
 *
 * NOTE(review): presumably this releases the mempools cached in
 * worker_ctx::pool_mp — confirm against the implementation.
 *
 * @param worker  the worker whose mempools are collected
 */
void worker_reclaim(struct worker_ctx *worker);

73 74 75 76 77
/**
 * Drive the given task one step further, feeding it a packet.
 *
 * NOTE(review): undocumented upstream; the description is inferred from the
 * name and signature only — confirm against the implementation.
 *
 * @param task           the task to step
 * @param packet_source  address the packet came from
 *                       (NOTE(review): whether NULL is accepted is not visible here)
 * @param packet         the packet to process
 *                       (NOTE(review): whether NULL is accepted is not visible here)
 * @return NOTE(review): presumably 0 or an error code, like the rest of this API — confirm
 */
int worker_task_step(struct qr_task *task, const struct sockaddr *packet_source,
		     knot_pkt_t *packet);

/**
 * @return NOTE(review): presumably the task's current reference count
 *         (cf. worker_task_ref() / worker_task_unref()) — confirm
 */
int worker_task_numrefs(const struct qr_task *task);

78 79 80
/**
 * Finalize given task.
 *
 * @param task   the task to finalize
 * @param state  NOTE(review): presumably the resolution state the task ended in — confirm
 * @return NOTE(review): return convention is not visible in this header — confirm
 */
int worker_task_finalize(struct qr_task *task, int state);

81 82 83 84 85 86 87 88 89 90 91 92 93
/** NOTE(review): undocumented upstream; presumably marks the task as completed — confirm in the implementation. */
void worker_task_complete(struct qr_task *task);

/** NOTE(review): presumably takes a reference on the task; pairs with worker_task_unref() — confirm. */
void worker_task_ref(struct qr_task *task);

/** NOTE(review): presumably drops a reference taken by worker_task_ref() — confirm what happens at zero. */
void worker_task_unref(struct qr_task *task);

/** NOTE(review): presumably increments the task's timeout counter — confirm in the implementation. */
void worker_task_timeout_inc(struct qr_task *task);

/**
 * Register a connected TCP session under the given address.
 *
 * NOTE(review): presumably the session is stored in worker_ctx::tcp_connected
 * (active outbound TCP sessions) keyed by addr — confirm.
 *
 * @param worker   the worker to register the session with
 * @param addr     address the session is associated with
 * @param session  the session to register
 * @return NOTE(review): presumably 0 or an error code — confirm
 */
int worker_add_tcp_connected(struct worker_ctx *worker,
			     const struct sockaddr *addr,
			     struct session *session);
/**
 * Remove the connected TCP session registered under the given address.
 *
 * NOTE(review): presumably the inverse of worker_add_tcp_connected(), acting on
 * worker_ctx::tcp_connected — confirm.
 *
 * @param worker  the worker holding the registration
 * @param addr    address the session was registered under
 * @return NOTE(review): presumably 0 or an error code — confirm
 */
int worker_del_tcp_connected(struct worker_ctx *worker,
			     const struct sockaddr *addr);
94 95
/**
 * Remove the waiting TCP session registered under the given address.
 *
 * NOTE(review): presumably acts on worker_ctx::tcp_waiting (outbound TCP
 * sessions waiting to be accepted) — confirm.
 *
 * @param worker  the worker holding the registration
 * @param addr    address the session was registered under
 * @return NOTE(review): presumably 0 or an error code — confirm
 */
int worker_del_tcp_waiting(struct worker_ctx *worker,
			   const struct sockaddr *addr);
96 97 98 99 100 101 102 103
/** @return the task's packet buffer (NOTE(review): ownership presumably stays with the task — confirm) */
knot_pkt_t *worker_task_get_pktbuf(const struct qr_task *task);

/**
 * @return the request context (struct request_ctx) of the task, as accepted by
 *         worker_request_get_source_session() and worker_request_set_source_session()
 */
struct request_ctx *worker_task_get_request(struct qr_task *task);

/**
 * Get the source session of a request context.
 *
 * NOTE(review): "source session" is presumably the session the request
 * arrived on — confirm against the implementation.
 *
 * @param ctx  request context, e.g. from worker_task_get_request()
 * @return the source session (NOTE(review): may presumably be NULL — confirm)
 */
struct session *worker_request_get_source_session(struct request_ctx *ctx);

/**
 * Set the source session of a request context.
 *
 * NOTE(review): "source session" is presumably the session the request
 * arrived on — confirm against the implementation.
 *
 * @param ctx      request context, e.g. from worker_task_get_request()
 * @param session  the session to record as the source
 *                 (NOTE(review): whether NULL is allowed is not visible here)
 */
void worker_request_set_source_session(struct request_ctx *ctx, struct session *session);

104 105
/** @return 16-bit message ID (NOTE(review): presumably read from the task's packet buffer — confirm) */
uint16_t worker_task_pkt_get_msgid(struct qr_task *task);
/** Set the 16-bit message ID (NOTE(review): presumably written into the task's packet buffer — confirm). */
void worker_task_pkt_set_msgid(struct qr_task *task, uint16_t msgid);
106 107 108
/** @return creation time of the task (NOTE(review): unit and clock are not visible in this header — confirm) */
uint64_t worker_task_creation_time(struct qr_task *task);
/**
 * NOTE(review): undocumented upstream; presumably finalizes the task in its
 * role as a subrequest (cf. worker_ctx::subreq_out) — confirm.
 */
void worker_task_subreq_finalize(struct qr_task *task);
/** @return true if the task has finished (NOTE(review): the exact criterion lives in the implementation) */
bool worker_task_finished(struct qr_task *task);
109

110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125

/**
 * Various worker statistics.  Keep in sync with wrk_stats().
 *
 * The first group of counters is about requests, the second group about
 * outbound queries.
 */
struct worker_stats {
	size_t queries;     /**< Total number of requests (from clients and internal ones). */
	size_t concurrent;  /**< The number of requests currently in processing. */
	size_t rconcurrent; /*< TODO: remove?  I see no meaningful difference from .concurrent. */
	size_t dropped;     /**< The number of requests dropped due to being badly formed.  See #471. */

	size_t timeout; /**< Number of outbound queries that timed out. */
	size_t udp;  /**< Number of outbound queries over UDP. */
	size_t tcp;  /**< Number of outbound queries over TCP (excluding TLS). */
	size_t tls;  /**< Number of outbound queries over TLS. */
	size_t ipv4; /**< Number of outbound queries over IPv4. */
	size_t ipv6; /**< Number of outbound queries over IPv6. */
};

126 127 128 129 130
/** @cond internal */

/** Number of requests within timeout window (defined as KR_NSREP_MAXADDR). */
#define MAX_PENDING KR_NSREP_MAXADDR

131
/** Maximum response time from a TCP upstream, in milliseconds. */
132
#define MAX_TCP_INACTIVITY (KR_RESOLVE_TIME_LIMIT + KR_CONN_RTT_MAX)
133

134 135 136 137
/**
 * Number of maximum-size packets that worker_ctx::wire_buf has room for.
 * May be predefined before this header is included; defaults to 1.
 */
#ifndef RECVMMSG_BATCH /* see check_bufsize() */
#define RECVMMSG_BATCH 1
#endif

138
/** Freelist of available mempools. */
139
typedef array_t(struct mempool *) mp_freelist_t;
140

141 142 143
/** List of query resolution tasks (an array_t of struct qr_task pointers). */
typedef array_t(struct qr_task *) qr_tasklist_t;

144
/** \details Worker state is meant to persist during the whole life of daemon. */
145
struct worker_ctx {
146 147
	struct engine *engine;
	uv_loop_t *loop;
148 149
	int id;
	int count;
150
	int vars_table_ref;
151
	unsigned tcp_pipeline_max;
152 153 154 155 156

	/** Addresses to bind for outgoing connections or AF_UNSPEC. */
	struct sockaddr_in out_addr4;
	struct sockaddr_in6 out_addr6;

157
	uint8_t wire_buf[RECVMMSG_BATCH * KNOT_WIRE_MAX_PKTSIZE];
158

159
	struct worker_stats stats;
160

161
	struct zone_import_ctx* z_import;
162 163
	bool too_many_open;
	size_t rconcurrent_highwatermark;
164
	/** List of active outbound TCP sessions */
165
	map_t tcp_connected;
166
	/** List of outbound TCP sessions waiting to be accepted */
167
	map_t tcp_waiting;
168 169
	/** Subrequest leaders (struct qr_task*), indexed by qname+qtype+qclass. */
	trie_t *subreq_out;
170
	mp_freelist_t pool_mp;
171
	knot_mm_t pkt_pool;
172
	unsigned int next_request_uid;
173 174
};

175
/** @endcond */
176