diff options
author | Amirsaman Memaripour <amirsaman.memaripour@mongodb.com> | 2020-09-25 18:16:01 +0000 |
---|---|---|
committer | Amirsaman Memaripour <amirsaman.memaripour@mongodb.com> | 2020-09-25 18:16:01 +0000 |
commit | 4c283d5c34ba9a9ba2ede5fa066dcdd9ab337651 (patch) | |
tree | 5f0ab1791145cba843a6dd26aa36d313bc64a141 | |
parent | efb3f7689e9f6c54f039dab068c4c279cf87ac99 (diff) | |
download | mongo-4c283d5c34ba9a9ba2ede5fa066dcdd9ab337651.tar.gz |
SERVER-49107 Futurize and refactor receivedCommands()
-rw-r--r-- | src/mongo/db/service_entry_point_common.cpp | 256 |
1 files changed, 141 insertions, 115 deletions
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index f1569ffb0fb..085595c229b 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -268,22 +268,23 @@ void generateLegacyQueryErrorResponse(const AssertionException& exception, response->setData(bb.release()); } -void registerError(OperationContext* opCtx, const DBException& exception) { - LastError::get(opCtx->getClient()).setLastError(exception.code(), exception.reason()); - CurOp::get(opCtx)->debug().errInfo = exception.toStatus(); +void registerError(OperationContext* opCtx, const Status& status) { + LastError::get(opCtx->getClient()).setLastError(status.code(), status.reason()); + CurOp::get(opCtx)->debug().errInfo = status; } void generateErrorResponse(OperationContext* opCtx, rpc::ReplyBuilderInterface* replyBuilder, - const DBException& exception, + const Status& status, const BSONObj& replyMetadata, BSONObj extraFields = {}) { - registerError(opCtx, exception); + invariant(!status.isOK()); + registerError(opCtx, status); // We could have thrown an exception after setting fields in the builder, // so we need to reset it to a clean state just to be sure. replyBuilder->reset(); - replyBuilder->setCommandReply(exception.toStatus(), extraFields); + replyBuilder->setCommandReply(status, extraFields); replyBuilder->getBodyBuilder().appendElements(replyMetadata); } @@ -1357,7 +1358,8 @@ void execCommandDatabase(OperationContext* opCtx, ServiceEntryPointCommon::getRedactedCopyForLogging(command, request.body)), "error"_attr = redact(e.toString())); - generateErrorResponse(opCtx, replyBuilder, e, metadataBob.obj(), extraFieldsBuilder.obj()); + generateErrorResponse( + opCtx, replyBuilder, e.toStatus(), metadataBob.obj(), extraFieldsBuilder.obj()); if (ErrorCodes::isA<ErrorCategory::CloseConnectionError>(e.code())) { // Rethrow the exception to the top to signal that the client connection should be @@ -1384,112 +1386,115 @@ void curOpCommandSetup(OperationContext* opCtx, const OpMsgRequest& request) { curop->setNS_inlock(nss.ns()); } -DbResponse receivedCommands(OperationContext* opCtx, - const Message& message, - const ServiceEntryPointCommon::Hooks& behaviors) { - auto replyBuilder = rpc::makeReplyBuilder(rpc::protocolForMessage(message)); - OpMsgRequest request; - Command* c = nullptr; - [&] { - try { // Parse. - request = rpc::opMsgRequestFromAnyProtocol(message); - } catch (const DBException& ex) { - // If this error needs to fail the connection, propagate it out. - if (ErrorCodes::isConnectionFatalMessageParseError(ex.code())) - throw; - - BSONObjBuilder metadataBob; - behaviors.appendReplyMetadataOnError(opCtx, &metadataBob); - - BSONObjBuilder extraFieldsBuilder; - appendClusterAndOperationTime( - opCtx, &extraFieldsBuilder, &metadataBob, LogicalTime::kUninitialized); - - // Otherwise, reply with the parse error. This is useful for cases where parsing fails - // due to user-supplied input, such as the document too deep error. Since we failed - // during parsing, we can't log anything about the command. - LOGV2_DEBUG(21963, - 1, - "Assertion while parsing command: {error}", - "Assertion while parsing command", - "error"_attr = ex.toString()); - generateErrorResponse( - opCtx, replyBuilder.get(), ex, metadataBob.obj(), extraFieldsBuilder.obj()); - - return; // From lambda. Don't try executing if parsing failed. - } - - try { // Execute. - curOpCommandSetup(opCtx, request); - - // In the absence of a Command object, no redaction is possible. Therefore - // to avoid displaying potentially sensitive information in the logs, - // we restrict the log message to the name of the unrecognized command. - // However, the complete command object will still be echoed to the client. - if (!(c = CommandHelpers::findCommand(request.getCommandName()))) { - globalCommandRegistry()->incrementUnknownCommands(); - std::string msg = str::stream() - << "no such command: '" << request.getCommandName() << "'"; - LOGV2_DEBUG(21964, - 2, - "No such command: {command}", - "Command not found in registry", - "command"_attr = request.getCommandName()); - uasserted(ErrorCodes::CommandNotFound, str::stream() << msg); - } +Future<void> parseCommand(std::shared_ptr<HandleRequest::ExecutionContext> execContext) try { + execContext->setRequest(rpc::opMsgRequestFromAnyProtocol(execContext->getMessage())); + return Status::OK(); +} catch (const DBException& ex) { + // Need to set request as `makeCommandResponse` expects an empty request on failure. + execContext->setRequest({}); + + // Otherwise, reply with the parse error. This is useful for cases where parsing fails due to + // user-supplied input, such as the document too deep error. Since we failed during parsing, we + // can't log anything about the command. + LOGV2_DEBUG(21963, + 1, + "Assertion while parsing command: {error}", + "Assertion while parsing command", + "error"_attr = ex.toString()); - LOGV2_DEBUG(21965, - 2, - "Run command {db}.$cmd {commandArgs}", - "About to run the command", - "db"_attr = request.getDatabase(), - "commandArgs"_attr = redact( - ServiceEntryPointCommon::getRedactedCopyForLogging(c, request.body))); + return ex.toStatus(); +} - { - // Try to set this as early as possible, as soon as we have figured out the - // command. - stdx::lock_guard<Client> lk(*opCtx->getClient()); - CurOp::get(opCtx)->setLogicalOp_inlock(c->getLogicalOp()); - } +Future<void> executeCommand(std::shared_ptr<HandleRequest::ExecutionContext> execContext) { + auto [past, present] = makePromiseFuture<void>(); + auto future = + std::move(present) + .then([execContext]() -> Future<void> { + // Prepare environment for command execution (e.g., find command object in registry) + auto opCtx = execContext->getOpCtx(); + auto& request = execContext->getRequest(); + curOpCommandSetup(opCtx, request); + + // In the absence of a Command object, no redaction is possible. Therefore to avoid + // displaying potentially sensitive information in the logs, we restrict the log + // message to the name of the unrecognized command. However, the complete command + // object will still be echoed to the client. + if (execContext->setCommand(CommandHelpers::findCommand(request.getCommandName())); + !execContext->getCommand()) { + globalCommandRegistry()->incrementUnknownCommands(); + LOGV2_DEBUG(21964, + 2, + "No such command: {command}", + "Command not found in registry", + "command"_attr = request.getCommandName()); + return Status(ErrorCodes::CommandNotFound, + fmt::format("no such command: '{}'", request.getCommandName())); + } - opCtx->setExhaust(OpMsg::isFlagSet(message, OpMsg::kExhaustSupported)); + Command* c = execContext->getCommand(); + LOGV2_DEBUG( + 21965, + 2, + "Run command {db}.$cmd {commandArgs}", + "About to run the command", + "db"_attr = request.getDatabase(), + "commandArgs"_attr = redact( + ServiceEntryPointCommon::getRedactedCopyForLogging(c, request.body))); - const auto session = opCtx->getClient()->session(); - if (session) { - if (!opCtx->isExhaust() || c->getName() != "hello"_sd) { - InExhaustIsMaster::get(session.get())->setInExhaustIsMaster(false); + { + // Try to set this as early as possible, as soon as we have figured out the + // command. + stdx::lock_guard<Client> lk(*opCtx->getClient()); + CurOp::get(opCtx)->setLogicalOp_inlock(c->getLogicalOp()); } - } - execCommandDatabase(opCtx, c, request, replyBuilder.get(), behaviors); - } catch (const DBException& ex) { - BSONObjBuilder metadataBob; - behaviors.appendReplyMetadataOnError(opCtx, &metadataBob); + opCtx->setExhaust( + OpMsg::isFlagSet(execContext->getMessage(), OpMsg::kExhaustSupported)); - BSONObjBuilder extraFieldsBuilder; - appendClusterAndOperationTime( - opCtx, &extraFieldsBuilder, &metadataBob, LogicalTime::kUninitialized); + const auto session = opCtx->getClient()->session(); + if (session) { + if (!opCtx->isExhaust() || c->getName() != "hello"_sd) { + InExhaustIsMaster::get(session.get())->setInExhaustIsMaster(false); + } + } - LOGV2_DEBUG(21966, - 1, - "Assertion while executing command '{command}' on database '{db}': {error}", - "Assertion while executing command", - "command"_attr = request.getCommandName(), - "db"_attr = request.getDatabase(), - "error"_attr = ex.toString()); + // Hello should take kMaxAwaitTimeMs at most, log if it takes twice that. + if (c->getName() == "hello") { + execContext->slowMsOverride = + 2 * durationCount<Milliseconds>(SingleServerIsMasterMonitor::kMaxAwaitTime); + } - generateErrorResponse( - opCtx, replyBuilder.get(), ex, metadataBob.obj(), extraFieldsBuilder.obj()); + return Status::OK(); + }) + .then([execContext]() -> Future<void> { + execCommandDatabase(execContext->getOpCtx(), + execContext->getCommand(), + execContext->getRequest(), + execContext->getReplyBuilder(), + *execContext->behaviors); + return Status::OK(); + }) + .tapError([execContext](Status status) { + LOGV2_DEBUG( + 21966, + 1, + "Assertion while executing command '{command}' on database '{db}': {error}", + "Assertion while executing command", + "command"_attr = execContext->getRequest().getCommandName(), + "db"_attr = execContext->getRequest().getDatabase(), + "error"_attr = status.toString()); + }); + past.emplaceValue(); + return future; +} - if (ErrorCodes::isA<ErrorCategory::CloseConnectionError>(ex.code())) { - // Rethrow the exception to the top to signal that the client connection should be - // closed. - throw; - } - } - }(); +DbResponse makeCommandResponse(std::shared_ptr<HandleRequest::ExecutionContext> execContext) { + auto opCtx = execContext->getOpCtx(); + const Message& message = execContext->getMessage(); + OpMsgRequest request = execContext->getRequest(); + const Command* c = execContext->getCommand(); + auto replyBuilder = execContext->getReplyBuilder(); if (OpMsg::isFlagSet(message, OpMsg::kMoreToCome)) { // Close the connection to get client to go through server selection again. @@ -1522,6 +1527,38 @@ DbResponse receivedCommands(OperationContext* opCtx, return dbResponse; } +Future<DbResponse> receivedCommands(std::shared_ptr<HandleRequest::ExecutionContext> execContext) { + execContext->setReplyBuilder( + rpc::makeReplyBuilder(rpc::protocolForMessage(execContext->getMessage()))); + return parseCommand(execContext) + .then([execContext]() { return executeCommand(std::move(execContext)); }) + .onError([execContext](Status status) { + if (ErrorCodes::isConnectionFatalMessageParseError(status.code())) { + // If this error needs to fail the connection, propagate it out. + internalAssert(status); + } + + auto opCtx = execContext->getOpCtx(); + BSONObjBuilder metadataBob; + execContext->behaviors->appendReplyMetadataOnError(opCtx, &metadataBob); + + BSONObjBuilder extraFieldsBuilder; + appendClusterAndOperationTime( + opCtx, &extraFieldsBuilder, &metadataBob, LogicalTime::kUninitialized); + + auto replyBuilder = execContext->getReplyBuilder(); + generateErrorResponse( + opCtx, replyBuilder, status, metadataBob.obj(), extraFieldsBuilder.obj()); + + if (ErrorCodes::isA<ErrorCategory::CloseConnectionError>(status.code())) { + // Return the exception to the top to signal that the client connection should be + // closed. + internalAssert(status); + } + }) + .then([execContext] { return makeCommandResponse(std::move(execContext)); }); +} + DbResponse receivedQuery(OperationContext* opCtx, const NamespaceString& nss, Client& c, @@ -1737,19 +1774,8 @@ DbResponse receivedGetMore(OperationContext* opCtx, struct CommandOpRunner : HandleRequest::OpRunner { using HandleRequest::OpRunner::OpRunner; - Future<DbResponse> run() override try { - DbResponse r = receivedCommands(executionContext->getOpCtx(), - executionContext->getMessage(), - *executionContext->behaviors); - // Hello should take kMaxAwaitTimeMs at most, log if it takes twice that. - if (auto command = executionContext->currentOp().getCommand(); - command && (command->getName() == "hello")) { - executionContext->slowMsOverride = - 2 * durationCount<Milliseconds>(SingleServerIsMasterMonitor::kMaxAwaitTime); - } - return r; - } catch (const DBException& ex) { - return ex.toStatus(); + Future<DbResponse> run() override { + return receivedCommands(executionContext); } }; |