/** * Copyright (C) 2015 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #pragma once #include #include #include #include #include #include #include #include "mongo/base/status.h" #include "mongo/base/system_error.h" #include "mongo/executor/async_stream_factory_interface.h" #include "mongo/executor/async_stream_interface.h" #include "mongo/executor/async_timer_interface.h" #include "mongo/executor/connection_pool.h" #include "mongo/executor/network_connection_hook.h" #include "mongo/executor/network_interface.h" #include "mongo/executor/remote_command_request.h" #include "mongo/executor/remote_command_response.h" #include "mongo/platform/atomic_word.h" #include "mongo/rpc/metadata/metadata_hook.h" #include "mongo/rpc/protocol.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/functional.h" #include "mongo/stdx/mutex.h" #include "mongo/stdx/thread.h" #include "mongo/stdx/unordered_map.h" #include "mongo/stdx/unordered_set.h" #include "mongo/transport/message_compressor_manager.h" #include "mongo/util/net/message.h" namespace mongo { namespace executor { namespace connection_pool_asio { class ASIOConnection; class ASIOTimer; class ASIOImpl; } // connection_pool_asio class AsyncStreamInterface; #define MONGO_ASIO_INVARIANT(_Expression, ...) \ do { \ if (MONGO_unlikely(!(_Expression))) { \ _failWithInfo(__FILE__, __LINE__, __VA_ARGS__); \ } \ } while (false) #define MONGO_ASIO_INVARIANT_INLOCK(_Expression, ...) \ do { \ if (MONGO_unlikely(!(_Expression))) { \ _failWithInfo_inlock(__FILE__, __LINE__, __VA_ARGS__); \ } \ } while (false) // An AsyncOp can transition through at most 5 states. const int kMaxStateTransitions = 5; /** * Implementation of the replication system's network interface using Christopher * Kohlhoff's ASIO library instead of existing MongoDB networking primitives. */ class NetworkInterfaceASIO final : public NetworkInterface { friend class connection_pool_asio::ASIOConnection; friend class connection_pool_asio::ASIOTimer; friend class connection_pool_asio::ASIOImpl; class AsyncOp; public: struct Options { Options(); std::string instanceName = "NetworkInterfaceASIO"; ConnectionPool::Options connectionPoolOptions; std::unique_ptr timerFactory; std::unique_ptr networkConnectionHook; std::unique_ptr streamFactory; std::unique_ptr metadataHook; }; NetworkInterfaceASIO(Options = Options()); std::string getDiagnosticString() override; uint64_t getNumCanceledOps(); uint64_t getNumFailedOps(); uint64_t getNumSucceededOps(); uint64_t getNumTimedOutOps(); void appendConnectionStats(ConnectionPoolStats* stats) const override; std::string getHostName() override; void startup() override; void shutdown() override; bool inShutdown() const override; void waitForWork() override; void waitForWorkUntil(Date_t when) override; void signalWorkAvailable() override; Date_t now() override; Status startCommand(const TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequest& request, const RemoteCommandCompletionFn& onFinish) override; void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) override; Status setAlarm(Date_t when, const stdx::function& action) override; bool onNetworkThread() override; void dropConnections(const HostAndPort& hostAndPort) override; private: using ResponseStatus = TaskExecutor::ResponseStatus; using NetworkInterface::RemoteCommandCompletionFn; using NetworkOpHandler = stdx::function; using TableRow = std::vector; enum class State { kReady, kRunning, kShutdown }; friend class AsyncOp; /** * AsyncConnection encapsulates the per-connection state we maintain. */ class AsyncConnection { public: AsyncConnection(std::unique_ptr, rpc::ProtocolSet serverProtocols); AsyncStreamInterface& stream(); void cancel(); rpc::ProtocolSet serverProtocols() const; rpc::ProtocolSet clientProtocols() const; void setServerProtocols(rpc::ProtocolSet protocols); MessageCompressorManager& getCompressorManager() { return _compressorManager; } private: std::unique_ptr _stream; rpc::ProtocolSet _serverProtocols; // Dynamically initialized from [min max]WireVersionOutgoing. // Its expected that isMaster response is checked only on the caller. rpc::ProtocolSet _clientProtocols{rpc::supports::kNone}; MessageCompressorManager _compressorManager; }; /** * AsyncCommand holds state for a currently running or soon-to-be-run command. */ class AsyncCommand { public: AsyncCommand(AsyncConnection* conn, Message&& command, Date_t now, const HostAndPort& target); NetworkInterfaceASIO::AsyncConnection& conn(); Message& toSend(); Message& toRecv(); MSGHEADER::Value& header(); ResponseStatus response(AsyncOp* op, rpc::Protocol protocol, Date_t now, rpc::EgressMetadataHook* metadataHook = nullptr); HostAndPort target() const { return this->_target; } private: NetworkInterfaceASIO::AsyncConnection* const _conn; Message _toSend; Message _toRecv; // TODO: Investigate efficiency of storing header separately. MSGHEADER::Value _header; const Date_t _start; const HostAndPort _target; }; /** * Helper object to manage individual network operations. */ class AsyncOp { friend class NetworkInterfaceASIO; friend class connection_pool_asio::ASIOConnection; public: /** * Describe the various states through which an AsyncOp transitions. */ enum class State : unsigned char { // A non-state placeholder. kNoState, // A new or zeroed-out AsyncOp. kUninitialized, // An AsyncOp begins its progress when startProgress() is called. kInProgress, // An AsyncOp transitions to kTimedOut when timeOut() is called. // Note that the AsyncOp can be in a kCanceled state and still be // in-flight in NetworkInterfaceASIO. kTimedOut, // An AsyncOp transitions to kCanceled when cancel() is called. // Note that the AsyncOp can be in a kCanceled state and still be // in-flight in NetworkInterfaceASIO. kCanceled, // An AsyncOp is finished once its finish() method is called. Note // that the AsyncOp can be in a kFinished state and still be in the // NetworkInterface's set of in-progress operations. kFinished, }; AsyncOp(NetworkInterfaceASIO* net, const TaskExecutor::CallbackHandle& cbHandle, const RemoteCommandRequest& request, const RemoteCommandCompletionFn& onFinish, Date_t now); /** * Access control for AsyncOp. These objects should be used through shared_ptrs. * * In order to safely access an AsyncOp: * 1. Take the lock * 2. Check the id * 3. If id matches saved generation, proceed, otherwise op has been recycled. */ struct AccessControl { stdx::mutex mutex; std::size_t id = 0; }; void cancel(); bool canceled() const; void timeOut_inlock(); bool timedOut() const; const TaskExecutor::CallbackHandle& cbHandle() const; AsyncConnection& connection(); void setConnection(AsyncConnection&& conn); // AsyncOp may run multiple commands over its lifetime (for example, an ismaster // command, the command provided to the NetworkInterface via startCommand(), etc.) // Calling beginCommand() resets internal state to prepare to run newCommand. Status beginCommand(const RemoteCommandRequest& request); // This form of beginCommand takes a raw message. It is needed if the caller // has to form the command manually (e.g. to use a specific requestBuilder). Status beginCommand(Message&& newCommand, const HostAndPort& target); AsyncCommand& command(); bool commandIsInitialized() const; void finish(TaskExecutor::ResponseStatus&& status); const RemoteCommandRequest& request() const; void startProgress(Date_t startTime); Date_t start() const; rpc::Protocol operationProtocol() const; void setOperationProtocol(rpc::Protocol proto); void setResponseMetadata(BSONObj m); BSONObj getResponseMetadata(); void reset(); void clearStateTransitions(); void setOnFinish(RemoteCommandCompletionFn&& onFinish); // Returns diagnostic strings for logging. TableRow getStringFields() const; std::string toString() const; asio::io_service::strand& strand() { return _strand; } asio::ip::tcp::resolver& resolver() { return _resolver; } bool operator==(const AsyncOp& other) const; private: // Type to represent the internal id of this request. using AsyncOpId = uint64_t; static const TableRow kFieldLabels; // Return string representation of a given state. std::string _stateToString(State state) const; // Return a string representation of this op's state transitions. std::string _stateString() const; bool _hasSeenState(State state) const; // Track and validate AsyncOp state transitions. // Use the _inlock variant if already holding the access control lock. void _transitionToState(State newState); void _transitionToState_inlock(State newState); // Helper for debugging. void _failWithInfo(const char* file, int line, std::string error) const; NetworkInterfaceASIO* const _owner; // Information describing a task enqueued on the NetworkInterface // via a call to startCommand(). TaskExecutor::CallbackHandle _cbHandle; RemoteCommandRequest _request; RemoteCommandCompletionFn _onFinish; // AsyncOp's have a handle to their connection pool handle. They are // also owned by it when they're in the pool ConnectionPool::ConnectionHandle _connectionPoolHandle; /** * The connection state used to service this request. We wrap it in an optional * as it is instantiated at some point after the AsyncOp is created. */ boost::optional _connection; /** * The RPC protocol used for this operation. We wrap it in an optional as it * is not known until we obtain a connection. */ boost::optional _operationProtocol; Date_t _start; std::unique_ptr _timeoutAlarm; asio::ip::tcp::resolver _resolver; const AsyncOpId _id; /** * We maintain a shared_ptr to an access control object. This ensures that tangent * execution paths, such as timeouts for this operation, will not try to access its * state after it has been cleaned up. */ std::shared_ptr _access; /** * An AsyncOp may run 0, 1, or multiple commands over its lifetime. * AsyncOp only holds at most a single AsyncCommand object at a time, * representing its current running or next-to-be-run command, if there is one. */ boost::optional _command; bool _inSetup; bool _inRefresh; /** * The explicit strand that all operations for this op must run on. * This must be the last member of AsyncOp because any pending * operation for the strand are run when it's dtor is called. Any * members that fall after it will have already been destroyed, which * will make those fields illegal to touch from callbacks. */ asio::io_service::strand _strand; /** * We hold an array of states to show the path this AsyncOp has taken. * Must be holding the access control's lock to edit. */ std::array _states; BSONObj _responseMetadata{}; }; void _startCommand(AsyncOp* op); /** * Wraps a completion handler in pre-condition checks. * When we resume after an asynchronous call, we may find the following: * - the AsyncOp has been canceled in the interim (via cancelCommand()) * - the asynchronous call has returned a non-OK error code * Should both conditions be present, we handle cancelation over errors. States use * _validateAndRun() to perform these checks before advancing the state machine. */ template void _validateAndRun(AsyncOp* op, std::error_code ec, Handler&& handler) { if (op->canceled()) { auto rs = ResponseStatus( ErrorCodes::CallbackCanceled, "Callback canceled", now() - op->start()); return _completeOperation(op, rs); } else if (op->timedOut()) { auto rs = ResponseStatus(ErrorCodes::NetworkInterfaceExceededTimeLimit, "Operation timed out", now() - op->start()); return _completeOperation(op, rs); } else if (ec) return _networkErrorCallback(op, ec); handler(); } // Connection void _connect(AsyncOp* op); // setup plaintext TCP socket void _setupSocket(AsyncOp* op, asio::ip::tcp::resolver::iterator endpoints); void _runIsMaster(AsyncOp* op); void _runConnectionHook(AsyncOp* op); void _authenticate(AsyncOp* op); // Communication state machine void _beginCommunication(AsyncOp* op); void _completedOpCallback(AsyncOp* op); void _networkErrorCallback(AsyncOp* op, const std::error_code& ec); void _completeOperation(AsyncOp* op, TaskExecutor::ResponseStatus resp); void _signalWorkAvailable_inlock(); void _asyncRunCommand(AsyncOp* op, NetworkOpHandler handler); std::string _getDiagnosticString_inlock(AsyncOp* currentOp); // Helpers for debugging crashes void _failWithInfo(const char* file, int line, std::string error, AsyncOp* op = nullptr); void _failWithInfo_inlock(const char* file, int line, std::string error, AsyncOp* op = nullptr); Options _options; asio::io_service _io_service; std::vector _serviceRunners; const std::unique_ptr _metadataHook; const std::unique_ptr _hook; std::atomic _state; // NOLINT std::unique_ptr _timerFactory; std::unique_ptr _streamFactory; ConnectionPool _connectionPool; // If it is necessary to hold this lock while accessing a particular operation with // an AccessControl object, take this lock first, always. stdx::mutex _inProgressMutex; stdx::unordered_map> _inProgress; stdx::unordered_set _inGetConnection; // Operation counters AtomicUInt64 _numCanceledOps; AtomicUInt64 _numFailedOps; // includes timed out ops but does not include canceled ops AtomicUInt64 _numSucceededOps; AtomicUInt64 _numTimedOutOps; stdx::mutex _executorMutex; bool _isExecutorRunnable; stdx::condition_variable _isExecutorRunnableCondition; /** * The explicit strand that all non-op operations run on. This must be the * last member of NetworkInterfaceASIO because any pending operation for * the strand are run when it's dtor is called. Any members that fall after * it will have already been destroyed, which will make those fields * illegal to touch from callbacks. */ asio::io_service::strand _strand; }; template R callNoexcept(T& obj, R (T::*method)(MethodArgs...), DeducedArgs&&... args) { try { return (obj.*method)(std::forward(args)...); } catch (...) { std::terminate(); } } } // namespace executor } // namespace mongo