summaryrefslogtreecommitdiff
path: root/src/rdb.c
diff options
context:
space:
mode:
authorEduardo Semprebon <eduardobr@gmail.com>2021-11-04 09:46:50 +0100
committerGitHub <noreply@github.com>2021-11-04 10:46:50 +0200
commit91d0c758e5453644bb4e784a2af86033ccb971fc (patch)
treeae3b3651414ba4cfa9221411d5dfa1190d2cf06e /src/rdb.c
parent06dd202a051a6788ebb2e13cd4603ffbf7e6dbd0 (diff)
downloadredis-91d0c758e5453644bb4e784a2af86033ccb971fc.tar.gz
Replica keep serving data during repl-diskless-load=swapdb for better availability (#9323)
For diskless replication in swapdb mode, considering we already spend replica memory having a backup of current db to restore in case of failure, we can have the following benefits by instead swapping database only in case we succeeded in transferring db from master: - Avoid `LOADING` response during failed and successful synchronization for cases where the replica is already up and running with data. - Faster total time of diskless replication, because now we're moving from Transfer + Flush + Load time to Transfer + Load only. Flushing the tempDb is done asynchronously after swapping. - This could be implemented also for disk replication with similar benefits if consumers are willing to spend the extra memory usage. General notes: - The concept of `backupDb` becomes `tempDb` for clarity. - Async loading mode will only kick in if the replica is syncing from a master that has the same repl-id the one it had before. i.e. the data it's getting belongs to a different time of the same timeline. - New property in INFO: `async_loading` to differentiate from the blocking loading - Slot to Key mapping is now a field of `redisDb` as it's more natural to access it from both server.db and the tempDb that is passed around. - Because this is affecting replicas only, we assume that if they are not readonly and write commands during replication, they are lost after SYNC same way as before, but we're still denying CONFIG SET here anyways to avoid complications. Considerations for review: - We have many cases where server.loading flag is used and even though I tried my best, there may be cases where async_loading should be checked as well and cases where it shouldn't (would require very good understanding of whole code) - Several places that had different behavior depending on the loading flag where actually meant to just handle commands coming from the AOF client differently than ones coming from real clients, changed to check CLIENT_ID_AOF instead. **Additional for Release Notes** - Bugfix - server.dirty was not incremented for any kind of diskless replication, as effect it wouldn't contribute on triggering next database SAVE - New flag for RM_GetContextFlags module API: REDISMODULE_CTX_FLAGS_ASYNC_LOADING - Deprecated RedisModuleEvent_ReplBackup. Starting from Redis 7.0, we don't fire this event. Instead, we have the new RedisModuleEvent_ReplAsyncLoad holding 3 sub-events: STARTED, ABORTED and COMPLETED. - New module flag REDISMODULE_OPTIONS_HANDLE_REPL_ASYNC_LOAD for RedisModule_SetModuleOptions to allow modules to declare they support the diskless replication with async loading (when absent, we fall back to disk-based loading). Co-authored-by: Eduardo Semprebon <edus@saxobank.com> Co-authored-by: Oran Agra <oran@redislabs.com>
Diffstat (limited to 'src/rdb.c')
-rw-r--r--src/rdb.c28
1 files changed, 17 insertions, 11 deletions
diff --git a/src/rdb.c b/src/rdb.c
index a56a3133c..9ccf3d0f5 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -48,6 +48,10 @@
/* This macro is called when RDB read failed (possibly a short read) */
#define rdbReportReadError(...) rdbReportError(0, __LINE__,__VA_ARGS__)
+/* This macro tells if we are in the context of a RESTORE command, and not loading an RDB or AOF. */
+#define isRestoreContext() \
+ (server.current_client == NULL || server.current_client->id == CLIENT_ID_AOF) ? 0 : 1
+
char* rdbFileBeingLoaded = NULL; /* used for rdb checking on read error */
extern int rdbCheckMode;
void rdbCheckError(const char *fmt, ...);
@@ -68,7 +72,7 @@ void rdbReportError(int corruption_error, int linenum, char *reason, ...) {
vsnprintf(msg+len,sizeof(msg)-len,reason,ap);
va_end(ap);
- if (!server.loading) {
+ if (isRestoreContext()) {
/* If we're in the context of a RESTORE command, just propagate the error. */
/* log in VERBOSE, and return (don't exit). */
serverLog(LL_VERBOSE, "%s", msg);
@@ -381,7 +385,7 @@ void *rdbLoadLzfStringObject(rio *rdb, int flags, size_t *lenptr) {
if ((clen = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;
if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;
if ((c = ztrymalloc(clen)) == NULL) {
- serverLog(server.loading? LL_WARNING: LL_VERBOSE, "rdbLoadLzfStringObject failed allocating %llu bytes", (unsigned long long)clen);
+ serverLog(isRestoreContext()? LL_VERBOSE: LL_WARNING, "rdbLoadLzfStringObject failed allocating %llu bytes", (unsigned long long)clen);
goto err;
}
@@ -392,7 +396,7 @@ void *rdbLoadLzfStringObject(rio *rdb, int flags, size_t *lenptr) {
val = sdstrynewlen(SDS_NOINIT,len);
}
if (!val) {
- serverLog(server.loading? LL_WARNING: LL_VERBOSE, "rdbLoadLzfStringObject failed allocating %llu bytes", (unsigned long long)len);
+ serverLog(isRestoreContext()? LL_VERBOSE: LL_WARNING, "rdbLoadLzfStringObject failed allocating %llu bytes", (unsigned long long)len);
goto err;
}
@@ -525,7 +529,7 @@ void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr) {
if (plain || sds) {
void *buf = plain ? ztrymalloc(len) : sdstrynewlen(SDS_NOINIT,len);
if (!buf) {
- serverLog(server.loading? LL_WARNING: LL_VERBOSE, "rdbGenericLoadStringObject failed allocating %llu bytes", len);
+ serverLog(isRestoreContext()? LL_VERBOSE: LL_WARNING, "rdbGenericLoadStringObject failed allocating %llu bytes", len);
return NULL;
}
if (lenptr) *lenptr = len;
@@ -541,7 +545,7 @@ void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr) {
robj *o = encode ? tryCreateStringObject(SDS_NOINIT,len) :
tryCreateRawStringObject(SDS_NOINIT,len);
if (!o) {
- serverLog(server.loading? LL_WARNING: LL_VERBOSE, "rdbGenericLoadStringObject failed allocating %llu bytes", len);
+ serverLog(isRestoreContext()? LL_VERBOSE: LL_WARNING, "rdbGenericLoadStringObject failed allocating %llu bytes", len);
return NULL;
}
if (len && rioRead(rdb,o->ptr,len) == 0) {
@@ -2517,9 +2521,10 @@ emptykey:
/* Mark that we are loading in the global state and setup the fields
* needed to provide loading stats. */
-void startLoading(size_t size, int rdbflags) {
+void startLoading(size_t size, int rdbflags, int async) {
/* Load the DB */
server.loading = 1;
+ if (async == 1) server.async_loading = 1;
server.loading_start_time = time(NULL);
server.loading_loaded_bytes = 0;
server.loading_total_bytes = size;
@@ -2547,7 +2552,7 @@ void startLoadingFile(FILE *fp, char* filename, int rdbflags) {
if (fstat(fileno(fp), &sb) == -1)
sb.st_size = 0;
rdbFileBeingLoaded = filename;
- startLoading(sb.st_size, rdbflags);
+ startLoading(sb.st_size, rdbflags, 0);
}
/* Refresh the loading progress info */
@@ -2560,6 +2565,7 @@ void loadingProgress(off_t pos) {
/* Loading finished */
void stopLoading(int success) {
server.loading = 0;
+ server.async_loading = 0;
blockingOperationEnds();
rdbFileBeingLoaded = NULL;
@@ -2610,10 +2616,10 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
* otherwise C_ERR is returned and 'errno' is set accordingly. */
-int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
+int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi, redisDb *dbarray) {
uint64_t dbid = 0;
int type, rdbver;
- redisDb *db = server.db+0;
+ redisDb *db = dbarray+0;
char buf[1024];
int error;
long long empty_keys_skipped = 0;
@@ -2685,7 +2691,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
"databases. Exiting\n", server.dbnum);
exit(1);
}
- db = server.db+dbid;
+ db = dbarray+dbid;
continue; /* Read next opcode. */
} else if (type == RDB_OPCODE_RESIZEDB) {
/* RESIZEDB: Hint about the size of the keys in the currently
@@ -2962,7 +2968,7 @@ int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags) {
if ((fp = fopen(filename,"r")) == NULL) return C_ERR;
startLoadingFile(fp, filename,rdbflags);
rioInitWithFile(&rdb,fp);
- retval = rdbLoadRio(&rdb,rdbflags,rsi);
+ retval = rdbLoadRio(&rdb,rdbflags,rsi,server.db);
fclose(fp);
stopLoading(retval==C_OK);
return retval;