summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSalvatore Sanfilippo <antirez@gmail.com>2019-09-26 11:52:42 +0200
committerGitHub <noreply@github.com>2019-09-26 11:52:42 +0200
commit959fb5cf6879c5fb04e8fcf00efda4816e358d0d (patch)
treeb5f35d85bb5ac3b60e30731cd602317915c9e616
parentb0a90d8fa825bdc35982529c97ebe316cdca0161 (diff)
parent40c4183196b8c6e1c44c27400965786b9f010c74 (diff)
downloadredis-959fb5cf6879c5fb04e8fcf00efda4816e358d0d.tar.gz
Merge pull request #6235 from oranagra/module_rdb_load_errors
Allow modules to handle RDB loading errors.
-rw-r--r--src/module.c64
-rw-r--r--src/redismodule.h7
-rw-r--r--src/replication.c9
-rw-r--r--src/server.h1
-rw-r--r--tests/modules/testrdb.c13
-rw-r--r--tests/unit/moduleapi/testrdb.tcl62
6 files changed, 147 insertions, 9 deletions
diff --git a/src/module.c b/src/module.c
index 3074b2b19..a9db0a3a5 100644
--- a/src/module.c
+++ b/src/module.c
@@ -52,6 +52,7 @@ struct RedisModule {
list *using; /* List of modules we use some APIs of. */
list *filters; /* List of filters the module has registered. */
int in_call; /* RM_Call() nesting level */
+ int options; /* Moduile options and capabilities. */
};
typedef struct RedisModule RedisModule;
@@ -772,6 +773,19 @@ long long RM_Milliseconds(void) {
return mstime();
}
+/* Set flags defining capabilities or behavior bit flags.
+ *
+ * REDISMODULE_OPTIONS_HANDLE_IO_ERRORS:
+ * Generally, modules don't need to bother with this, as the process will just
+ * terminate if a read error happens, however, setting this flag would allow
+ * repl-diskless-load to work if enabled.
+ * The module should use RedisModule_IsIOError after reads, before using the
+ * data that was read, and in case of error, propagate it upwards, and also be
+ * able to release the partially populated value and all it's allocations. */
+void RM_SetModuleOptions(RedisModuleCtx *ctx, int options) {
+ ctx->module->options = options;
+}
+
/* --------------------------------------------------------------------------
* Automatic memory management for modules
* -------------------------------------------------------------------------- */
@@ -3150,9 +3164,14 @@ void *RM_ModuleTypeGetValue(RedisModuleKey *key) {
* RDB loading and saving functions
* -------------------------------------------------------------------------- */
-/* Called when there is a load error in the context of a module. This cannot
- * be recovered like for the built-in types. */
+/* Called when there is a load error in the context of a module. On some
+ * modules this cannot be recovered, but if the module declared capability
+ * to handle errors, we'll raise a flag rather than exiting. */
void moduleRDBLoadError(RedisModuleIO *io) {
+ if (io->ctx->module->options & REDISMODULE_OPTIONS_HANDLE_IO_ERRORS) {
+ io->error = 1;
+ return;
+ }
serverLog(LL_WARNING,
"Error loading data from RDB (short read or EOF). "
"Read performed by module '%s' about type '%s' "
@@ -3163,6 +3182,33 @@ void moduleRDBLoadError(RedisModuleIO *io) {
exit(1);
}
+/* Returns 0 if there's at least one registered data type that did not declare
+ * REDISMODULE_OPTIONS_HANDLE_IO_ERRORS, in which case diskless loading should
+ * be avoided since it could cause data loss. */
+int moduleAllDatatypesHandleErrors() {
+ dictIterator *di = dictGetIterator(modules);
+ dictEntry *de;
+
+ while ((de = dictNext(di)) != NULL) {
+ struct RedisModule *module = dictGetVal(de);
+ if (listLength(module->types) &&
+ !(module->options & REDISMODULE_OPTIONS_HANDLE_IO_ERRORS))
+ {
+ dictReleaseIterator(di);
+ return 0;
+ }
+ }
+ dictReleaseIterator(di);
+ return 1;
+}
+
+/* Returns true if any previous IO API failed.
+ * for Load* APIs the REDISMODULE_OPTIONS_HANDLE_IO_ERRORS flag must be set with
+ * RediModule_SetModuleOptions first. */
+int RM_IsIOError(RedisModuleIO *io) {
+ return io->error;
+}
+
/* Save an unsigned 64 bit value into the RDB file. This function should only
* be called in the context of the rdb_save method of modules implementing new
* data types. */
@@ -3186,6 +3232,7 @@ saveerr:
* be called in the context of the rdb_load method of modules implementing
* new data types. */
uint64_t RM_LoadUnsigned(RedisModuleIO *io) {
+ if (io->error) return 0;
if (io->ver == 2) {
uint64_t opcode = rdbLoadLen(io->rio,NULL);
if (opcode != RDB_MODULE_OPCODE_UINT) goto loaderr;
@@ -3197,7 +3244,7 @@ uint64_t RM_LoadUnsigned(RedisModuleIO *io) {
loaderr:
moduleRDBLoadError(io);
- return 0; /* Never reached. */
+ return 0;
}
/* Like RedisModule_SaveUnsigned() but for signed 64 bit values. */
@@ -3256,6 +3303,7 @@ saveerr:
/* Implements RM_LoadString() and RM_LoadStringBuffer() */
void *moduleLoadString(RedisModuleIO *io, int plain, size_t *lenptr) {
+ if (io->error) return NULL;
if (io->ver == 2) {
uint64_t opcode = rdbLoadLen(io->rio,NULL);
if (opcode != RDB_MODULE_OPCODE_STRING) goto loaderr;
@@ -3267,7 +3315,7 @@ void *moduleLoadString(RedisModuleIO *io, int plain, size_t *lenptr) {
loaderr:
moduleRDBLoadError(io);
- return NULL; /* Never reached. */
+ return NULL;
}
/* In the context of the rdb_load method of a module data type, loads a string
@@ -3316,6 +3364,7 @@ saveerr:
/* In the context of the rdb_save method of a module data type, loads back the
* double value saved by RedisModule_SaveDouble(). */
double RM_LoadDouble(RedisModuleIO *io) {
+ if (io->error) return 0;
if (io->ver == 2) {
uint64_t opcode = rdbLoadLen(io->rio,NULL);
if (opcode != RDB_MODULE_OPCODE_DOUBLE) goto loaderr;
@@ -3327,7 +3376,7 @@ double RM_LoadDouble(RedisModuleIO *io) {
loaderr:
moduleRDBLoadError(io);
- return 0; /* Never reached. */
+ return 0;
}
/* In the context of the rdb_save method of a module data type, saves a float
@@ -3352,6 +3401,7 @@ saveerr:
/* In the context of the rdb_save method of a module data type, loads back the
* float value saved by RedisModule_SaveFloat(). */
float RM_LoadFloat(RedisModuleIO *io) {
+ if (io->error) return 0;
if (io->ver == 2) {
uint64_t opcode = rdbLoadLen(io->rio,NULL);
if (opcode != RDB_MODULE_OPCODE_FLOAT) goto loaderr;
@@ -3363,7 +3413,7 @@ float RM_LoadFloat(RedisModuleIO *io) {
loaderr:
moduleRDBLoadError(io);
- return 0; /* Never reached. */
+ return 0;
}
/* Iterate over modules, and trigger rdb aux saving for the ones modules types
@@ -5461,6 +5511,8 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(ModuleTypeSetValue);
REGISTER_API(ModuleTypeGetType);
REGISTER_API(ModuleTypeGetValue);
+ REGISTER_API(IsIOError);
+ REGISTER_API(SetModuleOptions);
REGISTER_API(SaveUnsigned);
REGISTER_API(LoadUnsigned);
REGISTER_API(SaveSigned);
diff --git a/src/redismodule.h b/src/redismodule.h
index 45fcc2091..ad1c6e652 100644
--- a/src/redismodule.h
+++ b/src/redismodule.h
@@ -144,6 +144,9 @@ typedef uint64_t RedisModuleTimerID;
/* Do filter RedisModule_Call() commands initiated by module itself. */
#define REDISMODULE_CMDFILTER_NOSELF (1<<0)
+/* Declare that the module can handle errors with RedisModule_SetModuleOptions. */
+#define REDISMODULE_OPTIONS_HANDLE_IO_ERRORS (1<<0)
+
/* ------------------------- End of common defines ------------------------ */
#ifndef REDISMODULE_CORE
@@ -280,6 +283,8 @@ RedisModuleType *REDISMODULE_API_FUNC(RedisModule_CreateDataType)(RedisModuleCtx
int REDISMODULE_API_FUNC(RedisModule_ModuleTypeSetValue)(RedisModuleKey *key, RedisModuleType *mt, void *value);
RedisModuleType *REDISMODULE_API_FUNC(RedisModule_ModuleTypeGetType)(RedisModuleKey *key);
void *REDISMODULE_API_FUNC(RedisModule_ModuleTypeGetValue)(RedisModuleKey *key);
+int REDISMODULE_API_FUNC(RedisModule_IsIOError)(RedisModuleIO *io);
+void REDISMODULE_API_FUNC(RedisModule_SetModuleOptions)(RedisModuleCtx *ctx, int options);
void REDISMODULE_API_FUNC(RedisModule_SaveUnsigned)(RedisModuleIO *io, uint64_t value);
uint64_t REDISMODULE_API_FUNC(RedisModule_LoadUnsigned)(RedisModuleIO *io);
void REDISMODULE_API_FUNC(RedisModule_SaveSigned)(RedisModuleIO *io, int64_t value);
@@ -454,6 +459,8 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(ModuleTypeSetValue);
REDISMODULE_GET_API(ModuleTypeGetType);
REDISMODULE_GET_API(ModuleTypeGetValue);
+ REDISMODULE_GET_API(IsIOError);
+ REDISMODULE_GET_API(SetModuleOptions);
REDISMODULE_GET_API(SaveUnsigned);
REDISMODULE_GET_API(LoadUnsigned);
REDISMODULE_GET_API(SaveSigned);
diff --git a/src/replication.c b/src/replication.c
index d6646c9ef..bb4287b62 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -1139,8 +1139,15 @@ void restartAOFAfterSYNC() {
static int useDisklessLoad() {
/* compute boolean decision to use diskless load */
- return server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB ||
+ int enabled = server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB ||
(server.repl_diskless_load == REPL_DISKLESS_LOAD_WHEN_DB_EMPTY && dbTotalServerKeyCount()==0);
+ /* Check all modules handle read errors, otherwise it's not safe to use diskless load. */
+ if (enabled && !moduleAllDatatypesHandleErrors()) {
+ serverLog(LL_WARNING,
+ "Skipping diskless-load because there are modules that don't handle read errors.");
+ enabled = 0;
+ }
+ return enabled;
}
/* Helper function for readSyncBulkPayload() to make backups of the current
diff --git a/src/server.h b/src/server.h
index 31d253066..d0d0ece1c 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1541,6 +1541,7 @@ void moduleReleaseGIL(void);
void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid);
void moduleCallCommandFilters(client *c);
ssize_t rdbSaveModulesAux(rio *rdb, int when);
+int moduleAllDatatypesHandleErrors();
/* Utils */
long long ustime(void);
diff --git a/tests/modules/testrdb.c b/tests/modules/testrdb.c
index 415497a2f..d73c8bfd3 100644
--- a/tests/modules/testrdb.c
+++ b/tests/modules/testrdb.c
@@ -15,6 +15,8 @@ RedisModuleString *after_str = NULL;
void *testrdb_type_load(RedisModuleIO *rdb, int encver) {
int count = RedisModule_LoadSigned(rdb);
+ if (RedisModule_IsIOError(rdb))
+ return NULL;
assert(count==1);
assert(encver==1);
RedisModuleString *str = RedisModule_LoadString(rdb);
@@ -57,6 +59,8 @@ int testrdb_aux_load(RedisModuleIO *rdb, int encver, int when) {
RedisModule_FreeString(ctx, before_str);
before_str = NULL;
int count = RedisModule_LoadSigned(rdb);
+ if (RedisModule_IsIOError(rdb))
+ return REDISMODULE_ERR;
if (count)
before_str = RedisModule_LoadString(rdb);
} else {
@@ -64,14 +68,19 @@ int testrdb_aux_load(RedisModuleIO *rdb, int encver, int when) {
RedisModule_FreeString(ctx, after_str);
after_str = NULL;
int count = RedisModule_LoadSigned(rdb);
+ if (RedisModule_IsIOError(rdb))
+ return REDISMODULE_ERR;
if (count)
after_str = RedisModule_LoadString(rdb);
}
+ if (RedisModule_IsIOError(rdb))
+ return REDISMODULE_ERR;
return REDISMODULE_OK;
}
void testrdb_type_free(void *value) {
- RedisModule_FreeString(NULL, (RedisModuleString*)value);
+ if (value)
+ RedisModule_FreeString(NULL, (RedisModuleString*)value);
}
int testrdb_set_before(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
@@ -171,6 +180,8 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
if (RedisModule_Init(ctx,"testrdb",1,REDISMODULE_APIVER_1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
+ RedisModule_SetModuleOptions(ctx, REDISMODULE_OPTIONS_HANDLE_IO_ERRORS);
+
if (argc > 0)
RedisModule_StringToLongLong(argv[0], &conf_aux_count);
diff --git a/tests/unit/moduleapi/testrdb.tcl b/tests/unit/moduleapi/testrdb.tcl
index 22201a08e..c72570002 100644
--- a/tests/unit/moduleapi/testrdb.tcl
+++ b/tests/unit/moduleapi/testrdb.tcl
@@ -56,7 +56,67 @@ tags "modules" {
}
}
+ tags {repl} {
+ test {diskless loading short read with module} {
+ start_server [list overrides [list loadmodule "$testmodule"]] {
+ set replica [srv 0 client]
+ set replica_host [srv 0 host]
+ set replica_port [srv 0 port]
+ start_server [list overrides [list loadmodule "$testmodule"]] {
+ set master [srv 0 client]
+ set master_host [srv 0 host]
+ set master_port [srv 0 port]
- # TODO: test short read handling
+ # Set master and replica to use diskless replication
+ $master config set repl-diskless-sync yes
+ $master config set rdbcompression no
+ $replica config set repl-diskless-load swapdb
+ for {set k 0} {$k < 30} {incr k} {
+ r testrdb.set.key key$k [string repeat A [expr {int(rand()*1000000)}]]
+ }
+ # Start the replication process...
+ $master config set repl-diskless-sync-delay 0
+ $replica replicaof $master_host $master_port
+
+ # kill the replication at various points
+ set attempts 3
+ if {$::accurate} { set attempts 10 }
+ for {set i 0} {$i < $attempts} {incr i} {
+ # wait for the replica to start reading the rdb
+ # using the log file since the replica only responds to INFO once in 2mb
+ wait_for_log_message -1 "*Loading DB in memory*" 5 2000 1
+
+ # add some additional random sleep so that we kill the master on a different place each time
+ after [expr {int(rand()*100)}]
+
+ # kill the replica connection on the master
+ set killed [$master client kill type replica]
+
+ if {[catch {
+ set res [wait_for_log_message -1 "*Internal error in RDB*" 5 100 10]
+ if {$::verbose} {
+ puts $res
+ }
+ }]} {
+ puts "failed triggering short read"
+ # force the replica to try another full sync
+ $master client kill type replica
+ $master set asdf asdf
+ # the side effect of resizing the backlog is that it is flushed (16k is the min size)
+ $master config set repl-backlog-size [expr {16384 + $i}]
+ }
+ # wait for loading to stop (fail)
+ wait_for_condition 100 10 {
+ [s -1 loading] eq 0
+ } else {
+ fail "Replica didn't disconnect"
+ }
+ }
+ # enable fast shutdown
+ $master config set rdb-key-save-delay 0
+ }
+ }
+ }
+ }
}