Verified Commit a1485566 authored by Grigorii Demidov's avatar Grigorii Demidov Committed by Vladimír Čunát

Unify handling of TCP timeouts for input and output connections; avoid...

Unify handling of TCP timeouts for input and output connections; avoid redundant code; fix bugs
parent 38694b56
Pipeline #41232 failed with stages
in 21 minutes and 57 seconds
......@@ -140,16 +140,45 @@ int udp_bindfd(uv_udp_t *handle, int fd)
return udp_bind_finalize((uv_handle_t *)handle);
}
static void tcp_timeout_trigger(uv_timer_t *timer)
void tcp_timeout_trigger(uv_timer_t *timer)
{
struct session *s = timer->data;
assert(!session_flags(s)->outgoing);
assert(!session_flags(s)->closing);
assert(session_waitinglist_is_empty(s));
struct worker_ctx *worker = timer->loop->data;
if (!session_tasklist_is_empty(s)) {
int finalized = session_tasklist_finalize_expired(s);
worker->stats.timeout += finalized;
/* session_tasklist_finalize_expired() may call worker_task_finalize().
* If session is a source session and there were IO errors,
* worker_task_finalize() can finalize all tasks and close session. */
if (session_flags(s)->closing) {
return;
}
}
if (!session_tasklist_is_empty(s)) {
uv_timer_again(timer);
} else if (!session_flags(s)->closing) {
uv_timer_stop(timer);
session_close(s);
session_timer_start(s, tcp_timeout_trigger,
KR_RESOLVE_TIME_LIMIT / 2,
KR_RESOLVE_TIME_LIMIT / 2);
} else {
const struct engine *engine = worker->engine;
const struct network *net = &engine->net;
uint64_t idle_in_timeout = net->tcp.in_idle_timeout;
uint64_t last_activity = session_last_input_activity(s);
uint64_t idle_time = kr_now() - last_activity;
if (idle_time < idle_in_timeout) {
idle_in_timeout -= idle_time;
uv_timer_stop(timer);
session_timer_start(s, tcp_timeout_trigger,
idle_in_timeout, idle_in_timeout);
} else {
session_close(s);
}
}
}
......@@ -206,14 +235,6 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
if (ret < 0) {
/* An error has occurred, close the session. */
worker_end_tcp(s);
} else if (ret > 0 && !session_flags(s)->closing) {
/* Connection spawned at least one request
* or
* valid answer has been received from upstream.
* Reset deadline for next query.
* https://tools.ietf.org/html/rfc7766#section-6.2.3
*/
session_timer_restart(s);
}
session_wirebuf_compress(s);
mp_flush(worker->pkt_pool.ctx);
......
......@@ -31,6 +31,7 @@ int tcp_bind(uv_tcp_t *handle, struct sockaddr *addr);
int tcp_bind_tls(uv_tcp_t *handle, struct sockaddr *addr);
int tcp_bindfd(uv_tcp_t *handle, int fd);
int tcp_bindfd_tls(uv_tcp_t *handle, int fd);
void tcp_timeout_trigger(uv_timer_t *timer);
/** Initialize the handle, incl. ->data = struct session * instance.
* \param type = SOCK_*
......
......@@ -14,21 +14,22 @@
* that exists between remote counterpart and a local socket.
*/
struct session {
struct session_flags sflags; /**< miscellaneous flags. */
union inaddr peer; /**< address of peer; is not set for client's UDP sessions. */
uv_handle_t *handle; /**< libuv handle for IO operations. */
uv_timer_t timeout; /**< libuv handle for timer. */
struct session_flags sflags; /**< miscellaneous flags. */
union inaddr peer; /**< address of peer; is not set for client's UDP sessions. */
uv_handle_t *handle; /**< libuv handle for IO operations. */
uv_timer_t timeout; /**< libuv handle for timer. */
struct tls_ctx_t *tls_ctx; /**< server side tls-related data. */
struct tls_ctx_t *tls_ctx; /**< server side tls-related data. */
struct tls_client_ctx_t *tls_client_ctx; /**< client side tls-related data. */
trie_t *tasks; /**< list of tasks associated with given session. */
trie_t *tasks; /**< list of tasks associated with given session. */
queue_t(struct qr_task *) waiting; /**< list of tasks waiting for sending to upstream. */
uint8_t *wire_buf; /**< Buffer for DNS message. */
ssize_t wire_buf_size; /**< Buffer size. */
ssize_t wire_buf_start_idx; /**< Data start offset in wire_buf. */
ssize_t wire_buf_end_idx; /**< Data end offset in wire_buf. */
uint8_t *wire_buf; /**< Buffer for DNS message. */
ssize_t wire_buf_size; /**< Buffer size. */
ssize_t wire_buf_start_idx; /**< Data start offset in wire_buf. */
ssize_t wire_buf_end_idx; /**< Data end offset in wire_buf. */
uint64_t last_input_activity; /**< Either creation time or time of peer's last activity. */
};
static void on_session_close(uv_handle_t *handle)
......@@ -160,7 +161,7 @@ int session_tasklist_add(struct session *session, struct qr_task *task)
worker_task_ref(task);
} else if (*v != task) {
assert(false);
return kr_error(ENOMEM);
return kr_error(EINVAL);
}
return kr_ok();
}
......@@ -216,9 +217,10 @@ struct qr_task* session_tasklist_del_msgid(const struct session *session, uint16
trie_val_t val;
int res = trie_del(t, key, key_len, &val);
if (res == kr_ok()) {
ret = val;
assert(worker_task_numrefs(ret) > 1);
worker_task_unref(ret);
if (worker_task_numrefs(val) > 1) {
ret = val;
}
worker_task_unref(val);
}
return ret;
}
......@@ -322,6 +324,7 @@ struct session *session_new(uv_handle_t *handle)
session->handle = handle;
handle->data = session;
session->timeout.data = session;
session_touch(session);
return session;
}
......@@ -366,12 +369,11 @@ void session_waitinglist_retry(struct session *session, bool increase_timeout_cn
{
while (!session_waitinglist_is_empty(session)) {
struct qr_task *task = session_waitinglist_pop(session, false);
assert(worker_task_numrefs(task) > 1);
if (increase_timeout_cnt) {
worker_task_timeout_inc(task);
}
worker_task_unref(task);
worker_task_step(task, NULL, NULL);
worker_task_unref(task);
}
}
......@@ -379,13 +381,7 @@ void session_waitinglist_finalize(struct session *session, int status)
{
while (!session_waitinglist_is_empty(session)) {
struct qr_task *t = session_waitinglist_pop(session, false);
if (session->sflags.outgoing) {
worker_task_finalize(t, status);
} else {
struct request_ctx *ctx = worker_task_get_request(t);
assert(worker_request_get_source_session(ctx) == session);
worker_request_set_source_session(ctx, NULL);
}
worker_task_finalize(t, status);
worker_task_unref(t);
}
}
......@@ -395,21 +391,62 @@ void session_tasklist_finalize(struct session *session, int status)
while (session_tasklist_get_len(session) > 0) {
struct qr_task *t = session_tasklist_del_first(session, false);
assert(worker_task_numrefs(t) > 0);
if (session->sflags.outgoing) {
worker_task_finalize(t, status);
} else {
struct request_ctx *ctx = worker_task_get_request(t);
assert(worker_request_get_source_session(ctx) == session);
worker_request_set_source_session(ctx, NULL);
}
worker_task_finalize(t, status);
worker_task_unref(t);
}
}
void session_tasks_finalize(struct session *session, int status)
int session_tasklist_finalize_expired(struct session *session)
{
session_waitinglist_finalize(session, status);
session_tasklist_finalize(session, status);
int ret = 0;
queue_t(struct qr_task *) q;
uint64_t now = kr_now();
trie_t *t = session->tasks;
trie_it_t *it;
queue_init(q);
for (it = trie_it_begin(t); !trie_it_finished(it); trie_it_next(it)) {
trie_val_t *v = trie_it_val(it);
struct qr_task *task = (struct qr_task *)*v;
if ((now - worker_task_creation_time(task)) >= KR_RESOLVE_TIME_LIMIT) {
queue_push(q, task);
worker_task_ref(task);
}
}
trie_it_free(it);
struct qr_task *task = NULL;
uint16_t msg_id = 0;
char *key = (char *)&task;
int32_t keylen = sizeof(struct qr_task *);
if (session->sflags.outgoing) {
key = (char *)&msg_id;
keylen = sizeof(msg_id);
}
while (queue_len(q) > 0) {
task = queue_head(q);
if (session->sflags.outgoing) {
knot_pkt_t *pktbuf = worker_task_get_pktbuf(task);
msg_id = knot_wire_get_id(pktbuf->wire);
}
int res = trie_del(t, key, keylen, NULL);
if (!worker_task_finished(task)) {
/* task->pending_count must be zero,
* but there can be followers,
* so run worker_task_subreq_finalize() to ensure retrying
* for all the followers. */
worker_task_subreq_finalize(task);
worker_task_finalize(task, KR_STATE_FAIL);
}
if (res == KNOT_EOK) {
worker_task_unref(task);
}
queue_pop(q);
worker_task_unref(task);
++ret;
}
queue_deinit(q);
return ret;
}
int session_timer_start(struct session *session, uv_timer_cb cb,
......@@ -673,55 +710,30 @@ int session_wirebuf_process(struct session *session)
return ret;
}
static void on_session_idle_timeout(uv_timer_t *timer)
void session_kill_ioreq(struct session *s, struct qr_task *task)
{
struct session *s = timer->data;
assert(s);
uv_timer_stop(timer);
if (s->sflags.closing) {
if (!s) {
return;
}
/* session was not in use during timer timeout
* remove it from connection list and close
*/
assert(session_is_empty(s));
session_close(s);
}
void session_kill_ioreq(struct session *s, struct qr_task *task)
{
assert(s && s->sflags.outgoing && s->handle);
assert(s->sflags.outgoing && s->handle);
if (s->sflags.closing) {
return;
}
session_tasklist_del(s, task);
if (s->handle->type == UV_UDP) {
uv_timer_stop(&s->timeout);
session_tasklist_del(s, task);
assert(session_tasklist_is_empty(s));
session_close(s);
return;
}
/* TCP-specific code now. */
if (s->handle->type != UV_TCP) abort();
int res = 0;
const struct sockaddr *peer = &s->peer.ip;
if (peer->sa_family != AF_UNSPEC && session_is_empty(s) && !s->sflags.closing) {
assert(peer->sa_family == AF_INET || peer->sa_family == AF_INET6);
res = 1;
if (s->sflags.connected) {
/* This is outbound TCP connection which can be reused.
* Close it after timeout */
s->timeout.data = s;
uv_timer_stop(&s->timeout);
res = uv_timer_start(&s->timeout, on_session_idle_timeout,
KR_CONN_RTT_MAX, 0);
}
}
}
if (res != 0) {
/* if any errors, close the session immediately */
session_close(s);
}
/** Update the session's last-activity timestamp to the current time (kr_now()).
 * Called when the session is created (session_new()) and whenever input from
 * the peer is successfully processed (worker_submit()), so that idle-timeout
 * logic in tcp_timeout_trigger() can measure time since the last activity. */
void session_touch(struct session *s)
{
s->last_input_activity = kr_now();
}
/** Return the timestamp of the last input activity on this session,
 * as recorded by session_touch(); used to compute idle time for TCP timeouts. */
uint64_t session_last_input_activity(struct session *s)
{
return s->last_input_activity;
}
......@@ -80,12 +80,12 @@ struct qr_task* session_tasklist_del_msgid(const struct session *session, uint16
struct qr_task* session_tasklist_find_msgid(const struct session *session, uint16_t msg_id);
/** Finalize all tasks in the list. */
void session_tasklist_finalize(struct session *session, int status);
/** Finalize all expired tasks in the list. */
int session_tasklist_finalize_expired(struct session *session);
/** Both of task lists (associated & waiting). */
/** Check if empty. */
bool session_is_empty(const struct session *session);
/** Finalize all tasks. */
void session_tasks_finalize(struct session *session, int status);
/** Get pointer to session flags */
struct session_flags *session_flags(struct session *session);
/** Get peer address. */
......@@ -141,3 +141,6 @@ knot_pkt_t *session_produce_packet(struct session *session, knot_mm_t *mm);
int session_discard_packet(struct session *session, const knot_pkt_t *pkt);
void session_kill_ioreq(struct session *s, struct qr_task *task);
/** Update timestamp */
void session_touch(struct session *s);
uint64_t session_last_input_activity(struct session *s);
......@@ -89,6 +89,7 @@ struct qr_task
uint32_t refs;
bool finished : 1;
bool leading : 1;
uint64_t creation_time;
};
......@@ -97,8 +98,6 @@ struct qr_task
do { ++(task)->refs; } while(0)
#define qr_task_unref(task) \
do { if (task && --(task)->refs == 0) { qr_task_free(task); } } while (0)
#define qr_valid_handle(task, checked) \
(!uv_is_closing((checked)) || (task)->ctx->source.session->handle == (checked))
/** @internal get key for tcp session
* @note kr_straddr() return pointer to static string
......@@ -124,7 +123,6 @@ static int worker_del_tcp_waiting(struct worker_ctx *worker,
static struct session* worker_find_tcp_waiting(struct worker_ctx *worker,
const struct sockaddr *addr);
static void on_tcp_connect_timeout(uv_timer_t *timer);
static void on_tcp_watchdog_timeout(uv_timer_t *timer);
/** @internal Get singleton worker. */
static inline struct worker_ctx *get_worker(void)
......@@ -442,6 +440,7 @@ static struct qr_task *qr_task_create(struct request_ctx *ctx)
ctx->task = task;
/* Make the primary reference to task. */
qr_task_ref(task);
task->creation_time = kr_now();
ctx->worker->stats.concurrent += 1;
return task;
}
......@@ -453,24 +452,8 @@ static void qr_task_free(struct qr_task *task)
assert(ctx);
/* Process outbound session. */
struct session *s = ctx->source.session;
struct worker_ctx *worker = ctx->worker;
/* Process source session. */
if (s && session_tasklist_get_len(s) < worker->tcp_pipeline_max/2 &&
!session_flags(s)->closing && session_flags(s)->throttled) {
uv_handle_t *handle = session_get_handle(s);
/* Start reading again if the session is throttled and
* the number of outgoing requests is below watermark. */
if (handle) {
io_start_read(handle);
session_flags(s)->throttled = false;
}
}
task->ctx = NULL;
if (ctx->task == NULL) {
request_free(ctx);
}
......@@ -515,6 +498,7 @@ static void qr_task_complete(struct qr_task *task)
struct session *s = ctx->source.session;
if (s) {
assert(!session_flags(s)->outgoing && session_waitinglist_is_empty(s));
ctx->source.session = NULL;
session_tasklist_del(s, task);
}
......@@ -771,7 +755,7 @@ static int session_tls_hs_cb(struct session *session, int status)
session_close(session);
} else {
session_timer_stop(session);
session_timer_start(session, on_tcp_watchdog_timeout,
session_timer_start(session, tcp_timeout_trigger,
MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
}
return kr_ok();
......@@ -810,8 +794,6 @@ static void on_connect(uv_connect_t *req, int status)
return;
}
session_timer_stop(session);
if (status != 0) {
worker_del_tcp_waiting(worker, peer);
assert(session_tasklist_is_empty(session));
......@@ -850,7 +832,8 @@ static void on_connect(uv_connect_t *req, int status)
struct tls_client_ctx_t *tls_ctx = session_tls_get_client_ctx(session);
ret = tls_client_connect_start(tls_ctx, session, session_tls_hs_cb);
if (ret == kr_error(EAGAIN)) {
session_timer_start(session, on_tcp_watchdog_timeout,
session_timer_stop(session);
session_timer_start(session, tcp_timeout_trigger,
MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
return;
}
......@@ -861,15 +844,16 @@ static void on_connect(uv_connect_t *req, int status)
struct qr_task *t = session_waitinglist_get(session);
ret = qr_task_send(t, session, NULL, NULL);
if (ret != 0) {
assert(session_tasklist_is_empty(session));
worker_del_tcp_connected(worker, peer);
session_waitinglist_finalize(session, KR_STATE_FAIL);
session_tasklist_finalize(session, KR_STATE_FAIL);
session_close(session);
return;
}
session_waitinglist_pop(session, true);
}
session_timer_start(session, on_tcp_watchdog_timeout,
session_timer_stop(session);
session_timer_start(session, tcp_timeout_trigger,
MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
}
......@@ -903,30 +887,6 @@ static void on_tcp_connect_timeout(uv_timer_t *timer)
session_close(session);
}
static void on_tcp_watchdog_timeout(uv_timer_t *timer)
{
struct session *session = timer->data;
assert(session_flags(session)->outgoing);
assert(!session_flags(session)->closing);
struct worker_ctx *worker = timer->loop->data;
struct sockaddr *peer = session_get_peer(session);
uv_timer_stop(timer);
if (session_flags(session)->has_tls) {
worker_del_tcp_waiting(worker, peer);
}
worker_del_tcp_connected(worker, peer);
worker->stats.timeout += session_waitinglist_get_len(session);
session_waitinglist_finalize(session, KR_STATE_FAIL);
worker->stats.timeout += session_tasklist_get_len(session);
session_tasklist_finalize(session, KR_STATE_FAIL);
session_close(session);
}
/* This is called when I/O timeouts */
static void on_udp_timeout(uv_timer_t *timer)
{
......@@ -1013,6 +973,9 @@ static void on_retransmit(uv_timer_t *req)
static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *pkt)
{
if (!task || task->finished) {
return;
}
/* Close pending timer */
ioreq_kill_pending(task);
/* Clear from outgoing table. */
......@@ -1121,10 +1084,6 @@ static int qr_task_finalize(struct qr_task *task, int state)
worker_task_unref(t);
}
session_close(source_session);
} else if (session_get_handle(source_session)->type == UV_TCP) {
/* Don't try to close source session at least
* retry_interval_for_timeout_timer milliseconds */
session_timer_restart(source_session);
}
qr_task_unref(task);
......@@ -1230,9 +1189,11 @@ static int qr_task_step(struct qr_task *task,
}
} else {
assert (sock_type == SOCK_STREAM);
assert(task->pending_count == 0);
const struct sockaddr *addr =
packet_source ? packet_source : task->addrlist;
if (addr->sa_family == AF_UNSPEC) {
/* task->pending_count is zero, but there can be followers */
subreq_finalize(task, packet_source, packet);
return qr_task_finalize(task, KR_STATE_FAIL);
}
......@@ -1260,7 +1221,6 @@ static int qr_task_step(struct qr_task *task,
return qr_task_finalize(task, KR_STATE_FAIL);
}
session_timer_stop(session);
while (!session_waitinglist_is_empty(session)) {
struct qr_task *t = session_waitinglist_get(session);
ret = qr_task_send(t, session, NULL, NULL);
......@@ -1286,18 +1246,6 @@ static int qr_task_step(struct qr_task *task,
session_close(session);
return qr_task_finalize(task, KR_STATE_FAIL);
}
ret = session_timer_start(session, on_tcp_watchdog_timeout,
MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
if (ret < 0) {
session_tasklist_finalize(session, KR_STATE_FAIL);
subreq_finalize(task, packet_source, packet);
session_close(session);
return qr_task_finalize(task, KR_STATE_FAIL);
}
assert(task->pending_count == 0);
task->pending[task->pending_count] = session_get_handle(session);
task->pending_count += 1;
} else {
/* Make connection */
uv_connect_t *conn = malloc(sizeof(uv_connect_t));
......@@ -1465,6 +1413,9 @@ int worker_submit(struct session *session, knot_pkt_t *query)
}
assert(uv_is_closing(session_get_handle(session)) == false);
/* Packet was successfully parsed.
* Task was created (found). */
session_touch(session);
/* Consume input and produce next message */
return qr_task_step(task, addr, query);
}
......@@ -1578,7 +1529,6 @@ int worker_end_tcp(struct session *session)
tls_set_hs_state(&tls_ctx->c, TLS_HS_NOT_STARTED);
}
assert(session_tasklist_get_len(session) >= session_waitinglist_get_len(session));
while (!session_waitinglist_is_empty(session)) {
struct qr_task *task = session_waitinglist_pop(session, false);
assert(task->refs > 1);
......@@ -1750,6 +1700,20 @@ void worker_task_pkt_set_msgid(struct qr_task *task, uint16_t msgid)
q->id = msgid;
}
/** Return the task's creation timestamp (set via kr_now() in qr_task_create());
 * used to detect tasks exceeding KR_RESOLVE_TIME_LIMIT. */
uint64_t worker_task_creation_time(struct qr_task *task)
{
return task->creation_time;
}
/** Finalize the task's subrequests: delegates to subreq_finalize() with no
 * packet source and no packet, ensuring any waiting follower tasks get retried
 * (see its use in session_tasklist_finalize_expired()). */
void worker_task_subreq_finalize(struct qr_task *task)
{
subreq_finalize(task, NULL, NULL);
}
/** Return whether the task has already finished (task->finished flag). */
bool worker_task_finished(struct qr_task *task)
{
return task->finished;
}
/** Reserve worker buffers */
static int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen)
{
......
......@@ -102,6 +102,9 @@ void worker_request_set_source_session(struct request_ctx *, struct session *ses
uint16_t worker_task_pkt_get_msgid(struct qr_task *task);
void worker_task_pkt_set_msgid(struct qr_task *task, uint16_t msgid);
uint64_t worker_task_creation_time(struct qr_task *task);
void worker_task_subreq_finalize(struct qr_task *task);
bool worker_task_finished(struct qr_task *task);
/** @cond internal */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment