diff options
Diffstat (limited to 'src/mongo/db/query/plan_insert_listener.cpp')
-rw-r--r-- | src/mongo/db/query/plan_insert_listener.cpp | 40 |
1 files changed, 22 insertions, 18 deletions
diff --git a/src/mongo/db/query/plan_insert_listener.cpp b/src/mongo/db/query/plan_insert_listener.cpp index d6ee899dce5..df947c2e725 100644 --- a/src/mongo/db/query/plan_insert_listener.cpp +++ b/src/mongo/db/query/plan_insert_listener.cpp @@ -31,6 +31,8 @@ #include "mongo/db/query/plan_insert_listener.h" +#include <memory> + #include "mongo/db/catalog/database.h" #include "mongo/db/catalog/database_holder.h" #include "mongo/db/curop.h" @@ -76,28 +78,31 @@ bool shouldWaitForInserts(OperationContext* opCtx, return false; } -std::shared_ptr<CappedInsertNotifier> getCappedInsertNotifier(OperationContext* opCtx, - const NamespaceString& nss, - PlanYieldPolicy* yieldPolicy) { +std::unique_ptr<Notifier> getCappedInsertNotifier(OperationContext* opCtx, + const NamespaceString& nss, + PlanYieldPolicy* yieldPolicy) { // We don't expect to need a capped insert notifier for non-yielding plans. invariant(yieldPolicy->canReleaseLocksDuringExecution()); - // We can only wait if we have a collection; otherwise we should retry immediately when - // we hit EOF. + // In case of the read concern majority, return a majority committed point notifier, otherwise, + // a notifier associated with that capped collection // - // Hold reference to the catalog for collection lookup without locks to be safe. - auto catalog = CollectionCatalog::get(opCtx); - auto collection = catalog->lookupCollectionByNamespace(opCtx, nss); - invariant(collection); - - return collection->getRecordStore()->getCappedInsertNotifier(); + // We can only wait on the capped collection insert notifier if the collection is present, + // otherwise we should retry immediately when we hit EOF. + if (opCtx->recoveryUnit()->getTimestampReadSource() == RecoveryUnit::kMajorityCommitted) { + return std::make_unique<MajorityCommittedPointNotifier>(); + } else { + auto collection = CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(opCtx, nss); + invariant(collection); + + return std::make_unique<LocalCappedInsertNotifier>( + collection->getRecordStore()->getCappedInsertNotifier()); + } } void waitForInserts(OperationContext* opCtx, PlanYieldPolicy* yieldPolicy, - CappedInsertNotifierData* notifierData) { - invariant(notifierData->notifier); - + std::unique_ptr<Notifier>& notifier) { // The notifier wait() method will not wait unless the version passed to it matches the // current version of the notifier. Since the version passed to it is the current version // of the notifier at the time of the previous EOF, we require two EOFs in a row with no @@ -107,10 +112,10 @@ void waitForInserts(OperationContext* opCtx, curOp->pauseTimer(); ON_BLOCK_EXIT([curOp] { curOp->resumeTimer(); }); - uint64_t currentNotifierVersion = notifierData->notifier->getVersion(); - auto yieldResult = yieldPolicy->yieldOrInterrupt(opCtx, [opCtx, notifierData] { + notifier->prepareForWait(opCtx); + auto yieldResult = yieldPolicy->yieldOrInterrupt(opCtx, [opCtx, ¬ifier] { const auto deadline = awaitDataState(opCtx).waitForInsertsDeadline; - notifierData->notifier->waitUntil(notifierData->lastEOFVersion, deadline); + notifier->waitUntil(opCtx, deadline); if (MONGO_unlikely(planExecutorHangWhileYieldedInWaitForInserts.shouldFail())) { LOGV2(4452903, "PlanExecutor - planExecutorHangWhileYieldedInWaitForInserts fail point enabled. " @@ -118,7 +123,6 @@ void waitForInserts(OperationContext* opCtx, planExecutorHangWhileYieldedInWaitForInserts.pauseWhileSet(); } }); - notifierData->lastEOFVersion = currentNotifierVersion; uassertStatusOK(yieldResult); } |