summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Storch <david.storch@mongodb.com>2023-03-02 18:58:35 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-03-02 19:47:27 +0000
commit96763fa1fef7faa2513afd2618d4d039ee70a6fe (patch)
tree7f5a28ab11cd5aeefc639881897054d2d6db9c53
parenta687e8ade189ec0e311ded76effab420813c808d (diff)
downloadmongo-96763fa1fef7faa2513afd2618d4d039ee70a6fe.tar.gz
SERVER-74526 Fix high CPU utilization for a change streams opened against secondary nodes
This reverts commit 34ac49477b87e183637f68cda828ecff8b393c64. Future work is needed to reintroduce the changes from SERVER-69959 without causing the problematic "busy wait" behavior on secondary nodes.
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_multitenant_passthrough.yml2
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_multitenant_sharded_collections_passthrough.yml2
-rw-r--r--src/mongo/db/SConscript1
-rw-r--r--src/mongo/db/query/plan_executor_impl.cpp9
-rw-r--r--src/mongo/db/query/plan_executor_sbe.cpp7
-rw-r--r--src/mongo/db/query/plan_insert_listener.cpp40
-rw-r--r--src/mongo/db/query/plan_insert_listener.h80
7 files changed, 43 insertions, 98 deletions
diff --git a/buildscripts/resmokeconfig/suites/change_streams_multitenant_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_multitenant_passthrough.yml
index f838d68afd5..17a29d4f9cf 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_multitenant_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_multitenant_passthrough.yml
@@ -7,6 +7,8 @@ selector:
##
# TODO SERVER-68341: Implement enable/disable command for mongoQ in the serverless.
- jstests/change_streams/projection_fakes_internal_event.js
+ # TODO SERVER-69959: Implement a majority-committed insert listener.
+ - jstests/change_streams/only_wake_getmore_for_relevant_changes.js
##
# TODO SERVER-70760: This test creates its own sharded cluster and uses transaction. The support
diff --git a/buildscripts/resmokeconfig/suites/change_streams_multitenant_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_multitenant_sharded_collections_passthrough.yml
index 1b59797a303..73adfbc19b4 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_multitenant_sharded_collections_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_multitenant_sharded_collections_passthrough.yml
@@ -4,6 +4,8 @@ selector:
roots:
- jstests/change_streams/**/*.js
exclude_files:
+ # TODO SERVER-69959: Implement a majority-committed insert listener.
+ - jstests/change_streams/only_wake_getmore_for_relevant_changes.js
# TODO SERVER-68341: Implement enable/disable command for mongoQ in the serverless.
- jstests/change_streams/projection_fakes_internal_event.js
# TODO SERVER-68557 This test list databases that does not work in the sharded-cluster. This test
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 86494b576dd..8089d6e2c99 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -1475,7 +1475,6 @@ env.Library(
'$BUILD_DIR/mongo/db/query/ce/query_ce_histogram',
'$BUILD_DIR/mongo/db/query/ce/query_ce_sampling',
'$BUILD_DIR/mongo/db/query/optimizer/optimizer',
- '$BUILD_DIR/mongo/db/repl/wait_for_majority_service',
'$BUILD_DIR/mongo/db/session/kill_sessions',
'$BUILD_DIR/mongo/db/sorter/sorter_stats',
'$BUILD_DIR/mongo/db/stats/resource_consumption_metrics',
diff --git a/src/mongo/db/query/plan_executor_impl.cpp b/src/mongo/db/query/plan_executor_impl.cpp
index 677267fc8b7..27f7fe90a15 100644
--- a/src/mongo/db/query/plan_executor_impl.cpp
+++ b/src/mongo/db/query/plan_executor_impl.cpp
@@ -353,10 +353,11 @@ PlanExecutor::ExecState PlanExecutorImpl::_getNextImpl(Snapshotted<Document>* ob
// Capped insert data; declared outside the loop so we hold a shared pointer to the capped
// insert notifier the entire time we are in the loop. Holding a shared pointer to the
// capped insert notifier is necessary for the notifierVersion to advance.
- std::unique_ptr<insert_listener::Notifier> notifier;
+ insert_listener::CappedInsertNotifierData cappedInsertNotifierData;
if (insert_listener::shouldListenForInserts(_opCtx, _cq.get())) {
- // We always construct the insert_listener::Notifier for awaitData cursors.
- notifier = insert_listener::getCappedInsertNotifier(_opCtx, _nss, _yieldPolicy.get());
+ // We always construct the CappedInsertNotifier for awaitData cursors.
+ cappedInsertNotifierData.notifier =
+ insert_listener::getCappedInsertNotifier(_opCtx, _nss, _yieldPolicy.get());
}
for (;;) {
// These are the conditions which can cause us to yield:
@@ -496,7 +497,7 @@ PlanExecutor::ExecState PlanExecutorImpl::_getNextImpl(Snapshotted<Document>* ob
return PlanExecutor::IS_EOF;
}
- insert_listener::waitForInserts(_opCtx, _yieldPolicy.get(), notifier);
+ insert_listener::waitForInserts(_opCtx, _yieldPolicy.get(), &cappedInsertNotifierData);
// There may be more results, keep going.
continue;
diff --git a/src/mongo/db/query/plan_executor_sbe.cpp b/src/mongo/db/query/plan_executor_sbe.cpp
index f47f2a99d71..0fb5f85b8fa 100644
--- a/src/mongo/db/query/plan_executor_sbe.cpp
+++ b/src/mongo/db/query/plan_executor_sbe.cpp
@@ -273,13 +273,14 @@ PlanExecutor::ExecState PlanExecutorSBE::getNextImpl(ObjectType* out, RecordId*
//
// Note that we need to hold a database intent lock before acquiring a notifier.
boost::optional<AutoGetCollectionForReadMaybeLockFree> coll;
- std::unique_ptr<insert_listener::Notifier> notifier;
+ insert_listener::CappedInsertNotifierData cappedInsertNotifierData;
if (insert_listener::shouldListenForInserts(_opCtx, _cq.get())) {
if (!_opCtx->lockState()->isCollectionLockedForMode(_nss, MODE_IS)) {
coll.emplace(_opCtx, _nss);
}
- notifier = insert_listener::getCappedInsertNotifier(_opCtx, _nss, _yieldPolicy.get());
+ cappedInsertNotifierData.notifier =
+ insert_listener::getCappedInsertNotifier(_opCtx, _nss, _yieldPolicy.get());
}
for (;;) {
@@ -324,7 +325,7 @@ PlanExecutor::ExecState PlanExecutorSBE::getNextImpl(ObjectType* out, RecordId*
return PlanExecutor::ExecState::IS_EOF;
}
- insert_listener::waitForInserts(_opCtx, _yieldPolicy.get(), notifier);
+ insert_listener::waitForInserts(_opCtx, _yieldPolicy.get(), &cappedInsertNotifierData);
// There may be more results, keep going.
continue;
} else if (_resumeRecordIdSlot) {
diff --git a/src/mongo/db/query/plan_insert_listener.cpp b/src/mongo/db/query/plan_insert_listener.cpp
index 6e31be68b0b..1f7546be064 100644
--- a/src/mongo/db/query/plan_insert_listener.cpp
+++ b/src/mongo/db/query/plan_insert_listener.cpp
@@ -31,8 +31,6 @@
#include "mongo/db/query/plan_insert_listener.h"
-#include <memory>
-
#include "mongo/db/catalog/database.h"
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/curop.h"
@@ -78,32 +76,25 @@ bool shouldWaitForInserts(OperationContext* opCtx,
return false;
}
-std::unique_ptr<Notifier> getCappedInsertNotifier(OperationContext* opCtx,
- const NamespaceString& nss,
- PlanYieldPolicy* yieldPolicy) {
+std::shared_ptr<CappedInsertNotifier> getCappedInsertNotifier(OperationContext* opCtx,
+ const NamespaceString& nss,
+ PlanYieldPolicy* yieldPolicy) {
// We don't expect to need a capped insert notifier for non-yielding plans.
invariant(yieldPolicy->canReleaseLocksDuringExecution());
- // In case of the read concern majority, return a majority committed point notifier, otherwise,
- // a notifier associated with that capped collection
- //
- // We can only wait on the capped collection insert notifier if the collection is present,
- // otherwise we should retry immediately when we hit EOF.
- if (opCtx->recoveryUnit()->getTimestampReadSource() == RecoveryUnit::kMajorityCommitted) {
- return std::make_unique<MajorityCommittedPointNotifier>();
- } else {
- auto collection =
- CollectionCatalog::get(opCtx)->lookupCollectionByNamespaceForRead(opCtx, nss);
- invariant(collection);
-
- return std::make_unique<LocalCappedInsertNotifier>(
- collection->getRecordStore()->getCappedInsertNotifier());
- }
+ // We can only wait if we have a collection; otherwise we should retry immediately when
+ // we hit EOF.
+ auto collection = CollectionCatalog::get(opCtx)->lookupCollectionByNamespaceForRead(opCtx, nss);
+ invariant(collection);
+
+ return collection->getRecordStore()->getCappedInsertNotifier();
}
void waitForInserts(OperationContext* opCtx,
PlanYieldPolicy* yieldPolicy,
- std::unique_ptr<Notifier>& notifier) {
+ CappedInsertNotifierData* notifierData) {
+ invariant(notifierData->notifier);
+
// The notifier wait() method will not wait unless the version passed to it matches the
// current version of the notifier. Since the version passed to it is the current version
// of the notifier at the time of the previous EOF, we require two EOFs in a row with no
@@ -113,10 +104,10 @@ void waitForInserts(OperationContext* opCtx,
curOp->pauseTimer();
ON_BLOCK_EXIT([curOp] { curOp->resumeTimer(); });
- notifier->prepareForWait(opCtx);
- auto yieldResult = yieldPolicy->yieldOrInterrupt(opCtx, [opCtx, &notifier] {
+ uint64_t currentNotifierVersion = notifierData->notifier->getVersion();
+ auto yieldResult = yieldPolicy->yieldOrInterrupt(opCtx, [opCtx, notifierData] {
const auto deadline = awaitDataState(opCtx).waitForInsertsDeadline;
- notifier->waitUntil(opCtx, deadline);
+ notifierData->notifier->waitUntil(notifierData->lastEOFVersion, deadline);
if (MONGO_unlikely(planExecutorHangWhileYieldedInWaitForInserts.shouldFail())) {
LOGV2(4452903,
"PlanExecutor - planExecutorHangWhileYieldedInWaitForInserts fail point enabled. "
@@ -124,6 +115,7 @@ void waitForInserts(OperationContext* opCtx,
planExecutorHangWhileYieldedInWaitForInserts.pauseWhileSet();
}
});
+ notifierData->lastEOFVersion = currentNotifierVersion;
uassertStatusOK(yieldResult);
}
diff --git a/src/mongo/db/query/plan_insert_listener.h b/src/mongo/db/query/plan_insert_listener.h
index b16b5cff7b7..be4407d3df8 100644
--- a/src/mongo/db/query/plan_insert_listener.h
+++ b/src/mongo/db/query/plan_insert_listener.h
@@ -33,69 +33,15 @@
#include "mongo/db/operation_context.h"
#include "mongo/db/query/canonical_query.h"
#include "mongo/db/query/plan_yield_policy.h"
-#include "mongo/db/repl/optime.h"
-#include "mongo/db/repl/wait_for_majority_service.h"
namespace mongo::insert_listener {
-
-// An abstract class used to notify on new insert events.
-class Notifier {
-public:
- virtual ~Notifier(){};
-
- // Performs the necessary work needed for waiting. Should be called prior calling waitUntil().
- virtual void prepareForWait(OperationContext* opCtx) = 0;
-
- // Blocks the caller until an insert event is fired or the deadline is hit.
- virtual void waitUntil(OperationContext* opCtx, Date_t deadline) = 0;
-};
-
-// Class used to notify listeners of local inserts into the capped collection.
-class LocalCappedInsertNotifier final : public Notifier {
-public:
- LocalCappedInsertNotifier(std::shared_ptr<CappedInsertNotifier> notifier)
- : _notifier(notifier) {}
-
- void prepareForWait(OperationContext* opCtx) final {
- invariant(_notifier);
- }
-
- void waitUntil(OperationContext* opCtx, Date_t deadline) final {
- auto currentVersion = _notifier->getVersion();
- _notifier->waitUntil(_lastEOFVersion, deadline);
- _lastEOFVersion = currentVersion;
- }
-
-private:
- std::shared_ptr<CappedInsertNotifier> _notifier;
- uint64_t _lastEOFVersion = 0;
-};
-
-// Class used to notify listeners on majority committed point advancement events.
-class MajorityCommittedPointNotifier final : public Notifier {
-public:
- MajorityCommittedPointNotifier(repl::OpTime opTime = repl::OpTime())
- : _opTimeToBeMajorityCommitted(opTime) {}
-
- // Computes the OpTime to wait on by incrementing the current read timestamp.
- void prepareForWait(OperationContext* opCtx) final {
- auto readTs = opCtx->recoveryUnit()->getPointInTimeReadTimestamp(opCtx);
- invariant(readTs);
- _opTimeToBeMajorityCommitted =
- repl::OpTime(*readTs + 1, repl::ReplicationCoordinator::get(opCtx)->getTerm());
- }
-
- void waitUntil(OperationContext* opCtx, Date_t deadline) final {
- auto majorityCommittedFuture =
- WaitForMajorityService::get(opCtx->getServiceContext())
- .waitUntilMajority(_opTimeToBeMajorityCommitted, opCtx->getCancellationToken());
- opCtx->runWithDeadline(deadline, opCtx->getTimeoutError(), [&] {
- (void)majorityCommittedFuture.getNoThrow(opCtx);
- });
- }
-
-private:
- repl::OpTime _opTimeToBeMajorityCommitted;
+/**
+ * A helper wrapper struct around CappedInsertNotifier which also holds the last version returned
+ * by the 'notifier'.
+ */
+struct CappedInsertNotifierData {
+ std::shared_ptr<CappedInsertNotifier> notifier;
+ uint64_t lastEOFVersion = ~0;
};
/**
@@ -115,11 +61,12 @@ bool shouldWaitForInserts(OperationContext* opCtx,
PlanYieldPolicy* yieldPolicy);
/**
- * Returns a CappedInsertNotifier for a capped collection.
+ * Gets the CappedInsertNotifier for a capped collection. Returns nullptr if this plan executor
+ * is not capable of yielding based on a notifier.
*/
-std::unique_ptr<Notifier> getCappedInsertNotifier(OperationContext* opCtx,
- const NamespaceString& nss,
- PlanYieldPolicy* yieldPolicy);
+std::shared_ptr<CappedInsertNotifier> getCappedInsertNotifier(OperationContext* opCtx,
+ const NamespaceString& nss,
+ PlanYieldPolicy* yieldPolicy);
/**
* Called for tailable and awaitData cursors in order to yield locks and waits for inserts to
@@ -127,7 +74,8 @@ std::unique_ptr<Notifier> getCappedInsertNotifier(OperationContext* opCtx,
* and there may be new results. If the PlanExecutor was killed during a yield, throws an
* exception.
*/
+
void waitForInserts(OperationContext* opCtx,
PlanYieldPolicy* yieldPolicy,
- std::unique_ptr<Notifier>& notifier);
+ CappedInsertNotifierData* notifierData);
} // namespace mongo::insert_listener