diff options
author | Amirsaman Memaripour <amirsaman.memaripour@mongodb.com> | 2020-11-18 00:38:58 +0000 |
---|---|---|
committer | Amirsaman Memaripour <amirsaman.memaripour@mongodb.com> | 2020-11-18 00:38:58 +0000 |
commit | 63338e0dd6aab76382ddc1034d0b6b2a885f06ad (patch) | |
tree | d0ea210eabe8091b06f3eb24574f0a7bdf2380a0 | |
parent | 13bb35d34dc20273d342e52360b89e6f510d1747 (diff) | |
download | mongo-63338e0dd6aab76382ddc1034d0b6b2a885f06ad.tar.gz |
SERVER-51690 Futurize and refactor Mongos execCommandClientfixed_thread_pool_handshaking
-rw-r--r-- | src/mongo/s/commands/strategy.cpp | 267 |
1 files changed, 155 insertions, 112 deletions
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp index fdca7bf7ea6..8954dbaf3c9 100644 --- a/src/mongo/s/commands/strategy.cpp +++ b/src/mongo/s/commands/strategy.cpp @@ -157,30 +157,28 @@ void appendRequiredFieldsToResponse(OperationContext* opCtx, BSONObjBuilder* res /** * Invokes the given command and aborts the transaction on any non-retryable errors. */ -void invokeInTransactionRouter(OperationContext* opCtx, - const OpMsgRequest& request, - CommandInvocation* invocation, - rpc::ReplyBuilderInterface* result) { +Future<void> invokeInTransactionRouter(std::shared_ptr<RequestExecutionContext> rec, + std::shared_ptr<CommandInvocation> invocation) { + auto opCtx = rec->getOpCtx(); auto txnRouter = TransactionRouter::get(opCtx); invariant(txnRouter); // No-op if the transaction is not running with snapshot read concern. txnRouter.setDefaultAtClusterTime(opCtx); - try { - CommandHelpers::runCommandInvocation(opCtx, request, invocation, result); - } catch (const DBException& e) { - if (ErrorCodes::isSnapshotError(e.code()) || - ErrorCodes::isNeedRetargettingError(e.code()) || - e.code() == ErrorCodes::ShardInvalidatedForTargeting || - e.code() == ErrorCodes::StaleDbVersion) { - // Don't abort on possibly retryable errors. - throw; - } + return CommandHelpers::runCommandInvocationAsync(rec, std::move(invocation)) + .tapError([rec = std::move(rec)](Status status) { + if (auto code = status.code(); ErrorCodes::isSnapshotError(code) || + ErrorCodes::isNeedRetargettingError(code) || + code == ErrorCodes::ShardInvalidatedForTargeting || + code == ErrorCodes::StaleDbVersion) { + // Don't abort on possibly retryable errors. + return; + } - txnRouter.implicitlyAbortTransaction(opCtx, e.toStatus()); - throw; - } + auto opCtx = rec->getOpCtx(); + TransactionRouter::get(opCtx).implicitlyAbortTransaction(opCtx, status); + }); } /** @@ -194,99 +192,133 @@ void addContextForTransactionAbortingError(StringData txnIdAsString, txnIdAsString, latestStmtId, reason)); } -void execCommandClient(OperationContext* opCtx, - CommandInvocation* invocation, - const OpMsgRequest& request, - rpc::ReplyBuilderInterface* result) { - [&] { - const Command* c = invocation->definition(); - - const auto dbname = request.getDatabase(); - uassert(ErrorCodes::IllegalOperation, - "Can't use 'local' database through mongos", - dbname != NamespaceString::kLocalDb); - uassert( - ErrorCodes::InvalidNamespace, - str::stream() << "Invalid database name: '" << dbname << "'", - NamespaceString::validDBName(dbname, NamespaceString::DollarInDbNameBehavior::Allow)); +// Factory class to construct a future-chain that executes the invocation against the database. +class ExecCommandClient final : public std::enable_shared_from_this<ExecCommandClient> { +public: + ExecCommandClient(ExecCommandClient&&) = delete; + ExecCommandClient(const ExecCommandClient&) = delete; - StringMap<int> topLevelFields; - for (auto&& element : request.body) { - StringData fieldName = element.fieldNameStringData(); - if (fieldName == "help" && element.type() == Bool && element.Bool()) { - std::stringstream help; - help << "help for: " << c->getName() << " " << c->help(); - auto body = result->getBodyBuilder(); - body.append("help", help.str()); - CommandHelpers::appendSimpleCommandStatus(body, true, ""); - return; - } + ExecCommandClient(std::shared_ptr<RequestExecutionContext> rec, + std::shared_ptr<CommandInvocation> invocation) + : _rec(std::move(rec)), _invocation(std::move(invocation)) {} - uassert(ErrorCodes::FailedToParse, - str::stream() << "Parsed command object contains duplicate top level key: " - << fieldName, - topLevelFields[fieldName]++ == 0); - } + Future<void> run(); - try { - invocation->checkAuthorization(opCtx, request); - } catch (const DBException& e) { +private: + // Prepare the environment for running the invocation (e.g., checking authorization). + Status _prologue(); + + // Returns a future that runs the command invocation. + Future<void> _run(); + + // Any logic that must be done post command execution, unless an exception is thrown. + void _epilogue(); + + // Runs at the end of the future-chain returned by `run()` unless an exception, other than + // `ErrorCodes::SkipCommandExecution`, is thrown earlier. + void _onCompletion(); + + const std::shared_ptr<RequestExecutionContext> _rec; + const std::shared_ptr<CommandInvocation> _invocation; +}; + +Status ExecCommandClient::_prologue() { + auto opCtx = _rec->getOpCtx(); + auto result = _rec->getReplyBuilder(); + const auto& request = _rec->getRequest(); + const Command* c = _invocation->definition(); + + const auto dbname = request.getDatabase(); + uassert(ErrorCodes::IllegalOperation, + "Can't use 'local' database through mongos", + dbname != NamespaceString::kLocalDb); + uassert(ErrorCodes::InvalidNamespace, + "Invalid database name: '{}'"_format(dbname), + NamespaceString::validDBName(dbname, NamespaceString::DollarInDbNameBehavior::Allow)); + + StringMap<int> topLevelFields; + for (auto&& element : request.body) { + StringData fieldName = element.fieldNameStringData(); + if (fieldName == "help" && element.type() == Bool && element.Bool()) { auto body = result->getBodyBuilder(); - CommandHelpers::appendCommandStatusNoThrow(body, e.toStatus()); - return; + body.append("help", "help for: {} {}"_format(c->getName(), c->help())); + CommandHelpers::appendSimpleCommandStatus(body, true, ""); + return {ErrorCodes::SkipCommandExecution, "Already served help command"}; } - // attach tracking - rpc::TrackingMetadata trackingMetadata; - trackingMetadata.initWithOperName(c->getName()); - rpc::TrackingMetadata::get(opCtx) = trackingMetadata; + uassert(ErrorCodes::FailedToParse, + "Parsed command object contains duplicate top level key: {}"_format(fieldName), + topLevelFields[fieldName]++ == 0); + } - // Extract and process metadata from the command request body. - ReadPreferenceSetting::get(opCtx) = - uassertStatusOK(ReadPreferenceSetting::fromContainingBSON(request.body)); - VectorClock::get(opCtx)->gossipIn(opCtx, request.body, !c->requiresAuth()); + try { + _invocation->checkAuthorization(opCtx, request); + } catch (const DBException& e) { + auto body = result->getBodyBuilder(); + CommandHelpers::appendCommandStatusNoThrow(body, e.toStatus()); + return {ErrorCodes::SkipCommandExecution, "Failed to check authorization"}; + } - auto txnRouter = TransactionRouter::get(opCtx); - if (txnRouter) { - invokeInTransactionRouter(opCtx, request, invocation, result); - } else { - CommandHelpers::runCommandInvocation(opCtx, request, invocation, result); - } + // attach tracking + rpc::TrackingMetadata trackingMetadata; + trackingMetadata.initWithOperName(c->getName()); + rpc::TrackingMetadata::get(opCtx) = trackingMetadata; - if (invocation->supportsWriteConcern()) { - failCommand.executeIf( - [&](const BSONObj& data) { - result->getBodyBuilder().append(data["writeConcernError"]); - if (data.hasField(kErrorLabelsFieldName) && - data[kErrorLabelsFieldName].type() == Array) { - auto labels = data.getObjectField(kErrorLabelsFieldName).getOwned(); - if (!labels.isEmpty()) { - result->getBodyBuilder().append(kErrorLabelsFieldName, - BSONArray(labels)); - } + // Extract and process metadata from the command request body. + ReadPreferenceSetting::get(opCtx) = + uassertStatusOK(ReadPreferenceSetting::fromContainingBSON(request.body)); + VectorClock::get(opCtx)->gossipIn(opCtx, request.body, !c->requiresAuth()); + + return Status::OK(); +} + +Future<void> ExecCommandClient::_run() { + OperationContext* opCtx = _rec->getOpCtx(); + if (auto txnRouter = TransactionRouter::get(opCtx); txnRouter) { + return invokeInTransactionRouter(_rec, _invocation); + } else { + return CommandHelpers::runCommandInvocationAsync(_rec, _invocation); + } +} + +void ExecCommandClient::_epilogue() { + auto opCtx = _rec->getOpCtx(); + auto result = _rec->getReplyBuilder(); + if (_invocation->supportsWriteConcern()) { + failCommand.executeIf( + [&](const BSONObj& data) { + result->getBodyBuilder().append(data["writeConcernError"]); + if (data.hasField(kErrorLabelsFieldName) && + data[kErrorLabelsFieldName].type() == Array) { + auto labels = data.getObjectField(kErrorLabelsFieldName).getOwned(); + if (!labels.isEmpty()) { + result->getBodyBuilder().append(kErrorLabelsFieldName, BSONArray(labels)); } - }, - [&](const BSONObj& data) { - return CommandHelpers::shouldActivateFailCommandFailPoint( - data, invocation, opCtx->getClient()) && - data.hasField("writeConcernError"); - }); - } + } + }, + [&](const BSONObj& data) { + return CommandHelpers::shouldActivateFailCommandFailPoint( + data, _invocation.get(), opCtx->getClient()) && + data.hasField("writeConcernError"); + }); + } - auto body = result->getBodyBuilder(); + auto body = result->getBodyBuilder(); - bool ok = CommandHelpers::extractOrAppendOk(body); - if (!ok) { - c->incrementCommandsFailed(); + if (bool ok = CommandHelpers::extractOrAppendOk(body); !ok) { + const Command* c = _invocation->definition(); + c->incrementCommandsFailed(); - if (auto txnRouter = TransactionRouter::get(opCtx)) { - txnRouter.implicitlyAbortTransaction(opCtx, - getStatusFromCommandResult(body.asTempObj())); - } + if (auto txnRouter = TransactionRouter::get(opCtx)) { + txnRouter.implicitlyAbortTransaction(opCtx, + getStatusFromCommandResult(body.asTempObj())); } - }(); + } +} - auto body = result->getBodyBuilder(); +void ExecCommandClient::_onCompletion() { + auto opCtx = _rec->getOpCtx(); + auto body = _rec->getReplyBuilder()->getBodyBuilder(); appendRequiredFieldsToResponse(opCtx, &body); auto seCtx = transport::ServiceExecutorContext::get(opCtx->getClient()); @@ -295,13 +327,29 @@ void execCommandClient(OperationContext* opCtx, return; } - if (!invocation->isSafeForBorrowedThreads()) { - // If the last command wasn't safe for a borrowed thread, then let's move - // off of it. + if (!_invocation->isSafeForBorrowedThreads()) { + // If the last command wasn't safe for a borrowed thread, then let's move off of it. seCtx->setThreadingModel(transport::ServiceExecutor::ThreadingModel::kDedicated); } } +Future<void> ExecCommandClient::run() { + auto pf = makePromiseFuture<void>(); + auto future = std::move(pf.future) + .then([this, anchor = shared_from_this()] { return _prologue(); }) + .then([this, anchor = shared_from_this()] { return _run(); }) + .then([this, anchor = shared_from_this()] { _epilogue(); }) + .onCompletion([this, anchor = shared_from_this()](Status status) { + if (!status.isOK() && status.code() != ErrorCodes::SkipCommandExecution) + return status; // Execution was interrupted due to an error. + + _onCompletion(); + return Status::OK(); + }); + pf.promise.emplaceValue(); + return future; +} + MONGO_FAIL_POINT_DEFINE(doNotRefreshShardsOnRetargettingError); /** @@ -772,20 +820,15 @@ void ParseAndRunCommand::RunAndRetry::_setup() { _parc->_rec->getReplyBuilder()->reset(); } -Future<void> ParseAndRunCommand::RunAndRetry::_run() try { - auto opCtx = _parc->_rec->getOpCtx(); - auto replyBuilder = _parc->_rec->getReplyBuilder(); - - execCommandClient(opCtx, _parc->_invocation.get(), _parc->_rec->getRequest(), replyBuilder); - - auto responseBuilder = replyBuilder->getBodyBuilder(); - if (auto txnRouter = TransactionRouter::get(opCtx)) { - txnRouter.appendRecoveryToken(&responseBuilder); - } - - return Future<void>::makeReady(Status::OK()); -} catch (const DBException& ex) { - return ex.toStatus(); +Future<void> ParseAndRunCommand::RunAndRetry::_run() { + auto ecc = std::make_shared<ExecCommandClient>(_parc->_rec, _parc->_invocation); + return ecc->run().then([rec = _parc->_rec] { + auto opCtx = rec->getOpCtx(); + auto responseBuilder = rec->getReplyBuilder()->getBodyBuilder(); + if (auto txnRouter = TransactionRouter::get(opCtx)) { + txnRouter.appendRecoveryToken(&responseBuilder); + } + }); } void ParseAndRunCommand::RunAndRetry::_checkRetryForTransaction(Status& status) { |