diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2019-01-22 14:20:58 -0500 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2019-02-12 07:08:24 -0500 |
commit | 1e7d26609cfa19c4840fe4ef4c1462b7fed48974 (patch) | |
tree | 575241d1ab5b80d0357b9cbb1745289cfa25e81c | |
parent | 7554cd5796762cfb037ad652818fff7b7e67a8a7 (diff) | |
download | mongo-1e7d26609cfa19c4840fe4ef4c1462b7fed48974.tar.gz |
SERVER-38521 Make the transaction coordinator step-up task interruptible
-rw-r--r-- | src/mongo/db/s/SConscript | 4 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_initialization_mongod.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/transaction_coordinator_catalog.cpp | 63 | ||||
-rw-r--r-- | src/mongo/db/transaction_coordinator_catalog.h | 38 | ||||
-rw-r--r-- | src/mongo/db/transaction_coordinator_catalog_test.cpp | 36 | ||||
-rw-r--r-- | src/mongo/db/transaction_coordinator_driver.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/transaction_coordinator_service.cpp | 148 | ||||
-rw-r--r-- | src/mongo/db/transaction_coordinator_service.h | 44 | ||||
-rw-r--r-- | src/mongo/db/transaction_coordinator_service_test.cpp | 105 |
9 files changed, 320 insertions, 131 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 128f6985853..d35c69de259 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -75,8 +75,9 @@ env.Library( '$BUILD_DIR/mongo/db/ops/write_ops_exec', '$BUILD_DIR/mongo/db/repl/oplog', '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface', - '$BUILD_DIR/mongo/db/storage/remove_saver', '$BUILD_DIR/mongo/db/rw_concern_d', + '$BUILD_DIR/mongo/db/storage/remove_saver', + '$BUILD_DIR/mongo/db/transaction_coordinator', '$BUILD_DIR/mongo/s/client/shard_local', '$BUILD_DIR/mongo/s/sharding_initialization', 'chunk_splitter', @@ -304,7 +305,6 @@ env.Library( '$BUILD_DIR/mongo/db/index_d', '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface', '$BUILD_DIR/mongo/db/repl/replica_set_messages', - '$BUILD_DIR/mongo/db/transaction_coordinator', '$BUILD_DIR/mongo/s/commands/shared_cluster_commands', '$BUILD_DIR/mongo/s/sharding_router_api', '$BUILD_DIR/mongo/s/sharding_initialization', diff --git a/src/mongo/db/s/sharding_initialization_mongod.cpp b/src/mongo/db/s/sharding_initialization_mongod.cpp index 70aac5722f1..f2be7485319 100644 --- a/src/mongo/db/s/sharding_initialization_mongod.cpp +++ b/src/mongo/db/s/sharding_initialization_mongod.cpp @@ -54,6 +54,7 @@ #include "mongo/db/s/shard_server_catalog_cache_loader.h" #include "mongo/db/s/sharding_config_optime_gossip.h" #include "mongo/db/server_options.h" +#include "mongo/db/transaction_coordinator_service.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/rpc/metadata/egress_metadata_hook_list.h" #include "mongo/s/catalog_cache.h" @@ -122,17 +123,20 @@ void initializeShardingEnvironmentOnShardServer(OperationContext* opCtx, // Determine primary/secondary/standalone state in order to properly initialize sharding // components. - auto replCoord = repl::ReplicationCoordinator::get(opCtx); + const auto replCoord = repl::ReplicationCoordinator::get(opCtx); bool isReplSet = replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet; bool isStandaloneOrPrimary = - !isReplSet || (repl::ReplicationCoordinator::get(opCtx)->getMemberState() == - repl::MemberState::RS_PRIMARY); + !isReplSet || (replCoord->getMemberState() == repl::MemberState::RS_PRIMARY); CatalogCacheLoader::get(opCtx).initializeReplicaSetRole(isStandaloneOrPrimary); ChunkSplitter::get(opCtx).onShardingInitialization(isStandaloneOrPrimary); PeriodicBalancerConfigRefresher::get(opCtx).onShardingInitialization(opCtx->getServiceContext(), isStandaloneOrPrimary); + // Start the transaction coordinator service only if the node is the primary of a replica set + TransactionCoordinatorService::get(opCtx)->onShardingInitialization( + opCtx, isReplSet && isStandaloneOrPrimary); + Grid::get(opCtx)->setShardingInitialized(); LOG(0) << "Finished initializing sharding components for " diff --git a/src/mongo/db/transaction_coordinator_catalog.cpp b/src/mongo/db/transaction_coordinator_catalog.cpp index 460d450d4b1..8ebc71726a1 100644 --- a/src/mongo/db/transaction_coordinator_catalog.cpp +++ b/src/mongo/db/transaction_coordinator_catalog.cpp @@ -45,7 +45,9 @@ MONGO_FAIL_POINT_DEFINE(doNotForgetCoordinator); TransactionCoordinatorCatalog::TransactionCoordinatorCatalog() = default; -TransactionCoordinatorCatalog::~TransactionCoordinatorCatalog() = default; +TransactionCoordinatorCatalog::~TransactionCoordinatorCatalog() { + join(); +} void TransactionCoordinatorCatalog::insert(OperationContext* opCtx, LogicalSessionId lsid, @@ -68,15 +70,8 @@ void TransactionCoordinatorCatalog::insert(OperationContext* opCtx, "the same session ID and transaction number as a previous coordinator"); // Schedule callback to remove coordinator from catalog when it either commits or aborts. - coordinator->onCompletion().getAsync([ - catalogWeakPtr = std::weak_ptr<TransactionCoordinatorCatalog>(shared_from_this()), - lsid, - txnNumber - ](Status) { - if (auto catalog = catalogWeakPtr.lock()) { - catalog->remove(lsid, txnNumber); - } - }); + coordinator->onCompletion().getAsync( + [this, lsid, txnNumber](Status) { remove(lsid, txnNumber); }); LOG(3) << "Inserting coordinator for transaction " << txnNumber << " on session " << lsid.toBSON() << " into in-memory catalog"; @@ -170,7 +165,7 @@ void TransactionCoordinatorCatalog::remove(LogicalSessionId lsid, TxnNumber txnN } coordinatorsForSession.erase(coordinatorForTxnIter); - if (coordinatorsForSession.size() == 0) { + if (coordinatorsForSession.empty()) { _coordinatorsBySession.erase(coordinatorsForSessionIter); } } @@ -182,41 +177,43 @@ void TransactionCoordinatorCatalog::remove(LogicalSessionId lsid, TxnNumber txnN } } -void TransactionCoordinatorCatalog::enterStepUp(OperationContext* opCtx) { - stdx::unique_lock<stdx::mutex> lk(_mutex); +void TransactionCoordinatorCatalog::exitStepUp(Status status) { + if (status.isOK()) { + LOG(0) << "Incoming coordinateCommit requests are now enabled"; + } else { + warning() << "Coordinator recovery failed and coordinateCommit requests will not be allowed" + << causedBy(status); + } - // If this node stepped down and stepped back up, the asynchronous stepUp task from the previous - // stepUp may still be running, so wait for the previous stepUp task to complete. - LOG(3) << "Waiting for coordinator stepup task from previous term, if any, to complete"; - _waitForStepUpToComplete(lk, opCtx); + stdx::lock_guard<stdx::mutex> lk(_mutex); + invariant(!_stepUpCompletionStatus); + _stepUpCompletionStatus = std::move(status); + _stepUpCompleteCv.notify_all(); +} - _stepUpInProgress = true; +void TransactionCoordinatorCatalog::join() { + stdx::unique_lock<stdx::mutex> ul(_mutex); - LOG(3) << "Waiting for there to be no active coordinators; current coordinator catalog: " - << this->_toString(lk); - opCtx->waitForConditionOrInterrupt( - _noActiveCoordinatorsCv, lk, [this]() { return _coordinatorsBySession.empty(); }); + while (!_noActiveCoordinatorsCv.wait_for( + ul, stdx::chrono::seconds{5}, [this] { return _coordinatorsBySession.empty(); })) { + LOG(0) << "After 5 seconds of wait there are still " << _coordinatorsBySession.size() + << " sessions left with active coordinators which have not yet completed"; + LOG(0) << _toString(ul); + } } -void TransactionCoordinatorCatalog::exitStepUp() { +std::string TransactionCoordinatorCatalog::toString() const { stdx::lock_guard<stdx::mutex> lk(_mutex); - invariant(_stepUpInProgress); - - LOG(3) << "Signaling stepup complete"; - _stepUpInProgress = false; - _noStepUpInProgressCv.notify_all(); + return _toString(lk); } void TransactionCoordinatorCatalog::_waitForStepUpToComplete(stdx::unique_lock<stdx::mutex>& lk, OperationContext* opCtx) { invariant(lk.owns_lock()); opCtx->waitForConditionOrInterrupt( - _noStepUpInProgressCv, lk, [this]() { return !_stepUpInProgress; }); -} + _stepUpCompleteCv, lk, [this]() { return bool(_stepUpCompletionStatus); }); -std::string TransactionCoordinatorCatalog::toString() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); - return _toString(lk); + uassertStatusOK(*_stepUpCompletionStatus); } std::string TransactionCoordinatorCatalog::_toString(WithLock wl) const { diff --git a/src/mongo/db/transaction_coordinator_catalog.h b/src/mongo/db/transaction_coordinator_catalog.h index 98acd96d3e9..5be509fde1e 100644 --- a/src/mongo/db/transaction_coordinator_catalog.h +++ b/src/mongo/db/transaction_coordinator_catalog.h @@ -48,8 +48,7 @@ namespace mongo { * itself from the config.txnCommitDecisions collection, which will be done on transition to * primary (whether from startup or ordinary step up). */ -class TransactionCoordinatorCatalog - : public std::enable_shared_from_this<TransactionCoordinatorCatalog> { +class TransactionCoordinatorCatalog { MONGO_DISALLOW_COPYING(TransactionCoordinatorCatalog); public: @@ -94,16 +93,15 @@ public: void remove(LogicalSessionId lsid, TxnNumber txnNumber); /** - * Waits for the catalog to no longer be marked as in stepup and then marks the catalog as in - * stepup and waits for all active coordinators from the previous term to complete (either - * successfully or with an error) and be removed from the catalog. + * Marks no stepup in progress and signals that no stepup is in progress. */ - void enterStepUp(OperationContext* opCtx); + void exitStepUp(Status status); /** - * Marks no stepup in progress and signals that no stepup is in progress. + * Blocking method, which waits for all coordinators registered on the catalog to complete + * (after this returns, it is guaranteed that all onCompletion futures have been set) */ - void exitStepUp(); + void join(); /** * Returns a string representation of the map from LogicalSessionId to the list of TxnNumbers @@ -140,16 +138,20 @@ private: // commit coordination and would normally be expunged from memory. LogicalSessionIdMap<TransactionCoordinatorMap> _coordinatorsBySessionDefunct; - /** - * Whether a thread is actively executing a stepUp task. - */ - bool _stepUpInProgress{false}; - - /** - * Notified when the *current* in-progress stepUp task has completed, i.e., _stepUpInProgress - * becomes false. - */ - stdx::condition_variable _noStepUpInProgressCv; + // Stores the result of the coordinator catalog's recovery attempt (the status passed to + // exitStepUp). This is what the values mean: + // + // stepUpCompletionStatus = none - brand new created object (exitStepUp has not been called + // yet). All calls will block. + // stepUpCompletionStatus = OK - recovery completed successfully, transactions can be + // coordinated + // stepUpCompletionStatus = error - recovery completed with an error, transactions cannot be + // coordinated (all methods will fail with this error) + boost::optional<Status> _stepUpCompletionStatus; + + // Signaled when recovery of the catalog completes (when _stepUpCompletionStatus transitions + // from none to either OK or error) + stdx::condition_variable _stepUpCompleteCv; // Notified when the last coordinator is removed from the catalog. stdx::condition_variable _noActiveCoordinatorsCv; diff --git a/src/mongo/db/transaction_coordinator_catalog_test.cpp b/src/mongo/db/transaction_coordinator_catalog_test.cpp index 9bc847790ad..57fcca2831d 100644 --- a/src/mongo/db/transaction_coordinator_catalog_test.cpp +++ b/src/mongo/db/transaction_coordinator_catalog_test.cpp @@ -42,11 +42,12 @@ class TransactionCoordinatorCatalogTest : public ShardServerTestFixture { protected: void setUp() override { ShardServerTestFixture::setUp(); - _coordinatorCatalog = std::make_shared<TransactionCoordinatorCatalog>(); + + _coordinatorCatalog.emplace(); + _coordinatorCatalog->exitStepUp(Status::OK()); } void tearDown() override { - _coordinatorCatalog.reset(); // Make sure all of the coordinators are in a committed/aborted state before they are // destroyed. Otherwise, the coordinator's destructor will invariant because it will still // have outstanding futures that have not been completed (the one to remove itself from the @@ -60,31 +61,25 @@ protected: ShardServerTestFixture::tearDown(); } - TransactionCoordinatorCatalog& coordinatorCatalog() { - return *_coordinatorCatalog; - } - void createCoordinatorInCatalog(OperationContext* opCtx, LogicalSessionId lsid, TxnNumber txnNumber) { auto newCoordinator = std::make_shared<TransactionCoordinator>(getServiceContext(), lsid, txnNumber); - coordinatorCatalog().insert(opCtx, lsid, txnNumber, newCoordinator); + _coordinatorCatalog->insert(opCtx, lsid, txnNumber, newCoordinator); _coordinatorsForTest.push_back(newCoordinator); } -private: - // Note: MUST be shared_ptr due to use of std::enable_shared_from_this - std::shared_ptr<TransactionCoordinatorCatalog> _coordinatorCatalog; + boost::optional<TransactionCoordinatorCatalog> _coordinatorCatalog; + std::vector<std::shared_ptr<TransactionCoordinator>> _coordinatorsForTest; }; TEST_F(TransactionCoordinatorCatalogTest, GetOnSessionThatDoesNotExistReturnsNone) { LogicalSessionId lsid = makeLogicalSessionIdForTest(); TxnNumber txnNumber = 1; - - auto coordinator = coordinatorCatalog().get(operationContext(), lsid, txnNumber); + auto coordinator = _coordinatorCatalog->get(operationContext(), lsid, txnNumber); ASSERT(coordinator == nullptr); } @@ -93,16 +88,15 @@ TEST_F(TransactionCoordinatorCatalogTest, LogicalSessionId lsid = makeLogicalSessionIdForTest(); TxnNumber txnNumber = 1; createCoordinatorInCatalog(operationContext(), lsid, txnNumber); - auto coordinatorInCatalog = coordinatorCatalog().get(operationContext(), lsid, txnNumber + 1); + auto coordinatorInCatalog = _coordinatorCatalog->get(operationContext(), lsid, txnNumber + 1); ASSERT(coordinatorInCatalog == nullptr); } - TEST_F(TransactionCoordinatorCatalogTest, CreateFollowedByGetReturnsCoordinator) { LogicalSessionId lsid = makeLogicalSessionIdForTest(); TxnNumber txnNumber = 1; createCoordinatorInCatalog(operationContext(), lsid, txnNumber); - auto coordinatorInCatalog = coordinatorCatalog().get(operationContext(), lsid, txnNumber); + auto coordinatorInCatalog = _coordinatorCatalog->get(operationContext(), lsid, txnNumber); ASSERT(coordinatorInCatalog != nullptr); } @@ -113,7 +107,7 @@ TEST_F(TransactionCoordinatorCatalogTest, SecondCreateForSessionDoesNotOverwrite createCoordinatorInCatalog(operationContext(), lsid, txnNumber1); createCoordinatorInCatalog(operationContext(), lsid, txnNumber2); - auto coordinator1InCatalog = coordinatorCatalog().get(operationContext(), lsid, txnNumber1); + auto coordinator1InCatalog = _coordinatorCatalog->get(operationContext(), lsid, txnNumber1); ASSERT(coordinator1InCatalog != nullptr); } @@ -130,7 +124,7 @@ DEATH_TEST_F(TransactionCoordinatorCatalogTest, TEST_F(TransactionCoordinatorCatalogTest, GetLatestOnSessionWithNoCoordinatorsReturnsNone) { LogicalSessionId lsid = makeLogicalSessionIdForTest(); auto latestTxnNumAndCoordinator = - coordinatorCatalog().getLatestOnSession(operationContext(), lsid); + _coordinatorCatalog->getLatestOnSession(operationContext(), lsid); ASSERT_FALSE(latestTxnNumAndCoordinator); } @@ -140,7 +134,7 @@ TEST_F(TransactionCoordinatorCatalogTest, TxnNumber txnNumber = 1; createCoordinatorInCatalog(operationContext(), lsid, txnNumber); auto latestTxnNumAndCoordinator = - coordinatorCatalog().getLatestOnSession(operationContext(), lsid); + _coordinatorCatalog->getLatestOnSession(operationContext(), lsid); ASSERT_TRUE(latestTxnNumAndCoordinator); ASSERT_EQ(latestTxnNumAndCoordinator->first, txnNumber); @@ -150,13 +144,13 @@ TEST_F(TransactionCoordinatorCatalogTest, CoordinatorsRemoveThemselvesFromCatalo LogicalSessionId lsid = makeLogicalSessionIdForTest(); TxnNumber txnNumber = 1; createCoordinatorInCatalog(operationContext(), lsid, txnNumber); - auto coordinator = coordinatorCatalog().get(operationContext(), lsid, txnNumber); + auto coordinator = _coordinatorCatalog->get(operationContext(), lsid, txnNumber); coordinator->cancelIfCommitNotYetStarted(); ASSERT(coordinator->onCompletion().isReady()); auto latestTxnNumAndCoordinator = - coordinatorCatalog().getLatestOnSession(operationContext(), lsid); + _coordinatorCatalog->getLatestOnSession(operationContext(), lsid); ASSERT_FALSE(latestTxnNumAndCoordinator); } @@ -168,7 +162,7 @@ TEST_F(TransactionCoordinatorCatalogTest, createCoordinatorInCatalog(operationContext(), lsid, txnNumber1); createCoordinatorInCatalog(operationContext(), lsid, txnNumber2); auto latestTxnNumAndCoordinator = - coordinatorCatalog().getLatestOnSession(operationContext(), lsid); + _coordinatorCatalog->getLatestOnSession(operationContext(), lsid); ASSERT_EQ(latestTxnNumAndCoordinator->first, txnNumber2); } diff --git a/src/mongo/db/transaction_coordinator_driver.cpp b/src/mongo/db/transaction_coordinator_driver.cpp index 6e048c731b7..3af5d5b2695 100644 --- a/src/mongo/db/transaction_coordinator_driver.cpp +++ b/src/mongo/db/transaction_coordinator_driver.cpp @@ -580,10 +580,9 @@ std::vector<TransactionCoordinatorDocument> TransactionCoordinatorDriver::readAl OperationContext* opCtx) { std::vector<TransactionCoordinatorDocument> allCoordinatorDocs; - Query query; DBDirectClient client(opCtx); auto coordinatorDocsCursor = - client.query(NamespaceString::kTransactionCoordinatorsNamespace, query); + client.query(NamespaceString::kTransactionCoordinatorsNamespace, Query{}); while (coordinatorDocsCursor->more()) { // TODO (SERVER-38307): Try/catch around parsing the document and skip the document if it diff --git a/src/mongo/db/transaction_coordinator_service.cpp b/src/mongo/db/transaction_coordinator_service.cpp index 27e1fba797a..9180607bacb 100644 --- a/src/mongo/db/transaction_coordinator_service.cpp +++ b/src/mongo/db/transaction_coordinator_service.cpp @@ -50,10 +50,11 @@ const auto transactionCoordinatorServiceDecoration = } // namespace -TransactionCoordinatorService::TransactionCoordinatorService() - : _coordinatorCatalog(std::make_shared<TransactionCoordinatorCatalog>()) {} +TransactionCoordinatorService::TransactionCoordinatorService() = default; -TransactionCoordinatorService::~TransactionCoordinatorService() = default; +TransactionCoordinatorService::~TransactionCoordinatorService() { + _joinPreviousRound(); +} TransactionCoordinatorService* TransactionCoordinatorService::get(OperationContext* opCtx) { return get(opCtx->getServiceContext()); @@ -67,7 +68,10 @@ void TransactionCoordinatorService::createCoordinator(OperationContext* opCtx, LogicalSessionId lsid, TxnNumber txnNumber, Date_t commitDeadline) { - if (auto latestTxnNumAndCoordinator = _coordinatorCatalog->getLatestOnSession(opCtx, lsid)) { + auto cas = _getCatalogAndScheduler(opCtx); + auto& catalog = cas->catalog; + + if (auto latestTxnNumAndCoordinator = catalog.getLatestOnSession(opCtx, lsid)) { auto latestCoordinator = latestTxnNumAndCoordinator->second; if (txnNumber == latestTxnNumAndCoordinator->first) { return; @@ -78,7 +82,7 @@ void TransactionCoordinatorService::createCoordinator(OperationContext* opCtx, auto coordinator = std::make_shared<TransactionCoordinator>(opCtx->getServiceContext(), lsid, txnNumber); - _coordinatorCatalog->insert(opCtx, lsid, txnNumber, coordinator); + catalog.insert(opCtx, lsid, txnNumber, coordinator); // Schedule a task in the future to cancel the commit coordination on the coordinator, so that // the coordinator does not remain in memory forever (in case the particpant list is never @@ -103,7 +107,10 @@ boost::optional<Future<txn::CommitDecision>> TransactionCoordinatorService::coor LogicalSessionId lsid, TxnNumber txnNumber, const std::set<ShardId>& participantList) { - auto coordinator = _coordinatorCatalog->get(opCtx, lsid, txnNumber); + auto cas = _getCatalogAndScheduler(opCtx); + auto& catalog = cas->catalog; + + auto coordinator = catalog.get(opCtx, lsid, txnNumber); if (!coordinator) { return boost::none; } @@ -117,49 +124,48 @@ boost::optional<Future<txn::CommitDecision>> TransactionCoordinatorService::coor // TODO (SERVER-37364): Re-enable the coordinator returning the decision as soon as the decision // is made durable. Currently the coordinator waits to hear acks because participants in prepare // reject requests with a higher transaction number, causing tests to fail. - // return coordinator.get()->runCommit(participants); + // return coordinator->runCommit(participants); } boost::optional<Future<txn::CommitDecision>> TransactionCoordinatorService::recoverCommit( OperationContext* opCtx, LogicalSessionId lsid, TxnNumber txnNumber) { - auto coordinator = _coordinatorCatalog->get(opCtx, lsid, txnNumber); + auto cas = _getCatalogAndScheduler(opCtx); + auto& catalog = cas->catalog; + + auto coordinator = catalog.get(opCtx, lsid, txnNumber); if (!coordinator) { return boost::none; } - return coordinator.get()->onCompletion().then( - [coordinator] { return coordinator.get()->getDecision().get(); }); + return coordinator->onCompletion().then( + [coordinator] { return coordinator->getDecision().get(); }); + // TODO (SERVER-37364): Re-enable the coordinator returning the decision as soon as the decision // is made durable. Currently the coordinator waits to hear acks because participants in prepare // reject requests with a higher transaction number, causing tests to fail. - // return coordinator.get()->getDecision(); + // return coordinator->getDecision(); } -void TransactionCoordinatorService::onStepUp(OperationContext* opCtx) { - // Blocks until the stepup task from the last term completes, then marks a new stepup task as - // having begun and blocks until all active coordinators complete (are removed from the - // catalog). - // Note: No other threads can read the catalog while the catalog is marked as having an active - // stepup task. - _coordinatorCatalog->enterStepUp(opCtx); +void TransactionCoordinatorService::onStepUp(OperationContext* opCtx, + Milliseconds recoveryDelayForTesting) { + _joinPreviousRound(); - const auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); - auto status = - executor - ->scheduleWork([ this, service = opCtx->getServiceContext() ]( - const executor::TaskExecutor::CallbackArgs&) { - try { - // The opCtx destructor handles unsetting itself from the Client - ThreadClient threadClient("TransactionCoordinator-StepUp", service); - auto opCtxPtr = Client::getCurrent()->makeOperationContext(); - auto opCtx = opCtxPtr.get(); + stdx::lock_guard<stdx::mutex> lg(_mutex); + invariant(!_catalogAndScheduler); + _catalogAndScheduler = std::make_shared<CatalogAndScheduler>(opCtx->getServiceContext()); + auto future = + _catalogAndScheduler->scheduler + .scheduleWorkIn( + recoveryDelayForTesting, + [catalogAndScheduler = _catalogAndScheduler](OperationContext * opCtx) { auto& replClientInfo = repl::ReplClientInfo::forClient(opCtx->getClient()); replClientInfo.setLastOpToSystemLastOpTime(opCtx); const auto lastOpTime = replClientInfo.getLastOp(); - LOG(3) << "Going to wait for client's last OpTime " << lastOpTime + LOG(3) << "Waiting for OpTime " << lastOpTime << " to become majority committed"; + WriteConcernResult unusedWCResult; uassertStatusOK(waitForWriteConcern( opCtx, @@ -175,38 +181,82 @@ void TransactionCoordinatorService::onStepUp(OperationContext* opCtx) { LOG(0) << "Need to resume coordinating commit for " << coordinatorDocs.size() << " transactions"; + auto& catalog = catalogAndScheduler->catalog; + for (const auto& doc : coordinatorDocs) { LOG(3) << "Going to resume coordinating commit for " << doc.toBSON(); + const auto lsid = *doc.getId().getSessionId(); const auto txnNumber = *doc.getId().getTxnNumber(); auto coordinator = std::make_shared<TransactionCoordinator>( opCtx->getServiceContext(), lsid, txnNumber); - _coordinatorCatalog->insert( - opCtx, lsid, txnNumber, coordinator, true /* forStepUp */); + catalog.insert(opCtx, lsid, txnNumber, coordinator, true /* forStepUp */); coordinator->continueCommit(doc); } + }) + .tapAll([catalogAndScheduler = _catalogAndScheduler](Status status) { + // TODO (SERVER-38320): Reschedule the step-up task if the interruption was not due + // to stepdown. - _coordinatorCatalog->exitStepUp(); - - LOG(3) << "Incoming coordinateCommit requests now accepted"; - } catch (const DBException& e) { - LOG(3) << "Failed while executing thread to resume coordinating commit for " - "pending " - "transactions " - << causedBy(e.toStatus()); - _coordinatorCatalog->exitStepUp(); - } - }) - .getStatus(); - - // TODO (SERVER-38320): Reschedule the stepup task if the interruption was not due to stepdown. - if (status == ErrorCodes::ShutdownInProgress || ErrorCodes::isInterruption(status.code())) { - return; + auto& catalog = catalogAndScheduler->catalog; + catalog.exitStepUp(status); + }); + + _catalogAndScheduler->recoveryTaskCompleted.emplace(std::move(future)); +} + +void TransactionCoordinatorService::onStepDown() { + { + stdx::lock_guard<stdx::mutex> lg(_mutex); + if (!_catalogAndScheduler) + return; + + _catalogAndSchedulerToCleanup = std::move(_catalogAndScheduler); } - invariant(status); + + _catalogAndSchedulerToCleanup->scheduler.shutdown( + {ErrorCodes::InterruptedDueToStepDown, "Transaction coordinator service stepping down"}); +} + +void TransactionCoordinatorService::onShardingInitialization(OperationContext* opCtx, + bool isPrimary) { + if (!isPrimary) + return; + + stdx::lock_guard<stdx::mutex> lg(_mutex); + + invariant(!_catalogAndScheduler); + _catalogAndScheduler = std::make_shared<CatalogAndScheduler>(opCtx->getServiceContext()); + + _catalogAndScheduler->catalog.exitStepUp(Status::OK()); + _catalogAndScheduler->recoveryTaskCompleted.emplace(Future<void>::makeReady()); } -void TransactionCoordinatorService::onStepDown() {} +std::shared_ptr<TransactionCoordinatorService::CatalogAndScheduler> +TransactionCoordinatorService::_getCatalogAndScheduler(OperationContext* opCtx) { + stdx::unique_lock<stdx::mutex> ul(_mutex); + uassert( + ErrorCodes::NotMaster, "Transaction coordinator is not a primary", _catalogAndScheduler); + + return _catalogAndScheduler; +} + +void TransactionCoordinatorService::_joinPreviousRound() { + // onStepDown must have been called + invariant(!_catalogAndScheduler); + + if (!_catalogAndSchedulerToCleanup) + return; + + LOG(0) << "Waiting for coordinator tasks from previous term to complete"; + + // Block until all coordinators scheduled the previous time the service was primary to have + // drained. Because the scheduler was interrupted, it should be extremely rare for there to be + // any coordinators left, so if this actually causes blocking, it would most likely be a bug. + _catalogAndSchedulerToCleanup->recoveryTaskCompleted->wait(); + _catalogAndSchedulerToCleanup->catalog.join(); + _catalogAndSchedulerToCleanup.reset(); +} } // namespace mongo diff --git a/src/mongo/db/transaction_coordinator_service.h b/src/mongo/db/transaction_coordinator_service.h index 115c7604bcc..a569dfdd617 100644 --- a/src/mongo/db/transaction_coordinator_service.h +++ b/src/mongo/db/transaction_coordinator_service.h @@ -90,12 +90,52 @@ public: * 2. Read all pending commit tasks from the config.transactionCoordinators collection. * 3. Create TransactionCoordinator objects in memory for each pending commit and launch an * async task to continue coordinating its commit. + * + * The 'recoveryDelay' argument is only used for testing in order to simulate recovery taking + * long time. */ - void onStepUp(OperationContext* opCtx); + void onStepUp(OperationContext* opCtx, Milliseconds recoveryDelayForTesting = Milliseconds(0)); void onStepDown(); + /** + * Called when an already established replica set is added as a shard to a cluster. Ensures that + * the TransactionCoordinator service is started up if the replica set is currently primary. + */ + void onShardingInitialization(OperationContext* opCtx, bool isPrimary); + private: - std::shared_ptr<TransactionCoordinatorCatalog> _coordinatorCatalog; + struct CatalogAndScheduler { + CatalogAndScheduler(ServiceContext* service) : scheduler(service) {} + + txn::AsyncWorkScheduler scheduler; + TransactionCoordinatorCatalog catalog; + + boost::optional<Future<void>> recoveryTaskCompleted; + }; + + /** + * Returns the current catalog + scheduler if stepUp has started, otherwise throws a NotMaster + * exception. + */ + std::shared_ptr<CatalogAndScheduler> _getCatalogAndScheduler(OperationContext* opCtx); + + /** + * Blocking call which waits for the previous stepUp/stepDown round to join and ensures all + * tasks scheduled by that round have completed. + */ + void _joinPreviousRound(); + + // Contains the catalog + scheduler, which was active at the last step-down attempt (if any). + // Set at onStepDown and destroyed at onStepUp, which are always invoked sequentially by the + // replication machinery, so there is no need to explicitly synchronize it + std::shared_ptr<CatalogAndScheduler> _catalogAndSchedulerToCleanup; + + // Protects the state below + mutable stdx::mutex _mutex; + + // The catalog + scheduler instantiated at the last step-up attempt. When nullptr, it means + // onStepUp has not been called yet after the last stepDown (or construction). + std::shared_ptr<CatalogAndScheduler> _catalogAndScheduler; }; } // namespace mongo diff --git a/src/mongo/db/transaction_coordinator_service_test.cpp b/src/mongo/db/transaction_coordinator_service_test.cpp index 7b8020f8622..115fb760b60 100644 --- a/src/mongo/db/transaction_coordinator_service_test.cpp +++ b/src/mongo/db/transaction_coordinator_service_test.cpp @@ -34,10 +34,12 @@ #include "mongo/client/remote_command_targeter_mock.h" #include "mongo/db/commands/txn_cmds_gen.h" #include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h" +#include "mongo/db/dbdirectclient.h" #include "mongo/db/transaction_coordinator_service.h" #include "mongo/db/transaction_coordinator_test_fixture.h" #include "mongo/db/write_concern_options.h" #include "mongo/util/log.h" +#include "mongo/util/scopeguard.h" namespace mongo { namespace { @@ -62,7 +64,7 @@ const StatusWith<BSONObj> kPrepareOkButWriteConcernError = BSON("ok" << 1 << "prepareTimestamp" << Timestamp(1, 1) << "writeConcernError" << kDummyWriteConcernError); -class TransactionCoordinatorServiceTest : public TransactionCoordinatorTestFixture { +class TransactionCoordinatorServiceTestFixture : public TransactionCoordinatorTestFixture { public: // Prepare responses @@ -193,6 +195,107 @@ public: commitDecisionFuture.get(); } + auto* service() const { + return TransactionCoordinatorService::get(operationContext()); + } +}; + + +using TransactionCoordinatorServiceStepUpStepDownTest = TransactionCoordinatorServiceTestFixture; + +TEST_F(TransactionCoordinatorServiceStepUpStepDownTest, OperationsFailBeforeStepUpStarts) { + ASSERT_THROWS_CODE(service()->createCoordinator( + operationContext(), makeLogicalSessionIdForTest(), 0, kCommitDeadline), + AssertionException, + ErrorCodes::NotMaster); + + ASSERT_THROWS_CODE(service()->coordinateCommit( + operationContext(), makeLogicalSessionIdForTest(), 0, kTwoShardIdSet), + AssertionException, + ErrorCodes::NotMaster); + + ASSERT_THROWS_CODE( + service()->recoverCommit(operationContext(), makeLogicalSessionIdForTest(), 0), + AssertionException, + ErrorCodes::NotMaster); +} + +TEST_F(TransactionCoordinatorServiceStepUpStepDownTest, OperationsBlockBeforeStepUpCompletes) { + service()->onStepUp(operationContext(), Milliseconds(1)); + auto stepDownGuard = makeGuard([&] { service()->onStepDown(); }); + + ASSERT_THROWS_CODE(operationContext()->runWithDeadline( + Date_t::now() + Milliseconds{5}, + ErrorCodes::NetworkInterfaceExceededTimeLimit, + [&] { + return service()->coordinateCommit(operationContext(), + makeLogicalSessionIdForTest(), + 0, + kTwoShardIdSet); + }), + AssertionException, + ErrorCodes::NetworkInterfaceExceededTimeLimit); + + { + executor::NetworkInterfaceMock::InNetworkGuard guard(network()); + network()->advanceTime(network()->now() + Milliseconds(1)); + } + + ASSERT(service()->coordinateCommit( + operationContext(), makeLogicalSessionIdForTest(), 0, kTwoShardIdSet) == + boost::none); +} + +TEST_F(TransactionCoordinatorServiceStepUpStepDownTest, StepUpFailsDueToBadCoordinatorDocument) { + DBDirectClient client(operationContext()); + client.insert(NamespaceString::kTransactionCoordinatorsNamespace.ns(), BSON("IllegalKey" << 1)); + ASSERT_EQ("", client.getLastError()); + + service()->onStepUp(operationContext()); + auto stepDownGuard = makeGuard([&] { service()->onStepDown(); }); + + ASSERT_THROWS_CODE(service()->coordinateCommit( + operationContext(), makeLogicalSessionIdForTest(), 0, kTwoShardIdSet), + AssertionException, + ErrorCodes::TypeMismatch); + + ASSERT_THROWS_CODE( + service()->recoverCommit(operationContext(), makeLogicalSessionIdForTest(), 0), + AssertionException, + ErrorCodes::TypeMismatch); +} + +TEST_F(TransactionCoordinatorServiceStepUpStepDownTest, StepDownBeforeStepUpTaskCompleted) { + // Call step-up with 1ms delay (meaning it will not actually execute until time is manually + // advanced on the underlying executor) + service()->onStepUp(operationContext(), Milliseconds(1)); + + // Should cancel all outstanding tasks (including the recovery task started by onStepUp above, + // which has not yet run) + service()->onStepDown(); + + // Do another onStepUp to ensure it runs successfully + service()->onStepUp(operationContext()); + + // Step-down the service so that the destructor does not complain + service()->onStepDown(); +} + + +class TransactionCoordinatorServiceTest : public TransactionCoordinatorServiceTestFixture { +protected: + void setUp() override { + TransactionCoordinatorServiceTestFixture::setUp(); + + service()->onStepUp(operationContext()); + } + + void tearDown() override { + service()->onStepDown(); + + TransactionCoordinatorServiceTestFixture::tearDown(); + } + LogicalSessionId _lsid{makeLogicalSessionIdForTest()}; TxnNumber _txnNumber{1}; }; |