author     Randolph Tan <randolph@10gen.com>  2019-06-27 16:02:28 -0400
committer  Randolph Tan <randolph@10gen.com>  2019-07-16 13:44:09 -0400
commit     2dfbaadb85f32869c75f87117d9f6e98b4e948ea (patch)
tree       131ffaee362373775aadef9105db99337c5d7fa6 /src/mongo/db
parent     db2cfda750494cf1c2d0a235df945e335075b8e2 (diff)
download   mongo-2dfbaadb85f32869c75f87117d9f6e98b4e948ea.tar.gz
SERVER-40785 Create WaitForMajorityService to allow waiting for write concern asynchronously
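For orientation, a minimal usage sketch of the new API as declared in src/mongo/db/s/wait_for_majority_service.h below; 'service' (a ServiceContext*) and 'opTime' (a repl::OpTime) are placeholder names, not identifiers from this patch:

    // Started once during startup (mirrors the _initAndListen() change in db.cpp below):
    // spawns the single background thread that services all waiters.
    WaitForMajorityService::get(service).setUp(service);

    // Callers enqueue an opTime and get back a SharedSemiFuture<void> that becomes ready
    // once that opTime is majority committed, without blocking the calling thread.
    SharedSemiFuture<void> majorityCommitted =
        WaitForMajorityService::get(service).waitUntilMajority(opTime);

    // Called during shutdown (mirrors the shutdownTask() change in db.cpp below): joins
    // the background thread and fails any still-queued waiters with InterruptedAtShutdown.
    WaitForMajorityService::get(service).shutDown();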
Diffstat (limited to 'src/mongo/db')
-rw-r--r--  src/mongo/db/catalog/drop_database_test.cpp | 84
-rw-r--r--  src/mongo/db/db.cpp | 5
-rw-r--r--  src/mongo/db/repl/replication_coordinator_mock.cpp | 2
-rw-r--r--  src/mongo/db/repl/replication_coordinator_mock.h | 8
-rw-r--r--  src/mongo/db/s/SConscript | 2
-rw-r--r--  src/mongo/db/s/transaction_coordinator.cpp | 77
-rw-r--r--  src/mongo/db/s/transaction_coordinator_service.cpp | 6
-rw-r--r--  src/mongo/db/s/transaction_coordinator_service.h | 12
-rw-r--r--  src/mongo/db/s/transaction_coordinator_service_test.cpp | 18
-rw-r--r--  src/mongo/db/s/transaction_coordinator_test.cpp | 49
-rw-r--r--  src/mongo/db/s/transaction_coordinator_test_fixture.cpp | 8
-rw-r--r--  src/mongo/db/s/transaction_coordinator_test_fixture.h | 1
-rw-r--r--  src/mongo/db/s/transaction_coordinator_util.cpp | 105
-rw-r--r--  src/mongo/db/s/transaction_coordinator_util.h | 23
-rw-r--r--  src/mongo/db/s/wait_for_majority_service.cpp | 188
-rw-r--r--  src/mongo/db/s/wait_for_majority_service.h | 100
-rw-r--r--  src/mongo/db/s/wait_for_majority_service_test.cpp | 247
17 files changed, 759 insertions, 176 deletions
diff --git a/src/mongo/db/catalog/drop_database_test.cpp b/src/mongo/db/catalog/drop_database_test.cpp
index 4fe1d2bc098..2eb37e80a36 100644
--- a/src/mongo/db/catalog/drop_database_test.cpp
+++ b/src/mongo/db/catalog/drop_database_test.cpp
@@ -268,7 +268,7 @@ TEST_F(DropDatabaseTest, DropDatabaseWaitsForDropPendingCollectionOpTimeIfNoColl
// Update ReplicationCoordinatorMock so that we record the optime passed to awaitReplication().
_replCoord->setAwaitReplicationReturnValueFunction(
- [&clientLastOpTime, this](const repl::OpTime& opTime) {
+ [&clientLastOpTime, this](OperationContext*, const repl::OpTime& opTime) {
clientLastOpTime = opTime;
ASSERT_GREATER_THAN(clientLastOpTime, repl::OpTime());
return repl::ReplicationCoordinator::StatusAndDuration(Status::OK(), Milliseconds(0));
@@ -283,11 +283,12 @@ TEST_F(DropDatabaseTest, DropDatabaseWaitsForDropPendingCollectionOpTimeIfNoColl
TEST_F(DropDatabaseTest, DropDatabasePassedThroughAwaitReplicationErrorForDropPendingCollection) {
// Update ReplicationCoordinatorMock so that we record the optime passed to awaitReplication().
- _replCoord->setAwaitReplicationReturnValueFunction([this](const repl::OpTime& opTime) {
- ASSERT_GREATER_THAN(opTime, repl::OpTime());
- return repl::ReplicationCoordinator::StatusAndDuration(
- Status(ErrorCodes::WriteConcernFailed, ""), Milliseconds(0));
- });
+ _replCoord->setAwaitReplicationReturnValueFunction(
+ [this](OperationContext*, const repl::OpTime& opTime) {
+ ASSERT_GREATER_THAN(opTime, repl::OpTime());
+ return repl::ReplicationCoordinator::StatusAndDuration(
+ Status(ErrorCodes::WriteConcernFailed, ""), Milliseconds(0));
+ });
repl::OpTime dropOpTime(Timestamp(Seconds(100), 0), 1LL);
auto dpns = _nss.makeDropPendingNamespace(dropOpTime);
@@ -342,7 +343,7 @@ void _testDropDatabaseResetsDropPendingStateIfAwaitReplicationFails(OperationCon
TEST_F(DropDatabaseTest,
DropDatabaseResetsDropPendingStateIfAwaitReplicationFailsAndDatabaseIsPresent) {
// Update ReplicationCoordinatorMock so that awaitReplication() fails.
- _replCoord->setAwaitReplicationReturnValueFunction([](const repl::OpTime&) {
+ _replCoord->setAwaitReplicationReturnValueFunction([](OperationContext*, const repl::OpTime&) {
return repl::ReplicationCoordinator::StatusAndDuration(
Status(ErrorCodes::WriteConcernFailed, ""), Milliseconds(0));
});
@@ -353,11 +354,12 @@ TEST_F(DropDatabaseTest,
TEST_F(DropDatabaseTest,
DropDatabaseResetsDropPendingStateIfAwaitReplicationFailsAndDatabaseIsMissing) {
// Update ReplicationCoordinatorMock so that awaitReplication() fails.
- _replCoord->setAwaitReplicationReturnValueFunction([this](const repl::OpTime&) {
- _removeDatabaseFromCatalog(_opCtx.get(), _nss.db());
- return repl::ReplicationCoordinator::StatusAndDuration(
- Status(ErrorCodes::WriteConcernFailed, ""), Milliseconds(0));
- });
+ _replCoord->setAwaitReplicationReturnValueFunction(
+ [this](OperationContext*, const repl::OpTime&) {
+ _removeDatabaseFromCatalog(_opCtx.get(), _nss.db());
+ return repl::ReplicationCoordinator::StatusAndDuration(
+ Status(ErrorCodes::WriteConcernFailed, ""), Milliseconds(0));
+ });
_testDropDatabaseResetsDropPendingStateIfAwaitReplicationFails(_opCtx.get(), _nss, false);
}
@@ -368,15 +370,16 @@ TEST_F(DropDatabaseTest,
// dropDatabase() should detect this and release the global lock temporarily if it needs to call
// ReplicationCoordinator::awaitReplication().
bool isAwaitReplicationCalled = false;
- _replCoord->setAwaitReplicationReturnValueFunction([&, this](const repl::OpTime& opTime) {
- isAwaitReplicationCalled = true;
- // This test does not set the client's last optime.
- ASSERT_EQUALS(opTime, repl::OpTime());
- ASSERT_FALSE(_opCtx->lockState()->isW());
- ASSERT_FALSE(_opCtx->lockState()->isDbLockedForMode(_nss.db(), MODE_X));
- ASSERT_FALSE(_opCtx->lockState()->isLocked());
- return repl::ReplicationCoordinator::StatusAndDuration(Status::OK(), Milliseconds(0));
- });
+ _replCoord->setAwaitReplicationReturnValueFunction(
+ [&, this](OperationContext*, const repl::OpTime& opTime) {
+ isAwaitReplicationCalled = true;
+ // This test does not set the client's last optime.
+ ASSERT_EQUALS(opTime, repl::OpTime());
+ ASSERT_FALSE(_opCtx->lockState()->isW());
+ ASSERT_FALSE(_opCtx->lockState()->isDbLockedForMode(_nss.db(), MODE_X));
+ ASSERT_FALSE(_opCtx->lockState()->isLocked());
+ return repl::ReplicationCoordinator::StatusAndDuration(Status::OK(), Milliseconds(0));
+ });
{
Lock::GlobalWrite lk(_opCtx.get());
@@ -392,14 +395,15 @@ TEST_F(DropDatabaseTest,
// dropDatabase() should detect this and release the global lock temporarily if it needs to call
// ReplicationCoordinator::awaitReplication().
bool isAwaitReplicationCalled = false;
- _replCoord->setAwaitReplicationReturnValueFunction([&, this](const repl::OpTime& opTime) {
- isAwaitReplicationCalled = true;
- ASSERT_GREATER_THAN(opTime, repl::OpTime());
- ASSERT_FALSE(_opCtx->lockState()->isW());
- ASSERT_FALSE(_opCtx->lockState()->isDbLockedForMode(_nss.db(), MODE_X));
- ASSERT_FALSE(_opCtx->lockState()->isLocked());
- return repl::ReplicationCoordinator::StatusAndDuration(Status::OK(), Milliseconds(0));
- });
+ _replCoord->setAwaitReplicationReturnValueFunction(
+ [&, this](OperationContext*, const repl::OpTime& opTime) {
+ isAwaitReplicationCalled = true;
+ ASSERT_GREATER_THAN(opTime, repl::OpTime());
+ ASSERT_FALSE(_opCtx->lockState()->isW());
+ ASSERT_FALSE(_opCtx->lockState()->isDbLockedForMode(_nss.db(), MODE_X));
+ ASSERT_FALSE(_opCtx->lockState()->isLocked());
+ return repl::ReplicationCoordinator::StatusAndDuration(Status::OK(), Milliseconds(0));
+ });
repl::OpTime dropOpTime(Timestamp(Seconds(100), 0), 1LL);
auto dpns = _nss.makeDropPendingNamespace(dropOpTime);
@@ -416,10 +420,11 @@ TEST_F(DropDatabaseTest,
TEST_F(DropDatabaseTest,
DropDatabaseReturnsNamespaceNotFoundIfDatabaseIsRemovedAfterCollectionsDropsAreReplicated) {
// Update ReplicationCoordinatorMock so that awaitReplication() fails.
- _replCoord->setAwaitReplicationReturnValueFunction([this](const repl::OpTime&) {
- _removeDatabaseFromCatalog(_opCtx.get(), _nss.db());
- return repl::ReplicationCoordinator::StatusAndDuration(Status::OK(), Milliseconds(0));
- });
+ _replCoord->setAwaitReplicationReturnValueFunction(
+ [this](OperationContext*, const repl::OpTime&) {
+ _removeDatabaseFromCatalog(_opCtx.get(), _nss.db());
+ return repl::ReplicationCoordinator::StatusAndDuration(Status::OK(), Milliseconds(0));
+ });
_createCollection(_opCtx.get(), _nss);
@@ -438,12 +443,13 @@ TEST_F(DropDatabaseTest,
TEST_F(DropDatabaseTest,
DropDatabaseReturnsNotMasterIfNotPrimaryAfterCollectionsDropsAreReplicated) {
// Transition from PRIMARY to SECONDARY while awaiting replication of collection drops.
- _replCoord->setAwaitReplicationReturnValueFunction([this](const repl::OpTime&) {
- ASSERT_OK(_replCoord->setFollowerMode(repl::MemberState::RS_SECONDARY));
- ASSERT_TRUE(_opCtx->writesAreReplicated());
- ASSERT_FALSE(_replCoord->canAcceptWritesForDatabase(_opCtx.get(), _nss.db()));
- return repl::ReplicationCoordinator::StatusAndDuration(Status::OK(), Milliseconds(0));
- });
+ _replCoord->setAwaitReplicationReturnValueFunction(
+ [this](OperationContext*, const repl::OpTime&) {
+ ASSERT_OK(_replCoord->setFollowerMode(repl::MemberState::RS_SECONDARY));
+ ASSERT_TRUE(_opCtx->writesAreReplicated());
+ ASSERT_FALSE(_replCoord->canAcceptWritesForDatabase(_opCtx.get(), _nss.db()));
+ return repl::ReplicationCoordinator::StatusAndDuration(Status::OK(), Milliseconds(0));
+ });
_createCollection(_opCtx.get(), _nss);
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index 652355ee908..37082607dba 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -123,6 +123,7 @@
#include "mongo/db/s/shard_server_op_observer.h"
#include "mongo/db/s/sharding_initialization_mongod.h"
#include "mongo/db/s/sharding_state_recovery.h"
+#include "mongo/db/s/wait_for_majority_service.h"
#include "mongo/db/server_options.h"
#include "mongo/db/service_context.h"
#include "mongo/db/service_entry_point_mongod.h"
@@ -505,6 +506,8 @@ ExitCode _initAndListen(int listenPort) {
<< startupWarningsLog;
}
+ WaitForMajorityService::get(serviceContext).setUp(serviceContext);
+
// This function may take the global lock.
auto shardingInitialized = ShardingInitializationMongoD::get(startupOpCtx.get())
->initializeShardingAwarenessIfNeeded(startupOpCtx.get());
@@ -904,6 +907,8 @@ void shutdownTask(const ShutdownTaskArgs& shutdownArgs) {
}
}
+ WaitForMajorityService::get(serviceContext).shutDown();
+
// Terminate the balancer thread so it doesn't leak memory.
if (auto balancer = Balancer::get(serviceContext)) {
balancer->interruptBalancer();
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index 9194089e26e..32589b56952 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -138,7 +138,7 @@ void ReplicationCoordinatorMock::clearSyncSourceBlacklist() {}
ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorMock::awaitReplication(
OperationContext* opCtx, const OpTime& opTime, const WriteConcernOptions& writeConcern) {
- return _awaitReplicationReturnValueFunction(opTime);
+ return _awaitReplicationReturnValueFunction(opCtx, opTime);
}
void ReplicationCoordinatorMock::setAwaitReplicationReturnValueFunction(
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index 0a9e49f3e5e..62126adf068 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -287,9 +287,10 @@ public:
/**
* Sets the function to generate the return value for calls to awaitReplication().
- * 'opTime' is the optime passed to awaitReplication().
+ * 'OperationContext' and 'opTime' are the parameters passed to awaitReplication().
*/
- using AwaitReplicationReturnValueFunction = std::function<StatusAndDuration(const OpTime&)>;
+ using AwaitReplicationReturnValueFunction =
+ std::function<StatusAndDuration(OperationContext*, const OpTime&)>;
void setAwaitReplicationReturnValueFunction(
AwaitReplicationReturnValueFunction returnValueFunction);
@@ -325,7 +326,8 @@ private:
OpTime _myLastAppliedOpTime;
Date_t _myLastAppliedWallTime;
ReplSetConfig _getConfigReturnValue;
- AwaitReplicationReturnValueFunction _awaitReplicationReturnValueFunction = [](const OpTime&) {
+ AwaitReplicationReturnValueFunction _awaitReplicationReturnValueFunction = [](OperationContext*,
+ const OpTime&) {
return StatusAndDuration(Status::OK(), Milliseconds(0));
};
bool _alwaysAllowWrites = false;
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 41588ca7eb8..528b880d93c 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -132,6 +132,7 @@ env.Library(
'transaction_coordinator_structures.cpp',
'transaction_coordinator_util.cpp',
'transaction_coordinator.cpp',
+ 'wait_for_majority_service.cpp',
env.Idlc('transaction_coordinator_document.idl')[0],
env.Idlc('transaction_coordinators_stats.idl')[0],
],
@@ -327,6 +328,7 @@ env.CppUnitTest(
'sharding_logging_test.cpp',
'start_chunk_clone_request_test.cpp',
'type_shard_identity_test.cpp',
+ 'wait_for_majority_service_test.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/auth/authmocks',
diff --git a/src/mongo/db/s/transaction_coordinator.cpp b/src/mongo/db/s/transaction_coordinator.cpp
index e9c59b0b17b..3085db606e4 100644
--- a/src/mongo/db/s/transaction_coordinator.cpp
+++ b/src/mongo/db/s/transaction_coordinator.cpp
@@ -35,7 +35,10 @@
#include "mongo/db/logical_clock.h"
#include "mongo/db/s/transaction_coordinator_metrics_observer.h"
+#include "mongo/db/s/wait_for_majority_service.h"
#include "mongo/db/server_options.h"
+#include "mongo/s/grid.h"
+#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -46,6 +49,26 @@ using CoordinatorCommitDecision = txn::CoordinatorCommitDecision;
using PrepareVoteConsensus = txn::PrepareVoteConsensus;
using TransactionCoordinatorDocument = txn::TransactionCoordinatorDocument;
+MONGO_FAIL_POINT_DEFINE(hangBeforeWaitingForParticipantListWriteConcern);
+MONGO_FAIL_POINT_DEFINE(hangBeforeWaitingForDecisionWriteConcern);
+
+void hangIfFailPointEnabled(ServiceContext* service,
+ FailPoint& failpoint,
+ const StringData& failPointName) {
+ MONGO_FAIL_POINT_BLOCK(failpoint, fp) {
+ LOG(0) << "Hit " << failPointName << " failpoint";
+ const BSONObj& data = fp.getData();
+ if (!data["useUninterruptibleSleep"].eoo()) {
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET(failpoint);
+ } else {
+ ThreadClient tc(failPointName, service);
+ auto opCtx = tc->makeOperationContext();
+
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED(opCtx.get(), failpoint);
+ }
+ }
+}
+
} // namespace
TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext,
@@ -112,17 +135,25 @@ TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext,
_serviceContext->getPreciseClockSource()->now());
if (_participantsDurable)
- return Future<void>::makeReady();
+ return Future<repl::OpTime>::makeReady(repl::OpTime());
}
return txn::persistParticipantsList(
- *_sendPrepareScheduler, _lsid, _txnNumber, *_participants)
- .then([this] {
- stdx::lock_guard<stdx::mutex> lg(_mutex);
- _participantsDurable = true;
- });
+ *_sendPrepareScheduler, _lsid, _txnNumber, *_participants);
})
+ .then([this](repl::OpTime opTime) {
+ hangIfFailPointEnabled(_serviceContext,
+ hangBeforeWaitingForParticipantListWriteConcern,
+ "hangBeforeWaitingForParticipantListWriteConcern");
+ return WaitForMajorityService::get(_serviceContext).waitUntilMajority(opTime);
+ })
+ .thenRunOn(Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor())
.then([this] {
+ {
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ _participantsDurable = true;
+ }
+
// Send prepare to the participants, unless this has already been done (which would only
// be the case if this coordinator was created as part of step-up recovery and the
// recovery document contained a decision).
@@ -184,16 +215,24 @@ TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext,
_serviceContext->getPreciseClockSource()->now());
if (_decisionDurable)
- return Future<void>::makeReady();
+ return Future<repl::OpTime>::makeReady(repl::OpTime());
}
- return txn::persistDecision(*_scheduler, _lsid, _txnNumber, *_participants, *_decision)
- .then([this] {
- stdx::lock_guard<stdx::mutex> lg(_mutex);
- _decisionDurable = true;
- });
+ return txn::persistDecision(*_scheduler, _lsid, _txnNumber, *_participants, *_decision);
+ })
+ .then([this](repl::OpTime opTime) {
+ hangIfFailPointEnabled(_serviceContext,
+ hangBeforeWaitingForDecisionWriteConcern,
+ "hangBeforeWaitingForDecisionWriteConcern");
+ return WaitForMajorityService::get(_serviceContext).waitUntilMajority(opTime);
})
+ .thenRunOn(Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor())
.then([this] {
+ {
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ _decisionDurable = true;
+ }
+
// Send the commit/abort decision to the participants.
// Input: _decisionDurable
// Output: (none)
@@ -249,13 +288,13 @@ TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext,
_scheduler->shutdown(
{ErrorCodes::TransactionCoordinatorDeadlineTaskCanceled, "Coordinator completed"});
- return std::move(deadlineFuture).onCompletion([s = std::move(s)](Status) { return s; });
- })
- .getAsync([this](Status s) {
- // Notify all the listeners which are interested in the coordinator's lifecycle. After
- // this call, the coordinator object could potentially get destroyed by its lifetime
- // controller, so there shouldn't be any accesses to `this` after this call.
- _done(s);
+ return std::move(deadlineFuture).onCompletion([ this, s = std::move(s) ](Status) {
+ // Notify all the listeners which are interested in the coordinator's lifecycle.
+ // After this call, the coordinator object could potentially get destroyed by its
+ // lifetime controller, so there shouldn't be any accesses to `this` after this
+ // call.
+ _done(s);
+ });
});
}
diff --git a/src/mongo/db/s/transaction_coordinator_service.cpp b/src/mongo/db/s/transaction_coordinator_service.cpp
index 2e93ca54435..dac4caee608 100644
--- a/src/mongo/db/s/transaction_coordinator_service.cpp
+++ b/src/mongo/db/s/transaction_coordinator_service.cpp
@@ -51,7 +51,7 @@ const auto transactionCoordinatorServiceDecoration =
TransactionCoordinatorService::TransactionCoordinatorService() = default;
TransactionCoordinatorService::~TransactionCoordinatorService() {
- _joinPreviousRound();
+ joinPreviousRound();
}
TransactionCoordinatorService* TransactionCoordinatorService::get(OperationContext* opCtx) {
@@ -137,7 +137,7 @@ boost::optional<Future<txn::CommitDecision>> TransactionCoordinatorService::reco
void TransactionCoordinatorService::onStepUp(OperationContext* opCtx,
Milliseconds recoveryDelayForTesting) {
- _joinPreviousRound();
+ joinPreviousRound();
stdx::lock_guard<stdx::mutex> lg(_mutex);
invariant(!_catalogAndScheduler);
@@ -235,7 +235,7 @@ TransactionCoordinatorService::_getCatalogAndScheduler(OperationContext* opCtx)
return _catalogAndScheduler;
}
-void TransactionCoordinatorService::_joinPreviousRound() {
+void TransactionCoordinatorService::joinPreviousRound() {
// onStepDown must have been called
invariant(!_catalogAndScheduler);
diff --git a/src/mongo/db/s/transaction_coordinator_service.h b/src/mongo/db/s/transaction_coordinator_service.h
index c1dcb9550e7..11561a7e648 100644
--- a/src/mongo/db/s/transaction_coordinator_service.h
+++ b/src/mongo/db/s/transaction_coordinator_service.h
@@ -109,6 +109,12 @@ public:
LogicalSessionId lsid,
TxnNumber txnNumber);
+ /**
+ * Blocking call which waits for the previous stepUp/stepDown round to join and ensures all
+ * tasks scheduled by that round have completed.
+ */
+ void joinPreviousRound();
+
private:
struct CatalogAndScheduler {
CatalogAndScheduler(ServiceContext* service) : scheduler(service) {}
@@ -128,12 +134,6 @@ private:
*/
std::shared_ptr<CatalogAndScheduler> _getCatalogAndScheduler(OperationContext* opCtx);
- /**
- * Blocking call which waits for the previous stepUp/stepDown round to join and ensures all
- * tasks scheduled by that round have completed.
- */
- void _joinPreviousRound();
-
// Contains the catalog + scheduler, which was active at the last step-down attempt (if any).
// Set at onStepDown and destroyed at onStepUp, which are always invoked sequentially by the
// replication machinery, so there is no need to explicitly synchronize it
diff --git a/src/mongo/db/s/transaction_coordinator_service_test.cpp b/src/mongo/db/s/transaction_coordinator_service_test.cpp
index 128a574c79c..6ce22953437 100644
--- a/src/mongo/db/s/transaction_coordinator_service_test.cpp
+++ b/src/mongo/db/s/transaction_coordinator_service_test.cpp
@@ -192,7 +192,6 @@ protected:
}
};
-
using TransactionCoordinatorServiceStepUpStepDownTest = TransactionCoordinatorServiceTestFixture;
TEST_F(TransactionCoordinatorServiceStepUpStepDownTest, OperationsFailBeforeStepUpStarts) {
@@ -273,7 +272,6 @@ TEST_F(TransactionCoordinatorServiceStepUpStepDownTest, StepDownBeforeStepUpTask
service()->onStepDown();
}
-
class TransactionCoordinatorServiceTest : public TransactionCoordinatorServiceTestFixture {
protected:
void setUp() override {
@@ -284,6 +282,7 @@ protected:
void tearDown() override {
service()->onStepDown();
+ service()->joinPreviousRound();
TransactionCoordinatorServiceTestFixture::tearDown();
}
@@ -341,8 +340,6 @@ TEST_F(TransactionCoordinatorServiceTest,
// commit acks.
coordinatorService->createCoordinator(
operationContext(), _lsid, _txnNumber + 1, kCommitDeadline);
- auto newTxnCommitDecisionFuture = coordinatorService->coordinateCommit(
- operationContext(), _lsid, _txnNumber + 1, kTwoShardIdSet);
// Finish committing the old transaction by sending it commit acks from both participants.
assertCommitSentAndRespondWithSuccess();
@@ -351,6 +348,10 @@ TEST_F(TransactionCoordinatorServiceTest,
// The old transaction should now be committed.
ASSERT_EQ(static_cast<int>(oldTxnCommitDecisionFuture.get()),
static_cast<int>(txn::CommitDecision::kCommit));
+
+ auto newTxnCommitDecisionFuture = coordinatorService->coordinateCommit(
+ operationContext(), _lsid, _txnNumber + 1, kTwoShardIdSet);
+
commitTransaction(*coordinatorService, _lsid, _txnNumber + 1, kTwoShardIdSet);
}
@@ -703,19 +704,12 @@ TEST_F(TransactionCoordinatorServiceTestSingleTxn,
}
TEST_F(TransactionCoordinatorServiceTestSingleTxn,
- CoordinateCommitWithNoVotesReturnsNotReadyFuture) {
+ CoordinateCommitReturnsCorrectCommitDecisionOnCommit) {
auto commitDecisionFuture = *coordinatorService()->coordinateCommit(
operationContext(), _lsid, _txnNumber, kTwoShardIdSet);
ASSERT_FALSE(commitDecisionFuture.isReady());
-}
-
-TEST_F(TransactionCoordinatorServiceTestSingleTxn,
- CoordinateCommitReturnsCorrectCommitDecisionOnCommit) {
-
- auto commitDecisionFuture = *coordinatorService()->coordinateCommit(
- operationContext(), _lsid, _txnNumber, kTwoShardIdSet);
assertPrepareSentAndRespondWithSuccess();
assertPrepareSentAndRespondWithSuccess();
diff --git a/src/mongo/db/s/transaction_coordinator_test.cpp b/src/mongo/db/s/transaction_coordinator_test.cpp
index ddf0bb96fa6..32134a82a71 100644
--- a/src/mongo/db/s/transaction_coordinator_test.cpp
+++ b/src/mongo/db/s/transaction_coordinator_test.cpp
@@ -66,6 +66,31 @@ StatusWith<BSONObj> makePrepareOkResponse(const Timestamp& timestamp) {
const StatusWith<BSONObj> kPrepareOk = makePrepareOkResponse(kDummyPrepareTimestamp);
const StatusWith<BSONObj> kPrepareOkNoTimestamp = BSON("ok" << 1);
+/**
+ * Searches for a client matching the name and mark the operation context as killed.
+ */
+void killClientOpCtx(ServiceContext* service, const std::string& clientName) {
+ for (int retries = 0; retries < 20; retries++) {
+ for (ServiceContext::LockedClientsCursor cursor(service); auto client = cursor.next();) {
+ invariant(client);
+
+ stdx::lock_guard lk(*client);
+ if (client->desc() == clientName) {
+ if (auto opCtx = client->getOperationContext()) {
+ opCtx->getServiceContext()->killOperation(
+ lk, opCtx, ErrorCodes::InterruptedAtShutdown);
+ return;
+ }
+ }
+ }
+
+ sleepmillis(50);
+ }
+
+ error() << "Timed out trying to find and kill client opCtx with name: " << clientName;
+ ASSERT_FALSE(true);
+}
+
class TransactionCoordinatorTestBase : public TransactionCoordinatorTestFixture {
protected:
void assertPrepareSentAndRespondWithSuccess() {
@@ -566,8 +591,7 @@ TEST_F(TransactionCoordinatorDriverPersistenceTest,
// We should retry until shutdown. The original participants should be persisted.
std::vector<ShardId> smallerParticipantList{ShardId("shard0001"), ShardId("shard0002")};
- Future<void> future =
- txn::persistParticipantsList(*_aws, _lsid, _txnNumber, smallerParticipantList);
+ auto future = txn::persistParticipantsList(*_aws, _lsid, _txnNumber, smallerParticipantList);
_aws->shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "Shutdown for test"});
advanceClockAndExecuteScheduledTasks();
@@ -619,11 +643,12 @@ TEST_F(TransactionCoordinatorDriverPersistenceTest,
TEST_F(TransactionCoordinatorDriverPersistenceTest,
PersistCommitDecisionWhenNoDocumentForTransactionExistsCanBeInterruptedAndReturnsError) {
- Future<void> future = txn::persistDecision(*_aws, _lsid, _txnNumber, _participants, [&] {
- txn::CoordinatorCommitDecision decision(txn::CommitDecision::kCommit);
- decision.setCommitTimestamp(_commitTimestamp);
- return decision;
- }());
+ Future<repl::OpTime> future =
+ txn::persistDecision(*_aws, _lsid, _txnNumber, _participants, [&] {
+ txn::CoordinatorCommitDecision decision(txn::CommitDecision::kCommit);
+ decision.setCommitTimestamp(_commitTimestamp);
+ return decision;
+ }());
_aws->shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "Shutdown for test"});
ASSERT_THROWS_CODE(
@@ -1078,6 +1103,10 @@ public:
assertCommitSentAndRespondWithSuccess();
stopCapturingLogMessages();
+
+ // Properly wait for the coordinator to finish all asynchronous tasks.
+ auto future = coordinator.onCompletion();
+ future.getNoThrow().ignore();
}
};
@@ -1624,7 +1653,6 @@ TEST_F(TransactionCoordinatorMetricsTest,
expectedMetrics.totalCreated++;
auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext());
- auto awsPtr = aws.get();
TransactionCoordinator coordinator(
getServiceContext(), _lsid, _txnNumber, std::move(aws), Date_t::max());
const auto& stats =
@@ -1657,7 +1685,7 @@ TEST_F(TransactionCoordinatorMetricsTest,
*expectedStats.writingParticipantListDuration + Microseconds(100);
expectedMetrics.currentWritingParticipantList--;
- awsPtr->shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "dummy"});
+ killClientOpCtx(getServiceContext(), "hangBeforeWaitingForParticipantListWriteConcern");
coordinator.onCompletion().get();
checkStats(stats, expectedStats);
@@ -1746,7 +1774,6 @@ TEST_F(TransactionCoordinatorMetricsTest,
expectedMetrics.totalCreated++;
auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext());
- auto awsPtr = aws.get();
TransactionCoordinator coordinator(
getServiceContext(), _lsid, _txnNumber, std::move(aws), Date_t::max());
const auto& stats =
@@ -1784,7 +1811,7 @@ TEST_F(TransactionCoordinatorMetricsTest,
*expectedStats.writingDecisionDuration + Microseconds(100);
expectedMetrics.currentWritingDecision--;
- awsPtr->shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "dummy"});
+ killClientOpCtx(getServiceContext(), "hangBeforeWaitingForDecisionWriteConcern");
coordinator.onCompletion().get();
checkStats(stats, expectedStats);
diff --git a/src/mongo/db/s/transaction_coordinator_test_fixture.cpp b/src/mongo/db/s/transaction_coordinator_test_fixture.cpp
index bced1fcb575..6554461dd97 100644
--- a/src/mongo/db/s/transaction_coordinator_test_fixture.cpp
+++ b/src/mongo/db/s/transaction_coordinator_test_fixture.cpp
@@ -37,6 +37,7 @@
#include "mongo/db/commands/txn_cmds_gen.h"
#include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/s/wait_for_majority_service.h"
#include "mongo/s/catalog/sharding_catalog_client_mock.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/unittest/unittest.h"
@@ -64,6 +65,13 @@ void TransactionCoordinatorTestFixture::setUp() {
uassertStatusOK(shardRegistry()->getShard(operationContext(), shardId))->getTargeter());
shardTargeter->setFindHostReturnValue(makeHostAndPort(shardId));
}
+
+ WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext());
+}
+
+void TransactionCoordinatorTestFixture::tearDown() {
+ WaitForMajorityService::get(getServiceContext()).shutDown();
+ ShardServerTestFixture::tearDown();
}
std::unique_ptr<ShardingCatalogClient> TransactionCoordinatorTestFixture::makeShardingCatalogClient(
diff --git a/src/mongo/db/s/transaction_coordinator_test_fixture.h b/src/mongo/db/s/transaction_coordinator_test_fixture.h
index 981f4bdb046..5f1067a9320 100644
--- a/src/mongo/db/s/transaction_coordinator_test_fixture.h
+++ b/src/mongo/db/s/transaction_coordinator_test_fixture.h
@@ -45,6 +45,7 @@ namespace mongo {
class TransactionCoordinatorTestFixture : public ShardServerTestFixture {
protected:
void setUp() override;
+ void tearDown() override;
/**
* Override the CatalogClient to make CatalogClient::getAllShards automatically return the
diff --git a/src/mongo/db/s/transaction_coordinator_util.cpp b/src/mongo/db/s/transaction_coordinator_util.cpp
index 5baab08bc83..0d3bde20418 100644
--- a/src/mongo/db/s/transaction_coordinator_util.cpp
+++ b/src/mongo/db/s/transaction_coordinator_util.cpp
@@ -48,8 +48,6 @@ namespace mongo {
namespace txn {
namespace {
-MONGO_FAIL_POINT_DEFINE(hangBeforeWaitingForParticipantListWriteConcern);
-MONGO_FAIL_POINT_DEFINE(hangBeforeWaitingForDecisionWriteConcern);
MONGO_FAIL_POINT_DEFINE(hangAfterDeletingCoordinatorDoc);
MONGO_FAIL_POINT_DEFINE(hangBeforeWritingParticipantList);
@@ -58,10 +56,6 @@ MONGO_FAIL_POINT_DEFINE(hangBeforeDeletingCoordinatorDoc);
using ResponseStatus = executor::TaskExecutor::ResponseStatus;
-const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority,
- WriteConcernOptions::SyncMode::UNSET,
- WriteConcernOptions::kNoTimeout);
-
const Backoff kExponentialBackoff(Seconds(1), Milliseconds::max());
const ReadPreferenceSetting kPrimaryReadPreference{ReadPreference::PrimaryOnly};
@@ -92,7 +86,8 @@ std::string buildParticipantListString(const std::vector<ShardId>& participantLi
return ss.str();
}
-bool shouldRetryPersistingCoordinatorState(const Status& responseStatus) {
+template <typename T>
+bool shouldRetryPersistingCoordinatorState(const StatusWith<T>& responseStatus) {
return !responseStatus.isOK() &&
responseStatus != ErrorCodes::TransactionCoordinatorSteppingDown;
}
@@ -100,10 +95,10 @@ bool shouldRetryPersistingCoordinatorState(const Status& responseStatus) {
} // namespace
namespace {
-void persistParticipantListBlocking(OperationContext* opCtx,
- const LogicalSessionId& lsid,
- TxnNumber txnNumber,
- const std::vector<ShardId>& participantList) {
+repl::OpTime persistParticipantListBlocking(OperationContext* opCtx,
+ const LogicalSessionId& lsid,
+ TxnNumber txnNumber,
+ const std::vector<ShardId>& participantList) {
LOG(3) << "Going to write participant list for " << lsid.getId() << ':' << txnNumber;
if (MONGO_FAIL_POINT(hangBeforeWritingParticipantList)) {
@@ -173,37 +168,21 @@ void persistParticipantListBlocking(OperationContext* opCtx,
LOG(3) << "Wrote participant list for " << lsid.getId() << ':' << txnNumber;
- MONGO_FAIL_POINT_BLOCK(hangBeforeWaitingForParticipantListWriteConcern, fp) {
- LOG(0) << "Hit hangBeforeWaitingForParticipantListWriteConcern failpoint";
- const BSONObj& data = fp.getData();
- if (!data["useUninterruptibleSleep"].eoo()) {
- MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangBeforeWaitingForParticipantListWriteConcern);
- } else {
- MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED(
- opCtx, hangBeforeWaitingForParticipantListWriteConcern);
- }
- }
-
- WriteConcernResult unusedWCResult;
- uassertStatusOK(
- waitForWriteConcern(opCtx,
- repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(),
- kMajorityWriteConcern,
- &unusedWCResult));
+ return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
}
} // namespace
-Future<void> persistParticipantsList(txn::AsyncWorkScheduler& scheduler,
- const LogicalSessionId& lsid,
- TxnNumber txnNumber,
- const txn::ParticipantsList& participants) {
+Future<repl::OpTime> persistParticipantsList(txn::AsyncWorkScheduler& scheduler,
+ const LogicalSessionId& lsid,
+ TxnNumber txnNumber,
+ const txn::ParticipantsList& participants) {
return txn::doWhile(
scheduler,
boost::none /* no need for a backoff */,
- [](const Status& s) { return shouldRetryPersistingCoordinatorState(s); },
+ [](const StatusWith<repl::OpTime>& s) { return shouldRetryPersistingCoordinatorState(s); },
[&scheduler, lsid, txnNumber, participants] {
return scheduler.scheduleWork([lsid, txnNumber, participants](OperationContext* opCtx) {
- persistParticipantListBlocking(opCtx, lsid, txnNumber, participants);
+ return persistParticipantListBlocking(opCtx, lsid, txnNumber, participants);
});
});
}
@@ -286,11 +265,11 @@ Future<PrepareVoteConsensus> sendPrepare(ServiceContext* service,
}
namespace {
-void persistDecisionBlocking(OperationContext* opCtx,
- const LogicalSessionId& lsid,
- TxnNumber txnNumber,
- const std::vector<ShardId>& participantList,
- const txn::CoordinatorCommitDecision& decision) {
+repl::OpTime persistDecisionBlocking(OperationContext* opCtx,
+ const LogicalSessionId& lsid,
+ TxnNumber txnNumber,
+ const std::vector<ShardId>& participantList,
+ const txn::CoordinatorCommitDecision& decision) {
const bool isCommit = decision.getDecision() == txn::CommitDecision::kCommit;
LOG(3) << "Going to write decision " << (isCommit ? "commit" : "abort") << " for "
<< lsid.getId() << ':' << txnNumber;
@@ -368,41 +347,25 @@ void persistDecisionBlocking(OperationContext* opCtx,
LOG(3) << "Wrote decision " << (isCommit ? "commit" : "abort") << " for " << lsid.getId() << ':'
<< txnNumber;
- MONGO_FAIL_POINT_BLOCK(hangBeforeWaitingForDecisionWriteConcern, fp) {
- LOG(0) << "Hit hangBeforeWaitingForDecisionWriteConcern failpoint";
- const BSONObj& data = fp.getData();
- if (!data["useUninterruptibleSleep"].eoo()) {
- MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangBeforeWaitingForDecisionWriteConcern);
- } else {
- MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED(
- opCtx, hangBeforeWaitingForDecisionWriteConcern);
- }
- }
-
- WriteConcernResult unusedWCResult;
- uassertStatusOK(
- waitForWriteConcern(opCtx,
- repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(),
- kMajorityWriteConcern,
- &unusedWCResult));
+ return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
}
} // namespace
-Future<void> persistDecision(txn::AsyncWorkScheduler& scheduler,
- const LogicalSessionId& lsid,
- TxnNumber txnNumber,
- const txn::ParticipantsList& participants,
- const txn::CoordinatorCommitDecision& decision) {
- return txn::doWhile(scheduler,
- boost::none /* no need for a backoff */,
- [](const Status& s) { return shouldRetryPersistingCoordinatorState(s); },
- [&scheduler, lsid, txnNumber, participants, decision] {
- return scheduler.scheduleWork(
- [lsid, txnNumber, participants, decision](OperationContext* opCtx) {
- persistDecisionBlocking(
- opCtx, lsid, txnNumber, participants, decision);
- });
- });
+Future<repl::OpTime> persistDecision(txn::AsyncWorkScheduler& scheduler,
+ const LogicalSessionId& lsid,
+ TxnNumber txnNumber,
+ const txn::ParticipantsList& participants,
+ const txn::CoordinatorCommitDecision& decision) {
+ return txn::doWhile(
+ scheduler,
+ boost::none /* no need for a backoff */,
+ [](const StatusWith<repl::OpTime>& s) { return shouldRetryPersistingCoordinatorState(s); },
+ [&scheduler, lsid, txnNumber, participants, decision] {
+ return scheduler.scheduleWork(
+ [lsid, txnNumber, participants, decision](OperationContext* opCtx) {
+ return persistDecisionBlocking(opCtx, lsid, txnNumber, participants, decision);
+ });
+ });
}
Future<void> sendCommit(ServiceContext* service,
diff --git a/src/mongo/db/s/transaction_coordinator_util.h b/src/mongo/db/s/transaction_coordinator_util.h
index d6e87bb4baf..b83afda06e1 100644
--- a/src/mongo/db/s/transaction_coordinator_util.h
+++ b/src/mongo/db/s/transaction_coordinator_util.h
@@ -31,6 +31,7 @@
#include <vector>
+#include "mongo/db/repl/optime.h"
#include "mongo/db/s/transaction_coordinator_document_gen.h"
#include "mongo/db/s/transaction_coordinator_futures_util.h"
@@ -45,17 +46,17 @@ namespace txn {
* participants: ["shard0000", "shard0001"]
* }
*
- * into config.transaction_coordinators and waits for the upsert to be majority-committed.
+ * into config.transaction_coordinators and returns the opTime of the upsert.
*
* Throws if the upsert fails or waiting for writeConcern fails.
*
* If the upsert returns a DuplicateKey error, converts it to an anonymous error, because it means a
* document for the (lsid, txnNumber) exists with a different participant list.
*/
-Future<void> persistParticipantsList(txn::AsyncWorkScheduler& scheduler,
- const LogicalSessionId& lsid,
- TxnNumber txnNumber,
- const txn::ParticipantsList& participants);
+Future<repl::OpTime> persistParticipantsList(txn::AsyncWorkScheduler& scheduler,
+ const LogicalSessionId& lsid,
+ TxnNumber txnNumber,
+ const txn::ParticipantsList& participants);
struct PrepareResponse;
class PrepareVoteConsensus {
@@ -120,7 +121,7 @@ Future<PrepareVoteConsensus> sendPrepare(ServiceContext* service,
* commitTimestamp: Timestamp(xxxxxxxx, x),
* }
*
- * and waits for the update to be majority-committed.
+ * Returns the opTime of the write.
*
* Throws if the update fails or waiting for writeConcern fails.
*
@@ -128,11 +129,11 @@ Future<PrepareVoteConsensus> sendPrepare(ServiceContext* service,
* means either no document for (lsid, txnNumber) exists, or a document exists but has a different
* participant list, different decision, or different commit Timestamp.
*/
-Future<void> persistDecision(txn::AsyncWorkScheduler& scheduler,
- const LogicalSessionId& lsid,
- TxnNumber txnNumber,
- const txn::ParticipantsList& participants,
- const txn::CoordinatorCommitDecision& decision);
+Future<repl::OpTime> persistDecision(txn::AsyncWorkScheduler& scheduler,
+ const LogicalSessionId& lsid,
+ TxnNumber txnNumber,
+ const txn::ParticipantsList& participants,
+ const txn::CoordinatorCommitDecision& decision);
/**
* Sends commit to all shards and returns a future that will be resolved when all participants have
diff --git a/src/mongo/db/s/wait_for_majority_service.cpp b/src/mongo/db/s/wait_for_majority_service.cpp
new file mode 100644
index 00000000000..0625a84b611
--- /dev/null
+++ b/src/mongo/db/s/wait_for_majority_service.cpp
@@ -0,0 +1,188 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/s/wait_for_majority_service.h"
+
+#include <utility>
+
+#include "mongo/db/service_context.h"
+#include "mongo/db/write_concern.h"
+#include "mongo/executor/network_interface_factory.h"
+#include "mongo/executor/thread_pool_task_executor.h"
+#include "mongo/util/concurrency/thread_pool.h"
+
+namespace mongo {
+
+namespace {
+const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority,
+ WriteConcernOptions::SyncMode::UNSET,
+ WriteConcernOptions::kNoTimeout);
+
+const auto waitForMajorityServiceDecoration =
+ ServiceContext::declareDecoration<WaitForMajorityService>();
+} // namespace
+
+WaitForMajorityService::~WaitForMajorityService() {
+ shutDown();
+}
+
+WaitForMajorityService& WaitForMajorityService::get(ServiceContext* service) {
+ return waitForMajorityServiceDecoration(service);
+}
+
+void WaitForMajorityService::setUp(ServiceContext* service) {
+ stdx::lock_guard lk(_mutex);
+
+ if (!_thread.joinable() && !_inShutDown) {
+ _thread = stdx::thread([this, service] { _periodicallyWaitForMajority(service); });
+ }
+}
+
+void WaitForMajorityService::shutDown() {
+ {
+ stdx::lock_guard lk(_mutex);
+
+ if (std::exchange(_inShutDown, true)) {
+ return;
+ }
+
+ if (_opCtx) {
+ stdx::lock_guard scopedClientLock(*_opCtx->getClient());
+ _opCtx->getServiceContext()->killOperation(
+ scopedClientLock, _opCtx, ErrorCodes::InterruptedAtShutdown);
+ }
+ }
+
+ if (_thread.joinable()) {
+ _thread.join();
+ }
+
+ stdx::lock_guard lk(_mutex);
+ for (auto&& pendingRequest : _queuedOpTimes) {
+ pendingRequest.second.setError(
+ {ErrorCodes::InterruptedAtShutdown, "Shutting down wait for majority service"});
+ }
+
+ _queuedOpTimes.clear();
+}
+
+SharedSemiFuture<void> WaitForMajorityService::waitUntilMajority(const repl::OpTime& opTime) {
+ stdx::lock_guard lk(_mutex);
+
+ if (_inShutDown) {
+ return {Future<void>::makeReady(
+ Status{ErrorCodes::ShutdownInProgress,
+ "rejecting wait for majority request due to server shutdown"})};
+ }
+
+ // Background thread must be running before requesting.
+ invariant(_thread.joinable());
+
+ if (_lastOpTimeWaited >= opTime) {
+ return {Future<void>::makeReady()};
+ }
+
+ auto iter = _queuedOpTimes.lower_bound(opTime);
+ if (iter != _queuedOpTimes.end()) {
+ if (iter->first == opTime) {
+ return iter->second.getFuture();
+ }
+ }
+
+ if (iter == _queuedOpTimes.begin()) {
+ // Background thread could already be actively waiting on a later time, so tell it to stop
+ // and wait for the newly requested opTime instead.
+ if (_opCtx) {
+ stdx::lock_guard scopedClientLock(*_opCtx->getClient());
+ _opCtx->getServiceContext()->killOperation(
+ scopedClientLock, _opCtx, ErrorCodes::WaitForMajorityServiceEarlierOpTimeAvailable);
+ }
+ }
+
+ const bool wasEmpty = _queuedOpTimes.empty();
+ auto resultIter = _queuedOpTimes.emplace_hint(
+ iter, std::piecewise_construct, std::forward_as_tuple(opTime), std::forward_as_tuple());
+
+ if (wasEmpty) {
+ _hasNewOpTimeCV.notify_one();
+ }
+
+ return resultIter->second.getFuture();
+}
+
+void WaitForMajorityService::_periodicallyWaitForMajority(ServiceContext* service) {
+ ThreadClient tc("waitForMajority", service);
+
+ stdx::unique_lock lk(_mutex);
+
+ while (!_inShutDown) {
+ auto opCtx = tc->makeOperationContext();
+ _opCtx = opCtx.get();
+
+ if (!_queuedOpTimes.empty()) {
+ auto lowestOpTimeIter = _queuedOpTimes.begin();
+ auto lowestOpTime = lowestOpTimeIter->first;
+
+ lk.unlock();
+
+ WriteConcernResult ignoreResult;
+ auto status =
+ waitForWriteConcern(_opCtx, lowestOpTime, kMajorityWriteConcern, &ignoreResult);
+
+ lk.lock();
+
+ if (status.isOK()) {
+ _lastOpTimeWaited = lowestOpTime;
+ }
+
+ if (status == ErrorCodes::WaitForMajorityServiceEarlierOpTimeAvailable) {
+ _opCtx = nullptr;
+ continue;
+ }
+
+ if (status.isOK()) {
+ lowestOpTimeIter->second.emplaceValue();
+ } else {
+ lowestOpTimeIter->second.setError(status);
+ }
+
+ _queuedOpTimes.erase(lowestOpTimeIter);
+ }
+
+ if (_queuedOpTimes.empty() && !_inShutDown) {
+ _opCtx->waitForConditionOrInterruptNoAssert(_hasNewOpTimeCV, lk).ignore();
+ }
+
+ _opCtx = nullptr;
+ }
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/s/wait_for_majority_service.h b/src/mongo/db/s/wait_for_majority_service.h
new file mode 100644
index 00000000000..970b475d0d3
--- /dev/null
+++ b/src/mongo/db/s/wait_for_majority_service.h
@@ -0,0 +1,100 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <map>
+#include <memory>
+#include <vector>
+
+#include "mongo/db/repl/optime.h"
+#include "mongo/db/service_context.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/util/future.h"
+
+namespace mongo {
+
+/**
+ * Provides a facility for asynchronously waiting for a local opTime to be majority committed.
+ */
+class WaitForMajorityService {
+public:
+ ~WaitForMajorityService();
+
+ static WaitForMajorityService& get(ServiceContext* service);
+
+ /**
+ * Sets up the background thread responsible for waiting for opTimes to be majority committed.
+ */
+ void setUp(ServiceContext* service);
+
+ /**
+ * Blocking method, which shuts down and joins the background thread.
+ */
+ void shutDown();
+
+ /**
+ * Enqueue a request to wait for the given opTime to be majority committed.
+ */
+ SharedSemiFuture<void> waitUntilMajority(const repl::OpTime& opTime);
+
+private:
+ using OpTimeWaitingMap = std::map<repl::OpTime, SharedPromise<void>>;
+
+ /**
+ * Periodically checks the list of opTimes waiting to be majority committed.
+ */
+ void _periodicallyWaitForMajority(ServiceContext* service);
+
+ stdx::mutex _mutex;
+
+ // Contains an ordered list of opTimes waiting to be majority committed.
+ OpTimeWaitingMap _queuedOpTimes;
+
+ // Contains the last opTime that the background thread was able to successfully wait to be
+ // majority committed.
+ repl::OpTime _lastOpTimeWaited;
+
+ // The background thread.
+ stdx::thread _thread;
+
+ // Used for signalling new opTime requests being queued.
+ stdx::condition_variable _hasNewOpTimeCV;
+
+ // If set, contains a reference to the opCtx being used by the background thread.
+ // Only valid when _thread.joinable() and not nullptr.
+ OperationContext* _opCtx{nullptr};
+
+ // Flag is set to true after shutDown() is called.
+ bool _inShutDown{false};
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/s/wait_for_majority_service_test.cpp b/src/mongo/db/s/wait_for_majority_service_test.cpp
new file mode 100644
index 00000000000..27e9c5ac75b
--- /dev/null
+++ b/src/mongo/db/s/wait_for_majority_service_test.cpp
@@ -0,0 +1,247 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/repl/replication_coordinator_mock.h"
+#include "mongo/db/s/wait_for_majority_service.h"
+#include "mongo/db/service_context_d_test_fixture.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+class WaitForMajorityServiceTest : public ServiceContextTest {
+public:
+ void setUp() override {
+ auto service = getServiceContext();
+ waitService()->setUp(service);
+
+ auto replCoord = std::make_unique<repl::ReplicationCoordinatorMock>(service);
+
+ replCoord->setAwaitReplicationReturnValueFunction([this](OperationContext* opCtx,
+ const repl::OpTime& opTime) {
+ waitForWriteConcernStub(opCtx, opTime);
+ return repl::ReplicationCoordinator::StatusAndDuration(Status::OK(), Milliseconds(0));
+ });
+
+ repl::ReplicationCoordinator::set(service, std::move(replCoord));
+ }
+
+ void tearDown() override {
+ waitService()->shutDown();
+ }
+
+ WaitForMajorityService* waitService() {
+ return &_waitForMajorityService;
+ }
+
+ void finishWaitingOneOpTime() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _isTestReady = true;
+ _isTestReadyCV.notify_one();
+
+ while (_isTestReady) {
+ _finishWaitingOneOpTimeCV.wait(lk);
+ }
+ }
+
+ void waitForWriteConcernStub(OperationContext* opCtx, const repl::OpTime& opTime) {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+
+ while (!_isTestReady) {
+ if (!opCtx->waitForConditionOrInterruptNoAssert(_isTestReadyCV, lk).isOK()) {
+ return;
+ }
+ }
+
+ _lastOpTimeWaited = opTime;
+ _isTestReady = false;
+ _finishWaitingOneOpTimeCV.notify_one();
+ }
+
+ const repl::OpTime& getLastOpTimeWaited() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _lastOpTimeWaited;
+ }
+
+private:
+ WaitForMajorityService _waitForMajorityService;
+
+ stdx::mutex _mutex;
+ stdx::condition_variable _isTestReadyCV;
+ stdx::condition_variable _finishWaitingOneOpTimeCV;
+
+ bool _isTestReady{false};
+ repl::OpTime _lastOpTimeWaited;
+};
+
+TEST_F(WaitForMajorityServiceTest, WaitOneOpTime) {
+ repl::OpTime t1(Timestamp(1, 0), 2);
+
+ auto future = waitService()->waitUntilMajority(t1);
+
+ ASSERT_FALSE(future.isReady());
+
+ finishWaitingOneOpTime();
+
+ future.get();
+ ASSERT_EQ(t1, getLastOpTimeWaited());
+}
+
+TEST_F(WaitForMajorityServiceTest, WaitWithSameOpTime) {
+ repl::OpTime t1(Timestamp(1, 0), 2);
+
+ auto future1 = waitService()->waitUntilMajority(t1);
+ auto future1b = waitService()->waitUntilMajority(t1);
+
+ ASSERT_FALSE(future1.isReady());
+ ASSERT_FALSE(future1b.isReady());
+
+ finishWaitingOneOpTime();
+
+ future1.get();
+ future1b.get();
+
+ ASSERT_EQ(t1, getLastOpTimeWaited());
+}
+
+TEST_F(WaitForMajorityServiceTest, WaitWithOpTimeEarlierThanLowestQueued) {
+ repl::OpTime laterOpTime(Timestamp(6, 0), 2);
+ repl::OpTime earlierOpTime(Timestamp(1, 0), 2);
+
+ auto laterFuture = waitService()->waitUntilMajority(laterOpTime);
+ auto earlierFuture = waitService()->waitUntilMajority(earlierOpTime);
+
+ ASSERT_FALSE(laterFuture.isReady());
+ ASSERT_FALSE(earlierFuture.isReady());
+
+ finishWaitingOneOpTime();
+
+ ASSERT_FALSE(laterFuture.isReady());
+
+ earlierFuture.get();
+ ASSERT_EQ(earlierOpTime, getLastOpTimeWaited());
+
+ finishWaitingOneOpTime();
+ laterFuture.get();
+
+ ASSERT_EQ(laterOpTime, getLastOpTimeWaited());
+}
+
+TEST_F(WaitForMajorityServiceTest, WaitWithDifferentOpTime) {
+ repl::OpTime t1(Timestamp(1, 0), 2);
+ repl::OpTime t2(Timestamp(14, 0), 2);
+
+ auto future1 = waitService()->waitUntilMajority(t1);
+ auto future2 = waitService()->waitUntilMajority(t2);
+
+ ASSERT_FALSE(future1.isReady());
+ ASSERT_FALSE(future2.isReady());
+
+ finishWaitingOneOpTime();
+
+ ASSERT_FALSE(future2.isReady());
+
+ future1.get();
+ ASSERT_EQ(t1, getLastOpTimeWaited());
+
+ finishWaitingOneOpTime();
+
+ future2.get();
+ ASSERT_EQ(t2, getLastOpTimeWaited());
+}
+
+TEST_F(WaitForMajorityServiceTest, WaitWithOpTimeEarlierThanOpTimeAlreadyWaited) {
+ repl::OpTime t1(Timestamp(5, 0), 2);
+ repl::OpTime t2(Timestamp(14, 0), 2);
+
+ auto future1 = waitService()->waitUntilMajority(t1);
+ auto future2 = waitService()->waitUntilMajority(t2);
+
+ ASSERT_FALSE(future1.isReady());
+ ASSERT_FALSE(future2.isReady());
+
+ finishWaitingOneOpTime();
+
+ ASSERT_FALSE(future2.isReady());
+
+ future1.get();
+ ASSERT_EQ(t1, getLastOpTimeWaited());
+
+ repl::OpTime oldTs(Timestamp(4, 0), 2);
+ auto oldFuture = waitService()->waitUntilMajority(oldTs);
+ auto alreadyWaitedFuture = waitService()->waitUntilMajority(t1);
+
+ ASSERT_FALSE(future2.isReady());
+
+ oldFuture.get();
+ alreadyWaitedFuture.get();
+ ASSERT_EQ(t1, getLastOpTimeWaited());
+
+ finishWaitingOneOpTime();
+
+ future2.get();
+ ASSERT_EQ(t2, getLastOpTimeWaited());
+}
+
+TEST_F(WaitForMajorityServiceTest, ShutdownShouldCancelQueuedRequests) {
+ repl::OpTime t1(Timestamp(5, 0), 2);
+ repl::OpTime t2(Timestamp(14, 0), 2);
+
+ auto future1 = waitService()->waitUntilMajority(t1);
+ auto future2 = waitService()->waitUntilMajority(t2);
+
+ ASSERT_FALSE(future1.isReady());
+ ASSERT_FALSE(future2.isReady());
+
+ waitService()->shutDown();
+
+ ASSERT_THROWS_CODE(future1.get(), AssertionException, ErrorCodes::InterruptedAtShutdown);
+ ASSERT_THROWS_CODE(future2.get(), AssertionException, ErrorCodes::InterruptedAtShutdown);
+}
+
+TEST_F(WaitForMajorityServiceTest, WriteConcernErrorGetsPropagatedCorrectly) {
+ repl::OpTime t(Timestamp(5, 0), 2);
+
+ auto replCoord = dynamic_cast<repl::ReplicationCoordinatorMock*>(
+ repl::ReplicationCoordinator::get(getServiceContext()));
+ replCoord->setAwaitReplicationReturnValueFunction(
+ [this](OperationContext* opCtx, const repl::OpTime& opTime) {
+ return repl::ReplicationCoordinator::StatusAndDuration(
+ {ErrorCodes::PrimarySteppedDown, "test stepdown"}, Milliseconds(0));
+ });
+
+ auto future = waitService()->waitUntilMajority(t);
+ ASSERT_THROWS_CODE(future.get(), AssertionException, ErrorCodes::PrimarySteppedDown);
+}
+
+} // namespace
+} // namespace mongo