summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2013-05-27 19:34:02 +0200
committerantirez <antirez@gmail.com>2013-05-28 15:21:18 +0200
commit0bf64399713699c1ed423530edbdbbb0f259edd0 (patch)
treeb42c26b4a50467ce0a110c8a9b91732be1fb16a8
parentce05a8bb5b4cbc3a638d7b099f342c0dd1bde7d7 (diff)
downloadredis-ack.tar.gz
MINREPLICAS work in progress.ack
-rw-r--r--src/multi.c29
-rw-r--r--src/redis.c13
2 files changed, 42 insertions, 0 deletions
diff --git a/src/multi.c b/src/multi.c
index 7000c4841..833ff7e0a 100644
--- a/src/multi.c
+++ b/src/multi.c
@@ -35,6 +35,8 @@
void initClientMultiState(redisClient *c) {
c->mstate.commands = NULL;
c->mstate.count = 0;
+ c->mstate.minreplicas = 0;
+ c->mstate.minreplicas_timeout = 0;
}
/* Release all the resources associated with MULTI/EXEC state */
@@ -321,3 +323,30 @@ void unwatchCommand(redisClient *c) {
c->flags &= (~REDIS_DIRTY_CAS);
addReply(c,shared.ok);
}
+
+/* ------------------------- MINREPLICAS implementation --------------------- */
+
+/* MINREPLICAS <count> <timeout> */
+void minreplicasCommand(redisClient *c) {
+ long timeout, minreplicas;
+
+ if (!(c->flags & REDIS_MULTI)) {
+ addReplyError(c,"MINREPLICAS without MULTI");
+ return;
+ }
+ if (getLongFromObjectOrReply(c,object,&minreplicas,
+ "number of replicas is not an integer or out of range") != REDIS_OK)
+ return REDIS_ERR;
+
+ if (getLongFromObjectOrReply(c,object,&timeout,
+ "timeout is not an integer or out of range") != REDIS_OK)
+ return REDIS_ERR;
+
+ /* Force sane values. */
+ if (timeout < 0) timeout = 0;
+ if (minreplicas < 0) minreplicas = 0;
+
+ c->mstate.minreplicas = minreplicas;
+ c->mstate.minreplicas_timeout = timeout;
+ addReply(c,shared.ok);
+}
diff --git a/src/redis.c b/src/redis.c
index d68837500..767411758 100644
--- a/src/redis.c
+++ b/src/redis.c
@@ -221,6 +221,7 @@ struct redisCommand redisCommandTable[] = {
{"multi",multiCommand,1,"rs",0,NULL,0,0,0,0,0},
{"exec",execCommand,1,"sM",0,NULL,0,0,0,0,0},
{"discard",discardCommand,1,"rs",0,NULL,0,0,0,0,0},
+ {"minreplicas",minreplicasCommand,3,"rs",0,NULL,0,0,0,0,0},
{"sync",syncCommand,1,"ars",0,NULL,0,0,0,0,0},
{"psync",syncCommand,3,"ars",0,NULL,0,0,0,0,0},
{"replconf",replconfCommand,-1,"ars",0,NULL,0,0,0,0,0},
@@ -1112,6 +1113,18 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
}
}
+ /* Request ACK to slaves if we have pending MINREPLICAS transactions. */
+ if (server.repl_request_ack) {
+ repl_request_ack = 0;
+ robj *argv[2];
+
+ argv[0] = createStringObject("REPLCONF",8);
+ argv[1] = createStrinbObject("SENDACK",7);
+ replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 2);
+ decrRefCount(ping_argv[0]);
+ decrRefCount(ping_argv[1]);
+ }
+
/* Write the AOF buffer on disk */
flushAppendOnlyFile(0);
}