diff options
author | Mihai Andrei <mihai.andrei@mongodb.com> | 2019-10-08 22:52:40 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2019-10-08 22:52:40 +0000 |
commit | 1d1379a2b019df47c8a49fdba107b46aa54736c7 (patch) | |
tree | 15e59acd954a470fd75137f727458cac3195f66f /src | |
parent | 331535530240cb91e44002fca36ec9927548ada9 (diff) | |
download | mongo-1d1379a2b019df47c8a49fdba107b46aa54736c7.tar.gz |
SERVER-42996 Move ApplierState to OplogApplier
Diffstat (limited to 'src')
22 files changed, 180 insertions, 132 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index a0283ce3ae5..0dd0478e690 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -722,6 +722,7 @@ env.Library( 'storage_interface_mock.cpp', ], LIBDEPS=[ + 'oplog_application_interface', 'oplog_buffer_blocking_queue', 'repl_coordinator_interface', 'repl_settings', diff --git a/src/mongo/db/repl/oplog_applier.cpp b/src/mongo/db/repl/oplog_applier.cpp index 4c854beb0c2..4ef95f94816 100644 --- a/src/mongo/db/repl/oplog_applier.cpp +++ b/src/mongo/db/repl/oplog_applier.cpp @@ -287,6 +287,16 @@ void OplogApplier::_consume(OperationContext* opCtx, OplogBuffer* oplogBuffer) { invariant(oplogBuffer->tryPop(opCtx, &opToPopAndDiscard) || inShutdown()); } +OplogApplier::ApplierState OplogApplier::getApplierState() const { + stdx::lock_guard<Latch> lock(_mutex); + return _applierState; +} + +void OplogApplier::setApplierState(ApplierState st) { + stdx::lock_guard<Latch> lock(_mutex); + _applierState = st; +} + std::unique_ptr<ThreadPool> makeReplWriterPool() { return makeReplWriterPool(replWriterThreadCount); } diff --git a/src/mongo/db/repl/oplog_applier.h b/src/mongo/db/repl/oplog_applier.h index b6bcaf08b77..722d5516719 100644 --- a/src/mongo/db/repl/oplog_applier.h +++ b/src/mongo/db/repl/oplog_applier.h @@ -190,6 +190,65 @@ public: StatusWith<OpTime> multiApply(OperationContext* opCtx, Operations ops); const Options& getOptions() const; + /** + * Step-up + * ======= + * On stepup, repl coord enters catch-up mode. It's the same as the secondary mode from + * the perspective of producer and applier, so there's nothing to do with them. + * When a node enters drain mode, producer state = Stopped, applier state = Draining. + * + * If the applier state is Draining, it will signal repl coord when there's nothing to apply. + * The applier goes into Stopped state at the same time. + * + * The states go like the following: + * - secondary and during catchup mode + * (producer: Running, applier: Running) + * | + * | finish catch-up, enter drain mode + * V + * - drain mode + * (producer: Stopped, applier: Draining) + * | + * | applier signals drain is complete + * V + * - primary is in master mode + * (producer: Stopped, applier: Stopped) + * + * + * Step-down + * ========= + * The state transitions become: + * - primary is in master mode + * (producer: Stopped, applier: Stopped) + * | + * | step down + * V + * - secondary mode, starting bgsync + * (producer: Starting, applier: Running) + * | + * | bgsync runs start() + * V + * - secondary mode, normal + * (producer: Running, applier: Running) + * + * When a node steps down during draining mode, it's OK to change from (producer: Stopped, + * applier: Draining) to (producer: Starting, applier: Running). + * + * When a node steps down during catchup mode, the states remain the same (producer: Running, + * applier: Running). + */ + enum class ApplierState { Running, Draining, Stopped }; + + /** + * In normal cases: Running -> Draining -> Stopped -> Running. + * Draining -> Running is also possible if a node steps down during drain mode. + * + * Only the applier can make the transition from Draining to Stopped by calling + * signalDrainComplete(). + */ + virtual ApplierState getApplierState() const; + + virtual void setApplierState(ApplierState st); private: /** @@ -228,6 +287,8 @@ private: // Configures this OplogApplier. const Options _options; + + ApplierState _applierState = ApplierState::Running; }; /** diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp index 899bae5683d..50235183d3b 100644 --- a/src/mongo/db/repl/oplog_applier_impl.cpp +++ b/src/mongo/db/repl/oplog_applier_impl.cpp @@ -557,10 +557,7 @@ StatusWith<OpTime> OplogApplierImpl::_multiApply(OperationContext* opCtx, // entries from the oplog until we finish writing. Lock::ParallelBatchWriterMode pbwm(opCtx->lockState()); - // TODO (SERVER-42996): This is a temporary invariant to protect against segfaults. This will - // be removed once ApplierState is moved from ReplicationCoordinator to OplogApplier. - invariant(_replCoord); - if (_replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Stopped) { + if (getApplierState() == ApplierState::Stopped) { severe() << "attempting to replicate ops while primary"; return {ErrorCodes::CannotApplyOplogWhilePrimary, "attempting to replicate ops while primary"}; diff --git a/src/mongo/db/repl/opqueue_batcher.cpp b/src/mongo/db/repl/opqueue_batcher.cpp index a7b41404193..c0f9e604393 100644 --- a/src/mongo/db/repl/opqueue_batcher.cpp +++ b/src/mongo/db/repl/opqueue_batcher.cpp @@ -154,7 +154,7 @@ void OpQueueBatcher::run() { // Draining state guarantees the producer has already been fully stopped and no more // operations will be pushed in to the oplog buffer until the applier state changes. auto isDraining = - replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Draining; + _oplogApplier->getApplierState() == OplogApplier::ApplierState::Draining; // Check the oplog buffer after the applier state to ensure the producer is stopped. if (isDraining && _oplogBuffer->isEmpty()) { ops.setTermWhenExhausted(termWhenBufferIsEmpty); diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index 63d51768d53..525cb1a102e 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -484,64 +484,6 @@ public: virtual Status setFollowerModeStrict(OperationContext* opCtx, const MemberState& newState) = 0; /** - * Step-up - * ======= - * On stepup, repl coord enters catch-up mode. It's the same as the secondary mode from - * the perspective of producer and applier, so there's nothing to do with them. - * When a node enters drain mode, producer state = Stopped, applier state = Draining. - * - * If the applier state is Draining, it will signal repl coord when there's nothing to apply. - * The applier goes into Stopped state at the same time. - * - * The states go like the following: - * - secondary and during catchup mode - * (producer: Running, applier: Running) - * | - * | finish catch-up, enter drain mode - * V - * - drain mode - * (producer: Stopped, applier: Draining) - * | - * | applier signals drain is complete - * V - * - primary is in master mode - * (producer: Stopped, applier: Stopped) - * - * - * Step-down - * ========= - * The state transitions become: - * - primary is in master mode - * (producer: Stopped, applier: Stopped) - * | - * | step down - * V - * - secondary mode, starting bgsync - * (producer: Starting, applier: Running) - * | - * | bgsync runs start() - * V - * - secondary mode, normal - * (producer: Running, applier: Running) - * - * When a node steps down during draining mode, it's OK to change from (producer: Stopped, - * applier: Draining) to (producer: Starting, applier: Running). - * - * When a node steps down during catchup mode, the states remain the same (producer: Running, - * applier: Running). - */ - enum class ApplierState { Running, Draining, Stopped }; - - /** - * In normal cases: Running -> Draining -> Stopped -> Running. - * Draining -> Running is also possible if a node steps down during drain mode. - * - * Only the applier can make the transition from Draining to Stopped by calling - * signalDrainComplete(). - */ - virtual ApplierState getApplierState() = 0; - - /** * Signals that a previously requested pause and drain of the applier buffer * has completed. * diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h index dd6f4e507ac..fa6c4bbad73 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state.h +++ b/src/mongo/db/repl/replication_coordinator_external_state.h @@ -35,6 +35,7 @@ #include "mongo/bson/timestamp.h" #include "mongo/db/repl/member_state.h" +#include "mongo/db/repl/oplog_applier.h" #include "mongo/db/repl/optime.h" #include "mongo/executor/task_executor.h" #include "mongo/util/concurrency/thread_pool.h" @@ -301,6 +302,16 @@ public: virtual std::size_t getOplogFetcherInitialSyncMaxFetcherRestarts() const = 0; /* + * Returns the OplogApplier's current state. + */ + virtual OplogApplier::ApplierState getApplierState() const = 0; + + /* + * Updates the OplogApplier's current state. + */ + virtual void setApplierState(const OplogApplier::ApplierState st) = 0; + + /* * Creates noop writer instance. Setting the _noopWriter member is not protected by a guard, * hence it must be called before multi-threaded operations start. */ diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 1c7603ca075..4354746147d 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -910,6 +910,15 @@ std::size_t ReplicationCoordinatorExternalStateImpl::getOplogFetcherInitialSyncM return oplogFetcherInitialSyncMaxFetcherRestarts.load(); } + +OplogApplier::ApplierState ReplicationCoordinatorExternalStateImpl::getApplierState() const { + return _oplogApplier.get()->getApplierState(); +} + +void ReplicationCoordinatorExternalStateImpl::setApplierState(const OplogApplier::ApplierState st) { + _oplogApplier.get()->setApplierState(st); +} + JournalListener::Token ReplicationCoordinatorExternalStateImpl::getToken() { return repl::ReplicationCoordinator::get(_service)->getMyLastAppliedOpTimeAndWallTime(); } diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h index 0b75f3d9cc7..da4808543e9 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h @@ -108,6 +108,8 @@ public: virtual bool isReadConcernSnapshotSupportedByStorageEngine(OperationContext* opCtx) const; virtual std::size_t getOplogFetcherSteadyStateMaxFetcherRestarts() const override; virtual std::size_t getOplogFetcherInitialSyncMaxFetcherRestarts() const override; + virtual OplogApplier::ApplierState getApplierState() const override; + virtual void setApplierState(const OplogApplier::ApplierState st) override; // Methods from JournalListener. virtual JournalListener::Token getToken(); diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp index 75bdac91439..eddecc1bdb5 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp @@ -46,7 +46,31 @@ namespace mongo { namespace repl { +namespace { +/** + * Minimal implementation of OplogApplier for testing. + */ +class OplogApplierMock : public OplogApplier { + OplogApplierMock(const OplogApplierMock&) = delete; + OplogApplierMock& operator=(const OplogApplierMock&) = delete; + +public: + explicit OplogApplierMock(OplogApplier::Options options); + + void _run(OplogBuffer* oplogBuffer) final; + StatusWith<OpTime> _multiApply(OperationContext* opCtx, Operations ops) final; +}; + +OplogApplierMock::OplogApplierMock(OplogApplier::Options options) + : OplogApplier(nullptr, nullptr, nullptr, options) {} + +void OplogApplierMock::_run(OplogBuffer* oplogBuffer) {} +StatusWith<OpTime> OplogApplierMock::_multiApply(OperationContext* opCtx, Operations ops) { + return OpTime(); +} + +} // namespace ReplicationCoordinatorExternalStateMock::ReplicationCoordinatorExternalStateMock() : _localRsConfigDocument(ErrorCodes::NoMatchingDocument, "No local config document"), _localRsLastVoteDocument(ErrorCodes::NoMatchingDocument, "No local lastVote document"), @@ -57,7 +81,9 @@ ReplicationCoordinatorExternalStateMock::ReplicationCoordinatorExternalStateMock _storeLocalLastVoteDocumentStatus(Status::OK()), _storeLocalLastVoteDocumentShouldHang(false), _connectionsClosed(false), - _threadsStarted(false) {} + _threadsStarted(false), + _oplogApplier(std::make_unique<OplogApplierMock>( + repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary))) {} ReplicationCoordinatorExternalStateMock::~ReplicationCoordinatorExternalStateMock() {} @@ -293,6 +319,14 @@ void ReplicationCoordinatorExternalStateMock::setIsReadCommittedEnabled(bool val _isReadCommittedSupported = val; } +void ReplicationCoordinatorExternalStateMock::setApplierState(const OplogApplier::ApplierState st) { + _oplogApplier->setApplierState(st); +} + +OplogApplier::ApplierState ReplicationCoordinatorExternalStateMock::getApplierState() const { + return _oplogApplier->getApplierState(); +} + void ReplicationCoordinatorExternalStateMock::onDrainComplete(OperationContext* opCtx) {} OpTime ReplicationCoordinatorExternalStateMock::onTransitionToPrimary(OperationContext* opCtx) { diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h index 5cebab1e820..42378115a1e 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h @@ -36,6 +36,7 @@ #include "mongo/bson/timestamp.h" #include "mongo/db/jsobj.h" #include "mongo/db/repl/last_vote.h" +#include "mongo/db/repl/oplog_applier_impl.h" #include "mongo/db/repl/replication_coordinator_external_state.h" #include "mongo/platform/condition_variable.h" #include "mongo/platform/mutex.h" @@ -99,6 +100,8 @@ public: virtual bool isReadConcernSnapshotSupportedByStorageEngine(OperationContext* opCtx) const; virtual std::size_t getOplogFetcherSteadyStateMaxFetcherRestarts() const override; virtual std::size_t getOplogFetcherInitialSyncMaxFetcherRestarts() const override; + virtual OplogApplier::ApplierState getApplierState() const override; + virtual void setApplierState(const OplogApplier::ApplierState st) override; /** * Adds "host" to the list of hosts that this mock will match when responding to "isSelf" @@ -209,6 +212,7 @@ private: OpTime _firstOpTimeOfMyTerm; double _electionTimeoutOffsetLimitFraction = 0.15; Timestamp _globalTimestamp; + std::unique_ptr<OplogApplier> _oplogApplier; }; } // namespace repl diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index ef8d6a7d73e..3eba44e99a7 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -951,11 +951,6 @@ Status ReplicationCoordinatorImpl::_setFollowerMode(OperationContext* opCtx, return Status::OK(); } -ReplicationCoordinator::ApplierState ReplicationCoordinatorImpl::getApplierState() { - stdx::lock_guard<Latch> lk(_mutex); - return _applierState; -} - void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* opCtx, long long termWhenBufferIsEmpty) { // This logic is a little complicated in order to avoid acquiring the RSTL in mode X @@ -985,7 +980,7 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* opCtx, invariant(opCtx->writesAreReplicated()); stdx::unique_lock<Latch> lk(_mutex); - if (_applierState != ApplierState::Draining) { + if (_externalState->getApplierState() != OplogApplier::ApplierState::Draining) { return; } lk.unlock(); @@ -1001,11 +996,11 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* opCtx, // Exit drain mode only if we're actually in draining mode, the apply buffer is empty in the // current term, and we're allowed to become the write master. - if (_applierState != ApplierState::Draining || + if (_externalState->getApplierState() != OplogApplier::ApplierState::Draining || !_topCoord->canCompleteTransitionToPrimary(termWhenBufferIsEmpty)) { return; } - _applierState = ApplierState::Stopped; + _externalState->setApplierState(OplogApplier::ApplierState::Stopped); invariant(_getMemberState_inlock().primary()); invariant(!_readWriteAbility->canAcceptNonLocalWrites(opCtx)); @@ -1047,7 +1042,9 @@ Status ReplicationCoordinatorImpl::waitForDrainFinish(Milliseconds timeout) { } stdx::unique_lock<Latch> lk(_mutex); - auto pred = [this]() { return _applierState != ApplierState::Draining; }; + auto pred = [this]() { + return _externalState->getApplierState() != OplogApplier::ApplierState::Draining; + }; if (!_drainFinishedCond.wait_for(lk, timeout.toSystemDuration(), pred)) { return Status(ErrorCodes::ExceededTimeLimit, "Timed out waiting to finish draining applier buffer"); @@ -2847,7 +2844,7 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator(WithLock l _catchupState->abort_inlock(PrimaryCatchUpConclusionReason::kFailedWithError); } } - _applierState = ApplierState::Running; + _externalState->setApplierState(OplogApplier::ApplierState::Running); _externalState->startProducerIfStopped(); } @@ -3150,7 +3147,7 @@ boost::optional<Timestamp> ReplicationCoordinatorImpl::getRecoveryTimestamp() { } void ReplicationCoordinatorImpl::_enterDrainMode_inlock() { - _applierState = ApplierState::Draining; + _externalState->setApplierState(OplogApplier::ApplierState::Draining); _externalState->stopProducer(); } diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 78f1df824b0..bcb37c64af0 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -198,8 +198,6 @@ public: virtual Status setFollowerModeStrict(OperationContext* opCtx, const MemberState& newState) override; - virtual ApplierState getApplierState() override; - virtual void signalDrainComplete(OperationContext* opCtx, long long termWhenBufferIsEmpty) override; @@ -1379,8 +1377,6 @@ private: // Used to signal threads waiting for changes to _memberState. stdx::condition_variable _drainFinishedCond; // (M) - ReplicationCoordinator::ApplierState _applierState = ApplierState::Running; // (M) - // Used to signal threads waiting for changes to _rsConfigState. stdx::condition_variable _rsConfigStateChange; // (M) diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp index 2ab89c5a33a..3e806b36e6c 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp @@ -57,7 +57,7 @@ namespace { using executor::NetworkInterfaceMock; using executor::RemoteCommandRequest; using executor::RemoteCommandResponse; -using ApplierState = ReplicationCoordinator::ApplierState; +using ApplierState = OplogApplier::ApplierState; TEST_F(ReplCoordTest, RandomizedElectionOffsetWithinProperBounds) { BSONObj configObj = BSON("_id" @@ -153,7 +153,7 @@ TEST_F(ReplCoordTest, ElectionSucceedsWhenNodeIsTheOnlyElectableNode) { ASSERT(getReplCoord()->getMemberState().primary()) << getReplCoord()->getMemberState().toString(); simulateCatchUpAbort(); - ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); + ASSERT(getExternalState()->getApplierState() == ApplierState::Draining); const auto opCtxPtr = makeOperationContext(); auto& opCtx = *opCtxPtr; @@ -209,7 +209,7 @@ TEST_F(ReplCoordTest, ElectionSucceedsWhenNodeIsTheOnlyNode) { getReplCoord()->waitForElectionFinish_forTest(); ASSERT(getReplCoord()->getMemberState().primary()) << getReplCoord()->getMemberState().toString(); - ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); + ASSERT(getExternalState()->getApplierState() == ApplierState::Draining); const auto opCtxPtr = makeOperationContext(); auto& opCtx = *opCtxPtr; @@ -2250,7 +2250,7 @@ TEST_F(PrimaryCatchUpTest, PrimaryDoesNotNeedToCatchUp) { // Get 2 heartbeats from secondaries. ASSERT_EQUALS(2, count); - ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); + ASSERT(getExternalState()->getApplierState() == ApplierState::Draining); stopCapturingLogMessages(); ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest optime known via heartbeats")); auto opCtx = makeOperationContext(); @@ -2302,9 +2302,9 @@ TEST_F(PrimaryCatchUpTest, CatchupSucceeds) { ASSERT_EQUALS(time2, ReplicationMetrics::get(getServiceContext()).getTargetCatchupOpTime_forTesting()); - ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); + ASSERT(getExternalState()->getApplierState() == ApplierState::Running); advanceMyLastAppliedOpTime(time2, Date_t() + Seconds(time2.getSecs())); - ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); + ASSERT(getExternalState()->getApplierState() == ApplierState::Draining); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("Caught up to the latest known optime successfully")); auto opCtx = makeOperationContext(); @@ -2338,7 +2338,7 @@ TEST_F(PrimaryCatchUpTest, CatchupTimeout) { // Other nodes are ahead of me. getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time2)); }); - ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); + ASSERT(getExternalState()->getApplierState() == ApplierState::Draining); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("Catchup timed out")); auto opCtx = makeOperationContext(); @@ -2377,7 +2377,7 @@ TEST_F(PrimaryCatchUpTest, CannotSeeAllNodes) { getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time1)); } }); - ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); + ASSERT(getExternalState()->getApplierState() == ApplierState::Draining); stopCapturingLogMessages(); ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest optime known via heartbeats")); auto opCtx = makeOperationContext(); @@ -2417,7 +2417,7 @@ TEST_F(PrimaryCatchUpTest, HeartbeatTimeout) { getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time1)); } }); - ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); + ASSERT(getExternalState()->getApplierState() == ApplierState::Draining); stopCapturingLogMessages(); ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest optime known via heartbeats")); auto opCtx = makeOperationContext(); @@ -2448,13 +2448,13 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownBeforeHeartbeatRefreshing) { OpTime time2(Timestamp(100, 2), 0); ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1); // Step down immediately. - ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); + ASSERT(getExternalState()->getApplierState() == ApplierState::Running); TopologyCoordinator::UpdateTermResult updateTermResult; auto evh = getReplCoord()->updateTerm_forTest(2, &updateTermResult); ASSERT_TRUE(evh.isValid()); getReplExec()->waitForEvent(evh); ASSERT_TRUE(getReplCoord()->getMemberState().secondary()); - ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); + ASSERT(getExternalState()->getApplierState() == ApplierState::Running); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("Exited primary catch-up mode")); ASSERT_EQUALS(0, countLogLinesContaining("Caught up to the latest")); @@ -2495,14 +2495,14 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringCatchUp) { ASSERT_EQUALS(time2, ReplicationMetrics::get(getServiceContext()).getTargetCatchupOpTime_forTesting()); - ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); + ASSERT(getExternalState()->getApplierState() == ApplierState::Running); TopologyCoordinator::UpdateTermResult updateTermResult; auto evh = getReplCoord()->updateTerm_forTest(2, &updateTermResult); ASSERT_TRUE(evh.isValid()); getReplExec()->waitForEvent(evh); ASSERT_TRUE(getReplCoord()->getMemberState().secondary()); // replyHeartbeatsAndRunUntil(getNet()->now() + config.getCatchUpTimeoutPeriod()); - ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); + ASSERT(getExternalState()->getApplierState() == ApplierState::Running); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("Exited primary catch-up mode")); ASSERT_EQUALS(0, countLogLinesContaining("Caught up to the latest")); @@ -2544,9 +2544,9 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringDrainMode) { net->scheduleResponse(noi, net->now(), makeHeartbeatResponse(time2)); }); ReplicationCoordinatorImpl* replCoord = getReplCoord(); - ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); + ASSERT(getExternalState()->getApplierState() == ApplierState::Running); advanceMyLastAppliedOpTime(time2, Date_t() + Seconds(time2.getSecs())); - ASSERT(replCoord->getApplierState() == ApplierState::Draining); + ASSERT(getExternalState()->getApplierState() == ApplierState::Draining); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("Caught up to the latest")); @@ -2573,7 +2573,7 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringDrainMode) { ASSERT_TRUE(replCoord->getMemberState().secondary()); // Step up again - ASSERT(replCoord->getApplierState() == ApplierState::Running); + ASSERT(getExternalState()->getApplierState() == ApplierState::Running); simulateSuccessfulV1Voting(); ASSERT_TRUE(replCoord->getMemberState().primary()); @@ -2582,14 +2582,14 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringDrainMode) { auto net = getNet(); net->scheduleResponse(noi, net->now(), makeHeartbeatResponse(time2)); }); - ASSERT(replCoord->getApplierState() == ApplierState::Draining); + ASSERT(getExternalState()->getApplierState() == ApplierState::Draining); { Lock::GlobalLock lock(opCtx.get(), MODE_IX); ASSERT_FALSE(replCoord->canAcceptWritesForDatabase(opCtx.get(), "test")); } signalDrainComplete(opCtx.get()); Lock::GlobalLock lock(opCtx.get(), MODE_IX); - ASSERT(replCoord->getApplierState() == ApplierState::Stopped); + ASSERT(getExternalState()->getApplierState() == ApplierState::Stopped); ASSERT_TRUE(replCoord->canAcceptWritesForDatabase(opCtx.get(), "test")); // Check that the number of elections requiring primary catchup was not incremented again. @@ -2631,7 +2631,7 @@ TEST_F(PrimaryCatchUpTest, FreshestNodeBecomesAvailableLater) { } }); // The node is still in catchup mode, but the target optime has been set. - ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); + ASSERT(getExternalState()->getApplierState() == ApplierState::Running); stopCapturingLogMessages(); ASSERT_EQ(1, countLogLinesContaining("Heartbeats updated catchup target optime")); ASSERT_EQUALS(time3, @@ -2639,7 +2639,7 @@ TEST_F(PrimaryCatchUpTest, FreshestNodeBecomesAvailableLater) { // 3) Advancing its applied optime to time 2 isn't enough. advanceMyLastAppliedOpTime(time2, Date_t() + Seconds(time2.getSecs())); - ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); + ASSERT(getExternalState()->getApplierState() == ApplierState::Running); // 4) After a while, the other node at time 4 becomes available. Time 4 becomes the new target. startCapturingLogMessages(); @@ -2653,7 +2653,7 @@ TEST_F(PrimaryCatchUpTest, FreshestNodeBecomesAvailableLater) { } }); // The node is still in catchup mode, but the target optime has been updated. - ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); + ASSERT(getExternalState()->getApplierState() == ApplierState::Running); stopCapturingLogMessages(); ASSERT_EQ(1, countLogLinesContaining("Heartbeats updated catchup target optime")); ASSERT_EQUALS(time4, @@ -2661,12 +2661,12 @@ TEST_F(PrimaryCatchUpTest, FreshestNodeBecomesAvailableLater) { // 5) Advancing to time 3 isn't enough now. advanceMyLastAppliedOpTime(time3, Date_t() + Seconds(time3.getSecs())); - ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); + ASSERT(getExternalState()->getApplierState() == ApplierState::Running); // 6) The node catches up time 4 eventually. startCapturingLogMessages(); advanceMyLastAppliedOpTime(time4, Date_t() + Seconds(time4.getSecs())); - ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); + ASSERT(getExternalState()->getApplierState() == ApplierState::Draining); stopCapturingLogMessages(); ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest")); auto opCtx = makeOperationContext(); @@ -2718,13 +2718,13 @@ TEST_F(PrimaryCatchUpTest, InfiniteTimeoutAndAbort) { ASSERT_OK(getReplCoord()->processHeartbeatV1(hbArgs, &response)); }); ASSERT_TRUE(getReplCoord()->getMemberState().primary()); - ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); + ASSERT(getExternalState()->getApplierState() == ApplierState::Running); // Simulate a user initiated abort. ASSERT_OK(getReplCoord()->abortCatchupIfNeeded( ReplicationCoordinator::PrimaryCatchUpConclusionReason:: kFailedWithReplSetAbortPrimaryCatchUpCmd)); - ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); + ASSERT(getExternalState()->getApplierState() == ApplierState::Draining); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("Exited primary catch-up mode")); @@ -2756,7 +2756,7 @@ TEST_F(PrimaryCatchUpTest, ZeroTimeout) { OpTime time1(Timestamp(100, 1), 0); ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1, 0); - ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); + ASSERT(getExternalState()->getApplierState() == ApplierState::Draining); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("Skipping primary catchup")); auto opCtx = makeOperationContext(); @@ -2791,12 +2791,12 @@ TEST_F(PrimaryCatchUpTest, CatchUpFailsDueToPrimaryStepDown) { // Other nodes are ahead of me. getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time2)); }); - ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); + ASSERT(getExternalState()->getApplierState() == ApplierState::Running); auto opCtx = makeOperationContext(); getReplCoord()->stepDown(opCtx.get(), true, Milliseconds(0), Milliseconds(1000)); ASSERT_TRUE(getReplCoord()->getMemberState().secondary()); - ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); + ASSERT(getExternalState()->getApplierState() == ApplierState::Running); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("Exited primary catch-up mode")); diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index cfdd4b29a05..4dfc7d06fe5 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -86,6 +86,7 @@ using executor::RemoteCommandRequest; using executor::RemoteCommandResponse; using unittest::assertGet; using unittest::EnsureFCV; +using ApplierState = OplogApplier::ApplierState; typedef ReplicationCoordinator::ReplSetReconfigArgs ReplSetReconfigArgs; // Helper class to wrap Timestamp as an OpTime with term 1. @@ -1734,7 +1735,7 @@ TEST_F(StepDownTest, StepDownFailureRestoresDrainState) { auto electionTimeoutWhen = getReplCoord()->getElectionTimeout_forTest(); simulateSuccessfulV1ElectionWithoutExitingDrainMode(electionTimeoutWhen); ASSERT_TRUE(repl->getMemberState().primary()); - ASSERT(repl->getApplierState() == ReplicationCoordinator::ApplierState::Draining); + ASSERT(getExternalState()->getApplierState() == ApplierState::Draining); { // We can't take writes yet since we're still in drain mode. @@ -1759,7 +1760,7 @@ TEST_F(StepDownTest, StepDownFailureRestoresDrainState) { ASSERT(stepDownStatus == ErrorCodes::PrimarySteppedDown || stepDownStatus == ErrorCodes::Interrupted); ASSERT_TRUE(getReplCoord()->getMemberState().primary()); - ASSERT(repl->getApplierState() == ReplicationCoordinator::ApplierState::Draining); + ASSERT(getExternalState()->getApplierState() == ApplierState::Draining); // Ensure that the failed stepdown attempt didn't make us able to take writes since we're still // in drain mode. @@ -1772,7 +1773,7 @@ TEST_F(StepDownTest, StepDownFailureRestoresDrainState) { // Now complete drain mode and ensure that we become capable of taking writes. auto opCtx = makeOperationContext(); signalDrainComplete(opCtx.get()); - ASSERT(repl->getApplierState() == ReplicationCoordinator::ApplierState::Stopped); + ASSERT(getExternalState()->getApplierState() == ApplierState::Stopped); ASSERT_TRUE(getReplCoord()->getMemberState().primary()); Lock::GlobalLock lock(opCtx.get(), MODE_IX); @@ -5695,7 +5696,7 @@ TEST_F(ReplCoordTest, WaitForDrainFinish) { auto timeout = Milliseconds(1); ASSERT_OK(replCoord->waitForMemberState(MemberState::RS_PRIMARY, timeout)); - ASSERT(replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Draining); + ASSERT(getExternalState()->getApplierState() == ApplierState::Draining); ASSERT_EQUALS(ErrorCodes::ExceededTimeLimit, replCoord->waitForDrainFinish(timeout)); ASSERT_EQUALS(ErrorCodes::BadValue, replCoord->waitForDrainFinish(Milliseconds(-1))); diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index eb09d7384be..234df4ece5b 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -296,10 +296,6 @@ Status ReplicationCoordinatorMock::setFollowerModeStrict(OperationContext* opCtx return setFollowerMode(newState); } -ReplicationCoordinator::ApplierState ReplicationCoordinatorMock::getApplierState() { - return ApplierState::Running; -} - void ReplicationCoordinatorMock::signalDrainComplete(OperationContext*, long long) {} Status ReplicationCoordinatorMock::waitForDrainFinish(Milliseconds timeout) { diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index 596776e1498..42f50670cba 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -166,8 +166,6 @@ public: virtual Status setFollowerModeStrict(OperationContext* opCtx, const MemberState& newState); - virtual ApplierState getApplierState(); - virtual void signalDrainComplete(OperationContext*, long long); virtual Status waitForDrainFinish(Milliseconds timeout) override; diff --git a/src/mongo/db/repl/replication_coordinator_noop.cpp b/src/mongo/db/repl/replication_coordinator_noop.cpp index 44d45bada49..e3e8f062bc4 100644 --- a/src/mongo/db/repl/replication_coordinator_noop.cpp +++ b/src/mongo/db/repl/replication_coordinator_noop.cpp @@ -177,10 +177,6 @@ Status ReplicationCoordinatorNoOp::setFollowerModeStrict(OperationContext* opCtx MONGO_UNREACHABLE; } -ReplicationCoordinator::ApplierState ReplicationCoordinatorNoOp::getApplierState() { - MONGO_UNREACHABLE; -} - void ReplicationCoordinatorNoOp::signalDrainComplete(OperationContext*, long long) { MONGO_UNREACHABLE; } diff --git a/src/mongo/db/repl/replication_coordinator_noop.h b/src/mongo/db/repl/replication_coordinator_noop.h index e8390f6fc42..a13cc50f979 100644 --- a/src/mongo/db/repl/replication_coordinator_noop.h +++ b/src/mongo/db/repl/replication_coordinator_noop.h @@ -147,8 +147,6 @@ public: Status setFollowerModeStrict(OperationContext* opCtx, const MemberState&) final; - ApplierState getApplierState() final; - void signalDrainComplete(OperationContext*, long long) final; Status waitForDrainFinish(Milliseconds) final; diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp index d09d8bdf379..18ee73d94b9 100644 --- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp +++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp @@ -62,6 +62,7 @@ namespace repl { using executor::NetworkInterfaceMock; using executor::RemoteCommandRequest; using executor::RemoteCommandResponse; +using ApplierState = OplogApplier::ApplierState; executor::TaskExecutor* ReplCoordTest::getReplExec() { return _replExec; @@ -359,7 +360,7 @@ void ReplCoordTest::simulateSuccessfulV1ElectionWithoutExitingDrainMode(Date_t e hasReadyRequests = net->hasReadyRequests(); getNet()->exitNetwork(); } - ASSERT(replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Draining); + ASSERT(getExternalState()->getApplierState() == ApplierState::Draining); ASSERT(replCoord->getMemberState().primary()) << replCoord->getMemberState().toString(); IsMasterResponse imResponse; @@ -376,7 +377,7 @@ void ReplCoordTest::simulateSuccessfulV1ElectionAt(Date_t electionTime) { auto opCtx = makeOperationContext(); signalDrainComplete(opCtx.get()); } - ASSERT(replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Stopped); + ASSERT(getExternalState()->getApplierState() == ApplierState::Stopped); IsMasterResponse imResponse; replCoord->fillIsMasterForReplSet(&imResponse, {}); ASSERT_TRUE(imResponse.isMaster()) << imResponse.toBSON().toString(); @@ -399,7 +400,7 @@ void ReplCoordTest::runSingleNodeElection(OperationContext* opCtx) { ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); getReplCoord()->waitForElectionFinish_forTest(); - ASSERT(getReplCoord()->getApplierState() == ReplicationCoordinator::ApplierState::Draining); + ASSERT(getExternalState()->getApplierState() == ApplierState::Draining); ASSERT(getReplCoord()->getMemberState().primary()) << getReplCoord()->getMemberState().toString(); diff --git a/src/mongo/embedded/replication_coordinator_embedded.cpp b/src/mongo/embedded/replication_coordinator_embedded.cpp index 408b5c78ee6..26c7ed3673c 100644 --- a/src/mongo/embedded/replication_coordinator_embedded.cpp +++ b/src/mongo/embedded/replication_coordinator_embedded.cpp @@ -184,10 +184,6 @@ Status ReplicationCoordinatorEmbedded::setFollowerModeStrict(OperationContext* o UASSERT_NOT_IMPLEMENTED; } -ReplicationCoordinator::ApplierState ReplicationCoordinatorEmbedded::getApplierState() { - UASSERT_NOT_IMPLEMENTED; -} - void ReplicationCoordinatorEmbedded::signalDrainComplete(OperationContext*, long long) { UASSERT_NOT_IMPLEMENTED; } diff --git a/src/mongo/embedded/replication_coordinator_embedded.h b/src/mongo/embedded/replication_coordinator_embedded.h index 9fe16bf52dc..f2cd27e9f43 100644 --- a/src/mongo/embedded/replication_coordinator_embedded.h +++ b/src/mongo/embedded/replication_coordinator_embedded.h @@ -153,8 +153,6 @@ public: Status setFollowerModeStrict(OperationContext* opCtx, const repl::MemberState&) override; - ApplierState getApplierState() override; - void signalDrainComplete(OperationContext*, long long) override; Status waitForDrainFinish(Milliseconds) override; |