diff options
Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_impl.cpp')
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 51 |
1 files changed, 42 insertions, 9 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 7f433b3e837..aafca514fd2 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -879,6 +879,7 @@ void ReplicationCoordinatorImpl::shutdown(OperationContext* opCtx) { } _replicationWaiterList.signalAll_inlock(); _opTimeWaiterList.signalAll_inlock(); + _wMajorityWriteAvailabilityWaiter.reset(); _currentCommittedSnapshotCond.notify_all(); _initialSyncer.swap(initialSyncerCopy); } @@ -2118,6 +2119,7 @@ void ReplicationCoordinatorImpl::stepDown(OperationContext* opCtx, // Clear the node's election candidate metrics since it is no longer primary. ReplicationMetrics::get(opCtx).clearElectionCandidateMetrics(); + _wMajorityWriteAvailabilityWaiter.reset(); _topCoord->finishUnconditionalStepDown(); @@ -2702,6 +2704,7 @@ void ReplicationCoordinatorImpl::_finishReplSetReconfig(OperationContext* opCtx, // Clear the node's election candidate metrics since it is no longer primary. ReplicationMetrics::get(opCtx).clearElectionCandidateMetrics(); + _wMajorityWriteAvailabilityWaiter.reset(); } else { // Release the rstl lock as the node might have stepped down due to // other unconditional step down code paths like learning new term via heartbeat & @@ -3181,7 +3184,7 @@ void ReplicationCoordinatorImpl::incrementNumCatchUpOpsIfCatchingUp(int numOps) void ReplicationCoordinatorImpl::signalDropPendingCollectionsRemovedFromStorage() { stdx::lock_guard<stdx::mutex> lock(_mutex); - _wakeReadyWaiters_inlock(); + _wakeReadyWaiters(lock); } boost::optional<Timestamp> ReplicationCoordinatorImpl::getRecoveryTimestamp() { @@ -3273,10 +3276,26 @@ ReplicationCoordinatorImpl::_setCurrentRSConfig(WithLock lk, return action; } -void ReplicationCoordinatorImpl::_wakeReadyWaiters_inlock() { +void ReplicationCoordinatorImpl::_wakeReadyWaiters(WithLock lk) { _replicationWaiterList.signalIf_inlock([this](Waiter* waiter) { return _doneWaitingForReplication_inlock(waiter->opTime, *waiter->writeConcern); }); + + if (_wMajorityWriteAvailabilityWaiter) { + WriteConcernOptions kMajorityWriteConcern( + WriteConcernOptions::kMajority, + WriteConcernOptions::SyncMode::UNSET, + // The timeout isn't used by _doneWaitingForReplication_inlock. + WriteConcernOptions::kNoTimeout); + kMajorityWriteConcern = + _populateUnsetWriteConcernOptionsSyncMode(lk, kMajorityWriteConcern); + + if (_doneWaitingForReplication_inlock(_wMajorityWriteAvailabilityWaiter->opTime, + kMajorityWriteConcern)) { + _wMajorityWriteAvailabilityWaiter->notify_inlock(); + _wMajorityWriteAvailabilityWaiter.reset(); + } + } } Status ReplicationCoordinatorImpl::processReplSetUpdatePosition(const UpdatePositionArgs& updates, @@ -3504,7 +3523,7 @@ void ReplicationCoordinatorImpl::_updateLastCommittedOpTimeAndWallTime(WithLock // check satisfied. We must do this regardless of whether we updated the lastCommittedOpTime, // as lastCommittedOpTime may be based on durable optimes whereas some waiters may be // waiting on applied (but not necessarily durable) optimes. - _wakeReadyWaiters_inlock(); + _wakeReadyWaiters(lk); } boost::optional<OpTimeAndWallTime> ReplicationCoordinatorImpl::_chooseStableOpTimeFromCandidates( @@ -3650,7 +3669,7 @@ void ReplicationCoordinatorImpl::_setStableTimestampForStorage(WithLock lk) { if (serverGlobalParams.enableMajorityReadConcern) { // When majority read concern is enabled, the committed snapshot is set to the new // stable optime. - if (_updateCommittedSnapshot_inlock(stableOpTime.value())) { + if (_updateCommittedSnapshot(lk, stableOpTime.value())) { // Update the stable timestamp for the storage engine. _storage->setStableTimestamp(getServiceContext(), stableOpTime->opTime.getTimestamp()); @@ -3666,7 +3685,7 @@ void ReplicationCoordinatorImpl::_setStableTimestampForStorage(WithLock lk) { // lastCommittedOpTime is set to be the lastApplied which can be ahead of the // allCommitted. auto newCommittedSnapshot = std::min(lastCommittedOpTime, *stableOpTime); - _updateCommittedSnapshot_inlock(newCommittedSnapshot); + _updateCommittedSnapshot(lk, newCommittedSnapshot); } // Set the stable timestamp regardless of whether the majority commit point moved // forward. @@ -3923,8 +3942,17 @@ size_t ReplicationCoordinatorImpl::getNumUncommittedSnapshots() { return _uncommittedSnapshotsSize.load(); } -bool ReplicationCoordinatorImpl::_updateCommittedSnapshot_inlock( - const OpTimeAndWallTime& newCommittedSnapshot) { +void ReplicationCoordinatorImpl::createWMajorityWriteAvailabilityDateWaiter(OpTime opTime) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + auto opTimeCB = [this, opTime]() { + ReplicationMetrics::get(getServiceContext()) + .setWMajorityWriteAvailabilityDate(_replExecutor->now()); + }; + _wMajorityWriteAvailabilityWaiter = std::make_unique<CallbackWaiter>(opTime, opTimeCB); +} + +bool ReplicationCoordinatorImpl::_updateCommittedSnapshot( + WithLock lk, const OpTimeAndWallTime& newCommittedSnapshot) { if (gTestingSnapshotBehaviorInIsolation) { return false; } @@ -3956,7 +3984,7 @@ bool ReplicationCoordinatorImpl::_updateCommittedSnapshot_inlock( _externalState->updateCommittedSnapshot(newCommittedSnapshot.opTime); // Wake up any threads waiting for read concern or write concern. - _wakeReadyWaiters_inlock(); + _wakeReadyWaiters(lk); return true; } @@ -4007,11 +4035,16 @@ EventHandle ReplicationCoordinatorImpl::_makeEvent() { WriteConcernOptions ReplicationCoordinatorImpl::populateUnsetWriteConcernOptionsSyncMode( WriteConcernOptions wc) { + stdx::lock_guard<stdx::mutex> lock(_mutex); + return _populateUnsetWriteConcernOptionsSyncMode(lock, wc); +} +WriteConcernOptions ReplicationCoordinatorImpl::_populateUnsetWriteConcernOptionsSyncMode( + WithLock lk, WriteConcernOptions wc) { WriteConcernOptions writeConcern(wc); if (writeConcern.syncMode == WriteConcernOptions::SyncMode::UNSET) { if (writeConcern.wMode == WriteConcernOptions::kMajority && - getWriteConcernMajorityShouldJournal()) { + getWriteConcernMajorityShouldJournal_inlock()) { writeConcern.syncMode = WriteConcernOptions::SyncMode::JOURNAL; } else { writeConcern.syncMode = WriteConcernOptions::SyncMode::NONE; |