diff options
-rw-r--r-- | src/db.c | 2 | ||||
-rw-r--r-- | src/evict.c | 4 | ||||
-rw-r--r-- | src/server.c | 140 | ||||
-rw-r--r-- | src/server.h | 13 | ||||
-rw-r--r-- | src/t_set.c | 15 | ||||
-rw-r--r-- | tests/integration/replication.tcl | 31 | ||||
-rw-r--r-- | tests/unit/moduleapi/propagate.tcl | 90 |
7 files changed, 110 insertions, 185 deletions
@@ -1557,9 +1557,9 @@ void deleteExpiredKeyAndPropagate(redisDb *db, robj *keyobj) { dbSyncDelete(db,keyobj); latencyEndMonitor(expire_latency); latencyAddSampleIfNeeded("expire-del",expire_latency); + notifyKeyspaceEvent(NOTIFY_EXPIRED,"expired",keyobj,db->id); signalModifiedKey(NULL, db, keyobj); propagateDeletion(db,keyobj,server.lazyfree_lazy_expire); - notifyKeyspaceEvent(NOTIFY_EXPIRED,"expired",keyobj,db->id); server.stat_expiredkeys++; } diff --git a/src/evict.c b/src/evict.c index 34f40b008..3c767a557 100644 --- a/src/evict.c +++ b/src/evict.c @@ -690,9 +690,9 @@ int performEvictions(void) { mem_freed += delta; server.stat_evictedkeys++; signalModifiedKey(NULL,db,keyobj); + notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted", + keyobj, db->id); propagateDeletion(db,keyobj,server.lazyfree_lazy_eviction); - notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",keyobj,db->id); - /* Propagate notification commandds and the del. */ propagatePendingCommands(); decrRefCount(keyobj); keys_freed++; diff --git a/src/server.c b/src/server.c index b6a72ae88..a3f86225e 100644 --- a/src/server.c +++ b/src/server.c @@ -2935,89 +2935,40 @@ void resetErrorTableStats(void) { /* ========================== Redis OP Array API ============================ */ -/* Used to update a placeholder previously created using `redisOpArrayAppendPlaceholder`. - * We assume the updated placeholder has no target set yet (otherwise its not a placeholder). */ -void redisOpArraySet(redisOpArray *oa, int dbid, robj **argv, int argc, int target, int index) { - redisOp *op = oa->ops+index; - serverAssert(!op->target); - op->dbid = dbid; - op->argv = argv; - op->argc = argc; - op->target = target; - oa->numops++; -} - -/* Append an operation to the given redisOpArray. - * dbid - the id of the database to apply the operation on - * argv - operations arguments (including the command) - * argc - size of argv - * target - indicating how to propagate the operation (PROPAGATE_AOF,PROPAGATE_REPL) - * - * Special case is when used with target 0, in this case the operation is a placeholder - * that is expected to be filled later on using redisOpArraySet. */ int redisOpArrayAppend(redisOpArray *oa, int dbid, robj **argv, int argc, int target) { redisOp *op; int prev_capacity = oa->capacity; - if (oa->used == 0) { + if (oa->numops == 0) { oa->capacity = 16; - } else if (oa->used >= oa->capacity) { + } else if (oa->numops >= oa->capacity) { oa->capacity *= 2; } if (prev_capacity != oa->capacity) oa->ops = zrealloc(oa->ops,sizeof(redisOp)*oa->capacity); - int idx = oa->used; - op = oa->ops+idx; + op = oa->ops+oa->numops; op->dbid = dbid; op->argv = argv; op->argc = argc; op->target = target; - oa->used++; - if (op->target) { - /* if `target` is 0, the operation was added as a placeholder and is not yet counted as - * a command that need to be propagated, this is why we only increase `numops` if `target` is not 0. */ - oa->numops++; - } - return idx; /* return the index of the appended operation */ -} - -void redisOpArrayTrim(redisOpArray *oa) { - if (!oa->used) { - /* nothing to trim */ - return; - } - redisOp *last_op = oa->ops + (oa->used - 1); - if (!last_op->target) { - /* last op is an unused placeholder, we can remove it */ - --oa->used; - } -} - -int redisOpArrayAppendPlaceholder(redisOpArray *oa) { - return redisOpArrayAppend(oa, 0, NULL, 0, 0); + oa->numops++; + return oa->numops; } void redisOpArrayFree(redisOpArray *oa) { - while(oa->used) { + while(oa->numops) { int j; redisOp *op; - oa->used--; - op = oa->ops+oa->used; - if (!op->target) { - continue; - } oa->numops--; - for (j = 0; j < op->argc; j++) { + op = oa->ops+oa->numops; + for (j = 0; j < op->argc; j++) decrRefCount(op->argv[j]); - } zfree(op->argv); } - /* no need to free the actual op array, we reuse the memory for future commands */ serverAssert(!oa->numops); - serverAssert(!oa->used); } /* ====================== Commands lookup and execution ===================== */ @@ -3164,29 +3115,6 @@ static void propagateNow(int dbid, robj **argv, int argc, int target) { replicationFeedSlaves(server.slaves,dbid,argv,argc); } -/* Append or set the given operation to the replication buffer. - * If index is -1, the operation will be appended to the end of the buffer. - * Otherwise the operation will be set at the given index. */ -void alsoPropagateRaw(int dbid, robj **argv, int argc, int target, int index) { - robj **argvcopy; - int j; - - if (!shouldPropagate(target)) - return; - - argvcopy = zmalloc(sizeof(robj*)*argc); - for (j = 0; j < argc; j++) { - argvcopy[j] = argv[j]; - incrRefCount(argv[j]); - } - if (index == -1) { - /* -1 means append to the end */ - redisOpArrayAppend(&server.also_propagate,dbid,argvcopy,argc,target); - } else { - redisOpArraySet(&server.also_propagate,dbid,argvcopy,argc,target,index); - } -} - /* Used inside commands to schedule the propagation of additional commands * after the current command is propagated to AOF / Replication. * @@ -3199,7 +3127,18 @@ void alsoPropagateRaw(int dbid, robj **argv, int argc, int target, int index) { * stack allocated). The function automatically increments ref count of * passed objects, so the caller does not need to. */ void alsoPropagate(int dbid, robj **argv, int argc, int target) { - alsoPropagateRaw(dbid, argv, argc, target, -1); + robj **argvcopy; + int j; + + if (!shouldPropagate(target)) + return; + + argvcopy = zmalloc(sizeof(robj*)*argc); + for (j = 0; j < argc; j++) { + argvcopy[j] = argv[j]; + incrRefCount(argv[j]); + } + redisOpArrayAppend(&server.also_propagate,dbid,argvcopy,argc,target); } /* It is possible to call the function forceCommandPropagation() inside a @@ -3258,10 +3197,8 @@ void updateCommandLatencyHistogram(struct hdr_histogram **latency_histogram, int * multiple separated commands. Note that alsoPropagate() is not affected * by CLIENT_PREVENT_PROP flag. */ void propagatePendingCommands() { - if (server.also_propagate.numops == 0) { - redisOpArrayFree(&server.also_propagate); + if (server.also_propagate.numops == 0) return; - } int j; redisOp *rop; @@ -3281,11 +3218,9 @@ void propagatePendingCommands() { } - for (j = 0; j < server.also_propagate.used; j++) { + for (j = 0; j < server.also_propagate.numops; j++) { rop = &server.also_propagate.ops[j]; - if (!rop->target) { - continue; - } + serverAssert(rop->target); propagateNow(rop->dbid,rop->argv,rop->argc,rop->target); } @@ -3400,24 +3335,6 @@ void call(client *c, int flags) { if (monotonicGetType() == MONOTONIC_CLOCK_HW) monotonic_start = getMonotonicUs(); - int cmd_prop_index = -1; - if (c->cmd->flags & (CMD_WRITE | CMD_MAY_REPLICATE)) { - /* Save placeholder for replication. - * If the command will be replicated, this is the location where it should be placed in the replication stream. - * If we just push the command to the replication buffer (without the placeholder), the replica might get the commands - * in a wrong order and we will end up with master-replica inconsistency. To demonstrate, take the following example: - * - * 1. A module register a key space notification callback and inside the notification the module performed an incr command on the given key - * 2. User performs 'set x 1' - * 3. The module get the notification and perform 'incr x' - * 4. The command 'incr x' enters the replication buffer before the 'set x 1' command and the replica sees the command in the wrong order - * - * The final result is that the replica will have the value x=1 while the master will have x=2 - * - * Notice that we only do this if the command might cause replication (either it's a WRITE command or MAY_REPLICATE) */ - cmd_prop_index = redisOpArrayAppendPlaceholder(&server.also_propagate); - } - server.in_nested_call++; c->cmd->proc(c); server.in_nested_call--; @@ -3539,18 +3456,9 @@ void call(client *c, int flags) { /* Call alsoPropagate() only if at least one of AOF / replication * propagation is needed. */ if (propagate_flags != PROPAGATE_NONE) - /* If we got here with cmd_prop_index == -1, the command will be added to the end - * of the replication buffer, this can only happened on a read that causes a key - * miss event which causes the module to perform a write command using RM_Call. - * In such case we will propagate a read command (or a write command that has no effect - * and should not have been propagated). This behavior is wrong and module writer is advised - * not to perform any write commands on key miss event. */ - alsoPropagateRaw(c->db->id,c->argv,c->argc,propagate_flags,cmd_prop_index); + alsoPropagate(c->db->id,c->argv,c->argc,propagate_flags); } - /* Try to trim the last element if it is not used (if it's still a placeholder). */ - redisOpArrayTrim(&server.also_propagate); - /* Restore the old replication flags, since call() can be executed * recursively. */ c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP); diff --git a/src/server.h b/src/server.h index 8a0a2f5f6..491f7bad8 100644 --- a/src/server.h +++ b/src/server.h @@ -1271,7 +1271,6 @@ extern clientBufferLimitsConfig clientBufferLimitsDefaults[CLIENT_TYPE_OBUF_COUN * after the propagation of the executed command. */ typedef struct redisOp { robj **argv; - /* target=0 means the operation should not be propagate (unused placeholder), for more info look at redisOpArray */ int argc, dbid, target; } redisOp; @@ -1283,14 +1282,9 @@ typedef struct redisOp { * redisOpArrayFree(); */ typedef struct redisOpArray { - redisOp *ops; /* The array of operation to replicated */ - int numops; /* The actual number of operations in the array */ - int used; /* Spots that is used in the ops array, we need to - differenced between the actual number of operations - and the used spots because there might be spots - that was saved as a placeholder for future command - but was never actually used */ - int capacity; /* The ops array capacity */ + redisOp *ops; + int numops; + int capacity; } redisOpArray; /* This structure is returned by the getMemoryOverheadData() function in @@ -2892,7 +2886,6 @@ int incrCommandStatsOnError(struct redisCommand *cmd, int flags); void call(client *c, int flags); void alsoPropagate(int dbid, robj **argv, int argc, int target); void propagatePendingCommands(); -void redisOpArrayReset(redisOpArray *oa); void redisOpArrayFree(redisOpArray *oa); void forceCommandPropagation(client *c, int flags); void preventCommandPropagation(client *c); diff --git a/src/t_set.c b/src/t_set.c index d07f2ced9..8a669fe7c 100644 --- a/src/t_set.c +++ b/src/t_set.c @@ -490,20 +490,13 @@ void spopWithCountCommand(client *c) { /* Delete the set as it is now empty */ dbDelete(c->db,c->argv[1]); - - /* todo: Move the spop notification to be executed after the command logic. - * We can then decide if we want to keep the `alsoPropagate` or move to `rewriteClientCommandVector`. */ - - /* Propagate del command */ - robj *propagate[2]; - propagate[0] = shared.del; - propagate[1] = c->argv[1]; - alsoPropagate(c->db->id,propagate,2,PROPAGATE_AOF|PROPAGATE_REPL); - notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id); + /* todo: Move the spop notification to be executed after the command logic. */ + + /* Propagate this command as a DEL operation */ + rewriteClientCommandVector(c,2,shared.del,c->argv[1]); signalModifiedKey(c,c->db,c->argv[1]); - preventCommandPropagation(c); return; } diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index 457c3150e..11849a5b6 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -1355,3 +1355,34 @@ start_server {tags {"repl" "external:skip"}} { assert_equal "PONG" [r ping] } } + +start_server {tags {"repl external:skip"}} { + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] + $master debug SET-ACTIVE-EXPIRE 0 + start_server {} { + set slave [srv 0 client] + $slave debug SET-ACTIVE-EXPIRE 0 + $slave slaveof $master_host $master_port + + test "Test replication with lazy expire" { + # wait for replication to be in sync + wait_for_condition 50 100 { + [lindex [$slave role] 0] eq {slave} && + [string match {*master_link_status:up*} [$slave info replication]] + } else { + fail "Can't turn the instance into a replica" + } + + $master sadd s foo + $master pexpire s 1 + after 10 + $master sadd s foo + assert_equal 1 [$master wait 1 1] + + assert_equal "set" [$master type s] + assert_equal "set" [$slave type s] + } + } +} diff --git a/tests/unit/moduleapi/propagate.tcl b/tests/unit/moduleapi/propagate.tcl index 39189b715..7bdc000ee 100644 --- a/tests/unit/moduleapi/propagate.tcl +++ b/tests/unit/moduleapi/propagate.tcl @@ -47,8 +47,8 @@ tags "modules" { assert_replication_stream $repl { {multi} {select *} - {set x y} {incr notifications} + {set x y} {exec} } close_replication_stream $repl @@ -65,10 +65,10 @@ tags "modules" { assert_replication_stream $repl { {multi} {select *} + {incr notifications} {set x1 y1} {incr notifications} {set x2 y2} - {incr notifications} {exec} } close_replication_stream $repl @@ -93,37 +93,37 @@ tags "modules" { assert_replication_stream $repl { {multi} {select *} - {set asdf1 1 PXAT *} {incr notifications} {incr notifications} + {set asdf1 1 PXAT *} {exec} {multi} - {set asdf2 2 PXAT *} {incr notifications} {incr notifications} + {set asdf2 2 PXAT *} {exec} {multi} - {set asdf3 3 PXAT *} {incr notifications} {incr notifications} + {set asdf3 3 PXAT *} {exec} {multi} - {del asdf*} {incr notifications} - {incr testkeyspace:expired} {incr notifications} + {incr testkeyspace:expired} + {del asdf*} {exec} {multi} - {del asdf*} {incr notifications} - {incr testkeyspace:expired} {incr notifications} + {incr testkeyspace:expired} + {del asdf*} {exec} {multi} - {del asdf*} {incr notifications} - {incr testkeyspace:expired} {incr notifications} + {incr testkeyspace:expired} + {del asdf*} {exec} } close_replication_stream $repl @@ -194,35 +194,35 @@ tags "modules" { assert_replication_stream $repl { {multi} {select *} - {set asdf1 1 PXAT *} {incr notifications} {incr notifications} + {set asdf1 1 PXAT *} {exec} {multi} - {set asdf2 2 PXAT *} {incr notifications} {incr notifications} + {set asdf2 2 PXAT *} {exec} {multi} - {set asdf3 3 PXAT *} {incr notifications} {incr notifications} + {set asdf3 3 PXAT *} {exec} {multi} - {del asdf*} {incr notifications} + {del asdf*} {exec} {multi} - {del asdf*} {incr notifications} + {del asdf*} {exec} {multi} - {del asdf*} {incr notifications} + {del asdf*} {exec} {multi} - {set asdf4 4} {incr notifications} + {set asdf4 4} {exec} } close_replication_stream $repl @@ -246,21 +246,21 @@ tags "modules" { assert_replication_stream $repl { {multi} {select *} - {set timer-maxmemory-volatile-start 1 PXAT *} {incr notifications} {incr notifications} + {set timer-maxmemory-volatile-start 1 PXAT *} {incr timer-maxmemory-middle} - {set timer-maxmemory-volatile-end 1 PXAT *} {incr notifications} {incr notifications} + {set timer-maxmemory-volatile-end 1 PXAT *} {exec} {multi} - {del timer-maxmemory-volatile-*} {incr notifications} + {del timer-maxmemory-volatile-*} {exec} {multi} - {del timer-maxmemory-volatile-*} {incr notifications} + {del timer-maxmemory-volatile-*} {exec} } close_replication_stream $repl @@ -277,13 +277,13 @@ tags "modules" { assert_replication_stream $repl { {multi} {select *} + {incr notifications} {incrby timer-eval-start 1} {incr notifications} {set foo bar} - {incr notifications} {incr timer-eval-middle} - {incrby timer-eval-end 1} {incr notifications} + {incrby timer-eval-end 1} {exec} } close_replication_stream $repl @@ -329,12 +329,13 @@ tags "modules" { {multi} {select *} {incrby timer-nested-start 1} - {incr using-call} {incr notifications} + {incr using-call} {incr counter-1} {incr counter-2} {incr counter-3} {incr counter-4} + {incr notifications} {incr after-call} {incr notifications} {incr before-call-2} @@ -346,7 +347,6 @@ tags "modules" { {incr after-call-2} {incr notifications} {incr timer-nested-middle} - {incr notifications} {incrby timer-nested-end 1} {exec} } @@ -372,20 +372,20 @@ tags "modules" { {multi} {select *} {incr a-from-thread} - {incr thread-call} {incr notifications} + {incr thread-call} {incr b-from-thread} {exec} {multi} {incr a-from-thread} - {incr thread-call} {incr notifications} + {incr thread-call} {incr b-from-thread} {exec} {multi} {incr a-from-thread} - {incr thread-call} {incr notifications} + {incr thread-call} {incr b-from-thread} {exec} } @@ -407,10 +407,10 @@ tags "modules" { {multi} {select *} {incr thread-detached-before} + {incr notifications} {incr thread-detached-1} {incr notifications} {incr thread-detached-2} - {incr notifications} {incr thread-detached-after} {exec} } @@ -430,12 +430,12 @@ tags "modules" { {incr counter-2} {exec} {multi} - {incr using-call} {incr notifications} + {incr using-call} {incr counter-1} {incr counter-2} - {incr after-call} {incr notifications} + {incr after-call} {exec} } close_replication_stream $repl @@ -454,14 +454,14 @@ tags "modules" { {select *} {incr counter-1} {incr counter-2} + {incr notifications} {set x y} {incr notifications} {incr using-call} - {incr notifications} {incr counter-1} {incr counter-2} - {incr after-call} {incr notifications} + {incr after-call} {exec} } close_replication_stream $repl @@ -481,12 +481,12 @@ tags "modules" { {incr counter-2} {exec} {multi} - {incr using-call} {incr notifications} + {incr using-call} {incr counter-1} {incr counter-2} - {incr after-call} {incr notifications} + {incr after-call} {exec} } close_replication_stream $repl @@ -507,12 +507,12 @@ tags "modules" { {incr counter-2} {exec} {multi} - {incr using-call} {incr notifications} + {incr using-call} {incr counter-1} {incr counter-2} - {incr after-call} {incr notifications} + {incr after-call} {exec} } close_replication_stream $repl @@ -538,21 +538,22 @@ tags "modules" { {select *} {incr counter-1} {incr counter-2} - {incr using-call} {incr notifications} + {incr using-call} {incr counter-1} {incr counter-2} - {incr after-call} {incr notifications} + {incr after-call} {exec} {multi} {incrby timer-nested-start 1} - {incr using-call} {incr notifications} + {incr using-call} {incr counter-1} {incr counter-2} {incr counter-3} {incr counter-4} + {incr notifications} {incr after-call} {incr notifications} {incr before-call-2} @@ -564,7 +565,6 @@ tags "modules" { {incr after-call-2} {incr notifications} {incr timer-nested-middle} - {incr notifications} {incrby timer-nested-end 1} {exec} } @@ -624,13 +624,13 @@ tags "modules" { assert_replication_stream $repl { {multi} {select *} - {sadd s foo} {incr notifications} + {sadd s foo} {exec} {multi} {incr notifications} - {del s} {incr notifications} + {del s} {exec} } close_replication_stream $repl @@ -655,8 +655,8 @@ tags "modules" { {select *} {flushall} {multi} - {incr missed} {incr notifications} + {incr missed} {get unexisting_key} {exec} } |