summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2015-02-09 22:49:27 +0100
committerantirez <antirez@gmail.com>2015-02-09 22:49:32 +0100
commit359113cc222fcd47e8dcd424f0fa086538b2457f (patch)
tree0be012425026c22bcf88c8104af46b753f7635d0
parent56a21d2ae87fb040782ba572b2d4a5eac38fe965 (diff)
downloadredis-359113cc222fcd47e8dcd424f0fa086538b2457f.tar.gz
SPOP with count: initial fixes to the implementation.
Severan problems are addressed but still a few missing. Since replication of this command was more complex than others since it needs to replicate multiple SREM commands, an old API able to do this was reused (it was taken inside the implementation since it was pretty obvious soon or later that would be useful). The API was improved a bit so that now a command may opt-out for the standard command replication when the server.dirty counter is incremented, in order to "manually" replicate what it wants.
-rw-r--r--src/redis.c23
-rw-r--r--src/redis.h4
-rw-r--r--src/t_set.c69
3 files changed, 54 insertions, 42 deletions
diff --git a/src/redis.c b/src/redis.c
index 6f5c40209..277d2e4b0 100644
--- a/src/redis.c
+++ b/src/redis.c
@@ -1529,6 +1529,7 @@ void initServerConfig(void) {
server.lpushCommand = lookupCommandByCString("lpush");
server.lpopCommand = lookupCommandByCString("lpop");
server.rpopCommand = lookupCommandByCString("rpop");
+ server.sremCommand = lookupCommandByCString("srem");
/* Slow log */
server.slowlog_log_slower_than = REDIS_SLOWLOG_LOG_SLOWER_THAN;
@@ -2001,6 +2002,9 @@ struct redisCommand *lookupCommandOrOriginal(sds name) {
* + REDIS_PROPAGATE_NONE (no propagation of command at all)
* + REDIS_PROPAGATE_AOF (propagate into the AOF file if is enabled)
* + REDIS_PROPAGATE_REPL (propagate into the replication link)
+ *
+ * This should not be used inside commands implementation. Use instead
+ * alsoPropagate(), preventCommandPropagation(), forceCommandPropagation().
*/
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
int flags)
@@ -2027,6 +2031,13 @@ void forceCommandPropagation(redisClient *c, int flags) {
if (flags & REDIS_PROPAGATE_AOF) c->flags |= REDIS_FORCE_AOF;
}
+/* Avoid that the executed command is propagated at all. This way we
+ * are free to just propagate what we want using the alsoPropagate()
+ * API. */
+void preventCommandPropagation(redisClient *c) {
+ c->flags |= REDIS_PREVENT_PROP;
+}
+
/* Call() is the core of Redis execution of a command */
void call(redisClient *c, int flags) {
long long dirty, start, duration;
@@ -2080,7 +2091,7 @@ void call(redisClient *c, int flags) {
}
/* Propagate the command into the AOF and replication link */
- if (flags & REDIS_CALL_PROPAGATE) {
+ if (flags & REDIS_CALL_PROPAGATE && (c->flags & REDIS_PREVENT_PROP) == 0) {
int flags = REDIS_PROPAGATE_NONE;
if (c->flags & REDIS_FORCE_REPL) flags |= REDIS_PROPAGATE_REPL;
@@ -2091,13 +2102,15 @@ void call(redisClient *c, int flags) {
propagate(c->cmd,c->db->id,c->argv,c->argc,flags);
}
- /* Restore the old FORCE_AOF/REPL flags, since call can be executed
+ /* Restore the old replication flags, since call can be executed
* recursively. */
- c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL);
- c->flags |= client_old_flags & (REDIS_FORCE_AOF|REDIS_FORCE_REPL);
+ c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL|REDIS_PREVENT_PROP);
+ c->flags |= client_old_flags &
+ (REDIS_FORCE_AOF|REDIS_FORCE_REPL|REDIS_PREVENT_PROP);
/* Handle the alsoPropagate() API to handle commands that want to propagate
- * multiple separated commands. */
+ * multiple separated commands. Note that alsoPropagate() is not affected
+ * by REDIS_PREVENT_PROP flag. */
if (server.also_propagate.numops) {
int j;
redisOp *rop;
diff --git a/src/redis.h b/src/redis.h
index 2170c5d29..a675d4f12 100644
--- a/src/redis.h
+++ b/src/redis.h
@@ -257,6 +257,7 @@ typedef long long mstime_t; /* millisecond time type. */
#define REDIS_PRE_PSYNC (1<<16) /* Instance don't understand PSYNC. */
#define REDIS_READONLY (1<<17) /* Cluster client is in read-only state. */
#define REDIS_PUBSUB (1<<18) /* Client is in Pub/Sub mode. */
+#define REDIS_PREVENT_PROP (1<<19) /* Don't propagate to AOF / Slaves. */
/* Client block type (btype field in client structure)
* if REDIS_BLOCKED flag is set. */
@@ -708,7 +709,7 @@ struct redisServer {
off_t loading_process_events_interval_bytes;
/* Fast pointers to often looked up command */
struct redisCommand *delCommand, *multiCommand, *lpushCommand, *lpopCommand,
- *rpopCommand;
+ *rpopCommand, *sremCommand;
/* Fields used only for stats */
time_t stat_starttime; /* Server start time */
long long stat_numcommands; /* Number of processed commands */
@@ -1252,6 +1253,7 @@ void call(redisClient *c, int flags);
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags);
void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int target);
void forceCommandPropagation(redisClient *c, int flags);
+void preventCommandPropagation(redisClient *c);
int prepareForShutdown();
#ifdef __GNUC__
void redisLog(int level, const char *fmt, ...)
diff --git a/src/t_set.c b/src/t_set.c
index f3f8bbaca..619b0f8a6 100644
--- a/src/t_set.c
+++ b/src/t_set.c
@@ -45,6 +45,11 @@ robj *setTypeCreate(robj *value) {
return createSetObject();
}
+/* Add the specified value into a set. The function takes care of incrementing
+ * the reference count of the object if needed in order to retain a copy.
+ *
+ * If the value was already member of the set, nothing is done and 0 is
+ * returned, otherwise the new element is added and 1 is returned. */
int setTypeAdd(robj *subject, robj *value) {
long long llval;
if (subject->encoding == REDIS_ENCODING_HT) {
@@ -487,7 +492,7 @@ void spopWithCountCommand(redisClient *c) {
long l;
unsigned long count, size;
unsigned long elements_returned;
- robj *set, *aux, *aux_set;
+ robj *set, *aux_set;
int64_t llele;
/* Get the count argument */
@@ -522,7 +527,6 @@ void spopWithCountCommand(redisClient *c) {
* The number of requested elements is greater than or equal to
* the number of elements inside the set: simply return the whole set. */
if (count >= size) {
-
/* We just return the entire set */
sunionDiffGenericCommand(c,c->argv+1,1,NULL,REDIS_OP_UNION);
@@ -531,10 +535,9 @@ void spopWithCountCommand(redisClient *c) {
notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
/* Replicate/AOF this command as an SREM operation */
- aux = createStringObject("DEL",3);
- rewriteClientCommandVector(c,2,aux,c->argv[1]);
- decrRefCount(aux);
-
+ rewriteClientCommandVector(c,2,shared.del,c->argv[1]);
+ signalModifiedKey(c->db,c->argv[1]);
+ server.dirty++;
return;
}
@@ -544,9 +547,7 @@ void spopWithCountCommand(redisClient *c) {
/* We need an auxiliary set. Optimistically, we create a set using an
* Intset internally. */
- aux = createStringObjectFromLongLong(0);
- aux_set = setTypeCreate(aux);
- decrRefCount(aux);
+ aux_set = createIntsetObject();
/* Get the count requested of random elements from the set into our
* auxiliary set. */
@@ -555,47 +556,43 @@ void spopWithCountCommand(redisClient *c) {
{
setTypeIterator *si;
- robj *objele;
+ robj *objele, **propargv;
int element_encoding;
addReplyMultiBulkLen(c, elements_returned);
- /* Replicate/AOF this command as an SREM operation */
- aux = createStringObject("SREM",4);
-
si = setTypeInitIterator(aux_set);
while ((element_encoding = setTypeNext(si, &objele, &llele)) != -1) {
if (element_encoding == REDIS_ENCODING_HT) {
-
- addReplyBulk(c, objele);
-
- /* Replicate/AOF this command as an SREM commands */
- rewriteClientCommandVector(c, 3, aux, c->argv[1], objele);
- setTypeRemove(set, objele);
- }
- else if (element_encoding == REDIS_ENCODING_INTSET) {
- /* TODO: setTypeRemove() forces us to convert all of the ints
- * to string... isn't there a nicer way to do this? */
+ incrRefCount(objele);
+ } else if (element_encoding == REDIS_ENCODING_INTSET) {
objele = createStringObjectFromLongLong(llele);
- addReplyBulk(c, objele);
-
- /* Replicate/AOF this command as an SREM commands */
- rewriteClientCommandVector(c, 3, aux, c->argv[1], objele);
- setTypeRemove(set, objele);
-
- /* We created it, we kill it. */
- decrRefCount(objele);
- }
- else {
+ } else {
redisPanic("Unknown set encoding");
}
+ setTypeRemove(set, objele);
+ addReplyBulk(c, objele);
+
+ /* Replicate/AOF this command as an SREM operation */
+ propargv = zmalloc(sizeof(robj*)*3);
+ propargv[0] = createStringObject("SREM",4);
+ propargv[1] = c->argv[1];
+ incrRefCount(c->argv[1]);
+ propargv[2] = objele;
+ incrRefCount(objele);
+
+ alsoPropagate(server.sremCommand,c->db->id,propargv,3,REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL);
+ decrRefCount(objele);
+ server.dirty++;
}
setTypeReleaseIterator(si);
-
- decrRefCount(aux);
}
- /* Free the auxiliary set - we need it no more. */
+ /* Don't propagate the command itself even if we incremented the
+ * dirty counter. We don't want to propagate an SPOP command since
+ * we propagated the command as a set of SREMs operations using
+ * the alsoPropagate() API. */
+ preventCommandPropagation(c);
decrRefCount(aux_set);
}