From 253740ba90154d8f9330553f142e177db47f4910 Mon Sep 17 00:00:00 2001 From: Alex Taskov Date: Tue, 29 Sep 2020 19:42:59 -0400 Subject: SERVER-49822 Add destined recipient to oplog entries from inserts --- src/mongo/db/op_observer_impl.cpp | 1 + src/mongo/db/repl/oplog.cpp | 1 + src/mongo/db/s/SConscript | 3 + .../db/s/resharding_destined_recipient_test.cpp | 333 +++++++++++++++++++++ src/mongo/db/s/resharding_util.cpp | 11 +- src/mongo/db/s/resharding_util.h | 2 +- src/mongo/s/catalog_cache_loader_mock.cpp | 50 ++-- src/mongo/s/catalog_cache_loader_mock.h | 21 ++ 8 files changed, 399 insertions(+), 23 deletions(-) create mode 100644 src/mongo/db/s/resharding_destined_recipient_test.cpp diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index bb187fc2149..0aa06762031 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -454,6 +454,7 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, for (auto iter = first; iter != last; iter++) { auto operation = MutableOplogEntry::makeInsertOperation(nss, uuid.get(), iter->doc); + shardAnnotateOplogEntry(opCtx, nss, iter->doc, operation); txnParticipant.addTransactionOperation(opCtx, operation); } } else { diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 23c148b7f75..c79267245db 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -393,6 +393,7 @@ std::vector logInsertOps(OperationContext* opCtx, } oplogEntry.setObject(begin[i].doc); oplogEntry.setOpTime(insertStatementOplogSlot); + oplogEntry.setDestinedRecipient(getDestinedRecipient(opCtx, nss, begin[i].doc)); OplogLink oplogLink; if (i > 0) diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 1f005757352..138ca82d3f2 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -433,6 +433,7 @@ env.CppUnitTest( 'migration_session_id_test.cpp', 'migration_util_test.cpp', 
'namespace_metadata_change_notifications_test.cpp', + 'resharding_destined_recipient_test.cpp', 'session_catalog_migration_destination_test.cpp', 'session_catalog_migration_source_test.cpp', 'shard_local_test.cpp', @@ -453,10 +454,12 @@ env.CppUnitTest( '$BUILD_DIR/mongo/db/ops/write_ops_exec', '$BUILD_DIR/mongo/db/query/query_request', '$BUILD_DIR/mongo/db/repl/mock_repl_coord_server_fixture', + '$BUILD_DIR/mongo/db/repl/oplog_interface_local', '$BUILD_DIR/mongo/db/repl/storage_interface_impl', '$BUILD_DIR/mongo/db/repl/wait_for_majority_service', '$BUILD_DIR/mongo/s/catalog/dist_lock_manager_mock', '$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_mock', + 'resharding_util', 'shard_server_test_fixture', 'sharding_logging', 'sharding_runtime_d', diff --git a/src/mongo/db/s/resharding_destined_recipient_test.cpp b/src/mongo/db/s/resharding_destined_recipient_test.cpp new file mode 100644 index 00000000000..86ce0a174da --- /dev/null +++ b/src/mongo/db/s/resharding_destined_recipient_test.cpp @@ -0,0 +1,333 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library.
You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest + +#include "mongo/platform/basic.h" + +#include "mongo/db/catalog_raii.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/repl/apply_ops.h" +#include "mongo/db/repl/oplog_interface_local.h" +#include "mongo/db/repl/wait_for_majority_service.h" +#include "mongo/db/s/collection_sharding_runtime.h" +#include "mongo/db/s/operation_sharding_state.h" +#include "mongo/db/s/resharding_util.h" +#include "mongo/db/s/shard_filtering_metadata_refresh.h" +#include "mongo/db/s/shard_server_test_fixture.h" +#include "mongo/db/s/sharding_state.h" +#include "mongo/db/session_catalog_mongod.h" +#include "mongo/db/transaction_participant.h" +#include "mongo/s/catalog/sharding_catalog_client_mock.h" +#include "mongo/s/catalog/type_shard.h" +#include "mongo/s/catalog_cache_loader_mock.h" +#include "mongo/s/database_version_helpers.h" +#include "mongo/s/shard_id.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace { + +class DestinedRecipientTest : public ShardServerTestFixture { +public: + const NamespaceString kNss{"test.foo"}; + const std::string kShardKey = "x"; + const HostAndPort kConfigHostAndPort{"DummyConfig", 12345}; + const std::vector kShardList = {ShardType("shard0", "Host0:12345"), + ShardType("shard1", "Host1:12345")}; + + void setUp() override { + // Don't call ShardServerTestFixture::setUp so we can install a mock catalog cache loader. 
+ ShardingMongodTestFixture::setUp(); + + replicationCoordinator()->alwaysAllowWrites(true); + serverGlobalParams.clusterRole = ClusterRole::ShardServer; + + _clusterId = OID::gen(); + ShardingState::get(getServiceContext()) + ->setInitialized(kShardList[0].getName(), _clusterId); + + auto mockLoader = std::make_unique(); + _mockCatalogCacheLoader = mockLoader.get(); + CatalogCacheLoader::set(getServiceContext(), std::move(mockLoader)); + + uassertStatusOK( + initializeGlobalShardingStateForMongodForTest(ConnectionString(kConfigHostAndPort))); + + configTargeterMock()->setFindHostReturnValue(kConfigHostAndPort); + + WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext()); + + for (const auto& shard : kShardList) { + std::unique_ptr targeter( + std::make_unique()); + HostAndPort host(shard.getHost()); + targeter->setConnectionStringReturnValue(ConnectionString(host)); + targeter->setFindHostReturnValue(host); + targeterFactory()->addTargeterToReturn(ConnectionString(host), std::move(targeter)); + } + } + + void tearDown() override { + WaitForMajorityService::get(getServiceContext()).shutDown(); + + ShardServerTestFixture::tearDown(); + } + + class StaticCatalogClient final : public ShardingCatalogClientMock { + public: + StaticCatalogClient(std::vector shards) + : ShardingCatalogClientMock(nullptr), _shards(std::move(shards)) {} + + StatusWith>> getAllShards( + OperationContext* opCtx, repl::ReadConcernLevel readConcern) override { + return repl::OpTimeWith>(_shards); + } + + StatusWith> getCollections( + OperationContext* opCtx, + const std::string* dbName, + repl::OpTime* optime, + repl::ReadConcernLevel readConcernLevel) override { + return _colls; + } + + void setCollections(std::vector colls) { + _colls = std::move(colls); + } + + private: + const std::vector _shards; + std::vector _colls; + }; + + std::unique_ptr makeShardingCatalogClient( + std::unique_ptr distLockManager) override { + return std::make_unique(kShardList); + } + +protected: 
+ CollectionType createCollection(const OID& epoch) { + CollectionType coll; + + coll.setNs(kNss); + coll.setEpoch(epoch); + coll.setKeyPattern(BSON(kShardKey << 1)); + coll.setUnique(false); + coll.setUUID(UUID::gen()); + + return coll; + } + + std::vector createChunks(const OID& epoch, const std::string& shardKey) { + auto range1 = ChunkRange(BSON(shardKey << MINKEY), BSON(shardKey << 5)); + ChunkType chunk1(kNss, range1, ChunkVersion(1, 0, epoch), kShardList[0].getName()); + + auto range2 = ChunkRange(BSON(shardKey << 5), BSON(shardKey << MAXKEY)); + ChunkType chunk2(kNss, range2, ChunkVersion(1, 0, epoch), kShardList[1].getName()); + + return {chunk1, chunk2}; + } + + struct ReshardingEnv { + ReshardingEnv(UUID uuid) : sourceUuid(std::move(uuid)) {} + + NamespaceString tempNss; + UUID sourceUuid; + ShardId destShard; + ChunkVersion version; + DatabaseVersion dbVersion; + }; + + ReshardingEnv setupReshardingEnv(OperationContext* opCtx, bool refreshTempNss) { + DBDirectClient client(opCtx); + client.createCollection(kNss.ns()); + client.createCollection(NamespaceString::kSessionTransactionsTableNamespace.ns()); + + ReshardingEnv env(CollectionCatalog::get(opCtx).lookupUUIDByNSS(opCtx, kNss).value()); + env.destShard = kShardList[1].getName(); + env.version = ChunkVersion(1, 0, OID::gen()); + env.dbVersion = databaseVersion::makeNew(); + + env.tempNss = + NamespaceString(kNss.db(), + fmt::format("{}{}", + NamespaceString::kTemporaryReshardingCollectionPrefix, + env.sourceUuid.toString())); + + client.createCollection(env.tempNss.ns()); + + + DatabaseType db(kNss.db().toString(), kShardList[0].getName(), true, env.dbVersion); + + TypeCollectionReshardingFields reshardingFields; + reshardingFields.setUuid(UUID::gen()); + reshardingFields.setDonorFields(TypeCollectionDonorFields{BSON("y" << 1)}); + + auto collType = createCollection(env.version.epoch()); + + _mockCatalogCacheLoader->setDatabaseRefreshReturnValue(db); + 
_mockCatalogCacheLoader->setCollectionRefreshValues( + kNss, collType, createChunks(env.version.epoch(), kShardKey), reshardingFields); + _mockCatalogCacheLoader->setCollectionRefreshValues( + env.tempNss, collType, createChunks(env.version.epoch(), "y"), boost::none); + + forceShardFilteringMetadataRefresh(opCtx, kNss); + + if (refreshTempNss) + forceShardFilteringMetadataRefresh(opCtx, env.tempNss); + + return env; + } + + void writeDoc(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& doc, + const ReshardingEnv& env) { + WriteUnitOfWork wuow(opCtx); + AutoGetCollection autoColl1(opCtx, nss, MODE_IX); + + // TODO(SERVER-50027): This is to temporarily make this test pass until getOwnershipFilter + // has been updated to detect frozen migrations. + if (!OperationShardingState::isOperationVersioned(opCtx)) { + OperationShardingState::get(opCtx).initializeClientRoutingVersions( + nss, env.version, env.dbVersion); + } + + auto collection = CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, nss); + ASSERT(collection); + auto status = collection->insertDocument(opCtx, InsertStatement(doc), nullptr); + ASSERT_OK(status); + + wuow.commit(); + } + +protected: + CatalogCacheLoaderMock* _mockCatalogCacheLoader; +}; + +TEST_F(DestinedRecipientTest, TestGetDestinedRecipient) { + auto opCtx = operationContext(); + auto env = setupReshardingEnv(opCtx, true); + + AutoGetCollection coll(opCtx, kNss, MODE_IX); + + // TODO(SERVER-50027): This is to temporarily make this test pass until getOwnershipFilter has + // been updated to detect frozen migrations. 
+ if (!OperationShardingState::isOperationVersioned(opCtx)) { + OperationShardingState::get(opCtx).initializeClientRoutingVersions( + kNss, env.version, env.dbVersion); + } + + auto destShardId = getDestinedRecipient(opCtx, kNss, BSON("x" << 2 << "y" << 10)); + ASSERT(destShardId); + ASSERT_EQ(*destShardId, env.destShard); +} + +TEST_F(DestinedRecipientTest, TestGetDestinedRecipientThrowsOnBlockedRefresh) { + auto opCtx = operationContext(); + auto env = setupReshardingEnv(opCtx, false); + + AutoGetCollection coll(opCtx, kNss, MODE_IX); + + // TODO(SERVER-50027): This is to temporarily make this test pass until getOwnershipFilter has + // been updated to detect frozen migrations. + if (!OperationShardingState::isOperationVersioned(opCtx)) { + OperationShardingState::get(opCtx).initializeClientRoutingVersions( + kNss, env.version, env.dbVersion); + } + + ASSERT_THROWS(getDestinedRecipient(opCtx, kNss, BSON("x" << 2 << "y" << 10)), + ExceptionFor); +} + +TEST_F(DestinedRecipientTest, TestOpObserverSetsDestinedRecipientOnInserts) { + auto opCtx = operationContext(); + auto env = setupReshardingEnv(opCtx, true); + + writeDoc(opCtx, kNss, BSON("_id" << 0 << "x" << 2 << "y" << 10), env); + + repl::OplogInterfaceLocal oplogInterface(opCtx); + auto oplogIter = oplogInterface.makeIterator(); + + auto nextValue = unittest::assertGet(oplogIter->next()); + const auto& doc = nextValue.first; + auto entry = unittest::assertGet(repl::OplogEntry::parse(doc)); + auto recipShard = entry.getDestinedRecipient(); + + ASSERT(recipShard); + ASSERT_EQ(*recipShard, env.destShard); +} + +TEST_F(DestinedRecipientTest, TestOpObserverSetsDestinedRecipientOnInsertsInTransaction) { + auto opCtx = operationContext(); + auto env = setupReshardingEnv(opCtx, true); + + auto sessionId = makeLogicalSessionIdForTest(); + const TxnNumber txnNum = 0; + + { + opCtx->setLogicalSessionId(sessionId); + opCtx->setTxnNumber(txnNum); + opCtx->setInMultiDocumentTransaction(); + + MongoDOperationContextSession 
ocs(opCtx); + + auto txnParticipant = TransactionParticipant::get(opCtx); + ASSERT(txnParticipant); + txnParticipant.beginOrContinue(opCtx, txnNum, false, true); + txnParticipant.unstashTransactionResources(opCtx, "SetDestinedRecipient"); + + writeDoc(opCtx, kNss, BSON("_id" << 0 << "x" << 2 << "y" << 10), env); + + txnParticipant.commitUnpreparedTransaction(opCtx); + txnParticipant.stashTransactionResources(opCtx); + } + + // Look for destined recipient in latest oplog entry. Since this write was done in a + // transaction, the write operation will be embedded in an applyOps entry and needs to be + // extracted. + repl::OplogInterfaceLocal oplogInterface(opCtx); + auto oplogIter = oplogInterface.makeIterator(); + + const auto& doc = unittest::assertGet(oplogIter->next()).first; + auto entry = unittest::assertGet(repl::OplogEntry::parse(doc)); + auto info = repl::ApplyOpsCommandInfo::parse(entry.getOperationToApply()); + + auto ops = info.getOperations(); + auto replOp = repl::ReplOperation::parse(IDLParserErrorContext("insertOp"), ops[0]); + ASSERT_EQ(replOp.getNss(), kNss); + + auto recipShard = replOp.getDestinedRecipient(); + ASSERT(recipShard); + ASSERT_EQ(*recipShard, env.destShard); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/s/resharding_util.cpp b/src/mongo/db/s/resharding_util.cpp index ccfc3caead7..339f4f456d2 100644 --- a/src/mongo/db/s/resharding_util.cpp +++ b/src/mongo/db/s/resharding_util.cpp @@ -55,6 +55,7 @@ #include "mongo/s/async_requests_sender.h" #include "mongo/s/grid.h" #include "mongo/s/request_types/flush_routing_table_cache_updates_gen.h" +#include "mongo/s/shard_invalidated_for_targeting_exception.h" #include "mongo/s/shard_key_pattern.h" namespace mongo { @@ -82,7 +83,9 @@ NamespaceString getTempReshardingNss(StringData db, const UUID& sourceUuid) { // Ensure that this shard owns the document. 
This must be called after verifying that we // are in a resharding operation so that we are guaranteed that migrations are suspended. -bool documentBelongsToMe(OperationContext* opCtx, CollectionShardingState* css, BSONObj doc) { +bool documentBelongsToMe(OperationContext* opCtx, + CollectionShardingState* css, + const BSONObj& doc) { auto currentKeyPattern = ShardKeyPattern(css->getCollectionDescription(opCtx).getKeyPattern()); auto ownershipFilter = css->getOwnershipFilter( opCtx, CollectionShardingState::OrphanCleanupPolicy::kAllowOrphanCleanup); @@ -92,7 +95,7 @@ bool documentBelongsToMe(OperationContext* opCtx, CollectionShardingState* css, boost::optional getDonorFields(OperationContext* opCtx, const NamespaceString& sourceNss, - BSONObj fullDocument) { + const BSONObj& fullDocument) { auto css = CollectionShardingState::get(opCtx, sourceNss); auto collDesc = css->getCollectionDescription(opCtx); @@ -680,7 +683,7 @@ std::unique_ptr createAggForCollectionCloning( boost::optional getDestinedRecipient(OperationContext* opCtx, const NamespaceString& sourceNss, - BSONObj fullDocument) { + const BSONObj& fullDocument) { auto donorFields = getDonorFields(opCtx, sourceNss, fullDocument); if (!donorFields) return boost::none; @@ -691,7 +694,7 @@ boost::optional getDestinedRecipient(OperationContext* opCtx, getTempReshardingNss(sourceNss.db(), getCollectionUuid(opCtx, sourceNss)), allowLocks); - uassert(ErrorCodes::ShardInvalidatedForTargeting, + uassert(ShardInvalidatedForTargetingInfo(sourceNss), "Routing information is not available for the temporary resharding collection.", tempNssRoutingInfo.getStatus() != ErrorCodes::StaleShardVersion); diff --git a/src/mongo/db/s/resharding_util.h b/src/mongo/db/s/resharding_util.h index 21e4d192682..247dfba84a9 100644 --- a/src/mongo/db/s/resharding_util.h +++ b/src/mongo/db/s/resharding_util.h @@ -179,7 +179,7 @@ std::unique_ptr createOplogFetchingPipelineForReshard */ boost::optional getDestinedRecipient(OperationContext* 
opCtx, const NamespaceString& sourceNss, - BSONObj fullDocument); + const BSONObj& fullDocument); /** * Creates pipeline for filtering collection data matching the recipient shard. */ diff --git a/src/mongo/s/catalog_cache_loader_mock.cpp b/src/mongo/s/catalog_cache_loader_mock.cpp index 4fb761e5d4c..1bfe4d9c7d8 100644 --- a/src/mongo/s/catalog_cache_loader_mock.cpp +++ b/src/mongo/s/catalog_cache_loader_mock.cpp @@ -76,27 +76,41 @@ void CatalogCacheLoaderMock::waitForDatabaseFlush(OperationContext* opCtx, Strin MONGO_UNREACHABLE; } +CollectionAndChangedChunks getCollectionRefresh( + const StatusWith& swCollectionReturnValue, + StatusWith> swChunksReturnValue, + const boost::optional& reshardingFields) { + uassertStatusOK(swCollectionReturnValue); + uassertStatusOK(swChunksReturnValue); + + // We swap the chunks out of _swChunksReturnValue to ensure if this task is + // scheduled multiple times that we don't inform the ChunkManager about a chunk it + // has already updated. + std::vector chunks; + swChunksReturnValue.getValue().swap(chunks); + + return CollectionAndChangedChunks(swCollectionReturnValue.getValue().getUUID(), + swCollectionReturnValue.getValue().getEpoch(), + swCollectionReturnValue.getValue().getKeyPattern().toBSON(), + swCollectionReturnValue.getValue().getDefaultCollation(), + swCollectionReturnValue.getValue().getUnique(), + reshardingFields, + std::move(chunks)); +} + SemiFuture CatalogCacheLoaderMock::getChunksSince( const NamespaceString& nss, ChunkVersion version) { - return makeReadyFutureWith([this] { - uassertStatusOK(_swCollectionReturnValue); - uassertStatusOK(_swChunksReturnValue); - - // We swap the chunks out of _swChunksReturnValue to ensure if this task is - // scheduled multiple times that we don't inform the ChunkManager about a chunk it - // has already updated. 
- std::vector chunks; - _swChunksReturnValue.getValue().swap(chunks); - - return CollectionAndChangedChunks( - _swCollectionReturnValue.getValue().getUUID(), - _swCollectionReturnValue.getValue().getEpoch(), - _swCollectionReturnValue.getValue().getKeyPattern().toBSON(), - _swCollectionReturnValue.getValue().getDefaultCollation(), - _swCollectionReturnValue.getValue().getUnique(), - boost::none, - std::move(chunks)); + return makeReadyFutureWith([&nss, this] { + auto it = _refreshValues.find(nss); + + if (it != _refreshValues.end()) + return getCollectionRefresh(it->second.swCollectionReturnValue, + std::move(it->second.swChunksReturnValue), + it->second.reshardingFields); + + return getCollectionRefresh( + _swCollectionReturnValue, std::move(_swChunksReturnValue), _reshardingFields); }) .semi(); } diff --git a/src/mongo/s/catalog_cache_loader_mock.h b/src/mongo/s/catalog_cache_loader_mock.h index 9aa92f2a893..13538c4f69c 100644 --- a/src/mongo/s/catalog_cache_loader_mock.h +++ b/src/mongo/s/catalog_cache_loader_mock.h @@ -83,6 +83,18 @@ public: void setDatabaseRefreshReturnValue(StatusWith swDatabase); void clearDatabaseReturnValue(); + void setReshardingFields(boost::optional reshardingFields) { + _reshardingFields = std::move(reshardingFields); + } + + void setCollectionRefreshValues( + const NamespaceString& nss, + StatusWith statusWithCollection, + StatusWith> statusWithChunks, + boost::optional reshardingFields) { + _refreshValues[nss] = {statusWithCollection, statusWithChunks, reshardingFields}; + } + static const Status kCollectionInternalErrorStatus; static const Status kChunksInternalErrorStatus; static const Status kDatabaseInternalErrorStatus; @@ -92,6 +104,15 @@ private: StatusWith> _swChunksReturnValue{kChunksInternalErrorStatus}; + boost::optional _reshardingFields; + + struct RefreshInfo { + StatusWith swCollectionReturnValue{kCollectionInternalErrorStatus}; + StatusWith> swChunksReturnValue{kChunksInternalErrorStatus}; + boost::optional 
reshardingFields; + }; + + stdx::unordered_map _refreshValues; StatusWith _swDatabaseReturnValue{kDatabaseInternalErrorStatus}; }; -- cgit v1.2.1