summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRandolph Tan <randolph@10gen.com>2021-09-21 16:19:56 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-09-21 17:03:47 +0000
commit800c6d40912751e272853e383f4c4bf1f00e5c88 (patch)
treeb65e26e73d5a26fdca4ec297fc0c283cb90f29b4
parent536cd38e7ab615181f5586519f60728c52e47108 (diff)
downloadmongo-800c6d40912751e272853e383f4c4bf1f00e5c88.tar.gz
SERVER-58915 Implement ReshardingDonorWriteRouter functionality along…
-rw-r--r--src/mongo/db/SConscript1
-rw-r--r--src/mongo/db/exec/update_stage.cpp30
-rw-r--r--src/mongo/db/exec/update_stage.h5
-rw-r--r--src/mongo/db/op_observer_impl.cpp48
-rw-r--r--src/mongo/db/op_observer_impl.h15
-rw-r--r--src/mongo/db/s/SConscript40
-rw-r--r--src/mongo/db/s/config/config_server_test_fixture.cpp10
-rw-r--r--src/mongo/db/s/config/config_server_test_fixture.h3
-rw-r--r--src/mongo/db/s/op_observer_sharding_impl.cpp19
-rw-r--r--src/mongo/db/s/op_observer_sharding_impl.h15
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp4
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp4
-rw-r--r--src/mongo/db/s/resharding_destined_recipient_test.cpp25
-rw-r--r--src/mongo/db/s/resharding_donor_write_router.cpp55
-rw-r--r--src/mongo/db/s/resharding_util.cpp55
-rw-r--r--src/mongo/db/s/resharding_util.h10
-rw-r--r--src/mongo/db/s/sharding_initialization_op_observer_test.cpp2
-rw-r--r--src/mongo/db/s/sharding_mongod_test_fixture.cpp108
-rw-r--r--src/mongo/db/s/sharding_mongod_test_fixture.h5
-rw-r--r--src/mongo/db/s/sharding_write_router.cpp85
-rw-r--r--src/mongo/db/s/sharding_write_router.h (renamed from src/mongo/db/s/resharding_donor_write_router.h)24
-rw-r--r--src/mongo/db/vector_clock_mongod_test.cpp8
-rw-r--r--src/mongo/db/vector_clock_test_fixture.cpp8
-rw-r--r--src/mongo/db/vector_clock_test_fixture.h3
-rw-r--r--src/mongo/dbtests/SConscript1
-rw-r--r--src/mongo/s/query/SConscript3
26 files changed, 284 insertions, 302 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 74e28f13ddf..0471048fd3b 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -1290,7 +1290,6 @@ env.Library(
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/auth/auth_checks',
'$BUILD_DIR/mongo/db/index/index_access_methods',
- '$BUILD_DIR/mongo/db/s/resharding_util',
'$BUILD_DIR/mongo/scripting/scripting',
'$BUILD_DIR/mongo/util/background_job',
'$BUILD_DIR/mongo/util/elapsed_tracker',
diff --git a/src/mongo/db/exec/update_stage.cpp b/src/mongo/db/exec/update_stage.cpp
index 5859e85e72f..2b6e3afddb9 100644
--- a/src/mongo/db/exec/update_stage.cpp
+++ b/src/mongo/db/exec/update_stage.cpp
@@ -48,13 +48,14 @@
#include "mongo/db/query/explain.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/s/operation_sharding_state.h"
-#include "mongo/db/s/resharding_util.h"
#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/s/sharding_write_router.h"
#include "mongo/db/service_context.h"
#include "mongo/db/storage/duplicate_key_error_info.h"
#include "mongo/db/update/path_support.h"
#include "mongo/db/update/storage_validation.h"
#include "mongo/logv2/log.h"
+#include "mongo/s/grid.h"
#include "mongo/s/shard_key_pattern.h"
#include "mongo/s/would_change_owning_shard_exception.h"
#include "mongo/util/assert_util.h"
@@ -608,7 +609,7 @@ void UpdateStage::_checkRestrictionsOnUpdatingShardKeyAreNotViolated(
}
-bool UpdateStage::wasReshardingKeyUpdated(CollectionShardingState* css,
+bool UpdateStage::wasReshardingKeyUpdated(const ShardingWriteRouter& shardingWriteRouter,
const ScopedCollectionDescription& collDesc,
const BSONObj& newObj,
const Snapshotted<BSONObj>& oldObj) {
@@ -625,9 +626,8 @@ bool UpdateStage::wasReshardingKeyUpdated(CollectionShardingState* css,
FieldRefSet shardKeyPaths(collDesc.getKeyPatternFields());
_checkRestrictionsOnUpdatingShardKeyAreNotViolated(collDesc, shardKeyPaths);
- auto oldRecipShard =
- getDestinedRecipient(opCtx(), collection()->ns(), oldObj.value(), css, collDesc);
- auto newRecipShard = getDestinedRecipient(opCtx(), collection()->ns(), newObj, css, collDesc);
+ auto oldRecipShard = *shardingWriteRouter.getReshardingDestinedRecipient(oldObj.value());
+ auto newRecipShard = *shardingWriteRouter.getReshardingDestinedRecipient(newObj);
uassert(WouldChangeOwningShardInfo(oldObj.value(), newObj, false /* upsert */),
"This update would cause the doc to change owning shards under the new shard key",
@@ -638,7 +638,15 @@ bool UpdateStage::wasReshardingKeyUpdated(CollectionShardingState* css,
bool UpdateStage::checkUpdateChangesShardKeyFields(const boost::optional<BSONObj>& newObjCopy,
const Snapshotted<BSONObj>& oldObj) {
- auto* const css = CollectionShardingState::get(opCtx(), collection()->ns());
+ ShardingWriteRouter shardingWriteRouter(
+ opCtx(), collection()->ns(), Grid::get(opCtx())->catalogCache());
+ auto css = shardingWriteRouter.getCollectionShardingState();
+
+ // css can be null when this is a config server.
+ if (css == nullptr) {
+ return false;
+ }
+
const auto collDesc = css->getCollectionDescription(opCtx());
// Calling mutablebson::Document::getObject() renders a full copy of the updated document. This
@@ -652,16 +660,20 @@ bool UpdateStage::checkUpdateChangesShardKeyFields(const boost::optional<BSONObj
// It is possible that both the existing and new shard keys are being updated, so we do not want
// to short-circuit checking whether either is being modified.
- const auto existingShardKeyUpdated = wasExistingShardKeyUpdated(css, collDesc, newObj, oldObj);
- const auto reshardingKeyUpdated = wasReshardingKeyUpdated(css, collDesc, newObj, oldObj);
+ const auto existingShardKeyUpdated =
+ wasExistingShardKeyUpdated(shardingWriteRouter, collDesc, newObj, oldObj);
+ const auto reshardingKeyUpdated =
+ wasReshardingKeyUpdated(shardingWriteRouter, collDesc, newObj, oldObj);
return existingShardKeyUpdated || reshardingKeyUpdated;
}
-bool UpdateStage::wasExistingShardKeyUpdated(CollectionShardingState* css,
+bool UpdateStage::wasExistingShardKeyUpdated(const ShardingWriteRouter& shardingWriteRouter,
const ScopedCollectionDescription& collDesc,
const BSONObj& newObj,
const Snapshotted<BSONObj>& oldObj) {
+ const auto css = shardingWriteRouter.getCollectionShardingState();
+
const ShardKeyPattern shardKeyPattern(collDesc.getKeyPattern());
auto oldShardKey = shardKeyPattern.extractShardKeyFromDoc(oldObj.value());
auto newShardKey = shardKeyPattern.extractShardKeyFromDoc(newObj);
diff --git a/src/mongo/db/exec/update_stage.h b/src/mongo/db/exec/update_stage.h
index 3080f186e36..5bb55272e9e 100644
--- a/src/mongo/db/exec/update_stage.h
+++ b/src/mongo/db/exec/update_stage.h
@@ -44,6 +44,7 @@ namespace mongo {
class OperationContext;
class OpDebug;
struct PlanSummaryStats;
+class ShardingWriteRouter;
struct UpdateStageParams {
using DocumentCounter = std::function<size_t(const BSONObj&)>;
@@ -186,12 +187,12 @@ private:
* If the update changes shard key fields but the new shard key remains on the same node,
* returns true. If the update does not change shard key fields, returns false.
*/
- bool wasExistingShardKeyUpdated(CollectionShardingState* css,
+ bool wasExistingShardKeyUpdated(const ShardingWriteRouter& shardingWriteRouter,
const ScopedCollectionDescription& collDesc,
const BSONObj& newObj,
const Snapshotted<BSONObj>& oldObj);
- bool wasReshardingKeyUpdated(CollectionShardingState* css,
+ bool wasReshardingKeyUpdated(const ShardingWriteRouter& shardingWriteRouter,
const ScopedCollectionDescription& collDesc,
const BSONObj& newObj,
const Snapshotted<BSONObj>& oldObj);
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index cf107fab9d3..b4b93f6baef 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/catalog/database.h"
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/catalog/import_collection_oplog_entry_gen.h"
+#include "mongo/db/catalog_raii.h"
#include "mongo/db/commands/txn_cmds_gen.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
@@ -61,7 +62,7 @@
#include "mongo/db/repl/tenant_migration_access_blocker_util.h"
#include "mongo/db/repl/tenant_migration_decoration.h"
#include "mongo/db/s/collection_sharding_state.h"
-#include "mongo/db/s/resharding_util.h"
+#include "mongo/db/s/sharding_write_router.h"
#include "mongo/db/server_options.h"
#include "mongo/db/session_catalog_mongod.h"
#include "mongo/db/timeseries/bucket_catalog.h"
@@ -486,8 +487,8 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
std::vector<repl::OpTime> opTimeList;
repl::OpTime lastOpTime;
- auto* const css = CollectionShardingState::get(opCtx, nss);
- auto collDesc = css->getCollectionDescription(opCtx);
+ ShardingWriteRouter shardingWriteRouter(opCtx, nss, Grid::get(opCtx)->catalogCache());
+
if (inMultiDocumentTransaction) {
// Do not add writes to the profile collection to the list of transaction operations, since
// these are done outside the transaction. There is no top-level WriteUnitOfWork when we are
@@ -499,14 +500,16 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
for (auto iter = first; iter != last; iter++) {
auto operation = MutableOplogEntry::makeInsertOperation(nss, uuid.get(), iter->doc);
- shardAnnotateOplogEntry(opCtx, nss, iter->doc, operation, css, collDesc);
+ operation.setDestinedRecipient(
+ shardingWriteRouter.getReshardingDestinedRecipient(iter->doc));
txnParticipant.addTransactionOperation(opCtx, operation);
}
} else {
std::function<boost::optional<ShardId>(const BSONObj& doc)> getDestinedRecipientFn =
- [&](const BSONObj& doc) {
- return getDestinedRecipient(opCtx, nss, doc, css, collDesc);
+ [&shardingWriteRouter](const BSONObj& doc) {
+ return shardingWriteRouter.getReshardingDestinedRecipient(doc);
};
+
MutableOplogEntry oplogEntryTemplate;
oplogEntryTemplate.setNss(nss);
oplogEntryTemplate.setUuid(uuid);
@@ -538,8 +541,13 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
size_t index = 0;
for (auto it = first; it != last; it++, index++) {
auto opTime = opTimeList.empty() ? repl::OpTime() : opTimeList[index];
- shardObserveInsertOp(
- opCtx, nss, it->doc, opTime, css, fromMigrate, inMultiDocumentTransaction);
+ shardObserveInsertOp(opCtx,
+ nss,
+ it->doc,
+ opTime,
+ shardingWriteRouter,
+ fromMigrate,
+ inMultiDocumentTransaction);
}
if (nss.coll() == "system.js") {
@@ -594,16 +602,15 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
const bool inMultiDocumentTransaction =
txnParticipant && opCtx->writesAreReplicated() && txnParticipant.transactionIsOpen();
- auto* const css = CollectionShardingState::get(opCtx, args.nss);
- auto collDesc = css->getCollectionDescription(opCtx);
+ ShardingWriteRouter shardingWriteRouter(opCtx, args.nss, Grid::get(opCtx)->catalogCache());
OpTimeBundle opTime;
if (inMultiDocumentTransaction) {
auto operation = MutableOplogEntry::makeUpdateOperation(
args.nss, args.uuid, args.updateArgs.update, args.updateArgs.criteria);
- shardAnnotateOplogEntry(
- opCtx, args.nss, args.updateArgs.updatedDoc, operation, css, collDesc);
+ operation.setDestinedRecipient(
+ shardingWriteRouter.getReshardingDestinedRecipient(args.updateArgs.updatedDoc));
if (args.updateArgs.preImageRecordingEnabledForCollection) {
invariant(args.updateArgs.preImageDoc);
@@ -613,12 +620,8 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
txnParticipant.addTransactionOperation(opCtx, operation);
} else {
MutableOplogEntry oplogEntry;
- shardAnnotateOplogEntry(opCtx,
- args.nss,
- args.updateArgs.updatedDoc,
- oplogEntry.getDurableReplOperation(),
- css,
- collDesc);
+ oplogEntry.getDurableReplOperation().setDestinedRecipient(
+ shardingWriteRouter.getReshardingDestinedRecipient(args.updateArgs.updatedDoc));
if (opCtx->getTxnNumber() && args.updateArgs.storeImageInSideCollection) {
// If we've stored a preImage:
@@ -665,7 +668,7 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
args.updateArgs.preImageDoc,
args.updateArgs.updatedDoc,
opTime.writeOpTime,
- css,
+ shardingWriteRouter,
opTime.prePostImageOpTime,
inMultiDocumentTransaction);
}
@@ -697,9 +700,10 @@ void OpObserverImpl::aboutToDelete(OperationContext* opCtx,
auto* const css = CollectionShardingState::get(opCtx, nss);
auto collDesc = css->getCollectionDescription(opCtx);
+ ShardingWriteRouter shardingWriteRouter(opCtx, nss, Grid::get(opCtx)->catalogCache());
repl::DurableReplOperation op;
- shardAnnotateOplogEntry(opCtx, nss, doc, op, css, collDesc);
+ op.setDestinedRecipient(shardingWriteRouter.getReshardingDestinedRecipient(doc));
destinedRecipientDecoration(opCtx) = op.getDestinedRecipient();
shardObserveAboutToDelete(opCtx, nss, doc);
@@ -772,12 +776,12 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
if (nss != NamespaceString::kSessionTransactionsTableNamespace) {
if (!args.fromMigrate) {
- auto* const css = CollectionShardingState::get(opCtx, nss);
+ ShardingWriteRouter shardingWriteRouter(opCtx, nss, Grid::get(opCtx)->catalogCache());
shardObserveDeleteOp(opCtx,
nss,
documentKey.getShardKeyAndId(),
opTime.writeOpTime,
- css,
+ shardingWriteRouter,
opTime.prePostImageOpTime,
inMultiDocumentTransaction);
}
diff --git a/src/mongo/db/op_observer_impl.h b/src/mongo/db/op_observer_impl.h
index 971c7891e8b..2fe3637eeb5 100644
--- a/src/mongo/db/op_observer_impl.h
+++ b/src/mongo/db/op_observer_impl.h
@@ -39,6 +39,8 @@ class ReplOperation;
} // namespace repl
+class ShardingWriteRouter;
+
class OpObserverImpl : public OpObserver {
OpObserverImpl(const OpObserverImpl&) = delete;
OpObserverImpl& operator=(const OpObserverImpl&) = delete;
@@ -229,7 +231,7 @@ private:
const NamespaceString nss,
const BSONObj& insertedDoc,
const repl::OpTime& opTime,
- CollectionShardingState* css,
+ const ShardingWriteRouter& shardingWriteRouter,
const bool fromMigrate,
const bool inMultiDocumentTransaction) {}
virtual void shardObserveUpdateOp(OperationContext* opCtx,
@@ -237,27 +239,20 @@ private:
boost::optional<BSONObj> preImageDoc,
const BSONObj& postImageDoc,
const repl::OpTime& opTime,
- CollectionShardingState* css,
+ const ShardingWriteRouter& shardingWriteRouter,
const repl::OpTime& prePostImageOpTime,
const bool inMultiDocumentTransaction) {}
virtual void shardObserveDeleteOp(OperationContext* opCtx,
const NamespaceString nss,
const BSONObj& documentKey,
const repl::OpTime& opTime,
- CollectionShardingState* css,
+ const ShardingWriteRouter& shardingWriteRouter,
const repl::OpTime& preImageOpTime,
const bool inMultiDocumentTransaction) {}
virtual void shardObserveTransactionPrepareOrUnpreparedCommit(
OperationContext* opCtx,
const std::vector<repl::ReplOperation>& stmts,
const repl::OpTime& prepareOrCommitOptime) {}
-
- virtual void shardAnnotateOplogEntry(OperationContext* opCtx,
- const NamespaceString nss,
- const BSONObj& doc,
- repl::DurableReplOperation& op,
- CollectionShardingState* css,
- const ScopedCollectionDescription& collDesc) {}
};
extern const OperationContext::Decoration<boost::optional<OpObserverImpl::DocumentKey>>
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index da18f30056d..e11ff37a646 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -16,9 +16,9 @@ env.Library(
'collection_sharding_state.cpp',
'database_sharding_state.cpp',
'operation_sharding_state.cpp',
- 'resharding_donor_write_router.cpp',
'sharding_migration_critical_section.cpp',
'sharding_state.cpp',
+ 'sharding_write_router.cpp',
'transaction_coordinator_curop.cpp',
'transaction_coordinator_factory.cpp',
'transaction_coordinator_worker_curop_repository.cpp',
@@ -27,6 +27,7 @@ env.Library(
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/concurrency/lock_manager',
'$BUILD_DIR/mongo/db/range_arithmetic',
+ '$BUILD_DIR/mongo/s/grid',
'$BUILD_DIR/mongo/s/sharding_routing_table',
],
)
@@ -61,7 +62,12 @@ env.Library(
'range_deletion_util.cpp',
'read_only_catalog_cache_loader.cpp',
'recoverable_critical_section_service.cpp',
+ 'resharding_util.cpp',
+ 'resharding/coordinator_document.idl','resharding/document_source_resharding_iterate_transaction.cpp',
'resharding/document_source_resharding_ownership_match.cpp',
+ 'resharding/donor_document.idl',
+ 'resharding/donor_oplog_id.idl',
+ 'resharding/recipient_document.idl',
'resharding/resharding_change_event_o2_field.idl',
'resharding/resharding_collection_cloner.cpp',
'resharding/resharding_coordinator_commit_monitor.cpp',
@@ -145,35 +151,6 @@ env.Library(
'$BUILD_DIR/mongo/db/session_catalog',
'$BUILD_DIR/mongo/idl/server_parameter',
'$BUILD_DIR/mongo/util/future_util',
- 'resharding_util',
- ],
-)
-
-# Be careful about adding dependencies to this library, as any dependencies will currently be
-# included in mongo_embedded.
-env.Library(
- target='resharding_util',
- source=[
- 'resharding_util.cpp',
- 'resharding/coordinator_document.idl',
- 'resharding/document_source_resharding_iterate_transaction.cpp',
- 'resharding/donor_document.idl',
- 'resharding/donor_oplog_id.idl',
- 'resharding/recipient_document.idl',
- ],
- LIBDEPS_PRIVATE=[
- '$BUILD_DIR/mongo/base',
- '$BUILD_DIR/mongo/db/catalog/collection_options',
- '$BUILD_DIR/mongo/db/common',
- '$BUILD_DIR/mongo/db/concurrency/write_conflict_exception',
- '$BUILD_DIR/mongo/db/curop',
- '$BUILD_DIR/mongo/db/namespace_string',
- '$BUILD_DIR/mongo/db/pipeline/expression_context',
- '$BUILD_DIR/mongo/db/pipeline/pipeline',
- '$BUILD_DIR/mongo/db/storage/write_unit_of_work',
- '$BUILD_DIR/mongo/s/async_requests_sender',
- '$BUILD_DIR/mongo/s/grid',
- 'sharding_api_d',
],
)
@@ -392,7 +369,6 @@ env.Library(
'$BUILD_DIR/mongo/s/commands/shared_cluster_commands',
'$BUILD_DIR/mongo/s/sharding_initialization',
'$BUILD_DIR/mongo/s/sharding_router_api',
- 'resharding_util',
'sharding_runtime_d',
],
)
@@ -543,7 +519,6 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture',
'$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_mock',
'$BUILD_DIR/mongo/s/sharding_router_test_fixture',
- 'resharding_util',
'shard_server_test_fixture',
'sharding_commands_d',
'sharding_logging',
@@ -597,7 +572,6 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/db/timeseries/timeseries_options',
'$BUILD_DIR/mongo/util/version_impl',
'config_server_test_fixture',
- 'resharding_util',
],
)
diff --git a/src/mongo/db/s/config/config_server_test_fixture.cpp b/src/mongo/db/s/config/config_server_test_fixture.cpp
index 3144add6236..aa573a94deb 100644
--- a/src/mongo/db/s/config/config_server_test_fixture.cpp
+++ b/src/mongo/db/s/config/config_server_test_fixture.cpp
@@ -43,6 +43,8 @@
#include "mongo/db/commands.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/op_observer.h"
+#include "mongo/db/op_observer_impl.h"
+#include "mongo/db/op_observer_registry.h"
#include "mongo/db/ops/write_ops.h"
#include "mongo/db/query/cursor_response.h"
#include "mongo/db/query/query_request_helper.h"
@@ -51,6 +53,7 @@
#include "mongo/db/repl/repl_settings.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/s/config/sharding_catalog_manager.h"
+#include "mongo/db/s/config_server_op_observer.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
@@ -473,4 +476,11 @@ void ConfigServerTestFixture::expectSetShardVersion(
});
}
+void ConfigServerTestFixture::setupOpObservers() {
+ auto opObserverRegistry =
+ checked_cast<OpObserverRegistry*>(getServiceContext()->getOpObserver());
+ opObserverRegistry->addObserver(std::make_unique<OpObserverImpl>());
+ opObserverRegistry->addObserver(std::make_unique<ConfigServerOpObserver>());
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/config/config_server_test_fixture.h b/src/mongo/db/s/config/config_server_test_fixture.h
index 5e00755a652..821b8987406 100644
--- a/src/mongo/db/s/config/config_server_test_fixture.h
+++ b/src/mongo/db/s/config/config_server_test_fixture.h
@@ -194,6 +194,9 @@ protected:
std::unique_ptr<BalancerConfiguration> makeBalancerConfiguration() override;
+protected:
+ void setupOpObservers() override;
+
private:
/**
* 'onPreInitGlobalStateFn' is invoked near the end of _setUp() before calling
diff --git a/src/mongo/db/s/op_observer_sharding_impl.cpp b/src/mongo/db/s/op_observer_sharding_impl.cpp
index b55ef001bd0..319db36b61b 100644
--- a/src/mongo/db/s/op_observer_sharding_impl.cpp
+++ b/src/mongo/db/s/op_observer_sharding_impl.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/s/migration_chunk_cloner_source_legacy.h"
#include "mongo/db/s/migration_source_manager.h"
#include "mongo/db/s/resharding_util.h"
+#include "mongo/db/s/sharding_write_router.h"
#include "mongo/logv2/log.h"
namespace mongo {
@@ -121,12 +122,13 @@ void OpObserverShardingImpl::shardObserveInsertOp(OperationContext* opCtx,
const NamespaceString nss,
const BSONObj& insertedDoc,
const repl::OpTime& opTime,
- CollectionShardingState* css,
+ const ShardingWriteRouter& shardingWriteRouter,
const bool fromMigrate,
const bool inMultiDocumentTransaction) {
if (nss == NamespaceString::kSessionTransactionsTableNamespace || fromMigrate)
return;
+ auto css = shardingWriteRouter.getCollectionShardingState();
auto* const csr = CollectionShardingRuntime::get(css);
csr->checkShardVersionOrThrow(opCtx);
@@ -160,9 +162,10 @@ void OpObserverShardingImpl::shardObserveUpdateOp(OperationContext* opCtx,
boost::optional<BSONObj> preImageDoc,
const BSONObj& postImageDoc,
const repl::OpTime& opTime,
- CollectionShardingState* css,
+ const ShardingWriteRouter& shardingWriteRouter,
const repl::OpTime& prePostImageOpTime,
const bool inMultiDocumentTransaction) {
+ auto css = shardingWriteRouter.getCollectionShardingState();
auto* const csr = CollectionShardingRuntime::get(css);
csr->checkShardVersionOrThrow(opCtx);
@@ -195,9 +198,10 @@ void OpObserverShardingImpl::shardObserveDeleteOp(OperationContext* opCtx,
const NamespaceString nss,
const BSONObj& documentKey,
const repl::OpTime& opTime,
- CollectionShardingState* css,
+ const ShardingWriteRouter& shardingWriteRouter,
const repl::OpTime& preImageOpTime,
const bool inMultiDocumentTransaction) {
+ auto css = shardingWriteRouter.getCollectionShardingState();
auto* const csr = CollectionShardingRuntime::get(css);
csr->checkShardVersionOrThrow(opCtx);
@@ -237,13 +241,4 @@ void OpObserverShardingImpl::shardObserveTransactionPrepareOrUnpreparedCommit(
opCtx->getServiceContext(), stmts, prepareOrCommitOptime));
}
-void OpObserverShardingImpl::shardAnnotateOplogEntry(OperationContext* opCtx,
- const NamespaceString nss,
- const BSONObj& doc,
- repl::DurableReplOperation& op,
- CollectionShardingState* css,
- const ScopedCollectionDescription& collDesc) {
- op.setDestinedRecipient(getDestinedRecipient(opCtx, nss, doc, css, collDesc));
-}
-
} // namespace mongo
diff --git a/src/mongo/db/s/op_observer_sharding_impl.h b/src/mongo/db/s/op_observer_sharding_impl.h
index 0fafcf833ff..f9005497c57 100644
--- a/src/mongo/db/s/op_observer_sharding_impl.h
+++ b/src/mongo/db/s/op_observer_sharding_impl.h
@@ -33,6 +33,8 @@
namespace mongo {
+class ShardingWriteRouter;
+
class OpObserverShardingImpl : public OpObserverImpl {
public:
// True if the document being deleted belongs to a chunk which, while still in the shard,
@@ -50,7 +52,7 @@ protected:
NamespaceString nss,
const BSONObj& insertedDoc,
const repl::OpTime& opTime,
- CollectionShardingState* css,
+ const ShardingWriteRouter& shardingWriteRouter,
bool fromMigrate,
bool inMultiDocumentTransaction) override;
void shardObserveUpdateOp(OperationContext* opCtx,
@@ -58,27 +60,20 @@ protected:
boost::optional<BSONObj> preImageDoc,
const BSONObj& updatedDoc,
const repl::OpTime& opTime,
- CollectionShardingState* css,
+ const ShardingWriteRouter& shardingWriteRouter,
const repl::OpTime& prePostImageOpTime,
bool inMultiDocumentTransaction) override;
void shardObserveDeleteOp(OperationContext* opCtx,
NamespaceString nss,
const BSONObj& documentKey,
const repl::OpTime& opTime,
- CollectionShardingState* css,
+ const ShardingWriteRouter& shardingWriteRouter,
const repl::OpTime& preImageOpTime,
bool inMultiDocumentTransaction) override;
void shardObserveTransactionPrepareOrUnpreparedCommit(
OperationContext* opCtx,
const std::vector<repl::ReplOperation>& stmts,
const repl::OpTime& prepareOrCommitOptime) override;
-
- void shardAnnotateOplogEntry(OperationContext* opCtx,
- NamespaceString nss,
- const BSONObj& doc,
- repl::DurableReplOperation& op,
- CollectionShardingState* css,
- const ScopedCollectionDescription& collDesc) override;
};
} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp
index c437eb3ec05..c72b4a2122a 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp
@@ -73,6 +73,10 @@ public:
ServiceContextMongoDTest::setUp();
auto serviceContext = getServiceContext();
+
+ // Initialize sharding components as a shard server.
+ serverGlobalParams.clusterRole = ClusterRole::ShardServer;
+
{
auto opCtx = makeOperationContext();
auto replCoord = std::make_unique<repl::ReplicationCoordinatorMock>(serviceContext);
diff --git a/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp
index 3a96cd1672e..02889248639 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp
@@ -65,6 +65,10 @@ public:
ServiceContextMongoDTest::setUp();
auto serviceContext = getServiceContext();
+
+ // Initialize sharding components as a shard server.
+ serverGlobalParams.clusterRole = ClusterRole::ShardServer;
+
{
auto opCtx = makeOperationContext();
auto replCoord = std::make_unique<repl::ReplicationCoordinatorMock>(serviceContext);
diff --git a/src/mongo/db/s/resharding_destined_recipient_test.cpp b/src/mongo/db/s/resharding_destined_recipient_test.cpp
index 676154a6a44..a3ba879cac5 100644
--- a/src/mongo/db/s/resharding_destined_recipient_test.cpp
+++ b/src/mongo/db/s/resharding_destined_recipient_test.cpp
@@ -40,10 +40,10 @@
#include "mongo/db/repl/wait_for_majority_service.h"
#include "mongo/db/s/collection_sharding_runtime.h"
#include "mongo/db/s/operation_sharding_state.h"
-#include "mongo/db/s/resharding_util.h"
#include "mongo/db/s/shard_filtering_metadata_refresh.h"
#include "mongo/db/s/shard_server_test_fixture.h"
#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/s/sharding_write_router.h"
#include "mongo/db/session_catalog_mongod.h"
#include "mongo/db/transaction_participant.h"
#include "mongo/s/catalog/sharding_catalog_client_mock.h"
@@ -298,11 +298,10 @@ TEST_F(DestinedRecipientTest, TestGetDestinedRecipient) {
AutoGetCollection coll(opCtx, kNss, MODE_IX);
OperationShardingState::get(opCtx).initializeClientRoutingVersions(
kNss, env.version, env.dbVersion);
- auto* const css = CollectionShardingState::get(opCtx, kNss);
- auto collDesc = css->getCollectionDescription(opCtx);
+ ShardingWriteRouter shardingWriteRouter(opCtx, kNss, Grid::get(opCtx)->catalogCache());
auto destShardId =
- getDestinedRecipient(opCtx, kNss, BSON("x" << 2 << "y" << 10), css, collDesc);
+ shardingWriteRouter.getReshardingDestinedRecipient(BSON("x" << 2 << "y" << 10));
ASSERT(destShardId);
ASSERT_EQ(*destShardId, env.destShard);
}
@@ -315,18 +314,16 @@ TEST_F(DestinedRecipientTest, TestGetDestinedRecipientThrowsOnBlockedRefresh) {
AutoGetCollection coll(opCtx, kNss, MODE_IX);
OperationShardingState::get(opCtx).initializeClientRoutingVersions(
kNss, env.version, env.dbVersion);
- auto* const css = CollectionShardingState::get(opCtx, kNss);
- auto collDesc = css->getCollectionDescription(opCtx);
FailPointEnableBlock failPoint("blockCollectionCacheLookup");
- ASSERT_THROWS_WITH_CHECK(
- getDestinedRecipient(opCtx, kNss, BSON("x" << 2 << "y" << 10), css, collDesc),
- ShardCannotRefreshDueToLocksHeldException,
- [&](const ShardCannotRefreshDueToLocksHeldException& ex) {
- const auto refreshInfo = ex.extraInfo<ShardCannotRefreshDueToLocksHeldInfo>();
- ASSERT(refreshInfo);
- ASSERT_EQ(refreshInfo->getNss(), env.tempNss);
- });
+ ASSERT_THROWS_WITH_CHECK(ShardingWriteRouter(opCtx, kNss, Grid::get(opCtx)->catalogCache()),
+ ShardCannotRefreshDueToLocksHeldException,
+ [&](const ShardCannotRefreshDueToLocksHeldException& ex) {
+ const auto refreshInfo =
+ ex.extraInfo<ShardCannotRefreshDueToLocksHeldInfo>();
+ ASSERT(refreshInfo);
+ ASSERT_EQ(refreshInfo->getNss(), env.tempNss);
+ });
}
auto sw = catalogCache()->getCollectionRoutingInfoWithRefresh(opCtx, env.tempNss);
diff --git a/src/mongo/db/s/resharding_donor_write_router.cpp b/src/mongo/db/s/resharding_donor_write_router.cpp
deleted file mode 100644
index d5f4315928f..00000000000
--- a/src/mongo/db/s/resharding_donor_write_router.cpp
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Copyright (C) 2021-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/db/s/resharding_donor_write_router.h"
-
-namespace mongo {
-
-ReshardingDonorWriteRouter::ReshardingDonorWriteRouter(OperationContext* opCtx,
- const NamespaceString& sourceNss,
- CatalogCache* catalogCache)
- : _css(nullptr), _collDesc(nullptr) {}
-
-ReshardingDonorWriteRouter::ReshardingDonorWriteRouter(OperationContext* opCtx,
- const NamespaceString& sourceNss,
- CatalogCache* catalogCache,
- CollectionShardingState* css,
- const ScopedCollectionDescription* collDesc)
- : _css(css), _collDesc(collDesc) {}
-
-CollectionShardingState* ReshardingDonorWriteRouter::getCollectionShardingState() const {
- return nullptr;
-}
-
-boost::optional<ShardId> ReshardingDonorWriteRouter::getDestinedRecipient(
- const BSONObj& fullDocument) const {
- return boost::none;
-}
-
-} // namespace mongo
diff --git a/src/mongo/db/s/resharding_util.cpp b/src/mongo/db/s/resharding_util.cpp
index baec1d6281d..247fd986682 100644
--- a/src/mongo/db/s/resharding_util.cpp
+++ b/src/mongo/db/s/resharding_util.cpp
@@ -60,31 +60,6 @@
namespace mongo {
using namespace fmt::literals;
-namespace {
-
-UUID getCollectionUuid(OperationContext* opCtx, const NamespaceString& nss) {
- dassert(opCtx->lockState()->isCollectionLockedForMode(nss, MODE_IS));
-
- auto uuid = CollectionCatalog::get(opCtx)->lookupUUIDByNSS(opCtx, nss);
- invariant(uuid);
-
- return *uuid;
-}
-
-// Ensure that this shard owns the document. This must be called after verifying that we
-// are in a resharding operation so that we are guaranteed that migrations are suspended.
-bool documentBelongsToMe(OperationContext* opCtx,
- CollectionShardingState* css,
- const ScopedCollectionDescription& collDesc,
- const BSONObj& doc) {
- auto currentKeyPattern = ShardKeyPattern(collDesc.getKeyPattern());
- auto ownershipFilter = css->getOwnershipFilter(
- opCtx, CollectionShardingState::OrphanCleanupPolicy::kAllowOrphanCleanup);
-
- return ownershipFilter.keyBelongsToMe(currentKeyPattern.extractShardKeyFromDoc(doc));
-}
-} // namespace
-
BSONObj serializeAndTruncateReshardingErrorIfNeeded(Status originalError) {
BSONObjBuilder originalBob;
originalError.serializeErrorToBSON(&originalBob);
@@ -335,36 +310,6 @@ std::unique_ptr<Pipeline, PipelineDeleter> createOplogFetchingPipelineForReshard
return Pipeline::create(std::move(stages), expCtx);
}
-boost::optional<ShardId> getDestinedRecipient(OperationContext* opCtx,
- const NamespaceString& sourceNss,
- const BSONObj& fullDocument,
- CollectionShardingState* css,
- const ScopedCollectionDescription& collDesc) {
- if (!ShardingState::get(opCtx)->enabled()) {
- // Don't bother looking up the sharding state for the collection if the server isn't even
- // running with sharding enabled. We know there couldn't possibly be any resharding fields.
- return boost::none;
- }
-
- auto reshardingKeyPattern = collDesc.getReshardingKeyIfShouldForwardOps();
- if (!reshardingKeyPattern)
- return boost::none;
-
- if (!documentBelongsToMe(opCtx, css, collDesc, fullDocument))
- return boost::none;
-
- bool allowLocks = true;
- auto tempNssRoutingInfo =
- uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(
- opCtx,
- constructTemporaryReshardingNss(sourceNss.db(), getCollectionUuid(opCtx, sourceNss)),
- allowLocks));
-
- auto shardKey = reshardingKeyPattern->extractShardKeyFromDocThrows(fullDocument);
-
- return tempNssRoutingInfo.findIntersectingChunkWithSimpleCollation(shardKey).getShardId();
-}
-
bool isFinalOplog(const repl::OplogEntry& oplog) {
if (oplog.getOpType() != repl::OpTypeEnum::kNoop) {
return false;
diff --git a/src/mongo/db/s/resharding_util.h b/src/mongo/db/s/resharding_util.h
index 0e8646a0f88..d38b2c8742c 100644
--- a/src/mongo/db/s/resharding_util.h
+++ b/src/mongo/db/s/resharding_util.h
@@ -281,16 +281,6 @@ std::unique_ptr<Pipeline, PipelineDeleter> createOplogFetchingPipelineForReshard
const ShardId& recipientShard);
/**
- * Returns the shard Id of the recipient shard that would own the document under the new shard
- * key pattern.
- */
-boost::optional<ShardId> getDestinedRecipient(OperationContext* opCtx,
- const NamespaceString& sourceNss,
- const BSONObj& fullDocument,
- CollectionShardingState* css,
- const ScopedCollectionDescription& collDesc);
-
-/**
* Sentinel oplog format:
* {
* op: "n",
diff --git a/src/mongo/db/s/sharding_initialization_op_observer_test.cpp b/src/mongo/db/s/sharding_initialization_op_observer_test.cpp
index d93a9d03212..f88fc027875 100644
--- a/src/mongo/db/s/sharding_initialization_op_observer_test.cpp
+++ b/src/mongo/db/s/sharding_initialization_op_observer_test.cpp
@@ -61,6 +61,8 @@ public:
void setUp() override {
ShardingMongodTestFixture::setUp();
+ serverGlobalParams.clusterRole = ClusterRole::ShardServer;
+
// NOTE: this assumes that globalInit will always be called on the same thread as the main
// test thread
ShardingInitializationMongoD::get(operationContext())
diff --git a/src/mongo/db/s/sharding_mongod_test_fixture.cpp b/src/mongo/db/s/sharding_mongod_test_fixture.cpp
index f81cda6ba08..abf6df3ff4f 100644
--- a/src/mongo/db/s/sharding_mongod_test_fixture.cpp
+++ b/src/mongo/db/s/sharding_mongod_test_fixture.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/client.h"
#include "mongo/db/commands.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/op_observer_impl.h"
#include "mongo/db/op_observer_registry.h"
#include "mongo/db/query/cursor_response.h"
#include "mongo/db/repl/drop_pending_collection_reaper.h"
@@ -84,58 +85,7 @@ using repl::ReplicationCoordinatorMock;
using repl::ReplSettings;
using unittest::assertGet;
-ShardingMongodTestFixture::ShardingMongodTestFixture() {
- const auto service = getServiceContext();
-
- // Set up this node as shard node, which is part of a replica set
-
- repl::ReplSettings replSettings;
- replSettings.setOplogSizeBytes(512'000);
- replSettings.setReplSetString(ConnectionString::forReplicaSet(_setName, _servers).toString());
- auto replCoordPtr = makeReplicationCoordinator(replSettings);
- _replCoord = replCoordPtr.get();
-
- BSONArrayBuilder serversBob;
- for (size_t i = 0; i < _servers.size(); ++i) {
- serversBob.append(BSON("host" << _servers[i].toString() << "_id" << static_cast<int>(i)));
- }
-
- auto replSetConfig =
- repl::ReplSetConfig::parse(BSON("_id" << _setName << "protocolVersion" << 1 << "version"
- << 3 << "members" << serversBob.arr()));
- replCoordPtr->setGetConfigReturnValue(replSetConfig);
-
- repl::ReplicationCoordinator::set(service, std::move(replCoordPtr));
-
- auto storagePtr = std::make_unique<repl::StorageInterfaceMock>();
-
- repl::DropPendingCollectionReaper::set(
- service, std::make_unique<repl::DropPendingCollectionReaper>(storagePtr.get()));
-
- repl::ReplicationProcess::set(service,
- std::make_unique<repl::ReplicationProcess>(
- storagePtr.get(),
- std::make_unique<repl::ReplicationConsistencyMarkersMock>(),
- std::make_unique<repl::ReplicationRecoveryMock>()));
-
- auto uniqueOpCtx = makeOperationContext();
- ASSERT_OK(
- repl::ReplicationProcess::get(uniqueOpCtx.get())->initializeRollbackID(uniqueOpCtx.get()));
-
- repl::StorageInterface::set(service, std::move(storagePtr));
-
- auto opObserver = checked_cast<OpObserverRegistry*>(service->getOpObserver());
- opObserver->addObserver(std::make_unique<OpObserverShardingImpl>());
- opObserver->addObserver(std::make_unique<ConfigServerOpObserver>());
- opObserver->addObserver(std::make_unique<ShardServerOpObserver>());
-
- repl::createOplog(uniqueOpCtx.get());
-
- // Set the highest FCV because otherwise it defaults to the lower FCV. This way we default to
- // testing this release's code, not backwards compatibility code.
- // (Generic FCV reference): This FCV reference should exist across LTS binary versions.
- serverGlobalParams.mutableFeatureCompatibility.setVersion(multiversion::GenericFCV::kLatest);
-}
+ShardingMongodTestFixture::ShardingMongodTestFixture() {}
ShardingMongodTestFixture::~ShardingMongodTestFixture() = default;
@@ -284,6 +234,53 @@ Status ShardingMongodTestFixture::initializeGlobalShardingStateForMongodForTest(
void ShardingMongodTestFixture::setUp() {
ServiceContextMongoDTest::setUp();
ShardingTestFixtureCommon::setUp();
+
+ const auto service = getServiceContext();
+
+ // Set up this node as shard node, which is part of a replica set
+
+ repl::ReplSettings replSettings;
+ replSettings.setOplogSizeBytes(512'000);
+ replSettings.setReplSetString(ConnectionString::forReplicaSet(_setName, _servers).toString());
+ auto replCoordPtr = makeReplicationCoordinator(replSettings);
+ _replCoord = replCoordPtr.get();
+
+ BSONArrayBuilder serversBob;
+ for (size_t i = 0; i < _servers.size(); ++i) {
+ serversBob.append(BSON("host" << _servers[i].toString() << "_id" << static_cast<int>(i)));
+ }
+
+ auto replSetConfig =
+ repl::ReplSetConfig::parse(BSON("_id" << _setName << "protocolVersion" << 1 << "version"
+ << 3 << "members" << serversBob.arr()));
+ replCoordPtr->setGetConfigReturnValue(replSetConfig);
+
+ repl::ReplicationCoordinator::set(service, std::move(replCoordPtr));
+
+ auto storagePtr = std::make_unique<repl::StorageInterfaceMock>();
+
+ repl::DropPendingCollectionReaper::set(
+ service, std::make_unique<repl::DropPendingCollectionReaper>(storagePtr.get()));
+
+ repl::ReplicationProcess::set(service,
+ std::make_unique<repl::ReplicationProcess>(
+ storagePtr.get(),
+ std::make_unique<repl::ReplicationConsistencyMarkersMock>(),
+ std::make_unique<repl::ReplicationRecoveryMock>()));
+
+ ASSERT_OK(repl::ReplicationProcess::get(operationContext())
+ ->initializeRollbackID(operationContext()));
+
+ repl::StorageInterface::set(service, std::move(storagePtr));
+
+ setupOpObservers();
+
+ repl::createOplog(operationContext());
+
+ // Set the highest FCV because otherwise it defaults to the lower FCV. This way we default to
+ // testing this release's code, not backwards compatibility code.
+ // (Generic FCV reference): This FCV reference should exist across LTS binary versions.
+ serverGlobalParams.mutableFeatureCompatibility.setVersion(multiversion::GenericFCV::kLatest);
}
void ShardingMongodTestFixture::tearDown() {
@@ -349,4 +346,11 @@ repl::ReplicationCoordinatorMock* ShardingMongodTestFixture::replicationCoordina
return _replCoord;
}
+void ShardingMongodTestFixture::setupOpObservers() {
+ auto opObserverRegistry =
+ checked_cast<OpObserverRegistry*>(getServiceContext()->getOpObserver());
+ opObserverRegistry->addObserver(std::make_unique<OpObserverShardingImpl>());
+ opObserverRegistry->addObserver(std::make_unique<ShardServerOpObserver>());
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/sharding_mongod_test_fixture.h b/src/mongo/db/s/sharding_mongod_test_fixture.h
index 7c6d955976e..41f4cd1b824 100644
--- a/src/mongo/db/s/sharding_mongod_test_fixture.h
+++ b/src/mongo/db/s/sharding_mongod_test_fixture.h
@@ -118,6 +118,11 @@ protected:
*/
virtual std::unique_ptr<BalancerConfiguration> makeBalancerConfiguration();
+ /**
+ * Setups the op observer listeners depending on cluster role.
+ */
+ virtual void setupOpObservers();
+
private:
/**
* Base class returns a TaskExecutorPool with a fixed TaskExecutor and a set of arbitrary
diff --git a/src/mongo/db/s/sharding_write_router.cpp b/src/mongo/db/s/sharding_write_router.cpp
new file mode 100644
index 00000000000..87542f9c126
--- /dev/null
+++ b/src/mongo/db/s/sharding_write_router.cpp
@@ -0,0 +1,85 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/s/sharding_write_router.h"
+
+#include "mongo/db/catalog/collection_catalog.h"
+#include "mongo/db/s/sharding_state.h"
+#include "mongo/s/catalog_cache.h"
+#include "mongo/s/grid.h"
+
+namespace mongo {
+
+ShardingWriteRouter::ShardingWriteRouter(OperationContext* opCtx,
+ const NamespaceString& nss,
+ CatalogCache* catalogCache) {
+ if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
+ _css = CollectionShardingState::get(opCtx, nss);
+ auto collDesc = _css->getCollectionDescription(opCtx);
+
+ _reshardKeyPattern = collDesc.getReshardingKeyIfShouldForwardOps();
+ if (_reshardKeyPattern) {
+ _ownershipFilter = _css->getOwnershipFilter(
+ opCtx, CollectionShardingState::OrphanCleanupPolicy::kAllowOrphanCleanup);
+ _shardKeyPattern = ShardKeyPattern(collDesc.getKeyPattern());
+
+ const auto& reshardingFields = collDesc.getReshardingFields();
+ invariant(reshardingFields);
+ const auto& donorFields = reshardingFields->getDonorFields();
+ invariant(donorFields);
+
+ _reshardingChunkMgr = uassertStatusOK(catalogCache->getCollectionRoutingInfo(
+ opCtx, donorFields->getTempReshardingNss(), true /* allowLocks */));
+ }
+ }
+}
+
+CollectionShardingState* ShardingWriteRouter::getCollectionShardingState() const {
+ return _css;
+}
+
+boost::optional<ShardId> ShardingWriteRouter::getReshardingDestinedRecipient(
+ const BSONObj& fullDocument) const {
+ if (!_reshardKeyPattern) {
+ return boost::none;
+ }
+
+ invariant(_ownershipFilter);
+ invariant(_shardKeyPattern);
+ invariant(_reshardingChunkMgr);
+
+ if (!_ownershipFilter->keyBelongsToMe(_shardKeyPattern->extractShardKeyFromDoc(fullDocument))) {
+ return boost::none;
+ }
+
+ auto shardKey = _reshardKeyPattern->extractShardKeyFromDocThrows(fullDocument);
+ return _reshardingChunkMgr->findIntersectingChunkWithSimpleCollation(shardKey).getShardId();
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/s/resharding_donor_write_router.h b/src/mongo/db/s/sharding_write_router.h
index 13b76115e1c..219e31859b0 100644
--- a/src/mongo/db/s/resharding_donor_write_router.h
+++ b/src/mongo/db/s/sharding_write_router.h
@@ -37,29 +37,23 @@ class CatalogCache;
class ChunkManager;
class OperationContext;
class ShardId;
-class ReshardingDonorWriteRouter {
+class ShardingWriteRouter {
public:
- ReshardingDonorWriteRouter(OperationContext* opCtx,
- const NamespaceString& sourceNss,
- CatalogCache* catalogCache);
+ ShardingWriteRouter(OperationContext* opCtx,
+ const NamespaceString& nss,
+ CatalogCache* catalogCache);
- ReshardingDonorWriteRouter(OperationContext* opCtx,
- const NamespaceString& sourceNss,
- CatalogCache* catalogCache,
- CollectionShardingState* css,
- const ScopedCollectionDescription* collDesc);
+ boost::optional<ShardId> getReshardingDestinedRecipient(const BSONObj& fullDocument) const;
CollectionShardingState* getCollectionShardingState() const;
- boost::optional<ShardId> getDestinedRecipient(const BSONObj& fullDocument) const;
-
private:
- CollectionShardingState* const _css;
- const ScopedCollectionDescription* const _collDesc;
+ CollectionShardingState* _css{nullptr};
boost::optional<ScopedCollectionFilter> _ownershipFilter;
- boost::optional<ShardKeyPattern> _reshardingKeyPattern;
- boost::optional<ChunkManager> _tempReshardingChunkMgr;
+ boost::optional<ShardKeyPattern> _shardKeyPattern;
+ boost::optional<ShardKeyPattern> _reshardKeyPattern;
+ boost::optional<ChunkManager> _reshardingChunkMgr;
};
} // namespace mongo
diff --git a/src/mongo/db/vector_clock_mongod_test.cpp b/src/mongo/db/vector_clock_mongod_test.cpp
index 9e3406d4bc9..6bc558676a6 100644
--- a/src/mongo/db/vector_clock_mongod_test.cpp
+++ b/src/mongo/db/vector_clock_mongod_test.cpp
@@ -32,6 +32,8 @@
#include "mongo/db/keys_collection_client_direct.h"
#include "mongo/db/keys_collection_manager.h"
#include "mongo/db/logical_time_validator.h"
+#include "mongo/db/op_observer_impl.h"
+#include "mongo/db/op_observer_registry.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/s/sharding_mongod_test_fixture.h"
#include "mongo/db/vector_clock_mutable.h"
@@ -79,6 +81,12 @@ protected:
_keyManager->refreshNow(operationContext());
}
+ void setupOpObservers() override {
+ auto opObserverRegistry =
+ checked_cast<OpObserverRegistry*>(getServiceContext()->getOpObserver());
+ opObserverRegistry->addObserver(std::make_unique<OpObserverImpl>());
+ }
+
private:
std::shared_ptr<KeysCollectionManager> _keyManager;
};
diff --git a/src/mongo/db/vector_clock_test_fixture.cpp b/src/mongo/db/vector_clock_test_fixture.cpp
index b8c7205512e..c25324339db 100644
--- a/src/mongo/db/vector_clock_test_fixture.cpp
+++ b/src/mongo/db/vector_clock_test_fixture.cpp
@@ -35,6 +35,8 @@
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/logical_time.h"
+#include "mongo/db/op_observer_impl.h"
+#include "mongo/db/op_observer_registry.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/service_context.h"
#include "mongo/db/signed_logical_time.h"
@@ -104,4 +106,10 @@ DBDirectClient* VectorClockTestFixture::getDBClient() const {
return _dbDirectClient.get();
}
+void VectorClockTestFixture::setupOpObservers() {
+ auto opObserverRegistry =
+ checked_cast<OpObserverRegistry*>(getServiceContext()->getOpObserver());
+ opObserverRegistry->addObserver(std::make_unique<OpObserverImpl>());
+}
+
} // namespace mongo
diff --git a/src/mongo/db/vector_clock_test_fixture.h b/src/mongo/db/vector_clock_test_fixture.h
index 96a3a94a577..3fc5b043744 100644
--- a/src/mongo/db/vector_clock_test_fixture.h
+++ b/src/mongo/db/vector_clock_test_fixture.h
@@ -74,6 +74,9 @@ protected:
DBDirectClient* getDBClient() const;
+protected:
+ void setupOpObservers() override;
+
private:
VectorClock* _clock;
std::shared_ptr<ClockSourceMock> _mockClockSource = std::make_shared<ClockSourceMock>();
diff --git a/src/mongo/dbtests/SConscript b/src/mongo/dbtests/SConscript
index 93da5634d8c..1e59959297d 100644
--- a/src/mongo/dbtests/SConscript
+++ b/src/mongo/dbtests/SConscript
@@ -168,7 +168,6 @@ env.Program(
"$BUILD_DIR/mongo/db/repl/serveronly_repl",
"$BUILD_DIR/mongo/db/repl/storage_interface_impl",
"$BUILD_DIR/mongo/db/repl/timestamp_block",
- "$BUILD_DIR/mongo/db/s/resharding_util",
"$BUILD_DIR/mongo/db/server_options_core",
"$BUILD_DIR/mongo/db/sessions_collection_standalone",
"$BUILD_DIR/mongo/db/storage/durable_catalog_impl",
diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript
index 37dee4875f3..a5fd981abe6 100644
--- a/src/mongo/s/query/SConscript
+++ b/src/mongo/s/query/SConscript
@@ -120,11 +120,12 @@ env.Library(
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/auth/auth',
'$BUILD_DIR/mongo/db/auth/authprivilege',
+ '$BUILD_DIR/mongo/db/curop',
'$BUILD_DIR/mongo/db/generic_cursor',
'$BUILD_DIR/mongo/db/kill_sessions',
'$BUILD_DIR/mongo/db/logical_session_cache',
'$BUILD_DIR/mongo/db/logical_session_id',
- '$BUILD_DIR/mongo/db/shared_request_handling',
+ '$BUILD_DIR/mongo/db/query/query_knobs',
],
)