Commit 2daebeb0 authored by Daniel Salzman's avatar Daniel Salzman

Merge branch 'journal_optimization' into 'master'

Journal optimization

See merge request !880
parents 3040aae1 dc1428e7
......@@ -23,6 +23,7 @@
#include "contrib/files.h"
#include "contrib/ctype.h"
#include "libknot/endian.h"
#include "contrib/dynarray.h"
/*! \brief Journal version. */
#define JOURNAL_VERSION "1.0"
......@@ -427,8 +428,7 @@ static void txn_reuse(txn_t **txn, txn_t *to_reuse, bool write_allowed)
{
if (to_reuse == NULL) {
txn_begin(*txn, write_allowed);
}
else {
} else {
*txn = to_reuse;
}
}
......@@ -480,8 +480,7 @@ static void md_get_common_last_inserter_zone(txn_t *txn, knot_dname_t **res)
txn_key_str(txn, NULL, MDKEY_GLOBAL_LAST_INSERTER_ZONE);
if (txn_find(txn)) {
*res = knot_dname_copy(txn->val.data, NULL);
}
else {
} else {
*res = NULL;
}
}
......@@ -614,8 +613,7 @@ static int initial_md_check(journal_t *j, bool *dirty_present)
txn->val.data = JOURNAL_VERSION;
txn_insert(txn);
something_updated = true;
}
else {
} else {
char * jver = txn->val.data;
if (first_digit(jver) != first_digit(JOURNAL_VERSION)) {
txn_abort(txn);
......@@ -631,8 +629,7 @@ static int initial_md_check(journal_t *j, bool *dirty_present)
if (something_updated) {
txn_commit(txn);
}
else { // abort to gain up speed when opening a lot of zones
} else { // abort to gain up speed when opening a lot of zones
txn_abort(txn);
}
......@@ -750,8 +747,7 @@ static int iterate(journal_t *j, txn_t *_txn, iteration_cb_t cb, int method,
ctx.serial = ctx.serial_to;
ctx.chunk_index = 0;
}
else {
} else {
ctx.chunk_index++;
}
......@@ -1008,8 +1004,7 @@ static int delete_merged_changeset(journal_t *j, txn_t *t)
txn_check_ret(txn);
if (!md_flag(txn, MERGED_SERIAL_VALID)) {
txn->ret = KNOT_ENOENT;
}
else {
} else {
delete_upto(j, txn, txn->shadow_md.merged_serial, txn->shadow_md.merged_serial);
}
unreuse_txn(txn, t);
......@@ -1249,6 +1244,9 @@ m_u_ch_end:
return txn->ret;
}
dynarray_declare(chunk, knot_db_val_t, DYNARRAY_VISIBILITY_STATIC, 32)
dynarray_define(chunk, knot_db_val_t, DYNARRAY_VISIBILITY_STATIC)
// uses local context, e.g.: j, txn, changesets, nchs, serialized_size_total, store_changeset_cleanup, inserting_merged
#define try_flush \
if (!md_flushed(txn)) { \
......@@ -1278,10 +1276,7 @@ static int store_changesets(journal_t *j, list_t *changesets)
size_t nchs = 0, inserted_size = 0, insert_txn_count = 1;
size_t serialized_size_changes = 0, serialized_size_merged = 0;
uint8_t *allchunks = NULL;
uint8_t **chunkptrs = NULL;
size_t *chunksizes = NULL;
knot_db_val_t *vals = NULL;
size_t chunks = 0;
bool inserting_merged = false;
bool merged_into_bootstrap = false;
......@@ -1346,7 +1341,8 @@ static int store_changesets(journal_t *j, list_t *changesets)
delete_tofree(j, txn, tofree, &freed);
if (freed < free_min) {
txn->ret = KNOT_ESPACE;
log_zone_warning(j->zone, "journal, unable to make free space for insert");
log_zone_warning(j->zone, "journal, unable to make free space for insert, "
"required: %zu, max: %zu", occupied, occupied_max);
goto store_changeset_cleanup;
}
}
......@@ -1416,54 +1412,34 @@ static int store_changesets(journal_t *j, list_t *changesets)
}
}
// PART 5: serializing into chunks
// PART 5: serializing into lmdb
WALK_LIST(ch, *changesets) {
if (txn->ret != KNOT_EOK) {
break;
}
// Twice chsize seems like enough room to store all chunks together.
size_t maxchunks = changeset_serialized_size(ch) * 2 / CHUNK_MAX + 1, chunks;
allchunks = malloc(maxchunks * CHUNK_MAX);
chunkptrs = malloc(maxchunks * sizeof(uint8_t *));
chunksizes = malloc(maxchunks * sizeof(size_t));
vals = malloc(maxchunks * sizeof(knot_db_val_t));
if (allchunks == NULL || chunkptrs == NULL || chunksizes == NULL || vals == NULL) {
chunk_dynarray_t dchunks = { 0 };
chunks = 0;
serialize_ctx_t *sctx = serialize_init(ch);
if (sctx == NULL) {
txn->ret = KNOT_ENOMEM;
break;
}
for (size_t i = 0; i < maxchunks; i++) {
chunkptrs[i] = allchunks + i*CHUNK_MAX + JOURNAL_HEADER_SIZE;
}
txn->ret = changeset_serialize(ch, chunkptrs, CHUNK_MAX - JOURNAL_HEADER_SIZE,
maxchunks, chunksizes, &chunks);
if (txn->ret != KNOT_EOK) {
break;
}
bool is_this_merged = (inserting_merged && ch == TAIL(*changesets));
bool is_this_bootstrap = (ch->soa_from == NULL);
uint32_t serial = is_this_bootstrap ? 0 : knot_soa_serial(&ch->soa_from->rrs);
uint32_t serial_to = knot_soa_serial(&ch->soa_to->rrs);
for (size_t i = 0; i < chunks; i++) {
vals[i].data = allchunks + i*CHUNK_MAX;
vals[i].len = JOURNAL_HEADER_SIZE + chunksizes[i];
make_header(vals + i, serial_to, chunks);
}
// PART 6: inserting vals into db
for (size_t i = 0; i < chunks; i++) {
if (txn->ret != KNOT_EOK) break;
if (is_this_bootstrap) {
txn_key_str_u32(txn, j->zone, KEY_BOOTSTRAP_CHANGESET, i);
}
else {
txn_key_2u32(txn, j->zone, serial, i);
while (serialize_unfinished(sctx)) {
size_t chunk_size;
serialize_prepare(sctx, CHUNK_MAX - JOURNAL_HEADER_SIZE, &chunk_size);
if (chunk_size == 0) {
break;
}
txn->val = vals[i];
txn_insert(txn);
inserted_size += (vals+i)->len;
inserted_size += chunk_size;
if ((float)inserted_size > journal_max_txn(j) * (float)j->db->fslimit) { // insert txn too large
inserted_size = 0;
txn->shadow_md.dirty_serial = serial;
......@@ -1472,8 +1448,33 @@ static int store_changesets(journal_t *j, list_t *changesets)
insert_txn_count++;
txn->shadow_md.flags &= ~DIRTY_SERIAL_VALID;
}
if (is_this_bootstrap) {
txn_key_str_u32(txn, j->zone, KEY_BOOTSTRAP_CHANGESET, chunks);
} else {
txn_key_2u32(txn, j->zone, serial, chunks);
}
txn->val.data = NULL;
txn->val.len = chunk_size + JOURNAL_HEADER_SIZE;
txn_insert(txn);
if (txn->ret != KNOT_EOK) break;
chunk_dynarray_add(&dchunks, &txn->val);
chunks++;
serialize_chunk(sctx, txn->val.data + JOURNAL_HEADER_SIZE, chunk_size);
}
serialize_deinit(sctx);
dynarray_foreach(chunk, knot_db_val_t, val, dchunks) {
make_header(val, serial_to, chunks);
}
chunk_dynarray_free(&dchunks);
// PART 7: metadata update
if (txn->ret != KNOT_EOK) {
break;
......@@ -1488,8 +1489,7 @@ static int store_changesets(journal_t *j, list_t *changesets)
txn->shadow_md.last_serial_to = serial_to;
}
txn->shadow_md.flags |= SERIAL_TO_VALID;
}
else {
} else {
if (!md_flag(txn, SERIAL_TO_VALID) || md_flag(txn, FIRST_SERIAL_INVALID)) {
txn->shadow_md.first_serial = serial;
}
......@@ -1499,15 +1499,6 @@ static int store_changesets(journal_t *j, list_t *changesets)
txn->shadow_md.last_serial_to = serial_to;
txn->shadow_md.changeset_count++;
}
free(allchunks);
free(chunkptrs);
free(chunksizes);
free(vals);
allchunks = NULL;
chunkptrs = NULL;
chunksizes = NULL;
vals = NULL;
}
// PART X : finalization and cleanup
......@@ -1525,11 +1516,6 @@ store_changeset_cleanup:
txn_commit(ddtxn);
}
if (allchunks != NULL) free(allchunks);
if (chunkptrs != NULL) free(chunkptrs);
if (chunksizes != NULL) free(chunksizes);
if (vals != NULL) free(vals);
changeset_t *dbgchst = TAIL(*changesets);
if (inserting_merged) {
......@@ -2026,13 +2012,11 @@ int journal_check(journal_t *j, journal_check_level_t warn_level)
ret = load_bootstrap_changeset(j, txn, &ch);
if (ret != KNOT_EOK) {
jch_warn("can't read bootstrap changeset (%s)", knot_strerror(ret));
}
else {
} else {
changeset_free(ch);
}
goto check_merged;
}
else {
} else {
ret = load_bootstrap_changeset(j, txn, &ch);
switch (ret) {
case KNOT_EOK:
......@@ -2067,8 +2051,7 @@ int journal_check(journal_t *j, journal_check_level_t warn_level)
if (ret != KNOT_EOK) {
jch_warn("can't read last flushed changeset %u (%s)",
txn->shadow_md.last_flushed, knot_strerror(ret));
}
else {
} else {
first_unflushed = knot_soa_serial(&ch->soa_to->rrs);
}
}
......@@ -2083,8 +2066,7 @@ int journal_check(journal_t *j, journal_check_level_t warn_level)
ret = load_one(j, txn, sto, &ch);
if (ret != KNOT_EOK) {
jch_warn("can't read second changeset %u (%s)", sto, knot_strerror(ret));
}
else {
} else {
sfrom = knot_soa_serial(&ch->soa_from->rrs);
if (!serial_equal(sfrom, sto)) {
jch_warn("second changeset's serial 'from' %u is not ok", sfrom);
......@@ -2130,8 +2112,7 @@ check_merged:
ret = load_merged_changeset(j, txn, &ch, NULL);
if (ret != KNOT_EOK) {
jch_warn("can't read merged changeset (%s)", knot_strerror(ret));
}
else {
} else {
sfrom = knot_soa_serial(&ch->soa_from->rrs);
sto = knot_soa_serial(&ch->soa_to->rrs);
jch_info("merged changeset %u -> %u (size %zu)", sfrom, sto,
......
......@@ -23,40 +23,173 @@
#define SERIALIZE_RRSET_INIT (-1)
#define SERIALIZE_RRSET_DONE ((1L<<16)+1)
static int serialize_rrset(wire_ctx_t *wire, const knot_rrset_t *rrset, long *phase)
typedef enum {
PHASE_SOA_1,
PHASE_REM,
PHASE_SOA_2,
PHASE_ADD,
PHASE_END,
} serialize_phase_t;
#define RRSET_BUF_MAXSIZE 256
struct serialize_ctx {
const changeset_t *ch;
changeset_iter_t it;
serialize_phase_t changeset_phase;
long rrset_phase;
knot_rrset_t rrset_buf[RRSET_BUF_MAXSIZE];
size_t rrset_buf_size;
};
serialize_ctx_t *serialize_init(const changeset_t *ch)
{
assert(wire != NULL && rrset != NULL && phase != NULL);
assert(*phase >= SERIALIZE_RRSET_INIT && *phase < SERIALIZE_RRSET_DONE);
serialize_ctx_t *ctx = calloc(1, sizeof(*ctx));
if (ctx == NULL) {
return NULL;
}
ctx->ch = ch;
ctx->changeset_phase = ch->soa_from != NULL ? PHASE_SOA_1 : PHASE_SOA_2;
ctx->rrset_phase = SERIALIZE_RRSET_INIT;
ctx->rrset_buf_size = 0;
return ctx;
}
if (*phase == SERIALIZE_RRSET_INIT) {
// write owner, type, class, rrcnt
int size = knot_dname_to_wire(wire->position, rrset->owner,
wire_ctx_available(wire));
if (size < 0 || wire_ctx_available(wire) < size + 3 * sizeof(uint16_t)) {
return KNOT_EOK;
static const knot_rrset_t get_next_rrset(serialize_ctx_t *ctx)
{
knot_rrset_t res;
knot_rrset_init_empty(&res);
switch (ctx->changeset_phase) {
case PHASE_SOA_1:
changeset_iter_rem(&ctx->it, ctx->ch);
ctx->changeset_phase = PHASE_REM;
return *ctx->ch->soa_from;
case PHASE_REM:
res = changeset_iter_next(&ctx->it);
if (knot_rrset_empty(&res)) {
changeset_iter_clear(&ctx->it);
changeset_iter_add(&ctx->it, ctx->ch);
ctx->changeset_phase = PHASE_ADD;
return *ctx->ch->soa_to;
}
wire_ctx_skip(wire, size);
wire_ctx_write_u16(wire, rrset->type);
wire_ctx_write_u16(wire, rrset->rclass);
wire_ctx_write_u16(wire, rrset->rrs.rr_count);
(*phase)++;
return res;
case PHASE_SOA_2:
if (ctx->it.node != NULL) {
changeset_iter_clear(&ctx->it);
}
changeset_iter_add(&ctx->it, ctx->ch);
ctx->changeset_phase = PHASE_ADD;
return *ctx->ch->soa_to;
case PHASE_ADD:
res = changeset_iter_next(&ctx->it);
if (knot_rrset_empty(&res)) {
changeset_iter_clear(&ctx->it);
ctx->changeset_phase = PHASE_END;
}
return res;
default:
return res;
}
}
for ( ; *phase < rrset->rrs.rr_count; (*phase)++) {
const knot_rdata_t *rr = knot_rdataset_at(&rrset->rrs, *phase);
assert(rr);
uint16_t rdlen = rr->len;
if (wire_ctx_available(wire) < sizeof(uint32_t) + sizeof(uint16_t) + rdlen) {
return KNOT_EOK;
void serialize_prepare(serialize_ctx_t *ctx, size_t max_size, size_t *realsize)
{
*realsize = 0;
// check if we are in middle of a rrset
if (ctx->rrset_phase != SERIALIZE_RRSET_INIT) {
assert(ctx->rrset_buf_size > 0);
ctx->rrset_buf[0] = ctx->rrset_buf[ctx->rrset_buf_size - 1];
ctx->rrset_buf_size = 1;
} else {
ctx->rrset_buf[0] = get_next_rrset(ctx);
if (ctx->changeset_phase == PHASE_END) {
ctx->rrset_buf_size = 0;
return;
}
// Compatibility, but one TTL per rrset would be enough.
wire_ctx_write_u32(wire, rrset->ttl);
wire_ctx_write_u16(wire, rdlen);
wire_ctx_write(wire, rr->data, rdlen);
ctx->rrset_buf_size = 1;
}
*phase = SERIALIZE_RRSET_DONE;
return KNOT_EOK;
size_t candidate = 0;
long tmp_phase = ctx->rrset_phase;
while (1) {
if (tmp_phase >= ctx->rrset_buf[ctx->rrset_buf_size - 1].rrs.rr_count) {
if (ctx->rrset_buf_size >= RRSET_BUF_MAXSIZE) {
return;
}
ctx->rrset_buf[ctx->rrset_buf_size++] = get_next_rrset(ctx);
if (ctx->changeset_phase == PHASE_END) {
ctx->rrset_buf_size--;
return;
}
tmp_phase = SERIALIZE_RRSET_INIT;
}
if (tmp_phase == SERIALIZE_RRSET_INIT) {
candidate += 3 * sizeof(uint16_t) +
knot_dname_size(ctx->rrset_buf[ctx->rrset_buf_size - 1].owner);
} else {
candidate += sizeof(uint32_t) + sizeof(uint16_t) +
knot_rdataset_at(&ctx->rrset_buf[ctx->rrset_buf_size - 1].rrs, tmp_phase)->len;
}
if (candidate > max_size) {
return;
}
*realsize = candidate;
tmp_phase++;
}
}
void serialize_chunk(serialize_ctx_t *ctx, uint8_t *dst_chunk, size_t chunk_size)
{
wire_ctx_t wire = wire_ctx_init(dst_chunk, chunk_size);
for (size_t i = 0; ; ) {
if (ctx->rrset_phase >= ctx->rrset_buf[i].rrs.rr_count) {
if (++i >= ctx->rrset_buf_size) {
break;
}
ctx->rrset_phase = SERIALIZE_RRSET_INIT;
}
if (ctx->rrset_phase == SERIALIZE_RRSET_INIT) {
int size = knot_dname_to_wire(wire.position, ctx->rrset_buf[i].owner,
wire_ctx_available(&wire));
if (size < 0 || wire_ctx_available(&wire) < size + 3 * sizeof(uint16_t)) {
break;
}
wire_ctx_skip(&wire, size);
wire_ctx_write_u16(&wire, ctx->rrset_buf[i].type);
wire_ctx_write_u16(&wire, ctx->rrset_buf[i].rclass);
wire_ctx_write_u16(&wire, ctx->rrset_buf[i].rrs.rr_count);
} else {
const knot_rdata_t *rr = knot_rdataset_at(&ctx->rrset_buf[i].rrs,
ctx->rrset_phase);
assert(rr);
uint16_t rdlen = rr->len;
if (wire_ctx_available(&wire) < sizeof(uint32_t) + sizeof(uint16_t) + rdlen) {
break;
}
// Compatibility, but one TTL per rrset would be enough.
wire_ctx_write_u32(&wire, ctx->rrset_buf[i].ttl);
wire_ctx_write_u16(&wire, rdlen);
wire_ctx_write(&wire, rr->data, rdlen);
}
ctx->rrset_phase++;
}
}
bool serialize_unfinished(serialize_ctx_t *ctx)
{
return ctx->changeset_phase < PHASE_END;
}
void serialize_deinit(serialize_ctx_t *ctx)
{
if (ctx->it.node != NULL) {
changeset_iter_clear(&ctx->it);
}
free(ctx);
}
static int deserialize_rrset(wire_ctx_t *wire, knot_rrset_t *rrset, long *phase)
......@@ -95,7 +228,7 @@ static int deserialize_rrset(wire_ctx_t *wire, knot_rrset_t *rrset, long *phase)
if (wire->error != KNOT_EOK ||
wire_ctx_available(wire) < rdata_size ||
knot_rrset_add_rdata(rrset, wire->position, rdata_size,
NULL) != KNOT_EOK) {
NULL) != KNOT_EOK) {
knot_rrset_clear(rrset, NULL);
return KNOT_EMALF;
}
......@@ -108,30 +241,6 @@ static int deserialize_rrset(wire_ctx_t *wire, knot_rrset_t *rrset, long *phase)
return KNOT_EOK;
}
static int serialize_rrset_chunks(wire_ctx_t *wire, const knot_rrset_t *rrset,
uint8_t *dst_chunks[], size_t chunk_size,
size_t chunks_count, size_t *chunks_real_sizes,
size_t *cur_chunk)
{
long phase = SERIALIZE_RRSET_INIT;
while (1) {
int ret = serialize_rrset(wire, rrset, &phase);
if (ret != KNOT_EOK || phase == SERIALIZE_RRSET_DONE) {
return ret;
}
// now the rrset didn't fit whole to this chunk
if (*cur_chunk >= chunks_count - 1) {
return KNOT_ESPACE;
}
if (wire->error != KNOT_EOK) {
return wire->error;
}
chunks_real_sizes[*cur_chunk] = wire_ctx_offset(wire);
(*cur_chunk)++;
*wire = wire_ctx_init(dst_chunks[*cur_chunk], chunk_size);
}
}
static int deserialize_rrset_chunks(wire_ctx_t *wire, knot_rrset_t *rrset,
uint8_t *src_chunks[], const size_t *chunk_sizes,
size_t chunks_count, size_t *cur_chunk)
......@@ -200,86 +309,6 @@ size_t changeset_serialized_size(const changeset_t *ch)
return soa_from_size + soa_to_size + change_size;
}
int changeset_serialize(const changeset_t *ch, uint8_t *dst_chunks[],
size_t chunk_size, size_t chunks_count, size_t *chunks_real_sizes,
size_t *chunks_real_count)
{
if (ch == NULL || dst_chunks == NULL || chunk_size == 0 || chunks_count == 0 ||
chunks_real_sizes == NULL || chunks_real_count == NULL) {
return KNOT_EINVAL;
}
for (size_t i = 0; i < chunks_count; i++) {
chunks_real_sizes[i] = 0;
}
wire_ctx_t wire = wire_ctx_init(dst_chunks[0], chunk_size);
size_t cur_chunk = 0;
if (ch->soa_from == NULL) {
// serializing bootstrap changeset
goto serialize_to; // note: it & ret & rrset are uninitialized here, we don't care
}
// Serialize SOA 'from'.
int ret = serialize_rrset_chunks(&wire, ch->soa_from, dst_chunks, chunk_size,
chunks_count, chunks_real_sizes, &cur_chunk);
if (ret != KNOT_EOK) {
return ret;
}
// Serialize RRSets from the 'rem' section.
changeset_iter_t it;
ret = changeset_iter_rem(&it, ch);
if (ret != KNOT_EOK) {
return ret;
}
knot_rrset_t rrset = changeset_iter_next(&it);
while (!knot_rrset_empty(&rrset)) {
ret = serialize_rrset_chunks(&wire, &rrset, dst_chunks, chunk_size,
chunks_count, chunks_real_sizes, &cur_chunk);
if (ret != KNOT_EOK) {
changeset_iter_clear(&it);
return ret;
}
rrset = changeset_iter_next(&it);
}
changeset_iter_clear(&it);
serialize_to:
// Serialize SOA 'to'.
ret = serialize_rrset_chunks(&wire, ch->soa_to, dst_chunks, chunk_size,
chunks_count, chunks_real_sizes, &cur_chunk);
if (ret != KNOT_EOK) {
return ret;
}
// Serialize RRSets from the 'add' section.
ret = changeset_iter_add(&it, ch);
if (ret != KNOT_EOK) {
return ret;
}
rrset = changeset_iter_next(&it);
while (!knot_rrset_empty(&rrset)) {
ret = serialize_rrset_chunks(&wire, &rrset, dst_chunks, chunk_size,
chunks_count, chunks_real_sizes, &cur_chunk);
if (ret != KNOT_EOK) {
changeset_iter_clear(&it);
return ret;
}
rrset = changeset_iter_next(&it);
}
changeset_iter_clear(&it);
chunks_real_sizes[cur_chunk] = wire_ctx_offset(&wire);
*chunks_real_count = cur_chunk + 1;
return wire.error;
}
int changeset_deserialize(changeset_t *ch, uint8_t *src_chunks[],
const size_t *chunks_sizes, size_t chunks_count)
{
......
/* Copyright (C) 2017 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
/* Copyright (C) 2018 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
......@@ -20,51 +20,55 @@
#include "libknot/rrset.h"
#include "knot/updates/changesets.h"
typedef struct serialize_ctx serialize_ctx_t;
/*!
* \brief Returns size of changeset in serialized form.
* \brief Init serialization context.
*
* \param[in] ch Changeset whose size we want to compute.
* \param ch Changeset to be serialized.
*
* \return Size of the changeset.
* \return Context.
*/
size_t changeset_serialized_size(const changeset_t *ch);
serialize_ctx_t *serialize_init(const changeset_t *ch);
/*!
* \brief Serializes given changeset into chunked area.
* \brief Pre-check and space computation before serializing a chunk.
*
* \param[in] ch The changeset.
* \param[in] dst_chunks The chunks to serialize into.
* \param[in] chunk_size Maximum size of each chunk.
* \param[in] chunks_count Maximum number of used chunks.
* \param[out] chunks_real_sizes Real size of each chunk after serialization, or zeros for unused chunks.
* \param[out] chunks_real_count Real # of chunks after serialization. Can be wrong if error returned!
* \note This MUST be called before each serialize_chunk() !
*
* \retval KNOT_E*
* \param ctx Serializing context.
* \param max_size Maximum size of next chunk.
* \param realsize Output: real exact size of next chunk.
*/
int changeset_serialize(const changeset_t *ch, uint8_t *dst_chunks[],
size_t chunk_size, size_t chunks_count, size_t *chunks_real_sizes,
size_t *chunks_real_count);
void serialize_prepare(serialize_ctx_t *ctx, size_t max_size, size_t *realsize);
/*!
* \brief Deserializes chunked area into ch
* \brief Perform one step of serializiation: fill one chunk.
*
* \param[out] ch The changeset.
* \param[in] src_chunks The chunks to deserialize.
* \param[in] chunks_sizes The size of each chunk.
* \param[in] chunks_count The number of chunks.
*
* \retval KNOT_E*
* \param ctx Serializing context.
* \param chunk Pointer on allocated memory to be serialized into.
* \param chunk_size Its size. It MUST be the same as returned from serialize_prepare().
*/
int changeset_deserialize(changeset_t *ch, uint8_t *src_chunks[],
const size_t *chunks_sizes, size_t chunks_count);
void serialize_chunk(serialize_ctx_t *ctx, uint8_t *chunk, size_t chunk_size);
/*! \brief Tells if there remains something of the changeset
* to be serialized into next chunk(s) yet. */
bool serialize_unfinished(serialize_ctx_t *ctx);
/*! \brief Free serialization context. */
void serialize_deinit(serialize_ctx_t *ctx);
/*!
* \brief Deserializes bootstrap changeset
* \brief Returns size of changeset in serialized form.
*
* This is like changeset_deserialize(), but expects a bootstrap changeset (no soa_from and no
* 'rem' section) in src_chunks.
* \param[in] ch Changeset whose size we want to compute.
*
* There is no changeset_serialize_botstrap: simply run changeset_serialize() with ch->soa_from = NULL.
* \return Size of the changeset.
*/
size_t changeset_serialized_size(const changeset_t *ch);
/*!
* \brief Deserializes chunked area into ch
*
* \param[out] ch The changeset.
* \param[in] src_chunks The chunks to deserialize.
......@@ -73,5 +77,5 @@ int changeset_deserialize(changeset_t *ch, uint8_t *src_chunks[],
*
* \retval KNOT_E*
*/
int changeset_deserialize_bootstrap(changeset_t *ch, uint8_t *src_chunks[],
const size_t *chunks_sizes, size_t chunks_count);
int changeset_deserialize(changeset_t *ch, uint8_t *src_chunks[],
const size_t *chunks_sizes, size_t chunks_count);
......@@ -225,11 +225,18 @@ int print_journal(char *path, knot_dname_t *name, uint32_t limit, bool color, bo
if (debugmode) {
if ((alternative_from && serial_from.valid) ||
kserial_equal(serial_from, last_flushed)) {
printf("---- Additional history ----\n");
init_list(&db);
ret = journal_load_changesets(j, &db, serial_from.serial);
if (ret != KNOT_EOK) {
switch (ret) {
case KNOT_EOK:
printf("---- Additional history ----\n");
break;
case KNOT_ENOENT:
printf("---- No additional history ----\n");
ret = KNOT_EOK;
break;
default:
goto pj_finally;
}
WALK_LIST(chs, db) {
......
......@@ -310,7 +310,7 @@ static void test_store_load(void)
/* Fill the journal. */
ret = KNOT_EOK;
uint32_t serial = 1;
for (; ret == KNOT_EOK; ++serial) {
for (; ret == KNOT_EOK && serial < 40000; ++serial) {