summaryrefslogtreecommitdiff
path: root/src/mongo/db/query/plan_insert_listener.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/query/plan_insert_listener.cpp')
-rw-r--r--src/mongo/db/query/plan_insert_listener.cpp40
1 files changed, 22 insertions, 18 deletions
diff --git a/src/mongo/db/query/plan_insert_listener.cpp b/src/mongo/db/query/plan_insert_listener.cpp
index d6ee899dce5..df947c2e725 100644
--- a/src/mongo/db/query/plan_insert_listener.cpp
+++ b/src/mongo/db/query/plan_insert_listener.cpp
@@ -31,6 +31,8 @@
#include "mongo/db/query/plan_insert_listener.h"
+#include <memory>
+
#include "mongo/db/catalog/database.h"
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/curop.h"
@@ -76,28 +78,31 @@ bool shouldWaitForInserts(OperationContext* opCtx,
return false;
}
-std::shared_ptr<CappedInsertNotifier> getCappedInsertNotifier(OperationContext* opCtx,
- const NamespaceString& nss,
- PlanYieldPolicy* yieldPolicy) {
+std::unique_ptr<Notifier> getCappedInsertNotifier(OperationContext* opCtx,
+ const NamespaceString& nss,
+ PlanYieldPolicy* yieldPolicy) {
// We don't expect to need a capped insert notifier for non-yielding plans.
invariant(yieldPolicy->canReleaseLocksDuringExecution());
- // We can only wait if we have a collection; otherwise we should retry immediately when
- // we hit EOF.
+ // In case of the read concern majority, return a majority committed point notifier, otherwise,
+ // a notifier associated with that capped collection
//
- // Hold reference to the catalog for collection lookup without locks to be safe.
- auto catalog = CollectionCatalog::get(opCtx);
- auto collection = catalog->lookupCollectionByNamespace(opCtx, nss);
- invariant(collection);
-
- return collection->getRecordStore()->getCappedInsertNotifier();
+ // We can only wait on the capped collection insert notifier if the collection is present,
+ // otherwise we should retry immediately when we hit EOF.
+ if (opCtx->recoveryUnit()->getTimestampReadSource() == RecoveryUnit::kMajorityCommitted) {
+ return std::make_unique<MajorityCommittedPointNotifier>();
+ } else {
+ auto collection = CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(opCtx, nss);
+ invariant(collection);
+
+ return std::make_unique<LocalCappedInsertNotifier>(
+ collection->getRecordStore()->getCappedInsertNotifier());
+ }
}
void waitForInserts(OperationContext* opCtx,
PlanYieldPolicy* yieldPolicy,
- CappedInsertNotifierData* notifierData) {
- invariant(notifierData->notifier);
-
+ std::unique_ptr<Notifier>& notifier) {
// The notifier wait() method will not wait unless the version passed to it matches the
// current version of the notifier. Since the version passed to it is the current version
// of the notifier at the time of the previous EOF, we require two EOFs in a row with no
@@ -107,10 +112,10 @@ void waitForInserts(OperationContext* opCtx,
curOp->pauseTimer();
ON_BLOCK_EXIT([curOp] { curOp->resumeTimer(); });
- uint64_t currentNotifierVersion = notifierData->notifier->getVersion();
- auto yieldResult = yieldPolicy->yieldOrInterrupt(opCtx, [opCtx, notifierData] {
+ notifier->prepareForWait(opCtx);
+ auto yieldResult = yieldPolicy->yieldOrInterrupt(opCtx, [opCtx, &notifier] {
const auto deadline = awaitDataState(opCtx).waitForInsertsDeadline;
- notifierData->notifier->waitUntil(notifierData->lastEOFVersion, deadline);
+ notifier->waitUntil(opCtx, deadline);
if (MONGO_unlikely(planExecutorHangWhileYieldedInWaitForInserts.shouldFail())) {
LOGV2(4452903,
"PlanExecutor - planExecutorHangWhileYieldedInWaitForInserts fail point enabled. "
@@ -118,7 +123,6 @@ void waitForInserts(OperationContext* opCtx,
planExecutorHangWhileYieldedInWaitForInserts.pauseWhileSet();
}
});
- notifierData->lastEOFVersion = currentNotifierVersion;
uassertStatusOK(yieldResult);
}