summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp18
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.h4
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp10
3 files changed, 16 insertions, 16 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 78141e4987e..851c91dac40 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -921,9 +921,8 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn) {
return;
}
invariant(!_isCatchingUp);
- invariant(!_canAcceptNonLocalWrites);
_isWaitingForDrainToComplete = false;
- _drainFinishedCond_forTest.notify_all();
+ _drainFinishedCond.notify_all();
if (!_getMemberState_inlock().primary()) {
// We must have decided not to transition to primary while waiting for the applier to drain.
@@ -931,6 +930,7 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn) {
return;
}
+ invariant(!_canAcceptNonLocalWrites);
_canAcceptNonLocalWrites = true;
lk.unlock();
@@ -952,7 +952,7 @@ Status ReplicationCoordinatorImpl::waitForDrainFinish(Milliseconds timeout) {
stdx::unique_lock<stdx::mutex> lk(_mutex);
auto pred = [this]() { return !_isCatchingUp && !_isWaitingForDrainToComplete; };
- if (!_drainFinishedCond_forTest.wait_for(lk, timeout.toSystemDuration(), pred)) {
+ if (!_drainFinishedCond.wait_for(lk, timeout.toSystemDuration(), pred)) {
return Status(ErrorCodes::ExceededTimeLimit,
"Timed out waiting to finish draining applier buffer");
}
@@ -2543,12 +2543,10 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator_inlock() {
_replicationWaiterList.signalAndRemoveAll_inlock();
// Wake up the optime waiter that is waiting for primary catch-up to finish.
_opTimeWaiterList.signalAndRemoveAll_inlock();
- // Clean up primary states.
+ // _isCatchingUp and _isWaitingForDrainToComplete could be cleaned up asynchronously
+ // by freshness scan.
_canAcceptNonLocalWrites = false;
- _isCatchingUp = false;
- _isWaitingForDrainToComplete = false;
_stepDownPending = false;
- _drainFinishedCond_forTest.notify_all();
serverGlobalParams.featureCompatibility.validateFeaturesAsMaster.store(false);
result = kActionCloseAllConnections;
} else {
@@ -2682,6 +2680,7 @@ void ReplicationCoordinatorImpl::_scanOpTimeForCatchUp_inlock() {
auto evhStatus =
scanner->start(&_replExecutor, _rsConfig, _selfIndex, _rsConfig.getCatchUpTimeoutPeriod());
if (evhStatus == ErrorCodes::ShutdownInProgress) {
+ _finishCatchUpOplog_inlock(true);
return;
}
fassertStatusOK(40254, evhStatus.getStatus());
@@ -2690,7 +2689,7 @@ void ReplicationCoordinatorImpl::_scanOpTimeForCatchUp_inlock() {
evhStatus.getValue(), [this, scanner, scanStartTime, term](const CallbackArgs& cbData) {
LockGuard lk(_mutex);
if (cbData.status == ErrorCodes::CallbackCanceled) {
- _finishCatchUpOplog_inlock(false);
+ _finishCatchUpOplog_inlock(true);
return;
}
auto totalTimeout = _rsConfig.getCatchUpTimeoutPeriod();
@@ -2757,11 +2756,10 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca
}
void ReplicationCoordinatorImpl::_finishCatchUpOplog_inlock(bool startToDrain) {
+ invariant(_isCatchingUp);
_isCatchingUp = false;
// If the node steps down during the catch-up, we don't go into drain mode.
if (startToDrain) {
- invariant(_getMemberState_inlock().primary());
- invariant(!_canAcceptNonLocalWrites);
invariant(!_isWaitingForDrainToComplete);
_isWaitingForDrainToComplete = true;
// Signal applier in executor to avoid the deadlock with bgsync's mutex that is required to
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 25e448500fd..14b9571b7f4 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -1222,8 +1222,8 @@ private:
// Current ReplicaSet state.
MemberState _memberState; // (MX)
- // Used to signal threads waiting for changes to _memberState. Only used in testing.
- stdx::condition_variable _drainFinishedCond_forTest; // (M)
+ // Used to signal threads waiting for changes to _memberState.
+ stdx::condition_variable _drainFinishedCond; // (M)
// True if we are waiting for the applier to finish draining.
bool _isWaitingForDrainToComplete; // (M)
diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
index 23b9df8e219..7bbe9000bc7 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
@@ -1348,12 +1348,11 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringFreshnessScan) {
ASSERT_TRUE(evh.isValid());
getReplExec()->waitForEvent(evh);
ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
- ASSERT_FALSE(getReplCoord()->isCatchingUp());
- ASSERT_FALSE(getReplCoord()->isWaitingForApplierToDrain());
auto net = getNet();
net->enterNetwork();
net->runUntil(net->now() + config.getCatchUpTimeoutPeriod());
net->exitNetwork();
+ ASSERT_FALSE(getReplCoord()->isWaitingForApplierToDrain());
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("Stopped transition to primary"));
ASSERT_FALSE(getReplCoord()->canAcceptWritesForDatabase("test"));
@@ -1378,12 +1377,15 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringCatchUp) {
ASSERT_TRUE(evh.isValid());
getReplExec()->waitForEvent(evh);
ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
- ASSERT_FALSE(getReplCoord()->isCatchingUp());
- ASSERT_FALSE(getReplCoord()->isWaitingForApplierToDrain());
auto net = getNet();
net->enterNetwork();
net->runReadyNetworkOperations();
net->exitNetwork();
+ auto txn = makeOperationContext();
+ // Simulate bgsync signaling replCoord to exit drain mode.
+ // At this point, we see the stepdown and reset the states.
+ getReplCoord()->signalDrainComplete(txn.get());
+ ASSERT_FALSE(getReplCoord()->isWaitingForApplierToDrain());
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("Cannot catch up oplog after becoming primary"));
ASSERT_FALSE(getReplCoord()->canAcceptWritesForDatabase("test"));