diff options
author | Denis Grebennicov <denis.grebennicov@mongodb.com> | 2023-04-21 15:25:04 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-04-21 17:15:25 +0000 |
commit | 0234b567d9bf6dad86d59127f23ef629b4eaa6cd (patch) | |
tree | add58a53d87d1c87918382dc0ed055d6206739b4 | |
parent | 2ad995a9161714f30702e1bb9b0b57b5e3623e16 (diff) | |
download | mongo-0234b567d9bf6dad86d59127f23ef629b4eaa6cd.tar.gz |
SERVER-74555 Re-introduce majority commit point advancement notification mechanism and use for change streams
7 files changed, 107 insertions, 45 deletions
diff --git a/buildscripts/resmokeconfig/suites/change_streams_multitenant_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_multitenant_passthrough.yml index 17a29d4f9cf..f838d68afd5 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_multitenant_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_multitenant_passthrough.yml @@ -7,8 +7,6 @@ selector: ## # TODO SERVER-68341: Implement enable/disable command for mongoQ in the serverless. - jstests/change_streams/projection_fakes_internal_event.js - # TODO SERVER-69959: Implement a majority-committed insert listener. - - jstests/change_streams/only_wake_getmore_for_relevant_changes.js ## # TODO SERVER-70760: This test creates its own sharded cluster and uses transaction. The support diff --git a/buildscripts/resmokeconfig/suites/change_streams_multitenant_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_multitenant_sharded_collections_passthrough.yml index a2caa7e4eaa..190d2e6013a 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_multitenant_sharded_collections_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_multitenant_sharded_collections_passthrough.yml @@ -6,8 +6,6 @@ selector: exclude_files: # TODO SERVER-68341: Implement enable/disable command for mongoQ in the serverless. - jstests/change_streams/**/*.js - # TODO SERVER-74555: Implement a majority-committed insert listener. - - jstests/change_streams/only_wake_getmore_for_relevant_changes.js # TODO SERVER-68341: Implement enable/disable command for mongoQ in the serverless. - jstests/change_streams/projection_fakes_internal_event.js # TODO SERVER-68557 This test list databases that does not work in the sharded-cluster. 
This test diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index f0968b858c2..d86f51eee3a 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -1539,6 +1539,7 @@ execEnv.Library( '$BUILD_DIR/mongo/db/query/ce/query_ce_histogram', '$BUILD_DIR/mongo/db/query/ce/query_ce_sampling', '$BUILD_DIR/mongo/db/query/optimizer/optimizer', + '$BUILD_DIR/mongo/db/repl/wait_for_majority_service', '$BUILD_DIR/mongo/db/session/kill_sessions', '$BUILD_DIR/mongo/db/sorter/sorter_idl', '$BUILD_DIR/mongo/db/sorter/sorter_stats', diff --git a/src/mongo/db/query/plan_executor_impl.cpp b/src/mongo/db/query/plan_executor_impl.cpp index 593ac2659db..bdab2f383fd 100644 --- a/src/mongo/db/query/plan_executor_impl.cpp +++ b/src/mongo/db/query/plan_executor_impl.cpp @@ -382,11 +382,10 @@ PlanExecutor::ExecState PlanExecutorImpl::_getNextImpl(Snapshotted<Document>* ob // Capped insert data; declared outside the loop so we hold a shared pointer to the capped // insert notifier the entire time we are in the loop. Holding a shared pointer to the // capped insert notifier is necessary for the notifierVersion to advance. - insert_listener::CappedInsertNotifierData cappedInsertNotifierData; + std::unique_ptr<insert_listener::Notifier> notifier; if (insert_listener::shouldListenForInserts(_opCtx, _cq.get())) { - // We always construct the CappedInsertNotifier for awaitData cursors. - cappedInsertNotifierData.notifier = - insert_listener::getCappedInsertNotifier(_opCtx, _nss, _yieldPolicy.get()); + // We always construct the insert_listener::Notifier for awaitData cursors. 
+ notifier = insert_listener::getCappedInsertNotifier(_opCtx, _nss, _yieldPolicy.get()); } for (;;) { // These are the conditions which can cause us to yield: @@ -526,7 +525,7 @@ PlanExecutor::ExecState PlanExecutorImpl::_getNextImpl(Snapshotted<Document>* ob return PlanExecutor::IS_EOF; } - insert_listener::waitForInserts(_opCtx, _yieldPolicy.get(), &cappedInsertNotifierData); + insert_listener::waitForInserts(_opCtx, _yieldPolicy.get(), notifier); // There may be more results, keep going. continue; diff --git a/src/mongo/db/query/plan_executor_sbe.cpp b/src/mongo/db/query/plan_executor_sbe.cpp index 0c436764c78..5f9c347f463 100644 --- a/src/mongo/db/query/plan_executor_sbe.cpp +++ b/src/mongo/db/query/plan_executor_sbe.cpp @@ -273,14 +273,13 @@ PlanExecutor::ExecState PlanExecutorSBE::getNextImpl(ObjectType* out, RecordId* // // Note that we need to hold a database intent lock before acquiring a notifier. boost::optional<AutoGetCollectionForReadMaybeLockFree> coll; - insert_listener::CappedInsertNotifierData cappedInsertNotifierData; + std::unique_ptr<insert_listener::Notifier> notifier; if (insert_listener::shouldListenForInserts(_opCtx, _cq.get())) { if (!_opCtx->lockState()->isCollectionLockedForMode(_nss, MODE_IS)) { coll.emplace(_opCtx, _nss); } - cappedInsertNotifierData.notifier = - insert_listener::getCappedInsertNotifier(_opCtx, _nss, _yieldPolicy.get()); + notifier = insert_listener::getCappedInsertNotifier(_opCtx, _nss, _yieldPolicy.get()); } for (;;) { @@ -325,7 +324,7 @@ PlanExecutor::ExecState PlanExecutorSBE::getNextImpl(ObjectType* out, RecordId* return PlanExecutor::ExecState::IS_EOF; } - insert_listener::waitForInserts(_opCtx, _yieldPolicy.get(), &cappedInsertNotifierData); + insert_listener::waitForInserts(_opCtx, _yieldPolicy.get(), notifier); // There may be more results, keep going. 
continue; } else if (_resumeRecordIdSlot) { diff --git a/src/mongo/db/query/plan_insert_listener.cpp b/src/mongo/db/query/plan_insert_listener.cpp index d6ee899dce5..df947c2e725 100644 --- a/src/mongo/db/query/plan_insert_listener.cpp +++ b/src/mongo/db/query/plan_insert_listener.cpp @@ -31,6 +31,8 @@ #include "mongo/db/query/plan_insert_listener.h" +#include <memory> + #include "mongo/db/catalog/database.h" #include "mongo/db/catalog/database_holder.h" #include "mongo/db/curop.h" @@ -76,28 +78,31 @@ bool shouldWaitForInserts(OperationContext* opCtx, return false; } -std::shared_ptr<CappedInsertNotifier> getCappedInsertNotifier(OperationContext* opCtx, - const NamespaceString& nss, - PlanYieldPolicy* yieldPolicy) { +std::unique_ptr<Notifier> getCappedInsertNotifier(OperationContext* opCtx, + const NamespaceString& nss, + PlanYieldPolicy* yieldPolicy) { // We don't expect to need a capped insert notifier for non-yielding plans. invariant(yieldPolicy->canReleaseLocksDuringExecution()); - // We can only wait if we have a collection; otherwise we should retry immediately when - // we hit EOF. + // In case of the read concern majority, return a majority committed point notifier, otherwise, + // a notifier associated with that capped collection // - // Hold reference to the catalog for collection lookup without locks to be safe. - auto catalog = CollectionCatalog::get(opCtx); - auto collection = catalog->lookupCollectionByNamespace(opCtx, nss); - invariant(collection); - - return collection->getRecordStore()->getCappedInsertNotifier(); + // We can only wait on the capped collection insert notifier if the collection is present, + // otherwise we should retry immediately when we hit EOF. 
+ if (opCtx->recoveryUnit()->getTimestampReadSource() == RecoveryUnit::kMajorityCommitted) { + return std::make_unique<MajorityCommittedPointNotifier>(); + } else { + auto collection = CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(opCtx, nss); + invariant(collection); + + return std::make_unique<LocalCappedInsertNotifier>( + collection->getRecordStore()->getCappedInsertNotifier()); + } } void waitForInserts(OperationContext* opCtx, PlanYieldPolicy* yieldPolicy, - CappedInsertNotifierData* notifierData) { - invariant(notifierData->notifier); - + std::unique_ptr<Notifier>& notifier) { // The notifier wait() method will not wait unless the version passed to it matches the // current version of the notifier. Since the version passed to it is the current version // of the notifier at the time of the previous EOF, we require two EOFs in a row with no @@ -107,10 +112,10 @@ void waitForInserts(OperationContext* opCtx, curOp->pauseTimer(); ON_BLOCK_EXIT([curOp] { curOp->resumeTimer(); }); - uint64_t currentNotifierVersion = notifierData->notifier->getVersion(); - auto yieldResult = yieldPolicy->yieldOrInterrupt(opCtx, [opCtx, notifierData] { + notifier->prepareForWait(opCtx); + auto yieldResult = yieldPolicy->yieldOrInterrupt(opCtx, [opCtx, &notifier] { const auto deadline = awaitDataState(opCtx).waitForInsertsDeadline; - notifierData->notifier->waitUntil(notifierData->lastEOFVersion, deadline); + notifier->waitUntil(opCtx, deadline); if (MONGO_unlikely(planExecutorHangWhileYieldedInWaitForInserts.shouldFail())) { LOGV2(4452903, "PlanExecutor - planExecutorHangWhileYieldedInWaitForInserts fail point enabled. 
" @@ -118,7 +123,6 @@ void waitForInserts(OperationContext* opCtx, planExecutorHangWhileYieldedInWaitForInserts.pauseWhileSet(); } }); - notifierData->lastEOFVersion = currentNotifierVersion; uassertStatusOK(yieldResult); } diff --git a/src/mongo/db/query/plan_insert_listener.h b/src/mongo/db/query/plan_insert_listener.h index be4407d3df8..71cf468e8a8 100644 --- a/src/mongo/db/query/plan_insert_listener.h +++ b/src/mongo/db/query/plan_insert_listener.h @@ -33,15 +33,78 @@ #include "mongo/db/operation_context.h" #include "mongo/db/query/canonical_query.h" #include "mongo/db/query/plan_yield_policy.h" +#include "mongo/db/repl/optime.h" +#include "mongo/db/repl/wait_for_majority_service.h" +#include "mongo/logv2/log.h" + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery namespace mongo::insert_listener { -/** - * A helper wrapper struct around CappedInsertNotifier which also holds the last version returned - * by the 'notifier'. - */ -struct CappedInsertNotifierData { - std::shared_ptr<CappedInsertNotifier> notifier; - uint64_t lastEOFVersion = ~0; + +// An abstract class used to notify on new insert events. +class Notifier { +public: + virtual ~Notifier(){}; + + // Performs the necessary work needed for waiting. Should be called prior calling waitUntil(). + virtual void prepareForWait(OperationContext* opCtx) = 0; + + // Blocks the caller until an insert event is fired or the deadline is hit. + virtual void waitUntil(OperationContext* opCtx, Date_t deadline) = 0; +}; + +// Class used to notify listeners of local inserts into the capped collection. 
+class LocalCappedInsertNotifier final : public Notifier { +public: + LocalCappedInsertNotifier(std::shared_ptr<CappedInsertNotifier> notifier) + : _notifier(notifier) {} + + void prepareForWait(OperationContext* opCtx) final { + invariant(_notifier); + } + + void waitUntil(OperationContext* opCtx, Date_t deadline) final { + auto currentVersion = _notifier->getVersion(); + _notifier->waitUntil(_lastEOFVersion, deadline); + _lastEOFVersion = currentVersion; + } + +private: + std::shared_ptr<CappedInsertNotifier> _notifier; + uint64_t _lastEOFVersion; +}; + +// Class used to notify listeners on majority committed point advancement events. +class MajorityCommittedPointNotifier final : public Notifier { +public: + MajorityCommittedPointNotifier(repl::OpTime opTime = repl::OpTime()) + : _opTimeToBeMajorityCommitted(opTime) {} + + // Computes the OpTime to wait on by incrementing the current read timestamp. + void prepareForWait(OperationContext* opCtx) final { + auto readTs = opCtx->recoveryUnit()->getPointInTimeReadTimestamp(opCtx); + invariant(readTs); + _opTimeToBeMajorityCommitted = + repl::OpTime(*readTs + 1, repl::ReplicationCoordinator::get(opCtx)->getTerm()); + } + + void waitUntil(OperationContext* opCtx, Date_t deadline) final { + auto majorityCommittedFuture = WaitForMajorityService::get(opCtx->getServiceContext()) + .waitUntilMajorityForRead(_opTimeToBeMajorityCommitted, + opCtx->getCancellationToken()); + opCtx->runWithDeadline(deadline, opCtx->getTimeoutError(), [&] { + auto status = majorityCommittedFuture.getNoThrow(opCtx); + if (!status.isOK()) { + LOGV2_DEBUG(7455500, + 3, + "Failure waiting for the majority committed event", + "status"_attr = status.toString()); + } + }); + } + +private: + repl::OpTime _opTimeToBeMajorityCommitted; }; /** @@ -61,12 +124,11 @@ bool shouldWaitForInserts(OperationContext* opCtx, PlanYieldPolicy* yieldPolicy); /** - * Gets the CappedInsertNotifier for a capped collection. 
Returns nullptr if this plan executor - * is not capable of yielding based on a notifier. + * Returns an insert notifier for a capped collection. */ -std::shared_ptr<CappedInsertNotifier> getCappedInsertNotifier(OperationContext* opCtx, - const NamespaceString& nss, - PlanYieldPolicy* yieldPolicy); +std::unique_ptr<Notifier> getCappedInsertNotifier(OperationContext* opCtx, + const NamespaceString& nss, + PlanYieldPolicy* yieldPolicy); /** * Called for tailable and awaitData cursors in order to yield locks and waits for inserts to @@ -74,8 +136,9 @@ std::shared_ptr<CappedInsertNotifier> getCappedInsertNotifier(OperationContext* * and there may be new results. If the PlanExecutor was killed during a yield, throws an * exception. */ - void waitForInserts(OperationContext* opCtx, PlanYieldPolicy* yieldPolicy, - CappedInsertNotifierData* notifierData); + std::unique_ptr<Notifier>& notifier); } // namespace mongo::insert_listener + +#undef MONGO_LOGV2_DEFAULT_COMPONENT |