summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRandolph Tan <randolph@10gen.com>2019-07-09 16:14:31 -0400
committerRandolph Tan <randolph@10gen.com>2019-07-18 14:10:50 -0400
commit52fba7897df46f8a52e590b6cc3a05a24aa93bed (patch)
tree11e158c060f737ec9dbeffefde68845e951931a5
parent15cb55efacbb7be6d3bd65ea265b1c45da5decff (diff)
downloadmongo-52fba7897df46f8a52e590b6cc3a05a24aa93bed.tar.gz
SERVER-39692 Make mongos shutdown drain all in-progress transactions
(cherry picked from commit 36dc61299993ce6473a4660150bfb25a59afce77)
-rw-r--r--src/mongo/SConscript1
-rw-r--r--src/mongo/client/replica_set_monitor.cpp4
-rw-r--r--src/mongo/client/replica_set_monitor_manager.cpp5
-rw-r--r--src/mongo/client/replica_set_monitor_manager.h4
-rw-r--r--src/mongo/db/session_catalog.cpp17
-rw-r--r--src/mongo/db/session_catalog.h11
-rw-r--r--src/mongo/s/server.cpp84
-rw-r--r--src/mongo/s/transaction_router.cpp6
8 files changed, 121 insertions, 11 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript
index 4cb70d3a055..931c0bb4c67 100644
--- a/src/mongo/SConscript
+++ b/src/mongo/SConscript
@@ -518,6 +518,7 @@ mongos = env.Program(
'db/server_options',
'db/server_options_base',
'db/service_liaison_mongos',
+ 'db/session_catalog',
'db/sessions_collection_sharded',
'db/startup_warnings_common',
'db/stats/counters',
diff --git a/src/mongo/client/replica_set_monitor.cpp b/src/mongo/client/replica_set_monitor.cpp
index 248446f5cb7..033d2608210 100644
--- a/src/mongo/client/replica_set_monitor.cpp
+++ b/src/mongo/client/replica_set_monitor.cpp
@@ -284,7 +284,7 @@ Future<std::vector<HostAndPort>> ReplicaSetMonitor::_getHostsOrRefresh(
const ReadPreferenceSetting& criteria, Milliseconds maxWait) {
// If we're in shutdown, don't bother
- if (globalInShutdownDeprecated()) {
+ if (globalRSMonitorManager.isShutdown()) {
return Status(ErrorCodes::ShutdownInProgress, "Server is shutting down"_sd);
}
@@ -1268,7 +1268,7 @@ void SetState::notify(bool finishedScan) {
const auto cachedNow = now();
for (auto it = waiters.begin(); it != waiters.end();) {
- if (globalInShutdownDeprecated()) {
+ if (globalRSMonitorManager.isShutdown()) {
it->promise.setError(
{ErrorCodes::ShutdownInProgress, str::stream() << "Server is shutting down"});
waiters.erase(it++);
diff --git a/src/mongo/client/replica_set_monitor_manager.cpp b/src/mongo/client/replica_set_monitor_manager.cpp
index dff3d8a16c4..452609a39bd 100644
--- a/src/mongo/client/replica_set_monitor_manager.cpp
+++ b/src/mongo/client/replica_set_monitor_manager.cpp
@@ -231,4 +231,9 @@ ReplicaSetChangeNotifier& ReplicaSetMonitorManager::getNotifier() {
return _notifier;
}
+bool ReplicaSetMonitorManager::isShutdown() const {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _isShutdown;
+}
+
} // namespace mongo
diff --git a/src/mongo/client/replica_set_monitor_manager.h b/src/mongo/client/replica_set_monitor_manager.h
index b0a08f8f4e5..4e037a70f91 100644
--- a/src/mongo/client/replica_set_monitor_manager.h
+++ b/src/mongo/client/replica_set_monitor_manager.h
@@ -98,11 +98,13 @@ public:
ReplicaSetChangeNotifier& getNotifier();
+ bool isShutdown() const;
+
private:
using ReplicaSetMonitorsMap = StringMap<std::weak_ptr<ReplicaSetMonitor>>;
// Protects access to the replica set monitors
- stdx::mutex _mutex;
+ mutable stdx::mutex _mutex;
// Executor for monitoring replica sets.
std::unique_ptr<executor::TaskExecutor> _taskExecutor;
diff --git a/src/mongo/db/session_catalog.cpp b/src/mongo/db/session_catalog.cpp
index db574180f6f..2710ec3e212 100644
--- a/src/mongo/db/session_catalog.cpp
+++ b/src/mongo/db/session_catalog.cpp
@@ -250,6 +250,23 @@ OperationContextSession::OperationContextSession(OperationContext* opCtx) : _opC
checkOut(opCtx);
}
+OperationContextSession::OperationContextSession(OperationContext* opCtx,
+ SessionCatalog::KillToken killToken)
+ : _opCtx(opCtx) {
+ auto& checkedOutSession = operationSessionDecoration(opCtx);
+
+ invariant(!checkedOutSession);
+ invariant(!opCtx->getLogicalSessionId()); // lsid is specified by killToken argument.
+
+ const auto catalog = SessionCatalog::get(opCtx);
+ auto scopedSessionForKill = catalog->checkOutSessionForKill(opCtx, std::move(killToken));
+
+ // We acquire a Client lock here to guard the construction of this session so that references to
+ // this session are safe to use while the lock is held
+ stdx::lock_guard lk(*opCtx->getClient());
+ checkedOutSession.emplace(std::move(scopedSessionForKill._scos));
+}
+
OperationContextSession::~OperationContextSession() {
// Only release the checked out session at the end of the top-level request from the client, not
// at the end of a nested DBDirectClient call
diff --git a/src/mongo/db/session_catalog.h b/src/mongo/db/session_catalog.h
index d0f97902461..029a3881c1f 100644
--- a/src/mongo/db/session_catalog.h
+++ b/src/mongo/db/session_catalog.h
@@ -205,6 +205,8 @@ private:
boost::optional<SessionCatalog::KillToken> _killToken;
};
+class OperationContextSession;
+
/**
* RAII type returned by SessionCatalog::checkOutSessionForKill.
*
@@ -229,6 +231,8 @@ public:
}
private:
+ friend OperationContextSession;
+
ScopedCheckedOutSession _scos;
};
using SessionToKill = SessionCatalog::SessionToKill;
@@ -339,6 +343,13 @@ public:
* for the desired session to be checked in by another user.
*/
OperationContextSession(OperationContext* opCtx);
+
+ /**
+ * Same as constructor above, but takes the session id from the killToken and uses
+ * checkoutSessionForKill instead, placing the checked-out session on the operation context.
+ * Must not be called if the operation context contains a session.
+ */
+ OperationContextSession(OperationContext* opCtx, SessionCatalog::KillToken killToken);
~OperationContextSession();
/**
diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp
index 53293c690a5..df41a09ec8d 100644
--- a/src/mongo/s/server.cpp
+++ b/src/mongo/s/server.cpp
@@ -89,6 +89,7 @@
#include "mongo/s/sharding_egress_metadata_hook_for_mongos.h"
#include "mongo/s/sharding_initialization.h"
#include "mongo/s/sharding_uptime_reporter.h"
+#include "mongo/s/transaction_router.h"
#include "mongo/s/version_mongos.h"
#include "mongo/stdx/memory.h"
#include "mongo/stdx/thread.h"
@@ -177,6 +178,64 @@ Status waitForSigningKeys(OperationContext* opCtx) {
}
}
+
+/**
+ * Abort all active transactions in the catalog that has not yet been committed.
+ *
+ * Outline:
+ * 1. Mark all sessions as killed and collect killTokens from each session.
+ * 2. Create a new Client in order not to pollute the current OperationContext.
+ * 3. Create new OperationContexts for each session to be killed and perform the necessary setup
+ * to be able to abort transactions properly: like setting TxnNumber and attaching the session
+ * to the OperationContext.
+ * 4. Send abortTransaction.
+ */
+void implicitlyAbortAllTransactions(OperationContext* opCtx) {
+ struct AbortTransactionDetails {
+ public:
+ AbortTransactionDetails(LogicalSessionId _lsid, SessionCatalog::KillToken _killToken)
+ : lsid(std::move(_lsid)), killToken(std::move(_killToken)) {}
+
+ LogicalSessionId lsid;
+ SessionCatalog::KillToken killToken;
+ };
+
+ const auto catalog = SessionCatalog::get(opCtx);
+
+ SessionKiller::Matcher matcherAllSessions(
+ KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx)});
+
+ const auto abortDeadline =
+ opCtx->getServiceContext()->getFastClockSource()->now() + Seconds(15);
+
+ std::vector<AbortTransactionDetails> toKill;
+ catalog->scanSessions(matcherAllSessions, [&](const ObservableSession& session) {
+ toKill.emplace_back(session.getSessionId(),
+ session.kill(ErrorCodes::InterruptedAtShutdown));
+ });
+
+ auto newClient = opCtx->getServiceContext()->makeClient("ImplicitlyAbortTxnAtShutdown");
+ AlternativeClientRegion acr(newClient);
+
+ Status shutDownStatus(ErrorCodes::InterruptedAtShutdown,
+ "aborting transactions due to shutdown");
+
+ for (auto& killDetails : toKill) {
+ auto uniqueNewOpCtx = cc().makeOperationContext();
+ auto newOpCtx = uniqueNewOpCtx.get();
+
+ newOpCtx->setDeadlineByDate(abortDeadline, ErrorCodes::ExceededTimeLimit);
+
+ OperationContextSession sessionCtx(newOpCtx, std::move(killDetails.killToken));
+
+ auto session = OperationContextSession::get(newOpCtx);
+ newOpCtx->setLogicalSessionId(session->getSessionId());
+
+ auto txnRouter = TransactionRouter::get(newOpCtx);
+ txnRouter.implicitlyAbortTransaction(newOpCtx, shutDownStatus);
+ }
+}
+
/**
* NOTE: This function may be called at any time after registerShutdownTask is called below. It must
* not depend on the prior execution of mongo initializers or the existence of threads.
@@ -189,9 +248,11 @@ void cleanupTask(ServiceContext* serviceContext) {
Client::initThread(getThreadName());
Client& client = cc();
- // Join the logical session cache before the transport layer
- if (auto lsc = LogicalSessionCache::get(serviceContext)) {
- lsc->joinOnShutDown();
+ ServiceContext::UniqueOperationContext uniqueTxn;
+ OperationContext* opCtx = client.getOperationContext();
+ if (!opCtx) {
+ uniqueTxn = client.makeOperationContext();
+ opCtx = uniqueTxn.get();
}
// Shutdown the TransportLayer so that new connections aren't accepted
@@ -201,13 +262,20 @@ void cleanupTask(ServiceContext* serviceContext) {
tl->shutdown();
}
- ServiceContext::UniqueOperationContext uniqueTxn;
- OperationContext* opCtx = client.getOperationContext();
- if (!opCtx) {
- uniqueTxn = client.makeOperationContext();
- opCtx = uniqueTxn.get();
+ try {
+ // Abort transactions while we can still send remote commands.
+ implicitlyAbortAllTransactions(opCtx);
+ } catch (const DBException& excep) {
+ warning() << "encountered " << excep
+ << " while trying to abort all active transactions";
}
+ if (auto lsc = LogicalSessionCache::get(serviceContext)) {
+ lsc->joinOnShutDown();
+ }
+
+ ReplicaSetMonitor::shutdown();
+
opCtx->setIsExecutingShutdown();
if (serviceContext) {
diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp
index 1c5c9946ad6..c2bdc3d7f68 100644
--- a/src/mongo/s/transaction_router.cpp
+++ b/src/mongo/s/transaction_router.cpp
@@ -1299,6 +1299,12 @@ void TransactionRouter::Router::_endTransactionTrackingIfNecessary(
{
stdx::lock_guard<Client> lk(*opCtx->getClient());
o(lk).timingStats.endTime = curTicks;
+
+ // If startTime hasn't been set yet, that probably means it run into an error and is
+ // getting aborted.
+ if (o().timingStats.startTime == 0) {
+ o(lk).timingStats.startTime = curTicks;
+ }
}
if (shouldLog(logger::LogComponent::kTransaction, logger::LogSeverity::Debug(1)) ||