summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAmirsaman Memaripour <amirsaman.memaripour@mongodb.com>2020-11-18 00:38:58 +0000
committerAmirsaman Memaripour <amirsaman.memaripour@mongodb.com>2020-11-18 00:38:58 +0000
commit63338e0dd6aab76382ddc1034d0b6b2a885f06ad (patch)
treed0ea210eabe8091b06f3eb24574f0a7bdf2380a0
parent13bb35d34dc20273d342e52360b89e6f510d1747 (diff)
downloadmongo-fixed_thread_pool_handshaking.tar.gz
SERVER-51690 Futurize and refactor Mongos execCommandClientfixed_thread_pool_handshaking
-rw-r--r--src/mongo/s/commands/strategy.cpp267
1 files changed, 155 insertions, 112 deletions
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp
index fdca7bf7ea6..8954dbaf3c9 100644
--- a/src/mongo/s/commands/strategy.cpp
+++ b/src/mongo/s/commands/strategy.cpp
@@ -157,30 +157,28 @@ void appendRequiredFieldsToResponse(OperationContext* opCtx, BSONObjBuilder* res
/**
* Invokes the given command and aborts the transaction on any non-retryable errors.
*/
-void invokeInTransactionRouter(OperationContext* opCtx,
- const OpMsgRequest& request,
- CommandInvocation* invocation,
- rpc::ReplyBuilderInterface* result) {
+Future<void> invokeInTransactionRouter(std::shared_ptr<RequestExecutionContext> rec,
+ std::shared_ptr<CommandInvocation> invocation) {
+ auto opCtx = rec->getOpCtx();
auto txnRouter = TransactionRouter::get(opCtx);
invariant(txnRouter);
// No-op if the transaction is not running with snapshot read concern.
txnRouter.setDefaultAtClusterTime(opCtx);
- try {
- CommandHelpers::runCommandInvocation(opCtx, request, invocation, result);
- } catch (const DBException& e) {
- if (ErrorCodes::isSnapshotError(e.code()) ||
- ErrorCodes::isNeedRetargettingError(e.code()) ||
- e.code() == ErrorCodes::ShardInvalidatedForTargeting ||
- e.code() == ErrorCodes::StaleDbVersion) {
- // Don't abort on possibly retryable errors.
- throw;
- }
+ return CommandHelpers::runCommandInvocationAsync(rec, std::move(invocation))
+ .tapError([rec = std::move(rec)](Status status) {
+ if (auto code = status.code(); ErrorCodes::isSnapshotError(code) ||
+ ErrorCodes::isNeedRetargettingError(code) ||
+ code == ErrorCodes::ShardInvalidatedForTargeting ||
+ code == ErrorCodes::StaleDbVersion) {
+ // Don't abort on possibly retryable errors.
+ return;
+ }
- txnRouter.implicitlyAbortTransaction(opCtx, e.toStatus());
- throw;
- }
+ auto opCtx = rec->getOpCtx();
+ TransactionRouter::get(opCtx).implicitlyAbortTransaction(opCtx, status);
+ });
}
/**
@@ -194,99 +192,133 @@ void addContextForTransactionAbortingError(StringData txnIdAsString,
txnIdAsString, latestStmtId, reason));
}
-void execCommandClient(OperationContext* opCtx,
- CommandInvocation* invocation,
- const OpMsgRequest& request,
- rpc::ReplyBuilderInterface* result) {
- [&] {
- const Command* c = invocation->definition();
-
- const auto dbname = request.getDatabase();
- uassert(ErrorCodes::IllegalOperation,
- "Can't use 'local' database through mongos",
- dbname != NamespaceString::kLocalDb);
- uassert(
- ErrorCodes::InvalidNamespace,
- str::stream() << "Invalid database name: '" << dbname << "'",
- NamespaceString::validDBName(dbname, NamespaceString::DollarInDbNameBehavior::Allow));
+// Factory class to construct a future-chain that executes the invocation against the database.
+class ExecCommandClient final : public std::enable_shared_from_this<ExecCommandClient> {
+public:
+ ExecCommandClient(ExecCommandClient&&) = delete;
+ ExecCommandClient(const ExecCommandClient&) = delete;
- StringMap<int> topLevelFields;
- for (auto&& element : request.body) {
- StringData fieldName = element.fieldNameStringData();
- if (fieldName == "help" && element.type() == Bool && element.Bool()) {
- std::stringstream help;
- help << "help for: " << c->getName() << " " << c->help();
- auto body = result->getBodyBuilder();
- body.append("help", help.str());
- CommandHelpers::appendSimpleCommandStatus(body, true, "");
- return;
- }
+ ExecCommandClient(std::shared_ptr<RequestExecutionContext> rec,
+ std::shared_ptr<CommandInvocation> invocation)
+ : _rec(std::move(rec)), _invocation(std::move(invocation)) {}
- uassert(ErrorCodes::FailedToParse,
- str::stream() << "Parsed command object contains duplicate top level key: "
- << fieldName,
- topLevelFields[fieldName]++ == 0);
- }
+ Future<void> run();
- try {
- invocation->checkAuthorization(opCtx, request);
- } catch (const DBException& e) {
+private:
+ // Prepare the environment for running the invocation (e.g., checking authorization).
+ Status _prologue();
+
+ // Returns a future that runs the command invocation.
+ Future<void> _run();
+
+ // Any logic that must be done post command execution, unless an exception is thrown.
+ void _epilogue();
+
+ // Runs at the end of the future-chain returned by `run()` unless an exception, other than
+ // `ErrorCodes::SkipCommandExecution`, is thrown earlier.
+ void _onCompletion();
+
+ const std::shared_ptr<RequestExecutionContext> _rec;
+ const std::shared_ptr<CommandInvocation> _invocation;
+};
+
+Status ExecCommandClient::_prologue() {
+ auto opCtx = _rec->getOpCtx();
+ auto result = _rec->getReplyBuilder();
+ const auto& request = _rec->getRequest();
+ const Command* c = _invocation->definition();
+
+ const auto dbname = request.getDatabase();
+ uassert(ErrorCodes::IllegalOperation,
+ "Can't use 'local' database through mongos",
+ dbname != NamespaceString::kLocalDb);
+ uassert(ErrorCodes::InvalidNamespace,
+ "Invalid database name: '{}'"_format(dbname),
+ NamespaceString::validDBName(dbname, NamespaceString::DollarInDbNameBehavior::Allow));
+
+ StringMap<int> topLevelFields;
+ for (auto&& element : request.body) {
+ StringData fieldName = element.fieldNameStringData();
+ if (fieldName == "help" && element.type() == Bool && element.Bool()) {
auto body = result->getBodyBuilder();
- CommandHelpers::appendCommandStatusNoThrow(body, e.toStatus());
- return;
+ body.append("help", "help for: {} {}"_format(c->getName(), c->help()));
+ CommandHelpers::appendSimpleCommandStatus(body, true, "");
+ return {ErrorCodes::SkipCommandExecution, "Already served help command"};
}
- // attach tracking
- rpc::TrackingMetadata trackingMetadata;
- trackingMetadata.initWithOperName(c->getName());
- rpc::TrackingMetadata::get(opCtx) = trackingMetadata;
+ uassert(ErrorCodes::FailedToParse,
+ "Parsed command object contains duplicate top level key: {}"_format(fieldName),
+ topLevelFields[fieldName]++ == 0);
+ }
- // Extract and process metadata from the command request body.
- ReadPreferenceSetting::get(opCtx) =
- uassertStatusOK(ReadPreferenceSetting::fromContainingBSON(request.body));
- VectorClock::get(opCtx)->gossipIn(opCtx, request.body, !c->requiresAuth());
+ try {
+ _invocation->checkAuthorization(opCtx, request);
+ } catch (const DBException& e) {
+ auto body = result->getBodyBuilder();
+ CommandHelpers::appendCommandStatusNoThrow(body, e.toStatus());
+ return {ErrorCodes::SkipCommandExecution, "Failed to check authorization"};
+ }
- auto txnRouter = TransactionRouter::get(opCtx);
- if (txnRouter) {
- invokeInTransactionRouter(opCtx, request, invocation, result);
- } else {
- CommandHelpers::runCommandInvocation(opCtx, request, invocation, result);
- }
+ // attach tracking
+ rpc::TrackingMetadata trackingMetadata;
+ trackingMetadata.initWithOperName(c->getName());
+ rpc::TrackingMetadata::get(opCtx) = trackingMetadata;
- if (invocation->supportsWriteConcern()) {
- failCommand.executeIf(
- [&](const BSONObj& data) {
- result->getBodyBuilder().append(data["writeConcernError"]);
- if (data.hasField(kErrorLabelsFieldName) &&
- data[kErrorLabelsFieldName].type() == Array) {
- auto labels = data.getObjectField(kErrorLabelsFieldName).getOwned();
- if (!labels.isEmpty()) {
- result->getBodyBuilder().append(kErrorLabelsFieldName,
- BSONArray(labels));
- }
+ // Extract and process metadata from the command request body.
+ ReadPreferenceSetting::get(opCtx) =
+ uassertStatusOK(ReadPreferenceSetting::fromContainingBSON(request.body));
+ VectorClock::get(opCtx)->gossipIn(opCtx, request.body, !c->requiresAuth());
+
+ return Status::OK();
+}
+
+Future<void> ExecCommandClient::_run() {
+ OperationContext* opCtx = _rec->getOpCtx();
+ if (auto txnRouter = TransactionRouter::get(opCtx); txnRouter) {
+ return invokeInTransactionRouter(_rec, _invocation);
+ } else {
+ return CommandHelpers::runCommandInvocationAsync(_rec, _invocation);
+ }
+}
+
+void ExecCommandClient::_epilogue() {
+ auto opCtx = _rec->getOpCtx();
+ auto result = _rec->getReplyBuilder();
+ if (_invocation->supportsWriteConcern()) {
+ failCommand.executeIf(
+ [&](const BSONObj& data) {
+ result->getBodyBuilder().append(data["writeConcernError"]);
+ if (data.hasField(kErrorLabelsFieldName) &&
+ data[kErrorLabelsFieldName].type() == Array) {
+ auto labels = data.getObjectField(kErrorLabelsFieldName).getOwned();
+ if (!labels.isEmpty()) {
+ result->getBodyBuilder().append(kErrorLabelsFieldName, BSONArray(labels));
}
- },
- [&](const BSONObj& data) {
- return CommandHelpers::shouldActivateFailCommandFailPoint(
- data, invocation, opCtx->getClient()) &&
- data.hasField("writeConcernError");
- });
- }
+ }
+ },
+ [&](const BSONObj& data) {
+ return CommandHelpers::shouldActivateFailCommandFailPoint(
+ data, _invocation.get(), opCtx->getClient()) &&
+ data.hasField("writeConcernError");
+ });
+ }
- auto body = result->getBodyBuilder();
+ auto body = result->getBodyBuilder();
- bool ok = CommandHelpers::extractOrAppendOk(body);
- if (!ok) {
- c->incrementCommandsFailed();
+ if (bool ok = CommandHelpers::extractOrAppendOk(body); !ok) {
+ const Command* c = _invocation->definition();
+ c->incrementCommandsFailed();
- if (auto txnRouter = TransactionRouter::get(opCtx)) {
- txnRouter.implicitlyAbortTransaction(opCtx,
- getStatusFromCommandResult(body.asTempObj()));
- }
+ if (auto txnRouter = TransactionRouter::get(opCtx)) {
+ txnRouter.implicitlyAbortTransaction(opCtx,
+ getStatusFromCommandResult(body.asTempObj()));
}
- }();
+ }
+}
- auto body = result->getBodyBuilder();
+void ExecCommandClient::_onCompletion() {
+ auto opCtx = _rec->getOpCtx();
+ auto body = _rec->getReplyBuilder()->getBodyBuilder();
appendRequiredFieldsToResponse(opCtx, &body);
auto seCtx = transport::ServiceExecutorContext::get(opCtx->getClient());
@@ -295,13 +327,29 @@ void execCommandClient(OperationContext* opCtx,
return;
}
- if (!invocation->isSafeForBorrowedThreads()) {
- // If the last command wasn't safe for a borrowed thread, then let's move
- // off of it.
+ if (!_invocation->isSafeForBorrowedThreads()) {
+ // If the last command wasn't safe for a borrowed thread, then let's move off of it.
seCtx->setThreadingModel(transport::ServiceExecutor::ThreadingModel::kDedicated);
}
}
+Future<void> ExecCommandClient::run() {
+ auto pf = makePromiseFuture<void>();
+ auto future = std::move(pf.future)
+ .then([this, anchor = shared_from_this()] { return _prologue(); })
+ .then([this, anchor = shared_from_this()] { return _run(); })
+ .then([this, anchor = shared_from_this()] { _epilogue(); })
+ .onCompletion([this, anchor = shared_from_this()](Status status) {
+ if (!status.isOK() && status.code() != ErrorCodes::SkipCommandExecution)
+ return status; // Execution was interrupted due to an error.
+
+ _onCompletion();
+ return Status::OK();
+ });
+ pf.promise.emplaceValue();
+ return future;
+}
+
MONGO_FAIL_POINT_DEFINE(doNotRefreshShardsOnRetargettingError);
/**
@@ -772,20 +820,15 @@ void ParseAndRunCommand::RunAndRetry::_setup() {
_parc->_rec->getReplyBuilder()->reset();
}
-Future<void> ParseAndRunCommand::RunAndRetry::_run() try {
- auto opCtx = _parc->_rec->getOpCtx();
- auto replyBuilder = _parc->_rec->getReplyBuilder();
-
- execCommandClient(opCtx, _parc->_invocation.get(), _parc->_rec->getRequest(), replyBuilder);
-
- auto responseBuilder = replyBuilder->getBodyBuilder();
- if (auto txnRouter = TransactionRouter::get(opCtx)) {
- txnRouter.appendRecoveryToken(&responseBuilder);
- }
-
- return Future<void>::makeReady(Status::OK());
-} catch (const DBException& ex) {
- return ex.toStatus();
+Future<void> ParseAndRunCommand::RunAndRetry::_run() {
+ auto ecc = std::make_shared<ExecCommandClient>(_parc->_rec, _parc->_invocation);
+ return ecc->run().then([rec = _parc->_rec] {
+ auto opCtx = rec->getOpCtx();
+ auto responseBuilder = rec->getReplyBuilder()->getBodyBuilder();
+ if (auto txnRouter = TransactionRouter::get(opCtx)) {
+ txnRouter.appendRecoveryToken(&responseBuilder);
+ }
+ });
}
void ParseAndRunCommand::RunAndRetry::_checkRetryForTransaction(Status& status) {