Commit 3701fe8a authored by Grigorii Demidov's avatar Grigorii Demidov Committed by Petr Spacek

daemon: avoid uv_try_write() usage both in tls-client and tls-server side; bugfixes

parent c918612b
......@@ -298,6 +298,7 @@ static void _tcp_accept(uv_stream_t *master, int status, bool tls)
session->has_tls = tls;
if (tls && !session->tls_ctx) {
session->tls_ctx = tls_new(master->loop->data);
session->tls_ctx->session = session;
}
uv_timer_t *timer = &session->timeout;
uv_timer_start(timer, tcp_timeout_trigger, KR_CONN_RTT_MAX/2, KR_CONN_RTT_MAX/2);
......
This diff is collapsed.
......@@ -49,10 +49,10 @@ struct qr_task;
/* gnutls_record_recv and gnutls_record_send */
struct tls_ctx_t {
gnutls_session_t session;
gnutls_session_t tls_session;
bool handshake_done;
uv_stream_t *handle;
struct session *session;
/* for reading from the network */
const uint8_t *buf;
......@@ -68,19 +68,39 @@ typedef enum tls_client_hs_state {
TLS_HS_NOT_STARTED = 0,
TLS_HS_IN_PROGRESS,
TLS_HS_DONE,
TLS_HS_CLOSING,
TLS_HS_LAST
} tls_client_hs_state_t;
typedef int (*tls_handshake_cb) (struct session *session, int status);
struct tls_client_ctx_t {
gnutls_session_t tls_session;
tls_client_hs_state_t handshake_state;
struct session *session;
tls_handshake_cb handshake_cb;
const uint8_t *buf;
ssize_t nread;
ssize_t consumed;
uint8_t recv_buf[4096];
const struct tls_client_paramlist_entry *params;
struct worker_ctx *worker;
struct qr_task *task;
};
/*! Create an empty TLS context in query context */
struct tls_ctx_t* tls_new(struct worker_ctx *worker);
/*! Close a TLS context */
/*! Close a TLS context (call gnutls_bye()) */
void tls_close(struct tls_ctx_t *tls);
/*! Release a TLS context */
void tls_free(struct tls_ctx_t* tls);
/*! Push new data to TLS context for sending */
int tls_push(struct qr_task *task, uv_handle_t* handle, knot_pkt_t * pkt);
int tls_push(struct qr_task *task, uv_handle_t* handle, knot_pkt_t * pkt,
bool server_side);
/*! Unwrap incoming data from a TLS stream and pass them to TCP session.
* @return the number of newly-completed requests (>=0) or an error code
......@@ -115,7 +135,8 @@ int tls_client_params_set(map_t *tls_client_paramlist,
int tls_client_params_free(map_t *tls_client_paramlist);
/*! Allocate new client TLS context */
struct tls_client_ctx_t *tls_client_ctx_new(const struct tls_client_paramlist_entry *entry);
struct tls_client_ctx_t *tls_client_ctx_new(const struct tls_client_paramlist_entry *entry,
struct worker_ctx *worker);
int tls_client_process(struct worker_ctx *worker, uv_stream_t *handle,
const uint8_t *buf, ssize_t nread);
......
......@@ -369,6 +369,10 @@ static void session_close(struct session *session)
if (session->tls_client_ctx) {
tls_client_close(session->tls_client_ctx);
}
if (session->tls_ctx) {
tls_close(session->tls_ctx);
}
session->timeout.data = session;
uv_close((uv_handle_t *)&session->timeout, on_session_timer_close);
}
......@@ -870,7 +874,7 @@ static void on_send(uv_udp_send_t *req, int status)
iorequest_release(worker, req);
}
void on_write(uv_write_t *req, int status)
static void on_task_write(uv_write_t *req, int status)
{
uv_handle_t *handle = (uv_handle_t *)(req->handle);
uv_loop_t *loop = handle->loop;
......@@ -882,10 +886,21 @@ void on_write(uv_write_t *req, int status)
iorequest_release(worker, req);
}
static void on_nontask_write(uv_write_t *req, int status)
{
uv_handle_t *handle = (uv_handle_t *)(req->handle);
uv_loop_t *loop = handle->loop;
struct worker_ctx *worker = loop->data;
assert(worker == get_worker());
iorequest_release(worker, req);
}
ssize_t worker_gnutls_push(gnutls_transport_ptr_t h, const void *buf, size_t len)
{
struct tls_ctx_t *t = (struct tls_ctx_t *)h;
const uv_buf_t ub = {(void *)buf, len};
const uv_buf_t uv_buf[1] = {
{ (char *)buf, len }
};
VERBOSE_MSG(NULL,"[tls] push %zu <%p>\n", len, h);
if (t == NULL) {
......@@ -893,27 +908,69 @@ ssize_t worker_gnutls_push(gnutls_transport_ptr_t h, const void *buf, size_t len
return -1;
}
assert(t->handle);
assert(t->handle->type == UV_TCP);
assert(t->session && t->session->handle &&
t->session->handle->type == UV_TCP);
struct worker_ctx *worker = t->worker;
assert(worker);
if (!t->handshake_done) {
int ret = uv_try_write(t->handle, &ub, 1);
if (ret > 0) {
return (ssize_t) ret;
void *ioreq = worker_iohandle_borrow(worker);
if (!ioreq) {
errno = EFAULT;
return -1;
}
uv_write_t *write_req = (uv_write_t *)ioreq;
struct qr_task *task = t->task;
uv_write_cb write_cb = on_task_write;
if (t->handshake_done) {
assert(task);
} else {
task = NULL;
write_cb = on_nontask_write;
}
write_req->data = task;
ssize_t ret = -1;
int res = uv_write(write_req, (uv_stream_t *)t->session->handle, uv_buf, 1, write_cb);
if (res == 0) {
if (task) {
qr_task_ref(task); /* Pending ioreq on current task */
}
if (ret == UV_EAGAIN) {
errno = EAGAIN;
} else {
kr_log_error("[tls] uv_try_write: %s\n", uv_strerror(ret));
errno = EIO;
if (worker->too_many_open &&
worker->stats.rconcurrent <
worker->rconcurrent_highwatermark - 10) {
worker->too_many_open = false;
}
ret = len;
} else {
VERBOSE_MSG(NULL,"[tls] uv_write: %s\n", uv_strerror(res));
iorequest_release(worker, ioreq);
errno = EIO;
/* TODO ret == UV_EMFILE */
}
return ret;
}
ssize_t worker_gnutls_client_push(gnutls_transport_ptr_t h, const void *buf, size_t len)
{
struct tls_client_ctx_t *t = (struct tls_client_ctx_t *)h;
const uv_buf_t uv_buf[1] = {
{ (char *)buf, len }
};
VERBOSE_MSG(NULL,"[tls client] push %zu <%p>\n", len, h);
if (t == NULL) {
errno = EFAULT;
return -1;
}
assert(t->session && t->session->handle &&
t->session->handle->type == UV_TCP);
struct worker_ctx *worker = t->worker;
struct qr_task *task = t->task;
assert(worker && task);
assert(worker);
void *ioreq = worker_iohandle_borrow(worker);
if (!ioreq) {
......@@ -922,16 +979,24 @@ ssize_t worker_gnutls_push(gnutls_transport_ptr_t h, const void *buf, size_t len
}
uv_write_t *write_req = (uv_write_t *)ioreq;
uv_buf_t uv_buf[1] = {
{ (char *)buf, len }
};
struct qr_task *task = t->task;
uv_write_cb write_cb = on_task_write;
if (t->handshake_state == TLS_HS_DONE) {
assert(task);
} else {
task = NULL;
write_cb = on_nontask_write;
}
write_req->data = task;
ssize_t ret = -1;
int res = uv_write(write_req, t->handle, uv_buf, 1, &on_write);
int res = uv_write(write_req, (uv_stream_t *)t->session->handle, uv_buf, 1, write_cb);
if (res == 0) {
qr_task_ref(task); /* Pending ioreq on current task */
if (task) {
qr_task_ref(task); /* Pending ioreq on current task */
}
if (worker->too_many_open &&
worker->stats.rconcurrent <
worker->rconcurrent_highwatermark - 10) {
......@@ -939,7 +1004,7 @@ ssize_t worker_gnutls_push(gnutls_transport_ptr_t h, const void *buf, size_t len
}
ret = len;
} else {
VERBOSE_MSG(NULL,"[tls] uv_write: %s\n", uv_strerror(res));
VERBOSE_MSG(NULL,"[tls_client] uv_write: %s\n", uv_strerror(res));
iorequest_release(worker, ioreq);
errno = EIO;
/* TODO ret == UV_EMFILE */
......@@ -958,18 +1023,14 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockad
assert(session->closing == false);
if (session->has_tls) {
struct kr_request *req = &task->ctx->req;
int ret = kr_ok();
if (!session->outgoing) {
ret = tls_push(task, handle, pkt);
} else {
ret = kr_resolve_checkout(req, NULL, addr,
SOCK_STREAM, pkt);
if (session->outgoing) {
int ret = kr_resolve_checkout(req, NULL, addr,
SOCK_STREAM, pkt);
if (ret != kr_ok()) {
return ret;
}
ret = tls_client_push(task, handle, pkt);
}
return ret;
return tls_push(task, handle, pkt, !session->outgoing);
}
int ret = 0;
......@@ -1015,7 +1076,7 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockad
{ (char *)pkt->wire, pkt->size }
};
write_req->data = task;
ret = uv_write(write_req, (uv_stream_t *)handle, buf, 2, &on_write);
ret = uv_write(write_req, (uv_stream_t *)handle, buf, 2, &on_task_write);
} else {
assert(false);
}
......@@ -1698,7 +1759,7 @@ static int qr_task_step(struct qr_task *task,
struct tls_client_paramlist_entry *entry = map_get(&net->tls_client_params, key);
if (entry) {
assert(session->tls_client_ctx == NULL);
struct tls_client_ctx_t *tls_ctx = tls_client_ctx_new(entry);
struct tls_client_ctx_t *tls_ctx = tls_client_ctx_new(entry, worker);
if (!tls_ctx) {
session_del_tasks(session, task);
session_del_waiting(session, task);
......
......@@ -92,6 +92,8 @@ void worker_iohandle_release(struct worker_ctx *worker, void *h);
ssize_t worker_gnutls_push(gnutls_transport_ptr_t h, const void *buf, size_t len);
ssize_t worker_gnutls_client_push(gnutls_transport_ptr_t h, const void *buf, size_t len);
/** @cond internal */
/** Number of request within timeout window. */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment