summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAmirsaman Memaripour <amirsaman.memaripour@mongodb.com>2020-10-14 17:34:02 +0000
committerAmirsaman Memaripour <amirsaman.memaripour@mongodb.com>2020-10-14 17:38:12 +0000
commitb03c93d55baefc8a70a6ee790f1d497fd1ff8b70 (patch)
treeceb757df180fac5519145c761a61c57882f655ed
parent1d6af89487957dfa75835b60f4c09e8c3cdf5cd5 (diff)
downloadmongo-b03c93d55baefc8a70a6ee790f1d497fd1ff8b70.tar.gz
SERVER-49107 Futurize and refactor command invocation
-rw-r--r--src/mongo/db/service_entry_point_common.cpp222
1 files changed, 138 insertions, 84 deletions
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index 58aec2a488b..6b17d5fff44 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -568,28 +568,6 @@ void appendErrorLabelsAndTopologyVersion(OperationContext* opCtx,
topologyVersion.serialize(&topologyVersionBuilder);
}
-void _abortUnpreparedOrStashPreparedTransaction(
- OperationContext* opCtx, TransactionParticipant::Participant* txnParticipant) {
- const bool isPrepared = txnParticipant->transactionIsPrepared();
- try {
- if (isPrepared)
- txnParticipant->stashTransactionResources(opCtx);
- else if (txnParticipant->transactionIsOpen())
- txnParticipant->abortTransaction(opCtx);
- } catch (...) {
- // It is illegal for this to throw so we catch and log this here for diagnosability.
- LOGV2_FATAL_CONTINUE(21974,
- "Caught exception during transaction {txnNumber} {operation} "
- "{logicalSessionId}: {error}",
- "Unable to stash/abort transaction",
- "operation"_attr = (isPrepared ? "stash" : "abort"),
- "txnNumber"_attr = opCtx->getTxnNumber(),
- "logicalSessionId"_attr = opCtx->getLogicalSessionId()->toBSON(),
- "error"_attr = exceptionToStatus());
- std::terminate();
- }
-}
-
class ExecCommandDatabase : public std::enable_shared_from_this<ExecCommandDatabase> {
public:
explicit ExecCommandDatabase(std::shared_ptr<HandleRequest::ExecutionContext> execContext)
@@ -746,33 +724,99 @@ private:
const bool _shouldWaitForWriteConcern;
};
-Future<void> invokeWithNoSession(std::shared_ptr<ExecCommandDatabase> ecd) {
- auto execContext = ecd->getExecutionContext();
- OperationContext* opCtx = execContext->getOpCtx();
- const OpMsgRequest& request = execContext->getRequest();
- CommandInvocation* invocation = ecd->getInvocation().get();
- rpc::ReplyBuilderInterface* replyBuilder = execContext->getReplyBuilder();
- tenant_migration_donor::migrationConflictHandler(
- opCtx,
- request.getDatabase(),
- [&] { CommandHelpers::runCommandInvocation(opCtx, request, invocation, replyBuilder); },
- replyBuilder);
- return Status::OK();
+// Simplifies the interface for invoking commands and allows asynchronous execution of command
+// invocations.
+class InvokeCommand : public std::enable_shared_from_this<InvokeCommand> {
+public:
+ explicit InvokeCommand(std::shared_ptr<ExecCommandDatabase> ecd) : _ecd(std::move(ecd)) {}
+
+ Future<void> run(const bool checkoutSession);
+
+private:
+ class SessionCheckoutPath;
+
+ Future<void> _runInvocation();
+
+ const std::shared_ptr<ExecCommandDatabase> _ecd;
+};
+
+class InvokeCommand::SessionCheckoutPath
+ : public std::enable_shared_from_this<InvokeCommand::SessionCheckoutPath> {
+public:
+ SessionCheckoutPath(std::shared_ptr<InvokeCommand> parent) : _parent(std::move(parent)) {}
+
+ Future<void> run();
+
+private:
+ void _cleanupIncompleteTxn();
+
+ Future<void> _checkOutSession();
+ void _tapError(Status);
+ Future<void> _commitInvocation();
+
+ const std::shared_ptr<InvokeCommand> _parent;
+
+ std::unique_ptr<MongoDOperationContextSession> _sessionTxnState;
+ boost::optional<TransactionParticipant::Participant> _txnParticipant;
+ boost::optional<ScopeGuard<std::function<void()>>> _guard;
+};
+
+Future<void> InvokeCommand::run(const bool checkoutSession) {
+ auto [past, present] = makePromiseFuture<void>();
+ auto future = std::move(present).then([this, checkoutSession, anchor = shared_from_this()] {
+ if (checkoutSession)
+ return std::make_shared<SessionCheckoutPath>(std::move(anchor))->run();
+ return _runInvocation();
+ });
+ past.emplaceValue();
+ return future;
}
-Future<void> invokeWithSessionCheckedOut(std::shared_ptr<ExecCommandDatabase> ecd) {
+Future<void> InvokeCommand::SessionCheckoutPath::run() {
+ auto anchor = shared_from_this();
+ return makeReadyFutureWith([] {})
+ .then([this, anchor] { return _checkOutSession(); })
+ .then([this, anchor] {
+ return _parent->_runInvocation().tapError(
+ [this, anchor](Status status) { return _tapError(std::move(status)); });
+ })
+ .then([this, anchor] { return _commitInvocation(); });
+}
+
+void InvokeCommand::SessionCheckoutPath::_cleanupIncompleteTxn() {
+ auto opCtx = _parent->_ecd->getExecutionContext()->getOpCtx();
+ const bool isPrepared = _txnParticipant->transactionIsPrepared();
+ try {
+ if (isPrepared)
+ _txnParticipant->stashTransactionResources(opCtx);
+ else if (_txnParticipant->transactionIsOpen())
+ _txnParticipant->abortTransaction(opCtx);
+ } catch (...) {
+ // It is illegal for this to throw so we catch and log this here for diagnosability.
+ LOGV2_FATAL_CONTINUE(21974,
+ "Caught exception during transaction {txnNumber} {operation} "
+ "{logicalSessionId}: {error}",
+ "Unable to stash/abort transaction",
+ "operation"_attr = (isPrepared ? "stash" : "abort"),
+ "txnNumber"_attr = opCtx->getTxnNumber(),
+ "logicalSessionId"_attr = opCtx->getLogicalSessionId()->toBSON(),
+ "error"_attr = exceptionToStatus());
+ std::terminate();
+ }
+}
+
+Future<void> InvokeCommand::SessionCheckoutPath::_checkOutSession() {
+ auto ecd = _parent->_ecd;
auto execContext = ecd->getExecutionContext();
- OperationContext* opCtx = execContext->getOpCtx();
- const OpMsgRequest& request = execContext->getRequest();
+ auto opCtx = execContext->getOpCtx();
CommandInvocation* invocation = ecd->getInvocation().get();
const OperationSessionInfoFromClient& sessionOptions = ecd->getSessionOptions();
- rpc::ReplyBuilderInterface* replyBuilder = execContext->getReplyBuilder();
// This constructor will check out the session. It handles the appropriate state management
// for both multi-statement transactions and retryable writes. Currently, only requests with
// a transaction number will check out the session.
- MongoDOperationContextSession sessionTxnState(opCtx);
- auto txnParticipant = TransactionParticipant::get(opCtx);
+ _sessionTxnState = std::make_unique<MongoDOperationContextSession>(opCtx);
+ _txnParticipant.emplace(TransactionParticipant::get(opCtx));
if (!opCtx->getClient()->isInDirectClient()) {
bool beganOrContinuedTxn{false};
@@ -780,13 +824,13 @@ Future<void> invokeWithSessionCheckedOut(std::shared_ptr<ExecCommandDatabase> ec
// transaction on that session.
while (!beganOrContinuedTxn) {
try {
- txnParticipant.beginOrContinue(opCtx,
- *sessionOptions.getTxnNumber(),
- sessionOptions.getAutocommit(),
- sessionOptions.getStartTransaction());
+ _txnParticipant->beginOrContinue(opCtx,
+ *sessionOptions.getTxnNumber(),
+ sessionOptions.getAutocommit(),
+ sessionOptions.getStartTransaction());
beganOrContinuedTxn = true;
} catch (const ExceptionFor<ErrorCodes::PreparedTransactionInProgress>&) {
- auto prepareCompleted = txnParticipant.onExitPrepare();
+ auto prepareCompleted = _txnParticipant->onExitPrepare();
CurOpFailpointHelpers::waitWhileFailPointEnabled(
&waitAfterNewStatementBlocksBehindPrepare,
@@ -818,21 +862,19 @@ Future<void> invokeWithSessionCheckedOut(std::shared_ptr<ExecCommandDatabase> ec
// transactions on failure to unstash the transaction resources to opCtx. We don't want to
// have this error guard for beginOrContinue as it can abort the transaction for any
// accidental invalid statements in the transaction.
- auto abortOnError = makeGuard([&txnParticipant, opCtx] {
- if (txnParticipant.transactionIsInProgress()) {
- txnParticipant.abortTransaction(opCtx);
+ auto abortOnError = makeGuard([&] {
+ if (_txnParticipant->transactionIsInProgress()) {
+ _txnParticipant->abortTransaction(opCtx);
}
});
- txnParticipant.unstashTransactionResources(opCtx, invocation->definition()->getName());
+ _txnParticipant->unstashTransactionResources(opCtx, invocation->definition()->getName());
// Unstash success.
abortOnError.dismiss();
}
- auto guard = makeGuard([opCtx, &txnParticipant] {
- _abortUnpreparedOrStashPreparedTransaction(opCtx, &txnParticipant);
- });
+ _guard.emplace([this] { _cleanupIncompleteTxn(); });
if (!opCtx->getClient()->isInDirectClient()) {
const auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
@@ -857,23 +899,40 @@ Future<void> invokeWithSessionCheckedOut(std::shared_ptr<ExecCommandDatabase> ec
(cmdName == "create"_sd || cmdName == "createIndexes"_sd)) {
if (!readConcernSupport.readConcernSupport.isOK()) {
uassertStatusOK(readConcernSupport.readConcernSupport.withContext(
- str::stream() << "Command " << cmdName
- << " does not support this transaction's "
- << readConcernArgs.toString()));
+ "Command {} does not support this transaction's {}"_format(
+ cmdName, readConcernArgs.toString())));
}
}
}
// Use the API parameters that were stored when the transaction was initiated.
- APIParameters::get(opCtx) = txnParticipant.getAPIParameters(opCtx);
+ APIParameters::get(opCtx) = _txnParticipant->getAPIParameters(opCtx);
- try {
- tenant_migration_donor::migrationConflictHandler(
- opCtx,
- request.getDatabase(),
- [&] { CommandHelpers::runCommandInvocation(opCtx, request, invocation, replyBuilder); },
- replyBuilder);
- } catch (const ExceptionFor<ErrorCodes::CommandOnShardedViewNotSupportedOnMongod>&) {
+ return Status::OK();
+}
+
+Future<void> InvokeCommand::_runInvocation() try {
+ auto execContext = _ecd->getExecutionContext();
+ OperationContext* opCtx = execContext->getOpCtx();
+ const OpMsgRequest& request = execContext->getRequest();
+ CommandInvocation* invocation = _ecd->getInvocation().get();
+ rpc::ReplyBuilderInterface* replyBuilder = execContext->getReplyBuilder();
+
+ tenant_migration_donor::migrationConflictHandler(
+ opCtx,
+ request.getDatabase(),
+ [&] { CommandHelpers::runCommandInvocation(opCtx, request, invocation, replyBuilder); },
+ replyBuilder);
+
+ return Status::OK();
+} catch (const DBException& ex) {
+ return ex.toStatus();
+}
+
+void InvokeCommand::SessionCheckoutPath::_tapError(Status status) {
+ auto opCtx = _parent->_ecd->getExecutionContext()->getOpCtx();
+ const OperationSessionInfoFromClient& sessionOptions = _parent->_ecd->getSessionOptions();
+ if (status.code() == ErrorCodes::CommandOnShardedViewNotSupportedOnMongod) {
// Exceptions are used to resolve views in a sharded cluster, so they should be handled
// specially to avoid unnecessary aborts.
@@ -886,21 +945,23 @@ Future<void> invokeWithSessionCheckedOut(std::shared_ptr<ExecCommandDatabase> ec
// avoid leaving it orphaned in this case, which is fine even if it is re-targeted
// because the retry will include "startTransaction" again and "restart" a transaction
// at the active txnNumber.
- throw;
+ return;
}
// If this shard has completed an earlier statement for this transaction, it must already be
// in the transaction's participant list, so it is guaranteed to learn its outcome.
- txnParticipant.stashTransactionResources(opCtx);
- guard.dismiss();
- throw;
- } catch (const ExceptionFor<ErrorCodes::WouldChangeOwningShard>&) {
- txnParticipant.stashTransactionResources(opCtx);
- txnParticipant.resetRetryableWriteState(opCtx);
- guard.dismiss();
- throw;
+ _txnParticipant->stashTransactionResources(opCtx);
+ _guard->dismiss();
+ } else if (status.code() == ErrorCodes::WouldChangeOwningShard) {
+ _txnParticipant->stashTransactionResources(opCtx);
+ _txnParticipant->resetRetryableWriteState(opCtx);
+ _guard->dismiss();
}
+}
+Future<void> InvokeCommand::SessionCheckoutPath::_commitInvocation() {
+ auto execContext = _parent->_ecd->getExecutionContext();
+ auto replyBuilder = execContext->getReplyBuilder();
if (auto okField = replyBuilder->getBodyBuilder().asTempObj()["ok"]) {
// If ok is present, use its truthiness.
if (!okField.trueValue()) {
@@ -909,12 +970,12 @@ Future<void> invokeWithSessionCheckedOut(std::shared_ptr<ExecCommandDatabase> ec
}
// Stash or commit the transaction when the command succeeds.
- txnParticipant.stashTransactionResources(opCtx);
- guard.dismiss();
+ _txnParticipant->stashTransactionResources(execContext->getOpCtx());
+ _guard->dismiss();
if (serverGlobalParams.clusterRole == ClusterRole::ShardServer ||
serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
- auto txnResponseMetadata = txnParticipant.getResponseMetadata();
+ auto txnResponseMetadata = _txnParticipant->getResponseMetadata();
auto bodyBuilder = replyBuilder->getBodyBuilder();
txnResponseMetadata.serialize(&bodyBuilder);
}
@@ -1023,12 +1084,7 @@ Future<void> RunCommandImpl::_runCommand() {
invariant(!_shouldWaitForWriteConcern);
execContext->behaviors->uassertCommandDoesNotSpecifyWriteConcern(
execContext->getRequest().body);
- if (_shouldCheckOutSession) {
- return invokeWithSessionCheckedOut(_ecd);
- } else {
- return invokeWithNoSession(_ecd);
- }
- return Status::OK();
+ return std::make_shared<InvokeCommand>(_ecd)->run(_shouldCheckOutSession);
}
void RunCommandImpl::RunCommandAndWaitForWriteConcern::_waitForWriteConcern(BSONObjBuilder& bb) {
@@ -1141,9 +1197,7 @@ Future<void> RunCommandImpl::RunCommandAndWaitForWriteConcern::_run() {
_execContext->getReplyBuilder()->setCommandReply(errorBuilder.obj());
return Status::OK();
}
- if (_rci->_shouldCheckOutSession)
- return invokeWithSessionCheckedOut(_rci->_ecd);
- return invokeWithNoSession(_rci->_ecd);
+ return std::make_shared<InvokeCommand>(_rci->_ecd)->run(_rci->_shouldCheckOutSession);
}
Future<void> RunCommandImpl::RunCommandAndWaitForWriteConcern::_onRunCompletion(Status status) {