diff options
-rw-r--r-- | src/aof.c | 12 | ||||
-rw-r--r-- | src/db.c | 2 | ||||
-rw-r--r-- | src/debug.c | 2 | ||||
-rw-r--r-- | src/evict.c | 17 | ||||
-rw-r--r-- | src/expire.c | 13 | ||||
-rw-r--r-- | src/module.c | 6 | ||||
-rw-r--r-- | src/replication.c | 6 | ||||
-rw-r--r-- | src/server.c | 167 | ||||
-rw-r--r-- | src/server.h | 15 | ||||
-rw-r--r-- | src/t_set.c | 13 | ||||
-rw-r--r-- | tests/modules/keyspace_events.c | 24 | ||||
-rw-r--r-- | tests/unit/expire.tcl | 23 | ||||
-rw-r--r-- | tests/unit/maxmemory.tcl | 2 | ||||
-rw-r--r-- | tests/unit/moduleapi/propagate.tcl | 192 | ||||
-rw-r--r-- | tests/unit/multi.tcl | 6 |
15 files changed, 375 insertions, 125 deletions
@@ -1295,10 +1295,18 @@ sds genAofTimestampAnnotationIfNeeded(int force) { return ts; } +/* Write the given command to the aof file. + * dictid - dictionary id the command should be applied to, + * this is used in order to decide if a `select` command + * should also be written to the aof. Value of -1 means + * to avoid writing `select` command in any case. + * argv - The command to write to the aof. + * argc - Number of values in argv + */ void feedAppendOnlyFile(int dictid, robj **argv, int argc) { sds buf = sdsempty(); - serverAssert(dictid >= 0 && dictid < server.dbnum); + serverAssert(dictid == -1 || (dictid >= 0 && dictid < server.dbnum)); /* Feed timestamp if needed */ if (server.aof_timestamp_enabled) { @@ -1311,7 +1319,7 @@ void feedAppendOnlyFile(int dictid, robj **argv, int argc) { /* The DB this command was targeting is not the same as the last command * we appended. To issue a SELECT command is needed. */ - if (dictid != server.aof_selected_db) { + if (dictid != -1 && dictid != server.aof_selected_db) { char seldb[64]; snprintf(seldb,sizeof(seldb),"%d",dictid); @@ -1557,9 +1557,9 @@ void deleteExpiredKeyAndPropagate(redisDb *db, robj *keyobj) { dbSyncDelete(db,keyobj); latencyEndMonitor(expire_latency); latencyAddSampleIfNeeded("expire-del",expire_latency); - notifyKeyspaceEvent(NOTIFY_EXPIRED,"expired",keyobj,db->id); signalModifiedKey(NULL, db, keyobj); propagateDeletion(db,keyobj,server.lazyfree_lazy_expire); + notifyKeyspaceEvent(NOTIFY_EXPIRED,"expired",keyobj,db->id); server.stat_expiredkeys++; } diff --git a/src/debug.c b/src/debug.c index dac7716ac..43375c101 100644 --- a/src/debug.c +++ b/src/debug.c @@ -852,7 +852,7 @@ NULL server.aof_flush_sleep = atoi(c->argv[2]->ptr); addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"replicate") && c->argc >= 3) { - replicationFeedSlaves(server.slaves, server.slaveseldb, + replicationFeedSlaves(server.slaves, -1, c->argv + 2, c->argc - 2); addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"error") && c->argc == 3) { diff --git a/src/evict.c b/src/evict.c index 6ee85ede5..34f40b008 100644 --- a/src/evict.c +++ b/src/evict.c @@ -574,7 +574,12 @@ int performEvictions(void) { int prev_core_propagates = server.core_propagates; serverAssert(server.also_propagate.numops == 0); server.core_propagates = 1; - server.propagate_no_multi = 1; + + /* Increase nested call counter + * we add this in order to prevent any RM_Call that may exist + * in the notify CB to be propagated immediately. + * we want them in multi/exec with the DEL command */ + server.in_nested_call++; while (mem_freed < (long long)mem_tofree) { int j, k, i; @@ -685,9 +690,10 @@ int performEvictions(void) { mem_freed += delta; server.stat_evictedkeys++; signalModifiedKey(NULL,db,keyobj); - notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted", - keyobj, db->id); propagateDeletion(db,keyobj,server.lazyfree_lazy_eviction); + notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",keyobj,db->id); + /* Propagate notification commandds and the del. */ + propagatePendingCommands(); decrRefCount(keyobj); keys_freed++; @@ -742,11 +748,8 @@ cant_free: serverAssert(server.core_propagates); /* This function should not be re-entrant */ - /* Propagate all DELs */ - propagatePendingCommands(); - server.core_propagates = prev_core_propagates; - server.propagate_no_multi = 0; + server.in_nested_call--; latencyEndMonitor(latency); latencyAddSampleIfNeeded("eviction-cycle",latency); diff --git a/src/expire.c b/src/expire.c index a6a40450e..0e737fce3 100644 --- a/src/expire.c +++ b/src/expire.c @@ -186,7 +186,7 @@ void activeExpireCycle(int type) { * we're in cron */ serverAssert(server.also_propagate.numops == 0); server.core_propagates = 1; - server.propagate_no_multi = 1; + server.in_nested_call++; for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) { /* Expired and checked in a single loop. */ @@ -264,7 +264,11 @@ void activeExpireCycle(int type) { de = de->next; ttl = dictGetSignedIntegerVal(e)-now; - if (activeExpireCycleTryExpire(db,e,now)) expired++; + if (activeExpireCycleTryExpire(db,e,now)) { + expired++; + /* Propagate the DEL command */ + propagatePendingCommands(); + } if (ttl > 0) { /* We want the average TTL of keys yet * not expired. */ @@ -310,11 +314,8 @@ void activeExpireCycle(int type) { serverAssert(server.core_propagates); /* This function should not be re-entrant */ - /* Propagate all DELs */ - propagatePendingCommands(); - server.core_propagates = 0; - server.propagate_no_multi = 0; + server.in_nested_call--; elapsed = ustime()-start; server.stat_expire_cycle_time_used += elapsed; diff --git a/src/module.c b/src/module.c index d8897f36c..5db88703f 100644 --- a/src/module.c +++ b/src/module.c @@ -7795,6 +7795,12 @@ void moduleReleaseGIL(void) { * - REDISMODULE_NOTIFY_STREAM: Stream events * - REDISMODULE_NOTIFY_MODULE: Module types events * - REDISMODULE_NOTIFY_KEYMISS: Key-miss events + * Notice, key-miss event is the only type + * of event that is fired from within a read command. + * Performing RM_Call with a write command from within + * this notification is wrong and discourage. It will + * cause the read command that trigger the event to be + * replicated to the AOF/Replica. * - REDISMODULE_NOTIFY_ALL: All events (Excluding REDISMODULE_NOTIFY_KEYMISS) * - REDISMODULE_NOTIFY_LOADED: A special notification available only for modules, * indicates that the key was loaded from persistence. diff --git a/src/replication.c b/src/replication.c index faf159d7d..58ed10833 100644 --- a/src/replication.c +++ b/src/replication.c @@ -420,7 +420,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { char llstr[LONG_STR_SIZE]; /* In case we propagate a command that doesn't touch keys (PING, REPLCONF) we - * pass dbid=server.slaveseldb which may be -1. */ + * pass dbid=-1 that indicate there is no need to replicate `select` command. */ serverAssert(dictid == -1 || (dictid >= 0 && dictid < server.dbnum)); /* If the instance is not a top level master, return ASAP: we'll just proxy @@ -442,7 +442,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { prepareReplicasToWrite(); /* Send SELECT command to every slave if needed. */ - if (server.slaveseldb != dictid) { + if (dictid != -1 && server.slaveseldb != dictid) { robj *selectcmd; /* For a few DBs we have pre-computed SELECT command. */ @@ -3615,7 +3615,7 @@ void replicationCron(void) { if (!manual_failover_in_progress) { ping_argv[0] = shared.ping; - replicationFeedSlaves(server.slaves, server.slaveseldb, + replicationFeedSlaves(server.slaves, -1, ping_argv, 1); } } diff --git a/src/server.c b/src/server.c index 093fe876c..4b8cfbad7 100644 --- a/src/server.c +++ b/src/server.c @@ -1501,7 +1501,7 @@ static void sendGetackToReplicas(void) { argv[0] = shared.replconf; argv[1] = shared.getack; argv[2] = shared.special_asterick; /* Not used argument. */ - replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3); + replicationFeedSlaves(server.slaves, -1, argv, 3); } extern int ProcessingEventsWhileBlocked; @@ -2520,7 +2520,6 @@ void initServer(void) { server.busy_module_yield_flags = BUSY_MODULE_YIELD_NONE; server.busy_module_yield_reply = NULL; server.core_propagates = 0; - server.propagate_no_multi = 0; server.module_ctx_nesting = 0; server.client_pause_in_transaction = 0; server.child_pid = -1; @@ -2910,46 +2909,89 @@ void resetErrorTableStats(void) { /* ========================== Redis OP Array API ============================ */ -void redisOpArrayInit(redisOpArray *oa) { - oa->ops = NULL; - oa->numops = 0; - oa->capacity = 0; +/* Used to update a placeholder previously created using `redisOpArrayAppendPlaceholder`. + * We assume the updated placeholder has no target set yet (otherwise its not a placeholder). */ +void redisOpArraySet(redisOpArray *oa, int dbid, robj **argv, int argc, int target, int index) { + redisOp *op = oa->ops+index; + serverAssert(!op->target); + op->dbid = dbid; + op->argv = argv; + op->argc = argc; + op->target = target; + oa->numops++; } +/* Append an operation to the given redisOpArray. + * dbid - the id of the database to apply the operation on + * argv - operations arguments (including the command) + * argc - size of argv + * target - indicating how to propagate the operation (PROPAGATE_AOF,PROPAGATE_REPL) + * + * Special case is when used with target 0, in this case the operation is a placeholder + * that is expected to be filled later on using redisOpArraySet. */ int redisOpArrayAppend(redisOpArray *oa, int dbid, robj **argv, int argc, int target) { redisOp *op; int prev_capacity = oa->capacity; - if (oa->numops == 0) { + if (oa->used == 0) { oa->capacity = 16; - } else if (oa->numops >= oa->capacity) { + } else if (oa->used >= oa->capacity) { oa->capacity *= 2; } if (prev_capacity != oa->capacity) oa->ops = zrealloc(oa->ops,sizeof(redisOp)*oa->capacity); - op = oa->ops+oa->numops; + int idx = oa->used; + op = oa->ops+idx; op->dbid = dbid; op->argv = argv; op->argc = argc; op->target = target; - oa->numops++; - return oa->numops; + oa->used++; + if (op->target) { + /* if `target` is 0, the operation was added as a placeholder and is not yet counted as + * a command that need to be propagated, this is why we only increase `numops` if `target` is not 0. */ + oa->numops++; + } + return idx; /* return the index of the appended operation */ +} + +void redisOpArrayTrim(redisOpArray *oa) { + if (!oa->used) { + /* nothing to trim */ + return; + } + redisOp *last_op = oa->ops + (oa->used - 1); + if (!last_op->target) { + /* last op is an unused placeholder, we can remove it */ + --oa->used; + } +} + +int redisOpArrayAppendPlaceholder(redisOpArray *oa) { + return redisOpArrayAppend(oa, 0, NULL, 0, 0); } void redisOpArrayFree(redisOpArray *oa) { - while(oa->numops) { + while(oa->used) { int j; redisOp *op; + oa->used--; + op = oa->ops+oa->used; + if (!op->target) { + continue; + } oa->numops--; - op = oa->ops+oa->numops; - for (j = 0; j < op->argc; j++) + for (j = 0; j < op->argc; j++) { decrRefCount(op->argv[j]); + } zfree(op->argv); } - zfree(oa->ops); - redisOpArrayInit(oa); + + /* no need to free the actual op array, we reuse the memory for future commands */ + serverAssert(!oa->numops); + serverAssert(!oa->used); } /* ====================== Commands lookup and execution ===================== */ @@ -3078,6 +3120,9 @@ static int shouldPropagate(int target) { * This is an internal low-level function and should not be called! * * The API for propagating commands is alsoPropagate(). + * + * dbid value of -1 is saved to indicate that the called do not want + * to replicate SELECT for this command (used for database neutral commands). */ static void propagateNow(int dbid, robj **argv, int argc, int target) { if (!shouldPropagate(target)) @@ -3093,6 +3138,29 @@ static void propagateNow(int dbid, robj **argv, int argc, int target) { replicationFeedSlaves(server.slaves,dbid,argv,argc); } +/* Append or set the given operation to the replication buffer. + * If index is -1, the operation will be appended to the end of the buffer. + * Otherwise the operation will be set at the given index. */ +void alsoPropagateRaw(int dbid, robj **argv, int argc, int target, int index) { + robj **argvcopy; + int j; + + if (!shouldPropagate(target)) + return; + + argvcopy = zmalloc(sizeof(robj*)*argc); + for (j = 0; j < argc; j++) { + argvcopy[j] = argv[j]; + incrRefCount(argv[j]); + } + if (index == -1) { + /* -1 means append to the end */ + redisOpArrayAppend(&server.also_propagate,dbid,argvcopy,argc,target); + } else { + redisOpArraySet(&server.also_propagate,dbid,argvcopy,argc,target,index); + } +} + /* Used inside commands to schedule the propagation of additional commands * after the current command is propagated to AOF / Replication. * @@ -3105,18 +3173,7 @@ static void propagateNow(int dbid, robj **argv, int argc, int target) { * stack allocated). The function automatically increments ref count of * passed objects, so the caller does not need to. */ void alsoPropagate(int dbid, robj **argv, int argc, int target) { - robj **argvcopy; - int j; - - if (!shouldPropagate(target)) - return; - - argvcopy = zmalloc(sizeof(robj*)*argc); - for (j = 0; j < argc; j++) { - argvcopy[j] = argv[j]; - incrRefCount(argv[j]); - } - redisOpArrayAppend(&server.also_propagate,dbid,argvcopy,argc,target); + alsoPropagateRaw(dbid, argv, argc, target, -1); } /* It is possible to call the function forceCommandPropagation() inside a @@ -3175,8 +3232,10 @@ void updateCommandLatencyHistogram(struct hdr_histogram **latency_histogram, int * multiple separated commands. Note that alsoPropagate() is not affected * by CLIENT_PREVENT_PROP flag. */ void propagatePendingCommands() { - if (server.also_propagate.numops == 0) + if (server.also_propagate.numops == 0) { + redisOpArrayFree(&server.also_propagate); return; + } int j; redisOp *rop; @@ -3188,24 +3247,25 @@ void propagatePendingCommands() { * * And if the array contains only one command, no need to * wrap it, since the single command is atomic. */ - if (server.also_propagate.numops > 1 && !server.propagate_no_multi) { - /* We use the first command-to-propagate to set the dbid for MULTI, - * so that the SELECT will be propagated beforehand */ - int multi_dbid = server.also_propagate.ops[0].dbid; - propagateNow(multi_dbid,&shared.multi,1,PROPAGATE_AOF|PROPAGATE_REPL); + if (server.also_propagate.numops > 1) { + /* We use dbid=-1 to indicate we do not want to replicate SELECT. + * It'll be inserted together with the next command (inside the MULTI) */ + propagateNow(-1,&shared.multi,1,PROPAGATE_AOF|PROPAGATE_REPL); multi_emitted = 1; } - for (j = 0; j < server.also_propagate.numops; j++) { + + for (j = 0; j < server.also_propagate.used; j++) { rop = &server.also_propagate.ops[j]; - serverAssert(rop->target); + if (!rop->target) { + continue; + } propagateNow(rop->dbid,rop->argv,rop->argc,rop->target); } if (multi_emitted) { - /* We take the dbid from last command so that propagateNow() won't inject another SELECT */ - int exec_dbid = server.also_propagate.ops[server.also_propagate.numops-1].dbid; - propagateNow(exec_dbid,&shared.exec,1,PROPAGATE_AOF|PROPAGATE_REPL); + /* We use dbid=-1 to indicate we do not want to replicate select */ + propagateNow(-1,&shared.exec,1,PROPAGATE_AOF|PROPAGATE_REPL); } redisOpArrayFree(&server.also_propagate); @@ -3314,6 +3374,24 @@ void call(client *c, int flags) { if (monotonicGetType() == MONOTONIC_CLOCK_HW) monotonic_start = getMonotonicUs(); + int cmd_prop_index = -1; + if (c->cmd->flags & (CMD_WRITE | CMD_MAY_REPLICATE)) { + /* Save placeholder for replication. + * If the command will be replicated, this is the location where it should be placed in the replication stream. + * If we just push the command to the replication buffer (without the placeholder), the replica might get the commands + * in a wrong order and we will end up with master-replica inconsistency. To demonstrate, take the following example: + * + * 1. A module register a key space notification callback and inside the notification the module performed an incr command on the given key + * 2. User performs 'set x 1' + * 3. The module get the notification and perform 'incr x' + * 4. The command 'incr x' enters the replication buffer before the 'set x 1' command and the replica sees the command in the wrong order + * + * The final result is that the replica will have the value x=1 while the master will have x=2 + * + * Notice that we only do this if the command might cause replication (either it's a WRITE command or MAY_REPLICATE) */ + cmd_prop_index = redisOpArrayAppendPlaceholder(&server.also_propagate); + } + server.in_nested_call++; c->cmd->proc(c); server.in_nested_call--; @@ -3435,9 +3513,18 @@ void call(client *c, int flags) { /* Call alsoPropagate() only if at least one of AOF / replication * propagation is needed. */ if (propagate_flags != PROPAGATE_NONE) - alsoPropagate(c->db->id,c->argv,c->argc,propagate_flags); + /* If we got here with cmd_prop_index == -1, the command will be added to the end + * of the replication buffer, this can only happened on a read that causes a key + * miss event which causes the module to perform a write command using RM_Call. + * In such case we will propagate a read command (or a write command that has no effect + * and should not have been propagated). This behavior is wrong and module writer is advised + * not to perform any write commands on key miss event. */ + alsoPropagateRaw(c->db->id,c->argv,c->argc,propagate_flags,cmd_prop_index); } + /* Try to trim the last element if it is not used (if it's still a placeholder). */ + redisOpArrayTrim(&server.also_propagate); + /* Restore the old replication flags, since call() can be executed * recursively. */ c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP); diff --git a/src/server.h b/src/server.h index acb5bfd44..ebf7037e3 100644 --- a/src/server.h +++ b/src/server.h @@ -1281,6 +1281,7 @@ extern clientBufferLimitsConfig clientBufferLimitsDefaults[CLIENT_TYPE_OBUF_COUN * after the propagation of the executed command. */ typedef struct redisOp { robj **argv; + /* target=0 means the operation should not be propagate (unused placeholder), for more info look at redisOpArray */ int argc, dbid, target; } redisOp; @@ -1292,9 +1293,14 @@ typedef struct redisOp { * redisOpArrayFree(); */ typedef struct redisOpArray { - redisOp *ops; - int numops; - int capacity; + redisOp *ops; /* The array of operation to replicated */ + int numops; /* The actual number of operations in the array */ + int used; /* Spots that is used in the ops array, we need to + differenced between the actual number of operations + and the used spots because there might be spots + that was saved as a placeholder for future command + but was never actually used */ + int capacity; /* The ops array capacity */ } redisOpArray; /* This structure is returned by the getMemoryOverheadData() function in @@ -1485,7 +1491,6 @@ struct redisServer { int busy_module_yield_flags; /* Are we inside a busy module? (triggered by RM_Yield). see BUSY_MODULE_YIELD_ flags. */ const char *busy_module_yield_reply; /* When non-null, we are inside RM_Yield. */ int core_propagates; /* Is the core (in oppose to the module subsystem) is in charge of calling propagatePendingCommands? */ - int propagate_no_multi; /* True if propagatePendingCommands should avoid wrapping command in MULTI/EXEC */ int module_ctx_nesting; /* moduleCreateContext() nesting level */ char *ignore_warnings; /* Config: warnings that should be ignored. */ int client_pause_in_transaction; /* Was a client pause executed during this Exec? */ @@ -2899,7 +2904,7 @@ int incrCommandStatsOnError(struct redisCommand *cmd, int flags); void call(client *c, int flags); void alsoPropagate(int dbid, robj **argv, int argc, int target); void propagatePendingCommands(); -void redisOpArrayInit(redisOpArray *oa); +void redisOpArrayReset(redisOpArray *oa); void redisOpArrayFree(redisOpArray *oa); void forceCommandPropagation(client *c, int flags); void preventCommandPropagation(client *c); diff --git a/src/t_set.c b/src/t_set.c index 3fb9f5259..d07f2ced9 100644 --- a/src/t_set.c +++ b/src/t_set.c @@ -490,11 +490,20 @@ void spopWithCountCommand(client *c) { /* Delete the set as it is now empty */ dbDelete(c->db,c->argv[1]); + + /* todo: Move the spop notification to be executed after the command logic. + * We can then decide if we want to keep the `alsoPropagate` or move to `rewriteClientCommandVector`. */ + + /* Propagate del command */ + robj *propagate[2]; + propagate[0] = shared.del; + propagate[1] = c->argv[1]; + alsoPropagate(c->db->id,propagate,2,PROPAGATE_AOF|PROPAGATE_REPL); + notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id); - /* Propagate this command as a DEL operation */ - rewriteClientCommandVector(c,2,shared.del,c->argv[1]); signalModifiedKey(c,c->db,c->argv[1]); + preventCommandPropagation(c); return; } diff --git a/tests/modules/keyspace_events.c b/tests/modules/keyspace_events.c index d394786b3..44de09979 100644 --- a/tests/modules/keyspace_events.c +++ b/tests/modules/keyspace_events.c @@ -101,6 +101,26 @@ static int KeySpace_NotificationExpired(RedisModuleCtx *ctx, int type, const cha return REDISMODULE_OK; } +/* This key miss notification handler is performing a write command inside the notification callback. + * Notice, it is discourage and currently wrong to perform a write command inside key miss event. + * It can cause read commands to be replicated to the replica/aof. This test is here temporary (for coverage and + * verification that it's not crashing). */ +static int KeySpace_NotificationModuleKeyMiss(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) { + REDISMODULE_NOT_USED(type); + REDISMODULE_NOT_USED(event); + REDISMODULE_NOT_USED(key); + + int flags = RedisModule_GetContextFlags(ctx); + if (!(flags & REDISMODULE_CTX_FLAGS_MASTER)) { + return REDISMODULE_OK; // ignore the event on replica + } + + RedisModuleCallReply* rep = RedisModule_Call(ctx, "incr", "!c", "missed"); + RedisModule_FreeCallReply(rep); + + return REDISMODULE_OK; +} + static int KeySpace_NotificationModule(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) { REDISMODULE_NOT_USED(ctx); REDISMODULE_NOT_USED(type); @@ -266,6 +286,10 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) return REDISMODULE_ERR; } + if(RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_KEY_MISS, KeySpace_NotificationModuleKeyMiss) != REDISMODULE_OK){ + return REDISMODULE_ERR; + } + if (RedisModule_CreateCommand(ctx,"keyspace.notify", cmdNotify,"",0,0,0) == REDISMODULE_ERR){ return REDISMODULE_ERR; } diff --git a/tests/unit/expire.tcl b/tests/unit/expire.tcl index d85d59e7a..8e4954a34 100644 --- a/tests/unit/expire.tcl +++ b/tests/unit/expire.tcl @@ -745,4 +745,27 @@ start_server {tags {"expire"}} { assert_equal [r EXPIRE none 100 GT] 0 assert_equal [r EXPIRE none 100 LT] 0 } {} + + test {Redis should not propagate the read command on lazy expire} { + r debug set-active-expire 0 + r flushall ; # Clean up keyspace to avoid interference by keys from other tests + r set foo bar PX 1 + set repl [attach_to_replication_stream] + wait_for_condition 50 100 { + [r get foo] eq {} + } else { + fail "Replication not started." + } + + # dummy command to verify nothing else gets into the replication stream. + r set x 1 + + assert_replication_stream $repl { + {select *} + {del foo} + {set x 1} + } + close_replication_stream $repl + assert_equal [r debug set-active-expire 1] {OK} + } {} {needs:debug} } diff --git a/tests/unit/maxmemory.tcl b/tests/unit/maxmemory.tcl index 8b0e5045a..631b3ac91 100644 --- a/tests/unit/maxmemory.tcl +++ b/tests/unit/maxmemory.tcl @@ -558,8 +558,8 @@ start_server {tags {"maxmemory" "external:skip"}} { } assert_replication_stream $repl { - {select *} {multi} + {select *} {incr x} {incr x} {exec} diff --git a/tests/unit/moduleapi/propagate.tcl b/tests/unit/moduleapi/propagate.tcl index 846d938fb..39189b715 100644 --- a/tests/unit/moduleapi/propagate.tcl +++ b/tests/unit/moduleapi/propagate.tcl @@ -45,10 +45,10 @@ tags "modules" { $master set x y assert_replication_stream $repl { - {select *} {multi} - {incr notifications} + {select *} {set x y} + {incr notifications} {exec} } close_replication_stream $repl @@ -63,12 +63,12 @@ tags "modules" { $master exec assert_replication_stream $repl { - {select *} {multi} - {incr notifications} + {select *} {set x1 y1} {incr notifications} {set x2 y2} + {incr notifications} {exec} } close_replication_stream $repl @@ -91,34 +91,40 @@ tags "modules" { # Note whenever there's double notification: SET with PX issues two separate # notifications: one for "set" and one for "expire" assert_replication_stream $repl { - {select *} {multi} + {select *} + {set asdf1 1 PXAT *} {incr notifications} {incr notifications} - {set asdf1 1 PXAT *} {exec} {multi} + {set asdf2 2 PXAT *} {incr notifications} {incr notifications} - {set asdf2 2 PXAT *} {exec} {multi} + {set asdf3 3 PXAT *} {incr notifications} {incr notifications} - {set asdf3 3 PXAT *} {exec} - {incr notifications} + {multi} + {del asdf*} {incr notifications} {incr testkeyspace:expired} - {del asdf*} {incr notifications} + {exec} + {multi} + {del asdf*} {incr notifications} {incr testkeyspace:expired} - {del asdf*} {incr notifications} + {exec} + {multi} + {del asdf*} {incr notifications} {incr testkeyspace:expired} - {del asdf*} + {incr notifications} + {exec} } close_replication_stream $repl @@ -143,8 +149,11 @@ tags "modules" { # Bottom line: "notifications" always exists and we can't really determine the order of evictions # This test is here only for sanity + # The replica will get the notification with multi exec and we have a generic notification handler + # that performs `RedisModule_Call(ctx, "INCR", "c", "multi");` if the notification is inside multi exec. + # so we will have 2 keys, "notifications" and "multi". wait_for_condition 500 10 { - [$replica dbsize] eq 1 + [$replica dbsize] eq 2 } else { fail "Not all keys have been evicted" } @@ -183,31 +192,37 @@ tags "modules" { # Note that although CONFIG SET maxmemory is called in this flow (see issue #10014), # eviction will happen and will not induce propagation of the CONFIG command (see #10019). assert_replication_stream $repl { - {select *} {multi} + {select *} + {set asdf1 1 PXAT *} {incr notifications} {incr notifications} - {set asdf1 1 PXAT *} {exec} {multi} + {set asdf2 2 PXAT *} {incr notifications} {incr notifications} - {set asdf2 2 PXAT *} {exec} {multi} + {set asdf3 3 PXAT *} {incr notifications} {incr notifications} - {set asdf3 3 PXAT *} {exec} - {incr notifications} + {multi} {del asdf*} {incr notifications} + {exec} + {multi} {del asdf*} {incr notifications} - {del asdf*} + {exec} {multi} + {del asdf*} {incr notifications} + {exec} + {multi} {set asdf4 4} + {incr notifications} {exec} } close_replication_stream $repl @@ -229,20 +244,24 @@ tags "modules" { } assert_replication_stream $repl { - {select *} {multi} + {select *} + {set timer-maxmemory-volatile-start 1 PXAT *} {incr notifications} {incr notifications} - {set timer-maxmemory-volatile-start 1 PXAT *} {incr timer-maxmemory-middle} + {set timer-maxmemory-volatile-end 1 PXAT *} {incr notifications} {incr notifications} - {set timer-maxmemory-volatile-end 1 PXAT *} {exec} - {incr notifications} + {multi} {del timer-maxmemory-volatile-*} {incr notifications} + {exec} + {multi} {del timer-maxmemory-volatile-*} + {incr notifications} + {exec} } close_replication_stream $repl @@ -256,15 +275,15 @@ tags "modules" { $master propagate-test.timer-eval assert_replication_stream $repl { - {select *} {multi} - {incr notifications} + {select *} {incrby timer-eval-start 1} {incr notifications} {set foo bar} - {incr timer-eval-middle} {incr notifications} + {incr timer-eval-middle} {incrby timer-eval-end 1} + {incr notifications} {exec} } close_replication_stream $repl @@ -282,8 +301,8 @@ tags "modules" { } assert_replication_stream $repl { - {select *} {multi} + {select *} {incrby timer-nested-start 1} {incrby timer-nested-end 1} {exec} @@ -307,16 +326,15 @@ tags "modules" { } assert_replication_stream $repl { - {select *} {multi} + {select *} {incrby timer-nested-start 1} - {incr notifications} {incr using-call} + {incr notifications} {incr counter-1} {incr counter-2} {incr counter-3} {incr counter-4} - {incr notifications} {incr after-call} {incr notifications} {incr before-call-2} @@ -328,6 +346,7 @@ tags "modules" { {incr after-call-2} {incr notifications} {incr timer-nested-middle} + {incr notifications} {incrby timer-nested-end 1} {exec} } @@ -350,23 +369,23 @@ tags "modules" { } assert_replication_stream $repl { - {select *} {multi} + {select *} {incr a-from-thread} - {incr notifications} {incr thread-call} + {incr notifications} {incr b-from-thread} {exec} {multi} {incr a-from-thread} - {incr notifications} {incr thread-call} + {incr notifications} {incr b-from-thread} {exec} {multi} {incr a-from-thread} - {incr notifications} {incr thread-call} + {incr notifications} {incr b-from-thread} {exec} } @@ -385,13 +404,13 @@ tags "modules" { } assert_replication_stream $repl { - {select *} {multi} + {select *} {incr thread-detached-before} - {incr notifications} {incr thread-detached-1} {incr notifications} {incr thread-detached-2} + {incr notifications} {incr thread-detached-after} {exec} } @@ -405,18 +424,18 @@ tags "modules" { $master propagate-test.mixed assert_replication_stream $repl { - {select *} {multi} + {select *} {incr counter-1} {incr counter-2} {exec} {multi} - {incr notifications} {incr using-call} + {incr notifications} {incr counter-1} {incr counter-2} - {incr notifications} {incr after-call} + {incr notifications} {exec} } close_replication_stream $repl @@ -431,18 +450,18 @@ tags "modules" { redis.call("propagate-test.mixed"); return "OK" } 0 ] {OK} assert_replication_stream $repl { - {select *} {multi} + {select *} {incr counter-1} {incr counter-2} - {incr notifications} {set x y} {incr notifications} {incr using-call} + {incr notifications} {incr counter-1} {incr counter-2} - {incr notifications} {incr after-call} + {incr notifications} {exec} } close_replication_stream $repl @@ -456,18 +475,18 @@ tags "modules" { $master propagate-test.mixed assert_replication_stream $repl { - {select *} {multi} + {select *} {incr counter-1} {incr counter-2} {exec} {multi} - {incr notifications} {incr using-call} + {incr notifications} {incr counter-1} {incr counter-2} - {incr notifications} {incr after-call} + {incr notifications} {exec} } close_replication_stream $repl @@ -482,18 +501,18 @@ tags "modules" { $master propagate-test.mixed assert_replication_stream $repl { - {select *} {multi} + {select *} {incr counter-1} {incr counter-2} {exec} {multi} - {incr notifications} {incr using-call} + {incr notifications} {incr counter-1} {incr counter-2} - {incr notifications} {incr after-call} + {incr notifications} {exec} } close_replication_stream $repl @@ -515,26 +534,25 @@ tags "modules" { } assert_replication_stream $repl { - {select *} {multi} + {select *} {incr counter-1} {incr counter-2} - {incr notifications} {incr using-call} + {incr notifications} {incr counter-1} {incr counter-2} - {incr notifications} {incr after-call} + {incr notifications} {exec} {multi} {incrby timer-nested-start 1} - {incr notifications} {incr using-call} + {incr notifications} {incr counter-1} {incr counter-2} {incr counter-3} {incr counter-4} - {incr notifications} {incr after-call} {incr notifications} {incr before-call-2} @@ -546,6 +564,7 @@ tags "modules" { {incr after-call-2} {incr notifications} {incr timer-nested-middle} + {incr notifications} {incrby timer-nested-end 1} {exec} } @@ -566,8 +585,8 @@ tags "modules" { $master propagate-test.incr k1 assert_replication_stream $repl { - {select *} {multi} + {select *} {del k1} {propagate-test.incr k1} {exec} @@ -580,6 +599,71 @@ tags "modules" { assert_equal [$replica ttl k1] -1 } + test {module notification on set} { + set repl [attach_to_replication_stream] + + $master SADD s foo + + wait_for_condition 500 10 { + [$replica SCARD s] eq "1" + } else { + fail "Failed to wait for set to be replicated" + } + + $master SPOP s 1 + + wait_for_condition 500 10 { + [$replica SCARD s] eq "0" + } else { + fail "Failed to wait for set to be replicated" + } + + # Currently the `del` command comes after the notification. + # When we fix spop to fire notification at the end (like all other commands), + # the `del` will come first. + assert_replication_stream $repl { + {multi} + {select *} + {sadd s foo} + {incr notifications} + {exec} + {multi} + {incr notifications} + {del s} + {incr notifications} + {exec} + } + close_replication_stream $repl + } + + test {module key miss notification do not cause read command to be replicated} { + set repl [attach_to_replication_stream] + + $master flushall + + $master get unexisting_key + + wait_for_condition 500 10 { + [$replica get missed] eq "1" + } else { + fail "Failed to wait for set to be replicated" + } + + # Test is checking a wrong!!! behavior that causes a read command to be replicated to replica/aof. + # We keep the test to verify that such a wrong behavior does not cause any crashes. + assert_replication_stream $repl { + {select *} + {flushall} + {multi} + {incr missed} + {incr notifications} + {get unexisting_key} + {exec} + } + + close_replication_stream $repl + } + test "Unload the module - propagate-test/testkeyspace" { assert_equal {OK} [r module unload propagate-test] assert_equal {OK} [r module unload testkeyspace] diff --git a/tests/unit/multi.tcl b/tests/unit/multi.tcl index 63d85d26b..26ab8e571 100644 --- a/tests/unit/multi.tcl +++ b/tests/unit/multi.tcl @@ -421,8 +421,8 @@ start_server {tags {"multi"}} { r exec assert_replication_stream $repl { - {select *} {multi} + {select *} {set foo{t} bar} {set foo2{t} bar2} {set foo3{t} bar3} @@ -446,8 +446,8 @@ start_server {tags {"multi"}} { r exec assert_replication_stream $repl { - {select *} {multi} + {select *} {set foo{t} bar} {select *} {set foo2{t} bar2} @@ -904,8 +904,8 @@ start_server {overrides {appendonly {yes} appendfilename {appendonly.aof} append r flushall r exec assert_aof_content $aof { - {select *} {multi} + {select *} {set *} {flushall} {exec} |