author     Pierlauro Sciarelli <pierlauro.sciarelli@mongodb.com>    2022-10-17 16:36:21 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>    2022-10-17 17:34:48 +0000
commit     46d15e1715071f615fe6ceb4c54cf2803f69b7fc (patch)
tree       4abe5c53649e7f3dafdf6451b1d41bad36580c83
parent     4b08c5ca99f670bf5137d81ee78947651aa727c6 (diff)
download   mongo-46d15e1715071f615fe6ceb4c54cf2803f69b7fc.tar.gz
SERVER-65558 Enable Feature flag for PM-2849
-rw-r--r--  buildscripts/resmokeconfig/fully_disabled_feature_flags.yml        2
-rw-r--r--  jstests/sharding/disable_resumable_range_deleter.js                 1
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_impl.cpp   5
-rw-r--r--  src/mongo/db/s/collection_sharding_runtime.cpp                      53
-rw-r--r--  src/mongo/db/s/collection_sharding_runtime_test.cpp                 51
-rw-r--r--  src/mongo/db/s/collection_sharding_state.cpp                        27
-rw-r--r--  src/mongo/db/s/collection_sharding_state_factory_shard.cpp          5
-rw-r--r--  src/mongo/db/s/metadata_manager.cpp                                  5
-rw-r--r--  src/mongo/db/s/metadata_manager.h                                    3
-rw-r--r--  src/mongo/db/s/metadata_manager_test.cpp                            17
-rw-r--r--  src/mongo/db/s/migration_coordinator.cpp                            70
-rw-r--r--  src/mongo/db/s/migration_coordinator.h                               4
-rw-r--r--  src/mongo/db/s/migration_source_manager.h                            2
-rw-r--r--  src/mongo/db/s/migration_util_test.cpp                               2
-rw-r--r--  src/mongo/db/s/range_deleter_service.cpp                            32
-rw-r--r--  src/mongo/db/s/range_deleter_service.h                              43
-rw-r--r--  src/mongo/db/s/range_deleter_service_test.cpp                       28
-rw-r--r--  src/mongo/db/s/range_deleter_service_test_util.cpp                   6
-rw-r--r--  src/mongo/db/s/sharding_server_status.cpp                            6
-rw-r--r--  src/mongo/s/sharding_feature_flags.idl                               3
20 files changed, 248 insertions, 117 deletions
diff --git a/buildscripts/resmokeconfig/fully_disabled_feature_flags.yml b/buildscripts/resmokeconfig/fully_disabled_feature_flags.yml
index 678b3c522c3..1188d8163cc 100644
--- a/buildscripts/resmokeconfig/fully_disabled_feature_flags.yml
+++ b/buildscripts/resmokeconfig/fully_disabled_feature_flags.yml
@@ -12,5 +12,3 @@
# released create the transactions collection index and is only meant to be enabled adhoc, so only
# its targeted tests should enable it.
- featureFlagAlwaysCreateConfigTransactionsPartialIndexOnStepUp
-# Disable 'featureFlagRangeDeleterService' as long as the service is not performing actual range deletions
-- featureFlagRangeDeleterService
diff --git a/jstests/sharding/disable_resumable_range_deleter.js b/jstests/sharding/disable_resumable_range_deleter.js
index ddf7c8a698f..760a937fbab 100644
--- a/jstests/sharding/disable_resumable_range_deleter.js
+++ b/jstests/sharding/disable_resumable_range_deleter.js
@@ -51,7 +51,6 @@ st.rs0.restart(0, {
});
jsTest.log("Shard0 should fail to submit the range deletion task on stepup.");
-checkLog.contains(st.rs0.getPrimary(), "Failed to submit range deletion task");
jsTest.log("Shard0 should fail to receive a range that overlaps the range deletion task.");
// The error from moveChunk gets wrapped as an OperationFailed error, so we have to check the error
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 677504159fc..bdd03bc69fe 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -112,6 +112,7 @@
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/cluster_identity_loader.h"
#include "mongo/s/grid.h"
+#include "mongo/s/sharding_feature_flags_gen.h"
#include "mongo/stdx/thread.h"
#include "mongo/transport/service_entry_point.h"
#include "mongo/util/assert_util.h"
@@ -951,7 +952,9 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook
// Note, these must be done after the configOpTime is recovered via
// ShardingStateRecovery::recover above, because they may trigger filtering metadata
// refreshes which should use the recovered configOpTime.
- migrationutil::resubmitRangeDeletionsOnStepUp(_service);
+ if (!mongo::feature_flags::gRangeDeleterService.isEnabledAndIgnoreFCV()) {
+ migrationutil::resubmitRangeDeletionsOnStepUp(_service);
+ }
migrationutil::resumeMigrationCoordinationsOnStepUp(opCtx);
migrationutil::resumeMigrationRecipientsOnStepUp(opCtx);
diff --git a/src/mongo/db/s/collection_sharding_runtime.cpp b/src/mongo/db/s/collection_sharding_runtime.cpp
index f112f54ada8..e2b02e7a452 100644
--- a/src/mongo/db/s/collection_sharding_runtime.cpp
+++ b/src/mongo/db/s/collection_sharding_runtime.cpp
@@ -33,9 +33,11 @@
#include "mongo/db/catalog_raii.h"
#include "mongo/db/global_settings.h"
#include "mongo/db/s/operation_sharding_state.h"
+#include "mongo/db/s/range_deleter_service.h"
#include "mongo/db/s/sharding_runtime_d_params_gen.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/logv2/log.h"
+#include "mongo/s/sharding_feature_flags_gen.h"
#include "mongo/util/duration.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
@@ -260,9 +262,14 @@ void CollectionShardingRuntime::clearFilteringMetadataForDroppedCollection(
SharedSemiFuture<void> CollectionShardingRuntime::cleanUpRange(ChunkRange const& range,
CleanWhen when) {
- stdx::lock_guard lk(_metadataManagerLock);
- invariant(_metadataType == MetadataType::kSharded);
- return _metadataManager->cleanUpRange(range, when == kDelayed);
+ if (!feature_flags::gRangeDeleterService.isEnabledAndIgnoreFCV()) {
+ stdx::lock_guard lk(_metadataManagerLock);
+ invariant(_metadataType == MetadataType::kSharded);
+ return _metadataManager->cleanUpRange(range, when == kDelayed);
+ }
+
+ // This method must never be called if the range deleter service feature flag is enabled
+ MONGO_UNREACHABLE;
}
Status CollectionShardingRuntime::waitForClean(OperationContext* opCtx,
@@ -271,9 +278,8 @@ Status CollectionShardingRuntime::waitForClean(OperationContext* opCtx,
ChunkRange orphanRange,
Date_t deadline) {
while (true) {
- boost::optional<SharedSemiFuture<void>> stillScheduled;
-
- {
+ const StatusWith<SharedSemiFuture<void>> swOrphanCleanupFuture =
+ [&]() -> StatusWith<SharedSemiFuture<void>> {
AutoGetCollection autoColl(opCtx, nss, MODE_IX);
auto* const self = CollectionShardingRuntime::get(opCtx, nss);
stdx::lock_guard lk(self->_metadataManagerLock);
@@ -287,16 +293,27 @@ Status CollectionShardingRuntime::waitForClean(OperationContext* opCtx,
"metadata reset"};
}
- stillScheduled = self->_metadataManager->trackOrphanedDataCleanup(orphanRange);
- if (!stillScheduled) {
- LOGV2_OPTIONS(21918,
- {logv2::LogComponent::kShardingMigration},
- "Finished waiting for deletion of {namespace} range {orphanRange}",
- "Finished waiting for deletion of orphans",
- "namespace"_attr = nss.ns(),
- "orphanRange"_attr = redact(orphanRange.toString()));
- return Status::OK();
+ if (feature_flags::gRangeDeleterService.isEnabledAndIgnoreFCV()) {
+ return RangeDeleterService::get(opCtx)->getOverlappingRangeDeletionsFuture(
+ self->_metadataManager->getCollectionUuid(), orphanRange);
+ } else {
+ return self->_metadataManager->trackOrphanedDataCleanup(orphanRange);
}
+ }();
+
+ if (!swOrphanCleanupFuture.isOK()) {
+ return swOrphanCleanupFuture.getStatus();
+ }
+
+ auto orphanCleanupFuture = std::move(swOrphanCleanupFuture.getValue());
+ if (orphanCleanupFuture.isReady()) {
+ LOGV2_OPTIONS(21918,
+ {logv2::LogComponent::kShardingMigration},
+ "Finished waiting for deletion of {namespace} range {orphanRange}",
+ "Finished waiting for deletion of orphans",
+ "namespace"_attr = nss.ns(),
+ "orphanRange"_attr = redact(orphanRange.toString()));
+ return Status::OK();
}
LOGV2_OPTIONS(21919,
@@ -307,12 +324,12 @@ Status CollectionShardingRuntime::waitForClean(OperationContext* opCtx,
"orphanRange"_attr = orphanRange);
try {
opCtx->runWithDeadline(
- deadline, ErrorCodes::ExceededTimeLimit, [&] { stillScheduled->get(opCtx); });
+ deadline, ErrorCodes::ExceededTimeLimit, [&] { orphanCleanupFuture.get(opCtx); });
} catch (const DBException& ex) {
auto result = ex.toStatus();
// Swallow RangeDeletionAbandonedBecauseCollectionWithUUIDDoesNotExist error since the
- // collection could either never exist or get dropped directly from the shard after
- // the range deletion task got scheduled.
+ // collection could either never exist or get dropped directly from the shard after the
+ // range deletion task got scheduled.
if (result != ErrorCodes::RangeDeletionAbandonedBecauseCollectionWithUUIDDoesNotExist) {
return result.withContext(str::stream() << "Failed to delete orphaned " << nss.ns()
<< " range " << orphanRange.toString());
diff --git a/src/mongo/db/s/collection_sharding_runtime_test.cpp b/src/mongo/db/s/collection_sharding_runtime_test.cpp
index 1379772ba14..763e835d7f1 100644
--- a/src/mongo/db/s/collection_sharding_runtime_test.cpp
+++ b/src/mongo/db/s/collection_sharding_runtime_test.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/s/collection_sharding_runtime.h"
#include "mongo/db/s/global_index_ddl_util.h"
#include "mongo/db/s/operation_sharding_state.h"
+#include "mongo/db/s/range_deleter_service_test.h"
#include "mongo/db/s/range_deletion_task_gen.h"
#include "mongo/db/s/shard_filtering_metadata_refresh.h"
#include "mongo/db/s/shard_server_test_fixture.h"
@@ -378,12 +379,17 @@ public:
AutoGetCollection autoColl(operationContext(), kTestNss, MODE_IX);
_uuid = autoColl.getCollection()->uuid();
+
+ RangeDeleterService::get(operationContext())->onStepUpComplete(operationContext(), 0L);
+ RangeDeleterService::get(operationContext())->_waitForRangeDeleterServiceUp_FOR_TESTING();
}
void tearDown() override {
DBDirectClient client(operationContext());
client.dropCollection(kTestNss.ns());
+ RangeDeleterService::get(operationContext())->onStepDown();
+ RangeDeleterService::get(operationContext())->onShutdown();
WaitForMajorityService::get(getServiceContext()).shutDown();
CollectionShardingRuntimeTest::tearDown();
}
@@ -411,25 +417,16 @@ private:
// updates on the range deletions namespace though, so we can get around the issue by inserting
// the task with the 'pending' field set, and then remove the field using a replacement update
// after.
-RangeDeletionTask insertRangeDeletionTask(OperationContext* opCtx,
+RangeDeletionTask createRangeDeletionTask(OperationContext* opCtx,
const NamespaceString& nss,
const UUID& uuid,
const ChunkRange& range,
int64_t numOrphans) {
- PersistentTaskStore<RangeDeletionTask> store(NamespaceString::kRangeDeletionNamespace);
auto migrationId = UUID::gen();
- RangeDeletionTask t(migrationId, nss, uuid, ShardId("donor"), range, CleanWhenEnum::kDelayed);
- t.setPending(true);
+ RangeDeletionTask t(migrationId, nss, uuid, ShardId("donor"), range, CleanWhenEnum::kNow);
t.setNumOrphanDocs(numOrphans);
const auto currentTime = VectorClock::get(opCtx)->getTime();
t.setTimestamp(currentTime.clusterTime().asTimestamp());
- store.add(opCtx, t);
-
- auto query = BSON(RangeDeletionTask::kIdFieldName << migrationId);
- t.setPending(boost::none);
- auto update = t.toBSON();
- store.update(opCtx, query, update);
-
return t;
}
@@ -480,14 +477,18 @@ TEST_F(CollectionShardingRuntimeWithRangeDeleterTest,
WaitForCleanBlocksBehindOneScheduledDeletion) {
// Enable fail point to suspendRangeDeletion.
globalFailPointRegistry().find("suspendRangeDeletion")->setMode(FailPoint::alwaysOn);
+ ScopeGuard resetFailPoint(
+ [=] { globalFailPointRegistry().find("suspendRangeDeletion")->setMode(FailPoint::off); });
+
OperationContext* opCtx = operationContext();
auto metadata = makeShardedMetadata(opCtx, uuid());
csr().setFilteringMetadata(opCtx, metadata);
const ChunkRange range = ChunkRange(BSON(kShardKey << MINKEY), BSON(kShardKey << MAXKEY));
- const auto task = insertRangeDeletionTask(opCtx, kTestNss, uuid(), range, 0);
- auto cleanupComplete = csr().cleanUpRange(range, CollectionShardingRuntime::CleanWhen::kNow);
+ const auto task = createRangeDeletionTask(opCtx, kTestNss, uuid(), range, 0);
+ auto taskCompletionFuture = registerAndCreatePersistentTask(
+ opCtx, task, SemiFuture<void>::makeReady() /* waitForActiveQueries */);
opCtx->setDeadlineAfterNowBy(Milliseconds(100), ErrorCodes::MaxTimeMSExpired);
auto status =
@@ -496,7 +497,7 @@ TEST_F(CollectionShardingRuntimeWithRangeDeleterTest,
ASSERT_EQ(status.code(), ErrorCodes::MaxTimeMSExpired);
globalFailPointRegistry().find("suspendRangeDeletion")->setMode(FailPoint::off);
- cleanupComplete.get();
+ taskCompletionFuture.get();
}
TEST_F(CollectionShardingRuntimeWithRangeDeleterTest,
@@ -507,15 +508,15 @@ TEST_F(CollectionShardingRuntimeWithRangeDeleterTest,
const auto middleKey = 5;
const ChunkRange range1 = ChunkRange(BSON(kShardKey << MINKEY), BSON(kShardKey << middleKey));
- const auto task1 = insertRangeDeletionTask(opCtx, kTestNss, uuid(), range1, 0);
+ const auto task1 = createRangeDeletionTask(opCtx, kTestNss, uuid(), range1, 0);
const ChunkRange range2 = ChunkRange(BSON(kShardKey << middleKey), BSON(kShardKey << MAXKEY));
- const auto task2 = insertRangeDeletionTask(opCtx, kTestNss, uuid(), range2, 0);
+ const auto task2 = createRangeDeletionTask(opCtx, kTestNss, uuid(), range2, 0);
- auto cleanupCompleteFirst =
- csr().cleanUpRange(range1, CollectionShardingRuntime::CleanWhen::kNow);
+ auto cleanupCompleteFirst = registerAndCreatePersistentTask(
+ opCtx, task1, SemiFuture<void>::makeReady() /* waitForActiveQueries */);
- auto cleanupCompleteSecond =
- csr().cleanUpRange(range2, CollectionShardingRuntime::CleanWhen::kNow);
+ auto cleanupCompleteSecond = registerAndCreatePersistentTask(
+ opCtx, task2, SemiFuture<void>::makeReady() /* waitForActiveQueries */);
auto status = CollectionShardingRuntime::waitForClean(
opCtx,
@@ -539,9 +540,10 @@ TEST_F(CollectionShardingRuntimeWithRangeDeleterTest,
auto metadata = makeShardedMetadata(opCtx, uuid());
csr().setFilteringMetadata(opCtx, metadata);
const ChunkRange range = ChunkRange(BSON(kShardKey << MINKEY), BSON(kShardKey << MAXKEY));
- const auto task = insertRangeDeletionTask(opCtx, kTestNss, uuid(), range, 0);
+ const auto task = createRangeDeletionTask(opCtx, kTestNss, uuid(), range, 0);
- auto cleanupComplete = csr().cleanUpRange(range, CollectionShardingRuntime::CleanWhen::kNow);
+ auto cleanupComplete = registerAndCreatePersistentTask(
+ opCtx, task, SemiFuture<void>::makeReady() /* waitForActiveQueries */);
auto status =
CollectionShardingRuntime::waitForClean(opCtx, kTestNss, uuid(), range, Date_t::max());
@@ -560,10 +562,11 @@ TEST_F(CollectionShardingRuntimeWithRangeDeleterTest,
auto metadata = makeShardedMetadata(opCtx, uuid());
csr().setFilteringMetadata(opCtx, metadata);
const ChunkRange range = ChunkRange(BSON(kShardKey << MINKEY), BSON(kShardKey << MAXKEY));
- const auto task = insertRangeDeletionTask(opCtx, kTestNss, uuid(), range, 0);
+ const auto task = createRangeDeletionTask(opCtx, kTestNss, uuid(), range, 0);
// Schedule range deletion that will hang due to `suspendRangeDeletion` failpoint
- auto cleanupComplete = csr().cleanUpRange(range, CollectionShardingRuntime::CleanWhen::kNow);
+ auto cleanupComplete = registerAndCreatePersistentTask(
+ opCtx, task, SemiFuture<void>::makeReady() /* waitForActiveQueries */);
// Clear and set again filtering metadata
csr().clearFilteringMetadata(opCtx);
diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp
index 17a9127e70f..cf8fed2816e 100644
--- a/src/mongo/db/s/collection_sharding_state.cpp
+++ b/src/mongo/db/s/collection_sharding_state.cpp
@@ -30,6 +30,7 @@
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/logv2/log.h"
+#include "mongo/s/sharding_feature_flags_gen.h"
#include "mongo/util/string_map.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
@@ -82,18 +83,20 @@ public:
}
void appendInfoForServerStatus(BSONObjBuilder* builder) {
- auto totalNumberOfRangesScheduledForDeletion = ([this] {
- stdx::lock_guard lg(_mutex);
- return std::accumulate(_collections.begin(),
- _collections.end(),
- 0LL,
- [](long long total, const auto& coll) {
- return total +
- coll.second->numberOfRangesScheduledForDeletion();
- });
- })();
-
- builder->appendNumber("rangeDeleterTasks", totalNumberOfRangesScheduledForDeletion);
+ if (!mongo::feature_flags::gRangeDeleterService.isEnabledAndIgnoreFCV()) {
+ auto totalNumberOfRangesScheduledForDeletion = ([this] {
+ stdx::lock_guard lg(_mutex);
+ return std::accumulate(_collections.begin(),
+ _collections.end(),
+ 0LL,
+ [](long long total, const auto& coll) {
+ return total +
+ coll.second->numberOfRangesScheduledForDeletion();
+ });
+ })();
+
+ builder->appendNumber("rangeDeleterTasks", totalNumberOfRangesScheduledForDeletion);
+ }
}
std::vector<NamespaceString> getCollectionNames() {
diff --git a/src/mongo/db/s/collection_sharding_state_factory_shard.cpp b/src/mongo/db/s/collection_sharding_state_factory_shard.cpp
index 8a1b65fa270..74939c03ffe 100644
--- a/src/mongo/db/s/collection_sharding_state_factory_shard.cpp
+++ b/src/mongo/db/s/collection_sharding_state_factory_shard.cpp
@@ -36,6 +36,7 @@
#include "mongo/executor/network_interface_factory.h"
#include "mongo/executor/network_interface_thread_pool.h"
#include "mongo/executor/thread_pool_task_executor.h"
+#include "mongo/s/sharding_feature_flags_gen.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
@@ -64,6 +65,10 @@ std::unique_ptr<CollectionShardingState> CollectionShardingStateFactoryShard::ma
std::shared_ptr<executor::TaskExecutor>
CollectionShardingStateFactoryShard::_getRangeDeletionExecutor() {
+ if (feature_flags::gRangeDeleterService.isEnabledAndIgnoreFCV()) {
+ return nullptr;
+ }
+
stdx::lock_guard<Latch> lg(_mutex);
if (!_rangeDeletionExecutor) {
const std::string kExecName("CollectionRangeDeleter-TaskExecutor");
diff --git a/src/mongo/db/s/metadata_manager.cpp b/src/mongo/db/s/metadata_manager.cpp
index 96542f14ef2..1d9c63f77e5 100644
--- a/src/mongo/db/s/metadata_manager.cpp
+++ b/src/mongo/db/s/metadata_manager.cpp
@@ -320,8 +320,7 @@ size_t MetadataManager::numberOfRangesScheduledForDeletion() const {
return _rangesScheduledForDeletion.size();
}
-boost::optional<SharedSemiFuture<void>> MetadataManager::trackOrphanedDataCleanup(
- ChunkRange const& range) const {
+SharedSemiFuture<void> MetadataManager::trackOrphanedDataCleanup(ChunkRange const& range) const {
stdx::lock_guard<Latch> lg(_managerLock);
for (const auto& [orphanRange, deletionComplete] : _rangesScheduledForDeletion) {
if (orphanRange.overlapWith(range)) {
@@ -329,7 +328,7 @@ boost::optional<SharedSemiFuture<void>> MetadataManager::trackOrphanedDataCleanu
}
}
- return boost::none;
+ return SemiFuture<void>::makeReady().share();
}
SharedSemiFuture<void> MetadataManager::getOngoingQueriesCompletionFuture(ChunkRange const& range) {
diff --git a/src/mongo/db/s/metadata_manager.h b/src/mongo/db/s/metadata_manager.h
index 5343e7cc123..153dbb5f55f 100644
--- a/src/mongo/db/s/metadata_manager.h
+++ b/src/mongo/db/s/metadata_manager.h
@@ -142,8 +142,7 @@ public:
* returns a future that will be resolved when the newest overlapping range's deletion (possibly
* the one of interest) completes or fails.
*/
- boost::optional<SharedSemiFuture<void>> trackOrphanedDataCleanup(
- ChunkRange const& orphans) const;
+ SharedSemiFuture<void> trackOrphanedDataCleanup(ChunkRange const& orphans) const;
/**
* Returns a future marked as ready when all the ongoing queries retaining the range complete
diff --git a/src/mongo/db/s/metadata_manager_test.cpp b/src/mongo/db/s/metadata_manager_test.cpp
index fca1894abf4..1d5244e080e 100644
--- a/src/mongo/db/s/metadata_manager_test.cpp
+++ b/src/mongo/db/s/metadata_manager_test.cpp
@@ -44,6 +44,7 @@
#include "mongo/db/service_context.h"
#include "mongo/db/vector_clock.h"
#include "mongo/executor/task_executor.h"
+#include "mongo/idl/server_parameter_test_util.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/stdx/condition_variable.h"
@@ -214,6 +215,7 @@ RangeDeletionTask insertRangeDeletionTask(OperationContext* opCtx,
}
TEST_F(MetadataManagerTest, TrackOrphanedDataCleanupBlocksOnScheduledRangeDeletions) {
+ RAIIServerParameterControllerForTest enableFeatureFlag{"featureFlagRangeDeleterService", false};
ChunkRange cr1(BSON("key" << 0), BSON("key" << 10));
const auto task =
insertRangeDeletionTask(operationContext(), kNss, _manager->getCollectionUuid(), cr1, 0);
@@ -225,14 +227,15 @@ TEST_F(MetadataManagerTest, TrackOrphanedDataCleanupBlocksOnScheduledRangeDeleti
ASSERT_FALSE(notifn1.isReady());
ASSERT_EQ(_manager->numberOfRangesToClean(), 1UL);
- auto optNotifn = _manager->trackOrphanedDataCleanup(cr1);
+ auto future = _manager->trackOrphanedDataCleanup(cr1);
ASSERT_FALSE(notifn1.isReady());
- ASSERT_FALSE(optNotifn->isReady());
+ ASSERT_FALSE(future.isReady());
globalFailPointRegistry().find("suspendRangeDeletion")->setMode(FailPoint::off);
}
TEST_F(MetadataManagerTest, CleanupNotificationsAreSignaledWhenMetadataManagerIsDestroyed) {
+ RAIIServerParameterControllerForTest enableFeatureFlag{"featureFlagRangeDeleterService", false};
const ChunkRange rangeToClean(BSON("key" << 20), BSON("key" << 30));
const auto task = insertRangeDeletionTask(
operationContext(), kNss, _manager->getCollectionUuid(), rangeToClean, 0);
@@ -254,15 +257,14 @@ TEST_F(MetadataManagerTest, CleanupNotificationsAreSignaledWhenMetadataManagerIs
auto notif = _manager->cleanUpRange(rangeToClean, false /*delayBeforeDeleting*/);
ASSERT(!notif.isReady());
- auto optNotif = _manager->trackOrphanedDataCleanup(rangeToClean);
- ASSERT(optNotif);
- ASSERT(!optNotif->isReady());
+ auto future = _manager->trackOrphanedDataCleanup(rangeToClean);
+ ASSERT(!future.isReady());
// Reset the original shared_ptr. The cursorOnMovedMetadata will still contain its own copy of
// the shared_ptr though, so the destructor of ~MetadataManager won't yet be called.
_manager.reset();
ASSERT(!notif.isReady());
- ASSERT(!optNotif->isReady());
+ ASSERT(!future.isReady());
// Destroys the ScopedCollectionDescription object and causes the destructor of MetadataManager
// to run, which should trigger all deletion notifications.
@@ -275,7 +277,7 @@ TEST_F(MetadataManagerTest, CleanupNotificationsAreSignaledWhenMetadataManagerIs
}
notif.wait();
- optNotif->wait();
+ future.wait();
}
TEST_F(MetadataManagerTest, RefreshAfterSuccessfulMigrationSinglePending) {
@@ -327,6 +329,7 @@ TEST_F(MetadataManagerTest, BeginReceiveWithOverlappingRange) {
// Tests membership functions for _rangesToClean
TEST_F(MetadataManagerTest, RangesToCleanMembership) {
+ RAIIServerParameterControllerForTest enableFeatureFlag{"featureFlagRangeDeleterService", false};
ChunkRange cr(BSON("key" << 0), BSON("key" << 10));
const auto task =
insertRangeDeletionTask(operationContext(), kNss, _manager->getCollectionUuid(), cr, 0);
diff --git a/src/mongo/db/s/migration_coordinator.cpp b/src/mongo/db/s/migration_coordinator.cpp
index feaf7252cc2..febea6e0fc8 100644
--- a/src/mongo/db/s/migration_coordinator.cpp
+++ b/src/mongo/db/s/migration_coordinator.cpp
@@ -29,7 +29,10 @@
#include "mongo/db/s/migration_coordinator.h"
+#include "mongo/db/catalog_raii.h"
+#include "mongo/db/s/collection_sharding_runtime.h"
#include "mongo/db/s/migration_util.h"
+#include "mongo/db/s/range_deleter_service.h"
#include "mongo/db/s/range_deletion_task_gen.h"
#include "mongo/db/s/range_deletion_util.h"
#include "mongo/db/session/logical_session_id_helpers.h"
@@ -142,7 +145,8 @@ void MigrationCoordinator::setMigrationDecision(DecisionEnum decision) {
}
-boost::optional<SemiFuture<void>> MigrationCoordinator::completeMigration(OperationContext* opCtx) {
+boost::optional<SharedSemiFuture<void>> MigrationCoordinator::completeMigration(
+ OperationContext* opCtx) {
auto decision = _migrationInfo.getDecision();
if (!decision) {
LOGV2(
@@ -165,7 +169,7 @@ boost::optional<SemiFuture<void>> MigrationCoordinator::completeMigration(Operat
launchReleaseRecipientCriticalSection(opCtx);
}
- boost::optional<SemiFuture<void>> cleanupCompleteFuture = boost::none;
+ boost::optional<SharedSemiFuture<void>> cleanupCompleteFuture = boost::none;
switch (*decision) {
case DecisionEnum::kAborted:
@@ -183,7 +187,7 @@ boost::optional<SemiFuture<void>> MigrationCoordinator::completeMigration(Operat
return cleanupCompleteFuture;
}
-SemiFuture<void> MigrationCoordinator::_commitMigrationOnDonorAndRecipient(
+SharedSemiFuture<void> MigrationCoordinator::_commitMigrationOnDonorAndRecipient(
OperationContext* opCtx) {
hangBeforeMakingCommitDecisionDurable.pauseWhileSet();
@@ -233,19 +237,6 @@ SemiFuture<void> MigrationCoordinator::_commitMigrationOnDonorAndRecipient(
_migrationInfo.getRange(),
_migrationInfo.getId());
- LOGV2_DEBUG(23897,
- 2,
- "Marking range deletion task on donor as ready for processing",
- "migrationId"_attr = _migrationInfo.getId());
- migrationutil::markAsReadyRangeDeletionTaskLocally(
- opCtx, _migrationInfo.getCollectionUuid(), _migrationInfo.getRange());
-
- // At this point the decision cannot be changed and will be recovered in the event of a
- // failover, so it is safe to schedule the deletion task after updating the persisted state.
- LOGV2_DEBUG(23898,
- 2,
- "Scheduling range deletion task on donor",
- "migrationId"_attr = _migrationInfo.getId());
RangeDeletionTask deletionTask(_migrationInfo.getId(),
_migrationInfo.getNss(),
_migrationInfo.getCollectionUuid(),
@@ -254,7 +245,52 @@ SemiFuture<void> MigrationCoordinator::_commitMigrationOnDonorAndRecipient(
_waitForDelete ? CleanWhenEnum::kNow : CleanWhenEnum::kDelayed);
const auto currentTime = VectorClock::get(opCtx)->getTime();
deletionTask.setTimestamp(currentTime.clusterTime().asTimestamp());
- return migrationutil::submitRangeDeletionTask(opCtx, deletionTask).semi();
+
+ if (!feature_flags::gRangeDeleterService.isEnabledAndIgnoreFCV()) {
+ LOGV2_DEBUG(23897,
+ 2,
+ "Marking range deletion task on donor as ready for processing",
+ "migrationId"_attr = _migrationInfo.getId());
+ migrationutil::markAsReadyRangeDeletionTaskLocally(
+ opCtx, _migrationInfo.getCollectionUuid(), _migrationInfo.getRange());
+
+ // At this point the decision cannot be changed and will be recovered in the event of a
+ // failover, so it is safe to schedule the deletion task after updating the persisted state.
+ LOGV2_DEBUG(23898,
+ 2,
+ "Scheduling range deletion task on donor",
+ "migrationId"_attr = _migrationInfo.getId());
+
+ return migrationutil::submitRangeDeletionTask(opCtx, deletionTask).share();
+ }
+
+
+ auto waitForActiveQueriesToComplete = [&]() {
+ AutoGetCollection autoColl(opCtx, deletionTask.getNss(), MODE_IS);
+ return CollectionShardingRuntime::get(opCtx, deletionTask.getNss())
+ ->getOngoingQueriesCompletionFuture(deletionTask.getCollectionUuid(),
+ deletionTask.getRange())
+ .semi();
+ }();
+
+ // Register the range deletion task as pending in order to get the completion future
+ auto rangeDeletionCompletionFuture =
+ RangeDeleterService::get(opCtx)->registerTask(deletionTask,
+ std::move(waitForActiveQueriesToComplete),
+ false /* fromStepUp*/,
+ true /* pending */);
+
+ LOGV2_DEBUG(6555800,
+ 2,
+ "Marking range deletion task on donor as ready for processing",
+ "rangeDeletion"_attr = deletionTask);
+
+ // Mark the range deletion task document as non-pending in order to unblock the previously
+ // registered range deletion
+ migrationutil::markAsReadyRangeDeletionTaskLocally(
+ opCtx, deletionTask.getCollectionUuid(), deletionTask.getRange());
+
+ return rangeDeletionCompletionFuture;
}
void MigrationCoordinator::_abortMigrationOnDonorAndRecipient(OperationContext* opCtx) {
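The donor commit path above replaces the single submitRangeDeletionTask call with a two-step handshake against the range deleter service: the task is first registered in memory as pending, so its completion future exists before any deletion work can start, and the persisted document is then marked non-pending, which unblocks the registered deletion. The sketch below condenses that ordering under the assumption that the calls keep the signatures shown in this diff; logging and error handling are omitted and the function name is illustrative.

// Condensed sketch of the handshake introduced above (LOGV2 calls and
// error handling omitted); not a drop-in replacement for the real method.
SharedSemiFuture<void> scheduleDonorRangeDeletion(OperationContext* opCtx,
                                                  const RangeDeletionTask& deletionTask,
                                                  SemiFuture<void>&& waitForActiveQueries) {
    // Step 1: register the task as pending. The completion future is
    // returned immediately, but the deletion chain stays blocked.
    auto completionFuture =
        RangeDeleterService::get(opCtx)->registerTask(deletionTask,
                                                      std::move(waitForActiveQueries),
                                                      false /* fromResubmitOnStepUp */,
                                                      true /* pending */);

    // Step 2: flip the persisted range deletion document to non-pending.
    // Per the comments in the diff, this unblocks the previously registered
    // task, and a failover after this point recovers it as ready on step-up.
    migrationutil::markAsReadyRangeDeletionTaskLocally(
        opCtx, deletionTask.getCollectionUuid(), deletionTask.getRange());

    return completionFuture;
}
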
diff --git a/src/mongo/db/s/migration_coordinator.h b/src/mongo/db/s/migration_coordinator.h
index 5c24400ef7c..da0afc0a309 100644
--- a/src/mongo/db/s/migration_coordinator.h
+++ b/src/mongo/db/s/migration_coordinator.h
@@ -89,7 +89,7 @@ public:
* If the decision was to commit, returns a future that is set when range deletion for
* the donated range completes.
*/
- boost::optional<SemiFuture<void>> completeMigration(OperationContext* opCtx);
+ boost::optional<SharedSemiFuture<void>> completeMigration(OperationContext* opCtx);
/**
* Deletes the persistent state for this migration from config.migrationCoordinators.
@@ -109,7 +109,7 @@ private:
* the donor as ready to be processed. Returns a future that is set when range deletion for
* the donated range completes.
*/
- SemiFuture<void> _commitMigrationOnDonorAndRecipient(OperationContext* opCtx);
+ SharedSemiFuture<void> _commitMigrationOnDonorAndRecipient(OperationContext* opCtx);
/**
* Deletes the range deletion task from the donor node and marks the range deletion task on the
diff --git a/src/mongo/db/s/migration_source_manager.h b/src/mongo/db/s/migration_source_manager.h
index 756d1828014..b7819432c49 100644
--- a/src/mongo/db/s/migration_source_manager.h
+++ b/src/mongo/db/s/migration_source_manager.h
@@ -290,7 +290,7 @@ private:
// Optional future that is populated if the migration succeeds and range deletion is scheduled
// on this node. The future is set when the range deletion completes. Used if the moveChunk was
// sent with waitForDelete.
- boost::optional<SemiFuture<void>> _cleanupCompleteFuture;
+ boost::optional<SharedSemiFuture<void>> _cleanupCompleteFuture;
};
} // namespace mongo
diff --git a/src/mongo/db/s/migration_util_test.cpp b/src/mongo/db/s/migration_util_test.cpp
index ee6e189178d..d023db60ab1 100644
--- a/src/mongo/db/s/migration_util_test.cpp
+++ b/src/mongo/db/s/migration_util_test.cpp
@@ -490,6 +490,8 @@ public:
CatalogCacheLoaderMock* _mockCatalogCacheLoader;
StaticCatalogClient* _mockCatalogClient;
+
+ RAIIServerParameterControllerForTest enableFeatureFlag{"featureFlagRangeDeleterService", false};
};
TEST_F(SubmitRangeDeletionTaskTest,
diff --git a/src/mongo/db/s/range_deleter_service.cpp b/src/mongo/db/s/range_deleter_service.cpp
index dd94e9be392..267905c0632 100644
--- a/src/mongo/db/s/range_deleter_service.cpp
+++ b/src/mongo/db/s/range_deleter_service.cpp
@@ -427,7 +427,8 @@ long long RangeDeleterService::totalNumOfRegisteredTasks() {
SharedSemiFuture<void> RangeDeleterService::registerTask(
const RangeDeletionTask& rdt,
SemiFuture<void>&& waitForActiveQueriesToComplete,
- bool fromResubmitOnStepUp) {
+ bool fromResubmitOnStepUp,
+ bool pending) {
if (disableResumableRangeDeleter.load()) {
LOGV2_INFO(6872509,
@@ -439,10 +440,13 @@ SharedSemiFuture<void> RangeDeleterService::registerTask(
.share();
}
- auto scheduleRangeDeletionChain = [&]() {
- // Step 1: wait for ongoing queries retaining the range to drain
- (void)std::move(waitForActiveQueriesToComplete)
- .thenRunOn(_executor)
+ auto scheduleRangeDeletionChain = [&](SharedSemiFuture<void> pendingFuture) {
+ (void)pendingFuture.thenRunOn(_executor)
+ .then([this,
+ waitForOngoingQueries = std::move(waitForActiveQueriesToComplete).share()]() {
+ // Step 1: wait for ongoing queries retaining the range to drain
+ return waitForOngoingQueries;
+ })
.then([this, when = rdt.getWhenToClean()]() {
// Step 2: schedule wait for secondaries orphans cleanup delay
const auto delayForActiveQueriesOnSecondariesToComplete =
@@ -467,19 +471,23 @@ SharedSemiFuture<void> RangeDeleterService::registerTask(
auto lock =
fromResubmitOnStepUp ? _acquireMutexUnconditionally() : _acquireMutexFailIfServiceNotUp();
+
auto [registeredTask, firstRegistration] =
_rangeDeletionTasks[rdt.getCollectionUuid()].insert(std::make_shared<RangeDeletion>(rdt));
+ auto task = static_cast<RangeDeletion*>(registeredTask->get());
+
+ // Register the task on the service only once, duplicate registrations will join
if (firstRegistration) {
- scheduleRangeDeletionChain();
- } else {
- LOGV2_WARNING(6804200,
- "Tried to register duplicate range deletion task. This results in a no-op.",
- "collectionUUID"_attr = rdt.getCollectionUuid(),
- "range"_attr = rdt.getRange());
+ scheduleRangeDeletionChain(task->getPendingFuture());
+ }
+
+ // Allow future chain to progress in case the task is flagged as non-pending
+ if (!pending) {
+ task->clearPending();
}
- return static_cast<RangeDeletion*>(registeredTask->get())->getCompletionFuture();
+ return task->getCompletionFuture();
}
void RangeDeleterService::deregisterTask(const UUID& collUUID, const ChunkRange& range) {
diff --git a/src/mongo/db/s/range_deleter_service.h b/src/mongo/db/s/range_deleter_service.h
index 2b8293805e8..2c772053dc6 100644
--- a/src/mongo/db/s/range_deleter_service.h
+++ b/src/mongo/db/s/range_deleter_service.h
@@ -64,6 +64,16 @@ private:
}
}
+ SharedSemiFuture<void> getPendingFuture() {
+ return _pendingPromise.getFuture();
+ }
+
+ void clearPending() {
+ if (!_pendingPromise.getFuture().isReady()) {
+ _pendingPromise.emplaceValue();
+ }
+ }
+
SharedSemiFuture<void> getCompletionFuture() const {
return _completionPromise.getFuture().semi().share();
}
@@ -75,6 +85,8 @@ private:
private:
// Marked ready once the range deletion has been fully processed
SharedPromise<void> _completionPromise;
+
+ SharedPromise<void> _pendingPromise;
};
/*
@@ -98,8 +110,8 @@ private:
*/
class ReadyRangeDeletionsProcessor {
public:
- ReadyRangeDeletionsProcessor(OperationContext* opCtx)
- : _thread(stdx::thread([this] { _runRangeDeletions(); })) {
+ ReadyRangeDeletionsProcessor(OperationContext* opCtx) {
+ _thread = stdx::thread([this] { _runRangeDeletions(); });
stdx::unique_lock<Latch> lock(_mutex);
opCtx->waitForConditionOrInterrupt(
_condVar, lock, [&] { return _threadOpCtxHolder.is_initialized(); });
@@ -149,12 +161,8 @@ private:
*/
void _runRangeDeletions();
- /* Queue containing scheduled range deletions */
- std::queue<RangeDeletionTask> _queue;
- /* Thread consuming the range deletions queue */
- stdx::thread _thread;
- /* Pointer to the (one and only) operation context used by the thread */
- boost::optional<ServiceContext::UniqueOperationContext> _threadOpCtxHolder;
+ Mutex _mutex = MONGO_MAKE_LATCH("ReadyRangeDeletionsProcessor");
+
/*
* Condition variable notified when:
* - The component has been initialized (the operation context has been instantiated)
@@ -163,7 +171,14 @@ private:
*/
stdx::condition_variable _condVar;
- Mutex _mutex = MONGO_MAKE_LATCH("ReadyRangeDeletionsProcessor");
+ /* Queue containing scheduled range deletions */
+ std::queue<RangeDeletionTask> _queue;
+
+ /* Pointer to the (one and only) operation context used by the thread */
+ boost::optional<ServiceContext::UniqueOperationContext> _threadOpCtxHolder;
+
+ /* Thread consuming the range deletions queue */
+ stdx::thread _thread;
};
// Keeping track of per-collection registered range deletion tasks
@@ -203,12 +218,18 @@ public:
* Register a task on the range deleter service.
* Returns a future that will be marked ready once the range deletion will be completed.
*
- * In case of trying to register an already existing task, the future will contain an error.
+ * In case of trying to register an already existing task, the original future will be returned.
+ *
+ * A task can be registered only if the service is up (except for tasks resubmitted on step-up).
+ *
+ * When a task is registered as `pending`, it can be unblocked by calling again the same method
+ * with `pending=false`.
*/
SharedSemiFuture<void> registerTask(
const RangeDeletionTask& rdt,
SemiFuture<void>&& waitForActiveQueriesToComplete = SemiFuture<void>::makeReady(),
- bool fromResubmitOnStepUp = false);
+ bool fromResubmitOnStepUp = false,
+ bool pending = false);
/*
* Deregister a task from the range deleter service.
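The updated registerTask contract documented above (duplicate registrations join the original future; a pending registration stays blocked until the same task is registered again as non-pending) reads most easily as a usage sketch. The variable names below are illustrative and the default arguments are those shown in this header diff.

// Usage sketch of the documented registerTask semantics (names illustrative).
auto rds = RangeDeleterService::get(opCtx);

// First registration schedules the deletion chain and returns its completion future.
auto firstFuture = rds->registerTask(task, std::move(queriesDrainedFuture));

// Registering the same {collection UUID, range} again does not schedule a
// second chain: the original completion future is returned.
auto joinedFuture = rds->registerTask(task);

// A task registered with pending=true keeps its chain blocked until the
// same method is called again for it with pending=false.
auto pendingFuture = rds->registerTask(otherTask,
                                       SemiFuture<void>::makeReady(),
                                       false /* fromResubmitOnStepUp */,
                                       true /* pending */);
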
diff --git a/src/mongo/db/s/range_deleter_service_test.cpp b/src/mongo/db/s/range_deleter_service_test.cpp
index 5d6b49b7587..69807d52684 100644
--- a/src/mongo/db/s/range_deleter_service_test.cpp
+++ b/src/mongo/db/s/range_deleter_service_test.cpp
@@ -851,4 +851,32 @@ TEST_F(RangeDeleterServiceTest, GetOverlappingRangeDeletionsWithNonContiguousTas
ASSERT_OK(futureReadyWhenTask30Ready.getNoThrow(opCtx));
}
+TEST_F(RangeDeleterServiceTest, RegisterPendingTaskAndMarkItNonPending) {
+ // Set delay for waiting secondary queries to 0
+ int defaultOrphanCleanupDelaySecs = orphanCleanupDelaySecs.load();
+ ScopeGuard reset([=] { orphanCleanupDelaySecs.store(defaultOrphanCleanupDelaySecs); });
+ orphanCleanupDelaySecs.store(0);
+
+ auto rds = RangeDeleterService::get(opCtx);
+ auto taskWithOngoingQueries = rangeDeletionTask0ForCollA;
+
+ // Drain queries since the beginning
+ taskWithOngoingQueries->drainOngoingQueries();
+
+ // Register task as pending (will not be processed until someone registers it again as !pending)
+ auto completionFuture = rds->registerTask(taskWithOngoingQueries->getTask(),
+ taskWithOngoingQueries->getOngoingQueriesFuture(),
+ false /* from step up*/,
+ true /* pending */);
+
+ ASSERT(!completionFuture.isReady());
+
+ // Re-registering the task as non-pending must unblock the range deletion
+ registerAndCreatePersistentTask(
+ opCtx, taskWithOngoingQueries->getTask(), taskWithOngoingQueries->getOngoingQueriesFuture())
+ .get(opCtx);
+
+ ASSERT(completionFuture.isReady());
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/range_deleter_service_test_util.cpp b/src/mongo/db/s/range_deleter_service_test_util.cpp
index 607f51abbf8..fde50854c74 100644
--- a/src/mongo/db/s/range_deleter_service_test_util.cpp
+++ b/src/mongo/db/s/range_deleter_service_test_util.cpp
@@ -85,15 +85,15 @@ std::shared_ptr<RangeDeletionWithOngoingQueries> createRangeDeletionTaskWithOngo
createRangeDeletionTask(collectionUUID, min, max, whenToClean, pending, keyPattern));
}
-// TODO review this method: the task may be registered, finish and be recreated by inserting the
-// document
SharedSemiFuture<void> registerAndCreatePersistentTask(
OperationContext* opCtx,
const RangeDeletionTask& rdt,
SemiFuture<void>&& waitForActiveQueriesToComplete) {
auto rds = RangeDeleterService::get(opCtx);
- auto completionFuture = rds->registerTask(rdt, std::move(waitForActiveQueriesToComplete));
+ // Register task as `pending` in order to block it until the persistent document is non-pending
+ auto completionFuture = rds->registerTask(
+ rdt, std::move(waitForActiveQueriesToComplete), false /* fromStepUp */, true /* pending*/);
// Range deletion task will only proceed if persistent doc exists and its `pending` field
// doesn't exist
diff --git a/src/mongo/db/s/sharding_server_status.cpp b/src/mongo/db/s/sharding_server_status.cpp
index a765d9fd7e4..c47bfd6388e 100644
--- a/src/mongo/db/s/sharding_server_status.cpp
+++ b/src/mongo/db/s/sharding_server_status.cpp
@@ -33,6 +33,7 @@
#include "mongo/db/commands/server_status.h"
#include "mongo/db/s/active_migrations_registry.h"
#include "mongo/db/s/collection_sharding_state.h"
+#include "mongo/db/s/range_deleter_service.h"
#include "mongo/db/s/sharding_data_transform_cumulative_metrics.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/s/sharding_statistics.h"
@@ -125,6 +126,11 @@ public:
ShardingStatistics::get(opCtx).report(&result);
catalogCache->report(&result);
+ if (mongo::feature_flags::gRangeDeleterService.isEnabledAndIgnoreFCV()) {
+ auto nRangeDeletions = RangeDeleterService::get(opCtx)->totalNumOfRegisteredTasks();
+ result.appendNumber("rangeDeleterTasks", nRangeDeletions);
+ }
+
CollectionShardingState::appendInfoForServerStatus(opCtx, &result);
}
diff --git a/src/mongo/s/sharding_feature_flags.idl b/src/mongo/s/sharding_feature_flags.idl
index 917172718d0..218702b0496 100644
--- a/src/mongo/s/sharding_feature_flags.idl
+++ b/src/mongo/s/sharding_feature_flags.idl
@@ -54,7 +54,8 @@ feature_flags:
featureFlagRangeDeleterService:
description: "Feature flag protecting instantiation and usages of the range deleter service"
cpp_varname: feature_flags::gRangeDeleterService
- default: false
+ default: true
+ version: 6.2
featureFlagCollModCoordinatorV3:
description: "Feature for enabling new coll mod coordinator v3"
cpp_varname: feature_flags::gCollModCoordinatorV3