summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorCheahuychou Mao <mao.cheahuychou@gmail.com>2022-03-19 00:52:30 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-03-19 01:22:51 +0000
commit93cd5e2ab3c457741f708d821abc64a5d1f23d21 (patch)
treed9e3f692f6de19899ddf60a8b750c559269f6dd1 /src/mongo/db
parentc141ef8536d51f05a6fa4017de20286d154d09e1 (diff)
downloadmongo-93cd5e2ab3c457741f708d821abc64a5d1f23d21.tar.gz
SERVER-63494 Enable WouldChangeOwningShard update and findAndModify tests with chunk migrations
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/exec/update_stage.cpp10
-rw-r--r--src/mongo/db/exec/upsert_stage.cpp6
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp42
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination.cpp7
-rw-r--r--src/mongo/db/s/session_catalog_migration_source.cpp2
-rw-r--r--src/mongo/db/s/session_catalog_migration_source_test.cpp4
-rw-r--r--src/mongo/db/service_entry_point_common.cpp5
-rw-r--r--src/mongo/db/transaction_participant.cpp9
-rw-r--r--src/mongo/db/transaction_participant.h4
9 files changed, 65 insertions, 24 deletions
diff --git a/src/mongo/db/exec/update_stage.cpp b/src/mongo/db/exec/update_stage.cpp
index 73fd5b67054..2d4a5217308 100644
--- a/src/mongo/db/exec/update_stage.cpp
+++ b/src/mongo/db/exec/update_stage.cpp
@@ -671,9 +671,10 @@ bool UpdateStage::wasReshardingKeyUpdated(const ShardingWriteRouter& shardingWri
auto oldRecipShard = *shardingWriteRouter.getReshardingDestinedRecipient(oldObj.value());
auto newRecipShard = *shardingWriteRouter.getReshardingDestinedRecipient(newObj);
- uassert(WouldChangeOwningShardInfo(oldObj.value(), newObj, false /* upsert */),
- "This update would cause the doc to change owning shards under the new shard key",
- oldRecipShard == newRecipShard);
+ uassert(
+ WouldChangeOwningShardInfo(oldObj.value(), newObj, false /* upsert */, collection()->ns()),
+ "This update would cause the doc to change owning shards under the new shard key",
+ oldRecipShard == newRecipShard);
return true;
}
@@ -750,7 +751,8 @@ bool UpdateStage::wasExistingShardKeyUpdated(const ShardingWriteRouter& sharding
hangBeforeThrowWouldChangeOwningShard.pauseWhileSet(opCtx());
}
- uasserted(WouldChangeOwningShardInfo(oldObj.value(), newObj, false /* upsert */),
+ uasserted(WouldChangeOwningShardInfo(
+ oldObj.value(), newObj, false /* upsert */, collection()->ns()),
"This update would cause the doc to change owning shards");
}
diff --git a/src/mongo/db/exec/upsert_stage.cpp b/src/mongo/db/exec/upsert_stage.cpp
index 606032e9fef..beab4e6a9d7 100644
--- a/src/mongo/db/exec/upsert_stage.cpp
+++ b/src/mongo/db/exec/upsert_stage.cpp
@@ -144,8 +144,10 @@ void UpsertStage::_performInsert(BSONObj newDocument) {
"upserts are only allowed when running in a transaction or with "
"retryWrites: true.",
opCtx()->getTxnNumber());
- uasserted(WouldChangeOwningShardInfo(
- _params.request->getQuery(), newDocument, true /* upsert */),
+ uasserted(WouldChangeOwningShardInfo(_params.request->getQuery(),
+ newDocument,
+ true /* upsert */,
+ collection()->ns()),
"The document we are inserting belongs on a different shard");
}
}
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index 27f56dc97d9..4d83ec03723 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -40,6 +40,8 @@
#include "mongo/db/dbhelpers.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/index/index_access_method.h"
+#include "mongo/db/index/index_descriptor.h"
+#include "mongo/db/ops/write_ops_retryability.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/replication_process.h"
#include "mongo/db/s/collection_sharding_runtime.h"
@@ -158,9 +160,32 @@ private:
void LogTransactionOperationsForShardingHandler::commit(boost::optional<Timestamp>) {
std::set<NamespaceString> namespacesTouchedByTransaction;
+ // Inform the session migration subsystem that a transaction has committed for the given
+ // namespace.
+ auto addToSessionMigrationOptimeQueue =
+ [&namespacesTouchedByTransaction](MigrationChunkClonerSourceLegacy* const cloner,
+ const NamespaceString& nss,
+ const repl::OpTime opTime) {
+ if (namespacesTouchedByTransaction.find(nss) == namespacesTouchedByTransaction.end()) {
+ cloner->_addToSessionMigrationOptimeQueue(
+ opTime, SessionCatalogMigrationSource::EntryAtOpTimeType::kTransaction);
+
+ namespacesTouchedByTransaction.emplace(nss);
+ }
+ };
+
for (const auto& stmt : _stmts) {
auto opType = stmt.getOpType();
- if (opType == repl::OpTypeEnum::kNoop) {
+
+ // Skip every noop entry except for a WouldChangeOwningShard (WCOS) sentinel noop entry
+ // since for an internal transaction for a retryable WCOS findAndModify that is an upsert,
+ // the applyOps oplog entry on the old owning shard would not have the insert entry; so if
+ // we skip the noop entry here, the write history for the internal transaction would not get
+ // transferred to the recipient since the _prepareOrCommitOpTime would not get added to the
+ // session migration opTime queue below, and this would cause the write to execute again if
+ // there is a retry after the migration.
+ if (opType == repl::OpTypeEnum::kNoop &&
+ !isWouldChangeOwningShardSentinelOplogEntry(stmt)) {
continue;
}
@@ -177,6 +202,11 @@ void LogTransactionOperationsForShardingHandler::commit(boost::optional<Timestam
}
auto* const cloner = dynamic_cast<MigrationChunkClonerSourceLegacy*>(clonerPtr.get());
+ if (isWouldChangeOwningShardSentinelOplogEntry(stmt)) {
+ addToSessionMigrationOptimeQueue(cloner, nss, _prepareOrCommitOpTime);
+ continue;
+ }
+
auto documentKey = getDocumentKeyFromReplOperation(stmt, opType);
auto idElement = documentKey["_id"];
@@ -207,15 +237,7 @@ void LogTransactionOperationsForShardingHandler::commit(boost::optional<Timestam
}
}
- // Inform the session migration subsystem that a transaction has committed for all involved
- // namespaces.
- if (namespacesTouchedByTransaction.find(nss) == namespacesTouchedByTransaction.end()) {
- cloner->_addToSessionMigrationOptimeQueue(
- _prepareOrCommitOpTime,
- SessionCatalogMigrationSource::EntryAtOpTimeType::kTransaction);
-
- namespacesTouchedByTransaction.emplace(nss);
- }
+ addToSessionMigrationOptimeQueue(cloner, nss, _prepareOrCommitOpTime);
// Pass an empty prePostOpTime to the queue because retryable write history doesn't care
// about writes in transactions.
diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp
index afa092fe137..5505acde25e 100644
--- a/src/mongo/db/s/session_catalog_migration_destination.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination.cpp
@@ -277,8 +277,13 @@ ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON,
throw;
}
- if (!result.isPrePostImage)
+ if (!result.isPrePostImage && !isWouldChangeOwningShardSentinelOplogEntry(oplogEntry)) {
+ // Do not overwrite the "o" field if this is a pre/post image oplog entry. Also do not
+ // overwrite it if this is a WouldChangeOwningShard sentinel oplog entry since it contains
+ // a special BSONObj used for making retries fail with an IncompleteTransactionHistory
+ // error.
oplogEntry.setObject(SessionCatalogMigration::kSessionOplogTag);
+ }
setPrePostImageTs(lastResult, &oplogEntry);
oplogEntry.setPrevWriteOpTimeInTransaction(txnParticipant.getLastWriteOpTime());
diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp
index a79cefc1973..f49e863c224 100644
--- a/src/mongo/db/s/session_catalog_migration_source.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source.cpp
@@ -431,7 +431,7 @@ void SessionCatalogMigrationSource::_extractOplogEntriesForInternalTransactionFo
continue;
}
- if (replOp.getNss() != _ns && !isWouldChangeOwningShardSentinelOplogEntry(replOp)) {
+ if (replOp.getNss() != _ns) {
// Skip this operation since it does not involve the namespace being migrated.
continue;
}
diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp
index 1427c74bafe..e295b3e76d4 100644
--- a/src/mongo/db/s/session_catalog_migration_source_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp
@@ -983,7 +983,7 @@ TEST_F(SessionCatalogMigrationSourceTest,
makeDurableReplOp(repl::OpTypeEnum::kInsert, kOtherNs, BSON("x" << -5), BSONObj(), {5});
// WouldChangeOwningShard sentinel op.
auto op6 = makeDurableReplOp(
- repl::OpTypeEnum::kNoop, {}, kWouldChangeOwningShardSentinel, BSONObj(), {6});
+ repl::OpTypeEnum::kNoop, kNs, kWouldChangeOwningShardSentinel, BSONObj(), {6});
auto applyOpsOpTime1 = repl::OpTime(Timestamp(130, 1), 1);
auto entry1 = makeApplyOpsOplogEntry(applyOpsOpTime1,
@@ -1501,7 +1501,7 @@ TEST_F(SessionCatalogMigrationSourceTest,
makeDurableReplOp(repl::OpTypeEnum::kInsert, kOtherNs, BSON("x" << -5), BSONObj(), {5});
// WouldChangeOwningShard sentinel op.
auto op6 = makeDurableReplOp(
- repl::OpTypeEnum::kNoop, {}, kWouldChangeOwningShardSentinel, BSONObj(), {6});
+ repl::OpTypeEnum::kNoop, kNs, kWouldChangeOwningShardSentinel, BSONObj(), {6});
auto applyOpsOpTime1 = repl::OpTime(Timestamp(opTimeSecs, 1), 1);
auto entry1 = makeApplyOpsOplogEntry(applyOpsOpTime1,
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index ec7eccf6c9d..0114da44b14 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -103,6 +103,7 @@
#include "mongo/rpc/reply_builder_interface.h"
#include "mongo/rpc/warn_deprecated_wire_ops.h"
#include "mongo/s/shard_cannot_refresh_due_to_locks_held_exception.h"
+#include "mongo/s/would_change_owning_shard_exception.h"
#include "mongo/transport/hello_metrics.h"
#include "mongo/transport/service_executor.h"
#include "mongo/transport/session.h"
@@ -988,7 +989,9 @@ void CheckoutSessionAndInvokeCommand::_tapError(Status status) {
// in the transaction's participant list, so it is guaranteed to learn its outcome.
_stashTransaction();
} else if (status.code() == ErrorCodes::WouldChangeOwningShard) {
- _txnParticipant->handleWouldChangeOwningShardError(opCtx);
+ auto wouldChangeOwningShardInfo = status.extraInfo<WouldChangeOwningShardInfo>();
+ invariant(wouldChangeOwningShardInfo);
+ _txnParticipant->handleWouldChangeOwningShardError(opCtx, wouldChangeOwningShardInfo);
_stashTransaction();
auto txnResponseMetadata = _txnParticipant->getResponseMetadata();
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index f4bcb4abdc8..f154c9695d2 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -71,6 +71,7 @@
#include "mongo/db/txn_retry_counter_too_old_info.h"
#include "mongo/db/vector_clock_mutable.h"
#include "mongo/logv2/log.h"
+#include "mongo/s/would_change_owning_shard_exception.h"
#include "mongo/util/fail_point.h"
#include "mongo/util/log_with_sampling.h"
#include "mongo/util/net/socket_utils.h"
@@ -3083,7 +3084,8 @@ void TransactionParticipant::Participant::addCommittedStmtIds(
}
void TransactionParticipant::Participant::handleWouldChangeOwningShardError(
- OperationContext* opCtx) {
+ OperationContext* opCtx,
+ std::shared_ptr<const WouldChangeOwningShardInfo> wouldChangeOwningShardInfo) {
if (o().txnState.isNone() && p().autoCommit == boost::none) {
// If this was a retryable write, reset the transaction state so this participant can be
// reused for the transaction mongos will use to handle the WouldChangeOwningShard error.
@@ -3105,9 +3107,12 @@ void TransactionParticipant::Participant::handleWouldChangeOwningShardError(
p().autoCommit != boost::none);
repl::ReplOperation operation;
operation.setOpType(repl::OpTypeEnum::kNoop);
- operation.setNss(NamespaceString());
operation.setObject(kWouldChangeOwningShardSentinel);
+ // Required by chunk migration.
+ invariant(wouldChangeOwningShardInfo->getNs());
+ operation.setNss(*wouldChangeOwningShardInfo->getNs());
+
// The operation that triggers WouldChangeOwningShard should always be the first in its
// transaction.
operation.setInitializedStatementIds({0});
diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h
index fdaacdddcff..31d03b89740 100644
--- a/src/mongo/db/transaction_participant.h
+++ b/src/mongo/db/transaction_participant.h
@@ -780,7 +780,9 @@ public:
* Handles a WouldChangeOwningShard error based on whether the operation that triggered it
* was a retryable write or in a retryable transaction.
*/
- void handleWouldChangeOwningShardError(OperationContext* opCtx);
+ void handleWouldChangeOwningShardError(
+ OperationContext* opCtx,
+ std::shared_ptr<const WouldChangeOwningShardInfo> wouldChangeOwningShardInfo);
private:
// Checks whether the given statementId for the specified transaction has already executed