summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/replication_coordinator_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_impl.cpp')
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp51
1 files changed, 42 insertions, 9 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 7f433b3e837..aafca514fd2 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -879,6 +879,7 @@ void ReplicationCoordinatorImpl::shutdown(OperationContext* opCtx) {
}
_replicationWaiterList.signalAll_inlock();
_opTimeWaiterList.signalAll_inlock();
+ _wMajorityWriteAvailabilityWaiter.reset();
_currentCommittedSnapshotCond.notify_all();
_initialSyncer.swap(initialSyncerCopy);
}
@@ -2118,6 +2119,7 @@ void ReplicationCoordinatorImpl::stepDown(OperationContext* opCtx,
// Clear the node's election candidate metrics since it is no longer primary.
ReplicationMetrics::get(opCtx).clearElectionCandidateMetrics();
+ _wMajorityWriteAvailabilityWaiter.reset();
_topCoord->finishUnconditionalStepDown();
@@ -2702,6 +2704,7 @@ void ReplicationCoordinatorImpl::_finishReplSetReconfig(OperationContext* opCtx,
// Clear the node's election candidate metrics since it is no longer primary.
ReplicationMetrics::get(opCtx).clearElectionCandidateMetrics();
+ _wMajorityWriteAvailabilityWaiter.reset();
} else {
// Release the rstl lock as the node might have stepped down due to
// other unconditional step down code paths like learning new term via heartbeat &
@@ -3181,7 +3184,7 @@ void ReplicationCoordinatorImpl::incrementNumCatchUpOpsIfCatchingUp(int numOps)
void ReplicationCoordinatorImpl::signalDropPendingCollectionsRemovedFromStorage() {
stdx::lock_guard<stdx::mutex> lock(_mutex);
- _wakeReadyWaiters_inlock();
+ _wakeReadyWaiters(lock);
}
boost::optional<Timestamp> ReplicationCoordinatorImpl::getRecoveryTimestamp() {
@@ -3273,10 +3276,26 @@ ReplicationCoordinatorImpl::_setCurrentRSConfig(WithLock lk,
return action;
}
-void ReplicationCoordinatorImpl::_wakeReadyWaiters_inlock() {
+void ReplicationCoordinatorImpl::_wakeReadyWaiters(WithLock lk) {
_replicationWaiterList.signalIf_inlock([this](Waiter* waiter) {
return _doneWaitingForReplication_inlock(waiter->opTime, *waiter->writeConcern);
});
+
+ if (_wMajorityWriteAvailabilityWaiter) {
+ WriteConcernOptions kMajorityWriteConcern(
+ WriteConcernOptions::kMajority,
+ WriteConcernOptions::SyncMode::UNSET,
+ // The timeout isn't used by _doneWaitingForReplication_inlock.
+ WriteConcernOptions::kNoTimeout);
+ kMajorityWriteConcern =
+ _populateUnsetWriteConcernOptionsSyncMode(lk, kMajorityWriteConcern);
+
+ if (_doneWaitingForReplication_inlock(_wMajorityWriteAvailabilityWaiter->opTime,
+ kMajorityWriteConcern)) {
+ _wMajorityWriteAvailabilityWaiter->notify_inlock();
+ _wMajorityWriteAvailabilityWaiter.reset();
+ }
+ }
}
Status ReplicationCoordinatorImpl::processReplSetUpdatePosition(const UpdatePositionArgs& updates,
@@ -3504,7 +3523,7 @@ void ReplicationCoordinatorImpl::_updateLastCommittedOpTimeAndWallTime(WithLock
// check satisfied. We must do this regardless of whether we updated the lastCommittedOpTime,
// as lastCommittedOpTime may be based on durable optimes whereas some waiters may be
// waiting on applied (but not necessarily durable) optimes.
- _wakeReadyWaiters_inlock();
+ _wakeReadyWaiters(lk);
}
boost::optional<OpTimeAndWallTime> ReplicationCoordinatorImpl::_chooseStableOpTimeFromCandidates(
@@ -3650,7 +3669,7 @@ void ReplicationCoordinatorImpl::_setStableTimestampForStorage(WithLock lk) {
if (serverGlobalParams.enableMajorityReadConcern) {
// When majority read concern is enabled, the committed snapshot is set to the new
// stable optime.
- if (_updateCommittedSnapshot_inlock(stableOpTime.value())) {
+ if (_updateCommittedSnapshot(lk, stableOpTime.value())) {
// Update the stable timestamp for the storage engine.
_storage->setStableTimestamp(getServiceContext(),
stableOpTime->opTime.getTimestamp());
@@ -3666,7 +3685,7 @@ void ReplicationCoordinatorImpl::_setStableTimestampForStorage(WithLock lk) {
// lastCommittedOpTime is set to be the lastApplied which can be ahead of the
// allCommitted.
auto newCommittedSnapshot = std::min(lastCommittedOpTime, *stableOpTime);
- _updateCommittedSnapshot_inlock(newCommittedSnapshot);
+ _updateCommittedSnapshot(lk, newCommittedSnapshot);
}
// Set the stable timestamp regardless of whether the majority commit point moved
// forward.
@@ -3923,8 +3942,17 @@ size_t ReplicationCoordinatorImpl::getNumUncommittedSnapshots() {
return _uncommittedSnapshotsSize.load();
}
-bool ReplicationCoordinatorImpl::_updateCommittedSnapshot_inlock(
- const OpTimeAndWallTime& newCommittedSnapshot) {
+void ReplicationCoordinatorImpl::createWMajorityWriteAvailabilityDateWaiter(OpTime opTime) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ auto opTimeCB = [this, opTime]() {
+ ReplicationMetrics::get(getServiceContext())
+ .setWMajorityWriteAvailabilityDate(_replExecutor->now());
+ };
+ _wMajorityWriteAvailabilityWaiter = std::make_unique<CallbackWaiter>(opTime, opTimeCB);
+}
+
+bool ReplicationCoordinatorImpl::_updateCommittedSnapshot(
+ WithLock lk, const OpTimeAndWallTime& newCommittedSnapshot) {
if (gTestingSnapshotBehaviorInIsolation) {
return false;
}
@@ -3956,7 +3984,7 @@ bool ReplicationCoordinatorImpl::_updateCommittedSnapshot_inlock(
_externalState->updateCommittedSnapshot(newCommittedSnapshot.opTime);
// Wake up any threads waiting for read concern or write concern.
- _wakeReadyWaiters_inlock();
+ _wakeReadyWaiters(lk);
return true;
}
@@ -4007,11 +4035,16 @@ EventHandle ReplicationCoordinatorImpl::_makeEvent() {
WriteConcernOptions ReplicationCoordinatorImpl::populateUnsetWriteConcernOptionsSyncMode(
WriteConcernOptions wc) {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ return _populateUnsetWriteConcernOptionsSyncMode(lock, wc);
+}
+WriteConcernOptions ReplicationCoordinatorImpl::_populateUnsetWriteConcernOptionsSyncMode(
+ WithLock lk, WriteConcernOptions wc) {
WriteConcernOptions writeConcern(wc);
if (writeConcern.syncMode == WriteConcernOptions::SyncMode::UNSET) {
if (writeConcern.wMode == WriteConcernOptions::kMajority &&
- getWriteConcernMajorityShouldJournal()) {
+ getWriteConcernMajorityShouldJournal_inlock()) {
writeConcern.syncMode = WriteConcernOptions::SyncMode::JOURNAL;
} else {
writeConcern.syncMode = WriteConcernOptions::SyncMode::NONE;