summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/db.c2
-rw-r--r--src/evict.c4
-rw-r--r--src/server.c140
-rw-r--r--src/server.h13
-rw-r--r--src/t_set.c15
-rw-r--r--tests/integration/replication.tcl31
-rw-r--r--tests/unit/moduleapi/propagate.tcl90
7 files changed, 110 insertions, 185 deletions
diff --git a/src/db.c b/src/db.c
index 04ab455fd..36de2aa0a 100644
--- a/src/db.c
+++ b/src/db.c
@@ -1557,9 +1557,9 @@ void deleteExpiredKeyAndPropagate(redisDb *db, robj *keyobj) {
dbSyncDelete(db,keyobj);
latencyEndMonitor(expire_latency);
latencyAddSampleIfNeeded("expire-del",expire_latency);
+ notifyKeyspaceEvent(NOTIFY_EXPIRED,"expired",keyobj,db->id);
signalModifiedKey(NULL, db, keyobj);
propagateDeletion(db,keyobj,server.lazyfree_lazy_expire);
- notifyKeyspaceEvent(NOTIFY_EXPIRED,"expired",keyobj,db->id);
server.stat_expiredkeys++;
}
diff --git a/src/evict.c b/src/evict.c
index 34f40b008..3c767a557 100644
--- a/src/evict.c
+++ b/src/evict.c
@@ -690,9 +690,9 @@ int performEvictions(void) {
mem_freed += delta;
server.stat_evictedkeys++;
signalModifiedKey(NULL,db,keyobj);
+ notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",
+ keyobj, db->id);
propagateDeletion(db,keyobj,server.lazyfree_lazy_eviction);
- notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",keyobj,db->id);
- /* Propagate notification commandds and the del. */
propagatePendingCommands();
decrRefCount(keyobj);
keys_freed++;
diff --git a/src/server.c b/src/server.c
index b6a72ae88..a3f86225e 100644
--- a/src/server.c
+++ b/src/server.c
@@ -2935,89 +2935,40 @@ void resetErrorTableStats(void) {
/* ========================== Redis OP Array API ============================ */
-/* Used to update a placeholder previously created using `redisOpArrayAppendPlaceholder`.
- * We assume the updated placeholder has no target set yet (otherwise its not a placeholder). */
-void redisOpArraySet(redisOpArray *oa, int dbid, robj **argv, int argc, int target, int index) {
- redisOp *op = oa->ops+index;
- serverAssert(!op->target);
- op->dbid = dbid;
- op->argv = argv;
- op->argc = argc;
- op->target = target;
- oa->numops++;
-}
-
-/* Append an operation to the given redisOpArray.
- * dbid - the id of the database to apply the operation on
- * argv - operations arguments (including the command)
- * argc - size of argv
- * target - indicating how to propagate the operation (PROPAGATE_AOF,PROPAGATE_REPL)
- *
- * Special case is when used with target 0, in this case the operation is a placeholder
- * that is expected to be filled later on using redisOpArraySet. */
int redisOpArrayAppend(redisOpArray *oa, int dbid, robj **argv, int argc, int target) {
redisOp *op;
int prev_capacity = oa->capacity;
- if (oa->used == 0) {
+ if (oa->numops == 0) {
oa->capacity = 16;
- } else if (oa->used >= oa->capacity) {
+ } else if (oa->numops >= oa->capacity) {
oa->capacity *= 2;
}
if (prev_capacity != oa->capacity)
oa->ops = zrealloc(oa->ops,sizeof(redisOp)*oa->capacity);
- int idx = oa->used;
- op = oa->ops+idx;
+ op = oa->ops+oa->numops;
op->dbid = dbid;
op->argv = argv;
op->argc = argc;
op->target = target;
- oa->used++;
- if (op->target) {
- /* if `target` is 0, the operation was added as a placeholder and is not yet counted as
- * a command that need to be propagated, this is why we only increase `numops` if `target` is not 0. */
- oa->numops++;
- }
- return idx; /* return the index of the appended operation */
-}
-
-void redisOpArrayTrim(redisOpArray *oa) {
- if (!oa->used) {
- /* nothing to trim */
- return;
- }
- redisOp *last_op = oa->ops + (oa->used - 1);
- if (!last_op->target) {
- /* last op is an unused placeholder, we can remove it */
- --oa->used;
- }
-}
-
-int redisOpArrayAppendPlaceholder(redisOpArray *oa) {
- return redisOpArrayAppend(oa, 0, NULL, 0, 0);
+ oa->numops++;
+ return oa->numops;
}
void redisOpArrayFree(redisOpArray *oa) {
- while(oa->used) {
+ while(oa->numops) {
int j;
redisOp *op;
- oa->used--;
- op = oa->ops+oa->used;
- if (!op->target) {
- continue;
- }
oa->numops--;
- for (j = 0; j < op->argc; j++) {
+ op = oa->ops+oa->numops;
+ for (j = 0; j < op->argc; j++)
decrRefCount(op->argv[j]);
- }
zfree(op->argv);
}
-
/* no need to free the actual op array, we reuse the memory for future commands */
serverAssert(!oa->numops);
- serverAssert(!oa->used);
}
/* ====================== Commands lookup and execution ===================== */
@@ -3164,29 +3115,6 @@ static void propagateNow(int dbid, robj **argv, int argc, int target) {
replicationFeedSlaves(server.slaves,dbid,argv,argc);
}
-/* Append or set the given operation to the replication buffer.
- * If index is -1, the operation will be appended to the end of the buffer.
- * Otherwise the operation will be set at the given index. */
-void alsoPropagateRaw(int dbid, robj **argv, int argc, int target, int index) {
- robj **argvcopy;
- int j;
-
- if (!shouldPropagate(target))
- return;
-
- argvcopy = zmalloc(sizeof(robj*)*argc);
- for (j = 0; j < argc; j++) {
- argvcopy[j] = argv[j];
- incrRefCount(argv[j]);
- }
- if (index == -1) {
- /* -1 means append to the end */
- redisOpArrayAppend(&server.also_propagate,dbid,argvcopy,argc,target);
- } else {
- redisOpArraySet(&server.also_propagate,dbid,argvcopy,argc,target,index);
- }
-}
-
/* Used inside commands to schedule the propagation of additional commands
* after the current command is propagated to AOF / Replication.
*
@@ -3199,7 +3127,18 @@ void alsoPropagateRaw(int dbid, robj **argv, int argc, int target, int index) {
* stack allocated). The function automatically increments ref count of
* passed objects, so the caller does not need to. */
void alsoPropagate(int dbid, robj **argv, int argc, int target) {
- alsoPropagateRaw(dbid, argv, argc, target, -1);
+ robj **argvcopy;
+ int j;
+
+ if (!shouldPropagate(target))
+ return;
+
+ argvcopy = zmalloc(sizeof(robj*)*argc);
+ for (j = 0; j < argc; j++) {
+ argvcopy[j] = argv[j];
+ incrRefCount(argv[j]);
+ }
+ redisOpArrayAppend(&server.also_propagate,dbid,argvcopy,argc,target);
}
/* It is possible to call the function forceCommandPropagation() inside a
@@ -3258,10 +3197,8 @@ void updateCommandLatencyHistogram(struct hdr_histogram **latency_histogram, int
* multiple separated commands. Note that alsoPropagate() is not affected
* by CLIENT_PREVENT_PROP flag. */
void propagatePendingCommands() {
- if (server.also_propagate.numops == 0) {
- redisOpArrayFree(&server.also_propagate);
+ if (server.also_propagate.numops == 0)
return;
- }
int j;
redisOp *rop;
@@ -3281,11 +3218,9 @@ void propagatePendingCommands() {
}
- for (j = 0; j < server.also_propagate.used; j++) {
+ for (j = 0; j < server.also_propagate.numops; j++) {
rop = &server.also_propagate.ops[j];
- if (!rop->target) {
- continue;
- }
+ serverAssert(rop->target);
propagateNow(rop->dbid,rop->argv,rop->argc,rop->target);
}
@@ -3400,24 +3335,6 @@ void call(client *c, int flags) {
if (monotonicGetType() == MONOTONIC_CLOCK_HW)
monotonic_start = getMonotonicUs();
- int cmd_prop_index = -1;
- if (c->cmd->flags & (CMD_WRITE | CMD_MAY_REPLICATE)) {
- /* Save placeholder for replication.
- * If the command will be replicated, this is the location where it should be placed in the replication stream.
- * If we just push the command to the replication buffer (without the placeholder), the replica might get the commands
- * in a wrong order and we will end up with master-replica inconsistency. To demonstrate, take the following example:
- *
- * 1. A module register a key space notification callback and inside the notification the module performed an incr command on the given key
- * 2. User performs 'set x 1'
- * 3. The module get the notification and perform 'incr x'
- * 4. The command 'incr x' enters the replication buffer before the 'set x 1' command and the replica sees the command in the wrong order
- *
- * The final result is that the replica will have the value x=1 while the master will have x=2
- *
- * Notice that we only do this if the command might cause replication (either it's a WRITE command or MAY_REPLICATE) */
- cmd_prop_index = redisOpArrayAppendPlaceholder(&server.also_propagate);
- }
-
server.in_nested_call++;
c->cmd->proc(c);
server.in_nested_call--;
@@ -3539,18 +3456,9 @@ void call(client *c, int flags) {
/* Call alsoPropagate() only if at least one of AOF / replication
* propagation is needed. */
if (propagate_flags != PROPAGATE_NONE)
- /* If we got here with cmd_prop_index == -1, the command will be added to the end
- * of the replication buffer, this can only happened on a read that causes a key
- * miss event which causes the module to perform a write command using RM_Call.
- * In such case we will propagate a read command (or a write command that has no effect
- * and should not have been propagated). This behavior is wrong and module writer is advised
- * not to perform any write commands on key miss event. */
- alsoPropagateRaw(c->db->id,c->argv,c->argc,propagate_flags,cmd_prop_index);
+ alsoPropagate(c->db->id,c->argv,c->argc,propagate_flags);
}
- /* Try to trim the last element if it is not used (if it's still a placeholder). */
- redisOpArrayTrim(&server.also_propagate);
-
/* Restore the old replication flags, since call() can be executed
* recursively. */
c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
diff --git a/src/server.h b/src/server.h
index 8a0a2f5f6..491f7bad8 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1271,7 +1271,6 @@ extern clientBufferLimitsConfig clientBufferLimitsDefaults[CLIENT_TYPE_OBUF_COUN
* after the propagation of the executed command. */
typedef struct redisOp {
robj **argv;
- /* target=0 means the operation should not be propagate (unused placeholder), for more info look at redisOpArray */
int argc, dbid, target;
} redisOp;
@@ -1283,14 +1282,9 @@ typedef struct redisOp {
* redisOpArrayFree();
*/
typedef struct redisOpArray {
- redisOp *ops; /* The array of operation to replicated */
- int numops; /* The actual number of operations in the array */
- int used; /* Spots that is used in the ops array, we need to
- differenced between the actual number of operations
- and the used spots because there might be spots
- that was saved as a placeholder for future command
- but was never actually used */
- int capacity; /* The ops array capacity */
+ redisOp *ops;
+ int numops;
+ int capacity;
} redisOpArray;
/* This structure is returned by the getMemoryOverheadData() function in
@@ -2892,7 +2886,6 @@ int incrCommandStatsOnError(struct redisCommand *cmd, int flags);
void call(client *c, int flags);
void alsoPropagate(int dbid, robj **argv, int argc, int target);
void propagatePendingCommands();
-void redisOpArrayReset(redisOpArray *oa);
void redisOpArrayFree(redisOpArray *oa);
void forceCommandPropagation(client *c, int flags);
void preventCommandPropagation(client *c);
diff --git a/src/t_set.c b/src/t_set.c
index d07f2ced9..8a669fe7c 100644
--- a/src/t_set.c
+++ b/src/t_set.c
@@ -490,20 +490,13 @@ void spopWithCountCommand(client *c) {
/* Delete the set as it is now empty */
dbDelete(c->db,c->argv[1]);
-
- /* todo: Move the spop notification to be executed after the command logic.
- * We can then decide if we want to keep the `alsoPropagate` or move to `rewriteClientCommandVector`. */
-
- /* Propagate del command */
- robj *propagate[2];
- propagate[0] = shared.del;
- propagate[1] = c->argv[1];
- alsoPropagate(c->db->id,propagate,2,PROPAGATE_AOF|PROPAGATE_REPL);
-
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
+ /* todo: Move the spop notification to be executed after the command logic. */
+
+ /* Propagate this command as a DEL operation */
+ rewriteClientCommandVector(c,2,shared.del,c->argv[1]);
signalModifiedKey(c,c->db,c->argv[1]);
- preventCommandPropagation(c);
return;
}
diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl
index 457c3150e..11849a5b6 100644
--- a/tests/integration/replication.tcl
+++ b/tests/integration/replication.tcl
@@ -1355,3 +1355,34 @@ start_server {tags {"repl" "external:skip"}} {
assert_equal "PONG" [r ping]
}
}
+
+start_server {tags {"repl external:skip"}} {
+ set master [srv 0 client]
+ set master_host [srv 0 host]
+ set master_port [srv 0 port]
+ $master debug SET-ACTIVE-EXPIRE 0
+ start_server {} {
+ set slave [srv 0 client]
+ $slave debug SET-ACTIVE-EXPIRE 0
+ $slave slaveof $master_host $master_port
+
+ test "Test replication with lazy expire" {
+ # wait for replication to be in sync
+ wait_for_condition 50 100 {
+ [lindex [$slave role] 0] eq {slave} &&
+ [string match {*master_link_status:up*} [$slave info replication]]
+ } else {
+ fail "Can't turn the instance into a replica"
+ }
+
+ $master sadd s foo
+ $master pexpire s 1
+ after 10
+ $master sadd s foo
+ assert_equal 1 [$master wait 1 1]
+
+ assert_equal "set" [$master type s]
+ assert_equal "set" [$slave type s]
+ }
+ }
+}
diff --git a/tests/unit/moduleapi/propagate.tcl b/tests/unit/moduleapi/propagate.tcl
index 39189b715..7bdc000ee 100644
--- a/tests/unit/moduleapi/propagate.tcl
+++ b/tests/unit/moduleapi/propagate.tcl
@@ -47,8 +47,8 @@ tags "modules" {
assert_replication_stream $repl {
{multi}
{select *}
- {set x y}
{incr notifications}
+ {set x y}
{exec}
}
close_replication_stream $repl
@@ -65,10 +65,10 @@ tags "modules" {
assert_replication_stream $repl {
{multi}
{select *}
+ {incr notifications}
{set x1 y1}
{incr notifications}
{set x2 y2}
- {incr notifications}
{exec}
}
close_replication_stream $repl
@@ -93,37 +93,37 @@ tags "modules" {
assert_replication_stream $repl {
{multi}
{select *}
- {set asdf1 1 PXAT *}
{incr notifications}
{incr notifications}
+ {set asdf1 1 PXAT *}
{exec}
{multi}
- {set asdf2 2 PXAT *}
{incr notifications}
{incr notifications}
+ {set asdf2 2 PXAT *}
{exec}
{multi}
- {set asdf3 3 PXAT *}
{incr notifications}
{incr notifications}
+ {set asdf3 3 PXAT *}
{exec}
{multi}
- {del asdf*}
{incr notifications}
- {incr testkeyspace:expired}
{incr notifications}
+ {incr testkeyspace:expired}
+ {del asdf*}
{exec}
{multi}
- {del asdf*}
{incr notifications}
- {incr testkeyspace:expired}
{incr notifications}
+ {incr testkeyspace:expired}
+ {del asdf*}
{exec}
{multi}
- {del asdf*}
{incr notifications}
- {incr testkeyspace:expired}
{incr notifications}
+ {incr testkeyspace:expired}
+ {del asdf*}
{exec}
}
close_replication_stream $repl
@@ -194,35 +194,35 @@ tags "modules" {
assert_replication_stream $repl {
{multi}
{select *}
- {set asdf1 1 PXAT *}
{incr notifications}
{incr notifications}
+ {set asdf1 1 PXAT *}
{exec}
{multi}
- {set asdf2 2 PXAT *}
{incr notifications}
{incr notifications}
+ {set asdf2 2 PXAT *}
{exec}
{multi}
- {set asdf3 3 PXAT *}
{incr notifications}
{incr notifications}
+ {set asdf3 3 PXAT *}
{exec}
{multi}
- {del asdf*}
{incr notifications}
+ {del asdf*}
{exec}
{multi}
- {del asdf*}
{incr notifications}
+ {del asdf*}
{exec}
{multi}
- {del asdf*}
{incr notifications}
+ {del asdf*}
{exec}
{multi}
- {set asdf4 4}
{incr notifications}
+ {set asdf4 4}
{exec}
}
close_replication_stream $repl
@@ -246,21 +246,21 @@ tags "modules" {
assert_replication_stream $repl {
{multi}
{select *}
- {set timer-maxmemory-volatile-start 1 PXAT *}
{incr notifications}
{incr notifications}
+ {set timer-maxmemory-volatile-start 1 PXAT *}
{incr timer-maxmemory-middle}
- {set timer-maxmemory-volatile-end 1 PXAT *}
{incr notifications}
{incr notifications}
+ {set timer-maxmemory-volatile-end 1 PXAT *}
{exec}
{multi}
- {del timer-maxmemory-volatile-*}
{incr notifications}
+ {del timer-maxmemory-volatile-*}
{exec}
{multi}
- {del timer-maxmemory-volatile-*}
{incr notifications}
+ {del timer-maxmemory-volatile-*}
{exec}
}
close_replication_stream $repl
@@ -277,13 +277,13 @@ tags "modules" {
assert_replication_stream $repl {
{multi}
{select *}
+ {incr notifications}
{incrby timer-eval-start 1}
{incr notifications}
{set foo bar}
- {incr notifications}
{incr timer-eval-middle}
- {incrby timer-eval-end 1}
{incr notifications}
+ {incrby timer-eval-end 1}
{exec}
}
close_replication_stream $repl
@@ -329,12 +329,13 @@ tags "modules" {
{multi}
{select *}
{incrby timer-nested-start 1}
- {incr using-call}
{incr notifications}
+ {incr using-call}
{incr counter-1}
{incr counter-2}
{incr counter-3}
{incr counter-4}
+ {incr notifications}
{incr after-call}
{incr notifications}
{incr before-call-2}
@@ -346,7 +347,6 @@ tags "modules" {
{incr after-call-2}
{incr notifications}
{incr timer-nested-middle}
- {incr notifications}
{incrby timer-nested-end 1}
{exec}
}
@@ -372,20 +372,20 @@ tags "modules" {
{multi}
{select *}
{incr a-from-thread}
- {incr thread-call}
{incr notifications}
+ {incr thread-call}
{incr b-from-thread}
{exec}
{multi}
{incr a-from-thread}
- {incr thread-call}
{incr notifications}
+ {incr thread-call}
{incr b-from-thread}
{exec}
{multi}
{incr a-from-thread}
- {incr thread-call}
{incr notifications}
+ {incr thread-call}
{incr b-from-thread}
{exec}
}
@@ -407,10 +407,10 @@ tags "modules" {
{multi}
{select *}
{incr thread-detached-before}
+ {incr notifications}
{incr thread-detached-1}
{incr notifications}
{incr thread-detached-2}
- {incr notifications}
{incr thread-detached-after}
{exec}
}
@@ -430,12 +430,12 @@ tags "modules" {
{incr counter-2}
{exec}
{multi}
- {incr using-call}
{incr notifications}
+ {incr using-call}
{incr counter-1}
{incr counter-2}
- {incr after-call}
{incr notifications}
+ {incr after-call}
{exec}
}
close_replication_stream $repl
@@ -454,14 +454,14 @@ tags "modules" {
{select *}
{incr counter-1}
{incr counter-2}
+ {incr notifications}
{set x y}
{incr notifications}
{incr using-call}
- {incr notifications}
{incr counter-1}
{incr counter-2}
- {incr after-call}
{incr notifications}
+ {incr after-call}
{exec}
}
close_replication_stream $repl
@@ -481,12 +481,12 @@ tags "modules" {
{incr counter-2}
{exec}
{multi}
- {incr using-call}
{incr notifications}
+ {incr using-call}
{incr counter-1}
{incr counter-2}
- {incr after-call}
{incr notifications}
+ {incr after-call}
{exec}
}
close_replication_stream $repl
@@ -507,12 +507,12 @@ tags "modules" {
{incr counter-2}
{exec}
{multi}
- {incr using-call}
{incr notifications}
+ {incr using-call}
{incr counter-1}
{incr counter-2}
- {incr after-call}
{incr notifications}
+ {incr after-call}
{exec}
}
close_replication_stream $repl
@@ -538,21 +538,22 @@ tags "modules" {
{select *}
{incr counter-1}
{incr counter-2}
- {incr using-call}
{incr notifications}
+ {incr using-call}
{incr counter-1}
{incr counter-2}
- {incr after-call}
{incr notifications}
+ {incr after-call}
{exec}
{multi}
{incrby timer-nested-start 1}
- {incr using-call}
{incr notifications}
+ {incr using-call}
{incr counter-1}
{incr counter-2}
{incr counter-3}
{incr counter-4}
+ {incr notifications}
{incr after-call}
{incr notifications}
{incr before-call-2}
@@ -564,7 +565,6 @@ tags "modules" {
{incr after-call-2}
{incr notifications}
{incr timer-nested-middle}
- {incr notifications}
{incrby timer-nested-end 1}
{exec}
}
@@ -624,13 +624,13 @@ tags "modules" {
assert_replication_stream $repl {
{multi}
{select *}
- {sadd s foo}
{incr notifications}
+ {sadd s foo}
{exec}
{multi}
{incr notifications}
- {del s}
{incr notifications}
+ {del s}
{exec}
}
close_replication_stream $repl
@@ -655,8 +655,8 @@ tags "modules" {
{select *}
{flushall}
{multi}
- {incr missed}
{incr notifications}
+ {incr missed}
{get unexisting_key}
{exec}
}