diff options
author | Jonathan Reams <jbreams@mongodb.com> | 2018-05-03 15:40:22 -0400 |
---|---|---|
committer | Jonathan Reams <jbreams@mongodb.com> | 2018-05-16 14:16:09 -0400 |
commit | 2e330a0a33f271c6ac50441c2e7a8a1a6e776265 (patch) | |
tree | 08a346263be0f968e5a54caa59073c3948da332e /src/mongo/executor | |
parent | d646baf48e55f2e84ff811a0191ebf1e253ea9c6 (diff) | |
download | mongo-2e330a0a33f271c6ac50441c2e7a8a1a6e776265.tar.gz |
SERVER-34730 Delete NetworkInterfaceASIO
Diffstat (limited to 'src/mongo/executor')
48 files changed, 549 insertions, 6808 deletions
diff --git a/src/mongo/executor/SConscript b/src/mongo/executor/SConscript index 6a6e061bf12..2f44dd45198 100644 --- a/src/mongo/executor/SConscript +++ b/src/mongo/executor/SConscript @@ -4,8 +4,6 @@ Import("env") env = env.Clone() -env.InjectThirdPartyIncludePaths('asio') - env.Library( target='connection_pool_stats', source=[ @@ -16,6 +14,13 @@ env.Library( '$BUILD_DIR/mongo/util/net/network', ]) +env.Library(target='async_timer_mock', + source=['async_timer_mock.cpp'], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/base/system_error', + ]) + env.Library(target='remote_command', source=[ 'remote_command_request.cpp', @@ -46,26 +51,6 @@ env.Library(target='task_executor_interface', 'remote_command', ]) -env.Library(target='async_timer_mock', - source=['async_timer_mock.cpp'], - LIBDEPS=[ - '$BUILD_DIR/mongo/base', - '$BUILD_DIR/third_party/shim_asio', - ]) - -env.CppUnitTest(target='async_timer_mock_test', - source=['async_timer_mock_test.cpp'], - LIBDEPS=[ - 'async_timer_mock', - ]) - -env.Library(target='async_timer_asio', - source=['async_timer_asio.cpp'], - LIBDEPS=[ - '$BUILD_DIR/mongo/base', - '$BUILD_DIR/third_party/shim_asio', - ]) - env.Library(target='network_interface', source=['network_interface.cpp',], LIBDEPS=[ @@ -115,21 +100,6 @@ env.CppUnitTest( ], ) -env.CppIntegrationTest( - target='connection_pool_asio_integration_test', - source=[ - 'connection_pool_asio_integration_test.cpp', - ], - LIBDEPS=[ - '$BUILD_DIR/mongo/rpc/command_status', - 'network_interface_asio', - '$BUILD_DIR/mongo/executor/thread_pool_task_executor', - '$BUILD_DIR/mongo/executor/network_interface_thread_pool', - '$BUILD_DIR/mongo/executor/network_interface_factory', - '$BUILD_DIR/mongo/util/version_impl', - ], -) - env.CppUnitTest( target='network_interface_mock_test', source=[ @@ -169,6 +139,7 @@ env.Library( ], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/db/auth/authcommon', + '$BUILD_DIR/mongo/db/commands/test_commands_enabled', '$BUILD_DIR/mongo/transport/transport_layer_manager', 'connection_pool_executor', 'network_interface', @@ -176,87 +147,11 @@ env.Library( ) env.Library( - target='network_interface_asio', - source=[ - 'connection_pool_asio.cpp', - 'network_interface_asio.cpp', - 'network_interface_asio_auth.cpp', - 'network_interface_asio_command.cpp', - 'network_interface_asio_connect.cpp', - 'network_interface_asio_operation.cpp', - ], - LIBDEPS=[ - '$BUILD_DIR/mongo/base', - '$BUILD_DIR/mongo/base/system_error', - '$BUILD_DIR/mongo/client/client_query', - '$BUILD_DIR/mongo/db/auth/authcommon', - '$BUILD_DIR/mongo/db/wire_version', - '$BUILD_DIR/mongo/rpc/command_status', - '$BUILD_DIR/mongo/rpc/rpc', - '$BUILD_DIR/mongo/transport/message_compressor', - '$BUILD_DIR/third_party/shim_asio', - 'async_stream', - 'async_timer_asio', - 'connection_pool_executor', - 'egress_tag_closer_manager', - 'network_interface', - 'task_executor_interface', - ], - LIBDEPS_PRIVATE=[ - '$BUILD_DIR/mongo/db/commands/test_commands_enabled', - ], -) - -env.Library( - target='async_stream', + target='network_interface_fixture', source=[ - 'async_secure_stream.cpp', - 'async_secure_stream_factory.cpp', - 'async_stream.cpp', - 'async_stream_common.cpp', - 'async_stream_factory.cpp', + 'network_interface_integration_fixture.cpp' ], LIBDEPS=[ - '$BUILD_DIR/mongo/base/system_error', - '$BUILD_DIR/mongo/client/authentication', - '$BUILD_DIR/mongo/util/net/ssl_manager', - '$BUILD_DIR/third_party/shim_asio', - 'task_executor_interface', - ] -) - -env.CppUnitTest( - target='async_stream_test', - source=[ - 'async_stream_test.cpp', - ], - LIBDEPS=[ - 'async_stream', - ] -) - -env.CppUnitTest( - target='network_interface_asio_test', - source=[ - 'async_mock_stream_factory.cpp', - 'network_interface_asio_test.cpp', - ], - LIBDEPS=[ - 'async_timer_mock', - 'network_interface_asio', - '$BUILD_DIR/mongo/executor/thread_pool_task_executor', - '$BUILD_DIR/mongo/executor/network_interface_thread_pool', - '$BUILD_DIR/mongo/executor/network_interface_factory', - '$BUILD_DIR/mongo/util/version_impl', - ], -) -env.Library( - target='network_interface_asio_fixture', - source=[ - 'network_interface_asio_integration_fixture.cpp' - ], - LIBDEPS=[ - 'network_interface_asio', '$BUILD_DIR/mongo/unittest/integration_test_main', '$BUILD_DIR/mongo/executor/thread_pool_task_executor', '$BUILD_DIR/mongo/executor/network_interface_thread_pool', @@ -268,33 +163,17 @@ env.Library( ) env.CppIntegrationTest( - target='network_interface_asio_integration_test', + target='network_interface_integration_test', source=[ - 'network_interface_asio_integration_test.cpp', + 'network_interface_integration_test.cpp', ], LIBDEPS=[ - 'network_interface_asio_fixture', - ], -) - -env.CppIntegrationTest( - target='network_interface_perf_test', - source=[ - 'network_interface_perf_test.cpp', - ], - LIBDEPS=[ - 'network_interface_asio', - '$BUILD_DIR/mongo/executor/thread_pool_task_executor', - '$BUILD_DIR/mongo/executor/network_interface_thread_pool', - '$BUILD_DIR/mongo/executor/network_interface_factory', - '$BUILD_DIR/mongo/db/auth/authmocks', - '$BUILD_DIR/mongo/db/service_context_noop_init', - '$BUILD_DIR/mongo/rpc/command_status', - '$BUILD_DIR/mongo/util/version_impl', + 'network_interface_fixture', + '$BUILD_DIR/mongo/db/commands/test_commands_enabled', + '$BUILD_DIR/mongo/db/wire_version', ], ) - env.Library( target='network_interface_factory', source=[ @@ -302,10 +181,10 @@ env.Library( ], LIBDEPS=[ 'network_interface', - 'network_interface_asio', - 'network_interface_tl', + 'connection_pool_executor', ], LIBDEPS_PRIVATE=[ + 'network_interface_tl', 'egress_tag_closer_manager', ]) @@ -345,23 +224,6 @@ env.Library( ] ) -env.CppUnitTest( - target='network_interface_thread_pool_test', - source=[ - 'network_interface_thread_pool_test.cpp', - ], - LIBDEPS=[ - '$BUILD_DIR/mongo/util/concurrency/thread_pool_test_fixture', - 'async_timer_mock', - 'connection_pool_executor', - 'network_interface_asio', - 'network_interface_thread_pool', - '$BUILD_DIR/mongo/executor/thread_pool_task_executor', - '$BUILD_DIR/mongo/executor/network_interface_thread_pool', - '$BUILD_DIR/mongo/executor/network_interface_factory' - ], -) - env.Library( target='thread_pool_task_executor_test_fixture', source=[ diff --git a/src/mongo/executor/async_mock_stream_factory.cpp b/src/mongo/executor/async_mock_stream_factory.cpp deleted file mode 100644 index 0d57e80ac5b..00000000000 --- a/src/mongo/executor/async_mock_stream_factory.cpp +++ /dev/null @@ -1,335 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO - -#include "mongo/platform/basic.h" - -#include "mongo/executor/async_mock_stream_factory.h" - -#include <exception> -#include <iterator> -#include <system_error> - -#include "mongo/base/system_error.h" -#include "mongo/rpc/command_reply_builder.h" -#include "mongo/rpc/factory.h" -#include "mongo/rpc/legacy_reply_builder.h" -#include "mongo/rpc/message.h" -#include "mongo/stdx/memory.h" -#include "mongo/util/assert_util.h" -#include "mongo/util/log.h" - -namespace mongo { -namespace executor { - -namespace { -StringData stateToString(AsyncMockStreamFactory::MockStream::StreamState state) { - switch (state) { - case AsyncMockStreamFactory::MockStream::StreamState::kRunning: - return "Running"; - case AsyncMockStreamFactory::MockStream::StreamState::kBlockedBeforeConnect: - return "Blocked before connect"; - case AsyncMockStreamFactory::MockStream::StreamState::kBlockedAfterWrite: - return "Blocked after write"; - case AsyncMockStreamFactory::MockStream::StreamState::kBlockedBeforeRead: - return "Blocked before read"; - case AsyncMockStreamFactory::MockStream::StreamState::kCanceled: - return "Canceled"; - } - MONGO_UNREACHABLE; -} - -template <typename Handler> -void checkCanceled(asio::io_service::strand* strand, - AsyncMockStreamFactory::MockStream::StreamState* state, - Handler&& handler, - std::size_t bytes, - std::error_code ec = std::error_code()) { - auto wasCancelled = (*state == AsyncMockStreamFactory::MockStream::StreamState::kCanceled); - *state = AsyncMockStreamFactory::MockStream::StreamState::kRunning; - strand->post([handler, wasCancelled, bytes, ec] { - handler(wasCancelled ? make_error_code(asio::error::operation_aborted) : ec, bytes); - }); -} - -} // namespace - -std::unique_ptr<AsyncStreamInterface> AsyncMockStreamFactory::makeStream( - asio::io_service::strand* strand, const HostAndPort& target) { - return stdx::make_unique<MockStream>(strand, this, target); -} - -void AsyncMockStreamFactory::_createStream(const HostAndPort& host, MockStream* stream) { - stdx::lock_guard<stdx::mutex> lk(_factoryMutex); - log() << "creating stream for: " << host; - _streams.emplace(host, stream); - _factoryCv.notify_all(); -} - -void AsyncMockStreamFactory::_destroyStream(const HostAndPort& host) { - stdx::lock_guard<stdx::mutex> lk(_factoryMutex); - log() << "destroying stream for: " << host; - _streams.erase(host); -} - -AsyncMockStreamFactory::MockStream* AsyncMockStreamFactory::blockUntilStreamExists( - const HostAndPort& host) { - stdx::unique_lock<stdx::mutex> lk(_factoryMutex); - - auto iter = std::begin(_streams); - - _factoryCv.wait(lk, [&] { return (iter = _streams.find(host)) != std::end(_streams); }); - - return iter->second; -} - -AsyncMockStreamFactory::MockStream::MockStream(asio::io_service::strand* strand, - AsyncMockStreamFactory* factory, - const HostAndPort& target) - : _strand(strand), _factory(factory), _target(target) { - _factory->_createStream(_target, this); -} - -AsyncMockStreamFactory::MockStream::~MockStream() { - _factory->_destroyStream(_target); -} - -void AsyncMockStreamFactory::MockStream::connect(asio::ip::tcp::resolver::iterator endpoints, - ConnectHandler&& connectHandler) { - // Suspend execution after "connecting" - _defer(kBlockedBeforeConnect, [this, connectHandler, endpoints]() { - stdx::unique_lock<stdx::mutex> lk(_mutex); - - // We shim a lambda to give connectHandler the right signature since it doesn't take - // a size_t param. - checkCanceled( - _strand, - &_state, - [connectHandler](std::error_code ec, std::size_t) { return connectHandler(ec); }, - 0); - }); -} - -void AsyncMockStreamFactory::MockStream::write(asio::const_buffer buf, - StreamHandler&& writeHandler) { - stdx::unique_lock<stdx::mutex> lk(_mutex); - - auto begin = asio::buffer_cast<const uint8_t*>(buf); - auto size = asio::buffer_size(buf); - _writeQueue.push({begin, begin + size}); - - // Suspend execution after data is written. - _defer_inlock(kBlockedAfterWrite, [this, writeHandler, size]() { - stdx::unique_lock<stdx::mutex> lk(_mutex); - checkCanceled(_strand, &_state, std::move(writeHandler), size); - }); -} - -void AsyncMockStreamFactory::MockStream::cancel() { - stdx::unique_lock<stdx::mutex> lk(_mutex); - log() << "cancel() for: " << _target; - - // If _state is kRunning then we don't have a deferred operation to cancel. - if (_state == kRunning) { - return; - } - - _state = kCanceled; -} - -void AsyncMockStreamFactory::MockStream::read(asio::mutable_buffer buf, - StreamHandler&& readHandler) { - // Suspend execution before data is read. - _defer(kBlockedBeforeRead, [this, buf, readHandler]() { - stdx::unique_lock<stdx::mutex> lk(_mutex); - int nToCopy = 0; - - // If we've set an error, return that instead of a read. - if (!_error) { - auto nextRead = std::move(_readQueue.front()); - _readQueue.pop(); - - auto beginDst = asio::buffer_cast<uint8_t*>(buf); - nToCopy = std::min(nextRead.size(), asio::buffer_size(buf)); - - auto endSrc = std::begin(nextRead); - std::advance(endSrc, nToCopy); - - auto endDst = std::copy(std::begin(nextRead), endSrc, beginDst); - invariant((endDst - beginDst) == static_cast<std::ptrdiff_t>(nToCopy)); - log() << "read " << nToCopy << " bytes, " << (nextRead.size() - nToCopy) - << " remaining in buffer"; - } - - auto handler = readHandler; - - // If we did not receive all the bytes, we should return an error - if (static_cast<size_t>(nToCopy) < asio::buffer_size(buf)) { - handler = [readHandler](std::error_code ec, size_t len) { - // If we have an error here we've been canceled, and that takes precedence - if (ec) - return readHandler(ec, len); - - // Call the original handler with an error - readHandler(make_error_code(ErrorCodes::InvalidLength), len); - }; - } - - checkCanceled(_strand, &_state, std::move(handler), nToCopy, _error); - _error.clear(); - }); -} - -void AsyncMockStreamFactory::MockStream::pushRead(std::vector<uint8_t> toRead) { - stdx::unique_lock<stdx::mutex> lk(_mutex); - invariant(_state != kRunning); - _readQueue.emplace(std::move(toRead)); -} - -void AsyncMockStreamFactory::MockStream::setError(std::error_code ec) { - _error = ec; -} - -std::vector<uint8_t> AsyncMockStreamFactory::MockStream::popWrite() { - stdx::unique_lock<stdx::mutex> lk(_mutex); - invariant(_state != kRunning); - auto nextWrite = std::move(_writeQueue.front()); - _writeQueue.pop(); - return nextWrite; -} - -void AsyncMockStreamFactory::MockStream::_defer(StreamState state, Action&& handler) { - stdx::unique_lock<stdx::mutex> lk(_mutex); - _defer_inlock(state, std::move(handler)); -} - -void AsyncMockStreamFactory::MockStream::_defer_inlock(StreamState state, Action&& handler) { - invariant(_state == kRunning); - _state = state; - - invariant(!_deferredAction); - _deferredAction = std::move(handler); - _deferredCV.notify_one(); -} - -void AsyncMockStreamFactory::MockStream::unblock() { - stdx::unique_lock<stdx::mutex> lk(_mutex); - _unblock_inlock(); -} - -void AsyncMockStreamFactory::MockStream::_unblock_inlock() { - // Can be canceled here at which point we will call the handler with the CallbackCanceled - // status when we invoke the _deferredAction. - invariant(_state != kRunning); - - if (_state != kCanceled) { - _state = kRunning; - } - // Post our deferred action to resume state machine execution - invariant(_deferredAction); - _strand->post(std::move(_deferredAction)); - _deferredAction = nullptr; -} - -auto AsyncMockStreamFactory::MockStream::waitUntilBlocked() -> StreamState { - stdx::unique_lock<stdx::mutex> lk(_mutex); - _deferredCV.wait(lk, [this]() { return _state != kRunning; }); - log() << "returning from waitUntilBlocked, state: " << stateToString(_state); - return _state; -} - -HostAndPort AsyncMockStreamFactory::MockStream::target() { - return _target; -} - -void AsyncMockStreamFactory::MockStream::simulateServer( - rpc::Protocol proto, - const stdx::function<RemoteCommandResponse(RemoteCommandRequest)> replyFunc) { - std::exception_ptr ex; - uint32_t messageId = 0; - - RemoteCommandResponse resp; - { - WriteEvent write{this}; - - std::vector<uint8_t> messageData = popWrite(); - Message msg(SharedBuffer::allocate(messageData.size())); - memcpy(msg.buf(), messageData.data(), messageData.size()); - - auto request = rpc::opMsgRequestFromAnyProtocol(msg); - ASSERT(rpc::protocolForMessage(msg) == proto); - - RemoteCommandRequest rcr(target(), request.getDatabase().toString(), request.body, nullptr); - - messageId = msg.header().getId(); - - // So we can allow ASSERTs in replyFunc, we capture any exceptions, but rethrow - // them later to prevent deadlock - try { - resp = replyFunc(std::move(rcr)); - } catch (...) { - ex = std::current_exception(); - } - } - - auto replyBuilder = rpc::makeReplyBuilder(proto); - replyBuilder->setCommandReply(resp.data); - replyBuilder->setMetadata(resp.metadata); - - auto replyMsg = replyBuilder->done(); - replyMsg.header().setResponseToMsgId(messageId); - - { - // The first read will be for the header. - ReadEvent read{this}; - auto hdrBytes = reinterpret_cast<const uint8_t*>(replyMsg.header().view2ptr()); - pushRead({hdrBytes, hdrBytes + sizeof(MSGHEADER::Value)}); - } - - { - // The second read will be for the message data. - ReadEvent read{this}; - auto dataBytes = reinterpret_cast<const uint8_t*>(replyMsg.buf()); - auto pastHeader = dataBytes; - std::advance(pastHeader, sizeof(MSGHEADER::Value)); - pushRead({pastHeader, dataBytes + static_cast<std::size_t>(replyMsg.size())}); - } - - if (ex) { - // Rethrow ASSERTS after the NIA completes it's Write-Read-Read sequence. - std::rethrow_exception(ex); - } -} - -bool AsyncMockStreamFactory::MockStream::isOpen() { - return true; -} - -} // namespace executor -} // namespace mongo diff --git a/src/mongo/executor/async_mock_stream_factory.h b/src/mongo/executor/async_mock_stream_factory.h deleted file mode 100644 index 6f65edd8425..00000000000 --- a/src/mongo/executor/async_mock_stream_factory.h +++ /dev/null @@ -1,226 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <asio.hpp> -#include <cstdint> -#include <memory> -#include <queue> - -#include "mongo/executor/async_stream_factory_interface.h" -#include "mongo/executor/async_stream_interface.h" -#include "mongo/executor/remote_command_request.h" -#include "mongo/executor/remote_command_response.h" -#include "mongo/rpc/protocol.h" -#include "mongo/stdx/condition_variable.h" -#include "mongo/stdx/functional.h" -#include "mongo/stdx/mutex.h" -#include "mongo/stdx/unordered_map.h" -#include "mongo/unittest/unittest.h" -#include "mongo/util/net/hostandport.h" - -namespace mongo { -namespace executor { - -class AsyncStreamInterface; - -/** - * A factory that produces mock streams to allow for testing of NetworkInterfaceASIO. - * - * The streams produced by this factory simulate a flow of Events (ConnectEvent, - * ReadEvent, WriteEvent). The streams created by this factory will automatically - * pause themselves at each Event, and the caller must unblock them by destroying - * the Event object to continue. - * - * Example use of this factory: - * - * AsyncMockStreamFactory factory(); - * - * // NIA will then call makeStream(...) to create new streams from the - * // factory, or the caller can do this manually. - * - * // Wait for the desired stream to exist - * auto stream = streamFactory.blockUntilStreamExists(host); - * - * // If we do not care to inspect after a certain event, we can skip it: - * ConnectEvent{stream}.skip(); - * - * // To examine the stream at an Event, instantiate the event object. - * // When the Event object goes out of scope the stream will unblock. - * { - * WriteEvent write{stream}; - * - * // Inspect what NIA wrote to this stream: - * auto messageData = stream->popWrite(); - * ... - * } - * - * // The Event object will keep the stream blocked as long as it exists. - * // Use this window to perform operations on the stream or inspect it. - * { - * ReadEvent read{stream}; - * - * // Simulate data sent to this stream over the network - * stream->pushRead( ... ); - * - * // Or, simulate a networking error - * stream->setError( error_code ); - * } - */ -class AsyncMockStreamFactory final : public AsyncStreamFactoryInterface { -public: - AsyncMockStreamFactory() = default; - - std::unique_ptr<AsyncStreamInterface> makeStream(asio::io_service::strand* strand, - const HostAndPort& host) override; - - /** - * A mock stream class for testing the egress networking layer. - * - * At the core of this class is an idea of deferring actions and allowing inspection - * of state of the stream before those actions happen. - * - * This class operates on the assumption that two threads are in use: a networking - * thread used by NIA to issue IO calls on the MockStream, and a test thread to - * wait on those calls and react. - * - * When the test thread creates an Event object, the constructor sends it to wait - * on a condition variable. When NIA issues an IO call on the stream, the MockStream - * load the proper handler into a placeholder, and then calls notify() on the - * condition variable. At that point the stream is paused and the test thread - * may operate on it. - */ - class MockStream final : public AsyncStreamInterface { - public: - MockStream(asio::io_service::strand* strand, - AsyncMockStreamFactory* factory, - const HostAndPort& target); - - // Use unscoped enum so we can specialize on it - enum StreamState { - kRunning, - kBlockedBeforeConnect, - kBlockedBeforeRead, - kBlockedAfterWrite, - kCanceled, - }; - - ~MockStream(); - - void connect(asio::ip::tcp::resolver::iterator endpoints, - ConnectHandler&& connectHandler) override; - void write(asio::const_buffer buf, StreamHandler&& writeHandler) override; - void read(asio::mutable_buffer buf, StreamHandler&& readHandler) override; - - bool isOpen() override; - - HostAndPort target(); - - StreamState waitUntilBlocked(); - - void cancel() override; - - std::vector<uint8_t> popWrite(); - void pushRead(std::vector<uint8_t> toRead); - - void setError(std::error_code ec); - - void unblock(); - - void simulateServer( - rpc::Protocol proto, - const stdx::function<RemoteCommandResponse(RemoteCommandRequest)> replyFunc); - - private: - using Action = stdx::function<void()>; - - void _defer(StreamState state, Action&& handler); - void _defer_inlock(StreamState state, Action&& handler); - void _unblock_inlock(); - - asio::io_service::strand* _strand; - - AsyncMockStreamFactory* _factory; - HostAndPort _target; - - stdx::mutex _mutex; - - stdx::condition_variable _deferredCV; - StreamState _state{kRunning}; - - std::queue<std::vector<uint8_t>> _readQueue; - std::queue<std::vector<uint8_t>> _writeQueue; - - std::error_code _error; - - Action _deferredAction; - }; - - MockStream* blockUntilStreamExists(const HostAndPort& host); - -private: - void _createStream(const HostAndPort& host, MockStream* stream); - void _destroyStream(const HostAndPort& host); - - stdx::mutex _factoryMutex; - stdx::condition_variable _factoryCv; - - stdx::unordered_map<HostAndPort, MockStream*> _streams; -}; - -template <int EventType> -class StreamEvent { -public: - StreamEvent(AsyncMockStreamFactory::MockStream* stream) : _stream(stream) { - ASSERT(stream->waitUntilBlocked() == EventType); - } - - void skip() { - _stream->unblock(); - skipped = true; - } - - ~StreamEvent() { - if (!skipped) { - skip(); - } - } - -private: - bool skipped = false; - AsyncMockStreamFactory::MockStream* _stream = nullptr; -}; - -using ReadEvent = StreamEvent<AsyncMockStreamFactory::MockStream::StreamState::kBlockedBeforeRead>; -using WriteEvent = StreamEvent<AsyncMockStreamFactory::MockStream::StreamState::kBlockedAfterWrite>; -using ConnectEvent = - StreamEvent<AsyncMockStreamFactory::MockStream::StreamState::kBlockedBeforeConnect>; - -} // namespace executor -} // namespace mongo diff --git a/src/mongo/executor/async_secure_stream.cpp b/src/mongo/executor/async_secure_stream.cpp deleted file mode 100644 index eb69bd5d18e..00000000000 --- a/src/mongo/executor/async_secure_stream.cpp +++ /dev/null @@ -1,125 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO - -#include "mongo/platform/basic.h" - -#include "mongo/executor/async_secure_stream.h" - -#include "mongo/base/system_error.h" -#include "mongo/config.h" -#include "mongo/executor/async_stream_common.h" -#include "mongo/util/log.h" -#include "mongo/util/net/ssl_manager.h" - -#ifdef MONGO_CONFIG_SSL - -namespace mongo { -namespace executor { - -AsyncSecureStream::AsyncSecureStream(asio::io_service::strand* strand, - asio::ssl::context* sslContext) - : _strand(strand), _stream(_strand->get_io_service(), *sslContext, "") {} - -AsyncSecureStream::~AsyncSecureStream() { - destroyStream(&_stream.lowest_layer(), _connected); -} - -void AsyncSecureStream::connect(asio::ip::tcp::resolver::iterator endpoints, - ConnectHandler&& connectHandler) { - // Stash the connectHandler as we won't be able to call it until we re-enter the state - // machine. - _userHandler = std::move(connectHandler); - asio::async_connect( - _stream.lowest_layer(), - std::move(endpoints), - _strand->wrap([this](std::error_code ec, asio::ip::tcp::resolver::iterator iter) { - if (ec) { - return _userHandler(ec); - } - - ec = setStreamNonBlocking(&_stream.next_layer()); - if (ec) { - return _userHandler(ec); - } - - ec = setStreamNoDelay(&_stream.next_layer()); - if (ec) { - return _userHandler(ec); - } - - _connected = true; - return _handleConnect(std::move(iter)); - })); -} - -void AsyncSecureStream::write(asio::const_buffer buffer, StreamHandler&& streamHandler) { - writeStream(&_stream, _strand, _connected, buffer, std::move(streamHandler)); -} - -void AsyncSecureStream::read(asio::mutable_buffer buffer, StreamHandler&& streamHandler) { - readStream(&_stream, _strand, _connected, buffer, std::move(streamHandler)); -} - -void AsyncSecureStream::_handleConnect(asio::ip::tcp::resolver::iterator iter) { - _stream.async_handshake(decltype(_stream)::client, - _strand->wrap([this, iter](std::error_code ec) { - if (ec) { - return _userHandler(ec); - } - return _handleHandshake(ec, iter->host_name()); - })); -} - -void AsyncSecureStream::_handleHandshake(std::error_code ec, const std::string& hostName) { - auto certStatus = - getSSLManager()->parseAndValidatePeerCertificate(_stream.native_handle(), hostName); - if (certStatus.isOK()) { - _userHandler(make_error_code(ErrorCodes::OK)); - } else { - warning() << "Failed to validate peer certificate during SSL handshake: " - << certStatus.getStatus(); - // We aren't able to propagate error extra info through here so make sure we only use a code - // that won't have any. - _userHandler(make_error_code(ErrorCodes::SSLHandshakeFailed)); - } -} - -void AsyncSecureStream::cancel() { - cancelStream(&_stream.lowest_layer()); -} - -bool AsyncSecureStream::isOpen() { - return checkIfStreamIsOpen(&_stream.next_layer(), _connected); -} - -} // namespace executor -} // namespace mongo - -#endif diff --git a/src/mongo/executor/async_secure_stream.h b/src/mongo/executor/async_secure_stream.h deleted file mode 100644 index cead9d6fada..00000000000 --- a/src/mongo/executor/async_secure_stream.h +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include "mongo/config.h" - -#ifdef MONGO_CONFIG_SSL - -#include <asio.hpp> - -#include "mongo/executor/async_stream_interface.h" -#include "mongo/util/net/ssl.hpp" - -namespace mongo { -namespace executor { - -class AsyncSecureStream final : public AsyncStreamInterface { -public: - AsyncSecureStream(asio::io_service::strand* strand, asio::ssl::context* sslContext); - - ~AsyncSecureStream(); - - void connect(asio::ip::tcp::resolver::iterator endpoints, - ConnectHandler&& connectHandler) override; - - void write(asio::const_buffer buffer, StreamHandler&& streamHandler) override; - - void read(asio::mutable_buffer buffer, StreamHandler&& streamHandler) override; - - void cancel() override; - - bool isOpen() override; - -private: - void _handleConnect(asio::ip::tcp::resolver::iterator iter); - - void _handleHandshake(std::error_code ec, const std::string& hostName); - - asio::io_service::strand* const _strand; - asio::ssl::stream<asio::ip::tcp::socket> _stream; - ConnectHandler _userHandler; - bool _connected = false; -}; - -} // namespace executor -} // namespace mongo - -#endif diff --git a/src/mongo/executor/async_secure_stream_factory.cpp b/src/mongo/executor/async_secure_stream_factory.cpp deleted file mode 100644 index a907dbe894d..00000000000 --- a/src/mongo/executor/async_secure_stream_factory.cpp +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include "mongo/executor/async_secure_stream_factory.h" - -#include "mongo/config.h" -#include "mongo/executor/async_secure_stream.h" -#include "mongo/executor/async_stream.h" -#include "mongo/stdx/memory.h" -#include "mongo/util/net/ssl_manager.h" -#include "mongo/util/net/ssl_options.h" - -#ifdef MONGO_CONFIG_SSL - -namespace mongo { -namespace executor { - -AsyncSecureStreamFactory::AsyncSecureStreamFactory(SSLManagerInterface* sslManager) - : _sslContext(asio::ssl::context::sslv23) { - // We use sslv23, which corresponds to OpenSSLs SSLv23_method, for compatibility with older - // versions of OpenSSL. This mirrors the call to SSL_CTX_new in ssl_manager.cpp. In - // initAsyncSSLContext we explicitly disable all protocols other than TLSv1, TLSv1.1, - // and TLSv1.2. - uassertStatusOK( - sslManager->initSSLContext(_sslContext.native_handle(), - getSSLGlobalParams(), - SSLManagerInterface::ConnectionDirection::kOutgoing)); -} - -std::unique_ptr<AsyncStreamInterface> AsyncSecureStreamFactory::makeStream( - asio::io_service::strand* strand, const HostAndPort&) { - int sslModeVal = getSSLGlobalParams().sslMode.load(); - if (sslModeVal == SSLParams::SSLMode_preferSSL || sslModeVal == SSLParams::SSLMode_requireSSL) { - return stdx::make_unique<AsyncSecureStream>(strand, &_sslContext); - } - return stdx::make_unique<AsyncStream>(strand); -} - -} // namespace executor -} // namespace mongo - -#endif diff --git a/src/mongo/executor/async_secure_stream_factory.h b/src/mongo/executor/async_secure_stream_factory.h deleted file mode 100644 index ad25ad487d9..00000000000 --- a/src/mongo/executor/async_secure_stream_factory.h +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include "mongo/config.h" - -#ifdef MONGO_CONFIG_SSL - -#include <asio.hpp> - -#include "mongo/executor/async_stream_factory_interface.h" -#include "mongo/util/net/ssl.hpp" - -namespace mongo { -class SSLManagerInterface; -namespace executor { - -class AsyncStreamInterface; - -class AsyncSecureStreamFactory final : public AsyncStreamFactoryInterface { -public: - AsyncSecureStreamFactory(SSLManagerInterface* sslManager); - - std::unique_ptr<AsyncStreamInterface> makeStream(asio::io_service::strand* strand, - const HostAndPort&) override; - -private: - asio::ssl::context _sslContext; -}; - -} // namespace executor -} // namespace mongo - -#endif diff --git a/src/mongo/executor/async_stream.cpp b/src/mongo/executor/async_stream.cpp deleted file mode 100644 index 075a9094dd4..00000000000 --- a/src/mongo/executor/async_stream.cpp +++ /dev/null @@ -1,95 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO - -#include "mongo/platform/basic.h" - -#include "mongo/executor/async_stream.h" -#include "mongo/executor/async_stream_common.h" -#include "mongo/util/assert_util.h" -#include "mongo/util/log.h" - -namespace mongo { -namespace executor { - -using asio::ip::tcp; - -AsyncStream::AsyncStream(asio::io_service::strand* strand) - : _strand(strand), _stream(_strand->get_io_service()) {} - -AsyncStream::~AsyncStream() { - destroyStream(&_stream, _connected); -} - -void AsyncStream::connect(tcp::resolver::iterator iter, ConnectHandler&& connectHandler) { - asio::async_connect( - _stream, - std::move(iter), - // We need to wrap this with a lambda of the right signature so it compiles, even - // if we don't actually use the resolver iterator. - _strand->wrap([this, connectHandler](std::error_code ec, tcp::resolver::iterator iter) { - if (ec) { - return connectHandler(ec); - } - - // We assume that our owner is responsible for keeping us alive until we call - // connectHandler, so _connected should always be a valid memory location. - ec = setStreamNonBlocking(&_stream); - if (ec) { - return connectHandler(ec); - } - - ec = setStreamNoDelay(&_stream); - if (ec) { - return connectHandler(ec); - } - - _connected = true; - return connectHandler(ec); - })); -} - -void AsyncStream::write(asio::const_buffer buffer, StreamHandler&& streamHandler) { - writeStream(&_stream, _strand, _connected, buffer, std::move(streamHandler)); -} - -void AsyncStream::read(asio::mutable_buffer buffer, StreamHandler&& streamHandler) { - readStream(&_stream, _strand, _connected, buffer, std::move(streamHandler)); -} - -void AsyncStream::cancel() { - cancelStream(&_stream); -} - -bool AsyncStream::isOpen() { - return checkIfStreamIsOpen(&_stream, _connected); -} - -} // namespace executor -} // namespace mongo diff --git a/src/mongo/executor/async_stream.h b/src/mongo/executor/async_stream.h deleted file mode 100644 index 3a9c9d42b15..00000000000 --- a/src/mongo/executor/async_stream.h +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <asio.hpp> - -#include "mongo/executor/async_stream_interface.h" - -namespace mongo { -namespace executor { - -class AsyncStream final : public AsyncStreamInterface { -public: - AsyncStream(asio::io_service::strand* strand); - - ~AsyncStream(); - - void connect(asio::ip::tcp::resolver::iterator iter, ConnectHandler&& connectHandler) override; - - void write(asio::const_buffer buffer, StreamHandler&& streamHandler) override; - - void read(asio::mutable_buffer buffer, StreamHandler&& streamHandler) override; - - void cancel() override; - - bool isOpen() override; - -private: - asio::io_service::strand* const _strand; - asio::ip::tcp::socket _stream; - bool _connected = false; -}; - -} // namespace executor -} // namespace mongo diff --git a/src/mongo/executor/async_stream_common.cpp b/src/mongo/executor/async_stream_common.cpp deleted file mode 100644 index 92708862c94..00000000000 --- a/src/mongo/executor/async_stream_common.cpp +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO - -#include "mongo/platform/basic.h" - -#include "mongo/executor/async_stream_common.h" - -#include "mongo/util/log.h" - -namespace mongo { -namespace executor { - -void logCloseFailed(std::error_code ec) { - invariant(ec); - log() << "Failed to close stream: " << ec.message(); -} - -void logCancelFailed(std::error_code ec) { - invariant(ec); - log() << "Failed to cancel stream: " << ec.message(); -} - -void logFailureInSetStreamNonBlocking(std::error_code ec) { - invariant(ec); - severe() << "Failed to set non-blocking mode on stream: " << ec.message(); -} - -void logFailureInSetStreamNoDelay(std::error_code ec) { - invariant(ec); - severe() << "Failed to set no-delay mode on stream: " << ec.message(); -} - -void logUnexpectedErrorInCheckOpen(std::error_code ec) { - invariant(ec); - log() << "unexpected error when checking if a stream was open: " << ec.message() - << ", the only errors we expect are EOF and network/connection reset"; -} - - -} // namespace executor -} // namespace mongo diff --git a/src/mongo/executor/async_stream_common.h b/src/mongo/executor/async_stream_common.h deleted file mode 100644 index 67c12bbff3b..00000000000 --- a/src/mongo/executor/async_stream_common.h +++ /dev/null @@ -1,146 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include <asio.hpp> -#include <system_error> -#include <utility> - -#include "mongo/util/assert_util.h" - -namespace mongo { -namespace executor { - -void logCloseFailed(std::error_code ec); - -template <typename ASIOStream> -void destroyStream(ASIOStream* stream, bool connected) { - if (!connected) { - return; - } - - std::error_code ec; - - stream->shutdown(asio::ip::tcp::socket::shutdown_both, ec); - if (ec) { - logCloseFailed(ec); - } - - stream->close(ec); - if (ec) { - logCloseFailed(ec); - } -} - -template <typename ASIOStream, typename Buffer, typename Handler> -void writeStream(ASIOStream* stream, - asio::io_service::strand* strand, - bool connected, - Buffer&& buffer, - Handler&& handler) { - invariant(connected); - asio::async_write(*stream, - asio::buffer(std::forward<Buffer>(buffer)), - strand->wrap(std::forward<Handler>(handler))); -} - -template <typename ASIOStream, typename Buffer, typename Handler> -void readStream(ASIOStream* stream, - asio::io_service::strand* strand, - bool connected, - Buffer&& buffer, - Handler&& handler) { - invariant(connected); - asio::async_read(*stream, - asio::buffer(std::forward<Buffer>(buffer)), - strand->wrap(std::forward<Handler>(handler))); -} - -void logCancelFailed(std::error_code ec); - -template <typename ASIOStream> -void cancelStream(ASIOStream* stream) { - std::error_code ec; - stream->cancel(ec); - if (ec) { - logCancelFailed(ec); - } -} - -void logFailureInSetStreamNonBlocking(std::error_code ec); -void logFailureInSetStreamNoDelay(std::error_code ec); - -template <typename ASIOStream> -std::error_code setStreamNonBlocking(ASIOStream* stream) { - std::error_code ec; - stream->non_blocking(true, ec); - if (ec) { - logFailureInSetStreamNonBlocking(ec); - } - return ec; -} - -template <typename ASIOStream> -std::error_code setStreamNoDelay(ASIOStream* stream) { - std::error_code ec; - stream->set_option(asio::ip::tcp::no_delay(true), ec); - if (ec) { - logFailureInSetStreamNoDelay(ec); - } - return ec; -} - -void logUnexpectedErrorInCheckOpen(std::error_code ec); - -template <typename ASIOStream> -bool checkIfStreamIsOpen(ASIOStream* stream, bool connected) { - if (!connected) { - return false; - }; - std::error_code ec; - std::array<char, 1> buf; - // Although we call the blocking form of receive, we ensure the socket is in non-blocking mode. - // ASIO implements receive on POSIX using the 'recvmsg' system call, which returns immediately - // if the socket is non-blocking and in a valid state, but there is no data to receive. On - // windows, receive is implemented with WSARecv, which has the same semantics. - invariant(stream->non_blocking()); - stream->receive(asio::buffer(buf), asio::socket_base::message_peek, ec); - if (!ec || ec == asio::error::would_block || ec == asio::error::try_again) { - // If the read worked or we got EWOULDBLOCK or EAGAIN (since we are in non-blocking mode), - // we assume the socket is still open. - return true; - } else if (ec == asio::error::eof || ec == asio::error::connection_reset || - ec == asio::error::network_reset) { - return false; - } - // We got a different error. Log it and return false so we throw the connection away. - logUnexpectedErrorInCheckOpen(ec); - return false; -} - -} // namespace executor -} // namespace mongo diff --git a/src/mongo/executor/async_stream_factory.cpp b/src/mongo/executor/async_stream_factory.cpp deleted file mode 100644 index 5f24adf341b..00000000000 --- a/src/mongo/executor/async_stream_factory.cpp +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include "mongo/executor/async_stream_factory.h" - -#include "mongo/executor/async_secure_stream.h" -#include "mongo/executor/async_stream.h" -#include "mongo/stdx/memory.h" -#include "mongo/util/assert_util.h" - -namespace mongo { -namespace executor { - -std::unique_ptr<AsyncStreamInterface> AsyncStreamFactory::makeStream( - asio::io_service::strand* strand, const HostAndPort&) { - return stdx::make_unique<AsyncStream>(strand); -} - -} // namespace executor -} // namespace mongo diff --git a/src/mongo/executor/async_stream_factory.h b/src/mongo/executor/async_stream_factory.h deleted file mode 100644 index bbf9767427b..00000000000 --- a/src/mongo/executor/async_stream_factory.h +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <memory> - -#include "mongo/executor/async_stream_factory_interface.h" - -namespace mongo { -namespace executor { - -class AsyncStreamInterface; - -class AsyncStreamFactory final : public AsyncStreamFactoryInterface { -public: - AsyncStreamFactory() = default; - - std::unique_ptr<AsyncStreamInterface> makeStream(asio::io_service::strand* strand, - const HostAndPort&) override; -}; - -} // namespace executor -} // namespace mongo diff --git a/src/mongo/executor/async_stream_factory_interface.h b/src/mongo/executor/async_stream_factory_interface.h deleted file mode 100644 index c894a1fee5a..00000000000 --- a/src/mongo/executor/async_stream_factory_interface.h +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <asio.hpp> -#include <memory> - -#include "mongo/base/disallow_copying.h" - -namespace mongo { -struct HostAndPort; -namespace executor { - -class AsyncStreamInterface; - -/** - * A factory for AsyncStream creation to support mocking. - */ -class AsyncStreamFactoryInterface { - MONGO_DISALLOW_COPYING(AsyncStreamFactoryInterface); - -public: - virtual ~AsyncStreamFactoryInterface() = default; - - virtual std::unique_ptr<AsyncStreamInterface> makeStream(asio::io_service::strand* strand, - const HostAndPort& target) = 0; - -protected: - AsyncStreamFactoryInterface() = default; -}; - -} // namespace executor -} // namespace mongo diff --git a/src/mongo/executor/async_stream_interface.h b/src/mongo/executor/async_stream_interface.h deleted file mode 100644 index ca97e350923..00000000000 --- a/src/mongo/executor/async_stream_interface.h +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <asio.hpp> -#include <memory> -#include <system_error> - -#include "mongo/base/disallow_copying.h" -#include "mongo/stdx/functional.h" - -namespace mongo { -namespace executor { - -/** - * A bidirectional stream supporting asynchronous reads and writes. - */ -class AsyncStreamInterface { - MONGO_DISALLOW_COPYING(AsyncStreamInterface); - -public: - virtual ~AsyncStreamInterface() = default; - - using ConnectHandler = stdx::function<void(std::error_code)>; - - using StreamHandler = stdx::function<void(std::error_code, std::size_t)>; - - virtual void connect(asio::ip::tcp::resolver::iterator endpoints, - ConnectHandler&& connectHandler) = 0; - - virtual void write(asio::const_buffer buf, StreamHandler&& writeHandler) = 0; - - virtual void read(asio::mutable_buffer buf, StreamHandler&& readHandler) = 0; - - virtual void cancel() = 0; - - virtual bool isOpen() = 0; - -protected: - AsyncStreamInterface() = default; -}; - -} // namespace executor -} // namespace mongo diff --git a/src/mongo/executor/async_stream_test.cpp b/src/mongo/executor/async_stream_test.cpp deleted file mode 100644 index 2731df5d713..00000000000 --- a/src/mongo/executor/async_stream_test.cpp +++ /dev/null @@ -1,144 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO - -#include "mongo/platform/basic.h" - -#include <asio.hpp> -#include <system_error> -#include <vector> - -#include "mongo/executor/async_stream.h" -#include "mongo/executor/network_interface_asio_test_utils.h" -#include "mongo/platform/atomic_word.h" -#include "mongo/stdx/mutex.h" -#include "mongo/stdx/thread.h" -#include "mongo/unittest/unittest.h" -#include "mongo/util/assert_util.h" -#include "mongo/util/log.h" -#include "mongo/util/scopeguard.h" - -namespace mongo { -namespace { - -using asio::ip::tcp; - -class Server { -public: - void startup() { - _thread = stdx::thread([this] { - try { - tcp::acceptor acceptor{_io_service, - // Let the OS choose the accepting port. - tcp::endpoint{asio::ip::make_address_v4("127.0.0.1"), 0}}; - _endpoint.emplace(acceptor.local_endpoint()); - tcp::socket socket{_io_service}; - _started.emplace(true); - acceptor.accept(socket); - log() << "got incoming connection"; - _stopped.get(); - log() << "shutting down socket"; - socket.shutdown(tcp::socket::shutdown_both); - } catch (...) { - log() << exceptionToStatus(); - } - }); - _started.get(); - } - - // idempotent - void shutdown() { - _stopped.emplace(true); - _thread.join(); - } - - Server() { - startup(); - } - - tcp::endpoint endpoint() { - return _endpoint.get(); - } - - ~Server() { - if (!_stopped.hasCompleted()) { - shutdown(); - } - } - -private: - stdx::thread _thread; - executor::Deferred<bool> _started; - executor::Deferred<bool> _stopped; - executor::Deferred<tcp::endpoint> _endpoint; - asio::io_service _io_service; - std::vector<tcp::socket> _sockets; -}; - -TEST(AsyncStreamTest, IsOpen) { - Server server; - asio::io_service io_service; - stdx::thread clientWorker([&io_service] { - log() << "starting clientWorker"; - asio::io_service::work work(io_service); - io_service.run(); - }); - auto guard = MakeGuard([&clientWorker, &io_service] { - io_service.stop(); - clientWorker.join(); - }); - asio::io_service::strand strand{io_service}; - executor::AsyncStream stream{&strand}; - ASSERT_FALSE(stream.isOpen()); - - tcp::resolver resolver{io_service}; - auto endpoints = resolver.resolve(server.endpoint()); - - executor::Deferred<bool> opened; - - log() << "opening up outgoing connection"; - stream.connect(endpoints, [opened](std::error_code ec) mutable { - log() << "opened outgoing connection"; - opened.emplace(!ec); - }); - - ASSERT_TRUE(opened.get()); - ASSERT_TRUE(stream.isOpen()); - - server.shutdown(); - - // There is nothing we can wait on to determinstically know when - // the socket will transition to closed. Busy wait for that. - while (stream.isOpen()) { - stdx::this_thread::sleep_for(Milliseconds(1).toSystemDuration()); - } -} - -} // namespace -} // namespace mongo diff --git a/src/mongo/executor/async_timer_asio.cpp b/src/mongo/executor/async_timer_asio.cpp deleted file mode 100644 index bc287540e93..00000000000 --- a/src/mongo/executor/async_timer_asio.cpp +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO - -#include "mongo/platform/basic.h" - -#include "mongo/executor/async_timer_asio.h" - -#include "mongo/stdx/chrono.h" -#include "mongo/stdx/memory.h" -#include "mongo/util/log.h" - -namespace mongo { -namespace executor { - -AsyncTimerASIO::AsyncTimerASIO(asio::io_service::strand* strand, Milliseconds expiration) - : _strand(strand), _timer(_strand->get_io_service(), expiration.toSystemDuration()) {} - -void AsyncTimerASIO::cancel() { - std::error_code ec; - _timer.cancel(ec); - if (ec) { - log() << "Failed to cancel timer: " << ec.message(); - } -} - -void AsyncTimerASIO::asyncWait(AsyncTimerInterface::Handler handler) { - _timer.async_wait(_strand->wrap(std::move(handler))); -} - -void AsyncTimerASIO::expireAfter(Milliseconds expiration) { - _timer.expires_after(stdx::chrono::milliseconds(expiration.count())); -} - -std::unique_ptr<AsyncTimerInterface> AsyncTimerFactoryASIO::make(asio::io_service::strand* strand, - Milliseconds expiration) { - return stdx::make_unique<AsyncTimerASIO>(strand, expiration); -} - -Date_t AsyncTimerFactoryASIO::now() { - return Date_t::fromDurationSinceEpoch(asio::system_timer::clock_type::now() - - asio::system_timer::clock_type::from_time_t(0)); -} - -} // namespace executor -} // namespace mongo diff --git a/src/mongo/executor/async_timer_asio.h b/src/mongo/executor/async_timer_asio.h deleted file mode 100644 index 01e1fbe18b8..00000000000 --- a/src/mongo/executor/async_timer_asio.h +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <asio.hpp> -#include <asio/system_timer.hpp> - -#include "mongo/executor/async_timer_interface.h" - -namespace mongo { -namespace executor { - -class AsyncTimerASIO final : public AsyncTimerInterface { -public: - AsyncTimerASIO(asio::io_service::strand* strand, Milliseconds expiration); - - void cancel() override; - - void asyncWait(AsyncTimerInterface::Handler handler) override; - - void expireAfter(Milliseconds expiration) override; - -private: - asio::io_service::strand* const _strand; - asio::system_timer _timer; -}; - -class AsyncTimerFactoryASIO final : public AsyncTimerFactoryInterface { -public: - AsyncTimerFactoryASIO() = default; - - std::unique_ptr<AsyncTimerInterface> make(asio::io_service::strand* strand, - Milliseconds expiration) override; - - Date_t now() override; -}; - -} // namespace executor -} // namespace mongo diff --git a/src/mongo/executor/async_timer_interface.h b/src/mongo/executor/async_timer_interface.h index fdfce5b39d8..3076a8f68b6 100644 --- a/src/mongo/executor/async_timer_interface.h +++ b/src/mongo/executor/async_timer_interface.h @@ -30,8 +30,6 @@ #include <system_error> -#include <asio.hpp> - #include "mongo/base/disallow_copying.h" #include "mongo/stdx/functional.h" #include "mongo/util/time_support.h" @@ -87,8 +85,7 @@ class AsyncTimerFactoryInterface { public: virtual ~AsyncTimerFactoryInterface() = default; - virtual std::unique_ptr<AsyncTimerInterface> make(asio::io_service::strand* strand, - Milliseconds expiration) = 0; + virtual std::unique_ptr<AsyncTimerInterface> make(Milliseconds expiration) = 0; virtual Date_t now() = 0; diff --git a/src/mongo/executor/async_timer_mock.cpp b/src/mongo/executor/async_timer_mock.cpp index 39c9323612d..87904dd284b 100644 --- a/src/mongo/executor/async_timer_mock.cpp +++ b/src/mongo/executor/async_timer_mock.cpp @@ -28,6 +28,7 @@ #include "mongo/executor/async_timer_mock.h" +#include "mongo/base/system_error.h" #include "mongo/stdx/memory.h" namespace mongo { @@ -40,7 +41,7 @@ const Milliseconds kZeroMilliseconds = Milliseconds(0); AsyncTimerMockImpl::AsyncTimerMockImpl(Milliseconds expiration) : _timeLeft(expiration) {} void AsyncTimerMockImpl::cancel() { - _callAllHandlers(asio::error::operation_aborted); + _callAllHandlers(std::error_code(ErrorCodes::CallbackCanceled, mongoErrorCategory())); } void AsyncTimerMockImpl::asyncWait(AsyncTimerInterface::Handler handler) { @@ -95,7 +96,7 @@ void AsyncTimerMockImpl::expireAfter(Milliseconds expiration) { // Call handlers with a "canceled" error code for (const auto& handler : tmp) { - handler(asio::error::operation_aborted); + handler(std::error_code(ErrorCodes::CallbackCanceled, mongoErrorCategory())); } } @@ -131,11 +132,6 @@ void AsyncTimerMock::expireAfter(Milliseconds expiration) { } std::unique_ptr<AsyncTimerInterface> AsyncTimerFactoryMock::make(Milliseconds expiration) { - return make(nullptr, expiration); -} - -std::unique_ptr<AsyncTimerInterface> AsyncTimerFactoryMock::make(asio::io_service::strand* strand, - Milliseconds expiration) { stdx::lock_guard<stdx::recursive_mutex> lk(_timersMutex); auto elem = _timers.emplace(std::make_shared<AsyncTimerMockImpl>(expiration)); return stdx::make_unique<AsyncTimerMock>(*elem.first); diff --git a/src/mongo/executor/async_timer_mock.h b/src/mongo/executor/async_timer_mock.h index ad77d323f5e..20f6c039f8e 100644 --- a/src/mongo/executor/async_timer_mock.h +++ b/src/mongo/executor/async_timer_mock.h @@ -124,13 +124,7 @@ public: /** * Create and return a new AsyncTimerMock object. */ - std::unique_ptr<AsyncTimerInterface> make(Milliseconds expiration); - - /** - * Create and return a new AsyncTimerMock object. - */ - std::unique_ptr<AsyncTimerInterface> make(asio::io_service::strand* strand, - Milliseconds expiration) override; + std::unique_ptr<AsyncTimerInterface> make(Milliseconds expiration) override; /** * Advance the current "time" and make stale timers expire. diff --git a/src/mongo/executor/async_timer_mock_test.cpp b/src/mongo/executor/async_timer_mock_test.cpp deleted file mode 100644 index 53b0dbbb598..00000000000 --- a/src/mongo/executor/async_timer_mock_test.cpp +++ /dev/null @@ -1,170 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork - -#include "mongo/platform/basic.h" - -#include "mongo/executor/async_timer_mock.h" -#include "mongo/stdx/future.h" -#include "mongo/unittest/unittest.h" - -namespace mongo { -namespace executor { - -TEST(AsyncTimerMock, BasicTest) { - AsyncTimerFactoryMock factory; - - // Set an early timer - bool timer1Fired = false; - auto timer1 = factory.make(Milliseconds(1000)); - timer1->asyncWait([&timer1Fired](std::error_code ec) { - ASSERT(!ec); - timer1Fired = true; - }); - - // Set a later timer - bool timer2Fired = false; - auto timer2 = factory.make(Milliseconds(2000)); - timer2->asyncWait([&timer2Fired](std::error_code ec) { - ASSERT(!ec); - timer2Fired = true; - }); - - // Advance clock a little, nothing should fire - factory.fastForward(Milliseconds(500)); - ASSERT(!timer1Fired); - ASSERT(!timer2Fired); - - // Advance clock so early timer fires - factory.fastForward(Milliseconds(600)); - ASSERT(timer1Fired); - - // Second timer should still be waiting - ASSERT(!timer2Fired); - - // Advance clock so second timer fires - factory.fastForward(Milliseconds(1000)); - ASSERT(timer2Fired); -} - -TEST(AsyncTimerMock, Cancel) { - AsyncTimerFactoryMock factory; - - // Set a timer - bool fired = false; - auto timer = factory.make(Milliseconds(100)); - timer->asyncWait([&fired](std::error_code ec) { - // This timer should have been canceled - ASSERT(ec); - ASSERT(ec == asio::error::operation_aborted); - fired = true; - }); - - // Cancel timer - timer->cancel(); - - // Ensure that its handler was called - ASSERT(fired); -} - -TEST(AsyncTimerMock, CancelExpired) { - AsyncTimerFactoryMock factory; - - // Set a timer - bool fired = false; - auto timer = factory.make(Milliseconds(100)); - timer->asyncWait([&fired](std::error_code ec) { - // This timer should NOT have been canceled - ASSERT(!ec); - fired = true; - }); - - // Fast forward so it expires - factory.fastForward(Milliseconds(200)); - ASSERT(fired); - - fired = false; - - // Cancel it, should not fire again - timer->cancel(); - ASSERT(!fired); -} - -TEST(AsyncTimerMock, Now) { - AsyncTimerFactoryMock factory; - - ASSERT(factory.now() == Date_t::fromMillisSinceEpoch(0)); - - factory.fastForward(Milliseconds(200)); - ASSERT(factory.now() == Date_t::fromMillisSinceEpoch(200)); - - factory.fastForward(Milliseconds(1000)); - ASSERT(factory.now() == Date_t::fromMillisSinceEpoch(1200)); - - factory.fastForward(Milliseconds(1022)); - ASSERT(factory.now() == Date_t::fromMillisSinceEpoch(2222)); -} - -TEST(AsyncTimerMock, WorksAfterCancel) { - AsyncTimerFactoryMock factory; - - // Set a timer - bool fired1 = false; - auto timer = factory.make(Milliseconds(100)); - timer->asyncWait([&fired1](std::error_code ec) { - // This timer should have been canceled - ASSERT(ec); - ASSERT(ec == asio::error::operation_aborted); - fired1 = true; - }); - - // Cancel timer - timer->cancel(); - - // Ensure that its handler was called - ASSERT(fired1); - - fired1 = false; - bool fired2 = false; - - // Add a new callback to to timer. - timer->asyncWait([&fired2](std::error_code ec) { - // This timer should NOT have been canceled - ASSERT(!ec); - fired2 = true; - }); - - // Fast forward so it expires - factory.fastForward(Milliseconds(100)); - ASSERT(fired2); - ASSERT(!fired1); -} - -} // namespace executor -} // namespace mongo diff --git a/src/mongo/executor/connection_pool_asio.cpp b/src/mongo/executor/connection_pool_asio.cpp deleted file mode 100644 index 028316349db..00000000000 --- a/src/mongo/executor/connection_pool_asio.cpp +++ /dev/null @@ -1,333 +0,0 @@ -/** * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO - -#include "mongo/platform/basic.h" - -#include "mongo/executor/connection_pool_asio.h" - -#include <asio.hpp> - -#include "mongo/executor/async_stream_factory_interface.h" -#include "mongo/executor/network_interface_asio.h" -#include "mongo/rpc/factory.h" -#include "mongo/rpc/legacy_request_builder.h" -#include "mongo/rpc/reply_interface.h" -#include "mongo/stdx/memory.h" -#include "mongo/util/log.h" - -namespace mongo { -namespace executor { -namespace connection_pool_asio { - -ASIOTimer::ASIOTimer(asio::io_service::strand* strand) - : _strand(strand), - _impl(strand->get_io_service()), - _callbackSharedState(std::make_shared<CallbackSharedState>()) {} - -ASIOTimer::~ASIOTimer() { - stdx::lock_guard<stdx::mutex> lk(_callbackSharedState->mutex); - ++_callbackSharedState->id; -} - -const auto kMaxTimerDuration = duration_cast<Milliseconds>(ASIOTimer::clock_type::duration::max()); - -void ASIOTimer::setTimeout(Milliseconds timeout, TimeoutCallback cb) { - _strand->dispatch([this, timeout, cb] { - _cb = std::move(cb); - - cancelTimeout(); - - try { - _impl.expires_after(std::min(kMaxTimerDuration, timeout).toSystemDuration()); - } catch (const asio::system_error& ec) { - severe() << "Failed to set connection pool timer: " << ec.what(); - fassertFailed(40333); - } - - decltype(_callbackSharedState->id) id; - decltype(_callbackSharedState) sharedState; - - { - stdx::lock_guard<stdx::mutex> lk(_callbackSharedState->mutex); - id = ++_callbackSharedState->id; - sharedState = _callbackSharedState; - } - - _impl.async_wait(_strand->wrap([this, id, sharedState](const asio::error_code& error) { - if (error == asio::error::operation_aborted) { - return; - } - - stdx::unique_lock<stdx::mutex> lk(sharedState->mutex); - - // If the id in shared state doesn't match the id in our callback, it - // means we were cancelled, but still executed. This can occur if we - // were cancelled just before our timeout. We need a generation, rather - // than just a bool here, because we could have been cancelled and - // another callback set, in which case we shouldn't run and the we - // should let the other callback execute instead. - if (sharedState->id == id) { - auto cb = std::move(_cb); - lk.unlock(); - cb(); - } - })); - }); -} - -void ASIOTimer::cancelTimeout() { - decltype(_callbackSharedState) sharedState; - decltype(_callbackSharedState->id) id; - { - stdx::lock_guard<stdx::mutex> lk(_callbackSharedState->mutex); - id = ++_callbackSharedState->id; - sharedState = _callbackSharedState; - } - _strand->dispatch([this, id, sharedState] { - stdx::lock_guard<stdx::mutex> lk(sharedState->mutex); - if (sharedState->id != id) - return; - - std::error_code ec; - _impl.cancel(ec); - if (ec) { - log() << "Failed to cancel connection pool timer: " << ec.message(); - } - }); -} - -ASIOConnection::ASIOConnection(const HostAndPort& hostAndPort, size_t generation, ASIOImpl* global) - : _global(global), - _hostAndPort(hostAndPort), - _generation(generation), - _impl(makeAsyncOp(this)), - _timer(&_impl->strand()) {} - -ASIOConnection::~ASIOConnection() { - if (_impl) { - stdx::lock_guard<stdx::mutex> lk(_impl->_access->mutex); - _impl->_access->id++; - } -} - -void ASIOConnection::indicateSuccess() { - _status = Status::OK(); -} - -void ASIOConnection::indicateFailure(Status status) { - invariant(!status.isOK()); - _status = std::move(status); -} - -const HostAndPort& ASIOConnection::getHostAndPort() const { - return _hostAndPort; -} - -bool ASIOConnection::isHealthy() { - // Check if the remote host has closed the connection. - return _impl->connection().stream().isOpen(); -} - -void ASIOConnection::indicateUsed() { - // It is illegal to attempt to use a connection after calling indicateFailure(). - invariant(_status.isOK() || _status == ConnectionPool::kConnectionStateUnknown); - _lastUsed = _global->now(); -} - -Date_t ASIOConnection::getLastUsed() const { - return _lastUsed; -} - -const Status& ASIOConnection::getStatus() const { - return _status; -} - -size_t ASIOConnection::getGeneration() const { - return _generation; -} - -std::unique_ptr<NetworkInterfaceASIO::AsyncOp> ASIOConnection::makeAsyncOp(ASIOConnection* conn) { - return stdx::make_unique<NetworkInterfaceASIO::AsyncOp>( - conn->_global->_impl, - TaskExecutor::CallbackHandle(), - RemoteCommandRequest{conn->getHostAndPort(), - std::string("admin"), - BSON("isMaster" << 1), - BSONObj(), - nullptr}, - [conn](const TaskExecutor::ResponseStatus& rs) { - auto cb = std::move(conn->_setupCallback); - cb(conn, rs.status); - }, - conn->_global->now()); -} - -Message ASIOConnection::makeIsMasterRequest(ASIOConnection* conn) { - return rpc::legacyRequestFromOpMsgRequest( - OpMsgRequest::fromDBAndBody("admin", BSON("isMaster" << 1))); -} - -void ASIOConnection::setTimeout(Milliseconds timeout, TimeoutCallback cb) { - _timer.setTimeout(timeout, std::move(cb)); -} - -void ASIOConnection::cancelTimeout() { - _timer.cancelTimeout(); -} - -void ASIOConnection::setup(Milliseconds timeout, SetupCallback cb) { - _impl->strand().dispatch([this, timeout, cb] { - _setupCallback = [this, cb](ConnectionInterface* ptr, Status status) { - { - stdx::lock_guard<stdx::mutex> lk(_impl->_access->mutex); - _impl->_access->id++; - - // If our connection timeout callback ran but wasn't the reason we exited - // the state machine, clear any TIMED_OUT state. - if (status.isOK()) { - _impl->_transitionToState_inlock( - NetworkInterfaceASIO::AsyncOp::State::kUninitialized); - _impl->_transitionToState_inlock( - NetworkInterfaceASIO::AsyncOp::State::kInProgress); - _impl->_transitionToState_inlock( - NetworkInterfaceASIO::AsyncOp::State::kFinished); - } - } - - cancelTimeout(); - cb(ptr, status); - }; - - // Capturing the shared access pad and generation before calling setTimeout gives us enough - // information to avoid calling the timer if we shouldn't without needing any other - // resources that might have been cleaned up. - decltype(_impl->_access) access; - std::size_t generation; - { - stdx::lock_guard<stdx::mutex> lk(_impl->_access->mutex); - access = _impl->_access; - generation = access->id; - } - - // Actually timeout setup - setTimeout(timeout, [this, access, generation] { - stdx::lock_guard<stdx::mutex> lk(access->mutex); - if (generation != access->id) { - // The operation has been cleaned up, do not access. - return; - } - _impl->timeOut_inlock(); - }); - - _global->_impl->_connect(_impl.get()); - }); -} - -void ASIOConnection::resetToUnknown() { - _status = ConnectionPool::kConnectionStateUnknown; -} - -void ASIOConnection::refresh(Milliseconds timeout, RefreshCallback cb) { - _impl->strand().dispatch([this, timeout, cb] { - auto op = _impl.get(); - - // We clear state transitions because we're re-running a portion of the asio state machine - // without entering in startCommand (which would call this for us). - op->clearStateTransitions(); - - _refreshCallback = std::move(cb); - - // Actually timeout refreshes - setTimeout(timeout, [this] { _impl->connection().stream().cancel(); }); - - // Our pings are isMaster's - auto beginStatus = op->beginCommand(makeIsMasterRequest(this), _hostAndPort); - if (!beginStatus.isOK()) { - auto cb = std::move(_refreshCallback); - cb(this, beginStatus); - return; - } - - // If we fail during refresh, the _onFinish function of the AsyncOp will get called. As such - // we - // need to intercept those calls so we can capture them. This will get cleared out when we - // fill - // in the real onFinish in startCommand. - op->setOnFinish([this](RemoteCommandResponse failedResponse) { - invariant(!failedResponse.isOK()); - auto cb = std::move(_refreshCallback); - cb(this, failedResponse.status); - }); - - op->_inRefresh = true; - - _global->_impl->_asyncRunCommand(op, [this, op](std::error_code ec, size_t bytes) { - cancelTimeout(); - - auto cb = std::move(_refreshCallback); - - if (ec) - return cb(this, Status(ErrorCodes::HostUnreachable, ec.message())); - - op->_inRefresh = false; - cb(this, Status::OK()); - }); - }); -} - -std::unique_ptr<NetworkInterfaceASIO::AsyncOp> ASIOConnection::releaseAsyncOp() { - { - stdx::lock_guard<stdx::mutex> lk(_impl->_access->mutex); - _impl->_access->id++; - } - return std::move(_impl); -} - -void ASIOConnection::bindAsyncOp(std::unique_ptr<NetworkInterfaceASIO::AsyncOp> op) { - _impl = std::move(op); -} - -ASIOImpl::ASIOImpl(NetworkInterfaceASIO* impl) : _impl(impl) {} - -Date_t ASIOImpl::now() { - return _impl->now(); -} - -std::unique_ptr<ConnectionPool::TimerInterface> ASIOImpl::makeTimer() { - return stdx::make_unique<ASIOTimer>(&_impl->_strand); -} - -std::shared_ptr<ConnectionPool::ConnectionInterface> ASIOImpl::makeConnection( - const HostAndPort& hostAndPort, size_t generation) { - return std::make_shared<ASIOConnection>(hostAndPort, generation, this); -} - -} // namespace connection_pool_asio -} // namespace executor -} // namespace mongo diff --git a/src/mongo/executor/connection_pool_asio.h b/src/mongo/executor/connection_pool_asio.h deleted file mode 100644 index 9bb1cae50f5..00000000000 --- a/src/mongo/executor/connection_pool_asio.h +++ /dev/null @@ -1,138 +0,0 @@ -/** * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <asio/system_timer.hpp> - -#include <memory> - -#include "mongo/executor/async_stream_interface.h" -#include "mongo/executor/connection_pool.h" -#include "mongo/executor/network_interface.h" -#include "mongo/executor/network_interface_asio.h" -#include "mongo/stdx/mutex.h" - -namespace mongo { -namespace executor { -namespace connection_pool_asio { - -/** - * Implements connection pool timers on top of asio - */ -class ASIOTimer final : public ConnectionPool::TimerInterface { -public: - using clock_type = asio::system_timer::clock_type; - - ASIOTimer(asio::io_service::strand* strand); - ~ASIOTimer(); - - void setTimeout(Milliseconds timeout, TimeoutCallback cb) override; - void cancelTimeout() override; - -private: - struct CallbackSharedState { - stdx::mutex mutex; - std::size_t id = 0; - }; - - TimeoutCallback _cb; - asio::io_service::strand* const _strand; - asio::basic_waitable_timer<clock_type> _impl; - std::shared_ptr<CallbackSharedState> _callbackSharedState; -}; - -/** - * Implements connection pool connections on top of asio - * - * Owns an async op when it's out of the pool - */ -class ASIOConnection final : public ConnectionPool::ConnectionInterface { -public: - ASIOConnection(const HostAndPort& hostAndPort, size_t generation, ASIOImpl* global); - ~ASIOConnection(); - - void indicateSuccess() override; - void indicateUsed() override; - void indicateFailure(Status status) override; - const HostAndPort& getHostAndPort() const override; - - std::unique_ptr<NetworkInterfaceASIO::AsyncOp> releaseAsyncOp(); - void bindAsyncOp(std::unique_ptr<NetworkInterfaceASIO::AsyncOp> op); - - bool isHealthy() override; - -private: - Date_t getLastUsed() const override; - const Status& getStatus() const override; - - void setTimeout(Milliseconds timeout, TimeoutCallback cb) override; - void cancelTimeout() override; - - void setup(Milliseconds timeout, SetupCallback cb) override; - void resetToUnknown() override; - void refresh(Milliseconds timeout, RefreshCallback cb) override; - - size_t getGeneration() const override; - - static std::unique_ptr<NetworkInterfaceASIO::AsyncOp> makeAsyncOp(ASIOConnection* conn); - static Message makeIsMasterRequest(ASIOConnection* conn); - -private: - SetupCallback _setupCallback; - RefreshCallback _refreshCallback; - ASIOImpl* const _global; - Date_t _lastUsed; - Status _status = ConnectionPool::kConnectionStateUnknown; - HostAndPort _hostAndPort; - size_t _generation; - std::unique_ptr<NetworkInterfaceASIO::AsyncOp> _impl; - ASIOTimer _timer; -}; - -/** - * Implementions connection pool implementation for asio - */ -class ASIOImpl final : public ConnectionPool::DependentTypeFactoryInterface { - friend class ASIOConnection; - -public: - ASIOImpl(NetworkInterfaceASIO* impl); - - std::shared_ptr<ConnectionPool::ConnectionInterface> makeConnection( - const HostAndPort& hostAndPort, size_t generation) override; - std::unique_ptr<ConnectionPool::TimerInterface> makeTimer() override; - - Date_t now() override; - -private: - NetworkInterfaceASIO* const _impl; -}; - -} // namespace connection_pool_asio -} // namespace executor -} // namespace mongo diff --git a/src/mongo/executor/connection_pool_asio_integration_test.cpp b/src/mongo/executor/connection_pool_asio_integration_test.cpp deleted file mode 100644 index 9db13ae302e..00000000000 --- a/src/mongo/executor/connection_pool_asio_integration_test.cpp +++ /dev/null @@ -1,341 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include "mongo/executor/connection_pool_asio.h" - -#include "mongo/client/connection_string.h" -#include "mongo/executor/async_stream_factory.h" -#include "mongo/executor/async_timer_asio.h" -#include "mongo/executor/connection_pool_stats.h" -#include "mongo/executor/network_connection_hook.h" -#include "mongo/executor/network_interface_asio_test_utils.h" -#include "mongo/rpc/get_status_from_command_result.h" -#include "mongo/stdx/future.h" -#include "mongo/stdx/memory.h" -#include "mongo/unittest/integration_test.h" -#include "mongo/unittest/unittest.h" -#include "mongo/util/fail_point_service.h" -#include "mongo/util/scopeguard.h" -#include "mongo/util/time_support.h" - -namespace mongo { -namespace executor { -namespace { - -class MyNetworkConnectionHook : public NetworkConnectionHook { -public: - Status validateHost(const HostAndPort& remoteHost, - const RemoteCommandResponse& isMasterReply) override { - stdx::unique_lock<stdx::mutex> lk(_mutex); - - _count++; - - return Status::OK(); - } - - StatusWith<boost::optional<RemoteCommandRequest>> makeRequest( - const HostAndPort& remoteHost) override { - return StatusWith<boost::optional<RemoteCommandRequest>>(boost::none); - } - - Status handleReply(const HostAndPort& remoteHost, RemoteCommandResponse&& response) override { - MONGO_UNREACHABLE; - } - - static int count() { - stdx::unique_lock<stdx::mutex> lk(_mutex); - - return _count; - } - -private: - static stdx::mutex _mutex; - static int _count; -}; - -stdx::mutex MyNetworkConnectionHook::_mutex; -int MyNetworkConnectionHook::_count = 0; - -TEST(ConnectionPoolASIO, TestPing) { - auto fixture = unittest::getFixtureConnectionString(); - - NetworkInterfaceASIO::Options options; - options.streamFactory = stdx::make_unique<AsyncStreamFactory>(); - options.networkConnectionHook = stdx::make_unique<MyNetworkConnectionHook>(); - options.connectionPoolOptions.maxConnections = 10; - options.timerFactory = stdx::make_unique<AsyncTimerFactoryASIO>(); - NetworkInterfaceASIO net{std::move(options)}; - - net.startup(); - auto guard = MakeGuard([&] { net.shutdown(); }); - - const int N = 50; - - std::array<stdx::thread, N> threads; - - for (auto& thread : threads) { - thread = stdx::thread([&net, &fixture]() { - auto status = Status::OK(); - Deferred<RemoteCommandResponse> deferred; - - RemoteCommandRequest request{ - fixture.getServers()[0], "admin", BSON("ping" << 1), BSONObj(), nullptr}; - net.startCommand( - makeCallbackHandle(), - request, - [&deferred](RemoteCommandResponse resp) { deferred.emplace(std::move(resp)); }) - .transitional_ignore(); - - ASSERT_OK(deferred.get().status); - }); - } - - for (auto&& thread : threads) { - thread.join(); - } - - ASSERT_LTE(MyNetworkConnectionHook::count(), 10); -} - -/** - * This test verifies a fix for SERVER-22391, where we raced if a new request - * came in at the same time a host timeout was triggering. - */ -TEST(ConnectionPoolASIO, TestHostTimeoutRace) { - auto fixture = unittest::getFixtureConnectionString(); - - NetworkInterfaceASIO::Options options; - options.streamFactory = stdx::make_unique<AsyncStreamFactory>(); - options.connectionPoolOptions.hostTimeout = Milliseconds(1); - options.timerFactory = stdx::make_unique<AsyncTimerFactoryASIO>(); - NetworkInterfaceASIO net{std::move(options)}; - - net.startup(); - auto guard = MakeGuard([&] { net.shutdown(); }); - - for (int i = 0; i < 1000; i++) { - Deferred<RemoteCommandResponse> deferred; - RemoteCommandRequest request{ - fixture.getServers()[0], "admin", BSON("ping" << 1), BSONObj(), nullptr}; - net.startCommand(makeCallbackHandle(), - request, - [&](RemoteCommandResponse resp) { deferred.emplace(std::move(resp)); }) - .transitional_ignore(); - - ASSERT_OK(deferred.get().status); - sleepmillis(1); - } -} - - -/** - * Verify that a connections that timeout immediately don't invariant. - */ -TEST(ConnectionPoolASIO, ConnSetupTimeout) { - auto fixture = unittest::getFixtureConnectionString(); - - NetworkInterfaceASIO::Options options; - options.streamFactory = stdx::make_unique<AsyncStreamFactory>(); - options.timerFactory = stdx::make_unique<AsyncTimerFactoryASIO>(); - options.connectionPoolOptions.refreshTimeout = Milliseconds(-2); - NetworkInterfaceASIO net{std::move(options)}; - - net.startup(); - auto guard = MakeGuard([&] { net.shutdown(); }); - - Deferred<RemoteCommandResponse> deferred; - RemoteCommandRequest request{ - fixture.getServers()[0], "admin", BSON("ping" << 1), BSONObj(), nullptr}; - net.startCommand(makeCallbackHandle(), - request, - [&](RemoteCommandResponse resp) { deferred.emplace(std::move(resp)); }) - .transitional_ignore(); - - ASSERT_EQ(deferred.get().status.code(), ErrorCodes::NetworkInterfaceExceededTimeLimit); -} - -/** - * Verify that connection refreshes actually occur, and that they drop down the totalAvailable - * correctly. Verifies SERVER-25006 - */ -TEST(ConnectionPoolASIO, ConnRefreshHappens) { - auto fixture = unittest::getFixtureConnectionString(); - - NetworkInterfaceASIO::Options options; - options.streamFactory = stdx::make_unique<AsyncStreamFactory>(); - options.timerFactory = stdx::make_unique<AsyncTimerFactoryASIO>(); - options.connectionPoolOptions.refreshRequirement = Milliseconds(10); - NetworkInterfaceASIO net{std::move(options)}; - - net.startup(); - auto guard = MakeGuard([&] { net.shutdown(); }); - - std::array<Deferred<RemoteCommandResponse>, 10> deferreds; - - RemoteCommandRequest request{fixture.getServers()[0], - "admin", - BSON("sleep" << 1 << "lock" - << "none" - << "secs" - << 2), - BSONObj(), - nullptr}; - - for (auto& deferred : deferreds) { - net.startCommand(makeCallbackHandle(), - request, - [&](RemoteCommandResponse resp) { deferred.emplace(std::move(resp)); }) - .transitional_ignore(); - } - - for (auto& deferred : deferreds) { - ASSERT_EQ(deferred.get().isOK(), true); - } - - sleepmillis(1000); - - ConnectionPoolStats cps; - net.appendConnectionStats(&cps); - - ASSERT_LTE(cps.totalAvailable + cps.totalInUse, 1u); - ASSERT_EQ(cps.totalCreated, 10u); -} - -/** - * Verify that when a refresh fails, it doesn't trigger an invariant - */ -TEST(ConnectionPoolASIO, ConnRefreshSurvivesFailure) { - auto fixture = unittest::getFixtureConnectionString(); - - NetworkInterfaceASIO::Options options; - options.streamFactory = stdx::make_unique<AsyncStreamFactory>(); - options.timerFactory = stdx::make_unique<AsyncTimerFactoryASIO>(); - options.connectionPoolOptions.refreshRequirement = Milliseconds(0); - NetworkInterfaceASIO net{std::move(options)}; - - net.startup(); - auto guard = MakeGuard([&] { net.shutdown(); }); - - Deferred<RemoteCommandResponse> deferred; - - RemoteCommandRequest request{ - fixture.getServers()[0], "admin", BSON("ping" << 1), BSONObj(), nullptr}; - net.startCommand(makeCallbackHandle(), - request, - [&](RemoteCommandResponse resp) { deferred.emplace(std::move(resp)); }) - .transitional_ignore(); - - deferred.get(); - - getGlobalFailPointRegistry() - ->getFailPoint("NetworkInterfaceASIOasyncRunCommandFail") - ->setMode(FailPoint::nTimes, 1); - - sleepmillis(1000); - - ConnectionPoolStats cps; - net.appendConnectionStats(&cps); - - ASSERT_EQ(cps.totalAvailable, 0u); - ASSERT_EQ(cps.totalCreated, 1u); -} - -/** - * Tests that thrashing the timer while calls to setup are occurring doesn't crash. This could - * occur if a setup connection goes through, but the timer thread is too over-worked to cancel the - * timers before they're invoked - */ -TEST(ConnectionPoolASIO, ConnSetupSurvivesFailure) { - const int kNumThreads = 8; - const int kNumOps = 1000; - - auto fixture = unittest::getFixtureConnectionString(); - - NetworkInterfaceASIO::Options options; - options.streamFactory = stdx::make_unique<AsyncStreamFactory>(); - options.timerFactory = stdx::make_unique<AsyncTimerFactoryASIO>(); - options.connectionPoolOptions.refreshTimeout = Seconds(1); - options.connectionPoolOptions.maxConnections = kNumThreads; - NetworkInterfaceASIO net{std::move(options)}; - - net.startup(); - auto guard = MakeGuard([&] { net.shutdown(); }); - - AtomicWord<size_t> unfinished(kNumThreads * kNumOps); - AtomicWord<size_t> unstarted(kNumThreads * kNumOps); - - std::array<stdx::thread, kNumThreads> threads; - stdx::mutex mutex; - stdx::condition_variable condvar; - - for (auto& thread : threads) { - thread = stdx::thread([&] { - for (int i = 0; i < kNumOps; i++) { - RemoteCommandRequest request{fixture.getServers()[0], - "admin", - BSON("sleep" << 1 << "lock" - << "none" - << "secs" - << 3), - BSONObj(), - nullptr}; - net.startCommand(makeCallbackHandle(), - request, - [&](RemoteCommandResponse resp) { - if (!unfinished.subtractAndFetch(1)) { - condvar.notify_one(); - } - }) - .transitional_ignore(); - unstarted.subtractAndFetch(1); - } - }); - } - - stdx::thread timerThrasher([&] { - while (unstarted.load()) { - net.setAlarm(Date_t::now() + Seconds(1), [] {}).transitional_ignore(); - } - }); - - - for (auto& thread : threads) { - thread.join(); - } - - stdx::unique_lock<stdx::mutex> lk(mutex); - condvar.wait(lk, [&] { return !unfinished.load(); }); - - timerThrasher.join(); -} - -} // namespace -} // namespace executor -} // namespace mongo diff --git a/src/mongo/executor/network_connection_hook.h b/src/mongo/executor/network_connection_hook.h index f76eed79b2e..993adcd1798 100644 --- a/src/mongo/executor/network_connection_hook.h +++ b/src/mongo/executor/network_connection_hook.h @@ -30,6 +30,8 @@ #include <boost/optional.hpp> +#include "mongo/bson/bsonobj.h" + namespace mongo { class Status; @@ -65,6 +67,7 @@ public: * std::terminate. */ virtual Status validateHost(const HostAndPort& remoteHost, + const BSONObj& isMasterRequest, const RemoteCommandResponse& isMasterReply) = 0; /** diff --git a/src/mongo/executor/network_interface.cpp b/src/mongo/executor/network_interface.cpp index e19e9284a13..3bf3cb8445b 100644 --- a/src/mongo/executor/network_interface.cpp +++ b/src/mongo/executor/network_interface.cpp @@ -41,6 +41,8 @@ const unsigned int NetworkInterface::kMessagingPortKeepOpen; NetworkInterface::NetworkInterface() {} NetworkInterface::~NetworkInterface() {} +MONGO_FP_DECLARE(networkInterfaceDiscardCommandsBeforeAcquireConn); +MONGO_FP_DECLARE(networkInterfaceDiscardCommandsAfterAcquireConn); } // namespace executor } // namespace mongo diff --git a/src/mongo/executor/network_interface.h b/src/mongo/executor/network_interface.h index b333813df14..eb740416596 100644 --- a/src/mongo/executor/network_interface.h +++ b/src/mongo/executor/network_interface.h @@ -35,6 +35,7 @@ #include "mongo/executor/task_executor.h" #include "mongo/stdx/functional.h" #include "mongo/transport/baton.h" +#include "mongo/util/fail_point_service.h" #include "mongo/util/future.h" namespace mongo { @@ -43,6 +44,9 @@ class BSONObjBuilder; namespace executor { +MONGO_FP_FORWARD_DECLARE(networkInterfaceDiscardCommandsBeforeAcquireConn); +MONGO_FP_FORWARD_DECLARE(networkInterfaceDiscardCommandsAfterAcquireConn); + /** * Interface to networking for use by TaskExecutor implementations. */ @@ -119,6 +123,18 @@ public: */ virtual std::string getHostName() = 0; + struct Counters { + uint64_t canceled = 0; + uint64_t timedOut = 0; + uint64_t failed = 0; + uint64_t succeeded = 0; + }; + /* + * Returns a copy of the operation counters (see struct Counters above). This method should + * only be used in tests, and will invariant if getTestCommands() returns false. + */ + virtual Counters getCounters() const = 0; + /** * Starts asynchronous execution of the command described by "request". * diff --git a/src/mongo/executor/network_interface_asio.cpp b/src/mongo/executor/network_interface_asio.cpp deleted file mode 100644 index 7eabcfbb71e..00000000000 --- a/src/mongo/executor/network_interface_asio.cpp +++ /dev/null @@ -1,527 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO - -#include "mongo/platform/basic.h" - -#include "mongo/executor/network_interface_asio.h" - -#include <asio/system_timer.hpp> - -#include <utility> - -#include "mongo/executor/async_stream_factory.h" -#include "mongo/executor/async_stream_interface.h" -#include "mongo/executor/async_timer_asio.h" -#include "mongo/executor/async_timer_interface.h" -#include "mongo/executor/async_timer_mock.h" -#include "mongo/executor/connection_pool_asio.h" -#include "mongo/executor/connection_pool_stats.h" -#include "mongo/rpc/metadata/metadata_hook.h" -#include "mongo/stdx/chrono.h" -#include "mongo/stdx/memory.h" -#include "mongo/util/concurrency/idle_thread_block.h" -#include "mongo/util/concurrency/thread_name.h" -#include "mongo/util/log.h" -#include "mongo/util/net/socket_utils.h" -#include "mongo/util/net/ssl_manager.h" -#include "mongo/util/table_formatter.h" -#include "mongo/util/time_support.h" - -namespace mongo { -namespace executor { - -namespace { -const std::size_t kIOServiceWorkers = 1; -} // namespace - -NetworkInterfaceASIO::Options::Options() = default; - -NetworkInterfaceASIO::NetworkInterfaceASIO(Options options) - : _options(std::move(options)), - _io_service(), - _metadataHook(std::move(_options.metadataHook)), - _hook(std::move(_options.networkConnectionHook)), - _state(State::kReady), - _timerFactory(std::move(_options.timerFactory)), - _streamFactory(std::move(_options.streamFactory)), - _connectionPool(stdx::make_unique<connection_pool_asio::ASIOImpl>(this), - _options.instanceName, - _options.connectionPoolOptions), - _isExecutorRunnable(false), - _strand(_io_service) { - invariant(_timerFactory); -} - -std::string NetworkInterfaceASIO::getDiagnosticString() { - stdx::lock_guard<stdx::mutex> lk(_inProgressMutex); - return _getDiagnosticString_inlock(nullptr); -} - -std::string NetworkInterfaceASIO::_getDiagnosticString_inlock(AsyncOp* currentOp) { - str::stream output; - std::vector<TableRow> rows; - - output << "\nNetworkInterfaceASIO Operations' Diagnostic:\n"; - rows.push_back({"Operation:", "Count:"}); - rows.push_back({"Connecting", std::to_string(_inGetConnection.size())}); - rows.push_back({"In Progress", std::to_string(_inProgress.size())}); - rows.push_back({"Succeeded", std::to_string(getNumSucceededOps())}); - rows.push_back({"Canceled", std::to_string(getNumCanceledOps())}); - rows.push_back({"Failed", std::to_string(getNumFailedOps())}); - rows.push_back({"Timed Out", std::to_string(getNumTimedOutOps())}); - output << toTable(rows); - - if (_inProgress.size() > 0) { - rows.clear(); - rows.push_back(AsyncOp::kFieldLabels); - - // Push AsyncOps - for (auto&& kv : _inProgress) { - auto row = kv.first->getStringFields(); - if (currentOp) { - // If this is the AsyncOp we blew up on, mark with an asterisk - if (*currentOp == *(kv.first)) { - row[0] = "*"; - } - } - - rows.push_back(row); - } - - // Format as a table - output << "\n" << toTable(rows); - } - - output << "\n"; - - return output; -} - -uint64_t NetworkInterfaceASIO::getNumCanceledOps() { - return _numCanceledOps.load(); -} - -uint64_t NetworkInterfaceASIO::getNumFailedOps() { - return _numFailedOps.load(); -} - -uint64_t NetworkInterfaceASIO::getNumSucceededOps() { - return _numSucceededOps.load(); -} - -uint64_t NetworkInterfaceASIO::getNumTimedOutOps() { - return _numTimedOutOps.load(); -} - -void NetworkInterfaceASIO::appendConnectionStats(ConnectionPoolStats* stats) const { - _connectionPool.appendConnectionStats(stats); -} - -std::string NetworkInterfaceASIO::getHostName() { - return getHostNameCached(); -} - -void NetworkInterfaceASIO::startup() { - _serviceRunners.resize(kIOServiceWorkers); - for (std::size_t i = 0; i < kIOServiceWorkers; ++i) { - _serviceRunners[i] = stdx::thread([this, i]() { - setThreadName(_options.instanceName + "-" + std::to_string(i)); - try { - LOG(2) << "The NetworkInterfaceASIO worker thread is spinning up"; - asio::io_service::work work(_io_service); - std::error_code ec; - _io_service.run(ec); - if (ec) { - severe() << "Failure in _io_service.run(): " << ec.message(); - fassertFailed(40335); - } - } catch (...) { - severe() << "Uncaught exception in NetworkInterfaceASIO IO " - "worker thread of type: " - << exceptionToStatus(); - fassertFailed(28820); - } - }); - }; - _state.store(State::kRunning); -} - -void NetworkInterfaceASIO::shutdown() { - _state.store(State::kShutdown); - _io_service.stop(); - for (auto&& worker : _serviceRunners) { - worker.join(); - } - LOG(2) << "NetworkInterfaceASIO shutdown successfully"; -} - -void NetworkInterfaceASIO::waitForWork() { - stdx::unique_lock<stdx::mutex> lk(_executorMutex); - // TODO: This can be restructured with a lambda. - while (!_isExecutorRunnable) { - MONGO_IDLE_THREAD_BLOCK; - _isExecutorRunnableCondition.wait(lk); - } - _isExecutorRunnable = false; -} - -void NetworkInterfaceASIO::waitForWorkUntil(Date_t when) { - stdx::unique_lock<stdx::mutex> lk(_executorMutex); - // TODO: This can be restructured with a lambda. - while (!_isExecutorRunnable) { - const Milliseconds waitTime(when - now()); - if (waitTime <= Milliseconds(0)) { - break; - } - MONGO_IDLE_THREAD_BLOCK; - _isExecutorRunnableCondition.wait_for(lk, waitTime.toSystemDuration()); - } - _isExecutorRunnable = false; -} - -void NetworkInterfaceASIO::signalWorkAvailable() { - stdx::unique_lock<stdx::mutex> lk(_executorMutex); - _signalWorkAvailable_inlock(); -} - -void NetworkInterfaceASIO::_signalWorkAvailable_inlock() { - if (!_isExecutorRunnable) { - _isExecutorRunnable = true; - _isExecutorRunnableCondition.notify_one(); - } -} - -Date_t NetworkInterfaceASIO::now() { - return _timerFactory->now(); -} - -namespace { - -Status attachMetadataIfNeeded(RemoteCommandRequest& request, - rpc::EgressMetadataHook* metadataHook) { - - // Append the metadata of the request with metadata from the metadata hook - // if a hook is installed - if (metadataHook) { - BSONObjBuilder augmentedBob(std::move(request.metadata)); - - auto writeStatus = callNoexcept(*metadataHook, - &rpc::EgressMetadataHook::writeRequestMetadata, - request.opCtx, - &augmentedBob); - if (!writeStatus.isOK()) { - return writeStatus; - } - - request.metadata = augmentedBob.obj(); - } - - return Status::OK(); -} - -} // namespace - -Status NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cbHandle, - RemoteCommandRequest& request, - const RemoteCommandCompletionFn& onFinish, - const transport::BatonHandle& baton) { - MONGO_ASIO_INVARIANT(onFinish, "Invalid completion function"); - { - stdx::lock_guard<stdx::mutex> lk(_inProgressMutex); - const auto insertResult = _inGetConnection.emplace(cbHandle); - // We should never see the same CallbackHandle added twice - MONGO_ASIO_INVARIANT_INLOCK(insertResult.second, "Same CallbackHandle added twice"); - } - - if (inShutdown()) { - return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceASIO shutdown in progress"}; - } - - LOG(2) << "startCommand: " << redact(request.toString()); - - auto getConnectionStartTime = now(); - - auto statusMetadata = attachMetadataIfNeeded(request, _metadataHook.get()); - if (!statusMetadata.isOK()) { - return statusMetadata; - } - - auto nextStep = [this, getConnectionStartTime, cbHandle, request, onFinish]( - StatusWith<ConnectionPool::ConnectionHandle> swConn) { - - if (!swConn.isOK()) { - LOG(2) << "Failed to get connection from pool for request " << request.id << ": " - << swConn.getStatus(); - - bool wasPreviouslyCanceled = false; - { - stdx::lock_guard<stdx::mutex> lk(_inProgressMutex); - wasPreviouslyCanceled = _inGetConnection.erase(cbHandle) == 0; - } - - Status status = wasPreviouslyCanceled - ? Status(ErrorCodes::CallbackCanceled, "Callback canceled") - : swConn.getStatus(); - if (ErrorCodes::isExceededTimeLimitError(status.code())) { - _numTimedOutOps.fetchAndAdd(1); - } - if (status.code() != ErrorCodes::CallbackCanceled) { - _numFailedOps.fetchAndAdd(1); - } - - onFinish({status, now() - getConnectionStartTime}); - signalWorkAvailable(); - return; - } - - auto conn = static_cast<connection_pool_asio::ASIOConnection*>(swConn.getValue().get()); - - AsyncOp* op = nullptr; - - stdx::unique_lock<stdx::mutex> lk(_inProgressMutex); - - const auto eraseCount = _inGetConnection.erase(cbHandle); - - // If we didn't find the request, we've been canceled - if (eraseCount == 0) { - lk.unlock(); - - onFinish({ErrorCodes::CallbackCanceled, - "Callback canceled", - now() - getConnectionStartTime}); - - // Though we were canceled, we know that the stream is fine, so indicate success. - conn->indicateSuccess(); - - signalWorkAvailable(); - - return; - } - - // We can't release the AsyncOp until we know we were not canceled. - auto ownedOp = conn->releaseAsyncOp(); - op = ownedOp.get(); - - // This AsyncOp may be recycled. We expect timeout and canceled to be clean. - // If this op was most recently used to connect, its state transitions won't have been - // reset, so we do that here. - MONGO_ASIO_INVARIANT_INLOCK(!op->canceled(), "AsyncOp has dirty canceled flag", op); - MONGO_ASIO_INVARIANT_INLOCK(!op->timedOut(), "AsyncOp has dirty timeout flag", op); - op->clearStateTransitions(); - - // Now that we're inProgress, an external cancel can touch our op, but - // not until we release the inProgressMutex. - _inProgress.emplace(op, std::move(ownedOp)); - - op->_cbHandle = std::move(cbHandle); - op->_request = std::move(request); - op->_onFinish = std::move(onFinish); - op->_connectionPoolHandle = std::move(swConn.getValue()); - op->startProgress(getConnectionStartTime); - - // This ditches the lock and gets us onto the strand (so we're - // threadsafe) - op->_strand.post([this, op, getConnectionStartTime] { - const auto timeout = op->_request.timeout; - - // Set timeout now that we have the correct request object - if (timeout != RemoteCommandRequest::kNoTimeout) { - // Subtract the time it took to get the connection from the pool from the request - // timeout. - auto getConnectionDuration = now() - getConnectionStartTime; - if (getConnectionDuration >= timeout) { - // We only assume that the request timer is guaranteed to fire *after* the - // timeout duration - but make no stronger assumption. It is thus possible that - // we have already exceeded the timeout. In this case we timeout the operation - // manually. - std::stringstream msg; - msg << "Remote command timed out while waiting to get a connection from the " - << "pool, took " << getConnectionDuration << ", timeout was set to " - << timeout; - auto rs = ResponseStatus(ErrorCodes::NetworkInterfaceExceededTimeLimit, - msg.str(), - getConnectionDuration); - return _completeOperation(op, rs); - } - - // The above conditional guarantees that the adjusted timeout will never underflow. - MONGO_ASIO_INVARIANT(timeout > getConnectionDuration, "timeout underflowed", op); - const auto adjustedTimeout = timeout - getConnectionDuration; - const auto requestId = op->_request.id; - - try { - op->_timeoutAlarm = - op->_owner->_timerFactory->make(&op->_strand, adjustedTimeout); - } catch (std::system_error& e) { - severe() << "Failed to construct timer for AsyncOp: " << e.what(); - fassertFailed(40334); - } - - std::shared_ptr<AsyncOp::AccessControl> access; - std::size_t generation; - { - stdx::lock_guard<stdx::mutex> lk(op->_access->mutex); - access = op->_access; - generation = access->id; - } - - op->_timeoutAlarm->asyncWait([op, access, generation, requestId, adjustedTimeout]( - std::error_code ec) { - // We must pass a check for safe access before using op inside the - // callback or we may attempt access on an invalid pointer. - stdx::lock_guard<stdx::mutex> lk(access->mutex); - if (generation != access->id) { - // The operation has been cleaned up, do not access. - return; - } - - if (!ec) { - LOG(2) << "Request " << requestId << " timed out" - << ", adjusted timeout after getting connection from pool was " - << adjustedTimeout << ", op was " << redact(op->toString()); - - op->timeOut_inlock(); - } else { - LOG(2) << "Failed to time request " << requestId << "out: " << ec.message() - << ", op was " << redact(op->toString()); - } - }); - } - - _beginCommunication(op); - }); - }; - - _connectionPool.get(request.target, request.timeout, nextStep); - return Status::OK(); -} - -void NetworkInterfaceASIO::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle, - const transport::BatonHandle& baton) { - stdx::lock_guard<stdx::mutex> lk(_inProgressMutex); - - // If we found a matching cbHandle in _inGetConnection, then - // simply removing it has the same effect as cancelling it, so we - // can just return. - if (_inGetConnection.erase(cbHandle) != 0) { - _numCanceledOps.fetchAndAdd(1); - return; - } - - // TODO: This linear scan is unfortunate. It is here because our - // primary data structure is to keep the AsyncOps in an - // unordered_map by pointer, but here we only have the - // callback. We could keep two data structures at the risk of - // having them diverge. - for (auto&& kv : _inProgress) { - if (kv.first->cbHandle() == cbHandle) { - kv.first->cancel(); - _numCanceledOps.fetchAndAdd(1); - break; - } - } -} - -Status NetworkInterfaceASIO::setAlarm(Date_t when, - const stdx::function<void()>& action, - const transport::BatonHandle& baton) { - if (inShutdown()) { - return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceASIO shutdown in progress"}; - } - - std::shared_ptr<asio::system_timer> alarm; - - try { - auto timeLeft = when - now(); - // "alarm" must stay alive until it expires, hence the shared_ptr. - alarm = std::make_shared<asio::system_timer>(_io_service, timeLeft.toSystemDuration()); - } catch (...) { - return exceptionToStatus(); - } - - alarm->async_wait([alarm, this, action, when, baton](std::error_code ec) { - const auto nowValue = now(); - if (nowValue < when) { - warning() << "ASIO alarm returned early. Expected at: " << when - << ", fired at: " << nowValue; - const auto status = setAlarm(when, action, baton); - if ((!status.isOK()) && (status.code() != ErrorCodes::ShutdownInProgress)) { - fassertFailedWithStatus(40383, status); - } - return; - } - if (!ec) { - return action(); - } else if (ec != asio::error::operation_aborted) { - // When the network interface is shut down, it will cancel all pending - // alarms, raising an "operation_aborted" error here, which we ignore. - warning() << "setAlarm() received an error: " << ec.message(); - } - }); - - return Status::OK(); -}; - -bool NetworkInterfaceASIO::inShutdown() const { - return (_state.load() == State::kShutdown); -} - -bool NetworkInterfaceASIO::onNetworkThread() { - auto id = stdx::this_thread::get_id(); - return std::any_of(_serviceRunners.begin(), - _serviceRunners.end(), - [id](const stdx::thread& thread) { return id == thread.get_id(); }); -} - -void NetworkInterfaceASIO::_failWithInfo(const char* file, - int line, - std::string error, - AsyncOp* op) { - stdx::lock_guard<stdx::mutex> lk(_inProgressMutex); - _failWithInfo_inlock(file, line, error, op); -} - -void NetworkInterfaceASIO::_failWithInfo_inlock(const char* file, - int line, - std::string error, - AsyncOp* op) { - std::stringstream ss; - ss << "Invariant failure at " << file << ":" << line << ": " << error; - ss << _getDiagnosticString_inlock(op); - Status status{ErrorCodes::InternalError, ss.str()}; - fassertFailedWithStatus(34429, status); -} - -void NetworkInterfaceASIO::dropConnections(const HostAndPort& hostAndPort) { - _connectionPool.dropConnections(hostAndPort); -} - -} // namespace executor -} // namespace mongo diff --git a/src/mongo/executor/network_interface_asio.h b/src/mongo/executor/network_interface_asio.h deleted file mode 100644 index 3a423e8b779..00000000000 --- a/src/mongo/executor/network_interface_asio.h +++ /dev/null @@ -1,526 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <asio.hpp> - -#include <array> -#include <boost/optional.hpp> -#include <memory> -#include <string> -#include <system_error> -#include <vector> - -#include "mongo/base/status.h" -#include "mongo/base/system_error.h" -#include "mongo/executor/async_stream_factory_interface.h" -#include "mongo/executor/async_stream_interface.h" -#include "mongo/executor/async_timer_interface.h" -#include "mongo/executor/connection_pool.h" -#include "mongo/executor/network_connection_hook.h" -#include "mongo/executor/network_interface.h" -#include "mongo/executor/remote_command_request.h" -#include "mongo/executor/remote_command_response.h" -#include "mongo/platform/atomic_word.h" -#include "mongo/rpc/message.h" -#include "mongo/rpc/metadata/metadata_hook.h" -#include "mongo/rpc/protocol.h" -#include "mongo/stdx/condition_variable.h" -#include "mongo/stdx/functional.h" -#include "mongo/stdx/mutex.h" -#include "mongo/stdx/thread.h" -#include "mongo/stdx/unordered_map.h" -#include "mongo/stdx/unordered_set.h" -#include "mongo/transport/message_compressor_manager.h" - -namespace mongo { - -namespace executor { - -namespace connection_pool_asio { -class ASIOConnection; -class ASIOTimer; -class ASIOImpl; -} // connection_pool_asio - -class AsyncStreamInterface; - -#define MONGO_ASIO_INVARIANT(_Expression, ...) \ - do { \ - if (MONGO_unlikely(!(_Expression))) { \ - _failWithInfo(__FILE__, __LINE__, __VA_ARGS__); \ - } \ - } while (false) - -#define MONGO_ASIO_INVARIANT_INLOCK(_Expression, ...) \ - do { \ - if (MONGO_unlikely(!(_Expression))) { \ - _failWithInfo_inlock(__FILE__, __LINE__, __VA_ARGS__); \ - } \ - } while (false) - -// An AsyncOp can transition through at most 5 states. -const int kMaxStateTransitions = 5; - -/** - * Implementation of the replication system's network interface using Christopher - * Kohlhoff's ASIO library instead of existing MongoDB networking primitives. - */ -class NetworkInterfaceASIO final : public NetworkInterface { - friend class connection_pool_asio::ASIOConnection; - friend class connection_pool_asio::ASIOTimer; - friend class connection_pool_asio::ASIOImpl; - class AsyncOp; - -public: - struct Options { - Options(); - - std::string instanceName = "NetworkInterfaceASIO"; - ConnectionPool::Options connectionPoolOptions; - std::unique_ptr<AsyncTimerFactoryInterface> timerFactory; - std::unique_ptr<NetworkConnectionHook> networkConnectionHook; - std::unique_ptr<AsyncStreamFactoryInterface> streamFactory; - std::unique_ptr<rpc::EgressMetadataHook> metadataHook; - }; - - NetworkInterfaceASIO(Options = Options()); - - std::string getDiagnosticString() override; - - uint64_t getNumCanceledOps(); - uint64_t getNumFailedOps(); - uint64_t getNumSucceededOps(); - uint64_t getNumTimedOutOps(); - - void appendConnectionStats(ConnectionPoolStats* stats) const override; - std::string getHostName() override; - void startup() override; - void shutdown() override; - bool inShutdown() const override; - void waitForWork() override; - void waitForWorkUntil(Date_t when) override; - void signalWorkAvailable() override; - Date_t now() override; - Status startCommand(const TaskExecutor::CallbackHandle& cbHandle, - RemoteCommandRequest& request, - const RemoteCommandCompletionFn& onFinish, - const transport::BatonHandle& baton = nullptr) override; - void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle, - const transport::BatonHandle& baton = nullptr) override; - Status setAlarm(Date_t when, - const stdx::function<void()>& action, - const transport::BatonHandle& baton = nullptr) override; - - bool onNetworkThread() override; - - void dropConnections(const HostAndPort& hostAndPort) override; - -private: - using ResponseStatus = TaskExecutor::ResponseStatus; - using NetworkInterface::RemoteCommandCompletionFn; - using NetworkOpHandler = stdx::function<void(std::error_code, size_t)>; - using TableRow = std::vector<std::string>; - - enum class State { kReady, kRunning, kShutdown }; - - friend class AsyncOp; - - /** - * AsyncConnection encapsulates the per-connection state we maintain. - */ - class AsyncConnection { - public: - AsyncConnection(std::unique_ptr<AsyncStreamInterface>, rpc::ProtocolSet serverProtocols); - - AsyncStreamInterface& stream(); - - void cancel(); - - rpc::ProtocolSet serverProtocols() const; - rpc::ProtocolSet clientProtocols() const; - void setServerProtocols(rpc::ProtocolSet protocols); - - MessageCompressorManager& getCompressorManager() { - return _compressorManager; - } - - private: - std::unique_ptr<AsyncStreamInterface> _stream; - - rpc::ProtocolSet _serverProtocols; - - // Dynamically initialized from [min max]WireVersionOutgoing. - // Its expected that isMaster response is checked only on the caller. - rpc::ProtocolSet _clientProtocols{rpc::supports::kNone}; - - MessageCompressorManager _compressorManager; - }; - - /** - * AsyncCommand holds state for a currently running or soon-to-be-run command. - */ - class AsyncCommand { - public: - AsyncCommand(AsyncConnection* conn, - Message&& command, - Date_t now, - const HostAndPort& target); - - NetworkInterfaceASIO::AsyncConnection& conn(); - - Message& toSend(); - Message& toRecv(); - MSGHEADER::Value& header(); - - ResponseStatus response(AsyncOp* op, - rpc::Protocol protocol, - Date_t now, - rpc::EgressMetadataHook* metadataHook = nullptr); - - HostAndPort target() const { - return this->_target; - } - - private: - NetworkInterfaceASIO::AsyncConnection* const _conn; - - Message _toSend; - Message _toRecv; - - // TODO: Investigate efficiency of storing header separately. - MSGHEADER::Value _header; - - const Date_t _start; - - const HostAndPort _target; - }; - - /** - * Helper object to manage individual network operations. - */ - class AsyncOp { - friend class NetworkInterfaceASIO; - friend class connection_pool_asio::ASIOConnection; - - public: - /** - * Describe the various states through which an AsyncOp transitions. - */ - enum class State : unsigned char { - // A non-state placeholder. - kNoState, - // A new or zeroed-out AsyncOp. - kUninitialized, - // An AsyncOp begins its progress when startProgress() is called. - kInProgress, - // An AsyncOp transitions to kTimedOut when timeOut() is called. - // Note that the AsyncOp can be in a kCanceled state and still be - // in-flight in NetworkInterfaceASIO. - kTimedOut, - // An AsyncOp transitions to kCanceled when cancel() is called. - // Note that the AsyncOp can be in a kCanceled state and still be - // in-flight in NetworkInterfaceASIO. - kCanceled, - // An AsyncOp is finished once its finish() method is called. Note - // that the AsyncOp can be in a kFinished state and still be in the - // NetworkInterface's set of in-progress operations. - kFinished, - }; - - AsyncOp(NetworkInterfaceASIO* net, - const TaskExecutor::CallbackHandle& cbHandle, - const RemoteCommandRequest& request, - const RemoteCommandCompletionFn& onFinish, - Date_t now); - - /** - * Access control for AsyncOp. These objects should be used through shared_ptrs. - * - * In order to safely access an AsyncOp: - * 1. Take the lock - * 2. Check the id - * 3. If id matches saved generation, proceed, otherwise op has been recycled. - */ - struct AccessControl { - stdx::mutex mutex; - std::size_t id = 0; - }; - - void cancel(); - bool canceled() const; - void timeOut_inlock(); - bool timedOut() const; - - const TaskExecutor::CallbackHandle& cbHandle() const; - - AsyncConnection& connection(); - - void setConnection(AsyncConnection&& conn); - - // AsyncOp may run multiple commands over its lifetime (for example, an ismaster - // command, the command provided to the NetworkInterface via startCommand(), etc.) - // Calling beginCommand() resets internal state to prepare to run newCommand. - Status beginCommand(const RemoteCommandRequest& request); - - // This form of beginCommand takes a raw message. It is needed if the caller - // has to form the command manually (e.g. to use a specific requestBuilder). - Status beginCommand(Message&& newCommand, const HostAndPort& target); - - AsyncCommand& command(); - bool commandIsInitialized() const; - - void finish(TaskExecutor::ResponseStatus&& status); - - const RemoteCommandRequest& request() const; - - void startProgress(Date_t startTime); - - Date_t start() const; - - rpc::Protocol operationProtocol() const; - - void setOperationProtocol(rpc::Protocol proto); - - void setResponseMetadata(BSONObj m); - BSONObj getResponseMetadata(); - - void reset(); - - void clearStateTransitions(); - - void setOnFinish(RemoteCommandCompletionFn&& onFinish); - - // Returns diagnostic strings for logging. - TableRow getStringFields() const; - std::string toString() const; - - asio::io_service::strand& strand() { - return _strand; - } - - asio::ip::tcp::resolver& resolver() { - return _resolver; - } - - bool operator==(const AsyncOp& other) const; - - private: - // Type to represent the internal id of this request. - using AsyncOpId = uint64_t; - - static const TableRow kFieldLabels; - - // Return string representation of a given state. - std::string _stateToString(State state) const; - - // Return a string representation of this op's state transitions. - std::string _stateString() const; - - bool _hasSeenState(State state) const; - - // Track and validate AsyncOp state transitions. - // Use the _inlock variant if already holding the access control lock. - void _transitionToState(State newState); - void _transitionToState_inlock(State newState); - - // Helper for debugging. - void _failWithInfo(const char* file, int line, std::string error) const; - - NetworkInterfaceASIO* const _owner; - // Information describing a task enqueued on the NetworkInterface - // via a call to startCommand(). - TaskExecutor::CallbackHandle _cbHandle; - RemoteCommandRequest _request; - RemoteCommandCompletionFn _onFinish; - - // AsyncOp's have a handle to their connection pool handle. They are - // also owned by it when they're in the pool - ConnectionPool::ConnectionHandle _connectionPoolHandle; - - /** - * The connection state used to service this request. We wrap it in an optional - * as it is instantiated at some point after the AsyncOp is created. - */ - boost::optional<AsyncConnection> _connection; - - /** - * The RPC protocol used for this operation. We wrap it in an optional as it - * is not known until we obtain a connection. - */ - boost::optional<rpc::Protocol> _operationProtocol; - - Date_t _start; - std::unique_ptr<AsyncTimerInterface> _timeoutAlarm; - - asio::ip::tcp::resolver _resolver; - - const AsyncOpId _id; - - /** - * We maintain a shared_ptr to an access control object. This ensures that tangent - * execution paths, such as timeouts for this operation, will not try to access its - * state after it has been cleaned up. - */ - std::shared_ptr<AccessControl> _access; - - /** - * An AsyncOp may run 0, 1, or multiple commands over its lifetime. - * AsyncOp only holds at most a single AsyncCommand object at a time, - * representing its current running or next-to-be-run command, if there is one. - */ - boost::optional<AsyncCommand> _command; - bool _inSetup; - bool _inRefresh; - - /** - * The explicit strand that all operations for this op must run on. - * This must be the last member of AsyncOp because any pending - * operation for the strand are run when it's dtor is called. Any - * members that fall after it will have already been destroyed, which - * will make those fields illegal to touch from callbacks. - */ - asio::io_service::strand _strand; - - /** - * We hold an array of states to show the path this AsyncOp has taken. - * Must be holding the access control's lock to edit. - */ - std::array<State, kMaxStateTransitions> _states; - - BSONObj _responseMetadata{}; - }; - - void _startCommand(AsyncOp* op); - - /** - * Wraps a completion handler in pre-condition checks. - * When we resume after an asynchronous call, we may find the following: - * - the AsyncOp has been canceled in the interim (via cancelCommand()) - * - the asynchronous call has returned a non-OK error code - * Should both conditions be present, we handle cancelation over errors. States use - * _validateAndRun() to perform these checks before advancing the state machine. - */ - template <typename Handler> - void _validateAndRun(AsyncOp* op, std::error_code ec, Handler&& handler) { - if (op->canceled()) { - auto rs = ResponseStatus( - ErrorCodes::CallbackCanceled, "Callback canceled", now() - op->start()); - return _completeOperation(op, rs); - } else if (op->timedOut()) { - auto rs = ResponseStatus(ErrorCodes::NetworkInterfaceExceededTimeLimit, - "Operation timed out", - now() - op->start()); - return _completeOperation(op, rs); - } else if (ec) - return _networkErrorCallback(op, ec); - - handler(); - } - - // Connection - void _connect(AsyncOp* op); - - // setup plaintext TCP socket - void _setupSocket(AsyncOp* op, asio::ip::tcp::resolver::iterator endpoints); - - void _runIsMaster(AsyncOp* op); - void _runConnectionHook(AsyncOp* op); - void _authenticate(AsyncOp* op); - - // Communication state machine - void _beginCommunication(AsyncOp* op); - void _completedOpCallback(AsyncOp* op); - void _networkErrorCallback(AsyncOp* op, const std::error_code& ec); - void _completeOperation(AsyncOp* op, TaskExecutor::ResponseStatus resp); - - void _signalWorkAvailable_inlock(); - - void _asyncRunCommand(AsyncOp* op, NetworkOpHandler handler); - - std::string _getDiagnosticString_inlock(AsyncOp* currentOp); - - // Helpers for debugging crashes - void _failWithInfo(const char* file, int line, std::string error, AsyncOp* op = nullptr); - void _failWithInfo_inlock(const char* file, int line, std::string error, AsyncOp* op = nullptr); - - Options _options; - - asio::io_service _io_service; - std::vector<stdx::thread> _serviceRunners; - - const std::unique_ptr<rpc::EgressMetadataHook> _metadataHook; - - const std::unique_ptr<NetworkConnectionHook> _hook; - - std::atomic<State> _state; // NOLINT - - std::unique_ptr<AsyncTimerFactoryInterface> _timerFactory; - - std::unique_ptr<AsyncStreamFactoryInterface> _streamFactory; - - ConnectionPool _connectionPool; - - // If it is necessary to hold this lock while accessing a particular operation with - // an AccessControl object, take this lock first, always. - stdx::mutex _inProgressMutex; - stdx::unordered_map<AsyncOp*, std::unique_ptr<AsyncOp>> _inProgress; - stdx::unordered_set<TaskExecutor::CallbackHandle> _inGetConnection; - - // Operation counters - AtomicUInt64 _numCanceledOps; - AtomicUInt64 _numFailedOps; // includes timed out ops but does not include canceled ops - AtomicUInt64 _numSucceededOps; - AtomicUInt64 _numTimedOutOps; - - stdx::mutex _executorMutex; - bool _isExecutorRunnable; - stdx::condition_variable _isExecutorRunnableCondition; - - /** - * The explicit strand that all non-op operations run on. This must be the - * last member of NetworkInterfaceASIO because any pending operation for - * the strand are run when it's dtor is called. Any members that fall after - * it will have already been destroyed, which will make those fields - * illegal to touch from callbacks. - */ - asio::io_service::strand _strand; -}; - -template <typename T, typename R, typename... MethodArgs, typename... DeducedArgs> -R callNoexcept(T& obj, R (T::*method)(MethodArgs...), DeducedArgs&&... args) { - try { - return (obj.*method)(std::forward<DeducedArgs>(args)...); - } catch (...) { - std::terminate(); - } -} - -} // namespace executor -} // namespace mongo diff --git a/src/mongo/executor/network_interface_asio_auth.cpp b/src/mongo/executor/network_interface_asio_auth.cpp deleted file mode 100644 index 1a981b3d017..00000000000 --- a/src/mongo/executor/network_interface_asio_auth.cpp +++ /dev/null @@ -1,232 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO - -#include "mongo/platform/basic.h" - -#include "mongo/executor/network_interface_asio.h" - -#include "mongo/bson/util/builder.h" -#include "mongo/client/authenticate.h" -#include "mongo/config.h" -#include "mongo/db/auth/authorization_manager_global.h" -#include "mongo/db/auth/internal_user_auth.h" -#include "mongo/db/commands.h" -#include "mongo/db/commands/test_commands_enabled.h" -#include "mongo/db/server_options.h" -#include "mongo/db/wire_version.h" -#include "mongo/rpc/factory.h" -#include "mongo/rpc/get_status_from_command_result.h" -#include "mongo/rpc/legacy_request_builder.h" -#include "mongo/rpc/metadata/client_metadata.h" -#include "mongo/rpc/reply_interface.h" -#include "mongo/stdx/memory.h" -#include "mongo/util/log.h" -#include "mongo/util/net/ssl_manager.h" -#include "mongo/util/version.h" - -namespace mongo { -namespace executor { - -using ResponseStatus = TaskExecutor::ResponseStatus; - -void NetworkInterfaceASIO::_runIsMaster(AsyncOp* op) { - BSONObjBuilder bob; - bob.append("isMaster", 1); - bob.append("hangUpOnStepDown", false); - - const auto versionString = VersionInfoInterface::instance().version(); - ClientMetadata::serialize(_options.instanceName, versionString, &bob); - - if (getTestCommandsEnabled()) { - // Only include the host:port of this process in the isMaster command request if test - // commands are enabled. mongobridge uses this field to identify the process opening a - // connection to it. - StringBuilder sb; - sb << getHostName() << ':' << serverGlobalParams.port; - bob.append("hostInfo", sb.str()); - } - - op->connection().getCompressorManager().clientBegin(&bob); - - if (WireSpec::instance().isInternalClient) { - WireSpec::appendInternalClientWireVersion(WireSpec::instance().outgoing, &bob); - } - - // We use a legacy request to create our ismaster request because we may - // have to communicate with servers that do not support other protocols. - auto isMasterRequest = - rpc::legacyRequestFromOpMsgRequest(OpMsgRequest::fromDBAndBody("admin", bob.obj())); - - // Set current command to ismaster request and run - auto beginStatus = op->beginCommand(std::move(isMasterRequest), op->request().target); - if (!beginStatus.isOK()) { - return _completeOperation(op, beginStatus); - } - - // Callback to parse protocol information out of received ismaster response - auto parseIsMaster = [this, op]() { - - auto swCommandReply = op->command().response(op, rpc::Protocol::kOpQuery, now()); - if (!swCommandReply.isOK()) { - return _completeOperation(op, swCommandReply); - } - - auto commandReply = std::move(swCommandReply); - - // Ensure that the isMaster response is "ok:1". - auto commandStatus = getStatusFromCommandResult(commandReply.data); - if (!commandStatus.isOK()) { - return _completeOperation(op, commandStatus); - } - - auto protocolSet = rpc::parseProtocolSetFromIsMasterReply(commandReply.data); - if (!protocolSet.isOK()) - return _completeOperation(op, protocolSet.getStatus()); - - auto validateStatus = - rpc::validateWireVersion(WireSpec::instance().outgoing, protocolSet.getValue().version); - if (!validateStatus.isOK()) { - warning() << "remote host has incompatible wire version: " << validateStatus; - - return _completeOperation(op, validateStatus); - } - auto egressTagManager = _options.connectionPoolOptions.egressTagCloserManager; - if (egressTagManager) { - // Tag outgoing connection so it can be kept open on FCV upgrade if it is not to a - // server with a lower binary version. - if (protocolSet.getValue().version.maxWireVersion >= - WireSpec::instance().outgoing.maxWireVersion) { - egressTagManager->mutateTags( - op->command().target(), - [](transport::Session::TagMask tags) { return transport::Session::kKeepOpen; }); - } - } // Some unit and integration tests do not set up an egress tag manager. - - op->connection().setServerProtocols(protocolSet.getValue().protocolSet); - - invariant(op->connection().clientProtocols() != rpc::supports::kNone); - // Set the operation protocol - auto negotiatedProtocol = - rpc::negotiate(op->connection().serverProtocols(), op->connection().clientProtocols()); - - if (!negotiatedProtocol.isOK()) { - // Add relatively verbose logging here, since this should not happen unless we are - // mongos and we try to connect to a node that doesn't support OP_COMMAND. - warning() << "failed to negotiate protocol with remote host: " << op->request().target; - warning() << "request was: " << redact(op->request().cmdObj); - warning() << "response was: " << redact(commandReply.data); - - auto clientProtos = rpc::toString(op->connection().clientProtocols()); - if (clientProtos.isOK()) { - warning() << "our (client) supported protocols: " << clientProtos.getValue(); - } - auto serverProtos = rpc::toString(op->connection().serverProtocols()); - if (serverProtos.isOK()) { - warning() << "remote server's supported protocols:" << serverProtos.getValue(); - } - return _completeOperation(op, negotiatedProtocol.getStatus()); - } - - op->setOperationProtocol(negotiatedProtocol.getValue()); - - op->connection().getCompressorManager().clientFinish(commandReply.data); - - if (_hook) { - // Run the validation hook. - auto validHost = callNoexcept( - *_hook, &NetworkConnectionHook::validateHost, op->request().target, commandReply); - if (!validHost.isOK()) { - return _completeOperation(op, validHost); - } - } - - return _authenticate(op); - - }; - - _asyncRunCommand(op, [this, op, parseIsMaster](std::error_code ec, size_t bytes) { - _validateAndRun(op, ec, std::move(parseIsMaster)); - }); -} - -void NetworkInterfaceASIO::_authenticate(AsyncOp* op) { - // There is currently no way for NetworkInterfaceASIO's users to run a command - // without going through _authenticate(). Callers may want to run certain commands, - // such as ismasters, pre-auth. We may want to offer this choice in the future. - - // This check is sufficient to see if auth is enabled on the system, - // and avoids creating dependencies on deeper, less accessible auth code. - if (!isInternalAuthSet()) { - return _runConnectionHook(op); - } - - // We will only have a valid clientName if SSL is enabled. - std::string clientName; -#ifdef MONGO_CONFIG_SSL - if (getSSLManager()) { - clientName = getSSLManager()->getSSLConfiguration().clientSubjectName.toString(); - } -#endif - - // authenticateClient will use this to run auth-related commands over our connection. - auto runCommandHook = [this, op](executor::RemoteCommandRequest request, - auth::AuthCompletionHandler handler) { - - // SERVER-14170: Set the metadataHook to nullptr explicitly as we cannot write metadata - // here. - auto beginStatus = op->beginCommand(request); - if (!beginStatus.isOK()) { - return handler(beginStatus); - } - - auto callAuthCompletionHandler = [this, op, handler]() { - auto authResponse = op->command().response(op, op->operationProtocol(), now(), nullptr); - handler(authResponse); - }; - - _asyncRunCommand(op, - [this, op, callAuthCompletionHandler](std::error_code ec, size_t bytes) { - _validateAndRun(op, ec, callAuthCompletionHandler); - }); - }; - - // This will be called when authentication has completed. - auto authHook = [this, op](auth::AuthResponse response) { - if (!response.isOK()) - return _completeOperation(op, response); - return _runConnectionHook(op); - }; - - auto params = getInternalUserAuthParams(); - auth::authenticateClient(params, op->request().target, clientName, runCommandHook, authHook); -} - -} // namespace executor -} // namespace mongo diff --git a/src/mongo/executor/network_interface_asio_command.cpp b/src/mongo/executor/network_interface_asio_command.cpp deleted file mode 100644 index ca111a09f51..00000000000 --- a/src/mongo/executor/network_interface_asio_command.cpp +++ /dev/null @@ -1,506 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO - -#include "mongo/platform/basic.h" - -#include "mongo/executor/network_interface_asio.h" - -#include <type_traits> -#include <utility> - -#include "mongo/base/static_assert.h" -#include "mongo/db/dbmessage.h" -#include "mongo/db/jsobj.h" -#include "mongo/executor/async_stream_interface.h" -#include "mongo/executor/connection_pool_asio.h" -#include "mongo/rpc/factory.h" -#include "mongo/rpc/metadata/metadata_hook.h" -#include "mongo/rpc/protocol.h" -#include "mongo/rpc/reply_interface.h" -#include "mongo/stdx/memory.h" -#include "mongo/util/assert_util.h" -#include "mongo/util/fail_point_service.h" -#include "mongo/util/log.h" -#include "mongo/util/mongoutils/str.h" - -namespace mongo { -namespace executor { - -/** - * The following send - receive utility functions are "stateless" in that they exist - * apart from the AsyncOp state machine. - */ - -namespace { - -using namespace std::literals::string_literals; - -MONGO_FP_DECLARE(NetworkInterfaceASIOasyncRunCommandFail); - -using asio::ip::tcp; -using ResponseStatus = TaskExecutor::ResponseStatus; - -// A type conforms to the NetworkHandler concept if it is a callable type that takes a -// std::error_code and std::size_t and returns void. The std::error_code parameter is used -// to inform the handler if the asynchronous operation it was waiting on succeeded, and the -// size_t parameter conveys how many bytes were read or written. -template <typename FunctionLike> -using IsNetworkHandler = - std::is_convertible<FunctionLike, stdx::function<void(std::error_code, std::size_t)>>; - -template <typename Handler> -void asyncSendMessage(AsyncStreamInterface& stream, Message* m, Handler&& handler) { - MONGO_STATIC_ASSERT_MSG( - IsNetworkHandler<Handler>::value, - "Handler passed to asyncSendMessage does not conform to NetworkHandler concept"); - m->header().setResponseToMsgId(0); - m->header().setId(nextMessageId()); - // TODO: Some day we may need to support vector messages. - fassert(28708, m->buf() != 0); - stream.write(asio::buffer(m->buf(), m->size()), std::forward<Handler>(handler)); -} - -template <typename Handler> -void asyncRecvMessageHeader(AsyncStreamInterface& stream, - MSGHEADER::Value* header, - Handler&& handler) { - MONGO_STATIC_ASSERT_MSG( - IsNetworkHandler<Handler>::value, - "Handler passed to asyncRecvMessageHeader does not conform to NetworkHandler concept"); - stream.read(asio::buffer(header->view().view2ptr(), sizeof(decltype(*header))), - std::forward<Handler>(handler)); -} - -template <typename Handler> -void asyncRecvMessageBody(AsyncStreamInterface& stream, - MSGHEADER::Value* header, - Message* m, - Handler&& handler) { - MONGO_STATIC_ASSERT_MSG( - IsNetworkHandler<Handler>::value, - "Handler passed to asyncRecvMessageBody does not conform to NetworkHandler concept"); - // validate message length - int len = header->constView().getMessageLength(); - if (len == 542393671) { - LOG(3) << "attempt to access MongoDB over HTTP on the native driver port."; - return handler(make_error_code(ErrorCodes::ProtocolError), 0); - } else if (static_cast<size_t>(len) < sizeof(MSGHEADER::Value) || - static_cast<size_t>(len) > MaxMessageSizeBytes) { - warning() << "recv(): message len " << len << " is invalid. " - << "Min " << sizeof(MSGHEADER::Value) << " Max: " << MaxMessageSizeBytes; - return handler(make_error_code(ErrorCodes::InvalidLength), 0); - } - - int z = (len + 1023) & 0xfffffc00; - invariant(z >= len); - m->setData(SharedBuffer::allocate(z)); - MsgData::View mdView = m->buf(); - - // copy header data into master buffer - int headerLen = sizeof(MSGHEADER::Value); - memcpy(mdView.view2ptr(), header, headerLen); - int bodyLength = len - headerLen; - invariant(bodyLength >= 0); - - // receive remaining data into md->data - stream.read(asio::buffer(mdView.data(), bodyLength), std::forward<Handler>(handler)); -} - -ResponseStatus decodeRPC(Message* received, - rpc::Protocol protocol, - Milliseconds elapsed, - const HostAndPort& source, - rpc::EgressMetadataHook* metadataHook) { - try { - // makeReply will throw if the reply is invalid - auto reply = rpc::makeReply(received); - if (reply->getProtocol() != protocol) { - auto requestProtocol = rpc::toString(static_cast<rpc::ProtocolSet>(protocol)); - if (!requestProtocol.isOK()) - return {requestProtocol.getStatus(), elapsed}; - - return {ErrorCodes::RPCProtocolNegotiationFailed, - str::stream() << "Mismatched RPC protocols - request was '" - << requestProtocol.getValue().toString() - << "' '" - << " but reply was '" - << networkOpToString(received->operation()) - << "'", - elapsed}; - } - auto commandReply = reply->getCommandReply(); - auto replyMetadata = reply->getMetadata(); - - // Handle incoming reply metadata. - if (metadataHook) { - auto listenStatus = callNoexcept(*metadataHook, - &rpc::EgressMetadataHook::readReplyMetadata, - nullptr, // adding operationTime is handled via lambda - source.toString(), - replyMetadata); - if (!listenStatus.isOK()) { - return {listenStatus, elapsed}; - } - } - - return {RemoteCommandResponse( - std::move(*received), std::move(commandReply), std::move(replyMetadata), elapsed)}; - } catch (...) { - return {exceptionToStatus(), elapsed}; - } -} - -} // namespace - -NetworkInterfaceASIO::AsyncCommand::AsyncCommand(AsyncConnection* conn, - Message&& command, - Date_t now, - const HostAndPort& target) - : _conn(conn), _toSend(std::move(command)), _start(now), _target(target) { - _toSend.header().setResponseToMsgId(0); -} - -NetworkInterfaceASIO::AsyncConnection& NetworkInterfaceASIO::AsyncCommand::conn() { - return *_conn; -} - -Message& NetworkInterfaceASIO::AsyncCommand::toSend() { - return _toSend; -} - -Message& NetworkInterfaceASIO::AsyncCommand::toRecv() { - return _toRecv; -} - -MSGHEADER::Value& NetworkInterfaceASIO::AsyncCommand::header() { - return _header; -} - -ResponseStatus NetworkInterfaceASIO::AsyncCommand::response(AsyncOp* op, - rpc::Protocol protocol, - Date_t now, - rpc::EgressMetadataHook* metadataHook) { - auto& received = _toRecv; - if (received.operation() == dbCompressed) { - auto swm = conn().getCompressorManager().decompressMessage(received); - if (!swm.isOK()) { - return swm.getStatus(); - } - received = std::move(swm.getValue()); - } - - auto rs = decodeRPC(&received, protocol, now - _start, _target, metadataHook); - if (rs.isOK()) - op->setResponseMetadata(rs.metadata); - return rs; -} - -void NetworkInterfaceASIO::_startCommand(AsyncOp* op) { - LOG(3) << "running command " << redact(op->request().cmdObj) << " against database " - << op->request().dbname << " across network to " << op->request().target.toString(); - if (inShutdown()) { - return; - } - - // _connect() will continue the state machine. - _connect(op); -} - -void NetworkInterfaceASIO::_beginCommunication(AsyncOp* op) { - // The way that we connect connections for the connection pool is by - // starting the callback chain with connect(), but getting off at the first - // _beginCommunication. I.e. all AsyncOp's start off with _inSetup == true - // and arrive here as they're connected and authed. Once they hit here, we - // return to the connection pool's get() callback with _inSetup == false, - // so we can proceed with user operations after they return to this - // codepath. - if (op->_inSetup) { - auto host = op->request().target; - auto getConnectionDuration = now() - op->start(); - log() << "Successfully connected to " << host << ", took " << getConnectionDuration << " (" - << _connectionPool.getNumConnectionsPerHost(host) << " connections now open to " - << host << ")"; - op->_inSetup = false; - op->finish(RemoteCommandResponse()); - return; - } - - LOG(3) << "Initiating asynchronous command: " << redact(op->request().toString()); - - auto beginStatus = op->beginCommand(op->request()); - if (!beginStatus.isOK()) { - return _completeOperation(op, beginStatus); - } - - _asyncRunCommand(op, [this, op](std::error_code ec, size_t bytes) { - _validateAndRun(op, ec, [this, op]() { _completedOpCallback(op); }); - }); -} - -void NetworkInterfaceASIO::_completedOpCallback(AsyncOp* op) { - auto response = op->command().response(op, op->operationProtocol(), now(), _metadataHook.get()); - _completeOperation(op, response); -} - -void NetworkInterfaceASIO::_networkErrorCallback(AsyncOp* op, const std::error_code& ec) { - ErrorCodes::Error errorCode = (ec.category() == mongoErrorCategory()) - ? ErrorCodes::Error(ec.value()) - : ErrorCodes::HostUnreachable; - _completeOperation(op, {errorCode, ec.message(), Milliseconds(now() - op->_start)}); -} - -// NOTE: This method may only be called by ASIO threads -// (do not call from methods entered by TaskExecutor threads) -void NetworkInterfaceASIO::_completeOperation(AsyncOp* op, ResponseStatus resp) { - auto metadata = op->getResponseMetadata(); - if (!metadata.isEmpty()) { - resp.metadata = metadata; - } - - // Cancel this operation's timeout. Note that the timeout callback may already be running, - // may have run, or may have already been scheduled to run in the near future. - if (op->_timeoutAlarm) { - op->_timeoutAlarm->cancel(); - } - - if (ErrorCodes::isExceededTimeLimitError(resp.status.code())) { - _numTimedOutOps.fetchAndAdd(1); - } - - if (op->_inSetup) { - // If we are in setup we should only be here if we failed to connect. - MONGO_ASIO_INVARIANT(!resp.isOK(), "Failed to connect in setup", op); - // If we fail during connection, we won't be able to access any of op's members after - // calling finish(), so we return here. - log() << "Failed to connect to " << op->request().target << " - " << redact(resp.status); - op->finish(std::move(resp)); - return; - } - - if (op->_inRefresh) { - // If we are in refresh we should only be here if we failed to heartbeat. - MONGO_ASIO_INVARIANT(!resp.isOK(), "In refresh, but did not fail to heartbeat", op); - // If we fail during heartbeating, we won't be able to access any of op's members after - // calling finish(), so we return here. - log() << "Failed asio heartbeat to " - << (op->commandIsInitialized() ? op->command().target().toString() : "unknown"s) - << " - " << redact(resp.status); - _numFailedOps.fetchAndAdd(1); - op->finish(std::move(resp)); - return; - } - - if (!resp.isOK()) { - // In the case that resp is not OK, but _inSetup is false, we are using a connection - // that we got from the pool to execute a command, but it failed for some reason. - if (op->commandIsInitialized() && shouldLog(LogstreamBuilder::severityCast(2))) { - const auto performLog = [&resp](Message& message) { - LOG(2) << "Failed to send message. Reason: " << redact(resp.status) << ". Message: " - << rpc::opMsgRequestFromAnyProtocol(message).body.toString( - logger::globalLogDomain()->shouldRedactLogs()); - }; - - // Message might be compressed, decompress in that case so we can log the body - Message& maybeCompressed = op->command().toSend(); - if (maybeCompressed.operation() != dbCompressed) { - performLog(maybeCompressed); - } else { - StatusWith<Message> decompressedMessage = - op->command().conn().getCompressorManager().decompressMessage(maybeCompressed); - if (decompressedMessage.isOK()) { - performLog(decompressedMessage.getValue()); - } else { - LOG(2) << "Failed to execute a command. Reason: " << redact(resp.status) - << ". Decompression failed with: " - << redact(decompressedMessage.getStatus()); - } - } - } else { - LOG(2) << "Failed to execute a command. Reason: " << redact(resp.status); - } - - if (resp.status.code() != ErrorCodes::CallbackCanceled) { - _numFailedOps.fetchAndAdd(1); - } - } else { - _numSucceededOps.fetchAndAdd(1); - } - - std::unique_ptr<AsyncOp> ownedOp; - - { - stdx::lock_guard<stdx::mutex> lk(_inProgressMutex); - - auto iter = _inProgress.find(op); - - MONGO_ASIO_INVARIANT_INLOCK( - iter != _inProgress.end(), "Could not find AsyncOp in _inProgress", op); - - ownedOp = std::move(iter->second); - _inProgress.erase(iter); - } - - op->finish(std::move(resp)); - - MONGO_ASIO_INVARIANT(static_cast<bool>(ownedOp), "Invalid AsyncOp", op); - - auto conn = std::move(op->_connectionPoolHandle); - auto asioConn = static_cast<connection_pool_asio::ASIOConnection*>(conn.get()); - - // Prevent any other threads or callbacks from accessing this op so we may safely complete - // and destroy it. It is key that we do this after we remove the op from the _inProgress map - // or someone else in cancelCommand could read the bumped generation and cancel the next - // command that uses this op. See SERVER-20556. - { - stdx::lock_guard<stdx::mutex> lk(op->_access->mutex); - ++(op->_access->id); - } - - // We need to bump the generation BEFORE we call reset() or we could flip the timeout in the - // timeout callback before returning the AsyncOp to the pool. - ownedOp->reset(); - - asioConn->bindAsyncOp(std::move(ownedOp)); - if (!resp.isOK()) { - asioConn->indicateFailure(resp.status); - } else { - asioConn->indicateUsed(); - asioConn->indicateSuccess(); - } - - signalWorkAvailable(); -} - -void NetworkInterfaceASIO::_asyncRunCommand(AsyncOp* op, NetworkOpHandler handler) { - LOG(2) << "Starting asynchronous command " << op->request().id << " on host " - << op->request().target.toString(); - - if (MONGO_FAIL_POINT(NetworkInterfaceASIOasyncRunCommandFail)) { - _validateAndRun(op, asio::error::basic_errors::network_unreachable, [] {}); - return; - } - - // We invert the following steps below to run a command: - // 1 - send the given command - // 2 - receive a header for the response - // 3 - validate and receive response body - // 4 - advance the state machine by calling handler() - auto& cmd = op->command(); - - // Step 4 - auto recvMessageCallback = [handler](std::error_code ec, size_t bytes) { - // We don't call _validateAndRun here as we assume the caller will. - handler(ec, bytes); - }; - - // Step 3 - auto recvHeaderCallback = [this, &cmd, handler, recvMessageCallback, op](std::error_code ec, - size_t bytes) { - // The operation could have been canceled after starting the command, but before - // receiving the header - _validateAndRun(op, ec, [recvMessageCallback, bytes, &cmd, handler] { - // validate response id - uint32_t expectedId = cmd.toSend().header().getId(); - uint32_t actualId = cmd.header().constView().getResponseToMsgId(); - if (actualId != expectedId) { - LOG(3) << "got wrong response:" - << " expected response id: " << expectedId - << ", got response id: " << actualId; - return handler(make_error_code(ErrorCodes::ProtocolError), bytes); - } - - asyncRecvMessageBody( - cmd.conn().stream(), &cmd.header(), &cmd.toRecv(), std::move(recvMessageCallback)); - }); - }; - - // Step 2 - auto sendMessageCallback = [this, &cmd, handler, recvHeaderCallback, op](std::error_code ec, - size_t bytes) { - _validateAndRun(op, ec, [&cmd, recvHeaderCallback] { - asyncRecvMessageHeader( - cmd.conn().stream(), &cmd.header(), std::move(recvHeaderCallback)); - }); - - - }; - - // Step 1 - asyncSendMessage(cmd.conn().stream(), &cmd.toSend(), std::move(sendMessageCallback)); -} - -void NetworkInterfaceASIO::_runConnectionHook(AsyncOp* op) { - if (!_hook) { - return _beginCommunication(op); - } - - auto swOptionalRequest = - callNoexcept(*_hook, &NetworkConnectionHook::makeRequest, op->request().target); - - if (!swOptionalRequest.isOK()) { - return _completeOperation(op, swOptionalRequest.getStatus()); - } - - auto optionalRequest = std::move(swOptionalRequest.getValue()); - - if (optionalRequest == boost::none) { - return _beginCommunication(op); - } - - auto beginStatus = op->beginCommand(*optionalRequest); - if (!beginStatus.isOK()) { - return _completeOperation(op, beginStatus); - } - - auto finishHook = [this, op]() { - auto response = - op->command().response(op, op->operationProtocol(), now(), _metadataHook.get()); - - if (!response.isOK()) { - return _completeOperation(op, response); - } - - auto handleStatus = callNoexcept( - *_hook, &NetworkConnectionHook::handleReply, op->request().target, std::move(response)); - - if (!handleStatus.isOK()) { - return _completeOperation(op, handleStatus); - } - - return _beginCommunication(op); - }; - - return _asyncRunCommand(op, [this, op, finishHook](std::error_code ec, std::size_t bytes) { - _validateAndRun(op, ec, finishHook); - }); -} - - -} // namespace executor -} // namespace mongo diff --git a/src/mongo/executor/network_interface_asio_connect.cpp b/src/mongo/executor/network_interface_asio_connect.cpp deleted file mode 100644 index 57a9f302b58..00000000000 --- a/src/mongo/executor/network_interface_asio_connect.cpp +++ /dev/null @@ -1,110 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO - -#include "mongo/platform/basic.h" - -#include "mongo/executor/network_interface_asio.h" - -#include <utility> - -#include "mongo/base/system_error.h" -#include "mongo/config.h" -#include "mongo/db/wire_version.h" -#include "mongo/executor/async_stream.h" -#include "mongo/executor/async_stream_factory.h" -#include "mongo/executor/async_stream_interface.h" -#include "mongo/stdx/memory.h" -#include "mongo/util/log.h" - -namespace mongo { -namespace executor { - -using asio::ip::tcp; - -NetworkInterfaceASIO::AsyncConnection::AsyncConnection(std::unique_ptr<AsyncStreamInterface> stream, - rpc::ProtocolSet protocols) - : _stream(std::move(stream)), - _serverProtocols(protocols), - _clientProtocols(rpc::computeProtocolSet(WireSpec::instance().outgoing)) {} - -AsyncStreamInterface& NetworkInterfaceASIO::AsyncConnection::stream() { - return *_stream; -} - -void NetworkInterfaceASIO::AsyncConnection::cancel() { - _stream->cancel(); -} - -rpc::ProtocolSet NetworkInterfaceASIO::AsyncConnection::serverProtocols() const { - return _serverProtocols; -} - -rpc::ProtocolSet NetworkInterfaceASIO::AsyncConnection::clientProtocols() const { - return _clientProtocols; -} - -void NetworkInterfaceASIO::AsyncConnection::setServerProtocols(rpc::ProtocolSet protocols) { - _serverProtocols = protocols; -} - -void NetworkInterfaceASIO::_connect(AsyncOp* op) { - log() << "Connecting to " << op->request().target.toString(); - - tcp::resolver::query query(op->request().target.host(), - std::to_string(op->request().target.port())); - // TODO: Investigate how we might hint or use shortcuts to resolve when possible. - const auto thenConnect = [this, op](std::error_code ec, tcp::resolver::iterator endpoints) { - if (endpoints == tcp::resolver::iterator()) { - // Workaround a bug in ASIO returning an invalid resolver iterator (with a non-error - // std::error_code) when file descriptors are exhausted. - ec = make_error_code(ErrorCodes::HostUnreachable); - } - _validateAndRun( - op, ec, [this, op, endpoints]() { _setupSocket(op, std::move(endpoints)); }); - }; - op->resolver().async_resolve(query, op->_strand.wrap(std::move(thenConnect))); -} - -void NetworkInterfaceASIO::_setupSocket(AsyncOp* op, tcp::resolver::iterator endpoints) { - // TODO: Consider moving this call to post-auth so we only assign completed connections. - { - auto stream = _streamFactory->makeStream(&op->strand(), op->request().target); - op->setConnection({std::move(stream), rpc::supports::kOpQueryOnly}); - } - - auto& stream = op->connection().stream(); - - stream.connect(std::move(endpoints), [this, op](std::error_code ec) { - _validateAndRun(op, ec, [this, op]() { _runIsMaster(op); }); - }); -} - -} // namespace executor -} // namespace mongo diff --git a/src/mongo/executor/network_interface_asio_integration_test.cpp b/src/mongo/executor/network_interface_asio_integration_test.cpp deleted file mode 100644 index 79688a60d19..00000000000 --- a/src/mongo/executor/network_interface_asio_integration_test.cpp +++ /dev/null @@ -1,236 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO - -#include "mongo/platform/basic.h" - -#include <algorithm> -#include <exception> - -#include "mongo/client/connection_string.h" -#include "mongo/executor/network_interface_asio_integration_fixture.h" -#include "mongo/executor/network_interface_asio_test_utils.h" -#include "mongo/platform/random.h" -#include "mongo/rpc/get_status_from_command_result.h" -#include "mongo/stdx/future.h" -#include "mongo/stdx/memory.h" -#include "mongo/unittest/integration_test.h" -#include "mongo/unittest/unittest.h" -#include "mongo/util/assert_util.h" -#include "mongo/util/concurrency/thread_pool.h" -#include "mongo/util/log.h" -#include "mongo/util/scopeguard.h" - -namespace mongo { -namespace executor { -namespace { - -TEST_F(NetworkInterfaceASIOIntegrationFixture, Ping) { - startNet(); - assertCommandOK("admin", BSON("ping" << 1)); -} - -TEST_F(NetworkInterfaceASIOIntegrationFixture, Timeouts) { - startNet(); - // This sleep command will take 10 seconds, so we should time out client side first given - // our timeout of 100 milliseconds. - assertCommandFailsOnClient("admin", - BSON("sleep" << 1 << "lock" - << "none" - << "secs" - << 10), - ErrorCodes::NetworkInterfaceExceededTimeLimit, - Milliseconds(100)); - - // Run a sleep command that should return before we hit the ASIO timeout. - assertCommandOK("admin", - BSON("sleep" << 1 << "lock" - << "none" - << "secs" - << 1), - Milliseconds(10000000)); -} - -class StressTestOp { -public: - using Fixture = NetworkInterfaceASIOIntegrationFixture; - using Pool = ThreadPoolInterface; - - void run(Fixture* fixture, - StartCommandCB onFinish, - Milliseconds timeout = RemoteCommandRequest::kNoTimeout) { - auto cb = makeCallbackHandle(); - - RemoteCommandRequest request{unittest::getFixtureConnectionString().getServers()[0], - "admin", - _command, - nullptr, - timeout}; - - fixture->startCommand(cb, request, onFinish); - - if (_cancel) { - invariant(fixture->getRandomNumberGenerator()); - sleepmillis(fixture->getRandomNumberGenerator()->nextInt32(10)); - fixture->net().cancelCommand(cb); - } - } - - static void runTimeoutOp(Fixture* fixture, StartCommandCB onFinish) { - return StressTestOp(BSON("sleep" << 1 << "lock" - << "none" - << "secs" - << 1), - false) - .run(fixture, onFinish, Milliseconds(100)); - } - - static void runCompleteOp(Fixture* fixture, StartCommandCB onFinish) { - return StressTestOp(BSON("sleep" << 1 << "lock" - << "none" - << "millis" - << 100), - false) - .run(fixture, onFinish); - } - - static void runCancelOp(Fixture* fixture, StartCommandCB onFinish) { - return StressTestOp(BSON("sleep" << 1 << "lock" - << "none" - << "secs" - << 10), - true) - .run(fixture, onFinish); - } - - static void runLongOp(Fixture* fixture, StartCommandCB onFinish) { - return StressTestOp(BSON("sleep" << 1 << "lock" - << "none" - << "secs" - << 30), - false) - .run(fixture, onFinish); - } - -private: - StressTestOp(const BSONObj& command, bool cancel) : _command(command), _cancel(cancel) {} - - BSONObj _command; - bool _cancel; -}; - -TEST_F(NetworkInterfaceASIOIntegrationFixture, StressTest) { - constexpr std::size_t numOps = 500; - RemoteCommandResponse testResults[numOps]; - ErrorCodes::Error expectedResults[numOps]; - CountdownLatch cl(numOps); - - startNet(); - - std::unique_ptr<SecureRandom> seedSource{SecureRandom::create()}; - auto seed = seedSource->nextInt64(); - - log() << "Random seed is " << seed; - auto rng = PseudoRandom(seed); // TODO: read from command line - setRandomNumberGenerator(&rng); - log() << "Starting stress test..."; - - for (std::size_t i = 0; i < numOps; ++i) { - // stagger operations slightly to mitigate connection pool contention - sleepmillis(rng.nextInt32(50)); - - auto r = rng.nextCanonicalDouble(); - - auto cb = [&testResults, &cl, i](const RemoteCommandResponse& resp) { - testResults[i] = resp; - cl.countDown(); - }; - - if (r < .3) { - expectedResults[i] = ErrorCodes::CallbackCanceled; - StressTestOp::runCancelOp(this, cb); - } else if (r < .7) { - expectedResults[i] = ErrorCodes::OK; - StressTestOp::runCompleteOp(this, cb); - } else if (r < .99) { - expectedResults[i] = ErrorCodes::NetworkInterfaceExceededTimeLimit; - StressTestOp::runTimeoutOp(this, cb); - } else { - // Just a sprinkling of long ops, to mitigate connection pool contention - expectedResults[i] = ErrorCodes::OK; - StressTestOp::runLongOp(this, cb); - } - }; - - cl.await(); - - for (std::size_t i = 0; i < numOps; ++i) { - const auto& resp = testResults[i]; - auto ec = resp.isOK() ? getStatusFromCommandResult(resp.data) : resp.status; - ASSERT_EQ(ec, expectedResults[i]); - } -} - -// Hook that intentionally never finishes -class HangingHook : public executor::NetworkConnectionHook { - Status validateHost(const HostAndPort&, const RemoteCommandResponse&) final { - return Status::OK(); - } - - StatusWith<boost::optional<RemoteCommandRequest>> makeRequest( - const HostAndPort& remoteHost) final { - return {boost::make_optional(RemoteCommandRequest(remoteHost, - "admin", - BSON("sleep" << 1 << "lock" - << "none" - << "secs" - << 100000000), - BSONObj(), - nullptr))}; - } - - Status handleReply(const HostAndPort& remoteHost, RemoteCommandResponse&& response) final { - MONGO_UNREACHABLE; - } -}; - - -// Test that we time out a command if the connection hook hangs. -TEST_F(NetworkInterfaceASIOIntegrationFixture, HookHangs) { - NetworkInterfaceASIO::Options options; - options.networkConnectionHook = stdx::make_unique<HangingHook>(); - startNet(std::move(options)); - - assertCommandFailsOnClient( - "admin", BSON("ping" << 1), ErrorCodes::NetworkInterfaceExceededTimeLimit, Seconds(1)); -} - -} // namespace -} // namespace executor -} // namespace mongo diff --git a/src/mongo/executor/network_interface_asio_operation.cpp b/src/mongo/executor/network_interface_asio_operation.cpp deleted file mode 100644 index 4a78f0c076e..00000000000 --- a/src/mongo/executor/network_interface_asio_operation.cpp +++ /dev/null @@ -1,408 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO - -#include "mongo/platform/basic.h" - -#include "mongo/executor/network_interface_asio.h" - -#include "mongo/base/status_with.h" -#include "mongo/db/query/getmore_request.h" -#include "mongo/db/query/query_request.h" -#include "mongo/executor/async_stream_interface.h" -#include "mongo/executor/connection_pool_asio.h" -#include "mongo/executor/network_interface_asio.h" -#include "mongo/rpc/factory.h" -#include "mongo/rpc/metadata/metadata_hook.h" -#include "mongo/util/log.h" -#include "mongo/util/time_support.h" - -#define MONGO_ASYNC_OP_INVARIANT(_Expression, _Error) \ - do { \ - if (MONGO_unlikely(!(_Expression))) { \ - _failWithInfo(__FILE__, __LINE__, _Error); \ - } \ - } while (false) - -namespace mongo { -namespace executor { - -using asio::ip::tcp; - -namespace { - -// Used to generate unique identifiers for AsyncOps, the same AsyncOp may -// be used to run multiple distinct requests. -AtomicUInt64 kAsyncOpIdCounter(0); - -} // namespace - -const NetworkInterfaceASIO::TableRow NetworkInterfaceASIO::AsyncOp::kFieldLabels = { - "", "id", "states", "start_time", "request"}; - -NetworkInterfaceASIO::AsyncOp::AsyncOp(NetworkInterfaceASIO* const owner, - const TaskExecutor::CallbackHandle& cbHandle, - const RemoteCommandRequest& request, - const RemoteCommandCompletionFn& onFinish, - Date_t now) - : _owner(owner), - _cbHandle(cbHandle), - _request(request), - _onFinish(onFinish), - _start(now), - _resolver(owner->_io_service), - _id(kAsyncOpIdCounter.addAndFetch(1)), - _access(std::make_shared<AsyncOp::AccessControl>()), - _inSetup(true), - _inRefresh(false), - _strand(owner->_io_service) { - // No need to take lock when we aren't yet constructed. - _transitionToState_inlock(State::kUninitialized); -} - -void NetworkInterfaceASIO::AsyncOp::cancel() { - LOG(2) << "Canceling operation; original request was: " << redact(request().toString()); - stdx::lock_guard<stdx::mutex> lk(_access->mutex); - auto access = _access; - auto generation = access->id; - - // An operation may be in mid-flight when it is canceled, so we cancel any - // in-progress async ops but do not complete the operation now. - - _strand.post([this, access, generation] { - stdx::lock_guard<stdx::mutex> lk(access->mutex); - if (generation == access->id) { - _transitionToState_inlock(AsyncOp::State::kCanceled); - if (_connection) { - _connection->cancel(); - } - } - }); -} - -bool NetworkInterfaceASIO::AsyncOp::canceled() const { - return _hasSeenState(State::kCanceled); -} - -void NetworkInterfaceASIO::AsyncOp::timeOut_inlock() { - LOG(2) << "Operation timing out; original request was: " << redact(request().toString()); - auto access = _access; - auto generation = access->id; - - // An operation may be in mid-flight when it times out, so we cancel any - // in-progress stream operations but do not complete the operation now. - - _strand.post([this, access, generation] { - stdx::lock_guard<stdx::mutex> lk(access->mutex); - if (generation == access->id) { - _transitionToState_inlock(AsyncOp::State::kTimedOut); - if (_connection) { - _connection->cancel(); - } - } - }); -} - -bool NetworkInterfaceASIO::AsyncOp::timedOut() const { - return _hasSeenState(State::kTimedOut); -} - -const TaskExecutor::CallbackHandle& NetworkInterfaceASIO::AsyncOp::cbHandle() const { - return _cbHandle; -} - -NetworkInterfaceASIO::AsyncConnection& NetworkInterfaceASIO::AsyncOp::connection() { - MONGO_ASYNC_OP_INVARIANT(_connection.is_initialized(), "Connection not yet initialized"); - return *_connection; -} - -void NetworkInterfaceASIO::AsyncOp::setConnection(AsyncConnection&& conn) { - MONGO_ASYNC_OP_INVARIANT(!_connection.is_initialized(), "Connection already initialized"); - _connection = std::move(conn); -} - -Status NetworkInterfaceASIO::AsyncOp::beginCommand(Message&& newCommand, - const HostAndPort& target) { - // NOTE: We operate based on the assumption that AsyncOp's - // AsyncConnection does not change over its lifetime. - MONGO_ASYNC_OP_INVARIANT(_connection.is_initialized(), - "Connection should not change over AsyncOp's lifetime"); - - auto swm = _connection->getCompressorManager().compressMessage(newCommand); - if (!swm.isOK()) - return swm.getStatus(); - - // Construct a new AsyncCommand object for each command. - _command.emplace(_connection.get_ptr(), std::move(swm.getValue()), _owner->now(), target); - return Status::OK(); -} - -Status NetworkInterfaceASIO::AsyncOp::beginCommand(const RemoteCommandRequest& request) { - return beginCommand( - rpc::messageFromOpMsgRequest( - operationProtocol(), - OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj, request.metadata)), - request.target); -} - -NetworkInterfaceASIO::AsyncCommand& NetworkInterfaceASIO::AsyncOp::command() { - MONGO_ASYNC_OP_INVARIANT(_command.is_initialized(), "Command is not yet initialized"); - - return *_command; -} - -bool NetworkInterfaceASIO::AsyncOp::commandIsInitialized() const { - return _command.is_initialized(); -} - -void NetworkInterfaceASIO::AsyncOp::finish(ResponseStatus&& rs) { - // We never hold the access lock when we call finish from NetworkInterfaceASIO. - _transitionToState(AsyncOp::State::kFinished); - - LOG(2) << "Request " << _request.id << " finished with response: " - << redact(rs.isOK() ? rs.data.toString() : rs.status.toString()); - - // Calling the completion handler may invalidate state in this op, so do it last. - _onFinish(rs); -} - -const RemoteCommandRequest& NetworkInterfaceASIO::AsyncOp::request() const { - return _request; -} - -void NetworkInterfaceASIO::AsyncOp::startProgress(Date_t startTime) { - _start = startTime; - // We never hold the access lock when we call startProgress from NetworkInterfaceASIO. - _transitionToState(AsyncOp::State::kInProgress); -} - -Date_t NetworkInterfaceASIO::AsyncOp::start() const { - return _start; -} - -rpc::Protocol NetworkInterfaceASIO::AsyncOp::operationProtocol() const { - MONGO_ASYNC_OP_INVARIANT(_operationProtocol.is_initialized(), "Protocol not yet set"); - return *_operationProtocol; -} - -void NetworkInterfaceASIO::AsyncOp::setOperationProtocol(rpc::Protocol proto) { - MONGO_ASYNC_OP_INVARIANT(!_operationProtocol.is_initialized(), "Protocol already set"); - _operationProtocol = proto; -} - -void NetworkInterfaceASIO::AsyncOp::setResponseMetadata(BSONObj m) { - _responseMetadata = m; -} - -BSONObj NetworkInterfaceASIO::AsyncOp::getResponseMetadata() { - return _responseMetadata; -} - -void NetworkInterfaceASIO::AsyncOp::reset() { - // We don't reset owner as it never changes - _cbHandle = {}; - _request = {}; - _onFinish = {}; - _connectionPoolHandle = {}; - // We don't reset _connection as we want to reuse it. - // Ditto for _operationProtocol. - _start = {}; - _timeoutAlarm.reset(); - // _id stays the same for the lifetime of this object. - _command = boost::none; - // _inSetup should always be false at this point. - // We never hold the access lock when we call this from NetworkInterfaceASIO. - clearStateTransitions(); -} - -void NetworkInterfaceASIO::AsyncOp::clearStateTransitions() { - _transitionToState(AsyncOp::State::kUninitialized); -} - -void NetworkInterfaceASIO::AsyncOp::setOnFinish(RemoteCommandCompletionFn&& onFinish) { - _onFinish = std::move(onFinish); -} - -// Return a string representation of the given state. -std::string NetworkInterfaceASIO::AsyncOp::_stateToString(AsyncOp::State state) const { - switch (state) { - case State::kUninitialized: - return "UNINITIALIZED"; - case State::kInProgress: - return "IN_PROGRESS"; - case State::kTimedOut: - return "TIMED_OUT"; - case State::kCanceled: - return "CANCELED"; - case State::kFinished: - return "DONE"; - case State::kNoState: - return "---"; - default: - MONGO_UNREACHABLE; - } -} - -std::string NetworkInterfaceASIO::AsyncOp::_stateString() const { - str::stream s; - s << "[ "; - - for (int i = 0; i < kMaxStateTransitions; i++) { - if (_states[i] == State::kNoState) { - break; - } - - if (i != 0) { - s << ", "; - } - - s << _stateToString(_states[i]); - } - - s << " ]"; - - return s; -} - -NetworkInterfaceASIO::TableRow NetworkInterfaceASIO::AsyncOp::getStringFields() const { - // We leave a placeholder for an asterisk - return {"", std::to_string(_id), _stateString(), _start.toString(), _request.toString()}; -} - -std::string NetworkInterfaceASIO::AsyncOp::toString() const { - str::stream s; - int fieldIdx = 1; - bool first = true; - - for (auto field : getStringFields()) { - if (field != "") { - if (first) { - first = false; - } else { - s << ", "; - } - - s << kFieldLabels[fieldIdx] << ": " << field; - fieldIdx++; - } - } - return s; -} - -bool NetworkInterfaceASIO::AsyncOp::operator==(const AsyncOp& other) const { - return _id == other._id; -} - -bool NetworkInterfaceASIO::AsyncOp::_hasSeenState(AsyncOp::State state) const { - return std::any_of(std::begin(_states), std::end(_states), [state](AsyncOp::State _state) { - return _state == state; - }); -} - -void NetworkInterfaceASIO::AsyncOp::_transitionToState(AsyncOp::State newState) { - stdx::lock_guard<stdx::mutex> lk(_access->mutex); - _transitionToState_inlock(newState); -} - -void NetworkInterfaceASIO::AsyncOp::_transitionToState_inlock(AsyncOp::State newState) { - if (newState == State::kUninitialized) { - _states[0] = State::kUninitialized; - for (int i = 1; i < kMaxStateTransitions; i++) { - _states[i] = State::kNoState; - } - return; - } - - // We can transition to cancelled multiple times if cancel() is called - // multiple times. Ignore that transition if we're already cancelled. - if (newState == State::kCanceled) { - // Find the current state - auto iter = std::find_if_not(_states.rbegin(), _states.rend(), [](const State& state) { - return state == State::kNoState; - }); - - // If its cancelled, just return - if (iter != _states.rend() && *iter == State::kCanceled) { - return; - } - } - - for (int i = 0; i < kMaxStateTransitions; i++) { - // We can't transition to the same state twice. - MONGO_ASYNC_OP_INVARIANT(_states[i] != newState, - "Cannot use the same state (" + _stateToString(newState) + - ") twice"); - - if (_states[i] == State::kNoState) { - // Perform some validation before transitioning. - switch (newState) { - case State::kInProgress: - MONGO_ASYNC_OP_INVARIANT(i == 1, - "kInProgress must come directly after kUninitialized"); - break; - case State::kTimedOut: - // During connection setup, it is possible to timeout before the stream is - // initialized, so we have to allow this transition. - break; - case State::kCanceled: - MONGO_ASYNC_OP_INVARIANT( - i > 1, _stateToString(newState) + " must come after kInProgress"); - MONGO_ASYNC_OP_INVARIANT(_states[i - 1] != State::kUninitialized, - _stateToString(newState) + - " cannot come after kUninitialized"); - break; - case State::kFinished: - MONGO_ASYNC_OP_INVARIANT(i > 0, "kFinished must come after kUninitialized"); - break; - default: - MONGO_UNREACHABLE; - } - - // Update state. - _states[i] = newState; - return; - } - } - - // If we get here, we've already transitioned to the max allowed states, explode. - MONGO_UNREACHABLE; -} - -void NetworkInterfaceASIO::AsyncOp::_failWithInfo(const char* file, - int line, - std::string error) const { - std::stringstream ss; - ss << "Invariant failure at " << file << ":" << line << ": " << error - << ", Operation: " << toString(); - Status status{ErrorCodes::InternalError, ss.str()}; - fassertFailedWithStatus(34430, status); -} - -} // namespace executor -} // namespace mongo diff --git a/src/mongo/executor/network_interface_asio_test.cpp b/src/mongo/executor/network_interface_asio_test.cpp deleted file mode 100644 index a5ca74a89c7..00000000000 --- a/src/mongo/executor/network_interface_asio_test.cpp +++ /dev/null @@ -1,1079 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO - -#include "mongo/platform/basic.h" - -#include <boost/optional.hpp> - -#include "mongo/base/status_with.h" -#include "mongo/db/jsobj.h" -#include "mongo/db/wire_version.h" -#include "mongo/executor/async_mock_stream_factory.h" -#include "mongo/executor/async_timer_mock.h" -#include "mongo/executor/network_interface_asio.h" -#include "mongo/executor/network_interface_asio_test_utils.h" -#include "mongo/executor/test_network_connection_hook.h" -#include "mongo/rpc/factory.h" -#include "mongo/rpc/legacy_reply_builder.h" -#include "mongo/rpc/message.h" -#include "mongo/rpc/metadata/egress_metadata_hook_list.h" -#include "mongo/stdx/memory.h" -#include "mongo/unittest/unittest.h" -#include "mongo/util/log.h" -#include "mongo/util/scopeguard.h" - -namespace mongo { -namespace executor { -namespace { - -using ResponseStatus = TaskExecutor::ResponseStatus; - -HostAndPort testHost{"localhost", 20000}; - -void initWireSpecMongoD() { - WireSpec& spec = WireSpec::instance(); - // accept from latest internal version - spec.incomingInternalClient.minWireVersion = LATEST_WIRE_VERSION; - spec.incomingInternalClient.maxWireVersion = LATEST_WIRE_VERSION; - // accept from any external version - spec.incomingExternalClient.minWireVersion = RELEASE_2_4_AND_BEFORE; - spec.incomingExternalClient.maxWireVersion = LATEST_WIRE_VERSION; - // connect to latest - spec.outgoing.minWireVersion = LATEST_WIRE_VERSION; - spec.outgoing.maxWireVersion = LATEST_WIRE_VERSION; -} - -// Utility function to use with mock streams -RemoteCommandResponse simulateIsMaster(RemoteCommandRequest request) { - ASSERT_EQ(std::string{request.cmdObj.firstElementFieldName()}, "isMaster"); - ASSERT_EQ(request.dbname, "admin"); - - RemoteCommandResponse response; - response.data = - BSON("minWireVersion" << mongo::WireSpec::instance().incomingExternalClient.minWireVersion - << "maxWireVersion" - << mongo::WireSpec::instance().incomingExternalClient.maxWireVersion); - return response; -} - -BSONObj objConcat(std::initializer_list<BSONObj> objs) { - BSONObjBuilder bob; - - for (const auto& obj : objs) { - bob.appendElements(obj); - } - - return bob.obj(); -} - -class NetworkInterfaceASIOTest : public mongo::unittest::Test { -public: - void setUp() override { - initWireSpecMongoD(); - NetworkInterfaceASIO::Options options; - - // Use mock timer factory - auto timerFactory = stdx::make_unique<AsyncTimerFactoryMock>(); - _timerFactory = timerFactory.get(); - options.timerFactory = std::move(timerFactory); - - auto factory = stdx::make_unique<AsyncMockStreamFactory>(); - // keep unowned pointer, but pass ownership to NIA - _streamFactory = factory.get(); - options.streamFactory = std::move(factory); - _net = stdx::make_unique<NetworkInterfaceASIO>(std::move(options)); - _net->startup(); - } - - void tearDown() override { - if (!_net->inShutdown()) { - _net->shutdown(); - } - } - - Deferred<RemoteCommandResponse> startCommand(const TaskExecutor::CallbackHandle& cbHandle, - RemoteCommandRequest& request) { - Deferred<RemoteCommandResponse> deferredResponse; - ASSERT_OK(net().startCommand( - cbHandle, request, [deferredResponse](ResponseStatus response) mutable { - deferredResponse.emplace(std::move(response)); - })); - return deferredResponse; - } - - // Helper to run startCommand and wait for it - RemoteCommandResponse startCommandSync(RemoteCommandRequest& request) { - auto deferred = startCommand(makeCallbackHandle(), request); - - // wait for the operation to complete - auto& result = deferred.get(); - return result; - } - - NetworkInterfaceASIO& net() { - return *_net; - } - - AsyncMockStreamFactory& streamFactory() { - return *_streamFactory; - } - - AsyncTimerFactoryMock& timerFactory() { - return *_timerFactory; - } - - void assertNumOps(uint64_t canceled, uint64_t timedOut, uint64_t failed, uint64_t succeeded) { - ASSERT_EQ(canceled, net().getNumCanceledOps()); - ASSERT_EQ(timedOut, net().getNumTimedOutOps()); - ASSERT_EQ(failed, net().getNumFailedOps()); - ASSERT_EQ(succeeded, net().getNumSucceededOps()); - } - -protected: - AsyncTimerFactoryMock* _timerFactory; - AsyncMockStreamFactory* _streamFactory; - std::unique_ptr<NetworkInterfaceASIO> _net; -}; - -TEST_F(NetworkInterfaceASIOTest, CancelMissingOperation) { - // This is just a sanity check, this action should have no effect. - net().cancelCommand(makeCallbackHandle()); - assertNumOps(0u, 0u, 0u, 0u); -} - -TEST_F(NetworkInterfaceASIOTest, CancelOperation) { - auto cbh = makeCallbackHandle(); - - // Kick off our operation - RemoteCommandRequest request{testHost, "testDB", BSON("a" << 1), BSONObj(), nullptr}; - auto deferred = startCommand(cbh, request); - - // Create and initialize a stream so operation can begin - auto stream = streamFactory().blockUntilStreamExists(testHost); - ConnectEvent{stream}.skip(); - - // simulate isMaster reply. - stream->simulateServer(rpc::Protocol::kOpQuery, - [](RemoteCommandRequest request) -> RemoteCommandResponse { - return simulateIsMaster(request); - }); - - { - // Cancel operation while blocked in the write for determinism. By calling cancel here we - // ensure that it is not a no-op and that the asio::operation_aborted error will always - // be returned to the NIA. - WriteEvent write{stream}; - net().cancelCommand(cbh); - } - - // Wait for op to complete, assert that it was canceled. - auto& result = deferred.get(); - ASSERT_EQ(ErrorCodes::CallbackCanceled, result.status); - ASSERT(result.elapsedMillis); - assertNumOps(1u, 0u, 0u, 0u); -} - -TEST_F(NetworkInterfaceASIOTest, ImmediateCancel) { - auto cbh = makeCallbackHandle(); - - RemoteCommandRequest request{testHost, "testDB", BSON("a" << 1), BSONObj(), nullptr}; - auto deferred = startCommand(cbh, request); - - // Cancel immediately - net().cancelCommand(cbh); - - // Allow stream to connect so operation can return - auto stream = streamFactory().blockUntilStreamExists(testHost); - ConnectEvent{stream}.skip(); - stream->simulateServer(rpc::Protocol::kOpQuery, - [](RemoteCommandRequest request) -> RemoteCommandResponse { - return simulateIsMaster(request); - }); - - auto& result = deferred.get(); - ASSERT_EQ(ErrorCodes::CallbackCanceled, result.status); - ASSERT(result.elapsedMillis); - // expect 0 completed ops because the op was canceled before getting a connection - assertNumOps(1u, 0u, 0u, 0u); -} - -TEST_F(NetworkInterfaceASIOTest, LateCancel) { - auto cbh = makeCallbackHandle(); - RemoteCommandRequest request{testHost, "testDB", BSON("a" << 1), BSONObj(), nullptr}; - auto deferred = startCommand(cbh, request); - - // Allow stream to connect so operation can return - auto stream = streamFactory().blockUntilStreamExists(testHost); - ConnectEvent{stream}.skip(); - stream->simulateServer(rpc::Protocol::kOpQuery, - [](RemoteCommandRequest request) -> RemoteCommandResponse { - return simulateIsMaster(request); - }); - - // Simulate user command - stream->simulateServer(rpc::Protocol::kOpMsg, - [](RemoteCommandRequest request) -> RemoteCommandResponse { - RemoteCommandResponse response; - response.data = BSONObj(); - response.metadata = BSONObj(); - return response; - }); - - // Allow to complete, then cancel, nothing should happen. - auto& result = deferred.get(); - net().cancelCommand(cbh); - - ASSERT(result.isOK()); - ASSERT(result.elapsedMillis); - assertNumOps(0u, 0u, 0u, 1u); -} - -TEST_F(NetworkInterfaceASIOTest, CancelWithNetworkError) { - auto cbh = makeCallbackHandle(); - RemoteCommandRequest request{testHost, "testDB", BSON("a" << 1), BSONObj(), nullptr}; - auto deferred = startCommand(cbh, request); - - // Create and initialize a stream so operation can begin - auto stream = streamFactory().blockUntilStreamExists(testHost); - ConnectEvent{stream}.skip(); - - // simulate isMaster reply. - stream->simulateServer(rpc::Protocol::kOpQuery, - [](RemoteCommandRequest request) -> RemoteCommandResponse { - return simulateIsMaster(request); - }); - - { - WriteEvent{stream}.skip(); - ReadEvent read{stream}; - - // Trigger both a cancellation and a network error - stream->setError(make_error_code(ErrorCodes::HostUnreachable)); - net().cancelCommand(cbh); - } - - // Wait for op to complete, assert that cancellation error had precedence. - auto& result = deferred.get(); - ASSERT(result.status == ErrorCodes::CallbackCanceled); - ASSERT(result.elapsedMillis); - assertNumOps(1u, 0u, 0u, 0u); -} - -TEST_F(NetworkInterfaceASIOTest, CancelWithTimeout) { - auto cbh = makeCallbackHandle(); - RemoteCommandRequest request{testHost, "testDB", BSON("a" << 1), BSONObj(), nullptr}; - auto deferred = startCommand(cbh, request); - - // Create and initialize a stream so operation can begin - auto stream = streamFactory().blockUntilStreamExists(testHost); - ConnectEvent{stream}.skip(); - - stream->simulateServer(rpc::Protocol::kOpQuery, - [](RemoteCommandRequest request) -> RemoteCommandResponse { - return simulateIsMaster(request); - }); - - { - WriteEvent write{stream}; - - // Trigger both a cancellation and a timeout - net().cancelCommand(cbh); - timerFactory().fastForward(Milliseconds(500)); - } - - // Wait for op to complete, assert that cancellation error had precedence. - auto& result = deferred.get(); - ASSERT_EQ(ErrorCodes::CallbackCanceled, result.status); - ASSERT(result.elapsedMillis); - assertNumOps(1u, 0u, 0u, 0u); -} - -TEST_F(NetworkInterfaceASIOTest, TimeoutWithNetworkError) { - auto cbh = makeCallbackHandle(); - RemoteCommandRequest request{ - testHost, "testDB", BSON("a" << 1), BSONObj(), nullptr, Milliseconds(1000)}; - auto deferred = startCommand(cbh, request); - - // Create and initialize a stream so operation can begin - auto stream = streamFactory().blockUntilStreamExists(testHost); - ConnectEvent{stream}.skip(); - stream->simulateServer(rpc::Protocol::kOpQuery, - [](RemoteCommandRequest request) -> RemoteCommandResponse { - return simulateIsMaster(request); - }); - - { - WriteEvent{stream}.skip(); - ReadEvent read{stream}; - - // Trigger both a timeout and a network error - stream->setError(make_error_code(ErrorCodes::HostUnreachable)); - timerFactory().fastForward(Milliseconds(2000)); - } - - // Wait for op to complete, assert that timeout had precedence. - auto& result = deferred.get(); - ASSERT_EQ(ErrorCodes::NetworkInterfaceExceededTimeLimit, result.status); - ASSERT(result.elapsedMillis); - assertNumOps(0u, 1u, 1u, 0u); -} - -TEST_F(NetworkInterfaceASIOTest, CancelWithTimeoutAndNetworkError) { - auto cbh = makeCallbackHandle(); - RemoteCommandRequest request{ - testHost, "testDB", BSON("a" << 1), BSONObj(), nullptr, Milliseconds(1000)}; - auto deferred = startCommand(cbh, request); - - // Create and initialize a stream so operation can begin - auto stream = streamFactory().blockUntilStreamExists(testHost); - ConnectEvent{stream}.skip(); - stream->simulateServer(rpc::Protocol::kOpQuery, - [](RemoteCommandRequest request) -> RemoteCommandResponse { - return simulateIsMaster(request); - }); - - { - WriteEvent{stream}.skip(); - ReadEvent read{stream}; - - // Trigger a timeout, a cancellation, and a network error - stream->setError(make_error_code(ErrorCodes::HostUnreachable)); - timerFactory().fastForward(Milliseconds(2000)); - net().cancelCommand(cbh); - } - - // Wait for op to complete, assert that the cancellation had precedence. - auto& result = deferred.get(); - ASSERT_EQ(ErrorCodes::CallbackCanceled, result.status); - ASSERT(result.elapsedMillis); - assertNumOps(1u, 0u, 0u, 0u); -} - -TEST_F(NetworkInterfaceASIOTest, AsyncOpTimeout) { - // Kick off operation - auto cb = makeCallbackHandle(); - Milliseconds timeout(1000); - RemoteCommandRequest request{testHost, "testDB", BSON("a" << 1), BSONObj(), nullptr, timeout}; - auto deferred = startCommand(cb, request); - - // Create and initialize a stream so operation can begin - auto stream = streamFactory().blockUntilStreamExists(testHost); - ConnectEvent{stream}.skip(); - - // Simulate isMaster reply. - stream->simulateServer(rpc::Protocol::kOpQuery, - [](RemoteCommandRequest request) -> RemoteCommandResponse { - return simulateIsMaster(request); - }); - - { - // Wait for the operation to block on write so we know it's been added. - WriteEvent write{stream}; - - // Get the timer factory - auto& factory = timerFactory(); - - // Advance clock but not enough to force a timeout, assert still active - factory.fastForward(Milliseconds(500)); - ASSERT(!deferred.hasCompleted()); - - // Advance clock and force timeout - factory.fastForward(Milliseconds(500)); - } - - auto& result = deferred.get(); - ASSERT_EQ(ErrorCodes::NetworkInterfaceExceededTimeLimit, result.status); - ASSERT(result.elapsedMillis); - assertNumOps(0u, 1u, 1u, 0u); -} - -TEST_F(NetworkInterfaceASIOTest, StartCommand) { - RemoteCommandRequest request{testHost, "testDB", BSON("foo" << 1), BSON("bar" << 1), nullptr}; - auto deferred = startCommand(makeCallbackHandle(), request); - - auto stream = streamFactory().blockUntilStreamExists(testHost); - - // Allow stream to connect. - ConnectEvent{stream}.skip(); - - // simulate isMaster reply. - stream->simulateServer(rpc::Protocol::kOpQuery, - [](RemoteCommandRequest request) -> RemoteCommandResponse { - return simulateIsMaster(request); - }); - - auto expectedMetadata = BSON("meep" - << "beep"); - auto expectedCommandReply = BSON("boop" - << "bop" - << "ok" - << 1.0); - - auto expectedCommandReplyWithMetadata = objConcat({expectedCommandReply, expectedMetadata}); - - // simulate user command - stream->simulateServer( - rpc::Protocol::kOpMsg, [&](RemoteCommandRequest request) -> RemoteCommandResponse { - ASSERT_EQ(std::string{request.cmdObj.firstElementFieldName()}, "foo"); - ASSERT_EQ(request.dbname, "testDB"); - - RemoteCommandResponse response; - response.data = expectedCommandReply; - response.metadata = expectedMetadata; - return response; - }); - - auto& res = deferred.get(); - ASSERT(res.elapsedMillis); - uassertStatusOK(res.status); - ASSERT_BSONOBJ_EQ(res.data, expectedCommandReplyWithMetadata); - ASSERT_BSONOBJ_EQ(res.metadata, expectedCommandReplyWithMetadata); - assertNumOps(0u, 0u, 0u, 1u); -} - -TEST_F(NetworkInterfaceASIOTest, InShutdown) { - ASSERT_FALSE(net().inShutdown()); - net().shutdown(); - ASSERT(net().inShutdown()); -} - -TEST_F(NetworkInterfaceASIOTest, StartCommandReturnsNotOKIfShutdownHasStarted) { - net().shutdown(); - RemoteCommandRequest request; - ASSERT_NOT_OK( - net().startCommand(makeCallbackHandle(), request, [&](RemoteCommandResponse resp) {})); -} - -class MalformedMessageTest : public NetworkInterfaceASIOTest { -public: - using MessageHook = stdx::function<void(MsgData::View)>; - - void runMessageTest(ErrorCodes::Error code, bool loadBody, MessageHook hook) { - // Kick off our operation - RemoteCommandRequest request{testHost, "testDB", BSON("ping" << 1), BSONObj(), nullptr}; - auto deferred = startCommand(makeCallbackHandle(), request); - - // Wait for it to block waiting for a write - auto stream = streamFactory().blockUntilStreamExists(testHost); - ConnectEvent{stream}.skip(); - stream->simulateServer(rpc::Protocol::kOpQuery, - [](RemoteCommandRequest request) -> RemoteCommandResponse { - return simulateIsMaster(request); - }); - - uint32_t messageId = 0; - - { - // Get the appropriate message id - WriteEvent write{stream}; - std::vector<uint8_t> messageData = stream->popWrite(); - messageId = - MsgData::ConstView(reinterpret_cast<const char*>(messageData.data())).getId(); - } - - // Build a mock reply message - auto replyBuilder = rpc::makeReplyBuilder(rpc::Protocol::kOpMsg); - replyBuilder->setCommandReply(BSON("hello!" << 1)); - replyBuilder->setMetadata(BSONObj()); - - auto message = replyBuilder->done(); - message.header().setResponseToMsgId(messageId); - - auto actualSize = message.header().getLen(); - - // Allow caller to mess with the Message - hook(message.header()); - - { - // Load the header - ReadEvent read{stream}; - auto headerBytes = reinterpret_cast<const uint8_t*>(message.header().view2ptr()); - stream->pushRead({headerBytes, headerBytes + sizeof(MSGHEADER::Value)}); - } - - if (loadBody) { - // Load the body if we need to - ReadEvent read{stream}; - auto dataBytes = reinterpret_cast<const uint8_t*>(message.buf()); - auto body = dataBytes; - std::advance(body, sizeof(MSGHEADER::Value)); - stream->pushRead({body, dataBytes + static_cast<std::size_t>(actualSize)}); - } - - auto& response = deferred.get(); - ASSERT_EQ(code, response.status); - ASSERT(response.elapsedMillis); - assertNumOps(0u, 0u, 1u, 0u); - } -}; - -TEST_F(MalformedMessageTest, messageHeaderWrongResponseTo) { - runMessageTest(ErrorCodes::ProtocolError, false, [](MsgData::View message) { - message.setResponseToMsgId(message.getResponseToMsgId() + 1); - }); -} - -TEST_F(MalformedMessageTest, messageHeaderlenZero) { - runMessageTest( - ErrorCodes::InvalidLength, false, [](MsgData::View message) { message.setLen(0); }); -} - -TEST_F(MalformedMessageTest, MessageHeaderLenTooSmall) { - runMessageTest(ErrorCodes::InvalidLength, false, [](MsgData::View message) { - message.setLen(6); - }); // min is 16 -} - -TEST_F(MalformedMessageTest, MessageHeaderLenTooLarge) { - runMessageTest(ErrorCodes::InvalidLength, false, [](MsgData::View message) { - message.setLen(48000001); - }); // max is 48000000 -} - -TEST_F(MalformedMessageTest, MessageHeaderLenNegative) { - runMessageTest( - ErrorCodes::InvalidLength, false, [](MsgData::View message) { message.setLen(-1); }); -} - -TEST_F(MalformedMessageTest, MessageLenSmallerThanActual) { - runMessageTest(ErrorCodes::InvalidBSON, true, [](MsgData::View message) { - message.setLen(message.getLen() - 10); - }); -} - -TEST_F(MalformedMessageTest, FailedToReadAllBytesForMessage) { - runMessageTest(ErrorCodes::InvalidLength, true, [](MsgData::View message) { - message.setLen(message.getLen() + 100); - }); -} - -TEST_F(MalformedMessageTest, UnsupportedOpcode) { - runMessageTest(ErrorCodes::UnsupportedFormat, true, [](MsgData::View message) { - message.setOperation(2222); - }); -} - -TEST_F(MalformedMessageTest, MismatchedOpcode) { - runMessageTest(ErrorCodes::UnsupportedFormat, true, [](MsgData::View message) { - message.setOperation(2006); - }); -} - -class NetworkInterfaceASIOConnectionHookTest : public NetworkInterfaceASIOTest { -public: - void setUp() override {} - - void start(std::unique_ptr<NetworkConnectionHook> hook) { - auto factory = stdx::make_unique<AsyncMockStreamFactory>(); - // keep unowned pointer, but pass ownership to NIA - _streamFactory = factory.get(); - NetworkInterfaceASIO::Options options{}; - options.streamFactory = std::move(factory); - options.networkConnectionHook = std::move(hook); - options.timerFactory = stdx::make_unique<AsyncTimerFactoryMock>(); - _net = stdx::make_unique<NetworkInterfaceASIO>(std::move(options)); - _net->startup(); - } -}; - -TEST_F(NetworkInterfaceASIOConnectionHookTest, InvalidIsMaster) { - auto validationFailedStatus = - Status(ErrorCodes::InterruptedDueToReplStateChange, "operation was interrupted"); - - start(makeTestHook( - [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) { - return Status(ErrorCodes::UnknownError, "unused"); - }, - [&](const HostAndPort& remoteHost) -> StatusWith<boost::optional<RemoteCommandRequest>> { - return {boost::none}; - }, - [&](const HostAndPort& remoteHost, RemoteCommandResponse&& response) { - return Status::OK(); - })); - - RemoteCommandRequest request{testHost, - "blah", - BSON("foo" - << "bar"), - nullptr}; - auto deferred = startCommand(makeCallbackHandle(), request); - - auto stream = streamFactory().blockUntilStreamExists(testHost); - - ConnectEvent{stream}.skip(); - - // simulate isMaster reply. - stream->simulateServer(rpc::Protocol::kOpQuery, - [](RemoteCommandRequest request) -> RemoteCommandResponse { - RemoteCommandResponse response; - response.data = BSON("ok" << 0.0 << "errmsg" - << "operation was interrupted" - << "code" - << 11602); - return response; - }); - - // we should stop here. - auto& res = deferred.get(); - ASSERT_EQ(validationFailedStatus, res.status); - ASSERT(res.elapsedMillis); - - assertNumOps(0u, 0u, 1u, 0u); -} - -TEST_F(NetworkInterfaceASIOConnectionHookTest, ValidateHostInvalid) { - bool validateCalled = false; - bool hostCorrect = false; - bool isMasterReplyCorrect = false; - bool makeRequestCalled = false; - bool handleReplyCalled = false; - - auto validationFailedStatus = Status(ErrorCodes::AlreadyInitialized, "blahhhhh"); - - start(makeTestHook( - [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) { - validateCalled = true; - hostCorrect = (remoteHost == testHost); - isMasterReplyCorrect = (isMasterReply.data["TESTKEY"].str() == "TESTVALUE"); - return validationFailedStatus; - }, - [&](const HostAndPort& remoteHost) -> StatusWith<boost::optional<RemoteCommandRequest>> { - makeRequestCalled = true; - return {boost::none}; - }, - [&](const HostAndPort& remoteHost, RemoteCommandResponse&& response) { - handleReplyCalled = true; - return Status::OK(); - })); - - RemoteCommandRequest request{testHost, - "blah", - BSON("foo" - << "bar"), - nullptr}; - auto deferred = startCommand(makeCallbackHandle(), request); - - auto stream = streamFactory().blockUntilStreamExists(testHost); - - ConnectEvent{stream}.skip(); - - // simulate isMaster reply. - stream->simulateServer( - rpc::Protocol::kOpQuery, [](RemoteCommandRequest request) -> RemoteCommandResponse { - RemoteCommandResponse response; - response.data = - BSON("minWireVersion" - << mongo::WireSpec::instance().incomingExternalClient.minWireVersion - << "maxWireVersion" - << mongo::WireSpec::instance().incomingExternalClient.maxWireVersion - << "TESTKEY" - << "TESTVALUE"); - return response; - }); - - // we should stop here. - auto& res = deferred.get(); - ASSERT_EQ(validationFailedStatus, res.status); - ASSERT(res.elapsedMillis); - ASSERT(validateCalled); - ASSERT(hostCorrect); - ASSERT(isMasterReplyCorrect); - - ASSERT(!makeRequestCalled); - ASSERT(!handleReplyCalled); - assertNumOps(0u, 0u, 1u, 0u); -} - -TEST_F(NetworkInterfaceASIOConnectionHookTest, MakeRequestReturnsError) { - bool makeRequestCalled = false; - bool handleReplyCalled = false; - - Status makeRequestError{ErrorCodes::DBPathInUse, "bloooh"}; - - start(makeTestHook( - [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) -> Status { - return Status::OK(); - }, - [&](const HostAndPort& remoteHost) -> StatusWith<boost::optional<RemoteCommandRequest>> { - makeRequestCalled = true; - return makeRequestError; - }, - [&](const HostAndPort& remoteHost, RemoteCommandResponse&& response) { - handleReplyCalled = true; - return Status::OK(); - })); - - RemoteCommandRequest request{testHost, - "blah", - BSON("foo" - << "bar"), - nullptr}; - auto deferred = startCommand(makeCallbackHandle(), request); - - auto stream = streamFactory().blockUntilStreamExists(testHost); - ConnectEvent{stream}.skip(); - - // simulate isMaster reply. - stream->simulateServer(rpc::Protocol::kOpQuery, - [](RemoteCommandRequest request) -> RemoteCommandResponse { - return simulateIsMaster(request); - }); - - // We should stop here. - auto& res = deferred.get(); - - ASSERT_EQ(makeRequestError, res.status); - ASSERT(res.elapsedMillis); - ASSERT(makeRequestCalled); - ASSERT(!handleReplyCalled); - assertNumOps(0u, 0u, 1u, 0u); -} - -TEST_F(NetworkInterfaceASIOConnectionHookTest, MakeRequestReturnsNone) { - bool makeRequestCalled = false; - bool handleReplyCalled = false; - - start(makeTestHook( - [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) -> Status { - return Status::OK(); - }, - [&](const HostAndPort& remoteHost) -> StatusWith<boost::optional<RemoteCommandRequest>> { - makeRequestCalled = true; - return {boost::none}; - }, - [&](const HostAndPort& remoteHost, RemoteCommandResponse&& response) { - handleReplyCalled = true; - return Status::OK(); - })); - - auto commandRequest = BSON("foo" - << "bar"); - - auto commandReply = BSON("foo" - << "boo" - << "ok" - << 1.0); - - auto metadata = BSON("aaa" - << "bbb"); - - auto commandReplyWithMetadata = objConcat({commandReply, metadata}); - - RemoteCommandRequest request{testHost, "blah", commandRequest, nullptr}; - auto deferred = startCommand(makeCallbackHandle(), request); - - auto stream = streamFactory().blockUntilStreamExists(testHost); - ConnectEvent{stream}.skip(); - - // simulate isMaster reply. - stream->simulateServer(rpc::Protocol::kOpQuery, - [](RemoteCommandRequest request) -> RemoteCommandResponse { - return simulateIsMaster(request); - }); - - // Simulate user command. - stream->simulateServer(rpc::Protocol::kOpMsg, - [&](RemoteCommandRequest request) -> RemoteCommandResponse { - ASSERT_BSONOBJ_EQ(commandRequest, request.cmdObj.removeField("$db")); - - RemoteCommandResponse response; - response.data = commandReply; - response.metadata = metadata; - return response; - }); - - // We should get back the reply now. - auto& result = deferred.get(); - - ASSERT(result.isOK()); - ASSERT_BSONOBJ_EQ(commandReplyWithMetadata, result.data); - ASSERT(result.elapsedMillis); - ASSERT_BSONOBJ_EQ(commandReplyWithMetadata, result.metadata); - assertNumOps(0u, 0u, 0u, 1u); -} - -TEST_F(NetworkInterfaceASIOConnectionHookTest, HandleReplyReturnsError) { - bool makeRequestCalled = false; - - bool handleReplyCalled = false; - bool handleReplyArgumentCorrect = false; - - BSONObj hookCommandRequest = BSON("1ddd" - << "fff"); - BSONObj hookRequestMetadata = BSON("wdwd" << 1212); - - BSONObj hookCommandReply = BSON("blah" - << "blah" - << "ok" - << 1.0); - - BSONObj hookUnifiedRequest = ([&] { - BSONObjBuilder bob; - bob.appendElements(hookCommandRequest); - bob.appendElements(hookRequestMetadata); - bob.append("$db", "foo"); - return bob.obj(); - }()); - - BSONObj hookReplyMetadata = BSON("1111" << 2222); - BSONObj hookCommandReplyWithMetadata = objConcat({hookCommandReply, hookReplyMetadata}); - - Status handleReplyError{ErrorCodes::AuthSchemaIncompatible, "daowdjkpowkdjpow"}; - - start(makeTestHook( - [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) -> Status { - return Status::OK(); - }, - [&](const HostAndPort& remoteHost) -> StatusWith<boost::optional<RemoteCommandRequest>> { - makeRequestCalled = true; - return {boost::make_optional<RemoteCommandRequest>( - {testHost, "foo", hookCommandRequest, hookRequestMetadata, nullptr})}; - - }, - [&](const HostAndPort& remoteHost, RemoteCommandResponse&& response) { - handleReplyCalled = true; - handleReplyArgumentCorrect = SimpleBSONObjComparator::kInstance.evaluate( - response.data == hookCommandReplyWithMetadata) && - SimpleBSONObjComparator::kInstance.evaluate(response.metadata == - hookCommandReplyWithMetadata); - return handleReplyError; - })); - - auto commandRequest = BSON("foo" - << "bar"); - RemoteCommandRequest request{testHost, "blah", commandRequest, nullptr}; - auto deferred = startCommand(makeCallbackHandle(), request); - - auto stream = streamFactory().blockUntilStreamExists(testHost); - ConnectEvent{stream}.skip(); - - // simulate isMaster reply. - stream->simulateServer(rpc::Protocol::kOpQuery, - [](RemoteCommandRequest request) -> RemoteCommandResponse { - return simulateIsMaster(request); - }); - - // Simulate hook reply - stream->simulateServer(rpc::Protocol::kOpMsg, - [&](RemoteCommandRequest request) -> RemoteCommandResponse { - ASSERT_BSONOBJ_EQ(request.cmdObj, hookUnifiedRequest); - ASSERT_BSONOBJ_EQ(request.metadata, BSONObj()); - - RemoteCommandResponse response; - response.data = hookCommandReply; - response.metadata = hookReplyMetadata; - return response; - }); - - auto& result = deferred.get(); - ASSERT_EQ(handleReplyError, result.status); - ASSERT(result.elapsedMillis); - ASSERT(makeRequestCalled); - ASSERT(handleReplyCalled); - ASSERT(handleReplyArgumentCorrect); - assertNumOps(0u, 0u, 1u, 0u); -} - -TEST_F(NetworkInterfaceASIOTest, SetAlarm) { - // set a first alarm, to execute after "expiration" - Date_t expiration = net().now() + Milliseconds(100); - - Deferred<Date_t> deferred; - ASSERT_OK(net().setAlarm( - expiration, [this, expiration, deferred]() mutable { deferred.emplace(net().now()); })); - - // Get our timer factory - auto& factory = timerFactory(); - - // force the alarm to fire - factory.fastForward(Milliseconds(5000)); - - // assert that it executed after "expiration" - auto& result = deferred.get(); - ASSERT(result >= expiration); - - expiration = net().now() + Milliseconds(99999999); - Deferred<bool> deferred2; - ASSERT_OK(net().setAlarm(expiration, [this, deferred2]() mutable { deferred2.emplace(true); })); - - net().shutdown(); - ASSERT(!deferred2.hasCompleted()); -} - -TEST_F(NetworkInterfaceASIOTest, SetAlarmReturnsNotOKIfShutdownHasStarted) { - net().shutdown(); - ASSERT_NOT_OK(net().setAlarm(net().now() + Milliseconds(100), [] {})); -} - -TEST_F(NetworkInterfaceASIOTest, IsMasterRequestContainsOutgoingWireVersionInternalClientInfo) { - WireSpec::instance().isInternalClient = true; - - RemoteCommandRequest request{testHost, "testDB", BSON("ping" << 1), BSONObj(), nullptr}; - auto deferred = startCommand(makeCallbackHandle(), request); - auto stream = streamFactory().blockUntilStreamExists(testHost); - ConnectEvent{stream}.skip(); - - // Verify that the isMaster reply has the expected internalClient data. - stream->simulateServer( - rpc::Protocol::kOpQuery, [](RemoteCommandRequest request) -> RemoteCommandResponse { - auto internalClientElem = request.cmdObj["internalClient"]; - ASSERT_EQ(internalClientElem.type(), BSONType::Object); - auto minWireVersionElem = internalClientElem.Obj()["minWireVersion"]; - auto maxWireVersionElem = internalClientElem.Obj()["maxWireVersion"]; - ASSERT_EQ(minWireVersionElem.type(), BSONType::NumberInt); - ASSERT_EQ(maxWireVersionElem.type(), BSONType::NumberInt); - ASSERT_EQ(minWireVersionElem.numberInt(), WireSpec::instance().outgoing.minWireVersion); - ASSERT_EQ(maxWireVersionElem.numberInt(), WireSpec::instance().outgoing.maxWireVersion); - return simulateIsMaster(request); - }); - - // Simulate ping reply. - stream->simulateServer(rpc::Protocol::kOpMsg, - [&](RemoteCommandRequest request) -> RemoteCommandResponse { - RemoteCommandResponse response; - response.data = BSON("ok" << 1); - return response; - }); - - // Verify that the ping op is counted as a success. - auto& res = deferred.get(); - ASSERT(res.elapsedMillis); - assertNumOps(0u, 0u, 0u, 1u); -} - -TEST_F(NetworkInterfaceASIOTest, IsMasterRequestMissingInternalClientInfoWhenNotInternalClient) { - WireSpec::instance().isInternalClient = false; - - RemoteCommandRequest request{testHost, "testDB", BSON("ping" << 1), BSONObj(), nullptr}; - auto deferred = startCommand(makeCallbackHandle(), request); - auto stream = streamFactory().blockUntilStreamExists(testHost); - ConnectEvent{stream}.skip(); - - // Verify that the isMaster reply has the expected internalClient data. - stream->simulateServer(rpc::Protocol::kOpQuery, - [](RemoteCommandRequest request) -> RemoteCommandResponse { - ASSERT_FALSE(request.cmdObj["internalClient"]); - return simulateIsMaster(request); - }); - - // Simulate ping reply. - stream->simulateServer(rpc::Protocol::kOpMsg, - [&](RemoteCommandRequest request) -> RemoteCommandResponse { - RemoteCommandResponse response; - response.data = BSON("ok" << 1); - return response; - }); - - // Verify that the ping op is counted as a success. - auto& res = deferred.get(); - ASSERT(res.elapsedMillis); - assertNumOps(0u, 0u, 0u, 1u); -} - -class NetworkInterfaceASIOMetadataTest : public NetworkInterfaceASIOTest { -protected: - void setUp() override {} - - void start(std::unique_ptr<rpc::EgressMetadataHook> metadataHook) { - auto factory = stdx::make_unique<AsyncMockStreamFactory>(); - _streamFactory = factory.get(); - NetworkInterfaceASIO::Options options{}; - options.streamFactory = std::move(factory); - options.metadataHook = std::move(metadataHook); - options.timerFactory = stdx::make_unique<AsyncTimerFactoryMock>(); - _net = stdx::make_unique<NetworkInterfaceASIO>(std::move(options)); - _net->startup(); - } -}; - -class TestMetadataHook : public rpc::EgressMetadataHook { -public: - TestMetadataHook(bool* wroteRequestMetadata, bool* gotReplyMetadata) - : _wroteRequestMetadata(wroteRequestMetadata), _gotReplyMetadata(gotReplyMetadata) {} - - Status writeRequestMetadata(OperationContext* opCtx, BSONObjBuilder* metadataBob) override { - metadataBob->append("foo", "bar"); - *_wroteRequestMetadata = true; - return Status::OK(); - } - - Status readReplyMetadata(OperationContext* opCtx, - StringData replySource, - const BSONObj& metadataObj) override { - *_gotReplyMetadata = (metadataObj["baz"].str() == "garply"); - return Status::OK(); - } - -private: - bool* _wroteRequestMetadata; - bool* _gotReplyMetadata; -}; - -TEST_F(NetworkInterfaceASIOMetadataTest, Metadata) { - bool wroteRequestMetadata = false; - bool gotReplyMetadata = false; - auto hookList = stdx::make_unique<rpc::EgressMetadataHookList>(); - hookList->addHook( - stdx::make_unique<TestMetadataHook>(&wroteRequestMetadata, &gotReplyMetadata)); - start(std::move(hookList)); - - RemoteCommandRequest request{testHost, "blah", BSON("ping" << 1), nullptr}; - auto deferred = startCommand(makeCallbackHandle(), request); - - auto stream = streamFactory().blockUntilStreamExists(testHost); - ConnectEvent{stream}.skip(); - - // simulate isMaster reply. - stream->simulateServer(rpc::Protocol::kOpQuery, - [](RemoteCommandRequest request) -> RemoteCommandResponse { - return simulateIsMaster(request); - }); - - // Simulate hook reply - stream->simulateServer(rpc::Protocol::kOpMsg, - [&](RemoteCommandRequest request) -> RemoteCommandResponse { - ASSERT_EQ("bar", request.cmdObj["foo"].str()); - RemoteCommandResponse response; - response.data = BSON("ok" << 1); - response.metadata = BSON("baz" - << "garply"); - return response; - }); - - auto& res = deferred.get(); - ASSERT(res.elapsedMillis); - ASSERT(wroteRequestMetadata); - ASSERT(gotReplyMetadata); - assertNumOps(0u, 0u, 0u, 1u); -} - -} // namespace -} // namespace executor -} // namespace mongo diff --git a/src/mongo/executor/network_interface_asio_test_utils.h b/src/mongo/executor/network_interface_asio_test_utils.h deleted file mode 100644 index a95da3dfc5f..00000000000 --- a/src/mongo/executor/network_interface_asio_test_utils.h +++ /dev/null @@ -1,188 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ -#pragma once - -#include <memory> - -#include <boost/optional.hpp> -#include <utility> -#include <vector> - -#include "mongo/executor/task_executor.h" -#include "mongo/stdx/condition_variable.h" -#include "mongo/stdx/future.h" -#include "mongo/stdx/mutex.h" -#include "mongo/util/concurrency/thread_pool_interface.h" - -namespace mongo { -namespace executor { - -/** - * A mock class mimicking TaskExecutor::CallbackState, does nothing. - */ -class MockCallbackState final : public TaskExecutor::CallbackState { -public: - MockCallbackState() = default; - void cancel() override {} - void waitForCompletion() override {} - bool isCanceled() const override { - return false; - } -}; - -inline TaskExecutor::CallbackHandle makeCallbackHandle() { - return TaskExecutor::CallbackHandle(std::make_shared<MockCallbackState>()); -} - -/** - * Simple future-like utility for waiting for the result of startCommand. - */ -template <typename T> -class Deferred { -public: - template <typename... Args> - void emplace(Args&&... args) { - _emplace(_state.get(), std::forward<Args>(args)...); - } - - T& get() { - return _get(_state.get()); - } - - bool hasCompleted() { - stdx::unique_lock<stdx::mutex> lk(_state->mtx); - return _state->thing.is_initialized(); - } - - template <typename Continuation> - auto then(ThreadPoolInterface* pool, Continuation&& continuation) - -> Deferred<decltype(continuation(std::declval<Deferred<T>>().get()))> { - // XXX: The ugliness of the above type signature is because you can't refer to 'this' in - // a template parameter, at least on g++-4.8.2. - auto state = _state; - Deferred<decltype(continuation(get()))> thenDeferred; - - pool->schedule([this, thenDeferred, continuation, state]() mutable { - thenDeferred.emplace(continuation(_get(state.get()))); - }); - return thenDeferred; - } - -private: - struct State { - stdx::mutex mtx; - stdx::condition_variable cv; - boost::optional<T> thing; - }; - - template <typename... Args> - void _emplace(State* state, Args&&... args) { - stdx::lock_guard<stdx::mutex> lk(_state->mtx); - invariant(!state->thing.is_initialized()); - state->thing.emplace(std::forward<T>(args)...); - state->cv.notify_one(); - } - - T& _get(State* state) { - stdx::unique_lock<stdx::mutex> lk(state->mtx); - state->cv.wait(lk, [state] { return state->thing.is_initialized(); }); - return *state->thing; - } - - std::shared_ptr<State> _state = std::make_shared<State>(); -}; - -class CountdownLatch { -public: - explicit CountdownLatch(uint32_t count) : _count(count) {} - - void countDown() { - stdx::lock_guard<stdx::mutex> lk(_mtx); - if (_count == 0) { - return; - } - - --_count; - if (_count == 0) { - _cv.notify_all(); - } - } - - void await() { - stdx::unique_lock<stdx::mutex> lk(_mtx); - _cv.wait(lk, [&] { return _count == 0; }); - } - -private: - stdx::mutex _mtx; - stdx::condition_variable _cv; - size_t _count; -}; - -namespace helpers { - -template <typename T> -static Deferred<std::vector<T>> collect(std::vector<Deferred<T>>& ds, ThreadPoolInterface* pool) { - Deferred<std::vector<T>> out; - struct CollectState { - // hack to avoid requiring U to be default constructible. - std::vector<boost::optional<T>> mem{}; - std::size_t numFinished = 0; - std::size_t goal = 0; - stdx::mutex mtx; - }; - - auto collectState = std::make_shared<CollectState>(); - collectState->goal = ds.size(); - collectState->mem.resize(collectState->goal); - - for (std::size_t i = 0; i < ds.size(); ++i) { - ds[i].then(pool, [collectState, out, i](T res) mutable { - // The bool return is unused. - stdx::lock_guard<stdx::mutex> lk(collectState->mtx); - collectState->mem[i] = std::move(res); - - // If we're done. - if (collectState->goal == ++collectState->numFinished) { - std::vector<T> outInitialized; - outInitialized.reserve(collectState->mem.size()); - for (auto&& mem_entry : collectState->mem) { - outInitialized.emplace_back(std::move(*mem_entry)); - } - out.emplace(outInitialized); - } - return true; - }); - } - return out; -} - -} // namespace helpers - -} // namespace executor -} // namespace mongo diff --git a/src/mongo/executor/network_interface_factory.cpp b/src/mongo/executor/network_interface_factory.cpp index 3890e8bc64f..0f4f8526e64 100644 --- a/src/mongo/executor/network_interface_factory.cpp +++ b/src/mongo/executor/network_interface_factory.cpp @@ -34,16 +34,11 @@ #include "mongo/base/status.h" #include "mongo/config.h" #include "mongo/db/server_parameters.h" -#include "mongo/executor/async_secure_stream_factory.h" -#include "mongo/executor/async_stream_factory.h" -#include "mongo/executor/async_stream_interface.h" -#include "mongo/executor/async_timer_asio.h" #include "mongo/executor/connection_pool.h" #include "mongo/executor/network_connection_hook.h" #include "mongo/executor/network_interface_tl.h" #include "mongo/rpc/metadata/metadata_hook.h" #include "mongo/stdx/memory.h" -#include "mongo/util/net/ssl_manager.h" namespace mongo { namespace executor { diff --git a/src/mongo/executor/network_interface_factory.h b/src/mongo/executor/network_interface_factory.h index 7aa8a8cdce5..712cf8d812e 100644 --- a/src/mongo/executor/network_interface_factory.h +++ b/src/mongo/executor/network_interface_factory.h @@ -32,18 +32,13 @@ #include <string> #include "mongo/executor/connection_pool.h" +#include "mongo/executor/network_connection_hook.h" #include "mongo/executor/network_interface.h" +#include "mongo/rpc/metadata/metadata_hook.h" namespace mongo { - -namespace rpc { -class EgressMetadataHook; -} // namespace rpc - namespace executor { -class NetworkConnectionHook; - /** * Returns a new NetworkInterface. */ diff --git a/src/mongo/executor/network_interface_asio_integration_fixture.cpp b/src/mongo/executor/network_interface_integration_fixture.cpp index 0a9d8cff01b..6a593374fb6 100644 --- a/src/mongo/executor/network_interface_asio_integration_fixture.cpp +++ b/src/mongo/executor/network_interface_integration_fixture.cpp @@ -31,11 +31,8 @@ #include "mongo/platform/basic.h" #include "mongo/client/connection_string.h" -#include "mongo/executor/async_stream_factory.h" -#include "mongo/executor/async_stream_interface.h" -#include "mongo/executor/async_timer_asio.h" -#include "mongo/executor/network_interface_asio.h" -#include "mongo/executor/network_interface_asio_integration_fixture.h" +#include "mongo/executor/network_interface_factory.h" +#include "mongo/executor/network_interface_integration_fixture.h" #include "mongo/executor/remote_command_response.h" #include "mongo/executor/task_executor.h" #include "mongo/rpc/get_status_from_command_result.h" @@ -47,62 +44,57 @@ namespace mongo { namespace executor { -void NetworkInterfaceASIOIntegrationFixture::startNet(NetworkInterfaceASIO::Options options) { - options.streamFactory = stdx::make_unique<AsyncStreamFactory>(); - options.timerFactory = stdx::make_unique<AsyncTimerFactoryASIO>(); + +void NetworkInterfaceIntegrationFixture::startNet( + std::unique_ptr<NetworkConnectionHook> connectHook) { + ConnectionPool::Options options; #ifdef _WIN32 // Connections won't queue on widnows, so attempting to open too many connections // concurrently will result in refused connections and test failure. - options.connectionPoolOptions.maxConnections = 16u; + options.maxConnections = 16u; #else - options.connectionPoolOptions.maxConnections = 256u; + options.maxConnections = 256u; #endif - _net = stdx::make_unique<NetworkInterfaceASIO>(std::move(options)); + _net = makeNetworkInterface( + "NetworkInterfaceIntegrationFixture", std::move(connectHook), nullptr, std::move(options)); + _net->startup(); } -void NetworkInterfaceASIOIntegrationFixture::tearDown() { +void NetworkInterfaceIntegrationFixture::tearDown() { if (!_net->inShutdown()) { _net->shutdown(); } } -NetworkInterfaceASIO& NetworkInterfaceASIOIntegrationFixture::net() { +NetworkInterface& NetworkInterfaceIntegrationFixture::net() { return *_net; } -ConnectionString NetworkInterfaceASIOIntegrationFixture::fixture() { +ConnectionString NetworkInterfaceIntegrationFixture::fixture() { return unittest::getFixtureConnectionString(); } -void NetworkInterfaceASIOIntegrationFixture::setRandomNumberGenerator(PseudoRandom* generator) { +void NetworkInterfaceIntegrationFixture::setRandomNumberGenerator(PseudoRandom* generator) { _rng = generator; } -PseudoRandom* NetworkInterfaceASIOIntegrationFixture::getRandomNumberGenerator() { +PseudoRandom* NetworkInterfaceIntegrationFixture::getRandomNumberGenerator() { return _rng; } -void NetworkInterfaceASIOIntegrationFixture::startCommand( - const TaskExecutor::CallbackHandle& cbHandle, - RemoteCommandRequest& request, - StartCommandCB onFinish) { +void NetworkInterfaceIntegrationFixture::startCommand(const TaskExecutor::CallbackHandle& cbHandle, + RemoteCommandRequest& request, + StartCommandCB onFinish) { net().startCommand(cbHandle, request, onFinish).transitional_ignore(); } -Deferred<RemoteCommandResponse> NetworkInterfaceASIOIntegrationFixture::runCommand( - const TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequest& request) { - Deferred<RemoteCommandResponse> deferred; - net() - .startCommand( - cbHandle, - request, - [deferred](RemoteCommandResponse resp) mutable { deferred.emplace(std::move(resp)); }) - .transitional_ignore(); - return deferred; +Future<RemoteCommandResponse> NetworkInterfaceIntegrationFixture::runCommand( + const TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequest request) { + return net().startCommand(cbHandle, request); } -RemoteCommandResponse NetworkInterfaceASIOIntegrationFixture::runCommandSync( +RemoteCommandResponse NetworkInterfaceIntegrationFixture::runCommandSync( RemoteCommandRequest& request) { auto deferred = runCommand(makeCallbackHandle(), request); auto& res = deferred.get(); @@ -114,9 +106,9 @@ RemoteCommandResponse NetworkInterfaceASIOIntegrationFixture::runCommandSync( return res; } -void NetworkInterfaceASIOIntegrationFixture::assertCommandOK(StringData db, - const BSONObj& cmd, - Milliseconds timeoutMillis) { +void NetworkInterfaceIntegrationFixture::assertCommandOK(StringData db, + const BSONObj& cmd, + Milliseconds timeoutMillis) { RemoteCommandRequest request{ fixture().getServers()[0], db.toString(), cmd, BSONObj(), nullptr, timeoutMillis}; auto res = runCommandSync(request); @@ -125,16 +117,20 @@ void NetworkInterfaceASIOIntegrationFixture::assertCommandOK(StringData db, ASSERT(!res.data["writeErrors"]); } -void NetworkInterfaceASIOIntegrationFixture::assertCommandFailsOnClient( - StringData db, const BSONObj& cmd, ErrorCodes::Error reason, Milliseconds timeoutMillis) { +void NetworkInterfaceIntegrationFixture::assertCommandFailsOnClient(StringData db, + const BSONObj& cmd, + ErrorCodes::Error reason, + Milliseconds timeoutMillis) { RemoteCommandRequest request{ fixture().getServers()[0], db.toString(), cmd, BSONObj(), nullptr, timeoutMillis}; auto res = runCommandSync(request); ASSERT_EQ(reason, res.status.code()); } -void NetworkInterfaceASIOIntegrationFixture::assertCommandFailsOnServer( - StringData db, const BSONObj& cmd, ErrorCodes::Error reason, Milliseconds timeoutMillis) { +void NetworkInterfaceIntegrationFixture::assertCommandFailsOnServer(StringData db, + const BSONObj& cmd, + ErrorCodes::Error reason, + Milliseconds timeoutMillis) { RemoteCommandRequest request{ fixture().getServers()[0], db.toString(), cmd, BSONObj(), nullptr, timeoutMillis}; auto res = runCommandSync(request); @@ -143,10 +139,10 @@ void NetworkInterfaceASIOIntegrationFixture::assertCommandFailsOnServer( ASSERT_EQ(reason, serverStatus); } -void NetworkInterfaceASIOIntegrationFixture::assertWriteError(StringData db, - const BSONObj& cmd, - ErrorCodes::Error reason, - Milliseconds timeoutMillis) { +void NetworkInterfaceIntegrationFixture::assertWriteError(StringData db, + const BSONObj& cmd, + ErrorCodes::Error reason, + Milliseconds timeoutMillis) { RemoteCommandRequest request{ fixture().getServers()[0], db.toString(), cmd, BSONObj(), nullptr, timeoutMillis}; auto res = runCommandSync(request); @@ -158,5 +154,6 @@ void NetworkInterfaceASIOIntegrationFixture::assertWriteError(StringData db, firstWriteError.getStringField("errmsg")); ASSERT_EQ(reason, writeErrorStatus); } + } // namespace executor } // namespace mongo diff --git a/src/mongo/executor/network_interface_asio_integration_fixture.h b/src/mongo/executor/network_interface_integration_fixture.h index f6207e7a12c..a83a5d06f31 100644 --- a/src/mongo/executor/network_interface_asio_integration_fixture.h +++ b/src/mongo/executor/network_interface_integration_fixture.h @@ -29,9 +29,11 @@ #include "mongo/unittest/unittest.h" -#include "mongo/executor/network_interface_asio.h" -#include "mongo/executor/network_interface_asio_test_utils.h" +#include "mongo/client/connection_string.h" +#include "mongo/executor/network_connection_hook.h" +#include "mongo/executor/network_interface.h" #include "mongo/executor/task_executor.h" +#include "mongo/util/future.h" namespace mongo { @@ -39,14 +41,31 @@ class PseudoRandom; namespace executor { +/** + * A mock class mimicking TaskExecutor::CallbackState, does nothing. + */ +class MockCallbackState final : public TaskExecutor::CallbackState { +public: + MockCallbackState() = default; + void cancel() override {} + void waitForCompletion() override {} + bool isCanceled() const override { + return false; + } +}; + +inline TaskExecutor::CallbackHandle makeCallbackHandle() { + return TaskExecutor::CallbackHandle(std::make_shared<MockCallbackState>()); +} + using StartCommandCB = stdx::function<void(const RemoteCommandResponse&)>; -class NetworkInterfaceASIOIntegrationFixture : public mongo::unittest::Test { +class NetworkInterfaceIntegrationFixture : public mongo::unittest::Test { public: - void startNet(NetworkInterfaceASIO::Options options = NetworkInterfaceASIO::Options()); + void startNet(std::unique_ptr<NetworkConnectionHook> connectHook = nullptr); void tearDown() override; - NetworkInterfaceASIO& net(); + NetworkInterface& net(); ConnectionString fixture(); @@ -58,8 +77,8 @@ public: RemoteCommandRequest& request, StartCommandCB onFinish); - Deferred<RemoteCommandResponse> runCommand(const TaskExecutor::CallbackHandle& cbHandle, - RemoteCommandRequest& request); + Future<RemoteCommandResponse> runCommand(const TaskExecutor::CallbackHandle& cbHandle, + RemoteCommandRequest request); RemoteCommandResponse runCommandSync(RemoteCommandRequest& request); @@ -82,7 +101,7 @@ public: Milliseconds timeoutMillis = Minutes(5)); private: - std::unique_ptr<NetworkInterfaceASIO> _net; + std::unique_ptr<NetworkInterface> _net; PseudoRandom* _rng = nullptr; }; } // namespace executor diff --git a/src/mongo/executor/network_interface_integration_test.cpp b/src/mongo/executor/network_interface_integration_test.cpp new file mode 100644 index 00000000000..b5a9d08edeb --- /dev/null +++ b/src/mongo/executor/network_interface_integration_test.cpp @@ -0,0 +1,373 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork + +#include "mongo/platform/basic.h" + +#include <algorithm> +#include <exception> + +#include "mongo/base/status_with.h" +#include "mongo/client/connection_string.h" +#include "mongo/db/commands/test_commands_enabled.h" +#include "mongo/db/wire_version.h" +#include "mongo/executor/network_connection_hook.h" +#include "mongo/executor/network_interface_integration_fixture.h" +#include "mongo/executor/test_network_connection_hook.h" +#include "mongo/rpc/factory.h" +#include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/rpc/message.h" +#include "mongo/stdx/future.h" +#include "mongo/stdx/memory.h" +#include "mongo/unittest/integration_test.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/log.h" +#include "mongo/util/scopeguard.h" + +namespace mongo { +namespace executor { +namespace { + +bool pingCommandMissing(const RemoteCommandResponse& result) { + if (result.isOK()) { + // On mongos, there is no sleep command, so just check that the command failed with + // a "Command not found" error code + ASSERT_EQ(result.data["ok"].Double(), 0.0); + ASSERT_EQ(result.data["code"].Int(), 59); + return true; + } + + return false; +} + +TEST_F(NetworkInterfaceIntegrationFixture, Ping) { + startNet(); + assertCommandOK("admin", BSON("ping" << 1)); +} + +// Hook that intentionally never finishes +class HangingHook : public executor::NetworkConnectionHook { + Status validateHost(const HostAndPort&, + const BSONObj& request, + const RemoteCommandResponse&) final { + return Status::OK(); + } + + StatusWith<boost::optional<RemoteCommandRequest>> makeRequest( + const HostAndPort& remoteHost) final { + return {boost::make_optional(RemoteCommandRequest(remoteHost, + "admin", + BSON("sleep" << 1 << "lock" + << "none" + << "secs" + << 100000000), + BSONObj(), + nullptr))}; + } + + Status handleReply(const HostAndPort& remoteHost, RemoteCommandResponse&& response) final { + if (pingCommandMissing(response)) { + return {ErrorCodes::NetworkInterfaceExceededTimeLimit, + "No ping command. Simulating timeout"}; + } + + MONGO_UNREACHABLE; + } +}; + + +// Test that we time out a command if the connection hook hangs. +TEST_F(NetworkInterfaceIntegrationFixture, HookHangs) { + startNet(stdx::make_unique<HangingHook>()); + + assertCommandFailsOnClient( + "admin", BSON("ping" << 1), ErrorCodes::NetworkInterfaceExceededTimeLimit, Seconds(1)); +} + +using ResponseStatus = TaskExecutor::ResponseStatus; + +BSONObj objConcat(std::initializer_list<BSONObj> objs) { + BSONObjBuilder bob; + + for (const auto& obj : objs) { + bob.appendElements(obj); + } + + return bob.obj(); +} + +class NetworkInterfaceTest : public NetworkInterfaceIntegrationFixture { +public: + void assertNumOps(uint64_t canceled, uint64_t timedOut, uint64_t failed, uint64_t succeeded) { + auto counters = net().getCounters(); + ASSERT_EQ(canceled, counters.canceled); + ASSERT_EQ(timedOut, counters.timedOut); + ASSERT_EQ(failed, counters.failed); + ASSERT_EQ(succeeded, counters.succeeded); + } + + void setUp() override { + setTestCommandsEnabled(true); + startNet(std::make_unique<WaitForIsMasterHook>(this)); + } + + RemoteCommandRequest makeTestCommand(boost::optional<Milliseconds> timeout = boost::none, + BSONObj cmd = BSON("echo" << 1 << "foo" + << "bar")) { + auto cs = fixture(); + return RemoteCommandRequest(cs.getServers().front(), + "admin", + std::move(cmd), + BSONObj(), + nullptr, + timeout ? *timeout : RemoteCommandRequest::kNoTimeout); + } + + struct IsMasterData { + BSONObj request; + RemoteCommandResponse response; + }; + IsMasterData waitForIsMaster() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + _isMasterCond.wait(lk, [this] { return _isMasterResult != boost::none; }); + + return std::move(*_isMasterResult); + } + + bool hasIsMaster() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _isMasterResult != boost::none; + } + +private: + class WaitForIsMasterHook : public NetworkConnectionHook { + public: + explicit WaitForIsMasterHook(NetworkInterfaceTest* parent) : _parent(parent) {} + + Status validateHost(const HostAndPort& host, + const BSONObj& request, + const RemoteCommandResponse& isMasterReply) override { + stdx::lock_guard<stdx::mutex> lk(_parent->_mutex); + _parent->_isMasterResult = {request, isMasterReply}; + _parent->_isMasterCond.notify_all(); + return Status::OK(); + } + + StatusWith<boost::optional<RemoteCommandRequest>> makeRequest(const HostAndPort&) override { + return {boost::none}; + } + + Status handleReply(const HostAndPort&, RemoteCommandResponse&&) override { + return Status::OK(); + } + + private: + NetworkInterfaceTest* _parent; + }; + + stdx::mutex _mutex; + stdx::condition_variable _isMasterCond; + boost::optional<IsMasterData> _isMasterResult; +}; + +TEST_F(NetworkInterfaceTest, CancelMissingOperation) { + // This is just a sanity check, this action should have no effect. + net().cancelCommand(makeCallbackHandle()); + assertNumOps(0u, 0u, 0u, 0u); +} + +TEST_F(NetworkInterfaceTest, CancelOperation) { + auto cbh = makeCallbackHandle(); + + // Kick off our operation + FailPointEnableBlock fpb("networkInterfaceDiscardCommandsAfterAcquireConn"); + + auto deferred = runCommand(cbh, makeTestCommand()); + + waitForIsMaster(); + + net().cancelCommand(cbh); + + // Wait for op to complete, assert that it was canceled. + auto result = deferred.get(); + ASSERT_EQ(ErrorCodes::CallbackCanceled, result.status); + ASSERT(result.elapsedMillis); + assertNumOps(1u, 0u, 0u, 0u); +} + +TEST_F(NetworkInterfaceTest, ImmediateCancel) { + auto cbh = makeCallbackHandle(); + + // Kick off our operation + + FailPointEnableBlock fpb("networkInterfaceDiscardCommandsBeforeAcquireConn"); + + auto deferred = runCommand(cbh, makeTestCommand()); + + net().cancelCommand(cbh); + + ASSERT_FALSE(hasIsMaster()); + + // Wait for op to complete, assert that it was canceled. + auto result = deferred.get(); + ASSERT_EQ(ErrorCodes::CallbackCanceled, result.status); + ASSERT(result.elapsedMillis); + assertNumOps(1u, 0u, 0u, 0u); +} + +TEST_F(NetworkInterfaceTest, LateCancel) { + auto cbh = makeCallbackHandle(); + + auto deferred = runCommand(cbh, makeTestCommand()); + + // Wait for op to complete, assert that it was canceled. + auto result = deferred.get(); + net().cancelCommand(cbh); + + ASSERT(result.isOK()); + ASSERT(result.elapsedMillis); + assertNumOps(0u, 0u, 0u, 1u); +} + +TEST_F(NetworkInterfaceTest, AsyncOpTimeout) { + // Kick off operation + auto cb = makeCallbackHandle(); + auto request = makeTestCommand(Milliseconds{1000}); + request.cmdObj = BSON("sleep" << 1 << "lock" + << "none" + << "secs" + << 1000000000); + auto deferred = runCommand(cb, request); + + waitForIsMaster(); + + auto result = deferred.get(); + + // mongos doesn't implement the ping command, so ignore the response there, otherwise + // check that we've timed out. + if (!pingCommandMissing(result)) { + ASSERT_EQ(ErrorCodes::NetworkInterfaceExceededTimeLimit, result.status); + ASSERT(result.elapsedMillis); + assertNumOps(0u, 1u, 0u, 0u); + } +} + +TEST_F(NetworkInterfaceTest, StartCommand) { + auto commandRequest = BSON("echo" << 1 << "boop" + << "bop"); + + // This opmsg request expect the following reply, which is generated below + // { echo: { echo: 1, boop: "bop", $db: "admin" }, ok: 1.0 } + auto expectedCommandReply = [&] { + BSONObjBuilder echoed; + echoed.appendElements(commandRequest); + echoed << "$db" + << "admin"; + return echoed.obj(); + }(); + auto request = makeTestCommand(boost::none, commandRequest); + + auto deferred = runCommand(makeCallbackHandle(), std::move(request)); + + auto res = deferred.get(); + + ASSERT(res.elapsedMillis); + uassertStatusOK(res.status); + ASSERT_BSONOBJ_EQ(res.data.getObjectField("echo"), expectedCommandReply); + ASSERT_EQ(res.data.getIntField("ok"), 1); + assertNumOps(0u, 0u, 0u, 1u); +} + +TEST_F(NetworkInterfaceTest, SetAlarm) { + // set a first alarm, to execute after "expiration" + Date_t expiration = net().now() + Milliseconds(100); + auto makeTimerFuture = [&] { + Promise<Date_t> promise; + auto future = promise.getFuture(); + + return std::make_pair( + [ this, promise = promise.share() ]() mutable { promise.emplaceValue(net().now()); }, + std::move(future)); + }; + + auto futurePair = makeTimerFuture(); + ASSERT_OK(net().setAlarm(expiration, std::move(futurePair.first))); + + // assert that it executed after "expiration" + auto& result = futurePair.second.get(); + ASSERT(result >= expiration); + + expiration = net().now() + Milliseconds(99999999); + auto futurePair2 = makeTimerFuture(); + ASSERT_OK(net().setAlarm(expiration, std::move(futurePair2.first))); + + net().shutdown(); + ASSERT_TRUE(!futurePair2.second.isReady()); +} + +TEST_F(NetworkInterfaceTest, IsMasterRequestContainsOutgoingWireVersionInternalClientInfo) { + WireSpec::instance().isInternalClient = true; + + auto deferred = runCommand(makeCallbackHandle(), makeTestCommand()); + auto isMasterHandshake = waitForIsMaster(); + + // Verify that the isMaster reply has the expected internalClient data. + auto internalClientElem = isMasterHandshake.request["internalClient"]; + ASSERT_EQ(internalClientElem.type(), BSONType::Object); + auto minWireVersionElem = internalClientElem.Obj()["minWireVersion"]; + auto maxWireVersionElem = internalClientElem.Obj()["maxWireVersion"]; + ASSERT_EQ(minWireVersionElem.type(), BSONType::NumberInt); + ASSERT_EQ(maxWireVersionElem.type(), BSONType::NumberInt); + ASSERT_EQ(minWireVersionElem.numberInt(), WireSpec::instance().outgoing.minWireVersion); + ASSERT_EQ(maxWireVersionElem.numberInt(), WireSpec::instance().outgoing.maxWireVersion); + + // Verify that the ping op is counted as a success. + auto res = deferred.get(); + ASSERT(res.elapsedMillis); + assertNumOps(0u, 0u, 0u, 1u); +} + +TEST_F(NetworkInterfaceTest, IsMasterRequestMissingInternalClientInfoWhenNotInternalClient) { + WireSpec::instance().isInternalClient = false; + + auto deferred = runCommand(makeCallbackHandle(), makeTestCommand()); + auto isMasterHandshake = waitForIsMaster(); + + // Verify that the isMaster reply has the expected internalClient data. + ASSERT_FALSE(isMasterHandshake.request["internalClient"]); + // Verify that the ping op is counted as a success. + auto res = deferred.get(); + ASSERT(res.elapsedMillis); + assertNumOps(0u, 0u, 0u, 1u); +} + +} // namespace +} // namespace executor +} // namespace mongo diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp index 674dd8bb116..42cda1b4005 100644 --- a/src/mongo/executor/network_interface_mock.cpp +++ b/src/mongo/executor/network_interface_mock.cpp @@ -481,7 +481,7 @@ void NetworkInterfaceMock::_connectThenEnqueueOperation_inlock(const HostAndPort ? handshakeReplyIter->second : RemoteCommandResponse(BSONObj(), BSONObj(), Milliseconds(0)); - auto valid = _hook->validateHost(target, handshakeReply); + auto valid = _hook->validateHost(target, op.getRequest().cmdObj, handshakeReply); if (!valid.isOK()) { op.setResponse(_now_inlock(), valid); op.finishResponse(); diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h index 76a9a6462a8..fdb5b58d50c 100644 --- a/src/mongo/executor/network_interface_mock.h +++ b/src/mongo/executor/network_interface_mock.h @@ -84,6 +84,9 @@ public: virtual ~NetworkInterfaceMock(); virtual void appendConnectionStats(ConnectionPoolStats* stats) const; virtual std::string getDiagnosticString(); + Counters getCounters() const override { + return Counters(); + } /** * Logs the contents of the queues for diagnostics. diff --git a/src/mongo/executor/network_interface_mock_test.cpp b/src/mongo/executor/network_interface_mock_test.cpp index 59c33e1fc82..a09ed4aeac0 100644 --- a/src/mongo/executor/network_interface_mock_test.cpp +++ b/src/mongo/executor/network_interface_mock_test.cpp @@ -133,7 +133,9 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHook) { // Since the contract of these methods is that they do not throw, we run the ASSERTs in // the test scope. net().setConnectionHook(makeTestHook( - [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) { + [&](const HostAndPort& remoteHost, + const BSONObj&, + const RemoteCommandResponse& isMasterReply) { validateCalled = true; hostCorrectForValidate = (remoteHost == testHost()); replyCorrectForValidate = SimpleBSONObjComparator::kInstance.evaluate( @@ -219,7 +221,9 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHook) { TEST_F(NetworkInterfaceMockTest, ConnectionHookFailedValidation) { net().setConnectionHook(makeTestHook( - [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) -> Status { + [&](const HostAndPort& remoteHost, + const BSONObj&, + const RemoteCommandResponse& isMasterReply) -> Status { // We just need some obscure non-OK code. return {ErrorCodes::ConflictingOperationInProgress, "blah"}; }, @@ -260,9 +264,9 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHookFailedValidation) { TEST_F(NetworkInterfaceMockTest, ConnectionHookNoRequest) { bool makeRequestCalled = false; net().setConnectionHook(makeTestHook( - [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) -> Status { - return Status::OK(); - }, + [&](const HostAndPort& remoteHost, + const BSONObj&, + const RemoteCommandResponse& isMasterReply) -> Status { return Status::OK(); }, [&](const HostAndPort& remoteHost) -> StatusWith<boost::optional<RemoteCommandRequest>> { makeRequestCalled = true; return {boost::none}; @@ -296,9 +300,9 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHookNoRequest) { TEST_F(NetworkInterfaceMockTest, ConnectionHookMakeRequestFails) { bool makeRequestCalled = false; net().setConnectionHook(makeTestHook( - [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) -> Status { - return Status::OK(); - }, + [&](const HostAndPort& remoteHost, + const BSONObj&, + const RemoteCommandResponse& isMasterReply) -> Status { return Status::OK(); }, [&](const HostAndPort& remoteHost) -> StatusWith<boost::optional<RemoteCommandRequest>> { makeRequestCalled = true; return {ErrorCodes::InvalidSyncSource, "blah"}; @@ -333,9 +337,9 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHookMakeRequestFails) { TEST_F(NetworkInterfaceMockTest, ConnectionHookHandleReplyFails) { bool handleReplyCalled = false; net().setConnectionHook(makeTestHook( - [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) -> Status { - return Status::OK(); - }, + [&](const HostAndPort& remoteHost, + const BSONObj&, + const RemoteCommandResponse& isMasterReply) -> Status { return Status::OK(); }, [&](const HostAndPort& remoteHost) -> StatusWith<boost::optional<RemoteCommandRequest>> { return boost::make_optional<RemoteCommandRequest>({}); }, diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp index 62111f3f1b8..5fb164cf9e6 100644 --- a/src/mongo/executor/network_interface_tl.cpp +++ b/src/mongo/executor/network_interface_tl.cpp @@ -32,6 +32,7 @@ #include "mongo/executor/network_interface_tl.h" +#include "mongo/db/commands/test_commands_enabled.h" #include "mongo/db/server_options.h" #include "mongo/executor/connection_pool_tl.h" #include "mongo/transport/transport_layer_manager.h" @@ -70,6 +71,12 @@ void NetworkInterfaceTL::appendConnectionStats(ConnectionPoolStats* stats) const pool->appendConnectionStats(stats); } +NetworkInterface::Counters NetworkInterfaceTL::getCounters() const { + invariant(getTestCommandsEnabled()); + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _counters; +} + std::string NetworkInterfaceTL::getHostName() { return getHostNameCached(); } @@ -174,6 +181,15 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa state->deadline = state->start + state->request.timeout; } + if (MONGO_FAIL_POINT(networkInterfaceDiscardCommandsBeforeAcquireConn)) { + log() << "Discarding command due to failpoint before acquireConn"; + std::move(state->mergedFuture) + .getAsync([onFinish](StatusWith<RemoteCommandResponse> response) { + onFinish(RemoteCommandResponse(response.getStatus(), Milliseconds{0})); + }); + return Status::OK(); + } + // Interacting with the connection pool can involve more work than just getting a connection // out. In particular, we can end up having to spin up new connections, and fulfilling promises // for other requesters. Returning connections has the same issue. @@ -254,6 +270,11 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn( std::shared_ptr<CommandState> state, CommandState::ConnHandle conn, const transport::BatonHandle& baton) { + if (MONGO_FAIL_POINT(networkInterfaceDiscardCommandsAfterAcquireConn)) { + conn->indicateSuccess(); + return std::move(state->mergedFuture); + } + if (state->done.load()) { conn->indicateSuccess(); uasserted(ErrorCodes::CallbackCanceled, "Command was canceled"); @@ -277,7 +298,7 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn( state->timer = _reactor->makeTimer(); state->timer->waitUntil(state->deadline, baton) - .getAsync([client, state, baton](Status status) { + .getAsync([this, client, state, baton](Status status) { if (status == ErrorCodes::CallbackCanceled) { invariant(state->done.load()); return; @@ -287,6 +308,11 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn( return; } + if (getTestCommandsEnabled()) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _counters.timedOut++; + } + LOG(2) << "Request " << state->request.id << " timed out" << ", deadline was " << state->deadline << ", op was " << redact(state->request.toString()); @@ -324,6 +350,15 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn( if (state->done.swap(true)) return; + if (getTestCommandsEnabled()) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + if (swr.isOK() && swr.getValue().status.isOK()) { + _counters.succeeded++; + } else { + _counters.failed++; + } + } + if (state->timer) { state->timer->cancel(baton); } @@ -354,6 +389,11 @@ void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHan return; } + if (getTestCommandsEnabled()) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _counters.canceled++; + } + LOG(2) << "Canceling operation; original request was: " << redact(state->request.toString()); state->promise.setError({ErrorCodes::CallbackCanceled, str::stream() << "Command canceled; original request was: " diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h index 76053a511d7..b3e3296d8dc 100644 --- a/src/mongo/executor/network_interface_tl.h +++ b/src/mongo/executor/network_interface_tl.h @@ -54,6 +54,8 @@ public: std::string getDiagnosticString() override; void appendConnectionStats(ConnectionPoolStats* stats) const override; std::string getHostName() override; + Counters getCounters() const override; + void startup() override; void shutdown() override; bool inShutdown() const override; @@ -121,6 +123,7 @@ private: ConnectionPool::Options _connPoolOpts; std::unique_ptr<NetworkConnectionHook> _onConnectHook; std::unique_ptr<ConnectionPool> _pool; + Counters _counters; std::unique_ptr<rpc::EgressMetadataHook> _metadataHook; AtomicBool _inShutdown; diff --git a/src/mongo/executor/test_network_connection_hook.h b/src/mongo/executor/test_network_connection_hook.h index 0294b23cd38..b78379d3919 100644 --- a/src/mongo/executor/test_network_connection_hook.h +++ b/src/mongo/executor/test_network_connection_hook.h @@ -52,8 +52,9 @@ public: _replyFunc(std::forward<ReplyFunc>(replyFunc)) {} Status validateHost(const HostAndPort& remoteHost, + const BSONObj& request, const RemoteCommandResponse& isMasterReply) override { - return _validateFunc(remoteHost, isMasterReply); + return _validateFunc(remoteHost, request, isMasterReply); } StatusWith<boost::optional<RemoteCommandRequest>> makeRequest(const HostAndPort& remoteHost) { |