Commit c8d8208b authored by Marek Vavruša

daemon/worker: cleanup ioreq recycling, correct checking of ioreq validity

Previously only the current I/O handle was tracked for each subrequest,
but pending I/O requests may outlive it (a connect request isn't
cancelable, for example). If such a request completed after the task
had already finished, its callback would access the address of a
potentially stale task.
parent 931d7365
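In libuv terms, the fix described above is an invalidation handshake: the task records its pending uv_req_t, task teardown clears the request's data pointer instead of trying to cancel the request, and the completion callback treats a NULL data pointer as "the task is gone". A minimal sketch of the pattern, with illustrative names (my_task, my_connect_cb and my_task_finish are placeholders, not the daemon's actual symbols):

#include <stdlib.h>
#include <uv.h>

struct my_task {
	uv_req_t *ioreq;              /* Pending, possibly uncancelable request. */
};

static void my_connect_cb(uv_connect_t *req, int status)
{
	(void)status;
	struct my_task *task = req->data;
	if (task) {                   /* NULL means the task finished first. */
		task->ioreq = NULL;       /* Request is no longer pending. */
		/* ... safe to keep using the task here ... */
	}
	free(req);                    /* Reclaim the request either way. */
}

static void my_task_finish(struct my_task *task)
{
	if (task->ioreq) {
		task->ioreq->data = NULL; /* Invalidate; a connect can't be canceled. */
	}
	free(task);
}

The request's memory is reclaimed unconditionally in the callback, which is why the diff below releases ioreqs through the worker singleton (get_worker()) rather than through task->worker, which may already be freed.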
@@ -65,8 +65,9 @@ struct qr_task
 {
 	struct kr_request req;
 	struct worker_ctx *worker;
-	knot_pkt_t *next_query;
-	uv_handle_t *next_handle;
+	knot_pkt_t *pktbuf;
+	uv_req_t *ioreq;
+	uv_handle_t *iohandle;
 	uv_timer_t timeout;
 	struct {
 		union {
@@ -82,20 +83,10 @@ struct qr_task
 /* Forward decls */
 static int qr_task_step(struct qr_task *task, knot_pkt_t *packet);
-static int parse_query(knot_pkt_t *query)
+/** @internal Get singleton worker. */
+static inline struct worker_ctx *get_worker(void)
 {
-	/* Parse query packet. */
-	int ret = knot_pkt_parse(query, 0);
-	if (ret != KNOT_EOK) {
-		return kr_error(EPROTO); /* Ignore malformed query. */
-	}
-	/* Check if at least header is parsed. */
-	if (query->parsed < query->size) {
-		return kr_error(EMSGSIZE);
-	}
-	return kr_ok();
+	return uv_default_loop()->data;
 }
 
 static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query, const struct sockaddr *addr)
@@ -137,8 +128,9 @@ static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *ha
 		return NULL;
 	}
 	task->req.answer = answer;
-	task->next_query = pktbuf;
-	task->next_handle = NULL;
+	task->pktbuf = pktbuf;
+	task->ioreq = NULL;
+	task->iohandle = NULL;
 	task->iter_count = 0;
 	task->flags = 0;
 	task->worker = worker;
@@ -190,106 +182,117 @@ static void qr_task_free(uv_handle_t *handle)
 static void qr_task_timeout(uv_timer_t *req)
 {
 	struct qr_task *task = req->data;
-	if (task->next_handle) { /* Handle data may be stale when it completes */
-		task->next_handle->data = NULL;
+	if (!uv_is_closing((uv_handle_t *)req)) {
+		if (task->ioreq) { /* Invalidate pending IO request. */
+			task->ioreq->data = NULL;
+		}
+		qr_task_step(task, NULL);
 	}
-	qr_task_step(task, NULL);
 }
 
-static int qr_task_on_send(struct qr_task *task, int status)
+static int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status)
 {
-	if (task) {
-		/* Start reading answer */
-		if (task->req.overlay.state != KNOT_STATE_NOOP) {
-			if (status == 0 && task->next_handle) {
-				io_start_read(task->next_handle);
-			}
-		} else { /* Finalize task */
-			uv_timer_stop(&task->timeout);
-			uv_close((uv_handle_t *)&task->timeout, qr_task_free);
-		}
+	if (task->req.overlay.state != KNOT_STATE_NOOP) {
+		if (status == 0 && handle) {
+			io_start_read(handle); /* Start reading answer */
+		}
+	} else { /* Finalize task */
+		uv_timer_stop(&task->timeout);
+		uv_close((uv_handle_t *)&task->timeout, qr_task_free);
 	}
 	return status;
 }
 
 static void on_close(uv_handle_t *handle)
 {
-	struct qr_task *task = handle->data;
-	if (task) {
-		ioreq_release(task->worker, (struct ioreq *)handle);
-	} else free(handle);
+	struct worker_ctx *worker = get_worker();
+	ioreq_release(worker, (struct ioreq *)handle);
 }
 
 static void on_send(uv_udp_send_t *req, int status)
 {
+	struct worker_ctx *worker = get_worker();
 	struct qr_task *task = req->data;
 	if (task) {
-		qr_task_on_send(task, status);
-		ioreq_release(task->worker, (struct ioreq *)req);
-	} else free(req);
+		qr_task_on_send(task, (uv_handle_t *)req->handle, status);
+		task->ioreq = NULL;
+	}
+	ioreq_release(worker, (struct ioreq *)req);
 }
 
 static void on_write(uv_write_t *req, int status)
 {
+	struct worker_ctx *worker = get_worker();
 	struct qr_task *task = req->data;
 	if (task) {
-		qr_task_on_send(task, status);
-		ioreq_release(task->worker, (struct ioreq *)req);
-	} else free(req);
+		qr_task_on_send(task, (uv_handle_t *)req->handle, status);
+		task->ioreq = NULL;
+	}
+	ioreq_release(worker, (struct ioreq *)req);
 }
 
 static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockaddr *addr, knot_pkt_t *pkt)
 {
 	int ret = 0;
 	if (!handle) {
-		return qr_task_on_send(task, kr_error(EIO));
+		return qr_task_on_send(task, handle, kr_error(EIO));
 	}
-	struct ioreq *req = ioreq_take(task->worker);
-	if (!req) {
-		return qr_task_on_send(task, kr_error(ENOMEM));
+	struct ioreq *send_req = ioreq_take(task->worker);
+	if (!send_req) {
+		return qr_task_on_send(task, handle, kr_error(ENOMEM));
 	}
 	/* Send using given protocol */
 	if (handle->type == UV_UDP) {
 		uv_buf_t buf = { (char *)pkt->wire, pkt->size };
-		req->as.send.data = task;
-		ret = uv_udp_send(&req->as.send, (uv_udp_t *)handle, &buf, 1, addr, &on_send);
+		send_req->as.send.data = task;
+		ret = uv_udp_send(&send_req->as.send, (uv_udp_t *)handle, &buf, 1, addr, &on_send);
 	} else {
 		uint16_t pkt_size = htons(pkt->size);
 		uv_buf_t buf[2] = {
 			{ (char *)&pkt_size, sizeof(pkt_size) },
 			{ (char *)pkt->wire, pkt->size }
 		};
-		req->as.write.data = task;
-		ret = uv_write(&req->as.write, (uv_stream_t *)handle, buf, 2, &on_write);
+		send_req->as.write.data = task;
+		ret = uv_write(&send_req->as.write, (uv_stream_t *)handle, buf, 2, &on_write);
 	}
+	if (ret == 0) {
+		task->ioreq = (uv_req_t *)send_req;
+	} else {
+		ioreq_release(task->worker, send_req);
+	}
 	/* Update statistics */
 	if (handle != task->source.handle && addr) {
 		if (handle->type == UV_UDP)
 			task->worker->stats.udp += 1;
-		else task->worker->stats.tcp += 1;
+		else
+			task->worker->stats.tcp += 1;
 		if (addr->sa_family == AF_INET6)
 			task->worker->stats.ipv6 += 1;
-		else task->worker->stats.ipv4 += 1;
-	}
-	if (ret != 0) {
-		ioreq_release(task->worker, req);
+		else
+			task->worker->stats.ipv4 += 1;
 	}
 	return ret;
 }
 
 static void on_connect(uv_connect_t *req, int status)
 {
+	struct worker_ctx *worker = get_worker();
 	struct qr_task *task = req->data;
-	if (status == 0) {
-		/* Retrieve endpoint IP for statistics */
-		struct sockaddr_in6 addr;
-		int addrlen = sizeof(addr);
-		uv_tcp_getpeername((uv_tcp_t *)req->handle, (struct sockaddr *)&addr, &addrlen);
-		qr_task_send(task, (uv_handle_t *)req->handle, (struct sockaddr *)&addr, task->next_query);
-		ioreq_release(task->worker, (struct ioreq *)req);
-	} else { /* Must not recycle, as 'task' may be freed. */
-		free(req);
+	if (task) {
+		task->ioreq = NULL;
+		if (status == 0) {
+			struct sockaddr_in6 addr;
+			int addrlen = sizeof(addr); /* Retrieve endpoint IP for statistics */
+			uv_stream_t *handle = req->handle;
+			uv_tcp_getpeername((uv_tcp_t *)handle, (struct sockaddr *)&addr, &addrlen);
+			qr_task_send(task, (uv_handle_t *)handle, (struct sockaddr *)&addr, task->pktbuf);
+		} else {
+			qr_task_step(task, NULL);
+		}
 	}
+	ioreq_release(worker, (struct ioreq *)req);
 }
 
 static int qr_task_finalize(struct qr_task *task, int state)
@@ -302,23 +305,21 @@ static int qr_task_finalize(struct qr_task *task, int state)
 static int qr_task_step(struct qr_task *task, knot_pkt_t *packet)
 {
-	/* Cancel timeout if active, close handle. */
-	if (task->next_handle) {
-		if (!uv_is_closing(task->next_handle)) {
-			io_stop_read(task->next_handle);
-			uv_close(task->next_handle, on_close);
-		}
-		uv_timer_stop(&task->timeout);
-		task->next_handle = NULL;
+	/* Close subrequest handle. */
+	uv_timer_stop(&task->timeout);
+	if (task->iohandle && !uv_is_closing(task->iohandle)) {
+		io_stop_read(task->iohandle);
+		uv_close(task->iohandle, on_close);
+		task->iohandle = NULL;
 	}
 	/* Consume input and produce next query */
 	int sock_type = -1;
 	struct sockaddr *addr = NULL;
-	knot_pkt_t *next_query = task->next_query;
+	knot_pkt_t *pktbuf = task->pktbuf;
 	int state = kr_resolve_consume(&task->req, packet);
 	while (state == KNOT_STATE_PRODUCE) {
-		state = kr_resolve_produce(&task->req, &addr, &sock_type, next_query);
+		state = kr_resolve_produce(&task->req, &addr, &sock_type, pktbuf);
 		if (unlikely(++task->iter_count > KR_ITER_LIMIT)) {
 			return qr_task_finalize(task, KNOT_STATE_FAIL);
 		}
@@ -332,25 +333,27 @@ static int qr_task_step(struct qr_task *task, knot_pkt_t *packet)
 	}
 
 	/* Create connection for iterative query */
-	task->next_handle = (uv_handle_t *)ioreq_take(task->worker);
-	if (!task->next_handle) {
+	uv_handle_t *subreq = (uv_handle_t *)ioreq_take(task->worker);
+	if (!subreq) {
 		return qr_task_finalize(task, KNOT_STATE_FAIL);
 	}
+	io_create(task->worker->loop, subreq, sock_type);
+	subreq->data = task;
 	/* Connect or issue query datagram */
-	io_create(task->worker->loop, task->next_handle, sock_type);
-	task->next_handle->data = task;
+	task->iohandle = subreq;
 	if (sock_type == SOCK_DGRAM) {
-		if (qr_task_send(task, task->next_handle, addr, next_query) != 0) {
+		if (qr_task_send(task, subreq, addr, pktbuf) != 0) {
 			return qr_task_step(task, NULL);
 		}
 	} else {
-		struct ioreq *req = ioreq_take(task->worker);
-		if (!req || uv_tcp_connect(&req->as.connect, (uv_tcp_t *)task->next_handle, addr, on_connect) != 0) {
-			ioreq_release(task->worker, req);
+		struct ioreq *conn_req = ioreq_take(task->worker);
+		conn_req->as.connect.data = task;
+		if (!conn_req || uv_tcp_connect(&conn_req->as.connect, (uv_tcp_t *)subreq, addr, on_connect) != 0) {
+			ioreq_release(task->worker, conn_req);
 			return qr_task_step(task, NULL);
 		}
-		req->as.connect.data = task;
+		task->ioreq = (uv_req_t *)conn_req;
 	}
 
 	/* Start next step with timeout */
@@ -358,6 +361,22 @@ static int qr_task_step(struct qr_task *task, knot_pkt_t *packet)
 	return kr_ok();
 }
 
+static int parse_query(knot_pkt_t *query)
+{
+	/* Parse query packet. */
+	int ret = knot_pkt_parse(query, 0);
+	if (ret != KNOT_EOK) {
+		return kr_error(EPROTO); /* Ignore malformed query. */
+	}
+	/* Check if at least header is parsed. */
+	if (query->parsed < query->size) {
+		return kr_error(EMSGSIZE);
+	}
+	return kr_ok();
+}
+
 int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query, const struct sockaddr* addr)
 {
 	if (!worker) {
......