summaryrefslogtreecommitdiff
path: root/src/mongo/db/query/plan_insert_listener.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/query/plan_insert_listener.h')
-rw-r--r--src/mongo/db/query/plan_insert_listener.h80
1 files changed, 66 insertions, 14 deletions
diff --git a/src/mongo/db/query/plan_insert_listener.h b/src/mongo/db/query/plan_insert_listener.h
index be4407d3df8..094789301ba 100644
--- a/src/mongo/db/query/plan_insert_listener.h
+++ b/src/mongo/db/query/plan_insert_listener.h
@@ -33,15 +33,69 @@
#include "mongo/db/operation_context.h"
#include "mongo/db/query/canonical_query.h"
#include "mongo/db/query/plan_yield_policy.h"
+#include "mongo/db/repl/optime.h"
+#include "mongo/db/repl/wait_for_majority_service.h"
namespace mongo::insert_listener {
-/**
- * A helper wrapper struct around CappedInsertNotifier which also holds the last version returned
- * by the 'notifier'.
- */
-struct CappedInsertNotifierData {
- std::shared_ptr<CappedInsertNotifier> notifier;
- uint64_t lastEOFVersion = ~0;
+
+// An abstract class used to notify on new insert events.
+class Notifier {
+public:
+ virtual ~Notifier(){};
+
+ // Performs the necessary work needed for waiting. Should be called prior calling waitUntil().
+ virtual void prepareForWait(OperationContext* opCtx) = 0;
+
+ // Blocks the caller until an insert event is fired or the deadline is hit.
+ virtual void waitUntil(OperationContext* opCtx, Date_t deadline) = 0;
+};
+
+// Class used to notify listeners of local inserts into the capped collection.
+class LocalCappedInsertNotifier final : public Notifier {
+public:
+ LocalCappedInsertNotifier(std::shared_ptr<CappedInsertNotifier> notifier)
+ : _notifier(notifier) {}
+
+ void prepareForWait(OperationContext* opCtx) final {
+ invariant(_notifier);
+ }
+
+ void waitUntil(OperationContext* opCtx, Date_t deadline) final {
+ auto currentVersion = _notifier->getVersion();
+ _notifier->waitUntil(_lastEOFVersion, deadline);
+ _lastEOFVersion = currentVersion;
+ }
+
+private:
+ std::shared_ptr<CappedInsertNotifier> _notifier;
+ uint64_t _lastEOFVersion;
+};
+
+// Class used to notify listeners on majority committed point advancement events.
+class MajorityCommittedPointNotifier final : public Notifier {
+public:
+ MajorityCommittedPointNotifier(repl::OpTime opTime = repl::OpTime())
+ : _opTimeToBeMajorityCommitted(opTime) {}
+
+ // Computes the OpTime to wait on by incrementing the current read timestamp.
+ void prepareForWait(OperationContext* opCtx) final {
+ auto readTs = opCtx->recoveryUnit()->getPointInTimeReadTimestamp(opCtx);
+ invariant(readTs);
+ _opTimeToBeMajorityCommitted =
+ repl::OpTime(*readTs + 1, repl::ReplicationCoordinator::get(opCtx)->getTerm());
+ }
+
+ void waitUntil(OperationContext* opCtx, Date_t deadline) final {
+ auto majorityCommittedFuture =
+ WaitForMajorityService::get(opCtx->getServiceContext())
+ .waitUntilMajority(_opTimeToBeMajorityCommitted, opCtx->getCancellationToken());
+ opCtx->runWithDeadline(deadline, opCtx->getTimeoutError(), [&] {
+ (void)majorityCommittedFuture.getNoThrow(opCtx);
+ });
+ }
+
+private:
+ repl::OpTime _opTimeToBeMajorityCommitted;
};
/**
@@ -61,12 +115,11 @@ bool shouldWaitForInserts(OperationContext* opCtx,
PlanYieldPolicy* yieldPolicy);
/**
- * Gets the CappedInsertNotifier for a capped collection. Returns nullptr if this plan executor
- * is not capable of yielding based on a notifier.
+ * Returns a CappedInsertNotifier for a capped collection.
*/
-std::shared_ptr<CappedInsertNotifier> getCappedInsertNotifier(OperationContext* opCtx,
- const NamespaceString& nss,
- PlanYieldPolicy* yieldPolicy);
+std::unique_ptr<Notifier> getCappedInsertNotifier(OperationContext* opCtx,
+ const NamespaceString& nss,
+ PlanYieldPolicy* yieldPolicy);
/**
* Called for tailable and awaitData cursors in order to yield locks and waits for inserts to
@@ -74,8 +127,7 @@ std::shared_ptr<CappedInsertNotifier> getCappedInsertNotifier(OperationContext*
* and there may be new results. If the PlanExecutor was killed during a yield, throws an
* exception.
*/
-
void waitForInserts(OperationContext* opCtx,
PlanYieldPolicy* yieldPolicy,
- CappedInsertNotifierData* notifierData);
+ std::unique_ptr<Notifier>& notifier);
} // namespace mongo::insert_listener