summary | refs | log | tree | commit | diff
path: root/src/mongo
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo')
-rw-r--r-- src/mongo/db/concurrency/lock_state.cpp 3
-rw-r--r-- src/mongo/db/operation_context.h 9
-rw-r--r-- src/mongo/db/repl/noop_writer.cpp 7
-rw-r--r-- src/mongo/db/repl/repl_set_request_votes.cpp 2
-rw-r--r-- src/mongo/db/repl/replication_coordinator.h 5
-rw-r--r-- src/mongo/db/repl/replication_coordinator_external_state_impl.cpp 6
-rw-r--r-- src/mongo/db/repl/replication_coordinator_impl.cpp 5
-rw-r--r-- src/mongo/db/repl/replication_coordinator_impl.h 2
-rw-r--r-- src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp 2
-rw-r--r-- src/mongo/db/repl/replication_coordinator_mock.cpp 5
-rw-r--r-- src/mongo/db/repl/replication_coordinator_mock.h 2
-rw-r--r-- src/mongo/db/repl/replication_coordinator_test_fixture.cpp 3
-rw-r--r-- src/mongo/db/repl/sync_tail.cpp 15
-rw-r--r-- src/mongo/db/storage/SConscript 13
-rw-r--r-- src/mongo/db/storage/flow_control.cpp 21
-rw-r--r-- src/mongo/db/storage/flow_control.h 2
-rw-r--r-- src/mongo/dbtests/replica_set_tests.cpp 6
-rw-r--r-- src/mongo/embedded/replication_coordinator_embedded.cpp 4
-rw-r--r-- src/mongo/embedded/replication_coordinator_embedded.h 2
19 files changed, 97 insertions, 17 deletions
diff --git a/src/mongo/db/concurrency/lock_state.cpp b/src/mongo/db/concurrency/lock_state.cpp
index 9b67bc18331..8f1ec5c6c21 100644
--- a/src/mongo/db/concurrency/lock_state.cpp
+++ b/src/mongo/db/concurrency/lock_state.cpp
@@ -919,7 +919,8 @@ void LockerImpl::lockComplete(OperationContext* opCtx,
void LockerImpl::getFlowControlTicket(OperationContext* opCtx, LockMode lockMode) {
auto ticketholder = FlowControlTicketholder::get(opCtx);
- if (ticketholder && lockMode == LockMode::MODE_IX && _clientState.load() == kInactive) {
+ if (ticketholder && lockMode == LockMode::MODE_IX && _clientState.load() == kInactive &&
+ opCtx->shouldParticipateInFlowControl()) {
// FlowControl only acts when a MODE_IX global lock is being taken. The clientState is only
// being modified here to change serverStatus' `globalLock.currentQueue` metrics. This
// method must not exit with a side-effect on the clientState. That value is also used for
diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h
index 07a72c9a03d..58d1d14ec67 100644
--- a/src/mongo/db/operation_context.h
+++ b/src/mongo/db/operation_context.h
@@ -80,6 +80,14 @@ public:
OperationContext(Client* client, unsigned int opId);
virtual ~OperationContext();
+ bool shouldParticipateInFlowControl() const {
+ return _shouldParticipateInFlowControl;
+ }
+
+ void setShouldParticipateInFlowControl(bool target) {
+ _shouldParticipateInFlowControl = target;
+ }
+
/**
* Interface for durability. Caller DOES NOT own pointer.
*/
@@ -471,6 +479,7 @@ private:
Timer _elapsedTime;
bool _writesAreReplicated = true;
+ bool _shouldParticipateInFlowControl = true;
};
namespace repl {
diff --git a/src/mongo/db/repl/noop_writer.cpp b/src/mongo/db/repl/noop_writer.cpp
index 6897acab22c..8c0fbfaa6b9 100644
--- a/src/mongo/db/repl/noop_writer.cpp
+++ b/src/mongo/db/repl/noop_writer.cpp
@@ -130,8 +130,11 @@ Status NoopWriter::startWritingPeriodicNoops(OpTime lastKnownOpTime) {
_lastKnownOpTime = lastKnownOpTime;
invariant(!_noopRunner);
- _noopRunner = stdx::make_unique<PeriodicNoopRunner>(
- _writeInterval, [this](OperationContext* opCtx) { _writeNoop(opCtx); });
+ _noopRunner =
+ stdx::make_unique<PeriodicNoopRunner>(_writeInterval, [this](OperationContext* opCtx) {
+ opCtx->setShouldParticipateInFlowControl(false);
+ _writeNoop(opCtx);
+ });
return Status::OK();
}
diff --git a/src/mongo/db/repl/repl_set_request_votes.cpp b/src/mongo/db/repl/repl_set_request_votes.cpp
index 86b94e2d771..2654bc24ccb 100644
--- a/src/mongo/db/repl/repl_set_request_votes.cpp
+++ b/src/mongo/db/repl/repl_set_request_votes.cpp
@@ -59,6 +59,8 @@ private:
status = parsedArgs.initialize(cmdObj);
uassertStatusOK(status);
+ // Any writes that occur as part of an election should not be subject to Flow Control.
+ opCtx->setShouldParticipateInFlowControl(false);
ReplSetRequestVotesResponse response;
status = ReplicationCoordinator::get(opCtx)->processReplSetRequestVotes(
opCtx, parsedArgs, &response);
diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h
index 3a8f9c2e667..4593ce48c7c 100644
--- a/src/mongo/db/repl/replication_coordinator.h
+++ b/src/mongo/db/repl/replication_coordinator.h
@@ -153,6 +153,11 @@ public:
virtual MemberState getMemberState() const = 0;
/**
+ * Returns whether this node can accept writes to databases other than local.
+ */
+ virtual bool canAcceptNonLocalWrites() const = 0;
+
+ /**
* Waits for 'timeout' ms for member state to become 'state'.
* Returns OK if member state is 'state'.
* Returns ErrorCodes::ExceededTimeLimit if we timed out waiting for the state change.
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index c34fe99d4f7..2e163ee30ea 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -449,6 +449,7 @@ Status ReplicationCoordinatorExternalStateImpl::initializeReplSetStorage(Operati
void ReplicationCoordinatorExternalStateImpl::onDrainComplete(OperationContext* opCtx) {
invariant(!opCtx->lockState()->isLocked());
+ invariant(!opCtx->shouldParticipateInFlowControl());
// If this is a config server node becoming a primary, ensure the balancer is ready to start.
if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
@@ -460,6 +461,7 @@ void ReplicationCoordinatorExternalStateImpl::onDrainComplete(OperationContext*
OpTime ReplicationCoordinatorExternalStateImpl::onTransitionToPrimary(OperationContext* opCtx) {
invariant(opCtx->lockState()->isRSTLExclusive());
+ invariant(!opCtx->shouldParticipateInFlowControl());
// Clear the appliedThrough marker so on startup we'll use the top of the oplog. This must be
// done before we add anything to our oplog.
@@ -571,6 +573,10 @@ StatusWith<LastVote> ReplicationCoordinatorExternalStateImpl::loadLocalLastVoteD
Status ReplicationCoordinatorExternalStateImpl::storeLocalLastVoteDocument(
OperationContext* opCtx, const LastVote& lastVote) {
BSONObj lastVoteObj = lastVote.toBSON();
+
+ // Writes that are part of elections should not be throttled.
+ invariant(!opCtx->shouldParticipateInFlowControl());
+
try {
Status status =
writeConflictRetry(opCtx, "save replica set lastVote", lastVoteCollectionName, [&] {
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index c306a06ca59..3353c3cd506 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -2146,6 +2146,11 @@ bool ReplicationCoordinatorImpl::canAcceptWritesForDatabase_UNSAFE(OperationCont
return false;
}
+bool ReplicationCoordinatorImpl::canAcceptNonLocalWrites() const {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _readWriteAbility->canAcceptNonLocalWrites(lk);
+}
+
bool ReplicationCoordinatorImpl::canAcceptWritesFor(OperationContext* opCtx,
const NamespaceString& ns) {
invariant(opCtx->lockState()->isRSTLLocked());
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 588c543ca41..0356b3b77c5 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -113,6 +113,8 @@ public:
virtual std::vector<MemberData> getMemberData() const override;
+ virtual bool canAcceptNonLocalWrites() const override;
+
virtual Status waitForMemberState(MemberState expectedState, Milliseconds timeout) override;
virtual bool isInPrimaryOrSecondaryState(OperationContext* opCtx) const override;
diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp
index e8415946dfc..ec622e52ba3 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp
@@ -247,6 +247,8 @@ void ReplicationCoordinatorImpl::_writeLastVoteForMyElection(
return cbData.status;
}
auto opCtx = cc().makeOperationContext();
+ // Any writes that occur as part of an election should not be subject to Flow Control.
+ opCtx->setShouldParticipateInFlowControl(false);
return _externalState->storeLocalLastVoteDocument(opCtx.get(), lastVote);
}();
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index a1ef20fab24..40df9529262 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -108,6 +108,11 @@ std::vector<MemberData> ReplicationCoordinatorMock::getMemberData() const {
return {};
}
+bool ReplicationCoordinatorMock::canAcceptNonLocalWrites() const {
+ MONGO_UNREACHABLE;
+ return false;
+}
+
Status ReplicationCoordinatorMock::waitForMemberState(MemberState expectedState,
Milliseconds timeout) {
MONGO_UNREACHABLE;
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index 215e1954c70..076d849f560 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -81,6 +81,8 @@ public:
virtual MemberState getMemberState() const;
+ virtual bool canAcceptNonLocalWrites() const;
+
virtual Status waitForMemberState(MemberState expectedState, Milliseconds timeout) override;
virtual bool isInPrimaryOrSecondaryState(OperationContext* opCtx) const;
diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
index f3d5cfb637f..5ee9f020ebe 100644
--- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
+++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
@@ -384,6 +384,9 @@ void ReplCoordTest::simulateSuccessfulV1ElectionAt(Date_t electionTime) {
}
void ReplCoordTest::signalDrainComplete(OperationContext* opCtx) {
+ // Writes that occur in code paths that call signalDrainComplete are expected to be excluded
+ // from Flow Control.
+ opCtx->setShouldParticipateInFlowControl(false);
getExternalState()->setFirstOpTimeOfMyTerm(OpTime(Timestamp(1, 1), getReplCoord()->getTerm()));
getReplCoord()->signalDrainComplete(opCtx, getReplCoord()->getTerm());
}
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index befd6b94771..a06c7d2dc8f 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -423,6 +423,11 @@ void scheduleWritesToOplog(OperationContext* opCtx,
invariant(status);
auto opCtx = cc().makeOperationContext();
+
+ // This code path is only executed on secondaries and initial syncing nodes, so it is
+ // safe to exclude any writes from Flow Control.
+ opCtx->setShouldParticipateInFlowControl(false);
+
UnreplicatedWritesBlock uwb(opCtx.get());
ShouldNotConflictWithSecondaryBatchApplicationBlock shouldNotConflictBlock(
opCtx->lockState());
@@ -729,6 +734,11 @@ void SyncTail::_oplogApplication(ReplicationCoordinator* replCoord,
const ServiceContext::UniqueOperationContext opCtxPtr = cc().makeOperationContext();
OperationContext& opCtx = *opCtxPtr;
+ // This code path gets used during elections, so it should not be subject to Flow Control.
+ // It is safe to exclude this operation context from Flow Control here because this code
+ // path only gets used on secondaries or on a node transitioning to primary.
+ opCtx.setShouldParticipateInFlowControl(false);
+
// For pausing replication in tests.
if (MONGO_FAIL_POINT(rsSyncApplyStop)) {
log() << "sync tail - rsSyncApplyStop fail point enabled. Blocking until fail point is "
@@ -1270,6 +1280,11 @@ void SyncTail::_applyOps(std::vector<MultiApplier::OperationPtrs>& writerVectors
invariant(scheduleStatus);
auto opCtx = cc().makeOperationContext();
+
+ // This code path is only executed on secondaries and initial syncing nodes, so it is
+ // safe to exclude any writes from Flow Control.
+ opCtx->setShouldParticipateInFlowControl(false);
+
status = opCtx->runWithoutInterruptionExceptAtGlobalShutdown(
[&] { return _applyFunc(opCtx.get(), &writer, this, &workerMultikeyPathInfo); });
});
diff --git a/src/mongo/db/storage/SConscript b/src/mongo/db/storage/SConscript
index 49dca278980..7713e8c75c7 100644
--- a/src/mongo/db/storage/SConscript
+++ b/src/mongo/db/storage/SConscript
@@ -360,21 +360,30 @@ env.Library(
)
env.Library(
+ target='flow_control_parameters',
+ source=[
+ env.Idlc('flow_control_parameters.idl')[0],
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/idl/server_parameter',
+ ],
+)
+
+env.Library(
target='flow_control',
source=[
'flow_control.cpp',
- env.Idlc('flow_control_parameters.idl')[0],
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/commands/server_status',
],
LIBDEPS_PRIVATE=[
+ 'flow_control_parameters',
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/concurrency/flow_control_ticketholder',
'$BUILD_DIR/mongo/db/concurrency/lock_manager',
'$BUILD_DIR/mongo/db/server_options_core',
'$BUILD_DIR/mongo/db/service_context',
- '$BUILD_DIR/mongo/idl/server_parameter',
'$BUILD_DIR/mongo/util/background_job',
],
)
diff --git a/src/mongo/db/storage/flow_control.cpp b/src/mongo/db/storage/flow_control.cpp
index 5e0ba4c3320..f3006aae278 100644
--- a/src/mongo/db/storage/flow_control.cpp
+++ b/src/mongo/db/storage/flow_control.cpp
@@ -176,10 +176,8 @@ double FlowControl::_getLocksPerOp() {
BSONObj FlowControl::generateSection(OperationContext* opCtx,
const BSONElement& configElement) const {
- // Lag is not meaningful on arbiters.
- const bool isArbiter =
- _replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet &&
- _replCoord->getMemberState().arbiter();
+ // Flow Control does not have use for lag measured on nodes that cannot accept writes.
+ const bool canAcceptWrites = _replCoord->canAcceptNonLocalWrites();
// Flow Control is only enabled if FCV is 4.2.
const bool isFCV42 =
@@ -198,7 +196,8 @@ BSONObj FlowControl::generateSection(OperationContext* opCtx,
FlowControlTicketholder::get(opCtx)->totalTimeAcquiringMicros());
bob.append("locksPerOp", _lastLocksPerOp.load());
bob.append("sustainerRate", _lastSustainerAppliedCount.load());
- bob.append("isLagged", isFCV42 && !isArbiter && isLagged(myLastAppliedWall, lastCommittedWall));
+ bob.append("isLagged",
+ isFCV42 && canAcceptWrites && isLagged(myLastAppliedWall, lastCommittedWall));
return bob.obj();
}
@@ -225,7 +224,7 @@ int FlowControl::_calculateNewTicketsForLag(const std::vector<repl::MemberData>&
using namespace fmt::literals;
const auto currSustainerAppliedTs = getMedianAppliedTimestamp(currMemberData);
- const auto prevSustainerAppliedTs = getMedianAppliedTimestamp(_prevMemberData);
+ const auto prevSustainerAppliedTs = getMedianAppliedTimestamp(prevMemberData);
invariant(prevSustainerAppliedTs <= currSustainerAppliedTs,
"PrevSustainer: {} CurrSustainer: {}"_format(prevSustainerAppliedTs.toString(),
currSustainerAppliedTs.toString()));
@@ -253,10 +252,9 @@ int FlowControl::_calculateNewTicketsForLag(const std::vector<repl::MemberData>&
}
int FlowControl::getNumTickets() {
- // Lag is not meaningful on arbiters.
- const bool isArbiter =
- _replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet &&
- _replCoord->getMemberState().arbiter();
+
+ // Flow Control is only enabled on nodes that can accept writes.
+ const bool canAcceptWrites = _replCoord->canAcceptNonLocalWrites();
// Flow Control is only enabled if FCV is 4.2.
const bool isFCV42 =
@@ -272,7 +270,8 @@ int FlowControl::getNumTickets() {
const std::int64_t locksUsedLastPeriod = _getLocksUsedLastPeriod();
if (serverGlobalParams.enableMajorityReadConcern == false ||
- gFlowControlEnabled.load() == false || isFCV42 == false || isArbiter || locksPerOp < 0.0) {
+ gFlowControlEnabled.load() == false || isFCV42 == false || canAcceptWrites == false ||
+ locksPerOp < 0.0) {
_trimSamples(std::min(lastCommitted.opTime.getTimestamp(),
getMedianAppliedTimestamp(_prevMemberData)));
return kMaxTickets;
diff --git a/src/mongo/db/storage/flow_control.h b/src/mongo/db/storage/flow_control.h
index 2261778547d..e9b89052fd6 100644
--- a/src/mongo/db/storage/flow_control.h
+++ b/src/mongo/db/storage/flow_control.h
@@ -95,7 +95,7 @@ private:
repl::ReplicationCoordinator* _replCoord;
- // These values are updated with each flow control computation that are also surfaced in server
+ // These values are updated with each flow control computation and are also surfaced in server
// status.
AtomicWord<int> _lastTargetTicketsPermitted{0};
AtomicWord<double> _lastLocksPerOp{0.0};
diff --git a/src/mongo/dbtests/replica_set_tests.cpp b/src/mongo/dbtests/replica_set_tests.cpp
index b04f4f066f6..34d8145606d 100644
--- a/src/mongo/dbtests/replica_set_tests.cpp
+++ b/src/mongo/dbtests/replica_set_tests.cpp
@@ -96,6 +96,8 @@ private:
TEST_F(ReplicaSetTest, ReplCoordExternalStateStoresLastVoteWithNewTerm) {
auto opCtx = makeOpCtx();
+ // Methods that do writes as part of elections expect Flow Control to be disabled.
+ opCtx->setShouldParticipateInFlowControl(false);
auto replCoordExternalState = getReplCoordExternalState();
replCoordExternalState->storeLocalLastVoteDocument(opCtx.get(), repl::LastVote{2, 1})
@@ -117,6 +119,8 @@ TEST_F(ReplicaSetTest, ReplCoordExternalStateStoresLastVoteWithNewTerm) {
TEST_F(ReplicaSetTest, ReplCoordExternalStateDoesNotStoreLastVoteWithOldTerm) {
auto opCtx = makeOpCtx();
+ // Methods that do writes as part of elections expect Flow Control to be disabled.
+ opCtx->setShouldParticipateInFlowControl(false);
auto replCoordExternalState = getReplCoordExternalState();
replCoordExternalState->storeLocalLastVoteDocument(opCtx.get(), repl::LastVote{2, 1})
@@ -138,6 +142,8 @@ TEST_F(ReplicaSetTest, ReplCoordExternalStateDoesNotStoreLastVoteWithOldTerm) {
TEST_F(ReplicaSetTest, ReplCoordExternalStateDoesNotStoreLastVoteWithEqualTerm) {
auto opCtx = makeOpCtx();
+ // Methods that do writes as part of elections expect Flow Control to be disabled.
+ opCtx->setShouldParticipateInFlowControl(false);
auto replCoordExternalState = getReplCoordExternalState();
replCoordExternalState->storeLocalLastVoteDocument(opCtx.get(), repl::LastVote{2, 1})
diff --git a/src/mongo/embedded/replication_coordinator_embedded.cpp b/src/mongo/embedded/replication_coordinator_embedded.cpp
index 7c19bf2b7f8..87b04673039 100644
--- a/src/mongo/embedded/replication_coordinator_embedded.cpp
+++ b/src/mongo/embedded/replication_coordinator_embedded.cpp
@@ -159,6 +159,10 @@ std::vector<repl::MemberData> ReplicationCoordinatorEmbedded::getMemberData() co
UASSERT_NOT_IMPLEMENTED;
}
+bool ReplicationCoordinatorEmbedded::canAcceptNonLocalWrites() const {
+ UASSERT_NOT_IMPLEMENTED;
+}
+
Status ReplicationCoordinatorEmbedded::waitForMemberState(MemberState, Milliseconds) {
UASSERT_NOT_IMPLEMENTED;
}
diff --git a/src/mongo/embedded/replication_coordinator_embedded.h b/src/mongo/embedded/replication_coordinator_embedded.h
index cec4184d320..9fe6f6ae999 100644
--- a/src/mongo/embedded/replication_coordinator_embedded.h
+++ b/src/mongo/embedded/replication_coordinator_embedded.h
@@ -92,6 +92,8 @@ public:
repl::MemberState getMemberState() const override;
+ bool canAcceptNonLocalWrites() const override;
+
std::vector<repl::MemberData> getMemberData() const override;
Status waitForMemberState(repl::MemberState, Milliseconds) override;