From de6eb37f324ca66b48bf0136994a932a62a2a2a1 Mon Sep 17 00:00:00 2001 From: Tommaso Tocci Date: Thu, 3 Nov 2022 10:27:08 +0000 Subject: SERVER-70964 Do not wait for range deletion thread on stepdown --- .../db/s/collection_sharding_runtime_test.cpp | 6 +- src/mongo/db/s/range_deleter_service.cpp | 145 ++++++++++++++------- src/mongo/db/s/range_deleter_service.h | 67 ++++------ src/mongo/db/s/range_deleter_service_test.cpp | 17 +-- 4 files changed, 132 insertions(+), 103 deletions(-) diff --git a/src/mongo/db/s/collection_sharding_runtime_test.cpp b/src/mongo/db/s/collection_sharding_runtime_test.cpp index 9bc89c2fb36..334225663e3 100644 --- a/src/mongo/db/s/collection_sharding_runtime_test.cpp +++ b/src/mongo/db/s/collection_sharding_runtime_test.cpp @@ -377,8 +377,10 @@ public: AutoGetCollection autoColl(operationContext(), kTestNss, MODE_IX); _uuid = autoColl.getCollection()->uuid(); - RangeDeleterService::get(operationContext())->onStepUpComplete(operationContext(), 0L); - RangeDeleterService::get(operationContext())->_waitForRangeDeleterServiceUp_FOR_TESTING(); + auto opCtx = operationContext(); + RangeDeleterService::get(opCtx)->onStartup(opCtx); + RangeDeleterService::get(opCtx)->onStepUpComplete(opCtx, 0L); + RangeDeleterService::get(opCtx)->_waitForRangeDeleterServiceUp_FOR_TESTING(); } void tearDown() override { diff --git a/src/mongo/db/s/range_deleter_service.cpp b/src/mongo/db/s/range_deleter_service.cpp index 3a6ff006cfb..ca80b24aa3e 100644 --- a/src/mongo/db/s/range_deleter_service.cpp +++ b/src/mongo/db/s/range_deleter_service.cpp @@ -86,6 +86,50 @@ RangeDeleterService* RangeDeleterService::get(OperationContext* opCtx) { return get(opCtx->getServiceContext()); } +RangeDeleterService::ReadyRangeDeletionsProcessor::ReadyRangeDeletionsProcessor( + OperationContext* opCtx) + : _thread([this] { _runRangeDeletions(); }) {} + +RangeDeleterService::ReadyRangeDeletionsProcessor::~ReadyRangeDeletionsProcessor() { + shutdown(); + invariant(_thread.joinable()); + _thread.join(); + invariant(!_threadOpCtxHolder, + "Thread operation context is still alive after joining main thread"); +} + +void RangeDeleterService::ReadyRangeDeletionsProcessor::shutdown() { + stdx::lock_guard lock(_mutex); + if (_state == kStopped) + return; + + _state = kStopped; + + if (_threadOpCtxHolder) { + stdx::lock_guard scopedClientLock(*_threadOpCtxHolder->getClient()); + _threadOpCtxHolder->markKilled(ErrorCodes::Interrupted); + } +} + +bool RangeDeleterService::ReadyRangeDeletionsProcessor::_stopRequested() const { + stdx::unique_lock lock(_mutex); + return _state == kStopped; +} + +void RangeDeleterService::ReadyRangeDeletionsProcessor::emplaceRangeDeletion( + const RangeDeletionTask& rdt) { + stdx::unique_lock lock(_mutex); + invariant(_state == kRunning); + _queue.push(rdt); + _condVar.notify_all(); +} + +void RangeDeleterService::ReadyRangeDeletionsProcessor::_completedRangeDeletion() { + stdx::unique_lock lock(_mutex); + dassert(!_queue.empty()); + _queue.pop(); +} + void RangeDeleterService::ReadyRangeDeletionsProcessor::_runRangeDeletions() { Client::initThread(kRangeDeletionThreadName); { @@ -93,21 +137,22 @@ void RangeDeleterService::ReadyRangeDeletionsProcessor::_runRangeDeletions() { cc().setSystemOperationKillableByStepdown(lk); } - auto opCtx = [&] { - stdx::unique_lock lock(_mutex); + { + stdx::lock_guard lock(_mutex); + if (_state != kRunning) { + return; + } _threadOpCtxHolder = cc().makeOperationContext(); - _condVar.notify_all(); - return (*_threadOpCtxHolder).get(); - }(); + } - opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE(); + auto opCtx = _threadOpCtxHolder.get(); ON_BLOCK_EXIT([this]() { - stdx::unique_lock lock(_mutex); + stdx::lock_guard lock(_mutex); _threadOpCtxHolder.reset(); }); - while (opCtx->checkForInterruptNoAssert().isOK()) { + while (!_stopRequested()) { { stdx::unique_lock lock(_mutex); try { @@ -223,7 +268,7 @@ void RangeDeleterService::ReadyRangeDeletionsProcessor::_runRangeDeletions() { // Release the thread only in case the operation context has been interrupted, as // interruption only happens on shutdown/stepdown (this is fine because range // deletions will be resumed on the next step up) - if (!opCtx->checkForInterruptNoAssert().isOK()) { + if (_stopRequested()) { break; } @@ -237,6 +282,17 @@ void RangeDeleterService::ReadyRangeDeletionsProcessor::_runRangeDeletions() { } } +void RangeDeleterService::onStartup(OperationContext* opCtx) { + if (disableResumableRangeDeleter.load() || + !feature_flags::gRangeDeleterService.isEnabledAndIgnoreFCV()) { + return; + } + + auto opObserverRegistry = + checked_cast(opCtx->getServiceContext()->getOpObserver()); + opObserverRegistry->addObserver(std::make_unique()); +} + void RangeDeleterService::onStepUpComplete(OperationContext* opCtx, long long term) { if (!feature_flags::gRangeDeleterService.isEnabledAndIgnoreFCV()) { return; @@ -249,22 +305,14 @@ void RangeDeleterService::onStepUpComplete(OperationContext* opCtx, long long te return; } + // Wait until all tasks and thread from previous term drain + _joinAndResetState(); + auto lock = _acquireMutexUnconditionally(); dassert(_state == kDown, "Service expected to be down before stepping up"); _state = kInitializing; - if (_executor) { - // Join previously shutted down executor before reinstantiating it - _executor->join(); - _executor.reset(); - } else { - // Initializing the op observer, only executed once at the first step-up - auto opObserverRegistry = - checked_cast(opCtx->getServiceContext()->getOpObserver()); - opObserverRegistry->addObserver(std::make_unique()); - } - const std::string kExecName("RangeDeleterServiceExecutor"); auto net = executor::makeNetworkInterface(kExecName); auto pool = std::make_unique(net.get()); @@ -375,47 +423,55 @@ void RangeDeleterService::_recoverRangeDeletionsOnStepUp(OperationContext* opCtx .semi(); } -void RangeDeleterService::_stopService(bool joinExecutor) { - if (!feature_flags::gRangeDeleterService.isEnabledAndIgnoreFCV()) { - return; - } +void RangeDeleterService::_joinAndResetState() { + invariant(_state == kDown); + // Join the thread spawned on step-up to resume range deletions + _stepUpCompletedFuture.getNoThrow().ignore(); - { - auto lock = _acquireMutexUnconditionally(); - _state = kDown; - if (_initOpCtxHolder) { - stdx::lock_guard lk(*_initOpCtxHolder->getClient()); - _initOpCtxHolder->markKilled(ErrorCodes::Interrupted); - } + // Join and destruct the executor + if (_executor) { + _executor->join(); + _executor.reset(); } - // Join the thread spawned on step-up to resume range deletions - _stepUpCompletedFuture.getNoThrow().ignore(); + // Join and destruct the processor + _readyRangeDeletionsProcessorPtr.reset(); + + // Clear range deletions potentially created during recovery + _rangeDeletionTasks.clear(); +} +void RangeDeleterService::_stopService() { auto lock = _acquireMutexUnconditionally(); + if (_state == kDown) + return; + + _state = kDown; + if (_initOpCtxHolder) { + stdx::lock_guard lk(*_initOpCtxHolder->getClient()); + _initOpCtxHolder->markKilled(ErrorCodes::Interrupted); + } - // It may happen for the `onStepDown` hook to be invoked on a SECONDARY node transitioning - // to ROLLBACK, hence the executor may have never been initialized if (_executor) { _executor->shutdown(); - if (joinExecutor) { - _executor->join(); - } } - // Destroy the range deletion processor in order to stop range deletions - _readyRangeDeletionsProcessorPtr.reset(); + // Shutdown the range deletion processor to interrupt range deletions + if (_readyRangeDeletionsProcessorPtr) { + _readyRangeDeletionsProcessorPtr->shutdown(); + } // Clear range deletion tasks map in order to notify potential waiters on completion futures _rangeDeletionTasks.clear(); } void RangeDeleterService::onStepDown() { - _stopService(false /* joinExecutor */); + _stopService(); } void RangeDeleterService::onShutdown() { - _stopService(true /* joinExecutor */); + _stopService(); + _joinAndResetState(); } BSONObj RangeDeleterService::dumpState() { @@ -477,10 +533,9 @@ SharedSemiFuture RangeDeleterService::registerTask( .then([this, rdt = rdt]() { // Step 3: schedule the actual range deletion task auto lock = _acquireMutexUnconditionally(); - invariant( - _readyRangeDeletionsProcessorPtr || _state == kDown, - "The range deletions processor must be instantiated if the state != kDown"); if (_state != kDown) { + invariant(_readyRangeDeletionsProcessorPtr, + "The range deletions processor is not initialized"); _readyRangeDeletionsProcessorPtr->emplaceRangeDeletion(rdt); } }); diff --git a/src/mongo/db/s/range_deleter_service.h b/src/mongo/db/s/range_deleter_service.h index 68cca97454c..c816c8b9db2 100644 --- a/src/mongo/db/s/range_deleter_service.h +++ b/src/mongo/db/s/range_deleter_service.h @@ -110,58 +110,37 @@ private: */ class ReadyRangeDeletionsProcessor { public: - ReadyRangeDeletionsProcessor(OperationContext* opCtx) { - _thread = stdx::thread([this] { _runRangeDeletions(); }); - stdx::unique_lock lock(_mutex); - opCtx->waitForConditionOrInterrupt( - _condVar, lock, [&] { return _threadOpCtxHolder.is_initialized(); }); - } - - ~ReadyRangeDeletionsProcessor() { - { - stdx::unique_lock lock(_mutex); - // The `_threadOpCtxHolder` may have been already reset/interrupted in case the - // thread got interrupted due to stepdown - if (_threadOpCtxHolder) { - stdx::lock_guard scopedClientLock(*(*_threadOpCtxHolder)->getClient()); - if ((*_threadOpCtxHolder)->checkForInterruptNoAssert().isOK()) { - (*_threadOpCtxHolder)->markKilled(ErrorCodes::Interrupted); - } - } - _condVar.notify_all(); - } + ReadyRangeDeletionsProcessor(OperationContext* opCtx); + ~ReadyRangeDeletionsProcessor(); - if (_thread.joinable()) { - _thread.join(); - } - } + /* + * Interrupt ongoing range deletions + */ + void shutdown(); /* * Schedule a range deletion at the end of the queue */ - void emplaceRangeDeletion(const RangeDeletionTask& rdt) { - stdx::unique_lock lock(_mutex); - _queue.push(rdt); - _condVar.notify_all(); - } + void emplaceRangeDeletion(const RangeDeletionTask& rdt); private: + /* + * Return true if this processor have been shutted down + */ + bool _stopRequested() const; + /* * Remove a range deletion from the head of the queue. Supposed to be called only once a * range deletion successfully finishes. */ - void _completedRangeDeletion() { - stdx::unique_lock lock(_mutex); - dassert(!_queue.empty()); - _queue.pop(); - } + void _completedRangeDeletion(); /* * Code executed by the internal thread */ void _runRangeDeletions(); - Mutex _mutex = MONGO_MAKE_LATCH("ReadyRangeDeletionsProcessor"); + mutable Mutex _mutex = MONGO_MAKE_LATCH("ReadyRangeDeletionsProcessor"); /* * Condition variable notified when: @@ -175,10 +154,13 @@ private: std::queue _queue; /* Pointer to the (one and only) operation context used by the thread */ - boost::optional _threadOpCtxHolder; + ServiceContext::UniqueOperationContext _threadOpCtxHolder; /* Thread consuming the range deletions queue */ stdx::thread _thread; + + enum State { kRunning, kStopped }; + State _state{kRunning}; }; // Keeping track of per-collection registered range deletion tasks @@ -253,6 +235,7 @@ public: const ChunkRange& range); /* ReplicaSetAwareServiceShardSvr implemented methods */ + void onStartup(OperationContext* opCtx) override; void onStepUpComplete(OperationContext* opCtx, long long term) override; void onStepDown() override; void onShutdown() override; @@ -276,17 +259,21 @@ public: std::unique_ptr _readyRangeDeletionsProcessorPtr; private: + /* Join all threads and executor and reset the in memory state of the service + * Used for onStartUpBegin and on onShutdown + */ + void _joinAndResetState(); + /* Asynchronously register range deletions on the service. To be called on on step-up */ void _recoverRangeDeletionsOnStepUp(OperationContext* opCtx); - /* Called by shutdown/stepdown hooks to reset the service */ - void _stopService(bool joinExecutor); + /* Called by shutdown/stepdown hooks to interrupt the service */ + void _stopService(); /* ReplicaSetAwareServiceShardSvr "empty implemented" methods */ - void onStartup(OperationContext* opCtx) override final{}; void onInitialDataAvailable(OperationContext* opCtx, bool isMajorityDataAvailable) override final {} - void onStepUpBegin(OperationContext* opCtx, long long term) override final {} + void onStepUpBegin(OperationContext* opCtx, long long term) override final{}; void onBecomeArbiter() override final {} }; diff --git a/src/mongo/db/s/range_deleter_service_test.cpp b/src/mongo/db/s/range_deleter_service_test.cpp index d2406120483..0807867da2d 100644 --- a/src/mongo/db/s/range_deleter_service_test.cpp +++ b/src/mongo/db/s/range_deleter_service_test.cpp @@ -45,6 +45,7 @@ void RangeDeleterServiceTest::setUp() { ShardServerTestFixture::setUp(); WaitForMajorityService::get(getServiceContext()).startup(getServiceContext()); opCtx = operationContext(); + RangeDeleterService::get(opCtx)->onStartup(opCtx); RangeDeleterService::get(opCtx)->onStepUpComplete(opCtx, 0L); RangeDeleterService::get(opCtx)->_waitForRangeDeleterServiceUp_FOR_TESTING(); @@ -275,12 +276,6 @@ TEST_F(RangeDeleterServiceTest, ScheduledTaskInvalidatedOnStepDown) { // Manually trigger disabling of the service rds->onStepDown(); - ON_BLOCK_EXIT([&] { - // Re-enable the service for clean teardown - rds->onStepUpComplete(opCtx, 0L); - rds->_waitForRangeDeleterServiceUp_FOR_TESTING(); - }); - try { completionFuture.get(opCtx); } catch (const ExceptionForCat&) { @@ -293,12 +288,6 @@ TEST_F(RangeDeleterServiceTest, NoActionPossibleIfServiceIsDown) { // Manually trigger disabling of the service rds->onStepDown(); - ON_BLOCK_EXIT([&] { - // Re-enable the service for clean teardown - rds->onStepUpComplete(opCtx, 0L); - rds->_waitForRangeDeleterServiceUp_FOR_TESTING(); - }); - auto taskWithOngoingQueries = createRangeDeletionTaskWithOngoingQueries( uuidCollA, BSON(kShardKey << 0), BSON(kShardKey << 10), CleanWhenEnum::kDelayed); @@ -886,10 +875,6 @@ TEST_F(RangeDeleterServiceTest, WaitForOngoingQueriesInvalidatedOnStepDown) { // Manually trigger disabling of the service rds->onStepDown(); - ON_BLOCK_EXIT([&] { - rds->onStepUpComplete(opCtx, 0L); // Re-enable the service - }); - try { completionFuture.get(opCtx); } catch (const ExceptionForCat&) { -- cgit v1.2.1