summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rwxr-xr-xsrc/aof.c9
-rw-r--r--src/bitops.c4
-rwxr-xr-xsrc/config.c54
-rw-r--r--src/db.c6
-rw-r--r--src/debug.c20
-rw-r--r--src/help.h16
-rwxr-xr-xsrc/mkreleasehdr.sh2
-rwxr-xr-xsrc/multi.c41
-rw-r--r--src/networking.c4
-rw-r--r--src/pubsub.c1
-rwxr-xr-xsrc/rdb.c12
-rw-r--r--src/redis-cli.c266
-rwxr-xr-xsrc/redis.c239
-rwxr-xr-xsrc/redis.h21
-rwxr-xr-xsrc/replication.c17
-rw-r--r--src/rio.c26
-rw-r--r--src/rio.h13
-rwxr-xr-xsrc/scripting.c1
-rw-r--r--src/sds.c21
-rw-r--r--src/sds.h1
-rw-r--r--src/sentinel.c171
-rw-r--r--src/t_string.c70
-rw-r--r--src/version.h2
23 files changed, 789 insertions, 228 deletions
diff --git a/src/aof.c b/src/aof.c
index cd729cec1..ffa4f6a97 100755
--- a/src/aof.c
+++ b/src/aof.c
@@ -863,7 +863,9 @@ int rewriteAppendOnlyFile(char *filename) {
return REDIS_ERR;
}
- rioInitWithFileAndFsyncInterval(&aof,fp, 1024*1024*16);
+ rioInitWithFile(&aof,fp);
+ if (server.aof_rewrite_incremental_fsync)
+ rioSetAutoSync(&aof,REDIS_AOF_AUTOSYNC_BYTES);
for (j = 0; j < server.dbnum; j++) {
char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
redisDb *db = server.db+j;
@@ -891,6 +893,9 @@ int rewriteAppendOnlyFile(char *filename) {
expiretime = getExpire(db,&key);
+ /* If this key is already expired skip it */
+ if (expiretime != -1 && expiretime < now) continue;
+
/* Save the key and associated value */
if (o->type == REDIS_STRING) {
/* Emit a SET command */
@@ -913,8 +918,6 @@ int rewriteAppendOnlyFile(char *filename) {
/* Save the expire time */
if (expiretime != -1) {
char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";
- /* If this key is already expired skip it */
- if (expiretime < now) continue;
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr;
diff --git a/src/bitops.c b/src/bitops.c
index 75f3317a9..926d1eacd 100644
--- a/src/bitops.c
+++ b/src/bitops.c
@@ -58,8 +58,8 @@ static int getBitOffsetFromArgument(redisClient *c, robj *o, size_t *offset) {
/* Count number of bits set in the binary array pointed by 's' and long
* 'count' bytes. The implementation of this function is required to
* work with a input string length up to 512 MB. */
-long popcount(void *s, long count) {
- long bits = 0;
+size_t popcount(void *s, long count) {
+ size_t bits = 0;
unsigned char *p;
uint32_t *p4 = s;
static const unsigned char bitsinbyte[256] = {0,1,1,2,1,2,2,3,1,2,2,3,2,3,3,4,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,4,5,5,6,5,6,6,7,5,6,6,7,6,7,7,8};
diff --git a/src/config.c b/src/config.c
index dbea5513d..711162cfd 100755
--- a/src/config.c
+++ b/src/config.c
@@ -68,11 +68,21 @@ void loadServerConfigFromString(char *config) {
linenum = i+1;
lines[i] = sdstrim(lines[i]," \t\r\n");
- /* Skip comments and blank lines*/
+ /* Skip comments and blank lines */
if (lines[i][0] == '#' || lines[i][0] == '\0') continue;
/* Split into arguments */
argv = sdssplitargs(lines[i],&argc);
+ if (argv == NULL) {
+ err = "Unbalanced quotes in configuration line";
+ goto loaderr;
+ }
+
+ /* Skip this line if the resulting command vector is empty. */
+ if (argc == 0) {
+ sdsfreesplitres(argv,argc);
+ return;
+ }
sdstolower(argv[0]);
/* Execute config directives */
@@ -281,6 +291,10 @@ void loadServerConfigFromString(char *config) {
if ((server.conditional_sync = yesnotoi(argv[1])) == -1) {
err = "argument must be 'yes' or 'no'"; goto loaderr;
}
+ } else if (!strcasecmp(argv[0],"hz") && argc == 2) {
+ server.hz = atoi(argv[1]);
+ if (server.hz < REDIS_MIN_HZ) server.hz = REDIS_MIN_HZ;
+ if (server.hz > REDIS_MAX_HZ) server.hz = REDIS_MAX_HZ;
} else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
int yes;
@@ -319,6 +333,18 @@ void loadServerConfigFromString(char *config) {
argc == 2)
{
server.aof_rewrite_min_size = memtoll(argv[1],NULL);
+ } else if (!strcasecmp(argv[0],"aof-rewrite-incremental-fsync") &&
+ argc == 2)
+ {
+ if ((server.aof_rewrite_incremental_fsync = yesnotoi(argv[1])) == -1) {
+ err = "argument must be 'yes' or 'no'"; goto loaderr;
+ }
+ } else if (!strcasecmp(argv[0],"rdb-incremental-fsync") &&
+ argc == 2)
+ {
+ if ((server.rdb_incremental_fsync = yesnotoi(argv[1])) == -1) {
+ err = "argument must be 'yes' or 'no'"; goto loaderr;
+ }
} else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
if (strlen(argv[1]) > REDIS_AUTHPASS_MAX_LEN) {
err = "Password is longer than REDIS_AUTHPASS_MAX_LEN";
@@ -495,7 +521,7 @@ void configSetCommand(redisClient *c) {
server.requirepass = ((char*)o->ptr)[0] ? zstrdup(o->ptr) : NULL;
} else if (!strcasecmp(c->argv[2]->ptr,"masterauth")) {
zfree(server.masterauth);
- server.masterauth = zstrdup(o->ptr);
+ server.masterauth = ((char*)o->ptr)[0] ? zstrdup(o->ptr) : NULL;
} else if (!strcasecmp(c->argv[2]->ptr,"maxmemory")) {
if (getLongLongFromObject(o,&ll) == REDIS_ERR ||
ll < 0) goto badfmt;
@@ -506,6 +532,12 @@ void configSetCommand(redisClient *c) {
}
freeMemoryIfNeeded(server.maxmemory);
}
+ } else if (!strcasecmp(c->argv[2]->ptr,"hz")) {
+ if (getLongLongFromObject(o,&ll) == REDIS_ERR ||
+ ll < 0) goto badfmt;
+ server.hz = (int) ll;
+ if (server.hz < REDIS_MIN_HZ) server.hz = REDIS_MIN_HZ;
+ if (server.hz > REDIS_MAX_HZ) server.hz = REDIS_MAX_HZ;
} else if (!strcasecmp(c->argv[2]->ptr,"maxmemory-policy")) {
if (!strcasecmp(o->ptr,"volatile-lru")) {
server.maxmemory_policy = REDIS_MAXMEMORY_VOLATILE_LRU;
@@ -568,6 +600,16 @@ void configSetCommand(redisClient *c) {
} else if (!strcasecmp(c->argv[2]->ptr,"auto-aof-rewrite-min-size")) {
if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll < 0) goto badfmt;
server.aof_rewrite_min_size = ll;
+ } else if (!strcasecmp(c->argv[2]->ptr,"aof-rewrite-incremental-fsync")) {
+ int yn = yesnotoi(o->ptr);
+
+ if (yn == -1) goto badfmt;
+ server.aof_rewrite_incremental_fsync = yn;
+ } else if (!strcasecmp(c->argv[2]->ptr,"rdb-incremental-fsync")) {
+ int yn = yesnotoi(o->ptr);
+
+ if (yn == -1) goto badfmt;
+ server.rdb_incremental_fsync = yn;
} else if (!strcasecmp(c->argv[2]->ptr,"save")) {
int vlen, j;
sds *v = sdssplitlen(o->ptr,sdslen(o->ptr)," ",1,&vlen);
@@ -792,7 +834,7 @@ void configGetCommand(redisClient *c) {
config_get_string_field("dbfilename",server.rdb_filename);
config_get_string_field("syncdbfilename",server.rdb_syncfilename);
config_get_string_field("requirepass",server.requirepass);
- config_get_string_field("masterauth",server.requirepass);
+ config_get_string_field("masterauth",server.masterauth);
config_get_string_field("bind",server.bindaddr);
config_get_string_field("unixsocket",server.unixsocket);
config_get_string_field("logfile",server.logfile);
@@ -833,6 +875,7 @@ void configGetCommand(redisClient *c) {
config_get_numerical_field("maxclients",server.maxclients);
config_get_numerical_field("watchdog-period",server.watchdog_period);
config_get_numerical_field("slave-priority",server.slave_priority);
+ config_get_numerical_field("hz",server.hz);
/* Bool (yes/no) values */
config_get_bool_field("no-appendfsync-on-rewrite",
@@ -849,6 +892,11 @@ void configGetCommand(redisClient *c) {
config_get_bool_field("activerehashing", server.activerehashing);
config_get_bool_field("repl-disable-tcp-nodelay",
server.repl_disable_tcp_nodelay);
+ config_get_bool_field("aof-rewrite-incremental-fsync",
+ server.aof_rewrite_incremental_fsync);
+ config_get_bool_field("rdb-incremental-fsync",
+ server.rdb_incremental_fsync);
+
/* Everything we can't handle with macros follows. */
diff --git a/src/db.c b/src/db.c
index bd3ec0150..353f6a37f 100644
--- a/src/db.c
+++ b/src/db.c
@@ -551,7 +551,6 @@ int expireIfNeeded(redisDb *db, robj *key) {
* unit is either UNIT_SECONDS or UNIT_MILLISECONDS, and is only used for
* the argv[2] parameter. The basetime is always specified in milliseconds. */
void expireGenericCommand(redisClient *c, long long basetime, int unit) {
- dictEntry *de;
robj *key = c->argv[1], *param = c->argv[2];
long long when; /* unix time in milliseconds when the key will expire. */
@@ -561,11 +560,12 @@ void expireGenericCommand(redisClient *c, long long basetime, int unit) {
if (unit == UNIT_SECONDS) when *= 1000;
when += basetime;
- de = dictFind(c->db->dict,key->ptr);
- if (de == NULL) {
+ /* No key, return zero. */
+ if (lookupKeyRead(c->db,key) == NULL) {
addReply(c,shared.czero);
return;
}
+
/* EXPIRE with negative TTL, or EXPIREAT with a timestamp into the past
* should never be executed as a DEL when load the AOF or in the context
* of a slave instance.
diff --git a/src/debug.c b/src/debug.c
index a74358383..e1d42cc65 100644
--- a/src/debug.c
+++ b/src/debug.c
@@ -330,9 +330,14 @@ void debugCommand(redisClient *c) {
usleep(utime);
addReply(c,shared.ok);
+ } else if (!strcasecmp(c->argv[1]->ptr,"set-active-expire") &&
+ c->argc == 3)
+ {
+ server.active_expire_enabled = atoi(c->argv[2]->ptr);
+ addReply(c,shared.ok);
} else {
- addReplyError(c,
- "Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|SWAPIN <key>|SWAPOUT <key>|RELOAD]");
+ addReplyErrorFormat(c, "Unknown DEBUG subcommand or wrong number of arguments for '%s'",
+ (char*)c->argv[1]->ptr);
}
}
@@ -382,9 +387,12 @@ void redisLogObjectDebugInfo(robj *o) {
redisLog(REDIS_WARNING,"Object encoding: %d", o->encoding);
redisLog(REDIS_WARNING,"Object refcount: %d", o->refcount);
if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_RAW) {
- redisLog(REDIS_WARNING,"Object raw string len: %d", sdslen(o->ptr));
- if (sdslen(o->ptr) < 4096)
- redisLog(REDIS_WARNING,"Object raw string content: \"%s\"", (char*)o->ptr);
+ redisLog(REDIS_WARNING,"Object raw string len: %zu", sdslen(o->ptr));
+ if (sdslen(o->ptr) < 4096) {
+ sds repr = sdscatrepr(sdsempty(),o->ptr,sdslen(o->ptr));
+ redisLog(REDIS_WARNING,"Object raw string content: %s", repr);
+ sdsfree(repr);
+ }
} else if (o->type == REDIS_LIST) {
redisLog(REDIS_WARNING,"List length: %d", (int) listTypeLength(o));
} else if (o->type == REDIS_SET) {
@@ -794,7 +802,7 @@ void enableWatchdog(int period) {
/* If the configured period is smaller than twice the timer period, it is
* too short for the software watchdog to work reliably. Fix it now
* if needed. */
- min_period = (1000/REDIS_HZ)*2;
+ min_period = (1000/server.hz)*2;
if (period < min_period) period = min_period;
watchdogScheduleSignal(period); /* Adjust the current timer. */
server.watchdog_period = period;
diff --git a/src/help.h b/src/help.h
index 1378b8b85..60a5726bf 100644
--- a/src/help.h
+++ b/src/help.h
@@ -69,6 +69,11 @@ struct commandHelp {
"Pop a value from a list, push it to another list and return it; or block until one is available",
2,
"2.2.0" },
+ { "CLIENT GETNAME",
+ "-",
+ "Get the current connection name",
+ 9,
+ "2.6.9" },
{ "CLIENT KILL",
"ip:port",
"Kill the connection of a client",
@@ -79,6 +84,11 @@ struct commandHelp {
"Get the list of client connections",
9,
"2.4.0" },
+ { "CLIENT SETNAME",
+ "connection-name",
+ "Set the current connection name",
+ 9,
+ "2.6.9" },
{ "CONFIG GET",
"parameter",
"Get the value of a configuration parameter",
@@ -280,7 +290,7 @@ struct commandHelp {
1,
"2.6.0" },
{ "INFO",
- "-",
+ "[section]",
"Get information and statistics about the server",
9,
"1.0.0" },
@@ -525,7 +535,7 @@ struct commandHelp {
8,
"1.0.0" },
{ "SET",
- "key value",
+ "key value [EX seconds] [PX milliseconds] [NX|XX]",
"Set the string value of a key",
1,
"1.0.0" },
@@ -665,7 +675,7 @@ struct commandHelp {
7,
"2.2.0" },
{ "ZADD",
- "key score member [score] [member]",
+ "key score member [score member ...]",
"Add one or more members to a sorted set, or update its score if it already exists",
4,
"1.2.0" },
diff --git a/src/mkreleasehdr.sh b/src/mkreleasehdr.sh
index dbf948c8a..386bd24d1 100755
--- a/src/mkreleasehdr.sh
+++ b/src/mkreleasehdr.sh
@@ -1,6 +1,6 @@
#!/bin/sh
GIT_SHA1=`(git show-ref --head --hash=8 2> /dev/null || echo 00000000) | head -n1`
-GIT_DIRTY=`git diff 2> /dev/null | wc -l`
+GIT_DIRTY=`git diff --no-ext-diff 2> /dev/null | wc -l`
test -f release.h || touch release.h
(cat release.h | grep SHA1 | grep $GIT_SHA1) && \
(cat release.h | grep DIRTY | grep $GIT_DIRTY) && exit 0 # Already up-to-date
diff --git a/src/multi.c b/src/multi.c
index 2b7e6b552..cd07971e2 100755
--- a/src/multi.c
+++ b/src/multi.c
@@ -103,13 +103,11 @@ void discardCommand(redisClient *c) {
/* Send a MULTI command to all the slaves and AOF file. Check the execCommand
* implementation for more information. */
-void execCommandReplicateMulti(redisClient *c) {
+void execCommandPropagateMulti(redisClient *c) {
robj *multistring = createStringObject("MULTI",5);
- if (server.aof_state != REDIS_AOF_OFF)
- feedAppendOnlyFile(server.multiCommand,c->db->id,&multistring,1);
- if (listLength(server.slaves))
- replicationFeedSlaves(server.slaves,c->db->id,&multistring,1);
+ propagate(server.multiCommand,c->db->id,&multistring,1,
+ REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL);
decrRefCount(multistring);
}
@@ -118,6 +116,7 @@ void execCommand(redisClient *c) {
robj **orig_argv;
int orig_argc;
struct redisCommand *orig_cmd;
+ int must_propagate = 0; /* Need to propagate MULTI/EXEC to AOF / slaves? */
if (!(c->flags & REDIS_MULTI)) {
addReplyError(c,"EXEC without MULTI");
@@ -133,19 +132,10 @@ void execCommand(redisClient *c) {
if (c->flags & (REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC)) {
addReply(c, c->flags & REDIS_DIRTY_EXEC ? shared.execaborterr :
shared.nullmultibulk);
- freeClientMultiState(c);
- initClientMultiState(c);
- c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC);
- unwatchAllKeys(c);
+ discardTransaction(c);
goto handle_monitor;
}
- /* Replicate a MULTI request now that we are sure the block is executed.
- * This way we'll deliver the MULTI/..../EXEC block as a whole and
- * both the AOF and the replication link will have the same consistency
- * and atomicity guarantees. */
- execCommandReplicateMulti(c);
-
/* Exec all the queued commands */
unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
orig_argv = c->argv;
@@ -156,6 +146,16 @@ void execCommand(redisClient *c) {
c->argc = c->mstate.commands[j].argc;
c->argv = c->mstate.commands[j].argv;
c->cmd = c->mstate.commands[j].cmd;
+
+ /* Propagate a MULTI request once we encounter the first write op.
+ * This way we'll deliver the MULTI/..../EXEC block as a whole and
+ * both the AOF and the replication link will have the same consistency
+ * and atomicity guarantees. */
+ if (!must_propagate && !(c->cmd->flags & REDIS_CMD_READONLY)) {
+ execCommandPropagateMulti(c);
+ must_propagate = 1;
+ }
+
call(c,REDIS_CALL_FULL);
/* Commands may alter argc/argv, restore mstate. */
@@ -166,13 +166,10 @@ void execCommand(redisClient *c) {
c->argv = orig_argv;
c->argc = orig_argc;
c->cmd = orig_cmd;
- freeClientMultiState(c);
- initClientMultiState(c);
- c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC);
- /* Make sure the EXEC command is always replicated / AOF, since we
- * always send the MULTI command (we can't know beforehand if the
- * next operations will contain at least a modification to the DB). */
- server.dirty++;
+ discardTransaction(c);
+ /* Make sure the EXEC command will be propagated as well if MULTI
+ * was already propagated. */
+ if (must_propagate) server.dirty++;
handle_monitor:
/* Send EXEC to clients waiting data from MONITOR. We do it here
diff --git a/src/networking.c b/src/networking.c
index 3f14b17c8..569b64945 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -1259,7 +1259,7 @@ void rewriteClientCommandVector(redisClient *c, int argc, ...) {
/* Replace argv and argc with our new versions. */
c->argv = argv;
c->argc = argc;
- c->cmd = lookupCommand(c->argv[0]->ptr);
+ c->cmd = lookupCommandOrOriginal(c->argv[0]->ptr);
redisAssertWithInfo(c,NULL,c->cmd != NULL);
va_end(ap);
}
@@ -1277,7 +1277,7 @@ void rewriteClientCommandArgument(redisClient *c, int i, robj *newval) {
/* If this is the command name make sure to fix c->cmd. */
if (i == 0) {
- c->cmd = lookupCommand(c->argv[0]->ptr);
+ c->cmd = lookupCommandOrOriginal(c->argv[0]->ptr);
redisAssertWithInfo(c,NULL,c->cmd != NULL);
}
}
diff --git a/src/pubsub.c b/src/pubsub.c
index ece32445d..ed2b0766e 100644
--- a/src/pubsub.c
+++ b/src/pubsub.c
@@ -306,7 +306,6 @@ void punsubscribeCommand(redisClient *c) {
slowlogAddComplexityParam('M', listLength(server.pubsub_patterns));
if (c->argc == 1) {
pubsubUnsubscribeAllPatterns(c,1);
- return;
} else {
int j;
diff --git a/src/rdb.c b/src/rdb.c
index 803ecf14c..0784f81af 100755
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -648,7 +648,9 @@ int rdbSave(char *filename, int dbnum) {
return REDIS_ERR;
}
- rioInitWithFileAndFsyncInterval(&rdb,fp, 1024*1024*16);
+ rioInitWithFile(&rdb,fp);
+ if (server.rdb_incremental_fsync)
+ rioSetAutoSync(&rdb,REDIS_RDB_AUTOSYNC_BYTES);
if (server.rdb_checksum)
rdb.update_cksum = rioGenericUpdateChecksum;
snprintf(magic,sizeof(magic),"REDIS%04d",REDIS_RDB_VERSION);
@@ -763,6 +765,7 @@ int rdbSaveBackground(char *filename, int bgsavetype, int dbnum) {
if (bgsavetype == REDIS_BGSAVE_NORMAL) server.stat_rdb_saves++;
server.dirty_before_bgsave = server.dirty;
+ server.lastbgsave_try = time(NULL);
start = ustime();
if (server.rdb_bgsavefilename) zfree(server.rdb_bgsavefilename);
@@ -1144,11 +1147,8 @@ int rdbLoad(char *filename) {
FILE *fp;
rio rdb;
- fp = fopen(filename,"r");
- if (!fp) {
- errno = ENOENT;
- return REDIS_ERR;
- }
+ if ((fp = fopen(filename,"r")) == NULL) return REDIS_ERR;
+
rioInitWithFile(&rdb,fp);
rdb.update_cksum = rdbLoadProgressCallback;
rdb.max_processing_chunk = server.loading_process_events_interval_bytes;
diff --git a/src/redis-cli.c b/src/redis-cli.c
index c182ac17d..f12059ca4 100644
--- a/src/redis-cli.c
+++ b/src/redis-cli.c
@@ -42,6 +42,7 @@
#include <sys/time.h>
#include <assert.h>
#include <fcntl.h>
+#include <limits.h>
#include "hiredis.h"
#include "sds.h"
@@ -56,6 +57,7 @@
#define OUTPUT_STANDARD 0
#define OUTPUT_RAW 1
#define OUTPUT_CSV 2
+#define REDIS_CLI_KEEPALIVE_INTERVAL 15 /* seconds */
static redisContext *context;
static struct config {
@@ -70,11 +72,13 @@ static struct config {
int monitor_mode;
int pubsub_mode;
int latency_mode;
+ int latency_history;
int cluster_mode;
int cluster_reissue_command;
int slave_mode;
int pipe_mode;
int getrdb_mode;
+ int stat_mode;
char *rdb_filename;
int bigkeys;
int stdinarg; /* get last arg from stdin. (-x option) */
@@ -332,6 +336,12 @@ static int cliConnect(int force) {
return REDIS_ERR;
}
+ /* Set aggressive KEEP_ALIVE socket option in the Redis context socket
+ * in order to prevent timeouts caused by the execution of long
+ * commands. At the same time this improves the detection of real
+ * errors. */
+ anetKeepAlive(NULL, context->fd, REDIS_CLI_KEEPALIVE_INTERVAL);
+
/* Do AUTH and select the right DB. */
if (cliAuth() != REDIS_OK)
return REDIS_ERR;
@@ -623,6 +633,36 @@ static int cliSendCommand(int argc, char **argv, int repeat) {
return REDIS_OK;
}
+/* Send the INFO command, reconnecting the link if needed. */
+static redisReply *reconnectingInfo(void) {
+ redisContext *c = context;
+ redisReply *reply = NULL;
+ int tries = 0;
+
+ assert(!c->err);
+ while(reply == NULL) {
+ while (c->err & (REDIS_ERR_IO | REDIS_ERR_EOF)) {
+ printf("Reconnecting (%d)...\r", ++tries);
+ fflush(stdout);
+
+ redisFree(c);
+ c = redisConnect(config.hostip,config.hostport);
+ usleep(1000000);
+ }
+
+ reply = redisCommand(c,"INFO");
+ if (c->err && !(c->err & (REDIS_ERR_IO | REDIS_ERR_EOF))) {
+ fprintf(stderr, "Error: %s\n", c->errstr);
+ exit(1);
+ } else if (tries > 0) {
+ printf("\n");
+ }
+ }
+
+ context = c;
+ return reply;
+}
+
/*------------------------------------------------------------------------------
* User interface
*--------------------------------------------------------------------------- */
@@ -661,8 +701,13 @@ static int parseOptions(int argc, char **argv) {
config.output = OUTPUT_CSV;
} else if (!strcmp(argv[i],"--latency")) {
config.latency_mode = 1;
+ } else if (!strcmp(argv[i],"--latency-history")) {
+ config.latency_mode = 1;
+ config.latency_history = 1;
} else if (!strcmp(argv[i],"--slave")) {
config.slave_mode = 1;
+ } else if (!strcmp(argv[i],"--stat")) {
+ config.stat_mode = 1;
} else if (!strcmp(argv[i],"--rdb") && !lastarg) {
config.getrdb_mode = 1;
config.rdb_filename = argv[++i];
@@ -683,7 +728,15 @@ static int parseOptions(int argc, char **argv) {
sdsfree(version);
exit(0);
} else {
- break;
+ if (argv[i][0] == '-') {
+ fprintf(stderr,
+ "Unrecognized option or bad number of args for: '%s'\n",
+ argv[i]);
+ exit(1);
+ } else {
+ /* Likely the command name, stop here. */
+ break;
+ }
}
}
return i;
@@ -712,26 +765,29 @@ static void usage() {
"redis-cli %s\n"
"\n"
"Usage: redis-cli [OPTIONS] [cmd [arg [arg ...]]]\n"
-" -h <hostname> Server hostname (default: 127.0.0.1)\n"
-" -p <port> Server port (default: 6379)\n"
-" -s <socket> Server socket (overrides hostname and port)\n"
-" -a <password> Password to use when connecting to the server\n"
-" -r <repeat> Execute specified command N times\n"
-" -i <interval> When -r is used, waits <interval> seconds per command.\n"
-" It is possible to specify sub-second times like -i 0.1\n"
-" -n <db> Database number\n"
-" -x Read last argument from STDIN\n"
-" -d <delimiter> Multi-bulk delimiter in for raw formatting (default: \\n)\n"
-" -c Enable cluster mode (follow -ASK and -MOVED redirections)\n"
-" --raw Use raw formatting for replies (default when STDOUT is not a tty)\n"
-" --latency Enter a special mode continuously sampling latency\n"
-" --slave Simulate a slave showing commands received from the master\n"
-" --rdb <filename> Transfer an RDB dump from remote server to local file.\n"
-" --pipe Transfer raw Redis protocol from stdin to server\n"
-" --bigkeys Sample Redis keys looking for big keys\n"
-" --eval <file> Send an EVAL command using the Lua script at <file>\n"
-" --help Output this help and exit\n"
-" --version Output version and exit\n"
+" -h <hostname> Server hostname (default: 127.0.0.1)\n"
+" -p <port> Server port (default: 6379)\n"
+" -s <socket> Server socket (overrides hostname and port)\n"
+" -a <password> Password to use when connecting to the server\n"
+" -r <repeat> Execute specified command N times\n"
+" -i <interval> When -r is used, waits <interval> seconds per command.\n"
+" It is possible to specify sub-second times like -i 0.1\n"
+" -n <db> Database number\n"
+" -x Read last argument from STDIN\n"
+" -d <delimiter> Multi-bulk delimiter in for raw formatting (default: \\n)\n"
+" -c Enable cluster mode (follow -ASK and -MOVED redirections)\n"
+" --raw Use raw formatting for replies (default when STDOUT is\n"
+" not a tty)\n"
+" --latency Enter a special mode continuously sampling latency\n"
+" --latency-history Like --latency but tracking latency changes over time.\n"
+" Default time interval is 15 sec. Change it using -i.\n"
+" --slave Simulate a slave showing commands received from the master\n"
+" --rdb <filename> Transfer an RDB dump from remote server to local file.\n"
+" --pipe Transfer raw Redis protocol from stdin to server\n"
+" --bigkeys Sample Redis keys looking for big keys\n"
+" --eval <file> Send an EVAL command using the Lua script at <file>\n"
+" --help Output this help and exit\n"
+" --version Output version and exit\n"
"\n"
"Examples:\n"
" cat /etc/passwd | redis-cli -x set mypasswd\n"
@@ -843,8 +899,7 @@ static void repl() {
}
}
/* Free the argument vector */
- while(argc--) sdsfree(argv[argc]);
- zfree(argv);
+ sdsfreesplitres(argv,argc);
}
/* linenoise() returns malloc-ed lines like readline() */
free(line);
@@ -903,10 +958,16 @@ static int evalMode(int argc, char **argv) {
return cliSendCommand(argc+3-got_comma, argv2, config.repeat);
}
+#define LATENCY_SAMPLE_RATE 10 /* milliseconds. */
+#define LATENCY_HISTORY_DEFAULT_INTERVAL 15000 /* milliseconds. */
static void latencyMode(void) {
redisReply *reply;
long long start, latency, min = 0, max = 0, tot = 0, count = 0;
+ long long history_interval =
+ config.interval ? config.interval/1000 :
+ LATENCY_HISTORY_DEFAULT_INTERVAL;
double avg;
+ long long history_start = mstime();
if (!context) exit(1);
while(1) {
@@ -931,7 +992,13 @@ static void latencyMode(void) {
printf("\x1b[0G\x1b[2Kmin: %lld, max: %lld, avg: %.2f (%lld samples)",
min, max, avg, count);
fflush(stdout);
- usleep(10000);
+ if (config.latency_history && mstime()-history_start > history_interval)
+ {
+ printf(" -- %.2f seconds range\n", (float)(mstime()-history_start)/1000);
+ history_start = mstime();
+ min = max = tot = count = 0;
+ }
+ usleep(LATENCY_SAMPLE_RATE * 1000);
}
}
@@ -1199,7 +1266,11 @@ static void findBigKeys(void) {
fprintf(stderr, "RANDOMKEY error: %s\n",
reply1->str);
exit(1);
+ } else if (reply1->type == REDIS_REPLY_NIL) {
+ fprintf(stderr, "It looks like the database is empty!\n");
+ exit(1);
}
+
/* Get the key type */
reply2 = redisCommand(context,"TYPE %s",reply1->str);
assert(reply2 && reply2->type == REDIS_REPLY_STATUS);
@@ -1255,6 +1326,145 @@ static void findBigKeys(void) {
}
}
+/* Return the specified INFO field from the INFO command output "info".
+ * A new buffer is allocated for the result, that needs to be free'd.
+ * If the field is not found NULL is returned. */
+static char *getInfoField(char *info, char *field) {
+ char *p = strstr(info,field);
+ char *n1, *n2;
+ char *result;
+
+ if (!p) return NULL;
+ p += strlen(field)+1;
+ n1 = strchr(p,'\r');
+ n2 = strchr(p,',');
+ if (n2 && n2 < n1) n1 = n2;
+ result = malloc(sizeof(char)*(n1-p)+1);
+ memcpy(result,p,(n1-p));
+ result[n1-p] = '\0';
+ return result;
+}
+
+/* Like the above function but automatically convert the result into
+ * a long. On error (missing field) LONG_MIN is returned. */
+static long getLongInfoField(char *info, char *field) {
+ char *value = getInfoField(info,field);
+ long l;
+
+ if (!value) return LONG_MIN;
+ l = strtol(value,NULL,10);
+ free(value);
+ return l;
+}
+
+/* Convert number of bytes into a human readable string of the form:
+ * 100B, 2G, 100M, 4K, and so forth. */
+void bytesToHuman(char *s, long long n) {
+ double d;
+
+ if (n < 0) {
+ *s = '-';
+ s++;
+ n = -n;
+ }
+ if (n < 1024) {
+ /* Bytes */
+ sprintf(s,"%lluB",n);
+ return;
+ } else if (n < (1024*1024)) {
+ d = (double)n/(1024);
+ sprintf(s,"%.2fK",d);
+ } else if (n < (1024LL*1024*1024)) {
+ d = (double)n/(1024*1024);
+ sprintf(s,"%.2fM",d);
+ } else if (n < (1024LL*1024*1024*1024)) {
+ d = (double)n/(1024LL*1024*1024);
+ sprintf(s,"%.2fG",d);
+ }
+}
+
+static void statMode() {
+ redisReply *reply;
+ long aux, requests = 0;
+ int i = 0;
+
+ while(1) {
+ char buf[64];
+ int j;
+
+ reply = reconnectingInfo();
+ if (reply->type == REDIS_REPLY_ERROR) {
+ printf("ERROR: %s\n", reply->str);
+ exit(1);
+ }
+
+ if ((i++ % 20) == 0) {
+ printf(
+"------- data ------ --------------------- load -------------------- - child -\n"
+"keys mem clients blocked requests connections \n");
+ }
+
+ /* Keys */
+ aux = 0;
+ for (j = 0; j < 20; j++) {
+ long k;
+
+ sprintf(buf,"db%d:keys",j);
+ k = getLongInfoField(reply->str,buf);
+ if (k == LONG_MIN) continue;
+ aux += k;
+ }
+ sprintf(buf,"%ld",aux);
+ printf("%-11s",buf);
+
+ /* Used memory */
+ aux = getLongInfoField(reply->str,"used_memory");
+ bytesToHuman(buf,aux);
+ printf("%-8s",buf);
+
+ /* Clients */
+ aux = getLongInfoField(reply->str,"connected_clients");
+ sprintf(buf,"%ld",aux);
+ printf(" %-8s",buf);
+
+ /* Blocked (BLPOPPING) Clients */
+ aux = getLongInfoField(reply->str,"blocked_clients");
+ sprintf(buf,"%ld",aux);
+ printf("%-8s",buf);
+
+ /* Requets */
+ aux = getLongInfoField(reply->str,"total_commands_processed");
+ sprintf(buf,"%ld (+%ld)",aux,requests == 0 ? 0 : aux-requests);
+ printf("%-19s",buf);
+ requests = aux;
+
+ /* Connections */
+ aux = getLongInfoField(reply->str,"total_connections_received");
+ sprintf(buf,"%ld",aux);
+ printf(" %-12s",buf);
+
+ /* Children */
+ aux = getLongInfoField(reply->str,"bgsave_in_progress");
+ aux |= getLongInfoField(reply->str,"aof_rewrite_in_progress") << 1;
+ switch(aux) {
+ case 0: break;
+ case 1:
+ printf("SAVE");
+ break;
+ case 2:
+ printf("AOF");
+ break;
+ case 3:
+ printf("SAVE+AOF");
+ break;
+ }
+
+ printf("\n");
+ freeReplyObject(reply);
+ usleep(config.interval);
+ }
+}
+
int main(int argc, char **argv) {
int firstarg;
@@ -1269,6 +1479,7 @@ int main(int argc, char **argv) {
config.monitor_mode = 0;
config.pubsub_mode = 0;
config.latency_mode = 0;
+ config.latency_history = 0;
config.cluster_mode = 0;
config.slave_mode = 0;
config.getrdb_mode = 0;
@@ -1319,6 +1530,13 @@ int main(int argc, char **argv) {
findBigKeys();
}
+ /* Stat mode */
+ if (config.stat_mode) {
+ if (cliConnect(0) == REDIS_ERR) exit(1);
+ if (config.interval == 0) config.interval = 1000000;
+ statMode();
+ }
+
/* Start interactive mode when no command is provided */
if (argc == 0 && !config.eval) {
/* Note that in repl mode we don't abort on connection error.
diff --git a/src/redis.c b/src/redis.c
index a7d468104..3e9b178ac 100755
--- a/src/redis.c
+++ b/src/redis.c
@@ -114,7 +114,7 @@ struct redisCommand *commandTable;
*/
struct redisCommand redisCommandTable[] = {
{"get",getCommand,2,"r",0,NULL,1,1,1,0,0},
- {"set",setCommand,3,"wm",0,noPreloadGetKeys,1,1,1,0,0},
+ {"set",setCommand,-3,"wm",0,noPreloadGetKeys,1,1,1,0,0},
{"setnx",setnxCommand,3,"wm",0,noPreloadGetKeys,1,1,1,0,0},
{"setex",setexCommand,4,"wm",0,noPreloadGetKeys,1,1,1,0,0},
{"psetex",psetexCommand,4,"wm",0,noPreloadGetKeys,1,1,1,0,0},
@@ -197,7 +197,7 @@ struct redisCommand redisCommandTable[] = {
{"mset",msetCommand,-3,"wm",0,NULL,1,-1,2,0,0},
{"msetnx",msetnxCommand,-3,"wm",0,NULL,1,-1,2,0,0},
{"randomkey",randomkeyCommand,1,"rR",0,NULL,0,0,0,0,0},
- {"select",selectCommand,2,"r",0,NULL,0,0,0,0,0},
+ {"select",selectCommand,2,"rl",0,NULL,0,0,0,0,0},
{"move",moveCommand,3,"w",0,NULL,1,1,1,0,0},
{"rename",renameCommand,3,"w",0,renameGetKeys,1,2,1,0,0},
{"renamenx",renamenxCommand,3,"w",0,renameGetKeys,1,2,1,0,0},
@@ -207,7 +207,7 @@ struct redisCommand redisCommandTable[] = {
{"pexpireat",pexpireatCommand,3,"w",0,NULL,1,1,1,0,0},
{"keys",keysCommand,2,"rS",0,NULL,0,0,0,0,0},
{"dbsize",dbsizeCommand,1,"r",0,NULL,0,0,0,0,0},
- {"auth",authCommand,2,"rs",0,NULL,0,0,0,0,0},
+ {"auth",authCommand,2,"rsl",0,NULL,0,0,0,0,0},
{"ping",pingCommand,1,"r",0,NULL,0,0,0,0,0},
{"echo",echoCommand,2,"r",0,NULL,0,0,0,0,0},
{"save",saveCommand,1,"ars",0,NULL,0,0,0,0,0},
@@ -238,7 +238,7 @@ struct redisCommand redisCommandTable[] = {
{"unsubscribe",unsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0},
{"psubscribe",psubscribeCommand,-2,"rpslt",0,NULL,0,0,0,0,0},
{"punsubscribe",punsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0},
- {"publish",publishCommand,3,"pflt",0,NULL,0,0,0,0,0},
+ {"publish",publishCommand,3,"pfltr",0,NULL,0,0,0,0,0},
{"watch",watchCommand,-2,"rs",0,noPreloadGetKeys,1,-1,1,0,0},
{"unwatch",unwatchCommand,1,"rs",0,NULL,0,0,0,0,0},
{"restore",restoreCommand,4,"awm",0,NULL,1,1,1,0,0},
@@ -574,36 +574,32 @@ int htNeedsResize(dict *dict) {
/* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
* we resize the hash table to save memory */
-void tryResizeHashTables(void) {
- int j;
-
- for (j = 0; j < server.dbnum; j++) {
- if (htNeedsResize(server.db[j].dict))
- dictResize(server.db[j].dict);
- if (htNeedsResize(server.db[j].expires))
- dictResize(server.db[j].expires);
- }
+void tryResizeHashTables(int dbid) {
+ if (htNeedsResize(server.db[dbid].dict))
+ dictResize(server.db[dbid].dict);
+ if (htNeedsResize(server.db[dbid].expires))
+ dictResize(server.db[dbid].expires);
}
/* Our hash table implementation performs rehashing incrementally while
* we write/read from the hash table. Still if the server is idle, the hash
* table will use two tables for a long time. So we try to use 1 millisecond
- * of CPU time at every serverCron() loop in order to rehash some key. */
-void incrementallyRehash(void) {
- int j;
-
- for (j = 0; j < server.dbnum; j++) {
- /* Keys dictionary */
- if (dictIsRehashing(server.db[j].dict)) {
- dictRehashMilliseconds(server.db[j].dict,1);
- break; /* already used our millisecond for this loop... */
- }
- /* Expires */
- if (dictIsRehashing(server.db[j].expires)) {
- dictRehashMilliseconds(server.db[j].expires,1);
- break; /* already used our millisecond for this loop... */
- }
+ * of CPU time at every call of this function to perform some rehahsing.
+ *
+ * The function returns 1 if some rehashing was performed, otherwise 0
+ * is returned. */
+int incrementallyRehash(int dbid) {
+ /* Keys dictionary */
+ if (dictIsRehashing(server.db[dbid].dict)) {
+ dictRehashMilliseconds(server.db[dbid].dict,1);
+ return 1; /* already used our millisecond for this loop... */
+ }
+ /* Expires */
+ if (dictIsRehashing(server.db[dbid].expires)) {
+ dictRehashMilliseconds(server.db[dbid].expires,1);
+ return 1; /* already used our millisecond for this loop... */
}
+ return 0;
}
/* This function is called once a background process of some kind terminates,
@@ -624,28 +620,57 @@ void updateDictResizePolicy(void) {
/* Try to expire a few timed out keys. The algorithm used is adaptive and
* will use few CPU cycles if there are few expiring keys, otherwise
* it will get more aggressive to avoid that too much memory is used by
- * keys that can be removed from the keyspace. */
+ * keys that can be removed from the keyspace.
+ *
+ * No more than REDIS_DBCRON_DBS_PER_CALL databases are tested at every
+ * iteration. */
void activeExpireCycle(void) {
- int j, iteration = 0;
+ /* This function has some global state in order to continue the work
+ * incrementally across calls. */
+ static unsigned int current_db = 0; /* Last DB tested. */
+ static int timelimit_exit = 0; /* Time limit hit in previous call? */
+
+ unsigned int j, iteration = 0;
+ unsigned int dbs_per_call = REDIS_DBCRON_DBS_PER_CALL;
long long start = ustime(), timelimit;
+ /* We usually should test REDIS_DBCRON_DBS_PER_CALL per iteration, with
+ * two exceptions:
+ *
+ * 1) Don't test more DBs than we have.
+ * 2) If last time we hit the time limit, we want to scan all DBs
+ * in this iteration, as there is work to do in some DB and we don't want
+ * expired keys to use memory for too much time. */
+ if (dbs_per_call > server.dbnum || timelimit_exit)
+ dbs_per_call = server.dbnum;
+
/* We can use at max REDIS_EXPIRELOOKUPS_TIME_PERC percentage of CPU time
* per iteration. Since this function gets called with a frequency of
- * REDIS_HZ times per second, the following is the max amount of
+ * server.hz times per second, the following is the max amount of
* microseconds we can spend in this function. */
- timelimit = 1000000*REDIS_EXPIRELOOKUPS_TIME_PERC/REDIS_HZ/100;
+ timelimit = 1000000*REDIS_EXPIRELOOKUPS_TIME_PERC/server.hz/100;
+ timelimit_exit = 0;
if (timelimit <= 0) timelimit = 1;
- for (j = 0; j < server.dbnum; j++) {
+ for (j = 0; j < dbs_per_call; j++) {
int expired;
- redisDb *db = server.db+j;
+ redisDb *db = server.db+(current_db % server.dbnum);
+
+ /* Increment the DB now so we are sure if we run out of time
+ * in the current DB we'll restart from the next. This allows to
+ * distribute the time evenly across DBs. */
+ current_db++;
/* Continue to expire if at the end of the cycle more than 25%
* of the keys were expired. */
do {
- unsigned long num = dictSize(db->expires);
- unsigned long slots = dictSlots(db->expires);
- long long now = mstime();
+ unsigned long num, slots;
+ long long now;
+
+ /* If there is nothing to expire try next DB ASAP. */
+ if ((num = dictSize(db->expires)) == 0) break;
+ slots = dictSlots(db->expires);
+ now = mstime();
/* When there are less than 1% filled slots getting random
* keys is expensive, so stop here waiting for better times...
@@ -679,8 +704,12 @@ void activeExpireCycle(void) {
* expire. So after a given amount of milliseconds return to the
* caller waiting for the other active expire cycle. */
iteration++;
- if ((iteration & 0xf) == 0 && /* check once every 16 cycles. */
- (ustime()-start) > timelimit) return;
+ if ((iteration & 0xf) == 0 && /* check once every 16 iterations. */
+ (ustime()-start) > timelimit)
+ {
+ timelimit_exit = 1;
+ return;
+ }
} while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
}
}
@@ -766,13 +795,13 @@ int clientsCronResizeQueryBuffer(redisClient *c) {
}
void clientsCron(void) {
- /* Make sure to process at least 1/(REDIS_HZ*10) of clients per call.
- * Since this function is called REDIS_HZ times per second we are sure that
+ /* Make sure to process at least 1/(server.hz*10) of clients per call.
+ * Since this function is called server.hz times per second we are sure that
* in the worst case we process all the clients in 10 seconds.
* In normal conditions (a reasonable number of clients) we process
* all the clients in a shorter time. */
int numclients = listLength(server.clients);
- int iterations = numclients/(REDIS_HZ*10);
+ int iterations = numclients/(server.hz*10);
if (iterations < 50)
iterations = (numclients < 50) ? numclients : 50;
@@ -872,7 +901,52 @@ void checkOSMemory(void) {
}
-/* This is our timer interrupt, called REDIS_HZ times per second.
+/* This function handles 'background' operations we are required to do
+ * incrementally in Redis databases, such as active key expiring, resizing,
+ * rehashing. */
+void databasesCron(void) {
+ /* Expire keys by random sampling. Not required for slaves
+ * as master will synthesize DELs for us. */
+ if (server.active_expire_enabled && server.masterhost == NULL)
+ activeExpireCycle();
+
+ /* Perform hash tables rehashing if needed, but only if there are no
+ * other processes saving the DB on disk. Otherwise rehashing is bad
+ * as will cause a lot of copy-on-write of memory pages. */
+ if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
+ /* We use global counters so if we stop the computation at a given
+ * DB we'll be able to start from the successive in the next
+ * cron loop iteration. */
+ static unsigned int resize_db = 0;
+ static unsigned int rehash_db = 0;
+ unsigned int dbs_per_call = REDIS_DBCRON_DBS_PER_CALL;
+ unsigned int j;
+
+ /* Don't test more DBs than we have. */
+ if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum;
+
+ /* Resize */
+ for (j = 0; j < dbs_per_call; j++) {
+ tryResizeHashTables(resize_db % server.dbnum);
+ resize_db++;
+ }
+
+ /* Rehash */
+ if (server.activerehashing) {
+ for (j = 0; j < dbs_per_call; j++) {
+ int work_done = incrementallyRehash(rehash_db % server.dbnum);
+ rehash_db++;
+ if (work_done) {
+ /* If the function did some work, stop here, we'll do
+ * more at the next cron loop. */
+ break;
+ }
+ }
+ }
+ }
+}
+
+/* This is our timer interrupt, called server.hz times per second.
* Here is where we do a number of things that need to be done asynchronously.
* For instance:
*
@@ -886,7 +960,7 @@ void checkOSMemory(void) {
* - Replication reconnection.
* - Many more...
*
- * Everything directly called here will be called REDIS_HZ times per second,
+ * Everything directly called here will be called server.hz times per second,
* so in order to throttle execution of things we want to do less frequently
* a macro is used: run_with_period(milliseconds) { .... }
*/
@@ -959,17 +1033,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
}
}
- /* We don't want to resize the hash tables while a background saving
- * is in progress: the saving child is created using fork() that is
- * implemented with a copy-on-write semantic in most modern systems, so
- * if we resize the HT while there is the saving child at work actually
- * a lot of memory movements in the parent will cause a lot of pages
- * copied. */
- if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
- tryResizeHashTables();
- if (server.activerehashing) incrementallyRehash();
- }
-
/* Show information about connected clients */
if (!server.sentinel_mode) {
run_with_period(5000) {
@@ -984,6 +1047,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
/* We need to do a few operations on clients asynchronously. */
clientsCron();
+ /* Handle background operations on Redis databases. */
+ databasesCron();
+
/* Start a scheduled AOF rewrite if this was requested by the user while
* a BGSAVE was in progress. */
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
@@ -1020,8 +1086,16 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
for (j = 0; j < server.saveparamslen; j++) {
struct saveparam *sp = server.saveparams+j;
+ /* Save if we reached the given amount of changes,
+ * the given amount of seconds, and if the latest bgsave was
+ * successful or if, in case of an error, at least
+ * REDIS_BGSAVE_RETRY_DELAY seconds already elapsed. */
if (server.dirty >= sp->changes &&
- server.unixtime-server.lastsave > sp->seconds) {
+ server.unixtime-server.lastsave > sp->seconds &&
+ (server.unixtime-server.lastbgsave_try >
+ REDIS_BGSAVE_RETRY_DELAY ||
+ server.lastbgsave_status == REDIS_OK))
+ {
redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
sp->changes, sp->seconds);
rdbSaveBackground(server.rdb_filename, REDIS_BGSAVE_NORMAL, -1);
@@ -1050,11 +1124,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
* cron function is called. */
if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
- /* Expire a few keys per cycle, only if this is a master.
- * On slaves we wait for DEL operations synthesized by the master
- * in order to guarantee a strict consistency. */
- if (server.masterhost == NULL) activeExpireCycle();
-
/* Close clients that need to be closed asynchronous */
freeClientsInAsyncFreeQueue();
@@ -1068,7 +1137,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
}
server.cronloops++;
- return 1000/REDIS_HZ;
+ return 1000/server.hz;
}
/* This function gets called every time Redis is entering the
@@ -1176,6 +1245,7 @@ void createSharedObjects(void) {
void initServerConfig() {
getRandomHexChars(server.runid,REDIS_RUN_ID_SIZE);
+ server.hz = REDIS_DEFAULT_HZ;
server.runid[REDIS_RUN_ID_SIZE] = '\0';
server.arch_bits = (sizeof(long) == 8) ? 64 : 32;
server.port = REDIS_SERVERPORT;
@@ -1188,6 +1258,7 @@ void initServerConfig() {
server.verbosity = REDIS_NOTICE;
server.maxidletime = REDIS_MAXIDLETIME;
server.tcpkeepalive = 0;
+ server.active_expire_enabled = 1;
server.client_max_querybuf_len = REDIS_MAX_QUERYBUF_LEN;
server.saveparams = NULL;
server.loading = 0;
@@ -1213,12 +1284,15 @@ void initServerConfig() {
server.aof_fd = -1;
server.aof_selected_db = -1; /* Make sure the first time will not match */
server.aof_flush_postponed_start = 0;
+ server.aof_rewrite_incremental_fsync = 1;
+ server.rdb_incremental_fsync = 1;
server.pidfile = zstrdup("/var/run/redis.pid");
server.rdb_filename = zstrdup("dump.rdb");
server.aof_filename = zstrdup("appendonly.aof");
server.requirepass = NULL;
server.rdb_compression = 1;
server.rdb_checksum = 1;
+ server.stop_writes_on_bgsave_err = 1;
server.activerehashing = 1;
server.maxclients = REDIS_MAX_CLIENTS;
server.bpop_blocked_clients = 0;
@@ -1256,7 +1330,7 @@ void initServerConfig() {
server.repl_syncio_timeout = REDIS_REPL_SYNCIO_TIMEOUT;
server.repl_serve_stale_data = 1;
server.repl_slave_ro = 1;
- server.repl_down_since = time(NULL);
+ server.repl_down_since = 0; /* Never connected, repl is down since EVER. */
server.repl_disable_tcp_nodelay = 0;
server.slave_priority = REDIS_DEFAULT_SLAVE_PRIORITY;
@@ -1281,6 +1355,7 @@ void initServerConfig() {
* initial configuration, since command names may be changed via
* redis.conf using the rename-command directive. */
server.commands = dictCreate(&commandTableDictType,NULL);
+ server.orig_commands = dictCreate(&commandTableDictType,NULL);
populateCommandTable();
server.delCommand = lookupCommandByCString("del");
server.multiCommand = lookupCommandByCString("multi");
@@ -1406,7 +1481,8 @@ void initServer() {
server.aof_child_pid = -1;
aofRewriteBufferReset();
server.aof_buf = sdsempty();
- server.lastsave = time(NULL);
+ server.lastsave = time(NULL); /* At startup we consider the DB saved. */
+ server.lastbgsave_try = 0; /* At startup we never tried to BGSAVE. */
server.rdb_save_time_last = -1;
server.rdb_save_time_start = -1;
server.dirty = 0;
@@ -1431,8 +1507,10 @@ void initServer() {
server.ops_sec_last_sample_ops = 0;
server.unixtime = time(NULL);
server.lastbgsave_status = REDIS_OK;
- server.stop_writes_on_bgsave_err = 1;
- aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
+ if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
+ redisPanic("create time event failed");
+ exit(1);
+ }
if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.ipfd file event.");
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
@@ -1472,7 +1550,7 @@ void populateCommandTable(void) {
for (j = 0; j < numcommands; j++) {
struct redisCommand *c = redisCommandTable+j;
char *f = c->sflags;
- int retval;
+ int retval1, retval2;
while(*f != '\0') {
switch(*f) {
@@ -1493,8 +1571,11 @@ void populateCommandTable(void) {
f++;
}
- retval = dictAdd(server.commands, sdsnew(c->name), c);
- assert(retval == DICT_OK);
+ retval1 = dictAdd(server.commands, sdsnew(c->name), c);
+ /* Populate an additional dictionary that will be unaffected
+ * by rename-command statements in redis.conf. */
+ retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c);
+ redisAssert(retval1 == DICT_OK && retval2 == DICT_OK);
}
}
@@ -1562,6 +1643,20 @@ struct redisCommand *lookupCommandByCString(char *s) {
return cmd;
}
+/* Lookup the command in the current table, if not found also check in
+ * the original table containing the original command names unaffected by
+ * redis.conf rename-command statement.
+ *
+ * This is used by functions rewriting the argument vector such as
+ * rewriteClientCommandVector() in order to set client->cmd pointer
+ * correctly even if the command was renamed. */
+struct redisCommand *lookupCommandOrOriginal(sds name) {
+ struct redisCommand *cmd = dictFetchValue(server.commands, name);
+
+ if (!cmd) cmd = dictFetchValue(server.orig_commands,name);
+ return cmd;
+}
+
/* Propagate the specified command (in the context of the specified database id)
* to AOF and Slaves.
*
@@ -2020,6 +2115,7 @@ sds genRedisInfoString(char *section) {
"tcp_port:%d\r\n"
"uptime_in_seconds:%ld\r\n"
"uptime_in_days:%ld\r\n"
+ "hz:%d\r\n"
"lru_clock:%ld\r\n",
REDIS_VERSION,
redisGitSHA1(),
@@ -2038,6 +2134,7 @@ sds genRedisInfoString(char *section) {
server.port,
uptime,
uptime/(3600*24),
+ server.hz,
(unsigned long) server.lruclock);
}
@@ -2698,7 +2795,7 @@ void loadDataFromDisk(void) {
redisLog(REDIS_NOTICE,"DB loaded from disk: %.3f seconds",
(float)(ustime()-start)/1000000);
} else if (errno != ENOENT) {
- redisLog(REDIS_WARNING,"Fatal error loading the DB. Exiting.");
+ redisLog(REDIS_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));
exit(1);
}
}
@@ -2707,7 +2804,7 @@ void loadDataFromDisk(void) {
void redisOutOfMemoryHandler(size_t allocation_size) {
redisLog(REDIS_WARNING,"Out Of Memory allocating %zu bytes!",
allocation_size);
- redisPanic("OOM");
+ redisPanic("Redis aborting for OUT OF MEMORY");
}
int main(int argc, char **argv) {
diff --git a/src/redis.h b/src/redis.h
index a969778d8..0c590f634 100755
--- a/src/redis.h
+++ b/src/redis.h
@@ -67,13 +67,16 @@
#define REDIS_ERR -1
/* Static server configuration */
-#define REDIS_HZ 100 /* Time interrupt calls/sec. */
+#define REDIS_DEFAULT_HZ 10 /* Time interrupt calls/sec. */
+#define REDIS_MIN_HZ 1
+#define REDIS_MAX_HZ 500
#define REDIS_SERVERPORT 6379 /* TCP port */
#define REDIS_MAXIDLETIME 0 /* default client timeout: infinite */
#define REDIS_DEFAULT_DBNUM 16
#define REDIS_CONFIGLINE_MAX 1024
#define REDIS_EXPIRELOOKUPS_PER_CRON 10 /* lookup 10 expires per loop */
#define REDIS_EXPIRELOOKUPS_TIME_PERC 25 /* CPU max % for keys collection */
+#define REDIS_DBCRON_DBS_PER_CALL 16
#define REDIS_MAX_WRITE_PER_EVENT (1024*64)
#define REDIS_SHARED_SELECT_CMDS 10
#define REDIS_SHARED_INTEGERS 10000
@@ -92,6 +95,7 @@
#define REDIS_REPL_PING_SLAVE_PERIOD 10
#define REDIS_RUN_ID_SIZE 40
#define REDIS_OPS_SEC_SAMPLES 16
+#define REDIS_BGSAVE_RETRY_DELAY 5 /* Wait a few secs before trying again. */
/* Protocol and I/O related defines */
#define REDIS_MAX_QUERYBUF_LEN (1024*1024*1024) /* 1GB max query buffer. */
@@ -99,6 +103,8 @@
#define REDIS_REPLY_CHUNK_BYTES (16*1024) /* 16k output buffer */
#define REDIS_INLINE_MAX_SIZE (1024*64) /* Max size of inline reads */
#define REDIS_MBULK_BIG_ARG (1024*32)
+#define REDIS_AOF_AUTOSYNC_BYTES (1024*1024*16) /* fdatasync every 16MB */
+#define REDIS_RDB_AUTOSYNC_BYTES (1024*1024*16) /* fdatasync every 16MB */
/* Hash table parameters */
#define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
@@ -302,8 +308,8 @@
/* Using the following macro you can run code inside serverCron() with the
* specified period, specified in milliseconds.
- * The actual resolution depends on REDIS_HZ. */
-#define run_with_period(_ms_) if (!(server.cronloops%((_ms_)/(1000/REDIS_HZ))))
+ * The actual resolution depends on server.hz. */
+#define run_with_period(_ms_) if ((_ms_ <= 1000/server.hz) || !(server.cronloops%((_ms_)/(1000/server.hz))))
/* We can print the stacktrace, so our assert is defined this way: */
#define redisAssertWithInfo(_c,_o,_e) ((_e)?(void)0 : (_redisAssertWithInfo(_c,_o,#_e,__FILE__,__LINE__),_exit(1)))
@@ -510,8 +516,10 @@ struct complexity_param {
struct redisServer {
/* General */
+ int hz; /* serverCron() calls frequency in hertz */
redisDb *db;
- dict *commands; /* Command table hash table */
+ dict *commands; /* Command table */
+ dict *orig_commands; /* Command table before command renaming. */
aeEventLoop *el;
unsigned lruclock:22; /* Clock incrementing every minute, for LRU */
unsigned lruclock_padding:10;
@@ -575,6 +583,7 @@ struct redisServer {
int verbosity; /* Loglevel in redis.conf */
int maxidletime; /* Client timeout in seconds */
int tcpkeepalive; /* Set SO_KEEPALIVE if non-zero. */
+ int active_expire_enabled; /* Can be disabled for testing purposes. */
size_t client_max_querybuf_len; /* Limit for client query buffer length */
int dbnum; /* Total number of configured DBs */
int daemonize; /* True if running as a daemon */
@@ -602,6 +611,7 @@ struct redisServer {
time_t aof_rewrite_time_start; /* Current AOF rewrite start time. */
int aof_lastbgrewrite_status; /* REDIS_OK or REDIS_ERR */
unsigned long aof_delayed_fsync; /* delayed AOF fsync() counter */
+ int aof_rewrite_incremental_fsync;/* fsync incrementally while rewriting? */
/* RDB persistence */
long long dirty; /* Changes to DB from the last save */
long long dirty_before_bgsave; /* Used to restore dirty on failed BGSAVE */
@@ -616,10 +626,12 @@ struct redisServer {
int rdb_compression; /* Use compression in RDB? */
int rdb_checksum; /* Use RDB checksum? */
time_t lastsave; /* Unix time of last successful save */
+ time_t lastbgsave_try; /* Unix time of last attempted bgsave */
time_t rdb_save_time_last; /* Time used by last RDB save run. */
time_t rdb_save_time_start; /* Current RDB save start time. */
int lastbgsave_status; /* REDIS_OK or REDIS_ERR */
int stop_writes_on_bgsave_err; /* Don't allow writes if can't BGSAVE */
+ int rdb_incremental_fsync; /* fsync incrementally while rewriting? */
/* Propagation of commands in AOF / replication */
redisOpArray also_propagate; /* Additional command to propagate. */
int draining; /* Currently draining to slaves? */
@@ -979,6 +991,7 @@ int processCommand(redisClient *c);
void setupSignalHandlers(void);
struct redisCommand *lookupCommand(sds name);
struct redisCommand *lookupCommandByCString(char *s);
+struct redisCommand *lookupCommandOrOriginal(sds name);
void call(redisClient *c, int flags);
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags);
void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int target);
diff --git a/src/replication.c b/src/replication.c
index d8283590d..4d11cf6b3 100755
--- a/src/replication.c
+++ b/src/replication.c
@@ -697,11 +697,16 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
goto error;
}
- /* We don't care about the reply, it can be +PONG or an error since
- * the server requires AUTH. As long as it replies correctly, it's
- * fine from our point of view. */
- if (buf[0] != '-' && buf[0] != '+') {
- redisLog(REDIS_WARNING,"Unexpected reply to PING from master.");
+ /* We accept only two replies as valid, a positive +PONG reply
+ * (we just check for "+") or an authentication error.
+ * Note that older versions of Redis replied with "operation not
+ * permitted" instead of using a proper error code, so we test
+ * both. */
+ if (buf[0] != '+' &&
+ strncmp(buf,"-NOAUTH",7) != 0 &&
+ strncmp(buf,"-ERR operation not permitted",28) != 0)
+ {
+ redisLog(REDIS_WARNING,"Error reply to PING from master: '%s'",buf);
goto error;
} else {
redisLog(REDIS_NOTICE,
@@ -924,7 +929,7 @@ void replicationCron(void) {
* So slaves can implement an explicit timeout to masters, and will
* be able to detect a link disconnection even if the TCP connection
* will not actually go down. */
- if (!(server.cronloops % (server.repl_ping_slave_period * REDIS_HZ))) {
+ if (!(server.cronloops % (server.repl_ping_slave_period * server.hz))) {
listIter li;
listNode *ln;
diff --git a/src/rio.c b/src/rio.c
index a59038db0..2f5e78995 100644
--- a/src/rio.c
+++ b/src/rio.c
@@ -51,6 +51,8 @@
#include <unistd.h>
#include "rio.h"
#include "util.h"
+#include "config.h"
+#include "redis.h"
uint64_t crc64(uint64_t crc, const unsigned char *s, uint64_t l);
@@ -79,10 +81,10 @@ static off_t rioBufferTell(rio *r) {
static size_t rioFileWrite(rio *r, const void *buf, size_t len) {
size_t bytes_written = 0;
while (len) {
- size_t bytes_to_write = (r->io.file.fsync_interval && r->io.file.fsync_interval < len) ? r->io.file.fsync_interval : len;
+ size_t bytes_to_write = (r->io.file.autosync && r->io.file.autosync < len) ? r->io.file.autosync : len;
if (fwrite((char*)buf + bytes_written,bytes_to_write,1,r->io.file.fp) != 1)
return 0;
- if (r->io.file.fsync_interval && (r->processed_bytes + bytes_written)/r->io.file.fsync_interval < (r->processed_bytes + bytes_written + bytes_to_write)/r->io.file.fsync_interval)
+ if (r->io.file.autosync && (r->processed_bytes + bytes_written)/r->io.file.autosync < (r->processed_bytes + bytes_written + bytes_to_write)/r->io.file.autosync)
fsync(fileno(r->io.file.fp));
bytes_written += bytes_to_write;
len -= bytes_to_write;
@@ -125,12 +127,7 @@ static const rio rioFileIO = {
void rioInitWithFile(rio *r, FILE *fp) {
*r = rioFileIO;
r->io.file.fp = fp;
-}
-
-void rioInitWithFileAndFsyncInterval(rio *r, FILE *fp, size_t fsyncInterval) {
- *r = rioFileIO;
- r->io.file.fp = fp;
- r->io.file.fsync_interval = fsyncInterval;
+ r->io.file.autosync = 0;
}
void rioInitWithBuffer(rio *r, sds s) {
@@ -145,6 +142,19 @@ void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) {
r->cksum = crc64(r->cksum,buf,len);
}
+/* Set the file-based rio object to auto-fsync every 'bytes' file written.
+ * By default this is set to zero that means no automatic file sync is
+ * performed.
+ *
+ * This feature is useful in a few contexts since when we rely on OS write
+ * buffers sometimes the OS buffers way too much, resulting in too many
+ * disk I/O concentrated in very little time. When we fsync in an explicit
+ * way instead the I/O pressure is more distributed across time. */
+void rioSetAutoSync(rio *r, off_t bytes) {
+ redisAssert(r->read == rioFileIO.read);
+ r->io.file.autosync = bytes;
+}
+
/* ------------------------------ Higher level interface ---------------------------
* The following higher level functions use lower level rio.c functions to help
* generating the Redis protocol for the Append Only File. */
diff --git a/src/rio.h b/src/rio.h
index 3381d816c..4a930f92a 100644
--- a/src/rio.h
+++ b/src/rio.h
@@ -43,10 +43,11 @@ struct _rio {
size_t (*read)(struct _rio *, void *buf, size_t len);
size_t (*write)(struct _rio *, const void *buf, size_t len);
off_t (*tell)(struct _rio *);
- /* The update_cksum method if not NULL is used to compute the checksum of all the
- * data that was read or written so far. The method should be designed so that
- * can be called with the current checksum, and the buf and len fields pointing
- * to the new block of data to add to the checksum computation. */
+ /* The update_cksum method if not NULL is used to compute the checksum of
+ * all the data that was read or written so far. The method should be
+ * designed so that can be called with the current checksum, and the buf
+ * and len fields pointing to the new block of data to add to the checksum
+ * computation. */
void (*update_cksum)(struct _rio *, const void *buf, size_t len);
/* The current checksum */
@@ -66,7 +67,7 @@ struct _rio {
} buffer;
struct {
FILE *fp;
- size_t fsync_interval;
+ off_t autosync; /* fsync after 'autosync' bytes written. */
} file;
} io;
};
@@ -108,7 +109,6 @@ static inline off_t rioTell(rio *r) {
}
void rioInitWithFile(rio *r, FILE *fp);
-void rioInitWithFileAndFsyncInterval(rio *r, FILE *fp, size_t fsyncInterval);
void rioInitWithBuffer(rio *r, sds s);
size_t rioWriteBulkCount(rio *r, char prefix, int count);
@@ -117,5 +117,6 @@ size_t rioWriteBulkLongLong(rio *r, long long l);
size_t rioWriteBulkDouble(rio *r, double d);
void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len);
+void rioSetAutoSync(rio *r, off_t bytes);
#endif
diff --git a/src/scripting.c b/src/scripting.c
index ee3d0405b..ff8c83cdc 100755
--- a/src/scripting.c
+++ b/src/scripting.c
@@ -258,6 +258,7 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
"Write commands not allowed after non deterministic commands");
goto cleanup;
} else if (server.masterhost && server.repl_slave_ro &&
+ !server.loading &&
!(server.lua_caller->flags & REDIS_MASTER))
{
luaPushError(lua, shared.roslaveerr->ptr);
diff --git a/src/sds.c b/src/sds.c
index 85858a4f0..4cf700b9a 100644
--- a/src/sds.c
+++ b/src/sds.c
@@ -475,11 +475,18 @@ int hex_digit_to_int(char c) {
* foo bar "newline are supported\n" and "\xff\x00otherstuff"
*
* The number of arguments is stored into *argc, and an array
- * of sds is returned. The caller should sdsfree() all the returned
- * strings and finally zfree() the array itself.
+ * of sds is returned.
+ *
+ * The caller should free the resulting array of sds strings with
+ * sdsfreesplitres().
*
* Note that sdscatrepr() is able to convert back a string into
* a quoted string in the same format sdssplitargs() is able to parse.
+ *
+ * The function returns the allocated tokens on success, even when the
+ * input string is empty, or NULL if the input contains unbalanced
+ * quotes or closed quotes followed by non space characters
+ * as in: "foo"bar or "foo'
*/
sds *sdssplitargs(const char *line, int *argc) {
const char *p = line;
@@ -576,6 +583,8 @@ sds *sdssplitargs(const char *line, int *argc) {
(*argc)++;
current = NULL;
} else {
+ /* Even on empty input string return something not NULL. */
+ if (vector == NULL) vector = zmalloc(sizeof(void*));
return vector;
}
}
@@ -585,16 +594,10 @@ err:
sdsfree(vector[*argc]);
zfree(vector);
if (current) sdsfree(current);
+ *argc = 0;
return NULL;
}
-void sdssplitargs_free(sds *argv, int argc) {
- int j;
-
- for (j = 0 ;j < argc; j++) sdsfree(argv[j]);
- zfree(argv);
-}
-
/* Modify the string substituting all the occurrences of the set of
* characters specified in the 'from' string to the corresponding character
* in the 'to' array.
diff --git a/src/sds.h b/src/sds.h
index e8d306503..c5a4f30a9 100644
--- a/src/sds.h
+++ b/src/sds.h
@@ -88,7 +88,6 @@ void sdstoupper(sds s);
sds sdsfromlonglong(long long value);
sds sdscatrepr(sds s, const char *p, size_t len);
sds *sdssplitargs(const char *line, int *argc);
-void sdssplitargs_free(sds *argv, int argc);
sds sdsmapchars(sds s, const char *from, const char *to, size_t setlen);
/* Low level functions exposed to the user API */
diff --git a/src/sentinel.c b/src/sentinel.c
index fc857344c..ed0978694 100644
--- a/src/sentinel.c
+++ b/src/sentinel.c
@@ -74,6 +74,8 @@ typedef struct sentinelAddr {
#define SRI_RECONF_DONE (1<<13) /* Slave synchronized with new master. */
#define SRI_FORCE_FAILOVER (1<<14) /* Force failover with master up. */
#define SRI_SCRIPT_KILL_SENT (1<<15) /* SCRIPT KILL already sent on -BUSY */
+#define SRI_DEMOTE (1<<16) /* If the instance claims to be a master, demote
+ it into a slave sending SLAVEOF. */
#define SENTINEL_INFO_PERIOD 10000
#define SENTINEL_PING_PERIOD 1000
@@ -403,7 +405,7 @@ void initSentinel(void) {
/* Initialize various data structures. */
sentinel.masters = dictCreate(&instancesDictType,NULL);
sentinel.tilt = 0;
- sentinel.tilt_start_time = mstime();
+ sentinel.tilt_start_time = 0;
sentinel.previous_time = mstime();
sentinel.running_scripts = 0;
sentinel.scripts_queue = listCreate();
@@ -1130,7 +1132,6 @@ int sentinelResetMastersByPattern(char *pattern, int flags) {
* TODO: make this reset so that original sentinels are re-added with
* same ip / port / runid.
*/
-
int sentinelResetMasterAndChangeAddress(sentinelRedisInstance *master, char *ip, int port) {
sentinelAddr *oldaddr, *newaddr;
@@ -1139,12 +1140,26 @@ int sentinelResetMasterAndChangeAddress(sentinelRedisInstance *master, char *ip,
sentinelResetMaster(master,SENTINEL_NO_FLAGS);
oldaddr = master->addr;
master->addr = newaddr;
+ master->o_down_since_time = 0;
+ master->s_down_since_time = 0;
+
/* Release the old address at the end so we are safe even if the function
* gets the master->addr->ip and master->addr->port as arguments. */
releaseSentinelAddr(oldaddr);
return REDIS_OK;
}
+/* Return non-zero if there was no SDOWN or ODOWN error associated to this
+ * instance in the latest 'ms' milliseconds. */
+int sentinelRedisInstanceNoDownFor(sentinelRedisInstance *ri, mstime_t ms) {
+ mstime_t most_recent;
+
+ most_recent = ri->s_down_since_time;
+ if (ri->o_down_since_time > most_recent)
+ most_recent = ri->o_down_since_time;
+ return most_recent == 0 || (mstime() - most_recent) > ms;
+}
+
/* ============================ Config handling ============================= */
char *sentinelHandleConfiguration(char **argv, int argc) {
sentinelRedisInstance *ri;
@@ -1382,26 +1397,39 @@ void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
}
}
- /* slave0:<ip>,<port>,<state> */
+ /* old versions: slave0:<ip>,<port>,<state>
+ * new versions: slave0:ip=127.0.0.1,port=9999,... */
if ((ri->flags & SRI_MASTER) &&
sdslen(l) >= 7 &&
!memcmp(l,"slave",5) && isdigit(l[5]))
{
char *ip, *port, *end;
- ip = strchr(l,':'); if (!ip) continue;
- ip++; /* Now ip points to start of ip address. */
- port = strchr(ip,','); if (!port) continue;
- *port = '\0'; /* nul term for easy access. */
- port++; /* Now port points to start of port number. */
- end = strchr(port,','); if (!end) continue;
- *end = '\0'; /* nul term for easy access. */
+ if (strstr(l,"ip=") == NULL) {
+ /* Old format. */
+ ip = strchr(l,':'); if (!ip) continue;
+ ip++; /* Now ip points to start of ip address. */
+ port = strchr(ip,','); if (!port) continue;
+ *port = '\0'; /* nul term for easy access. */
+ port++; /* Now port points to start of port number. */
+ end = strchr(port,','); if (!end) continue;
+ *end = '\0'; /* nul term for easy access. */
+ } else {
+ /* New format. */
+ ip = strstr(l,"ip="); if (!ip) continue;
+ ip += 3; /* Now ip points to start of ip address. */
+ port = strstr(l,"port="); if (!port) continue;
+ port += 5; /* Now port points to start of port number. */
+ /* Nul term both fields for easy access. */
+ end = strchr(ip,','); if (end) *end = '\0';
+ end = strchr(port,','); if (end) *end = '\0';
+ }
/* Check if we already have this slave into our table,
* otherwise add it. */
if (sentinelRedisInstanceLookupSlave(ri,ip,atoi(port)) == NULL) {
if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,ip,
- atoi(port), ri->quorum,ri)) != NULL)
+ atoi(port), ri->quorum, ri)) != NULL)
{
sentinelEvent(REDIS_NOTICE,"+slave",slave,"%@");
}
@@ -1446,44 +1474,80 @@ void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
ri->info_refresh = mstime();
sdsfreesplitres(lines,numlines);
- /* ---------------------------- Acting half ----------------------------- */
- if (sentinel.tilt) return;
+ /* ---------------------------- Acting half -----------------------------
+ * Some things will not happen if sentinel.tilt is true, but some will
+ * still be processed. */
- /* Act if a master turned into a slave. */
- if ((ri->flags & SRI_MASTER) && role == SRI_SLAVE) {
- if ((first_runid || runid_changed) && ri->slave_master_host) {
- /* If it is the first time we receive INFO from it, but it's
- * a slave while it was configured as a master, we want to monitor
- * its master instead. */
- sentinelEvent(REDIS_WARNING,"+redirect-to-master",ri,
- "%s %s %d %s %d",
- ri->name, ri->addr->ip, ri->addr->port,
- ri->slave_master_host, ri->slave_master_port);
- sentinelResetMasterAndChangeAddress(ri,ri->slave_master_host,
- ri->slave_master_port);
- return;
- }
+ /* When what we believe is our master, turned into a slave, the wiser
+ * thing we can do is to follow the events and redirect to the new
+ * master, always. */
+ if ((ri->flags & SRI_MASTER) && role == SRI_SLAVE && ri->slave_master_host)
+ {
+ sentinelEvent(REDIS_WARNING,"+redirect-to-master",ri,
+ "%s %s %d %s %d",
+ ri->name, ri->addr->ip, ri->addr->port,
+ ri->slave_master_host, ri->slave_master_port);
+ sentinelResetMasterAndChangeAddress(ri,ri->slave_master_host,
+ ri->slave_master_port);
+ return; /* Don't process anything after this event. */
}
- /* Act if a slave turned into a master. */
+ /* Handle slave -> master role switch. */
if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
- if (!(ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
- (runid_changed || first_runid))
+ if (!sentinel.tilt && ri->flags & SRI_DEMOTE) {
+ /* If this sentinel was partitioned from the slave's master,
+ * or tilted recently, wait some time before to act,
+ * so that DOWN and roles INFO will be refreshed. */
+ mstime_t wait_time = SENTINEL_INFO_PERIOD*2 +
+ ri->master->down_after_period*2;
+
+ if (!sentinelRedisInstanceNoDownFor(ri->master,wait_time) ||
+ (mstime()-sentinel.tilt_start_time) < wait_time)
+ return;
+
+ /* Old master returned back? Turn it into a slave ASAP if
+ * we can reach what we believe is the new master now, and
+ * have a recent role information for it.
+ *
+ * Note: we'll clear the DEMOTE flag only when we have the
+ * acknowledge that it's a slave again. */
+ if (ri->master->flags & SRI_MASTER &&
+ (ri->master->flags & (SRI_S_DOWN|SRI_O_DOWN)) == 0 &&
+ (mstime() - ri->master->info_refresh) < SENTINEL_INFO_PERIOD*2)
+ {
+ int retval;
+ retval = redisAsyncCommand(ri->cc,
+ sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %d",
+ ri->master->addr->ip,
+ ri->master->addr->port);
+ if (retval == REDIS_OK)
+ sentinelEvent(REDIS_NOTICE,"+demote-old-slave",ri,"%@");
+ } else {
+ /* Otherwise if there are not the conditions to demote, we
+ * no longer trust the DEMOTE flag and remove it. */
+ ri->flags &= ~SRI_DEMOTE;
+ sentinelEvent(REDIS_NOTICE,"-demote-flag-cleared",ri,"%@");
+ }
+ } else if (!(ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
+ (runid_changed || first_runid))
{
/* If a slave turned into master but:
*
* 1) Failover not in progress.
- * 2) RunID hs changed, or its the first time we see an INFO output.
+ * 2) RunID has changed or its the first time we see an INFO output.
*
* We assume this is a reboot with a wrong configuration.
- * Log the event and remove the slave. */
+ * Log the event and remove the slave. Note that this is processed
+ * in tilt mode as well, otherwise we lose the information that the
+ * runid changed (reboot?) and when the tilt mode ends a fake
+ * failover will be detected. */
int retval;
sentinelEvent(REDIS_WARNING,"-slave-restart-as-master",ri,"%@ #removing it from the attached slaves");
retval = dictDelete(ri->master->slaves,ri->name);
redisAssert(retval == REDIS_OK);
return;
- } else if (ri->flags & SRI_PROMOTED) {
+ } else if (!sentinel.tilt && ri->flags & SRI_PROMOTED) {
/* If this is a promoted slave we can change state to the
* failover state machine. */
if ((ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
@@ -1499,11 +1563,12 @@ void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
sentinelCallClientReconfScript(ri->master,SENTINEL_LEADER,
"start",ri->master->addr,ri->addr);
}
- } else if (!(ri->master->flags & SRI_FAILOVER_IN_PROGRESS) ||
- ((ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
- (ri->master->flags & SRI_I_AM_THE_LEADER) &&
- ri->master->failover_state ==
- SENTINEL_FAILOVER_STATE_WAIT_START))
+ } else if (!sentinel.tilt && (
+ !(ri->master->flags & SRI_FAILOVER_IN_PROGRESS) ||
+ ((ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
+ (ri->master->flags & SRI_I_AM_THE_LEADER) &&
+ ri->master->failover_state ==
+ SENTINEL_FAILOVER_STATE_WAIT_START)))
{
/* No failover in progress? Then it is the start of a failover
* and we are an observer.
@@ -1523,6 +1588,7 @@ void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
ri->master->failover_state_change_time = mstime();
ri->master->promoted_slave = ri;
ri->flags |= SRI_PROMOTED;
+ ri->flags &= ~SRI_DEMOTE;
sentinelCallClientReconfScript(ri->master,SENTINEL_OBSERVER,
"start", ri->master->addr,ri->addr);
/* We are an observer, so we can only assume that the leader
@@ -1534,6 +1600,10 @@ void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
}
}
+ /* None of the following conditions are processed when in tilt mode, so
+ * return asap. */
+ if (sentinel.tilt) return;
+
/* Detect if the slave that is in the process of being reconfigured
* changed state. */
if ((ri->flags & SRI_SLAVE) && role == SRI_SLAVE &&
@@ -1564,6 +1634,13 @@ void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
ri->failover_state_change_time = mstime();
}
}
+
+ /* Detect if the old master was demoted as slave and generate the
+ * +slave event. */
+ if (role == SRI_SLAVE && ri->flags & SRI_DEMOTE) {
+ sentinelEvent(REDIS_NOTICE,"+slave",ri,"%@");
+ ri->flags &= ~SRI_DEMOTE;
+ }
}
void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
@@ -1835,6 +1912,7 @@ void addReplySentinelRedisInstance(redisClient *c, sentinelRedisInstance *ri) {
if (ri->flags & SRI_RECONF_SENT) flags = sdscat(flags,"reconf_sent,");
if (ri->flags & SRI_RECONF_INPROG) flags = sdscat(flags,"reconf_inprog,");
if (ri->flags & SRI_RECONF_DONE) flags = sdscat(flags,"reconf_done,");
+ if (ri->flags & SRI_DEMOTE) flags = sdscat(flags,"demote,");
if (sdslen(flags) != 0) flags = sdsrange(flags,0,-2); /* remove last "," */
addReplyBulkCString(c,flags);
@@ -2592,6 +2670,7 @@ sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
mstime_t info_validity_time = mstime()-SENTINEL_INFO_VALIDITY_TIME;
if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN|SRI_DISCONNECTED)) continue;
+ if (slave->flags & SRI_DEMOTE) continue; /* Old master not yet ready. */
if (slave->last_avail_time < info_validity_time) continue;
if (slave->slave_priority == 0) continue;
@@ -2837,12 +2916,28 @@ void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) {
void sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance *master) {
sentinelRedisInstance *ref = master->promoted_slave ?
master->promoted_slave : master;
+ sds old_master_ip;
+ int old_master_port;
sentinelEvent(REDIS_WARNING,"+switch-master",master,"%s %s %d %s %d",
master->name, master->addr->ip, master->addr->port,
ref->addr->ip, ref->addr->port);
+ old_master_ip = sdsdup(master->addr->ip);
+ old_master_port = master->addr->port;
sentinelResetMasterAndChangeAddress(master,ref->addr->ip,ref->addr->port);
+ /* If this is a real switch, that is, we have master->promoted_slave not
+ * NULL, then we want to add the old master as a slave of the new master,
+ * but flagging it with SRI_DEMOTE so that we know we'll need to send
+ * SLAVEOF once the old master is reachable again. */
+ if (master != ref) {
+ /* Add the new slave, but don't generate a Sentinel event as it will
+ * happen later when finally the instance will claim to be a slave
+ * in the INFO output. */
+ createSentinelRedisInstance(NULL,SRI_SLAVE|SRI_DEMOTE,
+ old_master_ip, old_master_port, master->quorum, master);
+ }
+ sdsfree(old_master_ip);
}
void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
diff --git a/src/t_string.c b/src/t_string.c
index 0a7f22583..389ff86a8 100644
--- a/src/t_string.c
+++ b/src/t_string.c
@@ -42,7 +42,27 @@ static int checkStringLength(redisClient *c, long long size) {
return REDIS_OK;
}
-void setGenericCommand(redisClient *c, int nx, robj *key, robj *val, robj *expire, int unit) {
+/* The setGenericCommand() function implements the SET operation with different
+ * options and variants. This function is called in order to implement the
+ * following commands: SET, SETEX, PSETEX, SETNX.
+ *
+ * 'flags' changes the behavior of the command (NX or XX, see belove).
+ *
+ * 'expire' represents an expire to set in form of a Redis object as passed
+ * by the user. It is interpreted according to the specified 'unit'.
+ *
+ * 'ok_reply' and 'abort_reply' is what the function will reply to the client
+ * if the operation is performed, or when it is not because of NX or
+ * XX flags.
+ *
+ * If ok_reply is NULL "+OK" is used.
+ * If abort_reply is NULL, "$-1" is used. */
+
+#define REDIS_SET_NO_FLAGS 0
+#define REDIS_SET_NX (1<<0) /* Set if key not exists. */
+#define REDIS_SET_XX (1<<1) /* Set if key exists. */
+
+void setGenericCommand(redisClient *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
long long milliseconds = 0; /* initialized to avoid any harmness warning */
if (expire) {
@@ -55,34 +75,68 @@ void setGenericCommand(redisClient *c, int nx, robj *key, robj *val, robj *expir
if (unit == UNIT_SECONDS) milliseconds *= 1000;
}
- if (nx && lookupKeyWrite(c->db,key) != NULL) {
- addReply(c,shared.czero);
+ if ((flags & REDIS_SET_NX && lookupKeyWrite(c->db,key) != NULL) ||
+ (flags & REDIS_SET_XX && lookupKeyWrite(c->db,key) == NULL))
+ {
+ addReply(c, abort_reply ? abort_reply : shared.nullbulk);
return;
}
setKey(c->db,key,val);
server.dirty++;
if (expire) setExpire(c->db,key,mstime()+milliseconds);
- addReply(c, nx ? shared.cone : shared.ok);
+ addReply(c, ok_reply ? ok_reply : shared.ok);
}
+/* SET key value [NX] [XX] [EX <seconds>] [PX <milliseconds>] */
void setCommand(redisClient *c) {
+ int j;
+ robj *expire = NULL;
+ int unit = UNIT_SECONDS;
+ int flags = REDIS_SET_NO_FLAGS;
+
+ for (j = 3; j < c->argc; j++) {
+ char *a = c->argv[j]->ptr;
+ robj *next = (j == c->argc-1) ? NULL : c->argv[j+1];
+
+ if ((a[0] == 'n' || a[0] == 'N') &&
+ (a[1] == 'x' || a[1] == 'X') && a[2] == '\0') {
+ flags |= REDIS_SET_NX;
+ } else if ((a[0] == 'x' || a[0] == 'X') &&
+ (a[1] == 'x' || a[1] == 'X') && a[2] == '\0') {
+ flags |= REDIS_SET_XX;
+ } else if ((a[0] == 'e' || a[0] == 'E') &&
+ (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' && next) {
+ unit = UNIT_SECONDS;
+ expire = next;
+ j++;
+ } else if ((a[0] == 'p' || a[0] == 'P') &&
+ (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' && next) {
+ unit = UNIT_MILLISECONDS;
+ expire = next;
+ j++;
+ } else {
+ addReply(c,shared.syntaxerr);
+ return;
+ }
+ }
+
c->argv[2] = tryObjectEncoding(c->argv[2]);
- setGenericCommand(c,0,c->argv[1],c->argv[2],NULL,0);
+ setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);
}
void setnxCommand(redisClient *c) {
c->argv[2] = tryObjectEncoding(c->argv[2]);
- setGenericCommand(c,1,c->argv[1],c->argv[2],NULL,0);
+ setGenericCommand(c,REDIS_SET_NX,c->argv[1],c->argv[2],NULL,0,shared.cone,shared.czero);
}
void setexCommand(redisClient *c) {
c->argv[3] = tryObjectEncoding(c->argv[3]);
- setGenericCommand(c,0,c->argv[1],c->argv[3],c->argv[2],UNIT_SECONDS);
+ setGenericCommand(c,REDIS_SET_NO_FLAGS,c->argv[1],c->argv[3],c->argv[2],UNIT_SECONDS,NULL,NULL);
}
void psetexCommand(redisClient *c) {
c->argv[3] = tryObjectEncoding(c->argv[3]);
- setGenericCommand(c,0,c->argv[1],c->argv[3],c->argv[2],UNIT_MILLISECONDS);
+ setGenericCommand(c,REDIS_SET_NO_FLAGS,c->argv[1],c->argv[3],c->argv[2],UNIT_MILLISECONDS,NULL,NULL);
}
int getGenericCommand(redisClient *c) {
diff --git a/src/version.h b/src/version.h
index 765dc9100..2872b9fe7 100644
--- a/src/version.h
+++ b/src/version.h
@@ -1 +1 @@
-#define REDIS_VERSION "2.6.10"
+#define REDIS_VERSION "2.6.14"