Commit 9773a383 authored by Marek Vavrusa

New packet API now in both UDP and TCP handlers.

(just for testing, will be refactored)
parent cba78c8b
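For orientation before the diff: the new API replaces the per-opcode switches with a small query state machine (ns_proc_begin / ns_proc_in / ns_proc_out / ns_proc_reset / ns_proc_finish). Below is a condensed sketch of the calling pattern, pieced together from the hunks that follow; server, rx and tx stand in for the handlers' own variables, and error paths are trimmed:

	/* Worker setup: one long-lived, mempool-backed context. */
	ns_proc_context_t query_ctx;
	memset(&query_ctx, 0, sizeof(query_ctx));
	query_ctx.ns = server->nameserver;
	mm_ctx_mempool(&query_ctx.mm, 2 * sizeof(knot_pkt_t));
	ns_proc_begin(&query_ctx, NS_PROC_QUERY);

	/* Per query: reset, feed input, drain output while more is pending. */
	uint16_t tx_len = tx->iov_len;
	ns_proc_reset(&query_ctx);
	int state = ns_proc_in(rx->iov_base, rx->iov_len, &query_ctx);
	while (state == NS_PROC_FULL || state == NS_PROC_FAIL) {
		state = ns_proc_out(tx->iov_base, &tx_len, &query_ctx);
		/* NS_PROC_FULL: another message follows (multi-packet answers);
		 * NS_PROC_FINISH: tx holds tx_len bytes ready to send. */
	}

	/* Worker teardown: close the machine, then drop the pool. */
	ns_proc_finish(&query_ctx);
	mp_delete(query_ctx.mm.ctx);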
--- a/src/knot/server/tcp-handler.c
+++ b/src/knot/server/tcp-handler.c
@@ -34,6 +34,7 @@
 #include "common/sockaddr.h"
 #include "common/fdset.h"
+#include "common/mempool.h"
 #include "knot/knot.h"
 #include "knot/server/tcp-handler.h"
 #include "knot/server/xfr-handler.h"
@@ -41,6 +42,7 @@
 #include "libknot/nameserver/name-server.h"
 #include "libknot/packet/wire.h"
 #include "libknot/dnssec/cleanup.h"
+#include "libknot/nameserver/ns_proc_query.h"
 
 /*! \brief TCP worker data. */
 typedef struct tcp_worker_t {
@@ -68,27 +70,6 @@ static inline int tcp_throttle() {
 	return (rand() % TCP_THROTTLE_HI) + TCP_THROTTLE_LO;
 }
 
-/*! \brief Send reply. */
-static int tcp_reply(int fd, uint8_t *qbuf, size_t resp_len)
-{
-	dbg_net("tcp: got answer of size %zd.\n",
-	        resp_len);
-
-	int res = 0;
-	if (resp_len > 0) {
-		res = tcp_send(fd, qbuf, resp_len);
-	}
-
-	/* Check result. */
-	if (res < 0 || (size_t)res != resp_len) {
-		dbg_net("tcp: %s: failed: %d - %d.\n",
-		        "socket_send()",
-		        res, errno);
-	}
-
-	return res;
-}
-
 /*! \brief Sweep TCP connection. */
 static enum fdset_sweep_state tcp_sweep(fdset_t *set, int i, void *data)
 {
@@ -139,27 +120,18 @@ static enum fdset_sweep_state tcp_sweep(fdset_t *set, int i, void *data)
  * and ensure that in case of good packet the response
  * is proper.
  */
-static int tcp_handle(tcp_worker_t *w, int fd, uint8_t *buf[], size_t qbuf_maxlen)
+static int tcp_handle(ns_proc_context_t *query_ctx, int fd,
+                      struct iovec *rx, struct iovec *tx)
 {
-	if (fd < 0 || !w || !w->ioh) {
-		dbg_net("tcp: tcp_handle(%p, %d) - invalid parameters\n", w, fd);
-		return KNOT_EINVAL;
-	}
-
 	dbg_net("tcp: handling TCP event on fd=%d in thread %p.\n",
 	        fd, (void*)pthread_self());
 
-	knot_nameserver_t *ns = w->ioh->server->nameserver;
-
 	/* Check address type. */
 	sockaddr_t addr;
 	sockaddr_prep(&addr);
 
 	/* Receive data. */
-	int n = tcp_recv(fd, buf[QBUF], qbuf_maxlen, &addr);
-	if (n <= 0) {
+	int ret = tcp_recv(fd, rx->iov_base, rx->iov_len, &addr);
+	if (ret <= 0) {
 		dbg_net("tcp: client on fd=%d disconnected\n", fd);
-		if (n == KNOT_EAGAIN) {
+		if (ret == KNOT_EAGAIN) {
 			char r_addr[SOCKADDR_STRLEN];
 			sockaddr_tostr(&addr, r_addr, sizeof(r_addr));
 			int r_port = sockaddr_portnum(&addr);
@@ -170,120 +142,32 @@ static int tcp_handle(tcp_worker_t *w, int fd, uint8_t *buf[], size_t qbuf_maxle
 			rcu_read_unlock();
 		}
 		return KNOT_ECONNREFUSED;
+	} else {
+		rx->iov_len = ret;
 	}
 
-	/* Parse query. */
-	size_t resp_len = qbuf_maxlen; // 64K
-	knot_packet_type_t qtype = KNOT_QUERY_NORMAL;
-	knot_pkt_t *packet = knot_pkt_new(buf[QBUF], n, NULL);
-	if (packet == NULL) {
-		int ret = knot_ns_error_response_from_query_wire(ns, buf[QBUF], n,
-		                                                 KNOT_RCODE_SERVFAIL,
-		                                                 buf[QRBUF], &resp_len);
-		if (ret == KNOT_EOK) {
-			tcp_reply(fd, buf[QRBUF], resp_len);
-		}
-		return KNOT_EOK;
-	}
-
-	int parse_res = knot_ns_parse_packet(packet, &qtype);
-	if (knot_unlikely(parse_res != KNOT_EOK)) {
-		if (parse_res > 0) { /* Returned RCODE */
-			int ret = knot_ns_error_response_from_query(ns, packet,
-			                                            parse_res, buf[QRBUF], &resp_len);
-			if (ret == KNOT_EOK) {
-				tcp_reply(fd, buf[QRBUF], resp_len);
-			}
-		}
-		knot_pkt_free(&packet);
-		return KNOT_EOK;
-	}
+	/* Reset context. */
+	uint16_t tx_len = tx->iov_len;
+	ns_proc_reset(query_ctx);
 
+	/* Input packet. */
+	int state = ns_proc_in(rx->iov_base, rx->iov_len, query_ctx);
+
+	/* Resolve until NOOP or finished. */
+	while (state == NS_PROC_FULL || state == NS_PROC_FAIL) {
+		state = ns_proc_out(tx->iov_base, &tx_len, query_ctx);
+
+		/* If it has response, send it. */
+		if (state == NS_PROC_FINISH || state == NS_PROC_FULL) {
+			if (tcp_send(fd, tx->iov_base, tx_len) != tx_len) {
+				ret = KNOT_ECONN;
+				break;
+			}
+			tx_len = tx->iov_len; /* Reset size. */
+		}
+	}
-	/* Handle query. */
-	int xfrt = -1;
-	knot_ns_xfr_t *xfr = NULL;
-	int res = KNOT_ERROR;
-	switch(qtype) {
-
-	/* Query types. */
-	case KNOT_QUERY_NORMAL:
-		//res = knot_ns_answer_normal(ns, packet, qbuf, &resp_len);
-		if (zones_normal_query_answer(ns, packet, &addr,
-		                              buf[QRBUF], &resp_len,
-		                              NS_TRANSPORT_TCP) == KNOT_EOK) {
-			res = KNOT_EOK;
-		}
-		break;
-	case KNOT_QUERY_AXFR:
-	case KNOT_QUERY_IXFR:
-		if (qtype == KNOT_QUERY_IXFR) {
-			xfrt = XFR_TYPE_IOUT;
-		} else {
-			xfrt = XFR_TYPE_AOUT;
-		}
-
-		/* Answer from query. */
-		xfr = xfr_task_create(NULL, xfrt, XFR_FLAG_TCP);
-		if (xfr == NULL) {
-			knot_ns_error_response_from_query(ns, packet,
-			                                  KNOT_RCODE_SERVFAIL,
-			                                  buf[QRBUF], &resp_len);
-			res = KNOT_EOK;
-			break;
-		}
-		xfr->session = fd;
-		xfr->wire = buf[QRBUF];
-		xfr->wire_size = qbuf_maxlen;
-		xfr->query = packet;
-		xfr_task_setaddr(xfr, &addr, NULL);
-		res = xfr_answer(ns, xfr);
-		knot_pkt_free(&packet);
-		return res;
-
-	case KNOT_QUERY_UPDATE:
-		res = zones_process_update(ns, packet, &addr, buf[QRBUF], &resp_len,
-		                           fd, NS_TRANSPORT_TCP);
-		break;
-
-	case KNOT_QUERY_NOTIFY:
-		res = notify_process_request(ns, packet, &addr,
-		                             buf[QRBUF], &resp_len);
-		break;
-
-	/* Unhandled opcodes. */
-	case KNOT_RESPONSE_NOTIFY: /*!< Only in UDP. */
-	case KNOT_RESPONSE_NORMAL: /*!< TCP handler doesn't send queries. */
-	case KNOT_RESPONSE_AXFR:   /*!< Processed in XFR handler. */
-	case KNOT_RESPONSE_IXFR:   /*!< Processed in XFR handler. */
-		knot_ns_error_response_from_query(ns, packet,
-		                                  KNOT_RCODE_REFUSED,
-		                                  buf[QRBUF], &resp_len);
-		res = KNOT_EOK;
-		break;
-
-	/* Unknown opcodes. */
-	default:
-		knot_ns_error_response_from_query(ns, packet,
-		                                  KNOT_RCODE_FORMERR,
-		                                  buf[QRBUF], &resp_len);
-		res = KNOT_EOK;
-		break;
-	}
-
-	/* Send answer. */
-	if (res == KNOT_EOK) {
-		tcp_reply(fd, buf[QRBUF], resp_len);
-	} else {
-		dbg_net("tcp: failed to respond to query type=%d on fd=%d - %s\n",
-		        qtype, fd, knot_strerror(res));
-	}
-
-	knot_pkt_free(&packet);
-	return res;
+
+	return ret;
 }
 
 int tcp_accept(int fd)
@@ -578,19 +462,32 @@ int tcp_loop_worker(dthread_t *thread)
 	}
 #endif /* HAVE_CAP_NG_H */
 
-	uint8_t *buf[NBUFS];
-	for (unsigned i = 0; i < NBUFS; ++i) {
-		buf[i] = malloc(SOCKET_MTU_SZ);
-	}
-
-	/* Create TCP answering context. */
 	tcp_worker_t *w = thread->data;
-	if (w == NULL || buf[QBUF] == NULL || buf[QRBUF] == NULL) {
-		for (unsigned i = 0; i < NBUFS; ++i) {
-			free(buf[i]);
-		}
-		return KNOT_EINVAL;
-	}
+	int ret = KNOT_EOK;
+	ns_proc_context_t query_ctx;
+	memset(&query_ctx, 0, sizeof(query_ctx));
+	query_ctx.ns = w->ioh->server->nameserver;
+	mm_ctx_mempool(&query_ctx.mm, 2 * sizeof(knot_pkt_t));
+
+	/* Packet size not limited by EDNS. */
+	query_ctx.flags |= NS_PKTSIZE_NOLIMIT;
+
+	/* Create iovec abstraction. */
+	mm_ctx_t *mm = &query_ctx.mm;
+	struct iovec bufs[2];
+	for (unsigned i = 0; i < 2; ++i) {
+		bufs[i].iov_len = KNOT_WIRE_MAX_PKTSIZE;
+		bufs[i].iov_base = mm->alloc(mm->ctx, bufs[i].iov_len);
+		if (bufs[i].iov_base == NULL) {
+			ret = KNOT_ENOMEM;
+			goto finish;
+		}
+	}
+
+	/* Create query processing context. */
+	ns_proc_begin(&query_ctx, NS_PROC_QUERY);
 
 	/* Accept clients. */
 	dbg_net("tcp: worker %p started\n", w);
 	fdset_t *set = &w->set;
@@ -637,7 +534,7 @@ int tcp_loop_worker(dthread_t *thread)
 			if (fd == w->pipe[0]) {
 				tcp_loop_assign(fd, set);
 			} else {
-				int ret = tcp_handle(w, fd, buf, SOCKET_MTU_SZ);
+				int ret = tcp_handle(&query_ctx, fd, &bufs[0], &bufs[1]);
 				if (ret == KNOT_EOK) {
 					/* Update socket activity timer. */
 					fdset_set_watchdog(set, i, max_idle);
@@ -664,12 +561,9 @@ int tcp_loop_worker(dthread_t *thread)
 		}
 	}
 
 	/* Stop whole unit. */
-	for (unsigned i = 0; i < NBUFS; ++i) {
-		free(buf[i]);
-	}
-
 	dbg_net("tcp: worker %p finished\n", w);
-	return KNOT_EOK;
+finish:
+	mp_delete(mm->ctx);
+	return ret;
 }
 
 int tcp_handler_destruct(dthread_t *thread)
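A note on the buffer strategy in tcp_loop_worker above: the per-worker RX/TX buffers are carved from the context's memory pool rather than obtained with malloc(), so the single mp_delete() behind the finish label releases the buffers together with everything the query processor allocated. Condensed, using the same names as the hunk above (error handling trimmed):

	mm_ctx_t *mm = &query_ctx.mm;      /* pool-backed allocator */
	struct iovec bufs[2];              /* bufs[0] = RX, bufs[1] = TX */
	for (unsigned i = 0; i < 2; ++i) {
		bufs[i].iov_len = KNOT_WIRE_MAX_PKTSIZE;
		bufs[i].iov_base = mm->alloc(mm->ctx, bufs[i].iov_len);
	}
	/* ... tcp_handle(&query_ctx, fd, &bufs[0], &bufs[1]) per event ... */
	mp_delete(mm->ctx);                /* frees buffers and pool in one call */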
--- a/src/knot/server/udp-handler.c
+++ b/src/knot/server/udp-handler.c
@@ -105,15 +105,6 @@ static inline void udp_pps_begin() {}
 static inline void udp_pps_sample(unsigned n, unsigned thr_id) {}
 #endif
 
-/* Answering context. */
-struct answer_ctx
-{
-	server_t *srv;
-	mm_ctx_t *mm;
-	unsigned slip;
-};
-
 /*! \brief RRL reject procedure. */
 static size_t udp_rrl_reject(const knot_nameserver_t *ns,
                              const knot_pkt_t *packet,
@@ -141,7 +132,7 @@ static size_t udp_rrl_reject(const knot_nameserver_t *ns,
 	return 0; /* Discard response. */
 }
 
-int udp_handle(struct answer_ctx *ans, int fd, sockaddr_t *addr,
+int udp_handle(ns_proc_context_t *query_ctx, int fd, sockaddr_t *addr,
               struct iovec *rx, struct iovec *tx)
 {
 #ifdef DEBUG_ENABLE_BRIEF
@@ -152,138 +143,51 @@ int udp_handle(struct answer_ctx *ans, int fd, sockaddr_t *addr,
 	        strfrom, sockaddr_portnum(addr));
 #endif
 
-#ifdef PACKET_NG
-	ns_proc_context_t query_ctx = {0};
-	memcpy(&query_ctx.mm, ans->mm, sizeof(mm_ctx_t));
-	query_ctx.ns = ans->srv->nameserver;
-	ns_proc_begin(&query_ctx, NS_PROC_QUERY);
-	int state = ns_proc_in(rx->iov_base, rx->iov_len, &query_ctx);
-	if (state == NS_PROC_FULL) {
-		state = ns_proc_out(tx->iov_base, &tx_len, &query_ctx);
-	}
-	if (state == NS_PROC_FAIL) {
-		state = ns_proc_out(tx->iov_base, &tx_len, &query_ctx);
-	}
-	if (state == NS_PROC_FINISH) {
-		tx->iov_len = tx_len;
-	}
-	/*! \todo IXFR query */
-	/*! \todo NOTIFY query */
-	/*! \todo UPDATE */
-	ns_proc_finish(&query_ctx);
-	return KNOT_EOK;
-#else
-	int res = KNOT_EOK;
-	int rcode = KNOT_RCODE_NOERROR;
-	knot_nameserver_t *ns = ans->srv->nameserver;
-	rrl_table_t *rrl = ans->srv->rrl;
-	knot_packet_type_t qtype = KNOT_QUERY_INVALID;
+	/* Reset context. */
+	uint16_t tx_len = tx->iov_len;
+	ns_proc_reset(query_ctx);
 
 	/* The packet MUST contain at least DNS header.
 	 * If it doesn't, it's not a DNS packet and we should discard it.
 	 */
 	if (rx->iov_len < KNOT_WIRE_HEADER_SIZE) {
 		return KNOT_EFEWDATA;
 	}
 
+	/* Input packet. */
+	int state = ns_proc_in(rx->iov_base, rx->iov_len, query_ctx);
+
 #ifdef MIRROR_MODE
 	memcpy(tx->iov_base, rx->iov_base, rx->iov_len);
 	knot_wire_set_qr(tx->iov_base);
 	tx->iov_len = rx->iov_len;
 	return KNOT_EOK;
 #endif
 
-	knot_pkt_t *query = knot_pkt_new(rx->iov_base, rx->iov_len, ans->mm);
-	if (query == NULL) {
-		dbg_net("udp: failed to create packet\n");
-		int ret = knot_ns_error_response_from_query_wire(ns, rx->iov_base, rx->iov_len,
-		                                                 KNOT_RCODE_SERVFAIL,
-		                                                 tx->iov_base, &tx->iov_len);
-		return ret;
-	}
+	/* Process answer. */
+	if (state == NS_PROC_FULL) {
+		state = ns_proc_out(tx->iov_base, &tx_len, query_ctx);
+	}
 
-	/* Parse query. */
-	rcode = knot_ns_parse_packet(query, &qtype);
-	if (rcode < KNOT_RCODE_NOERROR) {
-		dbg_net("udp: failed to parse packet\n");
-		rcode = KNOT_RCODE_SERVFAIL;
-	}
+	/* Process error response (if failed). */
+	if (state == NS_PROC_FAIL) {
+		state = ns_proc_out(tx->iov_base, &tx_len, query_ctx);
+	}
 
-	/* Handle query. */
-	switch(qtype) {
-	case KNOT_QUERY_NORMAL:
-		res = zones_normal_query_answer(ns, query, addr, tx->iov_base,
-		                                &tx->iov_len, NS_TRANSPORT_UDP);
-		break;
-	case KNOT_QUERY_AXFR:
-		/* RFC1034, p.28 requires reliable transfer protocol.
-		 * Bind responds with FORMERR.
-		 */
-		res = knot_ns_error_response_from_query(ns, query,
-		                                        KNOT_RCODE_FORMERR, tx->iov_base,
-		                                        &tx->iov_len);
-		break;
-	case KNOT_QUERY_IXFR:
-		/* According to RFC1035, respond with SOA. */
-		res = zones_normal_query_answer(ns, query, addr,
-		                                tx->iov_base, &tx->iov_len,
-		                                NS_TRANSPORT_UDP);
-		break;
-	case KNOT_QUERY_NOTIFY:
-		res = notify_process_request(ns, query, addr,
-		                             tx->iov_base, &tx->iov_len);
-		break;
-	case KNOT_QUERY_UPDATE:
-		res = zones_process_update(ns, query, addr, tx->iov_base, &tx->iov_len,
-		                           fd, NS_TRANSPORT_UDP);
-		break;
-	/* Do not issue response to incoming response to avoid loops. */
-	case KNOT_RESPONSE_AXFR:   /*!< Processed in XFR handler. */
-	case KNOT_RESPONSE_IXFR:   /*!< Processed in XFR handler. */
-	case KNOT_RESPONSE_NORMAL:
-	case KNOT_RESPONSE_NOTIFY:
-	case KNOT_RESPONSE_UPDATE:
-		res = KNOT_EOK;
-		break;
-	/* Unknown opcodes */
-	default:
-		res = knot_ns_error_response_from_query(ns, query,
-		                                        rcode, tx->iov_base,
-		                                        &tx->iov_len);
-		break;
-	}
+	/* Send response only if finished successfully. */
+	if (state == NS_PROC_FINISH) {
+		tx->iov_len = tx_len;
+	} else {
+		tx->iov_len = 0;
+	}
 
-	/* Process RRL. */
-	if (knot_unlikely(rrl != NULL) && rrl->rate > 0) {
-		rrl_req_t rrl_rq;
-		memset(&rrl_rq, 0, sizeof(rrl_req_t));
-		rrl_rq.w = tx->iov_base; /* Wire */
-		rrl_rq.query = query;
-		rcu_read_lock();
-		rrl_rq.flags = query->flags;
-		if (rrl_query(rrl, addr, &rrl_rq, query->zone) != KNOT_EOK) {
-			tx->iov_len = udp_rrl_reject(ns, query, tx->iov_base,
-			                             KNOT_WIRE_MAX_PKTSIZE,
-			                             knot_wire_get_rcode(query->wire),
-			                             &ans->slip);
-		}
-		rcu_read_unlock();
-	}
-
-	knot_pkt_free(&query);
-	return res;
-#endif /* PACKET_NG */
+	/*! \todo Move RRL to IN processing. */
+//	rrl_table_t *rrl = ans->srv->rrl;
+//	/* Process RRL. */
+//	if (knot_unlikely(rrl != NULL) && rrl->rate > 0) {
+//		rrl_req_t rrl_rq;
+//		memset(&rrl_rq, 0, sizeof(rrl_req_t));
+//		rrl_rq.w = tx->iov_base; /* Wire */
+//		rrl_rq.query = query;
+//		rcu_read_lock();
+//		rrl_rq.flags = query->flags;
+//		if (rrl_query(rrl, addr, &rrl_rq, query->zone) != KNOT_EOK) {
+//			tx->iov_len = udp_rrl_reject(ns, query, tx->iov_base,
+//			                             KNOT_WIRE_MAX_PKTSIZE,
+//			                             knot_wire_get_rcode(query->wire),
+//			                             &ans->slip);
+//		}
+//		rcu_read_unlock();
+//	}
+
+	return KNOT_EOK;
 }
 
 /* Check for sendmmsg syscall. */
@@ -299,7 +203,7 @@ int udp_handle(struct answer_ctx *ans, int fd, sockaddr_t *addr,
 static void* (*_udp_init)(void) = 0;
 static int (*_udp_deinit)(void *) = 0;
 static int (*_udp_recv)(int, void *) = 0;
-static int (*_udp_handle)(struct answer_ctx *, void *) = 0;
+static int (*_udp_handle)(ns_proc_context_t *, void *) = 0;
 static int (*_udp_send)(void *) = 0;
 
 /* UDP recvfrom() request struct. */
@@ -354,7 +258,7 @@ static int udp_recvfrom_recv(int fd, void *d)
 	return 0;
 }
 
-static int udp_recvfrom_handle(struct answer_ctx *ans, void *d)
+static int udp_recvfrom_handle(ns_proc_context_t *ctx, void *d)
 {
 	struct udp_recvfrom *rq = (struct udp_recvfrom *)d;
@@ -364,7 +268,7 @@ static int udp_recvfrom_handle(struct answer_ctx *ans, void *d)
 	rq->iov[TX].iov_len = KNOT_WIRE_MAX_PKTSIZE;
 
 	/* Process received pkt. */
-	int ret = udp_handle(ans, rq->fd, &rq->addr, &rq->iov[RX], &rq->iov[TX]);
+	int ret = udp_handle(ctx, rq->fd, &rq->addr, &rq->iov[RX], &rq->iov[TX]);
 	if (ret != KNOT_EOK) {
 		rq->iov[TX].iov_len = 0;
 	}
@@ -505,7 +409,7 @@ static int udp_recvmmsg_recv(int fd, void *d)
 	return n;
 }
 
-static int udp_recvmmsg_handle(struct answer_ctx *st, void *d)
+static int udp_recvmmsg_handle(ns_proc_context_t *ctx, void *d)
 {
 	struct udp_recvmmsg *rq = (struct udp_recvmmsg *)d;
@@ -517,7 +421,7 @@ static int udp_recvmmsg_handle(struct answer_ctx *st, void *d)
 		rx->iov_len = rq->msgs[RX][i].msg_len; /* Received bytes. */
 		rq->addrs[i].len = rq->msgs[RX][i].msg_hdr.msg_namelen;
 
-		ret = udp_handle(st, rq->fd, rq->addrs + i, rx, tx);
+		ret = udp_handle(ctx, rq->fd, rq->addrs + i, rx, tx);
 		if (ret != KNOT_EOK) { /* Do not send. */
 			tx->iov_len = 0;
 		}
@@ -593,15 +497,18 @@ int udp_reader(iohandler_t *h, dthread_t *thread)
 	void *rq = _udp_init();
 	ifacelist_t *ref = NULL;
 
-	/* Create memory pool context. */
-	mm_ctx_t mm;
-	mm_ctx_mempool(&mm, 2 * sizeof(knot_pkt_t));
-
 	/* Create UDP answering context. */
-	struct answer_ctx ans_ctx;
-	ans_ctx.srv = h->server;
-	ans_ctx.slip = 0;
-	ans_ctx.mm = &mm;
+	ns_proc_context_t query_ctx;
+	memset(&query_ctx, 0, sizeof(query_ctx));
+	query_ctx.ns = h->server->nameserver;
+	mm_ctx_mempool(&query_ctx.mm, 2 * sizeof(knot_pkt_t));
+
+	/* Disable transfers over UDP. */
+	query_ctx.flags |= NS_QUERY_NO_AXFR;
+	query_ctx.flags |= NS_QUERY_NO_IXFR;
+
+	/* Create query processing context. */
+	ns_proc_begin(&query_ctx, NS_PROC_QUERY);
 
 	/* Chose select as epoll/kqueue has larger overhead for a
 	 * single or handful of sockets. */
@@ -654,18 +561,20 @@ int udp_reader(iohandler_t *h, dthread_t *thread)
 		for (unsigned fd = minfd; fd <= maxfd; ++fd) {
 			if (FD_ISSET(fd, &rfds)) {
 				while ((rcvd = _udp_recv(fd, rq)) > 0) {
-					_udp_handle(&ans_ctx, rq);
+					_udp_handle(&query_ctx, rq);
 					_udp_send(rq);
 					mp_flush(mm.ctx);
 					udp_pps_sample(rcvd, thr_id);
 				}
 			}
 		}
 	}
 
+	/* Close query processing context. */
+	ns_proc_finish(&query_ctx);
+
 	_udp_deinit(rq);
 	ref_release((ref_t *)ref);
-	mp_delete(mm.ctx);
+	mp_delete(query_ctx.mm.ctx);
 	return KNOT_EOK;
 }
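One asymmetry between the two handlers is worth calling out: per datagram the UDP reader only recycles the context (ns_proc_reset() inside udp_handle, mp_flush() in the loop above), while ns_proc_finish() and mp_delete() run once at teardown. The commit also configures the shared context differently per transport, since transfers need a reliable stream and TCP responses are not capped by the EDNS payload size. A sketch of the pairing as this diff implies it:

	/* Per transport: UDP refuses transfers, TCP lifts the size cap. */
	query_ctx.flags |= NS_QUERY_NO_AXFR;    /* UDP reader */
	query_ctx.flags |= NS_QUERY_NO_IXFR;    /* UDP reader */
	query_ctx.flags |= NS_PKTSIZE_NOLIMIT;  /* TCP worker */

	/* Per query: recycle, don't destroy. */
	ns_proc_reset(&query_ctx);       /* rewind the state machine */
	mp_flush(query_ctx.mm.ctx);      /* reclaim per-query pool memory */

	/* Once at shutdown: destroy. */
	ns_proc_finish(&query_ctx);
	mp_delete(query_ctx.mm.ctx);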