summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJason Zhang <jason.zhang@mongodb.com>2022-02-11 15:22:03 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-02-11 16:01:18 +0000
commit250795d1fb30fc9088c057902d81349100c79eb3 (patch)
treec01848fcb9e9d78c93656bdb7d5ff9b76c06438d /src
parent037b22172c54d5652298203077c16cc0ee7f26a1 (diff)
downloadmongo-250795d1fb30fc9088c057902d81349100c79eb3.tar.gz
SERVER-61090 Make setFCV wait for all TransactionCoordinators of internal transactions to be cleaned up
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/commands/set_feature_compatibility_version_command.cpp19
-rw-r--r--src/mongo/db/s/transaction_coordinator.cpp10
-rw-r--r--src/mongo/db/s/transaction_coordinator.h14
-rw-r--r--src/mongo/db/s/transaction_coordinator_service.cpp27
-rw-r--r--src/mongo/db/s/transaction_coordinator_service.h8
5 files changed, 77 insertions, 1 deletions
diff --git a/src/mongo/db/commands/set_feature_compatibility_version_command.cpp b/src/mongo/db/commands/set_feature_compatibility_version_command.cpp
index 75a38e8a472..f07008508d8 100644
--- a/src/mongo/db/commands/set_feature_compatibility_version_command.cpp
+++ b/src/mongo/db/commands/set_feature_compatibility_version_command.cpp
@@ -71,6 +71,7 @@
#include "mongo/db/s/resharding/resharding_donor_recipient_common.h"
#include "mongo/db/s/sharding_ddl_coordinator_service.h"
#include "mongo/db/s/sharding_util.h"
+#include "mongo/db/s/transaction_coordinator_service.h"
#include "mongo/db/server_options.h"
#include "mongo/db/session_catalog.h"
#include "mongo/db/session_txn_record_gen.h"
@@ -776,6 +777,24 @@ private:
LOGV2(5876101, "Completed removal of internal sessions from config.transactions.");
}
+ // TODO: SERVER-62375 Remove upgrade/downgrade code for internal transactions.
+ // We want to wait for all of the transaction coordinator entries related to internal
+ // transactions to be removed. This is because the corresponding coordinator document
+ // contains has a special lsid which downgraded binaries cannot properly parse.
+ if (serverGlobalParams.clusterRole != ClusterRole::None) {
+ auto coordinatorService = TransactionCoordinatorService::get(opCtx);
+ for (const auto& future :
+ coordinatorService->getAllRemovalFuturesForCoordinatorsForInternalTransactions(
+ opCtx)) {
+ auto status = future.getNoThrow(opCtx);
+ uassertStatusOKWithContext(status,
+ str::stream()
+ << "Unable to remove all "
+ << NamespaceString::kTransactionCoordinatorsNamespace
+ << " documents for internal transactions");
+ }
+ }
+
// TODO SERVER-62338 Remove when 6.0 branches-out
if (serverGlobalParams.clusterRole == ClusterRole::ShardServer &&
!resharding::gFeatureFlagRecoverableShardsvrReshardCollectionCoordinator
diff --git a/src/mongo/db/s/transaction_coordinator.cpp b/src/mongo/db/s/transaction_coordinator.cpp
index beaeadac455..be708a05742 100644
--- a/src/mongo/db/s/transaction_coordinator.cpp
+++ b/src/mongo/db/s/transaction_coordinator.cpp
@@ -349,7 +349,11 @@ TransactionCoordinator::TransactionCoordinator(
_serviceContext->getPreciseClockSource()->now());
}
- return txn::deleteCoordinatorDoc(*_scheduler, _lsid, _txnNumberAndRetryCounter);
+ return txn::deleteCoordinatorDoc(*_scheduler, _lsid, _txnNumberAndRetryCounter)
+ .then([this] {
+ invariant(!_coordinatorDocRemovalPromise.getFuture().isReady());
+ _coordinatorDocRemovalPromise.emplaceValue();
+ });
})
.getAsync([this, deadlineFuture = std::move(deadlineFuture)](Status s) mutable {
// Interrupt this coordinator's scheduler hierarchy and join the deadline task's future
@@ -464,6 +468,10 @@ void TransactionCoordinator::_done(Status status) {
_decisionPromise.setError(status);
}
+ if (!_coordinatorDocRemovalPromise.getFuture().isReady()) {
+ _coordinatorDocRemovalPromise.setError(status);
+ }
+
if (!status.isOK()) {
_completionPromise.setError(status);
} else {
diff --git a/src/mongo/db/s/transaction_coordinator.h b/src/mongo/db/s/transaction_coordinator.h
index ba8eaf144e6..ddf4af355ac 100644
--- a/src/mongo/db/s/transaction_coordinator.h
+++ b/src/mongo/db/s/transaction_coordinator.h
@@ -131,6 +131,15 @@ public:
Step getStep() const;
+ /**
+ * TODO: SERVER-62375 Remove upgrade/downgrade code for internal transactions.
+ * Returns a future that will be resolved when we remove the coordinator
+ * document.
+ */
+ SharedSemiFuture<void> getCoordinatorDocRemovalFuture() {
+ return _coordinatorDocRemovalPromise.getFuture();
+ }
+
private:
void _updateAssociatedClient(Client* client);
@@ -196,6 +205,11 @@ private:
bool _decisionDurable{false};
SharedPromise<txn::CommitDecision> _decisionPromise;
+ // TODO: SERVER-62375 Remove upgrade/downgrade code for internal transactions.
+ // Promise that is set when we have successfully removed the coordinator
+ // document.
+ SharedPromise<void> _coordinatorDocRemovalPromise;
+
// A list of all promises corresponding to futures that were returned to callers of
// onCompletion.
SharedPromise<txn::CommitDecision> _completionPromise;
diff --git a/src/mongo/db/s/transaction_coordinator_service.cpp b/src/mongo/db/s/transaction_coordinator_service.cpp
index bdc4cc373d9..f936323dcee 100644
--- a/src/mongo/db/s/transaction_coordinator_service.cpp
+++ b/src/mongo/db/s/transaction_coordinator_service.cpp
@@ -370,4 +370,31 @@ void TransactionCoordinatorService::cancelIfCommitNotYetStarted(
}
}
+const std::vector<SharedSemiFuture<void>>
+TransactionCoordinatorService::getAllRemovalFuturesForCoordinatorsForInternalTransactions(
+ OperationContext* opCtx) {
+ std::vector<SharedSemiFuture<void>> coordinatorStateDocRemovalFutures;
+ std::shared_ptr<CatalogAndScheduler> cas = _getCatalogAndScheduler(opCtx);
+ auto& catalog = cas->catalog;
+
+ auto predicate = [](const LogicalSessionId lsid,
+ const TxnNumberAndRetryCounter txnNumberAndRetryCounter,
+ const std::shared_ptr<TransactionCoordinator> transactionCoordinator) {
+ TransactionCoordinator::Step step = transactionCoordinator->getStep();
+ if (step > TransactionCoordinator::Step::kInactive && getParentSessionId(lsid)) {
+ return true;
+ }
+ return false;
+ };
+
+ auto visitorAction = [&](const LogicalSessionId lsid,
+ const TxnNumberAndRetryCounter txnNumberAndRetryCounter,
+ const std::shared_ptr<TransactionCoordinator> transactionCoordinator) {
+ coordinatorStateDocRemovalFutures.push_back(
+ transactionCoordinator->getCoordinatorDocRemovalFuture());
+ };
+ catalog.filter(predicate, visitorAction);
+ return coordinatorStateDocRemovalFutures;
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/transaction_coordinator_service.h b/src/mongo/db/s/transaction_coordinator_service.h
index c0946bd5286..f0795d857e0 100644
--- a/src/mongo/db/s/transaction_coordinator_service.h
+++ b/src/mongo/db/s/transaction_coordinator_service.h
@@ -127,6 +127,14 @@ public:
*/
void joinPreviousRound();
+ /**
+ * TODO: SERVER-62375 Remove upgrade/downgrade code for internal transactions.
+ * Returns a vector of futures that wait on the removal of transaction
+ * coordinators for internal transactions.
+ */
+ const std::vector<SharedSemiFuture<void>>
+ getAllRemovalFuturesForCoordinatorsForInternalTransactions(OperationContext* opCtx);
+
private:
struct CatalogAndScheduler {
CatalogAndScheduler(ServiceContext* service) : scheduler(service) {}