/*  Copyright (C) 2018 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

#include <inttypes.h>
#include <limits.h>
#include <sys/stat.h>
#include <stdarg.h>

#include "knot/journal/journal.h"
#include "knot/common/log.h"
#include "contrib/files.h"
#include "contrib/ctype.h"
#include "libknot/endian.h"
#include "contrib/dynarray.h"

/*! \brief Journal version. */
#define JOURNAL_VERSION	"1.0"
/*! \brief Changeset chunk size. */
#define CHUNK_MAX	(70 * 1024)
/*! \brief Max number of concurrent DB readers. */
#define JOURNAL_MAX_READERS 630

/*! \brief Various metadata DB key strings. Also hardcoded in macro txn_commit()! */
#define MDKEY_GLOBAL_VERSION			"version"
#define MDKEY_GLOBAL_JOURNAL_COUNT		"journal_count"
#define MDKEY_GLOBAL_LAST_TOTAL_OCCUPIED	"last_total_occupied"
#define MDKEY_GLOBAL_LAST_INSERTER_ZONE		"last_inserter_zone"
#define MDKEY_PERZONE_OCCUPIED			"occupied"
#define MDKEY_PERZONE_FLAGS			"flags"
#define KEY_BOOTSTRAP_CHANGESET			"bootstrap"
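/* Note (added for clarity, not part of the original code): the bootstrap changeset
 * ("zone-in-journal", i.e. a changeset with no soa_from) is keyed by this string
 * plus a chunk index instead of by a serial; see load_bootstrap_iterkeycb() and
 * has_bootstrap_changeset() below. */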

/*! \brief The number of unused bytes in DB key. */
#define DB_KEY_UNUSED_ZERO (4)

/*! \brief Metadata inserted on the beginning of each chunk:
 * uint32_t serial_to + uint32_t chunk_count + 24B unused */
#define JOURNAL_HEADER_SIZE (32)

// eventually move to contrib and reuse as needed
#define local_array_max_static_size (100)
#define local_array(type, name, size) \
	type name ## _static__[local_array_max_static_size] = { 0 }; \
	type *name ## _dynamic__ = ((size) > local_array_max_static_size ? calloc((size), sizeof(type)) : NULL); \
	type *name = ((size) > local_array_max_static_size ? name ## _dynamic__ : name ## _static__);
#define local_array_free(name) { free(name ## _dynamic__); }
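/* Illustrative usage sketch (cf. vals_to_changeset() below): the array lives in
 * static storage for small sizes and falls back to calloc() above
 * local_array_max_static_size:
 *
 *   local_array(uint8_t *, valps, nvals)
 *   ...fill and use valps[0 .. nvals-1]...
 *   local_array_free(valps)
 */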

enum {
	LAST_FLUSHED_VALID   = 1 << 0, /* "last flush is valid" flag. */
	SERIAL_TO_VALID      = 1 << 1, /* "last serial_to is valid" flag. */
	MERGED_SERIAL_VALID  = 1 << 2, /* "serial_from" of merged changeset. */
	DIRTY_SERIAL_VALID   = 1 << 3, /* "dirty_serial" is present in the DB. */
	FIRST_SERIAL_INVALID = 1 << 4, /* "first_serial" is not valid. */
};

static bool journal_flush_allowed(journal_t *j) {
	conf_val_t val = conf_zone_get(conf(), C_ZONEFILE_SYNC, j->zone);
	return conf_int(&val) >= 0;
}

static bool journal_merge_allowed(journal_t *j) {
	return !journal_flush_allowed(j); // TODO think of other behaviour, e.g. a configuration setting
}

static size_t journal_max_usage(journal_t *j)
{
	conf_val_t val = conf_zone_get(conf(), C_MAX_JOURNAL_USAGE, j->zone);
	return conf_int(&val);
}

static size_t journal_max_changesets(journal_t *j)
{
	conf_val_t val = conf_zone_get(conf(), C_MAX_JOURNAL_DEPTH, j->zone);
	return conf_int(&val);
}

static float journal_tofree_factor(journal_t *j)
{
	return 2.0f;
}

static float journal_minfree_factor(journal_t *j)
{
	return 0.33f;
}

static float journal_max_txn(journal_t *j)
{
	return 0.05f;
}

/*
 * ***************************** PART I *******************************
 *
 *  Transaction manipulation functions
 *
 * ********************************************************************
 */

typedef struct {
	uint32_t first_serial;    // Serial_from of the first changeset.
	uint32_t last_serial;     // Serial_from of the last changeset.
	uint32_t last_serial_to;  // Serial_to of the last changeset.
	uint32_t last_flushed;    // Serial_from of the last flushed (or merged) changeset.
	uint32_t merged_serial;   // "serial_from" of merged changeset.
	uint32_t dirty_serial;    // Serial_from of an incompletely inserted changeset which shall be deleted (see DB_MAX_INSERT_TXN).
	uint32_t changeset_count; // Number of changesets in this journal.
	uint32_t flags;           // LAST_FLUSHED_VALID, SERIAL_TO_VALID, MERGED_SERIAL_VALID.
} metadata_t;

typedef struct journal_txn {
	journal_t *j;
	knot_db_txn_t *txn;
	int ret;
	bool opened;

	bool is_rw;

	knot_db_iter_t *iter;

	knot_db_val_t key;
	knot_db_val_t val;
	uint8_t key_raw[512];

	metadata_t shadow_md;
} txn_t;

static void md_get(txn_t *txn, const knot_dname_t *zone, const char *mdkey, uint64_t *res);
static void md_get32(txn_t *txn, const knot_dname_t *zone, const char *mdkey, uint32_t *res);
static void md_set(txn_t *txn, const knot_dname_t *zone, const char *mdkey, uint64_t val);
static void md_set32(txn_t *txn, const knot_dname_t *zone, const char *mdkey, uint32_t val);

static void txn_init(txn_t *txn, knot_db_txn_t *db_txn, journal_t *j)
{
	memset(txn, 0, sizeof(*txn));
	txn->j = j;
	txn->txn = db_txn;
	txn->key.data = &txn->key_raw;
}

#define local_txn_t(txn_name, journal) \
	knot_db_txn_t __db_txn_ ## txn_name; \
	txn_t __local_txn_ ## txn_name; \
	txn_t *txn_name = &__local_txn_ ## txn_name; \
	txn_init(txn_name, &__db_txn_ ## txn_name, (journal))
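/* Illustrative pattern for a local transaction (cf. journal_load_changesets() below,
 * added for clarity). The txn_* helpers latch the first error in txn->ret and become
 * no-ops afterwards, so intermediate return codes need not be checked one by one:
 *
 *   local_txn_t(txn, j);
 *   txn_begin(txn, false);      // read-only; metadata is loaded into txn->shadow_md
 *   ...txn_key_*()/txn_find()/txn_insert()...
 *   txn_commit(txn);
 *   return txn->ret;
 */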

/*
 * Structure of the DB key:
 * Metadata:
 * | [ zone_name | \0 ] | unused zero 4B | metadata_key | \0 |
 *
 * Changeset:
 * | zone_name | \0 | unused zero 4B | (be32)serial_from | (be32)chunk_index |
 *  or
 * | zone_name | \0 | unused zero 4B | metadata_key | \0 | (be32)serial_from |
 *
 * Structure of the changeset:
 * | (be32)serial_to | (be32)#of_chunks | unused zero 24B | serialized_changeset...
 *
 */
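/* Illustrative example (not part of the original code): for zone "example.com."
 * (wire format \x07example\x03com\x00, 13 bytes), serial_from 7, chunk index 0,
 * txn_key_2u32() below builds the chunk key:
 *
 *   07 'e' 'x' 'a' 'm' 'p' 'l' 'e' 03 'c' 'o' 'm' 00   zone_name incl. terminating \0
 *   00 00 00 00                                        4B unused zero
 *   00 00 00 07                                        (be32) serial_from
 *   00 00 00 00                                        (be32) chunk_index
 */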

static bool key_is_ok(const knot_db_val_t *key, bool zone_related)
{
	const uint8_t *it = key->data;
	ssize_t it_len = key->len;
	if (zone_related) {
		size_t dname_len = knot_dname_size(it);
		it += dname_len;
		it_len -= dname_len;
	}
	it += 4;
	it_len -= 4;

	return ((zone_related && it_len == 8) || // normal changeset
	       (is_lower(*it) && !is_lower(*(it-1)))); // metadata
}

static void txn_key_str(txn_t *txn, const knot_dname_t *zone, const char *key)
{
	size_t zone_size = knot_dname_size(zone);
	txn->key.len = zone_size + DB_KEY_UNUSED_ZERO + strlen(key) + 1;
	if (txn->key.len > 512) {
		txn->ret = KNOT_ERROR;
		return;
	}
	if (zone != NULL) memcpy(txn->key.data, zone, zone_size);
	memset(txn->key.data + zone_size, 0, DB_KEY_UNUSED_ZERO);
	strcpy(txn->key.data + zone_size + DB_KEY_UNUSED_ZERO, key);
	assert(key_is_ok(&txn->key, zone != NULL));
}

static void txn_key_2u32(txn_t *txn, const knot_dname_t *zone, uint32_t key1, uint32_t key2)
{
	size_t zone_size = knot_dname_size(zone);
	txn->key.len = zone_size + DB_KEY_UNUSED_ZERO + 2*sizeof(uint32_t);
	if (txn->key.len > 512) {
		txn->ret = KNOT_ERROR;
		return;
	}
	if (zone != NULL) memcpy(txn->key.data, zone, zone_size);
	memset(txn->key.data + zone_size, 0, DB_KEY_UNUSED_ZERO);
	uint32_t key_be1 = htobe32(key1);
	uint32_t key_be2 = htobe32(key2);
	memcpy(txn->key.data + zone_size + DB_KEY_UNUSED_ZERO, &key_be1, sizeof(uint32_t));
	memcpy(txn->key.data + zone_size + DB_KEY_UNUSED_ZERO + sizeof(uint32_t),
	       &key_be2, sizeof(uint32_t));
	assert(key_is_ok(&txn->key, zone != NULL));
}

static void txn_key_str_u32(txn_t *txn, const knot_dname_t *zone, const char *key1, uint32_t key2)
{
	size_t zone_size = knot_dname_size(zone);
	txn->key.len = zone_size + DB_KEY_UNUSED_ZERO + strlen(key1) + 1 + sizeof(uint32_t);
	if (txn->key.len > 512) {
		txn->ret = KNOT_ERROR;
		return;
	}
	if (zone != NULL) memcpy(txn->key.data, zone, zone_size);
	memset(txn->key.data + zone_size, 0, DB_KEY_UNUSED_ZERO);
	strcpy(txn->key.data + zone_size + DB_KEY_UNUSED_ZERO, key1);
	uint32_t key_be2 = htobe32(key2);
	memcpy(txn->key.data + zone_size + DB_KEY_UNUSED_ZERO + strlen(key1) + 1,
	       &key_be2, sizeof(uint32_t));
	assert(key_is_ok(&txn->key, zone != NULL));
}

static int txn_cmpkey(txn_t *txn, knot_db_val_t *key2)
{
	if (txn->key.len != key2->len) {
		return (txn->key.len < key2->len ? -1 : 1);
	}
	if (key2->len == 0) {
		return 0;
	}
	return memcmp(txn->key.data, key2->data, key2->len);
}

static void txn_val_u64(txn_t *txn, uint64_t *res)
{
	if (txn->ret != KNOT_EOK) {
		return;
	}
	uint32_t beval32;
	uint64_t beval;
	switch (txn->val.len) {
	case sizeof(uint32_t):
		memcpy(&beval32, (uint32_t *)txn->val.data, sizeof(beval32));
		*res = (uint64_t)be32toh(beval32);
		break;
	case sizeof(uint64_t):
		memcpy(&beval, (uint64_t *)txn->val.data, sizeof(beval));
		*res = be64toh(beval);
		break;
	default:
		txn->ret = KNOT_EMALF;
	}
}

#define txn_begin_md(md) md_get32(txn, txn->j->zone, #md, &txn->shadow_md.md)
#define txn_commit_md(md) md_set32(txn, txn->j->zone, #md, txn->shadow_md.md)
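/* The #md stringification ties metadata_t field names to per-zone DB keys: e.g.
 * txn_begin_md(first_serial) loads the "first_serial" metadata key of txn->j->zone
 * into txn->shadow_md.first_serial, and txn_commit_md(first_serial) writes it back. */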

#define txn_check_open(txn) if (((txn)->ret = ((txn)->opened ? (txn)->ret : KNOT_EINVAL)) != KNOT_EOK) return
#define txn_check_ret(txn) if (((txn)->ret = ((txn)->opened ? (txn)->ret : KNOT_EINVAL)) != KNOT_EOK)  return ((txn)->ret)

static void txn_begin(txn_t *txn, bool write_allowed)
{
	if (txn->ret == KNOT_EOK && txn->opened) {
		txn->ret = KNOT_EINVAL;
	}
	if (txn->ret != KNOT_EOK) {
		return;
	}

	txn->ret = txn->j->db->db_api->txn_begin(txn->j->db->db, txn->txn,
	                                         (write_allowed ? 0 : KNOT_DB_RDONLY));

	txn->is_rw = write_allowed;
	txn->opened = true;

	txn_begin_md(first_serial);
	txn_begin_md(last_serial);
	txn_begin_md(last_serial_to);
	txn_begin_md(last_flushed);
	txn_begin_md(merged_serial);
	txn_begin_md(dirty_serial);
	txn_begin_md(changeset_count);
	txn_begin_md(flags);
}

static void txn_find_force(txn_t *txn)
{
	txn_check_open(txn);
	txn->ret = txn->j->db->db_api->find(txn->txn, &txn->key, &txn->val, 0);
}

static bool txn_find(txn_t *txn)
{
	if (txn->ret != KNOT_EOK || !txn->opened) {
		return false;
	}
	txn_find_force(txn);
	if (txn->ret == KNOT_ENOENT) {
		txn->ret = KNOT_EOK;
		return false;
	}
	return (txn->ret == KNOT_EOK);
}

static void txn_insert(txn_t *txn)
{
	txn_check_open(txn);
	txn->ret = txn->j->db->db_api->insert(txn->txn, &txn->key, &txn->val, 0);
}

static void txn_del(txn_t *txn)
{
	txn_check_open(txn);
	txn->ret = txn->j->db->db_api->del(txn->txn, &txn->key);
}

static void txn_iter_begin(txn_t *txn)
{
	txn_check_open(txn);
	txn->iter = txn->j->db->db_api->iter_begin(txn->txn, KNOT_DB_FIRST);
	if (txn->iter == NULL) {
		txn->ret = KNOT_ENOMEM;
	}
}

#define txn_check_iter if (txn->iter == NULL && txn->ret == KNOT_EOK) txn->ret = KNOT_EINVAL; \
                       if (txn->ret != KNOT_EOK) return;

static void txn_iter_seek(txn_t *txn)
{
	txn_check_iter
	txn->iter = txn->j->db->db_api->iter_seek(txn->iter, &txn->key, 0);
	if (txn->iter == NULL) {
		txn->ret = KNOT_ENOENT;
	}
}

static void txn_iter_key(txn_t *txn, knot_db_val_t *at_key)
{
	txn_check_iter
	txn->ret = txn->j->db->db_api->iter_key(txn->iter, at_key);
}

static void txn_iter_val(txn_t *txn)
{
	txn_check_iter
	txn->ret = txn->j->db->db_api->iter_val(txn->iter, &txn->val);
}

static void txn_iter_next(txn_t *txn)
{
	txn_check_iter
	txn->iter = txn->j->db->db_api->iter_next(txn->iter);
	if (txn->iter == NULL) {
		txn->ret = KNOT_ENOENT;
	}
}

static void txn_iter_finish(txn_t *txn)
{
	if (txn->iter != NULL) {
		txn->j->db->db_api->iter_finish(txn->iter);
	}
	txn->iter = NULL;
}

static void txn_abort(txn_t *txn)
{
	if (txn->opened) {
		txn_iter_finish(txn);
		txn->j->db->db_api->txn_abort(txn->txn);
		txn->opened = false;
	}
}

static void txn_commit(txn_t *txn)
{
	if (txn->is_rw) {
		txn_commit_md(first_serial);
		txn_commit_md(last_serial);
		txn_commit_md(last_serial_to);
		txn_commit_md(last_flushed);
		txn_commit_md(merged_serial);
		txn_commit_md(dirty_serial);
		txn_commit_md(changeset_count);
		txn_commit_md(flags);
	}

	if (txn->ret != KNOT_EOK) {
		txn_abort(txn);
		return;
	}

	txn_iter_finish(txn);
	txn->ret = txn->j->db->db_api->txn_commit(txn->txn);

	if (txn->ret == KNOT_EOK) {
		txn->opened = false;
	}
	txn_abort(txn); // no effect if all ok
}

void journal_txn_commit(struct journal_txn *txn)
{
	if (txn != NULL) {
		txn_commit(txn);
	}
}

static void txn_restart(txn_t *txn)
{
	txn_commit(txn);
	assert(!txn->opened);
	if (txn->ret == KNOT_EOK) {
		txn_begin(txn, txn->is_rw);
	}
}

static void txn_reuse(txn_t **txn, txn_t *to_reuse, bool write_allowed)
{
	if (to_reuse == NULL) {
		txn_begin(*txn, write_allowed);
	} else {
		*txn = to_reuse;
	}
}

static void txn_unreuse(txn_t **txn, txn_t *reused)
{
	if (reused == NULL) {
		txn_commit(*txn);
	}
}

#define reuse_txn(name, journal, to_reuse, wa) local_txn_t(name, journal); txn_reuse(&name, to_reuse, wa)
#define unreuse_txn(name, reused) txn_unreuse(&name, reused)
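/* Illustrative pattern (added for clarity): reuse_txn()/unreuse_txn() let a helper
 * either run inside a caller-supplied transaction or open (and later commit) its own:
 *
 *   reuse_txn(txn, j, _txn, true);   // _txn != NULL: reuse it; NULL: begin a new RW txn
 *   ...
 *   unreuse_txn(txn, _txn);          // commits only if the txn was opened here
 *   return txn->ret;
 */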

/*
 * ***************************** PART II ******************************
 *
 *  DB metadata manip. and Chunk metadata headers
 *
 * ********************************************************************
 */

static void md_get(txn_t *txn, const knot_dname_t *zone, const char *mdkey, uint64_t *res)
{
	txn_check_open(txn);
	txn_key_str(txn, zone, mdkey);
	uint64_t res1 = 0;
	if (txn_find(txn)) {
		txn_val_u64(txn, &res1);
	}
	*res = res1;
}

static void md_get32(txn_t *txn, const knot_dname_t *zone, const char *mdkey, uint32_t *res)
{
	uint64_t res1 = 0;
	md_get(txn, zone, mdkey, &res1);
	if (res1 > UINT32_MAX) {
		txn->ret = KNOT_EMALF;
	} else {
		*res = (uint32_t)res1;
	}
}

// allocates res
static void md_get_common_last_inserter_zone(txn_t *txn, knot_dname_t **res)
{
	txn_check_open(txn);
	txn_key_str(txn, NULL, MDKEY_GLOBAL_LAST_INSERTER_ZONE);
	if (txn_find(txn)) {
		*res = knot_dname_copy(txn->val.data, NULL);
	} else {
		*res = NULL;
	}
}

static int md_set_common_last_inserter_zone(txn_t *txn, knot_dname_t *zone)
{
	txn_check_ret(txn);
	txn_key_str(txn, NULL, MDKEY_GLOBAL_LAST_INSERTER_ZONE);
	txn->val.len = knot_dname_size(zone);
	txn->val.data = zone;
	txn_insert(txn);
	return txn->ret;
}

static void md_del_last_inserter_zone(txn_t *txn, knot_dname_t *if_equals)
{
	txn_check_open(txn);
	txn_key_str(txn, NULL, MDKEY_GLOBAL_LAST_INSERTER_ZONE);
	if (txn_find(txn)) {
		if (if_equals == NULL || knot_dname_is_equal(txn->val.data, if_equals)) {
			txn_del(txn);
		}
	}
}

static void md_get_common_last_occupied(txn_t *txn, size_t *res)
{
	uint64_t sres = 0;
	md_get(txn, NULL, MDKEY_GLOBAL_LAST_TOTAL_OCCUPIED, &sres);
	*res = (size_t) sres;
}

static void md_set(txn_t *txn, const knot_dname_t *zone, const char *mdkey, uint64_t val)
{
	txn_key_str(txn, zone, mdkey);
	uint64_t val1 = htobe64(val);
	txn->val.len = sizeof(uint64_t);
	txn->val.data = &val1;
	txn_insert(txn);
}

static void md_set32(txn_t *txn, const knot_dname_t *zone, const char *mdkey, uint32_t val)
{
	txn_key_str(txn, zone, mdkey);
	uint32_t val1 = htobe32(val);
	txn->val.len = sizeof(uint32_t);
	txn->val.data = &val1;
	txn_insert(txn);
}

static bool md_flag(txn_t *txn, int flag)
{
	return (txn->shadow_md.flags & flag);
}

/*! \brief Marks metadata as flushed */
static void md_flush(txn_t *txn)
{
	if (md_flag(txn, SERIAL_TO_VALID) && !md_flag(txn, FIRST_SERIAL_INVALID)) {
		txn->shadow_md.last_flushed = txn->shadow_md.last_serial;
		txn->shadow_md.flags |= LAST_FLUSHED_VALID;
	}
}

static int md_flushed(txn_t *txn)
{
	return (!md_flag(txn, SERIAL_TO_VALID) ||
		(md_flag(txn, LAST_FLUSHED_VALID) &&
		 serial_equal(txn->shadow_md.last_flushed, txn->shadow_md.last_serial)));
}

static void make_header(knot_db_val_t *to, uint32_t serial_to, int chunk_count)
{
	assert(to->len >= JOURNAL_HEADER_SIZE);
	assert(chunk_count > 0);

	uint32_t be_serial_to = htobe32(serial_to);
	uint32_t be_chunk_count = htobe32((uint32_t)chunk_count);

	memcpy(to->data, &be_serial_to, sizeof(be_serial_to));
	memcpy(to->data + sizeof(be_serial_to), &be_chunk_count, sizeof(be_chunk_count));
	memset(to->data + sizeof(be_serial_to) + sizeof(be_chunk_count), 0,
	       JOURNAL_HEADER_SIZE - sizeof(be_serial_to) - sizeof(be_chunk_count));
}

/*! \brief read properties from chunk header "from". All the output params are optional */
static void unmake_header(const knot_db_val_t *from, uint32_t *serial_to,
			  int *chunk_count, size_t *header_size)
{
	assert(from->len >= JOURNAL_HEADER_SIZE);

	uint32_t be_serial_to, be_chunk_count;
	if (serial_to != NULL) {
		memcpy(&be_serial_to, from->data, sizeof(be_serial_to));
		*serial_to = be32toh(be_serial_to);
	}
	if (chunk_count != NULL) {
		memcpy(&be_chunk_count, from->data + sizeof(be_serial_to), sizeof(be_chunk_count));
		assert(be32toh(be_chunk_count) <= INT_MAX);
		*chunk_count = (int)be32toh(be_chunk_count);
	}
	if (header_size != NULL) {
		*header_size = JOURNAL_HEADER_SIZE;
	}
}

static int first_digit(char *of)
{
	unsigned maj, min;
	return sscanf(of, "%u.%u", &maj, &min) == 2 ? maj : -1;
}

static void md_update_journal_count(txn_t *txn, int change_amount)
{
	uint64_t jcnt = 0;
	md_get(txn, NULL, MDKEY_GLOBAL_JOURNAL_COUNT, &jcnt);
	md_set(txn, NULL, MDKEY_GLOBAL_JOURNAL_COUNT, jcnt + change_amount);
}

static int initial_md_check(journal_t *j, bool *dirty_present)
{
	*dirty_present = 0;

	bool something_updated = false;

	local_txn_t(txn, j);
	txn_begin(txn, true);
	txn_key_str(txn, NULL, MDKEY_GLOBAL_VERSION);
	if (!txn_find(txn)) {
		txn->val.len = strlen(JOURNAL_VERSION) + 1;
		txn->val.data = JOURNAL_VERSION;
		txn_insert(txn);
		something_updated = true;
	} else {
		char *jver = txn->val.data;
		if (first_digit(jver) != first_digit(JOURNAL_VERSION)) {
			txn_abort(txn);
			return KNOT_ENOTSUP;
		}
	}
	txn_key_str(txn, j->zone, MDKEY_PERZONE_FLAGS);
	if (!txn_find(txn)) {
		md_update_journal_count(txn, +1);
		something_updated = true;
	}
	*dirty_present = md_flag(txn, DIRTY_SERIAL_VALID);

	if (something_updated) {
		txn_commit(txn);
	} else { // abort to speed up opening a lot of zones
		txn_abort(txn);
	}

	return txn->ret;
}

/*
 * **************************** PART III ******************************
 *
 *  DB iteration
 *
 * ********************************************************************
 */

enum {
	JOURNAL_ITERATION_CHUNKS,     // call the iteration callback for each chunk read, with just the chunk in ctx->val
	JOURNAL_ITERATION_CHANGESETS  // call the iteration callback after the last chunk of a changeset read, with all its chunks in ctx->val
};

typedef struct {
	txn_t *txn;		// DB txn not to be touched by callback, just contains journal pointer
	uint32_t serial;	// serial-from of current changeset
	uint32_t serial_to;	// serial-to of current changeset
	const int method;	// JOURNAL_ITERATION_CHUNKS or JOURNAL_ITERATION_CHANGESETS, to be set by the caller of iterate()
	int chunk_index;	// index of current chunk
	int chunk_count;	// # of chunks of current changeset
	knot_db_val_t *val;	// one val if JOURNAL_ITERATION_CHUNKS; chunk_count vals if JOURNAL_ITERATION_CHANGESETS
	knot_db_iter_t *iter;	// DB iteration context, not to be touched by callback
	void *iter_context;	// anything to send to the callback by the caller of iterate(), untouched by iterate()
} iteration_ctx_t;
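/* Illustrative call of iterate() below (cf. load_one(); added for clarity): walk the
 * changesets from serial "from" up to "to" and invoke the callback once per complete
 * changeset:
 *
 *   iterate(j, txn, load_one_itercb, JOURNAL_ITERATION_CHANGESETS,
 *           &result_changeset, from, to, normal_iterkeycb);
 */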

typedef int (*iteration_cb_t)(iteration_ctx_t *ctx);

/*!
 * \brief Move iter to next changeset chunk.
 *
 * Optimistically try a fast move to the next DB item. But the changesets can be stored out of order,
 * so if we don't succeed (different serial or end of DB), we look up the next serial the slow way.
 */

static void get_iter_next(iteration_ctx_t *ctx, iteration_cb_t key_cb)
{
	knot_db_val_t other_key = { 0 };

	txn_check_open(ctx->txn);
	txn_iter_next(ctx->txn);
	txn_iter_key(ctx->txn, &other_key);
	key_cb(ctx);
	if (ctx->txn->ret == KNOT_ENOENT ||
	    (ctx->txn->ret == KNOT_EOK && txn_cmpkey(ctx->txn, &other_key) != 0)) {
		ctx->txn->ret = KNOT_EOK;
		if (ctx->txn->iter != NULL) {
			txn_iter_finish(ctx->txn);
		}
		txn_iter_begin(ctx->txn);
		txn_iter_seek(ctx->txn);
	}
}

static int iterate(journal_t *j, txn_t *_txn, iteration_cb_t cb, int method,
                   void *iter_context, uint32_t first, uint32_t last, iteration_cb_t key_cb)
{
	reuse_txn(txn, j, _txn, true);

	iteration_ctx_t ctx = {
		.method = method,
		.iter_context = iter_context,
		.txn = txn,
		.serial = first,
		.chunk_index = 0
	};

	knot_db_val_t *vals = NULL;

	txn_iter_begin(txn);

	key_cb(&ctx);
	txn_iter_seek(txn);

	ctx.val = &txn->val;

	while (true) {
		txn_iter_val(txn);
		if (txn->ret != KNOT_EOK) {
			break;
		}

		unmake_header(&txn->val, &ctx.serial_to, &ctx.chunk_count, NULL);

		if (method == JOURNAL_ITERATION_CHANGESETS) {
			if (ctx.chunk_index == 0) {
				if (vals != NULL) free(vals);
				vals = malloc(ctx.chunk_count * sizeof(knot_db_val_t));
				if (vals == NULL) {
					txn->ret = KNOT_ENOMEM;
					break;
				}
				ctx.val = vals;
			}
			memcpy(vals + ctx.chunk_index, &txn->val, sizeof(knot_db_val_t));
		}

		if (method == JOURNAL_ITERATION_CHUNKS) {
			txn->ret = cb(&ctx);
		}

		if (ctx.chunk_index == ctx.chunk_count - 1) { // hit last chunk of current changeset
			if (method == JOURNAL_ITERATION_CHANGESETS) {
				txn->ret = cb(&ctx);
			}

			if (ctx.serial == last) {
				break; // standard loop exit here
			}

			ctx.serial = ctx.serial_to;
			ctx.chunk_index = 0;
		} else {
			ctx.chunk_index++;
		}

		get_iter_next(&ctx, key_cb);
	}

	if (vals != NULL) {
		free(vals);
	}
	txn_iter_finish(txn);

	unreuse_txn(txn, _txn);

	return txn->ret;
}

static int normal_iterkeycb(iteration_ctx_t *ctx)
{
	txn_key_2u32(ctx->txn, ctx->txn->j->zone, ctx->serial, ctx->chunk_index);
	return KNOT_EOK;
}

/*
 * ***************************** PART IV ******************************
 *
 *  Reading changesets
 *
 * ********************************************************************
 */

/*! \brief Deserialize changeset from chunks (in vals) */
static int vals_to_changeset(knot_db_val_t *vals, int nvals,
                             const knot_dname_t *zone_name, changeset_t **ch)
{
	local_array(uint8_t *, valps, nvals)
	local_array(size_t, vallens, nvals)
	if (valps == NULL || vallens == NULL) {
		local_array_free(valps)
		local_array_free(vallens)
		return KNOT_ENOMEM;
	}

	for (size_t i = 0; i < nvals; i++) {
		valps[i] = vals[i].data + JOURNAL_HEADER_SIZE;
		vallens[i] = vals[i].len - JOURNAL_HEADER_SIZE;
	}

	changeset_t *t_ch = changeset_new(zone_name);
	if (t_ch == NULL) {
		local_array_free(valps)
		local_array_free(vallens)
		return KNOT_ENOMEM;
	}

	int ret = changeset_deserialize(t_ch, valps, vallens, nvals);

	local_array_free(valps)
	local_array_free(vallens)
	if (ret != KNOT_EOK) {
		changeset_free(t_ch);
		return ret;
	}
	*ch = t_ch;
	return KNOT_EOK;
}

static int vals_to_chgset_ctx(knot_db_val_t *vals, int nvals, uint32_t serial_from,
                              uint32_t serial_to, chgset_ctx_t **ch)
{
	if (nvals < 1) {
		return KNOT_EINVAL;
	}

	chgset_ctx_t *t_ch = chgset_ctx_create(nvals);
	if (t_ch == NULL) {
		return KNOT_ENOMEM;
	}

	for (size_t i = 0; i < nvals; i++) {
		t_ch->src_chunks[i] = vals[i].data + JOURNAL_HEADER_SIZE;
		t_ch->chunk_sizes[i] = vals[i].len - JOURNAL_HEADER_SIZE;
	}

	t_ch->serial_from = serial_from;
	t_ch->serial_to = serial_to;

	*ch = t_ch;
	return KNOT_EOK;
}

static int load_one_itercb(iteration_ctx_t *ctx)
{
	changeset_t *ch = NULL, **targ = ctx->iter_context;
	if (*targ != NULL) {
		return KNOT_EINVAL;
	}

	int ret = vals_to_changeset(ctx->val, ctx->chunk_count, ctx->txn->j->zone, &ch);
	if (ret == KNOT_EOK) *targ = ch;
	return ret;
}

static int load_list_itercb(iteration_ctx_t *ctx)
{
	changeset_t *ch = NULL;
	list_t *chlist = *(list_t **) ctx->iter_context;

	int ret = vals_to_changeset(ctx->val, ctx->chunk_count, ctx->txn->j->zone, &ch);

	if (ret == KNOT_EOK) {
		add_tail(chlist, &ch->n);
	}
	return ret;
}

static int load_list_ctx_itercb(iteration_ctx_t *ctx)
{
	chgset_ctx_t *ch = NULL;
	list_t *chlist = *(list_t **) ctx->iter_context;

	int ret = vals_to_chgset_ctx(ctx->val, ctx->chunk_count, ctx->serial, ctx->serial_to, &ch);

	if (ret == KNOT_EOK) {
		add_tail(chlist, &ch->n);
	}
	return ret;
}

/*! \brief Load one changeset (with serial) from DB */
static int load_one(journal_t *j, txn_t *_txn, uint32_t serial, changeset_t **ch)
{
	reuse_txn(txn, j, _txn, false);
	changeset_t *rch = NULL;
	iterate(j, txn, load_one_itercb, JOURNAL_ITERATION_CHANGESETS, &rch, serial, serial, normal_iterkeycb);
	unreuse_txn(txn, _txn);
	if (txn->ret == KNOT_EOK) {
		if (rch == NULL) txn->ret = KNOT_ENOENT;
		else *ch = rch;
	}
	return txn->ret;
}

static int load_merged_changeset(journal_t *j, txn_t *_txn, changeset_t **mch,
                                 const uint32_t *only_if_serial)
{
	assert(*mch == NULL);

	reuse_txn(txn, j, _txn, false);
	txn_check_ret(txn);
	uint32_t ms = txn->shadow_md.merged_serial, fl = txn->shadow_md.flags;

	if ((fl & MERGED_SERIAL_VALID) &&
	    (only_if_serial == NULL || serial_equal(ms, *only_if_serial))) {
		load_one(j, txn, ms, mch);
	}
	unreuse_txn(txn, _txn);

	return txn->ret;
}

int journal_load_changesets(journal_t *j, list_t *dst, uint32_t from)
{
	if (j == NULL || j->db == NULL || dst == NULL) return KNOT_EINVAL;

	local_txn_t(txn, j);
	txn_begin(txn, false);

	uint32_t ls = txn->shadow_md.last_serial;
	iterate(j, txn, load_list_itercb, JOURNAL_ITERATION_CHANGESETS, &dst, from,
	        ls, normal_iterkeycb);
	txn_commit(txn);

	return txn->ret;
}

int journal_load_chgset_ctx(journal_t *j, chgset_ctx_list_t *dst, uint32_t from)
{
	if (j == NULL || j->db == NULL || dst == NULL) return KNOT_EINVAL;

	txn_t *txn = calloc(1, sizeof(*txn) + sizeof(*txn->txn));
	if (txn == NULL) {
		return KNOT_ENOMEM;
	}
	txn_init(txn, ((void *)txn) + sizeof(*txn), j);
	txn_begin(txn, false);

	init_list(&dst->l);
	dst->txn = txn;
	list_t *dstl = &dst->l;

	uint32_t ls = txn->shadow_md.last_serial;
	iterate(j, txn, load_list_ctx_itercb, JOURNAL_ITERATION_CHANGESETS, &dstl, from,
		ls, normal_iterkeycb);

	if (txn->ret != KNOT_EOK) {
		int ret = txn->ret;
		txn_commit(txn);
		free(txn);
		return ret;
	}

	return txn->ret;
}

int load_bootstrap_iterkeycb(iteration_ctx_t *ctx)
{
	txn_key_str_u32(ctx->txn, ctx->txn->j->zone, KEY_BOOTSTRAP_CHANGESET, ctx->chunk_index);
	return KNOT_EOK;
}

static int load_bootstrap_changeset(journal_t *j, txn_t *_txn, changeset_t **ch)
{
	reuse_txn(txn, j, _txn, false);
	changeset_t *rch = NULL;
	iterate(j, txn, load_one_itercb, JOURNAL_ITERATION_CHANGESETS, &rch,
	        0, 0, load_bootstrap_iterkeycb);
	unreuse_txn(txn, _txn);
	if (txn->ret == KNOT_EOK) {
		if (rch == NULL) txn->ret = KNOT_ENOENT;
		else *ch = rch;
	}
	return txn->ret;
}

static bool has_bootstrap_changeset(journal_t *j, txn_t *_txn)
{
	reuse_txn(txn, j, _txn, false);
	txn_key_str_u32(txn, j->zone, KEY_BOOTSTRAP_CHANGESET, 0);
	bool res = txn_find(txn);
	unreuse_txn(txn, _txn);
	return res;
}

int journal_load_bootstrap(journal_t *j, list_t *dst)
{
	if (j == NULL || j->db == NULL || dst == NULL) return KNOT_EINVAL;

	local_txn_t(txn, j);
	txn_begin(txn, false);

	changeset_t *bch = NULL;
	load_bootstrap_changeset(j, txn, &bch);
	if (bch == NULL) {
		txn->ret = KNOT_ENOENT;
		goto jlb_end;
	}
	add_tail(dst, &bch->n);
	uint32_t from = knot_soa_serial(bch->soa_to->rrs.rdata);

	uint32_t ls = txn->shadow_md.last_serial;
	iterate(j, txn, load_list_itercb, JOURNAL_ITERATION_CHANGESETS, &dst,
	        from, ls, normal_iterkeycb);
	if (txn->ret == KNOT_ENOENT) {
		txn->ret = KNOT_EOK;
	}
jlb_end:
	txn_commit(txn);
	return txn->ret;
}

/*
 * ***************************** PART V *******************************
 *
 *  Deleting changesets
 *
 * ********************************************************************
 */

typedef struct {
	size_t freed_approx;
	size_t to_be_freed;
} delete_status_t;

static int del_upto_itercb(iteration_ctx_t *ctx)
{
	txn_key_2u32(ctx->txn, ctx->txn->j->zone, ctx->serial, ctx->chunk_index);
	txn_del(ctx->txn);
	txn_check_ret(ctx->txn);

	// One whole changeset has been deleted => update metadata.
	// We are sure that the deleted changeset was the first one at this time.
	// If it's not the merged changeset, point first_serial to the next one.
	if (ctx->chunk_index == ctx->chunk_count - 1) {
		if (!md_flag(ctx->txn, MERGED_SERIAL_VALID) ||
		    !serial_equal(ctx->txn->shadow_md.merged_serial,ctx->serial)) {
			ctx->txn->shadow_md.first_serial = ctx->serial_to;
			ctx->txn->shadow_md.changeset_count--;
		}
		if (serial_equal(ctx->txn->shadow_md.last_flushed, ctx->serial)) {
			ctx->txn->shadow_md.flags &= ~LAST_FLUSHED_VALID;
		}
		if (serial_equal(ctx->txn->shadow_md.last_serial,  ctx->serial)) {
			ctx->txn->shadow_md.flags &= ~SERIAL_TO_VALID;
		}
		if (serial_equal(ctx->txn->shadow_md.merged_serial,ctx->serial)) {
			ctx->txn->shadow_md.flags &= ~MERGED_SERIAL_VALID;
		}
	}
	return KNOT_EOK;
}

/*! \brief Delete from beginning of DB up to "last" changeset including.
 * Please ensure (dbfirst == j->metadata.first_serial) */
static int delete_upto(journal_t *j, txn_t *txn, uint32_t dbfirst, uint32_t last)
{
	return iterate(j, txn, del_upto_itercb, JOURNAL_ITERATION_CHUNKS, NULL,
	               dbfirst, last, normal_iterkeycb);
}

static int delete_merged_changeset(journal_t *j, txn_t *t)
{
	reuse_txn(txn, j, t, true);
	txn_check_ret(txn);
	if (!md_flag(txn, MERGED_SERIAL_VALID)) {
		txn->ret = KNOT_ENOENT;
	} else {
		delete_upto(j, txn, txn->shadow_md.merged_serial, txn->shadow_md.merged_serial);
	}
	unreuse_txn(txn, t);
	return txn->ret;
}

static int delete_bootstrap_changeset(journal_t *j, txn_t *_txn);

static int drop_journal(journal_t *j, txn_t *_txn)
{
	reuse_txn(txn, j, _txn, true);
	txn_check_ret(txn);
	if (md_flag(txn, MERGED_SERIAL_VALID)) {
		delete_merged_changeset(j, txn);
	}
	if (md_flag(txn, SERIAL_TO_VALID) && !md_flag(txn, FIRST_SERIAL_INVALID)) {
		delete_upto(j, txn, txn->shadow_md.first_serial, txn->shadow_md.last_serial);
	}
	delete_bootstrap_changeset(j, txn);
	md_del_last_inserter_zone(txn, j->zone);
	md_set(txn, j->zone, MDKEY_PERZONE_OCCUPIED, 0);
	unreuse_txn(txn, _txn);
	return txn->ret;
}

static int del_tofree_itercb(iteration_ctx_t *ctx)
{
	delete_status_t *ds = ctx->iter_context;

	if (ds->to_be_freed == 0) {
		return KNOT_EOK; // all done, just running through the rest of records w/o change
	}

	txn_key_2u32(ctx->txn, ctx->txn->j->zone, ctx->serial, ctx->chunk_index);
	txn_del(ctx->txn);
	txn_check_ret(ctx->txn);

	ds->freed_approx += /*4096 + */ctx->val->len;

	// when whole changeset deleted, check target and update metadata
	if (ctx->chunk_index == ctx->chunk_count - 1) {
		ctx->txn->shadow_md.first_serial = ctx->serial_to;
		ctx->txn->shadow_md.changeset_count--;
		if (serial_equal(ctx->txn->shadow_md.last_flushed, ctx->serial)) {
			ctx->txn->shadow_md.flags &= ~LAST_FLUSHED_VALID;
			ds->to_be_freed = 0; // prevents deleting unflushed changesets
		}
		if (serial_equal(ctx->txn->shadow_md.last_serial, ctx->serial)) {
			ctx->txn->shadow_md.flags &= ~SERIAL_TO_VALID;
		}
		if (ds->freed_approx >= ds->to_be_freed) {
			ds->to_be_freed = 0;
		}
	}

	return KNOT_EOK;
}

/*!
 * \brief Deletes from j->db oldest changesets to free up space
 *
 * It tries to delete only flushed changesets and preserves all unflushed ones.
 *
 * \retval KNOT_EOK if no error, even if too little or nothing deleted (check really_freed for result); KNOT_E* if error
 */
static int delete_tofree(journal_t *j, txn_t *_txn, size_t to_be_freed, size_t *really_freed)
{
	reuse_txn(txn, j, _txn, true);
	txn_check_ret(txn);

	if (!md_flag(txn, LAST_FLUSHED_VALID)) {
		*really_freed = 0;
		return KNOT_EOK;
	}
	delete_status_t ds = { .freed_approx = 0, .to_be_freed = to_be_freed };
	iterate(j, txn, del_tofree_itercb, JOURNAL_ITERATION_CHUNKS, &ds,
	        txn->shadow_md.first_serial, txn->shadow_md.last_serial, normal_iterkeycb);
	unreuse_txn(txn, _txn);

	if (txn->ret == KNOT_EOK) {
		*really_freed = ds.freed_approx;
	}
	return txn->ret;
}

static int del_count_itercb(iteration_ctx_t *ctx)
{
	delete_status_t *ds = ctx->iter_context;
	if (ds->freed_approx >= ds->to_be_freed) {
		return KNOT_EOK;
	}
	txn_key_2u32(ctx->txn, ctx->txn->j->zone, ctx->serial, ctx->chunk_index);
	txn_del(ctx->txn);
	txn_check_ret(ctx->txn);

	// when whole changeset deleted, check target and update metadata
	if (ctx->chunk_index == ctx->chunk_count - 1) {
		ctx->txn->shadow_md.first_serial = ctx->serial_to;
		ctx->txn->shadow_md.changeset_count--;
		if (serial_equal(ctx->txn->shadow_md.last_flushed, ctx->serial)) {
			ctx->txn->shadow_md.flags &= ~LAST_FLUSHED_VALID;
			ds->to_be_freed = ds->freed_approx; // prevents deleting unflushed changesets
		}
		if (serial_equal(ctx->txn->shadow_md.last_serial, ctx->serial)) {
			ctx->txn->shadow_md.flags &= ~SERIAL_TO_VALID;
		}
		ds->freed_approx++;
	}
	return KNOT_EOK;
}

/*!
 * \brief Deletes specified number of changesets
 *
 * It tries to delete only flushed changesets and preserves all unflushed ones.
 *
 * \retval KNOT_EOK if no error, even if too little or nothing deleted (check really_deleted for result)
 * \return KNOT_E* if error
 */
static int delete_count(journal_t *j, txn_t *_txn, size_t to_be_deleted, size_t *really_deleted)
{
	reuse_txn(txn, j, _txn, true);
	txn_check_ret(txn);

	if (!md_flag(txn, LAST_FLUSHED_VALID)) {
		*really_deleted = 0;
		return KNOT_EOK;
	}
	delete_status_t ds = { .freed_approx = 0, .to_be_freed = to_be_deleted };
	iterate(j, txn, del_count_itercb, JOURNAL_ITERATION_CHUNKS, &ds,
	        txn->shadow_md.first_serial, txn->shadow_md.last_serial, normal_iterkeycb);
	unreuse_txn(txn, _txn);

	if (txn->ret == KNOT_EOK) {
		*really_deleted = ds.freed_approx;
	}
	return txn->ret;
}

static int delete_dirty_serial(journal_t *j, txn_t *_txn)
{
	reuse_txn(txn, j, _txn, true);
	txn_check_ret(txn);

	if (!md_flag(txn, DIRTY_SERIAL_VALID)) return KNOT_EOK;

	uint32_t ds = txn->shadow_md.dirty_serial, chunk = 0;

	txn_key_2u32(txn, j->zone, ds, chunk);
	while (txn_find(txn)) {
		txn_del(txn);
		txn_key_2u32(txn, j->zone, ds, ++chunk);
	}
	unreuse_txn(txn, _txn);
	if (txn->ret == KNOT_EOK) {
		txn->shadow_md.flags &= ~DIRTY_SERIAL_VALID;
	}
	return txn->ret;
}

static int delete_bootstrap_changeset(journal_t *j, txn_t *_txn)
{
	reuse_txn(txn, j, _txn, false);
	uint32_t chunk = 0;
	txn_key_str_u32(txn, j->zone, KEY_BOOTSTRAP_CHANGESET, chunk);
	while (txn_find(txn)) {
		txn_del(txn);
		txn_key_str_u32(txn, j->zone, KEY_BOOTSTRAP_CHANGESET, ++chunk);
	}
	unreuse_txn(txn, _txn);
	return txn->ret;
}

/*
 * ***************************** PART VI ******************************
 *
 *  Writing changesets
 *
 * ********************************************************************
 */

static int merge_itercb(iteration_ctx_t *ctx)
{
	changeset_t *ch = NULL, *mch = *(changeset_t **)ctx->iter_context;

	int ret = vals_to_changeset(ctx->val, ctx->chunk_count, ctx->txn->j->zone, &ch);
	if (ret == KNOT_EOK) {
		ret = changeset_merge(mch, ch, 0);
		changeset_free(ch);
	}
	return ret;
}

static int merge_unflushed_changesets(journal_t *j, txn_t *_txn, changeset_t **mch, bool *merged_bootstrap)
{
	reuse_txn(txn, j, _txn, false);
	txn_check_ret(txn);
	*mch = NULL;
	if (md_flushed(txn)) {
		goto m_u_ch_end;
	}
	uint32_t from;
	txn->ret = load_bootstrap_changeset(j, txn, mch);
	*merged_bootstrap = (txn->ret == KNOT_EOK);
	if (txn->ret == KNOT_ENOENT) { // no bootstrap changeset (normal operation)
		bool was_merged = md_flag(txn, MERGED_SERIAL_VALID);
		bool was_flushed = md_flag(txn, LAST_FLUSHED_VALID);
		txn->ret = KNOT_EOK;
		from = was_merged ? txn->shadow_md.merged_serial :
				    (was_flushed ? txn->shadow_md.last_flushed :
						   txn->shadow_md.first_serial);
		txn->ret = load_one(j, txn, from, mch);
		if (!was_merged && was_flushed && txn->ret == KNOT_EOK) {
			// we have to jump to ONE AFTER last_flushed
			from = knot_soa_serial((*mch)->soa_to->rrs.rdata);
			changeset_free(*mch);
			*mch = NULL;
			txn->ret = load_one(j, txn, from, mch);
		}
	}
	if (txn->ret != KNOT_EOK) {
		goto m_u_ch_end;
	}
	from = knot_soa_serial((*mch)->soa_to->rrs.rdata);

	if (!serial_equal(from, txn->shadow_md.last_serial_to)) {
		txn->ret = iterate(j, txn, merge_itercb, JOURNAL_ITERATION_CHANGESETS,
		                   mch, from, txn->shadow_md.last_serial, normal_iterkeycb);
	}

m_u_ch_end:
	unreuse_txn(txn, _txn);
	if (txn->ret != KNOT_EOK && *mch != NULL) {
		changeset_free(*mch);
		*mch = NULL;
	}
	return txn->ret;
}

dynarray_declare(chunk, knot_db_val_t, DYNARRAY_VISIBILITY_STATIC, 32)
dynarray_define(chunk, knot_db_val_t, DYNARRAY_VISIBILITY_STATIC)

// uses local context, e.g.: j, txn, changesets, nchs, serialized_size_merged, store_changeset_cleanup, inserting_merged, merged_into_bootstrap
#define try_flush \
	if (!md_flushed(txn)) { \
		if (journal_merge_allowed(j)) { \
			changeset_t *merged; \
			merge_unflushed_changesets(j, txn, &merged, &merged_into_bootstrap); \
			if (txn->ret != KNOT_EOK) { \
				goto store_changeset_cleanup; \
			} \
			add_tail(changesets, &merged->n); \
			nchs++; \
			serialized_size_merged += changeset_serialized_size(merged); \
			md_flush(txn); \
			inserting_merged = true; \
		} \
		else { \
			txn->ret = KNOT_EBUSY; \
			goto store_changeset_cleanup; \
		} \
	}

static int store_changesets(journal_t *j, list_t *changesets)
{
	// PART 1 : initializers, compute serialized_sizes, transaction start
	changeset_t *ch;

	size_t nchs = 0, inserted_size = 0, insert_txn_count = 1;
	size_t serialized_size_changes = 0, serialized_size_merged = 0;

	size_t chunks = 0;

	bool inserting_merged = false;
	bool merged_into_bootstrap = false;
	bool inserting_bootstrap = false;

	size_t occupied_last, occupied_now = knot_db_lmdb_get_usage(j->db->db);

	WALK_LIST(ch, *changesets) {
		nchs++;
		serialized_size_changes += changeset_serialized_size(ch);
		if (ch->soa_from == NULL) {
			inserting_bootstrap = true;
		}
	}

	local_txn_t(txn, j);
	txn_begin(txn, true);

	bool zone_in_journal = has_bootstrap_changeset(j, txn);
	bool merge_allowed = journal_merge_allowed(j);

	// if you're tempted to add dirty_serial deletion somewhere here, you're wrong. Don't do it.

	// PART 2 : recalculating the previous insert's occupy change
	md_get_common_last_occupied(txn, &occupied_last);
	md_set(txn, NULL, MDKEY_GLOBAL_LAST_TOTAL_OCCUPIED, occupied_now);
	if (occupied_now != occupied_last) {
		knot_dname_t *last_zone = NULL;
		uint64_t lz_occupied;
		md_get_common_last_inserter_zone(txn, &last_zone);
		if (last_zone != NULL) {
			md_get(txn, last_zone, MDKEY_PERZONE_OCCUPIED, &lz_occupied);
			lz_occupied = (lz_occupied + occupied_now > occupied_last ?
			               lz_occupied + occupied_now - occupied_last : 0);
			md_set(txn, last_zone, MDKEY_PERZONE_OCCUPIED, lz_occupied);
			free(last_zone);
		}
	}
	md_set_common_last_inserter_zone(txn, j->zone);

	// PART 3a : delete all if inserting bootstrap changeset
	if (inserting_bootstrap) {
		drop_journal(j, txn);
		txn_restart(txn);
	}

	// PART 3b : check if we exceeded designed occupation and delete some
	uint64_t occupied = 0, occupied_max;
	md_get(txn, j->zone, MDKEY_PERZONE_OCCUPIED, &occupied);
	occupied_max = journal_max_usage(j);
	occupied += serialized_size_changes;
	if (occupied > occupied_max) {
		size_t freed;
		size_t tofree = (occupied - occupied_max) * journal_tofree_factor(j);
		size_t free_min = tofree * journal_minfree_factor(j);
		delete_tofree(j, txn, tofree, &freed);
		if (freed < free_min) {
			tofree -= freed;
			free_min -= freed;
			try_flush
			tofree += serialized_size_merged;
			delete_tofree(j, txn, tofree, &freed);
			if (freed < free_min) {
				txn->ret = KNOT_ESPACE;
				log_zone_warning(j->zone, "journal, unable to make free space for insert, "
				                 "required: %"PRIu64", max: %"PRIu64,
				                 occupied, occupied_max);
				goto store_changeset_cleanup;
			}
		}
	}

	// PART 3c : check if we exceeded history depth
	long over_limit = (long)txn->shadow_md.changeset_count - journal_max_changesets(j) +
	                  list_size(changesets) - (inserting_merged ? 1 : 0);
	if (zone_in_journal && over_limit > 0 && !merge_allowed) {
		txn->ret = KNOT_ESPACE;
		log_zone_warning(j->zone, "journal, unable to make free slot for insert");
		goto store_changeset_cleanup;
	} else if (over_limit > 0) {
		size_t deled;
		delete_count(j, txn, over_limit, &deled);
		over_limit -= deled;
		if (over_limit > 0) {
			try_flush
			delete_count(j, txn, over_limit, &deled);
			// ignore further errors here, the limit is not so important
		}
	}

	// PART 4: continuity and duplicity check
	changeset_t * chs_head = (HEAD(*changesets));
	bool is_first_bootstrap = (chs_head->soa_from == NULL);
	uint32_t serial = is_first_bootstrap ? 0 : knot_soa_serial(chs_head->soa_from->rrs.rdata);
	if (md_flag(txn, SERIAL_TO_VALID) && (is_first_bootstrap ||
	    !serial_equal(txn->shadow_md.last_serial_to, serial)) &&
	    !inserting_bootstrap /* if inserting bootstrap, drop_journal() was called, so no discontinuity */) {
		log_zone_warning(j->zone, "journal, discontinuity in changes history (%u -> %u), dropping older changesets",
		                 txn->shadow_md.last_serial_to, serial);
		if (zone_in_journal) {
			txn->ret = KNOT_ERANGE; // we can't drop history if zone-in-journal, so this is forbidden
			goto store_changeset_cleanup;
		} else if (merge_allowed) {
			// flush would only merge and drop would delete the merge, so skip it
		} else {
			try_flush
		}
		drop_journal(j, txn);
		txn_restart(txn);
	}
	WALK_LIST(ch, *changesets) {
		uint32_t serial_to = knot_soa_serial(ch->soa_to->rrs.rdata);
		bool is_this_bootstrap = (ch->soa_from == NULL);
		bool is_this_merged = (inserting_merged && ch == TAIL(*changesets));
		if (is_this_bootstrap || is_this_merged) {
			continue;
		}
		txn_key_2u32(txn, j->zone, serial_to, 0);
		if (txn_find(txn)) {
			log_zone_warning(j->zone, "journal, duplicate changeset serial (%u), dropping older changesets",