summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.github/.codespellrc2
-rw-r--r--redis.conf14
-rw-r--r--src/blocked.c18
-rw-r--r--src/cluster.c6
-rw-r--r--src/commands.c10
-rw-r--r--src/commands/shutdown.json25
-rw-r--r--src/config.c1
-rw-r--r--src/db.c54
-rw-r--r--src/networking.c85
-rw-r--r--src/replication.c4
-rw-r--r--src/server.c249
-rw-r--r--src/server.h32
-rw-r--r--tests/integration/psync2-master-restart.tcl21
-rw-r--r--tests/integration/shutdown.tcl238
-rw-r--r--tests/support/server.tcl6
-rw-r--r--tests/support/util.tcl7
-rw-r--r--tests/test_helper.tcl1
-rw-r--r--tests/unit/shutdown.tcl11
18 files changed, 688 insertions, 96 deletions
diff --git a/.github/.codespellrc b/.github/.codespellrc
index 88146bef7..6cc43b26e 100644
--- a/.github/.codespellrc
+++ b/.github/.codespellrc
@@ -1,5 +1,5 @@
[codespell]
quiet-level = 2
count =
-skip = ./deps,./src/crc16_slottable.h
+skip = ./deps,./src/crc16_slottable.h,tmp*,./.git,./lcov-html
ignore-words = ./.github/wordlist.txt
diff --git a/redis.conf b/redis.conf
index 8bac2afb5..7289277cf 100644
--- a/redis.conf
+++ b/redis.conf
@@ -1442,6 +1442,20 @@ aof-use-rdb-preamble yes
# the AOF format in a way that may not be compatible with existing AOF parsers.
aof-timestamp-enabled no
+################################ SHUTDOWN #####################################
+
+# Maximum time to wait for replicas when shutting down, in seconds.
+#
+# During shut down, a grace period allows any lagging replicas to catch up with
+# the latest replication offset before the master exists. This period can
+# prevent data loss, especially for deployments without configured disk backups.
+#
+# The 'shutdown-timeout' value is the grace period's duration in seconds. It is
+# only applicable when the instance has replicas. To disable the feature, set
+# the value to 0.
+#
+# shutdown-timeout 10
+
################################ LUA SCRIPTING ###############################
# Max execution time of a Lua script in milliseconds.
diff --git a/src/blocked.c b/src/blocked.c
index ccab0e0e1..6a553926b 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -191,6 +191,8 @@ void unblockClient(client *c) {
} else if (c->btype == BLOCKED_PAUSE) {
listDelNode(server.paused_clients,c->paused_list_node);
c->paused_list_node = NULL;
+ } else if (c->btype == BLOCKED_SHUTDOWN) {
+ /* No special cleanup. */
} else {
serverPanic("Unknown btype in unblockClient().");
}
@@ -231,6 +233,22 @@ void replyToBlockedClientTimedOut(client *c) {
}
}
+/* If one or more clients are blocked on the SHUTDOWN command, this function
+ * sends them an error reply and unblocks them. */
+void replyToClientsBlockedOnShutdown(void) {
+ if (server.blocked_clients_by_type[BLOCKED_SHUTDOWN] == 0) return;
+ listNode *ln;
+ listIter li;
+ listRewind(server.clients, &li);
+ while((ln = listNext(&li))) {
+ client *c = listNodeValue(ln);
+ if (c->flags & CLIENT_BLOCKED && c->btype == BLOCKED_SHUTDOWN) {
+ addReplyError(c, "Errors trying to SHUTDOWN. Check logs.");
+ unblockClient(c);
+ }
+ }
+}
+
/* Mass-unblock clients because something changed in the instance that makes
* blocking no longer safe. For example clients blocked in list operations
* in an instance which turns from master to slave is unsafe, so this function
diff --git a/src/cluster.c b/src/cluster.c
index 3b35972d5..78e273f34 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -2325,7 +2325,9 @@ int clusterProcessPacket(clusterLink *link) {
resetManualFailover();
server.cluster->mf_end = now + CLUSTER_MF_TIMEOUT;
server.cluster->mf_slave = sender;
- pauseClients(now+(CLUSTER_MF_TIMEOUT*CLUSTER_MF_PAUSE_MULT),CLIENT_PAUSE_WRITE);
+ pauseClients(PAUSE_DURING_FAILOVER,
+ now + (CLUSTER_MF_TIMEOUT * CLUSTER_MF_PAUSE_MULT),
+ CLIENT_PAUSE_WRITE);
serverLog(LL_WARNING,"Manual failover requested by replica %.40s.",
sender->name);
/* We need to send a ping message to the replica, as it would carry
@@ -3590,7 +3592,7 @@ void resetManualFailover(void) {
if (server.cluster->mf_slave) {
/* We were a master failing over, so we paused clients. Regardless
* of the outcome we unpause now to allow traffic again. */
- unpauseClients();
+ unpauseClients(PAUSE_DURING_FAILOVER);
}
server.cluster->mf_end = 0; /* No manual failover in progress. */
server.cluster->mf_can_start = 0;
diff --git a/src/commands.c b/src/commands.c
index 8933fa247..bf0537b4f 100644
--- a/src/commands.c
+++ b/src/commands.c
@@ -4321,7 +4321,10 @@ struct redisCommandArg REPLICAOF_Args[] = {
/********** SHUTDOWN ********************/
/* SHUTDOWN history */
-#define SHUTDOWN_History NULL
+commandHistory SHUTDOWN_History[] = {
+{"7.0","Added the `NOW`, `FORCE` and `ABORT` modifiers. Introduced waiting for lagging replicas before exiting."},
+{0}
+};
/* SHUTDOWN hints */
#define SHUTDOWN_Hints NULL
@@ -4336,6 +4339,9 @@ struct redisCommandArg SHUTDOWN_nosave_save_Subargs[] = {
/* SHUTDOWN argument table */
struct redisCommandArg SHUTDOWN_Args[] = {
{"nosave_save",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,.subargs=SHUTDOWN_nosave_save_Subargs},
+{"now",ARG_TYPE_PURE_TOKEN,-1,"NOW",NULL,NULL,CMD_ARG_OPTIONAL},
+{"force",ARG_TYPE_PURE_TOKEN,-1,"FORCE",NULL,NULL,CMD_ARG_OPTIONAL},
+{"abort",ARG_TYPE_PURE_TOKEN,-1,"ABORT",NULL,NULL,CMD_ARG_OPTIONAL},
{0}
};
@@ -6542,7 +6548,7 @@ struct redisCommand redisCommandTable[] = {
{"restore-asking","An internal command for migrating keys in a cluster","O(1) to create the new key and additional O(N*M) to reconstruct the serialized value, where N is the number of Redis objects composing the value and M their average size. For small string values the time complexity is thus O(1)+O(1*M) where M is small, so simply O(1). However for sorted set values the complexity is O(N*M*log(N)) because inserting values into sorted sets is O(log(N)).","3.0.0",CMD_DOC_SYSCMD,NULL,NULL,COMMAND_GROUP_SERVER,RESTORE_ASKING_History,RESTORE_ASKING_Hints,restoreCommand,-4,CMD_WRITE|CMD_DENYOOM|CMD_ASKING,ACL_CATEGORY_KEYSPACE|ACL_CATEGORY_DANGEROUS,{{CMD_KEY_WRITE,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}}},
{"role","Return the role of the instance in the context of replication","O(1)","2.8.12",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,ROLE_History,ROLE_Hints,roleCommand,1,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_SENTINEL,ACL_CATEGORY_ADMIN|ACL_CATEGORY_DANGEROUS},
{"save","Synchronously save the dataset to disk","O(N) where N is the total number of keys in all databases","1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,SAVE_History,SAVE_Hints,saveCommand,1,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_NOSCRIPT,0},
-{"shutdown","Synchronously save the dataset to disk and then shut down the server",NULL,"1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,SHUTDOWN_History,SHUTDOWN_Hints,shutdownCommand,-1,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,0,.args=SHUTDOWN_Args},
+{"shutdown","Synchronously save the dataset to disk and then shut down the server","O(N) when saving, where N is the total number of keys in all databases when saving data, otherwise O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,SHUTDOWN_History,SHUTDOWN_Hints,shutdownCommand,-1,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,0,.args=SHUTDOWN_Args},
{"slaveof","Make the server a replica of another instance, or promote it as master. Deprecated starting with Redis 5. Use REPLICAOF instead.","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,SLAVEOF_History,SLAVEOF_Hints,replicaofCommand,3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_NOSCRIPT|CMD_STALE,0,.args=SLAVEOF_Args},
{"slowlog","A container for slow log commands","Depends on subcommand.","2.2.12",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,SLOWLOG_History,SLOWLOG_Hints,NULL,-2,0,0,.subcommands=SLOWLOG_Subcommands},
{"swapdb","Swaps two Redis databases","O(N) where N is the count of clients watching or blocking on keys from both databases.","4.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SERVER,SWAPDB_History,SWAPDB_Hints,swapdbCommand,3,CMD_WRITE|CMD_FAST,ACL_CATEGORY_KEYSPACE|ACL_CATEGORY_DANGEROUS,.args=SWAPDB_Args},
diff --git a/src/commands/shutdown.json b/src/commands/shutdown.json
index 1a594d74d..deb2e48ba 100644
--- a/src/commands/shutdown.json
+++ b/src/commands/shutdown.json
@@ -1,10 +1,17 @@
{
"SHUTDOWN": {
"summary": "Synchronously save the dataset to disk and then shut down the server",
+ "complexity": "O(N) when saving, where N is the total number of keys in all databases when saving data, otherwise O(1)",
"group": "server",
"since": "1.0.0",
"arity": -1,
"function": "shutdownCommand",
+ "history": [
+ [
+ "7.0",
+ "Added the `NOW`, `FORCE` and `ABORT` modifiers. Introduced waiting for lagging replicas before exiting."
+ ]
+ ],
"command_flags": [
"ADMIN",
"NOSCRIPT",
@@ -29,6 +36,24 @@
"token": "SAVE"
}
]
+ },
+ {
+ "name": "now",
+ "type": "pure-token",
+ "token": "NOW",
+ "optional": true
+ },
+ {
+ "name": "force",
+ "type": "pure-token",
+ "token": "FORCE",
+ "optional": true
+ },
+ {
+ "name": "abort",
+ "type": "pure-token",
+ "token": "ABORT",
+ "optional": true
}
]
}
diff --git a/src/config.c b/src/config.c
index 407cf7249..03b6af29a 100644
--- a/src/config.c
+++ b/src/config.c
@@ -2717,6 +2717,7 @@ standardConfig configs[] = {
createIntConfig("min-replicas-to-write", "min-slaves-to-write", MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_min_slaves_to_write, 0, INTEGER_CONFIG, NULL, updateGoodSlaves),
createIntConfig("min-replicas-max-lag", "min-slaves-max-lag", MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_min_slaves_max_lag, 10, INTEGER_CONFIG, NULL, updateGoodSlaves),
createIntConfig("watchdog-period", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.watchdog_period, 0, INTEGER_CONFIG, NULL, updateWatchdogPeriod),
+ createIntConfig("shutdown-timeout", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.shutdown_timeout, 10, INTEGER_CONFIG, NULL, NULL),
/* Unsigned int configs */
createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, server.maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients),
diff --git a/src/db.c b/src/db.c
index d19c4d92a..fb08152e6 100644
--- a/src/db.c
+++ b/src/db.c
@@ -1033,23 +1033,59 @@ void typeCommand(client *c) {
}
void shutdownCommand(client *c) {
- int flags = 0;
-
- if (c->argc > 2) {
- addReplyErrorObject(c,shared.syntaxerr);
- return;
- } else if (c->argc == 2) {
- if (!strcasecmp(c->argv[1]->ptr,"nosave")) {
+ int flags = SHUTDOWN_NOFLAGS;
+ int abort = 0;
+ for (int i = 1; i < c->argc; i++) {
+ if (!strcasecmp(c->argv[i]->ptr,"nosave")) {
flags |= SHUTDOWN_NOSAVE;
- } else if (!strcasecmp(c->argv[1]->ptr,"save")) {
+ } else if (!strcasecmp(c->argv[i]->ptr,"save")) {
flags |= SHUTDOWN_SAVE;
+ } else if (!strcasecmp(c->argv[i]->ptr, "now")) {
+ flags |= SHUTDOWN_NOW;
+ } else if (!strcasecmp(c->argv[i]->ptr, "force")) {
+ flags |= SHUTDOWN_FORCE;
+ } else if (!strcasecmp(c->argv[i]->ptr, "abort")) {
+ abort = 1;
} else {
addReplyErrorObject(c,shared.syntaxerr);
return;
}
}
+ if ((abort && flags != SHUTDOWN_NOFLAGS) ||
+ (flags & SHUTDOWN_NOSAVE && flags & SHUTDOWN_SAVE))
+ {
+ /* Illegal combo. */
+ addReplyErrorObject(c,shared.syntaxerr);
+ return;
+ }
+
+ if (abort) {
+ if (abortShutdown() == C_OK)
+ addReply(c, shared.ok);
+ else
+ addReplyError(c, "No shutdown in progress.");
+ return;
+ }
+
+ if (!(flags & SHUTDOWN_NOW) && c->flags & CLIENT_DENY_BLOCKING) {
+ addReplyError(c, "SHUTDOWN without NOW or ABORT isn't allowed for DENY BLOCKING client");
+ return;
+ }
+
+ if (!(flags & SHUTDOWN_NOSAVE) && scriptIsTimedout()) {
+ /* Script timed out. Shutdown allowed only with the NOSAVE flag. See
+ * also processCommand where these errors are returned. */
+ if (scriptIsEval())
+ addReplyErrorObject(c, shared.slowevalerr);
+ else
+ addReplyErrorObject(c, shared.slowscripterr);
+ return;
+ }
+
+ blockClient(c, BLOCKED_SHUTDOWN);
if (prepareForShutdown(flags) == C_OK) exit(0);
- addReplyError(c,"Errors trying to SHUTDOWN. Check logs.");
+ /* If we're here, then shutdown is ongoing (the client is still blocked) or
+ * failed (the client has received an error). */
}
void renameGenericCommand(client *c, int nx) {
diff --git a/src/networking.c b/src/networking.c
index d0c74faba..2fa05f0d6 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -2873,7 +2873,7 @@ NULL
addReplyNull(c);
} else if (!strcasecmp(c->argv[1]->ptr,"unpause") && c->argc == 2) {
/* CLIENT UNPAUSE */
- unpauseClients();
+ unpauseClients(PAUSE_BY_CLIENT_COMMAND);
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"pause") && (c->argc == 3 ||
c->argc == 4))
@@ -2895,7 +2895,7 @@ NULL
if (getTimeoutFromObjectOrReply(c,c->argv[2],&end,
UNIT_MILLISECONDS) != C_OK) return;
- pauseClients(end, type);
+ pauseClients(PAUSE_BY_CLIENT_COMMAND, end, type);
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"tracking") && c->argc >= 3) {
/* CLIENT TRACKING (on|off) [REDIRECT <id>] [BCAST] [PREFIX first]
@@ -3539,6 +3539,48 @@ void flushSlavesOutputBuffers(void) {
}
}
+/* Compute current most restictive pause type and its end time, aggregated for
+ * all pause purposes. */
+static void updateClientPauseTypeAndEndTime(void) {
+ pause_type old_type = server.client_pause_type;
+ pause_type type = CLIENT_PAUSE_OFF;
+ mstime_t end = 0;
+ for (int i = 0; i < NUM_PAUSE_PURPOSES; i++) {
+ pause_event *p = server.client_pause_per_purpose[i];
+ if (p == NULL) {
+ /* Nothing to do. */
+ } else if (p->end < server.mstime) {
+ /* This one expired. */
+ zfree(p);
+ server.client_pause_per_purpose[i] = NULL;
+ } else if (p->type > type) {
+ /* This type is the most restrictive so far. */
+ type = p->type;
+ }
+ }
+
+ /* Find the furthest end time among the pause purposes of the most
+ * restrictive type */
+ for (int i = 0; i < NUM_PAUSE_PURPOSES; i++) {
+ pause_event *p = server.client_pause_per_purpose[i];
+ if (p != NULL && p->type == type && p->end > end) end = p->end;
+ }
+ server.client_pause_type = type;
+ server.client_pause_end_time = end;
+
+ /* If the pause type is less restrictive than before, we unblock all clients
+ * so they are reprocessed (may get re-paused). */
+ if (type < old_type) {
+ listNode *ln;
+ listIter li;
+ listRewind(server.paused_clients, &li);
+ while ((ln = listNext(&li)) != NULL) {
+ client *c = listNodeValue(ln);
+ unblockClient(c);
+ }
+ }
+}
+
/* Pause clients up to the specified unixtime (in ms) for a given type of
* commands.
*
@@ -3552,14 +3594,18 @@ void flushSlavesOutputBuffers(void) {
* The function always succeed, even if there is already a pause in progress.
* In such a case, the duration is set to the maximum and new end time and the
* type is set to the more restrictive type of pause. */
-void pauseClients(mstime_t end, pause_type type) {
- if (type > server.client_pause_type) {
- server.client_pause_type = type;
- }
-
- if (end > server.client_pause_end_time) {
- server.client_pause_end_time = end;
+void pauseClients(pause_purpose purpose, mstime_t end, pause_type type) {
+ /* Manage pause type and end time per pause purpose. */
+ if (server.client_pause_per_purpose[purpose] == NULL) {
+ server.client_pause_per_purpose[purpose] = zmalloc(sizeof(pause_event));
+ server.client_pause_per_purpose[purpose]->type = type;
+ server.client_pause_per_purpose[purpose]->end = end;
+ } else {
+ pause_event *p = server.client_pause_per_purpose[purpose];
+ p->type = max(p->type, type);
+ p->end = max(p->end, end);
}
+ updateClientPauseTypeAndEndTime();
/* We allow write commands that were queued
* up before and after to execute. We need
@@ -3571,20 +3617,11 @@ void pauseClients(mstime_t end, pause_type type) {
}
/* Unpause clients and queue them for reprocessing. */
-void unpauseClients(void) {
- listNode *ln;
- listIter li;
- client *c;
-
- server.client_pause_type = CLIENT_PAUSE_OFF;
- server.client_pause_end_time = 0;
-
- /* Unblock all of the clients so they are reprocessed. */
- listRewind(server.paused_clients,&li);
- while ((ln = listNext(&li)) != NULL) {
- c = listNodeValue(ln);
- unblockClient(c);
- }
+void unpauseClients(pause_purpose purpose) {
+ if (server.client_pause_per_purpose[purpose] == NULL) return;
+ zfree(server.client_pause_per_purpose[purpose]);
+ server.client_pause_per_purpose[purpose] = NULL;
+ updateClientPauseTypeAndEndTime();
}
/* Returns true if clients are paused and false otherwise. */
@@ -3599,7 +3636,7 @@ int checkClientPauseTimeoutAndReturnIfPaused(void) {
if (!areClientsPaused())
return 0;
if (server.client_pause_end_time < server.mstime) {
- unpauseClients();
+ updateClientPauseTypeAndEndTime();
}
return areClientsPaused();
}
diff --git a/src/replication.c b/src/replication.c
index e7a092645..80dc33ad4 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -3786,7 +3786,7 @@ void clearFailoverState() {
server.target_replica_host = NULL;
server.target_replica_port = 0;
server.failover_state = NO_FAILOVER;
- unpauseClients();
+ unpauseClients(PAUSE_DURING_FAILOVER);
}
/* Abort an ongoing failover if one is going on. */
@@ -3935,7 +3935,7 @@ void failoverCommand(client *c) {
server.force_failover = force_flag;
server.failover_state = FAILOVER_WAIT_FOR_SYNC;
/* Cluster failover will unpause eventually */
- pauseClients(LLONG_MAX,CLIENT_PAUSE_WRITE);
+ pauseClients(PAUSE_DURING_FAILOVER, LLONG_MAX, CLIENT_PAUSE_WRITE);
addReply(c,shared.ok);
}
diff --git a/src/server.c b/src/server.c
index a5a95d1d8..47bf1628e 100644
--- a/src/server.c
+++ b/src/server.c
@@ -83,6 +83,13 @@ double R_Zero, R_PosInf, R_NegInf, R_Nan;
/* Global vars */
struct redisServer server; /* Server global state */
+/*============================ Internal prototypes ========================== */
+
+static inline int isShutdownInitiated(void);
+int isReadyToShutdown(void);
+int finishShutdown(void);
+const char *replstateToString(int replstate);
+
/*============================ Utility functions ============================ */
/* We use a private localtime implementation which is fork-safe. The logging
@@ -1137,10 +1144,13 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
/* We received a SIGTERM, shutting down here in a safe way, as it is
* not ok doing so inside the signal handler. */
- if (server.shutdown_asap) {
+ if (server.shutdown_asap && !isShutdownInitiated()) {
if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0);
- serverLog(LL_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
- server.shutdown_asap = 0;
+ } else if (isShutdownInitiated()) {
+ if (server.mstime >= server.shutdown_mstime || isReadyToShutdown()) {
+ if (finishShutdown() == C_OK) exit(0);
+ /* Shutdown failed. Continue running. An error has been logged. */
+ }
}
/* Show some info about non-empty databases */
@@ -1383,6 +1393,14 @@ void whileBlockedCron() {
}
}
+static void sendGetackToReplicas(void) {
+ robj *argv[3];
+ argv[0] = shared.replconf;
+ argv[1] = shared.getack;
+ argv[2] = shared.special_asterick; /* Not used argument. */
+ replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
+}
+
extern int ProcessingEventsWhileBlocked;
/* This function gets called every time Redis is entering the
@@ -1467,12 +1485,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
* increment the replication backlog, they'll be sent after the pause
* if we are still the master. */
if (server.get_ack_from_slaves && !checkClientPauseTimeoutAndReturnIfPaused()) {
- robj *argv[3];
-
- argv[0] = shared.replconf;
- argv[1] = shared.getack;
- argv[2] = shared.special_asterick; /* Not used argument. */
- replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
+ sendGetackToReplicas();
server.get_ack_from_slaves = 0;
}
@@ -1750,6 +1763,8 @@ void initServerConfig(void) {
memset(server.blocked_clients_by_type,0,
sizeof(server.blocked_clients_by_type));
server.shutdown_asap = 0;
+ server.shutdown_flags = 0;
+ server.shutdown_mstime = 0;
server.cluster_module_flags = CLUSTER_MODULE_FLAG_NONE;
server.migrate_cached_sockets = dictCreate(&migrateCacheDictType);
server.next_client_id = 1; /* Client IDs, start from 1 .*/
@@ -1852,9 +1867,9 @@ int restartServer(int flags, mstime_t delay) {
return C_ERR;
}
- /* Perform a proper shutdown. */
+ /* Perform a proper shutdown. We don't wait for lagging replicas though. */
if (flags & RESTART_SERVER_GRACEFULLY &&
- prepareForShutdown(SHUTDOWN_NOFLAGS) != C_OK)
+ prepareForShutdown(SHUTDOWN_NOW) != C_OK)
{
serverLog(LL_WARNING,"Can't restart: error preparing for shutdown");
return C_ERR;
@@ -2266,6 +2281,8 @@ void initServer(void) {
server.get_ack_from_slaves = 0;
server.client_pause_type = CLIENT_PAUSE_OFF;
server.client_pause_end_time = 0;
+ memset(server.client_pause_per_purpose, 0,
+ sizeof(server.client_pause_per_purpose));
server.paused_clients = listCreate();
server.events_processed_while_blocked = 0;
server.system_memory_size = zmalloc_get_memory_size();
@@ -3532,9 +3549,7 @@ int processCommand(client *c) {
c->cmd->proc != unwatchCommand &&
c->cmd->proc != quitCommand &&
c->cmd->proc != resetCommand &&
- !(c->cmd->proc == shutdownCommand &&
- c->argc == 2 &&
- tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
+ c->cmd->proc != shutdownCommand && /* more checks in shutdownCommand */
!(c->cmd->proc == scriptCommand &&
c->argc == 2 &&
tolower(((char*)c->argv[1]->ptr)[0]) == 'k') &&
@@ -3620,7 +3635,32 @@ void closeListeningSockets(int unlink_unix_socket) {
}
}
+/* Prepare for shutting down the server. Flags:
+ *
+ * - SHUTDOWN_SAVE: Save a database dump even if the server is configured not to
+ * save any dump.
+ *
+ * - SHUTDOWN_NOSAVE: Don't save any database dump even if the server is
+ * configured to save one.
+ *
+ * - SHUTDOWN_NOW: Don't wait for replicas to catch up before shutting down.
+ *
+ * - SHUTDOWN_FORCE: Ignore errors writing AOF and RDB files on disk, which
+ * would normally prevent a shutdown.
+ *
+ * Unless SHUTDOWN_NOW is set and if any replicas are lagging behind, C_ERR is
+ * returned and server.shutdown_mstime is set to a timestamp to allow a grace
+ * period for the replicas to catch up. This is checked and handled by
+ * serverCron() which completes the shutdown as soon as possible.
+ *
+ * If shutting down fails due to errors writing RDB or AOF files, C_ERR is
+ * returned and an error is logged. If the flag SHUTDOWN_FORCE is set, these
+ * errors are logged but ignored and C_OK is returned.
+ *
+ * On success, this function returns C_OK and then it's OK to call exit(0). */
int prepareForShutdown(int flags) {
+ if (isShutdownInitiated()) return C_ERR;
+
/* When SHUTDOWN is called while the server is loading a dataset in
* memory we need to make sure no attempt is performed to save
* the dataset on shutdown (otherwise it could overwrite the current DB
@@ -3630,13 +3670,108 @@ int prepareForShutdown(int flags) {
if (server.loading || server.sentinel_mode)
flags = (flags & ~SHUTDOWN_SAVE) | SHUTDOWN_NOSAVE;
- int save = flags & SHUTDOWN_SAVE;
- int nosave = flags & SHUTDOWN_NOSAVE;
+ server.shutdown_flags = flags;
serverLog(LL_WARNING,"User requested shutdown...");
if (server.supervised_mode == SUPERVISED_SYSTEMD)
redisCommunicateSystemd("STOPPING=1\n");
+ /* If we have any replicas, let them catch up the replication offset before
+ * we shut down, to avoid data loss. */
+ if (!(flags & SHUTDOWN_NOW) &&
+ server.shutdown_timeout != 0 &&
+ !isReadyToShutdown())
+ {
+ server.shutdown_mstime = server.mstime + server.shutdown_timeout * 1000;
+ if (!areClientsPaused()) sendGetackToReplicas();
+ pauseClients(PAUSE_DURING_SHUTDOWN, LLONG_MAX, CLIENT_PAUSE_WRITE);
+ serverLog(LL_NOTICE, "Waiting for replicas before shutting down.");
+ return C_ERR;
+ }
+
+ return finishShutdown();
+}
+
+static inline int isShutdownInitiated(void) {
+ return server.shutdown_mstime != 0;
+}
+
+/* Returns 0 if there are any replicas which are lagging in replication which we
+ * need to wait for before shutting down. Returns 1 if we're ready to shut
+ * down now. */
+int isReadyToShutdown(void) {
+ if (listLength(server.slaves) == 0) return 1; /* No replicas. */
+
+ listIter li;
+ listNode *ln;
+ listRewind(server.slaves, &li);
+ while ((ln = listNext(&li)) != NULL) {
+ client *replica = listNodeValue(ln);
+ if (replica->repl_ack_off != server.master_repl_offset) return 0;
+ }
+ return 1;
+}
+
+static void cancelShutdown(void) {
+ server.shutdown_asap = 0;
+ server.shutdown_flags = 0;
+ server.shutdown_mstime = 0;
+ replyToClientsBlockedOnShutdown();
+ unpauseClients(PAUSE_DURING_SHUTDOWN);
+}
+
+/* Returns C_OK if shutdown was aborted and C_ERR if shutdown wasn't ongoing. */
+int abortShutdown(void) {
+ if (isShutdownInitiated()) {
+ cancelShutdown();
+ } else if (server.shutdown_asap) {
+ /* Signal handler has requested shutdown, but it hasn't been initiated
+ * yet. Just clear the flag. */
+ server.shutdown_asap = 0;
+ } else {
+ /* Shutdown neither initiated nor requested. */
+ return C_ERR;
+ }
+ serverLog(LL_NOTICE, "Shutdown manually aborted.");
+ return C_OK;
+}
+
+/* The final step of the shutdown sequence. Returns C_OK if the shutdown
+ * sequence was successful and it's OK to call exit(). If C_ERR is returned,
+ * it's not safe to call exit(). */
+int finishShutdown(void) {
+
+ int save = server.shutdown_flags & SHUTDOWN_SAVE;
+ int nosave = server.shutdown_flags & SHUTDOWN_NOSAVE;
+ int force = server.shutdown_flags & SHUTDOWN_FORCE;
+
+ /* Log a warning for each replica that is lagging. */
+ listIter replicas_iter;
+ listNode *replicas_list_node;
+ int num_replicas = 0, num_lagging_replicas = 0;
+ listRewind(server.slaves, &replicas_iter);
+ while ((replicas_list_node = listNext(&replicas_iter)) != NULL) {
+ client *replica = listNodeValue(replicas_list_node);
+ num_replicas++;
+ if (replica->repl_ack_off != server.master_repl_offset) {
+ num_lagging_replicas++;
+ long lag = replica->replstate == SLAVE_STATE_ONLINE ?
+ time(NULL) - replica->repl_ack_time : 0;
+ serverLog(LL_WARNING,
+ "Lagging replica %s reported offset %lld behind master, lag=%ld, state=%s.",
+ replicationGetSlaveName(replica),
+ server.master_repl_offset - replica->repl_ack_off,
+ lag,
+ replstateToString(replica->replstate));
+ }
+ }
+ if (num_replicas > 0) {
+ serverLog(LL_NOTICE,
+ "%d of %d replicas are in sync when shutting down.",
+ num_replicas - num_lagging_replicas,
+ num_replicas);
+ }
+
/* Kill all the Lua debugger forked sessions. */
ldbKillForkedSessions();
@@ -3661,20 +3796,24 @@ int prepareForShutdown(int flags) {
TerminateModuleForkChild(server.child_pid,0);
}
- if (server.aof_state != AOF_OFF) {
- /* Kill the AOF saving child as the AOF we already have may be longer
- * but contains the full dataset anyway. */
- if (server.child_type == CHILD_TYPE_AOF) {
- /* If we have AOF enabled but haven't written the AOF yet, don't
- * shutdown or else the dataset will be lost. */
- if (server.aof_state == AOF_WAIT_REWRITE) {
+ /* Kill the AOF saving child as the AOF we already have may be longer
+ * but contains the full dataset anyway. */
+ if (server.child_type == CHILD_TYPE_AOF) {
+ /* If we have AOF enabled but haven't written the AOF yet, don't
+ * shutdown or else the dataset will be lost. */
+ if (server.aof_state == AOF_WAIT_REWRITE) {
+ if (force) {
+ serverLog(LL_WARNING, "Writing initial AOF. Exit anyway.");
+ } else {
serverLog(LL_WARNING, "Writing initial AOF, can't exit.");
- return C_ERR;
+ goto error;
}
- serverLog(LL_WARNING,
- "There is a child rewriting the AOF. Killing it!");
- killAppendOnlyChild();
}
+ serverLog(LL_WARNING,
+ "There is a child rewriting the AOF. Killing it!");
+ killAppendOnlyChild();
+ }
+ if (server.aof_state != AOF_OFF) {
/* Append only file: flush buffers and fsync() the AOF at exit */
serverLog(LL_NOTICE,"Calling fsync() on the AOF file.");
flushAppendOnlyFile(1);
@@ -3698,10 +3837,14 @@ int prepareForShutdown(int flags) {
* in the next cron() Redis will be notified that the background
* saving aborted, handling special stuff like slaves pending for
* synchronization... */
- serverLog(LL_WARNING,"Error trying to save the DB, can't exit.");
- if (server.supervised_mode == SUPERVISED_SYSTEMD)
- redisCommunicateSystemd("STATUS=Error trying to save the DB, can't exit.\n");
- return C_ERR;
+ if (force) {
+ serverLog(LL_WARNING,"Error trying to save the DB. Exit anyway.");
+ } else {
+ serverLog(LL_WARNING,"Error trying to save the DB, can't exit.");
+ if (server.supervised_mode == SUPERVISED_SYSTEMD)
+ redisCommunicateSystemd("STATUS=Error trying to save the DB, can't exit.\n");
+ goto error;
+ }
}
}
@@ -3723,6 +3866,11 @@ int prepareForShutdown(int flags) {
serverLog(LL_WARNING,"%s is now ready to exit, bye bye...",
server.sentinel_mode ? "Sentinel" : "Redis");
return C_OK;
+
+error:
+ serverLog(LL_WARNING, "Errors trying to shut down the server. Check the logs for more information.");
+ cancelShutdown();
+ return C_ERR;
}
/*================================== Commands =============================== */
@@ -4404,6 +4552,20 @@ sds getFullCommandName(struct redisCommand *cmd) {
}
}
+const char *replstateToString(int replstate) {
+ switch (replstate) {
+ case SLAVE_STATE_WAIT_BGSAVE_START:
+ case SLAVE_STATE_WAIT_BGSAVE_END:
+ return "wait_bgsave";
+ case SLAVE_STATE_SEND_BULK:
+ return "send_bulk";
+ case SLAVE_STATE_ONLINE:
+ return "online";
+ default:
+ return "";
+ }
+}
+
/* Characters we sanitize on INFO output to maintain expected format. */
static char unsafe_info_chars[] = "#:\n\r";
static char unsafe_info_chars_substs[] = "____"; /* Must be same length as above */
@@ -4550,6 +4712,13 @@ sds genRedisInfoString(const char *section) {
server.executable ? server.executable : "",
server.configfile ? server.configfile : "",
server.io_threads_active);
+
+ /* Conditional properties */
+ if (isShutdownInitiated()) {
+ info = sdscatfmt(info,
+ "shutdown_in_milliseconds:%I\r\n",
+ (int64_t)(server.shutdown_mstime - server.mstime));
+ }
}
/* Clients */
@@ -5047,7 +5216,6 @@ sds genRedisInfoString(const char *section) {
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = listNodeValue(ln);
- char *state = NULL;
char ip[NET_IP_STR_LEN], *slaveip = slave->slave_addr;
int port;
long lag = 0;
@@ -5057,19 +5225,8 @@ sds genRedisInfoString(const char *section) {
continue;
slaveip = ip;
}
- switch(slave->replstate) {
- case SLAVE_STATE_WAIT_BGSAVE_START:
- case SLAVE_STATE_WAIT_BGSAVE_END:
- state = "wait_bgsave";
- break;
- case SLAVE_STATE_SEND_BULK:
- state = "send_bulk";
- break;
- case SLAVE_STATE_ONLINE:
- state = "online";
- break;
- }
- if (state == NULL) continue;
+ const char *state = replstateToString(slave->replstate);
+ if (state[0] == '\0') continue;
if (slave->replstate == SLAVE_STATE_ONLINE)
lag = time(NULL) - slave->repl_ack_time;
@@ -5589,7 +5746,7 @@ static void sigShutdownHandler(int sig) {
/* SIGINT is often delivered via Ctrl+C in an interactive session.
* If we receive the signal the second time, we interpret this as
* the user really wanting to quit ASAP without waiting to persist
- * on disk. */
+ * on disk and without waiting for lagging replicas. */
if (server.shutdown_asap && sig == SIGINT) {
serverLogFromHandler(LL_WARNING, "You insist... exiting now.");
rdbRemoveTempFile(getpid(), 1);
diff --git a/src/server.h b/src/server.h
index e3cf50b65..c281ac30a 100644
--- a/src/server.h
+++ b/src/server.h
@@ -321,7 +321,8 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
#define BLOCKED_STREAM 4 /* XREAD. */
#define BLOCKED_ZSET 5 /* BZPOP et al. */
#define BLOCKED_PAUSE 6 /* Blocked by CLIENT PAUSE */
-#define BLOCKED_NUM 7 /* Number of blocked states. */
+#define BLOCKED_SHUTDOWN 7 /* SHUTDOWN. */
+#define BLOCKED_NUM 8 /* Number of blocked states. */
/* Client request types */
#define PROTO_REQ_INLINE 1
@@ -484,6 +485,8 @@ typedef enum {
#define SHUTDOWN_SAVE 1 /* Force SAVE on SHUTDOWN even if no save
points are configured. */
#define SHUTDOWN_NOSAVE 2 /* Don't SAVE on SHUTDOWN. */
+#define SHUTDOWN_NOW 4 /* Don't wait for replicas to catch up. */
+#define SHUTDOWN_FORCE 8 /* Don't let errors prevent shutdown. */
/* Command call flags, see call() function */
#define CMD_CALL_NONE 0
@@ -508,6 +511,19 @@ typedef enum {
CLIENT_PAUSE_ALL /* Pause all commands */
} pause_type;
+/* Client pause purposes. Each purpose has its own end time and pause type. */
+typedef enum {
+ PAUSE_BY_CLIENT_COMMAND = 0,
+ PAUSE_DURING_SHUTDOWN,
+ PAUSE_DURING_FAILOVER,
+ NUM_PAUSE_PURPOSES /* This value is the number of purposes above. */
+} pause_purpose;
+
+typedef struct {
+ pause_type type;
+ mstime_t end;
+} pause_event;
+
/* RDB active child save type. */
#define RDB_CHILD_TYPE_NONE 0
#define RDB_CHILD_TYPE_DISK 1 /* RDB is written to disk. */
@@ -1353,7 +1369,9 @@ struct redisServer {
aeEventLoop *el;
rax *errors; /* Errors table */
redisAtomic unsigned int lruclock; /* Clock for LRU eviction */
- volatile sig_atomic_t shutdown_asap; /* SHUTDOWN needed ASAP */
+ volatile sig_atomic_t shutdown_asap; /* Shutdown ordered by signal handler. */
+ mstime_t shutdown_mstime; /* Timestamp to limit graceful shutdown. */
+ int shutdown_flags; /* Flags passed to prepareForShutdown(). */
int activerehashing; /* Incremental rehash in serverCron() */
int active_defrag_running; /* Active defragmentation running (holds current scan aggressiveness) */
char *pidfile; /* PID file path */
@@ -1413,6 +1431,7 @@ struct redisServer {
pause_type client_pause_type; /* True if clients are currently paused */
list *paused_clients; /* List of pause clients */
mstime_t client_pause_end_time; /* Time when we undo clients_paused */
+ pause_event *client_pause_per_purpose[NUM_PAUSE_PURPOSES];
char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
dict *migrate_cached_sockets;/* MIGRATE cached sockets */
redisAtomic uint64_t next_client_id; /* Next client unique ID. Incremental. */
@@ -1613,6 +1632,9 @@ struct redisServer {
int memcheck_enabled; /* Enable memory check on crash. */
int use_exit_on_panic; /* Use exit() on panic and assert rather than
* abort(). useful for Valgrind. */
+ /* Shutdown */
+ int shutdown_timeout; /* Graceful shutdown time limit in seconds. */
+
/* Replication (master) */
char replid[CONFIG_RUN_ID_SIZE+1]; /* My current replication ID. */
char replid2[CONFIG_RUN_ID_SIZE+1]; /* replid inherited from master*/
@@ -2344,8 +2366,8 @@ void flushSlavesOutputBuffers(void);
void disconnectSlaves(void);
void evictClients(void);
int listenToPort(int port, socketFds *fds);
-void pauseClients(mstime_t duration, pause_type type);
-void unpauseClients(void);
+void pauseClients(pause_purpose purpose, mstime_t end, pause_type type);
+void unpauseClients(pause_purpose purpose);
int areClientsPaused(void);
int checkClientPauseTimeoutAndReturnIfPaused(void);
void processEventsWhileBlocked(void);
@@ -2708,6 +2730,8 @@ void preventCommandAOF(client *c);
void preventCommandReplication(client *c);
void slowlogPushCurrentCommand(client *c, struct redisCommand *cmd, ustime_t duration);
int prepareForShutdown(int flags);
+void replyToClientsBlockedOnShutdown(void);
+int abortShutdown(void);
void afterCommand(client *c);
int inNestedCall(void);
#ifdef __GNUC__
diff --git a/tests/integration/psync2-master-restart.tcl b/tests/integration/psync2-master-restart.tcl
index 672143102..c0c8f471f 100644
--- a/tests/integration/psync2-master-restart.tcl
+++ b/tests/integration/psync2-master-restart.tcl
@@ -42,7 +42,9 @@ start_server {} {
$replica config resetstat
catch {
- restart_server 0 true false
+ # SHUTDOWN NOW ensures master doesn't send GETACK to replicas before
+ # shutting down which would affect the replication offset.
+ restart_server 0 true false true now
set master [srv 0 client]
}
wait_for_condition 50 1000 {
@@ -77,9 +79,14 @@ start_server {} {
after 20
+ # Wait until master has received ACK from replica. If the master thinks
+ # that any replica is lagging when it shuts down, master would send
+ # GETACK to the replicas, affecting the replication offset.
+ set offset [status $master master_repl_offset]
wait_for_condition 500 100 {
- [status $master master_repl_offset] == [status $replica master_repl_offset] &&
- [status $master master_repl_offset] == [status $sub_replica master_repl_offset]
+ [string match "*slave0:*,offset=$offset,*" [$master info replication]] &&
+ $offset == [status $replica master_repl_offset] &&
+ $offset == [status $sub_replica master_repl_offset]
} else {
show_cluster_status
fail "Replicas and master offsets were unable to match *exactly*."
@@ -89,6 +96,11 @@ start_server {} {
$replica config resetstat
catch {
+ # Unlike the test above, here we use SIGTERM, which behaves
+ # differently compared to SHUTDOWN NOW if there are lagging
+ # replicas. This is just to increase coverage and let each test use
+ # a different shutdown approach. In this case there are no lagging
+ # replicas though.
restart_server 0 true false
set master [srv 0 client]
}
@@ -136,6 +148,9 @@ start_server {} {
$replica config resetstat
catch {
+ # Unlike the test above, here we use SIGTERM. This is just to
+ # increase coverage and let each test use a different shutdown
+ # approach.
restart_server 0 true false
set master [srv 0 client]
}
diff --git a/tests/integration/shutdown.tcl b/tests/integration/shutdown.tcl
new file mode 100644
index 000000000..60afc5c7f
--- /dev/null
+++ b/tests/integration/shutdown.tcl
@@ -0,0 +1,238 @@
+# This test suite tests shutdown when there are lagging replicas connected.
+
+# Fill up the OS socket send buffer for the replica connection 1M at a time.
+# When the replication buffer memory increases beyond 2M (often after writing 4M
+# or so), we assume it's because the OS socket send buffer can't swallow
+# anymore.
+proc fill_up_os_socket_send_buffer_for_repl {idx} {
+ set i 0
+ while {1} {
+ incr i
+ populate 1024 junk$i: 1024 $idx
+ after 10
+ set buf_size [s $idx mem_total_replication_buffers]
+ if {$buf_size > 2*1024*1024} {
+ break
+ }
+ }
+}
+
+foreach how {sigterm shutdown} {
+ test "Shutting down master waits for replica to catch up ($how)" {
+ start_server {} {
+ start_server {} {
+ set master [srv -1 client]
+ set master_host [srv -1 host]
+ set master_port [srv -1 port]
+ set master_pid [srv -1 pid]
+ set replica [srv 0 client]
+ set replica_pid [srv 0 pid]
+
+ # Config master.
+ $master config set shutdown-timeout 300; # 5min for slow CI
+ $master config set repl-backlog-size 1; # small as possible
+ $master config set hz 100; # cron runs every 10ms
+
+ # Config replica.
+ $replica replicaof $master_host $master_port
+ wait_for_sync $replica
+
+ # Preparation: Set k to 1 on both master and replica.
+ $master set k 1
+ wait_for_ofs_sync $master $replica
+
+ # Pause the replica.
+ exec kill -SIGSTOP $replica_pid
+ after 10
+
+ # Fill up the OS socket send buffer for the replica connection
+ # to prevent the following INCR from reaching the replica via
+ # the OS.
+ fill_up_os_socket_send_buffer_for_repl -1
+
+ # Incr k and immediately shutdown master.
+ $master incr k
+ switch $how {
+ sigterm {
+ exec kill -SIGTERM $master_pid
+ }
+ shutdown {
+ set rd [redis_deferring_client -1]
+ $rd shutdown
+ }
+ }
+ wait_for_condition 50 100 {
+ [s -1 shutdown_in_milliseconds] > 0
+ } else {
+ fail "Master not indicating ongoing shutdown."
+ }
+
+ # Wake up replica and check if master has waited for it.
+ after 20; # 2 cron intervals
+ exec kill -SIGCONT $replica_pid
+ wait_for_condition 300 1000 {
+ [$replica get k] eq 2
+ } else {
+ fail "Master exited before replica could catch up."
+ }
+
+ # Check shutdown log messages on master
+ wait_for_log_messages -1 {"*ready to exit, bye bye*"} 0 100 500
+ assert_equal 0 [count_log_message -1 "*Lagging replica*"]
+ verify_log_message -1 "*1 of 1 replicas are in sync*" 0
+ }
+ }
+ } {} {repl external:skip}
+}
+
+test {Shutting down master waits for replica timeout} {
+ start_server {} {
+ start_server {} {
+ set master [srv -1 client]
+ set master_host [srv -1 host]
+ set master_port [srv -1 port]
+ set master_pid [srv -1 pid]
+ set replica [srv 0 client]
+ set replica_pid [srv 0 pid]
+
+ # Config master.
+ $master config set shutdown-timeout 1; # second
+
+ # Config replica.
+ $replica replicaof $master_host $master_port
+ wait_for_sync $replica
+
+ # Preparation: Set k to 1 on both master and replica.
+ $master set k 1
+ wait_for_ofs_sync $master $replica
+
+ # Pause the replica.
+ exec kill -SIGSTOP $replica_pid
+ after 10
+
+ # Fill up the OS socket send buffer for the replica connection to
+ # prevent the following INCR k from reaching the replica via the OS.
+ fill_up_os_socket_send_buffer_for_repl -1
+
+ # Incr k and immediately shutdown master.
+ $master incr k
+ exec kill -SIGTERM $master_pid
+ wait_for_condition 50 100 {
+ [s -1 shutdown_in_milliseconds] > 0
+ } else {
+ fail "Master not indicating ongoing shutdown."
+ }
+
+ # Let master finish shutting down and check log.
+ wait_for_log_messages -1 {"*ready to exit, bye bye*"} 0 100 100
+ verify_log_message -1 "*Lagging replica*" 0
+ verify_log_message -1 "*0 of 1 replicas are in sync*" 0
+
+ # Wake up replica.
+ exec kill -SIGCONT $replica_pid
+ assert_equal 1 [$replica get k]
+ }
+ }
+} {} {repl external:skip}
+
+test "Shutting down master waits for replica then fails" {
+ start_server {} {
+ start_server {} {
+ set master [srv -1 client]
+ set master_host [srv -1 host]
+ set master_port [srv -1 port]
+ set master_pid [srv -1 pid]
+ set replica [srv 0 client]
+ set replica_pid [srv 0 pid]
+
+ # Config master and replica.
+ $replica replicaof $master_host $master_port
+ wait_for_sync $replica
+
+ # Pause the replica and write a key on master.
+ exec kill -SIGSTOP $replica_pid
+ after 10
+ $master incr k
+
+ # Two clients call blocking SHUTDOWN in parallel.
+ set rd1 [redis_deferring_client -1]
+ set rd2 [redis_deferring_client -1]
+ $rd1 shutdown
+ $rd2 shutdown
+ set info_clients [$master info clients]
+ assert_match "*connected_clients:3*" $info_clients
+ assert_match "*blocked_clients:2*" $info_clients
+
+ # Start a very slow initial AOFRW, which will prevent shutdown.
+ $master config set rdb-key-save-delay 30000000; # 30 seconds
+ $master config set appendonly yes
+
+ # Wake up replica, causing master to continue shutting down.
+ exec kill -SIGCONT $replica_pid
+
+ # SHUTDOWN returns an error to both clients blocking on SHUTDOWN.
+ catch { $rd1 read } e1
+ catch { $rd2 read } e2
+ assert_match "*Errors trying to SHUTDOWN. Check logs*" $e1
+ assert_match "*Errors trying to SHUTDOWN. Check logs*" $e2
+ $rd1 close
+ $rd2 close
+
+ # Check shutdown log messages on master.
+ verify_log_message -1 "*1 of 1 replicas are in sync*" 0
+ verify_log_message -1 "*Writing initial AOF, can't exit*" 0
+ verify_log_message -1 "*Errors trying to shut down*" 0
+
+ # Let master to exit fast, without waiting for the very slow AOFRW.
+ catch {$master shutdown nosave force}
+ }
+ }
+} {} {repl external:skip}
+
+test "Shutting down master waits for replica then aborted" {
+ start_server {} {
+ start_server {} {
+ set master [srv -1 client]
+ set master_host [srv -1 host]
+ set master_port [srv -1 port]
+ set master_pid [srv -1 pid]
+ set replica [srv 0 client]
+ set replica_pid [srv 0 pid]
+
+ # Config master and replica.
+ $replica replicaof $master_host $master_port
+ wait_for_sync $replica
+
+ # Pause the replica and write a key on master.
+ exec kill -SIGSTOP $replica_pid
+ after 10
+ $master incr k
+
+ # Two clients call blocking SHUTDOWN in parallel.
+ set rd1 [redis_deferring_client -1]
+ set rd2 [redis_deferring_client -1]
+ $rd1 shutdown
+ $rd2 shutdown
+ set info_clients [$master info clients]
+ assert_match "*connected_clients:3*" $info_clients
+ assert_match "*blocked_clients:2*" $info_clients
+
+ # Abort the shutdown
+ $master shutdown abort
+
+ # Wake up replica, causing master to continue shutting down.
+ exec kill -SIGCONT $replica_pid
+
+ # SHUTDOWN returns an error to both clients blocking on SHUTDOWN.
+ catch { $rd1 read } e1
+ catch { $rd2 read } e2
+ assert_match "*Errors trying to SHUTDOWN. Check logs*" $e1
+ assert_match "*Errors trying to SHUTDOWN. Check logs*" $e2
+ $rd1 close
+ $rd2 close
+
+ # Check shutdown log messages on master.
+ verify_log_message -1 "*Shutdown manually aborted*" 0
+ }
+ }
+} {} {repl external:skip}
diff --git a/tests/support/server.tcl b/tests/support/server.tcl
index ee39c8df9..51d3629a2 100644
--- a/tests/support/server.tcl
+++ b/tests/support/server.tcl
@@ -674,8 +674,12 @@ proc start_server {options {code undefined}} {
}
}
-proc restart_server {level wait_ready rotate_logs {reconnect 1}} {
+proc restart_server {level wait_ready rotate_logs {reconnect 1} {shutdown sigterm}} {
set srv [lindex $::servers end+$level]
+ if {$shutdown ne {sigterm}} {
+ catch {[dict get $srv "client"] shutdown $shutdown}
+ }
+ # Kill server doesn't mind if the server is already dead
kill_server $srv
# Remove the default client from the server
dict unset srv "client"
diff --git a/tests/support/util.tcl b/tests/support/util.tcl
index c4f90b29e..aa14fd3a0 100644
--- a/tests/support/util.tcl
+++ b/tests/support/util.tcl
@@ -591,8 +591,11 @@ proc stop_bg_complex_data {handle} {
catch {exec /bin/kill -9 $handle}
}
-proc populate {num {prefix key:} {size 3}} {
- set rd [redis_deferring_client]
+# Write num keys with the given key prefix and value size (in bytes). If idx is
+# given, it's the index (AKA level) used with the srv procedure and it specifies
+# to which Redis instance to write the keys.
+proc populate {num {prefix key:} {size 3} {idx 0}} {
+ set rd [redis_deferring_client $idx]
for {set j 0} {$j < $num} {incr j} {
$rd set $prefix$j [string repeat A $size]
}
diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl
index 570d9e85f..722b04ed0 100644
--- a/tests/test_helper.tcl
+++ b/tests/test_helper.tcl
@@ -44,6 +44,7 @@ set ::all_tests {
integration/replication-4
integration/replication-psync
integration/replication-buffer
+ integration/shutdown
integration/aof
integration/rdb
integration/corrupt-dump
diff --git a/tests/unit/shutdown.tcl b/tests/unit/shutdown.tcl
index 359f5bb63..5c618d285 100644
--- a/tests/unit/shutdown.tcl
+++ b/tests/unit/shutdown.tcl
@@ -26,6 +26,17 @@ start_server {tags {"shutdown external:skip"}} {
}
start_server {tags {"shutdown external:skip"}} {
+ test {SHUTDOWN ABORT can cancel SIGTERM} {
+ r debug pause-cron 1
+ set pid [s process_id]
+ exec kill -SIGTERM $pid
+ after 10; # Give signal handler some time to run
+ r shutdown abort
+ verify_log_message 0 "*Shutdown manually aborted*" 0
+ r debug pause-cron 0
+ r ping
+ } {PONG}
+
test {Temp rdb will be deleted in signal handle} {
for {set i 0} {$i < 20} {incr i} {
r set $i $i