author    | Randolph Tan <randolph@10gen.com> | 2019-07-09 16:14:31 -0400
committer | Randolph Tan <randolph@10gen.com> | 2019-07-18 14:10:50 -0400
commit    | 52fba7897df46f8a52e590b6cc3a05a24aa93bed (patch)
tree      | 11e158c060f737ec9dbeffefde68845e951931a5
parent    | 15cb55efacbb7be6d3bd65ea265b1c45da5decff (diff)
download  | mongo-52fba7897df46f8a52e590b6cc3a05a24aa93bed.tar.gz
SERVER-39692 Make mongos shutdown drain all in-progress transactions
(cherry picked from commit 36dc61299993ce6473a4660150bfb25a59afce77)
-rw-r--r-- | src/mongo/SConscript                             |  1
-rw-r--r-- | src/mongo/client/replica_set_monitor.cpp         |  4
-rw-r--r-- | src/mongo/client/replica_set_monitor_manager.cpp |  5
-rw-r--r-- | src/mongo/client/replica_set_monitor_manager.h   |  4
-rw-r--r-- | src/mongo/db/session_catalog.cpp                 | 17
-rw-r--r-- | src/mongo/db/session_catalog.h                   | 11
-rw-r--r-- | src/mongo/s/server.cpp                           | 84
-rw-r--r-- | src/mongo/s/transaction_router.cpp               |  6
8 files changed, 121 insertions, 11 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript
index 4cb70d3a055..931c0bb4c67 100644
--- a/src/mongo/SConscript
+++ b/src/mongo/SConscript
@@ -518,6 +518,7 @@ mongos = env.Program(
         'db/server_options',
         'db/server_options_base',
         'db/service_liaison_mongos',
+        'db/session_catalog',
         'db/sessions_collection_sharded',
         'db/startup_warnings_common',
         'db/stats/counters',
diff --git a/src/mongo/client/replica_set_monitor.cpp b/src/mongo/client/replica_set_monitor.cpp
index 248446f5cb7..033d2608210 100644
--- a/src/mongo/client/replica_set_monitor.cpp
+++ b/src/mongo/client/replica_set_monitor.cpp
@@ -284,7 +284,7 @@ Future<std::vector<HostAndPort>> ReplicaSetMonitor::_getHostsOrRefresh(
     const ReadPreferenceSetting& criteria, Milliseconds maxWait) {
 
     // If we're in shutdown, don't bother
-    if (globalInShutdownDeprecated()) {
+    if (globalRSMonitorManager.isShutdown()) {
         return Status(ErrorCodes::ShutdownInProgress, "Server is shutting down"_sd);
     }
@@ -1268,7 +1268,7 @@ void SetState::notify(bool finishedScan) {
     const auto cachedNow = now();
 
     for (auto it = waiters.begin(); it != waiters.end();) {
-        if (globalInShutdownDeprecated()) {
+        if (globalRSMonitorManager.isShutdown()) {
             it->promise.setError(
                 {ErrorCodes::ShutdownInProgress, str::stream() << "Server is shutting down"});
             waiters.erase(it++);
diff --git a/src/mongo/client/replica_set_monitor_manager.cpp b/src/mongo/client/replica_set_monitor_manager.cpp
index dff3d8a16c4..452609a39bd 100644
--- a/src/mongo/client/replica_set_monitor_manager.cpp
+++ b/src/mongo/client/replica_set_monitor_manager.cpp
@@ -231,4 +231,9 @@ ReplicaSetChangeNotifier& ReplicaSetMonitorManager::getNotifier() {
     return _notifier;
 }
 
+bool ReplicaSetMonitorManager::isShutdown() const {
+    stdx::lock_guard<stdx::mutex> lk(_mutex);
+    return _isShutdown;
+}
+
 }  // namespace mongo
diff --git a/src/mongo/client/replica_set_monitor_manager.h b/src/mongo/client/replica_set_monitor_manager.h
index b0a08f8f4e5..4e037a70f91 100644
--- a/src/mongo/client/replica_set_monitor_manager.h
+++ b/src/mongo/client/replica_set_monitor_manager.h
@@ -98,11 +98,13 @@ public:
     ReplicaSetChangeNotifier& getNotifier();
 
+    bool isShutdown() const;
+
 private:
     using ReplicaSetMonitorsMap = StringMap<std::weak_ptr<ReplicaSetMonitor>>;
 
     // Protects access to the replica set monitors
-    stdx::mutex _mutex;
+    mutable stdx::mutex _mutex;
 
     // Executor for monitoring replica sets.
     std::unique_ptr<executor::TaskExecutor> _taskExecutor;
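Note on the replica set monitor changes: _getHostsOrRefresh() and SetState::notify() previously consulted the process-wide globalInShutdownDeprecated() flag; they now ask the ReplicaSetMonitorManager itself, whose existing _isShutdown flag is set when the monitors are torn down (mongos now calls ReplicaSetMonitor::shutdown() explicitly during cleanup; see server.cpp below). Because the new isShutdown() accessor is const but must still take the manager's lock, _mutex becomes mutable. A minimal sketch of that C++ pattern, using std:: types in place of mongo's stdx wrappers:

    #include <mutex>

    class Manager {
    public:
        // Const reader: locking still works because the mutex is mutable.
        bool isShutdown() const {
            std::lock_guard<std::mutex> lk(_mutex);
            return _isShutdown;
        }

        void shutdown() {
            std::lock_guard<std::mutex> lk(_mutex);
            _isShutdown = true;
        }

    private:
        mutable std::mutex _mutex;  // mutable so const members can lock it
        bool _isShutdown = false;
    };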
diff --git a/src/mongo/db/session_catalog.cpp b/src/mongo/db/session_catalog.cpp
index db574180f6f..2710ec3e212 100644
--- a/src/mongo/db/session_catalog.cpp
+++ b/src/mongo/db/session_catalog.cpp
@@ -250,6 +250,23 @@ OperationContextSession::OperationContextSession(OperationContext* opCtx) : _opC
     checkOut(opCtx);
 }
 
+OperationContextSession::OperationContextSession(OperationContext* opCtx,
+                                                 SessionCatalog::KillToken killToken)
+    : _opCtx(opCtx) {
+    auto& checkedOutSession = operationSessionDecoration(opCtx);
+
+    invariant(!checkedOutSession);
+    invariant(!opCtx->getLogicalSessionId());  // lsid is specified by the killToken argument.
+
+    const auto catalog = SessionCatalog::get(opCtx);
+    auto scopedSessionForKill = catalog->checkOutSessionForKill(opCtx, std::move(killToken));
+
+    // We acquire a Client lock here to guard the construction of this session so that
+    // references to this session are safe to use while the lock is held.
+    stdx::lock_guard lk(*opCtx->getClient());
+    checkedOutSession.emplace(std::move(scopedSessionForKill._scos));
+}
+
 OperationContextSession::~OperationContextSession() {
     // Only release the checked out session at the end of the top-level request from the client,
     // not at the end of a nested DBDirectClient call
diff --git a/src/mongo/db/session_catalog.h b/src/mongo/db/session_catalog.h
index d0f97902461..029a3881c1f 100644
--- a/src/mongo/db/session_catalog.h
+++ b/src/mongo/db/session_catalog.h
@@ -205,6 +205,8 @@ private:
     boost::optional<SessionCatalog::KillToken> _killToken;
 };
 
+class OperationContextSession;
+
 /**
  * RAII type returned by SessionCatalog::checkOutSessionForKill.
  *
@@ -229,6 +231,8 @@ public:
     }
 
 private:
+    friend OperationContextSession;
+
     ScopedCheckedOutSession _scos;
 };
 using SessionToKill = SessionCatalog::SessionToKill;
@@ -339,6 +343,13 @@ public:
      * for the desired session to be checked in by another user.
      */
     OperationContextSession(OperationContext* opCtx);
+
+    /**
+     * Same as the constructor above, but takes the session id from the killToken and uses
+     * checkOutSessionForKill instead, placing the checked-out session on the operation context.
+     * Must not be called if the operation context already contains a session.
+     */
+    OperationContextSession(OperationContext* opCtx, SessionCatalog::KillToken killToken);
     ~OperationContextSession();
 
     /**
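Note on the session catalog changes: the new OperationContextSession constructor is the glue that lets shutdown code check out a session that has just been marked killed. An ordinary check-out would wait for the killed session to become available; checkOutSessionForKill() bypasses that by consuming the KillToken returned from ObservableSession::kill(). The forward declaration and friend declaration exist only so the constructor can reach the private _scos member of SessionToKill. A toy model of the token protocol (not MongoDB's implementation; names and semantics are simplified stand-ins):

    #include <cassert>

    // Toy model: killing a session yields a token, and only the token
    // holder may check the session out until the kill is serviced.
    class Session {
    public:
        struct KillToken {
            Session* session;
        };

        KillToken kill() {
            _killPending = true;
            return KillToken{this};
        }

        // Ordinary check-out refuses a session with a pending kill.
        bool tryCheckOut() {
            if (_killPending || _checkedOut)
                return false;
            return _checkedOut = true;
        }

        // Check-out for kill consumes the token and takes exclusive access.
        void checkOutForKill(KillToken token) {
            assert(token.session == this && _killPending);
            _checkedOut = true;
            _killPending = false;
        }

        void checkIn() {
            _checkedOut = false;
        }

    private:
        bool _killPending = false;
        bool _checkedOut = false;
    };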
diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp
index 53293c690a5..df41a09ec8d 100644
--- a/src/mongo/s/server.cpp
+++ b/src/mongo/s/server.cpp
@@ -89,6 +89,7 @@
 #include "mongo/s/sharding_egress_metadata_hook_for_mongos.h"
 #include "mongo/s/sharding_initialization.h"
 #include "mongo/s/sharding_uptime_reporter.h"
+#include "mongo/s/transaction_router.h"
 #include "mongo/s/version_mongos.h"
 #include "mongo/stdx/memory.h"
 #include "mongo/stdx/thread.h"
@@ -177,6 +178,64 @@ Status waitForSigningKeys(OperationContext* opCtx) {
     }
 }
 
+
+/**
+ * Aborts all active transactions in the catalog that have not yet been committed.
+ *
+ * Outline:
+ * 1. Mark all sessions as killed and collect killTokens from each session.
+ * 2. Create a new Client in order not to pollute the current OperationContext.
+ * 3. Create new OperationContexts for each session to be killed and perform the necessary setup
+ *    to be able to abort transactions properly: like setting TxnNumber and attaching the session
+ *    to the OperationContext.
+ * 4. Send abortTransaction.
+ */
+void implicitlyAbortAllTransactions(OperationContext* opCtx) {
+    struct AbortTransactionDetails {
+    public:
+        AbortTransactionDetails(LogicalSessionId _lsid, SessionCatalog::KillToken _killToken)
+            : lsid(std::move(_lsid)), killToken(std::move(_killToken)) {}
+
+        LogicalSessionId lsid;
+        SessionCatalog::KillToken killToken;
+    };
+
+    const auto catalog = SessionCatalog::get(opCtx);
+
+    SessionKiller::Matcher matcherAllSessions(
+        KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx)});
+
+    const auto abortDeadline =
+        opCtx->getServiceContext()->getFastClockSource()->now() + Seconds(15);
+
+    std::vector<AbortTransactionDetails> toKill;
+    catalog->scanSessions(matcherAllSessions, [&](const ObservableSession& session) {
+        toKill.emplace_back(session.getSessionId(),
+                            session.kill(ErrorCodes::InterruptedAtShutdown));
+    });
+
+    auto newClient = opCtx->getServiceContext()->makeClient("ImplicitlyAbortTxnAtShutdown");
+    AlternativeClientRegion acr(newClient);
+
+    Status shutDownStatus(ErrorCodes::InterruptedAtShutdown,
+                          "aborting transactions due to shutdown");
+
+    for (auto& killDetails : toKill) {
+        auto uniqueNewOpCtx = cc().makeOperationContext();
+        auto newOpCtx = uniqueNewOpCtx.get();
+
+        newOpCtx->setDeadlineByDate(abortDeadline, ErrorCodes::ExceededTimeLimit);
+
+        OperationContextSession sessionCtx(newOpCtx, std::move(killDetails.killToken));
+
+        auto session = OperationContextSession::get(newOpCtx);
+        newOpCtx->setLogicalSessionId(session->getSessionId());
+
+        auto txnRouter = TransactionRouter::get(newOpCtx);
+        txnRouter.implicitlyAbortTransaction(newOpCtx, shutDownStatus);
+    }
+}
+
 /**
  * NOTE: This function may be called at any time after registerShutdownTask is called below. It must
  * not depend on the prior execution of mongo initializers or the existence of threads.
  */
@@ -189,9 +248,11 @@ void cleanupTask(ServiceContext* serviceContext) {
     Client::initThread(getThreadName());
     Client& client = cc();
 
-    // Join the logical session cache before the transport layer
-    if (auto lsc = LogicalSessionCache::get(serviceContext)) {
-        lsc->joinOnShutDown();
+    ServiceContext::UniqueOperationContext uniqueTxn;
+    OperationContext* opCtx = client.getOperationContext();
+    if (!opCtx) {
+        uniqueTxn = client.makeOperationContext();
+        opCtx = uniqueTxn.get();
     }
 
     // Shutdown the TransportLayer so that new connections aren't accepted
@@ -201,13 +262,20 @@
         tl->shutdown();
     }
 
-    ServiceContext::UniqueOperationContext uniqueTxn;
-    OperationContext* opCtx = client.getOperationContext();
-    if (!opCtx) {
-        uniqueTxn = client.makeOperationContext();
-        opCtx = uniqueTxn.get();
+    try {
+        // Abort transactions while we can still send remote commands.
+        implicitlyAbortAllTransactions(opCtx);
+    } catch (const DBException& excep) {
+        warning() << "encountered " << excep
+                  << " while trying to abort all active transactions";
     }
 
+    if (auto lsc = LogicalSessionCache::get(serviceContext)) {
+        lsc->joinOnShutDown();
+    }
+
+    ReplicaSetMonitor::shutdown();
+
     opCtx->setIsExecutingShutdown();
 
     if (serviceContext) {
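Note on the server.cpp changes: the heart of the ticket is ordering. The operation context is now created up front, the transport layer stops accepting new connections, and then transactions are aborted while mongos can still send remote commands to the shards; only after that are the logical session cache joined and the replica set monitors shut down. A failure to abort is logged but does not block shutdown. A condensed sketch of that sequence, with the real mongo calls reduced to placeholder functions:

    #include <exception>
    #include <iostream>

    // Placeholder hooks standing in for the real mongo calls.
    void stopAcceptingConnections() { /* tl->shutdown() */ }
    void abortAllTransactions() { /* implicitlyAbortAllTransactions(opCtx) */ }
    void joinLogicalSessionCache() { /* lsc->joinOnShutDown() */ }
    void shutDownReplicaSetMonitors() { /* ReplicaSetMonitor::shutdown() */ }

    int main() {
        stopAcceptingConnections();
        try {
            // Abort while egress to the shards is still possible.
            abortAllTransactions();
        } catch (const std::exception& ex) {
            // As in the patch: log the failure and keep shutting down.
            std::cerr << "warning: encountered " << ex.what()
                      << " while trying to abort all active transactions\n";
        }
        joinLogicalSessionCache();
        shutDownReplicaSetMonitors();
        return 0;
    }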
diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp
index 1c5c9946ad6..c2bdc3d7f68 100644
--- a/src/mongo/s/transaction_router.cpp
+++ b/src/mongo/s/transaction_router.cpp
@@ -1299,6 +1299,12 @@ void TransactionRouter::Router::_endTransactionTrackingIfNecessary(
     {
         stdx::lock_guard<Client> lk(*opCtx->getClient());
         o(lk).timingStats.endTime = curTicks;
+
+        // If startTime hasn't been set yet, that probably means the transaction ran into an
+        // error and is getting aborted.
+        if (o().timingStats.startTime == 0) {
+            o(lk).timingStats.startTime = curTicks;
+        }
     }
 
     if (shouldLog(logger::LogComponent::kTransaction, logger::LogSeverity::Debug(1)) ||
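Note on the transaction_router.cpp change: a transaction aborted at shutdown may never have recorded a start time (startTime stays 0 when no statement ran), so any duration derived from endTime - startTime would be meaningless. Back-filling startTime with the end tick makes such transactions report a zero-length duration instead. The guard in isolation, with a plain C++ struct standing in for the o()/o(lk) accessors:

    #include <cassert>
    #include <cstdint>

    struct TimingStats {
        std::int64_t startTime = 0;  // 0 means "never started"
        std::int64_t endTime = 0;
    };

    // Hypothetical free-function rendering of the patched logic.
    void endTransactionTracking(TimingStats& stats, std::int64_t curTicks) {
        stats.endTime = curTicks;
        // If startTime was never set, the transaction probably hit an error
        // before running a statement; report a zero-length duration rather
        // than one computed from startTime == 0.
        if (stats.startTime == 0) {
            stats.startTime = curTicks;
        }
        assert(stats.endTime - stats.startTime >= 0);
    }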