Commit 8664300f authored by Libor Peltan's avatar Libor Peltan

journal on-the-fly deserialization: support in journal

parent decb11b2
......@@ -135,6 +135,8 @@ src/knot/events/log.h
src/knot/events/replan.c
src/knot/events/replan.h
src/knot/include/module.h
src/knot/journal/chgset_ctx.c
src/knot/journal/chgset_ctx.h
src/knot/journal/journal.c
src/knot/journal/journal.h
src/knot/journal/serialization.c
......
......@@ -121,6 +121,8 @@ libknotd_la_SOURCES = \
knot/common/stats.h \
knot/server/dthreads.c \
knot/server/dthreads.h \
knot/journal/chgset_ctx.c \
knot/journal/chgset_ctx.h \
knot/journal/journal.c \
knot/journal/journal.h \
knot/journal/serialization.c \
......
/* Copyright (C) 2018 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "chgset_ctx.h"
#include "knot/journal/journal.h"
#include "knot/journal/serialization.h"
chgset_ctx_t *chgset_ctx_create(size_t chunk_count)
{
chgset_ctx_t *ch = calloc(1, sizeof(*ch));
if (ch != NULL) {
ch->chunk_count = chunk_count;
ch->src_chunks = calloc(chunk_count, sizeof(*ch->src_chunks));
ch->chunk_sizes = calloc(chunk_count, sizeof(*ch->chunk_sizes));
if (ch->src_chunks == NULL || ch->chunk_sizes == NULL) {
chgset_ctx_free(ch);
ch = NULL;
}
}
return ch;
}
void chgset_ctx_free(chgset_ctx_t *ch)
{
free(ch->src_chunks);
free(ch->chunk_sizes);
free(ch);
}
void chgset_ctx_list_close(chgset_ctx_list_t *l)
{
chgset_ctx_t *ch = NULL, *nxt = NULL;
WALK_LIST_DELSAFE(ch, nxt, l->l) {
chgset_ctx_free(ch);
}
journal_txn_commit(l->txn);
free(l->txn);
memset(l, 0, sizeof(*l));
}
void chgset_ctx_iterate(chgset_ctx_t *ch)
{
assert(ch->chunk_count > 0);
ch->curr_chunk = 0;
ch->wire = wire_ctx_init(ch->src_chunks[0], ch->chunk_sizes[0]);
ch->phase = CHGSET_CTX_START;
}
int chgset_ctx_next(chgset_ctx_t *ch, knot_rrset_t *rrset)
{
int ret = deserialize_rrset_chunks(&ch->wire, rrset, ch->src_chunks,
ch->chunk_sizes, ch->chunk_count, &ch->curr_chunk);
if (ret != KNOT_EOK) {
return ret;
}
if (ch->phase == CHGSET_CTX_START && rrset->type != KNOT_RRTYPE_SOA) {
return KNOT_EMALF;
}
if (ch->phase == CHGSET_CTX_SOA_FROM || ch->phase == CHGSET_CTX_SOA_TO ||
rrset->type == KNOT_RRTYPE_SOA) {
ch->phase++;
}
if (ch->curr_chunk == ch->chunk_count - 1 && wire_ctx_available(&ch->wire) == 0) {
ch->phase = CHGSET_CTX_DONE;
} else if (ch->phase == CHGSET_CTX_DONE) {
return KNOT_EMALF;
}
return ret;
}
/* Copyright (C) 2018 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "contrib/ucw/lists.h"
#include "contrib/wire_ctx.h"
#include "libknot/rrset.h"
typedef enum {
CHGSET_CTX_NOITER = 0,
CHGSET_CTX_START,
CHGSET_CTX_SOA_FROM,
CHGSET_CTX_REM,
CHGSET_CTX_SOA_TO,
CHGSET_CTX_ADD,
CHGSET_CTX_DONE,
} chgset_ctx_phase_t;
struct journal_txn; // journal.c
typedef struct {
node_t n;
uint8_t **src_chunks;
size_t *chunk_sizes;
size_t chunk_count;
size_t curr_chunk;
wire_ctx_t wire;
chgset_ctx_phase_t phase;
uint32_t serial_from;
uint32_t serial_to;
} chgset_ctx_t;
typedef struct {
list_t l;
struct journal_txn *txn;
} chgset_ctx_list_t;
chgset_ctx_t *chgset_ctx_create(size_t chunk_count);
void chgset_ctx_free(chgset_ctx_t *ch);
void chgset_ctx_list_close(chgset_ctx_list_t *l);
void chgset_ctx_iterate(chgset_ctx_t *ch);
int chgset_ctx_next(chgset_ctx_t *ch, knot_rrset_t *rrset);
......@@ -120,7 +120,7 @@ typedef struct {
uint32_t flags; // LAST_FLUSHED_VALID, SERIAL_TO_VALID, MERGED_SERIAL_VALID.
} metadata_t;
typedef struct {
typedef struct journal_txn {
journal_t *j;
knot_db_txn_t *txn;
int ret;
......@@ -416,6 +416,13 @@ static void txn_commit(txn_t *txn)
txn_abort(txn); // no effect if all ok
}
void journal_txn_commit(struct journal_txn *txn)
{
if (txn != NULL) {
txn_commit(txn);
}
}
static void txn_restart(txn_t *txn)
{
txn_commit(txn);
......@@ -815,6 +822,30 @@ static int vals_to_changeset(knot_db_val_t *vals, int nvals,
return KNOT_EOK;
}
static int vals_to_chgset_ctx(knot_db_val_t *vals, int nvals, uint32_t serial_from,
uint32_t serial_to, chgset_ctx_t **ch)
{
if (nvals < 1) {
return KNOT_EINVAL;
}
chgset_ctx_t *t_ch = chgset_ctx_create(nvals);
if (t_ch == NULL) {
return KNOT_ENOMEM;
}
for (size_t i = 0; i < nvals; i++) {
t_ch->src_chunks[i] = vals[i].data + JOURNAL_HEADER_SIZE;
t_ch->chunk_sizes[i] = vals[i].len - JOURNAL_HEADER_SIZE;
}
t_ch->serial_from = serial_from;
t_ch->serial_to = serial_to;
*ch = t_ch;
return KNOT_EOK;
}
static int load_one_itercb(iteration_ctx_t *ctx)
{
changeset_t *ch = NULL, **targ = ctx->iter_context;
......@@ -840,6 +871,19 @@ static int load_list_itercb(iteration_ctx_t *ctx)
return ret;
}
static int load_list_ctx_itercb(iteration_ctx_t *ctx)
{
chgset_ctx_t *ch = NULL;
list_t *chlist = *(list_t **) ctx->iter_context;
int ret = vals_to_chgset_ctx(ctx->val, ctx->chunk_count, ctx->serial, ctx->serial_to, &ch);
if (ret == KNOT_EOK) {
add_tail(chlist, &ch->n);
}
return ret;
}
/*! \brief Load one changeset (with serial) from DB */
static int load_one(journal_t *j, txn_t *_txn, uint32_t serial, changeset_t **ch)
{
......@@ -879,6 +923,8 @@ int journal_load_changesets(journal_t *j, list_t *dst, uint32_t from)
local_txn_t(txn, j);
txn_begin(txn, false);
// TODO this should not be necessary:
// the merged changeset can be normally loaded with standard iteration
changeset_t *mch = NULL;
load_merged_changeset(j, txn, &mch, &from);
if (mch != NULL) {
......@@ -894,6 +940,32 @@ int journal_load_changesets(journal_t *j, list_t *dst, uint32_t from)
return txn->ret;
}
int journal_load_chgset_ctx(journal_t *j, chgset_ctx_list_t *dst, uint32_t from)
{
if (j == NULL || j->db == NULL || dst == NULL) return KNOT_EINVAL;
txn_t *txn = calloc(1, sizeof(*txn) + sizeof(*txn->txn));
if (txn == NULL) {
return KNOT_ENOMEM;
}
txn_init(txn, ((void *)txn) + sizeof(*txn), j);
txn_begin(txn, false);
init_list(&dst->l);
dst->txn = txn;
list_t *dstl = &dst->l;
uint32_t ls = txn->shadow_md.last_serial;
iterate(j, txn, load_list_ctx_itercb, JOURNAL_ITERATION_CHANGESETS, &dstl, from,
ls, normal_iterkeycb);
if (txn->ret != KNOT_EOK) {
txn_commit(txn);
}
return txn->ret;
}
int load_bootstrap_iterkeycb(iteration_ctx_t *ctx)
{
txn_key_str_u32(ctx->txn, ctx->txn->j->zone, KEY_BOOTSTRAP_CHANGESET, ctx->chunk_index);
......
......@@ -22,6 +22,7 @@
#include "contrib/ucw/lists.h"
#include "knot/updates/changesets.h"
#include "knot/journal/serialization.h"
#include "knot/journal/chgset_ctx.h"
#include "knot/zone/serial.h"
/*! \brief Minimum journal size. */
......@@ -53,6 +54,8 @@ typedef enum {
JOURNAL_CHECK_STDERR = 3, // Log everything and redirect to stderr.
} journal_check_level_t;
struct journal_txn;
/*!
* \brief Initialize shared journal DB file. The DB will be open on first use.
*
......@@ -134,6 +137,8 @@ void journal_close(journal_t *journal);
*/
int journal_load_changesets(journal_t *journal, list_t *dst, uint32_t from);
int journal_load_chgset_ctx(journal_t *j, chgset_ctx_list_t *dst, uint32_t from);
/*!
* \brief Load changesets from journal, starting with bootstrap changeset.
*
......@@ -225,4 +230,6 @@ void journal_metadata_info(journal_t *journal, bool *is_empty, kserial_t *merged
*/
int journal_check(journal_t *journal, journal_check_level_t warn_level);
void journal_txn_commit(struct journal_txn *txn);
/*! @} */
......@@ -18,7 +18,6 @@
#include "knot/journal/serialization.h"
#include "libknot/libknot.h"
#include "contrib/wire_ctx.h"
#define SERIALIZE_RRSET_INIT (-1)
#define SERIALIZE_RRSET_DONE ((1L<<16)+1)
......@@ -241,9 +240,9 @@ static int deserialize_rrset(wire_ctx_t *wire, knot_rrset_t *rrset, long *phase)
return KNOT_EOK;
}
static int deserialize_rrset_chunks(wire_ctx_t *wire, knot_rrset_t *rrset,
uint8_t *src_chunks[], const size_t *chunk_sizes,
size_t chunks_count, size_t *cur_chunk)
int deserialize_rrset_chunks(wire_ctx_t *wire, knot_rrset_t *rrset,
uint8_t *src_chunks[], const size_t *chunk_sizes,
size_t chunks_count, size_t *cur_chunk)
{
long phase = SERIALIZE_RRSET_INIT;
while (1) {
......
......@@ -19,6 +19,7 @@
#include <stdint.h>
#include "libknot/rrset.h"
#include "knot/updates/changesets.h"
#include "contrib/wire_ctx.h"
typedef struct serialize_ctx serialize_ctx_t;
......@@ -79,3 +80,19 @@ size_t changeset_serialized_size(const changeset_t *ch);
*/
int changeset_deserialize(changeset_t *ch, uint8_t *src_chunks[],
const size_t *chunks_sizes, size_t chunks_count);
/*!
* \brief Deserializes single RRSet being part of a changeset serialized in chunks.
*
* \param wire[in] Current chunk ready to be parsed.
* \param rrset[out] RRSet to be deserialized (empty before).
* \param src_chunks[in] All chunks of the serialized changeset.
* \param chunk_sizes[in] Their sizes.
* \param chunks_count[in] Their count.
* \param cur_chunk[in+out] Index of current chunk.
*
* \return KNOT_E*
*/
int deserialize_rrset_chunks(wire_ctx_t *wire, knot_rrset_t *rrset,
uint8_t *src_chunks[], const size_t *chunk_sizes,
size_t chunks_count, size_t *cur_chunk);
......@@ -312,6 +312,25 @@ int zone_changes_load(conf_t *conf, zone_t *zone, list_t *dst, uint32_t from)
return ret;
}
int zone_chgset_ctx_load(conf_t *conf, zone_t *zone, chgset_ctx_list_t *dst, uint32_t from)
{
if (conf == NULL || zone == NULL || dst == NULL) {
return KNOT_EINVAL;
}
int ret = KNOT_ENOENT;
if (journal_exists(zone->journal_db, zone->name)) {
ret = open_journal(zone);
}
if (ret == KNOT_EOK) {
ret = journal_load_chgset_ctx(zone->journal, dst, from);
}
return ret;
}
int zone_in_journal_load(conf_t *conf, zone_t *zone, list_t *dst)
{
if (conf == NULL || zone == NULL || dst == NULL) {
......
/* Copyright (C) 2017 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
/* Copyright (C) 2018 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
......@@ -113,6 +113,7 @@ void zone_control_clear(zone_t *zone);
int zone_change_store(conf_t *conf, zone_t *zone, changeset_t *change);
int zone_changes_load(conf_t *conf, zone_t *zone, list_t *dst, uint32_t from);
int zone_chgset_ctx_load(conf_t *conf, zone_t *zone, chgset_ctx_list_t *dst, uint32_t from);
int zone_in_journal_load(conf_t *conf, zone_t *zone, list_t *dst);
int zone_in_journal_store(conf_t *conf, zone_t *zone, zone_contents_t *new_contents);
int zone_journal_serial(conf_t *conf, zone_t *zone, bool *is_empty, uint32_t *serial_to);
......
......@@ -296,6 +296,12 @@ static void test_store_load(void)
ret = journal_check(j, JOURNAL_CHECK_STDERR);
is_int(KNOT_EOK, ret, "journal check (%d)", ret);
/* Load ctx's. */
chgset_ctx_list_t cl = { { 0 }, 0 };
ret = journal_load_chgset_ctx(j, &cl, 0);
ok(ret == KNOT_EOK, "journal: chgset_ctx: load (%s)", knot_strerror(ret));
chgset_ctx_list_close(&cl);
changesets_free(&l);
changesets_free(&k);
......@@ -334,6 +340,11 @@ static void test_store_load(void)
ret = journal_load_changesets(j, &l, 1);
ok(ret == KNOT_EOK && changesets_list_eq(&l, &k), "journal: re-load changesets (%d)", ret);
ret = journal_load_chgset_ctx(j, &cl, 1);
ok(ret == KNOT_EOK, "journal: chgset_ctx: load 2 (%s)", knot_strerror(ret));
ok(list_size(&cl.l) == list_size(&l), "journal: chgset_ctx: load size %zu ?== %zu", list_size(&cl.l), list_size(&l));
chgset_ctx_list_close(&cl);
changesets_free(&l);
init_list(&l);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment