diff options
Diffstat (limited to 'src/mongo/db/s/resharding_destined_recipient_test.cpp')
-rw-r--r-- | src/mongo/db/s/resharding_destined_recipient_test.cpp | 333 |
1 files changed, 333 insertions, 0 deletions
diff --git a/src/mongo/db/s/resharding_destined_recipient_test.cpp b/src/mongo/db/s/resharding_destined_recipient_test.cpp new file mode 100644 index 00000000000..86ce0a174da --- /dev/null +++ b/src/mongo/db/s/resharding_destined_recipient_test.cpp @@ -0,0 +1,333 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest + +#include "mongo/platform/basic.h" + +#include "mongo/db/catalog_raii.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/repl/apply_ops.h" +#include "mongo/db/repl/oplog_interface_local.h" +#include "mongo/db/repl/wait_for_majority_service.h" +#include "mongo/db/s/collection_sharding_runtime.h" +#include "mongo/db/s/operation_sharding_state.h" +#include "mongo/db/s/resharding_util.h" +#include "mongo/db/s/shard_filtering_metadata_refresh.h" +#include "mongo/db/s/shard_server_test_fixture.h" +#include "mongo/db/s/sharding_state.h" +#include "mongo/db/session_catalog_mongod.h" +#include "mongo/db/transaction_participant.h" +#include "mongo/s/catalog/sharding_catalog_client_mock.h" +#include "mongo/s/catalog/type_shard.h" +#include "mongo/s/catalog_cache_loader_mock.h" +#include "mongo/s/database_version_helpers.h" +#include "mongo/s/shard_id.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace { + +class DestinedRecipientTest : public ShardServerTestFixture { +public: + const NamespaceString kNss{"test.foo"}; + const std::string kShardKey = "x"; + const HostAndPort kConfigHostAndPort{"DummyConfig", 12345}; + const std::vector<ShardType> kShardList = {ShardType("shard0", "Host0:12345"), + ShardType("shard1", "Host1:12345")}; + + void setUp() override { + // Don't call ShardServerTestFixture::setUp so we can install a mock catalog cache loader. + ShardingMongodTestFixture::setUp(); + + replicationCoordinator()->alwaysAllowWrites(true); + serverGlobalParams.clusterRole = ClusterRole::ShardServer; + + _clusterId = OID::gen(); + ShardingState::get(getServiceContext()) + ->setInitialized(kShardList[0].getName(), _clusterId); + + auto mockLoader = std::make_unique<CatalogCacheLoaderMock>(); + _mockCatalogCacheLoader = mockLoader.get(); + CatalogCacheLoader::set(getServiceContext(), std::move(mockLoader)); + + uassertStatusOK( + initializeGlobalShardingStateForMongodForTest(ConnectionString(kConfigHostAndPort))); + + configTargeterMock()->setFindHostReturnValue(kConfigHostAndPort); + + WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext()); + + for (const auto& shard : kShardList) { + std::unique_ptr<RemoteCommandTargeterMock> targeter( + std::make_unique<RemoteCommandTargeterMock>()); + HostAndPort host(shard.getHost()); + targeter->setConnectionStringReturnValue(ConnectionString(host)); + targeter->setFindHostReturnValue(host); + targeterFactory()->addTargeterToReturn(ConnectionString(host), std::move(targeter)); + } + } + + void tearDown() override { + WaitForMajorityService::get(getServiceContext()).shutDown(); + + ShardServerTestFixture::tearDown(); + } + + class StaticCatalogClient final : public ShardingCatalogClientMock { + public: + StaticCatalogClient(std::vector<ShardType> shards) + : ShardingCatalogClientMock(nullptr), _shards(std::move(shards)) {} + + StatusWith<repl::OpTimeWith<std::vector<ShardType>>> getAllShards( + OperationContext* opCtx, repl::ReadConcernLevel readConcern) override { + return repl::OpTimeWith<std::vector<ShardType>>(_shards); + } + + StatusWith<std::vector<CollectionType>> getCollections( + OperationContext* opCtx, + const std::string* dbName, + repl::OpTime* optime, + repl::ReadConcernLevel readConcernLevel) override { + return _colls; + } + + void setCollections(std::vector<CollectionType> colls) { + _colls = std::move(colls); + } + + private: + const std::vector<ShardType> _shards; + std::vector<CollectionType> _colls; + }; + + std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient( + std::unique_ptr<DistLockManager> distLockManager) override { + return std::make_unique<StaticCatalogClient>(kShardList); + } + +protected: + CollectionType createCollection(const OID& epoch) { + CollectionType coll; + + coll.setNs(kNss); + coll.setEpoch(epoch); + coll.setKeyPattern(BSON(kShardKey << 1)); + coll.setUnique(false); + coll.setUUID(UUID::gen()); + + return coll; + } + + std::vector<ChunkType> createChunks(const OID& epoch, const std::string& shardKey) { + auto range1 = ChunkRange(BSON(shardKey << MINKEY), BSON(shardKey << 5)); + ChunkType chunk1(kNss, range1, ChunkVersion(1, 0, epoch), kShardList[0].getName()); + + auto range2 = ChunkRange(BSON(shardKey << 5), BSON(shardKey << MAXKEY)); + ChunkType chunk2(kNss, range2, ChunkVersion(1, 0, epoch), kShardList[1].getName()); + + return {chunk1, chunk2}; + } + + struct ReshardingEnv { + ReshardingEnv(UUID uuid) : sourceUuid(std::move(uuid)) {} + + NamespaceString tempNss; + UUID sourceUuid; + ShardId destShard; + ChunkVersion version; + DatabaseVersion dbVersion; + }; + + ReshardingEnv setupReshardingEnv(OperationContext* opCtx, bool refreshTempNss) { + DBDirectClient client(opCtx); + client.createCollection(kNss.ns()); + client.createCollection(NamespaceString::kSessionTransactionsTableNamespace.ns()); + + ReshardingEnv env(CollectionCatalog::get(opCtx).lookupUUIDByNSS(opCtx, kNss).value()); + env.destShard = kShardList[1].getName(); + env.version = ChunkVersion(1, 0, OID::gen()); + env.dbVersion = databaseVersion::makeNew(); + + env.tempNss = + NamespaceString(kNss.db(), + fmt::format("{}{}", + NamespaceString::kTemporaryReshardingCollectionPrefix, + env.sourceUuid.toString())); + + client.createCollection(env.tempNss.ns()); + + + DatabaseType db(kNss.db().toString(), kShardList[0].getName(), true, env.dbVersion); + + TypeCollectionReshardingFields reshardingFields; + reshardingFields.setUuid(UUID::gen()); + reshardingFields.setDonorFields(TypeCollectionDonorFields{BSON("y" << 1)}); + + auto collType = createCollection(env.version.epoch()); + + _mockCatalogCacheLoader->setDatabaseRefreshReturnValue(db); + _mockCatalogCacheLoader->setCollectionRefreshValues( + kNss, collType, createChunks(env.version.epoch(), kShardKey), reshardingFields); + _mockCatalogCacheLoader->setCollectionRefreshValues( + env.tempNss, collType, createChunks(env.version.epoch(), "y"), boost::none); + + forceShardFilteringMetadataRefresh(opCtx, kNss); + + if (refreshTempNss) + forceShardFilteringMetadataRefresh(opCtx, env.tempNss); + + return env; + } + + void writeDoc(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& doc, + const ReshardingEnv& env) { + WriteUnitOfWork wuow(opCtx); + AutoGetCollection autoColl1(opCtx, nss, MODE_IX); + + // TODO(SERVER-50027): This is to temporarily make this test pass until getOwnershipFilter + // has been updated to detect frozen migrations. + if (!OperationShardingState::isOperationVersioned(opCtx)) { + OperationShardingState::get(opCtx).initializeClientRoutingVersions( + nss, env.version, env.dbVersion); + } + + auto collection = CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, nss); + ASSERT(collection); + auto status = collection->insertDocument(opCtx, InsertStatement(doc), nullptr); + ASSERT_OK(status); + + wuow.commit(); + } + +protected: + CatalogCacheLoaderMock* _mockCatalogCacheLoader; +}; + +TEST_F(DestinedRecipientTest, TestGetDestinedRecipient) { + auto opCtx = operationContext(); + auto env = setupReshardingEnv(opCtx, true); + + AutoGetCollection coll(opCtx, kNss, MODE_IX); + + // TODO(SERVER-50027): This is to temporarily make this test pass until getOwnershipFilter has + // been updated to detect frozen migrations. + if (!OperationShardingState::isOperationVersioned(opCtx)) { + OperationShardingState::get(opCtx).initializeClientRoutingVersions( + kNss, env.version, env.dbVersion); + } + + auto destShardId = getDestinedRecipient(opCtx, kNss, BSON("x" << 2 << "y" << 10)); + ASSERT(destShardId); + ASSERT_EQ(*destShardId, env.destShard); +} + +TEST_F(DestinedRecipientTest, TestGetDestinedRecipientThrowsOnBlockedRefresh) { + auto opCtx = operationContext(); + auto env = setupReshardingEnv(opCtx, false); + + AutoGetCollection coll(opCtx, kNss, MODE_IX); + + // TODO(SERVER-50027): This is to temporarily make this test pass until getOwnershipFilter has + // been updated to detect frozen migrations. + if (!OperationShardingState::isOperationVersioned(opCtx)) { + OperationShardingState::get(opCtx).initializeClientRoutingVersions( + kNss, env.version, env.dbVersion); + } + + ASSERT_THROWS(getDestinedRecipient(opCtx, kNss, BSON("x" << 2 << "y" << 10)), + ExceptionFor<ErrorCodes::ShardInvalidatedForTargeting>); +} + +TEST_F(DestinedRecipientTest, TestOpObserverSetsDestinedRecipientOnInserts) { + auto opCtx = operationContext(); + auto env = setupReshardingEnv(opCtx, true); + + writeDoc(opCtx, kNss, BSON("_id" << 0 << "x" << 2 << "y" << 10), env); + + repl::OplogInterfaceLocal oplogInterface(opCtx); + auto oplogIter = oplogInterface.makeIterator(); + + auto nextValue = unittest::assertGet(oplogIter->next()); + const auto& doc = nextValue.first; + auto entry = unittest::assertGet(repl::OplogEntry::parse(doc)); + auto recipShard = entry.getDestinedRecipient(); + + ASSERT(recipShard); + ASSERT_EQ(*recipShard, env.destShard); +} + +TEST_F(DestinedRecipientTest, TestOpObserverSetsDestinedRecipientOnInsertsInTransaction) { + auto opCtx = operationContext(); + auto env = setupReshardingEnv(opCtx, true); + + auto sessionId = makeLogicalSessionIdForTest(); + const TxnNumber txnNum = 0; + + { + opCtx->setLogicalSessionId(sessionId); + opCtx->setTxnNumber(txnNum); + opCtx->setInMultiDocumentTransaction(); + + MongoDOperationContextSession ocs(opCtx); + + auto txnParticipant = TransactionParticipant::get(opCtx); + ASSERT(txnParticipant); + txnParticipant.beginOrContinue(opCtx, txnNum, false, true); + txnParticipant.unstashTransactionResources(opCtx, "SetDestinedRecipient"); + + writeDoc(opCtx, kNss, BSON("_id" << 0 << "x" << 2 << "y" << 10), env); + + txnParticipant.commitUnpreparedTransaction(opCtx); + txnParticipant.stashTransactionResources(opCtx); + } + + // Look for destined recipient in latest oplog entry. Since this write was done in a + // transaction, the write operation will be embedded in an applyOps entry and needs to be + // extracted. + repl::OplogInterfaceLocal oplogInterface(opCtx); + auto oplogIter = oplogInterface.makeIterator(); + + const auto& doc = unittest::assertGet(oplogIter->next()).first; + auto entry = unittest::assertGet(repl::OplogEntry::parse(doc)); + auto info = repl::ApplyOpsCommandInfo::parse(entry.getOperationToApply()); + + auto ops = info.getOperations(); + auto replOp = repl::ReplOperation::parse(IDLParserErrorContext("insertOp"), ops[0]); + ASSERT_EQ(replOp.getNss(), kNss); + + auto recipShard = replOp.getDestinedRecipient(); + ASSERT(recipShard); + ASSERT_EQ(*recipShard, env.destShard); +} + +} // namespace +} // namespace mongo |