From 0bf64399713699c1ed423530edbdbbb0f259edd0 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 27 May 2013 19:34:02 +0200 Subject: MINREPLICAS work in progress. --- src/multi.c | 29 +++++++++++++++++++++++++++++ src/redis.c | 13 +++++++++++++ 2 files changed, 42 insertions(+) diff --git a/src/multi.c b/src/multi.c index 7000c4841..833ff7e0a 100644 --- a/src/multi.c +++ b/src/multi.c @@ -35,6 +35,8 @@ void initClientMultiState(redisClient *c) { c->mstate.commands = NULL; c->mstate.count = 0; + c->mstate.minreplicas = 0; + c->mstate.minreplicas_timeout = 0; } /* Release all the resources associated with MULTI/EXEC state */ @@ -321,3 +323,30 @@ void unwatchCommand(redisClient *c) { c->flags &= (~REDIS_DIRTY_CAS); addReply(c,shared.ok); } + +/* ------------------------- MINREPLICAS implementation --------------------- */ + +/* MINREPLICAS */ +void minreplicasCommand(redisClient *c) { + long timeout, minreplicas; + + if (!(c->flags & REDIS_MULTI)) { + addReplyError(c,"MINREPLICAS without MULTI"); + return; + } + if (getLongFromObjectOrReply(c,object,&minreplicas, + "number of replicas is not an integer or out of range") != REDIS_OK) + return REDIS_ERR; + + if (getLongFromObjectOrReply(c,object,&timeout, + "timeout is not an integer or out of range") != REDIS_OK) + return REDIS_ERR; + + /* Force sane values. */ + if (timeout < 0) timeout = 0; + if (minreplicas < 0) minreplicas = 0; + + c->mstate.minreplicas = minreplicas; + c->mstate.minreplicas_timeout = timeout; + addReply(c,shared.ok); +} diff --git a/src/redis.c b/src/redis.c index d68837500..767411758 100644 --- a/src/redis.c +++ b/src/redis.c @@ -221,6 +221,7 @@ struct redisCommand redisCommandTable[] = { {"multi",multiCommand,1,"rs",0,NULL,0,0,0,0,0}, {"exec",execCommand,1,"sM",0,NULL,0,0,0,0,0}, {"discard",discardCommand,1,"rs",0,NULL,0,0,0,0,0}, + {"minreplicas",minreplicasCommand,3,"rs",0,NULL,0,0,0,0,0}, {"sync",syncCommand,1,"ars",0,NULL,0,0,0,0,0}, {"psync",syncCommand,3,"ars",0,NULL,0,0,0,0,0}, {"replconf",replconfCommand,-1,"ars",0,NULL,0,0,0,0,0}, @@ -1112,6 +1113,18 @@ void beforeSleep(struct aeEventLoop *eventLoop) { } } + /* Request ACK to slaves if we have pending MINREPLICAS transactions. */ + if (server.repl_request_ack) { + repl_request_ack = 0; + robj *argv[2]; + + argv[0] = createStringObject("REPLCONF",8); + argv[1] = createStrinbObject("SENDACK",7); + replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 2); + decrRefCount(ping_argv[0]); + decrRefCount(ping_argv[1]); + } + /* Write the AOF buffer on disk */ flushAppendOnlyFile(0); } -- cgit v1.2.1