diff options
Diffstat (limited to 'src/server.c')
-rw-r--r-- | src/server.c | 457 |
1 files changed, 338 insertions, 119 deletions
diff --git a/src/server.c b/src/server.c index 712cda1bd..d16ff0a8e 100644 --- a/src/server.c +++ b/src/server.c @@ -146,6 +146,8 @@ volatile unsigned long lru_clock; /* Server global current LRU time. */ * in this condition but just a few. * * no-monitor: Do not automatically propagate the command on MONITOR. + * + * no-slowlog: Do not automatically propagate the command to the slowlog. * * cluster-asking: Perform an implicit ASKING for this command, so the * command will be accepted in cluster mode if the slot is marked @@ -627,7 +629,7 @@ struct redisCommand redisCommandTable[] = { 0,NULL,0,0,0,0,0,0}, {"auth",authCommand,-2, - "no-script ok-loading ok-stale fast @connection", + "no-script ok-loading ok-stale fast no-monitor no-slowlog @connection", 0,NULL,0,0,0,0,0,0}, /* We don't allow PING during loading since in Redis PING is used as @@ -670,7 +672,7 @@ struct redisCommand redisCommandTable[] = { 0,NULL,0,0,0,0,0,0}, {"exec",execCommand,1, - "no-script no-monitor @transaction", + "no-script no-monitor no-slowlog @transaction", 0,NULL,0,0,0,0,0,0}, {"discard",discardCommand,1, @@ -715,7 +717,7 @@ struct redisCommand redisCommandTable[] = { {"touch",touchCommand,-2, "read-only fast @keyspace", - 0,NULL,1,1,1,0,0,0}, + 0,NULL,1,-1,1,0,0,0}, {"pttl",pttlCommand,2, "read-only fast random @keyspace", @@ -822,7 +824,7 @@ struct redisCommand redisCommandTable[] = { 0,NULL,0,0,0,0,0,0}, {"hello",helloCommand,-2, - "no-script fast @connection", + "no-script fast no-monitor no-slowlog @connection", 0,NULL,0,0,0,0,0,0}, /* EVAL can modify the dataset, however it is not flagged as a write @@ -863,7 +865,7 @@ struct redisCommand redisCommandTable[] = { "no-script @keyspace", 0,NULL,0,0,0,0,0,0}, - {"command",commandCommand,0, + {"command",commandCommand,-1, "ok-loading ok-stale random @connection", 0,NULL,0,0,0,0,0,0}, @@ -1447,12 +1449,18 @@ int incrementallyRehash(int dbid) { * for dict.c to resize the hash tables accordingly to the fact we have o not * running childs. */ void updateDictResizePolicy(void) { - if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) + if (!hasActiveChildProcess()) dictEnableResize(); else dictDisableResize(); } +int hasActiveChildProcess() { + return server.rdb_child_pid != -1 || + server.aof_child_pid != -1 || + server.module_child_pid != -1; +} + /* ======================= Cron: called every 100 ms ======================== */ /* Add a sample to the operations per second array of samples. */ @@ -1674,10 +1682,12 @@ void clientsCron(void) { void databasesCron(void) { /* Expire keys by random sampling. Not required for slaves * as master will synthesize DELs for us. */ - if (server.active_expire_enabled && server.masterhost == NULL) { - activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW); - } else if (server.masterhost != NULL) { - expireSlaveKeys(); + if (server.active_expire_enabled) { + if (server.masterhost == NULL) { + activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW); + } else { + expireSlaveKeys(); + } } /* Defrag keys gradually. */ @@ -1687,7 +1697,7 @@ void databasesCron(void) { /* Perform hash tables rehashing if needed, but only if there are no * other processes saving the DB on disk. Otherwise rehashing is bad * as will cause a lot of copy-on-write of memory pages. */ - if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) { + if (!hasActiveChildProcess()) { /* We use global counters so if we stop the computation at a given * DB we'll be able to start from the successive in the next * cron loop iteration. */ @@ -1728,19 +1738,76 @@ void databasesCron(void) { * every object access, and accuracy is not needed. To access a global var is * a lot faster than calling time(NULL) */ void updateCachedTime(void) { - time_t unixtime = time(NULL); - atomicSet(server.unixtime,unixtime); + server.unixtime = time(NULL); server.mstime = mstime(); - /* To get information about daylight saving time, we need to call localtime_r - * and cache the result. However calling localtime_r in this context is safe - * since we will never fork() while here, in the main thread. The logging - * function will call a thread safe version of localtime that has no locks. */ + /* To get information about daylight saving time, we need to call + * localtime_r and cache the result. However calling localtime_r in this + * context is safe since we will never fork() while here, in the main + * thread. The logging function will call a thread safe version of + * localtime that has no locks. */ struct tm tm; - localtime_r(&server.unixtime,&tm); + time_t ut = server.unixtime; + localtime_r(&ut,&tm); server.daylight_active = tm.tm_isdst; } +void checkChildrenDone(void) { + int statloc; + pid_t pid; + + /* If we have a diskless rdb child (note that we support only one concurrent + * child), we want to avoid collecting it's exit status and acting on it + * as long as we didn't finish to drain the pipe, since then we're at risk + * of starting a new fork and a new pipe before we're done with the previous + * one. */ + if (server.rdb_child_pid != -1 && server.rdb_pipe_conns) + return; + + if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) { + int exitcode = WEXITSTATUS(statloc); + int bysignal = 0; + + if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc); + + /* sigKillChildHandler catches the signal and calls exit(), but we + * must make sure not to flag lastbgsave_status, etc incorrectly. + * We could directly terminate the child process via SIGUSR1 + * without handling it, but in this case Valgrind will log an + * annoying error. */ + if (exitcode == SERVER_CHILD_NOERROR_RETVAL) { + bysignal = SIGUSR1; + exitcode = 1; + } + + if (pid == -1) { + serverLog(LL_WARNING,"wait3() returned an error: %s. " + "rdb_child_pid = %d, aof_child_pid = %d, module_child_pid = %d", + strerror(errno), + (int) server.rdb_child_pid, + (int) server.aof_child_pid, + (int) server.module_child_pid); + } else if (pid == server.rdb_child_pid) { + backgroundSaveDoneHandler(exitcode,bysignal); + if (!bysignal && exitcode == 0) receiveChildInfo(); + } else if (pid == server.aof_child_pid) { + backgroundRewriteDoneHandler(exitcode,bysignal); + if (!bysignal && exitcode == 0) receiveChildInfo(); + } else if (pid == server.module_child_pid) { + ModuleForkDoneHandler(exitcode,bysignal); + if (!bysignal && exitcode == 0) receiveChildInfo(); + } else { + if (!ldbRemoveChild(pid)) { + serverLog(LL_WARNING, + "Warning, detected child with unmatched pid: %ld", + (long)pid); + } + } + updateDictResizePolicy(); + closeChildInfoPipe(); + } +} + /* This is our timer interrupt, called server.hz times per second. * Here is where we do a number of things that need to be done asynchronously. * For instance: @@ -1807,8 +1874,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { * * Note that you can change the resolution altering the * LRU_CLOCK_RESOLUTION define. */ - unsigned long lruclock = getLRUClock(); - atomicSet(server.lruclock,lruclock); + server.lruclock = getLRUClock(); /* Record the max memory used since the server was started. */ if (zmalloc_used_memory() > server.stat_peak_memory) @@ -1884,47 +1950,16 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { /* Start a scheduled AOF rewrite if this was requested by the user while * a BGSAVE was in progress. */ - if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 && + if (!hasActiveChildProcess() && server.aof_rewrite_scheduled) { rewriteAppendOnlyFileBackground(); } /* Check if a background saving or AOF rewrite in progress terminated. */ - if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 || - ldbPendingChildren()) + if (hasActiveChildProcess() || ldbPendingChildren()) { - int statloc; - pid_t pid; - - if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) { - int exitcode = WEXITSTATUS(statloc); - int bysignal = 0; - - if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc); - - if (pid == -1) { - serverLog(LL_WARNING,"wait3() returned an error: %s. " - "rdb_child_pid = %d, aof_child_pid = %d", - strerror(errno), - (int) server.rdb_child_pid, - (int) server.aof_child_pid); - } else if (pid == server.rdb_child_pid) { - backgroundSaveDoneHandler(exitcode,bysignal); - if (!bysignal && exitcode == 0) receiveChildInfo(); - } else if (pid == server.aof_child_pid) { - backgroundRewriteDoneHandler(exitcode,bysignal); - if (!bysignal && exitcode == 0) receiveChildInfo(); - } else { - if (!ldbRemoveChild(pid)) { - serverLog(LL_WARNING, - "Warning, detected child with unmatched pid: %ld", - (long)pid); - } - } - updateDictResizePolicy(); - closeChildInfoPipe(); - } + checkChildrenDone(); } else { /* If there is not a background saving/rewrite in progress check if * we have to save/rewrite now. */ @@ -1952,8 +1987,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { /* Trigger an AOF rewrite if needed. */ if (server.aof_state == AOF_ON && - server.rdb_child_pid == -1 && - server.aof_child_pid == -1 && + !hasActiveChildProcess() && server.aof_rewrite_perc && server.aof_current_size > server.aof_rewrite_min_size) { @@ -1981,9 +2015,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { flushAppendOnlyFile(0); } - /* Close clients that need to be closed asynchronous */ - freeClientsInAsyncFreeQueue(); - /* Clear the paused clients flag if needed. */ clientsArePaused(); /* Don't check return value, just use the side effect.*/ @@ -2004,6 +2035,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { migrateCloseTimedoutSockets(); } + /* Stop the I/O threads if we don't have enough pending work. */ + stopThreadedIOIfNeeded(); + /* Start a scheduled BGSAVE if the corresponding flag is set. This is * useful when we are forced to postpone a BGSAVE because an AOF * rewrite is in progress. @@ -2011,7 +2045,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { * Note: this code must be after the replicationCron() call above so * make sure when refactoring this file to keep this order. This is useful * because we want to give priority to RDB savings for replication. */ - if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 && + if (!hasActiveChildProcess() && server.rdb_bgsave_scheduled && (server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY || server.lastbgsave_status == C_OK)) @@ -2032,6 +2066,11 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { void beforeSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); + /* Handle TLS pending data. (must be done before flushAppendOnlyFile) */ + tlsProcessPendingData(); + /* If tls still has pending unread data don't sleep at all. */ + aeSetDontWait(server.el, tlsHasPendingData()); + /* Call the Redis Cluster before sleep function. Note that this function * may change the state of Redis Cluster (from ok to fail or vice versa), * so it's a good idea to call it before serving the unblocked clients @@ -2075,7 +2114,10 @@ void beforeSleep(struct aeEventLoop *eventLoop) { flushAppendOnlyFile(0); /* Handle writes with pending output buffers. */ - handleClientsWithPendingWrites(); + handleClientsWithPendingWritesUsingThreads(); + + /* Close clients that need to be closed asynchronous */ + freeClientsInAsyncFreeQueue(); /* Before we are going to sleep, let the threads access the dataset by * releasing the GIL. Redis main thread will not touch anything at this @@ -2089,6 +2131,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { void afterSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); if (moduleCount()) moduleAcquireGIL(); + handleClientsWithPendingReadsUsingThreads(); } /* =========================== Server initialization ======================== */ @@ -2153,6 +2196,16 @@ void createSharedObjects(void) { shared.nullarray[2] = createObject(OBJ_STRING,sdsnew("*-1\r\n")); shared.nullarray[3] = createObject(OBJ_STRING,sdsnew("_\r\n")); + shared.emptymap[0] = NULL; + shared.emptymap[1] = NULL; + shared.emptymap[2] = createObject(OBJ_STRING,sdsnew("*0\r\n")); + shared.emptymap[3] = createObject(OBJ_STRING,sdsnew("%0\r\n")); + + shared.emptyset[0] = NULL; + shared.emptyset[1] = NULL; + shared.emptyset[2] = createObject(OBJ_STRING,sdsnew("*0\r\n")); + shared.emptyset[3] = createObject(OBJ_STRING,sdsnew("~0\r\n")); + for (j = 0; j < PROTO_SHARED_SELECT_CMDS; j++) { char dictid_str[64]; int dictid_len; @@ -2199,10 +2252,6 @@ void createSharedObjects(void) { void initServerConfig(void) { int j; - pthread_mutex_init(&server.next_client_id_mutex,NULL); - pthread_mutex_init(&server.lruclock_mutex,NULL); - pthread_mutex_init(&server.unixtime_mutex,NULL); - updateCachedTime(); getRandomHexChars(server.runid,CONFIG_RUN_ID_SIZE); server.runid[CONFIG_RUN_ID_SIZE] = '\0'; @@ -2215,11 +2264,13 @@ void initServerConfig(void) { server.dynamic_hz = CONFIG_DEFAULT_DYNAMIC_HZ; server.arch_bits = (sizeof(long) == 8) ? 64 : 32; server.port = CONFIG_DEFAULT_SERVER_PORT; + server.tls_port = CONFIG_DEFAULT_SERVER_TLS_PORT; server.tcp_backlog = CONFIG_DEFAULT_TCP_BACKLOG; server.bindaddr_count = 0; server.unixsocket = NULL; server.unixsocketperm = CONFIG_DEFAULT_UNIX_SOCKET_PERM; server.ipfd_count = 0; + server.tlsfd_count = 0; server.sofd = -1; server.protected_mode = CONFIG_DEFAULT_PROTECTED_MODE; server.gopher_enabled = CONFIG_DEFAULT_GOPHER_ENABLED; @@ -2228,6 +2279,7 @@ void initServerConfig(void) { server.maxidletime = CONFIG_DEFAULT_CLIENT_TIMEOUT; server.tcpkeepalive = CONFIG_DEFAULT_TCP_KEEPALIVE; server.active_expire_enabled = 1; + server.jemalloc_bg_thread = 1; server.active_defrag_enabled = CONFIG_DEFAULT_ACTIVE_DEFRAG; server.active_defrag_ignore_bytes = CONFIG_DEFAULT_DEFRAG_IGNORE_BYTES; server.active_defrag_threshold_lower = CONFIG_DEFAULT_DEFRAG_THRESHOLD_LOWER; @@ -2253,6 +2305,7 @@ void initServerConfig(void) { server.aof_rewrite_min_size = AOF_REWRITE_MIN_SIZE; server.aof_rewrite_base_size = 0; server.aof_rewrite_scheduled = 0; + server.aof_flush_sleep = 0; server.aof_last_fsync = time(NULL); server.aof_rewrite_time_last = -1; server.aof_rewrite_time_start = -1; @@ -2263,6 +2316,8 @@ void initServerConfig(void) { server.aof_flush_postponed_start = 0; server.aof_rewrite_incremental_fsync = CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC; server.rdb_save_incremental_fsync = CONFIG_DEFAULT_RDB_SAVE_INCREMENTAL_FSYNC; + server.rdb_key_save_delay = CONFIG_DEFAULT_RDB_KEY_SAVE_DELAY; + server.key_load_delay = CONFIG_DEFAULT_KEY_LOAD_DELAY; server.aof_load_truncated = CONFIG_DEFAULT_AOF_LOAD_TRUNCATED; server.aof_use_rdb_preamble = CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE; server.pidfile = NULL; @@ -2314,9 +2369,10 @@ void initServerConfig(void) { server.lazyfree_lazy_server_del = CONFIG_DEFAULT_LAZYFREE_LAZY_SERVER_DEL; server.always_show_logo = CONFIG_DEFAULT_ALWAYS_SHOW_LOGO; server.lua_time_limit = LUA_SCRIPT_TIME_LIMIT; + server.io_threads_num = CONFIG_DEFAULT_IO_THREADS_NUM; + server.io_threads_do_reads = CONFIG_DEFAULT_IO_THREADS_DO_READS; - unsigned int lruclock = getLRUClock(); - atomicSet(server.lruclock,lruclock); + server.lruclock = getLRUClock(); resetServerSaveParams(); appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */ @@ -2331,6 +2387,9 @@ void initServerConfig(void) { server.cached_master = NULL; server.master_initial_offset = -1; server.repl_state = REPL_STATE_NONE; + server.repl_transfer_tmpfile = NULL; + server.repl_transfer_fd = -1; + server.repl_transfer_s = NULL; server.repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT; server.repl_serve_stale_data = CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA; server.repl_slave_ro = CONFIG_DEFAULT_SLAVE_READ_ONLY; @@ -2339,6 +2398,7 @@ void initServerConfig(void) { server.repl_down_since = 0; /* Never connected, repl is down since EVER. */ server.repl_disable_tcp_nodelay = CONFIG_DEFAULT_REPL_DISABLE_TCP_NODELAY; server.repl_diskless_sync = CONFIG_DEFAULT_REPL_DISKLESS_SYNC; + server.repl_diskless_load = CONFIG_DEFAULT_REPL_DISKLESS_LOAD; server.repl_diskless_sync_delay = CONFIG_DEFAULT_REPL_DISKLESS_SYNC_DELAY; server.repl_ping_slave_period = CONFIG_DEFAULT_REPL_PING_SLAVE_PERIOD; server.repl_timeout = CONFIG_DEFAULT_REPL_TIMEOUT; @@ -2395,6 +2455,9 @@ void initServerConfig(void) { /* Latency monitor */ server.latency_monitor_threshold = CONFIG_DEFAULT_LATENCY_MONITOR_THRESHOLD; + /* Tracking. */ + server.tracking_table_max_fill = CONFIG_DEFAULT_TRACKING_TABLE_MAX_FILL; + /* Debugging */ server.assert_failed = "<no assertion failed>"; server.assert_file = "<no file>"; @@ -2714,6 +2777,7 @@ void initServer(void) { server.slaves = listCreate(); server.monitors = listCreate(); server.clients_pending_write = listCreate(); + server.clients_pending_read = listCreate(); server.slaveseldb = -1; /* Force to emit the first SELECT command. */ server.unblocked_clients = listCreate(); server.ready_keys = listCreate(); @@ -2722,6 +2786,11 @@ void initServer(void) { server.clients_paused = 0; server.system_memory_size = zmalloc_get_memory_size(); + if (server.tls_port && tlsConfigure(&server.tls_ctx_config) == C_ERR) { + serverLog(LL_WARNING, "Failed to configure TLS. Check logs for more info."); + exit(1); + } + createSharedObjects(); adjustOpenFilesLimit(); server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR); @@ -2737,6 +2806,9 @@ void initServer(void) { if (server.port != 0 && listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR) exit(1); + if (server.tls_port != 0 && + listenToPort(server.tls_port,server.tlsfd,&server.tlsfd_count) == C_ERR) + exit(1); /* Open the listening Unix domain socket. */ if (server.unixsocket != NULL) { @@ -2751,7 +2823,7 @@ void initServer(void) { } /* Abort if there are no listening sockets at all. */ - if (server.ipfd_count == 0 && server.sofd < 0) { + if (server.ipfd_count == 0 && server.tlsfd_count == 0 && server.sofd < 0) { serverLog(LL_WARNING, "Configured to not listen anywhere, exiting."); exit(1); } @@ -2775,7 +2847,13 @@ void initServer(void) { server.cronloops = 0; server.rdb_child_pid = -1; server.aof_child_pid = -1; + server.module_child_pid = -1; server.rdb_child_type = RDB_CHILD_TYPE_NONE; + server.rdb_pipe_conns = NULL; + server.rdb_pipe_numconns = 0; + server.rdb_pipe_numconns_writing = 0; + server.rdb_pipe_buff = NULL; + server.rdb_pipe_bufflen = 0; server.rdb_bgsave_scheduled = 0; server.child_info_pipe[0] = -1; server.child_info_pipe[1] = -1; @@ -2793,6 +2871,7 @@ void initServer(void) { server.stat_peak_memory = 0; server.stat_rdb_cow_bytes = 0; server.stat_aof_cow_bytes = 0; + server.stat_module_cow_bytes = 0; server.cron_malloc_stats.zmalloc_used = 0; server.cron_malloc_stats.process_rss = 0; server.cron_malloc_stats.allocator_allocated = 0; @@ -2821,6 +2900,14 @@ void initServer(void) { "Unrecoverable error creating server.ipfd file event."); } } + for (j = 0; j < server.tlsfd_count; j++) { + if (aeCreateFileEvent(server.el, server.tlsfd[j], AE_READABLE, + acceptTLSHandler,NULL) == AE_ERR) + { + serverPanic( + "Unrecoverable error creating server.tlsfd file event."); + } + } if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE, acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event."); @@ -2860,7 +2947,17 @@ void initServer(void) { scriptingInit(1); slowlogInit(); latencyMonitorInit(); +} + +/* Some steps in server initialization need to be done last (after modules + * are loaded). + * Specifically, creation of threads due to a race bug in ld.so, in which + * Thread Local Storage initialization collides with dlopen call. + * see: https://sourceware.org/bugzilla/show_bug.cgi?id=19329 */ +void InitServerLast() { bioInit(); + initThreadedIO(); + set_jemalloc_bg_thread(server.jemalloc_bg_thread); server.initial_memory_usage = zmalloc_used_memory(); } @@ -2899,6 +2996,8 @@ int populateCommandTableParseFlags(struct redisCommand *c, char *strflags) { c->flags |= CMD_STALE; } else if (!strcasecmp(flag,"no-monitor")) { c->flags |= CMD_SKIP_MONITOR; + } else if (!strcasecmp(flag,"no-slowlog")) { + c->flags |= CMD_SKIP_SLOWLOG; } else if (!strcasecmp(flag,"cluster-asking")) { c->flags |= CMD_ASKING; } else if (!strcasecmp(flag,"fast")) { @@ -3183,12 +3282,13 @@ void call(client *c, int flags) { /* Log the command into the Slow log if needed, and populate the * per-command statistics that we show in INFO commandstats. */ - if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) { + if (flags & CMD_CALL_SLOWLOG && !(c->cmd->flags & CMD_SKIP_SLOWLOG)) { char *latency_event = (c->cmd->flags & CMD_FAST) ? "fast-command" : "command"; latencyAddSampleIfNeeded(latency_event,duration/1000); slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration); } + if (flags & CMD_CALL_STATS) { /* use the real command that was executed (cmd and lastamc) may be * different, in case of MULTI-EXEC or re-written commands such as @@ -3256,6 +3356,16 @@ void call(client *c, int flags) { redisOpArrayFree(&server.also_propagate); } server.also_propagate = prev_also_propagate; + + /* If the client has keys tracking enabled for client side caching, + * make sure to remember the keys it fetched via this command. */ + if (c->cmd->flags & CMD_READONLY) { + client *caller = (c->flags & CLIENT_LUA && server.lua_caller) ? + server.lua_caller : c; + if (caller->flags & CLIENT_TRACKING) + trackingRememberKeys(caller); + } + server.stat_numcommands++; } @@ -3268,6 +3378,8 @@ void call(client *c, int flags) { * other operations can be performed by the caller. Otherwise * if C_ERR is returned the client was destroyed (i.e. after QUIT). */ int processCommand(client *c) { + moduleCallCommandFilters(c); + /* The QUIT command is handled separately. Normal command procs will * go through checking for replication and QUIT will cause trouble * when FORCE_REPLICATION is enabled and would be implemented in @@ -3301,11 +3413,12 @@ int processCommand(client *c) { /* Check if the user is authenticated. This check is skipped in case * the default user is flagged as "nopass" and is active. */ - int auth_required = !(DefaultUser->flags & USER_FLAG_NOPASS) && + int auth_required = (!(DefaultUser->flags & USER_FLAG_NOPASS) || + DefaultUser->flags & USER_FLAG_DISABLED) && !c->authenticated; - if (auth_required || DefaultUser->flags & USER_FLAG_DISABLED) { + if (auth_required) { /* AUTH and HELLO are valid even in non authenticated state. */ - if (c->cmd->proc != authCommand || c->cmd->proc == helloCommand) { + if (c->cmd->proc != authCommand && c->cmd->proc != helloCommand) { flagTransaction(c); addReply(c,shared.noautherr); return C_OK; @@ -3320,7 +3433,7 @@ int processCommand(client *c) { if (acl_retval == ACL_DENIED_CMD) addReplyErrorFormat(c, "-NOPERM this user has no permissions to run " - "the '%s' command or its subcommnad", c->cmd->name); + "the '%s' command or its subcommand", c->cmd->name); else addReplyErrorFormat(c, "-NOPERM this user has no permissions to access " @@ -3371,13 +3484,20 @@ int processCommand(client *c) { * is in MULTI/EXEC context? Error. */ if (out_of_memory && (c->cmd->flags & CMD_DENYOOM || - (c->flags & CLIENT_MULTI && c->cmd->proc != execCommand))) { + (c->flags & CLIENT_MULTI && + c->cmd->proc != execCommand && + c->cmd->proc != discardCommand))) + { flagTransaction(c); addReply(c, shared.oomerr); return C_OK; } } + /* Make sure to use a reasonable amount of memory for client side + * caching metadata. */ + if (server.tracking_clients) trackingLimitUsedSlots(); + /* Don't accept write commands if there are problems persisting on disk * and if this is a master instance. */ int deny_write_type = writeCommandsDeniedByDiskError(); @@ -3492,6 +3612,7 @@ void closeListeningSockets(int unlink_unix_socket) { int j; for (j = 0; j < server.ipfd_count; j++) close(server.ipfd[j]); + for (j = 0; j < server.tlsfd_count; j++) close(server.tlsfd[j]); if (server.sofd != -1) close(server.sofd); if (server.cluster_enabled) for (j = 0; j < server.cfd_count; j++) close(server.cfd[j]); @@ -3518,6 +3639,12 @@ int prepareForShutdown(int flags) { killRDBChild(); } + /* Kill module child if there is one. */ + if (server.module_child_pid != -1) { + serverLog(LL_WARNING,"There is a module fork child. Killing it!"); + TerminateModuleForkChild(server.module_child_pid,0); + } + if (server.aof_state != AOF_OFF) { /* Kill the AOF saving child as the AOF we already have may be longer * but contains the full dataset anyway. */ @@ -3672,6 +3799,7 @@ void addReplyCommand(client *c, struct redisCommand *cmd) { flagcount += addReplyCommandFlag(c,cmd,CMD_LOADING, "loading"); flagcount += addReplyCommandFlag(c,cmd,CMD_STALE, "stale"); flagcount += addReplyCommandFlag(c,cmd,CMD_SKIP_MONITOR, "skip_monitor"); + flagcount += addReplyCommandFlag(c,cmd,CMD_SKIP_SLOWLOG, "skip_slowlog"); flagcount += addReplyCommandFlag(c,cmd,CMD_ASKING, "asking"); flagcount += addReplyCommandFlag(c,cmd,CMD_FAST, "fast"); if ((cmd->getkeys_proc && !(cmd->flags & CMD_MODULE)) || @@ -3786,12 +3914,15 @@ sds genRedisInfoString(char *section) { time_t uptime = server.unixtime-server.stat_starttime; int j; struct rusage self_ru, c_ru; - int allsections = 0, defsections = 0; + int allsections = 0, defsections = 0, everything = 0, modules = 0; int sections = 0; if (section == NULL) section = "default"; allsections = strcasecmp(section,"all") == 0; defsections = strcasecmp(section,"default") == 0; + everything = strcasecmp(section,"everything") == 0; + modules = strcasecmp(section,"modules") == 0; + if (everything) allsections = 1; getrusage(RUSAGE_SELF, &self_ru); getrusage(RUSAGE_CHILDREN, &c_ru); @@ -3814,34 +3945,32 @@ sds genRedisInfoString(char *section) { call_uname = 0; } - unsigned int lruclock; - atomicGet(server.lruclock,lruclock); - info = sdscatprintf(info, + info = sdscatfmt(info, "# Server\r\n" "redis_version:%s\r\n" "redis_git_sha1:%s\r\n" - "redis_git_dirty:%d\r\n" - "redis_build_id:%llx\r\n" + "redis_git_dirty:%i\r\n" + "redis_build_id:%s\r\n" "redis_mode:%s\r\n" "os:%s %s %s\r\n" - "arch_bits:%d\r\n" + "arch_bits:%i\r\n" "multiplexing_api:%s\r\n" "atomicvar_api:%s\r\n" - "gcc_version:%d.%d.%d\r\n" - "process_id:%ld\r\n" + "gcc_version:%i.%i.%i\r\n" + "process_id:%I\r\n" "run_id:%s\r\n" - "tcp_port:%d\r\n" - "uptime_in_seconds:%jd\r\n" - "uptime_in_days:%jd\r\n" - "hz:%d\r\n" - "configured_hz:%d\r\n" - "lru_clock:%ld\r\n" + "tcp_port:%i\r\n" + "uptime_in_seconds:%I\r\n" + "uptime_in_days:%I\r\n" + "hz:%i\r\n" + "configured_hz:%i\r\n" + "lru_clock:%u\r\n" "executable:%s\r\n" "config_file:%s\r\n", REDIS_VERSION, redisGitSHA1(), strtol(redisGitDirty(),NULL,10) > 0, - (unsigned long long) redisBuildId(), + redisBuildIdString(), mode, name.sysname, name.release, name.machine, server.arch_bits, @@ -3852,14 +3981,14 @@ sds genRedisInfoString(char *section) { #else 0,0,0, #endif - (long) getpid(), + (int64_t) getpid(), server.runid, - server.port, - (intmax_t)uptime, - (intmax_t)(uptime/(3600*24)), + server.port ? server.port : server.tls_port, + (int64_t)uptime, + (int64_t)(uptime/(3600*24)), server.hz, server.config_hz, - (unsigned long) lruclock, + server.lruclock, server.executable ? server.executable : "", server.configfile ? server.configfile : ""); } @@ -3874,10 +4003,12 @@ sds genRedisInfoString(char *section) { "connected_clients:%lu\r\n" "client_recent_max_input_buffer:%zu\r\n" "client_recent_max_output_buffer:%zu\r\n" - "blocked_clients:%d\r\n", + "blocked_clients:%d\r\n" + "tracking_clients:%d\r\n", listLength(server.clients)-listLength(server.slaves), maxin, maxout, - server.blocked_clients); + server.blocked_clients, + server.tracking_clients); } /* Memory */ @@ -3983,8 +4114,11 @@ sds genRedisInfoString(char *section) { mh->allocator_rss_bytes, mh->rss_extra, mh->rss_extra_bytes, - mh->total_frag, /* this is the total RSS overhead, including fragmentation, */ - mh->total_frag_bytes, /* named so for backwards compatibility */ + mh->total_frag, /* This is the total RSS overhead, including + fragmentation, but not just it. This field + (and the next one) is named like that just + for backward compatibility. */ + mh->total_frag_bytes, freeMemoryGetNotCountedMemory(), mh->repl_backlog, mh->clients_slaves, @@ -4017,7 +4151,9 @@ sds genRedisInfoString(char *section) { "aof_current_rewrite_time_sec:%jd\r\n" "aof_last_bgrewrite_status:%s\r\n" "aof_last_write_status:%s\r\n" - "aof_last_cow_size:%zu\r\n", + "aof_last_cow_size:%zu\r\n" + "module_fork_in_progress:%d\r\n" + "module_fork_last_cow_size:%zu\r\n", server.loading, server.dirty, server.rdb_child_pid != -1, @@ -4035,9 +4171,11 @@ sds genRedisInfoString(char *section) { -1 : time(NULL)-server.aof_rewrite_time_start), (server.aof_lastbgrewrite_status == C_OK) ? "ok" : "err", (server.aof_last_write_status == C_OK) ? "ok" : "err", - server.stat_aof_cow_bytes); + server.stat_aof_cow_bytes, + server.module_child_pid != -1, + server.stat_module_cow_bytes); - if (server.aof_state != AOF_OFF) { + if (server.aof_enabled) { info = sdscatprintf(info, "aof_current_size:%lld\r\n" "aof_base_size:%lld\r\n" @@ -4117,7 +4255,8 @@ sds genRedisInfoString(char *section) { "active_defrag_hits:%lld\r\n" "active_defrag_misses:%lld\r\n" "active_defrag_key_hits:%lld\r\n" - "active_defrag_key_misses:%lld\r\n", + "active_defrag_key_misses:%lld\r\n" + "tracking_used_slots:%lld\r\n", server.stat_numconnections, server.stat_numcommands, getInstantaneousMetric(STATS_METRIC_COMMAND), @@ -4143,7 +4282,8 @@ sds genRedisInfoString(char *section) { server.stat_active_defrag_hits, server.stat_active_defrag_misses, server.stat_active_defrag_key_hits, - server.stat_active_defrag_key_misses); + server.stat_active_defrag_key_misses, + trackingGetUsedSlots()); } /* Replication */ @@ -4191,7 +4331,7 @@ sds genRedisInfoString(char *section) { if (server.repl_state != REPL_STATE_CONNECTED) { info = sdscatprintf(info, "master_link_down_since_seconds:%jd\r\n", - (intmax_t)server.unixtime-server.repl_down_since); + (intmax_t)(server.unixtime-server.repl_down_since)); } info = sdscatprintf(info, "slave_priority:%d\r\n" @@ -4227,7 +4367,7 @@ sds genRedisInfoString(char *section) { long lag = 0; if (slaveip[0] == '\0') { - if (anetPeerToString(slave->fd,ip,sizeof(ip),&port) == -1) + if (connPeerToString(slave->conn,ip,sizeof(ip),&port) == -1) continue; slaveip = ip; } @@ -4289,6 +4429,13 @@ sds genRedisInfoString(char *section) { (long)c_ru.ru_utime.tv_sec, (long)c_ru.ru_utime.tv_usec); } + /* Modules */ + if (allsections || defsections || !strcasecmp(section,"modules")) { + if (sections++) info = sdscat(info,"\r\n"); + info = sdscatprintf(info,"# Modules\r\n"); + info = genModulesInfoString(info); + } + /* Command statistics */ if (allsections || !strcasecmp(section,"commandstats")) { if (sections++) info = sdscat(info,"\r\n"); @@ -4334,6 +4481,17 @@ sds genRedisInfoString(char *section) { } } } + + /* Get info from modules. + * if user asked for "everything" or "modules", or a specific section + * that's not found yet. */ + if (everything || modules || + (!allsections && !defsections && sections==0)) { + info = modulesCollectInfo(info, + everything || modules ? NULL: section, + 0, /* not a crash report */ + sections); + } return info; } @@ -4344,7 +4502,9 @@ void infoCommand(client *c) { addReply(c,shared.syntaxerr); return; } - addReplyBulkSds(c, genRedisInfoString(section)); + sds info = genRedisInfoString(section); + addReplyVerbatim(c,info,sdslen(info),"txt"); + sdsfree(info); } void monitorCommand(client *c) { @@ -4461,7 +4621,7 @@ void redisAsciiArt(void) { if (!show_logo) { serverLog(LL_NOTICE, "Running mode=%s, port=%d.", - mode, server.port + mode, server.port ? server.port : server.tls_port ); } else { snprintf(buf,1024*16,ascii_logo, @@ -4469,7 +4629,7 @@ void redisAsciiArt(void) { redisGitSHA1(), strtol(redisGitDirty(),NULL,10) > 0, (sizeof(long) == 8) ? "64" : "32", - mode, server.port, + mode, server.port ? server.port : server.tls_port, (long) getpid() ); serverLogRaw(LL_NOTICE|LL_RAW,buf); @@ -4500,6 +4660,7 @@ static void sigShutdownHandler(int sig) { rdbRemoveTempFile(getpid()); exit(1); /* Exit with an error since this was not a clean shutdown. */ } else if (server.loading) { + serverLogFromHandler(LL_WARNING, "Received shutdown signal during loading, exiting now."); exit(0); } @@ -4530,6 +4691,61 @@ void setupSignalHandlers(void) { return; } +/* This is the signal handler for children process. It is currently useful + * in order to track the SIGUSR1, that we send to a child in order to terminate + * it in a clean way, without the parent detecting an error and stop + * accepting writes because of a write error condition. */ +static void sigKillChildHandler(int sig) { + UNUSED(sig); + serverLogFromHandler(LL_WARNING, "Received SIGUSR1 in child, exiting now."); + exitFromChild(SERVER_CHILD_NOERROR_RETVAL); +} + +void setupChildSignalHandlers(void) { + struct sigaction act; + + /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction is used. + * Otherwise, sa_handler is used. */ + sigemptyset(&act.sa_mask); + act.sa_flags = 0; + act.sa_handler = sigKillChildHandler; + sigaction(SIGUSR1, &act, NULL); + return; +} + +int redisFork() { + int childpid; + long long start = ustime(); + if ((childpid = fork()) == 0) { + /* Child */ + closeListeningSockets(0); + setupChildSignalHandlers(); + } else { + /* Parent */ + server.stat_fork_time = ustime()-start; + server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */ + latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000); + if (childpid == -1) { + return -1; + } + updateDictResizePolicy(); + } + return childpid; +} + +void sendChildCOWInfo(int ptype, char *pname) { + size_t private_dirty = zmalloc_get_private_dirty(-1); + + if (private_dirty) { + serverLog(LL_NOTICE, + "%s: %zu MB of memory used by copy-on-write", + pname, private_dirty/(1024*1024)); + } + + server.child_info_data.cow_size = private_dirty; + sendChildInfo(ptype); +} + void memtest(size_t megabytes, int passes); /* Returns 1 if there is --sentinel among the arguments or if @@ -4556,12 +4772,14 @@ void loadDataFromDisk(void) { (float)(ustime()-start)/1000000); /* Restore the replication ID / offset from the RDB file. */ - if ((server.masterhost || (server.cluster_enabled && nodeIsSlave(server.cluster->myself)))&& + if ((server.masterhost || + (server.cluster_enabled && + nodeIsSlave(server.cluster->myself))) && rsi.repl_id_is_set && rsi.repl_offset != -1 && /* Note that older implementations may save a repl_stream_db - * of -1 inside the RDB file in a wrong way, see more information - * in function rdbPopulateSaveInfo. */ + * of -1 inside the RDB file in a wrong way, see more + * information in function rdbPopulateSaveInfo. */ rsi.repl_stream_db != -1) { memcpy(server.replid,rsi.repl_id,sizeof(server.replid)); @@ -4594,7 +4812,7 @@ void redisSetProcTitle(char *title) { setproctitle("%s %s:%d%s", title, server.bindaddr_count ? server.bindaddr[0] : "*", - server.port, + server.port ? server.port : server.tls_port, server_mode); #else UNUSED(title); @@ -4715,8 +4933,6 @@ int main(int argc, char **argv) { return sha1Test(argc, argv); } else if (!strcasecmp(argv[2], "util")) { return utilTest(argc, argv); - } else if (!strcasecmp(argv[2], "sds")) { - return sdsTest(argc, argv); } else if (!strcasecmp(argv[2], "endianconv")) { return endianconvTest(argc, argv); } else if (!strcasecmp(argv[2], "crc64")) { @@ -4739,14 +4955,15 @@ int main(int argc, char **argv) { srand(time(NULL)^getpid()); gettimeofday(&tv,NULL); - char hashseed[16]; - getRandomHexChars(hashseed,sizeof(hashseed)); - dictSetHashFunctionSeed((uint8_t*)hashseed); + uint8_t hashseed[16]; + getRandomBytes(hashseed,sizeof(hashseed)); + dictSetHashFunctionSeed(hashseed); server.sentinel_mode = checkForSentinelMode(argc,argv); initServerConfig(); ACLInit(); /* The ACL subsystem must be initialized ASAP because the basic networking code and client creation depends on it. */ moduleInitModulesSystem(); + tlsInit(); /* Store the executable path and arguments in a safe place in order * to be able to restart the server later. */ @@ -4870,6 +5087,7 @@ int main(int argc, char **argv) { #endif moduleLoadFromQueue(); ACLLoadUsersAtStartup(); + InitServerLast(); loadDataFromDisk(); if (server.cluster_enabled) { if (verifyClusterConfigWithData() == C_ERR) { @@ -4879,11 +5097,12 @@ int main(int argc, char **argv) { exit(1); } } - if (server.ipfd_count > 0) + if (server.ipfd_count > 0 || server.tlsfd_count > 0) serverLog(LL_NOTICE,"Ready to accept connections"); if (server.sofd > 0) serverLog(LL_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket); } else { + InitServerLast(); sentinelIsRunning(); } |