summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/replication_coordinator_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_impl.cpp')
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp39
1 files changed, 38 insertions, 1 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 2a8eec4863b..74adb1afb02 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -1059,7 +1059,6 @@ void ReplicationCoordinatorImpl::shutdown(OperationContext* opCtx) {
_initialSyncer.swap(initialSyncerCopy);
}
-
// joining the replication executor is blocking so it must be run outside of the mutex
if (initialSyncerCopy) {
LOGV2_DEBUG(
@@ -1074,6 +1073,17 @@ void ReplicationCoordinatorImpl::shutdown(OperationContext* opCtx) {
initialSyncerCopy->join();
initialSyncerCopy.reset();
}
+
+ {
+ stdx::unique_lock<Latch> lk(_mutex);
+ if (_finishedDrainingPromise) {
+ _finishedDrainingPromise->setError(
+ {ErrorCodes::InterruptedAtShutdown,
+ "Cancelling wait for drain mode to complete due to shutdown"});
+ _finishedDrainingPromise = boost::none;
+ }
+ }
+
_externalState->shutdown(opCtx);
_replExecutor->shutdown();
_replExecutor->join();
@@ -1192,6 +1202,23 @@ ReplicationCoordinator::ApplierState ReplicationCoordinatorImpl::getApplierState
void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* opCtx,
long long termWhenBufferIsEmpty) noexcept {
+ {
+ stdx::unique_lock<Latch> lk(_mutex);
+ if (_applierState == ReplicationCoordinator::ApplierState::DrainingForShardSplit) {
+ _applierState = ApplierState::Stopped;
+ auto memberState = _getMemberState_inlock();
+ invariant(memberState.secondary() || memberState.startup());
+ _externalState->onDrainComplete(opCtx);
+
+ if (_finishedDrainingPromise) {
+ _finishedDrainingPromise->emplaceValue();
+ _finishedDrainingPromise = boost::none;
+ }
+
+ return;
+ }
+ }
+
// This logic is a little complicated in order to avoid acquiring the RSTL in mode X
// unnecessarily. This is important because the applier may call signalDrainComplete()
// whenever it wants, not only when the ReplicationCoordinator is expecting it.
@@ -4844,6 +4871,16 @@ void ReplicationCoordinatorImpl::_enterDrainMode_inlock() {
_externalState->stopProducer();
}
+Future<void> ReplicationCoordinatorImpl::_drainForShardSplit() {
+ stdx::lock_guard<Latch> lk(_mutex);
+ invariant(!_finishedDrainingPromise.has_value());
+ auto [promise, future] = makePromiseFuture<void>();
+ _finishedDrainingPromise = std::move(promise);
+ _applierState = ApplierState::DrainingForShardSplit;
+ _externalState->stopProducer();
+ return std::move(future);
+}
+
ReplicationCoordinatorImpl::PostMemberStateUpdateAction
ReplicationCoordinatorImpl::_setCurrentRSConfig(WithLock lk,
OperationContext* opCtx,