From 34ac49477b87e183637f68cda828ecff8b393c64 Mon Sep 17 00:00:00 2001 From: Denis Grebennicov Date: Tue, 29 Nov 2022 10:00:27 +0000 Subject: SERVER-69959 Introduce majority committed point advancement notification mechanism --- .../change_streams_multitenant_passthrough.yml | 2 - ...multitenant_sharded_collections_passthrough.yml | 2 - src/mongo/db/SConscript | 1 + src/mongo/db/query/plan_executor_impl.cpp | 9 ++- src/mongo/db/query/plan_executor_sbe.cpp | 7 +- src/mongo/db/query/plan_insert_listener.cpp | 40 ++++++----- src/mongo/db/query/plan_insert_listener.h | 80 ++++++++++++++++++---- 7 files changed, 98 insertions(+), 43 deletions(-) diff --git a/buildscripts/resmokeconfig/suites/change_streams_multitenant_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_multitenant_passthrough.yml index 9ad92676e3e..783c18a8626 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_multitenant_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_multitenant_passthrough.yml @@ -6,8 +6,6 @@ selector: exclude_files: # TODO SERVER-68341: Implement enable/disable command for mongoQ in the serverless. - jstests/change_streams/projection_fakes_internal_event.js - # TODO SERVER-69959: Implement a majority-committed insert listener. - - jstests/change_streams/only_wake_getmore_for_relevant_changes.js # The following test uses the 'find' method directly on 'config'.'system.preimages' collection # while this suite enforces the tenant-specific pre-image collection for the test tenant (a.k.a. # '000000000000000000000000_config'.'system.preimages'), which is not directly accessible diff --git a/buildscripts/resmokeconfig/suites/change_streams_multitenant_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_multitenant_sharded_collections_passthrough.yml index 72cfa5b91d6..51dbc708087 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_multitenant_sharded_collections_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_multitenant_sharded_collections_passthrough.yml @@ -4,8 +4,6 @@ selector: roots: - jstests/change_streams/**/*.js exclude_files: - # TODO SERVER-69959: Implement a majority-committed insert listener. - - jstests/change_streams/only_wake_getmore_for_relevant_changes.js # TODO SERVER-68341: Implement enable/disable command for mongoQ in the serverless. - jstests/change_streams/projection_fakes_internal_event.js # TODO SERVER-68557 This test list databases that does not work in the sharded-cluster. This test diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index a33c66d3cdf..d0bdca2977b 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -1524,6 +1524,7 @@ env.Library( '$BUILD_DIR/mongo/db/query/ce/query_ce_histogram', '$BUILD_DIR/mongo/db/query/ce/query_ce_sampling', '$BUILD_DIR/mongo/db/query/optimizer/optimizer', + '$BUILD_DIR/mongo/db/repl/wait_for_majority_service', '$BUILD_DIR/mongo/db/session/kill_sessions', '$BUILD_DIR/mongo/db/stats/resource_consumption_metrics', '$BUILD_DIR/mongo/db/storage/record_store_base', diff --git a/src/mongo/db/query/plan_executor_impl.cpp b/src/mongo/db/query/plan_executor_impl.cpp index 5d1524251e5..5678a0e32d6 100644 --- a/src/mongo/db/query/plan_executor_impl.cpp +++ b/src/mongo/db/query/plan_executor_impl.cpp @@ -355,11 +355,10 @@ PlanExecutor::ExecState PlanExecutorImpl::_getNextImpl(Snapshotted* ob // Capped insert data; declared outside the loop so we hold a shared pointer to the capped // insert notifier the entire time we are in the loop. Holding a shared pointer to the // capped insert notifier is necessary for the notifierVersion to advance. - insert_listener::CappedInsertNotifierData cappedInsertNotifierData; + std::unique_ptr notifier; if (insert_listener::shouldListenForInserts(_opCtx, _cq.get())) { - // We always construct the CappedInsertNotifier for awaitData cursors. - cappedInsertNotifierData.notifier = - insert_listener::getCappedInsertNotifier(_opCtx, _nss, _yieldPolicy.get()); + // We always construct the insert_listener::Notifier for awaitData cursors. + notifier = insert_listener::getCappedInsertNotifier(_opCtx, _nss, _yieldPolicy.get()); } for (;;) { // These are the conditions which can cause us to yield: @@ -499,7 +498,7 @@ PlanExecutor::ExecState PlanExecutorImpl::_getNextImpl(Snapshotted* ob return PlanExecutor::IS_EOF; } - insert_listener::waitForInserts(_opCtx, _yieldPolicy.get(), &cappedInsertNotifierData); + insert_listener::waitForInserts(_opCtx, _yieldPolicy.get(), notifier); // There may be more results, keep going. continue; diff --git a/src/mongo/db/query/plan_executor_sbe.cpp b/src/mongo/db/query/plan_executor_sbe.cpp index 791408aeb9c..51f94f70a43 100644 --- a/src/mongo/db/query/plan_executor_sbe.cpp +++ b/src/mongo/db/query/plan_executor_sbe.cpp @@ -272,14 +272,13 @@ PlanExecutor::ExecState PlanExecutorSBE::getNextImpl(ObjectType* out, RecordId* // // Note that we need to hold a database intent lock before acquiring a notifier. boost::optional coll; - insert_listener::CappedInsertNotifierData cappedInsertNotifierData; + std::unique_ptr notifier; if (insert_listener::shouldListenForInserts(_opCtx, _cq.get())) { if (!_opCtx->lockState()->isCollectionLockedForMode(_nss, MODE_IS)) { coll.emplace(_opCtx, _nss); } - cappedInsertNotifierData.notifier = - insert_listener::getCappedInsertNotifier(_opCtx, _nss, _yieldPolicy.get()); + notifier = insert_listener::getCappedInsertNotifier(_opCtx, _nss, _yieldPolicy.get()); } for (;;) { @@ -324,7 +323,7 @@ PlanExecutor::ExecState PlanExecutorSBE::getNextImpl(ObjectType* out, RecordId* return PlanExecutor::ExecState::IS_EOF; } - insert_listener::waitForInserts(_opCtx, _yieldPolicy.get(), &cappedInsertNotifierData); + insert_listener::waitForInserts(_opCtx, _yieldPolicy.get(), notifier); // There may be more results, keep going. continue; } else if (_resumeRecordIdSlot) { diff --git a/src/mongo/db/query/plan_insert_listener.cpp b/src/mongo/db/query/plan_insert_listener.cpp index 1f7546be064..6e31be68b0b 100644 --- a/src/mongo/db/query/plan_insert_listener.cpp +++ b/src/mongo/db/query/plan_insert_listener.cpp @@ -31,6 +31,8 @@ #include "mongo/db/query/plan_insert_listener.h" +#include + #include "mongo/db/catalog/database.h" #include "mongo/db/catalog/database_holder.h" #include "mongo/db/curop.h" @@ -76,25 +78,32 @@ bool shouldWaitForInserts(OperationContext* opCtx, return false; } -std::shared_ptr getCappedInsertNotifier(OperationContext* opCtx, - const NamespaceString& nss, - PlanYieldPolicy* yieldPolicy) { +std::unique_ptr getCappedInsertNotifier(OperationContext* opCtx, + const NamespaceString& nss, + PlanYieldPolicy* yieldPolicy) { // We don't expect to need a capped insert notifier for non-yielding plans. invariant(yieldPolicy->canReleaseLocksDuringExecution()); - // We can only wait if we have a collection; otherwise we should retry immediately when - // we hit EOF. - auto collection = CollectionCatalog::get(opCtx)->lookupCollectionByNamespaceForRead(opCtx, nss); - invariant(collection); - - return collection->getRecordStore()->getCappedInsertNotifier(); + // In case of the read concern majority, return a majority committed point notifier, otherwise, + // a notifier associated with that capped collection + // + // We can only wait on the capped collection insert notifier if the collection is present, + // otherwise we should retry immediately when we hit EOF. + if (opCtx->recoveryUnit()->getTimestampReadSource() == RecoveryUnit::kMajorityCommitted) { + return std::make_unique(); + } else { + auto collection = + CollectionCatalog::get(opCtx)->lookupCollectionByNamespaceForRead(opCtx, nss); + invariant(collection); + + return std::make_unique( + collection->getRecordStore()->getCappedInsertNotifier()); + } } void waitForInserts(OperationContext* opCtx, PlanYieldPolicy* yieldPolicy, - CappedInsertNotifierData* notifierData) { - invariant(notifierData->notifier); - + std::unique_ptr& notifier) { // The notifier wait() method will not wait unless the version passed to it matches the // current version of the notifier. Since the version passed to it is the current version // of the notifier at the time of the previous EOF, we require two EOFs in a row with no @@ -104,10 +113,10 @@ void waitForInserts(OperationContext* opCtx, curOp->pauseTimer(); ON_BLOCK_EXIT([curOp] { curOp->resumeTimer(); }); - uint64_t currentNotifierVersion = notifierData->notifier->getVersion(); - auto yieldResult = yieldPolicy->yieldOrInterrupt(opCtx, [opCtx, notifierData] { + notifier->prepareForWait(opCtx); + auto yieldResult = yieldPolicy->yieldOrInterrupt(opCtx, [opCtx, ¬ifier] { const auto deadline = awaitDataState(opCtx).waitForInsertsDeadline; - notifierData->notifier->waitUntil(notifierData->lastEOFVersion, deadline); + notifier->waitUntil(opCtx, deadline); if (MONGO_unlikely(planExecutorHangWhileYieldedInWaitForInserts.shouldFail())) { LOGV2(4452903, "PlanExecutor - planExecutorHangWhileYieldedInWaitForInserts fail point enabled. " @@ -115,7 +124,6 @@ void waitForInserts(OperationContext* opCtx, planExecutorHangWhileYieldedInWaitForInserts.pauseWhileSet(); } }); - notifierData->lastEOFVersion = currentNotifierVersion; uassertStatusOK(yieldResult); } diff --git a/src/mongo/db/query/plan_insert_listener.h b/src/mongo/db/query/plan_insert_listener.h index be4407d3df8..094789301ba 100644 --- a/src/mongo/db/query/plan_insert_listener.h +++ b/src/mongo/db/query/plan_insert_listener.h @@ -33,15 +33,69 @@ #include "mongo/db/operation_context.h" #include "mongo/db/query/canonical_query.h" #include "mongo/db/query/plan_yield_policy.h" +#include "mongo/db/repl/optime.h" +#include "mongo/db/repl/wait_for_majority_service.h" namespace mongo::insert_listener { -/** - * A helper wrapper struct around CappedInsertNotifier which also holds the last version returned - * by the 'notifier'. - */ -struct CappedInsertNotifierData { - std::shared_ptr notifier; - uint64_t lastEOFVersion = ~0; + +// An abstract class used to notify on new insert events. +class Notifier { +public: + virtual ~Notifier(){}; + + // Performs the necessary work needed for waiting. Should be called prior calling waitUntil(). + virtual void prepareForWait(OperationContext* opCtx) = 0; + + // Blocks the caller until an insert event is fired or the deadline is hit. + virtual void waitUntil(OperationContext* opCtx, Date_t deadline) = 0; +}; + +// Class used to notify listeners of local inserts into the capped collection. +class LocalCappedInsertNotifier final : public Notifier { +public: + LocalCappedInsertNotifier(std::shared_ptr notifier) + : _notifier(notifier) {} + + void prepareForWait(OperationContext* opCtx) final { + invariant(_notifier); + } + + void waitUntil(OperationContext* opCtx, Date_t deadline) final { + auto currentVersion = _notifier->getVersion(); + _notifier->waitUntil(_lastEOFVersion, deadline); + _lastEOFVersion = currentVersion; + } + +private: + std::shared_ptr _notifier; + uint64_t _lastEOFVersion; +}; + +// Class used to notify listeners on majority committed point advancement events. +class MajorityCommittedPointNotifier final : public Notifier { +public: + MajorityCommittedPointNotifier(repl::OpTime opTime = repl::OpTime()) + : _opTimeToBeMajorityCommitted(opTime) {} + + // Computes the OpTime to wait on by incrementing the current read timestamp. + void prepareForWait(OperationContext* opCtx) final { + auto readTs = opCtx->recoveryUnit()->getPointInTimeReadTimestamp(opCtx); + invariant(readTs); + _opTimeToBeMajorityCommitted = + repl::OpTime(*readTs + 1, repl::ReplicationCoordinator::get(opCtx)->getTerm()); + } + + void waitUntil(OperationContext* opCtx, Date_t deadline) final { + auto majorityCommittedFuture = + WaitForMajorityService::get(opCtx->getServiceContext()) + .waitUntilMajority(_opTimeToBeMajorityCommitted, opCtx->getCancellationToken()); + opCtx->runWithDeadline(deadline, opCtx->getTimeoutError(), [&] { + (void)majorityCommittedFuture.getNoThrow(opCtx); + }); + } + +private: + repl::OpTime _opTimeToBeMajorityCommitted; }; /** @@ -61,12 +115,11 @@ bool shouldWaitForInserts(OperationContext* opCtx, PlanYieldPolicy* yieldPolicy); /** - * Gets the CappedInsertNotifier for a capped collection. Returns nullptr if this plan executor - * is not capable of yielding based on a notifier. + * Returns a CappedInsertNotifier for a capped collection. */ -std::shared_ptr getCappedInsertNotifier(OperationContext* opCtx, - const NamespaceString& nss, - PlanYieldPolicy* yieldPolicy); +std::unique_ptr getCappedInsertNotifier(OperationContext* opCtx, + const NamespaceString& nss, + PlanYieldPolicy* yieldPolicy); /** * Called for tailable and awaitData cursors in order to yield locks and waits for inserts to @@ -74,8 +127,7 @@ std::shared_ptr getCappedInsertNotifier(OperationContext* * and there may be new results. If the PlanExecutor was killed during a yield, throws an * exception. */ - void waitForInserts(OperationContext* opCtx, PlanYieldPolicy* yieldPolicy, - CappedInsertNotifierData* notifierData); + std::unique_ptr& notifier); } // namespace mongo::insert_listener -- cgit v1.2.1