diff options
-rw-r--r-- | src/mongo/db/service_entry_point_common.cpp | 222 |
1 files changed, 138 insertions, 84 deletions
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index 58aec2a488b..6b17d5fff44 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -568,28 +568,6 @@ void appendErrorLabelsAndTopologyVersion(OperationContext* opCtx, topologyVersion.serialize(&topologyVersionBuilder); } -void _abortUnpreparedOrStashPreparedTransaction( - OperationContext* opCtx, TransactionParticipant::Participant* txnParticipant) { - const bool isPrepared = txnParticipant->transactionIsPrepared(); - try { - if (isPrepared) - txnParticipant->stashTransactionResources(opCtx); - else if (txnParticipant->transactionIsOpen()) - txnParticipant->abortTransaction(opCtx); - } catch (...) { - // It is illegal for this to throw so we catch and log this here for diagnosability. - LOGV2_FATAL_CONTINUE(21974, - "Caught exception during transaction {txnNumber} {operation} " - "{logicalSessionId}: {error}", - "Unable to stash/abort transaction", - "operation"_attr = (isPrepared ? "stash" : "abort"), - "txnNumber"_attr = opCtx->getTxnNumber(), - "logicalSessionId"_attr = opCtx->getLogicalSessionId()->toBSON(), - "error"_attr = exceptionToStatus()); - std::terminate(); - } -} - class ExecCommandDatabase : public std::enable_shared_from_this<ExecCommandDatabase> { public: explicit ExecCommandDatabase(std::shared_ptr<HandleRequest::ExecutionContext> execContext) @@ -746,33 +724,99 @@ private: const bool _shouldWaitForWriteConcern; }; -Future<void> invokeWithNoSession(std::shared_ptr<ExecCommandDatabase> ecd) { - auto execContext = ecd->getExecutionContext(); - OperationContext* opCtx = execContext->getOpCtx(); - const OpMsgRequest& request = execContext->getRequest(); - CommandInvocation* invocation = ecd->getInvocation().get(); - rpc::ReplyBuilderInterface* replyBuilder = execContext->getReplyBuilder(); - tenant_migration_donor::migrationConflictHandler( - opCtx, - request.getDatabase(), - [&] { CommandHelpers::runCommandInvocation(opCtx, request, invocation, replyBuilder); }, - replyBuilder); - return Status::OK(); +// Simplifies the interface for invoking commands and allows asynchronous execution of command +// invocations. +class InvokeCommand : public std::enable_shared_from_this<InvokeCommand> { +public: + explicit InvokeCommand(std::shared_ptr<ExecCommandDatabase> ecd) : _ecd(std::move(ecd)) {} + + Future<void> run(const bool checkoutSession); + +private: + class SessionCheckoutPath; + + Future<void> _runInvocation(); + + const std::shared_ptr<ExecCommandDatabase> _ecd; +}; + +class InvokeCommand::SessionCheckoutPath + : public std::enable_shared_from_this<InvokeCommand::SessionCheckoutPath> { +public: + SessionCheckoutPath(std::shared_ptr<InvokeCommand> parent) : _parent(std::move(parent)) {} + + Future<void> run(); + +private: + void _cleanupIncompleteTxn(); + + Future<void> _checkOutSession(); + void _tapError(Status); + Future<void> _commitInvocation(); + + const std::shared_ptr<InvokeCommand> _parent; + + std::unique_ptr<MongoDOperationContextSession> _sessionTxnState; + boost::optional<TransactionParticipant::Participant> _txnParticipant; + boost::optional<ScopeGuard<std::function<void()>>> _guard; +}; + +Future<void> InvokeCommand::run(const bool checkoutSession) { + auto [past, present] = makePromiseFuture<void>(); + auto future = std::move(present).then([this, checkoutSession, anchor = shared_from_this()] { + if (checkoutSession) + return std::make_shared<SessionCheckoutPath>(std::move(anchor))->run(); + return _runInvocation(); + }); + past.emplaceValue(); + return future; } -Future<void> invokeWithSessionCheckedOut(std::shared_ptr<ExecCommandDatabase> ecd) { +Future<void> InvokeCommand::SessionCheckoutPath::run() { + auto anchor = shared_from_this(); + return makeReadyFutureWith([] {}) + .then([this, anchor] { return _checkOutSession(); }) + .then([this, anchor] { + return _parent->_runInvocation().tapError( + [this, anchor](Status status) { return _tapError(std::move(status)); }); + }) + .then([this, anchor] { return _commitInvocation(); }); +} + +void InvokeCommand::SessionCheckoutPath::_cleanupIncompleteTxn() { + auto opCtx = _parent->_ecd->getExecutionContext()->getOpCtx(); + const bool isPrepared = _txnParticipant->transactionIsPrepared(); + try { + if (isPrepared) + _txnParticipant->stashTransactionResources(opCtx); + else if (_txnParticipant->transactionIsOpen()) + _txnParticipant->abortTransaction(opCtx); + } catch (...) { + // It is illegal for this to throw so we catch and log this here for diagnosability. + LOGV2_FATAL_CONTINUE(21974, + "Caught exception during transaction {txnNumber} {operation} " + "{logicalSessionId}: {error}", + "Unable to stash/abort transaction", + "operation"_attr = (isPrepared ? "stash" : "abort"), + "txnNumber"_attr = opCtx->getTxnNumber(), + "logicalSessionId"_attr = opCtx->getLogicalSessionId()->toBSON(), + "error"_attr = exceptionToStatus()); + std::terminate(); + } +} + +Future<void> InvokeCommand::SessionCheckoutPath::_checkOutSession() { + auto ecd = _parent->_ecd; auto execContext = ecd->getExecutionContext(); - OperationContext* opCtx = execContext->getOpCtx(); - const OpMsgRequest& request = execContext->getRequest(); + auto opCtx = execContext->getOpCtx(); CommandInvocation* invocation = ecd->getInvocation().get(); const OperationSessionInfoFromClient& sessionOptions = ecd->getSessionOptions(); - rpc::ReplyBuilderInterface* replyBuilder = execContext->getReplyBuilder(); // This constructor will check out the session. It handles the appropriate state management // for both multi-statement transactions and retryable writes. Currently, only requests with // a transaction number will check out the session. - MongoDOperationContextSession sessionTxnState(opCtx); - auto txnParticipant = TransactionParticipant::get(opCtx); + _sessionTxnState = std::make_unique<MongoDOperationContextSession>(opCtx); + _txnParticipant.emplace(TransactionParticipant::get(opCtx)); if (!opCtx->getClient()->isInDirectClient()) { bool beganOrContinuedTxn{false}; @@ -780,13 +824,13 @@ Future<void> invokeWithSessionCheckedOut(std::shared_ptr<ExecCommandDatabase> ec // transaction on that session. while (!beganOrContinuedTxn) { try { - txnParticipant.beginOrContinue(opCtx, - *sessionOptions.getTxnNumber(), - sessionOptions.getAutocommit(), - sessionOptions.getStartTransaction()); + _txnParticipant->beginOrContinue(opCtx, + *sessionOptions.getTxnNumber(), + sessionOptions.getAutocommit(), + sessionOptions.getStartTransaction()); beganOrContinuedTxn = true; } catch (const ExceptionFor<ErrorCodes::PreparedTransactionInProgress>&) { - auto prepareCompleted = txnParticipant.onExitPrepare(); + auto prepareCompleted = _txnParticipant->onExitPrepare(); CurOpFailpointHelpers::waitWhileFailPointEnabled( &waitAfterNewStatementBlocksBehindPrepare, @@ -818,21 +862,19 @@ Future<void> invokeWithSessionCheckedOut(std::shared_ptr<ExecCommandDatabase> ec // transactions on failure to unstash the transaction resources to opCtx. We don't want to // have this error guard for beginOrContinue as it can abort the transaction for any // accidental invalid statements in the transaction. - auto abortOnError = makeGuard([&txnParticipant, opCtx] { - if (txnParticipant.transactionIsInProgress()) { - txnParticipant.abortTransaction(opCtx); + auto abortOnError = makeGuard([&] { + if (_txnParticipant->transactionIsInProgress()) { + _txnParticipant->abortTransaction(opCtx); } }); - txnParticipant.unstashTransactionResources(opCtx, invocation->definition()->getName()); + _txnParticipant->unstashTransactionResources(opCtx, invocation->definition()->getName()); // Unstash success. abortOnError.dismiss(); } - auto guard = makeGuard([opCtx, &txnParticipant] { - _abortUnpreparedOrStashPreparedTransaction(opCtx, &txnParticipant); - }); + _guard.emplace([this] { _cleanupIncompleteTxn(); }); if (!opCtx->getClient()->isInDirectClient()) { const auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); @@ -857,23 +899,40 @@ Future<void> invokeWithSessionCheckedOut(std::shared_ptr<ExecCommandDatabase> ec (cmdName == "create"_sd || cmdName == "createIndexes"_sd)) { if (!readConcernSupport.readConcernSupport.isOK()) { uassertStatusOK(readConcernSupport.readConcernSupport.withContext( - str::stream() << "Command " << cmdName - << " does not support this transaction's " - << readConcernArgs.toString())); + "Command {} does not support this transaction's {}"_format( + cmdName, readConcernArgs.toString()))); } } } // Use the API parameters that were stored when the transaction was initiated. - APIParameters::get(opCtx) = txnParticipant.getAPIParameters(opCtx); + APIParameters::get(opCtx) = _txnParticipant->getAPIParameters(opCtx); - try { - tenant_migration_donor::migrationConflictHandler( - opCtx, - request.getDatabase(), - [&] { CommandHelpers::runCommandInvocation(opCtx, request, invocation, replyBuilder); }, - replyBuilder); - } catch (const ExceptionFor<ErrorCodes::CommandOnShardedViewNotSupportedOnMongod>&) { + return Status::OK(); +} + +Future<void> InvokeCommand::_runInvocation() try { + auto execContext = _ecd->getExecutionContext(); + OperationContext* opCtx = execContext->getOpCtx(); + const OpMsgRequest& request = execContext->getRequest(); + CommandInvocation* invocation = _ecd->getInvocation().get(); + rpc::ReplyBuilderInterface* replyBuilder = execContext->getReplyBuilder(); + + tenant_migration_donor::migrationConflictHandler( + opCtx, + request.getDatabase(), + [&] { CommandHelpers::runCommandInvocation(opCtx, request, invocation, replyBuilder); }, + replyBuilder); + + return Status::OK(); +} catch (const DBException& ex) { + return ex.toStatus(); +} + +void InvokeCommand::SessionCheckoutPath::_tapError(Status status) { + auto opCtx = _parent->_ecd->getExecutionContext()->getOpCtx(); + const OperationSessionInfoFromClient& sessionOptions = _parent->_ecd->getSessionOptions(); + if (status.code() == ErrorCodes::CommandOnShardedViewNotSupportedOnMongod) { // Exceptions are used to resolve views in a sharded cluster, so they should be handled // specially to avoid unnecessary aborts. @@ -886,21 +945,23 @@ Future<void> invokeWithSessionCheckedOut(std::shared_ptr<ExecCommandDatabase> ec // avoid leaving it orphaned in this case, which is fine even if it is re-targeted // because the retry will include "startTransaction" again and "restart" a transaction // at the active txnNumber. - throw; + return; } // If this shard has completed an earlier statement for this transaction, it must already be // in the transaction's participant list, so it is guaranteed to learn its outcome. - txnParticipant.stashTransactionResources(opCtx); - guard.dismiss(); - throw; - } catch (const ExceptionFor<ErrorCodes::WouldChangeOwningShard>&) { - txnParticipant.stashTransactionResources(opCtx); - txnParticipant.resetRetryableWriteState(opCtx); - guard.dismiss(); - throw; + _txnParticipant->stashTransactionResources(opCtx); + _guard->dismiss(); + } else if (status.code() == ErrorCodes::WouldChangeOwningShard) { + _txnParticipant->stashTransactionResources(opCtx); + _txnParticipant->resetRetryableWriteState(opCtx); + _guard->dismiss(); } +} +Future<void> InvokeCommand::SessionCheckoutPath::_commitInvocation() { + auto execContext = _parent->_ecd->getExecutionContext(); + auto replyBuilder = execContext->getReplyBuilder(); if (auto okField = replyBuilder->getBodyBuilder().asTempObj()["ok"]) { // If ok is present, use its truthiness. if (!okField.trueValue()) { @@ -909,12 +970,12 @@ Future<void> invokeWithSessionCheckedOut(std::shared_ptr<ExecCommandDatabase> ec } // Stash or commit the transaction when the command succeeds. - txnParticipant.stashTransactionResources(opCtx); - guard.dismiss(); + _txnParticipant->stashTransactionResources(execContext->getOpCtx()); + _guard->dismiss(); if (serverGlobalParams.clusterRole == ClusterRole::ShardServer || serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { - auto txnResponseMetadata = txnParticipant.getResponseMetadata(); + auto txnResponseMetadata = _txnParticipant->getResponseMetadata(); auto bodyBuilder = replyBuilder->getBodyBuilder(); txnResponseMetadata.serialize(&bodyBuilder); } @@ -1023,12 +1084,7 @@ Future<void> RunCommandImpl::_runCommand() { invariant(!_shouldWaitForWriteConcern); execContext->behaviors->uassertCommandDoesNotSpecifyWriteConcern( execContext->getRequest().body); - if (_shouldCheckOutSession) { - return invokeWithSessionCheckedOut(_ecd); - } else { - return invokeWithNoSession(_ecd); - } - return Status::OK(); + return std::make_shared<InvokeCommand>(_ecd)->run(_shouldCheckOutSession); } void RunCommandImpl::RunCommandAndWaitForWriteConcern::_waitForWriteConcern(BSONObjBuilder& bb) { @@ -1141,9 +1197,7 @@ Future<void> RunCommandImpl::RunCommandAndWaitForWriteConcern::_run() { _execContext->getReplyBuilder()->setCommandReply(errorBuilder.obj()); return Status::OK(); } - if (_rci->_shouldCheckOutSession) - return invokeWithSessionCheckedOut(_rci->_ecd); - return invokeWithNoSession(_rci->_ecd); + return std::make_shared<InvokeCommand>(_rci->_ecd)->run(_rci->_shouldCheckOutSession); } Future<void> RunCommandImpl::RunCommandAndWaitForWriteConcern::_onRunCompletion(Status status) { |