diff options
author | Haley Connelly <haley.connelly@mongodb.com> | 2021-06-07 21:46:23 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-06-15 16:16:44 +0000 |
commit | 9a804b395df987000d8bdab9ef14ef1ba19e03df (patch) | |
tree | df9d340b561e74fff0ce987204d32ce8dfffb97e | |
parent | aa50d1c7ee75512a5e860b2cede3b91bb957ae4b (diff) | |
download | mongo-9a804b395df987000d8bdab9ef14ef1ba19e03df.tar.gz |
SERVER-57182 reshardCollection command is shuffling data even when requested shard key already matches
(cherry picked from commit df3f17d4ae56f4d22304498405fb072d9627f14a)
6 files changed, 234 insertions, 10 deletions
diff --git a/jstests/sharding/reshard_collection_joins_existing_operation.js b/jstests/sharding/reshard_collection_joins_existing_operation.js new file mode 100644 index 00000000000..0ee7dce2b64 --- /dev/null +++ b/jstests/sharding/reshard_collection_joins_existing_operation.js @@ -0,0 +1,115 @@ +/** + * Tests that if a _configsvrReshardCollection command is issued and then the same command gets + * issued again, the second command issued joins with the instance of resharding already in + * progress. + * + * Use _configsvrReshardCollection instead of reshardCollection to exercise the behavior of the + * config server in the absence of the distributed lock taken by _shardsvrReshardCollection on the + * primary shard for the database. + * + * @tags: [ + * requires_fcv_49, + * uses_atclustertime, + * ] + */ +(function() { +"use strict"; + +load("jstests/libs/discover_topology.js"); +load("jstests/libs/parallelTester.js"); +load("jstests/sharding/libs/resharding_test_fixture.js"); + +// Generates a new thread to run _configsvrReshardCollection. +const makeConfigsvrReshardCollectionThread = (mongosConnString, configsvrReshardCollectionCmd) => { + return new Thread((mongosConnString, configsvrReshardCollectionCmd) => { + load("jstests/libs/discover_topology.js"); + const mongos = new Mongo(mongosConnString); + const topology = DiscoverTopology.findConnectedNodes(mongos); + const configsvr = new Mongo(topology.configsvr.nodes[0]); + assert.commandWorked(configsvr.adminCommand(configsvrReshardCollectionCmd)); + }, mongosConnString, configsvrReshardCollectionCmd); +}; + +const constructTempReshardingNs = (dbName, collName) => { + const sourceCollectionUUID = getUUIDFromListCollections(dbName, collName); + const sourceCollectionUUIDString = extractUUIDFromObject(sourceCollectionUUID); + return `${dbName}.system.resharding.${sourceCollectionUUIDString}`; +}; + +// Callers must ensure the temporary collection has actually been created by the time this is +// called. +const getTempUUID = (tempNs) => { + const tempCollection = mongos.getCollection(tempNs); + return getUUIDFromConfigCollections(mongos, tempCollection.getFullName()); +}; + +const st = new ShardingTest({ + mongos: 1, + config: 1, + shards: 2, + rs: {nodes: 2}, +}); + +const sourceCollection = st.s.getCollection("reshardingDb.coll"); + +CreateShardedCollectionUtil.shardCollectionWithChunks(sourceCollection, {oldKey: 1}, [ + {min: {oldKey: MinKey}, max: {oldKey: MaxKey}, shard: st.shard0.shardName}, +]); + +const reshardKey = { + newKey: 1 +}; +const configsvrReshardCollectionCmd = { + _configsvrReshardCollection: sourceCollection.getFullName(), + key: reshardKey, + writeConcern: {w: "majority"} +}; + +// Before starting the actual resharding, get the source collection's UUID to construct the +// namespace for the temporary collection that will be created. +const sourceDBName = sourceCollection.getDB(); +const sourceCollName = sourceCollection.getName(); +const tempNs = constructTempReshardingNs(sourceDBName, sourceCollName); + +const mongos = sourceCollection.getMongo(); +const topology = DiscoverTopology.findConnectedNodes(mongos); +const configsvr = new Mongo(topology.configsvr.nodes[0]); + +const reshardCollectionThread1 = + makeConfigsvrReshardCollectionThread(topology.mongos.nodes[0], configsvrReshardCollectionCmd); +const pauseBeforeCloningFP = + configureFailPoint(configsvr, "reshardingPauseCoordinatorBeforeCloning"); + +// Issue the first _configsvrReshardCollection command and pause after the temporary collection is +// created but before its config.collections entry is replaced. +reshardCollectionThread1.start(); +pauseBeforeCloningFP.wait(); + +// The UUID of the temporary resharding collection should become the UUID of the original collection +// once resharding has completed. +const expectedUUIDAfterReshardingCompletes = getTempUUID(tempNs); + +const reshardCollectionJoinedFP = + configureFailPoint(configsvr, "reshardCollectionJoinedExistingOperation"); +const reshardCollectionThread2 = + makeConfigsvrReshardCollectionThread(topology.mongos.nodes[0], configsvrReshardCollectionCmd); + +// Hitting the reshardCollectionJoinedFP is additional confirmation that the second +// _configsvrReshardCollection command (identical to the first) gets joined with the instance +// created/running for the first command issued. +reshardCollectionThread2.start(); +reshardCollectionJoinedFP.wait(); + +reshardCollectionJoinedFP.off(); +pauseBeforeCloningFP.off(); + +reshardCollectionThread2.join(); +reshardCollectionThread1.join(); + +// Confirm the UUID for the namespace that was resharded is the same as the temporary collection's +// UUID before the second reshardCollection command was issued. +const finalSourceCollectionUUID = getUUIDFromListCollections(sourceDBName, sourceCollName); +assert.eq(expectedUUIDAfterReshardingCompletes, finalSourceCollectionUUID); + +st.stop(); +})(); diff --git a/jstests/sharding/resharding_on_existing_key_is_noop.js b/jstests/sharding/resharding_on_existing_key_is_noop.js new file mode 100644 index 00000000000..dcc3f68178e --- /dev/null +++ b/jstests/sharding/resharding_on_existing_key_is_noop.js @@ -0,0 +1,39 @@ +/** + * Tests that trying to perform reshardCollection with a resharding key that matches the + * collection's existing shard key is a noop (which can be done by confirming the collection's UUID + * remains unchanged after the operation). + * + * @tags: [uses_atclustertime, requires_fcv_49,] + */ +(function() { +"use strict"; + +load("jstests/libs/uuid_util.js"); +load("jstests/sharding/libs/create_sharded_collection_util.js"); + +const st = new ShardingTest({ + mongos: 1, + config: 1, + shards: 2, + rs: {nodes: 2}, +}); + +const sourceCollection = st.s.getCollection("reshardingDb.coll"); + +CreateShardedCollectionUtil.shardCollectionWithChunks(sourceCollection, {key: 1}, [ + {min: {key: MinKey}, max: {key: 0}, shard: st.shard0.shardName}, + {min: {key: 0}, max: {key: MaxKey}, shard: st.shard1.shardName}, +]); + +const ns = sourceCollection.getFullName(); +const mongos = sourceCollection.getMongo(); +const sourceDB = sourceCollection.getDB(); + +// The UUID should remain the same if the resharding key matches the existing shard key. +const preReshardCollectionUUID = getUUIDFromListCollections(sourceDB, sourceCollection.getName()); +assert.commandWorked(mongos.adminCommand({reshardCollection: ns, key: {key: 1}})); +const postReshardCollectionUUID = getUUIDFromListCollections(sourceDB, sourceCollection.getName()); +assert.eq(preReshardCollectionUUID, postReshardCollectionUUID); + +st.stop(); +})(); diff --git a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp index 5586c71a25d..9830b44c5db 100644 --- a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp +++ b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp @@ -48,8 +48,35 @@ #include "mongo/s/resharding/resharding_feature_flag_gen.h" namespace mongo { + +MONGO_FAIL_POINT_DEFINE(reshardCollectionJoinedExistingOperation); + namespace { +// Returns a resharding instance if one exists for the same collection and shard key already. +boost::optional<std::shared_ptr<ReshardingCoordinatorService::ReshardingCoordinator>> +getExistingInstanceToJoin(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& newShardKey) { + auto reshardingCoordinatorService = checked_cast<ReshardingCoordinatorService*>( + repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext()) + ->lookupServiceByName(ReshardingCoordinatorService::kServiceName)); + auto instances = reshardingCoordinatorService->getAllReshardingInstances(opCtx); + for (auto& instance : instances) { + auto reshardingCoordinator = + checked_pointer_cast<ReshardingCoordinatorService::ReshardingCoordinator>(instance); + + auto instanceMetadata = reshardingCoordinator->getMetadata(); + if (SimpleBSONObjComparator::kInstance.evaluate( + instanceMetadata.getReshardingKey().toBSON() == newShardKey) && + instanceMetadata.getSourceNss() == nss) { + return reshardingCoordinator; + }; + } + + return boost::none; +} + class ConfigsvrReshardCollectionCommand final : public TypedCommand<ConfigsvrReshardCollectionCommand> { public: @@ -109,7 +136,9 @@ public: *presetChunks, opCtx, ShardKeyPattern(request().getKey()).getKeyPattern()); } - auto instance = ([&] { + // Returns boost::none if there isn't any work to be done by the resharding operation. + auto instance = ([&]() -> boost::optional<std::shared_ptr< + ReshardingCoordinatorService::ReshardingCoordinator>> { FixedFCVRegion fixedFcv(opCtx); uassert(ErrorCodes::CommandNotSupported, @@ -117,10 +146,28 @@ public: resharding::gFeatureFlagResharding.isEnabled( serverGlobalParams.featureCompatibility)); + if (auto existingInstance = + getExistingInstanceToJoin(opCtx, nss, request().getKey())) { + // Join the existing resharding operation to prevent generating a new resharding + // instance if the same command is issued consecutively due to client disconnect + // etc. + reshardCollectionJoinedExistingOperation.pauseWhileSet(opCtx); + existingInstance.get()->getCoordinatorDocWrittenFuture().get(opCtx); + return existingInstance; + } + const auto cm = uassertStatusOK( Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh( opCtx, nss)); + const auto currentShardKey = cm.getShardKeyPattern().getKeyPattern(); + if (SimpleBSONObjComparator::kInstance.evaluate(currentShardKey.toBSON() == + request().getKey())) { + // There isn't any work to be done by the resharding operation since the + // existing shard key matches the desired new shard key. + return boost::none; + } + auto tempReshardingNss = constructTemporaryReshardingNss( nss.db(), getCollectionUUIDFromChunkManger(nss, cm)); @@ -158,7 +205,11 @@ public: return instance; })(); - instance->getCompletionFuture().get(opCtx); + if (instance) { + // There is work to be done in order to have the collection's shard key match the + // requested shard key. Wait until the work is complete. + instance.get()->getCompletionFuture().get(opCtx); + } } private: diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp index c4f8a745c8d..18e8cc3b8aa 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp @@ -926,7 +926,10 @@ ThreadPool::Limits ReshardingCoordinatorService::getThreadPoolLimits() const { std::shared_ptr<repl::PrimaryOnlyService::Instance> ReshardingCoordinatorService::constructInstance( BSONObj initialState) { return std::make_shared<ReshardingCoordinator>( - this, std::move(initialState), std::make_shared<ReshardingCoordinatorExternalStateImpl>()); + this, + ReshardingCoordinatorDocument::parse(IDLParserErrorContext("ReshardingCoordinatorStateDoc"), + std::move(initialState)), + std::make_shared<ReshardingCoordinatorExternalStateImpl>()); } ExecutorFuture<void> ReshardingCoordinatorService::_rebuildService( @@ -971,13 +974,13 @@ void ReshardingCoordinatorService::abortAllReshardCollection(OperationContext* o ReshardingCoordinatorService::ReshardingCoordinator::ReshardingCoordinator( const ReshardingCoordinatorService* coordinatorService, - const BSONObj& state, + const ReshardingCoordinatorDocument& coordinatorDoc, std::shared_ptr<ReshardingCoordinatorExternalState> externalState) : PrimaryOnlyService::TypedInstance<ReshardingCoordinator>(), - _id(state["_id"].wrap().getOwned()), + _id(coordinatorDoc.getReshardingUUID().toBSON()), _coordinatorService(coordinatorService), - _coordinatorDoc(ReshardingCoordinatorDocument::parse( - IDLParserErrorContext("ReshardingCoordinatorStateDoc"), state)), + _metadata(coordinatorDoc.getCommonReshardingMetadata()), + _coordinatorDoc(coordinatorDoc), _markKilledExecutor(std::make_shared<ThreadPool>([] { ThreadPool::Options options; options.poolName = "ReshardingCoordinatorCancelableOpCtxPool"; @@ -1122,7 +1125,7 @@ ReshardingCoordinatorService::ReshardingCoordinator::_runUntilReadyToPersistDeci } if (_coordinatorDoc.getState() < CoordinatorStateEnum::kPreparingToDonate) { - // Participants were never made aware of the resharding opeartion. Abort without + // Participants were never made aware of the resharding operation. Abort without // waiting for participant acknowledgement. _onAbortCoordinatorOnly(executor, status); } else { diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.h b/src/mongo/db/s/resharding/resharding_coordinator_service.h index f6fda9129c7..454e6fd5c49 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.h +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.h @@ -188,6 +188,11 @@ public: std::shared_ptr<PrimaryOnlyService::Instance> constructInstance(BSONObj initialState) override; + std::vector<std::shared_ptr<PrimaryOnlyService::Instance>> getAllReshardingInstances( + OperationContext* opCtx) { + return getAllInstances(opCtx); + } + /** * Tries to abort all active reshardCollection operations. Note that this doesn't differentiate * between operations interrupted due to stepdown or abort. Callers who wish to confirm that @@ -206,7 +211,7 @@ class ReshardingCoordinatorService::ReshardingCoordinator final public: explicit ReshardingCoordinator( const ReshardingCoordinatorService* coordinatorService, - const BSONObj& state, + const ReshardingCoordinatorDocument& coordinatorDoc, std::shared_ptr<ReshardingCoordinatorExternalState> externalState); ~ReshardingCoordinator() = default; @@ -226,6 +231,10 @@ public: void installCoordinatorDoc(OperationContext* opCtx, const ReshardingCoordinatorDocument& doc) noexcept; + CommonReshardingMetadata getMetadata() const { + return _metadata; + } + /** * Returns a Future that will be resolved when all work associated with this Instance has * completed running. @@ -423,6 +432,10 @@ private: // The primary-only service instance corresponding to the coordinator instance. Not owned. const ReshardingCoordinatorService* const _coordinatorService; + // The in-memory representation of the immutable portion of the document in + // config.reshardingOperations. + const CommonReshardingMetadata _metadata; + // Observes writes that indicate state changes for this resharding operation and notifies // 'this' when all donors/recipients have entered some state so that 'this' can transition // states. diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp index dfc98b2ed11..f816e805262 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp @@ -201,7 +201,10 @@ public: std::shared_ptr<PrimaryOnlyService::Instance> constructInstance(BSONObj initialState) override { return std::make_shared<ReshardingCoordinator>( - this, std::move(initialState), std::make_shared<ExternalStateForTest>()); + this, + ReshardingCoordinatorDocument::parse( + IDLParserErrorContext("ReshardingCoordinatorStateDoc"), std::move(initialState)), + std::make_shared<ExternalStateForTest>()); } }; |