summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorPierlauro Sciarelli <pierlauro.sciarelli@mongodb.com>2022-09-20 10:13:29 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-09-20 11:25:22 +0000
commitbdeb7d80139abd91d3db4eaa29df796375096ec4 (patch)
tree757278dd25166860d7228a149364975aa0400201 /src
parent4d58e73a530c35a43c62cebb0d827f6cd03e8a29 (diff)
downloadmongo-bdeb7d80139abd91d3db4eaa29df796375096ec4.tar.gz
SERVER-68725 Plug actual deletion in the range deleter service
Co-authored-by: Pierlauro Sciarelli <pierlauro.sciarelli@mongodb.com> Co-authored-by: Silvia Surroca <silvia.surroca@mongodb.com>
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/s/SConscript1
-rw-r--r--src/mongo/db/s/migration_coordinator.cpp3
-rw-r--r--src/mongo/db/s/migration_destination_manager.cpp6
-rw-r--r--src/mongo/db/s/migration_util.cpp24
-rw-r--r--src/mongo/db/s/migration_util.h11
-rw-r--r--src/mongo/db/s/migration_util_test.cpp5
-rw-r--r--src/mongo/db/s/range_deleter_service.cpp173
-rw-r--r--src/mongo/db/s/range_deleter_service_op_observer_test.cpp60
-rw-r--r--src/mongo/db/s/range_deleter_service_test.cpp400
-rw-r--r--src/mongo/db/s/range_deleter_service_test.h66
-rw-r--r--src/mongo/db/s/range_deleter_service_test_util.cpp177
-rw-r--r--src/mongo/db/s/range_deletion_util.cpp110
-rw-r--r--src/mongo/db/s/range_deletion_util.h59
-rw-r--r--src/mongo/db/s/range_deletion_util_test.cpp3
14 files changed, 770 insertions, 328 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index c1d894535fb..b67524f1834 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -647,6 +647,7 @@ env.CppUnitTest(
'operation_sharding_state_test.cpp',
'persistent_task_queue_test.cpp',
'range_deleter_service_test.cpp',
+ 'range_deleter_service_test_util.cpp',
'range_deleter_service_op_observer_test.cpp',
'range_deletion_util_test.cpp',
'resharding/resharding_agg_test.cpp',
diff --git a/src/mongo/db/s/migration_coordinator.cpp b/src/mongo/db/s/migration_coordinator.cpp
index 57ad09a9ded..5844d30c3a5 100644
--- a/src/mongo/db/s/migration_coordinator.cpp
+++ b/src/mongo/db/s/migration_coordinator.cpp
@@ -31,6 +31,7 @@
#include "mongo/db/s/migration_util.h"
#include "mongo/db/s/range_deletion_task_gen.h"
+#include "mongo/db/s/range_deletion_util.h"
#include "mongo/db/session/logical_session_id_helpers.h"
#include "mongo/db/vector_clock_mutable.h"
#include "mongo/logv2/log.h"
@@ -215,7 +216,7 @@ SemiFuture<void> MigrationCoordinator::_commitMigrationOnDonorAndRecipient(
const auto numOrphans = migrationutil::retrieveNumOrphansFromRecipient(opCtx, _migrationInfo);
if (numOrphans > 0) {
- migrationutil::persistUpdatedNumOrphans(
+ persistUpdatedNumOrphans(
opCtx, _migrationInfo.getCollectionUuid(), _migrationInfo.getRange(), numOrphans);
}
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index fee3a2f796b..72ee678b386 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -58,6 +58,7 @@
#include "mongo/db/s/move_timing_helper.h"
#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/range_deletion_task_gen.h"
+#include "mongo/db/s/range_deletion_util.h"
#include "mongo/db/s/shard_filtering_metadata_refresh.h"
#include "mongo/db/s/sharding_recovery_service.h"
#include "mongo/db/s/sharding_runtime_d_params_gen.h"
@@ -1379,7 +1380,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx,
// Revert to the original DocumentValidationSettings for opCtx
}
- migrationutil::persistUpdatedNumOrphans(
+ persistUpdatedNumOrphans(
opCtx, *_collectionUuid, ChunkRange(_min, _max), batchNumCloned);
{
@@ -1827,8 +1828,7 @@ bool MigrationDestinationManager::_applyMigrateOp(OperationContext* opCtx, const
}
if (changeInOrphans != 0) {
- migrationutil::persistUpdatedNumOrphans(
- opCtx, *_collectionUuid, ChunkRange(_min, _max), changeInOrphans);
+ persistUpdatedNumOrphans(opCtx, *_collectionUuid, ChunkRange(_min, _max), changeInOrphans);
}
return didAnything;
}
diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp
index 55083911f44..d3110cb8f13 100644
--- a/src/mongo/db/s/migration_util.cpp
+++ b/src/mongo/db/s/migration_util.cpp
@@ -643,30 +643,6 @@ void persistRangeDeletionTaskLocally(OperationContext* opCtx,
}
}
-void persistUpdatedNumOrphans(OperationContext* opCtx,
- const UUID& collectionUuid,
- const ChunkRange& range,
- long long changeInOrphans) {
- const auto query = getQueryFilterForRangeDeletionTask(collectionUuid, range);
- try {
- PersistentTaskStore<RangeDeletionTask> store(NamespaceString::kRangeDeletionNamespace);
- ScopedRangeDeleterLock rangeDeleterLock(opCtx, collectionUuid);
- // The DBDirectClient will not retry WriteConflictExceptions internally while holding an X
- // mode lock, so we need to retry at this level.
- writeConflictRetry(
- opCtx, "updateOrphanCount", NamespaceString::kRangeDeletionNamespace.ns(), [&] {
- store.update(opCtx,
- query,
- BSON("$inc" << BSON(RangeDeletionTask::kNumOrphanDocsFieldName
- << changeInOrphans)),
- WriteConcerns::kLocalWriteConcern);
- });
- BalancerStatsRegistry::get(opCtx)->updateOrphansCount(collectionUuid, changeInOrphans);
- } catch (const ExceptionFor<ErrorCodes::NoMatchingDocument>&) {
- // When upgrading or downgrading, there may be no documents with the orphan count field.
- }
-}
-
long long retrieveNumOrphansFromRecipient(OperationContext* opCtx,
const MigrationCoordinatorDocument& migrationInfo) {
const auto recipientShard = uassertStatusOK(
diff --git a/src/mongo/db/s/migration_util.h b/src/mongo/db/s/migration_util.h
index 7ac9da2a9b2..df5ba9a15b6 100644
--- a/src/mongo/db/s/migration_util.h
+++ b/src/mongo/db/s/migration_util.h
@@ -47,8 +47,6 @@ class ShardId;
namespace migrationutil {
-constexpr auto kRangeDeletionThreadName = "range-deleter"_sd;
-
/**
* Creates a report document with the provided parameters:
*
@@ -138,15 +136,6 @@ void persistRangeDeletionTaskLocally(OperationContext* opCtx,
const WriteConcernOptions& writeConcern);
/**
- * Updates the range deletion task document to increase or decrease numOrphanedDocs and waits for
- * write concern.
- */
-void persistUpdatedNumOrphans(OperationContext* opCtx,
- const UUID& collectionUuid,
- const ChunkRange& range,
- long long changeInOrphans);
-
-/**
* Retrieves the value of 'numOrphanedDocs' from the recipient shard's range deletion task document.
*/
long long retrieveNumOrphansFromRecipient(OperationContext* opCtx,
diff --git a/src/mongo/db/s/migration_util_test.cpp b/src/mongo/db/s/migration_util_test.cpp
index 2807b26ba88..ee6e189178d 100644
--- a/src/mongo/db/s/migration_util_test.cpp
+++ b/src/mongo/db/s/migration_util_test.cpp
@@ -36,6 +36,7 @@
#include "mongo/db/s/collection_sharding_runtime_test.cpp"
#include "mongo/db/s/migration_util.h"
#include "mongo/db/s/operation_sharding_state.h"
+#include "mongo/db/s/range_deletion_util.h"
#include "mongo/db/s/shard_filtering_metadata_refresh.h"
#include "mongo/db/s/shard_server_catalog_cache_loader.h"
#include "mongo/db/s/shard_server_test_fixture.h"
@@ -344,11 +345,11 @@ TEST_F(MigrationUtilsTest, TestUpdateNumberOfOrphans) {
auto rangeDeletionDoc = createDeletionTask(opCtx, kTestNss, collectionUuid, 0, 10);
store.add(opCtx, rangeDeletionDoc);
- migrationutil::persistUpdatedNumOrphans(opCtx, collectionUuid, rangeDeletionDoc.getRange(), 5);
+ persistUpdatedNumOrphans(opCtx, collectionUuid, rangeDeletionDoc.getRange(), 5);
rangeDeletionDoc.setNumOrphanDocs(5);
ASSERT_EQ(store.count(opCtx, rangeDeletionDoc.toBSON().removeField("timestamp")), 1);
- migrationutil::persistUpdatedNumOrphans(opCtx, collectionUuid, rangeDeletionDoc.getRange(), -5);
+ persistUpdatedNumOrphans(opCtx, collectionUuid, rangeDeletionDoc.getRange(), -5);
rangeDeletionDoc.setNumOrphanDocs(0);
ASSERT_EQ(store.count(opCtx, rangeDeletionDoc.toBSON().removeField("timestamp")), 1);
}
diff --git a/src/mongo/db/s/range_deleter_service.cpp b/src/mongo/db/s/range_deleter_service.cpp
index a24e14082f9..ee5ff61f608 100644
--- a/src/mongo/db/s/range_deleter_service.cpp
+++ b/src/mongo/db/s/range_deleter_service.cpp
@@ -28,10 +28,16 @@
*/
#include "mongo/db/s/range_deleter_service.h"
+#include "mongo/db/catalog_raii.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/op_observer/op_observer_registry.h"
+#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/db/repl/wait_for_majority_service.h"
#include "mongo/db/s/balancer_stats_registry.h"
+#include "mongo/db/s/collection_sharding_runtime.h"
#include "mongo/db/s/range_deleter_service_op_observer.h"
+#include "mongo/db/s/range_deletion_util.h"
+#include "mongo/db/s/shard_filtering_metadata_refresh.h"
#include "mongo/logv2/log.h"
#include "mongo/s/sharding_feature_flags_gen.h"
#include "mongo/util/future_util.h"
@@ -41,8 +47,32 @@
namespace mongo {
namespace {
const auto rangeDeleterServiceDecorator = ServiceContext::declareDecoration<RangeDeleterService>();
+
+const BSONObj getShardKeyPattern(OperationContext* opCtx,
+ const DatabaseName& dbName,
+ const UUID& collectionUuid) {
+ while (true) {
+ opCtx->checkForInterrupt();
+ boost::optional<NamespaceString> optNss;
+ {
+ AutoGetCollection collection(
+ opCtx, NamespaceStringOrUUID{dbName.toString(), collectionUuid}, MODE_IS);
+
+ auto optMetadata = CollectionShardingRuntime::get(opCtx, collection.getNss())
+ ->getCurrentMetadataIfKnown();
+ if (optMetadata && optMetadata->isSharded()) {
+ return optMetadata->getShardKeyPattern().toBSON();
+ }
+ optNss = collection.getNss();
+ }
+
+ onShardVersionMismatchNoExcept(opCtx, *optNss, boost::none).ignore();
+ continue;
+ }
}
+} // namespace
+
const ReplicaSetAwareServiceRegistry::Registerer<RangeDeleterService>
rangeDeleterServiceRegistryRegisterer("RangeDeleterService");
@@ -59,6 +89,13 @@ void RangeDeleterService::onStepUpComplete(OperationContext* opCtx, long long te
return;
}
+ if (disableResumableRangeDeleter.load()) {
+ LOGV2_INFO(
+ 6872508,
+ "Not resuming range deletions on step-up because `disableResumableRangeDeleter=true`");
+ return;
+ }
+
auto lock = _acquireMutexUnconditionally();
dassert(_state.load() == kDown, "Service expected to be down before stepping up");
@@ -236,6 +273,8 @@ SharedSemiFuture<void> RangeDeleterService::registerTask(
bool fromResubmitOnStepUp) {
if (disableResumableRangeDeleter.load()) {
+ LOGV2_INFO(6872509,
+ "Not scheduling range deletion because `disableResumableRangeDeleter=true`");
return SemiFuture<void>::makeReady(
Status(ErrorCodes::ResumableRangeDeleterDisabled,
"Not submitting any range deletion task because the "
@@ -278,12 +317,134 @@ SharedSemiFuture<void> RangeDeleterService::registerTask(
_executor->now() + delayForActiveQueriesOnSecondariesToComplete)
.share();
})
- .then([this, collUuid = rdt.getCollectionUuid(), range = rdt.getRange()]() {
- // Step 3: perform the actual range deletion
- // TODO
-
- // Deregister the task
- deregisterTask(collUuid, range);
+ .then([this,
+ dbName = rdt.getNss().dbName(),
+ collectionUuid = rdt.getCollectionUuid(),
+ range = rdt.getRange()]() {
+ return withTemporaryOperationContext(
+ [&](OperationContext* opCtx) {
+ // A task is considered completed when all the following conditions are met:
+ // - All orphans have been deleted
+ // - The deletions have been majority committed
+ // - The range deletion task document has been deleted
+ bool taskCompleted = false;
+
+ while (!taskCompleted) {
+ try {
+ // Perform the actual range deletion
+ bool orphansRemovalCompleted = false;
+ while (!orphansRemovalCompleted) {
+ try {
+ LOGV2_DEBUG(
+ 6872501,
+ 2,
+ "Beginning deletion of documents in orphan range",
+ "dbName"_attr = dbName,
+ "collectionUUID"_attr = collectionUuid.toString(),
+ "range"_attr = redact(range.toString()));
+
+ auto shardKeyPattern =
+ getShardKeyPattern(opCtx, dbName, collectionUuid);
+
+ uassertStatusOK(deleteRangeInBatches(
+ opCtx, dbName, collectionUuid, shardKeyPattern, range));
+ orphansRemovalCompleted = true;
+ } catch (ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
+ // No orphaned documents to remove from a dropped collection
+ orphansRemovalCompleted = true;
+ } catch (
+ ExceptionFor<
+ ErrorCodes::
+ RangeDeletionAbandonedBecauseTaskDocumentDoesNotExist>&) {
+ // The task can be considered completed because the range deletion task document doesn't exist
+ orphansRemovalCompleted = true;
+ } catch (
+ ExceptionFor<
+ ErrorCodes::
+ RangeDeletionAbandonedBecauseCollectionWithUUIDDoesNotExist>&) {
+ // No orphaned documents to remove because the collection with the
+ // given UUID does not exist
+ orphansRemovalCompleted = true;
+ } catch (const DBException& e) {
+ LOGV2_ERROR(6872502,
+ "Failed to delete documents in orphan range",
+ "dbName"_attr = dbName,
+ "collectionUUID"_attr =
+ collectionUuid.toString(),
+ "range"_attr = redact(range.toString()),
+ "error"_attr = e);
+ throw;
+ }
+ }
+
+ {
+ repl::ReplClientInfo::forClient(opCtx->getClient())
+ .setLastOpToSystemLastOpTime(opCtx);
+ auto clientOpTime =
+ repl::ReplClientInfo::forClient(opCtx->getClient())
+ .getLastOp();
+
+ LOGV2_DEBUG(
+ 6872503,
+ 2,
+ "Waiting for majority replication of local deletions",
+ "dbName"_attr = dbName,
+ "collectionUUID"_attr = collectionUuid,
+ "range"_attr = redact(range.toString()),
+ "clientOpTime"_attr = clientOpTime);
+
+ // Synchronously wait for majority before removing the range
+ // deletion task document: oplog gets applied in parallel for
+ // different collections, so it's important not to apply
+ // out of order the deletions of orphans and the removal of the
+ // entry persisted in `config.rangeDeletions`
+ WaitForMajorityService::get(opCtx->getServiceContext())
+ .waitUntilMajority(clientOpTime,
+ CancellationToken::uncancelable())
+ .get(opCtx);
+ }
+
+ // Remove persistent range deletion task
+ try {
+ removePersistentRangeDeletionTask(opCtx, collectionUuid, range);
+
+ LOGV2_DEBUG(
+ 6872504,
+ 2,
+ "Completed removal of persistent range deletion task",
+ "dbName"_attr = dbName,
+ "collectionUUID"_attr = collectionUuid.toString(),
+ "range"_attr = redact(range.toString()));
+
+ } catch (const DBException& e) {
+ LOGV2_ERROR(6872505,
+ "Failed to remove persistent range deletion task",
+ "dbName"_attr = dbName,
+ "collectionUUID"_attr = collectionUuid.toString(),
+ "range"_attr = redact(range.toString()),
+ "error"_attr = e);
+ throw;
+ }
+ } catch (const DBException& e) {
+ // Fail in case of shutdown/stepdown errors as the range
+ // deletion will be resumed on the next step up
+ if (ErrorCodes::isShutdownError(e.code()) ||
+ ErrorCodes::isNotPrimaryError(e.code())) {
+ return e.toStatus();
+ }
+
+ // Iterate again in case of any other error
+ continue;
+ }
+
+ taskCompleted = true;
+ }
+
+ return Status::OK();
+ },
+ dbName,
+ collectionUuid,
+ true);
})
// IMPORTANT: no continuation should be added to this chain after this point
// in order to make sure range deletions order is preserved.
diff --git a/src/mongo/db/s/range_deleter_service_op_observer_test.cpp b/src/mongo/db/s/range_deleter_service_op_observer_test.cpp
index 09fa8e3eb91..3d7d4767bf9 100644
--- a/src/mongo/db/s/range_deleter_service_op_observer_test.cpp
+++ b/src/mongo/db/s/range_deleter_service_op_observer_test.cpp
@@ -32,66 +32,6 @@
namespace mongo {
-void insertRangeDeletionTaskDocument(OperationContext* opCtx, const RangeDeletionTask& rdt) {
- PersistentTaskStore<RangeDeletionTask> store(NamespaceString::kRangeDeletionNamespace);
- store.add(opCtx, rdt);
-}
-
-void updatePendingField(OperationContext* opCtx, UUID migrationId, bool pending) {
- PersistentTaskStore<RangeDeletionTask> store(NamespaceString::kRangeDeletionNamespace);
- store.update(opCtx,
- BSON(RangeDeletionTask::kIdFieldName << migrationId),
- BSON("$set" << BSON(RangeDeletionTask::kPendingFieldName << pending)));
-}
-
-void removePendingField(OperationContext* opCtx, UUID migrationId) {
- PersistentTaskStore<RangeDeletionTask> store(NamespaceString::kRangeDeletionNamespace);
- store.update(opCtx,
- BSON(RangeDeletionTask::kIdFieldName << migrationId),
- BSON("$unset" << BSON(RangeDeletionTask::kPendingFieldName << "")));
-}
-
-void deleteRangeDeletionTaskDocument(OperationContext* opCtx, UUID migrationId) {
- PersistentTaskStore<RangeDeletionTask> store(NamespaceString::kRangeDeletionNamespace);
- store.remove(opCtx, BSON(RangeDeletionTask::kIdFieldName << migrationId));
-}
-
-// Ensure that `expectedChunkRanges` range deletion tasks are scheduled for collection with UUID
-// `uuidColl`
-void verifyRangeDeletionTasks(OperationContext* opCtx,
- UUID uuidColl,
- std::vector<ChunkRange> expectedChunkRanges) {
- auto rds = RangeDeleterService::get(opCtx);
-
- // Get chunk ranges inserted to be deleted by RangeDeleterService
- BSONObj dumpState = rds->dumpState();
- BSONElement chunkRangesElem = dumpState.getField(uuidColl.toString());
- if (!chunkRangesElem.ok() && expectedChunkRanges.size() == 0) {
- return;
- }
- ASSERT(chunkRangesElem.ok()) << "Expected to find range deletion tasks from collection "
- << uuidColl.toString();
-
- const auto chunkRanges = chunkRangesElem.Array();
- ASSERT_EQ(chunkRanges.size(), expectedChunkRanges.size());
-
- // Sort expectedChunkRanges vector to replicate RangeDeleterService dumpState order
- struct {
- bool operator()(const ChunkRange& a, const ChunkRange& b) {
- return a.getMin().woCompare(b.getMin()) < 0;
- }
- } RANGES_COMPARATOR;
-
- std::sort(expectedChunkRanges.begin(), expectedChunkRanges.end(), RANGES_COMPARATOR);
-
- // Check expectedChunkRanges are exactly the same as the returned ones
- for (size_t i = 0; i < expectedChunkRanges.size(); ++i) {
- ASSERT(ChunkRange::fromBSONThrowing(chunkRanges[i].Obj()) == expectedChunkRanges[i])
- << "Expected " << ChunkRange::fromBSONThrowing(chunkRanges[i].Obj()).toBSON()
- << " == " << expectedChunkRanges[i].toBSON();
- }
-}
-
/**
** TESTS
*/
diff --git a/src/mongo/db/s/range_deleter_service_test.cpp b/src/mongo/db/s/range_deleter_service_test.cpp
index 6d3d49eced0..8d465cfc34d 100644
--- a/src/mongo/db/s/range_deleter_service_test.cpp
+++ b/src/mongo/db/s/range_deleter_service_test.cpp
@@ -32,6 +32,8 @@
#include "mongo/db/catalog/create_collection.h"
#include "mongo/db/catalog_raii.h"
#include "mongo/db/persistent_task_store.h"
+#include "mongo/db/repl/wait_for_majority_service.h"
+#include "mongo/db/s/collection_sharding_runtime.h"
#include "mongo/db/s/operation_sharding_state.h"
namespace mongo {
@@ -41,6 +43,7 @@ namespace mongo {
*/
void RangeDeleterServiceTest::setUp() {
ShardServerTestFixture::setUp();
+ WaitForMajorityService::get(getServiceContext()).startup(getServiceContext());
opCtx = operationContext();
RangeDeleterService::get(opCtx)->onStepUpComplete(opCtx, 0L);
RangeDeleterService::get(opCtx)->_waitForRangeDeleterServiceUp_FOR_TESTING();
@@ -57,68 +60,67 @@ void RangeDeleterServiceTest::setUp() {
{
AutoGetCollection autoColl(opCtx, nsCollA, MODE_IX);
uuidCollA = autoColl.getCollection()->uuid();
+ nssWithUuid[uuidCollA] = nsCollA;
+ _setFilteringMetadataWithUUID(opCtx, uuidCollA);
}
{
AutoGetCollection autoColl(opCtx, nsCollB, MODE_IX);
uuidCollB = autoColl.getCollection()->uuid();
+ nssWithUuid[uuidCollB] = nsCollB;
+ _setFilteringMetadataWithUUID(opCtx, uuidCollB);
}
- rangeDeletionTask0ForCollA =
- createRangeDeletionTaskWithOngoingQueries(uuidCollA, BSON("a" << 0), BSON("a" << 10));
- rangeDeletionTask1ForCollA =
- createRangeDeletionTaskWithOngoingQueries(uuidCollA, BSON("a" << 10), BSON("a" << 20));
- rangeDeletionTask0ForCollB =
- createRangeDeletionTaskWithOngoingQueries(uuidCollB, BSON("a" << 0), BSON("a" << 10));
+ rangeDeletionTask0ForCollA = createRangeDeletionTaskWithOngoingQueries(
+ uuidCollA, BSON(kShardKey << 0), BSON(kShardKey << 10));
+ rangeDeletionTask1ForCollA = createRangeDeletionTaskWithOngoingQueries(
+ uuidCollA, BSON(kShardKey << 10), BSON(kShardKey << 20));
+ rangeDeletionTask0ForCollB = createRangeDeletionTaskWithOngoingQueries(
+ uuidCollB, BSON(kShardKey << 0), BSON(kShardKey << 10));
}
void RangeDeleterServiceTest::tearDown() {
RangeDeleterService::get(opCtx)->onStepDown();
RangeDeleterService::get(opCtx)->onShutdown();
+ WaitForMajorityService::get(opCtx->getServiceContext()).shutDown();
ShardServerTestFixture::tearDown();
}
-RangeDeletionTask RangeDeleterServiceTest::createRangeDeletionTask(const UUID& collectionUUID,
- const BSONObj& min,
- const BSONObj& max,
- CleanWhenEnum whenToClean,
- bool pending) {
- RangeDeletionTask rdt;
- rdt.setId(UUID::gen());
- rdt.setNss(NamespaceString("test.mock"));
- rdt.setDonorShardId(ShardId("shard0"));
- rdt.setCollectionUuid(collectionUUID);
- rdt.setRange(ChunkRange(min, max));
- rdt.setWhenToClean(whenToClean);
- rdt.setPending(pending);
- return rdt;
-}
-
-std::shared_ptr<RangeDeletionWithOngoingQueries>
-RangeDeleterServiceTest::createRangeDeletionTaskWithOngoingQueries(const UUID& collectionUUID,
- const BSONObj& min,
- const BSONObj& max,
- CleanWhenEnum whenToClean,
- bool pending) {
- return std::make_shared<RangeDeletionWithOngoingQueries>(
- createRangeDeletionTask(collectionUUID, min, max, whenToClean, pending));
-}
-
-/**
- * RangeDeletionWithOngoingQueries implementation
- */
-RangeDeletionWithOngoingQueries::RangeDeletionWithOngoingQueries(const RangeDeletionTask& t)
- : _task(t) {}
-
-RangeDeletionTask RangeDeletionWithOngoingQueries::getTask() {
- return _task;
-}
-
-void RangeDeletionWithOngoingQueries::drainOngoingQueries() {
- _ongoingQueries.setFrom(Status::OK());
-}
-
-auto RangeDeletionWithOngoingQueries::getOngoingQueriesFuture() {
- return _ongoingQueries.getFuture().semi();
+void RangeDeleterServiceTest::_setFilteringMetadataWithUUID(OperationContext* opCtx,
+ const UUID& uuid) {
+ const OID epoch = OID::gen();
+ NamespaceString nss = nssWithUuid[uuid];
+
+ const CollectionMetadata metadata = [&]() {
+ auto chunk = ChunkType(uuid,
+ ChunkRange{BSON(kShardKey << MINKEY), BSON(kShardKey << MAXKEY)},
+ ChunkVersion({epoch, Timestamp(1, 1)}, {1, 0}),
+ ShardId("this"));
+ ChunkManager cm(ShardId("this"),
+ DatabaseVersion(UUID::gen(), Timestamp(1, 1)),
+ makeStandaloneRoutingTableHistory(
+ RoutingTableHistory::makeNew(nss,
+ uuid,
+ kShardKeyPattern,
+ nullptr,
+ false,
+ epoch,
+ Timestamp(1, 1),
+ boost::none /* timeseriesFields */,
+ boost::none,
+ boost::none /* chunkSizeBytes */,
+ true,
+ {std::move(chunk)})),
+ boost::none);
+
+ return CollectionMetadata(std::move(cm), ShardId("this"));
+ }();
+
+ AutoGetCollection autoColl(opCtx, nss, LockMode::MODE_X);
+
+ CollectionShardingRuntime::get(opCtx, nss)->setFilteringMetadata(opCtx, metadata);
+ auto* css = CollectionShardingState::get(opCtx, nss);
+ auto& csr = *checked_cast<CollectionShardingRuntime*>(css);
+ csr.setFilteringMetadata(opCtx, metadata);
}
/**
@@ -129,8 +131,10 @@ TEST_F(RangeDeleterServiceTest, RegisterAndProcessSingleTask) {
auto rds = RangeDeleterService::get(opCtx);
auto taskWithOngoingQueries = rangeDeletionTask0ForCollA;
- auto completionFuture = rds->registerTask(taskWithOngoingQueries->getTask(),
- taskWithOngoingQueries->getOngoingQueriesFuture());
+ auto completionFuture =
+ registerAndCreatePersistentTask(opCtx,
+ taskWithOngoingQueries->getTask(),
+ taskWithOngoingQueries->getOngoingQueriesFuture());
// The task can't be processed (hence completed) before ongoing queries drain
ASSERT(!completionFuture.isReady());
ASSERT_EQ(1, rds->getNumRangeDeletionTasksForCollection(uuidCollA));
@@ -145,8 +149,10 @@ TEST_F(RangeDeleterServiceTest, RegisterDuplicateTaskForSameRangeReturnsOriginal
auto rds = RangeDeleterService::get(opCtx);
auto taskWithOngoingQueries = rangeDeletionTask0ForCollA;
- auto originalTaskCompletionFuture = rds->registerTask(
- taskWithOngoingQueries->getTask(), taskWithOngoingQueries->getOngoingQueriesFuture());
+ auto originalTaskCompletionFuture =
+ registerAndCreatePersistentTask(opCtx,
+ taskWithOngoingQueries->getTask(),
+ taskWithOngoingQueries->getOngoingQueriesFuture());
// Trying registering a duplicate task must return a future without throwing errors
auto duplicateTaskCompletionFuture =
@@ -171,10 +177,14 @@ TEST_F(RangeDeleterServiceTest, RegisterAndProcessMoreTasksForSameCollection) {
auto task1WithOngoingQueries = rangeDeletionTask1ForCollA;
// Register 2 tasks for the same collection
- auto completionFuture0 = rds->registerTask(task0WithOngoingQueries->getTask(),
- task0WithOngoingQueries->getOngoingQueriesFuture());
- auto completionFuture1 = rds->registerTask(task1WithOngoingQueries->getTask(),
- task1WithOngoingQueries->getOngoingQueriesFuture());
+ auto completionFuture0 =
+ registerAndCreatePersistentTask(opCtx,
+ task0WithOngoingQueries->getTask(),
+ task0WithOngoingQueries->getOngoingQueriesFuture());
+ auto completionFuture1 =
+ registerAndCreatePersistentTask(opCtx,
+ task1WithOngoingQueries->getTask(),
+ task1WithOngoingQueries->getOngoingQueriesFuture());
// The tasks can't be processed (hence completed) before ongoing queries drain
ASSERT(!completionFuture0.isReady());
@@ -199,11 +209,13 @@ TEST_F(RangeDeleterServiceTest, RegisterAndProcessTasksForDifferentCollections)
// Register 1 tasks for `collA` and 1 task for `collB`
auto completionFutureCollA =
- rds->registerTask(taskWithOngoingQueriesCollA->getTask(),
- taskWithOngoingQueriesCollA->getOngoingQueriesFuture());
+ registerAndCreatePersistentTask(opCtx,
+ taskWithOngoingQueriesCollA->getTask(),
+ taskWithOngoingQueriesCollA->getOngoingQueriesFuture());
auto completionFutureCollB =
- rds->registerTask(taskWithOngoingQueriesCollB->getTask(),
- taskWithOngoingQueriesCollB->getOngoingQueriesFuture());
+ registerAndCreatePersistentTask(opCtx,
+ taskWithOngoingQueriesCollB->getTask(),
+ taskWithOngoingQueriesCollB->getOngoingQueriesFuture());
// The tasks can't be processed (hence completed) before ongoing queries drain
ASSERT(!completionFutureCollA.isReady());
@@ -232,12 +244,13 @@ TEST_F(RangeDeleterServiceTest, DelayForSecondaryQueriesIsHonored) {
// Set delay for waiting secondary queries to 2 seconds
orphanCleanupDelaySecs.store(2);
- auto rds = RangeDeleterService::get(opCtx);
auto taskWithOngoingQueries = createRangeDeletionTaskWithOngoingQueries(
- uuidCollA, BSON("a" << 0), BSON("a" << 10), CleanWhenEnum::kDelayed);
+ uuidCollA, BSON(kShardKey << 0), BSON(kShardKey << 10), CleanWhenEnum::kDelayed);
- auto completionFuture = rds->registerTask(taskWithOngoingQueries->getTask(),
- taskWithOngoingQueries->getOngoingQueriesFuture());
+ auto completionFuture =
+ registerAndCreatePersistentTask(opCtx,
+ taskWithOngoingQueries->getTask(),
+ taskWithOngoingQueries->getOngoingQueriesFuture());
// Check that the task lasts at least 2 seconds from the moment ongoing queries drain
auto start = Date_t::now();
@@ -253,12 +266,14 @@ TEST_F(RangeDeleterServiceTest, ScheduledTaskInvalidatedOnStepDown) {
auto rds = RangeDeleterService::get(opCtx);
auto taskWithOngoingQueries = createRangeDeletionTaskWithOngoingQueries(
- uuidCollA, BSON("a" << 0), BSON("a" << 10), CleanWhenEnum::kDelayed);
+ uuidCollA, BSON(kShardKey << 0), BSON(kShardKey << 10), CleanWhenEnum::kDelayed);
// Mark ongoing queries as completed
taskWithOngoingQueries->drainOngoingQueries();
- auto completionFuture = rds->registerTask(taskWithOngoingQueries->getTask(),
- taskWithOngoingQueries->getOngoingQueriesFuture());
+ auto completionFuture =
+ registerAndCreatePersistentTask(opCtx,
+ taskWithOngoingQueries->getTask(),
+ taskWithOngoingQueries->getOngoingQueriesFuture());
// Manually trigger disabling of the service
rds->onStepDown();
@@ -287,12 +302,14 @@ TEST_F(RangeDeleterServiceTest, NoActionPossibleIfServiceIsDown) {
});
auto taskWithOngoingQueries = createRangeDeletionTaskWithOngoingQueries(
- uuidCollA, BSON("a" << 0), BSON("a" << 10), CleanWhenEnum::kDelayed);
+ uuidCollA, BSON(kShardKey << 0), BSON(kShardKey << 10), CleanWhenEnum::kDelayed);
- ASSERT_THROWS_CODE(rds->registerTask(taskWithOngoingQueries->getTask(),
- taskWithOngoingQueries->getOngoingQueriesFuture()),
- DBException,
- ErrorCodes::NotYetInitialized);
+ ASSERT_THROWS_CODE(
+ registerAndCreatePersistentTask(opCtx,
+ taskWithOngoingQueries->getTask(),
+ taskWithOngoingQueries->getOngoingQueriesFuture()),
+ DBException,
+ ErrorCodes::NotYetInitialized);
ASSERT_THROWS_CODE(rds->deregisterTask(taskWithOngoingQueries->getTask().getCollectionUuid(),
taskWithOngoingQueries->getTask().getRange()),
@@ -309,80 +326,84 @@ TEST_F(RangeDeleterServiceTest, NoOverlappingRangeDeletionsFuture) {
auto rds = RangeDeleterService::get(opCtx);
// No range deletion task registered
- ChunkRange inputRange(BSON("a" << 0), BSON("a" << 10));
+ ChunkRange inputRange(BSON(kShardKey << 0), BSON(kShardKey << 10));
auto fut = rds->getOverlappingRangeDeletionsFuture(uuidCollA, inputRange);
ASSERT(fut.isReady());
// Register a range deletion task
- auto taskWithOngoingQueries =
- createRangeDeletionTaskWithOngoingQueries(uuidCollA, BSON("a" << 0), BSON("a" << 10));
- auto completionFuture = rds->registerTask(taskWithOngoingQueries->getTask(),
- taskWithOngoingQueries->getOngoingQueriesFuture());
+ auto taskWithOngoingQueries = createRangeDeletionTaskWithOngoingQueries(
+ uuidCollA, BSON(kShardKey << 0), BSON(kShardKey << 10));
+ auto completionFuture =
+ registerAndCreatePersistentTask(opCtx,
+ taskWithOngoingQueries->getTask(),
+ taskWithOngoingQueries->getOngoingQueriesFuture());
// Totally unrelated range
- inputRange = ChunkRange(BSON("a" << -10), BSON("a" << -3));
+ inputRange = ChunkRange(BSON(kShardKey << -10), BSON(kShardKey << -3));
fut = rds->getOverlappingRangeDeletionsFuture(uuidCollA, inputRange);
ASSERT(fut.isReady());
// Range "touching" lower bound
- inputRange = ChunkRange(BSON("a" << -10), BSON("a" << 0));
+ inputRange = ChunkRange(BSON(kShardKey << -10), BSON(kShardKey << 0));
fut = rds->getOverlappingRangeDeletionsFuture(uuidCollA, inputRange);
ASSERT(fut.isReady());
// Range "touching" upper bound
- inputRange = ChunkRange(BSON("a" << 10), BSON("a" << 20));
+ inputRange = ChunkRange(BSON(kShardKey << 10), BSON(kShardKey << 20));
fut = rds->getOverlappingRangeDeletionsFuture(uuidCollA, inputRange);
ASSERT(fut.isReady());
}
TEST_F(RangeDeleterServiceTest, OneOverlappingRangeDeletionFuture) {
auto rds = RangeDeleterService::get(opCtx);
- auto taskWithOngoingQueries =
- createRangeDeletionTaskWithOngoingQueries(uuidCollA, BSON("a" << 0), BSON("a" << 10));
+ auto taskWithOngoingQueries = createRangeDeletionTaskWithOngoingQueries(
+ uuidCollA, BSON(kShardKey << 0), BSON(kShardKey << 10));
- auto completionFuture = rds->registerTask(taskWithOngoingQueries->getTask(),
- taskWithOngoingQueries->getOngoingQueriesFuture());
+ auto completionFuture =
+ registerAndCreatePersistentTask(opCtx,
+ taskWithOngoingQueries->getTask(),
+ taskWithOngoingQueries->getOngoingQueriesFuture());
std::vector<SharedSemiFuture<void>> waitForRangeToBeDeletedFutures;
// Exact match
- ChunkRange inputRange(BSON("a" << 0), BSON("a" << 10));
+ ChunkRange inputRange(BSON(kShardKey << 0), BSON(kShardKey << 10));
auto fut = rds->getOverlappingRangeDeletionsFuture(uuidCollA, inputRange);
ASSERT(!fut.isReady());
waitForRangeToBeDeletedFutures.push_back(fut);
// Super-range
- inputRange = ChunkRange(BSON("a" << -10), BSON("a" << 20));
+ inputRange = ChunkRange(BSON(kShardKey << -10), BSON(kShardKey << 20));
fut = rds->getOverlappingRangeDeletionsFuture(uuidCollA, inputRange);
ASSERT(!fut.isReady());
waitForRangeToBeDeletedFutures.push_back(fut);
// Super range touching upper bound
- inputRange = ChunkRange(BSON("a" << -10), BSON("a" << 10));
+ inputRange = ChunkRange(BSON(kShardKey << -10), BSON(kShardKey << 10));
fut = rds->getOverlappingRangeDeletionsFuture(uuidCollA, inputRange);
ASSERT(!fut.isReady());
waitForRangeToBeDeletedFutures.push_back(fut);
// Super range touching lower bound
- inputRange = ChunkRange(BSON("a" << 0), BSON("a" << 20));
+ inputRange = ChunkRange(BSON(kShardKey << 0), BSON(kShardKey << 20));
fut = rds->getOverlappingRangeDeletionsFuture(uuidCollA, inputRange);
ASSERT(!fut.isReady());
waitForRangeToBeDeletedFutures.push_back(fut);
// Sub-range
- inputRange = ChunkRange(BSON("a" << 3), BSON("a" << 6));
+ inputRange = ChunkRange(BSON(kShardKey << 3), BSON(kShardKey << 6));
fut = rds->getOverlappingRangeDeletionsFuture(uuidCollA, inputRange);
ASSERT(!fut.isReady());
waitForRangeToBeDeletedFutures.push_back(fut);
// Sub-range touching upper bound
- inputRange = ChunkRange(BSON("a" << 3), BSON("a" << 10));
+ inputRange = ChunkRange(BSON(kShardKey << 3), BSON(kShardKey << 10));
fut = rds->getOverlappingRangeDeletionsFuture(uuidCollA, inputRange);
ASSERT(!fut.isReady());
waitForRangeToBeDeletedFutures.push_back(fut);
// Sub-range touching lower bound
- inputRange = ChunkRange(BSON("a" << 0), BSON("a" << 6));
+ inputRange = ChunkRange(BSON(kShardKey << 0), BSON(kShardKey << 6));
fut = rds->getOverlappingRangeDeletionsFuture(uuidCollA, inputRange);
ASSERT(!fut.isReady());
waitForRangeToBeDeletedFutures.push_back(fut);
@@ -398,32 +419,38 @@ TEST_F(RangeDeleterServiceTest, MultipleOverlappingRangeDeletionsFuture) {
auto rds = RangeDeleterService::get(opCtx);
// Register range deletion tasks [0, 10) - [10, 20) - [30, 40)
- auto taskWithOngoingQueries0 =
- createRangeDeletionTaskWithOngoingQueries(uuidCollA, BSON("a" << 0), BSON("a" << 10));
- auto completionFuture0 = rds->registerTask(taskWithOngoingQueries0->getTask(),
- taskWithOngoingQueries0->getOngoingQueriesFuture());
- auto taskWithOngoingQueries10 =
- createRangeDeletionTaskWithOngoingQueries(uuidCollA, BSON("a" << 10), BSON("a" << 20));
- auto completionFuture10 = rds->registerTask(
- taskWithOngoingQueries10->getTask(), taskWithOngoingQueries10->getOngoingQueriesFuture());
- auto taskWithOngoingQueries30 =
- createRangeDeletionTaskWithOngoingQueries(uuidCollA, BSON("a" << 30), BSON("a" << 40));
- auto completionFuture30 = rds->registerTask(
- taskWithOngoingQueries30->getTask(), taskWithOngoingQueries30->getOngoingQueriesFuture());
+ auto taskWithOngoingQueries0 = createRangeDeletionTaskWithOngoingQueries(
+ uuidCollA, BSON(kShardKey << 0), BSON(kShardKey << 10));
+ auto completionFuture0 =
+ registerAndCreatePersistentTask(opCtx,
+ taskWithOngoingQueries0->getTask(),
+ taskWithOngoingQueries0->getOngoingQueriesFuture());
+ auto taskWithOngoingQueries10 = createRangeDeletionTaskWithOngoingQueries(
+ uuidCollA, BSON(kShardKey << 10), BSON(kShardKey << 20));
+ auto completionFuture10 =
+ registerAndCreatePersistentTask(opCtx,
+ taskWithOngoingQueries10->getTask(),
+ taskWithOngoingQueries10->getOngoingQueriesFuture());
+ auto taskWithOngoingQueries30 = createRangeDeletionTaskWithOngoingQueries(
+ uuidCollA, BSON(kShardKey << 30), BSON(kShardKey << 40));
+ auto completionFuture30 =
+ registerAndCreatePersistentTask(opCtx,
+ taskWithOngoingQueries30->getTask(),
+ taskWithOngoingQueries30->getOngoingQueriesFuture());
// Exact match with [0, 10)
- ChunkRange inputRange(BSON("a" << 0), BSON("a" << 10));
+ ChunkRange inputRange(BSON(kShardKey << 0), BSON(kShardKey << 10));
auto futureReadyWhenTask0Ready = rds->getOverlappingRangeDeletionsFuture(uuidCollA, inputRange);
ASSERT(!futureReadyWhenTask0Ready.isReady());
// Super-range spanning across [0, 10) and [10, 20)
- inputRange = ChunkRange(BSON("a" << -10), BSON("a" << 20));
+ inputRange = ChunkRange(BSON(kShardKey << -10), BSON(kShardKey << 20));
auto futureReadyWhenTasks0And10Ready =
rds->getOverlappingRangeDeletionsFuture(uuidCollA, inputRange);
ASSERT(!futureReadyWhenTasks0And10Ready.isReady());
// Super-range spanning across [0, 10), [10, 20) and [30, 40)
- inputRange = ChunkRange(BSON("a" << -10), BSON("a" << 50));
+ inputRange = ChunkRange(BSON(kShardKey << -10), BSON(kShardKey << 50));
auto futureReadyWhenTasks0And10And30Ready =
rds->getOverlappingRangeDeletionsFuture(uuidCollA, inputRange);
ASSERT(!futureReadyWhenTasks0And10And30Ready.isReady());
@@ -446,41 +473,47 @@ TEST_F(RangeDeleterServiceTest, GetOverlappingRangeDeletionsResilientToRefineSha
auto rds = RangeDeleterService::get(opCtx);
// Register range deletion tasks [0, 10) - [10, 20) - [30, 40)
- auto taskWithOngoingQueries0 =
- createRangeDeletionTaskWithOngoingQueries(uuidCollA, BSON("a" << 0), BSON("a" << 10));
- auto completionFuture0 = rds->registerTask(taskWithOngoingQueries0->getTask(),
- taskWithOngoingQueries0->getOngoingQueriesFuture());
- auto taskWithOngoingQueries10 =
- createRangeDeletionTaskWithOngoingQueries(uuidCollA, BSON("a" << 10), BSON("a" << 20));
- auto completionFuture10 = rds->registerTask(
- taskWithOngoingQueries10->getTask(), taskWithOngoingQueries10->getOngoingQueriesFuture());
- auto taskWithOngoingQueries30 =
- createRangeDeletionTaskWithOngoingQueries(uuidCollA, BSON("a" << 30), BSON("a" << 40));
- auto completionFuture30 = rds->registerTask(
- taskWithOngoingQueries30->getTask(), taskWithOngoingQueries30->getOngoingQueriesFuture());
+ auto taskWithOngoingQueries0 = createRangeDeletionTaskWithOngoingQueries(
+ uuidCollA, BSON(kShardKey << 0), BSON(kShardKey << 10));
+ auto completionFuture0 =
+ registerAndCreatePersistentTask(opCtx,
+ taskWithOngoingQueries0->getTask(),
+ taskWithOngoingQueries0->getOngoingQueriesFuture());
+ auto taskWithOngoingQueries10 = createRangeDeletionTaskWithOngoingQueries(
+ uuidCollA, BSON(kShardKey << 10), BSON(kShardKey << 20));
+ auto completionFuture10 =
+ registerAndCreatePersistentTask(opCtx,
+ taskWithOngoingQueries10->getTask(),
+ taskWithOngoingQueries10->getOngoingQueriesFuture());
+ auto taskWithOngoingQueries30 = createRangeDeletionTaskWithOngoingQueries(
+ uuidCollA, BSON(kShardKey << 30), BSON(kShardKey << 40));
+ auto completionFuture30 =
+ registerAndCreatePersistentTask(opCtx,
+ taskWithOngoingQueries30->getTask(),
+ taskWithOngoingQueries30->getOngoingQueriesFuture());
// Exact match with [0, 10)
- ChunkRange inputRange(BSON("a" << 0 << "b"
- << "lol"),
- BSON("a" << 9 << "b"
- << "lol"));
+ ChunkRange inputRange(BSON(kShardKey << 0 << "b"
+ << "lol"),
+ BSON(kShardKey << 9 << "b"
+ << "lol"));
auto futureReadyWhenTask0Ready = rds->getOverlappingRangeDeletionsFuture(uuidCollA, inputRange);
ASSERT(!futureReadyWhenTask0Ready.isReady());
// Super-range spanning across [0, 10) and [10, 20)
- inputRange = ChunkRange(BSON("a" << -10 << "b"
- << "lol"),
- BSON("a" << 15 << "b"
- << "lol"));
+ inputRange = ChunkRange(BSON(kShardKey << -10 << "b"
+ << "lol"),
+ BSON(kShardKey << 15 << "b"
+ << "lol"));
auto futureReadyWhenTasks0And10Ready =
rds->getOverlappingRangeDeletionsFuture(uuidCollA, inputRange);
ASSERT(!futureReadyWhenTasks0And10Ready.isReady());
// Super-range spanning across [0, 10), [10, 20) and [30, 40)
- inputRange = ChunkRange(BSON("a" << -10 << "b"
- << "lol"),
- BSON("a" << 50 << "b"
- << "lol"));
+ inputRange = ChunkRange(BSON(kShardKey << -10 << "b"
+ << "lol"),
+ BSON(kShardKey << 50 << "b"
+ << "lol"));
auto futureReadyWhenTasks0And10And30Ready =
rds->getOverlappingRangeDeletionsFuture(uuidCollA, inputRange);
ASSERT(!futureReadyWhenTasks0And10And30Ready.isReady());
@@ -507,14 +540,17 @@ TEST_F(RangeDeleterServiceTest, DumpState) {
// Register 2 tasks for `collA` and 1 task for `collB`
auto completionFuture0CollA =
- rds->registerTask(task0WithOngoingQueriesCollA->getTask(),
- task0WithOngoingQueriesCollA->getOngoingQueriesFuture());
+ registerAndCreatePersistentTask(opCtx,
+ task0WithOngoingQueriesCollA->getTask(),
+ task0WithOngoingQueriesCollA->getOngoingQueriesFuture());
auto completionFuture1CollA =
- rds->registerTask(task1WithOngoingQueriesCollA->getTask(),
- task1WithOngoingQueriesCollA->getOngoingQueriesFuture());
+ registerAndCreatePersistentTask(opCtx,
+ task1WithOngoingQueriesCollA->getTask(),
+ task1WithOngoingQueriesCollA->getOngoingQueriesFuture());
auto completionFutureCollB =
- rds->registerTask(taskWithOngoingQueriesCollB->getTask(),
- taskWithOngoingQueriesCollB->getOngoingQueriesFuture());
+ registerAndCreatePersistentTask(opCtx,
+ taskWithOngoingQueriesCollB->getTask(),
+ taskWithOngoingQueriesCollB->getOngoingQueriesFuture());
// The tasks can't be processed (hence completed) before ongoing queries drain
ASSERT(!completionFuture0CollA.isReady());
@@ -550,14 +586,17 @@ TEST_F(RangeDeleterServiceTest, TotalNumOfRegisteredTasks) {
// Register 2 tasks for `collA` and 1 task for `collB`
auto completionFuture0CollA =
- rds->registerTask(rangeDeletionTask0ForCollA->getTask(),
- rangeDeletionTask0ForCollA->getOngoingQueriesFuture());
+ registerAndCreatePersistentTask(opCtx,
+ rangeDeletionTask0ForCollA->getTask(),
+ rangeDeletionTask0ForCollA->getOngoingQueriesFuture());
auto completionFuture1CollA =
- rds->registerTask(rangeDeletionTask1ForCollA->getTask(),
- rangeDeletionTask1ForCollA->getOngoingQueriesFuture());
+ registerAndCreatePersistentTask(opCtx,
+ rangeDeletionTask1ForCollA->getTask(),
+ rangeDeletionTask1ForCollA->getOngoingQueriesFuture());
auto completionFutureCollB =
- rds->registerTask(rangeDeletionTask0ForCollB->getTask(),
- rangeDeletionTask0ForCollB->getOngoingQueriesFuture());
+ registerAndCreatePersistentTask(opCtx,
+ rangeDeletionTask0ForCollB->getTask(),
+ rangeDeletionTask0ForCollB->getOngoingQueriesFuture());
ASSERT_EQ(3, rds->totalNumOfRegisteredTasks());
}
@@ -568,8 +607,10 @@ TEST_F(RangeDeleterServiceTest, RegisterTaskWithDisableResumableRangeDeleterFlag
auto rds = RangeDeleterService::get(opCtx);
auto taskWithOngoingQueries = rangeDeletionTask0ForCollA;
- auto completionFuture = rds->registerTask(taskWithOngoingQueries->getTask(),
- taskWithOngoingQueries->getOngoingQueriesFuture());
+ auto completionFuture =
+ registerAndCreatePersistentTask(opCtx,
+ taskWithOngoingQueries->getTask(),
+ taskWithOngoingQueries->getOngoingQueriesFuture());
ASSERT(completionFuture.isReady());
ASSERT_EQ(0, rds->getNumRangeDeletionTasksForCollection(uuidCollA));
}
@@ -579,8 +620,10 @@ TEST_F(RangeDeleterServiceTest,
auto rds = RangeDeleterService::get(opCtx);
auto taskWithOngoingQueries = rangeDeletionTask0ForCollA;
- auto completionFuture = rds->registerTask(taskWithOngoingQueries->getTask(),
- taskWithOngoingQueries->getOngoingQueriesFuture());
+ auto completionFuture =
+ registerAndCreatePersistentTask(opCtx,
+ taskWithOngoingQueries->getTask(),
+ taskWithOngoingQueries->getOngoingQueriesFuture());
ASSERT(!completionFuture.isReady());
ASSERT_EQ(1, rds->getNumRangeDeletionTasksForCollection(uuidCollA));
@@ -611,8 +654,10 @@ TEST_F(RangeDeleterServiceTest, RescheduleRangeDeletionTasksOnStepUp) {
int nPending = 0, nNonPending = 0, nNonPendingAndProcessing = 0;
int minBound = 0;
for (int i = 0; i < nRangeDeletionTasks; i++) {
- auto rangeDeletionTask = createRangeDeletionTask(
- uuidCollA, BSON("a" << minBound), BSON("a" << minBound + 10), CleanWhenEnum::kDelayed);
+ auto rangeDeletionTask = createRangeDeletionTask(uuidCollA,
+ BSON(kShardKey << minBound),
+ BSON(kShardKey << minBound + 10),
+ CleanWhenEnum::kDelayed);
minBound += 10;
auto rand = random.nextInt32() % 3;
@@ -643,4 +688,63 @@ TEST_F(RangeDeleterServiceTest, RescheduleRangeDeletionTasksOnStepUp) {
rds->getNumRangeDeletionTasksForCollection(uuidCollA));
}
+TEST_F(RangeDeleterServiceTest, PerformActualRangeDeletion) {
+    // End-to-end check: once the ongoing queries drain, the registered task must delete the
+    // documents previously inserted in the collection and remove its persisted task document.
+    auto rds = RangeDeleterService::get(opCtx);
+    auto taskWithOngoingQueries = rangeDeletionTask0ForCollA;
+    auto nss = nssWithUuid[uuidCollA];
+    DBDirectClient dbclient(opCtx);
+
+    // Rely on the count reported by the helper (as the sibling test does) and pin it to the
+    // expected value so a silent change in the helper is caught here.
+    const int numDocsToDelete = insertDocsWithinRange(opCtx, nss, 0, 10, 10);
+    ASSERT_EQUALS(numDocsToDelete, 10);
+    ASSERT_EQUALS(dbclient.count(nss), numDocsToDelete);
+
+    auto completionFuture =
+        registerAndCreatePersistentTask(opCtx,
+                                        taskWithOngoingQueries->getTask(),
+                                        taskWithOngoingQueries->getOngoingQueriesFuture());
+
+    // The task can't be processed (hence completed) before ongoing queries drain
+    ASSERT(!completionFuture.isReady());
+    ASSERT_EQ(1, rds->getNumRangeDeletionTasksForCollection(uuidCollA));
+    ASSERT_EQUALS(dbclient.count(NamespaceString::kRangeDeletionNamespace), 1);
+    verifyRangeDeletionTasks(opCtx, uuidCollA, {taskWithOngoingQueries->getTask().getRange()});
+
+    // Pretend ongoing queries have drained and make sure the task completes
+    taskWithOngoingQueries->drainOngoingQueries();
+    completionFuture.get(opCtx);
+    ASSERT_EQUALS(dbclient.count(nss), 0);
+
+    // A completed task must also have cleaned up its persistent document
+    ASSERT_EQUALS(dbclient.count(NamespaceString::kRangeDeletionNamespace), 0);
+}
+
+TEST_F(RangeDeleterServiceTest, OnlyRemoveDocumentsInRangeToDelete) {
+    // Checks that a range deletion removes the documents inserted for the task's range while
+    // leaving documents inserted for other ranges of the same collection untouched.
+    auto rangeDeleter = RangeDeleterService::get(opCtx);
+    auto task = rangeDeletionTask0ForCollA;
+    auto collNss = nssWithUuid[uuidCollA];
+    DBDirectClient client(opCtx);
+
+    // Documents expected to be removed by the deletion task
+    const int numDocsToDelete = insertDocsWithinRange(opCtx, collNss, 0, 10, 10);
+    ASSERT_EQUALS(client.count(collNss), numDocsToDelete);
+
+    // Documents living in two other ranges, expected to survive the deletion
+    const int numKeptLow = insertDocsWithinRange(opCtx, collNss, 20, 25, 5);
+    const int numKeptHigh = insertDocsWithinRange(opCtx, collNss, 100, 105, 5);
+    const int numDocsToKeep = numKeptLow + numKeptHigh;
+    ASSERT_EQUALS(client.count(collNss), numDocsToKeep + numDocsToDelete);
+
+    auto taskCompletion =
+        registerAndCreatePersistentTask(opCtx, task->getTask(), task->getOngoingQueriesFuture());
+
+    // Nothing may be processed (hence completed) while queries are still ongoing
+    ASSERT(!taskCompletion.isReady());
+    ASSERT_EQ(1, rangeDeleter->getNumRangeDeletionTasksForCollection(uuidCollA));
+    ASSERT_EQUALS(client.count(NamespaceString::kRangeDeletionNamespace), 1);
+    verifyRangeDeletionTasks(opCtx, uuidCollA, {task->getTask().getRange()});
+
+    // Simulate the queries draining, then verify only the orphans were cleared up
+    task->drainOngoingQueries();
+    taskCompletion.get(opCtx);
+    ASSERT_EQUALS(client.count(collNss), numDocsToKeep);
+    ASSERT_EQUALS(client.count(NamespaceString::kRangeDeletionNamespace), 0);
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/range_deleter_service_test.h b/src/mongo/db/s/range_deleter_service_test.h
index 44435954534..7c689178ab1 100644
--- a/src/mongo/db/s/range_deleter_service_test.h
+++ b/src/mongo/db/s/range_deleter_service_test.h
@@ -31,6 +31,7 @@
#include "mongo/db/s/range_deleter_service.h"
#include "mongo/db/s/shard_server_test_fixture.h"
#include "mongo/idl/server_parameter_test_util.h"
+#include "mongo/unittest/log_test.h"
namespace mongo {
@@ -44,7 +45,7 @@ public:
RangeDeletionTask getTask();
void drainOngoingQueries();
- auto getOngoingQueriesFuture();
+ SemiFuture<void> getOngoingQueriesFuture();
private:
RangeDeletionTask _task;
@@ -58,32 +59,59 @@ public:
OperationContext* opCtx;
- // Util methods
- RangeDeletionTask createRangeDeletionTask(const UUID& collectionUUID,
- const BSONObj& min,
- const BSONObj& max,
- CleanWhenEnum whenToClean = CleanWhenEnum::kNow,
- bool pending = true);
-
- std::shared_ptr<RangeDeletionWithOngoingQueries> createRangeDeletionTaskWithOngoingQueries(
- const UUID& collectionUUID,
- const BSONObj& min,
- const BSONObj& max,
- CleanWhenEnum whenToClean = CleanWhenEnum::kNow,
- bool pending = true);
-
// Instantiate some collection UUIDs and tasks to be used for testing
UUID uuidCollA = UUID::gen();
- inline static const NamespaceString nsCollA = NamespaceString("test", "collA");
+ inline static const NamespaceString nsCollA{"test", "collA"};
+ UUID uuidCollB = UUID::gen();
+ inline static const NamespaceString nsCollB{"test", "collB"};
+
+ inline static std::map<UUID, NamespaceString> nssWithUuid{};
+
std::shared_ptr<RangeDeletionWithOngoingQueries> rangeDeletionTask0ForCollA;
std::shared_ptr<RangeDeletionWithOngoingQueries> rangeDeletionTask1ForCollA;
-
- UUID uuidCollB = UUID::gen();
- inline static const NamespaceString nsCollB = NamespaceString("test", "collB");
std::shared_ptr<RangeDeletionWithOngoingQueries> rangeDeletionTask0ForCollB;
+ inline static const std::string kShardKey = "_id";
+ inline static const BSONObj kShardKeyPattern = BSON(kShardKey << 1);
+
private:
+ void _setFilteringMetadataWithUUID(OperationContext* opCtx, const UUID& uuid);
+
+ // Scoped objects
RAIIServerParameterControllerForTest enableFeatureFlag{"featureFlagRangeDeleterService", true};
+ unittest::MinimumLoggedSeverityGuard _severityGuard{logv2::LogComponent::kShardingRangeDeleter,
+ logv2::LogSeverity::Debug(2)};
};
+/**
+ * Builds an in-memory RangeDeletionTask for the range [min, max) of the given collection.
+ */
+RangeDeletionTask createRangeDeletionTask(const UUID& collectionUUID,
+ const BSONObj& min,
+ const BSONObj& max,
+ CleanWhenEnum whenToClean = CleanWhenEnum::kNow,
+ bool pending = true);
+
+/**
+ * Same as `createRangeDeletionTask`, but wraps the task in a RangeDeletionWithOngoingQueries so
+ * that the test can decide when the ongoing queries are considered drained.
+ */
+std::shared_ptr<RangeDeletionWithOngoingQueries> createRangeDeletionTaskWithOngoingQueries(
+ const UUID& collectionUUID,
+ const BSONObj& min,
+ const BSONObj& max,
+ CleanWhenEnum whenToClean = CleanWhenEnum::kNow,
+ bool pending = true);
+
+/**
+ * Registers the task on the RangeDeleterService and creates the matching persistent document
+ * without the `pending` field. Returns the completion future of the registered task.
+ */
+SharedSemiFuture<void> registerAndCreatePersistentTask(
+ OperationContext* opCtx,
+ const RangeDeletionTask& rdt,
+ SemiFuture<void>&& waitForActiveQueriesToComplete);
+
+/**
+ * Inserts up to `maxCount` documents with consecutive shard key values starting at `min` and
+ * stopping before `max`. Returns the number of inserted documents.
+ */
+int insertDocsWithinRange(
+ OperationContext* opCtx, const NamespaceString& nss, int min, int max, int maxCount);
+
+/**
+ * Asserts that the ranges scheduled on the RangeDeleterService for the collection `uuidColl` are
+ * exactly `expectedChunkRanges`.
+ */
+void verifyRangeDeletionTasks(OperationContext* opCtx,
+ UUID uuidColl,
+ std::vector<ChunkRange> expectedChunkRanges);
+
+// CRUD operation over `config.rangeDeletions`
+void insertRangeDeletionTaskDocument(OperationContext* opCtx, const RangeDeletionTask& rdt);
+void updatePendingField(OperationContext* opCtx, UUID rdtId, bool pending);
+void removePendingField(OperationContext* opCtx, UUID rdtId);
+void deleteRangeDeletionTaskDocument(OperationContext* opCtx, UUID rdtId);
+
} // namespace mongo
diff --git a/src/mongo/db/s/range_deleter_service_test_util.cpp b/src/mongo/db/s/range_deleter_service_test_util.cpp
new file mode 100644
index 00000000000..6bfc2a76d5e
--- /dev/null
+++ b/src/mongo/db/s/range_deleter_service_test_util.cpp
@@ -0,0 +1,177 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/persistent_task_store.h"
+#include "mongo/db/s/range_deleter_service_test.h"
+
+namespace mongo {
+
+/**
+ * RangeDeletionWithOngoingQueries implementation
+ *
+ * Test helper pairing a range deletion task with a promise-like member (`_ongoingQueries`) that
+ * stands in for the queries still running on the range:
+ *  - the constructor stores a copy of the given task;
+ *  - `getTask` returns a copy of the stored task;
+ *  - `drainOngoingQueries` completes `_ongoingQueries` with an OK status, which tests use to
+ *    pretend that the ongoing queries have drained;
+ *  - `getOngoingQueriesFuture` returns a semi-future obtained from `_ongoingQueries`.
+ *
+ * NOTE(review): the type of `_ongoingQueries` is declared in the header and is not visible here;
+ * whether `drainOngoingQueries` may safely be called more than once should be confirmed there.
+ */
+RangeDeletionWithOngoingQueries::RangeDeletionWithOngoingQueries(const RangeDeletionTask& t)
+ : _task(t) {}
+
+RangeDeletionTask RangeDeletionWithOngoingQueries::getTask() {
+ return _task;
+}
+
+void RangeDeletionWithOngoingQueries::drainOngoingQueries() {
+ _ongoingQueries.setFrom(Status::OK());
+}
+
+SemiFuture<void> RangeDeletionWithOngoingQueries::getOngoingQueriesFuture() {
+ return _ongoingQueries.getFuture().semi();
+}
+
+/**
+ * Utils
+ */
+/**
+ * Builds a RangeDeletionTask with a freshly generated id, donor shard "shard0" and the given
+ * collection UUID, range and flags. The namespace is looked up in
+ * `RangeDeleterServiceTest::nssWithUuid`; since that is a std::map accessed through operator[],
+ * an unknown UUID silently gets a default-constructed NamespaceString inserted for it.
+ */
+RangeDeletionTask createRangeDeletionTask(const UUID& collectionUUID,
+                                          const BSONObj& min,
+                                          const BSONObj& max,
+                                          CleanWhenEnum whenToClean,
+                                          bool pending) {
+    RangeDeletionTask task;
+
+    // Identity of the task and of the collection it targets
+    task.setId(UUID::gen());
+    task.setCollectionUuid(collectionUUID);
+    task.setNss(RangeDeleterServiceTest::nssWithUuid[collectionUUID]);
+    task.setDonorShardId(ShardId("shard0"));
+
+    // What has to be deleted and when
+    const ChunkRange rangeToDelete(min, max);
+    task.setRange(rangeToDelete);
+    task.setWhenToClean(whenToClean);
+    task.setPending(pending);
+
+    return task;
+}
+
+/**
+ * Creates a range deletion task via `createRangeDeletionTask` and wraps it in a shared
+ * RangeDeletionWithOngoingQueries.
+ */
+std::shared_ptr<RangeDeletionWithOngoingQueries> createRangeDeletionTaskWithOngoingQueries(
+    const UUID& collectionUUID,
+    const BSONObj& min,
+    const BSONObj& max,
+    CleanWhenEnum whenToClean,
+    bool pending) {
+    const RangeDeletionTask task =
+        createRangeDeletionTask(collectionUUID, min, max, whenToClean, pending);
+    return std::make_shared<RangeDeletionWithOngoingQueries>(task);
+}
+
+// TODO review this method: the task may be registered, finish and be recreated by inserting the
+// document
+/**
+ * Registers `rdt` on the RangeDeleterService, then persists its document and strips the
+ * `pending` field from it. Returns the completion future handed back by the registration.
+ */
+SharedSemiFuture<void> registerAndCreatePersistentTask(
+    OperationContext* opCtx,
+    const RangeDeletionTask& rdt,
+    SemiFuture<void>&& waitForActiveQueriesToComplete) {
+    // In-memory registration comes first
+    auto taskCompletion = RangeDeleterService::get(opCtx)->registerTask(
+        rdt, std::move(waitForActiveQueriesToComplete));
+
+    // The range deletion only proceeds once the persistent document exists and no longer
+    // carries the `pending` field
+    insertRangeDeletionTaskDocument(opCtx, rdt);
+    removePendingField(opCtx, rdt.getId());
+
+    return taskCompletion;
+}
+
+/**
+ * Inserts documents `{kShardKey: k}` into `nss` for consecutive integers `k` starting at `min`,
+ * stopping as soon as either `maxCount` documents were inserted or `k` reaches `max` (exclusive).
+ *
+ * Returns the number of documents inserted. An empty or inverted range (`min >= max`) or a
+ * non-positive `maxCount` inserts nothing and returns 0, so keys outside [min, max) are never
+ * written.
+ */
+int insertDocsWithinRange(
+    OperationContext* opCtx, const NamespaceString& nss, int min, int max, int maxCount) {
+
+    DBDirectClient dbclient(opCtx);
+    int numInserted = 0;
+    // Bounding the key with `<` (rather than stopping on `== max`) keeps the helper within the
+    // requested range even when `min` is already past `max`.
+    for (int key = min; key < max && numInserted < maxCount; ++key, ++numInserted) {
+        dbclient.insert(nss.toString(), BSON(RangeDeleterServiceTest::kShardKey << key));
+    }
+    return numInserted;
+}
+
+
+/**
+ * Persists `rdt` as a document of the range deletion namespace.
+ */
+void insertRangeDeletionTaskDocument(OperationContext* opCtx, const RangeDeletionTask& rdt) {
+    PersistentTaskStore<RangeDeletionTask> rangeDeletionsStore(
+        NamespaceString::kRangeDeletionNamespace);
+    rangeDeletionsStore.add(opCtx, rdt);
+}
+
+/**
+ * Sets the `pending` field of the persisted task identified by `rdtId` to the given value.
+ */
+void updatePendingField(OperationContext* opCtx, UUID rdtId, bool pending) {
+    const auto taskById = BSON(RangeDeletionTask::kIdFieldName << rdtId);
+    const auto setPending =
+        BSON("$set" << BSON(RangeDeletionTask::kPendingFieldName << pending));
+
+    PersistentTaskStore<RangeDeletionTask> rangeDeletionsStore(
+        NamespaceString::kRangeDeletionNamespace);
+    rangeDeletionsStore.update(opCtx, taskById, setPending);
+}
+
+/**
+ * Unsets the `pending` field of the persisted task identified by `rdtId`.
+ */
+void removePendingField(OperationContext* opCtx, UUID rdtId) {
+    const auto taskById = BSON(RangeDeletionTask::kIdFieldName << rdtId);
+    const auto unsetPending =
+        BSON("$unset" << BSON(RangeDeletionTask::kPendingFieldName << ""));
+
+    PersistentTaskStore<RangeDeletionTask> rangeDeletionsStore(
+        NamespaceString::kRangeDeletionNamespace);
+    rangeDeletionsStore.update(opCtx, taskById, unsetPending);
+}
+
+/**
+ * Removes the persisted task identified by `rdtId` from the range deletion namespace.
+ */
+void deleteRangeDeletionTaskDocument(OperationContext* opCtx, UUID rdtId) {
+    const auto taskById = BSON(RangeDeletionTask::kIdFieldName << rdtId);
+
+    PersistentTaskStore<RangeDeletionTask> rangeDeletionsStore(
+        NamespaceString::kRangeDeletionNamespace);
+    rangeDeletionsStore.remove(opCtx, taskById);
+}
+
+/**
+ * Asserts that the range deletion tasks currently scheduled on the RangeDeleterService for the
+ * collection with UUID `uuidColl` are exactly `expectedChunkRanges`. The expected ranges are
+ * taken by value because they get sorted by min bound before being compared.
+ */
+void verifyRangeDeletionTasks(OperationContext* opCtx,
+                              UUID uuidColl,
+                              std::vector<ChunkRange> expectedChunkRanges) {
+    // Look up the ranges the service reports for this collection
+    const BSONObj serviceState = RangeDeleterService::get(opCtx)->dumpState();
+    const BSONElement scheduledElem = serviceState.getField(uuidColl.toString());
+
+    // No entry for the collection is fine as long as nothing was expected
+    if (expectedChunkRanges.empty() && !scheduledElem.ok()) {
+        return;
+    }
+    ASSERT(scheduledElem.ok()) << "Expected to find range deletion tasks from collection "
+                               << uuidColl.toString();
+
+    const auto scheduledRanges = scheduledElem.Array();
+    ASSERT_EQ(scheduledRanges.size(), expectedChunkRanges.size());
+
+    // Order the expected ranges by min bound to replicate RangeDeleterService dumpState order
+    std::sort(expectedChunkRanges.begin(),
+              expectedChunkRanges.end(),
+              [](const ChunkRange& lhs, const ChunkRange& rhs) {
+                  return lhs.getMin().woCompare(rhs.getMin()) < 0;
+              });
+
+    // Every reported range must match the expected one at the same position
+    for (size_t idx = 0; idx < expectedChunkRanges.size(); ++idx) {
+        const auto& expectedRange = expectedChunkRanges[idx];
+        ASSERT(ChunkRange::fromBSONThrowing(scheduledRanges[idx].Obj()) == expectedRange)
+            << "Expected " << ChunkRange::fromBSONThrowing(scheduledRanges[idx].Obj()).toBSON()
+            << " == " << expectedRange.toBSON();
+    }
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/s/range_deletion_util.cpp b/src/mongo/db/s/range_deletion_util.cpp
index a7a75a3257b..331f6f41a51 100644
--- a/src/mongo/db/s/range_deletion_util.cpp
+++ b/src/mongo/db/s/range_deletion_util.cpp
@@ -49,7 +49,8 @@
#include "mongo/db/query/query_planner.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/wait_for_majority_service.h"
-#include "mongo/db/s/migration_util.h"
+#include "mongo/db/s/balancer_stats_registry.h"
+#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/shard_key_index_util.h"
#include "mongo/db/s/sharding_runtime_d_params_gen.h"
#include "mongo/db/s/sharding_statistics.h"
@@ -190,33 +191,6 @@ StatusWith<int> deleteNextBatch(OperationContext* opCtx,
return numDeleted;
}
-template <typename Callable>
-auto withTemporaryOperationContext(Callable&& callable, const NamespaceString& nss) {
- ThreadClient tc(migrationutil::kRangeDeletionThreadName, getGlobalServiceContext());
- {
- stdx::lock_guard<Client> lk(*tc.get());
- tc->setSystemOperationKillableByStepdown(lk);
- }
- auto uniqueOpCtx = Client::getCurrent()->makeOperationContext();
- auto opCtx = uniqueOpCtx.get();
-
- // Ensure that this operation will be killed by the RstlKillOpThread during step-up or stepdown.
- opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE();
- invariant(opCtx->shouldAlwaysInterruptAtStepDownOrUp());
-
- {
- auto replCoord = repl::ReplicationCoordinator::get(opCtx);
- Lock::GlobalLock lock(opCtx, MODE_IX);
- uassert(ErrorCodes::PrimarySteppedDown,
- str::stream() << "Not primary while running range deletion task for collection"
- << nss,
- replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet &&
- replCoord->canAcceptWritesFor(opCtx, nss));
- }
-
- return callable(opCtx);
-}
-
void ensureRangeDeletionTaskStillExists(OperationContext* opCtx,
const UUID& collectionUuid,
const ChunkRange& range) {
@@ -284,28 +258,11 @@ ExecutorFuture<void> deleteRangeInBatchesWithExecutor(
[=](OperationContext* opCtx) {
return deleteRangeInBatches(opCtx, nss.db(), collectionUuid, keyPattern, range);
},
- nss);
+ nss.db(),
+ collectionUuid);
});
}
-void removePersistentRangeDeletionTask(const NamespaceString& nss,
- const UUID& collectionUuid,
- const ChunkRange& range) {
- withTemporaryOperationContext(
- [&](OperationContext* opCtx) {
- PersistentTaskStore<RangeDeletionTask> store(NamespaceString::kRangeDeletionNamespace);
-
- auto overlappingRangeQuery = BSON(
- RangeDeletionTask::kCollectionUuidFieldName
- << collectionUuid << RangeDeletionTask::kRangeFieldName + "." + ChunkRange::kMinKey
- << GTE << range.getMin()
- << RangeDeletionTask::kRangeFieldName + "." + ChunkRange::kMaxKey << LTE
- << range.getMax());
- store.remove(opCtx, overlappingRangeQuery);
- },
- nss);
-}
-
ExecutorFuture<void> waitForDeletionsToMajorityReplicate(
const std::shared_ptr<executor::TaskExecutor>& executor,
const NamespaceString& nss,
@@ -329,7 +286,8 @@ ExecutorFuture<void> waitForDeletionsToMajorityReplicate(
.waitUntilMajority(clientOpTime, CancellationToken::uncancelable())
.thenRunOn(executor);
},
- nss);
+ nss.db(),
+ collectionUuid);
}
std::vector<RangeDeletionTask> getPersistentRangeDeletionTasks(OperationContext* opCtx,
@@ -347,6 +305,13 @@ std::vector<RangeDeletionTask> getPersistentRangeDeletionTasks(OperationContext*
return tasks;
}
+/**
+ * Builds the filter matching the range deletion task document of the given collection whose
+ * range bounds are exactly `range.getMin()` and `range.getMax()`.
+ */
+BSONObj getQueryFilterForRangeDeletionTask(const UUID& collectionUuid, const ChunkRange& range) {
+    // Dotted paths of the range bounds inside the task document
+    const auto minBoundPath = RangeDeletionTask::kRangeFieldName + "." + ChunkRange::kMinKey;
+    const auto maxBoundPath = RangeDeletionTask::kRangeFieldName + "." + ChunkRange::kMaxKey;
+
+    return BSON(RangeDeletionTask::kCollectionUuidFieldName << collectionUuid << minBoundPath
+                                                            << range.getMin() << maxBoundPath
+                                                            << range.getMax());
+}
+
} // namespace
Status deleteRangeInBatches(OperationContext* opCtx,
@@ -354,6 +319,8 @@ Status deleteRangeInBatches(OperationContext* opCtx,
const UUID& collectionUuid,
const BSONObj& keyPattern,
const ChunkRange& range) {
+ suspendRangeDeletion.pauseWhileSet();
+
bool allDocsRemoved = false;
// Delete all batches in this range unless a stepdown error occurs. Do not yield the
// executor to ensure that this range is fully deleted before another range is
@@ -402,7 +369,7 @@ Status deleteRangeInBatches(OperationContext* opCtx,
}
}();
- migrationutil::persistUpdatedNumOrphans(opCtx, collectionUuid, range, -numDeleted);
+ persistUpdatedNumOrphans(opCtx, collectionUuid, range, -numDeleted);
if (MONGO_unlikely(hangAfterDoingDeletion.shouldFail())) {
hangAfterDoingDeletion.pauseWhileSet(opCtx);
@@ -508,7 +475,6 @@ SharedSemiFuture<void> removeDocumentsInRange(
invariant(s.isOK());
})
.then([=]() mutable {
- suspendRangeDeletion.pauseWhileSet();
// Wait for possibly ongoing queries on secondaries to complete.
return sleepUntil(executor,
executor->now() + delayForActiveQueriesOnSecondariesToComplete);
@@ -574,7 +540,12 @@ SharedSemiFuture<void> removeDocumentsInRange(
}
try {
- removePersistentRangeDeletionTask(nss, collectionUuid, range);
+ withTemporaryOperationContext(
+ [&](OperationContext* opCtx) {
+ removePersistentRangeDeletionTask(opCtx, collectionUuid, range);
+ },
+ nss.db(),
+ collectionUuid);
} catch (const DBException& e) {
LOGV2_ERROR(23770,
"Failed to delete range deletion task for range {range} in collection "
@@ -602,4 +573,41 @@ SharedSemiFuture<void> removeDocumentsInRange(
.share();
}
+/**
+ * Adds `changeInOrphans` to the orphan counter of the persisted range deletion task matching
+ * the given collection and range, then reports the same change to the BalancerStatsRegistry.
+ * A missing task document (NoMatchingDocument) is tolerated and leaves both untouched.
+ */
+void persistUpdatedNumOrphans(OperationContext* opCtx,
+                              const UUID& collectionUuid,
+                              const ChunkRange& range,
+                              long long changeInOrphans) {
+    const auto taskFilter = getQueryFilterForRangeDeletionTask(collectionUuid, range);
+    const auto orphanCountIncrement =
+        BSON("$inc" << BSON(RangeDeletionTask::kNumOrphanDocsFieldName << changeInOrphans));
+
+    try {
+        PersistentTaskStore<RangeDeletionTask> taskStore(NamespaceString::kRangeDeletionNamespace);
+        ScopedRangeDeleterLock rangeDeleterLock(opCtx, collectionUuid);
+
+        // The DBDirectClient will not retry WriteConflictExceptions internally while holding an X
+        // mode lock, so we need to retry at this level.
+        const auto applyIncrement = [&] {
+            taskStore.update(
+                opCtx, taskFilter, orphanCountIncrement, WriteConcerns::kLocalWriteConcern);
+        };
+        writeConflictRetry(opCtx,
+                           "updateOrphanCount",
+                           NamespaceString::kRangeDeletionNamespace.ns(),
+                           applyIncrement);
+
+        BalancerStatsRegistry::get(opCtx)->updateOrphansCount(collectionUuid, changeInOrphans);
+    } catch (const ExceptionFor<ErrorCodes::NoMatchingDocument>&) {
+        // When upgrading or downgrading, there may be no documents with the orphan count field.
+    }
+}
+
+/**
+ * Removes from the range deletion namespace every task document of the given collection whose
+ * range is contained in `range` (task min >= range min and task max <= range max).
+ */
+void removePersistentRangeDeletionTask(OperationContext* opCtx,
+                                       const UUID& collectionUuid,
+                                       const ChunkRange& range) {
+    // Dotted paths of the range bounds inside the task document
+    const auto minBoundPath = RangeDeletionTask::kRangeFieldName + "." + ChunkRange::kMinKey;
+    const auto maxBoundPath = RangeDeletionTask::kRangeFieldName + "." + ChunkRange::kMaxKey;
+
+    const auto containedRangesFilter =
+        BSON(RangeDeletionTask::kCollectionUuidFieldName
+             << collectionUuid << minBoundPath << GTE << range.getMin() << maxBoundPath << LTE
+             << range.getMax());
+
+    PersistentTaskStore<RangeDeletionTask> taskStore(NamespaceString::kRangeDeletionNamespace);
+    taskStore.remove(opCtx, containedRangesFilter);
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/range_deletion_util.h b/src/mongo/db/s/range_deletion_util.h
index dd78e97db6f..6efaf6946ce 100644
--- a/src/mongo/db/s/range_deletion_util.h
+++ b/src/mongo/db/s/range_deletion_util.h
@@ -31,13 +31,17 @@
#include <boost/optional.hpp>
#include <list>
+#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/s/range_deletion_task_gen.h"
#include "mongo/executor/task_executor.h"
#include "mongo/s/catalog/type_chunk.h"
namespace mongo {
+constexpr auto kRangeDeletionThreadName = "range-deleter"_sd;
+
/**
* DO NOT USE - only necessary for the legacy range deleter
*
@@ -94,4 +98,59 @@ void restoreRangeDeletionTasksForRename(OperationContext* opCtx, const Namespace
void deleteRangeDeletionTasksForRename(OperationContext* opCtx,
const NamespaceString& fromNss,
const NamespaceString& toNss);
+
+/**
+ * Updates the range deletion task document to increase or decrease numOrphanedDocs
+ *
+ * The task is identified by `collectionUuid` and `range`.
+ * NOTE(review): `changeInOrphans` is presumably a signed delta (negative to decrease the
+ * counter) rather than the new absolute value - confirm against the definition.
+ */
+void persistUpdatedNumOrphans(OperationContext* opCtx,
+ const UUID& collectionUuid,
+ const ChunkRange& range,
+ long long changeInOrphans);
+
+/**
+ * Removes range deletion task documents from `config.rangeDeletions` for the specified range and
+ * collection
+ *
+ * A task document matches when its collection UUID equals `collectionUuid`, its range min is
+ * greater than or equal to the min of `range`, and its range max is less than or equal to the
+ * max of `range`.
+ */
+void removePersistentRangeDeletionTask(OperationContext* opCtx,
+ const UUID& collectionUuid,
+ const ChunkRange& range);
+
+/**
+ * Wrapper to run a safer step up/step down killable task within an operation context
+ *
+ * Creates a ThreadClient named kRangeDeletionThreadName, marks it as a system operation killable
+ * by stepdown, makes a new operation context on it and flags that context to always be
+ * interrupted at step-down or step-up. It then invokes `callable(opCtx)` and returns its result.
+ *
+ * Before the callable runs, the global lock is taken in MODE_IX for the duration of a check that
+ * this node is in replica set mode and can accept writes for the collection identified by
+ * `dbName` and `collectionUUID` and, when `writeToRangeDeletionNamespace` is true, also for
+ * NamespaceString::kRangeDeletionNamespace. That lock is released before the callable is invoked.
+ *
+ * `dbName` is taken by const reference so the DatabaseName is not copied on every call.
+ *
+ * Throws PrimarySteppedDown (through uassert) when the check above fails.
+ */
+template <typename Callable>
+auto withTemporaryOperationContext(Callable&& callable,
+                                   const DatabaseName& dbName,
+                                   const UUID& collectionUUID,
+                                   bool writeToRangeDeletionNamespace = false) {
+    ThreadClient tc(kRangeDeletionThreadName, getGlobalServiceContext());
+    {
+        // The client lock is only needed while changing the client's killability.
+        stdx::lock_guard<Client> lk(*tc.get());
+        tc->setSystemOperationKillableByStepdown(lk);
+    }
+    auto uniqueOpCtx = Client::getCurrent()->makeOperationContext();
+    auto opCtx = uniqueOpCtx.get();
+
+    // Ensure that this operation will be killed by the RstlKillOpThread during step-up or stepdown.
+    opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE();
+    invariant(opCtx->shouldAlwaysInterruptAtStepDownOrUp());
+
+    {
+        // Scoped so that the global lock is dropped before the callable is invoked.
+        auto replCoord = repl::ReplicationCoordinator::get(opCtx);
+        Lock::GlobalLock lock(opCtx, MODE_IX);
+        uassert(
+            ErrorCodes::PrimarySteppedDown,
+            str::stream()
+                << "Not primary while running range deletion task for collection with UUID "
+                << collectionUUID,
+            replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet &&
+                replCoord->canAcceptWritesFor(opCtx,
+                                              NamespaceStringOrUUID(dbName, collectionUUID)) &&
+                (!writeToRangeDeletionNamespace ||
+                 replCoord->canAcceptWritesFor(opCtx, NamespaceString::kRangeDeletionNamespace)));
+    }
+
+    return callable(opCtx);
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/range_deletion_util_test.cpp b/src/mongo/db/s/range_deletion_util_test.cpp
index e0299a30208..5f8cafc91e0 100644
--- a/src/mongo/db/s/range_deletion_util_test.cpp
+++ b/src/mongo/db/s/range_deletion_util_test.cpp
@@ -120,9 +120,6 @@ public:
Lock::CollectionLock collLock(_opCtx, kNss, MODE_IX);
CollectionMetadata collMetadata(std::move(cm), ShardId("dummyShardId"));
CollectionShardingRuntime::get(_opCtx, kNss)->setFilteringMetadata(_opCtx, collMetadata);
- auto* css = CollectionShardingState::get(_opCtx, kNss);
- auto& csr = *checked_cast<CollectionShardingRuntime*>(css);
- csr.setFilteringMetadata(_opCtx, collMetadata);
}
UUID uuid() const {