diff options
author | David Storch <david.storch@mongodb.com> | 2023-03-02 18:58:35 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-03-02 19:47:27 +0000 |
commit | 96763fa1fef7faa2513afd2618d4d039ee70a6fe (patch) | |
tree | 7f5a28ab11cd5aeefc639881897054d2d6db9c53 | |
parent | a687e8ade189ec0e311ded76effab420813c808d (diff) | |
download | mongo-96763fa1fef7faa2513afd2618d4d039ee70a6fe.tar.gz |
SERVER-74526 Fix high CPU utilization for change streams opened against secondary nodes
This reverts commit 34ac49477b87e183637f68cda828ecff8b393c64. Future
work is needed to reintroduce the changes from SERVER-69959 without
causing the problematic "busy wait" behavior on secondary nodes.
7 files changed, 43 insertions, 98 deletions
diff --git a/buildscripts/resmokeconfig/suites/change_streams_multitenant_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_multitenant_passthrough.yml index f838d68afd5..17a29d4f9cf 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_multitenant_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_multitenant_passthrough.yml @@ -7,6 +7,8 @@ selector: ## # TODO SERVER-68341: Implement enable/disable command for mongoQ in the serverless. - jstests/change_streams/projection_fakes_internal_event.js + # TODO SERVER-69959: Implement a majority-committed insert listener. + - jstests/change_streams/only_wake_getmore_for_relevant_changes.js ## # TODO SERVER-70760: This test creates its own sharded cluster and uses transaction. The support diff --git a/buildscripts/resmokeconfig/suites/change_streams_multitenant_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_multitenant_sharded_collections_passthrough.yml index 1b59797a303..73adfbc19b4 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_multitenant_sharded_collections_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_multitenant_sharded_collections_passthrough.yml @@ -4,6 +4,8 @@ selector: roots: - jstests/change_streams/**/*.js exclude_files: + # TODO SERVER-69959: Implement a majority-committed insert listener. + - jstests/change_streams/only_wake_getmore_for_relevant_changes.js # TODO SERVER-68341: Implement enable/disable command for mongoQ in the serverless. - jstests/change_streams/projection_fakes_internal_event.js # TODO SERVER-68557 This test list databases that does not work in the sharded-cluster. 
This test diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 86494b576dd..8089d6e2c99 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -1475,7 +1475,6 @@ env.Library( '$BUILD_DIR/mongo/db/query/ce/query_ce_histogram', '$BUILD_DIR/mongo/db/query/ce/query_ce_sampling', '$BUILD_DIR/mongo/db/query/optimizer/optimizer', - '$BUILD_DIR/mongo/db/repl/wait_for_majority_service', '$BUILD_DIR/mongo/db/session/kill_sessions', '$BUILD_DIR/mongo/db/sorter/sorter_stats', '$BUILD_DIR/mongo/db/stats/resource_consumption_metrics', diff --git a/src/mongo/db/query/plan_executor_impl.cpp b/src/mongo/db/query/plan_executor_impl.cpp index 677267fc8b7..27f7fe90a15 100644 --- a/src/mongo/db/query/plan_executor_impl.cpp +++ b/src/mongo/db/query/plan_executor_impl.cpp @@ -353,10 +353,11 @@ PlanExecutor::ExecState PlanExecutorImpl::_getNextImpl(Snapshotted<Document>* ob // Capped insert data; declared outside the loop so we hold a shared pointer to the capped // insert notifier the entire time we are in the loop. Holding a shared pointer to the // capped insert notifier is necessary for the notifierVersion to advance. - std::unique_ptr<insert_listener::Notifier> notifier; + insert_listener::CappedInsertNotifierData cappedInsertNotifierData; if (insert_listener::shouldListenForInserts(_opCtx, _cq.get())) { - // We always construct the insert_listener::Notifier for awaitData cursors. - notifier = insert_listener::getCappedInsertNotifier(_opCtx, _nss, _yieldPolicy.get()); + // We always construct the CappedInsertNotifier for awaitData cursors. 
+ cappedInsertNotifierData.notifier = + insert_listener::getCappedInsertNotifier(_opCtx, _nss, _yieldPolicy.get()); } for (;;) { // These are the conditions which can cause us to yield: @@ -496,7 +497,7 @@ PlanExecutor::ExecState PlanExecutorImpl::_getNextImpl(Snapshotted<Document>* ob return PlanExecutor::IS_EOF; } - insert_listener::waitForInserts(_opCtx, _yieldPolicy.get(), notifier); + insert_listener::waitForInserts(_opCtx, _yieldPolicy.get(), &cappedInsertNotifierData); // There may be more results, keep going. continue; diff --git a/src/mongo/db/query/plan_executor_sbe.cpp b/src/mongo/db/query/plan_executor_sbe.cpp index f47f2a99d71..0fb5f85b8fa 100644 --- a/src/mongo/db/query/plan_executor_sbe.cpp +++ b/src/mongo/db/query/plan_executor_sbe.cpp @@ -273,13 +273,14 @@ PlanExecutor::ExecState PlanExecutorSBE::getNextImpl(ObjectType* out, RecordId* // // Note that we need to hold a database intent lock before acquiring a notifier. boost::optional<AutoGetCollectionForReadMaybeLockFree> coll; - std::unique_ptr<insert_listener::Notifier> notifier; + insert_listener::CappedInsertNotifierData cappedInsertNotifierData; if (insert_listener::shouldListenForInserts(_opCtx, _cq.get())) { if (!_opCtx->lockState()->isCollectionLockedForMode(_nss, MODE_IS)) { coll.emplace(_opCtx, _nss); } - notifier = insert_listener::getCappedInsertNotifier(_opCtx, _nss, _yieldPolicy.get()); + cappedInsertNotifierData.notifier = + insert_listener::getCappedInsertNotifier(_opCtx, _nss, _yieldPolicy.get()); } for (;;) { @@ -324,7 +325,7 @@ PlanExecutor::ExecState PlanExecutorSBE::getNextImpl(ObjectType* out, RecordId* return PlanExecutor::ExecState::IS_EOF; } - insert_listener::waitForInserts(_opCtx, _yieldPolicy.get(), notifier); + insert_listener::waitForInserts(_opCtx, _yieldPolicy.get(), &cappedInsertNotifierData); // There may be more results, keep going. 
continue; } else if (_resumeRecordIdSlot) { diff --git a/src/mongo/db/query/plan_insert_listener.cpp b/src/mongo/db/query/plan_insert_listener.cpp index 6e31be68b0b..1f7546be064 100644 --- a/src/mongo/db/query/plan_insert_listener.cpp +++ b/src/mongo/db/query/plan_insert_listener.cpp @@ -31,8 +31,6 @@ #include "mongo/db/query/plan_insert_listener.h" -#include <memory> - #include "mongo/db/catalog/database.h" #include "mongo/db/catalog/database_holder.h" #include "mongo/db/curop.h" @@ -78,32 +76,25 @@ bool shouldWaitForInserts(OperationContext* opCtx, return false; } -std::unique_ptr<Notifier> getCappedInsertNotifier(OperationContext* opCtx, - const NamespaceString& nss, - PlanYieldPolicy* yieldPolicy) { +std::shared_ptr<CappedInsertNotifier> getCappedInsertNotifier(OperationContext* opCtx, + const NamespaceString& nss, + PlanYieldPolicy* yieldPolicy) { // We don't expect to need a capped insert notifier for non-yielding plans. invariant(yieldPolicy->canReleaseLocksDuringExecution()); - // In case of the read concern majority, return a majority committed point notifier, otherwise, - // a notifier associated with that capped collection - // - // We can only wait on the capped collection insert notifier if the collection is present, - // otherwise we should retry immediately when we hit EOF. - if (opCtx->recoveryUnit()->getTimestampReadSource() == RecoveryUnit::kMajorityCommitted) { - return std::make_unique<MajorityCommittedPointNotifier>(); - } else { - auto collection = - CollectionCatalog::get(opCtx)->lookupCollectionByNamespaceForRead(opCtx, nss); - invariant(collection); - - return std::make_unique<LocalCappedInsertNotifier>( - collection->getRecordStore()->getCappedInsertNotifier()); - } + // We can only wait if we have a collection; otherwise we should retry immediately when + // we hit EOF. 
+ auto collection = CollectionCatalog::get(opCtx)->lookupCollectionByNamespaceForRead(opCtx, nss); + invariant(collection); + + return collection->getRecordStore()->getCappedInsertNotifier(); } void waitForInserts(OperationContext* opCtx, PlanYieldPolicy* yieldPolicy, - std::unique_ptr<Notifier>& notifier) { + CappedInsertNotifierData* notifierData) { + invariant(notifierData->notifier); + // The notifier wait() method will not wait unless the version passed to it matches the // current version of the notifier. Since the version passed to it is the current version // of the notifier at the time of the previous EOF, we require two EOFs in a row with no @@ -113,10 +104,10 @@ void waitForInserts(OperationContext* opCtx, curOp->pauseTimer(); ON_BLOCK_EXIT([curOp] { curOp->resumeTimer(); }); - notifier->prepareForWait(opCtx); - auto yieldResult = yieldPolicy->yieldOrInterrupt(opCtx, [opCtx, ¬ifier] { + uint64_t currentNotifierVersion = notifierData->notifier->getVersion(); + auto yieldResult = yieldPolicy->yieldOrInterrupt(opCtx, [opCtx, notifierData] { const auto deadline = awaitDataState(opCtx).waitForInsertsDeadline; - notifier->waitUntil(opCtx, deadline); + notifierData->notifier->waitUntil(notifierData->lastEOFVersion, deadline); if (MONGO_unlikely(planExecutorHangWhileYieldedInWaitForInserts.shouldFail())) { LOGV2(4452903, "PlanExecutor - planExecutorHangWhileYieldedInWaitForInserts fail point enabled. 
" @@ -124,6 +115,7 @@ void waitForInserts(OperationContext* opCtx, planExecutorHangWhileYieldedInWaitForInserts.pauseWhileSet(); } }); + notifierData->lastEOFVersion = currentNotifierVersion; uassertStatusOK(yieldResult); } diff --git a/src/mongo/db/query/plan_insert_listener.h b/src/mongo/db/query/plan_insert_listener.h index b16b5cff7b7..be4407d3df8 100644 --- a/src/mongo/db/query/plan_insert_listener.h +++ b/src/mongo/db/query/plan_insert_listener.h @@ -33,69 +33,15 @@ #include "mongo/db/operation_context.h" #include "mongo/db/query/canonical_query.h" #include "mongo/db/query/plan_yield_policy.h" -#include "mongo/db/repl/optime.h" -#include "mongo/db/repl/wait_for_majority_service.h" namespace mongo::insert_listener { - -// An abstract class used to notify on new insert events. -class Notifier { -public: - virtual ~Notifier(){}; - - // Performs the necessary work needed for waiting. Should be called prior calling waitUntil(). - virtual void prepareForWait(OperationContext* opCtx) = 0; - - // Blocks the caller until an insert event is fired or the deadline is hit. - virtual void waitUntil(OperationContext* opCtx, Date_t deadline) = 0; -}; - -// Class used to notify listeners of local inserts into the capped collection. -class LocalCappedInsertNotifier final : public Notifier { -public: - LocalCappedInsertNotifier(std::shared_ptr<CappedInsertNotifier> notifier) - : _notifier(notifier) {} - - void prepareForWait(OperationContext* opCtx) final { - invariant(_notifier); - } - - void waitUntil(OperationContext* opCtx, Date_t deadline) final { - auto currentVersion = _notifier->getVersion(); - _notifier->waitUntil(_lastEOFVersion, deadline); - _lastEOFVersion = currentVersion; - } - -private: - std::shared_ptr<CappedInsertNotifier> _notifier; - uint64_t _lastEOFVersion = 0; -}; - -// Class used to notify listeners on majority committed point advancement events. 
-class MajorityCommittedPointNotifier final : public Notifier { -public: - MajorityCommittedPointNotifier(repl::OpTime opTime = repl::OpTime()) - : _opTimeToBeMajorityCommitted(opTime) {} - - // Computes the OpTime to wait on by incrementing the current read timestamp. - void prepareForWait(OperationContext* opCtx) final { - auto readTs = opCtx->recoveryUnit()->getPointInTimeReadTimestamp(opCtx); - invariant(readTs); - _opTimeToBeMajorityCommitted = - repl::OpTime(*readTs + 1, repl::ReplicationCoordinator::get(opCtx)->getTerm()); - } - - void waitUntil(OperationContext* opCtx, Date_t deadline) final { - auto majorityCommittedFuture = - WaitForMajorityService::get(opCtx->getServiceContext()) - .waitUntilMajority(_opTimeToBeMajorityCommitted, opCtx->getCancellationToken()); - opCtx->runWithDeadline(deadline, opCtx->getTimeoutError(), [&] { - (void)majorityCommittedFuture.getNoThrow(opCtx); - }); - } - -private: - repl::OpTime _opTimeToBeMajorityCommitted; +/** + * A helper wrapper struct around CappedInsertNotifier which also holds the last version returned + * by the 'notifier'. + */ +struct CappedInsertNotifierData { + std::shared_ptr<CappedInsertNotifier> notifier; + uint64_t lastEOFVersion = ~0; }; /** @@ -115,11 +61,12 @@ bool shouldWaitForInserts(OperationContext* opCtx, PlanYieldPolicy* yieldPolicy); /** - * Returns a CappedInsertNotifier for a capped collection. + * Gets the CappedInsertNotifier for a capped collection. Returns nullptr if this plan executor + * is not capable of yielding based on a notifier. 
*/ -std::unique_ptr<Notifier> getCappedInsertNotifier(OperationContext* opCtx, - const NamespaceString& nss, - PlanYieldPolicy* yieldPolicy); +std::shared_ptr<CappedInsertNotifier> getCappedInsertNotifier(OperationContext* opCtx, + const NamespaceString& nss, + PlanYieldPolicy* yieldPolicy); /** * Called for tailable and awaitData cursors in order to yield locks and waits for inserts to @@ -127,7 +74,8 @@ std::unique_ptr<Notifier> getCappedInsertNotifier(OperationContext* opCtx, * and there may be new results. If the PlanExecutor was killed during a yield, throws an * exception. */ + void waitForInserts(OperationContext* opCtx, PlanYieldPolicy* yieldPolicy, - std::unique_ptr<Notifier>& notifier); + CappedInsertNotifierData* notifierData); } // namespace mongo::insert_listener |