diff options
author | Andrew Chen <a.chen@mongodb.com> | 2020-07-01 15:48:23 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-07-30 14:48:04 +0000 |
commit | 051bd6e51b33d7fd8a1265e166efffd20178a86d (patch) | |
tree | 6a2b92974c8e1cd148e0f085d0091ea0942e21f5 /src | |
parent | 41fda13f7239308239ade7f143f9b5e2fef0e87b (diff) | |
download | mongo-051bd6e51b33d7fd8a1265e166efffd20178a86d.tar.gz |
SERVER-48978 Futurize handleRequest
Diffstat (limited to 'src')
18 files changed, 122 insertions, 83 deletions
diff --git a/src/mongo/db/dbdirectclient.cpp b/src/mongo/db/dbdirectclient.cpp index ee58c8c31cb..5386bf567d2 100644 --- a/src/mongo/db/dbdirectclient.cpp +++ b/src/mongo/db/dbdirectclient.cpp @@ -143,7 +143,7 @@ DbResponse loopbackBuildResponse(OperationContext* const opCtx, toSend.header().setId(nextMessageId()); toSend.header().setResponseToMsgId(0); - return opCtx->getServiceContext()->getServiceEntryPoint()->handleRequest(opCtx, toSend); + return opCtx->getServiceContext()->getServiceEntryPoint()->handleRequest(opCtx, toSend).get(); } } // namespace diff --git a/src/mongo/db/op_msg_fuzzer.cpp b/src/mongo/db/op_msg_fuzzer.cpp index 117027fc69b..9de41cd26ea 100644 --- a/src/mongo/db/op_msg_fuzzer.cpp +++ b/src/mongo/db/op_msg_fuzzer.cpp @@ -109,7 +109,7 @@ extern "C" int LLVMFuzzerTestOneInput(const char* Data, size_t Size) { mongo::Message msg(std::move(sb)); try { - serviceContext->getServiceEntryPoint()->handleRequest(opCtx.get(), msg); + serviceContext->getServiceEntryPoint()->handleRequest(opCtx.get(), msg).get(); } catch (const mongo::AssertionException&) { // We need to catch exceptions caused by invalid inputs } diff --git a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp index 2eba9258aa3..083015b10a9 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp @@ -177,6 +177,7 @@ Status updateConfigDocumentInTxn(OperationContext* opCtx, ->getServiceEntryPoint() ->handleRequest(opCtx, OpMsgRequest::fromDBAndBody(nss.db().toString(), cmdObj).serialize()) + .get() .response); return getStatusFromCommandResult(replyOpMsg.body); @@ -221,6 +222,7 @@ Status commitTxnForConfigDocument(OperationContext* opCtx, TxnNumber txnNumber) OpMsgRequest::fromDBAndBody( NamespaceString::kAdminDb.toString(), cmdObj) .serialize()) + .get() .response); return getStatusFromCommandResult(replyOpMsg.body); diff --git a/src/mongo/db/s/transaction_coordinator_futures_util.cpp b/src/mongo/db/s/transaction_coordinator_futures_util.cpp index 4295c8ace08..3b43c5a4486 100644 --- a/src/mongo/db/s/transaction_coordinator_futures_util.cpp +++ b/src/mongo/db/s/transaction_coordinator_futures_util.cpp @@ -106,7 +106,7 @@ Future<executor::TaskExecutor::ResponseStatus> AsyncWorkScheduler::scheduleRemot auto requestOpMsg = OpMsgRequest::fromDBAndBody(NamespaceString::kAdminDb, commandObj).serialize(); const auto replyOpMsg = OpMsg::parseOwned( - service->getServiceEntryPoint()->handleRequest(opCtx, requestOpMsg).response); + service->getServiceEntryPoint()->handleRequest(opCtx, requestOpMsg).get().response); // Document sequences are not yet being used for responses. invariant(replyOpMsg.sequences.empty()); diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index d1554581e13..50b4d714d21 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -1645,9 +1645,9 @@ BSONObj ServiceEntryPointCommon::getRedactedCopyForLogging(const Command* comman return bob.obj(); } -DbResponse ServiceEntryPointCommon::handleRequest(OperationContext* opCtx, - const Message& m, - const Hooks& behaviors) { +Future<DbResponse> ServiceEntryPointCommon::handleRequest(OperationContext* opCtx, + const Message& m, + const Hooks& behaviors) noexcept try { // before we lock... NetworkOp op = m.operation(); bool isCommand = false; @@ -1790,7 +1790,10 @@ DbResponse ServiceEntryPointCommon::handleRequest(OperationContext* opCtx, } recordCurOpMetrics(opCtx); - return dbresponse; + return Future<DbResponse>::makeReady(std::move(dbresponse)); +} catch (const DBException& e) { + LOGV2_ERROR(4879802, "Failed to handle request", "error"_attr = redact(e)); + return e.toStatus(); } ServiceEntryPointCommon::Hooks::~Hooks() = default; diff --git a/src/mongo/db/service_entry_point_common.h b/src/mongo/db/service_entry_point_common.h index 2fa1412c3b2..c122963bd44 100644 --- a/src/mongo/db/service_entry_point_common.h +++ b/src/mongo/db/service_entry_point_common.h @@ -35,6 +35,7 @@ #include "mongo/db/operation_context.h" #include "mongo/rpc/message.h" #include "mongo/util/fail_point.h" +#include "mongo/util/future.h" #include "mongo/util/polymorphic_scoped.h" namespace mongo { @@ -98,7 +99,9 @@ struct ServiceEntryPointCommon { BSONObjBuilder* metadataBob) const = 0; }; - static DbResponse handleRequest(OperationContext* opCtx, const Message& m, const Hooks& hooks); + static Future<DbResponse> handleRequest(OperationContext* opCtx, + const Message& m, + const Hooks& hooks) noexcept; /** * Produce a new object based on cmdObj, but with redactions applied as specified by diff --git a/src/mongo/db/service_entry_point_mongod.cpp b/src/mongo/db/service_entry_point_mongod.cpp index b2b5bff551d..75b28b6595a 100644 --- a/src/mongo/db/service_entry_point_mongod.cpp +++ b/src/mongo/db/service_entry_point_mongod.cpp @@ -269,7 +269,8 @@ public: } }; -DbResponse ServiceEntryPointMongod::handleRequest(OperationContext* opCtx, const Message& m) { +Future<DbResponse> ServiceEntryPointMongod::handleRequest(OperationContext* opCtx, + const Message& m) noexcept { return ServiceEntryPointCommon::handleRequest(opCtx, m, Hooks{}); } diff --git a/src/mongo/db/service_entry_point_mongod.h b/src/mongo/db/service_entry_point_mongod.h index 57946da4d0e..a4fc691d408 100644 --- a/src/mongo/db/service_entry_point_mongod.h +++ b/src/mongo/db/service_entry_point_mongod.h @@ -42,7 +42,8 @@ class ServiceEntryPointMongod final : public ServiceEntryPointImpl { public: using ServiceEntryPointImpl::ServiceEntryPointImpl; - DbResponse handleRequest(OperationContext* opCtx, const Message& request) override; + Future<DbResponse> handleRequest(OperationContext* opCtx, + const Message& request) noexcept override; private: class Hooks; diff --git a/src/mongo/embedded/mongo_embedded/mongo_embedded.cpp b/src/mongo/embedded/mongo_embedded/mongo_embedded.cpp index d5ac9ce4bb8..852a503c56f 100644 --- a/src/mongo/embedded/mongo_embedded/mongo_embedded.cpp +++ b/src/mongo/embedded/mongo_embedded/mongo_embedded.cpp @@ -426,7 +426,7 @@ void client_wire_protocol_rpc(mongo_embedded_v1_client* const client, Message msg(std::move(sb)); - client->response = sep->handleRequest(opCtx.get(), msg); + client->response = sep->handleRequest(opCtx.get(), msg).get(); // Note that we skip OP_MSG's optional checksum for embedded. MsgData::View outMessage(client->response.response.buf()); diff --git a/src/mongo/embedded/service_entry_point_embedded.cpp b/src/mongo/embedded/service_entry_point_embedded.cpp index 7d14fbfe393..1dbf1a52c95 100644 --- a/src/mongo/embedded/service_entry_point_embedded.cpp +++ b/src/mongo/embedded/service_entry_point_embedded.cpp @@ -110,7 +110,8 @@ public: BSONObjBuilder* metadataBob) const override {} }; -DbResponse ServiceEntryPointEmbedded::handleRequest(OperationContext* opCtx, const Message& m) { +Future<DbResponse> ServiceEntryPointEmbedded::handleRequest(OperationContext* opCtx, + const Message& m) noexcept { // Only one thread will pump at a time and concurrent calls to this will skip the pumping and go // directly to handleRequest. This means that the jobs in the periodic runner can't provide any // guarantees of the state (that they have run). diff --git a/src/mongo/embedded/service_entry_point_embedded.h b/src/mongo/embedded/service_entry_point_embedded.h index 0a47fd6bd56..bfa06dd672c 100644 --- a/src/mongo/embedded/service_entry_point_embedded.h +++ b/src/mongo/embedded/service_entry_point_embedded.h @@ -42,7 +42,8 @@ class ServiceEntryPointEmbedded final : public ServiceEntryPoint { public: ServiceEntryPointEmbedded() = default; - DbResponse handleRequest(OperationContext* opCtx, const Message& request) override; + Future<DbResponse> handleRequest(OperationContext* opCtx, + const Message& request) noexcept override; void startSession(transport::SessionHandle session) override; void endAllSessions(transport::Session::TagMask tags) override; diff --git a/src/mongo/s/service_entry_point_mongos.cpp b/src/mongo/s/service_entry_point_mongos.cpp index 8db801bc4f4..5df65b3e718 100644 --- a/src/mongo/s/service_entry_point_mongos.cpp +++ b/src/mongo/s/service_entry_point_mongos.cpp @@ -60,7 +60,8 @@ BSONObj buildErrReply(const DBException& ex) { } // namespace -DbResponse ServiceEntryPointMongos::handleRequest(OperationContext* opCtx, const Message& message) { +Future<DbResponse> ServiceEntryPointMongos::handleRequest(OperationContext* opCtx, + const Message& message) noexcept try { const int32_t msgId = message.header().getId(); const NetworkOp op = message.operation(); @@ -94,7 +95,7 @@ DbResponse ServiceEntryPointMongos::handleRequest(OperationContext* opCtx, const CurOp::get(opCtx)->completeAndLogOperation( opCtx, logv2::LogComponent::kCommand, dbResponse.response.size()); - return dbResponse; + return Future<DbResponse>::makeReady(std::move(dbResponse)); } NamespaceString nss; @@ -179,7 +180,10 @@ DbResponse ServiceEntryPointMongos::handleRequest(OperationContext* opCtx, const CurOp::get(opCtx)->completeAndLogOperation( opCtx, logv2::LogComponent::kCommand, dbResponse.response.size()); - return dbResponse; + return Future<DbResponse>::makeReady(std::move(dbResponse)); +} catch (const DBException& e) { + LOGV2(4879803, "Failed to handle request", "error"_attr = redact(e)); + return e.toStatus(); } } // namespace mongo diff --git a/src/mongo/s/service_entry_point_mongos.h b/src/mongo/s/service_entry_point_mongos.h index 085b5991d5c..17d99fa3a01 100644 --- a/src/mongo/s/service_entry_point_mongos.h +++ b/src/mongo/s/service_entry_point_mongos.h @@ -44,7 +44,8 @@ class ServiceEntryPointMongos final : public ServiceEntryPointImpl { public: using ServiceEntryPointImpl::ServiceEntryPointImpl; - DbResponse handleRequest(OperationContext* opCtx, const Message& request) override; + Future<DbResponse> handleRequest(OperationContext* opCtx, + const Message& request) noexcept override; }; } // namespace mongo diff --git a/src/mongo/tools/bridge.cpp b/src/mongo/tools/bridge.cpp index e377fae9be8..986114093ed 100644 --- a/src/mongo/tools/bridge.cpp +++ b/src/mongo/tools/bridge.cpp @@ -56,6 +56,7 @@ #include "mongo/transport/transport_layer_asio.h" #include "mongo/util/assert_util.h" #include "mongo/util/exit.h" +#include "mongo/util/future.h" #include "mongo/util/quick_exit.h" #include "mongo/util/signal_handlers.h" #include "mongo/util/str.h" @@ -248,10 +249,12 @@ class ServiceEntryPointBridge final : public ServiceEntryPointImpl { public: explicit ServiceEntryPointBridge(ServiceContext* svcCtx) : ServiceEntryPointImpl(svcCtx) {} - DbResponse handleRequest(OperationContext* opCtx, const Message& request) final; + Future<DbResponse> handleRequest(OperationContext* opCtx, + const Message& request) noexcept final; }; -DbResponse ServiceEntryPointBridge::handleRequest(OperationContext* opCtx, const Message& request) { +Future<DbResponse> ServiceEntryPointBridge::handleRequest(OperationContext* opCtx, + const Message& request) noexcept try { if (request.operation() == dbQuery) { DbMessage d(request); QueryMessage q(d); @@ -344,7 +347,8 @@ DbResponse ServiceEntryPointBridge::handleRequest(OperationContext* opCtx, const if (!status->isOK()) { commandReply = StatusWith<BSONObj>(*status); } - return {replyBuilder->setCommandReply(std::move(commandReply)).done()}; + return Future<DbResponse>::makeReady( + {replyBuilder->setCommandReply(std::move(commandReply)).done()}); } @@ -361,7 +365,7 @@ DbResponse ServiceEntryPointBridge::handleRequest(OperationContext* opCtx, const "remote"_attr = dest, "source"_attr = source->remote().toString()); source->end(); - return {Message()}; + return Future<DbResponse>::makeReady({Message()}); // Forward the message to 'dest' with probability '1 - hostSettings.loss'. case HostSettings::State::kDiscard: if (dest.nextCanonicalDouble() < hostSettings.loss) { @@ -381,7 +385,7 @@ DbResponse ServiceEntryPointBridge::handleRequest(OperationContext* opCtx, const "operation"_attr = networkOpToString(request.operation()), "hostName"_attr = hostName); } - return {Message()}; + return Future<DbResponse>::makeReady({Message()}); } // Forward the message to 'dest' after waiting for 'hostSettings.delay' // milliseconds. @@ -429,7 +433,7 @@ DbResponse ServiceEntryPointBridge::handleRequest(OperationContext* opCtx, const "remote"_attr = dest, "source"_attr = source->remote()); source->end(); - return {Message()}; + return Future<DbResponse>::makeReady({Message()}); } // Only support OP_MSG exhaust cursors. @@ -447,10 +451,13 @@ DbResponse ServiceEntryPointBridge::handleRequest(OperationContext* opCtx, const // whether this should be run again to receive more responses from the exhaust stream. // We do not need to set 'nextInvocation' in the DbResponse because mongobridge // only receives responses but ignores the next request if it is in exhaust mode. - return {std::move(response), isExhaust}; + return Future<DbResponse>::makeReady({std::move(response), isExhaust}); } else { - return {Message()}; + return Future<DbResponse>::makeReady({Message()}); } +} catch (const DBException& e) { + LOGV2_ERROR(4879804, "Failed to handle request", "error"_attr = redact(e)); + return e.toStatus(); } int bridgeMain(int argc, char** argv) { diff --git a/src/mongo/transport/service_entry_point.h b/src/mongo/transport/service_entry_point.h index 2c3ded7849a..aa970f1ef67 100644 --- a/src/mongo/transport/service_entry_point.h +++ b/src/mongo/transport/service_entry_point.h @@ -32,6 +32,7 @@ #include "mongo/bson/bsonobjbuilder.h" #include "mongo/db/dbmessage.h" #include "mongo/transport/session.h" +#include "mongo/util/future.h" namespace mongo { @@ -82,7 +83,8 @@ public: /** * Processes a request and fills out a DbResponse. */ - virtual DbResponse handleRequest(OperationContext* opCtx, const Message& request) = 0; + virtual Future<DbResponse> handleRequest(OperationContext* opCtx, + const Message& request) noexcept = 0; protected: ServiceEntryPoint() = default; diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp index 5fbff04a151..373a362d0df 100644 --- a/src/mongo/transport/service_state_machine.cpp +++ b/src/mongo/transport/service_state_machine.cpp @@ -467,61 +467,68 @@ void ServiceStateMachine::_processMessage(ThreadGuard guard) { // The handleRequest is implemented in a subclass for mongod/mongos and actually all the // database work for this request. - DbResponse dbresponse = _sep->handleRequest(opCtx.get(), _inMessage); - - // opCtx must be killed and delisted here so that the operation cannot show up in currentOp - // results after the response reaches the client. The destruction is postponed for later to - // mitigate its performance impact on the critical path of execution. - _serviceContext->killAndDelistOperation(opCtx.get(), ErrorCodes::OperationIsKilledAndDelisted); - invariant(!_killedOpCtx); - _killedOpCtx = std::move(opCtx); - - // Format our response, if we have one - Message& toSink = dbresponse.response; - if (!toSink.empty()) { - invariant(!OpMsg::isFlagSet(_inMessage, OpMsg::kMoreToCome)); - invariant(!OpMsg::isFlagSet(toSink, OpMsg::kChecksumPresent)); - - // Update the header for the response message. - toSink.header().setId(nextMessageId()); - toSink.header().setResponseToMsgId(_inMessage.header().getId()); - if (OpMsg::isFlagSet(_inMessage, OpMsg::kChecksumPresent)) { + _sep->handleRequest(opCtx.get(), _inMessage) + .then([this, + &compressorMgr = compressorMgr, + opCtx = std::move(opCtx), + guard = std::move(guard)](DbResponse dbresponse) mutable -> void { + // opCtx must be killed and delisted here so that the operation cannot show up in + // currentOp results after the response reaches the client. The destruction is postponed + // for later to mitigate its performance impact on the critical path of execution. + _serviceContext->killAndDelistOperation(opCtx.get(), + ErrorCodes::OperationIsKilledAndDelisted); + invariant(!_killedOpCtx); + _killedOpCtx = std::move(opCtx); + + // Format our response, if we have one + Message& toSink = dbresponse.response; + if (!toSink.empty()) { + invariant(!OpMsg::isFlagSet(_inMessage, OpMsg::kMoreToCome)); + invariant(!OpMsg::isFlagSet(toSink, OpMsg::kChecksumPresent)); + + // Update the header for the response message. + toSink.header().setId(nextMessageId()); + toSink.header().setResponseToMsgId(_inMessage.header().getId()); + if (OpMsg::isFlagSet(_inMessage, OpMsg::kChecksumPresent)) { #ifdef MONGO_CONFIG_SSL - if (!SSLPeerInfo::forSession(_session()).isTLS) { - OpMsg::appendChecksum(&toSink); - } + if (!SSLPeerInfo::forSession(_session()).isTLS) { + OpMsg::appendChecksum(&toSink); + } #else - OpMsg::appendChecksum(&toSink); + OpMsg::appendChecksum(&toSink); #endif - } - - // If the incoming message has the exhaust flag set, then we bypass the normal RPC behavior. - // We will sink the response to the network, but we also synthesize a new request, as if we - // sourced a new message from the network. This new request is sent to the database once - // again to be processed. This cycle repeats as long as the command indicates the exhaust - // stream should continue. - _inMessage = makeExhaustMessage(_inMessage, &dbresponse); - _inExhaust = !_inMessage.empty(); - - networkCounter.hitLogicalOut(toSink.size()); - - if (_compressorId) { - auto swm = compressorMgr.compressMessage(toSink, &_compressorId.value()); - uassertStatusOK(swm.getStatus()); - toSink = swm.getValue(); - } - - TrafficRecorder::get(_serviceContext) - .observe(_sessionHandle, _serviceContext->getPreciseClockSource()->now(), toSink); - - _sinkMessage(std::move(guard), std::move(toSink)); - - } else { - _state.store(State::Source); - _inMessage.reset(); - _inExhaust = false; - return _scheduleNextWithGuard(std::move(guard), ServiceExecutor::kDeferredTask); - } + } + + // If the incoming message has the exhaust flag set, then we bypass the normal RPC + // behavior. We will sink the response to the network, but we also synthesize a new + // request, as if we sourced a new message from the network. This new request is + // sent to the database once again to be processed. This cycle repeats as long as + // the command indicates the exhaust stream should continue. + _inMessage = makeExhaustMessage(_inMessage, &dbresponse); + _inExhaust = !_inMessage.empty(); + + networkCounter.hitLogicalOut(toSink.size()); + + if (_compressorId) { + auto swm = compressorMgr.compressMessage(toSink, &_compressorId.value()); + uassertStatusOK(swm.getStatus()); + toSink = swm.getValue(); + } + + TrafficRecorder::get(_serviceContext) + .observe( + _sessionHandle, _serviceContext->getPreciseClockSource()->now(), toSink); + + _sinkMessage(std::move(guard), std::move(toSink)); + + } else { + _state.store(State::Source); + _inMessage.reset(); + _inExhaust = false; + return _scheduleNextWithGuard(std::move(guard), ServiceExecutor::kDeferredTask); + } + }) + .get(); } void ServiceStateMachine::runNext() { @@ -668,7 +675,7 @@ void ServiceStateMachine::_cleanupExhaustResources() noexcept try { // Fire and forget. This is a best effort attempt to immediately clean up the exhaust // cursor. If the killCursors request fails here for any reasons, it will still be // cleaned up once the cursor times out. - _sep->handleRequest(opCtx.get(), makeKillCursorsMessage(cursorId)); + _sep->handleRequest(opCtx.get(), makeKillCursorsMessage(cursorId)).get(); } } catch (const DBException& e) { LOGV2(22992, diff --git a/src/mongo/transport/service_state_machine_test.cpp b/src/mongo/transport/service_state_machine_test.cpp index b5aceac7ea5..02669106853 100644 --- a/src/mongo/transport/service_state_machine_test.cpp +++ b/src/mongo/transport/service_state_machine_test.cpp @@ -72,7 +72,8 @@ public: void startSession(transport::SessionHandle session) override {} - DbResponse handleRequest(OperationContext* opCtx, const Message& request) override { + Future<DbResponse> handleRequest(OperationContext* opCtx, + const Message& request) noexcept override try { LOGV2(22994, "In handleRequest"); _ranHandler = true; ASSERT_TRUE(haveClient()); @@ -98,7 +99,10 @@ public: } dbResponse.response = res; - return dbResponse; + return Future<DbResponse>::makeReady(std::move(dbResponse)); + } catch (const DBException& e) { + LOGV2_ERROR(4879805, "Failed to handle request", "error"_attr = redact(e)); + return e.toStatus(); } void endAllSessions(transport::Session::TagMask tags) override {} diff --git a/src/mongo/transport/transport_layer_asio_test.cpp b/src/mongo/transport/transport_layer_asio_test.cpp index 660f14852f3..a7938e031ba 100644 --- a/src/mongo/transport/transport_layer_asio_test.cpp +++ b/src/mongo/transport/transport_layer_asio_test.cpp @@ -79,7 +79,8 @@ public: return _sessions.size(); } - DbResponse handleRequest(OperationContext* opCtx, const Message& request) override { + Future<DbResponse> handleRequest(OperationContext* opCtx, + const Message& request) noexcept override { MONGO_UNREACHABLE; } @@ -191,7 +192,8 @@ public: return 0; } - DbResponse handleRequest(OperationContext* opCtx, const Message& request) override { + Future<DbResponse> handleRequest(OperationContext* opCtx, + const Message& request) noexcept override { MONGO_UNREACHABLE; } |