summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/range_deleter_service.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/s/range_deleter_service.cpp')
-rw-r--r--src/mongo/db/s/range_deleter_service.cpp154
1 files changed, 107 insertions, 47 deletions
diff --git a/src/mongo/db/s/range_deleter_service.cpp b/src/mongo/db/s/range_deleter_service.cpp
index acabcc40bea..ca80b24aa3e 100644
--- a/src/mongo/db/s/range_deleter_service.cpp
+++ b/src/mongo/db/s/range_deleter_service.cpp
@@ -86,6 +86,50 @@ RangeDeleterService* RangeDeleterService::get(OperationContext* opCtx) {
return get(opCtx->getServiceContext());
}
+RangeDeleterService::ReadyRangeDeletionsProcessor::ReadyRangeDeletionsProcessor(
+ OperationContext* opCtx)
+ : _thread([this] { _runRangeDeletions(); }) {}
+
+RangeDeleterService::ReadyRangeDeletionsProcessor::~ReadyRangeDeletionsProcessor() {
+ shutdown();
+ invariant(_thread.joinable());
+ _thread.join();
+ invariant(!_threadOpCtxHolder,
+ "Thread operation context is still alive after joining main thread");
+}
+
+void RangeDeleterService::ReadyRangeDeletionsProcessor::shutdown() {
+ stdx::lock_guard<Latch> lock(_mutex);
+ if (_state == kStopped)
+ return;
+
+ _state = kStopped;
+
+ if (_threadOpCtxHolder) {
+ stdx::lock_guard<Client> scopedClientLock(*_threadOpCtxHolder->getClient());
+ _threadOpCtxHolder->markKilled(ErrorCodes::Interrupted);
+ }
+}
+
+bool RangeDeleterService::ReadyRangeDeletionsProcessor::_stopRequested() const {
+ stdx::unique_lock<Latch> lock(_mutex);
+ return _state == kStopped;
+}
+
+void RangeDeleterService::ReadyRangeDeletionsProcessor::emplaceRangeDeletion(
+ const RangeDeletionTask& rdt) {
+ stdx::unique_lock<Latch> lock(_mutex);
+ invariant(_state == kRunning);
+ _queue.push(rdt);
+ _condVar.notify_all();
+}
+
+void RangeDeleterService::ReadyRangeDeletionsProcessor::_completedRangeDeletion() {
+ stdx::unique_lock<Latch> lock(_mutex);
+ dassert(!_queue.empty());
+ _queue.pop();
+}
+
void RangeDeleterService::ReadyRangeDeletionsProcessor::_runRangeDeletions() {
Client::initThread(kRangeDeletionThreadName);
{
@@ -93,21 +137,22 @@ void RangeDeleterService::ReadyRangeDeletionsProcessor::_runRangeDeletions() {
cc().setSystemOperationKillableByStepdown(lk);
}
- auto opCtx = [&] {
- stdx::unique_lock<Latch> lock(_mutex);
+ {
+ stdx::lock_guard<Latch> lock(_mutex);
+ if (_state != kRunning) {
+ return;
+ }
_threadOpCtxHolder = cc().makeOperationContext();
- _condVar.notify_all();
- return (*_threadOpCtxHolder).get();
- }();
+ }
- opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE();
+ auto opCtx = _threadOpCtxHolder.get();
ON_BLOCK_EXIT([this]() {
- stdx::unique_lock<Latch> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
_threadOpCtxHolder.reset();
});
- while (opCtx->checkForInterruptNoAssert().isOK()) {
+ while (!_stopRequested()) {
{
stdx::unique_lock<Latch> lock(_mutex);
try {
@@ -223,7 +268,7 @@ void RangeDeleterService::ReadyRangeDeletionsProcessor::_runRangeDeletions() {
// Release the thread only in case the operation context has been interrupted, as
// interruption only happens on shutdown/stepdown (this is fine because range
// deletions will be resumed on the next step up)
- if (!opCtx->checkForInterruptNoAssert().isOK()) {
+ if (_stopRequested()) {
break;
}
@@ -237,6 +282,17 @@ void RangeDeleterService::ReadyRangeDeletionsProcessor::_runRangeDeletions() {
}
}
+void RangeDeleterService::onStartup(OperationContext* opCtx) {
+ if (disableResumableRangeDeleter.load() ||
+ !feature_flags::gRangeDeleterService.isEnabledAndIgnoreFCV()) {
+ return;
+ }
+
+ auto opObserverRegistry =
+ checked_cast<OpObserverRegistry*>(opCtx->getServiceContext()->getOpObserver());
+ opObserverRegistry->addObserver(std::make_unique<RangeDeleterServiceOpObserver>());
+}
+
void RangeDeleterService::onStepUpComplete(OperationContext* opCtx, long long term) {
if (!feature_flags::gRangeDeleterService.isEnabledAndIgnoreFCV()) {
return;
@@ -249,22 +305,14 @@ void RangeDeleterService::onStepUpComplete(OperationContext* opCtx, long long te
return;
}
+ // Wait until all tasks and thread from previous term drain
+ _joinAndResetState();
+
auto lock = _acquireMutexUnconditionally();
dassert(_state == kDown, "Service expected to be down before stepping up");
_state = kInitializing;
- if (_executor) {
- // Join previously shutted down executor before reinstantiating it
- _executor->join();
- _executor.reset();
- } else {
- // Initializing the op observer, only executed once at the first step-up
- auto opObserverRegistry =
- checked_cast<OpObserverRegistry*>(opCtx->getServiceContext()->getOpObserver());
- opObserverRegistry->addObserver(std::make_unique<RangeDeleterServiceOpObserver>());
- }
-
const std::string kExecName("RangeDeleterServiceExecutor");
auto net = executor::makeNetworkInterface(kExecName);
auto pool = std::make_unique<executor::NetworkInterfaceThreadPool>(net.get());
@@ -303,9 +351,14 @@ void RangeDeleterService::_recoverRangeDeletionsOnStepUp(OperationContext* opCtx
LOGV2(6834800, "Resubmitting range deletion tasks");
- ScopedRangeDeleterLock rangeDeleterLock(opCtx);
- DBDirectClient client(opCtx);
+ // The Scoped lock is needed to serialize with concurrent range deletions
+ ScopedRangeDeleterLock rangeDeleterLock(opCtx, MODE_S);
+ // The collection lock is needed to serialize with donors trying to
+ // schedule local range deletions by updating the 'pending' field
+ AutoGetCollection rangeDeletionLock(
+ opCtx, NamespaceString::kRangeDeletionNamespace, MODE_S);
+ DBDirectClient client(opCtx);
int nRescheduledTasks = 0;
// (1) register range deletion tasks marked as "processing"
@@ -370,47 +423,55 @@ void RangeDeleterService::_recoverRangeDeletionsOnStepUp(OperationContext* opCtx
.semi();
}
-void RangeDeleterService::_stopService(bool joinExecutor) {
- if (!feature_flags::gRangeDeleterService.isEnabledAndIgnoreFCV()) {
- return;
- }
+void RangeDeleterService::_joinAndResetState() {
+ invariant(_state == kDown);
+ // Join the thread spawned on step-up to resume range deletions
+ _stepUpCompletedFuture.getNoThrow().ignore();
- {
- auto lock = _acquireMutexUnconditionally();
- _state = kDown;
- if (_initOpCtxHolder) {
- stdx::lock_guard<Client> lk(*_initOpCtxHolder->getClient());
- _initOpCtxHolder->markKilled(ErrorCodes::Interrupted);
- }
+ // Join and destruct the executor
+ if (_executor) {
+ _executor->join();
+ _executor.reset();
}
- // Join the thread spawned on step-up to resume range deletions
- _stepUpCompletedFuture.getNoThrow().ignore();
+ // Join and destruct the processor
+ _readyRangeDeletionsProcessorPtr.reset();
+
+ // Clear range deletions potentially created during recovery
+ _rangeDeletionTasks.clear();
+}
+void RangeDeleterService::_stopService() {
auto lock = _acquireMutexUnconditionally();
+ if (_state == kDown)
+ return;
+
+ _state = kDown;
+ if (_initOpCtxHolder) {
+ stdx::lock_guard<Client> lk(*_initOpCtxHolder->getClient());
+ _initOpCtxHolder->markKilled(ErrorCodes::Interrupted);
+ }
- // It may happen for the `onStepDown` hook to be invoked on a SECONDARY node transitioning
- // to ROLLBACK, hence the executor may have never been initialized
if (_executor) {
_executor->shutdown();
- if (joinExecutor) {
- _executor->join();
- }
}
- // Destroy the range deletion processor in order to stop range deletions
- _readyRangeDeletionsProcessorPtr.reset();
+ // Shutdown the range deletion processor to interrupt range deletions
+ if (_readyRangeDeletionsProcessorPtr) {
+ _readyRangeDeletionsProcessorPtr->shutdown();
+ }
// Clear range deletion tasks map in order to notify potential waiters on completion futures
_rangeDeletionTasks.clear();
}
void RangeDeleterService::onStepDown() {
- _stopService(false /* joinExecutor */);
+ _stopService();
}
void RangeDeleterService::onShutdown() {
- _stopService(true /* joinExecutor */);
+ _stopService();
+ _joinAndResetState();
}
BSONObj RangeDeleterService::dumpState() {
@@ -472,10 +533,9 @@ SharedSemiFuture<void> RangeDeleterService::registerTask(
.then([this, rdt = rdt]() {
// Step 3: schedule the actual range deletion task
auto lock = _acquireMutexUnconditionally();
- invariant(
- _readyRangeDeletionsProcessorPtr || _state == kDown,
- "The range deletions processor must be instantiated if the state != kDown");
if (_state != kDown) {
+ invariant(_readyRangeDeletionsProcessorPtr,
+ "The range deletions processor is not initialized");
_readyRangeDeletionsProcessorPtr->emplaceRangeDeletion(rdt);
}
});