diff options
author | Amirsaman Memaripour <amirsaman.memaripour@mongodb.com> | 2020-11-05 02:21:26 +0000 |
---|---|---|
committer | Amirsaman Memaripour <amirsaman.memaripour@mongodb.com> | 2020-11-05 02:21:26 +0000 |
commit | f4373b85f0f394c485cbff447312c567924ba5e3 (patch) | |
tree | 59d7311cf50bac0590522722ec1fe78f03230197 | |
parent | 51067e095275ba5a2454be993c83a93e526e18ce (diff) | |
download | mongo-f4373b85f0f394c485cbff447312c567924ba5e3.tar.gz |
SERVER-51690 Futurize clientCommand to support async command execution
-rw-r--r-- | src/mongo/s/commands/strategy.cpp | 265 | ||||
-rw-r--r-- | src/mongo/s/commands/strategy.h | 2 | ||||
-rw-r--r-- | src/mongo/s/service_entry_point_mongos.cpp | 2 |
3 files changed, 154 insertions, 115 deletions
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp index eb386f3763b..d294ee93cf7 100644 --- a/src/mongo/s/commands/strategy.cpp +++ b/src/mongo/s/commands/strategy.cpp @@ -308,11 +308,12 @@ MONGO_FAIL_POINT_DEFINE(doNotRefreshShardsOnRetargettingError); * Executes the command for the given request, and appends the result to replyBuilder * and error labels, if any, to errorBuilder. */ -void runCommand(OperationContext* opCtx, - const OpMsgRequest& request, - const Message& m, - rpc::ReplyBuilderInterface* replyBuilder, - BSONObjBuilder* errorBuilder) { +Future<void> runCommand(std::shared_ptr<RequestExecutionContext> rec, + std::shared_ptr<BSONObjBuilder> errorBuilder) try { + auto opCtx = rec->getOpCtx(); + const auto& request = rec->getRequest(); + const auto& m = rec->getMessage(); + auto replyBuilder = rec->getReplyBuilder(); auto const opType = m.operation(); auto const commandName = request.getCommandName(); auto const command = CommandHelpers::findCommand(commandName); @@ -323,7 +324,7 @@ void runCommand(OperationContext* opCtx, {ErrorCodes::CommandNotFound, str::stream() << "no such cmd: " << commandName}); globalCommandRegistry()->incrementUnknownCommands(); appendRequiredFieldsToResponse(opCtx, &builder); - return; + return Status::OK(); } opCtx->setExhaust(OpMsg::isFlagSet(m, OpMsg::kExhaustSupported)); @@ -400,7 +401,7 @@ void runCommand(OperationContext* opCtx, if (!readConcernParseStatus.isOK()) { auto builder = replyBuilder->getBodyBuilder(); CommandHelpers::appendCommandStatusNoThrow(builder, readConcernParseStatus); - return; + return Status::OK(); } auto& apiParams = APIParameters::get(opCtx); @@ -451,7 +452,7 @@ void runCommand(OperationContext* opCtx, CommandHelpers::appendCommandStatusNoThrow( responseBuilder, Status(ErrorCodes::InvalidOptions, "Command does not support writeConcern")); - return; + return Status::OK(); } bool clientSuppliedWriteConcern = !wc.usedDefault; @@ -489,7 +490,7 @@ void runCommand(OperationContext* opCtx, Status{ErrorCodes::InvalidOptions, "writeConcern provenance must be unset or \"{}\""_format( ReadWriteConcernProvenance::kClientSupplied)}); - return; + return Status::OK(); } // If the client didn't provide a provenance, then an appropriate value needs to be @@ -553,7 +554,7 @@ void runCommand(OperationContext* opCtx, Status{ErrorCodes::InvalidOptions, "readConcern provenance must be unset or \"{}\""_format( ReadWriteConcernProvenance::kClientSupplied)}); - return; + return Status::OK(); } // If the client didn't provide a provenance, then an appropriate value needs to be @@ -588,7 +589,7 @@ void runCommand(OperationContext* opCtx, {ErrorCodes::InvalidOptions, "The readConcern level must be either 'local' (default), 'majority' or " "'snapshot' in order to run in a transaction"}); - return; + return Status::OK(); } if (readConcernArgs.getArgsOpTime()) { auto responseBuilder = replyBuilder->getBodyBuilder(); @@ -598,7 +599,7 @@ void runCommand(OperationContext* opCtx, str::stream() << "The readConcern cannot specify '" << repl::ReadConcernArgs::kAfterOpTimeFieldName << "' in a transaction"}); - return; + return Status::OK(); } } @@ -618,7 +619,7 @@ void runCommand(OperationContext* opCtx, readConcernSupport.readConcernSupport.withContext( str::stream() << "Command " << invocation->definition()->getName() << " does not support " << readConcernArgs.toString())); - return; + return Status::OK(); } } @@ -681,7 +682,7 @@ void runCommand(OperationContext* opCtx, txnRouter.appendRecoveryToken(&responseBuilder); } - return; + return Status::OK(); } catch (ShardInvalidatedForTargetingException& ex) { auto catalogCache = Grid::get(opCtx)->catalogCache(); catalogCache->setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, true); @@ -886,26 +887,9 @@ void runCommand(OperationContext* opCtx, errorBuilder->appendElements(errorLabels); throw; } -} - -/** - * Attaches the topology version to the response. - */ -void attachTopologyVersionDuringShutdown(OperationContext* opCtx, - const DBException& ex, - BSONObjBuilder* errorBuilder) { - // Only attach the topology version if the mongos is in quiesce mode. If the mongos is in - // quiesce mode, this shutdown error is due to mongos rather than a shard. - auto code = ex.code(); - if (code && ErrorCodes::isA<ErrorCategory::ShutdownError>(code)) { - if (auto mongosTopCoord = MongosTopologyCoordinator::get(opCtx); - mongosTopCoord && mongosTopCoord->inQuiesceMode()) { - // Append the topology version to the response. - const auto topologyVersion = mongosTopCoord->getTopologyVersion(); - BSONObjBuilder topologyVersionBuilder(errorBuilder->subobjStart("topologyVersion")); - topologyVersion.serialize(&topologyVersionBuilder); - } - } + return Status::OK(); +} catch (const DBException& e) { + return e.toStatus(); } } // namespace @@ -1028,79 +1012,120 @@ DbResponse Strategy::queryOp(OperationContext* opCtx, const NamespaceString& nss cursorId)}; } -Future<DbResponse> Strategy::clientCommand(std::shared_ptr<RequestExecutionContext> rec) try { - auto opCtx = rec->getOpCtx(); - const Message& m = rec->getMessage(); - auto reply = rpc::makeReplyBuilder(rpc::protocolForMessage(m)); - BSONObjBuilder errorBuilder; +// Maintains the state required to execute client commands, and provides the interface to construct +// a future-chain that runs the command against the database. +class ClientCommand final : public std::enable_shared_from_this<ClientCommand> { +public: + ClientCommand(ClientCommand&&) = delete; + ClientCommand(const ClientCommand&) = delete; - bool propagateException = false; + explicit ClientCommand(std::shared_ptr<RequestExecutionContext> rec) + : _rec(std::move(rec)), _errorBuilder(std::make_shared<BSONObjBuilder>()) {} - try { - // Parse. - OpMsgRequest request = [&] { - try { - return rpc::opMsgRequestFromAnyProtocol(m); - } catch (const DBException& ex) { - // If this error needs to fail the connection, propagate it out. - if (ErrorCodes::isConnectionFatalMessageParseError(ex.code())) - propagateException = true; - - LOGV2_DEBUG(22769, - 1, - "Exception thrown while parsing command {error}", - "Exception thrown while parsing command", - "error"_attr = redact(ex)); - throw; - } - }(); + // Returns the future-chain that produces the response by parsing and executing the command. + Future<DbResponse> run(); - // Execute. - std::string db = request.getDatabase().toString(); - try { - LOGV2_DEBUG(22770, - 3, - "Command begin db: {db} msg id: {headerId}", - "Command begin", - "db"_attr = db, - "headerId"_attr = m.header().getId()); - runCommand(opCtx, request, m, reply.get(), &errorBuilder); +private: + void _parse(); + + Future<void> _execute(); + + // Handler for exceptions thrown during parsing and executing the command. + Future<void> _handleException(Status); + + // Extracts the command response from the replyBuilder. + DbResponse _produceResponse(); + + const std::shared_ptr<RequestExecutionContext> _rec; + const std::shared_ptr<BSONObjBuilder> _errorBuilder; + + bool _propagateException = false; +}; + +void ClientCommand::_parse() try { + const auto& msg = _rec->getMessage(); + _rec->setReplyBuilder(rpc::makeReplyBuilder(rpc::protocolForMessage(msg))); + _rec->setRequest(rpc::opMsgRequestFromAnyProtocol(msg)); +} catch (const DBException& ex) { + // If this error needs to fail the connection, propagate it out. + if (ErrorCodes::isConnectionFatalMessageParseError(ex.code())) + _propagateException = true; + + LOGV2_DEBUG(22769, + 1, + "Exception thrown while parsing command {error}", + "Exception thrown while parsing command", + "error"_attr = redact(ex)); + throw; +} + +Future<void> ClientCommand::_execute() { + LOGV2_DEBUG(22770, + 3, + "Command begin db: {db} msg id: {headerId}", + "Command begin", + "db"_attr = _rec->getRequest().getDatabase().toString(), + "headerId"_attr = _rec->getMessage().header().getId()); + + return runCommand(_rec, _errorBuilder) + .then([this, anchor = shared_from_this()] { LOGV2_DEBUG(22771, 3, "Command end db: {db} msg id: {headerId}", "Command end", - "db"_attr = db, - "headerId"_attr = m.header().getId()); - } catch (const DBException& ex) { + "db"_attr = _rec->getRequest().getDatabase().toString(), + "headerId"_attr = _rec->getMessage().header().getId()); + }) + .tapError([this, anchor = shared_from_this()](Status status) { LOGV2_DEBUG( 22772, 1, "Exception thrown while processing command on {db} msg id: {headerId} {error}", "Exception thrown while processing command", - "db"_attr = db, - "headerId"_attr = m.header().getId(), - "error"_attr = redact(ex)); + "db"_attr = _rec->getRequest().getDatabase().toString(), + "headerId"_attr = _rec->getMessage().header().getId(), + "error"_attr = redact(status)); // Record the exception in CurOp. - CurOp::get(opCtx)->debug().errInfo = ex.toStatus(); - throw; - } - } catch (const DBException& ex) { - if (propagateException) { - throw; - } + CurOp::get(_rec->getOpCtx())->debug().errInfo = std::move(status); + }); +} - reply->reset(); - auto bob = reply->getBodyBuilder(); - CommandHelpers::appendCommandStatusNoThrow(bob, ex.toStatus()); - appendRequiredFieldsToResponse(opCtx, &bob); +Future<void> ClientCommand::_handleException(Status status) { + if (_propagateException) { + return status; + } - attachTopologyVersionDuringShutdown(opCtx, ex, &errorBuilder); - bob.appendElements(errorBuilder.obj()); + auto opCtx = _rec->getOpCtx(); + auto reply = _rec->getReplyBuilder(); + + reply->reset(); + auto bob = reply->getBodyBuilder(); + CommandHelpers::appendCommandStatusNoThrow(bob, status); + appendRequiredFieldsToResponse(opCtx, &bob); + + // Only attach the topology version to the response if mongos is in quiesce mode. If mongos is + // in quiesce mode, this shutdown error is due to mongos rather than a shard. + if (ErrorCodes::isA<ErrorCategory::ShutdownError>(status)) { + if (auto mongosTopCoord = MongosTopologyCoordinator::get(opCtx); + mongosTopCoord && mongosTopCoord->inQuiesceMode()) { + // Append the topology version to the response. + const auto topologyVersion = mongosTopCoord->getTopologyVersion(); + BSONObjBuilder topologyVersionBuilder(_errorBuilder->subobjStart("topologyVersion")); + topologyVersion.serialize(&topologyVersionBuilder); + } } + bob.appendElements(_errorBuilder->obj()); + return Status::OK(); +} + +DbResponse ClientCommand::_produceResponse() { + const auto& m = _rec->getMessage(); + auto reply = _rec->getReplyBuilder(); + if (OpMsg::isFlagSet(m, OpMsg::kMoreToCome)) { - return DbResponse{}; // Don't reply. + return {}; // Don't reply. } DbResponse dbResponse; @@ -1114,8 +1139,24 @@ Future<DbResponse> Strategy::clientCommand(std::shared_ptr<RequestExecutionConte dbResponse.response = reply->done(); return dbResponse; -} catch (const DBException& e) { - return e.toStatus(); +} + +Future<DbResponse> ClientCommand::run() { + auto pf = makePromiseFuture<void>(); + auto future = std::move(pf.future) + .then([this, anchor = shared_from_this()] { _parse(); }) + .then([this, anchor = shared_from_this()] { return _execute(); }) + .onError([this, anchor = shared_from_this()](Status status) { + return _handleException(std::move(status)); + }) + .then([this, anchor = shared_from_this()] { return _produceResponse(); }); + pf.promise.emplaceValue(); + return future; +} + +Future<DbResponse> Strategy::clientCommand(std::shared_ptr<RequestExecutionContext> rec) { + auto instance = std::make_shared<ClientCommand>(std::move(rec)); + return instance->run(); } DbResponse Strategy::getMore(OperationContext* opCtx, const NamespaceString& nss, DbMessage* dbm) { @@ -1238,29 +1279,27 @@ void Strategy::killCursors(OperationContext* opCtx, DbMessage* dbm) { } } -void Strategy::writeOp(OperationContext* opCtx, DbMessage* dbm) { - const auto& msg = dbm->msg(); - rpc::OpMsgReplyBuilder reply; - BSONObjBuilder errorBuilder; - runCommand(opCtx, - [&]() { - switch (msg.operation()) { - case dbInsert: { - return InsertOp::parseLegacy(msg).serialize({}); - } - case dbUpdate: { - return UpdateOp::parseLegacy(msg).serialize({}); - } - case dbDelete: { - return DeleteOp::parseLegacy(msg).serialize({}); - } - default: - MONGO_UNREACHABLE; - } - }(), - msg, - &reply, - &errorBuilder); // built objects are ignored +void Strategy::writeOp(std::shared_ptr<RequestExecutionContext> rec) { + rec->setRequest([msg = rec->getMessage()]() { + switch (msg.operation()) { + case dbInsert: { + return InsertOp::parseLegacy(msg).serialize({}); + } + case dbUpdate: { + return UpdateOp::parseLegacy(msg).serialize({}); + } + case dbDelete: { + return DeleteOp::parseLegacy(msg).serialize({}); + } + default: + MONGO_UNREACHABLE; + } + }()); + + rec->setReplyBuilder(std::make_unique<rpc::OpMsgReplyBuilder>()); + runCommand(std::move(rec), + std::make_shared<BSONObjBuilder>()) // built objects are ignored + .get(); } void Strategy::explainFind(OperationContext* opCtx, diff --git a/src/mongo/s/commands/strategy.h b/src/mongo/s/commands/strategy.h index 85fafda1acb..95f3f8c9449 100644 --- a/src/mongo/s/commands/strategy.h +++ b/src/mongo/s/commands/strategy.h @@ -75,7 +75,7 @@ public: * with the result from the operation. Doesn't send any response back and does not throw on * errors. */ - static void writeOp(OperationContext* opCtx, DbMessage* dbm); + static void writeOp(std::shared_ptr<RequestExecutionContext> rec); /** * Executes a command from either OP_QUERY or OP_MSG wire protocols. diff --git a/src/mongo/s/service_entry_point_mongos.cpp b/src/mongo/s/service_entry_point_mongos.cpp index 870d58968aa..e731dc9589f 100644 --- a/src/mongo/s/service_entry_point_mongos.cpp +++ b/src/mongo/s/service_entry_point_mongos.cpp @@ -236,7 +236,7 @@ struct KillCursorsOpRunner final : public OpRunner { struct WriteOpRunner final : public OpRunner { using OpRunner::OpRunner; DbResponse runOperation() override { - Strategy::writeOp(hr->rec->getOpCtx(), &hr->rec->getDbMessage()); // No Response. + Strategy::writeOp(hr->rec); // No Response. return {}; } }; |