diff options
Diffstat (limited to 'src/mongo')
19 files changed, 97 insertions, 17 deletions
diff --git a/src/mongo/db/concurrency/lock_state.cpp b/src/mongo/db/concurrency/lock_state.cpp index 9b67bc18331..8f1ec5c6c21 100644 --- a/src/mongo/db/concurrency/lock_state.cpp +++ b/src/mongo/db/concurrency/lock_state.cpp @@ -919,7 +919,8 @@ void LockerImpl::lockComplete(OperationContext* opCtx, void LockerImpl::getFlowControlTicket(OperationContext* opCtx, LockMode lockMode) { auto ticketholder = FlowControlTicketholder::get(opCtx); - if (ticketholder && lockMode == LockMode::MODE_IX && _clientState.load() == kInactive) { + if (ticketholder && lockMode == LockMode::MODE_IX && _clientState.load() == kInactive && + opCtx->shouldParticipateInFlowControl()) { // FlowControl only acts when a MODE_IX global lock is being taken. The clientState is only // being modified here to change serverStatus' `globalLock.currentQueue` metrics. This // method must not exit with a side-effect on the clientState. That value is also used for diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h index 07a72c9a03d..58d1d14ec67 100644 --- a/src/mongo/db/operation_context.h +++ b/src/mongo/db/operation_context.h @@ -80,6 +80,14 @@ public: OperationContext(Client* client, unsigned int opId); virtual ~OperationContext(); + bool shouldParticipateInFlowControl() const { + return _shouldParticipateInFlowControl; + } + + void setShouldParticipateInFlowControl(bool target) { + _shouldParticipateInFlowControl = target; + } + /** * Interface for durability. Caller DOES NOT own pointer. */ @@ -471,6 +479,7 @@ private: Timer _elapsedTime; bool _writesAreReplicated = true; + bool _shouldParticipateInFlowControl = true; }; namespace repl { diff --git a/src/mongo/db/repl/noop_writer.cpp b/src/mongo/db/repl/noop_writer.cpp index 6897acab22c..8c0fbfaa6b9 100644 --- a/src/mongo/db/repl/noop_writer.cpp +++ b/src/mongo/db/repl/noop_writer.cpp @@ -130,8 +130,11 @@ Status NoopWriter::startWritingPeriodicNoops(OpTime lastKnownOpTime) { _lastKnownOpTime = lastKnownOpTime; invariant(!_noopRunner); - _noopRunner = stdx::make_unique<PeriodicNoopRunner>( - _writeInterval, [this](OperationContext* opCtx) { _writeNoop(opCtx); }); + _noopRunner = + stdx::make_unique<PeriodicNoopRunner>(_writeInterval, [this](OperationContext* opCtx) { + opCtx->setShouldParticipateInFlowControl(false); + _writeNoop(opCtx); + }); return Status::OK(); } diff --git a/src/mongo/db/repl/repl_set_request_votes.cpp b/src/mongo/db/repl/repl_set_request_votes.cpp index 86b94e2d771..2654bc24ccb 100644 --- a/src/mongo/db/repl/repl_set_request_votes.cpp +++ b/src/mongo/db/repl/repl_set_request_votes.cpp @@ -59,6 +59,8 @@ private: status = parsedArgs.initialize(cmdObj); uassertStatusOK(status); + // Any writes that occur as part of an election should not be subject to Flow Control. + opCtx->setShouldParticipateInFlowControl(false); ReplSetRequestVotesResponse response; status = ReplicationCoordinator::get(opCtx)->processReplSetRequestVotes( opCtx, parsedArgs, &response); diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index 3a8f9c2e667..4593ce48c7c 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -153,6 +153,11 @@ public: virtual MemberState getMemberState() const = 0; /** + * Returns whether this node can accept writes to databases other than local. + */ + virtual bool canAcceptNonLocalWrites() const = 0; + + /** * Waits for 'timeout' ms for member state to become 'state'. * Returns OK if member state is 'state'. * Returns ErrorCodes::ExceededTimeLimit if we timed out waiting for the state change. diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index c34fe99d4f7..2e163ee30ea 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -449,6 +449,7 @@ Status ReplicationCoordinatorExternalStateImpl::initializeReplSetStorage(Operati void ReplicationCoordinatorExternalStateImpl::onDrainComplete(OperationContext* opCtx) { invariant(!opCtx->lockState()->isLocked()); + invariant(!opCtx->shouldParticipateInFlowControl()); // If this is a config server node becoming a primary, ensure the balancer is ready to start. if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { @@ -460,6 +461,7 @@ void ReplicationCoordinatorExternalStateImpl::onDrainComplete(OperationContext* OpTime ReplicationCoordinatorExternalStateImpl::onTransitionToPrimary(OperationContext* opCtx) { invariant(opCtx->lockState()->isRSTLExclusive()); + invariant(!opCtx->shouldParticipateInFlowControl()); // Clear the appliedThrough marker so on startup we'll use the top of the oplog. This must be // done before we add anything to our oplog. @@ -571,6 +573,10 @@ StatusWith<LastVote> ReplicationCoordinatorExternalStateImpl::loadLocalLastVoteD Status ReplicationCoordinatorExternalStateImpl::storeLocalLastVoteDocument( OperationContext* opCtx, const LastVote& lastVote) { BSONObj lastVoteObj = lastVote.toBSON(); + + // Writes that are part of elections should not be throttled. + invariant(!opCtx->shouldParticipateInFlowControl()); + try { Status status = writeConflictRetry(opCtx, "save replica set lastVote", lastVoteCollectionName, [&] { diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index c306a06ca59..3353c3cd506 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -2146,6 +2146,11 @@ bool ReplicationCoordinatorImpl::canAcceptWritesForDatabase_UNSAFE(OperationCont return false; } +bool ReplicationCoordinatorImpl::canAcceptNonLocalWrites() const { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _readWriteAbility->canAcceptNonLocalWrites(lk); +} + bool ReplicationCoordinatorImpl::canAcceptWritesFor(OperationContext* opCtx, const NamespaceString& ns) { invariant(opCtx->lockState()->isRSTLLocked()); diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 588c543ca41..0356b3b77c5 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -113,6 +113,8 @@ public: virtual std::vector<MemberData> getMemberData() const override; + virtual bool canAcceptNonLocalWrites() const override; + virtual Status waitForMemberState(MemberState expectedState, Milliseconds timeout) override; virtual bool isInPrimaryOrSecondaryState(OperationContext* opCtx) const override; diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp index e8415946dfc..ec622e52ba3 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp @@ -247,6 +247,8 @@ void ReplicationCoordinatorImpl::_writeLastVoteForMyElection( return cbData.status; } auto opCtx = cc().makeOperationContext(); + // Any writes that occur as part of an election should not be subject to Flow Control. + opCtx->setShouldParticipateInFlowControl(false); return _externalState->storeLocalLastVoteDocument(opCtx.get(), lastVote); }(); diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index a1ef20fab24..40df9529262 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -108,6 +108,11 @@ std::vector<MemberData> ReplicationCoordinatorMock::getMemberData() const { return {}; } +bool ReplicationCoordinatorMock::canAcceptNonLocalWrites() const { + MONGO_UNREACHABLE; + return false; +} + Status ReplicationCoordinatorMock::waitForMemberState(MemberState expectedState, Milliseconds timeout) { MONGO_UNREACHABLE; diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index 215e1954c70..076d849f560 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -81,6 +81,8 @@ public: virtual MemberState getMemberState() const; + virtual bool canAcceptNonLocalWrites() const; + virtual Status waitForMemberState(MemberState expectedState, Milliseconds timeout) override; virtual bool isInPrimaryOrSecondaryState(OperationContext* opCtx) const; diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp index f3d5cfb637f..5ee9f020ebe 100644 --- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp +++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp @@ -384,6 +384,9 @@ void ReplCoordTest::simulateSuccessfulV1ElectionAt(Date_t electionTime) { } void ReplCoordTest::signalDrainComplete(OperationContext* opCtx) { + // Writes that occur in code paths that call signalDrainComplete are expected to be excluded + // from Flow Control. + opCtx->setShouldParticipateInFlowControl(false); getExternalState()->setFirstOpTimeOfMyTerm(OpTime(Timestamp(1, 1), getReplCoord()->getTerm())); getReplCoord()->signalDrainComplete(opCtx, getReplCoord()->getTerm()); } diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index befd6b94771..a06c7d2dc8f 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -423,6 +423,11 @@ void scheduleWritesToOplog(OperationContext* opCtx, invariant(status); auto opCtx = cc().makeOperationContext(); + + // This code path is only executed on secondaries and initial syncing nodes, so it is + // safe to exclude any writes from Flow Control. + opCtx->setShouldParticipateInFlowControl(false); + UnreplicatedWritesBlock uwb(opCtx.get()); ShouldNotConflictWithSecondaryBatchApplicationBlock shouldNotConflictBlock( opCtx->lockState()); @@ -729,6 +734,11 @@ void SyncTail::_oplogApplication(ReplicationCoordinator* replCoord, const ServiceContext::UniqueOperationContext opCtxPtr = cc().makeOperationContext(); OperationContext& opCtx = *opCtxPtr; + // This code path gets used during elections, so it should not be subject to Flow Control. + // It is safe to exclude this operation context from Flow Control here because this code + // path only gets used on secondaries or on a node transitioning to primary. + opCtx.setShouldParticipateInFlowControl(false); + // For pausing replication in tests. if (MONGO_FAIL_POINT(rsSyncApplyStop)) { log() << "sync tail - rsSyncApplyStop fail point enabled. Blocking until fail point is " @@ -1270,6 +1280,11 @@ void SyncTail::_applyOps(std::vector<MultiApplier::OperationPtrs>& writerVectors invariant(scheduleStatus); auto opCtx = cc().makeOperationContext(); + + // This code path is only executed on secondaries and initial syncing nodes, so it is + // safe to exclude any writes from Flow Control. + opCtx->setShouldParticipateInFlowControl(false); + status = opCtx->runWithoutInterruptionExceptAtGlobalShutdown( [&] { return _applyFunc(opCtx.get(), &writer, this, &workerMultikeyPathInfo); }); }); diff --git a/src/mongo/db/storage/SConscript b/src/mongo/db/storage/SConscript index 49dca278980..7713e8c75c7 100644 --- a/src/mongo/db/storage/SConscript +++ b/src/mongo/db/storage/SConscript @@ -360,21 +360,30 @@ env.Library( ) env.Library( + target='flow_control_parameters', + source=[ + env.Idlc('flow_control_parameters.idl')[0], + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/idl/server_parameter', + ], +) + +env.Library( target='flow_control', source=[ 'flow_control.cpp', - env.Idlc('flow_control_parameters.idl')[0], ], LIBDEPS=[ '$BUILD_DIR/mongo/db/commands/server_status', ], LIBDEPS_PRIVATE=[ + 'flow_control_parameters', '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/concurrency/flow_control_ticketholder', '$BUILD_DIR/mongo/db/concurrency/lock_manager', '$BUILD_DIR/mongo/db/server_options_core', '$BUILD_DIR/mongo/db/service_context', - '$BUILD_DIR/mongo/idl/server_parameter', '$BUILD_DIR/mongo/util/background_job', ], ) diff --git a/src/mongo/db/storage/flow_control.cpp b/src/mongo/db/storage/flow_control.cpp index 5e0ba4c3320..f3006aae278 100644 --- a/src/mongo/db/storage/flow_control.cpp +++ b/src/mongo/db/storage/flow_control.cpp @@ -176,10 +176,8 @@ double FlowControl::_getLocksPerOp() { BSONObj FlowControl::generateSection(OperationContext* opCtx, const BSONElement& configElement) const { - // Lag is not meaningful on arbiters. - const bool isArbiter = - _replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet && - _replCoord->getMemberState().arbiter(); + // Flow Control does not have use for lag measured on nodes that cannot accept writes. + const bool canAcceptWrites = _replCoord->canAcceptNonLocalWrites(); // Flow Control is only enabled if FCV is 4.2. const bool isFCV42 = @@ -198,7 +196,8 @@ BSONObj FlowControl::generateSection(OperationContext* opCtx, FlowControlTicketholder::get(opCtx)->totalTimeAcquiringMicros()); bob.append("locksPerOp", _lastLocksPerOp.load()); bob.append("sustainerRate", _lastSustainerAppliedCount.load()); - bob.append("isLagged", isFCV42 && !isArbiter && isLagged(myLastAppliedWall, lastCommittedWall)); + bob.append("isLagged", + isFCV42 && canAcceptWrites && isLagged(myLastAppliedWall, lastCommittedWall)); return bob.obj(); } @@ -225,7 +224,7 @@ int FlowControl::_calculateNewTicketsForLag(const std::vector<repl::MemberData>& using namespace fmt::literals; const auto currSustainerAppliedTs = getMedianAppliedTimestamp(currMemberData); - const auto prevSustainerAppliedTs = getMedianAppliedTimestamp(_prevMemberData); + const auto prevSustainerAppliedTs = getMedianAppliedTimestamp(prevMemberData); invariant(prevSustainerAppliedTs <= currSustainerAppliedTs, "PrevSustainer: {} CurrSustainer: {}"_format(prevSustainerAppliedTs.toString(), currSustainerAppliedTs.toString())); @@ -253,10 +252,9 @@ int FlowControl::_calculateNewTicketsForLag(const std::vector<repl::MemberData>& } int FlowControl::getNumTickets() { - // Lag is not meaningful on arbiters. - const bool isArbiter = - _replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet && - _replCoord->getMemberState().arbiter(); + + // Flow Control is only enabled on nodes that can accept writes. + const bool canAcceptWrites = _replCoord->canAcceptNonLocalWrites(); // Flow Control is only enabled if FCV is 4.2. const bool isFCV42 = @@ -272,7 +270,8 @@ int FlowControl::getNumTickets() { const std::int64_t locksUsedLastPeriod = _getLocksUsedLastPeriod(); if (serverGlobalParams.enableMajorityReadConcern == false || - gFlowControlEnabled.load() == false || isFCV42 == false || isArbiter || locksPerOp < 0.0) { + gFlowControlEnabled.load() == false || isFCV42 == false || canAcceptWrites == false || + locksPerOp < 0.0) { _trimSamples(std::min(lastCommitted.opTime.getTimestamp(), getMedianAppliedTimestamp(_prevMemberData))); return kMaxTickets; diff --git a/src/mongo/db/storage/flow_control.h b/src/mongo/db/storage/flow_control.h index 2261778547d..e9b89052fd6 100644 --- a/src/mongo/db/storage/flow_control.h +++ b/src/mongo/db/storage/flow_control.h @@ -95,7 +95,7 @@ private: repl::ReplicationCoordinator* _replCoord; - // These values are updated with each flow control computation that are also surfaced in server + // These values are updated with each flow control computation and are also surfaced in server // status. AtomicWord<int> _lastTargetTicketsPermitted{0}; AtomicWord<double> _lastLocksPerOp{0.0}; diff --git a/src/mongo/dbtests/replica_set_tests.cpp b/src/mongo/dbtests/replica_set_tests.cpp index b04f4f066f6..34d8145606d 100644 --- a/src/mongo/dbtests/replica_set_tests.cpp +++ b/src/mongo/dbtests/replica_set_tests.cpp @@ -96,6 +96,8 @@ private: TEST_F(ReplicaSetTest, ReplCoordExternalStateStoresLastVoteWithNewTerm) { auto opCtx = makeOpCtx(); + // Methods that do writes as part of elections expect Flow Control to be disabled. + opCtx->setShouldParticipateInFlowControl(false); auto replCoordExternalState = getReplCoordExternalState(); replCoordExternalState->storeLocalLastVoteDocument(opCtx.get(), repl::LastVote{2, 1}) @@ -117,6 +119,8 @@ TEST_F(ReplicaSetTest, ReplCoordExternalStateStoresLastVoteWithNewTerm) { TEST_F(ReplicaSetTest, ReplCoordExternalStateDoesNotStoreLastVoteWithOldTerm) { auto opCtx = makeOpCtx(); + // Methods that do writes as part of elections expect Flow Control to be disabled. + opCtx->setShouldParticipateInFlowControl(false); auto replCoordExternalState = getReplCoordExternalState(); replCoordExternalState->storeLocalLastVoteDocument(opCtx.get(), repl::LastVote{2, 1}) @@ -138,6 +142,8 @@ TEST_F(ReplicaSetTest, ReplCoordExternalStateDoesNotStoreLastVoteWithOldTerm) { TEST_F(ReplicaSetTest, ReplCoordExternalStateDoesNotStoreLastVoteWithEqualTerm) { auto opCtx = makeOpCtx(); + // Methods that do writes as part of elections expect Flow Control to be disabled. + opCtx->setShouldParticipateInFlowControl(false); auto replCoordExternalState = getReplCoordExternalState(); replCoordExternalState->storeLocalLastVoteDocument(opCtx.get(), repl::LastVote{2, 1}) diff --git a/src/mongo/embedded/replication_coordinator_embedded.cpp b/src/mongo/embedded/replication_coordinator_embedded.cpp index 7c19bf2b7f8..87b04673039 100644 --- a/src/mongo/embedded/replication_coordinator_embedded.cpp +++ b/src/mongo/embedded/replication_coordinator_embedded.cpp @@ -159,6 +159,10 @@ std::vector<repl::MemberData> ReplicationCoordinatorEmbedded::getMemberData() co UASSERT_NOT_IMPLEMENTED; } +bool ReplicationCoordinatorEmbedded::canAcceptNonLocalWrites() const { + UASSERT_NOT_IMPLEMENTED; +} + Status ReplicationCoordinatorEmbedded::waitForMemberState(MemberState, Milliseconds) { UASSERT_NOT_IMPLEMENTED; } diff --git a/src/mongo/embedded/replication_coordinator_embedded.h b/src/mongo/embedded/replication_coordinator_embedded.h index cec4184d320..9fe6f6ae999 100644 --- a/src/mongo/embedded/replication_coordinator_embedded.h +++ b/src/mongo/embedded/replication_coordinator_embedded.h @@ -92,6 +92,8 @@ public: repl::MemberState getMemberState() const override; + bool canAcceptNonLocalWrites() const override; + std::vector<repl::MemberData> getMemberData() const override; Status waitForMemberState(repl::MemberState, Milliseconds) override; |