diff options
author | Cheahuychou Mao <mao.cheahuychou@gmail.com> | 2022-03-19 00:52:30 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-03-19 01:22:51 +0000 |
commit | 93cd5e2ab3c457741f708d821abc64a5d1f23d21 (patch) | |
tree | d9e3f692f6de19899ddf60a8b750c559269f6dd1 /src/mongo/db | |
parent | c141ef8536d51f05a6fa4017de20286d154d09e1 (diff) | |
download | mongo-93cd5e2ab3c457741f708d821abc64a5d1f23d21.tar.gz |
SERVER-63494 Enable WouldChangeOwningShard update and findAndModify tests with chunk migrations
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/exec/update_stage.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/exec/upsert_stage.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp | 42 | ||||
-rw-r--r-- | src/mongo/db/s/session_catalog_migration_destination.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/s/session_catalog_migration_source.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/s/session_catalog_migration_source_test.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/service_entry_point_common.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.h | 4 |
9 files changed, 65 insertions, 24 deletions
diff --git a/src/mongo/db/exec/update_stage.cpp b/src/mongo/db/exec/update_stage.cpp index 73fd5b67054..2d4a5217308 100644 --- a/src/mongo/db/exec/update_stage.cpp +++ b/src/mongo/db/exec/update_stage.cpp @@ -671,9 +671,10 @@ bool UpdateStage::wasReshardingKeyUpdated(const ShardingWriteRouter& shardingWri auto oldRecipShard = *shardingWriteRouter.getReshardingDestinedRecipient(oldObj.value()); auto newRecipShard = *shardingWriteRouter.getReshardingDestinedRecipient(newObj); - uassert(WouldChangeOwningShardInfo(oldObj.value(), newObj, false /* upsert */), - "This update would cause the doc to change owning shards under the new shard key", - oldRecipShard == newRecipShard); + uassert( + WouldChangeOwningShardInfo(oldObj.value(), newObj, false /* upsert */, collection()->ns()), + "This update would cause the doc to change owning shards under the new shard key", + oldRecipShard == newRecipShard); return true; } @@ -750,7 +751,8 @@ bool UpdateStage::wasExistingShardKeyUpdated(const ShardingWriteRouter& sharding hangBeforeThrowWouldChangeOwningShard.pauseWhileSet(opCtx()); } - uasserted(WouldChangeOwningShardInfo(oldObj.value(), newObj, false /* upsert */), + uasserted(WouldChangeOwningShardInfo( + oldObj.value(), newObj, false /* upsert */, collection()->ns()), "This update would cause the doc to change owning shards"); } diff --git a/src/mongo/db/exec/upsert_stage.cpp b/src/mongo/db/exec/upsert_stage.cpp index 606032e9fef..beab4e6a9d7 100644 --- a/src/mongo/db/exec/upsert_stage.cpp +++ b/src/mongo/db/exec/upsert_stage.cpp @@ -144,8 +144,10 @@ void UpsertStage::_performInsert(BSONObj newDocument) { "upserts are only allowed when running in a transaction or with " "retryWrites: true.", opCtx()->getTxnNumber()); - uasserted(WouldChangeOwningShardInfo( - _params.request->getQuery(), newDocument, true /* upsert */), + uasserted(WouldChangeOwningShardInfo(_params.request->getQuery(), + newDocument, + true /* upsert */, + collection()->ns()), "The document we are inserting belongs on a different shard"); } } diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index 27f56dc97d9..4d83ec03723 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -40,6 +40,8 @@ #include "mongo/db/dbhelpers.h" #include "mongo/db/exec/working_set_common.h" #include "mongo/db/index/index_access_method.h" +#include "mongo/db/index/index_descriptor.h" +#include "mongo/db/ops/write_ops_retryability.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/replication_process.h" #include "mongo/db/s/collection_sharding_runtime.h" @@ -158,9 +160,32 @@ private: void LogTransactionOperationsForShardingHandler::commit(boost::optional<Timestamp>) { std::set<NamespaceString> namespacesTouchedByTransaction; + // Inform the session migration subsystem that a transaction has committed for the given + // namespace. + auto addToSessionMigrationOptimeQueue = + [&namespacesTouchedByTransaction](MigrationChunkClonerSourceLegacy* const cloner, + const NamespaceString& nss, + const repl::OpTime opTime) { + if (namespacesTouchedByTransaction.find(nss) == namespacesTouchedByTransaction.end()) { + cloner->_addToSessionMigrationOptimeQueue( + opTime, SessionCatalogMigrationSource::EntryAtOpTimeType::kTransaction); + + namespacesTouchedByTransaction.emplace(nss); + } + }; + for (const auto& stmt : _stmts) { auto opType = stmt.getOpType(); - if (opType == repl::OpTypeEnum::kNoop) { + + // Skip every noop entry except for a WouldChangeOwningShard (WCOS) sentinel noop entry + // since for an internal transaction for a retryable WCOS findAndModify that is an upsert, + // the applyOps oplog entry on the old owning shard would not have the insert entry; so if + // we skip the noop entry here, the write history for the internal transaction would not get + // transferred to the recipient since the _prepareOrCommitOpTime would not get added to the + // session migration opTime queue below, and this would cause the write to execute again if + // there is a retry after the migration. + if (opType == repl::OpTypeEnum::kNoop && + !isWouldChangeOwningShardSentinelOplogEntry(stmt)) { continue; } @@ -177,6 +202,11 @@ void LogTransactionOperationsForShardingHandler::commit(boost::optional<Timestam } auto* const cloner = dynamic_cast<MigrationChunkClonerSourceLegacy*>(clonerPtr.get()); + if (isWouldChangeOwningShardSentinelOplogEntry(stmt)) { + addToSessionMigrationOptimeQueue(cloner, nss, _prepareOrCommitOpTime); + continue; + } + auto documentKey = getDocumentKeyFromReplOperation(stmt, opType); auto idElement = documentKey["_id"]; @@ -207,15 +237,7 @@ void LogTransactionOperationsForShardingHandler::commit(boost::optional<Timestam } } - // Inform the session migration subsystem that a transaction has committed for all involved - // namespaces. - if (namespacesTouchedByTransaction.find(nss) == namespacesTouchedByTransaction.end()) { - cloner->_addToSessionMigrationOptimeQueue( - _prepareOrCommitOpTime, - SessionCatalogMigrationSource::EntryAtOpTimeType::kTransaction); - - namespacesTouchedByTransaction.emplace(nss); - } + addToSessionMigrationOptimeQueue(cloner, nss, _prepareOrCommitOpTime); // Pass an empty prePostOpTime to the queue because retryable write history doesn't care // about writes in transactions. diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp index afa092fe137..5505acde25e 100644 --- a/src/mongo/db/s/session_catalog_migration_destination.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination.cpp @@ -277,8 +277,13 @@ ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON, throw; } - if (!result.isPrePostImage) + if (!result.isPrePostImage && !isWouldChangeOwningShardSentinelOplogEntry(oplogEntry)) { + // Do not overwrite the "o" field if this is a pre/post image oplog entry. Also do not + // overwrite it if this is a WouldChangeOwningShard sentinel oplog entry since it contains + // a special BSONObj used for making retries fail with an IncompleteTransactionHistory + // error. oplogEntry.setObject(SessionCatalogMigration::kSessionOplogTag); + } setPrePostImageTs(lastResult, &oplogEntry); oplogEntry.setPrevWriteOpTimeInTransaction(txnParticipant.getLastWriteOpTime()); diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp index a79cefc1973..f49e863c224 100644 --- a/src/mongo/db/s/session_catalog_migration_source.cpp +++ b/src/mongo/db/s/session_catalog_migration_source.cpp @@ -431,7 +431,7 @@ void SessionCatalogMigrationSource::_extractOplogEntriesForInternalTransactionFo continue; } - if (replOp.getNss() != _ns && !isWouldChangeOwningShardSentinelOplogEntry(replOp)) { + if (replOp.getNss() != _ns) { // Skip this operation since it does not involve the namespace being migrated. continue; } diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp index 1427c74bafe..e295b3e76d4 100644 --- a/src/mongo/db/s/session_catalog_migration_source_test.cpp +++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp @@ -983,7 +983,7 @@ TEST_F(SessionCatalogMigrationSourceTest, makeDurableReplOp(repl::OpTypeEnum::kInsert, kOtherNs, BSON("x" << -5), BSONObj(), {5}); // WouldChangeOwningShard sentinel op. auto op6 = makeDurableReplOp( - repl::OpTypeEnum::kNoop, {}, kWouldChangeOwningShardSentinel, BSONObj(), {6}); + repl::OpTypeEnum::kNoop, kNs, kWouldChangeOwningShardSentinel, BSONObj(), {6}); auto applyOpsOpTime1 = repl::OpTime(Timestamp(130, 1), 1); auto entry1 = makeApplyOpsOplogEntry(applyOpsOpTime1, @@ -1501,7 +1501,7 @@ TEST_F(SessionCatalogMigrationSourceTest, makeDurableReplOp(repl::OpTypeEnum::kInsert, kOtherNs, BSON("x" << -5), BSONObj(), {5}); // WouldChangeOwningShard sentinel op. auto op6 = makeDurableReplOp( - repl::OpTypeEnum::kNoop, {}, kWouldChangeOwningShardSentinel, BSONObj(), {6}); + repl::OpTypeEnum::kNoop, kNs, kWouldChangeOwningShardSentinel, BSONObj(), {6}); auto applyOpsOpTime1 = repl::OpTime(Timestamp(opTimeSecs, 1), 1); auto entry1 = makeApplyOpsOplogEntry(applyOpsOpTime1, diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index ec7eccf6c9d..0114da44b14 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -103,6 +103,7 @@ #include "mongo/rpc/reply_builder_interface.h" #include "mongo/rpc/warn_deprecated_wire_ops.h" #include "mongo/s/shard_cannot_refresh_due_to_locks_held_exception.h" +#include "mongo/s/would_change_owning_shard_exception.h" #include "mongo/transport/hello_metrics.h" #include "mongo/transport/service_executor.h" #include "mongo/transport/session.h" @@ -988,7 +989,9 @@ void CheckoutSessionAndInvokeCommand::_tapError(Status status) { // in the transaction's participant list, so it is guaranteed to learn its outcome. _stashTransaction(); } else if (status.code() == ErrorCodes::WouldChangeOwningShard) { - _txnParticipant->handleWouldChangeOwningShardError(opCtx); + auto wouldChangeOwningShardInfo = status.extraInfo<WouldChangeOwningShardInfo>(); + invariant(wouldChangeOwningShardInfo); + _txnParticipant->handleWouldChangeOwningShardError(opCtx, wouldChangeOwningShardInfo); _stashTransaction(); auto txnResponseMetadata = _txnParticipant->getResponseMetadata(); diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index f4bcb4abdc8..f154c9695d2 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -71,6 +71,7 @@ #include "mongo/db/txn_retry_counter_too_old_info.h" #include "mongo/db/vector_clock_mutable.h" #include "mongo/logv2/log.h" +#include "mongo/s/would_change_owning_shard_exception.h" #include "mongo/util/fail_point.h" #include "mongo/util/log_with_sampling.h" #include "mongo/util/net/socket_utils.h" @@ -3083,7 +3084,8 @@ void TransactionParticipant::Participant::addCommittedStmtIds( } void TransactionParticipant::Participant::handleWouldChangeOwningShardError( - OperationContext* opCtx) { + OperationContext* opCtx, + std::shared_ptr<const WouldChangeOwningShardInfo> wouldChangeOwningShardInfo) { if (o().txnState.isNone() && p().autoCommit == boost::none) { // If this was a retryable write, reset the transaction state so this participant can be // reused for the transaction mongos will use to handle the WouldChangeOwningShard error. @@ -3105,9 +3107,12 @@ void TransactionParticipant::Participant::handleWouldChangeOwningShardError( p().autoCommit != boost::none); repl::ReplOperation operation; operation.setOpType(repl::OpTypeEnum::kNoop); - operation.setNss(NamespaceString()); operation.setObject(kWouldChangeOwningShardSentinel); + // Required by chunk migration. + invariant(wouldChangeOwningShardInfo->getNs()); + operation.setNss(*wouldChangeOwningShardInfo->getNs()); + // The operation that triggers WouldChangeOwningShard should always be the first in its // transaction. operation.setInitializedStatementIds({0}); diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h index fdaacdddcff..31d03b89740 100644 --- a/src/mongo/db/transaction_participant.h +++ b/src/mongo/db/transaction_participant.h @@ -780,7 +780,9 @@ public: * Handles a WouldChangeOwningShard error based on whether the operation that triggered it * was a retryable write or in a retryable transaction. */ - void handleWouldChangeOwningShardError(OperationContext* opCtx); + void handleWouldChangeOwningShardError( + OperationContext* opCtx, + std::shared_ptr<const WouldChangeOwningShardInfo> wouldChangeOwningShardInfo); private: // Checks whether the given statementId for the specified transaction has already executed |