summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r-- src/Makefile 4
-rw-r--r-- src/anet.c 7
-rw-r--r-- src/db.c 15
-rw-r--r-- src/networking.c 42
-rw-r--r-- src/rdb.c 29
-rw-r--r-- src/replication.c 9
-rw-r--r-- src/server.h 6
7 files changed, 88 insertions, 24 deletions
diff --git a/src/Makefile b/src/Makefile
index fec6573c2..8f429431b 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -214,8 +214,8 @@ $(REDIS_BENCHMARK_NAME): $(REDIS_BENCHMARK_OBJ)
$(REDIS_CHECK_AOF_NAME): $(REDIS_CHECK_AOF_OBJ)
$(REDIS_LD) -o $@ $^ $(FINAL_LIBS)
-dict-benchmark: dict.c zmalloc.c sds.c
- $(REDIS_CC) $(FINAL_CFLAGS) dict.c zmalloc.c sds.c siphash.c -D DICT_BENCHMARK_MAIN -o dict-benchmark
+dict-benchmark: dict.c zmalloc.c sds.c siphash.c
+ $(REDIS_CC) $(FINAL_CFLAGS) $^ -D DICT_BENCHMARK_MAIN -o $@ $(FINAL_LIBS)
# Because the jemalloc.h header is generated as a part of the jemalloc build,
# building it should complete before building any other object. Instead of
diff --git a/src/anet.c b/src/anet.c
index ef1711d06..53a56b0d2 100644
--- a/src/anet.c
+++ b/src/anet.c
@@ -380,8 +380,10 @@ int anetUnixGenericConnect(char *err, char *path, int flags)
sa.sun_family = AF_LOCAL;
strncpy(sa.sun_path,path,sizeof(sa.sun_path)-1);
if (flags & ANET_CONNECT_NONBLOCK) {
- if (anetNonBlock(err,s) != ANET_OK)
+ if (anetNonBlock(err,s) != ANET_OK) {
+ close(s);
return ANET_ERR;
+ }
}
if (connect(s,(struct sockaddr*)&sa,sizeof(sa)) == -1) {
if (errno == EINPROGRESS &&
@@ -462,7 +464,7 @@ static int anetV6Only(char *err, int s) {
static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog)
{
- int s, rv;
+ int s = -1, rv;
char _port[6]; /* strlen("65535") */
struct addrinfo hints, *servinfo, *p;
@@ -491,6 +493,7 @@ static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backl
}
error:
+ if (s != -1) close(s);
s = ANET_ERR;
end:
freeaddrinfo(servinfo);
diff --git a/src/db.c b/src/db.c
index 760843120..86dabac8f 100644
--- a/src/db.c
+++ b/src/db.c
@@ -1133,11 +1133,24 @@ int *getKeysUsingCommandTable(struct redisCommand *cmd,robj **argv, int argc, in
*numkeys = 0;
return NULL;
}
+
last = cmd->lastkey;
if (last < 0) last = argc+last;
keys = zmalloc(sizeof(int)*((last - cmd->firstkey)+1));
for (j = cmd->firstkey; j <= last; j += cmd->keystep) {
- serverAssert(j < argc);
+ if (j >= argc) {
+ /* Module commands do not have dispatch time arity checks, so
+ * we need to handle the case where the user passed an invalid
+ * number of arguments here. In this case we return no keys
+ * and expect the module command to report an arity error. */
+ if (cmd->flags & CMD_MODULE) {
+ zfree(keys);
+ *numkeys = 0;
+ return NULL;
+ } else {
+ serverPanic("Redis built-in command declared keys positions not matching the arity requirements.");
+ }
+ }
keys[i++] = j;
}
*numkeys = i;
diff --git a/src/networking.c b/src/networking.c
index fbab9970f..fae8e52bd 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -93,6 +93,7 @@ client *createClient(int fd) {
c->name = NULL;
c->bufpos = 0;
c->querybuf = sdsempty();
+ c->pending_querybuf = sdsempty();
c->querybuf_peak = 0;
c->reqtype = 0;
c->argc = 0;
@@ -107,6 +108,7 @@ client *createClient(int fd) {
c->replstate = REPL_STATE_NONE;
c->repl_put_online_on_ack = 0;
c->reploff = 0;
+ c->read_reploff = 0;
c->repl_ack_off = 0;
c->repl_ack_time = 0;
c->slave_listening_port = 0;
@@ -796,6 +798,7 @@ void freeClient(client *c) {
/* Free the query buffer */
sdsfree(c->querybuf);
+ sdsfree(c->pending_querybuf);
c->querybuf = NULL;
/* Deallocate structures used to block on blocking ops. */
@@ -1318,8 +1321,13 @@ void processInputBuffer(client *c) {
resetClient(c);
} else {
/* Only reset the client when the command was executed. */
- if (processCommand(c) == C_OK)
+ if (processCommand(c) == C_OK) {
+ if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
+ /* Update the applied replication offset of our master. */
+ c->reploff = c->read_reploff - sdslen(c->querybuf);
+ }
resetClient(c);
+ }
/* freeMemoryIfNeeded may flush slave output buffers. This may result
* into a slave, that may be the active client, to be freed. */
if (server.current_client == NULL) break;
@@ -1366,15 +1374,17 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
serverLog(LL_VERBOSE, "Client closed connection");
freeClient(c);
return;
+ } else if (c->flags & CLIENT_MASTER) {
+ /* Append the query buffer to the pending (not applied) buffer
+ * of the master. We'll use this buffer later in order to have a
+ * copy of the string applied by the last command executed. */
+ c->pending_querybuf = sdscatlen(c->pending_querybuf,
+ c->querybuf+qblen,nread);
}
sdsIncrLen(c->querybuf,nread);
c->lastinteraction = server.unixtime;
- if (c->flags & CLIENT_MASTER) {
- c->reploff += nread;
- replicationFeedSlavesFromMasterStream(server.slaves,
- c->querybuf+qblen,nread);
- }
+ if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
server.stat_net_input_bytes += nread;
if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
@@ -1386,7 +1396,25 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
freeClient(c);
return;
}
- processInputBuffer(c);
+
+ /* Time to process the buffer. If the client is a master we need to
+ * compute the difference between the applied offset before and after
+ * processing the buffer, to understand how much of the replication stream
+ * was actually applied to the master state: this quantity, and its
+ * corresponding part of the replication stream, will be propagated to
+ * the sub-slaves and to the replication backlog. */
+ if (!(c->flags & CLIENT_MASTER)) {
+ processInputBuffer(c);
+ } else {
+ size_t prev_offset = c->reploff;
+ processInputBuffer(c);
+ size_t applied = c->reploff - prev_offset;
+ if (applied) {
+ replicationFeedSlavesFromMasterStream(server.slaves,
+ c->pending_querybuf, applied);
+ sdsrange(c->pending_querybuf,applied,-1);
+ }
+ }
}
void getClientsMaxBuffers(unsigned long *longest_output_list,
diff --git a/src/rdb.c b/src/rdb.c
index 2689b172d..1a5a7b2c5 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -704,23 +704,30 @@ ssize_t rdbSaveObject(rio *rdb, robj *o) {
nwritten += n;
} else if (o->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = o->ptr;
- dictIterator *di = dictGetIterator(zs->dict);
- dictEntry *de;
+ zskiplist *zsl = zs->zsl;
- if ((n = rdbSaveLen(rdb,dictSize(zs->dict))) == -1) return -1;
+ if ((n = rdbSaveLen(rdb,zsl->length)) == -1) return -1;
nwritten += n;
- while((de = dictNext(di)) != NULL) {
- sds ele = dictGetKey(de);
- double *score = dictGetVal(de);
-
- if ((n = rdbSaveRawString(rdb,(unsigned char*)ele,sdslen(ele)))
- == -1) return -1;
+ /* We save the skiplist elements from the greatest to the smallest
+ * (that's trivial since the elements are already ordered in the
+ * skiplist): this improves the load process, since the next loaded
+ * element will always be the smaller, so adding to the skiplist
+ * will always immediately stop at the head, making the insertion
+ * O(1) instead of O(log(N)). */
+ zskiplistNode *zn = zsl->tail;
+ while (zn != NULL) {
+ if ((n = rdbSaveRawString(rdb,
+ (unsigned char*)zn->ele,sdslen(zn->ele))) == -1)
+ {
+ return -1;
+ }
nwritten += n;
- if ((n = rdbSaveBinaryDoubleValue(rdb,*score)) == -1) return -1;
+ if ((n = rdbSaveBinaryDoubleValue(rdb,zn->score)) == -1)
+ return -1;
nwritten += n;
+ zn = zn->backward;
}
- dictReleaseIterator(di);
} else {
serverPanic("Unknown sorted set encoding");
}
diff --git a/src/replication.c b/src/replication.c
index c7a703b85..1828eb8bf 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -1078,6 +1078,7 @@ void replicationCreateMasterClient(int fd, int dbid) {
server.master->flags |= CLIENT_MASTER;
server.master->authenticated = 1;
server.master->reploff = server.master_initial_offset;
+ server.master->read_reploff = server.master->reploff;
memcpy(server.master->replid, server.master_replid,
sizeof(server.master_replid));
/* If master offset is set to -1, this master is old and is not
@@ -2118,6 +2119,14 @@ void replicationCacheMaster(client *c) {
/* Unlink the client from the server structures. */
unlinkClient(c);
+ /* Fix the master specific fields: we want to discard the non processed
+ * query buffers and non processed offsets, including pending
+ * transactions. */
+ sdsclear(server.master->querybuf);
+ sdsclear(server.master->pending_querybuf);
+ server.master->read_reploff = server.master->reploff;
+ if (c->flags & CLIENT_MULTI) discardTransaction(c);
+
/* Save the master. Server.master will be set to null later by
* replicationHandleMasterDisconnection(). */
server.cached_master = server.master;
diff --git a/src/server.h b/src/server.h
index 19be92ba2..8cc172149 100644
--- a/src/server.h
+++ b/src/server.h
@@ -663,6 +663,9 @@ typedef struct client {
redisDb *db; /* Pointer to currently SELECTed DB. */
robj *name; /* As set by CLIENT SETNAME. */
sds querybuf; /* Buffer we use to accumulate client queries. */
+ sds pending_querybuf; /* If this is a master, this buffer represents the
not yet applied replication stream that we
+ are receiving from the master. */
size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size. */
int argc; /* Num of arguments of current command. */
robj **argv; /* Arguments of current command. */
@@ -685,7 +688,8 @@ typedef struct client {
off_t repldboff; /* Replication DB file offset. */
off_t repldbsize; /* Replication DB file size. */
sds replpreamble; /* Replication DB preamble. */
- long long reploff; /* Replication offset if this is our master. */
+ long long read_reploff; /* Read replication offset if this is a master. */
+ long long reploff; /* Applied replication offset if this is a master. */
long long repl_ack_off; /* Replication ack offset, if this is a slave. */
long long repl_ack_time;/* Replication ack time, if this is a slave. */
long long psync_initial_offset; /* FULLRESYNC reply offset other slaves