Commit 8d81ca01 authored by Vladimír Čunát's avatar Vladimír Čunát

Merge !210: support setting address for outgoing connections

Closes #158.
parents 8e2716f8 09645444
......@@ -601,6 +601,11 @@ For when listening on ``localhost`` just doesn't cut it.
have size of multiplies of 64 (64, 128, 192, ...). Setting padding to
value < 2 will disable it.
.. function:: net.outgoing_v4([string address])
Get/set the IPv4 address used to perform queries. There is also ``net.outgoing_v6`` for IPv6.
The default is ``nil``, which lets the OS choose any address.
Trust anchors and DNSSEC
^^^^^^^^^^^^^^^^^^^^^^^^
......
......@@ -22,6 +22,7 @@
#include "lib/cache.h"
#include "lib/cdb.h"
#include "lib/utils.h"
#include "daemon/bindings.h"
#include "daemon/worker.h"
#include "daemon/tls.h"
......@@ -422,6 +423,63 @@ static int net_tls_padding(lua_State *L)
return 1;
}
static int net_outgoing(lua_State *L, int family)
{
struct worker_ctx *worker = wrk_luaget(L);
union inaddr *addr;
if (family == AF_INET)
addr = (union inaddr*)&worker->out_addr4;
else
addr = (union inaddr*)&worker->out_addr6;
if (lua_gettop(L) == 0) { /* Return the current value. */
if (addr->ip.sa_family == AF_UNSPEC) {
lua_pushnil(L);
return 1;
}
if (addr->ip.sa_family != family) {
assert(false);
lua_error(L);
}
char addr_buf[INET6_ADDRSTRLEN];
int err;
if (family == AF_INET)
err = uv_ip4_name(&addr->ip4, addr_buf, sizeof(addr_buf));
else
err = uv_ip6_name(&addr->ip6, addr_buf, sizeof(addr_buf));
if (err)
lua_error(L);
lua_pushstring(L, addr_buf);
return 1;
}
if ((lua_gettop(L) != 1) || (!lua_isstring(L, 1) && !lua_isnil(L, 1))) {
format_error(L, "net.outgoing_vX takes one address string parameter or nil");
lua_error(L);
}
if (lua_isnil(L, 1)) {
addr->ip.sa_family = AF_UNSPEC;
return 1;
}
const char *addr_str = lua_tostring(L, 1);
int err;
if (family == AF_INET)
err = uv_ip4_addr(addr_str, 0, &addr->ip4);
else
err = uv_ip6_addr(addr_str, 0, &addr->ip6);
if (err) {
format_error(L, "net.outgoing_vX: failed to parse the address");
lua_error(L);
}
lua_pushboolean(L, true);
return 1;
}
static int net_outgoing_v4(lua_State *L) { return net_outgoing(L, AF_INET); }
static int net_outgoing_v6(lua_State *L) { return net_outgoing(L, AF_INET6); }
int lib_net(lua_State *L)
{
static const luaL_Reg lib[] = {
......@@ -433,6 +491,8 @@ int lib_net(lua_State *L)
{ "tcp_pipeline", net_pipeline },
{ "tls", net_tls },
{ "tls_padding", net_tls_padding },
{ "outgoing_v4", net_outgoing_v4 },
{ "outgoing_v6", net_outgoing_v6 },
{ NULL, NULL }
};
register_lib(L, "net", lib);
......
......@@ -46,6 +46,7 @@ int tcp_bind_tls(uv_tcp_t *handle, struct sockaddr *addr);
int tcp_bindfd(uv_tcp_t *handle, int fd);
int tcp_bindfd_tls(uv_tcp_t *handle, int fd);
/** Initialize the handle, incl. ->data = struct session * instance. type = SOCK_* */
void io_create(uv_loop_t *loop, uv_handle_t *handle, int type);
void io_deinit(uv_handle_t *handle);
void io_free(uv_handle_t *handle);
......
......@@ -94,9 +94,17 @@ static inline void req_release(struct worker_ctx *worker, struct req *req)
}
}
/*! @internal Create a UDP/TCP handle */
static uv_handle_t *ioreq_spawn(struct qr_task *task, int socktype)
/*! @internal Create a UDP/TCP handle for an outgoing AF_INET* connection.
* socktype is SOCK_* */
static uv_handle_t *ioreq_spawn(struct qr_task *task, int socktype, sa_family_t family)
{
bool precond = (socktype == SOCK_DGRAM || socktype == SOCK_STREAM)
&& (family == AF_INET || family == AF_INET6);
if (!precond) {
assert(false);
return NULL;
}
if (task->pending_count >= MAX_PENDING) {
return NULL;
}
......@@ -106,10 +114,30 @@ static uv_handle_t *ioreq_spawn(struct qr_task *task, int socktype)
return NULL;
}
io_create(task->worker->loop, handle, socktype);
/* Bind to outgoing address, according to IP v4/v6. */
union inaddr *addr;
if (family == AF_INET) {
addr = (union inaddr *)&task->worker->out_addr4;
} else {
addr = (union inaddr *)&task->worker->out_addr6;
}
int ret = 0;
if (addr->ip.sa_family != AF_UNSPEC) {
assert(addr->ip.sa_family == family);
if (socktype == SOCK_DGRAM) {
ret = uv_udp_bind((uv_udp_t *)handle, &addr->ip, 0);
} else {
ret = uv_tcp_bind((uv_tcp_t *)handle, &addr->ip, 0);
}
}
/* Set current handle as a subrequest type. */
struct session *session = handle->data;
session->outgoing = true;
int ret = array_push(session->tasks, task);
if (ret == 0) {
session->outgoing = true;
ret = array_push(session->tasks, task);
}
if (ret < 0) {
io_deinit(handle);
req_release(task->worker, (struct req *)handle);
......@@ -575,9 +603,9 @@ static void on_timeout(uv_timer_t *req)
static bool retransmit(struct qr_task *task)
{
if (task && task->addrlist && task->addrlist_count > 0) {
uv_handle_t *subreq = ioreq_spawn(task, SOCK_DGRAM);
struct sockaddr_in6 *choice = &((struct sockaddr_in6 *)task->addrlist)[task->addrlist_turn];
uv_handle_t *subreq = ioreq_spawn(task, SOCK_DGRAM, choice->sin6_family);
if (subreq) { /* Create connection for iterative query */
struct sockaddr_in6 *choice = &((struct sockaddr_in6 *)task->addrlist)[task->addrlist_turn];
if (qr_task_send(task, subreq, (struct sockaddr *)choice, task->pktbuf) == 0) {
task->addrlist_turn = (task->addrlist_turn + 1) % task->addrlist_count; /* Round robin */
return true;
......@@ -772,13 +800,15 @@ static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_sour
if (!conn) {
return qr_task_step(task, NULL, NULL);
}
uv_handle_t *client = ioreq_spawn(task, sock_type);
const struct sockaddr *addr =
packet_source ? packet_source : task->addrlist;
uv_handle_t *client = ioreq_spawn(task, sock_type, addr->sa_family);
if (!client) {
req_release(task->worker, (struct req *)conn);
return qr_task_step(task, NULL, NULL);
}
conn->data = task;
if (uv_tcp_connect(conn, (uv_tcp_t *)client, packet_source?packet_source:task->addrlist, on_connect) != 0) {
if (uv_tcp_connect(conn, (uv_tcp_t *)client, addr , on_connect) != 0) {
req_release(task->worker, (struct req *)conn);
return qr_task_step(task, NULL, NULL);
}
......@@ -1083,6 +1113,8 @@ struct worker_ctx *worker_create(struct engine *engine, knot_mm_t *pool,
worker->count = worker_count;
worker->engine = engine;
worker_reserve(worker, MP_FREELIST_SIZE);
worker->out_addr4.sin_family = AF_UNSPEC;
worker->out_addr6.sin6_family = AF_UNSPEC;
/* Register worker in Lua thread */
lua_pushlightuserdata(engine->L, worker);
lua_setglobal(engine->L, "__worker");
......
......@@ -78,6 +78,11 @@ struct worker_ctx {
int id;
int count;
unsigned tcp_pipeline_max;
/** Addresses to bind for outgoing connections or AF_UNSPEC. */
struct sockaddr_in out_addr4;
struct sockaddr_in6 out_addr6;
#if __linux__
uint8_t wire_buf[RECVMMSG_BATCH * KNOT_WIRE_MAX_PKTSIZE];
#else
......@@ -93,6 +98,7 @@ struct worker_ctx {
size_t dropped;
size_t timeout;
} stats;
map_t outgoing;
mp_freelist_t pool_mp;
mp_freelist_t pool_ioreq;
......@@ -120,14 +126,8 @@ struct qr_task
worker_cb_t on_complete;
void *baton;
struct {
union {
struct sockaddr_in ip4;
struct sockaddr_in6 ip6;
} addr;
union {
struct sockaddr_in ip4;
struct sockaddr_in6 ip6;
} dst_addr;
union inaddr addr;
union inaddr dst_addr;
uv_handle_t *handle;
} source;
uint32_t refs;
......
......@@ -78,11 +78,7 @@ struct kr_nsrep
unsigned reputation; /**< NS reputation */
const knot_dname_t *name; /**< NS name */
struct kr_context *ctx; /**< Resolution context */
union {
struct sockaddr ip;
struct sockaddr_in ip4;
struct sockaddr_in6 ip6;
} addr[KR_NSREP_MAXADDR]; /**< NS address(es) */
union inaddr addr[KR_NSREP_MAXADDR]; /**< NS address(es) */
};
/** @internal Address bytes for given family. */
......
......@@ -137,6 +137,13 @@ KR_EXPORT
int kr_pkt_put(knot_pkt_t *pkt, const knot_dname_t *name, uint32_t ttl,
uint16_t rclass, uint16_t rtype, const uint8_t *rdata, uint16_t rdlen);
/** Simple storage for IPx address or AF_UNSPEC. */
union inaddr {
struct sockaddr ip;
struct sockaddr_in ip4;
struct sockaddr_in6 ip6;
};
/** Address bytes for given family. */
KR_EXPORT KR_PURE
const char *kr_inaddr(const struct sockaddr *addr);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment