diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2022-03-09 07:57:19 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-03-09 08:45:38 +0000 |
commit | fb608f9bc8fe237694152b42f568ec97a12b4081 (patch) | |
tree | f68362d2a0104cf40e1236a206cac27fc9da4164 /src | |
parent | ff90764302229844a8431c68dcaf3f40e4db9a19 (diff) | |
download | mongo-fb608f9bc8fe237694152b42f568ec97a12b4081.tar.gz |
SERVER-64057 Implement a scoped object for setting the expected shard/database versions
Diffstat (limited to 'src')
24 files changed, 558 insertions, 446 deletions
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index b59bb1ae4e6..75368eb49bb 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -841,8 +841,10 @@ Status runAggregate(OperationContext* opCtx, // Set this operation's shard version for the underlying collection to unsharded. // This is prerequisite for future shard versioning checks. - OperationShardingState::get(opCtx).initializeClientRoutingVersions( - resolvedView.getNamespace(), ChunkVersion::UNSHARDED(), boost::none); + ScopedSetShardRole scopedSetShardRole(opCtx, + resolvedView.getNamespace(), + ChunkVersion::UNSHARDED() /* shardVersion */, + boost::none /* databaseVersion */); bool collectionIsSharded = [opCtx, &resolvedView]() { AutoGetCollection autoColl(opCtx, diff --git a/src/mongo/db/index_builds_coordinator_mongod.cpp b/src/mongo/db/index_builds_coordinator_mongod.cpp index be01c71eb53..4232817b409 100644 --- a/src/mongo/db/index_builds_coordinator_mongod.cpp +++ b/src/mongo/db/index_builds_coordinator_mongod.cpp @@ -279,8 +279,6 @@ IndexBuildsCoordinatorMongod::_startIndexBuild(OperationContext* opCtx, const auto nss = CollectionCatalog::get(opCtx)->resolveNamespaceStringOrUUID(opCtx, nssOrUuid); auto& oss = OperationShardingState::get(opCtx); - const auto shardVersion = oss.getShardVersion(nss); - const auto dbVersion = oss.getDbVersion(dbName); // Task in thread pool should have similar CurOp representation to the caller so that it can be // identified as a createIndexes operation. @@ -323,8 +321,8 @@ IndexBuildsCoordinatorMongod::_startIndexBuild(OperationContext* opCtx, replState, startPromise = std::move(startPromise), startTimestamp, - shardVersion, - dbVersion, + shardVersion = oss.getShardVersion(nss), + dbVersion = oss.getDbVersion(dbName), resumeInfo, impersonatedClientAttrs = std::move(impersonatedClientAttrs) ](auto status) mutable noexcept { @@ -344,14 +342,13 @@ IndexBuildsCoordinatorMongod::_startIndexBuild(OperationContext* opCtx, auto opCtx = Client::getCurrent()->makeOperationContext(); // Load the external client's attributes into this thread's client for auditing. - auto authSession = AuthorizationSession::get(Client::getCurrent()); + auto authSession = AuthorizationSession::get(opCtx->getClient()); if (authSession) { authSession->setImpersonatedUserData(std::move(impersonatedClientAttrs.userNames), std::move(impersonatedClientAttrs.roleNames)); } - auto& oss = OperationShardingState::get(opCtx.get()); - oss.initializeClientRoutingVersions(nss, shardVersion, dbVersion); + ScopedSetShardRole scopedSetShardRole(opCtx.get(), nss, shardVersion, dbVersion); { stdx::unique_lock<Client> lk(*opCtx->getClient()); diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp index 68801556252..1e68272a9f9 100644 --- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp @@ -397,8 +397,7 @@ bool ShardServerProcessInterface::setExpectedDbVersion(OperationContext* opCtx, "Expected db version must match known db version", knownDBVersion == dbVersion); } else { - OperationShardingState::get(opCtx).initializeClientRoutingVersions( - nss, boost::none, dbVersion); + OperationShardingState::setShardRole(opCtx, nss, boost::none, dbVersion); } return false; @@ -412,8 +411,7 @@ void ShardServerProcessInterface::setExpectedShardVersion( if (oss.hasShardVersion(nss)) { invariant(oss.getShardVersion(nss) == chunkVersion); } else { - OperationShardingState::get(opCtx).initializeClientRoutingVersions( - nss, chunkVersion, boost::none); + OperationShardingState::setShardRole(opCtx, nss, chunkVersion, boost::none); } } diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index eb407909514..531dbc57111 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -497,15 +497,16 @@ env.CppUnitTest( 'op_observer_sharding_test.cpp', 'persistent_task_queue_test.cpp', 'range_deletion_util_test.cpp', - 'resharding_collection_test.cpp', - 'resharding_destined_recipient_test.cpp', 'resharding/resharding_agg_test.cpp', 'resharding/resharding_collection_cloner_test.cpp', + 'resharding/resharding_collection_test.cpp', 'resharding/resharding_data_replication_test.cpp', + 'resharding/resharding_destined_recipient_test.cpp', 'resharding/resharding_donor_oplog_iterator_test.cpp', 'resharding/resharding_donor_recipient_common_test.cpp', - 'resharding/resharding_metrics_test.cpp', + 'resharding/resharding_donor_service_test.cpp', 'resharding/resharding_metrics_new_test.cpp', + 'resharding/resharding_metrics_test.cpp', 'resharding/resharding_oplog_applier_test.cpp', 'resharding/resharding_oplog_batch_applier_test.cpp', 'resharding/resharding_oplog_batch_preparer_test.cpp', @@ -514,7 +515,6 @@ env.CppUnitTest( 'resharding/resharding_oplog_session_application_test.cpp', 'resharding/resharding_recipient_service_external_state_test.cpp', 'resharding/resharding_recipient_service_test.cpp', - 'resharding/resharding_donor_service_test.cpp', 'resharding/resharding_txn_cloner_test.cpp', 'session_catalog_migration_destination_test.cpp', 'session_catalog_migration_source_test.cpp', @@ -536,6 +536,7 @@ env.CppUnitTest( 'transaction_coordinator_test.cpp', 'type_shard_collection_test.cpp', 'type_shard_identity_test.cpp', + 'operation_sharding_state_test.cpp', 'vector_clock_shard_server_test.cpp', ], LIBDEPS=[ diff --git a/src/mongo/db/s/collection_metadata_filtering_test.cpp b/src/mongo/db/s/collection_metadata_filtering_test.cpp index 2190a56ce8f..f7c994c706a 100644 --- a/src/mongo/db/s/collection_metadata_filtering_test.cpp +++ b/src/mongo/db/s/collection_metadata_filtering_test.cpp @@ -125,8 +125,8 @@ protected: _manager = std::make_shared<MetadataManager>( getServiceContext(), kNss, executor(), CollectionMetadata(cm, ShardId("0"))); - OperationShardingState::get(operationContext()) - .initializeClientRoutingVersions(kNss, cm.getVersion(ShardId("0")), boost::none); + OperationShardingState::setShardRole( + operationContext(), kNss, cm.getVersion(ShardId("0")), boost::none); } std::shared_ptr<MetadataManager> _manager; diff --git a/src/mongo/db/s/collection_sharding_runtime_test.cpp b/src/mongo/db/s/collection_sharding_runtime_test.cpp index 91a7bb31157..540e7f5363f 100644 --- a/src/mongo/db/s/collection_sharding_runtime_test.cpp +++ b/src/mongo/db/s/collection_sharding_runtime_test.cpp @@ -81,8 +81,8 @@ protected: boost::none); if (!OperationShardingState::isOperationVersioned(opCtx)) { - OperationShardingState::get(opCtx).initializeClientRoutingVersions( - kTestNss, cm.getVersion(ShardId("0")), boost::none); + OperationShardingState::setShardRole( + opCtx, kTestNss, cm.getVersion(ShardId("0")), boost::none); } return CollectionMetadata(std::move(cm), ShardId("0")); @@ -123,7 +123,6 @@ TEST_F(CollectionShardingRuntimeTest, TEST_F( CollectionShardingRuntimeTest, GetCurrentMetadataIfKnownReturnsUnshardedAfterSetFilteringMetadataIsCalledWithUnshardedMetadata) { - CollectionShardingRuntime csr(getServiceContext(), kTestNss, executor()); csr.setFilteringMetadata(operationContext(), CollectionMetadata()); const auto optCurrMetadata = csr.getCurrentMetadataIfKnown(); @@ -135,7 +134,6 @@ TEST_F( TEST_F( CollectionShardingRuntimeTest, GetCurrentMetadataIfKnownReturnsShardedAfterSetFilteringMetadataIsCalledWithShardedMetadata) { - CollectionShardingRuntime csr(getServiceContext(), kTestNss, executor()); OperationContext* opCtx = operationContext(); auto metadata = makeShardedMetadata(opCtx); diff --git a/src/mongo/db/s/op_observer_sharding_test.cpp b/src/mongo/db/s/op_observer_sharding_test.cpp index 735b9e87212..8ea4dd63904 100644 --- a/src/mongo/db/s/op_observer_sharding_test.cpp +++ b/src/mongo/db/s/op_observer_sharding_test.cpp @@ -48,8 +48,7 @@ void setCollectionFilteringMetadata(OperationContext* opCtx, CollectionMetadata CollectionShardingRuntime::get(opCtx, kTestNss) ->setFilteringMetadata(opCtx, std::move(metadata)); - OperationShardingState::get(opCtx).initializeClientRoutingVersions( - kTestNss, version, boost::none); + OperationShardingState::setShardRole(opCtx, kTestNss, version, boost::none); } class DocumentKeyStateTest : public ShardServerTestFixture { diff --git a/src/mongo/db/s/operation_sharding_state.cpp b/src/mongo/db/s/operation_sharding_state.cpp index 9a782d5eb6f..74d71ef6594 100644 --- a/src/mongo/db/s/operation_sharding_state.cpp +++ b/src/mongo/db/s/operation_sharding_state.cpp @@ -27,10 +27,10 @@ * it in the license file. */ -#include "mongo/platform/basic.h" - #include "mongo/db/s/operation_sharding_state.h" +#include "mongo/logv2/log_debug.h" + namespace mongo { namespace { @@ -60,26 +60,35 @@ bool OperationShardingState::isOperationVersioned(OperationContext* opCtx) { return !oss._shardVersions.empty(); } -void OperationShardingState::initializeClientRoutingVersions( - NamespaceString nss, - const boost::optional<ChunkVersion>& shardVersion, - const boost::optional<DatabaseVersion>& dbVersion) { +void OperationShardingState::setShardRole(OperationContext* opCtx, + const NamespaceString& nss, + const boost::optional<ChunkVersion>& shardVersion, + const boost::optional<DatabaseVersion>& databaseVersion) { + auto& oss = OperationShardingState::get(opCtx); + if (shardVersion) { - // Changing the shardVersion expected for a namespace is not safe to happen in the - // middle of execution, but for the cases where operation is retried on the same - // OperationContext it can be set twice to the same value. - if (_shardVersionsChecked.contains(nss.ns())) { - invariant(_shardVersions[nss.ns()] == *shardVersion, - str::stream() - << "Trying to set " << shardVersion->toString() << " for " << nss.ns() - << " but it already has " << _shardVersions[nss.ns()].toString()); - } else { - _shardVersions.emplace(nss.ns(), *shardVersion); + auto emplaceResult = oss._shardVersions.try_emplace(nss.ns(), *shardVersion); + auto& tracker = emplaceResult.first->second; + if (!emplaceResult.second) { + uassert(640570, + str::stream() << "Illegal attempt to change the expected shard version for " + << nss << " from " << tracker.v << " to " << *shardVersion, + tracker.v == *shardVersion); } + invariant(++tracker.recursion > 0); } - if (dbVersion) { - const auto [_, inserted] = _databaseVersions.emplace(nss.db(), *dbVersion); - invariant(inserted); + + if (databaseVersion) { + auto emplaceResult = oss._databaseVersions.try_emplace(nss.db(), *databaseVersion); + auto& tracker = emplaceResult.first->second; + if (!emplaceResult.second) { + uassert(640571, + str::stream() << "Illegal attempt to change the expected database version for " + << nss.db() << " from " << tracker.v << " to " + << *databaseVersion, + tracker.v == *databaseVersion); + } + invariant(++tracker.recursion > 0); } } @@ -93,12 +102,10 @@ bool OperationShardingState::hasShardVersion(const NamespaceString& nss) const { } boost::optional<ChunkVersion> OperationShardingState::getShardVersion(const NamespaceString& nss) { - _shardVersionsChecked.insert(nss.ns()); const auto it = _shardVersions.find(nss.ns()); if (it != _shardVersions.end()) { - return it->second; + return it->second.v; } - return boost::none; } @@ -106,13 +113,12 @@ bool OperationShardingState::hasDbVersion() const { return !_databaseVersions.empty(); } -boost::optional<DatabaseVersion> OperationShardingState::getDbVersion( - const StringData dbName) const { +boost::optional<DatabaseVersion> OperationShardingState::getDbVersion(StringData dbName) const { const auto it = _databaseVersions.find(dbName); - if (it == _databaseVersions.end()) { - return boost::none; + if (it != _databaseVersions.end()) { + return it->second.v; } - return it->second; + return boost::none; } bool OperationShardingState::waitForMigrationCriticalSectionSignal(OperationContext* opCtx) { @@ -196,4 +202,37 @@ ScopedAllowImplicitCollectionCreate_UNSAFE::~ScopedAllowImplicitCollectionCreate oss._allowCollectionCreation = false; } +ScopedSetShardRole::ScopedSetShardRole(OperationContext* opCtx, + NamespaceString nss, + boost::optional<ChunkVersion> shardVersion, + boost::optional<DatabaseVersion> databaseVersion) + : _opCtx(opCtx), + _nss(std::move(nss)), + _shardVersion(std::move(shardVersion)), + _databaseVersion(std::move(databaseVersion)) { + OperationShardingState::setShardRole(_opCtx, _nss, _shardVersion, _databaseVersion); +} + +ScopedSetShardRole::~ScopedSetShardRole() { + auto& oss = OperationShardingState::get(_opCtx); + + if (_shardVersion) { + auto it = oss._shardVersions.find(_nss.ns()); + invariant(it != oss._shardVersions.end()); + auto& tracker = it->second; + invariant(--tracker.recursion >= 0); + if (tracker.recursion == 0) + oss._shardVersions.erase(it); + } + + if (_databaseVersion) { + auto it = oss._databaseVersions.find(_nss.db()); + invariant(it != oss._databaseVersions.end()); + auto& tracker = it->second; + invariant(--tracker.recursion >= 0); + if (tracker.recursion == 0) + oss._databaseVersions.erase(it); + } +} + } // namespace mongo diff --git a/src/mongo/db/s/operation_sharding_state.h b/src/mongo/db/s/operation_sharding_state.h index 2a39b630658..63c726fcc0b 100644 --- a/src/mongo/db/s/operation_sharding_state.h +++ b/src/mongo/db/s/operation_sharding_state.h @@ -41,6 +41,28 @@ namespace mongo { /** + * Marks the opCtx during scope in which it has been instantiated as running in the shard role for + * the specified collection. This indicates to the underlying storage system that the caller has + * performed 'routing', in the sense that it is aware of what data is located on this node. + */ +class ScopedSetShardRole { +public: + ScopedSetShardRole(OperationContext* opCtx, + NamespaceString nss, + boost::optional<ChunkVersion> shardVersion, + boost::optional<DatabaseVersion> databaseVersion); + ~ScopedSetShardRole(); + +private: + OperationContext* const _opCtx; + + NamespaceString _nss; + + boost::optional<ChunkVersion> _shardVersion; + boost::optional<DatabaseVersion> _databaseVersion; +}; + +/** * A decoration on OperationContext representing per-operation shard version metadata sent to mongod * from mongos as a command parameter. * @@ -86,13 +108,13 @@ public: }; /** - * Stores the given shardVersion and databaseVersion for the given namespace. Note: The shard - * version for the given namespace stored in the OperationShardingState can be overwritten if it - * has not been checked yet. + * Same semantics as ScopedSetShardRole above, but the role remains set for the lifetime of the + * opCtx. */ - void initializeClientRoutingVersions(NamespaceString nss, - const boost::optional<ChunkVersion>& shardVersion, - const boost::optional<DatabaseVersion>& dbVersion); + static void setShardRole(OperationContext* opCtx, + const NamespaceString& nss, + const boost::optional<ChunkVersion>& shardVersion, + const boost::optional<DatabaseVersion>& dbVersion); /** * Removes the databaseVersion stored for the given namespace. @@ -173,19 +195,31 @@ public: boost::optional<Status> resetShardingOperationFailedStatus(); private: + friend class ScopedSetShardRole; friend class ShardServerOpObserver; // For access to _allowCollectionCreation below // Specifies whether the request is allowed to create database/collection implicitly bool _allowCollectionCreation{false}; - // The OperationShardingState class supports storing shardVersions for multiple namespaces (and - // databaseVersions for multiple databases), even though client code has not been written yet to - // *send* multiple shardVersions or databaseVersions. - StringMap<ChunkVersion> _shardVersions; - StringMap<DatabaseVersion> _databaseVersions; - - // Stores shards that have undergone a version check. - StringSet _shardVersionsChecked; + // Stores the shard version expected for each collection that will be accessed + struct ShardVersionTracker { + ShardVersionTracker(ChunkVersion v) : v(v) {} + ShardVersionTracker(ShardVersionTracker&&) = default; + ShardVersionTracker(const ShardVersionTracker&) = delete; + ChunkVersion v; + int recursion{0}; + }; + StringMap<ShardVersionTracker> _shardVersions; + + // Stores the database version expected for each database that will be accessed + struct DatabaseVersionTracker { + DatabaseVersionTracker(DatabaseVersion v) : v(v) {} + DatabaseVersionTracker(DatabaseVersionTracker&&) = default; + DatabaseVersionTracker(const DatabaseVersionTracker&) = delete; + DatabaseVersion v; + int recursion{0}; + }; + StringMap<DatabaseVersionTracker> _databaseVersions; // This value will only be non-null if version check during the operation execution failed due // to stale version and there was a migration for that namespace, which was in critical section. diff --git a/src/mongo/db/s/operation_sharding_state_test.cpp b/src/mongo/db/s/operation_sharding_state_test.cpp new file mode 100644 index 00000000000..0c4732b51ab --- /dev/null +++ b/src/mongo/db/s/operation_sharding_state_test.cpp @@ -0,0 +1,88 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/s/operation_sharding_state.h" +#include "mongo/db/s/shard_server_test_fixture.h" + +namespace mongo { +namespace { + +const NamespaceString kNss("TestDB", "TestColl"); +const NamespaceString kAnotherNss("TestDB", "AnotherColl"); + +using OperationShardingStateTest = ShardServerTestFixture; + +TEST_F(OperationShardingStateTest, ScopedSetShardRoleDbVersion) { + DatabaseVersion dbv{DatabaseVersion::makeFixed()}; + ScopedSetShardRole scopedSetShardRole(operationContext(), kNss, boost::none, dbv); + + auto& oss = OperationShardingState::get(operationContext()); + ASSERT_EQ(dbv, *oss.getDbVersion(kNss.db())); +} + +TEST_F(OperationShardingStateTest, ScopedSetShardRoleShardVersion) { + ChunkVersion shardVersion(1, 0, OID::gen(), Timestamp(1, 0)); + ScopedSetShardRole scopedSetShardRole(operationContext(), kNss, shardVersion, boost::none); + + auto& oss = OperationShardingState::get(operationContext()); + ASSERT_EQ(shardVersion, *oss.getShardVersion(kNss)); +} + +TEST_F(OperationShardingStateTest, ScopedSetShardRoleChangeShardVersionSameNamespace) { + auto& oss = OperationShardingState::get(operationContext()); + + { + ChunkVersion shardVersion1(1, 0, OID::gen(), Timestamp(10, 0)); + ScopedSetShardRole scopedSetShardRole1( + operationContext(), kNss, shardVersion1, boost::none); + ASSERT_EQ(shardVersion1, *oss.getShardVersion(kNss)); + } + { + ChunkVersion shardVersion2(1, 0, OID::gen(), Timestamp(20, 0)); + ScopedSetShardRole scopedSetShardRole2( + operationContext(), kNss, shardVersion2, boost::none); + ASSERT_EQ(shardVersion2, *oss.getShardVersion(kNss)); + } +} + +TEST_F(OperationShardingStateTest, ScopedSetShardRoleRecursiveShardVersionDifferentNamespaces) { + ChunkVersion shardVersion1(1, 0, OID::gen(), Timestamp(10, 0)); + ChunkVersion shardVersion2(1, 0, OID::gen(), Timestamp(20, 0)); + + ScopedSetShardRole scopedSetShardRole1(operationContext(), kNss, shardVersion1, boost::none); + ScopedSetShardRole scopedSetShardRole2( + operationContext(), kAnotherNss, shardVersion2, boost::none); + + auto& oss = OperationShardingState::get(operationContext()); + ASSERT_EQ(shardVersion1, *oss.getShardVersion(kNss)); + ASSERT_EQ(shardVersion2, *oss.getShardVersion(kAnotherNss)); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/s/range_deletion_util_test.cpp b/src/mongo/db/s/range_deletion_util_test.cpp index ed8c0b5e4e6..5fe823cc79b 100644 --- a/src/mongo/db/s/range_deletion_util_test.cpp +++ b/src/mongo/db/s/range_deletion_util_test.cpp @@ -120,8 +120,8 @@ public: makeStandaloneRoutingTableHistory(std::move(rt)), boost::none); if (!OperationShardingState::isOperationVersioned(_opCtx)) { - OperationShardingState::get(_opCtx).initializeClientRoutingVersions( - kNss, cm.getVersion(ShardId("dummyShardId")), boost::none); + OperationShardingState::setShardRole( + _opCtx, kNss, cm.getVersion(ShardId("dummyShardId")), boost::none); } AutoGetDb autoDb(_opCtx, kNss.db(), MODE_IX); Lock::CollectionLock collLock(_opCtx, kNss, MODE_IX); diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp index b950fd5fd61..9132438053a 100644 --- a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp +++ b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp @@ -279,9 +279,10 @@ bool ReshardingCollectionCloner::doOneBatch(OperationContext* opCtx, Pipeline& p // the temporary resharding collection. We attach shard version IGNORED to the insert operations // and retry once on a StaleConfig exception to allow the collection metadata information to be // recovered. - auto& oss = OperationShardingState::get(opCtx); - oss.initializeClientRoutingVersions( - _outputNss, ChunkVersion::IGNORED() /* shardVersion */, boost::none /* dbVersion */); + ScopedSetShardRole scopedSetShardRole(opCtx, + _outputNss, + ChunkVersion::IGNORED() /* shardVersion */, + boost::none /* databaseVersion */); int bytesInserted = resharding::data_copy::withOneStaleConfigRetry( opCtx, [&] { return resharding::data_copy::insertBatch(opCtx, _outputNss, batch); }); diff --git a/src/mongo/db/s/resharding_collection_test.cpp b/src/mongo/db/s/resharding/resharding_collection_test.cpp index 71e3c780990..71e3c780990 100644 --- a/src/mongo/db/s/resharding_collection_test.cpp +++ b/src/mongo/db/s/resharding/resharding_collection_test.cpp diff --git a/src/mongo/db/s/resharding_destined_recipient_test.cpp b/src/mongo/db/s/resharding/resharding_destined_recipient_test.cpp index 23f0cb3ffc8..fadc390a510 100644 --- a/src/mongo/db/s/resharding_destined_recipient_test.cpp +++ b/src/mongo/db/s/resharding/resharding_destined_recipient_test.cpp @@ -295,8 +295,7 @@ TEST_F(DestinedRecipientTest, TestGetDestinedRecipient) { auto env = setupReshardingEnv(opCtx, true); AutoGetCollection coll(opCtx, kNss, MODE_IX); - OperationShardingState::get(opCtx).initializeClientRoutingVersions( - kNss, env.version, env.dbVersion); + OperationShardingState::setShardRole(opCtx, kNss, env.version, env.dbVersion); ShardingWriteRouter shardingWriteRouter(opCtx, kNss, Grid::get(opCtx)->catalogCache()); auto destShardId = @@ -311,8 +310,7 @@ TEST_F(DestinedRecipientTest, TestGetDestinedRecipientThrowsOnBlockedRefresh) { { AutoGetCollection coll(opCtx, kNss, MODE_IX); - OperationShardingState::get(opCtx).initializeClientRoutingVersions( - kNss, env.version, env.dbVersion); + OperationShardingState::setShardRole(opCtx, kNss, env.version, env.dbVersion); FailPointEnableBlock failPoint("blockCollectionCacheLookup"); ASSERT_THROWS_WITH_CHECK(ShardingWriteRouter(opCtx, kNss, Grid::get(opCtx)->catalogCache()), @@ -332,8 +330,7 @@ TEST_F(DestinedRecipientTest, TestOpObserverSetsDestinedRecipientOnInserts) { auto opCtx = operationContext(); auto env = setupReshardingEnv(opCtx, true); - OperationShardingState::get(opCtx).initializeClientRoutingVersions( - kNss, env.version, env.dbVersion); + OperationShardingState::setShardRole(opCtx, kNss, env.version, env.dbVersion); writeDoc(opCtx, kNss, BSON("_id" << 0 << "x" << 2 << "y" << 10), env); auto entry = getLastOplogEntry(opCtx); @@ -347,8 +344,7 @@ TEST_F(DestinedRecipientTest, TestOpObserverSetsDestinedRecipientOnInsertsInTran auto opCtx = operationContext(); auto env = setupReshardingEnv(opCtx, true); - OperationShardingState::get(opCtx).initializeClientRoutingVersions( - kNss, env.version, env.dbVersion); + OperationShardingState::setShardRole(opCtx, kNss, env.version, env.dbVersion); runInTransaction( opCtx, [&]() { writeDoc(opCtx, kNss, BSON("_id" << 0 << "x" << 2 << "y" << 10), env); }); @@ -375,8 +371,7 @@ TEST_F(DestinedRecipientTest, TestOpObserverSetsDestinedRecipientOnUpdates) { auto env = setupReshardingEnv(opCtx, true); - OperationShardingState::get(opCtx).initializeClientRoutingVersions( - kNss, env.version, env.dbVersion); + OperationShardingState::setShardRole(opCtx, kNss, env.version, env.dbVersion); updateDoc(opCtx, kNss, BSON("_id" << 0), BSON("$set" << BSON("z" << 50)), env); auto entry = getLastOplogEntry(opCtx); @@ -395,8 +390,7 @@ TEST_F(DestinedRecipientTest, TestOpObserverSetsDestinedRecipientOnMultiUpdates) auto env = setupReshardingEnv(opCtx, true); - OperationShardingState::get(opCtx).initializeClientRoutingVersions( - kNss, ChunkVersion::IGNORED(), env.dbVersion); + OperationShardingState::setShardRole(opCtx, kNss, ChunkVersion::IGNORED(), env.dbVersion); client.update(kNss.ns(), BSON("x" << 0), BSON("$set" << BSON("z" << 5)), @@ -418,8 +412,7 @@ TEST_F(DestinedRecipientTest, TestOpObserverSetsDestinedRecipientOnUpdatesOutOfP auto env = setupReshardingEnv(opCtx, true); - OperationShardingState::get(opCtx).initializeClientRoutingVersions( - kNss, env.version, env.dbVersion); + OperationShardingState::setShardRole(opCtx, kNss, env.version, env.dbVersion); updateDoc(opCtx, kNss, BSON("_id" << 0), BSON("$set" << BSON("z" << 50)), env); auto entry = getLastOplogEntry(opCtx); @@ -437,8 +430,7 @@ TEST_F(DestinedRecipientTest, TestOpObserverSetsDestinedRecipientOnUpdatesInTran auto env = setupReshardingEnv(opCtx, true); - OperationShardingState::get(opCtx).initializeClientRoutingVersions( - kNss, env.version, env.dbVersion); + OperationShardingState::setShardRole(opCtx, kNss, env.version, env.dbVersion); runInTransaction(opCtx, [&]() { updateDoc(opCtx, kNss, BSON("_id" << 0), BSON("$set" << BSON("z" << 50)), env); }); @@ -466,8 +458,7 @@ TEST_F(DestinedRecipientTest, TestOpObserverSetsDestinedRecipientOnDeletes) { auto env = setupReshardingEnv(opCtx, true); - OperationShardingState::get(opCtx).initializeClientRoutingVersions( - kNss, env.version, env.dbVersion); + OperationShardingState::setShardRole(opCtx, kNss, env.version, env.dbVersion); deleteDoc(opCtx, kNss, BSON("_id" << 0), env); auto entry = getLastOplogEntry(opCtx); @@ -485,8 +476,7 @@ TEST_F(DestinedRecipientTest, TestOpObserverSetsDestinedRecipientOnDeletesInTran auto env = setupReshardingEnv(opCtx, true); - OperationShardingState::get(opCtx).initializeClientRoutingVersions( - kNss, env.version, env.dbVersion); + OperationShardingState::setShardRole(opCtx, kNss, env.version, env.dbVersion); runInTransaction(opCtx, [&]() { deleteDoc(opCtx, kNss, BSON("_id" << 0), env); }); // Look for destined recipient in latest oplog entry. Since this write was done in a @@ -512,8 +502,7 @@ TEST_F(DestinedRecipientTest, TestUpdateChangesOwningShardThrows) { auto env = setupReshardingEnv(opCtx, true); - OperationShardingState::get(opCtx).initializeClientRoutingVersions( - kNss, env.version, env.dbVersion); + OperationShardingState::setShardRole(opCtx, kNss, env.version, env.dbVersion); ASSERT_THROWS(runInTransaction(opCtx, [&]() { updateDoc(opCtx, @@ -533,8 +522,7 @@ TEST_F(DestinedRecipientTest, TestUpdateSameOwningShard) { auto env = setupReshardingEnv(opCtx, true); - OperationShardingState::get(opCtx).initializeClientRoutingVersions( - kNss, env.version, env.dbVersion); + OperationShardingState::setShardRole(opCtx, kNss, env.version, env.dbVersion); runInTransaction(opCtx, [&]() { updateDoc(opCtx, kNss, BSON("_id" << 0 << "x" << 2), BSON("$set" << BSON("y" << 3)), env); }); diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp index ce285e48b9a..f3c2abb8ac7 100644 --- a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp @@ -27,10 +27,6 @@ * it in the license file. */ -#include "mongo/platform/basic.h" - -#include "mongo/db/s/resharding/resharding_donor_recipient_common_test.h" - #include "mongo/db/catalog/drop_database.h" #include "mongo/db/catalog_raii.h" #include "mongo/db/db_raii.h" @@ -50,6 +46,291 @@ namespace { using namespace fmt::literals; +/** + * This test fixture does not create any resharding POSs and should be preferred to + * `ReshardingDonorRecipientCommonTest` when they are not required. + */ +class ReshardingDonorRecipientCommonInternalsTest : public ShardServerTestFixture { +public: + const UUID kExistingUUID = UUID::gen(); + const Timestamp kExistingTimestamp = Timestamp(10, 5); + const NamespaceString kOriginalNss = NamespaceString("db", "foo"); + + const NamespaceString kTemporaryReshardingNss = + constructTemporaryReshardingNss("db", kExistingUUID); + const std::string kOriginalShardKey = "oldKey"; + const BSONObj kOriginalShardKeyPattern = BSON(kOriginalShardKey << 1); + const std::string kReshardingKey = "newKey"; + const BSONObj kReshardingKeyPattern = BSON(kReshardingKey << 1); + const OID kOriginalEpoch = OID::gen(); + const OID kReshardingEpoch = OID::gen(); + const UUID kReshardingUUID = UUID::gen(); + const Timestamp kReshardingTimestamp = Timestamp(kExistingTimestamp.getSecs() + 1, 0); + + const DonorShardFetchTimestamp kThisShard = + makeDonorShardFetchTimestamp(ShardId("shardOne"), Timestamp(10, 0)); + const DonorShardFetchTimestamp kOtherShard = + makeDonorShardFetchTimestamp(ShardId("shardTwo"), Timestamp(20, 0)); + + const std::vector<DonorShardFetchTimestamp> kShards = {kThisShard, kOtherShard}; + + const Timestamp kCloneTimestamp = Timestamp(20, 0); + +protected: + CollectionMetadata makeShardedMetadataForOriginalCollection( + OperationContext* opCtx, const ShardId& shardThatChunkExistsOn) { + return makeShardedMetadata(opCtx, + kOriginalNss, + kOriginalShardKey, + kOriginalShardKeyPattern, + kExistingUUID, + kExistingTimestamp, + kOriginalEpoch, + shardThatChunkExistsOn); + } + + CollectionMetadata makeShardedMetadataForTemporaryReshardingCollection( + OperationContext* opCtx, const ShardId& shardThatChunkExistsOn) { + return makeShardedMetadata(opCtx, + kTemporaryReshardingNss, + kReshardingKey, + kReshardingKeyPattern, + kReshardingUUID, + kReshardingTimestamp, + kReshardingEpoch, + shardThatChunkExistsOn); + } + + CollectionMetadata makeShardedMetadata(OperationContext* opCtx, + const NamespaceString& nss, + const std::string& shardKey, + const BSONObj& shardKeyPattern, + const UUID& uuid, + const Timestamp& timestamp, + const OID& epoch, + const ShardId& shardThatChunkExistsOn) { + auto range = ChunkRange(BSON(shardKey << MINKEY), BSON(shardKey << MAXKEY)); + auto chunk = ChunkType( + uuid, std::move(range), ChunkVersion(1, 0, epoch, timestamp), shardThatChunkExistsOn); + ChunkManager cm(kThisShard.getShardId(), + DatabaseVersion(uuid, timestamp), + makeStandaloneRoutingTableHistory( + RoutingTableHistory::makeNew(nss, + uuid, + shardKeyPattern, + nullptr, + false, + epoch, + timestamp, + boost::none /* timeseriesFields */, + boost::none, + boost::none /* chunkSizeBytes */, + true, + {std::move(chunk)})), + boost::none); + + if (!OperationShardingState::isOperationVersioned(opCtx)) { + OperationShardingState::setShardRole( + opCtx, nss, cm.getVersion(kThisShard.getShardId()), boost::none); + } + + return CollectionMetadata(std::move(cm), kThisShard.getShardId()); + } + + ReshardingDonorDocument makeDonorStateDoc() { + DonorShardContext donorCtx; + donorCtx.setState(DonorStateEnum::kPreparingToDonate); + + ReshardingDonorDocument doc(std::move(donorCtx), + {kThisShard.getShardId(), kOtherShard.getShardId()}); + + NamespaceString sourceNss = kOriginalNss; + auto sourceUUID = UUID::gen(); + auto commonMetadata = CommonReshardingMetadata( + UUID::gen(), sourceNss, sourceUUID, kTemporaryReshardingNss, kReshardingKeyPattern); + + doc.setCommonReshardingMetadata(std::move(commonMetadata)); + return doc; + } + + ReshardingRecipientDocument makeRecipientStateDoc() { + RecipientShardContext recipCtx; + recipCtx.setState(RecipientStateEnum::kCloning); + + ReshardingRecipientDocument doc( + std::move(recipCtx), {kThisShard.getShardId(), kOtherShard.getShardId()}, 1000); + + NamespaceString sourceNss = kOriginalNss; + auto sourceUUID = UUID::gen(); + auto commonMetadata = CommonReshardingMetadata( + UUID::gen(), sourceNss, sourceUUID, kTemporaryReshardingNss, kReshardingKeyPattern); + + doc.setCommonReshardingMetadata(std::move(commonMetadata)); + + // A document in the cloning state requires a clone timestamp. + doc.setCloneTimestamp(kCloneTimestamp); + return doc; + } + + ReshardingFields createCommonReshardingFields(const UUID& reshardingUUID, + CoordinatorStateEnum state) { + auto fields = ReshardingFields(reshardingUUID); + fields.setState(state); + return fields; + }; + + void appendDonorFieldsToReshardingFields(ReshardingFields& fields, + const BSONObj& reshardingKey) { + std::vector<ShardId> donorShardIds; + for (const auto& shard : kShards) { + donorShardIds.emplace_back(shard.getShardId()); + } + + fields.setDonorFields( + TypeCollectionDonorFields(kTemporaryReshardingNss, reshardingKey, donorShardIds)); + } + + void appendRecipientFieldsToReshardingFields( + ReshardingFields& fields, + const std::vector<DonorShardFetchTimestamp> donorShards, + const UUID& existingUUID, + const NamespaceString& originalNss, + const boost::optional<Timestamp>& cloneTimestamp = boost::none) { + auto recipientFields = + TypeCollectionRecipientFields(donorShards, existingUUID, originalNss, 5000); + emplaceCloneTimestampIfExists(recipientFields, cloneTimestamp); + fields.setRecipientFields(std::move(recipientFields)); + } + + template <class ReshardingDocument> + void assertCommonDocFieldsMatchReshardingFields(const NamespaceString& nss, + const UUID& reshardingUUID, + const UUID& existingUUID, + const BSONObj& reshardingKey, + const ReshardingDocument& reshardingDoc) { + ASSERT_EQ(reshardingDoc.getReshardingUUID(), reshardingUUID); + ASSERT_EQ(reshardingDoc.getSourceNss(), nss); + ASSERT_EQ(reshardingDoc.getSourceUUID(), existingUUID); + ASSERT_BSONOBJ_EQ(reshardingDoc.getReshardingKey().toBSON(), reshardingKey); + } + + void assertDonorDocMatchesReshardingFields(const NamespaceString& nss, + const UUID& existingUUID, + const ReshardingFields& reshardingFields, + const ReshardingDonorDocument& donorDoc) { + assertCommonDocFieldsMatchReshardingFields<ReshardingDonorDocument>( + nss, + reshardingFields.getReshardingUUID(), + existingUUID, + reshardingFields.getDonorFields()->getReshardingKey().toBSON(), + donorDoc); + ASSERT(donorDoc.getMutableState().getState() == DonorStateEnum::kPreparingToDonate); + ASSERT(donorDoc.getMutableState().getMinFetchTimestamp() == boost::none); + } + + void assertRecipientDocMatchesReshardingFields( + const CollectionMetadata& metadata, + const ReshardingFields& reshardingFields, + const ReshardingRecipientDocument& recipientDoc) { + assertCommonDocFieldsMatchReshardingFields<ReshardingRecipientDocument>( + reshardingFields.getRecipientFields()->getSourceNss(), + reshardingFields.getReshardingUUID(), + reshardingFields.getRecipientFields()->getSourceUUID(), + metadata.getShardKeyPattern().toBSON(), + recipientDoc); + + ASSERT(recipientDoc.getMutableState().getState() == + RecipientStateEnum::kAwaitingFetchTimestamp); + ASSERT(!recipientDoc.getCloneTimestamp()); + + const auto donorShards = reshardingFields.getRecipientFields()->getDonorShards(); + std::map<ShardId, DonorShardFetchTimestamp> donorShardMap; + for (const auto& donor : donorShards) { + donorShardMap.emplace(donor.getShardId(), donor); + } + + for (const auto& donorShardFromRecipientDoc : recipientDoc.getDonorShards()) { + auto donorIter = donorShardMap.find(donorShardFromRecipientDoc.getShardId()); + ASSERT(donorIter != donorShardMap.end()); + ASSERT_EQ(donorIter->second.getMinFetchTimestamp().has_value(), + donorShardFromRecipientDoc.getMinFetchTimestamp().has_value()); + + if (donorIter->second.getMinFetchTimestamp()) { + ASSERT_EQ(*donorIter->second.getMinFetchTimestamp(), + *donorShardFromRecipientDoc.getMinFetchTimestamp()); + } + + donorShardMap.erase(donorShardFromRecipientDoc.getShardId()); + } + + ASSERT(donorShardMap.empty()); + } + +private: + DonorShardFetchTimestamp makeDonorShardFetchTimestamp( + ShardId shardId, boost::optional<Timestamp> fetchTimestamp) { + DonorShardFetchTimestamp donorFetchTimestamp(shardId); + donorFetchTimestamp.setMinFetchTimestamp(fetchTimestamp); + return donorFetchTimestamp; + } +}; + +/** + * This fixture starts with the above internals test and also creates (notably) the resharding donor + * and recipient POSs. + */ +class ReshardingDonorRecipientCommonTest : public ReshardingDonorRecipientCommonInternalsTest { +public: + void setUp() override { + ShardServerTestFixture::setUp(); + + WaitForMajorityService::get(getServiceContext()).startup(getServiceContext()); + + _primaryOnlyServiceRegistry = repl::PrimaryOnlyServiceRegistry::get(getServiceContext()); + + std::unique_ptr<ReshardingDonorService> donorService = + std::make_unique<ReshardingDonorService>(getServiceContext()); + _primaryOnlyServiceRegistry->registerService(std::move(donorService)); + + std::unique_ptr<ReshardingRecipientService> recipientService = + std::make_unique<ReshardingRecipientService>(getServiceContext()); + _primaryOnlyServiceRegistry->registerService(std::move(recipientService)); + _primaryOnlyServiceRegistry->onStartup(operationContext()); + + stepUp(); + } + + void tearDown() override { + WaitForMajorityService::get(getServiceContext()).shutDown(); + + Grid::get(operationContext())->getExecutorPool()->shutdownAndJoin(); + + _primaryOnlyServiceRegistry->onShutdown(); + + Grid::get(operationContext())->clearForUnitTests(); + + ShardServerTestFixture::tearDown(); + } + + void stepUp() { + auto replCoord = repl::ReplicationCoordinator::get(getServiceContext()); + + // Advance term + _term++; + + ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_PRIMARY)); + ASSERT_OK(replCoord->updateTerm(operationContext(), _term)); + replCoord->setMyLastAppliedOpTimeAndWallTime( + repl::OpTimeAndWallTime(repl::OpTime(Timestamp(1, 1), _term), Date_t())); + + _primaryOnlyServiceRegistry->onStepUpComplete(operationContext(), _term); + } + +protected: + repl::PrimaryOnlyServiceRegistry* _primaryOnlyServiceRegistry; + long long _term = 0; +}; + TEST_F(ReshardingDonorRecipientCommonInternalsTest, ConstructDonorDocumentFromReshardingFields) { OperationContext* opCtx = operationContext(); auto metadata = makeShardedMetadataForOriginalCollection(opCtx, kThisShard.getShardId()); diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h deleted file mode 100644 index ff5d2ffa67b..00000000000 --- a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h +++ /dev/null @@ -1,334 +0,0 @@ -/** - * Copyright (C) 2020-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include "mongo/db/catalog_raii.h" -#include "mongo/db/db_raii.h" -#include "mongo/db/repl/wait_for_majority_service.h" -#include "mongo/db/s/collection_sharding_runtime.h" -#include "mongo/db/s/operation_sharding_state.h" -#include "mongo/db/s/resharding/resharding_donor_recipient_common.h" -#include "mongo/db/s/shard_server_test_fixture.h" -#include "mongo/unittest/death_test.h" -#include "mongo/util/fail_point.h" - -namespace mongo { -namespace { - -using namespace fmt::literals; - -/** - * This test fixture does not create any resharding POSs and should be preferred to - * `ReshardingDonorRecipientCommonTest` when they are not required. - */ -class ReshardingDonorRecipientCommonInternalsTest : public ShardServerTestFixture { -public: - const UUID kExistingUUID = UUID::gen(); - const Timestamp kExistingTimestamp = Timestamp(10, 5); - const NamespaceString kOriginalNss = NamespaceString("db", "foo"); - - const NamespaceString kTemporaryReshardingNss = - constructTemporaryReshardingNss("db", kExistingUUID); - const std::string kOriginalShardKey = "oldKey"; - const BSONObj kOriginalShardKeyPattern = BSON(kOriginalShardKey << 1); - const std::string kReshardingKey = "newKey"; - const BSONObj kReshardingKeyPattern = BSON(kReshardingKey << 1); - const OID kOriginalEpoch = OID::gen(); - const OID kReshardingEpoch = OID::gen(); - const UUID kReshardingUUID = UUID::gen(); - const Timestamp kReshardingTimestamp = Timestamp(kExistingTimestamp.getSecs() + 1, 0); - - const DonorShardFetchTimestamp kThisShard = - makeDonorShardFetchTimestamp(ShardId("shardOne"), Timestamp(10, 0)); - const DonorShardFetchTimestamp kOtherShard = - makeDonorShardFetchTimestamp(ShardId("shardTwo"), Timestamp(20, 0)); - - const std::vector<DonorShardFetchTimestamp> kShards = {kThisShard, kOtherShard}; - - const Timestamp kCloneTimestamp = Timestamp(20, 0); - -protected: - CollectionMetadata makeShardedMetadataForOriginalCollection( - OperationContext* opCtx, const ShardId& shardThatChunkExistsOn) { - return makeShardedMetadata(opCtx, - kOriginalNss, - kOriginalShardKey, - kOriginalShardKeyPattern, - kExistingUUID, - kExistingTimestamp, - kOriginalEpoch, - shardThatChunkExistsOn); - } - - CollectionMetadata makeShardedMetadataForTemporaryReshardingCollection( - OperationContext* opCtx, const ShardId& shardThatChunkExistsOn) { - return makeShardedMetadata(opCtx, - kTemporaryReshardingNss, - kReshardingKey, - kReshardingKeyPattern, - kReshardingUUID, - kReshardingTimestamp, - kReshardingEpoch, - shardThatChunkExistsOn); - } - - CollectionMetadata makeShardedMetadata(OperationContext* opCtx, - const NamespaceString& nss, - const std::string& shardKey, - const BSONObj& shardKeyPattern, - const UUID& uuid, - const Timestamp& timestamp, - const OID& epoch, - const ShardId& shardThatChunkExistsOn) { - auto range = ChunkRange(BSON(shardKey << MINKEY), BSON(shardKey << MAXKEY)); - auto chunk = ChunkType( - uuid, std::move(range), ChunkVersion(1, 0, epoch, timestamp), shardThatChunkExistsOn); - ChunkManager cm(kThisShard.getShardId(), - DatabaseVersion(uuid, timestamp), - makeStandaloneRoutingTableHistory( - RoutingTableHistory::makeNew(nss, - uuid, - shardKeyPattern, - nullptr, - false, - epoch, - timestamp, - boost::none /* timeseriesFields */, - boost::none, - boost::none /* chunkSizeBytes */, - true, - {std::move(chunk)})), - boost::none); - - if (!OperationShardingState::isOperationVersioned(opCtx)) { - OperationShardingState::get(opCtx).initializeClientRoutingVersions( - nss, cm.getVersion(kThisShard.getShardId()), boost::none); - } - - return CollectionMetadata(std::move(cm), kThisShard.getShardId()); - } - - ReshardingDonorDocument makeDonorStateDoc() { - DonorShardContext donorCtx; - donorCtx.setState(DonorStateEnum::kPreparingToDonate); - - ReshardingDonorDocument doc(std::move(donorCtx), - {kThisShard.getShardId(), kOtherShard.getShardId()}); - - NamespaceString sourceNss = kOriginalNss; - auto sourceUUID = UUID::gen(); - auto commonMetadata = CommonReshardingMetadata( - UUID::gen(), sourceNss, sourceUUID, kTemporaryReshardingNss, kReshardingKeyPattern); - - doc.setCommonReshardingMetadata(std::move(commonMetadata)); - return doc; - } - - ReshardingRecipientDocument makeRecipientStateDoc() { - RecipientShardContext recipCtx; - recipCtx.setState(RecipientStateEnum::kCloning); - - ReshardingRecipientDocument doc( - std::move(recipCtx), {kThisShard.getShardId(), kOtherShard.getShardId()}, 1000); - - NamespaceString sourceNss = kOriginalNss; - auto sourceUUID = UUID::gen(); - auto commonMetadata = CommonReshardingMetadata( - UUID::gen(), sourceNss, sourceUUID, kTemporaryReshardingNss, kReshardingKeyPattern); - - doc.setCommonReshardingMetadata(std::move(commonMetadata)); - - // A document in the cloning state requires a clone timestamp. - doc.setCloneTimestamp(kCloneTimestamp); - return doc; - } - - ReshardingFields createCommonReshardingFields(const UUID& reshardingUUID, - CoordinatorStateEnum state) { - auto fields = ReshardingFields(reshardingUUID); - fields.setState(state); - return fields; - }; - - void appendDonorFieldsToReshardingFields(ReshardingFields& fields, - const BSONObj& reshardingKey) { - std::vector<ShardId> donorShardIds; - for (const auto& shard : kShards) { - donorShardIds.emplace_back(shard.getShardId()); - } - - fields.setDonorFields( - TypeCollectionDonorFields(kTemporaryReshardingNss, reshardingKey, donorShardIds)); - } - - void appendRecipientFieldsToReshardingFields( - ReshardingFields& fields, - const std::vector<DonorShardFetchTimestamp> donorShards, - const UUID& existingUUID, - const NamespaceString& originalNss, - const boost::optional<Timestamp>& cloneTimestamp = boost::none) { - auto recipientFields = - TypeCollectionRecipientFields(donorShards, existingUUID, originalNss, 5000); - emplaceCloneTimestampIfExists(recipientFields, cloneTimestamp); - fields.setRecipientFields(std::move(recipientFields)); - } - - template <class ReshardingDocument> - void assertCommonDocFieldsMatchReshardingFields(const NamespaceString& nss, - const UUID& reshardingUUID, - const UUID& existingUUID, - const BSONObj& reshardingKey, - const ReshardingDocument& reshardingDoc) { - ASSERT_EQ(reshardingDoc.getReshardingUUID(), reshardingUUID); - ASSERT_EQ(reshardingDoc.getSourceNss(), nss); - ASSERT_EQ(reshardingDoc.getSourceUUID(), existingUUID); - ASSERT_BSONOBJ_EQ(reshardingDoc.getReshardingKey().toBSON(), reshardingKey); - } - - void assertDonorDocMatchesReshardingFields(const NamespaceString& nss, - const UUID& existingUUID, - const ReshardingFields& reshardingFields, - const ReshardingDonorDocument& donorDoc) { - assertCommonDocFieldsMatchReshardingFields<ReshardingDonorDocument>( - nss, - reshardingFields.getReshardingUUID(), - existingUUID, - reshardingFields.getDonorFields()->getReshardingKey().toBSON(), - donorDoc); - ASSERT(donorDoc.getMutableState().getState() == DonorStateEnum::kPreparingToDonate); - ASSERT(donorDoc.getMutableState().getMinFetchTimestamp() == boost::none); - } - - void assertRecipientDocMatchesReshardingFields( - const CollectionMetadata& metadata, - const ReshardingFields& reshardingFields, - const ReshardingRecipientDocument& recipientDoc) { - assertCommonDocFieldsMatchReshardingFields<ReshardingRecipientDocument>( - reshardingFields.getRecipientFields()->getSourceNss(), - reshardingFields.getReshardingUUID(), - reshardingFields.getRecipientFields()->getSourceUUID(), - metadata.getShardKeyPattern().toBSON(), - recipientDoc); - - ASSERT(recipientDoc.getMutableState().getState() == - RecipientStateEnum::kAwaitingFetchTimestamp); - ASSERT(!recipientDoc.getCloneTimestamp()); - - const auto donorShards = reshardingFields.getRecipientFields()->getDonorShards(); - std::map<ShardId, DonorShardFetchTimestamp> donorShardMap; - for (const auto& donor : donorShards) { - donorShardMap.emplace(donor.getShardId(), donor); - } - - for (const auto& donorShardFromRecipientDoc : recipientDoc.getDonorShards()) { - auto donorIter = donorShardMap.find(donorShardFromRecipientDoc.getShardId()); - ASSERT(donorIter != donorShardMap.end()); - ASSERT_EQ(donorIter->second.getMinFetchTimestamp().has_value(), - donorShardFromRecipientDoc.getMinFetchTimestamp().has_value()); - - if (donorIter->second.getMinFetchTimestamp()) { - ASSERT_EQ(*donorIter->second.getMinFetchTimestamp(), - *donorShardFromRecipientDoc.getMinFetchTimestamp()); - } - - donorShardMap.erase(donorShardFromRecipientDoc.getShardId()); - } - - ASSERT(donorShardMap.empty()); - } - -private: - DonorShardFetchTimestamp makeDonorShardFetchTimestamp( - ShardId shardId, boost::optional<Timestamp> fetchTimestamp) { - DonorShardFetchTimestamp donorFetchTimestamp(shardId); - donorFetchTimestamp.setMinFetchTimestamp(fetchTimestamp); - return donorFetchTimestamp; - } -}; - -/** - * This fixture starts with the above internals test and also creates (notably) the resharding donor - * and recipient POSs. - */ -class ReshardingDonorRecipientCommonTest : public ReshardingDonorRecipientCommonInternalsTest { -public: - void setUp() override { - ShardServerTestFixture::setUp(); - - WaitForMajorityService::get(getServiceContext()).startup(getServiceContext()); - - _primaryOnlyServiceRegistry = repl::PrimaryOnlyServiceRegistry::get(getServiceContext()); - - std::unique_ptr<ReshardingDonorService> donorService = - std::make_unique<ReshardingDonorService>(getServiceContext()); - _primaryOnlyServiceRegistry->registerService(std::move(donorService)); - - std::unique_ptr<ReshardingRecipientService> recipientService = - std::make_unique<ReshardingRecipientService>(getServiceContext()); - _primaryOnlyServiceRegistry->registerService(std::move(recipientService)); - _primaryOnlyServiceRegistry->onStartup(operationContext()); - - stepUp(); - } - - void tearDown() override { - WaitForMajorityService::get(getServiceContext()).shutDown(); - - Grid::get(operationContext())->getExecutorPool()->shutdownAndJoin(); - - _primaryOnlyServiceRegistry->onShutdown(); - - Grid::get(operationContext())->clearForUnitTests(); - - ShardServerTestFixture::tearDown(); - } - - void stepUp() { - auto replCoord = repl::ReplicationCoordinator::get(getServiceContext()); - - // Advance term - _term++; - - ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_PRIMARY)); - ASSERT_OK(replCoord->updateTerm(operationContext(), _term)); - replCoord->setMyLastAppliedOpTimeAndWallTime( - repl::OpTimeAndWallTime(repl::OpTime(Timestamp(1, 1), _term), Date_t())); - - _primaryOnlyServiceRegistry->onStepUpComplete(operationContext(), _term); - } - -protected: - repl::PrimaryOnlyServiceRegistry* _primaryOnlyServiceRegistry; - long long _term = 0; -}; - -} // namespace - -} // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_oplog_application.cpp b/src/mongo/db/s/resharding/resharding_oplog_application.cpp index 7e2aacc030e..0842f9aa88c 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_application.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_application.cpp @@ -81,8 +81,7 @@ void runWithTransaction(OperationContext* opCtx, // the temporary resharding collection. We attach shard version IGNORED to the write operations // and leave it to ReshardingOplogBatchApplier::applyBatch() to retry on a StaleConfig exception // to allow the collection metadata information to be recovered. - auto& oss = OperationShardingState::get(asr.opCtx()); - oss.initializeClientRoutingVersions(nss, ChunkVersion::IGNORED(), boost::none); + ScopedSetShardRole scopedSetShardRole(asr.opCtx(), nss, ChunkVersion::IGNORED(), boost::none); MongoDOperationContextSession ocs(asr.opCtx()); auto txnParticipant = TransactionParticipant::get(asr.opCtx()); diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_applier.cpp b/src/mongo/db/s/resharding/resharding_oplog_batch_applier.cpp index ee732a68207..e675dca01a8 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_batch_applier.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_batch_applier.cpp @@ -85,11 +85,11 @@ SemiFuture<void> ReshardingOplogBatchApplier::applyBatch( // attach shard version IGNORED to the write operations and retry once // on a StaleConfig exception to allow the collection metadata // information to be recovered. - auto& oss = OperationShardingState::get(opCtx.get()); - oss.initializeClientRoutingVersions( + ScopedSetShardRole scopedSetShardRole( + opCtx.get(), _crudApplication.getOutputNss(), ChunkVersion::IGNORED() /* shardVersion */, - boost::none /* dbVersion */); + boost::none /* databaseVersion */); resharding::data_copy::withOneStaleConfigRetry(opCtx.get(), [&] { uassertStatusOK( diff --git a/src/mongo/db/s/sharding_ddl_coordinator.cpp b/src/mongo/db/s/sharding_ddl_coordinator.cpp index ffc0e4c7741..3c05fbc25c0 100644 --- a/src/mongo/db/s/sharding_ddl_coordinator.cpp +++ b/src/mongo/db/s/sharding_ddl_coordinator.cpp @@ -196,8 +196,12 @@ SemiFuture<void> ShardingDDLCoordinator::run(std::shared_ptr<executor::ScopedTas auto* opCtx = opCtxHolder.get(); invariant(metadata().getDatabaseVersion()); - OperationShardingState::get(opCtx).initializeClientRoutingVersions( - nss(), boost::none /* ChunkVersion */, metadata().getDatabaseVersion()); + ScopedSetShardRole scopedSetShardRole( + opCtx, + nss(), + boost::none /* shardVersion */, + metadata().getDatabaseVersion() /* databaseVersion */); + // Check under the dbLock if this is still the primary shard for the database DatabaseShardingState::checkIsPrimaryShardForDb(opCtx, nss().db()); }; diff --git a/src/mongo/db/s/sharding_write_router_bm.cpp b/src/mongo/db/s/sharding_write_router_bm.cpp index d7da519f6d5..1118117e90d 100644 --- a/src/mongo/db/s/sharding_write_router_bm.cpp +++ b/src/mongo/db/s/sharding_write_router_bm.cpp @@ -147,8 +147,8 @@ std::unique_ptr<CatalogCacheMock> createCatalogCacheMock(OperationContext* opCtx opCtx->getServiceContext(), std::make_unique<CollectionShardingStateFactoryShard>(opCtx->getServiceContext())); - OperationShardingState::get(opCtx).initializeClientRoutingVersions( - kNss, chunkManager.getVersion(originatorShard), boost::none); + OperationShardingState::setShardRole( + opCtx, kNss, chunkManager.getVersion(originatorShard), boost::none); // Configuring the filtering metadata such that calls to getCollectionDescription return what we // want. Specifically the reshardingFields are what we use. Its specified by the chunkManager. diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index 84d99a67405..2cfb3f2286e 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -1575,8 +1575,8 @@ void ExecCommandDatabase::_initiateCommand() { databaseVersion = DatabaseVersion(databaseVersionElem.Obj()); } - OperationShardingState::get(opCtx).initializeClientRoutingVersions( - namespaceForSharding, shardVersion, databaseVersion); + OperationShardingState::setShardRole( + opCtx, namespaceForSharding, shardVersion, databaseVersion); } _scoped = _execContext->behaviors->scopedOperationCompletionShardingActions(opCtx); diff --git a/src/mongo/s/chunk_version.h b/src/mongo/s/chunk_version.h index f176e4f0cea..5aca379983e 100644 --- a/src/mongo/s/chunk_version.h +++ b/src/mongo/s/chunk_version.h @@ -269,8 +269,11 @@ private: }; inline std::ostream& operator<<(std::ostream& s, const ChunkVersion& v) { - s << v.toString(); - return s; + return s << v.toString(); +} + +inline StringBuilder& operator<<(StringBuilder& s, const ChunkVersion& v) { + return s << v.toString(); } } // namespace mongo diff --git a/src/mongo/s/database_version.cpp b/src/mongo/s/database_version.cpp index 37ca371e001..df160cd1eda 100644 --- a/src/mongo/s/database_version.cpp +++ b/src/mongo/s/database_version.cpp @@ -52,4 +52,8 @@ bool DatabaseVersion::operator<(const DatabaseVersion& other) const { } } +std::string DatabaseVersion::toString() const { + return BSON("dbVersion" << toBSON()).toString(); +} + } // namespace mongo diff --git a/src/mongo/s/database_version.h b/src/mongo/s/database_version.h index a798f76aa28..5de01497318 100644 --- a/src/mongo/s/database_version.h +++ b/src/mongo/s/database_version.h @@ -107,6 +107,16 @@ public: UUID getUuid() const { return *DatabaseVersionBase::getUuid(); } + + std::string toString() const; }; +inline std::ostream& operator<<(std::ostream& s, const DatabaseVersion& v) { + return s << v.toString(); +} + +inline StringBuilder& operator<<(StringBuilder& s, const DatabaseVersion& v) { + return s << v.toString(); +} + } // namespace mongo |