diff options
author | Jason Zhang <jason.zhang@mongodb.com> | 2022-02-11 15:22:03 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-02-11 16:01:18 +0000 |
commit | 250795d1fb30fc9088c057902d81349100c79eb3 (patch) | |
tree | c01848fcb9e9d78c93656bdb7d5ff9b76c06438d /src | |
parent | 037b22172c54d5652298203077c16cc0ee7f26a1 (diff) | |
download | mongo-250795d1fb30fc9088c057902d81349100c79eb3.tar.gz |
SERVER-61090 Make setFCV wait for all TransactionCoordinators of internal transactions to be cleaned up
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/commands/set_feature_compatibility_version_command.cpp | 19 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator.h | 14 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_service.cpp | 27 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_service.h | 8 |
5 files changed, 77 insertions, 1 deletions
diff --git a/src/mongo/db/commands/set_feature_compatibility_version_command.cpp b/src/mongo/db/commands/set_feature_compatibility_version_command.cpp index 75a38e8a472..f07008508d8 100644 --- a/src/mongo/db/commands/set_feature_compatibility_version_command.cpp +++ b/src/mongo/db/commands/set_feature_compatibility_version_command.cpp @@ -71,6 +71,7 @@ #include "mongo/db/s/resharding/resharding_donor_recipient_common.h" #include "mongo/db/s/sharding_ddl_coordinator_service.h" #include "mongo/db/s/sharding_util.h" +#include "mongo/db/s/transaction_coordinator_service.h" #include "mongo/db/server_options.h" #include "mongo/db/session_catalog.h" #include "mongo/db/session_txn_record_gen.h" @@ -776,6 +777,24 @@ private: LOGV2(5876101, "Completed removal of internal sessions from config.transactions."); } + // TODO: SERVER-62375 Remove upgrade/downgrade code for internal transactions. + // We want to wait for all of the transaction coordinator entries related to internal + // transactions to be removed. This is because the corresponding coordinator document + // contains has a special lsid which downgraded binaries cannot properly parse. + if (serverGlobalParams.clusterRole != ClusterRole::None) { + auto coordinatorService = TransactionCoordinatorService::get(opCtx); + for (const auto& future : + coordinatorService->getAllRemovalFuturesForCoordinatorsForInternalTransactions( + opCtx)) { + auto status = future.getNoThrow(opCtx); + uassertStatusOKWithContext(status, + str::stream() + << "Unable to remove all " + << NamespaceString::kTransactionCoordinatorsNamespace + << " documents for internal transactions"); + } + } + // TODO SERVER-62338 Remove when 6.0 branches-out if (serverGlobalParams.clusterRole == ClusterRole::ShardServer && !resharding::gFeatureFlagRecoverableShardsvrReshardCollectionCoordinator diff --git a/src/mongo/db/s/transaction_coordinator.cpp b/src/mongo/db/s/transaction_coordinator.cpp index beaeadac455..be708a05742 100644 --- a/src/mongo/db/s/transaction_coordinator.cpp +++ b/src/mongo/db/s/transaction_coordinator.cpp @@ -349,7 +349,11 @@ TransactionCoordinator::TransactionCoordinator( _serviceContext->getPreciseClockSource()->now()); } - return txn::deleteCoordinatorDoc(*_scheduler, _lsid, _txnNumberAndRetryCounter); + return txn::deleteCoordinatorDoc(*_scheduler, _lsid, _txnNumberAndRetryCounter) + .then([this] { + invariant(!_coordinatorDocRemovalPromise.getFuture().isReady()); + _coordinatorDocRemovalPromise.emplaceValue(); + }); }) .getAsync([this, deadlineFuture = std::move(deadlineFuture)](Status s) mutable { // Interrupt this coordinator's scheduler hierarchy and join the deadline task's future @@ -464,6 +468,10 @@ void TransactionCoordinator::_done(Status status) { _decisionPromise.setError(status); } + if (!_coordinatorDocRemovalPromise.getFuture().isReady()) { + _coordinatorDocRemovalPromise.setError(status); + } + if (!status.isOK()) { _completionPromise.setError(status); } else { diff --git a/src/mongo/db/s/transaction_coordinator.h b/src/mongo/db/s/transaction_coordinator.h index ba8eaf144e6..ddf4af355ac 100644 --- a/src/mongo/db/s/transaction_coordinator.h +++ b/src/mongo/db/s/transaction_coordinator.h @@ -131,6 +131,15 @@ public: Step getStep() const; + /** + * TODO: SERVER-62375 Remove upgrade/downgrade code for internal transactions. + * Returns a future that will be resolved when we remove the coordinator + * document. + */ + SharedSemiFuture<void> getCoordinatorDocRemovalFuture() { + return _coordinatorDocRemovalPromise.getFuture(); + } + private: void _updateAssociatedClient(Client* client); @@ -196,6 +205,11 @@ private: bool _decisionDurable{false}; SharedPromise<txn::CommitDecision> _decisionPromise; + // TODO: SERVER-62375 Remove upgrade/downgrade code for internal transactions. + // Promise that is set when we have successfully removed the coordinator + // document. + SharedPromise<void> _coordinatorDocRemovalPromise; + // A list of all promises corresponding to futures that were returned to callers of // onCompletion. SharedPromise<txn::CommitDecision> _completionPromise; diff --git a/src/mongo/db/s/transaction_coordinator_service.cpp b/src/mongo/db/s/transaction_coordinator_service.cpp index bdc4cc373d9..f936323dcee 100644 --- a/src/mongo/db/s/transaction_coordinator_service.cpp +++ b/src/mongo/db/s/transaction_coordinator_service.cpp @@ -370,4 +370,31 @@ void TransactionCoordinatorService::cancelIfCommitNotYetStarted( } } +const std::vector<SharedSemiFuture<void>> +TransactionCoordinatorService::getAllRemovalFuturesForCoordinatorsForInternalTransactions( + OperationContext* opCtx) { + std::vector<SharedSemiFuture<void>> coordinatorStateDocRemovalFutures; + std::shared_ptr<CatalogAndScheduler> cas = _getCatalogAndScheduler(opCtx); + auto& catalog = cas->catalog; + + auto predicate = [](const LogicalSessionId lsid, + const TxnNumberAndRetryCounter txnNumberAndRetryCounter, + const std::shared_ptr<TransactionCoordinator> transactionCoordinator) { + TransactionCoordinator::Step step = transactionCoordinator->getStep(); + if (step > TransactionCoordinator::Step::kInactive && getParentSessionId(lsid)) { + return true; + } + return false; + }; + + auto visitorAction = [&](const LogicalSessionId lsid, + const TxnNumberAndRetryCounter txnNumberAndRetryCounter, + const std::shared_ptr<TransactionCoordinator> transactionCoordinator) { + coordinatorStateDocRemovalFutures.push_back( + transactionCoordinator->getCoordinatorDocRemovalFuture()); + }; + catalog.filter(predicate, visitorAction); + return coordinatorStateDocRemovalFutures; +} + } // namespace mongo diff --git a/src/mongo/db/s/transaction_coordinator_service.h b/src/mongo/db/s/transaction_coordinator_service.h index c0946bd5286..f0795d857e0 100644 --- a/src/mongo/db/s/transaction_coordinator_service.h +++ b/src/mongo/db/s/transaction_coordinator_service.h @@ -127,6 +127,14 @@ public: */ void joinPreviousRound(); + /** + * TODO: SERVER-62375 Remove upgrade/downgrade code for internal transactions. + * Returns a vector of futures that wait on the removal of transaction + * coordinators for internal transactions. + */ + const std::vector<SharedSemiFuture<void>> + getAllRemovalFuturesForCoordinatorsForInternalTransactions(OperationContext* opCtx); + private: struct CatalogAndScheduler { CatalogAndScheduler(ServiceContext* service) : scheduler(service) {} |