diff options
author | Billy Donahue <billy.donahue@mongodb.com> | 2020-08-25 21:42:02 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-09-04 08:19:48 +0000 |
commit | 77bee7548c4ea1ec635bd387cfb417204098c16f (patch) | |
tree | 921f194bb82c1701660149f2657ed60e438e1de6 | |
parent | 6ac9d725e9ef5e017f46a6d65c836286ca8f88d6 (diff) | |
download | mongo-77bee7548c4ea1ec635bd387cfb417204098c16f.tar.gz |
SERVER-50346 refactor `handleRequest` code fragments into a HandleRequest class
-rw-r--r-- | src/mongo/db/service_entry_point_common.cpp | 346 |
1 files changed, 228 insertions, 118 deletions
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index fb083bc6fa5..4ffb126939e 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -33,6 +33,8 @@ #include "mongo/db/service_entry_point_common.h" +#include <fmt/format.h> + #include "mongo/base/checked_cast.h" #include "mongo/bson/mutable/document.h" #include "mongo/bson/util/bson_extract.h" @@ -100,10 +102,6 @@ #include "mongo/util/fail_point.h" #include "mongo/util/scopeguard.h" -#include <fmt/format.h> - -using namespace fmt::literals; - namespace mongo { MONGO_FAIL_POINT_DEFINE(rsStopGetMore); @@ -128,6 +126,62 @@ ServerStatusMetricField<Counter64> displayNotMasterUnackWrites( namespace { +using namespace fmt::literals; + +/** Allows for the very complex handleRequest function to be decomposed into parts. */ +struct HandleRequest { + struct OpRunner { + explicit OpRunner(HandleRequest* hr) : hr{hr} {} + virtual ~OpRunner() = default; + virtual DbResponse run() = 0; + HandleRequest* hr; + }; + + HandleRequest(OperationContext* opCtx, + const Message& m, + const ServiceEntryPointCommon::Hooks& behaviors) + : opCtx{opCtx}, m{m}, behaviors{behaviors}, dbmsg{m} {} + + DbResponse run(); + + NetworkOp op() const { + return m.operation(); + } + + CurOp& currentOp() { + return *CurOp::get(opCtx); + } + + NamespaceString nsString() const { + if (!dbmsg.messageShouldHaveNs()) + return {}; + return NamespaceString(dbmsg.getns()); + } + + void assertValidNsString() { + if (!nsString().isValid()) { + uassert(16257, str::stream() << "Invalid ns [" << nsString().toString() << "]", false); + } + } + + std::unique_ptr<OpRunner> makeOpRunner(); + void startOperation(); + void completeOperation(const DbResponse& dbresponse); + + Client& client() const { + return *opCtx->getClient(); + } + + OperationContext* opCtx; + const Message& m; + const ServiceEntryPointCommon::Hooks& behaviors; + + DbMessage dbmsg; + + boost::optional<long long> slowMsOverride; + bool forceLog = false; +}; + void generateLegacyQueryErrorResponse(const AssertionException& exception, const QueryMessage& queryMessage, CurOp* curop, @@ -492,7 +546,6 @@ void appendErrorLabelsAndTopologyVersion(OperationContext* opCtx, topologyVersion.serialize(&topologyVersionBuilder); } -namespace { void _abortUnpreparedOrStashPreparedTransaction( OperationContext* opCtx, TransactionParticipant::Participant* txnParticipant) { const bool isPrepared = txnParticipant->transactionIsPrepared(); @@ -514,7 +567,6 @@ void _abortUnpreparedOrStashPreparedTransaction( std::terminate(); } } -} // namespace void invokeWithNoSession(OperationContext* opCtx, const OpMsgRequest& request, @@ -1666,148 +1718,188 @@ DbResponse receivedGetMore(OperationContext* opCtx, return dbresponse; } -} // namespace +struct CommandOpRunner : HandleRequest::OpRunner { + using HandleRequest::OpRunner::OpRunner; + DbResponse run() override { + DbResponse r = receivedCommands(hr->opCtx, hr->m, hr->behaviors); + // Hello should take kMaxAwaitTimeMs at most, log if it takes twice that. + if (auto command = hr->currentOp().getCommand(); + command && (command->getName() == "hello")) { + hr->slowMsOverride = + 2 * durationCount<Milliseconds>(SingleServerIsMasterMonitor::kMaxAwaitTime); + } + return r; + } +}; -BSONObj ServiceEntryPointCommon::getRedactedCopyForLogging(const Command* command, - const BSONObj& cmdObj) { - mutablebson::Document cmdToLog(cmdObj, mutablebson::Document::kInPlaceDisabled); - command->snipForLogging(&cmdToLog); - BSONObjBuilder bob; - cmdToLog.writeTo(&bob); - return bob.obj(); -} +struct QueryOpRunner : HandleRequest::OpRunner { + using HandleRequest::OpRunner::OpRunner; + DbResponse run() override { + hr->opCtx->markKillOnClientDisconnect(); + return receivedQuery( + hr->opCtx, hr->nsString(), *hr->opCtx->getClient(), hr->m, hr->behaviors); + } +}; -Future<DbResponse> ServiceEntryPointCommon::handleRequest(OperationContext* opCtx, - const Message& m, - const Hooks& behaviors) noexcept try { - // before we lock... - NetworkOp op = m.operation(); - bool isCommand = false; +struct GetMoreOpRunner : HandleRequest::OpRunner { + using HandleRequest::OpRunner::OpRunner; + DbResponse run() override { + return receivedGetMore(hr->opCtx, hr->m, hr->currentOp(), &hr->forceLog); + } +}; + +/** + * Fire and forget network operations don't produce a `DbResponse`. + * They override `runAndForget` instead of `run`, and this base + * class provides a `run` that calls it and handles error reporting + * via the `LastError` slot. + */ +struct FireAndForgetOpRunner : HandleRequest::OpRunner { + using HandleRequest::OpRunner::OpRunner; + virtual void runAndForget() = 0; + DbResponse run() final; +}; - DbMessage dbmsg(m); +struct KillCursorsOpRunner : FireAndForgetOpRunner { + using FireAndForgetOpRunner::FireAndForgetOpRunner; + void runAndForget() override { + hr->currentOp().ensureStarted(); + hr->slowMsOverride = 10; + receivedKillCursors(hr->opCtx, hr->m); + } +}; - Client& c = *opCtx->getClient(); +struct InsertOpRunner : FireAndForgetOpRunner { + using FireAndForgetOpRunner::FireAndForgetOpRunner; + void runAndForget() override { + hr->assertValidNsString(); + receivedInsert(hr->opCtx, hr->nsString(), hr->m); + } +}; - if (c.isInDirectClient()) { +struct UpdateOpRunner : FireAndForgetOpRunner { + using FireAndForgetOpRunner::FireAndForgetOpRunner; + void runAndForget() override { + hr->assertValidNsString(); + receivedUpdate(hr->opCtx, hr->nsString(), hr->m); + } +}; + +struct DeleteOpRunner : FireAndForgetOpRunner { + using FireAndForgetOpRunner::FireAndForgetOpRunner; + void runAndForget() override { + hr->assertValidNsString(); + receivedDelete(hr->opCtx, hr->nsString(), hr->m); + } +}; + +std::unique_ptr<HandleRequest::OpRunner> HandleRequest::makeOpRunner() { + switch (op()) { + case dbQuery: + if (!nsString().isCommand()) + return std::make_unique<QueryOpRunner>(this); + // FALLTHROUGH: it's a query containing a command + case dbMsg: + return std::make_unique<CommandOpRunner>(this); + case dbGetMore: + return std::make_unique<GetMoreOpRunner>(this); + case dbKillCursors: + return std::make_unique<KillCursorsOpRunner>(this); + case dbInsert: + return std::make_unique<InsertOpRunner>(this); + case dbUpdate: + return std::make_unique<UpdateOpRunner>(this); + case dbDelete: + return std::make_unique<DeleteOpRunner>(this); + default: + LOGV2(21968, + "Operation isn't supported: {operation}", + "Operation is not supported", + "operation"_attr = static_cast<int>(op())); + return nullptr; + } +} + +DbResponse FireAndForgetOpRunner::run() { + try { + runAndForget(); + } catch (const AssertionException& ue) { + LastError::get(hr->client()).setLastError(ue.code(), ue.reason()); + LOGV2_DEBUG(21969, + 3, + "Caught Assertion in {networkOp}, continuing: {error}", + "Assertion in fire-and-forget operation", + "networkOp"_attr = networkOpToString(hr->op()), + "error"_attr = redact(ue)); + hr->currentOp().debug().errInfo = ue.toStatus(); + } + // A NotWritablePrimary error can be set either within + // receivedInsert/receivedUpdate/receivedDelete or within the AssertionException handler above. + // Either way, we want to throw an exception here, which will cause the client to be + // disconnected. + if (LastError::get(hr->client()).hadNotMasterError()) { + notMasterLegacyUnackWrites.increment(); + uasserted(ErrorCodes::NotWritablePrimary, + str::stream() << "Not-master error while processing '" + << networkOpToString(hr->op()) << "' operation on '" + << hr->nsString() << "' namespace via legacy " + << "fire-and-forget command execution."); + } + return {}; +} + +void HandleRequest::startOperation() { + if (client().isInDirectClient()) { if (!opCtx->getLogicalSessionId() || !opCtx->getTxnNumber()) { invariant(!opCtx->inMultiDocumentTransaction() && !opCtx->lockState()->inAWriteUnitOfWork()); } } else { - LastError::get(c).startRequest(); - AuthorizationSession::get(c)->startRequest(opCtx); + LastError::get(client()).startRequest(); + AuthorizationSession::get(client())->startRequest(opCtx); // We should not be holding any locks at this point invariant(!opCtx->lockState()->isLocked()); } - - const char* ns = dbmsg.messageShouldHaveNs() ? dbmsg.getns() : nullptr; - const NamespaceString nsString = ns ? NamespaceString(ns) : NamespaceString(); - - if (op == dbQuery) { - if (nsString.isCommand()) { - isCommand = true; - } - } else if (op == dbMsg) { - isCommand = true; - } - - CurOp& currentOp = *CurOp::get(opCtx); { - stdx::lock_guard<Client> lk(*opCtx->getClient()); + stdx::lock_guard<Client> lk(client()); // Commands handling code will reset this if the operation is a command // which is logically a basic CRUD operation like query, insert, etc. - currentOp.setNetworkOp_inlock(op); - currentOp.setLogicalOp_inlock(networkOpToLogicalOp(op)); + currentOp().setNetworkOp_inlock(op()); + currentOp().setLogicalOp_inlock(networkOpToLogicalOp(op())); } +} - OpDebug& debug = currentOp.debug(); - - boost::optional<long long> slowMsOverride; - bool forceLog = false; - +DbResponse HandleRequest::run() { + startOperation(); DbResponse dbresponse; - if (op == dbMsg || (op == dbQuery && isCommand)) { - dbresponse = receivedCommands(opCtx, m, behaviors); - // Hello should take kMaxAwaitTimeMs at most, log if it takes twice that. - if (auto command = currentOp.getCommand(); command && (command->getName() == "hello")) { - slowMsOverride = - 2 * durationCount<Milliseconds>(SingleServerIsMasterMonitor::kMaxAwaitTime); - } - } else if (op == dbQuery) { - invariant(!isCommand); - opCtx->markKillOnClientDisconnect(); - - dbresponse = receivedQuery(opCtx, nsString, c, m, behaviors); - } else if (op == dbGetMore) { - dbresponse = receivedGetMore(opCtx, m, currentOp, &forceLog); + if (auto opRunner = makeOpRunner()) { + dbresponse = opRunner->run(); } else { - // The remaining operations do not return any response. They are fire-and-forget. - try { - if (op == dbKillCursors) { - currentOp.ensureStarted(); - slowMsOverride = 10; - receivedKillCursors(opCtx, m); - } else if (op != dbInsert && op != dbUpdate && op != dbDelete) { - LOGV2(21968, - "Operation isn't supported: {operation}", - "Operation is not supported", - "operation"_attr = static_cast<int>(op)); - currentOp.done(); - forceLog = true; - } else { - if (!nsString.isValid()) { - uassert(16257, str::stream() << "Invalid ns [" << ns << "]", false); - } else if (op == dbInsert) { - receivedInsert(opCtx, nsString, m); - } else if (op == dbUpdate) { - receivedUpdate(opCtx, nsString, m); - } else if (op == dbDelete) { - receivedDelete(opCtx, nsString, m); - } else { - MONGO_UNREACHABLE; - } - } - } catch (const AssertionException& ue) { - LastError::get(c).setLastError(ue.code(), ue.reason()); - LOGV2_DEBUG(21969, - 3, - "Caught Assertion in {networkOp}, continuing: {error}", - "Assertion in fire-and-forget operation", - "networkOp"_attr = networkOpToString(op), - "error"_attr = redact(ue)); - debug.errInfo = ue.toStatus(); - } - // A NotWritablePrimary error can be set either within - // receivedInsert/receivedUpdate/receivedDelete or within the AssertionException handler - // above. Either way, we want to throw an exception here, which will cause the client to be - // disconnected. - if (LastError::get(opCtx->getClient()).hadNotMasterError()) { - notMasterLegacyUnackWrites.increment(); - uasserted(ErrorCodes::NotWritablePrimary, - str::stream() - << "Not-master error while processing '" << networkOpToString(op) - << "' operation on '" << nsString << "' namespace via legacy " - << "fire-and-forget command execution."); - } + currentOp().done(); + forceLog = true; } + completeOperation(dbresponse); + return dbresponse; +} +void HandleRequest::completeOperation(const DbResponse& dbresponse) { // Mark the op as complete, and log it if appropriate. Returns a boolean indicating whether // this op should be written to the profiler. - const bool shouldProfile = currentOp.completeAndLogOperation( + const bool shouldProfile = currentOp().completeAndLogOperation( opCtx, MONGO_LOGV2_DEFAULT_COMPONENT, dbresponse.response.size(), slowMsOverride, forceLog); Top::get(opCtx->getServiceContext()) .incrementGlobalLatencyStats( opCtx, - durationCount<Microseconds>(currentOp.elapsedTimeExcludingPauses()), - currentOp.getReadWriteType()); + durationCount<Microseconds>(currentOp().elapsedTimeExcludingPauses()), + currentOp().getReadWriteType()); if (shouldProfile) { // Performance profiling is on if (opCtx->lockState()->isReadLocked()) { LOGV2_DEBUG(21970, 1, "Note: not profiling because of recursive read lock"); - } else if (c.isInDirectClient()) { + } else if (client().isInDirectClient()) { LOGV2_DEBUG(21971, 1, "Note: not profiling because we are in DBDirectClient"); } else if (behaviors.lockedForWriting()) { // TODO SERVER-26825: Fix race condition where fsyncLock is acquired post @@ -1817,15 +1909,33 @@ Future<DbResponse> ServiceEntryPointCommon::handleRequest(OperationContext* opCt LOGV2_DEBUG(21973, 1, "Note: not profiling because server is read-only"); } else { invariant(!opCtx->lockState()->inAWriteUnitOfWork()); - profile(opCtx, op); + profile(opCtx, op()); } } recordCurOpMetrics(opCtx); - return Future<DbResponse>::makeReady(std::move(dbresponse)); -} catch (const DBException& e) { - LOGV2_ERROR(4879802, "Failed to handle request", "error"_attr = redact(e)); - return e.toStatus(); +} + +} // namespace + +BSONObj ServiceEntryPointCommon::getRedactedCopyForLogging(const Command* command, + const BSONObj& cmdObj) { + mutablebson::Document cmdToLog(cmdObj, mutablebson::Document::kInPlaceDisabled); + command->snipForLogging(&cmdToLog); + BSONObjBuilder bob; + cmdToLog.writeTo(&bob); + return bob.obj(); +} + +Future<DbResponse> ServiceEntryPointCommon::handleRequest(OperationContext* opCtx, + const Message& m, + const Hooks& behaviors) noexcept { + try { + return Future<DbResponse>::makeReady(HandleRequest{opCtx, m, behaviors}.run()); + } catch (DBException& e) { + LOGV2_ERROR(4879802, "Failed to handle request", "error"_attr = redact(e)); + return e.toStatus(); + } } ServiceEntryPointCommon::Hooks::~Hooks() = default; |