Verified Commit c3d78e32 authored by Grigorii Demidov's avatar Grigorii Demidov Committed by Vladimír Čunát

daemon/session: fixes for bugs revealed by transport tests

parent e792aac3
......@@ -204,22 +204,15 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
int ret = session_wirebuf_process(s);
if (ret < 0) {
/* An error has occurred, close the session. */
worker_end_tcp(s);
/* Exceeded per-connection quota for outstanding requests
* stop reading from stream and close after last message is processed. */
uv_timer_t *t = session_get_timer(s);
if (!session_flags(s)->outgoing && !uv_is_closing((uv_handle_t *)t)) {
uv_timer_stop(t);
if (session_tasklist_is_empty(s)) {
session_close(s);
} else { /* If there are tasks running, defer until they finish. */
uv_timer_start(t, tcp_timeout_trigger,
MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
}
}
/* Connection spawned at least one request, reset its deadline for next query.
* https://tools.ietf.org/html/rfc7766#section-6.2.3 */
} else if (ret > 0 && !session_flags(s)->closing) {
/* Connection spawned at least one request
* or
* valid answer has been received from upstream.
* Reset deadline for next query.
* https://tools.ietf.org/html/rfc7766#section-6.2.3
*/
session_timer_restart(s);
}
session_wirebuf_compress(s);
......@@ -295,8 +288,7 @@ static void _tcp_accept(uv_stream_t *master, int status, bool tls)
session_tls_set_server_ctx(s, ctx);
}
}
uv_timer_t *t = session_get_timer(s);
uv_timer_start(t, tcp_timeout_trigger, timeout, idle_in_timeout);
session_timer_start(s, tcp_timeout_trigger, timeout, idle_in_timeout);
io_start_read((uv_handle_t *)client);
}
......@@ -412,10 +404,10 @@ int io_create(uv_loop_t *loop, uv_handle_t *handle, int type, unsigned family)
return ret;
}
struct session *s = session_new(handle);
assert(s);
uv_timer_t *t = session_get_timer(s);
t->data = s;
return uv_timer_init(loop, t);
if (s == NULL) {
ret = -1;
}
return ret;
}
void io_deinit(uv_handle_t *handle)
......
......@@ -281,15 +281,14 @@ struct session *session_new(uv_handle_t *handle)
session->wire_buf = worker->wire_buf;
session->wire_buf_size = sizeof(worker->wire_buf);
}
uv_timer_init(handle->loop, &session->timeout);
session->handle = handle;
handle->data = session;
return session;
}
session->timeout.data = session;
uv_timer_t *session_get_timer(struct session *session)
{
return &session->timeout;
return session;
}
size_t session_tasklist_get_len(const struct session *session)
......@@ -472,8 +471,11 @@ knot_pkt_t *session_produce_packet(struct session *session, knot_mm_t *mm)
return NULL;
}
msg_size = knot_wire_read_u16(msg_start);
if (msg_size >= session->wire_buf_size) {
session->sflags.wirebuf_error = true;
return NULL;
}
if (msg_size + 2 > wirebuf_msg_data_size) {
session->sflags.wirebuf_error = false;
return NULL;
}
msg_start += 2;
......@@ -556,6 +558,7 @@ int session_discard_packet(struct session *session, const knot_pkt_t *pkt)
}
session->sflags.wirebuf_error = false;
wirebuf_data_size = session->wire_buf_end_idx - session->wire_buf_start_idx;
if (wirebuf_data_size == 0) {
session_wirebuf_discard(session);
} else if (wirebuf_data_size < KNOT_WIRE_HEADER_SIZE) {
......@@ -634,14 +637,22 @@ int session_wirebuf_process(struct session *session)
return ret;
}
struct worker_ctx *worker = session_get_handle(session)->loop->data;
size_t wirebuf_data_size = session->wire_buf_end_idx - session->wire_buf_start_idx;
uint32_t max_iterations = (wirebuf_data_size / (KNOT_WIRE_HEADER_SIZE + KNOT_WIRE_QUESTION_MIN_SIZE)) + 1;
knot_pkt_t *query = NULL;
while (((query = session_produce_packet(session, &worker->pkt_pool)) != NULL) && (ret < 100)) {
while (((query = session_produce_packet(session, &worker->pkt_pool)) != NULL) &&
(ret < max_iterations)) {
assert (!session_wirebuf_error(session));
worker_submit(session, query);
int res = worker_submit(session, query);
if (res != kr_error(EILSEQ)) {
/* Packet has been successfully parsed. */
ret += 1;
}
if (session_discard_packet(session, query) < 0) {
/* Packet data isn't stored in memory as expected.
something went wrong, normally should not happen. */
break;
}
ret += 1;
}
if (session_wirebuf_error(session)) {
ret = -1;
......
......@@ -104,8 +104,6 @@ struct tls_common_ctx *session_tls_get_common_ctx(const struct session *session)
/** Get pointer to underlying libuv handle for IO operations. */
uv_handle_t *session_get_handle(struct session *session);
/** Get pointer to session timer handle. */
uv_timer_t *session_get_timer(struct session *session);
/** Start session timer. */
int session_timer_start(struct session *session, uv_timer_cb cb,
uint64_t timeout, uint64_t repeat);
......
......@@ -807,9 +807,7 @@ static int session_tls_hs_cb(struct session *session, int status)
assert(session_tasklist_is_empty(session));
session_close(session);
} else {
uv_timer_t *t = session_get_timer(session);
uv_timer_stop(t);
t->data = session;
session_timer_stop(session);
session_timer_start(session, on_tcp_watchdog_timeout,
MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
}
......@@ -850,8 +848,7 @@ static void on_connect(uv_connect_t *req, int status)
return;
}
uv_timer_t *t = session_get_timer(session);
uv_timer_stop(t);
session_timer_stop(session);
if (status != 0) {
worker_del_tcp_waiting(worker, peer);
......@@ -1487,7 +1484,8 @@ int worker_submit(struct session *session, knot_pkt_t *query)
struct sockaddr *addr = NULL;
if (!session_flags(session)->outgoing) { /* request from a client */
/* Ignore badly formed queries. */
if (!query || ret != 0 || knot_wire_get_qr(query->wire)) {
if (!query || (ret != kr_ok() && ret != kr_error(EMSGSIZE)) ||
knot_wire_get_qr(query->wire)) {
if (query) worker->stats.dropped += 1;
return kr_error(EILSEQ);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment