diff options
author | Jack Mulrow <jack.mulrow@mongodb.com> | 2020-09-21 20:49:21 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-09-30 23:23:49 +0000 |
commit | 3d309638ce9f55e93bd500945dea2b63e1e591d6 (patch) | |
tree | b7b74f1193ab7a81a11acc6888e300ba97f194bd | |
parent | d8d30d268bfccc2bfd646d45bc2e339742c3a0ea (diff) | |
download | mongo-3d309638ce9f55e93bd500945dea2b63e1e591d6.tar.gz |
SERVER-50023 Implement temporary resharding collection creation logic on shard version update on recipient shards
-rw-r--r-- | src/mongo/db/s/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/db/s/clone_collection_options_from_primary_shard_cmd.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/s/migration_destination_manager.cpp | 110 | ||||
-rw-r--r-- | src/mongo/db/s/migration_destination_manager.h | 41 | ||||
-rw-r--r-- | src/mongo/db/s/migration_destination_manager_test.cpp | 62 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_recipient_service.cpp | 86 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_recipient_service.h | 13 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_recipient_service_test.cpp | 487 | ||||
-rw-r--r-- | src/mongo/s/catalog_cache_test_fixture.cpp | 19 | ||||
-rw-r--r-- | src/mongo/s/catalog_cache_test_fixture.h | 10 |
10 files changed, 797 insertions, 43 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 138ca82d3f2..c8ae8134680 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -433,6 +433,7 @@ env.CppUnitTest( 'migration_session_id_test.cpp', 'migration_util_test.cpp', 'namespace_metadata_change_notifications_test.cpp', + 'resharding/resharding_recipient_service_test.cpp', 'resharding_destined_recipient_test.cpp', 'session_catalog_migration_destination_test.cpp', 'session_catalog_migration_source_test.cpp', @@ -459,6 +460,7 @@ env.CppUnitTest( '$BUILD_DIR/mongo/db/repl/wait_for_majority_service', '$BUILD_DIR/mongo/s/catalog/dist_lock_manager_mock', '$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_mock', + '$BUILD_DIR/mongo/s/sharding_router_test_fixture', 'resharding_util', 'shard_server_test_fixture', 'sharding_logging', diff --git a/src/mongo/db/s/clone_collection_options_from_primary_shard_cmd.cpp b/src/mongo/db/s/clone_collection_options_from_primary_shard_cmd.cpp index 9b51dc21e81..1714facebef 100644 --- a/src/mongo/db/s/clone_collection_options_from_primary_shard_cmd.cpp +++ b/src/mongo/db/s/clone_collection_options_from_primary_shard_cmd.cpp @@ -51,9 +51,13 @@ public: void typedRun(OperationContext* opCtx) { auto primaryShardId = ShardId(request().getPrimaryShard().toString()); - auto collectionOptionsAndIndexes = - MigrationDestinationManager::getCollectionIndexesAndOptions( - opCtx, ns(), primaryShardId); + auto collectionOptionsAndIndexes = [&]() -> CollectionOptionsAndIndexes { + auto [collOptions, uuid] = MigrationDestinationManager::getCollectionOptions( + opCtx, ns(), primaryShardId, boost::none, boost::none); + auto [indexes, idIndex] = MigrationDestinationManager::getCollectionIndexes( + opCtx, ns(), primaryShardId, boost::none, boost::none); + return {uuid, indexes, idIndex, collOptions}; + }(); MigrationDestinationManager::cloneCollectionIndexesAndOptions( opCtx, ns(), collectionOptionsAndIndexes); diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index fe1952a8ffc..41f3b5c8cdd 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -50,6 +50,7 @@ #include "mongo/db/ops/delete.h" #include "mongo/db/ops/write_ops_exec.h" #include "mongo/db/persistent_task_store.h" +#include "mongo/db/repl/read_concern_args.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/s/collection_sharding_runtime.h" @@ -68,6 +69,7 @@ #include "mongo/logv2/log.h" #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/client/shard_registry.h" +#include "mongo/s/cluster_commands_helpers.h" #include "mongo/s/grid.h" #include "mongo/s/shard_key_pattern.h" #include "mongo/stdx/chrono.h" @@ -91,6 +93,13 @@ const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority, WriteConcernOptions::SyncMode::UNSET, -1); +BSONObj makeLocalReadConcernWithAfterClusterTime(Timestamp afterClusterTime) { + return BSON(repl::ReadConcernArgs::kReadConcernFieldName + << BSON(repl::ReadConcernArgs::kLevelFieldName + << repl::readConcernLevels::kLocalName + << repl::ReadConcernArgs::kAfterClusterTimeFieldName << afterClusterTime)); +} + void checkOutSessionAndVerifyTxnState(OperationContext* opCtx) { MongoDOperationContextSession::checkOut(opCtx); TransactionParticipant::get(opCtx).beginOrContinue(opCtx, @@ -578,28 +587,38 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio return Status::OK(); } -CollectionOptionsAndIndexes MigrationDestinationManager::getCollectionIndexesAndOptions( - OperationContext* opCtx, const NamespaceString& nss, const ShardId& fromShardId) { +MigrationDestinationManager::IndexesAndIdIndex MigrationDestinationManager::getCollectionIndexes( + OperationContext* opCtx, + const NamespaceStringOrUUID& nssOrUUID, + const ShardId& fromShardId, + const boost::optional<ChunkManager>& cm, + boost::optional<Timestamp> afterClusterTime) { auto fromShard = uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, fromShardId)); - DisableDocumentValidation validationDisabler(opCtx); - std::vector<BSONObj> donorIndexSpecs; BSONObj donorIdIndexSpec; - BSONObj donorOptions; // Get the collection indexes and options from the donor shard. // Do not hold any locks while issuing remote calls. invariant(!opCtx->lockState()->isLocked()); + auto cmd = nssOrUUID.nss() ? BSON("listIndexes" << nssOrUUID.nss()->coll()) + : BSON("listIndexes" << *nssOrUUID.uuid()); + if (cm) { + cmd = appendShardVersion(cmd, cm->getVersion(fromShardId)); + } + if (afterClusterTime) { + cmd = cmd.addFields(makeLocalReadConcernWithAfterClusterTime(*afterClusterTime)); + } + // Get indexes by calling listIndexes against the donor. auto indexes = uassertStatusOK( fromShard->runExhaustiveCursorCommand(opCtx, ReadPreferenceSetting(ReadPreference::PrimaryOnly), - nss.db().toString(), - BSON("listIndexes" << nss.coll().toString()), + nssOrUUID.db().toString(), + cmd, Milliseconds(-1))); for (auto&& spec : indexes.docs) { @@ -612,18 +631,42 @@ CollectionOptionsAndIndexes MigrationDestinationManager::getCollectionIndexesAnd } } - // Get collection options by calling listCollections against the donor. - auto infosRes = uassertStatusOK(fromShard->runExhaustiveCursorCommand( - opCtx, - ReadPreferenceSetting(ReadPreference::PrimaryOnly), - nss.db().toString(), - BSON("listCollections" << 1 << "filter" << BSON("name" << nss.coll())), - Milliseconds(-1))); + return {donorIndexSpecs, donorIdIndexSpec}; +} + +MigrationDestinationManager::CollectionOptionsAndUUID +MigrationDestinationManager::getCollectionOptions(OperationContext* opCtx, + const NamespaceStringOrUUID& nssOrUUID, + const ShardId& fromShardId, + const boost::optional<ChunkManager>& cm, + boost::optional<Timestamp> afterClusterTime) { + auto fromShard = + uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, fromShardId)); + + BSONObj fromOptions; + + auto cmd = nssOrUUID.nss() + ? BSON("listCollections" << 1 << "filter" << BSON("name" << nssOrUUID.nss()->coll())) + : BSON("listCollections" << 1 << "filter" << BSON("info.uuid" << *nssOrUUID.uuid())); + if (cm) { + cmd = appendDbVersionIfPresent(cmd, cm->dbVersion()); + } + if (afterClusterTime) { + cmd = cmd.addFields(makeLocalReadConcernWithAfterClusterTime(*afterClusterTime)); + } + + // Get collection options by calling listCollections against the from shard. + auto infosRes = uassertStatusOK( + fromShard->runExhaustiveCursorCommand(opCtx, + ReadPreferenceSetting(ReadPreference::PrimaryOnly), + nssOrUUID.db().toString(), + cmd, + Milliseconds(-1))); auto infos = infosRes.docs; uassert(ErrorCodes::NamespaceNotFound, str::stream() << "expected listCollections against the primary shard for " - << nss.toString() << " to return 1 entry, but got " << infos.size() + << nssOrUUID.toString() << " to return 1 entry, but got " << infos.size() << " entries", infos.size() == 1); @@ -632,10 +675,10 @@ CollectionOptionsAndIndexes MigrationDestinationManager::getCollectionIndexesAnd // The entire options include both the settable options under the 'options' field in the // listCollections response, and the UUID under the 'info' field. - BSONObjBuilder donorOptionsBob; + BSONObjBuilder fromOptionsBob; if (entry["options"].isABSONObj()) { - donorOptionsBob.appendElements(entry["options"].Obj()); + fromOptionsBob.appendElements(entry["options"].Obj()); } BSONObj info; @@ -644,25 +687,23 @@ CollectionOptionsAndIndexes MigrationDestinationManager::getCollectionIndexesAnd } uassert(ErrorCodes::InvalidUUID, - str::stream() << "The donor shard did not return a UUID for collection " << nss.ns() - << " as part of its listCollections response: " << entry - << ", but this node expects to see a UUID.", + str::stream() << "The from shard did not return a UUID for collection " + << nssOrUUID.toString() << " as part of its listCollections response: " + << entry << ", but this node expects to see a UUID.", !info["uuid"].eoo()); - auto donorUUID = info["uuid"].uuid(); + auto fromUUID = info["uuid"].uuid(); - donorOptionsBob.append(info["uuid"]); - donorOptions = donorOptionsBob.obj(); + fromOptionsBob.append(info["uuid"]); + fromOptions = fromOptionsBob.obj(); - return {donorUUID, donorIndexSpecs, donorIdIndexSpec, donorOptions}; + return {fromOptions, fromUUID}; } -void MigrationDestinationManager::cloneCollectionIndexesAndOptions( +void MigrationDestinationManager::_dropLocalIndexesIfNecessary( OperationContext* opCtx, const NamespaceString& nss, const CollectionOptionsAndIndexes& collectionOptionsAndIndexes) { - // 0. If this shard doesn't own any chunks for the collection to be cloned and the collection - // exists locally, we drop its indexes to guarantee that no stale indexes carry over. bool dropNonDonorIndexes = [&]() -> bool { AutoGetCollection autoColl(opCtx, nss, MODE_IS); auto* const css = CollectionShardingRuntime::get(opCtx, nss); @@ -711,7 +752,12 @@ void MigrationDestinationManager::cloneCollectionIndexesAndOptions( } } } +} +void MigrationDestinationManager::cloneCollectionIndexesAndOptions( + OperationContext* opCtx, + const NamespaceString& nss, + const CollectionOptionsAndIndexes& collectionOptionsAndIndexes) { { // 1. Create the collection (if it doesn't already exist) and create any indexes we are // missing (auto-heal indexes). @@ -881,8 +927,13 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) { invariant(initialState == READY); - auto donorCollectionOptionsAndIndexes = - getCollectionIndexesAndOptions(outerOpCtx, _nss, _fromShard); + auto donorCollectionOptionsAndIndexes = [&]() -> CollectionOptionsAndIndexes { + auto [collOptions, uuid] = + getCollectionOptions(outerOpCtx, _nss, _fromShard, boost::none, boost::none); + auto [indexes, idIndex] = + getCollectionIndexes(outerOpCtx, _nss, _fromShard, boost::none, boost::none); + return {uuid, indexes, idIndex, collOptions}; + }(); auto fromShard = uassertStatusOK(Grid::get(outerOpCtx)->shardRegistry()->getShard(outerOpCtx, _fromShard)); @@ -965,6 +1016,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) { auto opCtx = newOpCtxPtr.get(); { + _dropLocalIndexesIfNecessary(opCtx, _nss, donorCollectionOptionsAndIndexes); cloneCollectionIndexesAndOptions(opCtx, _nss, donorCollectionOptionsAndIndexes); timing.done(2); diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h index d544a5de1bf..761f8c9c4ef 100644 --- a/src/mongo/db/s/migration_destination_manager.h +++ b/src/mongo/db/s/migration_destination_manager.h @@ -41,6 +41,7 @@ #include "mongo/db/s/migration_session_id.h" #include "mongo/db/s/session_catalog_migration_destination.h" #include "mongo/platform/mutex.h" +#include "mongo/s/chunk_manager.h" #include "mongo/s/shard_id.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/thread.h" @@ -135,11 +136,34 @@ public: Status startCommit(const MigrationSessionId& sessionId); /** - * Gets the collection uuid, options and indexes from fromShardId. + * Gets the collection indexes from fromShardId. If given a chunk manager, will fetch the + * indexes using the shard version protocol. */ - static CollectionOptionsAndIndexes getCollectionIndexesAndOptions(OperationContext* opCtx, - const NamespaceString& nss, - const ShardId& fromShardId); + struct IndexesAndIdIndex { + std::vector<BSONObj> indexSpecs; + BSONObj idIndexSpec; + }; + static IndexesAndIdIndex getCollectionIndexes(OperationContext* opCtx, + const NamespaceStringOrUUID& nssOrUUID, + const ShardId& fromShardId, + const boost::optional<ChunkManager>& cm, + boost::optional<Timestamp> afterClusterTime); + + /** + * Gets the collection uuid and options from fromShardId. If given a chunk manager, will fetch + * the collection options using the database version protocol. + */ + struct CollectionOptionsAndUUID { + BSONObj options; + UUID uuid; + }; + static CollectionOptionsAndUUID getCollectionOptions( + OperationContext* opCtx, + const NamespaceStringOrUUID& nssOrUUID, + const ShardId& fromShardId, + const boost::optional<ChunkManager>& cm, + boost::optional<Timestamp> afterClusterTime); + /** * Creates the collection on the shard and clones the indexes and options. @@ -169,6 +193,15 @@ private: bool _flushPendingWrites(OperationContext* opCtx, const repl::OpTime& lastOpApplied); /** + * If this shard doesn't own any chunks for the collection to be cloned and the collection + * exists locally, drops its indexes to guarantee that no stale indexes carry over. + */ + void _dropLocalIndexesIfNecessary( + OperationContext* opCtx, + const NamespaceString& nss, + const CollectionOptionsAndIndexes& collectionOptionsAndIndexes); + + /** * Remembers a chunk range between 'min' and 'max' as a range which will have data migrated * into it, to protect it against separate commands to clean up orphaned data. First, though, * it schedules deletion of any documents in the range, so that process must be seen to be diff --git a/src/mongo/db/s/migration_destination_manager_test.cpp b/src/mongo/db/s/migration_destination_manager_test.cpp index 51aa7174b35..3d677d5c025 100644 --- a/src/mongo/db/s/migration_destination_manager_test.cpp +++ b/src/mongo/db/s/migration_destination_manager_test.cpp @@ -31,6 +31,7 @@ #include "mongo/db/s/migration_destination_manager.h" #include "mongo/db/s/shard_server_test_fixture.h" +#include "mongo/s/catalog_cache_test_fixture.h" namespace mongo { namespace { @@ -156,5 +157,66 @@ TEST_F(MigrationDestinationManagerTest, CloneDocumentsCatchesInsertErrors) { ASSERT_EQ(operationContext()->getKillStatus(), 51008); } +using MigrationDestinationManagerNetworkTest = CatalogCacheTestFixture; + +// Verifies MigrationDestinationManager::getCollectionOptions() and +// MigrationDestinationManager::getCollectionIndexes() won't use shard/db versioning without a chunk +// manager and won't include a read concern without afterClusterTime. +TEST_F(MigrationDestinationManagerNetworkTest, + MigrationDestinationManagerGetIndexesAndCollectionsNoVersionsOrReadConcern) { + const NamespaceString nss("db.foo"); + + // Shard nss by _id with chunks [minKey, 0), [0, maxKey] on shards "0" and "1" respectively. + // ShardId("1") is the primary shard for the database. + auto shards = setupNShards(2); + auto cm = loadRoutingTableWithTwoChunksAndTwoShardsImpl( + nss, BSON("_id" << 1), boost::optional<std::string>("1")); + + auto future = launchAsync([&] { + onCommand([&](const executor::RemoteCommandRequest& request) { + ASSERT_EQ(request.cmdObj.firstElementFieldName(), "listCollections"_sd); + ASSERT_EQUALS(request.target, HostAndPort("Host0:12345")); + ASSERT_FALSE(request.cmdObj.hasField("readConcern")); + ASSERT_FALSE(request.cmdObj.hasField("databaseVersion")); + ASSERT_BSONOBJ_EQ(request.cmdObj["filter"].Obj(), BSON("name" << nss.coll())); + + const std::vector<BSONObj> colls = { + BSON("name" << nss.coll() << "options" << BSONObj() << "info" + << BSON("readOnly" << false << "uuid" << UUID::gen()) << "idIndex" + << BSON("v" << 2 << "key" << BSON("_id" << 1) << "name" + << "_id_"))}; + + std::string listCollectionsNs = str::stream() << nss.db() << "$cmd.listCollections"; + return BSON( + "ok" << 1 << "cursor" + << BSON("id" << 0LL << "ns" << listCollectionsNs << "firstBatch" << colls)); + }); + }); + + MigrationDestinationManager::getCollectionOptions( + operationContext(), nss, ShardId("0"), boost::none, boost::none); + + future.default_timed_get(); + + future = launchAsync([&] { + onCommand([&](const executor::RemoteCommandRequest& request) { + ASSERT_EQ(request.cmdObj.firstElementFieldName(), "listIndexes"_sd); + ASSERT_EQUALS(request.target, HostAndPort("Host0:12345")); + ASSERT_FALSE(request.cmdObj.hasField("readConcern")); + ASSERT_FALSE(request.cmdObj.hasField("shardVersion")); + + const std::vector<BSONObj> indexes = {BSON("v" << 2 << "key" << BSON("_id" << 1) + << "name" + << "_id_")}; + return BSON("ok" << 1 << "cursor" + << BSON("id" << 0LL << "ns" << nss.ns() << "firstBatch" << indexes)); + }); + }); + + MigrationDestinationManager::getCollectionIndexes( + operationContext(), nss, ShardId("0"), boost::none, boost::none); + future.default_timed_get(); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp index 73256ff0fab..2405cdf3222 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp @@ -31,12 +31,95 @@ #include "mongo/db/s/resharding/resharding_recipient_service.h" +#include "mongo/db/catalog_raii.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/persistent_task_store.h" +#include "mongo/db/pipeline/sharded_agg_helpers.h" +#include "mongo/db/repl/read_concern_args.h" +#include "mongo/db/s/migration_destination_manager.h" +#include "mongo/db/s/resharding_util.h" #include "mongo/logv2/log.h" +#include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/s/cluster_commands_helpers.h" +#include "mongo/s/grid.h" namespace mongo { +namespace resharding { + +void createTemporaryReshardingCollectionLocally(OperationContext* opCtx, + const NamespaceString& reshardingNss, + Timestamp fetchTimestamp) { + LOGV2_DEBUG( + 5002300, 1, "Creating temporary resharding collection", "namespace"_attr = reshardingNss); + + auto catalogCache = Grid::get(opCtx)->catalogCache(); + auto reshardingCm = + uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, reshardingNss)); + uassert( + 5002301, + "Expected cached metadata for resharding temporary collection to have resharding fields", + reshardingCm.getReshardingFields() && + reshardingCm.getReshardingFields()->getRecipientFields()); + auto originalNss = + reshardingCm.getReshardingFields()->getRecipientFields()->getOriginalNamespace(); + + // Load the original collection's options from the database's primary shard. + auto [collOptions, uuid] = sharded_agg_helpers::shardVersionRetry( + opCtx, + catalogCache, + reshardingNss, + "loading collection options to create temporary resharding collection"_sd, + [&]() -> MigrationDestinationManager::CollectionOptionsAndUUID { + auto originalCm = + uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, originalNss)); + uassert(ErrorCodes::InvalidUUID, + "Expected cached metadata for resharding temporary collection to have a UUID", + originalCm.getUUID()); + return MigrationDestinationManager::getCollectionOptions( + opCtx, + NamespaceStringOrUUID(originalNss.db().toString(), *originalCm.getUUID()), + originalCm.dbPrimary(), + originalCm, + fetchTimestamp); + }); + + // Load the original collection's indexes from the shard that owns the global minimum chunk. + auto [indexes, idIndex] = sharded_agg_helpers::shardVersionRetry( + opCtx, + catalogCache, + reshardingNss, + "loading indexes to create temporary resharding collection"_sd, + [&]() -> MigrationDestinationManager::IndexesAndIdIndex { + auto originalCm = + uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, originalNss)); + uassert(ErrorCodes::NamespaceNotSharded, + str::stream() << "Expected collection " << originalNss << " to be sharded", + originalCm.isSharded()); + uassert(ErrorCodes::InvalidUUID, + "Expected cached metadata for resharding temporary collection to have a UUID", + originalCm.getUUID()); + auto indexShardId = originalCm.getMinKeyShardIdWithSimpleCollation(); + return MigrationDestinationManager::getCollectionIndexes( + opCtx, + NamespaceStringOrUUID(originalNss.db().toString(), *originalCm.getUUID()), + indexShardId, + originalCm, + fetchTimestamp); + }); + + // Set the temporary resharding collection's UUID to the resharding UUID. Note that + // BSONObj::addFields() replaces any fields that already exist. + auto reshardingUUID = reshardingCm.getReshardingFields()->getUuid(); + collOptions = collOptions.addFields(BSON("uuid" << reshardingUUID)); + + CollectionOptionsAndIndexes optionsAndIndexes = {reshardingUUID, indexes, idIndex, collOptions}; + MigrationDestinationManager::cloneCollectionIndexesAndOptions( + opCtx, reshardingNss, optionsAndIndexes); +} + +} // namespace resharding + std::shared_ptr<repl::PrimaryOnlyService::Instance> ReshardingRecipientService::constructInstance( BSONObj initialState) const { return std::make_shared<RecipientStateMachine>(std::move(initialState)); @@ -129,6 +212,9 @@ void ReshardingRecipientService::RecipientStateMachine:: return; } + // TODO SERVER-51217: Call + // resharding_recipient_service_util::createTemporaryReshardingCollectionLocally() + _transitionState(RecipientStateEnum::kInitialized); } diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h index ec38bbbdde8..0c33c303451 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.h +++ b/src/mongo/db/s/resharding/resharding_recipient_service.h @@ -36,6 +36,19 @@ namespace mongo { constexpr StringData kReshardingRecipientServiceName = "ReshardingRecipientService"_sd; +namespace resharding { + +/** + * Creates the temporary resharding collection locally by loading the collection options and + * collection indexes from the original collection's primary and MinKey owning chunk shards, + * respectively. + */ +void createTemporaryReshardingCollectionLocally(OperationContext* opCtx, + const NamespaceString& reshardingNss, + Timestamp fetchTimestamp); + +} // namespace resharding + class ReshardingRecipientService final : public repl::PrimaryOnlyService { public: explicit ReshardingRecipientService(ServiceContext* serviceContext) diff --git a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp new file mode 100644 index 00000000000..3ddb9efd4e1 --- /dev/null +++ b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp @@ -0,0 +1,487 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest + +#include "mongo/platform/basic.h" + +#include "mongo/bson/unordered_fields_bsonobj_comparator.h" +#include "mongo/db/catalog_raii.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/repl/oplog.h" +#include "mongo/db/repl/replication_coordinator_mock.h" +#include "mongo/db/repl/storage_interface_impl.h" +#include "mongo/db/s/migration_destination_manager.h" +#include "mongo/db/s/resharding/resharding_recipient_service.h" +#include "mongo/db/service_context_d_test_fixture.h" +#include "mongo/db/session_catalog_mongod.h" +#include "mongo/logv2/log.h" +#include "mongo/s/catalog_cache_test_fixture.h" +#include "mongo/s/database_version_helpers.h" +#include "mongo/s/stale_exception.h" + +namespace mongo { +namespace { + +class ReshardingRecipientServiceTest : public ServiceContextMongoDTest, + public CatalogCacheTestFixture { +public: + const UUID kOrigUUID = UUID::gen(); + const NamespaceString kOrigNss = NamespaceString("db.foo"); + const ShardKeyPattern kReshardingKey = ShardKeyPattern(BSON("newKey" << 1)); + const OID kReshardingEpoch = OID::gen(); + const UUID kReshardingUUID = UUID::gen(); + const NamespaceString kReshardingNss = NamespaceString( + str::stream() << "db." << NamespaceString::kTemporaryReshardingCollectionPrefix + << kOrigUUID); + const Timestamp kDefaultFetchTimestamp = Timestamp(200, 1); + + void setUp() override { + CatalogCacheTestFixture::setUp(); + + repl::ReplicationCoordinator::set( + getServiceContext(), + std::make_unique<repl::ReplicationCoordinatorMock>(getServiceContext())); + ASSERT_OK(repl::ReplicationCoordinator::get(getServiceContext()) + ->setFollowerMode(repl::MemberState::RS_PRIMARY)); + + auto _storageInterfaceImpl = std::make_unique<repl::StorageInterfaceImpl>(); + repl::StorageInterface::set(getServiceContext(), std::move(_storageInterfaceImpl)); + + repl::setOplogCollectionName(getServiceContext()); + repl::createOplog(operationContext()); + MongoDSessionCatalog::onStepUp(operationContext()); + } + + void tearDown() override { + CatalogCacheTestFixture::tearDown(); + } + + void expectListCollections(const NamespaceString& nss, + UUID uuid, + const std::vector<BSONObj>& collectionsDocs, + const HostAndPort& expectedHost) { + onCommand([&](const executor::RemoteCommandRequest& request) { + ASSERT_EQ(request.cmdObj.firstElementFieldName(), "listCollections"_sd); + ASSERT_EQUALS(nss.db(), request.dbname); + ASSERT_EQUALS(expectedHost, request.target); + ASSERT_BSONOBJ_EQ(request.cmdObj["filter"].Obj(), BSON("info.uuid" << uuid)); + ASSERT(request.cmdObj.hasField("databaseVersion")); + ASSERT_BSONOBJ_EQ(request.cmdObj["readConcern"].Obj(), + BSON("level" + << "local" + << "afterClusterTime" << kDefaultFetchTimestamp)); + + std::string listCollectionsNs = str::stream() << nss.db() << "$cmd.listCollections"; + return BSON("ok" << 1 << "cursor" + << BSON("id" << 0LL << "ns" << listCollectionsNs << "firstBatch" + << collectionsDocs)); + }); + } + + void expectListIndexes(const NamespaceString& nss, + UUID uuid, + const std::vector<BSONObj>& indexDocs, + const HostAndPort& expectedHost) { + onCommand([&](const executor::RemoteCommandRequest& request) { + ASSERT_EQ(request.cmdObj.firstElementFieldName(), "listIndexes"_sd); + ASSERT_EQUALS(nss.db(), request.dbname); + ASSERT_EQUALS(expectedHost, request.target); + ASSERT_EQ(unittest::assertGet(UUID::parse(request.cmdObj.firstElement())), uuid); + ASSERT(request.cmdObj.hasField("shardVersion")); + ASSERT_BSONOBJ_EQ(request.cmdObj["readConcern"].Obj(), + BSON("level" + << "local" + << "afterClusterTime" << kDefaultFetchTimestamp)); + + return BSON("ok" << 1 << "cursor" + << BSON("id" << 0LL << "ns" << nss.ns() << "firstBatch" << indexDocs)); + }); + } + + // Loads the metadata for the temporary resharding collection into the catalog cache by mocking + // network responses. The collection contains a single chunk from minKey to maxKey for the given + // shard key. + void loadOneChunkMetadataForTemporaryReshardingColl(const NamespaceString& tempNss, + const NamespaceString& origNss, + const ShardKeyPattern& skey, + UUID uuid, + OID epoch) { + auto future = scheduleRoutingInfoForcedRefresh(tempNss); + + expectFindSendBSONObjVector(kConfigHostAndPort, [&]() { + CollectionType coll; + coll.setNs(tempNss); + coll.setEpoch(epoch); + coll.setKeyPattern(skey.getKeyPattern()); + coll.setUnique(false); + coll.setUUID(uuid); + + TypeCollectionReshardingFields reshardingFields; + reshardingFields.setUuid(uuid); + TypeCollectionRecipientFields recipientFields; + recipientFields.setOriginalNamespace(origNss); + reshardingFields.setRecipientFields(recipientFields); + coll.setReshardingFields(reshardingFields); + + return std::vector<BSONObj>{coll.toBSON()}; + }()); + expectFindSendBSONObjVector(kConfigHostAndPort, [&]() { + ChunkVersion version(1, 0, epoch); + + ChunkType chunk(tempNss, + {skey.getKeyPattern().globalMin(), skey.getKeyPattern().globalMax()}, + version, + {"0"}); + chunk.setName(OID::gen()); + version.incMinor(); + + return std::vector<BSONObj>{chunk.toConfigBSON()}; + }()); + + future.default_timed_get(); + } + + void expectStaleDbVersionError(const NamespaceString& nss, StringData expectedCmdName) { + onCommand([&](const executor::RemoteCommandRequest& request) { + ASSERT_EQ(request.cmdObj.firstElementFieldNameStringData(), expectedCmdName); + return createErrorCursorResponse(Status( + StaleDbRoutingVersion(nss.db().toString(), databaseVersion::makeNew(), boost::none), + "dummy stale db version error")); + }); + } + + void expectStaleEpochError(const NamespaceString& nss, StringData expectedCmdName) { + onCommand([&](const executor::RemoteCommandRequest& request) { + ASSERT_EQ(request.cmdObj.firstElementFieldNameStringData(), expectedCmdName); + return createErrorCursorResponse( + Status(ErrorCodes::StaleEpoch, "dummy stale epoch error")); + }); + } + + void verifyCollectionAndIndexes(const NamespaceString& nss, + UUID uuid, + const std::vector<BSONObj>& indexes) { + DBDirectClient client(operationContext()); + + auto collInfos = client.getCollectionInfos(nss.db().toString()); + ASSERT_EQ(collInfos.size(), 1); + ASSERT_EQ(collInfos.front()["name"].str(), nss.coll()); + ASSERT_EQ(unittest::assertGet(UUID::parse(collInfos.front()["info"]["uuid"])), uuid); + + auto indexSpecs = client.getIndexSpecs(nss, false, 0); + ASSERT_EQ(indexSpecs.size(), indexes.size()); + + UnorderedFieldsBSONObjComparator comparator; + std::vector<BSONObj> indexesCopy(indexes); + for (const auto& indexSpec : indexSpecs) { + for (auto it = indexesCopy.begin(); it != indexesCopy.end(); it++) { + if (comparator.evaluate(indexSpec == *it)) { + indexesCopy.erase(it); + break; + } + } + } + ASSERT_EQ(indexesCopy.size(), 0); + } +}; + +TEST_F(ReshardingRecipientServiceTest, CreateLocalReshardingCollectionBasic) { + auto shards = setupNShards(2); + + // Shard kOrigNss by _id with chunks [minKey, 0), [0, maxKey] on shards "0" and "1" + // respectively. ShardId("1") is the primary shard for the database. + loadRoutingTableWithTwoChunksAndTwoShardsImpl( + kOrigNss, BSON("_id" << 1), boost::optional<std::string>("1"), kOrigUUID); + + { + // The resharding collection shouldn't exist yet. + AutoGetCollection autoColl(operationContext(), kReshardingNss, MODE_IS); + ASSERT_FALSE(autoColl.getCollection()); + } + + // Simulate a refresh for the temporary resharding collection. + loadOneChunkMetadataForTemporaryReshardingColl( + kReshardingNss, kOrigNss, kReshardingKey, kReshardingUUID, kReshardingEpoch); + + const std::vector<BSONObj> indexes = {BSON("v" << 2 << "key" << BSON("_id" << 1) << "name" + << "_id_"), + BSON("v" << 2 << "key" + << BSON("a" << 1 << "b" + << "hashed") + << "name" + << "indexOne")}; + auto future = launchAsync([&] { + expectListCollections( + kOrigNss, + kOrigUUID, + {BSON("name" << kOrigNss.coll() << "options" << BSONObj() << "info" + << BSON("readOnly" << false << "uuid" << kOrigUUID) << "idIndex" + << BSON("v" << 2 << "key" << BSON("_id" << 1) << "name" + << "_id_"))}, + HostAndPort(shards[1].getHost())); + expectListIndexes(kOrigNss, kOrigUUID, indexes, HostAndPort(shards[0].getHost())); + }); + + resharding::createTemporaryReshardingCollectionLocally( + operationContext(), kReshardingNss, kDefaultFetchTimestamp); + + future.default_timed_get(); + + verifyCollectionAndIndexes(kReshardingNss, kReshardingUUID, indexes); +} + +TEST_F(ReshardingRecipientServiceTest, + CreatingLocalReshardingCollectionRetriesOnStaleVersionErrors) { + auto shards = setupNShards(2); + + // Shard kOrigNss by _id with chunks [minKey, 0), [0, maxKey] on shards "0" and "1" + // respectively. ShardId("1") is the primary shard for the database. + loadRoutingTableWithTwoChunksAndTwoShardsImpl( + kOrigNss, BSON("_id" << 1), boost::optional<std::string>("1"), kOrigUUID); + + { + // The resharding collection shouldn't exist yet. + AutoGetCollection autoColl(operationContext(), kReshardingNss, MODE_IS); + ASSERT_FALSE(autoColl.getCollection()); + } + + // Simulate a refresh for the temporary resharding collection. + loadOneChunkMetadataForTemporaryReshardingColl( + kReshardingNss, kOrigNss, kReshardingKey, kReshardingUUID, kReshardingEpoch); + + const std::vector<BSONObj> indexes = {BSON("v" << 2 << "key" << BSON("_id" << 1) << "name" + << "_id_"), + BSON("v" << 2 << "key" + << BSON("a" << 1 << "b" + << "hashed") + << "name" + << "indexOne")}; + auto future = launchAsync([&] { + expectStaleDbVersionError(kOrigNss, "listCollections"); + expectListCollections( + kOrigNss, + kOrigUUID, + {BSON("name" << kOrigNss.coll() << "options" << BSONObj() << "info" + << BSON("readOnly" << false << "uuid" << kOrigUUID) << "idIndex" + << BSON("v" << 2 << "key" << BSON("_id" << 1) << "name" + << "_id_"))}, + HostAndPort(shards[1].getHost())); + + expectStaleEpochError(kOrigNss, "listIndexes"); + expectListIndexes(kOrigNss, kOrigUUID, indexes, HostAndPort(shards[0].getHost())); + }); + + resharding::createTemporaryReshardingCollectionLocally( + operationContext(), kReshardingNss, kDefaultFetchTimestamp); + + future.default_timed_get(); + + verifyCollectionAndIndexes(kReshardingNss, kReshardingUUID, indexes); +} + +TEST_F(ReshardingRecipientServiceTest, + CreateLocalReshardingCollectionCollectionAlreadyExistsWithNoIndexes) { + auto shards = setupNShards(2); + + // Shard kOrigNss by _id with chunks [minKey, 0), [0, maxKey] on shards "0" and "1" + // respectively. ShardId("1") is the primary shard for the database. + loadRoutingTableWithTwoChunksAndTwoShardsImpl( + kOrigNss, BSON("_id" << 1), boost::optional<std::string>("1"), kOrigUUID); + + { + // The resharding collection shouldn't exist yet. + AutoGetCollection autoColl(operationContext(), kReshardingNss, MODE_IS); + ASSERT_FALSE(autoColl.getCollection()); + } + + // Simulate a refresh for the temporary resharding collection. + loadOneChunkMetadataForTemporaryReshardingColl( + kReshardingNss, kOrigNss, kReshardingKey, kReshardingUUID, kReshardingEpoch); + + const std::vector<BSONObj> indexes = {BSON("v" << 2 << "key" << BSON("_id" << 1) << "name" + << "_id_"), + BSON("v" << 2 << "key" + << BSON("a" << 1 << "b" + << "hashed") + << "name" + << "indexOne")}; + + // Create the collection and indexes to simulate retrying after a failover. Only include the id + // index, because it is needed to create the collection. + CollectionOptionsAndIndexes optionsAndIndexes = { + kReshardingUUID, {indexes[0]}, indexes[0], BSON("uuid" << kReshardingUUID)}; + MigrationDestinationManager::cloneCollectionIndexesAndOptions( + operationContext(), kReshardingNss, optionsAndIndexes); + + { + // The collection should exist locally but only have the _id index. + DBDirectClient client(operationContext()); + auto indexSpecs = client.getIndexSpecs(kReshardingNss, false, 0); + ASSERT_EQ(indexSpecs.size(), 1); + } + + auto future = launchAsync([&] { + expectListCollections( + kOrigNss, + kOrigUUID, + {BSON("name" << kOrigNss.coll() << "options" << BSONObj() << "info" + << BSON("readOnly" << false << "uuid" << kOrigUUID) << "idIndex" + << BSON("v" << 2 << "key" << BSON("_id" << 1) << "name" + << "_id_"))}, + HostAndPort(shards[1].getHost())); + expectListIndexes(kOrigNss, kOrigUUID, indexes, HostAndPort(shards[0].getHost())); + }); + + resharding::createTemporaryReshardingCollectionLocally( + operationContext(), kReshardingNss, kDefaultFetchTimestamp); + + future.default_timed_get(); + + verifyCollectionAndIndexes(kReshardingNss, kReshardingUUID, indexes); +} + +TEST_F(ReshardingRecipientServiceTest, + CreateLocalReshardingCollectionCollectionAlreadyExistsWithSomeIndexes) { + auto shards = setupNShards(2); + + // Shard kOrigNss by _id with chunks [minKey, 0), [0, maxKey] on shards "0" and "1" + // respectively. ShardId("1") is the primary shard for the database. + loadRoutingTableWithTwoChunksAndTwoShardsImpl( + kOrigNss, BSON("_id" << 1), boost::optional<std::string>("1"), kOrigUUID); + + { + // The resharding collection shouldn't exist yet. + AutoGetCollection autoColl(operationContext(), kReshardingNss, MODE_IS); + ASSERT_FALSE(autoColl.getCollection()); + } + + // Simulate a refresh for the temporary resharding collection. + loadOneChunkMetadataForTemporaryReshardingColl( + kReshardingNss, kOrigNss, kReshardingKey, kReshardingUUID, kReshardingEpoch); + + const std::vector<BSONObj> indexes = {BSON("v" << 2 << "key" << BSON("_id" << 1) << "name" + << "_id_"), + BSON("v" << 2 << "key" + << BSON("a" << 1 << "b" + << "hashed") + << "name" + << "indexOne"), + BSON("v" << 2 << "key" << BSON("c.d" << 1) << "name" + << "nested")}; + + // Create the collection and indexes to simulate retrying after a failover. Only include the id + // index, because it is needed to create the collection. + CollectionOptionsAndIndexes optionsAndIndexes = { + kReshardingUUID, {indexes[0], indexes[2]}, indexes[0], BSON("uuid" << kReshardingUUID)}; + MigrationDestinationManager::cloneCollectionIndexesAndOptions( + operationContext(), kReshardingNss, optionsAndIndexes); + + { + // The collection should exist locally but only have the _id index. + DBDirectClient client(operationContext()); + auto indexSpecs = client.getIndexSpecs(kReshardingNss, false, 0); + ASSERT_EQ(indexSpecs.size(), 2); + } + + auto future = launchAsync([&] { + expectListCollections( + kOrigNss, + kOrigUUID, + {BSON("name" << kOrigNss.coll() << "options" << BSONObj() << "info" + << BSON("readOnly" << false << "uuid" << kOrigUUID) << "idIndex" + << BSON("v" << 2 << "key" << BSON("_id" << 1) << "name" + << "_id_"))}, + HostAndPort(shards[1].getHost())); + expectListIndexes(kOrigNss, kOrigUUID, indexes, HostAndPort(shards[0].getHost())); + }); + + resharding::createTemporaryReshardingCollectionLocally( + operationContext(), kReshardingNss, kDefaultFetchTimestamp); + + future.default_timed_get(); + + verifyCollectionAndIndexes(kReshardingNss, kReshardingUUID, indexes); +} + +TEST_F(ReshardingRecipientServiceTest, + CreateLocalReshardingCollectionCollectionAlreadyExistsWithAllIndexes) { + auto shards = setupNShards(2); + + // Shard kOrigNss by _id with chunks [minKey, 0), [0, maxKey] on shards "0" and "1" + // respectively. ShardId("1") is the primary shard for the database. + loadRoutingTableWithTwoChunksAndTwoShardsImpl( + kOrigNss, BSON("_id" << 1), boost::optional<std::string>("1"), kOrigUUID); + + { + // The resharding collection shouldn't exist yet. + AutoGetCollection autoColl(operationContext(), kReshardingNss, MODE_IS); + ASSERT_FALSE(autoColl.getCollection()); + } + + // Simulate a refresh for the temporary resharding collection. + loadOneChunkMetadataForTemporaryReshardingColl( + kReshardingNss, kOrigNss, kReshardingKey, kReshardingUUID, kReshardingEpoch); + + const std::vector<BSONObj> indexes = {BSON("v" << 2 << "key" << BSON("_id" << 1) << "name" + << "_id_"), + BSON("v" << 2 << "key" + << BSON("a" << 1 << "b" + << "hashed") + << "name" + << "indexOne")}; + + // Create the collection and indexes to simulate retrying after a failover. + CollectionOptionsAndIndexes optionsAndIndexes = { + kReshardingUUID, indexes, indexes[0], BSON("uuid" << kReshardingUUID)}; + MigrationDestinationManager::cloneCollectionIndexesAndOptions( + operationContext(), kReshardingNss, optionsAndIndexes); + + auto future = launchAsync([&] { + expectListCollections( + kOrigNss, + kOrigUUID, + {BSON("name" << kOrigNss.coll() << "options" << BSONObj() << "info" + << BSON("readOnly" << false << "uuid" << kOrigUUID) << "idIndex" + << BSON("v" << 2 << "key" << BSON("_id" << 1) << "name" + << "_id_"))}, + HostAndPort(shards[1].getHost())); + expectListIndexes(kOrigNss, kOrigUUID, indexes, HostAndPort(shards[0].getHost())); + }); + + resharding::createTemporaryReshardingCollectionLocally( + operationContext(), kReshardingNss, kDefaultFetchTimestamp); + + future.default_timed_get(); + + verifyCollectionAndIndexes(kReshardingNss, kReshardingUUID, indexes); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/s/catalog_cache_test_fixture.cpp b/src/mongo/s/catalog_cache_test_fixture.cpp index 4f59eeaef8a..d537c4d770a 100644 --- a/src/mongo/s/catalog_cache_test_fixture.cpp +++ b/src/mongo/s/catalog_cache_test_fixture.cpp @@ -197,13 +197,17 @@ void CatalogCacheTestFixture::expectGetDatabase(NamespaceString nss, std::string void CatalogCacheTestFixture::expectGetCollection(NamespaceString nss, OID epoch, - const ShardKeyPattern& shardKeyPattern) { + const ShardKeyPattern& shardKeyPattern, + boost::optional<UUID> uuid) { expectFindSendBSONObjVector(kConfigHostAndPort, [&]() { CollectionType collType; collType.setNs(nss); collType.setEpoch(epoch); collType.setKeyPattern(shardKeyPattern.toBSON()); collType.setUnique(false); + if (uuid) { + collType.setUUID(*uuid); + } return std::vector<BSONObj>{collType.toBSON()}; }()); @@ -224,7 +228,10 @@ ChunkManager CatalogCacheTestFixture::loadRoutingTableWithTwoChunksAndTwoShardsH } ChunkManager CatalogCacheTestFixture::loadRoutingTableWithTwoChunksAndTwoShardsImpl( - NamespaceString nss, const BSONObj& shardKey) { + NamespaceString nss, + const BSONObj& shardKey, + boost::optional<std::string> primaryShardId, + boost::optional<UUID> uuid) { const OID epoch = OID::gen(); const ShardKeyPattern shardKeyPattern(shardKey); @@ -232,9 +239,13 @@ ChunkManager CatalogCacheTestFixture::loadRoutingTableWithTwoChunksAndTwoShardsI // Mock the expected config server queries. if (!nss.isAdminDB() && !nss.isConfigDB()) { - expectGetDatabase(nss); + if (primaryShardId) { + expectGetDatabase(nss, *primaryShardId); + } else { + expectGetDatabase(nss); + } } - expectGetCollection(nss, epoch, shardKeyPattern); + expectGetCollection(nss, epoch, shardKeyPattern, uuid); expectFindSendBSONObjVector(kConfigHostAndPort, [&]() { ChunkVersion version(1, 0, epoch); diff --git a/src/mongo/s/catalog_cache_test_fixture.h b/src/mongo/s/catalog_cache_test_fixture.h index 3d58f6a8557..8886c1e74bb 100644 --- a/src/mongo/s/catalog_cache_test_fixture.h +++ b/src/mongo/s/catalog_cache_test_fixture.h @@ -116,8 +116,11 @@ protected: /** * The common implementation for any shard key. */ - ChunkManager loadRoutingTableWithTwoChunksAndTwoShardsImpl(NamespaceString nss, - const BSONObj& shardKey); + ChunkManager loadRoutingTableWithTwoChunksAndTwoShardsImpl( + NamespaceString nss, + const BSONObj& shardKey, + boost::optional<std::string> primaryShardId = boost::none, + boost::optional<UUID> uuid = boost::none); /** * Mocks network responses for loading a sharded database and collection from the config server. @@ -125,7 +128,8 @@ protected: void expectGetDatabase(NamespaceString nss, std::string primaryShard = "0"); void expectGetCollection(NamespaceString nss, OID epoch, - const ShardKeyPattern& shardKeyPattern); + const ShardKeyPattern& shardKeyPattern, + boost::optional<UUID> uuid = boost::none); const HostAndPort kConfigHostAndPort{"DummyConfig", 1234}; }; |