summaryrefslogtreecommitdiff
path: root/src/mongo/s
diff options
context:
space:
mode:
authorJack Mulrow <jack.mulrow@mongodb.com>2022-02-28 19:31:31 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-02-28 20:16:11 +0000
commitb17dfaaec98f63c16d05c9f76768df98be78699e (patch)
treee1ef6535504b8b6658f58a72a1f51292879023ab /src/mongo/s
parentcd3d711e8c208c3766717ded817fcc4130db2d42 (diff)
downloadmongo-b17dfaaec98f63c16d05c9f76768df98be78699e.tar.gz
SERVER-59186 Use transaction API for all current changing a document's shard key logic
Diffstat (limited to 'src/mongo/s')
-rw-r--r--src/mongo/s/SConscript1
-rw-r--r--src/mongo/s/commands/cluster_find_and_modify_cmd.cpp211
-rw-r--r--src/mongo/s/commands/cluster_write_cmd.cpp301
-rw-r--r--src/mongo/s/commands/document_shard_key_update_test.cpp4
-rw-r--r--src/mongo/s/commands/document_shard_key_update_util.cpp100
-rw-r--r--src/mongo/s/commands/document_shard_key_update_util.h33
-rw-r--r--src/mongo/s/commands/strategy.cpp32
-rw-r--r--src/mongo/s/session_catalog_router.cpp19
-rw-r--r--src/mongo/s/session_catalog_router.h10
-rw-r--r--src/mongo/s/transaction_router.cpp25
-rw-r--r--src/mongo/s/transaction_router.h5
-rw-r--r--src/mongo/s/transaction_router_resource_yielder.cpp55
-rw-r--r--src/mongo/s/transaction_router_resource_yielder.h63
-rw-r--r--src/mongo/s/write_ops/batched_command_request.cpp7
-rw-r--r--src/mongo/s/write_ops/batched_command_request.h2
15 files changed, 703 insertions, 165 deletions
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index b4490ae4821..22dab3091e2 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -52,6 +52,7 @@ env.Library(
'router_transactions_stats.idl',
'session_catalog_router.cpp',
'stale_shard_version_helpers.cpp',
+ 'transaction_router_resource_yielder.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/commands/txn_cmd_request',
diff --git a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
index 77b925d92a9..2a6e4c6ef13 100644
--- a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
+++ b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
@@ -61,6 +61,7 @@
#include "mongo/s/session_catalog_router.h"
#include "mongo/s/stale_exception.h"
#include "mongo/s/transaction_router.h"
+#include "mongo/s/transaction_router_resource_yielder.h"
#include "mongo/s/would_change_owning_shard_exception.h"
#include "mongo/util/timer.h"
@@ -134,41 +135,140 @@ void handleWouldChangeOwningShardErrorRetryableWrite(
const NamespaceString& nss,
const write_ops::FindAndModifyCommandRequest& request,
BSONObjBuilder* result) {
- auto txn = txn_api::TransactionWithRetries(
- opCtx, Grid::get(opCtx)->getExecutorPool()->getFixedExecutor());
+ auto txn =
+ txn_api::TransactionWithRetries(opCtx,
+ Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(),
+ TransactionRouterResourceYielder::make());
- auto swResult = txn.runSyncNoThrow(
- opCtx,
- [cmdObj = request.toBSON({}), nss, result](const txn_api::TransactionClient& txnClient,
- ExecutorPtr txnExec) {
- auto res = txnClient.runCommand(nss.db(), cmdObj).get();
- uassertStatusOK(getStatusFromCommandResult(res));
+ // Shared state for the transaction API use below.
+ struct SharedBlock {
+ SharedBlock(NamespaceString nss_) : nss(nss_) {}
- result->appendElementsUnique(
- CommandHelpers::filterCommandReplyForPassthrough(res.removeField("recoveryToken")));
+ NamespaceString nss;
+ BSONObj response;
+ };
+ auto sharedBlock = std::make_shared<SharedBlock>(nss);
- return SemiFuture<void>::makeReady();
+ auto swCommitResult = txn.runSyncNoThrow(
+ opCtx,
+ [cmdObj = request.toBSON({}), sharedBlock](const txn_api::TransactionClient& txnClient,
+ ExecutorPtr txnExec) {
+ return txnClient.runCommand(sharedBlock->nss.db(), cmdObj)
+ .thenRunOn(txnExec)
+ .then([sharedBlock](auto res) {
+ uassertStatusOK(getStatusFromCommandResult(res));
+
+ sharedBlock->response = CommandHelpers::filterCommandReplyForPassthrough(
+ res.removeField("recoveryToken"));
+ })
+ .semi();
});
- auto cmdStatus = swResult.getStatus();
- if (cmdStatus != ErrorCodes::DuplicateKey ||
- (cmdStatus == ErrorCodes::DuplicateKey &&
- !cmdStatus.extraInfo<DuplicateKeyErrorInfo>()->getKeyPattern().hasField("_id"))) {
- cmdStatus.addContext(documentShardKeyUpdateUtil::kNonDuplicateKeyErrorContext);
+ result->appendElementsUnique(
+ CommandHelpers::filterCommandReplyForPassthrough(sharedBlock->response));
+
+ auto bodyStatus = swCommitResult.getStatus();
+ if (bodyStatus != ErrorCodes::DuplicateKey ||
+ (bodyStatus == ErrorCodes::DuplicateKey &&
+ !bodyStatus.extraInfo<DuplicateKeyErrorInfo>()->getKeyPattern().hasField("_id"))) {
+ bodyStatus.addContext(documentShardKeyUpdateUtil::kNonDuplicateKeyErrorContext);
};
- uassertStatusOK(cmdStatus);
+ uassertStatusOK(bodyStatus);
- const auto& wcError = swResult.getValue().wcError;
+ uassertStatusOK(swCommitResult.getValue().cmdStatus);
+ const auto& wcError = swCommitResult.getValue().wcError;
if (!wcError.toStatus().isOK()) {
appendWriteConcernErrorDetailToCmdResponse(shardId, wcError, *result);
}
}
-void updateShardKeyValueOnWouldChangeOwningShardError(OperationContext* opCtx,
- const NamespaceString nss,
- Status responseStatus,
- const BSONObj& cmdObj,
- BSONObjBuilder* result) {
+void updateReplyOnWouldChangeOwningShardSuccess(bool matchedDocOrUpserted,
+ const WouldChangeOwningShardInfo& changeInfo,
+ bool shouldReturnPostImage,
+ BSONObjBuilder* result) {
+ auto upserted = matchedDocOrUpserted && changeInfo.getShouldUpsert();
+ auto updatedExistingDocument = matchedDocOrUpserted && !upserted;
+
+ BSONObjBuilder lastErrorObjBuilder(result->subobjStart("lastErrorObject"));
+ lastErrorObjBuilder.appendNumber("n", matchedDocOrUpserted ? 1 : 0);
+ lastErrorObjBuilder.appendBool("updatedExisting", updatedExistingDocument);
+ if (upserted) {
+ lastErrorObjBuilder.appendAs(changeInfo.getPostImage()["_id"], "upserted");
+ }
+ lastErrorObjBuilder.doneFast();
+
+ if (updatedExistingDocument) {
+ result->append(
+ "value", shouldReturnPostImage ? changeInfo.getPostImage() : changeInfo.getPreImage());
+ } else if (upserted && shouldReturnPostImage) {
+ result->append("value", changeInfo.getPostImage());
+ } else {
+ result->appendNull("value");
+ }
+ result->append("ok", 1.0);
+}
+
+void handleWouldChangeOwningShardErrorTransaction(
+ OperationContext* opCtx,
+ const NamespaceString nss,
+ Status responseStatus,
+ const write_ops::FindAndModifyCommandRequest& request,
+ BSONObjBuilder* result) {
+
+ BSONObjBuilder extraInfoBuilder;
+ responseStatus.extraInfo()->serialize(&extraInfoBuilder);
+ auto extraInfo = extraInfoBuilder.obj();
+
+ // Shared state for the transaction API use below.
+ struct SharedBlock {
+ SharedBlock(WouldChangeOwningShardInfo changeInfo_, NamespaceString nss_)
+ : changeInfo(changeInfo_), nss(nss_) {}
+
+ WouldChangeOwningShardInfo changeInfo;
+ NamespaceString nss;
+ bool matchedDocOrUpserted{false};
+ };
+ auto sharedBlock = std::make_shared<SharedBlock>(
+ WouldChangeOwningShardInfo::parseFromCommandError(extraInfo), nss);
+
+ try {
+ auto txn =
+ txn_api::TransactionWithRetries(opCtx,
+ Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(),
+ TransactionRouterResourceYielder::make());
+
+ txn.runSync(opCtx,
+ [sharedBlock](const txn_api::TransactionClient& txnClient,
+ ExecutorPtr txnExec) -> SemiFuture<void> {
+ return documentShardKeyUpdateUtil::updateShardKeyForDocument(
+ txnClient, txnExec, sharedBlock->nss, sharedBlock->changeInfo)
+ .thenRunOn(txnExec)
+ .then([sharedBlock](bool matchedDocOrUpserted) {
+ sharedBlock->matchedDocOrUpserted = matchedDocOrUpserted;
+ })
+ .semi();
+ });
+
+ auto shouldReturnPostImage = request.getNew() && *request.getNew();
+ updateReplyOnWouldChangeOwningShardSuccess(sharedBlock->matchedDocOrUpserted,
+ sharedBlock->changeInfo,
+ shouldReturnPostImage,
+ result);
+ } catch (DBException& e) {
+ if (e.code() == ErrorCodes::DuplicateKey &&
+ e.extraInfo<DuplicateKeyErrorInfo>()->getKeyPattern().hasField("_id")) {
+ e.addContext(documentShardKeyUpdateUtil::kDuplicateKeyErrorContext);
+ }
+ e.addContext("findAndModify");
+ throw;
+ }
+}
+
+void handleWouldChangeOwningShardErrorTransactionLegacy(OperationContext* opCtx,
+ const NamespaceString nss,
+ Status responseStatus,
+ const BSONObj& cmdObj,
+ BSONObjBuilder* result) {
BSONObjBuilder extraInfoBuilder;
responseStatus.extraInfo()->serialize(&extraInfoBuilder);
auto extraInfo = extraInfoBuilder.obj();
@@ -176,31 +276,12 @@ void updateShardKeyValueOnWouldChangeOwningShardError(OperationContext* opCtx,
WouldChangeOwningShardInfo::parseFromCommandError(extraInfo);
try {
- auto matchedDocOrUpserted = documentShardKeyUpdateUtil::updateShardKeyForDocument(
+ auto matchedDocOrUpserted = documentShardKeyUpdateUtil::updateShardKeyForDocumentLegacy(
opCtx, nss, wouldChangeOwningShardExtraInfo);
- auto upserted = matchedDocOrUpserted && wouldChangeOwningShardExtraInfo.getShouldUpsert();
- auto updatedExistingDocument = matchedDocOrUpserted && !upserted;
-
- BSONObjBuilder lastErrorObjBuilder(result->subobjStart("lastErrorObject"));
- lastErrorObjBuilder.appendNumber("n", matchedDocOrUpserted ? 1 : 0);
- lastErrorObjBuilder.appendBool("updatedExisting", updatedExistingDocument);
- if (upserted) {
- lastErrorObjBuilder.appendAs(wouldChangeOwningShardExtraInfo.getPostImage()["_id"],
- "upserted");
- }
- lastErrorObjBuilder.doneFast();
auto shouldReturnPostImage = cmdObj.getBoolField("new");
- if (updatedExistingDocument) {
- result->append("value",
- shouldReturnPostImage ? wouldChangeOwningShardExtraInfo.getPostImage()
- : wouldChangeOwningShardExtraInfo.getPreImage());
- } else if (upserted && shouldReturnPostImage) {
- result->append("value", wouldChangeOwningShardExtraInfo.getPostImage());
- } else {
- result->appendNull("value");
- }
- result->append("ok", 1.0);
+ updateReplyOnWouldChangeOwningShardSuccess(
+ matchedDocOrUpserted, wouldChangeOwningShardExtraInfo, shouldReturnPostImage, result);
} catch (DBException& e) {
if (e.code() == ErrorCodes::DuplicateKey &&
e.extraInfo<DuplicateKeyErrorInfo>()->getKeyPattern().hasField("_id")) {
@@ -443,28 +524,34 @@ private:
}
if (responseStatus.code() == ErrorCodes::WouldChangeOwningShard) {
- if (isRetryableWrite) {
- if (feature_flags::gFeatureFlagInternalTransactions.isEnabled(
- serverGlobalParams.featureCompatibility)) {
- auto parsedRequest = write_ops::FindAndModifyCommandRequest::parse(
- IDLParserErrorContext("ClusterFindAndModify"), cmdObj);
- // Strip write concern because this command will be sent as part of a
- // transaction and the write concern has already been loaded onto the opCtx and
- // will be picked up by the transaction API.
- //
- // Strip runtime constants because they will be added again when this command is
- // recursively sent through the service entry point.
- parsedRequest.setWriteConcern(boost::none);
- parsedRequest.setLegacyRuntimeConstants(boost::none);
+ if (feature_flags::gFeatureFlagInternalTransactions.isEnabled(
+ serverGlobalParams.featureCompatibility)) {
+ auto parsedRequest = write_ops::FindAndModifyCommandRequest::parse(
+ IDLParserErrorContext("ClusterFindAndModify"), cmdObj);
+ // Strip write concern because this command will be sent as part of a
+ // transaction and the write concern has already been loaded onto the opCtx and
+ // will be picked up by the transaction API.
+ parsedRequest.setWriteConcern(boost::none);
+
+ // Strip runtime constants because they will be added again when this command is
+ // recursively sent through the service entry point.
+ parsedRequest.setLegacyRuntimeConstants(boost::none);
+ if (isRetryableWrite) {
handleWouldChangeOwningShardErrorRetryableWrite(
opCtx, shardId, nss, parsedRequest, result);
} else {
+ handleWouldChangeOwningShardErrorTransaction(
+ opCtx, nss, responseStatus, parsedRequest, result);
+ }
+ } else {
+ // TODO SERVER-62375: Remove this branch.
+ if (isRetryableWrite) {
_handleWouldChangeOwningShardErrorRetryableWriteLegacy(
opCtx, shardId, shardVersion, dbVersion, nss, cmdObj, result);
+ } else {
+ handleWouldChangeOwningShardErrorTransactionLegacy(
+ opCtx, nss, responseStatus, cmdObj, result);
}
- } else {
- updateShardKeyValueOnWouldChangeOwningShardError(
- opCtx, nss, responseStatus, cmdObj, result);
}
return;
@@ -497,7 +584,7 @@ private:
// Re-run the findAndModify command that will change the shard key value in a
// transaction. We call _runCommand recursively, and this second time through
// since it will be run as a transaction it will take the other code path to
- // updateShardKeyValueOnWouldChangeOwningShardError. We ensure the retried
+ // handleWouldChangeOwningShardErrorTransactionLegacy. We ensure the retried
// operation does not include WC inside the transaction by stripping it from the
// cmdObj. The transaction commit will still use the WC, because it uses the WC
// from the opCtx (which has been set previously in Strategy).
diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp
index 2f176bed049..5dbb436c0d2 100644
--- a/src/mongo/s/commands/cluster_write_cmd.cpp
+++ b/src/mongo/s/commands/cluster_write_cmd.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/commands/update_metrics.h"
#include "mongo/db/commands/write_commands_common.h"
#include "mongo/db/curop.h"
+#include "mongo/db/internal_transactions_feature_flag_gen.h"
#include "mongo/db/not_primary_error_tracker.h"
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
#include "mongo/db/stats/counters.h"
@@ -56,6 +57,7 @@
#include "mongo/s/multi_statement_transaction_requests_sender.h"
#include "mongo/s/session_catalog_router.h"
#include "mongo/s/transaction_router.h"
+#include "mongo/s/transaction_router_resource_yielder.h"
#include "mongo/s/would_change_owning_shard_exception.h"
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/s/write_ops/batched_command_response.h"
@@ -148,6 +150,134 @@ boost::optional<WouldChangeOwningShardInfo> getWouldChangeOwningShardErrorInfo(
return boost::none;
}
+void handleWouldChangeOwningShardErrorRetryableWrite(OperationContext* opCtx,
+ BatchedCommandRequest* request,
+ BatchedCommandResponse* response) {
+ // Strip write concern because this command will be sent as part of a
+ // transaction and the write concern has already been loaded onto the opCtx and
+ // will be picked up by the transaction API.
+ request->unsetWriteConcern();
+
+ // Strip runtime constants because they will be added again when the API sends this command
+ // through the service entry point.
+ request->unsetLegacyRuntimeConstants();
+
+ // Unset error details because they will be repopulated below.
+ response->unsetErrDetails();
+
+ auto txn =
+ txn_api::TransactionWithRetries(opCtx,
+ Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(),
+ TransactionRouterResourceYielder::make());
+
+ // Shared state for the transaction API use below.
+ struct SharedBlock {
+ SharedBlock(NamespaceString nss_) : nss(nss_) {}
+
+ NamespaceString nss;
+ BSONObj response;
+ };
+ auto sharedBlock = std::make_shared<SharedBlock>(request->getNS());
+
+ auto swCommitResult = txn.runSyncNoThrow(
+ opCtx,
+ [cmdObj = request->toBSON(), sharedBlock](const txn_api::TransactionClient& txnClient,
+ ExecutorPtr txnExec) {
+ return txnClient.runCommand(sharedBlock->nss.db(), cmdObj)
+ .thenRunOn(txnExec)
+ .then([sharedBlock](auto res) {
+ uassertStatusOK(getStatusFromWriteCommandReply(res));
+
+ sharedBlock->response = CommandHelpers::filterCommandReplyForPassthrough(
+ res.removeField("recoveryToken"));
+ })
+ .semi();
+ });
+
+ auto bodyStatus = swCommitResult.getStatus();
+ if (!bodyStatus.isOK()) {
+ if (bodyStatus != ErrorCodes::DuplicateKey ||
+ (bodyStatus == ErrorCodes::DuplicateKey &&
+ !bodyStatus.extraInfo<DuplicateKeyErrorInfo>()->getKeyPattern().hasField("_id"))) {
+ bodyStatus.addContext(documentShardKeyUpdateUtil::kNonDuplicateKeyErrorContext);
+ };
+
+ auto error = std::make_unique<WriteErrorDetail>();
+ error->setIndex(0);
+ error->setStatus(bodyStatus);
+ response->addToErrDetails(error.release());
+ return;
+ }
+
+ uassertStatusOK(swCommitResult.getValue().cmdStatus);
+
+ // Note this will clear existing response as part of parsing.
+ std::string errMsg = "Failed to parse response from WouldChangeOwningShard error handling";
+ response->parseBSON(sharedBlock->response, &errMsg);
+
+ // Make a unique pointer with a copy of the error detail because
+ // BatchedCommandResponse::setWriteConcernError() expects a pointer to a heap allocated
+ // WriteConcernErrorDetail that it can take unique ownership of.
+ auto writeConcernDetail =
+ std::make_unique<WriteConcernErrorDetail>(swCommitResult.getValue().wcError);
+ if (!writeConcernDetail->toStatus().isOK()) {
+ response->setWriteConcernError(writeConcernDetail.release());
+ }
+}
+
+struct UpdateShardKeyResult {
+ bool updatedShardKey{false};
+ boost::optional<BSONObj> upsertedId;
+};
+
+UpdateShardKeyResult handleWouldChangeOwningShardErrorTransaction(
+ OperationContext* opCtx,
+ BatchedCommandRequest* request,
+ BatchedCommandResponse* response,
+ const WouldChangeOwningShardInfo& changeInfo) {
+ // Shared state for the transaction API use below.
+ struct SharedBlock {
+ SharedBlock(WouldChangeOwningShardInfo changeInfo_, NamespaceString nss_)
+ : changeInfo(changeInfo_), nss(nss_) {}
+
+ WouldChangeOwningShardInfo changeInfo;
+ NamespaceString nss;
+ bool updatedShardKey{false};
+ };
+ auto sharedBlock = std::make_shared<SharedBlock>(changeInfo, request->getNS());
+
+ auto txn =
+ txn_api::TransactionWithRetries(opCtx,
+ Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(),
+ TransactionRouterResourceYielder::make());
+
+ try {
+ txn.runSync(opCtx,
+ [sharedBlock](const txn_api::TransactionClient& txnClient,
+ ExecutorPtr txnExec) -> SemiFuture<void> {
+ return documentShardKeyUpdateUtil::updateShardKeyForDocument(
+ txnClient, txnExec, sharedBlock->nss, sharedBlock->changeInfo)
+ .thenRunOn(txnExec)
+ .then([sharedBlock](bool updatedShardKey) {
+ sharedBlock->updatedShardKey = updatedShardKey;
+ })
+ .semi();
+ });
+ } catch (const ExceptionFor<ErrorCodes::DuplicateKey>& ex) {
+ Status status = ex->getKeyPattern().hasField("_id")
+ ? ex.toStatus().withContext(documentShardKeyUpdateUtil::kDuplicateKeyErrorContext)
+ : ex.toStatus();
+ uassertStatusOK(status);
+ }
+
+ // If the operation was an upsert, record the _id of the new document.
+ boost::optional<BSONObj> upsertedId;
+ if (sharedBlock->updatedShardKey && sharedBlock->changeInfo.getShouldUpsert()) {
+ upsertedId = sharedBlock->changeInfo.getPostImage()["_id"].wrap();
+ }
+ return UpdateShardKeyResult{sharedBlock->updatedShardKey, std::move(upsertedId)};
+}
+
/**
* Changes the shard key for the document if the response object contains a WouldChangeOwningShard
* error. If the original command was sent as a retryable write, starts a transaction on the same
@@ -169,91 +299,112 @@ bool handleWouldChangeOwningShardError(OperationContext* opCtx,
bool updatedShardKey = false;
boost::optional<BSONObj> upsertedId;
- if (isRetryableWrite) {
- if (MONGO_unlikely(hangAfterThrowWouldChangeOwningShardRetryableWrite.shouldFail())) {
- LOGV2(22759, "Hit hangAfterThrowWouldChangeOwningShardRetryableWrite failpoint");
- hangAfterThrowWouldChangeOwningShardRetryableWrite.pauseWhileSet(opCtx);
- }
- RouterOperationContextSession routerSession(opCtx);
- try {
- // Start transaction and re-run the original update command
- auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
- readConcernArgs = repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern);
-
- // Ensure the retried operation does not include WC inside the transaction. The
- // transaction commit will still use the WC, because it uses the WC from the opCtx
- // (which has been set previously in Strategy).
- request->unsetWriteConcern();
-
- documentShardKeyUpdateUtil::startTransactionForShardKeyUpdate(opCtx);
- // Clear the error details from the response object before sending the write again
- response->unsetErrDetails();
- cluster::write(opCtx, *request, &stats, response);
- wouldChangeOwningShardErrorInfo =
- getWouldChangeOwningShardErrorInfo(opCtx, *request, response, !isRetryableWrite);
- if (!wouldChangeOwningShardErrorInfo)
- uassertStatusOK(response->toStatus());
-
- // If we do not get WouldChangeOwningShard when re-running the update, the document has
- // been modified or deleted concurrently and we do not need to delete it and insert a
- // new one.
- updatedShardKey = wouldChangeOwningShardErrorInfo &&
- documentShardKeyUpdateUtil::updateShardKeyForDocument(
- opCtx, request->getNS(), wouldChangeOwningShardErrorInfo.get());
-
- // If the operation was an upsert, record the _id of the new document.
- if (updatedShardKey && wouldChangeOwningShardErrorInfo->getShouldUpsert()) {
- upsertedId = wouldChangeOwningShardErrorInfo->getPostImage()["_id"].wrap();
+ if (feature_flags::gFeatureFlagInternalTransactions.isEnabled(
+ serverGlobalParams.featureCompatibility)) {
+ if (isRetryableWrite) {
+ if (MONGO_unlikely(hangAfterThrowWouldChangeOwningShardRetryableWrite.shouldFail())) {
+ LOGV2(5918603, "Hit hangAfterThrowWouldChangeOwningShardRetryableWrite failpoint");
+ hangAfterThrowWouldChangeOwningShardRetryableWrite.pauseWhileSet(opCtx);
}
- // Commit the transaction
- auto commitResponse =
- documentShardKeyUpdateUtil::commitShardKeyUpdateTransaction(opCtx);
+ handleWouldChangeOwningShardErrorRetryableWrite(opCtx, request, response);
+ } else {
+ auto updateResult = handleWouldChangeOwningShardErrorTransaction(
+ opCtx, request, response, *wouldChangeOwningShardErrorInfo);
+ updatedShardKey = updateResult.updatedShardKey;
+ upsertedId = std::move(updateResult.upsertedId);
+ }
+ } else {
+ // TODO SERVER-62375: Delete this branch.
+ if (isRetryableWrite) {
+ if (MONGO_unlikely(hangAfterThrowWouldChangeOwningShardRetryableWrite.shouldFail())) {
+ LOGV2(22759, "Hit hangAfterThrowWouldChangeOwningShardRetryableWrite failpoint");
+ hangAfterThrowWouldChangeOwningShardRetryableWrite.pauseWhileSet(opCtx);
+ }
+ RouterOperationContextSession routerSession(opCtx);
+ try {
+ // Start transaction and re-run the original update command
+ auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
+ readConcernArgs = repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern);
+
+ // Ensure the retried operation does not include WC inside the transaction. The
+ // transaction commit will still use the WC, because it uses the WC from the opCtx
+ // (which has been set previously in Strategy).
+ request->unsetWriteConcern();
+
+ documentShardKeyUpdateUtil::startTransactionForShardKeyUpdate(opCtx);
+ // Clear the error details from the response object before sending the write again
+ response->unsetErrDetails();
+ cluster::write(opCtx, *request, &stats, response);
+ wouldChangeOwningShardErrorInfo = getWouldChangeOwningShardErrorInfo(
+ opCtx, *request, response, !isRetryableWrite);
+ if (!wouldChangeOwningShardErrorInfo)
+ uassertStatusOK(response->toStatus());
+
+ // If we do not get WouldChangeOwningShard when re-running the update, the document
+ // has been modified or deleted concurrently and we do not need to delete it and
+ // insert a new one.
+ updatedShardKey =
+ wouldChangeOwningShardErrorInfo &&
+ documentShardKeyUpdateUtil::updateShardKeyForDocumentLegacy(
+ opCtx, request->getNS(), wouldChangeOwningShardErrorInfo.get());
+
+ // If the operation was an upsert, record the _id of the new document.
+ if (updatedShardKey && wouldChangeOwningShardErrorInfo->getShouldUpsert()) {
+ upsertedId = wouldChangeOwningShardErrorInfo->getPostImage()["_id"].wrap();
+ }
- uassertStatusOK(getStatusFromCommandResult(commitResponse));
+ // Commit the transaction
+ auto commitResponse =
+ documentShardKeyUpdateUtil::commitShardKeyUpdateTransaction(opCtx);
- auto writeConcernDetail = getWriteConcernErrorDetailFromBSONObj(commitResponse);
- if (writeConcernDetail && !writeConcernDetail->toStatus().isOK())
- response->setWriteConcernError(writeConcernDetail.release());
- } catch (DBException& e) {
- if (e.code() == ErrorCodes::DuplicateKey &&
- e.extraInfo<DuplicateKeyErrorInfo>()->getKeyPattern().hasField("_id")) {
- e.addContext(documentShardKeyUpdateUtil::kDuplicateKeyErrorContext);
- } else {
- e.addContext(documentShardKeyUpdateUtil::kNonDuplicateKeyErrorContext);
- }
+ uassertStatusOK(getStatusFromCommandResult(commitResponse));
- if (!response->isErrDetailsSet() || !response->getErrDetails().back()) {
- auto error = std::make_unique<WriteErrorDetail>();
- error->setIndex(0);
- response->addToErrDetails(error.release());
- }
+ auto writeConcernDetail = getWriteConcernErrorDetailFromBSONObj(commitResponse);
+ if (writeConcernDetail && !writeConcernDetail->toStatus().isOK())
+ response->setWriteConcernError(writeConcernDetail.release());
+ } catch (DBException& e) {
+ if (e.code() == ErrorCodes::DuplicateKey &&
+ e.extraInfo<DuplicateKeyErrorInfo>()->getKeyPattern().hasField("_id")) {
+ e.addContext(documentShardKeyUpdateUtil::kDuplicateKeyErrorContext);
+ } else {
+ e.addContext(documentShardKeyUpdateUtil::kNonDuplicateKeyErrorContext);
+ }
- // Set the error status to the status of the failed command and abort the transaction.
- auto status = e.toStatus();
- response->getErrDetails().back()->setStatus(status);
+ if (!response->isErrDetailsSet() || !response->getErrDetails().back()) {
+ auto error = std::make_unique<WriteErrorDetail>();
+ error->setIndex(0);
+ response->addToErrDetails(error.release());
+ }
- auto txnRouterForAbort = TransactionRouter::get(opCtx);
- if (txnRouterForAbort)
- txnRouterForAbort.implicitlyAbortTransaction(opCtx, status);
+ // Set the error status to the status of the failed command and abort the
+ // transaction.
+ auto status = e.toStatus();
+ response->getErrDetails().back()->setStatus(status);
- return false;
- }
- } else {
- try {
- // Delete the original document and insert the new one
- updatedShardKey = documentShardKeyUpdateUtil::updateShardKeyForDocument(
- opCtx, request->getNS(), wouldChangeOwningShardErrorInfo.get());
+ auto txnRouterForAbort = TransactionRouter::get(opCtx);
+ if (txnRouterForAbort)
+ txnRouterForAbort.implicitlyAbortTransaction(opCtx, status);
- // If the operation was an upsert, record the _id of the new document.
- if (updatedShardKey && wouldChangeOwningShardErrorInfo->getShouldUpsert()) {
- upsertedId = wouldChangeOwningShardErrorInfo->getPostImage()["_id"].wrap();
+ return false;
+ }
+ } else {
+ try {
+ // Delete the original document and insert the new one
+ updatedShardKey = documentShardKeyUpdateUtil::updateShardKeyForDocumentLegacy(
+ opCtx, request->getNS(), wouldChangeOwningShardErrorInfo.get());
+
+ // If the operation was an upsert, record the _id of the new document.
+ if (updatedShardKey && wouldChangeOwningShardErrorInfo->getShouldUpsert()) {
+ upsertedId = wouldChangeOwningShardErrorInfo->getPostImage()["_id"].wrap();
+ }
+ } catch (const ExceptionFor<ErrorCodes::DuplicateKey>& ex) {
+ Status status = ex->getKeyPattern().hasField("_id")
+ ? ex.toStatus().withContext(
+ documentShardKeyUpdateUtil::kDuplicateKeyErrorContext)
+ : ex.toStatus();
+ uassertStatusOK(status);
}
- } catch (const ExceptionFor<ErrorCodes::DuplicateKey>& ex) {
- Status status = ex->getKeyPattern().hasField("_id")
- ? ex.toStatus().withContext(documentShardKeyUpdateUtil::kDuplicateKeyErrorContext)
- : ex.toStatus();
- uassertStatusOK(status);
}
}
diff --git a/src/mongo/s/commands/document_shard_key_update_test.cpp b/src/mongo/s/commands/document_shard_key_update_test.cpp
index dcaf62de0db..4c297ce7bc8 100644
--- a/src/mongo/s/commands/document_shard_key_update_test.cpp
+++ b/src/mongo/s/commands/document_shard_key_update_test.cpp
@@ -53,7 +53,7 @@ TEST_F(DocumentShardKeyUpdateTest, constructShardKeyDeleteCmdObj) {
NamespaceString nss("test.foo");
BSONObj updatePreImage = BSON("x" << 4 << "y" << 3 << "_id" << 20);
- auto deleteCmdObj = constructShardKeyDeleteCmdObj(nss, updatePreImage);
+ auto deleteCmdObj = constructShardKeyDeleteCmdObj(nss, updatePreImage, boost::none);
auto deletesObj = deleteCmdObj["deletes"].Array();
ASSERT_EQ(deletesObj.size(), 1U);
@@ -69,7 +69,7 @@ TEST_F(DocumentShardKeyUpdateTest, constructShardKeyInsertCmdObj) {
NamespaceString nss("test.foo");
BSONObj updatePostImage = BSON("x" << 4 << "y" << 3 << "_id" << 20);
- auto insertCmdObj = constructShardKeyInsertCmdObj(nss, updatePostImage);
+ auto insertCmdObj = constructShardKeyInsertCmdObj(nss, updatePostImage, boost::none);
auto insertsObj = insertCmdObj["documents"].Array();
ASSERT_EQ(insertsObj.size(), 1U);
diff --git a/src/mongo/s/commands/document_shard_key_update_util.cpp b/src/mongo/s/commands/document_shard_key_update_util.cpp
index 629c29ebca5..4524dc78689 100644
--- a/src/mongo/s/commands/document_shard_key_update_util.cpp
+++ b/src/mongo/s/commands/document_shard_key_update_util.cpp
@@ -102,7 +102,8 @@ bool executeOperationsAsPartOfShardKeyUpdate(OperationContext* opCtx,
* original document _id retrieved from 'updatePreImage'.
*/
write_ops::DeleteCommandRequest createShardKeyDeleteOp(const NamespaceString& nss,
- const BSONObj& updatePreImage) {
+ const BSONObj& updatePreImage,
+ boost::optional<StmtId> stmtId) {
write_ops::DeleteCommandRequest deleteOp(nss);
deleteOp.setDeletes({[&] {
write_ops::DeleteOpEntry entry;
@@ -110,7 +111,9 @@ write_ops::DeleteCommandRequest createShardKeyDeleteOp(const NamespaceString& ns
entry.setMulti(false);
return entry;
}()});
- deleteOp.getWriteCommandRequestBase().setStmtId(1);
+ if (stmtId) {
+ deleteOp.getWriteCommandRequestBase().setStmtId(*stmtId);
+ }
return deleteOp;
}
@@ -119,10 +122,13 @@ write_ops::DeleteCommandRequest createShardKeyDeleteOp(const NamespaceString& ns
* Creates the insert op that will be used to insert the new document with the post-update image.
*/
write_ops::InsertCommandRequest createShardKeyInsertOp(const NamespaceString& nss,
- const BSONObj& updatePostImage) {
+ const BSONObj& updatePostImage,
+ boost::optional<StmtId> stmtId) {
write_ops::InsertCommandRequest insertOp(nss);
insertOp.setDocuments({updatePostImage});
- insertOp.getWriteCommandRequestBase().setStmtId(2);
+ if (stmtId) {
+ insertOp.getWriteCommandRequestBase().setStmtId(*stmtId);
+ }
return insertOp;
}
@@ -130,14 +136,14 @@ write_ops::InsertCommandRequest createShardKeyInsertOp(const NamespaceString& ns
namespace documentShardKeyUpdateUtil {
-bool updateShardKeyForDocument(OperationContext* opCtx,
- const NamespaceString& nss,
- const WouldChangeOwningShardInfo& documentKeyChangeInfo) {
+bool updateShardKeyForDocumentLegacy(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const WouldChangeOwningShardInfo& documentKeyChangeInfo) {
auto updatePreImage = documentKeyChangeInfo.getPreImage().getOwned();
auto updatePostImage = documentKeyChangeInfo.getPostImage().getOwned();
- auto deleteCmdObj = constructShardKeyDeleteCmdObj(nss, updatePreImage);
- auto insertCmdObj = constructShardKeyInsertCmdObj(nss, updatePostImage);
+ auto deleteCmdObj = constructShardKeyDeleteCmdObj(nss, updatePreImage, boost::none);
+ auto insertCmdObj = constructShardKeyInsertCmdObj(nss, updatePostImage, boost::none);
return executeOperationsAsPartOfShardKeyUpdate(
opCtx, deleteCmdObj, insertCmdObj, nss.db(), documentKeyChangeInfo.getShouldUpsert());
@@ -160,15 +166,83 @@ BSONObj commitShardKeyUpdateTransaction(OperationContext* opCtx) {
return txnRouter.commitTransaction(opCtx, boost::none);
}
-BSONObj constructShardKeyDeleteCmdObj(const NamespaceString& nss, const BSONObj& updatePreImage) {
- auto deleteOp = createShardKeyDeleteOp(nss, updatePreImage);
+BSONObj constructShardKeyDeleteCmdObj(const NamespaceString& nss,
+ const BSONObj& updatePreImage,
+ boost::optional<StmtId> stmtId) {
+ auto deleteOp = createShardKeyDeleteOp(nss, updatePreImage, stmtId);
return deleteOp.toBSON({});
}
-BSONObj constructShardKeyInsertCmdObj(const NamespaceString& nss, const BSONObj& updatePostImage) {
- auto insertOp = createShardKeyInsertOp(nss, updatePostImage);
+BSONObj constructShardKeyInsertCmdObj(const NamespaceString& nss,
+ const BSONObj& updatePostImage,
+ boost::optional<StmtId> stmtId) {
+ auto insertOp = createShardKeyInsertOp(nss, updatePostImage, stmtId);
return insertOp.toBSON({});
}
+SemiFuture<bool> updateShardKeyForDocument(const txn_api::TransactionClient& txnClient,
+ ExecutorPtr txnExec,
+ const NamespaceString& nss,
+ const WouldChangeOwningShardInfo& changeInfo) {
+ // Use stmtId=1 for this delete (and 2 for the subsequent insert) because the original
+ // update/findAndModify that threw the WouldChangeOwningShard error used stmtId=0 to store the
+ // WouldChangeOwningShard sentinel noop entry.
+ auto deleteCmdObj = documentShardKeyUpdateUtil::constructShardKeyDeleteCmdObj(
+ nss, changeInfo.getPreImage().getOwned(), {1});
+ auto deleteOpMsg = OpMsgRequest::fromDBAndBody(nss.db(), std::move(deleteCmdObj));
+ auto deleteRequest = BatchedCommandRequest::parseDelete(std::move(deleteOpMsg));
+
+ return txnClient.runCRUDOp(deleteRequest, {})
+ .thenRunOn(txnExec)
+ .then([&txnClient, &nss, &changeInfo](
+ auto deleteResponse) -> SemiFuture<BatchedCommandResponse> {
+ uassertStatusOK(deleteResponse.toStatus());
+
+ // If shouldUpsert is true, this means the original command specified {upsert:
+ // true} and did not match any docs, so we should not match any when doing
+ // this delete. If shouldUpsert is false and we do not delete any document,
+ // this is essentially equivalent to not matching a doc and we should not
+ // insert.
+ if (changeInfo.getShouldUpsert()) {
+ uassert(ErrorCodes::ConflictingOperationInProgress,
+ "Delete matched a document when it should not have.",
+ deleteResponse.getN() == 0);
+ } else if (deleteResponse.getN() != 1) {
+ iassert(Status(ErrorCodes::WouldChangeOwningShardDeletedNoDocument,
+ "When handling WouldChangeOwningShard error, the delete matched no "
+ "documents and should not upsert"));
+ }
+
+ if (MONGO_unlikely(hangBeforeInsertOnUpdateShardKey.shouldFail())) {
+ LOGV2(5918602, "Hit hangBeforeInsertOnUpdateShardKey failpoint");
+ hangBeforeInsertOnUpdateShardKey.pauseWhileSet();
+ }
+
+ auto insertCmdObj = documentShardKeyUpdateUtil::constructShardKeyInsertCmdObj(
+ nss, changeInfo.getPostImage().getOwned(), {2});
+ auto insertOpMsg = OpMsgRequest::fromDBAndBody(nss.db(), std::move(insertCmdObj));
+ auto insertRequest = BatchedCommandRequest::parseInsert(std::move(insertOpMsg));
+
+ return txnClient.runCRUDOp(insertRequest, {});
+ })
+ .thenRunOn(txnExec)
+ .then([&nss](auto insertResponse) {
+ uassertStatusOK(insertResponse.toStatus());
+
+ uassert(ErrorCodes::NamespaceNotFound,
+ "Document not successfully inserted while changing shard key for namespace " +
+ nss.ns(),
+ insertResponse.getN() == 1);
+
+ return true;
+ })
+ .onError<ErrorCodes::WouldChangeOwningShardDeletedNoDocument>([](Status status) {
+ // We failed to delete a document and were not configured to upsert, so the insert
+ // was never sent. Propagate that failure by returning false.
+ return false;
+ })
+ .semi();
+}
+
} // namespace documentShardKeyUpdateUtil
} // namespace mongo
diff --git a/src/mongo/s/commands/document_shard_key_update_util.h b/src/mongo/s/commands/document_shard_key_update_util.h
index 2684c7de8a0..31825eece60 100644
--- a/src/mongo/s/commands/document_shard_key_update_util.h
+++ b/src/mongo/s/commands/document_shard_key_update_util.h
@@ -35,6 +35,7 @@
#include "mongo/db/logical_session_id.h"
#include "mongo/db/ops/write_ops.h"
+#include "mongo/db/transaction_api.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/s/transaction_router.h"
@@ -65,14 +66,32 @@ static constexpr StringData kNonDuplicateKeyErrorContext =
"transaction failed."_sd;
/**
+ * TODO SERVER-62375: Remove this function.
+ *
* Coordinating method and external point of entry for updating a document's shard key. This method
* creates the necessary extra operations. It will then run each operation using the ClusterWriter.
* If any statement throws, an exception will leave this method, and must be handled by external
* callers.
*/
-bool updateShardKeyForDocument(OperationContext* opCtx,
- const NamespaceString& nss,
- const WouldChangeOwningShardInfo& documentKeyChangeInfo);
+bool updateShardKeyForDocumentLegacy(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const WouldChangeOwningShardInfo& documentKeyChangeInfo);
+
+/**
+ * Coordinating method and external point of entry for updating a document's shard key. This method
+ * creates the necessary extra operations. It will then run each operation using the given
+ * transaction client. If any statement throws, an exception will leave this method, and must be
+ * handled by external callers.
+ *
+ * Returns an error on any error returned by a command. If the original update was sent with
+ * {upsert: false}, returns whether or not we deleted the original doc and inserted the new one
+ * sucessfully. If the original update was sent with {upsert: true}, returns whether or not we
+ * inserted the new doc successfully.
+ */
+SemiFuture<bool> updateShardKeyForDocument(const txn_api::TransactionClient& txnClient,
+ ExecutorPtr txnExec,
+ const NamespaceString& nss,
+ const WouldChangeOwningShardInfo& changeInfo);
/**
* Starts a transaction on this session. This method is called when WouldChangeOwningShard is thrown
@@ -93,7 +112,9 @@ BSONObj commitShardKeyUpdateTransaction(OperationContext* opCtx);
* This method should not be called outside of this class. It is only temporarily exposed for
* intermediary test coverage.
*/
-BSONObj constructShardKeyDeleteCmdObj(const NamespaceString& nss, const BSONObj& updatePreImage);
+BSONObj constructShardKeyDeleteCmdObj(const NamespaceString& nss,
+ const BSONObj& updatePreImage,
+ boost::optional<StmtId> stmtId);
/*
* Creates the BSONObj that will be used to insert the new document with the post-update image.
@@ -102,6 +123,8 @@ BSONObj constructShardKeyDeleteCmdObj(const NamespaceString& nss, const BSONObj&
* This method should not be called outside of this class. It is only temporarily exposed for
* intermediary test coverage.
*/
-BSONObj constructShardKeyInsertCmdObj(const NamespaceString& nss, const BSONObj& updatePostImage);
+BSONObj constructShardKeyInsertCmdObj(const NamespaceString& nss,
+ const BSONObj& updatePostImage,
+ boost::optional<StmtId> stmtId);
} // namespace documentShardKeyUpdateUtil
} // namespace mongo
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp
index 8ad6fc25a61..181411e9c38 100644
--- a/src/mongo/s/commands/strategy.cpp
+++ b/src/mongo/s/commands/strategy.cpp
@@ -191,12 +191,20 @@ Future<void> invokeInTransactionRouter(std::shared_ptr<RequestExecutionContext>
ErrorCodes::isNeedRetargettingError(code) ||
code == ErrorCodes::ShardInvalidatedForTargeting ||
code == ErrorCodes::StaleDbVersion ||
- code == ErrorCodes::ShardCannotRefreshDueToLocksHeld) {
+ code == ErrorCodes::ShardCannotRefreshDueToLocksHeld ||
+ code == ErrorCodes::WouldChangeOwningShard) {
// Don't abort on possibly retryable errors.
return;
}
auto opCtx = rec->getOpCtx();
+
+ auto txnRouter = TransactionRouter::get(opCtx);
+ if (!txnRouter) {
+ // The command had yielded its session while the error was thrown.
+ return;
+ }
+
TransactionRouter::get(opCtx).implicitlyAbortTransaction(opCtx, status);
});
}
@@ -435,6 +443,8 @@ private:
// Logs and updates statistics if an error occurs.
void _tapOnError(const Status& status);
+ void _tapOnSuccess();
+
ParseAndRunCommand* const _parc;
boost::optional<RouterOperationContextSession> _routerSession;
@@ -910,8 +920,16 @@ void ParseAndRunCommand::RunAndRetry::_checkRetryForTransaction(Status& status)
// be retried on.
auto opCtx = _parc->_rec->getOpCtx();
auto txnRouter = TransactionRouter::get(opCtx);
- if (!txnRouter)
+ if (!txnRouter) {
+ if (opCtx->inMultiDocumentTransaction()) {
+ // This command must have failed while its session was yielded. We cannot retry in this
+ // case, whatever the session was yielded to is responsible for that, so rethrow the
+ // error.
+ iassert(status);
+ }
+
return;
+ }
ScopeGuard abortGuard([&] { txnRouter.implicitlyAbortTransaction(opCtx, status); });
@@ -1067,6 +1085,15 @@ void ParseAndRunCommand::RunInvocation::_tapOnError(const Status& status) {
_parc->_errorBuilder->appendElements(errorLabels);
}
+void ParseAndRunCommand::RunInvocation::_tapOnSuccess() {
+ auto opCtx = _parc->_rec->getOpCtx();
+ if (_parc->_osi && _parc->_osi->getAutocommit() == boost::optional<bool>(false)) {
+ tassert(5918604,
+ "A successful transaction command must always check out its session after yielding",
+ TransactionRouter::get(opCtx));
+ }
+}
+
Future<void> ParseAndRunCommand::RunAndRetry::run() {
return makeReadyFutureWith([&] {
// Try kMaxNumStaleVersionRetries times. On the last try, exceptions are rethrown.
@@ -1107,6 +1134,7 @@ Future<void> ParseAndRunCommand::RunInvocation::run() {
return future_util::makeState<RunAndRetry>(_parc).thenWithState(
[](auto* runner) { return runner->run(); });
})
+ .tap([this]() { _tapOnSuccess(); })
.tapError([this](Status status) { _tapOnError(status); });
}
diff --git a/src/mongo/s/session_catalog_router.cpp b/src/mongo/s/session_catalog_router.cpp
index d0a8da6984b..1207f371dbf 100644
--- a/src/mongo/s/session_catalog_router.cpp
+++ b/src/mongo/s/session_catalog_router.cpp
@@ -72,7 +72,24 @@ RouterOperationContextSession::RouterOperationContextSession(OperationContext* o
: _opCtx(opCtx), _operationContextSession(opCtx) {}
RouterOperationContextSession::~RouterOperationContextSession() {
- TransactionRouter::get(_opCtx).stash(_opCtx);
+ if (auto txnRouter = TransactionRouter::get(_opCtx)) {
+ // Only stash if the session wasn't yielded.
+ txnRouter.stash(_opCtx);
+ }
};
+void RouterOperationContextSession::checkIn(OperationContext* opCtx) {
+ invariant(OperationContextSession::get(opCtx));
+
+ TransactionRouter::get(opCtx).stash(opCtx);
+ OperationContextSession::checkIn(opCtx);
+}
+
+void RouterOperationContextSession::checkOut(OperationContext* opCtx) {
+ invariant(!OperationContextSession::get(opCtx));
+
+ OperationContextSession::checkOut(opCtx);
+ TransactionRouter::get(opCtx).unstash(opCtx);
+}
+
} // namespace mongo
diff --git a/src/mongo/s/session_catalog_router.h b/src/mongo/s/session_catalog_router.h
index ff54747aeed..301b40fffca 100644
--- a/src/mongo/s/session_catalog_router.h
+++ b/src/mongo/s/session_catalog_router.h
@@ -61,6 +61,16 @@ public:
RouterOperationContextSession(OperationContext* opCtx);
~RouterOperationContextSession();
+ /**
+ * These methods take an operation context with a checked-out session and allow it to be
+ * temporarily or permanently checked back in, in order to allow other operations to use it.
+ *
+ * Check-in may only be called if the session has actually been checked out previously and
+ * similarly check-out may only be called if the session is not checked out already.
+ */
+ static void checkIn(OperationContext* opCtx);
+ static void checkOut(OperationContext* opCtx);
+
private:
OperationContext* _opCtx;
OperationContextSession _operationContextSession;
diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp
index 70a99dc4eb9..198e1209212 100644
--- a/src/mongo/s/transaction_router.cpp
+++ b/src/mongo/s/transaction_router.cpp
@@ -498,7 +498,9 @@ void TransactionRouter::Router::processParticipantResponse(OperationContext* opC
}
auto commandStatus = getStatusFromCommandResult(responseObj);
- if (!commandStatus.isOK()) {
+ // WouldChangeOwningShard errors don't abort their transaction and the responses containing them
+ // include transaction metadata, so we treat them as successful responses.
+ if (!commandStatus.isOK() && commandStatus != ErrorCodes::WouldChangeOwningShard) {
return;
}
@@ -1046,6 +1048,19 @@ void TransactionRouter::Router::stash(OperationContext* opCtx) {
o(lk).metricsTracker->trySetInactive(tickSource, tickSource->getTicks());
}
+void TransactionRouter::Router::unstash(OperationContext* opCtx) {
+ if (!isInitialized()) {
+ return;
+ }
+
+ // TODO SERVER-64052: Validate that the transaction number hasn't changed and metrics are
+ // updated appropriately.
+
+ auto tickSource = opCtx->getServiceContext()->getTickSource();
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ o(lk).metricsTracker->trySetActive(tickSource, tickSource->getTicks());
+}
+
BSONObj TransactionRouter::Router::_handOffCommitToCoordinator(OperationContext* opCtx) {
invariant(o().coordinatorId);
auto coordinatorIter = o().participants.find(*o().coordinatorId);
@@ -1247,7 +1262,7 @@ BSONObj TransactionRouter::Router::abortTransaction(OperationContext* opCtx) {
3,
"{sessionId}:{txnNumber} Aborting transaction on {numParticipantShards} shard(s)",
"Aborting transaction on all participant shards",
- "sessionId"_attr = _sessionId().getId(),
+ "sessionId"_attr = _sessionId(),
"txnNumber"_attr = o().txnNumberAndRetryCounter.getTxnNumber(),
"txnRetryCounter"_attr = o().txnNumberAndRetryCounter.getTxnRetryCounter(),
"numParticipantShards"_attr = o().participants.size());
@@ -1295,7 +1310,7 @@ void TransactionRouter::Router::implicitlyAbortTransaction(OperationContext* opC
"may have been handed off to the coordinator",
"Not sending implicit abortTransaction to participant shards after error because "
"coordinating the commit decision may have been handed off to the coordinator shard",
- "sessionId"_attr = _sessionId().getId(),
+ "sessionId"_attr = _sessionId(),
"txnNumber"_attr = o().txnNumberAndRetryCounter.getTxnNumber(),
"txnRetryCounter"_attr = o().txnNumberAndRetryCounter.getTxnRetryCounter(),
"error"_attr = redact(status));
@@ -1324,7 +1339,7 @@ void TransactionRouter::Router::implicitlyAbortTransaction(OperationContext* opC
"{sessionId}:{txnNumber} Implicitly aborting transaction on {numParticipantShards} "
"shard(s) due to error: {error}",
"Implicitly aborting transaction on all participant shards",
- "sessionId"_attr = _sessionId().getId(),
+ "sessionId"_attr = _sessionId(),
"txnNumber"_attr = o().txnNumberAndRetryCounter.getTxnNumber(),
"txnRetryCounter"_attr = o().txnNumberAndRetryCounter.getTxnRetryCounter(),
"numParticipantShards"_attr = o().participants.size(),
@@ -1342,7 +1357,7 @@ void TransactionRouter::Router::implicitlyAbortTransaction(OperationContext* opC
3,
"{sessionId}:{txnNumber} Implicitly aborting transaction failed {error}",
"Implicitly aborting transaction failed",
- "sessionId"_attr = _sessionId().getId(),
+ "sessionId"_attr = _sessionId(),
"txnNumber"_attr = o().txnNumberAndRetryCounter.getTxnNumber(),
"txnRetryCounter"_attr = o().txnNumberAndRetryCounter.getTxnRetryCounter(),
"error"_attr = ex);
diff --git a/src/mongo/s/transaction_router.h b/src/mongo/s/transaction_router.h
index b3021f0aaa9..9da80bd9afd 100644
--- a/src/mongo/s/transaction_router.h
+++ b/src/mongo/s/transaction_router.h
@@ -377,6 +377,11 @@ public:
void stash(OperationContext* opCtx);
/**
+ * Validates transaction state is still compatible after a yield.
+ */
+ void unstash(OperationContext* opCtx);
+
+ /**
* Attaches the required transaction related fields for a request to be sent to the given
* shard.
*
diff --git a/src/mongo/s/transaction_router_resource_yielder.cpp b/src/mongo/s/transaction_router_resource_yielder.cpp
new file mode 100644
index 00000000000..252231483a4
--- /dev/null
+++ b/src/mongo/s/transaction_router_resource_yielder.cpp
@@ -0,0 +1,55 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/s/transaction_router_resource_yielder.h"
+
+#include "mongo/db/session_catalog.h"
+#include "mongo/s/session_catalog_router.h"
+
+namespace mongo {
+
+std::unique_ptr<TransactionRouterResourceYielder> TransactionRouterResourceYielder::make() {
+ return std::make_unique<TransactionRouterResourceYielder>();
+}
+
+void TransactionRouterResourceYielder::yield(OperationContext* opCtx) {
+ Session* const session = OperationContextSession::get(opCtx);
+ if (session) {
+ RouterOperationContextSession::checkIn(opCtx);
+ }
+ _yielded = (session != nullptr);
+}
+
+void TransactionRouterResourceYielder::unyield(OperationContext* opCtx) {
+ if (_yielded) {
+ RouterOperationContextSession::checkOut(opCtx);
+ }
+}
+
+} // namespace mongo
diff --git a/src/mongo/s/transaction_router_resource_yielder.h b/src/mongo/s/transaction_router_resource_yielder.h
new file mode 100644
index 00000000000..173c588c73a
--- /dev/null
+++ b/src/mongo/s/transaction_router_resource_yielder.h
@@ -0,0 +1,63 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/operation_context.h"
+#include "mongo/db/resource_yielder.h"
+
+namespace mongo {
+
+/**
+ * Implementation of ResourceYielder that yields resources checked out in the course of running a
+ * distributed transaction.
+ */
+class TransactionRouterResourceYielder : public ResourceYielder {
+public:
+ /**
+ * Returns a newly allocated yielder.
+ */
+ static std::unique_ptr<TransactionRouterResourceYielder> make();
+
+ /**
+ * If the opCtx has a checked out RouterOperationContextSession, yields it.
+ */
+ void yield(OperationContext* opCtx) override;
+
+ /**
+ * If the opCtx had previously checked out RouterOperationContextSession, checks it back out.
+ * Note this may throw if the opCtx has been interrupted.
+ */
+ void unyield(OperationContext* opCtx) override;
+
+private:
+ bool _yielded{false};
+};
+
+} // namespace mongo
diff --git a/src/mongo/s/write_ops/batched_command_request.cpp b/src/mongo/s/write_ops/batched_command_request.cpp
index ced74997591..c916b5b58ba 100644
--- a/src/mongo/s/write_ops/batched_command_request.cpp
+++ b/src/mongo/s/write_ops/batched_command_request.cpp
@@ -130,6 +130,13 @@ void BatchedCommandRequest::setLegacyRuntimeConstants(LegacyRuntimeConstants run
}});
}
+void BatchedCommandRequest::unsetLegacyRuntimeConstants() {
+ _visit(visit_helper::Overloaded{
+ [](write_ops::InsertCommandRequest&) {},
+ [&](write_ops::UpdateCommandRequest& op) { op.setLegacyRuntimeConstants(boost::none); },
+ [&](write_ops::DeleteCommandRequest& op) { op.setLegacyRuntimeConstants(boost::none); }});
+}
+
const boost::optional<LegacyRuntimeConstants>& BatchedCommandRequest::getLegacyRuntimeConstants()
const {
struct Visitor {
diff --git a/src/mongo/s/write_ops/batched_command_request.h b/src/mongo/s/write_ops/batched_command_request.h
index a7e02f2e58a..f900d52140a 100644
--- a/src/mongo/s/write_ops/batched_command_request.h
+++ b/src/mongo/s/write_ops/batched_command_request.h
@@ -138,6 +138,8 @@ public:
void setLegacyRuntimeConstants(LegacyRuntimeConstants runtimeConstants);
+ void unsetLegacyRuntimeConstants();
+
bool hasLegacyRuntimeConstants() const;
const boost::optional<LegacyRuntimeConstants>& getLegacyRuntimeConstants() const;