If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #include #include "mongo/platform/basic.h" #include "mongo/s/service_entry_point_mongos.h" #include "mongo/client/server_discovery_monitor.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/commands.h" #include "mongo/db/curop.h" #include "mongo/db/dbmessage.h" #include "mongo/db/not_primary_error_tracker.h" #include "mongo/db/operation_context.h" #include "mongo/db/request_execution_context.h" #include "mongo/db/service_context.h" #include "mongo/db/session_catalog.h" #include "mongo/db/stats/counters.h" #include "mongo/logv2/log.h" #include "mongo/rpc/message.h" #include "mongo/rpc/warn_deprecated_wire_ops.h" #include "mongo/s/commands/strategy.h" #include "mongo/s/grid.h" #include "mongo/s/load_balancer_support.h" #include "mongo/s/query/cluster_cursor_manager.h" #include "mongo/s/transaction_router.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kNetwork namespace mongo { namespace { BSONObj buildErrReply(const DBException& ex) { BSONObjBuilder errB; errB.append("$err", ex.what()); errB.append("code", ex.code()); return errB.obj(); } } // namespace // Allows for decomposing `handleRequest` into parts and simplifies composing the future-chain. struct HandleRequest : public std::enable_shared_from_this { struct OpRunnerBase; HandleRequest(OperationContext* opCtx, const Message& message) : rec(std::make_shared(opCtx, message)), op(message.operation()), msgId(message.header().getId()), nsString(getNamespaceString(rec->getDbMessage())) {} // Prepares the environment for handling the request. void setupEnvironment(); // Returns a future that does the heavy lifting of running client commands. Future handleRequest(); // Runs on successful execution of the future returned by `handleRequest`. void onSuccess(const DbResponse&); // Returns a future-chain to handle the request and prepare the response. Future run(); static NamespaceString getNamespaceString(const DbMessage& dbmsg) { if (!dbmsg.messageShouldHaveNs()) return {}; return NamespaceString(dbmsg.getns()); } const std::shared_ptr rec; const NetworkOp op; const int32_t msgId; const NamespaceString nsString; boost::optional slowMsOverride; }; void HandleRequest::setupEnvironment() { using namespace fmt::literals; auto opCtx = rec->getOpCtx(); // This exception will not be returned to the caller, but will be logged and will close the // connection uassert(ErrorCodes::IllegalOperation, "Message type {} is not supported."_format(op), isSupportedRequestNetworkOp(op) && op != dbCompressed); // Decompression should be handled above us. // Start a new NotPrimaryErrorTracker session. Any exceptions thrown from here onwards will be // returned to the caller (if the type of the message permits it). auto client = opCtx->getClient(); NotPrimaryErrorTracker::get(client).startRequest(); AuthorizationSession::get(client)->startRequest(opCtx); CurOp::get(opCtx)->ensureStarted(); } // The base for various operation runners that handle the request, and often generate a DbResponse. struct HandleRequest::OpRunnerBase { explicit OpRunnerBase(std::shared_ptr hr) : hr(std::move(hr)) {} virtual ~OpRunnerBase() = default; virtual Future run() = 0; const std::shared_ptr hr; }; struct CommandOpRunner final : public HandleRequest::OpRunnerBase { using HandleRequest::OpRunnerBase::OpRunnerBase; Future run() override { return Strategy::clientCommand(hr->rec); } }; Future HandleRequest::handleRequest() { switch (op) { case dbQuery: if (!nsString.isCommand()) { globalOpCounters.gotQueryDeprecated(); warnDeprecation(*(rec->getOpCtx()->getClient()), networkOpToString(dbQuery)); return Future::makeReady( makeErrorResponseToDeprecatedOpQuery("OP_QUERY is no longer supported")); } [[fallthrough]]; // It's a query containing a command case dbMsg: return std::make_unique(shared_from_this())->run(); case dbGetMore: { globalOpCounters.gotGetMoreDeprecated(); warnDeprecation(*(rec->getOpCtx()->getClient()), networkOpToString(dbGetMore)); return Future::makeReady( makeErrorResponseToDeprecatedOpQuery("OP_GET_MORE is no longer supported")); } case dbKillCursors: globalOpCounters.gotKillCursorsDeprecated(); warnDeprecation(*(rec->getOpCtx()->getClient()), networkOpToString(op)); uasserted(5745707, "OP_KILL_CURSORS is no longer supported"); case dbInsert: { auto opInsert = InsertOp::parseLegacy(rec->getMessage()); globalOpCounters.gotInsertsDeprecated(opInsert.getDocuments().size()); warnDeprecation(*(rec->getOpCtx()->getClient()), networkOpToString(op)); uasserted(5745706, "OP_INSERT is no longer supported"); } case dbUpdate: globalOpCounters.gotUpdateDeprecated(); warnDeprecation(*(rec->getOpCtx()->getClient()), networkOpToString(op)); uasserted(5745705, "OP_UPDATE is no longer supported"); case dbDelete: globalOpCounters.gotDeleteDeprecated(); warnDeprecation(*(rec->getOpCtx()->getClient()), networkOpToString(op)); uasserted(5745704, "OP_DELETE is no longer supported"); default: MONGO_UNREACHABLE; } } void HandleRequest::onSuccess(const DbResponse& dbResponse) { auto opCtx = rec->getOpCtx(); // Mark the op as complete, populate the response length, and log it if appropriate. CurOp::get(opCtx)->completeAndLogOperation( opCtx, logv2::LogComponent::kCommand, dbResponse.response.size(), slowMsOverride); } Future HandleRequest::run() { auto fp = makePromiseFuture(); auto future = std::move(fp.future) .then([this, anchor = shared_from_this()] { setupEnvironment(); }) .then([this, anchor = shared_from_this()] { return handleRequest(); }) .tap([this, anchor = shared_from_this()](const DbResponse& dbResponse) { onSuccess(dbResponse); }) .tapError([](Status status) { LOGV2(4879803, "Failed to handle request", "error"_attr = redact(status)); }); fp.promise.emplaceValue(); return future; } Future ServiceEntryPointMongos::handleRequestImpl(OperationContext* opCtx, const Message& message) noexcept { auto hr = std::make_shared(opCtx, message); return hr->run(); } Future ServiceEntryPointMongos::handleRequest(OperationContext* opCtx, const Message& message) noexcept { return handleRequestImpl(opCtx, message); } void ServiceEntryPointMongos::onClientConnect(Client* client) { if (load_balancer_support::isFromLoadBalancer(client)) { _loadBalancedConnections.increment(); } } void ServiceEntryPointMongos::onClientDisconnect(Client* client) { if (load_balancer_support::isFromLoadBalancer(client)) { _loadBalancedConnections.decrement(); auto killerOperationContext = client->makeOperationContext(); // Kill any cursors opened by the given Client. auto ccm = Grid::get(client->getServiceContext())->getCursorManager(); ccm->killCursorsSatisfying(killerOperationContext.get(), [&](CursorId, const ClusterCursorManager::CursorEntry& entry) { return entry.originatingClientUuid() == client->getUUID(); }); // Kill any in-progress transactions over this Client connection. auto lsid = load_balancer_support::getMruSession(client); auto killToken = [&]() -> boost::optional { try { return SessionCatalog::get(killerOperationContext.get())->killSession(lsid); } catch (const ExceptionFor&) { return boost::none; } }(); if (!killToken) { // There was no entry in the SessionCatalog for the session most recently used by the // disconnecting client, so we have no transaction state to clean up. return; } OperationContextSession sessionCtx(killerOperationContext.get(), std::move(*killToken)); invariant(lsid == OperationContextSession::get(killerOperationContext.get())->getSessionId()); auto txnRouter = TransactionRouter::get(killerOperationContext.get()); if (txnRouter && txnRouter.isInitialized() && !txnRouter.isTrackingOver()) { txnRouter.implicitlyAbortTransaction( killerOperationContext.get(), {ErrorCodes::Interrupted, "aborting in-progress transaction because load-balanced client disconnected"}); } } } void ServiceEntryPointMongos::appendStats(BSONObjBuilder* bob) const { ServiceEntryPointImpl::appendStats(bob); if (load_balancer_support::isEnabled()) { bob->append("loadBalanced", _loadBalancedConnections); } } } // namespace mongo