Verified Commit 15c91147 authored by Grigorii Demidov's avatar Grigorii Demidov Committed by Vladimír Čunát

daemon: fix memory leaks & asan errors; improvements in buffering

parent c57914ec
......@@ -100,6 +100,7 @@ void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
ssize_t consumed = session_wirebuf_consume(s, (const uint8_t *)buf->base, nread);
assert(consumed == nread);
session_wirebuf_process(s);
session_wirebuf_discard(s);
mp_flush(worker->pkt_pool.ctx);
}
......@@ -222,6 +223,7 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
} else if (ret > 0 && !session_is_closing(s)) {
session_timer_restart(s);
}
session_wirebuf_compress(s);
mp_flush(worker->pkt_pool.ctx);
}
......
......@@ -39,7 +39,8 @@ struct session {
uint8_t *wire_buf; /**< Buffer for DNS message. */
ssize_t wire_buf_size; /**< Buffer size. */
ssize_t wire_buf_idx; /**< The number of bytes in wire_buf filled so far. */
ssize_t wire_buf_start_idx; /**< Data start offset in wire_buf. */
ssize_t wire_buf_end_idx; /**< Data end offset in wire_buf. */
};
static void on_session_close(uv_handle_t *handle)
......@@ -466,40 +467,54 @@ int session_timer_stop(struct session *session)
ssize_t session_wirebuf_consume(struct session *session, const uint8_t *data, ssize_t len)
{
if (data != &session->wire_buf[session->wire_buf_idx]) {
if (data != &session->wire_buf[session->wire_buf_end_idx]) {
/* shouldn't happen */
return kr_error(EINVAL);
}
if (session->wire_buf_idx + len > session->wire_buf_size) {
if (session->wire_buf_end_idx + len > session->wire_buf_size) {
/* shouldn't happen */
return kr_error(EINVAL);
}
session->wire_buf_idx += len;
session->wire_buf_end_idx += len;
return len;
}
knot_pkt_t *session_produce_packet(struct session *session, knot_mm_t *mm)
{
session->sflags.wirebuf_error = false;
if (session->wire_buf_idx == 0) {
if (session->wire_buf_end_idx == 0) {
return NULL;
}
if (session->wire_buf_start_idx == session->wire_buf_end_idx) {
session->wire_buf_start_idx = 0;
session->wire_buf_end_idx = 0;
return NULL;
}
if (session->wire_buf_start_idx > session->wire_buf_end_idx) {
session->sflags.wirebuf_error = true;
session->wire_buf_start_idx = 0;
session->wire_buf_end_idx = 0;
return NULL;
}
const uv_handle_t *handle = session->handle;
uint8_t *msg_start = session->wire_buf;
uint16_t msg_size = session->wire_buf_idx;
uint8_t *msg_start = &session->wire_buf[session->wire_buf_start_idx];
ssize_t wirebuf_msg_data_size = session->wire_buf_end_idx - session->wire_buf_start_idx;
uint16_t msg_size = wirebuf_msg_data_size;
if (!handle) {
session->sflags.wirebuf_error = true;
return NULL;
} else if (handle->type == UV_TCP) {
if (session->wire_buf_idx < 2) {
if (msg_size < 2) {
return NULL;
}
msg_size = knot_wire_read_u16(session->wire_buf);
if (msg_size + 2 > session->wire_buf_idx) {
msg_size = knot_wire_read_u16(msg_start);
if (msg_size + 2 > wirebuf_msg_data_size) {
session->sflags.wirebuf_error = false;
return NULL;
}
......@@ -516,66 +531,104 @@ knot_pkt_t *session_produce_packet(struct session *session, knot_mm_t *mm)
int session_discard_packet(struct session *session, const knot_pkt_t *pkt)
{
uv_handle_t *handle = session->handle;
uint8_t *wirebuf_data_start = session->wire_buf;
size_t wirebuf_msg_data_size = session->wire_buf_idx;
uint8_t *wirebuf_msg_start = session->wire_buf;
size_t wirebuf_msg_size = session->wire_buf_idx;
/* Pointer to data start in wire_buf */
uint8_t *wirebuf_data_start = &session->wire_buf[session->wire_buf_start_idx];
/* Number of data bytes in wire_buf */
size_t wirebuf_data_size = session->wire_buf_end_idx - session->wire_buf_start_idx;
/* Pointer to message start in wire_buf */
uint8_t *wirebuf_msg_start = wirebuf_data_start;
/* Number of message bytes in wire_buf.
* For UDP it is the same number as wirebuf_data_size. */
size_t wirebuf_msg_size = wirebuf_data_size;
/* Wire data from parsed packet. */
uint8_t *pkt_msg_start = pkt->wire;
/* Number of bytes in packet wire buffer. */
size_t pkt_msg_size = pkt->size;
if (pkt->tsig_rr) {
if (knot_pkt_has_tsig(pkt)) {
pkt_msg_size += pkt->tsig_wire.len;
}
session->sflags.wirebuf_error = true;
if (!handle) {
return kr_error(EINVAL);
} else if (handle->type == UV_UDP) {
/* Fast check for UDP */
if (wirebuf_msg_start != pkt_msg_start) {
} else if (handle->type == UV_TCP) {
/* wire_buf contains TCP DNS message. */
if (wirebuf_data_size < 2) {
/* TCP message length field isn't in buffer, must not happen. */
assert(0);
session->wire_buf_start_idx = 0;
session->wire_buf_end_idx = 0;
return kr_error(EINVAL);
}
wirebuf_msg_size = knot_wire_read_u16(wirebuf_msg_start);
wirebuf_msg_start += 2;
if (wirebuf_msg_size + 2 > wirebuf_data_size) {
/* TCP message length field is greater then
* number of bytes in buffer, must not happen. */
assert(0);
session->wire_buf_start_idx = 0;
session->wire_buf_end_idx = 0;
return kr_error(EINVAL);
}
session->wire_buf_idx = 0;
session->sflags.wirebuf_error = false;
return kr_ok();
}
if (session->wire_buf_idx < 2) {
return kr_error(EINVAL);
}
wirebuf_msg_size = knot_wire_read_u16(wirebuf_data_start);
wirebuf_msg_start += 2;
wirebuf_msg_data_size = wirebuf_msg_size + 2;
if (wirebuf_msg_start != pkt_msg_start) {
/* packet wirebuf must be located at the beginning
* of the session wirebuf, must not happen. */
assert(0);
session->wire_buf_start_idx = 0;
session->wire_buf_end_idx = 0;
return kr_error(EINVAL);
}
if (wirebuf_msg_size != pkt_msg_size) {
if (wirebuf_msg_size < pkt_msg_size) {
/* Message length field is lesser then packet size,
* must not happen. */
assert(0);
session->wire_buf_start_idx = 0;
session->wire_buf_end_idx = 0;
return kr_error(EINVAL);
}
if (wirebuf_msg_data_size > session->wire_buf_idx) {
return kr_error(EINVAL);
if (handle->type == UV_TCP) {
session->wire_buf_start_idx += wirebuf_msg_size + 2;
} else {
session->wire_buf_start_idx += pkt_msg_size;
}
session->sflags.wirebuf_error = false;
uint16_t wirebuf_data_amount = session->wire_buf_idx - wirebuf_msg_data_size;
if (wirebuf_data_amount) {
if (wirebuf_msg_data_size < wirebuf_data_amount) {
memmove(wirebuf_data_start, &wirebuf_data_start[wirebuf_msg_data_size],
wirebuf_data_amount);
} else {
memcpy(wirebuf_data_start, &wirebuf_data_start[wirebuf_msg_data_size],
wirebuf_data_amount);
}
if (wirebuf_data_size == 0) {
session_wirebuf_discard(session);
} else if (wirebuf_data_size < KNOT_WIRE_HEADER_SIZE) {
session_wirebuf_compress(session);
}
session->wire_buf_idx = wirebuf_data_amount;
session->sflags.wirebuf_error = false;
return kr_ok();
}
void session_wirebuf_discard(struct session *session)
{
session->wire_buf_start_idx = 0;
session->wire_buf_end_idx = 0;
}
void session_wirebuf_compress(struct session *session)
{
if (session->wire_buf_start_idx == 0) {
return;
}
uint8_t *wirebuf_data_start = &session->wire_buf[session->wire_buf_start_idx];
size_t wirebuf_data_size = session->wire_buf_end_idx - session->wire_buf_start_idx;
if (session->wire_buf_start_idx < wirebuf_data_size) {
memmove(session->wire_buf, wirebuf_data_start, wirebuf_data_size);
} else {
memcpy(session->wire_buf, wirebuf_data_start, wirebuf_data_size);
}
session->wire_buf_start_idx = 0;
session->wire_buf_end_idx = wirebuf_data_size;
}
bool session_wirebuf_error(struct session *session)
{
return session->sflags.wirebuf_error;
......@@ -588,7 +641,7 @@ uint8_t *session_wirebuf_get_start(struct session *session)
size_t session_wirebuf_get_len(struct session *session)
{
return session->wire_buf_idx;
return session->wire_buf_end_idx;
}
size_t session_wirebuf_get_size(struct session *session)
......@@ -598,12 +651,12 @@ size_t session_wirebuf_get_size(struct session *session)
uint8_t *session_wirebuf_get_free_start(struct session *session)
{
return &session->wire_buf[session->wire_buf_idx];
return &session->wire_buf[session->wire_buf_end_idx];
}
size_t session_wirebuf_get_free_size(struct session *session)
{
return session->wire_buf_size - session->wire_buf_idx;
return session->wire_buf_size - session->wire_buf_end_idx;
}
void session_poison(struct session *session)
......@@ -619,7 +672,7 @@ void session_unpoison(struct session *session)
int session_wirebuf_process(struct session *session)
{
int ret = 0;
if (session->wire_buf_idx == 0) {
if (session->wire_buf_start_idx == session->wire_buf_end_idx) {
return ret;
}
struct worker_ctx *worker = session_get_handle(session)->loop->data;
......@@ -632,7 +685,6 @@ int session_wirebuf_process(struct session *session)
}
ret += 1;
}
assert(ret < 100);
if (session_wirebuf_error(session)) {
ret = -1;
}
......
......@@ -119,16 +119,20 @@ int session_timer_restart(struct session *session);
/** Stop session timer. */
int session_timer_stop(struct session *session);
/** Get start of session buffer for wire data. */
/** Get pointer to the beginning of session wirebuffer. */
uint8_t *session_wirebuf_get_start(struct session *session);
/** Get size of session wirebuffer. */
size_t session_wirebuf_get_size(struct session *session);
/** Get length of data in the session wirebuffer. */
size_t session_wirebuf_get_len(struct session *session);
/** Get start of free space in session wirebuffer. */
/** Get pointer to the beginning of free space in session wirebuffer. */
uint8_t *session_wirebuf_get_free_start(struct session *session);
/** Get amount of free space in session wirebuffer. */
size_t session_wirebuf_get_free_size(struct session *session);
/** Discard all data in session wirebuffer. */
void session_wirebuf_discard(struct session *session);
/** Move all data to the beginning of the buffer. */
void session_wirebuf_compress(struct session *session);
int session_wirebuf_process(struct session *session);
ssize_t session_wirebuf_consume(struct session *session,
const uint8_t *data, ssize_t len);
......
......@@ -623,6 +623,32 @@ static void qr_task_free(struct qr_task *task)
worker->stats.concurrent -= 1;
}
/*@ Register new qr_task within session. */
static int qr_task_register(struct qr_task *task, struct session *session)
{
assert(!session_is_outgoing(session) && session_get_handle(session)->type == UV_TCP);
session_tasklist_add(session, task);
struct request_ctx *ctx = task->ctx;
assert(ctx && (ctx->source.session == NULL || ctx->source.session == session));
ctx->source.session = session;
/* Soft-limit on parallel queries, there is no "slow down" RCODE
* that we could use to signalize to client, but we can stop reading,
* an in effect shrink TCP window size. To get more precise throttling,
* we would need to copy remainder of the unread buffer and reassemble
* when resuming reading. This is NYI. */
if (session_tasklist_get_len(session) >= task->ctx->worker->tcp_pipeline_max) {
uv_handle_t *handle = session_get_handle(session);
if (handle && !session_is_throttled(session) && !session_is_closing(session)) {
io_stop_read(handle);
session_set_throttled(session, true);
}
}
return 0;
}
static void qr_task_complete(struct qr_task *task)
{
struct request_ctx *ctx = task->ctx;
......@@ -810,9 +836,7 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle,
}
/* Update statistics */
if (ctx->source.session &&
handle != session_get_handle(ctx->source.session) &&
addr) {
if (session_is_outgoing(session) && addr) {
if (session_has_tls(session))
worker->stats.tls += 1;
else if (handle->type == UV_UDP)
......@@ -825,7 +849,6 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle,
else if (addr->sa_family == AF_INET)
worker->stats.ipv4 += 1;
}
return ret;
}
......@@ -1613,6 +1636,10 @@ int worker_submit(struct session *session, knot_pkt_t *query)
request_free(ctx);
return kr_error(ENOMEM);
}
if (handle->type == UV_TCP && qr_task_register(task, session)) {
return kr_error(ENOMEM);
}
} else if (query) { /* response from upstream */
if ((ret != kr_ok() && ret != kr_error(EMSGSIZE)) ||
!knot_wire_get_qr(query->wire)) {
......@@ -1762,7 +1789,6 @@ int worker_end_tcp(struct session *session)
assert(task->ctx->source.session == session);
task->ctx->source.session = NULL;
}
qr_task_unref(task);
}
while (!session_tasklist_is_empty(session)) {
struct qr_task *task = session_tasklist_get_first(session);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment