diff options
-rw-r--r-- | src/networking.c | 1 | ||||
-rw-r--r-- | src/redis.c | 3 | ||||
-rw-r--r-- | src/redis.h | 2 | ||||
-rw-r--r-- | src/replication.c | 122 |
4 files changed, 110 insertions, 18 deletions
diff --git a/src/networking.c b/src/networking.c index 1b1c91801..500df0d87 100644 --- a/src/networking.c +++ b/src/networking.c @@ -48,6 +48,7 @@ redisClient *createClient(int fd) { c->lastinteraction = time(NULL); c->authenticated = 0; c->replstate = REDIS_REPL_NONE; + c->slave_listening_port = 0; c->reply = listCreate(); c->reply_bytes = 0; listSetFreeMethod(c->reply,decrRefCount); diff --git a/src/redis.c b/src/redis.c index b13db8b87..3267744d2 100644 --- a/src/redis.c +++ b/src/redis.c @@ -173,6 +173,7 @@ struct redisCommand readonlyCommandTable[] = { {"exec",execCommand,1,REDIS_CMD_DENYOOM,execBlockClientOnSwappedKeys,0,0,0}, {"discard",discardCommand,1,0,NULL,0,0,0}, {"sync",syncCommand,1,0,NULL,0,0,0}, + {"replconf",replconfCommand,-1,0,NULL,0,0,0}, {"flushdb",flushdbCommand,1,0,NULL,0,0,0}, {"flushall",flushallCommand,1,0,NULL,0,0,0}, {"sort",sortCommand,-2,REDIS_CMD_DENYOOM,NULL,1,1,1}, @@ -1401,7 +1402,7 @@ sds genRedisInfoString(void) { } if (state == NULL) continue; info = sdscatprintf(info,"slave%d:%s,%d,%s\r\n", - slaveid,ip,port,state); + slaveid,ip,slave->slave_listening_port,state); slaveid++; } } diff --git a/src/redis.h b/src/redis.h index 86e78cac6..34b58d467 100644 --- a/src/redis.h +++ b/src/redis.h @@ -351,6 +351,7 @@ typedef struct redisClient { int repldbfd; /* replication DB file descriptor */ long repldboff; /* replication DB file offset */ off_t repldbsize; /* replication DB file size */ + int slave_listening_port; /* As configured with: SLAVECONF listening-port */ multiState mstate; /* MULTI/EXEC state */ blockingState bpop; /* blocking state */ list *io_keys; /* Keys this client is waiting to be loaded from the @@ -1071,6 +1072,7 @@ void watchCommand(redisClient *c); void unwatchCommand(redisClient *c); void objectCommand(redisClient *c); void clientCommand(redisClient *c); +void replconfCommand(redisClient *c); #if defined(__GNUC__) void *calloc(size_t count, size_t size) __attribute__ ((deprecated)); diff --git a/src/replication.c b/src/replication.c index 80d2d26bd..884b0c499 100644 --- a/src/replication.c +++ b/src/replication.c @@ -139,6 +139,46 @@ void syncCommand(redisClient *c) { return; } +/* REPLCONF <option> <value> <option> <value> ... + * This command is used by a slave in order to configure the replication + * process before starting it with the SYNC command. + * + * Currently the only use of this command is to communicate to the master + * what is the listening port of the Slave redis instance, so that the + * master can accurately list slaves and their listening ports in + * the INFO output. + * + * In the future the same command can be used in order to configure + * the replication to initiate an incremental replication instead of a + * full resync. */ +void replconfCommand(redisClient *c) { + int j; + + if ((c->argc % 2) == 0) { + /* Number of arguments must be odd to make sure that every + * option has a corresponding value. */ + addReply(c,shared.syntaxerr); + return; + } + + /* Process every option-value pair. */ + for (j = 1; j < c->argc; j+=2) { + if (!strcasecmp(c->argv[j]->ptr,"listening-port")) { + long port; + + if ((getLongFromObjectOrReply(c,c->argv[j+1], + &port,NULL) != REDIS_OK)) + return; + c->slave_listening_port = port; + } else { + addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s", + (char*)c->argv[j]->ptr); + return; + } + } + addReply(c,shared.ok); +} + void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) { redisClient *slave = privdata; REDIS_NOTUSED(el); @@ -193,6 +233,52 @@ void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) { } } +/* Send a synchronous command to the master. Used to send AUTH and + * REPLCONF commadns before starting the replication with SYNC. + * + * On success NULL is returned. + * On error an sds string describing the error is returned. + */ +char *sendSynchronousCommand(int fd, ...) { + va_list ap; + sds cmd = sdsempty(); + char *arg, buf[256]; + + /* Create the command to send to the master, we use simple inline + * protocol for simplicity as currently we only send simple strings. */ + va_start(ap,fd); + while(1) { + arg = va_arg(ap, char*); + if (arg == NULL) break; + + if (sdslen(cmd) != 0) cmd = sdscatlen(cmd," ",1); + cmd = sdscat(cmd,arg); + } + cmd = sdscatlen(cmd,"\r\n",2); + + /* Transfer command to the server. */ + if (syncWrite(fd,cmd,sdslen(cmd),server.repl_syncio_timeout*1000) == -1) { + sdsfree(cmd); + return sdscatprintf(sdsempty(),"Writing to master: %s", + strerror(errno)); + } + sdsfree(cmd); + + /* Read the reply from the server. */ + if (syncReadLine(fd,buf,sizeof(buf),server.repl_syncio_timeout) == -1) + { + return sdscatprintf(sdsempty(),"Reading from master: %s", + strerror(errno)); + } + + /* Check for errors from the server. */ + if (buf[0] != '+') { + return sdscatprintf(sdsempty(),"Error from master: %s", buf); + } + + return NULL; /* No errors. */ +} + /* This function is called at the end of every backgrond saving. * The argument bgsaveerr is REDIS_OK if the background saving succeeded * otherwise REDIS_ERR is passed to the function. @@ -360,7 +446,7 @@ error: } void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { - char buf[1024], tmpfile[256]; + char tmpfile[256], *err; int dfd, maxtries = 5; REDIS_NOTUSED(el); REDIS_NOTUSED(privdata); @@ -381,24 +467,26 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { /* AUTH with the master if required. */ if(server.masterauth) { - char authcmd[1024]; - size_t authlen; - - authlen = snprintf(authcmd,sizeof(authcmd),"AUTH %s\r\n",server.masterauth); - if (syncWrite(fd,authcmd,authlen,server.repl_syncio_timeout) == -1) { - redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s", - strerror(errno)); - goto error; - } - /* Read the AUTH result. */ - if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout) == -1) { - redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s", - strerror(errno)); + err = sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL); + if (err) { + redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",err); + sdsfree(err); goto error; } - if (buf[0] != '+') { - redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?"); - goto error; + } + + /* Set the slave port, so that Master's INFO command can list the + * slave listening port correctly. */ + { + sds port = sdsfromlonglong(server.port); + err = sendSynchronousCommand(fd,"REPLCONF","listening-port",port, + NULL); + sdsfree(port); + /* Ignore the error if any, not all the Redis versions support + * REPLCONF listening-port. */ + if (err) { + redisLog(REDIS_NOTICE,"(non critical): Master does not understand REPLCONF listening-port: %s", err); + sdsfree(err); } } |