summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_multitenant_passthrough.yml2
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_multitenant_sharded_collections_passthrough.yml2
-rw-r--r--src/mongo/db/SConscript1
-rw-r--r--src/mongo/db/query/plan_executor_impl.cpp9
-rw-r--r--src/mongo/db/query/plan_executor_sbe.cpp7
-rw-r--r--src/mongo/db/query/plan_insert_listener.cpp40
-rw-r--r--src/mongo/db/query/plan_insert_listener.h91
7 files changed, 107 insertions, 45 deletions
diff --git a/buildscripts/resmokeconfig/suites/change_streams_multitenant_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_multitenant_passthrough.yml
index 17a29d4f9cf..f838d68afd5 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_multitenant_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_multitenant_passthrough.yml
@@ -7,8 +7,6 @@ selector:
##
# TODO SERVER-68341: Implement enable/disable command for mongoQ in the serverless.
- jstests/change_streams/projection_fakes_internal_event.js
- # TODO SERVER-69959: Implement a majority-committed insert listener.
- - jstests/change_streams/only_wake_getmore_for_relevant_changes.js
##
# TODO SERVER-70760: This test creates its own sharded cluster and uses transaction. The support
diff --git a/buildscripts/resmokeconfig/suites/change_streams_multitenant_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_multitenant_sharded_collections_passthrough.yml
index 73adfbc19b4..1b59797a303 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_multitenant_sharded_collections_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_multitenant_sharded_collections_passthrough.yml
@@ -4,8 +4,6 @@ selector:
roots:
- jstests/change_streams/**/*.js
exclude_files:
- # TODO SERVER-69959: Implement a majority-committed insert listener.
- - jstests/change_streams/only_wake_getmore_for_relevant_changes.js
# TODO SERVER-68341: Implement enable/disable command for mongoQ in the serverless.
- jstests/change_streams/projection_fakes_internal_event.js
# TODO SERVER-68557 This test list databases that does not work in the sharded-cluster. This test
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 0f014dd8848..01433848047 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -1536,6 +1536,7 @@ env.Library(
'$BUILD_DIR/mongo/db/query/ce/query_ce_histogram',
'$BUILD_DIR/mongo/db/query/ce/query_ce_sampling',
'$BUILD_DIR/mongo/db/query/optimizer/optimizer',
+ '$BUILD_DIR/mongo/db/repl/wait_for_majority_service',
'$BUILD_DIR/mongo/db/session/kill_sessions',
'$BUILD_DIR/mongo/db/sorter/sorter_stats',
'$BUILD_DIR/mongo/db/stats/resource_consumption_metrics',
diff --git a/src/mongo/db/query/plan_executor_impl.cpp b/src/mongo/db/query/plan_executor_impl.cpp
index 2c25777d8f2..b6fb30d2bd6 100644
--- a/src/mongo/db/query/plan_executor_impl.cpp
+++ b/src/mongo/db/query/plan_executor_impl.cpp
@@ -353,11 +353,10 @@ PlanExecutor::ExecState PlanExecutorImpl::_getNextImpl(Snapshotted<Document>* ob
// Capped insert data; declared outside the loop so we hold a shared pointer to the capped
// insert notifier the entire time we are in the loop. Holding a shared pointer to the
// capped insert notifier is necessary for the notifierVersion to advance.
- insert_listener::CappedInsertNotifierData cappedInsertNotifierData;
+ std::unique_ptr<insert_listener::Notifier> notifier;
if (insert_listener::shouldListenForInserts(_opCtx, _cq.get())) {
- // We always construct the CappedInsertNotifier for awaitData cursors.
- cappedInsertNotifierData.notifier =
- insert_listener::getCappedInsertNotifier(_opCtx, _nss, _yieldPolicy.get());
+ // We always construct the insert_listener::Notifier for awaitData cursors.
+ notifier = insert_listener::getCappedInsertNotifier(_opCtx, _nss, _yieldPolicy.get());
}
for (;;) {
// These are the conditions which can cause us to yield:
@@ -497,7 +496,7 @@ PlanExecutor::ExecState PlanExecutorImpl::_getNextImpl(Snapshotted<Document>* ob
return PlanExecutor::IS_EOF;
}
- insert_listener::waitForInserts(_opCtx, _yieldPolicy.get(), &cappedInsertNotifierData);
+ insert_listener::waitForInserts(_opCtx, _yieldPolicy.get(), notifier);
// There may be more results, keep going.
continue;
diff --git a/src/mongo/db/query/plan_executor_sbe.cpp b/src/mongo/db/query/plan_executor_sbe.cpp
index 0c436764c78..5f9c347f463 100644
--- a/src/mongo/db/query/plan_executor_sbe.cpp
+++ b/src/mongo/db/query/plan_executor_sbe.cpp
@@ -273,14 +273,13 @@ PlanExecutor::ExecState PlanExecutorSBE::getNextImpl(ObjectType* out, RecordId*
//
// Note that we need to hold a database intent lock before acquiring a notifier.
boost::optional<AutoGetCollectionForReadMaybeLockFree> coll;
- insert_listener::CappedInsertNotifierData cappedInsertNotifierData;
+ std::unique_ptr<insert_listener::Notifier> notifier;
if (insert_listener::shouldListenForInserts(_opCtx, _cq.get())) {
if (!_opCtx->lockState()->isCollectionLockedForMode(_nss, MODE_IS)) {
coll.emplace(_opCtx, _nss);
}
- cappedInsertNotifierData.notifier =
- insert_listener::getCappedInsertNotifier(_opCtx, _nss, _yieldPolicy.get());
+ notifier = insert_listener::getCappedInsertNotifier(_opCtx, _nss, _yieldPolicy.get());
}
for (;;) {
@@ -325,7 +324,7 @@ PlanExecutor::ExecState PlanExecutorSBE::getNextImpl(ObjectType* out, RecordId*
return PlanExecutor::ExecState::IS_EOF;
}
- insert_listener::waitForInserts(_opCtx, _yieldPolicy.get(), &cappedInsertNotifierData);
+ insert_listener::waitForInserts(_opCtx, _yieldPolicy.get(), notifier);
// There may be more results, keep going.
continue;
} else if (_resumeRecordIdSlot) {
diff --git a/src/mongo/db/query/plan_insert_listener.cpp b/src/mongo/db/query/plan_insert_listener.cpp
index d6ee899dce5..df947c2e725 100644
--- a/src/mongo/db/query/plan_insert_listener.cpp
+++ b/src/mongo/db/query/plan_insert_listener.cpp
@@ -31,6 +31,8 @@
#include "mongo/db/query/plan_insert_listener.h"
+#include <memory>
+
#include "mongo/db/catalog/database.h"
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/curop.h"
@@ -76,28 +78,31 @@ bool shouldWaitForInserts(OperationContext* opCtx,
return false;
}
-std::shared_ptr<CappedInsertNotifier> getCappedInsertNotifier(OperationContext* opCtx,
- const NamespaceString& nss,
- PlanYieldPolicy* yieldPolicy) {
+std::unique_ptr<Notifier> getCappedInsertNotifier(OperationContext* opCtx,
+ const NamespaceString& nss,
+ PlanYieldPolicy* yieldPolicy) {
// We don't expect to need a capped insert notifier for non-yielding plans.
invariant(yieldPolicy->canReleaseLocksDuringExecution());
- // We can only wait if we have a collection; otherwise we should retry immediately when
- // we hit EOF.
+ // In case of the read concern majority, return a majority committed point notifier, otherwise,
+ // a notifier associated with that capped collection
//
- // Hold reference to the catalog for collection lookup without locks to be safe.
- auto catalog = CollectionCatalog::get(opCtx);
- auto collection = catalog->lookupCollectionByNamespace(opCtx, nss);
- invariant(collection);
-
- return collection->getRecordStore()->getCappedInsertNotifier();
+ // We can only wait on the capped collection insert notifier if the collection is present,
+ // otherwise we should retry immediately when we hit EOF.
+ if (opCtx->recoveryUnit()->getTimestampReadSource() == RecoveryUnit::kMajorityCommitted) {
+ return std::make_unique<MajorityCommittedPointNotifier>();
+ } else {
+ auto collection = CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(opCtx, nss);
+ invariant(collection);
+
+ return std::make_unique<LocalCappedInsertNotifier>(
+ collection->getRecordStore()->getCappedInsertNotifier());
+ }
}
void waitForInserts(OperationContext* opCtx,
PlanYieldPolicy* yieldPolicy,
- CappedInsertNotifierData* notifierData) {
- invariant(notifierData->notifier);
-
+ std::unique_ptr<Notifier>& notifier) {
// The notifier wait() method will not wait unless the version passed to it matches the
// current version of the notifier. Since the version passed to it is the current version
// of the notifier at the time of the previous EOF, we require two EOFs in a row with no
@@ -107,10 +112,10 @@ void waitForInserts(OperationContext* opCtx,
curOp->pauseTimer();
ON_BLOCK_EXIT([curOp] { curOp->resumeTimer(); });
- uint64_t currentNotifierVersion = notifierData->notifier->getVersion();
- auto yieldResult = yieldPolicy->yieldOrInterrupt(opCtx, [opCtx, notifierData] {
+ notifier->prepareForWait(opCtx);
+ auto yieldResult = yieldPolicy->yieldOrInterrupt(opCtx, [opCtx, &notifier] {
const auto deadline = awaitDataState(opCtx).waitForInsertsDeadline;
- notifierData->notifier->waitUntil(notifierData->lastEOFVersion, deadline);
+ notifier->waitUntil(opCtx, deadline);
if (MONGO_unlikely(planExecutorHangWhileYieldedInWaitForInserts.shouldFail())) {
LOGV2(4452903,
"PlanExecutor - planExecutorHangWhileYieldedInWaitForInserts fail point enabled. "
@@ -118,7 +123,6 @@ void waitForInserts(OperationContext* opCtx,
planExecutorHangWhileYieldedInWaitForInserts.pauseWhileSet();
}
});
- notifierData->lastEOFVersion = currentNotifierVersion;
uassertStatusOK(yieldResult);
}
diff --git a/src/mongo/db/query/plan_insert_listener.h b/src/mongo/db/query/plan_insert_listener.h
index be4407d3df8..71cf468e8a8 100644
--- a/src/mongo/db/query/plan_insert_listener.h
+++ b/src/mongo/db/query/plan_insert_listener.h
@@ -33,15 +33,78 @@
#include "mongo/db/operation_context.h"
#include "mongo/db/query/canonical_query.h"
#include "mongo/db/query/plan_yield_policy.h"
+#include "mongo/db/repl/optime.h"
+#include "mongo/db/repl/wait_for_majority_service.h"
+#include "mongo/logv2/log.h"
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
namespace mongo::insert_listener {
-/**
- * A helper wrapper struct around CappedInsertNotifier which also holds the last version returned
- * by the 'notifier'.
- */
-struct CappedInsertNotifierData {
- std::shared_ptr<CappedInsertNotifier> notifier;
- uint64_t lastEOFVersion = ~0;
+
+// An abstract class used to notify on new insert events.
+class Notifier {
+public:
+ virtual ~Notifier(){};
+
+ // Performs the necessary work needed for waiting. Should be called prior calling waitUntil().
+ virtual void prepareForWait(OperationContext* opCtx) = 0;
+
+ // Blocks the caller until an insert event is fired or the deadline is hit.
+ virtual void waitUntil(OperationContext* opCtx, Date_t deadline) = 0;
+};
+
+// Class used to notify listeners of local inserts into the capped collection.
+class LocalCappedInsertNotifier final : public Notifier {
+public:
+ LocalCappedInsertNotifier(std::shared_ptr<CappedInsertNotifier> notifier)
+ : _notifier(notifier) {}
+
+ void prepareForWait(OperationContext* opCtx) final {
+ invariant(_notifier);
+ }
+
+ void waitUntil(OperationContext* opCtx, Date_t deadline) final {
+ auto currentVersion = _notifier->getVersion();
+ _notifier->waitUntil(_lastEOFVersion, deadline);
+ _lastEOFVersion = currentVersion;
+ }
+
+private:
+ std::shared_ptr<CappedInsertNotifier> _notifier;
+ uint64_t _lastEOFVersion;
+};
+
+// Class used to notify listeners on majority committed point advancement events.
+class MajorityCommittedPointNotifier final : public Notifier {
+public:
+ MajorityCommittedPointNotifier(repl::OpTime opTime = repl::OpTime())
+ : _opTimeToBeMajorityCommitted(opTime) {}
+
+ // Computes the OpTime to wait on by incrementing the current read timestamp.
+ void prepareForWait(OperationContext* opCtx) final {
+ auto readTs = opCtx->recoveryUnit()->getPointInTimeReadTimestamp(opCtx);
+ invariant(readTs);
+ _opTimeToBeMajorityCommitted =
+ repl::OpTime(*readTs + 1, repl::ReplicationCoordinator::get(opCtx)->getTerm());
+ }
+
+ void waitUntil(OperationContext* opCtx, Date_t deadline) final {
+ auto majorityCommittedFuture = WaitForMajorityService::get(opCtx->getServiceContext())
+ .waitUntilMajorityForRead(_opTimeToBeMajorityCommitted,
+ opCtx->getCancellationToken());
+ opCtx->runWithDeadline(deadline, opCtx->getTimeoutError(), [&] {
+ auto status = majorityCommittedFuture.getNoThrow(opCtx);
+ if (!status.isOK()) {
+ LOGV2_DEBUG(7455500,
+ 3,
+ "Failure waiting for the majority committed event",
+ "status"_attr = status.toString());
+ }
+ });
+ }
+
+private:
+ repl::OpTime _opTimeToBeMajorityCommitted;
};
/**
@@ -61,12 +124,11 @@ bool shouldWaitForInserts(OperationContext* opCtx,
PlanYieldPolicy* yieldPolicy);
/**
- * Gets the CappedInsertNotifier for a capped collection. Returns nullptr if this plan executor
- * is not capable of yielding based on a notifier.
+ * Returns an insert notifier for a capped collection.
*/
-std::shared_ptr<CappedInsertNotifier> getCappedInsertNotifier(OperationContext* opCtx,
- const NamespaceString& nss,
- PlanYieldPolicy* yieldPolicy);
+std::unique_ptr<Notifier> getCappedInsertNotifier(OperationContext* opCtx,
+ const NamespaceString& nss,
+ PlanYieldPolicy* yieldPolicy);
/**
* Called for tailable and awaitData cursors in order to yield locks and waits for inserts to
@@ -74,8 +136,9 @@ std::shared_ptr<CappedInsertNotifier> getCappedInsertNotifier(OperationContext*
* and there may be new results. If the PlanExecutor was killed during a yield, throws an
* exception.
*/
-
void waitForInserts(OperationContext* opCtx,
PlanYieldPolicy* yieldPolicy,
- CappedInsertNotifierData* notifierData);
+ std::unique_ptr<Notifier>& notifier);
} // namespace mongo::insert_listener
+
+#undef MONGO_LOGV2_DEFAULT_COMPONENT