From dfa8658c18142c560447c7bf6f34a6f788593d28 Mon Sep 17 00:00:00 2001 From: Blake Oler Date: Tue, 28 May 2019 19:20:56 -0400 Subject: SERVER-40791 Track multi-statement transaction operations for migrations at commit time --- .../db/s/migration_chunk_cloner_source_legacy.cpp | 121 +++++++++++++++------ 1 file changed, 85 insertions(+), 36 deletions(-) (limited to 'src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp') diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index a78553c804e..2c3f23ff2d4 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -43,6 +43,8 @@ #include "mongo/db/query/internal_plans.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/replication_process.h" +#include "mongo/db/s/collection_sharding_runtime.h" +#include "mongo/db/s/migration_source_manager.h" #include "mongo/db/s/sharding_statistics.h" #include "mongo/db/s/start_chunk_clone_request.h" #include "mongo/db/service_context.h" @@ -87,6 +89,34 @@ BSONObj createRequestWithSessionId(StringData commandName, return builder.obj(); } +const BSONObj& getDocumentKeyFromReplOperation(repl::ReplOperation replOperation, + repl::OpTypeEnum opType) { + switch (opType) { + case repl::OpTypeEnum::kInsert: + case repl::OpTypeEnum::kDelete: + return replOperation.getObject(); + case repl::OpTypeEnum::kUpdate: + return *replOperation.getObject2(); + default: + MONGO_UNREACHABLE; + } + MONGO_UNREACHABLE; +} + +const char getOpCharForCrudOpType(repl::OpTypeEnum opType) { + switch (opType) { + case repl::OpTypeEnum::kInsert: + return 'i'; + case repl::OpTypeEnum::kUpdate: + return 'u'; + case repl::OpTypeEnum::kDelete: + return 'd'; + default: + MONGO_UNREACHABLE; + } + MONGO_UNREACHABLE; +} + } // namespace /** @@ -110,8 +140,7 @@ public: _prePostImageOpTime(prePostImageOpTime) {} void commit(boost::optional) override { - _cloner->_consumeOperationTrackRequestAndAddToTransferModsQueue( - _idObj, _op, _opTime, _prePostImageOpTime); + _cloner->_addToTransferModsQueue(_idObj, _op, _opTime, _prePostImageOpTime); _cloner->_decrementOutstandingOperationTrackRequests(); } @@ -127,30 +156,62 @@ private: const repl::OpTime _prePostImageOpTime; }; -/** - * Used to keep track of new transactions that involve documents in any chunk - * with an ongoing migration. - */ -class LogPrepareOrCommitOpForShardingHandler final : public RecoveryUnit::Change { -public: - LogPrepareOrCommitOpForShardingHandler(MigrationChunkClonerSourceLegacy* cloner, - const repl::OpTime& opTime) - : _cloner(cloner), _opTime(opTime) {} +void LogTransactionOperationsForShardingHandler::commit(boost::optional) { + std::set namespacesTouchedByTransaction; - void commit(boost::optional) override { - _cloner->_addToSessionMigrationOptimeQueue( - _opTime, SessionCatalogMigrationSource::EntryAtOpTimeType::kTransaction); - _cloner->_decrementOutstandingOperationTrackRequests(); - } + for (const auto& stmt : _stmts) { + const auto& nss = stmt.getNss(); - void rollback() override { - _cloner->_decrementOutstandingOperationTrackRequests(); - } + auto csr = CollectionShardingRuntime::get_UNSAFE(_svcCtx, nss); + auto msm = MigrationSourceManager::get_UNSAFE(csr); + if (!msm) { + continue; + } + auto cloner = dynamic_cast(msm->getCloner().get()); -private: - MigrationChunkClonerSourceLegacy* const _cloner; - const repl::OpTime _opTime; -}; + auto opType = stmt.getOpType(); + auto documentKey = getDocumentKeyFromReplOperation(stmt, opType); + + auto idElement = documentKey["_id"]; + if (idElement.eoo()) { + warning() << "Received a document with no id, ignoring: " << redact(documentKey); + continue; + } + + auto const& minKey = cloner->_args.getMinKey(); + auto const& maxKey = cloner->_args.getMaxKey(); + auto const& shardKeyPattern = cloner->_shardKeyPattern; + + if (!isInRange(documentKey, minKey, maxKey, shardKeyPattern)) { + // If the preImageDoc is not in range but the postImageDoc was, we know that the + // document has changed shard keys and no longer belongs in the chunk being cloned. + // We will model the deletion of the preImage document so that the destination chunk + // does not receive an outdated version of this document. + if (opType == repl::OpTypeEnum::kUpdate && + isInRange(stmt.getPreImageDocumentKey(), minKey, maxKey, shardKeyPattern) && + !stmt.getPreImageDocumentKey()["_id"].eoo()) { + opType = repl::OpTypeEnum::kDelete; + idElement = stmt.getPreImageDocumentKey()["id"]; + } else { + continue; + } + } + + // Inform the session migration subsystem that a transaction has committed for all involved + // namespaces. + if (namespacesTouchedByTransaction.find(nss) == namespacesTouchedByTransaction.end()) { + cloner->_addToSessionMigrationOptimeQueue( + _prepareOrCommitOpTime, + SessionCatalogMigrationSource::EntryAtOpTimeType::kTransaction); + + namespacesTouchedByTransaction.emplace(nss); + } + + // Pass an empty prePostOpTime to the queue because retryable write history doesn't care + // about writes in transactions. + cloner->_addToTransferModsQueue(idElement.wrap(), getOpCharForCrudOpType(opType), {}, {}); + } +} MigrationChunkClonerSourceLegacy::MigrationChunkClonerSourceLegacy(MoveChunkRequest request, const BSONObj& shardKeyPattern, @@ -456,18 +517,6 @@ void MigrationChunkClonerSourceLegacy::onDeleteOp(OperationContext* opCtx, } } -void MigrationChunkClonerSourceLegacy::onTransactionPrepareOrUnpreparedCommit( - OperationContext* opCtx, const repl::OpTime& opTime) { - - invariant(opCtx->getTxnNumber()); - - if (!_addedOperationToOutstandingOperationTrackRequests()) { - return; - } - - opCtx->recoveryUnit()->registerChange(new LogPrepareOrCommitOpForShardingHandler(this, opTime)); -} - void MigrationChunkClonerSourceLegacy::_addToSessionMigrationOptimeQueue( const repl::OpTime& opTime, SessionCatalogMigrationSource::EntryAtOpTimeType entryAtOpTimeType) { @@ -478,7 +527,7 @@ void MigrationChunkClonerSourceLegacy::_addToSessionMigrationOptimeQueue( } } -void MigrationChunkClonerSourceLegacy::_consumeOperationTrackRequestAndAddToTransferModsQueue( +void MigrationChunkClonerSourceLegacy::_addToTransferModsQueue( const BSONObj& idObj, const char op, const repl::OpTime& opTime, -- cgit v1.2.1