diff options
author | Randolph Tan <randolph@10gen.com> | 2021-09-21 16:19:56 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-09-21 17:03:47 +0000 |
commit | 800c6d40912751e272853e383f4c4bf1f00e5c88 (patch) | |
tree | b65e26e73d5a26fdca4ec297fc0c283cb90f29b4 /src/mongo/db/s | |
parent | 536cd38e7ab615181f5586519f60728c52e47108 (diff) | |
download | mongo-800c6d40912751e272853e383f4c4bf1f00e5c88.tar.gz |
SERVER-58915 Implement ReshardingDonorWriteRouter functionality along…
Diffstat (limited to 'src/mongo/db/s')
16 files changed, 208 insertions, 256 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index da18f30056d..e11ff37a646 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -16,9 +16,9 @@ env.Library( 'collection_sharding_state.cpp', 'database_sharding_state.cpp', 'operation_sharding_state.cpp', - 'resharding_donor_write_router.cpp', 'sharding_migration_critical_section.cpp', 'sharding_state.cpp', + 'sharding_write_router.cpp', 'transaction_coordinator_curop.cpp', 'transaction_coordinator_factory.cpp', 'transaction_coordinator_worker_curop_repository.cpp', @@ -27,6 +27,7 @@ env.Library( '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/concurrency/lock_manager', '$BUILD_DIR/mongo/db/range_arithmetic', + '$BUILD_DIR/mongo/s/grid', '$BUILD_DIR/mongo/s/sharding_routing_table', ], ) @@ -61,7 +62,12 @@ env.Library( 'range_deletion_util.cpp', 'read_only_catalog_cache_loader.cpp', 'recoverable_critical_section_service.cpp', + 'resharding_util.cpp', + 'resharding/coordinator_document.idl','resharding/document_source_resharding_iterate_transaction.cpp', 'resharding/document_source_resharding_ownership_match.cpp', + 'resharding/donor_document.idl', + 'resharding/donor_oplog_id.idl', + 'resharding/recipient_document.idl', 'resharding/resharding_change_event_o2_field.idl', 'resharding/resharding_collection_cloner.cpp', 'resharding/resharding_coordinator_commit_monitor.cpp', @@ -145,35 +151,6 @@ env.Library( '$BUILD_DIR/mongo/db/session_catalog', '$BUILD_DIR/mongo/idl/server_parameter', '$BUILD_DIR/mongo/util/future_util', - 'resharding_util', - ], -) - -# Be careful about adding dependencies to this library, as any dependencies will currently be -# included in mongo_embedded. -env.Library( - target='resharding_util', - source=[ - 'resharding_util.cpp', - 'resharding/coordinator_document.idl', - 'resharding/document_source_resharding_iterate_transaction.cpp', - 'resharding/donor_document.idl', - 'resharding/donor_oplog_id.idl', - 'resharding/recipient_document.idl', - ], - LIBDEPS_PRIVATE=[ - '$BUILD_DIR/mongo/base', - '$BUILD_DIR/mongo/db/catalog/collection_options', - '$BUILD_DIR/mongo/db/common', - '$BUILD_DIR/mongo/db/concurrency/write_conflict_exception', - '$BUILD_DIR/mongo/db/curop', - '$BUILD_DIR/mongo/db/namespace_string', - '$BUILD_DIR/mongo/db/pipeline/expression_context', - '$BUILD_DIR/mongo/db/pipeline/pipeline', - '$BUILD_DIR/mongo/db/storage/write_unit_of_work', - '$BUILD_DIR/mongo/s/async_requests_sender', - '$BUILD_DIR/mongo/s/grid', - 'sharding_api_d', ], ) @@ -392,7 +369,6 @@ env.Library( '$BUILD_DIR/mongo/s/commands/shared_cluster_commands', '$BUILD_DIR/mongo/s/sharding_initialization', '$BUILD_DIR/mongo/s/sharding_router_api', - 'resharding_util', 'sharding_runtime_d', ], ) @@ -543,7 +519,6 @@ env.CppUnitTest( '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture', '$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_mock', '$BUILD_DIR/mongo/s/sharding_router_test_fixture', - 'resharding_util', 'shard_server_test_fixture', 'sharding_commands_d', 'sharding_logging', @@ -597,7 +572,6 @@ env.CppUnitTest( '$BUILD_DIR/mongo/db/timeseries/timeseries_options', '$BUILD_DIR/mongo/util/version_impl', 'config_server_test_fixture', - 'resharding_util', ], ) diff --git a/src/mongo/db/s/config/config_server_test_fixture.cpp b/src/mongo/db/s/config/config_server_test_fixture.cpp index 3144add6236..aa573a94deb 100644 --- a/src/mongo/db/s/config/config_server_test_fixture.cpp +++ b/src/mongo/db/s/config/config_server_test_fixture.cpp @@ -43,6 +43,8 @@ #include "mongo/db/commands.h" #include "mongo/db/namespace_string.h" #include "mongo/db/op_observer.h" +#include "mongo/db/op_observer_impl.h" +#include "mongo/db/op_observer_registry.h" #include "mongo/db/ops/write_ops.h" #include "mongo/db/query/cursor_response.h" #include "mongo/db/query/query_request_helper.h" @@ -51,6 +53,7 @@ #include "mongo/db/repl/repl_settings.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/s/config/sharding_catalog_manager.h" +#include "mongo/db/s/config_server_op_observer.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/executor/thread_pool_task_executor_test_fixture.h" #include "mongo/rpc/metadata/repl_set_metadata.h" @@ -473,4 +476,11 @@ void ConfigServerTestFixture::expectSetShardVersion( }); } +void ConfigServerTestFixture::setupOpObservers() { + auto opObserverRegistry = + checked_cast<OpObserverRegistry*>(getServiceContext()->getOpObserver()); + opObserverRegistry->addObserver(std::make_unique<OpObserverImpl>()); + opObserverRegistry->addObserver(std::make_unique<ConfigServerOpObserver>()); +} + } // namespace mongo diff --git a/src/mongo/db/s/config/config_server_test_fixture.h b/src/mongo/db/s/config/config_server_test_fixture.h index 5e00755a652..821b8987406 100644 --- a/src/mongo/db/s/config/config_server_test_fixture.h +++ b/src/mongo/db/s/config/config_server_test_fixture.h @@ -194,6 +194,9 @@ protected: std::unique_ptr<BalancerConfiguration> makeBalancerConfiguration() override; +protected: + void setupOpObservers() override; + private: /** * 'onPreInitGlobalStateFn' is invoked near the end of _setUp() before calling diff --git a/src/mongo/db/s/op_observer_sharding_impl.cpp b/src/mongo/db/s/op_observer_sharding_impl.cpp index b55ef001bd0..319db36b61b 100644 --- a/src/mongo/db/s/op_observer_sharding_impl.cpp +++ b/src/mongo/db/s/op_observer_sharding_impl.cpp @@ -40,6 +40,7 @@ #include "mongo/db/s/migration_chunk_cloner_source_legacy.h" #include "mongo/db/s/migration_source_manager.h" #include "mongo/db/s/resharding_util.h" +#include "mongo/db/s/sharding_write_router.h" #include "mongo/logv2/log.h" namespace mongo { @@ -121,12 +122,13 @@ void OpObserverShardingImpl::shardObserveInsertOp(OperationContext* opCtx, const NamespaceString nss, const BSONObj& insertedDoc, const repl::OpTime& opTime, - CollectionShardingState* css, + const ShardingWriteRouter& shardingWriteRouter, const bool fromMigrate, const bool inMultiDocumentTransaction) { if (nss == NamespaceString::kSessionTransactionsTableNamespace || fromMigrate) return; + auto css = shardingWriteRouter.getCollectionShardingState(); auto* const csr = CollectionShardingRuntime::get(css); csr->checkShardVersionOrThrow(opCtx); @@ -160,9 +162,10 @@ void OpObserverShardingImpl::shardObserveUpdateOp(OperationContext* opCtx, boost::optional<BSONObj> preImageDoc, const BSONObj& postImageDoc, const repl::OpTime& opTime, - CollectionShardingState* css, + const ShardingWriteRouter& shardingWriteRouter, const repl::OpTime& prePostImageOpTime, const bool inMultiDocumentTransaction) { + auto css = shardingWriteRouter.getCollectionShardingState(); auto* const csr = CollectionShardingRuntime::get(css); csr->checkShardVersionOrThrow(opCtx); @@ -195,9 +198,10 @@ void OpObserverShardingImpl::shardObserveDeleteOp(OperationContext* opCtx, const NamespaceString nss, const BSONObj& documentKey, const repl::OpTime& opTime, - CollectionShardingState* css, + const ShardingWriteRouter& shardingWriteRouter, const repl::OpTime& preImageOpTime, const bool inMultiDocumentTransaction) { + auto css = shardingWriteRouter.getCollectionShardingState(); auto* const csr = CollectionShardingRuntime::get(css); csr->checkShardVersionOrThrow(opCtx); @@ -237,13 +241,4 @@ void OpObserverShardingImpl::shardObserveTransactionPrepareOrUnpreparedCommit( opCtx->getServiceContext(), stmts, prepareOrCommitOptime)); } -void OpObserverShardingImpl::shardAnnotateOplogEntry(OperationContext* opCtx, - const NamespaceString nss, - const BSONObj& doc, - repl::DurableReplOperation& op, - CollectionShardingState* css, - const ScopedCollectionDescription& collDesc) { - op.setDestinedRecipient(getDestinedRecipient(opCtx, nss, doc, css, collDesc)); -} - } // namespace mongo diff --git a/src/mongo/db/s/op_observer_sharding_impl.h b/src/mongo/db/s/op_observer_sharding_impl.h index 0fafcf833ff..f9005497c57 100644 --- a/src/mongo/db/s/op_observer_sharding_impl.h +++ b/src/mongo/db/s/op_observer_sharding_impl.h @@ -33,6 +33,8 @@ namespace mongo { +class ShardingWriteRouter; + class OpObserverShardingImpl : public OpObserverImpl { public: // True if the document being deleted belongs to a chunk which, while still in the shard, @@ -50,7 +52,7 @@ protected: NamespaceString nss, const BSONObj& insertedDoc, const repl::OpTime& opTime, - CollectionShardingState* css, + const ShardingWriteRouter& shardingWriteRouter, bool fromMigrate, bool inMultiDocumentTransaction) override; void shardObserveUpdateOp(OperationContext* opCtx, @@ -58,27 +60,20 @@ protected: boost::optional<BSONObj> preImageDoc, const BSONObj& updatedDoc, const repl::OpTime& opTime, - CollectionShardingState* css, + const ShardingWriteRouter& shardingWriteRouter, const repl::OpTime& prePostImageOpTime, bool inMultiDocumentTransaction) override; void shardObserveDeleteOp(OperationContext* opCtx, NamespaceString nss, const BSONObj& documentKey, const repl::OpTime& opTime, - CollectionShardingState* css, + const ShardingWriteRouter& shardingWriteRouter, const repl::OpTime& preImageOpTime, bool inMultiDocumentTransaction) override; void shardObserveTransactionPrepareOrUnpreparedCommit( OperationContext* opCtx, const std::vector<repl::ReplOperation>& stmts, const repl::OpTime& prepareOrCommitOptime) override; - - void shardAnnotateOplogEntry(OperationContext* opCtx, - NamespaceString nss, - const BSONObj& doc, - repl::DurableReplOperation& op, - CollectionShardingState* css, - const ScopedCollectionDescription& collDesc) override; }; } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp index c437eb3ec05..c72b4a2122a 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp @@ -73,6 +73,10 @@ public: ServiceContextMongoDTest::setUp(); auto serviceContext = getServiceContext(); + + // Initialize sharding components as a shard server. + serverGlobalParams.clusterRole = ClusterRole::ShardServer; + { auto opCtx = makeOperationContext(); auto replCoord = std::make_unique<repl::ReplicationCoordinatorMock>(serviceContext); diff --git a/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp index 3a96cd1672e..02889248639 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp @@ -65,6 +65,10 @@ public: ServiceContextMongoDTest::setUp(); auto serviceContext = getServiceContext(); + + // Initialize sharding components as a shard server. + serverGlobalParams.clusterRole = ClusterRole::ShardServer; + { auto opCtx = makeOperationContext(); auto replCoord = std::make_unique<repl::ReplicationCoordinatorMock>(serviceContext); diff --git a/src/mongo/db/s/resharding_destined_recipient_test.cpp b/src/mongo/db/s/resharding_destined_recipient_test.cpp index 676154a6a44..a3ba879cac5 100644 --- a/src/mongo/db/s/resharding_destined_recipient_test.cpp +++ b/src/mongo/db/s/resharding_destined_recipient_test.cpp @@ -40,10 +40,10 @@ #include "mongo/db/repl/wait_for_majority_service.h" #include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/operation_sharding_state.h" -#include "mongo/db/s/resharding_util.h" #include "mongo/db/s/shard_filtering_metadata_refresh.h" #include "mongo/db/s/shard_server_test_fixture.h" #include "mongo/db/s/sharding_state.h" +#include "mongo/db/s/sharding_write_router.h" #include "mongo/db/session_catalog_mongod.h" #include "mongo/db/transaction_participant.h" #include "mongo/s/catalog/sharding_catalog_client_mock.h" @@ -298,11 +298,10 @@ TEST_F(DestinedRecipientTest, TestGetDestinedRecipient) { AutoGetCollection coll(opCtx, kNss, MODE_IX); OperationShardingState::get(opCtx).initializeClientRoutingVersions( kNss, env.version, env.dbVersion); - auto* const css = CollectionShardingState::get(opCtx, kNss); - auto collDesc = css->getCollectionDescription(opCtx); + ShardingWriteRouter shardingWriteRouter(opCtx, kNss, Grid::get(opCtx)->catalogCache()); auto destShardId = - getDestinedRecipient(opCtx, kNss, BSON("x" << 2 << "y" << 10), css, collDesc); + shardingWriteRouter.getReshardingDestinedRecipient(BSON("x" << 2 << "y" << 10)); ASSERT(destShardId); ASSERT_EQ(*destShardId, env.destShard); } @@ -315,18 +314,16 @@ TEST_F(DestinedRecipientTest, TestGetDestinedRecipientThrowsOnBlockedRefresh) { AutoGetCollection coll(opCtx, kNss, MODE_IX); OperationShardingState::get(opCtx).initializeClientRoutingVersions( kNss, env.version, env.dbVersion); - auto* const css = CollectionShardingState::get(opCtx, kNss); - auto collDesc = css->getCollectionDescription(opCtx); FailPointEnableBlock failPoint("blockCollectionCacheLookup"); - ASSERT_THROWS_WITH_CHECK( - getDestinedRecipient(opCtx, kNss, BSON("x" << 2 << "y" << 10), css, collDesc), - ShardCannotRefreshDueToLocksHeldException, - [&](const ShardCannotRefreshDueToLocksHeldException& ex) { - const auto refreshInfo = ex.extraInfo<ShardCannotRefreshDueToLocksHeldInfo>(); - ASSERT(refreshInfo); - ASSERT_EQ(refreshInfo->getNss(), env.tempNss); - }); + ASSERT_THROWS_WITH_CHECK(ShardingWriteRouter(opCtx, kNss, Grid::get(opCtx)->catalogCache()), + ShardCannotRefreshDueToLocksHeldException, + [&](const ShardCannotRefreshDueToLocksHeldException& ex) { + const auto refreshInfo = + ex.extraInfo<ShardCannotRefreshDueToLocksHeldInfo>(); + ASSERT(refreshInfo); + ASSERT_EQ(refreshInfo->getNss(), env.tempNss); + }); } auto sw = catalogCache()->getCollectionRoutingInfoWithRefresh(opCtx, env.tempNss); diff --git a/src/mongo/db/s/resharding_donor_write_router.cpp b/src/mongo/db/s/resharding_donor_write_router.cpp deleted file mode 100644 index d5f4315928f..00000000000 --- a/src/mongo/db/s/resharding_donor_write_router.cpp +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Copyright (C) 2021-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/db/s/resharding_donor_write_router.h" - -namespace mongo { - -ReshardingDonorWriteRouter::ReshardingDonorWriteRouter(OperationContext* opCtx, - const NamespaceString& sourceNss, - CatalogCache* catalogCache) - : _css(nullptr), _collDesc(nullptr) {} - -ReshardingDonorWriteRouter::ReshardingDonorWriteRouter(OperationContext* opCtx, - const NamespaceString& sourceNss, - CatalogCache* catalogCache, - CollectionShardingState* css, - const ScopedCollectionDescription* collDesc) - : _css(css), _collDesc(collDesc) {} - -CollectionShardingState* ReshardingDonorWriteRouter::getCollectionShardingState() const { - return nullptr; -} - -boost::optional<ShardId> ReshardingDonorWriteRouter::getDestinedRecipient( - const BSONObj& fullDocument) const { - return boost::none; -} - -} // namespace mongo diff --git a/src/mongo/db/s/resharding_util.cpp b/src/mongo/db/s/resharding_util.cpp index baec1d6281d..247fd986682 100644 --- a/src/mongo/db/s/resharding_util.cpp +++ b/src/mongo/db/s/resharding_util.cpp @@ -60,31 +60,6 @@ namespace mongo { using namespace fmt::literals; -namespace { - -UUID getCollectionUuid(OperationContext* opCtx, const NamespaceString& nss) { - dassert(opCtx->lockState()->isCollectionLockedForMode(nss, MODE_IS)); - - auto uuid = CollectionCatalog::get(opCtx)->lookupUUIDByNSS(opCtx, nss); - invariant(uuid); - - return *uuid; -} - -// Ensure that this shard owns the document. This must be called after verifying that we -// are in a resharding operation so that we are guaranteed that migrations are suspended. -bool documentBelongsToMe(OperationContext* opCtx, - CollectionShardingState* css, - const ScopedCollectionDescription& collDesc, - const BSONObj& doc) { - auto currentKeyPattern = ShardKeyPattern(collDesc.getKeyPattern()); - auto ownershipFilter = css->getOwnershipFilter( - opCtx, CollectionShardingState::OrphanCleanupPolicy::kAllowOrphanCleanup); - - return ownershipFilter.keyBelongsToMe(currentKeyPattern.extractShardKeyFromDoc(doc)); -} -} // namespace - BSONObj serializeAndTruncateReshardingErrorIfNeeded(Status originalError) { BSONObjBuilder originalBob; originalError.serializeErrorToBSON(&originalBob); @@ -335,36 +310,6 @@ std::unique_ptr<Pipeline, PipelineDeleter> createOplogFetchingPipelineForReshard return Pipeline::create(std::move(stages), expCtx); } -boost::optional<ShardId> getDestinedRecipient(OperationContext* opCtx, - const NamespaceString& sourceNss, - const BSONObj& fullDocument, - CollectionShardingState* css, - const ScopedCollectionDescription& collDesc) { - if (!ShardingState::get(opCtx)->enabled()) { - // Don't bother looking up the sharding state for the collection if the server isn't even - // running with sharding enabled. We know there couldn't possibly be any resharding fields. - return boost::none; - } - - auto reshardingKeyPattern = collDesc.getReshardingKeyIfShouldForwardOps(); - if (!reshardingKeyPattern) - return boost::none; - - if (!documentBelongsToMe(opCtx, css, collDesc, fullDocument)) - return boost::none; - - bool allowLocks = true; - auto tempNssRoutingInfo = - uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo( - opCtx, - constructTemporaryReshardingNss(sourceNss.db(), getCollectionUuid(opCtx, sourceNss)), - allowLocks)); - - auto shardKey = reshardingKeyPattern->extractShardKeyFromDocThrows(fullDocument); - - return tempNssRoutingInfo.findIntersectingChunkWithSimpleCollation(shardKey).getShardId(); -} - bool isFinalOplog(const repl::OplogEntry& oplog) { if (oplog.getOpType() != repl::OpTypeEnum::kNoop) { return false; diff --git a/src/mongo/db/s/resharding_util.h b/src/mongo/db/s/resharding_util.h index 0e8646a0f88..d38b2c8742c 100644 --- a/src/mongo/db/s/resharding_util.h +++ b/src/mongo/db/s/resharding_util.h @@ -281,16 +281,6 @@ std::unique_ptr<Pipeline, PipelineDeleter> createOplogFetchingPipelineForReshard const ShardId& recipientShard); /** - * Returns the shard Id of the recipient shard that would own the document under the new shard - * key pattern. - */ -boost::optional<ShardId> getDestinedRecipient(OperationContext* opCtx, - const NamespaceString& sourceNss, - const BSONObj& fullDocument, - CollectionShardingState* css, - const ScopedCollectionDescription& collDesc); - -/** * Sentinel oplog format: * { * op: "n", diff --git a/src/mongo/db/s/sharding_initialization_op_observer_test.cpp b/src/mongo/db/s/sharding_initialization_op_observer_test.cpp index d93a9d03212..f88fc027875 100644 --- a/src/mongo/db/s/sharding_initialization_op_observer_test.cpp +++ b/src/mongo/db/s/sharding_initialization_op_observer_test.cpp @@ -61,6 +61,8 @@ public: void setUp() override { ShardingMongodTestFixture::setUp(); + serverGlobalParams.clusterRole = ClusterRole::ShardServer; + // NOTE: this assumes that globalInit will always be called on the same thread as the main // test thread ShardingInitializationMongoD::get(operationContext()) diff --git a/src/mongo/db/s/sharding_mongod_test_fixture.cpp b/src/mongo/db/s/sharding_mongod_test_fixture.cpp index f81cda6ba08..abf6df3ff4f 100644 --- a/src/mongo/db/s/sharding_mongod_test_fixture.cpp +++ b/src/mongo/db/s/sharding_mongod_test_fixture.cpp @@ -41,6 +41,7 @@ #include "mongo/db/client.h" #include "mongo/db/commands.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/op_observer_impl.h" #include "mongo/db/op_observer_registry.h" #include "mongo/db/query/cursor_response.h" #include "mongo/db/repl/drop_pending_collection_reaper.h" @@ -84,58 +85,7 @@ using repl::ReplicationCoordinatorMock; using repl::ReplSettings; using unittest::assertGet; -ShardingMongodTestFixture::ShardingMongodTestFixture() { - const auto service = getServiceContext(); - - // Set up this node as shard node, which is part of a replica set - - repl::ReplSettings replSettings; - replSettings.setOplogSizeBytes(512'000); - replSettings.setReplSetString(ConnectionString::forReplicaSet(_setName, _servers).toString()); - auto replCoordPtr = makeReplicationCoordinator(replSettings); - _replCoord = replCoordPtr.get(); - - BSONArrayBuilder serversBob; - for (size_t i = 0; i < _servers.size(); ++i) { - serversBob.append(BSON("host" << _servers[i].toString() << "_id" << static_cast<int>(i))); - } - - auto replSetConfig = - repl::ReplSetConfig::parse(BSON("_id" << _setName << "protocolVersion" << 1 << "version" - << 3 << "members" << serversBob.arr())); - replCoordPtr->setGetConfigReturnValue(replSetConfig); - - repl::ReplicationCoordinator::set(service, std::move(replCoordPtr)); - - auto storagePtr = std::make_unique<repl::StorageInterfaceMock>(); - - repl::DropPendingCollectionReaper::set( - service, std::make_unique<repl::DropPendingCollectionReaper>(storagePtr.get())); - - repl::ReplicationProcess::set(service, - std::make_unique<repl::ReplicationProcess>( - storagePtr.get(), - std::make_unique<repl::ReplicationConsistencyMarkersMock>(), - std::make_unique<repl::ReplicationRecoveryMock>())); - - auto uniqueOpCtx = makeOperationContext(); - ASSERT_OK( - repl::ReplicationProcess::get(uniqueOpCtx.get())->initializeRollbackID(uniqueOpCtx.get())); - - repl::StorageInterface::set(service, std::move(storagePtr)); - - auto opObserver = checked_cast<OpObserverRegistry*>(service->getOpObserver()); - opObserver->addObserver(std::make_unique<OpObserverShardingImpl>()); - opObserver->addObserver(std::make_unique<ConfigServerOpObserver>()); - opObserver->addObserver(std::make_unique<ShardServerOpObserver>()); - - repl::createOplog(uniqueOpCtx.get()); - - // Set the highest FCV because otherwise it defaults to the lower FCV. This way we default to - // testing this release's code, not backwards compatibility code. - // (Generic FCV reference): This FCV reference should exist across LTS binary versions. - serverGlobalParams.mutableFeatureCompatibility.setVersion(multiversion::GenericFCV::kLatest); -} +ShardingMongodTestFixture::ShardingMongodTestFixture() {} ShardingMongodTestFixture::~ShardingMongodTestFixture() = default; @@ -284,6 +234,53 @@ Status ShardingMongodTestFixture::initializeGlobalShardingStateForMongodForTest( void ShardingMongodTestFixture::setUp() { ServiceContextMongoDTest::setUp(); ShardingTestFixtureCommon::setUp(); + + const auto service = getServiceContext(); + + // Set up this node as shard node, which is part of a replica set + + repl::ReplSettings replSettings; + replSettings.setOplogSizeBytes(512'000); + replSettings.setReplSetString(ConnectionString::forReplicaSet(_setName, _servers).toString()); + auto replCoordPtr = makeReplicationCoordinator(replSettings); + _replCoord = replCoordPtr.get(); + + BSONArrayBuilder serversBob; + for (size_t i = 0; i < _servers.size(); ++i) { + serversBob.append(BSON("host" << _servers[i].toString() << "_id" << static_cast<int>(i))); + } + + auto replSetConfig = + repl::ReplSetConfig::parse(BSON("_id" << _setName << "protocolVersion" << 1 << "version" + << 3 << "members" << serversBob.arr())); + replCoordPtr->setGetConfigReturnValue(replSetConfig); + + repl::ReplicationCoordinator::set(service, std::move(replCoordPtr)); + + auto storagePtr = std::make_unique<repl::StorageInterfaceMock>(); + + repl::DropPendingCollectionReaper::set( + service, std::make_unique<repl::DropPendingCollectionReaper>(storagePtr.get())); + + repl::ReplicationProcess::set(service, + std::make_unique<repl::ReplicationProcess>( + storagePtr.get(), + std::make_unique<repl::ReplicationConsistencyMarkersMock>(), + std::make_unique<repl::ReplicationRecoveryMock>())); + + ASSERT_OK(repl::ReplicationProcess::get(operationContext()) + ->initializeRollbackID(operationContext())); + + repl::StorageInterface::set(service, std::move(storagePtr)); + + setupOpObservers(); + + repl::createOplog(operationContext()); + + // Set the highest FCV because otherwise it defaults to the lower FCV. This way we default to + // testing this release's code, not backwards compatibility code. + // (Generic FCV reference): This FCV reference should exist across LTS binary versions. + serverGlobalParams.mutableFeatureCompatibility.setVersion(multiversion::GenericFCV::kLatest); } void ShardingMongodTestFixture::tearDown() { @@ -349,4 +346,11 @@ repl::ReplicationCoordinatorMock* ShardingMongodTestFixture::replicationCoordina return _replCoord; } +void ShardingMongodTestFixture::setupOpObservers() { + auto opObserverRegistry = + checked_cast<OpObserverRegistry*>(getServiceContext()->getOpObserver()); + opObserverRegistry->addObserver(std::make_unique<OpObserverShardingImpl>()); + opObserverRegistry->addObserver(std::make_unique<ShardServerOpObserver>()); +} + } // namespace mongo diff --git a/src/mongo/db/s/sharding_mongod_test_fixture.h b/src/mongo/db/s/sharding_mongod_test_fixture.h index 7c6d955976e..41f4cd1b824 100644 --- a/src/mongo/db/s/sharding_mongod_test_fixture.h +++ b/src/mongo/db/s/sharding_mongod_test_fixture.h @@ -118,6 +118,11 @@ protected: */ virtual std::unique_ptr<BalancerConfiguration> makeBalancerConfiguration(); + /** + * Setups the op observer listeners depending on cluster role. + */ + virtual void setupOpObservers(); + private: /** * Base class returns a TaskExecutorPool with a fixed TaskExecutor and a set of arbitrary diff --git a/src/mongo/db/s/sharding_write_router.cpp b/src/mongo/db/s/sharding_write_router.cpp new file mode 100644 index 00000000000..87542f9c126 --- /dev/null +++ b/src/mongo/db/s/sharding_write_router.cpp @@ -0,0 +1,85 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/s/sharding_write_router.h" + +#include "mongo/db/catalog/collection_catalog.h" +#include "mongo/db/s/sharding_state.h" +#include "mongo/s/catalog_cache.h" +#include "mongo/s/grid.h" + +namespace mongo { + +ShardingWriteRouter::ShardingWriteRouter(OperationContext* opCtx, + const NamespaceString& nss, + CatalogCache* catalogCache) { + if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { + _css = CollectionShardingState::get(opCtx, nss); + auto collDesc = _css->getCollectionDescription(opCtx); + + _reshardKeyPattern = collDesc.getReshardingKeyIfShouldForwardOps(); + if (_reshardKeyPattern) { + _ownershipFilter = _css->getOwnershipFilter( + opCtx, CollectionShardingState::OrphanCleanupPolicy::kAllowOrphanCleanup); + _shardKeyPattern = ShardKeyPattern(collDesc.getKeyPattern()); + + const auto& reshardingFields = collDesc.getReshardingFields(); + invariant(reshardingFields); + const auto& donorFields = reshardingFields->getDonorFields(); + invariant(donorFields); + + _reshardingChunkMgr = uassertStatusOK(catalogCache->getCollectionRoutingInfo( + opCtx, donorFields->getTempReshardingNss(), true /* allowLocks */)); + } + } +} + +CollectionShardingState* ShardingWriteRouter::getCollectionShardingState() const { + return _css; +} + +boost::optional<ShardId> ShardingWriteRouter::getReshardingDestinedRecipient( + const BSONObj& fullDocument) const { + if (!_reshardKeyPattern) { + return boost::none; + } + + invariant(_ownershipFilter); + invariant(_shardKeyPattern); + invariant(_reshardingChunkMgr); + + if (!_ownershipFilter->keyBelongsToMe(_shardKeyPattern->extractShardKeyFromDoc(fullDocument))) { + return boost::none; + } + + auto shardKey = _reshardKeyPattern->extractShardKeyFromDocThrows(fullDocument); + return _reshardingChunkMgr->findIntersectingChunkWithSimpleCollation(shardKey).getShardId(); +} + +} // namespace mongo diff --git a/src/mongo/db/s/resharding_donor_write_router.h b/src/mongo/db/s/sharding_write_router.h index 13b76115e1c..219e31859b0 100644 --- a/src/mongo/db/s/resharding_donor_write_router.h +++ b/src/mongo/db/s/sharding_write_router.h @@ -37,29 +37,23 @@ class CatalogCache; class ChunkManager; class OperationContext; class ShardId; -class ReshardingDonorWriteRouter { +class ShardingWriteRouter { public: - ReshardingDonorWriteRouter(OperationContext* opCtx, - const NamespaceString& sourceNss, - CatalogCache* catalogCache); + ShardingWriteRouter(OperationContext* opCtx, + const NamespaceString& nss, + CatalogCache* catalogCache); - ReshardingDonorWriteRouter(OperationContext* opCtx, - const NamespaceString& sourceNss, - CatalogCache* catalogCache, - CollectionShardingState* css, - const ScopedCollectionDescription* collDesc); + boost::optional<ShardId> getReshardingDestinedRecipient(const BSONObj& fullDocument) const; CollectionShardingState* getCollectionShardingState() const; - boost::optional<ShardId> getDestinedRecipient(const BSONObj& fullDocument) const; - private: - CollectionShardingState* const _css; - const ScopedCollectionDescription* const _collDesc; + CollectionShardingState* _css{nullptr}; boost::optional<ScopedCollectionFilter> _ownershipFilter; - boost::optional<ShardKeyPattern> _reshardingKeyPattern; - boost::optional<ChunkManager> _tempReshardingChunkMgr; + boost::optional<ShardKeyPattern> _shardKeyPattern; + boost::optional<ShardKeyPattern> _reshardKeyPattern; + boost::optional<ChunkManager> _reshardingChunkMgr; }; } // namespace mongo |