summaryrefslogtreecommitdiff
path: root/src/mongo/db/s
diff options
context:
space:
mode:
authorRandolph Tan <randolph@10gen.com>2021-09-21 16:19:56 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-09-21 17:03:47 +0000
commit800c6d40912751e272853e383f4c4bf1f00e5c88 (patch)
treeb65e26e73d5a26fdca4ec297fc0c283cb90f29b4 /src/mongo/db/s
parent536cd38e7ab615181f5586519f60728c52e47108 (diff)
downloadmongo-800c6d40912751e272853e383f4c4bf1f00e5c88.tar.gz
SERVER-58915 Implement ReshardingDonorWriteRouter functionality along…
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r--src/mongo/db/s/SConscript40
-rw-r--r--src/mongo/db/s/config/config_server_test_fixture.cpp10
-rw-r--r--src/mongo/db/s/config/config_server_test_fixture.h3
-rw-r--r--src/mongo/db/s/op_observer_sharding_impl.cpp19
-rw-r--r--src/mongo/db/s/op_observer_sharding_impl.h15
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp4
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp4
-rw-r--r--src/mongo/db/s/resharding_destined_recipient_test.cpp25
-rw-r--r--src/mongo/db/s/resharding_donor_write_router.cpp55
-rw-r--r--src/mongo/db/s/resharding_util.cpp55
-rw-r--r--src/mongo/db/s/resharding_util.h10
-rw-r--r--src/mongo/db/s/sharding_initialization_op_observer_test.cpp2
-rw-r--r--src/mongo/db/s/sharding_mongod_test_fixture.cpp108
-rw-r--r--src/mongo/db/s/sharding_mongod_test_fixture.h5
-rw-r--r--src/mongo/db/s/sharding_write_router.cpp85
-rw-r--r--src/mongo/db/s/sharding_write_router.h (renamed from src/mongo/db/s/resharding_donor_write_router.h)24
16 files changed, 208 insertions, 256 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index da18f30056d..e11ff37a646 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -16,9 +16,9 @@ env.Library(
'collection_sharding_state.cpp',
'database_sharding_state.cpp',
'operation_sharding_state.cpp',
- 'resharding_donor_write_router.cpp',
'sharding_migration_critical_section.cpp',
'sharding_state.cpp',
+ 'sharding_write_router.cpp',
'transaction_coordinator_curop.cpp',
'transaction_coordinator_factory.cpp',
'transaction_coordinator_worker_curop_repository.cpp',
@@ -27,6 +27,7 @@ env.Library(
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/concurrency/lock_manager',
'$BUILD_DIR/mongo/db/range_arithmetic',
+ '$BUILD_DIR/mongo/s/grid',
'$BUILD_DIR/mongo/s/sharding_routing_table',
],
)
@@ -61,7 +62,12 @@ env.Library(
'range_deletion_util.cpp',
'read_only_catalog_cache_loader.cpp',
'recoverable_critical_section_service.cpp',
+ 'resharding_util.cpp',
+ 'resharding/coordinator_document.idl','resharding/document_source_resharding_iterate_transaction.cpp',
'resharding/document_source_resharding_ownership_match.cpp',
+ 'resharding/donor_document.idl',
+ 'resharding/donor_oplog_id.idl',
+ 'resharding/recipient_document.idl',
'resharding/resharding_change_event_o2_field.idl',
'resharding/resharding_collection_cloner.cpp',
'resharding/resharding_coordinator_commit_monitor.cpp',
@@ -145,35 +151,6 @@ env.Library(
'$BUILD_DIR/mongo/db/session_catalog',
'$BUILD_DIR/mongo/idl/server_parameter',
'$BUILD_DIR/mongo/util/future_util',
- 'resharding_util',
- ],
-)
-
-# Be careful about adding dependencies to this library, as any dependencies will currently be
-# included in mongo_embedded.
-env.Library(
- target='resharding_util',
- source=[
- 'resharding_util.cpp',
- 'resharding/coordinator_document.idl',
- 'resharding/document_source_resharding_iterate_transaction.cpp',
- 'resharding/donor_document.idl',
- 'resharding/donor_oplog_id.idl',
- 'resharding/recipient_document.idl',
- ],
- LIBDEPS_PRIVATE=[
- '$BUILD_DIR/mongo/base',
- '$BUILD_DIR/mongo/db/catalog/collection_options',
- '$BUILD_DIR/mongo/db/common',
- '$BUILD_DIR/mongo/db/concurrency/write_conflict_exception',
- '$BUILD_DIR/mongo/db/curop',
- '$BUILD_DIR/mongo/db/namespace_string',
- '$BUILD_DIR/mongo/db/pipeline/expression_context',
- '$BUILD_DIR/mongo/db/pipeline/pipeline',
- '$BUILD_DIR/mongo/db/storage/write_unit_of_work',
- '$BUILD_DIR/mongo/s/async_requests_sender',
- '$BUILD_DIR/mongo/s/grid',
- 'sharding_api_d',
],
)
@@ -392,7 +369,6 @@ env.Library(
'$BUILD_DIR/mongo/s/commands/shared_cluster_commands',
'$BUILD_DIR/mongo/s/sharding_initialization',
'$BUILD_DIR/mongo/s/sharding_router_api',
- 'resharding_util',
'sharding_runtime_d',
],
)
@@ -543,7 +519,6 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture',
'$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_mock',
'$BUILD_DIR/mongo/s/sharding_router_test_fixture',
- 'resharding_util',
'shard_server_test_fixture',
'sharding_commands_d',
'sharding_logging',
@@ -597,7 +572,6 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/db/timeseries/timeseries_options',
'$BUILD_DIR/mongo/util/version_impl',
'config_server_test_fixture',
- 'resharding_util',
],
)
diff --git a/src/mongo/db/s/config/config_server_test_fixture.cpp b/src/mongo/db/s/config/config_server_test_fixture.cpp
index 3144add6236..aa573a94deb 100644
--- a/src/mongo/db/s/config/config_server_test_fixture.cpp
+++ b/src/mongo/db/s/config/config_server_test_fixture.cpp
@@ -43,6 +43,8 @@
#include "mongo/db/commands.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/op_observer.h"
+#include "mongo/db/op_observer_impl.h"
+#include "mongo/db/op_observer_registry.h"
#include "mongo/db/ops/write_ops.h"
#include "mongo/db/query/cursor_response.h"
#include "mongo/db/query/query_request_helper.h"
@@ -51,6 +53,7 @@
#include "mongo/db/repl/repl_settings.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/s/config/sharding_catalog_manager.h"
+#include "mongo/db/s/config_server_op_observer.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
@@ -473,4 +476,11 @@ void ConfigServerTestFixture::expectSetShardVersion(
});
}
+void ConfigServerTestFixture::setupOpObservers() {
+ auto opObserverRegistry =
+ checked_cast<OpObserverRegistry*>(getServiceContext()->getOpObserver());
+ opObserverRegistry->addObserver(std::make_unique<OpObserverImpl>());
+ opObserverRegistry->addObserver(std::make_unique<ConfigServerOpObserver>());
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/config/config_server_test_fixture.h b/src/mongo/db/s/config/config_server_test_fixture.h
index 5e00755a652..821b8987406 100644
--- a/src/mongo/db/s/config/config_server_test_fixture.h
+++ b/src/mongo/db/s/config/config_server_test_fixture.h
@@ -194,6 +194,9 @@ protected:
std::unique_ptr<BalancerConfiguration> makeBalancerConfiguration() override;
+protected:
+ void setupOpObservers() override;
+
private:
/**
* 'onPreInitGlobalStateFn' is invoked near the end of _setUp() before calling
diff --git a/src/mongo/db/s/op_observer_sharding_impl.cpp b/src/mongo/db/s/op_observer_sharding_impl.cpp
index b55ef001bd0..319db36b61b 100644
--- a/src/mongo/db/s/op_observer_sharding_impl.cpp
+++ b/src/mongo/db/s/op_observer_sharding_impl.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/s/migration_chunk_cloner_source_legacy.h"
#include "mongo/db/s/migration_source_manager.h"
#include "mongo/db/s/resharding_util.h"
+#include "mongo/db/s/sharding_write_router.h"
#include "mongo/logv2/log.h"
namespace mongo {
@@ -121,12 +122,13 @@ void OpObserverShardingImpl::shardObserveInsertOp(OperationContext* opCtx,
const NamespaceString nss,
const BSONObj& insertedDoc,
const repl::OpTime& opTime,
- CollectionShardingState* css,
+ const ShardingWriteRouter& shardingWriteRouter,
const bool fromMigrate,
const bool inMultiDocumentTransaction) {
if (nss == NamespaceString::kSessionTransactionsTableNamespace || fromMigrate)
return;
+ auto css = shardingWriteRouter.getCollectionShardingState();
auto* const csr = CollectionShardingRuntime::get(css);
csr->checkShardVersionOrThrow(opCtx);
@@ -160,9 +162,10 @@ void OpObserverShardingImpl::shardObserveUpdateOp(OperationContext* opCtx,
boost::optional<BSONObj> preImageDoc,
const BSONObj& postImageDoc,
const repl::OpTime& opTime,
- CollectionShardingState* css,
+ const ShardingWriteRouter& shardingWriteRouter,
const repl::OpTime& prePostImageOpTime,
const bool inMultiDocumentTransaction) {
+ auto css = shardingWriteRouter.getCollectionShardingState();
auto* const csr = CollectionShardingRuntime::get(css);
csr->checkShardVersionOrThrow(opCtx);
@@ -195,9 +198,10 @@ void OpObserverShardingImpl::shardObserveDeleteOp(OperationContext* opCtx,
const NamespaceString nss,
const BSONObj& documentKey,
const repl::OpTime& opTime,
- CollectionShardingState* css,
+ const ShardingWriteRouter& shardingWriteRouter,
const repl::OpTime& preImageOpTime,
const bool inMultiDocumentTransaction) {
+ auto css = shardingWriteRouter.getCollectionShardingState();
auto* const csr = CollectionShardingRuntime::get(css);
csr->checkShardVersionOrThrow(opCtx);
@@ -237,13 +241,4 @@ void OpObserverShardingImpl::shardObserveTransactionPrepareOrUnpreparedCommit(
opCtx->getServiceContext(), stmts, prepareOrCommitOptime));
}
-void OpObserverShardingImpl::shardAnnotateOplogEntry(OperationContext* opCtx,
- const NamespaceString nss,
- const BSONObj& doc,
- repl::DurableReplOperation& op,
- CollectionShardingState* css,
- const ScopedCollectionDescription& collDesc) {
- op.setDestinedRecipient(getDestinedRecipient(opCtx, nss, doc, css, collDesc));
-}
-
} // namespace mongo
diff --git a/src/mongo/db/s/op_observer_sharding_impl.h b/src/mongo/db/s/op_observer_sharding_impl.h
index 0fafcf833ff..f9005497c57 100644
--- a/src/mongo/db/s/op_observer_sharding_impl.h
+++ b/src/mongo/db/s/op_observer_sharding_impl.h
@@ -33,6 +33,8 @@
namespace mongo {
+class ShardingWriteRouter;
+
class OpObserverShardingImpl : public OpObserverImpl {
public:
// True if the document being deleted belongs to a chunk which, while still in the shard,
@@ -50,7 +52,7 @@ protected:
NamespaceString nss,
const BSONObj& insertedDoc,
const repl::OpTime& opTime,
- CollectionShardingState* css,
+ const ShardingWriteRouter& shardingWriteRouter,
bool fromMigrate,
bool inMultiDocumentTransaction) override;
void shardObserveUpdateOp(OperationContext* opCtx,
@@ -58,27 +60,20 @@ protected:
boost::optional<BSONObj> preImageDoc,
const BSONObj& updatedDoc,
const repl::OpTime& opTime,
- CollectionShardingState* css,
+ const ShardingWriteRouter& shardingWriteRouter,
const repl::OpTime& prePostImageOpTime,
bool inMultiDocumentTransaction) override;
void shardObserveDeleteOp(OperationContext* opCtx,
NamespaceString nss,
const BSONObj& documentKey,
const repl::OpTime& opTime,
- CollectionShardingState* css,
+ const ShardingWriteRouter& shardingWriteRouter,
const repl::OpTime& preImageOpTime,
bool inMultiDocumentTransaction) override;
void shardObserveTransactionPrepareOrUnpreparedCommit(
OperationContext* opCtx,
const std::vector<repl::ReplOperation>& stmts,
const repl::OpTime& prepareOrCommitOptime) override;
-
- void shardAnnotateOplogEntry(OperationContext* opCtx,
- NamespaceString nss,
- const BSONObj& doc,
- repl::DurableReplOperation& op,
- CollectionShardingState* css,
- const ScopedCollectionDescription& collDesc) override;
};
} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp
index c437eb3ec05..c72b4a2122a 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp
@@ -73,6 +73,10 @@ public:
ServiceContextMongoDTest::setUp();
auto serviceContext = getServiceContext();
+
+ // Initialize sharding components as a shard server.
+ serverGlobalParams.clusterRole = ClusterRole::ShardServer;
+
{
auto opCtx = makeOperationContext();
auto replCoord = std::make_unique<repl::ReplicationCoordinatorMock>(serviceContext);
diff --git a/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp
index 3a96cd1672e..02889248639 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp
@@ -65,6 +65,10 @@ public:
ServiceContextMongoDTest::setUp();
auto serviceContext = getServiceContext();
+
+ // Initialize sharding components as a shard server.
+ serverGlobalParams.clusterRole = ClusterRole::ShardServer;
+
{
auto opCtx = makeOperationContext();
auto replCoord = std::make_unique<repl::ReplicationCoordinatorMock>(serviceContext);
diff --git a/src/mongo/db/s/resharding_destined_recipient_test.cpp b/src/mongo/db/s/resharding_destined_recipient_test.cpp
index 676154a6a44..a3ba879cac5 100644
--- a/src/mongo/db/s/resharding_destined_recipient_test.cpp
+++ b/src/mongo/db/s/resharding_destined_recipient_test.cpp
@@ -40,10 +40,10 @@
#include "mongo/db/repl/wait_for_majority_service.h"
#include "mongo/db/s/collection_sharding_runtime.h"
#include "mongo/db/s/operation_sharding_state.h"
-#include "mongo/db/s/resharding_util.h"
#include "mongo/db/s/shard_filtering_metadata_refresh.h"
#include "mongo/db/s/shard_server_test_fixture.h"
#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/s/sharding_write_router.h"
#include "mongo/db/session_catalog_mongod.h"
#include "mongo/db/transaction_participant.h"
#include "mongo/s/catalog/sharding_catalog_client_mock.h"
@@ -298,11 +298,10 @@ TEST_F(DestinedRecipientTest, TestGetDestinedRecipient) {
AutoGetCollection coll(opCtx, kNss, MODE_IX);
OperationShardingState::get(opCtx).initializeClientRoutingVersions(
kNss, env.version, env.dbVersion);
- auto* const css = CollectionShardingState::get(opCtx, kNss);
- auto collDesc = css->getCollectionDescription(opCtx);
+ ShardingWriteRouter shardingWriteRouter(opCtx, kNss, Grid::get(opCtx)->catalogCache());
auto destShardId =
- getDestinedRecipient(opCtx, kNss, BSON("x" << 2 << "y" << 10), css, collDesc);
+ shardingWriteRouter.getReshardingDestinedRecipient(BSON("x" << 2 << "y" << 10));
ASSERT(destShardId);
ASSERT_EQ(*destShardId, env.destShard);
}
@@ -315,18 +314,16 @@ TEST_F(DestinedRecipientTest, TestGetDestinedRecipientThrowsOnBlockedRefresh) {
AutoGetCollection coll(opCtx, kNss, MODE_IX);
OperationShardingState::get(opCtx).initializeClientRoutingVersions(
kNss, env.version, env.dbVersion);
- auto* const css = CollectionShardingState::get(opCtx, kNss);
- auto collDesc = css->getCollectionDescription(opCtx);
FailPointEnableBlock failPoint("blockCollectionCacheLookup");
- ASSERT_THROWS_WITH_CHECK(
- getDestinedRecipient(opCtx, kNss, BSON("x" << 2 << "y" << 10), css, collDesc),
- ShardCannotRefreshDueToLocksHeldException,
- [&](const ShardCannotRefreshDueToLocksHeldException& ex) {
- const auto refreshInfo = ex.extraInfo<ShardCannotRefreshDueToLocksHeldInfo>();
- ASSERT(refreshInfo);
- ASSERT_EQ(refreshInfo->getNss(), env.tempNss);
- });
+ ASSERT_THROWS_WITH_CHECK(ShardingWriteRouter(opCtx, kNss, Grid::get(opCtx)->catalogCache()),
+ ShardCannotRefreshDueToLocksHeldException,
+ [&](const ShardCannotRefreshDueToLocksHeldException& ex) {
+ const auto refreshInfo =
+ ex.extraInfo<ShardCannotRefreshDueToLocksHeldInfo>();
+ ASSERT(refreshInfo);
+ ASSERT_EQ(refreshInfo->getNss(), env.tempNss);
+ });
}
auto sw = catalogCache()->getCollectionRoutingInfoWithRefresh(opCtx, env.tempNss);
diff --git a/src/mongo/db/s/resharding_donor_write_router.cpp b/src/mongo/db/s/resharding_donor_write_router.cpp
deleted file mode 100644
index d5f4315928f..00000000000
--- a/src/mongo/db/s/resharding_donor_write_router.cpp
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Copyright (C) 2021-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/db/s/resharding_donor_write_router.h"
-
-namespace mongo {
-
-ReshardingDonorWriteRouter::ReshardingDonorWriteRouter(OperationContext* opCtx,
- const NamespaceString& sourceNss,
- CatalogCache* catalogCache)
- : _css(nullptr), _collDesc(nullptr) {}
-
-ReshardingDonorWriteRouter::ReshardingDonorWriteRouter(OperationContext* opCtx,
- const NamespaceString& sourceNss,
- CatalogCache* catalogCache,
- CollectionShardingState* css,
- const ScopedCollectionDescription* collDesc)
- : _css(css), _collDesc(collDesc) {}
-
-CollectionShardingState* ReshardingDonorWriteRouter::getCollectionShardingState() const {
- return nullptr;
-}
-
-boost::optional<ShardId> ReshardingDonorWriteRouter::getDestinedRecipient(
- const BSONObj& fullDocument) const {
- return boost::none;
-}
-
-} // namespace mongo
diff --git a/src/mongo/db/s/resharding_util.cpp b/src/mongo/db/s/resharding_util.cpp
index baec1d6281d..247fd986682 100644
--- a/src/mongo/db/s/resharding_util.cpp
+++ b/src/mongo/db/s/resharding_util.cpp
@@ -60,31 +60,6 @@
namespace mongo {
using namespace fmt::literals;
-namespace {
-
-UUID getCollectionUuid(OperationContext* opCtx, const NamespaceString& nss) {
- dassert(opCtx->lockState()->isCollectionLockedForMode(nss, MODE_IS));
-
- auto uuid = CollectionCatalog::get(opCtx)->lookupUUIDByNSS(opCtx, nss);
- invariant(uuid);
-
- return *uuid;
-}
-
-// Ensure that this shard owns the document. This must be called after verifying that we
-// are in a resharding operation so that we are guaranteed that migrations are suspended.
-bool documentBelongsToMe(OperationContext* opCtx,
- CollectionShardingState* css,
- const ScopedCollectionDescription& collDesc,
- const BSONObj& doc) {
- auto currentKeyPattern = ShardKeyPattern(collDesc.getKeyPattern());
- auto ownershipFilter = css->getOwnershipFilter(
- opCtx, CollectionShardingState::OrphanCleanupPolicy::kAllowOrphanCleanup);
-
- return ownershipFilter.keyBelongsToMe(currentKeyPattern.extractShardKeyFromDoc(doc));
-}
-} // namespace
-
BSONObj serializeAndTruncateReshardingErrorIfNeeded(Status originalError) {
BSONObjBuilder originalBob;
originalError.serializeErrorToBSON(&originalBob);
@@ -335,36 +310,6 @@ std::unique_ptr<Pipeline, PipelineDeleter> createOplogFetchingPipelineForReshard
return Pipeline::create(std::move(stages), expCtx);
}
-boost::optional<ShardId> getDestinedRecipient(OperationContext* opCtx,
- const NamespaceString& sourceNss,
- const BSONObj& fullDocument,
- CollectionShardingState* css,
- const ScopedCollectionDescription& collDesc) {
- if (!ShardingState::get(opCtx)->enabled()) {
- // Don't bother looking up the sharding state for the collection if the server isn't even
- // running with sharding enabled. We know there couldn't possibly be any resharding fields.
- return boost::none;
- }
-
- auto reshardingKeyPattern = collDesc.getReshardingKeyIfShouldForwardOps();
- if (!reshardingKeyPattern)
- return boost::none;
-
- if (!documentBelongsToMe(opCtx, css, collDesc, fullDocument))
- return boost::none;
-
- bool allowLocks = true;
- auto tempNssRoutingInfo =
- uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(
- opCtx,
- constructTemporaryReshardingNss(sourceNss.db(), getCollectionUuid(opCtx, sourceNss)),
- allowLocks));
-
- auto shardKey = reshardingKeyPattern->extractShardKeyFromDocThrows(fullDocument);
-
- return tempNssRoutingInfo.findIntersectingChunkWithSimpleCollation(shardKey).getShardId();
-}
-
bool isFinalOplog(const repl::OplogEntry& oplog) {
if (oplog.getOpType() != repl::OpTypeEnum::kNoop) {
return false;
diff --git a/src/mongo/db/s/resharding_util.h b/src/mongo/db/s/resharding_util.h
index 0e8646a0f88..d38b2c8742c 100644
--- a/src/mongo/db/s/resharding_util.h
+++ b/src/mongo/db/s/resharding_util.h
@@ -281,16 +281,6 @@ std::unique_ptr<Pipeline, PipelineDeleter> createOplogFetchingPipelineForReshard
const ShardId& recipientShard);
/**
- * Returns the shard Id of the recipient shard that would own the document under the new shard
- * key pattern.
- */
-boost::optional<ShardId> getDestinedRecipient(OperationContext* opCtx,
- const NamespaceString& sourceNss,
- const BSONObj& fullDocument,
- CollectionShardingState* css,
- const ScopedCollectionDescription& collDesc);
-
-/**
* Sentinel oplog format:
* {
* op: "n",
diff --git a/src/mongo/db/s/sharding_initialization_op_observer_test.cpp b/src/mongo/db/s/sharding_initialization_op_observer_test.cpp
index d93a9d03212..f88fc027875 100644
--- a/src/mongo/db/s/sharding_initialization_op_observer_test.cpp
+++ b/src/mongo/db/s/sharding_initialization_op_observer_test.cpp
@@ -61,6 +61,8 @@ public:
void setUp() override {
ShardingMongodTestFixture::setUp();
+ serverGlobalParams.clusterRole = ClusterRole::ShardServer;
+
// NOTE: this assumes that globalInit will always be called on the same thread as the main
// test thread
ShardingInitializationMongoD::get(operationContext())
diff --git a/src/mongo/db/s/sharding_mongod_test_fixture.cpp b/src/mongo/db/s/sharding_mongod_test_fixture.cpp
index f81cda6ba08..abf6df3ff4f 100644
--- a/src/mongo/db/s/sharding_mongod_test_fixture.cpp
+++ b/src/mongo/db/s/sharding_mongod_test_fixture.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/client.h"
#include "mongo/db/commands.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/op_observer_impl.h"
#include "mongo/db/op_observer_registry.h"
#include "mongo/db/query/cursor_response.h"
#include "mongo/db/repl/drop_pending_collection_reaper.h"
@@ -84,58 +85,7 @@ using repl::ReplicationCoordinatorMock;
using repl::ReplSettings;
using unittest::assertGet;
-ShardingMongodTestFixture::ShardingMongodTestFixture() {
- const auto service = getServiceContext();
-
- // Set up this node as shard node, which is part of a replica set
-
- repl::ReplSettings replSettings;
- replSettings.setOplogSizeBytes(512'000);
- replSettings.setReplSetString(ConnectionString::forReplicaSet(_setName, _servers).toString());
- auto replCoordPtr = makeReplicationCoordinator(replSettings);
- _replCoord = replCoordPtr.get();
-
- BSONArrayBuilder serversBob;
- for (size_t i = 0; i < _servers.size(); ++i) {
- serversBob.append(BSON("host" << _servers[i].toString() << "_id" << static_cast<int>(i)));
- }
-
- auto replSetConfig =
- repl::ReplSetConfig::parse(BSON("_id" << _setName << "protocolVersion" << 1 << "version"
- << 3 << "members" << serversBob.arr()));
- replCoordPtr->setGetConfigReturnValue(replSetConfig);
-
- repl::ReplicationCoordinator::set(service, std::move(replCoordPtr));
-
- auto storagePtr = std::make_unique<repl::StorageInterfaceMock>();
-
- repl::DropPendingCollectionReaper::set(
- service, std::make_unique<repl::DropPendingCollectionReaper>(storagePtr.get()));
-
- repl::ReplicationProcess::set(service,
- std::make_unique<repl::ReplicationProcess>(
- storagePtr.get(),
- std::make_unique<repl::ReplicationConsistencyMarkersMock>(),
- std::make_unique<repl::ReplicationRecoveryMock>()));
-
- auto uniqueOpCtx = makeOperationContext();
- ASSERT_OK(
- repl::ReplicationProcess::get(uniqueOpCtx.get())->initializeRollbackID(uniqueOpCtx.get()));
-
- repl::StorageInterface::set(service, std::move(storagePtr));
-
- auto opObserver = checked_cast<OpObserverRegistry*>(service->getOpObserver());
- opObserver->addObserver(std::make_unique<OpObserverShardingImpl>());
- opObserver->addObserver(std::make_unique<ConfigServerOpObserver>());
- opObserver->addObserver(std::make_unique<ShardServerOpObserver>());
-
- repl::createOplog(uniqueOpCtx.get());
-
- // Set the highest FCV because otherwise it defaults to the lower FCV. This way we default to
- // testing this release's code, not backwards compatibility code.
- // (Generic FCV reference): This FCV reference should exist across LTS binary versions.
- serverGlobalParams.mutableFeatureCompatibility.setVersion(multiversion::GenericFCV::kLatest);
-}
+ShardingMongodTestFixture::ShardingMongodTestFixture() {}
ShardingMongodTestFixture::~ShardingMongodTestFixture() = default;
@@ -284,6 +234,53 @@ Status ShardingMongodTestFixture::initializeGlobalShardingStateForMongodForTest(
void ShardingMongodTestFixture::setUp() {
ServiceContextMongoDTest::setUp();
ShardingTestFixtureCommon::setUp();
+
+ const auto service = getServiceContext();
+
+ // Set up this node as shard node, which is part of a replica set
+
+ repl::ReplSettings replSettings;
+ replSettings.setOplogSizeBytes(512'000);
+ replSettings.setReplSetString(ConnectionString::forReplicaSet(_setName, _servers).toString());
+ auto replCoordPtr = makeReplicationCoordinator(replSettings);
+ _replCoord = replCoordPtr.get();
+
+ BSONArrayBuilder serversBob;
+ for (size_t i = 0; i < _servers.size(); ++i) {
+ serversBob.append(BSON("host" << _servers[i].toString() << "_id" << static_cast<int>(i)));
+ }
+
+ auto replSetConfig =
+ repl::ReplSetConfig::parse(BSON("_id" << _setName << "protocolVersion" << 1 << "version"
+ << 3 << "members" << serversBob.arr()));
+ replCoordPtr->setGetConfigReturnValue(replSetConfig);
+
+ repl::ReplicationCoordinator::set(service, std::move(replCoordPtr));
+
+ auto storagePtr = std::make_unique<repl::StorageInterfaceMock>();
+
+ repl::DropPendingCollectionReaper::set(
+ service, std::make_unique<repl::DropPendingCollectionReaper>(storagePtr.get()));
+
+ repl::ReplicationProcess::set(service,
+ std::make_unique<repl::ReplicationProcess>(
+ storagePtr.get(),
+ std::make_unique<repl::ReplicationConsistencyMarkersMock>(),
+ std::make_unique<repl::ReplicationRecoveryMock>()));
+
+ ASSERT_OK(repl::ReplicationProcess::get(operationContext())
+ ->initializeRollbackID(operationContext()));
+
+ repl::StorageInterface::set(service, std::move(storagePtr));
+
+ setupOpObservers();
+
+ repl::createOplog(operationContext());
+
+ // Set the highest FCV because otherwise it defaults to the lower FCV. This way we default to
+ // testing this release's code, not backwards compatibility code.
+ // (Generic FCV reference): This FCV reference should exist across LTS binary versions.
+ serverGlobalParams.mutableFeatureCompatibility.setVersion(multiversion::GenericFCV::kLatest);
}
void ShardingMongodTestFixture::tearDown() {
@@ -349,4 +346,11 @@ repl::ReplicationCoordinatorMock* ShardingMongodTestFixture::replicationCoordina
return _replCoord;
}
+void ShardingMongodTestFixture::setupOpObservers() {
+ auto opObserverRegistry =
+ checked_cast<OpObserverRegistry*>(getServiceContext()->getOpObserver());
+ opObserverRegistry->addObserver(std::make_unique<OpObserverShardingImpl>());
+ opObserverRegistry->addObserver(std::make_unique<ShardServerOpObserver>());
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/sharding_mongod_test_fixture.h b/src/mongo/db/s/sharding_mongod_test_fixture.h
index 7c6d955976e..41f4cd1b824 100644
--- a/src/mongo/db/s/sharding_mongod_test_fixture.h
+++ b/src/mongo/db/s/sharding_mongod_test_fixture.h
@@ -118,6 +118,11 @@ protected:
*/
virtual std::unique_ptr<BalancerConfiguration> makeBalancerConfiguration();
+ /**
+ * Setups the op observer listeners depending on cluster role.
+ */
+ virtual void setupOpObservers();
+
private:
/**
* Base class returns a TaskExecutorPool with a fixed TaskExecutor and a set of arbitrary
diff --git a/src/mongo/db/s/sharding_write_router.cpp b/src/mongo/db/s/sharding_write_router.cpp
new file mode 100644
index 00000000000..87542f9c126
--- /dev/null
+++ b/src/mongo/db/s/sharding_write_router.cpp
@@ -0,0 +1,85 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/s/sharding_write_router.h"
+
+#include "mongo/db/catalog/collection_catalog.h"
+#include "mongo/db/s/sharding_state.h"
+#include "mongo/s/catalog_cache.h"
+#include "mongo/s/grid.h"
+
+namespace mongo {
+
+ShardingWriteRouter::ShardingWriteRouter(OperationContext* opCtx,
+ const NamespaceString& nss,
+ CatalogCache* catalogCache) {
+ if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
+ _css = CollectionShardingState::get(opCtx, nss);
+ auto collDesc = _css->getCollectionDescription(opCtx);
+
+ _reshardKeyPattern = collDesc.getReshardingKeyIfShouldForwardOps();
+ if (_reshardKeyPattern) {
+ _ownershipFilter = _css->getOwnershipFilter(
+ opCtx, CollectionShardingState::OrphanCleanupPolicy::kAllowOrphanCleanup);
+ _shardKeyPattern = ShardKeyPattern(collDesc.getKeyPattern());
+
+ const auto& reshardingFields = collDesc.getReshardingFields();
+ invariant(reshardingFields);
+ const auto& donorFields = reshardingFields->getDonorFields();
+ invariant(donorFields);
+
+ _reshardingChunkMgr = uassertStatusOK(catalogCache->getCollectionRoutingInfo(
+ opCtx, donorFields->getTempReshardingNss(), true /* allowLocks */));
+ }
+ }
+}
+
+CollectionShardingState* ShardingWriteRouter::getCollectionShardingState() const {
+ return _css;
+}
+
+boost::optional<ShardId> ShardingWriteRouter::getReshardingDestinedRecipient(
+ const BSONObj& fullDocument) const {
+ if (!_reshardKeyPattern) {
+ return boost::none;
+ }
+
+ invariant(_ownershipFilter);
+ invariant(_shardKeyPattern);
+ invariant(_reshardingChunkMgr);
+
+ if (!_ownershipFilter->keyBelongsToMe(_shardKeyPattern->extractShardKeyFromDoc(fullDocument))) {
+ return boost::none;
+ }
+
+ auto shardKey = _reshardKeyPattern->extractShardKeyFromDocThrows(fullDocument);
+ return _reshardingChunkMgr->findIntersectingChunkWithSimpleCollation(shardKey).getShardId();
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/s/resharding_donor_write_router.h b/src/mongo/db/s/sharding_write_router.h
index 13b76115e1c..219e31859b0 100644
--- a/src/mongo/db/s/resharding_donor_write_router.h
+++ b/src/mongo/db/s/sharding_write_router.h
@@ -37,29 +37,23 @@ class CatalogCache;
class ChunkManager;
class OperationContext;
class ShardId;
-class ReshardingDonorWriteRouter {
+class ShardingWriteRouter {
public:
- ReshardingDonorWriteRouter(OperationContext* opCtx,
- const NamespaceString& sourceNss,
- CatalogCache* catalogCache);
+ ShardingWriteRouter(OperationContext* opCtx,
+ const NamespaceString& nss,
+ CatalogCache* catalogCache);
- ReshardingDonorWriteRouter(OperationContext* opCtx,
- const NamespaceString& sourceNss,
- CatalogCache* catalogCache,
- CollectionShardingState* css,
- const ScopedCollectionDescription* collDesc);
+ boost::optional<ShardId> getReshardingDestinedRecipient(const BSONObj& fullDocument) const;
CollectionShardingState* getCollectionShardingState() const;
- boost::optional<ShardId> getDestinedRecipient(const BSONObj& fullDocument) const;
-
private:
- CollectionShardingState* const _css;
- const ScopedCollectionDescription* const _collDesc;
+ CollectionShardingState* _css{nullptr};
boost::optional<ScopedCollectionFilter> _ownershipFilter;
- boost::optional<ShardKeyPattern> _reshardingKeyPattern;
- boost::optional<ChunkManager> _tempReshardingChunkMgr;
+ boost::optional<ShardKeyPattern> _shardKeyPattern;
+ boost::optional<ShardKeyPattern> _reshardKeyPattern;
+ boost::optional<ChunkManager> _reshardingChunkMgr;
};
} // namespace mongo