diff options
-rw-r--r-- | src/Makefile | 4 | ||||
-rw-r--r-- | src/anet.c | 7 | ||||
-rw-r--r-- | src/db.c | 15 | ||||
-rw-r--r-- | src/networking.c | 42 | ||||
-rw-r--r-- | src/rdb.c | 29 | ||||
-rw-r--r-- | src/replication.c | 9 | ||||
-rw-r--r-- | src/server.h | 6 |
7 files changed, 88 insertions, 24 deletions
diff --git a/src/Makefile b/src/Makefile index fec6573c2..8f429431b 100644 --- a/src/Makefile +++ b/src/Makefile @@ -214,8 +214,8 @@ $(REDIS_BENCHMARK_NAME): $(REDIS_BENCHMARK_OBJ) $(REDIS_CHECK_AOF_NAME): $(REDIS_CHECK_AOF_OBJ) $(REDIS_LD) -o $@ $^ $(FINAL_LIBS) -dict-benchmark: dict.c zmalloc.c sds.c - $(REDIS_CC) $(FINAL_CFLAGS) dict.c zmalloc.c sds.c siphash.c -D DICT_BENCHMARK_MAIN -o dict-benchmark +dict-benchmark: dict.c zmalloc.c sds.c siphash.c + $(REDIS_CC) $(FINAL_CFLAGS) $^ -D DICT_BENCHMARK_MAIN -o $@ $(FINAL_LIBS) # Because the jemalloc.h header is generated as a part of the jemalloc build, # building it should complete before building any other object. Instead of diff --git a/src/anet.c b/src/anet.c index ef1711d06..53a56b0d2 100644 --- a/src/anet.c +++ b/src/anet.c @@ -380,8 +380,10 @@ int anetUnixGenericConnect(char *err, char *path, int flags) sa.sun_family = AF_LOCAL; strncpy(sa.sun_path,path,sizeof(sa.sun_path)-1); if (flags & ANET_CONNECT_NONBLOCK) { - if (anetNonBlock(err,s) != ANET_OK) + if (anetNonBlock(err,s) != ANET_OK) { + close(s); return ANET_ERR; + } } if (connect(s,(struct sockaddr*)&sa,sizeof(sa)) == -1) { if (errno == EINPROGRESS && @@ -462,7 +464,7 @@ static int anetV6Only(char *err, int s) { static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog) { - int s, rv; + int s = -1, rv; char _port[6]; /* strlen("65535") */ struct addrinfo hints, *servinfo, *p; @@ -491,6 +493,7 @@ static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backl } error: + if (s != -1) close(s); s = ANET_ERR; end: freeaddrinfo(servinfo); @@ -1133,11 +1133,24 @@ int *getKeysUsingCommandTable(struct redisCommand *cmd,robj **argv, int argc, in *numkeys = 0; return NULL; } + last = cmd->lastkey; if (last < 0) last = argc+last; keys = zmalloc(sizeof(int)*((last - cmd->firstkey)+1)); for (j = cmd->firstkey; j <= last; j += cmd->keystep) { - serverAssert(j < argc); + if (j >= argc) { + /* Modules command do not have dispatch time arity checks, so + * we need to handle the case where the user passed an invalid + * number of arguments here. In this case we return no keys + * and expect the module command to report an arity error. */ + if (cmd->flags & CMD_MODULE) { + zfree(keys); + *numkeys = 0; + return NULL; + } else { + serverPanic("Redis built-in command declared keys positions not matching the arity requirements."); + } + } keys[i++] = j; } *numkeys = i; diff --git a/src/networking.c b/src/networking.c index fbab9970f..fae8e52bd 100644 --- a/src/networking.c +++ b/src/networking.c @@ -93,6 +93,7 @@ client *createClient(int fd) { c->name = NULL; c->bufpos = 0; c->querybuf = sdsempty(); + c->pending_querybuf = sdsempty(); c->querybuf_peak = 0; c->reqtype = 0; c->argc = 0; @@ -107,6 +108,7 @@ client *createClient(int fd) { c->replstate = REPL_STATE_NONE; c->repl_put_online_on_ack = 0; c->reploff = 0; + c->read_reploff = 0; c->repl_ack_off = 0; c->repl_ack_time = 0; c->slave_listening_port = 0; @@ -796,6 +798,7 @@ void freeClient(client *c) { /* Free the query buffer */ sdsfree(c->querybuf); + sdsfree(c->pending_querybuf); c->querybuf = NULL; /* Deallocate structures used to block on blocking ops. */ @@ -1318,8 +1321,13 @@ void processInputBuffer(client *c) { resetClient(c); } else { /* Only reset the client when the command was executed. */ - if (processCommand(c) == C_OK) + if (processCommand(c) == C_OK) { + if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { + /* Update the applied replication offset of our master. */ + c->reploff = c->read_reploff - sdslen(c->querybuf); + } resetClient(c); + } /* freeMemoryIfNeeded may flush slave output buffers. This may result * into a slave, that may be the active client, to be freed. */ if (server.current_client == NULL) break; @@ -1366,15 +1374,17 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { serverLog(LL_VERBOSE, "Client closed connection"); freeClient(c); return; + } else if (c->flags & CLIENT_MASTER) { + /* Append the query buffer to the pending (not applied) buffer + * of the master. We'll use this buffer later in order to have a + * copy of the string applied by the last command executed. */ + c->pending_querybuf = sdscatlen(c->pending_querybuf, + c->querybuf+qblen,nread); } sdsIncrLen(c->querybuf,nread); c->lastinteraction = server.unixtime; - if (c->flags & CLIENT_MASTER) { - c->reploff += nread; - replicationFeedSlavesFromMasterStream(server.slaves, - c->querybuf+qblen,nread); - } + if (c->flags & CLIENT_MASTER) c->read_reploff += nread; server.stat_net_input_bytes += nread; if (sdslen(c->querybuf) > server.client_max_querybuf_len) { sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty(); @@ -1386,7 +1396,25 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { freeClient(c); return; } - processInputBuffer(c); + + /* Time to process the buffer. If the client is a master we need to + * compute the difference between the applied offset before and after + * processing the buffer, to understand how much of the replication stream + * was actually applied to the master state: this quantity, and its + * corresponding part of the replication stream, will be propagated to + * the sub-slaves and to the replication backlog. */ + if (!(c->flags & CLIENT_MASTER)) { + processInputBuffer(c); + } else { + size_t prev_offset = c->reploff; + processInputBuffer(c); + size_t applied = c->reploff - prev_offset; + if (applied) { + replicationFeedSlavesFromMasterStream(server.slaves, + c->pending_querybuf, applied); + sdsrange(c->pending_querybuf,applied,-1); + } + } } void getClientsMaxBuffers(unsigned long *longest_output_list, @@ -704,23 +704,30 @@ ssize_t rdbSaveObject(rio *rdb, robj *o) { nwritten += n; } else if (o->encoding == OBJ_ENCODING_SKIPLIST) { zset *zs = o->ptr; - dictIterator *di = dictGetIterator(zs->dict); - dictEntry *de; + zskiplist *zsl = zs->zsl; - if ((n = rdbSaveLen(rdb,dictSize(zs->dict))) == -1) return -1; + if ((n = rdbSaveLen(rdb,zsl->length)) == -1) return -1; nwritten += n; - while((de = dictNext(di)) != NULL) { - sds ele = dictGetKey(de); - double *score = dictGetVal(de); - - if ((n = rdbSaveRawString(rdb,(unsigned char*)ele,sdslen(ele))) - == -1) return -1; + /* We save the skiplist elements from the greatest to the smallest + * (that's trivial since the elements are already ordered in the + * skiplist): this improves the load process, since the next loaded + * element will always be the smaller, so adding to the skiplist + * will always immediately stop at the head, making the insertion + * O(1) instead of O(log(N)). */ + zskiplistNode *zn = zsl->tail; + while (zn != NULL) { + if ((n = rdbSaveRawString(rdb, + (unsigned char*)zn->ele,sdslen(zn->ele))) == -1) + { + return -1; + } nwritten += n; - if ((n = rdbSaveBinaryDoubleValue(rdb,*score)) == -1) return -1; + if ((n = rdbSaveBinaryDoubleValue(rdb,zn->score)) == -1) + return -1; nwritten += n; + zn = zn->backward; } - dictReleaseIterator(di); } else { serverPanic("Unknown sorted set encoding"); } diff --git a/src/replication.c b/src/replication.c index c7a703b85..1828eb8bf 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1078,6 +1078,7 @@ void replicationCreateMasterClient(int fd, int dbid) { server.master->flags |= CLIENT_MASTER; server.master->authenticated = 1; server.master->reploff = server.master_initial_offset; + server.master->read_reploff = server.master->reploff; memcpy(server.master->replid, server.master_replid, sizeof(server.master_replid)); /* If master offset is set to -1, this master is old and is not @@ -2118,6 +2119,14 @@ void replicationCacheMaster(client *c) { /* Unlink the client from the server structures. */ unlinkClient(c); + /* Fix the master specific fields: we want to discard to non processed + * query buffers and non processed offsets, including pending + * transactions. */ + sdsclear(server.master->querybuf); + sdsclear(server.master->pending_querybuf); + server.master->read_reploff = server.master->reploff; + if (c->flags & CLIENT_MULTI) discardTransaction(c); + /* Save the master. Server.master will be set to null later by * replicationHandleMasterDisconnection(). */ server.cached_master = server.master; diff --git a/src/server.h b/src/server.h index 19be92ba2..8cc172149 100644 --- a/src/server.h +++ b/src/server.h @@ -663,6 +663,9 @@ typedef struct client { redisDb *db; /* Pointer to currently SELECTed DB. */ robj *name; /* As set by CLIENT SETNAME. */ sds querybuf; /* Buffer we use to accumulate client queries. */ + sds pending_querybuf; /* If this is a master, this buffer represents the + yet not applied replication stream that we + are receiving from the master. */ size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size. */ int argc; /* Num of arguments of current command. */ robj **argv; /* Arguments of current command. */ @@ -685,7 +688,8 @@ typedef struct client { off_t repldboff; /* Replication DB file offset. */ off_t repldbsize; /* Replication DB file size. */ sds replpreamble; /* Replication DB preamble. */ - long long reploff; /* Replication offset if this is our master. */ + long long read_reploff; /* Read replication offset if this is a master. */ + long long reploff; /* Applied replication offset if this is a master. */ long long repl_ack_off; /* Replication ack offset, if this is a slave. */ long long repl_ack_time;/* Replication ack time, if this is a slave. */ long long psync_initial_offset; /* FULLRESYNC reply offset other slaves |