diff options
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/s/collection_sharding_runtime.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/s/collection_sharding_runtime.h | 7 | ||||
-rw-r--r-- | src/mongo/db/s/collection_sharding_state.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/s/collection_sharding_state.h | 6 | ||||
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source.h | 11 | ||||
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp | 121 | ||||
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source_legacy.h | 52 | ||||
-rw-r--r-- | src/mongo/db/s/migration_source_manager.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/s/migration_source_manager.h | 5 | ||||
-rw-r--r-- | src/mongo/db/s/move_chunk_command.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/s/op_observer_sharding_impl.cpp | 40 |
11 files changed, 165 insertions, 101 deletions
diff --git a/src/mongo/db/s/collection_sharding_runtime.cpp b/src/mongo/db/s/collection_sharding_runtime.cpp index 05fbb152475..684ae740a00 100644 --- a/src/mongo/db/s/collection_sharding_runtime.cpp +++ b/src/mongo/db/s/collection_sharding_runtime.cpp @@ -86,6 +86,12 @@ CollectionShardingRuntime* CollectionShardingRuntime::get(OperationContext* opCt return checked_cast<CollectionShardingRuntime*>(css); } +CollectionShardingRuntime* CollectionShardingRuntime::get_UNSAFE(ServiceContext* svcCtx, + const NamespaceString& nss) { + auto* const css = CollectionShardingState::get_UNSAFE(svcCtx, nss); + return checked_cast<CollectionShardingRuntime*>(css); +} + void CollectionShardingRuntime::setFilteringMetadata(OperationContext* opCtx, CollectionMetadata newMetadata) { invariant(!newMetadata.isSharded() || !isNamespaceAlwaysUnsharded(_nss), diff --git a/src/mongo/db/s/collection_sharding_runtime.h b/src/mongo/db/s/collection_sharding_runtime.h index d2aefc10ff4..42d91a18913 100644 --- a/src/mongo/db/s/collection_sharding_runtime.h +++ b/src/mongo/db/s/collection_sharding_runtime.h @@ -65,6 +65,13 @@ public: static CollectionShardingRuntime* get(OperationContext* opCtx, const NamespaceString& nss); /** + * It is the caller's responsibility to ensure that the collection locks for this namespace are + * held when this is called. The returned pointer should never be stored. + */ + static CollectionShardingRuntime* get_UNSAFE(ServiceContext* svcCtx, + const NamespaceString& nss); + + /** * Updates the collection's filtering metadata based on changes received from the config server * and also resolves the pending receives map in case some of these pending receives have * committed on the config server or have been abandoned by the donor shard. diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp index 7c572a029cd..0d8871a0f45 100644 --- a/src/mongo/db/s/collection_sharding_state.cpp +++ b/src/mongo/db/s/collection_sharding_state.cpp @@ -148,6 +148,12 @@ CollectionShardingState* CollectionShardingState::get(OperationContext* opCtx, return &collectionsMap->getOrCreate(nss); } +CollectionShardingState* CollectionShardingState::get_UNSAFE(ServiceContext* svcCtx, + const NamespaceString& nss) { + auto& collectionsMap = CollectionShardingStateMap::get(svcCtx); + return &collectionsMap->getOrCreate(nss); +} + void CollectionShardingState::report(OperationContext* opCtx, BSONObjBuilder* builder) { auto& collectionsMap = CollectionShardingStateMap::get(opCtx->getServiceContext()); collectionsMap->report(opCtx, builder); diff --git a/src/mongo/db/s/collection_sharding_state.h b/src/mongo/db/s/collection_sharding_state.h index 9991a1811ab..d62b010bec6 100644 --- a/src/mongo/db/s/collection_sharding_state.h +++ b/src/mongo/db/s/collection_sharding_state.h @@ -70,6 +70,12 @@ public: static CollectionShardingState* get(OperationContext* opCtx, const NamespaceString& nss); /** + * It is the caller's responsibility to ensure that the collection locks for this namespace are + * held when this is called. The returned pointer should never be stored. + */ + static CollectionShardingState* get_UNSAFE(ServiceContext* svcCtx, const NamespaceString& nss); + + /** * Reports all collections which have filtering information associated. */ static void report(OperationContext* opCtx, BSONObjBuilder* builder); diff --git a/src/mongo/db/s/migration_chunk_cloner_source.h b/src/mongo/db/s/migration_chunk_cloner_source.h index 52c8993163e..c871a3e08a8 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source.h +++ b/src/mongo/db/s/migration_chunk_cloner_source.h @@ -156,17 +156,6 @@ public: const repl::OpTime& opTime, const repl::OpTime& preImageOpTime) = 0; - /** - * Notifies this cloner that a transaction involving the collection being cloned was prepared or - * committed. It is up to the cloner's implementation to decide what to do with this information - * and it is valid for the implementation to ignore it. - * - * NOTE: Must be called with at least IX lock held on the collection. - */ - virtual void onTransactionPrepareOrUnpreparedCommit(OperationContext* opCtx, - const repl::OpTime& opTime) = 0; - - protected: MigrationChunkClonerSource(); }; diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index a78553c804e..2c3f23ff2d4 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -43,6 +43,8 @@ #include "mongo/db/query/internal_plans.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/replication_process.h" +#include "mongo/db/s/collection_sharding_runtime.h" +#include "mongo/db/s/migration_source_manager.h" #include "mongo/db/s/sharding_statistics.h" #include "mongo/db/s/start_chunk_clone_request.h" #include "mongo/db/service_context.h" @@ -87,6 +89,34 @@ BSONObj createRequestWithSessionId(StringData commandName, return builder.obj(); } +const BSONObj& getDocumentKeyFromReplOperation(repl::ReplOperation replOperation, + repl::OpTypeEnum opType) { + switch (opType) { + case repl::OpTypeEnum::kInsert: + case repl::OpTypeEnum::kDelete: + return replOperation.getObject(); + case repl::OpTypeEnum::kUpdate: + return *replOperation.getObject2(); + default: + MONGO_UNREACHABLE; + } + MONGO_UNREACHABLE; +} + +const char getOpCharForCrudOpType(repl::OpTypeEnum opType) { + switch (opType) { + case repl::OpTypeEnum::kInsert: + return 'i'; + case repl::OpTypeEnum::kUpdate: + return 'u'; + case repl::OpTypeEnum::kDelete: + return 'd'; + default: + MONGO_UNREACHABLE; + } + MONGO_UNREACHABLE; +} + } // namespace /** @@ -110,8 +140,7 @@ public: _prePostImageOpTime(prePostImageOpTime) {} void commit(boost::optional<Timestamp>) override { - _cloner->_consumeOperationTrackRequestAndAddToTransferModsQueue( - _idObj, _op, _opTime, _prePostImageOpTime); + _cloner->_addToTransferModsQueue(_idObj, _op, _opTime, _prePostImageOpTime); _cloner->_decrementOutstandingOperationTrackRequests(); } @@ -127,30 +156,62 @@ private: const repl::OpTime _prePostImageOpTime; }; -/** - * Used to keep track of new transactions that involve documents in any chunk - * with an ongoing migration. - */ -class LogPrepareOrCommitOpForShardingHandler final : public RecoveryUnit::Change { -public: - LogPrepareOrCommitOpForShardingHandler(MigrationChunkClonerSourceLegacy* cloner, - const repl::OpTime& opTime) - : _cloner(cloner), _opTime(opTime) {} +void LogTransactionOperationsForShardingHandler::commit(boost::optional<Timestamp>) { + std::set<NamespaceString> namespacesTouchedByTransaction; - void commit(boost::optional<Timestamp>) override { - _cloner->_addToSessionMigrationOptimeQueue( - _opTime, SessionCatalogMigrationSource::EntryAtOpTimeType::kTransaction); - _cloner->_decrementOutstandingOperationTrackRequests(); - } + for (const auto& stmt : _stmts) { + const auto& nss = stmt.getNss(); - void rollback() override { - _cloner->_decrementOutstandingOperationTrackRequests(); - } + auto csr = CollectionShardingRuntime::get_UNSAFE(_svcCtx, nss); + auto msm = MigrationSourceManager::get_UNSAFE(csr); + if (!msm) { + continue; + } + auto cloner = dynamic_cast<MigrationChunkClonerSourceLegacy*>(msm->getCloner().get()); -private: - MigrationChunkClonerSourceLegacy* const _cloner; - const repl::OpTime _opTime; -}; + auto opType = stmt.getOpType(); + auto documentKey = getDocumentKeyFromReplOperation(stmt, opType); + + auto idElement = documentKey["_id"]; + if (idElement.eoo()) { + warning() << "Received a document with no id, ignoring: " << redact(documentKey); + continue; + } + + auto const& minKey = cloner->_args.getMinKey(); + auto const& maxKey = cloner->_args.getMaxKey(); + auto const& shardKeyPattern = cloner->_shardKeyPattern; + + if (!isInRange(documentKey, minKey, maxKey, shardKeyPattern)) { + // If the preImageDoc is not in range but the postImageDoc was, we know that the + // document has changed shard keys and no longer belongs in the chunk being cloned. + // We will model the deletion of the preImage document so that the destination chunk + // does not receive an outdated version of this document. + if (opType == repl::OpTypeEnum::kUpdate && + isInRange(stmt.getPreImageDocumentKey(), minKey, maxKey, shardKeyPattern) && + !stmt.getPreImageDocumentKey()["_id"].eoo()) { + opType = repl::OpTypeEnum::kDelete; + idElement = stmt.getPreImageDocumentKey()["id"]; + } else { + continue; + } + } + + // Inform the session migration subsystem that a transaction has committed for all involved + // namespaces. + if (namespacesTouchedByTransaction.find(nss) == namespacesTouchedByTransaction.end()) { + cloner->_addToSessionMigrationOptimeQueue( + _prepareOrCommitOpTime, + SessionCatalogMigrationSource::EntryAtOpTimeType::kTransaction); + + namespacesTouchedByTransaction.emplace(nss); + } + + // Pass an empty prePostOpTime to the queue because retryable write history doesn't care + // about writes in transactions. + cloner->_addToTransferModsQueue(idElement.wrap(), getOpCharForCrudOpType(opType), {}, {}); + } +} MigrationChunkClonerSourceLegacy::MigrationChunkClonerSourceLegacy(MoveChunkRequest request, const BSONObj& shardKeyPattern, @@ -456,18 +517,6 @@ void MigrationChunkClonerSourceLegacy::onDeleteOp(OperationContext* opCtx, } } -void MigrationChunkClonerSourceLegacy::onTransactionPrepareOrUnpreparedCommit( - OperationContext* opCtx, const repl::OpTime& opTime) { - - invariant(opCtx->getTxnNumber()); - - if (!_addedOperationToOutstandingOperationTrackRequests()) { - return; - } - - opCtx->recoveryUnit()->registerChange(new LogPrepareOrCommitOpForShardingHandler(this, opTime)); -} - void MigrationChunkClonerSourceLegacy::_addToSessionMigrationOptimeQueue( const repl::OpTime& opTime, SessionCatalogMigrationSource::EntryAtOpTimeType entryAtOpTimeType) { @@ -478,7 +527,7 @@ void MigrationChunkClonerSourceLegacy::_addToSessionMigrationOptimeQueue( } } -void MigrationChunkClonerSourceLegacy::_consumeOperationTrackRequestAndAddToTransferModsQueue( +void MigrationChunkClonerSourceLegacy::_addToTransferModsQueue( const BSONObj& idObj, const char op, const repl::OpTime& opTime, diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h index c737716b93f..e77998d907d 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h @@ -54,6 +54,30 @@ class Collection; class Database; class RecordId; +/** + * Used to commit work for LogOpForSharding. Used to keep track of changes in documents that are + * part of a chunk being migrated. + */ +class LogTransactionOperationsForShardingHandler final : public RecoveryUnit::Change { +public: + /** + * Invariant: idObj should belong to a document that is part of the active chunk being migrated + */ + LogTransactionOperationsForShardingHandler(ServiceContext* svcCtx, + const std::vector<repl::ReplOperation>& stmts, + const repl::OpTime& prepareOrCommitOpTime) + : _svcCtx(svcCtx), _stmts(stmts), _prepareOrCommitOpTime(prepareOrCommitOpTime) {} + + void commit(boost::optional<Timestamp>) override; + + void rollback() override{}; + +private: + ServiceContext* _svcCtx; + std::vector<repl::ReplOperation> _stmts; + const repl::OpTime _prepareOrCommitOpTime; +}; + class MigrationChunkClonerSourceLegacy final : public MigrationChunkClonerSource { MigrationChunkClonerSourceLegacy(const MigrationChunkClonerSourceLegacy&) = delete; MigrationChunkClonerSourceLegacy& operator=(const MigrationChunkClonerSourceLegacy&) = delete; @@ -91,10 +115,6 @@ public: const repl::OpTime& opTime, const repl::OpTime& preImageOpTime) override; - void onTransactionPrepareOrUnpreparedCommit(OperationContext* opCtx, - const repl::OpTime& opTime) override; - - // Legacy cloner specific functionality /** @@ -183,7 +203,7 @@ public: private: friend class LogOpForShardingHandler; - friend class LogPrepareOrCommitOpForShardingHandler; + friend class LogTransactionOperationsForShardingHandler; // Represents the states in which the cloner can be enum State { kNew, kCloning, kDone }; @@ -216,18 +236,20 @@ private: const repl::OpTime& opTime, SessionCatalogMigrationSource::EntryAtOpTimeType entryAtOpTimeType); + void _addToSessionMigrationOptimeQueueForTransactionCommit( + const repl::OpTime& opTime, + SessionCatalogMigrationSource::EntryAtOpTimeType entryAtOpTimeType); + /* - * Consumes the operation track request and appends the relevant document changes to - * the appropriate internal data structures (known colloquially as the 'transfer mods queue'). - * These structures track document changes that are part of a part of a chunk being migrated. - * In doing so, this the method also removes the corresponding operation track request from the - * operation track requests queue. + * Appends the relevant document changes to the appropriate internal data structures (known + * colloquially as the 'transfer mods queue'). These structures track document changes that are + * part of a part of a chunk being migrated. In doing so, this the method also removes the + * corresponding operation track request from the operation track requests queue. */ - void _consumeOperationTrackRequestAndAddToTransferModsQueue( - const BSONObj& idObj, - const char op, - const repl::OpTime& opTime, - const repl::OpTime& prePostImageOpTime); + void _addToTransferModsQueue(const BSONObj& idObj, + const char op, + const repl::OpTime& opTime, + const repl::OpTime& prePostImageOpTime); /** * Adds an operation to the outstanding operation track requests. Returns false if the cloner diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp index 8935b676e45..ae210f50e5e 100644 --- a/src/mongo/db/s/migration_source_manager.cpp +++ b/src/mongo/db/s/migration_source_manager.cpp @@ -127,6 +127,10 @@ MigrationSourceManager* MigrationSourceManager::get(CollectionShardingRuntime* c return msmForCsr(csr); } +MigrationSourceManager* MigrationSourceManager::get_UNSAFE(CollectionShardingRuntime* csr) { + return msmForCsr(csr); +} + MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx, MoveChunkRequest request, ConnectionString donorConnStr, @@ -280,8 +284,8 @@ Status MigrationSourceManager::startClone(OperationContext* opCtx) { auto const readConcernArgs = repl::ReadConcernArgs( replCoord->getMyLastAppliedOpTime(), repl::ReadConcernLevel::kLocalReadConcern); - uassertStatusOK( - waitForReadConcern(opCtx, readConcernArgs, false, PrepareConflictBehavior::kEnforce)); + uassertStatusOK(waitForReadConcern( + opCtx, readConcernArgs, false, PrepareConflictBehavior::kIgnoreConflicts)); } Status startCloneStatus = _cloneDriver->startClone(opCtx); diff --git a/src/mongo/db/s/migration_source_manager.h b/src/mongo/db/s/migration_source_manager.h index b567b5ad7ce..cf0a14fd9df 100644 --- a/src/mongo/db/s/migration_source_manager.h +++ b/src/mongo/db/s/migration_source_manager.h @@ -77,6 +77,11 @@ public: */ static MigrationSourceManager* get(CollectionShardingRuntime* csr, CollectionShardingRuntime::CSRLock& csrLock); + /** + * It is the caller's responsibility to ensure that the collection locks for this namespace are + * held when this is called. The returned pointer should never be stored. + */ + static MigrationSourceManager* get_UNSAFE(CollectionShardingRuntime* csr); /** * Instantiates a new migration source manager with the specified migration parameters. Must be diff --git a/src/mongo/db/s/move_chunk_command.cpp b/src/mongo/db/s/move_chunk_command.cpp index dd62c984292..0b3c40b7564 100644 --- a/src/mongo/db/s/move_chunk_command.cpp +++ b/src/mongo/db/s/move_chunk_command.cpp @@ -103,6 +103,10 @@ public: return true; } + bool canIgnorePrepareConflicts() const override { + return true; + } + Status checkAuthForCommand(Client* client, const std::string& dbname, const BSONObj& cmdObj) const override { diff --git a/src/mongo/db/s/op_observer_sharding_impl.cpp b/src/mongo/db/s/op_observer_sharding_impl.cpp index 2b2ae6aecdf..1cc5844cde3 100644 --- a/src/mongo/db/s/op_observer_sharding_impl.cpp +++ b/src/mongo/db/s/op_observer_sharding_impl.cpp @@ -33,6 +33,7 @@ #include "mongo/db/catalog_raii.h" #include "mongo/db/s/collection_sharding_runtime.h" +#include "mongo/db/s/migration_chunk_cloner_source_legacy.h" #include "mongo/db/s/migration_source_manager.h" namespace mongo { @@ -166,43 +167,8 @@ void OpObserverShardingImpl::shardObserveTransactionPrepareOrUnpreparedCommit( const std::vector<repl::ReplOperation>& stmts, const repl::OpTime& prepareOrCommitOptime) { - std::set<NamespaceString> namespacesTouchedByTransaction; - - for (const auto& stmt : stmts) { - const auto& nss = stmt.getNss(); - - invariant(opCtx->lockState()->isCollectionLockedForMode(nss, MODE_IS)); - - auto csr = CollectionShardingRuntime::get(opCtx, nss); - auto csrLock = CollectionShardingRuntime::CSRLock::lock(opCtx, csr); - auto msm = MigrationSourceManager::get(csr, csrLock); - if (!msm) { - continue; - } - - if (namespacesTouchedByTransaction.find(nss) == namespacesTouchedByTransaction.end()) { - msm->getCloner()->onTransactionPrepareOrUnpreparedCommit(opCtx, prepareOrCommitOptime); - namespacesTouchedByTransaction.insert(nss); - } - - const auto& opType = stmt.getOpType(); - - // We pass an empty opTime to observers because retryable write history doesn't care about - // writes in transactions. - if (opType == repl::OpTypeEnum::kInsert) { - msm->getCloner()->onInsertOp(opCtx, stmt.getObject(), {}); - } else if (opType == repl::OpTypeEnum::kUpdate) { - if (auto updateDoc = stmt.getObject2()) { - msm->getCloner()->onUpdateOp( - opCtx, stmt.getPreImageDocumentKey(), *updateDoc, {}, {}); - } - } else if (opType == repl::OpTypeEnum::kDelete) { - if (isMigratingWithCSRLock(csr, csrLock, stmt.getObject())) { - msm->getCloner()->onDeleteOp( - opCtx, getDocumentKey(opCtx, nss, stmt.getObject()), {}, {}); - } - } - } + opCtx->recoveryUnit()->registerChange(new LogTransactionOperationsForShardingHandler( + opCtx->getServiceContext(), stmts, prepareOrCommitOptime)); } } // namespace mongo |