Commit 9775d0ea authored by Libor Peltan's avatar Libor Peltan

Journal: re-implemented completely, LMDB, all zones in one db

parent 0f8d449e
......@@ -258,6 +258,10 @@ src/knot/events/handlers/refresh.c
src/knot/events/handlers/update.c
src/knot/events/replan.c
src/knot/events/replan.h
src/knot/journal/serialization.c
src/knot/journal/serialization.h
src/knot/journal/journal.c
src/knot/journal/journal.h
src/knot/modules/dnsproxy/dnsproxy.c
src/knot/modules/dnsproxy/dnsproxy.h
src/knot/modules/dnstap/dnstap.c
......@@ -312,10 +316,6 @@ src/knot/query/requestor.c
src/knot/query/requestor.h
src/knot/server/dthreads.c
src/knot/server/dthreads.h
src/knot/server/journal.c
src/knot/server/journal.h
src/knot/server/serialization.c
src/knot/server/serialization.h
src/knot/server/server.c
src/knot/server/server.h
src/knot/server/tcp-handler.c
......@@ -562,7 +562,7 @@ tests/contrib/test_wire_ctx.c
tests/dthreads.c
tests/fake_server.h
tests/fdset.c
tests/journal.c
tests/journal_lmdb.c
tests/libknot/test_control.c
tests/libknot/test_cookies-client.c
tests/libknot/test_cookies-opt.c
......
......@@ -871,9 +871,17 @@ This option has no effect with enabled
\fIDefault:\fP off
.SS max\-journal\-size
.sp
Maximum size of the zone journal file.
Maximum size of the journal DB.
.sp
\fIDefault:\fP 2^64
\fIDefault:\fP 1 GiB
.sp
\fBNOTE:\fP
.INDENT 0.0
.INDENT 3.5
Decreasing this value will lead to discarding
the whole journal history of all zones.
.UNINDENT
.UNINDENT
.SS max\-zone\-size
.sp
Maximum size of the zone. The size is measured as size of the zone records
......
......@@ -1013,9 +1013,13 @@ is a master server for the zone.
max-journal-size
----------------
Maximum size of the zone journal file.
Maximum size of the journal DB.
*Default:* 2^64
*Default:* 1 GiB
.. NOTE::
Decreasing this value will lead to discarding
the whole journal history of all zones.
.. _zone_max_zone_size:
......
......@@ -335,10 +335,10 @@ libknotd_la_SOURCES = \
knot/common/stats.h \
knot/server/dthreads.c \
knot/server/dthreads.h \
knot/server/journal.c \
knot/server/journal.h \
knot/server/serialization.c \
knot/server/serialization.h \
knot/journal/journal.c \
knot/journal/journal.h \
knot/journal/serialization.c \
knot/journal/serialization.h \
knot/server/server.c \
knot/server/server.h \
knot/server/tcp-handler.c \
......
......@@ -1048,22 +1048,19 @@ char* conf_zonefile_txn(
char* conf_journalfile_txn(
conf_t *conf,
knot_db_txn_t *txn,
const knot_dname_t *zone)
knot_db_txn_t *txn)
{
if (zone == NULL) {
return NULL;
}
conf_val_t val;
conf_val_t val = conf_zone_get_txn(conf, txn, C_JOURNAL, zone);
const char *journal = conf_str(&val);
val = conf_default_get_txn(conf, txn, C_STORAGE);
char *storage = conf_abs_path(&val, NULL);
// Use default journalfile name pattern if not specified.
if (journal == NULL) {
journal = "%s.db";
}
val = conf_default_get_txn(conf, txn, C_JOURNAL);
char *journaldir = conf_abs_path(&val, storage);
free(storage);
return get_filename(conf, txn, zone, journal);
return journaldir;
}
size_t conf_udp_threads_txn(
......
......@@ -584,16 +584,12 @@ static inline char* conf_zonefile(
*
* \return Absolute journal file path string pointer.
*/
char* conf_journalfile_txn(
conf_t *conf,
knot_db_txn_t *txn,
const knot_dname_t *zone
);
char* conf_journalfile_txn(conf_t *conf,
knot_db_txn_t *txn);
static inline char* conf_journalfile(
conf_t *conf,
const knot_dname_t *zone)
conf_t *conf)
{
return conf_journalfile_txn(conf, &conf->read_txn, zone);
return conf_journalfile_txn(conf, &conf->read_txn);
}
/*!
......
......@@ -225,10 +225,14 @@ static const yp_item_t desc_remote[] = {
{ NULL }
};
/* Cap of 2 GiB (2 * 1024^3) applied by VIRT_MEM_LIMIT() on platforms whose
 * pointers are narrower than 8 bytes. */
#define VIRT_MEM_TOP (2LLU * 1024 * 1024 * 1024)
/* Yields x, clamped to VIRT_MEM_TOP when sizeof(void *) < 8 and x exceeds it. */
#define VIRT_MEM_LIMIT(x) (((sizeof(void *) < 8) && ((x) > VIRT_MEM_TOP)) ? VIRT_MEM_TOP : (x))
#define ZONE_ITEMS(FLAGS) \
{ C_STORAGE, YP_TSTR, YP_VSTR = { STORAGE_DIR }, FLAGS }, \
{ C_FILE, YP_TSTR, YP_VNONE, FLAGS }, \
{ C_JOURNAL, YP_TSTR, YP_VNONE, FLAGS }, \
{ C_MASTER, YP_TREF, YP_VREF = { C_RMT }, YP_FMULTI, { check_ref } }, \
{ C_DDNS_MASTER, YP_TREF, YP_VREF = { C_RMT }, YP_FNONE, { check_ref } }, \
{ C_NOTIFY, YP_TREF, YP_VREF = { C_RMT }, YP_FMULTI, { check_ref } }, \
......@@ -237,8 +241,6 @@ static const yp_item_t desc_remote[] = {
{ C_DISABLE_ANY, YP_TBOOL, YP_VNONE }, \
{ C_ZONEFILE_SYNC, YP_TINT, YP_VINT = { -1, INT32_MAX, 0, YP_STIME } }, \
{ C_IXFR_DIFF, YP_TBOOL, YP_VNONE }, \
{ C_MAX_JOURNAL_SIZE, YP_TINT, YP_VINT = { 0, INT64_MAX, INT64_MAX, YP_SSIZE }, \
FLAGS }, \
{ C_MAX_ZONE_SIZE, YP_TINT, YP_VINT = { 0, INT64_MAX, INT64_MAX, YP_SSIZE }, \
FLAGS }, \
{ C_KASP_DB, YP_TSTR, YP_VSTR = { "keys" }, FLAGS }, \
......@@ -256,6 +258,9 @@ static const yp_item_t desc_template[] = {
{ C_TIMER_DB, YP_TSTR, YP_VSTR = { "timers" }, CONF_IO_FRLD_ZONES }, \
{ C_GLOBAL_MODULE, YP_TDATA, YP_VDATA = { 0, NULL, mod_id_to_bin, mod_id_to_txt }, \
YP_FMULTI | CONF_IO_FRLD_MOD, { check_modref } }, \
{ C_JOURNAL, YP_TSTR, YP_VSTR = { "journal.db" }, CONF_IO_FRLD_ZONES }, \
{ C_MAX_JOURNAL_SIZE, YP_TINT, YP_VINT = { 1024 * 1024, VIRT_MEM_LIMIT(100LLU * 1024 * 1024 * 1024 * 1024), \
VIRT_MEM_LIMIT(20LLU * 1024 * 1024 * 1024), YP_SSIZE } }, \
{ NULL }
};
......
......@@ -514,6 +514,24 @@ int check_template(
return KNOT_EINVAL;
}
// Check journal.
conf_val_t journal = conf_rawid_get_txn(args->conf, args->txn, C_TPL,
C_JOURNAL, args->id, args->id_len);
if (journal.code == KNOT_EOK) {
args->err_str = "journal location in non-default template";
return KNOT_EINVAL;
}
// Check max-journal-size.
conf_val_t max_journal_size = conf_rawid_get_txn(args->conf, args->txn, C_TPL,
C_MAX_JOURNAL_SIZE, args->id, args->id_len);
if (max_journal_size.code == KNOT_EOK) {
args->err_str = "journal size in non-default template";
return KNOT_EINVAL;
}
return KNOT_EOK;
}
......
......@@ -930,9 +930,9 @@ static int zone_purge(zone_t *zone, ctl_args_t *args)
free(zonefile);
// Purge the zone journal.
char *journalfile = conf_journalfile(conf(), zone->name);
(void)unlink(journalfile);
free(journalfile);
if (journal_open(zone->journal, zone->journal_db, zone->name) == KNOT_EOK) {
(void)scrape_journal(zone->journal);
}
// Purge the zone timers.
(void)remove_timer_db(args->server->timers_db, args->server->zone_db,
......
/* Copyright (C) 2016 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <limits.h>
#include <stdio.h>
#include <sys/stat.h>
#include <stdarg.h>
#include "knot/journal/journal.h"
#include "knot/zone/serial.h"
#include "knot/common/log.h"
#include "contrib/files.h"
#include "contrib/endian.h"
/*! \brief Journal database name. */
#define DATA_DB_NAME "data"
/*! \brief Minimum journal size (1024 * 1024). */
#define FSLIMIT_MIN (1 * 1024 * 1024)
/*! \brief Changeset chunk size (60 * 1024). */
#define CHUNK_MAX (60 * 1024)
/*! \brief Journal version as a plain integer (10 means 1.0); initial_md_check() compares only its leading decimal digit. */
#define JOURNAL_VERSION ((uint32_t) 10)
/*! \brief Metadata DB key strings: MDKEY_GLOBAL_* keys are used without a zone prefix, MDKEY_PERZONE_* keys are prefixed with the zone name. */
#define MDKEY_GLOBAL_VERSION "version"
#define MDKEY_GLOBAL_JOURNAL_COUNT "journal_count"
#define MDKEY_GLOBAL_LAST_TOTAL_OCCUPIED "last_total_occupied"
#define MDKEY_GLOBAL_LAST_INSERTER_ZONE "last_inserter_zone"
#define MDKEY_PERZONE_OCCUPIED "occupied"
#define MDKEY_PERZONE_FLAGS "flags" // must match the key that txn_begin_md(flags)/txn_commit_md(flags) derive by stringifying the field name
/*!
 * \brief Bit flags kept in the per-zone "flags" metadata item
 *        (txn->shadow_md.flags) and tested with md_flag().
 */
enum {
LAST_FLUSHED_VALID = 1 << 0, /* "last flush is valid" flag. */
SERIAL_TO_VALID = 1 << 1, /* "last serial_to is valid" flag. */
MERGED_SERIAL_VALID= 1 << 2, /* "serial_from" of merged changeset is valid (NOTE(review): presumably refers to shadow_md.merged_serial - confirm) */
DIRTY_SERIAL_VALID = 1 << 3, /* "dirty_serial" is present in the DB */
};
/*!
 * \brief Decides from the zone's zonefile-sync option whether flushing is allowed.
 *
 * \param j  Journal whose zone configuration is consulted.
 *
 * \return 1 if the option is not set for the zone or its value is
 *         non-negative, 0 otherwise.
 */
static int journal_flush_allowed(journal_t *j)
{
	conf_val_t sync = conf_zone_get(conf(), C_ZONEFILE_SYNC, j->zone);

	if (sync.item == NULL) {
		// Option not configured ---> default behaviour, ie standard flush, no merge.
		return 1;
	}

	return (conf_int(&sync) >= 0) ? 1 : 0;
}
/*!
 * \brief Merging is allowed exactly when flushing is not.
 *
 * \return 1 if journal_flush_allowed() returns 0, otherwise 0.
 */
static int journal_merge_allowed(journal_t *j)
{
	// TODO think of other behaviour, e.g. setting
	const int flush_ok = journal_flush_allowed(j);

	return (flush_ok == 0);
}
/*!
 * \brief Returns the "to free" factor; currently the constant 2.0 for every journal.
 *
 * NOTE(review): presumably a multiplier of the space that needs to be freed -
 * confirm against the callers.
 */
static float journal_tofree_factor(journal_t *j)
{
	static const float factor = 2.0f;

	(void)j; // not consulted yet
	return factor;
}
/*!
 * \brief Returns the "min free" factor; currently the constant 0.33 for every journal.
 *
 * NOTE(review): presumably a fraction of the journal space that should stay
 * free - confirm against the callers.
 */
static float journal_minfree_factor(journal_t *j)
{
	static const float factor = 0.33f;

	(void)j; // not consulted yet
	return factor;
}
/*!
 * \brief Returns the "max txn" factor; currently the constant 0.05 for every journal.
 *
 * NOTE(review): presumably the largest fraction of the DB a single
 * transaction may take - confirm against the callers.
 */
static float journal_max_txn(journal_t *j)
{
	static const float factor = 0.05f;

	(void)j; // not consulted yet
	return factor;
}
/*
* ***************************** PART I *******************************
*
* Transaction manipulation functions
*
* ********************************************************************
*/
/*!
 * \brief Journal transaction context wrapping one underlying DB transaction.
 *
 * The helpers in this part store their status in \a ret and become no-ops
 * once it is no longer KNOT_EOK, so a sequence of calls can be checked once
 * at the end. KNOT_ESEMCHECK in \a ret marks "no transaction active": it is
 * set by txn_init() and after a successful txn_commit()/txn_abort().
 */
typedef struct {
journal_t *j; // journal this transaction operates on
knot_db_txn_t *txn; // underlying DB transaction handle
int ret; // running status code (see above)
int is_rw; // 1 if begun with write access, 0 if read-only (set by txn_begin())
knot_db_iter_t *iter; // active DB iterator, or NULL
knot_db_val_t key; // key used by find/insert/del/seek; txn_init() points it at key_raw
knot_db_val_t val; // value returned by find/iter_val, or value to be inserted
uint8_t key_raw[512]; // backing buffer for composed keys
journal_metadata_t shadow_md; // per-zone metadata loaded by txn_begin() and written back by txn_commit() for rw transactions
} txn_t;
static void md_get(txn_t *txn, const knot_dname_t *zone, const char *mdkey, uint32_t *res);
static void md_set(txn_t *txn, const knot_dname_t *zone, const char *mdkey, uint32_t val);
/*!
 * \brief Prepares a txn_t for use without starting any DB transaction.
 *
 * The status is set to KNOT_ESEMCHECK, which txn_begin() requires. The
 * is_rw and shadow_md members are left untouched here.
 *
 * \param txn     Context to initialise.
 * \param db_txn  Storage for the underlying DB transaction.
 * \param j       Journal the transaction belongs to.
 */
static void txn_init(txn_t *txn, knot_db_txn_t *db_txn, journal_t *j)
{
	txn->ret = KNOT_ESEMCHECK;
	txn->j = j;
	txn->txn = db_txn;
	txn->iter = NULL;

	// Key starts empty, backed by the embedded buffer.
	txn->key.data = &txn->key_raw;
	txn->key.len = 0;

	// No value yet.
	txn->val.data = NULL;
	txn->val.len = 0;
}
/*!
 * \brief Declares a function-local transaction context.
 *
 * Expands to three declarations in the current scope - a knot_db_txn_t, a
 * txn_t, and a pointer \a txn_name to that txn_t - followed by a txn_init()
 * call binding them to \a journal. Because it expands to several statements
 * it must not be used as the sole body of an unbraced if/else/loop.
 */
#define local_txn_t(txn_name, journal) \
knot_db_txn_t __db_txn_ ## txn_name; \
txn_t __local_txn_ ## txn_name; \
txn_t *txn_name = &__local_txn_ ## txn_name; \
txn_init(txn_name, &__db_txn_ ## txn_name, (journal))
/*!
 * \brief Composes txn->key as [zone name] followed by a NUL-terminated string.
 *
 * \param txn   Transaction whose key is written.
 * \param zone  Zone name used as the key prefix, or NULL for no prefix.
 * \param key   String part of the key; its terminator is included in the key.
 *
 * If the composed key would exceed the 512-byte key buffer, txn->ret is set
 * to KNOT_ERROR and the key data is not written.
 */
static void txn_key_str(txn_t *txn, const knot_dname_t *zone, const char *key)
{
	const size_t zone_size = (zone != NULL) ? knot_dname_size(zone) : 0;
	const size_t str_size = strlen(key) + 1; // terminator is part of the key

	txn->key.len = zone_size + str_size;
	if (txn->key.len > sizeof(txn->key_raw)) {
		txn->ret = KNOT_ERROR;
		return;
	}

	if (zone != NULL) {
		memcpy(txn->key.data, zone, zone_size);
	}
	memcpy(txn->key.data + zone_size, key, str_size);
}
/*!
 * \brief Composes txn->key as [zone name] followed by two big-endian 32-bit numbers.
 *
 * \param txn   Transaction whose key is written.
 * \param zone  Zone name used as the key prefix, or NULL for no prefix.
 * \param key1  First number (stored first).
 * \param key2  Second number.
 *
 * If the composed key would exceed the 512-byte key buffer, txn->ret is set
 * to KNOT_ERROR and the key data is not written.
 */
static void txn_key_2u32(txn_t *txn, const knot_dname_t *zone, uint32_t key1, uint32_t key2)
{
	const size_t zone_size = (zone != NULL) ? knot_dname_size(zone) : 0;

	txn->key.len = zone_size + 2*sizeof(uint32_t);
	if (txn->key.len > sizeof(txn->key_raw)) {
		txn->ret = KNOT_ERROR;
		return;
	}

	if (zone != NULL) {
		memcpy(txn->key.data, zone, zone_size);
	}

	// Both numbers are laid out back to back in network byte order.
	const uint32_t be_pair[2] = { htobe32(key1), htobe32(key2) };
	memcpy(txn->key.data + zone_size, be_pair, sizeof(be_pair));
}
/*!
 * \brief Compares txn->key with another key.
 *
 * Keys of different length are ordered by length alone (shorter first);
 * keys of equal length are ordered by memcmp() of their bytes.
 *
 * \return Negative, zero or positive, as for memcmp().
 */
static int txn_cmpkey(txn_t *txn, knot_db_val_t *key2)
{
	if (txn->key.len < key2->len) {
		return -1;
	}
	if (txn->key.len > key2->len) {
		return 1;
	}
	return memcmp(txn->key.data, key2->data, key2->len);
}
/*!
 * \brief Decodes txn->val as a big-endian 32-bit number.
 *
 * \param txn  Transaction holding the value (typically after txn_find()).
 * \param res  Output; written only when txn->ret was KNOT_EOK on entry.
 *
 * Does nothing if the transaction is already in an error state. If the
 * stored value does not have exactly four bytes, txn->ret is set to
 * KNOT_EMALF, *res is set to 0 and the value is NOT read (the original code
 * fell through and dereferenced it anyway).
 */
static void txn_val_u32(txn_t *txn, uint32_t *res)
{
	if (txn->ret != KNOT_EOK) {
		return;
	}

	if (txn->val.len != sizeof(uint32_t)) {
		txn->ret = KNOT_EMALF;
		*res = 0; // keep the output defined for callers that do not pre-initialise it
		return;
	}

	// Copy out instead of casting: DB-owned memory need not be suitably aligned.
	uint32_t raw_be;
	memcpy(&raw_be, txn->val.data, sizeof(raw_be));
	*res = be32toh(raw_be);
}
/* The macros below expect a variable or parameter named `txn` (txn_t *) in scope where noted. */
/*! \brief Loads per-zone metadata item `md` into txn->shadow_md.md; the DB key is the stringified field name. Uses `txn` from the enclosing scope. */
#define txn_begin_md(md) md_get(txn, txn->j->zone, #md, &txn->shadow_md.md)
/*! \brief Stores txn->shadow_md.md under the stringified field name as per-zone metadata. Uses `txn` from the enclosing scope. */
#define txn_commit_md(md) md_set(txn, txn->j->zone, #md, txn->shadow_md.md)
/*! \brief Returns from a void function if the transaction status is not KNOT_EOK. */
#define txn_check(txn) if ((txn)->ret != KNOT_EOK) return
/*! \brief Returns the transaction status from the enclosing function if it is not KNOT_EOK. */
#define txn_check_ret(txn) if ((txn)->ret != KNOT_EOK) return ((txn)->ret)
/*! \brief Final return of a function: KNOT_ESEMCHECK (state after a successful commit/abort) is reported as KNOT_EOK, anything else as is. */
#define txn_ret(txn) return ((txn)->ret == KNOT_ESEMCHECK ? KNOT_EOK : (txn)->ret)
/*!
 * \brief Starts the underlying DB transaction and loads the per-zone metadata.
 *
 * \param txn            Context in the "not active" state (ret == KNOT_ESEMCHECK).
 * \param write_allowed  Non-zero for a read-write transaction, zero for read-only.
 *
 * If the context is not in the KNOT_ESEMCHECK state, txn->ret is set to
 * KNOT_EINVAL and nothing else happens. Otherwise txn->ret receives the
 * result of the DB txn_begin call; the metadata loads that follow are
 * skipped by md_get() when that result is not KNOT_EOK.
 */
static void txn_begin(txn_t *txn, int write_allowed)
{
	if (txn->ret != KNOT_ESEMCHECK) {
		txn->ret = KNOT_EINVAL;
		return;
	}

	const unsigned db_flags = (unsigned) (write_allowed ? 0 : KNOT_DB_RDONLY);
	txn->ret = txn->j->db->db_api->txn_begin(txn->j->db->db, txn->txn, db_flags);
	txn->is_rw = (write_allowed != 0);

	// Populate the shadow copy of the per-zone metadata.
	txn_begin_md(first_serial);
	txn_begin_md(last_serial);
	txn_begin_md(last_serial_to);
	txn_begin_md(last_flushed);
	txn_begin_md(merged_serial);
	txn_begin_md(dirty_serial);
	txn_begin_md(flags);
}
/*!
 * \brief Looks up txn->key and stores the DB result code in txn->ret.
 *
 * Unlike txn_find(), a lookup failure (including "not found") is left in
 * txn->ret. No-op when the transaction is not in the KNOT_EOK state.
 */
static void txn_find_force(txn_t *txn)
{
	if (txn->ret != KNOT_EOK) {
		return;
	}
	txn->ret = txn->j->db->db_api->find(txn->txn, &txn->key, &txn->val, 0);
}
/*!
 * \brief Looks up txn->key, treating "not found" as a non-error.
 *
 * \return 1 if the key was found (txn->val holds the value), 0 otherwise.
 *
 * KNOT_ENOENT from the lookup is reset to KNOT_EOK; any other failure stays
 * in txn->ret. Returns 0 immediately if the transaction was already not in
 * the KNOT_EOK state.
 */
static int txn_find(txn_t *txn)
{
	if (txn->ret != KNOT_EOK) {
		return 0;
	}

	txn_find_force(txn);

	if (txn->ret == KNOT_EOK) {
		return 1;
	}
	if (txn->ret == KNOT_ENOENT) {
		txn->ret = KNOT_EOK; // a missing key is not an error here
	}
	return 0;
}
/*!
 * \brief Inserts txn->val under txn->key; the result code goes to txn->ret.
 *
 * No-op when the transaction is not in the KNOT_EOK state.
 */
static void txn_insert(txn_t *txn)
{
	if (txn->ret != KNOT_EOK) {
		return;
	}
	txn->ret = txn->j->db->db_api->insert(txn->txn, &txn->key, &txn->val, 0);
}
/*!
 * \brief Deletes the record stored under txn->key; the result code goes to txn->ret.
 *
 * No-op when the transaction is not in the KNOT_EOK state.
 */
static void txn_del(txn_t *txn)
{
	if (txn->ret != KNOT_EOK) {
		return;
	}
	txn->ret = txn->j->db->db_api->del(txn->txn, &txn->key);
}
/*!
 * \brief Opens a DB iterator (requested with KNOT_DB_FIRST) and stores it in txn->iter.
 *
 * No-op when the transaction is not in the KNOT_EOK state. If the DB layer
 * returns no iterator, txn->ret is set to KNOT_ENOMEM.
 */
static void txn_iter_begin(txn_t *txn)
{
	if (txn->ret != KNOT_EOK) {
		return;
	}

	knot_db_iter_t *it = txn->j->db->db_api->iter_begin(txn->txn, KNOT_DB_FIRST);
	txn->iter = it;
	if (it == NULL) {
		txn->ret = KNOT_ENOMEM;
	}
}
/*!
 * \brief Guard for the iterator helpers; uses `txn` from the enclosing scope.
 *
 * Turns "status OK but no iterator" into KNOT_EINVAL, then returns from the
 * enclosing void function unless the status is KNOT_EOK. The expansion ends
 * with its own semicolon, so it is written without one at the call sites.
 */
#define txn_check_iter if (txn->iter == NULL && txn->ret == KNOT_EOK) txn->ret = KNOT_EINVAL; if (txn->ret != KNOT_EOK) return;
/*!
 * \brief Positions the iterator at txn->key.
 *
 * Requires an open iterator and KNOT_EOK status (see txn_check_iter). If the
 * DB layer returns no iterator, txn->iter becomes NULL and txn->ret is set
 * to KNOT_ENOENT.
 */
static void txn_iter_seek(txn_t *txn)
{
	txn_check_iter

	knot_db_iter_t *pos = txn->j->db->db_api->iter_seek(txn->iter, &txn->key, 0);
	txn->iter = pos;
	if (pos == NULL) {
		txn->ret = KNOT_ENOENT;
	}
}
/*!
 * \brief Fetches the key at the current iterator position into \a at_key.
 *
 * Requires an open iterator and KNOT_EOK status (see txn_check_iter); the
 * DB result code is stored in txn->ret.
 */
static void txn_iter_key(txn_t *txn, knot_db_val_t *at_key)
{
	txn_check_iter

	const int rc = txn->j->db->db_api->iter_key(txn->iter, at_key);
	txn->ret = rc;
}
/*!
 * \brief Fetches the value at the current iterator position into txn->val.
 *
 * Requires an open iterator and KNOT_EOK status (see txn_check_iter); the
 * DB result code is stored in txn->ret.
 */
static void txn_iter_val(txn_t *txn)
{
	txn_check_iter

	const int rc = txn->j->db->db_api->iter_val(txn->iter, &txn->val);
	txn->ret = rc;
}
/*!
 * \brief Advances the iterator by one record.
 *
 * Requires an open iterator and KNOT_EOK status (see txn_check_iter). If the
 * DB layer returns no iterator, txn->iter becomes NULL and txn->ret is set
 * to KNOT_ENOENT.
 */
static void txn_iter_next(txn_t *txn)
{
	txn_check_iter

	knot_db_iter_t *pos = txn->j->db->db_api->iter_next(txn->iter);
	txn->iter = pos;
	if (pos == NULL) {
		txn->ret = KNOT_ENOENT;
	}
}
/*!
 * \brief Closes the iterator, if one is open, and clears txn->iter.
 *
 * Does not touch txn->ret.
 */
static void txn_iter_finish(txn_t *txn)
{
	if (txn->iter == NULL) {
		return; // nothing open
	}
	txn->j->db->db_api->iter_finish(txn->iter);
	txn->iter = NULL;
}
/*!
 * \brief Aborts the DB transaction unless the context is in the "not active" state.
 *
 * Any open iterator is closed first. A KNOT_EOK status is replaced by
 * KNOT_ESEMCHECK afterwards; an error status is preserved so the caller can
 * still report it.
 */
static void txn_abort(txn_t *txn)
{
	if (txn->ret != KNOT_ESEMCHECK) {
		txn_iter_finish(txn);
		txn->j->db->db_api->txn_abort(txn->txn);

		if (txn->ret == KNOT_EOK) {
			txn->ret = KNOT_ESEMCHECK;
		}
	}
}
/*!
 * \brief Writes back the shadow metadata (rw transactions only) and commits.
 *
 * If the status is not KNOT_EOK after the metadata write-back, the
 * transaction is aborted instead and the error stays in txn->ret. On a
 * successful commit txn->ret becomes KNOT_ESEMCHECK; on a failed commit the
 * error is kept and txn_abort() is called.
 *
 * NOTE(review): after a failed DB txn_commit the same handle is passed to
 * the DB txn_abort - confirm the backend tolerates that.
 */
static void txn_commit(txn_t *txn)
{
	if (txn->is_rw) {
		txn_commit_md(first_serial);
		txn_commit_md(last_serial);
		txn_commit_md(last_serial_to);
		txn_commit_md(last_flushed);
		txn_commit_md(merged_serial);
		txn_commit_md(dirty_serial);
		txn_commit_md(flags);
	}

	if (txn->ret == KNOT_EOK) {
		txn_iter_finish(txn);
		txn->ret = txn->j->db->db_api->txn_commit(txn->txn);
		if (txn->ret == KNOT_EOK) {
			txn->ret = KNOT_ESEMCHECK;
		}
	}

	// Aborts on any error path; no effect if the commit went through.
	txn_abort(txn);
}
/*!
 * \brief Commits the transaction and, if that succeeded, begins a new one
 *        with the same access mode.
 *
 * When the commit fails, the error stays in txn->ret and no new transaction
 * is started.
 */
static void txn_restart(txn_t *txn)
{
	txn_commit(txn);
	if (txn->ret != KNOT_ESEMCHECK) {
		return;
	}
	txn_begin(txn, txn->is_rw);
}
/*!
 * \brief Either adopts an existing transaction or begins the local one.
 *
 * \param txn            In: pointer to the local context; out: redirected to
 *                       \a to_reuse when that is not NULL.
 * \param to_reuse       Transaction to piggy-back on, or NULL.
 * \param write_allowed  Access mode used only when a new transaction is begun.
 */
static void txn_reuse(txn_t **txn, txn_t *to_reuse, int write_allowed)
{
	if (to_reuse != NULL) {
		*txn = to_reuse;
		return;
	}
	txn_begin(*txn, write_allowed);
}
/*!
 * \brief Counterpart of txn_reuse(): commits only a transaction begun locally.
 *
 * \param txn     Pointer to the context used by the caller.
 * \param reused  The value passed as \a to_reuse to txn_reuse(); when not
 *                NULL the transaction belongs to someone else and is left alone.
 */
static void txn_unreuse(txn_t **txn, txn_t *reused)
{
	if (reused != NULL) {
		return;
	}
	txn_commit(*txn);
}
/*! \brief Declares a local transaction `name` (see local_txn_t) and then either begins it with access mode `wa` or, when `to_reuse` is not NULL, redirects `name` to that existing transaction. Expands to several statements. */
#define reuse_txn(name, journal, to_reuse, wa) local_txn_t(name, journal); txn_reuse(&name, to_reuse, wa)
/*! \brief Ends a reuse_txn() section: commits `name` only if it was begun locally (i.e. `reused` is NULL). */
#define unreuse_txn(name, reused) txn_unreuse(&name, reused)
/*
* ***************************** PART II ******************************
*
* DB metadata manip. and Chunk metadata headers
*
* ********************************************************************
*/
/*!
 * \brief Reads a 32-bit metadata item.
 *
 * \param txn    Transaction to read through.
 * \param zone   Zone name prefixing the key, or NULL for a global item.
 * \param mdkey  Metadata key string.
 * \param res    Output; receives 0 when the key does not exist. Not written
 *               at all if the transaction is not in the KNOT_EOK state on entry.
 */
static void md_get(txn_t *txn, const knot_dname_t *zone, const char *mdkey, uint32_t *res)
{
	if (txn->ret != KNOT_EOK) {
		return;
	}

	uint32_t value = 0; // default for a missing item
	txn_key_str(txn, zone, mdkey);
	if (txn_find(txn)) {
		txn_val_u32(txn, &value);
	}
	*res = value;
}
/*!
 * \brief Reads the global "last_inserter_zone" metadata item.
 *
 * \param txn  Transaction to read through.
 * \param res  Output; receives a copy of the stored zone name made with
 *             knot_dname_copy() (NOTE(review): presumably the caller frees
 *             it - confirm), or NULL when the item is missing, the copy
 *             fails, or the transaction is not in the KNOT_EOK state.
 *
 * The output is now always written, so callers never see an indeterminate
 * pointer on the early-return path.
 */
static void md_get_common_last_inserter_zone(txn_t *txn, knot_dname_t **res)
{
	*res = NULL;

	txn_check(txn);
	txn_key_str(txn, NULL, MDKEY_GLOBAL_LAST_INSERTER_ZONE);
	if (txn_find(txn)) {
		*res = knot_dname_copy(txn->val.data, NULL);
	}
}
/*!
 * \brief Stores \a zone as the global "last_inserter_zone" metadata item.
 *
 * \return txn->ret after the insert; returns the current status right away
 *         if the transaction is not in the KNOT_EOK state.
 *
 * txn->val is left pointing at \a zone afterwards.
 */
static int md_set_common_last_inserter_zone(txn_t *txn, knot_dname_t *zone)
{
	if (txn->ret != KNOT_EOK) {
		return txn->ret;
	}

	txn_key_str(txn, NULL, MDKEY_GLOBAL_LAST_INSERTER_ZONE);
	txn->val.data = zone;
	txn->val.len = knot_dname_size(zone);
	txn_insert(txn);

	return txn->ret;
}
/*!
 * \brief Reads the global "last_total_occupied" metadata item.
 *
 * \param txn  Transaction to read through.
 * \param res  Output; receives the stored 32-bit value widened to size_t,
 *             or 0 when the item is missing or the transaction is not in
 *             the KNOT_EOK state.
 *
 * The temporary is zero-initialised because md_get() does not write its
 * output when the transaction has already failed.
 */
static void md_get_common_last_occupied(txn_t *txn, size_t *res)
{
	uint32_t sres = 0;
	md_get(txn, NULL, MDKEY_GLOBAL_LAST_TOTAL_OCCUPIED, &sres);
	*res = (size_t) sres;
}
/*!
 * \brief Stores a 32-bit metadata item in big-endian form.
 *
 * \param txn    Transaction to write through.
 * \param zone   Zone name prefixing the key, or NULL for a global item.
 * \param mdkey  Metadata key string.
 * \param val    Value to store.
 *
 * The insert itself is skipped by txn_insert() when the transaction is not
 * in the KNOT_EOK state. txn->val is cleared before returning: it pointed at
 * a local buffer, and leaving it set would hand a dangling pointer to
 * whoever looks at txn->val next.
 */
static void md_set(txn_t *txn, const knot_dname_t *zone, const char *mdkey, uint32_t val)
{
	txn_key_str(txn, zone, mdkey);

	uint32_t val_be = htobe32(val);
	txn->val.len = sizeof(val_be);
	txn->val.data = &val_be;
	txn_insert(txn);

	// val_be goes out of scope on return; do not leave a pointer to it behind.
	txn->val.data = NULL;
	txn->val.len = 0;
}
/*!
 * \brief Tests a bit in the shadowed per-zone flags.
 *
 * \return 1 if any bit of \a flag is set in txn->shadow_md.flags, else 0.
 */
static int md_flag(txn_t *txn, int flag)
{
	return (txn->shadow_md.flags & flag) != 0;
}
/*!
 * \brief Marks the shadow metadata as flushed.
 *
 * Only acts when SERIAL_TO_VALID is set: last_flushed is then set to
 * last_serial and LAST_FLUSHED_VALID is raised.
 */
static void md_flush(txn_t *txn)
{
	if (!md_flag(txn, SERIAL_TO_VALID)) {
		return;
	}
	txn->shadow_md.last_flushed = txn->shadow_md.last_serial;
	txn->shadow_md.flags |= LAST_FLUSHED_VALID;
}
/*!
 * \brief Tells whether the shadow metadata describe a flushed journal.
 *
 * \return 1 if SERIAL_TO_VALID is not set, or if LAST_FLUSHED_VALID is set
 *         and last_flushed compares equal to last_serial; 0 otherwise.
 */
static int md_flushed(txn_t *txn)
{
	if (!md_flag(txn, SERIAL_TO_VALID)) {
		return 1;
	}
	if (!md_flag(txn, LAST_FLUSHED_VALID)) {
		return 0;
	}
	return (serial_compare(txn->shadow_md.last_flushed, txn->shadow_md.last_serial) == 0);
}
/*!
 * \brief Metadata placed at the beginning of each chunk.
 *
 * Both fields are stored in big-endian byte order (see make_header() and
 * unmake_header()).
 */
typedef struct {
uint32_t serial_to; // changeset's SOA-to serial
uint32_t chunk_count; // # of changeset's chunks
} journal_header_t;
/*!
 * \brief Writes a chunk header to the start of the buffer \a to.
 *
 * \param to           Destination; must be at least sizeof(journal_header_t) long.
 * \param serial_to    Changeset's SOA-to serial.
 * \param chunk_count  Number of chunks of the changeset; must be positive.
 *
 * Both preconditions are enforced with assert() only.
 */
static void make_header(knot_db_val_t *to, uint32_t serial_to, int chunk_count)
{
	assert(to->len >= sizeof(journal_header_t));
	assert(chunk_count > 0);

	const journal_header_t hdr = {
		.serial_to = htobe32(serial_to),
		.chunk_count = htobe32((uint32_t)chunk_count),
	};
	memcpy(to->data, &hdr, sizeof(hdr));
}
/*! \brief read properties from chunk header "from". All the output params are optional */
static void unmake_header(const knot_db_val_t *from, uint32_t *serial_to,
int *chunk_count, size_t *header_size)
{
assert(from->len >= sizeof(journal_header_t));
journal_header_t *h = (journal_header_t *)from->data;
if (serial_to != NULL) *serial_to = be32toh(h->serial_to);
assert(be32toh(h->chunk_count) <= INT_MAX);
if (chunk_count != NULL) *chunk_count = (int)be32toh(h->chunk_count);
if (header_size != NULL) *header_size = sizeof(*h);
}
/*!
 * \brief Computes the space limit for one journal from the global journal count.
 *
 * \return j->db->fslimit divided by (journal_count + 1), or 0 if the
 *         transaction is not in the KNOT_EOK state after reading the count.
 */
static size_t journal_max_occupied(journal_t *j, txn_t *txn)
{
	uint32_t jcnt;
	md_get(txn, NULL, MDKEY_GLOBAL_JOURNAL_COUNT, &jcnt);

	if (txn->ret != KNOT_EOK) {
		return 0;
	}
	return j->db->fslimit / (jcnt + 1);
}
/*!
 * \brief Returns the most significant decimal digit of a number.
 *
 * \param of  Number to inspect; values 0..9 are returned unchanged.
 */
static uint32_t first_digit(uint32_t of)
{
	for (; of >= 10; of /= 10) {
		// strip the least significant decimal digit
	}
	return of;
}
/*!
 * \brief Adds \a change_amount to the global "journal_count" metadata item.
 *
 * \param txn            Transaction to read and write through.
 * \param change_amount  Signed delta applied to the stored count.
 *
 * The counter is zero-initialised because md_get() does not write its
 * output when the transaction has already failed; previously an
 * indeterminate value was read in that case.
 */
static void md_update_journal_count(txn_t * txn, int change_amount)
{
	uint32_t jcnt = 0;
	md_get(txn, NULL, MDKEY_GLOBAL_JOURNAL_COUNT, &jcnt);
	md_set(txn, NULL, MDKEY_GLOBAL_JOURNAL_COUNT, jcnt + change_amount);
}
/*!
 * \brief Checks and initialises the metadata when a journal is first accessed.
 *
 * Within one read-write transaction:
 *  - stores JOURNAL_VERSION if no version is recorded yet, otherwise
 *    requires the leading decimal digit of the stored version to match;
 *  - increments the global journal count if this zone has no "flags" item yet;
 *  - reports whether the DIRTY_SERIAL_VALID flag is set.
 *
 * \param j              Journal to check.
 * \param dirty_present  Output: 1 if DIRTY_SERIAL_VALID is set, else 0. Stays
 *                       0 whenever the transaction is in an error state, so
 *                       the shadow metadata (which txn_begin() may not have
 *                       loaded in that case) is never read.
 *
 * \return KNOT_EOK on success, KNOT_ENOTSUP on a version mismatch, or the
 *         error recorded in the transaction.
 */
static int initial_md_check(journal_t *j, int *dirty_present)
{
	*dirty_present = 0;

	local_txn_t(txn, j);
	txn_begin(txn, 1);

	txn_key_str(txn, NULL, MDKEY_GLOBAL_VERSION);
	if (!txn_find(txn)) {
		md_set(txn, NULL, MDKEY_GLOBAL_VERSION, JOURNAL_VERSION);
	}
	else {
		uint32_t jver = 0; // defined even if the stored value cannot be decoded
		txn_val_u32(txn, &jver);
		if (first_digit(jver) != first_digit(JOURNAL_VERSION)) {
			txn_abort(txn);
			return KNOT_ENOTSUP;
		}
	}

	// A zone without a "flags" item has not been counted yet.
	txn_key_str(txn, j->zone, MDKEY_PERZONE_FLAGS);
	if (!txn_find(txn)) {
		md_update_journal_count(txn, +1);
	}

	if (txn->ret == KNOT_EOK) {
		*dirty_present = md_flag(txn, DIRTY_SERIAL_VALID);
	}

	txn_commit(txn);
	txn_ret(txn);
}
/*
* **************************** PART III ******************************
*
* DB iteration
*
* ********************************************************************
*/