summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMeir Shpilraien (Spielrein) <meir@redis.com>2022-08-18 10:16:32 +0300
committerGitHub <noreply@github.com>2022-08-18 10:16:32 +0300
commit508a138885b33666923ab92720c8c3263dc5bd56 (patch)
treeb0b34980b578984589a0c64bb57898d5685b87be /src
parent6a9cc20d9429e0ce1d6bfca5a40b681b1665cdec (diff)
downloadredis-508a138885b33666923ab92720c8c3263dc5bd56.tar.gz
Fix replication inconsistency on modules that uses key space notifications (#10969)
Fix replication inconsistency on modules that uses key space notifications. ### The Problem In general, key space notifications are invoked after the command logic was executed (this is not always the case, we will discuss later about specific command that do not follow this rules). For example, the `set x 1` will trigger a `set` notification that will be invoked after the `set` logic was performed, so if the notification logic will try to fetch `x`, it will see the new data that was written. Consider the scenario on which the notification logic performs some write commands. for example, the notification logic increase some counter, `incr x{counter}`, indicating how many times `x` was changed. The logical order by which the logic was executed is has follow: ``` set x 1 incr x{counter} ``` The issue is that the `set x 1` command is added to the replication buffer at the end of the command invocation (specifically after the key space notification logic was invoked and performed the `incr` command). The replication/aof sees the commands in the wrong order: ``` incr x{counter} set x 1 ``` In this specific example the order is less important. But if, for example, the notification would have deleted `x` then we would end up with primary-replica inconsistency. ### The Solution Put the command that cause the notification in its rightful place. In the above example, the `set x 1` command logic was executed before the notification logic, so it should be added to the replication buffer before the commands that is invoked by the notification logic. To achieve this, without a major code refactoring, we save a placeholder in the replication buffer, when finishing invoking the command logic we check if the command need to be replicated, and if it does, we use the placeholder to add it to the replication buffer instead of appending it to the end. To be efficient and not allocating memory on each command to save the placeholder, the replication buffer array was modified to reuse memory (instead of allocating it each time we want to replicate commands). Also, to avoid saving a placeholder when not needed, we do it only for WRITE or MAY_REPLICATE commands. #### Additional Fixes * Expire and Eviction notifications: * Expire/Eviction logical order was to first perform the Expire/Eviction and then the notification logic. The replication buffer got this in the other way around (first notification effect and then the `del` command). The PR fixes this issue. * The notification effect and the `del` command was not wrap with `multi-exec` (if needed). The PR also fix this issue. * SPOP command: * On spop, the `spop` notification was fired before the command logic was executed. The change in this PR would have cause the replication order to be change (first `spop` command and then notification `logic`) although the logical order is first the notification logic and then the `spop` logic. The right fix would have been to move the notification to be fired after the command was executed (like all the other commands), but this can be considered a breaking change. To overcome this, the PR keeps the current behavior and changes the `spop` code to keep the right logical order when pushing commands to the replication buffer. Another PR will follow to fix the SPOP properly and match it to the other command (we split it to 2 separate PR's so it will be easy to cherry-pick this PR to 7.0 if we chose to). #### Unhanded Known Limitations * key miss event: * On key miss event, if a module performed some write command on the event (using `RM_Call`), the `dirty` counter would increase and the read command that cause the key miss event would be replicated to the replication and aof. This problem can also happened on a write command that open some keys but eventually decides not to perform any action. We decided not to handle this problem on this PR because the solution is complex and will cause additional risks in case we will want to cherry-pick this PR. We should decide if we want to handle it in future PR's. For now, modules writers is advice not to perform any write commands on key miss event. #### Testing * We already have tests to cover cases where a notification is invoking write commands that are also added to the replication buffer, the tests was modified to verify that the replica gets the command in the correct logical order. * Test was added to verify that `spop` behavior was kept unchanged. * Test was added to verify key miss event behave as expected. * Test was added to verify the changes do not break lazy expiration. #### Additional Changes * `propagateNow` function can accept a special dbid, -1, indicating not to replicate `select`. We use this to replicate `multi/exec` on `propagatePendingCommands` function. The side effect of this change is that now the `select` command will appear inside the `multi/exec` block on the replication stream (instead of outside of the `multi/exec` block). Tests was modified to match this new behavior.
Diffstat (limited to 'src')
-rw-r--r--src/aof.c12
-rw-r--r--src/db.c2
-rw-r--r--src/debug.c2
-rw-r--r--src/evict.c17
-rw-r--r--src/expire.c13
-rw-r--r--src/module.c6
-rw-r--r--src/replication.c6
-rw-r--r--src/server.c167
-rw-r--r--src/server.h15
-rw-r--r--src/t_set.c13
10 files changed, 186 insertions, 67 deletions
diff --git a/src/aof.c b/src/aof.c
index 1f0b824f1..52026ab24 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -1295,10 +1295,18 @@ sds genAofTimestampAnnotationIfNeeded(int force) {
return ts;
}
+/* Write the given command to the aof file.
+ * dictid - dictionary id the command should be applied to,
+ * this is used in order to decide if a `select` command
+ * should also be written to the aof. Value of -1 means
+ * to avoid writing `select` command in any case.
+ * argv - The command to write to the aof.
+ * argc - Number of values in argv
+ */
void feedAppendOnlyFile(int dictid, robj **argv, int argc) {
sds buf = sdsempty();
- serverAssert(dictid >= 0 && dictid < server.dbnum);
+ serverAssert(dictid == -1 || (dictid >= 0 && dictid < server.dbnum));
/* Feed timestamp if needed */
if (server.aof_timestamp_enabled) {
@@ -1311,7 +1319,7 @@ void feedAppendOnlyFile(int dictid, robj **argv, int argc) {
/* The DB this command was targeting is not the same as the last command
* we appended. To issue a SELECT command is needed. */
- if (dictid != server.aof_selected_db) {
+ if (dictid != -1 && dictid != server.aof_selected_db) {
char seldb[64];
snprintf(seldb,sizeof(seldb),"%d",dictid);
diff --git a/src/db.c b/src/db.c
index 36de2aa0a..04ab455fd 100644
--- a/src/db.c
+++ b/src/db.c
@@ -1557,9 +1557,9 @@ void deleteExpiredKeyAndPropagate(redisDb *db, robj *keyobj) {
dbSyncDelete(db,keyobj);
latencyEndMonitor(expire_latency);
latencyAddSampleIfNeeded("expire-del",expire_latency);
- notifyKeyspaceEvent(NOTIFY_EXPIRED,"expired",keyobj,db->id);
signalModifiedKey(NULL, db, keyobj);
propagateDeletion(db,keyobj,server.lazyfree_lazy_expire);
+ notifyKeyspaceEvent(NOTIFY_EXPIRED,"expired",keyobj,db->id);
server.stat_expiredkeys++;
}
diff --git a/src/debug.c b/src/debug.c
index dac7716ac..43375c101 100644
--- a/src/debug.c
+++ b/src/debug.c
@@ -852,7 +852,7 @@ NULL
server.aof_flush_sleep = atoi(c->argv[2]->ptr);
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"replicate") && c->argc >= 3) {
- replicationFeedSlaves(server.slaves, server.slaveseldb,
+ replicationFeedSlaves(server.slaves, -1,
c->argv + 2, c->argc - 2);
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"error") && c->argc == 3) {
diff --git a/src/evict.c b/src/evict.c
index 6ee85ede5..34f40b008 100644
--- a/src/evict.c
+++ b/src/evict.c
@@ -574,7 +574,12 @@ int performEvictions(void) {
int prev_core_propagates = server.core_propagates;
serverAssert(server.also_propagate.numops == 0);
server.core_propagates = 1;
- server.propagate_no_multi = 1;
+
+ /* Increase nested call counter
+ * we add this in order to prevent any RM_Call that may exist
+ * in the notify CB to be propagated immediately.
+ * we want them in multi/exec with the DEL command */
+ server.in_nested_call++;
while (mem_freed < (long long)mem_tofree) {
int j, k, i;
@@ -685,9 +690,10 @@ int performEvictions(void) {
mem_freed += delta;
server.stat_evictedkeys++;
signalModifiedKey(NULL,db,keyobj);
- notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",
- keyobj, db->id);
propagateDeletion(db,keyobj,server.lazyfree_lazy_eviction);
+ notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",keyobj,db->id);
+ /* Propagate notification commandds and the del. */
+ propagatePendingCommands();
decrRefCount(keyobj);
keys_freed++;
@@ -742,11 +748,8 @@ cant_free:
serverAssert(server.core_propagates); /* This function should not be re-entrant */
- /* Propagate all DELs */
- propagatePendingCommands();
-
server.core_propagates = prev_core_propagates;
- server.propagate_no_multi = 0;
+ server.in_nested_call--;
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("eviction-cycle",latency);
diff --git a/src/expire.c b/src/expire.c
index a6a40450e..0e737fce3 100644
--- a/src/expire.c
+++ b/src/expire.c
@@ -186,7 +186,7 @@ void activeExpireCycle(int type) {
* we're in cron */
serverAssert(server.also_propagate.numops == 0);
server.core_propagates = 1;
- server.propagate_no_multi = 1;
+ server.in_nested_call++;
for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) {
/* Expired and checked in a single loop. */
@@ -264,7 +264,11 @@ void activeExpireCycle(int type) {
de = de->next;
ttl = dictGetSignedIntegerVal(e)-now;
- if (activeExpireCycleTryExpire(db,e,now)) expired++;
+ if (activeExpireCycleTryExpire(db,e,now)) {
+ expired++;
+ /* Propagate the DEL command */
+ propagatePendingCommands();
+ }
if (ttl > 0) {
/* We want the average TTL of keys yet
* not expired. */
@@ -310,11 +314,8 @@ void activeExpireCycle(int type) {
serverAssert(server.core_propagates); /* This function should not be re-entrant */
- /* Propagate all DELs */
- propagatePendingCommands();
-
server.core_propagates = 0;
- server.propagate_no_multi = 0;
+ server.in_nested_call--;
elapsed = ustime()-start;
server.stat_expire_cycle_time_used += elapsed;
diff --git a/src/module.c b/src/module.c
index d8897f36c..5db88703f 100644
--- a/src/module.c
+++ b/src/module.c
@@ -7795,6 +7795,12 @@ void moduleReleaseGIL(void) {
* - REDISMODULE_NOTIFY_STREAM: Stream events
* - REDISMODULE_NOTIFY_MODULE: Module types events
* - REDISMODULE_NOTIFY_KEYMISS: Key-miss events
+ * Notice, key-miss event is the only type
+ * of event that is fired from within a read command.
+ * Performing RM_Call with a write command from within
+ * this notification is wrong and discourage. It will
+ * cause the read command that trigger the event to be
+ * replicated to the AOF/Replica.
* - REDISMODULE_NOTIFY_ALL: All events (Excluding REDISMODULE_NOTIFY_KEYMISS)
* - REDISMODULE_NOTIFY_LOADED: A special notification available only for modules,
* indicates that the key was loaded from persistence.
diff --git a/src/replication.c b/src/replication.c
index faf159d7d..58ed10833 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -420,7 +420,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
char llstr[LONG_STR_SIZE];
/* In case we propagate a command that doesn't touch keys (PING, REPLCONF) we
- * pass dbid=server.slaveseldb which may be -1. */
+ * pass dbid=-1 that indicate there is no need to replicate `select` command. */
serverAssert(dictid == -1 || (dictid >= 0 && dictid < server.dbnum));
/* If the instance is not a top level master, return ASAP: we'll just proxy
@@ -442,7 +442,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
prepareReplicasToWrite();
/* Send SELECT command to every slave if needed. */
- if (server.slaveseldb != dictid) {
+ if (dictid != -1 && server.slaveseldb != dictid) {
robj *selectcmd;
/* For a few DBs we have pre-computed SELECT command. */
@@ -3615,7 +3615,7 @@ void replicationCron(void) {
if (!manual_failover_in_progress) {
ping_argv[0] = shared.ping;
- replicationFeedSlaves(server.slaves, server.slaveseldb,
+ replicationFeedSlaves(server.slaves, -1,
ping_argv, 1);
}
}
diff --git a/src/server.c b/src/server.c
index 093fe876c..4b8cfbad7 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1501,7 +1501,7 @@ static void sendGetackToReplicas(void) {
argv[0] = shared.replconf;
argv[1] = shared.getack;
argv[2] = shared.special_asterick; /* Not used argument. */
- replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
+ replicationFeedSlaves(server.slaves, -1, argv, 3);
}
extern int ProcessingEventsWhileBlocked;
@@ -2520,7 +2520,6 @@ void initServer(void) {
server.busy_module_yield_flags = BUSY_MODULE_YIELD_NONE;
server.busy_module_yield_reply = NULL;
server.core_propagates = 0;
- server.propagate_no_multi = 0;
server.module_ctx_nesting = 0;
server.client_pause_in_transaction = 0;
server.child_pid = -1;
@@ -2910,46 +2909,89 @@ void resetErrorTableStats(void) {
/* ========================== Redis OP Array API ============================ */
-void redisOpArrayInit(redisOpArray *oa) {
- oa->ops = NULL;
- oa->numops = 0;
- oa->capacity = 0;
+/* Used to update a placeholder previously created using `redisOpArrayAppendPlaceholder`.
+ * We assume the updated placeholder has no target set yet (otherwise its not a placeholder). */
+void redisOpArraySet(redisOpArray *oa, int dbid, robj **argv, int argc, int target, int index) {
+ redisOp *op = oa->ops+index;
+ serverAssert(!op->target);
+ op->dbid = dbid;
+ op->argv = argv;
+ op->argc = argc;
+ op->target = target;
+ oa->numops++;
}
+/* Append an operation to the given redisOpArray.
+ * dbid - the id of the database to apply the operation on
+ * argv - operations arguments (including the command)
+ * argc - size of argv
+ * target - indicating how to propagate the operation (PROPAGATE_AOF,PROPAGATE_REPL)
+ *
+ * Special case is when used with target 0, in this case the operation is a placeholder
+ * that is expected to be filled later on using redisOpArraySet. */
int redisOpArrayAppend(redisOpArray *oa, int dbid, robj **argv, int argc, int target) {
redisOp *op;
int prev_capacity = oa->capacity;
- if (oa->numops == 0) {
+ if (oa->used == 0) {
oa->capacity = 16;
- } else if (oa->numops >= oa->capacity) {
+ } else if (oa->used >= oa->capacity) {
oa->capacity *= 2;
}
if (prev_capacity != oa->capacity)
oa->ops = zrealloc(oa->ops,sizeof(redisOp)*oa->capacity);
- op = oa->ops+oa->numops;
+ int idx = oa->used;
+ op = oa->ops+idx;
op->dbid = dbid;
op->argv = argv;
op->argc = argc;
op->target = target;
- oa->numops++;
- return oa->numops;
+ oa->used++;
+ if (op->target) {
+ /* if `target` is 0, the operation was added as a placeholder and is not yet counted as
+ * a command that need to be propagated, this is why we only increase `numops` if `target` is not 0. */
+ oa->numops++;
+ }
+ return idx; /* return the index of the appended operation */
+}
+
+void redisOpArrayTrim(redisOpArray *oa) {
+ if (!oa->used) {
+ /* nothing to trim */
+ return;
+ }
+ redisOp *last_op = oa->ops + (oa->used - 1);
+ if (!last_op->target) {
+ /* last op is an unused placeholder, we can remove it */
+ --oa->used;
+ }
+}
+
+int redisOpArrayAppendPlaceholder(redisOpArray *oa) {
+ return redisOpArrayAppend(oa, 0, NULL, 0, 0);
}
void redisOpArrayFree(redisOpArray *oa) {
- while(oa->numops) {
+ while(oa->used) {
int j;
redisOp *op;
+ oa->used--;
+ op = oa->ops+oa->used;
+ if (!op->target) {
+ continue;
+ }
oa->numops--;
- op = oa->ops+oa->numops;
- for (j = 0; j < op->argc; j++)
+ for (j = 0; j < op->argc; j++) {
decrRefCount(op->argv[j]);
+ }
zfree(op->argv);
}
- zfree(oa->ops);
- redisOpArrayInit(oa);
+
+ /* no need to free the actual op array, we reuse the memory for future commands */
+ serverAssert(!oa->numops);
+ serverAssert(!oa->used);
}
/* ====================== Commands lookup and execution ===================== */
@@ -3078,6 +3120,9 @@ static int shouldPropagate(int target) {
* This is an internal low-level function and should not be called!
*
* The API for propagating commands is alsoPropagate().
+ *
+ * dbid value of -1 is saved to indicate that the called do not want
+ * to replicate SELECT for this command (used for database neutral commands).
*/
static void propagateNow(int dbid, robj **argv, int argc, int target) {
if (!shouldPropagate(target))
@@ -3093,6 +3138,29 @@ static void propagateNow(int dbid, robj **argv, int argc, int target) {
replicationFeedSlaves(server.slaves,dbid,argv,argc);
}
+/* Append or set the given operation to the replication buffer.
+ * If index is -1, the operation will be appended to the end of the buffer.
+ * Otherwise the operation will be set at the given index. */
+void alsoPropagateRaw(int dbid, robj **argv, int argc, int target, int index) {
+ robj **argvcopy;
+ int j;
+
+ if (!shouldPropagate(target))
+ return;
+
+ argvcopy = zmalloc(sizeof(robj*)*argc);
+ for (j = 0; j < argc; j++) {
+ argvcopy[j] = argv[j];
+ incrRefCount(argv[j]);
+ }
+ if (index == -1) {
+ /* -1 means append to the end */
+ redisOpArrayAppend(&server.also_propagate,dbid,argvcopy,argc,target);
+ } else {
+ redisOpArraySet(&server.also_propagate,dbid,argvcopy,argc,target,index);
+ }
+}
+
/* Used inside commands to schedule the propagation of additional commands
* after the current command is propagated to AOF / Replication.
*
@@ -3105,18 +3173,7 @@ static void propagateNow(int dbid, robj **argv, int argc, int target) {
* stack allocated). The function automatically increments ref count of
* passed objects, so the caller does not need to. */
void alsoPropagate(int dbid, robj **argv, int argc, int target) {
- robj **argvcopy;
- int j;
-
- if (!shouldPropagate(target))
- return;
-
- argvcopy = zmalloc(sizeof(robj*)*argc);
- for (j = 0; j < argc; j++) {
- argvcopy[j] = argv[j];
- incrRefCount(argv[j]);
- }
- redisOpArrayAppend(&server.also_propagate,dbid,argvcopy,argc,target);
+ alsoPropagateRaw(dbid, argv, argc, target, -1);
}
/* It is possible to call the function forceCommandPropagation() inside a
@@ -3175,8 +3232,10 @@ void updateCommandLatencyHistogram(struct hdr_histogram **latency_histogram, int
* multiple separated commands. Note that alsoPropagate() is not affected
* by CLIENT_PREVENT_PROP flag. */
void propagatePendingCommands() {
- if (server.also_propagate.numops == 0)
+ if (server.also_propagate.numops == 0) {
+ redisOpArrayFree(&server.also_propagate);
return;
+ }
int j;
redisOp *rop;
@@ -3188,24 +3247,25 @@ void propagatePendingCommands() {
*
* And if the array contains only one command, no need to
* wrap it, since the single command is atomic. */
- if (server.also_propagate.numops > 1 && !server.propagate_no_multi) {
- /* We use the first command-to-propagate to set the dbid for MULTI,
- * so that the SELECT will be propagated beforehand */
- int multi_dbid = server.also_propagate.ops[0].dbid;
- propagateNow(multi_dbid,&shared.multi,1,PROPAGATE_AOF|PROPAGATE_REPL);
+ if (server.also_propagate.numops > 1) {
+ /* We use dbid=-1 to indicate we do not want to replicate SELECT.
+ * It'll be inserted together with the next command (inside the MULTI) */
+ propagateNow(-1,&shared.multi,1,PROPAGATE_AOF|PROPAGATE_REPL);
multi_emitted = 1;
}
- for (j = 0; j < server.also_propagate.numops; j++) {
+
+ for (j = 0; j < server.also_propagate.used; j++) {
rop = &server.also_propagate.ops[j];
- serverAssert(rop->target);
+ if (!rop->target) {
+ continue;
+ }
propagateNow(rop->dbid,rop->argv,rop->argc,rop->target);
}
if (multi_emitted) {
- /* We take the dbid from last command so that propagateNow() won't inject another SELECT */
- int exec_dbid = server.also_propagate.ops[server.also_propagate.numops-1].dbid;
- propagateNow(exec_dbid,&shared.exec,1,PROPAGATE_AOF|PROPAGATE_REPL);
+ /* We use dbid=-1 to indicate we do not want to replicate select */
+ propagateNow(-1,&shared.exec,1,PROPAGATE_AOF|PROPAGATE_REPL);
}
redisOpArrayFree(&server.also_propagate);
@@ -3314,6 +3374,24 @@ void call(client *c, int flags) {
if (monotonicGetType() == MONOTONIC_CLOCK_HW)
monotonic_start = getMonotonicUs();
+ int cmd_prop_index = -1;
+ if (c->cmd->flags & (CMD_WRITE | CMD_MAY_REPLICATE)) {
+ /* Save placeholder for replication.
+ * If the command will be replicated, this is the location where it should be placed in the replication stream.
+ * If we just push the command to the replication buffer (without the placeholder), the replica might get the commands
+ * in a wrong order and we will end up with master-replica inconsistency. To demonstrate, take the following example:
+ *
+ * 1. A module register a key space notification callback and inside the notification the module performed an incr command on the given key
+ * 2. User performs 'set x 1'
+ * 3. The module get the notification and perform 'incr x'
+ * 4. The command 'incr x' enters the replication buffer before the 'set x 1' command and the replica sees the command in the wrong order
+ *
+ * The final result is that the replica will have the value x=1 while the master will have x=2
+ *
+ * Notice that we only do this if the command might cause replication (either it's a WRITE command or MAY_REPLICATE) */
+ cmd_prop_index = redisOpArrayAppendPlaceholder(&server.also_propagate);
+ }
+
server.in_nested_call++;
c->cmd->proc(c);
server.in_nested_call--;
@@ -3435,9 +3513,18 @@ void call(client *c, int flags) {
/* Call alsoPropagate() only if at least one of AOF / replication
* propagation is needed. */
if (propagate_flags != PROPAGATE_NONE)
- alsoPropagate(c->db->id,c->argv,c->argc,propagate_flags);
+ /* If we got here with cmd_prop_index == -1, the command will be added to the end
+ * of the replication buffer, this can only happened on a read that causes a key
+ * miss event which causes the module to perform a write command using RM_Call.
+ * In such case we will propagate a read command (or a write command that has no effect
+ * and should not have been propagated). This behavior is wrong and module writer is advised
+ * not to perform any write commands on key miss event. */
+ alsoPropagateRaw(c->db->id,c->argv,c->argc,propagate_flags,cmd_prop_index);
}
+ /* Try to trim the last element if it is not used (if it's still a placeholder). */
+ redisOpArrayTrim(&server.also_propagate);
+
/* Restore the old replication flags, since call() can be executed
* recursively. */
c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
diff --git a/src/server.h b/src/server.h
index acb5bfd44..ebf7037e3 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1281,6 +1281,7 @@ extern clientBufferLimitsConfig clientBufferLimitsDefaults[CLIENT_TYPE_OBUF_COUN
* after the propagation of the executed command. */
typedef struct redisOp {
robj **argv;
+ /* target=0 means the operation should not be propagate (unused placeholder), for more info look at redisOpArray */
int argc, dbid, target;
} redisOp;
@@ -1292,9 +1293,14 @@ typedef struct redisOp {
* redisOpArrayFree();
*/
typedef struct redisOpArray {
- redisOp *ops;
- int numops;
- int capacity;
+ redisOp *ops; /* The array of operation to replicated */
+ int numops; /* The actual number of operations in the array */
+ int used; /* Spots that is used in the ops array, we need to
+ differenced between the actual number of operations
+ and the used spots because there might be spots
+ that was saved as a placeholder for future command
+ but was never actually used */
+ int capacity; /* The ops array capacity */
} redisOpArray;
/* This structure is returned by the getMemoryOverheadData() function in
@@ -1485,7 +1491,6 @@ struct redisServer {
int busy_module_yield_flags; /* Are we inside a busy module? (triggered by RM_Yield). see BUSY_MODULE_YIELD_ flags. */
const char *busy_module_yield_reply; /* When non-null, we are inside RM_Yield. */
int core_propagates; /* Is the core (in oppose to the module subsystem) is in charge of calling propagatePendingCommands? */
- int propagate_no_multi; /* True if propagatePendingCommands should avoid wrapping command in MULTI/EXEC */
int module_ctx_nesting; /* moduleCreateContext() nesting level */
char *ignore_warnings; /* Config: warnings that should be ignored. */
int client_pause_in_transaction; /* Was a client pause executed during this Exec? */
@@ -2899,7 +2904,7 @@ int incrCommandStatsOnError(struct redisCommand *cmd, int flags);
void call(client *c, int flags);
void alsoPropagate(int dbid, robj **argv, int argc, int target);
void propagatePendingCommands();
-void redisOpArrayInit(redisOpArray *oa);
+void redisOpArrayReset(redisOpArray *oa);
void redisOpArrayFree(redisOpArray *oa);
void forceCommandPropagation(client *c, int flags);
void preventCommandPropagation(client *c);
diff --git a/src/t_set.c b/src/t_set.c
index 3fb9f5259..d07f2ced9 100644
--- a/src/t_set.c
+++ b/src/t_set.c
@@ -490,11 +490,20 @@ void spopWithCountCommand(client *c) {
/* Delete the set as it is now empty */
dbDelete(c->db,c->argv[1]);
+
+ /* todo: Move the spop notification to be executed after the command logic.
+ * We can then decide if we want to keep the `alsoPropagate` or move to `rewriteClientCommandVector`. */
+
+ /* Propagate del command */
+ robj *propagate[2];
+ propagate[0] = shared.del;
+ propagate[1] = c->argv[1];
+ alsoPropagate(c->db->id,propagate,2,PROPAGATE_AOF|PROPAGATE_REPL);
+
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
- /* Propagate this command as a DEL operation */
- rewriteClientCommandVector(c,2,shared.del,c->argv[1]);
signalModifiedKey(c,c->db,c->argv[1]);
+ preventCommandPropagation(c);
return;
}