From b17dfaaec98f63c16d05c9f76768df98be78699e Mon Sep 17 00:00:00 2001 From: Jack Mulrow Date: Mon, 28 Feb 2022 19:31:31 +0000 Subject: SERVER-59186 Use transaction API for all current changing a document's shard key logic --- src/mongo/s/SConscript | 1 + .../s/commands/cluster_find_and_modify_cmd.cpp | 211 ++++++++++----- src/mongo/s/commands/cluster_write_cmd.cpp | 301 ++++++++++++++++----- .../s/commands/document_shard_key_update_test.cpp | 4 +- .../s/commands/document_shard_key_update_util.cpp | 100 ++++++- .../s/commands/document_shard_key_update_util.h | 33 ++- src/mongo/s/commands/strategy.cpp | 32 ++- src/mongo/s/session_catalog_router.cpp | 19 +- src/mongo/s/session_catalog_router.h | 10 + src/mongo/s/transaction_router.cpp | 25 +- src/mongo/s/transaction_router.h | 5 + .../s/transaction_router_resource_yielder.cpp | 55 ++++ src/mongo/s/transaction_router_resource_yielder.h | 63 +++++ src/mongo/s/write_ops/batched_command_request.cpp | 7 + src/mongo/s/write_ops/batched_command_request.h | 2 + 15 files changed, 703 insertions(+), 165 deletions(-) create mode 100644 src/mongo/s/transaction_router_resource_yielder.cpp create mode 100644 src/mongo/s/transaction_router_resource_yielder.h (limited to 'src/mongo/s') diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index b4490ae4821..22dab3091e2 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -52,6 +52,7 @@ env.Library( 'router_transactions_stats.idl', 'session_catalog_router.cpp', 'stale_shard_version_helpers.cpp', + 'transaction_router_resource_yielder.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/db/commands/txn_cmd_request', diff --git a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp index 77b925d92a9..2a6e4c6ef13 100644 --- a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp +++ b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp @@ -61,6 +61,7 @@ #include "mongo/s/session_catalog_router.h" #include 
"mongo/s/stale_exception.h" #include "mongo/s/transaction_router.h" +#include "mongo/s/transaction_router_resource_yielder.h" #include "mongo/s/would_change_owning_shard_exception.h" #include "mongo/util/timer.h" @@ -134,41 +135,140 @@ void handleWouldChangeOwningShardErrorRetryableWrite( const NamespaceString& nss, const write_ops::FindAndModifyCommandRequest& request, BSONObjBuilder* result) { - auto txn = txn_api::TransactionWithRetries( - opCtx, Grid::get(opCtx)->getExecutorPool()->getFixedExecutor()); + auto txn = + txn_api::TransactionWithRetries(opCtx, + Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(), + TransactionRouterResourceYielder::make()); - auto swResult = txn.runSyncNoThrow( - opCtx, - [cmdObj = request.toBSON({}), nss, result](const txn_api::TransactionClient& txnClient, - ExecutorPtr txnExec) { - auto res = txnClient.runCommand(nss.db(), cmdObj).get(); - uassertStatusOK(getStatusFromCommandResult(res)); + // Shared state for the transaction API use below. + struct SharedBlock { + SharedBlock(NamespaceString nss_) : nss(nss_) {} - result->appendElementsUnique( - CommandHelpers::filterCommandReplyForPassthrough(res.removeField("recoveryToken"))); + NamespaceString nss; + BSONObj response; + }; + auto sharedBlock = std::make_shared(nss); - return SemiFuture::makeReady(); + auto swCommitResult = txn.runSyncNoThrow( + opCtx, + [cmdObj = request.toBSON({}), sharedBlock](const txn_api::TransactionClient& txnClient, + ExecutorPtr txnExec) { + return txnClient.runCommand(sharedBlock->nss.db(), cmdObj) + .thenRunOn(txnExec) + .then([sharedBlock](auto res) { + uassertStatusOK(getStatusFromCommandResult(res)); + + sharedBlock->response = CommandHelpers::filterCommandReplyForPassthrough( + res.removeField("recoveryToken")); + }) + .semi(); }); - auto cmdStatus = swResult.getStatus(); - if (cmdStatus != ErrorCodes::DuplicateKey || - (cmdStatus == ErrorCodes::DuplicateKey && - !cmdStatus.extraInfo()->getKeyPattern().hasField("_id"))) { - 
cmdStatus.addContext(documentShardKeyUpdateUtil::kNonDuplicateKeyErrorContext); + result->appendElementsUnique( + CommandHelpers::filterCommandReplyForPassthrough(sharedBlock->response)); + + auto bodyStatus = swCommitResult.getStatus(); + if (bodyStatus != ErrorCodes::DuplicateKey || + (bodyStatus == ErrorCodes::DuplicateKey && + !bodyStatus.extraInfo()->getKeyPattern().hasField("_id"))) { + bodyStatus.addContext(documentShardKeyUpdateUtil::kNonDuplicateKeyErrorContext); }; - uassertStatusOK(cmdStatus); + uassertStatusOK(bodyStatus); - const auto& wcError = swResult.getValue().wcError; + uassertStatusOK(swCommitResult.getValue().cmdStatus); + const auto& wcError = swCommitResult.getValue().wcError; if (!wcError.toStatus().isOK()) { appendWriteConcernErrorDetailToCmdResponse(shardId, wcError, *result); } } -void updateShardKeyValueOnWouldChangeOwningShardError(OperationContext* opCtx, - const NamespaceString nss, - Status responseStatus, - const BSONObj& cmdObj, - BSONObjBuilder* result) { +void updateReplyOnWouldChangeOwningShardSuccess(bool matchedDocOrUpserted, + const WouldChangeOwningShardInfo& changeInfo, + bool shouldReturnPostImage, + BSONObjBuilder* result) { + auto upserted = matchedDocOrUpserted && changeInfo.getShouldUpsert(); + auto updatedExistingDocument = matchedDocOrUpserted && !upserted; + + BSONObjBuilder lastErrorObjBuilder(result->subobjStart("lastErrorObject")); + lastErrorObjBuilder.appendNumber("n", matchedDocOrUpserted ? 1 : 0); + lastErrorObjBuilder.appendBool("updatedExisting", updatedExistingDocument); + if (upserted) { + lastErrorObjBuilder.appendAs(changeInfo.getPostImage()["_id"], "upserted"); + } + lastErrorObjBuilder.doneFast(); + + if (updatedExistingDocument) { + result->append( + "value", shouldReturnPostImage ? 
changeInfo.getPostImage() : changeInfo.getPreImage()); + } else if (upserted && shouldReturnPostImage) { + result->append("value", changeInfo.getPostImage()); + } else { + result->appendNull("value"); + } + result->append("ok", 1.0); +} + +void handleWouldChangeOwningShardErrorTransaction( + OperationContext* opCtx, + const NamespaceString nss, + Status responseStatus, + const write_ops::FindAndModifyCommandRequest& request, + BSONObjBuilder* result) { + + BSONObjBuilder extraInfoBuilder; + responseStatus.extraInfo()->serialize(&extraInfoBuilder); + auto extraInfo = extraInfoBuilder.obj(); + + // Shared state for the transaction API use below. + struct SharedBlock { + SharedBlock(WouldChangeOwningShardInfo changeInfo_, NamespaceString nss_) + : changeInfo(changeInfo_), nss(nss_) {} + + WouldChangeOwningShardInfo changeInfo; + NamespaceString nss; + bool matchedDocOrUpserted{false}; + }; + auto sharedBlock = std::make_shared( + WouldChangeOwningShardInfo::parseFromCommandError(extraInfo), nss); + + try { + auto txn = + txn_api::TransactionWithRetries(opCtx, + Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(), + TransactionRouterResourceYielder::make()); + + txn.runSync(opCtx, + [sharedBlock](const txn_api::TransactionClient& txnClient, + ExecutorPtr txnExec) -> SemiFuture { + return documentShardKeyUpdateUtil::updateShardKeyForDocument( + txnClient, txnExec, sharedBlock->nss, sharedBlock->changeInfo) + .thenRunOn(txnExec) + .then([sharedBlock](bool matchedDocOrUpserted) { + sharedBlock->matchedDocOrUpserted = matchedDocOrUpserted; + }) + .semi(); + }); + + auto shouldReturnPostImage = request.getNew() && *request.getNew(); + updateReplyOnWouldChangeOwningShardSuccess(sharedBlock->matchedDocOrUpserted, + sharedBlock->changeInfo, + shouldReturnPostImage, + result); + } catch (DBException& e) { + if (e.code() == ErrorCodes::DuplicateKey && + e.extraInfo()->getKeyPattern().hasField("_id")) { + e.addContext(documentShardKeyUpdateUtil::kDuplicateKeyErrorContext); + 
} + e.addContext("findAndModify"); + throw; + } +} + +void handleWouldChangeOwningShardErrorTransactionLegacy(OperationContext* opCtx, + const NamespaceString nss, + Status responseStatus, + const BSONObj& cmdObj, + BSONObjBuilder* result) { BSONObjBuilder extraInfoBuilder; responseStatus.extraInfo()->serialize(&extraInfoBuilder); auto extraInfo = extraInfoBuilder.obj(); @@ -176,31 +276,12 @@ void updateShardKeyValueOnWouldChangeOwningShardError(OperationContext* opCtx, WouldChangeOwningShardInfo::parseFromCommandError(extraInfo); try { - auto matchedDocOrUpserted = documentShardKeyUpdateUtil::updateShardKeyForDocument( + auto matchedDocOrUpserted = documentShardKeyUpdateUtil::updateShardKeyForDocumentLegacy( opCtx, nss, wouldChangeOwningShardExtraInfo); - auto upserted = matchedDocOrUpserted && wouldChangeOwningShardExtraInfo.getShouldUpsert(); - auto updatedExistingDocument = matchedDocOrUpserted && !upserted; - - BSONObjBuilder lastErrorObjBuilder(result->subobjStart("lastErrorObject")); - lastErrorObjBuilder.appendNumber("n", matchedDocOrUpserted ? 1 : 0); - lastErrorObjBuilder.appendBool("updatedExisting", updatedExistingDocument); - if (upserted) { - lastErrorObjBuilder.appendAs(wouldChangeOwningShardExtraInfo.getPostImage()["_id"], - "upserted"); - } - lastErrorObjBuilder.doneFast(); auto shouldReturnPostImage = cmdObj.getBoolField("new"); - if (updatedExistingDocument) { - result->append("value", - shouldReturnPostImage ? 
wouldChangeOwningShardExtraInfo.getPostImage() - : wouldChangeOwningShardExtraInfo.getPreImage()); - } else if (upserted && shouldReturnPostImage) { - result->append("value", wouldChangeOwningShardExtraInfo.getPostImage()); - } else { - result->appendNull("value"); - } - result->append("ok", 1.0); + updateReplyOnWouldChangeOwningShardSuccess( + matchedDocOrUpserted, wouldChangeOwningShardExtraInfo, shouldReturnPostImage, result); } catch (DBException& e) { if (e.code() == ErrorCodes::DuplicateKey && e.extraInfo()->getKeyPattern().hasField("_id")) { @@ -443,28 +524,34 @@ private: } if (responseStatus.code() == ErrorCodes::WouldChangeOwningShard) { - if (isRetryableWrite) { - if (feature_flags::gFeatureFlagInternalTransactions.isEnabled( - serverGlobalParams.featureCompatibility)) { - auto parsedRequest = write_ops::FindAndModifyCommandRequest::parse( - IDLParserErrorContext("ClusterFindAndModify"), cmdObj); - // Strip write concern because this command will be sent as part of a - // transaction and the write concern has already been loaded onto the opCtx and - // will be picked up by the transaction API. - // - // Strip runtime constants because they will be added again when this command is - // recursively sent through the service entry point. - parsedRequest.setWriteConcern(boost::none); - parsedRequest.setLegacyRuntimeConstants(boost::none); + if (feature_flags::gFeatureFlagInternalTransactions.isEnabled( + serverGlobalParams.featureCompatibility)) { + auto parsedRequest = write_ops::FindAndModifyCommandRequest::parse( + IDLParserErrorContext("ClusterFindAndModify"), cmdObj); + // Strip write concern because this command will be sent as part of a + // transaction and the write concern has already been loaded onto the opCtx and + // will be picked up by the transaction API. + parsedRequest.setWriteConcern(boost::none); + + // Strip runtime constants because they will be added again when this command is + // recursively sent through the service entry point. 
+ parsedRequest.setLegacyRuntimeConstants(boost::none); + if (isRetryableWrite) { handleWouldChangeOwningShardErrorRetryableWrite( opCtx, shardId, nss, parsedRequest, result); } else { + handleWouldChangeOwningShardErrorTransaction( + opCtx, nss, responseStatus, parsedRequest, result); + } + } else { + // TODO SERVER-62375: Remove this branch. + if (isRetryableWrite) { _handleWouldChangeOwningShardErrorRetryableWriteLegacy( opCtx, shardId, shardVersion, dbVersion, nss, cmdObj, result); + } else { + handleWouldChangeOwningShardErrorTransactionLegacy( + opCtx, nss, responseStatus, cmdObj, result); } - } else { - updateShardKeyValueOnWouldChangeOwningShardError( - opCtx, nss, responseStatus, cmdObj, result); } return; @@ -497,7 +584,7 @@ private: // Re-run the findAndModify command that will change the shard key value in a // transaction. We call _runCommand recursively, and this second time through // since it will be run as a transaction it will take the other code path to - // updateShardKeyValueOnWouldChangeOwningShardError. We ensure the retried + // handleWouldChangeOwningShardErrorTransactionLegacy. We ensure the retried // operation does not include WC inside the transaction by stripping it from the // cmdObj. The transaction commit will still use the WC, because it uses the WC // from the opCtx (which has been set previously in Strategy). 
diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp index 2f176bed049..5dbb436c0d2 100644 --- a/src/mongo/s/commands/cluster_write_cmd.cpp +++ b/src/mongo/s/commands/cluster_write_cmd.cpp @@ -38,6 +38,7 @@ #include "mongo/db/commands/update_metrics.h" #include "mongo/db/commands/write_commands_common.h" #include "mongo/db/curop.h" +#include "mongo/db/internal_transactions_feature_flag_gen.h" #include "mongo/db/not_primary_error_tracker.h" #include "mongo/db/pipeline/lite_parsed_pipeline.h" #include "mongo/db/stats/counters.h" @@ -56,6 +57,7 @@ #include "mongo/s/multi_statement_transaction_requests_sender.h" #include "mongo/s/session_catalog_router.h" #include "mongo/s/transaction_router.h" +#include "mongo/s/transaction_router_resource_yielder.h" #include "mongo/s/would_change_owning_shard_exception.h" #include "mongo/s/write_ops/batched_command_request.h" #include "mongo/s/write_ops/batched_command_response.h" @@ -148,6 +150,134 @@ boost::optional getWouldChangeOwningShardErrorInfo( return boost::none; } +void handleWouldChangeOwningShardErrorRetryableWrite(OperationContext* opCtx, + BatchedCommandRequest* request, + BatchedCommandResponse* response) { + // Strip write concern because this command will be sent as part of a + // transaction and the write concern has already been loaded onto the opCtx and + // will be picked up by the transaction API. + request->unsetWriteConcern(); + + // Strip runtime constants because they will be added again when the API sends this command + // through the service entry point. + request->unsetLegacyRuntimeConstants(); + + // Unset error details because they will be repopulated below. + response->unsetErrDetails(); + + auto txn = + txn_api::TransactionWithRetries(opCtx, + Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(), + TransactionRouterResourceYielder::make()); + + // Shared state for the transaction API use below. 
+ struct SharedBlock { + SharedBlock(NamespaceString nss_) : nss(nss_) {} + + NamespaceString nss; + BSONObj response; + }; + auto sharedBlock = std::make_shared(request->getNS()); + + auto swCommitResult = txn.runSyncNoThrow( + opCtx, + [cmdObj = request->toBSON(), sharedBlock](const txn_api::TransactionClient& txnClient, + ExecutorPtr txnExec) { + return txnClient.runCommand(sharedBlock->nss.db(), cmdObj) + .thenRunOn(txnExec) + .then([sharedBlock](auto res) { + uassertStatusOK(getStatusFromWriteCommandReply(res)); + + sharedBlock->response = CommandHelpers::filterCommandReplyForPassthrough( + res.removeField("recoveryToken")); + }) + .semi(); + }); + + auto bodyStatus = swCommitResult.getStatus(); + if (!bodyStatus.isOK()) { + if (bodyStatus != ErrorCodes::DuplicateKey || + (bodyStatus == ErrorCodes::DuplicateKey && + !bodyStatus.extraInfo()->getKeyPattern().hasField("_id"))) { + bodyStatus.addContext(documentShardKeyUpdateUtil::kNonDuplicateKeyErrorContext); + }; + + auto error = std::make_unique(); + error->setIndex(0); + error->setStatus(bodyStatus); + response->addToErrDetails(error.release()); + return; + } + + uassertStatusOK(swCommitResult.getValue().cmdStatus); + + // Note this will clear existing response as part of parsing. + std::string errMsg = "Failed to parse response from WouldChangeOwningShard error handling"; + response->parseBSON(sharedBlock->response, &errMsg); + + // Make a unique pointer with a copy of the error detail because + // BatchedCommandResponse::setWriteConcernError() expects a pointer to a heap allocated + // WriteConcernErrorDetail that it can take unique ownership of. 
+ auto writeConcernDetail = + std::make_unique(swCommitResult.getValue().wcError); + if (!writeConcernDetail->toStatus().isOK()) { + response->setWriteConcernError(writeConcernDetail.release()); + } +} + +struct UpdateShardKeyResult { + bool updatedShardKey{false}; + boost::optional upsertedId; +}; + +UpdateShardKeyResult handleWouldChangeOwningShardErrorTransaction( + OperationContext* opCtx, + BatchedCommandRequest* request, + BatchedCommandResponse* response, + const WouldChangeOwningShardInfo& changeInfo) { + // Shared state for the transaction API use below. + struct SharedBlock { + SharedBlock(WouldChangeOwningShardInfo changeInfo_, NamespaceString nss_) + : changeInfo(changeInfo_), nss(nss_) {} + + WouldChangeOwningShardInfo changeInfo; + NamespaceString nss; + bool updatedShardKey{false}; + }; + auto sharedBlock = std::make_shared(changeInfo, request->getNS()); + + auto txn = + txn_api::TransactionWithRetries(opCtx, + Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(), + TransactionRouterResourceYielder::make()); + + try { + txn.runSync(opCtx, + [sharedBlock](const txn_api::TransactionClient& txnClient, + ExecutorPtr txnExec) -> SemiFuture { + return documentShardKeyUpdateUtil::updateShardKeyForDocument( + txnClient, txnExec, sharedBlock->nss, sharedBlock->changeInfo) + .thenRunOn(txnExec) + .then([sharedBlock](bool updatedShardKey) { + sharedBlock->updatedShardKey = updatedShardKey; + }) + .semi(); + }); + } catch (const ExceptionFor& ex) { + Status status = ex->getKeyPattern().hasField("_id") + ? ex.toStatus().withContext(documentShardKeyUpdateUtil::kDuplicateKeyErrorContext) + : ex.toStatus(); + uassertStatusOK(status); + } + + // If the operation was an upsert, record the _id of the new document. 
+ boost::optional upsertedId; + if (sharedBlock->updatedShardKey && sharedBlock->changeInfo.getShouldUpsert()) { + upsertedId = sharedBlock->changeInfo.getPostImage()["_id"].wrap(); + } + return UpdateShardKeyResult{sharedBlock->updatedShardKey, std::move(upsertedId)}; +} + /** * Changes the shard key for the document if the response object contains a WouldChangeOwningShard * error. If the original command was sent as a retryable write, starts a transaction on the same @@ -169,91 +299,112 @@ bool handleWouldChangeOwningShardError(OperationContext* opCtx, bool updatedShardKey = false; boost::optional upsertedId; - if (isRetryableWrite) { - if (MONGO_unlikely(hangAfterThrowWouldChangeOwningShardRetryableWrite.shouldFail())) { - LOGV2(22759, "Hit hangAfterThrowWouldChangeOwningShardRetryableWrite failpoint"); - hangAfterThrowWouldChangeOwningShardRetryableWrite.pauseWhileSet(opCtx); - } - RouterOperationContextSession routerSession(opCtx); - try { - // Start transaction and re-run the original update command - auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); - readConcernArgs = repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern); - - // Ensure the retried operation does not include WC inside the transaction. The - // transaction commit will still use the WC, because it uses the WC from the opCtx - // (which has been set previously in Strategy). 
- request->unsetWriteConcern(); - - documentShardKeyUpdateUtil::startTransactionForShardKeyUpdate(opCtx); - // Clear the error details from the response object before sending the write again - response->unsetErrDetails(); - cluster::write(opCtx, *request, &stats, response); - wouldChangeOwningShardErrorInfo = - getWouldChangeOwningShardErrorInfo(opCtx, *request, response, !isRetryableWrite); - if (!wouldChangeOwningShardErrorInfo) - uassertStatusOK(response->toStatus()); - - // If we do not get WouldChangeOwningShard when re-running the update, the document has - // been modified or deleted concurrently and we do not need to delete it and insert a - // new one. - updatedShardKey = wouldChangeOwningShardErrorInfo && - documentShardKeyUpdateUtil::updateShardKeyForDocument( - opCtx, request->getNS(), wouldChangeOwningShardErrorInfo.get()); - - // If the operation was an upsert, record the _id of the new document. - if (updatedShardKey && wouldChangeOwningShardErrorInfo->getShouldUpsert()) { - upsertedId = wouldChangeOwningShardErrorInfo->getPostImage()["_id"].wrap(); + if (feature_flags::gFeatureFlagInternalTransactions.isEnabled( + serverGlobalParams.featureCompatibility)) { + if (isRetryableWrite) { + if (MONGO_unlikely(hangAfterThrowWouldChangeOwningShardRetryableWrite.shouldFail())) { + LOGV2(5918603, "Hit hangAfterThrowWouldChangeOwningShardRetryableWrite failpoint"); + hangAfterThrowWouldChangeOwningShardRetryableWrite.pauseWhileSet(opCtx); } - // Commit the transaction - auto commitResponse = - documentShardKeyUpdateUtil::commitShardKeyUpdateTransaction(opCtx); + handleWouldChangeOwningShardErrorRetryableWrite(opCtx, request, response); + } else { + auto updateResult = handleWouldChangeOwningShardErrorTransaction( + opCtx, request, response, *wouldChangeOwningShardErrorInfo); + updatedShardKey = updateResult.updatedShardKey; + upsertedId = std::move(updateResult.upsertedId); + } + } else { + // TODO SERVER-62375: Delete this branch. 
+ if (isRetryableWrite) { + if (MONGO_unlikely(hangAfterThrowWouldChangeOwningShardRetryableWrite.shouldFail())) { + LOGV2(22759, "Hit hangAfterThrowWouldChangeOwningShardRetryableWrite failpoint"); + hangAfterThrowWouldChangeOwningShardRetryableWrite.pauseWhileSet(opCtx); + } + RouterOperationContextSession routerSession(opCtx); + try { + // Start transaction and re-run the original update command + auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); + readConcernArgs = repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern); + + // Ensure the retried operation does not include WC inside the transaction. The + // transaction commit will still use the WC, because it uses the WC from the opCtx + // (which has been set previously in Strategy). + request->unsetWriteConcern(); + + documentShardKeyUpdateUtil::startTransactionForShardKeyUpdate(opCtx); + // Clear the error details from the response object before sending the write again + response->unsetErrDetails(); + cluster::write(opCtx, *request, &stats, response); + wouldChangeOwningShardErrorInfo = getWouldChangeOwningShardErrorInfo( + opCtx, *request, response, !isRetryableWrite); + if (!wouldChangeOwningShardErrorInfo) + uassertStatusOK(response->toStatus()); + + // If we do not get WouldChangeOwningShard when re-running the update, the document + // has been modified or deleted concurrently and we do not need to delete it and + // insert a new one. + updatedShardKey = + wouldChangeOwningShardErrorInfo && + documentShardKeyUpdateUtil::updateShardKeyForDocumentLegacy( + opCtx, request->getNS(), wouldChangeOwningShardErrorInfo.get()); + + // If the operation was an upsert, record the _id of the new document. 
+ if (updatedShardKey && wouldChangeOwningShardErrorInfo->getShouldUpsert()) { + upsertedId = wouldChangeOwningShardErrorInfo->getPostImage()["_id"].wrap(); + } - uassertStatusOK(getStatusFromCommandResult(commitResponse)); + // Commit the transaction + auto commitResponse = + documentShardKeyUpdateUtil::commitShardKeyUpdateTransaction(opCtx); - auto writeConcernDetail = getWriteConcernErrorDetailFromBSONObj(commitResponse); - if (writeConcernDetail && !writeConcernDetail->toStatus().isOK()) - response->setWriteConcernError(writeConcernDetail.release()); - } catch (DBException& e) { - if (e.code() == ErrorCodes::DuplicateKey && - e.extraInfo()->getKeyPattern().hasField("_id")) { - e.addContext(documentShardKeyUpdateUtil::kDuplicateKeyErrorContext); - } else { - e.addContext(documentShardKeyUpdateUtil::kNonDuplicateKeyErrorContext); - } + uassertStatusOK(getStatusFromCommandResult(commitResponse)); - if (!response->isErrDetailsSet() || !response->getErrDetails().back()) { - auto error = std::make_unique(); - error->setIndex(0); - response->addToErrDetails(error.release()); - } + auto writeConcernDetail = getWriteConcernErrorDetailFromBSONObj(commitResponse); + if (writeConcernDetail && !writeConcernDetail->toStatus().isOK()) + response->setWriteConcernError(writeConcernDetail.release()); + } catch (DBException& e) { + if (e.code() == ErrorCodes::DuplicateKey && + e.extraInfo()->getKeyPattern().hasField("_id")) { + e.addContext(documentShardKeyUpdateUtil::kDuplicateKeyErrorContext); + } else { + e.addContext(documentShardKeyUpdateUtil::kNonDuplicateKeyErrorContext); + } - // Set the error status to the status of the failed command and abort the transaction. 
- auto status = e.toStatus(); - response->getErrDetails().back()->setStatus(status); + if (!response->isErrDetailsSet() || !response->getErrDetails().back()) { + auto error = std::make_unique(); + error->setIndex(0); + response->addToErrDetails(error.release()); + } - auto txnRouterForAbort = TransactionRouter::get(opCtx); - if (txnRouterForAbort) - txnRouterForAbort.implicitlyAbortTransaction(opCtx, status); + // Set the error status to the status of the failed command and abort the + // transaction. + auto status = e.toStatus(); + response->getErrDetails().back()->setStatus(status); - return false; - } - } else { - try { - // Delete the original document and insert the new one - updatedShardKey = documentShardKeyUpdateUtil::updateShardKeyForDocument( - opCtx, request->getNS(), wouldChangeOwningShardErrorInfo.get()); + auto txnRouterForAbort = TransactionRouter::get(opCtx); + if (txnRouterForAbort) + txnRouterForAbort.implicitlyAbortTransaction(opCtx, status); - // If the operation was an upsert, record the _id of the new document. - if (updatedShardKey && wouldChangeOwningShardErrorInfo->getShouldUpsert()) { - upsertedId = wouldChangeOwningShardErrorInfo->getPostImage()["_id"].wrap(); + return false; + } + } else { + try { + // Delete the original document and insert the new one + updatedShardKey = documentShardKeyUpdateUtil::updateShardKeyForDocumentLegacy( + opCtx, request->getNS(), wouldChangeOwningShardErrorInfo.get()); + + // If the operation was an upsert, record the _id of the new document. + if (updatedShardKey && wouldChangeOwningShardErrorInfo->getShouldUpsert()) { + upsertedId = wouldChangeOwningShardErrorInfo->getPostImage()["_id"].wrap(); + } + } catch (const ExceptionFor& ex) { + Status status = ex->getKeyPattern().hasField("_id") + ? 
ex.toStatus().withContext( + documentShardKeyUpdateUtil::kDuplicateKeyErrorContext) + : ex.toStatus(); + uassertStatusOK(status); } - } catch (const ExceptionFor& ex) { - Status status = ex->getKeyPattern().hasField("_id") - ? ex.toStatus().withContext(documentShardKeyUpdateUtil::kDuplicateKeyErrorContext) - : ex.toStatus(); - uassertStatusOK(status); } } diff --git a/src/mongo/s/commands/document_shard_key_update_test.cpp b/src/mongo/s/commands/document_shard_key_update_test.cpp index dcaf62de0db..4c297ce7bc8 100644 --- a/src/mongo/s/commands/document_shard_key_update_test.cpp +++ b/src/mongo/s/commands/document_shard_key_update_test.cpp @@ -53,7 +53,7 @@ TEST_F(DocumentShardKeyUpdateTest, constructShardKeyDeleteCmdObj) { NamespaceString nss("test.foo"); BSONObj updatePreImage = BSON("x" << 4 << "y" << 3 << "_id" << 20); - auto deleteCmdObj = constructShardKeyDeleteCmdObj(nss, updatePreImage); + auto deleteCmdObj = constructShardKeyDeleteCmdObj(nss, updatePreImage, boost::none); auto deletesObj = deleteCmdObj["deletes"].Array(); ASSERT_EQ(deletesObj.size(), 1U); @@ -69,7 +69,7 @@ TEST_F(DocumentShardKeyUpdateTest, constructShardKeyInsertCmdObj) { NamespaceString nss("test.foo"); BSONObj updatePostImage = BSON("x" << 4 << "y" << 3 << "_id" << 20); - auto insertCmdObj = constructShardKeyInsertCmdObj(nss, updatePostImage); + auto insertCmdObj = constructShardKeyInsertCmdObj(nss, updatePostImage, boost::none); auto insertsObj = insertCmdObj["documents"].Array(); ASSERT_EQ(insertsObj.size(), 1U); diff --git a/src/mongo/s/commands/document_shard_key_update_util.cpp b/src/mongo/s/commands/document_shard_key_update_util.cpp index 629c29ebca5..4524dc78689 100644 --- a/src/mongo/s/commands/document_shard_key_update_util.cpp +++ b/src/mongo/s/commands/document_shard_key_update_util.cpp @@ -102,7 +102,8 @@ bool executeOperationsAsPartOfShardKeyUpdate(OperationContext* opCtx, * original document _id retrieved from 'updatePreImage'. 
*/ write_ops::DeleteCommandRequest createShardKeyDeleteOp(const NamespaceString& nss, - const BSONObj& updatePreImage) { + const BSONObj& updatePreImage, + boost::optional stmtId) { write_ops::DeleteCommandRequest deleteOp(nss); deleteOp.setDeletes({[&] { write_ops::DeleteOpEntry entry; @@ -110,7 +111,9 @@ write_ops::DeleteCommandRequest createShardKeyDeleteOp(const NamespaceString& ns entry.setMulti(false); return entry; }()}); - deleteOp.getWriteCommandRequestBase().setStmtId(1); + if (stmtId) { + deleteOp.getWriteCommandRequestBase().setStmtId(*stmtId); + } return deleteOp; } @@ -119,10 +122,13 @@ write_ops::DeleteCommandRequest createShardKeyDeleteOp(const NamespaceString& ns * Creates the insert op that will be used to insert the new document with the post-update image. */ write_ops::InsertCommandRequest createShardKeyInsertOp(const NamespaceString& nss, - const BSONObj& updatePostImage) { + const BSONObj& updatePostImage, + boost::optional stmtId) { write_ops::InsertCommandRequest insertOp(nss); insertOp.setDocuments({updatePostImage}); - insertOp.getWriteCommandRequestBase().setStmtId(2); + if (stmtId) { + insertOp.getWriteCommandRequestBase().setStmtId(*stmtId); + } return insertOp; } @@ -130,14 +136,14 @@ write_ops::InsertCommandRequest createShardKeyInsertOp(const NamespaceString& ns namespace documentShardKeyUpdateUtil { -bool updateShardKeyForDocument(OperationContext* opCtx, - const NamespaceString& nss, - const WouldChangeOwningShardInfo& documentKeyChangeInfo) { +bool updateShardKeyForDocumentLegacy(OperationContext* opCtx, + const NamespaceString& nss, + const WouldChangeOwningShardInfo& documentKeyChangeInfo) { auto updatePreImage = documentKeyChangeInfo.getPreImage().getOwned(); auto updatePostImage = documentKeyChangeInfo.getPostImage().getOwned(); - auto deleteCmdObj = constructShardKeyDeleteCmdObj(nss, updatePreImage); - auto insertCmdObj = constructShardKeyInsertCmdObj(nss, updatePostImage); + auto deleteCmdObj = 
constructShardKeyDeleteCmdObj(nss, updatePreImage, boost::none); + auto insertCmdObj = constructShardKeyInsertCmdObj(nss, updatePostImage, boost::none); return executeOperationsAsPartOfShardKeyUpdate( opCtx, deleteCmdObj, insertCmdObj, nss.db(), documentKeyChangeInfo.getShouldUpsert()); @@ -160,15 +166,83 @@ BSONObj commitShardKeyUpdateTransaction(OperationContext* opCtx) { return txnRouter.commitTransaction(opCtx, boost::none); } -BSONObj constructShardKeyDeleteCmdObj(const NamespaceString& nss, const BSONObj& updatePreImage) { - auto deleteOp = createShardKeyDeleteOp(nss, updatePreImage); +BSONObj constructShardKeyDeleteCmdObj(const NamespaceString& nss, + const BSONObj& updatePreImage, + boost::optional stmtId) { + auto deleteOp = createShardKeyDeleteOp(nss, updatePreImage, stmtId); return deleteOp.toBSON({}); } -BSONObj constructShardKeyInsertCmdObj(const NamespaceString& nss, const BSONObj& updatePostImage) { - auto insertOp = createShardKeyInsertOp(nss, updatePostImage); +BSONObj constructShardKeyInsertCmdObj(const NamespaceString& nss, + const BSONObj& updatePostImage, + boost::optional stmtId) { + auto insertOp = createShardKeyInsertOp(nss, updatePostImage, stmtId); return insertOp.toBSON({}); } +SemiFuture updateShardKeyForDocument(const txn_api::TransactionClient& txnClient, + ExecutorPtr txnExec, + const NamespaceString& nss, + const WouldChangeOwningShardInfo& changeInfo) { + // Use stmtId=1 for this delete (and 2 for the subsequent insert) because the original + // update/findAndModify that threw the WouldChangeOwningShard error used stmtId=0 to store the + // WouldChangeOwningShard sentinel noop entry. 
+ auto deleteCmdObj = documentShardKeyUpdateUtil::constructShardKeyDeleteCmdObj( + nss, changeInfo.getPreImage().getOwned(), {1}); + auto deleteOpMsg = OpMsgRequest::fromDBAndBody(nss.db(), std::move(deleteCmdObj)); + auto deleteRequest = BatchedCommandRequest::parseDelete(std::move(deleteOpMsg)); + + return txnClient.runCRUDOp(deleteRequest, {}) + .thenRunOn(txnExec) + .then([&txnClient, &nss, &changeInfo]( + auto deleteResponse) -> SemiFuture { + uassertStatusOK(deleteResponse.toStatus()); + + // If shouldUpsert is true, this means the original command specified {upsert: + // true} and did not match any docs, so we should not match any when doing + // this delete. If shouldUpsert is false and we do not delete any document, + // this is essentially equivalent to not matching a doc and we should not + // insert. + if (changeInfo.getShouldUpsert()) { + uassert(ErrorCodes::ConflictingOperationInProgress, + "Delete matched a document when it should not have.", + deleteResponse.getN() == 0); + } else if (deleteResponse.getN() != 1) { + iassert(Status(ErrorCodes::WouldChangeOwningShardDeletedNoDocument, + "When handling WouldChangeOwningShard error, the delete matched no " + "documents and should not upsert")); + } + + if (MONGO_unlikely(hangBeforeInsertOnUpdateShardKey.shouldFail())) { + LOGV2(5918602, "Hit hangBeforeInsertOnUpdateShardKey failpoint"); + hangBeforeInsertOnUpdateShardKey.pauseWhileSet(); + } + + auto insertCmdObj = documentShardKeyUpdateUtil::constructShardKeyInsertCmdObj( + nss, changeInfo.getPostImage().getOwned(), {2}); + auto insertOpMsg = OpMsgRequest::fromDBAndBody(nss.db(), std::move(insertCmdObj)); + auto insertRequest = BatchedCommandRequest::parseInsert(std::move(insertOpMsg)); + + return txnClient.runCRUDOp(insertRequest, {}); + }) + .thenRunOn(txnExec) + .then([&nss](auto insertResponse) { + uassertStatusOK(insertResponse.toStatus()); + + uassert(ErrorCodes::NamespaceNotFound, + "Document not successfully inserted while changing shard key 
for namespace " + + nss.ns(), + insertResponse.getN() == 1); + + return true; + }) + .onError([](Status status) { + // We failed to delete a document and were not configured to upsert, so the insert + // was never sent. Propagate that failure by returning false. + return false; + }) + .semi(); +} + } // namespace documentShardKeyUpdateUtil } // namespace mongo diff --git a/src/mongo/s/commands/document_shard_key_update_util.h b/src/mongo/s/commands/document_shard_key_update_util.h index 2684c7de8a0..31825eece60 100644 --- a/src/mongo/s/commands/document_shard_key_update_util.h +++ b/src/mongo/s/commands/document_shard_key_update_util.h @@ -35,6 +35,7 @@ #include "mongo/db/logical_session_id.h" #include "mongo/db/ops/write_ops.h" +#include "mongo/db/transaction_api.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/s/transaction_router.h" @@ -65,14 +66,32 @@ static constexpr StringData kNonDuplicateKeyErrorContext = "transaction failed."_sd; /** + * TODO SERVER-62375: Remove this function. + * * Coordinating method and external point of entry for updating a document's shard key. This method * creates the necessary extra operations. It will then run each operation using the ClusterWriter. * If any statement throws, an exception will leave this method, and must be handled by external * callers. */ -bool updateShardKeyForDocument(OperationContext* opCtx, - const NamespaceString& nss, - const WouldChangeOwningShardInfo& documentKeyChangeInfo); +bool updateShardKeyForDocumentLegacy(OperationContext* opCtx, + const NamespaceString& nss, + const WouldChangeOwningShardInfo& documentKeyChangeInfo); + +/** + * Coordinating method and external point of entry for updating a document's shard key. This method + * creates the necessary extra operations. It will then run each operation using the given + * transaction client. If any statement throws, an exception will leave this method, and must be + * handled by external callers. 
+ * + * Returns an error on any error returned by a command. If the original update was sent with + * {upsert: false}, returns whether or not we deleted the original doc and inserted the new one + * successfully. If the original update was sent with {upsert: true}, returns whether or not we + * inserted the new doc successfully. + */ +SemiFuture updateShardKeyForDocument(const txn_api::TransactionClient& txnClient, + ExecutorPtr txnExec, + const NamespaceString& nss, + const WouldChangeOwningShardInfo& changeInfo); /** * Starts a transaction on this session. This method is called when WouldChangeOwningShard is thrown @@ -93,7 +112,9 @@ BSONObj commitShardKeyUpdateTransaction(OperationContext* opCtx); * This method should not be called outside of this class. It is only temporarily exposed for * intermediary test coverage. */ -BSONObj constructShardKeyDeleteCmdObj(const NamespaceString& nss, const BSONObj& updatePreImage); +BSONObj constructShardKeyDeleteCmdObj(const NamespaceString& nss, + const BSONObj& updatePreImage, + boost::optional stmtId); /* * Creates the BSONObj that will be used to insert the new document with the post-update image. @@ -102,6 +123,8 @@ BSONObj constructShardKeyDeleteCmdObj(const NamespaceString& nss, const BSONObj& * This method should not be called outside of this class. It is only temporarily exposed for * intermediary test coverage. 
*/ -BSONObj constructShardKeyInsertCmdObj(const NamespaceString& nss, const BSONObj& updatePostImage); +BSONObj constructShardKeyInsertCmdObj(const NamespaceString& nss, + const BSONObj& updatePostImage, + boost::optional stmtId); } // namespace documentShardKeyUpdateUtil } // namespace mongo diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp index 8ad6fc25a61..181411e9c38 100644 --- a/src/mongo/s/commands/strategy.cpp +++ b/src/mongo/s/commands/strategy.cpp @@ -191,12 +191,20 @@ Future invokeInTransactionRouter(std::shared_ptr ErrorCodes::isNeedRetargettingError(code) || code == ErrorCodes::ShardInvalidatedForTargeting || code == ErrorCodes::StaleDbVersion || - code == ErrorCodes::ShardCannotRefreshDueToLocksHeld) { + code == ErrorCodes::ShardCannotRefreshDueToLocksHeld || + code == ErrorCodes::WouldChangeOwningShard) { // Don't abort on possibly retryable errors. return; } auto opCtx = rec->getOpCtx(); + + auto txnRouter = TransactionRouter::get(opCtx); + if (!txnRouter) { + // The command had yielded its session while the error was thrown. + return; + } + TransactionRouter::get(opCtx).implicitlyAbortTransaction(opCtx, status); }); } @@ -435,6 +443,8 @@ private: // Logs and updates statistics if an error occurs. void _tapOnError(const Status& status); + void _tapOnSuccess(); + ParseAndRunCommand* const _parc; boost::optional _routerSession; @@ -910,8 +920,16 @@ void ParseAndRunCommand::RunAndRetry::_checkRetryForTransaction(Status& status) // be retried on. auto opCtx = _parc->_rec->getOpCtx(); auto txnRouter = TransactionRouter::get(opCtx); - if (!txnRouter) + if (!txnRouter) { + if (opCtx->inMultiDocumentTransaction()) { + // This command must have failed while its session was yielded. We cannot retry in this + // case, whatever the session was yielded to is responsible for that, so rethrow the + // error. 
+ iassert(status); + } + return; + } ScopeGuard abortGuard([&] { txnRouter.implicitlyAbortTransaction(opCtx, status); }); @@ -1067,6 +1085,15 @@ void ParseAndRunCommand::RunInvocation::_tapOnError(const Status& status) { _parc->_errorBuilder->appendElements(errorLabels); } +void ParseAndRunCommand::RunInvocation::_tapOnSuccess() { + auto opCtx = _parc->_rec->getOpCtx(); + if (_parc->_osi && _parc->_osi->getAutocommit() == boost::optional(false)) { + tassert(5918604, + "A successful transaction command must always check out its session after yielding", + TransactionRouter::get(opCtx)); + } +} + Future ParseAndRunCommand::RunAndRetry::run() { return makeReadyFutureWith([&] { // Try kMaxNumStaleVersionRetries times. On the last try, exceptions are rethrown. @@ -1107,6 +1134,7 @@ Future ParseAndRunCommand::RunInvocation::run() { return future_util::makeState(_parc).thenWithState( [](auto* runner) { return runner->run(); }); }) + .tap([this]() { _tapOnSuccess(); }) .tapError([this](Status status) { _tapOnError(status); }); } diff --git a/src/mongo/s/session_catalog_router.cpp b/src/mongo/s/session_catalog_router.cpp index d0a8da6984b..1207f371dbf 100644 --- a/src/mongo/s/session_catalog_router.cpp +++ b/src/mongo/s/session_catalog_router.cpp @@ -72,7 +72,24 @@ RouterOperationContextSession::RouterOperationContextSession(OperationContext* o : _opCtx(opCtx), _operationContextSession(opCtx) {} RouterOperationContextSession::~RouterOperationContextSession() { - TransactionRouter::get(_opCtx).stash(_opCtx); + if (auto txnRouter = TransactionRouter::get(_opCtx)) { + // Only stash if the session wasn't yielded. 
+ txnRouter.stash(_opCtx); + } }; +void RouterOperationContextSession::checkIn(OperationContext* opCtx) { + invariant(OperationContextSession::get(opCtx)); + + TransactionRouter::get(opCtx).stash(opCtx); + OperationContextSession::checkIn(opCtx); +} + +void RouterOperationContextSession::checkOut(OperationContext* opCtx) { + invariant(!OperationContextSession::get(opCtx)); + + OperationContextSession::checkOut(opCtx); + TransactionRouter::get(opCtx).unstash(opCtx); +} + } // namespace mongo diff --git a/src/mongo/s/session_catalog_router.h b/src/mongo/s/session_catalog_router.h index ff54747aeed..301b40fffca 100644 --- a/src/mongo/s/session_catalog_router.h +++ b/src/mongo/s/session_catalog_router.h @@ -61,6 +61,16 @@ public: RouterOperationContextSession(OperationContext* opCtx); ~RouterOperationContextSession(); + /** + * These methods take an operation context with a checked-out session and allow it to be + * temporarily or permanently checked back in, in order to allow other operations to use it. + * + * Check-in may only be called if the session has actually been checked out previously and + * similarly check-out may only be called if the session is not checked out already. + */ + static void checkIn(OperationContext* opCtx); + static void checkOut(OperationContext* opCtx); + private: OperationContext* _opCtx; OperationContextSession _operationContextSession; diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp index 70a99dc4eb9..198e1209212 100644 --- a/src/mongo/s/transaction_router.cpp +++ b/src/mongo/s/transaction_router.cpp @@ -498,7 +498,9 @@ void TransactionRouter::Router::processParticipantResponse(OperationContext* opC } auto commandStatus = getStatusFromCommandResult(responseObj); - if (!commandStatus.isOK()) { + // WouldChangeOwningShard errors don't abort their transaction and the responses containing them + // include transaction metadata, so we treat them as successful responses. 
+ if (!commandStatus.isOK() && commandStatus != ErrorCodes::WouldChangeOwningShard) { return; } @@ -1046,6 +1048,19 @@ void TransactionRouter::Router::stash(OperationContext* opCtx) { o(lk).metricsTracker->trySetInactive(tickSource, tickSource->getTicks()); } +void TransactionRouter::Router::unstash(OperationContext* opCtx) { + if (!isInitialized()) { + return; + } + + // TODO SERVER-64052: Validate that the transaction number hasn't changed and metrics are + // updated appropriately. + + auto tickSource = opCtx->getServiceContext()->getTickSource(); + stdx::lock_guard lk(*opCtx->getClient()); + o(lk).metricsTracker->trySetActive(tickSource, tickSource->getTicks()); +} + BSONObj TransactionRouter::Router::_handOffCommitToCoordinator(OperationContext* opCtx) { invariant(o().coordinatorId); auto coordinatorIter = o().participants.find(*o().coordinatorId); @@ -1247,7 +1262,7 @@ BSONObj TransactionRouter::Router::abortTransaction(OperationContext* opCtx) { 3, "{sessionId}:{txnNumber} Aborting transaction on {numParticipantShards} shard(s)", "Aborting transaction on all participant shards", - "sessionId"_attr = _sessionId().getId(), + "sessionId"_attr = _sessionId(), "txnNumber"_attr = o().txnNumberAndRetryCounter.getTxnNumber(), "txnRetryCounter"_attr = o().txnNumberAndRetryCounter.getTxnRetryCounter(), "numParticipantShards"_attr = o().participants.size()); @@ -1295,7 +1310,7 @@ void TransactionRouter::Router::implicitlyAbortTransaction(OperationContext* opC "may have been handed off to the coordinator", "Not sending implicit abortTransaction to participant shards after error because " "coordinating the commit decision may have been handed off to the coordinator shard", - "sessionId"_attr = _sessionId().getId(), + "sessionId"_attr = _sessionId(), "txnNumber"_attr = o().txnNumberAndRetryCounter.getTxnNumber(), "txnRetryCounter"_attr = o().txnNumberAndRetryCounter.getTxnRetryCounter(), "error"_attr = redact(status)); @@ -1324,7 +1339,7 @@ void 
TransactionRouter::Router::implicitlyAbortTransaction(OperationContext* opC "{sessionId}:{txnNumber} Implicitly aborting transaction on {numParticipantShards} " "shard(s) due to error: {error}", "Implicitly aborting transaction on all participant shards", - "sessionId"_attr = _sessionId().getId(), + "sessionId"_attr = _sessionId(), "txnNumber"_attr = o().txnNumberAndRetryCounter.getTxnNumber(), "txnRetryCounter"_attr = o().txnNumberAndRetryCounter.getTxnRetryCounter(), "numParticipantShards"_attr = o().participants.size(), @@ -1342,7 +1357,7 @@ void TransactionRouter::Router::implicitlyAbortTransaction(OperationContext* opC 3, "{sessionId}:{txnNumber} Implicitly aborting transaction failed {error}", "Implicitly aborting transaction failed", - "sessionId"_attr = _sessionId().getId(), + "sessionId"_attr = _sessionId(), "txnNumber"_attr = o().txnNumberAndRetryCounter.getTxnNumber(), "txnRetryCounter"_attr = o().txnNumberAndRetryCounter.getTxnRetryCounter(), "error"_attr = ex); diff --git a/src/mongo/s/transaction_router.h b/src/mongo/s/transaction_router.h index b3021f0aaa9..9da80bd9afd 100644 --- a/src/mongo/s/transaction_router.h +++ b/src/mongo/s/transaction_router.h @@ -376,6 +376,11 @@ public: */ void stash(OperationContext* opCtx); + /** + * Validates transaction state is still compatible after a yield. + */ + void unstash(OperationContext* opCtx); + /** * Attaches the required transaction related fields for a request to be sent to the given * shard. diff --git a/src/mongo/s/transaction_router_resource_yielder.cpp b/src/mongo/s/transaction_router_resource_yielder.cpp new file mode 100644 index 00000000000..252231483a4 --- /dev/null +++ b/src/mongo/s/transaction_router_resource_yielder.cpp @@ -0,0 +1,55 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#include "mongo/s/transaction_router_resource_yielder.h" + +#include "mongo/db/session_catalog.h" +#include "mongo/s/session_catalog_router.h" + +namespace mongo { + +std::unique_ptr TransactionRouterResourceYielder::make() { + return std::make_unique(); +} + +void TransactionRouterResourceYielder::yield(OperationContext* opCtx) { + Session* const session = OperationContextSession::get(opCtx); + if (session) { + RouterOperationContextSession::checkIn(opCtx); + } + _yielded = (session != nullptr); +} + +void TransactionRouterResourceYielder::unyield(OperationContext* opCtx) { + if (_yielded) { + RouterOperationContextSession::checkOut(opCtx); + } +} + +} // namespace mongo diff --git a/src/mongo/s/transaction_router_resource_yielder.h b/src/mongo/s/transaction_router_resource_yielder.h new file mode 100644 index 00000000000..173c588c73a --- /dev/null +++ b/src/mongo/s/transaction_router_resource_yielder.h @@ -0,0 +1,63 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. 
If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/operation_context.h" +#include "mongo/db/resource_yielder.h" + +namespace mongo { + +/** + * Implementation of ResourceYielder that yields resources checked out in the course of running a + * distributed transaction. + */ +class TransactionRouterResourceYielder : public ResourceYielder { +public: + /** + * Returns a newly allocated yielder. + */ + static std::unique_ptr make(); + + /** + * If the opCtx has a checked out RouterOperationContextSession, yields it. + */ + void yield(OperationContext* opCtx) override; + + /** + * If the opCtx had previously checked out RouterOperationContextSession, checks it back out. + * Note this may throw if the opCtx has been interrupted. 
+ */ + void unyield(OperationContext* opCtx) override; + +private: + bool _yielded{false}; +}; + +} // namespace mongo diff --git a/src/mongo/s/write_ops/batched_command_request.cpp b/src/mongo/s/write_ops/batched_command_request.cpp index ced74997591..c916b5b58ba 100644 --- a/src/mongo/s/write_ops/batched_command_request.cpp +++ b/src/mongo/s/write_ops/batched_command_request.cpp @@ -130,6 +130,13 @@ void BatchedCommandRequest::setLegacyRuntimeConstants(LegacyRuntimeConstants run }}); } +void BatchedCommandRequest::unsetLegacyRuntimeConstants() { + _visit(visit_helper::Overloaded{ + [](write_ops::InsertCommandRequest&) {}, + [&](write_ops::UpdateCommandRequest& op) { op.setLegacyRuntimeConstants(boost::none); }, + [&](write_ops::DeleteCommandRequest& op) { op.setLegacyRuntimeConstants(boost::none); }}); +} + const boost::optional& BatchedCommandRequest::getLegacyRuntimeConstants() const { struct Visitor { diff --git a/src/mongo/s/write_ops/batched_command_request.h b/src/mongo/s/write_ops/batched_command_request.h index a7e02f2e58a..f900d52140a 100644 --- a/src/mongo/s/write_ops/batched_command_request.h +++ b/src/mongo/s/write_ops/batched_command_request.h @@ -138,6 +138,8 @@ public: void setLegacyRuntimeConstants(LegacyRuntimeConstants runtimeConstants); + void unsetLegacyRuntimeConstants(); + bool hasLegacyRuntimeConstants() const; const boost::optional& getLegacyRuntimeConstants() const; -- cgit v1.2.1