diff options
Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_impl.cpp')
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 39 |
1 files changed, 38 insertions, 1 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 2a8eec4863b..74adb1afb02 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -1059,7 +1059,6 @@ void ReplicationCoordinatorImpl::shutdown(OperationContext* opCtx) { _initialSyncer.swap(initialSyncerCopy); } - // joining the replication executor is blocking so it must be run outside of the mutex if (initialSyncerCopy) { LOGV2_DEBUG( @@ -1074,6 +1073,17 @@ void ReplicationCoordinatorImpl::shutdown(OperationContext* opCtx) { initialSyncerCopy->join(); initialSyncerCopy.reset(); } + + { + stdx::unique_lock<Latch> lk(_mutex); + if (_finishedDrainingPromise) { + _finishedDrainingPromise->setError( + {ErrorCodes::InterruptedAtShutdown, + "Cancelling wait for drain mode to complete due to shutdown"}); + _finishedDrainingPromise = boost::none; + } + } + _externalState->shutdown(opCtx); _replExecutor->shutdown(); _replExecutor->join(); @@ -1192,6 +1202,23 @@ ReplicationCoordinator::ApplierState ReplicationCoordinatorImpl::getApplierState void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* opCtx, long long termWhenBufferIsEmpty) noexcept { + { + stdx::unique_lock<Latch> lk(_mutex); + if (_applierState == ReplicationCoordinator::ApplierState::DrainingForShardSplit) { + _applierState = ApplierState::Stopped; + auto memberState = _getMemberState_inlock(); + invariant(memberState.secondary() || memberState.startup()); + _externalState->onDrainComplete(opCtx); + + if (_finishedDrainingPromise) { + _finishedDrainingPromise->emplaceValue(); + _finishedDrainingPromise = boost::none; + } + + return; + } + } + // This logic is a little complicated in order to avoid acquiring the RSTL in mode X // unnecessarily. This is important because the applier may call signalDrainComplete() // whenever it wants, not only when the ReplicationCoordinator is expecting it. @@ -4844,6 +4871,16 @@ void ReplicationCoordinatorImpl::_enterDrainMode_inlock() { _externalState->stopProducer(); } +Future<void> ReplicationCoordinatorImpl::_drainForShardSplit() { + stdx::lock_guard<Latch> lk(_mutex); + invariant(!_finishedDrainingPromise.has_value()); + auto [promise, future] = makePromiseFuture<void>(); + _finishedDrainingPromise = std::move(promise); + _applierState = ApplierState::DrainingForShardSplit; + _externalState->stopProducer(); + return std::move(future); +} + ReplicationCoordinatorImpl::PostMemberStateUpdateAction ReplicationCoordinatorImpl::_setCurrentRSConfig(WithLock lk, OperationContext* opCtx, |