summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/s/range_deleter_service.cpp90
-rw-r--r--src/mongo/db/s/range_deleter_service.h5
-rw-r--r--src/mongo/db/s/range_deleter_service_test.cpp56
3 files changed, 142 insertions, 9 deletions
diff --git a/src/mongo/db/s/range_deleter_service.cpp b/src/mongo/db/s/range_deleter_service.cpp
index 84de8538cc4..95e3f0477a3 100644
--- a/src/mongo/db/s/range_deleter_service.cpp
+++ b/src/mongo/db/s/range_deleter_service.cpp
@@ -28,10 +28,13 @@
*/
#include "mongo/db/s/range_deleter_service.h"
+#include "mongo/db/dbdirectclient.h"
#include "mongo/db/op_observer/op_observer_registry.h"
+#include "mongo/db/s/balancer_stats_registry.h"
#include "mongo/db/s/range_deleter_service_op_observer.h"
#include "mongo/logv2/log.h"
#include "mongo/s/sharding_feature_flags_gen.h"
+#include "mongo/util/future_util.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kShardingRangeDeleter
@@ -83,18 +86,88 @@ void RangeDeleterService::onStepUpComplete(OperationContext* opCtx, long long te
_executor = std::move(taskExecutor);
_executor->startup();
- _recoverRangeDeletionsOnStepUp();
+ _recoverRangeDeletionsOnStepUp(opCtx);
}
-void RangeDeleterService::_recoverRangeDeletionsOnStepUp() {
-
+void RangeDeleterService::_recoverRangeDeletionsOnStepUp(OperationContext* opCtx) {
if (disableResumableRangeDeleter.load()) {
_state.store(kDown);
return;
}
- // TODO SERVER-68348 Asynchronously register tasks on the range deleter service on step-up
- _state.store(kUp);
+ LOGV2(6834800, "Resubmitting range deletion tasks");
+ // The caller's opCtx is only used to obtain the ServiceContext; the recovery itself is chained on _executor.
+ ServiceContext* serviceContext = opCtx->getServiceContext();
+ // NOTE(review): `this` is captured by raw pointer - presumably the service outlives the executor's tasks; confirm.
+ ExecutorFuture<void>(_executor)
+ .then([serviceContext, this] {
+ ThreadClient tc("ResubmitRangeDeletionsOnStepUp", serviceContext);
+ {
+ stdx::lock_guard<Client> lk(*tc.get());
+ tc->setSystemOperationKillableByStepdown(lk);
+ }
+ auto opCtx = tc->makeOperationContext();
+ opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE();
+ // The lock and the direct client below stay in scope for both registration passes.
+ ScopedRangeDeleterLock rangeDeleterLock(opCtx.get());
+ DBDirectClient client(opCtx.get());
+ // Only incremented in step (1), so it counts the tasks marked as "processing" alone.
+ int nRescheduledTasks = 0;
+
+ // (1) register range deletion tasks marked as "processing" and collect their completion futures
+ auto processingTasksCompletionFuture = [&] {
+ std::vector<ExecutorFuture<void>> processingTasksCompletionFutures;
+ FindCommandRequest findCommand(NamespaceString::kRangeDeletionNamespace);
+ findCommand.setFilter(BSON(RangeDeletionTask::kProcessingFieldName << true));
+ auto cursor = client.find(std::move(findCommand));
+
+ while (cursor->more()) {
+ auto completionFuture = this->registerTask(
+ RangeDeletionTask::parse(IDLParserContext("rangeDeletionRecovery"),
+ cursor->next()),
+ SemiFuture<void>::makeReady(),
+ true /* fromResubmitOnStepUp */);
+ nRescheduledTasks++;
+ processingTasksCompletionFutures.push_back(
+ completionFuture.thenRunOn(_executor));
+ }
+
+ if (nRescheduledTasks > 1) {
+ LOGV2_WARNING(6834801,
+ "Rescheduling several range deletions marked as processing. "
+ "Orphans count may be off while they are not drained",
+ "numRangeDeletionsMarkedAsProcessing"_attr = nRescheduledTasks);
+ }
+ // Fold the collected futures into one shared future (an already-ready one when none was found).
+ return processingTasksCompletionFutures.size() > 0
+ ? whenAllSucceed(std::move(processingTasksCompletionFutures)).share()
+ : SemiFuture<void>::makeReady().share();
+ }();
+
+ // (2) register all other "non-pending" tasks, passing the future from (1) as registerTask's wait future
+ {
+ FindCommandRequest findCommand(NamespaceString::kRangeDeletionNamespace);
+ findCommand.setFilter(BSON(RangeDeletionTask::kProcessingFieldName
+ << BSON("$ne" << true)
+ << RangeDeletionTask::kPendingFieldName
+ << BSON("$ne" << true)));
+ auto cursor = client.find(std::move(findCommand));
+ while (cursor->more()) {
+ (void)this->registerTask(
+ RangeDeletionTask::parse(IDLParserContext("rangeDeletionRecovery"),
+ cursor->next()),
+ processingTasksCompletionFuture.thenRunOn(_executor).semi(),
+ true /* fromResubmitOnStepUp */);
+ }
+ }
+ // NOTE(review): the count logged below excludes the tasks registered in step (2) - confirm this is intended.
+ LOGV2_INFO(6834802,
+ "Finished resubmitting range deletion tasks",
+ "nRescheduledTasks"_attr = nRescheduledTasks);
+ // The service is marked as up only after both registration passes have run.
+ this->_state.store(kUp);
+ })
+ .getAsync([](auto) {});  // the outcome of the chain is ignored by this empty callback
}
void RangeDeleterService::onStepDown() {
@@ -124,7 +197,9 @@ BSONObj RangeDeleterService::dumpState() {
}
SharedSemiFuture<void> RangeDeleterService::registerTask(
- const RangeDeletionTask& rdt, SemiFuture<void>&& waitForActiveQueriesToComplete) {
+ const RangeDeletionTask& rdt,
+ SemiFuture<void>&& waitForActiveQueriesToComplete,
+ bool fromResubmitOnStepUp) {
if (disableResumableRangeDeleter.load()) {
return SemiFuture<void>::makeReady(
@@ -182,7 +257,8 @@ SharedSemiFuture<void> RangeDeleterService::registerTask(
.share();
auto [taskCompletionFuture, inserted] = [&]() -> std::pair<SharedSemiFuture<void>, bool> {
- auto lock = _acquireMutexFailIfServiceNotUp();
+ auto lock = fromResubmitOnStepUp ? _acquireMutexUnconditionally()
+ : _acquireMutexFailIfServiceNotUp();
auto [registeredTask, inserted] = _rangeDeletionTasks[rdt.getCollectionUuid()].insert(
std::make_shared<RangeDeletion>(RangeDeletion(rdt, chainCompletionFuture)));
auto retFuture = static_cast<RangeDeletion*>(registeredTask->get())->getCompletionFuture();
diff --git a/src/mongo/db/s/range_deleter_service.h b/src/mongo/db/s/range_deleter_service.h
index 75e2c4dd655..33144c3f190 100644
--- a/src/mongo/db/s/range_deleter_service.h
+++ b/src/mongo/db/s/range_deleter_service.h
@@ -120,7 +120,8 @@ public:
*/
SharedSemiFuture<void> registerTask(
const RangeDeletionTask& rdt,
- SemiFuture<void>&& waitForActiveQueriesToComplete = SemiFuture<void>::makeReady());
+ SemiFuture<void>&& waitForActiveQueriesToComplete = SemiFuture<void>::makeReady(),
+ bool fromResubmitOnStepUp = false);
/*
* Deregister a task from the range deleter service.
@@ -153,7 +154,7 @@ public:
private:
/* Asynchronously register range deletions on the service. To be called on step-up */
- void _recoverRangeDeletionsOnStepUp();
+ void _recoverRangeDeletionsOnStepUp(OperationContext* opCtx);
/* ReplicaSetAwareServiceShardSvr "empty implemented" methods */
void onStartup(OperationContext* opCtx) override final{};
diff --git a/src/mongo/db/s/range_deleter_service_test.cpp b/src/mongo/db/s/range_deleter_service_test.cpp
index d12da47c2f4..414f8b284d3 100644
--- a/src/mongo/db/s/range_deleter_service_test.cpp
+++ b/src/mongo/db/s/range_deleter_service_test.cpp
@@ -31,6 +31,7 @@
#include "mongo/bson/unordered_fields_bsonobj_comparator.h"
#include "mongo/db/catalog/create_collection.h"
#include "mongo/db/catalog_raii.h"
+#include "mongo/db/persistent_task_store.h"
#include "mongo/db/s/operation_sharding_state.h"
namespace mongo {
@@ -567,4 +568,59 @@ TEST_F(RangeDeleterServiceTest,
ASSERT(overlappingRangeFutureWhenDisabled.isReady());
}
+// Persists a random mix of pending, non-pending and non-pending-and-processing range deletion
+// tasks while the service is stepped down, then steps it up and checks that the number of tasks
+// registered for the collection equals the number of non-pending documents that were persisted.
+TEST_F(RangeDeleterServiceTest, RescheduleRangeDeletionTasksOnStepUp) {
+    PseudoRandom random(SecureRandom().nextInt64());
+    auto rds = RangeDeleterService::get(opCtx);
+
+    // Trigger step-down
+    rds->onStepDown();
+
+    // Random number of range deletion documents to generate (minimum 1, maximum 20).
+    int nRangeDeletionTasks = random.nextInt32(20) + 1;
+
+    PersistentTaskStore<RangeDeletionTask> store(NamespaceString::kRangeDeletionNamespace);
+
+    // Generate and persist range deleter tasks (some pending, some non-pending, some non-pending &&
+    // processing)
+    int nPending = 0, nNonPending = 0, nNonPendingAndProcessing = 0;
+    int minBound = 0;
+    for (int i = 0; i < nRangeDeletionTasks; i++) {
+        auto rangeDeletionTask = createRangeDeletionTask(
+            uuidCollA, BSON("a" << minBound), BSON("a" << minBound + 10), CleanWhenEnum::kDelayed);
+        minBound += 10;
+
+        // Use the bounded overload so that the draw is always one of 0, 1 or 2. An unbounded
+        // `nextInt32() % 3` is a signed remainder and may be -1 or -2; such a value selects no
+        // branch, so the task would be persisted without being counted in any bucket.
+        auto rand = random.nextInt32(3);
+        if (rand == 0) {
+            // Pending range deletion task
+            rangeDeletionTask.setPending(true);
+            nPending++;
+        } else if (rand == 1) {
+            // Non-pending range deletion task
+            rangeDeletionTask.setPending(false);
+            nNonPending++;
+        } else {
+            // rand == 2: non-pending and processing range deletion task
+            rangeDeletionTask.setPending(false);
+            rangeDeletionTask.setProcessing(true);
+            nNonPendingAndProcessing++;
+        }
+
+        store.add(opCtx, rangeDeletionTask);
+    }
+
+    // Trigger step-up
+    rds->onStepUpComplete(opCtx, 0L);
+
+    // Check that all non-pending tasks are being rescheduled
+    while (true) {
+        try {
+            ASSERT_EQ(nNonPending + nNonPendingAndProcessing,
+                      rds->getNumRangeDeletionTasksForCollection(uuidCollA));
+            break;
+        } catch (const ExceptionFor<ErrorCodes::NotYetInitialized>&) {
+            // Retry as long as range deletion tasks are being resubmitted
+        }
+    }
+}
+
} // namespace mongo