diff options
26 files changed, 316 insertions, 64 deletions
diff --git a/etc/backports_required_for_multiversion_tests.yml b/etc/backports_required_for_multiversion_tests.yml index 5cc3e03d326..302f42137e2 100644 --- a/etc/backports_required_for_multiversion_tests.yml +++ b/etc/backports_required_for_multiversion_tests.yml @@ -248,6 +248,8 @@ last-continuous: ticket: SERVER-72620 - test_file: jstests/core/timeseries/bucket_unpacking_with_sort_extended_range.js ticket: SERVER-73110 + - test_file: jstests/sharding/prepare_transaction_then_migrate.js + ticket: SERVER-71219 suites: null last-lts: all: @@ -316,7 +318,7 @@ last-lts: - test_file: jstests/core/txns/errors_on_committed_transaction.js ticket: SERVER-52547 - test_file: jstests/sharding/prepare_transaction_then_migrate.js - ticket: SERVER-52906 + ticket: SERVER-71219 - test_file: jstests/sharding/migration_waits_for_majority_commit.js ticket: SERVER-52906 - test_file: jstests/sharding/migration_ignore_interrupts_1.js diff --git a/jstests/sharding/prepare_transaction_then_migrate.js b/jstests/sharding/prepare_transaction_then_migrate.js index 36a9581752d..5a12a83ed4c 100644 --- a/jstests/sharding/prepare_transaction_then_migrate.js +++ b/jstests/sharding/prepare_transaction_then_migrate.js @@ -3,7 +3,7 @@ * 1. Ignore multi-statement transaction prepare conflicts in the clone phase, and * 2. Pick up the changes for prepared transactions in the transfer mods phase. * - * @tags: [uses_transactions, uses_prepare_transaction] + * @tags: [uses_transactions, uses_prepare_transaction, requires_persistence] */ (function() { @@ -17,8 +17,17 @@ const collName = "user"; const staticMongod = MongoRunner.runMongod({}); // For startParallelOps. -let runTest = function(withStepUp) { - const st = new ShardingTest({shards: {rs0: {nodes: withStepUp ? 2 : 1}, rs1: {nodes: 1}}}); +const TestMode = { + kBasic: 'basic', + kWithStepUp: 'with stepUp', + kWithRestart: 'with restart', +}; + +let runTest = function(testMode) { + jsTest.log(`Running test in mode ${testMode}`); + + const st = new ShardingTest( + {shards: {rs0: {nodes: testMode == TestMode.kWithStepUp ? 2 : 1}, rs1: {nodes: 1}}}); const collection = st.s.getDB(dbName).getCollection(collName); CreateShardedCollectionUtil.shardCollectionWithChunks(collection, {x: 1}, [ @@ -81,8 +90,23 @@ let runTest = function(withStepUp) { let prepareTimestamp = res.prepareTimestamp; - if (withStepUp) { + if (testMode == TestMode.kWithStepUp) { st.rs0.stepUp(st.rs0.getSecondary()); + } else if (testMode == TestMode.kWithRestart) { + TestData.skipCollectionAndIndexValidation = true; + st.rs0.restart(st.rs0.getPrimary()); + st.rs0.waitForPrimary(); + TestData.skipCollectionAndIndexValidation = false; + + assert.soon(() => { + try { + st.shard0.getDB(dbName).getCollection(collName).findOne(); + return true; + } catch (ex) { + print("Caught expected once exception due to restart: " + tojson(ex)); + return false; + } + }); } const joinMoveChunk = @@ -157,9 +181,9 @@ let runTest = function(withStepUp) { st.stop(); }; -runTest(false); -// TODO: SERVER-71219 Enable test after fixing. -// runTest(true); +runTest(TestMode.kBasic); +runTest(TestMode.kWithStepUp); +runTest(TestMode.kWithRestart); MongoRunner.stopMongod(staticMongod); })(); diff --git a/src/mongo/db/auth/auth_op_observer.h b/src/mongo/db/auth/auth_op_observer.h index c145ebc3371..f0afcd2a860 100644 --- a/src/mongo/db/auth/auth_op_observer.h +++ b/src/mongo/db/auth/auth_op_observer.h @@ -206,6 +206,10 @@ public: size_t numberOfPrePostImagesToWrite, Date_t wallClockTime) final {} + void onTransactionPrepareNonPrimary(OperationContext* opCtx, + const std::vector<repl::OplogEntry>& statements, + const repl::OpTime& prepareOpTime) final {} + void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final {} diff --git a/src/mongo/db/fcv_op_observer.h b/src/mongo/db/fcv_op_observer.h index 7380f65358f..8ee4a32dc05 100644 --- a/src/mongo/db/fcv_op_observer.h +++ b/src/mongo/db/fcv_op_observer.h @@ -203,6 +203,10 @@ public: size_t numberOfPrePostImagesToWrite, Date_t wallClockTime) final{}; + void onTransactionPrepareNonPrimary(OperationContext* opCtx, + const std::vector<repl::OplogEntry>& statements, + const repl::OpTime& prepareOpTime) final {} + void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final{}; diff --git a/src/mongo/db/free_mon/free_mon_op_observer.h b/src/mongo/db/free_mon/free_mon_op_observer.h index 498d728d187..4e1b3d03a3e 100644 --- a/src/mongo/db/free_mon/free_mon_op_observer.h +++ b/src/mongo/db/free_mon/free_mon_op_observer.h @@ -206,6 +206,10 @@ public: size_t numberOfPrePostImagesToWrite, Date_t wallClockTime) final {} + void onTransactionPrepareNonPrimary(OperationContext* opCtx, + const std::vector<repl::OplogEntry>& statements, + const repl::OpTime& prepareOpTime) final {} + void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final {} diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h index 163d4738b0e..5332973563d 100644 --- a/src/mongo/db/op_observer.h +++ b/src/mongo/db/op_observer.h @@ -496,6 +496,15 @@ public: Date_t wallClockTime) = 0; /** + * This is called when a transaction transitions into prepare while it is not primary. Example + * case can include secondary oplog application or when node was restared and tries to + * recover prepared transactions from the oplog. + */ + virtual void onTransactionPrepareNonPrimary(OperationContext* opCtx, + const std::vector<repl::OplogEntry>& statements, + const repl::OpTime& prepareOpTime) = 0; + + /** * The onTransactionAbort method is called when an atomic transaction aborts, before the * RecoveryUnit onRollback() is called. It must not be called when the transaction to abort is * active. diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index f045cff94ff..61a960bea4b 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -2218,6 +2218,12 @@ void OpObserverImpl::onTransactionPrepare( shardObserveTransactionPrepareOrUnpreparedCommit(opCtx, *statements, prepareOpTime); } +void OpObserverImpl::onTransactionPrepareNonPrimary(OperationContext* opCtx, + const std::vector<repl::OplogEntry>& statements, + const repl::OpTime& prepareOpTime) { + shardObserveNonPrimaryTransactionPrepare(opCtx, statements, prepareOpTime); +} + void OpObserverImpl::onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) { invariant(opCtx->getTxnNumber()); diff --git a/src/mongo/db/op_observer_impl.h b/src/mongo/db/op_observer_impl.h index 77676e9548e..55c32faf581 100644 --- a/src/mongo/db/op_observer_impl.h +++ b/src/mongo/db/op_observer_impl.h @@ -209,6 +209,11 @@ public: const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment, size_t numberOfPrePostImagesToWrite, Date_t wallClockTime) final; + + void onTransactionPrepareNonPrimary(OperationContext* opCtx, + const std::vector<repl::OplogEntry>& statements, + const repl::OpTime& prepareOpTime) final; + void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final; void onMajorityCommitPointUpdate(ServiceContext* service, @@ -244,6 +249,10 @@ private: OperationContext* opCtx, const std::vector<repl::ReplOperation>& stmts, const repl::OpTime& prepareOrCommitOptime) {} + virtual void shardObserveNonPrimaryTransactionPrepare( + OperationContext* opCtx, + const std::vector<repl::OplogEntry>& stmts, + const repl::OpTime& prepareOrCommitOptime) {} void _onReplicationRollback(OperationContext* opCtx, const RollbackObserverInfo& rbInfo) final; }; diff --git a/src/mongo/db/op_observer_noop.h b/src/mongo/db/op_observer_noop.h index 667164d7564..feaf3a13f96 100644 --- a/src/mongo/db/op_observer_noop.h +++ b/src/mongo/db/op_observer_noop.h @@ -183,6 +183,9 @@ public: const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment, size_t numberOfPrePostImagesToWrite, Date_t wallClockTime) override{}; + void onTransactionPrepareNonPrimary(OperationContext* opCtx, + const std::vector<repl::OplogEntry>& statements, + const repl::OpTime& prepareOpTime) override {} void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) override{}; void onMajorityCommitPointUpdate(ServiceContext* service, diff --git a/src/mongo/db/op_observer_registry.h b/src/mongo/db/op_observer_registry.h index 4b30bc7c72b..2792854d735 100644 --- a/src/mongo/db/op_observer_registry.h +++ b/src/mongo/db/op_observer_registry.h @@ -425,6 +425,15 @@ public: } } + void onTransactionPrepareNonPrimary(OperationContext* opCtx, + const std::vector<repl::OplogEntry>& statements, + const repl::OpTime& prepareOpTime) override { + ReservedTimes times{opCtx}; + for (auto& observer : _observers) { + observer->onTransactionPrepareNonPrimary(opCtx, statements, prepareOpTime); + } + } + void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) override { ReservedTimes times{opCtx}; diff --git a/src/mongo/db/repl/primary_only_service_op_observer.h b/src/mongo/db/repl/primary_only_service_op_observer.h index 5f552149f98..76c225a483c 100644 --- a/src/mongo/db/repl/primary_only_service_op_observer.h +++ b/src/mongo/db/repl/primary_only_service_op_observer.h @@ -208,6 +208,10 @@ public: size_t numberOfPrePostImagesToWrite, Date_t wallClockTime) final {} + void onTransactionPrepareNonPrimary(OperationContext* opCtx, + const std::vector<repl::OplogEntry>& statements, + const repl::OpTime& prepareOpTime) final {} + void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final {} diff --git a/src/mongo/db/repl/tenant_migration_donor_op_observer.h b/src/mongo/db/repl/tenant_migration_donor_op_observer.h index 43992f5e040..403d2b6b486 100644 --- a/src/mongo/db/repl/tenant_migration_donor_op_observer.h +++ b/src/mongo/db/repl/tenant_migration_donor_op_observer.h @@ -188,6 +188,10 @@ public: Timestamp commitTimestamp, const std::vector<repl::ReplOperation>& statements) noexcept final {} + void onTransactionPrepareNonPrimary(OperationContext* opCtx, + const std::vector<repl::OplogEntry>& statements, + const repl::OpTime& prepareOpTime) final {} + std::unique_ptr<ApplyOpsOplogSlotAndOperationAssignment> preTransactionPrepare( OperationContext* opCtx, const std::vector<OplogSlot>& reservedSlots, diff --git a/src/mongo/db/repl/tenant_migration_recipient_op_observer.h b/src/mongo/db/repl/tenant_migration_recipient_op_observer.h index dd42ff6581f..7f5181a6d4a 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_op_observer.h +++ b/src/mongo/db/repl/tenant_migration_recipient_op_observer.h @@ -207,6 +207,10 @@ public: size_t numberOfPrePostImagesToWrite, Date_t wallClockTime) final {} + void onTransactionPrepareNonPrimary(OperationContext* opCtx, + const std::vector<repl::OplogEntry>& statements, + const repl::OpTime& prepareOpTime) final {} + void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final {} diff --git a/src/mongo/db/repl/transaction_oplog_application.cpp b/src/mongo/db/repl/transaction_oplog_application.cpp index b34403b0de6..94d06c02665 100644 --- a/src/mongo/db/repl/transaction_oplog_application.cpp +++ b/src/mongo/db/repl/transaction_oplog_application.cpp @@ -38,6 +38,7 @@ #include "mongo/db/concurrency/exception_util.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/index_builds_coordinator.h" +#include "mongo/db/op_observer.h" #include "mongo/db/repl/apply_ops.h" #include "mongo/db/repl/storage_interface_impl.h" #include "mongo/db/repl/timestamp_block.h" @@ -498,6 +499,11 @@ Status _applyPrepareTransaction(OperationContext* opCtx, } txnParticipant.prepareTransaction(opCtx, entry.getOpTime()); + + auto opObserver = opCtx->getServiceContext()->getOpObserver(); + invariant(opObserver); + opObserver->onTransactionPrepareNonPrimary(opCtx, ops, entry.getOpTime()); + // Prepare transaction success. abortOnError.dismiss(); diff --git a/src/mongo/db/s/collection_metadata.cpp b/src/mongo/db/s/collection_metadata.cpp index ea9f6f0c7c1..6b0fb689356 100644 --- a/src/mongo/db/s/collection_metadata.cpp +++ b/src/mongo/db/s/collection_metadata.cpp @@ -107,13 +107,13 @@ void CollectionMetadata::throwIfReshardingInProgress(NamespaceString const& nss) } } -BSONObj CollectionMetadata::extractDocumentKey(const BSONObj& doc) const { +BSONObj CollectionMetadata::extractDocumentKey(const ShardKeyPattern* shardKeyPattern, + const BSONObj& doc) { BSONObj key; - if (isSharded()) { - auto const& pattern = _cm->getShardKeyPattern(); - key = dotted_path_support::extractElementsBasedOnTemplate(doc, pattern.toBSON()); - if (pattern.hasId()) { + if (shardKeyPattern) { + key = dotted_path_support::extractElementsBasedOnTemplate(doc, shardKeyPattern->toBSON()); + if (shardKeyPattern->hasId()) { return key; } // else, try to append an _id field from the document. @@ -127,6 +127,10 @@ BSONObj CollectionMetadata::extractDocumentKey(const BSONObj& doc) const { return doc; } +BSONObj CollectionMetadata::extractDocumentKey(const BSONObj& doc) const { + return extractDocumentKey(isSharded() ? &_cm->getShardKeyPattern() : nullptr, doc); +} + std::string CollectionMetadata::toStringBasic() const { if (isSharded()) { return str::stream() << "collection version: " << _cm->getVersion().toString() diff --git a/src/mongo/db/s/collection_metadata.h b/src/mongo/db/s/collection_metadata.h index b691f94ebe5..c2b323c0cc7 100644 --- a/src/mongo/db/s/collection_metadata.h +++ b/src/mongo/db/s/collection_metadata.h @@ -162,6 +162,12 @@ public: BSONObj extractDocumentKey(const BSONObj& doc) const; /** + * Static version of the function above. Only use this for internal sharding operations where + * shard key pattern is fixed and cannot change. + */ + static BSONObj extractDocumentKey(const ShardKeyPattern* shardKeyPattern, const BSONObj& doc); + + /** * String output of the collection and shard versions. */ std::string toStringBasic() const; diff --git a/src/mongo/db/s/config_server_op_observer.h b/src/mongo/db/s/config_server_op_observer.h index 18e83a4b994..2acd0017ab7 100644 --- a/src/mongo/db/s/config_server_op_observer.h +++ b/src/mongo/db/s/config_server_op_observer.h @@ -208,6 +208,10 @@ public: size_t numberOfPrePostImagesToWrite, Date_t wallClockTime) override {} + void onTransactionPrepareNonPrimary(OperationContext* opCtx, + const std::vector<repl::OplogEntry>& statements, + const repl::OpTime& prepareOpTime) override {} + void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) override {} diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index 0b68b839712..2272599806e 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -33,20 +33,26 @@ #include "mongo/db/s/migration_chunk_cloner_source_legacy.h" +#include <fmt/format.h> + #include "mongo/base/status.h" #include "mongo/bson/bsonobj.h" #include "mongo/client/read_preference.h" #include "mongo/db/catalog/index_catalog.h" #include "mongo/db/catalog_raii.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/dbdirectclient.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/exec/working_set_common.h" #include "mongo/db/index/index_access_method.h" #include "mongo/db/index/index_descriptor.h" #include "mongo/db/ops/write_ops_retryability.h" +#include "mongo/db/query/get_executor.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/replication_process.h" #include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/migration_source_manager.h" +#include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/shard_key_index_util.h" #include "mongo/db/s/sharding_runtime_d_params_gen.h" #include "mongo/db/s/sharding_statistics.h" @@ -69,6 +75,8 @@ namespace mongo { namespace { +using namespace fmt::literals; + const char kRecvChunkStatus[] = "_recvChunkStatus"; const char kRecvChunkCommit[] = "_recvChunkCommit"; const char kRecvChunkAbort[] = "_recvChunkAbort"; @@ -107,9 +115,8 @@ BSONObj createRequestWithSessionId(StringData commandName, return builder.obj(); } -BSONObj getDocumentKeyFromReplOperation(repl::ReplOperation replOperation, - repl::OpTypeEnum opType) { - switch (opType) { +BSONObj getDocumentKeyFromReplOperation(repl::ReplOperation replOperation) { + switch (replOperation.getOpType()) { case repl::OpTypeEnum::kInsert: case repl::OpTypeEnum::kDelete: return replOperation.getObject(); @@ -168,6 +175,30 @@ private: const repl::OpTime _opTime; }; +LogTransactionOperationsForShardingHandler::LogTransactionOperationsForShardingHandler( + LogicalSessionId lsid, + const std::vector<repl::OplogEntry>& stmts, + repl::OpTime prepareOrCommitOpTime) + : _lsid(std::move(lsid)), _prepareOrCommitOpTime(std::move(prepareOrCommitOpTime)) { + _stmts.reserve(stmts.size()); + _ownedReplBSONObj.reserve(stmts.size()); + + for (const auto& op : stmts) { + auto ownedBSON = op.getDurableReplOperation().toBSON().getOwned(); + _ownedReplBSONObj.push_back(ownedBSON); + _stmts.push_back( + repl::ReplOperation::parse({"MigrationChunkClonerSource_toReplOperation"}, ownedBSON)); + } +} + +LogTransactionOperationsForShardingHandler::LogTransactionOperationsForShardingHandler( + LogicalSessionId lsid, + const std::vector<repl::ReplOperation>& stmts, + repl::OpTime prepareOrCommitOpTime) + : _lsid(std::move(lsid)), + _stmts(stmts), + _prepareOrCommitOpTime(std::move(prepareOrCommitOpTime)) {} + void LogTransactionOperationsForShardingHandler::commit(boost::optional<Timestamp>) { std::set<NamespaceString> namespacesTouchedByTransaction; @@ -225,7 +256,7 @@ void LogTransactionOperationsForShardingHandler::commit(boost::optional<Timestam continue; } - auto preImageDocKey = getDocumentKeyFromReplOperation(stmt, opType); + auto preImageDocKey = getDocumentKeyFromReplOperation(stmt); auto idElement = preImageDocKey["_id"]; if (idElement.eoo()) { @@ -235,52 +266,35 @@ void LogTransactionOperationsForShardingHandler::commit(boost::optional<Timestam continue; } - auto const& minKey = cloner->_args.getMin().get(); - auto const& maxKey = cloner->_args.getMax().get(); - auto const& shardKeyPattern = cloner->_shardKeyPattern; - - // Note: This assumes that prepared transactions will always have post document key - // set. There is a small window where create collection coordinator releases the critical - // section and before it writes down the chunks for non-empty collections. So in theory, - // it is possible to have a prepared transaction while collection is unsharded - // and becomes sharded midway. This doesn't happen in practice because the only way to - // have a prepared transactions without being sharded is by directly connecting to the - // shards and manually preparing the transaction. Another exception is when transaction - // is prepared on an older version that doesn't set the post image document key. - auto postImageDocKey = stmt.getPostImageDocumentKey(); - if (postImageDocKey.isEmpty()) { - LOGV2_WARNING( - 6836102, - "Migration encountered a transaction operation without a post image document key", - "preImageDocKey"_attr = preImageDocKey); - } else { - auto postShardKeyValues = - shardKeyPattern.extractShardKeyFromDocumentKey(postImageDocKey); - fassert(6836100, !postShardKeyValues.isEmpty()); - - if (!isShardKeyValueInRange(postShardKeyValues, minKey, maxKey)) { - // If the preImageDoc is not in range but the postImageDoc was, we know that the - // document has changed shard keys and no longer belongs in the chunk being cloned. - // We will model the deletion of the preImage document so that the destination chunk - // does not receive an outdated version of this document. - - auto preImageShardKeyValues = - shardKeyPattern.extractShardKeyFromDocumentKey(preImageDocKey); - fassert(6836101, !preImageShardKeyValues.isEmpty()); - - if (opType == repl::OpTypeEnum::kUpdate && - isShardKeyValueInRange(preImageShardKeyValues, minKey, maxKey)) { - opType = repl::OpTypeEnum::kDelete; - idElement = postImageDocKey["_id"]; - } else { + if (opType == repl::OpTypeEnum::kUpdate) { + auto const& shardKeyPattern = cloner->_shardKeyPattern; + auto preImageShardKeyValues = + shardKeyPattern.extractShardKeyFromDocumentKey(preImageDocKey); + + // If prepare was performed from another term, we will not have the post image doc key + // since it is not persisted in the oplog. + auto postImageDocKey = stmt.getPostImageDocumentKey(); + if (!postImageDocKey.isEmpty()) { + if (!cloner->_processUpdateForXferMod(preImageDocKey, postImageDocKey)) { + // We don't need to add this op to session migration if neither post or pre + // image doc falls within the chunk range. continue; } + } else { + // We can't perform reads here using the same recovery unit because the transaction + // is already committed. We instead defer performing the reads when xferMods command + // is called. Also allow this op to be added to session migration since we can't + // tell whether post image doc will fall within the chunk range. If it turns out + // both preImage and postImage doc don't fall into the chunk range, it is not wrong + // for this op to be added to session migration, but it will result in wasted work + // and unneccesary extra oplog storage on the destination. + cloner->_deferProcessingForXferMod(preImageDocKey); } + } else { + cloner->_addToTransferModsQueue(idElement.wrap(), getOpCharForCrudOpType(opType), {}); } addToSessionMigrationOptimeQueueIfNeeded(cloner, nss, _prepareOrCommitOpTime); - - cloner->_addToTransferModsQueue(idElement.wrap(), getOpCharForCrudOpType(opType), {}); } } @@ -800,11 +814,78 @@ Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* opCtx, return Status::OK(); } +bool MigrationChunkClonerSourceLegacy::_processUpdateForXferMod(const BSONObj& preImageDocKey, + const BSONObj& postImageDocKey) { + auto const& minKey = _args.getMin().value(); + auto const& maxKey = _args.getMax().value(); + + auto postShardKeyValues = _shardKeyPattern.extractShardKeyFromDocumentKey(postImageDocKey); + fassert(6836100, !postShardKeyValues.isEmpty()); + + auto opType = repl::OpTypeEnum::kUpdate; + auto idElement = preImageDocKey["_id"]; + + if (!isShardKeyValueInRange(postShardKeyValues, minKey, maxKey)) { + // If the preImageDoc is not in range but the postImageDoc was, we know that the + // document has changed shard keys and no longer belongs in the chunk being cloned. + // We will model the deletion of the preImage document so that the destination chunk + // does not receive an outdated version of this document. + + auto preImageShardKeyValues = + _shardKeyPattern.extractShardKeyFromDocumentKey(preImageDocKey); + fassert(6836101, !preImageShardKeyValues.isEmpty()); + + if (!isShardKeyValueInRange(preImageShardKeyValues, minKey, maxKey)) { + return false; + } + + opType = repl::OpTypeEnum::kDelete; + idElement = postImageDocKey["_id"]; + } + + _addToTransferModsQueue(idElement.wrap(), getOpCharForCrudOpType(opType), {}); + + return true; +} + +void MigrationChunkClonerSourceLegacy::_deferProcessingForXferMod(const BSONObj& preImageDocKey) { + stdx::lock_guard<Latch> sl(_mutex); + _deferredReloadOrDeletePreImageDocKeys.push_back(preImageDocKey.getOwned()); + _deferredUntransferredOpsCounter++; +} + +void MigrationChunkClonerSourceLegacy::_processDeferredXferMods(OperationContext* opCtx, + Database* db) { + std::vector<BSONObj> deferredReloadOrDeletePreImageDocKeys; + + { + stdx::unique_lock lk(_mutex); + deferredReloadOrDeletePreImageDocKeys.swap(_deferredReloadOrDeletePreImageDocKeys); + } + + for (const auto& preImageDocKey : deferredReloadOrDeletePreImageDocKeys) { + auto idElement = preImageDocKey["_id"]; + BSONObj newerVersionDoc; + if (!Helpers::findById(opCtx, db, nss().ns(), BSON("_id" << idElement), newerVersionDoc)) { + // If the document can no longer be found, this means that another later op must have + // deleted it. That delete would have been captured by the xferMods so nothing else to + // do here. + continue; + } + + auto postImageDocKey = + CollectionMetadata::extractDocumentKey(&_shardKeyPattern, newerVersionDoc); + static_cast<void>(_processUpdateForXferMod(preImageDocKey, postImageDocKey)); + } +} + Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* opCtx, Database* db, BSONObjBuilder* builder) { dassert(opCtx->lockState()->isCollectionLockedForMode(nss(), MODE_IS)); + _processDeferredXferMods(opCtx, db); + std::list<BSONObj> deleteList; std::list<BSONObj> updateList; @@ -849,6 +930,7 @@ Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* opCtx, _untransferredDeletesCounter = _deleted.size(); _reload.splice(_reload.cbegin(), updateList); _untransferredUpsertsCounter = _reload.size(); + _deferredUntransferredOpsCounter = _deferredReloadOrDeletePreImageDocKeys.size(); return Status::OK(); } @@ -863,6 +945,8 @@ void MigrationChunkClonerSourceLegacy::_cleanup() { _untransferredUpsertsCounter = 0; _deleted.clear(); _untransferredDeletesCounter = 0; + _deferredReloadOrDeletePreImageDocKeys.clear(); + _deferredUntransferredOpsCounter = 0; } StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::_callRecipient(OperationContext* opCtx, @@ -1111,7 +1195,8 @@ Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationC stdx::lock_guard<Latch> sl(_mutex); int64_t untransferredModsSizeBytes = _untransferredDeletesCounter * _averageObjectIdSize + - _untransferredUpsertsCounter * _averageObjectSizeForCloneLocs; + (_untransferredUpsertsCounter + _deferredUntransferredOpsCounter) * + _averageObjectSizeForCloneLocs; if (_forceJumbo && _jumboChunkCloneState) { LOGV2(21992, @@ -1177,6 +1262,7 @@ Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationC "moveChunk data transfer within threshold to allow write blocking", "_untransferredUpsertsCounter"_attr = _untransferredUpsertsCounter, "_untransferredDeletesCounter"_attr = _untransferredDeletesCounter, + "_deferredUntransferredOpsCounter"_attr = _deferredUntransferredOpsCounter, "_averageObjectSizeForCloneLocs"_attr = _averageObjectSizeForCloneLocs, "_averageObjectIdSize"_attr = _averageObjectIdSize, "untransferredModsSizeBytes"_attr = untransferredModsSizeBytes, diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h index 803c1f512af..4fff7da8d17 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h @@ -67,13 +67,13 @@ const long long kFixedCommandOverhead = 32 * 1024; */ class LogTransactionOperationsForShardingHandler final : public RecoveryUnit::Change { public: - /** - * Invariant: idObj should belong to a document that is part of the active chunk being migrated - */ - LogTransactionOperationsForShardingHandler(const LogicalSessionId lsid, + LogTransactionOperationsForShardingHandler(LogicalSessionId lsid, + const std::vector<repl::OplogEntry>& stmts, + repl::OpTime prepareOrCommitOpTime); + + LogTransactionOperationsForShardingHandler(LogicalSessionId lsid, const std::vector<repl::ReplOperation>& stmts, - const repl::OpTime& prepareOrCommitOpTime) - : _lsid(lsid), _stmts(stmts), _prepareOrCommitOpTime(prepareOrCommitOpTime) {} + repl::OpTime prepareOrCommitOpTime); void commit(boost::optional<Timestamp>) override; @@ -81,6 +81,8 @@ public: private: const LogicalSessionId _lsid; + // Use to keep BSON obj alive for the lifetime of this object. + std::vector<BSONObj> _ownedReplBSONObj; std::vector<repl::ReplOperation> _stmts; const repl::OpTime _prepareOrCommitOpTime; }; @@ -486,6 +488,23 @@ private: */ Status _checkRecipientCloningStatus(OperationContext* opCtx, Milliseconds maxTimeToWait); + /** + * Inspects the pre and post image document keys and determines which xferMods bucket to + * add a new entry. Returns false if neither pre or post image document keys fall into + * the chunk boundaries being migrated. + */ + bool _processUpdateForXferMod(const BSONObj& preImageDocKey, const BSONObj& postImageDocKey); + + /** + * Defer processing of update ops into xferMods entries to when nextModsBatch is called. + */ + void _deferProcessingForXferMod(const BSONObj& preImageDocKey); + + /** + * Converts all deferred update ops captured by the op observer into xferMods entries. + */ + void _processDeferredXferMods(OperationContext* opCtx, Database* database); + // The original move range request const ShardsvrMoveRange _args; @@ -547,6 +566,13 @@ private: // Amount of delete xfer mods that have not yet reached the recipient. size_t _untransferredDeletesCounter{0}; + // Amount of ops that are yet to be converted to update/delete xferMods. + size_t _deferredUntransferredOpsCounter{0}; + + // Stores document keys of document that needs to be examined if we need to put in to xferMods + // list later. + std::vector<BSONObj> _deferredReloadOrDeletePreImageDocKeys; + // Total bytes in _reload + _deleted (xfer mods) uint64_t _memoryUsed{0}; diff --git a/src/mongo/db/s/op_observer_sharding_impl.cpp b/src/mongo/db/s/op_observer_sharding_impl.cpp index ab8ce8ca5e8..860fe7ad050 100644 --- a/src/mongo/db/s/op_observer_sharding_impl.cpp +++ b/src/mongo/db/s/op_observer_sharding_impl.cpp @@ -241,4 +241,14 @@ void OpObserverShardingImpl::shardObserveTransactionPrepareOrUnpreparedCommit( *opCtx->getLogicalSessionId(), stmts, prepareOrCommitOptime)); } +void OpObserverShardingImpl::shardObserveNonPrimaryTransactionPrepare( + OperationContext* opCtx, + const std::vector<repl::OplogEntry>& stmts, + const repl::OpTime& prepareOrCommitOptime) { + + opCtx->recoveryUnit()->registerChange( + std::make_unique<LogTransactionOperationsForShardingHandler>( + *opCtx->getLogicalSessionId(), stmts, prepareOrCommitOptime)); +} + } // namespace mongo diff --git a/src/mongo/db/s/op_observer_sharding_impl.h b/src/mongo/db/s/op_observer_sharding_impl.h index f9005497c57..d7295482332 100644 --- a/src/mongo/db/s/op_observer_sharding_impl.h +++ b/src/mongo/db/s/op_observer_sharding_impl.h @@ -74,6 +74,10 @@ protected: OperationContext* opCtx, const std::vector<repl::ReplOperation>& stmts, const repl::OpTime& prepareOrCommitOptime) override; + void shardObserveNonPrimaryTransactionPrepare( + OperationContext* opCtx, + const std::vector<repl::OplogEntry>& stmts, + const repl::OpTime& prepareOrCommitOptime) override; }; } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_op_observer.h b/src/mongo/db/s/resharding/resharding_op_observer.h index 30d319a041d..e8affe3ef4a 100644 --- a/src/mongo/db/s/resharding/resharding_op_observer.h +++ b/src/mongo/db/s/resharding/resharding_op_observer.h @@ -228,6 +228,10 @@ public: size_t numberOfPrePostImagesToWrite, Date_t wallClockTime) override {} + void onTransactionPrepareNonPrimary(OperationContext* opCtx, + const std::vector<repl::OplogEntry>& statements, + const repl::OpTime& prepareOpTime) override {} + void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) override {} diff --git a/src/mongo/db/s/shard_server_op_observer.h b/src/mongo/db/s/shard_server_op_observer.h index 5a0671254d6..78e84b664cf 100644 --- a/src/mongo/db/s/shard_server_op_observer.h +++ b/src/mongo/db/s/shard_server_op_observer.h @@ -207,6 +207,10 @@ public: size_t numberOfPrePostImagesToWrite, Date_t wallClockTime) override {} + void onTransactionPrepareNonPrimary(OperationContext* opCtx, + const std::vector<repl::OplogEntry>& statements, + const repl::OpTime& prepareOpTime) override {} + void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) override {} diff --git a/src/mongo/db/serverless/shard_split_donor_op_observer.h b/src/mongo/db/serverless/shard_split_donor_op_observer.h index 55796479ffe..d8b607db6df 100644 --- a/src/mongo/db/serverless/shard_split_donor_op_observer.h +++ b/src/mongo/db/serverless/shard_split_donor_op_observer.h @@ -204,6 +204,10 @@ public: size_t numberOfPrePostImagesToWrite, Date_t wallClockTime) final {} + void onTransactionPrepareNonPrimary(OperationContext* opCtx, + const std::vector<repl::OplogEntry>& statements, + const repl::OpTime& prepareOpTime) final {} + void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final {} diff --git a/src/mongo/db/user_write_block_mode_op_observer.h b/src/mongo/db/user_write_block_mode_op_observer.h index 330779d0cd6..b021431befc 100644 --- a/src/mongo/db/user_write_block_mode_op_observer.h +++ b/src/mongo/db/user_write_block_mode_op_observer.h @@ -227,6 +227,10 @@ public: size_t numberOfPrePostImagesToWrite, Date_t wallClockTime) final {} + void onTransactionPrepareNonPrimary(OperationContext* opCtx, + const std::vector<repl::OplogEntry>& statements, + const repl::OpTime& prepareOpTime) final {} + void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final {} diff --git a/src/mongo/idl/cluster_server_parameter_op_observer.h b/src/mongo/idl/cluster_server_parameter_op_observer.h index 2ef05729e39..f227598fc12 100644 --- a/src/mongo/idl/cluster_server_parameter_op_observer.h +++ b/src/mongo/idl/cluster_server_parameter_op_observer.h @@ -212,6 +212,10 @@ public: size_t numberOfPrePostImagesToWrite, Date_t wallClockTime) final {} + void onTransactionPrepareNonPrimary(OperationContext* opCtx, + const std::vector<repl::OplogEntry>& statements, + const repl::OpTime& prepareOpTime) final {} + void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final {} |