Verified commit 6d0e0df0, authored by Grigorii Demidov, committed by Petr Špaček

daemon: retry waiting tasks when outbound TCP connection has problems

parent eb876948
......@@ -309,17 +309,19 @@ static void ioreq_kill_tcp(uv_handle_t *req, struct qr_task *task)
int res = 0;
if (session->outgoing && session->peer.ip.sa_family != AF_UNSPEC &&
session->tasks.len == 0 && session->waiting.len == 0 &&
session->connected && !session->closing) {
session->tasks.len == 0 && session->waiting.len == 0 && !session->closing) {
assert(session->peer.ip.sa_family == AF_INET ||
session->peer.ip.sa_family == AF_INET6);
/* This is outbound TCP connection which can be reused.
* Close it after timeout */
uv_timer_t *timer = &session->timeout;
timer->data = session;
uv_timer_stop(timer);
res = uv_timer_start(timer, on_session_idle_timeout,
KR_CONN_RTT_MAX, 0);
res = 1;
if (session->connected) {
/* This is outbound TCP connection which can be reused.
* Close it after timeout */
uv_timer_t *timer = &session->timeout;
timer->data = session;
uv_timer_stop(timer);
res = uv_timer_start(timer, on_session_idle_timeout,
KR_CONN_RTT_MAX, 0);
}
}
if (res != 0) {
......@@ -1043,17 +1045,9 @@ static void on_connect(uv_connect_t *req, int status)
struct qr_task *task = session->waiting.at[0];
session_del_tasks(session, task);
array_del(session->waiting, 0);
/* TODO fixme
* Daemon should not have direct access to rplan */
struct request_ctx *ctx = task->ctx;
assert(ctx);
struct kr_request *req = &ctx->req;
struct kr_rplan *rplan = &req->rplan;
struct kr_query *qry = array_tail(rplan->pending);
/* Prevent from KR_STATE_FAIL in kr_resolve_consume() */
qry->flags.TCP = false;
qr_task_step(task, task->addrlist, NULL);
assert(task->refs > 1);
qr_task_unref(task);
qr_task_step(task, NULL, NULL);
}
assert(session->tasks.len == 0);
iorequest_release(worker, req);
......@@ -1061,6 +1055,13 @@ static void on_connect(uv_connect_t *req, int status)
return;
}
WITH_VERBOSE {
char addr_str[INET6_ADDRSTRLEN];
inet_ntop(session->peer.ip.sa_family, kr_inaddr(&session->peer.ip),
addr_str, sizeof(addr_str));
VERBOSE_MSG(NULL, "=> connected to '%s'\n", addr_str);
}
session->connected = true;
session->handle = (uv_handle_t *)handle;
......@@ -1107,15 +1108,26 @@ static void on_tcp_connect_timeout(uv_timer_t *timer)
assert (session->waiting.len == session->tasks.len);
union inaddr *peer = &session->peer;
worker_del_tcp_waiting(worker, &peer->ip);
WITH_VERBOSE {
char addr_str[INET6_ADDRSTRLEN];
inet_ntop(peer->ip.sa_family, kr_inaddr(&peer->ip), addr_str, sizeof(addr_str));
VERBOSE_MSG(NULL, "=> connection to '%s' failed\n", addr_str);
}
while (session->waiting.len > 0) {
struct qr_task *task = session->waiting.at[0];
struct request_ctx *ctx = task->ctx;
assert(ctx);
task->timeouts += 1;
worker->stats.timeout += 1;
session_del_tasks(session, task);
array_del(session->waiting, 0);
assert(task->refs > 1);
qr_task_unref(task);
qr_task_finalize(task, KR_STATE_FAIL);
qr_task_step(task, NULL, NULL);
}
assert (session->tasks.len == 0);
......@@ -1569,6 +1581,12 @@ static int qr_task_step(struct qr_task *task,
return qr_task_finalize(task, KR_STATE_FAIL);
}
WITH_VERBOSE {
char addr_str[INET6_ADDRSTRLEN];
inet_ntop(session->peer.ip.sa_family, kr_inaddr(&session->peer.ip), addr_str, sizeof(addr_str));
VERBOSE_MSG(NULL, "=> connecting to: '%s'\n", addr_str);
}
if (uv_tcp_connect(conn, (uv_tcp_t *)client,
addr , on_connect) != 0) {
session_del_tasks(session, task);
......@@ -1664,7 +1682,6 @@ int worker_submit(struct worker_ctx *worker, uv_handle_t *handle,
}
assert(session->closing == false);
}
fflush(stdout);
assert(uv_is_closing(session->handle) == false);
/* Consume input and produce next message */
......@@ -1817,37 +1834,19 @@ int worker_process_tcp(struct worker_ctx *worker, uv_stream_t *handle,
struct qr_task *task = session->waiting.at[0];
array_del(session->waiting, 0);
assert(task->refs > 1);
qr_task_unref(task);
session_del_tasks(session, task);
if (session->outgoing) {
/* TODO fixme
* Daemon should not have direct access to rplan */
struct request_ctx *ctx = task->ctx;
assert(ctx);
struct kr_request *req = &ctx->req;
struct kr_rplan *rplan = &req->rplan;
struct kr_query *qry = array_tail(rplan->pending);
/* Prevent from KR_STATE_FAIL in kr_resolve_consume() */
qry->flags.TCP = false;
qr_task_step(task, task->addrlist, NULL);
qr_task_step(task, NULL, NULL);
} else {
assert(task->ctx->source.session == session);
task->ctx->source.session = NULL;
}
session_del_tasks(session, task);
qr_task_unref(task);
}
while (session->tasks.len > 0) {
struct qr_task *task = session->tasks.at[0];
if (session->outgoing) {
/* TODO fixme
* Daemon should not have direct access to rplan */
struct request_ctx *ctx = task->ctx;
assert(ctx);
struct kr_request *req = &ctx->req;
struct kr_rplan *rplan = &req->rplan;
struct kr_query *qry = array_tail(rplan->pending);
/* Prevent from KR_STATE_FAIL in kr_resolve_consume() */
qry->flags.TCP = false;
qr_task_step(task, task->addrlist, NULL);
qr_task_step(task, NULL, NULL);
} else {
assert(task->ctx->source.session == session);
task->ctx->source.session = NULL;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment