/** * Copyright (C) 2023-present MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the Server Side Public License, version 1, * as published by MongoDB, Inc. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * Server Side Public License for more details. * * You should have received a copy of the Server Side Public License * along with this program. If not, see * . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the Server Side Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #pragma once #include #include #include "mongo/config.h" #include "mongo/transport/asio/asio_session.h" #include "mongo/transport/asio/asio_transport_layer.h" #include "mongo/transport/baton.h" #ifdef MONGO_CONFIG_SSL #include "mongo/util/net/ssl.hpp" #endif namespace mongo::transport { /** * Provides common functionality between AsyncAsioSession and SyncAsioSession. * * NOTE: This functionality is currently provided by inheritance, but composition might be a * preferred approach after more refactoring. */ class CommonAsioSession : public AsioSession { public: /** * If the socket is disconnected while any of these options are being set, this constructor * may throw, but it is guaranteed to throw a mongo DBException. */ CommonAsioSession(AsioTransportLayer* tl, GenericSocket socket, bool isIngressSession, Endpoint endpoint = Endpoint(), std::shared_ptr transientSSLContext = nullptr); CommonAsioSession(const CommonAsioSession&) = delete; CommonAsioSession& operator=(const CommonAsioSession&) = delete; TransportLayer* getTransportLayer() const override { return _tl; } const HostAndPort& remote() const override { return _remote; } const HostAndPort& local() const override { return _local; } const SockAddr& remoteAddr() const override { return _remoteAddr; } const SockAddr& localAddr() const override { return _localAddr; } void end() override; StatusWith sourceMessage() noexcept override; Future asyncSourceMessage(const BatonHandle& baton = nullptr) noexcept override; Status waitForData() noexcept override; Future asyncWaitForData() noexcept override; Status sinkMessage(Message message) noexcept override; Future asyncSinkMessage(Message message, const BatonHandle& baton = nullptr) noexcept override; void cancelAsyncOperations(const BatonHandle& baton = nullptr) override; void setTimeout(boost::optional timeout) override; bool isConnected() override; bool isFromLoadBalancer() const override { return _isFromLoadBalancer; } #ifdef MONGO_CONFIG_SSL const std::shared_ptr& getSSLManager() const override; #endif protected: #ifdef MONGO_CONFIG_SSL /** Constructs a SSL socket required to initiate SSL handshake for egress connections. */ Status buildSSLSocket(const HostAndPort& target) override; Future handshakeSSLForEgress(const HostAndPort& target, const ReactorHandle& reactor) override; #endif GenericSocket& getSocket() override; ExecutorFuture parseProxyProtocolHeader(const ReactorHandle& reactor) override; /** * Provides the means to track and cancel async I/O operations scheduled through `Session`. * Any I/O operation goes through the following steps: * - `start()`: changes the state from `kNotStarted` to `kRunning`. * - Before scheduling the async operation, checks for cancellation through `isCanceled()`. * - `complete()`: clears the state, and prepares the session for future operations. * * This class is thread-safe. */ class AsyncOperationState { public: void start() { const auto prev = _state.swap(State::kRunning); invariant(prev == State::kNotStarted, "Another operation was in progress"); } bool isCanceled() const { return _state.load() == State::kCanceled; } void complete() { const auto prev = _state.swap(State::kNotStarted); invariant(prev != State::kNotStarted, "No operation was running"); } /** * Instructs an active operation to cancel (if there is any). Otherwise, it does nothing. * Cancellation is non-blocking and `cancel()` doesn't block for completion of ongoing * operations. */ void cancel() { auto expected = State::kRunning; _state.compareAndSwap(&expected, State::kCanceled); } private: /** * State transition diagram: * -+-> [kNotStarted] --> [kRunning] --> [kCanceled] * | | | * +--------------------------+--------------+ */ enum class State { kNotStarted, kRunning, kCanceled }; AtomicWord _state{State::kNotStarted}; }; Future sourceMessageImpl(const BatonHandle& baton = nullptr); Future sinkMessageImpl(Message message, const BatonHandle& baton = nullptr); template Future read(const MutableBufferSequence& buffers, const BatonHandle& baton = nullptr); template Future write(const ConstBufferSequence& buffers, const BatonHandle& baton = nullptr); template Future opportunisticRead(Stream& stream, const MutableBufferSequence& buffers, const BatonHandle& baton = nullptr); /** * moreToSend checks the ssl socket after an opportunisticWrite. If there are still bytes to * send, we manually send them off the underlying socket. Then we hook that up with a future * that gets us back to sending from the ssl side. * * There are two variants because we call opportunisticWrite on generic sockets and ssl sockets. * The generic socket impl never has more to send (because it doesn't have an inner socket it * needs to keep sending). */ template boost::optional> moreToSend(GenericSocket& socket, const ConstBufferSequence& buffers, const BatonHandle& baton) { return boost::none; } #ifdef MONGO_CONFIG_SSL template boost::optional> moreToSend(asio::ssl::stream& socket, const ConstBufferSequence& buffers, const BatonHandle& baton) { if (_sslSocket->getCoreOutputBuffer().size()) { return opportunisticWrite(getSocket(), _sslSocket->getCoreOutputBuffer(), baton) .then([this, &socket, buffers, baton] { return opportunisticWrite(socket, buffers, baton); }); } return boost::none; } #endif template Future opportunisticWrite(Stream& stream, const ConstBufferSequence& buffers, const BatonHandle& baton = nullptr); #ifdef MONGO_CONFIG_SSL template Future maybeHandshakeSSLForIngress(const MutableBufferSequence& buffer); #endif template bool checkForHTTPRequest(const Buffer& buffers); /** * Called from read() to send an HTTP response back to a client that's trying to use HTTP * over a native MongoDB port. This returns a Future to match its only caller, but it * always contains an error, so it could really return Future */ Future sendHTTPResponse(const BatonHandle& baton = nullptr); enum BlockingMode { unknown, sync, async, }; BlockingMode _blockingMode = unknown; HostAndPort _remote; HostAndPort _local; SockAddr _remoteAddr; SockAddr _localAddr; boost::optional _configuredTimeout; boost::optional _socketTimeout; GenericSocket _socket; #ifdef MONGO_CONFIG_SSL boost::optional> _sslSocket; bool _ranHandshake = false; std::shared_ptr _sslContext; #endif AsioTransportLayer* const _tl; bool _isIngressSession; bool _isFromLoadBalancer = false; boost::optional _proxiedSrcEndpoint; boost::optional _proxiedDstEndpoint; AsyncOperationState _asyncOpState; /** * Strictly orders the start and cancellation of asynchronous operations: * - Holding the mutex while starting asynchronous operations (e.g., adding the session to the * networking baton) ensures cancellation either happens before or after scheduling of the * operation. * - Holding the mutex while canceling asynchronous operations guarantees no operation can start * while cancellation is in progress. * * Opportunistic read and write are implemented as recursive future continuations, so we may * recursively acquire the mutex when the future is readied inline. */ stdx::recursive_mutex _asyncOpMutex; // NOLINT }; /** * This is an AsioSession which is intended to only use the `asyncSourceMessage`, * `asyncSinkMessage`, and `asyncWaitForData` subset of the Session's read/write/wait interface. * Usage of non async counterparts of these functions causes an invariant to be triggered. * * NOTE: Longer-term the intention of the split between Sync and Async implementations is to firstly * have two cleaner implementations instead of one implementation that supports both modes of * operation, and secondly to reduce the size of the read/write/wait Session interface into a * generic future-returning interface, where the readiness of those futures depends on the * implementing subclass. At the moment, the distinction between the classes is limited to which * functions are allowed to be called. */ class AsyncAsioSession : public CommonAsioSession { public: using CommonAsioSession::CommonAsioSession; ~AsyncAsioSession() override { end(); } protected: void ensureSync() override; void ensureAsync() override; }; /** * This is an AsioSession which is intended to only use the `sourceMessage`, `sinkMessage`, and * `waitForData` subset of the Session's read/write/wait interface. Usage of async counterparts of * these functions causes an invariant to be triggered. * * NOTE: See AsyncAsioSession's note explaining the current state and purpose of the separation. */ class SyncAsioSession : public CommonAsioSession { public: using CommonAsioSession::CommonAsioSession; ~SyncAsioSession() override { end(); } protected: void ensureSync() override; void ensureAsync() override; }; } // namespace mongo::transport