diff options
Diffstat (limited to 'src/redis.c')
-rwxr-xr-x | src/redis.c | 239 |
1 files changed, 168 insertions, 71 deletions
diff --git a/src/redis.c b/src/redis.c index a7d468104..3e9b178ac 100755 --- a/src/redis.c +++ b/src/redis.c @@ -114,7 +114,7 @@ struct redisCommand *commandTable; */ struct redisCommand redisCommandTable[] = { {"get",getCommand,2,"r",0,NULL,1,1,1,0,0}, - {"set",setCommand,3,"wm",0,noPreloadGetKeys,1,1,1,0,0}, + {"set",setCommand,-3,"wm",0,noPreloadGetKeys,1,1,1,0,0}, {"setnx",setnxCommand,3,"wm",0,noPreloadGetKeys,1,1,1,0,0}, {"setex",setexCommand,4,"wm",0,noPreloadGetKeys,1,1,1,0,0}, {"psetex",psetexCommand,4,"wm",0,noPreloadGetKeys,1,1,1,0,0}, @@ -197,7 +197,7 @@ struct redisCommand redisCommandTable[] = { {"mset",msetCommand,-3,"wm",0,NULL,1,-1,2,0,0}, {"msetnx",msetnxCommand,-3,"wm",0,NULL,1,-1,2,0,0}, {"randomkey",randomkeyCommand,1,"rR",0,NULL,0,0,0,0,0}, - {"select",selectCommand,2,"r",0,NULL,0,0,0,0,0}, + {"select",selectCommand,2,"rl",0,NULL,0,0,0,0,0}, {"move",moveCommand,3,"w",0,NULL,1,1,1,0,0}, {"rename",renameCommand,3,"w",0,renameGetKeys,1,2,1,0,0}, {"renamenx",renamenxCommand,3,"w",0,renameGetKeys,1,2,1,0,0}, @@ -207,7 +207,7 @@ struct redisCommand redisCommandTable[] = { {"pexpireat",pexpireatCommand,3,"w",0,NULL,1,1,1,0,0}, {"keys",keysCommand,2,"rS",0,NULL,0,0,0,0,0}, {"dbsize",dbsizeCommand,1,"r",0,NULL,0,0,0,0,0}, - {"auth",authCommand,2,"rs",0,NULL,0,0,0,0,0}, + {"auth",authCommand,2,"rsl",0,NULL,0,0,0,0,0}, {"ping",pingCommand,1,"r",0,NULL,0,0,0,0,0}, {"echo",echoCommand,2,"r",0,NULL,0,0,0,0,0}, {"save",saveCommand,1,"ars",0,NULL,0,0,0,0,0}, @@ -238,7 +238,7 @@ struct redisCommand redisCommandTable[] = { {"unsubscribe",unsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0}, {"psubscribe",psubscribeCommand,-2,"rpslt",0,NULL,0,0,0,0,0}, {"punsubscribe",punsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0}, - {"publish",publishCommand,3,"pflt",0,NULL,0,0,0,0,0}, + {"publish",publishCommand,3,"pfltr",0,NULL,0,0,0,0,0}, {"watch",watchCommand,-2,"rs",0,noPreloadGetKeys,1,-1,1,0,0}, {"unwatch",unwatchCommand,1,"rs",0,NULL,0,0,0,0,0}, 
{"restore",restoreCommand,4,"awm",0,NULL,1,1,1,0,0}, @@ -574,36 +574,32 @@ int htNeedsResize(dict *dict) { /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL * we resize the hash table to save memory */ -void tryResizeHashTables(void) { - int j; - - for (j = 0; j < server.dbnum; j++) { - if (htNeedsResize(server.db[j].dict)) - dictResize(server.db[j].dict); - if (htNeedsResize(server.db[j].expires)) - dictResize(server.db[j].expires); - } +void tryResizeHashTables(int dbid) { + if (htNeedsResize(server.db[dbid].dict)) + dictResize(server.db[dbid].dict); + if (htNeedsResize(server.db[dbid].expires)) + dictResize(server.db[dbid].expires); } /* Our hash table implementation performs rehashing incrementally while * we write/read from the hash table. Still if the server is idle, the hash * table will use two tables for a long time. So we try to use 1 millisecond - * of CPU time at every serverCron() loop in order to rehash some key. */ -void incrementallyRehash(void) { - int j; - - for (j = 0; j < server.dbnum; j++) { - /* Keys dictionary */ - if (dictIsRehashing(server.db[j].dict)) { - dictRehashMilliseconds(server.db[j].dict,1); - break; /* already used our millisecond for this loop... */ - } - /* Expires */ - if (dictIsRehashing(server.db[j].expires)) { - dictRehashMilliseconds(server.db[j].expires,1); - break; /* already used our millisecond for this loop... */ - } + * of CPU time at every call of this function to perform some rehashing. + * + * The function returns 1 if some rehashing was performed, otherwise 0 + * is returned. */ +int incrementallyRehash(int dbid) { + /* Keys dictionary */ + if (dictIsRehashing(server.db[dbid].dict)) { + dictRehashMilliseconds(server.db[dbid].dict,1); + return 1; /* already used our millisecond for this loop... */ + } + /* Expires */ + if (dictIsRehashing(server.db[dbid].expires)) { + dictRehashMilliseconds(server.db[dbid].expires,1); + return 1; /* already used our millisecond for this loop... 
*/ } + return 0; } /* This function is called once a background process of some kind terminates, @@ -624,28 +620,57 @@ void updateDictResizePolicy(void) { /* Try to expire a few timed out keys. The algorithm used is adaptive and * will use few CPU cycles if there are few expiring keys, otherwise * it will get more aggressive to avoid that too much memory is used by - * keys that can be removed from the keyspace. */ + * keys that can be removed from the keyspace. + * + * No more than REDIS_DBCRON_DBS_PER_CALL databases are tested at every + * iteration. */ void activeExpireCycle(void) { - int j, iteration = 0; + /* This function has some global state in order to continue the work + * incrementally across calls. */ + static unsigned int current_db = 0; /* Last DB tested. */ + static int timelimit_exit = 0; /* Time limit hit in previous call? */ + + unsigned int j, iteration = 0; + unsigned int dbs_per_call = REDIS_DBCRON_DBS_PER_CALL; long long start = ustime(), timelimit; + /* We usually should test REDIS_DBCRON_DBS_PER_CALL per iteration, with + * two exceptions: + * + * 1) Don't test more DBs than we have. + * 2) If last time we hit the time limit, we want to scan all DBs + * in this iteration, as there is work to do in some DB and we don't want + * expired keys to use memory for too much time. */ + if (dbs_per_call > server.dbnum || timelimit_exit) + dbs_per_call = server.dbnum; + /* We can use at max REDIS_EXPIRELOOKUPS_TIME_PERC percentage of CPU time * per iteration. Since this function gets called with a frequency of - * REDIS_HZ times per second, the following is the max amount of + * server.hz times per second, the following is the max amount of * microseconds we can spend in this function. 
*/ - timelimit = 1000000*REDIS_EXPIRELOOKUPS_TIME_PERC/REDIS_HZ/100; + timelimit = 1000000*REDIS_EXPIRELOOKUPS_TIME_PERC/server.hz/100; + timelimit_exit = 0; if (timelimit <= 0) timelimit = 1; - for (j = 0; j < server.dbnum; j++) { + for (j = 0; j < dbs_per_call; j++) { int expired; - redisDb *db = server.db+j; + redisDb *db = server.db+(current_db % server.dbnum); + + /* Increment the DB now so we are sure if we run out of time + * in the current DB we'll restart from the next. This allows to + * distribute the time evenly across DBs. */ + current_db++; /* Continue to expire if at the end of the cycle more than 25% * of the keys were expired. */ do { - unsigned long num = dictSize(db->expires); - unsigned long slots = dictSlots(db->expires); - long long now = mstime(); + unsigned long num, slots; + long long now; + + /* If there is nothing to expire try next DB ASAP. */ + if ((num = dictSize(db->expires)) == 0) break; + slots = dictSlots(db->expires); + now = mstime(); /* When there are less than 1% filled slots getting random * keys is expensive, so stop here waiting for better times... @@ -679,8 +704,12 @@ void activeExpireCycle(void) { * expire. So after a given amount of milliseconds return to the * caller waiting for the other active expire cycle. */ iteration++; - if ((iteration & 0xf) == 0 && /* check once every 16 cycles. */ - (ustime()-start) > timelimit) return; + if ((iteration & 0xf) == 0 && /* check once every 16 iterations. */ + (ustime()-start) > timelimit) + { + timelimit_exit = 1; + return; + } } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4); } } @@ -766,13 +795,13 @@ int clientsCronResizeQueryBuffer(redisClient *c) { } void clientsCron(void) { - /* Make sure to process at least 1/(REDIS_HZ*10) of clients per call. - * Since this function is called REDIS_HZ times per second we are sure that + /* Make sure to process at least 1/(server.hz*10) of clients per call. 
+ * Since this function is called server.hz times per second we are sure that * in the worst case we process all the clients in 10 seconds. * In normal conditions (a reasonable number of clients) we process * all the clients in a shorter time. */ int numclients = listLength(server.clients); - int iterations = numclients/(REDIS_HZ*10); + int iterations = numclients/(server.hz*10); if (iterations < 50) iterations = (numclients < 50) ? numclients : 50; @@ -872,7 +901,52 @@ void checkOSMemory(void) { } -/* This is our timer interrupt, called REDIS_HZ times per second. +/* This function handles 'background' operations we are required to do + * incrementally in Redis databases, such as active key expiring, resizing, + * rehashing. */ +void databasesCron(void) { + /* Expire keys by random sampling. Not required for slaves + * as master will synthesize DELs for us. */ + if (server.active_expire_enabled && server.masterhost == NULL) + activeExpireCycle(); + + /* Perform hash tables rehashing if needed, but only if there are no + * other processes saving the DB on disk. Otherwise rehashing is bad + * as will cause a lot of copy-on-write of memory pages. */ + if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) { + /* We use global counters so if we stop the computation at a given + * DB we'll be able to start from the successive in the next + * cron loop iteration. */ + static unsigned int resize_db = 0; + static unsigned int rehash_db = 0; + unsigned int dbs_per_call = REDIS_DBCRON_DBS_PER_CALL; + unsigned int j; + + /* Don't test more DBs than we have. 
*/ + if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum; + + /* Resize */ + for (j = 0; j < dbs_per_call; j++) { + tryResizeHashTables(resize_db % server.dbnum); + resize_db++; + } + + /* Rehash */ + if (server.activerehashing) { + for (j = 0; j < dbs_per_call; j++) { + int work_done = incrementallyRehash(rehash_db % server.dbnum); + rehash_db++; + if (work_done) { + /* If the function did some work, stop here, we'll do + * more at the next cron loop. */ + break; + } + } + } + } +} + +/* This is our timer interrupt, called server.hz times per second. * Here is where we do a number of things that need to be done asynchronously. * For instance: * @@ -886,7 +960,7 @@ void checkOSMemory(void) { * - Replication reconnection. * - Many more... * - * Everything directly called here will be called REDIS_HZ times per second, + * Everything directly called here will be called server.hz times per second, * so in order to throttle execution of things we want to do less frequently * a macro is used: run_with_period(milliseconds) { .... } */ @@ -959,17 +1033,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { } } - /* We don't want to resize the hash tables while a background saving - * is in progress: the saving child is created using fork() that is - * implemented with a copy-on-write semantic in most modern systems, so - * if we resize the HT while there is the saving child at work actually - * a lot of memory movements in the parent will cause a lot of pages - * copied. */ - if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) { - tryResizeHashTables(); - if (server.activerehashing) incrementallyRehash(); - } - /* Show information about connected clients */ if (!server.sentinel_mode) { run_with_period(5000) { @@ -984,6 +1047,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { /* We need to do a few operations on clients asynchronously. 
*/ clientsCron(); + /* Handle background operations on Redis databases. */ + databasesCron(); + /* Start a scheduled AOF rewrite if this was requested by the user while * a BGSAVE was in progress. */ if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 && @@ -1020,8 +1086,16 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { for (j = 0; j < server.saveparamslen; j++) { struct saveparam *sp = server.saveparams+j; + /* Save if we reached the given amount of changes, + * the given amount of seconds, and if the latest bgsave was + * successful or if, in case of an error, at least + * REDIS_BGSAVE_RETRY_DELAY seconds already elapsed. */ if (server.dirty >= sp->changes && - server.unixtime-server.lastsave > sp->seconds) { + server.unixtime-server.lastsave > sp->seconds && + (server.unixtime-server.lastbgsave_try > + REDIS_BGSAVE_RETRY_DELAY || + server.lastbgsave_status == REDIS_OK)) + { redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...", sp->changes, sp->seconds); rdbSaveBackground(server.rdb_filename, REDIS_BGSAVE_NORMAL, -1); @@ -1050,11 +1124,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { * cron function is called. */ if (server.aof_flush_postponed_start) flushAppendOnlyFile(0); - /* Expire a few keys per cycle, only if this is a master. - * On slaves we wait for DEL operations synthesized by the master - * in order to guarantee a strict consistency. 
*/ - if (server.masterhost == NULL) activeExpireCycle(); - /* Close clients that need to be closed asynchronous */ freeClientsInAsyncFreeQueue(); @@ -1068,7 +1137,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { } server.cronloops++; - return 1000/REDIS_HZ; + return 1000/server.hz; } /* This function gets called every time Redis is entering the @@ -1176,6 +1245,7 @@ void createSharedObjects(void) { void initServerConfig() { getRandomHexChars(server.runid,REDIS_RUN_ID_SIZE); + server.hz = REDIS_DEFAULT_HZ; server.runid[REDIS_RUN_ID_SIZE] = '\0'; server.arch_bits = (sizeof(long) == 8) ? 64 : 32; server.port = REDIS_SERVERPORT; @@ -1188,6 +1258,7 @@ void initServerConfig() { server.verbosity = REDIS_NOTICE; server.maxidletime = REDIS_MAXIDLETIME; server.tcpkeepalive = 0; + server.active_expire_enabled = 1; server.client_max_querybuf_len = REDIS_MAX_QUERYBUF_LEN; server.saveparams = NULL; server.loading = 0; @@ -1213,12 +1284,15 @@ void initServerConfig() { server.aof_fd = -1; server.aof_selected_db = -1; /* Make sure the first time will not match */ server.aof_flush_postponed_start = 0; + server.aof_rewrite_incremental_fsync = 1; + server.rdb_incremental_fsync = 1; server.pidfile = zstrdup("/var/run/redis.pid"); server.rdb_filename = zstrdup("dump.rdb"); server.aof_filename = zstrdup("appendonly.aof"); server.requirepass = NULL; server.rdb_compression = 1; server.rdb_checksum = 1; + server.stop_writes_on_bgsave_err = 1; server.activerehashing = 1; server.maxclients = REDIS_MAX_CLIENTS; server.bpop_blocked_clients = 0; @@ -1256,7 +1330,7 @@ void initServerConfig() { server.repl_syncio_timeout = REDIS_REPL_SYNCIO_TIMEOUT; server.repl_serve_stale_data = 1; server.repl_slave_ro = 1; - server.repl_down_since = time(NULL); + server.repl_down_since = 0; /* Never connected, repl is down since EVER. 
*/ server.repl_disable_tcp_nodelay = 0; server.slave_priority = REDIS_DEFAULT_SLAVE_PRIORITY; @@ -1281,6 +1355,7 @@ void initServerConfig() { * initial configuration, since command names may be changed via * redis.conf using the rename-command directive. */ server.commands = dictCreate(&commandTableDictType,NULL); + server.orig_commands = dictCreate(&commandTableDictType,NULL); populateCommandTable(); server.delCommand = lookupCommandByCString("del"); server.multiCommand = lookupCommandByCString("multi"); @@ -1406,7 +1481,8 @@ void initServer() { server.aof_child_pid = -1; aofRewriteBufferReset(); server.aof_buf = sdsempty(); - server.lastsave = time(NULL); + server.lastsave = time(NULL); /* At startup we consider the DB saved. */ + server.lastbgsave_try = 0; /* At startup we never tried to BGSAVE. */ server.rdb_save_time_last = -1; server.rdb_save_time_start = -1; server.dirty = 0; @@ -1431,8 +1507,10 @@ void initServer() { server.ops_sec_last_sample_ops = 0; server.unixtime = time(NULL); server.lastbgsave_status = REDIS_OK; - server.stop_writes_on_bgsave_err = 1; - aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL); + if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) { + redisPanic("create time event failed"); + exit(1); + } if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE, acceptTcpHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.ipfd file event."); if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE, @@ -1472,7 +1550,7 @@ void populateCommandTable(void) { for (j = 0; j < numcommands; j++) { struct redisCommand *c = redisCommandTable+j; char *f = c->sflags; - int retval; + int retval1, retval2; while(*f != '\0') { switch(*f) { @@ -1493,8 +1571,11 @@ void populateCommandTable(void) { f++; } - retval = dictAdd(server.commands, sdsnew(c->name), c); - assert(retval == DICT_OK); + retval1 = dictAdd(server.commands, sdsnew(c->name), c); + /* Populate an additional 
dictionary that will be unaffected + * by rename-command statements in redis.conf. */ + retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c); + redisAssert(retval1 == DICT_OK && retval2 == DICT_OK); } } @@ -1562,6 +1643,20 @@ struct redisCommand *lookupCommandByCString(char *s) { return cmd; } +/* Lookup the command in the current table, if not found also check in + * the original table containing the original command names unaffected by + * redis.conf rename-command statement. + * + * This is used by functions rewriting the argument vector such as + * rewriteClientCommandVector() in order to set client->cmd pointer + * correctly even if the command was renamed. */ +struct redisCommand *lookupCommandOrOriginal(sds name) { + struct redisCommand *cmd = dictFetchValue(server.commands, name); + + if (!cmd) cmd = dictFetchValue(server.orig_commands,name); + return cmd; +} + /* Propagate the specified command (in the context of the specified database id) * to AOF and Slaves. * @@ -2020,6 +2115,7 @@ sds genRedisInfoString(char *section) { "tcp_port:%d\r\n" "uptime_in_seconds:%ld\r\n" "uptime_in_days:%ld\r\n" + "hz:%d\r\n" "lru_clock:%ld\r\n", REDIS_VERSION, redisGitSHA1(), @@ -2038,6 +2134,7 @@ sds genRedisInfoString(char *section) { server.port, uptime, uptime/(3600*24), + server.hz, (unsigned long) server.lruclock); } @@ -2698,7 +2795,7 @@ void loadDataFromDisk(void) { redisLog(REDIS_NOTICE,"DB loaded from disk: %.3f seconds", (float)(ustime()-start)/1000000); } else if (errno != ENOENT) { - redisLog(REDIS_WARNING,"Fatal error loading the DB. Exiting."); + redisLog(REDIS_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno)); exit(1); } } @@ -2707,7 +2804,7 @@ void loadDataFromDisk(void) { void redisOutOfMemoryHandler(size_t allocation_size) { redisLog(REDIS_WARNING,"Out Of Memory allocating %zu bytes!", allocation_size); - redisPanic("OOM"); + redisPanic("Redis aborting for OUT OF MEMORY"); } int main(int argc, char **argv) { |