summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAmirsaman Memaripour <amirsaman.memaripour@mongodb.com>2020-11-05 02:21:26 +0000
committerAmirsaman Memaripour <amirsaman.memaripour@mongodb.com>2020-11-05 02:21:26 +0000
commitf4373b85f0f394c485cbff447312c567924ba5e3 (patch)
tree59d7311cf50bac0590522722ec1fe78f03230197
parent51067e095275ba5a2454be993c83a93e526e18ce (diff)
downloadmongo-f4373b85f0f394c485cbff447312c567924ba5e3.tar.gz
SERVER-51690 Futurize clientCommand to support async command execution
-rw-r--r--src/mongo/s/commands/strategy.cpp265
-rw-r--r--src/mongo/s/commands/strategy.h2
-rw-r--r--src/mongo/s/service_entry_point_mongos.cpp2
3 files changed, 154 insertions, 115 deletions
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp
index eb386f3763b..d294ee93cf7 100644
--- a/src/mongo/s/commands/strategy.cpp
+++ b/src/mongo/s/commands/strategy.cpp
@@ -308,11 +308,12 @@ MONGO_FAIL_POINT_DEFINE(doNotRefreshShardsOnRetargettingError);
* Executes the command for the given request, and appends the result to replyBuilder
* and error labels, if any, to errorBuilder.
*/
-void runCommand(OperationContext* opCtx,
- const OpMsgRequest& request,
- const Message& m,
- rpc::ReplyBuilderInterface* replyBuilder,
- BSONObjBuilder* errorBuilder) {
+Future<void> runCommand(std::shared_ptr<RequestExecutionContext> rec,
+ std::shared_ptr<BSONObjBuilder> errorBuilder) try {
+ auto opCtx = rec->getOpCtx();
+ const auto& request = rec->getRequest();
+ const auto& m = rec->getMessage();
+ auto replyBuilder = rec->getReplyBuilder();
auto const opType = m.operation();
auto const commandName = request.getCommandName();
auto const command = CommandHelpers::findCommand(commandName);
@@ -323,7 +324,7 @@ void runCommand(OperationContext* opCtx,
{ErrorCodes::CommandNotFound, str::stream() << "no such cmd: " << commandName});
globalCommandRegistry()->incrementUnknownCommands();
appendRequiredFieldsToResponse(opCtx, &builder);
- return;
+ return Status::OK();
}
opCtx->setExhaust(OpMsg::isFlagSet(m, OpMsg::kExhaustSupported));
@@ -400,7 +401,7 @@ void runCommand(OperationContext* opCtx,
if (!readConcernParseStatus.isOK()) {
auto builder = replyBuilder->getBodyBuilder();
CommandHelpers::appendCommandStatusNoThrow(builder, readConcernParseStatus);
- return;
+ return Status::OK();
}
auto& apiParams = APIParameters::get(opCtx);
@@ -451,7 +452,7 @@ void runCommand(OperationContext* opCtx,
CommandHelpers::appendCommandStatusNoThrow(
responseBuilder,
Status(ErrorCodes::InvalidOptions, "Command does not support writeConcern"));
- return;
+ return Status::OK();
}
bool clientSuppliedWriteConcern = !wc.usedDefault;
@@ -489,7 +490,7 @@ void runCommand(OperationContext* opCtx,
Status{ErrorCodes::InvalidOptions,
"writeConcern provenance must be unset or \"{}\""_format(
ReadWriteConcernProvenance::kClientSupplied)});
- return;
+ return Status::OK();
}
// If the client didn't provide a provenance, then an appropriate value needs to be
@@ -553,7 +554,7 @@ void runCommand(OperationContext* opCtx,
Status{ErrorCodes::InvalidOptions,
"readConcern provenance must be unset or \"{}\""_format(
ReadWriteConcernProvenance::kClientSupplied)});
- return;
+ return Status::OK();
}
// If the client didn't provide a provenance, then an appropriate value needs to be
@@ -588,7 +589,7 @@ void runCommand(OperationContext* opCtx,
{ErrorCodes::InvalidOptions,
"The readConcern level must be either 'local' (default), 'majority' or "
"'snapshot' in order to run in a transaction"});
- return;
+ return Status::OK();
}
if (readConcernArgs.getArgsOpTime()) {
auto responseBuilder = replyBuilder->getBodyBuilder();
@@ -598,7 +599,7 @@ void runCommand(OperationContext* opCtx,
str::stream()
<< "The readConcern cannot specify '"
<< repl::ReadConcernArgs::kAfterOpTimeFieldName << "' in a transaction"});
- return;
+ return Status::OK();
}
}
@@ -618,7 +619,7 @@ void runCommand(OperationContext* opCtx,
readConcernSupport.readConcernSupport.withContext(
str::stream() << "Command " << invocation->definition()->getName()
<< " does not support " << readConcernArgs.toString()));
- return;
+ return Status::OK();
}
}
@@ -681,7 +682,7 @@ void runCommand(OperationContext* opCtx,
txnRouter.appendRecoveryToken(&responseBuilder);
}
- return;
+ return Status::OK();
} catch (ShardInvalidatedForTargetingException& ex) {
auto catalogCache = Grid::get(opCtx)->catalogCache();
catalogCache->setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, true);
@@ -886,26 +887,9 @@ void runCommand(OperationContext* opCtx,
errorBuilder->appendElements(errorLabels);
throw;
}
-}
-
-/**
- * Attaches the topology version to the response.
- */
-void attachTopologyVersionDuringShutdown(OperationContext* opCtx,
- const DBException& ex,
- BSONObjBuilder* errorBuilder) {
- // Only attach the topology version if the mongos is in quiesce mode. If the mongos is in
- // quiesce mode, this shutdown error is due to mongos rather than a shard.
- auto code = ex.code();
- if (code && ErrorCodes::isA<ErrorCategory::ShutdownError>(code)) {
- if (auto mongosTopCoord = MongosTopologyCoordinator::get(opCtx);
- mongosTopCoord && mongosTopCoord->inQuiesceMode()) {
- // Append the topology version to the response.
- const auto topologyVersion = mongosTopCoord->getTopologyVersion();
- BSONObjBuilder topologyVersionBuilder(errorBuilder->subobjStart("topologyVersion"));
- topologyVersion.serialize(&topologyVersionBuilder);
- }
- }
+ return Status::OK();
+} catch (const DBException& e) {
+ return e.toStatus();
}
} // namespace
@@ -1028,79 +1012,120 @@ DbResponse Strategy::queryOp(OperationContext* opCtx, const NamespaceString& nss
cursorId)};
}
-Future<DbResponse> Strategy::clientCommand(std::shared_ptr<RequestExecutionContext> rec) try {
- auto opCtx = rec->getOpCtx();
- const Message& m = rec->getMessage();
- auto reply = rpc::makeReplyBuilder(rpc::protocolForMessage(m));
- BSONObjBuilder errorBuilder;
+// Maintains the state required to execute client commands, and provides the interface to construct
+// a future-chain that runs the command against the database.
+class ClientCommand final : public std::enable_shared_from_this<ClientCommand> {
+public:
+ ClientCommand(ClientCommand&&) = delete;
+ ClientCommand(const ClientCommand&) = delete;
- bool propagateException = false;
+ explicit ClientCommand(std::shared_ptr<RequestExecutionContext> rec)
+ : _rec(std::move(rec)), _errorBuilder(std::make_shared<BSONObjBuilder>()) {}
- try {
- // Parse.
- OpMsgRequest request = [&] {
- try {
- return rpc::opMsgRequestFromAnyProtocol(m);
- } catch (const DBException& ex) {
- // If this error needs to fail the connection, propagate it out.
- if (ErrorCodes::isConnectionFatalMessageParseError(ex.code()))
- propagateException = true;
-
- LOGV2_DEBUG(22769,
- 1,
- "Exception thrown while parsing command {error}",
- "Exception thrown while parsing command",
- "error"_attr = redact(ex));
- throw;
- }
- }();
+ // Returns the future-chain that produces the response by parsing and executing the command.
+ Future<DbResponse> run();
- // Execute.
- std::string db = request.getDatabase().toString();
- try {
- LOGV2_DEBUG(22770,
- 3,
- "Command begin db: {db} msg id: {headerId}",
- "Command begin",
- "db"_attr = db,
- "headerId"_attr = m.header().getId());
- runCommand(opCtx, request, m, reply.get(), &errorBuilder);
+private:
+ void _parse();
+
+ Future<void> _execute();
+
+ // Handler for exceptions thrown during parsing and executing the command.
+ Future<void> _handleException(Status);
+
+ // Extracts the command response from the replyBuilder.
+ DbResponse _produceResponse();
+
+ const std::shared_ptr<RequestExecutionContext> _rec;
+ const std::shared_ptr<BSONObjBuilder> _errorBuilder;
+
+ bool _propagateException = false;
+};
+
+void ClientCommand::_parse() try {
+ const auto& msg = _rec->getMessage();
+ _rec->setReplyBuilder(rpc::makeReplyBuilder(rpc::protocolForMessage(msg)));
+ _rec->setRequest(rpc::opMsgRequestFromAnyProtocol(msg));
+} catch (const DBException& ex) {
+ // If this error needs to fail the connection, propagate it out.
+ if (ErrorCodes::isConnectionFatalMessageParseError(ex.code()))
+ _propagateException = true;
+
+ LOGV2_DEBUG(22769,
+ 1,
+ "Exception thrown while parsing command {error}",
+ "Exception thrown while parsing command",
+ "error"_attr = redact(ex));
+ throw;
+}
+
+Future<void> ClientCommand::_execute() {
+ LOGV2_DEBUG(22770,
+ 3,
+ "Command begin db: {db} msg id: {headerId}",
+ "Command begin",
+ "db"_attr = _rec->getRequest().getDatabase().toString(),
+ "headerId"_attr = _rec->getMessage().header().getId());
+
+ return runCommand(_rec, _errorBuilder)
+ .then([this, anchor = shared_from_this()] {
LOGV2_DEBUG(22771,
3,
"Command end db: {db} msg id: {headerId}",
"Command end",
- "db"_attr = db,
- "headerId"_attr = m.header().getId());
- } catch (const DBException& ex) {
+ "db"_attr = _rec->getRequest().getDatabase().toString(),
+ "headerId"_attr = _rec->getMessage().header().getId());
+ })
+ .tapError([this, anchor = shared_from_this()](Status status) {
LOGV2_DEBUG(
22772,
1,
"Exception thrown while processing command on {db} msg id: {headerId} {error}",
"Exception thrown while processing command",
- "db"_attr = db,
- "headerId"_attr = m.header().getId(),
- "error"_attr = redact(ex));
+ "db"_attr = _rec->getRequest().getDatabase().toString(),
+ "headerId"_attr = _rec->getMessage().header().getId(),
+ "error"_attr = redact(status));
// Record the exception in CurOp.
- CurOp::get(opCtx)->debug().errInfo = ex.toStatus();
- throw;
- }
- } catch (const DBException& ex) {
- if (propagateException) {
- throw;
- }
+ CurOp::get(_rec->getOpCtx())->debug().errInfo = std::move(status);
+ });
+}
- reply->reset();
- auto bob = reply->getBodyBuilder();
- CommandHelpers::appendCommandStatusNoThrow(bob, ex.toStatus());
- appendRequiredFieldsToResponse(opCtx, &bob);
+Future<void> ClientCommand::_handleException(Status status) {
+ if (_propagateException) {
+ return status;
+ }
- attachTopologyVersionDuringShutdown(opCtx, ex, &errorBuilder);
- bob.appendElements(errorBuilder.obj());
+ auto opCtx = _rec->getOpCtx();
+ auto reply = _rec->getReplyBuilder();
+
+ reply->reset();
+ auto bob = reply->getBodyBuilder();
+ CommandHelpers::appendCommandStatusNoThrow(bob, status);
+ appendRequiredFieldsToResponse(opCtx, &bob);
+
+ // Only attach the topology version to the response if mongos is in quiesce mode. If mongos is
+ // in quiesce mode, this shutdown error is due to mongos rather than a shard.
+ if (ErrorCodes::isA<ErrorCategory::ShutdownError>(status)) {
+ if (auto mongosTopCoord = MongosTopologyCoordinator::get(opCtx);
+ mongosTopCoord && mongosTopCoord->inQuiesceMode()) {
+ // Append the topology version to the response.
+ const auto topologyVersion = mongosTopCoord->getTopologyVersion();
+ BSONObjBuilder topologyVersionBuilder(_errorBuilder->subobjStart("topologyVersion"));
+ topologyVersion.serialize(&topologyVersionBuilder);
+ }
}
+ bob.appendElements(_errorBuilder->obj());
+ return Status::OK();
+}
+
+DbResponse ClientCommand::_produceResponse() {
+ const auto& m = _rec->getMessage();
+ auto reply = _rec->getReplyBuilder();
+
if (OpMsg::isFlagSet(m, OpMsg::kMoreToCome)) {
- return DbResponse{}; // Don't reply.
+ return {}; // Don't reply.
}
DbResponse dbResponse;
@@ -1114,8 +1139,24 @@ Future<DbResponse> Strategy::clientCommand(std::shared_ptr<RequestExecutionConte
dbResponse.response = reply->done();
return dbResponse;
-} catch (const DBException& e) {
- return e.toStatus();
+}
+
+Future<DbResponse> ClientCommand::run() {
+ auto pf = makePromiseFuture<void>();
+ auto future = std::move(pf.future)
+ .then([this, anchor = shared_from_this()] { _parse(); })
+ .then([this, anchor = shared_from_this()] { return _execute(); })
+ .onError([this, anchor = shared_from_this()](Status status) {
+ return _handleException(std::move(status));
+ })
+ .then([this, anchor = shared_from_this()] { return _produceResponse(); });
+ pf.promise.emplaceValue();
+ return future;
+}
+
+Future<DbResponse> Strategy::clientCommand(std::shared_ptr<RequestExecutionContext> rec) {
+ auto instance = std::make_shared<ClientCommand>(std::move(rec));
+ return instance->run();
}
DbResponse Strategy::getMore(OperationContext* opCtx, const NamespaceString& nss, DbMessage* dbm) {
@@ -1238,29 +1279,27 @@ void Strategy::killCursors(OperationContext* opCtx, DbMessage* dbm) {
}
}
-void Strategy::writeOp(OperationContext* opCtx, DbMessage* dbm) {
- const auto& msg = dbm->msg();
- rpc::OpMsgReplyBuilder reply;
- BSONObjBuilder errorBuilder;
- runCommand(opCtx,
- [&]() {
- switch (msg.operation()) {
- case dbInsert: {
- return InsertOp::parseLegacy(msg).serialize({});
- }
- case dbUpdate: {
- return UpdateOp::parseLegacy(msg).serialize({});
- }
- case dbDelete: {
- return DeleteOp::parseLegacy(msg).serialize({});
- }
- default:
- MONGO_UNREACHABLE;
- }
- }(),
- msg,
- &reply,
- &errorBuilder); // built objects are ignored
+void Strategy::writeOp(std::shared_ptr<RequestExecutionContext> rec) {
+ rec->setRequest([msg = rec->getMessage()]() {
+ switch (msg.operation()) {
+ case dbInsert: {
+ return InsertOp::parseLegacy(msg).serialize({});
+ }
+ case dbUpdate: {
+ return UpdateOp::parseLegacy(msg).serialize({});
+ }
+ case dbDelete: {
+ return DeleteOp::parseLegacy(msg).serialize({});
+ }
+ default:
+ MONGO_UNREACHABLE;
+ }
+ }());
+
+ rec->setReplyBuilder(std::make_unique<rpc::OpMsgReplyBuilder>());
+ runCommand(std::move(rec),
+ std::make_shared<BSONObjBuilder>()) // built objects are ignored
+ .get();
}
void Strategy::explainFind(OperationContext* opCtx,
diff --git a/src/mongo/s/commands/strategy.h b/src/mongo/s/commands/strategy.h
index 85fafda1acb..95f3f8c9449 100644
--- a/src/mongo/s/commands/strategy.h
+++ b/src/mongo/s/commands/strategy.h
@@ -75,7 +75,7 @@ public:
* with the result from the operation. Doesn't send any response back and does not throw on
* errors.
*/
- static void writeOp(OperationContext* opCtx, DbMessage* dbm);
+ static void writeOp(std::shared_ptr<RequestExecutionContext> rec);
/**
* Executes a command from either OP_QUERY or OP_MSG wire protocols.
diff --git a/src/mongo/s/service_entry_point_mongos.cpp b/src/mongo/s/service_entry_point_mongos.cpp
index 870d58968aa..e731dc9589f 100644
--- a/src/mongo/s/service_entry_point_mongos.cpp
+++ b/src/mongo/s/service_entry_point_mongos.cpp
@@ -236,7 +236,7 @@ struct KillCursorsOpRunner final : public OpRunner {
struct WriteOpRunner final : public OpRunner {
using OpRunner::OpRunner;
DbResponse runOperation() override {
- Strategy::writeOp(hr->rec->getOpCtx(), &hr->rec->getDbMessage()); // No Response.
+ Strategy::writeOp(hr->rec); // No Response.
return {};
}
};