summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--redis.conf10
-rw-r--r--src/aof.c2
-rw-r--r--src/cluster.c56
-rw-r--r--src/cluster.h26
-rw-r--r--src/db.c145
-rw-r--r--src/defrag.c2
-rw-r--r--src/module.c51
-rw-r--r--src/multi.c8
-rw-r--r--src/rdb.c28
-rw-r--r--src/rdb.h2
-rw-r--r--src/redismodule.h27
-rw-r--r--src/replication.c159
-rw-r--r--src/scripting.c14
-rw-r--r--src/server.c8
-rw-r--r--src/server.h19
-rw-r--r--tests/cluster/tests/17-diskless-load-swapdb.tcl10
-rw-r--r--tests/cluster/tests/includes/init-tests.tcl1
-rw-r--r--tests/integration/replication.tcl238
-rw-r--r--tests/modules/testrdb.c136
-rw-r--r--tests/unit/moduleapi/testrdb.tcl115
20 files changed, 754 insertions, 303 deletions
diff --git a/redis.conf b/redis.conf
index bcb5fa022..04e8bb117 100644
--- a/redis.conf
+++ b/redis.conf
@@ -605,9 +605,13 @@ repl-diskless-sync-delay 5
#
# "disabled" - Don't use diskless load (store the rdb file to the disk first)
# "on-empty-db" - Use diskless load only when it is completely safe.
-# "swapdb" - Keep a copy of the current db contents in RAM while parsing
-# the data directly from the socket. note that this requires
-# sufficient memory, if you don't have it, you risk an OOM kill.
+# "swapdb" - Keep current db contents in RAM while parsing the data directly
+# from the socket. Replicas in this mode can keep serving current
+# data set while replication is in progress, except for cases where
+# they can't recognize master as having a data set from same
+# replication history.
+# Note that this requires sufficient memory, if you don't have it,
+# you risk an OOM kill.
repl-diskless-load disabled
# Replicas send PINGs to server in a predefined interval. It's possible to
diff --git a/src/aof.c b/src/aof.c
index 4c334e228..b345c2175 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -748,7 +748,7 @@ int loadAppendOnlyFile(char *filename) {
serverLog(LL_NOTICE,"Reading RDB preamble from AOF file...");
if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
rioInitWithFile(&rdb,fp);
- if (rdbLoadRio(&rdb,RDBFLAGS_AOF_PREAMBLE,NULL) != C_OK) {
+ if (rdbLoadRio(&rdb,RDBFLAGS_AOF_PREAMBLE,NULL,server.db) != C_OK) {
serverLog(LL_WARNING,"Error reading the RDB preamble of the AOF file, AOF loading aborted");
goto readerr;
} else {
diff --git a/src/cluster.c b/src/cluster.c
index 2cd98cd02..35f3a76be 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -593,8 +593,8 @@ void clusterInit(void) {
serverPanic("Unrecoverable error creating Redis Cluster socket accept handler.");
}
- /* Reset data for the Slot to key API. */
- slotToKeyFlush();
+ /* Initialize data for the Slot to key API. */
+ slotToKeyInit(server.db);
/* Set myself->port/cport/pport to my listening ports, we'll just need to
* discover the IP address via MEET messages. */
@@ -4954,7 +4954,7 @@ NULL
unsigned int keys_in_slot = countKeysInSlot(slot);
unsigned int numkeys = maxkeys > keys_in_slot ? keys_in_slot : maxkeys;
addReplyArrayLen(c,numkeys);
- dictEntry *de = server.cluster->slots_to_keys[slot].head;
+ dictEntry *de = (*server.db->slots_to_keys).by_slot[slot].head;
for (unsigned int j = 0; j < numkeys; j++) {
serverAssert(de != NULL);
sds sdskey = dictGetKey(de);
@@ -6201,26 +6201,28 @@ int clusterRedirectBlockedClientIfNeeded(client *c) {
* while rehashing the cluster and in other conditions when we need to
* understand if we have keys for a given hash slot. */
-void slotToKeyAddEntry(dictEntry *entry) {
+void slotToKeyAddEntry(dictEntry *entry, redisDb *db) {
sds key = entry->key;
unsigned int hashslot = keyHashSlot(key, sdslen(key));
- server.cluster->slots_to_keys[hashslot].count++;
+ slotToKeys *slot_to_keys = &(*db->slots_to_keys).by_slot[hashslot];
+ slot_to_keys->count++;
/* Insert entry before the first element in the list. */
- dictEntry *first = server.cluster->slots_to_keys[hashslot].head;
+ dictEntry *first = slot_to_keys->head;
dictEntryNextInSlot(entry) = first;
if (first != NULL) {
serverAssert(dictEntryPrevInSlot(first) == NULL);
dictEntryPrevInSlot(first) = entry;
}
serverAssert(dictEntryPrevInSlot(entry) == NULL);
- server.cluster->slots_to_keys[hashslot].head = entry;
+ slot_to_keys->head = entry;
}
-void slotToKeyDelEntry(dictEntry *entry) {
+void slotToKeyDelEntry(dictEntry *entry, redisDb *db) {
sds key = entry->key;
unsigned int hashslot = keyHashSlot(key, sdslen(key));
- server.cluster->slots_to_keys[hashslot].count--;
+ slotToKeys *slot_to_keys = &(*db->slots_to_keys).by_slot[hashslot];
+ slot_to_keys->count--;
/* Connect previous and next entries to each other. */
dictEntry *next = dictEntryNextInSlot(entry);
@@ -6232,14 +6234,14 @@ void slotToKeyDelEntry(dictEntry *entry) {
dictEntryNextInSlot(prev) = next;
} else {
/* The removed entry was the first in the list. */
- serverAssert(server.cluster->slots_to_keys[hashslot].head == entry);
- server.cluster->slots_to_keys[hashslot].head = next;
+ serverAssert(slot_to_keys->head == entry);
+ slot_to_keys->head = next;
}
}
/* Updates neighbour entries when an entry has been replaced (e.g. reallocated
* during active defrag). */
-void slotToKeyReplaceEntry(dictEntry *entry) {
+void slotToKeyReplaceEntry(dictEntry *entry, redisDb *db) {
dictEntry *next = dictEntryNextInSlot(entry);
dictEntry *prev = dictEntryPrevInSlot(entry);
if (next != NULL) {
@@ -6251,33 +6253,33 @@ void slotToKeyReplaceEntry(dictEntry *entry) {
/* The replaced entry was the first in the list. */
sds key = entry->key;
unsigned int hashslot = keyHashSlot(key, sdslen(key));
- server.cluster->slots_to_keys[hashslot].head = entry;
+ slotToKeys *slot_to_keys = &(*db->slots_to_keys).by_slot[hashslot];
+ slot_to_keys->head = entry;
}
}
-/* Copies the slots-keys map to the specified backup structure. */
-void slotToKeyCopyToBackup(clusterSlotsToKeysData *backup) {
- memcpy(backup, server.cluster->slots_to_keys,
- sizeof(server.cluster->slots_to_keys));
+/* Initialize slots-keys map of given db. */
+void slotToKeyInit(redisDb *db) {
+ db->slots_to_keys = zcalloc(sizeof(clusterSlotToKeyMapping));
}
-/* Overwrites the slots-keys map by copying the provided backup structure. */
-void slotToKeyRestoreBackup(clusterSlotsToKeysData *backup) {
- memcpy(server.cluster->slots_to_keys, backup,
- sizeof(server.cluster->slots_to_keys));
+/* Empty slots-keys map of given db. */
+void slotToKeyFlush(redisDb *db) {
+ memset(db->slots_to_keys, 0,
+ sizeof(clusterSlotToKeyMapping));
}
-/* Empty the slots-keys map of Redis Cluster. */
-void slotToKeyFlush(void) {
- memset(&server.cluster->slots_to_keys, 0,
- sizeof(server.cluster->slots_to_keys));
+/* Free slots-keys map of given db. */
+void slotToKeyDestroy(redisDb *db) {
+ zfree(db->slots_to_keys);
+ db->slots_to_keys = NULL;
}
/* Remove all the keys in the specified hash slot.
* The number of removed items is returned. */
unsigned int delKeysInSlot(unsigned int hashslot) {
unsigned int j = 0;
- dictEntry *de = server.cluster->slots_to_keys[hashslot].head;
+ dictEntry *de = (*server.db->slots_to_keys).by_slot[hashslot].head;
while (de != NULL) {
sds sdskey = dictGetKey(de);
de = dictEntryNextInSlot(de);
@@ -6290,5 +6292,5 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
}
unsigned int countKeysInSlot(unsigned int hashslot) {
- return server.cluster->slots_to_keys[hashslot].count;
+ return (*server.db->slots_to_keys).by_slot[hashslot].count;
}
diff --git a/src/cluster.h b/src/cluster.h
index f97814775..e6cdc1897 100644
--- a/src/cluster.h
+++ b/src/cluster.h
@@ -141,14 +141,17 @@ typedef struct clusterNode {
list *fail_reports; /* List of nodes signaling this as failing */
} clusterNode;
-/* State for the Slot to Key API, for a single slot. The keys in the same slot
- * are linked together using dictEntry metadata. See also "Slot to Key API" in
- * cluster.c. */
-struct clusterSlotToKeys {
+/* Slot to keys for a single slot. The keys in the same slot are linked together
+ * using dictEntry metadata. */
+typedef struct slotToKeys {
uint64_t count; /* Number of keys in the slot. */
dictEntry *head; /* The first key-value entry in the slot. */
+} slotToKeys;
+
+/* Slot to keys mapping for all slots, opaque outside this file. */
+struct clusterSlotToKeyMapping {
+ slotToKeys by_slot[CLUSTER_SLOTS];
};
-typedef struct clusterSlotToKeys clusterSlotsToKeysData[CLUSTER_SLOTS];
/* Dict entry metadata for cluster mode, used for the Slot to Key API to form a
* linked list of the entries belonging to the same slot. */
@@ -168,7 +171,6 @@ typedef struct clusterState {
clusterNode *migrating_slots_to[CLUSTER_SLOTS];
clusterNode *importing_slots_from[CLUSTER_SLOTS];
clusterNode *slots[CLUSTER_SLOTS];
- clusterSlotsToKeysData slots_to_keys;
/* The following fields are used to take the slave state on elections. */
mstime_t failover_auth_time; /* Time of previous or next election. */
int failover_auth_count; /* Number of votes received so far. */
@@ -315,11 +317,11 @@ unsigned long getClusterConnectionsCount(void);
int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uint8_t type, unsigned char *payload, uint32_t len);
void clusterPropagatePublish(robj *channel, robj *message);
unsigned int keyHashSlot(char *key, int keylen);
-void slotToKeyAddEntry(dictEntry *entry);
-void slotToKeyDelEntry(dictEntry *entry);
-void slotToKeyReplaceEntry(dictEntry *entry);
-void slotToKeyCopyToBackup(clusterSlotsToKeysData *backup);
-void slotToKeyRestoreBackup(clusterSlotsToKeysData *backup);
-void slotToKeyFlush(void);
+void slotToKeyAddEntry(dictEntry *entry, redisDb *db);
+void slotToKeyDelEntry(dictEntry *entry, redisDb *db);
+void slotToKeyReplaceEntry(dictEntry *entry, redisDb *db);
+void slotToKeyInit(redisDb *db);
+void slotToKeyFlush(redisDb *db);
+void slotToKeyDestroy(redisDb *db);
#endif /* __CLUSTER_H */
diff --git a/src/db.c b/src/db.c
index 8a90d3215..5d44a9ab9 100644
--- a/src/db.c
+++ b/src/db.c
@@ -35,12 +35,6 @@
#include <signal.h>
#include <ctype.h>
-/* Database backup. */
-struct dbBackup {
- redisDb *dbarray;
- clusterSlotsToKeysData slots_to_keys;
-};
-
/*-----------------------------------------------------------------------------
* C-level DB API
*----------------------------------------------------------------------------*/
@@ -187,7 +181,7 @@ void dbAdd(redisDb *db, robj *key, robj *val) {
serverAssertWithInfo(NULL, key, de != NULL);
dictSetVal(db->dict, de, val);
signalKeyAsReady(db, key, val->type);
- if (server.cluster_enabled) slotToKeyAddEntry(de);
+ if (server.cluster_enabled) slotToKeyAddEntry(de, db);
}
/* This is a special version of dbAdd() that is used only when loading
@@ -205,7 +199,7 @@ int dbAddRDBLoad(redisDb *db, sds key, robj *val) {
dictEntry *de = dictAddRaw(db->dict, key, NULL);
if (de == NULL) return 0;
dictSetVal(db->dict, de, val);
- if (server.cluster_enabled) slotToKeyAddEntry(de);
+ if (server.cluster_enabled) slotToKeyAddEntry(de, db);
return 1;
}
@@ -321,7 +315,7 @@ static int dbGenericDelete(redisDb *db, robj *key, int async) {
freeObjAsync(key, val, db->id);
dictSetVal(db->dict, de, NULL);
}
- if (server.cluster_enabled) slotToKeyDelEntry(de);
+ if (server.cluster_enabled) slotToKeyDelEntry(de, db);
dictFreeUnlinkedEntry(db->dict,de);
return 1;
} else {
@@ -385,7 +379,7 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) {
}
/* Remove all keys from the database(s) structure. The dbarray argument
- * may not be the server main DBs (could be a backup).
+ * may not be the server main DBs (could be a temporary DB).
*
* The dbnum can be -1 if all the DBs should be emptied, or the specified
* DB index if we want to empty only a single database.
@@ -459,7 +453,7 @@ long long emptyDb(int dbnum, int flags, void(callback)(dict*)) {
/* Flush slots to keys map if enable cluster, we can flush entire
* slots to keys map whatever dbnum because only support one DB
* in cluster mode. */
- if (server.cluster_enabled) slotToKeyFlush();
+ if (server.cluster_enabled) slotToKeyFlush(server.db);
if (dbnum == -1) flushSlaveKeysWithExpireList();
@@ -472,77 +466,40 @@ long long emptyDb(int dbnum, int flags, void(callback)(dict*)) {
return removed;
}
-/* Store a backup of the database for later use, and put an empty one
- * instead of it. */
-dbBackup *backupDb(void) {
- dbBackup *backup = zmalloc(sizeof(dbBackup));
-
- /* Backup main DBs. */
- backup->dbarray = zmalloc(sizeof(redisDb)*server.dbnum);
+/* Initialize temporary db on replica for use during diskless replication. */
+redisDb *initTempDb(void) {
+ redisDb *tempDb = zcalloc(sizeof(redisDb)*server.dbnum);
for (int i=0; i<server.dbnum; i++) {
- backup->dbarray[i] = server.db[i];
- server.db[i].dict = dictCreate(&dbDictType);
- server.db[i].expires = dictCreate(&dbExpiresDictType);
+ tempDb[i].dict = dictCreate(&dbDictType);
+ tempDb[i].expires = dictCreate(&dbExpiresDictType);
+ tempDb[i].slots_to_keys = NULL;
}
- /* Backup cluster slots to keys map if enable cluster. */
if (server.cluster_enabled) {
- slotToKeyCopyToBackup(&backup->slots_to_keys);
- slotToKeyFlush();
+ /* Prepare temp slot to key map to be written during async diskless replication. */
+ slotToKeyInit(tempDb);
}
- moduleFireServerEvent(REDISMODULE_EVENT_REPL_BACKUP,
- REDISMODULE_SUBEVENT_REPL_BACKUP_CREATE,
- NULL);
-
- return backup;
+ return tempDb;
}
-/* Discard a previously created backup, this can be slow (similar to FLUSHALL)
- * Arguments are similar to the ones of emptyDb, see EMPTYDB_ flags. */
-void discardDbBackup(dbBackup *backup, int flags, void(callback)(dict*)) {
- int async = (flags & EMPTYDB_ASYNC);
+/* Discard tempDb, this can be slow (similar to FLUSHALL), but it's always async. */
+void discardTempDb(redisDb *tempDb, void(callback)(dict*)) {
+ int async = 1;
- /* Release main DBs backup . */
- emptyDbStructure(backup->dbarray, -1, async, callback);
+ /* Release temp DBs. */
+ emptyDbStructure(tempDb, -1, async, callback);
for (int i=0; i<server.dbnum; i++) {
- dictRelease(backup->dbarray[i].dict);
- dictRelease(backup->dbarray[i].expires);
+ dictRelease(tempDb[i].dict);
+ dictRelease(tempDb[i].expires);
}
- /* Release backup. */
- zfree(backup->dbarray);
- zfree(backup);
-
- moduleFireServerEvent(REDISMODULE_EVENT_REPL_BACKUP,
- REDISMODULE_SUBEVENT_REPL_BACKUP_DISCARD,
- NULL);
-}
-
-/* Restore the previously created backup (discarding what currently resides
- * in the db).
- * This function should be called after the current contents of the database
- * was emptied with a previous call to emptyDb (possibly using the async mode). */
-void restoreDbBackup(dbBackup *backup) {
- /* Restore main DBs. */
- for (int i=0; i<server.dbnum; i++) {
- serverAssert(dictSize(server.db[i].dict) == 0);
- serverAssert(dictSize(server.db[i].expires) == 0);
- dictRelease(server.db[i].dict);
- dictRelease(server.db[i].expires);
- server.db[i] = backup->dbarray[i];
+ if (server.cluster_enabled) {
+ /* Release temp slot to key map. */
+ slotToKeyDestroy(tempDb);
}
- /* Restore slots to keys map backup if enable cluster. */
- if (server.cluster_enabled) slotToKeyRestoreBackup(&backup->slots_to_keys);
-
- /* Release backup. */
- zfree(backup->dbarray);
- zfree(backup);
-
- moduleFireServerEvent(REDISMODULE_EVENT_REPL_BACKUP,
- REDISMODULE_SUBEVENT_REPL_BACKUP_RESTORE,
- NULL);
+ zfree(tempDb);
}
int selectDb(client *c, int id) {
@@ -591,6 +548,10 @@ void signalFlushedDb(int dbid, int async) {
}
trackingInvalidateKeysOnFlush(async);
+
+ /* Changes in this method may take place in swapMainDbWithTempDb as well,
+ * where we execute similar calls, but with subtle differences as it's
+ * not simply flushing db. */
}
/*-----------------------------------------------------------------------------
@@ -1358,6 +1319,54 @@ int dbSwapDatabases(int id1, int id2) {
return C_OK;
}
+/* Logically, this discards (flushes) the old main database, and apply the newly loaded
+ * database (temp) as the main (active) database, the actual freeing of old database
+ * (which will now be placed in the temp one) is done later. */
+void swapMainDbWithTempDb(redisDb *tempDb) {
+ if (server.cluster_enabled) {
+ /* Swap slots_to_keys from tempdb just loaded with main db slots_to_keys. */
+ clusterSlotToKeyMapping *aux = server.db->slots_to_keys;
+ server.db->slots_to_keys = tempDb->slots_to_keys;
+ tempDb->slots_to_keys = aux;
+ }
+
+ for (int i=0; i<server.dbnum; i++) {
+ redisDb aux = server.db[i];
+ redisDb *activedb = &server.db[i], *newdb = &tempDb[i];
+
+ /* Swap hash tables. Note that we don't swap blocking_keys,
+ * ready_keys and watched_keys, since clients
+ * remain in the same DB they were. */
+ activedb->dict = newdb->dict;
+ activedb->expires = newdb->expires;
+ activedb->avg_ttl = newdb->avg_ttl;
+ activedb->expires_cursor = newdb->expires_cursor;
+
+ newdb->dict = aux.dict;
+ newdb->expires = aux.expires;
+ newdb->avg_ttl = aux.avg_ttl;
+ newdb->expires_cursor = aux.expires_cursor;
+
+ /* Now we need to handle clients blocked on lists: as an effect
+ * of swapping the two DBs, a client that was waiting for list
+ * X in a given DB, may now actually be unblocked if X happens
+ * to exist in the new version of the DB, after the swap.
+ *
+ * However normally we only do this check for efficiency reasons
+ * in dbAdd() when a list is created. So here we need to rescan
+ * the list of clients blocked on lists and signal lists as ready
+ * if needed.
+ *
+ * Also the swapdb should make transaction fail if there is any
+ * client watching keys. */
+ scanDatabaseForReadyLists(activedb);
+ touchAllWatchedKeysInDb(activedb, newdb);
+ }
+
+ trackingInvalidateKeysOnFlush(1);
+ flushSlaveKeysWithExpireList();
+}
+
/* SWAPDB db1 db2 */
void swapdbCommand(client *c) {
int id1, id2;
diff --git a/src/defrag.c b/src/defrag.c
index ad5e35bb5..734174c3a 100644
--- a/src/defrag.c
+++ b/src/defrag.c
@@ -903,7 +903,7 @@ void defragDictBucketCallback(dict *d, dictEntry **bucketref) {
*bucketref = newde;
if (server.cluster_enabled && d == server.db[0].dict) {
/* Cluster keyspace dict. Update slot-to-entries mapping. */
- slotToKeyReplaceEntry(newde);
+ slotToKeyReplaceEntry(newde, server.db);
}
}
bucketref = &(*bucketref)->next;
diff --git a/src/module.c b/src/module.c
index 7fe5a39a5..16f67671c 100644
--- a/src/module.c
+++ b/src/module.c
@@ -1344,6 +1344,10 @@ int RM_BlockedClientMeasureTimeEnd(RedisModuleBlockedClient *bc) {
*
* REDISMODULE_OPTION_NO_IMPLICIT_SIGNAL_MODIFIED:
* See RM_SignalModifiedKey().
+ *
+ * REDISMODULE_OPTIONS_HANDLE_REPL_ASYNC_LOAD:
+ * Setting this flag indicates module awareness of diskless async replication (repl-diskless-load=swapdb)
+ * and that redis could be serving reads during replication instead of blocking with LOADING status.
*/
void RM_SetModuleOptions(RedisModuleCtx *ctx, int options) {
ctx->module->options = options;
@@ -2714,7 +2718,9 @@ int RM_GetContextFlags(RedisModuleCtx *ctx) {
if (server.cluster_enabled)
flags |= REDISMODULE_CTX_FLAGS_CLUSTER;
- if (server.loading)
+ if (server.async_loading)
+ flags |= REDISMODULE_CTX_FLAGS_ASYNC_LOADING;
+ else if (server.loading)
flags |= REDISMODULE_CTX_FLAGS_LOADING;
/* Maxmemory and eviction policy */
@@ -5515,6 +5521,24 @@ int moduleAllDatatypesHandleErrors() {
return 1;
}
+/* Returns 0 if module did not declare REDISMODULE_OPTIONS_HANDLE_REPL_ASYNC_LOAD, in which case
+ * diskless async loading should be avoided because module doesn't know there can be traffic during
+ * database full resynchronization. */
+int moduleAllModulesHandleReplAsyncLoad() {
+ dictIterator *di = dictGetIterator(modules);
+ dictEntry *de;
+
+ while ((de = dictNext(di)) != NULL) {
+ struct RedisModule *module = dictGetVal(de);
+ if (!(module->options & REDISMODULE_OPTIONS_HANDLE_REPL_ASYNC_LOAD)) {
+ dictReleaseIterator(di);
+ return 0;
+ }
+ }
+ dictReleaseIterator(di);
+ return 1;
+}
+
/* Returns true if any previous IO API failed.
* for `Load*` APIs the REDISMODULE_OPTIONS_HANDLE_IO_ERRORS flag must be set with
* RedisModule_SetModuleOptions first. */
@@ -9165,8 +9189,12 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* int32_t dbnum_second; // Swap Db second dbnum
*
* * RedisModuleEvent_ReplBackup
+ *
+ * WARNING: Replication Backup events are deprecated since Redis 7.0 and are never fired.
+ * See RedisModuleEvent_ReplAsyncLoad for understanding how Async Replication Loading events
+ * are now triggered when repl-diskless-load is set to swapdb.
*
- * Called when diskless-repl-load config is set to swapdb,
+ * Called when repl-diskless-load config is set to swapdb,
* And redis needs to backup the the current database for the
* possibility to be restored later. A module with global data and
* maybe with aux_load and aux_save callbacks may need to use this
@@ -9176,6 +9204,19 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* * `REDISMODULE_SUBEVENT_REPL_BACKUP_CREATE`
* * `REDISMODULE_SUBEVENT_REPL_BACKUP_RESTORE`
* * `REDISMODULE_SUBEVENT_REPL_BACKUP_DISCARD`
+ *
+ * * RedisModuleEvent_ReplAsyncLoad
+ *
+ * Called when repl-diskless-load config is set to swapdb and a replication with a master of same
+ * data set history (matching replication ID) occurs.
+ * In which case redis serves current data set while loading new database in memory from socket.
+ * Modules must have declared they support this mechanism in order to activate it, through
+ * REDISMODULE_OPTIONS_HANDLE_REPL_ASYNC_LOAD flag.
+ * The following sub events are available:
+ *
+ * * `REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_STARTED`
+ * * `REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_ABORTED`
+ * * `REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_COMPLETED`
*
* * RedisModuleEvent_ForkChild
*
@@ -9255,8 +9296,8 @@ int RM_IsSubEventSupported(RedisModuleEvent event, int64_t subevent) {
return subevent < _REDISMODULE_SUBEVENT_LOADING_PROGRESS_NEXT;
case REDISMODULE_EVENT_SWAPDB:
return subevent < _REDISMODULE_SUBEVENT_SWAPDB_NEXT;
- case REDISMODULE_EVENT_REPL_BACKUP:
- return subevent < _REDISMODULE_SUBEVENT_REPL_BACKUP_NEXT;
+ case REDISMODULE_EVENT_REPL_ASYNC_LOAD:
+ return subevent < _REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_NEXT;
case REDISMODULE_EVENT_FORK_CHILD:
return subevent < _REDISMODULE_SUBEVENT_FORK_CHILD_NEXT;
default:
@@ -9794,6 +9835,8 @@ sds genModulesInfoStringRenderModuleOptions(struct RedisModule *module) {
sds output = sdsnew("[");
if (module->options & REDISMODULE_OPTIONS_HANDLE_IO_ERRORS)
output = sdscat(output,"handle-io-errors|");
+ if (module->options & REDISMODULE_OPTIONS_HANDLE_REPL_ASYNC_LOAD)
+ output = sdscat(output,"handle-repl-async-load|");
output = sdstrim(output,"|");
output = sdscat(output,"]");
return output;
diff --git a/src/multi.c b/src/multi.c
index e1a1df93f..1b708e859 100644
--- a/src/multi.c
+++ b/src/multi.c
@@ -244,7 +244,11 @@ void execCommand(client *c) {
"This command is no longer allowed for the "
"following reason: %s", reason);
} else {
- call(c,server.loading ? CMD_CALL_NONE : CMD_CALL_FULL);
+ if (c->id == CLIENT_ID_AOF)
+ call(c,CMD_CALL_NONE);
+ else
+ call(c,CMD_CALL_FULL);
+
serverAssert((c->flags & CLIENT_BLOCKED) == 0);
}
@@ -398,7 +402,7 @@ void touchWatchedKey(redisDb *db, robj *key) {
/* Set CLIENT_DIRTY_CAS to all clients of DB when DB is dirty.
* It may happen in the following situations:
- * FLUSHDB, FLUSHALL, SWAPDB
+ * FLUSHDB, FLUSHALL, SWAPDB, end of successful diskless replication.
*
* replaced_with: for SWAPDB, the WATCH should be invalidated if
* the key exists in either of them, and skipped only if it
diff --git a/src/rdb.c b/src/rdb.c
index a56a3133c..9ccf3d0f5 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -48,6 +48,10 @@
/* This macro is called when RDB read failed (possibly a short read) */
#define rdbReportReadError(...) rdbReportError(0, __LINE__,__VA_ARGS__)
+/* This macro tells if we are in the context of a RESTORE command, and not loading an RDB or AOF. */
+#define isRestoreContext() \
+ (server.current_client == NULL || server.current_client->id == CLIENT_ID_AOF) ? 0 : 1
+
char* rdbFileBeingLoaded = NULL; /* used for rdb checking on read error */
extern int rdbCheckMode;
void rdbCheckError(const char *fmt, ...);
@@ -68,7 +72,7 @@ void rdbReportError(int corruption_error, int linenum, char *reason, ...) {
vsnprintf(msg+len,sizeof(msg)-len,reason,ap);
va_end(ap);
- if (!server.loading) {
+ if (isRestoreContext()) {
/* If we're in the context of a RESTORE command, just propagate the error. */
/* log in VERBOSE, and return (don't exit). */
serverLog(LL_VERBOSE, "%s", msg);
@@ -381,7 +385,7 @@ void *rdbLoadLzfStringObject(rio *rdb, int flags, size_t *lenptr) {
if ((clen = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;
if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;
if ((c = ztrymalloc(clen)) == NULL) {
- serverLog(server.loading? LL_WARNING: LL_VERBOSE, "rdbLoadLzfStringObject failed allocating %llu bytes", (unsigned long long)clen);
+ serverLog(isRestoreContext()? LL_VERBOSE: LL_WARNING, "rdbLoadLzfStringObject failed allocating %llu bytes", (unsigned long long)clen);
goto err;
}
@@ -392,7 +396,7 @@ void *rdbLoadLzfStringObject(rio *rdb, int flags, size_t *lenptr) {
val = sdstrynewlen(SDS_NOINIT,len);
}
if (!val) {
- serverLog(server.loading? LL_WARNING: LL_VERBOSE, "rdbLoadLzfStringObject failed allocating %llu bytes", (unsigned long long)len);
+ serverLog(isRestoreContext()? LL_VERBOSE: LL_WARNING, "rdbLoadLzfStringObject failed allocating %llu bytes", (unsigned long long)len);
goto err;
}
@@ -525,7 +529,7 @@ void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr) {
if (plain || sds) {
void *buf = plain ? ztrymalloc(len) : sdstrynewlen(SDS_NOINIT,len);
if (!buf) {
- serverLog(server.loading? LL_WARNING: LL_VERBOSE, "rdbGenericLoadStringObject failed allocating %llu bytes", len);
+ serverLog(isRestoreContext()? LL_VERBOSE: LL_WARNING, "rdbGenericLoadStringObject failed allocating %llu bytes", len);
return NULL;
}
if (lenptr) *lenptr = len;
@@ -541,7 +545,7 @@ void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr) {
robj *o = encode ? tryCreateStringObject(SDS_NOINIT,len) :
tryCreateRawStringObject(SDS_NOINIT,len);
if (!o) {
- serverLog(server.loading? LL_WARNING: LL_VERBOSE, "rdbGenericLoadStringObject failed allocating %llu bytes", len);
+ serverLog(isRestoreContext()? LL_VERBOSE: LL_WARNING, "rdbGenericLoadStringObject failed allocating %llu bytes", len);
return NULL;
}
if (len && rioRead(rdb,o->ptr,len) == 0) {
@@ -2517,9 +2521,10 @@ emptykey:
/* Mark that we are loading in the global state and setup the fields
* needed to provide loading stats. */
-void startLoading(size_t size, int rdbflags) {
+void startLoading(size_t size, int rdbflags, int async) {
/* Load the DB */
server.loading = 1;
+ if (async == 1) server.async_loading = 1;
server.loading_start_time = time(NULL);
server.loading_loaded_bytes = 0;
server.loading_total_bytes = size;
@@ -2547,7 +2552,7 @@ void startLoadingFile(FILE *fp, char* filename, int rdbflags) {
if (fstat(fileno(fp), &sb) == -1)
sb.st_size = 0;
rdbFileBeingLoaded = filename;
- startLoading(sb.st_size, rdbflags);
+ startLoading(sb.st_size, rdbflags, 0);
}
/* Refresh the loading progress info */
@@ -2560,6 +2565,7 @@ void loadingProgress(off_t pos) {
/* Loading finished */
void stopLoading(int success) {
server.loading = 0;
+ server.async_loading = 0;
blockingOperationEnds();
rdbFileBeingLoaded = NULL;
@@ -2610,10 +2616,10 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
* otherwise C_ERR is returned and 'errno' is set accordingly. */
-int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
+int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi, redisDb *dbarray) {
uint64_t dbid = 0;
int type, rdbver;
- redisDb *db = server.db+0;
+ redisDb *db = dbarray+0;
char buf[1024];
int error;
long long empty_keys_skipped = 0;
@@ -2685,7 +2691,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
"databases. Exiting\n", server.dbnum);
exit(1);
}
- db = server.db+dbid;
+ db = dbarray+dbid;
continue; /* Read next opcode. */
} else if (type == RDB_OPCODE_RESIZEDB) {
/* RESIZEDB: Hint about the size of the keys in the currently
@@ -2962,7 +2968,7 @@ int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags) {
if ((fp = fopen(filename,"r")) == NULL) return C_ERR;
startLoadingFile(fp, filename,rdbflags);
rioInitWithFile(&rdb,fp);
- retval = rdbLoadRio(&rdb,rdbflags,rsi);
+ retval = rdbLoadRio(&rdb,rdbflags,rsi,server.db);
fclose(fp);
stopLoading(retval==C_OK);
return retval;
diff --git a/src/rdb.h b/src/rdb.h
index 1188f1032..f150bcb0d 100644
--- a/src/rdb.h
+++ b/src/rdb.h
@@ -166,7 +166,7 @@ int rdbSaveBinaryDoubleValue(rio *rdb, double val);
int rdbLoadBinaryDoubleValue(rio *rdb, double *val);
int rdbSaveBinaryFloatValue(rio *rdb, float val);
int rdbLoadBinaryFloatValue(rio *rdb, float *val);
-int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi);
+int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi, redisDb *db);
int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi);
rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi);
diff --git a/src/redismodule.h b/src/redismodule.h
index 1bc86750e..317d51356 100644
--- a/src/redismodule.h
+++ b/src/redismodule.h
@@ -150,11 +150,13 @@ typedef struct RedisModuleStreamID {
#define REDISMODULE_CTX_FLAGS_DENY_BLOCKING (1<<21)
/* The current client uses RESP3 protocol */
#define REDISMODULE_CTX_FLAGS_RESP3 (1<<22)
+/* Redis is currently async loading database for diskless replication. */
+#define REDISMODULE_CTX_FLAGS_ASYNC_LOADING (1<<23)
/* Next context flag, must be updated when adding new flags above!
This flag should not be used directly by the module.
* Use RedisModule_GetContextFlagsAll instead. */
-#define _REDISMODULE_CTX_FLAGS_NEXT (1<<23)
+#define _REDISMODULE_CTX_FLAGS_NEXT (1<<24)
/* Keyspace changes notification classes. Every class is associated with a
* character for configuration purposes.
@@ -229,6 +231,10 @@ typedef uint64_t RedisModuleTimerID;
/* Declare that the module can handle errors with RedisModule_SetModuleOptions. */
#define REDISMODULE_OPTIONS_HANDLE_IO_ERRORS (1<<0)
+
+/* Declare that the module can handle diskless async replication with RedisModule_SetModuleOptions. */
+#define REDISMODULE_OPTIONS_HANDLE_REPL_ASYNC_LOAD (1<<1)
+
/* When set, Redis will not call RedisModule_SignalModifiedKey(), implicitly in
* RedisModule_CloseKey, and the module needs to do that when manually when keys
* are modified from the user's sperspective, to invalidate WATCH. */
@@ -249,9 +255,10 @@ typedef uint64_t RedisModuleTimerID;
#define REDISMODULE_EVENT_MODULE_CHANGE 9
#define REDISMODULE_EVENT_LOADING_PROGRESS 10
#define REDISMODULE_EVENT_SWAPDB 11
-#define REDISMODULE_EVENT_REPL_BACKUP 12
+#define REDISMODULE_EVENT_REPL_BACKUP 12 /* Deprecated since Redis 7.0, not used anymore. */
#define REDISMODULE_EVENT_FORK_CHILD 13
-#define _REDISMODULE_EVENT_NEXT 14 /* Next event flag, should be updated if a new event added. */
+#define REDISMODULE_EVENT_REPL_ASYNC_LOAD 14
+#define _REDISMODULE_EVENT_NEXT 15 /* Next event flag, should be updated if a new event added. */
typedef struct RedisModuleEvent {
uint64_t id; /* REDISMODULE_EVENT_... defines. */
@@ -311,8 +318,14 @@ static const RedisModuleEvent
REDISMODULE_EVENT_SWAPDB,
1
},
+ /* Deprecated since Redis 7.0, not used anymore. */
+ __attribute__ ((deprecated))
RedisModuleEvent_ReplBackup = {
- REDISMODULE_EVENT_REPL_BACKUP,
+ REDISMODULE_EVENT_REPL_BACKUP,
+ 1
+ },
+ RedisModuleEvent_ReplAsyncLoad = {
+ REDISMODULE_EVENT_REPL_ASYNC_LOAD,
1
},
RedisModuleEvent_ForkChild = {
@@ -363,11 +376,17 @@ static const RedisModuleEvent
#define REDISMODULE_SUBEVENT_LOADING_PROGRESS_AOF 1
#define _REDISMODULE_SUBEVENT_LOADING_PROGRESS_NEXT 2
+/* Replication Backup events are deprecated since Redis 7.0 and are never fired. */
#define REDISMODULE_SUBEVENT_REPL_BACKUP_CREATE 0
#define REDISMODULE_SUBEVENT_REPL_BACKUP_RESTORE 1
#define REDISMODULE_SUBEVENT_REPL_BACKUP_DISCARD 2
#define _REDISMODULE_SUBEVENT_REPL_BACKUP_NEXT 3
+#define REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_STARTED 0
+#define REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_ABORTED 1
+#define REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_COMPLETED 2
+#define _REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_NEXT 3
+
#define REDISMODULE_SUBEVENT_FORK_CHILD_BORN 0
#define REDISMODULE_SUBEVENT_FORK_CHILD_DIED 1
#define _REDISMODULE_SUBEVENT_FORK_CHILD_NEXT 2
diff --git a/src/replication.c b/src/replication.c
index babf0a022..6c79d72d8 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -1623,7 +1623,8 @@ void replicationSendNewlineToMaster(void) {
}
/* Callback used by emptyDb() while flushing away old data to load
- * the new dataset received by the master. */
+ * the new dataset received by the master and by discardTempDb()
+ * after loading succeeded or failed. */
void replicationEmptyDbCallback(dict *d) {
UNUSED(d);
if (server.repl_state == REPL_STATE_TRANSFER)
@@ -1689,36 +1690,49 @@ static int useDisklessLoad() {
/* compute boolean decision to use diskless load */
int enabled = server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB ||
(server.repl_diskless_load == REPL_DISKLESS_LOAD_WHEN_DB_EMPTY && dbTotalServerKeyCount()==0);
- /* Check all modules handle read errors, otherwise it's not safe to use diskless load. */
- if (enabled && !moduleAllDatatypesHandleErrors()) {
- serverLog(LL_WARNING,
- "Skipping diskless-load because there are modules that don't handle read errors.");
- enabled = 0;
+
+ if (enabled) {
+ /* Check all modules handle read errors, otherwise it's not safe to use diskless load. */
+ if (!moduleAllDatatypesHandleErrors()) {
+ serverLog(LL_WARNING,
+ "Skipping diskless-load because there are modules that don't handle read errors.");
+ enabled = 0;
+ }
+ /* Check all modules handle async replication, otherwise it's not safe to use diskless load. */
+ else if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB && !moduleAllModulesHandleReplAsyncLoad()) {
+ serverLog(LL_WARNING,
+ "Skipping diskless-load because there are modules that are not aware of async replication.");
+ enabled = 0;
+ }
}
return enabled;
}
-/* Helper function for readSyncBulkPayload() to make backups of the current
- * databases before socket-loading the new ones. The backups may be restored
- * by disklessLoadRestoreBackup or freed by disklessLoadDiscardBackup later. */
-dbBackup *disklessLoadMakeBackup(void) {
- return backupDb();
+/* Helper function for readSyncBulkPayload() to initialize tempDb
+ * before socket-loading the new db from master. The tempDb may be populated
+ * by swapMainDbWithTempDb or freed by disklessLoadDiscardTempDb later. */
+redisDb *disklessLoadInitTempDb(void) {
+ return initTempDb();
}
-/* Helper function for readSyncBulkPayload(): when replica-side diskless
- * database loading is used, Redis makes a backup of the existing databases
- * before loading the new ones from the socket.
- *
- * If the socket loading went wrong, we want to restore the old backups
- * into the server databases. */
-void disklessLoadRestoreBackup(dbBackup *backup) {
- restoreDbBackup(backup);
+/* Helper function for readSyncBulkPayload() to discard our tempDb
+ * when the loading succeeded or failed. */
+void disklessLoadDiscardTempDb(redisDb *tempDb) {
+ discardTempDb(tempDb, replicationEmptyDbCallback);
}
-/* Helper function for readSyncBulkPayload() to discard our old backups
- * when the loading succeeded. */
-void disklessLoadDiscardBackup(dbBackup *backup, int flag) {
- discardDbBackup(backup, flag, replicationEmptyDbCallback);
+/* If we know we got an entirely different data set from our master
+ * we have no way to incrementally feed our replicas after that.
+ * We want our replicas to resync with us as well, if we have any sub-replicas.
+ * This is useful on readSyncBulkPayload in places where we just finished transferring db. */
+void replicationAttachToNewMaster() {
+ /* Replica starts to apply data from new master, we must discard the cached
+ * master structure. */
+ serverAssert(server.master == NULL);
+ replicationDiscardCachedMaster();
+
+ disconnectSlaves(); /* Force our replicas to resync with us as well. */
+ freeReplicationBacklog(); /* Don't allow our chained replicas to PSYNC. */
}
/* Asynchronously read the SYNC payload we receive from a master */
@@ -1727,7 +1741,7 @@ void readSyncBulkPayload(connection *conn) {
char buf[PROTO_IOBUF_LEN];
ssize_t nread, readlen, nwritten;
int use_diskless_load = useDisklessLoad();
- dbBackup *diskless_load_backup = NULL;
+ redisDb *diskless_load_tempDb = NULL;
int empty_db_flags = server.repl_slave_lazy_flush ? EMPTYDB_ASYNC :
EMPTYDB_NO_FLAGS;
off_t left;
@@ -1895,58 +1909,61 @@ void readSyncBulkPayload(connection *conn) {
*
* 2. Or when we are done reading from the socket to the RDB file, in
* such case we want just to read the RDB file in memory. */
- serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data");
/* We need to stop any AOF rewriting child before flushing and parsing
* the RDB, otherwise we'll create a copy-on-write disaster. */
if (server.aof_state != AOF_OFF) stopAppendOnly();
- /* When diskless RDB loading is used by replicas, it may be configured
- * in order to save the current DB instead of throwing it away,
- * so that we can restore it in case of failed transfer. */
- if (use_diskless_load &&
- server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB)
- {
- /* Create a backup of server.db[] and initialize to empty
- * dictionaries. */
- diskless_load_backup = disklessLoadMakeBackup();
- }
-
- /* Replica starts to apply data from new master, we must discard the cached
- * master structure. */
- serverAssert(server.master == NULL);
- replicationDiscardCachedMaster();
+ if (use_diskless_load && server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
+ /* Initialize empty tempDb dictionaries. */
+ diskless_load_tempDb = disklessLoadInitTempDb();
- /* We want our slaves to resync with us as well, if we have any sub-slaves.
- * The master already transferred us an entirely different data set and we
- * have no way to incrementally feed our slaves after that. */
- disconnectSlaves(); /* Force our slaves to resync with us as well. */
- freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
+ moduleFireServerEvent(REDISMODULE_EVENT_REPL_ASYNC_LOAD,
+ REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_STARTED,
+ NULL);
+ } else {
+ replicationAttachToNewMaster();
- /* We call to emptyDb even in case of REPL_DISKLESS_LOAD_SWAPDB
- * (Where disklessLoadMakeBackup left server.db empty) because we
- * want to execute all the auxiliary logic of emptyDb (Namely,
- * fire module events) */
- emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);
+ serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data");
+ emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);
+ }
/* Before loading the DB into memory we need to delete the readable
* handler, otherwise it will get called recursively since
* rdbLoad() will call the event loop to process events from time to
* time for non blocking loading. */
connSetReadHandler(conn, NULL);
+
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory");
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
if (use_diskless_load) {
rio rdb;
+ redisDb *dbarray;
+ int asyncLoading = 0;
+
+ if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
+ /* Async loading means we continue serving read commands during full resync, and
+ * "swap" the new db with the old db only when loading is done.
+ * It is enabled only on SWAPDB diskless replication when master replication ID hasn't changed,
+ * because in that state the old content of the db represents a different point in time of the same
+ * data set we're currently receiving from the master. */
+ if (memcmp(server.replid, server.master_replid, CONFIG_RUN_ID_SIZE) == 0) {
+ asyncLoading = 1;
+ }
+ dbarray = diskless_load_tempDb;
+ } else {
+ dbarray = server.db;
+ }
+
rioInitWithConn(&rdb,conn,server.repl_transfer_size);
/* Put the socket in blocking mode to simplify RDB transfer.
* We'll restore it when the RDB is received. */
connBlock(conn);
connRecvTimeout(conn, server.repl_timeout*1000);
- startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION);
+ startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION, asyncLoading);
- if (rdbLoadRio(&rdb,RDBFLAGS_REPLICATION,&rsi) != C_OK) {
+ if (rdbLoadRio(&rdb,RDBFLAGS_REPLICATION,&rsi,dbarray) != C_OK) {
/* RDB loading failed. */
serverLog(LL_WARNING,
"Failed trying to load the MASTER synchronization DB "
@@ -1955,13 +1972,17 @@ void readSyncBulkPayload(connection *conn) {
cancelReplicationHandshake(1);
rioFreeConn(&rdb, NULL);
- /* Remove the half-loaded data in case we started with
- * an empty replica. */
- emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);
-
if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
- /* Restore the backed up databases. */
- disklessLoadRestoreBackup(diskless_load_backup);
+ /* Discard potentially partially loaded tempDb. */
+ moduleFireServerEvent(REDISMODULE_EVENT_REPL_ASYNC_LOAD,
+ REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_ABORTED,
+ NULL);
+
+ disklessLoadDiscardTempDb(diskless_load_tempDb);
+ serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Discarding temporary DB in background");
+ } else {
+ /* Remove the half-loaded data in case we started with an empty replica. */
+ emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);
}
/* Note that there's no point in restarting the AOF on SYNC
@@ -1972,12 +1993,26 @@ void readSyncBulkPayload(connection *conn) {
/* RDB loading succeeded if we reach this point. */
if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
- /* Delete the backup databases we created before starting to load
- * the new RDB. Now the RDB was loaded with success so the old
- * data is useless. */
- disklessLoadDiscardBackup(diskless_load_backup, empty_db_flags);
+ /* We will soon swap main db with tempDb and replicas will start
+ * to apply data from new master, we must discard the cached
+ * master structure and force resync of sub-replicas. */
+ replicationAttachToNewMaster();
+
+ serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Swapping active DB with loaded DB");
+ swapMainDbWithTempDb(diskless_load_tempDb);
+
+ moduleFireServerEvent(REDISMODULE_EVENT_REPL_ASYNC_LOAD,
+ REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_COMPLETED,
+ NULL);
+
+ /* Delete the old db as it's useless now. */
+ disklessLoadDiscardTempDb(diskless_load_tempDb);
+ serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Discarding old DB in background");
}
+ /* Inform about db change, as replication was diskless and didn't cause a save. */
+ server.dirty++;
+
/* Verify the end mark is correct. */
if (usemark) {
if (!rioRead(&rdb,buf,CONFIG_RUN_ID_SIZE) ||
diff --git a/src/scripting.c b/src/scripting.c
index 85504bdb0..7880d1b0e 100644
--- a/src/scripting.c
+++ b/src/scripting.c
@@ -882,7 +882,7 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
"Write commands not allowed after non deterministic commands. Call redis.replicate_commands() at the start of your script in order to switch to single commands replication mode.");
goto cleanup;
} else if (server.masterhost && server.repl_slave_ro &&
- !server.loading &&
+ server.lua_caller->id != CLIENT_ID_AOF &&
!(server.lua_caller->flags & CLIENT_MASTER))
{
luaPushError(lua, shared.roslaveerr->ptr);
@@ -905,11 +905,11 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
* could enlarge the memory usage are not allowed, but only if this is the
* first write in the context of this script, otherwise we can't stop
* in the middle. */
- if (server.maxmemory && /* Maxmemory is actually enabled. */
- !server.loading && /* Don't care about mem if loading. */
- !server.masterhost && /* Slave must execute the script. */
- server.lua_write_dirty == 0 && /* Script had no side effects so far. */
- server.lua_oom && /* Detected OOM when script start. */
+ if (server.maxmemory && /* Maxmemory is actually enabled. */
+ server.lua_caller->id != CLIENT_ID_AOF && /* Don't care about mem if loading from AOF. */
+ !server.masterhost && /* Slave must execute the script. */
+ server.lua_write_dirty == 0 && /* Script had no side effects so far. */
+ server.lua_oom && /* Detected OOM when script start. */
(cmd->flags & CMD_DENYOOM))
{
luaPushError(lua, shared.oomerr->ptr);
@@ -922,7 +922,7 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
/* If this is a Redis Cluster node, we need to make sure Lua is not
* trying to access non-local keys, with the exception of commands
* received from our master or when loading the AOF back in memory. */
- if (server.cluster_enabled && !server.loading &&
+ if (server.cluster_enabled && server.lua_caller->id != CLIENT_ID_AOF &&
!(server.lua_caller->flags & CLIENT_MASTER))
{
int error_code;
diff --git a/src/server.c b/src/server.c
index 9740475ca..0773113ff 100644
--- a/src/server.c
+++ b/src/server.c
@@ -3680,6 +3680,7 @@ void initServerConfig(void) {
server.skip_checksum_validation = 0;
server.saveparams = NULL;
server.loading = 0;
+ server.async_loading = 0;
server.loading_rdb_used_mem = 0;
server.aof_state = AOF_OFF;
server.aof_rewrite_base_size = 0;
@@ -4252,6 +4253,7 @@ void initServer(void) {
server.db[j].id = j;
server.db[j].avg_ttl = 0;
server.db[j].defrag_later = listCreate();
+ server.db[j].slots_to_keys = NULL; /* Set by clusterInit later on if necessary. */
listSetFreeMethod(server.db[j].defrag_later,(void (*)(void*))sdsfree);
}
evictionPoolAlloc(); /* Initialize the LRU keys pool. */
@@ -5432,7 +5434,7 @@ int processCommand(client *c) {
/* Loading DB? Return an error if the command has not the
* CMD_LOADING flag. */
- if (server.loading && is_denyloading_command) {
+ if (server.loading && !server.async_loading && is_denyloading_command) {
rejectCommand(c, shared.loadingerr);
return C_OK;
}
@@ -6420,6 +6422,7 @@ sds genRedisInfoString(const char *section) {
info = sdscatprintf(info,
"# Persistence\r\n"
"loading:%d\r\n"
+ "async_loading:%d\r\n"
"current_cow_peak:%zu\r\n"
"current_cow_size:%zu\r\n"
"current_cow_size_age:%lu\r\n"
@@ -6445,7 +6448,8 @@ sds genRedisInfoString(const char *section) {
"aof_last_cow_size:%zu\r\n"
"module_fork_in_progress:%d\r\n"
"module_fork_last_cow_size:%zu\r\n",
- (int)server.loading,
+ (int)(server.loading && !server.async_loading),
+ (int)server.async_loading,
server.stat_current_cow_peak,
server.stat_current_cow_bytes,
server.stat_current_cow_updated ? (unsigned long) elapsedMs(server.stat_current_cow_updated) / 1000 : 0,
diff --git a/src/server.h b/src/server.h
index f5b00e8b9..5eaa1521b 100644
--- a/src/server.h
+++ b/src/server.h
@@ -803,6 +803,9 @@ typedef struct replBufBlock {
char buf[];
} replBufBlock;
+/* Opaque type for the Slot to Key API. */
+typedef struct clusterSlotToKeyMapping clusterSlotToKeyMapping;
+
/* Redis database representation. There are multiple databases identified
* by integers from 0 (the default database) up to the max configured
* database. The database number is the 'id' field in the structure. */
@@ -816,13 +819,9 @@ typedef struct redisDb {
long long avg_ttl; /* Average TTL, just for stats */
unsigned long expires_cursor; /* Cursor of the active expire cycle. */
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
+ clusterSlotToKeyMapping *slots_to_keys; /* Array of slots to keys. Only used in cluster mode (db 0). */
} redisDb;
-/* Declare database backup that include redis main DBs and slots to keys map.
- * Definition is in db.c. We can't define it here since we define CLUSTER_SLOTS
- * in cluster.h. */
-typedef struct dbBackup dbBackup;
-
/* Client MULTI/EXEC state */
typedef struct multiCmd {
robj **argv;
@@ -1394,6 +1393,7 @@ struct redisServer {
/* RDB / AOF loading information */
volatile sig_atomic_t loading; /* We are loading data from disk if true */
+ volatile sig_atomic_t async_loading; /* We are loading data without blocking the db being served */
off_t loading_total_bytes;
off_t loading_rdb_used_mem;
off_t loading_loaded_bytes;
@@ -2035,6 +2035,7 @@ void ModuleForkDoneHandler(int exitcode, int bysignal);
int TerminateModuleForkChild(int child_pid, int wait);
ssize_t rdbSaveModulesAux(rio *rdb, int when);
int moduleAllDatatypesHandleErrors();
+int moduleAllModulesHandleReplAsyncLoad();
sds modulesCollectInfo(sds info, const char *section, int for_crash_report, int sections);
void moduleFireServerEvent(uint64_t eid, int subid, void *data);
void processModuleLoadingProgressEvent(int is_aof);
@@ -2338,7 +2339,7 @@ const char *getFailoverStateString();
/* Generic persistence functions */
void startLoadingFile(FILE* fp, char* filename, int rdbflags);
-void startLoading(size_t size, int rdbflags);
+void startLoading(size_t size, int rdbflags, int async);
void loadingProgress(off_t pos);
void stopLoading(int success);
void startSaving(int rdbflags);
@@ -2663,9 +2664,8 @@ long long emptyDb(int dbnum, int flags, void(callback)(dict*));
long long emptyDbStructure(redisDb *dbarray, int dbnum, int async, void(callback)(dict*));
void flushAllDataAndResetRDB(int flags);
long long dbTotalServerKeyCount();
-dbBackup *backupDb(void);
-void restoreDbBackup(dbBackup *backup);
-void discardDbBackup(dbBackup *backup, int flags, void(callback)(dict*));
+redisDb *initTempDb(void);
+void discardTempDb(redisDb *tempDb, void(callback)(dict*));
int selectDb(client *c, int id);
@@ -3054,6 +3054,7 @@ void debugDelay(int usec);
void killIOThreads(void);
void killThreads(void);
void makeThreadKillable(void);
+void swapMainDbWithTempDb(redisDb *tempDb);
/* Use macro for checking log level to avoid evaluating arguments in cases log
* should be ignored due to low level. */
diff --git a/tests/cluster/tests/17-diskless-load-swapdb.tcl b/tests/cluster/tests/17-diskless-load-swapdb.tcl
index b9bf01ecd..7a56ec783 100644
--- a/tests/cluster/tests/17-diskless-load-swapdb.tcl
+++ b/tests/cluster/tests/17-diskless-load-swapdb.tcl
@@ -1,4 +1,4 @@
-# Check replica can restore database backup correctly if fail to diskless load.
+# Check that replica keys and keys to slots map are right after failing to diskless load using SWAPDB.
source "../tests/includes/init-tests.tcl"
@@ -14,7 +14,7 @@ test "Cluster is writable" {
cluster_write_test 0
}
-test "Right to restore backups when fail to diskless load " {
+test "Main db not affected when fail to diskless load" {
set master [Rn 0]
set replica [Rn 1]
set master_id 0
@@ -63,9 +63,9 @@ test "Right to restore backups when fail to diskless load " {
restart_instance redis $replica_id
$replica READONLY
- # Start full sync, wait till after db is flushed (backed up)
+ # Start full sync, wait till after db started loading in background
wait_for_condition 500 10 {
- [s $replica_id loading] eq 1
+ [s $replica_id async_loading] eq 1
} else {
fail "Fail to full sync"
}
@@ -75,7 +75,7 @@ test "Right to restore backups when fail to diskless load " {
# Start full sync, wait till the replica detects the disconnection
wait_for_condition 500 10 {
- [s $replica_id loading] eq 0
+ [s $replica_id async_loading] eq 0
} else {
fail "Fail to full sync"
}
diff --git a/tests/cluster/tests/includes/init-tests.tcl b/tests/cluster/tests/includes/init-tests.tcl
index caedc13e2..b787962a8 100644
--- a/tests/cluster/tests/includes/init-tests.tcl
+++ b/tests/cluster/tests/includes/init-tests.tcl
@@ -41,6 +41,7 @@ test "Cluster nodes hard reset" {
R $id config set cluster-slave-validity-factor 10
R $id config set loading-process-events-interval-bytes 2097152
R $id config set key-load-delay 0
+ R $id config set repl-diskless-load disabled
R $id config rewrite
}
}
diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl
index 13206d634..06f079d34 100644
--- a/tests/integration/replication.tcl
+++ b/tests/integration/replication.tcl
@@ -383,72 +383,226 @@ start_server {tags {"repl external:skip"}} {
}
}
-test {slave fails full sync and diskless load swapdb recovers it} {
- start_server {tags {"repl"}} {
- set slave [srv 0 client]
- set slave_host [srv 0 host]
- set slave_port [srv 0 port]
- set slave_log [srv 0 stdout]
+# Diskless load swapdb when NOT async_loading (different master replid)
+foreach testType {Successful Aborted} {
+ start_server {tags {"repl external:skip"}} {
+ set replica [srv 0 client]
+ set replica_host [srv 0 host]
+ set replica_port [srv 0 port]
+ set replica_log [srv 0 stdout]
start_server {} {
set master [srv 0 client]
set master_host [srv 0 host]
set master_port [srv 0 port]
- # Put different data sets on the master and slave
- # we need to put large keys on the master since the slave replies to info only once in 2mb
- $slave debug populate 2000 slave 10
- $master debug populate 800 master 100000
+ # Set master and replica to use diskless replication on swapdb mode
+ $master config set repl-diskless-sync yes
+ $master config set repl-diskless-sync-delay 0
+ $master config set save ""
+ $replica config set repl-diskless-load swapdb
+ $replica config set save ""
+
+ # Put different data sets on the master and replica
+ # We need to put large keys on the master since the replica replies to info only once in 2mb
+ $replica debug populate 200 slave 10
+ $master debug populate 1000 master 100000
$master config set rdbcompression no
- # Set master and slave to use diskless replication
+ # Set a key value on replica to check status on failure and after swapping db
+ $replica set mykey myvalue
+
+ switch $testType {
+ "Aborted" {
+ # Set master with a slow rdb generation, so that we can easily intercept loading
+ # 10ms per key, with 1000 keys is 10 seconds
+ $master config set rdb-key-save-delay 10000
+
+ # Start the replication process
+ $replica replicaof $master_host $master_port
+
+ test {Diskless load swapdb (different replid): replica enter loading} {
+ # Wait for the replica to start reading the rdb
+ wait_for_condition 100 100 {
+ [s -1 loading] eq 1
+ } else {
+ fail "Replica didn't get into loading mode"
+ }
+
+ assert_equal [s -1 async_loading] 0
+ }
+
+ # Make sure that next sync will not start immediately so that we can catch the replica in between syncs
+ $master config set repl-diskless-sync-delay 5
+
+ # Kill the replica connection on the master
+ set killed [$master client kill type replica]
+
+ # Wait for loading to stop (fail)
+ wait_for_condition 100 100 {
+ [s -1 loading] eq 0
+ } else {
+ fail "Replica didn't disconnect"
+ }
+
+ test {Diskless load swapdb (different replid): old database is exposed after replication fails} {
+ # Ensure we see old values from replica
+ assert_equal [$replica get mykey] "myvalue"
+
+ # Make sure amount of replica keys didn't change
+ assert_equal [$replica dbsize] 201
+ }
+
+ # Speed up shutdown
+ $master config set rdb-key-save-delay 0
+ }
+ "Successful" {
+ # Start the replication process
+ $replica replicaof $master_host $master_port
+
+ # Let replica finish sync with master
+ wait_for_condition 100 100 {
+ [s -1 master_link_status] eq "up"
+ } else {
+ fail "Master <-> Replica didn't finish sync"
+ }
+
+ test {Diskless load swapdb (different replid): new database is exposed after swapping} {
+ # Ensure we don't see anymore the key that was stored only to replica and also that we don't get LOADING status
+ assert_equal [$replica GET mykey] ""
+
+ # Make sure amount of keys matches master
+ assert_equal [$replica dbsize] 1000
+ }
+ }
+ }
+ }
+ }
+}
+
+# Diskless load swapdb when async_loading (matching master replid)
+foreach testType {Successful Aborted} {
+ start_server {tags {"repl external:skip"}} {
+ set replica [srv 0 client]
+ set replica_host [srv 0 host]
+ set replica_port [srv 0 port]
+ set replica_log [srv 0 stdout]
+ start_server {} {
+ set master [srv 0 client]
+ set master_host [srv 0 host]
+ set master_port [srv 0 port]
+
+ # Set master and replica to use diskless replication on swapdb mode
$master config set repl-diskless-sync yes
$master config set repl-diskless-sync-delay 0
- $slave config set repl-diskless-load swapdb
+ $master config set save ""
+ $replica config set repl-diskless-load swapdb
+ $replica config set save ""
- # Set master with a slow rdb generation, so that we can easily disconnect it mid sync
- # 10ms per key, with 800 keys is 8 seconds
- $master config set rdb-key-save-delay 10000
+ # Set replica writable so we can check that a key we manually added is served
+ # during replication and after failure, but disappears on success
+ $replica config set replica-read-only no
- # Start the replication process...
- $slave slaveof $master_host $master_port
+ # Initial sync to have matching replids between master and replica
+ $replica replicaof $master_host $master_port
- # wait for the slave to start reading the rdb
+ # Let replica finish initial sync with master
wait_for_condition 100 100 {
- [s -1 loading] eq 1
+ [s -1 master_link_status] eq "up"
} else {
- fail "Replica didn't get into loading mode"
+ fail "Master <-> Replica didn't finish sync"
}
- # make sure that next sync will not start immediately so that we can catch the slave in between syncs
- $master config set repl-diskless-sync-delay 5
- # for faster server shutdown, make rdb saving fast again (the fork is already uses the slow one)
- $master config set rdb-key-save-delay 0
+ # Put different data sets on the master and replica
+ # We need to put large keys on the master since the replica replies to info only once in 2mb
+ $replica debug populate 2000 slave 10
+ $master debug populate 1000 master 100000
+ $master config set rdbcompression no
- # waiting slave to do flushdb (key count drop)
- wait_for_condition 50 100 {
- 2000 != [scan [regexp -inline {keys\=([\d]*)} [$slave info keyspace]] keys=%d]
- } else {
- fail "Replica didn't flush"
+ # Set a key value on replica to check status during loading, on failure and after swapping db
+ $replica set mykey myvalue
+
+ # Force the replica to try another full sync (this time it will have matching master replid)
+ $master multi
+ $master client kill type replica
+ # Fill replication backlog with new content
+ $master config set repl-backlog-size 16384
+ for {set keyid 0} {$keyid < 10} {incr keyid} {
+ $master set "$keyid string_$keyid" [string repeat A 16384]
}
+ $master exec
+
+ switch $testType {
+ "Aborted" {
+ # Set master with a slow rdb generation, so that we can easily intercept loading
+ # 10ms per key, with 1000 keys is 10 seconds
+ $master config set rdb-key-save-delay 10000
+
+ test {Diskless load swapdb (async_loading): replica enter async_loading} {
+ # Wait for the replica to start reading the rdb
+ wait_for_condition 100 100 {
+ [s -1 async_loading] eq 1
+ } else {
+ fail "Replica didn't get into async_loading mode"
+ }
+
+ assert_equal [s -1 loading] 0
+ }
+
+ test {Diskless load swapdb (async_loading): old database is exposed while async replication is in progress} {
+ # Ensure we still see old values while async_loading is in progress and also not LOADING status
+ assert_equal [$replica get mykey] "myvalue"
- # make sure we're still loading
- assert_equal [s -1 loading] 1
+ # Make sure we're still async_loading to validate previous assertion
+ assert_equal [s -1 async_loading] 1
- # kill the slave connection on the master
- set killed [$master client kill type slave]
+ # Make sure amount of replica keys didn't change
+ assert_equal [$replica dbsize] 2001
+ }
- # wait for loading to stop (fail)
- wait_for_condition 50 100 {
- [s -1 loading] eq 0
- } else {
- fail "Replica didn't disconnect"
- }
+ # Make sure that next sync will not start immediately so that we can catch the replica in between syncs
+ $master config set repl-diskless-sync-delay 5
+
+ # Kill the replica connection on the master
+ set killed [$master client kill type replica]
+
+ # Wait for loading to stop (fail)
+ wait_for_condition 100 100 {
+ [s -1 async_loading] eq 0
+ } else {
+ fail "Replica didn't disconnect"
+ }
+
+ test {Diskless load swapdb (async_loading): old database is exposed after async replication fails} {
+ # Ensure we see old values from replica
+ assert_equal [$replica get mykey] "myvalue"
+
+ # Make sure amount of replica keys didn't change
+ assert_equal [$replica dbsize] 2001
+ }
- # make sure the original keys were restored
- assert_equal [$slave dbsize] 2000
+ # Speed up shutdown
+ $master config set rdb-key-save-delay 0
+ }
+ "Successful" {
+ # Let replica finish sync with master
+ wait_for_condition 100 100 {
+ [s -1 master_link_status] eq "up"
+ } else {
+ fail "Master <-> Replica didn't finish sync"
+ }
+
+ test {Diskless load swapdb (async_loading): new database is exposed after swapping} {
+ # Ensure we don't see anymore the key that was stored only to replica and also that we don't get LOADING status
+ assert_equal [$replica GET mykey] ""
+
+ # Make sure amount of keys matches master
+ assert_equal [$replica dbsize] 1010
+ }
+ }
+ }
}
}
-} {} {external:skip}
+}
test {diskless loading short read} {
start_server {tags {"repl"}} {
diff --git a/tests/modules/testrdb.c b/tests/modules/testrdb.c
index 5d9382a3a..6786fdf2e 100644
--- a/tests/modules/testrdb.c
+++ b/tests/modules/testrdb.c
@@ -13,39 +13,47 @@ RedisModuleType *testrdb_type = NULL;
RedisModuleString *before_str = NULL;
RedisModuleString *after_str = NULL;
-void replBackupCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
+/* Global values used to keep aux from db being loaded (in case of async_loading) */
+RedisModuleString *before_str_temp = NULL;
+RedisModuleString *after_str_temp = NULL;
+
+/* Indicates whether there is an async replication in progress.
+ * We control this value from RedisModuleEvent_ReplAsyncLoad events. */
+int async_loading = 0;
+
+void replAsyncLoadCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
{
REDISMODULE_NOT_USED(e);
REDISMODULE_NOT_USED(data);
- static RedisModuleString *before_str_backup = NULL;
- static RedisModuleString *after_str_backup = NULL;
switch (sub) {
- case REDISMODULE_SUBEVENT_REPL_BACKUP_CREATE:
- assert(before_str_backup == NULL);
- assert(after_str_backup == NULL);
- before_str_backup = before_str;
- after_str_backup = after_str;
- before_str = NULL;
- after_str = NULL;
+ case REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_STARTED:
+ assert(async_loading == 0);
+ async_loading = 1;
break;
- case REDISMODULE_SUBEVENT_REPL_BACKUP_RESTORE:
+ case REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_ABORTED:
+ /* Discard temp aux */
+ if (before_str_temp)
+ RedisModule_FreeString(ctx, before_str_temp);
+ if (after_str_temp)
+ RedisModule_FreeString(ctx, after_str_temp);
+ before_str_temp = NULL;
+ after_str_temp = NULL;
+
+ async_loading = 0;
+ break;
+ case REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_COMPLETED:
if (before_str)
RedisModule_FreeString(ctx, before_str);
if (after_str)
RedisModule_FreeString(ctx, after_str);
- before_str = before_str_backup;
- after_str = after_str_backup;
- before_str_backup = NULL;
- after_str_backup = NULL;
- break;
- case REDISMODULE_SUBEVENT_REPL_BACKUP_DISCARD:
- if (before_str_backup)
- RedisModule_FreeString(ctx, before_str_backup);
- if (after_str_backup)
- RedisModule_FreeString(ctx, after_str_backup);
- before_str_backup = NULL;
- after_str_backup = NULL;
+ before_str = before_str_temp;
+ after_str = after_str_temp;
+
+ before_str_temp = NULL;
+ after_str_temp = NULL;
+
+ async_loading = 0;
break;
default:
assert(0);
@@ -105,24 +113,47 @@ int testrdb_aux_load(RedisModuleIO *rdb, int encver, int when) {
if (conf_aux_count==0) assert(0);
RedisModuleCtx *ctx = RedisModule_GetContextFromIO(rdb);
if (when == REDISMODULE_AUX_BEFORE_RDB) {
- if (before_str)
- RedisModule_FreeString(ctx, before_str);
- before_str = NULL;
- int count = RedisModule_LoadSigned(rdb);
- if (RedisModule_IsIOError(rdb))
- return REDISMODULE_ERR;
- if (count)
- before_str = RedisModule_LoadString(rdb);
+ if (async_loading == 0) {
+ if (before_str)
+ RedisModule_FreeString(ctx, before_str);
+ before_str = NULL;
+ int count = RedisModule_LoadSigned(rdb);
+ if (RedisModule_IsIOError(rdb))
+ return REDISMODULE_ERR;
+ if (count)
+ before_str = RedisModule_LoadString(rdb);
+ } else {
+ if (before_str_temp)
+ RedisModule_FreeString(ctx, before_str_temp);
+ before_str_temp = NULL;
+ int count = RedisModule_LoadSigned(rdb);
+ if (RedisModule_IsIOError(rdb))
+ return REDISMODULE_ERR;
+ if (count)
+ before_str_temp = RedisModule_LoadString(rdb);
+ }
} else {
- if (after_str)
- RedisModule_FreeString(ctx, after_str);
- after_str = NULL;
- int count = RedisModule_LoadSigned(rdb);
- if (RedisModule_IsIOError(rdb))
- return REDISMODULE_ERR;
- if (count)
- after_str = RedisModule_LoadString(rdb);
+ if (async_loading == 0) {
+ if (after_str)
+ RedisModule_FreeString(ctx, after_str);
+ after_str = NULL;
+ int count = RedisModule_LoadSigned(rdb);
+ if (RedisModule_IsIOError(rdb))
+ return REDISMODULE_ERR;
+ if (count)
+ after_str = RedisModule_LoadString(rdb);
+ } else {
+ if (after_str_temp)
+ RedisModule_FreeString(ctx, after_str_temp);
+ after_str_temp = NULL;
+ int count = RedisModule_LoadSigned(rdb);
+ if (RedisModule_IsIOError(rdb))
+ return REDISMODULE_ERR;
+ if (count)
+ after_str_temp = RedisModule_LoadString(rdb);
+ }
}
+
if (RedisModule_IsIOError(rdb))
return REDISMODULE_ERR;
return REDISMODULE_OK;
@@ -162,6 +193,21 @@ int testrdb_get_before(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
return REDISMODULE_OK;
}
+/* For purpose of testing module events, expose variable state during async_loading. */
+int testrdb_async_loading_get_before(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
+{
+ REDISMODULE_NOT_USED(argv);
+ if (argc != 1){
+ RedisModule_WrongArity(ctx);
+ return REDISMODULE_OK;
+ }
+ if (before_str_temp)
+ RedisModule_ReplyWithString(ctx, before_str_temp);
+ else
+ RedisModule_ReplyWithStringBuffer(ctx, "", 0);
+ return REDISMODULE_OK;
+}
+
int testrdb_set_after(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
if (argc != 2){
@@ -226,11 +272,10 @@ int testrdb_get_key(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
-
if (RedisModule_Init(ctx,"testrdb",1,REDISMODULE_APIVER_1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
- RedisModule_SetModuleOptions(ctx, REDISMODULE_OPTIONS_HANDLE_IO_ERRORS);
+ RedisModule_SetModuleOptions(ctx, REDISMODULE_OPTIONS_HANDLE_IO_ERRORS | REDISMODULE_OPTIONS_HANDLE_REPL_ASYNC_LOAD);
if (argc > 0)
RedisModule_StringToLongLong(argv[0], &conf_aux_count);
@@ -274,6 +319,9 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
if (RedisModule_CreateCommand(ctx,"testrdb.get.before", testrdb_get_before,"",0,0,0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
+ if (RedisModule_CreateCommand(ctx,"testrdb.async_loading.get.before", testrdb_async_loading_get_before,"",0,0,0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
if (RedisModule_CreateCommand(ctx,"testrdb.set.after", testrdb_set_after,"deny-oom",0,0,0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
@@ -287,7 +335,7 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
return REDISMODULE_ERR;
RedisModule_SubscribeToServerEvent(ctx,
- RedisModuleEvent_ReplBackup, replBackupCallback);
+ RedisModuleEvent_ReplAsyncLoad, replAsyncLoadCallback);
return REDISMODULE_OK;
}
@@ -297,5 +345,9 @@ int RedisModule_OnUnload(RedisModuleCtx *ctx) {
RedisModule_FreeString(ctx, before_str);
if (after_str)
RedisModule_FreeString(ctx, after_str);
+ if (before_str_temp)
+ RedisModule_FreeString(ctx, before_str_temp);
+ if (after_str_temp)
+ RedisModule_FreeString(ctx, after_str_temp);
return REDISMODULE_OK;
}
diff --git a/tests/unit/moduleapi/testrdb.tcl b/tests/unit/moduleapi/testrdb.tcl
index 8bc0f7cd4..12d6a2a77 100644
--- a/tests/unit/moduleapi/testrdb.tcl
+++ b/tests/unit/moduleapi/testrdb.tcl
@@ -131,5 +131,120 @@ tags "modules" {
}
}
}
+
+ # Module events for diskless load swapdb when async_loading (matching master replid)
+ foreach testType {Successful Aborted} {
+ start_server [list overrides [list loadmodule "$testmodule 2"] tags [list external:skip]] {
+ set replica [srv 0 client]
+ set replica_host [srv 0 host]
+ set replica_port [srv 0 port]
+ set replica_log [srv 0 stdout]
+ start_server [list overrides [list loadmodule "$testmodule 2"]] {
+ set master [srv 0 client]
+ set master_host [srv 0 host]
+ set master_port [srv 0 port]
+
+ set start [clock clicks -milliseconds]
+
+ # Set master and replica to use diskless replication on swapdb mode
+ $master config set repl-diskless-sync yes
+ $master config set repl-diskless-sync-delay 0
+ $master config set save ""
+ $replica config set repl-diskless-load swapdb
+ $replica config set save ""
+
+ # Initial sync to have matching replids between master and replica
+ $replica replicaof $master_host $master_port
+
+ # Let replica finish initial sync with master
+ wait_for_condition 100 100 {
+ [s -1 master_link_status] eq "up"
+ } else {
+ fail "Master <-> Replica didn't finish sync"
+ }
+
+ # Set global values on module so we can check if module event callbacks will pick it up correctly
+ $master testrdb.set.before value1_master
+ $replica testrdb.set.before value1_replica
+
+ # Put different data sets on the master and replica
+ # We need to put large keys on the master since the replica replies to info only once in 2mb
+ $replica debug populate 200 slave 10
+ $master debug populate 1000 master 100000
+ $master config set rdbcompression no
+
+ # Force the replica to try another full sync (this time it will have matching master replid)
+ $master multi
+ $master client kill type replica
+ # Fill replication backlog with new content
+ $master config set repl-backlog-size 16384
+ for {set keyid 0} {$keyid < 10} {incr keyid} {
+ $master set "$keyid string_$keyid" [string repeat A 16384]
+ }
+ $master exec
+
+ switch $testType {
+ "Aborted" {
+ # Set master with a slow rdb generation, so that we can easily intercept loading
+ # 10ms per key, with 1000 keys is 10 seconds
+ $master config set rdb-key-save-delay 10000
+
+ test {Diskless load swapdb RedisModuleEvent_ReplAsyncLoad handling: during loading, can keep module variable same as before} {
+ # Wait for the replica to start reading the rdb and module for acknowledgement
+ # We wanna abort only after the temp db was populated by REDISMODULE_AUX_BEFORE_RDB
+ wait_for_condition 100 100 {
+ [s -1 async_loading] eq 1 && [$replica testrdb.async_loading.get.before] eq "value1_master"
+ } else {
+ fail "Module didn't receive or react to REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_STARTED"
+ }
+
+ assert_equal [$replica dbsize] 200
+ assert_equal value1_replica [$replica testrdb.get.before]
+ }
+
+ # Make sure that next sync will not start immediately so that we can catch the replica in between syncs
+ $master config set repl-diskless-sync-delay 5
+
+ # Kill the replica connection on the master
+ set killed [$master client kill type replica]
+
+ test {Diskless load swapdb RedisModuleEvent_ReplAsyncLoad handling: when loading aborted, can keep module variable same as before} {
+ # Wait for loading to stop (fail) and module for acknowledgement
+ wait_for_condition 100 100 {
+ [s -1 async_loading] eq 0 && [$replica testrdb.async_loading.get.before] eq ""
+ } else {
+ fail "Module didn't receive or react to REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_ABORTED"
+ }
+
+ assert_equal [$replica dbsize] 200
+ assert_equal value1_replica [$replica testrdb.get.before]
+ }
+
+ # Speed up shutdown
+ $master config set rdb-key-save-delay 0
+ }
+ "Successful" {
+ # Let replica finish sync with master
+ wait_for_condition 100 100 {
+ [s -1 master_link_status] eq "up"
+ } else {
+ fail "Master <-> Replica didn't finish sync"
+ }
+
+ test {Diskless load swapdb RedisModuleEvent_ReplAsyncLoad handling: after db loaded, can set module variable with new value} {
+ assert_equal [$replica dbsize] 1010
+ assert_equal value1_master [$replica testrdb.get.before]
+ }
+ }
+ }
+
+ if {$::verbose} {
+ set end [clock clicks -milliseconds]
+ set duration [expr $end - $start]
+ puts "test took $duration ms"
+ }
+ }
+ }
+ }
}
}