author     Mihai Andrei <mihai.andrei@mongodb.com>  2019-10-08 22:52:40 +0000
committer  evergreen <evergreen@mongodb.com>        2019-10-08 22:52:40 +0000
commit     1d1379a2b019df47c8a49fdba107b46aa54736c7 (patch)
tree       15e59acd954a470fd75137f727458cac3195f66f
parent     331535530240cb91e44002fca36ec9927548ada9 (diff)
download   mongo-1d1379a2b019df47c8a49fdba107b46aa54736c7.tar.gz
SERVER-42996 Move ApplierState to OplogApplier
-rw-r--r--  src/mongo/db/repl/SConscript  1
-rw-r--r--  src/mongo/db/repl/oplog_applier.cpp  10
-rw-r--r--  src/mongo/db/repl/oplog_applier.h  61
-rw-r--r--  src/mongo/db/repl/oplog_applier_impl.cpp  5
-rw-r--r--  src/mongo/db/repl/opqueue_batcher.cpp  2
-rw-r--r--  src/mongo/db/repl/replication_coordinator.h  58
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state.h  11
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_impl.cpp  9
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_impl.h  2
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_mock.cpp  36
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_mock.h  4
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.cpp  19
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.h  4
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp  56
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_test.cpp  9
-rw-r--r--  src/mongo/db/repl/replication_coordinator_mock.cpp  4
-rw-r--r--  src/mongo/db/repl/replication_coordinator_mock.h  2
-rw-r--r--  src/mongo/db/repl/replication_coordinator_noop.cpp  4
-rw-r--r--  src/mongo/db/repl/replication_coordinator_noop.h  2
-rw-r--r--  src/mongo/db/repl/replication_coordinator_test_fixture.cpp  7
-rw-r--r--  src/mongo/embedded/replication_coordinator_embedded.cpp  4
-rw-r--r--  src/mongo/embedded/replication_coordinator_embedded.h  2
22 files changed, 180 insertions, 132 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index a0283ce3ae5..0dd0478e690 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -722,6 +722,7 @@ env.Library(
'storage_interface_mock.cpp',
],
LIBDEPS=[
+ 'oplog_application_interface',
'oplog_buffer_blocking_queue',
'repl_coordinator_interface',
'repl_settings',
diff --git a/src/mongo/db/repl/oplog_applier.cpp b/src/mongo/db/repl/oplog_applier.cpp
index 4c854beb0c2..4ef95f94816 100644
--- a/src/mongo/db/repl/oplog_applier.cpp
+++ b/src/mongo/db/repl/oplog_applier.cpp
@@ -287,6 +287,16 @@ void OplogApplier::_consume(OperationContext* opCtx, OplogBuffer* oplogBuffer) {
invariant(oplogBuffer->tryPop(opCtx, &opToPopAndDiscard) || inShutdown());
}
+OplogApplier::ApplierState OplogApplier::getApplierState() const {
+ stdx::lock_guard<Latch> lock(_mutex);
+ return _applierState;
+}
+
+void OplogApplier::setApplierState(ApplierState st) {
+ stdx::lock_guard<Latch> lock(_mutex);
+ _applierState = st;
+}
+
std::unique_ptr<ThreadPool> makeReplWriterPool() {
return makeReplWriterPool(replWriterThreadCount);
}
diff --git a/src/mongo/db/repl/oplog_applier.h b/src/mongo/db/repl/oplog_applier.h
index b6bcaf08b77..722d5516719 100644
--- a/src/mongo/db/repl/oplog_applier.h
+++ b/src/mongo/db/repl/oplog_applier.h
@@ -190,6 +190,65 @@ public:
StatusWith<OpTime> multiApply(OperationContext* opCtx, Operations ops);
const Options& getOptions() const;
+ /**
+ * Step-up
+ * =======
+ * On stepup, repl coord enters catch-up mode. It's the same as the secondary mode from
+ * the perspective of producer and applier, so there's nothing to do with them.
+ * When a node enters drain mode, producer state = Stopped, applier state = Draining.
+ *
+ * If the applier state is Draining, it will signal repl coord when there's nothing to apply.
+ * The applier goes into Stopped state at the same time.
+ *
+ * The states go like the following:
+ * - secondary and during catchup mode
+ * (producer: Running, applier: Running)
+ * |
+ * | finish catch-up, enter drain mode
+ * V
+ * - drain mode
+ * (producer: Stopped, applier: Draining)
+ * |
+ * | applier signals drain is complete
+ * V
+ * - primary is in master mode
+ * (producer: Stopped, applier: Stopped)
+ *
+ *
+ * Step-down
+ * =========
+ * The state transitions become:
+ * - primary is in master mode
+ * (producer: Stopped, applier: Stopped)
+ * |
+ * | step down
+ * V
+ * - secondary mode, starting bgsync
+ * (producer: Starting, applier: Running)
+ * |
+ * | bgsync runs start()
+ * V
+ * - secondary mode, normal
+ * (producer: Running, applier: Running)
+ *
+ * When a node steps down during draining mode, it's OK to change from (producer: Stopped,
+ * applier: Draining) to (producer: Starting, applier: Running).
+ *
+ * When a node steps down during catchup mode, the states remain the same (producer: Running,
+ * applier: Running).
+ */
+ enum class ApplierState { Running, Draining, Stopped };
+
+ /**
+ * In normal cases: Running -> Draining -> Stopped -> Running.
+ * Draining -> Running is also possible if a node steps down during drain mode.
+ *
+ * Only the applier can make the transition from Draining to Stopped by calling
+ * signalDrainComplete().
+ */
+ virtual ApplierState getApplierState() const;
+
+ virtual void setApplierState(ApplierState st);
private:
/**
@@ -228,6 +287,8 @@ private:
// Configures this OplogApplier.
const Options _options;
+
+ ApplierState _applierState = ApplierState::Running;
};
/**
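For orientation only (this sketch is not part of the patch): a minimal illustration of how the state machine documented in the comment above might be driven through the relocated accessors. The free functions and the bare OplogApplier pointer are hypothetical; the repl-coordinator plumbing, locking, and term checks performed by the real code are elided.

#include "mongo/db/repl/oplog_applier.h"

namespace mongo {
namespace repl {

// Step-up: once the producer has been stopped, the coordinator asks the
// applier to drain (Running -> Draining).
void enterDrainMode(OplogApplier* applier) {
    applier->setApplierState(OplogApplier::ApplierState::Draining);
}

// Drain complete: only this path moves Draining -> Stopped, and only if the
// node is still draining when the oplog buffer empties.
void onDrainComplete(OplogApplier* applier) {
    if (applier->getApplierState() == OplogApplier::ApplierState::Draining) {
        applier->setApplierState(OplogApplier::ApplierState::Stopped);
    }
}

// Step-down: the applier returns to Running, whether it was Draining or Stopped.
void onStepDown(OplogApplier* applier) {
    applier->setApplierState(OplogApplier::ApplierState::Running);
}

}  // namespace repl
}  // namespace mongo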
diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp
index 899bae5683d..50235183d3b 100644
--- a/src/mongo/db/repl/oplog_applier_impl.cpp
+++ b/src/mongo/db/repl/oplog_applier_impl.cpp
@@ -557,10 +557,7 @@ StatusWith<OpTime> OplogApplierImpl::_multiApply(OperationContext* opCtx,
// entries from the oplog until we finish writing.
Lock::ParallelBatchWriterMode pbwm(opCtx->lockState());
- // TODO (SERVER-42996): This is a temporary invariant to protect against segfaults. This will
- // be removed once ApplierState is moved from ReplicationCoordinator to OplogApplier.
- invariant(_replCoord);
- if (_replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Stopped) {
+ if (getApplierState() == ApplierState::Stopped) {
severe() << "attempting to replicate ops while primary";
return {ErrorCodes::CannotApplyOplogWhilePrimary,
"attempting to replicate ops while primary"};
diff --git a/src/mongo/db/repl/opqueue_batcher.cpp b/src/mongo/db/repl/opqueue_batcher.cpp
index a7b41404193..c0f9e604393 100644
--- a/src/mongo/db/repl/opqueue_batcher.cpp
+++ b/src/mongo/db/repl/opqueue_batcher.cpp
@@ -154,7 +154,7 @@ void OpQueueBatcher::run() {
// Draining state guarantees the producer has already been fully stopped and no more
// operations will be pushed in to the oplog buffer until the applier state changes.
auto isDraining =
- replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Draining;
+ _oplogApplier->getApplierState() == OplogApplier::ApplierState::Draining;
// Check the oplog buffer after the applier state to ensure the producer is stopped.
if (isDraining && _oplogBuffer->isEmpty()) {
ops.setTermWhenExhausted(termWhenBufferIsEmpty);
diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h
index 63d51768d53..525cb1a102e 100644
--- a/src/mongo/db/repl/replication_coordinator.h
+++ b/src/mongo/db/repl/replication_coordinator.h
@@ -484,64 +484,6 @@ public:
virtual Status setFollowerModeStrict(OperationContext* opCtx, const MemberState& newState) = 0;
/**
- * Step-up
- * =======
- * On stepup, repl coord enters catch-up mode. It's the same as the secondary mode from
- * the perspective of producer and applier, so there's nothing to do with them.
- * When a node enters drain mode, producer state = Stopped, applier state = Draining.
- *
- * If the applier state is Draining, it will signal repl coord when there's nothing to apply.
- * The applier goes into Stopped state at the same time.
- *
- * The states go like the following:
- * - secondary and during catchup mode
- * (producer: Running, applier: Running)
- * |
- * | finish catch-up, enter drain mode
- * V
- * - drain mode
- * (producer: Stopped, applier: Draining)
- * |
- * | applier signals drain is complete
- * V
- * - primary is in master mode
- * (producer: Stopped, applier: Stopped)
- *
- *
- * Step-down
- * =========
- * The state transitions become:
- * - primary is in master mode
- * (producer: Stopped, applier: Stopped)
- * |
- * | step down
- * V
- * - secondary mode, starting bgsync
- * (producer: Starting, applier: Running)
- * |
- * | bgsync runs start()
- * V
- * - secondary mode, normal
- * (producer: Running, applier: Running)
- *
- * When a node steps down during draining mode, it's OK to change from (producer: Stopped,
- * applier: Draining) to (producer: Starting, applier: Running).
- *
- * When a node steps down during catchup mode, the states remain the same (producer: Running,
- * applier: Running).
- */
- enum class ApplierState { Running, Draining, Stopped };
-
- /**
- * In normal cases: Running -> Draining -> Stopped -> Running.
- * Draining -> Running is also possible if a node steps down during drain mode.
- *
- * Only the applier can make the transition from Draining to Stopped by calling
- * signalDrainComplete().
- */
- virtual ApplierState getApplierState() = 0;
-
- /**
* Signals that a previously requested pause and drain of the applier buffer
* has completed.
*
diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h
index dd6f4e507ac..fa6c4bbad73 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state.h
@@ -35,6 +35,7 @@
#include "mongo/bson/timestamp.h"
#include "mongo/db/repl/member_state.h"
+#include "mongo/db/repl/oplog_applier.h"
#include "mongo/db/repl/optime.h"
#include "mongo/executor/task_executor.h"
#include "mongo/util/concurrency/thread_pool.h"
@@ -301,6 +302,16 @@ public:
virtual std::size_t getOplogFetcherInitialSyncMaxFetcherRestarts() const = 0;
/*
+ * Returns the OplogApplier's current state.
+ */
+ virtual OplogApplier::ApplierState getApplierState() const = 0;
+
+ /*
+ * Updates the OplogApplier's current state.
+ */
+ virtual void setApplierState(const OplogApplier::ApplierState st) = 0;
+
+ /*
* Creates noop writer instance. Setting the _noopWriter member is not protected by a guard,
* hence it must be called before multi-threaded operations start.
*/
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 1c7603ca075..4354746147d 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -910,6 +910,15 @@ std::size_t ReplicationCoordinatorExternalStateImpl::getOplogFetcherInitialSyncM
return oplogFetcherInitialSyncMaxFetcherRestarts.load();
}
+
+OplogApplier::ApplierState ReplicationCoordinatorExternalStateImpl::getApplierState() const {
+ return _oplogApplier.get()->getApplierState();
+}
+
+void ReplicationCoordinatorExternalStateImpl::setApplierState(const OplogApplier::ApplierState st) {
+ _oplogApplier.get()->setApplierState(st);
+}
+
JournalListener::Token ReplicationCoordinatorExternalStateImpl::getToken() {
return repl::ReplicationCoordinator::get(_service)->getMyLastAppliedOpTimeAndWallTime();
}
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
index 0b75f3d9cc7..da4808543e9 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
@@ -108,6 +108,8 @@ public:
virtual bool isReadConcernSnapshotSupportedByStorageEngine(OperationContext* opCtx) const;
virtual std::size_t getOplogFetcherSteadyStateMaxFetcherRestarts() const override;
virtual std::size_t getOplogFetcherInitialSyncMaxFetcherRestarts() const override;
+ virtual OplogApplier::ApplierState getApplierState() const override;
+ virtual void setApplierState(const OplogApplier::ApplierState st) override;
// Methods from JournalListener.
virtual JournalListener::Token getToken();
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
index 75bdac91439..eddecc1bdb5 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
@@ -46,7 +46,31 @@
namespace mongo {
namespace repl {
+namespace {
+/**
+ * Minimal implementation of OplogApplier for testing.
+ */
+class OplogApplierMock : public OplogApplier {
+ OplogApplierMock(const OplogApplierMock&) = delete;
+ OplogApplierMock& operator=(const OplogApplierMock&) = delete;
+
+public:
+ explicit OplogApplierMock(OplogApplier::Options options);
+
+ void _run(OplogBuffer* oplogBuffer) final;
+ StatusWith<OpTime> _multiApply(OperationContext* opCtx, Operations ops) final;
+};
+
+OplogApplierMock::OplogApplierMock(OplogApplier::Options options)
+ : OplogApplier(nullptr, nullptr, nullptr, options) {}
+
+void OplogApplierMock::_run(OplogBuffer* oplogBuffer) {}
+StatusWith<OpTime> OplogApplierMock::_multiApply(OperationContext* opCtx, Operations ops) {
+ return OpTime();
+}
+
+} // namespace
ReplicationCoordinatorExternalStateMock::ReplicationCoordinatorExternalStateMock()
: _localRsConfigDocument(ErrorCodes::NoMatchingDocument, "No local config document"),
_localRsLastVoteDocument(ErrorCodes::NoMatchingDocument, "No local lastVote document"),
@@ -57,7 +81,9 @@ ReplicationCoordinatorExternalStateMock::ReplicationCoordinatorExternalStateMock
_storeLocalLastVoteDocumentStatus(Status::OK()),
_storeLocalLastVoteDocumentShouldHang(false),
_connectionsClosed(false),
- _threadsStarted(false) {}
+ _threadsStarted(false),
+ _oplogApplier(std::make_unique<OplogApplierMock>(
+ repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary))) {}
ReplicationCoordinatorExternalStateMock::~ReplicationCoordinatorExternalStateMock() {}
@@ -293,6 +319,14 @@ void ReplicationCoordinatorExternalStateMock::setIsReadCommittedEnabled(bool val
_isReadCommittedSupported = val;
}
+void ReplicationCoordinatorExternalStateMock::setApplierState(const OplogApplier::ApplierState st) {
+ _oplogApplier->setApplierState(st);
+}
+
+OplogApplier::ApplierState ReplicationCoordinatorExternalStateMock::getApplierState() const {
+ return _oplogApplier->getApplierState();
+}
+
void ReplicationCoordinatorExternalStateMock::onDrainComplete(OperationContext* opCtx) {}
OpTime ReplicationCoordinatorExternalStateMock::onTransitionToPrimary(OperationContext* opCtx) {
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
index 5cebab1e820..42378115a1e 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
@@ -36,6 +36,7 @@
#include "mongo/bson/timestamp.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/repl/last_vote.h"
+#include "mongo/db/repl/oplog_applier_impl.h"
#include "mongo/db/repl/replication_coordinator_external_state.h"
#include "mongo/platform/condition_variable.h"
#include "mongo/platform/mutex.h"
@@ -99,6 +100,8 @@ public:
virtual bool isReadConcernSnapshotSupportedByStorageEngine(OperationContext* opCtx) const;
virtual std::size_t getOplogFetcherSteadyStateMaxFetcherRestarts() const override;
virtual std::size_t getOplogFetcherInitialSyncMaxFetcherRestarts() const override;
+ virtual OplogApplier::ApplierState getApplierState() const override;
+ virtual void setApplierState(const OplogApplier::ApplierState st) override;
/**
* Adds "host" to the list of hosts that this mock will match when responding to "isSelf"
@@ -209,6 +212,7 @@ private:
OpTime _firstOpTimeOfMyTerm;
double _electionTimeoutOffsetLimitFraction = 0.15;
Timestamp _globalTimestamp;
+ std::unique_ptr<OplogApplier> _oplogApplier;
};
} // namespace repl
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index ef8d6a7d73e..3eba44e99a7 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -951,11 +951,6 @@ Status ReplicationCoordinatorImpl::_setFollowerMode(OperationContext* opCtx,
return Status::OK();
}
-ReplicationCoordinator::ApplierState ReplicationCoordinatorImpl::getApplierState() {
- stdx::lock_guard<Latch> lk(_mutex);
- return _applierState;
-}
-
void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* opCtx,
long long termWhenBufferIsEmpty) {
// This logic is a little complicated in order to avoid acquiring the RSTL in mode X
@@ -985,7 +980,7 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* opCtx,
invariant(opCtx->writesAreReplicated());
stdx::unique_lock<Latch> lk(_mutex);
- if (_applierState != ApplierState::Draining) {
+ if (_externalState->getApplierState() != OplogApplier::ApplierState::Draining) {
return;
}
lk.unlock();
@@ -1001,11 +996,11 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* opCtx,
// Exit drain mode only if we're actually in draining mode, the apply buffer is empty in the
// current term, and we're allowed to become the write master.
- if (_applierState != ApplierState::Draining ||
+ if (_externalState->getApplierState() != OplogApplier::ApplierState::Draining ||
!_topCoord->canCompleteTransitionToPrimary(termWhenBufferIsEmpty)) {
return;
}
- _applierState = ApplierState::Stopped;
+ _externalState->setApplierState(OplogApplier::ApplierState::Stopped);
invariant(_getMemberState_inlock().primary());
invariant(!_readWriteAbility->canAcceptNonLocalWrites(opCtx));
@@ -1047,7 +1042,9 @@ Status ReplicationCoordinatorImpl::waitForDrainFinish(Milliseconds timeout) {
}
stdx::unique_lock<Latch> lk(_mutex);
- auto pred = [this]() { return _applierState != ApplierState::Draining; };
+ auto pred = [this]() {
+ return _externalState->getApplierState() != OplogApplier::ApplierState::Draining;
+ };
if (!_drainFinishedCond.wait_for(lk, timeout.toSystemDuration(), pred)) {
return Status(ErrorCodes::ExceededTimeLimit,
"Timed out waiting to finish draining applier buffer");
@@ -2847,7 +2844,7 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator(WithLock l
_catchupState->abort_inlock(PrimaryCatchUpConclusionReason::kFailedWithError);
}
}
- _applierState = ApplierState::Running;
+ _externalState->setApplierState(OplogApplier::ApplierState::Running);
_externalState->startProducerIfStopped();
}
@@ -3150,7 +3147,7 @@ boost::optional<Timestamp> ReplicationCoordinatorImpl::getRecoveryTimestamp() {
}
void ReplicationCoordinatorImpl::_enterDrainMode_inlock() {
- _applierState = ApplierState::Draining;
+ _externalState->setApplierState(OplogApplier::ApplierState::Draining);
_externalState->stopProducer();
}
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 78f1df824b0..bcb37c64af0 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -198,8 +198,6 @@ public:
virtual Status setFollowerModeStrict(OperationContext* opCtx,
const MemberState& newState) override;
- virtual ApplierState getApplierState() override;
-
virtual void signalDrainComplete(OperationContext* opCtx,
long long termWhenBufferIsEmpty) override;
@@ -1379,8 +1377,6 @@ private:
// Used to signal threads waiting for changes to _memberState.
stdx::condition_variable _drainFinishedCond; // (M)
- ReplicationCoordinator::ApplierState _applierState = ApplierState::Running; // (M)
-
// Used to signal threads waiting for changes to _rsConfigState.
stdx::condition_variable _rsConfigStateChange; // (M)
diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
index 2ab89c5a33a..3e806b36e6c 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
@@ -57,7 +57,7 @@ namespace {
using executor::NetworkInterfaceMock;
using executor::RemoteCommandRequest;
using executor::RemoteCommandResponse;
-using ApplierState = ReplicationCoordinator::ApplierState;
+using ApplierState = OplogApplier::ApplierState;
TEST_F(ReplCoordTest, RandomizedElectionOffsetWithinProperBounds) {
BSONObj configObj = BSON("_id"
@@ -153,7 +153,7 @@ TEST_F(ReplCoordTest, ElectionSucceedsWhenNodeIsTheOnlyElectableNode) {
ASSERT(getReplCoord()->getMemberState().primary())
<< getReplCoord()->getMemberState().toString();
simulateCatchUpAbort();
- ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
+ ASSERT(getExternalState()->getApplierState() == ApplierState::Draining);
const auto opCtxPtr = makeOperationContext();
auto& opCtx = *opCtxPtr;
@@ -209,7 +209,7 @@ TEST_F(ReplCoordTest, ElectionSucceedsWhenNodeIsTheOnlyNode) {
getReplCoord()->waitForElectionFinish_forTest();
ASSERT(getReplCoord()->getMemberState().primary())
<< getReplCoord()->getMemberState().toString();
- ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
+ ASSERT(getExternalState()->getApplierState() == ApplierState::Draining);
const auto opCtxPtr = makeOperationContext();
auto& opCtx = *opCtxPtr;
@@ -2250,7 +2250,7 @@ TEST_F(PrimaryCatchUpTest, PrimaryDoesNotNeedToCatchUp) {
// Get 2 heartbeats from secondaries.
ASSERT_EQUALS(2, count);
- ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
+ ASSERT(getExternalState()->getApplierState() == ApplierState::Draining);
stopCapturingLogMessages();
ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest optime known via heartbeats"));
auto opCtx = makeOperationContext();
@@ -2302,9 +2302,9 @@ TEST_F(PrimaryCatchUpTest, CatchupSucceeds) {
ASSERT_EQUALS(time2,
ReplicationMetrics::get(getServiceContext()).getTargetCatchupOpTime_forTesting());
- ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
+ ASSERT(getExternalState()->getApplierState() == ApplierState::Running);
advanceMyLastAppliedOpTime(time2, Date_t() + Seconds(time2.getSecs()));
- ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
+ ASSERT(getExternalState()->getApplierState() == ApplierState::Draining);
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("Caught up to the latest known optime successfully"));
auto opCtx = makeOperationContext();
@@ -2338,7 +2338,7 @@ TEST_F(PrimaryCatchUpTest, CatchupTimeout) {
// Other nodes are ahead of me.
getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time2));
});
- ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
+ ASSERT(getExternalState()->getApplierState() == ApplierState::Draining);
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("Catchup timed out"));
auto opCtx = makeOperationContext();
@@ -2377,7 +2377,7 @@ TEST_F(PrimaryCatchUpTest, CannotSeeAllNodes) {
getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time1));
}
});
- ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
+ ASSERT(getExternalState()->getApplierState() == ApplierState::Draining);
stopCapturingLogMessages();
ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest optime known via heartbeats"));
auto opCtx = makeOperationContext();
@@ -2417,7 +2417,7 @@ TEST_F(PrimaryCatchUpTest, HeartbeatTimeout) {
getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time1));
}
});
- ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
+ ASSERT(getExternalState()->getApplierState() == ApplierState::Draining);
stopCapturingLogMessages();
ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest optime known via heartbeats"));
auto opCtx = makeOperationContext();
@@ -2448,13 +2448,13 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownBeforeHeartbeatRefreshing) {
OpTime time2(Timestamp(100, 2), 0);
ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1);
// Step down immediately.
- ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
+ ASSERT(getExternalState()->getApplierState() == ApplierState::Running);
TopologyCoordinator::UpdateTermResult updateTermResult;
auto evh = getReplCoord()->updateTerm_forTest(2, &updateTermResult);
ASSERT_TRUE(evh.isValid());
getReplExec()->waitForEvent(evh);
ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
- ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
+ ASSERT(getExternalState()->getApplierState() == ApplierState::Running);
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("Exited primary catch-up mode"));
ASSERT_EQUALS(0, countLogLinesContaining("Caught up to the latest"));
@@ -2495,14 +2495,14 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringCatchUp) {
ASSERT_EQUALS(time2,
ReplicationMetrics::get(getServiceContext()).getTargetCatchupOpTime_forTesting());
- ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
+ ASSERT(getExternalState()->getApplierState() == ApplierState::Running);
TopologyCoordinator::UpdateTermResult updateTermResult;
auto evh = getReplCoord()->updateTerm_forTest(2, &updateTermResult);
ASSERT_TRUE(evh.isValid());
getReplExec()->waitForEvent(evh);
ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
// replyHeartbeatsAndRunUntil(getNet()->now() + config.getCatchUpTimeoutPeriod());
- ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
+ ASSERT(getExternalState()->getApplierState() == ApplierState::Running);
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("Exited primary catch-up mode"));
ASSERT_EQUALS(0, countLogLinesContaining("Caught up to the latest"));
@@ -2544,9 +2544,9 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringDrainMode) {
net->scheduleResponse(noi, net->now(), makeHeartbeatResponse(time2));
});
ReplicationCoordinatorImpl* replCoord = getReplCoord();
- ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
+ ASSERT(getExternalState()->getApplierState() == ApplierState::Running);
advanceMyLastAppliedOpTime(time2, Date_t() + Seconds(time2.getSecs()));
- ASSERT(replCoord->getApplierState() == ApplierState::Draining);
+ ASSERT(getExternalState()->getApplierState() == ApplierState::Draining);
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("Caught up to the latest"));
@@ -2573,7 +2573,7 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringDrainMode) {
ASSERT_TRUE(replCoord->getMemberState().secondary());
// Step up again
- ASSERT(replCoord->getApplierState() == ApplierState::Running);
+ ASSERT(getExternalState()->getApplierState() == ApplierState::Running);
simulateSuccessfulV1Voting();
ASSERT_TRUE(replCoord->getMemberState().primary());
@@ -2582,14 +2582,14 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringDrainMode) {
auto net = getNet();
net->scheduleResponse(noi, net->now(), makeHeartbeatResponse(time2));
});
- ASSERT(replCoord->getApplierState() == ApplierState::Draining);
+ ASSERT(getExternalState()->getApplierState() == ApplierState::Draining);
{
Lock::GlobalLock lock(opCtx.get(), MODE_IX);
ASSERT_FALSE(replCoord->canAcceptWritesForDatabase(opCtx.get(), "test"));
}
signalDrainComplete(opCtx.get());
Lock::GlobalLock lock(opCtx.get(), MODE_IX);
- ASSERT(replCoord->getApplierState() == ApplierState::Stopped);
+ ASSERT(getExternalState()->getApplierState() == ApplierState::Stopped);
ASSERT_TRUE(replCoord->canAcceptWritesForDatabase(opCtx.get(), "test"));
// Check that the number of elections requiring primary catchup was not incremented again.
@@ -2631,7 +2631,7 @@ TEST_F(PrimaryCatchUpTest, FreshestNodeBecomesAvailableLater) {
}
});
// The node is still in catchup mode, but the target optime has been set.
- ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
+ ASSERT(getExternalState()->getApplierState() == ApplierState::Running);
stopCapturingLogMessages();
ASSERT_EQ(1, countLogLinesContaining("Heartbeats updated catchup target optime"));
ASSERT_EQUALS(time3,
@@ -2639,7 +2639,7 @@ TEST_F(PrimaryCatchUpTest, FreshestNodeBecomesAvailableLater) {
// 3) Advancing its applied optime to time 2 isn't enough.
advanceMyLastAppliedOpTime(time2, Date_t() + Seconds(time2.getSecs()));
- ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
+ ASSERT(getExternalState()->getApplierState() == ApplierState::Running);
// 4) After a while, the other node at time 4 becomes available. Time 4 becomes the new target.
startCapturingLogMessages();
@@ -2653,7 +2653,7 @@ TEST_F(PrimaryCatchUpTest, FreshestNodeBecomesAvailableLater) {
}
});
// The node is still in catchup mode, but the target optime has been updated.
- ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
+ ASSERT(getExternalState()->getApplierState() == ApplierState::Running);
stopCapturingLogMessages();
ASSERT_EQ(1, countLogLinesContaining("Heartbeats updated catchup target optime"));
ASSERT_EQUALS(time4,
@@ -2661,12 +2661,12 @@ TEST_F(PrimaryCatchUpTest, FreshestNodeBecomesAvailableLater) {
// 5) Advancing to time 3 isn't enough now.
advanceMyLastAppliedOpTime(time3, Date_t() + Seconds(time3.getSecs()));
- ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
+ ASSERT(getExternalState()->getApplierState() == ApplierState::Running);
// 6) The node catches up time 4 eventually.
startCapturingLogMessages();
advanceMyLastAppliedOpTime(time4, Date_t() + Seconds(time4.getSecs()));
- ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
+ ASSERT(getExternalState()->getApplierState() == ApplierState::Draining);
stopCapturingLogMessages();
ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest"));
auto opCtx = makeOperationContext();
@@ -2718,13 +2718,13 @@ TEST_F(PrimaryCatchUpTest, InfiniteTimeoutAndAbort) {
ASSERT_OK(getReplCoord()->processHeartbeatV1(hbArgs, &response));
});
ASSERT_TRUE(getReplCoord()->getMemberState().primary());
- ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
+ ASSERT(getExternalState()->getApplierState() == ApplierState::Running);
// Simulate a user initiated abort.
ASSERT_OK(getReplCoord()->abortCatchupIfNeeded(
ReplicationCoordinator::PrimaryCatchUpConclusionReason::
kFailedWithReplSetAbortPrimaryCatchUpCmd));
- ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
+ ASSERT(getExternalState()->getApplierState() == ApplierState::Draining);
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("Exited primary catch-up mode"));
@@ -2756,7 +2756,7 @@ TEST_F(PrimaryCatchUpTest, ZeroTimeout) {
OpTime time1(Timestamp(100, 1), 0);
ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1, 0);
- ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
+ ASSERT(getExternalState()->getApplierState() == ApplierState::Draining);
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("Skipping primary catchup"));
auto opCtx = makeOperationContext();
@@ -2791,12 +2791,12 @@ TEST_F(PrimaryCatchUpTest, CatchUpFailsDueToPrimaryStepDown) {
// Other nodes are ahead of me.
getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time2));
});
- ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
+ ASSERT(getExternalState()->getApplierState() == ApplierState::Running);
auto opCtx = makeOperationContext();
getReplCoord()->stepDown(opCtx.get(), true, Milliseconds(0), Milliseconds(1000));
ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
- ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
+ ASSERT(getExternalState()->getApplierState() == ApplierState::Running);
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("Exited primary catch-up mode"));
diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
index cfdd4b29a05..4dfc7d06fe5 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
@@ -86,6 +86,7 @@ using executor::RemoteCommandRequest;
using executor::RemoteCommandResponse;
using unittest::assertGet;
using unittest::EnsureFCV;
+using ApplierState = OplogApplier::ApplierState;
typedef ReplicationCoordinator::ReplSetReconfigArgs ReplSetReconfigArgs;
// Helper class to wrap Timestamp as an OpTime with term 1.
@@ -1734,7 +1735,7 @@ TEST_F(StepDownTest, StepDownFailureRestoresDrainState) {
auto electionTimeoutWhen = getReplCoord()->getElectionTimeout_forTest();
simulateSuccessfulV1ElectionWithoutExitingDrainMode(electionTimeoutWhen);
ASSERT_TRUE(repl->getMemberState().primary());
- ASSERT(repl->getApplierState() == ReplicationCoordinator::ApplierState::Draining);
+ ASSERT(getExternalState()->getApplierState() == ApplierState::Draining);
{
// We can't take writes yet since we're still in drain mode.
@@ -1759,7 +1760,7 @@ TEST_F(StepDownTest, StepDownFailureRestoresDrainState) {
ASSERT(stepDownStatus == ErrorCodes::PrimarySteppedDown ||
stepDownStatus == ErrorCodes::Interrupted);
ASSERT_TRUE(getReplCoord()->getMemberState().primary());
- ASSERT(repl->getApplierState() == ReplicationCoordinator::ApplierState::Draining);
+ ASSERT(getExternalState()->getApplierState() == ApplierState::Draining);
// Ensure that the failed stepdown attempt didn't make us able to take writes since we're still
// in drain mode.
@@ -1772,7 +1773,7 @@ TEST_F(StepDownTest, StepDownFailureRestoresDrainState) {
// Now complete drain mode and ensure that we become capable of taking writes.
auto opCtx = makeOperationContext();
signalDrainComplete(opCtx.get());
- ASSERT(repl->getApplierState() == ReplicationCoordinator::ApplierState::Stopped);
+ ASSERT(getExternalState()->getApplierState() == ApplierState::Stopped);
ASSERT_TRUE(getReplCoord()->getMemberState().primary());
Lock::GlobalLock lock(opCtx.get(), MODE_IX);
@@ -5695,7 +5696,7 @@ TEST_F(ReplCoordTest, WaitForDrainFinish) {
auto timeout = Milliseconds(1);
ASSERT_OK(replCoord->waitForMemberState(MemberState::RS_PRIMARY, timeout));
- ASSERT(replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Draining);
+ ASSERT(getExternalState()->getApplierState() == ApplierState::Draining);
ASSERT_EQUALS(ErrorCodes::ExceededTimeLimit, replCoord->waitForDrainFinish(timeout));
ASSERT_EQUALS(ErrorCodes::BadValue, replCoord->waitForDrainFinish(Milliseconds(-1)));
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index eb09d7384be..234df4ece5b 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -296,10 +296,6 @@ Status ReplicationCoordinatorMock::setFollowerModeStrict(OperationContext* opCtx
return setFollowerMode(newState);
}
-ReplicationCoordinator::ApplierState ReplicationCoordinatorMock::getApplierState() {
- return ApplierState::Running;
-}
-
void ReplicationCoordinatorMock::signalDrainComplete(OperationContext*, long long) {}
Status ReplicationCoordinatorMock::waitForDrainFinish(Milliseconds timeout) {
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index 596776e1498..42f50670cba 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -166,8 +166,6 @@ public:
virtual Status setFollowerModeStrict(OperationContext* opCtx, const MemberState& newState);
- virtual ApplierState getApplierState();
-
virtual void signalDrainComplete(OperationContext*, long long);
virtual Status waitForDrainFinish(Milliseconds timeout) override;
diff --git a/src/mongo/db/repl/replication_coordinator_noop.cpp b/src/mongo/db/repl/replication_coordinator_noop.cpp
index 44d45bada49..e3e8f062bc4 100644
--- a/src/mongo/db/repl/replication_coordinator_noop.cpp
+++ b/src/mongo/db/repl/replication_coordinator_noop.cpp
@@ -177,10 +177,6 @@ Status ReplicationCoordinatorNoOp::setFollowerModeStrict(OperationContext* opCtx
MONGO_UNREACHABLE;
}
-ReplicationCoordinator::ApplierState ReplicationCoordinatorNoOp::getApplierState() {
- MONGO_UNREACHABLE;
-}
-
void ReplicationCoordinatorNoOp::signalDrainComplete(OperationContext*, long long) {
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/db/repl/replication_coordinator_noop.h b/src/mongo/db/repl/replication_coordinator_noop.h
index e8390f6fc42..a13cc50f979 100644
--- a/src/mongo/db/repl/replication_coordinator_noop.h
+++ b/src/mongo/db/repl/replication_coordinator_noop.h
@@ -147,8 +147,6 @@ public:
Status setFollowerModeStrict(OperationContext* opCtx, const MemberState&) final;
- ApplierState getApplierState() final;
-
void signalDrainComplete(OperationContext*, long long) final;
Status waitForDrainFinish(Milliseconds) final;
diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
index d09d8bdf379..18ee73d94b9 100644
--- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
+++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
@@ -62,6 +62,7 @@ namespace repl {
using executor::NetworkInterfaceMock;
using executor::RemoteCommandRequest;
using executor::RemoteCommandResponse;
+using ApplierState = OplogApplier::ApplierState;
executor::TaskExecutor* ReplCoordTest::getReplExec() {
return _replExec;
@@ -359,7 +360,7 @@ void ReplCoordTest::simulateSuccessfulV1ElectionWithoutExitingDrainMode(Date_t e
hasReadyRequests = net->hasReadyRequests();
getNet()->exitNetwork();
}
- ASSERT(replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Draining);
+ ASSERT(getExternalState()->getApplierState() == ApplierState::Draining);
ASSERT(replCoord->getMemberState().primary()) << replCoord->getMemberState().toString();
IsMasterResponse imResponse;
@@ -376,7 +377,7 @@ void ReplCoordTest::simulateSuccessfulV1ElectionAt(Date_t electionTime) {
auto opCtx = makeOperationContext();
signalDrainComplete(opCtx.get());
}
- ASSERT(replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Stopped);
+ ASSERT(getExternalState()->getApplierState() == ApplierState::Stopped);
IsMasterResponse imResponse;
replCoord->fillIsMasterForReplSet(&imResponse, {});
ASSERT_TRUE(imResponse.isMaster()) << imResponse.toBSON().toString();
@@ -399,7 +400,7 @@ void ReplCoordTest::runSingleNodeElection(OperationContext* opCtx) {
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
getReplCoord()->waitForElectionFinish_forTest();
- ASSERT(getReplCoord()->getApplierState() == ReplicationCoordinator::ApplierState::Draining);
+ ASSERT(getExternalState()->getApplierState() == ApplierState::Draining);
ASSERT(getReplCoord()->getMemberState().primary())
<< getReplCoord()->getMemberState().toString();
diff --git a/src/mongo/embedded/replication_coordinator_embedded.cpp b/src/mongo/embedded/replication_coordinator_embedded.cpp
index 408b5c78ee6..26c7ed3673c 100644
--- a/src/mongo/embedded/replication_coordinator_embedded.cpp
+++ b/src/mongo/embedded/replication_coordinator_embedded.cpp
@@ -184,10 +184,6 @@ Status ReplicationCoordinatorEmbedded::setFollowerModeStrict(OperationContext* o
UASSERT_NOT_IMPLEMENTED;
}
-ReplicationCoordinator::ApplierState ReplicationCoordinatorEmbedded::getApplierState() {
- UASSERT_NOT_IMPLEMENTED;
-}
-
void ReplicationCoordinatorEmbedded::signalDrainComplete(OperationContext*, long long) {
UASSERT_NOT_IMPLEMENTED;
}
diff --git a/src/mongo/embedded/replication_coordinator_embedded.h b/src/mongo/embedded/replication_coordinator_embedded.h
index 9fe16bf52dc..f2cd27e9f43 100644
--- a/src/mongo/embedded/replication_coordinator_embedded.h
+++ b/src/mongo/embedded/replication_coordinator_embedded.h
@@ -153,8 +153,6 @@ public:
Status setFollowerModeStrict(OperationContext* opCtx, const repl::MemberState&) override;
- ApplierState getApplierState() override;
-
void signalDrainComplete(OperationContext*, long long) override;
Status waitForDrainFinish(Milliseconds) override;