diff options
author | Randolph Tan <randolph@10gen.com> | 2021-09-21 16:19:56 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-10-25 19:41:46 +0000 |
commit | 075ed1ea34178e3328620e942941e43e85b5b567 (patch) | |
tree | d2934d6f0f4273eae9889810849712c57c16420b | |
parent | 6aec5b3bf11bc33f60b739063df48747126c91f4 (diff) | |
download | mongo-075ed1ea34178e3328620e942941e43e85b5b567.tar.gz |
SERVER-58915 Implement ReshardingDonorWriteRouter functionality along…
(cherry picked from commit 800c6d40912751e272853e383f4c4bf1f00e5c88)
25 files changed, 281 insertions, 303 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index fe0f353c384..5c1cf10467e 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -1244,7 +1244,6 @@ env.Library( '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/auth/auth_checks', '$BUILD_DIR/mongo/db/index/index_access_methods', - '$BUILD_DIR/mongo/db/s/resharding_util', '$BUILD_DIR/mongo/scripting/scripting', '$BUILD_DIR/mongo/util/background_job', '$BUILD_DIR/mongo/util/elapsed_tracker', diff --git a/src/mongo/db/exec/update_stage.cpp b/src/mongo/db/exec/update_stage.cpp index 708c25dfe30..ee69c650012 100644 --- a/src/mongo/db/exec/update_stage.cpp +++ b/src/mongo/db/exec/update_stage.cpp @@ -48,13 +48,14 @@ #include "mongo/db/query/explain.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/s/operation_sharding_state.h" -#include "mongo/db/s/resharding_util.h" #include "mongo/db/s/sharding_state.h" +#include "mongo/db/s/sharding_write_router.h" #include "mongo/db/service_context.h" #include "mongo/db/storage/duplicate_key_error_info.h" #include "mongo/db/update/path_support.h" #include "mongo/db/update/storage_validation.h" #include "mongo/logv2/log.h" +#include "mongo/s/grid.h" #include "mongo/s/shard_key_pattern.h" #include "mongo/s/would_change_owning_shard_exception.h" #include "mongo/util/assert_util.h" @@ -608,7 +609,7 @@ void UpdateStage::_checkRestrictionsOnUpdatingShardKeyAreNotViolated( } -bool UpdateStage::wasReshardingKeyUpdated(CollectionShardingState* css, +bool UpdateStage::wasReshardingKeyUpdated(const ShardingWriteRouter& shardingWriteRouter, const ScopedCollectionDescription& collDesc, const BSONObj& newObj, const Snapshotted<BSONObj>& oldObj) { @@ -625,9 +626,8 @@ bool UpdateStage::wasReshardingKeyUpdated(CollectionShardingState* css, FieldRefSet shardKeyPaths(collDesc.getKeyPatternFields()); _checkRestrictionsOnUpdatingShardKeyAreNotViolated(collDesc, shardKeyPaths); - auto oldRecipShard = - getDestinedRecipient(opCtx(), collection()->ns(), oldObj.value(), css, collDesc); - auto newRecipShard = getDestinedRecipient(opCtx(), collection()->ns(), newObj, css, collDesc); + auto oldRecipShard = *shardingWriteRouter.getReshardingDestinedRecipient(oldObj.value()); + auto newRecipShard = *shardingWriteRouter.getReshardingDestinedRecipient(newObj); uassert(WouldChangeOwningShardInfo(oldObj.value(), newObj, false /* upsert */), "This update would cause the doc to change owning shards under the new shard key", @@ -638,7 +638,15 @@ bool UpdateStage::wasReshardingKeyUpdated(CollectionShardingState* css, bool UpdateStage::checkUpdateChangesShardKeyFields(const boost::optional<BSONObj>& newObjCopy, const Snapshotted<BSONObj>& oldObj) { - auto* const css = CollectionShardingState::get(opCtx(), collection()->ns()); + ShardingWriteRouter shardingWriteRouter( + opCtx(), collection()->ns(), Grid::get(opCtx())->catalogCache()); + auto css = shardingWriteRouter.getCollectionShardingState(); + + // css can be null when this is a config server. + if (css == nullptr) { + return false; + } + const auto collDesc = css->getCollectionDescription(opCtx()); // Calling mutablebson::Document::getObject() renders a full copy of the updated document. This @@ -652,16 +660,20 @@ bool UpdateStage::checkUpdateChangesShardKeyFields(const boost::optional<BSONObj // It is possible that both the existing and new shard keys are being updated, so we do not want // to short-circuit checking whether either is being modified. - const auto existingShardKeyUpdated = wasExistingShardKeyUpdated(css, collDesc, newObj, oldObj); - const auto reshardingKeyUpdated = wasReshardingKeyUpdated(css, collDesc, newObj, oldObj); + const auto existingShardKeyUpdated = + wasExistingShardKeyUpdated(shardingWriteRouter, collDesc, newObj, oldObj); + const auto reshardingKeyUpdated = + wasReshardingKeyUpdated(shardingWriteRouter, collDesc, newObj, oldObj); return existingShardKeyUpdated || reshardingKeyUpdated; } -bool UpdateStage::wasExistingShardKeyUpdated(CollectionShardingState* css, +bool UpdateStage::wasExistingShardKeyUpdated(const ShardingWriteRouter& shardingWriteRouter, const ScopedCollectionDescription& collDesc, const BSONObj& newObj, const Snapshotted<BSONObj>& oldObj) { + const auto css = shardingWriteRouter.getCollectionShardingState(); + const ShardKeyPattern shardKeyPattern(collDesc.getKeyPattern()); auto oldShardKey = shardKeyPattern.extractShardKeyFromDoc(oldObj.value()); auto newShardKey = shardKeyPattern.extractShardKeyFromDoc(newObj); diff --git a/src/mongo/db/exec/update_stage.h b/src/mongo/db/exec/update_stage.h index 3080f186e36..5bb55272e9e 100644 --- a/src/mongo/db/exec/update_stage.h +++ b/src/mongo/db/exec/update_stage.h @@ -44,6 +44,7 @@ namespace mongo { class OperationContext; class OpDebug; struct PlanSummaryStats; +class ShardingWriteRouter; struct UpdateStageParams { using DocumentCounter = std::function<size_t(const BSONObj&)>; @@ -186,12 +187,12 @@ private: * If the update changes shard key fields but the new shard key remains on the same node, * returns true. If the update does not change shard key fields, returns false. */ - bool wasExistingShardKeyUpdated(CollectionShardingState* css, + bool wasExistingShardKeyUpdated(const ShardingWriteRouter& shardingWriteRouter, const ScopedCollectionDescription& collDesc, const BSONObj& newObj, const Snapshotted<BSONObj>& oldObj); - bool wasReshardingKeyUpdated(CollectionShardingState* css, + bool wasReshardingKeyUpdated(const ShardingWriteRouter& shardingWriteRouter, const ScopedCollectionDescription& collDesc, const BSONObj& newObj, const Snapshotted<BSONObj>& oldObj); diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 32b9f1ec877..358be5cf68e 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -41,6 +41,7 @@ #include "mongo/db/catalog/database.h" #include "mongo/db/catalog/database_holder.h" #include "mongo/db/catalog/import_collection_oplog_entry_gen.h" +#include "mongo/db/catalog_raii.h" #include "mongo/db/commands/txn_cmds_gen.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/write_conflict_exception.h" @@ -60,7 +61,7 @@ #include "mongo/db/repl/tenant_migration_access_blocker_util.h" #include "mongo/db/repl/tenant_migration_decoration.h" #include "mongo/db/s/collection_sharding_state.h" -#include "mongo/db/s/resharding_util.h" +#include "mongo/db/s/sharding_write_router.h" #include "mongo/db/server_options.h" #include "mongo/db/session_catalog_mongod.h" #include "mongo/db/timeseries/bucket_catalog.h" @@ -507,8 +508,8 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, std::vector<repl::OpTime> opTimeList; repl::OpTime lastOpTime; - auto* const css = CollectionShardingState::get(opCtx, nss); - auto collDesc = css->getCollectionDescription(opCtx); + ShardingWriteRouter shardingWriteRouter(opCtx, nss, Grid::get(opCtx)->catalogCache()); + if (inMultiDocumentTransaction) { // Do not add writes to the profile collection to the list of transaction operations, since // these are done outside the transaction. There is no top-level WriteUnitOfWork when we are @@ -520,14 +521,16 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, for (auto iter = first; iter != last; iter++) { auto operation = MutableOplogEntry::makeInsertOperation(nss, uuid.get(), iter->doc); - shardAnnotateOplogEntry(opCtx, nss, iter->doc, operation, css, collDesc); + operation.setDestinedRecipient( + shardingWriteRouter.getReshardingDestinedRecipient(iter->doc)); txnParticipant.addTransactionOperation(opCtx, operation); } } else { std::function<boost::optional<ShardId>(const BSONObj& doc)> getDestinedRecipientFn = - [&](const BSONObj& doc) { - return getDestinedRecipient(opCtx, nss, doc, css, collDesc); + [&shardingWriteRouter](const BSONObj& doc) { + return shardingWriteRouter.getReshardingDestinedRecipient(doc); }; + MutableOplogEntry oplogEntryTemplate; oplogEntryTemplate.setNss(nss); oplogEntryTemplate.setUuid(uuid); @@ -559,8 +562,13 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, size_t index = 0; for (auto it = first; it != last; it++, index++) { auto opTime = opTimeList.empty() ? repl::OpTime() : opTimeList[index]; - shardObserveInsertOp( - opCtx, nss, it->doc, opTime, css, fromMigrate, inMultiDocumentTransaction); + shardObserveInsertOp(opCtx, + nss, + it->doc, + opTime, + shardingWriteRouter, + fromMigrate, + inMultiDocumentTransaction); } if (nss.coll() == "system.js") { @@ -615,16 +623,15 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg const bool inMultiDocumentTransaction = txnParticipant && opCtx->writesAreReplicated() && txnParticipant.transactionIsOpen(); - auto* const css = CollectionShardingState::get(opCtx, args.nss); - auto collDesc = css->getCollectionDescription(opCtx); + ShardingWriteRouter shardingWriteRouter(opCtx, args.nss, Grid::get(opCtx)->catalogCache()); OpTimeBundle opTime; if (inMultiDocumentTransaction) { auto operation = MutableOplogEntry::makeUpdateOperation( args.nss, args.uuid, args.updateArgs.update, args.updateArgs.criteria); - shardAnnotateOplogEntry( - opCtx, args.nss, args.updateArgs.updatedDoc, operation, css, collDesc); + operation.setDestinedRecipient( + shardingWriteRouter.getReshardingDestinedRecipient(args.updateArgs.updatedDoc)); if (args.updateArgs.preImageRecordingEnabledForCollection) { invariant(args.updateArgs.preImageDoc); @@ -634,12 +641,8 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg txnParticipant.addTransactionOperation(opCtx, operation); } else { MutableOplogEntry oplogEntry; - shardAnnotateOplogEntry(opCtx, - args.nss, - args.updateArgs.updatedDoc, - oplogEntry.getDurableReplOperation(), - css, - collDesc); + oplogEntry.getDurableReplOperation().setDestinedRecipient( + shardingWriteRouter.getReshardingDestinedRecipient(args.updateArgs.updatedDoc)); if (opCtx->getTxnNumber() && args.updateArgs.storeImageInSideCollection) { // If we've stored a preImage: @@ -686,7 +689,7 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg args.updateArgs.preImageDoc, args.updateArgs.updatedDoc, opTime.writeOpTime, - css, + shardingWriteRouter, opTime.prePostImageOpTime, inMultiDocumentTransaction); } @@ -718,9 +721,10 @@ void OpObserverImpl::aboutToDelete(OperationContext* opCtx, auto* const css = CollectionShardingState::get(opCtx, nss); auto collDesc = css->getCollectionDescription(opCtx); + ShardingWriteRouter shardingWriteRouter(opCtx, nss, Grid::get(opCtx)->catalogCache()); repl::DurableReplOperation op; - shardAnnotateOplogEntry(opCtx, nss, doc, op, css, collDesc); + op.setDestinedRecipient(shardingWriteRouter.getReshardingDestinedRecipient(doc)); destinedRecipientDecoration(opCtx) = op.getDestinedRecipient(); shardObserveAboutToDelete(opCtx, nss, doc); @@ -793,12 +797,12 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, if (nss != NamespaceString::kSessionTransactionsTableNamespace) { if (!args.fromMigrate) { - auto* const css = CollectionShardingState::get(opCtx, nss); + ShardingWriteRouter shardingWriteRouter(opCtx, nss, Grid::get(opCtx)->catalogCache()); shardObserveDeleteOp(opCtx, nss, documentKey.getShardKeyAndId(), opTime.writeOpTime, - css, + shardingWriteRouter, opTime.prePostImageOpTime, inMultiDocumentTransaction); } diff --git a/src/mongo/db/op_observer_impl.h b/src/mongo/db/op_observer_impl.h index 9e6f2d91772..be2b5b2896a 100644 --- a/src/mongo/db/op_observer_impl.h +++ b/src/mongo/db/op_observer_impl.h @@ -39,6 +39,8 @@ class ReplOperation; } // namespace repl +class ShardingWriteRouter; + class OpObserverImpl : public OpObserver { OpObserverImpl(const OpObserverImpl&) = delete; OpObserverImpl& operator=(const OpObserverImpl&) = delete; @@ -230,7 +232,7 @@ private: const NamespaceString nss, const BSONObj& insertedDoc, const repl::OpTime& opTime, - CollectionShardingState* css, + const ShardingWriteRouter& shardingWriteRouter, const bool fromMigrate, const bool inMultiDocumentTransaction) {} virtual void shardObserveUpdateOp(OperationContext* opCtx, @@ -238,27 +240,20 @@ private: boost::optional<BSONObj> preImageDoc, const BSONObj& postImageDoc, const repl::OpTime& opTime, - CollectionShardingState* css, + const ShardingWriteRouter& shardingWriteRouter, const repl::OpTime& prePostImageOpTime, const bool inMultiDocumentTransaction) {} virtual void shardObserveDeleteOp(OperationContext* opCtx, const NamespaceString nss, const BSONObj& documentKey, const repl::OpTime& opTime, - CollectionShardingState* css, + const ShardingWriteRouter& shardingWriteRouter, const repl::OpTime& preImageOpTime, const bool inMultiDocumentTransaction) {} virtual void shardObserveTransactionPrepareOrUnpreparedCommit( OperationContext* opCtx, const std::vector<repl::ReplOperation>& stmts, const repl::OpTime& prepareOrCommitOptime) {} - - virtual void shardAnnotateOplogEntry(OperationContext* opCtx, - const NamespaceString nss, - const BSONObj& doc, - repl::DurableReplOperation& op, - CollectionShardingState* css, - const ScopedCollectionDescription& collDesc) {} }; extern const OperationContext::Decoration<boost::optional<OpObserverImpl::DocumentKey>> diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index ba2b3fb13f4..f8373b1e0b0 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -16,9 +16,9 @@ env.Library( 'collection_sharding_state.cpp', 'database_sharding_state.cpp', 'operation_sharding_state.cpp', - 'resharding_donor_write_router.cpp', 'sharding_migration_critical_section.cpp', 'sharding_state.cpp', + 'sharding_write_router.cpp', 'transaction_coordinator_curop.cpp', 'transaction_coordinator_factory.cpp', 'transaction_coordinator_worker_curop_repository.cpp', @@ -27,6 +27,7 @@ env.Library( '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/concurrency/lock_manager', '$BUILD_DIR/mongo/db/range_arithmetic', + '$BUILD_DIR/mongo/s/grid', '$BUILD_DIR/mongo/s/sharding_routing_table', ], ) @@ -63,7 +64,12 @@ env.Library( 'range_deletion_util.cpp', 'read_only_catalog_cache_loader.cpp', 'recoverable_critical_section_service.cpp', + 'resharding_util.cpp', + 'resharding/coordinator_document.idl','resharding/document_source_resharding_iterate_transaction.cpp', 'resharding/document_source_resharding_ownership_match.cpp', + 'resharding/donor_document.idl', + 'resharding/donor_oplog_id.idl', + 'resharding/recipient_document.idl', 'resharding/resharding_change_event_o2_field.idl', 'resharding/resharding_collection_cloner.cpp', 'resharding/resharding_coordinator_commit_monitor.cpp', @@ -149,35 +155,6 @@ env.Library( '$BUILD_DIR/mongo/db/session_catalog', '$BUILD_DIR/mongo/idl/server_parameter', '$BUILD_DIR/mongo/util/future_util', - 'resharding_util', - ], -) - -# Be careful about adding dependencies to this library, as any dependencies will currently be -# included in mongo_embedded. -env.Library( - target='resharding_util', - source=[ - 'resharding_util.cpp', - 'resharding/coordinator_document.idl', - 'resharding/document_source_resharding_iterate_transaction.cpp', - 'resharding/donor_document.idl', - 'resharding/donor_oplog_id.idl', - 'resharding/recipient_document.idl', - ], - LIBDEPS_PRIVATE=[ - '$BUILD_DIR/mongo/base', - '$BUILD_DIR/mongo/db/catalog/collection_options_idl', - '$BUILD_DIR/mongo/db/common', - '$BUILD_DIR/mongo/db/concurrency/write_conflict_exception', - '$BUILD_DIR/mongo/db/curop', - '$BUILD_DIR/mongo/db/namespace_string', - '$BUILD_DIR/mongo/db/pipeline/expression_context', - '$BUILD_DIR/mongo/db/pipeline/pipeline', - '$BUILD_DIR/mongo/db/storage/write_unit_of_work', - '$BUILD_DIR/mongo/s/async_requests_sender', - '$BUILD_DIR/mongo/s/grid', - 'sharding_api_d', ], ) @@ -405,7 +382,6 @@ env.Library( '$BUILD_DIR/mongo/s/commands/shared_cluster_commands', '$BUILD_DIR/mongo/s/sharding_initialization', '$BUILD_DIR/mongo/s/sharding_router_api', - 'resharding_util', 'sharding_runtime_d', ], ) @@ -556,7 +532,6 @@ env.CppUnitTest( '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture', '$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_mock', '$BUILD_DIR/mongo/s/sharding_router_test_fixture', - 'resharding_util', 'shard_server_test_fixture', 'sharding_commands_d', 'sharding_logging', @@ -610,6 +585,5 @@ env.CppUnitTest( '$BUILD_DIR/mongo/db/repl/replication_info', '$BUILD_DIR/mongo/util/version_impl', 'config_server_test_fixture', - 'resharding_util', ], ) diff --git a/src/mongo/db/s/config/config_server_test_fixture.cpp b/src/mongo/db/s/config/config_server_test_fixture.cpp index 9b42faeff2c..850d228fea0 100644 --- a/src/mongo/db/s/config/config_server_test_fixture.cpp +++ b/src/mongo/db/s/config/config_server_test_fixture.cpp @@ -43,6 +43,8 @@ #include "mongo/db/commands.h" #include "mongo/db/namespace_string.h" #include "mongo/db/op_observer.h" +#include "mongo/db/op_observer_impl.h" +#include "mongo/db/op_observer_registry.h" #include "mongo/db/ops/write_ops.h" #include "mongo/db/query/cursor_response.h" #include "mongo/db/query/query_request_helper.h" @@ -51,6 +53,7 @@ #include "mongo/db/repl/repl_settings.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/s/config/sharding_catalog_manager.h" +#include "mongo/db/s/config_server_op_observer.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/executor/thread_pool_task_executor_test_fixture.h" #include "mongo/rpc/metadata/repl_set_metadata.h" @@ -456,4 +459,11 @@ void ConfigServerTestFixture::expectSetShardVersion( }); } +void ConfigServerTestFixture::setupOpObservers() { + auto opObserverRegistry = + checked_cast<OpObserverRegistry*>(getServiceContext()->getOpObserver()); + opObserverRegistry->addObserver(std::make_unique<OpObserverImpl>()); + opObserverRegistry->addObserver(std::make_unique<ConfigServerOpObserver>()); +} + } // namespace mongo diff --git a/src/mongo/db/s/config/config_server_test_fixture.h b/src/mongo/db/s/config/config_server_test_fixture.h index 764488a46e6..bf041e57077 100644 --- a/src/mongo/db/s/config/config_server_test_fixture.h +++ b/src/mongo/db/s/config/config_server_test_fixture.h @@ -187,6 +187,9 @@ protected: std::unique_ptr<BalancerConfiguration> makeBalancerConfiguration() override; +protected: + void setupOpObservers() override; + private: /** * 'onPreInitGlobalStateFn' is invoked near the end of _setUp() before calling diff --git a/src/mongo/db/s/op_observer_sharding_impl.cpp b/src/mongo/db/s/op_observer_sharding_impl.cpp index f2afff26956..b7ef1c07d42 100644 --- a/src/mongo/db/s/op_observer_sharding_impl.cpp +++ b/src/mongo/db/s/op_observer_sharding_impl.cpp @@ -41,6 +41,7 @@ #include "mongo/db/s/migration_chunk_cloner_source_legacy.h" #include "mongo/db/s/migration_source_manager.h" #include "mongo/db/s/resharding_util.h" +#include "mongo/db/s/sharding_write_router.h" #include "mongo/logv2/log.h" namespace mongo { @@ -120,12 +121,13 @@ void OpObserverShardingImpl::shardObserveInsertOp(OperationContext* opCtx, const NamespaceString nss, const BSONObj& insertedDoc, const repl::OpTime& opTime, - CollectionShardingState* css, + const ShardingWriteRouter& shardingWriteRouter, const bool fromMigrate, const bool inMultiDocumentTransaction) { if (nss == NamespaceString::kSessionTransactionsTableNamespace || fromMigrate) return; + auto css = shardingWriteRouter.getCollectionShardingState(); auto* const csr = CollectionShardingRuntime::get(css); csr->checkShardVersionOrThrow(opCtx); @@ -159,9 +161,10 @@ void OpObserverShardingImpl::shardObserveUpdateOp(OperationContext* opCtx, boost::optional<BSONObj> preImageDoc, const BSONObj& postImageDoc, const repl::OpTime& opTime, - CollectionShardingState* css, + const ShardingWriteRouter& shardingWriteRouter, const repl::OpTime& prePostImageOpTime, const bool inMultiDocumentTransaction) { + auto css = shardingWriteRouter.getCollectionShardingState(); auto* const csr = CollectionShardingRuntime::get(css); csr->checkShardVersionOrThrow(opCtx); @@ -194,9 +197,10 @@ void OpObserverShardingImpl::shardObserveDeleteOp(OperationContext* opCtx, const NamespaceString nss, const BSONObj& documentKey, const repl::OpTime& opTime, - CollectionShardingState* css, + const ShardingWriteRouter& shardingWriteRouter, const repl::OpTime& preImageOpTime, const bool inMultiDocumentTransaction) { + auto css = shardingWriteRouter.getCollectionShardingState(); auto* const csr = CollectionShardingRuntime::get(css); csr->checkShardVersionOrThrow(opCtx); @@ -236,13 +240,4 @@ void OpObserverShardingImpl::shardObserveTransactionPrepareOrUnpreparedCommit( opCtx->getServiceContext(), stmts, prepareOrCommitOptime)); } -void OpObserverShardingImpl::shardAnnotateOplogEntry(OperationContext* opCtx, - const NamespaceString nss, - const BSONObj& doc, - repl::DurableReplOperation& op, - CollectionShardingState* css, - const ScopedCollectionDescription& collDesc) { - op.setDestinedRecipient(getDestinedRecipient(opCtx, nss, doc, css, collDesc)); -} - } // namespace mongo diff --git a/src/mongo/db/s/op_observer_sharding_impl.h b/src/mongo/db/s/op_observer_sharding_impl.h index 95c7d609c2e..3cdb199f623 100644 --- a/src/mongo/db/s/op_observer_sharding_impl.h +++ b/src/mongo/db/s/op_observer_sharding_impl.h @@ -33,6 +33,8 @@ namespace mongo { +class ShardingWriteRouter; + class OpObserverShardingImpl : public OpObserverImpl { public: // True if the document being deleted belongs to a chunk which, while still in the shard, @@ -50,7 +52,7 @@ protected: const NamespaceString nss, const BSONObj& insertedDoc, const repl::OpTime& opTime, - CollectionShardingState* css, + const ShardingWriteRouter& shardingWriteRouter, const bool fromMigrate, const bool inMultiDocumentTransaction) override; void shardObserveUpdateOp(OperationContext* opCtx, @@ -58,27 +60,20 @@ protected: boost::optional<BSONObj> preImageDoc, const BSONObj& updatedDoc, const repl::OpTime& opTime, - CollectionShardingState* css, + const ShardingWriteRouter& shardingWriteRouter, const repl::OpTime& prePostImageOpTime, const bool inMultiDocumentTransaction) override; void shardObserveDeleteOp(OperationContext* opCtx, const NamespaceString nss, const BSONObj& documentKey, const repl::OpTime& opTime, - CollectionShardingState* css, + const ShardingWriteRouter& shardingWriteRouter, const repl::OpTime& preImageOpTime, const bool inMultiDocumentTransaction) override; void shardObserveTransactionPrepareOrUnpreparedCommit( OperationContext* opCtx, const std::vector<repl::ReplOperation>& stmts, const repl::OpTime& prepareOrCommitOptime) override; - - void shardAnnotateOplogEntry(OperationContext* opCtx, - const NamespaceString nss, - const BSONObj& doc, - repl::DurableReplOperation& op, - CollectionShardingState* css, - const ScopedCollectionDescription& collDesc) override; }; } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp index a54e6dde5fc..82c238f1d4a 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp @@ -73,6 +73,10 @@ public: ServiceContextMongoDTest::setUp(); auto serviceContext = getServiceContext(); + + // Initialize sharding components as a shard server. + serverGlobalParams.clusterRole = ClusterRole::ShardServer; + { auto opCtx = makeOperationContext(); auto replCoord = std::make_unique<repl::ReplicationCoordinatorMock>(serviceContext); diff --git a/src/mongo/db/s/resharding_destined_recipient_test.cpp b/src/mongo/db/s/resharding_destined_recipient_test.cpp index 26eb6ca32d0..61ece45171b 100644 --- a/src/mongo/db/s/resharding_destined_recipient_test.cpp +++ b/src/mongo/db/s/resharding_destined_recipient_test.cpp @@ -40,10 +40,10 @@ #include "mongo/db/repl/wait_for_majority_service.h" #include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/operation_sharding_state.h" -#include "mongo/db/s/resharding_util.h" #include "mongo/db/s/shard_filtering_metadata_refresh.h" #include "mongo/db/s/shard_server_test_fixture.h" #include "mongo/db/s/sharding_state.h" +#include "mongo/db/s/sharding_write_router.h" #include "mongo/db/session_catalog_mongod.h" #include "mongo/db/transaction_participant.h" #include "mongo/s/catalog/sharding_catalog_client_mock.h" @@ -288,11 +288,10 @@ TEST_F(DestinedRecipientTest, TestGetDestinedRecipient) { AutoGetCollection coll(opCtx, kNss, MODE_IX); OperationShardingState::get(opCtx).initializeClientRoutingVersions( kNss, env.version, env.dbVersion); - auto* const css = CollectionShardingState::get(opCtx, kNss); - auto collDesc = css->getCollectionDescription(opCtx); + ShardingWriteRouter shardingWriteRouter(opCtx, kNss, Grid::get(opCtx)->catalogCache()); auto destShardId = - getDestinedRecipient(opCtx, kNss, BSON("x" << 2 << "y" << 10), css, collDesc); + shardingWriteRouter.getReshardingDestinedRecipient(BSON("x" << 2 << "y" << 10)); ASSERT(destShardId); ASSERT_EQ(*destShardId, env.destShard); } @@ -305,18 +304,16 @@ TEST_F(DestinedRecipientTest, TestGetDestinedRecipientThrowsOnBlockedRefresh) { AutoGetCollection coll(opCtx, kNss, MODE_IX); OperationShardingState::get(opCtx).initializeClientRoutingVersions( kNss, env.version, env.dbVersion); - auto* const css = CollectionShardingState::get(opCtx, kNss); - auto collDesc = css->getCollectionDescription(opCtx); FailPointEnableBlock failPoint("blockCollectionCacheLookup"); - ASSERT_THROWS_WITH_CHECK( - getDestinedRecipient(opCtx, kNss, BSON("x" << 2 << "y" << 10), css, collDesc), - ShardCannotRefreshDueToLocksHeldException, - [&](const ShardCannotRefreshDueToLocksHeldException& ex) { - const auto refreshInfo = ex.extraInfo<ShardCannotRefreshDueToLocksHeldInfo>(); - ASSERT(refreshInfo); - ASSERT_EQ(refreshInfo->getNss(), env.tempNss); - }); + ASSERT_THROWS_WITH_CHECK(ShardingWriteRouter(opCtx, kNss, Grid::get(opCtx)->catalogCache()), + ShardCannotRefreshDueToLocksHeldException, + [&](const ShardCannotRefreshDueToLocksHeldException& ex) { + const auto refreshInfo = + ex.extraInfo<ShardCannotRefreshDueToLocksHeldInfo>(); + ASSERT(refreshInfo); + ASSERT_EQ(refreshInfo->getNss(), env.tempNss); + }); } auto sw = catalogCache()->getCollectionRoutingInfoWithRefresh(opCtx, env.tempNss); diff --git a/src/mongo/db/s/resharding_donor_write_router.cpp b/src/mongo/db/s/resharding_donor_write_router.cpp deleted file mode 100644 index d5f4315928f..00000000000 --- a/src/mongo/db/s/resharding_donor_write_router.cpp +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Copyright (C) 2021-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/db/s/resharding_donor_write_router.h" - -namespace mongo { - -ReshardingDonorWriteRouter::ReshardingDonorWriteRouter(OperationContext* opCtx, - const NamespaceString& sourceNss, - CatalogCache* catalogCache) - : _css(nullptr), _collDesc(nullptr) {} - -ReshardingDonorWriteRouter::ReshardingDonorWriteRouter(OperationContext* opCtx, - const NamespaceString& sourceNss, - CatalogCache* catalogCache, - CollectionShardingState* css, - const ScopedCollectionDescription* collDesc) - : _css(css), _collDesc(collDesc) {} - -CollectionShardingState* ReshardingDonorWriteRouter::getCollectionShardingState() const { - return nullptr; -} - -boost::optional<ShardId> ReshardingDonorWriteRouter::getDestinedRecipient( - const BSONObj& fullDocument) const { - return boost::none; -} - -} // namespace mongo diff --git a/src/mongo/db/s/resharding_util.cpp b/src/mongo/db/s/resharding_util.cpp index baec1d6281d..247fd986682 100644 --- a/src/mongo/db/s/resharding_util.cpp +++ b/src/mongo/db/s/resharding_util.cpp @@ -60,31 +60,6 @@ namespace mongo { using namespace fmt::literals; -namespace { - -UUID getCollectionUuid(OperationContext* opCtx, const NamespaceString& nss) { - dassert(opCtx->lockState()->isCollectionLockedForMode(nss, MODE_IS)); - - auto uuid = CollectionCatalog::get(opCtx)->lookupUUIDByNSS(opCtx, nss); - invariant(uuid); - - return *uuid; -} - -// Ensure that this shard owns the document. This must be called after verifying that we -// are in a resharding operation so that we are guaranteed that migrations are suspended. -bool documentBelongsToMe(OperationContext* opCtx, - CollectionShardingState* css, - const ScopedCollectionDescription& collDesc, - const BSONObj& doc) { - auto currentKeyPattern = ShardKeyPattern(collDesc.getKeyPattern()); - auto ownershipFilter = css->getOwnershipFilter( - opCtx, CollectionShardingState::OrphanCleanupPolicy::kAllowOrphanCleanup); - - return ownershipFilter.keyBelongsToMe(currentKeyPattern.extractShardKeyFromDoc(doc)); -} -} // namespace - BSONObj serializeAndTruncateReshardingErrorIfNeeded(Status originalError) { BSONObjBuilder originalBob; originalError.serializeErrorToBSON(&originalBob); @@ -335,36 +310,6 @@ std::unique_ptr<Pipeline, PipelineDeleter> createOplogFetchingPipelineForReshard return Pipeline::create(std::move(stages), expCtx); } -boost::optional<ShardId> getDestinedRecipient(OperationContext* opCtx, - const NamespaceString& sourceNss, - const BSONObj& fullDocument, - CollectionShardingState* css, - const ScopedCollectionDescription& collDesc) { - if (!ShardingState::get(opCtx)->enabled()) { - // Don't bother looking up the sharding state for the collection if the server isn't even - // running with sharding enabled. We know there couldn't possibly be any resharding fields. - return boost::none; - } - - auto reshardingKeyPattern = collDesc.getReshardingKeyIfShouldForwardOps(); - if (!reshardingKeyPattern) - return boost::none; - - if (!documentBelongsToMe(opCtx, css, collDesc, fullDocument)) - return boost::none; - - bool allowLocks = true; - auto tempNssRoutingInfo = - uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo( - opCtx, - constructTemporaryReshardingNss(sourceNss.db(), getCollectionUuid(opCtx, sourceNss)), - allowLocks)); - - auto shardKey = reshardingKeyPattern->extractShardKeyFromDocThrows(fullDocument); - - return tempNssRoutingInfo.findIntersectingChunkWithSimpleCollation(shardKey).getShardId(); -} - bool isFinalOplog(const repl::OplogEntry& oplog) { if (oplog.getOpType() != repl::OpTypeEnum::kNoop) { return false; diff --git a/src/mongo/db/s/resharding_util.h b/src/mongo/db/s/resharding_util.h index 0e8646a0f88..d38b2c8742c 100644 --- a/src/mongo/db/s/resharding_util.h +++ b/src/mongo/db/s/resharding_util.h @@ -281,16 +281,6 @@ std::unique_ptr<Pipeline, PipelineDeleter> createOplogFetchingPipelineForReshard const ShardId& recipientShard); /** - * Returns the shard Id of the recipient shard that would own the document under the new shard - * key pattern. - */ -boost::optional<ShardId> getDestinedRecipient(OperationContext* opCtx, - const NamespaceString& sourceNss, - const BSONObj& fullDocument, - CollectionShardingState* css, - const ScopedCollectionDescription& collDesc); - -/** * Sentinel oplog format: * { * op: "n", diff --git a/src/mongo/db/s/sharding_initialization_op_observer_test.cpp b/src/mongo/db/s/sharding_initialization_op_observer_test.cpp index c5a1b8c1bad..9ed234895e6 100644 --- a/src/mongo/db/s/sharding_initialization_op_observer_test.cpp +++ b/src/mongo/db/s/sharding_initialization_op_observer_test.cpp @@ -61,6 +61,8 @@ public: void setUp() override { ShardingMongodTestFixture::setUp(); + serverGlobalParams.clusterRole = ClusterRole::ShardServer; + // NOTE: this assumes that globalInit will always be called on the same thread as the main // test thread ShardingInitializationMongoD::get(operationContext()) diff --git a/src/mongo/db/s/sharding_mongod_test_fixture.cpp b/src/mongo/db/s/sharding_mongod_test_fixture.cpp index 3760b7e0661..a70c430aa9f 100644 --- a/src/mongo/db/s/sharding_mongod_test_fixture.cpp +++ b/src/mongo/db/s/sharding_mongod_test_fixture.cpp @@ -41,6 +41,7 @@ #include "mongo/db/client.h" #include "mongo/db/commands.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/op_observer_impl.h" #include "mongo/db/op_observer_registry.h" #include "mongo/db/query/cursor_response.h" #include "mongo/db/repl/drop_pending_collection_reaper.h" @@ -84,59 +85,7 @@ using repl::ReplicationCoordinatorMock; using repl::ReplSettings; using unittest::assertGet; -ShardingMongodTestFixture::ShardingMongodTestFixture() { - const auto service = getServiceContext(); - - // Set up this node as shard node, which is part of a replica set - - repl::ReplSettings replSettings; - replSettings.setOplogSizeBytes(512'000); - replSettings.setReplSetString(ConnectionString::forReplicaSet(_setName, _servers).toString()); - auto replCoordPtr = makeReplicationCoordinator(replSettings); - _replCoord = replCoordPtr.get(); - - BSONArrayBuilder serversBob; - for (size_t i = 0; i < _servers.size(); ++i) { - serversBob.append(BSON("host" << _servers[i].toString() << "_id" << static_cast<int>(i))); - } - - auto replSetConfig = - repl::ReplSetConfig::parse(BSON("_id" << _setName << "protocolVersion" << 1 << "version" - << 3 << "members" << serversBob.arr())); - replCoordPtr->setGetConfigReturnValue(replSetConfig); - - repl::ReplicationCoordinator::set(service, std::move(replCoordPtr)); - - auto storagePtr = std::make_unique<repl::StorageInterfaceMock>(); - - repl::DropPendingCollectionReaper::set( - service, std::make_unique<repl::DropPendingCollectionReaper>(storagePtr.get())); - - repl::ReplicationProcess::set(service, - std::make_unique<repl::ReplicationProcess>( - storagePtr.get(), - std::make_unique<repl::ReplicationConsistencyMarkersMock>(), - std::make_unique<repl::ReplicationRecoveryMock>())); - - auto uniqueOpCtx = makeOperationContext(); - ASSERT_OK( - repl::ReplicationProcess::get(uniqueOpCtx.get())->initializeRollbackID(uniqueOpCtx.get())); - - repl::StorageInterface::set(service, std::move(storagePtr)); - - auto opObserver = checked_cast<OpObserverRegistry*>(service->getOpObserver()); - opObserver->addObserver(std::make_unique<OpObserverShardingImpl>()); - opObserver->addObserver(std::make_unique<ConfigServerOpObserver>()); - opObserver->addObserver(std::make_unique<ShardServerOpObserver>()); - - repl::createOplog(uniqueOpCtx.get()); - - // Set the highest FCV because otherwise it defaults to the lower FCV. This way we default to - // testing this release's code, not backwards compatibility code. - // (Generic FCV reference): This FCV reference should exist across LTS binary versions. - serverGlobalParams.mutableFeatureCompatibility.setVersion( - ServerGlobalParams::FeatureCompatibility::kLatest); -} +ShardingMongodTestFixture::ShardingMongodTestFixture() {} ShardingMongodTestFixture::~ShardingMongodTestFixture() = default; @@ -285,6 +234,54 @@ Status ShardingMongodTestFixture::initializeGlobalShardingStateForMongodForTest( void ShardingMongodTestFixture::setUp() { ServiceContextMongoDTest::setUp(); ShardingTestFixtureCommon::setUp(); + + const auto service = getServiceContext(); + + // Set up this node as shard node, which is part of a replica set + + repl::ReplSettings replSettings; + replSettings.setOplogSizeBytes(512'000); + replSettings.setReplSetString(ConnectionString::forReplicaSet(_setName, _servers).toString()); + auto replCoordPtr = makeReplicationCoordinator(replSettings); + _replCoord = replCoordPtr.get(); + + BSONArrayBuilder serversBob; + for (size_t i = 0; i < _servers.size(); ++i) { + serversBob.append(BSON("host" << _servers[i].toString() << "_id" << static_cast<int>(i))); + } + + auto replSetConfig = + repl::ReplSetConfig::parse(BSON("_id" << _setName << "protocolVersion" << 1 << "version" + << 3 << "members" << serversBob.arr())); + replCoordPtr->setGetConfigReturnValue(replSetConfig); + + repl::ReplicationCoordinator::set(service, std::move(replCoordPtr)); + + auto storagePtr = std::make_unique<repl::StorageInterfaceMock>(); + + repl::DropPendingCollectionReaper::set( + service, std::make_unique<repl::DropPendingCollectionReaper>(storagePtr.get())); + + repl::ReplicationProcess::set(service, + std::make_unique<repl::ReplicationProcess>( + storagePtr.get(), + std::make_unique<repl::ReplicationConsistencyMarkersMock>(), + std::make_unique<repl::ReplicationRecoveryMock>())); + + ASSERT_OK(repl::ReplicationProcess::get(operationContext()) + ->initializeRollbackID(operationContext())); + + repl::StorageInterface::set(service, std::move(storagePtr)); + + setupOpObservers(); + + repl::createOplog(operationContext()); + + // Set the highest FCV because otherwise it defaults to the lower FCV. This way we default to + // testing this release's code, not backwards compatibility code. + // (Generic FCV reference): This FCV reference should exist across LTS binary versions. + serverGlobalParams.mutableFeatureCompatibility.setVersion( + ServerGlobalParams::FeatureCompatibility::kLatest); } void ShardingMongodTestFixture::tearDown() { @@ -350,4 +347,11 @@ repl::ReplicationCoordinatorMock* ShardingMongodTestFixture::replicationCoordina return _replCoord; } +void ShardingMongodTestFixture::setupOpObservers() { + auto opObserverRegistry = + checked_cast<OpObserverRegistry*>(getServiceContext()->getOpObserver()); + opObserverRegistry->addObserver(std::make_unique<OpObserverShardingImpl>()); + opObserverRegistry->addObserver(std::make_unique<ShardServerOpObserver>()); +} + } // namespace mongo diff --git a/src/mongo/db/s/sharding_mongod_test_fixture.h b/src/mongo/db/s/sharding_mongod_test_fixture.h index 7c6d955976e..41f4cd1b824 100644 --- a/src/mongo/db/s/sharding_mongod_test_fixture.h +++ b/src/mongo/db/s/sharding_mongod_test_fixture.h @@ -118,6 +118,11 @@ protected: */ virtual std::unique_ptr<BalancerConfiguration> makeBalancerConfiguration(); + /** + * Setups the op observer listeners depending on cluster role. + */ + virtual void setupOpObservers(); + private: /** * Base class returns a TaskExecutorPool with a fixed TaskExecutor and a set of arbitrary diff --git a/src/mongo/db/s/sharding_write_router.cpp b/src/mongo/db/s/sharding_write_router.cpp new file mode 100644 index 00000000000..87542f9c126 --- /dev/null +++ b/src/mongo/db/s/sharding_write_router.cpp @@ -0,0 +1,85 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/s/sharding_write_router.h" + +#include "mongo/db/catalog/collection_catalog.h" +#include "mongo/db/s/sharding_state.h" +#include "mongo/s/catalog_cache.h" +#include "mongo/s/grid.h" + +namespace mongo { + +ShardingWriteRouter::ShardingWriteRouter(OperationContext* opCtx, + const NamespaceString& nss, + CatalogCache* catalogCache) { + if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { + _css = CollectionShardingState::get(opCtx, nss); + auto collDesc = _css->getCollectionDescription(opCtx); + + _reshardKeyPattern = collDesc.getReshardingKeyIfShouldForwardOps(); + if (_reshardKeyPattern) { + _ownershipFilter = _css->getOwnershipFilter( + opCtx, CollectionShardingState::OrphanCleanupPolicy::kAllowOrphanCleanup); + _shardKeyPattern = ShardKeyPattern(collDesc.getKeyPattern()); + + const auto& reshardingFields = collDesc.getReshardingFields(); + invariant(reshardingFields); + const auto& donorFields = reshardingFields->getDonorFields(); + invariant(donorFields); + + _reshardingChunkMgr = uassertStatusOK(catalogCache->getCollectionRoutingInfo( + opCtx, donorFields->getTempReshardingNss(), true /* allowLocks */)); + } + } +} + +CollectionShardingState* ShardingWriteRouter::getCollectionShardingState() const { + return _css; +} + +boost::optional<ShardId> ShardingWriteRouter::getReshardingDestinedRecipient( + const BSONObj& fullDocument) const { + if (!_reshardKeyPattern) { + return boost::none; + } + + invariant(_ownershipFilter); + invariant(_shardKeyPattern); + invariant(_reshardingChunkMgr); + + if (!_ownershipFilter->keyBelongsToMe(_shardKeyPattern->extractShardKeyFromDoc(fullDocument))) { + return boost::none; + } + + auto shardKey = _reshardKeyPattern->extractShardKeyFromDocThrows(fullDocument); + return _reshardingChunkMgr->findIntersectingChunkWithSimpleCollation(shardKey).getShardId(); +} + +} // namespace mongo diff --git a/src/mongo/db/s/resharding_donor_write_router.h b/src/mongo/db/s/sharding_write_router.h index 13b76115e1c..219e31859b0 100644 --- a/src/mongo/db/s/resharding_donor_write_router.h +++ b/src/mongo/db/s/sharding_write_router.h @@ -37,29 +37,23 @@ class CatalogCache; class ChunkManager; class OperationContext; class ShardId; -class ReshardingDonorWriteRouter { +class ShardingWriteRouter { public: - ReshardingDonorWriteRouter(OperationContext* opCtx, - const NamespaceString& sourceNss, - CatalogCache* catalogCache); + ShardingWriteRouter(OperationContext* opCtx, + const NamespaceString& nss, + CatalogCache* catalogCache); - ReshardingDonorWriteRouter(OperationContext* opCtx, - const NamespaceString& sourceNss, - CatalogCache* catalogCache, - CollectionShardingState* css, - const ScopedCollectionDescription* collDesc); + boost::optional<ShardId> getReshardingDestinedRecipient(const BSONObj& fullDocument) const; CollectionShardingState* getCollectionShardingState() const; - boost::optional<ShardId> getDestinedRecipient(const BSONObj& fullDocument) const; - private: - CollectionShardingState* const _css; - const ScopedCollectionDescription* const _collDesc; + CollectionShardingState* _css{nullptr}; boost::optional<ScopedCollectionFilter> _ownershipFilter; - boost::optional<ShardKeyPattern> _reshardingKeyPattern; - boost::optional<ChunkManager> _tempReshardingChunkMgr; + boost::optional<ShardKeyPattern> _shardKeyPattern; + boost::optional<ShardKeyPattern> _reshardKeyPattern; + boost::optional<ChunkManager> _reshardingChunkMgr; }; } // namespace mongo diff --git a/src/mongo/db/vector_clock_mongod_test.cpp b/src/mongo/db/vector_clock_mongod_test.cpp index 25c3d267825..46ce61b5faf 100644 --- a/src/mongo/db/vector_clock_mongod_test.cpp +++ b/src/mongo/db/vector_clock_mongod_test.cpp @@ -32,6 +32,8 @@ #include "mongo/db/keys_collection_client_direct.h" #include "mongo/db/keys_collection_manager.h" #include "mongo/db/logical_time_validator.h" +#include "mongo/db/op_observer_impl.h" +#include "mongo/db/op_observer_registry.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/s/sharding_mongod_test_fixture.h" #include "mongo/db/vector_clock_mutable.h" @@ -79,6 +81,12 @@ protected: _keyManager->refreshNow(operationContext()); } + void setupOpObservers() override { + auto opObserverRegistry = + checked_cast<OpObserverRegistry*>(getServiceContext()->getOpObserver()); + opObserverRegistry->addObserver(std::make_unique<OpObserverImpl>()); + } + private: std::shared_ptr<KeysCollectionManager> _keyManager; }; diff --git a/src/mongo/db/vector_clock_test_fixture.cpp b/src/mongo/db/vector_clock_test_fixture.cpp index b8c7205512e..c25324339db 100644 --- a/src/mongo/db/vector_clock_test_fixture.cpp +++ b/src/mongo/db/vector_clock_test_fixture.cpp @@ -35,6 +35,8 @@ #include "mongo/db/dbdirectclient.h" #include "mongo/db/logical_time.h" +#include "mongo/db/op_observer_impl.h" +#include "mongo/db/op_observer_registry.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/service_context.h" #include "mongo/db/signed_logical_time.h" @@ -104,4 +106,10 @@ DBDirectClient* VectorClockTestFixture::getDBClient() const { return _dbDirectClient.get(); } +void VectorClockTestFixture::setupOpObservers() { + auto opObserverRegistry = + checked_cast<OpObserverRegistry*>(getServiceContext()->getOpObserver()); + opObserverRegistry->addObserver(std::make_unique<OpObserverImpl>()); +} + } // namespace mongo diff --git a/src/mongo/db/vector_clock_test_fixture.h b/src/mongo/db/vector_clock_test_fixture.h index 96a3a94a577..3fc5b043744 100644 --- a/src/mongo/db/vector_clock_test_fixture.h +++ b/src/mongo/db/vector_clock_test_fixture.h @@ -74,6 +74,9 @@ protected: DBDirectClient* getDBClient() const; +protected: + void setupOpObservers() override; + private: VectorClock* _clock; std::shared_ptr<ClockSourceMock> _mockClockSource = std::make_shared<ClockSourceMock>(); diff --git a/src/mongo/dbtests/SConscript b/src/mongo/dbtests/SConscript index 09bc1e576d3..1860b4f5d21 100644 --- a/src/mongo/dbtests/SConscript +++ b/src/mongo/dbtests/SConscript @@ -169,7 +169,6 @@ if not has_option('noshell') and usemozjs: "$BUILD_DIR/mongo/db/repl/serveronly_repl", "$BUILD_DIR/mongo/db/repl/storage_interface_impl", "$BUILD_DIR/mongo/db/repl/timestamp_block", - "$BUILD_DIR/mongo/db/s/resharding_util", "$BUILD_DIR/mongo/db/server_options_core", "$BUILD_DIR/mongo/db/sessions_collection_standalone", "$BUILD_DIR/mongo/db/storage/durable_catalog_impl", diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript index 37dee4875f3..a5fd981abe6 100644 --- a/src/mongo/s/query/SConscript +++ b/src/mongo/s/query/SConscript @@ -120,11 +120,12 @@ env.Library( '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/auth/auth', '$BUILD_DIR/mongo/db/auth/authprivilege', + '$BUILD_DIR/mongo/db/curop', '$BUILD_DIR/mongo/db/generic_cursor', '$BUILD_DIR/mongo/db/kill_sessions', '$BUILD_DIR/mongo/db/logical_session_cache', '$BUILD_DIR/mongo/db/logical_session_id', - '$BUILD_DIR/mongo/db/shared_request_handling', + '$BUILD_DIR/mongo/db/query/query_knobs', ], ) |