Commit dc9209d0 authored by Libor Peltan's avatar Libor Peltan

journal: fix failure states in zone-in-journal case:

 - discontinuity handling
 - duplicitiy of serials
 - delete subsequent changes if overwriting bootstrap
parent fb30a035
......@@ -914,6 +914,15 @@ static int load_bootstrap_changeset(journal_t *j, txn_t *_txn, changeset_t **ch)
return txn->ret;
}
static bool has_bootstrap_changeset(journal_t *j, txn_t *_txn)
{
reuse_txn(txn, j, _txn, false);
txn_key_str_u32(txn, j->zone, KEY_BOOTSTRAP_CHANGESET, 0);
bool res = txn_find(txn);
unreuse_txn(txn, _txn);
return res;
}
int journal_load_bootstrap(journal_t *j, list_t *dst)
{
if (j == NULL || j->db == NULL || dst == NULL) return KNOT_EINVAL;
......@@ -1256,17 +1265,24 @@ static int store_changesets(journal_t *j, list_t *changesets)
bool inserting_merged = false;
bool merged_into_bootstrap = false;
bool inserting_bootstrap = false;
size_t occupied_last, occupied_now = knot_db_lmdb_get_usage(j->db->db);
WALK_LIST(ch, *changesets) {
nchs++;
serialized_size_total += changeset_serialized_size(ch);
if (ch->soa_from == NULL) {
inserting_bootstrap = true;
}
}
local_txn_t(txn, j);
txn_begin(txn, true);
bool zone_in_journal = has_bootstrap_changeset(j, txn);
bool merge_allowed = journal_merge_allowed(j);
// if you're tempted to add dirty_serial deletion somewhere here, you're wrong. Don't do it.
// PART 2 : recalculating the previous insert's occupy change
......@@ -1296,12 +1312,26 @@ static int store_changesets(journal_t *j, list_t *changesets)
md_set_common_last_inserter_zone(txn, j->zone);
}
// PART 3 : check if we exceeded designed occupation and delete some
// PART 3a : delete all if inserting bootstrap changeset
if (inserting_bootstrap) {
drop_journal(j, txn);
txn_restart(txn);
}
// PART 3b : check if we exceeded designed occupation and delete some
uint64_t occupied = 0, occupied_max;
md_get(txn, j->zone, MDKEY_PERZONE_OCCUPIED, &occupied);
occupied_max = journal_max_usage(j);
occupied += serialized_size_total;
if (occupied > occupied_max) {
if (zone_in_journal && occupied > occupied_max) {
if (merge_allowed) {
try_flush // decrease journal occupation by merging all into bootstrap changeset
} else {
txn->ret = KNOT_ESPACE;
log_zone_warning(j->zone, "journal, unable to make free space for insert");
goto store_changeset_cleanup;
}
} else if (occupied > occupied_max) {
size_t freed;
size_t tofree = (occupied - occupied_max) * journal_tofree_factor(j);
size_t free_min = tofree * journal_minfree_factor(j);
......@@ -1319,10 +1349,18 @@ static int store_changesets(journal_t *j, list_t *changesets)
}
}
// PART 3.5 : check if we exceeded history depth
// PART 3c : check if we exceeded history depth
long over_limit = (long)txn->shadow_md.changeset_count - journal_max_changesets(j) +
list_size(changesets) - (inserting_merged ? 1 : 0);
if (over_limit > 0) {
if (zone_in_journal && over_limit > 0) {
if (merge_allowed) {
try_flush // decrease journal occupation by merging all into bootstrap changeset
} else {
txn->ret = KNOT_ESPACE;
log_zone_warning(j->zone, "journal, unable to make free slot for insert");
goto store_changeset_cleanup;
}
} else if (over_limit > 0) {
size_t deled;
delete_count(j, txn, over_limit, &deled);
over_limit -= deled;
......@@ -1335,22 +1373,27 @@ static int store_changesets(journal_t *j, list_t *changesets)
// PART 4: continuity and duplicity check
changeset_t * chs_head = (HEAD(*changesets));
bool is_bootstrap = (chs_head->soa_from == NULL);
uint32_t serial = is_bootstrap ? 0 : knot_soa_serial(&chs_head->soa_from->rrs);
if (md_flag(txn, SERIAL_TO_VALID) && (is_bootstrap ||
serial_compare(txn->shadow_md.last_serial_to, serial) != 0)) {
bool is_first_bootstrap = (chs_head->soa_from == NULL);
uint32_t serial = is_first_bootstrap ? 0 : knot_soa_serial(&chs_head->soa_from->rrs);
if (md_flag(txn, SERIAL_TO_VALID) && (is_first_bootstrap ||
serial_compare(txn->shadow_md.last_serial_to, serial) != 0) &&
!inserting_bootstrap /* if inserting bootstrap, drop_journal() was called, so no discontinuity */) {
log_zone_warning(j->zone, "journal, discontinuity in changes history (%u -> %u), dropping older changesets",
txn->shadow_md.last_serial_to, serial);
if (!md_flushed(txn) && !journal_merge_allowed(j)) {
txn->ret = KNOT_EBUSY; // aka try_flush, but don't merge
if (zone_in_journal) {
txn->ret = KNOT_ERANGE; // we can't drop history if zone-in-journal, so this is forbidden
goto store_changeset_cleanup;
} else if (merge_allowed) {
// flush would only merge and drop would delete the merge, so skip it
} else {
try_flush
}
drop_journal(j, txn);
txn_restart(txn);
}
WALK_LIST(ch, *changesets) {
uint32_t serial_to = knot_soa_serial(&ch->soa_to->rrs);
bool is_this_bootstrap = (is_bootstrap && ch == HEAD(*changesets));
bool is_this_bootstrap = (ch->soa_from == NULL);
bool is_this_merged = (inserting_merged && ch == TAIL(*changesets));
if (is_this_bootstrap || is_this_merged) {
continue;
......@@ -1359,9 +1402,18 @@ static int store_changesets(journal_t *j, list_t *changesets)
if (txn_find(txn)) {
log_zone_warning(j->zone, "journal, duplicate changeset serial (%u), dropping older changesets",
serial_to);
try_flush
delete_upto(j, txn, txn->shadow_md.first_serial, serial_to);
txn_restart(txn);
if (zone_in_journal) {
if (merge_allowed) {
try_flush // merge will get rid of the duplicity => OK
} else {
txn->ret = KNOT_EEXIST; // we can't fix it in this case, refuse to do it
goto store_changeset_cleanup;
}
} else {
try_flush
delete_upto(j, txn, txn->shadow_md.first_serial, serial_to);
txn_restart(txn);
}
}
}
......@@ -1391,8 +1443,7 @@ static int store_changesets(journal_t *j, list_t *changesets)
}
bool is_this_merged = (inserting_merged && ch == TAIL(*changesets));
bool is_this_bootstrap = (is_bootstrap && (ch == HEAD(*changesets))) ||
(merged_into_bootstrap && is_this_merged);
bool is_this_bootstrap = (ch->soa_from == NULL);
uint32_t serial = is_this_bootstrap ? 0 : knot_soa_serial(&ch->soa_from->rrs);
uint32_t serial_to = knot_soa_serial(&ch->soa_to->rrs);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment