summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorOran Agra <oran@redislabs.com>2019-07-21 17:41:03 +0300
committerOran Agra <oran@redislabs.com>2019-07-22 21:15:33 +0300
commit3b6aeea44cf8bdc64214a5f145da55453722a9a2 (patch)
tree103e66717e30499dc4a2b3c7c8ef27133a323140 /src
parentbc5cb168f504c188c7e67ca61853fd73c341fa62 (diff)
downloadredis-3b6aeea44cf8bdc64214a5f145da55453722a9a2.tar.gz
Implement module api for aux data in rdb
Other changes: * fix memory leak in error handling of rdb loading of type OBJ_MODULE
Diffstat (limited to 'src')
-rw-r--r--src/module.c41
-rw-r--r--src/rdb.c87
-rw-r--r--src/rdb.h1
-rw-r--r--src/redismodule.h11
-rw-r--r--src/server.h10
5 files changed, 134 insertions, 16 deletions
diff --git a/src/module.c b/src/module.c
index f4f753c00..812a54e04 100644
--- a/src/module.c
+++ b/src/module.c
@@ -29,6 +29,7 @@
#include "server.h"
#include "cluster.h"
+#include "rdb.h"
#include <dlfcn.h>
#define REDISMODULE_CORE 1
@@ -3078,6 +3079,11 @@ moduleType *RM_CreateDataType(RedisModuleCtx *ctx, const char *name, int encver,
moduleTypeMemUsageFunc mem_usage;
moduleTypeDigestFunc digest;
moduleTypeFreeFunc free;
+ struct {
+ moduleTypeAuxLoadFunc aux_load;
+ moduleTypeAuxSaveFunc aux_save;
+ int aux_save_triggers;
+ } v2;
} *tms = (struct typemethods*) typemethods_ptr;
moduleType *mt = zcalloc(sizeof(*mt));
@@ -3089,6 +3095,11 @@ moduleType *RM_CreateDataType(RedisModuleCtx *ctx, const char *name, int encver,
mt->mem_usage = tms->mem_usage;
mt->digest = tms->digest;
mt->free = tms->free;
+ if (tms->version >= 2) {
+ mt->aux_load = tms->v2.aux_load;
+ mt->aux_save = tms->v2.aux_save;
+ mt->aux_save_triggers = tms->v2.aux_save_triggers;
+ }
memcpy(mt->name,name,sizeof(mt->name));
listAddNodeTail(ctx->module->types,mt);
return mt;
@@ -3355,6 +3366,36 @@ loaderr:
return 0; /* Never reached. */
}
+/* Iterate over modules, and trigger rdb aux saving for the ones modules types
+ * who asked for it. */
+ssize_t rdbSaveModulesAux(rio *rdb, int when) {
+ size_t total_written = 0;
+ dictIterator *di = dictGetIterator(modules);
+ dictEntry *de;
+
+ while ((de = dictNext(di)) != NULL) {
+ struct RedisModule *module = dictGetVal(de);
+ listIter li;
+ listNode *ln;
+
+ listRewind(module->types,&li);
+ while((ln = listNext(&li))) {
+ moduleType *mt = ln->value;
+ if (!mt->aux_save || !(mt->aux_save_triggers & when))
+ continue;
+ ssize_t ret = rdbSaveSingleModuleAux(rdb, when, mt);
+ if (ret==-1) {
+ dictReleaseIterator(di);
+ return -1;
+ }
+ total_written += ret;
+ }
+ }
+
+ dictReleaseIterator(di);
+ return total_written;
+}
+
/* --------------------------------------------------------------------------
* Key digest API (DEBUG DIGEST interface for modules types)
* -------------------------------------------------------------------------- */
diff --git a/src/rdb.c b/src/rdb.c
index 2118a00f4..4e00fad67 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -971,7 +971,6 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) {
RedisModuleIO io;
moduleValue *mv = o->ptr;
moduleType *mt = mv->type;
- moduleInitIOContext(io,mt,rdb,key);
/* Write the "module" identifier as prefix, so that we'll be able
* to call the right module during loading. */
@@ -980,10 +979,13 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) {
io.bytes += retval;
/* Then write the module-specific representation + EOF marker. */
+ moduleInitIOContext(io,mt,rdb,key);
mt->rdb_save(&io,mv->value);
retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_EOF);
- if (retval == -1) return -1;
- io.bytes += retval;
+ if (retval == -1)
+ io.error = 1;
+ else
+ io.bytes += retval;
if (io.ctx) {
moduleFreeContext(io.ctx);
@@ -1101,6 +1103,40 @@ int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) {
return 1;
}
+ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt) {
+ /* Save a module-specific aux value. */
+ RedisModuleIO io;
+ int retval = rdbSaveType(rdb, RDB_OPCODE_MODULE_AUX);
+
+ /* Write the "module" identifier as prefix, so that we'll be able
+ * to call the right module during loading. */
+ retval = rdbSaveLen(rdb,mt->id);
+ if (retval == -1) return -1;
+ io.bytes += retval;
+
+ /* write the 'when' so that we can provide it on loading */
+ retval = rdbSaveLen(rdb,when);
+ if (retval == -1) return -1;
+ io.bytes += retval;
+
+ /* Then write the module-specific representation + EOF marker. */
+ moduleInitIOContext(io,mt,rdb,NULL);
+ mt->aux_save(&io,when);
+ retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_EOF);
+ if (retval == -1)
+ io.error = 1;
+ else
+ io.bytes += retval;
+
+ if (io.ctx) {
+ moduleFreeContext(io.ctx);
+ zfree(io.ctx);
+ }
+ if (io.error)
+ return -1;
+ return io.bytes;
+}
+
/* Produces a dump of the database in RDB format sending it to the specified
* Redis I/O channel. On success C_OK is returned, otherwise C_ERR
* is returned and part of the output, or all the output, can be
@@ -1122,6 +1158,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr;
+ if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;
for (j = 0; j < server.dbnum; j++) {
redisDb *db = server.db+j;
@@ -1183,6 +1220,8 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
di = NULL; /* So that we don't release it again on error. */
}
+ if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_AFTER_RDB) == -1) goto werr;
+
/* EOF opcode */
if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;
@@ -2089,15 +2128,11 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
decrRefCount(auxval);
continue; /* Read type again. */
} else if (type == RDB_OPCODE_MODULE_AUX) {
- /* This is just for compatibility with the future: we have plans
- * to add the ability for modules to store anything in the RDB
- * file, like data that is not related to the Redis key space.
- * Such data will potentially be stored both before and after the
- * RDB keys-values section. For this reason since RDB version 9,
- * we have the ability to read a MODULE_AUX opcode followed by an
- * identifier of the module, and a serialized value in "MODULE V2"
- * format. */
+ /* Load module data that is not related to the Redis key space.
+ * Such data can be potentially be stored both before and after the
+ * RDB keys-values section. */
uint64_t moduleid = rdbLoadLen(rdb,NULL);
+ int when = rdbLoadLen(rdb,NULL);
if (rioGetReadError(rdb)) goto eoferr;
moduleType *mt = moduleTypeLookupModuleByID(moduleid);
char name[10];
@@ -2108,10 +2143,32 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
serverLog(LL_WARNING,"The RDB file contains AUX module data I can't load: no matching module '%s'", name);
exit(1);
} else if (!rdbCheckMode && mt != NULL) {
- /* This version of Redis actually does not know what to do
- * with modules AUX data... */
- serverLog(LL_WARNING,"The RDB file contains AUX module data I can't load for the module '%s'. Probably you want to use a newer version of Redis which implements aux data callbacks", name);
- exit(1);
+ if (!mt->aux_load) {
+ /* Module doesn't support AUX. */
+ serverLog(LL_WARNING,"The RDB file contains module AUX data, but the module '%s' doesn't seem to support it.", name);
+ exit(1);
+ }
+
+ RedisModuleIO io;
+ moduleInitIOContext(io,mt,rdb,NULL);
+ io.ver = 2;
+ /* Call the rdb_load method of the module providing the 10 bit
+ * encoding version in the lower 10 bits of the module ID. */
+ if (mt->aux_load(&io,moduleid&1023, when) || io.error) {
+ moduleTypeNameByID(name,moduleid);
+ serverLog(LL_WARNING,"The RDB file contains module AUX data for the module type '%s', that the responsible module is not able to load. Check for modules log above for additional clues.", name);
+ exit(1);
+ }
+ if (io.ctx) {
+ moduleFreeContext(io.ctx);
+ zfree(io.ctx);
+ }
+ uint64_t eof = rdbLoadLen(rdb,NULL);
+ if (eof != RDB_MODULE_OPCODE_EOF) {
+ serverLog(LL_WARNING,"The RDB file contains module AUX data for the module '%s' that is not terminated by the proper module value EOF marker", name);
+ exit(1);
+ }
+ continue;
} else {
/* RDB check mode. */
robj *aux = rdbLoadCheckModuleValue(rdb,name);
diff --git a/src/rdb.h b/src/rdb.h
index 0acddf9ab..40a50f7ba 100644
--- a/src/rdb.h
+++ b/src/rdb.h
@@ -145,6 +145,7 @@ size_t rdbSavedObjectLen(robj *o);
robj *rdbLoadObject(int type, rio *rdb, robj *key);
void backgroundSaveDoneHandler(int exitcode, int bysignal);
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime);
+ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt);
robj *rdbLoadStringObject(rio *rdb);
ssize_t rdbSaveStringObject(rio *rdb, robj *obj);
ssize_t rdbSaveRawString(rio *rdb, unsigned char *s, size_t len);
diff --git a/src/redismodule.h b/src/redismodule.h
index b9c73957b..21227fe3a 100644
--- a/src/redismodule.h
+++ b/src/redismodule.h
@@ -129,6 +129,10 @@
#define REDISMODULE_NOT_USED(V) ((void) V)
+/* Bit flags for aux_save_triggers and the aux_load and aux_save callbacks */
+#define REDISMODULE_AUX_BEFORE_RDB (1<<0)
+#define REDISMODULE_AUX_AFTER_RDB (1<<1)
+
/* This type represents a timer handle, and is returned when a timer is
* registered and used in order to invalidate a timer. It's just a 64 bit
* number, because this is how each timer is represented inside the radix tree
@@ -166,6 +170,8 @@ typedef void (*RedisModuleDisconnectFunc)(RedisModuleCtx *ctx, RedisModuleBlocke
typedef int (*RedisModuleNotificationFunc)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key);
typedef void *(*RedisModuleTypeLoadFunc)(RedisModuleIO *rdb, int encver);
typedef void (*RedisModuleTypeSaveFunc)(RedisModuleIO *rdb, void *value);
+typedef int (*RedisModuleTypeAuxLoadFunc)(RedisModuleIO *rdb, int encver, int when);
+typedef void (*RedisModuleTypeAuxSaveFunc)(RedisModuleIO *rdb, int when);
typedef void (*RedisModuleTypeRewriteFunc)(RedisModuleIO *aof, RedisModuleString *key, void *value);
typedef size_t (*RedisModuleTypeMemUsageFunc)(const void *value);
typedef void (*RedisModuleTypeDigestFunc)(RedisModuleDigest *digest, void *value);
@@ -174,7 +180,7 @@ typedef void (*RedisModuleClusterMessageReceiver)(RedisModuleCtx *ctx, const cha
typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data);
typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCommandFilterCtx *filter);
-#define REDISMODULE_TYPE_METHOD_VERSION 1
+#define REDISMODULE_TYPE_METHOD_VERSION 2
typedef struct RedisModuleTypeMethods {
uint64_t version;
RedisModuleTypeLoadFunc rdb_load;
@@ -183,6 +189,9 @@ typedef struct RedisModuleTypeMethods {
RedisModuleTypeMemUsageFunc mem_usage;
RedisModuleTypeDigestFunc digest;
RedisModuleTypeFreeFunc free;
+ RedisModuleTypeAuxLoadFunc aux_load;
+ RedisModuleTypeAuxSaveFunc aux_save;
+ int aux_save_triggers;
} RedisModuleTypeMethods;
#define REDISMODULE_GET_API(name) \
diff --git a/src/server.h b/src/server.h
index b200a6696..33522dd69 100644
--- a/src/server.h
+++ b/src/server.h
@@ -536,6 +536,10 @@ typedef long long mstime_t; /* millisecond time type. */
#define REDISMODULE_TYPE_ENCVER(id) (id & REDISMODULE_TYPE_ENCVER_MASK)
#define REDISMODULE_TYPE_SIGN(id) ((id & ~((uint64_t)REDISMODULE_TYPE_ENCVER_MASK)) >>REDISMODULE_TYPE_ENCVER_BITS)
+/* Bit flags for moduleTypeAuxSaveFunc */
+#define REDISMODULE_AUX_BEFORE_RDB (1<<0)
+#define REDISMODULE_AUX_AFTER_RDB (1<<1)
+
struct RedisModule;
struct RedisModuleIO;
struct RedisModuleDigest;
@@ -548,6 +552,8 @@ struct redisObject;
* is deleted. */
typedef void *(*moduleTypeLoadFunc)(struct RedisModuleIO *io, int encver);
typedef void (*moduleTypeSaveFunc)(struct RedisModuleIO *io, void *value);
+typedef int (*moduleTypeAuxLoadFunc)(struct RedisModuleIO *rdb, int encver, int when);
+typedef void (*moduleTypeAuxSaveFunc)(struct RedisModuleIO *rdb, int when);
typedef void (*moduleTypeRewriteFunc)(struct RedisModuleIO *io, struct redisObject *key, void *value);
typedef void (*moduleTypeDigestFunc)(struct RedisModuleDigest *digest, void *value);
typedef size_t (*moduleTypeMemUsageFunc)(const void *value);
@@ -564,6 +570,9 @@ typedef struct RedisModuleType {
moduleTypeMemUsageFunc mem_usage;
moduleTypeDigestFunc digest;
moduleTypeFreeFunc free;
+ moduleTypeAuxLoadFunc aux_load;
+ moduleTypeAuxSaveFunc aux_save;
+ int aux_save_triggers;
char name[10]; /* 9 bytes name + null term. Charset: A-Z a-z 0-9 _- */
} moduleType;
@@ -1528,6 +1537,7 @@ void moduleAcquireGIL(void);
void moduleReleaseGIL(void);
void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid);
void moduleCallCommandFilters(client *c);
+ssize_t rdbSaveModulesAux(rio *rdb, int when);
/* Utils */
long long ustime(void);