-rw-r--r--  redis.conf                              |   14
-rw-r--r--  src/Makefile                            |    2
-rw-r--r--  src/aof.c                               |   15
-rw-r--r--  src/atomicvar.h                         |    2
-rw-r--r--  src/bitops.c                            |    5
-rw-r--r--  src/blocked.c                           |    7
-rw-r--r--  src/childinfo.c                         |   85
-rw-r--r--  src/cluster.c                           |   12
-rw-r--r--  src/config.c                            |   12
-rw-r--r--  src/db.c                                |  110
-rw-r--r--  src/debug.c                             |   64
-rw-r--r--  src/dict.c                              |  129
-rw-r--r--  src/dict.h                              |   21
-rw-r--r--  src/expire.c                            |  150
-rw-r--r--  src/geo.c                               |   43
-rw-r--r--  src/geohash_helper.c                    |   52
-rw-r--r--  src/hyperloglog.c                       |   41
-rw-r--r--  src/latency.c                           |    2
-rw-r--r--  src/lazyfree.c                          |    5
-rw-r--r--  src/module.c                            |  429
-rw-r--r--  src/modules/API.md                      |  198
-rw-r--r--  src/modules/BLOCK.md                    |  265
-rw-r--r--  src/modules/INTRO.md                    |    1
-rw-r--r--  src/modules/Makefile                    |   11
-rw-r--r--  src/modules/TYPES.md                    |   36
-rw-r--r--  src/modules/helloblock.c                |  122
-rw-r--r--  src/modules/hellotype.c                 |   15
-rw-r--r--  src/modules/helloworld.c                |    8
-rw-r--r--  src/modules/testmodule.c                |   41
-rw-r--r--  src/networking.c                        |   53
-rw-r--r--  src/object.c                            |  432
-rw-r--r--  src/quicklist.c                         |    6
-rw-r--r--  src/rdb.c                               |   97
-rw-r--r--  src/rdb.h                               |   12
-rw-r--r--  src/redis-benchmark.c                   |    2
-rw-r--r--  src/redis-cli.c                         |   15
-rwxr-xr-x  src/redis-trib.rb                       |    3
-rw-r--r--  src/redismodule.h                       |   41
-rw-r--r--  src/replication.c                       |  490
-rw-r--r--  src/sentinel.c                          |   12
-rw-r--r--  src/server.c                            |  129
-rw-r--r--  src/server.h                            |  137
-rw-r--r--  src/t_set.c                             |    2
-rw-r--r--  src/t_string.c                          |    2
-rw-r--r--  src/t_zset.c                            |   21
-rw-r--r--  src/ziplist.c                           |   62
-rw-r--r--  src/ziplist.h                           |    1
-rw-r--r--  src/zmalloc.c                           |   25
-rw-r--r--  src/zmalloc.h                           |    4
-rw-r--r--  tests/assets/default.conf               |    1
-rw-r--r--  tests/integration/psync2.tcl            |  182
-rw-r--r--  tests/integration/replication-3.tcl     |   12
-rw-r--r--  tests/support/server.tcl                |    2
-rw-r--r--  tests/test_helper.tcl                   |    2
-rw-r--r--  tests/unit/bitfield.tcl                 |    9
-rw-r--r--  tests/unit/geo.tcl                      |   19
-rw-r--r--  tests/unit/other.tcl                    |    2
-rw-r--r--  tests/unit/wait.tcl                     |   42
-rw-r--r--  utils/hyperloglog/hll-gnuplot-graph.rb  |    2
59 files changed, 3167 insertions, 549 deletions
diff --git a/redis.conf b/redis.conf
index 53d519658..14b5ca979 100644
--- a/redis.conf
+++ b/redis.conf
@@ -185,6 +185,14 @@ logfile ""
# dbid is a number between 0 and 'databases'-1
databases 16
+# By default Redis shows an ASCII art logo only when started to log to the
+# standard output and if the standard output is a TTY. Basically this means
+# that normally a logo is displayed only in interactive sessions.
+#
+# However it is possible to force the pre-4.0 behavior and always show an
+# ASCII art logo in startup logs by setting the following option to yes.
+always-show-logo yes
+
################################ SNAPSHOTTING ################################
#
# Save the DB on disk:
@@ -402,6 +410,10 @@ repl-disable-tcp-nodelay no
# need to elapse, starting from the time the last slave disconnected, for
# the backlog buffer to be freed.
#
+# Note that slaves never free the backlog for timeout, since they may be
+# promoted to masters later, and should be able to correctly "partially
+# resynchronize" with the slaves: hence they should always accumulate backlog.
+#
# A value of 0 means to never release the backlog.
#
# repl-backlog-ttl 3600
@@ -448,7 +460,7 @@ slave-priority 100
# offers this information, which is used, among other tools, by
# Redis Sentinel in order to discover slave instances.
# Another place where this info is available is in the output of the
-# "ROLE" command of a masteer.
+# "ROLE" command of a master.
#
# The listed IP and address normally reported by a slave is obtained
# in the following way:
diff --git a/src/Makefile b/src/Makefile
index 6bd8d8d66..2bf3c9347 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -128,7 +128,7 @@ endif
REDIS_SERVER_NAME=redis-server
REDIS_SENTINEL_NAME=redis-sentinel
-REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o
+REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o
REDIS_CLI_NAME=redis-cli
REDIS_CLI_OBJ=anet.o adlist.o redis-cli.o zmalloc.o release.o anet.o ae.o crc64.o
REDIS_BENCHMARK_NAME=redis-benchmark
diff --git a/src/aof.c b/src/aof.c
index 5523066b5..07d8561da 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -653,7 +653,7 @@ int loadAppendOnlyFile(char *filename) {
serverLog(LL_NOTICE,"Reading RDB preamble from AOF file...");
if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
rioInitWithFile(&rdb,fp);
- if (rdbLoadRio(&rdb) != C_OK) {
+ if (rdbLoadRio(&rdb,NULL) != C_OK) {
serverLog(LL_WARNING,"Error reading the RDB preamble of the AOF file, AOF loading aborted");
goto readerr;
} else {
@@ -1023,6 +1023,10 @@ int rewriteModuleObject(rio *r, robj *key, robj *o) {
moduleType *mt = mv->type;
moduleInitIOContext(io,mt,r);
mt->aof_rewrite(&io,key,mv->value);
+ if (io.ctx) {
+ moduleFreeContext(io.ctx);
+ zfree(io.ctx);
+ }
return io.error ? 0 : 1;
}
@@ -1148,7 +1152,7 @@ int rewriteAppendOnlyFile(char *filename) {
if (server.aof_use_rdb_preamble) {
int error;
- if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE) == C_ERR) {
+ if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE,NULL) == C_ERR) {
errno = error;
goto werr;
}
@@ -1319,6 +1323,7 @@ int rewriteAppendOnlyFileBackground(void) {
if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
if (aofCreatePipes() != C_OK) return C_ERR;
+ openChildInfoPipe();
start = ustime();
if ((childpid = fork()) == 0) {
char tmpfile[256];
@@ -1328,13 +1333,16 @@ int rewriteAppendOnlyFileBackground(void) {
redisSetProcTitle("redis-aof-rewrite");
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
- size_t private_dirty = zmalloc_get_private_dirty();
+ size_t private_dirty = zmalloc_get_private_dirty(-1);
if (private_dirty) {
serverLog(LL_NOTICE,
"AOF rewrite: %zu MB of memory used by copy-on-write",
private_dirty/(1024*1024));
}
+
+ server.child_info_data.cow_size = private_dirty;
+ sendChildInfo(CHILD_INFO_TYPE_AOF);
exitFromChild(0);
} else {
exitFromChild(1);
@@ -1345,6 +1353,7 @@ int rewriteAppendOnlyFileBackground(void) {
server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
if (childpid == -1) {
+ closeChildInfoPipe();
serverLog(LL_WARNING,
"Can't rewrite append only file in background: fork: %s",
strerror(errno));
diff --git a/src/atomicvar.h b/src/atomicvar.h
index 3489972d2..4aa8fa173 100644
--- a/src/atomicvar.h
+++ b/src/atomicvar.h
@@ -51,7 +51,7 @@
#ifndef __ATOMIC_VAR_H
#define __ATOMIC_VAR_H
-#if defined(__ATOMIC_RELAXED)
+#if defined(__ATOMIC_RELAXED) && (!defined(__clang__) || !defined(__APPLE__) || __apple_build_version__ > 4210057)
/* Implementation using __atomic macros. */
#define atomicIncr(var,count,mutex) __atomic_add_fetch(&var,(count),__ATOMIC_RELAXED)
diff --git a/src/bitops.c b/src/bitops.c
index 302e811d2..46eee22c3 100644
--- a/src/bitops.c
+++ b/src/bitops.c
@@ -907,7 +907,7 @@ void bitfieldCommand(client *c) {
struct bitfieldOp *ops = NULL; /* Array of ops to execute at end. */
int owtype = BFOVERFLOW_WRAP; /* Overflow type. */
int readonly = 1;
- long higest_write_offset = 0;
+ size_t higest_write_offset = 0;
for (j = 2; j < c->argc; j++) {
int remargs = c->argc-j-1; /* Remaining args other than current. */
@@ -957,7 +957,8 @@ void bitfieldCommand(client *c) {
if (opcode != BITFIELDOP_GET) {
readonly = 0;
- higest_write_offset = bitoffset + bits - 1;
+ if (higest_write_offset < bitoffset + bits - 1)
+ higest_write_offset = bitoffset + bits - 1;
/* INCRBY and SET require another argument. */
if (getLongLongFromObjectOrReply(c,c->argv[j+3],&i64,NULL) != C_OK){
zfree(ops);
diff --git a/src/blocked.c b/src/blocked.c
index d22872548..54b26b713 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -136,6 +136,8 @@ void unblockClient(client *c) {
unblockClientWaitingData(c);
} else if (c->btype == BLOCKED_WAIT) {
unblockClientWaitingReplicas(c);
+ } else if (c->btype == BLOCKED_MODULE) {
+ unblockClientFromModule(c);
} else {
serverPanic("Unknown btype in unblockClient().");
}
@@ -153,12 +155,15 @@ void unblockClient(client *c) {
}
/* This function gets called when a blocked client timed out in order to
- * send it a reply of some kind. */
+ * send it a reply of some kind. After this function is called,
+ * unblockClient() will be called with the same client as argument. */
void replyToBlockedClientTimedOut(client *c) {
if (c->btype == BLOCKED_LIST) {
addReply(c,shared.nullmultibulk);
} else if (c->btype == BLOCKED_WAIT) {
addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset));
+ } else if (c->btype == BLOCKED_MODULE) {
+ moduleBlockedClientTimedOut(c);
} else {
serverPanic("Unknown btype in replyToBlockedClientTimedOut().");
}
diff --git a/src/childinfo.c b/src/childinfo.c
new file mode 100644
index 000000000..719025e8c
--- /dev/null
+++ b/src/childinfo.c
@@ -0,0 +1,85 @@
+/*
+ * Copyright (c) 2016, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Redis nor the names of its contributors may be used
+ * to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "server.h"
+#include <unistd.h>
+
+/* Open a child-parent channel used in order to move information about the
+ * RDB / AOF saving process from the child to the parent (for instance
+ * the amount of copy on write memory used) */
+void openChildInfoPipe(void) {
+ if (pipe(server.child_info_pipe) == -1) {
+ /* On error our two file descriptors should still be set to -1,
+ * but we call closeChildInfoPipe() anyway since it can't hurt. */
+ closeChildInfoPipe();
+ } else if (anetNonBlock(NULL,server.child_info_pipe[0]) != ANET_OK) {
+ closeChildInfoPipe();
+ } else {
+ memset(&server.child_info_data,0,sizeof(server.child_info_data));
+ }
+}
+
+/* Close the pipes opened with openChildInfoPipe(). */
+void closeChildInfoPipe(void) {
+ if (server.child_info_pipe[0] != -1 ||
+ server.child_info_pipe[1] != -1)
+ {
+ close(server.child_info_pipe[0]);
+ close(server.child_info_pipe[1]);
+ server.child_info_pipe[0] = -1;
+ server.child_info_pipe[1] = -1;
+ }
+}
+
+/* Send COW data to parent. The child should call this function after populating
+ * the corresponding fields it wants to send (according to the process type). */
+void sendChildInfo(int ptype) {
+ if (server.child_info_pipe[1] == -1) return;
+ server.child_info_data.magic = CHILD_INFO_MAGIC;
+ server.child_info_data.process_type = ptype;
+ ssize_t wlen = sizeof(server.child_info_data);
+ if (write(server.child_info_pipe[1],&server.child_info_data,wlen) != wlen) {
+ /* Nothing to do on error, this will be detected by the other side. */
+ }
+}
+
+/* Receive COW data from the child. */
+void receiveChildInfo(void) {
+ if (server.child_info_pipe[0] == -1) return;
+ ssize_t wlen = sizeof(server.child_info_data);
+ if (read(server.child_info_pipe[0],&server.child_info_data,wlen) == wlen &&
+ server.child_info_data.magic == CHILD_INFO_MAGIC)
+ {
+ if (server.child_info_data.process_type == CHILD_INFO_TYPE_RDB) {
+ server.stat_rdb_cow_bytes = server.child_info_data.cow_size;
+ } else if (server.child_info_data.process_type == CHILD_INFO_TYPE_AOF) {
+ server.stat_aof_cow_bytes = server.child_info_data.cow_size;
+ }
+ }
+}
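The new childinfo.c channel above is consumed by the fork points shown later in this diff (for instance the rewriteAppendOnlyFileBackground() hunk in aof.c). What follows is a rough, standalone sketch of the same pattern, not part of the patch: the struct, magic value and names are hypothetical stand-ins for server.child_info_data, but the flow — child writes one fixed-size record over a pipe, parent reads and validates it after the child exits — matches the code above.

/* Standalone sketch of the child-info pipe pattern (illustrative only). */
#include <stdio.h>
#include <stdint.h>
#include <unistd.h>
#include <sys/wait.h>

struct child_info { uint64_t magic; int process_type; size_t cow_size; };
#define INFO_MAGIC 0xC0571234DEADBEEFULL

int main(void) {
    int fds[2];
    if (pipe(fds) == -1) { perror("pipe"); return 1; }

    pid_t pid = fork();
    if (pid == 0) {                       /* Child: do the work, then report. */
        struct child_info info = { INFO_MAGIC, 1 /* e.g. RDB */, 4096 };
        if (write(fds[1], &info, sizeof(info)) != sizeof(info)) {
            /* Nothing to do on error: the parent detects the short read. */
        }
        _exit(0);
    }

    waitpid(pid, NULL, 0);                /* Parent: child finished, read report. */
    struct child_info info;
    if (read(fds[0], &info, sizeof(info)) == sizeof(info) &&
        info.magic == INFO_MAGIC)
    {
        printf("child reported %zu copy-on-write bytes\n", info.cow_size);
    }
    close(fds[0]); close(fds[1]);
    return 0;
}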
diff --git a/src/cluster.c b/src/cluster.c
index 9289f6782..6e3fc8b00 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -129,7 +129,7 @@ int clusterLoadConfig(char *filename) {
/* Skip blank lines, they can be created either by users manually
* editing nodes.conf or by the config writing process if stopped
* before the truncate() call. */
- if (line[0] == '\n') continue;
+ if (line[0] == '\n' || line[0] == '\0') continue;
/* Split the line into arguments for processing. */
argv = sdssplitargs(line,&argc);
@@ -2774,7 +2774,7 @@ void clusterHandleSlaveFailover(void) {
* and wait for replies), and the failover retry time (the time to wait
* before trying to get voted again).
*
- * Timeout is MIN(NODE_TIMEOUT*2,2000) milliseconds.
+ * Timeout is MAX(NODE_TIMEOUT*2,2000) milliseconds.
* Retry is two times the Timeout.
*/
auth_timeout = server.cluster_node_timeout*2;
@@ -4617,7 +4617,7 @@ void restoreCommand(client *c) {
/* Create the key and set the TTL if any */
dbAdd(c->db,c->argv[1],obj);
- if (ttl) setExpire(c->db,c->argv[1],mstime()+ttl);
+ if (ttl) setExpire(c,c->db,c->argv[1],mstime()+ttl);
signalModifiedKey(c->db,c->argv[1]);
addReply(c,shared.ok);
server.dirty++;
@@ -4750,7 +4750,6 @@ void migrateCommand(client *c) {
int copy, replace, j;
long timeout;
long dbid;
- long long ttl, expireat;
robj **ov = NULL; /* Objects to migrate. */
robj **kv = NULL; /* Key names. */
robj **newargv = NULL; /* Used to rewrite the command as DEL ... keys ... */
@@ -4765,7 +4764,6 @@ void migrateCommand(client *c) {
/* Initialization */
copy = 0;
replace = 0;
- ttl = 0;
/* Parse additional options */
for (j = 6; j < c->argc; j++) {
@@ -4841,7 +4839,9 @@ try_again:
/* Create RESTORE payload and generate the protocol to call the command. */
for (j = 0; j < num_keys; j++) {
- expireat = getExpire(c->db,kv[j]);
+ long long ttl = 0;
+ long long expireat = getExpire(c->db,kv[j]);
+
if (expireat != -1) {
ttl = expireat-mstime();
if (ttl < 1) ttl = 1;
diff --git a/src/config.c b/src/config.c
index 1d81180b7..54af5bfe0 100644
--- a/src/config.c
+++ b/src/config.c
@@ -283,6 +283,10 @@ void loadServerConfigFromString(char *config) {
}
fclose(logfp);
}
+ } else if (!strcasecmp(argv[0],"always-show-logo") && argc == 2) {
+ if ((server.always_show_logo = yesnotoi(argv[1])) == -1) {
+ err = "argument must be 'yes' or 'no'"; goto loaderr;
+ }
} else if (!strcasecmp(argv[0],"syslog-enabled") && argc == 2) {
if ((server.syslog_enabled = yesnotoi(argv[1])) == -1) {
err = "argument must be 'yes' or 'no'"; goto loaderr;
@@ -616,8 +620,9 @@ void loadServerConfigFromString(char *config) {
unsigned long long hard, soft;
int soft_seconds;
- if (class == -1) {
- err = "Unrecognized client limit class";
+ if (class == -1 || class == CLIENT_TYPE_MASTER) {
+ err = "Unrecognized client limit class: the user specified "
+ "an invalid one, or 'master' which has no buffer limits.";
goto loaderr;
}
hard = memtoll(argv[2],NULL);
@@ -906,7 +911,8 @@ void configSetCommand(client *c) {
long val;
if ((j % 4) == 0) {
- if (getClientTypeByName(v[j]) == -1) {
+ int class = getClientTypeByName(v[j]);
+ if (class == -1 || class == CLIENT_TYPE_MASTER) {
sdsfreesplitres(v,vlen);
goto badfmt;
}
diff --git a/src/db.c b/src/db.c
index d33c810b3..90a75fcfe 100644
--- a/src/db.c
+++ b/src/db.c
@@ -190,7 +190,9 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) {
*
* 1) The ref count of the value object is incremented.
* 2) clients WATCHing for the destination key notified.
- * 3) The expire time of the key is reset (the key is made persistent). */
+ * 3) The expire time of the key is reset (the key is made persistent).
+ *
+ * All the new keys in the database should be created via this interface. */
void setKey(redisDb *db, robj *key, robj *val) {
if (lookupKeyWrite(db,key) == NULL) {
dbAdd(db,key,val);
@@ -330,6 +332,7 @@ long long emptyDb(int dbnum, int flags, void(callback)(void*)) {
slotToKeyFlush();
}
}
+ if (dbnum == -1) flushSlaveKeysWithExpireList();
return removed;
}
@@ -413,7 +416,7 @@ void flushallCommand(client *c) {
/* Normally rdbSave() will reset dirty, but we don't want this here
* as otherwise FLUSHALL will not be replicated nor put into the AOF. */
int saved_dirty = server.dirty;
- rdbSave(server.rdb_filename);
+ rdbSave(server.rdb_filename,NULL);
server.dirty = saved_dirty;
}
server.dirty++;
@@ -471,7 +474,7 @@ void selectCommand(client *c) {
return;
}
if (selectDb(c,id) == C_ERR) {
- addReplyError(c,"invalid DB index");
+ addReplyError(c,"DB index is out of range");
} else {
addReply(c,shared.ok);
}
@@ -851,7 +854,7 @@ void renameGenericCommand(client *c, int nx) {
dbDelete(c->db,c->argv[2]);
}
dbAdd(c->db,c->argv[2],o);
- if (expire != -1) setExpire(c->db,c->argv[2],expire);
+ if (expire != -1) setExpire(c,c->db,c->argv[2],expire);
dbDelete(c->db,c->argv[1]);
signalModifiedKey(c->db,c->argv[1]);
signalModifiedKey(c->db,c->argv[2]);
@@ -917,7 +920,7 @@ void moveCommand(client *c) {
return;
}
dbAdd(dst,c->argv[1],o);
- if (expire != -1) setExpire(dst,c->argv[1],expire);
+ if (expire != -1) setExpire(c,dst,c->argv[1],expire);
incrRefCount(o);
/* OK! key moved, free the entry in the source DB */
@@ -926,6 +929,91 @@ void moveCommand(client *c) {
addReply(c,shared.cone);
}
+/* Helper function for dbSwapDatabases(): scans the list of keys that have
+ * one or more blocked clients for B[LR]POP or other list blocking commands
+ * and signal the keys are ready if they are lists. See the comment where
+ * the function is used for more info. */
+void scanDatabaseForReadyLists(redisDb *db) {
+ dictEntry *de;
+ dictIterator *di = dictGetSafeIterator(db->blocking_keys);
+ while((de = dictNext(di)) != NULL) {
+ robj *key = dictGetKey(de);
+ robj *value = lookupKey(db,key,LOOKUP_NOTOUCH);
+ if (value && value->type == OBJ_LIST)
+ signalListAsReady(db, key);
+ }
+ dictReleaseIterator(di);
+}
+
+/* Swap two databases at runtime so that all clients will magically see
+ * the new database even if already connected. Note that the client
+ * structure c->db points to a given DB, so we need to be smarter and
+ * swap the underlying referenced structures, otherwise we would need
+ * to fix all the references to the Redis DB structure.
+ *
+ * Returns C_ERR if at least one of the DB ids is out of range, otherwise
+ * C_OK is returned. */
+int dbSwapDatabases(int id1, int id2) {
+ if (id1 < 0 || id1 >= server.dbnum ||
+ id2 < 0 || id2 >= server.dbnum) return C_ERR;
+ if (id1 == id2) return C_OK;
+ redisDb aux = server.db[id1];
+ redisDb *db1 = &server.db[id1], *db2 = &server.db[id2];
+
+ /* Swap hash tables. Note that we don't swap blocking_keys,
+ * ready_keys and watched_keys, since we want clients to
+ * remain in the same DB they were. */
+ db1->dict = db2->dict;
+ db1->expires = db2->expires;
+ db1->avg_ttl = db2->avg_ttl;
+
+ db2->dict = aux.dict;
+ db2->expires = aux.expires;
+ db2->avg_ttl = aux.avg_ttl;
+
+ /* Now we need to handle clients blocked on lists: as an effect
+ * of swapping the two DBs, a client that was waiting for list
+ * X in a given DB, may now actually be unblocked if X happens
+ * to exist in the new version of the DB, after the swap.
+ *
+ * However normally we only do this check for efficiency reasons
+ * in dbAdd() when a list is created. So here we need to rescan
+ * the list of clients blocked on lists and signal lists as ready
+ * if needed. */
+ scanDatabaseForReadyLists(db1);
+ scanDatabaseForReadyLists(db2);
+ return C_OK;
+}
+
+/* SWAPDB db1 db2 */
+void swapdbCommand(client *c) {
+ long id1, id2;
+
+ /* Not allowed in cluster mode: we have just DB 0 there. */
+ if (server.cluster_enabled) {
+ addReplyError(c,"SWAPDB is not allowed in cluster mode");
+ return;
+ }
+
+ /* Get the two DBs indexes. */
+ if (getLongFromObjectOrReply(c, c->argv[1], &id1,
+ "invalid first DB index") != C_OK)
+ return;
+
+ if (getLongFromObjectOrReply(c, c->argv[2], &id2,
+ "invalid second DB index") != C_OK)
+ return;
+
+ /* Swap... */
+ if (dbSwapDatabases(id1,id2) == C_ERR) {
+ addReplyError(c,"DB index is out of range");
+ return;
+ } else {
+ server.dirty++;
+ addReply(c,shared.ok);
+ }
+}
+
/*-----------------------------------------------------------------------------
* Expires API
*----------------------------------------------------------------------------*/
@@ -937,14 +1025,22 @@ int removeExpire(redisDb *db, robj *key) {
return dictDelete(db->expires,key->ptr) == DICT_OK;
}
-void setExpire(redisDb *db, robj *key, long long when) {
+/* Set an expire to the specified key. If the expire is set in the context
+ * of an user calling a command 'c' is the client, otherwise 'c' is set
+ * to NULL. The 'when' parameter is the absolute unix time in milliseconds
+ * after which the key will no longer be considered valid. */
+void setExpire(client *c, redisDb *db, robj *key, long long when) {
dictEntry *kde, *de;
/* Reuse the sds from the main dict in the expire dict */
kde = dictFind(db->dict,key->ptr);
serverAssertWithInfo(NULL,key,kde != NULL);
- de = dictReplaceRaw(db->expires,dictGetKey(kde));
+ de = dictAddOrFind(db->expires,dictGetKey(kde));
dictSetSignedIntegerVal(de,when);
+
+ int writable_slave = server.masterhost && server.repl_slave_ro == 0;
+ if (c && writable_slave && !(c->flags & CLIENT_MASTER))
+ rememberSlaveKeyWithExpire(db,key);
}
/* Return the expire time of the specified key, or -1 if no expire
diff --git a/src/debug.c b/src/debug.c
index 6f0b5e702..b8ad4e511 100644
--- a/src/debug.c
+++ b/src/debug.c
@@ -33,6 +33,7 @@
#include <arpa/inet.h>
#include <signal.h>
+#include <dlfcn.h>
#ifdef HAVE_BACKTRACE
#include <execinfo.h>
@@ -40,7 +41,6 @@
#include <fcntl.h>
#include "bio.h"
#include <unistd.h>
-#include <dlfcn.h>
#endif /* HAVE_BACKTRACE */
#ifdef __CYGWIN__
@@ -252,14 +252,6 @@ void computeDatasetDigest(unsigned char *final) {
}
}
-#if defined(USE_JEMALLOC)
-void inputCatSds(void *result, const char *str) {
- /* result is actually a (sds *), so re-cast it here */
- sds *info = (sds *)result;
- *info = sdscat(*info, str);
-}
-#endif
-
void debugCommand(client *c) {
if (c->argc == 1) {
addReplyError(c,"You must specify a subcommand for DEBUG. Try DEBUG HELP for info.");
@@ -288,6 +280,8 @@ void debugCommand(client *c) {
blen++; addReplyStatus(c,
"sdslen <key> -- Show low level SDS string info representing key and value.");
blen++; addReplyStatus(c,
+ "ziplist <key> -- Show low level info about the ziplist encoding.");
+ blen++; addReplyStatus(c,
"populate <count> [prefix] -- Create <count> string keys named key:<num>. If a prefix is specified is used instead of the 'key' prefix.");
blen++; addReplyStatus(c,
"digest -- Outputs an hex signature representing the current DB content.");
@@ -303,10 +297,6 @@ void debugCommand(client *c) {
"structsize -- Return the size of different Redis core C structures.");
blen++; addReplyStatus(c,
"htstats <dbid> -- Return hash table statistics of the specified Redis database.");
- blen++; addReplyStatus(c,
- "jemalloc info -- Show internal jemalloc statistics.");
- blen++; addReplyStatus(c,
- "jemalloc purge -- Force jemalloc to release unused memory.");
setDeferredMultiBulkLength(c,blenp,blen);
} else if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
*((char*)-1) = 'x';
@@ -332,12 +322,12 @@ void debugCommand(client *c) {
if (c->argc >= 3) c->argv[2] = tryObjectEncoding(c->argv[2]);
serverAssertWithInfo(c,c->argv[0],1 == 2);
} else if (!strcasecmp(c->argv[1]->ptr,"reload")) {
- if (rdbSave(server.rdb_filename) != C_OK) {
+ if (rdbSave(server.rdb_filename,NULL) != C_OK) {
addReply(c,shared.err);
return;
}
emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
- if (rdbLoad(server.rdb_filename) != C_OK) {
+ if (rdbLoad(server.rdb_filename,NULL) != C_OK) {
addReplyError(c,"Error trying to load the RDB dump");
return;
}
@@ -421,12 +411,26 @@ void debugCommand(client *c) {
addReplyError(c,"Not an sds encoded string.");
} else {
addReplyStatusFormat(c,
- "key_sds_len:%lld, key_sds_avail:%lld, "
- "val_sds_len:%lld, val_sds_avail:%lld",
+ "key_sds_len:%lld, key_sds_avail:%lld, key_zmalloc: %lld, "
+ "val_sds_len:%lld, val_sds_avail:%lld, val_zmalloc: %lld",
(long long) sdslen(key),
(long long) sdsavail(key),
+ (long long) sdsZmallocSize(key),
(long long) sdslen(val->ptr),
- (long long) sdsavail(val->ptr));
+ (long long) sdsavail(val->ptr),
+ (long long) getStringObjectSdsUsedMemory(val));
+ }
+ } else if (!strcasecmp(c->argv[1]->ptr,"ziplist") && c->argc == 3) {
+ robj *o;
+
+ if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nokeyerr))
+ == NULL) return;
+
+ if (o->encoding != OBJ_ENCODING_ZIPLIST) {
+ addReplyError(c,"Not an sds encoded string.");
+ } else {
+ ziplistRepr(o->ptr);
+ addReplyStatus(c,"Ziplist structure printed on stdout");
}
} else if (!strcasecmp(c->argv[1]->ptr,"populate") &&
(c->argc == 3 || c->argc == 4)) {
@@ -520,30 +524,6 @@ void debugCommand(client *c) {
stats = sdscat(stats,buf);
addReplyBulkSds(c,stats);
- } else if (!strcasecmp(c->argv[1]->ptr,"jemalloc") && c->argc == 3) {
-#if defined(USE_JEMALLOC)
- if (!strcasecmp(c->argv[2]->ptr, "info")) {
- sds info = sdsempty();
- je_malloc_stats_print(inputCatSds, &info, NULL);
- addReplyBulkSds(c, info);
- } else if (!strcasecmp(c->argv[2]->ptr, "purge")) {
- char tmp[32];
- unsigned narenas = 0;
- size_t sz = sizeof(unsigned);
- if (!je_mallctl("arenas.narenas", &narenas, &sz, NULL, 0)) {
- sprintf(tmp, "arena.%d.purge", narenas);
- if (!je_mallctl(tmp, NULL, 0, NULL, 0)) {
- addReply(c, shared.ok);
- return;
- }
- }
- addReplyError(c, "Error purging dirty pages");
- } else {
- addReplyErrorFormat(c, "Valid jemalloc debug fields: info, purge");
- }
-#else
- addReplyErrorFormat(c, "jemalloc support not available");
-#endif
} else {
addReplyErrorFormat(c, "Unknown DEBUG subcommand or wrong number of arguments for '%s'",
(char*)c->argv[1]->ptr);
diff --git a/src/dict.c b/src/dict.c
index e37b659e6..b9b2390f1 100644
--- a/src/dict.c
+++ b/src/dict.c
@@ -66,23 +66,11 @@ static unsigned int dict_force_resize_ratio = 5;
static int _dictExpandIfNeeded(dict *ht);
static unsigned long _dictNextPower(unsigned long size);
-static int _dictKeyIndex(dict *ht, const void *key);
+static int _dictKeyIndex(dict *ht, const void *key, unsigned int hash, dictEntry **existing);
static int _dictInit(dict *ht, dictType *type, void *privDataPtr);
/* -------------------------- hash functions -------------------------------- */
-/* Thomas Wang's 32 bit Mix Function */
-unsigned int dictIntHashFunction(unsigned int key)
-{
- key += ~(key << 15);
- key ^= (key >> 10);
- key += (key << 3);
- key ^= (key >> 6);
- key += ~(key << 11);
- key ^= (key >> 16);
- return key;
-}
-
static uint32_t dict_hash_function_seed = 5381;
void dictSetHashFunctionSeed(uint32_t seed) {
@@ -325,29 +313,32 @@ static void _dictRehashStep(dict *d) {
/* Add an element to the target hash table */
int dictAdd(dict *d, void *key, void *val)
{
- dictEntry *entry = dictAddRaw(d,key);
+ dictEntry *entry = dictAddRaw(d,key,NULL);
if (!entry) return DICT_ERR;
dictSetVal(d, entry, val);
return DICT_OK;
}
-/* Low level add. This function adds the entry but instead of setting
- * a value returns the dictEntry structure to the user, that will make
- * sure to fill the value field as he wishes.
+/* Low level add or find:
+ * This function adds the entry but instead of setting a value returns the
+ * dictEntry structure to the user, that will make sure to fill the value
+ * field as he wishes.
*
* This function is also directly exposed to the user API to be called
* mainly in order to store non-pointers inside the hash value, example:
*
- * entry = dictAddRaw(dict,mykey);
+ * entry = dictAddRaw(dict,mykey,NULL);
* if (entry != NULL) dictSetSignedIntegerVal(entry,1000);
*
* Return values:
*
- * If key already exists NULL is returned.
+ * If key already exists NULL is returned, and "*existing" is populated
+ * with the existing entry if existing is not NULL.
+ *
* If key was added, the hash entry is returned to be manipulated by the caller.
*/
-dictEntry *dictAddRaw(dict *d, void *key)
+dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
{
int index;
dictEntry *entry;
@@ -357,7 +348,7 @@ dictEntry *dictAddRaw(dict *d, void *key)
/* Get the index of the new element, or -1 if
* the element already exists. */
- if ((index = _dictKeyIndex(d, key)) == -1)
+ if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1)
return NULL;
/* Allocate the memory and store the new entry.
@@ -375,51 +366,57 @@ dictEntry *dictAddRaw(dict *d, void *key)
return entry;
}
-/* Add an element, discarding the old if the key already exists.
+/* Add or Overwrite:
+ * Add an element, discarding the old value if the key already exists.
* Return 1 if the key was added from scratch, 0 if there was already an
* element with such key and dictReplace() just performed a value update
* operation. */
int dictReplace(dict *d, void *key, void *val)
{
- dictEntry *entry, auxentry;
+ dictEntry *entry, *existing, auxentry;
/* Try to add the element. If the key
* does not exists dictAdd will suceed. */
- if (dictAdd(d, key, val) == DICT_OK)
+ entry = dictAddRaw(d,key,&existing);
+ if (entry) {
+ dictSetVal(d, entry, val);
return 1;
- /* It already exists, get the entry */
- entry = dictFind(d, key);
+ }
+
/* Set the new value and free the old one. Note that it is important
* to do that in this order, as the value may just be exactly the same
* as the previous one. In this context, think to reference counting,
* you want to increment (set), and then decrement (free), and not the
* reverse. */
- auxentry = *entry;
- dictSetVal(d, entry, val);
+ auxentry = *existing;
+ dictSetVal(d, existing, val);
dictFreeVal(d, &auxentry);
return 0;
}
-/* dictReplaceRaw() is simply a version of dictAddRaw() that always
+/* Add or Find:
+ * dictAddOrFind() is simply a version of dictAddRaw() that always
* returns the hash entry of the specified key, even if the key already
* exists and can't be added (in that case the entry of the already
* existing key is returned.)
*
* See dictAddRaw() for more information. */
-dictEntry *dictReplaceRaw(dict *d, void *key) {
- dictEntry *entry = dictFind(d,key);
-
- return entry ? entry : dictAddRaw(d,key);
+dictEntry *dictAddOrFind(dict *d, void *key) {
+ dictEntry *entry, *existing;
+ entry = dictAddRaw(d,key,&existing);
+ return entry ? entry : existing;
}
-/* Search and remove an element */
-static int dictGenericDelete(dict *d, const void *key, int nofree)
-{
+/* Search and remove an element. This is a helper function for
+ * dictDelete() and dictUnlink(), please check the top comment
+ * of those functions. */
+static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) {
unsigned int h, idx;
dictEntry *he, *prevHe;
int table;
- if (d->ht[0].size == 0) return DICT_ERR; /* d->ht[0].table is NULL */
+ if (d->ht[0].used == 0 && d->ht[1].used == 0) return NULL;
+
if (dictIsRehashing(d)) _dictRehashStep(d);
h = dictHashKey(d, key);
@@ -437,27 +434,59 @@ static int dictGenericDelete(dict *d, const void *key, int nofree)
if (!nofree) {
dictFreeKey(d, he);
dictFreeVal(d, he);
+ zfree(he);
}
- zfree(he);
d->ht[table].used--;
- return DICT_OK;
+ return he;
}
prevHe = he;
he = he->next;
}
if (!dictIsRehashing(d)) break;
}
- return DICT_ERR; /* not found */
+ return NULL; /* not found */
}
+/* Remove an element, returning DICT_OK on success or DICT_ERR if the
+ * element was not found. */
int dictDelete(dict *ht, const void *key) {
- return dictGenericDelete(ht,key,0);
+ return dictGenericDelete(ht,key,0) ? DICT_OK : DICT_ERR;
}
-int dictDeleteNoFree(dict *ht, const void *key) {
+/* Remove an element from the table, but without actually releasing
+ * the key, value and dictionary entry. The dictionary entry is returned
+ * if the element was found (and unlinked from the table), and the user
+ * should later call `dictFreeUnlinkedEntry()` with it in order to release it.
+ * Otherwise if the key is not found, NULL is returned.
+ *
+ * This function is useful when we want to remove something from the hash
+ * table but want to use its value before actually deleting the entry.
+ * Without this function the pattern would require two lookups:
+ *
+ * entry = dictFind(...);
+ * // Do something with entry
+ * dictDelete(dictionary,entry);
+ *
+ * Thanks to this function it is possible to avoid this, and use
+ * instead:
+ *
+ * entry = dictUnlink(dictionary,entry);
+ * // Do something with entry
+ * dictFreeUnlinkedEntry(entry); // <- This does not need to lookup again.
+ */
+dictEntry *dictUnlink(dict *ht, const void *key) {
return dictGenericDelete(ht,key,1);
}
+/* You need to call this function to really free the entry after a call
+ * to dictUnlink(). It's safe to call this function with 'he' = NULL. */
+void dictFreeUnlinkedEntry(dict *d, dictEntry *he) {
+ if (he == NULL) return;
+ dictFreeKey(d, he);
+ dictFreeVal(d, he);
+ zfree(he);
+}
+
/* Destroy an entire dictionary */
int _dictClear(dict *d, dictht *ht, void(callback)(void *)) {
unsigned long i;
@@ -966,27 +995,29 @@ static unsigned long _dictNextPower(unsigned long size)
/* Returns the index of a free slot that can be populated with
* a hash entry for the given 'key'.
- * If the key already exists, -1 is returned.
+ * If the key already exists, -1 is returned
+ * and the optional output parameter may be filled.
*
* Note that if we are in the process of rehashing the hash table, the
* index is always returned in the context of the second (new) hash table. */
-static int _dictKeyIndex(dict *d, const void *key)
+static int _dictKeyIndex(dict *d, const void *key, unsigned int hash, dictEntry **existing)
{
- unsigned int h, idx, table;
+ unsigned int idx, table;
dictEntry *he;
+ if (existing) *existing = NULL;
/* Expand the hash table if needed */
if (_dictExpandIfNeeded(d) == DICT_ERR)
return -1;
- /* Compute the key hash value */
- h = dictHashKey(d, key);
for (table = 0; table <= 1; table++) {
- idx = h & d->ht[table].sizemask;
+ idx = hash & d->ht[table].sizemask;
/* Search if this slot does not already contain the given key */
he = d->ht[table].table[idx];
while(he) {
- if (key==he->key || dictCompareKeys(d, key, he->key))
+ if (key==he->key || dictCompareKeys(d, key, he->key)) {
+ if (existing) *existing = he;
return -1;
+ }
he = he->next;
}
if (!dictIsRehashing(d)) break;
diff --git a/src/dict.h b/src/dict.h
index 967a238b6..04b247a25 100644
--- a/src/dict.h
+++ b/src/dict.h
@@ -106,19 +106,19 @@ typedef void (dictScanFunction)(void *privdata, const dictEntry *de);
#define dictSetVal(d, entry, _val_) do { \
if ((d)->type->valDup) \
- entry->v.val = (d)->type->valDup((d)->privdata, _val_); \
+ (entry)->v.val = (d)->type->valDup((d)->privdata, _val_); \
else \
- entry->v.val = (_val_); \
+ (entry)->v.val = (_val_); \
} while(0)
#define dictSetSignedIntegerVal(entry, _val_) \
- do { entry->v.s64 = _val_; } while(0)
+ do { (entry)->v.s64 = _val_; } while(0)
#define dictSetUnsignedIntegerVal(entry, _val_) \
- do { entry->v.u64 = _val_; } while(0)
+ do { (entry)->v.u64 = _val_; } while(0)
#define dictSetDoubleVal(entry, _val_) \
- do { entry->v.d = _val_; } while(0)
+ do { (entry)->v.d = _val_; } while(0)
#define dictFreeKey(d, entry) \
if ((d)->type->keyDestructor) \
@@ -126,9 +126,9 @@ typedef void (dictScanFunction)(void *privdata, const dictEntry *de);
#define dictSetKey(d, entry, _key_) do { \
if ((d)->type->keyDup) \
- entry->key = (d)->type->keyDup((d)->privdata, _key_); \
+ (entry)->key = (d)->type->keyDup((d)->privdata, _key_); \
else \
- entry->key = (_key_); \
+ (entry)->key = (_key_); \
} while(0)
#define dictCompareKeys(d, key1, key2) \
@@ -150,11 +150,12 @@ typedef void (dictScanFunction)(void *privdata, const dictEntry *de);
dict *dictCreate(dictType *type, void *privDataPtr);
int dictExpand(dict *d, unsigned long size);
int dictAdd(dict *d, void *key, void *val);
-dictEntry *dictAddRaw(dict *d, void *key);
+dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing);
+dictEntry *dictAddOrFind(dict *d, void *key);
int dictReplace(dict *d, void *key, void *val);
-dictEntry *dictReplaceRaw(dict *d, void *key);
int dictDelete(dict *d, const void *key);
-int dictDeleteNoFree(dict *d, const void *key);
+dictEntry *dictUnlink(dict *ht, const void *key);
+void dictFreeUnlinkedEntry(dict *d, dictEntry *he);
void dictRelease(dict *d);
dictEntry * dictFind(dict *d, const void *key);
void *dictFetchValue(dict *d, const void *key);
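A minimal usage sketch of the reworked API declared above: dictAddRaw() with the new 'existing' out-parameter performs an upsert with a single lookup, while dictUnlink()/dictFreeUnlinkedEntry() remove an entry but let the caller use its value before releasing it (the same pattern dbAsyncDelete() adopts in the lazyfree.c hunk below). The helper function names here are illustrative and not part of the patch.

#include "dict.h"

/* Increment a signed-integer counter stored as the entry value. */
void upsertCounter(dict *d, void *key) {
    dictEntry *existing, *entry = dictAddRaw(d, key, &existing);
    if (entry) {
        dictSetSignedIntegerVal(entry, 1);      /* Key added: start at 1. */
    } else {
        dictSetSignedIntegerVal(existing,
            dictGetSignedIntegerVal(existing) + 1);
    }
}

/* Remove a key, using its value before the entry is actually freed. */
int removeAndUse(dict *d, const void *key) {
    dictEntry *de = dictUnlink(d, key);         /* Single lookup. */
    if (de == NULL) return 0;
    void *val = dictGetVal(de);                 /* Still valid here. */
    (void)val;                                  /* ... use the value ... */
    dictFreeUnlinkedEntry(d, de);               /* Now really release it. */
    return 1;
}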
diff --git a/src/expire.c b/src/expire.c
index ccfa959ef..637139f63 100644
--- a/src/expire.c
+++ b/src/expire.c
@@ -217,6 +217,154 @@ void activeExpireCycle(int type) {
}
/*-----------------------------------------------------------------------------
+ * Expires of keys created in writable slaves
+ *
+ * Normally slaves do not process expires: they wait for the masters to synthesize
+ * DEL operations in order to retain consistency. However writable slaves are
+ * an exception: if a key is created in the slave and an expire is assigned
+ * to it, we need a way to expire such a key, since the master does not know
+ * anything about such a key.
+ *
+ * In order to do so, we track keys created in the slave side with an expire
+ * set, and call the expireSlaveKeys() function from time to time in order to
+ * reclaim the keys if they already expired.
+ *
+ * Note that the use case we are trying to cover here, is a popular one where
+ * slaves are put in writable mode in order to compute slow operations in
+ * the slave side that are mostly useful to actually read data in a more
+ * processed way. Think of set intersections in a tmp key, with an expire so
+ * that it is also used as a cache to avoid intersecting every time.
+ *
+ * This implementation is currently not perfect but a lot better than leaking
+ * the keys as implemented in 3.2.
+ *----------------------------------------------------------------------------*/
+
+/* The dictionary where we remember key names and database ID of keys we may
+ * want to expire from the slave. Since this function is not often used we
+ * don't even care to initialize the database at startup. We'll do it once
+ * the feature is used the first time, that is, when rememberSlaveKeyWithExpire()
+ * is called.
+ *
+ * The dictionary has an SDS string representing the key as the hash table
+ * key, while the value is a 64 bit unsigned integer with the bits corresponding
+ * to the DB where the keys may exist set to 1. Currently the keys created
+ * with a DB id > 63 are not expired, but a trivial fix is to set the bitmap
+ * to the max 64 bit unsigned value when we know there is a key with a DB
+ * ID greater than 63, and check all the configured DBs in such a case. */
+dict *slaveKeysWithExpire = NULL;
+
+/* Check the set of keys created in the writable slave with an expire set in order to
+ * check if they should be evicted. */
+void expireSlaveKeys(void) {
+ if (slaveKeysWithExpire == NULL ||
+ dictSize(slaveKeysWithExpire) == 0) return;
+
+ int cycles = 0, noexpire = 0;
+ mstime_t start = mstime();
+ while(1) {
+ dictEntry *de = dictGetRandomKey(slaveKeysWithExpire);
+ sds keyname = dictGetKey(de);
+ uint64_t dbids = dictGetUnsignedIntegerVal(de);
+ uint64_t new_dbids = 0;
+
+ /* Check the key against every database corresponding to the
+ * bits set in the value bitmap. */
+ int dbid = 0;
+ while(dbids && dbid < server.dbnum) {
+ if ((dbids & 1) != 0) {
+ redisDb *db = server.db+dbid;
+ dictEntry *expire = dictFind(db->expires,keyname);
+ int expired = 0;
+
+ if (expire &&
+ activeExpireCycleTryExpire(server.db+dbid,expire,start))
+ {
+ expired = 1;
+ }
+
+ /* If the key was not expired in this DB, we need to set the
+ * corresponding bit in the new bitmap we set as value.
+ * At the end of the loop if the bitmap is zero, it means we
+ * no longer need to keep track of this key. */
+ if (expire && !expired) {
+ noexpire++;
+ new_dbids |= (uint64_t)1 << dbid;
+ }
+ }
+ dbid++;
+ dbids >>= 1;
+ }
+
+ /* Set the new bitmap as value of the key, in the dictionary
+ * of keys with an expire set directly in the writable slave. Otherwise
+ * if the bitmap is zero, we no longer need to keep track of it. */
+ if (new_dbids)
+ dictSetUnsignedIntegerVal(de,new_dbids);
+ else
+ dictDelete(slaveKeysWithExpire,keyname);
+
+ /* Stop conditions: found 3 keys we can't expire in a row or
+ * time limit was reached. */
+ cycles++;
+ if (noexpire > 3) break;
+ if ((cycles % 64) == 0 && mstime()-start > 1) break;
+ if (dictSize(slaveKeysWithExpire) == 0) break;
+ }
+}
+
+/* Track keys that received an EXPIRE or similar command in the context
+ * of a writable slave. */
+void rememberSlaveKeyWithExpire(redisDb *db, robj *key) {
+ if (slaveKeysWithExpire == NULL) {
+ static dictType dt = {
+ dictSdsHash, /* hash function */
+ NULL, /* key dup */
+ NULL, /* val dup */
+ dictSdsKeyCompare, /* key compare */
+ dictSdsDestructor, /* key destructor */
+ NULL /* val destructor */
+ };
+ slaveKeysWithExpire = dictCreate(&dt,NULL);
+ }
+ if (db->id > 63) return;
+
+ dictEntry *de = dictAddOrFind(slaveKeysWithExpire,key->ptr);
+ /* If the entry was just created, set it to a copy of the SDS string
+ * representing the key: we don't want to need to take those keys
+ * in sync with the main DB. The keys will be removed by expireSlaveKeys()
+ * as it scans to find keys to remove. */
+ if (de->key == key->ptr) {
+ de->key = sdsdup(key->ptr);
+ dictSetUnsignedIntegerVal(de,0);
+ }
+
+ uint64_t dbids = dictGetUnsignedIntegerVal(de);
+ dbids |= (uint64_t)1 << db->id;
+ dictSetUnsignedIntegerVal(de,dbids);
+}
+
+/* Return the number of keys we are tracking. */
+size_t getSlaveKeyWithExpireCount(void) {
+ if (slaveKeysWithExpire == NULL) return 0;
+ return dictSize(slaveKeysWithExpire);
+}
+
+/* Remove the keys in the hash table. We need to do that when data is
+ * flushed from the server. We may receive new keys from the master with
+ * the same name/db and it is no longer a good idea to expire them.
+ *
+ * Note: technically we should handle the case of a single DB being flushed
+ * but it is not worth it since anyway race conditions using the same set
+ * of key names in a writable slave and in its master will lead to
+ * inconsistencies. This is just a best-effort thing we do. */
+void flushSlaveKeysWithExpireList(void) {
+ if (slaveKeysWithExpire) {
+ dictRelease(slaveKeysWithExpire);
+ slaveKeysWithExpire = NULL;
+ }
+}
+
+/*-----------------------------------------------------------------------------
* Expires Commands
*----------------------------------------------------------------------------*/
@@ -265,7 +413,7 @@ void expireGenericCommand(client *c, long long basetime, int unit) {
addReply(c, shared.cone);
return;
} else {
- setExpire(c->db,key,when);
+ setExpire(c,c->db,key,when);
addReply(c,shared.cone);
signalModifiedKey(c->db,key);
notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id);
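A standalone sketch (not part of the patch) of the 64-bit DB-id bitmap bookkeeping that slaveKeysWithExpire uses above: one bit per database ID 0..63 records where a given key name may hold a slave-side expire, and expireSlaveKeys() walks those bits exactly as in the loop below.

#include <stdio.h>
#include <stdint.h>

int main(void) {
    uint64_t dbids = 0;
    dbids |= (uint64_t)1 << 0;      /* Key seen with an expire in DB 0... */
    dbids |= (uint64_t)1 << 5;      /* ...and in DB 5. */

    /* Scan the set bits the same way expireSlaveKeys() does. */
    int dbid = 0;
    uint64_t bits = dbids;
    while (bits) {
        if (bits & 1) printf("check DB %d for this key\n", dbid);
        dbid++;
        bits >>= 1;
    }
    return 0;
}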
diff --git a/src/geo.c b/src/geo.c
index 331d22435..8423931af 100644
--- a/src/geo.c
+++ b/src/geo.c
@@ -161,7 +161,7 @@ double extractDistanceOrReply(client *c, robj **argv,
addReplyError(c,"radius cannot be negative");
return -1;
}
-
+
double to_meters = extractUnitOrReply(c,argv[1]);
if (to_meters < 0) {
return -1;
@@ -326,6 +326,7 @@ int membersOfGeoHashBox(robj *zobj, GeoHashBits hash, geoArray *ga, double lon,
int membersOfAllNeighbors(robj *zobj, GeoHashRadius n, double lon, double lat, double radius, geoArray *ga) {
GeoHashBits neighbors[9];
unsigned int i, count = 0, last_processed = 0;
+ int debugmsg = 0;
neighbors[0] = n.hash;
neighbors[1] = n.neighbors.north;
@@ -340,8 +341,26 @@ int membersOfAllNeighbors(robj *zobj, GeoHashRadius n, double lon, double lat, d
/* For each neighbor (*and* our own hashbox), get all the matching
* members and add them to the potential result list. */
for (i = 0; i < sizeof(neighbors) / sizeof(*neighbors); i++) {
- if (HASHISZERO(neighbors[i]))
+ if (HASHISZERO(neighbors[i])) {
+ if (debugmsg) D("neighbors[%d] is zero",i);
continue;
+ }
+
+ /* Debugging info. */
+ if (debugmsg) {
+ GeoHashRange long_range, lat_range;
+ geohashGetCoordRange(&long_range,&lat_range);
+ GeoHashArea myarea = {{0}};
+ geohashDecode(long_range, lat_range, neighbors[i], &myarea);
+
+ /* Dump center square. */
+ D("neighbors[%d]:\n",i);
+ D("area.longitude.min: %f\n", myarea.longitude.min);
+ D("area.longitude.max: %f\n", myarea.longitude.max);
+ D("area.latitude.min: %f\n", myarea.latitude.min);
+ D("area.latitude.max: %f\n", myarea.latitude.max);
+ D("\n");
+ }
/* When a huge Radius (in the 5000 km range or more) is used,
* adjacent neighbors can be the same, leading to duplicated
@@ -350,7 +369,11 @@ int membersOfAllNeighbors(robj *zobj, GeoHashRadius n, double lon, double lat, d
if (last_processed &&
neighbors[i].bits == neighbors[last_processed].bits &&
neighbors[i].step == neighbors[last_processed].step)
+ {
+ if (debugmsg)
+ D("Skipping processing of %d, same as previous\n",i);
continue;
+ }
count += membersOfGeoHashBox(zobj, neighbors[i], ga, lon, lat, radius);
last_processed = i;
}
@@ -661,16 +684,15 @@ void geohashCommand(client *c) {
int j;
/* Look up the requested zset */
- robj *zobj = NULL;
- if ((zobj = lookupKeyReadOrReply(c, c->argv[1], shared.emptymultibulk))
- == NULL || checkType(c, zobj, OBJ_ZSET)) return;
+ robj *zobj = lookupKeyRead(c->db, c->argv[1]);
+ if (zobj && checkType(c, zobj, OBJ_ZSET)) return;
/* Geohash elements one after the other, using a null bulk reply for
* missing elements. */
addReplyMultiBulkLen(c,c->argc-2);
for (j = 2; j < c->argc; j++) {
double score;
- if (zsetScore(zobj, c->argv[j]->ptr, &score) == C_ERR) {
+ if (!zobj || zsetScore(zobj, c->argv[j]->ptr, &score) == C_ERR) {
addReply(c,shared.nullbulk);
} else {
/* The internal format we use for geocoding is a bit different
@@ -715,16 +737,15 @@ void geoposCommand(client *c) {
int j;
/* Look up the requested zset */
- robj *zobj = NULL;
- if ((zobj = lookupKeyReadOrReply(c, c->argv[1], shared.emptymultibulk))
- == NULL || checkType(c, zobj, OBJ_ZSET)) return;
+ robj *zobj = lookupKeyRead(c->db, c->argv[1]);
+ if (zobj && checkType(c, zobj, OBJ_ZSET)) return;
/* Report elements one after the other, using a null bulk reply for
* missing elements. */
addReplyMultiBulkLen(c,c->argc-2);
for (j = 2; j < c->argc; j++) {
double score;
- if (zsetScore(zobj, c->argv[j]->ptr, &score) == C_ERR) {
+ if (!zobj || zsetScore(zobj, c->argv[j]->ptr, &score) == C_ERR) {
addReply(c,shared.nullmultibulk);
} else {
/* Decode... */
@@ -759,7 +780,7 @@ void geodistCommand(client *c) {
/* Look up the requested zset */
robj *zobj = NULL;
- if ((zobj = lookupKeyReadOrReply(c, c->argv[1], shared.emptybulk))
+ if ((zobj = lookupKeyReadOrReply(c, c->argv[1], shared.nullbulk))
== NULL || checkType(c, zobj, OBJ_ZSET)) return;
/* Get the scores. We need both otherwise NULL is returned. */
diff --git a/src/geohash_helper.c b/src/geohash_helper.c
index 139bcea11..77d8ab392 100644
--- a/src/geohash_helper.c
+++ b/src/geohash_helper.c
@@ -82,30 +82,18 @@ uint8_t geohashEstimateStepsByRadius(double range_meters, double lat) {
return step;
}
+/* Return the bounding box of the search area centered at latitude,longitude
+ * having a radius of radius_meters. bounds[0] - bounds[2] is the minimum
+ * and maximum longitude, while bounds[1] - bounds[3] is the minimum and
+ * maximum latitude. */
int geohashBoundingBox(double longitude, double latitude, double radius_meters,
double *bounds) {
if (!bounds) return 0;
- double lonr, latr;
- lonr = deg_rad(longitude);
- latr = deg_rad(latitude);
-
- if (radius_meters > EARTH_RADIUS_IN_METERS)
- radius_meters = EARTH_RADIUS_IN_METERS;
- double distance = radius_meters / EARTH_RADIUS_IN_METERS;
- double min_latitude = latr - distance;
- double max_latitude = latr + distance;
-
- /* Note: we're being lazy and not accounting for coordinates near poles */
- double min_longitude, max_longitude;
- double difference_longitude = asin(sin(distance) / cos(latr));
- min_longitude = lonr - difference_longitude;
- max_longitude = lonr + difference_longitude;
-
- bounds[0] = rad_deg(min_longitude);
- bounds[1] = rad_deg(min_latitude);
- bounds[2] = rad_deg(max_longitude);
- bounds[3] = rad_deg(max_latitude);
+ bounds[0] = longitude - rad_deg(radius_meters/EARTH_RADIUS_IN_METERS/cos(deg_rad(latitude)));
+ bounds[2] = longitude + rad_deg(radius_meters/EARTH_RADIUS_IN_METERS/cos(deg_rad(latitude)));
+ bounds[1] = latitude - rad_deg(radius_meters/EARTH_RADIUS_IN_METERS);
+ bounds[3] = latitude + rad_deg(radius_meters/EARTH_RADIUS_IN_METERS);
return 1;
}
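In formula form, the simplified bounding box computed above is the small-angle approximation below, where r is the search radius in meters, R is EARTH_RADIUS_IN_METERS, and (λ, φ) is the center longitude/latitude in degrees (the cosine is taken on the latitude converted to radians, as deg_rad() does in the code):

\[
\Delta\varphi = \frac{180}{\pi}\,\frac{r}{R},
\qquad
\Delta\lambda = \frac{180}{\pi}\,\frac{r}{R\,\cos\varphi}
\]
\[
\mathtt{bounds} = [\,\lambda - \Delta\lambda,\;\; \varphi - \Delta\varphi,\;\; \lambda + \Delta\lambda,\;\; \varphi + \Delta\varphi\,]
\]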
@@ -158,35 +146,13 @@ GeoHashRadius geohashGetAreasByRadius(double longitude, double latitude, double
< radius_meters) decrease_step = 1;
}
- if (decrease_step) {
+ if (steps > 1 && decrease_step) {
steps--;
geohashEncode(&long_range,&lat_range,longitude,latitude,steps,&hash);
geohashNeighbors(&hash,&neighbors);
geohashDecode(long_range,lat_range,hash,&area);
}
- /* Example debug info. This turns to be very useful every time there is
- * to investigate radius search potential bugs. So better to leave it
- * here. */
- if (0) {
- GeoHashArea myarea = {{0}};
- geohashDecode(long_range, lat_range, neighbors.west, &myarea);
-
- /* Dump West. */
- D("Neighbors");
- D("area.longitude.min: %f\n", myarea.longitude.min);
- D("area.longitude.max: %f\n", myarea.longitude.max);
- D("area.latitude.min: %f\n", myarea.latitude.min);
- D("area.latitude.max: %f\n", myarea.latitude.max);
-
- /* Dump center square. */
- D("Area");
- D("area.longitude.min: %f\n", area.longitude.min);
- D("area.longitude.max: %f\n", area.longitude.max);
- D("area.latitude.min: %f\n", area.latitude.min);
- D("area.latitude.max: %f\n", area.latitude.max);
- }
-
/* Exclude the search areas that are useless. */
if (area.latitude.min < min_lat) {
GZERO(neighbors.south);
diff --git a/src/hyperloglog.c b/src/hyperloglog.c
index 8ccc16be2..0800bf59d 100644
--- a/src/hyperloglog.c
+++ b/src/hyperloglog.c
@@ -994,32 +994,21 @@ uint64_t hllCount(struct hllhdr *hdr, int *invalid) {
serverPanic("Unknown HyperLogLog encoding in hllCount()");
}
- /* Muliply the inverse of E for alpha_m * m^2 to have the raw estimate. */
- E = (1/E)*alpha*m*m;
-
- /* Use the LINEARCOUNTING algorithm for small cardinalities.
- * For larger values but up to 72000 HyperLogLog raw approximation is
- * used since linear counting error starts to increase. However HyperLogLog
- * shows a strong bias in the range 2.5*16384 - 72000, so we try to
- * compensate for it. */
- if (E < m*2.5 && ez != 0) {
- E = m*log(m/ez); /* LINEARCOUNTING() */
- } else if (m == 16384 && E < 72000) {
- /* We did polynomial regression of the bias for this range, this
- * way we can compute the bias for a given cardinality and correct
- * according to it. Only apply the correction for P=14 that's what
- * we use and the value the correction was verified with. */
- double bias = 5.9119*1.0e-18*(E*E*E*E)
- -1.4253*1.0e-12*(E*E*E)+
- 1.2940*1.0e-7*(E*E)
- -5.2921*1.0e-3*E+
- 83.3216;
- E -= E*(bias/100);
- }
- /* We don't apply the correction for E > 1/30 of 2^32 since we use
- * a 64 bit function and 6 bit counters. To apply the correction for
- * 1/30 of 2^64 is not needed since it would require a huge set
- * to approach such a value. */
+ /* Apply loglog-beta to the raw estimate. See:
+ * "LogLog-Beta and More: A New Algorithm for Cardinality Estimation
+ * Based on LogLog Counting" Jason Qin, Denys Kim, Yumei Tung
+ * arXiv:1612.02284 */
+ double zl = log(ez + 1);
+ double beta = -0.370393911*ez +
+ 0.070471823*zl +
+ 0.17393686*pow(zl,2) +
+ 0.16339839*pow(zl,3) +
+ -0.09237745*pow(zl,4) +
+ 0.03738027*pow(zl,5) +
+ -0.005384159*pow(zl,6) +
+ 0.00042419*pow(zl,7);
+
+ E = llroundl(alpha*m*(m-ez)*(1/(E+beta)));
return (uint64_t) E;
}
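For reference, the replacement estimator is the LogLog-Beta formula from the cited paper. Writing E for the harmonic sum of the registers already accumulated above, m for the number of registers, e_z for the count of zero registers and z_l = ln(e_z + 1), the value returned by the new code is:

\[
\hat{n} = \mathrm{round}\!\left(\alpha\, m\,(m - e_z)\,\frac{1}{E + \beta(e_z)}\right),
\qquad
E = \sum_{j=0}^{m-1} 2^{-M[j]}
\]
\[
\beta(e_z) = -0.370393911\,e_z + 0.070471823\,z_l + 0.17393686\,z_l^{2}
 + 0.16339839\,z_l^{3} - 0.09237745\,z_l^{4}
 + 0.03738027\,z_l^{5} - 0.005384159\,z_l^{6} + 0.00042419\,z_l^{7}
\]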
diff --git a/src/latency.c b/src/latency.c
index 6f8b2a59f..53e0ec7be 100644
--- a/src/latency.c
+++ b/src/latency.c
@@ -79,7 +79,7 @@ int THPIsEnabled(void) {
* value of the function is non-zero, the process is being targeted by
* THP support, and is likely to have memory usage / latency issues. */
int THPGetAnonHugePagesSize(void) {
- return zmalloc_get_smap_bytes_by_field("AnonHugePages:");
+ return zmalloc_get_smap_bytes_by_field("AnonHugePages:",-1);
}
/* ---------------------------- Latency API --------------------------------- */
diff --git a/src/lazyfree.c b/src/lazyfree.c
index dba3f00e2..c05252159 100644
--- a/src/lazyfree.c
+++ b/src/lazyfree.c
@@ -57,7 +57,7 @@ int dbAsyncDelete(redisDb *db, robj *key) {
/* If the value is composed of a few allocations, to free in a lazy way
* is actually just slower... So under a certain limit we just free
* the object synchronously. */
- dictEntry *de = dictFind(db->dict,key->ptr);
+ dictEntry *de = dictUnlink(db->dict,key->ptr);
if (de) {
robj *val = dictGetVal(de);
size_t free_effort = lazyfreeGetFreeEffort(val);
@@ -73,7 +73,8 @@ int dbAsyncDelete(redisDb *db, robj *key) {
/* Release the key-val pair, or just the key if we set the val
* field to NULL in order to lazy free it later. */
- if (dictDelete(db->dict,key->ptr) == DICT_OK) {
+ if (de) {
+ dictFreeUnlinkedEntry(db->dict,de);
if (server.cluster_enabled) slotToKeyDel(key);
return 1;
} else {
diff --git a/src/module.c b/src/module.c
index feab67dfb..a5b3d52ae 100644
--- a/src/module.c
+++ b/src/module.c
@@ -1,3 +1,32 @@
+/*
+ * Copyright (c) 2016, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Redis nor the names of its contributors may be used
+ * to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
#include "server.h"
#include "cluster.h"
#include <dlfcn.h>
@@ -76,6 +105,7 @@ struct RedisModuleCtx {
int flags; /* REDISMODULE_CTX_... flags. */
void **postponed_arrays; /* To set with RM_ReplySetArrayLength(). */
int postponed_arrays_count; /* Number of entries in postponed_arrays. */
+ void *blocked_privdata; /* Privdata set when unblocking a client. */
/* Used if there is the REDISMODULE_CTX_KEYS_POS_REQUEST flag set. */
int *keys_pos;
@@ -85,10 +115,12 @@ struct RedisModuleCtx {
};
typedef struct RedisModuleCtx RedisModuleCtx;
-#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, 0, NULL}
+#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, 0, NULL}
#define REDISMODULE_CTX_MULTI_EMITTED (1<<0)
#define REDISMODULE_CTX_AUTO_MEMORY (1<<1)
#define REDISMODULE_CTX_KEYS_POS_REQUEST (1<<2)
+#define REDISMODULE_CTX_BLOCKED_REPLY (1<<3)
+#define REDISMODULE_CTX_BLOCKED_TIMEOUT (1<<4)
/* This represents a Redis key opened with RM_OpenKey(). */
struct RedisModuleKey {
@@ -154,6 +186,23 @@ typedef struct RedisModuleCallReply {
} val;
} RedisModuleCallReply;
+/* Structure representing a blocked client. We get a pointer to such
+ * an object when blocking from modules. */
+typedef struct RedisModuleBlockedClient {
+ client *client; /* Pointer to the blocked client, or NULL if the client
+ was destroyed during the life of this object. */
+ RedisModule *module; /* Module blocking the client. */
+ RedisModuleCmdFunc reply_callback; /* Reply callback on normal completion.*/
+ RedisModuleCmdFunc timeout_callback; /* Reply callback on timeout. */
+ void (*free_privdata)(void *); /* privdata cleanup callback. */
+ void *privdata; /* Module private data that may be used by the reply
+ or timeout callback. It is set via the
+ RedisModule_UnblockClient() API. */
+} RedisModuleBlockedClient;
+
+static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER;
+static list *moduleUnblockedClients;
+
/* --------------------------------------------------------------------------
* Prototypes
* -------------------------------------------------------------------------- */
@@ -374,26 +423,36 @@ void moduleFreeContext(RedisModuleCtx *ctx) {
}
}
-/* This Redis command binds the normal Redis command invocation with commands
- * exported by modules. */
-void RedisModuleCommandDispatcher(client *c) {
- RedisModuleCommandProxy *cp = (void*)(unsigned long)c->cmd->getkeys_proc;
- RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
+/* Helper function for when a command callback is called, in order to handle
+ * details needed to correctly replicate commands. */
+void moduleHandlePropagationAfterCommandCallback(RedisModuleCtx *ctx) {
+ client *c = ctx->client;
- ctx.module = cp->module;
- ctx.client = c;
- cp->func(&ctx,(void**)c->argv,c->argc);
+ /* We don't want any automatic propagation here since in modules we handle
+ * replication / AOF propagation in explicit ways. */
preventCommandPropagation(c);
/* Handle the replication of the final EXEC, since whatever a command
* emits is always wrappered around MULTI/EXEC. */
- if (ctx.flags & REDISMODULE_CTX_MULTI_EMITTED) {
+ if (ctx->flags & REDISMODULE_CTX_MULTI_EMITTED) {
robj *propargv[1];
propargv[0] = createStringObject("EXEC",4);
alsoPropagate(server.execCommand,c->db->id,propargv,1,
PROPAGATE_AOF|PROPAGATE_REPL);
decrRefCount(propargv[0]);
}
+}
+
+/* This Redis command binds the normal Redis command invocation with commands
+ * exported by modules. */
+void RedisModuleCommandDispatcher(client *c) {
+ RedisModuleCommandProxy *cp = (void*)(unsigned long)c->cmd->getkeys_proc;
+ RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
+
+ ctx.module = cp->module;
+ ctx.client = c;
+ cp->func(&ctx,(void**)c->argv,c->argc);
+ moduleHandlePropagationAfterCommandCallback(&ctx);
moduleFreeContext(&ctx);
}
@@ -589,6 +648,11 @@ void RM_SetModuleAttribs(RedisModuleCtx *ctx, const char *name, int ver, int api
ctx->module = module;
}
+/* Return the current UNIX time in milliseconds. */
+long long RM_Milliseconds(void) {
+ return mstime();
+}
+
/* --------------------------------------------------------------------------
* Automatic memory management for modules
* -------------------------------------------------------------------------- */
@@ -681,13 +745,33 @@ void autoMemoryCollect(RedisModuleCtx *ctx) {
*
* The string is created by copying the `len` bytes starting
* at `ptr`. No reference is retained to the passed buffer. */
-RedisModuleString *RM_CreateString(RedisModuleCtx *ctx, const char *ptr, size_t len)
-{
+RedisModuleString *RM_CreateString(RedisModuleCtx *ctx, const char *ptr, size_t len) {
RedisModuleString *o = createStringObject(ptr,len);
autoMemoryAdd(ctx,REDISMODULE_AM_STRING,o);
return o;
}
+
+/* Create a new module string object from a printf format and arguments.
+ * The returned string must be freed with RedisModule_FreeString(), unless
+ * automatic memory is enabled.
+ *
+ * The string is created using the sds formatter function sdscatvprintf(). */
+RedisModuleString *RM_CreateStringPrintf(RedisModuleCtx *ctx, const char *fmt, ...) {
+ sds s = sdsempty();
+
+ va_list ap;
+ va_start(ap, fmt);
+ s = sdscatvprintf(s, fmt, ap);
+ va_end(ap);
+
+ RedisModuleString *o = createObject(OBJ_STRING, s);
+ autoMemoryAdd(ctx,REDISMODULE_AM_STRING,o);
+
+ return o;
+}
+
+
/* Like RedisModule_CreatString(), but creates a string starting from a long long
* integer instead of taking a buffer and its length.
*
@@ -1258,7 +1342,7 @@ int RM_SetExpire(RedisModuleKey *key, mstime_t expire) {
return REDISMODULE_ERR;
if (expire != REDISMODULE_NO_EXPIRE) {
expire += mstime();
- setExpire(key->db,key->key,expire);
+ setExpire(key->ctx->client,key->db,key->key,expire);
} else {
removeExpire(key->db,key->key);
}
@@ -1774,12 +1858,12 @@ int RM_ZsetRangeNext(RedisModuleKey *key) {
} else {
/* Are we still within the range? */
if (key->ztype == REDISMODULE_ZSET_RANGE_SCORE &&
- !zslValueLteMax(ln->score,&key->zrs))
+ !zslValueLteMax(next->score,&key->zrs))
{
key->zer = 1;
return 0;
} else if (key->ztype == REDISMODULE_ZSET_RANGE_LEX) {
- if (!zslLexValueLteMax(ln->ele,&key->zlrs)) {
+ if (!zslLexValueLteMax(next->ele,&key->zlrs)) {
key->zer = 1;
return 0;
}
@@ -1837,7 +1921,7 @@ int RM_ZsetRangePrev(RedisModuleKey *key) {
} else {
/* Are we still within the range? */
if (key->ztype == REDISMODULE_ZSET_RANGE_SCORE &&
- !zslValueGteMin(ln->score,&key->zrs))
+ !zslValueGteMin(prev->score,&key->zrs))
{
key->zer = 1;
return 0;
@@ -2202,8 +2286,10 @@ void RM_FreeCallReply_Rec(RedisModuleCallReply *reply, int freenested){
* to have the first level function to return on nested replies, but only
* if called by the module API. */
void RM_FreeCallReply(RedisModuleCallReply *reply) {
+
+ RedisModuleCtx *ctx = reply->ctx;
RM_FreeCallReply_Rec(reply,0);
- autoMemoryFreed(reply->ctx,REDISMODULE_AM_REPLY,reply);
+ autoMemoryFreed(ctx,REDISMODULE_AM_REPLY,reply);
}
/* Return the reply type. */
@@ -2582,7 +2668,7 @@ void moduleTypeNameByID(char *name, uint64_t moduleid) {
/* Register a new data type exported by the module. The parameters are the
* following. Please for in depth documentation check the modules API
- * documentation, especially the INTRO.md file.
+ * documentation, especially the TYPES.md file.
*
* * **name**: A 9 characters data type name that MUST be unique in the Redis
* Modules ecosystem. Be creative... and there will be no collisions. Use
@@ -2601,12 +2687,31 @@ void moduleTypeNameByID(char *name, uint64_t moduleid) {
* still load old data produced by an older version if the rdb_load
* callback is able to check the encver value and act accordingly.
* The encver must be a positive value between 0 and 1023.
+ * * **typemethods_ptr** is a pointer to a RedisModuleTypeMethods structure
+ * that should be populated with the methods callbacks and structure
+ * version, like in the following example:
+ *
+ * RedisModuleTypeMethods tm = {
+ * .version = REDISMODULE_TYPE_METHOD_VERSION,
+ * .rdb_load = myType_RDBLoadCallBack,
+ * .rdb_save = myType_RDBSaveCallBack,
+ * .aof_rewrite = myType_AOFRewriteCallBack,
+ * .free = myType_FreeCallBack,
+ *
+ * // Optional fields
+ * .digest = myType_DigestCallBack,
+ * .mem_usage = myType_MemUsageCallBack,
+ * }
+ *
* * **rdb_load**: A callback function pointer that loads data from RDB files.
* * **rdb_save**: A callback function pointer that saves data to RDB files.
* * **aof_rewrite**: A callback function pointer that rewrites data as commands.
* * **digest**: A callback function pointer that is used for `DEBUG DIGEST`.
* * **free**: A callback function pointer that can free a type value.
*
+ * The **digest** and **mem_usage** methods should currently be omitted since
+ * they are not yet implemented inside the Redis modules core.
+ *
* Note: the module name "AAAAAAAAA" is reserved and produces an error, it
* happens to be pretty lame as well.
*
@@ -2625,19 +2730,33 @@ void moduleTypeNameByID(char *name, uint64_t moduleid) {
* BalancedTreeType = RM_CreateDataType(...);
* }
*/
-moduleType *RM_CreateDataType(RedisModuleCtx *ctx, const char *name, int encver, moduleTypeLoadFunc rdb_load, moduleTypeSaveFunc rdb_save, moduleTypeRewriteFunc aof_rewrite, moduleTypeDigestFunc digest, moduleTypeFreeFunc free) {
+moduleType *RM_CreateDataType(RedisModuleCtx *ctx, const char *name, int encver, void *typemethods_ptr) {
uint64_t id = moduleTypeEncodeId(name,encver);
if (id == 0) return NULL;
if (moduleTypeLookupModuleByName(name) != NULL) return NULL;
- moduleType *mt = zmalloc(sizeof(*mt));
+ long typemethods_version = ((long*)typemethods_ptr)[0];
+ if (typemethods_version == 0) return NULL;
+
+ struct typemethods {
+ uint64_t version;
+ moduleTypeLoadFunc rdb_load;
+ moduleTypeSaveFunc rdb_save;
+ moduleTypeRewriteFunc aof_rewrite;
+ moduleTypeDigestFunc digest;
+ moduleTypeMemUsageFunc mem_usage;
+ moduleTypeFreeFunc free;
+ } *tms = (struct typemethods*) typemethods_ptr;
+
+ moduleType *mt = zcalloc(sizeof(*mt));
mt->id = id;
mt->module = ctx->module;
- mt->rdb_load = rdb_load;
- mt->rdb_save = rdb_save;
- mt->aof_rewrite = aof_rewrite;
- mt->digest = digest;
- mt->free = free;
+ mt->rdb_load = tms->rdb_load;
+ mt->rdb_save = tms->rdb_save;
+ mt->aof_rewrite = tms->aof_rewrite;
+ mt->mem_usage = tms->mem_usage;
+ mt->digest = tms->digest;
+ mt->free = tms->free;
memcpy(mt->name,name,sizeof(mt->name));
listAddNodeTail(ctx->module->types,mt);
return mt;
@@ -2829,6 +2948,31 @@ double RM_LoadDouble(RedisModuleIO *io) {
return value;
}
+/* In the context of the rdb_save method of a module data type, saves a float
+ * value to the RDB file. The float can be a valid number, a NaN or infinity.
+ * It is possible to load back the value with RedisModule_LoadFloat(). */
+void RM_SaveFloat(RedisModuleIO *io, float value) {
+ if (io->error) return;
+ int retval = rdbSaveBinaryFloatValue(io->rio, value);
+ if (retval == -1) {
+ io->error = 1;
+ } else {
+ io->bytes += retval;
+ }
+}
+
+/* In the context of the rdb_load method of a module data type, loads back the
+ * float value saved by RedisModule_SaveFloat(). */
+float RM_LoadFloat(RedisModuleIO *io) {
+ float value;
+ int retval = rdbLoadBinaryFloatValue(io->rio, &value);
+ if (retval == -1) {
+ moduleRDBLoadError(io);
+ return 0; /* Never reached. */
+ }
+ return value;
+}
+
/* --------------------------------------------------------------------------
* AOF API for modules data types
* -------------------------------------------------------------------------- */
@@ -2885,10 +3029,47 @@ void RM_EmitAOF(RedisModuleIO *io, const char *cmdname, const char *fmt, ...) {
}
/* --------------------------------------------------------------------------
+ * IO context handling
+ * -------------------------------------------------------------------------- */
+
+RedisModuleCtx *RM_GetContextFromIO(RedisModuleIO *io) {
+ if (io->ctx) return io->ctx; /* Can't have more than one... */
+ RedisModuleCtx ctxtemplate = REDISMODULE_CTX_INIT;
+ io->ctx = zmalloc(sizeof(RedisModuleCtx));
+ *(io->ctx) = ctxtemplate;
+ io->ctx->module = io->type->module;
+ io->ctx->client = NULL;
+ return io->ctx;
+}
+
+/* --------------------------------------------------------------------------
* Logging
* -------------------------------------------------------------------------- */
-/* Produces a log message to the standard Redis log, the format accepts
+/* This is the low level function implementing both:
+ *
+ * RM_Log()
+ * RM_LogIOError()
+ *
+ */
+void RM_LogRaw(RedisModule *module, const char *levelstr, const char *fmt, va_list ap) {
+ char msg[LOG_MAX_LEN];
+ size_t name_len;
+ int level;
+
+ if (!strcasecmp(levelstr,"debug")) level = LL_DEBUG;
+ else if (!strcasecmp(levelstr,"verbose")) level = LL_VERBOSE;
+ else if (!strcasecmp(levelstr,"notice")) level = LL_NOTICE;
+ else if (!strcasecmp(levelstr,"warning")) level = LL_WARNING;
+ else level = LL_VERBOSE; /* Default. */
+
+ name_len = snprintf(msg, sizeof(msg),"<%s> ", module->name);
+ vsnprintf(msg + name_len, sizeof(msg) - name_len, fmt, ap);
+ serverLogRaw(level,msg);
+}
+
+/*
+ * Produces a log message to the standard Redis log, the format accepts
* printf-alike specifiers, while level is a string describing the log
* level to use when emitting the log, and must be one of the following:
*
@@ -2903,26 +3084,177 @@ void RM_EmitAOF(RedisModuleIO *io, const char *cmdname, const char *fmt, ...) {
* a few lines of text.
*/
void RM_Log(RedisModuleCtx *ctx, const char *levelstr, const char *fmt, ...) {
- va_list ap;
- char msg[LOG_MAX_LEN];
- size_t name_len;
- int level;
-
if (!ctx->module) return; /* Can only log if module is initialized */
- if (!strcasecmp(levelstr,"debug")) level = LL_DEBUG;
- else if (!strcasecmp(levelstr,"verbose")) level = LL_VERBOSE;
- else if (!strcasecmp(levelstr,"notice")) level = LL_NOTICE;
- else if (!strcasecmp(levelstr,"warning")) level = LL_WARNING;
- else level = LL_VERBOSE; /* Default. */
-
- name_len = snprintf(msg, sizeof(msg),"<%s> ", ctx->module->name);
+ va_list ap;
+ va_start(ap, fmt);
+ RM_LogRaw(ctx->module,levelstr,fmt,ap);
+ va_end(ap);
+}
+/* Log errors from RDB / AOF serialization callbacks.
+ *
+ * This function should be used when a callback is returning a critical
+ * error to the caller since it cannot load or save the data for some
+ * critical reason. */
+void RM_LogIOError(RedisModuleIO *io, const char *levelstr, const char *fmt, ...) {
+ va_list ap;
va_start(ap, fmt);
- vsnprintf(msg + name_len, sizeof(msg) - name_len, fmt, ap);
+ RM_LogRaw(io->type->module,levelstr,fmt,ap);
va_end(ap);
+}
- serverLogRaw(level,msg);
+/* --------------------------------------------------------------------------
+ * Blocking clients from modules
+ * -------------------------------------------------------------------------- */
+
+/* This is called from blocked.c in order to unblock a client: it may be called
+ * while the client is still in the middle of being blocked because it was
+ * terminated, but it is also called for cleanup when a client is unblocked
+ * in a clean way after replying.
+ *
+ * What we do here is just to set the client to NULL in the redis module
+ * blocked client handle. This way if the client is terminated while there
+ * is a pending threaded operation involving the blocked client, we'll know
+ * that the client no longer exists and no reply callback should be called.
+ *
+ * The structure RedisModuleBlockedClient will always be deallocated when
+ * running the list of clients blocked by a module that need to be unblocked. */
+void unblockClientFromModule(client *c) {
+ RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
+ bc->client = NULL;
+}
+
+/* Block a client in the context of a blocking command, returning a handle
+ * which will be used, later, in order to unblock the client with a call to
+ * RedisModule_UnblockClient(). The arguments specify callback functions
+ * and a timeout after which the client is unblocked.
+ *
+ * The callbacks are called in the following contexts:
+ *
+ * reply_callback: called after a successful RedisModule_UnblockClient() call
+ * in order to reply to the client and unblock it.
+ * timeout_callback: called when the timeout is reached in order to send an
+ * error to the client.
+ * free_privdata: called in order to free the private data that is passed
+ * to the RedisModule_UnblockClient() call.
+ */
+RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(void*), long long timeout_ms) {
+ client *c = ctx->client;
+ c->bpop.module_blocked_handle = zmalloc(sizeof(RedisModuleBlockedClient));
+ RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
+
+ bc->client = c;
+ bc->module = ctx->module;
+ bc->reply_callback = reply_callback;
+ bc->timeout_callback = timeout_callback;
+ bc->free_privdata = free_privdata;
+ bc->privdata = NULL;
+ c->bpop.timeout = timeout_ms ? (mstime()+timeout_ms) : 0;
+
+ blockClient(c,BLOCKED_MODULE);
+ return bc;
+}
+
+/* Unblock a client blocked by `RedisModule_BlockClient()`. This will trigger
+ * the reply callbacks to be called in order to reply to the client.
+ * The 'privdata' argument will be accessible by the reply callback, so
+ * the caller of this function can pass any value that is needed in order to
+ * actually reply to the client.
+ *
+ * A common usage for 'privdata' is a thread that computes something that
+ * needs to be passed to the client, including but not limited to some slow
+ * to compute reply or some reply obtained via networking.
+ *
+ * Note: this function can be called from threads spawned by the module. */
+int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) {
+ pthread_mutex_lock(&moduleUnblockedClientsMutex);
+ bc->privdata = privdata;
+ listAddNodeTail(moduleUnblockedClients,bc);
+ pthread_mutex_unlock(&moduleUnblockedClientsMutex);
+ return REDISMODULE_OK;
+}
+
+/* Abort a blocked client blocking operation: the client will be unblocked
+ * without firing the reply callback. */
+int RM_AbortBlock(RedisModuleBlockedClient *bc) {
+ bc->reply_callback = NULL;
+ return RM_UnblockClient(bc,NULL);
+}
+
+/* This function will check the moduleUnblockedClients queue in order to
+ * call the reply callback and really unblock the client.
+ *
+ * Clients end up in this list because of calls to RM_UnblockClient(),
+ * however it is possible that while the module was doing work for the
+ * blocked client, it was terminated by Redis (for timeout or other reasons).
+ * When this happens the RedisModuleBlockedClient structure in the queue
+ * will have the 'client' field set to NULL. */
+void moduleHandleBlockedClients(void) {
+ listNode *ln;
+ RedisModuleBlockedClient *bc;
+
+ pthread_mutex_lock(&moduleUnblockedClientsMutex);
+ while (listLength(moduleUnblockedClients)) {
+ ln = listFirst(moduleUnblockedClients);
+ bc = ln->value;
+ client *c = bc->client;
+ listDelNode(moduleUnblockedClients,ln);
+ pthread_mutex_unlock(&moduleUnblockedClientsMutex);
+
+ /* Release the lock during the loop, as long as we don't
+ * touch the shared list. */
+
+ if (c != NULL && bc->reply_callback != NULL) {
+ RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
+ ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY;
+ ctx.blocked_privdata = bc->privdata;
+ ctx.module = bc->module;
+ ctx.client = bc->client;
+ bc->reply_callback(&ctx,(void**)c->argv,c->argc);
+ moduleHandlePropagationAfterCommandCallback(&ctx);
+ moduleFreeContext(&ctx);
+ }
+ if (bc->privdata && bc->free_privdata)
+ bc->free_privdata(bc->privdata);
+ zfree(bc);
+ if (c != NULL) unblockClient(c);
+
+ /* Lock again before iterating the loop. */
+ pthread_mutex_lock(&moduleUnblockedClientsMutex);
+ }
+ pthread_mutex_unlock(&moduleUnblockedClientsMutex);
+}
+
+/* Called when our client timed out. After this function returns,
+ * unblockClient() is called, and it will invalidate the blocked client. So this function
+ * does not need to do any cleanup. Eventually the module will call the
+ * API to unblock the client and the memory will be released. */
+void moduleBlockedClientTimedOut(client *c) {
+ RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
+ RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
+ ctx.flags |= REDISMODULE_CTX_BLOCKED_TIMEOUT;
+ ctx.module = bc->module;
+ ctx.client = bc->client;
+ bc->timeout_callback(&ctx,(void**)c->argv,c->argc);
+ moduleFreeContext(&ctx);
+}
+
+/* Return non-zero if a module command was called in order to fill the
+ * reply for a blocked client. */
+int RM_IsBlockedReplyRequest(RedisModuleCtx *ctx) {
+ return (ctx->flags & REDISMODULE_CTX_BLOCKED_REPLY) != 0;
+}
+
+/* Return non-zero if a module command was called in order to fill the
+ * reply for a blocked client that timed out. */
+int RM_IsBlockedTimeoutRequest(RedisModuleCtx *ctx) {
+ return (ctx->flags & REDISMODULE_CTX_BLOCKED_TIMEOUT) != 0;
+}
+
+/* Get the private data set by RedisModule_UnblockClient(). */
+void *RM_GetBlockedClientPrivateData(RedisModuleCtx *ctx) {
+ return ctx->blocked_privdata;
}
/* --------------------------------------------------------------------------
@@ -2961,6 +3293,8 @@ int moduleRegisterApi(const char *funcname, void *funcptr) {
void moduleRegisterCoreAPI(void);
void moduleInitModulesSystem(void) {
+ moduleUnblockedClients = listCreate();
+
server.loadmodule_queue = listCreate();
modules = dictCreate(&modulesDictType,NULL);
moduleRegisterCoreAPI();
@@ -3085,7 +3419,8 @@ int moduleUnload(sds name) {
/* Remove from list of modules. */
serverLog(LL_NOTICE,"Module %s unloaded",module->name);
- dictDeleteNoFree(modules,module->name);
+ dictDelete(modules,module->name);
+ module->name = NULL; /* The name was already freed by dictDelete(). */
moduleFreeModuleStructure(module);
return REDISMODULE_OK;
@@ -3193,6 +3528,7 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(CreateString);
REGISTER_API(CreateStringFromLongLong);
REGISTER_API(CreateStringFromString);
+ REGISTER_API(CreateStringPrintf);
REGISTER_API(FreeString);
REGISTER_API(StringPtrLen);
REGISTER_API(AutoMemory);
@@ -3237,9 +3573,20 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(LoadStringBuffer);
REGISTER_API(SaveDouble);
REGISTER_API(LoadDouble);
+ REGISTER_API(SaveFloat);
+ REGISTER_API(LoadFloat);
REGISTER_API(EmitAOF);
REGISTER_API(Log);
+ REGISTER_API(LogIOError);
REGISTER_API(StringAppendBuffer);
REGISTER_API(RetainString);
REGISTER_API(StringCompare);
+ REGISTER_API(GetContextFromIO);
+ REGISTER_API(BlockClient);
+ REGISTER_API(UnblockClient);
+ REGISTER_API(IsBlockedReplyRequest);
+ REGISTER_API(IsBlockedTimeoutRequest);
+ REGISTER_API(GetBlockedClientPrivateData);
+ REGISTER_API(AbortBlock);
+ REGISTER_API(Milliseconds);
}
diff --git a/src/modules/API.md b/src/modules/API.md
index 9c7ada9dd..e90429e3b 100644
--- a/src/modules/API.md
+++ b/src/modules/API.md
@@ -7,7 +7,16 @@
Use like malloc(). Memory allocated with this function is reported in
Redis INFO memory, used for keys eviction according to maxmemory settings
and in general is taken into account as memory allocated by Redis.
-You should avoid to use malloc().
+You should avoid using malloc().
+
+## `RM_Calloc`
+
+ void *RM_Calloc(size_t nmemb, size_t size);
+
+Use like calloc(). Memory allocated with this function is reported in
+Redis INFO memory, used for keys eviction according to maxmemory settings
+and in general is taken into account as memory allocated by Redis.
+You should avoid using calloc() directly.
## `RM_Realloc`
@@ -150,6 +159,12 @@ Called by `RM_Init()` to setup the `ctx->module` structure.
This is an internal function, Redis modules developers don't need
to use it.
+## `RM_Milliseconds`
+
+ long long RM_Milliseconds(void);
+
+Return the current UNIX time in milliseconds.
+
## `RM_AutoMemory`
void RM_AutoMemory(RedisModuleCtx *ctx);
@@ -169,6 +184,16 @@ with `RedisModule_FreeString()`, unless automatic memory is enabled.
The string is created by copying the `len` bytes starting
at `ptr`. No reference is retained to the passed buffer.
+## `RM_CreateStringPrintf`
+
+ RedisModuleString *RM_CreateStringPrintf(RedisModuleCtx *ctx, const char *fmt, ...);
+
+Create a new module string object from a printf format and arguments.
+The returned string must be freed with `RedisModule_FreeString()`, unless
+automatic memory is enabled.
+
+The string is created using the sds formatter function sdscatvprintf().
+
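+As a minimal usage sketch (the key format and the `uid` variable are
+illustrative, not part of the API):
+
+ long long uid = 1000;
+ RedisModuleString *keyname =
+ RedisModule_CreateStringPrintf(ctx,"user:%lld:visits",uid);
+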
## `RM_CreateStringFromLongLong`
RedisModuleString *RM_CreateStringFromLongLong(RedisModuleCtx *ctx, long long ll);
@@ -183,7 +208,7 @@ enabling automatic memory management.
RedisModuleString *RM_CreateStringFromString(RedisModuleCtx *ctx, const RedisModuleString *str);
-Like `RedisModule_CreatString()`, but creates a string starting from an existing
+Like `RedisModule_CreateString()`, but creates a string starting from another
RedisModuleString.
The returned string must be released with `RedisModule_FreeString()` or by
@@ -200,9 +225,36 @@ It is possible to call this function even when automatic memory management
is enabled. In that case the string will be released ASAP and removed
from the pool of string to release at the end.
+## `RM_RetainString`
+
+ void RM_RetainString(RedisModuleCtx *ctx, RedisModuleString *str);
+
+Every call to this function will make the string 'str' require
+an additional call to `RedisModule_FreeString()` in order to really
+free the string. Note that the automatic freeing of the string, obtained
+when modules automatic memory management is enabled, counts as one
+`RedisModule_FreeString()` call (it is just executed automatically).
+
+Normally you want to call this function when all of the following
+conditions are true at the same time:
+
+1) You have automatic memory management enabled.
+2) You want to create string objects.
+3) Those string objects you create need to live *after* the callback
+ function (for example a command implementation) creating them returns.
+
+Usually you want this in order to store the created string object
+into your own data structure, for example when implementing a new data
+type.
+
+Note that when memory management is turned off, you don't need
+any call to RetainString() since creating a string will always result
+in a string that lives after the callback function returns, if
+no FreeString() call is performed.
+
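+As a minimal sketch (assuming automatic memory management is enabled and
+`mystruct` is an illustrative module-owned structure, not part of the API):
+
+ RedisModuleString *s = RedisModule_CreateString(ctx,"some value",10);
+ RedisModule_RetainString(ctx,s);
+ mystruct->stored = s; /* 's' will survive after the callback returns. */
+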
## `RM_StringPtrLen`
- const char *RM_StringPtrLen(RedisModuleString *str, size_t *len);
+ const char *RM_StringPtrLen(const RedisModuleString *str, size_t *len);
Given a string module object, this function returns the string pointer
and length of the string. The returned pointer and length should only
@@ -210,7 +262,7 @@ be used for read only accesses and never modified.
## `RM_StringToLongLong`
- int RM_StringToLongLong(RedisModuleString *str, long long *ll);
+ int RM_StringToLongLong(const RedisModuleString *str, long long *ll);
Convert the string into a long long integer, storing it at `*ll`.
Returns `REDISMODULE_OK` on success. If the string can't be parsed
@@ -219,12 +271,28 @@ is returned.
## `RM_StringToDouble`
- int RM_StringToDouble(RedisModuleString *str, double *d);
+ int RM_StringToDouble(const RedisModuleString *str, double *d);
Convert the string into a double, storing it at `*d`.
Returns `REDISMODULE_OK` on success or `REDISMODULE_ERR` if the string is
not a valid string representation of a double value.
+## `RM_StringCompare`
+
+ int RM_StringCompare(RedisModuleString *a, RedisModuleString *b);
+
+Compare two string objects, returning -1, 0 or 1 respectively if
+a < b, a == b, a > b. Strings are compared byte by byte as two
+binary blobs, without any encoding or collation handling.
+
+## `RM_StringAppendBuffer`
+
+ int RM_StringAppendBuffer(RedisModuleCtx *ctx, RedisModuleString *str, const char *buf, size_t len);
+
+Append the specified buffer to the string 'str'. The string must be a
+string created by the user that is referenced only a single time, otherwise
+`REDISMODULE_ERR` is returned and the operation is not performed.
+
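+A minimal sketch, mirroring what the `TEST.STRING.APPEND` command in
+`testmodule.c` does:
+
+ RedisModuleString *s = RedisModule_CreateString(ctx,"foo",3);
+ RedisModule_StringAppendBuffer(ctx,s,"bar",3); /* 's' is now "foobar". */
+ RedisModule_ReplyWithString(ctx,s);
+ RedisModule_FreeString(ctx,s);
+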
## `RM_WrongArity`
int RM_WrongArity(RedisModuleCtx *ctx);
@@ -951,11 +1019,11 @@ that returned the reply object.
## `RM_CreateDataType`
- moduleType *RM_CreateDataType(RedisModuleCtx *ctx, const char *name, int encver, moduleTypeLoadFunc rdb_load, moduleTypeSaveFunc rdb_save, moduleTypeRewriteFunc aof_rewrite, moduleTypeDigestFunc digest, moduleTypeFreeFunc free);
+ moduleType *RM_CreateDataType(RedisModuleCtx *ctx, const char *name, int encver, void *typemethods_ptr);
Register a new data type exported by the module. The parameters are the
following. Please for in depth documentation check the modules API
-documentation, especially the INTRO.md file.
+documentation, especially the TYPES.md file.
* **name**: A 9 characters data type name that MUST be unique in the Redis
Modules ecosystem. Be creative... and there will be no collisions. Use
@@ -974,12 +1042,32 @@ documentation, especially the INTRO.md file.
still load old data produced by an older version if the rdb_load
callback is able to check the encver value and act accordingly.
The encver must be a positive value between 0 and 1023.
+* **typemethods_ptr** is a pointer to a RedisModuleTypeMethods structure
+ that should be populated with the methods callbacks and structure
+ version, like in the following example:
+
+ RedisModuleTypeMethods tm = {
+ .version = `REDISMODULE_TYPE_METHOD_VERSION`,
+ .rdb_load = myType_RDBLoadCallBack,
+ .rdb_save = myType_RDBSaveCallBack,
+ .aof_rewrite = myType_AOFRewriteCallBack,
+ .free = myType_FreeCallBack,
+
+ // Optional fields
+ .digest = myType_DigestCallBack,
+ .mem_usage = myType_MemUsageCallBack,
+ }
+
* **rdb_load**: A callback function pointer that loads data from RDB files.
* **rdb_save**: A callback function pointer that saves data to RDB files.
* **aof_rewrite**: A callback function pointer that rewrites data as commands.
* **digest**: A callback function pointer that is used for `DEBUG DIGEST`.
+* **mem_usage**: A callback function pointer that is used for `MEMORY`.
* **free**: A callback function pointer that can free a type value.
+The **digest** and **mem_usage** methods should currently be omitted since
+they are not yet implemented inside the Redis modules core.
+
Note: the module name "AAAAAAAAA" is reserved and produces an error, it
happens to be pretty lame as well.
@@ -1115,6 +1203,21 @@ It is possible to load back the value with `RedisModule_LoadDouble()`.
In the context of the rdb_save method of a module data type, loads back the
double value saved by `RedisModule_SaveDouble()`.
+## `RM_SaveFloat`
+
+ void RM_SaveFloat(RedisModuleIO *io, float value);
+
+In the context of the rdb_save method of a module data type, saves a float
+value to the RDB file. The float can be a valid number, a NaN or infinity.
+It is possible to load back the value with `RedisModule_LoadFloat()`.
+
+## `RM_LoadFloat`
+
+ float RM_LoadFloat(RedisModuleIO *io);
+
+In the context of the rdb_load method of a module data type, loads back the
+float value saved by `RedisModule_SaveFloat()`.
+
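+A minimal sketch of how the two functions pair up inside a module type
+(the `MyObject` struct, its `weight` field and `createMyObject()` are
+illustrative, not part of the API):
+
+ void MyTypeRdbSave(RedisModuleIO *rdb, void *value) {
+ struct MyObject *o = value;
+ RedisModule_SaveFloat(rdb,o->weight);
+ }
+
+ void *MyTypeRdbLoad(RedisModuleIO *rdb, int encver) {
+ if (encver != 0) return NULL; /* Unknown encoding version. */
+ struct MyObject *o = createMyObject();
+ o->weight = RedisModule_LoadFloat(rdb);
+ return o;
+ }
+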
## `RM_EmitAOF`
void RM_EmitAOF(RedisModuleIO *io, const char *cmdname, const char *fmt, ...);
@@ -1125,10 +1228,20 @@ by a module. The command works exactly like `RedisModule_Call()` in the way
the parameters are passed, but it does not return anything as the error
handling is performed by Redis itself.
+## `RM_LogRaw`
+
+ void RM_LogRaw(RedisModule *module, const char *levelstr, const char *fmt, va_list ap);
+
+This is the low level function implementing both:
+
+ `RM_Log()`
+ `RM_LogIOError()`
+
## `RM_Log`
void RM_Log(RedisModuleCtx *ctx, const char *levelstr, const char *fmt, ...);
Produces a log message to the standard Redis log, the format accepts
printf-alike specifiers, while level is a string describing the log
level to use when emitting the log, and must be one of the following:
@@ -1143,3 +1256,74 @@ There is a fixed limit to the length of the log line this function is able
to emit, this limti is not specified but is guaranteed to be more than
a few lines of text.
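+
+A minimal usage sketch (the message text and the `loaded` counter are
+illustrative):
+
+ RedisModule_Log(ctx,"notice","Loaded %d items at startup",loaded);
+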
+## `RM_LogIOError`
+
+ void RM_LogIOError(RedisModuleIO *io, const char *levelstr, const char *fmt, ...);
+
+Log errors from RDB / AOF serialization callbacks.
+
+This function should be used when a callback is returning a critical
+error to the caller since it cannot load or save the data for some
+critical reason.
+
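+A minimal usage sketch from inside an rdb_load callback (the message text is
+illustrative):
+
+ RedisModule_LogIOError(io,"warning","Short read while loading the value");
+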
+## `RM_BlockClient`
+
+ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(void*), long long timeout_ms);
+
+Block a client in the context of a blocking command, returning a handle
+which will be used, later, in order to unblock the client with a call to
+`RedisModule_UnblockClient()`. The arguments specify callback functions
+and a timeout after which the client is unblocked.
+
+The callbacks are called in the following contexts:
+
+reply_callback: called after a successful `RedisModule_UnblockClient()` call
+ in order to reply to the client and unblock it.
+timeout_callback: called when the timeout is reached in order to send an
+ error to the client.
+free_privdata: called in order to free the private data that is passed
+ to the `RedisModule_UnblockClient()` call.
+
+## `RM_UnblockClient`
+
+ int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata);
+
+Unblock a client blocked by `RedisModule_BlockClient()`. This will trigger
+the reply callbacks to be called in order to reply to the client.
+The 'privdata' argument will be accessible by the reply callback, so
+the caller of this function can pass any value that is needed in order to
+actually reply to the client.
+
+A common usage for 'privdata' is a thread that computes something that
+needs to be passed to the client, including but not limited to some slow
+to compute reply or some reply obtained via networking.
+
+Note: this function can be called from threads spawned by the module.
+
+## `RM_AbortBlock`
+
+ int RM_AbortBlock(RedisModuleBlockedClient *bc);
+
+Abort a blocked client blocking operation: the client will be unblocked
+without firing the reply callback.
+
+## `RM_IsBlockedReplyRequest`
+
+ int RM_IsBlockedReplyRequest(RedisModuleCtx *ctx);
+
+Return non-zero if a module command was called in order to fill the
+reply for a blocked client.
+
+## `RM_IsBlockedTimeoutRequest`
+
+ int RM_IsBlockedTimeoutRequest(RedisModuleCtx *ctx);
+
+Return non-zero if a module command was called in order to fill the
+reply for a blocked client that timed out.
+
+## `RM_GetBlockedClientPrivateData`
+
+ void *RM_GetBlockedClientPrivateData(RedisModuleCtx *ctx);
+
+Get the private data set by `RedisModule_UnblockClient()`.
+
diff --git a/src/modules/BLOCK.md b/src/modules/BLOCK.md
new file mode 100644
index 000000000..d4f3c93bc
--- /dev/null
+++ b/src/modules/BLOCK.md
@@ -0,0 +1,265 @@
+Blocking commands in Redis modules
+===
+
+Redis has a few blocking commands among the built-in set of commands.
+One of the most used is `BLPOP` (or the symmetric `BRPOP`) which blocks
+waiting for elements arriving in a list.
+
+The interesting fact about blocking commands is that they do not block
+the whole server, but just the client calling them. Usually the reason to
+block is that we expect some external event to happen: this can be
+some change in the Redis data structures like in the `BLPOP` case, a
+long computation happening in a thread, the reception of some data from the
+network, and so forth.
+
+Redis modules have the ability to implement blocking commands as well;
+this documentation shows how the API works and describes a few patterns
+that can be used in order to model blocking commands.
+
+How blocking and resuming works.
+---
+
+_Note: You may want to check the `helloblock.c` example in the Redis source tree
+inside the `src/modules` directory, for an easy to understand example
+of how the blocking API is applied._
+
+In Redis modules, commands are implemented by callback functions that
+are invoked by the Redis core when the specific command is called
+by the user. Normally the callback terminates its execution by sending
+some reply to the client. Using the following function instead, the
+function implementing the module command may request that the client
+is put into the blocked state:
+
+ RedisModuleBlockedClient *RedisModule_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(void*), long long timeout_ms);
+
+The function returns a `RedisModuleBlockedClient` object, which is later
+used in order to unblock the client. The arguments have the following
+meaning:
+
+* `ctx` is the command execution context as usual in the rest of the API.
+* `reply_callback` is the callback, having the same prototype as a normal command function, that is called when the client is unblocked in order to return a reply to the client.
+* `timeout_callback` is the callback, having the same prototype as a normal command function, that is called when the client reaches the `ms` timeout.
+* `free_privdata` is the callback that is called in order to free the private data. Private data is a pointer to some data that is passed from the API used to unblock the client to the callback that will send the reply to the client. We'll see how this mechanism works later in this document.
+* `ms` is the timeout in milliseconds. When the timeout is reached, the timeout callback is called and the client is automatically unblocked.
+
+Once a client is blocked, it can be unblocked with the following API:
+
+ int RedisModule_UnblockClient(RedisModuleBlockedClient *bc, void *privdata);
+
+The function takes as argument the blocked client object returned by
+the previous call to `RedisModule_BlockClient()`, and unblocks the client.
+Immediately before the client gets unblocked, the `reply_callback` function
+specified when the client was blocked is called: this function will
+have access to the `privdata` pointer used here.
+
+IMPORTANT: The above function is thread safe, and can be called from within
+a thread doing some work in order to implement the command that blocked
+the client.
+
+The `privdata` data will be freed automatically using the `free_privdata`
+callback when the client is unblocked. This is useful **since the reply
+callback may never be called** in case the client times out or disconnects
+from the server, so it's important that an external function has the
+responsibility of freeing the passed data, if needed.
+
+To better understand how the API works, we can imagine writing a command
+that blocks a client for one second, and then sends "Hello!" as a reply.
+
+Note: arity checks and other non essential things are not implemented
+in this command, in order to keep the example simple.
+
+ int Example_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
+ int argc)
+ {
+ RedisModuleBlockedClient *bc =
+ RedisModule_BlockClient(ctx,reply_func,timeout_func,NULL,0);
+
+ pthread_t tid;
+ pthread_create(&tid,NULL,threadmain,bc);
+
+ return REDISMODULE_OK;
+ }
+
+ void *threadmain(void *arg) {
+ RedisModuleBlockedClient *bc = arg;
+
+ sleep(1); /* Wait one second and unblock. */
+ RedisModule_UnblockClient(bc,NULL);
+ return NULL;
+ }
+
+The above command blocks the client ASAP, spawning a thread that will
+wait a second and will unblock the client. Let's check the reply and
+timeout callbacks, which are in our case very similar, since they
+just reply to the client with a different reply type.
+
+ int reply_func(RedisModuleCtx *ctx, RedisModuleString **argv,
+ int argc)
+ {
+ return RedisModule_ReplyWithSimpleString(ctx,"Hello!");
+ }
+
+ int timeout_func(RedisModuleCtx *ctx, RedisModuleString **argv,
+ int argc)
+ {
+ return RedisModule_ReplyWithNull(ctx);
+ }
+
+The reply callback just sends the "Hello!" string to the client.
+The important bit here is that the reply callback is called when the
+client is unblocked from the thread.
+
+The timeout callback replies with `NULL`, as often happens with actual
+Redis blocking commands timing out.
+
+Passing reply data when unblocking
+---
+
+The above example is simple to understand but lacks an important
+real world aspect of an actual blocking command implementation: often
+the reply function will need to know what to reply to the client,
+and this information is often provided as the client is unblocked.
+
+We could modify the above example so that the thread generates a
+random number after waiting one second. You can think of it as an
+actually expensive operation of some kind. Then this random number
+can be passed to the reply function so that we return it to the command
+caller. In order to make this work, we modify the functions as follows:
+
+ void *threadmain(void *arg) {
+ RedisModuleBlockedClient *bc = arg;
+
+ sleep(1); /* Wait one second and unblock. */
+
+ long *mynumber = RedisModule_Alloc(sizeof(long));
+ *mynumber = rand();
+ RedisModule_UnblockClient(bc,mynumber);
+ return NULL;
+ }
+
+As you can see, now the unblocking call is passing some private data,
+that is the `mynumber` pointer, to the reply callback. In order to
+obtain this private data, the reply callback will use the following
+function:
+
+ void *RedisModule_GetBlockedClientPrivateData(RedisModuleCtx *ctx);
+
+So our reply callback is modified like this:
+
+ int reply_func(RedisModuleCtx *ctx, RedisModuleString **argv,
+ int argc)
+ {
+ long *mynumber = RedisModule_GetBlockedClientPrivateData(ctx);
+ /* IMPORTANT: don't free mynumber here, but in the
+ * free privdata callback. */
+ return RedisModule_ReplyWithLongLong(ctx,*mynumber);
+ }
+
+Note that we also need to pass a `free_privdata` function when blocking
+the client with `RedisModule_BlockClient()`, since the allocated
+long value must be freed. Our callback will look like the following:
+
+ void free_privdata(void *privdata) {
+ RedisModule_Free(privdata);
+ }
+
+NOTE: It is important to stress that the private data is best freed in the
+`free_privdata` callback because the reply function may not be called
+if the client disconnects or times out.
+
+Note that the private data is also accessible from the timeout
+callback, always using the `GetBlockedClientPrivateData()` API.
+
+Aborting the blocking of a client
+---
+
+One problem that sometimes arises is that we need to allocate resources
+in order to implement the non blocking command. So we block the client,
+then, for example, try to create a thread, but the thread creation function
+returns an error. What to do in such a condition in order to recover? We
+don't want to leave the client blocked, nor do we want to call `UnblockClient()`
+because this will trigger the reply callback to be called.
+
+In this case the best thing to do is to use the following function:
+
+ int RedisModule_AbortBlock(RedisModuleBlockedClient *bc);
+
+Practically this is how to use it:
+
+ int Example_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
+ int argc)
+ {
+ RedisModuleBlockedClient *bc =
+ RedisModule_BlockClient(ctx,reply_func,timeout_func,NULL,0);
+
+ pthread_t tid;
+ if (pthread_create(&tid,NULL,threadmain,bc) != 0) {
+ RedisModule_AbortBlock(bc);
+ RedisModule_ReplyWithError(ctx,"Sorry can't create a thread");
+ }
+
+ return REDISMODULE_OK;
+ }
+
+The client will be unblocked but the reply callback will not be called.
+
+Implementing the command, reply and timeout callback using a single function
+---
+
+The following functions can be used in order to implement the reply and
+callback with the same function that implements the primary command
+function:
+
+ int RedisModule_IsBlockedReplyRequest(RedisModuleCtx *ctx);
+ int RedisModule_IsBlockedTimeoutRequest(RedisModuleCtx *ctx);
+
+So I could rewrite the example command without using a separate
+reply and timeout callback:
+
+ int Example_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
+ int argc)
+ {
+ if (RedisModule_IsBlockedReplyRequest(ctx)) {
+ long *mynumber = RedisModule_GetBlockedClientPrivateData(ctx);
+ return RedisModule_ReplyWithLongLong(ctx,*mynumber);
+ } else if (RedisModule_IsBlockedTimeoutRequest(ctx)) {
+ return RedisModule_ReplyWithNull(ctx);
+ }
+
+ RedisModuleBlockedClient *bc =
+ RedisModule_BlockClient(ctx,Example_RedisCommand,Example_RedisCommand,NULL,0);
+
+ pthread_t tid;
+ if (pthread_create(&tid,NULL,threadmain,bc) != 0) {
+ RedisModule_AbortBlock(bc);
+ RedisModule_ReplyWithError(ctx,"Sorry can't create a thread");
+ }
+
+ return REDISMODULE_OK;
+ }
+
+Functionally it is the same, but some people will prefer the less
+verbose implementation that concentrates most of the command logic in a
+single function.
+
+Working on copies of data inside a thread
+---
+
+An interesting pattern when working with threads implementing the
+slow part of a command is to work with a copy of the data, so that
+while some operation is performed on a key, the user continues to see
+the old version. However when the thread terminates its work, the
+representations are swapped and the new, processed version is used.
+
+An example of this approach is the
+[Neural Redis module](https://github.com/antirez/neural-redis)
+where neural networks are trained in different threads while the
+user can still execute and inspect their older versions.
+
+Future work
+---
+
+An API is work in progress right now in order to allow Redis modules APIs
+to be called in a safe way from threads, so that the threaded command
+can access the data space and do incremental operations.
+
+There is no ETA for this feature but it may appear in the course of the
+Redis 4.0 release at some point.
diff --git a/src/modules/INTRO.md b/src/modules/INTRO.md
index e5576b7fc..3ac6a4673 100644
--- a/src/modules/INTRO.md
+++ b/src/modules/INTRO.md
@@ -6,6 +6,7 @@ The modules documentation is composed of the following files:
* `INTRO.md` (this file). An overview about Redis Modules system and API. It's a good idea to start your reading here.
* `API.md` is generated from module.c top comments of RedisMoule functions. It is a good reference in order to understand how each function works.
* `TYPES.md` covers the implementation of native data types into modules.
+* `BLOCK.md` shows how to write blocking commands that will not reply immediately, but will block the client, without blocking the Redis server, and will provide a reply whenever it becomes possible.
Redis modules make possible to extend Redis functionality using external
modules, implementing new Redis commands at a speed and with features
diff --git a/src/modules/Makefile b/src/modules/Makefile
index 3cd51023f..066e65e9b 100644
--- a/src/modules/Makefile
+++ b/src/modules/Makefile
@@ -4,16 +4,16 @@ uname_S := $(shell sh -c 'uname -s 2>/dev/null || echo not')
# Compile flags for linux / osx
ifeq ($(uname_S),Linux)
- SHOBJ_CFLAGS ?= -fno-common -g -ggdb -std=c99
+ SHOBJ_CFLAGS ?= -W -Wall -fno-common -g -ggdb -std=c99 -O2
SHOBJ_LDFLAGS ?= -shared
else
- SHOBJ_CFLAGS ?= -dynamic -fno-common -g -ggdb -std=c99
+ SHOBJ_CFLAGS ?= -W -Wall -dynamic -fno-common -g -ggdb -std=c99 -O2
SHOBJ_LDFLAGS ?= -bundle -undefined dynamic_lookup
endif
.SUFFIXES: .c .so .xo .o
-all: helloworld.so hellotype.so testmodule.so
+all: helloworld.so hellotype.so helloblock.so testmodule.so
.c.xo:
$(CC) -I. $(CFLAGS) $(SHOBJ_CFLAGS) -fPIC -c $< -o $@
@@ -28,6 +28,11 @@ hellotype.xo: ../redismodule.h
hellotype.so: hellotype.xo
$(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc
+helloblock.xo: ../redismodule.h
+
+helloblock.so: helloblock.xo
+ $(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lpthread -lc
+
testmodule.xo: ../redismodule.h
testmodule.so: testmodule.xo
diff --git a/src/modules/TYPES.md b/src/modules/TYPES.md
index 1c31950fa..4d497356a 100644
--- a/src/modules/TYPES.md
+++ b/src/modules/TYPES.md
@@ -47,23 +47,25 @@ be stored in the global variable.
#define MYTYPE_ENCODING_VERSION 0
int RedisModule_OnLoad(RedisModuleCtx *ctx) {
- MyType = RedisModule_CreateDataType("MyType-AZ", MYTYPE_ENCODING_VERSION,
- MyTypeRDBLoad, MyTypeRDBSave, MyTypeAOFRewrite, MyTypeDigest,
- MyTypeFree);
+ RedisModuleTypeMethods tm = {
+ .version = REDISMODULE_TYPE_METHOD_VERSION,
+ .rdb_load = MyTypeRDBLoad,
+ .rdb_save = MyTypeRDBSave,
+ .aof_rewrite = MyTypeAOFRewrite,
+ .free = MyTypeFree
+ };
+
+ MyType = RedisModule_CreateDataType(ctx, "MyType-AZ",
+ MYTYPE_ENCODING_VERSION, &tm);
if (MyType == NULL) return REDISMODULE_ERR;
}
As you can see from the example above, a single API call is needed in order to
register the new type. However a number of function pointers are passed as
-arguments. The prototype of `RedisModule_CreateDataType` is the following:
-
- moduleType *RedisModule_CreateDataType(RedisModuleCtx *ctx,
- const char *name, int encver,
- moduleTypeLoadFunc rdb_load,
- moduleTypeSaveFunc rdb_save,
- moduleTypeRewriteFunc aof_rewrite,
- moduleTypeDigestFunc digest,
- moduleTypeFreeFunc free);
+arguments. Some are optional while others are mandatory. The above set
+of methods *must* be passed, while `.digest` and `.mem_usage` are optional
+and are currently not actually supported by the modules internals, so for
+now you can just ignore them.
The `ctx` argument is the context that we receive in the `OnLoad` function.
The type `name` is a 9 character name in the character set that includes
@@ -74,6 +76,9 @@ ecosystem, so be creative, use both lower-case and upper case if it makes
sense, and try to use the convention of mixing the type name with the name
of the author of the module, to create a 9 character unique name.
+**NOTE:** It is very important that the name is exactly 9 chars or the
+registration of the type will fail. Read more to understand why.
+
For example if I'm building a *b-tree* data structure and my name is *antirez*
I'll call my type **btree1-az**. The name, converted to a 64 bit integer,
is stored inside the RDB file when saving the type, and will be used when the
@@ -95,12 +100,14 @@ there is data found for a different encoding version (and the encoding version
is passed as argument to `rdb_load`), so that the module can still load old
RDB files.
-The remaining arguments `rdb_load`, `rdb_save`, `aof_rewrite`, `digest` and
-`free` are all callbacks with the following prototypes and uses:
+The last argument is a structure used in order to pass the type methods to the
+registration function: `rdb_load`, `rdb_save`, `aof_rewrite`, `digest`,
+`free` and `mem_usage` are all callbacks with the following prototypes and uses:
typedef void *(*RedisModuleTypeLoadFunc)(RedisModuleIO *rdb, int encver);
typedef void (*RedisModuleTypeSaveFunc)(RedisModuleIO *rdb, void *value);
typedef void (*RedisModuleTypeRewriteFunc)(RedisModuleIO *aof, RedisModuleString *key, void *value);
+ typedef size_t (*RedisModuleTypeMemUsageFunc)(void *value);
typedef void (*RedisModuleTypeDigestFunc)(RedisModuleDigest *digest, void *value);
typedef void (*RedisModuleTypeFreeFunc)(void *value);
@@ -108,6 +115,7 @@ The remaining arguments `rdb_load`, `rdb_save`, `aof_rewrite`, `digest` and
* `rdb_save` is called when saving data to the RDB file.
* `aof_rewrite` is called when the AOF is being rewritten, and the module needs to tell Redis what is the sequence of commands to recreate the content of a given key.
* `digest` is called when `DEBUG DIGEST` is executed and a key holding this module type is found. Currently this is not yet implemented so the function ca be left empty.
+* `mem_usage` is called when the `MEMORY` command asks for the total memory consumed by a specific key, and is used in order to get the number of bytes used by the module value (see the sketch after this list).
* `free` is called when a key with the module native type is deleted via `DEL` or in any other mean, in order to let the module reclaim the memory associated with such a value.
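+
+As an example, a hypothetical `mem_usage` callback for a value holding a heap
+allocated array of doubles (the `MyObject` struct is illustrative, not part of
+the API) could be sketched as:
+
+ size_t MyTypeMemUsage(void *value) {
+ struct MyObject *o = value;
+ /* Account for the structure itself plus its heap allocated array. */
+ return sizeof(*o) + o->len * sizeof(double);
+ }
+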
Ok, but *why* modules types require a 9 characters name?
diff --git a/src/modules/helloblock.c b/src/modules/helloblock.c
new file mode 100644
index 000000000..71ec9b121
--- /dev/null
+++ b/src/modules/helloblock.c
@@ -0,0 +1,122 @@
+/* Helloblock module -- An example of blocking command implementation
+ * with threads.
+ *
+ * -----------------------------------------------------------------------------
+ *
+ * Copyright (c) 2016, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Redis nor the names of its contributors may be used
+ * to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "../redismodule.h"
+#include <stdio.h>
+#include <stdlib.h>
+#include <pthread.h>
+#include <unistd.h>
+
+/* Reply callback for blocking command HELLO.BLOCK */
+int HelloBlock_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+ int *myint = RedisModule_GetBlockedClientPrivateData(ctx);
+ return RedisModule_ReplyWithLongLong(ctx,*myint);
+}
+
+/* Timeout callback for blocking command HELLO.BLOCK */
+int HelloBlock_Timeout(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+ return RedisModule_ReplyWithSimpleString(ctx,"Request timedout");
+}
+
+/* Private data freeing callback for HELLO.BLOCK command. */
+void HelloBlock_FreeData(void *privdata) {
+ RedisModule_Free(privdata);
+}
+
+/* The thread entry point that actually executes the blocking part
+ * of the command HELLO.BLOCK. */
+void *HelloBlock_ThreadMain(void *arg) {
+ void **targ = arg;
+ RedisModuleBlockedClient *bc = targ[0];
+ long long delay = (unsigned long)targ[1];
+ RedisModule_Free(targ);
+
+ sleep(delay);
+ int *r = RedisModule_Alloc(sizeof(int));
+ *r = rand();
+ RedisModule_UnblockClient(bc,r);
+ return NULL;
+}
+
+/* HELLO.BLOCK <delay> <timeout> -- Block for <delay> seconds, then reply with
+ * a random number. Timeout is the command timeout, so that you can test
+ * what happens when the delay is greater than the timeout. */
+int HelloBlock_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ if (argc != 3) return RedisModule_WrongArity(ctx);
+ long long delay;
+ long long timeout;
+
+ if (RedisModule_StringToLongLong(argv[1],&delay) != REDISMODULE_OK) {
+ return RedisModule_ReplyWithError(ctx,"ERR invalid delay");
+ }
+
+ if (RedisModule_StringToLongLong(argv[2],&timeout) != REDISMODULE_OK) {
+ return RedisModule_ReplyWithError(ctx,"ERR invalid timeout");
+ }
+
+ pthread_t tid;
+ RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout);
+
+ /* Now that we setup a blocking client, we need to pass the control
+ * to the thread. However we need to pass arguments to the thread:
+ * the delay and a reference to the blocked client handle. */
+ void **targ = RedisModule_Alloc(sizeof(void*)*2);
+ targ[0] = bc;
+ targ[1] = (void*)(unsigned long) delay;
+
+ if (pthread_create(&tid,NULL,HelloBlock_ThreadMain,targ) != 0) {
+ RedisModule_AbortBlock(bc);
+ return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");
+ }
+ return REDISMODULE_OK;
+}
+
+/* This function must be present on each Redis module. It is used in order to
+ * register the commands into the Redis server. */
+int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+
+ if (RedisModule_Init(ctx,"helloblock",1,REDISMODULE_APIVER_1)
+ == REDISMODULE_ERR) return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx,"hello.block",
+ HelloBlock_RedisCommand,"",0,0,0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ return REDISMODULE_OK;
+}
diff --git a/src/modules/hellotype.c b/src/modules/hellotype.c
index b33ed81cd..02a5bb477 100644
--- a/src/modules/hellotype.c
+++ b/src/modules/hellotype.c
@@ -227,6 +227,8 @@ void HelloTypeAofRewrite(RedisModuleIO *aof, RedisModuleString *key, void *value
}
void HelloTypeDigest(RedisModuleDigest *digest, void *value) {
+ REDISMODULE_NOT_USED(digest);
+ REDISMODULE_NOT_USED(value);
/* TODO: The DIGEST module interface is yet not implemented. */
}
@@ -237,10 +239,21 @@ void HelloTypeFree(void *value) {
/* This function must be present on each Redis module. It is used in order to
* register the commands into the Redis server. */
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+
if (RedisModule_Init(ctx,"hellotype",1,REDISMODULE_APIVER_1)
== REDISMODULE_ERR) return REDISMODULE_ERR;
- HelloType = RedisModule_CreateDataType(ctx,"hellotype",0,HelloTypeRdbLoad,HelloTypeRdbSave,HelloTypeAofRewrite,HelloTypeDigest,HelloTypeFree);
+ RedisModuleTypeMethods tm = {
+ .version = REDISMODULE_TYPE_METHOD_VERSION,
+ .rdb_load = HelloTypeRdbLoad,
+ .rdb_save = HelloTypeRdbSave,
+ .aof_rewrite = HelloTypeAofRewrite,
+ .free = HelloTypeFree
+ };
+
+ HelloType = RedisModule_CreateDataType(ctx,"hellotype",0,&tm);
if (HelloType == NULL) return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx,"hellotype.insert",
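
The RedisModuleTypeMethods structure above fills only the callbacks that hellotype implements; per the struct definition added to redismodule.h later in this patch, mem_usage and digest are additional, optional members. A hypothetical variant that also reports memory usage could look like this (HelloTypeMemUsage is an illustrative name, not part of the module):

    /* Hypothetical, illustrative memory-usage callback for hellotype. */
    size_t HelloTypeMemUsage(void *value) {
        (void)value; /* A real implementation would walk the list and sum node sizes. */
        return 0;
    }

    RedisModuleTypeMethods tm = {
        .version = REDISMODULE_TYPE_METHOD_VERSION,
        .rdb_load = HelloTypeRdbLoad,
        .rdb_save = HelloTypeRdbSave,
        .aof_rewrite = HelloTypeAofRewrite,
        .mem_usage = HelloTypeMemUsage, /* optional */
        .free = HelloTypeFree
    };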
diff --git a/src/modules/helloworld.c b/src/modules/helloworld.c
index 4d6f8782d..4e30af2a0 100644
--- a/src/modules/helloworld.c
+++ b/src/modules/helloworld.c
@@ -46,6 +46,8 @@
* fetch the currently selected DB, the other in order to send the client
* an integer reply as response. */
int HelloSimple_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
RedisModule_ReplyWithLongLong(ctx,RedisModule_GetSelectedDb(ctx));
return REDISMODULE_OK;
}
@@ -237,6 +239,8 @@ int HelloRandArray_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, i
* comments the function implementation). */
int HelloRepl1_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
RedisModuleCallReply *reply;
RedisModule_AutoMemory(ctx);
@@ -519,7 +523,7 @@ int HelloLeftPad_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int
/* If the string is already larger than the target len, just return
* the string itself. */
- if (strlen >= padlen)
+ if (strlen >= (size_t)padlen)
return RedisModule_ReplyWithString(ctx,argv[1]);
/* Padding must be a single character in this simple implementation. */
@@ -530,7 +534,7 @@ int HelloLeftPad_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int
/* Here we use our pool allocator, for our throw-away allocation. */
padlen -= strlen;
char *buf = RedisModule_PoolAlloc(ctx,padlen+strlen);
- for (size_t j = 0; j < padlen; j++) buf[j] = *ch;
+ for (long long j = 0; j < padlen; j++) buf[j] = *ch;
memcpy(buf+padlen,str,strlen);
RedisModule_ReplyWithStringBuffer(ctx,buf,padlen+strlen);
diff --git a/src/modules/testmodule.c b/src/modules/testmodule.c
index a1a42f43b..8da45c0ea 100644
--- a/src/modules/testmodule.c
+++ b/src/modules/testmodule.c
@@ -48,6 +48,9 @@ int TestMatchReply(RedisModuleCallReply *reply, char *str) {
/* TEST.CALL -- Test Call() API. */
int TestCall(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+
RedisModule_AutoMemory(ctx);
RedisModuleCallReply *reply;
@@ -75,6 +78,9 @@ fail:
/* TEST.STRING.APPEND -- Test appending to an existing string object. */
int TestStringAppend(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+
RedisModuleString *s = RedisModule_CreateString(ctx,"foo",3);
RedisModule_StringAppendBuffer(ctx,s,"bar",3);
RedisModule_ReplyWithString(ctx,s);
@@ -84,6 +90,9 @@ int TestStringAppend(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
/* TEST.STRING.APPEND.AM -- Test append with retain when auto memory is on. */
int TestStringAppendAM(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+
RedisModule_AutoMemory(ctx);
RedisModuleString *s = RedisModule_CreateString(ctx,"foo",3);
RedisModule_RetainString(ctx,s);
@@ -93,6 +102,25 @@ int TestStringAppendAM(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
return REDISMODULE_OK;
}
+/* TEST.STRING.PRINTF -- Test string formatting. */
+int TestStringPrintf(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ RedisModule_AutoMemory(ctx);
+ if (argc < 3) {
+ return RedisModule_WrongArity(ctx);
+ }
+ RedisModuleString *s = RedisModule_CreateStringPrintf(ctx,
+ "Got %d args. argv[1]: %s, argv[2]: %s",
+ argc,
+ RedisModule_StringPtrLen(argv[1], NULL),
+ RedisModule_StringPtrLen(argv[2], NULL)
+ );
+
+ RedisModule_ReplyWithString(ctx,s);
+
+ return REDISMODULE_OK;
+}
+
+
/* ----------------------------- Test framework ----------------------------- */
/* Return 1 if the reply matches the specified string, otherwise log errors
@@ -144,6 +172,9 @@ int TestAssertIntegerReply(RedisModuleCtx *ctx, RedisModuleCallReply *reply, lon
/* TEST.IT -- Run all the tests. */
int TestIt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+
RedisModule_AutoMemory(ctx);
RedisModuleCallReply *reply;
@@ -163,6 +194,9 @@ int TestIt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
T("test.string.append.am","");
if (!TestAssertStringReply(ctx,reply,"foobar",6)) goto fail;
+ T("test.string.printf", "cc", "foo", "bar");
+ if (!TestAssertStringReply(ctx,reply,"Got 3 args. argv[1]: foo, argv[2]: bar",38)) goto fail;
+
RedisModule_ReplyWithSimpleString(ctx,"ALL TESTS PASSED");
return REDISMODULE_OK;
@@ -173,6 +207,9 @@ fail:
}
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+
if (RedisModule_Init(ctx,"test",1,REDISMODULE_APIVER_1)
== REDISMODULE_ERR) return REDISMODULE_ERR;
@@ -188,6 +225,10 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
TestStringAppendAM,"write deny-oom",1,1,1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
+ if (RedisModule_CreateCommand(ctx,"test.string.printf",
+ TestStringPrintf,"write deny-oom",1,1,1) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
if (RedisModule_CreateCommand(ctx,"test.it",
TestIt,"readonly",1,1,1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
diff --git a/src/networking.c b/src/networking.c
index 2be40ae15..343a910e2 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -30,8 +30,9 @@
#include "server.h"
#include <sys/uio.h>
#include <math.h>
+#include <ctype.h>
-static void setProtocolError(client *c, int pos);
+static void setProtocolError(const char *errstr, client *c, int pos);
/* Return the size consumed from the allocator, for the specified SDS string,
* including internal fragmentation. This function is used in order to compute
@@ -352,6 +353,14 @@ void addReplySds(client *c, sds s) {
}
}
+/* This low level function just adds whatever protocol you send it to the
+ * client output buffer, trying the static buffer initially, and using the
+ * list of objects if not possible.
+ *
+ * It is efficient because it does not create an SDS object nor a Redis object
+ * if not needed. The object will only be created by calling
+ * _addReplyStringToList() if we fail to extend the existing tail object
+ * in the list of objects. */
void addReplyString(client *c, const char *s, size_t len) {
if (prepareClientToWrite(c) != C_OK) return;
if (_addReplyToBuffer(c,s,len) != C_OK)
@@ -1031,7 +1040,7 @@ int processInlineBuffer(client *c) {
if (newline == NULL) {
if (sdslen(c->querybuf) > PROTO_INLINE_MAX_SIZE) {
addReplyError(c,"Protocol error: too big inline request");
- setProtocolError(c,0);
+ setProtocolError("too big inline request",c,0);
}
return C_ERR;
}
@@ -1047,7 +1056,7 @@ int processInlineBuffer(client *c) {
sdsfree(aux);
if (argv == NULL) {
addReplyError(c,"Protocol error: unbalanced quotes in request");
- setProtocolError(c,0);
+ setProtocolError("unbalanced quotes in inline request",c,0);
return C_ERR;
}
@@ -1081,11 +1090,29 @@ int processInlineBuffer(client *c) {
/* Helper function. Trims query buffer to make the function that processes
* multi bulk requests idempotent. */
-static void setProtocolError(client *c, int pos) {
+#define PROTO_DUMP_LEN 128
+static void setProtocolError(const char *errstr, client *c, int pos) {
if (server.verbosity <= LL_VERBOSE) {
sds client = catClientInfoString(sdsempty(),c);
+
+        /* Sample some protocol to give an idea of what was inside. */
+ char buf[256];
+ if (sdslen(c->querybuf) < PROTO_DUMP_LEN) {
+ snprintf(buf,sizeof(buf),"Query buffer during protocol error: '%s'", c->querybuf);
+ } else {
+ snprintf(buf,sizeof(buf),"Query buffer during protocol error: '%.*s' (... more %zu bytes ...) '%.*s'", PROTO_DUMP_LEN/2, c->querybuf, sdslen(c->querybuf)-PROTO_DUMP_LEN, PROTO_DUMP_LEN/2, c->querybuf+sdslen(c->querybuf)-PROTO_DUMP_LEN/2);
+ }
+
+ /* Remove non printable chars. */
+ char *p = buf;
+ while (*p != '\0') {
+ if (!isprint(*p)) *p = '.';
+ p++;
+ }
+
+ /* Log all the client and protocol info. */
serverLog(LL_VERBOSE,
- "Protocol error from client: %s", client);
+ "Protocol error (%s) from client: %s. %s", errstr, client, buf);
sdsfree(client);
}
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
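
To make the new dump format concrete: with PROTO_DUMP_LEN at 128, a 1000-byte query buffer is logged as its first 64 bytes, the marker "(... more 872 bytes ...)", and its last 64 bytes, with every non-printable character replaced by a dot; buffers shorter than 128 bytes are dumped in full.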
@@ -1106,7 +1133,7 @@ int processMultibulkBuffer(client *c) {
if (newline == NULL) {
if (sdslen(c->querybuf) > PROTO_INLINE_MAX_SIZE) {
addReplyError(c,"Protocol error: too big mbulk count string");
- setProtocolError(c,0);
+ setProtocolError("too big mbulk count string",c,0);
}
return C_ERR;
}
@@ -1121,7 +1148,7 @@ int processMultibulkBuffer(client *c) {
ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll);
if (!ok || ll > 1024*1024) {
addReplyError(c,"Protocol error: invalid multibulk length");
- setProtocolError(c,pos);
+ setProtocolError("invalid mbulk count",c,pos);
return C_ERR;
}
@@ -1147,7 +1174,7 @@ int processMultibulkBuffer(client *c) {
if (sdslen(c->querybuf) > PROTO_INLINE_MAX_SIZE) {
addReplyError(c,
"Protocol error: too big bulk count string");
- setProtocolError(c,0);
+ setProtocolError("too big bulk count string",c,0);
return C_ERR;
}
break;
@@ -1161,14 +1188,14 @@ int processMultibulkBuffer(client *c) {
addReplyErrorFormat(c,
"Protocol error: expected '$', got '%c'",
c->querybuf[pos]);
- setProtocolError(c,pos);
+ setProtocolError("expected $ but got something else",c,pos);
return C_ERR;
}
ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll);
if (!ok || ll < 0 || ll > 512*1024*1024) {
addReplyError(c,"Protocol error: invalid bulk length");
- setProtocolError(c,pos);
+ setProtocolError("invalid bulk length",c,pos);
return C_ERR;
}
@@ -1321,7 +1348,11 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
sdsIncrLen(c->querybuf,nread);
c->lastinteraction = server.unixtime;
- if (c->flags & CLIENT_MASTER) c->reploff += nread;
+ if (c->flags & CLIENT_MASTER) {
+ c->reploff += nread;
+ replicationFeedSlavesFromMasterStream(server.slaves,
+ c->querybuf+qblen,nread);
+ }
server.stat_net_input_bytes += nread;
if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
diff --git a/src/object.c b/src/object.c
index a7c1e4c21..1ae37c9d1 100644
--- a/src/object.c
+++ b/src/object.c
@@ -36,6 +36,8 @@
#define strtold(a,b) ((long double)strtod((a),(b)))
#endif
+/* ===================== Creation and parsing of objects ==================== */
+
robj *createObject(int type, void *ptr) {
robj *o = zmalloc(sizeof(*o));
o->type = type;
@@ -73,7 +75,7 @@ robj *makeObjectShared(robj *o) {
/* Create a string object with encoding OBJ_ENCODING_RAW, that is a plain
* string object where o->ptr points to a proper sds string. */
robj *createRawStringObject(const char *ptr, size_t len) {
- return createObject(OBJ_STRING,sdsnewlen(ptr,len));
+ return createObject(OBJ_STRING, sdsnewlen(ptr,len));
}
/* Create a string object with encoding OBJ_ENCODING_EMBSTR, that is
@@ -690,6 +692,299 @@ char *strEncoding(int encoding) {
}
}
+/* =========================== Memory introspection ========================== */
+
+/* Returns the size in bytes consumed by the key's value in RAM.
+ * Note that the returned value is just an approximation, especially in the
+ * case of aggregated data types where only "sample_size" elements
+ * are checked and averaged to estimate the total size. */
+#define OBJ_COMPUTE_SIZE_DEF_SAMPLES 5 /* Default sample size. */
+size_t objectComputeSize(robj *o, size_t sample_size) {
+ sds ele, ele2;
+ dict *d;
+ dictIterator *di;
+ struct dictEntry *de;
+ size_t asize = 0, elesize = 0, samples = 0;
+
+ if (o->type == OBJ_STRING) {
+ if(o->encoding == OBJ_ENCODING_INT) {
+ asize = sizeof(*o);
+ } else if(o->encoding == OBJ_ENCODING_RAW) {
+ asize = sdsAllocSize(o->ptr)+sizeof(*o);
+ } else if(o->encoding == OBJ_ENCODING_EMBSTR) {
+ asize = sdslen(o->ptr)+2+sizeof(*o);
+ } else {
+ serverPanic("Unknown string encoding");
+ }
+ } else if (o->type == OBJ_LIST) {
+ if (o->encoding == OBJ_ENCODING_QUICKLIST) {
+ quicklist *ql = o->ptr;
+ quicklistNode *node = ql->head;
+ asize = sizeof(*o)+sizeof(quicklist);
+ do {
+ elesize += sizeof(quicklistNode)+ziplistBlobLen(node->zl);
+ samples++;
+ } while ((node = node->next) && samples < sample_size);
+ asize += (double)elesize/samples*listTypeLength(o);
+ } else if (o->encoding == OBJ_ENCODING_ZIPLIST) {
+ asize = sizeof(*o)+ziplistBlobLen(o->ptr);
+ } else {
+ serverPanic("Unknown list encoding");
+ }
+ } else if (o->type == OBJ_SET) {
+ if (o->encoding == OBJ_ENCODING_HT) {
+ d = o->ptr;
+ di = dictGetIterator(d);
+ asize = sizeof(*o)+sizeof(dict)+(sizeof(struct dictEntry*)*dictSlots(d));
+ while((de = dictNext(di)) != NULL && samples < sample_size) {
+ ele = dictGetKey(de);
+ elesize += sizeof(struct dictEntry) + sdsAllocSize(ele);
+ samples++;
+ }
+ dictReleaseIterator(di);
+ if (samples) asize += (double)elesize/samples*dictSize(d);
+ } else if (o->encoding == OBJ_ENCODING_INTSET) {
+ intset *is = o->ptr;
+ asize = sizeof(*o)+sizeof(*is)+is->encoding*is->length;
+ } else {
+ serverPanic("Unknown set encoding");
+ }
+ } else if (o->type == OBJ_ZSET) {
+ if (o->encoding == OBJ_ENCODING_ZIPLIST) {
+ asize = sizeof(*o)+(ziplistBlobLen(o->ptr));
+ } else if (o->encoding == OBJ_ENCODING_SKIPLIST) {
+ d = ((zset*)o->ptr)->dict;
+ zskiplist *zsl = ((zset*)o->ptr)->zsl;
+ zskiplistNode *znode = zsl->header->level[0].forward;
+ asize = sizeof(*o)+sizeof(zset)+(sizeof(struct dictEntry*)*dictSlots(d));
+ while(znode != NULL && samples < sample_size) {
+ elesize += sdsAllocSize(znode->ele);
+ elesize += sizeof(struct dictEntry) + zmalloc_size(znode);
+ samples++;
+ znode = znode->level[0].forward;
+ }
+ if (samples) asize += (double)elesize/samples*dictSize(d);
+ } else {
+ serverPanic("Unknown sorted set encoding");
+ }
+ } else if (o->type == OBJ_HASH) {
+ if (o->encoding == OBJ_ENCODING_ZIPLIST) {
+ asize = sizeof(*o)+(ziplistBlobLen(o->ptr));
+ } else if (o->encoding == OBJ_ENCODING_HT) {
+ d = o->ptr;
+ di = dictGetIterator(d);
+ asize = sizeof(*o)+sizeof(dict)+(sizeof(struct dictEntry*)*dictSlots(d));
+ while((de = dictNext(di)) != NULL && samples < sample_size) {
+ ele = dictGetKey(de);
+ ele2 = dictGetVal(de);
+ elesize += sdsAllocSize(ele) + sdsAllocSize(ele2);
+ elesize += sizeof(struct dictEntry);
+ samples++;
+ }
+ dictReleaseIterator(di);
+ if (samples) asize += (double)elesize/samples*dictSize(d);
+ } else {
+ serverPanic("Unknown hash encoding");
+ }
+ } else {
+ serverPanic("Unknown object type");
+ }
+ return asize;
+}
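
As a worked example of the sampling estimate used above for aggregated types: if the default 5 sampled entries of a hashtable-encoded hash account for 400 bytes of dictEntry plus sds allocations (80 bytes per entry on average) and the hash holds 10,000 fields, the estimate is the fixed robj/dict/slot-pointer overhead plus 80 * 10,000, roughly 800 KB. The result is an approximation by design; passing SAMPLES 0 to MEMORY USAGE (mapped to LLONG_MAX further down) makes the function visit every element instead.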
+
+/* Release data obtained with getMemoryOverheadData(). */
+void freeMemoryOverheadData(struct redisMemOverhead *mh) {
+ zfree(mh->db);
+ zfree(mh);
+}
+
+/* Return a struct redisMemOverhead filled with memory overhead
+ * information, used for the MEMORY STATS and MEMORY DOCTOR subcommands.
+ * The returned structure pointer should be freed calling
+ * freeMemoryOverheadData(). */
+struct redisMemOverhead *getMemoryOverheadData(void) {
+ int j;
+ size_t mem_total = 0;
+ size_t mem = 0;
+ size_t zmalloc_used = zmalloc_used_memory();
+ struct redisMemOverhead *mh = zcalloc(sizeof(*mh));
+
+ mh->total_allocated = zmalloc_used;
+ mh->startup_allocated = server.initial_memory_usage;
+ mh->peak_allocated = server.stat_peak_memory;
+ mh->fragmentation =
+ zmalloc_get_fragmentation_ratio(server.resident_set_size);
+ mem_total += server.initial_memory_usage;
+
+ mem = 0;
+ if (server.repl_backlog)
+ mem += zmalloc_size(server.repl_backlog);
+ mh->repl_backlog = mem;
+ mem_total += mem;
+
+ mem = 0;
+ if (listLength(server.slaves)) {
+ listIter li;
+ listNode *ln;
+
+ listRewind(server.slaves,&li);
+ while((ln = listNext(&li))) {
+ client *c = listNodeValue(ln);
+ mem += getClientOutputBufferMemoryUsage(c);
+ mem += sdsAllocSize(c->querybuf);
+ mem += sizeof(client);
+ }
+ }
+ mh->clients_slaves = mem;
+ mem_total+=mem;
+
+ mem = 0;
+ if (listLength(server.clients)) {
+ listIter li;
+ listNode *ln;
+
+ listRewind(server.clients,&li);
+ while((ln = listNext(&li))) {
+ client *c = listNodeValue(ln);
+ if (c->flags & CLIENT_SLAVE)
+ continue;
+ mem += getClientOutputBufferMemoryUsage(c);
+ mem += sdsAllocSize(c->querybuf);
+ mem += sizeof(client);
+ }
+ }
+ mh->clients_normal = mem;
+ mem_total+=mem;
+
+ mem = 0;
+ if (server.aof_state != AOF_OFF) {
+ mem += sdslen(server.aof_buf);
+ mem += aofRewriteBufferSize();
+ }
+ mh->aof_buffer = mem;
+ mem_total+=mem;
+
+ for (j = 0; j < server.dbnum; j++) {
+ redisDb *db = server.db+j;
+ long long keyscount = dictSize(db->dict);
+ if (keyscount==0) continue;
+
+ mh->total_keys += keyscount;
+ mh->db = zrealloc(mh->db,sizeof(mh->db[0])*(mh->num_dbs+1));
+ mh->db[mh->num_dbs].dbid = j;
+
+ mem = dictSize(db->dict) * sizeof(dictEntry) +
+ dictSlots(db->dict) * sizeof(dictEntry*) +
+ dictSize(db->dict) * sizeof(robj);
+ mh->db[mh->num_dbs].overhead_ht_main = mem;
+ mem_total+=mem;
+
+ mem = dictSize(db->expires) * sizeof(dictEntry) +
+ dictSlots(db->expires) * sizeof(dictEntry*);
+ mh->db[mh->num_dbs].overhead_ht_expires = mem;
+ mem_total+=mem;
+
+ mh->num_dbs++;
+ }
+
+ mh->overhead_total = mem_total;
+ mh->dataset = zmalloc_used - mem_total;
+ mh->peak_perc = (float)zmalloc_used*100/mh->peak_allocated;
+
+ /* Metrics computed after subtracting the startup memory from
+ * the total memory. */
+ size_t net_usage = 1;
+ if (zmalloc_used > mh->startup_allocated)
+ net_usage = zmalloc_used - mh->startup_allocated;
+ mh->dataset_perc = (float)mh->dataset*100/net_usage;
+ mh->bytes_per_key = mh->total_keys ? (net_usage / mh->total_keys) : 0;
+
+ return mh;
+}
+
+/* Helper for "MEMORY MALLOC-STATS", used as a callback for the jemalloc
+ * stats output. */
+void inputCatSds(void *result, const char *str) {
+ /* result is actually a (sds *), so re-cast it here */
+ sds *info = (sds *)result;
+ *info = sdscat(*info, str);
+}
+
+/* This implements MEMORY DOCTOR, a human readable analysis of the Redis
+ * memory condition. */
+sds getMemoryDoctorReport(void) {
+ int empty = 0; /* Instance is empty or almost empty. */
+ int big_peak = 0; /* Memory peak is much larger than used mem. */
+ int high_frag = 0; /* High fragmentation. */
+ int big_slave_buf = 0; /* Slave buffers are too big. */
+ int big_client_buf = 0; /* Client buffers are too big. */
+ int num_reports = 0;
+ struct redisMemOverhead *mh = getMemoryOverheadData();
+
+ if (mh->total_allocated < (1024*1024*5)) {
+ empty = 1;
+ num_reports++;
+ } else {
+ /* Peak is > 150% of current used memory? */
+ if (((float)mh->peak_allocated / mh->total_allocated) > 1.5) {
+ big_peak = 1;
+ num_reports++;
+ }
+
+ /* Fragmentation is higher than 1.4? */
+ if (mh->fragmentation > 1.4) {
+ high_frag = 1;
+ num_reports++;
+ }
+
+ /* Clients using more than 200k each average? */
+ long numslaves = listLength(server.slaves);
+ long numclients = listLength(server.clients)-numslaves;
+ if (mh->clients_normal / numclients > (1024*200)) {
+ big_client_buf = 1;
+ num_reports++;
+ }
+
+        /* Slaves using more than 10 MB each? Skip the check when there
+         * are no slaves attached to avoid a division by zero. */
+        if (numslaves > 0 && mh->clients_slaves / numslaves > (1024*1024*10)) {
+ big_slave_buf = 1;
+ num_reports++;
+ }
+ }
+
+ sds s;
+ if (num_reports == 0) {
+ s = sdsnew(
+ "Hi Sam, I can't find any memory issue in your instance. "
+ "I can only account for what occurs on this base.\n");
+ } else if (empty == 1) {
+ s = sdsnew(
+ "Hi Sam, this instance is empty or is using very little memory, "
+ "my issues detector can't be used in these conditions. "
+ "Please, leave for your mission on Earth and fill it with some data. "
+ "The new Sam and I will be back to our programming as soon as I "
+ "finished rebooting.\n");
+ } else {
+ s = sdsnew("Sam, I detected a few issues in this Redis instance memory implants:\n\n");
+ if (big_peak) {
+            s = sdscat(s," * Peak memory: In the past this instance used more than 150% of the memory that it is currently using. The allocator is normally not able to release memory after a peak, so you can expect to see a big fragmentation ratio; however this is actually harmless and is only due to the memory peak. If the Redis instance Resident Set Size (RSS) is currently bigger than expected, the memory will be used as soon as you fill the Redis instance with more data. If the memory peak was only occasional and you want to try to reclaim memory, please try the MEMORY PURGE command, otherwise the only other option is to shut down and restart the instance.\n\n");
+ }
+ if (high_frag) {
+ s = sdscatprintf(s," * High fragmentation: This instance has a memory fragmentation greater than 1.4 (this means that the Resident Set Size of the Redis process is much larger than the sum of the logical allocations Redis performed). This problem is usually due either to a large peak memory (check if there is a peak memory entry above in the report) or may result from a workload that causes the allocator to fragment memory a lot. If the problem is a large peak memory, then there is no issue. Otherwise, make sure you are using the Jemalloc allocator and not the default libc malloc. Note: The currently used allocator is \"%s\".\n\n", ZMALLOC_LIB);
+ }
+ if (big_slave_buf) {
+            s = sdscat(s," * Big slave buffers: The slave output buffers in this instance are greater than 10MB for each slave (on average). This likely means that there is some slave instance that is struggling to receive data, either because it is too slow or because of networking issues. As a result, data piles up in the master output buffers. Please try to identify which slave is not receiving data correctly and why. You can use the INFO output in order to check the slave delays and the CLIENT LIST command to check the output buffers of each slave.\n\n");
+ }
+ if (big_client_buf) {
+            s = sdscat(s," * Big client buffers: The client output buffers in this instance are greater than 200K per client (on average). This may result from different causes, like Pub/Sub clients subscribed to channels but not receiving data fast enough, so that data piles up in the Redis instance output buffer, or clients sending commands with large replies or very large sequences of commands in the same pipeline. Please use the CLIENT LIST command in order to investigate the issue if it causes problems in your instance, or to understand better why certain clients are using a big amount of memory.\n\n");
+ }
+ s = sdscat(s,"I'm here to keep you safe, Sam. I want to help you.\n");
+ }
+ freeMemoryOverheadData(mh);
+ return s;
+}
+
+/* ======================= The OBJECT and MEMORY commands =================== */
+
/* This is a helper function for the OBJECT command. We need to lookup keys
* without any modification of LRU or other parameters. */
robj *objectCommandLookup(client *c, robj *key) {
@@ -740,3 +1035,138 @@ void objectCommand(client *c) {
}
}
+/* The memory command will eventually be a complete interface for the
+ * memory introspection capabilities of Redis.
+ *
+ * Usage: MEMORY USAGE <key> [SAMPLES <count>] |
+ *        MEMORY STATS | MEMORY DOCTOR | MEMORY MALLOC-STATS |
+ *        MEMORY PURGE | MEMORY HELP */
+void memoryCommand(client *c) {
+ robj *o;
+
+ if (!strcasecmp(c->argv[1]->ptr,"usage") && c->argc >= 3) {
+ long long samples = OBJ_COMPUTE_SIZE_DEF_SAMPLES;
+ for (int j = 3; j < c->argc; j++) {
+ if (!strcasecmp(c->argv[j]->ptr,"samples") &&
+ j+1 < c->argc)
+ {
+ if (getLongLongFromObjectOrReply(c,c->argv[j+1],&samples,NULL)
+ == C_ERR) return;
+ if (samples < 0) {
+ addReply(c,shared.syntaxerr);
+ return;
+ }
+                if (samples == 0) samples = LLONG_MAX;
+ j++; /* skip option argument. */
+ } else {
+ addReply(c,shared.syntaxerr);
+ return;
+ }
+ }
+ if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nullbulk))
+ == NULL) return;
+ size_t usage = objectComputeSize(o,samples);
+ usage += sdsAllocSize(c->argv[1]->ptr);
+ usage += sizeof(dictEntry);
+ addReplyLongLong(c,usage);
+ } else if (!strcasecmp(c->argv[1]->ptr,"stats") && c->argc == 2) {
+ struct redisMemOverhead *mh = getMemoryOverheadData();
+
+ addReplyMultiBulkLen(c,(14+mh->num_dbs)*2);
+
+ addReplyBulkCString(c,"peak.allocated");
+ addReplyLongLong(c,mh->peak_allocated);
+
+ addReplyBulkCString(c,"total.allocated");
+ addReplyLongLong(c,mh->total_allocated);
+
+ addReplyBulkCString(c,"startup.allocated");
+ addReplyLongLong(c,mh->startup_allocated);
+
+ addReplyBulkCString(c,"replication.backlog");
+ addReplyLongLong(c,mh->repl_backlog);
+
+ addReplyBulkCString(c,"clients.slaves");
+ addReplyLongLong(c,mh->clients_slaves);
+
+ addReplyBulkCString(c,"clients.normal");
+ addReplyLongLong(c,mh->clients_normal);
+
+ addReplyBulkCString(c,"aof.buffer");
+ addReplyLongLong(c,mh->aof_buffer);
+
+ for (size_t j = 0; j < mh->num_dbs; j++) {
+ char dbname[32];
+ snprintf(dbname,sizeof(dbname),"db.%zd",mh->db[j].dbid);
+ addReplyBulkCString(c,dbname);
+ addReplyMultiBulkLen(c,4);
+
+ addReplyBulkCString(c,"overhead.hashtable.main");
+ addReplyLongLong(c,mh->db[j].overhead_ht_main);
+
+ addReplyBulkCString(c,"overhead.hashtable.expires");
+ addReplyLongLong(c,mh->db[j].overhead_ht_expires);
+ }
+
+ addReplyBulkCString(c,"overhead.total");
+ addReplyLongLong(c,mh->overhead_total);
+
+ addReplyBulkCString(c,"keys.count");
+ addReplyLongLong(c,mh->total_keys);
+
+ addReplyBulkCString(c,"keys.bytes-per-key");
+ addReplyLongLong(c,mh->bytes_per_key);
+
+ addReplyBulkCString(c,"dataset.bytes");
+ addReplyLongLong(c,mh->dataset);
+
+ addReplyBulkCString(c,"dataset.percentage");
+ addReplyDouble(c,mh->dataset_perc);
+
+ addReplyBulkCString(c,"peak.percentage");
+ addReplyDouble(c,mh->peak_perc);
+
+ addReplyBulkCString(c,"fragmentation");
+ addReplyDouble(c,mh->fragmentation);
+
+ freeMemoryOverheadData(mh);
+ } else if (!strcasecmp(c->argv[1]->ptr,"malloc-stats") && c->argc == 2) {
+#if defined(USE_JEMALLOC)
+ sds info = sdsempty();
+ je_malloc_stats_print(inputCatSds, &info, NULL);
+ addReplyBulkSds(c, info);
+#else
+ addReplyBulkCString(c,"Stats not supported for the current allocator");
+#endif
+ } else if (!strcasecmp(c->argv[1]->ptr,"doctor") && c->argc == 2) {
+ sds report = getMemoryDoctorReport();
+ addReplyBulkSds(c,report);
+ } else if (!strcasecmp(c->argv[1]->ptr,"purge") && c->argc == 2) {
+#if defined(USE_JEMALLOC)
+ char tmp[32];
+ unsigned narenas = 0;
+ size_t sz = sizeof(unsigned);
+ if (!je_mallctl("arenas.narenas", &narenas, &sz, NULL, 0)) {
+ sprintf(tmp, "arena.%d.purge", narenas);
+ if (!je_mallctl(tmp, NULL, 0, NULL, 0)) {
+ addReply(c, shared.ok);
+ return;
+ }
+ }
+ addReplyError(c, "Error purging dirty pages");
+#else
+ addReply(c, shared.ok);
+ /* Nothing to do for other allocators. */
+#endif
+ } else if (!strcasecmp(c->argv[1]->ptr,"help") && c->argc == 2) {
+ addReplyMultiBulkLen(c,4);
+ addReplyBulkCString(c,
+"MEMORY USAGE <key> [SAMPLES <count>] - Estimate memory usage of key");
+ addReplyBulkCString(c,
+"MEMORY STATS - Show memory usage details");
+ addReplyBulkCString(c,
+"MEMORY PURGE - Ask the allocator to release memory");
+ addReplyBulkCString(c,
+"MEMORY MALLOC-STATS - Show allocator internal stats");
+ } else {
+ addReplyError(c,"Syntax error. Try MEMORY HELP");
+ }
+}
diff --git a/src/quicklist.c b/src/quicklist.c
index 9cb052525..c8b72743c 100644
--- a/src/quicklist.c
+++ b/src/quicklist.c
@@ -1192,12 +1192,12 @@ quicklist *quicklistDup(quicklist *orig) {
current = current->next) {
quicklistNode *node = quicklistCreateNode();
- if (node->encoding == QUICKLIST_NODE_ENCODING_LZF) {
- quicklistLZF *lzf = (quicklistLZF *)node->zl;
+ if (current->encoding == QUICKLIST_NODE_ENCODING_LZF) {
+ quicklistLZF *lzf = (quicklistLZF *)current->zl;
size_t lzf_sz = sizeof(*lzf) + lzf->sz;
node->zl = zmalloc(lzf_sz);
memcpy(node->zl, current->zl, lzf_sz);
- } else if (node->encoding == QUICKLIST_NODE_ENCODING_RAW) {
+ } else if (current->encoding == QUICKLIST_NODE_ENCODING_RAW) {
node->zl = zmalloc(current->sz);
memcpy(node->zl, current->zl, current->sz);
}
diff --git a/src/rdb.c b/src/rdb.c
index 0cda23c5d..2689b172d 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -567,17 +567,30 @@ int rdbLoadDoubleValue(rio *rdb, double *val) {
* Return -1 on error, the size of the serialized value on success. */
int rdbSaveBinaryDoubleValue(rio *rdb, double val) {
memrev64ifbe(&val);
- return rdbWriteRaw(rdb,&val,8);
+ return rdbWriteRaw(rdb,&val,sizeof(val));
}
/* Loads a double from RDB 8 or greater. See rdbSaveBinaryDoubleValue() for
* more info. On error -1 is returned, otherwise 0. */
int rdbLoadBinaryDoubleValue(rio *rdb, double *val) {
- if (rioRead(rdb,val,8) == 0) return -1;
+ if (rioRead(rdb,val,sizeof(*val)) == 0) return -1;
memrev64ifbe(val);
return 0;
}
+/* Like rdbSaveBinaryDoubleValue() but single precision. */
+int rdbSaveBinaryFloatValue(rio *rdb, float val) {
+ memrev32ifbe(&val);
+ return rdbWriteRaw(rdb,&val,sizeof(val));
+}
+
+/* Like rdbLoadBinaryDoubleValue() but single precision. */
+int rdbLoadBinaryFloatValue(rio *rdb, float *val) {
+ if (rioRead(rdb,val,sizeof(*val)) == 0) return -1;
+ memrev32ifbe(val);
+ return 0;
+}
+
/* Save the object type of object "o". */
int rdbSaveObjectType(rio *rdb, robj *o) {
switch (o->type) {
@@ -757,6 +770,10 @@ ssize_t rdbSaveObject(rio *rdb, robj *o) {
/* Then write the module-specific representation. */
mt->rdb_save(&io,mv->value);
+ if (io.ctx) {
+ moduleFreeContext(io.ctx);
+ zfree(io.ctx);
+ }
return io.error ? -1 : (ssize_t)io.bytes;
} else {
serverPanic("Unknown object type");
@@ -818,7 +835,7 @@ int rdbSaveAuxFieldStrInt(rio *rdb, char *key, long long val) {
}
/* Save a few default AUX fields with information about the RDB generated. */
-int rdbSaveInfoAuxFields(rio *rdb, int flags) {
+int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) {
int redis_bits = (sizeof(void*) == 8) ? 64 : 32;
int aof_preamble = (flags & RDB_SAVE_AOF_PREAMBLE) != 0;
@@ -827,7 +844,19 @@ int rdbSaveInfoAuxFields(rio *rdb, int flags) {
if (rdbSaveAuxFieldStrInt(rdb,"redis-bits",redis_bits) == -1) return -1;
if (rdbSaveAuxFieldStrInt(rdb,"ctime",time(NULL)) == -1) return -1;
if (rdbSaveAuxFieldStrInt(rdb,"used-mem",zmalloc_used_memory()) == -1) return -1;
+
+ /* Handle saving options that generate aux fields. */
+ if (rsi) {
+ if (rsi->repl_stream_db &&
+ rdbSaveAuxFieldStrInt(rdb,"repl-stream-db",rsi->repl_stream_db)
+ == -1)
+ {
+ return -1;
+ }
+ }
if (rdbSaveAuxFieldStrInt(rdb,"aof-preamble",aof_preamble) == -1) return -1;
+ if (rdbSaveAuxFieldStrStr(rdb,"repl-id",server.replid) == -1) return -1;
+ if (rdbSaveAuxFieldStrInt(rdb,"repl-offset",server.master_repl_offset) == -1) return -1;
return 1;
}
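
In practice, an RDB produced by this code now always carries repl-id (the current replication ID) and repl-offset (master_repl_offset) aux fields alongside the pre-existing redis-ver, redis-bits, ctime and used-mem ones, plus repl-stream-db when an rdbSaveInfo structure with a non-zero repl_stream_db is supplied by a replicating instance; the loading path shown further down reads these fields back into the rdbSaveInfo structure.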
@@ -839,7 +868,7 @@ int rdbSaveInfoAuxFields(rio *rdb, int flags) {
* When the function returns C_ERR and if 'error' is not NULL, the
* integer pointed by 'error' is set to the value of errno just after the I/O
* error. */
-int rdbSaveRio(rio *rdb, int *error, int flags) {
+int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
dictIterator *di = NULL;
dictEntry *de;
char magic[10];
@@ -852,7 +881,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags) {
rdb->update_cksum = rioGenericUpdateChecksum;
snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
- if (rdbSaveInfoAuxFields(rdb,flags) == -1) goto werr;
+ if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr;
for (j = 0; j < server.dbnum; j++) {
redisDb *db = server.db+j;
@@ -928,7 +957,7 @@ werr:
* While the suffix is the 40 bytes hex string we announced in the prefix.
* This way processes receiving the payload can understand when it ends
* without doing any processing of the content. */
-int rdbSaveRioWithEOFMark(rio *rdb, int *error) {
+int rdbSaveRioWithEOFMark(rio *rdb, int *error, rdbSaveInfo *rsi) {
char eofmark[RDB_EOF_MARK_SIZE];
getRandomHexChars(eofmark,RDB_EOF_MARK_SIZE);
@@ -936,7 +965,7 @@ int rdbSaveRioWithEOFMark(rio *rdb, int *error) {
if (rioWrite(rdb,"$EOF:",5) == 0) goto werr;
if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
if (rioWrite(rdb,"\r\n",2) == 0) goto werr;
- if (rdbSaveRio(rdb,error,RDB_SAVE_NONE) == C_ERR) goto werr;
+ if (rdbSaveRio(rdb,error,RDB_SAVE_NONE,rsi) == C_ERR) goto werr;
if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
return C_OK;
@@ -947,7 +976,7 @@ werr: /* Write error. */
}
/* Save the DB on disk. Return C_ERR on error, C_OK on success. */
-int rdbSave(char *filename) {
+int rdbSave(char *filename, rdbSaveInfo *rsi) {
char tmpfile[256];
char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
FILE *fp;
@@ -968,7 +997,7 @@ int rdbSave(char *filename) {
}
rioInitWithFile(&rdb,fp);
- if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE) == C_ERR) {
+ if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) {
errno = error;
goto werr;
}
@@ -1006,7 +1035,7 @@ werr:
return C_ERR;
}
-int rdbSaveBackground(char *filename) {
+int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
pid_t childpid;
long long start;
@@ -1014,6 +1043,7 @@ int rdbSaveBackground(char *filename) {
server.dirty_before_bgsave = server.dirty;
server.lastbgsave_try = time(NULL);
+ openChildInfoPipe();
start = ustime();
if ((childpid = fork()) == 0) {
@@ -1022,15 +1052,18 @@ int rdbSaveBackground(char *filename) {
/* Child */
closeListeningSockets(0);
redisSetProcTitle("redis-rdb-bgsave");
- retval = rdbSave(filename);
+ retval = rdbSave(filename,rsi);
if (retval == C_OK) {
- size_t private_dirty = zmalloc_get_private_dirty();
+ size_t private_dirty = zmalloc_get_private_dirty(-1);
if (private_dirty) {
serverLog(LL_NOTICE,
"RDB: %zu MB of memory used by copy-on-write",
private_dirty/(1024*1024));
}
+
+ server.child_info_data.cow_size = private_dirty;
+ sendChildInfo(CHILD_INFO_TYPE_RDB);
}
exitFromChild((retval == C_OK) ? 0 : 1);
} else {
@@ -1039,6 +1072,7 @@ int rdbSaveBackground(char *filename) {
server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
if (childpid == -1) {
+ closeChildInfoPipe();
server.lastbgsave_status = C_ERR;
serverLog(LL_WARNING,"Can't save in background: fork: %s",
strerror(errno));
@@ -1388,7 +1422,7 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
* otherwise C_ERR is returned and 'errno' is set accordingly. */
-int rdbLoadRio(rio *rdb) {
+int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi) {
uint64_t dbid;
int type, rdbver;
redisDb *db = server.db+0;
@@ -1479,6 +1513,15 @@ int rdbLoadRio(rio *rdb) {
serverLog(LL_NOTICE,"RDB '%s': %s",
(char*)auxkey->ptr,
(char*)auxval->ptr);
+ } else if (!strcasecmp(auxkey->ptr,"repl-stream-db")) {
+ if (rsi) rsi->repl_stream_db = atoi(auxval->ptr);
+ } else if (!strcasecmp(auxkey->ptr,"repl-id")) {
+ if (rsi && sdslen(auxval->ptr) == CONFIG_RUN_ID_SIZE) {
+ memcpy(rsi->repl_id,auxval->ptr,CONFIG_RUN_ID_SIZE+1);
+ rsi->repl_id_is_set = 1;
+ }
+ } else if (!strcasecmp(auxkey->ptr,"repl-offset")) {
+ if (rsi) rsi->repl_offset = strtoll(auxval->ptr,NULL,10);
} else {
/* We ignore fields we don't understand, as by AUX field
* contract. */
@@ -1509,7 +1552,7 @@ int rdbLoadRio(rio *rdb) {
dbAdd(db,key,val);
/* Set the expire time if needed */
- if (expiretime != -1) setExpire(db,key,expiretime);
+ if (expiretime != -1) setExpire(NULL,db,key,expiretime);
decrRefCount(key);
}
@@ -1537,8 +1580,11 @@ eoferr: /* unexpected end of file is handled here with a fatal exit */
/* Like rdbLoadRio() but takes a filename instead of a rio stream. The
* filename is open for reading and a rio stream object created in order
* to do the actual loading. Moreover the ETA displayed in the INFO
- * output is initialized and finalized. */
-int rdbLoad(char *filename) {
+ * output is initialized and finalized.
+ *
+ * If you pass an 'rsi' structure initialized with RDB_SAVE_INFO_INIT, the
+ * loading code will fill the information fields in the structure. */
+int rdbLoad(char *filename, rdbSaveInfo *rsi) {
FILE *fp;
rio rdb;
int retval;
@@ -1546,7 +1592,7 @@ int rdbLoad(char *filename) {
if ((fp = fopen(filename,"r")) == NULL) return C_ERR;
startLoading(fp);
rioInitWithFile(&rdb,fp);
- retval = rdbLoadRio(&rdb);
+ retval = rdbLoadRio(&rdb,rsi);
fclose(fp);
stopLoading();
return retval;
@@ -1699,7 +1745,7 @@ void backgroundSaveDoneHandler(int exitcode, int bysignal) {
/* Spawn an RDB child that writes the RDB to the sockets of the slaves
* that are currently in SLAVE_STATE_WAIT_BGSAVE_START state. */
-int rdbSaveToSlavesSockets(void) {
+int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
int *fds;
uint64_t *clientids;
int numfds;
@@ -1744,6 +1790,7 @@ int rdbSaveToSlavesSockets(void) {
}
/* Create the child process. */
+ openChildInfoPipe();
start = ustime();
if ((childpid = fork()) == 0) {
/* Child */
@@ -1756,12 +1803,12 @@ int rdbSaveToSlavesSockets(void) {
closeListeningSockets(0);
redisSetProcTitle("redis-rdb-to-slaves");
- retval = rdbSaveRioWithEOFMark(&slave_sockets,NULL);
+ retval = rdbSaveRioWithEOFMark(&slave_sockets,NULL,rsi);
if (retval == C_OK && rioFlush(&slave_sockets) == 0)
retval = C_ERR;
if (retval == C_OK) {
- size_t private_dirty = zmalloc_get_private_dirty();
+ size_t private_dirty = zmalloc_get_private_dirty(-1);
if (private_dirty) {
serverLog(LL_NOTICE,
@@ -1769,6 +1816,9 @@ int rdbSaveToSlavesSockets(void) {
private_dirty/(1024*1024));
}
+ server.child_info_data.cow_size = private_dirty;
+ sendChildInfo(CHILD_INFO_TYPE_RDB);
+
/* If we are returning OK, at least one slave was served
* with the RDB file as expected, so we need to send a report
* to the parent via the pipe. The format of the message is:
@@ -1837,6 +1887,7 @@ int rdbSaveToSlavesSockets(void) {
}
close(pipefds[0]);
close(pipefds[1]);
+ closeChildInfoPipe();
} else {
serverLog(LL_NOTICE,"Background RDB transfer started by pid %d",
childpid);
@@ -1857,7 +1908,7 @@ void saveCommand(client *c) {
addReplyError(c,"Background save already in progress");
return;
}
- if (rdbSave(server.rdb_filename) == C_OK) {
+ if (rdbSave(server.rdb_filename,NULL) == C_OK) {
addReply(c,shared.ok);
} else {
addReply(c,shared.err);
@@ -1888,10 +1939,10 @@ void bgsaveCommand(client *c) {
} else {
addReplyError(c,
"An AOF log rewriting in progress: can't BGSAVE right now. "
- "Use BGSAVE SCHEDULE in order to schedule a BGSAVE whenver "
+ "Use BGSAVE SCHEDULE in order to schedule a BGSAVE whenever "
"possible.");
}
- } else if (rdbSaveBackground(server.rdb_filename) == C_OK) {
+ } else if (rdbSaveBackground(server.rdb_filename,NULL) == C_OK) {
addReplyStatus(c,"Background saving started");
} else {
addReply(c,shared.err);
diff --git a/src/rdb.h b/src/rdb.h
index cd1d65392..efe932255 100644
--- a/src/rdb.h
+++ b/src/rdb.h
@@ -118,11 +118,11 @@ uint64_t rdbLoadLen(rio *rdb, int *isencoded);
int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr);
int rdbSaveObjectType(rio *rdb, robj *o);
int rdbLoadObjectType(rio *rdb);
-int rdbLoad(char *filename);
-int rdbSaveBackground(char *filename);
-int rdbSaveToSlavesSockets(void);
+int rdbLoad(char *filename, rdbSaveInfo *rsi);
+int rdbSaveBackground(char *filename, rdbSaveInfo *rsi);
+int rdbSaveToSlavesSockets(rdbSaveInfo *rsi);
void rdbRemoveTempFile(pid_t childpid);
-int rdbSave(char *filename);
+int rdbSave(char *filename, rdbSaveInfo *rsi);
ssize_t rdbSaveObject(rio *rdb, robj *o);
size_t rdbSavedObjectLen(robj *o);
robj *rdbLoadObject(int type, rio *rdb);
@@ -134,6 +134,8 @@ ssize_t rdbSaveRawString(rio *rdb, unsigned char *s, size_t len);
void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr);
int rdbSaveBinaryDoubleValue(rio *rdb, double val);
int rdbLoadBinaryDoubleValue(rio *rdb, double *val);
-int rdbLoadRio(rio *rdb);
+int rdbSaveBinaryFloatValue(rio *rdb, float val);
+int rdbLoadBinaryFloatValue(rio *rdb, float *val);
+int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi);
#endif
diff --git a/src/redis-benchmark.c b/src/redis-benchmark.c
index 50905c872..f382019a4 100644
--- a/src/redis-benchmark.c
+++ b/src/redis-benchmark.c
@@ -565,7 +565,7 @@ invalid:
usage:
printf(
-"Usage: redis-benchmark [-h <host>] [-p <port>] [-c <clients>] [-n <requests]> [-k <boolean>]\n\n"
+"Usage: redis-benchmark [-h <host>] [-p <port>] [-c <clients>] [-n <requests>] [-k <boolean>]\n\n"
" -h <hostname> Server hostname (default 127.0.0.1)\n"
" -p <port> Server port (default 6379)\n"
" -s <socket> Server socket (overrides host and port)\n"
diff --git a/src/redis-cli.c b/src/redis-cli.c
index d1735d638..ac4358220 100644
--- a/src/redis-cli.c
+++ b/src/redis-cli.c
@@ -275,6 +275,10 @@ static void cliIntegrateHelp(void) {
* don't already match what we have. */
for (size_t j = 0; j < reply->elements; j++) {
redisReply *entry = reply->element[j];
+ if (entry->type != REDIS_REPLY_ARRAY || entry->elements < 4 ||
+ entry->element[0]->type != REDIS_REPLY_STRING ||
+ entry->element[1]->type != REDIS_REPLY_INTEGER ||
+ entry->element[3]->type != REDIS_REPLY_INTEGER) return;
char *cmdname = entry->element[0]->str;
int i;
@@ -336,7 +340,7 @@ static void cliOutputGenericHelp(void) {
" \"help <tab>\" to get a list of possible help topics\n"
" \"quit\" to exit\n"
"\n"
- "To set redis-cli perferences:\n"
+ "To set redis-cli preferences:\n"
" \":set hints\" enable online hints\n"
" \":set nohints\" disable online hints\n"
"Set your preferences in ~/.redisclirc\n",
@@ -843,8 +847,10 @@ static int cliSendCommand(int argc, char **argv, int repeat) {
output_raw = 0;
if (!strcasecmp(command,"info") ||
(argc >= 2 && !strcasecmp(command,"debug") &&
- ((!strcasecmp(argv[1],"jemalloc") && !strcasecmp(argv[2],"info")) ||
- !strcasecmp(argv[1],"htstats"))) ||
+ !strcasecmp(argv[1],"htstats")) ||
+ (argc >= 2 && !strcasecmp(command,"memory") &&
+ (!strcasecmp(argv[1],"malloc-stats") ||
+ !strcasecmp(argv[1],"doctor"))) ||
(argc == 2 && !strcasecmp(command,"cluster") &&
(!strcasecmp(argv[1],"nodes") ||
!strcasecmp(argv[1],"info"))) ||
@@ -1220,7 +1226,7 @@ static sds *cliSplitArgs(char *line, int *argc) {
}
}
-/* Set the CLI perferences. This function is invoked when an interactive
+/* Set the CLI preferences. This function is invoked when an interactive
* ":command" is called, or when reading ~/.redisclirc file, in order to
* set user preferences. */
void cliSetPreferences(char **argv, int argc, int interactive) {
@@ -1255,6 +1261,7 @@ void cliLoadPreferences(void) {
if (argc > 0) cliSetPreferences(argv,argc,0);
sdsfreesplitres(argv,argc);
}
+ fclose(fp);
}
sdsfree(rcfile);
}
diff --git a/src/redis-trib.rb b/src/redis-trib.rb
index b40b5decb..39db97947 100755
--- a/src/redis-trib.rb
+++ b/src/redis-trib.rb
@@ -1445,7 +1445,7 @@ class RedisTrib
xputs ">>> Importing data from #{source_addr} to cluster #{argv[1]}"
use_copy = opt['copy']
use_replace = opt['replace']
-
+
# Check the existing cluster.
load_cluster_info_from_node(argv[0])
check_cluster
@@ -1669,7 +1669,6 @@ ALLOWED_OPTIONS={
def show_help
puts "Usage: redis-trib <command> <options> <arguments ...>\n\n"
COMMANDS.each{|k,v|
- o = ""
puts " #{k.ljust(15)} #{v[2]}"
if ALLOWED_OPTIONS[k]
ALLOWED_OPTIONS[k].each{|optname,has_arg|
diff --git a/src/redismodule.h b/src/redismodule.h
index 0a35cf047..186e284c0 100644
--- a/src/redismodule.h
+++ b/src/redismodule.h
@@ -68,6 +68,8 @@
#define REDISMODULE_POSITIVE_INFINITE (1.0/0.0)
#define REDISMODULE_NEGATIVE_INFINITE (-1.0/0.0)
+#define REDISMODULE_NOT_USED(V) ((void) V)
+
/* ------------------------- End of common defines ------------------------ */
#ifndef REDISMODULE_CORE
@@ -82,15 +84,28 @@ typedef struct RedisModuleCallReply RedisModuleCallReply;
typedef struct RedisModuleIO RedisModuleIO;
typedef struct RedisModuleType RedisModuleType;
typedef struct RedisModuleDigest RedisModuleDigest;
+typedef struct RedisModuleBlockedClient RedisModuleBlockedClient;
typedef int (*RedisModuleCmdFunc) (RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
typedef void *(*RedisModuleTypeLoadFunc)(RedisModuleIO *rdb, int encver);
typedef void (*RedisModuleTypeSaveFunc)(RedisModuleIO *rdb, void *value);
typedef void (*RedisModuleTypeRewriteFunc)(RedisModuleIO *aof, RedisModuleString *key, void *value);
+typedef size_t (*RedisModuleTypeMemUsageFunc)(void *value);
typedef void (*RedisModuleTypeDigestFunc)(RedisModuleDigest *digest, void *value);
typedef void (*RedisModuleTypeFreeFunc)(void *value);
+#define REDISMODULE_TYPE_METHOD_VERSION 1
+typedef struct RedisModuleTypeMethods {
+ uint64_t version;
+ RedisModuleTypeLoadFunc rdb_load;
+ RedisModuleTypeSaveFunc rdb_save;
+ RedisModuleTypeRewriteFunc aof_rewrite;
+ RedisModuleTypeMemUsageFunc mem_usage;
+ RedisModuleTypeDigestFunc digest;
+ RedisModuleTypeFreeFunc free;
+} RedisModuleTypeMethods;
+
#define REDISMODULE_GET_API(name) \
RedisModule_GetApi("RedisModule_" #name, ((void **)&RedisModule_ ## name))
@@ -125,6 +140,7 @@ RedisModuleCallReply *REDISMODULE_API_FUNC(RedisModule_CallReplyArrayElement)(Re
RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateString)(RedisModuleCtx *ctx, const char *ptr, size_t len);
RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateStringFromLongLong)(RedisModuleCtx *ctx, long long ll);
RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateStringFromString)(RedisModuleCtx *ctx, const RedisModuleString *str);
+RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CreateStringPrintf)(RedisModuleCtx *ctx, const char *fmt, ...);
void REDISMODULE_API_FUNC(RedisModule_FreeString)(RedisModuleCtx *ctx, RedisModuleString *str);
const char *REDISMODULE_API_FUNC(RedisModule_StringPtrLen)(const RedisModuleString *str, size_t *len);
int REDISMODULE_API_FUNC(RedisModule_ReplyWithError)(RedisModuleCtx *ctx, const char *err);
@@ -168,7 +184,7 @@ int REDISMODULE_API_FUNC(RedisModule_IsKeysPositionRequest)(RedisModuleCtx *ctx)
void REDISMODULE_API_FUNC(RedisModule_KeyAtPos)(RedisModuleCtx *ctx, int pos);
unsigned long long REDISMODULE_API_FUNC(RedisModule_GetClientId)(RedisModuleCtx *ctx);
void *REDISMODULE_API_FUNC(RedisModule_PoolAlloc)(RedisModuleCtx *ctx, size_t bytes);
-RedisModuleType *REDISMODULE_API_FUNC(RedisModule_CreateDataType)(RedisModuleCtx *ctx, const char *name, int encver, RedisModuleTypeLoadFunc rdb_load, RedisModuleTypeSaveFunc rdb_save, RedisModuleTypeRewriteFunc aof_rewrite, RedisModuleTypeDigestFunc digest, RedisModuleTypeFreeFunc free);
+RedisModuleType *REDISMODULE_API_FUNC(RedisModule_CreateDataType)(RedisModuleCtx *ctx, const char *name, int encver, RedisModuleTypeMethods *typemethods);
int REDISMODULE_API_FUNC(RedisModule_ModuleTypeSetValue)(RedisModuleKey *key, RedisModuleType *mt, void *value);
RedisModuleType *REDISMODULE_API_FUNC(RedisModule_ModuleTypeGetType)(RedisModuleKey *key);
void *REDISMODULE_API_FUNC(RedisModule_ModuleTypeGetValue)(RedisModuleKey *key);
@@ -183,10 +199,21 @@ RedisModuleString *REDISMODULE_API_FUNC(RedisModule_LoadString)(RedisModuleIO *i
char *REDISMODULE_API_FUNC(RedisModule_LoadStringBuffer)(RedisModuleIO *io, size_t *lenptr);
void REDISMODULE_API_FUNC(RedisModule_SaveDouble)(RedisModuleIO *io, double value);
double REDISMODULE_API_FUNC(RedisModule_LoadDouble)(RedisModuleIO *io);
+void REDISMODULE_API_FUNC(RedisModule_SaveFloat)(RedisModuleIO *io, float value);
+float REDISMODULE_API_FUNC(RedisModule_LoadFloat)(RedisModuleIO *io);
void REDISMODULE_API_FUNC(RedisModule_Log)(RedisModuleCtx *ctx, const char *level, const char *fmt, ...);
+void REDISMODULE_API_FUNC(RedisModule_LogIOError)(RedisModuleIO *io, const char *levelstr, const char *fmt, ...);
int REDISMODULE_API_FUNC(RedisModule_StringAppendBuffer)(RedisModuleCtx *ctx, RedisModuleString *str, const char *buf, size_t len);
void REDISMODULE_API_FUNC(RedisModule_RetainString)(RedisModuleCtx *ctx, RedisModuleString *str);
int REDISMODULE_API_FUNC(RedisModule_StringCompare)(RedisModuleString *a, RedisModuleString *b);
+RedisModuleCtx *REDISMODULE_API_FUNC(RedisModule_GetContextFromIO)(RedisModuleIO *io);
+RedisModuleBlockedClient *REDISMODULE_API_FUNC(RedisModule_BlockClient)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(void*), long long timeout_ms);
+int REDISMODULE_API_FUNC(RedisModule_UnblockClient)(RedisModuleBlockedClient *bc, void *privdata);
+int REDISMODULE_API_FUNC(RedisModule_IsBlockedReplyRequest)(RedisModuleCtx *ctx);
+int REDISMODULE_API_FUNC(RedisModule_IsBlockedTimeoutRequest)(RedisModuleCtx *ctx);
+void *REDISMODULE_API_FUNC(RedisModule_GetBlockedClientPrivateData)(RedisModuleCtx *ctx);
+int REDISMODULE_API_FUNC(RedisModule_AbortBlock)(RedisModuleBlockedClient *bc);
+long long REDISMODULE_API_FUNC(RedisModule_Milliseconds)(void);
/* This is included inline inside each Redis module. */
static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int apiver) __attribute__((unused));
@@ -234,6 +261,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(CreateString);
REDISMODULE_GET_API(CreateStringFromLongLong);
REDISMODULE_GET_API(CreateStringFromString);
+ REDISMODULE_GET_API(CreateStringPrintf);
REDISMODULE_GET_API(FreeString);
REDISMODULE_GET_API(StringPtrLen);
REDISMODULE_GET_API(AutoMemory);
@@ -278,11 +306,22 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(LoadStringBuffer);
REDISMODULE_GET_API(SaveDouble);
REDISMODULE_GET_API(LoadDouble);
+ REDISMODULE_GET_API(SaveFloat);
+ REDISMODULE_GET_API(LoadFloat);
REDISMODULE_GET_API(EmitAOF);
REDISMODULE_GET_API(Log);
+ REDISMODULE_GET_API(LogIOError);
REDISMODULE_GET_API(StringAppendBuffer);
REDISMODULE_GET_API(RetainString);
REDISMODULE_GET_API(StringCompare);
+ REDISMODULE_GET_API(GetContextFromIO);
+ REDISMODULE_GET_API(BlockClient);
+ REDISMODULE_GET_API(UnblockClient);
+ REDISMODULE_GET_API(IsBlockedReplyRequest);
+ REDISMODULE_GET_API(IsBlockedTimeoutRequest);
+ REDISMODULE_GET_API(GetBlockedClientPrivateData);
+ REDISMODULE_GET_API(AbortBlock);
+ REDISMODULE_GET_API(Milliseconds);
RedisModule_SetModuleAttribs(ctx,name,ver,apiver);
return REDISMODULE_OK;
diff --git a/src/replication.c b/src/replication.c
index fb96fac1b..df2e23f3a 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -79,11 +79,6 @@ void createReplicationBacklog(void) {
server.repl_backlog = zmalloc(server.repl_backlog_size);
server.repl_backlog_histlen = 0;
server.repl_backlog_idx = 0;
- /* When a new backlog buffer is created, we increment the replication
- * offset by one to make sure we'll not be able to PSYNC with any
- * previous slave. This is needed because we avoid incrementing the
- * master_repl_offset if no backlog exists nor slaves are attached. */
- server.master_repl_offset++;
/* We don't have any data inside our buffer, but virtually the first
* byte we have is the next byte that will be generated for the
@@ -170,12 +165,24 @@ void feedReplicationBacklogWithObject(robj *o) {
feedReplicationBacklog(p,len);
}
+/* Propagate write commands to slaves, and populate the replication backlog
+ * as well. This function is used if the instance is a master: we use
+ * the commands received by our clients in order to create the replication
+ * stream. If instead the instance is a slave and has sub-slaves attached,
+ * we use replicationFeedSlavesFromMasterStream(). */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
listNode *ln;
listIter li;
int j, len;
char llstr[LONG_STR_SIZE];
+    /* If the instance is not a top level master, return ASAP: we'll just proxy
+     * the stream of data we receive from our master instead, in order to
+     * propagate an *identical* replication stream. In this way this slave can
+     * advertise the same replication ID as the master (since it shares the
+     * master replication history and has the same backlog and offsets). */
+ if (server.masterhost != NULL) return;
+
/* If there aren't slaves, and there is no backlog buffer to populate,
* we can return ASAP. */
if (server.repl_backlog == NULL && listLength(slaves) == 0) return;
@@ -244,7 +251,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
}
/* Write the command to every slave. */
- listRewind(server.slaves,&li);
+ listRewind(slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
@@ -265,6 +272,34 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
}
}
+/* This function is used in order to proxy what we receive from our master
+ * to our sub-slaves. */
+#include <ctype.h>
+void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t buflen) {
+ listNode *ln;
+ listIter li;
+
+ /* Debugging: this is handy to see the stream sent from master
+ * to slaves. Disabled with if(0). */
+ if (0) {
+ printf("%zu:",buflen);
+ for (size_t j = 0; j < buflen; j++) {
+ printf("%c", isprint(buf[j]) ? buf[j] : '.');
+ }
+ printf("\n");
+ }
+
+ if (server.repl_backlog) feedReplicationBacklog(buf,buflen);
+ listRewind(slaves,&li);
+ while((ln = listNext(&li))) {
+ client *slave = ln->value;
+
+ /* Don't feed slaves that are still waiting for BGSAVE to start */
+ if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
+ addReplyString(slave,buf,buflen);
+ }
+}
+
void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) {
listNode *ln;
listIter li;
@@ -329,7 +364,7 @@ long long addReplyReplicationBacklog(client *c, long long offset) {
skip = offset - server.repl_backlog_off;
serverLog(LL_DEBUG, "[PSYNC] Skipping: %lld", skip);
- /* Point j to the oldest byte, that is actaully our
+ /* Point j to the oldest byte, that is actually our
* server.repl_backlog_off byte. */
j = (server.repl_backlog_idx +
(server.repl_backlog_size-server.repl_backlog_histlen)) %
@@ -361,18 +396,14 @@ long long addReplyReplicationBacklog(client *c, long long offset) {
* the BGSAVE process started and before executing any other command
* from clients. */
long long getPsyncInitialOffset(void) {
- long long psync_offset = server.master_repl_offset;
- /* Add 1 to psync_offset if it the replication backlog does not exists
- * as when it will be created later we'll increment the offset by one. */
- if (server.repl_backlog == NULL) psync_offset++;
- return psync_offset;
+ return server.master_repl_offset;
}
/* Send a FULLRESYNC reply in the specific case of a full resynchronization,
* as a side effect setup the slave for a full sync in different ways:
*
- * 1) Remember, into the slave client structure, the offset we sent
- * here, so that if new slaves will later attach to the same
+ * 1) Remember, into the slave client structure, the replication offset
+ * we sent here, so that if new slaves will later attach to the same
* background RDB saving process (by duplicating this client output
* buffer), we can get the right offset from this slave.
* 2) Set the replication state of the slave to WAIT_BGSAVE_END so that
@@ -392,14 +423,14 @@ int replicationSetupSlaveForFullResync(client *slave, long long offset) {
slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
/* We are going to accumulate the incremental changes for this
* slave as well. Set slaveseldb to -1 in order to force to re-emit
- * a SLEECT statement in the replication stream. */
+ * a SELECT statement in the replication stream. */
server.slaveseldb = -1;
/* Don't send this reply to slaves that approached us with
* the old SYNC command. */
if (!(slave->flags & CLIENT_PRE_PSYNC)) {
buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",
- server.runid,offset);
+ server.replid,offset);
if (write(slave->fd,buf,buflen) != buflen) {
freeClientAsync(slave);
return C_ERR;
@@ -415,19 +446,40 @@ int replicationSetupSlaveForFullResync(client *slave, long long offset) {
* with the usual full resync. */
int masterTryPartialResynchronization(client *c) {
long long psync_offset, psync_len;
- char *master_runid = c->argv[1]->ptr;
+ char *master_replid = c->argv[1]->ptr;
char buf[128];
int buflen;
- /* Is the runid of this master the same advertised by the wannabe slave
- * via PSYNC? If runid changed this master is a different instance and
- * there is no way to continue. */
- if (strcasecmp(master_runid, server.runid)) {
+ /* Parse the replication offset asked by the slave. Go to full sync
+ * on parse error: this should never happen but we try to handle
+ * it in a robust way compared to aborting. */
+ if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
+ C_OK) goto need_full_resync;
+
+ /* Is the replication ID of this master the same advertised by the wannabe
+ * slave via PSYNC? If the replication ID changed this master has a
+ * different replication history, and there is no way to continue.
+ *
+ * Note that there are two potentially valid replication IDs: the ID1
+ * and the ID2. The ID2 however is only valid up to a specific offset. */
+ if (strcasecmp(master_replid, server.replid) &&
+ (strcasecmp(master_replid, server.replid2) ||
+ psync_offset > server.second_replid_offset))
+ {
/* Run id "?" is used by slaves that want to force a full resync. */
- if (master_runid[0] != '?') {
- serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
- "Runid mismatch (Client asked for runid '%s', my runid is '%s')",
- master_runid, server.runid);
+ if (master_replid[0] != '?') {
+ if (strcasecmp(master_replid, server.replid) &&
+ strcasecmp(master_replid, server.replid2))
+ {
+ serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
+ "Replication ID mismatch (Slave asked for '%s', my "
+ "replication IDs are '%s' and '%s')",
+ master_replid, server.replid, server.replid2);
+ } else {
+ serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
+ "Requested offset for second ID was %lld, but I can reply "
+ "up to %lld", psync_offset, server.second_replid_offset);
+ }
} else {
serverLog(LL_NOTICE,"Full resync requested by slave %s",
replicationGetSlaveName(c));
@@ -436,8 +488,6 @@ int masterTryPartialResynchronization(client *c) {
}
/* We still have the data our slave is asking for? */
- if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
- C_OK) goto need_full_resync;
if (!server.repl_backlog ||
psync_offset < server.repl_backlog_off ||
psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
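
To make the two-ID acceptance rule above easier to follow, here is a minimal, self-contained sketch of the same logic: the requested ID must match either the current replication ID, or the secondary ID but only up to second_replid_offset, and the requested offset must still fall inside the backlog window. The struct, function name and short ID strings are invented for illustration (real replication IDs are 40 hex characters) and do not reflect the server's actual fields.

#include <stdio.h>
#include <strings.h>

/* Illustrative subset of the master state that PSYNC looks at. */
struct master_state {
    char replid[41];                /* Current replication ID (ID1). */
    char replid2[41];               /* Previous replication ID (ID2). */
    long long second_replid_offset; /* ID2 is only valid up to this offset. */
    long long backlog_off;          /* Master offset of the first backlog byte. */
    long long backlog_histlen;      /* Number of bytes stored in the backlog. */
};

/* Return 1 if (replid, offset) can be served with a partial resync. */
static int can_partial_resync(const struct master_state *m,
                              const char *replid, long long offset) {
    int id_ok = strcasecmp(replid, m->replid) == 0 ||
                (strcasecmp(replid, m->replid2) == 0 &&
                 offset <= m->second_replid_offset);
    if (!id_ok) return 0;
    /* The requested offset must still be covered by the backlog. */
    return offset >= m->backlog_off &&
           offset <= m->backlog_off + m->backlog_histlen;
}

int main(void) {
    struct master_state m = { "id-after-failover", "id-before-failover",
                              51,    /* ID2 accepted only up to offset 51. */
                              10,    /* First backlog byte is at offset 10... */
                              100 }; /* ...and 100 bytes are stored. */
    printf("%d\n", can_partial_resync(&m, "id-after-failover", 60));   /* 1 */
    printf("%d\n", can_partial_resync(&m, "id-before-failover", 40));  /* 1 */
    printf("%d\n", can_partial_resync(&m, "id-before-failover", 60));  /* 0 */
    return 0;
}
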
@@ -463,7 +513,11 @@ int masterTryPartialResynchronization(client *c) {
/* We can't use the connection buffers since they are used to accumulate
* new commands at this stage. But we are sure the socket send buffer is
* empty so this write will never fail actually. */
- buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
+ if (c->slave_capa & SLAVE_CAPA_PSYNC2) {
+ buflen = snprintf(buf,sizeof(buf),"+CONTINUE %s\r\n", server.replid);
+ } else {
+ buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
+ }
if (write(c->fd,buf,buflen) != buflen) {
freeClientAsync(c);
return C_OK;
@@ -515,10 +569,18 @@ int startBgsaveForReplication(int mincapa) {
serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s",
socket_target ? "slaves sockets" : "disk");
+ rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
+ /* If we are saving for a chained slave (that is, if we are,
+ * in turn, a slave of another instance), make sure that after
+ * loading the RDB our slaves select the right DB: we'll just
+ * send the replication stream we receive from our master, so
+ * there is no way to send SELECT commands. */
+ if (server.master) rsi.repl_stream_db = server.master->db->id;
+
if (socket_target)
- retval = rdbSaveToSlavesSockets();
+ retval = rdbSaveToSlavesSockets(&rsi);
else
- retval = rdbSaveBackground(server.rdb_filename);
+ retval = rdbSaveBackground(server.rdb_filename,&rsi);
/* If we failed to BGSAVE, remove the slaves waiting for a full
* resynchronization from the list of slaves, inform them with
@@ -568,7 +630,7 @@ void syncCommand(client *c) {
/* Refuse SYNC requests if we are a slave but the link with our master
* is not ok... */
if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) {
- addReplyError(c,"Can't SYNC while not connected with my master");
+ addReplySds(c,sdsnew("-NOMASTERLINK Can't SYNC while not connected with my master\r\n"));
return;
}
@@ -589,22 +651,22 @@ void syncCommand(client *c) {
* when this happens masterTryPartialResynchronization() already
* replied with:
*
- * +FULLRESYNC <runid> <offset>
+ * +FULLRESYNC <replid> <offset>
*
- * So the slave knows the new runid and offset to try a PSYNC later
+ * So the slave knows the new replid and offset to try a PSYNC later
* if the connection with the master is lost. */
if (!strcasecmp(c->argv[0]->ptr,"psync")) {
if (masterTryPartialResynchronization(c) == C_OK) {
server.stat_sync_partial_ok++;
return; /* No full resync needed, return. */
} else {
- char *master_runid = c->argv[1]->ptr;
+ char *master_replid = c->argv[1]->ptr;
/* Increment stats for failed PSYNCs, but only if the
- * runid is not "?", as this is used by slaves to force a full
+ * replid is not "?", as this is used by slaves to force a full
* resync on purpose when they are not able to partially
* resync. */
- if (master_runid[0] != '?') server.stat_sync_partial_err++;
+ if (master_replid[0] != '?') server.stat_sync_partial_err++;
}
} else {
/* If a slave uses SYNC, we are dealing with an old implementation
@@ -625,6 +687,16 @@ void syncCommand(client *c) {
c->flags |= CLIENT_SLAVE;
listAddNodeTail(server.slaves,c);
+ /* Create the replication backlog if needed. */
+ if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) {
+ /* When we create the backlog from scratch, we always use a new
+ * replication ID and clear the ID2, since there is no valid
+ * past history. */
+ changeReplicationId();
+ clearReplicationId2();
+ createReplicationBacklog();
+ }
+
/* CASE 1: BGSAVE is in progress, with disk target. */
if (server.rdb_child_pid != -1 &&
server.rdb_child_type == RDB_CHILD_TYPE_DISK)
@@ -685,9 +757,6 @@ void syncCommand(client *c) {
}
}
}
-
- if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
- createReplicationBacklog();
return;
}
@@ -735,6 +804,8 @@ void replconfCommand(client *c) {
/* Ignore capabilities not understood by this master. */
if (!strcasecmp(c->argv[j+1]->ptr,"eof"))
c->slave_capa |= SLAVE_CAPA_EOF;
+ else if (!strcasecmp(c->argv[j+1]->ptr,"psync2"))
+ c->slave_capa |= SLAVE_CAPA_PSYNC2;
} else if (!strcasecmp(c->argv[j]->ptr,"ack")) {
/* REPLCONF ACK is used by slave to inform the master the amount
* of replication stream that it processed so far. It is an
@@ -758,7 +829,7 @@ void replconfCommand(client *c) {
/* REPLCONF GETACK is used in order to request an ACK ASAP
* to the slave. */
if (server.masterhost && server.master) replicationSendAck();
- /* Note: this command does not reply anything! */
+ return;
} else {
addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s",
(char*)c->argv[j]->ptr);
@@ -928,6 +999,43 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
if (startbgsave) startBgsaveForReplication(mincapa);
}
+/* Change the current instance replication ID with a new, random one.
+ * This will prevent successful PSYNCs between this master and other
+ * slaves, so the command should be called when something happens that
+ * alters the current history of the dataset. */
+void changeReplicationId(void) {
+ getRandomHexChars(server.replid,CONFIG_RUN_ID_SIZE);
+ server.replid[CONFIG_RUN_ID_SIZE] = '\0';
+}
+
+/* Clear (invalidate) the secondary replication ID. This happens, for
+ * example, after a full resynchronization, when we start a new replication
+ * history. */
+void clearReplicationId2(void) {
+ memset(server.replid2,'0',sizeof(server.replid));
+ server.replid2[CONFIG_RUN_ID_SIZE] = '\0';
+ server.second_replid_offset = -1;
+}
+
+/* Use the current replication ID / offset as secondary replication
+ * ID, and change the current one in order to start a new history.
+ * This should be used when an instance is switched from slave to master
+ * so that it can serve PSYNC requests performed using the master
+ * replication ID. */
+void shiftReplicationId(void) {
+ memcpy(server.replid2,server.replid,sizeof(server.replid));
+ /* We set the second replid offset to the master offset + 1, since
+ * the slave will ask for the first byte it has not yet received, so
+ * we need to add one to the offset: for example if, as a slave, we are
+ * sure we have the same history as the master for 50 bytes, after we
+ * are turned into a master, we can accept a PSYNC request with offset
+ * 51, since the slave asking has the same history up to the 50th
+ * byte, and is asking for the new bytes starting at offset 51. */
+ server.second_replid_offset = server.master_repl_offset+1;
+ changeReplicationId();
+ serverLog(LL_WARNING,"Setting secondary replication ID to %s, valid up to offset: %lld. New replication ID is %s", server.replid2, server.second_replid_offset, server.replid);
+}
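
To make the offset+1 rule concrete, here is a tiny self-contained sketch of the bookkeeping performed at promotion time; the struct and helper are invented for illustration (real IDs are 40 hex characters) and are not the server's actual fields.

#include <stdio.h>
#include <string.h>

/* Illustrative replication-ID state of a single instance. */
struct repl_ids {
    char replid[41];                /* Current history (ID1). */
    char replid2[41];               /* Previous history (ID2). */
    long long repl_offset;          /* Bytes of replication history so far. */
    long long second_replid_offset; /* ID2 valid up to here, -1 if unset. */
};

/* Promotion from slave to master: keep the old ID as ID2, valid up to
 * offset+1, and start a new history under a fresh ID. */
static void shift_replication_id(struct repl_ids *r, const char *fresh_id) {
    memcpy(r->replid2, r->replid, sizeof(r->replid2));
    /* +1 because a reconnecting slave asks for the first byte it does NOT
     * have: with 50 shared bytes it will request offset 51. */
    r->second_replid_offset = r->repl_offset + 1;
    snprintf(r->replid, sizeof(r->replid), "%s", fresh_id);
}

int main(void) {
    struct repl_ids r = { "old-master-id", "", 50, -1 };
    shift_replication_id(&r, "new-random-id");
    printf("ID1=%s, ID2=%s valid up to offset %lld\n",
           r.replid, r.replid2, r.second_replid_offset);
    return 0;
}
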
+
/* ----------------------------------- SLAVE -------------------------------- */
/* Returns 1 if the given replication state is a handshake state,
@@ -965,18 +1073,30 @@ void replicationEmptyDbCallback(void *privdata) {
/* Once we have a link with the master and the synchronization was
* performed, this function materializes the master client we store
* at server.master, starting from the specified file descriptor. */
-void replicationCreateMasterClient(int fd) {
+void replicationCreateMasterClient(int fd, int dbid) {
server.master = createClient(fd);
server.master->flags |= CLIENT_MASTER;
server.master->authenticated = 1;
- server.repl_state = REPL_STATE_CONNECTED;
- server.master->reploff = server.repl_master_initial_offset;
- memcpy(server.master->replrunid, server.repl_master_runid,
- sizeof(server.repl_master_runid));
+ server.master->reploff = server.master_initial_offset;
+ memcpy(server.master->replid, server.master_replid,
+ sizeof(server.master_replid));
/* If master offset is set to -1, this master is old and is not
* PSYNC capable, so we flag it accordingly. */
if (server.master->reploff == -1)
server.master->flags |= CLIENT_PRE_PSYNC;
+ if (dbid != -1) selectDb(server.master,dbid);
+}
+
+void restartAOF() {
+ int retry = 10;
+ while (retry-- && startAppendOnly() == C_ERR) {
+ serverLog(LL_WARNING,"Failed enabling the AOF after successful master synchronization! Trying it again in one second.");
+ sleep(1);
+ }
+ if (!retry) {
+ serverLog(LL_WARNING,"FATAL: this slave instance finished the synchronization with its master, but the AOF can't be turned on. Exiting now.");
+ exit(1);
+ }
}
/* Asynchronously read the SYNC payload we receive from a master */
@@ -1120,12 +1240,17 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
}
if (eof_reached) {
+ int aof_is_enabled = server.aof_state != AOF_OFF;
+
if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
serverLog(LL_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
cancelReplicationHandshake();
return;
}
serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Flushing old data");
+ /* We need to stop any AOFRW fork before flushing and parsing
+ * RDB, otherwise we'll create a copy-on-write disaster. */
+ if(aof_is_enabled) stopAppendOnly();
signalFlushedDb(-1);
emptyDb(
-1,
@@ -1137,34 +1262,38 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
* time for non blocking loading. */
aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory");
- if (rdbLoad(server.rdb_filename) != C_OK) {
+ rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
+ if (rdbLoad(server.rdb_filename,&rsi) != C_OK) {
serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
cancelReplicationHandshake();
+ /* Re-enable the AOF if we disabled it earlier, in order to restore
+ * the original configuration. */
+ if (aof_is_enabled) restartAOF();
return;
}
/* Final setup of the connected slave <- master link */
zfree(server.repl_transfer_tmpfile);
close(server.repl_transfer_fd);
- replicationCreateMasterClient(server.repl_transfer_s);
+ replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db);
+ server.repl_state = REPL_STATE_CONNECTED;
+ /* After a full resynchronization we use the replication ID and
+ * offset of the master. The secondary ID / offset are cleared since
+ * we are starting a new history. */
+ memcpy(server.replid,server.master->replid,sizeof(server.replid));
+ server.master_repl_offset = server.master->reploff;
+ clearReplicationId2();
+ /* Let's create the replication backlog if needed. Slaves need to
+ * accumulate the backlog regardless of whether they have sub-slaves
+ * attached, in order to behave correctly if they are promoted to
+ * masters after a failover. */
+ if (server.repl_backlog == NULL) createReplicationBacklog();
+
serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Finished with success");
/* Restart the AOF subsystem now that we finished the sync. This
* will trigger an AOF rewrite, and when done will start appending
* to the new file. */
- if (server.aof_state != AOF_OFF) {
- int retry = 10;
-
- stopAppendOnly();
- while (retry-- && startAppendOnly() == C_ERR) {
- serverLog(LL_WARNING,"Failed enabling the AOF after successful master synchronization! Trying it again in one second.");
- sleep(1);
- }
- if (!retry) {
- serverLog(LL_WARNING,"FATAL: this slave instance finished the synchronization with its master, but the AOF can't be turned on. Exiting now.");
- exit(1);
- }
- }
+ if (aof_is_enabled) restartAOF();
}
-
return;
error:
@@ -1263,14 +1392,15 @@ char *sendSynchronousCommand(int flags, int fd, ...) {
* offset is saved.
* PSYNC_NOT_SUPPORTED: If the server does not understand PSYNC at all and
* the caller should fall back to SYNC.
- * PSYNC_WRITE_ERR: There was an error writing the command to the socket.
+ * PSYNC_WRITE_ERROR: There was an error writing the command to the socket.
* PSYNC_WAIT_REPLY: Call again the function with read_reply set to 1.
+ * PSYNC_TRY_LATER: Master is currently in a transient error condition.
*
* Notable side effects:
*
* 1) As a side effect of the function call the function removes the readable
* event handler from "fd", unless the return value is PSYNC_WAIT_REPLY.
- * 2) server.repl_master_initial_offset is set to the right value according
+ * 2) server.master_initial_offset is set to the right value according
* to the master reply. This will be used to populate the 'server.master'
* structure replication offset.
*/
@@ -1280,32 +1410,33 @@ char *sendSynchronousCommand(int flags, int fd, ...) {
#define PSYNC_CONTINUE 2
#define PSYNC_FULLRESYNC 3
#define PSYNC_NOT_SUPPORTED 4
+#define PSYNC_TRY_LATER 5
int slaveTryPartialResynchronization(int fd, int read_reply) {
- char *psync_runid;
+ char *psync_replid;
char psync_offset[32];
sds reply;
/* Writing half */
if (!read_reply) {
- /* Initially set repl_master_initial_offset to -1 to mark the current
+ /* Initially set master_initial_offset to -1 to mark the current
* master run_id and offset as not valid. Later if we'll be able to do
* a FULL resync using the PSYNC command we'll set the offset at the
* right value, so that this information will be propagated to the
* client structure representing the master into server.master. */
- server.repl_master_initial_offset = -1;
+ server.master_initial_offset = -1;
if (server.cached_master) {
- psync_runid = server.cached_master->replrunid;
+ psync_replid = server.cached_master->replid;
snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
- serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset);
+ serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset);
} else {
serverLog(LL_NOTICE,"Partial resynchronization not possible (no cached master)");
- psync_runid = "?";
+ psync_replid = "?";
memcpy(psync_offset,"-1",3);
}
/* Issue the PSYNC command */
- reply = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PSYNC",psync_runid,psync_offset,NULL);
+ reply = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PSYNC",psync_replid,psync_offset,NULL);
if (reply != NULL) {
serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply);
sdsfree(reply);
@@ -1327,31 +1458,31 @@ int slaveTryPartialResynchronization(int fd, int read_reply) {
aeDeleteFileEvent(server.el,fd,AE_READABLE);
if (!strncmp(reply,"+FULLRESYNC",11)) {
- char *runid = NULL, *offset = NULL;
+ char *replid = NULL, *offset = NULL;
/* FULL RESYNC, parse the reply in order to extract the run id
* and the replication offset. */
- runid = strchr(reply,' ');
- if (runid) {
- runid++;
- offset = strchr(runid,' ');
+ replid = strchr(reply,' ');
+ if (replid) {
+ replid++;
+ offset = strchr(replid,' ');
if (offset) offset++;
}
- if (!runid || !offset || (offset-runid-1) != CONFIG_RUN_ID_SIZE) {
+ if (!replid || !offset || (offset-replid-1) != CONFIG_RUN_ID_SIZE) {
serverLog(LL_WARNING,
"Master replied with wrong +FULLRESYNC syntax.");
/* This is an unexpected condition, actually the +FULLRESYNC
* reply means that the master supports PSYNC, but the reply
* format seems wrong. To stay safe we blank the master
- * runid to make sure next PSYNCs will fail. */
- memset(server.repl_master_runid,0,CONFIG_RUN_ID_SIZE+1);
+ * replid to make sure next PSYNCs will fail. */
+ memset(server.master_replid,0,CONFIG_RUN_ID_SIZE+1);
} else {
- memcpy(server.repl_master_runid, runid, offset-runid-1);
- server.repl_master_runid[CONFIG_RUN_ID_SIZE] = '\0';
- server.repl_master_initial_offset = strtoll(offset,NULL,10);
+ memcpy(server.master_replid, replid, offset-replid-1);
+ server.master_replid[CONFIG_RUN_ID_SIZE] = '\0';
+ server.master_initial_offset = strtoll(offset,NULL,10);
serverLog(LL_NOTICE,"Full resync from master: %s:%lld",
- server.repl_master_runid,
- server.repl_master_initial_offset);
+ server.master_replid,
+ server.master_initial_offset);
}
/* We are going to full resync, discard the cached master structure. */
replicationDiscardCachedMaster();
@@ -1360,17 +1491,64 @@ int slaveTryPartialResynchronization(int fd, int read_reply) {
}
if (!strncmp(reply,"+CONTINUE",9)) {
- /* Partial resync was accepted, set the replication state accordingly */
+ /* Partial resync was accepted. */
serverLog(LL_NOTICE,
"Successful partial resynchronization with master.");
+
+ /* Check the new replication ID advertised by the master. If it
+ * changed, we need to set the new ID as primary ID, and set our
+ * secondary ID as the old master ID up to the current offset, so
+ * that our sub-slaves will be able to PSYNC with us after a
+ * disconnection. */
+ char *start = reply+10;
+ char *end = reply+9;
+ while(end[0] != '\r' && end[0] != '\n' && end[0] != '\0') end++;
+ if (end-start == CONFIG_RUN_ID_SIZE) {
+ char new[CONFIG_RUN_ID_SIZE+1];
+ memcpy(new,start,CONFIG_RUN_ID_SIZE);
+ new[CONFIG_RUN_ID_SIZE] = '\0';
+
+ if (strcmp(new,server.cached_master->replid)) {
+ /* Master ID changed. */
+ serverLog(LL_WARNING,"Master replication ID changed to %s",new);
+
+ /* Set the old ID as our ID2, up to the current offset+1. */
+ memcpy(server.replid2,server.cached_master->replid,
+ sizeof(server.replid2));
+ server.second_replid_offset = server.master_repl_offset+1;
+
+ /* Update the cached master ID and our own primary ID to the
+ * new one. */
+ memcpy(server.replid,new,sizeof(server.replid));
+ memcpy(server.cached_master->replid,new,sizeof(server.replid));
+
+ /* Disconnect all the sub-slaves: they need to be notified. */
+ disconnectSlaves();
+ }
+ }
+
+ /* Setup the replication to continue. */
sdsfree(reply);
replicationResurrectCachedMaster(fd);
return PSYNC_CONTINUE;
}
- /* If we reach this point we received either an error since the master does
- * not understand PSYNC, or an unexpected reply from the master.
- * Return PSYNC_NOT_SUPPORTED to the caller in both cases. */
+ /* If we reach this point we received either an error (since the master does
+ * not understand PSYNC or because it is in a special state and cannot
+ * serve our request), or an unexpected reply from the master.
+ *
+ * Return PSYNC_NOT_SUPPORTED on errors we don't understand, otherwise
+ * return PSYNC_TRY_LATER if we believe this is a transient error. */
+
+ if (!strncmp(reply,"-NOMASTERLINK",13) ||
+ !strncmp(reply,"-LOADING",8))
+ {
+ serverLog(LL_NOTICE,
+ "Master is currently unable to PSYNC "
+ "but should be in the future: %s", reply);
+ sdsfree(reply);
+ return PSYNC_TRY_LATER;
+ }
if (strncmp(reply,"-ERR",4)) {
/* If it's not an error, log the unexpected event. */
@@ -1386,6 +1564,8 @@ int slaveTryPartialResynchronization(int fd, int read_reply) {
return PSYNC_NOT_SUPPORTED;
}
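
Stripped of its parsing side effects, the reply handling in this function reduces to a small classification, sketched below as a standalone program. The outcome constants match the definitions earlier in this hunk; the helper name is invented, and the real function additionally extracts the replid and offset from +FULLRESYNC and +CONTINUE and updates the server state.

#include <stdio.h>
#include <string.h>

#define PSYNC_CONTINUE      2
#define PSYNC_FULLRESYNC    3
#define PSYNC_NOT_SUPPORTED 4
#define PSYNC_TRY_LATER     5

/* Map a master reply to one of the PSYNC outcomes, by prefix only. */
static int classify_psync_reply(const char *reply) {
    if (!strncmp(reply, "+FULLRESYNC", 11)) return PSYNC_FULLRESYNC;
    if (!strncmp(reply, "+CONTINUE", 9))    return PSYNC_CONTINUE;
    if (!strncmp(reply, "-NOMASTERLINK", 13) ||
        !strncmp(reply, "-LOADING", 8))     return PSYNC_TRY_LATER;
    return PSYNC_NOT_SUPPORTED;             /* -ERR or anything unexpected. */
}

int main(void) {
    printf("%d\n", classify_psync_reply("+CONTINUE 0123abcd\r\n"));    /* 2 */
    printf("%d\n", classify_psync_reply("-LOADING Redis is loading")); /* 5 */
    printf("%d\n", classify_psync_reply("-ERR unknown command"));      /* 4 */
    return 0;
}
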
+/* This handler fires when the non blocking connect was able to
+ * establish a connection with the master. */
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
char tmpfile[256], *err = NULL;
int dfd, maxtries = 5;
@@ -1402,7 +1582,8 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
return;
}
- /* Check for errors in the socket. */
+ /* Check for errors in the socket: after a non blocking connect() we
+ * may find that the socket is in error state. */
if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)
sockerr = errno;
if (sockerr) {
@@ -1531,13 +1712,15 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
server.repl_state = REPL_STATE_SEND_CAPA;
}
- /* Inform the master of our capabilities. While we currently send
- * just one capability, it is possible to chain new capabilities here
- * in the form of REPLCONF capa X capa Y capa Z ...
+ /* Inform the master of our (slave) capabilities.
+ *
+ * EOF: supports EOF-style RDB transfer for diskless replication.
+ * PSYNC2: supports PSYNC v2, so understands +CONTINUE <new repl ID>.
+ *
* The master will ignore capabilities it does not understand. */
if (server.repl_state == REPL_STATE_SEND_CAPA) {
err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
- "capa","eof",NULL);
+ "capa","eof","capa","psync2",NULL);
if (err) goto write_error;
sdsfree(err);
server.repl_state = REPL_STATE_RECEIVE_CAPA;
@@ -1582,6 +1765,12 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
psync_result = slaveTryPartialResynchronization(fd,1);
if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */
+ /* If the master is in a transient error, we should try to PSYNC
+ * from scratch later, so go to the error path. This happens when
+ * the server is loading the dataset or is not connected with its
+ * master and so forth. */
+ if (psync_result == PSYNC_TRY_LATER) goto error;
+
/* Note: if PSYNC does not return WAIT_REPLY, it will take care of
* uninstalling the read handler from the file descriptor. */
@@ -1591,14 +1780,14 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
}
/* PSYNC failed or is not supported: we want our slaves to resync with us
- * as well, if we have any (chained replication case). The mater may
- * transfer us an entirely different data set and we have no way to
- * incrementally feed our slaves after that. */
+ * as well, if we have any sub-slaves. The master may transfer us an
+ * entirely different data set and we have no way to incrementally feed
+ * our slaves after that. */
disconnectSlaves(); /* Force our slaves to resync with us as well. */
freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
/* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
- * and the server.repl_master_runid and repl_master_initial_offset are
+ * and the server.master_replid and master_initial_offset are
* already populated. */
if (psync_result == PSYNC_NOT_SUPPORTED) {
serverLog(LL_NOTICE,"Retrying with SYNC...");
@@ -1727,17 +1916,24 @@ int cancelReplicationHandshake(void) {
/* Set replication to the specified master address and port. */
void replicationSetMaster(char *ip, int port) {
+ int was_master = server.masterhost == NULL;
+
sdsfree(server.masterhost);
server.masterhost = sdsnew(ip);
server.masterport = port;
- if (server.master) freeClient(server.master);
+ if (server.master) {
+ freeClient(server.master);
+ }
disconnectAllBlockedClients(); /* Clients blocked in master, now slave. */
- disconnectSlaves(); /* Force our slaves to resync with us as well. */
- replicationDiscardCachedMaster(); /* Don't try a PSYNC. */
- freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
+
+ /* Force our slaves to resync with us as well. They may hopefully be able
+ * to partially resync with us, but in any case they must be notified of the replid change. */
+ disconnectSlaves();
cancelReplicationHandshake();
+ /* Before destroying our master state, create a cached master using
+ * our own parameters, to later PSYNC with the new master. */
+ if (was_master) replicationCacheMasterUsingMyself();
server.repl_state = REPL_STATE_CONNECT;
- server.master_repl_offset = 0;
server.repl_down_since = 0;
}
@@ -1746,20 +1942,26 @@ void replicationUnsetMaster(void) {
if (server.masterhost == NULL) return; /* Nothing to do. */
sdsfree(server.masterhost);
server.masterhost = NULL;
- if (server.master) {
- if (listLength(server.slaves) == 0) {
- /* If this instance is turned into a master and there are no
- * slaves, it inherits the replication offset from the master.
- * Under certain conditions this makes replicas comparable by
- * replication offset to understand what is the most updated. */
- server.master_repl_offset = server.master->reploff;
- freeReplicationBacklog();
- }
- freeClient(server.master);
- }
+ /* When a slave is turned into a master, the current replication ID
+ * (that was inherited from the master at synchronization time) is
+ * used as secondary ID up to the current offset, and a new replication
+ * ID is created to continue with a new replication history. */
+ shiftReplicationId();
+ if (server.master) freeClient(server.master);
replicationDiscardCachedMaster();
cancelReplicationHandshake();
+ /* Disconnecting all the slaves is required: we need to inform slaves
+ * of the replication ID change (see shiftReplicationId() call). However
+ * the slaves will be able to partially resync with us, so it will be
+ * a very fast reconnection. */
+ disconnectSlaves();
server.repl_state = REPL_STATE_NONE;
+
+ /* We need to make sure the new master will start the replication stream
+ * with a SELECT statement. This is forced after a full resync, but
+ * with PSYNC version 2, there is no need for full resync after a
+ * master switch. */
+ server.slaveseldb = -1;
}
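
Setting slaveseldb to -1 works because the replication feeder prepends a SELECT only when the DB of the command being propagated differs from the DB last selected in the stream, so an impossible value forces a SELECT in front of the next propagated command. A simplified standalone sketch of that idea follows (this is not the actual replicationFeedSlaves() code):

#include <stdio.h>

/* Illustrative only: emit SELECT before a command when the stream's
 * currently selected DB differs from the command's DB. */
static int stream_seldb = -1;  /* -1 forces a SELECT before the next command. */

static void feed_command(int dbid, const char *cmd) {
    if (stream_seldb != dbid) {
        printf("SELECT %d\n", dbid);   /* Prepend a SELECT to the stream. */
        stream_seldb = dbid;
    }
    printf("%s\n", cmd);
}

int main(void) {
    feed_command(0, "SET foo bar");    /* SELECT 0, then the command. */
    feed_command(0, "SET baz qux");    /* Same DB: no SELECT emitted. */
    stream_seldb = -1;                 /* e.g. after a role change. */
    feed_command(0, "SET k v");        /* SELECT 0 emitted again. */
    return 0;
}
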
/* This function is called when the slave lose the connection with the
@@ -1931,6 +2133,31 @@ void replicationCacheMaster(client *c) {
replicationHandleMasterDisconnection();
}
+/* This function is called when a master is turned into a slave, in order to
+ * create from scratch a cached master structure that will later allow this
+ * instance to PSYNC with the slave that was promoted to master after a
+ * failover.
+ *
+ * Assuming this instance was previously the master instance of the new master,
+ * the new master will accept its replication ID, and potentially also the
+ * current offset if no data was lost during the failover. So we use our
+ * current replication ID and offset in order to synthesize a cached master. */
+void replicationCacheMasterUsingMyself(void) {
+ /* The master client we create can be set to any DBID, because
+ * the new master will start its replication stream with SELECT. */
+ server.master_initial_offset = server.master_repl_offset;
+ replicationCreateMasterClient(-1,-1);
+
+ /* Use our own ID / offset. */
+ memcpy(server.master->replid, server.replid, sizeof(server.replid));
+
+ /* Set as cached master. */
+ unlinkClient(server.master);
+ server.cached_master = server.master;
+ server.master = NULL;
+ serverLog(LL_NOTICE,"Before turning into a slave, using my master parameters to synthesize a cached master: I may be able to synchronize with the new master with just a partial transfer.");
+}
+
/* Free a cached master, called when there are no longer the conditions for
* a partial resync on reconnection. */
void replicationDiscardCachedMaster(void) {
@@ -2142,6 +2369,11 @@ void waitCommand(client *c) {
long numreplicas, ackreplicas;
long long offset = c->woff;
+ if (server.masterhost) {
+ addReplyError(c,"WAIT cannot be used with slave instances. Please also note that since Redis 4.0 if a slave is configured to be writable (which is not the default) writes to slaves are just local and are not propagated.");
+ return;
+ }
+
/* Argument parsing. */
if (getLongFromObjectOrReply(c,c->argv[1],&numreplicas,NULL) != C_OK)
return;
@@ -2290,7 +2522,9 @@ void replicationCron(void) {
robj *ping_argv[1];
/* First, send PING according to ping_slave_period. */
- if ((replication_cron_loops % server.repl_ping_slave_period) == 0) {
+ if ((replication_cron_loops % server.repl_ping_slave_period) == 0 &&
+ listLength(server.slaves))
+ {
ping_argv[0] = createStringObject("PING",4);
replicationFeedSlaves(server.slaves, server.slaveseldb,
ping_argv, 1);
@@ -2299,20 +2533,30 @@ void replicationCron(void) {
/* Second, send a newline to all the slaves in pre-synchronization
* stage, that is, slaves waiting for the master to create the RDB file.
+ *
+ * Also send a newline to all the chained slaves we have, if we lost
+ * the connection with our master, to keep the slaves aware that their
+ * master is online. This is needed since sub-slaves only receive proxied
+ * data from top-level masters, so there is no explicit pinging in order
+ * to avoid altering the replication offsets. These special out of band
+ * pings (newlines) can be sent safely since they have no effect on the offset.
+ *
* The newline will be ignored by the slave but will refresh the
- * last-io timer preventing a timeout. In this case we ignore the
+ * last interaction timer preventing a timeout. In this case we ignore the
* ping period and refresh the connection once per second since certain
* timeouts are set at a few seconds (example: PSYNC response). */
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
- if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START ||
+ int is_presync =
+ (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START ||
(slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
- server.rdb_child_type != RDB_CHILD_TYPE_SOCKET))
- {
+ server.rdb_child_type != RDB_CHILD_TYPE_SOCKET));
+
+ if (is_presync) {
if (write(slave->fd, "\n", 1) == -1) {
- /* Don't worry, it's just a ping. */
+ /* Don't worry about socket errors, it's just a ping. */
}
}
}
@@ -2337,10 +2581,14 @@ void replicationCron(void) {
}
}
- /* If we have no attached slaves and there is a replication backlog
- * using memory, free it after some (configured) time. */
+ /* If this is a master without attached slaves and there is a replication
+ * backlog active, in order to reclaim memory we can free it after some
+ * (configured) time. Note that this cannot be done for slaves: slaves
+ * without sub-slaves attached should still accumulate data into the
+ * backlog, in order to reply to PSYNC queries if they are turned into
+ * masters after a failover. */
if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit &&
- server.repl_backlog)
+ server.repl_backlog && server.masterhost == NULL)
{
time_t idle = server.unixtime - server.repl_no_slaves_since;
diff --git a/src/sentinel.c b/src/sentinel.c
index 0168aa637..1f47dd337 100644
--- a/src/sentinel.c
+++ b/src/sentinel.c
@@ -3640,15 +3640,15 @@ struct sentinelLeader {
/* Helper function for sentinelGetLeader, increment the counter
* relative to the specified runid. */
int sentinelLeaderIncr(dict *counters, char *runid) {
- dictEntry *de = dictFind(counters,runid);
+ dictEntry *existing, *de;
uint64_t oldval;
- if (de) {
- oldval = dictGetUnsignedIntegerVal(de);
- dictSetUnsignedIntegerVal(de,oldval+1);
+ de = dictAddRaw(counters,runid,&existing);
+ if (existing) {
+ oldval = dictGetUnsignedIntegerVal(existing);
+ dictSetUnsignedIntegerVal(existing,oldval+1);
return oldval+1;
} else {
- de = dictAddRaw(counters,runid);
serverAssert(de != NULL);
dictSetUnsignedIntegerVal(de,1);
return 1;
@@ -3674,7 +3674,7 @@ char *sentinelGetLeader(sentinelRedisInstance *master, uint64_t epoch) {
serverAssert(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS));
counters = dictCreate(&leaderVotesDictType,NULL);
- voters = dictSize(master->sentinels)+1; /* All the other sentinels and me. */
+ voters = dictSize(master->sentinels)+1; /* All the other sentinels and me.*/
/* Count other sentinels votes */
di = dictGetIterator(master->sentinels);
diff --git a/src/server.c b/src/server.c
index e794ad132..f6868832d 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
+ * Copyright (c) 2009-2016, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -219,6 +219,7 @@ struct redisCommand redisCommandTable[] = {
{"msetnx",msetnxCommand,-3,"wm",0,NULL,1,-1,2,0,0},
{"randomkey",randomkeyCommand,1,"rR",0,NULL,0,0,0,0,0},
{"select",selectCommand,2,"lF",0,NULL,0,0,0,0,0},
+ {"swapdb",swapdbCommand,3,"wF",0,NULL,0,0,0,0,0},
{"move",moveCommand,3,"wF",0,NULL,1,1,1,0,0},
{"rename",renameCommand,3,"w",0,NULL,1,2,1,0,0},
{"renamenx",renamenxCommand,3,"wF",0,NULL,1,2,1,0,0},
@@ -274,6 +275,7 @@ struct redisCommand redisCommandTable[] = {
{"readwrite",readwriteCommand,1,"F",0,NULL,0,0,0,0,0},
{"dump",dumpCommand,2,"r",0,NULL,1,1,1,0,0},
{"object",objectCommand,3,"r",0,NULL,2,2,2,0,0},
+ {"memory",memoryCommand,-2,"r",0,NULL,0,0,0,0,0},
{"client",clientCommand,-2,"as",0,NULL,0,0,0,0,0},
{"eval",evalCommand,-3,"s",0,evalGetKeys,0,0,0,0,0},
{"evalsha",evalShaCommand,-3,"s",0,evalGetKeys,0,0,0,0,0},
@@ -868,8 +870,11 @@ void clientsCron(void) {
void databasesCron(void) {
/* Expire keys by random sampling. Not required for slaves
* as master will synthesize DELs for us. */
- if (server.active_expire_enabled && server.masterhost == NULL)
+ if (server.active_expire_enabled && server.masterhost == NULL) {
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
+ } else if (server.masterhost != NULL) {
+ expireSlaveKeys();
+ }
/* Perform hash tables rehashing if needed, but only if there are no
* other processes saving the DB on disk. Otherwise rehashing is bad
@@ -1045,8 +1050,10 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
(int) server.aof_child_pid);
} else if (pid == server.rdb_child_pid) {
backgroundSaveDoneHandler(exitcode,bysignal);
+ if (!bysignal && exitcode == 0) receiveChildInfo();
} else if (pid == server.aof_child_pid) {
backgroundRewriteDoneHandler(exitcode,bysignal);
+ if (!bysignal && exitcode == 0) receiveChildInfo();
} else {
if (!ldbRemoveChild(pid)) {
serverLog(LL_WARNING,
@@ -1055,6 +1062,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
}
}
updateDictResizePolicy();
+ closeChildInfoPipe();
}
} else {
/* If there is not a background saving/rewrite in progress check if
@@ -1074,7 +1082,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
{
serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
sp->changes, (int)sp->seconds);
- rdbSaveBackground(server.rdb_filename);
+ rdbSaveBackground(server.rdb_filename,NULL);
break;
}
}
@@ -1146,7 +1154,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
(server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY ||
server.lastbgsave_status == C_OK))
{
- if (rdbSaveBackground(server.rdb_filename) == C_OK)
+ if (rdbSaveBackground(server.rdb_filename,NULL) == C_OK)
server.rdb_bgsave_scheduled = 0;
}
@@ -1191,6 +1199,10 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
if (listLength(server.clients_waiting_acks))
processClientsWaitingReplicas();
+ /* Check if there are clients unblocked by modules that implement
+ * blocking commands. */
+ moduleHandleBlockedClients();
+
/* Try to process pending commands for clients that were just unblocked. */
if (listLength(server.unblocked_clients))
processUnblockedClients();
@@ -1300,10 +1312,12 @@ void initServerConfig(void) {
int j;
getRandomHexChars(server.runid,CONFIG_RUN_ID_SIZE);
+ server.runid[CONFIG_RUN_ID_SIZE] = '\0';
+ changeReplicationId();
+ clearReplicationId2();
server.configfile = NULL;
server.executable = NULL;
server.hz = CONFIG_DEFAULT_HZ;
- server.runid[CONFIG_RUN_ID_SIZE] = '\0';
server.arch_bits = (sizeof(long) == 8) ? 64 : 32;
server.port = CONFIG_DEFAULT_SERVER_PORT;
server.tcp_backlog = CONFIG_DEFAULT_TCP_BACKLOG;
@@ -1386,6 +1400,7 @@ void initServerConfig(void) {
server.lazyfree_lazy_eviction = CONFIG_DEFAULT_LAZYFREE_LAZY_EVICTION;
server.lazyfree_lazy_expire = CONFIG_DEFAULT_LAZYFREE_LAZY_EXPIRE;
server.lazyfree_lazy_server_del = CONFIG_DEFAULT_LAZYFREE_LAZY_SERVER_DEL;
+ server.always_show_logo = CONFIG_DEFAULT_ALWAYS_SHOW_LOGO;
server.lruclock = getLRUClock();
resetServerSaveParams();
@@ -1400,7 +1415,7 @@ void initServerConfig(void) {
server.masterport = 6379;
server.master = NULL;
server.cached_master = NULL;
- server.repl_master_initial_offset = -1;
+ server.master_initial_offset = -1;
server.repl_state = REPL_STATE_NONE;
server.repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT;
server.repl_serve_stale_data = CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA;
@@ -1561,9 +1576,12 @@ void adjustOpenFilesLimit(void) {
if (bestlimit < oldlimit) bestlimit = oldlimit;
if (bestlimit < maxfiles) {
- int old_maxclients = server.maxclients;
+ unsigned int old_maxclients = server.maxclients;
server.maxclients = bestlimit-CONFIG_MIN_RESERVED_FDS;
- if (server.maxclients < 1) {
+ /* maxclients is unsigned so may overflow: in order
+ * to check if maxclients is now logically less than 1
+ * we test indirectly via bestlimit. */
+ if (bestlimit <= CONFIG_MIN_RESERVED_FDS) {
serverLog(LL_WARNING,"Your current 'ulimit -n' "
"of %llu is not enough for the server to start. "
"Please increase your open file limit to at least "
@@ -1793,6 +1811,9 @@ void initServer(void) {
server.aof_child_pid = -1;
server.rdb_child_type = RDB_CHILD_TYPE_NONE;
server.rdb_bgsave_scheduled = 0;
+ server.child_info_pipe[0] = -1;
+ server.child_info_pipe[1] = -1;
+ server.child_info_data.magic = 0;
aofRewriteBufferReset();
server.aof_buf = sdsempty();
server.lastsave = time(NULL); /* At startup we consider the DB saved. */
@@ -1804,6 +1825,8 @@ void initServer(void) {
/* A few stats we don't want to reset: server startup time, and peak mem. */
server.stat_starttime = time(NULL);
server.stat_peak_memory = 0;
+ server.stat_rdb_cow_bytes = 0;
+ server.stat_aof_cow_bytes = 0;
server.resident_set_size = 0;
server.lastbgsave_status = C_OK;
server.aof_last_write_status = C_OK;
@@ -1859,6 +1882,7 @@ void initServer(void) {
slowlogInit();
latencyMonitorInit();
bioInit();
+ server.initial_memory_usage = zmalloc_used_memory();
}
/* Populates the Redis Command Table starting from the hard coded list
@@ -2456,7 +2480,7 @@ int prepareForShutdown(int flags) {
if ((server.saveparamslen > 0 && !nosave) || save) {
serverLog(LL_NOTICE,"Saving the final RDB snapshot before exiting.");
/* Snapshotting. Perform a SYNC SAVE and exit */
- if (rdbSave(server.rdb_filename) != C_OK) {
+ if (rdbSave(server.rdb_filename,NULL) != C_OK) {
/* Ooops.. error saving! The best we can do is to continue
* operating. Note that if there was a background saving process,
* in the next cron() Redis will be notified that the background
@@ -2810,6 +2834,7 @@ sds genRedisInfoString(char *section) {
size_t total_system_mem = server.system_memory_size;
const char *evict_policy = evictPolicyToString();
long long memory_lua = (long long)lua_gc(server.lua,LUA_GCCOUNT,0)*1024;
+ struct redisMemOverhead *mh = getMemoryOverheadData();
/* Peak memory is updated from time to time by serverCron() so it
* may happen that the instantaneous value is slightly bigger than
@@ -2834,6 +2859,11 @@ sds genRedisInfoString(char *section) {
"used_memory_rss_human:%s\r\n"
"used_memory_peak:%zu\r\n"
"used_memory_peak_human:%s\r\n"
+ "used_memory_peak_perc:%.2f%%\r\n"
+ "used_memory_overhead:%zu\r\n"
+ "used_memory_startup:%zu\r\n"
+ "used_memory_dataset:%zu\r\n"
+ "used_memory_dataset_perc:%.2f%%\r\n"
"total_system_memory:%lu\r\n"
"total_system_memory_human:%s\r\n"
"used_memory_lua:%lld\r\n"
@@ -2850,6 +2880,11 @@ sds genRedisInfoString(char *section) {
used_memory_rss_hmem,
server.stat_peak_memory,
peak_hmem,
+ mh->peak_perc,
+ mh->overhead_total,
+ mh->startup_allocated,
+ mh->dataset,
+ mh->dataset_perc,
(unsigned long)total_system_mem,
total_system_hmem,
memory_lua,
@@ -2857,10 +2892,11 @@ sds genRedisInfoString(char *section) {
server.maxmemory,
maxmemory_hmem,
evict_policy,
- zmalloc_get_fragmentation_ratio(server.resident_set_size),
+ mh->fragmentation,
ZMALLOC_LIB,
lazyfreeGetPendingObjectsCount()
- );
+ );
+ freeMemoryOverheadData(mh);
}
/* Persistence */
@@ -2875,13 +2911,15 @@ sds genRedisInfoString(char *section) {
"rdb_last_bgsave_status:%s\r\n"
"rdb_last_bgsave_time_sec:%jd\r\n"
"rdb_current_bgsave_time_sec:%jd\r\n"
+ "rdb_last_cow_size:%zu\r\n"
"aof_enabled:%d\r\n"
"aof_rewrite_in_progress:%d\r\n"
"aof_rewrite_scheduled:%d\r\n"
"aof_last_rewrite_time_sec:%jd\r\n"
"aof_current_rewrite_time_sec:%jd\r\n"
"aof_last_bgrewrite_status:%s\r\n"
- "aof_last_write_status:%s\r\n",
+ "aof_last_write_status:%s\r\n"
+ "aof_last_cow_size:%zu\r\n",
server.loading,
server.dirty,
server.rdb_child_pid != -1,
@@ -2890,6 +2928,7 @@ sds genRedisInfoString(char *section) {
(intmax_t)server.rdb_save_time_last,
(intmax_t)((server.rdb_child_pid == -1) ?
-1 : time(NULL)-server.rdb_save_time_start),
+ server.stat_rdb_cow_bytes,
server.aof_state != AOF_OFF,
server.aof_child_pid != -1,
server.aof_rewrite_scheduled,
@@ -2897,7 +2936,8 @@ sds genRedisInfoString(char *section) {
(intmax_t)((server.aof_child_pid == -1) ?
-1 : time(NULL)-server.aof_rewrite_time_start),
(server.aof_lastbgrewrite_status == C_OK) ? "ok" : "err",
- (server.aof_last_write_status == C_OK) ? "ok" : "err");
+ (server.aof_last_write_status == C_OK) ? "ok" : "err",
+ server.stat_aof_cow_bytes);
if (server.aof_state != AOF_OFF) {
info = sdscatprintf(info,
@@ -2972,7 +3012,8 @@ sds genRedisInfoString(char *section) {
"pubsub_channels:%ld\r\n"
"pubsub_patterns:%lu\r\n"
"latest_fork_usec:%lld\r\n"
- "migrate_cached_sockets:%ld\r\n",
+ "migrate_cached_sockets:%ld\r\n"
+ "slave_expires_tracked_keys:%zu\r\n",
server.stat_numconnections,
server.stat_numcommands,
getInstantaneousMetric(STATS_METRIC_COMMAND),
@@ -2991,7 +3032,8 @@ sds genRedisInfoString(char *section) {
dictSize(server.pubsub_channels),
listLength(server.pubsub_patterns),
server.stat_fork_time,
- dictSize(server.migrate_cached_sockets));
+ dictSize(server.migrate_cached_sockets),
+ getSlaveKeyWithExpireCount());
}
/* Replication */
@@ -3104,12 +3146,18 @@ sds genRedisInfoString(char *section) {
}
}
info = sdscatprintf(info,
+ "master_replid:%s\r\n"
+ "master_replid2:%s\r\n"
"master_repl_offset:%lld\r\n"
+ "second_repl_offset:%lld\r\n"
"repl_backlog_active:%d\r\n"
"repl_backlog_size:%lld\r\n"
"repl_backlog_first_byte_offset:%lld\r\n"
"repl_backlog_histlen:%lld\r\n",
+ server.replid,
+ server.replid2,
server.master_repl_offset,
+ server.second_replid_offset,
server.repl_backlog != NULL,
server.repl_backlog_size,
server.repl_backlog_off,
@@ -3288,15 +3336,18 @@ void redisAsciiArt(void) {
else if (server.sentinel_mode) mode = "sentinel";
else mode = "standalone";
- if (server.syslog_enabled) {
+ /* Show the ASCII logo if: log file is stdout AND stdout is a
+ * tty AND syslog logging is disabled. Also show logo if the user
+ * forced us to do so via redis.conf. */
+ int show_logo = ((!server.syslog_enabled &&
+ server.logfile[0] == '\0' &&
+ isatty(fileno(stdout))) ||
+ server.always_show_logo);
+
+ if (!show_logo) {
serverLog(LL_NOTICE,
- "Redis %s (%s/%d) %s bit, %s mode, port %d, pid %ld ready to start.",
- REDIS_VERSION,
- redisGitSHA1(),
- strtol(redisGitDirty(),NULL,10) > 0,
- (sizeof(long) == 8) ? "64" : "32",
- mode, server.port,
- (long) getpid()
+ "Running mode=%s, port=%d.",
+ mode, server.port
);
} else {
snprintf(buf,1024*16,ascii_logo,
@@ -3385,9 +3436,20 @@ void loadDataFromDisk(void) {
if (loadAppendOnlyFile(server.aof_filename) == C_OK)
serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
} else {
- if (rdbLoad(server.rdb_filename) == C_OK) {
+ rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
+ if (rdbLoad(server.rdb_filename,&rsi) == C_OK) {
serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds",
(float)(ustime()-start)/1000000);
+
+ /* Restore the replication ID / offset from the RDB file. */
+ if (rsi.repl_id_is_set && rsi.repl_offset != -1) {
+ memcpy(server.replid,rsi.repl_id,sizeof(server.replid));
+ server.master_repl_offset = rsi.repl_offset;
+ /* If we are a slave, create a cached master from this
+ * information, in order to allow partial resynchronizations
+ * with masters. */
+ if (server.masterhost) replicationCacheMasterUsingMyself();
+ }
} else if (errno != ENOENT) {
serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));
exit(1);
@@ -3642,8 +3704,21 @@ int main(int argc, char **argv) {
resetServerSaveParams();
loadServerConfig(configfile,options);
sdsfree(options);
- } else {
+ }
+
+ serverLog(LL_WARNING, "oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo");
+ serverLog(LL_WARNING,
+ "Redis version=%s, bits=%d, commit=%s, modified=%d, pid=%d, just started",
+ REDIS_VERSION,
+ (sizeof(long) == 8) ? 64 : 32,
+ redisGitSHA1(),
+ strtol(redisGitDirty(),NULL,10) > 0,
+ (int)getpid());
+
+ if (argc == 1) {
serverLog(LL_WARNING, "Warning: no config file specified, using the default config. In order to specify a config file use %s /path/to/%s.conf", argv[0], server.sentinel_mode ? "sentinel" : "redis");
+ } else {
+ serverLog(LL_WARNING, "Configuration loaded");
}
server.supervised = redisIsSupervised(server.supervised_mode);
@@ -3658,7 +3733,7 @@ int main(int argc, char **argv) {
if (!server.sentinel_mode) {
/* Things not needed when running in Sentinel mode. */
- serverLog(LL_WARNING,"Server started, Redis version " REDIS_VERSION);
+ serverLog(LL_WARNING,"Server initialized");
#ifdef __linux__
linuxMemoryWarnings();
#endif
@@ -3673,7 +3748,7 @@ int main(int argc, char **argv) {
}
}
if (server.ipfd_count > 0)
- serverLog(LL_NOTICE,"The server is now ready to accept connections on port %d", server.port);
+ serverLog(LL_NOTICE,"Ready to accept connections");
if (server.sofd > 0)
serverLog(LL_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);
} else {
diff --git a/src/server.h b/src/server.h
index a5f0ee1a6..140897c18 100644
--- a/src/server.h
+++ b/src/server.h
@@ -151,6 +151,7 @@ typedef long long mstime_t; /* millisecond time type. */
#define CONFIG_DEFAULT_LAZYFREE_LAZY_EVICTION 0
#define CONFIG_DEFAULT_LAZYFREE_LAZY_EXPIRE 0
#define CONFIG_DEFAULT_LAZYFREE_LAZY_SERVER_DEL 0
+#define CONFIG_DEFAULT_ALWAYS_SHOW_LOGO 0
#define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 20 /* Lookups per loop. */
#define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds */
@@ -245,6 +246,7 @@ typedef long long mstime_t; /* millisecond time type. */
#define BLOCKED_NONE 0 /* Not blocked, no CLIENT_BLOCKED flag set. */
#define BLOCKED_LIST 1 /* BLPOP & co. */
#define BLOCKED_WAIT 2 /* WAIT for synchronous replication. */
+#define BLOCKED_MODULE 3 /* Blocked by a loadable module. */
/* Client request types */
#define PROTO_REQ_INLINE 1
@@ -292,7 +294,8 @@ typedef long long mstime_t; /* millisecond time type. */
/* Slave capabilities. */
#define SLAVE_CAPA_NONE 0
-#define SLAVE_CAPA_EOF (1<<0) /* Can parse the RDB EOF streaming format. */
+#define SLAVE_CAPA_EOF (1<<0) /* Can parse the RDB EOF streaming format. */
+#define SLAVE_CAPA_PSYNC2 (1<<1) /* Supports PSYNC2 protocol. */
/* Synchronous read timeout - slave side */
#define CONFIG_REPL_SYNCIO_TIMEOUT 5
@@ -463,6 +466,7 @@ typedef long long mstime_t; /* millisecond time type. */
struct RedisModule;
struct RedisModuleIO;
struct RedisModuleDigest;
+struct RedisModuleCtx;
struct redisObject;
/* Each module type implementation should export a set of methods in order
@@ -473,6 +477,7 @@ typedef void *(*moduleTypeLoadFunc)(struct RedisModuleIO *io, int encver);
typedef void (*moduleTypeSaveFunc)(struct RedisModuleIO *io, void *value);
typedef void (*moduleTypeRewriteFunc)(struct RedisModuleIO *io, struct redisObject *key, void *value);
typedef void (*moduleTypeDigestFunc)(struct RedisModuleDigest *digest, void *value);
+typedef size_t (*moduleTypeMemUsageFunc)(void *value);
typedef void (*moduleTypeFreeFunc)(void *value);
/* The module type, which is referenced in each value of a given type, defines
@@ -483,6 +488,7 @@ typedef struct RedisModuleType {
moduleTypeLoadFunc rdb_load;
moduleTypeSaveFunc rdb_save;
moduleTypeRewriteFunc aof_rewrite;
+ moduleTypeMemUsageFunc mem_usage;
moduleTypeDigestFunc digest;
moduleTypeFreeFunc free;
char name[10]; /* 9 bytes name + null term. Charset: A-Z a-z 0-9 _- */
@@ -516,6 +522,7 @@ typedef struct RedisModuleIO {
rio *rio; /* Rio stream. */
moduleType *type; /* Module type doing the operation. */
int error; /* True if error condition happened. */
+ struct RedisModuleCtx *ctx; /* Optional context, see RM_GetContextFromIO()*/
} RedisModuleIO;
#define moduleInitIOContext(iovar,mtype,rioptr) do { \
@@ -523,6 +530,7 @@ typedef struct RedisModuleIO {
iovar.type = mtype; \
iovar.bytes = 0; \
iovar.error = 0; \
+ iovar.ctx = NULL; \
} while(0);
/* Objects encoding. Some kind of objects like Strings and Hashes can be
@@ -532,7 +540,7 @@ typedef struct RedisModuleIO {
#define OBJ_ENCODING_INT 1 /* Encoded as integer */
#define OBJ_ENCODING_HT 2 /* Encoded as hash table */
#define OBJ_ENCODING_ZIPMAP 3 /* Encoded as zipmap */
-#define OBJ_ENCODING_LINKEDLIST 4 /* Encoded as regular linked list */
+#define OBJ_ENCODING_LINKEDLIST 4 /* No longer used: old list encoding. */
#define OBJ_ENCODING_ZIPLIST 5 /* Encoded as ziplist */
#define OBJ_ENCODING_INTSET 6 /* Encoded as intset */
#define OBJ_ENCODING_SKIPLIST 7 /* Encoded as skiplist */
@@ -616,6 +624,11 @@ typedef struct blockingState {
/* BLOCKED_WAIT */
int numreplicas; /* Number of replicas we are waiting for ACK. */
long long reploffset; /* Replication offset to reach. */
+
+ /* BLOCKED_MODULE */
+ void *module_blocked_handle; /* RedisModuleBlockedClient structure,
+ which is opaque for the Redis core, only
+ handled in module.c. */
} blockingState;
/* The following structure represents a node in the server.ready_keys list,
@@ -670,8 +683,8 @@ typedef struct client {
long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
copying this slave output buffer
should use. */
- char replrunid[CONFIG_RUN_ID_SIZE+1]; /* Master run id if is a master. */
- int slave_listening_port; /* As configured with: REPLCONF listening-port */
+ char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */
+ int slave_listening_port; /* As configured with: REPLCONF listening-port */
char slave_ip[NET_IP_STR_LEN]; /* Optionally given by REPLCONF ip-address */
int slave_capa; /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */
multiState mstate; /* MULTI/EXEC state */
@@ -769,6 +782,51 @@ typedef struct redisOpArray {
int numops;
} redisOpArray;
+/* This structure is returned by the getMemoryOverheadData() function in
+ * order to return memory overhead information. */
+struct redisMemOverhead {
+ size_t peak_allocated;
+ size_t total_allocated;
+ size_t startup_allocated;
+ size_t repl_backlog;
+ size_t clients_slaves;
+ size_t clients_normal;
+ size_t aof_buffer;
+ size_t overhead_total;
+ size_t dataset;
+ size_t total_keys;
+ size_t bytes_per_key;
+ float dataset_perc;
+ float peak_perc;
+ float fragmentation;
+ size_t num_dbs;
+ struct {
+ size_t dbid;
+ size_t overhead_ht_main;
+ size_t overhead_ht_expires;
+ } *db;
+};
+
+/* This structure can be optionally passed to RDB save/load functions in
+ * order to implement additional functionalities, by storing and loading
+ * metadata to the RDB file.
+ *
+ * Currently the only use is to select a DB at load time, useful in
+ * replication in order to make sure that chained slaves (slaves of slaves)
+ * select the correct DB and are able to accept the stream coming from the
+ * top-level master. */
+typedef struct rdbSaveInfo {
+ /* Used saving and loading. */
+ int repl_stream_db; /* DB to select in server.master client. */
+
+ /* Used only loading. */
+ int repl_id_is_set; /* True if repl_id field is set. */
+ char repl_id[CONFIG_RUN_ID_SIZE+1]; /* Replication ID. */
+ long long repl_offset; /* Replication offset. */
+} rdbSaveInfo;
+
+#define RDB_SAVE_INFO_INIT {-1,0,"000000000000000000000000000000",-1}
+
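
A small caller-side sketch of how this struct is meant to be used, mirroring the startBgsaveForReplication() hunk earlier in this patch: fill in repl_stream_db when there is replication metadata to embed, or pass NULL when there is nothing to record. The trimmed struct and the stub standing in for rdbSaveBackground() are invented for the example.

#include <stdio.h>

/* Trimmed stand-in for rdbSaveInfo: only the save-side field is kept. */
typedef struct {
    int repl_stream_db;     /* DB the master client should select at load. */
    long long repl_offset;  /* Replication offset (load side). */
} rdb_save_info_sketch;

/* Stub standing in for rdbSaveBackground(filename, rsi): NULL simply means
 * "no replication metadata to embed". */
static int rdb_save_background_stub(const char *filename,
                                    rdb_save_info_sketch *rsi) {
    printf("saving %s, repl_stream_db=%d\n",
           filename, rsi ? rsi->repl_stream_db : -1);
    return 0;
}

int main(void) {
    rdb_save_info_sketch rsi = { -1, -1 };      /* Defaults: nothing to select. */
    rsi.repl_stream_db = 5;                     /* e.g. our master's current DB. */
    rdb_save_background_stub("dump.rdb", &rsi); /* Chained-slave BGSAVE. */
    rdb_save_background_stub("dump.rdb", NULL); /* Plain local BGSAVE. */
    return 0;
}
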
/*-----------------------------------------------------------------------------
* Global server state
*----------------------------------------------------------------------------*/
@@ -781,6 +839,10 @@ struct clusterState;
#undef hz
#endif
+#define CHILD_INFO_MAGIC 0xC17DDA7A12345678LL
+#define CHILD_INFO_TYPE_RDB 0
+#define CHILD_INFO_TYPE_AOF 1
+
struct redisServer {
/* General */
pid_t pid; /* Main process pid. */
@@ -801,6 +863,8 @@ struct redisServer {
int cronloops; /* Number of times the cron function run */
char runid[CONFIG_RUN_ID_SIZE+1]; /* ID always different at every exec. */
int sentinel_mode; /* True if this instance is a Sentinel. */
+ size_t initial_memory_usage; /* Bytes used after initialization. */
+ int always_show_logo; /* Show logo even for non-stdout logging. */
/* Modules */
dict *moduleapi; /* Exported APIs dictionary for modules. */
list *loadmodule_queue; /* List of modules to load at startup. */
@@ -858,6 +922,8 @@ struct redisServer {
size_t resident_set_size; /* RSS sampled in serverCron(). */
long long stat_net_input_bytes; /* Bytes read from network. */
long long stat_net_output_bytes; /* Bytes written to network. */
+ size_t stat_rdb_cow_bytes; /* Copy on write bytes during RDB saving. */
+ size_t stat_aof_cow_bytes; /* Copy on write bytes during AOF rewrite. */
/* The following two are used to track instantaneous metrics, like
* number of operations per second, network traffic. */
struct {
@@ -932,6 +998,13 @@ struct redisServer {
int stop_writes_on_bgsave_err; /* Don't allow writes if can't BGSAVE */
int rdb_pipe_write_result_to_parent; /* RDB pipes used to return the state */
int rdb_pipe_read_result_from_child; /* of each slave in diskless SYNC. */
+ /* Pipe and data structures for child -> parent info sharing. */
+ int child_info_pipe[2]; /* Pipe used to write the child_info_data. */
+ struct {
+ int process_type; /* AOF or RDB child? */
+ size_t cow_size; /* Copy on write size. */
+ unsigned long long magic; /* Magic value to make sure data is valid. */
+ } child_info_data;
/* Propagation of commands in AOF / replication */
redisOpArray also_propagate; /* Additional command to propagate. */
/* Logging */
@@ -940,15 +1013,19 @@ struct redisServer {
char *syslog_ident; /* Syslog ident */
int syslog_facility; /* Syslog facility */
/* Replication (master) */
+ char replid[CONFIG_RUN_ID_SIZE+1]; /* My current replication ID. */
+ char replid2[CONFIG_RUN_ID_SIZE+1]; /* replid inherited from master. */
+ long long master_repl_offset; /* My current replication offset */
+ long long second_replid_offset; /* Accept offsets up to this for replid2. */
int slaveseldb; /* Last SELECTed DB in replication output */
- long long master_repl_offset; /* Global replication offset */
int repl_ping_slave_period; /* Master pings the slave every N seconds */
char *repl_backlog; /* Replication backlog for partial syncs */
long long repl_backlog_size; /* Backlog circular buffer size */
long long repl_backlog_histlen; /* Backlog actual data length */
- long long repl_backlog_idx; /* Backlog circular buffer current offset */
- long long repl_backlog_off; /* Replication offset of first byte in the
- backlog buffer. */
+ long long repl_backlog_idx; /* Backlog circular buffer current offset,
+ that is the next byte we'll write to. */
+ long long repl_backlog_off; /* Replication "master offset" of first
+ byte in the replication backlog buffer.*/
time_t repl_backlog_time_limit; /* Time without slaves after the backlog
gets released. */
time_t repl_no_slaves_since; /* We have no slaves since that time.
@@ -981,8 +1058,11 @@ struct redisServer {
int slave_priority; /* Reported in INFO and used by Sentinel. */
int slave_announce_port; /* Give the master this listening port. */
char *slave_announce_ip; /* Give the master this ip address. */
- char repl_master_runid[CONFIG_RUN_ID_SIZE+1]; /* Master run id for PSYNC.*/
- long long repl_master_initial_offset; /* Master PSYNC offset. */
+ /* The following two fields are where we store the master PSYNC replid/offset
+ * while the PSYNC is in progress. At the end we'll copy the fields into
+ * the server->master client structure. */
+ char master_replid[CONFIG_RUN_ID_SIZE+1]; /* Master PSYNC replid. */
+ long long master_initial_offset; /* Master PSYNC offset. */
int repl_slave_lazy_flush; /* Lazy FLUSHALL before loading DB? */
/* Replication script cache. */
dict *repl_scriptcache_dict; /* SHA1 all slaves are aware of. */
@@ -1183,6 +1263,10 @@ void moduleLoadFromQueue(void);
int *moduleGetCommandKeysViaAPI(struct redisCommand *cmd, robj **argv, int argc, int *numkeys);
moduleType *moduleTypeLookupModuleByID(uint64_t id);
void moduleTypeNameByID(char *name, uint64_t moduleid);
+void moduleFreeContext(struct RedisModuleCtx *ctx);
+void unblockClientFromModule(client *c);
+void moduleHandleBlockedClients(void);
+void moduleBlockedClientTimedOut(client *c);
/* Utils */
long long ustime(void);
@@ -1207,6 +1291,7 @@ void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask);
+void addReplyString(client *c, const char *s, size_t len);
void addReplyBulk(client *c, robj *obj);
void addReplyBulkCString(client *c, const char *s);
void addReplyBulkCBuffer(client *c, const void *p, size_t len);
@@ -1221,6 +1306,8 @@ void addReplyHumanLongDouble(client *c, long double d);
void addReplyLongLong(client *c, long long ll);
void addReplyMultiBulkLen(client *c, long length);
void copyClientOutputBuffer(client *dst, client *src);
+size_t sdsZmallocSize(sds s);
+size_t getStringObjectSdsUsedMemory(robj *o);
void *dupClientReplyValue(void *o);
void getClientsMaxBuffers(unsigned long *longest_output_list,
unsigned long *biggest_input_buffer);
@@ -1339,6 +1426,7 @@ ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout);
/* Replication */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
+void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t buflen);
void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc);
void updateSlavesWaitingBgsave(int bgsaveerr, int type);
void replicationCron(void);
@@ -1360,6 +1448,10 @@ long long replicationGetSlaveOffset(void);
char *replicationGetSlaveName(client *c);
long long getPsyncInitialOffset(void);
int replicationSetupSlaveForFullResync(client *slave, long long offset);
+void changeReplicationId(void);
+void clearReplicationId2(void);
+void chopReplicationBacklog(void);
+void replicationCacheMasterUsingMyself(void);
/* Generic persistence functions */
void startLoading(FILE *fp);
@@ -1368,7 +1460,7 @@ void stopLoading(void);
/* RDB persistence */
#include "rdb.h"
-int rdbSaveRio(rio *rdb, int *error, int flags);
+int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi);
/* AOF persistence */
void flushAppendOnlyFile(int force);
@@ -1383,6 +1475,12 @@ void aofRewriteBufferReset(void);
unsigned long aofRewriteBufferSize(void);
ssize_t aofReadDiffFromParent(void);
+/* Child info */
+void openChildInfoPipe(void);
+void closeChildInfoPipe(void);
+void sendChildInfo(int process_type);
+void receiveChildInfo(void);
+
/* Sorted sets data type */
/* Input flags. */
@@ -1480,6 +1578,8 @@ void updateCachedTime(void);
void resetServerStats(void);
unsigned int getLRUClock(void);
const char *evictPolicyToString(void);
+struct redisMemOverhead *getMemoryOverheadData(void);
+void freeMemoryOverheadData(struct redisMemOverhead *mh);
#define RESTART_SERVER_NONE 0
#define RESTART_SERVER_GRACEFULLY (1<<0) /* Do proper shutdown. */
@@ -1550,13 +1650,15 @@ int removeExpire(redisDb *db, robj *key);
void propagateExpire(redisDb *db, robj *key, int lazy);
int expireIfNeeded(redisDb *db, robj *key);
long long getExpire(redisDb *db, robj *key);
-void setExpire(redisDb *db, robj *key, long long when);
+void setExpire(client *c, redisDb *db, robj *key, long long when);
robj *lookupKey(redisDb *db, robj *key, int flags);
robj *lookupKeyRead(redisDb *db, robj *key);
robj *lookupKeyWrite(redisDb *db, robj *key);
robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply);
robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply);
robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags);
+robj *objectCommandLookup(client *c, robj *key);
+robj *objectCommandLookupOrReply(client *c, robj *key, robj *reply);
#define LOOKUP_NONE 0
#define LOOKUP_NOTOUCH (1<<0)
void dbAdd(redisDb *db, robj *key, robj *val);
@@ -1633,6 +1735,10 @@ void disconnectAllBlockedClients(void);
/* expire.c -- Handling of expired keys */
void activeExpireCycle(int type);
+void expireSlaveKeys(void);
+void rememberSlaveKeyWithExpire(redisDb *db, robj *key);
+void flushSlaveKeysWithExpireList(void);
+size_t getSlaveKeyWithExpireCount(void);
/* evict.c -- maxmemory handling and LRU eviction. */
void evictionPoolAlloc(void);
@@ -1640,6 +1746,11 @@ void evictionPoolAlloc(void);
unsigned long LFUGetTimeInMinutes(void);
uint8_t LFULogIncr(uint8_t value);
+/* Keys hashing / comparison functions for dict.c hash tables. */
+unsigned int dictSdsHash(const void *key);
+int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2);
+void dictSdsDestructor(void *privdata, void *val);
+
/* Git SHA1 */
char *redisGitSHA1(void);
char *redisGitDirty(void);
@@ -1669,6 +1780,7 @@ void incrbyCommand(client *c);
void decrbyCommand(client *c);
void incrbyfloatCommand(client *c);
void selectCommand(client *c);
+void swapdbCommand(client *c);
void randomkeyCommand(client *c);
void keysCommand(client *c);
void scanCommand(client *c);
@@ -1792,6 +1904,7 @@ void readonlyCommand(client *c);
void readwriteCommand(client *c);
void dumpCommand(client *c);
void objectCommand(client *c);
+void memoryCommand(client *c);
void clientCommand(client *c);
void evalCommand(client *c);
void evalShaCommand(client *c);
diff --git a/src/t_set.c b/src/t_set.c
index ddd82b8b0..d5a801e11 100644
--- a/src/t_set.c
+++ b/src/t_set.c
@@ -53,7 +53,7 @@ int setTypeAdd(robj *subject, sds value) {
long long llval;
if (subject->encoding == OBJ_ENCODING_HT) {
dict *ht = subject->ptr;
- dictEntry *de = dictAddRaw(ht,value);
+ dictEntry *de = dictAddRaw(ht,value,NULL);
if (de) {
dictSetKey(ht,de,sdsdup(value));
dictSetVal(ht,de,NULL);
diff --git a/src/t_string.c b/src/t_string.c
index 8c737c4e3..75375f446 100644
--- a/src/t_string.c
+++ b/src/t_string.c
@@ -85,7 +85,7 @@ void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire,
}
setKey(c->db,key,val);
server.dirty++;
- if (expire) setExpire(c->db,key,mstime()+milliseconds);
+ if (expire) setExpire(c,c->db,key,mstime()+milliseconds);
notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);
if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC,
"expire",key,c->db->id);
diff --git a/src/t_zset.c b/src/t_zset.c
index c61ba8089..8d905be02 100644
--- a/src/t_zset.c
+++ b/src/t_zset.c
@@ -1387,7 +1387,7 @@ int zsetDel(robj *zobj, sds ele) {
dictEntry *de;
double score;
- de = dictFind(zs->dict,ele);
+ de = dictUnlink(zs->dict,ele);
if (de != NULL) {
/* Get the score in order to delete from the skiplist later. */
score = *(double*)dictGetVal(de);
@@ -1397,12 +1397,11 @@ int zsetDel(robj *zobj, sds ele) {
* actually releases the SDS string representing the element,
* which is shared between the skiplist and the hash table, so
* we need to delete from the skiplist as the final step. */
- int retval1 = dictDelete(zs->dict,ele);
+ dictFreeUnlinkedEntry(zs->dict,de);
/* Delete from skiplist. */
- int retval2 = zslDelete(zs->zsl,score,ele,NULL);
-
- serverAssert(retval1 == DICT_OK && retval2);
+ int retval = zslDelete(zs->zsl,score,ele,NULL);
+ serverAssert(retval);
if (htNeedsResize(zs->dict)) dictResize(zs->dict);
return 1;
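
The comment above is the whole point of the new dictUnlink()/dictFreeUnlinkedEntry() pair: the entry is detached from the hash table first, its value (and a key possibly shared with other structures) can still be inspected, and only then is it released. A minimal sketch of that usage pattern, mirroring the zset case, assuming compilation inside the Redis source tree; delete_after_inspection is an illustrative name, not a Redis function.

#include "dict.h"

/* Remove 'key' from 'd', but read its (double) value before the entry
 * is actually released. Returns 1 if the key existed, 0 otherwise. */
int delete_after_inspection(dict *d, void *key, double *score_out) {
    dictEntry *de = dictUnlink(d, key);     /* detached, not yet freed */
    if (de == NULL) return 0;
    *score_out = *(double*)dictGetVal(de);  /* still safe to look at */
    dictFreeUnlinkedEntry(d, de);           /* now release the entry */
    return 1;
}
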
@@ -2262,7 +2261,7 @@ void zunionInterGenericCommand(client *c, robj *dstkey, int op) {
} else if (op == SET_OP_UNION) {
dict *accumulator = dictCreate(&setAccumulatorDictType,NULL);
dictIterator *di;
- dictEntry *de;
+ dictEntry *de, *existing;
double score;
if (setnum) {
@@ -2283,16 +2282,16 @@ void zunionInterGenericCommand(client *c, robj *dstkey, int op) {
if (isnan(score)) score = 0;
/* Search for this element in the accumulating dictionary. */
- de = dictFind(accumulator,zuiSdsFromValue(&zval));
+ de = dictAddRaw(accumulator,zuiSdsFromValue(&zval),&existing);
/* If we don't have it, we need to create a new entry. */
- if (de == NULL) {
+ if (!existing) {
tmp = zuiNewSdsFromValue(&zval);
/* Remember the longest single element encountered,
* to understand if it's possible to convert to ziplist
* at the end. */
if (sdslen(tmp) > maxelelen) maxelelen = sdslen(tmp);
- /* Add the element with its initial score. */
- de = dictAddRaw(accumulator,tmp);
+ /* Update the element with its initial score. */
+ dictSetKey(accumulator, de, tmp);
dictSetDoubleVal(de,score);
} else {
/* Update the score with the score of the new instance
@@ -2301,7 +2300,7 @@ void zunionInterGenericCommand(client *c, robj *dstkey, int op) {
* Here we access directly the dictEntry double
* value inside the union as it is a big speedup
* compared to using the getDouble/setDouble API. */
- zunionInterAggregate(&de->v.d,score,aggregate);
+ zunionInterAggregate(&existing->v.d,score,aggregate);
}
}
zuiClearIterator(&src[i]);
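
The accumulator change above relies on the new three-argument dictAddRaw(): a single hashing pass either creates a fresh entry (returned directly) or reports the existing one through the third pointer, replacing the previous dictFind()-then-dictAddRaw() double lookup. A small sketch of that add-or-update pattern, assuming a dict type that, like setAccumulatorDictType, does not duplicate or free keys on its own; upsert_score is an illustrative helper, not a Redis function.

#include "dict.h"
#include "sds.h"

void upsert_score(dict *accumulator, sds ele, double score) {
    dictEntry *existing;
    dictEntry *de = dictAddRaw(accumulator, ele, &existing);
    if (de) {
        /* Not found: 'de' is a brand new entry, set key and score. */
        dictSetKey(accumulator, de, sdsdup(ele));
        dictSetDoubleVal(de, score);
    } else {
        /* Found: update the existing entry in place. */
        existing->v.d += score;
    }
}
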
diff --git a/src/ziplist.c b/src/ziplist.c
index 7428d30e9..a0939f640 100644
--- a/src/ziplist.c
+++ b/src/ziplist.c
@@ -8,24 +8,31 @@
*
* ----------------------------------------------------------------------------
*
- * ZIPLIST OVERALL LAYOUT:
+ * ZIPLIST OVERALL LAYOUT
+ *
* The general layout of the ziplist is as follows:
- * <zlbytes><zltail><zllen><entry><entry><zlend>
*
- * <zlbytes> is an unsigned integer to hold the number of bytes that the
- * ziplist occupies. This value needs to be stored to be able to resize the
+ * <zlbytes> <zltail> <zllen> <entry> <entry> ... <entry> <zlend>
+ *
+ * All fields are stored in little endian.
+ *
+ * <uint32_t zlbytes> is an unsigned integer to hold the number of bytes that
+ * the ziplist occupies. This value needs to be stored to be able to resize the
* entire structure without the need to traverse it first.
*
- * <zltail> is the offset to the last entry in the list. This allows a pop
- * operation on the far side of the list without the need for full traversal.
+ * <uint32_t zltail> is the offset to the last entry in the list. This allows
+ * a pop operation on the far side of the list without the need for full
+ * traversal.
+ *
+ * <uint16_t zllen> is the number of entries. When this value is larger
+ * than 2^16-2, we need to traverse the entire list to know how many items it
+ * holds.
*
- * <zllen> is the number of entries.When this value is larger than 2**16-2,
- * we need to traverse the entire list to know how many items it holds.
+ * <uint8_t zlend> is a single byte special value, equal to 255, which
+ * indicates the end of the list.
*
- * <zlend> is a single byte special value, equal to 255, which indicates the
- * end of the list.
+ * ZIPLIST ENTRIES
*
- * ZIPLIST ENTRIES:
* Every entry in the ziplist is prefixed by a header that contains two pieces
* of information. First, the length of the previous entry is stored to be
* able to traverse the list from back to front. Second, the encoding with an
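
Since the header fields described above are little endian and fixed width, the three values can be read directly from the first 10 bytes of the blob. A small endian-safe sketch follows; read_u32/read_u16 and ziplist_header are illustrative helpers, not the macros ziplist.c uses.

#include <stdint.h>

static uint32_t read_u32(const unsigned char *p) {
    return (uint32_t)p[0] | ((uint32_t)p[1] << 8) |
           ((uint32_t)p[2] << 16) | ((uint32_t)p[3] << 24);
}
static uint16_t read_u16(const unsigned char *p) {
    return (uint16_t)p[0] | ((uint16_t)p[1] << 8);
}

/* <zlbytes> <zltail> <zllen> <entry> ... <zlend=255> */
void ziplist_header(const unsigned char *zl,
                    uint32_t *zlbytes, uint32_t *zltail, uint16_t *zllen) {
    *zlbytes = read_u32(zl);        /* total size of the blob in bytes */
    *zltail  = read_u32(zl + 4);    /* offset of the last entry */
    *zllen   = read_u16(zl + 8);    /* entry count; 2^16-1 means "count
                                       by traversing the list" */
}
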
@@ -242,7 +249,7 @@ static unsigned int zipEncodeLength(unsigned char *p, unsigned char encoding, un
} else if ((encoding) == ZIP_STR_14B) { \
(lensize) = 2; \
(len) = (((ptr)[0] & 0x3f) << 8) | (ptr)[1]; \
- } else if (encoding == ZIP_STR_32B) { \
+ } else if ((encoding) == ZIP_STR_32B) { \
(lensize) = 5; \
(len) = ((ptr)[1] << 24) | \
((ptr)[2] << 16) | \
@@ -1029,7 +1036,7 @@ void ziplistRepr(unsigned char *zl) {
printf(
"{total bytes %d} "
- "{length %u}\n"
+ "{num entries %u}\n"
"{tail offset %u}\n",
intrev32ifbe(ZIPLIST_BYTES(zl)),
intrev16ifbe(ZIPLIST_LENGTH(zl)),
@@ -1038,16 +1045,15 @@ void ziplistRepr(unsigned char *zl) {
while(*p != ZIP_END) {
zipEntry(p, &entry);
printf(
- "{"
- "addr 0x%08lx, "
- "index %2d, "
- "offset %5ld, "
- "rl: %5u, "
- "hs %2u, "
- "pl: %5u, "
- "pls: %2u, "
- "payload %5u"
- "} ",
+ "{\n"
+ "\taddr 0x%08lx,\n"
+ "\tindex %2d,\n"
+ "\toffset %5ld,\n"
+ "\thdr+entry len: %5u,\n"
+            "\thdr len: %2u,\n"
+ "\tprevrawlen: %5u,\n"
+ "\tprevrawlensize: %2u,\n"
+ "\tpayload %5u\n",
(long unsigned)p,
index,
(unsigned long) (p-zl),
@@ -1056,8 +1062,14 @@ void ziplistRepr(unsigned char *zl) {
entry.prevrawlen,
entry.prevrawlensize,
entry.len);
+ printf("\tbytes: ");
+ for (unsigned int i = 0; i < entry.headersize+entry.len; i++) {
+ printf("%02x|",p[i]);
+ }
+ printf("\n");
p += entry.headersize;
if (ZIP_IS_STR(entry.encoding)) {
+ printf("\t[str]");
if (entry.len > 40) {
if (fwrite(p,40,1,stdout) == 0) perror("fwrite");
printf("...");
@@ -1066,9 +1078,9 @@ void ziplistRepr(unsigned char *zl) {
fwrite(p,entry.len,1,stdout) == 0) perror("fwrite");
}
} else {
- printf("%lld", (long long) zipLoadInteger(p,entry.encoding));
+ printf("\t[int]%lld", (long long) zipLoadInteger(p,entry.encoding));
}
- printf("\n");
+ printf("\n}\n");
p += entry.len;
index++;
}
diff --git a/src/ziplist.h b/src/ziplist.h
index ae96823f9..964a47f6d 100644
--- a/src/ziplist.h
+++ b/src/ziplist.h
@@ -48,6 +48,7 @@ unsigned int ziplistCompare(unsigned char *p, unsigned char *s, unsigned int sle
unsigned char *ziplistFind(unsigned char *p, unsigned char *vstr, unsigned int vlen, unsigned int skip);
unsigned int ziplistLen(unsigned char *zl);
size_t ziplistBlobLen(unsigned char *zl);
+void ziplistRepr(unsigned char *zl);
#ifdef REDIS_TEST
int ziplistTest(int argc, char *argv[]);
diff --git a/src/zmalloc.c b/src/zmalloc.c
index ab4af99e2..22bf84fce 100644
--- a/src/zmalloc.c
+++ b/src/zmalloc.c
@@ -304,14 +304,26 @@ float zmalloc_get_fragmentation_ratio(size_t rss) {
* /proc/self/smaps. The field must be specified with trailing ":" as it
 * appears in the smaps output.
*
- * Example: zmalloc_get_smap_bytes_by_field("Rss:");
+ * If a pid is specified, the information is extracted for that pid,
+ * otherwise if pid is -1 the information reported is about the
+ * current process.
+ *
+ * Example: zmalloc_get_smap_bytes_by_field("Rss:",-1);
*/
#if defined(HAVE_PROC_SMAPS)
-size_t zmalloc_get_smap_bytes_by_field(char *field) {
+size_t zmalloc_get_smap_bytes_by_field(char *field, long pid) {
char line[1024];
size_t bytes = 0;
- FILE *fp = fopen("/proc/self/smaps","r");
int flen = strlen(field);
+ FILE *fp;
+
+ if (pid == -1) {
+ fp = fopen("/proc/self/smaps","r");
+ } else {
+ char filename[128];
+ snprintf(filename,sizeof(filename),"/proc/%ld/smaps",pid);
+ fp = fopen(filename,"r");
+ }
if (!fp) return 0;
while(fgets(line,sizeof(line),fp) != NULL) {
@@ -327,14 +339,15 @@ size_t zmalloc_get_smap_bytes_by_field(char *field) {
return bytes;
}
#else
-size_t zmalloc_get_smap_bytes_by_field(char *field) {
+size_t zmalloc_get_smap_bytes_by_field(char *field, long pid) {
((void) field);
+ ((void) pid);
return 0;
}
#endif
-size_t zmalloc_get_private_dirty(void) {
- return zmalloc_get_smap_bytes_by_field("Private_Dirty:");
+size_t zmalloc_get_private_dirty(long pid) {
+ return zmalloc_get_smap_bytes_by_field("Private_Dirty:",pid);
}
/* Returns the size of physical memory (RAM) in bytes.
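
The new pid parameter makes it possible to sample another process' Private_Dirty pages, a reasonable proxy for the copy-on-write cost of a forked child. A small hedged sketch of that use, assuming zmalloc.h from this tree and a Linux /proc filesystem; report_child_cow is an illustrative helper, not how Redis itself consumes the value.

#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include "zmalloc.h"

void report_child_cow(pid_t child_pid) {
    /* Reads Private_Dirty from /proc/<pid>/smaps; returns 0 when the
     * information is not available on this platform. */
    size_t cow = zmalloc_get_private_dirty((long)child_pid);
    printf("child %ld copy-on-write: %zu bytes\n", (long)child_pid, cow);
}
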
diff --git a/src/zmalloc.h b/src/zmalloc.h
index a47ea6ccf..9badf8f4c 100644
--- a/src/zmalloc.h
+++ b/src/zmalloc.h
@@ -75,8 +75,8 @@ void zmalloc_enable_thread_safeness(void);
void zmalloc_set_oom_handler(void (*oom_handler)(size_t));
float zmalloc_get_fragmentation_ratio(size_t rss);
size_t zmalloc_get_rss(void);
-size_t zmalloc_get_private_dirty(void);
-size_t zmalloc_get_smap_bytes_by_field(char *field);
+size_t zmalloc_get_private_dirty(long pid);
+size_t zmalloc_get_smap_bytes_by_field(char *field, long pid);
size_t zmalloc_get_memory_size(void);
void zlibc_free(void *ptr);
diff --git a/tests/assets/default.conf b/tests/assets/default.conf
index 81f8470bc..d7b8a75c6 100644
--- a/tests/assets/default.conf
+++ b/tests/assets/default.conf
@@ -1,5 +1,6 @@
# Redis configuration for testing.
+always-show-logo yes
notify-keyspace-events KEA
daemonize no
pidfile /var/run/redis.pid
diff --git a/tests/integration/psync2.tcl b/tests/integration/psync2.tcl
new file mode 100644
index 000000000..d91969e3e
--- /dev/null
+++ b/tests/integration/psync2.tcl
@@ -0,0 +1,182 @@
+start_server {tags {"psync2"}} {
+start_server {} {
+start_server {} {
+start_server {} {
+start_server {} {
+ set master_id 0 ; # Current master
+ set start_time [clock seconds] ; # Test start time
+ set counter_value 0 ; # Current value of the Redis counter "x"
+
+ # Config
+ set debug_msg 0 ; # Enable additional debug messages
+
+    set no_exit 0 ; # Do not exit at end of the test
+
+ set duration 20 ; # Total test seconds
+
+ set genload 1 ; # Load master with writes at every cycle
+
+ set genload_time 5000 ; # Writes duration time in ms
+
+ set disconnect 1 ; # Break replication link between random
+ # master and slave instances while the
+ # master is loaded with writes.
+
+ set disconnect_period 1000 ; # Disconnect repl link every N ms.
+
+ for {set j 0} {$j < 5} {incr j} {
+ set R($j) [srv [expr 0-$j] client]
+ set R_host($j) [srv [expr 0-$j] host]
+ set R_port($j) [srv [expr 0-$j] port]
+ if {$debug_msg} {puts "Log file: [srv [expr 0-$j] stdout]"}
+ }
+
+ set cycle 1
+ while {([clock seconds]-$start_time) < $duration} {
+ test "PSYNC2: --- CYCLE $cycle ---" {
+ incr cycle
+ }
+
+ # Create a random replication layout.
+ # Start with switching master (this simulates a failover).
+
+ # 1) Select the new master.
+ set master_id [randomInt 5]
+ set used [list $master_id]
+ test "PSYNC2: \[NEW LAYOUT\] Set #$master_id as master" {
+ $R($master_id) slaveof no one
+ if {$counter_value == 0} {
+ $R($master_id) set x $counter_value
+ }
+ }
+
+ # 2) Attach all the slaves to a random instance
+ while {[llength $used] != 5} {
+ while 1 {
+ set slave_id [randomInt 5]
+ if {[lsearch -exact $used $slave_id] == -1} break
+ }
+ set rand [randomInt [llength $used]]
+ set mid [lindex $used $rand]
+ set master_host $R_host($mid)
+ set master_port $R_port($mid)
+
+ test "PSYNC2: Set #$slave_id to replicate from #$mid" {
+ $R($slave_id) slaveof $master_host $master_port
+ }
+ lappend used $slave_id
+ }
+
+ # 3) Increment the counter and wait for all the instances
+ # to converge.
+ test "PSYNC2: cluster is consistent after failover" {
+ $R($master_id) incr x; incr counter_value
+ for {set j 0} {$j < 5} {incr j} {
+ wait_for_condition 50 1000 {
+ [$R($j) get x] == $counter_value
+ } else {
+ fail "Instance #$j x variable is inconsistent"
+ }
+ }
+ }
+
+ # 4) Generate load while breaking the connection of random
+ # slave-master pairs.
+ test "PSYNC2: generate load while killing replication links" {
+ set t [clock milliseconds]
+ set next_break [expr {$t+$disconnect_period}]
+ while {[clock milliseconds]-$t < $genload_time} {
+ if {$genload} {
+ $R($master_id) incr x; incr counter_value
+ }
+            if {[clock milliseconds] >= $next_break} {
+ set next_break \
+ [expr {[clock milliseconds]+$disconnect_period}]
+ set slave_id [randomInt 5]
+ if {$disconnect} {
+ $R($slave_id) client kill type master
+ if {$debug_msg} {
+ puts "+++ Breaking link for slave #$slave_id"
+ }
+ }
+ }
+ }
+ }
+
+    # 5) Check that all the instances converge after the load.
+ set x [$R($master_id) get x]
+ test "PSYNC2: cluster is consistent after load (x = $x)" {
+ for {set j 0} {$j < 5} {incr j} {
+ wait_for_condition 50 1000 {
+ [$R($j) get x] == $counter_value
+ } else {
+ fail "Instance #$j x variable is inconsistent"
+ }
+ }
+ }
+
+    # Put down the old master so that it cannot generate more
+    # replication stream. This way, in the next master switch, the time
+    # at which we move the slaves away does not matter: every slave will
+    # have the full history (otherwise PINGs would give some slaves more
+    # history, and sometimes a full resync would be needed).
+ $R($master_id) slaveof 127.0.0.1 0 ;# We use port zero to make it fail.
+
+ if {$debug_msg} {
+ for {set j 0} {$j < 5} {incr j} {
+ puts "$j: sync_full: [status $R($j) sync_full]"
+ puts "$j: id1 : [status $R($j) master_replid]:[status $R($j) master_repl_offset]"
+ puts "$j: id2 : [status $R($j) master_replid2]:[status $R($j) second_repl_offset]"
+ puts "$j: backlog : firstbyte=[status $R($j) repl_backlog_first_byte_offset] len=[status $R($j) repl_backlog_histlen]"
+ puts "---"
+ }
+ }
+
+ test "PSYNC2: total sum of full synchronizations is exactly 4" {
+ set sum 0
+ for {set j 0} {$j < 5} {incr j} {
+ incr sum [status $R($j) sync_full]
+ }
+ assert {$sum == 4}
+ }
+ }
+
+ test "PSYNC2: Bring the master back again for next test" {
+ $R($master_id) slaveof no one
+ set master_host $R_host($master_id)
+ set master_port $R_port($master_id)
+ for {set j 0} {$j < 5} {incr j} {
+ if {$j == $master_id} continue
+ $R($j) slaveof $master_host $master_port
+ }
+
+ # Wait for slaves to sync
+ wait_for_condition 50 1000 {
+ [status $R($master_id) connected_slaves] == 4
+ } else {
+ fail "Slave not reconnecting"
+ }
+ }
+
+ test "PSYNC2: Partial resync after restart using RDB aux fields" {
+ # Pick a random slave
+ set slave_id [expr {($master_id+1)%5}]
+ set sync_count [status $R($master_id) sync_full]
+ catch {
+ $R($slave_id) config rewrite
+ $R($slave_id) debug restart
+ }
+ wait_for_condition 50 1000 {
+ [status $R($master_id) connected_slaves] == 4
+ } else {
+ fail "Slave not reconnecting"
+ }
+ set new_sync_count [status $R($master_id) sync_full]
+ assert {$sync_count == $new_sync_count}
+ }
+
+ if {$no_exit} {
+ while 1 { puts -nonewline .; flush stdout; after 1000}
+ }
+
+}}}}}
diff --git a/tests/integration/replication-3.tcl b/tests/integration/replication-3.tcl
index 0fcbad45b..50dcb9a9a 100644
--- a/tests/integration/replication-3.tcl
+++ b/tests/integration/replication-3.tcl
@@ -30,6 +30,18 @@ start_server {tags {"repl"}} {
}
assert_equal [r debug digest] [r -1 debug digest]
}
+
+ test {Slave is able to evict keys created in writable slaves} {
+ r -1 select 5
+ assert {[r -1 dbsize] == 0}
+ r -1 config set slave-read-only no
+ r -1 set key1 1 ex 5
+ r -1 set key2 2 ex 5
+ r -1 set key3 3 ex 5
+ assert {[r -1 dbsize] == 3}
+ after 6000
+ r -1 dbsize
+ } {0}
}
}
diff --git a/tests/support/server.tcl b/tests/support/server.tcl
index 19d6c5152..c36b30775 100644
--- a/tests/support/server.tcl
+++ b/tests/support/server.tcl
@@ -278,7 +278,7 @@ proc start_server {options {code undefined}} {
while 1 {
# check that the server actually started and is ready for connections
- if {[exec grep "ready to accept" | wc -l < $stdout] > 0} {
+ if {[exec grep -i "Ready to accept" | wc -l < $stdout] > 0} {
break
}
after 10
diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl
index 5f114c5dc..fdfe6a01b 100644
--- a/tests/test_helper.tcl
+++ b/tests/test_helper.tcl
@@ -41,6 +41,7 @@ set ::all_tests {
integration/rdb
integration/convert-zipmap-hash-on-load
integration/logging
+ integration/psync2
unit/pubsub
unit/slowlog
unit/scripting
@@ -55,6 +56,7 @@ set ::all_tests {
unit/memefficiency
unit/hyperloglog
unit/lazyfree
+ unit/wait
}
# Index to the next test to run in the ::all_tests list.
set ::next_test 0
diff --git a/tests/unit/bitfield.tcl b/tests/unit/bitfield.tcl
index 26e47db0f..d76452b1b 100644
--- a/tests/unit/bitfield.tcl
+++ b/tests/unit/bitfield.tcl
@@ -189,4 +189,13 @@ start_server {tags {"bitops"}} {
r set bits 1
r bitfield bits get u1 0
} {0}
+
+ test {BITFIELD regression for #3564} {
+ for {set j 0} {$j < 10} {incr j} {
+ r del mystring
+ set res [r BITFIELD mystring SET i8 0 10 SET i8 64 10 INCRBY i8 10 99900]
+ assert {$res eq {0 0 60}}
+ }
+ r del mystring
+ }
}
diff --git a/tests/unit/geo.tcl b/tests/unit/geo.tcl
index a08726d2e..fdbfbf139 100644
--- a/tests/unit/geo.tcl
+++ b/tests/unit/geo.tcl
@@ -221,18 +221,26 @@ start_server {tags {"geo"}} {
}
test {GEOADD + GEORANGE randomized test} {
- set attempt 20
+ set attempt 30
while {[incr attempt -1]} {
set rv [lindex $regression_vectors $rv_idx]
incr rv_idx
unset -nocomplain debuginfo
- set srand_seed [randomInt 1000000]
+ set srand_seed [clock milliseconds]
if {$rv ne {}} {set srand_seed [lindex $rv 0]}
lappend debuginfo "srand_seed is $srand_seed"
expr {srand($srand_seed)} ; # If you need a reproducible run
r del mypoints
- set radius_km [expr {[randomInt 200]+10}]
+
+ if {[randomInt 10] == 0} {
+ # From time to time use very big radiuses
+ set radius_km [expr {[randomInt 50000]+10}]
+ } else {
+                # Normally use radiuses in the ~10-200 km range to
+                # stress test the code the most in edge cases.
+ set radius_km [expr {[randomInt 200]+10}]
+ }
if {$rv ne {}} {set radius_km [lindex $rv 1]}
set radius_m [expr {$radius_km*1000}]
geo_random_point search_lon search_lat
@@ -246,10 +254,11 @@ start_server {tags {"geo"}} {
for {set j 0} {$j < 20000} {incr j} {
geo_random_point lon lat
lappend argv $lon $lat "place:$j"
- if {[geo_distance $lon $lat $search_lon $search_lat] < $radius_m} {
+ set distance [geo_distance $lon $lat $search_lon $search_lat]
+ if {$distance < $radius_m} {
lappend tcl_result "place:$j"
- lappend debuginfo "place:$j $lon $lat [expr {[geo_distance $lon $lat $search_lon $search_lat]/1000}] km"
}
+ lappend debuginfo "place:$j $lon $lat [expr {$distance/1000}] km"
}
r geoadd mypoints {*}$argv
set res [lsort [r georadius mypoints $search_lon $search_lat $radius_km km]]
diff --git a/tests/unit/other.tcl b/tests/unit/other.tcl
index 2f5773930..1d21b561a 100644
--- a/tests/unit/other.tcl
+++ b/tests/unit/other.tcl
@@ -52,7 +52,7 @@ start_server {tags {"other"}} {
test {SELECT an out of range DB} {
catch {r select 1000000} err
set _ $err
- } {*invalid*}
+ } {*index is out of range*}
tags {consistency} {
if {![catch {package require sha1}]} {
diff --git a/tests/unit/wait.tcl b/tests/unit/wait.tcl
new file mode 100644
index 000000000..e2f5d2942
--- /dev/null
+++ b/tests/unit/wait.tcl
@@ -0,0 +1,42 @@
+start_server {tags {"wait"}} {
+start_server {} {
+ set slave [srv 0 client]
+ set slave_host [srv 0 host]
+ set slave_port [srv 0 port]
+ set master [srv -1 client]
+ set master_host [srv -1 host]
+ set master_port [srv -1 port]
+
+ test {Setup slave} {
+ $slave slaveof $master_host $master_port
+ wait_for_condition 50 100 {
+ [s 0 master_link_status] eq {up}
+ } else {
+ fail "Replication not started."
+ }
+ }
+
+ test {WAIT should acknowledge 1 additional copy of the data} {
+ $master set foo 0
+ $master incr foo
+ $master incr foo
+ $master incr foo
+ assert {[$master wait 1 5000] == 1}
+ assert {[$slave get foo] == 3}
+ }
+
+ test {WAIT should not acknowledge 2 additional copies of the data} {
+ $master incr foo
+ assert {[$master wait 2 1000] <= 1}
+ }
+
+ test {WAIT should not acknowledge 1 additional copy if slave is blocked} {
+ exec src/redis-cli -h $slave_host -p $slave_port debug sleep 5 > /dev/null 2> /dev/null &
+ after 1000 ;# Give redis-cli the time to execute the command.
+ $master set foo 0
+ $master incr foo
+ $master incr foo
+ $master incr foo
+ assert {[$master wait 1 3000] == 0}
+ }
+}}
diff --git a/utils/hyperloglog/hll-gnuplot-graph.rb b/utils/hyperloglog/hll-gnuplot-graph.rb
index 745baddcf..6c7596d17 100644
--- a/utils/hyperloglog/hll-gnuplot-graph.rb
+++ b/utils/hyperloglog/hll-gnuplot-graph.rb
@@ -30,7 +30,7 @@ def run_experiment(r,seed,max,step)
elements << ele
i += 1
}
- r.pfadd('hll',*elements)
+ r.pfadd('hll',elements)
approx = r.pfcount('hll')
err = approx-i
rel_err = 100.to_f*err/i