diff options
Diffstat (limited to 'src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp')
-rw-r--r-- | src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp | 55 |
1 files changed, 53 insertions, 2 deletions
diff --git a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp index 5586c71a25d..9830b44c5db 100644 --- a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp +++ b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp @@ -48,8 +48,35 @@ #include "mongo/s/resharding/resharding_feature_flag_gen.h" namespace mongo { + +MONGO_FAIL_POINT_DEFINE(reshardCollectionJoinedExistingOperation); + namespace { +// Returns a resharding instance if one exists for the same collection and shard key already. +boost::optional<std::shared_ptr<ReshardingCoordinatorService::ReshardingCoordinator>> +getExistingInstanceToJoin(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& newShardKey) { + auto reshardingCoordinatorService = checked_cast<ReshardingCoordinatorService*>( + repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext()) + ->lookupServiceByName(ReshardingCoordinatorService::kServiceName)); + auto instances = reshardingCoordinatorService->getAllReshardingInstances(opCtx); + for (auto& instance : instances) { + auto reshardingCoordinator = + checked_pointer_cast<ReshardingCoordinatorService::ReshardingCoordinator>(instance); + + auto instanceMetadata = reshardingCoordinator->getMetadata(); + if (SimpleBSONObjComparator::kInstance.evaluate( + instanceMetadata.getReshardingKey().toBSON() == newShardKey) && + instanceMetadata.getSourceNss() == nss) { + return reshardingCoordinator; + }; + } + + return boost::none; +} + class ConfigsvrReshardCollectionCommand final : public TypedCommand<ConfigsvrReshardCollectionCommand> { public: @@ -109,7 +136,9 @@ public: *presetChunks, opCtx, ShardKeyPattern(request().getKey()).getKeyPattern()); } - auto instance = ([&] { + // Returns boost::none if there isn't any work to be done by the resharding operation. + auto instance = ([&]() -> boost::optional<std::shared_ptr< + ReshardingCoordinatorService::ReshardingCoordinator>> { FixedFCVRegion fixedFcv(opCtx); uassert(ErrorCodes::CommandNotSupported, @@ -117,10 +146,28 @@ public: resharding::gFeatureFlagResharding.isEnabled( serverGlobalParams.featureCompatibility)); + if (auto existingInstance = + getExistingInstanceToJoin(opCtx, nss, request().getKey())) { + // Join the existing resharding operation to prevent generating a new resharding + // instance if the same command is issued consecutively due to client disconnect + // etc. + reshardCollectionJoinedExistingOperation.pauseWhileSet(opCtx); + existingInstance.get()->getCoordinatorDocWrittenFuture().get(opCtx); + return existingInstance; + } + const auto cm = uassertStatusOK( Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh( opCtx, nss)); + const auto currentShardKey = cm.getShardKeyPattern().getKeyPattern(); + if (SimpleBSONObjComparator::kInstance.evaluate(currentShardKey.toBSON() == + request().getKey())) { + // There isn't any work to be done by the resharding operation since the + // existing shard key matches the desired new shard key. + return boost::none; + } + auto tempReshardingNss = constructTemporaryReshardingNss( nss.db(), getCollectionUUIDFromChunkManger(nss, cm)); @@ -158,7 +205,11 @@ public: return instance; })(); - instance->getCompletionFuture().get(opCtx); + if (instance) { + // There is work to be done in order to have the collection's shard key match the + // requested shard key. Wait until the work is complete. + instance.get()->getCompletionFuture().get(opCtx); + } } private: |