summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp18
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.h4
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp10
3 files changed, 16 insertions, 16 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 3de98571a7c..526b03d9b38 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -921,8 +921,9 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn) {
return;
}
invariant(!_isCatchingUp);
+ invariant(!_canAcceptNonLocalWrites);
_isWaitingForDrainToComplete = false;
- _drainFinishedCond.notify_all();
+ _drainFinishedCond_forTest.notify_all();
if (!_getMemberState_inlock().primary()) {
// We must have decided not to transition to primary while waiting for the applier to drain.
@@ -930,7 +931,6 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn) {
return;
}
- invariant(!_canAcceptNonLocalWrites);
_canAcceptNonLocalWrites = true;
lk.unlock();
@@ -952,7 +952,7 @@ Status ReplicationCoordinatorImpl::waitForDrainFinish(Milliseconds timeout) {
stdx::unique_lock<stdx::mutex> lk(_mutex);
auto pred = [this]() { return !_isCatchingUp && !_isWaitingForDrainToComplete; };
- if (!_drainFinishedCond.wait_for(lk, timeout.toSystemDuration(), pred)) {
+ if (!_drainFinishedCond_forTest.wait_for(lk, timeout.toSystemDuration(), pred)) {
return Status(ErrorCodes::ExceededTimeLimit,
"Timed out waiting to finish draining applier buffer");
}
@@ -2516,9 +2516,11 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator_inlock() {
_replicationWaiterList.signalAndRemoveAll_inlock();
// Wake up the optime waiter that is waiting for primary catch-up to finish.
_opTimeWaiterList.signalAndRemoveAll_inlock();
- // _isCatchingUp and _isWaitingForDrainToComplete could be cleaned up asynchronously
- // by freshness scan.
+ // Clean up primary states.
_canAcceptNonLocalWrites = false;
+ _isCatchingUp = false;
+ _isWaitingForDrainToComplete = false;
+ _drainFinishedCond_forTest.notify_all();
serverGlobalParams.featureCompatibility.validateFeaturesAsMaster.store(false);
result = kActionCloseAllConnections;
} else {
@@ -2652,7 +2654,6 @@ void ReplicationCoordinatorImpl::_scanOpTimeForCatchUp_inlock() {
auto evhStatus =
scanner->start(&_replExecutor, _rsConfig, _selfIndex, _rsConfig.getCatchUpTimeoutPeriod());
if (evhStatus == ErrorCodes::ShutdownInProgress) {
- _finishCatchUpOplog_inlock(true);
return;
}
fassertStatusOK(40254, evhStatus.getStatus());
@@ -2661,7 +2662,7 @@ void ReplicationCoordinatorImpl::_scanOpTimeForCatchUp_inlock() {
evhStatus.getValue(), [this, scanner, scanStartTime, term](const CallbackArgs& cbData) {
LockGuard lk(_mutex);
if (cbData.status == ErrorCodes::CallbackCanceled) {
- _finishCatchUpOplog_inlock(true);
+ _finishCatchUpOplog_inlock(false);
return;
}
auto totalTimeout = _rsConfig.getCatchUpTimeoutPeriod();
@@ -2728,10 +2729,11 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca
}
void ReplicationCoordinatorImpl::_finishCatchUpOplog_inlock(bool startToDrain) {
- invariant(_isCatchingUp);
_isCatchingUp = false;
// If the node steps down during the catch-up, we don't go into drain mode.
if (startToDrain) {
+ invariant(_getMemberState_inlock().primary());
+ invariant(!_canAcceptNonLocalWrites);
invariant(!_isWaitingForDrainToComplete);
_isWaitingForDrainToComplete = true;
// Signal applier in executor to avoid the deadlock with bgsync's mutex that is required to
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index d2d95ea44aa..8597f9c21ce 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -1209,8 +1209,8 @@ private:
// Current ReplicaSet state.
MemberState _memberState; // (MX)
- // Used to signal threads waiting for changes to _memberState.
- stdx::condition_variable _drainFinishedCond; // (M)
+ // Used to signal threads waiting for changes to _memberState. Only used in testing.
+ stdx::condition_variable _drainFinishedCond_forTest; // (M)
// True if we are waiting for the applier to finish draining.
bool _isWaitingForDrainToComplete; // (M)
diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
index 7bbe9000bc7..23b9df8e219 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
@@ -1348,11 +1348,12 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringFreshnessScan) {
ASSERT_TRUE(evh.isValid());
getReplExec()->waitForEvent(evh);
ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
+ ASSERT_FALSE(getReplCoord()->isCatchingUp());
+ ASSERT_FALSE(getReplCoord()->isWaitingForApplierToDrain());
auto net = getNet();
net->enterNetwork();
net->runUntil(net->now() + config.getCatchUpTimeoutPeriod());
net->exitNetwork();
- ASSERT_FALSE(getReplCoord()->isWaitingForApplierToDrain());
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("Stopped transition to primary"));
ASSERT_FALSE(getReplCoord()->canAcceptWritesForDatabase("test"));
@@ -1377,15 +1378,12 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringCatchUp) {
ASSERT_TRUE(evh.isValid());
getReplExec()->waitForEvent(evh);
ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
+ ASSERT_FALSE(getReplCoord()->isCatchingUp());
+ ASSERT_FALSE(getReplCoord()->isWaitingForApplierToDrain());
auto net = getNet();
net->enterNetwork();
net->runReadyNetworkOperations();
net->exitNetwork();
- auto txn = makeOperationContext();
- // Simulate bgsync signaling replCoord to exit drain mode.
- // At this point, we see the stepdown and reset the states.
- getReplCoord()->signalDrainComplete(txn.get());
- ASSERT_FALSE(getReplCoord()->isWaitingForApplierToDrain());
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("Cannot catch up oplog after becoming primary"));
ASSERT_FALSE(getReplCoord()->canAcceptWritesForDatabase("test"));