diff options
Diffstat (limited to 'src')
-rwxr-xr-x | src/aof.c | 9 | ||||
-rw-r--r-- | src/bitops.c | 4 | ||||
-rwxr-xr-x | src/config.c | 54 | ||||
-rw-r--r-- | src/db.c | 6 | ||||
-rw-r--r-- | src/debug.c | 20 | ||||
-rw-r--r-- | src/help.h | 16 | ||||
-rwxr-xr-x | src/mkreleasehdr.sh | 2 | ||||
-rwxr-xr-x | src/multi.c | 41 | ||||
-rw-r--r-- | src/networking.c | 4 | ||||
-rw-r--r-- | src/pubsub.c | 1 | ||||
-rwxr-xr-x | src/rdb.c | 12 | ||||
-rw-r--r-- | src/redis-cli.c | 266 | ||||
-rwxr-xr-x | src/redis.c | 239 | ||||
-rwxr-xr-x | src/redis.h | 21 | ||||
-rwxr-xr-x | src/replication.c | 17 | ||||
-rw-r--r-- | src/rio.c | 26 | ||||
-rw-r--r-- | src/rio.h | 13 | ||||
-rwxr-xr-x | src/scripting.c | 1 | ||||
-rw-r--r-- | src/sds.c | 21 | ||||
-rw-r--r-- | src/sds.h | 1 | ||||
-rw-r--r-- | src/sentinel.c | 171 | ||||
-rw-r--r-- | src/t_string.c | 70 | ||||
-rw-r--r-- | src/version.h | 2 |
23 files changed, 789 insertions, 228 deletions
@@ -863,7 +863,9 @@ int rewriteAppendOnlyFile(char *filename) { return REDIS_ERR; } - rioInitWithFileAndFsyncInterval(&aof,fp, 1024*1024*16); + rioInitWithFile(&aof,fp); + if (server.aof_rewrite_incremental_fsync) + rioSetAutoSync(&aof,REDIS_AOF_AUTOSYNC_BYTES); for (j = 0; j < server.dbnum; j++) { char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n"; redisDb *db = server.db+j; @@ -891,6 +893,9 @@ int rewriteAppendOnlyFile(char *filename) { expiretime = getExpire(db,&key); + /* If this key is already expired skip it */ + if (expiretime != -1 && expiretime < now) continue; + /* Save the key and associated value */ if (o->type == REDIS_STRING) { /* Emit a SET command */ @@ -913,8 +918,6 @@ int rewriteAppendOnlyFile(char *filename) { /* Save the expire time */ if (expiretime != -1) { char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n"; - /* If this key is already expired skip it */ - if (expiretime < now) continue; if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; if (rioWriteBulkObject(&aof,&key) == 0) goto werr; if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr; diff --git a/src/bitops.c b/src/bitops.c index 75f3317a9..926d1eacd 100644 --- a/src/bitops.c +++ b/src/bitops.c @@ -58,8 +58,8 @@ static int getBitOffsetFromArgument(redisClient *c, robj *o, size_t *offset) { /* Count number of bits set in the binary array pointed by 's' and long * 'count' bytes. The implementation of this function is required to * work with a input string length up to 512 MB. 
*/ -long popcount(void *s, long count) { - long bits = 0; +size_t popcount(void *s, long count) { + size_t bits = 0; unsigned char *p; uint32_t *p4 = s; static const unsigned char bitsinbyte[256] = {0,1,1,2,1,2,2,3,1,2,2,3,2,3,3,4,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,4,5,5,6,5,6,6,7,5,6,6,7,6,7,7,8}; diff --git a/src/config.c b/src/config.c index dbea5513d..711162cfd 100755 --- a/src/config.c +++ b/src/config.c @@ -68,11 +68,21 @@ void loadServerConfigFromString(char *config) { linenum = i+1; lines[i] = sdstrim(lines[i]," \t\r\n"); - /* Skip comments and blank lines*/ + /* Skip comments and blank lines */ if (lines[i][0] == '#' || lines[i][0] == '\0') continue; /* Split into arguments */ argv = sdssplitargs(lines[i],&argc); + if (argv == NULL) { + err = "Unbalanced quotes in configuration line"; + goto loaderr; + } + + /* Skip this line if the resulting command vector is empty. 
*/ + if (argc == 0) { + sdsfreesplitres(argv,argc); + return; + } sdstolower(argv[0]); /* Execute config directives */ @@ -281,6 +291,10 @@ void loadServerConfigFromString(char *config) { if ((server.conditional_sync = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; } + } else if (!strcasecmp(argv[0],"hz") && argc == 2) { + server.hz = atoi(argv[1]); + if (server.hz < REDIS_MIN_HZ) server.hz = REDIS_MIN_HZ; + if (server.hz > REDIS_MAX_HZ) server.hz = REDIS_MAX_HZ; } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) { int yes; @@ -319,6 +333,18 @@ void loadServerConfigFromString(char *config) { argc == 2) { server.aof_rewrite_min_size = memtoll(argv[1],NULL); + } else if (!strcasecmp(argv[0],"aof-rewrite-incremental-fsync") && + argc == 2) + { + if ((server.aof_rewrite_incremental_fsync = yesnotoi(argv[1])) == -1) { + err = "argument must be 'yes' or 'no'"; goto loaderr; + } + } else if (!strcasecmp(argv[0],"rdb-incremental-fsync") && + argc == 2) + { + if ((server.rdb_incremental_fsync = yesnotoi(argv[1])) == -1) { + err = "argument must be 'yes' or 'no'"; goto loaderr; + } } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) { if (strlen(argv[1]) > REDIS_AUTHPASS_MAX_LEN) { err = "Password is longer than REDIS_AUTHPASS_MAX_LEN"; @@ -495,7 +521,7 @@ void configSetCommand(redisClient *c) { server.requirepass = ((char*)o->ptr)[0] ? zstrdup(o->ptr) : NULL; } else if (!strcasecmp(c->argv[2]->ptr,"masterauth")) { zfree(server.masterauth); - server.masterauth = zstrdup(o->ptr); + server.masterauth = ((char*)o->ptr)[0] ? 
zstrdup(o->ptr) : NULL; } else if (!strcasecmp(c->argv[2]->ptr,"maxmemory")) { if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll < 0) goto badfmt; @@ -506,6 +532,12 @@ void configSetCommand(redisClient *c) { } freeMemoryIfNeeded(server.maxmemory); } + } else if (!strcasecmp(c->argv[2]->ptr,"hz")) { + if (getLongLongFromObject(o,&ll) == REDIS_ERR || + ll < 0) goto badfmt; + server.hz = (int) ll; + if (server.hz < REDIS_MIN_HZ) server.hz = REDIS_MIN_HZ; + if (server.hz > REDIS_MAX_HZ) server.hz = REDIS_MAX_HZ; } else if (!strcasecmp(c->argv[2]->ptr,"maxmemory-policy")) { if (!strcasecmp(o->ptr,"volatile-lru")) { server.maxmemory_policy = REDIS_MAXMEMORY_VOLATILE_LRU; @@ -568,6 +600,16 @@ void configSetCommand(redisClient *c) { } else if (!strcasecmp(c->argv[2]->ptr,"auto-aof-rewrite-min-size")) { if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll < 0) goto badfmt; server.aof_rewrite_min_size = ll; + } else if (!strcasecmp(c->argv[2]->ptr,"aof-rewrite-incremental-fsync")) { + int yn = yesnotoi(o->ptr); + + if (yn == -1) goto badfmt; + server.aof_rewrite_incremental_fsync = yn; + } else if (!strcasecmp(c->argv[2]->ptr,"rdb-incremental-fsync")) { + int yn = yesnotoi(o->ptr); + + if (yn == -1) goto badfmt; + server.rdb_incremental_fsync = yn; } else if (!strcasecmp(c->argv[2]->ptr,"save")) { int vlen, j; sds *v = sdssplitlen(o->ptr,sdslen(o->ptr)," ",1,&vlen); @@ -792,7 +834,7 @@ void configGetCommand(redisClient *c) { config_get_string_field("dbfilename",server.rdb_filename); config_get_string_field("syncdbfilename",server.rdb_syncfilename); config_get_string_field("requirepass",server.requirepass); - config_get_string_field("masterauth",server.requirepass); + config_get_string_field("masterauth",server.masterauth); config_get_string_field("bind",server.bindaddr); config_get_string_field("unixsocket",server.unixsocket); config_get_string_field("logfile",server.logfile); @@ -833,6 +875,7 @@ void configGetCommand(redisClient *c) { 
config_get_numerical_field("maxclients",server.maxclients); config_get_numerical_field("watchdog-period",server.watchdog_period); config_get_numerical_field("slave-priority",server.slave_priority); + config_get_numerical_field("hz",server.hz); /* Bool (yes/no) values */ config_get_bool_field("no-appendfsync-on-rewrite", @@ -849,6 +892,11 @@ void configGetCommand(redisClient *c) { config_get_bool_field("activerehashing", server.activerehashing); config_get_bool_field("repl-disable-tcp-nodelay", server.repl_disable_tcp_nodelay); + config_get_bool_field("aof-rewrite-incremental-fsync", + server.aof_rewrite_incremental_fsync); + config_get_bool_field("rdb-incremental-fsync", + server.rdb_incremental_fsync); + /* Everything we can't handle with macros follows. */ @@ -551,7 +551,6 @@ int expireIfNeeded(redisDb *db, robj *key) { * unit is either UNIT_SECONDS or UNIT_MILLISECONDS, and is only used for * the argv[2] parameter. The basetime is always specified in milliseconds. */ void expireGenericCommand(redisClient *c, long long basetime, int unit) { - dictEntry *de; robj *key = c->argv[1], *param = c->argv[2]; long long when; /* unix time in milliseconds when the key will expire. */ @@ -561,11 +560,12 @@ void expireGenericCommand(redisClient *c, long long basetime, int unit) { if (unit == UNIT_SECONDS) when *= 1000; when += basetime; - de = dictFind(c->db->dict,key->ptr); - if (de == NULL) { + /* No key, return zero. */ + if (lookupKeyRead(c->db,key) == NULL) { addReply(c,shared.czero); return; } + /* EXPIRE with negative TTL, or EXPIREAT with a timestamp into the past * should never be executed as a DEL when load the AOF or in the context * of a slave instance. 
diff --git a/src/debug.c b/src/debug.c index a74358383..e1d42cc65 100644 --- a/src/debug.c +++ b/src/debug.c @@ -330,9 +330,14 @@ void debugCommand(redisClient *c) { usleep(utime); addReply(c,shared.ok); + } else if (!strcasecmp(c->argv[1]->ptr,"set-active-expire") && + c->argc == 3) + { + server.active_expire_enabled = atoi(c->argv[2]->ptr); + addReply(c,shared.ok); } else { - addReplyError(c, - "Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|SWAPIN <key>|SWAPOUT <key>|RELOAD]"); + addReplyErrorFormat(c, "Unknown DEBUG subcommand or wrong number of arguments for '%s'", + (char*)c->argv[1]->ptr); } } @@ -382,9 +387,12 @@ void redisLogObjectDebugInfo(robj *o) { redisLog(REDIS_WARNING,"Object encoding: %d", o->encoding); redisLog(REDIS_WARNING,"Object refcount: %d", o->refcount); if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_RAW) { - redisLog(REDIS_WARNING,"Object raw string len: %d", sdslen(o->ptr)); - if (sdslen(o->ptr) < 4096) - redisLog(REDIS_WARNING,"Object raw string content: \"%s\"", (char*)o->ptr); + redisLog(REDIS_WARNING,"Object raw string len: %zu", sdslen(o->ptr)); + if (sdslen(o->ptr) < 4096) { + sds repr = sdscatrepr(sdsempty(),o->ptr,sdslen(o->ptr)); + redisLog(REDIS_WARNING,"Object raw string content: %s", repr); + sdsfree(repr); + } } else if (o->type == REDIS_LIST) { redisLog(REDIS_WARNING,"List length: %d", (int) listTypeLength(o)); } else if (o->type == REDIS_SET) { @@ -794,7 +802,7 @@ void enableWatchdog(int period) { /* If the configured period is smaller than twice the timer period, it is * too short for the software watchdog to work reliably. Fix it now * if needed. */ - min_period = (1000/REDIS_HZ)*2; + min_period = (1000/server.hz)*2; if (period < min_period) period = min_period; watchdogScheduleSignal(period); /* Adjust the current timer. 
*/ server.watchdog_period = period; diff --git a/src/help.h b/src/help.h index 1378b8b85..60a5726bf 100644 --- a/src/help.h +++ b/src/help.h @@ -69,6 +69,11 @@ struct commandHelp { "Pop a value from a list, push it to another list and return it; or block until one is available", 2, "2.2.0" }, + { "CLIENT GETNAME", + "-", + "Get the current connection name", + 9, + "2.6.9" }, { "CLIENT KILL", "ip:port", "Kill the connection of a client", @@ -79,6 +84,11 @@ struct commandHelp { "Get the list of client connections", 9, "2.4.0" }, + { "CLIENT SETNAME", + "connection-name", + "Set the current connection name", + 9, + "2.6.9" }, { "CONFIG GET", "parameter", "Get the value of a configuration parameter", @@ -280,7 +290,7 @@ struct commandHelp { 1, "2.6.0" }, { "INFO", - "-", + "[section]", "Get information and statistics about the server", 9, "1.0.0" }, @@ -525,7 +535,7 @@ struct commandHelp { 8, "1.0.0" }, { "SET", - "key value", + "key value [EX seconds] [PX milliseconds] [NX|XX]", "Set the string value of a key", 1, "1.0.0" }, @@ -665,7 +675,7 @@ struct commandHelp { 7, "2.2.0" }, { "ZADD", - "key score member [score] [member]", + "key score member [score member ...]", "Add one or more members to a sorted set, or update its score if it already exists", 4, "1.2.0" }, diff --git a/src/mkreleasehdr.sh b/src/mkreleasehdr.sh index dbf948c8a..386bd24d1 100755 --- a/src/mkreleasehdr.sh +++ b/src/mkreleasehdr.sh @@ -1,6 +1,6 @@ #!/bin/sh GIT_SHA1=`(git show-ref --head --hash=8 2> /dev/null || echo 00000000) | head -n1` -GIT_DIRTY=`git diff 2> /dev/null | wc -l` +GIT_DIRTY=`git diff --no-ext-diff 2> /dev/null | wc -l` test -f release.h || touch release.h (cat release.h | grep SHA1 | grep $GIT_SHA1) && \ (cat release.h | grep DIRTY | grep $GIT_DIRTY) && exit 0 # Already up-to-date diff --git a/src/multi.c b/src/multi.c index 2b7e6b552..cd07971e2 100755 --- a/src/multi.c +++ b/src/multi.c @@ -103,13 +103,11 @@ void discardCommand(redisClient *c) { /* Send a MULTI command to all 
the slaves and AOF file. Check the execCommand * implementation for more information. */ -void execCommandReplicateMulti(redisClient *c) { +void execCommandPropagateMulti(redisClient *c) { robj *multistring = createStringObject("MULTI",5); - if (server.aof_state != REDIS_AOF_OFF) - feedAppendOnlyFile(server.multiCommand,c->db->id,&multistring,1); - if (listLength(server.slaves)) - replicationFeedSlaves(server.slaves,c->db->id,&multistring,1); + propagate(server.multiCommand,c->db->id,&multistring,1, + REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL); decrRefCount(multistring); } @@ -118,6 +116,7 @@ void execCommand(redisClient *c) { robj **orig_argv; int orig_argc; struct redisCommand *orig_cmd; + int must_propagate = 0; /* Need to propagate MULTI/EXEC to AOF / slaves? */ if (!(c->flags & REDIS_MULTI)) { addReplyError(c,"EXEC without MULTI"); @@ -133,19 +132,10 @@ void execCommand(redisClient *c) { if (c->flags & (REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC)) { addReply(c, c->flags & REDIS_DIRTY_EXEC ? shared.execaborterr : shared.nullmultibulk); - freeClientMultiState(c); - initClientMultiState(c); - c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC); - unwatchAllKeys(c); + discardTransaction(c); goto handle_monitor; } - /* Replicate a MULTI request now that we are sure the block is executed. - * This way we'll deliver the MULTI/..../EXEC block as a whole and - * both the AOF and the replication link will have the same consistency - * and atomicity guarantees. */ - execCommandReplicateMulti(c); - /* Exec all the queued commands */ unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */ orig_argv = c->argv; @@ -156,6 +146,16 @@ void execCommand(redisClient *c) { c->argc = c->mstate.commands[j].argc; c->argv = c->mstate.commands[j].argv; c->cmd = c->mstate.commands[j].cmd; + + /* Propagate a MULTI request once we encounter the first write op. 
+ * This way we'll deliver the MULTI/..../EXEC block as a whole and + * both the AOF and the replication link will have the same consistency + * and atomicity guarantees. */ + if (!must_propagate && !(c->cmd->flags & REDIS_CMD_READONLY)) { + execCommandPropagateMulti(c); + must_propagate = 1; + } + call(c,REDIS_CALL_FULL); /* Commands may alter argc/argv, restore mstate. */ @@ -166,13 +166,10 @@ void execCommand(redisClient *c) { c->argv = orig_argv; c->argc = orig_argc; c->cmd = orig_cmd; - freeClientMultiState(c); - initClientMultiState(c); - c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC); - /* Make sure the EXEC command is always replicated / AOF, since we - * always send the MULTI command (we can't know beforehand if the - * next operations will contain at least a modification to the DB). */ - server.dirty++; + discardTransaction(c); + /* Make sure the EXEC command will be propagated as well if MULTI + * was already propagated. */ + if (must_propagate) server.dirty++; handle_monitor: /* Send EXEC to clients waiting data from MONITOR. We do it here diff --git a/src/networking.c b/src/networking.c index 3f14b17c8..569b64945 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1259,7 +1259,7 @@ void rewriteClientCommandVector(redisClient *c, int argc, ...) { /* Replace argv and argc with our new versions. */ c->argv = argv; c->argc = argc; - c->cmd = lookupCommand(c->argv[0]->ptr); + c->cmd = lookupCommandOrOriginal(c->argv[0]->ptr); redisAssertWithInfo(c,NULL,c->cmd != NULL); va_end(ap); } @@ -1277,7 +1277,7 @@ void rewriteClientCommandArgument(redisClient *c, int i, robj *newval) { /* If this is the command name make sure to fix c->cmd. 
*/ if (i == 0) { - c->cmd = lookupCommand(c->argv[0]->ptr); + c->cmd = lookupCommandOrOriginal(c->argv[0]->ptr); redisAssertWithInfo(c,NULL,c->cmd != NULL); } } diff --git a/src/pubsub.c b/src/pubsub.c index ece32445d..ed2b0766e 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -306,7 +306,6 @@ void punsubscribeCommand(redisClient *c) { slowlogAddComplexityParam('M', listLength(server.pubsub_patterns)); if (c->argc == 1) { pubsubUnsubscribeAllPatterns(c,1); - return; } else { int j; @@ -648,7 +648,9 @@ int rdbSave(char *filename, int dbnum) { return REDIS_ERR; } - rioInitWithFileAndFsyncInterval(&rdb,fp, 1024*1024*16); + rioInitWithFile(&rdb,fp); + if (server.rdb_incremental_fsync) + rioSetAutoSync(&rdb,REDIS_RDB_AUTOSYNC_BYTES); if (server.rdb_checksum) rdb.update_cksum = rioGenericUpdateChecksum; snprintf(magic,sizeof(magic),"REDIS%04d",REDIS_RDB_VERSION); @@ -763,6 +765,7 @@ int rdbSaveBackground(char *filename, int bgsavetype, int dbnum) { if (bgsavetype == REDIS_BGSAVE_NORMAL) server.stat_rdb_saves++; server.dirty_before_bgsave = server.dirty; + server.lastbgsave_try = time(NULL); start = ustime(); if (server.rdb_bgsavefilename) zfree(server.rdb_bgsavefilename); @@ -1144,11 +1147,8 @@ int rdbLoad(char *filename) { FILE *fp; rio rdb; - fp = fopen(filename,"r"); - if (!fp) { - errno = ENOENT; - return REDIS_ERR; - } + if ((fp = fopen(filename,"r")) == NULL) return REDIS_ERR; + rioInitWithFile(&rdb,fp); rdb.update_cksum = rdbLoadProgressCallback; rdb.max_processing_chunk = server.loading_process_events_interval_bytes; diff --git a/src/redis-cli.c b/src/redis-cli.c index c182ac17d..f12059ca4 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -42,6 +42,7 @@ #include <sys/time.h> #include <assert.h> #include <fcntl.h> +#include <limits.h> #include "hiredis.h" #include "sds.h" @@ -56,6 +57,7 @@ #define OUTPUT_STANDARD 0 #define OUTPUT_RAW 1 #define OUTPUT_CSV 2 +#define REDIS_CLI_KEEPALIVE_INTERVAL 15 /* seconds */ static redisContext *context; static struct config 
{ @@ -70,11 +72,13 @@ static struct config { int monitor_mode; int pubsub_mode; int latency_mode; + int latency_history; int cluster_mode; int cluster_reissue_command; int slave_mode; int pipe_mode; int getrdb_mode; + int stat_mode; char *rdb_filename; int bigkeys; int stdinarg; /* get last arg from stdin. (-x option) */ @@ -332,6 +336,12 @@ static int cliConnect(int force) { return REDIS_ERR; } + /* Set aggressive KEEP_ALIVE socket option in the Redis context socket + * in order to prevent timeouts caused by the execution of long + * commands. At the same time this improves the detection of real + * errors. */ + anetKeepAlive(NULL, context->fd, REDIS_CLI_KEEPALIVE_INTERVAL); + /* Do AUTH and select the right DB. */ if (cliAuth() != REDIS_OK) return REDIS_ERR; @@ -623,6 +633,36 @@ static int cliSendCommand(int argc, char **argv, int repeat) { return REDIS_OK; } +/* Send the INFO command, reconnecting the link if needed. */ +static redisReply *reconnectingInfo(void) { + redisContext *c = context; + redisReply *reply = NULL; + int tries = 0; + + assert(!c->err); + while(reply == NULL) { + while (c->err & (REDIS_ERR_IO | REDIS_ERR_EOF)) { + printf("Reconnecting (%d)...\r", ++tries); + fflush(stdout); + + redisFree(c); + c = redisConnect(config.hostip,config.hostport); + usleep(1000000); + } + + reply = redisCommand(c,"INFO"); + if (c->err && !(c->err & (REDIS_ERR_IO | REDIS_ERR_EOF))) { + fprintf(stderr, "Error: %s\n", c->errstr); + exit(1); + } else if (tries > 0) { + printf("\n"); + } + } + + context = c; + return reply; +} + /*------------------------------------------------------------------------------ * User interface *--------------------------------------------------------------------------- */ @@ -661,8 +701,13 @@ static int parseOptions(int argc, char **argv) { config.output = OUTPUT_CSV; } else if (!strcmp(argv[i],"--latency")) { config.latency_mode = 1; + } else if (!strcmp(argv[i],"--latency-history")) { + config.latency_mode = 1; + config.latency_history 
= 1; } else if (!strcmp(argv[i],"--slave")) { config.slave_mode = 1; + } else if (!strcmp(argv[i],"--stat")) { + config.stat_mode = 1; } else if (!strcmp(argv[i],"--rdb") && !lastarg) { config.getrdb_mode = 1; config.rdb_filename = argv[++i]; @@ -683,7 +728,15 @@ static int parseOptions(int argc, char **argv) { sdsfree(version); exit(0); } else { - break; + if (argv[i][0] == '-') { + fprintf(stderr, + "Unrecognized option or bad number of args for: '%s'\n", + argv[i]); + exit(1); + } else { + /* Likely the command name, stop here. */ + break; + } } } return i; @@ -712,26 +765,29 @@ static void usage() { "redis-cli %s\n" "\n" "Usage: redis-cli [OPTIONS] [cmd [arg [arg ...]]]\n" -" -h <hostname> Server hostname (default: 127.0.0.1)\n" -" -p <port> Server port (default: 6379)\n" -" -s <socket> Server socket (overrides hostname and port)\n" -" -a <password> Password to use when connecting to the server\n" -" -r <repeat> Execute specified command N times\n" -" -i <interval> When -r is used, waits <interval> seconds per command.\n" -" It is possible to specify sub-second times like -i 0.1\n" -" -n <db> Database number\n" -" -x Read last argument from STDIN\n" -" -d <delimiter> Multi-bulk delimiter in for raw formatting (default: \\n)\n" -" -c Enable cluster mode (follow -ASK and -MOVED redirections)\n" -" --raw Use raw formatting for replies (default when STDOUT is not a tty)\n" -" --latency Enter a special mode continuously sampling latency\n" -" --slave Simulate a slave showing commands received from the master\n" -" --rdb <filename> Transfer an RDB dump from remote server to local file.\n" -" --pipe Transfer raw Redis protocol from stdin to server\n" -" --bigkeys Sample Redis keys looking for big keys\n" -" --eval <file> Send an EVAL command using the Lua script at <file>\n" -" --help Output this help and exit\n" -" --version Output version and exit\n" +" -h <hostname> Server hostname (default: 127.0.0.1)\n" +" -p <port> Server port (default: 6379)\n" +" -s <socket> 
Server socket (overrides hostname and port)\n" +" -a <password> Password to use when connecting to the server\n" +" -r <repeat> Execute specified command N times\n" +" -i <interval> When -r is used, waits <interval> seconds per command.\n" +" It is possible to specify sub-second times like -i 0.1\n" +" -n <db> Database number\n" +" -x Read last argument from STDIN\n" +" -d <delimiter> Multi-bulk delimiter in for raw formatting (default: \\n)\n" +" -c Enable cluster mode (follow -ASK and -MOVED redirections)\n" +" --raw Use raw formatting for replies (default when STDOUT is\n" +" not a tty)\n" +" --latency Enter a special mode continuously sampling latency\n" +" --latency-history Like --latency but tracking latency changes over time.\n" +" Default time interval is 15 sec. Change it using -i.\n" +" --slave Simulate a slave showing commands received from the master\n" +" --rdb <filename> Transfer an RDB dump from remote server to local file.\n" +" --pipe Transfer raw Redis protocol from stdin to server\n" +" --bigkeys Sample Redis keys looking for big keys\n" +" --eval <file> Send an EVAL command using the Lua script at <file>\n" +" --help Output this help and exit\n" +" --version Output version and exit\n" "\n" "Examples:\n" " cat /etc/passwd | redis-cli -x set mypasswd\n" @@ -843,8 +899,7 @@ static void repl() { } } /* Free the argument vector */ - while(argc--) sdsfree(argv[argc]); - zfree(argv); + sdsfreesplitres(argv,argc); } /* linenoise() returns malloc-ed lines like readline() */ free(line); @@ -903,10 +958,16 @@ static int evalMode(int argc, char **argv) { return cliSendCommand(argc+3-got_comma, argv2, config.repeat); } +#define LATENCY_SAMPLE_RATE 10 /* milliseconds. */ +#define LATENCY_HISTORY_DEFAULT_INTERVAL 15000 /* milliseconds. */ static void latencyMode(void) { redisReply *reply; long long start, latency, min = 0, max = 0, tot = 0, count = 0; + long long history_interval = + config.interval ? 
config.interval/1000 : + LATENCY_HISTORY_DEFAULT_INTERVAL; double avg; + long long history_start = mstime(); if (!context) exit(1); while(1) { @@ -931,7 +992,13 @@ static void latencyMode(void) { printf("\x1b[0G\x1b[2Kmin: %lld, max: %lld, avg: %.2f (%lld samples)", min, max, avg, count); fflush(stdout); - usleep(10000); + if (config.latency_history && mstime()-history_start > history_interval) + { + printf(" -- %.2f seconds range\n", (float)(mstime()-history_start)/1000); + history_start = mstime(); + min = max = tot = count = 0; + } + usleep(LATENCY_SAMPLE_RATE * 1000); } } @@ -1199,7 +1266,11 @@ static void findBigKeys(void) { fprintf(stderr, "RANDOMKEY error: %s\n", reply1->str); exit(1); + } else if (reply1->type == REDIS_REPLY_NIL) { + fprintf(stderr, "It looks like the database is empty!\n"); + exit(1); } + /* Get the key type */ reply2 = redisCommand(context,"TYPE %s",reply1->str); assert(reply2 && reply2->type == REDIS_REPLY_STATUS); @@ -1255,6 +1326,145 @@ static void findBigKeys(void) { } } +/* Return the specified INFO field from the INFO command output "info". + * A new buffer is allocated for the result, that needs to be free'd. + * If the field is not found NULL is returned. */ +static char *getInfoField(char *info, char *field) { + char *p = strstr(info,field); + char *n1, *n2; + char *result; + + if (!p) return NULL; + p += strlen(field)+1; + n1 = strchr(p,'\r'); + n2 = strchr(p,','); + if (n2 && n2 < n1) n1 = n2; + result = malloc(sizeof(char)*(n1-p)+1); + memcpy(result,p,(n1-p)); + result[n1-p] = '\0'; + return result; +} + +/* Like the above function but automatically convert the result into + * a long. On error (missing field) LONG_MIN is returned. 
*/ +static long getLongInfoField(char *info, char *field) { + char *value = getInfoField(info,field); + long l; + + if (!value) return LONG_MIN; + l = strtol(value,NULL,10); + free(value); + return l; +} + +/* Convert number of bytes into a human readable string of the form: + * 100B, 2G, 100M, 4K, and so forth. */ +void bytesToHuman(char *s, long long n) { + double d; + + if (n < 0) { + *s = '-'; + s++; + n = -n; + } + if (n < 1024) { + /* Bytes */ + sprintf(s,"%lluB",n); + return; + } else if (n < (1024*1024)) { + d = (double)n/(1024); + sprintf(s,"%.2fK",d); + } else if (n < (1024LL*1024*1024)) { + d = (double)n/(1024*1024); + sprintf(s,"%.2fM",d); + } else if (n < (1024LL*1024*1024*1024)) { + d = (double)n/(1024LL*1024*1024); + sprintf(s,"%.2fG",d); + } +} + +static void statMode() { + redisReply *reply; + long aux, requests = 0; + int i = 0; + + while(1) { + char buf[64]; + int j; + + reply = reconnectingInfo(); + if (reply->type == REDIS_REPLY_ERROR) { + printf("ERROR: %s\n", reply->str); + exit(1); + } + + if ((i++ % 20) == 0) { + printf( +"------- data ------ --------------------- load -------------------- - child -\n" +"keys mem clients blocked requests connections \n"); + } + + /* Keys */ + aux = 0; + for (j = 0; j < 20; j++) { + long k; + + sprintf(buf,"db%d:keys",j); + k = getLongInfoField(reply->str,buf); + if (k == LONG_MIN) continue; + aux += k; + } + sprintf(buf,"%ld",aux); + printf("%-11s",buf); + + /* Used memory */ + aux = getLongInfoField(reply->str,"used_memory"); + bytesToHuman(buf,aux); + printf("%-8s",buf); + + /* Clients */ + aux = getLongInfoField(reply->str,"connected_clients"); + sprintf(buf,"%ld",aux); + printf(" %-8s",buf); + + /* Blocked (BLPOPPING) Clients */ + aux = getLongInfoField(reply->str,"blocked_clients"); + sprintf(buf,"%ld",aux); + printf("%-8s",buf); + + /* Requets */ + aux = getLongInfoField(reply->str,"total_commands_processed"); + sprintf(buf,"%ld (+%ld)",aux,requests == 0 ? 
0 : aux-requests); + printf("%-19s",buf); + requests = aux; + + /* Connections */ + aux = getLongInfoField(reply->str,"total_connections_received"); + sprintf(buf,"%ld",aux); + printf(" %-12s",buf); + + /* Children */ + aux = getLongInfoField(reply->str,"bgsave_in_progress"); + aux |= getLongInfoField(reply->str,"aof_rewrite_in_progress") << 1; + switch(aux) { + case 0: break; + case 1: + printf("SAVE"); + break; + case 2: + printf("AOF"); + break; + case 3: + printf("SAVE+AOF"); + break; + } + + printf("\n"); + freeReplyObject(reply); + usleep(config.interval); + } +} + int main(int argc, char **argv) { int firstarg; @@ -1269,6 +1479,7 @@ int main(int argc, char **argv) { config.monitor_mode = 0; config.pubsub_mode = 0; config.latency_mode = 0; + config.latency_history = 0; config.cluster_mode = 0; config.slave_mode = 0; config.getrdb_mode = 0; @@ -1319,6 +1530,13 @@ int main(int argc, char **argv) { findBigKeys(); } + /* Stat mode */ + if (config.stat_mode) { + if (cliConnect(0) == REDIS_ERR) exit(1); + if (config.interval == 0) config.interval = 1000000; + statMode(); + } + /* Start interactive mode when no command is provided */ if (argc == 0 && !config.eval) { /* Note that in repl mode we don't abort on connection error. 
diff --git a/src/redis.c b/src/redis.c index a7d468104..3e9b178ac 100755 --- a/src/redis.c +++ b/src/redis.c @@ -114,7 +114,7 @@ struct redisCommand *commandTable; */ struct redisCommand redisCommandTable[] = { {"get",getCommand,2,"r",0,NULL,1,1,1,0,0}, - {"set",setCommand,3,"wm",0,noPreloadGetKeys,1,1,1,0,0}, + {"set",setCommand,-3,"wm",0,noPreloadGetKeys,1,1,1,0,0}, {"setnx",setnxCommand,3,"wm",0,noPreloadGetKeys,1,1,1,0,0}, {"setex",setexCommand,4,"wm",0,noPreloadGetKeys,1,1,1,0,0}, {"psetex",psetexCommand,4,"wm",0,noPreloadGetKeys,1,1,1,0,0}, @@ -197,7 +197,7 @@ struct redisCommand redisCommandTable[] = { {"mset",msetCommand,-3,"wm",0,NULL,1,-1,2,0,0}, {"msetnx",msetnxCommand,-3,"wm",0,NULL,1,-1,2,0,0}, {"randomkey",randomkeyCommand,1,"rR",0,NULL,0,0,0,0,0}, - {"select",selectCommand,2,"r",0,NULL,0,0,0,0,0}, + {"select",selectCommand,2,"rl",0,NULL,0,0,0,0,0}, {"move",moveCommand,3,"w",0,NULL,1,1,1,0,0}, {"rename",renameCommand,3,"w",0,renameGetKeys,1,2,1,0,0}, {"renamenx",renamenxCommand,3,"w",0,renameGetKeys,1,2,1,0,0}, @@ -207,7 +207,7 @@ struct redisCommand redisCommandTable[] = { {"pexpireat",pexpireatCommand,3,"w",0,NULL,1,1,1,0,0}, {"keys",keysCommand,2,"rS",0,NULL,0,0,0,0,0}, {"dbsize",dbsizeCommand,1,"r",0,NULL,0,0,0,0,0}, - {"auth",authCommand,2,"rs",0,NULL,0,0,0,0,0}, + {"auth",authCommand,2,"rsl",0,NULL,0,0,0,0,0}, {"ping",pingCommand,1,"r",0,NULL,0,0,0,0,0}, {"echo",echoCommand,2,"r",0,NULL,0,0,0,0,0}, {"save",saveCommand,1,"ars",0,NULL,0,0,0,0,0}, @@ -238,7 +238,7 @@ struct redisCommand redisCommandTable[] = { {"unsubscribe",unsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0}, {"psubscribe",psubscribeCommand,-2,"rpslt",0,NULL,0,0,0,0,0}, {"punsubscribe",punsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0}, - {"publish",publishCommand,3,"pflt",0,NULL,0,0,0,0,0}, + {"publish",publishCommand,3,"pfltr",0,NULL,0,0,0,0,0}, {"watch",watchCommand,-2,"rs",0,noPreloadGetKeys,1,-1,1,0,0}, {"unwatch",unwatchCommand,1,"rs",0,NULL,0,0,0,0,0}, 
{"restore",restoreCommand,4,"awm",0,NULL,1,1,1,0,0}, @@ -574,36 +574,32 @@ int htNeedsResize(dict *dict) { /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL * we resize the hash table to save memory */ -void tryResizeHashTables(void) { - int j; - - for (j = 0; j < server.dbnum; j++) { - if (htNeedsResize(server.db[j].dict)) - dictResize(server.db[j].dict); - if (htNeedsResize(server.db[j].expires)) - dictResize(server.db[j].expires); - } +void tryResizeHashTables(int dbid) { + if (htNeedsResize(server.db[dbid].dict)) + dictResize(server.db[dbid].dict); + if (htNeedsResize(server.db[dbid].expires)) + dictResize(server.db[dbid].expires); } /* Our hash table implementation performs rehashing incrementally while * we write/read from the hash table. Still if the server is idle, the hash * table will use two tables for a long time. So we try to use 1 millisecond - * of CPU time at every serverCron() loop in order to rehash some key. */ -void incrementallyRehash(void) { - int j; - - for (j = 0; j < server.dbnum; j++) { - /* Keys dictionary */ - if (dictIsRehashing(server.db[j].dict)) { - dictRehashMilliseconds(server.db[j].dict,1); - break; /* already used our millisecond for this loop... */ - } - /* Expires */ - if (dictIsRehashing(server.db[j].expires)) { - dictRehashMilliseconds(server.db[j].expires,1); - break; /* already used our millisecond for this loop... */ - } + * of CPU time at every call of this function to perform some rehahsing. + * + * The function returns 1 if some rehashing was performed, otherwise 0 + * is returned. */ +int incrementallyRehash(int dbid) { + /* Keys dictionary */ + if (dictIsRehashing(server.db[dbid].dict)) { + dictRehashMilliseconds(server.db[dbid].dict,1); + return 1; /* already used our millisecond for this loop... */ + } + /* Expires */ + if (dictIsRehashing(server.db[dbid].expires)) { + dictRehashMilliseconds(server.db[dbid].expires,1); + return 1; /* already used our millisecond for this loop... 
*/ } + return 0; } /* This function is called once a background process of some kind terminates, @@ -624,28 +620,57 @@ void updateDictResizePolicy(void) { /* Try to expire a few timed out keys. The algorithm used is adaptive and * will use few CPU cycles if there are few expiring keys, otherwise * it will get more aggressive to avoid that too much memory is used by - * keys that can be removed from the keyspace. */ + * keys that can be removed from the keyspace. + * + * No more than REDIS_DBCRON_DBS_PER_CALL databases are tested at every + * iteration. */ void activeExpireCycle(void) { - int j, iteration = 0; + /* This function has some global state in order to continue the work + * incrementally across calls. */ + static unsigned int current_db = 0; /* Last DB tested. */ + static int timelimit_exit = 0; /* Time limit hit in previous call? */ + + unsigned int j, iteration = 0; + unsigned int dbs_per_call = REDIS_DBCRON_DBS_PER_CALL; long long start = ustime(), timelimit; + /* We usually should test REDIS_DBCRON_DBS_PER_CALL per iteration, with + * two exceptions: + * + * 1) Don't test more DBs than we have. + * 2) If last time we hit the time limit, we want to scan all DBs + * in this iteration, as there is work to do in some DB and we don't want + * expired keys to use memory for too much time. */ + if (dbs_per_call > server.dbnum || timelimit_exit) + dbs_per_call = server.dbnum; + /* We can use at max REDIS_EXPIRELOOKUPS_TIME_PERC percentage of CPU time * per iteration. Since this function gets called with a frequency of - * REDIS_HZ times per second, the following is the max amount of + * server.hz times per second, the following is the max amount of * microseconds we can spend in this function. 
*/ - timelimit = 1000000*REDIS_EXPIRELOOKUPS_TIME_PERC/REDIS_HZ/100; + timelimit = 1000000*REDIS_EXPIRELOOKUPS_TIME_PERC/server.hz/100; + timelimit_exit = 0; if (timelimit <= 0) timelimit = 1; - for (j = 0; j < server.dbnum; j++) { + for (j = 0; j < dbs_per_call; j++) { int expired; - redisDb *db = server.db+j; + redisDb *db = server.db+(current_db % server.dbnum); + + /* Increment the DB now so we are sure if we run out of time + * in the current DB we'll restart from the next. This allows to + * distribute the time evenly across DBs. */ + current_db++; /* Continue to expire if at the end of the cycle more than 25% * of the keys were expired. */ do { - unsigned long num = dictSize(db->expires); - unsigned long slots = dictSlots(db->expires); - long long now = mstime(); + unsigned long num, slots; + long long now; + + /* If there is nothing to expire try next DB ASAP. */ + if ((num = dictSize(db->expires)) == 0) break; + slots = dictSlots(db->expires); + now = mstime(); /* When there are less than 1% filled slots getting random * keys is expensive, so stop here waiting for better times... @@ -679,8 +704,12 @@ void activeExpireCycle(void) { * expire. So after a given amount of milliseconds return to the * caller waiting for the other active expire cycle. */ iteration++; - if ((iteration & 0xf) == 0 && /* check once every 16 cycles. */ - (ustime()-start) > timelimit) return; + if ((iteration & 0xf) == 0 && /* check once every 16 iterations. */ + (ustime()-start) > timelimit) + { + timelimit_exit = 1; + return; + } } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4); } } @@ -766,13 +795,13 @@ int clientsCronResizeQueryBuffer(redisClient *c) { } void clientsCron(void) { - /* Make sure to process at least 1/(REDIS_HZ*10) of clients per call. - * Since this function is called REDIS_HZ times per second we are sure that + /* Make sure to process at least 1/(server.hz*10) of clients per call. 
+ * Since this function is called server.hz times per second we are sure that * in the worst case we process all the clients in 10 seconds. * In normal conditions (a reasonable number of clients) we process * all the clients in a shorter time. */ int numclients = listLength(server.clients); - int iterations = numclients/(REDIS_HZ*10); + int iterations = numclients/(server.hz*10); if (iterations < 50) iterations = (numclients < 50) ? numclients : 50; @@ -872,7 +901,52 @@ void checkOSMemory(void) { } -/* This is our timer interrupt, called REDIS_HZ times per second. +/* This function handles 'background' operations we are required to do + * incrementally in Redis databases, such as active key expiring, resizing, + * rehashing. */ +void databasesCron(void) { + /* Expire keys by random sampling. Not required for slaves + * as master will synthesize DELs for us. */ + if (server.active_expire_enabled && server.masterhost == NULL) + activeExpireCycle(); + + /* Perform hash tables rehashing if needed, but only if there are no + * other processes saving the DB on disk. Otherwise rehashing is bad + * as will cause a lot of copy-on-write of memory pages. */ + if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) { + /* We use global counters so if we stop the computation at a given + * DB we'll be able to start from the successive in the next + * cron loop iteration. */ + static unsigned int resize_db = 0; + static unsigned int rehash_db = 0; + unsigned int dbs_per_call = REDIS_DBCRON_DBS_PER_CALL; + unsigned int j; + + /* Don't test more DBs than we have. 
*/ + if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum; + + /* Resize */ + for (j = 0; j < dbs_per_call; j++) { + tryResizeHashTables(resize_db % server.dbnum); + resize_db++; + } + + /* Rehash */ + if (server.activerehashing) { + for (j = 0; j < dbs_per_call; j++) { + int work_done = incrementallyRehash(rehash_db % server.dbnum); + rehash_db++; + if (work_done) { + /* If the function did some work, stop here, we'll do + * more at the next cron loop. */ + break; + } + } + } + } +} + +/* This is our timer interrupt, called server.hz times per second. * Here is where we do a number of things that need to be done asynchronously. * For instance: * @@ -886,7 +960,7 @@ void checkOSMemory(void) { * - Replication reconnection. * - Many more... * - * Everything directly called here will be called REDIS_HZ times per second, + * Everything directly called here will be called server.hz times per second, * so in order to throttle execution of things we want to do less frequently * a macro is used: run_with_period(milliseconds) { .... } */ @@ -959,17 +1033,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { } } - /* We don't want to resize the hash tables while a background saving - * is in progress: the saving child is created using fork() that is - * implemented with a copy-on-write semantic in most modern systems, so - * if we resize the HT while there is the saving child at work actually - * a lot of memory movements in the parent will cause a lot of pages - * copied. */ - if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) { - tryResizeHashTables(); - if (server.activerehashing) incrementallyRehash(); - } - /* Show information about connected clients */ if (!server.sentinel_mode) { run_with_period(5000) { @@ -984,6 +1047,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { /* We need to do a few operations on clients asynchronously. 
*/ clientsCron(); + /* Handle background operations on Redis databases. */ + databasesCron(); + /* Start a scheduled AOF rewrite if this was requested by the user while * a BGSAVE was in progress. */ if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 && @@ -1020,8 +1086,16 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { for (j = 0; j < server.saveparamslen; j++) { struct saveparam *sp = server.saveparams+j; + /* Save if we reached the given amount of changes, + * the given amount of seconds, and if the latest bgsave was + * successful or if, in case of an error, at least + * REDIS_BGSAVE_RETRY_DELAY seconds already elapsed. */ if (server.dirty >= sp->changes && - server.unixtime-server.lastsave > sp->seconds) { + server.unixtime-server.lastsave > sp->seconds && + (server.unixtime-server.lastbgsave_try > + REDIS_BGSAVE_RETRY_DELAY || + server.lastbgsave_status == REDIS_OK)) + { redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...", sp->changes, sp->seconds); rdbSaveBackground(server.rdb_filename, REDIS_BGSAVE_NORMAL, -1); @@ -1050,11 +1124,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { * cron function is called. */ if (server.aof_flush_postponed_start) flushAppendOnlyFile(0); - /* Expire a few keys per cycle, only if this is a master. - * On slaves we wait for DEL operations synthesized by the master - * in order to guarantee a strict consistency. 
*/ - if (server.masterhost == NULL) activeExpireCycle(); - /* Close clients that need to be closed asynchronous */ freeClientsInAsyncFreeQueue(); @@ -1068,7 +1137,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { } server.cronloops++; - return 1000/REDIS_HZ; + return 1000/server.hz; } /* This function gets called every time Redis is entering the @@ -1176,6 +1245,7 @@ void createSharedObjects(void) { void initServerConfig() { getRandomHexChars(server.runid,REDIS_RUN_ID_SIZE); + server.hz = REDIS_DEFAULT_HZ; server.runid[REDIS_RUN_ID_SIZE] = '\0'; server.arch_bits = (sizeof(long) == 8) ? 64 : 32; server.port = REDIS_SERVERPORT; @@ -1188,6 +1258,7 @@ void initServerConfig() { server.verbosity = REDIS_NOTICE; server.maxidletime = REDIS_MAXIDLETIME; server.tcpkeepalive = 0; + server.active_expire_enabled = 1; server.client_max_querybuf_len = REDIS_MAX_QUERYBUF_LEN; server.saveparams = NULL; server.loading = 0; @@ -1213,12 +1284,15 @@ void initServerConfig() { server.aof_fd = -1; server.aof_selected_db = -1; /* Make sure the first time will not match */ server.aof_flush_postponed_start = 0; + server.aof_rewrite_incremental_fsync = 1; + server.rdb_incremental_fsync = 1; server.pidfile = zstrdup("/var/run/redis.pid"); server.rdb_filename = zstrdup("dump.rdb"); server.aof_filename = zstrdup("appendonly.aof"); server.requirepass = NULL; server.rdb_compression = 1; server.rdb_checksum = 1; + server.stop_writes_on_bgsave_err = 1; server.activerehashing = 1; server.maxclients = REDIS_MAX_CLIENTS; server.bpop_blocked_clients = 0; @@ -1256,7 +1330,7 @@ void initServerConfig() { server.repl_syncio_timeout = REDIS_REPL_SYNCIO_TIMEOUT; server.repl_serve_stale_data = 1; server.repl_slave_ro = 1; - server.repl_down_since = time(NULL); + server.repl_down_since = 0; /* Never connected, repl is down since EVER. 
*/ server.repl_disable_tcp_nodelay = 0; server.slave_priority = REDIS_DEFAULT_SLAVE_PRIORITY; @@ -1281,6 +1355,7 @@ void initServerConfig() { * initial configuration, since command names may be changed via * redis.conf using the rename-command directive. */ server.commands = dictCreate(&commandTableDictType,NULL); + server.orig_commands = dictCreate(&commandTableDictType,NULL); populateCommandTable(); server.delCommand = lookupCommandByCString("del"); server.multiCommand = lookupCommandByCString("multi"); @@ -1406,7 +1481,8 @@ void initServer() { server.aof_child_pid = -1; aofRewriteBufferReset(); server.aof_buf = sdsempty(); - server.lastsave = time(NULL); + server.lastsave = time(NULL); /* At startup we consider the DB saved. */ + server.lastbgsave_try = 0; /* At startup we never tried to BGSAVE. */ server.rdb_save_time_last = -1; server.rdb_save_time_start = -1; server.dirty = 0; @@ -1431,8 +1507,10 @@ void initServer() { server.ops_sec_last_sample_ops = 0; server.unixtime = time(NULL); server.lastbgsave_status = REDIS_OK; - server.stop_writes_on_bgsave_err = 1; - aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL); + if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) { + redisPanic("create time event failed"); + exit(1); + } if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE, acceptTcpHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.ipfd file event."); if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE, @@ -1472,7 +1550,7 @@ void populateCommandTable(void) { for (j = 0; j < numcommands; j++) { struct redisCommand *c = redisCommandTable+j; char *f = c->sflags; - int retval; + int retval1, retval2; while(*f != '\0') { switch(*f) { @@ -1493,8 +1571,11 @@ void populateCommandTable(void) { f++; } - retval = dictAdd(server.commands, sdsnew(c->name), c); - assert(retval == DICT_OK); + retval1 = dictAdd(server.commands, sdsnew(c->name), c); + /* Populate an additional 
dictionary that will be unaffected + * by rename-command statements in redis.conf. */ + retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c); + redisAssert(retval1 == DICT_OK && retval2 == DICT_OK); } } @@ -1562,6 +1643,20 @@ struct redisCommand *lookupCommandByCString(char *s) { return cmd; } +/* Lookup the command in the current table, if not found also check in + * the original table containing the original command names unaffected by + * redis.conf rename-command statement. + * + * This is used by functions rewriting the argument vector such as + * rewriteClientCommandVector() in order to set client->cmd pointer + * correctly even if the command was renamed. */ +struct redisCommand *lookupCommandOrOriginal(sds name) { + struct redisCommand *cmd = dictFetchValue(server.commands, name); + + if (!cmd) cmd = dictFetchValue(server.orig_commands,name); + return cmd; +} + /* Propagate the specified command (in the context of the specified database id) * to AOF and Slaves. * @@ -2020,6 +2115,7 @@ sds genRedisInfoString(char *section) { "tcp_port:%d\r\n" "uptime_in_seconds:%ld\r\n" "uptime_in_days:%ld\r\n" + "hz:%d\r\n" "lru_clock:%ld\r\n", REDIS_VERSION, redisGitSHA1(), @@ -2038,6 +2134,7 @@ sds genRedisInfoString(char *section) { server.port, uptime, uptime/(3600*24), + server.hz, (unsigned long) server.lruclock); } @@ -2698,7 +2795,7 @@ void loadDataFromDisk(void) { redisLog(REDIS_NOTICE,"DB loaded from disk: %.3f seconds", (float)(ustime()-start)/1000000); } else if (errno != ENOENT) { - redisLog(REDIS_WARNING,"Fatal error loading the DB. Exiting."); + redisLog(REDIS_WARNING,"Fatal error loading the DB: %s. 
Exiting.",strerror(errno)); exit(1); } } @@ -2707,7 +2804,7 @@ void loadDataFromDisk(void) { void redisOutOfMemoryHandler(size_t allocation_size) { redisLog(REDIS_WARNING,"Out Of Memory allocating %zu bytes!", allocation_size); - redisPanic("OOM"); + redisPanic("Redis aborting for OUT OF MEMORY"); } int main(int argc, char **argv) { diff --git a/src/redis.h b/src/redis.h index a969778d8..0c590f634 100755 --- a/src/redis.h +++ b/src/redis.h @@ -67,13 +67,16 @@ #define REDIS_ERR -1 /* Static server configuration */ -#define REDIS_HZ 100 /* Time interrupt calls/sec. */ +#define REDIS_DEFAULT_HZ 10 /* Time interrupt calls/sec. */ +#define REDIS_MIN_HZ 1 +#define REDIS_MAX_HZ 500 #define REDIS_SERVERPORT 6379 /* TCP port */ #define REDIS_MAXIDLETIME 0 /* default client timeout: infinite */ #define REDIS_DEFAULT_DBNUM 16 #define REDIS_CONFIGLINE_MAX 1024 #define REDIS_EXPIRELOOKUPS_PER_CRON 10 /* lookup 10 expires per loop */ #define REDIS_EXPIRELOOKUPS_TIME_PERC 25 /* CPU max % for keys collection */ +#define REDIS_DBCRON_DBS_PER_CALL 16 #define REDIS_MAX_WRITE_PER_EVENT (1024*64) #define REDIS_SHARED_SELECT_CMDS 10 #define REDIS_SHARED_INTEGERS 10000 @@ -92,6 +95,7 @@ #define REDIS_REPL_PING_SLAVE_PERIOD 10 #define REDIS_RUN_ID_SIZE 40 #define REDIS_OPS_SEC_SAMPLES 16 +#define REDIS_BGSAVE_RETRY_DELAY 5 /* Wait a few secs before trying again. */ /* Protocol and I/O related defines */ #define REDIS_MAX_QUERYBUF_LEN (1024*1024*1024) /* 1GB max query buffer. 
*/ @@ -99,6 +103,8 @@ #define REDIS_REPLY_CHUNK_BYTES (16*1024) /* 16k output buffer */ #define REDIS_INLINE_MAX_SIZE (1024*64) /* Max size of inline reads */ #define REDIS_MBULK_BIG_ARG (1024*32) +#define REDIS_AOF_AUTOSYNC_BYTES (1024*1024*16) /* fdatasync every 16MB */ +#define REDIS_RDB_AUTOSYNC_BYTES (1024*1024*16) /* fdatasync every 16MB */ /* Hash table parameters */ #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */ @@ -302,8 +308,8 @@ /* Using the following macro you can run code inside serverCron() with the * specified period, specified in milliseconds. - * The actual resolution depends on REDIS_HZ. */ -#define run_with_period(_ms_) if (!(server.cronloops%((_ms_)/(1000/REDIS_HZ)))) + * The actual resolution depends on server.hz. */ +#define run_with_period(_ms_) if ((_ms_ <= 1000/server.hz) || !(server.cronloops%((_ms_)/(1000/server.hz)))) /* We can print the stacktrace, so our assert is defined this way: */ #define redisAssertWithInfo(_c,_o,_e) ((_e)?(void)0 : (_redisAssertWithInfo(_c,_o,#_e,__FILE__,__LINE__),_exit(1))) @@ -510,8 +516,10 @@ struct complexity_param { struct redisServer { /* General */ + int hz; /* serverCron() calls frequency in hertz */ redisDb *db; - dict *commands; /* Command table hash table */ + dict *commands; /* Command table */ + dict *orig_commands; /* Command table before command renaming. */ aeEventLoop *el; unsigned lruclock:22; /* Clock incrementing every minute, for LRU */ unsigned lruclock_padding:10; @@ -575,6 +583,7 @@ struct redisServer { int verbosity; /* Loglevel in redis.conf */ int maxidletime; /* Client timeout in seconds */ int tcpkeepalive; /* Set SO_KEEPALIVE if non-zero. */ + int active_expire_enabled; /* Can be disabled for testing purposes. 
*/ size_t client_max_querybuf_len; /* Limit for client query buffer length */ int dbnum; /* Total number of configured DBs */ int daemonize; /* True if running as a daemon */ @@ -602,6 +611,7 @@ struct redisServer { time_t aof_rewrite_time_start; /* Current AOF rewrite start time. */ int aof_lastbgrewrite_status; /* REDIS_OK or REDIS_ERR */ unsigned long aof_delayed_fsync; /* delayed AOF fsync() counter */ + int aof_rewrite_incremental_fsync;/* fsync incrementally while rewriting? */ /* RDB persistence */ long long dirty; /* Changes to DB from the last save */ long long dirty_before_bgsave; /* Used to restore dirty on failed BGSAVE */ @@ -616,10 +626,12 @@ struct redisServer { int rdb_compression; /* Use compression in RDB? */ int rdb_checksum; /* Use RDB checksum? */ time_t lastsave; /* Unix time of last successful save */ + time_t lastbgsave_try; /* Unix time of last attempted bgsave */ time_t rdb_save_time_last; /* Time used by last RDB save run. */ time_t rdb_save_time_start; /* Current RDB save start time. */ int lastbgsave_status; /* REDIS_OK or REDIS_ERR */ int stop_writes_on_bgsave_err; /* Don't allow writes if can't BGSAVE */ + int rdb_incremental_fsync; /* fsync incrementally while rewriting? */ /* Propagation of commands in AOF / replication */ redisOpArray also_propagate; /* Additional command to propagate. */ int draining; /* Currently draining to slaves? 
*/ @@ -979,6 +991,7 @@ int processCommand(redisClient *c); void setupSignalHandlers(void); struct redisCommand *lookupCommand(sds name); struct redisCommand *lookupCommandByCString(char *s); +struct redisCommand *lookupCommandOrOriginal(sds name); void call(redisClient *c, int flags); void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags); void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int target); diff --git a/src/replication.c b/src/replication.c index d8283590d..4d11cf6b3 100755 --- a/src/replication.c +++ b/src/replication.c @@ -697,11 +697,16 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { goto error; } - /* We don't care about the reply, it can be +PONG or an error since - * the server requires AUTH. As long as it replies correctly, it's - * fine from our point of view. */ - if (buf[0] != '-' && buf[0] != '+') { - redisLog(REDIS_WARNING,"Unexpected reply to PING from master."); + /* We accept only two replies as valid, a positive +PONG reply + * (we just check for "+") or an authentication error. + * Note that older versions of Redis replied with "operation not + * permitted" instead of using a proper error code, so we test + * both. */ + if (buf[0] != '+' && + strncmp(buf,"-NOAUTH",7) != 0 && + strncmp(buf,"-ERR operation not permitted",28) != 0) + { + redisLog(REDIS_WARNING,"Error reply to PING from master: '%s'",buf); goto error; } else { redisLog(REDIS_NOTICE, @@ -924,7 +929,7 @@ void replicationCron(void) { * So slaves can implement an explicit timeout to masters, and will * be able to detect a link disconnection even if the TCP connection * will not actually go down. 
*/ - if (!(server.cronloops % (server.repl_ping_slave_period * REDIS_HZ))) { + if (!(server.cronloops % (server.repl_ping_slave_period * server.hz))) { listIter li; listNode *ln; @@ -51,6 +51,8 @@ #include <unistd.h> #include "rio.h" #include "util.h" +#include "config.h" +#include "redis.h" uint64_t crc64(uint64_t crc, const unsigned char *s, uint64_t l); @@ -79,10 +81,10 @@ static off_t rioBufferTell(rio *r) { static size_t rioFileWrite(rio *r, const void *buf, size_t len) { size_t bytes_written = 0; while (len) { - size_t bytes_to_write = (r->io.file.fsync_interval && r->io.file.fsync_interval < len) ? r->io.file.fsync_interval : len; + size_t bytes_to_write = (r->io.file.autosync && r->io.file.autosync < len) ? r->io.file.autosync : len; if (fwrite((char*)buf + bytes_written,bytes_to_write,1,r->io.file.fp) != 1) return 0; - if (r->io.file.fsync_interval && (r->processed_bytes + bytes_written)/r->io.file.fsync_interval < (r->processed_bytes + bytes_written + bytes_to_write)/r->io.file.fsync_interval) + if (r->io.file.autosync && (r->processed_bytes + bytes_written)/r->io.file.autosync < (r->processed_bytes + bytes_written + bytes_to_write)/r->io.file.autosync) fsync(fileno(r->io.file.fp)); bytes_written += bytes_to_write; len -= bytes_to_write; @@ -125,12 +127,7 @@ static const rio rioFileIO = { void rioInitWithFile(rio *r, FILE *fp) { *r = rioFileIO; r->io.file.fp = fp; -} - -void rioInitWithFileAndFsyncInterval(rio *r, FILE *fp, size_t fsyncInterval) { - *r = rioFileIO; - r->io.file.fp = fp; - r->io.file.fsync_interval = fsyncInterval; + r->io.file.autosync = 0; } void rioInitWithBuffer(rio *r, sds s) { @@ -145,6 +142,19 @@ void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) { r->cksum = crc64(r->cksum,buf,len); } +/* Set the file-based rio object to auto-fsync every 'bytes' file written. + * By default this is set to zero that means no automatic file sync is + * performed. 
+ * + * This feature is useful in a few contexts since when we rely on OS write + * buffers sometimes the OS buffers way too much, resulting in too many + * disk I/O concentrated in very little time. When we fsync in an explicit + * way instead the I/O pressure is more distributed across time. */ +void rioSetAutoSync(rio *r, off_t bytes) { + redisAssert(r->read == rioFileIO.read); + r->io.file.autosync = bytes; +} + /* ------------------------------ Higher level interface --------------------------- * The following higher level functions use lower level rio.c functions to help * generating the Redis protocol for the Append Only File. */ @@ -43,10 +43,11 @@ struct _rio { size_t (*read)(struct _rio *, void *buf, size_t len); size_t (*write)(struct _rio *, const void *buf, size_t len); off_t (*tell)(struct _rio *); - /* The update_cksum method if not NULL is used to compute the checksum of all the - * data that was read or written so far. The method should be designed so that - * can be called with the current checksum, and the buf and len fields pointing - * to the new block of data to add to the checksum computation. */ + /* The update_cksum method if not NULL is used to compute the checksum of + * all the data that was read or written so far. The method should be + * designed so that can be called with the current checksum, and the buf + * and len fields pointing to the new block of data to add to the checksum + * computation. */ void (*update_cksum)(struct _rio *, const void *buf, size_t len); /* The current checksum */ @@ -66,7 +67,7 @@ struct _rio { } buffer; struct { FILE *fp; - size_t fsync_interval; + off_t autosync; /* fsync after 'autosync' bytes written. 
*/ } file; } io; }; @@ -108,7 +109,6 @@ static inline off_t rioTell(rio *r) { } void rioInitWithFile(rio *r, FILE *fp); -void rioInitWithFileAndFsyncInterval(rio *r, FILE *fp, size_t fsyncInterval); void rioInitWithBuffer(rio *r, sds s); size_t rioWriteBulkCount(rio *r, char prefix, int count); @@ -117,5 +117,6 @@ size_t rioWriteBulkLongLong(rio *r, long long l); size_t rioWriteBulkDouble(rio *r, double d); void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len); +void rioSetAutoSync(rio *r, off_t bytes); #endif diff --git a/src/scripting.c b/src/scripting.c index ee3d0405b..ff8c83cdc 100755 --- a/src/scripting.c +++ b/src/scripting.c @@ -258,6 +258,7 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) { "Write commands not allowed after non deterministic commands"); goto cleanup; } else if (server.masterhost && server.repl_slave_ro && + !server.loading && !(server.lua_caller->flags & REDIS_MASTER)) { luaPushError(lua, shared.roslaveerr->ptr); @@ -475,11 +475,18 @@ int hex_digit_to_int(char c) { * foo bar "newline are supported\n" and "\xff\x00otherstuff" * * The number of arguments is stored into *argc, and an array - * of sds is returned. The caller should sdsfree() all the returned - * strings and finally zfree() the array itself. + * of sds is returned. + * + * The caller should free the resulting array of sds strings with + * sdsfreesplitres(). * * Note that sdscatrepr() is able to convert back a string into * a quoted string in the same format sdssplitargs() is able to parse. 
+ * + * The function returns the allocated tokens on success, even when the + * input string is empty, or NULL if the input contains unbalanced + * quotes or closed quotes followed by non space characters + * as in: "foo"bar or "foo' */ sds *sdssplitargs(const char *line, int *argc) { const char *p = line; @@ -576,6 +583,8 @@ sds *sdssplitargs(const char *line, int *argc) { (*argc)++; current = NULL; } else { + /* Even on empty input string return something not NULL. */ + if (vector == NULL) vector = zmalloc(sizeof(void*)); return vector; } } @@ -585,16 +594,10 @@ err: sdsfree(vector[*argc]); zfree(vector); if (current) sdsfree(current); + *argc = 0; return NULL; } -void sdssplitargs_free(sds *argv, int argc) { - int j; - - for (j = 0 ;j < argc; j++) sdsfree(argv[j]); - zfree(argv); -} - /* Modify the string substituting all the occurrences of the set of * characters specified in the 'from' string to the corresponding character * in the 'to' array. @@ -88,7 +88,6 @@ void sdstoupper(sds s); sds sdsfromlonglong(long long value); sds sdscatrepr(sds s, const char *p, size_t len); sds *sdssplitargs(const char *line, int *argc); -void sdssplitargs_free(sds *argv, int argc); sds sdsmapchars(sds s, const char *from, const char *to, size_t setlen); /* Low level functions exposed to the user API */ diff --git a/src/sentinel.c b/src/sentinel.c index fc857344c..ed0978694 100644 --- a/src/sentinel.c +++ b/src/sentinel.c @@ -74,6 +74,8 @@ typedef struct sentinelAddr { #define SRI_RECONF_DONE (1<<13) /* Slave synchronized with new master. */ #define SRI_FORCE_FAILOVER (1<<14) /* Force failover with master up. */ #define SRI_SCRIPT_KILL_SENT (1<<15) /* SCRIPT KILL already sent on -BUSY */ +#define SRI_DEMOTE (1<<16) /* If the instance claims to be a master, demote + it into a slave sending SLAVEOF. */ #define SENTINEL_INFO_PERIOD 10000 #define SENTINEL_PING_PERIOD 1000 @@ -403,7 +405,7 @@ void initSentinel(void) { /* Initialize various data structures. 
*/ sentinel.masters = dictCreate(&instancesDictType,NULL); sentinel.tilt = 0; - sentinel.tilt_start_time = mstime(); + sentinel.tilt_start_time = 0; sentinel.previous_time = mstime(); sentinel.running_scripts = 0; sentinel.scripts_queue = listCreate(); @@ -1130,7 +1132,6 @@ int sentinelResetMastersByPattern(char *pattern, int flags) { * TODO: make this reset so that original sentinels are re-added with * same ip / port / runid. */ - int sentinelResetMasterAndChangeAddress(sentinelRedisInstance *master, char *ip, int port) { sentinelAddr *oldaddr, *newaddr; @@ -1139,12 +1140,26 @@ int sentinelResetMasterAndChangeAddress(sentinelRedisInstance *master, char *ip, sentinelResetMaster(master,SENTINEL_NO_FLAGS); oldaddr = master->addr; master->addr = newaddr; + master->o_down_since_time = 0; + master->s_down_since_time = 0; + /* Release the old address at the end so we are safe even if the function * gets the master->addr->ip and master->addr->port as arguments. */ releaseSentinelAddr(oldaddr); return REDIS_OK; } +/* Return non-zero if there was no SDOWN or ODOWN error associated to this + * instance in the latest 'ms' milliseconds. */ +int sentinelRedisInstanceNoDownFor(sentinelRedisInstance *ri, mstime_t ms) { + mstime_t most_recent; + + most_recent = ri->s_down_since_time; + if (ri->o_down_since_time > most_recent) + most_recent = ri->o_down_since_time; + return most_recent == 0 || (mstime() - most_recent) > ms; +} + /* ============================ Config handling ============================= */ char *sentinelHandleConfiguration(char **argv, int argc) { sentinelRedisInstance *ri; @@ -1382,26 +1397,39 @@ void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) { } } - /* slave0:<ip>,<port>,<state> */ + /* old versions: slave0:<ip>,<port>,<state> + * new versions: slave0:ip=127.0.0.1,port=9999,... 
*/ if ((ri->flags & SRI_MASTER) && sdslen(l) >= 7 && !memcmp(l,"slave",5) && isdigit(l[5])) { char *ip, *port, *end; - ip = strchr(l,':'); if (!ip) continue; - ip++; /* Now ip points to start of ip address. */ - port = strchr(ip,','); if (!port) continue; - *port = '\0'; /* nul term for easy access. */ - port++; /* Now port points to start of port number. */ - end = strchr(port,','); if (!end) continue; - *end = '\0'; /* nul term for easy access. */ + if (strstr(l,"ip=") == NULL) { + /* Old format. */ + ip = strchr(l,':'); if (!ip) continue; + ip++; /* Now ip points to start of ip address. */ + port = strchr(ip,','); if (!port) continue; + *port = '\0'; /* nul term for easy access. */ + port++; /* Now port points to start of port number. */ + end = strchr(port,','); if (!end) continue; + *end = '\0'; /* nul term for easy access. */ + } else { + /* New format. */ + ip = strstr(l,"ip="); if (!ip) continue; + ip += 3; /* Now ip points to start of ip address. */ + port = strstr(l,"port="); if (!port) continue; + port += 5; /* Now port points to start of port number. */ + /* Nul term both fields for easy access. */ + end = strchr(ip,','); if (end) *end = '\0'; + end = strchr(port,','); if (end) *end = '\0'; + } /* Check if we already have this slave into our table, * otherwise add it. 
*/ if (sentinelRedisInstanceLookupSlave(ri,ip,atoi(port)) == NULL) { if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,ip, - atoi(port), ri->quorum,ri)) != NULL) + atoi(port), ri->quorum, ri)) != NULL) { sentinelEvent(REDIS_NOTICE,"+slave",slave,"%@"); } @@ -1446,44 +1474,80 @@ void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) { ri->info_refresh = mstime(); sdsfreesplitres(lines,numlines); - /* ---------------------------- Acting half ----------------------------- */ - if (sentinel.tilt) return; + /* ---------------------------- Acting half ----------------------------- + * Some things will not happen if sentinel.tilt is true, but some will + * still be processed. */ - /* Act if a master turned into a slave. */ - if ((ri->flags & SRI_MASTER) && role == SRI_SLAVE) { - if ((first_runid || runid_changed) && ri->slave_master_host) { - /* If it is the first time we receive INFO from it, but it's - * a slave while it was configured as a master, we want to monitor - * its master instead. */ - sentinelEvent(REDIS_WARNING,"+redirect-to-master",ri, - "%s %s %d %s %d", - ri->name, ri->addr->ip, ri->addr->port, - ri->slave_master_host, ri->slave_master_port); - sentinelResetMasterAndChangeAddress(ri,ri->slave_master_host, - ri->slave_master_port); - return; - } + /* When what we believe is our master, turned into a slave, the wiser + * thing we can do is to follow the events and redirect to the new + * master, always. */ + if ((ri->flags & SRI_MASTER) && role == SRI_SLAVE && ri->slave_master_host) + { + sentinelEvent(REDIS_WARNING,"+redirect-to-master",ri, + "%s %s %d %s %d", + ri->name, ri->addr->ip, ri->addr->port, + ri->slave_master_host, ri->slave_master_port); + sentinelResetMasterAndChangeAddress(ri,ri->slave_master_host, + ri->slave_master_port); + return; /* Don't process anything after this event. */ } - /* Act if a slave turned into a master. */ + /* Handle slave -> master role switch. 
*/ if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) { - if (!(ri->master->flags & SRI_FAILOVER_IN_PROGRESS) && - (runid_changed || first_runid)) + if (!sentinel.tilt && ri->flags & SRI_DEMOTE) { + /* If this sentinel was partitioned from the slave's master, + * or tilted recently, wait some time before acting, + * so that DOWN and roles INFO will be refreshed. */ + mstime_t wait_time = SENTINEL_INFO_PERIOD*2 + + ri->master->down_after_period*2; + + if (!sentinelRedisInstanceNoDownFor(ri->master,wait_time) || + (mstime()-sentinel.tilt_start_time) < wait_time) + return; + + /* Old master returned back? Turn it into a slave ASAP if + * we can reach what we believe is the new master now, and + * have a recent role information for it. + * + * Note: we'll clear the DEMOTE flag only when we have the + * acknowledgement that it's a slave again. */ + if (ri->master->flags & SRI_MASTER && + (ri->master->flags & (SRI_S_DOWN|SRI_O_DOWN)) == 0 && + (mstime() - ri->master->info_refresh) < SENTINEL_INFO_PERIOD*2) + { + int retval; + retval = redisAsyncCommand(ri->cc, + sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %d", + ri->master->addr->ip, + ri->master->addr->port); + if (retval == REDIS_OK) + sentinelEvent(REDIS_NOTICE,"+demote-old-slave",ri,"%@"); + } else { + /* Otherwise if there are not the conditions to demote, we + * no longer trust the DEMOTE flag and remove it. */ + ri->flags &= ~SRI_DEMOTE; + sentinelEvent(REDIS_NOTICE,"-demote-flag-cleared",ri,"%@"); + } + } else if (!(ri->master->flags & SRI_FAILOVER_IN_PROGRESS) && + (runid_changed || first_runid)) { /* If a slave turned into master but: * * 1) Failover not in progress. - * 2) RunID hs changed, or its the first time we see an INFO output. + * 2) RunID has changed or it's the first time we see an INFO output. * * We assume this is a reboot with a wrong configuration. - * Log the event and remove the slave. */ + * Log the event and remove the slave. 
Note that this is processed + * in tilt mode as well, otherwise we lose the information that the + * runid changed (reboot?) and when the tilt mode ends a fake + * failover will be detected. */ int retval; sentinelEvent(REDIS_WARNING,"-slave-restart-as-master",ri,"%@ #removing it from the attached slaves"); retval = dictDelete(ri->master->slaves,ri->name); redisAssert(retval == REDIS_OK); return; - } else if (ri->flags & SRI_PROMOTED) { + } else if (!sentinel.tilt && ri->flags & SRI_PROMOTED) { /* If this is a promoted slave we can change state to the * failover state machine. */ if ((ri->master->flags & SRI_FAILOVER_IN_PROGRESS) && @@ -1499,11 +1563,12 @@ void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) { sentinelCallClientReconfScript(ri->master,SENTINEL_LEADER, "start",ri->master->addr,ri->addr); } - } else if (!(ri->master->flags & SRI_FAILOVER_IN_PROGRESS) || - ((ri->master->flags & SRI_FAILOVER_IN_PROGRESS) && - (ri->master->flags & SRI_I_AM_THE_LEADER) && - ri->master->failover_state == - SENTINEL_FAILOVER_STATE_WAIT_START)) + } else if (!sentinel.tilt && ( + !(ri->master->flags & SRI_FAILOVER_IN_PROGRESS) || + ((ri->master->flags & SRI_FAILOVER_IN_PROGRESS) && + (ri->master->flags & SRI_I_AM_THE_LEADER) && + ri->master->failover_state == + SENTINEL_FAILOVER_STATE_WAIT_START))) { /* No failover in progress? Then it is the start of a failover * and we are an observer. 
@@ -1523,6 +1588,7 @@ void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) { ri->master->failover_state_change_time = mstime(); ri->master->promoted_slave = ri; ri->flags |= SRI_PROMOTED; + ri->flags &= ~SRI_DEMOTE; sentinelCallClientReconfScript(ri->master,SENTINEL_OBSERVER, "start", ri->master->addr,ri->addr); /* We are an observer, so we can only assume that the leader @@ -1534,6 +1600,10 @@ void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) { } } + /* None of the following conditions are processed when in tilt mode, so + * return asap. */ + if (sentinel.tilt) return; + /* Detect if the slave that is in the process of being reconfigured * changed state. */ if ((ri->flags & SRI_SLAVE) && role == SRI_SLAVE && @@ -1564,6 +1634,13 @@ void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) { ri->failover_state_change_time = mstime(); } } + + /* Detect if the old master was demoted as slave and generate the + * +slave event. 
*/ + if (role == SRI_SLAVE && ri->flags & SRI_DEMOTE) { + sentinelEvent(REDIS_NOTICE,"+slave",ri,"%@"); + ri->flags &= ~SRI_DEMOTE; + } } void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) { @@ -1835,6 +1912,7 @@ void addReplySentinelRedisInstance(redisClient *c, sentinelRedisInstance *ri) { if (ri->flags & SRI_RECONF_SENT) flags = sdscat(flags,"reconf_sent,"); if (ri->flags & SRI_RECONF_INPROG) flags = sdscat(flags,"reconf_inprog,"); if (ri->flags & SRI_RECONF_DONE) flags = sdscat(flags,"reconf_done,"); + if (ri->flags & SRI_DEMOTE) flags = sdscat(flags,"demote,"); if (sdslen(flags) != 0) flags = sdsrange(flags,0,-2); /* remove last "," */ addReplyBulkCString(c,flags); @@ -2592,6 +2670,7 @@ sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) { mstime_t info_validity_time = mstime()-SENTINEL_INFO_VALIDITY_TIME; if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN|SRI_DISCONNECTED)) continue; + if (slave->flags & SRI_DEMOTE) continue; /* Old master not yet ready. */ if (slave->last_avail_time < info_validity_time) continue; if (slave->slave_priority == 0) continue; @@ -2837,12 +2916,28 @@ void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) { void sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance *master) { sentinelRedisInstance *ref = master->promoted_slave ? 
master->promoted_slave : master; + sds old_master_ip; + int old_master_port; sentinelEvent(REDIS_WARNING,"+switch-master",master,"%s %s %d %s %d", master->name, master->addr->ip, master->addr->port, ref->addr->ip, ref->addr->port); + old_master_ip = sdsdup(master->addr->ip); + old_master_port = master->addr->port; sentinelResetMasterAndChangeAddress(master,ref->addr->ip,ref->addr->port); + /* If this is a real switch, that is, we have master->promoted_slave not + * NULL, then we want to add the old master as a slave of the new master, + * but flagging it with SRI_DEMOTE so that we know we'll need to send + * SLAVEOF once the old master is reachable again. */ + if (master != ref) { + /* Add the new slave, but don't generate a Sentinel event as it will + * happen later when finally the instance will claim to be a slave + * in the INFO output. */ + createSentinelRedisInstance(NULL,SRI_SLAVE|SRI_DEMOTE, + old_master_ip, old_master_port, master->quorum, master); + } + sdsfree(old_master_ip); } void sentinelFailoverStateMachine(sentinelRedisInstance *ri) { diff --git a/src/t_string.c b/src/t_string.c index 0a7f22583..389ff86a8 100644 --- a/src/t_string.c +++ b/src/t_string.c @@ -42,7 +42,27 @@ static int checkStringLength(redisClient *c, long long size) { return REDIS_OK; } -void setGenericCommand(redisClient *c, int nx, robj *key, robj *val, robj *expire, int unit) { +/* The setGenericCommand() function implements the SET operation with different + * options and variants. This function is called in order to implement the + * following commands: SET, SETEX, PSETEX, SETNX. + * + * 'flags' changes the behavior of the command (NX or XX, see below). + * + * 'expire' represents an expire to set in form of a Redis object as passed + * by the user. It is interpreted according to the specified 'unit'. + * + * 'ok_reply' and 'abort_reply' are what the function will reply to the client + * if the operation is performed, or when it is not because of NX or + * XX flags. 
+ * If ok_reply is NULL "+OK" is used. + * If abort_reply is NULL, "$-1" is used. */ + +#define REDIS_SET_NO_FLAGS 0 +#define REDIS_SET_NX (1<<0) /* Set if key does not exist. */ +#define REDIS_SET_XX (1<<1) /* Set if key exists. */ + +void setGenericCommand(redisClient *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) { long long milliseconds = 0; /* initialized to avoid any harmless warning */ if (expire) { @@ -55,34 +75,68 @@ void setGenericCommand(redisClient *c, int nx, robj *key, robj *val, robj *expir if (unit == UNIT_SECONDS) milliseconds *= 1000; } - if (nx && lookupKeyWrite(c->db,key) != NULL) { - addReply(c,shared.czero); + if ((flags & REDIS_SET_NX && lookupKeyWrite(c->db,key) != NULL) || + (flags & REDIS_SET_XX && lookupKeyWrite(c->db,key) == NULL)) + { + addReply(c, abort_reply ? abort_reply : shared.nullbulk); return; } setKey(c->db,key,val); server.dirty++; if (expire) setExpire(c->db,key,mstime()+milliseconds); - addReply(c, nx ? shared.cone : shared.ok); + addReply(c, ok_reply ? ok_reply : shared.ok); } +/* SET key value [NX] [XX] [EX <seconds>] [PX <milliseconds>] */ void setCommand(redisClient *c) { + int j; + robj *expire = NULL; + int unit = UNIT_SECONDS; + int flags = REDIS_SET_NO_FLAGS; + + for (j = 3; j < c->argc; j++) { + char *a = c->argv[j]->ptr; + robj *next = (j == c->argc-1) ? 
NULL : c->argv[j+1]; + + if ((a[0] == 'n' || a[0] == 'N') && + (a[1] == 'x' || a[1] == 'X') && a[2] == '\0') { + flags |= REDIS_SET_NX; + } else if ((a[0] == 'x' || a[0] == 'X') && + (a[1] == 'x' || a[1] == 'X') && a[2] == '\0') { + flags |= REDIS_SET_XX; + } else if ((a[0] == 'e' || a[0] == 'E') && + (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' && next) { + unit = UNIT_SECONDS; + expire = next; + j++; + } else if ((a[0] == 'p' || a[0] == 'P') && + (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' && next) { + unit = UNIT_MILLISECONDS; + expire = next; + j++; + } else { + addReply(c,shared.syntaxerr); + return; + } + } + c->argv[2] = tryObjectEncoding(c->argv[2]); - setGenericCommand(c,0,c->argv[1],c->argv[2],NULL,0); + setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL); } void setnxCommand(redisClient *c) { c->argv[2] = tryObjectEncoding(c->argv[2]); - setGenericCommand(c,1,c->argv[1],c->argv[2],NULL,0); + setGenericCommand(c,REDIS_SET_NX,c->argv[1],c->argv[2],NULL,0,shared.cone,shared.czero); } void setexCommand(redisClient *c) { c->argv[3] = tryObjectEncoding(c->argv[3]); - setGenericCommand(c,0,c->argv[1],c->argv[3],c->argv[2],UNIT_SECONDS); + setGenericCommand(c,REDIS_SET_NO_FLAGS,c->argv[1],c->argv[3],c->argv[2],UNIT_SECONDS,NULL,NULL); } void psetexCommand(redisClient *c) { c->argv[3] = tryObjectEncoding(c->argv[3]); - setGenericCommand(c,0,c->argv[1],c->argv[3],c->argv[2],UNIT_MILLISECONDS); + setGenericCommand(c,REDIS_SET_NO_FLAGS,c->argv[1],c->argv[3],c->argv[2],UNIT_MILLISECONDS,NULL,NULL); } int getGenericCommand(redisClient *c) { diff --git a/src/version.h b/src/version.h index 765dc9100..2872b9fe7 100644 --- a/src/version.h +++ b/src/version.h @@ -1 +1 @@ -#define REDIS_VERSION "2.6.10" +#define REDIS_VERSION "2.6.14" |