Commit e2621d92 authored by Vladimír Čunát

cache: rework reusing transactions (LMDB)

Previously, a read transaction could be held open indefinitely by each
fork.  That was done for better speed, but it had the downside of
keeping old pages alive and potentially returning only stale data until
that fork attempted some writes.

Now kr_cache_ provides an explicit API for suitable points at which to
break transactions, reusing the _sync command.  On the LMDB side the
read-only transaction is only reset and later renewed, which supposedly
performs better than aborting it (see the LMDB docs on reset+renew).
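
For reference, this is roughly the reset+renew cycle in plain LMDB
calls (an illustrative sketch only; the helper and key names are made
up for the example and error handling is trimmed):

    #include <lmdb.h>

    /* Reuse one read-only transaction handle across batches of reads. */
    static int read_batch(MDB_env *env, MDB_dbi dbi, MDB_txn **ro)
    {
        int ret = *ro == NULL
            ? mdb_txn_begin(env, NULL, MDB_RDONLY, ro) /* first use: allocate the handle */
            : mdb_txn_renew(*ro);                      /* later: reactivate on a fresh snapshot */
        if (ret != MDB_SUCCESS)
            return ret;

        char k[] = "example-key";
        MDB_val key = { sizeof(k) - 1, k }, val;
        ret = mdb_get(*ro, dbi, &key, &val); /* ...any number of reads on this snapshot... */

        mdb_txn_reset(*ro); /* stop pinning old pages, but keep the handle for renewal */
        return ret == MDB_NOTFOUND ? MDB_SUCCESS : ret;
    }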

Performance: preliminary testing with two forks, running resperf on the
comcast query set, shows no noticeable difference in peak QPS.
parent 536fa610
......@@ -726,7 +726,9 @@ static int cache_prefixed(struct kr_cache *cache, const char *args, knot_db_val_
}
/* Start prefix search */
return kr_cache_match(cache, namespace, buf, results, maxresults);
int ret = kr_cache_match(cache, namespace, buf, results, maxresults);
kr_cache_sync(cache);
return ret;
}
/** @internal Delete iterated key. */
......@@ -752,6 +754,7 @@ static int cache_remove_prefix(struct kr_cache *cache, const char *args)
result_set[i].data = dst;
}
cache->api->remove(cache->db, result_set, ret);
kr_cache_sync(cache);
/* Free keys */
for (int i = 0; i < ret; ++i) {
free(result_set[i].data);
......
......@@ -109,11 +109,15 @@ void kr_cache_close(struct kr_cache *cache)
}
}
void kr_cache_sync(struct kr_cache *cache)
int kr_cache_sync(struct kr_cache *cache)
{
if (cache_isvalid(cache) && cache->api->sync) {
cache_op(cache, sync);
if (!cache_isvalid(cache)) {
return kr_error(EINVAL);
}
if (cache->api->sync) {
return cache_op(cache, sync);
}
return kr_ok();
}
/**
......@@ -237,7 +241,6 @@ int kr_cache_insert(struct kr_cache *cache, uint8_t tag, const knot_dname_t *nam
return ret;
}
entry_write(entry.data, header, data);
ret = cache_op(cache, sync); /* Make sure the entry is committed. */
} else {
/* Other backends must prepare contiguous data first */
auto_free char *buffer = malloc(entry.len);
......
......@@ -98,12 +98,9 @@ int kr_cache_open(struct kr_cache *cache, const struct kr_cdb_api *api, struct k
KR_EXPORT
void kr_cache_close(struct kr_cache *cache);
/**
* Synchronise cache with the backing store.
* @param cache structure
*/
/** Run after a row of operations to release transaction/lock if needed. */
KR_EXPORT
void kr_cache_sync(struct kr_cache *cache);
int kr_cache_sync(struct kr_cache *cache);
/**
* Return true if cache is open and enabled.
......
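
As a usage note (a sketch only, mirroring the layer changes further
down; names and error handling are abbreviated): a caller now performs
a batch of cache operations and then breaks the transaction explicitly,
checking both results:

    int ret1 = kr_cache_insert(cache, KR_CACHE_PKT, qname, qtype, &header, data);
    int ret2 = kr_cache_sync(cache);   /* commits the RW txn, or resets a RO one */
    if (ret1 || ret2) {
        /* e.g. kr_error(ENOSPC) here is the cue to clear an overfull cache */
    }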
......@@ -37,6 +37,8 @@ struct kr_cdb_api {
void (*close)(knot_db_t *db);
int (*count)(knot_db_t *db);
int (*clear)(knot_db_t *db);
/** Run after a row of operations to release transaction/lock if needed. */
int (*sync)(knot_db_t *db);
/* Data access */
......
......@@ -37,8 +37,17 @@ struct lmdb_env
size_t mapsize;
MDB_dbi dbi;
MDB_env *env;
MDB_txn *rdtxn;
MDB_txn *wrtxn;
/** Cached transactions
*
* - only one of (ro,rw) may be active at once
* - non-NULL .ro may be active or reset
* - non-NULL .rw is always active
*/
struct {
bool ro_active;
MDB_txn *ro, *rw;
} txn;
};
/** @brief Convert LMDB error code. */
......@@ -52,7 +61,8 @@ static int lmdb_error(int error)
case ENOSPC:
return kr_error(ENOSPC);
default:
kr_log_info("[cache] LMDB error: %s\n", mdb_strerror(error));
kr_log_error("[cache] LMDB error: %s\n", mdb_strerror(error));
assert(false);
return -abs(error);
}
}
......@@ -77,53 +87,64 @@ static int set_mapsize(MDB_env *env, size_t map_size)
return 0;
}
static int txn_begin(struct lmdb_env *env, MDB_txn **txn, bool rdonly)
/** Obtain a transaction. (they're cached in env->txn) */
static int txn_get(struct lmdb_env *env, MDB_txn **txn, bool rdonly)
{
/* Always barrier for write transaction. */
assert(env && txn);
if (env->wrtxn) {
mdb_txn_abort(env->wrtxn);
env->wrtxn = NULL;
}
/* Renew pending read-only transaction
* or abort it to clear reader slot before writing. */
if (env->rdtxn) {
if (rdonly) {
*txn = env->rdtxn;
env->rdtxn = NULL;
return 0;
} else {
mdb_txn_abort(env->rdtxn);
env->rdtxn = NULL;
if (env->txn.rw) {
/* Reuse the *open* RW txn even if only reading is requested.
* We leave the management of this to the cdb_sync command.
* The user may e.g. want to do some reads between the writes. */
*txn = env->txn.rw;
return kr_ok();
}
if (!rdonly) {
/* avoid two active transactions */
if (env->txn.ro && env->txn.ro_active) {
mdb_txn_reset(env->txn.ro);
env->txn.ro_active = false;
}
int ret = mdb_txn_begin(env->env, NULL, 0/*RW*/, &env->txn.rw);
if (ret == MDB_SUCCESS) {
*txn = env->txn.rw;
assert(*txn);
}
return lmdb_error(ret);
}
unsigned flags = rdonly ? MDB_RDONLY : 0;
return lmdb_error(mdb_txn_begin(env->env, NULL, flags, txn));
}
static int txn_end(struct lmdb_env *env, MDB_txn *txn)
{
assert(env && txn);
/* Cache read transactions */
if (!env->rdtxn) {
env->rdtxn = txn;
} else {
mdb_txn_abort(txn);
/* Get an active RO txn and return it. */
if (!env->txn.ro) { //:unlikely
int ret = mdb_txn_begin(env->env, NULL, MDB_RDONLY, &env->txn.ro);
if (ret != MDB_SUCCESS) {
return lmdb_error(ret);
}
} else if (!env->txn.ro_active) {
int ret = mdb_txn_renew(env->txn.ro);
if (ret != MDB_SUCCESS) {
return lmdb_error(ret);
}
}
return 0;
env->txn.ro_active = true;
*txn = env->txn.ro;
assert(*txn);
return kr_ok();
}
static int cdb_sync(knot_db_t *db)
{
struct lmdb_env *env = db;
int ret = 0;
if (env->wrtxn) {
ret = lmdb_error(mdb_txn_commit(env->wrtxn));
env->wrtxn = NULL; /* In-flight transaction is committed. */
}
if (env->rdtxn) {
mdb_txn_abort(env->rdtxn);
env->rdtxn = NULL;
int ret = kr_ok();
if (env->txn.rw) {
ret = mdb_txn_commit(env->txn.rw);
if (ret != MDB_BAD_TXN) {
/* _BAD_TXN happens during overfull clear with multiple forks :-/ */
ret = lmdb_error(ret);
}
env->txn.rw = NULL; /* the transaction got freed even in case of errors */
} else if (env->txn.ro && env->txn.ro_active) {
mdb_txn_reset(env->txn.ro);
env->txn.ro_active = false;
}
return ret;
}
......@@ -132,7 +153,14 @@ static int cdb_sync(knot_db_t *db)
static void cdb_close_env(struct lmdb_env *env)
{
assert(env && env->env);
/* Get rid of any transactions. */
cdb_sync(env);
if (env->txn.ro) {
mdb_txn_abort(env->txn.ro);
env->txn.ro = NULL;
}
mdb_env_sync(env->env, 1);
mdb_dbi_close(env->env, env->dbi);
mdb_env_close(env->env);
......@@ -250,7 +278,7 @@ static int cdb_count(knot_db_t *db)
{
struct lmdb_env *env = db;
MDB_txn *txn = NULL;
int ret = txn_begin(env, &txn, true);
int ret = txn_get(env, &txn, true);
if (ret != 0) {
return ret;
}
......@@ -258,8 +286,6 @@ static int cdb_count(knot_db_t *db)
MDB_stat stat;
ret = mdb_stat(txn, env->dbi, &stat);
/* Always abort, serves as a checkpoint for in-flight transaction. */
mdb_txn_abort(txn);
return (ret == MDB_SUCCESS) ? stat.ms_entries : lmdb_error(ret);
}
......@@ -351,8 +377,8 @@ static int cdb_readv(knot_db_t *db, knot_db_val_t *key, knot_db_val_t *val, int
{
struct lmdb_env *env = db;
MDB_txn *txn = NULL;
int ret = txn_begin(env, &txn, true);
if (ret != 0) {
int ret = txn_get(env, &txn, true);
if (ret) {
return ret;
}
......@@ -361,41 +387,50 @@ static int cdb_readv(knot_db_t *db, knot_db_val_t *key, knot_db_val_t *val, int
MDB_val _key = { .mv_size = key[i].len, .mv_data = key[i].data };
MDB_val _val = { .mv_size = val[i].len, .mv_data = val[i].data };
ret = mdb_get(txn, env->dbi, &_key, &_val);
if (ret != MDB_SUCCESS) {
return lmdb_error(ret);
}
/* Update the result. */
val[i].data = _val.mv_data;
val[i].len = _val.mv_size;
}
txn_end(env, txn);
return lmdb_error(ret);
return kr_ok();
}
static int cdb_write(struct lmdb_env *env, MDB_txn *txn, knot_db_val_t *key, knot_db_val_t *val, unsigned flags)
static int cdb_write(struct lmdb_env *env, MDB_txn **txn, knot_db_val_t *key, knot_db_val_t *val, unsigned flags)
{
/* Convert key structs and write */
MDB_val _key = { key->len, key->data };
MDB_val _val = { val->len, val->data };
int ret = mdb_put(txn, env->dbi, &_key, &_val, flags);
int ret = mdb_put(*txn, env->dbi, &_key, &_val, flags);
/* Try to recover from doing too much writing in a single transaction. */
if (ret == MDB_TXN_FULL) {
ret = cdb_sync(env);
if (ret) {
ret = txn_get(env, txn, false);
}
if (ret) {
ret = mdb_put(*txn, env->dbi, &_key, &_val, flags);
}
}
if (ret != MDB_SUCCESS) {
return lmdb_error(ret);
}
/* Update the result. */
val->data = _val.mv_data;
val->len = _val.mv_size;
return 0;
return kr_ok();
}
static int cdb_writev(knot_db_t *db, knot_db_val_t *key, knot_db_val_t *val, int maxcount)
{
struct lmdb_env *env = db;
MDB_txn *txn = NULL;
int ret = txn_begin(env, &txn, false);
if (ret != 0) {
return ret;
}
int ret = txn_get(env, &txn, false);
bool reserved = false;
for (int i = 0; i < maxcount; ++i) {
for (int i = 0; ret == kr_ok() && i < maxcount; ++i) {
/* This is LMDB specific optimisation,
* if caller specifies value with NULL data and non-zero length,
* LMDB will preallocate the entry for caller and leave write
......@@ -404,22 +439,10 @@ static int cdb_writev(knot_db_t *db, knot_db_val_t *key, knot_db_val_t *val, int
unsigned mdb_flags = 0;
if (val[i].len > 0 && val[i].data == NULL) {
mdb_flags |= MDB_RESERVE;
reserved = true;
}
ret = cdb_write(env, txn, &key[i], &val[i], mdb_flags);
if (ret != 0) {
mdb_txn_abort(txn);
return ret;
}
ret = cdb_write(env, &txn, &key[i], &val[i], mdb_flags);
}
/* Leave transaction open if reserved. */
if (reserved) {
assert(env->wrtxn == NULL);
env->wrtxn = txn;
} else {
ret = lmdb_error(mdb_txn_commit(txn));
}
return ret;
}
......@@ -427,29 +450,22 @@ static int cdb_remove(knot_db_t *db, knot_db_val_t *key, int maxcount)
{
struct lmdb_env *env = db;
MDB_txn *txn = NULL;
int ret = txn_begin(env, &txn, false);
if (ret != 0) {
return ret;
}
int ret = txn_get(env, &txn, false);
for (int i = 0; i < maxcount; ++i) {
for (int i = 0; ret == kr_ok() && i < maxcount; ++i) {
MDB_val _key = { key[i].len, key[i].data };
MDB_val val = { 0, NULL };
ret = mdb_del(txn, env->dbi, &_key, &val);
if (ret != 0) {
mdb_txn_abort(txn);
return lmdb_error(ret);
}
ret = lmdb_error(mdb_del(txn, env->dbi, &_key, &val));
}
return lmdb_error(mdb_txn_commit(txn));
return ret;
}
static int cdb_match(knot_db_t *db, knot_db_val_t *key, knot_db_val_t *val, int maxcount)
{
struct lmdb_env *env = db;
MDB_txn *txn = NULL;
int ret = txn_begin(env, &txn, true);
int ret = txn_get(env, &txn, true);
if (ret != 0) {
return ret;
}
......@@ -463,20 +479,18 @@ static int cdb_match(knot_db_t *db, knot_db_val_t *key, knot_db_val_t *val, int
MDB_cursor *cur = NULL;
ret = mdb_cursor_open(txn, env->dbi, &cur);
if (ret != 0) {
mdb_txn_abort(txn);
return lmdb_error(ret);
}
MDB_val cur_key = { key->len, key->data }, cur_val = { 0, NULL };
ret = mdb_cursor_get(cur, &cur_key, &cur_val, MDB_SET_RANGE);
if (ret != 0) {
if (ret != MDB_SUCCESS) {
mdb_cursor_close(cur);
mdb_txn_abort(txn);
return lmdb_error(ret);
}
int results = 0;
while (ret == 0) {
while (ret == MDB_SUCCESS) {
/* Retrieve current key and compare with prefix */
if (cur_key.mv_size < key->len || memcmp(cur_key.mv_data, key->data, key->len) != 0) {
break;
......@@ -493,7 +507,6 @@ static int cdb_match(knot_db_t *db, knot_db_val_t *key, knot_db_val_t *val, int
}
mdb_cursor_close(cur);
txn_end(env, txn);
return results;
}
......@@ -506,7 +519,7 @@ static int cdb_prune(knot_db_t *db, int limit)
/* Prune old records */
struct lmdb_env *env = db;
MDB_txn *txn = NULL;
int ret = txn_begin(env, &txn, false);
int ret = txn_get(env, &txn, false);
if (ret != 0) {
return ret;
}
......@@ -514,7 +527,6 @@ static int cdb_prune(knot_db_t *db, int limit)
MDB_cursor *cur = NULL;
ret = mdb_cursor_open(txn, env->dbi, &cur);
if (ret != 0) {
mdb_txn_abort(txn);
return lmdb_error(ret);
}
......@@ -522,7 +534,6 @@ static int cdb_prune(knot_db_t *db, int limit)
ret = mdb_cursor_get(cur, &cur_key, &cur_val, MDB_FIRST);
if (ret != 0) {
mdb_cursor_close(cur);
mdb_txn_abort(txn);
return lmdb_error(ret);
}
......@@ -548,7 +559,6 @@ static int cdb_prune(knot_db_t *db, int limit)
ret = mdb_cursor_get(cur, &cur_key, &cur_val, MDB_NEXT);
}
mdb_cursor_close(cur);
ret = lmdb_error(mdb_txn_commit(txn));
return ret < 0 ? ret : results;
}
......
......@@ -159,6 +159,7 @@ static int pktcache_peek(kr_layer_t *ctx, knot_pkt_t *pkt)
/* Fetch either answer to original or minimized query */
uint8_t flags = 0;
int ret = loot_pktcache(req->ctx, pkt, req, &flags);
kr_cache_sync(&req->ctx->cache);
if (ret == 0) {
qry->flags.CACHED = true;
qry->flags.NO_MINIMIZE = true;
......@@ -296,11 +297,13 @@ static int pktcache_stash(kr_layer_t *ctx, knot_pkt_t *pkt)
}
/* Stash answer in the cache */
int ret = kr_cache_insert(cache, KR_CACHE_PKT, qname, qtype, &header, data);
if (ret == 0) {
int ret1 = kr_cache_insert(cache, KR_CACHE_PKT, qname, qtype, &header, data);
int ret2 = kr_cache_sync(cache);
if (!ret1 && !ret2) {
VERBOSE_MSG(qry, "=> answer cached for TTL=%u\n", ttl);
} else {
VERBOSE_MSG(qry, "=> stashing failed; codes: %d and %d\n", ret1, ret2);
}
kr_cache_sync(cache);
return ctx->state;
}
......
......@@ -252,6 +252,7 @@ static int rrcache_peek(kr_layer_t *ctx, knot_pkt_t *pkt)
}
}
}
kr_cache_sync(&req->ctx->cache);
if (ret == 0) {
VERBOSE_MSG(qry, "=> satisfied from cache\n");
qry->flags.CACHED = true;
......@@ -451,14 +452,21 @@ static int rrcache_stash(kr_layer_t *ctx, knot_pkt_t *pkt)
/* Open write transaction */
struct kr_cache *cache = &req->ctx->cache;
ret = stash_commit(&stash, qry, cache, req);
if (ret == 0) {
ret = kr_cache_sync(cache);
} else {
kr_cache_sync(cache);
}
/* Clear if full */
if (ret == kr_error(ENOSPC)) {
kr_log_info("[cache] clearing because overfull\n");
ret = kr_cache_clear(cache);
if (ret != 0 && ret != kr_error(EEXIST)) {
kr_log_error("[ rc ] failed to clear cache: %s\n", kr_strerror(ret));
kr_log_error("[cache] failed to clear cache: %s\n", kr_strerror(ret));
}
} else if (ret) {
VERBOSE_MSG(qry, "=> stashing failed: %d\n", ret);
}
kr_cache_sync(cache);
}
return ctx->state;
}
......
......@@ -188,11 +188,12 @@ static void check_empty_nonterms(struct kr_query *qry, knot_pkt_t *pkt, struct k
/* @todo We could stop resolution here for NXDOMAIN, but we can't because of broken CDNs */
qry->flags.NO_MINIMIZE = true;
kr_make_query(qry, pkt);
return;
break;
}
assert(target[0]);
target = knot_wire_next_label(target, NULL);
}
kr_cache_sync(cache);
}
static int ns_fetch_cut(struct kr_query *qry, const knot_dname_t *requested_name,
......
......@@ -458,6 +458,7 @@ int kr_zonecut_find_cached(struct kr_context *ctx, struct kr_zonecut *cut, const
}
update_cut_name(cut, label);
mm_free(cut->pool, qname);
kr_cache_sync(&ctx->cache);
return kr_ok();
}
/* Subtract label from QNAME. */
......@@ -467,6 +468,7 @@ int kr_zonecut_find_cached(struct kr_context *ctx, struct kr_zonecut *cut, const
break;
}
}
kr_cache_sync(&ctx->cache);
mm_free(cut->pool, qname);
return kr_error(ENOENT);
}
......@@ -166,6 +166,7 @@ static void test_fake_invalid (void **state)
ret = kr_cache_peek(cache, KR_CACHE_USER, dname, KNOT_RRTYPE_TSIG, &entry, 0);
cache->api = api_saved;
assert_int_not_equal(ret, 0);
kr_cache_sync(cache);
}
static void test_fake_insert(void **state)
......@@ -184,6 +185,7 @@ static void test_fake_insert(void **state)
KNOT_RRTYPE_TSIG, &global_fake_ce, global_namedb_data);
assert_int_equal(ret_cache_ins_ok, 0);
assert_int_equal(ret_cache_ins_inval, KNOT_EINVAL);
kr_cache_sync(cache);
}
/* Test invalid parameters and some api failures. */
......@@ -217,6 +219,7 @@ static void test_invalid(void **state)
assert_int_not_equal(kr_cache_remove(cache, KR_CACHE_RR, NULL, 0), 0);
assert_int_not_equal(kr_cache_remove(NULL, 0, NULL, 0), 0);
assert_int_not_equal(kr_cache_clear(NULL), 0);
kr_cache_sync(cache);
}
/* Test cache write */
......@@ -226,6 +229,7 @@ static void test_insert_rr(void **state)
struct kr_cache *cache = (*state);
int ret = kr_cache_insert_rr(cache, &global_rr, 0, 0, CACHE_TIME);
assert_int_equal(ret, 0);
kr_cache_sync(cache);
}
static void test_materialize(void **state)
......@@ -276,6 +280,7 @@ static void test_query(void **state)
assert_int_equal(query_ret, 0);
assert_true(rr_equal);
}
kr_cache_sync(cache);
}
/* Test cache read (simulate aged entry) */
......@@ -290,6 +295,7 @@ static void test_query_aged(void **state)
struct kr_cache *cache = (*state);
int ret = kr_cache_peek_rr(cache, &cache_rr, &rank, &flags, &timestamp);
assert_int_equal(ret, kr_error(ESTALE));
kr_cache_sync(cache);
}
/* Test cache removal */
......@@ -306,6 +312,7 @@ static void test_remove(void **state)
assert_int_equal(ret, 0);
ret = kr_cache_peek_rr(cache, &cache_rr, &rank, &flags, &timestamp);
assert_int_equal(ret, KNOT_ENOENT);
kr_cache_sync(cache);
}
/* Test cache fill */
......@@ -322,10 +329,15 @@ static void test_fill(void **state)
if (ret != 0) {
break;
}
ret = kr_cache_sync(cache);
if (ret != 0) {
break;
}
}
/* Expect we run out of space */
assert_int_equal(ret, kr_error(ENOSPC));
kr_cache_sync(cache);
}
/* Test cache clear */
......