summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/redis.c23
-rw-r--r--src/redis.h4
-rw-r--r--src/t_set.c69
3 files changed, 54 insertions, 42 deletions
diff --git a/src/redis.c b/src/redis.c
index 6f5c40209..277d2e4b0 100644
--- a/src/redis.c
+++ b/src/redis.c
@@ -1529,6 +1529,7 @@ void initServerConfig(void) {
server.lpushCommand = lookupCommandByCString("lpush");
server.lpopCommand = lookupCommandByCString("lpop");
server.rpopCommand = lookupCommandByCString("rpop");
+ server.sremCommand = lookupCommandByCString("srem");
/* Slow log */
server.slowlog_log_slower_than = REDIS_SLOWLOG_LOG_SLOWER_THAN;
@@ -2001,6 +2002,9 @@ struct redisCommand *lookupCommandOrOriginal(sds name) {
* + REDIS_PROPAGATE_NONE (no propagation of command at all)
* + REDIS_PROPAGATE_AOF (propagate into the AOF file if is enabled)
* + REDIS_PROPAGATE_REPL (propagate into the replication link)
+ *
+ * This should not be used inside commands implementation. Use instead
+ * alsoPropagate(), preventCommandPropagation(), forceCommandPropagation().
*/
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
int flags)
@@ -2027,6 +2031,13 @@ void forceCommandPropagation(redisClient *c, int flags) {
if (flags & REDIS_PROPAGATE_AOF) c->flags |= REDIS_FORCE_AOF;
}
+/* Avoid that the executed command is propagated at all. This way we
+ * are free to just propagate what we want using the alsoPropagate()
+ * API. */
+void preventCommandPropagation(redisClient *c) {
+ c->flags |= REDIS_PREVENT_PROP;
+}
+
/* Call() is the core of Redis execution of a command */
void call(redisClient *c, int flags) {
long long dirty, start, duration;
@@ -2080,7 +2091,7 @@ void call(redisClient *c, int flags) {
}
/* Propagate the command into the AOF and replication link */
- if (flags & REDIS_CALL_PROPAGATE) {
+ if (flags & REDIS_CALL_PROPAGATE && (c->flags & REDIS_PREVENT_PROP) == 0) {
int flags = REDIS_PROPAGATE_NONE;
if (c->flags & REDIS_FORCE_REPL) flags |= REDIS_PROPAGATE_REPL;
@@ -2091,13 +2102,15 @@ void call(redisClient *c, int flags) {
propagate(c->cmd,c->db->id,c->argv,c->argc,flags);
}
- /* Restore the old FORCE_AOF/REPL flags, since call can be executed
+ /* Restore the old replication flags, since call can be executed
* recursively. */
- c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL);
- c->flags |= client_old_flags & (REDIS_FORCE_AOF|REDIS_FORCE_REPL);
+ c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL|REDIS_PREVENT_PROP);
+ c->flags |= client_old_flags &
+ (REDIS_FORCE_AOF|REDIS_FORCE_REPL|REDIS_PREVENT_PROP);
/* Handle the alsoPropagate() API to handle commands that want to propagate
- * multiple separated commands. */
+ * multiple separated commands. Note that alsoPropagate() is not affected
+ * by REDIS_PREVENT_PROP flag. */
if (server.also_propagate.numops) {
int j;
redisOp *rop;
diff --git a/src/redis.h b/src/redis.h
index 2170c5d29..a675d4f12 100644
--- a/src/redis.h
+++ b/src/redis.h
@@ -257,6 +257,7 @@ typedef long long mstime_t; /* millisecond time type. */
#define REDIS_PRE_PSYNC (1<<16) /* Instance don't understand PSYNC. */
#define REDIS_READONLY (1<<17) /* Cluster client is in read-only state. */
#define REDIS_PUBSUB (1<<18) /* Client is in Pub/Sub mode. */
+#define REDIS_PREVENT_PROP (1<<19) /* Don't propagate to AOF / Slaves. */
/* Client block type (btype field in client structure)
* if REDIS_BLOCKED flag is set. */
@@ -708,7 +709,7 @@ struct redisServer {
off_t loading_process_events_interval_bytes;
/* Fast pointers to often looked up command */
struct redisCommand *delCommand, *multiCommand, *lpushCommand, *lpopCommand,
- *rpopCommand;
+ *rpopCommand, *sremCommand;
/* Fields used only for stats */
time_t stat_starttime; /* Server start time */
long long stat_numcommands; /* Number of processed commands */
@@ -1252,6 +1253,7 @@ void call(redisClient *c, int flags);
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags);
void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int target);
void forceCommandPropagation(redisClient *c, int flags);
+void preventCommandPropagation(redisClient *c);
int prepareForShutdown();
#ifdef __GNUC__
void redisLog(int level, const char *fmt, ...)
diff --git a/src/t_set.c b/src/t_set.c
index f3f8bbaca..619b0f8a6 100644
--- a/src/t_set.c
+++ b/src/t_set.c
@@ -45,6 +45,11 @@ robj *setTypeCreate(robj *value) {
return createSetObject();
}
+/* Add the specified value into a set. The function takes care of incrementing
+ * the reference count of the object if needed in order to retain a copy.
+ *
+ * If the value was already member of the set, nothing is done and 0 is
+ * returned, otherwise the new element is added and 1 is returned. */
int setTypeAdd(robj *subject, robj *value) {
long long llval;
if (subject->encoding == REDIS_ENCODING_HT) {
@@ -487,7 +492,7 @@ void spopWithCountCommand(redisClient *c) {
long l;
unsigned long count, size;
unsigned long elements_returned;
- robj *set, *aux, *aux_set;
+ robj *set, *aux_set;
int64_t llele;
/* Get the count argument */
@@ -522,7 +527,6 @@ void spopWithCountCommand(redisClient *c) {
* The number of requested elements is greater than or equal to
* the number of elements inside the set: simply return the whole set. */
if (count >= size) {
-
/* We just return the entire set */
sunionDiffGenericCommand(c,c->argv+1,1,NULL,REDIS_OP_UNION);
@@ -531,10 +535,9 @@ void spopWithCountCommand(redisClient *c) {
notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
/* Replicate/AOF this command as an SREM operation */
- aux = createStringObject("DEL",3);
- rewriteClientCommandVector(c,2,aux,c->argv[1]);
- decrRefCount(aux);
-
+ rewriteClientCommandVector(c,2,shared.del,c->argv[1]);
+ signalModifiedKey(c->db,c->argv[1]);
+ server.dirty++;
return;
}
@@ -544,9 +547,7 @@ void spopWithCountCommand(redisClient *c) {
/* We need an auxiliary set. Optimistically, we create a set using an
* Intset internally. */
- aux = createStringObjectFromLongLong(0);
- aux_set = setTypeCreate(aux);
- decrRefCount(aux);
+ aux_set = createIntsetObject();
/* Get the count requested of random elements from the set into our
* auxiliary set. */
@@ -555,47 +556,43 @@ void spopWithCountCommand(redisClient *c) {
{
setTypeIterator *si;
- robj *objele;
+ robj *objele, **propargv;
int element_encoding;
addReplyMultiBulkLen(c, elements_returned);
- /* Replicate/AOF this command as an SREM operation */
- aux = createStringObject("SREM",4);
-
si = setTypeInitIterator(aux_set);
while ((element_encoding = setTypeNext(si, &objele, &llele)) != -1) {
if (element_encoding == REDIS_ENCODING_HT) {
-
- addReplyBulk(c, objele);
-
- /* Replicate/AOF this command as an SREM commands */
- rewriteClientCommandVector(c, 3, aux, c->argv[1], objele);
- setTypeRemove(set, objele);
- }
- else if (element_encoding == REDIS_ENCODING_INTSET) {
- /* TODO: setTypeRemove() forces us to convert all of the ints
- * to string... isn't there a nicer way to do this? */
+ incrRefCount(objele);
+ } else if (element_encoding == REDIS_ENCODING_INTSET) {
objele = createStringObjectFromLongLong(llele);
- addReplyBulk(c, objele);
-
- /* Replicate/AOF this command as an SREM commands */
- rewriteClientCommandVector(c, 3, aux, c->argv[1], objele);
- setTypeRemove(set, objele);
-
- /* We created it, we kill it. */
- decrRefCount(objele);
- }
- else {
+ } else {
redisPanic("Unknown set encoding");
}
+ setTypeRemove(set, objele);
+ addReplyBulk(c, objele);
+
+ /* Replicate/AOF this command as an SREM operation */
+ propargv = zmalloc(sizeof(robj*)*3);
+ propargv[0] = createStringObject("SREM",4);
+ propargv[1] = c->argv[1];
+ incrRefCount(c->argv[1]);
+ propargv[2] = objele;
+ incrRefCount(objele);
+
+ alsoPropagate(server.sremCommand,c->db->id,propargv,3,REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL);
+ decrRefCount(objele);
+ server.dirty++;
}
setTypeReleaseIterator(si);
-
- decrRefCount(aux);
}
- /* Free the auxiliary set - we need it no more. */
+ /* Don't propagate the command itself even if we incremented the
+ * dirty counter. We don't want to propagate an SPOP command since
+ * we propagated the command as a set of SREMs operations using
+ * the alsoPropagate() API. */
+ preventCommandPropagation(c);
decrRefCount(aux_set);
}