summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/aof.c12
-rw-r--r--src/db.c2
-rw-r--r--src/debug.c2
-rw-r--r--src/evict.c17
-rw-r--r--src/expire.c13
-rw-r--r--src/module.c6
-rw-r--r--src/replication.c6
-rw-r--r--src/server.c167
-rw-r--r--src/server.h15
-rw-r--r--src/t_set.c13
-rw-r--r--tests/modules/keyspace_events.c24
-rw-r--r--tests/unit/expire.tcl23
-rw-r--r--tests/unit/maxmemory.tcl2
-rw-r--r--tests/unit/moduleapi/propagate.tcl192
-rw-r--r--tests/unit/multi.tcl6
15 files changed, 375 insertions, 125 deletions
diff --git a/src/aof.c b/src/aof.c
index 1f0b824f1..52026ab24 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -1295,10 +1295,18 @@ sds genAofTimestampAnnotationIfNeeded(int force) {
return ts;
}
+/* Write the given command to the aof file.
+ * dictid - dictionary id the command should be applied to,
+ * this is used in order to decide if a `select` command
+ * should also be written to the aof. Value of -1 means
+ * to avoid writing `select` command in any case.
+ * argv - The command to write to the aof.
+ * argc - Number of values in argv
+ */
void feedAppendOnlyFile(int dictid, robj **argv, int argc) {
sds buf = sdsempty();
- serverAssert(dictid >= 0 && dictid < server.dbnum);
+ serverAssert(dictid == -1 || (dictid >= 0 && dictid < server.dbnum));
/* Feed timestamp if needed */
if (server.aof_timestamp_enabled) {
@@ -1311,7 +1319,7 @@ void feedAppendOnlyFile(int dictid, robj **argv, int argc) {
/* The DB this command was targeting is not the same as the last command
* we appended. To issue a SELECT command is needed. */
- if (dictid != server.aof_selected_db) {
+ if (dictid != -1 && dictid != server.aof_selected_db) {
char seldb[64];
snprintf(seldb,sizeof(seldb),"%d",dictid);
diff --git a/src/db.c b/src/db.c
index 36de2aa0a..04ab455fd 100644
--- a/src/db.c
+++ b/src/db.c
@@ -1557,9 +1557,9 @@ void deleteExpiredKeyAndPropagate(redisDb *db, robj *keyobj) {
dbSyncDelete(db,keyobj);
latencyEndMonitor(expire_latency);
latencyAddSampleIfNeeded("expire-del",expire_latency);
- notifyKeyspaceEvent(NOTIFY_EXPIRED,"expired",keyobj,db->id);
signalModifiedKey(NULL, db, keyobj);
propagateDeletion(db,keyobj,server.lazyfree_lazy_expire);
+ notifyKeyspaceEvent(NOTIFY_EXPIRED,"expired",keyobj,db->id);
server.stat_expiredkeys++;
}
diff --git a/src/debug.c b/src/debug.c
index dac7716ac..43375c101 100644
--- a/src/debug.c
+++ b/src/debug.c
@@ -852,7 +852,7 @@ NULL
server.aof_flush_sleep = atoi(c->argv[2]->ptr);
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"replicate") && c->argc >= 3) {
- replicationFeedSlaves(server.slaves, server.slaveseldb,
+ replicationFeedSlaves(server.slaves, -1,
c->argv + 2, c->argc - 2);
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"error") && c->argc == 3) {
diff --git a/src/evict.c b/src/evict.c
index 6ee85ede5..34f40b008 100644
--- a/src/evict.c
+++ b/src/evict.c
@@ -574,7 +574,12 @@ int performEvictions(void) {
int prev_core_propagates = server.core_propagates;
serverAssert(server.also_propagate.numops == 0);
server.core_propagates = 1;
- server.propagate_no_multi = 1;
+
+ /* Increase the nested call counter.
+ * We do this to prevent any RM_Call performed inside the
+ * notification callback from being propagated immediately;
+ * we want those commands wrapped in multi/exec together with the DEL command. */
+ server.in_nested_call++;
while (mem_freed < (long long)mem_tofree) {
int j, k, i;
@@ -685,9 +690,10 @@ int performEvictions(void) {
mem_freed += delta;
server.stat_evictedkeys++;
signalModifiedKey(NULL,db,keyobj);
- notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",
- keyobj, db->id);
propagateDeletion(db,keyobj,server.lazyfree_lazy_eviction);
+ notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",keyobj,db->id);
+ /* Propagate notification commands and the del. */
+ propagatePendingCommands();
decrRefCount(keyobj);
keys_freed++;
@@ -742,11 +748,8 @@ cant_free:
serverAssert(server.core_propagates); /* This function should not be re-entrant */
- /* Propagate all DELs */
- propagatePendingCommands();
-
server.core_propagates = prev_core_propagates;
- server.propagate_no_multi = 0;
+ server.in_nested_call--;
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("eviction-cycle",latency);
diff --git a/src/expire.c b/src/expire.c
index a6a40450e..0e737fce3 100644
--- a/src/expire.c
+++ b/src/expire.c
@@ -186,7 +186,7 @@ void activeExpireCycle(int type) {
* we're in cron */
serverAssert(server.also_propagate.numops == 0);
server.core_propagates = 1;
- server.propagate_no_multi = 1;
+ server.in_nested_call++;
for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) {
/* Expired and checked in a single loop. */
@@ -264,7 +264,11 @@ void activeExpireCycle(int type) {
de = de->next;
ttl = dictGetSignedIntegerVal(e)-now;
- if (activeExpireCycleTryExpire(db,e,now)) expired++;
+ if (activeExpireCycleTryExpire(db,e,now)) {
+ expired++;
+ /* Propagate the DEL command */
+ propagatePendingCommands();
+ }
if (ttl > 0) {
/* We want the average TTL of keys yet
* not expired. */
@@ -310,11 +314,8 @@ void activeExpireCycle(int type) {
serverAssert(server.core_propagates); /* This function should not be re-entrant */
- /* Propagate all DELs */
- propagatePendingCommands();
-
server.core_propagates = 0;
- server.propagate_no_multi = 0;
+ server.in_nested_call--;
elapsed = ustime()-start;
server.stat_expire_cycle_time_used += elapsed;
diff --git a/src/module.c b/src/module.c
index d8897f36c..5db88703f 100644
--- a/src/module.c
+++ b/src/module.c
@@ -7795,6 +7795,12 @@ void moduleReleaseGIL(void) {
* - REDISMODULE_NOTIFY_STREAM: Stream events
* - REDISMODULE_NOTIFY_MODULE: Module types events
* - REDISMODULE_NOTIFY_KEYMISS: Key-miss events
+ * Notice, key-miss event is the only type
+ * of event that is fired from within a read command.
+ * Performing RM_Call with a write command from within
+ * this notification is wrong and discouraged. It will
+ * cause the read command that triggered the event to be
+ * replicated to the AOF/Replica.
* - REDISMODULE_NOTIFY_ALL: All events (Excluding REDISMODULE_NOTIFY_KEYMISS)
* - REDISMODULE_NOTIFY_LOADED: A special notification available only for modules,
* indicates that the key was loaded from persistence.
diff --git a/src/replication.c b/src/replication.c
index faf159d7d..58ed10833 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -420,7 +420,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
char llstr[LONG_STR_SIZE];
/* In case we propagate a command that doesn't touch keys (PING, REPLCONF) we
- * pass dbid=server.slaveseldb which may be -1. */
+ * pass dbid=-1, which indicates there is no need to replicate a `select` command. */
serverAssert(dictid == -1 || (dictid >= 0 && dictid < server.dbnum));
/* If the instance is not a top level master, return ASAP: we'll just proxy
@@ -442,7 +442,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
prepareReplicasToWrite();
/* Send SELECT command to every slave if needed. */
- if (server.slaveseldb != dictid) {
+ if (dictid != -1 && server.slaveseldb != dictid) {
robj *selectcmd;
/* For a few DBs we have pre-computed SELECT command. */
@@ -3615,7 +3615,7 @@ void replicationCron(void) {
if (!manual_failover_in_progress) {
ping_argv[0] = shared.ping;
- replicationFeedSlaves(server.slaves, server.slaveseldb,
+ replicationFeedSlaves(server.slaves, -1,
ping_argv, 1);
}
}
diff --git a/src/server.c b/src/server.c
index 093fe876c..4b8cfbad7 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1501,7 +1501,7 @@ static void sendGetackToReplicas(void) {
argv[0] = shared.replconf;
argv[1] = shared.getack;
argv[2] = shared.special_asterick; /* Not used argument. */
- replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
+ replicationFeedSlaves(server.slaves, -1, argv, 3);
}
extern int ProcessingEventsWhileBlocked;
@@ -2520,7 +2520,6 @@ void initServer(void) {
server.busy_module_yield_flags = BUSY_MODULE_YIELD_NONE;
server.busy_module_yield_reply = NULL;
server.core_propagates = 0;
- server.propagate_no_multi = 0;
server.module_ctx_nesting = 0;
server.client_pause_in_transaction = 0;
server.child_pid = -1;
@@ -2910,46 +2909,89 @@ void resetErrorTableStats(void) {
/* ========================== Redis OP Array API ============================ */
-void redisOpArrayInit(redisOpArray *oa) {
- oa->ops = NULL;
- oa->numops = 0;
- oa->capacity = 0;
+/* Used to update a placeholder previously created using `redisOpArrayAppendPlaceholder`.
+ * We assume the updated placeholder has no target set yet (otherwise it's not a placeholder). */
+void redisOpArraySet(redisOpArray *oa, int dbid, robj **argv, int argc, int target, int index) {
+ redisOp *op = oa->ops+index;
+ serverAssert(!op->target);
+ op->dbid = dbid;
+ op->argv = argv;
+ op->argc = argc;
+ op->target = target;
+ oa->numops++;
}
+/* Append an operation to the given redisOpArray.
+ * dbid - the id of the database to apply the operation on
+ * argv - operations arguments (including the command)
+ * argc - size of argv
+ * target - indicating how to propagate the operation (PROPAGATE_AOF,PROPAGATE_REPL)
+ *
+ * Special case is when used with target 0, in this case the operation is a placeholder
+ * that is expected to be filled later on using redisOpArraySet. */
int redisOpArrayAppend(redisOpArray *oa, int dbid, robj **argv, int argc, int target) {
redisOp *op;
int prev_capacity = oa->capacity;
- if (oa->numops == 0) {
+ if (oa->used == 0) {
oa->capacity = 16;
- } else if (oa->numops >= oa->capacity) {
+ } else if (oa->used >= oa->capacity) {
oa->capacity *= 2;
}
if (prev_capacity != oa->capacity)
oa->ops = zrealloc(oa->ops,sizeof(redisOp)*oa->capacity);
- op = oa->ops+oa->numops;
+ int idx = oa->used;
+ op = oa->ops+idx;
op->dbid = dbid;
op->argv = argv;
op->argc = argc;
op->target = target;
- oa->numops++;
- return oa->numops;
+ oa->used++;
+ if (op->target) {
+ /* If `target` is 0, the operation was added as a placeholder and is not yet counted as
+ * a command that needs to be propagated; this is why we only increase `numops` if `target` is not 0. */
+ oa->numops++;
+ }
+ return idx; /* return the index of the appended operation */
+}
+
+void redisOpArrayTrim(redisOpArray *oa) {
+ if (!oa->used) {
+ /* nothing to trim */
+ return;
+ }
+ redisOp *last_op = oa->ops + (oa->used - 1);
+ if (!last_op->target) {
+ /* last op is an unused placeholder, we can remove it */
+ --oa->used;
+ }
+}
+
+int redisOpArrayAppendPlaceholder(redisOpArray *oa) {
+ return redisOpArrayAppend(oa, 0, NULL, 0, 0);
}
void redisOpArrayFree(redisOpArray *oa) {
- while(oa->numops) {
+ while(oa->used) {
int j;
redisOp *op;
+ oa->used--;
+ op = oa->ops+oa->used;
+ if (!op->target) {
+ continue;
+ }
oa->numops--;
- op = oa->ops+oa->numops;
- for (j = 0; j < op->argc; j++)
+ for (j = 0; j < op->argc; j++) {
decrRefCount(op->argv[j]);
+ }
zfree(op->argv);
}
- zfree(oa->ops);
- redisOpArrayInit(oa);
+
+ /* no need to free the actual op array, we reuse the memory for future commands */
+ serverAssert(!oa->numops);
+ serverAssert(!oa->used);
}
/* ====================== Commands lookup and execution ===================== */
@@ -3078,6 +3120,9 @@ static int shouldPropagate(int target) {
* This is an internal low-level function and should not be called!
*
* The API for propagating commands is alsoPropagate().
+ *
+ * A dbid value of -1 is reserved to indicate that the caller does not want
+ * to replicate SELECT for this command (used for database-neutral commands).
*/
static void propagateNow(int dbid, robj **argv, int argc, int target) {
if (!shouldPropagate(target))
@@ -3093,6 +3138,29 @@ static void propagateNow(int dbid, robj **argv, int argc, int target) {
replicationFeedSlaves(server.slaves,dbid,argv,argc);
}
+/* Append or set the given operation to the replication buffer.
+ * If index is -1, the operation will be appended to the end of the buffer.
+ * Otherwise the operation will be set at the given index. */
+void alsoPropagateRaw(int dbid, robj **argv, int argc, int target, int index) {
+ robj **argvcopy;
+ int j;
+
+ if (!shouldPropagate(target))
+ return;
+
+ argvcopy = zmalloc(sizeof(robj*)*argc);
+ for (j = 0; j < argc; j++) {
+ argvcopy[j] = argv[j];
+ incrRefCount(argv[j]);
+ }
+ if (index == -1) {
+ /* -1 means append to the end */
+ redisOpArrayAppend(&server.also_propagate,dbid,argvcopy,argc,target);
+ } else {
+ redisOpArraySet(&server.also_propagate,dbid,argvcopy,argc,target,index);
+ }
+}
+
/* Used inside commands to schedule the propagation of additional commands
* after the current command is propagated to AOF / Replication.
*
@@ -3105,18 +3173,7 @@ static void propagateNow(int dbid, robj **argv, int argc, int target) {
* stack allocated). The function automatically increments ref count of
* passed objects, so the caller does not need to. */
void alsoPropagate(int dbid, robj **argv, int argc, int target) {
- robj **argvcopy;
- int j;
-
- if (!shouldPropagate(target))
- return;
-
- argvcopy = zmalloc(sizeof(robj*)*argc);
- for (j = 0; j < argc; j++) {
- argvcopy[j] = argv[j];
- incrRefCount(argv[j]);
- }
- redisOpArrayAppend(&server.also_propagate,dbid,argvcopy,argc,target);
+ alsoPropagateRaw(dbid, argv, argc, target, -1);
}
/* It is possible to call the function forceCommandPropagation() inside a
@@ -3175,8 +3232,10 @@ void updateCommandLatencyHistogram(struct hdr_histogram **latency_histogram, int
* multiple separated commands. Note that alsoPropagate() is not affected
* by CLIENT_PREVENT_PROP flag. */
void propagatePendingCommands() {
- if (server.also_propagate.numops == 0)
+ if (server.also_propagate.numops == 0) {
+ redisOpArrayFree(&server.also_propagate);
return;
+ }
int j;
redisOp *rop;
@@ -3188,24 +3247,25 @@ void propagatePendingCommands() {
*
* And if the array contains only one command, no need to
* wrap it, since the single command is atomic. */
- if (server.also_propagate.numops > 1 && !server.propagate_no_multi) {
- /* We use the first command-to-propagate to set the dbid for MULTI,
- * so that the SELECT will be propagated beforehand */
- int multi_dbid = server.also_propagate.ops[0].dbid;
- propagateNow(multi_dbid,&shared.multi,1,PROPAGATE_AOF|PROPAGATE_REPL);
+ if (server.also_propagate.numops > 1) {
+ /* We use dbid=-1 to indicate we do not want to replicate SELECT.
+ * It'll be inserted together with the next command (inside the MULTI) */
+ propagateNow(-1,&shared.multi,1,PROPAGATE_AOF|PROPAGATE_REPL);
multi_emitted = 1;
}
- for (j = 0; j < server.also_propagate.numops; j++) {
+
+ for (j = 0; j < server.also_propagate.used; j++) {
rop = &server.also_propagate.ops[j];
- serverAssert(rop->target);
+ if (!rop->target) {
+ continue;
+ }
propagateNow(rop->dbid,rop->argv,rop->argc,rop->target);
}
if (multi_emitted) {
- /* We take the dbid from last command so that propagateNow() won't inject another SELECT */
- int exec_dbid = server.also_propagate.ops[server.also_propagate.numops-1].dbid;
- propagateNow(exec_dbid,&shared.exec,1,PROPAGATE_AOF|PROPAGATE_REPL);
+ /* We use dbid=-1 to indicate we do not want to replicate select */
+ propagateNow(-1,&shared.exec,1,PROPAGATE_AOF|PROPAGATE_REPL);
}
redisOpArrayFree(&server.also_propagate);
@@ -3314,6 +3374,24 @@ void call(client *c, int flags) {
if (monotonicGetType() == MONOTONIC_CLOCK_HW)
monotonic_start = getMonotonicUs();
+ int cmd_prop_index = -1;
+ if (c->cmd->flags & (CMD_WRITE | CMD_MAY_REPLICATE)) {
+ /* Save placeholder for replication.
+ * If the command will be replicated, this is the location where it should be placed in the replication stream.
+ * If we just push the command to the replication buffer (without the placeholder), the replica might get the commands
+ * in a wrong order and we will end up with master-replica inconsistency. To demonstrate, take the following example:
+ *
+ * 1. A module registers a keyspace notification callback, and inside the notification the module performs an incr command on the given key
+ * 2. User performs 'set x 1'
+ * 3. The module gets the notification and performs 'incr x'
+ * 4. The command 'incr x' enters the replication buffer before the 'set x 1' command and the replica sees the commands in the wrong order
+ *
+ * The final result is that the replica will have the value x=1 while the master will have x=2
+ *
+ * Notice that we only do this if the command might cause replication (either it's a WRITE command or MAY_REPLICATE) */
+ cmd_prop_index = redisOpArrayAppendPlaceholder(&server.also_propagate);
+ }
+
server.in_nested_call++;
c->cmd->proc(c);
server.in_nested_call--;
@@ -3435,9 +3513,18 @@ void call(client *c, int flags) {
/* Call alsoPropagate() only if at least one of AOF / replication
* propagation is needed. */
if (propagate_flags != PROPAGATE_NONE)
- alsoPropagate(c->db->id,c->argv,c->argc,propagate_flags);
+ /* If we got here with cmd_prop_index == -1, the command will be added to the end
+ * of the replication buffer. This can only happen on a read that causes a key
+ * miss event, which causes the module to perform a write command using RM_Call.
+ * In such a case we will propagate a read command (or a write command that has no effect
+ * and should not have been propagated). This behavior is wrong and module writers are advised
+ * not to perform any write commands on a key miss event. */
+ alsoPropagateRaw(c->db->id,c->argv,c->argc,propagate_flags,cmd_prop_index);
}
+ /* Try to trim the last element if it is not used (if it's still a placeholder). */
+ redisOpArrayTrim(&server.also_propagate);
+
/* Restore the old replication flags, since call() can be executed
* recursively. */
c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
diff --git a/src/server.h b/src/server.h
index acb5bfd44..ebf7037e3 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1281,6 +1281,7 @@ extern clientBufferLimitsConfig clientBufferLimitsDefaults[CLIENT_TYPE_OBUF_COUN
* after the propagation of the executed command. */
typedef struct redisOp {
robj **argv;
+ /* target=0 means the operation should not be propagated (unused placeholder), for more info look at redisOpArray */
int argc, dbid, target;
} redisOp;
@@ -1292,9 +1293,14 @@ typedef struct redisOp {
* redisOpArrayFree();
*/
typedef struct redisOpArray {
- redisOp *ops;
- int numops;
- int capacity;
+ redisOp *ops; /* The array of operations to replicate */
+ int numops; /* The actual number of operations in the array */
+ int used; /* Spots that are used in the ops array; we need to
+ differentiate between the actual number of operations
+ and the used spots because there might be spots
+ that were saved as placeholders for future commands
+ but were never actually used */
+ int capacity; /* The ops array capacity */
} redisOpArray;
/* This structure is returned by the getMemoryOverheadData() function in
@@ -1485,7 +1491,6 @@ struct redisServer {
int busy_module_yield_flags; /* Are we inside a busy module? (triggered by RM_Yield). see BUSY_MODULE_YIELD_ flags. */
const char *busy_module_yield_reply; /* When non-null, we are inside RM_Yield. */
int core_propagates; /* Is the core (in oppose to the module subsystem) is in charge of calling propagatePendingCommands? */
- int propagate_no_multi; /* True if propagatePendingCommands should avoid wrapping command in MULTI/EXEC */
int module_ctx_nesting; /* moduleCreateContext() nesting level */
char *ignore_warnings; /* Config: warnings that should be ignored. */
int client_pause_in_transaction; /* Was a client pause executed during this Exec? */
@@ -2899,7 +2904,7 @@ int incrCommandStatsOnError(struct redisCommand *cmd, int flags);
void call(client *c, int flags);
void alsoPropagate(int dbid, robj **argv, int argc, int target);
void propagatePendingCommands();
-void redisOpArrayInit(redisOpArray *oa);
+void redisOpArrayReset(redisOpArray *oa);
void redisOpArrayFree(redisOpArray *oa);
void forceCommandPropagation(client *c, int flags);
void preventCommandPropagation(client *c);
diff --git a/src/t_set.c b/src/t_set.c
index 3fb9f5259..d07f2ced9 100644
--- a/src/t_set.c
+++ b/src/t_set.c
@@ -490,11 +490,20 @@ void spopWithCountCommand(client *c) {
/* Delete the set as it is now empty */
dbDelete(c->db,c->argv[1]);
+
+ /* todo: Move the spop notification to be executed after the command logic.
+ * We can then decide if we want to keep the `alsoPropagate` or move to `rewriteClientCommandVector`. */
+
+ /* Propagate del command */
+ robj *propagate[2];
+ propagate[0] = shared.del;
+ propagate[1] = c->argv[1];
+ alsoPropagate(c->db->id,propagate,2,PROPAGATE_AOF|PROPAGATE_REPL);
+
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
- /* Propagate this command as a DEL operation */
- rewriteClientCommandVector(c,2,shared.del,c->argv[1]);
signalModifiedKey(c,c->db,c->argv[1]);
+ preventCommandPropagation(c);
return;
}
diff --git a/tests/modules/keyspace_events.c b/tests/modules/keyspace_events.c
index d394786b3..44de09979 100644
--- a/tests/modules/keyspace_events.c
+++ b/tests/modules/keyspace_events.c
@@ -101,6 +101,26 @@ static int KeySpace_NotificationExpired(RedisModuleCtx *ctx, int type, const cha
return REDISMODULE_OK;
}
+/* This key miss notification handler is performing a write command inside the notification callback.
+ * Notice, it is discouraged and currently wrong to perform a write command inside a key miss event.
+ * It can cause read commands to be replicated to the replica/aof. This test is here temporarily (for coverage and
+ * verification that it's not crashing). */
+static int KeySpace_NotificationModuleKeyMiss(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) {
+ REDISMODULE_NOT_USED(type);
+ REDISMODULE_NOT_USED(event);
+ REDISMODULE_NOT_USED(key);
+
+ int flags = RedisModule_GetContextFlags(ctx);
+ if (!(flags & REDISMODULE_CTX_FLAGS_MASTER)) {
+ return REDISMODULE_OK; // ignore the event on replica
+ }
+
+ RedisModuleCallReply* rep = RedisModule_Call(ctx, "incr", "!c", "missed");
+ RedisModule_FreeCallReply(rep);
+
+ return REDISMODULE_OK;
+}
+
static int KeySpace_NotificationModule(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) {
REDISMODULE_NOT_USED(ctx);
REDISMODULE_NOT_USED(type);
@@ -266,6 +286,10 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
return REDISMODULE_ERR;
}
+ if(RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_KEY_MISS, KeySpace_NotificationModuleKeyMiss) != REDISMODULE_OK){
+ return REDISMODULE_ERR;
+ }
+
if (RedisModule_CreateCommand(ctx,"keyspace.notify", cmdNotify,"",0,0,0) == REDISMODULE_ERR){
return REDISMODULE_ERR;
}
diff --git a/tests/unit/expire.tcl b/tests/unit/expire.tcl
index d85d59e7a..8e4954a34 100644
--- a/tests/unit/expire.tcl
+++ b/tests/unit/expire.tcl
@@ -745,4 +745,27 @@ start_server {tags {"expire"}} {
assert_equal [r EXPIRE none 100 GT] 0
assert_equal [r EXPIRE none 100 LT] 0
} {}
+
+ test {Redis should not propagate the read command on lazy expire} {
+ r debug set-active-expire 0
+ r flushall ; # Clean up keyspace to avoid interference by keys from other tests
+ r set foo bar PX 1
+ set repl [attach_to_replication_stream]
+ wait_for_condition 50 100 {
+ [r get foo] eq {}
+ } else {
+ fail "Replication not started."
+ }
+
+ # dummy command to verify nothing else gets into the replication stream.
+ r set x 1
+
+ assert_replication_stream $repl {
+ {select *}
+ {del foo}
+ {set x 1}
+ }
+ close_replication_stream $repl
+ assert_equal [r debug set-active-expire 1] {OK}
+ } {} {needs:debug}
}
diff --git a/tests/unit/maxmemory.tcl b/tests/unit/maxmemory.tcl
index 8b0e5045a..631b3ac91 100644
--- a/tests/unit/maxmemory.tcl
+++ b/tests/unit/maxmemory.tcl
@@ -558,8 +558,8 @@ start_server {tags {"maxmemory" "external:skip"}} {
}
assert_replication_stream $repl {
- {select *}
{multi}
+ {select *}
{incr x}
{incr x}
{exec}
diff --git a/tests/unit/moduleapi/propagate.tcl b/tests/unit/moduleapi/propagate.tcl
index 846d938fb..39189b715 100644
--- a/tests/unit/moduleapi/propagate.tcl
+++ b/tests/unit/moduleapi/propagate.tcl
@@ -45,10 +45,10 @@ tags "modules" {
$master set x y
assert_replication_stream $repl {
- {select *}
{multi}
- {incr notifications}
+ {select *}
{set x y}
+ {incr notifications}
{exec}
}
close_replication_stream $repl
@@ -63,12 +63,12 @@ tags "modules" {
$master exec
assert_replication_stream $repl {
- {select *}
{multi}
- {incr notifications}
+ {select *}
{set x1 y1}
{incr notifications}
{set x2 y2}
+ {incr notifications}
{exec}
}
close_replication_stream $repl
@@ -91,34 +91,40 @@ tags "modules" {
# Note whenever there's double notification: SET with PX issues two separate
# notifications: one for "set" and one for "expire"
assert_replication_stream $repl {
- {select *}
{multi}
+ {select *}
+ {set asdf1 1 PXAT *}
{incr notifications}
{incr notifications}
- {set asdf1 1 PXAT *}
{exec}
{multi}
+ {set asdf2 2 PXAT *}
{incr notifications}
{incr notifications}
- {set asdf2 2 PXAT *}
{exec}
{multi}
+ {set asdf3 3 PXAT *}
{incr notifications}
{incr notifications}
- {set asdf3 3 PXAT *}
{exec}
- {incr notifications}
+ {multi}
+ {del asdf*}
{incr notifications}
{incr testkeyspace:expired}
- {del asdf*}
{incr notifications}
+ {exec}
+ {multi}
+ {del asdf*}
{incr notifications}
{incr testkeyspace:expired}
- {del asdf*}
{incr notifications}
+ {exec}
+ {multi}
+ {del asdf*}
{incr notifications}
{incr testkeyspace:expired}
- {del asdf*}
+ {incr notifications}
+ {exec}
}
close_replication_stream $repl
@@ -143,8 +149,11 @@ tags "modules" {
# Bottom line: "notifications" always exists and we can't really determine the order of evictions
# This test is here only for sanity
+ # The replica will get the notification with multi exec and we have a generic notification handler
+ # that performs `RedisModule_Call(ctx, "INCR", "c", "multi");` if the notification is inside multi exec.
+ # so we will have 2 keys, "notifications" and "multi".
wait_for_condition 500 10 {
- [$replica dbsize] eq 1
+ [$replica dbsize] eq 2
} else {
fail "Not all keys have been evicted"
}
@@ -183,31 +192,37 @@ tags "modules" {
# Note that although CONFIG SET maxmemory is called in this flow (see issue #10014),
# eviction will happen and will not induce propagation of the CONFIG command (see #10019).
assert_replication_stream $repl {
- {select *}
{multi}
+ {select *}
+ {set asdf1 1 PXAT *}
{incr notifications}
{incr notifications}
- {set asdf1 1 PXAT *}
{exec}
{multi}
+ {set asdf2 2 PXAT *}
{incr notifications}
{incr notifications}
- {set asdf2 2 PXAT *}
{exec}
{multi}
+ {set asdf3 3 PXAT *}
{incr notifications}
{incr notifications}
- {set asdf3 3 PXAT *}
{exec}
- {incr notifications}
+ {multi}
{del asdf*}
{incr notifications}
+ {exec}
+ {multi}
{del asdf*}
{incr notifications}
- {del asdf*}
+ {exec}
{multi}
+ {del asdf*}
{incr notifications}
+ {exec}
+ {multi}
{set asdf4 4}
+ {incr notifications}
{exec}
}
close_replication_stream $repl
@@ -229,20 +244,24 @@ tags "modules" {
}
assert_replication_stream $repl {
- {select *}
{multi}
+ {select *}
+ {set timer-maxmemory-volatile-start 1 PXAT *}
{incr notifications}
{incr notifications}
- {set timer-maxmemory-volatile-start 1 PXAT *}
{incr timer-maxmemory-middle}
+ {set timer-maxmemory-volatile-end 1 PXAT *}
{incr notifications}
{incr notifications}
- {set timer-maxmemory-volatile-end 1 PXAT *}
{exec}
- {incr notifications}
+ {multi}
{del timer-maxmemory-volatile-*}
{incr notifications}
+ {exec}
+ {multi}
{del timer-maxmemory-volatile-*}
+ {incr notifications}
+ {exec}
}
close_replication_stream $repl
@@ -256,15 +275,15 @@ tags "modules" {
$master propagate-test.timer-eval
assert_replication_stream $repl {
- {select *}
{multi}
- {incr notifications}
+ {select *}
{incrby timer-eval-start 1}
{incr notifications}
{set foo bar}
- {incr timer-eval-middle}
{incr notifications}
+ {incr timer-eval-middle}
{incrby timer-eval-end 1}
+ {incr notifications}
{exec}
}
close_replication_stream $repl
@@ -282,8 +301,8 @@ tags "modules" {
}
assert_replication_stream $repl {
- {select *}
{multi}
+ {select *}
{incrby timer-nested-start 1}
{incrby timer-nested-end 1}
{exec}
@@ -307,16 +326,15 @@ tags "modules" {
}
assert_replication_stream $repl {
- {select *}
{multi}
+ {select *}
{incrby timer-nested-start 1}
- {incr notifications}
{incr using-call}
+ {incr notifications}
{incr counter-1}
{incr counter-2}
{incr counter-3}
{incr counter-4}
- {incr notifications}
{incr after-call}
{incr notifications}
{incr before-call-2}
@@ -328,6 +346,7 @@ tags "modules" {
{incr after-call-2}
{incr notifications}
{incr timer-nested-middle}
+ {incr notifications}
{incrby timer-nested-end 1}
{exec}
}
@@ -350,23 +369,23 @@ tags "modules" {
}
assert_replication_stream $repl {
- {select *}
{multi}
+ {select *}
{incr a-from-thread}
- {incr notifications}
{incr thread-call}
+ {incr notifications}
{incr b-from-thread}
{exec}
{multi}
{incr a-from-thread}
- {incr notifications}
{incr thread-call}
+ {incr notifications}
{incr b-from-thread}
{exec}
{multi}
{incr a-from-thread}
- {incr notifications}
{incr thread-call}
+ {incr notifications}
{incr b-from-thread}
{exec}
}
@@ -385,13 +404,13 @@ tags "modules" {
}
assert_replication_stream $repl {
- {select *}
{multi}
+ {select *}
{incr thread-detached-before}
- {incr notifications}
{incr thread-detached-1}
{incr notifications}
{incr thread-detached-2}
+ {incr notifications}
{incr thread-detached-after}
{exec}
}
@@ -405,18 +424,18 @@ tags "modules" {
$master propagate-test.mixed
assert_replication_stream $repl {
- {select *}
{multi}
+ {select *}
{incr counter-1}
{incr counter-2}
{exec}
{multi}
- {incr notifications}
{incr using-call}
+ {incr notifications}
{incr counter-1}
{incr counter-2}
- {incr notifications}
{incr after-call}
+ {incr notifications}
{exec}
}
close_replication_stream $repl
@@ -431,18 +450,18 @@ tags "modules" {
redis.call("propagate-test.mixed"); return "OK" } 0 ] {OK}
assert_replication_stream $repl {
- {select *}
{multi}
+ {select *}
{incr counter-1}
{incr counter-2}
- {incr notifications}
{set x y}
{incr notifications}
{incr using-call}
+ {incr notifications}
{incr counter-1}
{incr counter-2}
- {incr notifications}
{incr after-call}
+ {incr notifications}
{exec}
}
close_replication_stream $repl
@@ -456,18 +475,18 @@ tags "modules" {
$master propagate-test.mixed
assert_replication_stream $repl {
- {select *}
{multi}
+ {select *}
{incr counter-1}
{incr counter-2}
{exec}
{multi}
- {incr notifications}
{incr using-call}
+ {incr notifications}
{incr counter-1}
{incr counter-2}
- {incr notifications}
{incr after-call}
+ {incr notifications}
{exec}
}
close_replication_stream $repl
@@ -482,18 +501,18 @@ tags "modules" {
$master propagate-test.mixed
assert_replication_stream $repl {
- {select *}
{multi}
+ {select *}
{incr counter-1}
{incr counter-2}
{exec}
{multi}
- {incr notifications}
{incr using-call}
+ {incr notifications}
{incr counter-1}
{incr counter-2}
- {incr notifications}
{incr after-call}
+ {incr notifications}
{exec}
}
close_replication_stream $repl
@@ -515,26 +534,25 @@ tags "modules" {
}
assert_replication_stream $repl {
- {select *}
{multi}
+ {select *}
{incr counter-1}
{incr counter-2}
- {incr notifications}
{incr using-call}
+ {incr notifications}
{incr counter-1}
{incr counter-2}
- {incr notifications}
{incr after-call}
+ {incr notifications}
{exec}
{multi}
{incrby timer-nested-start 1}
- {incr notifications}
{incr using-call}
+ {incr notifications}
{incr counter-1}
{incr counter-2}
{incr counter-3}
{incr counter-4}
- {incr notifications}
{incr after-call}
{incr notifications}
{incr before-call-2}
@@ -546,6 +564,7 @@ tags "modules" {
{incr after-call-2}
{incr notifications}
{incr timer-nested-middle}
+ {incr notifications}
{incrby timer-nested-end 1}
{exec}
}
@@ -566,8 +585,8 @@ tags "modules" {
$master propagate-test.incr k1
assert_replication_stream $repl {
- {select *}
{multi}
+ {select *}
{del k1}
{propagate-test.incr k1}
{exec}
@@ -580,6 +599,71 @@ tags "modules" {
assert_equal [$replica ttl k1] -1
}
+ test {module notification on set} {
+ set repl [attach_to_replication_stream]
+
+ $master SADD s foo
+
+ wait_for_condition 500 10 {
+ [$replica SCARD s] eq "1"
+ } else {
+ fail "Failed to wait for set to be replicated"
+ }
+
+ $master SPOP s 1
+
+ wait_for_condition 500 10 {
+ [$replica SCARD s] eq "0"
+ } else {
+ fail "Failed to wait for set to be replicated"
+ }
+
+ # Currently the `del` command comes after the notification.
+ # When we fix spop to fire notification at the end (like all other commands),
+ # the `del` will come first.
+ assert_replication_stream $repl {
+ {multi}
+ {select *}
+ {sadd s foo}
+ {incr notifications}
+ {exec}
+ {multi}
+ {incr notifications}
+ {del s}
+ {incr notifications}
+ {exec}
+ }
+ close_replication_stream $repl
+ }
+
+ test {module key miss notification do not cause read command to be replicated} {
+ set repl [attach_to_replication_stream]
+
+ $master flushall
+
+ $master get unexisting_key
+
+ wait_for_condition 500 10 {
+ [$replica get missed] eq "1"
+ } else {
+ fail "Failed to wait for set to be replicated"
+ }
+
+ # This test checks a WRONG behavior that causes a read command to be replicated to the replica/aof.
+ # We keep the test to verify that such a wrong behavior does not cause any crashes.
+ assert_replication_stream $repl {
+ {select *}
+ {flushall}
+ {multi}
+ {incr missed}
+ {incr notifications}
+ {get unexisting_key}
+ {exec}
+ }
+
+ close_replication_stream $repl
+ }
+
test "Unload the module - propagate-test/testkeyspace" {
assert_equal {OK} [r module unload propagate-test]
assert_equal {OK} [r module unload testkeyspace]
diff --git a/tests/unit/multi.tcl b/tests/unit/multi.tcl
index 63d85d26b..26ab8e571 100644
--- a/tests/unit/multi.tcl
+++ b/tests/unit/multi.tcl
@@ -421,8 +421,8 @@ start_server {tags {"multi"}} {
r exec
assert_replication_stream $repl {
- {select *}
{multi}
+ {select *}
{set foo{t} bar}
{set foo2{t} bar2}
{set foo3{t} bar3}
@@ -446,8 +446,8 @@ start_server {tags {"multi"}} {
r exec
assert_replication_stream $repl {
- {select *}
{multi}
+ {select *}
{set foo{t} bar}
{select *}
{set foo2{t} bar2}
@@ -904,8 +904,8 @@ start_server {overrides {appendonly {yes} appendfilename {appendonly.aof} append
r flushall
r exec
assert_aof_content $aof {
- {select *}
{multi}
+ {select *}
{set *}
{flushall}
{exec}