From 6e136efbd50551965c17116d00e1f3179c039770 Mon Sep 17 00:00:00 2001 From: Mihai Andrei Date: Wed, 9 Oct 2019 17:29:25 +0000 Subject: Revert SERVER-42996 Move ApplierState to OplogApplier --- src/mongo/db/repl/SConscript | 1 - src/mongo/db/repl/oplog_applier.cpp | 10 ---- src/mongo/db/repl/oplog_applier.h | 61 ---------------------- src/mongo/db/repl/oplog_applier_impl.cpp | 5 +- src/mongo/db/repl/opqueue_batcher.cpp | 2 +- src/mongo/db/repl/replication_coordinator.h | 58 ++++++++++++++++++++ .../repl/replication_coordinator_external_state.h | 11 ---- ...replication_coordinator_external_state_impl.cpp | 9 ---- .../replication_coordinator_external_state_impl.h | 2 - ...replication_coordinator_external_state_mock.cpp | 36 +------------ .../replication_coordinator_external_state_mock.h | 4 -- src/mongo/db/repl/replication_coordinator_impl.cpp | 19 ++++--- src/mongo/db/repl/replication_coordinator_impl.h | 4 ++ .../replication_coordinator_impl_elect_v1_test.cpp | 56 ++++++++++---------- .../db/repl/replication_coordinator_impl_test.cpp | 9 ++-- src/mongo/db/repl/replication_coordinator_mock.cpp | 4 ++ src/mongo/db/repl/replication_coordinator_mock.h | 2 + src/mongo/db/repl/replication_coordinator_noop.cpp | 4 ++ src/mongo/db/repl/replication_coordinator_noop.h | 2 + .../repl/replication_coordinator_test_fixture.cpp | 7 ++- .../embedded/replication_coordinator_embedded.cpp | 4 ++ .../embedded/replication_coordinator_embedded.h | 2 + 22 files changed, 132 insertions(+), 180 deletions(-) diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 0dd0478e690..a0283ce3ae5 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -722,7 +722,6 @@ env.Library( 'storage_interface_mock.cpp', ], LIBDEPS=[ - 'oplog_application_interface', 'oplog_buffer_blocking_queue', 'repl_coordinator_interface', 'repl_settings', diff --git a/src/mongo/db/repl/oplog_applier.cpp b/src/mongo/db/repl/oplog_applier.cpp index 4ef95f94816..4c854beb0c2 100644 --- a/src/mongo/db/repl/oplog_applier.cpp +++ b/src/mongo/db/repl/oplog_applier.cpp @@ -287,16 +287,6 @@ void OplogApplier::_consume(OperationContext* opCtx, OplogBuffer* oplogBuffer) { invariant(oplogBuffer->tryPop(opCtx, &opToPopAndDiscard) || inShutdown()); } -OplogApplier::ApplierState OplogApplier::getApplierState() const { - stdx::lock_guard lock(_mutex); - return _applierState; -} - -void OplogApplier::setApplierState(ApplierState st) { - stdx::lock_guard lock(_mutex); - _applierState = st; -} - std::unique_ptr makeReplWriterPool() { return makeReplWriterPool(replWriterThreadCount); } diff --git a/src/mongo/db/repl/oplog_applier.h b/src/mongo/db/repl/oplog_applier.h index 722d5516719..b6bcaf08b77 100644 --- a/src/mongo/db/repl/oplog_applier.h +++ b/src/mongo/db/repl/oplog_applier.h @@ -190,65 +190,6 @@ public: StatusWith multiApply(OperationContext* opCtx, Operations ops); const Options& getOptions() const; - /** - * Step-up - * ======= - * On stepup, repl coord enters catch-up mode. It's the same as the secondary mode from - * the perspective of producer and applier, so there's nothing to do with them. - * When a node enters drain mode, producer state = Stopped, applier state = Draining. - * - * If the applier state is Draining, it will signal repl coord when there's nothing to apply. - * The applier goes into Stopped state at the same time. - * - * The states go like the following: - * - secondary and during catchup mode - * (producer: Running, applier: Running) - * | - * | finish catch-up, enter drain mode - * V - * - drain mode - * (producer: Stopped, applier: Draining) - * | - * | applier signals drain is complete - * V - * - primary is in master mode - * (producer: Stopped, applier: Stopped) - * - * - * Step-down - * ========= - * The state transitions become: - * - primary is in master mode - * (producer: Stopped, applier: Stopped) - * | - * | step down - * V - * - secondary mode, starting bgsync - * (producer: Starting, applier: Running) - * | - * | bgsync runs start() - * V - * - secondary mode, normal - * (producer: Running, applier: Running) - * - * When a node steps down during draining mode, it's OK to change from (producer: Stopped, - * applier: Draining) to (producer: Starting, applier: Running). - * - * When a node steps down during catchup mode, the states remain the same (producer: Running, - * applier: Running). - */ - enum class ApplierState { Running, Draining, Stopped }; - - /** - * In normal cases: Running -> Draining -> Stopped -> Running. - * Draining -> Running is also possible if a node steps down during drain mode. - * - * Only the applier can make the transition from Draining to Stopped by calling - * signalDrainComplete(). - */ - virtual ApplierState getApplierState() const; - - virtual void setApplierState(ApplierState st); private: /** @@ -287,8 +228,6 @@ private: // Configures this OplogApplier. const Options _options; - - ApplierState _applierState = ApplierState::Running; }; /** diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp index 50235183d3b..899bae5683d 100644 --- a/src/mongo/db/repl/oplog_applier_impl.cpp +++ b/src/mongo/db/repl/oplog_applier_impl.cpp @@ -557,7 +557,10 @@ StatusWith OplogApplierImpl::_multiApply(OperationContext* opCtx, // entries from the oplog until we finish writing. Lock::ParallelBatchWriterMode pbwm(opCtx->lockState()); - if (getApplierState() == ApplierState::Stopped) { + // TODO (SERVER-42996): This is a temporary invariant to protect against segfaults. This will + // be removed once ApplierState is moved from ReplicationCoordinator to OplogApplier. + invariant(_replCoord); + if (_replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Stopped) { severe() << "attempting to replicate ops while primary"; return {ErrorCodes::CannotApplyOplogWhilePrimary, "attempting to replicate ops while primary"}; diff --git a/src/mongo/db/repl/opqueue_batcher.cpp b/src/mongo/db/repl/opqueue_batcher.cpp index c0f9e604393..a7b41404193 100644 --- a/src/mongo/db/repl/opqueue_batcher.cpp +++ b/src/mongo/db/repl/opqueue_batcher.cpp @@ -154,7 +154,7 @@ void OpQueueBatcher::run() { // Draining state guarantees the producer has already been fully stopped and no more // operations will be pushed in to the oplog buffer until the applier state changes. auto isDraining = - _oplogApplier->getApplierState() == OplogApplier::ApplierState::Draining; + replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Draining; // Check the oplog buffer after the applier state to ensure the producer is stopped. if (isDraining && _oplogBuffer->isEmpty()) { ops.setTermWhenExhausted(termWhenBufferIsEmpty); diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index 525cb1a102e..63d51768d53 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -483,6 +483,64 @@ public: */ virtual Status setFollowerModeStrict(OperationContext* opCtx, const MemberState& newState) = 0; + /** + * Step-up + * ======= + * On stepup, repl coord enters catch-up mode. It's the same as the secondary mode from + * the perspective of producer and applier, so there's nothing to do with them. + * When a node enters drain mode, producer state = Stopped, applier state = Draining. + * + * If the applier state is Draining, it will signal repl coord when there's nothing to apply. + * The applier goes into Stopped state at the same time. + * + * The states go like the following: + * - secondary and during catchup mode + * (producer: Running, applier: Running) + * | + * | finish catch-up, enter drain mode + * V + * - drain mode + * (producer: Stopped, applier: Draining) + * | + * | applier signals drain is complete + * V + * - primary is in master mode + * (producer: Stopped, applier: Stopped) + * + * + * Step-down + * ========= + * The state transitions become: + * - primary is in master mode + * (producer: Stopped, applier: Stopped) + * | + * | step down + * V + * - secondary mode, starting bgsync + * (producer: Starting, applier: Running) + * | + * | bgsync runs start() + * V + * - secondary mode, normal + * (producer: Running, applier: Running) + * + * When a node steps down during draining mode, it's OK to change from (producer: Stopped, + * applier: Draining) to (producer: Starting, applier: Running). + * + * When a node steps down during catchup mode, the states remain the same (producer: Running, + * applier: Running). + */ + enum class ApplierState { Running, Draining, Stopped }; + + /** + * In normal cases: Running -> Draining -> Stopped -> Running. + * Draining -> Running is also possible if a node steps down during drain mode. + * + * Only the applier can make the transition from Draining to Stopped by calling + * signalDrainComplete(). + */ + virtual ApplierState getApplierState() = 0; + /** * Signals that a previously requested pause and drain of the applier buffer * has completed. diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h index fa6c4bbad73..dd6f4e507ac 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state.h +++ b/src/mongo/db/repl/replication_coordinator_external_state.h @@ -35,7 +35,6 @@ #include "mongo/bson/timestamp.h" #include "mongo/db/repl/member_state.h" -#include "mongo/db/repl/oplog_applier.h" #include "mongo/db/repl/optime.h" #include "mongo/executor/task_executor.h" #include "mongo/util/concurrency/thread_pool.h" @@ -301,16 +300,6 @@ public: */ virtual std::size_t getOplogFetcherInitialSyncMaxFetcherRestarts() const = 0; - /* - * Returns the OplogApplier's current state. - */ - virtual OplogApplier::ApplierState getApplierState() const = 0; - - /* - * Updates the OplogApplier's current state. - */ - virtual void setApplierState(const OplogApplier::ApplierState st) = 0; - /* * Creates noop writer instance. Setting the _noopWriter member is not protected by a guard, * hence it must be called before multi-threaded operations start. diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 4354746147d..1c7603ca075 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -910,15 +910,6 @@ std::size_t ReplicationCoordinatorExternalStateImpl::getOplogFetcherInitialSyncM return oplogFetcherInitialSyncMaxFetcherRestarts.load(); } - -OplogApplier::ApplierState ReplicationCoordinatorExternalStateImpl::getApplierState() const { - return _oplogApplier.get()->getApplierState(); -} - -void ReplicationCoordinatorExternalStateImpl::setApplierState(const OplogApplier::ApplierState st) { - _oplogApplier.get()->setApplierState(st); -} - JournalListener::Token ReplicationCoordinatorExternalStateImpl::getToken() { return repl::ReplicationCoordinator::get(_service)->getMyLastAppliedOpTimeAndWallTime(); } diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h index da4808543e9..0b75f3d9cc7 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h @@ -108,8 +108,6 @@ public: virtual bool isReadConcernSnapshotSupportedByStorageEngine(OperationContext* opCtx) const; virtual std::size_t getOplogFetcherSteadyStateMaxFetcherRestarts() const override; virtual std::size_t getOplogFetcherInitialSyncMaxFetcherRestarts() const override; - virtual OplogApplier::ApplierState getApplierState() const override; - virtual void setApplierState(const OplogApplier::ApplierState st) override; // Methods from JournalListener. virtual JournalListener::Token getToken(); diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp index eddecc1bdb5..75bdac91439 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp @@ -46,31 +46,7 @@ namespace mongo { namespace repl { -namespace { -/** - * Minimal implementation of OplogApplier for testing. - */ -class OplogApplierMock : public OplogApplier { - OplogApplierMock(const OplogApplierMock&) = delete; - OplogApplierMock& operator=(const OplogApplierMock&) = delete; - -public: - explicit OplogApplierMock(OplogApplier::Options options); - - void _run(OplogBuffer* oplogBuffer) final; - StatusWith _multiApply(OperationContext* opCtx, Operations ops) final; -}; - -OplogApplierMock::OplogApplierMock(OplogApplier::Options options) - : OplogApplier(nullptr, nullptr, nullptr, options) {} - -void OplogApplierMock::_run(OplogBuffer* oplogBuffer) {} -StatusWith OplogApplierMock::_multiApply(OperationContext* opCtx, Operations ops) { - return OpTime(); -} - -} // namespace ReplicationCoordinatorExternalStateMock::ReplicationCoordinatorExternalStateMock() : _localRsConfigDocument(ErrorCodes::NoMatchingDocument, "No local config document"), _localRsLastVoteDocument(ErrorCodes::NoMatchingDocument, "No local lastVote document"), @@ -81,9 +57,7 @@ ReplicationCoordinatorExternalStateMock::ReplicationCoordinatorExternalStateMock _storeLocalLastVoteDocumentStatus(Status::OK()), _storeLocalLastVoteDocumentShouldHang(false), _connectionsClosed(false), - _threadsStarted(false), - _oplogApplier(std::make_unique( - repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary))) {} + _threadsStarted(false) {} ReplicationCoordinatorExternalStateMock::~ReplicationCoordinatorExternalStateMock() {} @@ -319,14 +293,6 @@ void ReplicationCoordinatorExternalStateMock::setIsReadCommittedEnabled(bool val _isReadCommittedSupported = val; } -void ReplicationCoordinatorExternalStateMock::setApplierState(const OplogApplier::ApplierState st) { - _oplogApplier->setApplierState(st); -} - -OplogApplier::ApplierState ReplicationCoordinatorExternalStateMock::getApplierState() const { - return _oplogApplier->getApplierState(); -} - void ReplicationCoordinatorExternalStateMock::onDrainComplete(OperationContext* opCtx) {} OpTime ReplicationCoordinatorExternalStateMock::onTransitionToPrimary(OperationContext* opCtx) { diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h index 42378115a1e..5cebab1e820 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h @@ -36,7 +36,6 @@ #include "mongo/bson/timestamp.h" #include "mongo/db/jsobj.h" #include "mongo/db/repl/last_vote.h" -#include "mongo/db/repl/oplog_applier_impl.h" #include "mongo/db/repl/replication_coordinator_external_state.h" #include "mongo/platform/condition_variable.h" #include "mongo/platform/mutex.h" @@ -100,8 +99,6 @@ public: virtual bool isReadConcernSnapshotSupportedByStorageEngine(OperationContext* opCtx) const; virtual std::size_t getOplogFetcherSteadyStateMaxFetcherRestarts() const override; virtual std::size_t getOplogFetcherInitialSyncMaxFetcherRestarts() const override; - virtual OplogApplier::ApplierState getApplierState() const override; - virtual void setApplierState(const OplogApplier::ApplierState st) override; /** * Adds "host" to the list of hosts that this mock will match when responding to "isSelf" @@ -212,7 +209,6 @@ private: OpTime _firstOpTimeOfMyTerm; double _electionTimeoutOffsetLimitFraction = 0.15; Timestamp _globalTimestamp; - std::unique_ptr _oplogApplier; }; } // namespace repl diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 4b796ccd2ee..8925e576199 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -951,6 +951,11 @@ Status ReplicationCoordinatorImpl::_setFollowerMode(OperationContext* opCtx, return Status::OK(); } +ReplicationCoordinator::ApplierState ReplicationCoordinatorImpl::getApplierState() { + stdx::lock_guard lk(_mutex); + return _applierState; +} + void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* opCtx, long long termWhenBufferIsEmpty) { // This logic is a little complicated in order to avoid acquiring the RSTL in mode X @@ -980,7 +985,7 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* opCtx, invariant(opCtx->writesAreReplicated()); stdx::unique_lock lk(_mutex); - if (_externalState->getApplierState() != OplogApplier::ApplierState::Draining) { + if (_applierState != ApplierState::Draining) { return; } lk.unlock(); @@ -996,11 +1001,11 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* opCtx, // Exit drain mode only if we're actually in draining mode, the apply buffer is empty in the // current term, and we're allowed to become the write master. - if (_externalState->getApplierState() != OplogApplier::ApplierState::Draining || + if (_applierState != ApplierState::Draining || !_topCoord->canCompleteTransitionToPrimary(termWhenBufferIsEmpty)) { return; } - _externalState->setApplierState(OplogApplier::ApplierState::Stopped); + _applierState = ApplierState::Stopped; invariant(_getMemberState_inlock().primary()); invariant(!_readWriteAbility->canAcceptNonLocalWrites(opCtx)); @@ -1042,9 +1047,7 @@ Status ReplicationCoordinatorImpl::waitForDrainFinish(Milliseconds timeout) { } stdx::unique_lock lk(_mutex); - auto pred = [this]() { - return _externalState->getApplierState() != OplogApplier::ApplierState::Draining; - }; + auto pred = [this]() { return _applierState != ApplierState::Draining; }; if (!_drainFinishedCond.wait_for(lk, timeout.toSystemDuration(), pred)) { return Status(ErrorCodes::ExceededTimeLimit, "Timed out waiting to finish draining applier buffer"); @@ -2844,7 +2847,7 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator(WithLock l _catchupState->abort_inlock(PrimaryCatchUpConclusionReason::kFailedWithError); } } - _externalState->setApplierState(OplogApplier::ApplierState::Running); + _applierState = ApplierState::Running; _externalState->startProducerIfStopped(); } @@ -3147,7 +3150,7 @@ boost::optional ReplicationCoordinatorImpl::getRecoveryTimestamp() { } void ReplicationCoordinatorImpl::_enterDrainMode_inlock() { - _externalState->setApplierState(OplogApplier::ApplierState::Draining); + _applierState = ApplierState::Draining; _externalState->stopProducer(); } diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index bcb37c64af0..78f1df824b0 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -198,6 +198,8 @@ public: virtual Status setFollowerModeStrict(OperationContext* opCtx, const MemberState& newState) override; + virtual ApplierState getApplierState() override; + virtual void signalDrainComplete(OperationContext* opCtx, long long termWhenBufferIsEmpty) override; @@ -1377,6 +1379,8 @@ private: // Used to signal threads waiting for changes to _memberState. stdx::condition_variable _drainFinishedCond; // (M) + ReplicationCoordinator::ApplierState _applierState = ApplierState::Running; // (M) + // Used to signal threads waiting for changes to _rsConfigState. stdx::condition_variable _rsConfigStateChange; // (M) diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp index 3e806b36e6c..2ab89c5a33a 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp @@ -57,7 +57,7 @@ namespace { using executor::NetworkInterfaceMock; using executor::RemoteCommandRequest; using executor::RemoteCommandResponse; -using ApplierState = OplogApplier::ApplierState; +using ApplierState = ReplicationCoordinator::ApplierState; TEST_F(ReplCoordTest, RandomizedElectionOffsetWithinProperBounds) { BSONObj configObj = BSON("_id" @@ -153,7 +153,7 @@ TEST_F(ReplCoordTest, ElectionSucceedsWhenNodeIsTheOnlyElectableNode) { ASSERT(getReplCoord()->getMemberState().primary()) << getReplCoord()->getMemberState().toString(); simulateCatchUpAbort(); - ASSERT(getExternalState()->getApplierState() == ApplierState::Draining); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); const auto opCtxPtr = makeOperationContext(); auto& opCtx = *opCtxPtr; @@ -209,7 +209,7 @@ TEST_F(ReplCoordTest, ElectionSucceedsWhenNodeIsTheOnlyNode) { getReplCoord()->waitForElectionFinish_forTest(); ASSERT(getReplCoord()->getMemberState().primary()) << getReplCoord()->getMemberState().toString(); - ASSERT(getExternalState()->getApplierState() == ApplierState::Draining); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); const auto opCtxPtr = makeOperationContext(); auto& opCtx = *opCtxPtr; @@ -2250,7 +2250,7 @@ TEST_F(PrimaryCatchUpTest, PrimaryDoesNotNeedToCatchUp) { // Get 2 heartbeats from secondaries. ASSERT_EQUALS(2, count); - ASSERT(getExternalState()->getApplierState() == ApplierState::Draining); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); stopCapturingLogMessages(); ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest optime known via heartbeats")); auto opCtx = makeOperationContext(); @@ -2302,9 +2302,9 @@ TEST_F(PrimaryCatchUpTest, CatchupSucceeds) { ASSERT_EQUALS(time2, ReplicationMetrics::get(getServiceContext()).getTargetCatchupOpTime_forTesting()); - ASSERT(getExternalState()->getApplierState() == ApplierState::Running); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); advanceMyLastAppliedOpTime(time2, Date_t() + Seconds(time2.getSecs())); - ASSERT(getExternalState()->getApplierState() == ApplierState::Draining); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("Caught up to the latest known optime successfully")); auto opCtx = makeOperationContext(); @@ -2338,7 +2338,7 @@ TEST_F(PrimaryCatchUpTest, CatchupTimeout) { // Other nodes are ahead of me. getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time2)); }); - ASSERT(getExternalState()->getApplierState() == ApplierState::Draining); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("Catchup timed out")); auto opCtx = makeOperationContext(); @@ -2377,7 +2377,7 @@ TEST_F(PrimaryCatchUpTest, CannotSeeAllNodes) { getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time1)); } }); - ASSERT(getExternalState()->getApplierState() == ApplierState::Draining); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); stopCapturingLogMessages(); ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest optime known via heartbeats")); auto opCtx = makeOperationContext(); @@ -2417,7 +2417,7 @@ TEST_F(PrimaryCatchUpTest, HeartbeatTimeout) { getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time1)); } }); - ASSERT(getExternalState()->getApplierState() == ApplierState::Draining); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); stopCapturingLogMessages(); ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest optime known via heartbeats")); auto opCtx = makeOperationContext(); @@ -2448,13 +2448,13 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownBeforeHeartbeatRefreshing) { OpTime time2(Timestamp(100, 2), 0); ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1); // Step down immediately. - ASSERT(getExternalState()->getApplierState() == ApplierState::Running); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); TopologyCoordinator::UpdateTermResult updateTermResult; auto evh = getReplCoord()->updateTerm_forTest(2, &updateTermResult); ASSERT_TRUE(evh.isValid()); getReplExec()->waitForEvent(evh); ASSERT_TRUE(getReplCoord()->getMemberState().secondary()); - ASSERT(getExternalState()->getApplierState() == ApplierState::Running); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("Exited primary catch-up mode")); ASSERT_EQUALS(0, countLogLinesContaining("Caught up to the latest")); @@ -2495,14 +2495,14 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringCatchUp) { ASSERT_EQUALS(time2, ReplicationMetrics::get(getServiceContext()).getTargetCatchupOpTime_forTesting()); - ASSERT(getExternalState()->getApplierState() == ApplierState::Running); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); TopologyCoordinator::UpdateTermResult updateTermResult; auto evh = getReplCoord()->updateTerm_forTest(2, &updateTermResult); ASSERT_TRUE(evh.isValid()); getReplExec()->waitForEvent(evh); ASSERT_TRUE(getReplCoord()->getMemberState().secondary()); // replyHeartbeatsAndRunUntil(getNet()->now() + config.getCatchUpTimeoutPeriod()); - ASSERT(getExternalState()->getApplierState() == ApplierState::Running); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("Exited primary catch-up mode")); ASSERT_EQUALS(0, countLogLinesContaining("Caught up to the latest")); @@ -2544,9 +2544,9 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringDrainMode) { net->scheduleResponse(noi, net->now(), makeHeartbeatResponse(time2)); }); ReplicationCoordinatorImpl* replCoord = getReplCoord(); - ASSERT(getExternalState()->getApplierState() == ApplierState::Running); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); advanceMyLastAppliedOpTime(time2, Date_t() + Seconds(time2.getSecs())); - ASSERT(getExternalState()->getApplierState() == ApplierState::Draining); + ASSERT(replCoord->getApplierState() == ApplierState::Draining); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("Caught up to the latest")); @@ -2573,7 +2573,7 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringDrainMode) { ASSERT_TRUE(replCoord->getMemberState().secondary()); // Step up again - ASSERT(getExternalState()->getApplierState() == ApplierState::Running); + ASSERT(replCoord->getApplierState() == ApplierState::Running); simulateSuccessfulV1Voting(); ASSERT_TRUE(replCoord->getMemberState().primary()); @@ -2582,14 +2582,14 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringDrainMode) { auto net = getNet(); net->scheduleResponse(noi, net->now(), makeHeartbeatResponse(time2)); }); - ASSERT(getExternalState()->getApplierState() == ApplierState::Draining); + ASSERT(replCoord->getApplierState() == ApplierState::Draining); { Lock::GlobalLock lock(opCtx.get(), MODE_IX); ASSERT_FALSE(replCoord->canAcceptWritesForDatabase(opCtx.get(), "test")); } signalDrainComplete(opCtx.get()); Lock::GlobalLock lock(opCtx.get(), MODE_IX); - ASSERT(getExternalState()->getApplierState() == ApplierState::Stopped); + ASSERT(replCoord->getApplierState() == ApplierState::Stopped); ASSERT_TRUE(replCoord->canAcceptWritesForDatabase(opCtx.get(), "test")); // Check that the number of elections requiring primary catchup was not incremented again. @@ -2631,7 +2631,7 @@ TEST_F(PrimaryCatchUpTest, FreshestNodeBecomesAvailableLater) { } }); // The node is still in catchup mode, but the target optime has been set. - ASSERT(getExternalState()->getApplierState() == ApplierState::Running); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); stopCapturingLogMessages(); ASSERT_EQ(1, countLogLinesContaining("Heartbeats updated catchup target optime")); ASSERT_EQUALS(time3, @@ -2639,7 +2639,7 @@ TEST_F(PrimaryCatchUpTest, FreshestNodeBecomesAvailableLater) { // 3) Advancing its applied optime to time 2 isn't enough. advanceMyLastAppliedOpTime(time2, Date_t() + Seconds(time2.getSecs())); - ASSERT(getExternalState()->getApplierState() == ApplierState::Running); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); // 4) After a while, the other node at time 4 becomes available. Time 4 becomes the new target. startCapturingLogMessages(); @@ -2653,7 +2653,7 @@ TEST_F(PrimaryCatchUpTest, FreshestNodeBecomesAvailableLater) { } }); // The node is still in catchup mode, but the target optime has been updated. - ASSERT(getExternalState()->getApplierState() == ApplierState::Running); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); stopCapturingLogMessages(); ASSERT_EQ(1, countLogLinesContaining("Heartbeats updated catchup target optime")); ASSERT_EQUALS(time4, @@ -2661,12 +2661,12 @@ TEST_F(PrimaryCatchUpTest, FreshestNodeBecomesAvailableLater) { // 5) Advancing to time 3 isn't enough now. advanceMyLastAppliedOpTime(time3, Date_t() + Seconds(time3.getSecs())); - ASSERT(getExternalState()->getApplierState() == ApplierState::Running); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); // 6) The node catches up time 4 eventually. startCapturingLogMessages(); advanceMyLastAppliedOpTime(time4, Date_t() + Seconds(time4.getSecs())); - ASSERT(getExternalState()->getApplierState() == ApplierState::Draining); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); stopCapturingLogMessages(); ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest")); auto opCtx = makeOperationContext(); @@ -2718,13 +2718,13 @@ TEST_F(PrimaryCatchUpTest, InfiniteTimeoutAndAbort) { ASSERT_OK(getReplCoord()->processHeartbeatV1(hbArgs, &response)); }); ASSERT_TRUE(getReplCoord()->getMemberState().primary()); - ASSERT(getExternalState()->getApplierState() == ApplierState::Running); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); // Simulate a user initiated abort. ASSERT_OK(getReplCoord()->abortCatchupIfNeeded( ReplicationCoordinator::PrimaryCatchUpConclusionReason:: kFailedWithReplSetAbortPrimaryCatchUpCmd)); - ASSERT(getExternalState()->getApplierState() == ApplierState::Draining); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("Exited primary catch-up mode")); @@ -2756,7 +2756,7 @@ TEST_F(PrimaryCatchUpTest, ZeroTimeout) { OpTime time1(Timestamp(100, 1), 0); ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1, 0); - ASSERT(getExternalState()->getApplierState() == ApplierState::Draining); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("Skipping primary catchup")); auto opCtx = makeOperationContext(); @@ -2791,12 +2791,12 @@ TEST_F(PrimaryCatchUpTest, CatchUpFailsDueToPrimaryStepDown) { // Other nodes are ahead of me. getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time2)); }); - ASSERT(getExternalState()->getApplierState() == ApplierState::Running); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); auto opCtx = makeOperationContext(); getReplCoord()->stepDown(opCtx.get(), true, Milliseconds(0), Milliseconds(1000)); ASSERT_TRUE(getReplCoord()->getMemberState().secondary()); - ASSERT(getExternalState()->getApplierState() == ApplierState::Running); + ASSERT(getReplCoord()->getApplierState() == ApplierState::Running); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("Exited primary catch-up mode")); diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index 4dfc7d06fe5..cfdd4b29a05 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -86,7 +86,6 @@ using executor::RemoteCommandRequest; using executor::RemoteCommandResponse; using unittest::assertGet; using unittest::EnsureFCV; -using ApplierState = OplogApplier::ApplierState; typedef ReplicationCoordinator::ReplSetReconfigArgs ReplSetReconfigArgs; // Helper class to wrap Timestamp as an OpTime with term 1. @@ -1735,7 +1734,7 @@ TEST_F(StepDownTest, StepDownFailureRestoresDrainState) { auto electionTimeoutWhen = getReplCoord()->getElectionTimeout_forTest(); simulateSuccessfulV1ElectionWithoutExitingDrainMode(electionTimeoutWhen); ASSERT_TRUE(repl->getMemberState().primary()); - ASSERT(getExternalState()->getApplierState() == ApplierState::Draining); + ASSERT(repl->getApplierState() == ReplicationCoordinator::ApplierState::Draining); { // We can't take writes yet since we're still in drain mode. @@ -1760,7 +1759,7 @@ TEST_F(StepDownTest, StepDownFailureRestoresDrainState) { ASSERT(stepDownStatus == ErrorCodes::PrimarySteppedDown || stepDownStatus == ErrorCodes::Interrupted); ASSERT_TRUE(getReplCoord()->getMemberState().primary()); - ASSERT(getExternalState()->getApplierState() == ApplierState::Draining); + ASSERT(repl->getApplierState() == ReplicationCoordinator::ApplierState::Draining); // Ensure that the failed stepdown attempt didn't make us able to take writes since we're still // in drain mode. @@ -1773,7 +1772,7 @@ TEST_F(StepDownTest, StepDownFailureRestoresDrainState) { // Now complete drain mode and ensure that we become capable of taking writes. auto opCtx = makeOperationContext(); signalDrainComplete(opCtx.get()); - ASSERT(getExternalState()->getApplierState() == ApplierState::Stopped); + ASSERT(repl->getApplierState() == ReplicationCoordinator::ApplierState::Stopped); ASSERT_TRUE(getReplCoord()->getMemberState().primary()); Lock::GlobalLock lock(opCtx.get(), MODE_IX); @@ -5696,7 +5695,7 @@ TEST_F(ReplCoordTest, WaitForDrainFinish) { auto timeout = Milliseconds(1); ASSERT_OK(replCoord->waitForMemberState(MemberState::RS_PRIMARY, timeout)); - ASSERT(getExternalState()->getApplierState() == ApplierState::Draining); + ASSERT(replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Draining); ASSERT_EQUALS(ErrorCodes::ExceededTimeLimit, replCoord->waitForDrainFinish(timeout)); ASSERT_EQUALS(ErrorCodes::BadValue, replCoord->waitForDrainFinish(Milliseconds(-1))); diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index 234df4ece5b..eb09d7384be 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -296,6 +296,10 @@ Status ReplicationCoordinatorMock::setFollowerModeStrict(OperationContext* opCtx return setFollowerMode(newState); } +ReplicationCoordinator::ApplierState ReplicationCoordinatorMock::getApplierState() { + return ApplierState::Running; +} + void ReplicationCoordinatorMock::signalDrainComplete(OperationContext*, long long) {} Status ReplicationCoordinatorMock::waitForDrainFinish(Milliseconds timeout) { diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index 42f50670cba..596776e1498 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -166,6 +166,8 @@ public: virtual Status setFollowerModeStrict(OperationContext* opCtx, const MemberState& newState); + virtual ApplierState getApplierState(); + virtual void signalDrainComplete(OperationContext*, long long); virtual Status waitForDrainFinish(Milliseconds timeout) override; diff --git a/src/mongo/db/repl/replication_coordinator_noop.cpp b/src/mongo/db/repl/replication_coordinator_noop.cpp index e3e8f062bc4..44d45bada49 100644 --- a/src/mongo/db/repl/replication_coordinator_noop.cpp +++ b/src/mongo/db/repl/replication_coordinator_noop.cpp @@ -177,6 +177,10 @@ Status ReplicationCoordinatorNoOp::setFollowerModeStrict(OperationContext* opCtx MONGO_UNREACHABLE; } +ReplicationCoordinator::ApplierState ReplicationCoordinatorNoOp::getApplierState() { + MONGO_UNREACHABLE; +} + void ReplicationCoordinatorNoOp::signalDrainComplete(OperationContext*, long long) { MONGO_UNREACHABLE; } diff --git a/src/mongo/db/repl/replication_coordinator_noop.h b/src/mongo/db/repl/replication_coordinator_noop.h index a13cc50f979..e8390f6fc42 100644 --- a/src/mongo/db/repl/replication_coordinator_noop.h +++ b/src/mongo/db/repl/replication_coordinator_noop.h @@ -147,6 +147,8 @@ public: Status setFollowerModeStrict(OperationContext* opCtx, const MemberState&) final; + ApplierState getApplierState() final; + void signalDrainComplete(OperationContext*, long long) final; Status waitForDrainFinish(Milliseconds) final; diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp index 18ee73d94b9..d09d8bdf379 100644 --- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp +++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp @@ -62,7 +62,6 @@ namespace repl { using executor::NetworkInterfaceMock; using executor::RemoteCommandRequest; using executor::RemoteCommandResponse; -using ApplierState = OplogApplier::ApplierState; executor::TaskExecutor* ReplCoordTest::getReplExec() { return _replExec; @@ -360,7 +359,7 @@ void ReplCoordTest::simulateSuccessfulV1ElectionWithoutExitingDrainMode(Date_t e hasReadyRequests = net->hasReadyRequests(); getNet()->exitNetwork(); } - ASSERT(getExternalState()->getApplierState() == ApplierState::Draining); + ASSERT(replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Draining); ASSERT(replCoord->getMemberState().primary()) << replCoord->getMemberState().toString(); IsMasterResponse imResponse; @@ -377,7 +376,7 @@ void ReplCoordTest::simulateSuccessfulV1ElectionAt(Date_t electionTime) { auto opCtx = makeOperationContext(); signalDrainComplete(opCtx.get()); } - ASSERT(getExternalState()->getApplierState() == ApplierState::Stopped); + ASSERT(replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Stopped); IsMasterResponse imResponse; replCoord->fillIsMasterForReplSet(&imResponse, {}); ASSERT_TRUE(imResponse.isMaster()) << imResponse.toBSON().toString(); @@ -400,7 +399,7 @@ void ReplCoordTest::runSingleNodeElection(OperationContext* opCtx) { ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); getReplCoord()->waitForElectionFinish_forTest(); - ASSERT(getExternalState()->getApplierState() == ApplierState::Draining); + ASSERT(getReplCoord()->getApplierState() == ReplicationCoordinator::ApplierState::Draining); ASSERT(getReplCoord()->getMemberState().primary()) << getReplCoord()->getMemberState().toString(); diff --git a/src/mongo/embedded/replication_coordinator_embedded.cpp b/src/mongo/embedded/replication_coordinator_embedded.cpp index 26c7ed3673c..408b5c78ee6 100644 --- a/src/mongo/embedded/replication_coordinator_embedded.cpp +++ b/src/mongo/embedded/replication_coordinator_embedded.cpp @@ -184,6 +184,10 @@ Status ReplicationCoordinatorEmbedded::setFollowerModeStrict(OperationContext* o UASSERT_NOT_IMPLEMENTED; } +ReplicationCoordinator::ApplierState ReplicationCoordinatorEmbedded::getApplierState() { + UASSERT_NOT_IMPLEMENTED; +} + void ReplicationCoordinatorEmbedded::signalDrainComplete(OperationContext*, long long) { UASSERT_NOT_IMPLEMENTED; } diff --git a/src/mongo/embedded/replication_coordinator_embedded.h b/src/mongo/embedded/replication_coordinator_embedded.h index f2cd27e9f43..9fe16bf52dc 100644 --- a/src/mongo/embedded/replication_coordinator_embedded.h +++ b/src/mongo/embedded/replication_coordinator_embedded.h @@ -153,6 +153,8 @@ public: Status setFollowerModeStrict(OperationContext* opCtx, const repl::MemberState&) override; + ApplierState getApplierState() override; + void signalDrainComplete(OperationContext*, long long) override; Status waitForDrainFinish(Milliseconds) override; -- cgit v1.2.1