diff options
author | antirez <antirez@gmail.com> | 2013-05-27 19:34:02 +0200 |
---|---|---|
committer | antirez <antirez@gmail.com> | 2013-05-28 15:21:18 +0200 |
commit | 0bf64399713699c1ed423530edbdbbb0f259edd0 (patch) | |
tree | b42c26b4a50467ce0a110c8a9b91732be1fb16a8 | |
parent | ce05a8bb5b4cbc3a638d7b099f342c0dd1bde7d7 (diff) | |
download | redis-0bf64399713699c1ed423530edbdbbb0f259edd0.tar.gz |
MINREPLICAS work in progress.ack
-rw-r--r-- | src/multi.c | 29 | ||||
-rw-r--r-- | src/redis.c | 13 |
2 files changed, 42 insertions, 0 deletions
diff --git a/src/multi.c b/src/multi.c index 7000c4841..833ff7e0a 100644 --- a/src/multi.c +++ b/src/multi.c @@ -35,6 +35,8 @@ void initClientMultiState(redisClient *c) { c->mstate.commands = NULL; c->mstate.count = 0; + c->mstate.minreplicas = 0; + c->mstate.minreplicas_timeout = 0; } /* Release all the resources associated with MULTI/EXEC state */ @@ -321,3 +323,30 @@ void unwatchCommand(redisClient *c) { c->flags &= (~REDIS_DIRTY_CAS); addReply(c,shared.ok); } + +/* ------------------------- MINREPLICAS implementation --------------------- */ + +/* MINREPLICAS <count> <timeout> */ +void minreplicasCommand(redisClient *c) { + long timeout, minreplicas; + + if (!(c->flags & REDIS_MULTI)) { + addReplyError(c,"MINREPLICAS without MULTI"); + return; + } + if (getLongFromObjectOrReply(c,object,&minreplicas, + "number of replicas is not an integer or out of range") != REDIS_OK) + return REDIS_ERR; + + if (getLongFromObjectOrReply(c,object,&timeout, + "timeout is not an integer or out of range") != REDIS_OK) + return REDIS_ERR; + + /* Force sane values. */ + if (timeout < 0) timeout = 0; + if (minreplicas < 0) minreplicas = 0; + + c->mstate.minreplicas = minreplicas; + c->mstate.minreplicas_timeout = timeout; + addReply(c,shared.ok); +} diff --git a/src/redis.c b/src/redis.c index d68837500..767411758 100644 --- a/src/redis.c +++ b/src/redis.c @@ -221,6 +221,7 @@ struct redisCommand redisCommandTable[] = { {"multi",multiCommand,1,"rs",0,NULL,0,0,0,0,0}, {"exec",execCommand,1,"sM",0,NULL,0,0,0,0,0}, {"discard",discardCommand,1,"rs",0,NULL,0,0,0,0,0}, + {"minreplicas",minreplicasCommand,3,"rs",0,NULL,0,0,0,0,0}, {"sync",syncCommand,1,"ars",0,NULL,0,0,0,0,0}, {"psync",syncCommand,3,"ars",0,NULL,0,0,0,0,0}, {"replconf",replconfCommand,-1,"ars",0,NULL,0,0,0,0,0}, @@ -1112,6 +1113,18 @@ void beforeSleep(struct aeEventLoop *eventLoop) { } } + /* Request ACK to slaves if we have pending MINREPLICAS transactions. */ + if (server.repl_request_ack) { + repl_request_ack = 0; + robj *argv[2]; + + argv[0] = createStringObject("REPLCONF",8); + argv[1] = createStrinbObject("SENDACK",7); + replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 2); + decrRefCount(ping_argv[0]); + decrRefCount(ping_argv[1]); + } + /* Write the AOF buffer on disk */ flushAppendOnlyFile(0); } |