Commit 43b28017 authored by Marek Vavrusa

Cleanup and improvements in xfer/udp handling code.

parent 6692eb13
......@@ -50,10 +50,9 @@ typedef unsigned int uint; /*!< \brief Unsigned. */
#define CPU_ESTIMATE_MAGIC 0 /*!< \brief Extra threads to the number of cores.*/
#define DEFAULT_THR_COUNT 2 /*!< \brief Default thread count. */
#define DEFAULT_PORT 53531 /*!< \brief Default interface port. */
#define TCP_BACKLOG_SIZE 5 /*!< \brief TCP listen backlog size. */
#define TCP_BACKLOG_SIZE 10 /*!< \brief TCP listen backlog size. */
#define XFR_THREADS_COUNT 3 /*!< \brief Number of threads for XFR handler. */
#define RECVMMSG_BATCHLEN 16 /*!< \brief Define for recvmmsg() batch size. */
#define RECVMMSG_BATCHLEN 10 /*!< \brief Define for recvmmsg() batch size. */
///*! \brief If defined, zone structures will use hash table for lookup. */
//#define COMPRESSION_PEDANTIC
......
......@@ -834,11 +834,6 @@ int dt_optimal_size()
return DEFAULT_THR_COUNT;
}
/*!
* \note Use memory barriers or asynchronous read-only access, locking
* poses a thread performance decrease by 1.31%.
*/
int dt_is_cancelled(dthread_t *thread)
{
// Check input
......@@ -846,10 +841,7 @@ int dt_is_cancelled(dthread_t *thread)
return 0;
}
lock_thread_rw(thread);
int ret = thread->state & ThreadCancelled;
unlock_thread_rw(thread);
return ret;
return thread->state & ThreadCancelled; /* No need to be locked. */
}
unsigned dt_get_id(dthread_t *thread)
......
......@@ -127,6 +127,16 @@ static int server_init_iface(iface_t *new_if, conf_iface_t *cfg_if)
sock = ret;
}
/* Set socket options. */
#ifndef DISABLE_IPV6
int flag = 1;
if (cfg_if->family == AF_INET6) {
/* Disable dual-stack for performance reasons. */
if(setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, &flag, sizeof(flag)) < 0) {
dbg_net("udp: failed to set IPV6_V6ONLY to socket, using default config\n");
}
}
#endif
ret = socket_bind(sock, cfg_if->family, cfg_if->address, cfg_if->port);
if (ret < 0) {
socket_close(sock);
......@@ -141,21 +151,6 @@ static int server_init_iface(iface_t *new_if, conf_iface_t *cfg_if)
new_if->port = cfg_if->port;
new_if->addr = strdup(cfg_if->address);
/* Set socket options - voluntary. */
// int opt = 1024 * 1024;
// int snd_opt = 1024 * 1024;
// if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, &snd_opt, sizeof(snd_opt)) < 0) {
// ret = KNOT_ENOTSUP;
//// strerror_r(errno, ebuf, sizeof(ebuf));
//// log_server_warning("Failed to configure socket "
//// "write buffers: %s.\n", ebuf);
// }
// if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &opt, sizeof(opt)) < 0) {
// ret = KNOT_ENOTSUP;
//// strerror_r(errno, ebuf, sizeof(ebuf));
//// log_server_warning("Failed to configure socket read buffers: %s.\n", ebuf);
// }
/* Create TCP socket. */
ret = socket_create(cfg_if->family, SOCK_STREAM);
if (ret < 0) {
......@@ -170,11 +165,9 @@ static int server_init_iface(iface_t *new_if, conf_iface_t *cfg_if)
/* Set socket options. */
#ifndef DISABLE_IPV6
int flag = 1;
if (cfg_if->family == AF_INET6) {
/* Disable dual-stack for performance reasons. */
if(setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, &flag, sizeof(flag)) < 0) {
dbg_net("udp: failed to set IPV6_V6ONLY to socket, using default config\n");
dbg_net("tcp: failed to set IPV6_V6ONLY to socket, using default config\n");
}
}
#endif
......
......@@ -86,51 +86,25 @@ int udp_handle(int fd, uint8_t *qbuf, size_t qbuflen, size_t *resp_len,
strfrom, sockaddr_portnum(addr));
#endif
knot_packet_type_t qtype = KNOT_QUERY_NORMAL;
int res = KNOT_EOK;
int rcode = KNOT_RCODE_NOERROR;
knot_packet_type_t qtype = KNOT_QUERY_INVALID;
*resp_len = SOCKET_MTU_SZ;
knot_packet_t *packet =
knot_packet_new(KNOT_PACKET_PREALLOC_QUERY);
knot_packet_t *packet = knot_packet_new(KNOT_PACKET_PREALLOC_QUERY);
if (packet == NULL) {
dbg_net("udp: failed to create packet on fd=%d\n", fd);
int ret = knot_ns_error_response_from_query_wire(ns, qbuf, qbuflen,
KNOT_RCODE_SERVFAIL,
qbuf, resp_len);
if (ret != KNOT_EOK) {
return KNOT_EMALF;
}
return KNOT_EOK; /* Created error response. */
return ret;
}
/* Prepare RRL structs. */
rrl_req_t rrl_rq;
memset(&rrl_rq, 0, sizeof(rrl_req_t));
rrl_rq.w = qbuf; /* Wire */
/* Parse query. */
int res = knot_ns_parse_packet(qbuf, qbuflen, packet, &qtype);
if (rrl) rrl_rq.qst = &packet->question;
if (knot_unlikely(res != KNOT_EOK)) {
rcode = knot_ns_parse_packet(qbuf, qbuflen, packet, &qtype);
if (rcode < KNOT_RCODE_NOERROR) {
dbg_net("udp: failed to parse packet on fd=%d\n", fd);
if (res > 0) { /* Returned RCODE */
res = knot_ns_error_response_from_query(
ns, packet, res, qbuf, resp_len);
if (res != KNOT_EOK) {
knot_packet_free(&packet);
return KNOT_EMALF;
}
} else {
res = knot_ns_error_response_from_query_wire(
ns, qbuf, qbuflen, KNOT_RCODE_SERVFAIL, qbuf,
resp_len);
if (res != KNOT_EOK) {
knot_packet_free(&packet);
return res;
}
}
rcode = KNOT_RCODE_SERVFAIL;
}
/* Handle query. */
......@@ -143,10 +117,9 @@ int udp_handle(int fd, uint8_t *qbuf, size_t qbuflen, size_t *resp_len,
/* RFC1034, p.28 requires reliable transfer protocol.
* Bind responds with FORMERR.
*/
knot_ns_error_response_from_query(ns, packet,
KNOT_RCODE_FORMERR, qbuf,
resp_len);
res = KNOT_EOK;
res = knot_ns_error_response_from_query(ns, packet,
KNOT_RCODE_FORMERR, qbuf,
resp_len);
break;
case KNOT_QUERY_IXFR:
/* According to RFC1035, respond with SOA. */
......@@ -167,23 +140,26 @@ int udp_handle(int fd, uint8_t *qbuf, size_t qbuflen, size_t *resp_len,
/* Unhandled opcodes. */
case KNOT_RESPONSE_AXFR: /*!< Processed in XFR handler. */
case KNOT_RESPONSE_IXFR: /*!< Processed in XFR handler. */
knot_ns_error_response_from_query(ns, packet,
KNOT_RCODE_REFUSED, qbuf,
resp_len);
res = KNOT_EOK;
res = knot_ns_error_response_from_query(ns, packet,
KNOT_RCODE_REFUSED, qbuf,
resp_len);
break;
/* Unknown opcodes */
default:
knot_ns_error_response_from_query(ns, packet,
KNOT_RCODE_FORMERR, qbuf,
resp_len);
res = KNOT_EOK;
res = knot_ns_error_response_from_query(ns, packet,
rcode, qbuf,
resp_len);
break;
}
/* Process RRL. */
if (rrl) {
if (knot_unlikely(rrl != NULL)) {
rrl_req_t rrl_rq;
memset(&rrl_rq, 0, sizeof(rrl_req_t));
rrl_rq.w = qbuf; /* Wire */
rrl_rq.qst = &packet->question;
rcu_read_lock();
rrl_rq.flags = packet->flags;
if (rrl_query(rrl, addr, &rrl_rq, packet->zone) != KNOT_EOK) {
......@@ -539,7 +515,7 @@ int udp_master(dthread_t *thread)
}
fdset_begin(fds, &it);
while(nfds > 0) {
for (;;) {
_udp_handle(server, it.fd, rqdata);
if (fdset_next(fds, &it) != 0) {
break;
......
......@@ -974,7 +974,7 @@ int xfr_worker(dthread_t *thread)
}
/* Check pending threads. */
if (w->pending == 0) {
if (dt_is_cancelled(thread) || w->pending == 0) {
break;
}
......@@ -985,11 +985,6 @@ int xfr_worker(dthread_t *thread)
break;
}
/* Check for cancellation. */
if (dt_is_cancelled(thread)) {
break;
}
/* Iterate fdset. */
fdset_it_t it;
fdset_begin(w->pool.fds, &it);
......
......@@ -48,6 +48,7 @@ typedef enum knot_opcode {
* OPCODE and the QTYPE.
*/
typedef enum knot_packet_type {
KNOT_QUERY_INVALID, /*!< Invalid query. */
KNOT_QUERY_NORMAL, /*!< Normal query. */
KNOT_QUERY_AXFR, /*!< Request for AXFR transfer. */
KNOT_QUERY_IXFR, /*!< Request for IXFR transfer. */
......
......@@ -3130,6 +3130,7 @@ int knot_ns_parse_packet(const uint8_t *query_wire, size_t qsize,
dbg_ns_verb("Parsing packet...\n");
int ret = 0;
*type = KNOT_QUERY_INVALID;
if ((ret = knot_packet_parse_from_wire(packet, query_wire,
qsize, 1, 0)) != 0) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment