summaryrefslogtreecommitdiff
path: root/src/mongo/s/server.cpp
diff options
context:
space:
mode:
authorRandolph Tan <randolph@10gen.com>2019-07-09 16:14:31 -0400
committerRandolph Tan <randolph@10gen.com>2019-07-18 14:08:29 -0400
commit36dc61299993ce6473a4660150bfb25a59afce77 (patch)
tree5742a6c76e03880eb20abf7a8fd50c27b868bd63 /src/mongo/s/server.cpp
parenta4f07aed5277cb31d2422b3dc658b5758d1773ed (diff)
downloadmongo-36dc61299993ce6473a4660150bfb25a59afce77.tar.gz
SERVER-39692 Make mongos shutdown drain all in-progress transactions
Diffstat (limited to 'src/mongo/s/server.cpp')
-rw-r--r--src/mongo/s/server.cpp84
1 files changed, 76 insertions, 8 deletions
diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp
index 71d54f49ccd..d75120849af 100644
--- a/src/mongo/s/server.cpp
+++ b/src/mongo/s/server.cpp
@@ -90,6 +90,7 @@
#include "mongo/s/sharding_egress_metadata_hook_for_mongos.h"
#include "mongo/s/sharding_initialization.h"
#include "mongo/s/sharding_uptime_reporter.h"
+#include "mongo/s/transaction_router.h"
#include "mongo/s/version_mongos.h"
#include "mongo/stdx/thread.h"
#include "mongo/transport/transport_layer_manager.h"
@@ -177,6 +178,64 @@ Status waitForSigningKeys(OperationContext* opCtx) {
}
}
+
+/**
+ * Abort all active transactions in the catalog that has not yet been committed.
+ *
+ * Outline:
+ * 1. Mark all sessions as killed and collect killTokens from each session.
+ * 2. Create a new Client in order not to pollute the current OperationContext.
+ * 3. Create new OperationContexts for each session to be killed and perform the necessary setup
+ * to be able to abort transactions properly: like setting TxnNumber and attaching the session
+ * to the OperationContext.
+ * 4. Send abortTransaction.
+ */
+void implicitlyAbortAllTransactions(OperationContext* opCtx) {
+ struct AbortTransactionDetails {
+ public:
+ AbortTransactionDetails(LogicalSessionId _lsid, SessionCatalog::KillToken _killToken)
+ : lsid(std::move(_lsid)), killToken(std::move(_killToken)) {}
+
+ LogicalSessionId lsid;
+ SessionCatalog::KillToken killToken;
+ };
+
+ const auto catalog = SessionCatalog::get(opCtx);
+
+ SessionKiller::Matcher matcherAllSessions(
+ KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx)});
+
+ const auto abortDeadline =
+ opCtx->getServiceContext()->getFastClockSource()->now() + Seconds(15);
+
+ std::vector<AbortTransactionDetails> toKill;
+ catalog->scanSessions(matcherAllSessions, [&](const ObservableSession& session) {
+ toKill.emplace_back(session.getSessionId(),
+ session.kill(ErrorCodes::InterruptedAtShutdown));
+ });
+
+ auto newClient = opCtx->getServiceContext()->makeClient("ImplicitlyAbortTxnAtShutdown");
+ AlternativeClientRegion acr(newClient);
+
+ Status shutDownStatus(ErrorCodes::InterruptedAtShutdown,
+ "aborting transactions due to shutdown");
+
+ for (auto& killDetails : toKill) {
+ auto uniqueNewOpCtx = cc().makeOperationContext();
+ auto newOpCtx = uniqueNewOpCtx.get();
+
+ newOpCtx->setDeadlineByDate(abortDeadline, ErrorCodes::ExceededTimeLimit);
+
+ OperationContextSession sessionCtx(newOpCtx, std::move(killDetails.killToken));
+
+ auto session = OperationContextSession::get(newOpCtx);
+ newOpCtx->setLogicalSessionId(session->getSessionId());
+
+ auto txnRouter = TransactionRouter::get(newOpCtx);
+ txnRouter.implicitlyAbortTransaction(newOpCtx, shutDownStatus);
+ }
+}
+
/**
* NOTE: This function may be called at any time after registerShutdownTask is called below. It must
* not depend on the prior execution of mongo initializers or the existence of threads.
@@ -189,9 +248,11 @@ void cleanupTask(ServiceContext* serviceContext) {
Client::initThread(getThreadName());
Client& client = cc();
- // Join the logical session cache before the transport layer
- if (auto lsc = LogicalSessionCache::get(serviceContext)) {
- lsc->joinOnShutDown();
+ ServiceContext::UniqueOperationContext uniqueTxn;
+ OperationContext* opCtx = client.getOperationContext();
+ if (!opCtx) {
+ uniqueTxn = client.makeOperationContext();
+ opCtx = uniqueTxn.get();
}
// Shutdown the TransportLayer so that new connections aren't accepted
@@ -201,13 +262,20 @@ void cleanupTask(ServiceContext* serviceContext) {
tl->shutdown();
}
- ServiceContext::UniqueOperationContext uniqueTxn;
- OperationContext* opCtx = client.getOperationContext();
- if (!opCtx) {
- uniqueTxn = client.makeOperationContext();
- opCtx = uniqueTxn.get();
+ try {
+ // Abort transactions while we can still send remote commands.
+ implicitlyAbortAllTransactions(opCtx);
+ } catch (const DBException& excep) {
+ warning() << "encountered " << excep
+ << " while trying to abort all active transactions";
}
+ if (auto lsc = LogicalSessionCache::get(serviceContext)) {
+ lsc->joinOnShutDown();
+ }
+
+ ReplicaSetMonitor::shutdown();
+
opCtx->setIsExecutingShutdown();
if (serviceContext) {