summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/range_deleter_service.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/s/range_deleter_service.h')
-rw-r--r--src/mongo/db/s/range_deleter_service.h67
1 files changed, 27 insertions, 40 deletions
diff --git a/src/mongo/db/s/range_deleter_service.h b/src/mongo/db/s/range_deleter_service.h
index 68cca97454c..c816c8b9db2 100644
--- a/src/mongo/db/s/range_deleter_service.h
+++ b/src/mongo/db/s/range_deleter_service.h
@@ -110,58 +110,37 @@ private:
*/
class ReadyRangeDeletionsProcessor {
public:
- ReadyRangeDeletionsProcessor(OperationContext* opCtx) {
- _thread = stdx::thread([this] { _runRangeDeletions(); });
- stdx::unique_lock<Latch> lock(_mutex);
- opCtx->waitForConditionOrInterrupt(
- _condVar, lock, [&] { return _threadOpCtxHolder.is_initialized(); });
- }
-
- ~ReadyRangeDeletionsProcessor() {
- {
- stdx::unique_lock<Latch> lock(_mutex);
- // The `_threadOpCtxHolder` may have been already reset/interrupted in case the
- // thread got interrupted due to stepdown
- if (_threadOpCtxHolder) {
- stdx::lock_guard<Client> scopedClientLock(*(*_threadOpCtxHolder)->getClient());
- if ((*_threadOpCtxHolder)->checkForInterruptNoAssert().isOK()) {
- (*_threadOpCtxHolder)->markKilled(ErrorCodes::Interrupted);
- }
- }
- _condVar.notify_all();
- }
+ ReadyRangeDeletionsProcessor(OperationContext* opCtx);
+ ~ReadyRangeDeletionsProcessor();
- if (_thread.joinable()) {
- _thread.join();
- }
- }
+ /*
+ * Interrupt ongoing range deletions
+ */
+ void shutdown();
/*
* Schedule a range deletion at the end of the queue
*/
- void emplaceRangeDeletion(const RangeDeletionTask& rdt) {
- stdx::unique_lock<Latch> lock(_mutex);
- _queue.push(rdt);
- _condVar.notify_all();
- }
+ void emplaceRangeDeletion(const RangeDeletionTask& rdt);
private:
/*
+ * Return true if this processor have been shutted down
+ */
+ bool _stopRequested() const;
+
+ /*
* Remove a range deletion from the head of the queue. Supposed to be called only once a
* range deletion successfully finishes.
*/
- void _completedRangeDeletion() {
- stdx::unique_lock<Latch> lock(_mutex);
- dassert(!_queue.empty());
- _queue.pop();
- }
+ void _completedRangeDeletion();
/*
* Code executed by the internal thread
*/
void _runRangeDeletions();
- Mutex _mutex = MONGO_MAKE_LATCH("ReadyRangeDeletionsProcessor");
+ mutable Mutex _mutex = MONGO_MAKE_LATCH("ReadyRangeDeletionsProcessor");
/*
* Condition variable notified when:
@@ -175,10 +154,13 @@ private:
std::queue<RangeDeletionTask> _queue;
/* Pointer to the (one and only) operation context used by the thread */
- boost::optional<ServiceContext::UniqueOperationContext> _threadOpCtxHolder;
+ ServiceContext::UniqueOperationContext _threadOpCtxHolder;
/* Thread consuming the range deletions queue */
stdx::thread _thread;
+
+ enum State { kRunning, kStopped };
+ State _state{kRunning};
};
// Keeping track of per-collection registered range deletion tasks
@@ -253,6 +235,7 @@ public:
const ChunkRange& range);
/* ReplicaSetAwareServiceShardSvr implemented methods */
+ void onStartup(OperationContext* opCtx) override;
void onStepUpComplete(OperationContext* opCtx, long long term) override;
void onStepDown() override;
void onShutdown() override;
@@ -276,17 +259,21 @@ public:
std::unique_ptr<ReadyRangeDeletionsProcessor> _readyRangeDeletionsProcessorPtr;
private:
+ /* Join all threads and executor and reset the in memory state of the service
+ * Used for onStartUpBegin and on onShutdown
+ */
+ void _joinAndResetState();
+
/* Asynchronously register range deletions on the service. To be called on on step-up */
void _recoverRangeDeletionsOnStepUp(OperationContext* opCtx);
- /* Called by shutdown/stepdown hooks to reset the service */
- void _stopService(bool joinExecutor);
+ /* Called by shutdown/stepdown hooks to interrupt the service */
+ void _stopService();
/* ReplicaSetAwareServiceShardSvr "empty implemented" methods */
- void onStartup(OperationContext* opCtx) override final{};
void onInitialDataAvailable(OperationContext* opCtx,
bool isMajorityDataAvailable) override final {}
- void onStepUpBegin(OperationContext* opCtx, long long term) override final {}
+ void onStepUpBegin(OperationContext* opCtx, long long term) override final{};
void onBecomeArbiter() override final {}
};