summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2013-08-12 12:10:38 +0200
committerantirez <antirez@gmail.com>2013-08-12 12:50:29 +0200
commitdcc48a81436c08128a15f88170eee1287122e87a (patch)
treea96687efce0ecf26f95e0326dbed2f57a8881924
parentaa05128f51baf8063606770b608432e9ffd96981 (diff)
downloadredis-dcc48a81436c08128a15f88170eee1287122e87a.tar.gz
replicationFeedSlave() reworked for correctness and speed.
The previous code using a static buffer as an optimization was lame: 1) Premature optimization, actually it was *slower* than naive code because resulted into the creation / destruction of the object encapsulating the output buffer. 2) The code was very hard to test, since it was needed to have specific tests for command lines exceeding the size of the static buffer. 3) As a result of "2" the code was bugged as the current tests were not able to stress specific corner cases. It was replaced with easy to understand code that is safer and faster.
-rw-r--r--src/replication.c140
1 files changed, 42 insertions, 98 deletions
diff --git a/src/replication.c b/src/replication.c
index 487d3d4f9..e0f9646c3 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -138,15 +138,11 @@ void feedReplicationBacklogWithObject(robj *o) {
feedReplicationBacklog(p,len);
}
-#define FEEDSLAVE_BUF_SIZE (1024*64)
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
listNode *ln;
listIter li;
- int j, i, len;
- char buf[FEEDSLAVE_BUF_SIZE], *b = buf;
+ int j, len;
char llstr[REDIS_LONGSTR_SIZE];
- int buf_left = FEEDSLAVE_BUF_SIZE;
- robj *o;
/* If there aren't slaves, and there is no backlog buffer to populate,
* we can return ASAP. */
@@ -155,117 +151,66 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
/* We can't have slaves attached and no backlog. */
redisAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));
- /* What we do here is to try to write as much data as possible in a static
- * buffer "buf" that is used to create an object that is later sent to all
- * the slaves. This way we do the decoding only one time for most commands
- * not containing big payloads. */
-
- /* Create the SELECT command into the static buffer if needed. */
+ /* Send SELECT command to every slave if needed. */
if (server.slaveseldb != dictid) {
- char *selectcmd;
- size_t sclen;
+ robj *selectcmd;
+ /* For a few DBs we have pre-computed SELECT command. */
if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) {
- selectcmd = shared.select[dictid]->ptr;
- sclen = sdslen(selectcmd);
- memcpy(b,selectcmd,sclen);
- b += sclen;
- buf_left -= sclen;
+ selectcmd = shared.select[dictid];
} else {
int dictid_len;
dictid_len = ll2string(llstr,sizeof(llstr),dictid);
- sclen = snprintf(b,buf_left,"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
- dictid_len, llstr);
- b += sclen;
- buf_left -= sclen;
+ selectcmd = createObject(REDIS_STRING,
+ sdscatprintf(sdsempty(),
+ "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
+ dictid_len, llstr));
}
+
+ /* Add the SELECT command into the backlog. */
+ if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);
+
+ /* Send it to slaves. */
+ listRewind(slaves,&li);
+ while((ln = listNext(&li))) {
+ redisClient *slave = ln->value;
+ addReply(slave,selectcmd);
+ }
+
+ if (dictid < 0 || dictid >= REDIS_SHARED_SELECT_CMDS)
+ decrRefCount(selectcmd);
}
server.slaveseldb = dictid;
-
- /* Add the multi bulk reply size to the static buffer, that is, the number
- * of arguments of the command to send to every slave. */
- b[0] = '*';
- len = ll2string(b+1,REDIS_LONGSTR_SIZE,argc);
- b += len+1;
- buf_left -= len+1;
- b[0] = '\r';
- b[1] = '\n';
- b += 2;
- buf_left -= 2;
-
- /* Try to use the static buffer for as much arguments is possible. */
- for (j = 0; j < argc; j++) {
- int objlen;
- char *objptr;
- if (argv[j]->encoding != REDIS_ENCODING_RAW &&
- argv[j]->encoding != REDIS_ENCODING_INT &&
- argv[j]->encoding != REDIS_ENCODING_EMBSTR) {
- redisPanic("Unexpected encoding");
- }
- if (sdsEncodedObject(argv[j])) {
- objlen = sdslen(argv[j]->ptr);
- objptr = argv[j]->ptr;
- } else {
- objlen = ll2string(llstr,REDIS_LONGSTR_SIZE,(long)argv[j]->ptr);
- objptr = llstr;
- }
- /* We need enough space for bulk reply encoding, newlines, and
- * the data itself. */
- if (buf_left < objlen+REDIS_LONGSTR_SIZE+32) break;
-
- /* Write $...CRLF */
- b[0] = '$';
- len = ll2string(b+1,REDIS_LONGSTR_SIZE,objlen);
- b += len+1;
- buf_left -= len+1;
- b[0] = '\r';
- b[1] = '\n';
- b += 2;
- buf_left -= 2;
-
- /* And data plus CRLF */
- memcpy(b,objptr,objlen);
- b += objlen;
- buf_left -= objlen;
- b[0] = '\r';
- b[1] = '\n';
- b += 2;
- buf_left -= 2;
- }
-
- /* Create an object with the static buffer content. */
- redisAssert(buf_left < FEEDSLAVE_BUF_SIZE);
- o = createStringObject(buf,b-buf);
-
- /* If we have a backlog, populate it with data and increment
- * the global replication offset. */
+ /* Write the command to the replication backlog if any. */
if (server.repl_backlog) {
- feedReplicationBacklogWithObject(o);
- for (i = j; i < argc; i++) {
- char aux[REDIS_LONGSTR_SIZE+3];
- long objlen = stringObjectLen(argv[i]);
+ char aux[REDIS_LONGSTR_SIZE+3];
+
+ /* Add the multi bulk reply length. */
+ aux[0] = '*';
+ len = ll2string(aux+1,sizeof(aux-1),argc);
+ aux[len+1] = '\r';
+ aux[len+2] = '\n';
+ feedReplicationBacklog(aux,len+3);
+
+ for (j = 0; j < argc; j++) {
+ long objlen = stringObjectLen(argv[j]);
/* We need to feed the buffer with the object as a bulk reply
* not just as a plain string, so create the $..CRLF payload len
* ad add the final CRLF */
aux[0] = '$';
- len = ll2string(aux+1,objlen,sizeof(aux)-1);
+ len = ll2string(aux+1,sizeof(aux)-1,objlen);
aux[len+1] = '\r';
aux[len+2] = '\n';
feedReplicationBacklog(aux,len+3);
- feedReplicationBacklogWithObject(argv[i]);
- feedReplicationBacklogWithObject(shared.crlf);
+ feedReplicationBacklogWithObject(argv[j]);
+ feedReplicationBacklogWithObject(aux+len+1,2);
}
}
- /* Write data to slaves. Here we do two things:
- * 1) We write the "o" object that was created using the accumulated
- * static buffer.
- * 2) We write any additional argument of the command to replicate that
- * was not written inside the static buffer for lack of space.
- */
+ /* Write the command to every slave. */
listRewind(slaves,&li);
while((ln = listNext(&li))) {
redisClient *slave = ln->value;
@@ -277,15 +222,14 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
* are queued in the output buffer until the initial SYNC completes),
* or are already in sync with the master. */
- /* First, trasmit the object created from the static buffer. */
- addReply(slave,o);
+ /* Add the multi bulk length. */
+ addReplyMultiBulkLen(slave,argc);
/* Finally any additional argument that was not stored inside the
* static buffer if any (from j to argc). */
- for (i = j; i < argc; i++)
- addReplyBulk(slave,argv[i]);
+ for (j = 0; j < argc; j++)
+ addReplyBulk(slave,argv[j]);
}
- decrRefCount(o);
}
void replicationFeedMonitors(redisClient *c, list *monitors, int dictid, robj **argv, int argc) {