author      Kaloian Manassiev <kaloian.manassiev@mongodb.com>   2022-03-09 07:57:19 +0000
committer   Evergreen Agent <no-reply@evergreen.mongodb.com>    2022-03-09 08:45:38 +0000
commit      fb608f9bc8fe237694152b42f568ec97a12b4081 (patch)
tree        f68362d2a0104cf40e1236a206cac27fc9da4164 /src
parent      ff90764302229844a8431c68dcaf3f40e4db9a19 (diff)
download    mongo-fb608f9bc8fe237694152b42f568ec97a12b4081.tar.gz
SERVER-64057 Implement a scoped object for setting the expected shard/database versions
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/commands/run_aggregate.cpp | 6
-rw-r--r--  src/mongo/db/index_builds_coordinator_mongod.cpp | 11
-rw-r--r--  src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp | 6
-rw-r--r--  src/mongo/db/s/SConscript | 9
-rw-r--r--  src/mongo/db/s/collection_metadata_filtering_test.cpp | 4
-rw-r--r--  src/mongo/db/s/collection_sharding_runtime_test.cpp | 6
-rw-r--r--  src/mongo/db/s/op_observer_sharding_test.cpp | 3
-rw-r--r--  src/mongo/db/s/operation_sharding_state.cpp | 93
-rw-r--r--  src/mongo/db/s/operation_sharding_state.h | 62
-rw-r--r--  src/mongo/db/s/operation_sharding_state_test.cpp | 88
-rw-r--r--  src/mongo/db/s/range_deletion_util_test.cpp | 4
-rw-r--r--  src/mongo/db/s/resharding/resharding_collection_cloner.cpp | 7
-rw-r--r--  src/mongo/db/s/resharding/resharding_collection_test.cpp (renamed from src/mongo/db/s/resharding_collection_test.cpp) | 0
-rw-r--r--  src/mongo/db/s/resharding/resharding_destined_recipient_test.cpp (renamed from src/mongo/db/s/resharding_destined_recipient_test.cpp) | 36
-rw-r--r--  src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp | 289
-rw-r--r--  src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h | 334
-rw-r--r--  src/mongo/db/s/resharding/resharding_oplog_application.cpp | 3
-rw-r--r--  src/mongo/db/s/resharding/resharding_oplog_batch_applier.cpp | 6
-rw-r--r--  src/mongo/db/s/sharding_ddl_coordinator.cpp | 8
-rw-r--r--  src/mongo/db/s/sharding_write_router_bm.cpp | 4
-rw-r--r--  src/mongo/db/service_entry_point_common.cpp | 4
-rw-r--r--  src/mongo/s/chunk_version.h | 7
-rw-r--r--  src/mongo/s/database_version.cpp | 4
-rw-r--r--  src/mongo/s/database_version.h | 10
24 files changed, 558 insertions, 446 deletions
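
The bulk of the diff below is a mechanical migration of call sites: direct calls to OperationShardingState::initializeClientRoutingVersions() are replaced either with the new RAII type ScopedSetShardRole, which removes the expected versions when it goes out of scope, or with the static OperationShardingState::setShardRole(), which keeps them for the remaining lifetime of the OperationContext. A minimal usage sketch of the scoped form follows; the surrounding helper function is hypothetical and only types and calls introduced or kept by this commit are used.

    #include "mongo/db/s/operation_sharding_state.h"
    #include "mongo/s/chunk_version.h"
    #include "mongo/util/assert_util.h"

    namespace mongo {

    // Hypothetical helper, for illustration only.
    void runShardVersionedWork(OperationContext* opCtx,
                               const NamespaceString& nss,
                               const ChunkVersion& expectedShardVersion) {
        // Old pattern (removed by this commit): the version stayed attached to the
        // OperationShardingState until the OperationContext was destroyed.
        //
        //   OperationShardingState::get(opCtx).initializeClientRoutingVersions(
        //       nss, expectedShardVersion, boost::none);

        // New pattern: the expected version is scoped to this block.
        ScopedSetShardRole scopedSetShardRole(opCtx,
                                              nss,
                                              expectedShardVersion /* shardVersion */,
                                              boost::none /* databaseVersion */);

        // While the scoped object is alive, shard-versioned access to 'nss' is checked
        // against 'expectedShardVersion'.
        invariant(OperationShardingState::get(opCtx).getShardVersion(nss) ==
                  expectedShardVersion);
    }  // The destructor runs here and removes the expected version for 'nss'.

    }  // namespace mongo
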
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index b59bb1ae4e6..75368eb49bb 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -841,8 +841,10 @@ Status runAggregate(OperationContext* opCtx,
// Set this operation's shard version for the underlying collection to unsharded.
// This is prerequisite for future shard versioning checks.
- OperationShardingState::get(opCtx).initializeClientRoutingVersions(
- resolvedView.getNamespace(), ChunkVersion::UNSHARDED(), boost::none);
+ ScopedSetShardRole scopedSetShardRole(opCtx,
+ resolvedView.getNamespace(),
+ ChunkVersion::UNSHARDED() /* shardVersion */,
+ boost::none /* databaseVersion */);
bool collectionIsSharded = [opCtx, &resolvedView]() {
AutoGetCollection autoColl(opCtx,
diff --git a/src/mongo/db/index_builds_coordinator_mongod.cpp b/src/mongo/db/index_builds_coordinator_mongod.cpp
index be01c71eb53..4232817b409 100644
--- a/src/mongo/db/index_builds_coordinator_mongod.cpp
+++ b/src/mongo/db/index_builds_coordinator_mongod.cpp
@@ -279,8 +279,6 @@ IndexBuildsCoordinatorMongod::_startIndexBuild(OperationContext* opCtx,
const auto nss = CollectionCatalog::get(opCtx)->resolveNamespaceStringOrUUID(opCtx, nssOrUuid);
auto& oss = OperationShardingState::get(opCtx);
- const auto shardVersion = oss.getShardVersion(nss);
- const auto dbVersion = oss.getDbVersion(dbName);
// Task in thread pool should have similar CurOp representation to the caller so that it can be
// identified as a createIndexes operation.
@@ -323,8 +321,8 @@ IndexBuildsCoordinatorMongod::_startIndexBuild(OperationContext* opCtx,
replState,
startPromise = std::move(startPromise),
startTimestamp,
- shardVersion,
- dbVersion,
+ shardVersion = oss.getShardVersion(nss),
+ dbVersion = oss.getDbVersion(dbName),
resumeInfo,
impersonatedClientAttrs = std::move(impersonatedClientAttrs)
](auto status) mutable noexcept {
@@ -344,14 +342,13 @@ IndexBuildsCoordinatorMongod::_startIndexBuild(OperationContext* opCtx,
auto opCtx = Client::getCurrent()->makeOperationContext();
// Load the external client's attributes into this thread's client for auditing.
- auto authSession = AuthorizationSession::get(Client::getCurrent());
+ auto authSession = AuthorizationSession::get(opCtx->getClient());
if (authSession) {
authSession->setImpersonatedUserData(std::move(impersonatedClientAttrs.userNames),
std::move(impersonatedClientAttrs.roleNames));
}
- auto& oss = OperationShardingState::get(opCtx.get());
- oss.initializeClientRoutingVersions(nss, shardVersion, dbVersion);
+ ScopedSetShardRole scopedSetShardRole(opCtx.get(), nss, shardVersion, dbVersion);
{
stdx::unique_lock<Client> lk(*opCtx->getClient());
diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
index 68801556252..1e68272a9f9 100644
--- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
@@ -397,8 +397,7 @@ bool ShardServerProcessInterface::setExpectedDbVersion(OperationContext* opCtx,
"Expected db version must match known db version",
knownDBVersion == dbVersion);
} else {
- OperationShardingState::get(opCtx).initializeClientRoutingVersions(
- nss, boost::none, dbVersion);
+ OperationShardingState::setShardRole(opCtx, nss, boost::none, dbVersion);
}
return false;
@@ -412,8 +411,7 @@ void ShardServerProcessInterface::setExpectedShardVersion(
if (oss.hasShardVersion(nss)) {
invariant(oss.getShardVersion(nss) == chunkVersion);
} else {
- OperationShardingState::get(opCtx).initializeClientRoutingVersions(
- nss, chunkVersion, boost::none);
+ OperationShardingState::setShardRole(opCtx, nss, chunkVersion, boost::none);
}
}
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index eb407909514..531dbc57111 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -497,15 +497,16 @@ env.CppUnitTest(
'op_observer_sharding_test.cpp',
'persistent_task_queue_test.cpp',
'range_deletion_util_test.cpp',
- 'resharding_collection_test.cpp',
- 'resharding_destined_recipient_test.cpp',
'resharding/resharding_agg_test.cpp',
'resharding/resharding_collection_cloner_test.cpp',
+ 'resharding/resharding_collection_test.cpp',
'resharding/resharding_data_replication_test.cpp',
+ 'resharding/resharding_destined_recipient_test.cpp',
'resharding/resharding_donor_oplog_iterator_test.cpp',
'resharding/resharding_donor_recipient_common_test.cpp',
- 'resharding/resharding_metrics_test.cpp',
+ 'resharding/resharding_donor_service_test.cpp',
'resharding/resharding_metrics_new_test.cpp',
+ 'resharding/resharding_metrics_test.cpp',
'resharding/resharding_oplog_applier_test.cpp',
'resharding/resharding_oplog_batch_applier_test.cpp',
'resharding/resharding_oplog_batch_preparer_test.cpp',
@@ -514,7 +515,6 @@ env.CppUnitTest(
'resharding/resharding_oplog_session_application_test.cpp',
'resharding/resharding_recipient_service_external_state_test.cpp',
'resharding/resharding_recipient_service_test.cpp',
- 'resharding/resharding_donor_service_test.cpp',
'resharding/resharding_txn_cloner_test.cpp',
'session_catalog_migration_destination_test.cpp',
'session_catalog_migration_source_test.cpp',
@@ -536,6 +536,7 @@ env.CppUnitTest(
'transaction_coordinator_test.cpp',
'type_shard_collection_test.cpp',
'type_shard_identity_test.cpp',
+ 'operation_sharding_state_test.cpp',
'vector_clock_shard_server_test.cpp',
],
LIBDEPS=[
diff --git a/src/mongo/db/s/collection_metadata_filtering_test.cpp b/src/mongo/db/s/collection_metadata_filtering_test.cpp
index 2190a56ce8f..f7c994c706a 100644
--- a/src/mongo/db/s/collection_metadata_filtering_test.cpp
+++ b/src/mongo/db/s/collection_metadata_filtering_test.cpp
@@ -125,8 +125,8 @@ protected:
_manager = std::make_shared<MetadataManager>(
getServiceContext(), kNss, executor(), CollectionMetadata(cm, ShardId("0")));
- OperationShardingState::get(operationContext())
- .initializeClientRoutingVersions(kNss, cm.getVersion(ShardId("0")), boost::none);
+ OperationShardingState::setShardRole(
+ operationContext(), kNss, cm.getVersion(ShardId("0")), boost::none);
}
std::shared_ptr<MetadataManager> _manager;
diff --git a/src/mongo/db/s/collection_sharding_runtime_test.cpp b/src/mongo/db/s/collection_sharding_runtime_test.cpp
index 91a7bb31157..540e7f5363f 100644
--- a/src/mongo/db/s/collection_sharding_runtime_test.cpp
+++ b/src/mongo/db/s/collection_sharding_runtime_test.cpp
@@ -81,8 +81,8 @@ protected:
boost::none);
if (!OperationShardingState::isOperationVersioned(opCtx)) {
- OperationShardingState::get(opCtx).initializeClientRoutingVersions(
- kTestNss, cm.getVersion(ShardId("0")), boost::none);
+ OperationShardingState::setShardRole(
+ opCtx, kTestNss, cm.getVersion(ShardId("0")), boost::none);
}
return CollectionMetadata(std::move(cm), ShardId("0"));
@@ -123,7 +123,6 @@ TEST_F(CollectionShardingRuntimeTest,
TEST_F(
CollectionShardingRuntimeTest,
GetCurrentMetadataIfKnownReturnsUnshardedAfterSetFilteringMetadataIsCalledWithUnshardedMetadata) {
-
CollectionShardingRuntime csr(getServiceContext(), kTestNss, executor());
csr.setFilteringMetadata(operationContext(), CollectionMetadata());
const auto optCurrMetadata = csr.getCurrentMetadataIfKnown();
@@ -135,7 +134,6 @@ TEST_F(
TEST_F(
CollectionShardingRuntimeTest,
GetCurrentMetadataIfKnownReturnsShardedAfterSetFilteringMetadataIsCalledWithShardedMetadata) {
-
CollectionShardingRuntime csr(getServiceContext(), kTestNss, executor());
OperationContext* opCtx = operationContext();
auto metadata = makeShardedMetadata(opCtx);
diff --git a/src/mongo/db/s/op_observer_sharding_test.cpp b/src/mongo/db/s/op_observer_sharding_test.cpp
index 735b9e87212..8ea4dd63904 100644
--- a/src/mongo/db/s/op_observer_sharding_test.cpp
+++ b/src/mongo/db/s/op_observer_sharding_test.cpp
@@ -48,8 +48,7 @@ void setCollectionFilteringMetadata(OperationContext* opCtx, CollectionMetadata
CollectionShardingRuntime::get(opCtx, kTestNss)
->setFilteringMetadata(opCtx, std::move(metadata));
- OperationShardingState::get(opCtx).initializeClientRoutingVersions(
- kTestNss, version, boost::none);
+ OperationShardingState::setShardRole(opCtx, kTestNss, version, boost::none);
}
class DocumentKeyStateTest : public ShardServerTestFixture {
diff --git a/src/mongo/db/s/operation_sharding_state.cpp b/src/mongo/db/s/operation_sharding_state.cpp
index 9a782d5eb6f..74d71ef6594 100644
--- a/src/mongo/db/s/operation_sharding_state.cpp
+++ b/src/mongo/db/s/operation_sharding_state.cpp
@@ -27,10 +27,10 @@
* it in the license file.
*/
-#include "mongo/platform/basic.h"
-
#include "mongo/db/s/operation_sharding_state.h"
+#include "mongo/logv2/log_debug.h"
+
namespace mongo {
namespace {
@@ -60,26 +60,35 @@ bool OperationShardingState::isOperationVersioned(OperationContext* opCtx) {
return !oss._shardVersions.empty();
}
-void OperationShardingState::initializeClientRoutingVersions(
- NamespaceString nss,
- const boost::optional<ChunkVersion>& shardVersion,
- const boost::optional<DatabaseVersion>& dbVersion) {
+void OperationShardingState::setShardRole(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const boost::optional<ChunkVersion>& shardVersion,
+ const boost::optional<DatabaseVersion>& databaseVersion) {
+ auto& oss = OperationShardingState::get(opCtx);
+
if (shardVersion) {
- // Changing the shardVersion expected for a namespace is not safe to happen in the
- // middle of execution, but for the cases where operation is retried on the same
- // OperationContext it can be set twice to the same value.
- if (_shardVersionsChecked.contains(nss.ns())) {
- invariant(_shardVersions[nss.ns()] == *shardVersion,
- str::stream()
- << "Trying to set " << shardVersion->toString() << " for " << nss.ns()
- << " but it already has " << _shardVersions[nss.ns()].toString());
- } else {
- _shardVersions.emplace(nss.ns(), *shardVersion);
+ auto emplaceResult = oss._shardVersions.try_emplace(nss.ns(), *shardVersion);
+ auto& tracker = emplaceResult.first->second;
+ if (!emplaceResult.second) {
+ uassert(640570,
+ str::stream() << "Illegal attempt to change the expected shard version for "
+ << nss << " from " << tracker.v << " to " << *shardVersion,
+ tracker.v == *shardVersion);
}
+ invariant(++tracker.recursion > 0);
}
- if (dbVersion) {
- const auto [_, inserted] = _databaseVersions.emplace(nss.db(), *dbVersion);
- invariant(inserted);
+
+ if (databaseVersion) {
+ auto emplaceResult = oss._databaseVersions.try_emplace(nss.db(), *databaseVersion);
+ auto& tracker = emplaceResult.first->second;
+ if (!emplaceResult.second) {
+ uassert(640571,
+ str::stream() << "Illegal attempt to change the expected database version for "
+ << nss.db() << " from " << tracker.v << " to "
+ << *databaseVersion,
+ tracker.v == *databaseVersion);
+ }
+ invariant(++tracker.recursion > 0);
}
}
@@ -93,12 +102,10 @@ bool OperationShardingState::hasShardVersion(const NamespaceString& nss) const {
}
boost::optional<ChunkVersion> OperationShardingState::getShardVersion(const NamespaceString& nss) {
- _shardVersionsChecked.insert(nss.ns());
const auto it = _shardVersions.find(nss.ns());
if (it != _shardVersions.end()) {
- return it->second;
+ return it->second.v;
}
-
return boost::none;
}
@@ -106,13 +113,12 @@ bool OperationShardingState::hasDbVersion() const {
return !_databaseVersions.empty();
}
-boost::optional<DatabaseVersion> OperationShardingState::getDbVersion(
- const StringData dbName) const {
+boost::optional<DatabaseVersion> OperationShardingState::getDbVersion(StringData dbName) const {
const auto it = _databaseVersions.find(dbName);
- if (it == _databaseVersions.end()) {
- return boost::none;
+ if (it != _databaseVersions.end()) {
+ return it->second.v;
}
- return it->second;
+ return boost::none;
}
bool OperationShardingState::waitForMigrationCriticalSectionSignal(OperationContext* opCtx) {
@@ -196,4 +202,37 @@ ScopedAllowImplicitCollectionCreate_UNSAFE::~ScopedAllowImplicitCollectionCreate
oss._allowCollectionCreation = false;
}
+ScopedSetShardRole::ScopedSetShardRole(OperationContext* opCtx,
+ NamespaceString nss,
+ boost::optional<ChunkVersion> shardVersion,
+ boost::optional<DatabaseVersion> databaseVersion)
+ : _opCtx(opCtx),
+ _nss(std::move(nss)),
+ _shardVersion(std::move(shardVersion)),
+ _databaseVersion(std::move(databaseVersion)) {
+ OperationShardingState::setShardRole(_opCtx, _nss, _shardVersion, _databaseVersion);
+}
+
+ScopedSetShardRole::~ScopedSetShardRole() {
+ auto& oss = OperationShardingState::get(_opCtx);
+
+ if (_shardVersion) {
+ auto it = oss._shardVersions.find(_nss.ns());
+ invariant(it != oss._shardVersions.end());
+ auto& tracker = it->second;
+ invariant(--tracker.recursion >= 0);
+ if (tracker.recursion == 0)
+ oss._shardVersions.erase(it);
+ }
+
+ if (_databaseVersion) {
+ auto it = oss._databaseVersions.find(_nss.db());
+ invariant(it != oss._databaseVersions.end());
+ auto& tracker = it->second;
+ invariant(--tracker.recursion >= 0);
+ if (tracker.recursion == 0)
+ oss._databaseVersions.erase(it);
+ }
+}
+
} // namespace mongo
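
The recursion counter introduced above is what lets nested ScopedSetShardRole objects coexist for the same namespace, provided they agree on the version: the outermost scope inserts the tracker, inner scopes only bump the counter, and the entry is erased once the last scope unwinds. A simplified, standalone model of that bookkeeping (plain C++17 with std::map and an int standing in for StringMap and ChunkVersion; this is not MongoDB code):

    #include <cassert>
    #include <map>
    #include <stdexcept>
    #include <string>

    // Simplified stand-ins for the real types.
    using Version = int;

    struct VersionTracker {
        Version v;
        int recursion{0};
    };

    std::map<std::string, VersionTracker> gExpectedVersions;

    class ScopedSetRole {
    public:
        ScopedSetRole(std::string ns, Version v) : _ns(std::move(ns)), _v(v) {
            auto [it, inserted] = gExpectedVersions.try_emplace(_ns, VersionTracker{_v});
            if (!inserted && it->second.v != _v)
                throw std::logic_error("Illegal attempt to change the expected version");
            ++it->second.recursion;
        }
        ~ScopedSetRole() {
            auto it = gExpectedVersions.find(_ns);
            if (--it->second.recursion == 0)
                gExpectedVersions.erase(it);
        }

    private:
        std::string _ns;
        Version _v;
    };

    int main() {
        {
            ScopedSetRole outer("test.coll", 1);
            {
                ScopedSetRole inner("test.coll", 1);  // Same version: recursion goes to 2.
                assert(gExpectedVersions.at("test.coll").recursion == 2);
            }
            assert(gExpectedVersions.count("test.coll") == 1);  // Outer scope still active.
        }
        assert(gExpectedVersions.empty());  // Fully unwound: tracker erased.
        return 0;
    }
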
diff --git a/src/mongo/db/s/operation_sharding_state.h b/src/mongo/db/s/operation_sharding_state.h
index 2a39b630658..63c726fcc0b 100644
--- a/src/mongo/db/s/operation_sharding_state.h
+++ b/src/mongo/db/s/operation_sharding_state.h
@@ -41,6 +41,28 @@
namespace mongo {
/**
+ * Marks the opCtx during scope in which it has been instantiated as running in the shard role for
+ * the specified collection. This indicates to the underlying storage system that the caller has
+ * performed 'routing', in the sense that it is aware of what data is located on this node.
+ */
+class ScopedSetShardRole {
+public:
+ ScopedSetShardRole(OperationContext* opCtx,
+ NamespaceString nss,
+ boost::optional<ChunkVersion> shardVersion,
+ boost::optional<DatabaseVersion> databaseVersion);
+ ~ScopedSetShardRole();
+
+private:
+ OperationContext* const _opCtx;
+
+ NamespaceString _nss;
+
+ boost::optional<ChunkVersion> _shardVersion;
+ boost::optional<DatabaseVersion> _databaseVersion;
+};
+
+/**
* A decoration on OperationContext representing per-operation shard version metadata sent to mongod
* from mongos as a command parameter.
*
@@ -86,13 +108,13 @@ public:
};
/**
- * Stores the given shardVersion and databaseVersion for the given namespace. Note: The shard
- * version for the given namespace stored in the OperationShardingState can be overwritten if it
- * has not been checked yet.
+ * Same semantics as ScopedSetShardRole above, but the role remains set for the lifetime of the
+ * opCtx.
*/
- void initializeClientRoutingVersions(NamespaceString nss,
- const boost::optional<ChunkVersion>& shardVersion,
- const boost::optional<DatabaseVersion>& dbVersion);
+ static void setShardRole(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const boost::optional<ChunkVersion>& shardVersion,
+ const boost::optional<DatabaseVersion>& dbVersion);
/**
* Removes the databaseVersion stored for the given namespace.
@@ -173,19 +195,31 @@ public:
boost::optional<Status> resetShardingOperationFailedStatus();
private:
+ friend class ScopedSetShardRole;
friend class ShardServerOpObserver; // For access to _allowCollectionCreation below
// Specifies whether the request is allowed to create database/collection implicitly
bool _allowCollectionCreation{false};
- // The OperationShardingState class supports storing shardVersions for multiple namespaces (and
- // databaseVersions for multiple databases), even though client code has not been written yet to
- // *send* multiple shardVersions or databaseVersions.
- StringMap<ChunkVersion> _shardVersions;
- StringMap<DatabaseVersion> _databaseVersions;
-
- // Stores shards that have undergone a version check.
- StringSet _shardVersionsChecked;
+ // Stores the shard version expected for each collection that will be accessed
+ struct ShardVersionTracker {
+ ShardVersionTracker(ChunkVersion v) : v(v) {}
+ ShardVersionTracker(ShardVersionTracker&&) = default;
+ ShardVersionTracker(const ShardVersionTracker&) = delete;
+ ChunkVersion v;
+ int recursion{0};
+ };
+ StringMap<ShardVersionTracker> _shardVersions;
+
+ // Stores the database version expected for each database that will be accessed
+ struct DatabaseVersionTracker {
+ DatabaseVersionTracker(DatabaseVersion v) : v(v) {}
+ DatabaseVersionTracker(DatabaseVersionTracker&&) = default;
+ DatabaseVersionTracker(const DatabaseVersionTracker&) = delete;
+ DatabaseVersion v;
+ int recursion{0};
+ };
+ StringMap<DatabaseVersionTracker> _databaseVersions;
// This value will only be non-null if version check during the operation execution failed due
// to stale version and there was a migration for that namespace, which was in critical section.
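
Per the comments above, the two entry points differ only in lifetime: ScopedSetShardRole removes the expected versions in its destructor, while the static OperationShardingState::setShardRole() leaves them attached until the OperationContext is destroyed (which is why the command dispatch path and several test fixtures in this diff use the static form). A hedged sketch of that difference, intended to run inside the server's unit-test environment rather than as a standalone program; the function name is made up for illustration:

    void illustrateShardRoleLifetime(OperationContext* opCtx, const NamespaceString& nss) {
        const ChunkVersion version(1, 0, OID::gen(), Timestamp(1, 0));
        auto& oss = OperationShardingState::get(opCtx);

        {
            ScopedSetShardRole scopedSetShardRole(opCtx, nss, version, boost::none);
            invariant(oss.getShardVersion(nss));  // Expected version is set inside the scope...
        }
        invariant(!oss.getShardVersion(nss));  // ...and removed once the scope ends.

        // The static form has no matching cleanup: the expected version now stays on the
        // OperationContext for the rest of the operation.
        OperationShardingState::setShardRole(opCtx, nss, version, boost::none);
        invariant(oss.getShardVersion(nss));
    }
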
diff --git a/src/mongo/db/s/operation_sharding_state_test.cpp b/src/mongo/db/s/operation_sharding_state_test.cpp
new file mode 100644
index 00000000000..0c4732b51ab
--- /dev/null
+++ b/src/mongo/db/s/operation_sharding_state_test.cpp
@@ -0,0 +1,88 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/s/operation_sharding_state.h"
+#include "mongo/db/s/shard_server_test_fixture.h"
+
+namespace mongo {
+namespace {
+
+const NamespaceString kNss("TestDB", "TestColl");
+const NamespaceString kAnotherNss("TestDB", "AnotherColl");
+
+using OperationShardingStateTest = ShardServerTestFixture;
+
+TEST_F(OperationShardingStateTest, ScopedSetShardRoleDbVersion) {
+ DatabaseVersion dbv{DatabaseVersion::makeFixed()};
+ ScopedSetShardRole scopedSetShardRole(operationContext(), kNss, boost::none, dbv);
+
+ auto& oss = OperationShardingState::get(operationContext());
+ ASSERT_EQ(dbv, *oss.getDbVersion(kNss.db()));
+}
+
+TEST_F(OperationShardingStateTest, ScopedSetShardRoleShardVersion) {
+ ChunkVersion shardVersion(1, 0, OID::gen(), Timestamp(1, 0));
+ ScopedSetShardRole scopedSetShardRole(operationContext(), kNss, shardVersion, boost::none);
+
+ auto& oss = OperationShardingState::get(operationContext());
+ ASSERT_EQ(shardVersion, *oss.getShardVersion(kNss));
+}
+
+TEST_F(OperationShardingStateTest, ScopedSetShardRoleChangeShardVersionSameNamespace) {
+ auto& oss = OperationShardingState::get(operationContext());
+
+ {
+ ChunkVersion shardVersion1(1, 0, OID::gen(), Timestamp(10, 0));
+ ScopedSetShardRole scopedSetShardRole1(
+ operationContext(), kNss, shardVersion1, boost::none);
+ ASSERT_EQ(shardVersion1, *oss.getShardVersion(kNss));
+ }
+ {
+ ChunkVersion shardVersion2(1, 0, OID::gen(), Timestamp(20, 0));
+ ScopedSetShardRole scopedSetShardRole2(
+ operationContext(), kNss, shardVersion2, boost::none);
+ ASSERT_EQ(shardVersion2, *oss.getShardVersion(kNss));
+ }
+}
+
+TEST_F(OperationShardingStateTest, ScopedSetShardRoleRecursiveShardVersionDifferentNamespaces) {
+ ChunkVersion shardVersion1(1, 0, OID::gen(), Timestamp(10, 0));
+ ChunkVersion shardVersion2(1, 0, OID::gen(), Timestamp(20, 0));
+
+ ScopedSetShardRole scopedSetShardRole1(operationContext(), kNss, shardVersion1, boost::none);
+ ScopedSetShardRole scopedSetShardRole2(
+ operationContext(), kAnotherNss, shardVersion2, boost::none);
+
+ auto& oss = OperationShardingState::get(operationContext());
+ ASSERT_EQ(shardVersion1, *oss.getShardVersion(kNss));
+ ASSERT_EQ(shardVersion2, *oss.getShardVersion(kAnotherNss));
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/s/range_deletion_util_test.cpp b/src/mongo/db/s/range_deletion_util_test.cpp
index ed8c0b5e4e6..5fe823cc79b 100644
--- a/src/mongo/db/s/range_deletion_util_test.cpp
+++ b/src/mongo/db/s/range_deletion_util_test.cpp
@@ -120,8 +120,8 @@ public:
makeStandaloneRoutingTableHistory(std::move(rt)),
boost::none);
if (!OperationShardingState::isOperationVersioned(_opCtx)) {
- OperationShardingState::get(_opCtx).initializeClientRoutingVersions(
- kNss, cm.getVersion(ShardId("dummyShardId")), boost::none);
+ OperationShardingState::setShardRole(
+ _opCtx, kNss, cm.getVersion(ShardId("dummyShardId")), boost::none);
}
AutoGetDb autoDb(_opCtx, kNss.db(), MODE_IX);
Lock::CollectionLock collLock(_opCtx, kNss, MODE_IX);
diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp
index b950fd5fd61..9132438053a 100644
--- a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp
+++ b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp
@@ -279,9 +279,10 @@ bool ReshardingCollectionCloner::doOneBatch(OperationContext* opCtx, Pipeline& p
// the temporary resharding collection. We attach shard version IGNORED to the insert operations
// and retry once on a StaleConfig exception to allow the collection metadata information to be
// recovered.
- auto& oss = OperationShardingState::get(opCtx);
- oss.initializeClientRoutingVersions(
- _outputNss, ChunkVersion::IGNORED() /* shardVersion */, boost::none /* dbVersion */);
+ ScopedSetShardRole scopedSetShardRole(opCtx,
+ _outputNss,
+ ChunkVersion::IGNORED() /* shardVersion */,
+ boost::none /* databaseVersion */);
int bytesInserted = resharding::data_copy::withOneStaleConfigRetry(
opCtx, [&] { return resharding::data_copy::insertBatch(opCtx, _outputNss, batch); });
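
The pattern here (and in resharding_oplog_application.cpp and resharding_oplog_batch_applier.cpp further down) is: attach shard version IGNORED for the temporary resharding namespace, then rely on a single StaleConfig retry to recover the collection metadata. A hedged sketch of that shape, using only names that appear in this diff; the wrapper function itself is hypothetical, includes are omitted, and the exact insertBatch() signature is assumed:

    // Hypothetical wrapper illustrating the "IGNORED shard version + one StaleConfig retry"
    // write pattern used by the resharding cloner and oplog appliers.
    int writeToTemporaryReshardingCollection(OperationContext* opCtx,
                                             const NamespaceString& outputNss,
                                             std::vector<InsertStatement>& batch) {
        // Attach shard version IGNORED so the write is not rejected while the recipient's
        // filtering metadata is still unknown.
        ScopedSetShardRole scopedSetShardRole(opCtx,
                                              outputNss,
                                              ChunkVersion::IGNORED() /* shardVersion */,
                                              boost::none /* databaseVersion */);

        // Retry once on a StaleConfig exception so the sharding metadata can be recovered.
        return resharding::data_copy::withOneStaleConfigRetry(
            opCtx, [&] { return resharding::data_copy::insertBatch(opCtx, outputNss, batch); });
    }
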
diff --git a/src/mongo/db/s/resharding_collection_test.cpp b/src/mongo/db/s/resharding/resharding_collection_test.cpp
index 71e3c780990..71e3c780990 100644
--- a/src/mongo/db/s/resharding_collection_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_collection_test.cpp
diff --git a/src/mongo/db/s/resharding_destined_recipient_test.cpp b/src/mongo/db/s/resharding/resharding_destined_recipient_test.cpp
index 23f0cb3ffc8..fadc390a510 100644
--- a/src/mongo/db/s/resharding_destined_recipient_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_destined_recipient_test.cpp
@@ -295,8 +295,7 @@ TEST_F(DestinedRecipientTest, TestGetDestinedRecipient) {
auto env = setupReshardingEnv(opCtx, true);
AutoGetCollection coll(opCtx, kNss, MODE_IX);
- OperationShardingState::get(opCtx).initializeClientRoutingVersions(
- kNss, env.version, env.dbVersion);
+ OperationShardingState::setShardRole(opCtx, kNss, env.version, env.dbVersion);
ShardingWriteRouter shardingWriteRouter(opCtx, kNss, Grid::get(opCtx)->catalogCache());
auto destShardId =
@@ -311,8 +310,7 @@ TEST_F(DestinedRecipientTest, TestGetDestinedRecipientThrowsOnBlockedRefresh) {
{
AutoGetCollection coll(opCtx, kNss, MODE_IX);
- OperationShardingState::get(opCtx).initializeClientRoutingVersions(
- kNss, env.version, env.dbVersion);
+ OperationShardingState::setShardRole(opCtx, kNss, env.version, env.dbVersion);
FailPointEnableBlock failPoint("blockCollectionCacheLookup");
ASSERT_THROWS_WITH_CHECK(ShardingWriteRouter(opCtx, kNss, Grid::get(opCtx)->catalogCache()),
@@ -332,8 +330,7 @@ TEST_F(DestinedRecipientTest, TestOpObserverSetsDestinedRecipientOnInserts) {
auto opCtx = operationContext();
auto env = setupReshardingEnv(opCtx, true);
- OperationShardingState::get(opCtx).initializeClientRoutingVersions(
- kNss, env.version, env.dbVersion);
+ OperationShardingState::setShardRole(opCtx, kNss, env.version, env.dbVersion);
writeDoc(opCtx, kNss, BSON("_id" << 0 << "x" << 2 << "y" << 10), env);
auto entry = getLastOplogEntry(opCtx);
@@ -347,8 +344,7 @@ TEST_F(DestinedRecipientTest, TestOpObserverSetsDestinedRecipientOnInsertsInTran
auto opCtx = operationContext();
auto env = setupReshardingEnv(opCtx, true);
- OperationShardingState::get(opCtx).initializeClientRoutingVersions(
- kNss, env.version, env.dbVersion);
+ OperationShardingState::setShardRole(opCtx, kNss, env.version, env.dbVersion);
runInTransaction(
opCtx, [&]() { writeDoc(opCtx, kNss, BSON("_id" << 0 << "x" << 2 << "y" << 10), env); });
@@ -375,8 +371,7 @@ TEST_F(DestinedRecipientTest, TestOpObserverSetsDestinedRecipientOnUpdates) {
auto env = setupReshardingEnv(opCtx, true);
- OperationShardingState::get(opCtx).initializeClientRoutingVersions(
- kNss, env.version, env.dbVersion);
+ OperationShardingState::setShardRole(opCtx, kNss, env.version, env.dbVersion);
updateDoc(opCtx, kNss, BSON("_id" << 0), BSON("$set" << BSON("z" << 50)), env);
auto entry = getLastOplogEntry(opCtx);
@@ -395,8 +390,7 @@ TEST_F(DestinedRecipientTest, TestOpObserverSetsDestinedRecipientOnMultiUpdates)
auto env = setupReshardingEnv(opCtx, true);
- OperationShardingState::get(opCtx).initializeClientRoutingVersions(
- kNss, ChunkVersion::IGNORED(), env.dbVersion);
+ OperationShardingState::setShardRole(opCtx, kNss, ChunkVersion::IGNORED(), env.dbVersion);
client.update(kNss.ns(),
BSON("x" << 0),
BSON("$set" << BSON("z" << 5)),
@@ -418,8 +412,7 @@ TEST_F(DestinedRecipientTest, TestOpObserverSetsDestinedRecipientOnUpdatesOutOfP
auto env = setupReshardingEnv(opCtx, true);
- OperationShardingState::get(opCtx).initializeClientRoutingVersions(
- kNss, env.version, env.dbVersion);
+ OperationShardingState::setShardRole(opCtx, kNss, env.version, env.dbVersion);
updateDoc(opCtx, kNss, BSON("_id" << 0), BSON("$set" << BSON("z" << 50)), env);
auto entry = getLastOplogEntry(opCtx);
@@ -437,8 +430,7 @@ TEST_F(DestinedRecipientTest, TestOpObserverSetsDestinedRecipientOnUpdatesInTran
auto env = setupReshardingEnv(opCtx, true);
- OperationShardingState::get(opCtx).initializeClientRoutingVersions(
- kNss, env.version, env.dbVersion);
+ OperationShardingState::setShardRole(opCtx, kNss, env.version, env.dbVersion);
runInTransaction(opCtx, [&]() {
updateDoc(opCtx, kNss, BSON("_id" << 0), BSON("$set" << BSON("z" << 50)), env);
});
@@ -466,8 +458,7 @@ TEST_F(DestinedRecipientTest, TestOpObserverSetsDestinedRecipientOnDeletes) {
auto env = setupReshardingEnv(opCtx, true);
- OperationShardingState::get(opCtx).initializeClientRoutingVersions(
- kNss, env.version, env.dbVersion);
+ OperationShardingState::setShardRole(opCtx, kNss, env.version, env.dbVersion);
deleteDoc(opCtx, kNss, BSON("_id" << 0), env);
auto entry = getLastOplogEntry(opCtx);
@@ -485,8 +476,7 @@ TEST_F(DestinedRecipientTest, TestOpObserverSetsDestinedRecipientOnDeletesInTran
auto env = setupReshardingEnv(opCtx, true);
- OperationShardingState::get(opCtx).initializeClientRoutingVersions(
- kNss, env.version, env.dbVersion);
+ OperationShardingState::setShardRole(opCtx, kNss, env.version, env.dbVersion);
runInTransaction(opCtx, [&]() { deleteDoc(opCtx, kNss, BSON("_id" << 0), env); });
// Look for destined recipient in latest oplog entry. Since this write was done in a
@@ -512,8 +502,7 @@ TEST_F(DestinedRecipientTest, TestUpdateChangesOwningShardThrows) {
auto env = setupReshardingEnv(opCtx, true);
- OperationShardingState::get(opCtx).initializeClientRoutingVersions(
- kNss, env.version, env.dbVersion);
+ OperationShardingState::setShardRole(opCtx, kNss, env.version, env.dbVersion);
ASSERT_THROWS(runInTransaction(opCtx,
[&]() {
updateDoc(opCtx,
@@ -533,8 +522,7 @@ TEST_F(DestinedRecipientTest, TestUpdateSameOwningShard) {
auto env = setupReshardingEnv(opCtx, true);
- OperationShardingState::get(opCtx).initializeClientRoutingVersions(
- kNss, env.version, env.dbVersion);
+ OperationShardingState::setShardRole(opCtx, kNss, env.version, env.dbVersion);
runInTransaction(opCtx, [&]() {
updateDoc(opCtx, kNss, BSON("_id" << 0 << "x" << 2), BSON("$set" << BSON("y" << 3)), env);
});
diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp
index ce285e48b9a..f3c2abb8ac7 100644
--- a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp
@@ -27,10 +27,6 @@
* it in the license file.
*/
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/s/resharding/resharding_donor_recipient_common_test.h"
-
#include "mongo/db/catalog/drop_database.h"
#include "mongo/db/catalog_raii.h"
#include "mongo/db/db_raii.h"
@@ -50,6 +46,291 @@ namespace {
using namespace fmt::literals;
+/**
+ * This test fixture does not create any resharding POSs and should be preferred to
+ * `ReshardingDonorRecipientCommonTest` when they are not required.
+ */
+class ReshardingDonorRecipientCommonInternalsTest : public ShardServerTestFixture {
+public:
+ const UUID kExistingUUID = UUID::gen();
+ const Timestamp kExistingTimestamp = Timestamp(10, 5);
+ const NamespaceString kOriginalNss = NamespaceString("db", "foo");
+
+ const NamespaceString kTemporaryReshardingNss =
+ constructTemporaryReshardingNss("db", kExistingUUID);
+ const std::string kOriginalShardKey = "oldKey";
+ const BSONObj kOriginalShardKeyPattern = BSON(kOriginalShardKey << 1);
+ const std::string kReshardingKey = "newKey";
+ const BSONObj kReshardingKeyPattern = BSON(kReshardingKey << 1);
+ const OID kOriginalEpoch = OID::gen();
+ const OID kReshardingEpoch = OID::gen();
+ const UUID kReshardingUUID = UUID::gen();
+ const Timestamp kReshardingTimestamp = Timestamp(kExistingTimestamp.getSecs() + 1, 0);
+
+ const DonorShardFetchTimestamp kThisShard =
+ makeDonorShardFetchTimestamp(ShardId("shardOne"), Timestamp(10, 0));
+ const DonorShardFetchTimestamp kOtherShard =
+ makeDonorShardFetchTimestamp(ShardId("shardTwo"), Timestamp(20, 0));
+
+ const std::vector<DonorShardFetchTimestamp> kShards = {kThisShard, kOtherShard};
+
+ const Timestamp kCloneTimestamp = Timestamp(20, 0);
+
+protected:
+ CollectionMetadata makeShardedMetadataForOriginalCollection(
+ OperationContext* opCtx, const ShardId& shardThatChunkExistsOn) {
+ return makeShardedMetadata(opCtx,
+ kOriginalNss,
+ kOriginalShardKey,
+ kOriginalShardKeyPattern,
+ kExistingUUID,
+ kExistingTimestamp,
+ kOriginalEpoch,
+ shardThatChunkExistsOn);
+ }
+
+ CollectionMetadata makeShardedMetadataForTemporaryReshardingCollection(
+ OperationContext* opCtx, const ShardId& shardThatChunkExistsOn) {
+ return makeShardedMetadata(opCtx,
+ kTemporaryReshardingNss,
+ kReshardingKey,
+ kReshardingKeyPattern,
+ kReshardingUUID,
+ kReshardingTimestamp,
+ kReshardingEpoch,
+ shardThatChunkExistsOn);
+ }
+
+ CollectionMetadata makeShardedMetadata(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const std::string& shardKey,
+ const BSONObj& shardKeyPattern,
+ const UUID& uuid,
+ const Timestamp& timestamp,
+ const OID& epoch,
+ const ShardId& shardThatChunkExistsOn) {
+ auto range = ChunkRange(BSON(shardKey << MINKEY), BSON(shardKey << MAXKEY));
+ auto chunk = ChunkType(
+ uuid, std::move(range), ChunkVersion(1, 0, epoch, timestamp), shardThatChunkExistsOn);
+ ChunkManager cm(kThisShard.getShardId(),
+ DatabaseVersion(uuid, timestamp),
+ makeStandaloneRoutingTableHistory(
+ RoutingTableHistory::makeNew(nss,
+ uuid,
+ shardKeyPattern,
+ nullptr,
+ false,
+ epoch,
+ timestamp,
+ boost::none /* timeseriesFields */,
+ boost::none,
+ boost::none /* chunkSizeBytes */,
+ true,
+ {std::move(chunk)})),
+ boost::none);
+
+ if (!OperationShardingState::isOperationVersioned(opCtx)) {
+ OperationShardingState::setShardRole(
+ opCtx, nss, cm.getVersion(kThisShard.getShardId()), boost::none);
+ }
+
+ return CollectionMetadata(std::move(cm), kThisShard.getShardId());
+ }
+
+ ReshardingDonorDocument makeDonorStateDoc() {
+ DonorShardContext donorCtx;
+ donorCtx.setState(DonorStateEnum::kPreparingToDonate);
+
+ ReshardingDonorDocument doc(std::move(donorCtx),
+ {kThisShard.getShardId(), kOtherShard.getShardId()});
+
+ NamespaceString sourceNss = kOriginalNss;
+ auto sourceUUID = UUID::gen();
+ auto commonMetadata = CommonReshardingMetadata(
+ UUID::gen(), sourceNss, sourceUUID, kTemporaryReshardingNss, kReshardingKeyPattern);
+
+ doc.setCommonReshardingMetadata(std::move(commonMetadata));
+ return doc;
+ }
+
+ ReshardingRecipientDocument makeRecipientStateDoc() {
+ RecipientShardContext recipCtx;
+ recipCtx.setState(RecipientStateEnum::kCloning);
+
+ ReshardingRecipientDocument doc(
+ std::move(recipCtx), {kThisShard.getShardId(), kOtherShard.getShardId()}, 1000);
+
+ NamespaceString sourceNss = kOriginalNss;
+ auto sourceUUID = UUID::gen();
+ auto commonMetadata = CommonReshardingMetadata(
+ UUID::gen(), sourceNss, sourceUUID, kTemporaryReshardingNss, kReshardingKeyPattern);
+
+ doc.setCommonReshardingMetadata(std::move(commonMetadata));
+
+ // A document in the cloning state requires a clone timestamp.
+ doc.setCloneTimestamp(kCloneTimestamp);
+ return doc;
+ }
+
+ ReshardingFields createCommonReshardingFields(const UUID& reshardingUUID,
+ CoordinatorStateEnum state) {
+ auto fields = ReshardingFields(reshardingUUID);
+ fields.setState(state);
+ return fields;
+ };
+
+ void appendDonorFieldsToReshardingFields(ReshardingFields& fields,
+ const BSONObj& reshardingKey) {
+ std::vector<ShardId> donorShardIds;
+ for (const auto& shard : kShards) {
+ donorShardIds.emplace_back(shard.getShardId());
+ }
+
+ fields.setDonorFields(
+ TypeCollectionDonorFields(kTemporaryReshardingNss, reshardingKey, donorShardIds));
+ }
+
+ void appendRecipientFieldsToReshardingFields(
+ ReshardingFields& fields,
+ const std::vector<DonorShardFetchTimestamp> donorShards,
+ const UUID& existingUUID,
+ const NamespaceString& originalNss,
+ const boost::optional<Timestamp>& cloneTimestamp = boost::none) {
+ auto recipientFields =
+ TypeCollectionRecipientFields(donorShards, existingUUID, originalNss, 5000);
+ emplaceCloneTimestampIfExists(recipientFields, cloneTimestamp);
+ fields.setRecipientFields(std::move(recipientFields));
+ }
+
+ template <class ReshardingDocument>
+ void assertCommonDocFieldsMatchReshardingFields(const NamespaceString& nss,
+ const UUID& reshardingUUID,
+ const UUID& existingUUID,
+ const BSONObj& reshardingKey,
+ const ReshardingDocument& reshardingDoc) {
+ ASSERT_EQ(reshardingDoc.getReshardingUUID(), reshardingUUID);
+ ASSERT_EQ(reshardingDoc.getSourceNss(), nss);
+ ASSERT_EQ(reshardingDoc.getSourceUUID(), existingUUID);
+ ASSERT_BSONOBJ_EQ(reshardingDoc.getReshardingKey().toBSON(), reshardingKey);
+ }
+
+ void assertDonorDocMatchesReshardingFields(const NamespaceString& nss,
+ const UUID& existingUUID,
+ const ReshardingFields& reshardingFields,
+ const ReshardingDonorDocument& donorDoc) {
+ assertCommonDocFieldsMatchReshardingFields<ReshardingDonorDocument>(
+ nss,
+ reshardingFields.getReshardingUUID(),
+ existingUUID,
+ reshardingFields.getDonorFields()->getReshardingKey().toBSON(),
+ donorDoc);
+ ASSERT(donorDoc.getMutableState().getState() == DonorStateEnum::kPreparingToDonate);
+ ASSERT(donorDoc.getMutableState().getMinFetchTimestamp() == boost::none);
+ }
+
+ void assertRecipientDocMatchesReshardingFields(
+ const CollectionMetadata& metadata,
+ const ReshardingFields& reshardingFields,
+ const ReshardingRecipientDocument& recipientDoc) {
+ assertCommonDocFieldsMatchReshardingFields<ReshardingRecipientDocument>(
+ reshardingFields.getRecipientFields()->getSourceNss(),
+ reshardingFields.getReshardingUUID(),
+ reshardingFields.getRecipientFields()->getSourceUUID(),
+ metadata.getShardKeyPattern().toBSON(),
+ recipientDoc);
+
+ ASSERT(recipientDoc.getMutableState().getState() ==
+ RecipientStateEnum::kAwaitingFetchTimestamp);
+ ASSERT(!recipientDoc.getCloneTimestamp());
+
+ const auto donorShards = reshardingFields.getRecipientFields()->getDonorShards();
+ std::map<ShardId, DonorShardFetchTimestamp> donorShardMap;
+ for (const auto& donor : donorShards) {
+ donorShardMap.emplace(donor.getShardId(), donor);
+ }
+
+ for (const auto& donorShardFromRecipientDoc : recipientDoc.getDonorShards()) {
+ auto donorIter = donorShardMap.find(donorShardFromRecipientDoc.getShardId());
+ ASSERT(donorIter != donorShardMap.end());
+ ASSERT_EQ(donorIter->second.getMinFetchTimestamp().has_value(),
+ donorShardFromRecipientDoc.getMinFetchTimestamp().has_value());
+
+ if (donorIter->second.getMinFetchTimestamp()) {
+ ASSERT_EQ(*donorIter->second.getMinFetchTimestamp(),
+ *donorShardFromRecipientDoc.getMinFetchTimestamp());
+ }
+
+ donorShardMap.erase(donorShardFromRecipientDoc.getShardId());
+ }
+
+ ASSERT(donorShardMap.empty());
+ }
+
+private:
+ DonorShardFetchTimestamp makeDonorShardFetchTimestamp(
+ ShardId shardId, boost::optional<Timestamp> fetchTimestamp) {
+ DonorShardFetchTimestamp donorFetchTimestamp(shardId);
+ donorFetchTimestamp.setMinFetchTimestamp(fetchTimestamp);
+ return donorFetchTimestamp;
+ }
+};
+
+/**
+ * This fixture starts with the above internals test and also creates (notably) the resharding donor
+ * and recipient POSs.
+ */
+class ReshardingDonorRecipientCommonTest : public ReshardingDonorRecipientCommonInternalsTest {
+public:
+ void setUp() override {
+ ShardServerTestFixture::setUp();
+
+ WaitForMajorityService::get(getServiceContext()).startup(getServiceContext());
+
+ _primaryOnlyServiceRegistry = repl::PrimaryOnlyServiceRegistry::get(getServiceContext());
+
+ std::unique_ptr<ReshardingDonorService> donorService =
+ std::make_unique<ReshardingDonorService>(getServiceContext());
+ _primaryOnlyServiceRegistry->registerService(std::move(donorService));
+
+ std::unique_ptr<ReshardingRecipientService> recipientService =
+ std::make_unique<ReshardingRecipientService>(getServiceContext());
+ _primaryOnlyServiceRegistry->registerService(std::move(recipientService));
+ _primaryOnlyServiceRegistry->onStartup(operationContext());
+
+ stepUp();
+ }
+
+ void tearDown() override {
+ WaitForMajorityService::get(getServiceContext()).shutDown();
+
+ Grid::get(operationContext())->getExecutorPool()->shutdownAndJoin();
+
+ _primaryOnlyServiceRegistry->onShutdown();
+
+ Grid::get(operationContext())->clearForUnitTests();
+
+ ShardServerTestFixture::tearDown();
+ }
+
+ void stepUp() {
+ auto replCoord = repl::ReplicationCoordinator::get(getServiceContext());
+
+ // Advance term
+ _term++;
+
+ ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_PRIMARY));
+ ASSERT_OK(replCoord->updateTerm(operationContext(), _term));
+ replCoord->setMyLastAppliedOpTimeAndWallTime(
+ repl::OpTimeAndWallTime(repl::OpTime(Timestamp(1, 1), _term), Date_t()));
+
+ _primaryOnlyServiceRegistry->onStepUpComplete(operationContext(), _term);
+ }
+
+protected:
+ repl::PrimaryOnlyServiceRegistry* _primaryOnlyServiceRegistry;
+ long long _term = 0;
+};
+
TEST_F(ReshardingDonorRecipientCommonInternalsTest, ConstructDonorDocumentFromReshardingFields) {
OperationContext* opCtx = operationContext();
auto metadata = makeShardedMetadataForOriginalCollection(opCtx, kThisShard.getShardId());
diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h
deleted file mode 100644
index ff5d2ffa67b..00000000000
--- a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h
+++ /dev/null
@@ -1,334 +0,0 @@
-/**
- * Copyright (C) 2020-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/catalog_raii.h"
-#include "mongo/db/db_raii.h"
-#include "mongo/db/repl/wait_for_majority_service.h"
-#include "mongo/db/s/collection_sharding_runtime.h"
-#include "mongo/db/s/operation_sharding_state.h"
-#include "mongo/db/s/resharding/resharding_donor_recipient_common.h"
-#include "mongo/db/s/shard_server_test_fixture.h"
-#include "mongo/unittest/death_test.h"
-#include "mongo/util/fail_point.h"
-
-namespace mongo {
-namespace {
-
-using namespace fmt::literals;
-
-/**
- * This test fixture does not create any resharding POSs and should be preferred to
- * `ReshardingDonorRecipientCommonTest` when they are not required.
- */
-class ReshardingDonorRecipientCommonInternalsTest : public ShardServerTestFixture {
-public:
- const UUID kExistingUUID = UUID::gen();
- const Timestamp kExistingTimestamp = Timestamp(10, 5);
- const NamespaceString kOriginalNss = NamespaceString("db", "foo");
-
- const NamespaceString kTemporaryReshardingNss =
- constructTemporaryReshardingNss("db", kExistingUUID);
- const std::string kOriginalShardKey = "oldKey";
- const BSONObj kOriginalShardKeyPattern = BSON(kOriginalShardKey << 1);
- const std::string kReshardingKey = "newKey";
- const BSONObj kReshardingKeyPattern = BSON(kReshardingKey << 1);
- const OID kOriginalEpoch = OID::gen();
- const OID kReshardingEpoch = OID::gen();
- const UUID kReshardingUUID = UUID::gen();
- const Timestamp kReshardingTimestamp = Timestamp(kExistingTimestamp.getSecs() + 1, 0);
-
- const DonorShardFetchTimestamp kThisShard =
- makeDonorShardFetchTimestamp(ShardId("shardOne"), Timestamp(10, 0));
- const DonorShardFetchTimestamp kOtherShard =
- makeDonorShardFetchTimestamp(ShardId("shardTwo"), Timestamp(20, 0));
-
- const std::vector<DonorShardFetchTimestamp> kShards = {kThisShard, kOtherShard};
-
- const Timestamp kCloneTimestamp = Timestamp(20, 0);
-
-protected:
- CollectionMetadata makeShardedMetadataForOriginalCollection(
- OperationContext* opCtx, const ShardId& shardThatChunkExistsOn) {
- return makeShardedMetadata(opCtx,
- kOriginalNss,
- kOriginalShardKey,
- kOriginalShardKeyPattern,
- kExistingUUID,
- kExistingTimestamp,
- kOriginalEpoch,
- shardThatChunkExistsOn);
- }
-
- CollectionMetadata makeShardedMetadataForTemporaryReshardingCollection(
- OperationContext* opCtx, const ShardId& shardThatChunkExistsOn) {
- return makeShardedMetadata(opCtx,
- kTemporaryReshardingNss,
- kReshardingKey,
- kReshardingKeyPattern,
- kReshardingUUID,
- kReshardingTimestamp,
- kReshardingEpoch,
- shardThatChunkExistsOn);
- }
-
- CollectionMetadata makeShardedMetadata(OperationContext* opCtx,
- const NamespaceString& nss,
- const std::string& shardKey,
- const BSONObj& shardKeyPattern,
- const UUID& uuid,
- const Timestamp& timestamp,
- const OID& epoch,
- const ShardId& shardThatChunkExistsOn) {
- auto range = ChunkRange(BSON(shardKey << MINKEY), BSON(shardKey << MAXKEY));
- auto chunk = ChunkType(
- uuid, std::move(range), ChunkVersion(1, 0, epoch, timestamp), shardThatChunkExistsOn);
- ChunkManager cm(kThisShard.getShardId(),
- DatabaseVersion(uuid, timestamp),
- makeStandaloneRoutingTableHistory(
- RoutingTableHistory::makeNew(nss,
- uuid,
- shardKeyPattern,
- nullptr,
- false,
- epoch,
- timestamp,
- boost::none /* timeseriesFields */,
- boost::none,
- boost::none /* chunkSizeBytes */,
- true,
- {std::move(chunk)})),
- boost::none);
-
- if (!OperationShardingState::isOperationVersioned(opCtx)) {
- OperationShardingState::get(opCtx).initializeClientRoutingVersions(
- nss, cm.getVersion(kThisShard.getShardId()), boost::none);
- }
-
- return CollectionMetadata(std::move(cm), kThisShard.getShardId());
- }
-
- ReshardingDonorDocument makeDonorStateDoc() {
- DonorShardContext donorCtx;
- donorCtx.setState(DonorStateEnum::kPreparingToDonate);
-
- ReshardingDonorDocument doc(std::move(donorCtx),
- {kThisShard.getShardId(), kOtherShard.getShardId()});
-
- NamespaceString sourceNss = kOriginalNss;
- auto sourceUUID = UUID::gen();
- auto commonMetadata = CommonReshardingMetadata(
- UUID::gen(), sourceNss, sourceUUID, kTemporaryReshardingNss, kReshardingKeyPattern);
-
- doc.setCommonReshardingMetadata(std::move(commonMetadata));
- return doc;
- }
-
- ReshardingRecipientDocument makeRecipientStateDoc() {
- RecipientShardContext recipCtx;
- recipCtx.setState(RecipientStateEnum::kCloning);
-
- ReshardingRecipientDocument doc(
- std::move(recipCtx), {kThisShard.getShardId(), kOtherShard.getShardId()}, 1000);
-
- NamespaceString sourceNss = kOriginalNss;
- auto sourceUUID = UUID::gen();
- auto commonMetadata = CommonReshardingMetadata(
- UUID::gen(), sourceNss, sourceUUID, kTemporaryReshardingNss, kReshardingKeyPattern);
-
- doc.setCommonReshardingMetadata(std::move(commonMetadata));
-
- // A document in the cloning state requires a clone timestamp.
- doc.setCloneTimestamp(kCloneTimestamp);
- return doc;
- }
-
- ReshardingFields createCommonReshardingFields(const UUID& reshardingUUID,
- CoordinatorStateEnum state) {
- auto fields = ReshardingFields(reshardingUUID);
- fields.setState(state);
- return fields;
- };
-
- void appendDonorFieldsToReshardingFields(ReshardingFields& fields,
- const BSONObj& reshardingKey) {
- std::vector<ShardId> donorShardIds;
- for (const auto& shard : kShards) {
- donorShardIds.emplace_back(shard.getShardId());
- }
-
- fields.setDonorFields(
- TypeCollectionDonorFields(kTemporaryReshardingNss, reshardingKey, donorShardIds));
- }
-
- void appendRecipientFieldsToReshardingFields(
- ReshardingFields& fields,
- const std::vector<DonorShardFetchTimestamp> donorShards,
- const UUID& existingUUID,
- const NamespaceString& originalNss,
- const boost::optional<Timestamp>& cloneTimestamp = boost::none) {
- auto recipientFields =
- TypeCollectionRecipientFields(donorShards, existingUUID, originalNss, 5000);
- emplaceCloneTimestampIfExists(recipientFields, cloneTimestamp);
- fields.setRecipientFields(std::move(recipientFields));
- }
-
- template <class ReshardingDocument>
- void assertCommonDocFieldsMatchReshardingFields(const NamespaceString& nss,
- const UUID& reshardingUUID,
- const UUID& existingUUID,
- const BSONObj& reshardingKey,
- const ReshardingDocument& reshardingDoc) {
- ASSERT_EQ(reshardingDoc.getReshardingUUID(), reshardingUUID);
- ASSERT_EQ(reshardingDoc.getSourceNss(), nss);
- ASSERT_EQ(reshardingDoc.getSourceUUID(), existingUUID);
- ASSERT_BSONOBJ_EQ(reshardingDoc.getReshardingKey().toBSON(), reshardingKey);
- }
-
- void assertDonorDocMatchesReshardingFields(const NamespaceString& nss,
- const UUID& existingUUID,
- const ReshardingFields& reshardingFields,
- const ReshardingDonorDocument& donorDoc) {
- assertCommonDocFieldsMatchReshardingFields<ReshardingDonorDocument>(
- nss,
- reshardingFields.getReshardingUUID(),
- existingUUID,
- reshardingFields.getDonorFields()->getReshardingKey().toBSON(),
- donorDoc);
- ASSERT(donorDoc.getMutableState().getState() == DonorStateEnum::kPreparingToDonate);
- ASSERT(donorDoc.getMutableState().getMinFetchTimestamp() == boost::none);
- }
-
- void assertRecipientDocMatchesReshardingFields(
- const CollectionMetadata& metadata,
- const ReshardingFields& reshardingFields,
- const ReshardingRecipientDocument& recipientDoc) {
- assertCommonDocFieldsMatchReshardingFields<ReshardingRecipientDocument>(
- reshardingFields.getRecipientFields()->getSourceNss(),
- reshardingFields.getReshardingUUID(),
- reshardingFields.getRecipientFields()->getSourceUUID(),
- metadata.getShardKeyPattern().toBSON(),
- recipientDoc);
-
- ASSERT(recipientDoc.getMutableState().getState() ==
- RecipientStateEnum::kAwaitingFetchTimestamp);
- ASSERT(!recipientDoc.getCloneTimestamp());
-
- const auto donorShards = reshardingFields.getRecipientFields()->getDonorShards();
- std::map<ShardId, DonorShardFetchTimestamp> donorShardMap;
- for (const auto& donor : donorShards) {
- donorShardMap.emplace(donor.getShardId(), donor);
- }
-
- for (const auto& donorShardFromRecipientDoc : recipientDoc.getDonorShards()) {
- auto donorIter = donorShardMap.find(donorShardFromRecipientDoc.getShardId());
- ASSERT(donorIter != donorShardMap.end());
- ASSERT_EQ(donorIter->second.getMinFetchTimestamp().has_value(),
- donorShardFromRecipientDoc.getMinFetchTimestamp().has_value());
-
- if (donorIter->second.getMinFetchTimestamp()) {
- ASSERT_EQ(*donorIter->second.getMinFetchTimestamp(),
- *donorShardFromRecipientDoc.getMinFetchTimestamp());
- }
-
- donorShardMap.erase(donorShardFromRecipientDoc.getShardId());
- }
-
- ASSERT(donorShardMap.empty());
- }
-
-private:
- DonorShardFetchTimestamp makeDonorShardFetchTimestamp(
- ShardId shardId, boost::optional<Timestamp> fetchTimestamp) {
- DonorShardFetchTimestamp donorFetchTimestamp(shardId);
- donorFetchTimestamp.setMinFetchTimestamp(fetchTimestamp);
- return donorFetchTimestamp;
- }
-};
-
-/**
- * This fixture starts with the above internals test and also creates (notably) the resharding donor
- * and recipient POSs.
- */
-class ReshardingDonorRecipientCommonTest : public ReshardingDonorRecipientCommonInternalsTest {
-public:
- void setUp() override {
- ShardServerTestFixture::setUp();
-
- WaitForMajorityService::get(getServiceContext()).startup(getServiceContext());
-
- _primaryOnlyServiceRegistry = repl::PrimaryOnlyServiceRegistry::get(getServiceContext());
-
- std::unique_ptr<ReshardingDonorService> donorService =
- std::make_unique<ReshardingDonorService>(getServiceContext());
- _primaryOnlyServiceRegistry->registerService(std::move(donorService));
-
- std::unique_ptr<ReshardingRecipientService> recipientService =
- std::make_unique<ReshardingRecipientService>(getServiceContext());
- _primaryOnlyServiceRegistry->registerService(std::move(recipientService));
- _primaryOnlyServiceRegistry->onStartup(operationContext());
-
- stepUp();
- }
-
- void tearDown() override {
- WaitForMajorityService::get(getServiceContext()).shutDown();
-
- Grid::get(operationContext())->getExecutorPool()->shutdownAndJoin();
-
- _primaryOnlyServiceRegistry->onShutdown();
-
- Grid::get(operationContext())->clearForUnitTests();
-
- ShardServerTestFixture::tearDown();
- }
-
- void stepUp() {
- auto replCoord = repl::ReplicationCoordinator::get(getServiceContext());
-
- // Advance term
- _term++;
-
- ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_PRIMARY));
- ASSERT_OK(replCoord->updateTerm(operationContext(), _term));
- replCoord->setMyLastAppliedOpTimeAndWallTime(
- repl::OpTimeAndWallTime(repl::OpTime(Timestamp(1, 1), _term), Date_t()));
-
- _primaryOnlyServiceRegistry->onStepUpComplete(operationContext(), _term);
- }
-
-protected:
- repl::PrimaryOnlyServiceRegistry* _primaryOnlyServiceRegistry;
- long long _term = 0;
-};
-
-} // namespace
-
-} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_oplog_application.cpp b/src/mongo/db/s/resharding/resharding_oplog_application.cpp
index 7e2aacc030e..0842f9aa88c 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_application.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_application.cpp
@@ -81,8 +81,7 @@ void runWithTransaction(OperationContext* opCtx,
// the temporary resharding collection. We attach shard version IGNORED to the write operations
// and leave it to ReshardingOplogBatchApplier::applyBatch() to retry on a StaleConfig exception
// to allow the collection metadata information to be recovered.
- auto& oss = OperationShardingState::get(asr.opCtx());
- oss.initializeClientRoutingVersions(nss, ChunkVersion::IGNORED(), boost::none);
+ ScopedSetShardRole scopedSetShardRole(asr.opCtx(), nss, ChunkVersion::IGNORED(), boost::none);
MongoDOperationContextSession ocs(asr.opCtx());
auto txnParticipant = TransactionParticipant::get(asr.opCtx());
diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_applier.cpp b/src/mongo/db/s/resharding/resharding_oplog_batch_applier.cpp
index ee732a68207..e675dca01a8 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_batch_applier.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_batch_applier.cpp
@@ -85,11 +85,11 @@ SemiFuture<void> ReshardingOplogBatchApplier::applyBatch(
// attach shard version IGNORED to the write operations and retry once
// on a StaleConfig exception to allow the collection metadata
// information to be recovered.
- auto& oss = OperationShardingState::get(opCtx.get());
- oss.initializeClientRoutingVersions(
+ ScopedSetShardRole scopedSetShardRole(
+ opCtx.get(),
_crudApplication.getOutputNss(),
ChunkVersion::IGNORED() /* shardVersion */,
- boost::none /* dbVersion */);
+ boost::none /* databaseVersion */);
resharding::data_copy::withOneStaleConfigRetry(opCtx.get(), [&] {
uassertStatusOK(
diff --git a/src/mongo/db/s/sharding_ddl_coordinator.cpp b/src/mongo/db/s/sharding_ddl_coordinator.cpp
index ffc0e4c7741..3c05fbc25c0 100644
--- a/src/mongo/db/s/sharding_ddl_coordinator.cpp
+++ b/src/mongo/db/s/sharding_ddl_coordinator.cpp
@@ -196,8 +196,12 @@ SemiFuture<void> ShardingDDLCoordinator::run(std::shared_ptr<executor::ScopedTas
auto* opCtx = opCtxHolder.get();
invariant(metadata().getDatabaseVersion());
- OperationShardingState::get(opCtx).initializeClientRoutingVersions(
- nss(), boost::none /* ChunkVersion */, metadata().getDatabaseVersion());
+ ScopedSetShardRole scopedSetShardRole(
+ opCtx,
+ nss(),
+ boost::none /* shardVersion */,
+ metadata().getDatabaseVersion() /* databaseVersion */);
+
// Check under the dbLock if this is still the primary shard for the database
DatabaseShardingState::checkIsPrimaryShardForDb(opCtx, nss().db());
};
diff --git a/src/mongo/db/s/sharding_write_router_bm.cpp b/src/mongo/db/s/sharding_write_router_bm.cpp
index d7da519f6d5..1118117e90d 100644
--- a/src/mongo/db/s/sharding_write_router_bm.cpp
+++ b/src/mongo/db/s/sharding_write_router_bm.cpp
@@ -147,8 +147,8 @@ std::unique_ptr<CatalogCacheMock> createCatalogCacheMock(OperationContext* opCtx
opCtx->getServiceContext(),
std::make_unique<CollectionShardingStateFactoryShard>(opCtx->getServiceContext()));
- OperationShardingState::get(opCtx).initializeClientRoutingVersions(
- kNss, chunkManager.getVersion(originatorShard), boost::none);
+ OperationShardingState::setShardRole(
+ opCtx, kNss, chunkManager.getVersion(originatorShard), boost::none);
// Configuring the filtering metadata such that calls to getCollectionDescription return what we
// want. Specifically the reshardingFields are what we use. Its specified by the chunkManager.
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index 84d99a67405..2cfb3f2286e 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -1575,8 +1575,8 @@ void ExecCommandDatabase::_initiateCommand() {
databaseVersion = DatabaseVersion(databaseVersionElem.Obj());
}
- OperationShardingState::get(opCtx).initializeClientRoutingVersions(
- namespaceForSharding, shardVersion, databaseVersion);
+ OperationShardingState::setShardRole(
+ opCtx, namespaceForSharding, shardVersion, databaseVersion);
}
_scoped = _execContext->behaviors->scopedOperationCompletionShardingActions(opCtx);
diff --git a/src/mongo/s/chunk_version.h b/src/mongo/s/chunk_version.h
index f176e4f0cea..5aca379983e 100644
--- a/src/mongo/s/chunk_version.h
+++ b/src/mongo/s/chunk_version.h
@@ -269,8 +269,11 @@ private:
};
inline std::ostream& operator<<(std::ostream& s, const ChunkVersion& v) {
- s << v.toString();
- return s;
+ return s << v.toString();
+}
+
+inline StringBuilder& operator<<(StringBuilder& s, const ChunkVersion& v) {
+ return s << v.toString();
}
} // namespace mongo
diff --git a/src/mongo/s/database_version.cpp b/src/mongo/s/database_version.cpp
index 37ca371e001..df160cd1eda 100644
--- a/src/mongo/s/database_version.cpp
+++ b/src/mongo/s/database_version.cpp
@@ -52,4 +52,8 @@ bool DatabaseVersion::operator<(const DatabaseVersion& other) const {
}
}
+std::string DatabaseVersion::toString() const {
+ return BSON("dbVersion" << toBSON()).toString();
+}
+
} // namespace mongo
diff --git a/src/mongo/s/database_version.h b/src/mongo/s/database_version.h
index a798f76aa28..5de01497318 100644
--- a/src/mongo/s/database_version.h
+++ b/src/mongo/s/database_version.h
@@ -107,6 +107,16 @@ public:
UUID getUuid() const {
return *DatabaseVersionBase::getUuid();
}
+
+ std::string toString() const;
};
+inline std::ostream& operator<<(std::ostream& s, const DatabaseVersion& v) {
+ return s << v.toString();
+}
+
+inline StringBuilder& operator<<(StringBuilder& s, const DatabaseVersion& v) {
+ return s << v.toString();
+}
+
} // namespace mongo
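
The StringBuilder stream operators and DatabaseVersion::toString() added in chunk_version.h and database_version.h/.cpp exist so that ChunkVersion and DatabaseVersion values can be streamed straight into the str::stream() error messages built by the new setShardRole() uasserts. A small hedged sketch of that usage; the helper function is made up, and the error code is the one used by setShardRole() in this commit:

    // Sketch only: mirrors how setShardRole() formats its error text after this commit.
    void assertExpectedDbVersion(const DatabaseVersion& current, const DatabaseVersion& incoming) {
        uassert(640571,
                str::stream() << "Illegal attempt to change the expected database version from "
                              << current << " to " << incoming,
                current == incoming);
    }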