summaryrefslogtreecommitdiff
path: root/src/server.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/server.c')
-rw-r--r--src/server.c490
1 files changed, 406 insertions, 84 deletions
diff --git a/src/server.c b/src/server.c
index ac43c32e7..0551eb3e4 100644
--- a/src/server.c
+++ b/src/server.c
@@ -34,6 +34,7 @@
#include "bio.h"
#include "latency.h"
#include "atomicvar.h"
+#include "mt19937-64.h"
#include <time.h>
#include <signal.h>
@@ -58,6 +59,10 @@
#include <sys/socket.h>
#include <sys/resource.h>
+#ifdef __linux__
+#include <sys/mman.h>
+#endif
+
/* Our shared "common" objects */
struct sharedObjectsStruct shared;
@@ -115,9 +120,9 @@ struct redisServer server; /* Server global state */
*
* write: Write command (may modify the key space).
*
- * read-only: All the non special commands just reading from keys without
- * changing the content, or returning other information like
- * the TIME command. Special commands such administrative commands
+ * read-only: Commands just reading from keys without changing the content.
+ * Note that commands that don't read from the keyspace such as
+ * TIME, SELECT, INFO, administrative commands, and connection
* or transaction related commands (multi, exec, discard, ...)
* are not flagged as read-only commands, since they affect the
* server or the connection in other ways.
@@ -158,6 +163,13 @@ struct redisServer server; /* Server global state */
* delay its execution as long as the kernel scheduler is giving
* us time. Note that commands that may trigger a DEL as a side
* effect (like SET) are not fast commands.
+ *
+ * may-replicate: Command may produce replication traffic, but should be
+ * allowed under circumstances where write commands are disallowed.
+ * Examples include PUBLISH, which replicates pubsub messages,and
+ * EVAL, which may execute write commands, which are replicated,
+ * or may just execute read commands. A command can not be marked
+ * both "write" and "may-replicate"
*
* The following additional flags are only used in order to put commands
* in a specific ACL category. Commands can have multiple ACL categories.
@@ -287,11 +299,11 @@ struct redisCommand redisCommandTable[] = {
"write use-memory @list",
0,NULL,1,1,1,0,0,0},
- {"rpop",rpopCommand,2,
+ {"rpop",rpopCommand,-2,
"write fast @list",
0,NULL,1,1,1,0,0,0},
- {"lpop",lpopCommand,2,
+ {"lpop",lpopCommand,-2,
"write fast @list",
0,NULL,1,1,1,0,0,0},
@@ -463,6 +475,10 @@ struct redisCommand redisCommandTable[] = {
"read-only @sortedset",
0,NULL,1,1,1,0,0,0},
+ {"zrangestore",zrangestoreCommand,-5,
+ "write use-memory @sortedset",
+ 0,NULL,1,2,1,0,0,0},
+
{"zrangebyscore",zrangebyscoreCommand,-4,
"read-only @sortedset",
0,NULL,1,1,1,0,0,0},
@@ -685,7 +701,7 @@ struct redisCommand redisCommandTable[] = {
0,NULL,0,0,0,0,0,0},
{"echo",echoCommand,2,
- "read-only fast @connection",
+ "fast @connection",
0,NULL,0,0,0,0,0,0},
{"save",saveCommand,1,
@@ -705,7 +721,7 @@ struct redisCommand redisCommandTable[] = {
0,NULL,0,0,0,0,0,0},
{"lastsave",lastsaveCommand,1,
- "read-only random fast ok-loading ok-stale @admin @dangerous",
+ "random fast ok-loading ok-stale @admin @dangerous",
0,NULL,0,0,0,0,0,0},
{"type",typeCommand,2,
@@ -781,7 +797,7 @@ struct redisCommand redisCommandTable[] = {
0,NULL,0,0,0,0,0,0},
{"role",roleCommand,1,
- "ok-loading ok-stale no-script fast read-only @dangerous",
+ "ok-loading ok-stale no-script fast @dangerous",
0,NULL,0,0,0,0,0,0},
{"debug",debugCommand,-2,
@@ -809,7 +825,7 @@ struct redisCommand redisCommandTable[] = {
0,NULL,0,0,0,0,0,0},
{"publish",publishCommand,3,
- "pub-sub ok-loading ok-stale fast",
+ "pub-sub ok-loading ok-stale fast may-replicate",
0,NULL,0,0,0,0,0,0},
{"pubsub",pubsubCommand,-2,
@@ -868,18 +884,18 @@ struct redisCommand redisCommandTable[] = {
"admin no-script random ok-loading ok-stale @connection",
0,NULL,0,0,0,0,0,0},
- {"hello",helloCommand,-2,
+ {"hello",helloCommand,-1,
"no-auth no-script fast no-monitor ok-loading ok-stale no-slowlog @connection",
0,NULL,0,0,0,0,0,0},
/* EVAL can modify the dataset, however it is not flagged as a write
* command since we do the check while running commands from Lua. */
{"eval",evalCommand,-3,
- "no-script @scripting",
+ "no-script may-replicate @scripting",
0,evalGetKeys,0,0,0,0,0,0},
{"evalsha",evalShaCommand,-3,
- "no-script @scripting",
+ "no-script may-replicate @scripting",
0,evalGetKeys,0,0,0,0,0,0},
{"slowlog",slowlogCommand,-2,
@@ -887,11 +903,11 @@ struct redisCommand redisCommandTable[] = {
0,NULL,0,0,0,0,0,0},
{"script",scriptCommand,-2,
- "no-script @scripting",
+ "no-script may-replicate @scripting",
0,NULL,0,0,0,0,0,0},
{"time",timeCommand,1,
- "read-only random fast ok-loading ok-stale",
+ "random fast ok-loading ok-stale",
0,NULL,0,0,0,0,0,0},
{"bitop",bitopCommand,-4,
@@ -968,16 +984,19 @@ struct redisCommand redisCommandTable[] = {
* we claim that the representation, even if accessible, is an internal
* affair, and the command is semantically read only. */
{"pfcount",pfcountCommand,-2,
- "read-only @hyperloglog",
+ "read-only may-replicate @hyperloglog",
0,NULL,1,-1,1,0,0,0},
{"pfmerge",pfmergeCommand,-2,
"write use-memory @hyperloglog",
0,NULL,1,-1,1,0,0,0},
+ /* Unlike PFCOUNT that is considered as a read-only command (although
+ * it changes a bit), PFDEBUG may change the entire key when converting
+ * from sparse to dense representation */
{"pfdebug",pfdebugCommand,-3,
- "admin write",
- 0,NULL,0,0,0,0,0,0},
+ "admin write use-memory @hyperloglog",
+ 0,NULL,2,2,1,0,0,0},
{"xadd",xaddCommand,-5,
"write use-memory fast random @stream",
@@ -1023,6 +1042,10 @@ struct redisCommand redisCommandTable[] = {
"write random fast @stream",
0,NULL,1,1,1,0,0,0},
+ {"xautoclaim",xautoclaimCommand,-6,
+ "write random fast @stream",
+ 0,NULL,1,1,1,0,0,0},
+
{"xinfo",xinfoCommand,-2,
"read-only random @stream",
0,NULL,2,2,1,0,0,0},
@@ -1059,7 +1082,7 @@ struct redisCommand redisCommandTable[] = {
"read-only @string",
0,lcsGetKeys,0,0,0,0,0,0},
- {"reset",resetCommand,-1,
+ {"reset",resetCommand,1,
"no-script ok-stale ok-loading fast @connection",
0,NULL,0,0,0,0,0,0}
};
@@ -1547,12 +1570,33 @@ void updateDictResizePolicy(void) {
dictDisableResize();
}
+const char *strChildType(int type) {
+ switch(type) {
+ case CHILD_TYPE_RDB: return "RDB";
+ case CHILD_TYPE_AOF: return "AOF";
+ case CHILD_TYPE_LDB: return "LDB";
+ case CHILD_TYPE_MODULE: return "MODULE";
+ default: return "Unknown";
+ }
+}
+
/* Return true if there are active children processes doing RDB saving,
* AOF rewriting, or some side process spawned by a loaded module. */
int hasActiveChildProcess() {
- return server.rdb_child_pid != -1 ||
- server.aof_child_pid != -1 ||
- server.module_child_pid != -1;
+ return server.child_pid != -1;
+}
+
+void resetChildState() {
+ server.child_type = CHILD_TYPE_NONE;
+ server.child_pid = -1;
+ server.stat_current_cow_bytes = 0;
+ updateDictResizePolicy();
+ closeChildInfoPipe();
+}
+
+/* Return if child type is mutual exclusive with other fork children */
+int isMutuallyExclusiveChildType(int type) {
+ return type == CHILD_TYPE_RDB || type == CHILD_TYPE_AOF || type == CHILD_TYPE_MODULE;
}
/* Return true if this instance has persistence completely turned off:
@@ -1874,29 +1918,30 @@ void checkChildrenDone(void) {
if (pid == -1) {
serverLog(LL_WARNING,"wait3() returned an error: %s. "
- "rdb_child_pid = %d, aof_child_pid = %d, module_child_pid = %d",
+ "child_type: %s, child_pid = %d",
strerror(errno),
- (int) server.rdb_child_pid,
- (int) server.aof_child_pid,
- (int) server.module_child_pid);
- } else if (pid == server.rdb_child_pid) {
- backgroundSaveDoneHandler(exitcode,bysignal);
- if (!bysignal && exitcode == 0) receiveChildInfo();
- } else if (pid == server.aof_child_pid) {
- backgroundRewriteDoneHandler(exitcode,bysignal);
- if (!bysignal && exitcode == 0) receiveChildInfo();
- } else if (pid == server.module_child_pid) {
- ModuleForkDoneHandler(exitcode,bysignal);
+ strChildType(server.child_type),
+ (int) server.child_pid);
+ } else if (pid == server.child_pid) {
+ if (server.child_type == CHILD_TYPE_RDB) {
+ backgroundSaveDoneHandler(exitcode, bysignal);
+ } else if (server.child_type == CHILD_TYPE_AOF) {
+ backgroundRewriteDoneHandler(exitcode, bysignal);
+ } else if (server.child_type == CHILD_TYPE_MODULE) {
+ ModuleForkDoneHandler(exitcode, bysignal);
+ } else {
+ serverPanic("Unknown child type %d for child pid %d", server.child_type, server.child_pid);
+ exit(1);
+ }
if (!bysignal && exitcode == 0) receiveChildInfo();
+ resetChildState();
} else {
if (!ldbRemoveChild(pid)) {
serverLog(LL_WARNING,
- "Warning, detected child with unmatched pid: %ld",
- (long)pid);
+ "Warning, detected child with unmatched pid: %ld",
+ (long) pid);
}
}
- updateDictResizePolicy();
- closeChildInfoPipe();
/* start any pending forks immediately. */
replicationStartPendingFork();
@@ -2065,6 +2110,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
/* Check if a background saving or AOF rewrite in progress terminated. */
if (hasActiveChildProcess() || ldbPendingChildren())
{
+ run_with_period(1000) receiveChildInfo();
checkChildrenDone();
} else {
/* If there is not a background saving/rewrite in progress check if
@@ -2124,8 +2170,8 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
flushAppendOnlyFile(0);
}
- /* Clear the paused clients flag if needed. */
- clientsArePaused(); /* Don't check return value, just use the side effect.*/
+ /* Clear the paused clients state if needed. */
+ checkClientPauseTimeoutAndReturnIfPaused();
/* Replication cron function -- used to reconnect to master,
* detect transfer failures, start background RDB transfers and so forth. */
@@ -2183,12 +2229,16 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
void blockingOperationStarts() {
- updateCachedTime(0);
- server.blocked_last_cron = server.mstime;
+ if(!server.blocking_op_nesting++){
+ updateCachedTime(0);
+ server.blocked_last_cron = server.mstime;
+ }
}
void blockingOperationEnds() {
- server.blocked_last_cron = 0;
+ if(!(--server.blocking_op_nesting)){
+ server.blocked_last_cron = 0;
+ }
}
/* This function fill in the role of serverCron during RDB or AOF loading, and
@@ -2225,6 +2275,7 @@ void whileBlockedCron() {
activeDefragCycle();
server.blocked_last_cron += hz_ms;
+
/* Increment cronloop so that run_with_period works. */
server.cronloops++;
}
@@ -2317,8 +2368,12 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
* during the previous event loop iteration. Note that we do this after
* processUnblockedClients(), so if there are multiple pipelined WAITs
* and the just unblocked WAIT gets blocked again, we don't have to wait
- * a server cron cycle in absence of other event loop events. See #6623. */
- if (server.get_ack_from_slaves) {
+ * a server cron cycle in absence of other event loop events. See #6623.
+ *
+ * We also don't send the ACKs while clients are paused, since it can
+ * increment the replication backlog, they'll be sent after the pause
+ * if we are still the master. */
+ if (server.get_ack_from_slaves && !checkClientPauseTimeoutAndReturnIfPaused()) {
robj *argv[3];
argv[0] = createStringObject("REPLCONF",8);
@@ -2376,9 +2431,9 @@ void afterSleep(struct aeEventLoop *eventLoop) {
void createSharedObjects(void) {
int j;
+ /* Shared command responses */
shared.crlf = createObject(OBJ_STRING,sdsnew("\r\n"));
shared.ok = createObject(OBJ_STRING,sdsnew("+OK\r\n"));
- shared.err = createObject(OBJ_STRING,sdsnew("-ERR\r\n"));
shared.emptybulk = createObject(OBJ_STRING,sdsnew("$0\r\n\r\n"));
shared.czero = createObject(OBJ_STRING,sdsnew(":0\r\n"));
shared.cone = createObject(OBJ_STRING,sdsnew(":1\r\n"));
@@ -2386,8 +2441,14 @@ void createSharedObjects(void) {
shared.pong = createObject(OBJ_STRING,sdsnew("+PONG\r\n"));
shared.queued = createObject(OBJ_STRING,sdsnew("+QUEUED\r\n"));
shared.emptyscan = createObject(OBJ_STRING,sdsnew("*2\r\n$1\r\n0\r\n*0\r\n"));
+ shared.space = createObject(OBJ_STRING,sdsnew(" "));
+ shared.colon = createObject(OBJ_STRING,sdsnew(":"));
+ shared.plus = createObject(OBJ_STRING,sdsnew("+"));
+
+ /* Shared command error responses */
shared.wrongtypeerr = createObject(OBJ_STRING,sdsnew(
"-WRONGTYPE Operation against a key holding the wrong kind of value\r\n"));
+ shared.err = createObject(OBJ_STRING,sdsnew("-ERR\r\n"));
shared.nokeyerr = createObject(OBJ_STRING,sdsnew(
"-ERR no such key\r\n"));
shared.syntaxerr = createObject(OBJ_STRING,sdsnew(
@@ -2418,9 +2479,6 @@ void createSharedObjects(void) {
"-NOREPLICAS Not enough good replicas to write.\r\n"));
shared.busykeyerr = createObject(OBJ_STRING,sdsnew(
"-BUSYKEY Target key name already exists.\r\n"));
- shared.space = createObject(OBJ_STRING,sdsnew(" "));
- shared.colon = createObject(OBJ_STRING,sdsnew(":"));
- shared.plus = createObject(OBJ_STRING,sdsnew("+"));
/* The shared NULL depends on the protocol version. */
shared.null[0] = NULL;
@@ -2945,9 +3003,9 @@ void resetServerStats(void) {
atomicSet(server.stat_net_input_bytes, 0);
atomicSet(server.stat_net_output_bytes, 0);
server.stat_unexpected_error_replies = 0;
+ server.stat_total_error_replies = 0;
server.stat_dump_payload_sanitizations = 0;
server.aof_delayed_fsync = 0;
- server.blocked_last_cron = 0;
}
/* Make the thread killable at any time, so that kill threads functions
@@ -2978,6 +3036,7 @@ void initServer(void) {
server.in_fork_child = CHILD_TYPE_NONE;
server.main_thread_id = pthread_self();
server.current_client = NULL;
+ server.errors = raxNew();
server.fixed_time_expire = 0;
server.clients = listCreate();
server.clients_index = raxNew();
@@ -2992,9 +3051,12 @@ void initServer(void) {
server.ready_keys = listCreate();
server.clients_waiting_acks = listCreate();
server.get_ack_from_slaves = 0;
- server.clients_paused = 0;
+ server.client_pause_type = 0;
+ server.paused_clients = listCreate();
server.events_processed_while_blocked = 0;
server.system_memory_size = zmalloc_get_memory_size();
+ server.blocked_last_cron = 0;
+ server.blocking_op_nesting = 0;
if ((server.tls_port || server.tls_replication || server.tls_cluster)
&& tlsConfigure(&server.tls_ctx_config) == C_ERR) {
@@ -3064,9 +3126,9 @@ void initServer(void) {
server.in_eval = 0;
server.in_exec = 0;
server.propagate_in_transaction = 0;
- server.rdb_child_pid = -1;
- server.aof_child_pid = -1;
- server.module_child_pid = -1;
+ server.client_pause_in_transaction = 0;
+ server.child_pid = -1;
+ server.child_type = CHILD_TYPE_NONE;
server.rdb_child_type = RDB_CHILD_TYPE_NONE;
server.rdb_pipe_conns = NULL;
server.rdb_pipe_numconns = 0;
@@ -3076,7 +3138,7 @@ void initServer(void) {
server.rdb_bgsave_scheduled = 0;
server.child_info_pipe[0] = -1;
server.child_info_pipe[1] = -1;
- server.child_info_data.magic = 0;
+ server.child_info_nread = 0;
aofRewriteBufferReset();
server.aof_buf = sdsempty();
server.lastsave = time(NULL); /* At startup we consider the DB saved. */
@@ -3088,6 +3150,7 @@ void initServer(void) {
/* A few stats we don't want to reset: server startup time, and peak mem. */
server.stat_starttime = time(NULL);
server.stat_peak_memory = 0;
+ server.stat_current_cow_bytes = 0;
server.stat_rdb_cow_bytes = 0;
server.stat_aof_cow_bytes = 0;
server.stat_module_cow_bytes = 0;
@@ -3230,6 +3293,8 @@ int populateCommandTableParseFlags(struct redisCommand *c, char *strflags) {
c->flags |= CMD_FAST | CMD_CATEGORY_FAST;
} else if (!strcasecmp(flag,"no-auth")) {
c->flags |= CMD_NO_AUTH;
+ } else if (!strcasecmp(flag,"may-replicate")) {
+ c->flags |= CMD_MAY_REPLICATE;
} else {
/* Parse ACL categories here if the flag name starts with @. */
uint64_t catflag;
@@ -3284,11 +3349,18 @@ void resetCommandTableStats(void) {
c = (struct redisCommand *) dictGetVal(de);
c->microseconds = 0;
c->calls = 0;
+ c->rejected_calls = 0;
+ c->failed_calls = 0;
}
dictReleaseIterator(di);
}
+void resetErrorTableStats(void) {
+ raxFreeWithCallback(server.errors, zfree);
+ server.errors = raxNew();
+}
+
/* ========================== Redis OP Array API ============================ */
void redisOpArrayInit(redisOpArray *oa) {
@@ -3374,6 +3446,18 @@ struct redisCommand *lookupCommandOrOriginal(sds name) {
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
int flags)
{
+ /* Propagate a MULTI request once we encounter the first command which
+ * is a write command.
+ * This way we'll deliver the MULTI/..../EXEC block as a whole and
+ * both the AOF and the replication link will have the same consistency
+ * and atomicity guarantees. */
+ if (server.in_exec && !server.propagate_in_transaction)
+ execCommandPropagateMulti(dbid);
+
+ /* This needs to be unreachable since the dataset should be fixed during
+ * client pause, otherwise data may be lossed during a failover. */
+ serverAssert(!(areClientsPaused() && !server.client_pause_in_transaction));
+
if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
feedAppendOnlyFile(cmd,dbid,argv,argc);
if (flags & PROPAGATE_REPL)
@@ -3412,6 +3496,7 @@ void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
* Redis command implementation in order to to force the propagation of a
* specific command execution into AOF / Replication. */
void forceCommandPropagation(client *c, int flags) {
+ serverAssert(c->cmd->flags & (CMD_WRITE | CMD_MAY_REPLICATE));
if (flags & PROPAGATE_REPL) c->flags |= CLIENT_FORCE_REPL;
if (flags & PROPAGATE_AOF) c->flags |= CLIENT_FORCE_AOF;
}
@@ -3475,6 +3560,7 @@ void call(client *c, int flags) {
ustime_t start, duration;
int client_old_flags = c->flags;
struct redisCommand *real_cmd = c->cmd;
+ static long long prev_err_count;
server.fixed_time_expire++;
@@ -3495,6 +3581,7 @@ void call(client *c, int flags) {
/* Call the command. */
dirty = server.dirty;
+ prev_err_count = server.stat_total_error_replies;
updateCachedTime(0);
start = server.ustime;
c->cmd->proc(c);
@@ -3502,6 +3589,14 @@ void call(client *c, int flags) {
dirty = server.dirty-dirty;
if (dirty < 0) dirty = 0;
+ /* Update failed command calls if required.
+ * We leverage a static variable (prev_err_count) to retain
+ * the counter across nested function calls and avoid logging
+ * the same error twice. */
+ if ((server.stat_total_error_replies - prev_err_count) > 0) {
+ real_cmd->failed_calls++;
+ }
+
/* After executing command, we will close the client after writing entire
* reply if it is set 'CLIENT_CLOSE_AFTER_COMMAND' flag. */
if (c->flags & CLIENT_CLOSE_AFTER_COMMAND) {
@@ -3604,7 +3699,7 @@ void call(client *c, int flags) {
!(c->flags & CLIENT_MULTI) &&
!(flags & CMD_CALL_NOWRAP))
{
- execCommandPropagateMulti(c);
+ execCommandPropagateMulti(c->db->id);
multi_emitted = 1;
}
@@ -3619,13 +3714,19 @@ void call(client *c, int flags) {
}
if (multi_emitted) {
- execCommandPropagateExec(c);
+ execCommandPropagateExec(c->db->id);
}
}
redisOpArrayFree(&server.also_propagate);
}
server.also_propagate = prev_also_propagate;
+ /* Client pause takes effect after a transaction has finished. This needs
+ * to be located after everything is propagated. */
+ if (!server.in_exec && server.client_pause_in_transaction) {
+ server.client_pause_in_transaction = 0;
+ }
+
/* If the client has keys tracking enabled for client side caching,
* make sure to remember the keys it fetched via this command. */
if (c->cmd->flags & CMD_READONLY) {
@@ -3640,6 +3741,7 @@ void call(client *c, int flags) {
server.fixed_time_expire--;
server.stat_numcommands++;
+ prev_err_count = server.stat_total_error_replies;
/* Record peak memory after each command and before the eviction that runs
* before the next command. */
@@ -3655,6 +3757,7 @@ void call(client *c, int flags) {
* Note: 'reply' is expected to end with \r\n */
void rejectCommand(client *c, robj *reply) {
flagTransaction(c);
+ if (c->cmd) c->cmd->rejected_calls++;
if (c->cmd && c->cmd->proc == execCommand) {
execCommandAbort(c, reply->ptr);
} else {
@@ -3664,6 +3767,7 @@ void rejectCommand(client *c, robj *reply) {
}
void rejectCommandFormat(client *c, const char *fmt, ...) {
+ if (c->cmd) c->cmd->rejected_calls++;
flagTransaction(c);
va_list ap;
va_start(ap,fmt);
@@ -3674,10 +3778,11 @@ void rejectCommandFormat(client *c, const char *fmt, ...) {
sdsmapchars(s, "\r\n", " ", 2);
if (c->cmd && c->cmd->proc == execCommand) {
execCommandAbort(c, s);
+ sdsfree(s);
} else {
+ /* The following frees 's'. */
addReplyErrorSds(c, s);
}
- sdsfree(s);
}
/* Returns 1 for commands that may have key names in their arguments, but have
@@ -3735,6 +3840,8 @@ int processCommand(client *c) {
(c->cmd->proc == execCommand && (c->mstate.cmd_inv_flags & CMD_STALE));
int is_denyloading_command = !(c->cmd->flags & CMD_LOADING) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_inv_flags & CMD_LOADING));
+ int is_may_replicate_command = (c->cmd->flags & (CMD_WRITE | CMD_MAY_REPLICATE)) ||
+ (c->cmd->proc == execCommand && (c->mstate.cmd_flags & (CMD_WRITE | CMD_MAY_REPLICATE)));
/* Check if the user is authenticated. This check is skipped in case
* the default user is flagged as "nopass" and is active. */
@@ -3789,6 +3896,7 @@ int processCommand(client *c) {
flagTransaction(c);
}
clusterRedirectClient(c,n,hashslot,error_code);
+ c->cmd->rejected_calls++;
return C_OK;
}
}
@@ -3932,6 +4040,17 @@ int processCommand(client *c) {
return C_OK;
}
+ /* If the server is paused, block the client until
+ * the pause has ended. Replicas are never paused. */
+ if (!(c->flags & CLIENT_SLAVE) &&
+ ((server.client_pause_type == CLIENT_PAUSE_ALL) ||
+ (server.client_pause_type == CLIENT_PAUSE_WRITE && is_may_replicate_command)))
+ {
+ c->bpop.timeout = 0;
+ blockClient(c,BLOCKED_PAUSE);
+ return C_OK;
+ }
+
/* Exec the command */
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
@@ -3946,9 +4065,22 @@ int processCommand(client *c) {
if (listLength(server.ready_keys))
handleClientsBlockedOnKeys();
}
+
return C_OK;
}
+/* ====================== Error lookup and execution ===================== */
+
+void incrementErrorCount(const char *fullerr, size_t namelen) {
+ struct redisError *error = raxFind(server.errors,(unsigned char*)fullerr,namelen);
+ if (error == raxNotFound) {
+ error = zmalloc(sizeof(*error));
+ error->count = 0;
+ raxInsert(server.errors,(unsigned char*)fullerr,namelen,error,NULL);
+ }
+ error->count++;
+}
+
/*================================== Shutdown =============================== */
/* Close listening sockets. Also unlink the unix domain socket if
@@ -3990,25 +4122,28 @@ int prepareForShutdown(int flags) {
/* Kill the saving child if there is a background saving in progress.
We want to avoid race conditions, for instance our saving child may
overwrite the synchronous saving did by SHUTDOWN. */
- if (server.rdb_child_pid != -1) {
+ if (server.child_type == CHILD_TYPE_RDB) {
serverLog(LL_WARNING,"There is a child saving an .rdb. Killing it!");
- /* Note that, in killRDBChild, we call rdbRemoveTempFile that will
- * do close fd(in order to unlink file actully) in background thread.
+ killRDBChild();
+ /* Note that, in killRDBChild normally has backgroundSaveDoneHandler
+ * doing it's cleanup, but in this case this code will not be reached,
+ * so we need to call rdbRemoveTempFile which will close fd(in order
+ * to unlink file actully) in background thread.
* The temp rdb file fd may won't be closed when redis exits quickly,
* but OS will close this fd when process exits. */
- killRDBChild();
+ rdbRemoveTempFile(server.child_pid, 0);
}
/* Kill module child if there is one. */
- if (server.module_child_pid != -1) {
+ if (server.child_type == CHILD_TYPE_MODULE) {
serverLog(LL_WARNING,"There is a module fork child. Killing it!");
- TerminateModuleForkChild(server.module_child_pid,0);
+ TerminateModuleForkChild(server.child_pid,0);
}
if (server.aof_state != AOF_OFF) {
/* Kill the AOF saving child as the AOF we already have may be longer
* but contains the full dataset anyway. */
- if (server.aof_child_pid != -1) {
+ if (server.child_type == CHILD_TYPE_AOF) {
/* If we have AOF enabled but haven't written the AOF yet, don't
* shutdown or else the dataset will be lost. */
if (server.aof_state == AOF_WAIT_REWRITE) {
@@ -4170,6 +4305,7 @@ void addReplyCommand(client *c, struct redisCommand *cmd) {
flagcount += addReplyCommandFlag(c,cmd,CMD_ASKING, "asking");
flagcount += addReplyCommandFlag(c,cmd,CMD_FAST, "fast");
flagcount += addReplyCommandFlag(c,cmd,CMD_NO_AUTH, "no_auth");
+ flagcount += addReplyCommandFlag(c,cmd,CMD_MAY_REPLICATE, "may_replicate");
if (cmdHasMovableKeys(cmd)) {
addReplyStatus(c, "movablekeys");
flagcount += 1;
@@ -4191,10 +4327,14 @@ void commandCommand(client *c) {
if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
const char *help[] = {
-"(no subcommand) -- Return details about all Redis commands.",
-"COUNT -- Return the total number of commands in this Redis server.",
-"GETKEYS <full-command> -- Return the keys from a full Redis command.",
-"INFO [command-name ...] -- Return details about multiple Redis commands.",
+"(no subcommand)",
+" Return details about all Redis commands.",
+"COUNT",
+" Return the total number of commands in this Redis server.",
+"GETKEYS <full-command>",
+" Return the keys from a full Redis command.",
+"INFO [<command-name> ...]",
+" Return details about multiple Redis commands.",
NULL
};
addReplyHelp(c, help);
@@ -4524,6 +4664,7 @@ sds genRedisInfoString(const char *section) {
info = sdscatprintf(info,
"# Persistence\r\n"
"loading:%d\r\n"
+ "current_cow_size:%zu\r\n"
"rdb_changes_since_last_save:%lld\r\n"
"rdb_bgsave_in_progress:%d\r\n"
"rdb_last_save_time:%jd\r\n"
@@ -4542,24 +4683,25 @@ sds genRedisInfoString(const char *section) {
"module_fork_in_progress:%d\r\n"
"module_fork_last_cow_size:%zu\r\n",
server.loading,
+ server.stat_current_cow_bytes,
server.dirty,
- server.rdb_child_pid != -1,
+ server.child_type == CHILD_TYPE_RDB,
(intmax_t)server.lastsave,
(server.lastbgsave_status == C_OK) ? "ok" : "err",
(intmax_t)server.rdb_save_time_last,
- (intmax_t)((server.rdb_child_pid == -1) ?
+ (intmax_t)((server.child_type != CHILD_TYPE_RDB) ?
-1 : time(NULL)-server.rdb_save_time_start),
server.stat_rdb_cow_bytes,
server.aof_state != AOF_OFF,
- server.aof_child_pid != -1,
+ server.child_type == CHILD_TYPE_AOF,
server.aof_rewrite_scheduled,
(intmax_t)server.aof_rewrite_time_last,
- (intmax_t)((server.aof_child_pid == -1) ?
+ (intmax_t)((server.child_type != CHILD_TYPE_AOF) ?
-1 : time(NULL)-server.aof_rewrite_time_start),
(server.aof_lastbgrewrite_status == C_OK) ? "ok" : "err",
(server.aof_last_write_status == C_OK) ? "ok" : "err",
server.stat_aof_cow_bytes,
- server.module_child_pid != -1,
+ server.child_type == CHILD_TYPE_MODULE,
server.stat_module_cow_bytes);
if (server.aof_enabled) {
@@ -4665,6 +4807,7 @@ sds genRedisInfoString(const char *section) {
"tracking_total_items:%lld\r\n"
"tracking_total_prefixes:%lld\r\n"
"unexpected_error_replies:%lld\r\n"
+ "total_error_replies:%lld\r\n"
"dump_payload_sanitizations:%lld\r\n"
"total_reads_processed:%lld\r\n"
"total_writes_processed:%lld\r\n"
@@ -4702,6 +4845,7 @@ sds genRedisInfoString(const char *section) {
(unsigned long long) trackingGetTotalItems(),
(unsigned long long) trackingGetTotalPrefixes(),
server.stat_unexpected_error_replies,
+ server.stat_total_error_replies,
server.stat_dump_payload_sanitizations,
stat_total_reads_processed,
stat_total_writes_processed,
@@ -4892,14 +5036,33 @@ sds genRedisInfoString(const char *section) {
di = dictGetSafeIterator(server.commands);
while((de = dictNext(di)) != NULL) {
c = (struct redisCommand *) dictGetVal(de);
- if (!c->calls) continue;
+ if (!c->calls && !c->failed_calls && !c->rejected_calls)
+ continue;
info = sdscatprintf(info,
- "cmdstat_%s:calls=%lld,usec=%lld,usec_per_call=%.2f\r\n",
+ "cmdstat_%s:calls=%lld,usec=%lld,usec_per_call=%.2f"
+ ",rejected_calls=%lld,failed_calls=%lld\r\n",
c->name, c->calls, c->microseconds,
- (c->calls == 0) ? 0 : ((float)c->microseconds/c->calls));
+ (c->calls == 0) ? 0 : ((float)c->microseconds/c->calls),
+ c->rejected_calls, c->failed_calls);
}
dictReleaseIterator(di);
}
+ /* Error statistics */
+ if (allsections || defsections || !strcasecmp(section,"errorstats")) {
+ if (sections++) info = sdscat(info,"\r\n");
+ info = sdscat(info, "# Errorstats\r\n");
+ raxIterator ri;
+ raxStart(&ri,server.errors);
+ raxSeek(&ri,"^",NULL,0);
+ struct redisError *e;
+ while(raxNext(&ri)) {
+ e = (struct redisError *) ri.data;
+ info = sdscatprintf(info,
+ "errorstat_%.*s:count=%lld\r\n",
+ (int)ri.key_len, ri.key, e->count);
+ }
+ raxStop(&ri);
+ }
/* Cluster */
if (allsections || defsections || !strcasecmp(section,"cluster")) {
@@ -4944,7 +5107,7 @@ void infoCommand(client *c) {
char *section = c->argc == 2 ? c->argv[1]->ptr : "default";
if (c->argc > 2) {
- addReply(c,shared.syntaxerr);
+ addReplyErrorObject(c,shared.syntaxerr);
return;
}
sds info = genRedisInfoString(section);
@@ -4971,6 +5134,21 @@ void monitorCommand(client *c) {
/* =================================== Main! ================================ */
+int checkIgnoreWarning(const char *warning) {
+ int argc, j;
+ sds *argv = sdssplitargs(server.ignore_warnings, &argc);
+ if (argv == NULL)
+ return 0;
+
+ for (j = 0; j < argc; j++) {
+ char *flag = argv[j];
+ if (!strcasecmp(flag, warning))
+ break;
+ }
+ sdsfreesplitres(argv,argc);
+ return j < argc;
+}
+
#ifdef __linux__
int linuxOvercommitMemoryValue(void) {
FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");
@@ -4994,6 +5172,113 @@ void linuxMemoryWarnings(void) {
serverLog(LL_WARNING,"WARNING you have Transparent Huge Pages (THP) support enabled in your kernel. This will create latency and memory usage issues with Redis. To fix this issue run the command 'echo madvise > /sys/kernel/mm/transparent_hugepage/enabled' as root, and add it to your /etc/rc.local in order to retain the setting after a reboot. Redis must be restarted after THP is disabled (set to 'madvise' or 'never').");
}
}
+
+#ifdef __arm64__
+
+/* Get size in kilobytes of the Shared_Dirty pages of the calling process for the
+ * memory map corresponding to the provided address, or -1 on error. */
+static int smapsGetSharedDirty(unsigned long addr) {
+ int ret, in_mapping = 0, val = -1;
+ unsigned long from, to;
+ char buf[64];
+ FILE *f;
+
+ f = fopen("/proc/self/smaps", "r");
+ serverAssert(f);
+
+ while (1) {
+ if (!fgets(buf, sizeof(buf), f))
+ break;
+
+ ret = sscanf(buf, "%lx-%lx", &from, &to);
+ if (ret == 2)
+ in_mapping = from <= addr && addr < to;
+
+ if (in_mapping && !memcmp(buf, "Shared_Dirty:", 13)) {
+ ret = sscanf(buf, "%*s %d", &val);
+ serverAssert(ret == 1);
+ break;
+ }
+ }
+
+ fclose(f);
+ return val;
+}
+
+/* Older arm64 Linux kernels have a bug that could lead to data corruption
+ * during background save in certain scenarios. This function checks if the
+ * kernel is affected.
+ * The bug was fixed in commit ff1712f953e27f0b0718762ec17d0adb15c9fd0b
+ * titled: "arm64: pgtable: Ensure dirty bit is preserved across pte_wrprotect()"
+ * Return 1 if the kernel seems to be affected, and 0 otherwise. */
+int linuxMadvFreeForkBugCheck(void) {
+ int ret, pipefd[2];
+ pid_t pid;
+ char *p, *q, bug_found = 0;
+ const long map_size = 3 * 4096;
+
+ /* Create a memory map that's in our full control (not one used by the allocator). */
+ p = mmap(NULL, map_size, PROT_READ, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
+ serverAssert(p != MAP_FAILED);
+
+ q = p + 4096;
+
+ /* Split the memory map in 3 pages by setting their protection as RO|RW|RO to prevent
+ * Linux from merging this memory map with adjacent VMAs. */
+ ret = mprotect(q, 4096, PROT_READ | PROT_WRITE);
+ serverAssert(!ret);
+
+ /* Write to the page once to make it resident */
+ *(volatile char*)q = 0;
+
+ /* Tell the kernel that this page is free to be reclaimed. */
+#ifndef MADV_FREE
+#define MADV_FREE 8
+#endif
+ ret = madvise(q, 4096, MADV_FREE);
+ serverAssert(!ret);
+
+ /* Write to the page after being marked for freeing, this is supposed to take
+ * ownership of that page again. */
+ *(volatile char*)q = 0;
+
+ /* Create a pipe for the child to return the info to the parent. */
+ ret = pipe(pipefd);
+ serverAssert(!ret);
+
+ /* Fork the process. */
+ pid = fork();
+ serverAssert(pid >= 0);
+ if (!pid) {
+ /* Child: check if the page is marked as dirty, expecing 4 (kB).
+ * A value of 0 means the kernel is affected by the bug. */
+ if (!smapsGetSharedDirty((unsigned long)q))
+ bug_found = 1;
+
+ ret = write(pipefd[1], &bug_found, 1);
+ serverAssert(ret == 1);
+
+ exit(0);
+ } else {
+ /* Read the result from the child. */
+ ret = read(pipefd[0], &bug_found, 1);
+ serverAssert(ret == 1);
+
+ /* Reap the child pid. */
+ serverAssert(waitpid(pid, NULL, 0) == pid);
+ }
+
+ /* Cleanup */
+ ret = close(pipefd[0]);
+ serverAssert(!ret);
+ ret = close(pipefd[1]);
+ serverAssert(!ret);
+ ret = munmap(p, map_size);
+ serverAssert(!ret);
+
+ return bug_found;
+}
+#endif /* __arm64__ */
#endif /* __linux__ */
void createPidFile(void) {
@@ -5189,10 +5474,22 @@ void closeClildUnusedResourceAfterFork() {
closeListeningSockets(0);
if (server.cluster_enabled && server.cluster_config_file_lock_fd != -1)
close(server.cluster_config_file_lock_fd); /* don't care if this fails */
+
+ /* Clear server.pidfile, this is the parent pidfile which should not
+ * be touched (or deleted) by the child (on exit / crash) */
+ zfree(server.pidfile);
+ server.pidfile = NULL;
}
/* purpose is one of CHILD_TYPE_ types */
int redisFork(int purpose) {
+ if (isMutuallyExclusiveChildType(purpose)) {
+ if (hasActiveChildProcess())
+ return -1;
+
+ openChildInfoPipe();
+ }
+
int childpid;
long long start = ustime();
if ((childpid = fork()) == 0) {
@@ -5208,23 +5505,38 @@ int redisFork(int purpose) {
server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
if (childpid == -1) {
+ if (isMutuallyExclusiveChildType(purpose)) closeChildInfoPipe();
return -1;
}
+
+ /* The child_pid and child_type are only for mutual exclusive children.
+ * other child types should handle and store their pid's in dedicated variables.
+ *
+ * Today, we allows CHILD_TYPE_LDB to run in parallel with the other fork types:
+ * - it isn't used for production, so it will not make the server be less efficient
+ * - used for debugging, and we don't want to block it from running while other
+ * forks are running (like RDB and AOF) */
+ if (isMutuallyExclusiveChildType(purpose)) {
+ server.child_pid = childpid;
+ server.child_type = purpose;
+ server.stat_current_cow_bytes = 0;
+ }
+
+ updateDictResizePolicy();
}
return childpid;
}
-void sendChildCOWInfo(int ptype, char *pname) {
+void sendChildCOWInfo(int ptype, int on_exit, char *pname) {
size_t private_dirty = zmalloc_get_private_dirty(-1);
if (private_dirty) {
- serverLog(LL_NOTICE,
+ serverLog(on_exit ? LL_NOTICE : LL_VERBOSE,
"%s: %zu MB of memory used by copy-on-write",
- pname, private_dirty/(1024*1024));
+ pname, private_dirty);
}
- server.child_info_data.cow_size = private_dirty;
- sendChildInfo(ptype);
+ sendChildInfo(ptype, on_exit, private_dirty);
}
void memtest(size_t megabytes, int passes);
@@ -5282,7 +5594,7 @@ void loadDataFromDisk(void) {
void redisOutOfMemoryHandler(size_t allocation_size) {
serverLog(LL_WARNING,"Out Of Memory allocating %zu bytes!",
allocation_size);
- serverPanic("Redis aborting for OUT OF MEMORY. Allocating %zu bytes!",
+ serverPanic("Redis aborting for OUT OF MEMORY. Allocating %zu bytes!",
allocation_size);
}
@@ -5436,6 +5748,7 @@ int main(int argc, char **argv) {
srand(time(NULL)^getpid());
srandom(time(NULL)^getpid());
gettimeofday(&tv,NULL);
+ init_genrand64(((long long) tv.tv_sec * 1000000 + tv.tv_usec) ^ getpid());
crc64_init();
uint8_t hashseed[16];
@@ -5564,7 +5877,16 @@ int main(int argc, char **argv) {
serverLog(LL_WARNING,"Server initialized");
#ifdef __linux__
linuxMemoryWarnings();
- #endif
+ #if defined (__arm64__)
+ if (linuxMadvFreeForkBugCheck()) {
+ serverLog(LL_WARNING,"WARNING Your kernel has a bug that could lead to data corruption during background save. Please upgrade to the latest stable kernel.");
+ if (!checkIgnoreWarning("ARM64-COW-BUG")) {
+ serverLog(LL_WARNING,"Redis will now exit to prevent data corruption. Note that it is possible to suppress this warning by setting the following config: ignore-warnings ARM64-COW-BUG");
+ exit(1);
+ }
+ }
+ #endif /* __arm64__ */
+ #endif /* __linux__ */
moduleInitModulesSystemLast();
moduleLoadFromQueue();
ACLLoadUsersAtStartup();