summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMax Hirschhorn <max.hirschhorn@mongodb.com>2022-09-10 15:55:51 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-09-10 16:34:21 +0000
commitfba24ea907dd3220fd2e3149c6ac81fd67b16254 (patch)
tree5c9a44373b9add8a6693dafd73424656a3569853
parent27c853347660310304d6fe2a3870a9a083aba953 (diff)
downloadmongo-fba24ea907dd3220fd2e3149c6ac81fd67b16254.tar.gz
Revert "SERVER-68348 Asynchronously register tasks on the range deleter service on step-up"
This reverts commit 30583a2c505c03e4d54bbb14cab170b225c071f2.
-rw-r--r--src/mongo/db/s/range_deleter_service.cpp90
-rw-r--r--src/mongo/db/s/range_deleter_service.h5
-rw-r--r--src/mongo/db/s/range_deleter_service_test.cpp56
3 files changed, 9 insertions, 142 deletions
diff --git a/src/mongo/db/s/range_deleter_service.cpp b/src/mongo/db/s/range_deleter_service.cpp
index acc97caeea9..63a2fb33cf8 100644
--- a/src/mongo/db/s/range_deleter_service.cpp
+++ b/src/mongo/db/s/range_deleter_service.cpp
@@ -28,13 +28,10 @@
*/
#include "mongo/db/s/range_deleter_service.h"
-#include "mongo/db/dbdirectclient.h"
#include "mongo/db/op_observer/op_observer_registry.h"
-#include "mongo/db/s/balancer_stats_registry.h"
#include "mongo/db/s/range_deleter_service_op_observer.h"
#include "mongo/logv2/log.h"
#include "mongo/s/sharding_feature_flags_gen.h"
-#include "mongo/util/future_util.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kShardingRangeDeleter
@@ -86,88 +83,18 @@ void RangeDeleterService::onStepUpComplete(OperationContext* opCtx, long long te
_executor = std::move(taskExecutor);
_executor->startup();
- _recoverRangeDeletionsOnStepUp(opCtx);
+ _recoverRangeDeletionsOnStepUp();
}
-void RangeDeleterService::_recoverRangeDeletionsOnStepUp(OperationContext* opCtx) {
+void RangeDeleterService::_recoverRangeDeletionsOnStepUp() {
+
if (disableResumableRangeDeleter.load()) {
_state.store(kDown);
return;
}
- LOGV2(6834800, "Resubmitting range deletion tasks");
-
- ServiceContext* serviceContext = opCtx->getServiceContext();
-
- ExecutorFuture<void>(_executor)
- .then([serviceContext, this] {
- ThreadClient tc("ResubmitRangeDeletionsOnStepUp", serviceContext);
- {
- stdx::lock_guard<Client> lk(*tc.get());
- tc->setSystemOperationKillableByStepdown(lk);
- }
- auto opCtx = tc->makeOperationContext();
- opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE();
-
- ScopedRangeDeleterLock rangeDeleterLock(opCtx.get());
- DBDirectClient client(opCtx.get());
-
- int nRescheduledTasks = 0;
-
- // (1) register range deletion tasks marked as "processing"
- auto processingTasksCompletionFuture = [&] {
- std::vector<ExecutorFuture<void>> processingTasksCompletionFutures;
- FindCommandRequest findCommand(NamespaceString::kRangeDeletionNamespace);
- findCommand.setFilter(BSON(RangeDeletionTask::kProcessingFieldName << true));
- auto cursor = client.find(std::move(findCommand));
-
- while (cursor->more()) {
- auto completionFuture = this->registerTask(
- RangeDeletionTask::parse(IDLParserContext("rangeDeletionRecovery"),
- cursor->next()),
- SemiFuture<void>::makeReady(),
- true /* fromResubmitOnStepUp */);
- nRescheduledTasks++;
- processingTasksCompletionFutures.push_back(
- completionFuture.thenRunOn(_executor));
- }
-
- if (nRescheduledTasks > 1) {
- LOGV2_WARNING(6834801,
- "Rescheduling several range deletions marked as processing. "
- "Orphans count may be off while they are not drained",
- "numRangeDeletionsMarkedAsProcessing"_attr = nRescheduledTasks);
- }
-
- return processingTasksCompletionFutures.size() > 0
- ? whenAllSucceed(std::move(processingTasksCompletionFutures)).share()
- : SemiFuture<void>::makeReady().share();
- }();
-
- // (2) register all other "non-pending" tasks
- {
- FindCommandRequest findCommand(NamespaceString::kRangeDeletionNamespace);
- findCommand.setFilter(BSON(RangeDeletionTask::kProcessingFieldName
- << BSON("$ne" << true)
- << RangeDeletionTask::kPendingFieldName
- << BSON("$ne" << true)));
- auto cursor = client.find(std::move(findCommand));
- while (cursor->more()) {
- (void)this->registerTask(
- RangeDeletionTask::parse(IDLParserContext("rangeDeletionRecovery"),
- cursor->next()),
- processingTasksCompletionFuture.thenRunOn(_executor).semi(),
- true /* fromResubmitOnStepUp */);
- }
- }
-
- LOGV2_INFO(6834802,
- "Finished resubmitting range deletion tasks",
- "nRescheduledTasks"_attr = nRescheduledTasks);
-
- this->_state.store(kUp);
- })
- .getAsync([](auto) {});
+ // TODO SERVER-68348 Asynchronously register tasks on the range deleter service on step-up
+ _state.store(kUp);
}
void RangeDeleterService::onStepDown() {
@@ -207,9 +134,7 @@ long long RangeDeleterService::totalNumOfRegisteredTasks() {
}
SharedSemiFuture<void> RangeDeleterService::registerTask(
- const RangeDeletionTask& rdt,
- SemiFuture<void>&& waitForActiveQueriesToComplete,
- bool fromResubmitOnStepUp) {
+ const RangeDeletionTask& rdt, SemiFuture<void>&& waitForActiveQueriesToComplete) {
if (disableResumableRangeDeleter.load()) {
return SemiFuture<void>::makeReady(
@@ -267,8 +192,7 @@ SharedSemiFuture<void> RangeDeleterService::registerTask(
.share();
auto [taskCompletionFuture, inserted] = [&]() -> std::pair<SharedSemiFuture<void>, bool> {
- auto lock = fromResubmitOnStepUp ? _acquireMutexUnconditionally()
- : _acquireMutexFailIfServiceNotUp();
+ auto lock = _acquireMutexFailIfServiceNotUp();
auto [registeredTask, inserted] = _rangeDeletionTasks[rdt.getCollectionUuid()].insert(
std::make_shared<RangeDeletion>(RangeDeletion(rdt, chainCompletionFuture)));
auto retFuture = static_cast<RangeDeletion*>(registeredTask->get())->getCompletionFuture();
diff --git a/src/mongo/db/s/range_deleter_service.h b/src/mongo/db/s/range_deleter_service.h
index a946865739c..4b1e09ce919 100644
--- a/src/mongo/db/s/range_deleter_service.h
+++ b/src/mongo/db/s/range_deleter_service.h
@@ -120,8 +120,7 @@ public:
*/
SharedSemiFuture<void> registerTask(
const RangeDeletionTask& rdt,
- SemiFuture<void>&& waitForActiveQueriesToComplete = SemiFuture<void>::makeReady(),
- bool fromResubmitOnStepUp = false);
+ SemiFuture<void>&& waitForActiveQueriesToComplete = SemiFuture<void>::makeReady());
/*
* Deregister a task from the range deleter service.
@@ -159,7 +158,7 @@ public:
private:
/* Asynchronously register range deletions on the service. To be called on on step-up */
- void _recoverRangeDeletionsOnStepUp(OperationContext* opCtx);
+ void _recoverRangeDeletionsOnStepUp();
/* ReplicaSetAwareServiceShardSvr "empty implemented" methods */
void onStartup(OperationContext* opCtx) override final{};
diff --git a/src/mongo/db/s/range_deleter_service_test.cpp b/src/mongo/db/s/range_deleter_service_test.cpp
index a6ba43ac540..c359d31725d 100644
--- a/src/mongo/db/s/range_deleter_service_test.cpp
+++ b/src/mongo/db/s/range_deleter_service_test.cpp
@@ -31,7 +31,6 @@
#include "mongo/bson/unordered_fields_bsonobj_comparator.h"
#include "mongo/db/catalog/create_collection.h"
#include "mongo/db/catalog_raii.h"
-#include "mongo/db/persistent_task_store.h"
#include "mongo/db/s/operation_sharding_state.h"
namespace mongo {
@@ -588,59 +587,4 @@ TEST_F(RangeDeleterServiceTest,
ASSERT(overlappingRangeFutureWhenDisabled.isReady());
}
-TEST_F(RangeDeleterServiceTest, RescheduleRangeDeletionTasksOnStepUp) {
- PseudoRandom random(SecureRandom().nextInt64());
- auto rds = RangeDeleterService::get(opCtx);
-
- // Trigger step-down
- rds->onStepDown();
-
- // Random number of range deletion documents to generate (minimum 1, maximum 20).
- int nRangeDeletionTasks = random.nextInt32(20) + 1;
-
- PersistentTaskStore<RangeDeletionTask> store(NamespaceString::kRangeDeletionNamespace);
-
- // Generate and persist range deleter tasks (some pending, some non-pending, some non-pending &&
- // processing)
- int nPending = 0, nNonPending = 0, nNonPendingAndProcessing = 0;
- int minBound = 0;
- for (int i = 0; i < nRangeDeletionTasks; i++) {
- auto rangeDeletionTask = createRangeDeletionTask(
- uuidCollA, BSON("a" << minBound), BSON("a" << minBound + 10), CleanWhenEnum::kDelayed);
- minBound += 10;
-
- auto rand = random.nextInt32() % 3;
- if (rand == 0) {
- // Pending range deletion task
- rangeDeletionTask.setPending(true);
- nPending++;
- } else if (rand == 1) {
- // Non-pending range deletion task
- rangeDeletionTask.setPending(false);
- nNonPending++;
- } else if (rand == 2) {
- // Non-pending and processing range deletion task
- rangeDeletionTask.setPending(false);
- rangeDeletionTask.setProcessing(true);
- nNonPendingAndProcessing++;
- }
-
- store.add(opCtx, rangeDeletionTask);
- }
-
- // Trigger step-up
- rds->onStepUpComplete(opCtx, 0L);
-
- // Check that all non-pending tasks are being rescheduled
- while (true) {
- try {
- ASSERT_EQ(nNonPending + nNonPendingAndProcessing,
- rds->getNumRangeDeletionTasksForCollection(uuidCollA));
- break;
- } catch (const ExceptionFor<ErrorCodes::NotYetInitialized>&) {
- // Retry as long as range deletion tasks are being resubmitted
- }
- }
-}
-
} // namespace mongo