diff options
author | Jennifer Peshansky <jennifer.peshansky@mongodb.com> | 2022-11-03 16:13:20 +0000 |
---|---|---|
committer | Jennifer Peshansky <jennifer.peshansky@mongodb.com> | 2022-11-03 16:13:20 +0000 |
commit | e74d2910bbe76790ad131d53fee277829cd95982 (patch) | |
tree | cabe148764529c9623652374fbc36323a550cd44 /src/mongo/db/s/range_deleter_service.h | |
parent | 280145e9940729480bb8a35453d4056afac87641 (diff) | |
parent | ba467f46cc1bc49965e1d72b541eff0cf1d7b22e (diff) | |
download | mongo-e74d2910bbe76790ad131d53fee277829cd95982.tar.gz |
Merge branch 'master' into jenniferpeshansky/SERVER-70854jenniferpeshansky/SERVER-70854
Diffstat (limited to 'src/mongo/db/s/range_deleter_service.h')
-rw-r--r-- | src/mongo/db/s/range_deleter_service.h | 67 |
1 files changed, 27 insertions, 40 deletions
diff --git a/src/mongo/db/s/range_deleter_service.h b/src/mongo/db/s/range_deleter_service.h index 68cca97454c..c816c8b9db2 100644 --- a/src/mongo/db/s/range_deleter_service.h +++ b/src/mongo/db/s/range_deleter_service.h @@ -110,58 +110,37 @@ private: */ class ReadyRangeDeletionsProcessor { public: - ReadyRangeDeletionsProcessor(OperationContext* opCtx) { - _thread = stdx::thread([this] { _runRangeDeletions(); }); - stdx::unique_lock<Latch> lock(_mutex); - opCtx->waitForConditionOrInterrupt( - _condVar, lock, [&] { return _threadOpCtxHolder.is_initialized(); }); - } - - ~ReadyRangeDeletionsProcessor() { - { - stdx::unique_lock<Latch> lock(_mutex); - // The `_threadOpCtxHolder` may have been already reset/interrupted in case the - // thread got interrupted due to stepdown - if (_threadOpCtxHolder) { - stdx::lock_guard<Client> scopedClientLock(*(*_threadOpCtxHolder)->getClient()); - if ((*_threadOpCtxHolder)->checkForInterruptNoAssert().isOK()) { - (*_threadOpCtxHolder)->markKilled(ErrorCodes::Interrupted); - } - } - _condVar.notify_all(); - } + ReadyRangeDeletionsProcessor(OperationContext* opCtx); + ~ReadyRangeDeletionsProcessor(); - if (_thread.joinable()) { - _thread.join(); - } - } + /* + * Interrupt ongoing range deletions + */ + void shutdown(); /* * Schedule a range deletion at the end of the queue */ - void emplaceRangeDeletion(const RangeDeletionTask& rdt) { - stdx::unique_lock<Latch> lock(_mutex); - _queue.push(rdt); - _condVar.notify_all(); - } + void emplaceRangeDeletion(const RangeDeletionTask& rdt); private: /* + * Return true if this processor have been shutted down + */ + bool _stopRequested() const; + + /* * Remove a range deletion from the head of the queue. Supposed to be called only once a * range deletion successfully finishes. */ - void _completedRangeDeletion() { - stdx::unique_lock<Latch> lock(_mutex); - dassert(!_queue.empty()); - _queue.pop(); - } + void _completedRangeDeletion(); /* * Code executed by the internal thread */ void _runRangeDeletions(); - Mutex _mutex = MONGO_MAKE_LATCH("ReadyRangeDeletionsProcessor"); + mutable Mutex _mutex = MONGO_MAKE_LATCH("ReadyRangeDeletionsProcessor"); /* * Condition variable notified when: @@ -175,10 +154,13 @@ private: std::queue<RangeDeletionTask> _queue; /* Pointer to the (one and only) operation context used by the thread */ - boost::optional<ServiceContext::UniqueOperationContext> _threadOpCtxHolder; + ServiceContext::UniqueOperationContext _threadOpCtxHolder; /* Thread consuming the range deletions queue */ stdx::thread _thread; + + enum State { kRunning, kStopped }; + State _state{kRunning}; }; // Keeping track of per-collection registered range deletion tasks @@ -253,6 +235,7 @@ public: const ChunkRange& range); /* ReplicaSetAwareServiceShardSvr implemented methods */ + void onStartup(OperationContext* opCtx) override; void onStepUpComplete(OperationContext* opCtx, long long term) override; void onStepDown() override; void onShutdown() override; @@ -276,17 +259,21 @@ public: std::unique_ptr<ReadyRangeDeletionsProcessor> _readyRangeDeletionsProcessorPtr; private: + /* Join all threads and executor and reset the in memory state of the service + * Used for onStartUpBegin and on onShutdown + */ + void _joinAndResetState(); + /* Asynchronously register range deletions on the service. To be called on on step-up */ void _recoverRangeDeletionsOnStepUp(OperationContext* opCtx); - /* Called by shutdown/stepdown hooks to reset the service */ - void _stopService(bool joinExecutor); + /* Called by shutdown/stepdown hooks to interrupt the service */ + void _stopService(); /* ReplicaSetAwareServiceShardSvr "empty implemented" methods */ - void onStartup(OperationContext* opCtx) override final{}; void onInitialDataAvailable(OperationContext* opCtx, bool isMajorityDataAvailable) override final {} - void onStepUpBegin(OperationContext* opCtx, long long term) override final {} + void onStepUpBegin(OperationContext* opCtx, long long term) override final{}; void onBecomeArbiter() override final {} }; |