summary refs log tree commit diff
diff options
context:
space:
mode:
author Tommaso Tocci <tommaso.tocci@mongodb.com> 2022-11-03 10:27:08 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com> 2022-11-03 11:00:19 +0000
commit de6eb37f324ca66b48bf0136994a932a62a2a2a1 (patch)
tree f6b960a523ce35ac09fc4b2216be055adc2af789
parent 8e7978fb75cad95f864255810c655f62a0a9408d (diff)
download mongo-de6eb37f324ca66b48bf0136994a932a62a2a2a1.tar.gz
SERVER-70964 Do not wait for range deletion thread on stepdown
-rw-r--r-- src/mongo/db/s/collection_sharding_runtime_test.cpp 6
-rw-r--r-- src/mongo/db/s/range_deleter_service.cpp 145
-rw-r--r-- src/mongo/db/s/range_deleter_service.h 67
-rw-r--r-- src/mongo/db/s/range_deleter_service_test.cpp 17
4 files changed, 132 insertions, 103 deletions
diff --git a/src/mongo/db/s/collection_sharding_runtime_test.cpp b/src/mongo/db/s/collection_sharding_runtime_test.cpp
index 9bc89c2fb36..334225663e3 100644
--- a/src/mongo/db/s/collection_sharding_runtime_test.cpp
+++ b/src/mongo/db/s/collection_sharding_runtime_test.cpp
@@ -377,8 +377,10 @@ public:
AutoGetCollection autoColl(operationContext(), kTestNss, MODE_IX);
_uuid = autoColl.getCollection()->uuid();
- RangeDeleterService::get(operationContext())->onStepUpComplete(operationContext(), 0L);
- RangeDeleterService::get(operationContext())->_waitForRangeDeleterServiceUp_FOR_TESTING();
+ auto opCtx = operationContext();
+ RangeDeleterService::get(opCtx)->onStartup(opCtx);
+ RangeDeleterService::get(opCtx)->onStepUpComplete(opCtx, 0L);
+ RangeDeleterService::get(opCtx)->_waitForRangeDeleterServiceUp_FOR_TESTING();
}
void tearDown() override {
diff --git a/src/mongo/db/s/range_deleter_service.cpp b/src/mongo/db/s/range_deleter_service.cpp
index 3a6ff006cfb..ca80b24aa3e 100644
--- a/src/mongo/db/s/range_deleter_service.cpp
+++ b/src/mongo/db/s/range_deleter_service.cpp
@@ -86,6 +86,50 @@ RangeDeleterService* RangeDeleterService::get(OperationContext* opCtx) {
return get(opCtx->getServiceContext());
}
+// Spawns the internal thread that executes _runRangeDeletions(). `opCtx` is not used here.
+RangeDeleterService::ReadyRangeDeletionsProcessor::ReadyRangeDeletionsProcessor(
+    OperationContext* opCtx) {
+    // Start the worker from the constructor body instead of the member initializer list.
+    // Members are initialized in declaration order and `_thread` is declared before `_state`,
+    // so launching the thread while `_thread` is being initialized would let
+    // _runRangeDeletions() read `_state` before its default initializer (kRunning) has run.
+    // Here every member is already initialized when the thread starts.
+    _thread = stdx::thread([this] { _runRangeDeletions(); });
+}
+
+RangeDeleterService::ReadyRangeDeletionsProcessor::~ReadyRangeDeletionsProcessor() {  // Stops the processor and joins its thread
+ shutdown();  // request stop and interrupt the worker's operation context, if one exists
+ invariant(_thread.joinable());
+ _thread.join();  // block until the thread running _runRangeDeletions() has finished
+ invariant(!_threadOpCtxHolder,  // the worker resets its operation context holder before exiting
+ "Thread operation context is still alive after joining main thread");
+}
+
+void RangeDeleterService::ReadyRangeDeletionsProcessor::shutdown() {  // Moves to kStopped and interrupts the worker; does not join the thread
+ stdx::lock_guard<Latch> lock(_mutex);
+ if (_state == kStopped)
+ return;  // already shut down: repeated calls are no-ops
+
+ _state = kStopped;
+
+ if (_threadOpCtxHolder) {  // null if the worker has not created its opCtx yet or has already reset it
+ stdx::lock_guard<Client> scopedClientLock(*_threadOpCtxHolder->getClient());  // the Client is locked while its opCtx is marked killed
+ _threadOpCtxHolder->markKilled(ErrorCodes::Interrupted);
+ }
+}
+
+bool RangeDeleterService::ReadyRangeDeletionsProcessor::_stopRequested() const {  // True once _state is kStopped, i.e. after shutdown()
+ stdx::unique_lock<Latch> lock(_mutex);  // _state is read under _mutex; the mutex is declared mutable so it can be locked in a const method
+ return _state == kStopped;
+}
+
+void RangeDeleterService::ReadyRangeDeletionsProcessor::emplaceRangeDeletion(
+ const RangeDeletionTask& rdt) {  // Appends a copy of `rdt` to the end of the queue
+ stdx::unique_lock<Latch> lock(_mutex);
+ invariant(_state == kRunning);  // tasks must not be queued once the processor has been shut down
+ _queue.push(rdt);
+ _condVar.notify_all();  // wake waiters on _condVar — presumably the worker thread; its wait is not visible here
+}
+
+void RangeDeleterService::ReadyRangeDeletionsProcessor::_completedRangeDeletion() {  // Removes the task at the head of the queue
+ stdx::unique_lock<Latch> lock(_mutex);
+ dassert(!_queue.empty());  // callers must only invoke this while a task is queued
+ _queue.pop();
+}
+
void RangeDeleterService::ReadyRangeDeletionsProcessor::_runRangeDeletions() {
Client::initThread(kRangeDeletionThreadName);
{
@@ -93,21 +137,22 @@ void RangeDeleterService::ReadyRangeDeletionsProcessor::_runRangeDeletions() {
cc().setSystemOperationKillableByStepdown(lk);
}
- auto opCtx = [&] {
- stdx::unique_lock<Latch> lock(_mutex);
+ {
+ stdx::lock_guard<Latch> lock(_mutex);
+ if (_state != kRunning) {
+ return;
+ }
_threadOpCtxHolder = cc().makeOperationContext();
- _condVar.notify_all();
- return (*_threadOpCtxHolder).get();
- }();
+ }
- opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE();
+ auto opCtx = _threadOpCtxHolder.get();
ON_BLOCK_EXIT([this]() {
- stdx::unique_lock<Latch> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
_threadOpCtxHolder.reset();
});
- while (opCtx->checkForInterruptNoAssert().isOK()) {
+ while (!_stopRequested()) {
{
stdx::unique_lock<Latch> lock(_mutex);
try {
@@ -223,7 +268,7 @@ void RangeDeleterService::ReadyRangeDeletionsProcessor::_runRangeDeletions() {
// Release the thread only in case the operation context has been interrupted, as
// interruption only happens on shutdown/stepdown (this is fine because range
// deletions will be resumed on the next step up)
- if (!opCtx->checkForInterruptNoAssert().isOK()) {
+ if (_stopRequested()) {
break;
}
@@ -237,6 +282,17 @@ void RangeDeleterService::ReadyRangeDeletionsProcessor::_runRangeDeletions() {
}
}
+void RangeDeleterService::onStartup(OperationContext* opCtx) {  // Registers the RangeDeleterServiceOpObserver on the service context's op observer registry
+ if (disableResumableRangeDeleter.load() ||
+ !feature_flags::gRangeDeleterService.isEnabledAndIgnoreFCV()) {
+ return;  // nothing is registered when the resumable range deleter is disabled or the feature flag is off
+ }
+
+ auto opObserverRegistry =
+ checked_cast<OpObserverRegistry*>(opCtx->getServiceContext()->getOpObserver());  // the service context's op observer is expected to be an OpObserverRegistry
+ opObserverRegistry->addObserver(std::make_unique<RangeDeleterServiceOpObserver>());
+}
+
void RangeDeleterService::onStepUpComplete(OperationContext* opCtx, long long term) {
if (!feature_flags::gRangeDeleterService.isEnabledAndIgnoreFCV()) {
return;
@@ -249,22 +305,14 @@ void RangeDeleterService::onStepUpComplete(OperationContext* opCtx, long long te
return;
}
+ // Wait until all tasks and threads from the previous term have drained
+ _joinAndResetState();
+
auto lock = _acquireMutexUnconditionally();
dassert(_state == kDown, "Service expected to be down before stepping up");
_state = kInitializing;
- if (_executor) {
- // Join previously shutted down executor before reinstantiating it
- _executor->join();
- _executor.reset();
- } else {
- // Initializing the op observer, only executed once at the first step-up
- auto opObserverRegistry =
- checked_cast<OpObserverRegistry*>(opCtx->getServiceContext()->getOpObserver());
- opObserverRegistry->addObserver(std::make_unique<RangeDeleterServiceOpObserver>());
- }
-
const std::string kExecName("RangeDeleterServiceExecutor");
auto net = executor::makeNetworkInterface(kExecName);
auto pool = std::make_unique<executor::NetworkInterfaceThreadPool>(net.get());
@@ -375,47 +423,55 @@ void RangeDeleterService::_recoverRangeDeletionsOnStepUp(OperationContext* opCtx
.semi();
}
-void RangeDeleterService::_stopService(bool joinExecutor) {
- if (!feature_flags::gRangeDeleterService.isEnabledAndIgnoreFCV()) {
- return;
- }
+void RangeDeleterService::_joinAndResetState() {
+ invariant(_state == kDown);
+ // Join the thread spawned on step-up to resume range deletions
+ _stepUpCompletedFuture.getNoThrow().ignore();
- {
- auto lock = _acquireMutexUnconditionally();
- _state = kDown;
- if (_initOpCtxHolder) {
- stdx::lock_guard<Client> lk(*_initOpCtxHolder->getClient());
- _initOpCtxHolder->markKilled(ErrorCodes::Interrupted);
- }
+ // Join and destruct the executor
+ if (_executor) {
+ _executor->join();
+ _executor.reset();
}
- // Join the thread spawned on step-up to resume range deletions
- _stepUpCompletedFuture.getNoThrow().ignore();
+ // Join and destruct the processor
+ _readyRangeDeletionsProcessorPtr.reset();
+
+ // Clear range deletions potentially created during recovery
+ _rangeDeletionTasks.clear();
+}
+void RangeDeleterService::_stopService() {
auto lock = _acquireMutexUnconditionally();
+ if (_state == kDown)
+ return;
+
+ _state = kDown;
+ if (_initOpCtxHolder) {
+ stdx::lock_guard<Client> lk(*_initOpCtxHolder->getClient());
+ _initOpCtxHolder->markKilled(ErrorCodes::Interrupted);
+ }
- // It may happen for the `onStepDown` hook to be invoked on a SECONDARY node transitioning
- // to ROLLBACK, hence the executor may have never been initialized
if (_executor) {
_executor->shutdown();
- if (joinExecutor) {
- _executor->join();
- }
}
- // Destroy the range deletion processor in order to stop range deletions
- _readyRangeDeletionsProcessorPtr.reset();
+ // Shutdown the range deletion processor to interrupt range deletions
+ if (_readyRangeDeletionsProcessorPtr) {
+ _readyRangeDeletionsProcessorPtr->shutdown();
+ }
// Clear range deletion tasks map in order to notify potential waiters on completion futures
_rangeDeletionTasks.clear();
}
void RangeDeleterService::onStepDown() {
- _stopService(false /* joinExecutor */);
+ _stopService();
}
void RangeDeleterService::onShutdown() {
- _stopService(true /* joinExecutor */);
+ _stopService();
+ _joinAndResetState();
}
BSONObj RangeDeleterService::dumpState() {
@@ -477,10 +533,9 @@ SharedSemiFuture<void> RangeDeleterService::registerTask(
.then([this, rdt = rdt]() {
// Step 3: schedule the actual range deletion task
auto lock = _acquireMutexUnconditionally();
- invariant(
- _readyRangeDeletionsProcessorPtr || _state == kDown,
- "The range deletions processor must be instantiated if the state != kDown");
if (_state != kDown) {
+ invariant(_readyRangeDeletionsProcessorPtr,
+ "The range deletions processor is not initialized");
_readyRangeDeletionsProcessorPtr->emplaceRangeDeletion(rdt);
}
});
diff --git a/src/mongo/db/s/range_deleter_service.h b/src/mongo/db/s/range_deleter_service.h
index 68cca97454c..c816c8b9db2 100644
--- a/src/mongo/db/s/range_deleter_service.h
+++ b/src/mongo/db/s/range_deleter_service.h
@@ -110,58 +110,37 @@ private:
*/
class ReadyRangeDeletionsProcessor {
public:
- ReadyRangeDeletionsProcessor(OperationContext* opCtx) {
- _thread = stdx::thread([this] { _runRangeDeletions(); });
- stdx::unique_lock<Latch> lock(_mutex);
- opCtx->waitForConditionOrInterrupt(
- _condVar, lock, [&] { return _threadOpCtxHolder.is_initialized(); });
- }
-
- ~ReadyRangeDeletionsProcessor() {
- {
- stdx::unique_lock<Latch> lock(_mutex);
- // The `_threadOpCtxHolder` may have been already reset/interrupted in case the
- // thread got interrupted due to stepdown
- if (_threadOpCtxHolder) {
- stdx::lock_guard<Client> scopedClientLock(*(*_threadOpCtxHolder)->getClient());
- if ((*_threadOpCtxHolder)->checkForInterruptNoAssert().isOK()) {
- (*_threadOpCtxHolder)->markKilled(ErrorCodes::Interrupted);
- }
- }
- _condVar.notify_all();
- }
+ ReadyRangeDeletionsProcessor(OperationContext* opCtx);
+ ~ReadyRangeDeletionsProcessor();
- if (_thread.joinable()) {
- _thread.join();
- }
- }
+ /*
+ * Interrupt ongoing range deletions
+ */
+ void shutdown();
/*
* Schedule a range deletion at the end of the queue
*/
- void emplaceRangeDeletion(const RangeDeletionTask& rdt) {
- stdx::unique_lock<Latch> lock(_mutex);
- _queue.push(rdt);
- _condVar.notify_all();
- }
+ void emplaceRangeDeletion(const RangeDeletionTask& rdt);
private:
/*
+ * Return true if this processor has been shut down
+ */
+ bool _stopRequested() const;
+
+ /*
* Remove a range deletion from the head of the queue. Supposed to be called only once a
* range deletion successfully finishes.
*/
- void _completedRangeDeletion() {
- stdx::unique_lock<Latch> lock(_mutex);
- dassert(!_queue.empty());
- _queue.pop();
- }
+ void _completedRangeDeletion();
/*
* Code executed by the internal thread
*/
void _runRangeDeletions();
- Mutex _mutex = MONGO_MAKE_LATCH("ReadyRangeDeletionsProcessor");
+ mutable Mutex _mutex = MONGO_MAKE_LATCH("ReadyRangeDeletionsProcessor");
/*
* Condition variable notified when:
@@ -175,10 +154,13 @@ private:
std::queue<RangeDeletionTask> _queue;
/* Pointer to the (one and only) operation context used by the thread */
- boost::optional<ServiceContext::UniqueOperationContext> _threadOpCtxHolder;
+ ServiceContext::UniqueOperationContext _threadOpCtxHolder;
/* Thread consuming the range deletions queue */
stdx::thread _thread;
+
+ enum State { kRunning, kStopped };
+ State _state{kRunning};
};
// Keeping track of per-collection registered range deletion tasks
@@ -253,6 +235,7 @@ public:
const ChunkRange& range);
/* ReplicaSetAwareServiceShardSvr implemented methods */
+ void onStartup(OperationContext* opCtx) override;
void onStepUpComplete(OperationContext* opCtx, long long term) override;
void onStepDown() override;
void onShutdown() override;
@@ -276,17 +259,21 @@ public:
std::unique_ptr<ReadyRangeDeletionsProcessor> _readyRangeDeletionsProcessorPtr;
private:
+ /* Join all threads and the executor, then reset the in-memory state of the service.
+ * Used by onStepUpComplete and onShutdown.
+ */
+ void _joinAndResetState();
+
/* Asynchronously register range deletions on the service. To be called on on step-up */
void _recoverRangeDeletionsOnStepUp(OperationContext* opCtx);
- /* Called by shutdown/stepdown hooks to reset the service */
- void _stopService(bool joinExecutor);
+ /* Called by shutdown/stepdown hooks to interrupt the service */
+ void _stopService();
/* ReplicaSetAwareServiceShardSvr "empty implemented" methods */
- void onStartup(OperationContext* opCtx) override final{};
void onInitialDataAvailable(OperationContext* opCtx,
bool isMajorityDataAvailable) override final {}
- void onStepUpBegin(OperationContext* opCtx, long long term) override final {}
+ void onStepUpBegin(OperationContext* opCtx, long long term) override final{};
void onBecomeArbiter() override final {}
};
diff --git a/src/mongo/db/s/range_deleter_service_test.cpp b/src/mongo/db/s/range_deleter_service_test.cpp
index d2406120483..0807867da2d 100644
--- a/src/mongo/db/s/range_deleter_service_test.cpp
+++ b/src/mongo/db/s/range_deleter_service_test.cpp
@@ -45,6 +45,7 @@ void RangeDeleterServiceTest::setUp() {
ShardServerTestFixture::setUp();
WaitForMajorityService::get(getServiceContext()).startup(getServiceContext());
opCtx = operationContext();
+ RangeDeleterService::get(opCtx)->onStartup(opCtx);
RangeDeleterService::get(opCtx)->onStepUpComplete(opCtx, 0L);
RangeDeleterService::get(opCtx)->_waitForRangeDeleterServiceUp_FOR_TESTING();
@@ -275,12 +276,6 @@ TEST_F(RangeDeleterServiceTest, ScheduledTaskInvalidatedOnStepDown) {
// Manually trigger disabling of the service
rds->onStepDown();
- ON_BLOCK_EXIT([&] {
- // Re-enable the service for clean teardown
- rds->onStepUpComplete(opCtx, 0L);
- rds->_waitForRangeDeleterServiceUp_FOR_TESTING();
- });
-
try {
completionFuture.get(opCtx);
} catch (const ExceptionForCat<ErrorCategory::Interruption>&) {
@@ -293,12 +288,6 @@ TEST_F(RangeDeleterServiceTest, NoActionPossibleIfServiceIsDown) {
// Manually trigger disabling of the service
rds->onStepDown();
- ON_BLOCK_EXIT([&] {
- // Re-enable the service for clean teardown
- rds->onStepUpComplete(opCtx, 0L);
- rds->_waitForRangeDeleterServiceUp_FOR_TESTING();
- });
-
auto taskWithOngoingQueries = createRangeDeletionTaskWithOngoingQueries(
uuidCollA, BSON(kShardKey << 0), BSON(kShardKey << 10), CleanWhenEnum::kDelayed);
@@ -886,10 +875,6 @@ TEST_F(RangeDeleterServiceTest, WaitForOngoingQueriesInvalidatedOnStepDown) {
// Manually trigger disabling of the service
rds->onStepDown();
- ON_BLOCK_EXIT([&] {
- rds->onStepUpComplete(opCtx, 0L); // Re-enable the service
- });
-
try {
completionFuture.get(opCtx);
} catch (const ExceptionForCat<ErrorCategory::Interruption>&) {