diff options
Diffstat (limited to 'src/mongo/db/query/plan_insert_listener.h')
-rw-r--r-- | src/mongo/db/query/plan_insert_listener.h | 80 |
1 files changed, 66 insertions, 14 deletions
diff --git a/src/mongo/db/query/plan_insert_listener.h b/src/mongo/db/query/plan_insert_listener.h index be4407d3df8..094789301ba 100644 --- a/src/mongo/db/query/plan_insert_listener.h +++ b/src/mongo/db/query/plan_insert_listener.h @@ -33,15 +33,69 @@ #include "mongo/db/operation_context.h" #include "mongo/db/query/canonical_query.h" #include "mongo/db/query/plan_yield_policy.h" +#include "mongo/db/repl/optime.h" +#include "mongo/db/repl/wait_for_majority_service.h" namespace mongo::insert_listener { -/** - * A helper wrapper struct around CappedInsertNotifier which also holds the last version returned - * by the 'notifier'. - */ -struct CappedInsertNotifierData { - std::shared_ptr<CappedInsertNotifier> notifier; - uint64_t lastEOFVersion = ~0; + +// An abstract class used to notify on new insert events. +class Notifier { +public: + virtual ~Notifier(){}; + + // Performs the necessary work needed for waiting. Should be called prior calling waitUntil(). + virtual void prepareForWait(OperationContext* opCtx) = 0; + + // Blocks the caller until an insert event is fired or the deadline is hit. + virtual void waitUntil(OperationContext* opCtx, Date_t deadline) = 0; +}; + +// Class used to notify listeners of local inserts into the capped collection. +class LocalCappedInsertNotifier final : public Notifier { +public: + LocalCappedInsertNotifier(std::shared_ptr<CappedInsertNotifier> notifier) + : _notifier(notifier) {} + + void prepareForWait(OperationContext* opCtx) final { + invariant(_notifier); + } + + void waitUntil(OperationContext* opCtx, Date_t deadline) final { + auto currentVersion = _notifier->getVersion(); + _notifier->waitUntil(_lastEOFVersion, deadline); + _lastEOFVersion = currentVersion; + } + +private: + std::shared_ptr<CappedInsertNotifier> _notifier; + uint64_t _lastEOFVersion; +}; + +// Class used to notify listeners on majority committed point advancement events. +class MajorityCommittedPointNotifier final : public Notifier { +public: + MajorityCommittedPointNotifier(repl::OpTime opTime = repl::OpTime()) + : _opTimeToBeMajorityCommitted(opTime) {} + + // Computes the OpTime to wait on by incrementing the current read timestamp. + void prepareForWait(OperationContext* opCtx) final { + auto readTs = opCtx->recoveryUnit()->getPointInTimeReadTimestamp(opCtx); + invariant(readTs); + _opTimeToBeMajorityCommitted = + repl::OpTime(*readTs + 1, repl::ReplicationCoordinator::get(opCtx)->getTerm()); + } + + void waitUntil(OperationContext* opCtx, Date_t deadline) final { + auto majorityCommittedFuture = + WaitForMajorityService::get(opCtx->getServiceContext()) + .waitUntilMajority(_opTimeToBeMajorityCommitted, opCtx->getCancellationToken()); + opCtx->runWithDeadline(deadline, opCtx->getTimeoutError(), [&] { + (void)majorityCommittedFuture.getNoThrow(opCtx); + }); + } + +private: + repl::OpTime _opTimeToBeMajorityCommitted; }; /** @@ -61,12 +115,11 @@ bool shouldWaitForInserts(OperationContext* opCtx, PlanYieldPolicy* yieldPolicy); /** - * Gets the CappedInsertNotifier for a capped collection. Returns nullptr if this plan executor - * is not capable of yielding based on a notifier. + * Returns a CappedInsertNotifier for a capped collection. */ -std::shared_ptr<CappedInsertNotifier> getCappedInsertNotifier(OperationContext* opCtx, - const NamespaceString& nss, - PlanYieldPolicy* yieldPolicy); +std::unique_ptr<Notifier> getCappedInsertNotifier(OperationContext* opCtx, + const NamespaceString& nss, + PlanYieldPolicy* yieldPolicy); /** * Called for tailable and awaitData cursors in order to yield locks and waits for inserts to @@ -74,8 +127,7 @@ std::shared_ptr<CappedInsertNotifier> getCappedInsertNotifier(OperationContext* * and there may be new results. If the PlanExecutor was killed during a yield, throws an * exception. */ - void waitForInserts(OperationContext* opCtx, PlanYieldPolicy* yieldPolicy, - CappedInsertNotifierData* notifierData); + std::unique_ptr<Notifier>& notifier); } // namespace mongo::insert_listener |