summary refs log tree commit diff
path: root/src/server.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/server.c')
-rw-r--r-- src/server.c 249
1 files changed, 170 insertions, 79 deletions
diff --git a/src/server.c b/src/server.c
index 84f21fed3..298834eab 100644
--- a/src/server.c
+++ b/src/server.c
@@ -36,7 +36,6 @@
#include "atomicvar.h"
#include "mt19937-64.h"
#include "functions.h"
-#include "hdr_alloc.h"
#include <time.h>
#include <signal.h>
@@ -1016,18 +1015,8 @@ void databasesCron(void) {
}
}
-/* We take a cached value of the unix time in the global state because with
- * virtual memory and aging there is to store the current time in objects at
- * every object access, and accuracy is not needed. To access a global var is
- * a lot faster than calling time(NULL).
- *
- * This function should be fast because it is called at every command execution
- * in call(), so it is possible to decide if to update the daylight saving
- * info or not using the 'update_daylight_info' argument. Normally we update
- * such info only when calling this function from serverCron() but not when
- * calling it from call(). */
-void updateCachedTime(int update_daylight_info) {
- server.ustime = ustime();
+static inline void updateCachedTimeWithUs(int update_daylight_info, const long long ustime) {
+ server.ustime = ustime;
server.mstime = server.ustime / 1000;
time_t unixtime = server.mstime / 1000;
atomicSet(server.unixtime, unixtime);
@@ -1045,6 +1034,21 @@ void updateCachedTime(int update_daylight_info) {
}
}
+/* We keep a cached value of the unix time in the global state because with
+ * virtual memory and aging we need to store the current time in objects at
+ * every object access, and accuracy is not needed. Accessing a global var is
+ * a lot faster than calling time(NULL).
+ *
+ * The time refresh has to be fast because it runs at every command execution
+ * in call(), so the 'update_daylight_info' argument lets the caller decide
+ * whether to also update the daylight saving info. Normally we update such
+ * info only when calling this function from serverCron() but not on the
+ * call() path. This wrapper reads ustime() and hands it to updateCachedTimeWithUs(). */
+void updateCachedTime(int update_daylight_info) {
+ const long long us = ustime();
+ updateCachedTimeWithUs(update_daylight_info, us);
+}
+
void checkChildrenDone(void) {
int statloc = 0;
pid_t pid;
@@ -1209,10 +1213,16 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
cronUpdateMemoryStats();
- /* We received a SIGTERM, shutting down here in a safe way, as it is
+ /* We received a SIGTERM or SIGINT, shutting down here in a safe way, as it is
* not ok doing so inside the signal handler. */
if (server.shutdown_asap && !isShutdownInitiated()) {
- if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0);
+ int shutdownFlags = SHUTDOWN_NOFLAGS;
+ if (server.last_sig_received == SIGINT && server.shutdown_on_sigint)
+ shutdownFlags = server.shutdown_on_sigint;
+ else if (server.last_sig_received == SIGTERM && server.shutdown_on_sigterm)
+ shutdownFlags = server.shutdown_on_sigterm;
+
+ if (prepareForShutdown(shutdownFlags) == C_OK) exit(0);
} else if (isShutdownInitiated()) {
if (server.mstime >= server.shutdown_mstime || isReadyToShutdown()) {
if (finishShutdown() == C_OK) exit(0);
@@ -1296,13 +1306,12 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
if (server.aof_state == AOF_ON &&
!hasActiveChildProcess() &&
server.aof_rewrite_perc &&
- server.aof_current_size > server.aof_rewrite_min_size &&
- !aofRewriteLimited())
+ server.aof_current_size > server.aof_rewrite_min_size)
{
long long base = server.aof_rewrite_base_size ?
server.aof_rewrite_base_size : 1;
long long growth = (server.aof_current_size*100/base) - 100;
- if (growth >= server.aof_rewrite_perc) {
+ if (growth >= server.aof_rewrite_perc && !aofRewriteLimited()) {
serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
rewriteAppendOnlyFileBackground();
}
@@ -1326,8 +1335,11 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
* however to try every second is enough in case of 'hz' is set to
* a higher frequency. */
run_with_period(1000) {
- if (server.aof_state == AOF_ON && server.aof_last_write_status == C_ERR)
- flushAppendOnlyFile(0);
+ if ((server.aof_state == AOF_ON || server.aof_state == AOF_WAIT_REWRITE) &&
+ server.aof_last_write_status == C_ERR)
+ {
+ flushAppendOnlyFile(0);
+ }
}
/* Clear the paused clients state if needed. */
@@ -1466,6 +1478,7 @@ void whileBlockedCron() {
if (prepareForShutdown(SHUTDOWN_NOSAVE) == C_OK) exit(0);
serverLog(LL_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
server.shutdown_asap = 0;
+ server.last_sig_received = 0;
}
}
@@ -1509,6 +1522,8 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
uint64_t processed = 0;
processed += handleClientsWithPendingReadsUsingThreads();
processed += tlsProcessPendingData();
+ if (server.aof_state == AOF_ON || server.aof_state == AOF_WAIT_REWRITE)
+ flushAppendOnlyFile(0);
processed += handleClientsWithPendingWrites();
processed += freeClientsInAsyncFreeQueue();
server.events_processed_while_blocked += processed;
@@ -1584,15 +1599,21 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
* client side caching protocol in broadcasting (BCAST) mode. */
trackingBroadcastInvalidationMessages();
- /* Write the AOF buffer on disk */
+ /* Try to process blocked clients every once in a while.
+ *
+ * Example: A module calls RM_SignalKeyAsReady from within a timer callback
+ * (So we don't visit processCommand() at all).
+ *
+ * must be done before flushAppendOnlyFile, in case of appendfsync=always,
+ * since the unblocked clients may write data. */
+ handleClientsBlockedOnKeys();
+
+ /* Write the AOF buffer on disk,
+ * must be done before handleClientsWithPendingWritesUsingThreads,
+ * in case of appendfsync=always. */
if (server.aof_state == AOF_ON || server.aof_state == AOF_WAIT_REWRITE)
flushAppendOnlyFile(0);
- /* Try to process blocked clients every once in while. Example: A module
- * calls RM_SignalKeyAsReady from within a timer callback (So we don't
- * visit processCommand() at all). */
- handleClientsBlockedOnKeys();
-
/* Handle writes with pending output buffers. */
handleClientsWithPendingWritesUsingThreads();
@@ -1878,15 +1899,6 @@ void initServerConfig(void) {
appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
- /* Specify the allocation function for the hdr histogram */
- hdrAllocFuncs hdrallocfn = {
- .mallocFn = zmalloc,
- .callocFn = zcalloc_num,
- .reallocFn = zrealloc,
- .freeFn = zfree,
- };
- hdrSetAllocators(&hdrallocfn);
-
/* Replication related */
server.masterhost = NULL;
server.masterport = 6379;
@@ -2286,6 +2298,7 @@ int listenToPort(int port, socketFds *sfd) {
closeSocketListeners(sfd);
return C_ERR;
}
+ if (server.socket_mark_id > 0) anetSetSockMarkId(NULL, sfd->fd[sfd->count], server.socket_mark_id);
anetNonBlock(NULL,sfd->fd[sfd->count]);
anetCloexec(sfd->fd[sfd->count]);
sfd->count++;
@@ -2338,6 +2351,7 @@ void resetServerStats(void) {
}
server.stat_aof_rewrites = 0;
server.stat_rdb_saves = 0;
+ server.stat_aofrw_consecutive_failures = 0;
atomicSet(server.stat_net_input_bytes, 0);
atomicSet(server.stat_net_output_bytes, 0);
server.stat_unexpected_error_replies = 0;
@@ -2539,6 +2553,7 @@ void initServer(void) {
server.aof_last_write_status = C_OK;
server.aof_last_write_errno = 0;
server.repl_good_slaves_count = 0;
+ server.last_sig_received = 0;
/* Create the timer callback, this is our way to process many background
* operations incrementally, like clients timeout, eviction of unaccessed
@@ -2978,6 +2993,11 @@ struct redisCommand *lookupCommandOrOriginal(robj **argv ,int argc) {
return cmd;
}
+/* Returns 1 if 'c' is the AOF client (id CLIENT_ID_AOF) or has the CLIENT_MASTER flag set, 0 otherwise: commands arriving from these clients should never be rejected. */
+int mustObeyClient(client *c) {
+ return c->id == CLIENT_ID_AOF || c->flags & CLIENT_MASTER;
+}
+
static int shouldPropagate(int target) {
if (!server.replication_allowed || target == PROPAGATE_NONE || server.loading)
return 0;
@@ -3205,7 +3225,6 @@ int incrCommandStatsOnError(struct redisCommand *cmd, int flags) {
*/
void call(client *c, int flags) {
long long dirty;
- monotime call_timer;
uint64_t client_old_flags = c->flags;
struct redisCommand *real_cmd = c->realcmd;
@@ -3230,22 +3249,34 @@ void call(client *c, int flags) {
dirty = server.dirty;
incrCommandStatsOnError(NULL, 0);
+ const long long call_timer = ustime();
+
/* Update cache time, in case we have nested calls we want to
* update only on the first call*/
if (server.fixed_time_expire++ == 0) {
- updateCachedTime(0);
+ updateCachedTimeWithUs(0,call_timer);
}
- server.in_nested_call++;
- elapsedStart(&call_timer);
+ monotime monotonic_start = 0;
+ if (monotonicGetType() == MONOTONIC_CLOCK_HW)
+ monotonic_start = getMonotonicUs();
+
+ server.in_nested_call++;
c->cmd->proc(c);
- const long duration = elapsedUs(call_timer);
+ server.in_nested_call--;
+
+ /* In order to avoid performance implication due to querying the clock using a system call 3 times,
+ * we use a monotonic clock, when we are sure its cost is very low, and fall back to non-monotonic call otherwise. */
+ ustime_t duration;
+ if (monotonicGetType() == MONOTONIC_CLOCK_HW)
+ duration = getMonotonicUs() - monotonic_start;
+ else
+ duration = ustime() - call_timer;
+
c->duration = duration;
dirty = server.dirty-dirty;
if (dirty < 0) dirty = 0;
- server.in_nested_call--;
-
/* Update failed command calls if required. */
if (!incrCommandStatsOnError(real_cmd, ERROR_COMMAND_FAILED) && c->deferred_reply_errors) {
@@ -3410,6 +3441,8 @@ void rejectCommand(client *c, robj *reply) {
}
void rejectCommandSds(client *c, sds s) {
+ flagTransaction(c);
+ if (c->cmd) c->cmd->rejected_calls++;
if (c->cmd && c->cmd->proc == execCommand) {
execCommandAbort(c, s);
sdsfree(s);
@@ -3420,8 +3453,6 @@ void rejectCommandSds(client *c, sds s) {
}
void rejectCommandFormat(client *c, const char *fmt, ...) {
- if (c->cmd) c->cmd->rejected_calls++;
- flagTransaction(c);
va_list ap;
va_start(ap,fmt);
sds s = sdscatvprintf(sdsempty(),fmt,ap);
@@ -3475,6 +3506,54 @@ void populateCommandMovableKeys(struct redisCommand *cmd) {
cmd->flags |= CMD_MOVABLE_KEYS;
}
+/* Check if c->cmd exists. Returns 1 if it does, 0 otherwise. On 0, if `err` is NULL no message is built;
+ * else `*err` is set to a newly created sds error string in which every \r and \n has been replaced. */
+int commandCheckExistence(client *c, sds *err) {
+ if (c->cmd)
+ return 1;
+ if (!err)
+ return 0;
+ if (isContainerCommandBySds(c->argv[0]->ptr)) {
+ /* If we can't find the command but argv[0] by itself is a command
+ * it means we're dealing with an invalid subcommand. Print Help. */
+ sds cmd = sdsnew((char *)c->argv[0]->ptr);
+ sdstoupper(cmd);
+ *err = sdsnew(NULL);
+ *err = sdscatprintf(*err, "unknown subcommand '%.128s'. Try %s HELP.",
+ (char *)c->argv[1]->ptr, cmd);
+ sdsfree(cmd);
+ } else {
+ sds args = sdsempty();
+ int i;
+ for (i=1; i < c->argc && sdslen(args) < 128; i++)
+ args = sdscatprintf(args, "'%.*s' ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);
+ *err = sdsnew(NULL);
+ *err = sdscatprintf(*err, "unknown command '%.128s', with args beginning with: %s",
+ (char*)c->argv[0]->ptr, args);
+ sdsfree(args);
+ }
+ /* Make sure there are no newlines in the string, otherwise invalid protocol
+ * is emitted (the args come from the user and may contain any character). */
+ sdsmapchars(*err, "\r\n", " ", 2);
+ return 0;
+}
+
+/* Check if c->argc is valid for c->cmd: a positive arity must equal argc exactly, while a negative
+ * arity -N requires argc >= N. Returns 1 if valid; otherwise 0, filling `*err` when `err` is non-NULL. */
+int commandCheckArity(client *c, sds *err) {
+ if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
+ (c->argc < -c->cmd->arity))
+ {
+ if (err) {
+ *err = sdsnew(NULL);
+ *err = sdscatprintf(*err, "wrong number of arguments for '%s' command", c->cmd->fullname);
+ }
+ return 0;
+ }
+
+ return 1;
+}
+
/* If this function gets called we already read a whole
* command, arguments are in the client argv/argc fields.
* processCommand() execute the command or prepare the
@@ -3514,29 +3593,13 @@ int processCommand(client *c) {
/* Now lookup the command and check ASAP about trivial error conditions
* such as wrong arity, bad command name and so forth. */
c->cmd = c->lastcmd = c->realcmd = lookupCommand(c->argv,c->argc);
- if (!c->cmd) {
- if (isContainerCommandBySds(c->argv[0]->ptr)) {
- /* If we can't find the command but argv[0] by itself is a command
- * it means we're dealing with an invalid subcommand. Print Help. */
- sds cmd = sdsnew((char *)c->argv[0]->ptr);
- sdstoupper(cmd);
- rejectCommandFormat(c, "Unknown subcommand '%.128s'. Try %s HELP.",
- (char *)c->argv[1]->ptr, cmd);
- sdsfree(cmd);
- return C_OK;
- }
- sds args = sdsempty();
- int i;
- for (i=1; i < c->argc && sdslen(args) < 128; i++)
- args = sdscatprintf(args, "'%.*s' ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);
- rejectCommandFormat(c,"unknown command '%s', with args beginning with: %s",
- (char*)c->argv[0]->ptr, args);
- sdsfree(args);
+ sds err;
+ if (!commandCheckExistence(c, &err)) {
+ rejectCommandSds(c, err);
return C_OK;
- } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
- (c->argc < -c->cmd->arity))
- {
- rejectCommandFormat(c,"wrong number of arguments for '%s' command", c->cmd->fullname);
+ }
+ if (!commandCheckArity(c, &err)) {
+ rejectCommandSds(c, err);
return C_OK;
}
@@ -3569,6 +3632,7 @@ int processCommand(client *c) {
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & (CMD_WRITE | CMD_MAY_REPLICATE)));
int is_deny_async_loading_command = (c->cmd->flags & CMD_NO_ASYNC_LOADING) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_NO_ASYNC_LOADING));
+ int obey_client = mustObeyClient(c);
if (authRequired(c)) {
/* AUTH and HELLO and no auth commands are valid even in
@@ -3620,23 +3684,20 @@ int processCommand(client *c) {
* 1) The sender of this command is our master.
* 2) The command has no key arguments. */
if (server.cluster_enabled &&
- !(c->flags & CLIENT_MASTER) &&
- !(c->flags & CLIENT_SCRIPT &&
- server.script_caller->flags & CLIENT_MASTER) &&
+ !mustObeyClient(c) &&
!(!(c->cmd->flags&CMD_MOVABLE_KEYS) && c->cmd->key_specs_num == 0 &&
c->cmd->proc != execCommand))
{
- int hashslot;
int error_code;
clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
- &hashslot,&error_code);
+ &c->slot,&error_code);
if (n == NULL || n != server.cluster->myself) {
if (c->cmd->proc == execCommand) {
discardTransaction(c);
} else {
flagTransaction(c);
}
- clusterRedirectClient(c,n,hashslot,error_code);
+ clusterRedirectClient(c,n,c->slot,error_code);
c->cmd->rejected_calls++;
return C_OK;
}
@@ -3706,15 +3767,29 @@ int processCommand(client *c) {
if (server.tracking_clients) trackingLimitUsedSlots();
/* Don't accept write commands if there are problems persisting on disk
- * and if this is a master instance. */
+ * unless coming from our master, in which case check the replica ignore
+ * disk write error config to either log or crash. */
int deny_write_type = writeCommandsDeniedByDiskError();
if (deny_write_type != DISK_ERROR_TYPE_NONE &&
- server.masterhost == NULL &&
- (is_write_command ||c->cmd->proc == pingCommand))
+ (is_write_command || c->cmd->proc == pingCommand))
{
- sds err = writeCommandsGetDiskErrorMessage(deny_write_type);
- rejectCommandSds(c, err);
- return C_OK;
+ if (obey_client) {
+ if (!server.repl_ignore_disk_write_error && c->cmd->proc != pingCommand) {
+ serverPanic("Replica was unable to write command to disk.");
+ } else {
+ static mstime_t last_log_time_ms = 0;
+ const mstime_t log_interval_ms = 10000;
+ if (server.mstime > last_log_time_ms + log_interval_ms) {
+ last_log_time_ms = server.mstime;
+ serverLog(LL_WARNING, "Replica is applying a command even though "
+ "it is unable to write to disk.");
+ }
+ }
+ } else {
+ sds err = writeCommandsGetDiskErrorMessage(deny_write_type);
+ rejectCommandSds(c, err);
+ return C_OK;
+ }
}
/* Don't accept write commands if there are not enough good slaves and
@@ -3727,7 +3802,7 @@ int processCommand(client *c) {
/* Don't accept write commands if this is a read only slave. But
* accept write commands if this is our master. */
if (server.masterhost && server.repl_slave_ro &&
- !(c->flags & CLIENT_MASTER) &&
+ !obey_client &&
is_write_command)
{
rejectCommand(c, shared.roslaveerr);
@@ -3949,6 +4024,7 @@ static void cancelShutdown(void) {
server.shutdown_asap = 0;
server.shutdown_flags = 0;
server.shutdown_mstime = 0;
+ server.last_sig_received = 0;
replyToClientsBlockedOnShutdown();
unpauseClients(PAUSE_DURING_SHUTDOWN);
}
@@ -4309,6 +4385,7 @@ void addReplyCommandArgList(client *c, struct redisCommandArg *args, int num_arg
if (args[j].token) maplen++;
if (args[j].summary) maplen++;
if (args[j].since) maplen++;
+ if (args[j].deprecated_since) maplen++;
if (args[j].flags) maplen++;
if (args[j].type == ARG_TYPE_ONEOF || args[j].type == ARG_TYPE_BLOCK)
maplen++;
@@ -4336,6 +4413,10 @@ void addReplyCommandArgList(client *c, struct redisCommandArg *args, int num_arg
addReplyBulkCString(c, "since");
addReplyBulkCString(c, args[j].since);
}
+ if (args[j].deprecated_since) {
+ addReplyBulkCString(c, "deprecated_since");
+ addReplyBulkCString(c, args[j].deprecated_since);
+ }
if (args[j].flags) {
addReplyBulkCString(c, "flags");
addReplyFlagsForArg(c, args[j].flags);
@@ -4562,6 +4643,7 @@ void addReplyCommandDocs(client *c, struct redisCommand *cmd) {
long maplen = 1;
if (cmd->summary) maplen++;
if (cmd->since) maplen++;
+ if (cmd->flags & CMD_MODULE) maplen++;
if (cmd->complexity) maplen++;
if (cmd->doc_flags) maplen++;
if (cmd->deprecated_since) maplen++;
@@ -4588,6 +4670,10 @@ void addReplyCommandDocs(client *c, struct redisCommand *cmd) {
addReplyBulkCString(c, "complexity");
addReplyBulkCString(c, cmd->complexity);
}
+ if (cmd->flags & CMD_MODULE) {
+ addReplyBulkCString(c, "module");
+ addReplyBulkCString(c, moduleNameFromCommand(cmd));
+ }
if (cmd->doc_flags) {
addReplyBulkCString(c, "doc_flags");
addReplyDocFlagsForCommand(c, cmd);
@@ -5123,6 +5209,7 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
"redis_mode:%s\r\n"
"os:%s %s %s\r\n"
"arch_bits:%i\r\n"
+ "monotonic_clock:%s\r\n"
"multiplexing_api:%s\r\n"
"atomicvar_api:%s\r\n"
"gcc_version:%i.%i.%i\r\n"
@@ -5146,6 +5233,7 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
mode,
name.sysname, name.release, name.machine,
server.arch_bits,
+ monotonicInfoString(),
aeGetApiName(),
REDIS_ATOMIC_API,
#ifdef __GNUC__
@@ -5383,6 +5471,7 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
"aof_current_rewrite_time_sec:%jd\r\n"
"aof_last_bgrewrite_status:%s\r\n"
"aof_rewrites:%lld\r\n"
+ "aof_rewrites_consecutive_failures:%lld\r\n"
"aof_last_write_status:%s\r\n"
"aof_last_cow_size:%zu\r\n"
"module_fork_in_progress:%d\r\n"
@@ -5414,6 +5503,7 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
-1 : time(NULL)-server.aof_rewrite_time_start),
(server.aof_lastbgrewrite_status == C_OK) ? "ok" : "err",
server.stat_aof_rewrites,
+ server.stat_aofrw_consecutive_failures,
(server.aof_last_write_status == C_OK &&
aof_bio_fsync_status == C_OK) ? "ok" : "err",
server.stat_aof_cow_bytes,
@@ -6227,6 +6317,7 @@ static void sigShutdownHandler(int sig) {
serverLogFromHandler(LL_WARNING, msg);
server.shutdown_asap = 1;
+ server.last_sig_received = sig;
}
void setupSignalHandlers(void) {