diff options
-rw-r--r-- | src/mongo/db/s/migration_coordinator.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/s/migration_destination_manager.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/s/migration_util.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/s/migration_util_test.cpp | 76 | ||||
-rw-r--r-- | src/mongo/db/s/range_deletion_task.idl | 4 | ||||
-rw-r--r-- | src/mongo/db/s/range_deletion_util_test.cpp | 17 |
6 files changed, 81 insertions, 27 deletions
diff --git a/src/mongo/db/s/migration_coordinator.cpp b/src/mongo/db/s/migration_coordinator.cpp index a4fe84afb6d..97d24a6375c 100644 --- a/src/mongo/db/s/migration_coordinator.cpp +++ b/src/mongo/db/s/migration_coordinator.cpp @@ -33,6 +33,7 @@ #include "mongo/db/s/migration_coordinator.h" +#include "mongo/db/logical_clock.h" #include "mongo/db/logical_session_id_helpers.h" #include "mongo/db/s/migration_util.h" #include "mongo/db/s/range_deletion_task_gen.h" @@ -123,6 +124,8 @@ void MigrationCoordinator::startMigration(OperationContext* opCtx) { _waitForDelete ? CleanWhenEnum::kNow : CleanWhenEnum::kDelayed); donorDeletionTask.setPending(true); + const auto clusterTime = LogicalClock::get(opCtx)->getClusterTime(); + donorDeletionTask.setTimestamp(clusterTime.asTimestamp()); migrationutil::persistRangeDeletionTaskLocally( opCtx, donorDeletionTask, WriteConcerns::kMajorityWriteConcern); } @@ -223,6 +226,8 @@ SemiFuture<void> MigrationCoordinator::_commitMigrationOnDonorAndRecipient( _migrationInfo.getDonorShardId(), _migrationInfo.getRange(), _waitForDelete ? CleanWhenEnum::kNow : CleanWhenEnum::kDelayed); + const auto clusterTime = LogicalClock::get(opCtx)->getClusterTime(); + deletionTask.setTimestamp(clusterTime.asTimestamp()); return migrationutil::submitRangeDeletionTask(opCtx, deletionTask).semi(); } diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index b4ad2ae10fc..7657072300e 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -43,6 +43,7 @@ #include "mongo/db/dbhelpers.h" #include "mongo/db/index/index_descriptor.h" #include "mongo/db/index_builds_coordinator.h" +#include "mongo/db/logical_clock.h" #include "mongo/db/logical_session_id_helpers.h" #include "mongo/db/namespace_string.h" #include "mongo/db/op_observer.h" @@ -1065,6 +1066,8 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) { range, CleanWhenEnum::kNow); recipientDeletionTask.setPending(true); + const auto clusterTime = LogicalClock::get(outerOpCtx)->getClusterTime(); + recipientDeletionTask.setTimestamp(clusterTime.asTimestamp()); // It is illegal to wait for write concern with a session checked out, so persist the // range deletion task with an immediately satsifiable write concern and then wait for diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp index 7d21dd60e34..c78be68200a 100644 --- a/src/mongo/db/s/migration_util.cpp +++ b/src/mongo/db/s/migration_util.cpp @@ -43,6 +43,7 @@ #include "mongo/db/catalog_raii.h" #include "mongo/db/commands.h" #include "mongo/db/dbdirectclient.h" +#include "mongo/db/logical_clock.h" #include "mongo/db/logical_session_cache.h" #include "mongo/db/namespace_string.h" #include "mongo/db/ops/write_ops.h" @@ -507,6 +508,8 @@ void submitOrphanRanges(OperationContext* opCtx, const NamespaceString& nss, con ShardId(kRangeDeletionTaskShardIdForFCVUpgrade), range, CleanWhenEnum::kDelayed); + const auto clusterTime = LogicalClock::get(opCtx)->getClusterTime(); + task.setTimestamp(clusterTime.asTimestamp()); deletions.emplace_back(task); if (deletions.size() % 1000 == 0) { diff --git a/src/mongo/db/s/migration_util_test.cpp b/src/mongo/db/s/migration_util_test.cpp index 079afb19393..faeb3435a31 100644 --- a/src/mongo/db/s/migration_util_test.cpp +++ b/src/mongo/db/s/migration_util_test.cpp @@ -30,6 +30,7 @@ #include "mongo/client/remote_command_targeter_factory_mock.h" #include "mongo/client/remote_command_targeter_mock.h" #include "mongo/db/catalog_raii.h" +#include "mongo/db/logical_clock.h" #include "mongo/db/s/catalog_cache_loader_mock.h" #include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/collection_sharding_state.h" @@ -69,7 +70,8 @@ void addRangeToReceivingChunks(OperationContext* opCtx, } template <typename ShardKey> -RangeDeletionTask createDeletionTask(const NamespaceString& nss, +RangeDeletionTask createDeletionTask(OperationContext* opCtx, + const NamespaceString& nss, const UUID& uuid, ShardKey min, ShardKey max, @@ -81,6 +83,8 @@ RangeDeletionTask createDeletionTask(const NamespaceString& nss, donorShard, ChunkRange{BSON("_id" << min), BSON("_id" << max)}, CleanWhenEnum::kNow); + const auto clusterTime = LogicalClock::get(opCtx)->getClusterTime(); + task.setTimestamp(clusterTime.asTimestamp()); if (pending) task.setPending(true); @@ -109,9 +113,9 @@ TEST_F(MigrationUtilsTest, TestOverlappingRangeQueryWithIntegerShardKey) { PersistentTaskStore<RangeDeletionTask> store(opCtx, NamespaceString::kRangeDeletionNamespace); - store.add(opCtx, createDeletionTask(NamespaceString{"one"}, uuid, 0, 10)); - store.add(opCtx, createDeletionTask(NamespaceString{"two"}, uuid, 10, 20)); - store.add(opCtx, createDeletionTask(NamespaceString{"three"}, uuid, 40, 50)); + store.add(opCtx, createDeletionTask(opCtx, NamespaceString{"one"}, uuid, 0, 10)); + store.add(opCtx, createDeletionTask(opCtx, NamespaceString{"two"}, uuid, 10, 20)); + store.add(opCtx, createDeletionTask(opCtx, NamespaceString{"three"}, uuid, 40, 50)); ASSERT_EQ(store.count(opCtx), 3); @@ -167,12 +171,21 @@ TEST_F(MigrationUtilsTest, TestOverlappingRangeQueryWithCompoundShardKeyWhereFir PersistentTaskStore<RangeDeletionTask> store(opCtx, NamespaceString::kRangeDeletionNamespace); auto deletionTasks = { - createDeletionTask( - NamespaceString{"one"}, uuid, BSON("a" << 0 << "b" << 0), BSON("a" << 0 << "b" << 10)), - createDeletionTask( - NamespaceString{"two"}, uuid, BSON("a" << 0 << "b" << 10), BSON("a" << 0 << "b" << 20)), - createDeletionTask( - NamespaceString{"one"}, uuid, BSON("a" << 0 << "b" << 40), BSON("a" << 0 << "b" << 50)), + createDeletionTask(opCtx, + NamespaceString{"one"}, + uuid, + BSON("a" << 0 << "b" << 0), + BSON("a" << 0 << "b" << 10)), + createDeletionTask(opCtx, + NamespaceString{"two"}, + uuid, + BSON("a" << 0 << "b" << 10), + BSON("a" << 0 << "b" << 20)), + createDeletionTask(opCtx, + NamespaceString{"one"}, + uuid, + BSON("a" << 0 << "b" << 40), + BSON("a" << 0 << "b" << 50)), }; for (auto&& task : deletionTasks) { @@ -242,12 +255,21 @@ TEST_F(MigrationUtilsTest, PersistentTaskStore<RangeDeletionTask> store(opCtx, NamespaceString::kRangeDeletionNamespace); auto deletionTasks = { - createDeletionTask( - NamespaceString{"one"}, uuid, BSON("a" << 0 << "b" << 0), BSON("a" << 10 << "b" << 0)), - createDeletionTask( - NamespaceString{"two"}, uuid, BSON("a" << 10 << "b" << 0), BSON("a" << 20 << "b" << 0)), - createDeletionTask( - NamespaceString{"one"}, uuid, BSON("a" << 40 << "b" << 0), BSON("a" << 50 << "b" << 0)), + createDeletionTask(opCtx, + NamespaceString{"one"}, + uuid, + BSON("a" << 0 << "b" << 0), + BSON("a" << 10 << "b" << 0)), + createDeletionTask(opCtx, + NamespaceString{"two"}, + uuid, + BSON("a" << 10 << "b" << 0), + BSON("a" << 20 << "b" << 0)), + createDeletionTask(opCtx, + NamespaceString{"one"}, + uuid, + BSON("a" << 40 << "b" << 0), + BSON("a" << 50 << "b" << 0)), }; for (auto&& task : deletionTasks) { @@ -315,9 +337,9 @@ TEST_F(MigrationUtilsTest, TestInvalidUUID) { PersistentTaskStore<RangeDeletionTask> store(opCtx, NamespaceString::kRangeDeletionNamespace); - store.add(opCtx, createDeletionTask(NamespaceString{"one"}, uuid, 0, 10)); - store.add(opCtx, createDeletionTask(NamespaceString{"two"}, uuid, 10, 20)); - store.add(opCtx, createDeletionTask(NamespaceString{"three"}, uuid, 40, 50)); + store.add(opCtx, createDeletionTask(opCtx, NamespaceString{"one"}, uuid, 0, 10)); + store.add(opCtx, createDeletionTask(opCtx, NamespaceString{"two"}, uuid, 10, 20)); + store.add(opCtx, createDeletionTask(opCtx, NamespaceString{"three"}, uuid, 40, 50)); ASSERT_EQ(store.count(opCtx), 3); @@ -471,7 +493,7 @@ TEST_F(SubmitRangeDeletionTaskTest, FailsAndDeletesTaskIfFilteringMetadataIsUnknownEvenAfterRefresh) { auto opCtx = operationContext(); - auto deletionTask = createDeletionTask(kNss, kDefaultUUID, 0, 10, _myShardName); + auto deletionTask = createDeletionTask(opCtx, kNss, kDefaultUUID, 0, 10, _myShardName); PersistentTaskStore<RangeDeletionTask> store(opCtx, NamespaceString::kRangeDeletionNamespace); store.add(opCtx, deletionTask); @@ -496,7 +518,7 @@ TEST_F(SubmitRangeDeletionTaskTest, TEST_F(SubmitRangeDeletionTaskTest, FailsAndDeletesTaskIfNamespaceIsUnshardedEvenAfterRefresh) { auto opCtx = operationContext(); - auto deletionTask = createDeletionTask(kNss, kDefaultUUID, 0, 10, _myShardName); + auto deletionTask = createDeletionTask(opCtx, kNss, kDefaultUUID, 0, 10, _myShardName); PersistentTaskStore<RangeDeletionTask> store(opCtx, NamespaceString::kRangeDeletionNamespace); store.add(opCtx, deletionTask); @@ -523,7 +545,7 @@ TEST_F(SubmitRangeDeletionTaskTest, FailsAndDeletesTaskIfNamespaceIsUnshardedBeforeAndAfterRefresh) { auto opCtx = operationContext(); - auto deletionTask = createDeletionTask(kNss, kDefaultUUID, 0, 10, _myShardName); + auto deletionTask = createDeletionTask(opCtx, kNss, kDefaultUUID, 0, 10, _myShardName); PersistentTaskStore<RangeDeletionTask> store(opCtx, NamespaceString::kRangeDeletionNamespace); store.add(opCtx, deletionTask); @@ -551,7 +573,7 @@ TEST_F(SubmitRangeDeletionTaskTest, SucceedsIfFilteringMetadataUUIDMatchesTaskUU auto opCtx = operationContext(); auto collectionUUID = createCollectionAndGetUUID(kNss); - auto deletionTask = createDeletionTask(kNss, collectionUUID, 0, 10, _myShardName); + auto deletionTask = createDeletionTask(opCtx, kNss, collectionUUID, 0, 10, _myShardName); PersistentTaskStore<RangeDeletionTask> store(opCtx, NamespaceString::kRangeDeletionNamespace); store.add(opCtx, deletionTask); @@ -578,7 +600,7 @@ TEST_F( auto opCtx = operationContext(); auto collectionUUID = createCollectionAndGetUUID(kNss); - auto deletionTask = createDeletionTask(kNss, collectionUUID, 0, 10, _myShardName); + auto deletionTask = createDeletionTask(opCtx, kNss, collectionUUID, 0, 10, _myShardName); PersistentTaskStore<RangeDeletionTask> store(opCtx, NamespaceString::kRangeDeletionNamespace); store.add(opCtx, deletionTask); @@ -610,7 +632,7 @@ TEST_F(SubmitRangeDeletionTaskTest, forceShardFilteringMetadataRefresh(opCtx, kNss, true); auto collectionUUID = createCollectionAndGetUUID(kNss); - auto deletionTask = createDeletionTask(kNss, collectionUUID, 0, 10, _myShardName); + auto deletionTask = createDeletionTask(opCtx, kNss, collectionUUID, 0, 10, _myShardName); PersistentTaskStore<RangeDeletionTask> store(opCtx, NamespaceString::kRangeDeletionNamespace); store.add(opCtx, deletionTask); @@ -646,7 +668,7 @@ TEST_F(SubmitRangeDeletionTaskTest, forceShardFilteringMetadataRefresh(opCtx, kNss, true); auto collectionUUID = createCollectionAndGetUUID(kNss); - auto deletionTask = createDeletionTask(kNss, collectionUUID, 0, 10, _myShardName); + auto deletionTask = createDeletionTask(opCtx, kNss, collectionUUID, 0, 10, _myShardName); PersistentTaskStore<RangeDeletionTask> store(opCtx, NamespaceString::kRangeDeletionNamespace); store.add(opCtx, deletionTask); @@ -669,7 +691,7 @@ TEST_F(SubmitRangeDeletionTaskTest, FailsAndDeletesTaskIfFilteringMetadataUUIDDifferentFromTaskUUIDEvenAfterRefresh) { auto opCtx = operationContext(); - auto deletionTask = createDeletionTask(kNss, kDefaultUUID, 0, 10, _myShardName); + auto deletionTask = createDeletionTask(opCtx, kNss, kDefaultUUID, 0, 10, _myShardName); PersistentTaskStore<RangeDeletionTask> store(opCtx, NamespaceString::kRangeDeletionNamespace); store.add(opCtx, deletionTask); diff --git a/src/mongo/db/s/range_deletion_task.idl b/src/mongo/db/s/range_deletion_task.idl index 1c265f2656a..cef4a3c0c65 100644 --- a/src/mongo/db/s/range_deletion_task.idl +++ b/src/mongo/db/s/range_deletion_task.idl @@ -74,3 +74,7 @@ structs: whenToClean: type: CleanWhen description: "Enumeration that defines when to cleanup the range." + timestamp: + type: timestamp + description: "The timestamp the task was created." + optional: true diff --git a/src/mongo/db/s/range_deletion_util_test.cpp b/src/mongo/db/s/range_deletion_util_test.cpp index 426b2d2e360..8c5b5adc797 100644 --- a/src/mongo/db/s/range_deletion_util_test.cpp +++ b/src/mongo/db/s/range_deletion_util_test.cpp @@ -31,6 +31,7 @@ #include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" +#include "mongo/db/logical_clock.h" #include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/metadata_manager.h" #include "mongo/db/s/migration_util.h" @@ -428,6 +429,8 @@ TEST_F(RangeDeleterTest, RemoveDocumentsInRangeWaitsForReplicationAfterDeletingS const ChunkRange range(BSON(kShardKey << 0), BSON(kShardKey << 10)); RangeDeletionTask t( UUID::gen(), kNss, uuid(), ShardId("donor"), range, CleanWhenEnum::kDelayed); + const auto clusterTime = LogicalClock::get(operationContext())->getClusterTime(); + t.setTimestamp(clusterTime.asTimestamp()); store.add(operationContext(), t); // Document should be in the store. ASSERT_EQUALS(countDocsInConfigRangeDeletions(store, operationContext()), 1); @@ -485,6 +488,8 @@ TEST_F(RangeDeleterTest, RemoveDocumentsInRangeWaitsForReplicationOnlyOnceAfterS const ChunkRange range(BSON(kShardKey << 0), BSON(kShardKey << 10)); RangeDeletionTask t( UUID::gen(), kNss, uuid(), ShardId("donor"), range, CleanWhenEnum::kDelayed); + const auto clusterTime = LogicalClock::get(operationContext())->getClusterTime(); + t.setTimestamp(clusterTime.asTimestamp()); store.add(operationContext(), t); // Document should be in the store. ASSERT_EQUALS(countDocsInConfigRangeDeletions(store, operationContext()), 1); @@ -536,6 +541,8 @@ TEST_F(RangeDeleterTest, RemoveDocumentsInRangeDoesNotWaitForReplicationIfErrorD const ChunkRange range(BSON(kShardKey << 0), BSON(kShardKey << 10)); RangeDeletionTask t( UUID::gen(), kNss, uuid(), ShardId("donor"), range, CleanWhenEnum::kDelayed); + const auto clusterTime = LogicalClock::get(operationContext())->getClusterTime(); + t.setTimestamp(clusterTime.asTimestamp()); store.add(operationContext(), t); // Document should be in the store. ASSERT_EQUALS(countDocsInConfigRangeDeletions(store, operationContext()), 1); @@ -588,6 +595,8 @@ TEST_F(RangeDeleterTest, RemoveDocumentsInRangeRetriesOnWriteConflictException) NamespaceString::kRangeDeletionNamespace); RangeDeletionTask t( UUID::gen(), kNss, uuid(), ShardId("donor"), range, CleanWhenEnum::kDelayed); + const auto clusterTime = LogicalClock::get(operationContext())->getClusterTime(); + t.setTimestamp(clusterTime.asTimestamp()); store.add(operationContext(), t); // Document should be in the store. ASSERT_EQUALS(countDocsInConfigRangeDeletions(store, operationContext()), 1); @@ -627,6 +636,8 @@ TEST_F(RangeDeleterTest, RemoveDocumentsInRangeRetriesOnUnexpectedError) { NamespaceString::kRangeDeletionNamespace); RangeDeletionTask t( UUID::gen(), kNss, uuid(), ShardId("donor"), range, CleanWhenEnum::kDelayed); + const auto clusterTime = LogicalClock::get(operationContext())->getClusterTime(); + t.setTimestamp(clusterTime.asTimestamp()); store.add(operationContext(), t); // Document should be in the store. ASSERT_EQUALS(countDocsInConfigRangeDeletions(store, operationContext()), 1); @@ -749,6 +760,8 @@ TEST_F(RangeDeleterTest, RemoveDocumentsInRangeRemovesRangeDeletionTaskOnSuccess RangeDeletionTask t( UUID::gen(), kNss, uuid(), ShardId("donor"), range, CleanWhenEnum::kDelayed); + const auto clusterTime = LogicalClock::get(operationContext())->getClusterTime(); + t.setTimestamp(clusterTime.asTimestamp()); store.add(operationContext(), t); // Document should be in the store. ASSERT_EQUALS(countDocsInConfigRangeDeletions(store, operationContext()), 1); @@ -787,6 +800,8 @@ TEST_F(RangeDeleterTest, RangeDeletionTask t( UUID::gen(), kNss, fakeUuid, ShardId("donor"), range, CleanWhenEnum::kDelayed); + const auto clusterTime = LogicalClock::get(operationContext())->getClusterTime(); + t.setTimestamp(clusterTime.asTimestamp()); store.add(operationContext(), t); // Document should be in the store. ASSERT_EQUALS(countDocsInConfigRangeDeletions(store, operationContext()), 1); @@ -826,6 +841,8 @@ TEST_F(RangeDeleterTest, RangeDeletionTask t( UUID::gen(), kNss, uuid(), ShardId("donor"), range, CleanWhenEnum::kDelayed); + const auto clusterTime = LogicalClock::get(operationContext())->getClusterTime(); + t.setTimestamp(clusterTime.asTimestamp()); store.add(operationContext(), t); // Document should be in the store. ASSERT_EQUALS(countDocsInConfigRangeDeletions(store, operationContext()), 1); |