summaryrefslogtreecommitdiff
path: root/src/redis.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/redis.c')
-rwxr-xr-xsrc/redis.c239
1 files changed, 168 insertions, 71 deletions
diff --git a/src/redis.c b/src/redis.c
index a7d468104..3e9b178ac 100755
--- a/src/redis.c
+++ b/src/redis.c
@@ -114,7 +114,7 @@ struct redisCommand *commandTable;
*/
struct redisCommand redisCommandTable[] = {
{"get",getCommand,2,"r",0,NULL,1,1,1,0,0},
- {"set",setCommand,3,"wm",0,noPreloadGetKeys,1,1,1,0,0},
+ {"set",setCommand,-3,"wm",0,noPreloadGetKeys,1,1,1,0,0},
{"setnx",setnxCommand,3,"wm",0,noPreloadGetKeys,1,1,1,0,0},
{"setex",setexCommand,4,"wm",0,noPreloadGetKeys,1,1,1,0,0},
{"psetex",psetexCommand,4,"wm",0,noPreloadGetKeys,1,1,1,0,0},
@@ -197,7 +197,7 @@ struct redisCommand redisCommandTable[] = {
{"mset",msetCommand,-3,"wm",0,NULL,1,-1,2,0,0},
{"msetnx",msetnxCommand,-3,"wm",0,NULL,1,-1,2,0,0},
{"randomkey",randomkeyCommand,1,"rR",0,NULL,0,0,0,0,0},
- {"select",selectCommand,2,"r",0,NULL,0,0,0,0,0},
+ {"select",selectCommand,2,"rl",0,NULL,0,0,0,0,0},
{"move",moveCommand,3,"w",0,NULL,1,1,1,0,0},
{"rename",renameCommand,3,"w",0,renameGetKeys,1,2,1,0,0},
{"renamenx",renamenxCommand,3,"w",0,renameGetKeys,1,2,1,0,0},
@@ -207,7 +207,7 @@ struct redisCommand redisCommandTable[] = {
{"pexpireat",pexpireatCommand,3,"w",0,NULL,1,1,1,0,0},
{"keys",keysCommand,2,"rS",0,NULL,0,0,0,0,0},
{"dbsize",dbsizeCommand,1,"r",0,NULL,0,0,0,0,0},
- {"auth",authCommand,2,"rs",0,NULL,0,0,0,0,0},
+ {"auth",authCommand,2,"rsl",0,NULL,0,0,0,0,0},
{"ping",pingCommand,1,"r",0,NULL,0,0,0,0,0},
{"echo",echoCommand,2,"r",0,NULL,0,0,0,0,0},
{"save",saveCommand,1,"ars",0,NULL,0,0,0,0,0},
@@ -238,7 +238,7 @@ struct redisCommand redisCommandTable[] = {
{"unsubscribe",unsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0},
{"psubscribe",psubscribeCommand,-2,"rpslt",0,NULL,0,0,0,0,0},
{"punsubscribe",punsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0},
- {"publish",publishCommand,3,"pflt",0,NULL,0,0,0,0,0},
+ {"publish",publishCommand,3,"pfltr",0,NULL,0,0,0,0,0},
{"watch",watchCommand,-2,"rs",0,noPreloadGetKeys,1,-1,1,0,0},
{"unwatch",unwatchCommand,1,"rs",0,NULL,0,0,0,0,0},
{"restore",restoreCommand,4,"awm",0,NULL,1,1,1,0,0},
@@ -574,36 +574,32 @@ int htNeedsResize(dict *dict) {
/* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
* we resize the hash table to save memory */
-void tryResizeHashTables(void) {
- int j;
-
- for (j = 0; j < server.dbnum; j++) {
- if (htNeedsResize(server.db[j].dict))
- dictResize(server.db[j].dict);
- if (htNeedsResize(server.db[j].expires))
- dictResize(server.db[j].expires);
- }
+void tryResizeHashTables(int dbid) {
+ if (htNeedsResize(server.db[dbid].dict))
+ dictResize(server.db[dbid].dict);
+ if (htNeedsResize(server.db[dbid].expires))
+ dictResize(server.db[dbid].expires);
}
/* Our hash table implementation performs rehashing incrementally while
* we write/read from the hash table. Still if the server is idle, the hash
* table will use two tables for a long time. So we try to use 1 millisecond
- * of CPU time at every serverCron() loop in order to rehash some key. */
-void incrementallyRehash(void) {
- int j;
-
- for (j = 0; j < server.dbnum; j++) {
- /* Keys dictionary */
- if (dictIsRehashing(server.db[j].dict)) {
- dictRehashMilliseconds(server.db[j].dict,1);
- break; /* already used our millisecond for this loop... */
- }
- /* Expires */
- if (dictIsRehashing(server.db[j].expires)) {
- dictRehashMilliseconds(server.db[j].expires,1);
- break; /* already used our millisecond for this loop... */
- }
+ * of CPU time at every call of this function to perform some rehahsing.
+ *
+ * The function returns 1 if some rehashing was performed, otherwise 0
+ * is returned. */
+int incrementallyRehash(int dbid) {
+ /* Keys dictionary */
+ if (dictIsRehashing(server.db[dbid].dict)) {
+ dictRehashMilliseconds(server.db[dbid].dict,1);
+ return 1; /* already used our millisecond for this loop... */
+ }
+ /* Expires */
+ if (dictIsRehashing(server.db[dbid].expires)) {
+ dictRehashMilliseconds(server.db[dbid].expires,1);
+ return 1; /* already used our millisecond for this loop... */
}
+ return 0;
}
/* This function is called once a background process of some kind terminates,
@@ -624,28 +620,57 @@ void updateDictResizePolicy(void) {
/* Try to expire a few timed out keys. The algorithm used is adaptive and
* will use few CPU cycles if there are few expiring keys, otherwise
* it will get more aggressive to avoid that too much memory is used by
- * keys that can be removed from the keyspace. */
+ * keys that can be removed from the keyspace.
+ *
+ * No more than REDIS_DBCRON_DBS_PER_CALL databases are tested at every
+ * iteration. */
void activeExpireCycle(void) {
- int j, iteration = 0;
+ /* This function has some global state in order to continue the work
+ * incrementally across calls. */
+ static unsigned int current_db = 0; /* Last DB tested. */
+ static int timelimit_exit = 0; /* Time limit hit in previous call? */
+
+ unsigned int j, iteration = 0;
+ unsigned int dbs_per_call = REDIS_DBCRON_DBS_PER_CALL;
long long start = ustime(), timelimit;
+ /* We usually should test REDIS_DBCRON_DBS_PER_CALL per iteration, with
+ * two exceptions:
+ *
+ * 1) Don't test more DBs than we have.
+ * 2) If last time we hit the time limit, we want to scan all DBs
+ * in this iteration, as there is work to do in some DB and we don't want
+ * expired keys to use memory for too much time. */
+ if (dbs_per_call > server.dbnum || timelimit_exit)
+ dbs_per_call = server.dbnum;
+
/* We can use at max REDIS_EXPIRELOOKUPS_TIME_PERC percentage of CPU time
* per iteration. Since this function gets called with a frequency of
- * REDIS_HZ times per second, the following is the max amount of
+ * server.hz times per second, the following is the max amount of
* microseconds we can spend in this function. */
- timelimit = 1000000*REDIS_EXPIRELOOKUPS_TIME_PERC/REDIS_HZ/100;
+ timelimit = 1000000*REDIS_EXPIRELOOKUPS_TIME_PERC/server.hz/100;
+ timelimit_exit = 0;
if (timelimit <= 0) timelimit = 1;
- for (j = 0; j < server.dbnum; j++) {
+ for (j = 0; j < dbs_per_call; j++) {
int expired;
- redisDb *db = server.db+j;
+ redisDb *db = server.db+(current_db % server.dbnum);
+
+ /* Increment the DB now so we are sure if we run out of time
+ * in the current DB we'll restart from the next. This allows to
+ * distribute the time evenly across DBs. */
+ current_db++;
/* Continue to expire if at the end of the cycle more than 25%
* of the keys were expired. */
do {
- unsigned long num = dictSize(db->expires);
- unsigned long slots = dictSlots(db->expires);
- long long now = mstime();
+ unsigned long num, slots;
+ long long now;
+
+ /* If there is nothing to expire try next DB ASAP. */
+ if ((num = dictSize(db->expires)) == 0) break;
+ slots = dictSlots(db->expires);
+ now = mstime();
/* When there are less than 1% filled slots getting random
* keys is expensive, so stop here waiting for better times...
@@ -679,8 +704,12 @@ void activeExpireCycle(void) {
* expire. So after a given amount of milliseconds return to the
* caller waiting for the other active expire cycle. */
iteration++;
- if ((iteration & 0xf) == 0 && /* check once every 16 cycles. */
- (ustime()-start) > timelimit) return;
+ if ((iteration & 0xf) == 0 && /* check once every 16 iterations. */
+ (ustime()-start) > timelimit)
+ {
+ timelimit_exit = 1;
+ return;
+ }
} while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
}
}
@@ -766,13 +795,13 @@ int clientsCronResizeQueryBuffer(redisClient *c) {
}
void clientsCron(void) {
- /* Make sure to process at least 1/(REDIS_HZ*10) of clients per call.
- * Since this function is called REDIS_HZ times per second we are sure that
+ /* Make sure to process at least 1/(server.hz*10) of clients per call.
+ * Since this function is called server.hz times per second we are sure that
* in the worst case we process all the clients in 10 seconds.
* In normal conditions (a reasonable number of clients) we process
* all the clients in a shorter time. */
int numclients = listLength(server.clients);
- int iterations = numclients/(REDIS_HZ*10);
+ int iterations = numclients/(server.hz*10);
if (iterations < 50)
iterations = (numclients < 50) ? numclients : 50;
@@ -872,7 +901,52 @@ void checkOSMemory(void) {
}
-/* This is our timer interrupt, called REDIS_HZ times per second.
+/* This function handles 'background' operations we are required to do
+ * incrementally in Redis databases, such as active key expiring, resizing,
+ * rehashing. */
+void databasesCron(void) {
+ /* Expire keys by random sampling. Not required for slaves
+ * as master will synthesize DELs for us. */
+ if (server.active_expire_enabled && server.masterhost == NULL)
+ activeExpireCycle();
+
+ /* Perform hash tables rehashing if needed, but only if there are no
+ * other processes saving the DB on disk. Otherwise rehashing is bad
+ * as will cause a lot of copy-on-write of memory pages. */
+ if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
+ /* We use global counters so if we stop the computation at a given
+ * DB we'll be able to start from the successive in the next
+ * cron loop iteration. */
+ static unsigned int resize_db = 0;
+ static unsigned int rehash_db = 0;
+ unsigned int dbs_per_call = REDIS_DBCRON_DBS_PER_CALL;
+ unsigned int j;
+
+ /* Don't test more DBs than we have. */
+ if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum;
+
+ /* Resize */
+ for (j = 0; j < dbs_per_call; j++) {
+ tryResizeHashTables(resize_db % server.dbnum);
+ resize_db++;
+ }
+
+ /* Rehash */
+ if (server.activerehashing) {
+ for (j = 0; j < dbs_per_call; j++) {
+ int work_done = incrementallyRehash(rehash_db % server.dbnum);
+ rehash_db++;
+ if (work_done) {
+ /* If the function did some work, stop here, we'll do
+ * more at the next cron loop. */
+ break;
+ }
+ }
+ }
+ }
+}
+
+/* This is our timer interrupt, called server.hz times per second.
* Here is where we do a number of things that need to be done asynchronously.
* For instance:
*
@@ -886,7 +960,7 @@ void checkOSMemory(void) {
* - Replication reconnection.
* - Many more...
*
- * Everything directly called here will be called REDIS_HZ times per second,
+ * Everything directly called here will be called server.hz times per second,
* so in order to throttle execution of things we want to do less frequently
* a macro is used: run_with_period(milliseconds) { .... }
*/
@@ -959,17 +1033,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
}
}
- /* We don't want to resize the hash tables while a background saving
- * is in progress: the saving child is created using fork() that is
- * implemented with a copy-on-write semantic in most modern systems, so
- * if we resize the HT while there is the saving child at work actually
- * a lot of memory movements in the parent will cause a lot of pages
- * copied. */
- if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
- tryResizeHashTables();
- if (server.activerehashing) incrementallyRehash();
- }
-
/* Show information about connected clients */
if (!server.sentinel_mode) {
run_with_period(5000) {
@@ -984,6 +1047,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
/* We need to do a few operations on clients asynchronously. */
clientsCron();
+ /* Handle background operations on Redis databases. */
+ databasesCron();
+
/* Start a scheduled AOF rewrite if this was requested by the user while
* a BGSAVE was in progress. */
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
@@ -1020,8 +1086,16 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
for (j = 0; j < server.saveparamslen; j++) {
struct saveparam *sp = server.saveparams+j;
+ /* Save if we reached the given amount of changes,
+ * the given amount of seconds, and if the latest bgsave was
+ * successful or if, in case of an error, at least
+ * REDIS_BGSAVE_RETRY_DELAY seconds already elapsed. */
if (server.dirty >= sp->changes &&
- server.unixtime-server.lastsave > sp->seconds) {
+ server.unixtime-server.lastsave > sp->seconds &&
+ (server.unixtime-server.lastbgsave_try >
+ REDIS_BGSAVE_RETRY_DELAY ||
+ server.lastbgsave_status == REDIS_OK))
+ {
redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
sp->changes, sp->seconds);
rdbSaveBackground(server.rdb_filename, REDIS_BGSAVE_NORMAL, -1);
@@ -1050,11 +1124,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
* cron function is called. */
if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
- /* Expire a few keys per cycle, only if this is a master.
- * On slaves we wait for DEL operations synthesized by the master
- * in order to guarantee a strict consistency. */
- if (server.masterhost == NULL) activeExpireCycle();
-
/* Close clients that need to be closed asynchronous */
freeClientsInAsyncFreeQueue();
@@ -1068,7 +1137,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
}
server.cronloops++;
- return 1000/REDIS_HZ;
+ return 1000/server.hz;
}
/* This function gets called every time Redis is entering the
@@ -1176,6 +1245,7 @@ void createSharedObjects(void) {
void initServerConfig() {
getRandomHexChars(server.runid,REDIS_RUN_ID_SIZE);
+ server.hz = REDIS_DEFAULT_HZ;
server.runid[REDIS_RUN_ID_SIZE] = '\0';
server.arch_bits = (sizeof(long) == 8) ? 64 : 32;
server.port = REDIS_SERVERPORT;
@@ -1188,6 +1258,7 @@ void initServerConfig() {
server.verbosity = REDIS_NOTICE;
server.maxidletime = REDIS_MAXIDLETIME;
server.tcpkeepalive = 0;
+ server.active_expire_enabled = 1;
server.client_max_querybuf_len = REDIS_MAX_QUERYBUF_LEN;
server.saveparams = NULL;
server.loading = 0;
@@ -1213,12 +1284,15 @@ void initServerConfig() {
server.aof_fd = -1;
server.aof_selected_db = -1; /* Make sure the first time will not match */
server.aof_flush_postponed_start = 0;
+ server.aof_rewrite_incremental_fsync = 1;
+ server.rdb_incremental_fsync = 1;
server.pidfile = zstrdup("/var/run/redis.pid");
server.rdb_filename = zstrdup("dump.rdb");
server.aof_filename = zstrdup("appendonly.aof");
server.requirepass = NULL;
server.rdb_compression = 1;
server.rdb_checksum = 1;
+ server.stop_writes_on_bgsave_err = 1;
server.activerehashing = 1;
server.maxclients = REDIS_MAX_CLIENTS;
server.bpop_blocked_clients = 0;
@@ -1256,7 +1330,7 @@ void initServerConfig() {
server.repl_syncio_timeout = REDIS_REPL_SYNCIO_TIMEOUT;
server.repl_serve_stale_data = 1;
server.repl_slave_ro = 1;
- server.repl_down_since = time(NULL);
+ server.repl_down_since = 0; /* Never connected, repl is down since EVER. */
server.repl_disable_tcp_nodelay = 0;
server.slave_priority = REDIS_DEFAULT_SLAVE_PRIORITY;
@@ -1281,6 +1355,7 @@ void initServerConfig() {
* initial configuration, since command names may be changed via
* redis.conf using the rename-command directive. */
server.commands = dictCreate(&commandTableDictType,NULL);
+ server.orig_commands = dictCreate(&commandTableDictType,NULL);
populateCommandTable();
server.delCommand = lookupCommandByCString("del");
server.multiCommand = lookupCommandByCString("multi");
@@ -1406,7 +1481,8 @@ void initServer() {
server.aof_child_pid = -1;
aofRewriteBufferReset();
server.aof_buf = sdsempty();
- server.lastsave = time(NULL);
+ server.lastsave = time(NULL); /* At startup we consider the DB saved. */
+ server.lastbgsave_try = 0; /* At startup we never tried to BGSAVE. */
server.rdb_save_time_last = -1;
server.rdb_save_time_start = -1;
server.dirty = 0;
@@ -1431,8 +1507,10 @@ void initServer() {
server.ops_sec_last_sample_ops = 0;
server.unixtime = time(NULL);
server.lastbgsave_status = REDIS_OK;
- server.stop_writes_on_bgsave_err = 1;
- aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
+ if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
+ redisPanic("create time event failed");
+ exit(1);
+ }
if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.ipfd file event.");
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
@@ -1472,7 +1550,7 @@ void populateCommandTable(void) {
for (j = 0; j < numcommands; j++) {
struct redisCommand *c = redisCommandTable+j;
char *f = c->sflags;
- int retval;
+ int retval1, retval2;
while(*f != '\0') {
switch(*f) {
@@ -1493,8 +1571,11 @@ void populateCommandTable(void) {
f++;
}
- retval = dictAdd(server.commands, sdsnew(c->name), c);
- assert(retval == DICT_OK);
+ retval1 = dictAdd(server.commands, sdsnew(c->name), c);
+ /* Populate an additional dictionary that will be unaffected
+ * by rename-command statements in redis.conf. */
+ retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c);
+ redisAssert(retval1 == DICT_OK && retval2 == DICT_OK);
}
}
@@ -1562,6 +1643,20 @@ struct redisCommand *lookupCommandByCString(char *s) {
return cmd;
}
+/* Lookup the command in the current table, if not found also check in
+ * the original table containing the original command names unaffected by
+ * redis.conf rename-command statement.
+ *
+ * This is used by functions rewriting the argument vector such as
+ * rewriteClientCommandVector() in order to set client->cmd pointer
+ * correctly even if the command was renamed. */
+struct redisCommand *lookupCommandOrOriginal(sds name) {
+ struct redisCommand *cmd = dictFetchValue(server.commands, name);
+
+ if (!cmd) cmd = dictFetchValue(server.orig_commands,name);
+ return cmd;
+}
+
/* Propagate the specified command (in the context of the specified database id)
* to AOF and Slaves.
*
@@ -2020,6 +2115,7 @@ sds genRedisInfoString(char *section) {
"tcp_port:%d\r\n"
"uptime_in_seconds:%ld\r\n"
"uptime_in_days:%ld\r\n"
+ "hz:%d\r\n"
"lru_clock:%ld\r\n",
REDIS_VERSION,
redisGitSHA1(),
@@ -2038,6 +2134,7 @@ sds genRedisInfoString(char *section) {
server.port,
uptime,
uptime/(3600*24),
+ server.hz,
(unsigned long) server.lruclock);
}
@@ -2698,7 +2795,7 @@ void loadDataFromDisk(void) {
redisLog(REDIS_NOTICE,"DB loaded from disk: %.3f seconds",
(float)(ustime()-start)/1000000);
} else if (errno != ENOENT) {
- redisLog(REDIS_WARNING,"Fatal error loading the DB. Exiting.");
+ redisLog(REDIS_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));
exit(1);
}
}
@@ -2707,7 +2804,7 @@ void loadDataFromDisk(void) {
void redisOutOfMemoryHandler(size_t allocation_size) {
redisLog(REDIS_WARNING,"Out Of Memory allocating %zu bytes!",
allocation_size);
- redisPanic("OOM");
+ redisPanic("Redis aborting for OUT OF MEMORY");
}
int main(int argc, char **argv) {