summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAmirsaman Memaripour <amirsaman.memaripour@mongodb.com>2020-09-25 18:16:01 +0000
committerAmirsaman Memaripour <amirsaman.memaripour@mongodb.com>2020-09-25 18:16:01 +0000
commit4c283d5c34ba9a9ba2ede5fa066dcdd9ab337651 (patch)
tree5f0ab1791145cba843a6dd26aa36d313bc64a141
parentefb3f7689e9f6c54f039dab068c4c279cf87ac99 (diff)
downloadmongo-4c283d5c34ba9a9ba2ede5fa066dcdd9ab337651.tar.gz
SERVER-49107 Futurize and refactor receivedCommands()
-rw-r--r--src/mongo/db/service_entry_point_common.cpp256
1 files changed, 141 insertions, 115 deletions
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index f1569ffb0fb..085595c229b 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -268,22 +268,23 @@ void generateLegacyQueryErrorResponse(const AssertionException& exception,
response->setData(bb.release());
}
-void registerError(OperationContext* opCtx, const DBException& exception) {
- LastError::get(opCtx->getClient()).setLastError(exception.code(), exception.reason());
- CurOp::get(opCtx)->debug().errInfo = exception.toStatus();
+void registerError(OperationContext* opCtx, const Status& status) {
+ LastError::get(opCtx->getClient()).setLastError(status.code(), status.reason());
+ CurOp::get(opCtx)->debug().errInfo = status;
}
void generateErrorResponse(OperationContext* opCtx,
rpc::ReplyBuilderInterface* replyBuilder,
- const DBException& exception,
+ const Status& status,
const BSONObj& replyMetadata,
BSONObj extraFields = {}) {
- registerError(opCtx, exception);
+ invariant(!status.isOK());
+ registerError(opCtx, status);
// We could have thrown an exception after setting fields in the builder,
// so we need to reset it to a clean state just to be sure.
replyBuilder->reset();
- replyBuilder->setCommandReply(exception.toStatus(), extraFields);
+ replyBuilder->setCommandReply(status, extraFields);
replyBuilder->getBodyBuilder().appendElements(replyMetadata);
}
@@ -1357,7 +1358,8 @@ void execCommandDatabase(OperationContext* opCtx,
ServiceEntryPointCommon::getRedactedCopyForLogging(command, request.body)),
"error"_attr = redact(e.toString()));
- generateErrorResponse(opCtx, replyBuilder, e, metadataBob.obj(), extraFieldsBuilder.obj());
+ generateErrorResponse(
+ opCtx, replyBuilder, e.toStatus(), metadataBob.obj(), extraFieldsBuilder.obj());
if (ErrorCodes::isA<ErrorCategory::CloseConnectionError>(e.code())) {
// Rethrow the exception to the top to signal that the client connection should be
@@ -1384,112 +1386,115 @@ void curOpCommandSetup(OperationContext* opCtx, const OpMsgRequest& request) {
curop->setNS_inlock(nss.ns());
}
-DbResponse receivedCommands(OperationContext* opCtx,
- const Message& message,
- const ServiceEntryPointCommon::Hooks& behaviors) {
- auto replyBuilder = rpc::makeReplyBuilder(rpc::protocolForMessage(message));
- OpMsgRequest request;
- Command* c = nullptr;
- [&] {
- try { // Parse.
- request = rpc::opMsgRequestFromAnyProtocol(message);
- } catch (const DBException& ex) {
- // If this error needs to fail the connection, propagate it out.
- if (ErrorCodes::isConnectionFatalMessageParseError(ex.code()))
- throw;
-
- BSONObjBuilder metadataBob;
- behaviors.appendReplyMetadataOnError(opCtx, &metadataBob);
-
- BSONObjBuilder extraFieldsBuilder;
- appendClusterAndOperationTime(
- opCtx, &extraFieldsBuilder, &metadataBob, LogicalTime::kUninitialized);
-
- // Otherwise, reply with the parse error. This is useful for cases where parsing fails
- // due to user-supplied input, such as the document too deep error. Since we failed
- // during parsing, we can't log anything about the command.
- LOGV2_DEBUG(21963,
- 1,
- "Assertion while parsing command: {error}",
- "Assertion while parsing command",
- "error"_attr = ex.toString());
- generateErrorResponse(
- opCtx, replyBuilder.get(), ex, metadataBob.obj(), extraFieldsBuilder.obj());
-
- return; // From lambda. Don't try executing if parsing failed.
- }
-
- try { // Execute.
- curOpCommandSetup(opCtx, request);
-
- // In the absence of a Command object, no redaction is possible. Therefore
- // to avoid displaying potentially sensitive information in the logs,
- // we restrict the log message to the name of the unrecognized command.
- // However, the complete command object will still be echoed to the client.
- if (!(c = CommandHelpers::findCommand(request.getCommandName()))) {
- globalCommandRegistry()->incrementUnknownCommands();
- std::string msg = str::stream()
- << "no such command: '" << request.getCommandName() << "'";
- LOGV2_DEBUG(21964,
- 2,
- "No such command: {command}",
- "Command not found in registry",
- "command"_attr = request.getCommandName());
- uasserted(ErrorCodes::CommandNotFound, str::stream() << msg);
- }
+Future<void> parseCommand(std::shared_ptr<HandleRequest::ExecutionContext> execContext) try {
+ execContext->setRequest(rpc::opMsgRequestFromAnyProtocol(execContext->getMessage()));
+ return Status::OK();
+} catch (const DBException& ex) {
+ // Need to set request as `makeCommandResponse` expects an empty request on failure.
+ execContext->setRequest({});
+
+ // Otherwise, reply with the parse error. This is useful for cases where parsing fails due to
+ // user-supplied input, such as the document too deep error. Since we failed during parsing, we
+ // can't log anything about the command.
+ LOGV2_DEBUG(21963,
+ 1,
+ "Assertion while parsing command: {error}",
+ "Assertion while parsing command",
+ "error"_attr = ex.toString());
- LOGV2_DEBUG(21965,
- 2,
- "Run command {db}.$cmd {commandArgs}",
- "About to run the command",
- "db"_attr = request.getDatabase(),
- "commandArgs"_attr = redact(
- ServiceEntryPointCommon::getRedactedCopyForLogging(c, request.body)));
+ return ex.toStatus();
+}
- {
- // Try to set this as early as possible, as soon as we have figured out the
- // command.
- stdx::lock_guard<Client> lk(*opCtx->getClient());
- CurOp::get(opCtx)->setLogicalOp_inlock(c->getLogicalOp());
- }
+Future<void> executeCommand(std::shared_ptr<HandleRequest::ExecutionContext> execContext) {
+ auto [past, present] = makePromiseFuture<void>();
+ auto future =
+ std::move(present)
+ .then([execContext]() -> Future<void> {
+ // Prepare environment for command execution (e.g., find command object in registry)
+ auto opCtx = execContext->getOpCtx();
+ auto& request = execContext->getRequest();
+ curOpCommandSetup(opCtx, request);
+
+ // In the absence of a Command object, no redaction is possible. Therefore to avoid
+ // displaying potentially sensitive information in the logs, we restrict the log
+ // message to the name of the unrecognized command. However, the complete command
+ // object will still be echoed to the client.
+ if (execContext->setCommand(CommandHelpers::findCommand(request.getCommandName()));
+ !execContext->getCommand()) {
+ globalCommandRegistry()->incrementUnknownCommands();
+ LOGV2_DEBUG(21964,
+ 2,
+ "No such command: {command}",
+ "Command not found in registry",
+ "command"_attr = request.getCommandName());
+ return Status(ErrorCodes::CommandNotFound,
+ fmt::format("no such command: '{}'", request.getCommandName()));
+ }
- opCtx->setExhaust(OpMsg::isFlagSet(message, OpMsg::kExhaustSupported));
+ Command* c = execContext->getCommand();
+ LOGV2_DEBUG(
+ 21965,
+ 2,
+ "Run command {db}.$cmd {commandArgs}",
+ "About to run the command",
+ "db"_attr = request.getDatabase(),
+ "commandArgs"_attr = redact(
+ ServiceEntryPointCommon::getRedactedCopyForLogging(c, request.body)));
- const auto session = opCtx->getClient()->session();
- if (session) {
- if (!opCtx->isExhaust() || c->getName() != "hello"_sd) {
- InExhaustIsMaster::get(session.get())->setInExhaustIsMaster(false);
+ {
+ // Try to set this as early as possible, as soon as we have figured out the
+ // command.
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ CurOp::get(opCtx)->setLogicalOp_inlock(c->getLogicalOp());
}
- }
- execCommandDatabase(opCtx, c, request, replyBuilder.get(), behaviors);
- } catch (const DBException& ex) {
- BSONObjBuilder metadataBob;
- behaviors.appendReplyMetadataOnError(opCtx, &metadataBob);
+ opCtx->setExhaust(
+ OpMsg::isFlagSet(execContext->getMessage(), OpMsg::kExhaustSupported));
- BSONObjBuilder extraFieldsBuilder;
- appendClusterAndOperationTime(
- opCtx, &extraFieldsBuilder, &metadataBob, LogicalTime::kUninitialized);
+ const auto session = opCtx->getClient()->session();
+ if (session) {
+ if (!opCtx->isExhaust() || c->getName() != "hello"_sd) {
+ InExhaustIsMaster::get(session.get())->setInExhaustIsMaster(false);
+ }
+ }
- LOGV2_DEBUG(21966,
- 1,
- "Assertion while executing command '{command}' on database '{db}': {error}",
- "Assertion while executing command",
- "command"_attr = request.getCommandName(),
- "db"_attr = request.getDatabase(),
- "error"_attr = ex.toString());
+ // Hello should take kMaxAwaitTimeMs at most, log if it takes twice that.
+ if (c->getName() == "hello") {
+ execContext->slowMsOverride =
+ 2 * durationCount<Milliseconds>(SingleServerIsMasterMonitor::kMaxAwaitTime);
+ }
- generateErrorResponse(
- opCtx, replyBuilder.get(), ex, metadataBob.obj(), extraFieldsBuilder.obj());
+ return Status::OK();
+ })
+ .then([execContext]() -> Future<void> {
+ execCommandDatabase(execContext->getOpCtx(),
+ execContext->getCommand(),
+ execContext->getRequest(),
+ execContext->getReplyBuilder(),
+ *execContext->behaviors);
+ return Status::OK();
+ })
+ .tapError([execContext](Status status) {
+ LOGV2_DEBUG(
+ 21966,
+ 1,
+ "Assertion while executing command '{command}' on database '{db}': {error}",
+ "Assertion while executing command",
+ "command"_attr = execContext->getRequest().getCommandName(),
+ "db"_attr = execContext->getRequest().getDatabase(),
+ "error"_attr = status.toString());
+ });
+ past.emplaceValue();
+ return future;
+}
- if (ErrorCodes::isA<ErrorCategory::CloseConnectionError>(ex.code())) {
- // Rethrow the exception to the top to signal that the client connection should be
- // closed.
- throw;
- }
- }
- }();
+DbResponse makeCommandResponse(std::shared_ptr<HandleRequest::ExecutionContext> execContext) {
+ auto opCtx = execContext->getOpCtx();
+ const Message& message = execContext->getMessage();
+ OpMsgRequest request = execContext->getRequest();
+ const Command* c = execContext->getCommand();
+ auto replyBuilder = execContext->getReplyBuilder();
if (OpMsg::isFlagSet(message, OpMsg::kMoreToCome)) {
// Close the connection to get client to go through server selection again.
@@ -1522,6 +1527,38 @@ DbResponse receivedCommands(OperationContext* opCtx,
return dbResponse;
}
+Future<DbResponse> receivedCommands(std::shared_ptr<HandleRequest::ExecutionContext> execContext) {
+ execContext->setReplyBuilder(
+ rpc::makeReplyBuilder(rpc::protocolForMessage(execContext->getMessage())));
+ return parseCommand(execContext)
+ .then([execContext]() { return executeCommand(std::move(execContext)); })
+ .onError([execContext](Status status) {
+ if (ErrorCodes::isConnectionFatalMessageParseError(status.code())) {
+ // If this error needs to fail the connection, propagate it out.
+ internalAssert(status);
+ }
+
+ auto opCtx = execContext->getOpCtx();
+ BSONObjBuilder metadataBob;
+ execContext->behaviors->appendReplyMetadataOnError(opCtx, &metadataBob);
+
+ BSONObjBuilder extraFieldsBuilder;
+ appendClusterAndOperationTime(
+ opCtx, &extraFieldsBuilder, &metadataBob, LogicalTime::kUninitialized);
+
+ auto replyBuilder = execContext->getReplyBuilder();
+ generateErrorResponse(
+ opCtx, replyBuilder, status, metadataBob.obj(), extraFieldsBuilder.obj());
+
+ if (ErrorCodes::isA<ErrorCategory::CloseConnectionError>(status.code())) {
+ // Return the exception to the top to signal that the client connection should be
+ // closed.
+ internalAssert(status);
+ }
+ })
+ .then([execContext] { return makeCommandResponse(std::move(execContext)); });
+}
+
DbResponse receivedQuery(OperationContext* opCtx,
const NamespaceString& nss,
Client& c,
@@ -1737,19 +1774,8 @@ DbResponse receivedGetMore(OperationContext* opCtx,
struct CommandOpRunner : HandleRequest::OpRunner {
using HandleRequest::OpRunner::OpRunner;
- Future<DbResponse> run() override try {
- DbResponse r = receivedCommands(executionContext->getOpCtx(),
- executionContext->getMessage(),
- *executionContext->behaviors);
- // Hello should take kMaxAwaitTimeMs at most, log if it takes twice that.
- if (auto command = executionContext->currentOp().getCommand();
- command && (command->getName() == "hello")) {
- executionContext->slowMsOverride =
- 2 * durationCount<Milliseconds>(SingleServerIsMasterMonitor::kMaxAwaitTime);
- }
- return r;
- } catch (const DBException& ex) {
- return ex.toStatus();
+ Future<DbResponse> run() override {
+ return receivedCommands(executionContext);
}
};