Commit 3cd85c1e authored by Vladimír Čunát's avatar Vladimír Čunát

Merge !675: daemon: first part of refactoring

- mainly the daemon/session.* files are separated,
  moving lots of logic from daemon/worker.*;
- lib/generic/queue.* are added;
- verbose logging gets different IDs;
- various minor changes around.
parents 776b6ad1 68d62808
Pipeline #41308 passed with stages
in 10 minutes and 32 seconds
......@@ -9,6 +9,7 @@ kresd_SOURCES := \
daemon/tls_ephemeral_credentials.c \
daemon/tls_session_ticket-srv.c \
daemon/zimport.c \
daemon/session.c \
daemon/main.c
kresd_DIST := daemon/lua/kres.lua daemon/lua/kres-gen.lua \
......
......@@ -33,6 +33,21 @@
#include "lib/cache/cdb_lmdb.h"
#include "lib/dnssec/ta.h"
/* Magic defaults for the engine. */
#ifndef LRU_RTT_SIZE
#define LRU_RTT_SIZE 65536 /**< NS RTT cache size */
#endif
#ifndef LRU_REP_SIZE
#define LRU_REP_SIZE (LRU_RTT_SIZE / 4) /**< NS reputation cache size */
#endif
#ifndef LRU_COOKIES_SIZE
#ifdef ENABLE_COOKIES
#define LRU_COOKIES_SIZE LRU_RTT_SIZE /**< DNS cookies cache size. */
#else
#define LRU_COOKIES_SIZE LRU_ASSOC /* simpler than guards everywhere */
#endif
#endif
/** @internal Compatibility wrapper for Lua < 5.2 */
#if LUA_VERSION_NUM < 502
#define lua_rawlen(L, obj) lua_objlen((L), (obj))
......@@ -608,6 +623,7 @@ static int l_trampoline(lua_State *L)
static int init_resolver(struct engine *engine)
{
/* Note: it had been zored by engine_init(). */
/* Open resolution context */
engine->resolver.trust_anchors = map_make(NULL);
engine->resolver.negative_anchors = map_make(NULL);
......
......@@ -16,33 +16,6 @@
#pragma once
/* Magic defaults */
#ifndef LRU_RTT_SIZE
#define LRU_RTT_SIZE 65536 /**< NS RTT cache size */
#endif
#ifndef LRU_REP_SIZE
#define LRU_REP_SIZE (LRU_RTT_SIZE / 4) /**< NS reputation cache size */
#endif
#ifndef LRU_COOKIES_SIZE
#define LRU_COOKIES_SIZE LRU_RTT_SIZE /**< DNS cookies cache size. */
#endif
#ifndef MP_FREELIST_SIZE
# ifdef __clang_analyzer__
# define MP_FREELIST_SIZE 0
# else
# define MP_FREELIST_SIZE 64 /**< Maximum length of the worker mempool freelist */
# endif
#endif
#ifndef RECVMMSG_BATCH
#define RECVMMSG_BATCH 4
#endif
#ifndef QUERY_RATE_THRESHOLD
#define QUERY_RATE_THRESHOLD (2 * MP_FREELIST_SIZE) /**< Nr of parallel queries considered as high rate */
#endif
#ifndef MAX_PIPELINED
#define MAX_PIPELINED 100
#endif
/*
* @internal These are forward decls to allow building modules with engine but without Lua.
*/
......
This diff is collapsed.
......@@ -25,39 +25,13 @@
struct tls_ctx_t;
struct tls_client_ctx_t;
/* Per-session (TCP or UDP) persistent structure,
* that exists between remote counterpart and a local socket.
*/
struct session {
bool outgoing; /**< True: to upstream; false: from a client. */
bool throttled;
bool has_tls;
bool connected;
bool closing;
union inaddr peer;
uv_handle_t *handle;
uv_timer_t timeout;
struct qr_task *buffering; /**< Worker buffers the incomplete TCP query here. */
struct tls_ctx_t *tls_ctx;
struct tls_client_ctx_t *tls_client_ctx;
uint8_t msg_hdr[4]; /**< Buffer for DNS message header. */
ssize_t msg_hdr_idx; /**< The number of bytes in msg_hdr filled so far. */
qr_tasklist_t tasks;
qr_tasklist_t waiting;
ssize_t bytes_to_skip;
};
void session_free(struct session *s);
struct session *session_new(void);
int udp_bind(uv_udp_t *handle, struct sockaddr *addr);
int udp_bindfd(uv_udp_t *handle, int fd);
int tcp_bind(uv_tcp_t *handle, struct sockaddr *addr);
int tcp_bind_tls(uv_tcp_t *handle, struct sockaddr *addr);
int tcp_bindfd(uv_tcp_t *handle, int fd);
int tcp_bindfd_tls(uv_tcp_t *handle, int fd);
void tcp_timeout_trigger(uv_timer_t *timer);
/** Initialize the handle, incl. ->data = struct session * instance.
* \param type = SOCK_*
......
......@@ -195,6 +195,7 @@ struct kr_request {
trace_callback_f trace_finish;
int vars_ref;
knot_mm_t pool;
unsigned int uid;
};
enum kr_rank {KR_RANK_INITIAL, KR_RANK_OMIT, KR_RANK_TRY, KR_RANK_INDET = 4, KR_RANK_BOGUS, KR_RANK_MISMATCH, KR_RANK_MISSING, KR_RANK_INSECURE, KR_RANK_AUTH = 16, KR_RANK_SECURE = 32};
struct kr_cache {
......
......@@ -727,20 +727,11 @@ int main(int argc, char **argv)
return EXIT_FAILURE;
}
uv_loop_t *loop = NULL;
/* Bind to passed fds and sockets*/
if (bind_fds(&engine.net, &args.fd_set, false) != 0 ||
bind_fds(&engine.net, &args.tls_fd_set, true) != 0 ||
bind_sockets(&engine.net, &args.addr_set, false) != 0 ||
bind_sockets(&engine.net, &args.tls_set, true) != 0
) {
ret = EXIT_FAILURE;
goto cleanup;
}
uv_loop_t *loop = uv_default_loop();
worker->loop = loop;
loop->data = worker;
/* Catch some signals. */
loop = uv_default_loop();
uv_signal_t sigint, sigterm;
if (true) ret = uv_signal_init(loop, &sigint);
if (!ret) ret = uv_signal_init(loop, &sigterm);
......@@ -766,10 +757,18 @@ int main(int argc, char **argv)
goto cleanup;
}
/* Bind to passed fds and sockets*/
if (bind_fds(&engine.net, &args.fd_set, false) != 0 ||
bind_fds(&engine.net, &args.tls_fd_set, true) != 0 ||
bind_sockets(&engine.net, &args.addr_set, false) != 0 ||
bind_sockets(&engine.net, &args.tls_set, true) != 0
) {
ret = EXIT_FAILURE;
goto cleanup;
}
/* Start the scripting engine */
engine_set_moduledir(&engine, args.moduledir);
worker->loop = loop;
loop->data = worker;
if (engine_load_sandbox(&engine) != 0) {
ret = EXIT_FAILURE;
......
This diff is collapsed.
/* Copyright (C) 2018 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
#include <stdbool.h>
#include <uv.h>
struct qr_task;
struct worker_ctx;
struct session;
struct session_flags {
bool outgoing : 1; /**< True: to upstream; false: from a client. */
bool throttled : 1; /**< True: data reading from peer is temporarily stopped. */
bool has_tls : 1; /**< True: given session uses TLS. */
bool connected : 1; /**< True: TCP connection is established. */
bool closing : 1; /**< True: session close sequence is in progress. */
bool wirebuf_error : 1; /**< True: last operation with wirebuf ended up with an error. */
};
/* Allocate new session for a libuv handle. */
struct session *session_new(uv_handle_t *handle);
/* Clear and free given session. */
void session_free(struct session *session);
/* Clear session. */
void session_clear(struct session *session);
/** Close session. */
void session_close(struct session *session);
/** Start reading from underlying libuv IO handle. */
int session_start_read(struct session *session);
/** Stop reading from underlying libuv IO handle. */
int session_stop_read(struct session *session);
/** List of tasks been waiting for IO. */
/** Check if list is empty. */
bool session_waitinglist_is_empty(const struct session *session);
/** Add task to the end of the list. */
int session_waitinglist_push(struct session *session, struct qr_task *task);
/** Get the first element. */
struct qr_task *session_waitinglist_get(const struct session *session);
/** Get the first element and remove it from the list. */
struct qr_task *session_waitinglist_pop(struct session *session, bool deref);
/** Get the list length. */
size_t session_waitinglist_get_len(const struct session *session);
/** Retry resolution for each task in the list. */
void session_waitinglist_retry(struct session *session, bool increase_timeout_cnt);
/** Finalize all tasks in the list. */
void session_waitinglist_finalize(struct session *session, int status);
/** List of tasks associated with session. */
/** Check if list is empty. */
bool session_tasklist_is_empty(const struct session *session);
/** Get the first element. */
struct qr_task *session_tasklist_get_first(struct session *session);
/** Get the first element and remove it from the list. */
struct qr_task *session_tasklist_del_first(struct session *session, bool deref);
/** Get the list length. */
size_t session_tasklist_get_len(const struct session *session);
/** Add task to the list. */
int session_tasklist_add(struct session *session, struct qr_task *task);
/** Remove task from the list. */
int session_tasklist_del(struct session *session, struct qr_task *task);
/** Remove task with given msg_id, session_flags(session)->outgoing must be true. */
struct qr_task* session_tasklist_del_msgid(const struct session *session, uint16_t msg_id);
/** Find task with given msg_id */
struct qr_task* session_tasklist_find_msgid(const struct session *session, uint16_t msg_id);
/** Finalize all tasks in the list. */
void session_tasklist_finalize(struct session *session, int status);
/** Finalize all expired tasks in the list. */
int session_tasklist_finalize_expired(struct session *session);
/** Both of task lists (associated & waiting). */
/** Check if empty. */
bool session_is_empty(const struct session *session);
/** Get pointer to session flags */
struct session_flags *session_flags(struct session *session);
/** Get peer address. */
struct sockaddr *session_get_peer(struct session *session);
/** Get pointer to server-side tls-related data. */
struct tls_ctx_t *session_tls_get_server_ctx(const struct session *session);
/** Set pointer to server-side tls-related data. */
void session_tls_set_server_ctx(struct session *session, struct tls_ctx_t *ctx);
/** Get pointer to client-side tls-related data. */
struct tls_client_ctx_t *session_tls_get_client_ctx(const struct session *session);
/** Set pointer to client-side tls-related data. */
void session_tls_set_client_ctx(struct session *session, struct tls_client_ctx_t *ctx);
/** Get pointer to that part of tls-related data which has common structure for
* server and client. */
struct tls_common_ctx *session_tls_get_common_ctx(const struct session *session);
/** Get pointer to underlying libuv handle for IO operations. */
uv_handle_t *session_get_handle(struct session *session);
struct session *session_get(uv_handle_t *h);
/** Start session timer. */
int session_timer_start(struct session *session, uv_timer_cb cb,
uint64_t timeout, uint64_t repeat);
/** Restart session timer without changing it parameters. */
int session_timer_restart(struct session *session);
/** Stop session timer. */
int session_timer_stop(struct session *session);
/** Get pointer to the beginning of session wirebuffer. */
uint8_t *session_wirebuf_get_start(struct session *session);
/** Get size of session wirebuffer. */
size_t session_wirebuf_get_size(struct session *session);
/** Get length of data in the session wirebuffer. */
size_t session_wirebuf_get_len(struct session *session);
/** Get pointer to the beginning of free space in session wirebuffer. */
uint8_t *session_wirebuf_get_free_start(struct session *session);
/** Get amount of free space in session wirebuffer. */
size_t session_wirebuf_get_free_size(struct session *session);
/** Discard all data in session wirebuffer. */
void session_wirebuf_discard(struct session *session);
/** Move all data to the beginning of the buffer. */
void session_wirebuf_compress(struct session *session);
int session_wirebuf_process(struct session *session);
ssize_t session_wirebuf_consume(struct session *session,
const uint8_t *data, ssize_t len);
/** poison session structure with ASAN. */
void session_poison(struct session *session);
/** unpoison session structure with ASAN. */
void session_unpoison(struct session *session);
knot_pkt_t *session_produce_packet(struct session *session, knot_mm_t *mm);
int session_discard_packet(struct session *session, const knot_pkt_t *pkt);
void session_kill_ioreq(struct session *s, struct qr_task *task);
/** Update timestamp */
void session_touch(struct session *s);
uint64_t session_last_input_activity(struct session *s);
This diff is collapsed.
......@@ -94,9 +94,10 @@ struct tls_common_ctx {
const uint8_t *buf;
ssize_t nread;
ssize_t consumed;
uint8_t recv_buf[4096];
uint8_t recv_buf[8192];
tls_handshake_cb handshake_cb;
struct worker_ctx *worker;
size_t write_queue_size;
};
struct tls_ctx_t {
......@@ -134,7 +135,7 @@ int tls_write(uv_write_t *req, uv_handle_t* handle, knot_pkt_t * pkt, uv_write_c
/*! Unwrap incoming data from a TLS stream and pass them to TCP session.
* @return the number of newly-completed requests (>=0) or an error code
*/
int tls_process(struct worker_ctx *worker, uv_stream_t *handle, const uint8_t *buf, ssize_t nread);
ssize_t tls_process_input_data(struct session *s, const uint8_t *buf, ssize_t nread);
/*! Set TLS certificate and key from files. */
int tls_certificate_set(struct network *net, const char *tls_cert, const char *tls_key);
......
This diff is collapsed.
......@@ -37,30 +37,17 @@ struct worker_ctx *worker_create(struct engine *engine, knot_mm_t *pool,
/**
* Process an incoming packet (query from a client or answer from upstream).
*
* @param worker the singleton worker
* @param handle socket through which the request came
* @param query the packet, or NULL on an error from the transport layer
* @param addr the address from which the packet came (or NULL, possibly, on error)
* @param session session the where packet came from
* @param query the packet, or NULL on an error from the transport layer
* @return 0 or an error code
*/
int worker_submit(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query,
const struct sockaddr* addr);
/**
* Process incoming DNS message fragment(s) that arrived over a stream (TCP, TLS).
*
* If the fragment contains only a partial message, it is buffered.
* If the fragment contains a complete query or completes current fragment, execute it.
* @return the number of newly-completed requests (>=0) or an error code
*/
int worker_process_tcp(struct worker_ctx *worker, uv_stream_t *handle,
const uint8_t *msg, ssize_t len);
int worker_submit(struct session *session, knot_pkt_t *query);
/**
* End current DNS/TCP session, this disassociates pending tasks from this session
* which may be freely closed afterwards.
*/
int worker_end_tcp(struct worker_ctx *worker, uv_handle_t *handle);
int worker_end_tcp(struct session *session);
/**
* Start query resolution with given query.
......@@ -83,16 +70,42 @@ struct kr_request *worker_task_request(struct qr_task *task);
/** Collect worker mempools */
void worker_reclaim(struct worker_ctx *worker);
/** Closes given session */
void worker_session_close(struct session *session);
void *worker_iohandle_borrow(struct worker_ctx *worker);
int worker_task_step(struct qr_task *task, const struct sockaddr *packet_source,
knot_pkt_t *packet);
void worker_iohandle_release(struct worker_ctx *worker, void *h);
int worker_task_numrefs(const struct qr_task *task);
/** Finalize given task */
int worker_task_finalize(struct qr_task *task, int state);
void worker_task_complete(struct qr_task *task);
void worker_task_ref(struct qr_task *task);
void worker_task_unref(struct qr_task *task);
void worker_task_timeout_inc(struct qr_task *task);
int worker_add_tcp_connected(struct worker_ctx *worker,
const struct sockaddr *addr,
struct session *session);
int worker_del_tcp_connected(struct worker_ctx *worker,
const struct sockaddr *addr);
knot_pkt_t *worker_task_get_pktbuf(const struct qr_task *task);
struct request_ctx *worker_task_get_request(struct qr_task *task);
struct session *worker_request_get_source_session(struct request_ctx *);
void worker_request_set_source_session(struct request_ctx *, struct session *session);
uint16_t worker_task_pkt_get_msgid(struct qr_task *task);
void worker_task_pkt_set_msgid(struct qr_task *task, uint16_t msgid);
uint64_t worker_task_creation_time(struct qr_task *task);
void worker_task_subreq_finalize(struct qr_task *task);
bool worker_task_finished(struct qr_task *task);
/** @cond internal */
/** Number of request within timeout window. */
......@@ -101,15 +114,16 @@ int worker_task_finalize(struct qr_task *task, int state);
/** Maximum response time from TCP upstream, milliseconds */
#define MAX_TCP_INACTIVITY (KR_RESOLVE_TIME_LIMIT + KR_CONN_RTT_MAX)
#ifndef RECVMMSG_BATCH /* see check_bufsize() */
#define RECVMMSG_BATCH 1
#endif
/** Freelist of available mempools. */
typedef array_t(void *) mp_freelist_t;
typedef array_t(struct mempool *) mp_freelist_t;
/** List of query resolution tasks. */
typedef array_t(struct qr_task *) qr_tasklist_t;
/** Session list. */
typedef array_t(struct session *) qr_sessionlist_t;
/** \details Worker state is meant to persist during the whole life of daemon. */
struct worker_ctx {
struct engine *engine;
......@@ -123,11 +137,8 @@ struct worker_ctx {
struct sockaddr_in out_addr4;
struct sockaddr_in6 out_addr6;
#if __linux__
uint8_t wire_buf[RECVMMSG_BATCH * KNOT_WIRE_MAX_PKTSIZE];
#else
uint8_t wire_buf[KNOT_WIRE_MAX_PKTSIZE];
#endif
struct {
size_t concurrent;
size_t rconcurrent;
......@@ -151,35 +162,9 @@ struct worker_ctx {
/** Subrequest leaders (struct qr_task*), indexed by qname+qtype+qclass. */
trie_t *subreq_out;
mp_freelist_t pool_mp;
mp_freelist_t pool_ioreqs;
mp_freelist_t pool_sessions;
mp_freelist_t pool_iohandles;
knot_mm_t pkt_pool;
unsigned int next_request_uid;
};
/* @internal Union of some libuv handles for freelist.
* These have session as their `handle->data` and own it.
* Subset of uv_any_handle. */
union uv_handles {
uv_handle_t handle;
uv_stream_t stream;
uv_udp_t udp;
uv_tcp_t tcp;
uv_timer_t timer;
};
typedef union uv_any_handle uv_handles_t;
/* @internal Union of derivatives from uv_req_t libuv request handles for freelist.
* These have only a reference to the task they're operating on.
* Subset of uv_any_req. */
union uv_reqs {
uv_req_t req;
uv_shutdown_t sdown;
uv_write_t write;
uv_connect_t connect;
uv_udp_send_t send;
};
typedef union uv_reqs uv_reqs_t;
/** @endcond */
......@@ -614,7 +614,8 @@ static int stash_rrarray_entry(ranked_rr_array_t *arr, int arr_i,
ssize_t written = stash_rrset(cache, qry, rr, rr_sigs, qry->timestamp.tv_sec,
entry->rank, nsec_pmap, has_optout);
if (written < 0) {
kr_log_error("[%5hu][cach] stash failed, ret = %d\n", qry->id, ret);
kr_log_error("[%05u.%02u][cach] stash failed, ret = %d\n", qry->request->uid,
qry->uid, ret);
return (int) written;
}
......
......@@ -91,8 +91,12 @@ void __asan_poison_memory_region(void const volatile *addr, size_t size);
void __asan_unpoison_memory_region(void const volatile *addr, size_t size);
#define kr_asan_poison(addr, size) __asan_poison_memory_region((addr), (size))
#define kr_asan_unpoison(addr, size) __asan_unpoison_memory_region((addr), (size))
#define kr_asan_custom_poison(fn, addr) fn ##_poison((addr))
#define kr_asan_custom_unpoison(fn, addr) fn ##_unpoison((addr))
#else
#define kr_asan_poison(addr, size)
#define kr_asan_unpoison(addr, size)
#define kr_asan_custom_poison(fn, addr)
#define kr_asan_custom_unpoison(fn, addr)
#endif
/* @endcond */
......@@ -7,6 +7,7 @@ doesn't allow custom allocation scheme. BSD-licensed (or compatible) code is all
as long as it comes with a test case in `tests/test_generics.c`.
* array_ - a set of simple macros to make working with dynamic arrays easier.
* queue_ - a FIFO + LIFO queue.
* map_ - a `Crit-bit tree`_ key-value map implementation (public domain) that comes with tests.
* set_ - set abstraction implemented on top of ``map`` (unused now).
* pack_ - length-prefixed list of objects (i.e. array-list).
......@@ -19,6 +20,12 @@ array
.. doxygenfile:: array.h
:project: libkres
queue
~~~~~
.. doxygenfile:: queue.h
:project: libkres
map
~~~
......
......@@ -21,7 +21,15 @@ typedef struct lru_group lru_group_t;
struct lru_item {
uint16_t key_len, val_len; /**< Two bytes should be enough for our purposes. */
char data[]; /**< Place for both key and value. */
char data[];
/**< Place for both key and value.
*
* We use "char" to satisfy the C99+ aliasing rules.
* See C99 section 6.5 Expressions, paragraph 7.
* Any type can be accessed through char-pointer,
* so we can use a common struct definition
* for all types being held.
*/
};
/** @internal Compute offset of value in struct lru_item. */
......
......@@ -24,32 +24,31 @@
* most frequent keys/hashes. This tracking is done for *more* keys than
* those that are actually stored.
*
* # Example usage:
*
* Example usage:
* @code{.c}
* // Define new LRU type
* typedef lru_t(int) lru_int_t;
*
* // Create LRU
* lru_int_t *lru;
* lru_create(&lru, 5, NULL);
* lru_create(&lru, 5, NULL, NULL);
*
* // Insert some values
* int *pi = lru_get_new(lru, "luke", strlen("luke"));
* int *pi = lru_get_new(lru, "luke", strlen("luke"), NULL);
* if (pi)
* *pi = 42;
* pi = lru_get_new(lru, "leia", strlen("leia"));
* pi = lru_get_new(lru, "leia", strlen("leia"), NULL);
* if (pi)
* *pi = 24;
*
* // Retrieve values
* int *ret = lru_get_try(lru, "luke", strlen("luke"));
* int *ret = lru_get_try(lru, "luke", strlen("luke"), NULL);
* if (!ret) printf("luke dropped out!\n");
* else printf("luke's number is %d\n", *ret);
*
* char *enemies[] = {"goro", "raiden", "subzero", "scorpion"};
* for (int i = 0; i < 4; ++i) {
* int *val = lru_get_new(lru, enemies[i], strlen(enemies[i]));
* int *val = lru_get_new(lru, enemies[i], strlen(enemies[i]), NULL);
* if (val)
* *val = i;
* }
......
/* Copyright (C) 2018 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "lib/generic/queue.h"
#include <string.h>
KR_EXPORT void queue_init_impl(struct queue *q, size_t item_size)
{
q->len = 0;
q->item_size = item_size;
q->head = q->tail = NULL;
/* Take 128 B (two x86 cache lines), except a small margin
* that the allocator can use for its overhead.
* Normally (64-bit pointers) this means 16 B header + 13*8 B data. */
q->chunk_cap = ( ((ssize_t)128) - offsetof(struct queue_chunk, data)
- sizeof(size_t)
) / item_size;
if (!q->chunk_cap) q->chunk_cap = 1; /* item_size big enough by itself */
}
KR_EXPORT void queue_deinit_impl(struct queue *q)
{
assert(q);
struct queue_chunk *p = q->head;
while (p != NULL) {
struct queue_chunk *pf = p;
p = p->next;
free(pf);
}
#ifndef NDEBUG
memset(q, 0, sizeof(*q));
#endif
}
static struct queue_chunk * queue_chunk_new(const struct queue *q)
{
struct queue_chunk *c = malloc(offsetof(struct queue_chunk, data)
+ q->chunk_cap * q->item_size);
if (unlikely(!c)) abort(); // simplify stuff
memset(c, 0, offsetof(struct queue_chunk, data));
c->cap = q->chunk_cap;
/* ->begin and ->end are zero, i.e. we optimize for _push
* and not _push_head, by default. */
return c;
}
/* Return pointer to the space for the new element. */
KR_EXPORT void * queue_push_impl(struct queue *q)
{
assert(q);
struct queue_chunk *t = q->tail; // shorthand
if (unlikely(!t)) {
assert(!q->head && !q->len);
q->head = q->tail = t = queue_chunk_new(q);
} else
if (t->end == t->cap) {
if (t->begin * 2 >= t->cap) {
/* Utilization is below 50%, so let's shift (no overlap). */
memcpy(t->data, t->data + t->begin * q->item_size,
(t->end - t->begin) * q->item_size);
t->end -= t->begin;
t->begin = 0;
} else {
/* Let's grow the tail by another chunk. */
assert(!t->next);
t->next = queue_chunk_new(q);
t = q->tail = t->next;
}
}
assert(t->end < t->cap);
++(q->len);
++(t->end);
return t->data + q->item_size * (t->end - 1);
}
/* Return pointer to the space for the new element. */
KR_EXPORT void * queue_push_head_impl(struct queue *q)
{
/* When we have choice, we optimize for further _push_head,
* i.e. when shifting or allocating a chunk,
* we store items on the tail-end of the chunk. */
assert(q);
struct queue_chunk *h = q->head; // shorthand
if (unlikely(!h)) {
assert(!q->tail && !q->len);
h = q->head = q->tail = queue_chunk_new(q);
h->begin = h->end = h->cap;
} else
if (h->begin == 0) {
if (h->end * 2 <= h->cap) {
/* Utilization is below 50%, so let's shift (no overlap).
* Computations here are simplified due to h->begin == 0. */
const int cnt = h->end;
memcpy(h->data + (h->cap - cnt) * q->item_size, h->data,
cnt * q->item_size);
h->begin = h->cap - cnt;
h->end = h->cap;
} else {
/* Let's grow the head by another chunk. */
h = queue_chunk_new(q);
h->next = q->head;
q->head = h;
h->begin = h->end = h->cap;
}
}
assert(h->begin > 0);
--(h->begin);
++(q->len);
return h->data + q->item_size * h->begin;
}
/* Copyright (C) 2018 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.