author    | Randolph Tan <randolph@10gen.com> | 2019-06-27 16:02:28 -0400
committer | Randolph Tan <randolph@10gen.com> | 2019-07-16 13:44:09 -0400
commit    | 2dfbaadb85f32869c75f87117d9f6e98b4e948ea (patch)
tree      | 131ffaee362373775aadef9105db99337c5d7fa6 /src/mongo/db
parent    | db2cfda750494cf1c2d0a235df945e335075b8e2 (diff)
download  | mongo-2dfbaadb85f32869c75f87117d9f6e98b4e948ea.tar.gz
SERVER-40785 Create WaitForMajorityService to allow waiting for write concern asynchronously
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/catalog/drop_database_test.cpp | 84
-rw-r--r-- | src/mongo/db/db.cpp | 5
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_mock.cpp | 2
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_mock.h | 8
-rw-r--r-- | src/mongo/db/s/SConscript | 2
-rw-r--r-- | src/mongo/db/s/transaction_coordinator.cpp | 77
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_service.cpp | 6
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_service.h | 12
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_service_test.cpp | 18
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_test.cpp | 49
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_test_fixture.cpp | 8
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_test_fixture.h | 1
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_util.cpp | 105
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_util.h | 23
-rw-r--r-- | src/mongo/db/s/wait_for_majority_service.cpp | 188
-rw-r--r-- | src/mongo/db/s/wait_for_majority_service.h | 100
-rw-r--r-- | src/mongo/db/s/wait_for_majority_service_test.cpp | 247
17 files changed, 759 insertions, 176 deletions
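
The centerpiece of the change is the new WaitForMajorityService (src/mongo/db/s/wait_for_majority_service.{h,cpp}): a ServiceContext decoration that runs a single background thread and hands out futures that resolve once a requested opTime is majority committed, so callers no longer block an operation thread in waitForWriteConcern(). Below is a minimal usage sketch based on the header added by this commit; the wrapper functions (onStartup, waitForMajorityAsync, onShutdown) are illustrative names, not part of the patch.

```cpp
// Sketch of the consumer-side pattern introduced by this commit, based on the API in
// src/mongo/db/s/wait_for_majority_service.h. Wrapper function names are placeholders.
#include "mongo/db/repl/optime.h"
#include "mongo/db/s/wait_for_majority_service.h"
#include "mongo/db/service_context.h"
#include "mongo/util/future.h"

namespace mongo {

void onStartup(ServiceContext* serviceContext) {
    // Start the single background thread that services majority-wait requests.
    WaitForMajorityService::get(serviceContext).setUp(serviceContext);
}

SharedSemiFuture<void> waitForMajorityAsync(ServiceContext* serviceContext,
                                            const repl::OpTime& opTime) {
    // Returns immediately with a future; nothing blocks in waitForWriteConcern().
    // The future resolves once 'opTime' is majority committed, or fails with an
    // error such as InterruptedAtShutdown if the service is shut down first.
    return WaitForMajorityService::get(serviceContext).waitUntilMajority(opTime);
}

void onShutdown(ServiceContext* serviceContext) {
    // Joins the background thread and fails any still-queued waits.
    WaitForMajorityService::get(serviceContext).shutDown();
}

}  // namespace mongo
```

In the patch itself, mongod wires setUp() into _initAndListen() and shutDown() into shutdownTask() in db.cpp, as the diff below shows.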
diff --git a/src/mongo/db/catalog/drop_database_test.cpp b/src/mongo/db/catalog/drop_database_test.cpp index 4fe1d2bc098..2eb37e80a36 100644 --- a/src/mongo/db/catalog/drop_database_test.cpp +++ b/src/mongo/db/catalog/drop_database_test.cpp @@ -268,7 +268,7 @@ TEST_F(DropDatabaseTest, DropDatabaseWaitsForDropPendingCollectionOpTimeIfNoColl // Update ReplicationCoordinatorMock so that we record the optime passed to awaitReplication(). _replCoord->setAwaitReplicationReturnValueFunction( - [&clientLastOpTime, this](const repl::OpTime& opTime) { + [&clientLastOpTime, this](OperationContext*, const repl::OpTime& opTime) { clientLastOpTime = opTime; ASSERT_GREATER_THAN(clientLastOpTime, repl::OpTime()); return repl::ReplicationCoordinator::StatusAndDuration(Status::OK(), Milliseconds(0)); @@ -283,11 +283,12 @@ TEST_F(DropDatabaseTest, DropDatabaseWaitsForDropPendingCollectionOpTimeIfNoColl TEST_F(DropDatabaseTest, DropDatabasePassedThroughAwaitReplicationErrorForDropPendingCollection) { // Update ReplicationCoordinatorMock so that we record the optime passed to awaitReplication(). - _replCoord->setAwaitReplicationReturnValueFunction([this](const repl::OpTime& opTime) { - ASSERT_GREATER_THAN(opTime, repl::OpTime()); - return repl::ReplicationCoordinator::StatusAndDuration( - Status(ErrorCodes::WriteConcernFailed, ""), Milliseconds(0)); - }); + _replCoord->setAwaitReplicationReturnValueFunction( + [this](OperationContext*, const repl::OpTime& opTime) { + ASSERT_GREATER_THAN(opTime, repl::OpTime()); + return repl::ReplicationCoordinator::StatusAndDuration( + Status(ErrorCodes::WriteConcernFailed, ""), Milliseconds(0)); + }); repl::OpTime dropOpTime(Timestamp(Seconds(100), 0), 1LL); auto dpns = _nss.makeDropPendingNamespace(dropOpTime); @@ -342,7 +343,7 @@ void _testDropDatabaseResetsDropPendingStateIfAwaitReplicationFails(OperationCon TEST_F(DropDatabaseTest, DropDatabaseResetsDropPendingStateIfAwaitReplicationFailsAndDatabaseIsPresent) { // Update ReplicationCoordinatorMock so that awaitReplication() fails. - _replCoord->setAwaitReplicationReturnValueFunction([](const repl::OpTime&) { + _replCoord->setAwaitReplicationReturnValueFunction([](OperationContext*, const repl::OpTime&) { return repl::ReplicationCoordinator::StatusAndDuration( Status(ErrorCodes::WriteConcernFailed, ""), Milliseconds(0)); }); @@ -353,11 +354,12 @@ TEST_F(DropDatabaseTest, TEST_F(DropDatabaseTest, DropDatabaseResetsDropPendingStateIfAwaitReplicationFailsAndDatabaseIsMissing) { // Update ReplicationCoordinatorMock so that awaitReplication() fails. - _replCoord->setAwaitReplicationReturnValueFunction([this](const repl::OpTime&) { - _removeDatabaseFromCatalog(_opCtx.get(), _nss.db()); - return repl::ReplicationCoordinator::StatusAndDuration( - Status(ErrorCodes::WriteConcernFailed, ""), Milliseconds(0)); - }); + _replCoord->setAwaitReplicationReturnValueFunction( + [this](OperationContext*, const repl::OpTime&) { + _removeDatabaseFromCatalog(_opCtx.get(), _nss.db()); + return repl::ReplicationCoordinator::StatusAndDuration( + Status(ErrorCodes::WriteConcernFailed, ""), Milliseconds(0)); + }); _testDropDatabaseResetsDropPendingStateIfAwaitReplicationFails(_opCtx.get(), _nss, false); } @@ -368,15 +370,16 @@ TEST_F(DropDatabaseTest, // dropDatabase() should detect this and release the global lock temporarily if it needs to call // ReplicationCoordinator::awaitReplication(). 
bool isAwaitReplicationCalled = false; - _replCoord->setAwaitReplicationReturnValueFunction([&, this](const repl::OpTime& opTime) { - isAwaitReplicationCalled = true; - // This test does not set the client's last optime. - ASSERT_EQUALS(opTime, repl::OpTime()); - ASSERT_FALSE(_opCtx->lockState()->isW()); - ASSERT_FALSE(_opCtx->lockState()->isDbLockedForMode(_nss.db(), MODE_X)); - ASSERT_FALSE(_opCtx->lockState()->isLocked()); - return repl::ReplicationCoordinator::StatusAndDuration(Status::OK(), Milliseconds(0)); - }); + _replCoord->setAwaitReplicationReturnValueFunction( + [&, this](OperationContext*, const repl::OpTime& opTime) { + isAwaitReplicationCalled = true; + // This test does not set the client's last optime. + ASSERT_EQUALS(opTime, repl::OpTime()); + ASSERT_FALSE(_opCtx->lockState()->isW()); + ASSERT_FALSE(_opCtx->lockState()->isDbLockedForMode(_nss.db(), MODE_X)); + ASSERT_FALSE(_opCtx->lockState()->isLocked()); + return repl::ReplicationCoordinator::StatusAndDuration(Status::OK(), Milliseconds(0)); + }); { Lock::GlobalWrite lk(_opCtx.get()); @@ -392,14 +395,15 @@ TEST_F(DropDatabaseTest, // dropDatabase() should detect this and release the global lock temporarily if it needs to call // ReplicationCoordinator::awaitReplication(). bool isAwaitReplicationCalled = false; - _replCoord->setAwaitReplicationReturnValueFunction([&, this](const repl::OpTime& opTime) { - isAwaitReplicationCalled = true; - ASSERT_GREATER_THAN(opTime, repl::OpTime()); - ASSERT_FALSE(_opCtx->lockState()->isW()); - ASSERT_FALSE(_opCtx->lockState()->isDbLockedForMode(_nss.db(), MODE_X)); - ASSERT_FALSE(_opCtx->lockState()->isLocked()); - return repl::ReplicationCoordinator::StatusAndDuration(Status::OK(), Milliseconds(0)); - }); + _replCoord->setAwaitReplicationReturnValueFunction( + [&, this](OperationContext*, const repl::OpTime& opTime) { + isAwaitReplicationCalled = true; + ASSERT_GREATER_THAN(opTime, repl::OpTime()); + ASSERT_FALSE(_opCtx->lockState()->isW()); + ASSERT_FALSE(_opCtx->lockState()->isDbLockedForMode(_nss.db(), MODE_X)); + ASSERT_FALSE(_opCtx->lockState()->isLocked()); + return repl::ReplicationCoordinator::StatusAndDuration(Status::OK(), Milliseconds(0)); + }); repl::OpTime dropOpTime(Timestamp(Seconds(100), 0), 1LL); auto dpns = _nss.makeDropPendingNamespace(dropOpTime); @@ -416,10 +420,11 @@ TEST_F(DropDatabaseTest, TEST_F(DropDatabaseTest, DropDatabaseReturnsNamespaceNotFoundIfDatabaseIsRemovedAfterCollectionsDropsAreReplicated) { // Update ReplicationCoordinatorMock so that awaitReplication() fails. - _replCoord->setAwaitReplicationReturnValueFunction([this](const repl::OpTime&) { - _removeDatabaseFromCatalog(_opCtx.get(), _nss.db()); - return repl::ReplicationCoordinator::StatusAndDuration(Status::OK(), Milliseconds(0)); - }); + _replCoord->setAwaitReplicationReturnValueFunction( + [this](OperationContext*, const repl::OpTime&) { + _removeDatabaseFromCatalog(_opCtx.get(), _nss.db()); + return repl::ReplicationCoordinator::StatusAndDuration(Status::OK(), Milliseconds(0)); + }); _createCollection(_opCtx.get(), _nss); @@ -438,12 +443,13 @@ TEST_F(DropDatabaseTest, TEST_F(DropDatabaseTest, DropDatabaseReturnsNotMasterIfNotPrimaryAfterCollectionsDropsAreReplicated) { // Transition from PRIMARY to SECONDARY while awaiting replication of collection drops. 
- _replCoord->setAwaitReplicationReturnValueFunction([this](const repl::OpTime&) { - ASSERT_OK(_replCoord->setFollowerMode(repl::MemberState::RS_SECONDARY)); - ASSERT_TRUE(_opCtx->writesAreReplicated()); - ASSERT_FALSE(_replCoord->canAcceptWritesForDatabase(_opCtx.get(), _nss.db())); - return repl::ReplicationCoordinator::StatusAndDuration(Status::OK(), Milliseconds(0)); - }); + _replCoord->setAwaitReplicationReturnValueFunction( + [this](OperationContext*, const repl::OpTime&) { + ASSERT_OK(_replCoord->setFollowerMode(repl::MemberState::RS_SECONDARY)); + ASSERT_TRUE(_opCtx->writesAreReplicated()); + ASSERT_FALSE(_replCoord->canAcceptWritesForDatabase(_opCtx.get(), _nss.db())); + return repl::ReplicationCoordinator::StatusAndDuration(Status::OK(), Milliseconds(0)); + }); _createCollection(_opCtx.get(), _nss); diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index 652355ee908..37082607dba 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -123,6 +123,7 @@ #include "mongo/db/s/shard_server_op_observer.h" #include "mongo/db/s/sharding_initialization_mongod.h" #include "mongo/db/s/sharding_state_recovery.h" +#include "mongo/db/s/wait_for_majority_service.h" #include "mongo/db/server_options.h" #include "mongo/db/service_context.h" #include "mongo/db/service_entry_point_mongod.h" @@ -505,6 +506,8 @@ ExitCode _initAndListen(int listenPort) { << startupWarningsLog; } + WaitForMajorityService::get(serviceContext).setUp(serviceContext); + // This function may take the global lock. auto shardingInitialized = ShardingInitializationMongoD::get(startupOpCtx.get()) ->initializeShardingAwarenessIfNeeded(startupOpCtx.get()); @@ -904,6 +907,8 @@ void shutdownTask(const ShutdownTaskArgs& shutdownArgs) { } } + WaitForMajorityService::get(serviceContext).shutDown(); + // Terminate the balancer thread so it doesn't leak memory. if (auto balancer = Balancer::get(serviceContext)) { balancer->interruptBalancer(); diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index 9194089e26e..32589b56952 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -138,7 +138,7 @@ void ReplicationCoordinatorMock::clearSyncSourceBlacklist() {} ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorMock::awaitReplication( OperationContext* opCtx, const OpTime& opTime, const WriteConcernOptions& writeConcern) { - return _awaitReplicationReturnValueFunction(opTime); + return _awaitReplicationReturnValueFunction(opCtx, opTime); } void ReplicationCoordinatorMock::setAwaitReplicationReturnValueFunction( diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index 0a9e49f3e5e..62126adf068 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -287,9 +287,10 @@ public: /** * Sets the function to generate the return value for calls to awaitReplication(). - * 'opTime' is the optime passed to awaitReplication(). + * 'OperationContext' and 'opTime' are the parameters passed to awaitReplication(). 
*/ - using AwaitReplicationReturnValueFunction = std::function<StatusAndDuration(const OpTime&)>; + using AwaitReplicationReturnValueFunction = + std::function<StatusAndDuration(OperationContext*, const OpTime&)>; void setAwaitReplicationReturnValueFunction( AwaitReplicationReturnValueFunction returnValueFunction); @@ -325,7 +326,8 @@ private: OpTime _myLastAppliedOpTime; Date_t _myLastAppliedWallTime; ReplSetConfig _getConfigReturnValue; - AwaitReplicationReturnValueFunction _awaitReplicationReturnValueFunction = [](const OpTime&) { + AwaitReplicationReturnValueFunction _awaitReplicationReturnValueFunction = [](OperationContext*, + const OpTime&) { return StatusAndDuration(Status::OK(), Milliseconds(0)); }; bool _alwaysAllowWrites = false; diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 41588ca7eb8..528b880d93c 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -132,6 +132,7 @@ env.Library( 'transaction_coordinator_structures.cpp', 'transaction_coordinator_util.cpp', 'transaction_coordinator.cpp', + 'wait_for_majority_service.cpp', env.Idlc('transaction_coordinator_document.idl')[0], env.Idlc('transaction_coordinators_stats.idl')[0], ], @@ -327,6 +328,7 @@ env.CppUnitTest( 'sharding_logging_test.cpp', 'start_chunk_clone_request_test.cpp', 'type_shard_identity_test.cpp', + 'wait_for_majority_service_test.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/db/auth/authmocks', diff --git a/src/mongo/db/s/transaction_coordinator.cpp b/src/mongo/db/s/transaction_coordinator.cpp index e9c59b0b17b..3085db606e4 100644 --- a/src/mongo/db/s/transaction_coordinator.cpp +++ b/src/mongo/db/s/transaction_coordinator.cpp @@ -35,7 +35,10 @@ #include "mongo/db/logical_clock.h" #include "mongo/db/s/transaction_coordinator_metrics_observer.h" +#include "mongo/db/s/wait_for_majority_service.h" #include "mongo/db/server_options.h" +#include "mongo/s/grid.h" +#include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" namespace mongo { @@ -46,6 +49,26 @@ using CoordinatorCommitDecision = txn::CoordinatorCommitDecision; using PrepareVoteConsensus = txn::PrepareVoteConsensus; using TransactionCoordinatorDocument = txn::TransactionCoordinatorDocument; +MONGO_FAIL_POINT_DEFINE(hangBeforeWaitingForParticipantListWriteConcern); +MONGO_FAIL_POINT_DEFINE(hangBeforeWaitingForDecisionWriteConcern); + +void hangIfFailPointEnabled(ServiceContext* service, + FailPoint& failpoint, + const StringData& failPointName) { + MONGO_FAIL_POINT_BLOCK(failpoint, fp) { + LOG(0) << "Hit " << failPointName << " failpoint"; + const BSONObj& data = fp.getData(); + if (!data["useUninterruptibleSleep"].eoo()) { + MONGO_FAIL_POINT_PAUSE_WHILE_SET(failpoint); + } else { + ThreadClient tc(failPointName, service); + auto opCtx = tc->makeOperationContext(); + + MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED(opCtx.get(), failpoint); + } + } +} + } // namespace TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext, @@ -112,17 +135,25 @@ TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext, _serviceContext->getPreciseClockSource()->now()); if (_participantsDurable) - return Future<void>::makeReady(); + return Future<repl::OpTime>::makeReady(repl::OpTime()); } return txn::persistParticipantsList( - *_sendPrepareScheduler, _lsid, _txnNumber, *_participants) - .then([this] { - stdx::lock_guard<stdx::mutex> lg(_mutex); - _participantsDurable = true; - }); + *_sendPrepareScheduler, _lsid, _txnNumber, *_participants); }) + .then([this](repl::OpTime opTime) { + 
hangIfFailPointEnabled(_serviceContext, + hangBeforeWaitingForParticipantListWriteConcern, + "hangBeforeWaitingForParticipantListWriteConcern"); + return WaitForMajorityService::get(_serviceContext).waitUntilMajority(opTime); + }) + .thenRunOn(Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor()) .then([this] { + { + stdx::lock_guard<stdx::mutex> lg(_mutex); + _participantsDurable = true; + } + // Send prepare to the participants, unless this has already been done (which would only // be the case if this coordinator was created as part of step-up recovery and the // recovery document contained a decision). @@ -184,16 +215,24 @@ TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext, _serviceContext->getPreciseClockSource()->now()); if (_decisionDurable) - return Future<void>::makeReady(); + return Future<repl::OpTime>::makeReady(repl::OpTime()); } - return txn::persistDecision(*_scheduler, _lsid, _txnNumber, *_participants, *_decision) - .then([this] { - stdx::lock_guard<stdx::mutex> lg(_mutex); - _decisionDurable = true; - }); + return txn::persistDecision(*_scheduler, _lsid, _txnNumber, *_participants, *_decision); + }) + .then([this](repl::OpTime opTime) { + hangIfFailPointEnabled(_serviceContext, + hangBeforeWaitingForDecisionWriteConcern, + "hangBeforeWaitingForDecisionWriteConcern"); + return WaitForMajorityService::get(_serviceContext).waitUntilMajority(opTime); }) + .thenRunOn(Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor()) .then([this] { + { + stdx::lock_guard<stdx::mutex> lg(_mutex); + _decisionDurable = true; + } + // Send the commit/abort decision to the participants. // Input: _decisionDurable // Output: (none) @@ -249,13 +288,13 @@ TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext, _scheduler->shutdown( {ErrorCodes::TransactionCoordinatorDeadlineTaskCanceled, "Coordinator completed"}); - return std::move(deadlineFuture).onCompletion([s = std::move(s)](Status) { return s; }); - }) - .getAsync([this](Status s) { - // Notify all the listeners which are interested in the coordinator's lifecycle. After - // this call, the coordinator object could potentially get destroyed by its lifetime - // controller, so there shouldn't be any accesses to `this` after this call. - _done(s); + return std::move(deadlineFuture).onCompletion([ this, s = std::move(s) ](Status) { + // Notify all the listeners which are interested in the coordinator's lifecycle. + // After this call, the coordinator object could potentially get destroyed by its + // lifetime controller, so there shouldn't be any accesses to `this` after this + // call. 
+ _done(s); + }); }); } diff --git a/src/mongo/db/s/transaction_coordinator_service.cpp b/src/mongo/db/s/transaction_coordinator_service.cpp index 2e93ca54435..dac4caee608 100644 --- a/src/mongo/db/s/transaction_coordinator_service.cpp +++ b/src/mongo/db/s/transaction_coordinator_service.cpp @@ -51,7 +51,7 @@ const auto transactionCoordinatorServiceDecoration = TransactionCoordinatorService::TransactionCoordinatorService() = default; TransactionCoordinatorService::~TransactionCoordinatorService() { - _joinPreviousRound(); + joinPreviousRound(); } TransactionCoordinatorService* TransactionCoordinatorService::get(OperationContext* opCtx) { @@ -137,7 +137,7 @@ boost::optional<Future<txn::CommitDecision>> TransactionCoordinatorService::reco void TransactionCoordinatorService::onStepUp(OperationContext* opCtx, Milliseconds recoveryDelayForTesting) { - _joinPreviousRound(); + joinPreviousRound(); stdx::lock_guard<stdx::mutex> lg(_mutex); invariant(!_catalogAndScheduler); @@ -235,7 +235,7 @@ TransactionCoordinatorService::_getCatalogAndScheduler(OperationContext* opCtx) return _catalogAndScheduler; } -void TransactionCoordinatorService::_joinPreviousRound() { +void TransactionCoordinatorService::joinPreviousRound() { // onStepDown must have been called invariant(!_catalogAndScheduler); diff --git a/src/mongo/db/s/transaction_coordinator_service.h b/src/mongo/db/s/transaction_coordinator_service.h index c1dcb9550e7..11561a7e648 100644 --- a/src/mongo/db/s/transaction_coordinator_service.h +++ b/src/mongo/db/s/transaction_coordinator_service.h @@ -109,6 +109,12 @@ public: LogicalSessionId lsid, TxnNumber txnNumber); + /** + * Blocking call which waits for the previous stepUp/stepDown round to join and ensures all + * tasks scheduled by that round have completed. + */ + void joinPreviousRound(); + private: struct CatalogAndScheduler { CatalogAndScheduler(ServiceContext* service) : scheduler(service) {} @@ -128,12 +134,6 @@ private: */ std::shared_ptr<CatalogAndScheduler> _getCatalogAndScheduler(OperationContext* opCtx); - /** - * Blocking call which waits for the previous stepUp/stepDown round to join and ensures all - * tasks scheduled by that round have completed. - */ - void _joinPreviousRound(); - // Contains the catalog + scheduler, which was active at the last step-down attempt (if any). // Set at onStepDown and destroyed at onStepUp, which are always invoked sequentially by the // replication machinery, so there is no need to explicitly synchronize it diff --git a/src/mongo/db/s/transaction_coordinator_service_test.cpp b/src/mongo/db/s/transaction_coordinator_service_test.cpp index 128a574c79c..6ce22953437 100644 --- a/src/mongo/db/s/transaction_coordinator_service_test.cpp +++ b/src/mongo/db/s/transaction_coordinator_service_test.cpp @@ -192,7 +192,6 @@ protected: } }; - using TransactionCoordinatorServiceStepUpStepDownTest = TransactionCoordinatorServiceTestFixture; TEST_F(TransactionCoordinatorServiceStepUpStepDownTest, OperationsFailBeforeStepUpStarts) { @@ -273,7 +272,6 @@ TEST_F(TransactionCoordinatorServiceStepUpStepDownTest, StepDownBeforeStepUpTask service()->onStepDown(); } - class TransactionCoordinatorServiceTest : public TransactionCoordinatorServiceTestFixture { protected: void setUp() override { @@ -284,6 +282,7 @@ protected: void tearDown() override { service()->onStepDown(); + service()->joinPreviousRound(); TransactionCoordinatorServiceTestFixture::tearDown(); } @@ -341,8 +340,6 @@ TEST_F(TransactionCoordinatorServiceTest, // commit acks. 
coordinatorService->createCoordinator( operationContext(), _lsid, _txnNumber + 1, kCommitDeadline); - auto newTxnCommitDecisionFuture = coordinatorService->coordinateCommit( - operationContext(), _lsid, _txnNumber + 1, kTwoShardIdSet); // Finish committing the old transaction by sending it commit acks from both participants. assertCommitSentAndRespondWithSuccess(); @@ -351,6 +348,10 @@ TEST_F(TransactionCoordinatorServiceTest, // The old transaction should now be committed. ASSERT_EQ(static_cast<int>(oldTxnCommitDecisionFuture.get()), static_cast<int>(txn::CommitDecision::kCommit)); + + auto newTxnCommitDecisionFuture = coordinatorService->coordinateCommit( + operationContext(), _lsid, _txnNumber + 1, kTwoShardIdSet); + commitTransaction(*coordinatorService, _lsid, _txnNumber + 1, kTwoShardIdSet); } @@ -703,19 +704,12 @@ TEST_F(TransactionCoordinatorServiceTestSingleTxn, } TEST_F(TransactionCoordinatorServiceTestSingleTxn, - CoordinateCommitWithNoVotesReturnsNotReadyFuture) { + CoordinateCommitReturnsCorrectCommitDecisionOnCommit) { auto commitDecisionFuture = *coordinatorService()->coordinateCommit( operationContext(), _lsid, _txnNumber, kTwoShardIdSet); ASSERT_FALSE(commitDecisionFuture.isReady()); -} - -TEST_F(TransactionCoordinatorServiceTestSingleTxn, - CoordinateCommitReturnsCorrectCommitDecisionOnCommit) { - - auto commitDecisionFuture = *coordinatorService()->coordinateCommit( - operationContext(), _lsid, _txnNumber, kTwoShardIdSet); assertPrepareSentAndRespondWithSuccess(); assertPrepareSentAndRespondWithSuccess(); diff --git a/src/mongo/db/s/transaction_coordinator_test.cpp b/src/mongo/db/s/transaction_coordinator_test.cpp index ddf0bb96fa6..32134a82a71 100644 --- a/src/mongo/db/s/transaction_coordinator_test.cpp +++ b/src/mongo/db/s/transaction_coordinator_test.cpp @@ -66,6 +66,31 @@ StatusWith<BSONObj> makePrepareOkResponse(const Timestamp& timestamp) { const StatusWith<BSONObj> kPrepareOk = makePrepareOkResponse(kDummyPrepareTimestamp); const StatusWith<BSONObj> kPrepareOkNoTimestamp = BSON("ok" << 1); +/** + * Searches for a client matching the name and mark the operation context as killed. + */ +void killClientOpCtx(ServiceContext* service, const std::string& clientName) { + for (int retries = 0; retries < 20; retries++) { + for (ServiceContext::LockedClientsCursor cursor(service); auto client = cursor.next();) { + invariant(client); + + stdx::lock_guard lk(*client); + if (client->desc() == clientName) { + if (auto opCtx = client->getOperationContext()) { + opCtx->getServiceContext()->killOperation( + lk, opCtx, ErrorCodes::InterruptedAtShutdown); + return; + } + } + } + + sleepmillis(50); + } + + error() << "Timed out trying to find and kill client opCtx with name: " << clientName; + ASSERT_FALSE(true); +} + class TransactionCoordinatorTestBase : public TransactionCoordinatorTestFixture { protected: void assertPrepareSentAndRespondWithSuccess() { @@ -566,8 +591,7 @@ TEST_F(TransactionCoordinatorDriverPersistenceTest, // We should retry until shutdown. The original participants should be persisted. 
std::vector<ShardId> smallerParticipantList{ShardId("shard0001"), ShardId("shard0002")}; - Future<void> future = - txn::persistParticipantsList(*_aws, _lsid, _txnNumber, smallerParticipantList); + auto future = txn::persistParticipantsList(*_aws, _lsid, _txnNumber, smallerParticipantList); _aws->shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "Shutdown for test"}); advanceClockAndExecuteScheduledTasks(); @@ -619,11 +643,12 @@ TEST_F(TransactionCoordinatorDriverPersistenceTest, TEST_F(TransactionCoordinatorDriverPersistenceTest, PersistCommitDecisionWhenNoDocumentForTransactionExistsCanBeInterruptedAndReturnsError) { - Future<void> future = txn::persistDecision(*_aws, _lsid, _txnNumber, _participants, [&] { - txn::CoordinatorCommitDecision decision(txn::CommitDecision::kCommit); - decision.setCommitTimestamp(_commitTimestamp); - return decision; - }()); + Future<repl::OpTime> future = + txn::persistDecision(*_aws, _lsid, _txnNumber, _participants, [&] { + txn::CoordinatorCommitDecision decision(txn::CommitDecision::kCommit); + decision.setCommitTimestamp(_commitTimestamp); + return decision; + }()); _aws->shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "Shutdown for test"}); ASSERT_THROWS_CODE( @@ -1078,6 +1103,10 @@ public: assertCommitSentAndRespondWithSuccess(); stopCapturingLogMessages(); + + // Properly wait for the coordinator to finish all asynchronous tasks. + auto future = coordinator.onCompletion(); + future.getNoThrow().ignore(); } }; @@ -1624,7 +1653,6 @@ TEST_F(TransactionCoordinatorMetricsTest, expectedMetrics.totalCreated++; auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()); - auto awsPtr = aws.get(); TransactionCoordinator coordinator( getServiceContext(), _lsid, _txnNumber, std::move(aws), Date_t::max()); const auto& stats = @@ -1657,7 +1685,7 @@ TEST_F(TransactionCoordinatorMetricsTest, *expectedStats.writingParticipantListDuration + Microseconds(100); expectedMetrics.currentWritingParticipantList--; - awsPtr->shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "dummy"}); + killClientOpCtx(getServiceContext(), "hangBeforeWaitingForParticipantListWriteConcern"); coordinator.onCompletion().get(); checkStats(stats, expectedStats); @@ -1746,7 +1774,6 @@ TEST_F(TransactionCoordinatorMetricsTest, expectedMetrics.totalCreated++; auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()); - auto awsPtr = aws.get(); TransactionCoordinator coordinator( getServiceContext(), _lsid, _txnNumber, std::move(aws), Date_t::max()); const auto& stats = @@ -1784,7 +1811,7 @@ TEST_F(TransactionCoordinatorMetricsTest, *expectedStats.writingDecisionDuration + Microseconds(100); expectedMetrics.currentWritingDecision--; - awsPtr->shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "dummy"}); + killClientOpCtx(getServiceContext(), "hangBeforeWaitingForDecisionWriteConcern"); coordinator.onCompletion().get(); checkStats(stats, expectedStats); diff --git a/src/mongo/db/s/transaction_coordinator_test_fixture.cpp b/src/mongo/db/s/transaction_coordinator_test_fixture.cpp index bced1fcb575..6554461dd97 100644 --- a/src/mongo/db/s/transaction_coordinator_test_fixture.cpp +++ b/src/mongo/db/s/transaction_coordinator_test_fixture.cpp @@ -37,6 +37,7 @@ #include "mongo/db/commands/txn_cmds_gen.h" #include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h" #include "mongo/db/operation_context.h" +#include "mongo/db/s/wait_for_majority_service.h" #include "mongo/s/catalog/sharding_catalog_client_mock.h" #include 
"mongo/s/catalog/type_shard.h" #include "mongo/unittest/unittest.h" @@ -64,6 +65,13 @@ void TransactionCoordinatorTestFixture::setUp() { uassertStatusOK(shardRegistry()->getShard(operationContext(), shardId))->getTargeter()); shardTargeter->setFindHostReturnValue(makeHostAndPort(shardId)); } + + WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext()); +} + +void TransactionCoordinatorTestFixture::tearDown() { + WaitForMajorityService::get(getServiceContext()).shutDown(); + ShardServerTestFixture::tearDown(); } std::unique_ptr<ShardingCatalogClient> TransactionCoordinatorTestFixture::makeShardingCatalogClient( diff --git a/src/mongo/db/s/transaction_coordinator_test_fixture.h b/src/mongo/db/s/transaction_coordinator_test_fixture.h index 981f4bdb046..5f1067a9320 100644 --- a/src/mongo/db/s/transaction_coordinator_test_fixture.h +++ b/src/mongo/db/s/transaction_coordinator_test_fixture.h @@ -45,6 +45,7 @@ namespace mongo { class TransactionCoordinatorTestFixture : public ShardServerTestFixture { protected: void setUp() override; + void tearDown() override; /** * Override the CatalogClient to make CatalogClient::getAllShards automatically return the diff --git a/src/mongo/db/s/transaction_coordinator_util.cpp b/src/mongo/db/s/transaction_coordinator_util.cpp index 5baab08bc83..0d3bde20418 100644 --- a/src/mongo/db/s/transaction_coordinator_util.cpp +++ b/src/mongo/db/s/transaction_coordinator_util.cpp @@ -48,8 +48,6 @@ namespace mongo { namespace txn { namespace { -MONGO_FAIL_POINT_DEFINE(hangBeforeWaitingForParticipantListWriteConcern); -MONGO_FAIL_POINT_DEFINE(hangBeforeWaitingForDecisionWriteConcern); MONGO_FAIL_POINT_DEFINE(hangAfterDeletingCoordinatorDoc); MONGO_FAIL_POINT_DEFINE(hangBeforeWritingParticipantList); @@ -58,10 +56,6 @@ MONGO_FAIL_POINT_DEFINE(hangBeforeDeletingCoordinatorDoc); using ResponseStatus = executor::TaskExecutor::ResponseStatus; -const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority, - WriteConcernOptions::SyncMode::UNSET, - WriteConcernOptions::kNoTimeout); - const Backoff kExponentialBackoff(Seconds(1), Milliseconds::max()); const ReadPreferenceSetting kPrimaryReadPreference{ReadPreference::PrimaryOnly}; @@ -92,7 +86,8 @@ std::string buildParticipantListString(const std::vector<ShardId>& participantLi return ss.str(); } -bool shouldRetryPersistingCoordinatorState(const Status& responseStatus) { +template <typename T> +bool shouldRetryPersistingCoordinatorState(const StatusWith<T>& responseStatus) { return !responseStatus.isOK() && responseStatus != ErrorCodes::TransactionCoordinatorSteppingDown; } @@ -100,10 +95,10 @@ bool shouldRetryPersistingCoordinatorState(const Status& responseStatus) { } // namespace namespace { -void persistParticipantListBlocking(OperationContext* opCtx, - const LogicalSessionId& lsid, - TxnNumber txnNumber, - const std::vector<ShardId>& participantList) { +repl::OpTime persistParticipantListBlocking(OperationContext* opCtx, + const LogicalSessionId& lsid, + TxnNumber txnNumber, + const std::vector<ShardId>& participantList) { LOG(3) << "Going to write participant list for " << lsid.getId() << ':' << txnNumber; if (MONGO_FAIL_POINT(hangBeforeWritingParticipantList)) { @@ -173,37 +168,21 @@ void persistParticipantListBlocking(OperationContext* opCtx, LOG(3) << "Wrote participant list for " << lsid.getId() << ':' << txnNumber; - MONGO_FAIL_POINT_BLOCK(hangBeforeWaitingForParticipantListWriteConcern, fp) { - LOG(0) << "Hit hangBeforeWaitingForParticipantListWriteConcern failpoint"; - const 
BSONObj& data = fp.getData(); - if (!data["useUninterruptibleSleep"].eoo()) { - MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangBeforeWaitingForParticipantListWriteConcern); - } else { - MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED( - opCtx, hangBeforeWaitingForParticipantListWriteConcern); - } - } - - WriteConcernResult unusedWCResult; - uassertStatusOK( - waitForWriteConcern(opCtx, - repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(), - kMajorityWriteConcern, - &unusedWCResult)); + return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); } } // namespace -Future<void> persistParticipantsList(txn::AsyncWorkScheduler& scheduler, - const LogicalSessionId& lsid, - TxnNumber txnNumber, - const txn::ParticipantsList& participants) { +Future<repl::OpTime> persistParticipantsList(txn::AsyncWorkScheduler& scheduler, + const LogicalSessionId& lsid, + TxnNumber txnNumber, + const txn::ParticipantsList& participants) { return txn::doWhile( scheduler, boost::none /* no need for a backoff */, - [](const Status& s) { return shouldRetryPersistingCoordinatorState(s); }, + [](const StatusWith<repl::OpTime>& s) { return shouldRetryPersistingCoordinatorState(s); }, [&scheduler, lsid, txnNumber, participants] { return scheduler.scheduleWork([lsid, txnNumber, participants](OperationContext* opCtx) { - persistParticipantListBlocking(opCtx, lsid, txnNumber, participants); + return persistParticipantListBlocking(opCtx, lsid, txnNumber, participants); }); }); } @@ -286,11 +265,11 @@ Future<PrepareVoteConsensus> sendPrepare(ServiceContext* service, } namespace { -void persistDecisionBlocking(OperationContext* opCtx, - const LogicalSessionId& lsid, - TxnNumber txnNumber, - const std::vector<ShardId>& participantList, - const txn::CoordinatorCommitDecision& decision) { +repl::OpTime persistDecisionBlocking(OperationContext* opCtx, + const LogicalSessionId& lsid, + TxnNumber txnNumber, + const std::vector<ShardId>& participantList, + const txn::CoordinatorCommitDecision& decision) { const bool isCommit = decision.getDecision() == txn::CommitDecision::kCommit; LOG(3) << "Going to write decision " << (isCommit ? "commit" : "abort") << " for " << lsid.getId() << ':' << txnNumber; @@ -368,41 +347,25 @@ void persistDecisionBlocking(OperationContext* opCtx, LOG(3) << "Wrote decision " << (isCommit ? 
"commit" : "abort") << " for " << lsid.getId() << ':' << txnNumber; - MONGO_FAIL_POINT_BLOCK(hangBeforeWaitingForDecisionWriteConcern, fp) { - LOG(0) << "Hit hangBeforeWaitingForDecisionWriteConcern failpoint"; - const BSONObj& data = fp.getData(); - if (!data["useUninterruptibleSleep"].eoo()) { - MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangBeforeWaitingForDecisionWriteConcern); - } else { - MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED( - opCtx, hangBeforeWaitingForDecisionWriteConcern); - } - } - - WriteConcernResult unusedWCResult; - uassertStatusOK( - waitForWriteConcern(opCtx, - repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(), - kMajorityWriteConcern, - &unusedWCResult)); + return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); } } // namespace -Future<void> persistDecision(txn::AsyncWorkScheduler& scheduler, - const LogicalSessionId& lsid, - TxnNumber txnNumber, - const txn::ParticipantsList& participants, - const txn::CoordinatorCommitDecision& decision) { - return txn::doWhile(scheduler, - boost::none /* no need for a backoff */, - [](const Status& s) { return shouldRetryPersistingCoordinatorState(s); }, - [&scheduler, lsid, txnNumber, participants, decision] { - return scheduler.scheduleWork( - [lsid, txnNumber, participants, decision](OperationContext* opCtx) { - persistDecisionBlocking( - opCtx, lsid, txnNumber, participants, decision); - }); - }); +Future<repl::OpTime> persistDecision(txn::AsyncWorkScheduler& scheduler, + const LogicalSessionId& lsid, + TxnNumber txnNumber, + const txn::ParticipantsList& participants, + const txn::CoordinatorCommitDecision& decision) { + return txn::doWhile( + scheduler, + boost::none /* no need for a backoff */, + [](const StatusWith<repl::OpTime>& s) { return shouldRetryPersistingCoordinatorState(s); }, + [&scheduler, lsid, txnNumber, participants, decision] { + return scheduler.scheduleWork( + [lsid, txnNumber, participants, decision](OperationContext* opCtx) { + return persistDecisionBlocking(opCtx, lsid, txnNumber, participants, decision); + }); + }); } Future<void> sendCommit(ServiceContext* service, diff --git a/src/mongo/db/s/transaction_coordinator_util.h b/src/mongo/db/s/transaction_coordinator_util.h index d6e87bb4baf..b83afda06e1 100644 --- a/src/mongo/db/s/transaction_coordinator_util.h +++ b/src/mongo/db/s/transaction_coordinator_util.h @@ -31,6 +31,7 @@ #include <vector> +#include "mongo/db/repl/optime.h" #include "mongo/db/s/transaction_coordinator_document_gen.h" #include "mongo/db/s/transaction_coordinator_futures_util.h" @@ -45,17 +46,17 @@ namespace txn { * participants: ["shard0000", "shard0001"] * } * - * into config.transaction_coordinators and waits for the upsert to be majority-committed. + * into config.transaction_coordinators and returns the opTime of the upsert. * * Throws if the upsert fails or waiting for writeConcern fails. * * If the upsert returns a DuplicateKey error, converts it to an anonymous error, because it means a * document for the (lsid, txnNumber) exists with a different participant list. 
*/ -Future<void> persistParticipantsList(txn::AsyncWorkScheduler& scheduler, - const LogicalSessionId& lsid, - TxnNumber txnNumber, - const txn::ParticipantsList& participants); +Future<repl::OpTime> persistParticipantsList(txn::AsyncWorkScheduler& scheduler, + const LogicalSessionId& lsid, + TxnNumber txnNumber, + const txn::ParticipantsList& participants); struct PrepareResponse; class PrepareVoteConsensus { @@ -120,7 +121,7 @@ Future<PrepareVoteConsensus> sendPrepare(ServiceContext* service, * commitTimestamp: Timestamp(xxxxxxxx, x), * } * - * and waits for the update to be majority-committed. + * Returns the opTime of the write. * * Throws if the update fails or waiting for writeConcern fails. * @@ -128,11 +129,11 @@ Future<PrepareVoteConsensus> sendPrepare(ServiceContext* service, * means either no document for (lsid, txnNumber) exists, or a document exists but has a different * participant list, different decision, or different commit Timestamp. */ -Future<void> persistDecision(txn::AsyncWorkScheduler& scheduler, - const LogicalSessionId& lsid, - TxnNumber txnNumber, - const txn::ParticipantsList& participants, - const txn::CoordinatorCommitDecision& decision); +Future<repl::OpTime> persistDecision(txn::AsyncWorkScheduler& scheduler, + const LogicalSessionId& lsid, + TxnNumber txnNumber, + const txn::ParticipantsList& participants, + const txn::CoordinatorCommitDecision& decision); /** * Sends commit to all shards and returns a future that will be resolved when all participants have diff --git a/src/mongo/db/s/wait_for_majority_service.cpp b/src/mongo/db/s/wait_for_majority_service.cpp new file mode 100644 index 00000000000..0625a84b611 --- /dev/null +++ b/src/mongo/db/s/wait_for_majority_service.cpp @@ -0,0 +1,188 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/s/wait_for_majority_service.h" + +#include <utility> + +#include "mongo/db/service_context.h" +#include "mongo/db/write_concern.h" +#include "mongo/executor/network_interface_factory.h" +#include "mongo/executor/thread_pool_task_executor.h" +#include "mongo/util/concurrency/thread_pool.h" + +namespace mongo { + +namespace { +const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority, + WriteConcernOptions::SyncMode::UNSET, + WriteConcernOptions::kNoTimeout); + +const auto waitForMajorityServiceDecoration = + ServiceContext::declareDecoration<WaitForMajorityService>(); +} // namespace + +WaitForMajorityService::~WaitForMajorityService() { + shutDown(); +} + +WaitForMajorityService& WaitForMajorityService::get(ServiceContext* service) { + return waitForMajorityServiceDecoration(service); +} + +void WaitForMajorityService::setUp(ServiceContext* service) { + stdx::lock_guard lk(_mutex); + + if (!_thread.joinable() && !_inShutDown) { + _thread = stdx::thread([this, service] { _periodicallyWaitForMajority(service); }); + } +} + +void WaitForMajorityService::shutDown() { + { + stdx::lock_guard lk(_mutex); + + if (std::exchange(_inShutDown, true)) { + return; + } + + if (_opCtx) { + stdx::lock_guard scopedClientLock(*_opCtx->getClient()); + _opCtx->getServiceContext()->killOperation( + scopedClientLock, _opCtx, ErrorCodes::InterruptedAtShutdown); + } + } + + if (_thread.joinable()) { + _thread.join(); + } + + stdx::lock_guard lk(_mutex); + for (auto&& pendingRequest : _queuedOpTimes) { + pendingRequest.second.setError( + {ErrorCodes::InterruptedAtShutdown, "Shutting down wait for majority service"}); + } + + _queuedOpTimes.clear(); +} + +SharedSemiFuture<void> WaitForMajorityService::waitUntilMajority(const repl::OpTime& opTime) { + stdx::lock_guard lk(_mutex); + + if (_inShutDown) { + return {Future<void>::makeReady( + Status{ErrorCodes::ShutdownInProgress, + "rejecting wait for majority request due to server shutdown"})}; + } + + // Background thread must be running before requesting. + invariant(_thread.joinable()); + + if (_lastOpTimeWaited >= opTime) { + return {Future<void>::makeReady()}; + } + + auto iter = _queuedOpTimes.lower_bound(opTime); + if (iter != _queuedOpTimes.end()) { + if (iter->first == opTime) { + return iter->second.getFuture(); + } + } + + if (iter == _queuedOpTimes.begin()) { + // Background thread could already be actively waiting on a later time, so tell it to stop + // and wait for the newly requested opTime instead. 
+ if (_opCtx) { + stdx::lock_guard scopedClientLock(*_opCtx->getClient()); + _opCtx->getServiceContext()->killOperation( + scopedClientLock, _opCtx, ErrorCodes::WaitForMajorityServiceEarlierOpTimeAvailable); + } + } + + const bool wasEmpty = _queuedOpTimes.empty(); + auto resultIter = _queuedOpTimes.emplace_hint( + iter, std::piecewise_construct, std::forward_as_tuple(opTime), std::forward_as_tuple()); + + if (wasEmpty) { + _hasNewOpTimeCV.notify_one(); + } + + return resultIter->second.getFuture(); +} + +void WaitForMajorityService::_periodicallyWaitForMajority(ServiceContext* service) { + ThreadClient tc("waitForMajority", service); + + stdx::unique_lock lk(_mutex); + + while (!_inShutDown) { + auto opCtx = tc->makeOperationContext(); + _opCtx = opCtx.get(); + + if (!_queuedOpTimes.empty()) { + auto lowestOpTimeIter = _queuedOpTimes.begin(); + auto lowestOpTime = lowestOpTimeIter->first; + + lk.unlock(); + + WriteConcernResult ignoreResult; + auto status = + waitForWriteConcern(_opCtx, lowestOpTime, kMajorityWriteConcern, &ignoreResult); + + lk.lock(); + + if (status.isOK()) { + _lastOpTimeWaited = lowestOpTime; + } + + if (status == ErrorCodes::WaitForMajorityServiceEarlierOpTimeAvailable) { + _opCtx = nullptr; + continue; + } + + if (status.isOK()) { + lowestOpTimeIter->second.emplaceValue(); + } else { + lowestOpTimeIter->second.setError(status); + } + + _queuedOpTimes.erase(lowestOpTimeIter); + } + + if (_queuedOpTimes.empty() && !_inShutDown) { + _opCtx->waitForConditionOrInterruptNoAssert(_hasNewOpTimeCV, lk).ignore(); + } + + _opCtx = nullptr; + } +} + +} // namespace mongo diff --git a/src/mongo/db/s/wait_for_majority_service.h b/src/mongo/db/s/wait_for_majority_service.h new file mode 100644 index 00000000000..970b475d0d3 --- /dev/null +++ b/src/mongo/db/s/wait_for_majority_service.h @@ -0,0 +1,100 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#pragma once + +#include <map> +#include <memory> +#include <vector> + +#include "mongo/db/repl/optime.h" +#include "mongo/db/service_context.h" +#include "mongo/executor/task_executor.h" +#include "mongo/stdx/mutex.h" +#include "mongo/stdx/thread.h" +#include "mongo/util/future.h" + +namespace mongo { + +/** + * Provides a facility for asynchronously waiting a local opTime to be majority committed. + */ +class WaitForMajorityService { +public: + ~WaitForMajorityService(); + + static WaitForMajorityService& get(ServiceContext* service); + + /** + * Sets up the background thread responsible for waiting for opTimes to be majority committed. + */ + void setUp(ServiceContext* service); + + /** + * Blocking method, which shuts down and joins the background thread. + */ + void shutDown(); + + /** + * Enqueue a request to wait for the given opTime to be majority committed. + */ + SharedSemiFuture<void> waitUntilMajority(const repl::OpTime& opTime); + +private: + using OpTimeWaitingMap = std::map<repl::OpTime, SharedPromise<void>>; + + /** + * Periodically checks the list of opTimes to wait for majority committed. + */ + void _periodicallyWaitForMajority(ServiceContext* service); + + stdx::mutex _mutex; + + // Contains an ordered list of opTimes to wait to be majority comitted. + OpTimeWaitingMap _queuedOpTimes; + + // Contains the last opTime that the background thread was able to successfully wait to be + // majority comitted. + repl::OpTime _lastOpTimeWaited; + + // The background thread. + stdx::thread _thread; + + // Use for signalling new opTime requests being queued. + stdx::condition_variable _hasNewOpTimeCV; + + // If set, contains a reference to the opCtx being used by the background thread. + // Only valid when _thread.joinable() and not nullptr. + OperationContext* _opCtx{nullptr}; + + // Flag is set to true after shutDown() is called. + bool _inShutDown{false}; +}; + +} // namespace mongo diff --git a/src/mongo/db/s/wait_for_majority_service_test.cpp b/src/mongo/db/s/wait_for_majority_service_test.cpp new file mode 100644 index 00000000000..27e9c5ac75b --- /dev/null +++ b/src/mongo/db/s/wait_for_majority_service_test.cpp @@ -0,0 +1,247 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. 
If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/replication_coordinator_mock.h" +#include "mongo/db/s/wait_for_majority_service.h" +#include "mongo/db/service_context_d_test_fixture.h" +#include "mongo/stdx/mutex.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace { + +class WaitForMajorityServiceTest : public ServiceContextTest { +public: + void setUp() override { + auto service = getServiceContext(); + waitService()->setUp(service); + + auto replCoord = std::make_unique<repl::ReplicationCoordinatorMock>(service); + + replCoord->setAwaitReplicationReturnValueFunction([this](OperationContext* opCtx, + const repl::OpTime& opTime) { + waitForWriteConcernStub(opCtx, opTime); + return repl::ReplicationCoordinator::StatusAndDuration(Status::OK(), Milliseconds(0)); + }); + + repl::ReplicationCoordinator::set(service, std::move(replCoord)); + } + + void tearDown() override { + waitService()->shutDown(); + } + + WaitForMajorityService* waitService() { + return &_waitForMajorityService; + } + + void finishWaitingOneOpTime() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + _isTestReady = true; + _isTestReadyCV.notify_one(); + + while (_isTestReady) { + _finishWaitingOneOpTimeCV.wait(lk); + } + } + + void waitForWriteConcernStub(OperationContext* opCtx, const repl::OpTime& opTime) { + stdx::unique_lock<stdx::mutex> lk(_mutex); + + while (!_isTestReady) { + if (!opCtx->waitForConditionOrInterruptNoAssert(_isTestReadyCV, lk).isOK()) { + return; + } + } + + _lastOpTimeWaited = opTime; + _isTestReady = false; + _finishWaitingOneOpTimeCV.notify_one(); + } + + const repl::OpTime& getLastOpTimeWaited() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _lastOpTimeWaited; + } + +private: + WaitForMajorityService _waitForMajorityService; + + stdx::mutex _mutex; + stdx::condition_variable _isTestReadyCV; + stdx::condition_variable _finishWaitingOneOpTimeCV; + + bool _isTestReady{false}; + repl::OpTime _lastOpTimeWaited; +}; + +TEST_F(WaitForMajorityServiceTest, WaitOneOpTime) { + repl::OpTime t1(Timestamp(1, 0), 2); + + auto future = waitService()->waitUntilMajority(t1); + + ASSERT_FALSE(future.isReady()); + + finishWaitingOneOpTime(); + + future.get(); + ASSERT_EQ(t1, getLastOpTimeWaited()); +} + +TEST_F(WaitForMajorityServiceTest, WaitWithSameOpTime) { + repl::OpTime t1(Timestamp(1, 0), 2); + + auto future1 = waitService()->waitUntilMajority(t1); + auto future1b = waitService()->waitUntilMajority(t1); + + ASSERT_FALSE(future1.isReady()); + ASSERT_FALSE(future1b.isReady()); + + finishWaitingOneOpTime(); + + future1.get(); + future1b.get(); + + ASSERT_EQ(t1, getLastOpTimeWaited()); +} + +TEST_F(WaitForMajorityServiceTest, WaitWithOpTimeEarlierThanLowestQueued) { + repl::OpTime laterOpTime(Timestamp(6, 0), 2); + repl::OpTime earlierOpTime(Timestamp(1, 0), 2); + + auto laterFuture = waitService()->waitUntilMajority(laterOpTime); + auto earlierFuture = waitService()->waitUntilMajority(earlierOpTime); + + ASSERT_FALSE(laterFuture.isReady()); + ASSERT_FALSE(earlierFuture.isReady()); + + finishWaitingOneOpTime(); + + ASSERT_FALSE(laterFuture.isReady()); + + earlierFuture.get(); + ASSERT_EQ(earlierOpTime, getLastOpTimeWaited()); + + finishWaitingOneOpTime(); + laterFuture.get(); + + ASSERT_EQ(laterOpTime, getLastOpTimeWaited()); +} + +TEST_F(WaitForMajorityServiceTest, WaitWithDifferentOpTime) { + repl::OpTime t1(Timestamp(1, 0), 2); + repl::OpTime 
t2(Timestamp(14, 0), 2); + + auto future1 = waitService()->waitUntilMajority(t1); + auto future2 = waitService()->waitUntilMajority(t2); + + ASSERT_FALSE(future1.isReady()); + ASSERT_FALSE(future2.isReady()); + + finishWaitingOneOpTime(); + + ASSERT_FALSE(future2.isReady()); + + future1.get(); + ASSERT_EQ(t1, getLastOpTimeWaited()); + + finishWaitingOneOpTime(); + + future2.get(); + ASSERT_EQ(t2, getLastOpTimeWaited()); +} + +TEST_F(WaitForMajorityServiceTest, WaitWithOpTimeEarlierThanOpTimeAlreadyWaited) { + repl::OpTime t1(Timestamp(5, 0), 2); + repl::OpTime t2(Timestamp(14, 0), 2); + + auto future1 = waitService()->waitUntilMajority(t1); + auto future2 = waitService()->waitUntilMajority(t2); + + ASSERT_FALSE(future1.isReady()); + ASSERT_FALSE(future2.isReady()); + + finishWaitingOneOpTime(); + + ASSERT_FALSE(future2.isReady()); + + future1.get(); + ASSERT_EQ(t1, getLastOpTimeWaited()); + + repl::OpTime oldTs(Timestamp(4, 0), 2); + auto oldFuture = waitService()->waitUntilMajority(oldTs); + auto alreadyWaitedFuture = waitService()->waitUntilMajority(t1); + + ASSERT_FALSE(future2.isReady()); + + oldFuture.get(); + alreadyWaitedFuture.get(); + ASSERT_EQ(t1, getLastOpTimeWaited()); + + finishWaitingOneOpTime(); + + future2.get(); + ASSERT_EQ(t2, getLastOpTimeWaited()); +} + +TEST_F(WaitForMajorityServiceTest, ShutdownShouldCancelQueuedRequests) { + repl::OpTime t1(Timestamp(5, 0), 2); + repl::OpTime t2(Timestamp(14, 0), 2); + + auto future1 = waitService()->waitUntilMajority(t1); + auto future2 = waitService()->waitUntilMajority(t2); + + ASSERT_FALSE(future1.isReady()); + ASSERT_FALSE(future2.isReady()); + + waitService()->shutDown(); + + ASSERT_THROWS_CODE(future1.get(), AssertionException, ErrorCodes::InterruptedAtShutdown); + ASSERT_THROWS_CODE(future2.get(), AssertionException, ErrorCodes::InterruptedAtShutdown); +} + +TEST_F(WaitForMajorityServiceTest, WriteConcernErrorGetsPropagatedCorrectly) { + repl::OpTime t(Timestamp(5, 0), 2); + + auto replCoord = dynamic_cast<repl::ReplicationCoordinatorMock*>( + repl::ReplicationCoordinator::get(getServiceContext())); + replCoord->setAwaitReplicationReturnValueFunction( + [this](OperationContext* opCtx, const repl::OpTime& opTime) { + return repl::ReplicationCoordinator::StatusAndDuration( + {ErrorCodes::PrimarySteppedDown, "test stepdown"}, Milliseconds(0)); + }); + + auto future = waitService()->waitUntilMajority(t); + ASSERT_THROWS_CODE(future.get(), AssertionException, ErrorCodes::PrimarySteppedDown); +} + +} // namespace +} // namespace mongo |
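
Taken together, the transaction coordinator changes turn the majority wait on its persistence writes from a blocking call inside persistParticipantListBlocking()/persistDecisionBlocking() into an asynchronous continuation: the persistence helpers now return Future<repl::OpTime>, and the coordinator feeds that opTime to WaitForMajorityService before resuming on the fixed executor. A condensed, excerpt-style sketch of that chain, taken from transaction_coordinator.cpp above (members such as _sendPrepareScheduler and _participantsDurable belong to the coordinator class, so this fragment is not standalone code):

```cpp
// Excerpt-style sketch of the new continuation chain in TransactionCoordinator.
// _sendPrepareScheduler, _lsid, _txnNumber, _participants, _serviceContext, _mutex and
// _participantsDurable are members of the coordinator class shown in the diff above.
txn::persistParticipantsList(*_sendPrepareScheduler, _lsid, _txnNumber, *_participants)
    .then([this](repl::OpTime opTime) {
        // The persistence step now resolves with the opTime of its local write;
        // the majority wait is handed off to the shared background waiter.
        return WaitForMajorityService::get(_serviceContext).waitUntilMajority(opTime);
    })
    // Resume coordinator work on the fixed executor once the write is majority committed.
    .thenRunOn(Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor())
    .then([this] {
        stdx::lock_guard<stdx::mutex> lg(_mutex);
        _participantsDurable = true;
        // ...and then send prepare to the participants, as before.
    });
```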