summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBilly Donahue <billy.donahue@mongodb.com>2020-08-25 21:42:02 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-09-04 08:19:48 +0000
commit77bee7548c4ea1ec635bd387cfb417204098c16f (patch)
tree921f194bb82c1701660149f2657ed60e438e1de6
parent6ac9d725e9ef5e017f46a6d65c836286ca8f88d6 (diff)
downloadmongo-77bee7548c4ea1ec635bd387cfb417204098c16f.tar.gz
SERVER-50346 refactor `handleRequest` code fragments into a HandleRequest class
-rw-r--r--src/mongo/db/service_entry_point_common.cpp346
1 files changed, 228 insertions, 118 deletions
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index fb083bc6fa5..4ffb126939e 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -33,6 +33,8 @@
#include "mongo/db/service_entry_point_common.h"
+#include <fmt/format.h>
+
#include "mongo/base/checked_cast.h"
#include "mongo/bson/mutable/document.h"
#include "mongo/bson/util/bson_extract.h"
@@ -100,10 +102,6 @@
#include "mongo/util/fail_point.h"
#include "mongo/util/scopeguard.h"
-#include <fmt/format.h>
-
-using namespace fmt::literals;
-
namespace mongo {
MONGO_FAIL_POINT_DEFINE(rsStopGetMore);
@@ -128,6 +126,62 @@ ServerStatusMetricField<Counter64> displayNotMasterUnackWrites(
namespace {
+using namespace fmt::literals;
+
+/** Allows for the very complex handleRequest function to be decomposed into parts. */
+struct HandleRequest {
+ struct OpRunner {
+ explicit OpRunner(HandleRequest* hr) : hr{hr} {}
+ virtual ~OpRunner() = default;
+ virtual DbResponse run() = 0;
+ HandleRequest* hr;
+ };
+
+ HandleRequest(OperationContext* opCtx,
+ const Message& m,
+ const ServiceEntryPointCommon::Hooks& behaviors)
+ : opCtx{opCtx}, m{m}, behaviors{behaviors}, dbmsg{m} {}
+
+ DbResponse run();
+
+ NetworkOp op() const {
+ return m.operation();
+ }
+
+ CurOp& currentOp() {
+ return *CurOp::get(opCtx);
+ }
+
+ NamespaceString nsString() const {
+ if (!dbmsg.messageShouldHaveNs())
+ return {};
+ return NamespaceString(dbmsg.getns());
+ }
+
+ void assertValidNsString() {
+ if (!nsString().isValid()) {
+ uassert(16257, str::stream() << "Invalid ns [" << nsString().toString() << "]", false);
+ }
+ }
+
+ std::unique_ptr<OpRunner> makeOpRunner();
+ void startOperation();
+ void completeOperation(const DbResponse& dbresponse);
+
+ Client& client() const {
+ return *opCtx->getClient();
+ }
+
+ OperationContext* opCtx;
+ const Message& m;
+ const ServiceEntryPointCommon::Hooks& behaviors;
+
+ DbMessage dbmsg;
+
+ boost::optional<long long> slowMsOverride;
+ bool forceLog = false;
+};
+
void generateLegacyQueryErrorResponse(const AssertionException& exception,
const QueryMessage& queryMessage,
CurOp* curop,
@@ -492,7 +546,6 @@ void appendErrorLabelsAndTopologyVersion(OperationContext* opCtx,
topologyVersion.serialize(&topologyVersionBuilder);
}
-namespace {
void _abortUnpreparedOrStashPreparedTransaction(
OperationContext* opCtx, TransactionParticipant::Participant* txnParticipant) {
const bool isPrepared = txnParticipant->transactionIsPrepared();
@@ -514,7 +567,6 @@ void _abortUnpreparedOrStashPreparedTransaction(
std::terminate();
}
}
-} // namespace
void invokeWithNoSession(OperationContext* opCtx,
const OpMsgRequest& request,
@@ -1666,148 +1718,188 @@ DbResponse receivedGetMore(OperationContext* opCtx,
return dbresponse;
}
-} // namespace
+struct CommandOpRunner : HandleRequest::OpRunner {
+ using HandleRequest::OpRunner::OpRunner;
+ DbResponse run() override {
+ DbResponse r = receivedCommands(hr->opCtx, hr->m, hr->behaviors);
+ // Hello should take kMaxAwaitTimeMs at most, log if it takes twice that.
+ if (auto command = hr->currentOp().getCommand();
+ command && (command->getName() == "hello")) {
+ hr->slowMsOverride =
+ 2 * durationCount<Milliseconds>(SingleServerIsMasterMonitor::kMaxAwaitTime);
+ }
+ return r;
+ }
+};
-BSONObj ServiceEntryPointCommon::getRedactedCopyForLogging(const Command* command,
- const BSONObj& cmdObj) {
- mutablebson::Document cmdToLog(cmdObj, mutablebson::Document::kInPlaceDisabled);
- command->snipForLogging(&cmdToLog);
- BSONObjBuilder bob;
- cmdToLog.writeTo(&bob);
- return bob.obj();
-}
+struct QueryOpRunner : HandleRequest::OpRunner {
+ using HandleRequest::OpRunner::OpRunner;
+ DbResponse run() override {
+ hr->opCtx->markKillOnClientDisconnect();
+ return receivedQuery(
+ hr->opCtx, hr->nsString(), *hr->opCtx->getClient(), hr->m, hr->behaviors);
+ }
+};
-Future<DbResponse> ServiceEntryPointCommon::handleRequest(OperationContext* opCtx,
- const Message& m,
- const Hooks& behaviors) noexcept try {
- // before we lock...
- NetworkOp op = m.operation();
- bool isCommand = false;
+struct GetMoreOpRunner : HandleRequest::OpRunner {
+ using HandleRequest::OpRunner::OpRunner;
+ DbResponse run() override {
+ return receivedGetMore(hr->opCtx, hr->m, hr->currentOp(), &hr->forceLog);
+ }
+};
+
+/**
+ * Fire and forget network operations don't produce a `DbResponse`.
+ * They override `runAndForget` instead of `run`, and this base
+ * class provides a `run` that calls it and handles error reporting
+ * via the `LastError` slot.
+ */
+struct FireAndForgetOpRunner : HandleRequest::OpRunner {
+ using HandleRequest::OpRunner::OpRunner;
+ virtual void runAndForget() = 0;
+ DbResponse run() final;
+};
- DbMessage dbmsg(m);
+struct KillCursorsOpRunner : FireAndForgetOpRunner {
+ using FireAndForgetOpRunner::FireAndForgetOpRunner;
+ void runAndForget() override {
+ hr->currentOp().ensureStarted();
+ hr->slowMsOverride = 10;
+ receivedKillCursors(hr->opCtx, hr->m);
+ }
+};
- Client& c = *opCtx->getClient();
+struct InsertOpRunner : FireAndForgetOpRunner {
+ using FireAndForgetOpRunner::FireAndForgetOpRunner;
+ void runAndForget() override {
+ hr->assertValidNsString();
+ receivedInsert(hr->opCtx, hr->nsString(), hr->m);
+ }
+};
- if (c.isInDirectClient()) {
+struct UpdateOpRunner : FireAndForgetOpRunner {
+ using FireAndForgetOpRunner::FireAndForgetOpRunner;
+ void runAndForget() override {
+ hr->assertValidNsString();
+ receivedUpdate(hr->opCtx, hr->nsString(), hr->m);
+ }
+};
+
+struct DeleteOpRunner : FireAndForgetOpRunner {
+ using FireAndForgetOpRunner::FireAndForgetOpRunner;
+ void runAndForget() override {
+ hr->assertValidNsString();
+ receivedDelete(hr->opCtx, hr->nsString(), hr->m);
+ }
+};
+
+std::unique_ptr<HandleRequest::OpRunner> HandleRequest::makeOpRunner() {
+ switch (op()) {
+ case dbQuery:
+ if (!nsString().isCommand())
+ return std::make_unique<QueryOpRunner>(this);
+ // FALLTHROUGH: it's a query containing a command
+ case dbMsg:
+ return std::make_unique<CommandOpRunner>(this);
+ case dbGetMore:
+ return std::make_unique<GetMoreOpRunner>(this);
+ case dbKillCursors:
+ return std::make_unique<KillCursorsOpRunner>(this);
+ case dbInsert:
+ return std::make_unique<InsertOpRunner>(this);
+ case dbUpdate:
+ return std::make_unique<UpdateOpRunner>(this);
+ case dbDelete:
+ return std::make_unique<DeleteOpRunner>(this);
+ default:
+ LOGV2(21968,
+ "Operation isn't supported: {operation}",
+ "Operation is not supported",
+ "operation"_attr = static_cast<int>(op()));
+ return nullptr;
+ }
+}
+
+DbResponse FireAndForgetOpRunner::run() {
+ try {
+ runAndForget();
+ } catch (const AssertionException& ue) {
+ LastError::get(hr->client()).setLastError(ue.code(), ue.reason());
+ LOGV2_DEBUG(21969,
+ 3,
+ "Caught Assertion in {networkOp}, continuing: {error}",
+ "Assertion in fire-and-forget operation",
+ "networkOp"_attr = networkOpToString(hr->op()),
+ "error"_attr = redact(ue));
+ hr->currentOp().debug().errInfo = ue.toStatus();
+ }
+ // A NotWritablePrimary error can be set either within
+ // receivedInsert/receivedUpdate/receivedDelete or within the AssertionException handler above.
+ // Either way, we want to throw an exception here, which will cause the client to be
+ // disconnected.
+ if (LastError::get(hr->client()).hadNotMasterError()) {
+ notMasterLegacyUnackWrites.increment();
+ uasserted(ErrorCodes::NotWritablePrimary,
+ str::stream() << "Not-master error while processing '"
+ << networkOpToString(hr->op()) << "' operation on '"
+ << hr->nsString() << "' namespace via legacy "
+ << "fire-and-forget command execution.");
+ }
+ return {};
+}
+
+void HandleRequest::startOperation() {
+ if (client().isInDirectClient()) {
if (!opCtx->getLogicalSessionId() || !opCtx->getTxnNumber()) {
invariant(!opCtx->inMultiDocumentTransaction() &&
!opCtx->lockState()->inAWriteUnitOfWork());
}
} else {
- LastError::get(c).startRequest();
- AuthorizationSession::get(c)->startRequest(opCtx);
+ LastError::get(client()).startRequest();
+ AuthorizationSession::get(client())->startRequest(opCtx);
// We should not be holding any locks at this point
invariant(!opCtx->lockState()->isLocked());
}
-
- const char* ns = dbmsg.messageShouldHaveNs() ? dbmsg.getns() : nullptr;
- const NamespaceString nsString = ns ? NamespaceString(ns) : NamespaceString();
-
- if (op == dbQuery) {
- if (nsString.isCommand()) {
- isCommand = true;
- }
- } else if (op == dbMsg) {
- isCommand = true;
- }
-
- CurOp& currentOp = *CurOp::get(opCtx);
{
- stdx::lock_guard<Client> lk(*opCtx->getClient());
+ stdx::lock_guard<Client> lk(client());
// Commands handling code will reset this if the operation is a command
// which is logically a basic CRUD operation like query, insert, etc.
- currentOp.setNetworkOp_inlock(op);
- currentOp.setLogicalOp_inlock(networkOpToLogicalOp(op));
+ currentOp().setNetworkOp_inlock(op());
+ currentOp().setLogicalOp_inlock(networkOpToLogicalOp(op()));
}
+}
- OpDebug& debug = currentOp.debug();
-
- boost::optional<long long> slowMsOverride;
- bool forceLog = false;
-
+DbResponse HandleRequest::run() {
+ startOperation();
DbResponse dbresponse;
- if (op == dbMsg || (op == dbQuery && isCommand)) {
- dbresponse = receivedCommands(opCtx, m, behaviors);
- // Hello should take kMaxAwaitTimeMs at most, log if it takes twice that.
- if (auto command = currentOp.getCommand(); command && (command->getName() == "hello")) {
- slowMsOverride =
- 2 * durationCount<Milliseconds>(SingleServerIsMasterMonitor::kMaxAwaitTime);
- }
- } else if (op == dbQuery) {
- invariant(!isCommand);
- opCtx->markKillOnClientDisconnect();
-
- dbresponse = receivedQuery(opCtx, nsString, c, m, behaviors);
- } else if (op == dbGetMore) {
- dbresponse = receivedGetMore(opCtx, m, currentOp, &forceLog);
+ if (auto opRunner = makeOpRunner()) {
+ dbresponse = opRunner->run();
} else {
- // The remaining operations do not return any response. They are fire-and-forget.
- try {
- if (op == dbKillCursors) {
- currentOp.ensureStarted();
- slowMsOverride = 10;
- receivedKillCursors(opCtx, m);
- } else if (op != dbInsert && op != dbUpdate && op != dbDelete) {
- LOGV2(21968,
- "Operation isn't supported: {operation}",
- "Operation is not supported",
- "operation"_attr = static_cast<int>(op));
- currentOp.done();
- forceLog = true;
- } else {
- if (!nsString.isValid()) {
- uassert(16257, str::stream() << "Invalid ns [" << ns << "]", false);
- } else if (op == dbInsert) {
- receivedInsert(opCtx, nsString, m);
- } else if (op == dbUpdate) {
- receivedUpdate(opCtx, nsString, m);
- } else if (op == dbDelete) {
- receivedDelete(opCtx, nsString, m);
- } else {
- MONGO_UNREACHABLE;
- }
- }
- } catch (const AssertionException& ue) {
- LastError::get(c).setLastError(ue.code(), ue.reason());
- LOGV2_DEBUG(21969,
- 3,
- "Caught Assertion in {networkOp}, continuing: {error}",
- "Assertion in fire-and-forget operation",
- "networkOp"_attr = networkOpToString(op),
- "error"_attr = redact(ue));
- debug.errInfo = ue.toStatus();
- }
- // A NotWritablePrimary error can be set either within
- // receivedInsert/receivedUpdate/receivedDelete or within the AssertionException handler
- // above. Either way, we want to throw an exception here, which will cause the client to be
- // disconnected.
- if (LastError::get(opCtx->getClient()).hadNotMasterError()) {
- notMasterLegacyUnackWrites.increment();
- uasserted(ErrorCodes::NotWritablePrimary,
- str::stream()
- << "Not-master error while processing '" << networkOpToString(op)
- << "' operation on '" << nsString << "' namespace via legacy "
- << "fire-and-forget command execution.");
- }
+ currentOp().done();
+ forceLog = true;
}
+ completeOperation(dbresponse);
+ return dbresponse;
+}
+void HandleRequest::completeOperation(const DbResponse& dbresponse) {
// Mark the op as complete, and log it if appropriate. Returns a boolean indicating whether
// this op should be written to the profiler.
- const bool shouldProfile = currentOp.completeAndLogOperation(
+ const bool shouldProfile = currentOp().completeAndLogOperation(
opCtx, MONGO_LOGV2_DEFAULT_COMPONENT, dbresponse.response.size(), slowMsOverride, forceLog);
Top::get(opCtx->getServiceContext())
.incrementGlobalLatencyStats(
opCtx,
- durationCount<Microseconds>(currentOp.elapsedTimeExcludingPauses()),
- currentOp.getReadWriteType());
+ durationCount<Microseconds>(currentOp().elapsedTimeExcludingPauses()),
+ currentOp().getReadWriteType());
if (shouldProfile) {
// Performance profiling is on
if (opCtx->lockState()->isReadLocked()) {
LOGV2_DEBUG(21970, 1, "Note: not profiling because of recursive read lock");
- } else if (c.isInDirectClient()) {
+ } else if (client().isInDirectClient()) {
LOGV2_DEBUG(21971, 1, "Note: not profiling because we are in DBDirectClient");
} else if (behaviors.lockedForWriting()) {
// TODO SERVER-26825: Fix race condition where fsyncLock is acquired post
@@ -1817,15 +1909,33 @@ Future<DbResponse> ServiceEntryPointCommon::handleRequest(OperationContext* opCt
LOGV2_DEBUG(21973, 1, "Note: not profiling because server is read-only");
} else {
invariant(!opCtx->lockState()->inAWriteUnitOfWork());
- profile(opCtx, op);
+ profile(opCtx, op());
}
}
recordCurOpMetrics(opCtx);
- return Future<DbResponse>::makeReady(std::move(dbresponse));
-} catch (const DBException& e) {
- LOGV2_ERROR(4879802, "Failed to handle request", "error"_attr = redact(e));
- return e.toStatus();
+}
+
+} // namespace
+
+BSONObj ServiceEntryPointCommon::getRedactedCopyForLogging(const Command* command,
+ const BSONObj& cmdObj) {
+ mutablebson::Document cmdToLog(cmdObj, mutablebson::Document::kInPlaceDisabled);
+ command->snipForLogging(&cmdToLog);
+ BSONObjBuilder bob;
+ cmdToLog.writeTo(&bob);
+ return bob.obj();
+}
+
+Future<DbResponse> ServiceEntryPointCommon::handleRequest(OperationContext* opCtx,
+ const Message& m,
+ const Hooks& behaviors) noexcept {
+ try {
+ return Future<DbResponse>::makeReady(HandleRequest{opCtx, m, behaviors}.run());
+ } catch (DBException& e) {
+ LOGV2_ERROR(4879802, "Failed to handle request", "error"_attr = redact(e));
+ return e.toStatus();
+ }
}
ServiceEntryPointCommon::Hooks::~Hooks() = default;