diff options
Diffstat (limited to 'src/server.c')
-rw-r--r-- | src/server.c | 249 |
1 files changed, 170 insertions, 79 deletions
diff --git a/src/server.c b/src/server.c index 84f21fed3..298834eab 100644 --- a/src/server.c +++ b/src/server.c @@ -36,7 +36,6 @@ #include "atomicvar.h" #include "mt19937-64.h" #include "functions.h" -#include "hdr_alloc.h" #include <time.h> #include <signal.h> @@ -1016,18 +1015,8 @@ void databasesCron(void) { } } -/* We take a cached value of the unix time in the global state because with - * virtual memory and aging there is to store the current time in objects at - * every object access, and accuracy is not needed. To access a global var is - * a lot faster than calling time(NULL). - * - * This function should be fast because it is called at every command execution - * in call(), so it is possible to decide if to update the daylight saving - * info or not using the 'update_daylight_info' argument. Normally we update - * such info only when calling this function from serverCron() but not when - * calling it from call(). */ -void updateCachedTime(int update_daylight_info) { - server.ustime = ustime(); +static inline void updateCachedTimeWithUs(int update_daylight_info, const long long ustime) { + server.ustime = ustime; server.mstime = server.ustime / 1000; time_t unixtime = server.mstime / 1000; atomicSet(server.unixtime, unixtime); @@ -1045,6 +1034,21 @@ void updateCachedTime(int update_daylight_info) { } } +/* We take a cached value of the unix time in the global state because with + * virtual memory and aging there is to store the current time in objects at + * every object access, and accuracy is not needed. To access a global var is + * a lot faster than calling time(NULL). + * + * This function should be fast because it is called at every command execution + * in call(), so it is possible to decide if to update the daylight saving + * info or not using the 'update_daylight_info' argument. Normally we update + * such info only when calling this function from serverCron() but not when + * calling it from call(). */ +void updateCachedTime(int update_daylight_info) { + const long long us = ustime(); + updateCachedTimeWithUs(update_daylight_info, us); +} + void checkChildrenDone(void) { int statloc = 0; pid_t pid; @@ -1209,10 +1213,16 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { cronUpdateMemoryStats(); - /* We received a SIGTERM, shutting down here in a safe way, as it is + /* We received a SIGTERM or SIGINT, shutting down here in a safe way, as it is * not ok doing so inside the signal handler. */ if (server.shutdown_asap && !isShutdownInitiated()) { - if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0); + int shutdownFlags = SHUTDOWN_NOFLAGS; + if (server.last_sig_received == SIGINT && server.shutdown_on_sigint) + shutdownFlags = server.shutdown_on_sigint; + else if (server.last_sig_received == SIGTERM && server.shutdown_on_sigterm) + shutdownFlags = server.shutdown_on_sigterm; + + if (prepareForShutdown(shutdownFlags) == C_OK) exit(0); } else if (isShutdownInitiated()) { if (server.mstime >= server.shutdown_mstime || isReadyToShutdown()) { if (finishShutdown() == C_OK) exit(0); @@ -1296,13 +1306,12 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { if (server.aof_state == AOF_ON && !hasActiveChildProcess() && server.aof_rewrite_perc && - server.aof_current_size > server.aof_rewrite_min_size && - !aofRewriteLimited()) + server.aof_current_size > server.aof_rewrite_min_size) { long long base = server.aof_rewrite_base_size ? server.aof_rewrite_base_size : 1; long long growth = (server.aof_current_size*100/base) - 100; - if (growth >= server.aof_rewrite_perc) { + if (growth >= server.aof_rewrite_perc && !aofRewriteLimited()) { serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth); rewriteAppendOnlyFileBackground(); } @@ -1326,8 +1335,11 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { * however to try every second is enough in case of 'hz' is set to * a higher frequency. */ run_with_period(1000) { - if (server.aof_state == AOF_ON && server.aof_last_write_status == C_ERR) - flushAppendOnlyFile(0); + if ((server.aof_state == AOF_ON || server.aof_state == AOF_WAIT_REWRITE) && + server.aof_last_write_status == C_ERR) + { + flushAppendOnlyFile(0); + } } /* Clear the paused clients state if needed. */ @@ -1466,6 +1478,7 @@ void whileBlockedCron() { if (prepareForShutdown(SHUTDOWN_NOSAVE) == C_OK) exit(0); serverLog(LL_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information"); server.shutdown_asap = 0; + server.last_sig_received = 0; } } @@ -1509,6 +1522,8 @@ void beforeSleep(struct aeEventLoop *eventLoop) { uint64_t processed = 0; processed += handleClientsWithPendingReadsUsingThreads(); processed += tlsProcessPendingData(); + if (server.aof_state == AOF_ON || server.aof_state == AOF_WAIT_REWRITE) + flushAppendOnlyFile(0); processed += handleClientsWithPendingWrites(); processed += freeClientsInAsyncFreeQueue(); server.events_processed_while_blocked += processed; @@ -1584,15 +1599,21 @@ void beforeSleep(struct aeEventLoop *eventLoop) { * client side caching protocol in broadcasting (BCAST) mode. */ trackingBroadcastInvalidationMessages(); - /* Write the AOF buffer on disk */ + /* Try to process blocked clients every once in while. + * + * Example: A module calls RM_SignalKeyAsReady from within a timer callback + * (So we don't visit processCommand() at all). + * + * must be done before flushAppendOnlyFile, in case of appendfsync=always, + * since the unblocked clients may write data. */ + handleClientsBlockedOnKeys(); + + /* Write the AOF buffer on disk, + * must be done before handleClientsWithPendingWritesUsingThreads, + * in case of appendfsync=always. */ if (server.aof_state == AOF_ON || server.aof_state == AOF_WAIT_REWRITE) flushAppendOnlyFile(0); - /* Try to process blocked clients every once in while. Example: A module - * calls RM_SignalKeyAsReady from within a timer callback (So we don't - * visit processCommand() at all). */ - handleClientsBlockedOnKeys(); - /* Handle writes with pending output buffers. */ handleClientsWithPendingWritesUsingThreads(); @@ -1878,15 +1899,6 @@ void initServerConfig(void) { appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */ appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */ - /* Specify the allocation function for the hdr histogram */ - hdrAllocFuncs hdrallocfn = { - .mallocFn = zmalloc, - .callocFn = zcalloc_num, - .reallocFn = zrealloc, - .freeFn = zfree, - }; - hdrSetAllocators(&hdrallocfn); - /* Replication related */ server.masterhost = NULL; server.masterport = 6379; @@ -2286,6 +2298,7 @@ int listenToPort(int port, socketFds *sfd) { closeSocketListeners(sfd); return C_ERR; } + if (server.socket_mark_id > 0) anetSetSockMarkId(NULL, sfd->fd[sfd->count], server.socket_mark_id); anetNonBlock(NULL,sfd->fd[sfd->count]); anetCloexec(sfd->fd[sfd->count]); sfd->count++; @@ -2338,6 +2351,7 @@ void resetServerStats(void) { } server.stat_aof_rewrites = 0; server.stat_rdb_saves = 0; + server.stat_aofrw_consecutive_failures = 0; atomicSet(server.stat_net_input_bytes, 0); atomicSet(server.stat_net_output_bytes, 0); server.stat_unexpected_error_replies = 0; @@ -2539,6 +2553,7 @@ void initServer(void) { server.aof_last_write_status = C_OK; server.aof_last_write_errno = 0; server.repl_good_slaves_count = 0; + server.last_sig_received = 0; /* Create the timer callback, this is our way to process many background * operations incrementally, like clients timeout, eviction of unaccessed @@ -2978,6 +2993,11 @@ struct redisCommand *lookupCommandOrOriginal(robj **argv ,int argc) { return cmd; } +/* Commands arriving from the master client or AOF client, should never be rejected. */ +int mustObeyClient(client *c) { + return c->id == CLIENT_ID_AOF || c->flags & CLIENT_MASTER; +} + static int shouldPropagate(int target) { if (!server.replication_allowed || target == PROPAGATE_NONE || server.loading) return 0; @@ -3205,7 +3225,6 @@ int incrCommandStatsOnError(struct redisCommand *cmd, int flags) { */ void call(client *c, int flags) { long long dirty; - monotime call_timer; uint64_t client_old_flags = c->flags; struct redisCommand *real_cmd = c->realcmd; @@ -3230,22 +3249,34 @@ void call(client *c, int flags) { dirty = server.dirty; incrCommandStatsOnError(NULL, 0); + const long long call_timer = ustime(); + /* Update cache time, in case we have nested calls we want to * update only on the first call*/ if (server.fixed_time_expire++ == 0) { - updateCachedTime(0); + updateCachedTimeWithUs(0,call_timer); } - server.in_nested_call++; - elapsedStart(&call_timer); + monotime monotonic_start = 0; + if (monotonicGetType() == MONOTONIC_CLOCK_HW) + monotonic_start = getMonotonicUs(); + + server.in_nested_call++; c->cmd->proc(c); - const long duration = elapsedUs(call_timer); + server.in_nested_call--; + + /* In order to avoid performance implication due to querying the clock using a system call 3 times, + * we use a monotonic clock, when we are sure its cost is very low, and fall back to non-monotonic call otherwise. */ + ustime_t duration; + if (monotonicGetType() == MONOTONIC_CLOCK_HW) + duration = getMonotonicUs() - monotonic_start; + else + duration = ustime() - call_timer; + c->duration = duration; dirty = server.dirty-dirty; if (dirty < 0) dirty = 0; - server.in_nested_call--; - /* Update failed command calls if required. */ if (!incrCommandStatsOnError(real_cmd, ERROR_COMMAND_FAILED) && c->deferred_reply_errors) { @@ -3410,6 +3441,8 @@ void rejectCommand(client *c, robj *reply) { } void rejectCommandSds(client *c, sds s) { + flagTransaction(c); + if (c->cmd) c->cmd->rejected_calls++; if (c->cmd && c->cmd->proc == execCommand) { execCommandAbort(c, s); sdsfree(s); @@ -3420,8 +3453,6 @@ void rejectCommandSds(client *c, sds s) { } void rejectCommandFormat(client *c, const char *fmt, ...) { - if (c->cmd) c->cmd->rejected_calls++; - flagTransaction(c); va_list ap; va_start(ap,fmt); sds s = sdscatvprintf(sdsempty(),fmt,ap); @@ -3475,6 +3506,54 @@ void populateCommandMovableKeys(struct redisCommand *cmd) { cmd->flags |= CMD_MOVABLE_KEYS; } +/* Check if c->cmd exists, fills `err` with details in case it doesn't. + * Return 1 if exists. */ +int commandCheckExistence(client *c, sds *err) { + if (c->cmd) + return 1; + if (!err) + return 0; + if (isContainerCommandBySds(c->argv[0]->ptr)) { + /* If we can't find the command but argv[0] by itself is a command + * it means we're dealing with an invalid subcommand. Print Help. */ + sds cmd = sdsnew((char *)c->argv[0]->ptr); + sdstoupper(cmd); + *err = sdsnew(NULL); + *err = sdscatprintf(*err, "unknown subcommand '%.128s'. Try %s HELP.", + (char *)c->argv[1]->ptr, cmd); + sdsfree(cmd); + } else { + sds args = sdsempty(); + int i; + for (i=1; i < c->argc && sdslen(args) < 128; i++) + args = sdscatprintf(args, "'%.*s' ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr); + *err = sdsnew(NULL); + *err = sdscatprintf(*err, "unknown command '%.128s', with args beginning with: %s", + (char*)c->argv[0]->ptr, args); + sdsfree(args); + } + /* Make sure there are no newlines in the string, otherwise invalid protocol + * is emitted (The args come from the user, they may contain any character). */ + sdsmapchars(*err, "\r\n", " ", 2); + return 0; +} + +/* Check if c->argc is valid for c->cmd, fills `err` with details in case it isn't. + * Return 1 if valid. */ +int commandCheckArity(client *c, sds *err) { + if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) || + (c->argc < -c->cmd->arity)) + { + if (err) { + *err = sdsnew(NULL); + *err = sdscatprintf(*err, "wrong number of arguments for '%s' command", c->cmd->fullname); + } + return 0; + } + + return 1; +} + /* If this function gets called we already read a whole * command, arguments are in the client argv/argc fields. * processCommand() execute the command or prepare the @@ -3514,29 +3593,13 @@ int processCommand(client *c) { /* Now lookup the command and check ASAP about trivial error conditions * such as wrong arity, bad command name and so forth. */ c->cmd = c->lastcmd = c->realcmd = lookupCommand(c->argv,c->argc); - if (!c->cmd) { - if (isContainerCommandBySds(c->argv[0]->ptr)) { - /* If we can't find the command but argv[0] by itself is a command - * it means we're dealing with an invalid subcommand. Print Help. */ - sds cmd = sdsnew((char *)c->argv[0]->ptr); - sdstoupper(cmd); - rejectCommandFormat(c, "Unknown subcommand '%.128s'. Try %s HELP.", - (char *)c->argv[1]->ptr, cmd); - sdsfree(cmd); - return C_OK; - } - sds args = sdsempty(); - int i; - for (i=1; i < c->argc && sdslen(args) < 128; i++) - args = sdscatprintf(args, "'%.*s' ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr); - rejectCommandFormat(c,"unknown command '%s', with args beginning with: %s", - (char*)c->argv[0]->ptr, args); - sdsfree(args); + sds err; + if (!commandCheckExistence(c, &err)) { + rejectCommandSds(c, err); return C_OK; - } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) || - (c->argc < -c->cmd->arity)) - { - rejectCommandFormat(c,"wrong number of arguments for '%s' command", c->cmd->fullname); + } + if (!commandCheckArity(c, &err)) { + rejectCommandSds(c, err); return C_OK; } @@ -3569,6 +3632,7 @@ int processCommand(client *c) { (c->cmd->proc == execCommand && (c->mstate.cmd_flags & (CMD_WRITE | CMD_MAY_REPLICATE))); int is_deny_async_loading_command = (c->cmd->flags & CMD_NO_ASYNC_LOADING) || (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_NO_ASYNC_LOADING)); + int obey_client = mustObeyClient(c); if (authRequired(c)) { /* AUTH and HELLO and no auth commands are valid even in @@ -3620,23 +3684,20 @@ int processCommand(client *c) { * 1) The sender of this command is our master. * 2) The command has no key arguments. */ if (server.cluster_enabled && - !(c->flags & CLIENT_MASTER) && - !(c->flags & CLIENT_SCRIPT && - server.script_caller->flags & CLIENT_MASTER) && + !mustObeyClient(c) && !(!(c->cmd->flags&CMD_MOVABLE_KEYS) && c->cmd->key_specs_num == 0 && c->cmd->proc != execCommand)) { - int hashslot; int error_code; clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc, - &hashslot,&error_code); + &c->slot,&error_code); if (n == NULL || n != server.cluster->myself) { if (c->cmd->proc == execCommand) { discardTransaction(c); } else { flagTransaction(c); } - clusterRedirectClient(c,n,hashslot,error_code); + clusterRedirectClient(c,n,c->slot,error_code); c->cmd->rejected_calls++; return C_OK; } @@ -3706,15 +3767,29 @@ int processCommand(client *c) { if (server.tracking_clients) trackingLimitUsedSlots(); /* Don't accept write commands if there are problems persisting on disk - * and if this is a master instance. */ + * unless coming from our master, in which case check the replica ignore + * disk write error config to either log or crash. */ int deny_write_type = writeCommandsDeniedByDiskError(); if (deny_write_type != DISK_ERROR_TYPE_NONE && - server.masterhost == NULL && - (is_write_command ||c->cmd->proc == pingCommand)) + (is_write_command || c->cmd->proc == pingCommand)) { - sds err = writeCommandsGetDiskErrorMessage(deny_write_type); - rejectCommandSds(c, err); - return C_OK; + if (obey_client) { + if (!server.repl_ignore_disk_write_error && c->cmd->proc != pingCommand) { + serverPanic("Replica was unable to write command to disk."); + } else { + static mstime_t last_log_time_ms = 0; + const mstime_t log_interval_ms = 10000; + if (server.mstime > last_log_time_ms + log_interval_ms) { + last_log_time_ms = server.mstime; + serverLog(LL_WARNING, "Replica is applying a command even though " + "it is unable to write to disk."); + } + } + } else { + sds err = writeCommandsGetDiskErrorMessage(deny_write_type); + rejectCommandSds(c, err); + return C_OK; + } } /* Don't accept write commands if there are not enough good slaves and @@ -3727,7 +3802,7 @@ int processCommand(client *c) { /* Don't accept write commands if this is a read only slave. But * accept write commands if this is our master. */ if (server.masterhost && server.repl_slave_ro && - !(c->flags & CLIENT_MASTER) && + !obey_client && is_write_command) { rejectCommand(c, shared.roslaveerr); @@ -3949,6 +4024,7 @@ static void cancelShutdown(void) { server.shutdown_asap = 0; server.shutdown_flags = 0; server.shutdown_mstime = 0; + server.last_sig_received = 0; replyToClientsBlockedOnShutdown(); unpauseClients(PAUSE_DURING_SHUTDOWN); } @@ -4309,6 +4385,7 @@ void addReplyCommandArgList(client *c, struct redisCommandArg *args, int num_arg if (args[j].token) maplen++; if (args[j].summary) maplen++; if (args[j].since) maplen++; + if (args[j].deprecated_since) maplen++; if (args[j].flags) maplen++; if (args[j].type == ARG_TYPE_ONEOF || args[j].type == ARG_TYPE_BLOCK) maplen++; @@ -4336,6 +4413,10 @@ void addReplyCommandArgList(client *c, struct redisCommandArg *args, int num_arg addReplyBulkCString(c, "since"); addReplyBulkCString(c, args[j].since); } + if (args[j].deprecated_since) { + addReplyBulkCString(c, "deprecated_since"); + addReplyBulkCString(c, args[j].deprecated_since); + } if (args[j].flags) { addReplyBulkCString(c, "flags"); addReplyFlagsForArg(c, args[j].flags); @@ -4562,6 +4643,7 @@ void addReplyCommandDocs(client *c, struct redisCommand *cmd) { long maplen = 1; if (cmd->summary) maplen++; if (cmd->since) maplen++; + if (cmd->flags & CMD_MODULE) maplen++; if (cmd->complexity) maplen++; if (cmd->doc_flags) maplen++; if (cmd->deprecated_since) maplen++; @@ -4588,6 +4670,10 @@ void addReplyCommandDocs(client *c, struct redisCommand *cmd) { addReplyBulkCString(c, "complexity"); addReplyBulkCString(c, cmd->complexity); } + if (cmd->flags & CMD_MODULE) { + addReplyBulkCString(c, "module"); + addReplyBulkCString(c, moduleNameFromCommand(cmd)); + } if (cmd->doc_flags) { addReplyBulkCString(c, "doc_flags"); addReplyDocFlagsForCommand(c, cmd); @@ -5123,6 +5209,7 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { "redis_mode:%s\r\n" "os:%s %s %s\r\n" "arch_bits:%i\r\n" + "monotonic_clock:%s\r\n" "multiplexing_api:%s\r\n" "atomicvar_api:%s\r\n" "gcc_version:%i.%i.%i\r\n" @@ -5146,6 +5233,7 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { mode, name.sysname, name.release, name.machine, server.arch_bits, + monotonicInfoString(), aeGetApiName(), REDIS_ATOMIC_API, #ifdef __GNUC__ @@ -5383,6 +5471,7 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { "aof_current_rewrite_time_sec:%jd\r\n" "aof_last_bgrewrite_status:%s\r\n" "aof_rewrites:%lld\r\n" + "aof_rewrites_consecutive_failures:%lld\r\n" "aof_last_write_status:%s\r\n" "aof_last_cow_size:%zu\r\n" "module_fork_in_progress:%d\r\n" @@ -5414,6 +5503,7 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { -1 : time(NULL)-server.aof_rewrite_time_start), (server.aof_lastbgrewrite_status == C_OK) ? "ok" : "err", server.stat_aof_rewrites, + server.stat_aofrw_consecutive_failures, (server.aof_last_write_status == C_OK && aof_bio_fsync_status == C_OK) ? "ok" : "err", server.stat_aof_cow_bytes, @@ -6227,6 +6317,7 @@ static void sigShutdownHandler(int sig) { serverLogFromHandler(LL_WARNING, msg); server.shutdown_asap = 1; + server.last_sig_received = sig; } void setupSignalHandlers(void) { |