Commit 10a4510e authored by Marek Vavrusa's avatar Marek Vavrusa

Journal is now protected with mutex and not refcounted.

refcounting lead to several errors, for example when:
* ixfr started to write changesets
* scheduler started applying changes to zonefile
* ixfr finished writing changesets
>>> race for node flags <<<
* scheduler finished
==> result: corrupted node flags, bad CRC
parent da23c9d6
This diff is collapsed.
......@@ -43,6 +43,7 @@
#include <stdint.h>
#include <fcntl.h>
#include <pthread.h>
/*!
* \brief Journal entry flags.
......@@ -55,14 +56,6 @@ typedef enum journal_flag_t {
JOURNAL_TRANS = 1 << 3 /*!< Entry is in transaction (uncommited). */
} journal_flag_t;
/*!
* \brief Journal mode.
*/
typedef enum journal_mode_t {
JOURNAL_PERSISTENT = 0 << 0, /*!< Persistent mode (open keeps fd). */
JOURNAL_LAZY = 1 << 0 /*!< Lazy mode (open doesn't keep fd). */
} journal_mode_t;
/*!
* \brief Journal node structure.
*
......@@ -93,8 +86,8 @@ typedef struct journal_t
{
int fd;
struct flock fl; /*!< File lock. */
pthread_mutex_t mutex; /*!< Synchronization mutex. */
char *path; /*!< Path to journal file. */
int refs; /*!< Number of references. */
uint16_t tmark; /*!< Transaction start mark. */
uint16_t max_nodes; /*!< Number of nodes. */
uint16_t qhead; /*!< Node queue head. */
......@@ -103,7 +96,7 @@ typedef struct journal_t
size_t fsize; /*!< Journal file size. */
size_t fslimit; /*!< File size limit. */
journal_node_t free; /*!< Free segment. */
journal_node_t nodes[]; /*!< Array of nodes. */
journal_node_t *nodes; /*!< Array of nodes. */
} journal_t;
/*!
......@@ -146,17 +139,19 @@ typedef int (*journal_apply_t)(journal_t *j, journal_node_t *n);
int journal_create(const char *fn, uint16_t max_nodes);
/*!
* \brief Open journal file for read/write.
* \brief Open journal.
*
* \warning This doesn't open the file yet, just sets up the structure.
* Call \fn journal_retain before reading/writing the journal.
*
* \param fn Journal file name.
* \param fslimit File size limit (0 for no limit).
* \param mode Open mode (0 for normal).
* \param bflags Initial flags for each written node.
*
* \retval new journal instance if successful.
* \retval NULL on error.
*/
journal_t* journal_open(const char *fn, size_t fslimit, int mode, uint16_t bflags);
journal_t* journal_open(const char *fn, size_t fslimit, uint16_t bflags);
/*!
* \brief Fetch entry node for given identifier.
......@@ -345,13 +340,14 @@ int journal_close(journal_t *journal);
/*!
* \brief Retain journal for use.
*
* Allows to track usage of lazily-opened journals.
*
* \param journal Journal.
*
* \return Retained journal.
* \retval KNOT_EOK
* \retval KNOT_EBUSY
* \retval KNOT_EINVAL
* \retval KNOT_ERROR
*/
journal_t *journal_retain(journal_t *journal);
int journal_retain(journal_t *journal);
/*!
* \brief Release retained journal.
......
......@@ -108,7 +108,7 @@ static int zonedata_destroy(knot_zone_t *zone)
pthread_mutex_destroy(&zd->lock);
/* Close IXFR db. */
journal_release(zd->ixfr_db);
journal_close(zd->ixfr_db);
/* Free assigned config. */
conf_free_zone(zd->conf);
......@@ -151,8 +151,7 @@ static int zonedata_init(conf_zone_t *cfg, knot_zone_t *zone)
zd->xfr_in.bootstrap_retry = (XFRIN_BOOTSTRAP_DELAY * tls_rand());
/* Initialize IXFR database. */
zd->ixfr_db = journal_open(cfg->ixfr_db, cfg->ixfr_fslimit,
JOURNAL_LAZY, JOURNAL_DIRTY);
zd->ixfr_db = journal_open(cfg->ixfr_db, cfg->ixfr_fslimit, JOURNAL_DIRTY);
if (zd->ixfr_db == NULL) {
char ebuf[256] = {0};
......@@ -507,9 +506,11 @@ static int zones_zonefile_sync_ev(event_t *e)
int ret = KNOT_EOK;
if (knot_zone_contents(zone)) {
/* Synchronize journal. */
journal_t *j = journal_retain(zd->ixfr_db);
ret = zones_zonefile_sync(zone, j);
journal_release(j);
ret = journal_retain(zd->ixfr_db);
if (ret == KNOT_EOK) {
ret = zones_zonefile_sync(zone, zd->ixfr_db);
journal_release(zd->ixfr_db);
}
rcu_read_lock();
if (ret == KNOT_EOK) {
......@@ -848,23 +849,23 @@ static int zones_load_changesets(const knot_zone_t *zone,
rcu_read_unlock();
/* Retain journal for changeset loading. */
journal_t *j = journal_retain(zd->ixfr_db);
if (j == NULL) {
return KNOT_EBUSY;
int ret = journal_retain(zd->ixfr_db);
if (ret != KNOT_EOK) {
return ret;
}
/* Read entries from starting serial until finished. */
uint32_t found_to = from;
journal_node_t *n = 0;
int ret = journal_fetch(j, from, ixfrdb_key_from_cmp, &n);
ret = journal_fetch(zd->ixfr_db, from, ixfrdb_key_from_cmp, &n);
if (ret != KNOT_EOK) {
dbg_xfr("xfr: failed to fetch starting changeset: %s\n",
knot_strerror(ret));
journal_release(j);
journal_release(zd->ixfr_db);
return ret;
}
while (n != 0 && n != journal_end(j)) {
while (n != 0 && n != journal_end(zd->ixfr_db)) {
/* Check for history end. */
if (to == found_to) {
......@@ -875,7 +876,7 @@ static int zones_load_changesets(const knot_zone_t *zone,
if (chs == NULL) {
dbg_xfr("xfr: failed to create changeset: %s\n",
knot_strerror(ret));
journal_release(j);
journal_release(zd->ixfr_db);
return KNOT_ERROR;
}
......@@ -892,16 +893,16 @@ static int zones_load_changesets(const knot_zone_t *zone,
chs->serial_to = ixfrdb_key_to(n->id);
chs->data = malloc(n->len);
if (!chs->data) {
journal_release(j);
journal_release(zd->ixfr_db);
return KNOT_ENOMEM;
}
/* Read journal entry. */
ret = journal_read_node(j, n, (char*)chs->data);
ret = journal_read_node(zd->ixfr_db, n, (char*)chs->data);
if (ret != KNOT_EOK) {
dbg_xfr("xfr: failed to read data from journal\n");
free(chs->data);
journal_release(j);
journal_release(zd->ixfr_db);
return KNOT_ERROR;
}
......@@ -916,7 +917,7 @@ static int zones_load_changesets(const knot_zone_t *zone,
}
dbg_xfr_detail("xfr: finished reading journal entries\n");
journal_release(j);
journal_release(zd->ixfr_db);
/* Unpack binary data. */
int unpack_ret = zones_changesets_from_binary(dst);
......@@ -3167,13 +3168,17 @@ journal_t *zones_store_changesets_begin(knot_zone_t *zone)
}
/* Begin transaction, will be released on commit/rollback. */
journal_t *j = journal_retain(zd->ixfr_db);
if (journal_trans_begin(j) != KNOT_EOK) {
journal_release(j);
j = NULL;
int ret = journal_retain(zd->ixfr_db);
if (ret != KNOT_EOK) {
return NULL;
}
if (journal_trans_begin(zd->ixfr_db) != KNOT_EOK) {
journal_release(zd->ixfr_db);
return NULL;
}
return j;
return zd->ixfr_db;
}
/*----------------------------------------------------------------------------*/
......
......@@ -77,20 +77,21 @@ int main(int argc, char *argv[])
is_int(KNOT_EOK, ret, "journal: create journal '%s'", jfilename);
/* Open journal. */
journal_t *journal = journal_open(jfilename, fsize, JOURNAL_LAZY, 0);
journal_t *journal = journal_open(jfilename, fsize, 0);
ok(journal != NULL, "journal: open journal '%s'", jfilename);
/* Retain journal. */
journal_t *j = journal_retain(journal);
ret = journal_retain(journal);
is_int(KNOT_EOK, ret, "journal: retain");
/* Write entry to log. */
const char *sample = "deadbeef";
ret = journal_write(j, 0x0a, sample, strlen(sample));
ret = journal_write(journal, 0x0a, sample, strlen(sample));
is_int(KNOT_EOK, ret, "journal: write");
/* Read entry from log. */
char tmpbuf[64] = {'\0'};
ret = journal_read(j, 0x0a, 0, tmpbuf);
ret = journal_read(journal, 0x0a, 0, tmpbuf);
is_int(KNOT_EOK, ret, "journal: read entry");
/* Compare read data. */
......@@ -98,15 +99,15 @@ int main(int argc, char *argv[])
is_int(KNOT_EOK, ret, "journal: read data integrity check");
/* Append several characters. */
journal_write(j, 0, "X", 1); /* Dummy */
journal_write(journal, 0, "X", 1); /* Dummy */
char word[7] = { 'w', 'o', 'r', 'd', '0', '\0', '\0' };
for (int i = 0; i < strlen(word); ++i) {
journal_write(j, i, word+i, 1);
journal_write(journal, i, word+i, 1);
}
/* Compare journal_walk() result. */
_wbi = 0;
journal_walk(j, walkchars);
journal_walk(journal, walkchars);
_walkbuf[_wbi] = '\0';
ret = strcmp(word, _walkbuf);
is_int(0, ret, "journal: read data integrity check 2 '%s'", _walkbuf);
......@@ -114,20 +115,15 @@ int main(int argc, char *argv[])
/* Change single letter and compare. */
word[5] = 'X';
journal_write(j, 5, word+5, 1); /* append 'X', shifts out 'w' */
journal_walk(j, walkchars);
journal_write(journal, 5, word+5, 1); /* append 'X', shifts out 'w' */
journal_walk(journal, walkchars);
_walkbuf[_wbi] = '\0';
ret = strcmp(word + 1, _walkbuf);
is_int(0, ret, "journal: read data integrity check 3 '%s'", _walkbuf);
_wbi = 0;
/* Attempt to retain and release. */
journal_t *tmp = journal_retain(j);
ok(tmp == j, "journal: tested journal retaining");
journal_release(tmp);
/* Release journal. */
journal_release(j);
journal_release(journal);
/* Close journal. */
journal_close(journal);
......@@ -141,8 +137,9 @@ int main(int argc, char *argv[])
ret = journal_create(jfilename, jsize);
is_int(KNOT_EOK, ret, "journal: create journal '%s'", jfilename);
j = journal_open(jfilename, fsize, 0, 0);
ok(j != NULL, "journal: open journal '%s'", jfilename);
journal = journal_open(jfilename, fsize, 0);
ok(journal != NULL, "journal: open journal '%s'", jfilename);
journal_retain(journal);
/* Write random data. */
int chk_key = 0;
......@@ -152,7 +149,7 @@ int main(int argc, char *argv[])
for (int i = 0; i < itcount; ++i) {
int key = rand() % 65535;
randstr(tmpbuf, sizeof(tmpbuf));
if (journal_write(j, key, tmpbuf, sizeof(tmpbuf)) != KNOT_EOK) {
if (journal_write(journal, key, tmpbuf, sizeof(tmpbuf)) != KNOT_EOK) {
ret = -1;
break;
}
......@@ -167,36 +164,38 @@ int main(int argc, char *argv[])
/* Check data integrity. */
memset(tmpbuf, 0, sizeof(tmpbuf));
journal_read(j, chk_key, 0, tmpbuf);
journal_read(journal, chk_key, 0, tmpbuf);
ret = strncmp(chk_buf, tmpbuf, sizeof(chk_buf));
is_int(0, ret, "journal: read data integrity check");
/* Reopen log and re-read value. */
memset(tmpbuf, 0, sizeof(tmpbuf));
journal_close(j);
j = journal_open(jfilename, fsize, 0, 0);
ok(j != NULL, "journal: open journal '%s'", jfilename);
journal_release(journal);
journal_close(journal);
journal = journal_open(jfilename, fsize, 0);
ok(journal != NULL, "journal: open journal '%s'", jfilename);
journal_retain(journal);
journal_read(j, chk_key, 0, tmpbuf);
journal_read(journal, chk_key, 0, tmpbuf);
ret = strncmp(chk_buf, tmpbuf, sizeof(chk_buf));
is_int(0, ret, "journal: read data integrity check after close/open");
/* Map journal entry. */
char *mptr = NULL;
memset(chk_buf, 0xde, sizeof(chk_buf));
ret = journal_map(j, 0x12345, &mptr, sizeof(chk_buf));
ret = journal_map(journal, 0x12345, &mptr, sizeof(chk_buf));
ok(mptr && ret == 0, "journal: mapped journal entry");
if (ret != 0) {
skip_block(2, "No mapped journal");
} else {
/* Write to mmaped entry and unmap. */
memcpy(mptr, chk_buf, sizeof(chk_buf));
ret = journal_unmap(j, 0x12345, mptr, 1);
ret = journal_unmap(journal, 0x12345, mptr, 1);
ok(mptr && ret == 0, "journal: written to mapped entry and finished");
/* Compare mmaped entry. */
memset(tmpbuf, 0, sizeof(tmpbuf));
journal_read(j, 0x12345, NULL, tmpbuf);
journal_read(journal, 0x12345, NULL, tmpbuf);
ret = strncmp(chk_buf, tmpbuf, sizeof(chk_buf));
ok(ret == 0, "journal: mapped entry data integrity check");
......@@ -204,31 +203,31 @@ int main(int argc, char *argv[])
/* Make a transaction. */
uint64_t tskey = 0x75750000;
ret = journal_trans_begin(j);
ret = journal_trans_begin(journal);
is_int(0, ret, "journal: TRANS begin");
for (int i = 0; i < 16; ++i) {
memset(tmpbuf, i, sizeof(tmpbuf));
journal_write(j, tskey + i, tmpbuf, sizeof(tmpbuf));
journal_write(journal, tskey + i, tmpbuf, sizeof(tmpbuf));
}
/* Check if uncommited node exists. */
ret = journal_read(j, tskey + rand() % 16, NULL, chk_buf);
ret = journal_read(journal, tskey + rand() % 16, NULL, chk_buf);
ok(ret != 0, "journal: check for uncommited node");
/* Commit transaction. */
ret = journal_trans_commit(j);
int read_ret = journal_read(j, tskey + rand() % 16, NULL, chk_buf);
ret = journal_trans_commit(journal);
int read_ret = journal_read(journal, tskey + rand() % 16, NULL, chk_buf);
ok(ret == 0 && read_ret == 0, "journal: transaction commit");
/* Rollback transaction. */
tskey = 0x6B6B0000;
journal_trans_begin(j);
journal_trans_begin(journal);
for (int i = 0; i < 16; ++i) {
memset(tmpbuf, i, sizeof(tmpbuf));
journal_write(j, tskey + i, tmpbuf, sizeof(tmpbuf));
journal_write(journal, tskey + i, tmpbuf, sizeof(tmpbuf));
}
ret = journal_trans_rollback(j);
read_ret = journal_read(j, tskey + rand() % 16, NULL, chk_buf);
ret = journal_trans_rollback(journal);
read_ret = journal_read(journal, tskey + rand() % 16, NULL, chk_buf);
ok(ret == 0 && read_ret != 0, "journal: transaction rollback");
/* Write random data. */
......@@ -236,20 +235,20 @@ int main(int argc, char *argv[])
for (int i = 0; i < 512; ++i) {
int key = i;
randstr(tmpbuf, sizeof(tmpbuf));
ret = journal_map(j, key, &mptr, sizeof(tmpbuf));
ret = journal_map(journal, key, &mptr, sizeof(tmpbuf));
if (ret != KNOT_EOK) {
diag("journal_map failed: %s", knot_strerror(ret));
break;
}
memcpy(mptr, tmpbuf, sizeof(tmpbuf));
if ((ret = journal_unmap(j, key, mptr, 1)) != KNOT_EOK) {
if ((ret = journal_unmap(journal, key, mptr, 1)) != KNOT_EOK) {
diag("journal_unmap failed: %s", knot_strerror(ret));
break;
}
/* Store some key on the end. */
memset(chk_buf, 0, sizeof(chk_buf));
ret = journal_read(j, key, 0, chk_buf);
ret = journal_read(journal, key, 0, chk_buf);
if (ret != 0) {
diag("journal_map integrity check failed %s",
knot_strerror(ret));
......@@ -264,13 +263,14 @@ int main(int argc, char *argv[])
is_int(0, ret, "journal: sustained mmap r/w");
/* Open + create journal. */
journal_close(j);
journal_release(journal);
journal_close(journal);
remove(jfilename);
j = journal_open(jfilename, fsize, 0, 0);
ok(j != NULL, "journal: open+create from scratch '%s'", jfilename);
journal = journal_open(jfilename, fsize, 0);
ok(journal != NULL, "journal: open+create from scratch '%s'", jfilename);
/* Close journal. */
journal_close(j);
journal_close(journal);
/* Delete journal. */
remove(jfilename);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment