summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp')
-rw-r--r--src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp55
1 files changed, 53 insertions, 2 deletions
diff --git a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp
index 5586c71a25d..9830b44c5db 100644
--- a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp
+++ b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp
@@ -48,8 +48,35 @@
#include "mongo/s/resharding/resharding_feature_flag_gen.h"
namespace mongo {
+
+MONGO_FAIL_POINT_DEFINE(reshardCollectionJoinedExistingOperation);
+
namespace {
+// Returns a resharding instance if one exists for the same collection and shard key already.
+boost::optional<std::shared_ptr<ReshardingCoordinatorService::ReshardingCoordinator>>
+getExistingInstanceToJoin(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& newShardKey) {
+ auto reshardingCoordinatorService = checked_cast<ReshardingCoordinatorService*>(
+ repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext())
+ ->lookupServiceByName(ReshardingCoordinatorService::kServiceName));
+ auto instances = reshardingCoordinatorService->getAllReshardingInstances(opCtx);
+ for (auto& instance : instances) {
+ auto reshardingCoordinator =
+ checked_pointer_cast<ReshardingCoordinatorService::ReshardingCoordinator>(instance);
+
+ auto instanceMetadata = reshardingCoordinator->getMetadata();
+ if (SimpleBSONObjComparator::kInstance.evaluate(
+ instanceMetadata.getReshardingKey().toBSON() == newShardKey) &&
+ instanceMetadata.getSourceNss() == nss) {
+ return reshardingCoordinator;
+ };
+ }
+
+ return boost::none;
+}
+
class ConfigsvrReshardCollectionCommand final
: public TypedCommand<ConfigsvrReshardCollectionCommand> {
public:
@@ -109,7 +136,9 @@ public:
*presetChunks, opCtx, ShardKeyPattern(request().getKey()).getKeyPattern());
}
- auto instance = ([&] {
+ // Returns boost::none if there isn't any work to be done by the resharding operation.
+ auto instance = ([&]() -> boost::optional<std::shared_ptr<
+ ReshardingCoordinatorService::ReshardingCoordinator>> {
FixedFCVRegion fixedFcv(opCtx);
uassert(ErrorCodes::CommandNotSupported,
@@ -117,10 +146,28 @@ public:
resharding::gFeatureFlagResharding.isEnabled(
serverGlobalParams.featureCompatibility));
+ if (auto existingInstance =
+ getExistingInstanceToJoin(opCtx, nss, request().getKey())) {
+ // Join the existing resharding operation to prevent generating a new resharding
+ // instance if the same command is issued consecutively due to client disconnect
+ // etc.
+ reshardCollectionJoinedExistingOperation.pauseWhileSet(opCtx);
+ existingInstance.get()->getCoordinatorDocWrittenFuture().get(opCtx);
+ return existingInstance;
+ }
+
const auto cm = uassertStatusOK(
Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(
opCtx, nss));
+ const auto currentShardKey = cm.getShardKeyPattern().getKeyPattern();
+ if (SimpleBSONObjComparator::kInstance.evaluate(currentShardKey.toBSON() ==
+ request().getKey())) {
+ // There isn't any work to be done by the resharding operation since the
+ // existing shard key matches the desired new shard key.
+ return boost::none;
+ }
+
auto tempReshardingNss = constructTemporaryReshardingNss(
nss.db(), getCollectionUUIDFromChunkManger(nss, cm));
@@ -158,7 +205,11 @@ public:
return instance;
})();
- instance->getCompletionFuture().get(opCtx);
+ if (instance) {
+ // There is work to be done in order to have the collection's shard key match the
+ // requested shard key. Wait until the work is complete.
+ instance.get()->getCompletionFuture().get(opCtx);
+ }
}
private: