Commit 51d52754 authored by Marek Vavruša's avatar Marek Vavruša

daemon/worker: close all handles in worker, pass through errors

do all socket closing in worker to avoid double closes with
timeout timer, also propagate bad messages to discover errors
earlier than timeout
parent 53cb90dc
......@@ -53,18 +53,9 @@ void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
{
uv_loop_t *loop = handle->loop;
struct worker_ctx *worker = loop->data;
/* UDP requests are oneshot, always close afterwards */
if (handle->data && !uv_is_closing((uv_handle_t *)handle)) { /* Do not free master socket */
io_close((uv_handle_t *)handle);
}
/* Check the incoming wire length. */
if (nread > KNOT_WIRE_HEADER_SIZE) {
knot_pkt_t *query = knot_pkt_new(buf->base, nread, worker->mm);
worker_exec(worker, (uv_handle_t *)handle, query, addr);
knot_pkt_free(&query);
}
knot_pkt_t *query = knot_pkt_new(buf->base, nread, worker->mm);
worker_exec(worker, (uv_handle_t *)handle, query, addr);
knot_pkt_free(&query);
}
int udp_bind(struct endpoint *ep, struct sockaddr *addr)
......@@ -94,23 +85,23 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
uv_loop_t *loop = handle->loop;
struct worker_ctx *worker = loop->data;
/* Check for connection close */
if (nread <= 0) {
/* Check for originator connection close */
if (nread <= 0 && handle->data == 0) {
io_close((uv_handle_t *)handle);
return;
} else if (nread < 2) {
/* Not enough bytes to read length */
worker_exec(worker, (uv_handle_t *)handle, NULL, NULL);
return;
}
/* Set packet size */
/** @todo This is not going to work if the packet is fragmented in the stream ! */
uint16_t nbytes = wire_read_u16((const uint8_t *)buf->base);
/* Check if there's enough data and execute */
if (nbytes + 2 < nread) {
worker_exec(worker, (uv_handle_t *)handle, NULL, NULL);
return;
}
knot_pkt_t *query = knot_pkt_new(buf->base + 2, nbytes, worker->mm);
worker_exec(worker, (uv_handle_t *)handle, query, NULL);
knot_pkt_free(&query);
......
......@@ -110,9 +110,8 @@ static void qr_task_free(uv_handle_t *handle)
static void qr_task_timeout(uv_timer_t *req)
{
struct qr_task *task = req->data;
if (!uv_is_closing(task->next_handle)) {
if (task->next_handle) {
io_stop_read(task->next_handle);
uv_close(task->next_handle, (uv_close_cb) free);
qr_task_step(task, NULL);
}
}
......@@ -127,6 +126,7 @@ static void qr_task_on_send(uv_req_t* req, int status)
io_start_read(task->next_handle);
}
} else { /* Finalize task */
uv_timer_stop(&task->timeout);
uv_close((uv_handle_t *)&task->timeout, qr_task_free);
}
}
......@@ -171,9 +171,12 @@ static int qr_task_finalize(struct qr_task *task, int state)
static int qr_task_step(struct qr_task *task, knot_pkt_t *packet)
{
/* Cancel timeout if active */
uv_timer_stop(&task->timeout);
task->next_handle = NULL;
/* Cancel timeout if active, close handle. */
if (task->next_handle) {
uv_close(task->next_handle, (uv_close_cb) free);
uv_timer_stop(&task->timeout);
task->next_handle = NULL;
}
/* Consume input and produce next query */
int sock_type = -1;
......@@ -187,10 +190,7 @@ static int qr_task_step(struct qr_task *task, knot_pkt_t *packet)
/* We're done, no more iterations needed */
if (state & (KNOT_STATE_DONE|KNOT_STATE_FAIL)) {
return qr_task_finalize(task, state);
}
/* Iteration limit */
if (++task->iter_count > KR_ITER_LIMIT) {
} else if (++task->iter_count > KR_ITER_LIMIT) {
return qr_task_finalize(task, KNOT_STATE_FAIL);
}
......@@ -206,20 +206,17 @@ static int qr_task_step(struct qr_task *task, knot_pkt_t *packet)
if (sock_type == SOCK_STREAM) {
uv_connect_t *connect = &task->ioreq.connect;
if (uv_tcp_connect(connect, (uv_tcp_t *)task->next_handle, addr, qr_task_on_connect) != 0) {
uv_close(task->next_handle, (uv_close_cb) free);
return qr_task_step(task, NULL);
}
connect->data = task;
} else {
if (qr_task_send(task, task->next_handle, addr, next_query) != 0) {
uv_close(task->next_handle, (uv_close_cb) free);
return qr_task_step(task, NULL);
}
}
/* Start next timeout */
/* Start next step with timeout */
uv_timer_start(&task->timeout, qr_task_timeout, KR_CONN_RTT_MAX, 0);
return kr_ok();
}
......@@ -231,15 +228,13 @@ int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *quer
/* Parse query */
int ret = parse_query(query);
if (ret != 0) {
return ret;
}
/* Start new task on master sockets, or resume existing */
struct qr_task *task = handle->data;
bool is_master_socket = (!task);
if (is_master_socket) {
if (knot_wire_get_qr(query->wire)) {
/* Ignore badly formed queries or responses. */
if (ret != 0 || knot_wire_get_qr(query->wire)) {
return kr_error(EINVAL); /* Ignore. */
}
task = qr_task_create(worker, handle, addr);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment