summaryrefslogtreecommitdiff
path: root/src/mongo/executor
diff options
context:
space:
mode:
authorJonathan Reams <jbreams@mongodb.com>2018-05-03 15:40:22 -0400
committerJonathan Reams <jbreams@mongodb.com>2018-05-16 14:16:09 -0400
commit2e330a0a33f271c6ac50441c2e7a8a1a6e776265 (patch)
tree08a346263be0f968e5a54caa59073c3948da332e /src/mongo/executor
parentd646baf48e55f2e84ff811a0191ebf1e253ea9c6 (diff)
downloadmongo-2e330a0a33f271c6ac50441c2e7a8a1a6e776265.tar.gz
SERVER-34730 Delete NetworkInterfaceASIO
Diffstat (limited to 'src/mongo/executor')
-rw-r--r--src/mongo/executor/SConscript172
-rw-r--r--src/mongo/executor/async_mock_stream_factory.cpp335
-rw-r--r--src/mongo/executor/async_mock_stream_factory.h226
-rw-r--r--src/mongo/executor/async_secure_stream.cpp125
-rw-r--r--src/mongo/executor/async_secure_stream.h74
-rw-r--r--src/mongo/executor/async_secure_stream_factory.cpp69
-rw-r--r--src/mongo/executor/async_secure_stream_factory.h60
-rw-r--r--src/mongo/executor/async_stream.cpp95
-rw-r--r--src/mongo/executor/async_stream.h61
-rw-r--r--src/mongo/executor/async_stream_common.cpp68
-rw-r--r--src/mongo/executor/async_stream_common.h146
-rw-r--r--src/mongo/executor/async_stream_factory.cpp47
-rw-r--r--src/mongo/executor/async_stream_factory.h49
-rw-r--r--src/mongo/executor/async_stream_factory_interface.h59
-rw-r--r--src/mongo/executor/async_stream_interface.h70
-rw-r--r--src/mongo/executor/async_stream_test.cpp144
-rw-r--r--src/mongo/executor/async_timer_asio.cpp72
-rw-r--r--src/mongo/executor/async_timer_asio.h65
-rw-r--r--src/mongo/executor/async_timer_interface.h5
-rw-r--r--src/mongo/executor/async_timer_mock.cpp10
-rw-r--r--src/mongo/executor/async_timer_mock.h8
-rw-r--r--src/mongo/executor/async_timer_mock_test.cpp170
-rw-r--r--src/mongo/executor/connection_pool_asio.cpp333
-rw-r--r--src/mongo/executor/connection_pool_asio.h138
-rw-r--r--src/mongo/executor/connection_pool_asio_integration_test.cpp341
-rw-r--r--src/mongo/executor/network_connection_hook.h3
-rw-r--r--src/mongo/executor/network_interface.cpp2
-rw-r--r--src/mongo/executor/network_interface.h16
-rw-r--r--src/mongo/executor/network_interface_asio.cpp527
-rw-r--r--src/mongo/executor/network_interface_asio.h526
-rw-r--r--src/mongo/executor/network_interface_asio_auth.cpp232
-rw-r--r--src/mongo/executor/network_interface_asio_command.cpp506
-rw-r--r--src/mongo/executor/network_interface_asio_connect.cpp110
-rw-r--r--src/mongo/executor/network_interface_asio_integration_test.cpp236
-rw-r--r--src/mongo/executor/network_interface_asio_operation.cpp408
-rw-r--r--src/mongo/executor/network_interface_asio_test.cpp1079
-rw-r--r--src/mongo/executor/network_interface_asio_test_utils.h188
-rw-r--r--src/mongo/executor/network_interface_factory.cpp5
-rw-r--r--src/mongo/executor/network_interface_factory.h9
-rw-r--r--src/mongo/executor/network_interface_integration_fixture.cpp (renamed from src/mongo/executor/network_interface_asio_integration_fixture.cpp)81
-rw-r--r--src/mongo/executor/network_interface_integration_fixture.h (renamed from src/mongo/executor/network_interface_asio_integration_fixture.h)35
-rw-r--r--src/mongo/executor/network_interface_integration_test.cpp373
-rw-r--r--src/mongo/executor/network_interface_mock.cpp2
-rw-r--r--src/mongo/executor/network_interface_mock.h3
-rw-r--r--src/mongo/executor/network_interface_mock_test.cpp26
-rw-r--r--src/mongo/executor/network_interface_tl.cpp42
-rw-r--r--src/mongo/executor/network_interface_tl.h3
-rw-r--r--src/mongo/executor/test_network_connection_hook.h3
48 files changed, 549 insertions, 6808 deletions
diff --git a/src/mongo/executor/SConscript b/src/mongo/executor/SConscript
index 6a6e061bf12..2f44dd45198 100644
--- a/src/mongo/executor/SConscript
+++ b/src/mongo/executor/SConscript
@@ -4,8 +4,6 @@ Import("env")
env = env.Clone()
-env.InjectThirdPartyIncludePaths('asio')
-
env.Library(
target='connection_pool_stats',
source=[
@@ -16,6 +14,13 @@ env.Library(
'$BUILD_DIR/mongo/util/net/network',
])
+env.Library(target='async_timer_mock',
+ source=['async_timer_mock.cpp'],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/base/system_error',
+ ])
+
env.Library(target='remote_command',
source=[
'remote_command_request.cpp',
@@ -46,26 +51,6 @@ env.Library(target='task_executor_interface',
'remote_command',
])
-env.Library(target='async_timer_mock',
- source=['async_timer_mock.cpp'],
- LIBDEPS=[
- '$BUILD_DIR/mongo/base',
- '$BUILD_DIR/third_party/shim_asio',
- ])
-
-env.CppUnitTest(target='async_timer_mock_test',
- source=['async_timer_mock_test.cpp'],
- LIBDEPS=[
- 'async_timer_mock',
- ])
-
-env.Library(target='async_timer_asio',
- source=['async_timer_asio.cpp'],
- LIBDEPS=[
- '$BUILD_DIR/mongo/base',
- '$BUILD_DIR/third_party/shim_asio',
- ])
-
env.Library(target='network_interface',
source=['network_interface.cpp',],
LIBDEPS=[
@@ -115,21 +100,6 @@ env.CppUnitTest(
],
)
-env.CppIntegrationTest(
- target='connection_pool_asio_integration_test',
- source=[
- 'connection_pool_asio_integration_test.cpp',
- ],
- LIBDEPS=[
- '$BUILD_DIR/mongo/rpc/command_status',
- 'network_interface_asio',
- '$BUILD_DIR/mongo/executor/thread_pool_task_executor',
- '$BUILD_DIR/mongo/executor/network_interface_thread_pool',
- '$BUILD_DIR/mongo/executor/network_interface_factory',
- '$BUILD_DIR/mongo/util/version_impl',
- ],
-)
-
env.CppUnitTest(
target='network_interface_mock_test',
source=[
@@ -169,6 +139,7 @@ env.Library(
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/db/auth/authcommon',
+ '$BUILD_DIR/mongo/db/commands/test_commands_enabled',
'$BUILD_DIR/mongo/transport/transport_layer_manager',
'connection_pool_executor',
'network_interface',
@@ -176,87 +147,11 @@ env.Library(
)
env.Library(
- target='network_interface_asio',
- source=[
- 'connection_pool_asio.cpp',
- 'network_interface_asio.cpp',
- 'network_interface_asio_auth.cpp',
- 'network_interface_asio_command.cpp',
- 'network_interface_asio_connect.cpp',
- 'network_interface_asio_operation.cpp',
- ],
- LIBDEPS=[
- '$BUILD_DIR/mongo/base',
- '$BUILD_DIR/mongo/base/system_error',
- '$BUILD_DIR/mongo/client/client_query',
- '$BUILD_DIR/mongo/db/auth/authcommon',
- '$BUILD_DIR/mongo/db/wire_version',
- '$BUILD_DIR/mongo/rpc/command_status',
- '$BUILD_DIR/mongo/rpc/rpc',
- '$BUILD_DIR/mongo/transport/message_compressor',
- '$BUILD_DIR/third_party/shim_asio',
- 'async_stream',
- 'async_timer_asio',
- 'connection_pool_executor',
- 'egress_tag_closer_manager',
- 'network_interface',
- 'task_executor_interface',
- ],
- LIBDEPS_PRIVATE=[
- '$BUILD_DIR/mongo/db/commands/test_commands_enabled',
- ],
-)
-
-env.Library(
- target='async_stream',
+ target='network_interface_fixture',
source=[
- 'async_secure_stream.cpp',
- 'async_secure_stream_factory.cpp',
- 'async_stream.cpp',
- 'async_stream_common.cpp',
- 'async_stream_factory.cpp',
+ 'network_interface_integration_fixture.cpp'
],
LIBDEPS=[
- '$BUILD_DIR/mongo/base/system_error',
- '$BUILD_DIR/mongo/client/authentication',
- '$BUILD_DIR/mongo/util/net/ssl_manager',
- '$BUILD_DIR/third_party/shim_asio',
- 'task_executor_interface',
- ]
-)
-
-env.CppUnitTest(
- target='async_stream_test',
- source=[
- 'async_stream_test.cpp',
- ],
- LIBDEPS=[
- 'async_stream',
- ]
-)
-
-env.CppUnitTest(
- target='network_interface_asio_test',
- source=[
- 'async_mock_stream_factory.cpp',
- 'network_interface_asio_test.cpp',
- ],
- LIBDEPS=[
- 'async_timer_mock',
- 'network_interface_asio',
- '$BUILD_DIR/mongo/executor/thread_pool_task_executor',
- '$BUILD_DIR/mongo/executor/network_interface_thread_pool',
- '$BUILD_DIR/mongo/executor/network_interface_factory',
- '$BUILD_DIR/mongo/util/version_impl',
- ],
-)
-env.Library(
- target='network_interface_asio_fixture',
- source=[
- 'network_interface_asio_integration_fixture.cpp'
- ],
- LIBDEPS=[
- 'network_interface_asio',
'$BUILD_DIR/mongo/unittest/integration_test_main',
'$BUILD_DIR/mongo/executor/thread_pool_task_executor',
'$BUILD_DIR/mongo/executor/network_interface_thread_pool',
@@ -268,33 +163,17 @@ env.Library(
)
env.CppIntegrationTest(
- target='network_interface_asio_integration_test',
+ target='network_interface_integration_test',
source=[
- 'network_interface_asio_integration_test.cpp',
+ 'network_interface_integration_test.cpp',
],
LIBDEPS=[
- 'network_interface_asio_fixture',
- ],
-)
-
-env.CppIntegrationTest(
- target='network_interface_perf_test',
- source=[
- 'network_interface_perf_test.cpp',
- ],
- LIBDEPS=[
- 'network_interface_asio',
- '$BUILD_DIR/mongo/executor/thread_pool_task_executor',
- '$BUILD_DIR/mongo/executor/network_interface_thread_pool',
- '$BUILD_DIR/mongo/executor/network_interface_factory',
- '$BUILD_DIR/mongo/db/auth/authmocks',
- '$BUILD_DIR/mongo/db/service_context_noop_init',
- '$BUILD_DIR/mongo/rpc/command_status',
- '$BUILD_DIR/mongo/util/version_impl',
+ 'network_interface_fixture',
+ '$BUILD_DIR/mongo/db/commands/test_commands_enabled',
+ '$BUILD_DIR/mongo/db/wire_version',
],
)
-
env.Library(
target='network_interface_factory',
source=[
@@ -302,10 +181,10 @@ env.Library(
],
LIBDEPS=[
'network_interface',
- 'network_interface_asio',
- 'network_interface_tl',
+ 'connection_pool_executor',
],
LIBDEPS_PRIVATE=[
+ 'network_interface_tl',
'egress_tag_closer_manager',
])
@@ -345,23 +224,6 @@ env.Library(
]
)
-env.CppUnitTest(
- target='network_interface_thread_pool_test',
- source=[
- 'network_interface_thread_pool_test.cpp',
- ],
- LIBDEPS=[
- '$BUILD_DIR/mongo/util/concurrency/thread_pool_test_fixture',
- 'async_timer_mock',
- 'connection_pool_executor',
- 'network_interface_asio',
- 'network_interface_thread_pool',
- '$BUILD_DIR/mongo/executor/thread_pool_task_executor',
- '$BUILD_DIR/mongo/executor/network_interface_thread_pool',
- '$BUILD_DIR/mongo/executor/network_interface_factory'
- ],
-)
-
env.Library(
target='thread_pool_task_executor_test_fixture',
source=[
diff --git a/src/mongo/executor/async_mock_stream_factory.cpp b/src/mongo/executor/async_mock_stream_factory.cpp
deleted file mode 100644
index 0d57e80ac5b..00000000000
--- a/src/mongo/executor/async_mock_stream_factory.cpp
+++ /dev/null
@@ -1,335 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/executor/async_mock_stream_factory.h"
-
-#include <exception>
-#include <iterator>
-#include <system_error>
-
-#include "mongo/base/system_error.h"
-#include "mongo/rpc/command_reply_builder.h"
-#include "mongo/rpc/factory.h"
-#include "mongo/rpc/legacy_reply_builder.h"
-#include "mongo/rpc/message.h"
-#include "mongo/stdx/memory.h"
-#include "mongo/util/assert_util.h"
-#include "mongo/util/log.h"
-
-namespace mongo {
-namespace executor {
-
-namespace {
-StringData stateToString(AsyncMockStreamFactory::MockStream::StreamState state) {
- switch (state) {
- case AsyncMockStreamFactory::MockStream::StreamState::kRunning:
- return "Running";
- case AsyncMockStreamFactory::MockStream::StreamState::kBlockedBeforeConnect:
- return "Blocked before connect";
- case AsyncMockStreamFactory::MockStream::StreamState::kBlockedAfterWrite:
- return "Blocked after write";
- case AsyncMockStreamFactory::MockStream::StreamState::kBlockedBeforeRead:
- return "Blocked before read";
- case AsyncMockStreamFactory::MockStream::StreamState::kCanceled:
- return "Canceled";
- }
- MONGO_UNREACHABLE;
-}
-
-template <typename Handler>
-void checkCanceled(asio::io_service::strand* strand,
- AsyncMockStreamFactory::MockStream::StreamState* state,
- Handler&& handler,
- std::size_t bytes,
- std::error_code ec = std::error_code()) {
- auto wasCancelled = (*state == AsyncMockStreamFactory::MockStream::StreamState::kCanceled);
- *state = AsyncMockStreamFactory::MockStream::StreamState::kRunning;
- strand->post([handler, wasCancelled, bytes, ec] {
- handler(wasCancelled ? make_error_code(asio::error::operation_aborted) : ec, bytes);
- });
-}
-
-} // namespace
-
-std::unique_ptr<AsyncStreamInterface> AsyncMockStreamFactory::makeStream(
- asio::io_service::strand* strand, const HostAndPort& target) {
- return stdx::make_unique<MockStream>(strand, this, target);
-}
-
-void AsyncMockStreamFactory::_createStream(const HostAndPort& host, MockStream* stream) {
- stdx::lock_guard<stdx::mutex> lk(_factoryMutex);
- log() << "creating stream for: " << host;
- _streams.emplace(host, stream);
- _factoryCv.notify_all();
-}
-
-void AsyncMockStreamFactory::_destroyStream(const HostAndPort& host) {
- stdx::lock_guard<stdx::mutex> lk(_factoryMutex);
- log() << "destroying stream for: " << host;
- _streams.erase(host);
-}
-
-AsyncMockStreamFactory::MockStream* AsyncMockStreamFactory::blockUntilStreamExists(
- const HostAndPort& host) {
- stdx::unique_lock<stdx::mutex> lk(_factoryMutex);
-
- auto iter = std::begin(_streams);
-
- _factoryCv.wait(lk, [&] { return (iter = _streams.find(host)) != std::end(_streams); });
-
- return iter->second;
-}
-
-AsyncMockStreamFactory::MockStream::MockStream(asio::io_service::strand* strand,
- AsyncMockStreamFactory* factory,
- const HostAndPort& target)
- : _strand(strand), _factory(factory), _target(target) {
- _factory->_createStream(_target, this);
-}
-
-AsyncMockStreamFactory::MockStream::~MockStream() {
- _factory->_destroyStream(_target);
-}
-
-void AsyncMockStreamFactory::MockStream::connect(asio::ip::tcp::resolver::iterator endpoints,
- ConnectHandler&& connectHandler) {
- // Suspend execution after "connecting"
- _defer(kBlockedBeforeConnect, [this, connectHandler, endpoints]() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
-
- // We shim a lambda to give connectHandler the right signature since it doesn't take
- // a size_t param.
- checkCanceled(
- _strand,
- &_state,
- [connectHandler](std::error_code ec, std::size_t) { return connectHandler(ec); },
- 0);
- });
-}
-
-void AsyncMockStreamFactory::MockStream::write(asio::const_buffer buf,
- StreamHandler&& writeHandler) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
-
- auto begin = asio::buffer_cast<const uint8_t*>(buf);
- auto size = asio::buffer_size(buf);
- _writeQueue.push({begin, begin + size});
-
- // Suspend execution after data is written.
- _defer_inlock(kBlockedAfterWrite, [this, writeHandler, size]() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- checkCanceled(_strand, &_state, std::move(writeHandler), size);
- });
-}
-
-void AsyncMockStreamFactory::MockStream::cancel() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- log() << "cancel() for: " << _target;
-
- // If _state is kRunning then we don't have a deferred operation to cancel.
- if (_state == kRunning) {
- return;
- }
-
- _state = kCanceled;
-}
-
-void AsyncMockStreamFactory::MockStream::read(asio::mutable_buffer buf,
- StreamHandler&& readHandler) {
- // Suspend execution before data is read.
- _defer(kBlockedBeforeRead, [this, buf, readHandler]() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- int nToCopy = 0;
-
- // If we've set an error, return that instead of a read.
- if (!_error) {
- auto nextRead = std::move(_readQueue.front());
- _readQueue.pop();
-
- auto beginDst = asio::buffer_cast<uint8_t*>(buf);
- nToCopy = std::min(nextRead.size(), asio::buffer_size(buf));
-
- auto endSrc = std::begin(nextRead);
- std::advance(endSrc, nToCopy);
-
- auto endDst = std::copy(std::begin(nextRead), endSrc, beginDst);
- invariant((endDst - beginDst) == static_cast<std::ptrdiff_t>(nToCopy));
- log() << "read " << nToCopy << " bytes, " << (nextRead.size() - nToCopy)
- << " remaining in buffer";
- }
-
- auto handler = readHandler;
-
- // If we did not receive all the bytes, we should return an error
- if (static_cast<size_t>(nToCopy) < asio::buffer_size(buf)) {
- handler = [readHandler](std::error_code ec, size_t len) {
- // If we have an error here we've been canceled, and that takes precedence
- if (ec)
- return readHandler(ec, len);
-
- // Call the original handler with an error
- readHandler(make_error_code(ErrorCodes::InvalidLength), len);
- };
- }
-
- checkCanceled(_strand, &_state, std::move(handler), nToCopy, _error);
- _error.clear();
- });
-}
-
-void AsyncMockStreamFactory::MockStream::pushRead(std::vector<uint8_t> toRead) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- invariant(_state != kRunning);
- _readQueue.emplace(std::move(toRead));
-}
-
-void AsyncMockStreamFactory::MockStream::setError(std::error_code ec) {
- _error = ec;
-}
-
-std::vector<uint8_t> AsyncMockStreamFactory::MockStream::popWrite() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- invariant(_state != kRunning);
- auto nextWrite = std::move(_writeQueue.front());
- _writeQueue.pop();
- return nextWrite;
-}
-
-void AsyncMockStreamFactory::MockStream::_defer(StreamState state, Action&& handler) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- _defer_inlock(state, std::move(handler));
-}
-
-void AsyncMockStreamFactory::MockStream::_defer_inlock(StreamState state, Action&& handler) {
- invariant(_state == kRunning);
- _state = state;
-
- invariant(!_deferredAction);
- _deferredAction = std::move(handler);
- _deferredCV.notify_one();
-}
-
-void AsyncMockStreamFactory::MockStream::unblock() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- _unblock_inlock();
-}
-
-void AsyncMockStreamFactory::MockStream::_unblock_inlock() {
- // Can be canceled here at which point we will call the handler with the CallbackCanceled
- // status when we invoke the _deferredAction.
- invariant(_state != kRunning);
-
- if (_state != kCanceled) {
- _state = kRunning;
- }
- // Post our deferred action to resume state machine execution
- invariant(_deferredAction);
- _strand->post(std::move(_deferredAction));
- _deferredAction = nullptr;
-}
-
-auto AsyncMockStreamFactory::MockStream::waitUntilBlocked() -> StreamState {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- _deferredCV.wait(lk, [this]() { return _state != kRunning; });
- log() << "returning from waitUntilBlocked, state: " << stateToString(_state);
- return _state;
-}
-
-HostAndPort AsyncMockStreamFactory::MockStream::target() {
- return _target;
-}
-
-void AsyncMockStreamFactory::MockStream::simulateServer(
- rpc::Protocol proto,
- const stdx::function<RemoteCommandResponse(RemoteCommandRequest)> replyFunc) {
- std::exception_ptr ex;
- uint32_t messageId = 0;
-
- RemoteCommandResponse resp;
- {
- WriteEvent write{this};
-
- std::vector<uint8_t> messageData = popWrite();
- Message msg(SharedBuffer::allocate(messageData.size()));
- memcpy(msg.buf(), messageData.data(), messageData.size());
-
- auto request = rpc::opMsgRequestFromAnyProtocol(msg);
- ASSERT(rpc::protocolForMessage(msg) == proto);
-
- RemoteCommandRequest rcr(target(), request.getDatabase().toString(), request.body, nullptr);
-
- messageId = msg.header().getId();
-
- // So we can allow ASSERTs in replyFunc, we capture any exceptions, but rethrow
- // them later to prevent deadlock
- try {
- resp = replyFunc(std::move(rcr));
- } catch (...) {
- ex = std::current_exception();
- }
- }
-
- auto replyBuilder = rpc::makeReplyBuilder(proto);
- replyBuilder->setCommandReply(resp.data);
- replyBuilder->setMetadata(resp.metadata);
-
- auto replyMsg = replyBuilder->done();
- replyMsg.header().setResponseToMsgId(messageId);
-
- {
- // The first read will be for the header.
- ReadEvent read{this};
- auto hdrBytes = reinterpret_cast<const uint8_t*>(replyMsg.header().view2ptr());
- pushRead({hdrBytes, hdrBytes + sizeof(MSGHEADER::Value)});
- }
-
- {
- // The second read will be for the message data.
- ReadEvent read{this};
- auto dataBytes = reinterpret_cast<const uint8_t*>(replyMsg.buf());
- auto pastHeader = dataBytes;
- std::advance(pastHeader, sizeof(MSGHEADER::Value));
- pushRead({pastHeader, dataBytes + static_cast<std::size_t>(replyMsg.size())});
- }
-
- if (ex) {
- // Rethrow ASSERTS after the NIA completes it's Write-Read-Read sequence.
- std::rethrow_exception(ex);
- }
-}
-
-bool AsyncMockStreamFactory::MockStream::isOpen() {
- return true;
-}
-
-} // namespace executor
-} // namespace mongo
diff --git a/src/mongo/executor/async_mock_stream_factory.h b/src/mongo/executor/async_mock_stream_factory.h
deleted file mode 100644
index 6f65edd8425..00000000000
--- a/src/mongo/executor/async_mock_stream_factory.h
+++ /dev/null
@@ -1,226 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <asio.hpp>
-#include <cstdint>
-#include <memory>
-#include <queue>
-
-#include "mongo/executor/async_stream_factory_interface.h"
-#include "mongo/executor/async_stream_interface.h"
-#include "mongo/executor/remote_command_request.h"
-#include "mongo/executor/remote_command_response.h"
-#include "mongo/rpc/protocol.h"
-#include "mongo/stdx/condition_variable.h"
-#include "mongo/stdx/functional.h"
-#include "mongo/stdx/mutex.h"
-#include "mongo/stdx/unordered_map.h"
-#include "mongo/unittest/unittest.h"
-#include "mongo/util/net/hostandport.h"
-
-namespace mongo {
-namespace executor {
-
-class AsyncStreamInterface;
-
-/**
- * A factory that produces mock streams to allow for testing of NetworkInterfaceASIO.
- *
- * The streams produced by this factory simulate a flow of Events (ConnectEvent,
- * ReadEvent, WriteEvent). The streams created by this factory will automatically
- * pause themselves at each Event, and the caller must unblock them by destroying
- * the Event object to continue.
- *
- * Example use of this factory:
- *
- * AsyncMockStreamFactory factory();
- *
- * // NIA will then call makeStream(...) to create new streams from the
- * // factory, or the caller can do this manually.
- *
- * // Wait for the desired stream to exist
- * auto stream = streamFactory.blockUntilStreamExists(host);
- *
- * // If we do not care to inspect after a certain event, we can skip it:
- * ConnectEvent{stream}.skip();
- *
- * // To examine the stream at an Event, instantiate the event object.
- * // When the Event object goes out of scope the stream will unblock.
- * {
- * WriteEvent write{stream};
- *
- * // Inspect what NIA wrote to this stream:
- * auto messageData = stream->popWrite();
- * ...
- * }
- *
- * // The Event object will keep the stream blocked as long as it exists.
- * // Use this window to perform operations on the stream or inspect it.
- * {
- * ReadEvent read{stream};
- *
- * // Simulate data sent to this stream over the network
- * stream->pushRead( ... );
- *
- * // Or, simulate a networking error
- * stream->setError( error_code );
- * }
- */
-class AsyncMockStreamFactory final : public AsyncStreamFactoryInterface {
-public:
- AsyncMockStreamFactory() = default;
-
- std::unique_ptr<AsyncStreamInterface> makeStream(asio::io_service::strand* strand,
- const HostAndPort& host) override;
-
- /**
- * A mock stream class for testing the egress networking layer.
- *
- * At the core of this class is an idea of deferring actions and allowing inspection
- * of state of the stream before those actions happen.
- *
- * This class operates on the assumption that two threads are in use: a networking
- * thread used by NIA to issue IO calls on the MockStream, and a test thread to
- * wait on those calls and react.
- *
- * When the test thread creates an Event object, the constructor sends it to wait
- * on a condition variable. When NIA issues an IO call on the stream, the MockStream
- * load the proper handler into a placeholder, and then calls notify() on the
- * condition variable. At that point the stream is paused and the test thread
- * may operate on it.
- */
- class MockStream final : public AsyncStreamInterface {
- public:
- MockStream(asio::io_service::strand* strand,
- AsyncMockStreamFactory* factory,
- const HostAndPort& target);
-
- // Use unscoped enum so we can specialize on it
- enum StreamState {
- kRunning,
- kBlockedBeforeConnect,
- kBlockedBeforeRead,
- kBlockedAfterWrite,
- kCanceled,
- };
-
- ~MockStream();
-
- void connect(asio::ip::tcp::resolver::iterator endpoints,
- ConnectHandler&& connectHandler) override;
- void write(asio::const_buffer buf, StreamHandler&& writeHandler) override;
- void read(asio::mutable_buffer buf, StreamHandler&& readHandler) override;
-
- bool isOpen() override;
-
- HostAndPort target();
-
- StreamState waitUntilBlocked();
-
- void cancel() override;
-
- std::vector<uint8_t> popWrite();
- void pushRead(std::vector<uint8_t> toRead);
-
- void setError(std::error_code ec);
-
- void unblock();
-
- void simulateServer(
- rpc::Protocol proto,
- const stdx::function<RemoteCommandResponse(RemoteCommandRequest)> replyFunc);
-
- private:
- using Action = stdx::function<void()>;
-
- void _defer(StreamState state, Action&& handler);
- void _defer_inlock(StreamState state, Action&& handler);
- void _unblock_inlock();
-
- asio::io_service::strand* _strand;
-
- AsyncMockStreamFactory* _factory;
- HostAndPort _target;
-
- stdx::mutex _mutex;
-
- stdx::condition_variable _deferredCV;
- StreamState _state{kRunning};
-
- std::queue<std::vector<uint8_t>> _readQueue;
- std::queue<std::vector<uint8_t>> _writeQueue;
-
- std::error_code _error;
-
- Action _deferredAction;
- };
-
- MockStream* blockUntilStreamExists(const HostAndPort& host);
-
-private:
- void _createStream(const HostAndPort& host, MockStream* stream);
- void _destroyStream(const HostAndPort& host);
-
- stdx::mutex _factoryMutex;
- stdx::condition_variable _factoryCv;
-
- stdx::unordered_map<HostAndPort, MockStream*> _streams;
-};
-
-template <int EventType>
-class StreamEvent {
-public:
- StreamEvent(AsyncMockStreamFactory::MockStream* stream) : _stream(stream) {
- ASSERT(stream->waitUntilBlocked() == EventType);
- }
-
- void skip() {
- _stream->unblock();
- skipped = true;
- }
-
- ~StreamEvent() {
- if (!skipped) {
- skip();
- }
- }
-
-private:
- bool skipped = false;
- AsyncMockStreamFactory::MockStream* _stream = nullptr;
-};
-
-using ReadEvent = StreamEvent<AsyncMockStreamFactory::MockStream::StreamState::kBlockedBeforeRead>;
-using WriteEvent = StreamEvent<AsyncMockStreamFactory::MockStream::StreamState::kBlockedAfterWrite>;
-using ConnectEvent =
- StreamEvent<AsyncMockStreamFactory::MockStream::StreamState::kBlockedBeforeConnect>;
-
-} // namespace executor
-} // namespace mongo
diff --git a/src/mongo/executor/async_secure_stream.cpp b/src/mongo/executor/async_secure_stream.cpp
deleted file mode 100644
index eb69bd5d18e..00000000000
--- a/src/mongo/executor/async_secure_stream.cpp
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/executor/async_secure_stream.h"
-
-#include "mongo/base/system_error.h"
-#include "mongo/config.h"
-#include "mongo/executor/async_stream_common.h"
-#include "mongo/util/log.h"
-#include "mongo/util/net/ssl_manager.h"
-
-#ifdef MONGO_CONFIG_SSL
-
-namespace mongo {
-namespace executor {
-
-AsyncSecureStream::AsyncSecureStream(asio::io_service::strand* strand,
- asio::ssl::context* sslContext)
- : _strand(strand), _stream(_strand->get_io_service(), *sslContext, "") {}
-
-AsyncSecureStream::~AsyncSecureStream() {
- destroyStream(&_stream.lowest_layer(), _connected);
-}
-
-void AsyncSecureStream::connect(asio::ip::tcp::resolver::iterator endpoints,
- ConnectHandler&& connectHandler) {
- // Stash the connectHandler as we won't be able to call it until we re-enter the state
- // machine.
- _userHandler = std::move(connectHandler);
- asio::async_connect(
- _stream.lowest_layer(),
- std::move(endpoints),
- _strand->wrap([this](std::error_code ec, asio::ip::tcp::resolver::iterator iter) {
- if (ec) {
- return _userHandler(ec);
- }
-
- ec = setStreamNonBlocking(&_stream.next_layer());
- if (ec) {
- return _userHandler(ec);
- }
-
- ec = setStreamNoDelay(&_stream.next_layer());
- if (ec) {
- return _userHandler(ec);
- }
-
- _connected = true;
- return _handleConnect(std::move(iter));
- }));
-}
-
-void AsyncSecureStream::write(asio::const_buffer buffer, StreamHandler&& streamHandler) {
- writeStream(&_stream, _strand, _connected, buffer, std::move(streamHandler));
-}
-
-void AsyncSecureStream::read(asio::mutable_buffer buffer, StreamHandler&& streamHandler) {
- readStream(&_stream, _strand, _connected, buffer, std::move(streamHandler));
-}
-
-void AsyncSecureStream::_handleConnect(asio::ip::tcp::resolver::iterator iter) {
- _stream.async_handshake(decltype(_stream)::client,
- _strand->wrap([this, iter](std::error_code ec) {
- if (ec) {
- return _userHandler(ec);
- }
- return _handleHandshake(ec, iter->host_name());
- }));
-}
-
-void AsyncSecureStream::_handleHandshake(std::error_code ec, const std::string& hostName) {
- auto certStatus =
- getSSLManager()->parseAndValidatePeerCertificate(_stream.native_handle(), hostName);
- if (certStatus.isOK()) {
- _userHandler(make_error_code(ErrorCodes::OK));
- } else {
- warning() << "Failed to validate peer certificate during SSL handshake: "
- << certStatus.getStatus();
- // We aren't able to propagate error extra info through here so make sure we only use a code
- // that won't have any.
- _userHandler(make_error_code(ErrorCodes::SSLHandshakeFailed));
- }
-}
-
-void AsyncSecureStream::cancel() {
- cancelStream(&_stream.lowest_layer());
-}
-
-bool AsyncSecureStream::isOpen() {
- return checkIfStreamIsOpen(&_stream.next_layer(), _connected);
-}
-
-} // namespace executor
-} // namespace mongo
-
-#endif
diff --git a/src/mongo/executor/async_secure_stream.h b/src/mongo/executor/async_secure_stream.h
deleted file mode 100644
index cead9d6fada..00000000000
--- a/src/mongo/executor/async_secure_stream.h
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include "mongo/config.h"
-
-#ifdef MONGO_CONFIG_SSL
-
-#include <asio.hpp>
-
-#include "mongo/executor/async_stream_interface.h"
-#include "mongo/util/net/ssl.hpp"
-
-namespace mongo {
-namespace executor {
-
-class AsyncSecureStream final : public AsyncStreamInterface {
-public:
- AsyncSecureStream(asio::io_service::strand* strand, asio::ssl::context* sslContext);
-
- ~AsyncSecureStream();
-
- void connect(asio::ip::tcp::resolver::iterator endpoints,
- ConnectHandler&& connectHandler) override;
-
- void write(asio::const_buffer buffer, StreamHandler&& streamHandler) override;
-
- void read(asio::mutable_buffer buffer, StreamHandler&& streamHandler) override;
-
- void cancel() override;
-
- bool isOpen() override;
-
-private:
- void _handleConnect(asio::ip::tcp::resolver::iterator iter);
-
- void _handleHandshake(std::error_code ec, const std::string& hostName);
-
- asio::io_service::strand* const _strand;
- asio::ssl::stream<asio::ip::tcp::socket> _stream;
- ConnectHandler _userHandler;
- bool _connected = false;
-};
-
-} // namespace executor
-} // namespace mongo
-
-#endif
diff --git a/src/mongo/executor/async_secure_stream_factory.cpp b/src/mongo/executor/async_secure_stream_factory.cpp
deleted file mode 100644
index a907dbe894d..00000000000
--- a/src/mongo/executor/async_secure_stream_factory.cpp
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/executor/async_secure_stream_factory.h"
-
-#include "mongo/config.h"
-#include "mongo/executor/async_secure_stream.h"
-#include "mongo/executor/async_stream.h"
-#include "mongo/stdx/memory.h"
-#include "mongo/util/net/ssl_manager.h"
-#include "mongo/util/net/ssl_options.h"
-
-#ifdef MONGO_CONFIG_SSL
-
-namespace mongo {
-namespace executor {
-
-AsyncSecureStreamFactory::AsyncSecureStreamFactory(SSLManagerInterface* sslManager)
- : _sslContext(asio::ssl::context::sslv23) {
- // We use sslv23, which corresponds to OpenSSLs SSLv23_method, for compatibility with older
- // versions of OpenSSL. This mirrors the call to SSL_CTX_new in ssl_manager.cpp. In
- // initAsyncSSLContext we explicitly disable all protocols other than TLSv1, TLSv1.1,
- // and TLSv1.2.
- uassertStatusOK(
- sslManager->initSSLContext(_sslContext.native_handle(),
- getSSLGlobalParams(),
- SSLManagerInterface::ConnectionDirection::kOutgoing));
-}
-
-std::unique_ptr<AsyncStreamInterface> AsyncSecureStreamFactory::makeStream(
- asio::io_service::strand* strand, const HostAndPort&) {
- int sslModeVal = getSSLGlobalParams().sslMode.load();
- if (sslModeVal == SSLParams::SSLMode_preferSSL || sslModeVal == SSLParams::SSLMode_requireSSL) {
- return stdx::make_unique<AsyncSecureStream>(strand, &_sslContext);
- }
- return stdx::make_unique<AsyncStream>(strand);
-}
-
-} // namespace executor
-} // namespace mongo
-
-#endif
diff --git a/src/mongo/executor/async_secure_stream_factory.h b/src/mongo/executor/async_secure_stream_factory.h
deleted file mode 100644
index ad25ad487d9..00000000000
--- a/src/mongo/executor/async_secure_stream_factory.h
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include "mongo/config.h"
-
-#ifdef MONGO_CONFIG_SSL
-
-#include <asio.hpp>
-
-#include "mongo/executor/async_stream_factory_interface.h"
-#include "mongo/util/net/ssl.hpp"
-
-namespace mongo {
-class SSLManagerInterface;
-namespace executor {
-
-class AsyncStreamInterface;
-
-class AsyncSecureStreamFactory final : public AsyncStreamFactoryInterface {
-public:
- AsyncSecureStreamFactory(SSLManagerInterface* sslManager);
-
- std::unique_ptr<AsyncStreamInterface> makeStream(asio::io_service::strand* strand,
- const HostAndPort&) override;
-
-private:
- asio::ssl::context _sslContext;
-};
-
-} // namespace executor
-} // namespace mongo
-
-#endif
diff --git a/src/mongo/executor/async_stream.cpp b/src/mongo/executor/async_stream.cpp
deleted file mode 100644
index 075a9094dd4..00000000000
--- a/src/mongo/executor/async_stream.cpp
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/executor/async_stream.h"
-#include "mongo/executor/async_stream_common.h"
-#include "mongo/util/assert_util.h"
-#include "mongo/util/log.h"
-
-namespace mongo {
-namespace executor {
-
-using asio::ip::tcp;
-
-AsyncStream::AsyncStream(asio::io_service::strand* strand)
- : _strand(strand), _stream(_strand->get_io_service()) {}
-
-AsyncStream::~AsyncStream() {
- destroyStream(&_stream, _connected);
-}
-
-void AsyncStream::connect(tcp::resolver::iterator iter, ConnectHandler&& connectHandler) {
- asio::async_connect(
- _stream,
- std::move(iter),
- // We need to wrap this with a lambda of the right signature so it compiles, even
- // if we don't actually use the resolver iterator.
- _strand->wrap([this, connectHandler](std::error_code ec, tcp::resolver::iterator iter) {
- if (ec) {
- return connectHandler(ec);
- }
-
- // We assume that our owner is responsible for keeping us alive until we call
- // connectHandler, so _connected should always be a valid memory location.
- ec = setStreamNonBlocking(&_stream);
- if (ec) {
- return connectHandler(ec);
- }
-
- ec = setStreamNoDelay(&_stream);
- if (ec) {
- return connectHandler(ec);
- }
-
- _connected = true;
- return connectHandler(ec);
- }));
-}
-
-void AsyncStream::write(asio::const_buffer buffer, StreamHandler&& streamHandler) {
- writeStream(&_stream, _strand, _connected, buffer, std::move(streamHandler));
-}
-
-void AsyncStream::read(asio::mutable_buffer buffer, StreamHandler&& streamHandler) {
- readStream(&_stream, _strand, _connected, buffer, std::move(streamHandler));
-}
-
-void AsyncStream::cancel() {
- cancelStream(&_stream);
-}
-
-bool AsyncStream::isOpen() {
- return checkIfStreamIsOpen(&_stream, _connected);
-}
-
-} // namespace executor
-} // namespace mongo
diff --git a/src/mongo/executor/async_stream.h b/src/mongo/executor/async_stream.h
deleted file mode 100644
index 3a9c9d42b15..00000000000
--- a/src/mongo/executor/async_stream.h
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <asio.hpp>
-
-#include "mongo/executor/async_stream_interface.h"
-
-namespace mongo {
-namespace executor {
-
-class AsyncStream final : public AsyncStreamInterface {
-public:
- AsyncStream(asio::io_service::strand* strand);
-
- ~AsyncStream();
-
- void connect(asio::ip::tcp::resolver::iterator iter, ConnectHandler&& connectHandler) override;
-
- void write(asio::const_buffer buffer, StreamHandler&& streamHandler) override;
-
- void read(asio::mutable_buffer buffer, StreamHandler&& streamHandler) override;
-
- void cancel() override;
-
- bool isOpen() override;
-
-private:
- asio::io_service::strand* const _strand;
- asio::ip::tcp::socket _stream;
- bool _connected = false;
-};
-
-} // namespace executor
-} // namespace mongo
diff --git a/src/mongo/executor/async_stream_common.cpp b/src/mongo/executor/async_stream_common.cpp
deleted file mode 100644
index 92708862c94..00000000000
--- a/src/mongo/executor/async_stream_common.cpp
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/executor/async_stream_common.h"
-
-#include "mongo/util/log.h"
-
-namespace mongo {
-namespace executor {
-
-void logCloseFailed(std::error_code ec) {
- invariant(ec);
- log() << "Failed to close stream: " << ec.message();
-}
-
-void logCancelFailed(std::error_code ec) {
- invariant(ec);
- log() << "Failed to cancel stream: " << ec.message();
-}
-
-void logFailureInSetStreamNonBlocking(std::error_code ec) {
- invariant(ec);
- severe() << "Failed to set non-blocking mode on stream: " << ec.message();
-}
-
-void logFailureInSetStreamNoDelay(std::error_code ec) {
- invariant(ec);
- severe() << "Failed to set no-delay mode on stream: " << ec.message();
-}
-
-void logUnexpectedErrorInCheckOpen(std::error_code ec) {
- invariant(ec);
- log() << "unexpected error when checking if a stream was open: " << ec.message()
- << ", the only errors we expect are EOF and network/connection reset";
-}
-
-
-} // namespace executor
-} // namespace mongo
diff --git a/src/mongo/executor/async_stream_common.h b/src/mongo/executor/async_stream_common.h
deleted file mode 100644
index 67c12bbff3b..00000000000
--- a/src/mongo/executor/async_stream_common.h
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include <asio.hpp>
-#include <system_error>
-#include <utility>
-
-#include "mongo/util/assert_util.h"
-
-namespace mongo {
-namespace executor {
-
-void logCloseFailed(std::error_code ec);
-
-template <typename ASIOStream>
-void destroyStream(ASIOStream* stream, bool connected) {
- if (!connected) {
- return;
- }
-
- std::error_code ec;
-
- stream->shutdown(asio::ip::tcp::socket::shutdown_both, ec);
- if (ec) {
- logCloseFailed(ec);
- }
-
- stream->close(ec);
- if (ec) {
- logCloseFailed(ec);
- }
-}
-
-template <typename ASIOStream, typename Buffer, typename Handler>
-void writeStream(ASIOStream* stream,
- asio::io_service::strand* strand,
- bool connected,
- Buffer&& buffer,
- Handler&& handler) {
- invariant(connected);
- asio::async_write(*stream,
- asio::buffer(std::forward<Buffer>(buffer)),
- strand->wrap(std::forward<Handler>(handler)));
-}
-
-template <typename ASIOStream, typename Buffer, typename Handler>
-void readStream(ASIOStream* stream,
- asio::io_service::strand* strand,
- bool connected,
- Buffer&& buffer,
- Handler&& handler) {
- invariant(connected);
- asio::async_read(*stream,
- asio::buffer(std::forward<Buffer>(buffer)),
- strand->wrap(std::forward<Handler>(handler)));
-}
-
-void logCancelFailed(std::error_code ec);
-
-template <typename ASIOStream>
-void cancelStream(ASIOStream* stream) {
- std::error_code ec;
- stream->cancel(ec);
- if (ec) {
- logCancelFailed(ec);
- }
-}
-
-void logFailureInSetStreamNonBlocking(std::error_code ec);
-void logFailureInSetStreamNoDelay(std::error_code ec);
-
-template <typename ASIOStream>
-std::error_code setStreamNonBlocking(ASIOStream* stream) {
- std::error_code ec;
- stream->non_blocking(true, ec);
- if (ec) {
- logFailureInSetStreamNonBlocking(ec);
- }
- return ec;
-}
-
-template <typename ASIOStream>
-std::error_code setStreamNoDelay(ASIOStream* stream) {
- std::error_code ec;
- stream->set_option(asio::ip::tcp::no_delay(true), ec);
- if (ec) {
- logFailureInSetStreamNoDelay(ec);
- }
- return ec;
-}
-
-void logUnexpectedErrorInCheckOpen(std::error_code ec);
-
-template <typename ASIOStream>
-bool checkIfStreamIsOpen(ASIOStream* stream, bool connected) {
- if (!connected) {
- return false;
- };
- std::error_code ec;
- std::array<char, 1> buf;
- // Although we call the blocking form of receive, we ensure the socket is in non-blocking mode.
- // ASIO implements receive on POSIX using the 'recvmsg' system call, which returns immediately
- // if the socket is non-blocking and in a valid state, but there is no data to receive. On
- // windows, receive is implemented with WSARecv, which has the same semantics.
- invariant(stream->non_blocking());
- stream->receive(asio::buffer(buf), asio::socket_base::message_peek, ec);
- if (!ec || ec == asio::error::would_block || ec == asio::error::try_again) {
- // If the read worked or we got EWOULDBLOCK or EAGAIN (since we are in non-blocking mode),
- // we assume the socket is still open.
- return true;
- } else if (ec == asio::error::eof || ec == asio::error::connection_reset ||
- ec == asio::error::network_reset) {
- return false;
- }
- // We got a different error. Log it and return false so we throw the connection away.
- logUnexpectedErrorInCheckOpen(ec);
- return false;
-}
-
-} // namespace executor
-} // namespace mongo
diff --git a/src/mongo/executor/async_stream_factory.cpp b/src/mongo/executor/async_stream_factory.cpp
deleted file mode 100644
index 5f24adf341b..00000000000
--- a/src/mongo/executor/async_stream_factory.cpp
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/executor/async_stream_factory.h"
-
-#include "mongo/executor/async_secure_stream.h"
-#include "mongo/executor/async_stream.h"
-#include "mongo/stdx/memory.h"
-#include "mongo/util/assert_util.h"
-
-namespace mongo {
-namespace executor {
-
-std::unique_ptr<AsyncStreamInterface> AsyncStreamFactory::makeStream(
- asio::io_service::strand* strand, const HostAndPort&) {
- return stdx::make_unique<AsyncStream>(strand);
-}
-
-} // namespace executor
-} // namespace mongo
diff --git a/src/mongo/executor/async_stream_factory.h b/src/mongo/executor/async_stream_factory.h
deleted file mode 100644
index bbf9767427b..00000000000
--- a/src/mongo/executor/async_stream_factory.h
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <memory>
-
-#include "mongo/executor/async_stream_factory_interface.h"
-
-namespace mongo {
-namespace executor {
-
-class AsyncStreamInterface;
-
-class AsyncStreamFactory final : public AsyncStreamFactoryInterface {
-public:
- AsyncStreamFactory() = default;
-
- std::unique_ptr<AsyncStreamInterface> makeStream(asio::io_service::strand* strand,
- const HostAndPort&) override;
-};
-
-} // namespace executor
-} // namespace mongo
diff --git a/src/mongo/executor/async_stream_factory_interface.h b/src/mongo/executor/async_stream_factory_interface.h
deleted file mode 100644
index c894a1fee5a..00000000000
--- a/src/mongo/executor/async_stream_factory_interface.h
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <asio.hpp>
-#include <memory>
-
-#include "mongo/base/disallow_copying.h"
-
-namespace mongo {
-struct HostAndPort;
-namespace executor {
-
-class AsyncStreamInterface;
-
-/**
- * A factory for AsyncStream creation to support mocking.
- */
-class AsyncStreamFactoryInterface {
- MONGO_DISALLOW_COPYING(AsyncStreamFactoryInterface);
-
-public:
- virtual ~AsyncStreamFactoryInterface() = default;
-
- virtual std::unique_ptr<AsyncStreamInterface> makeStream(asio::io_service::strand* strand,
- const HostAndPort& target) = 0;
-
-protected:
- AsyncStreamFactoryInterface() = default;
-};
-
-} // namespace executor
-} // namespace mongo
diff --git a/src/mongo/executor/async_stream_interface.h b/src/mongo/executor/async_stream_interface.h
deleted file mode 100644
index ca97e350923..00000000000
--- a/src/mongo/executor/async_stream_interface.h
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <asio.hpp>
-#include <memory>
-#include <system_error>
-
-#include "mongo/base/disallow_copying.h"
-#include "mongo/stdx/functional.h"
-
-namespace mongo {
-namespace executor {
-
-/**
- * A bidirectional stream supporting asynchronous reads and writes.
- */
-class AsyncStreamInterface {
- MONGO_DISALLOW_COPYING(AsyncStreamInterface);
-
-public:
- virtual ~AsyncStreamInterface() = default;
-
- using ConnectHandler = stdx::function<void(std::error_code)>;
-
- using StreamHandler = stdx::function<void(std::error_code, std::size_t)>;
-
- virtual void connect(asio::ip::tcp::resolver::iterator endpoints,
- ConnectHandler&& connectHandler) = 0;
-
- virtual void write(asio::const_buffer buf, StreamHandler&& writeHandler) = 0;
-
- virtual void read(asio::mutable_buffer buf, StreamHandler&& readHandler) = 0;
-
- virtual void cancel() = 0;
-
- virtual bool isOpen() = 0;
-
-protected:
- AsyncStreamInterface() = default;
-};
-
-} // namespace executor
-} // namespace mongo
diff --git a/src/mongo/executor/async_stream_test.cpp b/src/mongo/executor/async_stream_test.cpp
deleted file mode 100644
index 2731df5d713..00000000000
--- a/src/mongo/executor/async_stream_test.cpp
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO
-
-#include "mongo/platform/basic.h"
-
-#include <asio.hpp>
-#include <system_error>
-#include <vector>
-
-#include "mongo/executor/async_stream.h"
-#include "mongo/executor/network_interface_asio_test_utils.h"
-#include "mongo/platform/atomic_word.h"
-#include "mongo/stdx/mutex.h"
-#include "mongo/stdx/thread.h"
-#include "mongo/unittest/unittest.h"
-#include "mongo/util/assert_util.h"
-#include "mongo/util/log.h"
-#include "mongo/util/scopeguard.h"
-
-namespace mongo {
-namespace {
-
-using asio::ip::tcp;
-
-class Server {
-public:
- void startup() {
- _thread = stdx::thread([this] {
- try {
- tcp::acceptor acceptor{_io_service,
- // Let the OS choose the accepting port.
- tcp::endpoint{asio::ip::make_address_v4("127.0.0.1"), 0}};
- _endpoint.emplace(acceptor.local_endpoint());
- tcp::socket socket{_io_service};
- _started.emplace(true);
- acceptor.accept(socket);
- log() << "got incoming connection";
- _stopped.get();
- log() << "shutting down socket";
- socket.shutdown(tcp::socket::shutdown_both);
- } catch (...) {
- log() << exceptionToStatus();
- }
- });
- _started.get();
- }
-
- // idempotent
- void shutdown() {
- _stopped.emplace(true);
- _thread.join();
- }
-
- Server() {
- startup();
- }
-
- tcp::endpoint endpoint() {
- return _endpoint.get();
- }
-
- ~Server() {
- if (!_stopped.hasCompleted()) {
- shutdown();
- }
- }
-
-private:
- stdx::thread _thread;
- executor::Deferred<bool> _started;
- executor::Deferred<bool> _stopped;
- executor::Deferred<tcp::endpoint> _endpoint;
- asio::io_service _io_service;
- std::vector<tcp::socket> _sockets;
-};
-
-TEST(AsyncStreamTest, IsOpen) {
- Server server;
- asio::io_service io_service;
- stdx::thread clientWorker([&io_service] {
- log() << "starting clientWorker";
- asio::io_service::work work(io_service);
- io_service.run();
- });
- auto guard = MakeGuard([&clientWorker, &io_service] {
- io_service.stop();
- clientWorker.join();
- });
- asio::io_service::strand strand{io_service};
- executor::AsyncStream stream{&strand};
- ASSERT_FALSE(stream.isOpen());
-
- tcp::resolver resolver{io_service};
- auto endpoints = resolver.resolve(server.endpoint());
-
- executor::Deferred<bool> opened;
-
- log() << "opening up outgoing connection";
- stream.connect(endpoints, [opened](std::error_code ec) mutable {
- log() << "opened outgoing connection";
- opened.emplace(!ec);
- });
-
- ASSERT_TRUE(opened.get());
- ASSERT_TRUE(stream.isOpen());
-
- server.shutdown();
-
- // There is nothing we can wait on to determinstically know when
- // the socket will transition to closed. Busy wait for that.
- while (stream.isOpen()) {
- stdx::this_thread::sleep_for(Milliseconds(1).toSystemDuration());
- }
-}
-
-} // namespace
-} // namespace mongo
diff --git a/src/mongo/executor/async_timer_asio.cpp b/src/mongo/executor/async_timer_asio.cpp
deleted file mode 100644
index bc287540e93..00000000000
--- a/src/mongo/executor/async_timer_asio.cpp
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/executor/async_timer_asio.h"
-
-#include "mongo/stdx/chrono.h"
-#include "mongo/stdx/memory.h"
-#include "mongo/util/log.h"
-
-namespace mongo {
-namespace executor {
-
-AsyncTimerASIO::AsyncTimerASIO(asio::io_service::strand* strand, Milliseconds expiration)
- : _strand(strand), _timer(_strand->get_io_service(), expiration.toSystemDuration()) {}
-
-void AsyncTimerASIO::cancel() {
- std::error_code ec;
- _timer.cancel(ec);
- if (ec) {
- log() << "Failed to cancel timer: " << ec.message();
- }
-}
-
-void AsyncTimerASIO::asyncWait(AsyncTimerInterface::Handler handler) {
- _timer.async_wait(_strand->wrap(std::move(handler)));
-}
-
-void AsyncTimerASIO::expireAfter(Milliseconds expiration) {
- _timer.expires_after(stdx::chrono::milliseconds(expiration.count()));
-}
-
-std::unique_ptr<AsyncTimerInterface> AsyncTimerFactoryASIO::make(asio::io_service::strand* strand,
- Milliseconds expiration) {
- return stdx::make_unique<AsyncTimerASIO>(strand, expiration);
-}
-
-Date_t AsyncTimerFactoryASIO::now() {
- return Date_t::fromDurationSinceEpoch(asio::system_timer::clock_type::now() -
- asio::system_timer::clock_type::from_time_t(0));
-}
-
-} // namespace executor
-} // namespace mongo
diff --git a/src/mongo/executor/async_timer_asio.h b/src/mongo/executor/async_timer_asio.h
deleted file mode 100644
index 01e1fbe18b8..00000000000
--- a/src/mongo/executor/async_timer_asio.h
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <asio.hpp>
-#include <asio/system_timer.hpp>
-
-#include "mongo/executor/async_timer_interface.h"
-
-namespace mongo {
-namespace executor {
-
-class AsyncTimerASIO final : public AsyncTimerInterface {
-public:
- AsyncTimerASIO(asio::io_service::strand* strand, Milliseconds expiration);
-
- void cancel() override;
-
- void asyncWait(AsyncTimerInterface::Handler handler) override;
-
- void expireAfter(Milliseconds expiration) override;
-
-private:
- asio::io_service::strand* const _strand;
- asio::system_timer _timer;
-};
-
-class AsyncTimerFactoryASIO final : public AsyncTimerFactoryInterface {
-public:
- AsyncTimerFactoryASIO() = default;
-
- std::unique_ptr<AsyncTimerInterface> make(asio::io_service::strand* strand,
- Milliseconds expiration) override;
-
- Date_t now() override;
-};
-
-} // namespace executor
-} // namespace mongo
diff --git a/src/mongo/executor/async_timer_interface.h b/src/mongo/executor/async_timer_interface.h
index fdfce5b39d8..3076a8f68b6 100644
--- a/src/mongo/executor/async_timer_interface.h
+++ b/src/mongo/executor/async_timer_interface.h
@@ -30,8 +30,6 @@
#include <system_error>
-#include <asio.hpp>
-
#include "mongo/base/disallow_copying.h"
#include "mongo/stdx/functional.h"
#include "mongo/util/time_support.h"
@@ -87,8 +85,7 @@ class AsyncTimerFactoryInterface {
public:
virtual ~AsyncTimerFactoryInterface() = default;
- virtual std::unique_ptr<AsyncTimerInterface> make(asio::io_service::strand* strand,
- Milliseconds expiration) = 0;
+ virtual std::unique_ptr<AsyncTimerInterface> make(Milliseconds expiration) = 0;
virtual Date_t now() = 0;
diff --git a/src/mongo/executor/async_timer_mock.cpp b/src/mongo/executor/async_timer_mock.cpp
index 39c9323612d..87904dd284b 100644
--- a/src/mongo/executor/async_timer_mock.cpp
+++ b/src/mongo/executor/async_timer_mock.cpp
@@ -28,6 +28,7 @@
#include "mongo/executor/async_timer_mock.h"
+#include "mongo/base/system_error.h"
#include "mongo/stdx/memory.h"
namespace mongo {
@@ -40,7 +41,7 @@ const Milliseconds kZeroMilliseconds = Milliseconds(0);
AsyncTimerMockImpl::AsyncTimerMockImpl(Milliseconds expiration) : _timeLeft(expiration) {}
void AsyncTimerMockImpl::cancel() {
- _callAllHandlers(asio::error::operation_aborted);
+ _callAllHandlers(std::error_code(ErrorCodes::CallbackCanceled, mongoErrorCategory()));
}
void AsyncTimerMockImpl::asyncWait(AsyncTimerInterface::Handler handler) {
@@ -95,7 +96,7 @@ void AsyncTimerMockImpl::expireAfter(Milliseconds expiration) {
// Call handlers with a "canceled" error code
for (const auto& handler : tmp) {
- handler(asio::error::operation_aborted);
+ handler(std::error_code(ErrorCodes::CallbackCanceled, mongoErrorCategory()));
}
}
@@ -131,11 +132,6 @@ void AsyncTimerMock::expireAfter(Milliseconds expiration) {
}
std::unique_ptr<AsyncTimerInterface> AsyncTimerFactoryMock::make(Milliseconds expiration) {
- return make(nullptr, expiration);
-}
-
-std::unique_ptr<AsyncTimerInterface> AsyncTimerFactoryMock::make(asio::io_service::strand* strand,
- Milliseconds expiration) {
stdx::lock_guard<stdx::recursive_mutex> lk(_timersMutex);
auto elem = _timers.emplace(std::make_shared<AsyncTimerMockImpl>(expiration));
return stdx::make_unique<AsyncTimerMock>(*elem.first);
diff --git a/src/mongo/executor/async_timer_mock.h b/src/mongo/executor/async_timer_mock.h
index ad77d323f5e..20f6c039f8e 100644
--- a/src/mongo/executor/async_timer_mock.h
+++ b/src/mongo/executor/async_timer_mock.h
@@ -124,13 +124,7 @@ public:
/**
* Create and return a new AsyncTimerMock object.
*/
- std::unique_ptr<AsyncTimerInterface> make(Milliseconds expiration);
-
- /**
- * Create and return a new AsyncTimerMock object.
- */
- std::unique_ptr<AsyncTimerInterface> make(asio::io_service::strand* strand,
- Milliseconds expiration) override;
+ std::unique_ptr<AsyncTimerInterface> make(Milliseconds expiration) override;
/**
* Advance the current "time" and make stale timers expire.
diff --git a/src/mongo/executor/async_timer_mock_test.cpp b/src/mongo/executor/async_timer_mock_test.cpp
deleted file mode 100644
index 53b0dbbb598..00000000000
--- a/src/mongo/executor/async_timer_mock_test.cpp
+++ /dev/null
@@ -1,170 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/executor/async_timer_mock.h"
-#include "mongo/stdx/future.h"
-#include "mongo/unittest/unittest.h"
-
-namespace mongo {
-namespace executor {
-
-TEST(AsyncTimerMock, BasicTest) {
- AsyncTimerFactoryMock factory;
-
- // Set an early timer
- bool timer1Fired = false;
- auto timer1 = factory.make(Milliseconds(1000));
- timer1->asyncWait([&timer1Fired](std::error_code ec) {
- ASSERT(!ec);
- timer1Fired = true;
- });
-
- // Set a later timer
- bool timer2Fired = false;
- auto timer2 = factory.make(Milliseconds(2000));
- timer2->asyncWait([&timer2Fired](std::error_code ec) {
- ASSERT(!ec);
- timer2Fired = true;
- });
-
- // Advance clock a little, nothing should fire
- factory.fastForward(Milliseconds(500));
- ASSERT(!timer1Fired);
- ASSERT(!timer2Fired);
-
- // Advance clock so early timer fires
- factory.fastForward(Milliseconds(600));
- ASSERT(timer1Fired);
-
- // Second timer should still be waiting
- ASSERT(!timer2Fired);
-
- // Advance clock so second timer fires
- factory.fastForward(Milliseconds(1000));
- ASSERT(timer2Fired);
-}
-
-TEST(AsyncTimerMock, Cancel) {
- AsyncTimerFactoryMock factory;
-
- // Set a timer
- bool fired = false;
- auto timer = factory.make(Milliseconds(100));
- timer->asyncWait([&fired](std::error_code ec) {
- // This timer should have been canceled
- ASSERT(ec);
- ASSERT(ec == asio::error::operation_aborted);
- fired = true;
- });
-
- // Cancel timer
- timer->cancel();
-
- // Ensure that its handler was called
- ASSERT(fired);
-}
-
-TEST(AsyncTimerMock, CancelExpired) {
- AsyncTimerFactoryMock factory;
-
- // Set a timer
- bool fired = false;
- auto timer = factory.make(Milliseconds(100));
- timer->asyncWait([&fired](std::error_code ec) {
- // This timer should NOT have been canceled
- ASSERT(!ec);
- fired = true;
- });
-
- // Fast forward so it expires
- factory.fastForward(Milliseconds(200));
- ASSERT(fired);
-
- fired = false;
-
- // Cancel it, should not fire again
- timer->cancel();
- ASSERT(!fired);
-}
-
-TEST(AsyncTimerMock, Now) {
- AsyncTimerFactoryMock factory;
-
- ASSERT(factory.now() == Date_t::fromMillisSinceEpoch(0));
-
- factory.fastForward(Milliseconds(200));
- ASSERT(factory.now() == Date_t::fromMillisSinceEpoch(200));
-
- factory.fastForward(Milliseconds(1000));
- ASSERT(factory.now() == Date_t::fromMillisSinceEpoch(1200));
-
- factory.fastForward(Milliseconds(1022));
- ASSERT(factory.now() == Date_t::fromMillisSinceEpoch(2222));
-}
-
-TEST(AsyncTimerMock, WorksAfterCancel) {
- AsyncTimerFactoryMock factory;
-
- // Set a timer
- bool fired1 = false;
- auto timer = factory.make(Milliseconds(100));
- timer->asyncWait([&fired1](std::error_code ec) {
- // This timer should have been canceled
- ASSERT(ec);
- ASSERT(ec == asio::error::operation_aborted);
- fired1 = true;
- });
-
- // Cancel timer
- timer->cancel();
-
- // Ensure that its handler was called
- ASSERT(fired1);
-
- fired1 = false;
- bool fired2 = false;
-
- // Add a new callback to to timer.
- timer->asyncWait([&fired2](std::error_code ec) {
- // This timer should NOT have been canceled
- ASSERT(!ec);
- fired2 = true;
- });
-
- // Fast forward so it expires
- factory.fastForward(Milliseconds(100));
- ASSERT(fired2);
- ASSERT(!fired1);
-}
-
-} // namespace executor
-} // namespace mongo
diff --git a/src/mongo/executor/connection_pool_asio.cpp b/src/mongo/executor/connection_pool_asio.cpp
deleted file mode 100644
index 028316349db..00000000000
--- a/src/mongo/executor/connection_pool_asio.cpp
+++ /dev/null
@@ -1,333 +0,0 @@
-/** * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/executor/connection_pool_asio.h"
-
-#include <asio.hpp>
-
-#include "mongo/executor/async_stream_factory_interface.h"
-#include "mongo/executor/network_interface_asio.h"
-#include "mongo/rpc/factory.h"
-#include "mongo/rpc/legacy_request_builder.h"
-#include "mongo/rpc/reply_interface.h"
-#include "mongo/stdx/memory.h"
-#include "mongo/util/log.h"
-
-namespace mongo {
-namespace executor {
-namespace connection_pool_asio {
-
-ASIOTimer::ASIOTimer(asio::io_service::strand* strand)
- : _strand(strand),
- _impl(strand->get_io_service()),
- _callbackSharedState(std::make_shared<CallbackSharedState>()) {}
-
-ASIOTimer::~ASIOTimer() {
- stdx::lock_guard<stdx::mutex> lk(_callbackSharedState->mutex);
- ++_callbackSharedState->id;
-}
-
-const auto kMaxTimerDuration = duration_cast<Milliseconds>(ASIOTimer::clock_type::duration::max());
-
-void ASIOTimer::setTimeout(Milliseconds timeout, TimeoutCallback cb) {
- _strand->dispatch([this, timeout, cb] {
- _cb = std::move(cb);
-
- cancelTimeout();
-
- try {
- _impl.expires_after(std::min(kMaxTimerDuration, timeout).toSystemDuration());
- } catch (const asio::system_error& ec) {
- severe() << "Failed to set connection pool timer: " << ec.what();
- fassertFailed(40333);
- }
-
- decltype(_callbackSharedState->id) id;
- decltype(_callbackSharedState) sharedState;
-
- {
- stdx::lock_guard<stdx::mutex> lk(_callbackSharedState->mutex);
- id = ++_callbackSharedState->id;
- sharedState = _callbackSharedState;
- }
-
- _impl.async_wait(_strand->wrap([this, id, sharedState](const asio::error_code& error) {
- if (error == asio::error::operation_aborted) {
- return;
- }
-
- stdx::unique_lock<stdx::mutex> lk(sharedState->mutex);
-
- // If the id in shared state doesn't match the id in our callback, it
- // means we were cancelled, but still executed. This can occur if we
- // were cancelled just before our timeout. We need a generation, rather
- // than just a bool here, because we could have been cancelled and
- // another callback set, in which case we shouldn't run and the we
- // should let the other callback execute instead.
- if (sharedState->id == id) {
- auto cb = std::move(_cb);
- lk.unlock();
- cb();
- }
- }));
- });
-}
-
-void ASIOTimer::cancelTimeout() {
- decltype(_callbackSharedState) sharedState;
- decltype(_callbackSharedState->id) id;
- {
- stdx::lock_guard<stdx::mutex> lk(_callbackSharedState->mutex);
- id = ++_callbackSharedState->id;
- sharedState = _callbackSharedState;
- }
- _strand->dispatch([this, id, sharedState] {
- stdx::lock_guard<stdx::mutex> lk(sharedState->mutex);
- if (sharedState->id != id)
- return;
-
- std::error_code ec;
- _impl.cancel(ec);
- if (ec) {
- log() << "Failed to cancel connection pool timer: " << ec.message();
- }
- });
-}
-
-ASIOConnection::ASIOConnection(const HostAndPort& hostAndPort, size_t generation, ASIOImpl* global)
- : _global(global),
- _hostAndPort(hostAndPort),
- _generation(generation),
- _impl(makeAsyncOp(this)),
- _timer(&_impl->strand()) {}
-
-ASIOConnection::~ASIOConnection() {
- if (_impl) {
- stdx::lock_guard<stdx::mutex> lk(_impl->_access->mutex);
- _impl->_access->id++;
- }
-}
-
-void ASIOConnection::indicateSuccess() {
- _status = Status::OK();
-}
-
-void ASIOConnection::indicateFailure(Status status) {
- invariant(!status.isOK());
- _status = std::move(status);
-}
-
-const HostAndPort& ASIOConnection::getHostAndPort() const {
- return _hostAndPort;
-}
-
-bool ASIOConnection::isHealthy() {
- // Check if the remote host has closed the connection.
- return _impl->connection().stream().isOpen();
-}
-
-void ASIOConnection::indicateUsed() {
- // It is illegal to attempt to use a connection after calling indicateFailure().
- invariant(_status.isOK() || _status == ConnectionPool::kConnectionStateUnknown);
- _lastUsed = _global->now();
-}
-
-Date_t ASIOConnection::getLastUsed() const {
- return _lastUsed;
-}
-
-const Status& ASIOConnection::getStatus() const {
- return _status;
-}
-
-size_t ASIOConnection::getGeneration() const {
- return _generation;
-}
-
-std::unique_ptr<NetworkInterfaceASIO::AsyncOp> ASIOConnection::makeAsyncOp(ASIOConnection* conn) {
- return stdx::make_unique<NetworkInterfaceASIO::AsyncOp>(
- conn->_global->_impl,
- TaskExecutor::CallbackHandle(),
- RemoteCommandRequest{conn->getHostAndPort(),
- std::string("admin"),
- BSON("isMaster" << 1),
- BSONObj(),
- nullptr},
- [conn](const TaskExecutor::ResponseStatus& rs) {
- auto cb = std::move(conn->_setupCallback);
- cb(conn, rs.status);
- },
- conn->_global->now());
-}
-
-Message ASIOConnection::makeIsMasterRequest(ASIOConnection* conn) {
- return rpc::legacyRequestFromOpMsgRequest(
- OpMsgRequest::fromDBAndBody("admin", BSON("isMaster" << 1)));
-}
-
-void ASIOConnection::setTimeout(Milliseconds timeout, TimeoutCallback cb) {
- _timer.setTimeout(timeout, std::move(cb));
-}
-
-void ASIOConnection::cancelTimeout() {
- _timer.cancelTimeout();
-}
-
-void ASIOConnection::setup(Milliseconds timeout, SetupCallback cb) {
- _impl->strand().dispatch([this, timeout, cb] {
- _setupCallback = [this, cb](ConnectionInterface* ptr, Status status) {
- {
- stdx::lock_guard<stdx::mutex> lk(_impl->_access->mutex);
- _impl->_access->id++;
-
- // If our connection timeout callback ran but wasn't the reason we exited
- // the state machine, clear any TIMED_OUT state.
- if (status.isOK()) {
- _impl->_transitionToState_inlock(
- NetworkInterfaceASIO::AsyncOp::State::kUninitialized);
- _impl->_transitionToState_inlock(
- NetworkInterfaceASIO::AsyncOp::State::kInProgress);
- _impl->_transitionToState_inlock(
- NetworkInterfaceASIO::AsyncOp::State::kFinished);
- }
- }
-
- cancelTimeout();
- cb(ptr, status);
- };
-
- // Capturing the shared access pad and generation before calling setTimeout gives us enough
- // information to avoid calling the timer if we shouldn't without needing any other
- // resources that might have been cleaned up.
- decltype(_impl->_access) access;
- std::size_t generation;
- {
- stdx::lock_guard<stdx::mutex> lk(_impl->_access->mutex);
- access = _impl->_access;
- generation = access->id;
- }
-
- // Actually timeout setup
- setTimeout(timeout, [this, access, generation] {
- stdx::lock_guard<stdx::mutex> lk(access->mutex);
- if (generation != access->id) {
- // The operation has been cleaned up, do not access.
- return;
- }
- _impl->timeOut_inlock();
- });
-
- _global->_impl->_connect(_impl.get());
- });
-}
-
-void ASIOConnection::resetToUnknown() {
- _status = ConnectionPool::kConnectionStateUnknown;
-}
-
-void ASIOConnection::refresh(Milliseconds timeout, RefreshCallback cb) {
- _impl->strand().dispatch([this, timeout, cb] {
- auto op = _impl.get();
-
- // We clear state transitions because we're re-running a portion of the asio state machine
- // without entering in startCommand (which would call this for us).
- op->clearStateTransitions();
-
- _refreshCallback = std::move(cb);
-
- // Actually timeout refreshes
- setTimeout(timeout, [this] { _impl->connection().stream().cancel(); });
-
- // Our pings are isMaster's
- auto beginStatus = op->beginCommand(makeIsMasterRequest(this), _hostAndPort);
- if (!beginStatus.isOK()) {
- auto cb = std::move(_refreshCallback);
- cb(this, beginStatus);
- return;
- }
-
- // If we fail during refresh, the _onFinish function of the AsyncOp will get called. As such
- // we
- // need to intercept those calls so we can capture them. This will get cleared out when we
- // fill
- // in the real onFinish in startCommand.
- op->setOnFinish([this](RemoteCommandResponse failedResponse) {
- invariant(!failedResponse.isOK());
- auto cb = std::move(_refreshCallback);
- cb(this, failedResponse.status);
- });
-
- op->_inRefresh = true;
-
- _global->_impl->_asyncRunCommand(op, [this, op](std::error_code ec, size_t bytes) {
- cancelTimeout();
-
- auto cb = std::move(_refreshCallback);
-
- if (ec)
- return cb(this, Status(ErrorCodes::HostUnreachable, ec.message()));
-
- op->_inRefresh = false;
- cb(this, Status::OK());
- });
- });
-}
-
-std::unique_ptr<NetworkInterfaceASIO::AsyncOp> ASIOConnection::releaseAsyncOp() {
- {
- stdx::lock_guard<stdx::mutex> lk(_impl->_access->mutex);
- _impl->_access->id++;
- }
- return std::move(_impl);
-}
-
-void ASIOConnection::bindAsyncOp(std::unique_ptr<NetworkInterfaceASIO::AsyncOp> op) {
- _impl = std::move(op);
-}
-
-ASIOImpl::ASIOImpl(NetworkInterfaceASIO* impl) : _impl(impl) {}
-
-Date_t ASIOImpl::now() {
- return _impl->now();
-}
-
-std::unique_ptr<ConnectionPool::TimerInterface> ASIOImpl::makeTimer() {
- return stdx::make_unique<ASIOTimer>(&_impl->_strand);
-}
-
-std::shared_ptr<ConnectionPool::ConnectionInterface> ASIOImpl::makeConnection(
- const HostAndPort& hostAndPort, size_t generation) {
- return std::make_shared<ASIOConnection>(hostAndPort, generation, this);
-}
-
-} // namespace connection_pool_asio
-} // namespace executor
-} // namespace mongo
diff --git a/src/mongo/executor/connection_pool_asio.h b/src/mongo/executor/connection_pool_asio.h
deleted file mode 100644
index 9bb1cae50f5..00000000000
--- a/src/mongo/executor/connection_pool_asio.h
+++ /dev/null
@@ -1,138 +0,0 @@
-/** * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <asio/system_timer.hpp>
-
-#include <memory>
-
-#include "mongo/executor/async_stream_interface.h"
-#include "mongo/executor/connection_pool.h"
-#include "mongo/executor/network_interface.h"
-#include "mongo/executor/network_interface_asio.h"
-#include "mongo/stdx/mutex.h"
-
-namespace mongo {
-namespace executor {
-namespace connection_pool_asio {
-
-/**
- * Implements connection pool timers on top of asio
- */
-class ASIOTimer final : public ConnectionPool::TimerInterface {
-public:
- using clock_type = asio::system_timer::clock_type;
-
- ASIOTimer(asio::io_service::strand* strand);
- ~ASIOTimer();
-
- void setTimeout(Milliseconds timeout, TimeoutCallback cb) override;
- void cancelTimeout() override;
-
-private:
- struct CallbackSharedState {
- stdx::mutex mutex;
- std::size_t id = 0;
- };
-
- TimeoutCallback _cb;
- asio::io_service::strand* const _strand;
- asio::basic_waitable_timer<clock_type> _impl;
- std::shared_ptr<CallbackSharedState> _callbackSharedState;
-};
-
-/**
- * Implements connection pool connections on top of asio
- *
- * Owns an async op when it's out of the pool
- */
-class ASIOConnection final : public ConnectionPool::ConnectionInterface {
-public:
- ASIOConnection(const HostAndPort& hostAndPort, size_t generation, ASIOImpl* global);
- ~ASIOConnection();
-
- void indicateSuccess() override;
- void indicateUsed() override;
- void indicateFailure(Status status) override;
- const HostAndPort& getHostAndPort() const override;
-
- std::unique_ptr<NetworkInterfaceASIO::AsyncOp> releaseAsyncOp();
- void bindAsyncOp(std::unique_ptr<NetworkInterfaceASIO::AsyncOp> op);
-
- bool isHealthy() override;
-
-private:
- Date_t getLastUsed() const override;
- const Status& getStatus() const override;
-
- void setTimeout(Milliseconds timeout, TimeoutCallback cb) override;
- void cancelTimeout() override;
-
- void setup(Milliseconds timeout, SetupCallback cb) override;
- void resetToUnknown() override;
- void refresh(Milliseconds timeout, RefreshCallback cb) override;
-
- size_t getGeneration() const override;
-
- static std::unique_ptr<NetworkInterfaceASIO::AsyncOp> makeAsyncOp(ASIOConnection* conn);
- static Message makeIsMasterRequest(ASIOConnection* conn);
-
-private:
- SetupCallback _setupCallback;
- RefreshCallback _refreshCallback;
- ASIOImpl* const _global;
- Date_t _lastUsed;
- Status _status = ConnectionPool::kConnectionStateUnknown;
- HostAndPort _hostAndPort;
- size_t _generation;
- std::unique_ptr<NetworkInterfaceASIO::AsyncOp> _impl;
- ASIOTimer _timer;
-};
-
-/**
- * Implementions connection pool implementation for asio
- */
-class ASIOImpl final : public ConnectionPool::DependentTypeFactoryInterface {
- friend class ASIOConnection;
-
-public:
- ASIOImpl(NetworkInterfaceASIO* impl);
-
- std::shared_ptr<ConnectionPool::ConnectionInterface> makeConnection(
- const HostAndPort& hostAndPort, size_t generation) override;
- std::unique_ptr<ConnectionPool::TimerInterface> makeTimer() override;
-
- Date_t now() override;
-
-private:
- NetworkInterfaceASIO* const _impl;
-};
-
-} // namespace connection_pool_asio
-} // namespace executor
-} // namespace mongo
diff --git a/src/mongo/executor/connection_pool_asio_integration_test.cpp b/src/mongo/executor/connection_pool_asio_integration_test.cpp
deleted file mode 100644
index 9db13ae302e..00000000000
--- a/src/mongo/executor/connection_pool_asio_integration_test.cpp
+++ /dev/null
@@ -1,341 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/executor/connection_pool_asio.h"
-
-#include "mongo/client/connection_string.h"
-#include "mongo/executor/async_stream_factory.h"
-#include "mongo/executor/async_timer_asio.h"
-#include "mongo/executor/connection_pool_stats.h"
-#include "mongo/executor/network_connection_hook.h"
-#include "mongo/executor/network_interface_asio_test_utils.h"
-#include "mongo/rpc/get_status_from_command_result.h"
-#include "mongo/stdx/future.h"
-#include "mongo/stdx/memory.h"
-#include "mongo/unittest/integration_test.h"
-#include "mongo/unittest/unittest.h"
-#include "mongo/util/fail_point_service.h"
-#include "mongo/util/scopeguard.h"
-#include "mongo/util/time_support.h"
-
-namespace mongo {
-namespace executor {
-namespace {
-
-class MyNetworkConnectionHook : public NetworkConnectionHook {
-public:
- Status validateHost(const HostAndPort& remoteHost,
- const RemoteCommandResponse& isMasterReply) override {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
-
- _count++;
-
- return Status::OK();
- }
-
- StatusWith<boost::optional<RemoteCommandRequest>> makeRequest(
- const HostAndPort& remoteHost) override {
- return StatusWith<boost::optional<RemoteCommandRequest>>(boost::none);
- }
-
- Status handleReply(const HostAndPort& remoteHost, RemoteCommandResponse&& response) override {
- MONGO_UNREACHABLE;
- }
-
- static int count() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
-
- return _count;
- }
-
-private:
- static stdx::mutex _mutex;
- static int _count;
-};
-
-stdx::mutex MyNetworkConnectionHook::_mutex;
-int MyNetworkConnectionHook::_count = 0;
-
-TEST(ConnectionPoolASIO, TestPing) {
- auto fixture = unittest::getFixtureConnectionString();
-
- NetworkInterfaceASIO::Options options;
- options.streamFactory = stdx::make_unique<AsyncStreamFactory>();
- options.networkConnectionHook = stdx::make_unique<MyNetworkConnectionHook>();
- options.connectionPoolOptions.maxConnections = 10;
- options.timerFactory = stdx::make_unique<AsyncTimerFactoryASIO>();
- NetworkInterfaceASIO net{std::move(options)};
-
- net.startup();
- auto guard = MakeGuard([&] { net.shutdown(); });
-
- const int N = 50;
-
- std::array<stdx::thread, N> threads;
-
- for (auto& thread : threads) {
- thread = stdx::thread([&net, &fixture]() {
- auto status = Status::OK();
- Deferred<RemoteCommandResponse> deferred;
-
- RemoteCommandRequest request{
- fixture.getServers()[0], "admin", BSON("ping" << 1), BSONObj(), nullptr};
- net.startCommand(
- makeCallbackHandle(),
- request,
- [&deferred](RemoteCommandResponse resp) { deferred.emplace(std::move(resp)); })
- .transitional_ignore();
-
- ASSERT_OK(deferred.get().status);
- });
- }
-
- for (auto&& thread : threads) {
- thread.join();
- }
-
- ASSERT_LTE(MyNetworkConnectionHook::count(), 10);
-}
-
-/**
- * This test verifies a fix for SERVER-22391, where we raced if a new request
- * came in at the same time a host timeout was triggering.
- */
-TEST(ConnectionPoolASIO, TestHostTimeoutRace) {
- auto fixture = unittest::getFixtureConnectionString();
-
- NetworkInterfaceASIO::Options options;
- options.streamFactory = stdx::make_unique<AsyncStreamFactory>();
- options.connectionPoolOptions.hostTimeout = Milliseconds(1);
- options.timerFactory = stdx::make_unique<AsyncTimerFactoryASIO>();
- NetworkInterfaceASIO net{std::move(options)};
-
- net.startup();
- auto guard = MakeGuard([&] { net.shutdown(); });
-
- for (int i = 0; i < 1000; i++) {
- Deferred<RemoteCommandResponse> deferred;
- RemoteCommandRequest request{
- fixture.getServers()[0], "admin", BSON("ping" << 1), BSONObj(), nullptr};
- net.startCommand(makeCallbackHandle(),
- request,
- [&](RemoteCommandResponse resp) { deferred.emplace(std::move(resp)); })
- .transitional_ignore();
-
- ASSERT_OK(deferred.get().status);
- sleepmillis(1);
- }
-}
-
-
-/**
- * Verify that a connections that timeout immediately don't invariant.
- */
-TEST(ConnectionPoolASIO, ConnSetupTimeout) {
- auto fixture = unittest::getFixtureConnectionString();
-
- NetworkInterfaceASIO::Options options;
- options.streamFactory = stdx::make_unique<AsyncStreamFactory>();
- options.timerFactory = stdx::make_unique<AsyncTimerFactoryASIO>();
- options.connectionPoolOptions.refreshTimeout = Milliseconds(-2);
- NetworkInterfaceASIO net{std::move(options)};
-
- net.startup();
- auto guard = MakeGuard([&] { net.shutdown(); });
-
- Deferred<RemoteCommandResponse> deferred;
- RemoteCommandRequest request{
- fixture.getServers()[0], "admin", BSON("ping" << 1), BSONObj(), nullptr};
- net.startCommand(makeCallbackHandle(),
- request,
- [&](RemoteCommandResponse resp) { deferred.emplace(std::move(resp)); })
- .transitional_ignore();
-
- ASSERT_EQ(deferred.get().status.code(), ErrorCodes::NetworkInterfaceExceededTimeLimit);
-}
-
-/**
- * Verify that connection refreshes actually occur, and that they drop down the totalAvailable
- * correctly. Verifies SERVER-25006
- */
-TEST(ConnectionPoolASIO, ConnRefreshHappens) {
- auto fixture = unittest::getFixtureConnectionString();
-
- NetworkInterfaceASIO::Options options;
- options.streamFactory = stdx::make_unique<AsyncStreamFactory>();
- options.timerFactory = stdx::make_unique<AsyncTimerFactoryASIO>();
- options.connectionPoolOptions.refreshRequirement = Milliseconds(10);
- NetworkInterfaceASIO net{std::move(options)};
-
- net.startup();
- auto guard = MakeGuard([&] { net.shutdown(); });
-
- std::array<Deferred<RemoteCommandResponse>, 10> deferreds;
-
- RemoteCommandRequest request{fixture.getServers()[0],
- "admin",
- BSON("sleep" << 1 << "lock"
- << "none"
- << "secs"
- << 2),
- BSONObj(),
- nullptr};
-
- for (auto& deferred : deferreds) {
- net.startCommand(makeCallbackHandle(),
- request,
- [&](RemoteCommandResponse resp) { deferred.emplace(std::move(resp)); })
- .transitional_ignore();
- }
-
- for (auto& deferred : deferreds) {
- ASSERT_EQ(deferred.get().isOK(), true);
- }
-
- sleepmillis(1000);
-
- ConnectionPoolStats cps;
- net.appendConnectionStats(&cps);
-
- ASSERT_LTE(cps.totalAvailable + cps.totalInUse, 1u);
- ASSERT_EQ(cps.totalCreated, 10u);
-}
-
-/**
- * Verify that when a refresh fails, it doesn't trigger an invariant
- */
-TEST(ConnectionPoolASIO, ConnRefreshSurvivesFailure) {
- auto fixture = unittest::getFixtureConnectionString();
-
- NetworkInterfaceASIO::Options options;
- options.streamFactory = stdx::make_unique<AsyncStreamFactory>();
- options.timerFactory = stdx::make_unique<AsyncTimerFactoryASIO>();
- options.connectionPoolOptions.refreshRequirement = Milliseconds(0);
- NetworkInterfaceASIO net{std::move(options)};
-
- net.startup();
- auto guard = MakeGuard([&] { net.shutdown(); });
-
- Deferred<RemoteCommandResponse> deferred;
-
- RemoteCommandRequest request{
- fixture.getServers()[0], "admin", BSON("ping" << 1), BSONObj(), nullptr};
- net.startCommand(makeCallbackHandle(),
- request,
- [&](RemoteCommandResponse resp) { deferred.emplace(std::move(resp)); })
- .transitional_ignore();
-
- deferred.get();
-
- getGlobalFailPointRegistry()
- ->getFailPoint("NetworkInterfaceASIOasyncRunCommandFail")
- ->setMode(FailPoint::nTimes, 1);
-
- sleepmillis(1000);
-
- ConnectionPoolStats cps;
- net.appendConnectionStats(&cps);
-
- ASSERT_EQ(cps.totalAvailable, 0u);
- ASSERT_EQ(cps.totalCreated, 1u);
-}
-
-/**
- * Tests that thrashing the timer while calls to setup are occurring doesn't crash. This could
- * occur if a setup connection goes through, but the timer thread is too over-worked to cancel the
- * timers before they're invoked
- */
-TEST(ConnectionPoolASIO, ConnSetupSurvivesFailure) {
- const int kNumThreads = 8;
- const int kNumOps = 1000;
-
- auto fixture = unittest::getFixtureConnectionString();
-
- NetworkInterfaceASIO::Options options;
- options.streamFactory = stdx::make_unique<AsyncStreamFactory>();
- options.timerFactory = stdx::make_unique<AsyncTimerFactoryASIO>();
- options.connectionPoolOptions.refreshTimeout = Seconds(1);
- options.connectionPoolOptions.maxConnections = kNumThreads;
- NetworkInterfaceASIO net{std::move(options)};
-
- net.startup();
- auto guard = MakeGuard([&] { net.shutdown(); });
-
- AtomicWord<size_t> unfinished(kNumThreads * kNumOps);
- AtomicWord<size_t> unstarted(kNumThreads * kNumOps);
-
- std::array<stdx::thread, kNumThreads> threads;
- stdx::mutex mutex;
- stdx::condition_variable condvar;
-
- for (auto& thread : threads) {
- thread = stdx::thread([&] {
- for (int i = 0; i < kNumOps; i++) {
- RemoteCommandRequest request{fixture.getServers()[0],
- "admin",
- BSON("sleep" << 1 << "lock"
- << "none"
- << "secs"
- << 3),
- BSONObj(),
- nullptr};
- net.startCommand(makeCallbackHandle(),
- request,
- [&](RemoteCommandResponse resp) {
- if (!unfinished.subtractAndFetch(1)) {
- condvar.notify_one();
- }
- })
- .transitional_ignore();
- unstarted.subtractAndFetch(1);
- }
- });
- }
-
- stdx::thread timerThrasher([&] {
- while (unstarted.load()) {
- net.setAlarm(Date_t::now() + Seconds(1), [] {}).transitional_ignore();
- }
- });
-
-
- for (auto& thread : threads) {
- thread.join();
- }
-
- stdx::unique_lock<stdx::mutex> lk(mutex);
- condvar.wait(lk, [&] { return !unfinished.load(); });
-
- timerThrasher.join();
-}
-
-} // namespace
-} // namespace executor
-} // namespace mongo
diff --git a/src/mongo/executor/network_connection_hook.h b/src/mongo/executor/network_connection_hook.h
index f76eed79b2e..993adcd1798 100644
--- a/src/mongo/executor/network_connection_hook.h
+++ b/src/mongo/executor/network_connection_hook.h
@@ -30,6 +30,8 @@
#include <boost/optional.hpp>
+#include "mongo/bson/bsonobj.h"
+
namespace mongo {
class Status;
@@ -65,6 +67,7 @@ public:
* std::terminate.
*/
virtual Status validateHost(const HostAndPort& remoteHost,
+ const BSONObj& isMasterRequest,
const RemoteCommandResponse& isMasterReply) = 0;
/**
diff --git a/src/mongo/executor/network_interface.cpp b/src/mongo/executor/network_interface.cpp
index e19e9284a13..3bf3cb8445b 100644
--- a/src/mongo/executor/network_interface.cpp
+++ b/src/mongo/executor/network_interface.cpp
@@ -41,6 +41,8 @@ const unsigned int NetworkInterface::kMessagingPortKeepOpen;
NetworkInterface::NetworkInterface() {}
NetworkInterface::~NetworkInterface() {}
+MONGO_FP_DECLARE(networkInterfaceDiscardCommandsBeforeAcquireConn);
+MONGO_FP_DECLARE(networkInterfaceDiscardCommandsAfterAcquireConn);
} // namespace executor
} // namespace mongo
diff --git a/src/mongo/executor/network_interface.h b/src/mongo/executor/network_interface.h
index b333813df14..eb740416596 100644
--- a/src/mongo/executor/network_interface.h
+++ b/src/mongo/executor/network_interface.h
@@ -35,6 +35,7 @@
#include "mongo/executor/task_executor.h"
#include "mongo/stdx/functional.h"
#include "mongo/transport/baton.h"
+#include "mongo/util/fail_point_service.h"
#include "mongo/util/future.h"
namespace mongo {
@@ -43,6 +44,9 @@ class BSONObjBuilder;
namespace executor {
+MONGO_FP_FORWARD_DECLARE(networkInterfaceDiscardCommandsBeforeAcquireConn);
+MONGO_FP_FORWARD_DECLARE(networkInterfaceDiscardCommandsAfterAcquireConn);
+
/**
* Interface to networking for use by TaskExecutor implementations.
*/
@@ -119,6 +123,18 @@ public:
*/
virtual std::string getHostName() = 0;
+ struct Counters {
+ uint64_t canceled = 0;
+ uint64_t timedOut = 0;
+ uint64_t failed = 0;
+ uint64_t succeeded = 0;
+ };
+ /*
+ * Returns a copy of the operation counters (see struct Counters above). This method should
+ * only be used in tests, and will invariant if getTestCommands() returns false.
+ */
+ virtual Counters getCounters() const = 0;
+
/**
* Starts asynchronous execution of the command described by "request".
*
diff --git a/src/mongo/executor/network_interface_asio.cpp b/src/mongo/executor/network_interface_asio.cpp
deleted file mode 100644
index 7eabcfbb71e..00000000000
--- a/src/mongo/executor/network_interface_asio.cpp
+++ /dev/null
@@ -1,527 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/executor/network_interface_asio.h"
-
-#include <asio/system_timer.hpp>
-
-#include <utility>
-
-#include "mongo/executor/async_stream_factory.h"
-#include "mongo/executor/async_stream_interface.h"
-#include "mongo/executor/async_timer_asio.h"
-#include "mongo/executor/async_timer_interface.h"
-#include "mongo/executor/async_timer_mock.h"
-#include "mongo/executor/connection_pool_asio.h"
-#include "mongo/executor/connection_pool_stats.h"
-#include "mongo/rpc/metadata/metadata_hook.h"
-#include "mongo/stdx/chrono.h"
-#include "mongo/stdx/memory.h"
-#include "mongo/util/concurrency/idle_thread_block.h"
-#include "mongo/util/concurrency/thread_name.h"
-#include "mongo/util/log.h"
-#include "mongo/util/net/socket_utils.h"
-#include "mongo/util/net/ssl_manager.h"
-#include "mongo/util/table_formatter.h"
-#include "mongo/util/time_support.h"
-
-namespace mongo {
-namespace executor {
-
-namespace {
-const std::size_t kIOServiceWorkers = 1;
-} // namespace
-
-NetworkInterfaceASIO::Options::Options() = default;
-
-NetworkInterfaceASIO::NetworkInterfaceASIO(Options options)
- : _options(std::move(options)),
- _io_service(),
- _metadataHook(std::move(_options.metadataHook)),
- _hook(std::move(_options.networkConnectionHook)),
- _state(State::kReady),
- _timerFactory(std::move(_options.timerFactory)),
- _streamFactory(std::move(_options.streamFactory)),
- _connectionPool(stdx::make_unique<connection_pool_asio::ASIOImpl>(this),
- _options.instanceName,
- _options.connectionPoolOptions),
- _isExecutorRunnable(false),
- _strand(_io_service) {
- invariant(_timerFactory);
-}
-
-std::string NetworkInterfaceASIO::getDiagnosticString() {
- stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
- return _getDiagnosticString_inlock(nullptr);
-}
-
-std::string NetworkInterfaceASIO::_getDiagnosticString_inlock(AsyncOp* currentOp) {
- str::stream output;
- std::vector<TableRow> rows;
-
- output << "\nNetworkInterfaceASIO Operations' Diagnostic:\n";
- rows.push_back({"Operation:", "Count:"});
- rows.push_back({"Connecting", std::to_string(_inGetConnection.size())});
- rows.push_back({"In Progress", std::to_string(_inProgress.size())});
- rows.push_back({"Succeeded", std::to_string(getNumSucceededOps())});
- rows.push_back({"Canceled", std::to_string(getNumCanceledOps())});
- rows.push_back({"Failed", std::to_string(getNumFailedOps())});
- rows.push_back({"Timed Out", std::to_string(getNumTimedOutOps())});
- output << toTable(rows);
-
- if (_inProgress.size() > 0) {
- rows.clear();
- rows.push_back(AsyncOp::kFieldLabels);
-
- // Push AsyncOps
- for (auto&& kv : _inProgress) {
- auto row = kv.first->getStringFields();
- if (currentOp) {
- // If this is the AsyncOp we blew up on, mark with an asterisk
- if (*currentOp == *(kv.first)) {
- row[0] = "*";
- }
- }
-
- rows.push_back(row);
- }
-
- // Format as a table
- output << "\n" << toTable(rows);
- }
-
- output << "\n";
-
- return output;
-}
-
-uint64_t NetworkInterfaceASIO::getNumCanceledOps() {
- return _numCanceledOps.load();
-}
-
-uint64_t NetworkInterfaceASIO::getNumFailedOps() {
- return _numFailedOps.load();
-}
-
-uint64_t NetworkInterfaceASIO::getNumSucceededOps() {
- return _numSucceededOps.load();
-}
-
-uint64_t NetworkInterfaceASIO::getNumTimedOutOps() {
- return _numTimedOutOps.load();
-}
-
-void NetworkInterfaceASIO::appendConnectionStats(ConnectionPoolStats* stats) const {
- _connectionPool.appendConnectionStats(stats);
-}
-
-std::string NetworkInterfaceASIO::getHostName() {
- return getHostNameCached();
-}
-
-void NetworkInterfaceASIO::startup() {
- _serviceRunners.resize(kIOServiceWorkers);
- for (std::size_t i = 0; i < kIOServiceWorkers; ++i) {
- _serviceRunners[i] = stdx::thread([this, i]() {
- setThreadName(_options.instanceName + "-" + std::to_string(i));
- try {
- LOG(2) << "The NetworkInterfaceASIO worker thread is spinning up";
- asio::io_service::work work(_io_service);
- std::error_code ec;
- _io_service.run(ec);
- if (ec) {
- severe() << "Failure in _io_service.run(): " << ec.message();
- fassertFailed(40335);
- }
- } catch (...) {
- severe() << "Uncaught exception in NetworkInterfaceASIO IO "
- "worker thread of type: "
- << exceptionToStatus();
- fassertFailed(28820);
- }
- });
- };
- _state.store(State::kRunning);
-}
-
-void NetworkInterfaceASIO::shutdown() {
- _state.store(State::kShutdown);
- _io_service.stop();
- for (auto&& worker : _serviceRunners) {
- worker.join();
- }
- LOG(2) << "NetworkInterfaceASIO shutdown successfully";
-}
-
-void NetworkInterfaceASIO::waitForWork() {
- stdx::unique_lock<stdx::mutex> lk(_executorMutex);
- // TODO: This can be restructured with a lambda.
- while (!_isExecutorRunnable) {
- MONGO_IDLE_THREAD_BLOCK;
- _isExecutorRunnableCondition.wait(lk);
- }
- _isExecutorRunnable = false;
-}
-
-void NetworkInterfaceASIO::waitForWorkUntil(Date_t when) {
- stdx::unique_lock<stdx::mutex> lk(_executorMutex);
- // TODO: This can be restructured with a lambda.
- while (!_isExecutorRunnable) {
- const Milliseconds waitTime(when - now());
- if (waitTime <= Milliseconds(0)) {
- break;
- }
- MONGO_IDLE_THREAD_BLOCK;
- _isExecutorRunnableCondition.wait_for(lk, waitTime.toSystemDuration());
- }
- _isExecutorRunnable = false;
-}
-
-void NetworkInterfaceASIO::signalWorkAvailable() {
- stdx::unique_lock<stdx::mutex> lk(_executorMutex);
- _signalWorkAvailable_inlock();
-}
-
-void NetworkInterfaceASIO::_signalWorkAvailable_inlock() {
- if (!_isExecutorRunnable) {
- _isExecutorRunnable = true;
- _isExecutorRunnableCondition.notify_one();
- }
-}
-
-Date_t NetworkInterfaceASIO::now() {
- return _timerFactory->now();
-}
-
-namespace {
-
-Status attachMetadataIfNeeded(RemoteCommandRequest& request,
- rpc::EgressMetadataHook* metadataHook) {
-
- // Append the metadata of the request with metadata from the metadata hook
- // if a hook is installed
- if (metadataHook) {
- BSONObjBuilder augmentedBob(std::move(request.metadata));
-
- auto writeStatus = callNoexcept(*metadataHook,
- &rpc::EgressMetadataHook::writeRequestMetadata,
- request.opCtx,
- &augmentedBob);
- if (!writeStatus.isOK()) {
- return writeStatus;
- }
-
- request.metadata = augmentedBob.obj();
- }
-
- return Status::OK();
-}
-
-} // namespace
-
-Status NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cbHandle,
- RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish,
- const transport::BatonHandle& baton) {
- MONGO_ASIO_INVARIANT(onFinish, "Invalid completion function");
- {
- stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
- const auto insertResult = _inGetConnection.emplace(cbHandle);
- // We should never see the same CallbackHandle added twice
- MONGO_ASIO_INVARIANT_INLOCK(insertResult.second, "Same CallbackHandle added twice");
- }
-
- if (inShutdown()) {
- return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceASIO shutdown in progress"};
- }
-
- LOG(2) << "startCommand: " << redact(request.toString());
-
- auto getConnectionStartTime = now();
-
- auto statusMetadata = attachMetadataIfNeeded(request, _metadataHook.get());
- if (!statusMetadata.isOK()) {
- return statusMetadata;
- }
-
- auto nextStep = [this, getConnectionStartTime, cbHandle, request, onFinish](
- StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-
- if (!swConn.isOK()) {
- LOG(2) << "Failed to get connection from pool for request " << request.id << ": "
- << swConn.getStatus();
-
- bool wasPreviouslyCanceled = false;
- {
- stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
- wasPreviouslyCanceled = _inGetConnection.erase(cbHandle) == 0;
- }
-
- Status status = wasPreviouslyCanceled
- ? Status(ErrorCodes::CallbackCanceled, "Callback canceled")
- : swConn.getStatus();
- if (ErrorCodes::isExceededTimeLimitError(status.code())) {
- _numTimedOutOps.fetchAndAdd(1);
- }
- if (status.code() != ErrorCodes::CallbackCanceled) {
- _numFailedOps.fetchAndAdd(1);
- }
-
- onFinish({status, now() - getConnectionStartTime});
- signalWorkAvailable();
- return;
- }
-
- auto conn = static_cast<connection_pool_asio::ASIOConnection*>(swConn.getValue().get());
-
- AsyncOp* op = nullptr;
-
- stdx::unique_lock<stdx::mutex> lk(_inProgressMutex);
-
- const auto eraseCount = _inGetConnection.erase(cbHandle);
-
- // If we didn't find the request, we've been canceled
- if (eraseCount == 0) {
- lk.unlock();
-
- onFinish({ErrorCodes::CallbackCanceled,
- "Callback canceled",
- now() - getConnectionStartTime});
-
- // Though we were canceled, we know that the stream is fine, so indicate success.
- conn->indicateSuccess();
-
- signalWorkAvailable();
-
- return;
- }
-
- // We can't release the AsyncOp until we know we were not canceled.
- auto ownedOp = conn->releaseAsyncOp();
- op = ownedOp.get();
-
- // This AsyncOp may be recycled. We expect timeout and canceled to be clean.
- // If this op was most recently used to connect, its state transitions won't have been
- // reset, so we do that here.
- MONGO_ASIO_INVARIANT_INLOCK(!op->canceled(), "AsyncOp has dirty canceled flag", op);
- MONGO_ASIO_INVARIANT_INLOCK(!op->timedOut(), "AsyncOp has dirty timeout flag", op);
- op->clearStateTransitions();
-
- // Now that we're inProgress, an external cancel can touch our op, but
- // not until we release the inProgressMutex.
- _inProgress.emplace(op, std::move(ownedOp));
-
- op->_cbHandle = std::move(cbHandle);
- op->_request = std::move(request);
- op->_onFinish = std::move(onFinish);
- op->_connectionPoolHandle = std::move(swConn.getValue());
- op->startProgress(getConnectionStartTime);
-
- // This ditches the lock and gets us onto the strand (so we're
- // threadsafe)
- op->_strand.post([this, op, getConnectionStartTime] {
- const auto timeout = op->_request.timeout;
-
- // Set timeout now that we have the correct request object
- if (timeout != RemoteCommandRequest::kNoTimeout) {
- // Subtract the time it took to get the connection from the pool from the request
- // timeout.
- auto getConnectionDuration = now() - getConnectionStartTime;
- if (getConnectionDuration >= timeout) {
- // We only assume that the request timer is guaranteed to fire *after* the
- // timeout duration - but make no stronger assumption. It is thus possible that
- // we have already exceeded the timeout. In this case we timeout the operation
- // manually.
- std::stringstream msg;
- msg << "Remote command timed out while waiting to get a connection from the "
- << "pool, took " << getConnectionDuration << ", timeout was set to "
- << timeout;
- auto rs = ResponseStatus(ErrorCodes::NetworkInterfaceExceededTimeLimit,
- msg.str(),
- getConnectionDuration);
- return _completeOperation(op, rs);
- }
-
- // The above conditional guarantees that the adjusted timeout will never underflow.
- MONGO_ASIO_INVARIANT(timeout > getConnectionDuration, "timeout underflowed", op);
- const auto adjustedTimeout = timeout - getConnectionDuration;
- const auto requestId = op->_request.id;
-
- try {
- op->_timeoutAlarm =
- op->_owner->_timerFactory->make(&op->_strand, adjustedTimeout);
- } catch (std::system_error& e) {
- severe() << "Failed to construct timer for AsyncOp: " << e.what();
- fassertFailed(40334);
- }
-
- std::shared_ptr<AsyncOp::AccessControl> access;
- std::size_t generation;
- {
- stdx::lock_guard<stdx::mutex> lk(op->_access->mutex);
- access = op->_access;
- generation = access->id;
- }
-
- op->_timeoutAlarm->asyncWait([op, access, generation, requestId, adjustedTimeout](
- std::error_code ec) {
- // We must pass a check for safe access before using op inside the
- // callback or we may attempt access on an invalid pointer.
- stdx::lock_guard<stdx::mutex> lk(access->mutex);
- if (generation != access->id) {
- // The operation has been cleaned up, do not access.
- return;
- }
-
- if (!ec) {
- LOG(2) << "Request " << requestId << " timed out"
- << ", adjusted timeout after getting connection from pool was "
- << adjustedTimeout << ", op was " << redact(op->toString());
-
- op->timeOut_inlock();
- } else {
- LOG(2) << "Failed to time request " << requestId << "out: " << ec.message()
- << ", op was " << redact(op->toString());
- }
- });
- }
-
- _beginCommunication(op);
- });
- };
-
- _connectionPool.get(request.target, request.timeout, nextStep);
- return Status::OK();
-}
-
-void NetworkInterfaceASIO::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
- const transport::BatonHandle& baton) {
- stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
-
- // If we found a matching cbHandle in _inGetConnection, then
- // simply removing it has the same effect as cancelling it, so we
- // can just return.
- if (_inGetConnection.erase(cbHandle) != 0) {
- _numCanceledOps.fetchAndAdd(1);
- return;
- }
-
- // TODO: This linear scan is unfortunate. It is here because our
- // primary data structure is to keep the AsyncOps in an
- // unordered_map by pointer, but here we only have the
- // callback. We could keep two data structures at the risk of
- // having them diverge.
- for (auto&& kv : _inProgress) {
- if (kv.first->cbHandle() == cbHandle) {
- kv.first->cancel();
- _numCanceledOps.fetchAndAdd(1);
- break;
- }
- }
-}
-
-Status NetworkInterfaceASIO::setAlarm(Date_t when,
- const stdx::function<void()>& action,
- const transport::BatonHandle& baton) {
- if (inShutdown()) {
- return {ErrorCodes::ShutdownInProgress, "NetworkInterfaceASIO shutdown in progress"};
- }
-
- std::shared_ptr<asio::system_timer> alarm;
-
- try {
- auto timeLeft = when - now();
- // "alarm" must stay alive until it expires, hence the shared_ptr.
- alarm = std::make_shared<asio::system_timer>(_io_service, timeLeft.toSystemDuration());
- } catch (...) {
- return exceptionToStatus();
- }
-
- alarm->async_wait([alarm, this, action, when, baton](std::error_code ec) {
- const auto nowValue = now();
- if (nowValue < when) {
- warning() << "ASIO alarm returned early. Expected at: " << when
- << ", fired at: " << nowValue;
- const auto status = setAlarm(when, action, baton);
- if ((!status.isOK()) && (status.code() != ErrorCodes::ShutdownInProgress)) {
- fassertFailedWithStatus(40383, status);
- }
- return;
- }
- if (!ec) {
- return action();
- } else if (ec != asio::error::operation_aborted) {
- // When the network interface is shut down, it will cancel all pending
- // alarms, raising an "operation_aborted" error here, which we ignore.
- warning() << "setAlarm() received an error: " << ec.message();
- }
- });
-
- return Status::OK();
-};
-
-bool NetworkInterfaceASIO::inShutdown() const {
- return (_state.load() == State::kShutdown);
-}
-
-bool NetworkInterfaceASIO::onNetworkThread() {
- auto id = stdx::this_thread::get_id();
- return std::any_of(_serviceRunners.begin(),
- _serviceRunners.end(),
- [id](const stdx::thread& thread) { return id == thread.get_id(); });
-}
-
-void NetworkInterfaceASIO::_failWithInfo(const char* file,
- int line,
- std::string error,
- AsyncOp* op) {
- stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
- _failWithInfo_inlock(file, line, error, op);
-}
-
-void NetworkInterfaceASIO::_failWithInfo_inlock(const char* file,
- int line,
- std::string error,
- AsyncOp* op) {
- std::stringstream ss;
- ss << "Invariant failure at " << file << ":" << line << ": " << error;
- ss << _getDiagnosticString_inlock(op);
- Status status{ErrorCodes::InternalError, ss.str()};
- fassertFailedWithStatus(34429, status);
-}
-
-void NetworkInterfaceASIO::dropConnections(const HostAndPort& hostAndPort) {
- _connectionPool.dropConnections(hostAndPort);
-}
-
-} // namespace executor
-} // namespace mongo
diff --git a/src/mongo/executor/network_interface_asio.h b/src/mongo/executor/network_interface_asio.h
deleted file mode 100644
index 3a423e8b779..00000000000
--- a/src/mongo/executor/network_interface_asio.h
+++ /dev/null
@@ -1,526 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <asio.hpp>
-
-#include <array>
-#include <boost/optional.hpp>
-#include <memory>
-#include <string>
-#include <system_error>
-#include <vector>
-
-#include "mongo/base/status.h"
-#include "mongo/base/system_error.h"
-#include "mongo/executor/async_stream_factory_interface.h"
-#include "mongo/executor/async_stream_interface.h"
-#include "mongo/executor/async_timer_interface.h"
-#include "mongo/executor/connection_pool.h"
-#include "mongo/executor/network_connection_hook.h"
-#include "mongo/executor/network_interface.h"
-#include "mongo/executor/remote_command_request.h"
-#include "mongo/executor/remote_command_response.h"
-#include "mongo/platform/atomic_word.h"
-#include "mongo/rpc/message.h"
-#include "mongo/rpc/metadata/metadata_hook.h"
-#include "mongo/rpc/protocol.h"
-#include "mongo/stdx/condition_variable.h"
-#include "mongo/stdx/functional.h"
-#include "mongo/stdx/mutex.h"
-#include "mongo/stdx/thread.h"
-#include "mongo/stdx/unordered_map.h"
-#include "mongo/stdx/unordered_set.h"
-#include "mongo/transport/message_compressor_manager.h"
-
-namespace mongo {
-
-namespace executor {
-
-namespace connection_pool_asio {
-class ASIOConnection;
-class ASIOTimer;
-class ASIOImpl;
-} // connection_pool_asio
-
-class AsyncStreamInterface;
-
-#define MONGO_ASIO_INVARIANT(_Expression, ...) \
- do { \
- if (MONGO_unlikely(!(_Expression))) { \
- _failWithInfo(__FILE__, __LINE__, __VA_ARGS__); \
- } \
- } while (false)
-
-#define MONGO_ASIO_INVARIANT_INLOCK(_Expression, ...) \
- do { \
- if (MONGO_unlikely(!(_Expression))) { \
- _failWithInfo_inlock(__FILE__, __LINE__, __VA_ARGS__); \
- } \
- } while (false)
-
-// An AsyncOp can transition through at most 5 states.
-const int kMaxStateTransitions = 5;
-
-/**
- * Implementation of the replication system's network interface using Christopher
- * Kohlhoff's ASIO library instead of existing MongoDB networking primitives.
- */
-class NetworkInterfaceASIO final : public NetworkInterface {
- friend class connection_pool_asio::ASIOConnection;
- friend class connection_pool_asio::ASIOTimer;
- friend class connection_pool_asio::ASIOImpl;
- class AsyncOp;
-
-public:
- struct Options {
- Options();
-
- std::string instanceName = "NetworkInterfaceASIO";
- ConnectionPool::Options connectionPoolOptions;
- std::unique_ptr<AsyncTimerFactoryInterface> timerFactory;
- std::unique_ptr<NetworkConnectionHook> networkConnectionHook;
- std::unique_ptr<AsyncStreamFactoryInterface> streamFactory;
- std::unique_ptr<rpc::EgressMetadataHook> metadataHook;
- };
-
- NetworkInterfaceASIO(Options = Options());
-
- std::string getDiagnosticString() override;
-
- uint64_t getNumCanceledOps();
- uint64_t getNumFailedOps();
- uint64_t getNumSucceededOps();
- uint64_t getNumTimedOutOps();
-
- void appendConnectionStats(ConnectionPoolStats* stats) const override;
- std::string getHostName() override;
- void startup() override;
- void shutdown() override;
- bool inShutdown() const override;
- void waitForWork() override;
- void waitForWorkUntil(Date_t when) override;
- void signalWorkAvailable() override;
- Date_t now() override;
- Status startCommand(const TaskExecutor::CallbackHandle& cbHandle,
- RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish,
- const transport::BatonHandle& baton = nullptr) override;
- void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
- const transport::BatonHandle& baton = nullptr) override;
- Status setAlarm(Date_t when,
- const stdx::function<void()>& action,
- const transport::BatonHandle& baton = nullptr) override;
-
- bool onNetworkThread() override;
-
- void dropConnections(const HostAndPort& hostAndPort) override;
-
-private:
- using ResponseStatus = TaskExecutor::ResponseStatus;
- using NetworkInterface::RemoteCommandCompletionFn;
- using NetworkOpHandler = stdx::function<void(std::error_code, size_t)>;
- using TableRow = std::vector<std::string>;
-
- enum class State { kReady, kRunning, kShutdown };
-
- friend class AsyncOp;
-
- /**
- * AsyncConnection encapsulates the per-connection state we maintain.
- */
- class AsyncConnection {
- public:
- AsyncConnection(std::unique_ptr<AsyncStreamInterface>, rpc::ProtocolSet serverProtocols);
-
- AsyncStreamInterface& stream();
-
- void cancel();
-
- rpc::ProtocolSet serverProtocols() const;
- rpc::ProtocolSet clientProtocols() const;
- void setServerProtocols(rpc::ProtocolSet protocols);
-
- MessageCompressorManager& getCompressorManager() {
- return _compressorManager;
- }
-
- private:
- std::unique_ptr<AsyncStreamInterface> _stream;
-
- rpc::ProtocolSet _serverProtocols;
-
- // Dynamically initialized from [min max]WireVersionOutgoing.
- // Its expected that isMaster response is checked only on the caller.
- rpc::ProtocolSet _clientProtocols{rpc::supports::kNone};
-
- MessageCompressorManager _compressorManager;
- };
-
- /**
- * AsyncCommand holds state for a currently running or soon-to-be-run command.
- */
- class AsyncCommand {
- public:
- AsyncCommand(AsyncConnection* conn,
- Message&& command,
- Date_t now,
- const HostAndPort& target);
-
- NetworkInterfaceASIO::AsyncConnection& conn();
-
- Message& toSend();
- Message& toRecv();
- MSGHEADER::Value& header();
-
- ResponseStatus response(AsyncOp* op,
- rpc::Protocol protocol,
- Date_t now,
- rpc::EgressMetadataHook* metadataHook = nullptr);
-
- HostAndPort target() const {
- return this->_target;
- }
-
- private:
- NetworkInterfaceASIO::AsyncConnection* const _conn;
-
- Message _toSend;
- Message _toRecv;
-
- // TODO: Investigate efficiency of storing header separately.
- MSGHEADER::Value _header;
-
- const Date_t _start;
-
- const HostAndPort _target;
- };
-
- /**
- * Helper object to manage individual network operations.
- */
- class AsyncOp {
- friend class NetworkInterfaceASIO;
- friend class connection_pool_asio::ASIOConnection;
-
- public:
- /**
- * Describe the various states through which an AsyncOp transitions.
- */
- enum class State : unsigned char {
- // A non-state placeholder.
- kNoState,
- // A new or zeroed-out AsyncOp.
- kUninitialized,
- // An AsyncOp begins its progress when startProgress() is called.
- kInProgress,
- // An AsyncOp transitions to kTimedOut when timeOut() is called.
- // Note that the AsyncOp can be in a kCanceled state and still be
- // in-flight in NetworkInterfaceASIO.
- kTimedOut,
- // An AsyncOp transitions to kCanceled when cancel() is called.
- // Note that the AsyncOp can be in a kCanceled state and still be
- // in-flight in NetworkInterfaceASIO.
- kCanceled,
- // An AsyncOp is finished once its finish() method is called. Note
- // that the AsyncOp can be in a kFinished state and still be in the
- // NetworkInterface's set of in-progress operations.
- kFinished,
- };
-
- AsyncOp(NetworkInterfaceASIO* net,
- const TaskExecutor::CallbackHandle& cbHandle,
- const RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish,
- Date_t now);
-
- /**
- * Access control for AsyncOp. These objects should be used through shared_ptrs.
- *
- * In order to safely access an AsyncOp:
- * 1. Take the lock
- * 2. Check the id
- * 3. If id matches saved generation, proceed, otherwise op has been recycled.
- */
- struct AccessControl {
- stdx::mutex mutex;
- std::size_t id = 0;
- };
-
- void cancel();
- bool canceled() const;
- void timeOut_inlock();
- bool timedOut() const;
-
- const TaskExecutor::CallbackHandle& cbHandle() const;
-
- AsyncConnection& connection();
-
- void setConnection(AsyncConnection&& conn);
-
- // AsyncOp may run multiple commands over its lifetime (for example, an ismaster
- // command, the command provided to the NetworkInterface via startCommand(), etc.)
- // Calling beginCommand() resets internal state to prepare to run newCommand.
- Status beginCommand(const RemoteCommandRequest& request);
-
- // This form of beginCommand takes a raw message. It is needed if the caller
- // has to form the command manually (e.g. to use a specific requestBuilder).
- Status beginCommand(Message&& newCommand, const HostAndPort& target);
-
- AsyncCommand& command();
- bool commandIsInitialized() const;
-
- void finish(TaskExecutor::ResponseStatus&& status);
-
- const RemoteCommandRequest& request() const;
-
- void startProgress(Date_t startTime);
-
- Date_t start() const;
-
- rpc::Protocol operationProtocol() const;
-
- void setOperationProtocol(rpc::Protocol proto);
-
- void setResponseMetadata(BSONObj m);
- BSONObj getResponseMetadata();
-
- void reset();
-
- void clearStateTransitions();
-
- void setOnFinish(RemoteCommandCompletionFn&& onFinish);
-
- // Returns diagnostic strings for logging.
- TableRow getStringFields() const;
- std::string toString() const;
-
- asio::io_service::strand& strand() {
- return _strand;
- }
-
- asio::ip::tcp::resolver& resolver() {
- return _resolver;
- }
-
- bool operator==(const AsyncOp& other) const;
-
- private:
- // Type to represent the internal id of this request.
- using AsyncOpId = uint64_t;
-
- static const TableRow kFieldLabels;
-
- // Return string representation of a given state.
- std::string _stateToString(State state) const;
-
- // Return a string representation of this op's state transitions.
- std::string _stateString() const;
-
- bool _hasSeenState(State state) const;
-
- // Track and validate AsyncOp state transitions.
- // Use the _inlock variant if already holding the access control lock.
- void _transitionToState(State newState);
- void _transitionToState_inlock(State newState);
-
- // Helper for debugging.
- void _failWithInfo(const char* file, int line, std::string error) const;
-
- NetworkInterfaceASIO* const _owner;
- // Information describing a task enqueued on the NetworkInterface
- // via a call to startCommand().
- TaskExecutor::CallbackHandle _cbHandle;
- RemoteCommandRequest _request;
- RemoteCommandCompletionFn _onFinish;
-
- // AsyncOp's have a handle to their connection pool handle. They are
- // also owned by it when they're in the pool
- ConnectionPool::ConnectionHandle _connectionPoolHandle;
-
- /**
- * The connection state used to service this request. We wrap it in an optional
- * as it is instantiated at some point after the AsyncOp is created.
- */
- boost::optional<AsyncConnection> _connection;
-
- /**
- * The RPC protocol used for this operation. We wrap it in an optional as it
- * is not known until we obtain a connection.
- */
- boost::optional<rpc::Protocol> _operationProtocol;
-
- Date_t _start;
- std::unique_ptr<AsyncTimerInterface> _timeoutAlarm;
-
- asio::ip::tcp::resolver _resolver;
-
- const AsyncOpId _id;
-
- /**
- * We maintain a shared_ptr to an access control object. This ensures that tangent
- * execution paths, such as timeouts for this operation, will not try to access its
- * state after it has been cleaned up.
- */
- std::shared_ptr<AccessControl> _access;
-
- /**
- * An AsyncOp may run 0, 1, or multiple commands over its lifetime.
- * AsyncOp only holds at most a single AsyncCommand object at a time,
- * representing its current running or next-to-be-run command, if there is one.
- */
- boost::optional<AsyncCommand> _command;
- bool _inSetup;
- bool _inRefresh;
-
- /**
- * The explicit strand that all operations for this op must run on.
- * This must be the last member of AsyncOp because any pending
- * operation for the strand are run when it's dtor is called. Any
- * members that fall after it will have already been destroyed, which
- * will make those fields illegal to touch from callbacks.
- */
- asio::io_service::strand _strand;
-
- /**
- * We hold an array of states to show the path this AsyncOp has taken.
- * Must be holding the access control's lock to edit.
- */
- std::array<State, kMaxStateTransitions> _states;
-
- BSONObj _responseMetadata{};
- };
-
- void _startCommand(AsyncOp* op);
-
- /**
- * Wraps a completion handler in pre-condition checks.
- * When we resume after an asynchronous call, we may find the following:
- * - the AsyncOp has been canceled in the interim (via cancelCommand())
- * - the asynchronous call has returned a non-OK error code
- * Should both conditions be present, we handle cancelation over errors. States use
- * _validateAndRun() to perform these checks before advancing the state machine.
- */
- template <typename Handler>
- void _validateAndRun(AsyncOp* op, std::error_code ec, Handler&& handler) {
- if (op->canceled()) {
- auto rs = ResponseStatus(
- ErrorCodes::CallbackCanceled, "Callback canceled", now() - op->start());
- return _completeOperation(op, rs);
- } else if (op->timedOut()) {
- auto rs = ResponseStatus(ErrorCodes::NetworkInterfaceExceededTimeLimit,
- "Operation timed out",
- now() - op->start());
- return _completeOperation(op, rs);
- } else if (ec)
- return _networkErrorCallback(op, ec);
-
- handler();
- }
-
- // Connection
- void _connect(AsyncOp* op);
-
- // setup plaintext TCP socket
- void _setupSocket(AsyncOp* op, asio::ip::tcp::resolver::iterator endpoints);
-
- void _runIsMaster(AsyncOp* op);
- void _runConnectionHook(AsyncOp* op);
- void _authenticate(AsyncOp* op);
-
- // Communication state machine
- void _beginCommunication(AsyncOp* op);
- void _completedOpCallback(AsyncOp* op);
- void _networkErrorCallback(AsyncOp* op, const std::error_code& ec);
- void _completeOperation(AsyncOp* op, TaskExecutor::ResponseStatus resp);
-
- void _signalWorkAvailable_inlock();
-
- void _asyncRunCommand(AsyncOp* op, NetworkOpHandler handler);
-
- std::string _getDiagnosticString_inlock(AsyncOp* currentOp);
-
- // Helpers for debugging crashes
- void _failWithInfo(const char* file, int line, std::string error, AsyncOp* op = nullptr);
- void _failWithInfo_inlock(const char* file, int line, std::string error, AsyncOp* op = nullptr);
-
- Options _options;
-
- asio::io_service _io_service;
- std::vector<stdx::thread> _serviceRunners;
-
- const std::unique_ptr<rpc::EgressMetadataHook> _metadataHook;
-
- const std::unique_ptr<NetworkConnectionHook> _hook;
-
- std::atomic<State> _state; // NOLINT
-
- std::unique_ptr<AsyncTimerFactoryInterface> _timerFactory;
-
- std::unique_ptr<AsyncStreamFactoryInterface> _streamFactory;
-
- ConnectionPool _connectionPool;
-
- // If it is necessary to hold this lock while accessing a particular operation with
- // an AccessControl object, take this lock first, always.
- stdx::mutex _inProgressMutex;
- stdx::unordered_map<AsyncOp*, std::unique_ptr<AsyncOp>> _inProgress;
- stdx::unordered_set<TaskExecutor::CallbackHandle> _inGetConnection;
-
- // Operation counters
- AtomicUInt64 _numCanceledOps;
- AtomicUInt64 _numFailedOps; // includes timed out ops but does not include canceled ops
- AtomicUInt64 _numSucceededOps;
- AtomicUInt64 _numTimedOutOps;
-
- stdx::mutex _executorMutex;
- bool _isExecutorRunnable;
- stdx::condition_variable _isExecutorRunnableCondition;
-
- /**
- * The explicit strand that all non-op operations run on. This must be the
- * last member of NetworkInterfaceASIO because any pending operation for
- * the strand are run when it's dtor is called. Any members that fall after
- * it will have already been destroyed, which will make those fields
- * illegal to touch from callbacks.
- */
- asio::io_service::strand _strand;
-};
-
-template <typename T, typename R, typename... MethodArgs, typename... DeducedArgs>
-R callNoexcept(T& obj, R (T::*method)(MethodArgs...), DeducedArgs&&... args) {
- try {
- return (obj.*method)(std::forward<DeducedArgs>(args)...);
- } catch (...) {
- std::terminate();
- }
-}
-
-} // namespace executor
-} // namespace mongo
diff --git a/src/mongo/executor/network_interface_asio_auth.cpp b/src/mongo/executor/network_interface_asio_auth.cpp
deleted file mode 100644
index 1a981b3d017..00000000000
--- a/src/mongo/executor/network_interface_asio_auth.cpp
+++ /dev/null
@@ -1,232 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/executor/network_interface_asio.h"
-
-#include "mongo/bson/util/builder.h"
-#include "mongo/client/authenticate.h"
-#include "mongo/config.h"
-#include "mongo/db/auth/authorization_manager_global.h"
-#include "mongo/db/auth/internal_user_auth.h"
-#include "mongo/db/commands.h"
-#include "mongo/db/commands/test_commands_enabled.h"
-#include "mongo/db/server_options.h"
-#include "mongo/db/wire_version.h"
-#include "mongo/rpc/factory.h"
-#include "mongo/rpc/get_status_from_command_result.h"
-#include "mongo/rpc/legacy_request_builder.h"
-#include "mongo/rpc/metadata/client_metadata.h"
-#include "mongo/rpc/reply_interface.h"
-#include "mongo/stdx/memory.h"
-#include "mongo/util/log.h"
-#include "mongo/util/net/ssl_manager.h"
-#include "mongo/util/version.h"
-
-namespace mongo {
-namespace executor {
-
-using ResponseStatus = TaskExecutor::ResponseStatus;
-
-void NetworkInterfaceASIO::_runIsMaster(AsyncOp* op) {
- BSONObjBuilder bob;
- bob.append("isMaster", 1);
- bob.append("hangUpOnStepDown", false);
-
- const auto versionString = VersionInfoInterface::instance().version();
- ClientMetadata::serialize(_options.instanceName, versionString, &bob);
-
- if (getTestCommandsEnabled()) {
- // Only include the host:port of this process in the isMaster command request if test
- // commands are enabled. mongobridge uses this field to identify the process opening a
- // connection to it.
- StringBuilder sb;
- sb << getHostName() << ':' << serverGlobalParams.port;
- bob.append("hostInfo", sb.str());
- }
-
- op->connection().getCompressorManager().clientBegin(&bob);
-
- if (WireSpec::instance().isInternalClient) {
- WireSpec::appendInternalClientWireVersion(WireSpec::instance().outgoing, &bob);
- }
-
- // We use a legacy request to create our ismaster request because we may
- // have to communicate with servers that do not support other protocols.
- auto isMasterRequest =
- rpc::legacyRequestFromOpMsgRequest(OpMsgRequest::fromDBAndBody("admin", bob.obj()));
-
- // Set current command to ismaster request and run
- auto beginStatus = op->beginCommand(std::move(isMasterRequest), op->request().target);
- if (!beginStatus.isOK()) {
- return _completeOperation(op, beginStatus);
- }
-
- // Callback to parse protocol information out of received ismaster response
- auto parseIsMaster = [this, op]() {
-
- auto swCommandReply = op->command().response(op, rpc::Protocol::kOpQuery, now());
- if (!swCommandReply.isOK()) {
- return _completeOperation(op, swCommandReply);
- }
-
- auto commandReply = std::move(swCommandReply);
-
- // Ensure that the isMaster response is "ok:1".
- auto commandStatus = getStatusFromCommandResult(commandReply.data);
- if (!commandStatus.isOK()) {
- return _completeOperation(op, commandStatus);
- }
-
- auto protocolSet = rpc::parseProtocolSetFromIsMasterReply(commandReply.data);
- if (!protocolSet.isOK())
- return _completeOperation(op, protocolSet.getStatus());
-
- auto validateStatus =
- rpc::validateWireVersion(WireSpec::instance().outgoing, protocolSet.getValue().version);
- if (!validateStatus.isOK()) {
- warning() << "remote host has incompatible wire version: " << validateStatus;
-
- return _completeOperation(op, validateStatus);
- }
- auto egressTagManager = _options.connectionPoolOptions.egressTagCloserManager;
- if (egressTagManager) {
- // Tag outgoing connection so it can be kept open on FCV upgrade if it is not to a
- // server with a lower binary version.
- if (protocolSet.getValue().version.maxWireVersion >=
- WireSpec::instance().outgoing.maxWireVersion) {
- egressTagManager->mutateTags(
- op->command().target(),
- [](transport::Session::TagMask tags) { return transport::Session::kKeepOpen; });
- }
- } // Some unit and integration tests do not set up an egress tag manager.
-
- op->connection().setServerProtocols(protocolSet.getValue().protocolSet);
-
- invariant(op->connection().clientProtocols() != rpc::supports::kNone);
- // Set the operation protocol
- auto negotiatedProtocol =
- rpc::negotiate(op->connection().serverProtocols(), op->connection().clientProtocols());
-
- if (!negotiatedProtocol.isOK()) {
- // Add relatively verbose logging here, since this should not happen unless we are
- // mongos and we try to connect to a node that doesn't support OP_COMMAND.
- warning() << "failed to negotiate protocol with remote host: " << op->request().target;
- warning() << "request was: " << redact(op->request().cmdObj);
- warning() << "response was: " << redact(commandReply.data);
-
- auto clientProtos = rpc::toString(op->connection().clientProtocols());
- if (clientProtos.isOK()) {
- warning() << "our (client) supported protocols: " << clientProtos.getValue();
- }
- auto serverProtos = rpc::toString(op->connection().serverProtocols());
- if (serverProtos.isOK()) {
- warning() << "remote server's supported protocols:" << serverProtos.getValue();
- }
- return _completeOperation(op, negotiatedProtocol.getStatus());
- }
-
- op->setOperationProtocol(negotiatedProtocol.getValue());
-
- op->connection().getCompressorManager().clientFinish(commandReply.data);
-
- if (_hook) {
- // Run the validation hook.
- auto validHost = callNoexcept(
- *_hook, &NetworkConnectionHook::validateHost, op->request().target, commandReply);
- if (!validHost.isOK()) {
- return _completeOperation(op, validHost);
- }
- }
-
- return _authenticate(op);
-
- };
-
- _asyncRunCommand(op, [this, op, parseIsMaster](std::error_code ec, size_t bytes) {
- _validateAndRun(op, ec, std::move(parseIsMaster));
- });
-}
-
-void NetworkInterfaceASIO::_authenticate(AsyncOp* op) {
- // There is currently no way for NetworkInterfaceASIO's users to run a command
- // without going through _authenticate(). Callers may want to run certain commands,
- // such as ismasters, pre-auth. We may want to offer this choice in the future.
-
- // This check is sufficient to see if auth is enabled on the system,
- // and avoids creating dependencies on deeper, less accessible auth code.
- if (!isInternalAuthSet()) {
- return _runConnectionHook(op);
- }
-
- // We will only have a valid clientName if SSL is enabled.
- std::string clientName;
-#ifdef MONGO_CONFIG_SSL
- if (getSSLManager()) {
- clientName = getSSLManager()->getSSLConfiguration().clientSubjectName.toString();
- }
-#endif
-
- // authenticateClient will use this to run auth-related commands over our connection.
- auto runCommandHook = [this, op](executor::RemoteCommandRequest request,
- auth::AuthCompletionHandler handler) {
-
- // SERVER-14170: Set the metadataHook to nullptr explicitly as we cannot write metadata
- // here.
- auto beginStatus = op->beginCommand(request);
- if (!beginStatus.isOK()) {
- return handler(beginStatus);
- }
-
- auto callAuthCompletionHandler = [this, op, handler]() {
- auto authResponse = op->command().response(op, op->operationProtocol(), now(), nullptr);
- handler(authResponse);
- };
-
- _asyncRunCommand(op,
- [this, op, callAuthCompletionHandler](std::error_code ec, size_t bytes) {
- _validateAndRun(op, ec, callAuthCompletionHandler);
- });
- };
-
- // This will be called when authentication has completed.
- auto authHook = [this, op](auth::AuthResponse response) {
- if (!response.isOK())
- return _completeOperation(op, response);
- return _runConnectionHook(op);
- };
-
- auto params = getInternalUserAuthParams();
- auth::authenticateClient(params, op->request().target, clientName, runCommandHook, authHook);
-}
-
-} // namespace executor
-} // namespace mongo
diff --git a/src/mongo/executor/network_interface_asio_command.cpp b/src/mongo/executor/network_interface_asio_command.cpp
deleted file mode 100644
index ca111a09f51..00000000000
--- a/src/mongo/executor/network_interface_asio_command.cpp
+++ /dev/null
@@ -1,506 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/executor/network_interface_asio.h"
-
-#include <type_traits>
-#include <utility>
-
-#include "mongo/base/static_assert.h"
-#include "mongo/db/dbmessage.h"
-#include "mongo/db/jsobj.h"
-#include "mongo/executor/async_stream_interface.h"
-#include "mongo/executor/connection_pool_asio.h"
-#include "mongo/rpc/factory.h"
-#include "mongo/rpc/metadata/metadata_hook.h"
-#include "mongo/rpc/protocol.h"
-#include "mongo/rpc/reply_interface.h"
-#include "mongo/stdx/memory.h"
-#include "mongo/util/assert_util.h"
-#include "mongo/util/fail_point_service.h"
-#include "mongo/util/log.h"
-#include "mongo/util/mongoutils/str.h"
-
-namespace mongo {
-namespace executor {
-
-/**
- * The following send - receive utility functions are "stateless" in that they exist
- * apart from the AsyncOp state machine.
- */
-
-namespace {
-
-using namespace std::literals::string_literals;
-
-MONGO_FP_DECLARE(NetworkInterfaceASIOasyncRunCommandFail);
-
-using asio::ip::tcp;
-using ResponseStatus = TaskExecutor::ResponseStatus;
-
-// A type conforms to the NetworkHandler concept if it is a callable type that takes a
-// std::error_code and std::size_t and returns void. The std::error_code parameter is used
-// to inform the handler if the asynchronous operation it was waiting on succeeded, and the
-// size_t parameter conveys how many bytes were read or written.
-template <typename FunctionLike>
-using IsNetworkHandler =
- std::is_convertible<FunctionLike, stdx::function<void(std::error_code, std::size_t)>>;
-
-template <typename Handler>
-void asyncSendMessage(AsyncStreamInterface& stream, Message* m, Handler&& handler) {
- MONGO_STATIC_ASSERT_MSG(
- IsNetworkHandler<Handler>::value,
- "Handler passed to asyncSendMessage does not conform to NetworkHandler concept");
- m->header().setResponseToMsgId(0);
- m->header().setId(nextMessageId());
- // TODO: Some day we may need to support vector messages.
- fassert(28708, m->buf() != 0);
- stream.write(asio::buffer(m->buf(), m->size()), std::forward<Handler>(handler));
-}
-
-template <typename Handler>
-void asyncRecvMessageHeader(AsyncStreamInterface& stream,
- MSGHEADER::Value* header,
- Handler&& handler) {
- MONGO_STATIC_ASSERT_MSG(
- IsNetworkHandler<Handler>::value,
- "Handler passed to asyncRecvMessageHeader does not conform to NetworkHandler concept");
- stream.read(asio::buffer(header->view().view2ptr(), sizeof(decltype(*header))),
- std::forward<Handler>(handler));
-}
-
-template <typename Handler>
-void asyncRecvMessageBody(AsyncStreamInterface& stream,
- MSGHEADER::Value* header,
- Message* m,
- Handler&& handler) {
- MONGO_STATIC_ASSERT_MSG(
- IsNetworkHandler<Handler>::value,
- "Handler passed to asyncRecvMessageBody does not conform to NetworkHandler concept");
- // validate message length
- int len = header->constView().getMessageLength();
- if (len == 542393671) {
- LOG(3) << "attempt to access MongoDB over HTTP on the native driver port.";
- return handler(make_error_code(ErrorCodes::ProtocolError), 0);
- } else if (static_cast<size_t>(len) < sizeof(MSGHEADER::Value) ||
- static_cast<size_t>(len) > MaxMessageSizeBytes) {
- warning() << "recv(): message len " << len << " is invalid. "
- << "Min " << sizeof(MSGHEADER::Value) << " Max: " << MaxMessageSizeBytes;
- return handler(make_error_code(ErrorCodes::InvalidLength), 0);
- }
-
- int z = (len + 1023) & 0xfffffc00;
- invariant(z >= len);
- m->setData(SharedBuffer::allocate(z));
- MsgData::View mdView = m->buf();
-
- // copy header data into master buffer
- int headerLen = sizeof(MSGHEADER::Value);
- memcpy(mdView.view2ptr(), header, headerLen);
- int bodyLength = len - headerLen;
- invariant(bodyLength >= 0);
-
- // receive remaining data into md->data
- stream.read(asio::buffer(mdView.data(), bodyLength), std::forward<Handler>(handler));
-}
-
-ResponseStatus decodeRPC(Message* received,
- rpc::Protocol protocol,
- Milliseconds elapsed,
- const HostAndPort& source,
- rpc::EgressMetadataHook* metadataHook) {
- try {
- // makeReply will throw if the reply is invalid
- auto reply = rpc::makeReply(received);
- if (reply->getProtocol() != protocol) {
- auto requestProtocol = rpc::toString(static_cast<rpc::ProtocolSet>(protocol));
- if (!requestProtocol.isOK())
- return {requestProtocol.getStatus(), elapsed};
-
- return {ErrorCodes::RPCProtocolNegotiationFailed,
- str::stream() << "Mismatched RPC protocols - request was '"
- << requestProtocol.getValue().toString()
- << "' '"
- << " but reply was '"
- << networkOpToString(received->operation())
- << "'",
- elapsed};
- }
- auto commandReply = reply->getCommandReply();
- auto replyMetadata = reply->getMetadata();
-
- // Handle incoming reply metadata.
- if (metadataHook) {
- auto listenStatus = callNoexcept(*metadataHook,
- &rpc::EgressMetadataHook::readReplyMetadata,
- nullptr, // adding operationTime is handled via lambda
- source.toString(),
- replyMetadata);
- if (!listenStatus.isOK()) {
- return {listenStatus, elapsed};
- }
- }
-
- return {RemoteCommandResponse(
- std::move(*received), std::move(commandReply), std::move(replyMetadata), elapsed)};
- } catch (...) {
- return {exceptionToStatus(), elapsed};
- }
-}
-
-} // namespace
-
-NetworkInterfaceASIO::AsyncCommand::AsyncCommand(AsyncConnection* conn,
- Message&& command,
- Date_t now,
- const HostAndPort& target)
- : _conn(conn), _toSend(std::move(command)), _start(now), _target(target) {
- _toSend.header().setResponseToMsgId(0);
-}
-
-NetworkInterfaceASIO::AsyncConnection& NetworkInterfaceASIO::AsyncCommand::conn() {
- return *_conn;
-}
-
-Message& NetworkInterfaceASIO::AsyncCommand::toSend() {
- return _toSend;
-}
-
-Message& NetworkInterfaceASIO::AsyncCommand::toRecv() {
- return _toRecv;
-}
-
-MSGHEADER::Value& NetworkInterfaceASIO::AsyncCommand::header() {
- return _header;
-}
-
-ResponseStatus NetworkInterfaceASIO::AsyncCommand::response(AsyncOp* op,
- rpc::Protocol protocol,
- Date_t now,
- rpc::EgressMetadataHook* metadataHook) {
- auto& received = _toRecv;
- if (received.operation() == dbCompressed) {
- auto swm = conn().getCompressorManager().decompressMessage(received);
- if (!swm.isOK()) {
- return swm.getStatus();
- }
- received = std::move(swm.getValue());
- }
-
- auto rs = decodeRPC(&received, protocol, now - _start, _target, metadataHook);
- if (rs.isOK())
- op->setResponseMetadata(rs.metadata);
- return rs;
-}
-
-void NetworkInterfaceASIO::_startCommand(AsyncOp* op) {
- LOG(3) << "running command " << redact(op->request().cmdObj) << " against database "
- << op->request().dbname << " across network to " << op->request().target.toString();
- if (inShutdown()) {
- return;
- }
-
- // _connect() will continue the state machine.
- _connect(op);
-}
-
-void NetworkInterfaceASIO::_beginCommunication(AsyncOp* op) {
- // The way that we connect connections for the connection pool is by
- // starting the callback chain with connect(), but getting off at the first
- // _beginCommunication. I.e. all AsyncOp's start off with _inSetup == true
- // and arrive here as they're connected and authed. Once they hit here, we
- // return to the connection pool's get() callback with _inSetup == false,
- // so we can proceed with user operations after they return to this
- // codepath.
- if (op->_inSetup) {
- auto host = op->request().target;
- auto getConnectionDuration = now() - op->start();
- log() << "Successfully connected to " << host << ", took " << getConnectionDuration << " ("
- << _connectionPool.getNumConnectionsPerHost(host) << " connections now open to "
- << host << ")";
- op->_inSetup = false;
- op->finish(RemoteCommandResponse());
- return;
- }
-
- LOG(3) << "Initiating asynchronous command: " << redact(op->request().toString());
-
- auto beginStatus = op->beginCommand(op->request());
- if (!beginStatus.isOK()) {
- return _completeOperation(op, beginStatus);
- }
-
- _asyncRunCommand(op, [this, op](std::error_code ec, size_t bytes) {
- _validateAndRun(op, ec, [this, op]() { _completedOpCallback(op); });
- });
-}
-
-void NetworkInterfaceASIO::_completedOpCallback(AsyncOp* op) {
- auto response = op->command().response(op, op->operationProtocol(), now(), _metadataHook.get());
- _completeOperation(op, response);
-}
-
-void NetworkInterfaceASIO::_networkErrorCallback(AsyncOp* op, const std::error_code& ec) {
- ErrorCodes::Error errorCode = (ec.category() == mongoErrorCategory())
- ? ErrorCodes::Error(ec.value())
- : ErrorCodes::HostUnreachable;
- _completeOperation(op, {errorCode, ec.message(), Milliseconds(now() - op->_start)});
-}
-
-// NOTE: This method may only be called by ASIO threads
-// (do not call from methods entered by TaskExecutor threads)
-void NetworkInterfaceASIO::_completeOperation(AsyncOp* op, ResponseStatus resp) {
- auto metadata = op->getResponseMetadata();
- if (!metadata.isEmpty()) {
- resp.metadata = metadata;
- }
-
- // Cancel this operation's timeout. Note that the timeout callback may already be running,
- // may have run, or may have already been scheduled to run in the near future.
- if (op->_timeoutAlarm) {
- op->_timeoutAlarm->cancel();
- }
-
- if (ErrorCodes::isExceededTimeLimitError(resp.status.code())) {
- _numTimedOutOps.fetchAndAdd(1);
- }
-
- if (op->_inSetup) {
- // If we are in setup we should only be here if we failed to connect.
- MONGO_ASIO_INVARIANT(!resp.isOK(), "Failed to connect in setup", op);
- // If we fail during connection, we won't be able to access any of op's members after
- // calling finish(), so we return here.
- log() << "Failed to connect to " << op->request().target << " - " << redact(resp.status);
- op->finish(std::move(resp));
- return;
- }
-
- if (op->_inRefresh) {
- // If we are in refresh we should only be here if we failed to heartbeat.
- MONGO_ASIO_INVARIANT(!resp.isOK(), "In refresh, but did not fail to heartbeat", op);
- // If we fail during heartbeating, we won't be able to access any of op's members after
- // calling finish(), so we return here.
- log() << "Failed asio heartbeat to "
- << (op->commandIsInitialized() ? op->command().target().toString() : "unknown"s)
- << " - " << redact(resp.status);
- _numFailedOps.fetchAndAdd(1);
- op->finish(std::move(resp));
- return;
- }
-
- if (!resp.isOK()) {
- // In the case that resp is not OK, but _inSetup is false, we are using a connection
- // that we got from the pool to execute a command, but it failed for some reason.
- if (op->commandIsInitialized() && shouldLog(LogstreamBuilder::severityCast(2))) {
- const auto performLog = [&resp](Message& message) {
- LOG(2) << "Failed to send message. Reason: " << redact(resp.status) << ". Message: "
- << rpc::opMsgRequestFromAnyProtocol(message).body.toString(
- logger::globalLogDomain()->shouldRedactLogs());
- };
-
- // Message might be compressed, decompress in that case so we can log the body
- Message& maybeCompressed = op->command().toSend();
- if (maybeCompressed.operation() != dbCompressed) {
- performLog(maybeCompressed);
- } else {
- StatusWith<Message> decompressedMessage =
- op->command().conn().getCompressorManager().decompressMessage(maybeCompressed);
- if (decompressedMessage.isOK()) {
- performLog(decompressedMessage.getValue());
- } else {
- LOG(2) << "Failed to execute a command. Reason: " << redact(resp.status)
- << ". Decompression failed with: "
- << redact(decompressedMessage.getStatus());
- }
- }
- } else {
- LOG(2) << "Failed to execute a command. Reason: " << redact(resp.status);
- }
-
- if (resp.status.code() != ErrorCodes::CallbackCanceled) {
- _numFailedOps.fetchAndAdd(1);
- }
- } else {
- _numSucceededOps.fetchAndAdd(1);
- }
-
- std::unique_ptr<AsyncOp> ownedOp;
-
- {
- stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
-
- auto iter = _inProgress.find(op);
-
- MONGO_ASIO_INVARIANT_INLOCK(
- iter != _inProgress.end(), "Could not find AsyncOp in _inProgress", op);
-
- ownedOp = std::move(iter->second);
- _inProgress.erase(iter);
- }
-
- op->finish(std::move(resp));
-
- MONGO_ASIO_INVARIANT(static_cast<bool>(ownedOp), "Invalid AsyncOp", op);
-
- auto conn = std::move(op->_connectionPoolHandle);
- auto asioConn = static_cast<connection_pool_asio::ASIOConnection*>(conn.get());
-
- // Prevent any other threads or callbacks from accessing this op so we may safely complete
- // and destroy it. It is key that we do this after we remove the op from the _inProgress map
- // or someone else in cancelCommand could read the bumped generation and cancel the next
- // command that uses this op. See SERVER-20556.
- {
- stdx::lock_guard<stdx::mutex> lk(op->_access->mutex);
- ++(op->_access->id);
- }
-
- // We need to bump the generation BEFORE we call reset() or we could flip the timeout in the
- // timeout callback before returning the AsyncOp to the pool.
- ownedOp->reset();
-
- asioConn->bindAsyncOp(std::move(ownedOp));
- if (!resp.isOK()) {
- asioConn->indicateFailure(resp.status);
- } else {
- asioConn->indicateUsed();
- asioConn->indicateSuccess();
- }
-
- signalWorkAvailable();
-}
-
-void NetworkInterfaceASIO::_asyncRunCommand(AsyncOp* op, NetworkOpHandler handler) {
- LOG(2) << "Starting asynchronous command " << op->request().id << " on host "
- << op->request().target.toString();
-
- if (MONGO_FAIL_POINT(NetworkInterfaceASIOasyncRunCommandFail)) {
- _validateAndRun(op, asio::error::basic_errors::network_unreachable, [] {});
- return;
- }
-
- // We invert the following steps below to run a command:
- // 1 - send the given command
- // 2 - receive a header for the response
- // 3 - validate and receive response body
- // 4 - advance the state machine by calling handler()
- auto& cmd = op->command();
-
- // Step 4
- auto recvMessageCallback = [handler](std::error_code ec, size_t bytes) {
- // We don't call _validateAndRun here as we assume the caller will.
- handler(ec, bytes);
- };
-
- // Step 3
- auto recvHeaderCallback = [this, &cmd, handler, recvMessageCallback, op](std::error_code ec,
- size_t bytes) {
- // The operation could have been canceled after starting the command, but before
- // receiving the header
- _validateAndRun(op, ec, [recvMessageCallback, bytes, &cmd, handler] {
- // validate response id
- uint32_t expectedId = cmd.toSend().header().getId();
- uint32_t actualId = cmd.header().constView().getResponseToMsgId();
- if (actualId != expectedId) {
- LOG(3) << "got wrong response:"
- << " expected response id: " << expectedId
- << ", got response id: " << actualId;
- return handler(make_error_code(ErrorCodes::ProtocolError), bytes);
- }
-
- asyncRecvMessageBody(
- cmd.conn().stream(), &cmd.header(), &cmd.toRecv(), std::move(recvMessageCallback));
- });
- };
-
- // Step 2
- auto sendMessageCallback = [this, &cmd, handler, recvHeaderCallback, op](std::error_code ec,
- size_t bytes) {
- _validateAndRun(op, ec, [&cmd, recvHeaderCallback] {
- asyncRecvMessageHeader(
- cmd.conn().stream(), &cmd.header(), std::move(recvHeaderCallback));
- });
-
-
- };
-
- // Step 1
- asyncSendMessage(cmd.conn().stream(), &cmd.toSend(), std::move(sendMessageCallback));
-}
-
-void NetworkInterfaceASIO::_runConnectionHook(AsyncOp* op) {
- if (!_hook) {
- return _beginCommunication(op);
- }
-
- auto swOptionalRequest =
- callNoexcept(*_hook, &NetworkConnectionHook::makeRequest, op->request().target);
-
- if (!swOptionalRequest.isOK()) {
- return _completeOperation(op, swOptionalRequest.getStatus());
- }
-
- auto optionalRequest = std::move(swOptionalRequest.getValue());
-
- if (optionalRequest == boost::none) {
- return _beginCommunication(op);
- }
-
- auto beginStatus = op->beginCommand(*optionalRequest);
- if (!beginStatus.isOK()) {
- return _completeOperation(op, beginStatus);
- }
-
- auto finishHook = [this, op]() {
- auto response =
- op->command().response(op, op->operationProtocol(), now(), _metadataHook.get());
-
- if (!response.isOK()) {
- return _completeOperation(op, response);
- }
-
- auto handleStatus = callNoexcept(
- *_hook, &NetworkConnectionHook::handleReply, op->request().target, std::move(response));
-
- if (!handleStatus.isOK()) {
- return _completeOperation(op, handleStatus);
- }
-
- return _beginCommunication(op);
- };
-
- return _asyncRunCommand(op, [this, op, finishHook](std::error_code ec, std::size_t bytes) {
- _validateAndRun(op, ec, finishHook);
- });
-}
-
-
-} // namespace executor
-} // namespace mongo
diff --git a/src/mongo/executor/network_interface_asio_connect.cpp b/src/mongo/executor/network_interface_asio_connect.cpp
deleted file mode 100644
index 57a9f302b58..00000000000
--- a/src/mongo/executor/network_interface_asio_connect.cpp
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/executor/network_interface_asio.h"
-
-#include <utility>
-
-#include "mongo/base/system_error.h"
-#include "mongo/config.h"
-#include "mongo/db/wire_version.h"
-#include "mongo/executor/async_stream.h"
-#include "mongo/executor/async_stream_factory.h"
-#include "mongo/executor/async_stream_interface.h"
-#include "mongo/stdx/memory.h"
-#include "mongo/util/log.h"
-
-namespace mongo {
-namespace executor {
-
-using asio::ip::tcp;
-
-NetworkInterfaceASIO::AsyncConnection::AsyncConnection(std::unique_ptr<AsyncStreamInterface> stream,
- rpc::ProtocolSet protocols)
- : _stream(std::move(stream)),
- _serverProtocols(protocols),
- _clientProtocols(rpc::computeProtocolSet(WireSpec::instance().outgoing)) {}
-
-AsyncStreamInterface& NetworkInterfaceASIO::AsyncConnection::stream() {
- return *_stream;
-}
-
-void NetworkInterfaceASIO::AsyncConnection::cancel() {
- _stream->cancel();
-}
-
-rpc::ProtocolSet NetworkInterfaceASIO::AsyncConnection::serverProtocols() const {
- return _serverProtocols;
-}
-
-rpc::ProtocolSet NetworkInterfaceASIO::AsyncConnection::clientProtocols() const {
- return _clientProtocols;
-}
-
-void NetworkInterfaceASIO::AsyncConnection::setServerProtocols(rpc::ProtocolSet protocols) {
- _serverProtocols = protocols;
-}
-
-void NetworkInterfaceASIO::_connect(AsyncOp* op) {
- log() << "Connecting to " << op->request().target.toString();
-
- tcp::resolver::query query(op->request().target.host(),
- std::to_string(op->request().target.port()));
- // TODO: Investigate how we might hint or use shortcuts to resolve when possible.
- const auto thenConnect = [this, op](std::error_code ec, tcp::resolver::iterator endpoints) {
- if (endpoints == tcp::resolver::iterator()) {
- // Workaround a bug in ASIO returning an invalid resolver iterator (with a non-error
- // std::error_code) when file descriptors are exhausted.
- ec = make_error_code(ErrorCodes::HostUnreachable);
- }
- _validateAndRun(
- op, ec, [this, op, endpoints]() { _setupSocket(op, std::move(endpoints)); });
- };
- op->resolver().async_resolve(query, op->_strand.wrap(std::move(thenConnect)));
-}
-
-void NetworkInterfaceASIO::_setupSocket(AsyncOp* op, tcp::resolver::iterator endpoints) {
- // TODO: Consider moving this call to post-auth so we only assign completed connections.
- {
- auto stream = _streamFactory->makeStream(&op->strand(), op->request().target);
- op->setConnection({std::move(stream), rpc::supports::kOpQueryOnly});
- }
-
- auto& stream = op->connection().stream();
-
- stream.connect(std::move(endpoints), [this, op](std::error_code ec) {
- _validateAndRun(op, ec, [this, op]() { _runIsMaster(op); });
- });
-}
-
-} // namespace executor
-} // namespace mongo
diff --git a/src/mongo/executor/network_interface_asio_integration_test.cpp b/src/mongo/executor/network_interface_asio_integration_test.cpp
deleted file mode 100644
index 79688a60d19..00000000000
--- a/src/mongo/executor/network_interface_asio_integration_test.cpp
+++ /dev/null
@@ -1,236 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO
-
-#include "mongo/platform/basic.h"
-
-#include <algorithm>
-#include <exception>
-
-#include "mongo/client/connection_string.h"
-#include "mongo/executor/network_interface_asio_integration_fixture.h"
-#include "mongo/executor/network_interface_asio_test_utils.h"
-#include "mongo/platform/random.h"
-#include "mongo/rpc/get_status_from_command_result.h"
-#include "mongo/stdx/future.h"
-#include "mongo/stdx/memory.h"
-#include "mongo/unittest/integration_test.h"
-#include "mongo/unittest/unittest.h"
-#include "mongo/util/assert_util.h"
-#include "mongo/util/concurrency/thread_pool.h"
-#include "mongo/util/log.h"
-#include "mongo/util/scopeguard.h"
-
-namespace mongo {
-namespace executor {
-namespace {
-
-TEST_F(NetworkInterfaceASIOIntegrationFixture, Ping) {
- startNet();
- assertCommandOK("admin", BSON("ping" << 1));
-}
-
-TEST_F(NetworkInterfaceASIOIntegrationFixture, Timeouts) {
- startNet();
- // This sleep command will take 10 seconds, so we should time out client side first given
- // our timeout of 100 milliseconds.
- assertCommandFailsOnClient("admin",
- BSON("sleep" << 1 << "lock"
- << "none"
- << "secs"
- << 10),
- ErrorCodes::NetworkInterfaceExceededTimeLimit,
- Milliseconds(100));
-
- // Run a sleep command that should return before we hit the ASIO timeout.
- assertCommandOK("admin",
- BSON("sleep" << 1 << "lock"
- << "none"
- << "secs"
- << 1),
- Milliseconds(10000000));
-}
-
-class StressTestOp {
-public:
- using Fixture = NetworkInterfaceASIOIntegrationFixture;
- using Pool = ThreadPoolInterface;
-
- void run(Fixture* fixture,
- StartCommandCB onFinish,
- Milliseconds timeout = RemoteCommandRequest::kNoTimeout) {
- auto cb = makeCallbackHandle();
-
- RemoteCommandRequest request{unittest::getFixtureConnectionString().getServers()[0],
- "admin",
- _command,
- nullptr,
- timeout};
-
- fixture->startCommand(cb, request, onFinish);
-
- if (_cancel) {
- invariant(fixture->getRandomNumberGenerator());
- sleepmillis(fixture->getRandomNumberGenerator()->nextInt32(10));
- fixture->net().cancelCommand(cb);
- }
- }
-
- static void runTimeoutOp(Fixture* fixture, StartCommandCB onFinish) {
- return StressTestOp(BSON("sleep" << 1 << "lock"
- << "none"
- << "secs"
- << 1),
- false)
- .run(fixture, onFinish, Milliseconds(100));
- }
-
- static void runCompleteOp(Fixture* fixture, StartCommandCB onFinish) {
- return StressTestOp(BSON("sleep" << 1 << "lock"
- << "none"
- << "millis"
- << 100),
- false)
- .run(fixture, onFinish);
- }
-
- static void runCancelOp(Fixture* fixture, StartCommandCB onFinish) {
- return StressTestOp(BSON("sleep" << 1 << "lock"
- << "none"
- << "secs"
- << 10),
- true)
- .run(fixture, onFinish);
- }
-
- static void runLongOp(Fixture* fixture, StartCommandCB onFinish) {
- return StressTestOp(BSON("sleep" << 1 << "lock"
- << "none"
- << "secs"
- << 30),
- false)
- .run(fixture, onFinish);
- }
-
-private:
- StressTestOp(const BSONObj& command, bool cancel) : _command(command), _cancel(cancel) {}
-
- BSONObj _command;
- bool _cancel;
-};
-
-TEST_F(NetworkInterfaceASIOIntegrationFixture, StressTest) {
- constexpr std::size_t numOps = 500;
- RemoteCommandResponse testResults[numOps];
- ErrorCodes::Error expectedResults[numOps];
- CountdownLatch cl(numOps);
-
- startNet();
-
- std::unique_ptr<SecureRandom> seedSource{SecureRandom::create()};
- auto seed = seedSource->nextInt64();
-
- log() << "Random seed is " << seed;
- auto rng = PseudoRandom(seed); // TODO: read from command line
- setRandomNumberGenerator(&rng);
- log() << "Starting stress test...";
-
- for (std::size_t i = 0; i < numOps; ++i) {
- // stagger operations slightly to mitigate connection pool contention
- sleepmillis(rng.nextInt32(50));
-
- auto r = rng.nextCanonicalDouble();
-
- auto cb = [&testResults, &cl, i](const RemoteCommandResponse& resp) {
- testResults[i] = resp;
- cl.countDown();
- };
-
- if (r < .3) {
- expectedResults[i] = ErrorCodes::CallbackCanceled;
- StressTestOp::runCancelOp(this, cb);
- } else if (r < .7) {
- expectedResults[i] = ErrorCodes::OK;
- StressTestOp::runCompleteOp(this, cb);
- } else if (r < .99) {
- expectedResults[i] = ErrorCodes::NetworkInterfaceExceededTimeLimit;
- StressTestOp::runTimeoutOp(this, cb);
- } else {
- // Just a sprinkling of long ops, to mitigate connection pool contention
- expectedResults[i] = ErrorCodes::OK;
- StressTestOp::runLongOp(this, cb);
- }
- };
-
- cl.await();
-
- for (std::size_t i = 0; i < numOps; ++i) {
- const auto& resp = testResults[i];
- auto ec = resp.isOK() ? getStatusFromCommandResult(resp.data) : resp.status;
- ASSERT_EQ(ec, expectedResults[i]);
- }
-}
-
-// Hook that intentionally never finishes
-class HangingHook : public executor::NetworkConnectionHook {
- Status validateHost(const HostAndPort&, const RemoteCommandResponse&) final {
- return Status::OK();
- }
-
- StatusWith<boost::optional<RemoteCommandRequest>> makeRequest(
- const HostAndPort& remoteHost) final {
- return {boost::make_optional(RemoteCommandRequest(remoteHost,
- "admin",
- BSON("sleep" << 1 << "lock"
- << "none"
- << "secs"
- << 100000000),
- BSONObj(),
- nullptr))};
- }
-
- Status handleReply(const HostAndPort& remoteHost, RemoteCommandResponse&& response) final {
- MONGO_UNREACHABLE;
- }
-};
-
-
-// Test that we time out a command if the connection hook hangs.
-TEST_F(NetworkInterfaceASIOIntegrationFixture, HookHangs) {
- NetworkInterfaceASIO::Options options;
- options.networkConnectionHook = stdx::make_unique<HangingHook>();
- startNet(std::move(options));
-
- assertCommandFailsOnClient(
- "admin", BSON("ping" << 1), ErrorCodes::NetworkInterfaceExceededTimeLimit, Seconds(1));
-}
-
-} // namespace
-} // namespace executor
-} // namespace mongo
diff --git a/src/mongo/executor/network_interface_asio_operation.cpp b/src/mongo/executor/network_interface_asio_operation.cpp
deleted file mode 100644
index 4a78f0c076e..00000000000
--- a/src/mongo/executor/network_interface_asio_operation.cpp
+++ /dev/null
@@ -1,408 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/executor/network_interface_asio.h"
-
-#include "mongo/base/status_with.h"
-#include "mongo/db/query/getmore_request.h"
-#include "mongo/db/query/query_request.h"
-#include "mongo/executor/async_stream_interface.h"
-#include "mongo/executor/connection_pool_asio.h"
-#include "mongo/executor/network_interface_asio.h"
-#include "mongo/rpc/factory.h"
-#include "mongo/rpc/metadata/metadata_hook.h"
-#include "mongo/util/log.h"
-#include "mongo/util/time_support.h"
-
-#define MONGO_ASYNC_OP_INVARIANT(_Expression, _Error) \
- do { \
- if (MONGO_unlikely(!(_Expression))) { \
- _failWithInfo(__FILE__, __LINE__, _Error); \
- } \
- } while (false)
-
-namespace mongo {
-namespace executor {
-
-using asio::ip::tcp;
-
-namespace {
-
-// Used to generate unique identifiers for AsyncOps, the same AsyncOp may
-// be used to run multiple distinct requests.
-AtomicUInt64 kAsyncOpIdCounter(0);
-
-} // namespace
-
-const NetworkInterfaceASIO::TableRow NetworkInterfaceASIO::AsyncOp::kFieldLabels = {
- "", "id", "states", "start_time", "request"};
-
-NetworkInterfaceASIO::AsyncOp::AsyncOp(NetworkInterfaceASIO* const owner,
- const TaskExecutor::CallbackHandle& cbHandle,
- const RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish,
- Date_t now)
- : _owner(owner),
- _cbHandle(cbHandle),
- _request(request),
- _onFinish(onFinish),
- _start(now),
- _resolver(owner->_io_service),
- _id(kAsyncOpIdCounter.addAndFetch(1)),
- _access(std::make_shared<AsyncOp::AccessControl>()),
- _inSetup(true),
- _inRefresh(false),
- _strand(owner->_io_service) {
- // No need to take lock when we aren't yet constructed.
- _transitionToState_inlock(State::kUninitialized);
-}
-
-void NetworkInterfaceASIO::AsyncOp::cancel() {
- LOG(2) << "Canceling operation; original request was: " << redact(request().toString());
- stdx::lock_guard<stdx::mutex> lk(_access->mutex);
- auto access = _access;
- auto generation = access->id;
-
- // An operation may be in mid-flight when it is canceled, so we cancel any
- // in-progress async ops but do not complete the operation now.
-
- _strand.post([this, access, generation] {
- stdx::lock_guard<stdx::mutex> lk(access->mutex);
- if (generation == access->id) {
- _transitionToState_inlock(AsyncOp::State::kCanceled);
- if (_connection) {
- _connection->cancel();
- }
- }
- });
-}
-
-bool NetworkInterfaceASIO::AsyncOp::canceled() const {
- return _hasSeenState(State::kCanceled);
-}
-
-void NetworkInterfaceASIO::AsyncOp::timeOut_inlock() {
- LOG(2) << "Operation timing out; original request was: " << redact(request().toString());
- auto access = _access;
- auto generation = access->id;
-
- // An operation may be in mid-flight when it times out, so we cancel any
- // in-progress stream operations but do not complete the operation now.
-
- _strand.post([this, access, generation] {
- stdx::lock_guard<stdx::mutex> lk(access->mutex);
- if (generation == access->id) {
- _transitionToState_inlock(AsyncOp::State::kTimedOut);
- if (_connection) {
- _connection->cancel();
- }
- }
- });
-}
-
-bool NetworkInterfaceASIO::AsyncOp::timedOut() const {
- return _hasSeenState(State::kTimedOut);
-}
-
-const TaskExecutor::CallbackHandle& NetworkInterfaceASIO::AsyncOp::cbHandle() const {
- return _cbHandle;
-}
-
-NetworkInterfaceASIO::AsyncConnection& NetworkInterfaceASIO::AsyncOp::connection() {
- MONGO_ASYNC_OP_INVARIANT(_connection.is_initialized(), "Connection not yet initialized");
- return *_connection;
-}
-
-void NetworkInterfaceASIO::AsyncOp::setConnection(AsyncConnection&& conn) {
- MONGO_ASYNC_OP_INVARIANT(!_connection.is_initialized(), "Connection already initialized");
- _connection = std::move(conn);
-}
-
-Status NetworkInterfaceASIO::AsyncOp::beginCommand(Message&& newCommand,
- const HostAndPort& target) {
- // NOTE: We operate based on the assumption that AsyncOp's
- // AsyncConnection does not change over its lifetime.
- MONGO_ASYNC_OP_INVARIANT(_connection.is_initialized(),
- "Connection should not change over AsyncOp's lifetime");
-
- auto swm = _connection->getCompressorManager().compressMessage(newCommand);
- if (!swm.isOK())
- return swm.getStatus();
-
- // Construct a new AsyncCommand object for each command.
- _command.emplace(_connection.get_ptr(), std::move(swm.getValue()), _owner->now(), target);
- return Status::OK();
-}
-
-Status NetworkInterfaceASIO::AsyncOp::beginCommand(const RemoteCommandRequest& request) {
- return beginCommand(
- rpc::messageFromOpMsgRequest(
- operationProtocol(),
- OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj, request.metadata)),
- request.target);
-}
-
-NetworkInterfaceASIO::AsyncCommand& NetworkInterfaceASIO::AsyncOp::command() {
- MONGO_ASYNC_OP_INVARIANT(_command.is_initialized(), "Command is not yet initialized");
-
- return *_command;
-}
-
-bool NetworkInterfaceASIO::AsyncOp::commandIsInitialized() const {
- return _command.is_initialized();
-}
-
-void NetworkInterfaceASIO::AsyncOp::finish(ResponseStatus&& rs) {
- // We never hold the access lock when we call finish from NetworkInterfaceASIO.
- _transitionToState(AsyncOp::State::kFinished);
-
- LOG(2) << "Request " << _request.id << " finished with response: "
- << redact(rs.isOK() ? rs.data.toString() : rs.status.toString());
-
- // Calling the completion handler may invalidate state in this op, so do it last.
- _onFinish(rs);
-}
-
-const RemoteCommandRequest& NetworkInterfaceASIO::AsyncOp::request() const {
- return _request;
-}
-
-void NetworkInterfaceASIO::AsyncOp::startProgress(Date_t startTime) {
- _start = startTime;
- // We never hold the access lock when we call startProgress from NetworkInterfaceASIO.
- _transitionToState(AsyncOp::State::kInProgress);
-}
-
-Date_t NetworkInterfaceASIO::AsyncOp::start() const {
- return _start;
-}
-
-rpc::Protocol NetworkInterfaceASIO::AsyncOp::operationProtocol() const {
- MONGO_ASYNC_OP_INVARIANT(_operationProtocol.is_initialized(), "Protocol not yet set");
- return *_operationProtocol;
-}
-
-void NetworkInterfaceASIO::AsyncOp::setOperationProtocol(rpc::Protocol proto) {
- MONGO_ASYNC_OP_INVARIANT(!_operationProtocol.is_initialized(), "Protocol already set");
- _operationProtocol = proto;
-}
-
-void NetworkInterfaceASIO::AsyncOp::setResponseMetadata(BSONObj m) {
- _responseMetadata = m;
-}
-
-BSONObj NetworkInterfaceASIO::AsyncOp::getResponseMetadata() {
- return _responseMetadata;
-}
-
-void NetworkInterfaceASIO::AsyncOp::reset() {
- // We don't reset owner as it never changes
- _cbHandle = {};
- _request = {};
- _onFinish = {};
- _connectionPoolHandle = {};
- // We don't reset _connection as we want to reuse it.
- // Ditto for _operationProtocol.
- _start = {};
- _timeoutAlarm.reset();
- // _id stays the same for the lifetime of this object.
- _command = boost::none;
- // _inSetup should always be false at this point.
- // We never hold the access lock when we call this from NetworkInterfaceASIO.
- clearStateTransitions();
-}
-
-void NetworkInterfaceASIO::AsyncOp::clearStateTransitions() {
- _transitionToState(AsyncOp::State::kUninitialized);
-}
-
-void NetworkInterfaceASIO::AsyncOp::setOnFinish(RemoteCommandCompletionFn&& onFinish) {
- _onFinish = std::move(onFinish);
-}
-
-// Return a string representation of the given state.
-std::string NetworkInterfaceASIO::AsyncOp::_stateToString(AsyncOp::State state) const {
- switch (state) {
- case State::kUninitialized:
- return "UNINITIALIZED";
- case State::kInProgress:
- return "IN_PROGRESS";
- case State::kTimedOut:
- return "TIMED_OUT";
- case State::kCanceled:
- return "CANCELED";
- case State::kFinished:
- return "DONE";
- case State::kNoState:
- return "---";
- default:
- MONGO_UNREACHABLE;
- }
-}
-
-std::string NetworkInterfaceASIO::AsyncOp::_stateString() const {
- str::stream s;
- s << "[ ";
-
- for (int i = 0; i < kMaxStateTransitions; i++) {
- if (_states[i] == State::kNoState) {
- break;
- }
-
- if (i != 0) {
- s << ", ";
- }
-
- s << _stateToString(_states[i]);
- }
-
- s << " ]";
-
- return s;
-}
-
-NetworkInterfaceASIO::TableRow NetworkInterfaceASIO::AsyncOp::getStringFields() const {
- // We leave a placeholder for an asterisk
- return {"", std::to_string(_id), _stateString(), _start.toString(), _request.toString()};
-}
-
-std::string NetworkInterfaceASIO::AsyncOp::toString() const {
- str::stream s;
- int fieldIdx = 1;
- bool first = true;
-
- for (auto field : getStringFields()) {
- if (field != "") {
- if (first) {
- first = false;
- } else {
- s << ", ";
- }
-
- s << kFieldLabels[fieldIdx] << ": " << field;
- fieldIdx++;
- }
- }
- return s;
-}
-
-bool NetworkInterfaceASIO::AsyncOp::operator==(const AsyncOp& other) const {
- return _id == other._id;
-}
-
-bool NetworkInterfaceASIO::AsyncOp::_hasSeenState(AsyncOp::State state) const {
- return std::any_of(std::begin(_states), std::end(_states), [state](AsyncOp::State _state) {
- return _state == state;
- });
-}
-
-void NetworkInterfaceASIO::AsyncOp::_transitionToState(AsyncOp::State newState) {
- stdx::lock_guard<stdx::mutex> lk(_access->mutex);
- _transitionToState_inlock(newState);
-}
-
-void NetworkInterfaceASIO::AsyncOp::_transitionToState_inlock(AsyncOp::State newState) {
- if (newState == State::kUninitialized) {
- _states[0] = State::kUninitialized;
- for (int i = 1; i < kMaxStateTransitions; i++) {
- _states[i] = State::kNoState;
- }
- return;
- }
-
- // We can transition to cancelled multiple times if cancel() is called
- // multiple times. Ignore that transition if we're already cancelled.
- if (newState == State::kCanceled) {
- // Find the current state
- auto iter = std::find_if_not(_states.rbegin(), _states.rend(), [](const State& state) {
- return state == State::kNoState;
- });
-
- // If its cancelled, just return
- if (iter != _states.rend() && *iter == State::kCanceled) {
- return;
- }
- }
-
- for (int i = 0; i < kMaxStateTransitions; i++) {
- // We can't transition to the same state twice.
- MONGO_ASYNC_OP_INVARIANT(_states[i] != newState,
- "Cannot use the same state (" + _stateToString(newState) +
- ") twice");
-
- if (_states[i] == State::kNoState) {
- // Perform some validation before transitioning.
- switch (newState) {
- case State::kInProgress:
- MONGO_ASYNC_OP_INVARIANT(i == 1,
- "kInProgress must come directly after kUninitialized");
- break;
- case State::kTimedOut:
- // During connection setup, it is possible to timeout before the stream is
- // initialized, so we have to allow this transition.
- break;
- case State::kCanceled:
- MONGO_ASYNC_OP_INVARIANT(
- i > 1, _stateToString(newState) + " must come after kInProgress");
- MONGO_ASYNC_OP_INVARIANT(_states[i - 1] != State::kUninitialized,
- _stateToString(newState) +
- " cannot come after kUninitialized");
- break;
- case State::kFinished:
- MONGO_ASYNC_OP_INVARIANT(i > 0, "kFinished must come after kUninitialized");
- break;
- default:
- MONGO_UNREACHABLE;
- }
-
- // Update state.
- _states[i] = newState;
- return;
- }
- }
-
- // If we get here, we've already transitioned to the max allowed states, explode.
- MONGO_UNREACHABLE;
-}
-
-void NetworkInterfaceASIO::AsyncOp::_failWithInfo(const char* file,
- int line,
- std::string error) const {
- std::stringstream ss;
- ss << "Invariant failure at " << file << ":" << line << ": " << error
- << ", Operation: " << toString();
- Status status{ErrorCodes::InternalError, ss.str()};
- fassertFailedWithStatus(34430, status);
-}
-
-} // namespace executor
-} // namespace mongo
diff --git a/src/mongo/executor/network_interface_asio_test.cpp b/src/mongo/executor/network_interface_asio_test.cpp
deleted file mode 100644
index a5ca74a89c7..00000000000
--- a/src/mongo/executor/network_interface_asio_test.cpp
+++ /dev/null
@@ -1,1079 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO
-
-#include "mongo/platform/basic.h"
-
-#include <boost/optional.hpp>
-
-#include "mongo/base/status_with.h"
-#include "mongo/db/jsobj.h"
-#include "mongo/db/wire_version.h"
-#include "mongo/executor/async_mock_stream_factory.h"
-#include "mongo/executor/async_timer_mock.h"
-#include "mongo/executor/network_interface_asio.h"
-#include "mongo/executor/network_interface_asio_test_utils.h"
-#include "mongo/executor/test_network_connection_hook.h"
-#include "mongo/rpc/factory.h"
-#include "mongo/rpc/legacy_reply_builder.h"
-#include "mongo/rpc/message.h"
-#include "mongo/rpc/metadata/egress_metadata_hook_list.h"
-#include "mongo/stdx/memory.h"
-#include "mongo/unittest/unittest.h"
-#include "mongo/util/log.h"
-#include "mongo/util/scopeguard.h"
-
-namespace mongo {
-namespace executor {
-namespace {
-
-using ResponseStatus = TaskExecutor::ResponseStatus;
-
-HostAndPort testHost{"localhost", 20000};
-
-void initWireSpecMongoD() {
- WireSpec& spec = WireSpec::instance();
- // accept from latest internal version
- spec.incomingInternalClient.minWireVersion = LATEST_WIRE_VERSION;
- spec.incomingInternalClient.maxWireVersion = LATEST_WIRE_VERSION;
- // accept from any external version
- spec.incomingExternalClient.minWireVersion = RELEASE_2_4_AND_BEFORE;
- spec.incomingExternalClient.maxWireVersion = LATEST_WIRE_VERSION;
- // connect to latest
- spec.outgoing.minWireVersion = LATEST_WIRE_VERSION;
- spec.outgoing.maxWireVersion = LATEST_WIRE_VERSION;
-}
-
-// Utility function to use with mock streams
-RemoteCommandResponse simulateIsMaster(RemoteCommandRequest request) {
- ASSERT_EQ(std::string{request.cmdObj.firstElementFieldName()}, "isMaster");
- ASSERT_EQ(request.dbname, "admin");
-
- RemoteCommandResponse response;
- response.data =
- BSON("minWireVersion" << mongo::WireSpec::instance().incomingExternalClient.minWireVersion
- << "maxWireVersion"
- << mongo::WireSpec::instance().incomingExternalClient.maxWireVersion);
- return response;
-}
-
-BSONObj objConcat(std::initializer_list<BSONObj> objs) {
- BSONObjBuilder bob;
-
- for (const auto& obj : objs) {
- bob.appendElements(obj);
- }
-
- return bob.obj();
-}
-
-class NetworkInterfaceASIOTest : public mongo::unittest::Test {
-public:
- void setUp() override {
- initWireSpecMongoD();
- NetworkInterfaceASIO::Options options;
-
- // Use mock timer factory
- auto timerFactory = stdx::make_unique<AsyncTimerFactoryMock>();
- _timerFactory = timerFactory.get();
- options.timerFactory = std::move(timerFactory);
-
- auto factory = stdx::make_unique<AsyncMockStreamFactory>();
- // keep unowned pointer, but pass ownership to NIA
- _streamFactory = factory.get();
- options.streamFactory = std::move(factory);
- _net = stdx::make_unique<NetworkInterfaceASIO>(std::move(options));
- _net->startup();
- }
-
- void tearDown() override {
- if (!_net->inShutdown()) {
- _net->shutdown();
- }
- }
-
- Deferred<RemoteCommandResponse> startCommand(const TaskExecutor::CallbackHandle& cbHandle,
- RemoteCommandRequest& request) {
- Deferred<RemoteCommandResponse> deferredResponse;
- ASSERT_OK(net().startCommand(
- cbHandle, request, [deferredResponse](ResponseStatus response) mutable {
- deferredResponse.emplace(std::move(response));
- }));
- return deferredResponse;
- }
-
- // Helper to run startCommand and wait for it
- RemoteCommandResponse startCommandSync(RemoteCommandRequest& request) {
- auto deferred = startCommand(makeCallbackHandle(), request);
-
- // wait for the operation to complete
- auto& result = deferred.get();
- return result;
- }
-
- NetworkInterfaceASIO& net() {
- return *_net;
- }
-
- AsyncMockStreamFactory& streamFactory() {
- return *_streamFactory;
- }
-
- AsyncTimerFactoryMock& timerFactory() {
- return *_timerFactory;
- }
-
- void assertNumOps(uint64_t canceled, uint64_t timedOut, uint64_t failed, uint64_t succeeded) {
- ASSERT_EQ(canceled, net().getNumCanceledOps());
- ASSERT_EQ(timedOut, net().getNumTimedOutOps());
- ASSERT_EQ(failed, net().getNumFailedOps());
- ASSERT_EQ(succeeded, net().getNumSucceededOps());
- }
-
-protected:
- AsyncTimerFactoryMock* _timerFactory;
- AsyncMockStreamFactory* _streamFactory;
- std::unique_ptr<NetworkInterfaceASIO> _net;
-};
-
-TEST_F(NetworkInterfaceASIOTest, CancelMissingOperation) {
- // This is just a sanity check, this action should have no effect.
- net().cancelCommand(makeCallbackHandle());
- assertNumOps(0u, 0u, 0u, 0u);
-}
-
-TEST_F(NetworkInterfaceASIOTest, CancelOperation) {
- auto cbh = makeCallbackHandle();
-
- // Kick off our operation
- RemoteCommandRequest request{testHost, "testDB", BSON("a" << 1), BSONObj(), nullptr};
- auto deferred = startCommand(cbh, request);
-
- // Create and initialize a stream so operation can begin
- auto stream = streamFactory().blockUntilStreamExists(testHost);
- ConnectEvent{stream}.skip();
-
- // simulate isMaster reply.
- stream->simulateServer(rpc::Protocol::kOpQuery,
- [](RemoteCommandRequest request) -> RemoteCommandResponse {
- return simulateIsMaster(request);
- });
-
- {
- // Cancel operation while blocked in the write for determinism. By calling cancel here we
- // ensure that it is not a no-op and that the asio::operation_aborted error will always
- // be returned to the NIA.
- WriteEvent write{stream};
- net().cancelCommand(cbh);
- }
-
- // Wait for op to complete, assert that it was canceled.
- auto& result = deferred.get();
- ASSERT_EQ(ErrorCodes::CallbackCanceled, result.status);
- ASSERT(result.elapsedMillis);
- assertNumOps(1u, 0u, 0u, 0u);
-}
-
-TEST_F(NetworkInterfaceASIOTest, ImmediateCancel) {
- auto cbh = makeCallbackHandle();
-
- RemoteCommandRequest request{testHost, "testDB", BSON("a" << 1), BSONObj(), nullptr};
- auto deferred = startCommand(cbh, request);
-
- // Cancel immediately
- net().cancelCommand(cbh);
-
- // Allow stream to connect so operation can return
- auto stream = streamFactory().blockUntilStreamExists(testHost);
- ConnectEvent{stream}.skip();
- stream->simulateServer(rpc::Protocol::kOpQuery,
- [](RemoteCommandRequest request) -> RemoteCommandResponse {
- return simulateIsMaster(request);
- });
-
- auto& result = deferred.get();
- ASSERT_EQ(ErrorCodes::CallbackCanceled, result.status);
- ASSERT(result.elapsedMillis);
- // expect 0 completed ops because the op was canceled before getting a connection
- assertNumOps(1u, 0u, 0u, 0u);
-}
-
-TEST_F(NetworkInterfaceASIOTest, LateCancel) {
- auto cbh = makeCallbackHandle();
- RemoteCommandRequest request{testHost, "testDB", BSON("a" << 1), BSONObj(), nullptr};
- auto deferred = startCommand(cbh, request);
-
- // Allow stream to connect so operation can return
- auto stream = streamFactory().blockUntilStreamExists(testHost);
- ConnectEvent{stream}.skip();
- stream->simulateServer(rpc::Protocol::kOpQuery,
- [](RemoteCommandRequest request) -> RemoteCommandResponse {
- return simulateIsMaster(request);
- });
-
- // Simulate user command
- stream->simulateServer(rpc::Protocol::kOpMsg,
- [](RemoteCommandRequest request) -> RemoteCommandResponse {
- RemoteCommandResponse response;
- response.data = BSONObj();
- response.metadata = BSONObj();
- return response;
- });
-
- // Allow to complete, then cancel, nothing should happen.
- auto& result = deferred.get();
- net().cancelCommand(cbh);
-
- ASSERT(result.isOK());
- ASSERT(result.elapsedMillis);
- assertNumOps(0u, 0u, 0u, 1u);
-}
-
-TEST_F(NetworkInterfaceASIOTest, CancelWithNetworkError) {
- auto cbh = makeCallbackHandle();
- RemoteCommandRequest request{testHost, "testDB", BSON("a" << 1), BSONObj(), nullptr};
- auto deferred = startCommand(cbh, request);
-
- // Create and initialize a stream so operation can begin
- auto stream = streamFactory().blockUntilStreamExists(testHost);
- ConnectEvent{stream}.skip();
-
- // simulate isMaster reply.
- stream->simulateServer(rpc::Protocol::kOpQuery,
- [](RemoteCommandRequest request) -> RemoteCommandResponse {
- return simulateIsMaster(request);
- });
-
- {
- WriteEvent{stream}.skip();
- ReadEvent read{stream};
-
- // Trigger both a cancellation and a network error
- stream->setError(make_error_code(ErrorCodes::HostUnreachable));
- net().cancelCommand(cbh);
- }
-
- // Wait for op to complete, assert that cancellation error had precedence.
- auto& result = deferred.get();
- ASSERT(result.status == ErrorCodes::CallbackCanceled);
- ASSERT(result.elapsedMillis);
- assertNumOps(1u, 0u, 0u, 0u);
-}
-
-TEST_F(NetworkInterfaceASIOTest, CancelWithTimeout) {
- auto cbh = makeCallbackHandle();
- RemoteCommandRequest request{testHost, "testDB", BSON("a" << 1), BSONObj(), nullptr};
- auto deferred = startCommand(cbh, request);
-
- // Create and initialize a stream so operation can begin
- auto stream = streamFactory().blockUntilStreamExists(testHost);
- ConnectEvent{stream}.skip();
-
- stream->simulateServer(rpc::Protocol::kOpQuery,
- [](RemoteCommandRequest request) -> RemoteCommandResponse {
- return simulateIsMaster(request);
- });
-
- {
- WriteEvent write{stream};
-
- // Trigger both a cancellation and a timeout
- net().cancelCommand(cbh);
- timerFactory().fastForward(Milliseconds(500));
- }
-
- // Wait for op to complete, assert that cancellation error had precedence.
- auto& result = deferred.get();
- ASSERT_EQ(ErrorCodes::CallbackCanceled, result.status);
- ASSERT(result.elapsedMillis);
- assertNumOps(1u, 0u, 0u, 0u);
-}
-
-TEST_F(NetworkInterfaceASIOTest, TimeoutWithNetworkError) {
- auto cbh = makeCallbackHandle();
- RemoteCommandRequest request{
- testHost, "testDB", BSON("a" << 1), BSONObj(), nullptr, Milliseconds(1000)};
- auto deferred = startCommand(cbh, request);
-
- // Create and initialize a stream so operation can begin
- auto stream = streamFactory().blockUntilStreamExists(testHost);
- ConnectEvent{stream}.skip();
- stream->simulateServer(rpc::Protocol::kOpQuery,
- [](RemoteCommandRequest request) -> RemoteCommandResponse {
- return simulateIsMaster(request);
- });
-
- {
- WriteEvent{stream}.skip();
- ReadEvent read{stream};
-
- // Trigger both a timeout and a network error
- stream->setError(make_error_code(ErrorCodes::HostUnreachable));
- timerFactory().fastForward(Milliseconds(2000));
- }
-
- // Wait for op to complete, assert that timeout had precedence.
- auto& result = deferred.get();
- ASSERT_EQ(ErrorCodes::NetworkInterfaceExceededTimeLimit, result.status);
- ASSERT(result.elapsedMillis);
- assertNumOps(0u, 1u, 1u, 0u);
-}
-
-TEST_F(NetworkInterfaceASIOTest, CancelWithTimeoutAndNetworkError) {
- auto cbh = makeCallbackHandle();
- RemoteCommandRequest request{
- testHost, "testDB", BSON("a" << 1), BSONObj(), nullptr, Milliseconds(1000)};
- auto deferred = startCommand(cbh, request);
-
- // Create and initialize a stream so operation can begin
- auto stream = streamFactory().blockUntilStreamExists(testHost);
- ConnectEvent{stream}.skip();
- stream->simulateServer(rpc::Protocol::kOpQuery,
- [](RemoteCommandRequest request) -> RemoteCommandResponse {
- return simulateIsMaster(request);
- });
-
- {
- WriteEvent{stream}.skip();
- ReadEvent read{stream};
-
- // Trigger a timeout, a cancellation, and a network error
- stream->setError(make_error_code(ErrorCodes::HostUnreachable));
- timerFactory().fastForward(Milliseconds(2000));
- net().cancelCommand(cbh);
- }
-
- // Wait for op to complete, assert that the cancellation had precedence.
- auto& result = deferred.get();
- ASSERT_EQ(ErrorCodes::CallbackCanceled, result.status);
- ASSERT(result.elapsedMillis);
- assertNumOps(1u, 0u, 0u, 0u);
-}
-
-TEST_F(NetworkInterfaceASIOTest, AsyncOpTimeout) {
- // Kick off operation
- auto cb = makeCallbackHandle();
- Milliseconds timeout(1000);
- RemoteCommandRequest request{testHost, "testDB", BSON("a" << 1), BSONObj(), nullptr, timeout};
- auto deferred = startCommand(cb, request);
-
- // Create and initialize a stream so operation can begin
- auto stream = streamFactory().blockUntilStreamExists(testHost);
- ConnectEvent{stream}.skip();
-
- // Simulate isMaster reply.
- stream->simulateServer(rpc::Protocol::kOpQuery,
- [](RemoteCommandRequest request) -> RemoteCommandResponse {
- return simulateIsMaster(request);
- });
-
- {
- // Wait for the operation to block on write so we know it's been added.
- WriteEvent write{stream};
-
- // Get the timer factory
- auto& factory = timerFactory();
-
- // Advance clock but not enough to force a timeout, assert still active
- factory.fastForward(Milliseconds(500));
- ASSERT(!deferred.hasCompleted());
-
- // Advance clock and force timeout
- factory.fastForward(Milliseconds(500));
- }
-
- auto& result = deferred.get();
- ASSERT_EQ(ErrorCodes::NetworkInterfaceExceededTimeLimit, result.status);
- ASSERT(result.elapsedMillis);
- assertNumOps(0u, 1u, 1u, 0u);
-}
-
-TEST_F(NetworkInterfaceASIOTest, StartCommand) {
- RemoteCommandRequest request{testHost, "testDB", BSON("foo" << 1), BSON("bar" << 1), nullptr};
- auto deferred = startCommand(makeCallbackHandle(), request);
-
- auto stream = streamFactory().blockUntilStreamExists(testHost);
-
- // Allow stream to connect.
- ConnectEvent{stream}.skip();
-
- // simulate isMaster reply.
- stream->simulateServer(rpc::Protocol::kOpQuery,
- [](RemoteCommandRequest request) -> RemoteCommandResponse {
- return simulateIsMaster(request);
- });
-
- auto expectedMetadata = BSON("meep"
- << "beep");
- auto expectedCommandReply = BSON("boop"
- << "bop"
- << "ok"
- << 1.0);
-
- auto expectedCommandReplyWithMetadata = objConcat({expectedCommandReply, expectedMetadata});
-
- // simulate user command
- stream->simulateServer(
- rpc::Protocol::kOpMsg, [&](RemoteCommandRequest request) -> RemoteCommandResponse {
- ASSERT_EQ(std::string{request.cmdObj.firstElementFieldName()}, "foo");
- ASSERT_EQ(request.dbname, "testDB");
-
- RemoteCommandResponse response;
- response.data = expectedCommandReply;
- response.metadata = expectedMetadata;
- return response;
- });
-
- auto& res = deferred.get();
- ASSERT(res.elapsedMillis);
- uassertStatusOK(res.status);
- ASSERT_BSONOBJ_EQ(res.data, expectedCommandReplyWithMetadata);
- ASSERT_BSONOBJ_EQ(res.metadata, expectedCommandReplyWithMetadata);
- assertNumOps(0u, 0u, 0u, 1u);
-}
-
-TEST_F(NetworkInterfaceASIOTest, InShutdown) {
- ASSERT_FALSE(net().inShutdown());
- net().shutdown();
- ASSERT(net().inShutdown());
-}
-
-TEST_F(NetworkInterfaceASIOTest, StartCommandReturnsNotOKIfShutdownHasStarted) {
- net().shutdown();
- RemoteCommandRequest request;
- ASSERT_NOT_OK(
- net().startCommand(makeCallbackHandle(), request, [&](RemoteCommandResponse resp) {}));
-}
-
-class MalformedMessageTest : public NetworkInterfaceASIOTest {
-public:
- using MessageHook = stdx::function<void(MsgData::View)>;
-
- void runMessageTest(ErrorCodes::Error code, bool loadBody, MessageHook hook) {
- // Kick off our operation
- RemoteCommandRequest request{testHost, "testDB", BSON("ping" << 1), BSONObj(), nullptr};
- auto deferred = startCommand(makeCallbackHandle(), request);
-
- // Wait for it to block waiting for a write
- auto stream = streamFactory().blockUntilStreamExists(testHost);
- ConnectEvent{stream}.skip();
- stream->simulateServer(rpc::Protocol::kOpQuery,
- [](RemoteCommandRequest request) -> RemoteCommandResponse {
- return simulateIsMaster(request);
- });
-
- uint32_t messageId = 0;
-
- {
- // Get the appropriate message id
- WriteEvent write{stream};
- std::vector<uint8_t> messageData = stream->popWrite();
- messageId =
- MsgData::ConstView(reinterpret_cast<const char*>(messageData.data())).getId();
- }
-
- // Build a mock reply message
- auto replyBuilder = rpc::makeReplyBuilder(rpc::Protocol::kOpMsg);
- replyBuilder->setCommandReply(BSON("hello!" << 1));
- replyBuilder->setMetadata(BSONObj());
-
- auto message = replyBuilder->done();
- message.header().setResponseToMsgId(messageId);
-
- auto actualSize = message.header().getLen();
-
- // Allow caller to mess with the Message
- hook(message.header());
-
- {
- // Load the header
- ReadEvent read{stream};
- auto headerBytes = reinterpret_cast<const uint8_t*>(message.header().view2ptr());
- stream->pushRead({headerBytes, headerBytes + sizeof(MSGHEADER::Value)});
- }
-
- if (loadBody) {
- // Load the body if we need to
- ReadEvent read{stream};
- auto dataBytes = reinterpret_cast<const uint8_t*>(message.buf());
- auto body = dataBytes;
- std::advance(body, sizeof(MSGHEADER::Value));
- stream->pushRead({body, dataBytes + static_cast<std::size_t>(actualSize)});
- }
-
- auto& response = deferred.get();
- ASSERT_EQ(code, response.status);
- ASSERT(response.elapsedMillis);
- assertNumOps(0u, 0u, 1u, 0u);
- }
-};
-
-TEST_F(MalformedMessageTest, messageHeaderWrongResponseTo) {
- runMessageTest(ErrorCodes::ProtocolError, false, [](MsgData::View message) {
- message.setResponseToMsgId(message.getResponseToMsgId() + 1);
- });
-}
-
-TEST_F(MalformedMessageTest, messageHeaderlenZero) {
- runMessageTest(
- ErrorCodes::InvalidLength, false, [](MsgData::View message) { message.setLen(0); });
-}
-
-TEST_F(MalformedMessageTest, MessageHeaderLenTooSmall) {
- runMessageTest(ErrorCodes::InvalidLength, false, [](MsgData::View message) {
- message.setLen(6);
- }); // min is 16
-}
-
-TEST_F(MalformedMessageTest, MessageHeaderLenTooLarge) {
- runMessageTest(ErrorCodes::InvalidLength, false, [](MsgData::View message) {
- message.setLen(48000001);
- }); // max is 48000000
-}
-
-TEST_F(MalformedMessageTest, MessageHeaderLenNegative) {
- runMessageTest(
- ErrorCodes::InvalidLength, false, [](MsgData::View message) { message.setLen(-1); });
-}
-
-TEST_F(MalformedMessageTest, MessageLenSmallerThanActual) {
- runMessageTest(ErrorCodes::InvalidBSON, true, [](MsgData::View message) {
- message.setLen(message.getLen() - 10);
- });
-}
-
-TEST_F(MalformedMessageTest, FailedToReadAllBytesForMessage) {
- runMessageTest(ErrorCodes::InvalidLength, true, [](MsgData::View message) {
- message.setLen(message.getLen() + 100);
- });
-}
-
-TEST_F(MalformedMessageTest, UnsupportedOpcode) {
- runMessageTest(ErrorCodes::UnsupportedFormat, true, [](MsgData::View message) {
- message.setOperation(2222);
- });
-}
-
-TEST_F(MalformedMessageTest, MismatchedOpcode) {
- runMessageTest(ErrorCodes::UnsupportedFormat, true, [](MsgData::View message) {
- message.setOperation(2006);
- });
-}
-
-class NetworkInterfaceASIOConnectionHookTest : public NetworkInterfaceASIOTest {
-public:
- void setUp() override {}
-
- void start(std::unique_ptr<NetworkConnectionHook> hook) {
- auto factory = stdx::make_unique<AsyncMockStreamFactory>();
- // keep unowned pointer, but pass ownership to NIA
- _streamFactory = factory.get();
- NetworkInterfaceASIO::Options options{};
- options.streamFactory = std::move(factory);
- options.networkConnectionHook = std::move(hook);
- options.timerFactory = stdx::make_unique<AsyncTimerFactoryMock>();
- _net = stdx::make_unique<NetworkInterfaceASIO>(std::move(options));
- _net->startup();
- }
-};
-
-TEST_F(NetworkInterfaceASIOConnectionHookTest, InvalidIsMaster) {
- auto validationFailedStatus =
- Status(ErrorCodes::InterruptedDueToReplStateChange, "operation was interrupted");
-
- start(makeTestHook(
- [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) {
- return Status(ErrorCodes::UnknownError, "unused");
- },
- [&](const HostAndPort& remoteHost) -> StatusWith<boost::optional<RemoteCommandRequest>> {
- return {boost::none};
- },
- [&](const HostAndPort& remoteHost, RemoteCommandResponse&& response) {
- return Status::OK();
- }));
-
- RemoteCommandRequest request{testHost,
- "blah",
- BSON("foo"
- << "bar"),
- nullptr};
- auto deferred = startCommand(makeCallbackHandle(), request);
-
- auto stream = streamFactory().blockUntilStreamExists(testHost);
-
- ConnectEvent{stream}.skip();
-
- // simulate isMaster reply.
- stream->simulateServer(rpc::Protocol::kOpQuery,
- [](RemoteCommandRequest request) -> RemoteCommandResponse {
- RemoteCommandResponse response;
- response.data = BSON("ok" << 0.0 << "errmsg"
- << "operation was interrupted"
- << "code"
- << 11602);
- return response;
- });
-
- // we should stop here.
- auto& res = deferred.get();
- ASSERT_EQ(validationFailedStatus, res.status);
- ASSERT(res.elapsedMillis);
-
- assertNumOps(0u, 0u, 1u, 0u);
-}
-
-TEST_F(NetworkInterfaceASIOConnectionHookTest, ValidateHostInvalid) {
- bool validateCalled = false;
- bool hostCorrect = false;
- bool isMasterReplyCorrect = false;
- bool makeRequestCalled = false;
- bool handleReplyCalled = false;
-
- auto validationFailedStatus = Status(ErrorCodes::AlreadyInitialized, "blahhhhh");
-
- start(makeTestHook(
- [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) {
- validateCalled = true;
- hostCorrect = (remoteHost == testHost);
- isMasterReplyCorrect = (isMasterReply.data["TESTKEY"].str() == "TESTVALUE");
- return validationFailedStatus;
- },
- [&](const HostAndPort& remoteHost) -> StatusWith<boost::optional<RemoteCommandRequest>> {
- makeRequestCalled = true;
- return {boost::none};
- },
- [&](const HostAndPort& remoteHost, RemoteCommandResponse&& response) {
- handleReplyCalled = true;
- return Status::OK();
- }));
-
- RemoteCommandRequest request{testHost,
- "blah",
- BSON("foo"
- << "bar"),
- nullptr};
- auto deferred = startCommand(makeCallbackHandle(), request);
-
- auto stream = streamFactory().blockUntilStreamExists(testHost);
-
- ConnectEvent{stream}.skip();
-
- // simulate isMaster reply.
- stream->simulateServer(
- rpc::Protocol::kOpQuery, [](RemoteCommandRequest request) -> RemoteCommandResponse {
- RemoteCommandResponse response;
- response.data =
- BSON("minWireVersion"
- << mongo::WireSpec::instance().incomingExternalClient.minWireVersion
- << "maxWireVersion"
- << mongo::WireSpec::instance().incomingExternalClient.maxWireVersion
- << "TESTKEY"
- << "TESTVALUE");
- return response;
- });
-
- // we should stop here.
- auto& res = deferred.get();
- ASSERT_EQ(validationFailedStatus, res.status);
- ASSERT(res.elapsedMillis);
- ASSERT(validateCalled);
- ASSERT(hostCorrect);
- ASSERT(isMasterReplyCorrect);
-
- ASSERT(!makeRequestCalled);
- ASSERT(!handleReplyCalled);
- assertNumOps(0u, 0u, 1u, 0u);
-}
-
-TEST_F(NetworkInterfaceASIOConnectionHookTest, MakeRequestReturnsError) {
- bool makeRequestCalled = false;
- bool handleReplyCalled = false;
-
- Status makeRequestError{ErrorCodes::DBPathInUse, "bloooh"};
-
- start(makeTestHook(
- [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) -> Status {
- return Status::OK();
- },
- [&](const HostAndPort& remoteHost) -> StatusWith<boost::optional<RemoteCommandRequest>> {
- makeRequestCalled = true;
- return makeRequestError;
- },
- [&](const HostAndPort& remoteHost, RemoteCommandResponse&& response) {
- handleReplyCalled = true;
- return Status::OK();
- }));
-
- RemoteCommandRequest request{testHost,
- "blah",
- BSON("foo"
- << "bar"),
- nullptr};
- auto deferred = startCommand(makeCallbackHandle(), request);
-
- auto stream = streamFactory().blockUntilStreamExists(testHost);
- ConnectEvent{stream}.skip();
-
- // simulate isMaster reply.
- stream->simulateServer(rpc::Protocol::kOpQuery,
- [](RemoteCommandRequest request) -> RemoteCommandResponse {
- return simulateIsMaster(request);
- });
-
- // We should stop here.
- auto& res = deferred.get();
-
- ASSERT_EQ(makeRequestError, res.status);
- ASSERT(res.elapsedMillis);
- ASSERT(makeRequestCalled);
- ASSERT(!handleReplyCalled);
- assertNumOps(0u, 0u, 1u, 0u);
-}
-
-TEST_F(NetworkInterfaceASIOConnectionHookTest, MakeRequestReturnsNone) {
- bool makeRequestCalled = false;
- bool handleReplyCalled = false;
-
- start(makeTestHook(
- [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) -> Status {
- return Status::OK();
- },
- [&](const HostAndPort& remoteHost) -> StatusWith<boost::optional<RemoteCommandRequest>> {
- makeRequestCalled = true;
- return {boost::none};
- },
- [&](const HostAndPort& remoteHost, RemoteCommandResponse&& response) {
- handleReplyCalled = true;
- return Status::OK();
- }));
-
- auto commandRequest = BSON("foo"
- << "bar");
-
- auto commandReply = BSON("foo"
- << "boo"
- << "ok"
- << 1.0);
-
- auto metadata = BSON("aaa"
- << "bbb");
-
- auto commandReplyWithMetadata = objConcat({commandReply, metadata});
-
- RemoteCommandRequest request{testHost, "blah", commandRequest, nullptr};
- auto deferred = startCommand(makeCallbackHandle(), request);
-
- auto stream = streamFactory().blockUntilStreamExists(testHost);
- ConnectEvent{stream}.skip();
-
- // simulate isMaster reply.
- stream->simulateServer(rpc::Protocol::kOpQuery,
- [](RemoteCommandRequest request) -> RemoteCommandResponse {
- return simulateIsMaster(request);
- });
-
- // Simulate user command.
- stream->simulateServer(rpc::Protocol::kOpMsg,
- [&](RemoteCommandRequest request) -> RemoteCommandResponse {
- ASSERT_BSONOBJ_EQ(commandRequest, request.cmdObj.removeField("$db"));
-
- RemoteCommandResponse response;
- response.data = commandReply;
- response.metadata = metadata;
- return response;
- });
-
- // We should get back the reply now.
- auto& result = deferred.get();
-
- ASSERT(result.isOK());
- ASSERT_BSONOBJ_EQ(commandReplyWithMetadata, result.data);
- ASSERT(result.elapsedMillis);
- ASSERT_BSONOBJ_EQ(commandReplyWithMetadata, result.metadata);
- assertNumOps(0u, 0u, 0u, 1u);
-}
-
-TEST_F(NetworkInterfaceASIOConnectionHookTest, HandleReplyReturnsError) {
- bool makeRequestCalled = false;
-
- bool handleReplyCalled = false;
- bool handleReplyArgumentCorrect = false;
-
- BSONObj hookCommandRequest = BSON("1ddd"
- << "fff");
- BSONObj hookRequestMetadata = BSON("wdwd" << 1212);
-
- BSONObj hookCommandReply = BSON("blah"
- << "blah"
- << "ok"
- << 1.0);
-
- BSONObj hookUnifiedRequest = ([&] {
- BSONObjBuilder bob;
- bob.appendElements(hookCommandRequest);
- bob.appendElements(hookRequestMetadata);
- bob.append("$db", "foo");
- return bob.obj();
- }());
-
- BSONObj hookReplyMetadata = BSON("1111" << 2222);
- BSONObj hookCommandReplyWithMetadata = objConcat({hookCommandReply, hookReplyMetadata});
-
- Status handleReplyError{ErrorCodes::AuthSchemaIncompatible, "daowdjkpowkdjpow"};
-
- start(makeTestHook(
- [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) -> Status {
- return Status::OK();
- },
- [&](const HostAndPort& remoteHost) -> StatusWith<boost::optional<RemoteCommandRequest>> {
- makeRequestCalled = true;
- return {boost::make_optional<RemoteCommandRequest>(
- {testHost, "foo", hookCommandRequest, hookRequestMetadata, nullptr})};
-
- },
- [&](const HostAndPort& remoteHost, RemoteCommandResponse&& response) {
- handleReplyCalled = true;
- handleReplyArgumentCorrect = SimpleBSONObjComparator::kInstance.evaluate(
- response.data == hookCommandReplyWithMetadata) &&
- SimpleBSONObjComparator::kInstance.evaluate(response.metadata ==
- hookCommandReplyWithMetadata);
- return handleReplyError;
- }));
-
- auto commandRequest = BSON("foo"
- << "bar");
- RemoteCommandRequest request{testHost, "blah", commandRequest, nullptr};
- auto deferred = startCommand(makeCallbackHandle(), request);
-
- auto stream = streamFactory().blockUntilStreamExists(testHost);
- ConnectEvent{stream}.skip();
-
- // simulate isMaster reply.
- stream->simulateServer(rpc::Protocol::kOpQuery,
- [](RemoteCommandRequest request) -> RemoteCommandResponse {
- return simulateIsMaster(request);
- });
-
- // Simulate hook reply
- stream->simulateServer(rpc::Protocol::kOpMsg,
- [&](RemoteCommandRequest request) -> RemoteCommandResponse {
- ASSERT_BSONOBJ_EQ(request.cmdObj, hookUnifiedRequest);
- ASSERT_BSONOBJ_EQ(request.metadata, BSONObj());
-
- RemoteCommandResponse response;
- response.data = hookCommandReply;
- response.metadata = hookReplyMetadata;
- return response;
- });
-
- auto& result = deferred.get();
- ASSERT_EQ(handleReplyError, result.status);
- ASSERT(result.elapsedMillis);
- ASSERT(makeRequestCalled);
- ASSERT(handleReplyCalled);
- ASSERT(handleReplyArgumentCorrect);
- assertNumOps(0u, 0u, 1u, 0u);
-}
-
-TEST_F(NetworkInterfaceASIOTest, SetAlarm) {
- // set a first alarm, to execute after "expiration"
- Date_t expiration = net().now() + Milliseconds(100);
-
- Deferred<Date_t> deferred;
- ASSERT_OK(net().setAlarm(
- expiration, [this, expiration, deferred]() mutable { deferred.emplace(net().now()); }));
-
- // Get our timer factory
- auto& factory = timerFactory();
-
- // force the alarm to fire
- factory.fastForward(Milliseconds(5000));
-
- // assert that it executed after "expiration"
- auto& result = deferred.get();
- ASSERT(result >= expiration);
-
- expiration = net().now() + Milliseconds(99999999);
- Deferred<bool> deferred2;
- ASSERT_OK(net().setAlarm(expiration, [this, deferred2]() mutable { deferred2.emplace(true); }));
-
- net().shutdown();
- ASSERT(!deferred2.hasCompleted());
-}
-
-TEST_F(NetworkInterfaceASIOTest, SetAlarmReturnsNotOKIfShutdownHasStarted) {
- net().shutdown();
- ASSERT_NOT_OK(net().setAlarm(net().now() + Milliseconds(100), [] {}));
-}
-
-TEST_F(NetworkInterfaceASIOTest, IsMasterRequestContainsOutgoingWireVersionInternalClientInfo) {
- WireSpec::instance().isInternalClient = true;
-
- RemoteCommandRequest request{testHost, "testDB", BSON("ping" << 1), BSONObj(), nullptr};
- auto deferred = startCommand(makeCallbackHandle(), request);
- auto stream = streamFactory().blockUntilStreamExists(testHost);
- ConnectEvent{stream}.skip();
-
- // Verify that the isMaster reply has the expected internalClient data.
- stream->simulateServer(
- rpc::Protocol::kOpQuery, [](RemoteCommandRequest request) -> RemoteCommandResponse {
- auto internalClientElem = request.cmdObj["internalClient"];
- ASSERT_EQ(internalClientElem.type(), BSONType::Object);
- auto minWireVersionElem = internalClientElem.Obj()["minWireVersion"];
- auto maxWireVersionElem = internalClientElem.Obj()["maxWireVersion"];
- ASSERT_EQ(minWireVersionElem.type(), BSONType::NumberInt);
- ASSERT_EQ(maxWireVersionElem.type(), BSONType::NumberInt);
- ASSERT_EQ(minWireVersionElem.numberInt(), WireSpec::instance().outgoing.minWireVersion);
- ASSERT_EQ(maxWireVersionElem.numberInt(), WireSpec::instance().outgoing.maxWireVersion);
- return simulateIsMaster(request);
- });
-
- // Simulate ping reply.
- stream->simulateServer(rpc::Protocol::kOpMsg,
- [&](RemoteCommandRequest request) -> RemoteCommandResponse {
- RemoteCommandResponse response;
- response.data = BSON("ok" << 1);
- return response;
- });
-
- // Verify that the ping op is counted as a success.
- auto& res = deferred.get();
- ASSERT(res.elapsedMillis);
- assertNumOps(0u, 0u, 0u, 1u);
-}
-
-TEST_F(NetworkInterfaceASIOTest, IsMasterRequestMissingInternalClientInfoWhenNotInternalClient) {
- WireSpec::instance().isInternalClient = false;
-
- RemoteCommandRequest request{testHost, "testDB", BSON("ping" << 1), BSONObj(), nullptr};
- auto deferred = startCommand(makeCallbackHandle(), request);
- auto stream = streamFactory().blockUntilStreamExists(testHost);
- ConnectEvent{stream}.skip();
-
- // Verify that the isMaster reply has the expected internalClient data.
- stream->simulateServer(rpc::Protocol::kOpQuery,
- [](RemoteCommandRequest request) -> RemoteCommandResponse {
- ASSERT_FALSE(request.cmdObj["internalClient"]);
- return simulateIsMaster(request);
- });
-
- // Simulate ping reply.
- stream->simulateServer(rpc::Protocol::kOpMsg,
- [&](RemoteCommandRequest request) -> RemoteCommandResponse {
- RemoteCommandResponse response;
- response.data = BSON("ok" << 1);
- return response;
- });
-
- // Verify that the ping op is counted as a success.
- auto& res = deferred.get();
- ASSERT(res.elapsedMillis);
- assertNumOps(0u, 0u, 0u, 1u);
-}
-
-class NetworkInterfaceASIOMetadataTest : public NetworkInterfaceASIOTest {
-protected:
- void setUp() override {}
-
- void start(std::unique_ptr<rpc::EgressMetadataHook> metadataHook) {
- auto factory = stdx::make_unique<AsyncMockStreamFactory>();
- _streamFactory = factory.get();
- NetworkInterfaceASIO::Options options{};
- options.streamFactory = std::move(factory);
- options.metadataHook = std::move(metadataHook);
- options.timerFactory = stdx::make_unique<AsyncTimerFactoryMock>();
- _net = stdx::make_unique<NetworkInterfaceASIO>(std::move(options));
- _net->startup();
- }
-};
-
-class TestMetadataHook : public rpc::EgressMetadataHook {
-public:
- TestMetadataHook(bool* wroteRequestMetadata, bool* gotReplyMetadata)
- : _wroteRequestMetadata(wroteRequestMetadata), _gotReplyMetadata(gotReplyMetadata) {}
-
- Status writeRequestMetadata(OperationContext* opCtx, BSONObjBuilder* metadataBob) override {
- metadataBob->append("foo", "bar");
- *_wroteRequestMetadata = true;
- return Status::OK();
- }
-
- Status readReplyMetadata(OperationContext* opCtx,
- StringData replySource,
- const BSONObj& metadataObj) override {
- *_gotReplyMetadata = (metadataObj["baz"].str() == "garply");
- return Status::OK();
- }
-
-private:
- bool* _wroteRequestMetadata;
- bool* _gotReplyMetadata;
-};
-
-TEST_F(NetworkInterfaceASIOMetadataTest, Metadata) {
- bool wroteRequestMetadata = false;
- bool gotReplyMetadata = false;
- auto hookList = stdx::make_unique<rpc::EgressMetadataHookList>();
- hookList->addHook(
- stdx::make_unique<TestMetadataHook>(&wroteRequestMetadata, &gotReplyMetadata));
- start(std::move(hookList));
-
- RemoteCommandRequest request{testHost, "blah", BSON("ping" << 1), nullptr};
- auto deferred = startCommand(makeCallbackHandle(), request);
-
- auto stream = streamFactory().blockUntilStreamExists(testHost);
- ConnectEvent{stream}.skip();
-
- // simulate isMaster reply.
- stream->simulateServer(rpc::Protocol::kOpQuery,
- [](RemoteCommandRequest request) -> RemoteCommandResponse {
- return simulateIsMaster(request);
- });
-
- // Simulate hook reply
- stream->simulateServer(rpc::Protocol::kOpMsg,
- [&](RemoteCommandRequest request) -> RemoteCommandResponse {
- ASSERT_EQ("bar", request.cmdObj["foo"].str());
- RemoteCommandResponse response;
- response.data = BSON("ok" << 1);
- response.metadata = BSON("baz"
- << "garply");
- return response;
- });
-
- auto& res = deferred.get();
- ASSERT(res.elapsedMillis);
- ASSERT(wroteRequestMetadata);
- ASSERT(gotReplyMetadata);
- assertNumOps(0u, 0u, 0u, 1u);
-}
-
-} // namespace
-} // namespace executor
-} // namespace mongo
diff --git a/src/mongo/executor/network_interface_asio_test_utils.h b/src/mongo/executor/network_interface_asio_test_utils.h
deleted file mode 100644
index a95da3dfc5f..00000000000
--- a/src/mongo/executor/network_interface_asio_test_utils.h
+++ /dev/null
@@ -1,188 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-#pragma once
-
-#include <memory>
-
-#include <boost/optional.hpp>
-#include <utility>
-#include <vector>
-
-#include "mongo/executor/task_executor.h"
-#include "mongo/stdx/condition_variable.h"
-#include "mongo/stdx/future.h"
-#include "mongo/stdx/mutex.h"
-#include "mongo/util/concurrency/thread_pool_interface.h"
-
-namespace mongo {
-namespace executor {
-
-/**
- * A mock class mimicking TaskExecutor::CallbackState, does nothing.
- */
-class MockCallbackState final : public TaskExecutor::CallbackState {
-public:
- MockCallbackState() = default;
- void cancel() override {}
- void waitForCompletion() override {}
- bool isCanceled() const override {
- return false;
- }
-};
-
-inline TaskExecutor::CallbackHandle makeCallbackHandle() {
- return TaskExecutor::CallbackHandle(std::make_shared<MockCallbackState>());
-}
-
-/**
- * Simple future-like utility for waiting for the result of startCommand.
- */
-template <typename T>
-class Deferred {
-public:
- template <typename... Args>
- void emplace(Args&&... args) {
- _emplace(_state.get(), std::forward<Args>(args)...);
- }
-
- T& get() {
- return _get(_state.get());
- }
-
- bool hasCompleted() {
- stdx::unique_lock<stdx::mutex> lk(_state->mtx);
- return _state->thing.is_initialized();
- }
-
- template <typename Continuation>
- auto then(ThreadPoolInterface* pool, Continuation&& continuation)
- -> Deferred<decltype(continuation(std::declval<Deferred<T>>().get()))> {
- // XXX: The ugliness of the above type signature is because you can't refer to 'this' in
- // a template parameter, at least on g++-4.8.2.
- auto state = _state;
- Deferred<decltype(continuation(get()))> thenDeferred;
-
- pool->schedule([this, thenDeferred, continuation, state]() mutable {
- thenDeferred.emplace(continuation(_get(state.get())));
- });
- return thenDeferred;
- }
-
-private:
- struct State {
- stdx::mutex mtx;
- stdx::condition_variable cv;
- boost::optional<T> thing;
- };
-
- template <typename... Args>
- void _emplace(State* state, Args&&... args) {
- stdx::lock_guard<stdx::mutex> lk(_state->mtx);
- invariant(!state->thing.is_initialized());
- state->thing.emplace(std::forward<T>(args)...);
- state->cv.notify_one();
- }
-
- T& _get(State* state) {
- stdx::unique_lock<stdx::mutex> lk(state->mtx);
- state->cv.wait(lk, [state] { return state->thing.is_initialized(); });
- return *state->thing;
- }
-
- std::shared_ptr<State> _state = std::make_shared<State>();
-};
-
-class CountdownLatch {
-public:
- explicit CountdownLatch(uint32_t count) : _count(count) {}
-
- void countDown() {
- stdx::lock_guard<stdx::mutex> lk(_mtx);
- if (_count == 0) {
- return;
- }
-
- --_count;
- if (_count == 0) {
- _cv.notify_all();
- }
- }
-
- void await() {
- stdx::unique_lock<stdx::mutex> lk(_mtx);
- _cv.wait(lk, [&] { return _count == 0; });
- }
-
-private:
- stdx::mutex _mtx;
- stdx::condition_variable _cv;
- size_t _count;
-};
-
-namespace helpers {
-
-template <typename T>
-static Deferred<std::vector<T>> collect(std::vector<Deferred<T>>& ds, ThreadPoolInterface* pool) {
- Deferred<std::vector<T>> out;
- struct CollectState {
- // hack to avoid requiring U to be default constructible.
- std::vector<boost::optional<T>> mem{};
- std::size_t numFinished = 0;
- std::size_t goal = 0;
- stdx::mutex mtx;
- };
-
- auto collectState = std::make_shared<CollectState>();
- collectState->goal = ds.size();
- collectState->mem.resize(collectState->goal);
-
- for (std::size_t i = 0; i < ds.size(); ++i) {
- ds[i].then(pool, [collectState, out, i](T res) mutable {
- // The bool return is unused.
- stdx::lock_guard<stdx::mutex> lk(collectState->mtx);
- collectState->mem[i] = std::move(res);
-
- // If we're done.
- if (collectState->goal == ++collectState->numFinished) {
- std::vector<T> outInitialized;
- outInitialized.reserve(collectState->mem.size());
- for (auto&& mem_entry : collectState->mem) {
- outInitialized.emplace_back(std::move(*mem_entry));
- }
- out.emplace(outInitialized);
- }
- return true;
- });
- }
- return out;
-}
-
-} // namespace helpers
-
-} // namespace executor
-} // namespace mongo
diff --git a/src/mongo/executor/network_interface_factory.cpp b/src/mongo/executor/network_interface_factory.cpp
index 3890e8bc64f..0f4f8526e64 100644
--- a/src/mongo/executor/network_interface_factory.cpp
+++ b/src/mongo/executor/network_interface_factory.cpp
@@ -34,16 +34,11 @@
#include "mongo/base/status.h"
#include "mongo/config.h"
#include "mongo/db/server_parameters.h"
-#include "mongo/executor/async_secure_stream_factory.h"
-#include "mongo/executor/async_stream_factory.h"
-#include "mongo/executor/async_stream_interface.h"
-#include "mongo/executor/async_timer_asio.h"
#include "mongo/executor/connection_pool.h"
#include "mongo/executor/network_connection_hook.h"
#include "mongo/executor/network_interface_tl.h"
#include "mongo/rpc/metadata/metadata_hook.h"
#include "mongo/stdx/memory.h"
-#include "mongo/util/net/ssl_manager.h"
namespace mongo {
namespace executor {
diff --git a/src/mongo/executor/network_interface_factory.h b/src/mongo/executor/network_interface_factory.h
index 7aa8a8cdce5..712cf8d812e 100644
--- a/src/mongo/executor/network_interface_factory.h
+++ b/src/mongo/executor/network_interface_factory.h
@@ -32,18 +32,13 @@
#include <string>
#include "mongo/executor/connection_pool.h"
+#include "mongo/executor/network_connection_hook.h"
#include "mongo/executor/network_interface.h"
+#include "mongo/rpc/metadata/metadata_hook.h"
namespace mongo {
-
-namespace rpc {
-class EgressMetadataHook;
-} // namespace rpc
-
namespace executor {
-class NetworkConnectionHook;
-
/**
* Returns a new NetworkInterface.
*/
diff --git a/src/mongo/executor/network_interface_asio_integration_fixture.cpp b/src/mongo/executor/network_interface_integration_fixture.cpp
index 0a9d8cff01b..6a593374fb6 100644
--- a/src/mongo/executor/network_interface_asio_integration_fixture.cpp
+++ b/src/mongo/executor/network_interface_integration_fixture.cpp
@@ -31,11 +31,8 @@
#include "mongo/platform/basic.h"
#include "mongo/client/connection_string.h"
-#include "mongo/executor/async_stream_factory.h"
-#include "mongo/executor/async_stream_interface.h"
-#include "mongo/executor/async_timer_asio.h"
-#include "mongo/executor/network_interface_asio.h"
-#include "mongo/executor/network_interface_asio_integration_fixture.h"
+#include "mongo/executor/network_interface_factory.h"
+#include "mongo/executor/network_interface_integration_fixture.h"
#include "mongo/executor/remote_command_response.h"
#include "mongo/executor/task_executor.h"
#include "mongo/rpc/get_status_from_command_result.h"
@@ -47,62 +44,57 @@
namespace mongo {
namespace executor {
-void NetworkInterfaceASIOIntegrationFixture::startNet(NetworkInterfaceASIO::Options options) {
- options.streamFactory = stdx::make_unique<AsyncStreamFactory>();
- options.timerFactory = stdx::make_unique<AsyncTimerFactoryASIO>();
+
+void NetworkInterfaceIntegrationFixture::startNet(
+ std::unique_ptr<NetworkConnectionHook> connectHook) {
+ ConnectionPool::Options options;
#ifdef _WIN32
// Connections won't queue on widnows, so attempting to open too many connections
// concurrently will result in refused connections and test failure.
- options.connectionPoolOptions.maxConnections = 16u;
+ options.maxConnections = 16u;
#else
- options.connectionPoolOptions.maxConnections = 256u;
+ options.maxConnections = 256u;
#endif
- _net = stdx::make_unique<NetworkInterfaceASIO>(std::move(options));
+ _net = makeNetworkInterface(
+ "NetworkInterfaceIntegrationFixture", std::move(connectHook), nullptr, std::move(options));
+
_net->startup();
}
-void NetworkInterfaceASIOIntegrationFixture::tearDown() {
+void NetworkInterfaceIntegrationFixture::tearDown() {
if (!_net->inShutdown()) {
_net->shutdown();
}
}
-NetworkInterfaceASIO& NetworkInterfaceASIOIntegrationFixture::net() {
+NetworkInterface& NetworkInterfaceIntegrationFixture::net() {
return *_net;
}
-ConnectionString NetworkInterfaceASIOIntegrationFixture::fixture() {
+ConnectionString NetworkInterfaceIntegrationFixture::fixture() {
return unittest::getFixtureConnectionString();
}
-void NetworkInterfaceASIOIntegrationFixture::setRandomNumberGenerator(PseudoRandom* generator) {
+void NetworkInterfaceIntegrationFixture::setRandomNumberGenerator(PseudoRandom* generator) {
_rng = generator;
}
-PseudoRandom* NetworkInterfaceASIOIntegrationFixture::getRandomNumberGenerator() {
+PseudoRandom* NetworkInterfaceIntegrationFixture::getRandomNumberGenerator() {
return _rng;
}
-void NetworkInterfaceASIOIntegrationFixture::startCommand(
- const TaskExecutor::CallbackHandle& cbHandle,
- RemoteCommandRequest& request,
- StartCommandCB onFinish) {
+void NetworkInterfaceIntegrationFixture::startCommand(const TaskExecutor::CallbackHandle& cbHandle,
+ RemoteCommandRequest& request,
+ StartCommandCB onFinish) {
net().startCommand(cbHandle, request, onFinish).transitional_ignore();
}
-Deferred<RemoteCommandResponse> NetworkInterfaceASIOIntegrationFixture::runCommand(
- const TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequest& request) {
- Deferred<RemoteCommandResponse> deferred;
- net()
- .startCommand(
- cbHandle,
- request,
- [deferred](RemoteCommandResponse resp) mutable { deferred.emplace(std::move(resp)); })
- .transitional_ignore();
- return deferred;
+Future<RemoteCommandResponse> NetworkInterfaceIntegrationFixture::runCommand(
+ const TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequest request) {
+ return net().startCommand(cbHandle, request);
}
-RemoteCommandResponse NetworkInterfaceASIOIntegrationFixture::runCommandSync(
+RemoteCommandResponse NetworkInterfaceIntegrationFixture::runCommandSync(
RemoteCommandRequest& request) {
auto deferred = runCommand(makeCallbackHandle(), request);
auto& res = deferred.get();
@@ -114,9 +106,9 @@ RemoteCommandResponse NetworkInterfaceASIOIntegrationFixture::runCommandSync(
return res;
}
-void NetworkInterfaceASIOIntegrationFixture::assertCommandOK(StringData db,
- const BSONObj& cmd,
- Milliseconds timeoutMillis) {
+void NetworkInterfaceIntegrationFixture::assertCommandOK(StringData db,
+ const BSONObj& cmd,
+ Milliseconds timeoutMillis) {
RemoteCommandRequest request{
fixture().getServers()[0], db.toString(), cmd, BSONObj(), nullptr, timeoutMillis};
auto res = runCommandSync(request);
@@ -125,16 +117,20 @@ void NetworkInterfaceASIOIntegrationFixture::assertCommandOK(StringData db,
ASSERT(!res.data["writeErrors"]);
}
-void NetworkInterfaceASIOIntegrationFixture::assertCommandFailsOnClient(
- StringData db, const BSONObj& cmd, ErrorCodes::Error reason, Milliseconds timeoutMillis) {
+void NetworkInterfaceIntegrationFixture::assertCommandFailsOnClient(StringData db,
+ const BSONObj& cmd,
+ ErrorCodes::Error reason,
+ Milliseconds timeoutMillis) {
RemoteCommandRequest request{
fixture().getServers()[0], db.toString(), cmd, BSONObj(), nullptr, timeoutMillis};
auto res = runCommandSync(request);
ASSERT_EQ(reason, res.status.code());
}
-void NetworkInterfaceASIOIntegrationFixture::assertCommandFailsOnServer(
- StringData db, const BSONObj& cmd, ErrorCodes::Error reason, Milliseconds timeoutMillis) {
+void NetworkInterfaceIntegrationFixture::assertCommandFailsOnServer(StringData db,
+ const BSONObj& cmd,
+ ErrorCodes::Error reason,
+ Milliseconds timeoutMillis) {
RemoteCommandRequest request{
fixture().getServers()[0], db.toString(), cmd, BSONObj(), nullptr, timeoutMillis};
auto res = runCommandSync(request);
@@ -143,10 +139,10 @@ void NetworkInterfaceASIOIntegrationFixture::assertCommandFailsOnServer(
ASSERT_EQ(reason, serverStatus);
}
-void NetworkInterfaceASIOIntegrationFixture::assertWriteError(StringData db,
- const BSONObj& cmd,
- ErrorCodes::Error reason,
- Milliseconds timeoutMillis) {
+void NetworkInterfaceIntegrationFixture::assertWriteError(StringData db,
+ const BSONObj& cmd,
+ ErrorCodes::Error reason,
+ Milliseconds timeoutMillis) {
RemoteCommandRequest request{
fixture().getServers()[0], db.toString(), cmd, BSONObj(), nullptr, timeoutMillis};
auto res = runCommandSync(request);
@@ -158,5 +154,6 @@ void NetworkInterfaceASIOIntegrationFixture::assertWriteError(StringData db,
firstWriteError.getStringField("errmsg"));
ASSERT_EQ(reason, writeErrorStatus);
}
+
} // namespace executor
} // namespace mongo
diff --git a/src/mongo/executor/network_interface_asio_integration_fixture.h b/src/mongo/executor/network_interface_integration_fixture.h
index f6207e7a12c..a83a5d06f31 100644
--- a/src/mongo/executor/network_interface_asio_integration_fixture.h
+++ b/src/mongo/executor/network_interface_integration_fixture.h
@@ -29,9 +29,11 @@
#include "mongo/unittest/unittest.h"
-#include "mongo/executor/network_interface_asio.h"
-#include "mongo/executor/network_interface_asio_test_utils.h"
+#include "mongo/client/connection_string.h"
+#include "mongo/executor/network_connection_hook.h"
+#include "mongo/executor/network_interface.h"
#include "mongo/executor/task_executor.h"
+#include "mongo/util/future.h"
namespace mongo {
@@ -39,14 +41,31 @@ class PseudoRandom;
namespace executor {
+/**
+ * A mock class mimicking TaskExecutor::CallbackState, does nothing.
+ */
+class MockCallbackState final : public TaskExecutor::CallbackState {
+public:
+ MockCallbackState() = default;
+ void cancel() override {}
+ void waitForCompletion() override {}
+ bool isCanceled() const override {
+ return false;
+ }
+};
+
+inline TaskExecutor::CallbackHandle makeCallbackHandle() {
+ return TaskExecutor::CallbackHandle(std::make_shared<MockCallbackState>());
+}
+
using StartCommandCB = stdx::function<void(const RemoteCommandResponse&)>;
-class NetworkInterfaceASIOIntegrationFixture : public mongo::unittest::Test {
+class NetworkInterfaceIntegrationFixture : public mongo::unittest::Test {
public:
- void startNet(NetworkInterfaceASIO::Options options = NetworkInterfaceASIO::Options());
+ void startNet(std::unique_ptr<NetworkConnectionHook> connectHook = nullptr);
void tearDown() override;
- NetworkInterfaceASIO& net();
+ NetworkInterface& net();
ConnectionString fixture();
@@ -58,8 +77,8 @@ public:
RemoteCommandRequest& request,
StartCommandCB onFinish);
- Deferred<RemoteCommandResponse> runCommand(const TaskExecutor::CallbackHandle& cbHandle,
- RemoteCommandRequest& request);
+ Future<RemoteCommandResponse> runCommand(const TaskExecutor::CallbackHandle& cbHandle,
+ RemoteCommandRequest request);
RemoteCommandResponse runCommandSync(RemoteCommandRequest& request);
@@ -82,7 +101,7 @@ public:
Milliseconds timeoutMillis = Minutes(5));
private:
- std::unique_ptr<NetworkInterfaceASIO> _net;
+ std::unique_ptr<NetworkInterface> _net;
PseudoRandom* _rng = nullptr;
};
} // namespace executor
diff --git a/src/mongo/executor/network_interface_integration_test.cpp b/src/mongo/executor/network_interface_integration_test.cpp
new file mode 100644
index 00000000000..b5a9d08edeb
--- /dev/null
+++ b/src/mongo/executor/network_interface_integration_test.cpp
@@ -0,0 +1,373 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
+
+#include "mongo/platform/basic.h"
+
+#include <algorithm>
+#include <exception>
+
+#include "mongo/base/status_with.h"
+#include "mongo/client/connection_string.h"
+#include "mongo/db/commands/test_commands_enabled.h"
+#include "mongo/db/wire_version.h"
+#include "mongo/executor/network_connection_hook.h"
+#include "mongo/executor/network_interface_integration_fixture.h"
+#include "mongo/executor/test_network_connection_hook.h"
+#include "mongo/rpc/factory.h"
+#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/rpc/message.h"
+#include "mongo/stdx/future.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/unittest/integration_test.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/assert_util.h"
+#include "mongo/util/log.h"
+#include "mongo/util/scopeguard.h"
+
+namespace mongo {
+namespace executor {
+namespace {
+
+bool pingCommandMissing(const RemoteCommandResponse& result) {
+ if (result.isOK()) {
+ // On mongos, there is no sleep command, so just check that the command failed with
+ // a "Command not found" error code
+ ASSERT_EQ(result.data["ok"].Double(), 0.0);
+ ASSERT_EQ(result.data["code"].Int(), 59);
+ return true;
+ }
+
+ return false;
+}
+
+TEST_F(NetworkInterfaceIntegrationFixture, Ping) {
+ startNet();
+ assertCommandOK("admin", BSON("ping" << 1));
+}
+
+// Hook that intentionally never finishes
+class HangingHook : public executor::NetworkConnectionHook {
+ Status validateHost(const HostAndPort&,
+ const BSONObj& request,
+ const RemoteCommandResponse&) final {
+ return Status::OK();
+ }
+
+ StatusWith<boost::optional<RemoteCommandRequest>> makeRequest(
+ const HostAndPort& remoteHost) final {
+ return {boost::make_optional(RemoteCommandRequest(remoteHost,
+ "admin",
+ BSON("sleep" << 1 << "lock"
+ << "none"
+ << "secs"
+ << 100000000),
+ BSONObj(),
+ nullptr))};
+ }
+
+ Status handleReply(const HostAndPort& remoteHost, RemoteCommandResponse&& response) final {
+ if (pingCommandMissing(response)) {
+ return {ErrorCodes::NetworkInterfaceExceededTimeLimit,
+ "No ping command. Simulating timeout"};
+ }
+
+ MONGO_UNREACHABLE;
+ }
+};
+
+
+// Test that we time out a command if the connection hook hangs.
+TEST_F(NetworkInterfaceIntegrationFixture, HookHangs) {
+ startNet(stdx::make_unique<HangingHook>());
+
+ assertCommandFailsOnClient(
+ "admin", BSON("ping" << 1), ErrorCodes::NetworkInterfaceExceededTimeLimit, Seconds(1));
+}
+
+using ResponseStatus = TaskExecutor::ResponseStatus;
+
+BSONObj objConcat(std::initializer_list<BSONObj> objs) {
+ BSONObjBuilder bob;
+
+ for (const auto& obj : objs) {
+ bob.appendElements(obj);
+ }
+
+ return bob.obj();
+}
+
+class NetworkInterfaceTest : public NetworkInterfaceIntegrationFixture {
+public:
+ void assertNumOps(uint64_t canceled, uint64_t timedOut, uint64_t failed, uint64_t succeeded) {
+ auto counters = net().getCounters();
+ ASSERT_EQ(canceled, counters.canceled);
+ ASSERT_EQ(timedOut, counters.timedOut);
+ ASSERT_EQ(failed, counters.failed);
+ ASSERT_EQ(succeeded, counters.succeeded);
+ }
+
+ void setUp() override {
+ setTestCommandsEnabled(true);
+ startNet(std::make_unique<WaitForIsMasterHook>(this));
+ }
+
+ RemoteCommandRequest makeTestCommand(boost::optional<Milliseconds> timeout = boost::none,
+ BSONObj cmd = BSON("echo" << 1 << "foo"
+ << "bar")) {
+ auto cs = fixture();
+ return RemoteCommandRequest(cs.getServers().front(),
+ "admin",
+ std::move(cmd),
+ BSONObj(),
+ nullptr,
+ timeout ? *timeout : RemoteCommandRequest::kNoTimeout);
+ }
+
+ struct IsMasterData {
+ BSONObj request;
+ RemoteCommandResponse response;
+ };
+ IsMasterData waitForIsMaster() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _isMasterCond.wait(lk, [this] { return _isMasterResult != boost::none; });
+
+ return std::move(*_isMasterResult);
+ }
+
+ bool hasIsMaster() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _isMasterResult != boost::none;
+ }
+
+private:
+ class WaitForIsMasterHook : public NetworkConnectionHook {
+ public:
+ explicit WaitForIsMasterHook(NetworkInterfaceTest* parent) : _parent(parent) {}
+
+ Status validateHost(const HostAndPort& host,
+ const BSONObj& request,
+ const RemoteCommandResponse& isMasterReply) override {
+ stdx::lock_guard<stdx::mutex> lk(_parent->_mutex);
+ _parent->_isMasterResult = {request, isMasterReply};
+ _parent->_isMasterCond.notify_all();
+ return Status::OK();
+ }
+
+ StatusWith<boost::optional<RemoteCommandRequest>> makeRequest(const HostAndPort&) override {
+ return {boost::none};
+ }
+
+ Status handleReply(const HostAndPort&, RemoteCommandResponse&&) override {
+ return Status::OK();
+ }
+
+ private:
+ NetworkInterfaceTest* _parent;
+ };
+
+ stdx::mutex _mutex;
+ stdx::condition_variable _isMasterCond;
+ boost::optional<IsMasterData> _isMasterResult;
+};
+
+TEST_F(NetworkInterfaceTest, CancelMissingOperation) {
+ // This is just a sanity check, this action should have no effect.
+ net().cancelCommand(makeCallbackHandle());
+ assertNumOps(0u, 0u, 0u, 0u);
+}
+
+TEST_F(NetworkInterfaceTest, CancelOperation) {
+ auto cbh = makeCallbackHandle();
+
+ // Kick off our operation
+ FailPointEnableBlock fpb("networkInterfaceDiscardCommandsAfterAcquireConn");
+
+ auto deferred = runCommand(cbh, makeTestCommand());
+
+ waitForIsMaster();
+
+ net().cancelCommand(cbh);
+
+ // Wait for op to complete, assert that it was canceled.
+ auto result = deferred.get();
+ ASSERT_EQ(ErrorCodes::CallbackCanceled, result.status);
+ ASSERT(result.elapsedMillis);
+ assertNumOps(1u, 0u, 0u, 0u);
+}
+
+TEST_F(NetworkInterfaceTest, ImmediateCancel) {
+ auto cbh = makeCallbackHandle();
+
+ // Kick off our operation
+
+ FailPointEnableBlock fpb("networkInterfaceDiscardCommandsBeforeAcquireConn");
+
+ auto deferred = runCommand(cbh, makeTestCommand());
+
+ net().cancelCommand(cbh);
+
+ ASSERT_FALSE(hasIsMaster());
+
+ // Wait for op to complete, assert that it was canceled.
+ auto result = deferred.get();
+ ASSERT_EQ(ErrorCodes::CallbackCanceled, result.status);
+ ASSERT(result.elapsedMillis);
+ assertNumOps(1u, 0u, 0u, 0u);
+}
+
+TEST_F(NetworkInterfaceTest, LateCancel) {
+ auto cbh = makeCallbackHandle();
+
+ auto deferred = runCommand(cbh, makeTestCommand());
+
+ // Wait for op to complete, assert that it was canceled.
+ auto result = deferred.get();
+ net().cancelCommand(cbh);
+
+ ASSERT(result.isOK());
+ ASSERT(result.elapsedMillis);
+ assertNumOps(0u, 0u, 0u, 1u);
+}
+
+TEST_F(NetworkInterfaceTest, AsyncOpTimeout) {
+ // Kick off operation
+ auto cb = makeCallbackHandle();
+ auto request = makeTestCommand(Milliseconds{1000});
+ request.cmdObj = BSON("sleep" << 1 << "lock"
+ << "none"
+ << "secs"
+ << 1000000000);
+ auto deferred = runCommand(cb, request);
+
+ waitForIsMaster();
+
+ auto result = deferred.get();
+
+ // mongos doesn't implement the ping command, so ignore the response there, otherwise
+ // check that we've timed out.
+ if (!pingCommandMissing(result)) {
+ ASSERT_EQ(ErrorCodes::NetworkInterfaceExceededTimeLimit, result.status);
+ ASSERT(result.elapsedMillis);
+ assertNumOps(0u, 1u, 0u, 0u);
+ }
+}
+
+TEST_F(NetworkInterfaceTest, StartCommand) {
+ auto commandRequest = BSON("echo" << 1 << "boop"
+ << "bop");
+
+ // This opmsg request expect the following reply, which is generated below
+ // { echo: { echo: 1, boop: "bop", $db: "admin" }, ok: 1.0 }
+ auto expectedCommandReply = [&] {
+ BSONObjBuilder echoed;
+ echoed.appendElements(commandRequest);
+ echoed << "$db"
+ << "admin";
+ return echoed.obj();
+ }();
+ auto request = makeTestCommand(boost::none, commandRequest);
+
+ auto deferred = runCommand(makeCallbackHandle(), std::move(request));
+
+ auto res = deferred.get();
+
+ ASSERT(res.elapsedMillis);
+ uassertStatusOK(res.status);
+ ASSERT_BSONOBJ_EQ(res.data.getObjectField("echo"), expectedCommandReply);
+ ASSERT_EQ(res.data.getIntField("ok"), 1);
+ assertNumOps(0u, 0u, 0u, 1u);
+}
+
+TEST_F(NetworkInterfaceTest, SetAlarm) {
+ // set a first alarm, to execute after "expiration"
+ Date_t expiration = net().now() + Milliseconds(100);
+ auto makeTimerFuture = [&] {
+ Promise<Date_t> promise;
+ auto future = promise.getFuture();
+
+ return std::make_pair(
+ [ this, promise = promise.share() ]() mutable { promise.emplaceValue(net().now()); },
+ std::move(future));
+ };
+
+ auto futurePair = makeTimerFuture();
+ ASSERT_OK(net().setAlarm(expiration, std::move(futurePair.first)));
+
+ // assert that it executed after "expiration"
+ auto& result = futurePair.second.get();
+ ASSERT(result >= expiration);
+
+ expiration = net().now() + Milliseconds(99999999);
+ auto futurePair2 = makeTimerFuture();
+ ASSERT_OK(net().setAlarm(expiration, std::move(futurePair2.first)));
+
+ net().shutdown();
+ ASSERT_TRUE(!futurePair2.second.isReady());
+}
+
+TEST_F(NetworkInterfaceTest, IsMasterRequestContainsOutgoingWireVersionInternalClientInfo) {
+ WireSpec::instance().isInternalClient = true;
+
+ auto deferred = runCommand(makeCallbackHandle(), makeTestCommand());
+ auto isMasterHandshake = waitForIsMaster();
+
+ // Verify that the isMaster reply has the expected internalClient data.
+ auto internalClientElem = isMasterHandshake.request["internalClient"];
+ ASSERT_EQ(internalClientElem.type(), BSONType::Object);
+ auto minWireVersionElem = internalClientElem.Obj()["minWireVersion"];
+ auto maxWireVersionElem = internalClientElem.Obj()["maxWireVersion"];
+ ASSERT_EQ(minWireVersionElem.type(), BSONType::NumberInt);
+ ASSERT_EQ(maxWireVersionElem.type(), BSONType::NumberInt);
+ ASSERT_EQ(minWireVersionElem.numberInt(), WireSpec::instance().outgoing.minWireVersion);
+ ASSERT_EQ(maxWireVersionElem.numberInt(), WireSpec::instance().outgoing.maxWireVersion);
+
+ // Verify that the ping op is counted as a success.
+ auto res = deferred.get();
+ ASSERT(res.elapsedMillis);
+ assertNumOps(0u, 0u, 0u, 1u);
+}
+
+TEST_F(NetworkInterfaceTest, IsMasterRequestMissingInternalClientInfoWhenNotInternalClient) {
+ WireSpec::instance().isInternalClient = false;
+
+ auto deferred = runCommand(makeCallbackHandle(), makeTestCommand());
+ auto isMasterHandshake = waitForIsMaster();
+
+ // Verify that the isMaster reply has the expected internalClient data.
+ ASSERT_FALSE(isMasterHandshake.request["internalClient"]);
+ // Verify that the ping op is counted as a success.
+ auto res = deferred.get();
+ ASSERT(res.elapsedMillis);
+ assertNumOps(0u, 0u, 0u, 1u);
+}
+
+} // namespace
+} // namespace executor
+} // namespace mongo
diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp
index 674dd8bb116..42cda1b4005 100644
--- a/src/mongo/executor/network_interface_mock.cpp
+++ b/src/mongo/executor/network_interface_mock.cpp
@@ -481,7 +481,7 @@ void NetworkInterfaceMock::_connectThenEnqueueOperation_inlock(const HostAndPort
? handshakeReplyIter->second
: RemoteCommandResponse(BSONObj(), BSONObj(), Milliseconds(0));
- auto valid = _hook->validateHost(target, handshakeReply);
+ auto valid = _hook->validateHost(target, op.getRequest().cmdObj, handshakeReply);
if (!valid.isOK()) {
op.setResponse(_now_inlock(), valid);
op.finishResponse();
diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h
index 76a9a6462a8..fdb5b58d50c 100644
--- a/src/mongo/executor/network_interface_mock.h
+++ b/src/mongo/executor/network_interface_mock.h
@@ -84,6 +84,9 @@ public:
virtual ~NetworkInterfaceMock();
virtual void appendConnectionStats(ConnectionPoolStats* stats) const;
virtual std::string getDiagnosticString();
+ Counters getCounters() const override {
+ return Counters();
+ }
/**
* Logs the contents of the queues for diagnostics.
diff --git a/src/mongo/executor/network_interface_mock_test.cpp b/src/mongo/executor/network_interface_mock_test.cpp
index 59c33e1fc82..a09ed4aeac0 100644
--- a/src/mongo/executor/network_interface_mock_test.cpp
+++ b/src/mongo/executor/network_interface_mock_test.cpp
@@ -133,7 +133,9 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHook) {
// Since the contract of these methods is that they do not throw, we run the ASSERTs in
// the test scope.
net().setConnectionHook(makeTestHook(
- [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) {
+ [&](const HostAndPort& remoteHost,
+ const BSONObj&,
+ const RemoteCommandResponse& isMasterReply) {
validateCalled = true;
hostCorrectForValidate = (remoteHost == testHost());
replyCorrectForValidate = SimpleBSONObjComparator::kInstance.evaluate(
@@ -219,7 +221,9 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHook) {
TEST_F(NetworkInterfaceMockTest, ConnectionHookFailedValidation) {
net().setConnectionHook(makeTestHook(
- [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) -> Status {
+ [&](const HostAndPort& remoteHost,
+ const BSONObj&,
+ const RemoteCommandResponse& isMasterReply) -> Status {
// We just need some obscure non-OK code.
return {ErrorCodes::ConflictingOperationInProgress, "blah"};
},
@@ -260,9 +264,9 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHookFailedValidation) {
TEST_F(NetworkInterfaceMockTest, ConnectionHookNoRequest) {
bool makeRequestCalled = false;
net().setConnectionHook(makeTestHook(
- [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) -> Status {
- return Status::OK();
- },
+ [&](const HostAndPort& remoteHost,
+ const BSONObj&,
+ const RemoteCommandResponse& isMasterReply) -> Status { return Status::OK(); },
[&](const HostAndPort& remoteHost) -> StatusWith<boost::optional<RemoteCommandRequest>> {
makeRequestCalled = true;
return {boost::none};
@@ -296,9 +300,9 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHookNoRequest) {
TEST_F(NetworkInterfaceMockTest, ConnectionHookMakeRequestFails) {
bool makeRequestCalled = false;
net().setConnectionHook(makeTestHook(
- [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) -> Status {
- return Status::OK();
- },
+ [&](const HostAndPort& remoteHost,
+ const BSONObj&,
+ const RemoteCommandResponse& isMasterReply) -> Status { return Status::OK(); },
[&](const HostAndPort& remoteHost) -> StatusWith<boost::optional<RemoteCommandRequest>> {
makeRequestCalled = true;
return {ErrorCodes::InvalidSyncSource, "blah"};
@@ -333,9 +337,9 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHookMakeRequestFails) {
TEST_F(NetworkInterfaceMockTest, ConnectionHookHandleReplyFails) {
bool handleReplyCalled = false;
net().setConnectionHook(makeTestHook(
- [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) -> Status {
- return Status::OK();
- },
+ [&](const HostAndPort& remoteHost,
+ const BSONObj&,
+ const RemoteCommandResponse& isMasterReply) -> Status { return Status::OK(); },
[&](const HostAndPort& remoteHost) -> StatusWith<boost::optional<RemoteCommandRequest>> {
return boost::make_optional<RemoteCommandRequest>({});
},
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp
index 62111f3f1b8..5fb164cf9e6 100644
--- a/src/mongo/executor/network_interface_tl.cpp
+++ b/src/mongo/executor/network_interface_tl.cpp
@@ -32,6 +32,7 @@
#include "mongo/executor/network_interface_tl.h"
+#include "mongo/db/commands/test_commands_enabled.h"
#include "mongo/db/server_options.h"
#include "mongo/executor/connection_pool_tl.h"
#include "mongo/transport/transport_layer_manager.h"
@@ -70,6 +71,12 @@ void NetworkInterfaceTL::appendConnectionStats(ConnectionPoolStats* stats) const
pool->appendConnectionStats(stats);
}
+NetworkInterface::Counters NetworkInterfaceTL::getCounters() const {
+ invariant(getTestCommandsEnabled());
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _counters;
+}
+
std::string NetworkInterfaceTL::getHostName() {
return getHostNameCached();
}
@@ -174,6 +181,15 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
state->deadline = state->start + state->request.timeout;
}
+ if (MONGO_FAIL_POINT(networkInterfaceDiscardCommandsBeforeAcquireConn)) {
+ log() << "Discarding command due to failpoint before acquireConn";
+ std::move(state->mergedFuture)
+ .getAsync([onFinish](StatusWith<RemoteCommandResponse> response) {
+ onFinish(RemoteCommandResponse(response.getStatus(), Milliseconds{0}));
+ });
+ return Status::OK();
+ }
+
// Interacting with the connection pool can involve more work than just getting a connection
// out. In particular, we can end up having to spin up new connections, and fulfilling promises
// for other requesters. Returning connections has the same issue.
@@ -254,6 +270,11 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn(
std::shared_ptr<CommandState> state,
CommandState::ConnHandle conn,
const transport::BatonHandle& baton) {
+ if (MONGO_FAIL_POINT(networkInterfaceDiscardCommandsAfterAcquireConn)) {
+ conn->indicateSuccess();
+ return std::move(state->mergedFuture);
+ }
+
if (state->done.load()) {
conn->indicateSuccess();
uasserted(ErrorCodes::CallbackCanceled, "Command was canceled");
@@ -277,7 +298,7 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn(
state->timer = _reactor->makeTimer();
state->timer->waitUntil(state->deadline, baton)
- .getAsync([client, state, baton](Status status) {
+ .getAsync([this, client, state, baton](Status status) {
if (status == ErrorCodes::CallbackCanceled) {
invariant(state->done.load());
return;
@@ -287,6 +308,11 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn(
return;
}
+ if (getTestCommandsEnabled()) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _counters.timedOut++;
+ }
+
LOG(2) << "Request " << state->request.id << " timed out"
<< ", deadline was " << state->deadline << ", op was "
<< redact(state->request.toString());
@@ -324,6 +350,15 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn(
if (state->done.swap(true))
return;
+ if (getTestCommandsEnabled()) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ if (swr.isOK() && swr.getValue().status.isOK()) {
+ _counters.succeeded++;
+ } else {
+ _counters.failed++;
+ }
+ }
+
if (state->timer) {
state->timer->cancel(baton);
}
@@ -354,6 +389,11 @@ void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHan
return;
}
+ if (getTestCommandsEnabled()) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _counters.canceled++;
+ }
+
LOG(2) << "Canceling operation; original request was: " << redact(state->request.toString());
state->promise.setError({ErrorCodes::CallbackCanceled,
str::stream() << "Command canceled; original request was: "
diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h
index 76053a511d7..b3e3296d8dc 100644
--- a/src/mongo/executor/network_interface_tl.h
+++ b/src/mongo/executor/network_interface_tl.h
@@ -54,6 +54,8 @@ public:
std::string getDiagnosticString() override;
void appendConnectionStats(ConnectionPoolStats* stats) const override;
std::string getHostName() override;
+ Counters getCounters() const override;
+
void startup() override;
void shutdown() override;
bool inShutdown() const override;
@@ -121,6 +123,7 @@ private:
ConnectionPool::Options _connPoolOpts;
std::unique_ptr<NetworkConnectionHook> _onConnectHook;
std::unique_ptr<ConnectionPool> _pool;
+ Counters _counters;
std::unique_ptr<rpc::EgressMetadataHook> _metadataHook;
AtomicBool _inShutdown;
diff --git a/src/mongo/executor/test_network_connection_hook.h b/src/mongo/executor/test_network_connection_hook.h
index 0294b23cd38..b78379d3919 100644
--- a/src/mongo/executor/test_network_connection_hook.h
+++ b/src/mongo/executor/test_network_connection_hook.h
@@ -52,8 +52,9 @@ public:
_replyFunc(std::forward<ReplyFunc>(replyFunc)) {}
Status validateHost(const HostAndPort& remoteHost,
+ const BSONObj& request,
const RemoteCommandResponse& isMasterReply) override {
- return _validateFunc(remoteHost, isMasterReply);
+ return _validateFunc(remoteHost, request, isMasterReply);
}
StatusWith<boost::optional<RemoteCommandRequest>> makeRequest(const HostAndPort& remoteHost) {