Commit 578d59c0 authored by Marek Vavruša's avatar Marek Vavruša Committed by Marek Vavruša

modules/redis: implement basic hiredis storage backend

the Redis backend supports UNIX, TCP and custom ports
no real transactions nor iteration, but it’s not needed at the moment
transactions keep replies in a freelist, which is purged on transaction
commit / abort
parent c78c9faf
......@@ -27,6 +27,7 @@ $(eval $(call find_bin,sphinx-build))
$(eval $(call find_bin,gccgo))
$(eval $(call find_python))
$(eval $(call find_lib,libmemcached,1.0))
$(eval $(call find_lib,hiredis))
# Work around luajit on OS X
ifeq ($(PLATFORM), Darwin)
......
......@@ -15,3 +15,4 @@ Implemented modules
.. include:: ../modules/graphite/README.rst
.. include:: ../modules/ketcd/README.rst
.. include:: ../modules/kmemcached/README.rst
.. include:: ../modules/redis/README.rst
......@@ -19,4 +19,5 @@ info:
$(info [$(HAS_python)] Python (tests/integration))
$(info [$(HAS_gccgo)] GCCGO (modules/go))
$(info [$(HAS_libmemcached)] libmemcached (modules/memcached))
$(info [$(HAS_hiredis)] hiredis (modules/redis))
$(info )
......@@ -7,6 +7,10 @@ modules_TARGETS := hints \
ifeq ($(HAS_libmemcached),yes)
modules_TARGETS += kmemcached
endif
# Redis
ifeq ($(HAS_hiredis),yes)
modules_TARGETS += redis
endif
# List of Lua modules
ifeq ($(HAS_lua),yes)
......
Redis cache storage
-------------------
This modules provides Redis_ backend for cache storage. Redis is a BSD-license key-value cache and storage server.
Like memcached_ backend, Redis provides master-server replication, but also weak-consistency clustering.
After loading you can see the storage backend registered and useable.
.. code-block:: lua
> modules.load 'redis'
> cache.backends()
[redis://] => true
Redis client support TCP or UNIX sockets.
.. code-block:: lua
> cache.storage = 'redis://127.0.0.1'
> cache.storage = 'redis://127.0.0.1:6398'
> cache.storage = 'redis:///tmp/redis.sock'
It also supports indexed databases if you prefix the configuration string with ``DBID@``.
.. code-block:: lua
> cache.storage = 'redis://9@127.0.0.1'
.. warning:: The Redis client doesn't really support transactions nor pruning. Cache eviction policy shoud be left upon Redis server, see the `Using Redis as an LRU cache <redis-lru>`_.
Build distributed cache
^^^^^^^^^^^^^^^^^^^^^^^
See `Redis Cluster` tutorial.
Dependencies
^^^^^^^^^^^^
Depends on the hiredis_ library, which is usually in the packages / ports or you can install it from sources.
.. _Redis: http://redis.io/
.. _memcached: http://memcached.org/
.. _`Redis Cluster`: http://redis.io/topics/cluster-tutorial
.. _hiredis: https://github.com/redis/hiredis
.. _redis-lru: http://redis.io/topics/lru-cache
/* Copyright (C) 2015 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/** @file namedb_redis.c
* @brief Implemented all the things that the resolver cache needs (get, set, expiration).
* @note No real transactions.
* @note No iteration support.
*/
#include <assert.h>
#include <string.h>
#include <libknot/internal/namedb/namedb.h>
#include "modules/redis/redis.h"
#include "lib/cache.h"
#include "lib/utils.h"
#include "lib/defines.h"
static int cli_connect(struct redis_cli *cli)
{
/* Connect to either UNIX socket or TCP */
if (cli->port == 0) {
cli->handle = redisConnectUnix(cli->addr);
} else {
cli->handle = redisConnect(cli->addr, cli->port);
}
/* Catch errors */
if (!cli->handle) {
return kr_error(ENOMEM);
} else if (cli->handle->err) {
redisFree(cli->handle);
cli->handle = NULL;
return kr_error(ECONNREFUSED);
}
/* Set max bufsize */
cli->handle->reader->maxbuf = REDIS_BUFSIZE;
/* Select database */
redisReply *reply = redisCommand(cli->handle, "SELECT %d", cli->database);
if (!reply) {
redisFree(cli->handle);
cli->handle = NULL;
return kr_error(ENOTDIR);
}
freeReplyObject(reply);
return kr_ok();
}
static void cli_decommit(struct redis_cli *cli)
{
redis_freelist_t *freelist = &cli->freelist;
for (unsigned i = 0; i < freelist->len; ++i) {
freeReplyObject(freelist->at[i]);
}
freelist->len = 0;
}
static void cli_free(struct redis_cli *cli)
{
if (cli->handle) {
redisFree(cli->handle);
}
cli_decommit(cli);
array_clear(cli->freelist);
free(cli->addr);
free(cli);
}
static int init(namedb_t **db, mm_ctx_t *mm, void *arg)
{
if (!db || !arg) {
return kr_error(EINVAL);
}
/* Clone redis cli and connect */
struct redis_cli *cli = malloc(sizeof(*cli));
if (!cli) {
return kr_error(ENOMEM);
}
memcpy(cli, arg, sizeof(*cli));
int ret = cli_connect(cli);
if (ret != 0) {
cli_free(cli);
return ret;
}
*db = cli;
return ret;
}
static void deinit(namedb_t *db)
{
struct redis_cli *cli = db;
cli_free(cli);
}
static int txn_begin(namedb_t *db, namedb_txn_t *txn, unsigned flags)
{
if (!db || !txn) {
return kr_error(EINVAL);
}
txn->db = db;
return kr_ok();
}
static int txn_commit(namedb_txn_t *txn)
{
if (!txn || !txn->db) {
return kr_error(EINVAL);
}
cli_decommit(txn->db);
txn->db = NULL;
return kr_ok();
}
static void txn_abort(namedb_txn_t *txn)
{
/** @warning No real transactions here. */
txn_commit(txn);
}
/* Attempt to reconnect */
#define ERR_RECONNECT(cli) \
if ((cli)->handle->err != REDIS_ERR_OTHER) { \
redisFree((cli)->handle); \
(cli)->handle = NULL; \
cli_connect(cli); \
}
static int count(namedb_txn_t *txn)
{
if (!txn || !txn->db) {
return kr_error(EINVAL);
}
int ret = 0;
struct redis_cli *cli = txn->db;
redisReply *reply = redisCommand(cli->handle, "DBSIZE");
if (!reply) {
ERR_RECONNECT(cli);
return kr_error(EIO);
}
if (reply->type == REDIS_REPLY_INTEGER) {
ret = reply->integer;
}
freeReplyObject(reply);
return ret;
}
static int clear(namedb_txn_t *txn)
{
if (!txn || !txn->db) {
return kr_error(EINVAL);
}
struct redis_cli *cli = txn->db;
redisReply *reply = redisCommand(cli->handle, "FLUSHDB");
if (!reply) {
ERR_RECONNECT(cli);
return kr_error(EIO);
}
freeReplyObject(reply);
return kr_ok();
}
static int find(namedb_txn_t *txn, namedb_val_t *key, namedb_val_t *val, unsigned flags)
{
if (!txn || !key || !val) {
return kr_error(EINVAL);
}
struct redis_cli *cli = txn->db;
redisReply *reply = redisCommand(cli->handle, "GET %b", key->data, key->len);
if (!reply) {
ERR_RECONNECT(cli);
return kr_error(EIO);
}
/* Track reply in a freelist for this transaction */
if (array_push(cli->freelist, reply) < 0) {
freeReplyObject(reply); /* Can't track this, must free */
return kr_error(ENOMEM);
}
/* Return value */
if (reply->type != REDIS_REPLY_STRING) {
return kr_error(EPROTO);
}
val->data = reply->str;
val->len = reply->len;
return kr_ok();
}
static int insert(namedb_txn_t *txn, namedb_val_t *key, namedb_val_t *val, unsigned flags)
{
if (!txn || !key || !val) {
return kr_error(EINVAL);
}
/* @warning This expects usage only for recursor cache, if anyone
* desires to port this somewhere else, TTL shouldn't be interpreted.
*/
struct redis_cli *cli = txn->db;
struct kr_cache_entry *entry = val->data;
redisReply *reply = redisCommand(cli->handle, "SETEX %b %d %b",
key->data, key->len, entry->ttl, val->data, val->len);
if (!reply) {
ERR_RECONNECT(cli);
return kr_error(EIO);
}
freeReplyObject(reply);
return kr_ok();
}
static int del(namedb_txn_t *txn, namedb_val_t *key)
{
return kr_error(ENOSYS);
}
static namedb_iter_t *iter_begin(namedb_txn_t *txn, unsigned flags)
{
/* Iteration is not supported, pruning should be
* left on the Redis server setting */
return NULL;
}
static namedb_iter_t *iter_seek(namedb_iter_t *iter, namedb_val_t *key, unsigned flags)
{
assert(0);
return NULL; /* ENOSYS */
}
static namedb_iter_t *iter_next(namedb_iter_t *iter)
{
assert(0);
return NULL;
}
static int iter_key(namedb_iter_t *iter, namedb_val_t *val)
{
return kr_error(ENOSYS);
}
static int iter_val(namedb_iter_t *iter, namedb_val_t *val)
{
return kr_error(ENOSYS);
}
static void iter_finish(namedb_iter_t *iter)
{
assert(0);
}
const namedb_api_t *namedb_redis_api(void)
{
static const namedb_api_t api = {
"redis",
init, deinit,
txn_begin, txn_commit, txn_abort,
count, clear, find, insert, del,
iter_begin, iter_seek, iter_next, iter_key, iter_val, iter_finish
};
return &api;
}
/* Copyright (C) 2015 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <libknot/internal/namedb/namedb.h>
#include <uv.h>
#include "modules/redis/redis.h"
#include "daemon/engine.h"
#include "lib/module.h"
#include "lib/cache.h"
/** @internal Redis API */
extern const namedb_api_t *namedb_redis_api(void);
/** @internal Make redis options. */
void *namedb_redis_mkopts(const char *conf_, size_t maxsize)
{
auto_free char *conf = strdup(conf_);
struct redis_cli *cli = malloc(sizeof(*cli));
if (!cli || !conf) {
free(cli);
return NULL;
}
/* Parse database */
memset(cli, 0, sizeof(*cli));
char *bp = conf;
char *p = strchr(bp, '@');
if (p) {
*p = '\0';
cli->database = atoi(conf);
bp = (p + 1);
}
/* Parse host / ip / sock */
if (access(bp, W_OK) == 0) { /* UNIX */
cli->addr = strdup(bp);
return cli;
}
struct sockaddr_in6 ip6;
p = strchr(bp, ':');
if (!p) { /* IPv4 */
cli->addr = strdup(bp);
cli->port = REDIS_PORT;
return cli;
}
if (!strchr(p + 1, ':')) { /* IPv4 + port */
*p = '\0';
cli->addr = strdup(bp);
cli->port = atoi(p + 1);
} else { /* IPv6 */
if (uv_ip6_addr(bp, 0, &ip6) == 0) {
cli->addr = strdup(bp);
cli->port = REDIS_PORT;
} else { /* IPv6 + port */
p = strrchr(bp, ':');
*p = '\0';
cli->addr = strdup(bp);
cli->port = atoi(p + 1);
}
}
return cli;
}
int redis_init(struct kr_module *module)
{
struct engine *engine = module->data;
/* Register new storage option */
static struct storage_api redis = {
"redis://", namedb_redis_api, namedb_redis_mkopts
};
array_push(engine->storage_registry, redis);
return kr_ok();
}
int redis_deinit(struct kr_module *module)
{
struct engine *engine = module->data;
/* It was currently loaded, close cache */
if (engine->resolver.cache.api == namedb_redis_api()) {
kr_cache_close(&engine->resolver.cache);
}
/* Prevent from loading it again */
for (unsigned i = 0; i < engine->storage_registry.len; ++i) {
struct storage_api *storage = &engine->storage_registry.at[i];
if (strcmp(storage->prefix, "redis://") == 0) {
array_del(engine->storage_registry, i);
break;
}
}
return kr_ok();
}
KR_MODULE_EXPORT(redis);
redis_SOURCES := modules/redis/redis.c modules/redis/namedb_redis.c
redis_LIBS := $(libkres_TARGET) $(libkres_LIBS) $(hiredis_LIBS) $(libuv_LIBS)
$(call make_c_module,redis)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment