diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2019-03-18 18:25:31 -0400 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2019-03-21 09:20:17 -0400 |
commit | c3d1212f1a4a258833b792a43d2d2d8a144f1adb (patch) | |
tree | 02afa07ae3dc723a56abdc172ab9973217966a0d /src | |
parent | cc2b4b907aaf788f356ec23e1b315ea5d7b2cf82 (diff) | |
download | mongo-c3d1212f1a4a258833b792a43d2d2d8a144f1adb.tar.gz |
SERVER-40223 Ensure that the TransactionCoordinator will always complete if its scheduler is shut down
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/base/error_codes.err | 2 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator.cpp | 13 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator.h | 15 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_catalog.cpp | 28 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_catalog.h | 22 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_catalog_test.cpp | 49 |
6 files changed, 80 insertions, 49 deletions
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err index 6601adf40e6..4fa6ca8929b 100644 --- a/src/mongo/base/error_codes.err +++ b/src/mongo/base/error_codes.err @@ -285,6 +285,8 @@ error_code("WouldChangeOwningShard", 283, extra="WouldChangeOwningShardInfo") error_code("ForTestingErrorExtraInfoWithExtraInfoInNamespace", 284, extra="nested::twice::NestedErrorExtraInfoExample") error_code("IndexBuildAlreadyInProgress", 285) error_code("ChangeStreamHistoryLost", 286) +# The code below is for internal use only and must never be returned in a network response +error_code("TransactionCoordinatorDeadlineTaskCanceled", 287) # Error codes 4000-8999 are reserved. diff --git a/src/mongo/db/s/transaction_coordinator.cpp b/src/mongo/db/s/transaction_coordinator.cpp index 0a9eb4f7bb6..6e7d4d377ed 100644 --- a/src/mongo/db/s/transaction_coordinator.cpp +++ b/src/mongo/db/s/transaction_coordinator.cpp @@ -83,9 +83,12 @@ TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext, if (coordinateCommitDeadline) { _deadlineScheduler = _scheduler->makeChildScheduler(); _deadlineScheduler - ->scheduleWorkAt(*coordinateCommitDeadline, - [this](OperationContext* opCtx) { cancelIfCommitNotYetStarted(); }) - .getAsync([](const Status&) {}); + ->scheduleWorkAt(*coordinateCommitDeadline, [](OperationContext* opCtx) {}) + .getAsync([this](const Status& s) { + if (s == ErrorCodes::TransactionCoordinatorDeadlineTaskCanceled) + return; + cancelIfCommitNotYetStarted(); + }); } } @@ -183,8 +186,8 @@ void TransactionCoordinator::cancelIfCommitNotYetStarted() { void TransactionCoordinator::_cancelTimeoutWaitForCommitTask() { if (_deadlineScheduler) { - _deadlineScheduler->shutdown( - {ErrorCodes::CallbackCanceled, "Interrupting the commit received deadline task"}); + _deadlineScheduler->shutdown({ErrorCodes::TransactionCoordinatorDeadlineTaskCanceled, + "Interrupting the commit received deadline task"}); } } diff --git a/src/mongo/db/s/transaction_coordinator.h b/src/mongo/db/s/transaction_coordinator.h index 6397cf21c65..d47a39b9a88 100644 --- a/src/mongo/db/s/transaction_coordinator.h +++ b/src/mongo/db/s/transaction_coordinator.h @@ -37,7 +37,12 @@ namespace mongo { /** - * Class responsible for coordinating two-phase commit across shards. + * State machine, which implements the two-phase commit protocol for a specific transaction, + * identified by lsid + txnNumber. + * + * The lifetime of a coordinator starts with a construction and ends with the `onCompletion()` + * future getting signaled. It is illegal to destroy a coordinator without waiting for + * `onCompletion()`. */ class TransactionCoordinator { MONGO_DISALLOW_COPYING(TransactionCoordinator); @@ -125,11 +130,9 @@ public: SharedSemiFuture<txn::CommitDecision> getDecision(); /** - * Returns a future that will be signaled when the transaction has completely finished - * committing or aborting (i.e. when commit/abort acknowledgements have been received from all - * participants, or the coordinator commit process is aborted locally for some reason). - * - * Unlike runCommit, this will not kick off the commit process if it has not already begun. + * Returns a future which can be listened on for when all the asynchronous activity spawned by + * this coordinator has completed. It will always eventually be set and once set it is safe to + * dispose of the TransactionCoordinator object. */ Future<void> onCompletion(); diff --git a/src/mongo/db/s/transaction_coordinator_catalog.cpp b/src/mongo/db/s/transaction_coordinator_catalog.cpp index 1d66b095602..8d5dc379d0e 100644 --- a/src/mongo/db/s/transaction_coordinator_catalog.cpp +++ b/src/mongo/db/s/transaction_coordinator_catalog.cpp @@ -78,8 +78,7 @@ void TransactionCoordinatorCatalog::onStepDown() { coordinator->cancelIfCommitNotYetStarted(); } - ul.lock(); - _cleanupCompletedCoordinators(ul); + _cleanupCompletedCoordinators(); } void TransactionCoordinatorCatalog::insert(OperationContext* opCtx, @@ -90,8 +89,9 @@ void TransactionCoordinatorCatalog::insert(OperationContext* opCtx, LOG(3) << "Inserting coordinator " << lsid.getId() << ':' << txnNumber << " into in-memory catalog"; + auto cleanupCoordinatorsGuard = makeGuard([&] { _cleanupCompletedCoordinators(); }); + stdx::unique_lock<stdx::mutex> ul(_mutex); - auto cleanupCoordinatorsGuard = makeGuard([&] { _cleanupCompletedCoordinators(ul); }); if (!forStepUp) { _waitForStepUpToComplete(ul, opCtx); } @@ -105,17 +105,23 @@ void TransactionCoordinatorCatalog::insert(OperationContext* opCtx, "Cannot insert a TransactionCoordinator into the TransactionCoordinatorCatalog with " "the same session ID and transaction number as a previous coordinator"); - // Schedule callback to remove coordinator from catalog when it either commits or aborts. + coordinatorsBySession[txnNumber] = coordinator; + + // Schedule callback to remove the coordinator from the catalog when all its activities have + // completed. This needs to be done outside of the mutex, in case the coordinator completed + // early because of stepdown. Otherwise the continuation could execute on the same thread and + // recursively acquire the mutex. + ul.unlock(); + coordinator->onCompletion().getAsync( [this, lsid, txnNumber](Status) { _remove(lsid, txnNumber); }); - - coordinatorsBySession[txnNumber] = std::move(coordinator); } std::shared_ptr<TransactionCoordinator> TransactionCoordinatorCatalog::get( OperationContext* opCtx, const LogicalSessionId& lsid, TxnNumber txnNumber) { + _cleanupCompletedCoordinators(); + stdx::unique_lock<stdx::mutex> ul(_mutex); - auto cleanupCoordinatorsGuard = makeGuard([&] { _cleanupCompletedCoordinators(ul); }); _waitForStepUpToComplete(ul, opCtx); std::shared_ptr<TransactionCoordinator> coordinatorToReturn; @@ -149,8 +155,9 @@ std::shared_ptr<TransactionCoordinator> TransactionCoordinatorCatalog::get( boost::optional<std::pair<TxnNumber, std::shared_ptr<TransactionCoordinator>>> TransactionCoordinatorCatalog::getLatestOnSession(OperationContext* opCtx, const LogicalSessionId& lsid) { + _cleanupCompletedCoordinators(); + stdx::unique_lock<stdx::mutex> ul(_mutex); - auto cleanupCoordinatorsGuard = makeGuard([&] { _cleanupCompletedCoordinators(ul); }); _waitForStepUpToComplete(ul, opCtx); const auto& coordinatorsForSessionIter = _coordinatorsBySession.find(lsid); @@ -242,9 +249,8 @@ void TransactionCoordinatorCatalog::_waitForStepUpToComplete(stdx::unique_lock<s uassertStatusOK(*_stepUpCompletionStatus); } -void TransactionCoordinatorCatalog::_cleanupCompletedCoordinators( - stdx::unique_lock<stdx::mutex>& ul) { - invariant(ul.owns_lock()); +void TransactionCoordinatorCatalog::_cleanupCompletedCoordinators() { + stdx::unique_lock<stdx::mutex> ul(_mutex); auto coordinatorsToCleanup = std::move(_coordinatorsToCleanup); // Ensure the destructors run outside of the lock in order to minimize the time this methods diff --git a/src/mongo/db/s/transaction_coordinator_catalog.h b/src/mongo/db/s/transaction_coordinator_catalog.h index 67f6daf77db..1605dbe8869 100644 --- a/src/mongo/db/s/transaction_coordinator_catalog.h +++ b/src/mongo/db/s/transaction_coordinator_catalog.h @@ -40,8 +40,8 @@ namespace mongo { /** - * A container for TransactionCoordinator objects, indexed by logical session id and transaction - * number. It allows holding several coordinator objects per session. + * This class is a registry for all the active TransactionCoordinator objects, indexed by lsid and + * txnNumber. It supports holding several coordinator objects per session. */ class TransactionCoordinatorCatalog { MONGO_DISALLOW_COPYING(TransactionCoordinatorCatalog); @@ -61,11 +61,15 @@ public: void onStepDown(); /** - * Inserts a coordinator into the catalog. + * Inserts a coordinator to be tracked by the catalog. * - * Note: Inserting a duplicate coordinator for the given session id and transaction number - * is not allowed and will lead to an invariant failure. Users of the catalog must ensure this - * does not take place. + * Duplicate lsid + txnNumber are not legal and will lead to invariant. The consumer of this + * class (TransactionCoordinatorService) guarantees this will not happen through the following + * means: + * - At step-up recovery time - the catalog starts empty and the coordinators inserted are read + * from the `config.coordinators` collection, which only contains unique entries + * - At regular run time - the session check-out mechanism guarantees that calls to get, + * followed by insert are atomic for the same lsid + txnNumber */ void insert(OperationContext* opCtx, const LogicalSessionId& lsid, @@ -122,11 +126,9 @@ private: void _remove(const LogicalSessionId& lsid, TxnNumber txnNumber); /** - * Goes through the '_coordinatorsToCleanup' list and deletes entries from it. As a side-effect - * also unlocks the passed-in lock, so no other synchronized members of the class should be - * accessed after it is called. + * Goes through the '_coordinatorsToCleanup' list and deletes entries from it */ - void _cleanupCompletedCoordinators(stdx::unique_lock<stdx::mutex>& ul); + void _cleanupCompletedCoordinators(); /** * Constructs a string representation of all the coordinators registered on the catalog. diff --git a/src/mongo/db/s/transaction_coordinator_catalog_test.cpp b/src/mongo/db/s/transaction_coordinator_catalog_test.cpp index b73955b5c78..db11b3aff10 100644 --- a/src/mongo/db/s/transaction_coordinator_catalog_test.cpp +++ b/src/mongo/db/s/transaction_coordinator_catalog_test.cpp @@ -30,34 +30,27 @@ #include "mongo/platform/basic.h" #include "mongo/db/s/transaction_coordinator_catalog.h" -#include "mongo/s/shard_server_test_fixture.h" +#include "mongo/db/s/transaction_coordinator_test_fixture.h" #include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" namespace mongo { namespace { -class TransactionCoordinatorCatalogTest : public ShardServerTestFixture { +class TransactionCoordinatorCatalogTest : public TransactionCoordinatorTestFixture { protected: void setUp() override { - ShardServerTestFixture::setUp(); + TransactionCoordinatorTestFixture::setUp(); _coordinatorCatalog.emplace(); _coordinatorCatalog->exitStepUp(Status::OK()); } void tearDown() override { - // Make sure all of the coordinators are in a committed/aborted state before they are - // destroyed. Otherwise, the coordinator's destructor will invariant because it will still - // have outstanding futures that have not been completed (the one to remove itself from the - // catalog). This has the added benefit of testing whether it's okay to destroy - // the catalog while there are outstanding coordinators. - for (auto& coordinator : _coordinatorsForTest) { - coordinator->cancelIfCommitNotYetStarted(); - } - _coordinatorsForTest.clear(); - - ShardServerTestFixture::tearDown(); + _coordinatorCatalog->onStepDown(); + _coordinatorCatalog.reset(); + + TransactionCoordinatorTestFixture::tearDown(); } void createCoordinatorInCatalog(OperationContext* opCtx, @@ -71,12 +64,9 @@ protected: boost::none); _coordinatorCatalog->insert(opCtx, lsid, txnNumber, newCoordinator); - _coordinatorsForTest.push_back(newCoordinator); } boost::optional<TransactionCoordinatorCatalog> _coordinatorCatalog; - - std::vector<std::shared_ptr<TransactionCoordinator>> _coordinatorsForTest; }; TEST_F(TransactionCoordinatorCatalogTest, GetOnSessionThatDoesNotExistReturnsNone) { @@ -170,5 +160,30 @@ TEST_F(TransactionCoordinatorCatalogTest, ASSERT_EQ(latestTxnNumAndCoordinator->first, txnNumber2); } +TEST_F(TransactionCoordinatorCatalogTest, StepDownBeforeCoordinatorInsertedIntoCatalog) { + LogicalSessionId lsid = makeLogicalSessionIdForTest(); + TxnNumber txnNumber = 1; + + txn::AsyncWorkScheduler aws(getServiceContext()); + TransactionCoordinatorCatalog catalog; + catalog.exitStepUp(Status::OK()); + + auto coordinator = std::make_shared<TransactionCoordinator>(getServiceContext(), + lsid, + txnNumber, + aws.makeChildScheduler(), + network()->now() + Seconds{5}); + + aws.shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "Test step down"}); + catalog.onStepDown(); + + advanceClockAndExecuteScheduledTasks(); + + catalog.insert(operationContext(), lsid, txnNumber, coordinator); + catalog.join(); + + coordinator->onCompletion().wait(); +} + } // namespace } // namespace mongo |