diff options
author | Adam Midvidy <amidvidy@gmail.com> | 2015-07-29 09:24:57 -0400 |
---|---|---|
committer | Adam Midvidy <amidvidy@gmail.com> | 2015-07-29 15:04:50 -0400 |
commit | b0f76ffbe26eacfa7666a9a137d9e0956778b85d (patch) | |
tree | c943913ed5ef18b9bab3f2f9d4b9b05401fd654c /src | |
parent | 54287f9a7a35e335b91689f5f5e7d5efcdb7dd15 (diff) | |
download | mongo-b0f76ffbe26eacfa7666a9a137d9e0956778b85d.tar.gz |
SERVER-19439 implement connection hooking in NetworkInterfaceMock
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/executor/SConscript | 10 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_mock.cpp | 115 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_mock.h | 42 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_mock_test.cpp | 401 |
4 files changed, 562 insertions, 6 deletions
diff --git a/src/mongo/executor/SConscript b/src/mongo/executor/SConscript index 961e38cf47f..ce159af25a5 100644 --- a/src/mongo/executor/SConscript +++ b/src/mongo/executor/SConscript @@ -44,6 +44,16 @@ env.Library('network_interface_mock', 'task_executor_interface', ]) +env.CppUnitTest( + target='network_interface_mock_test', + source=[ + 'network_interface_mock_test.cpp' + ], + LIBDEPS=[ + 'network_interface_mock', + ], +) + env.Library(target='network_test_env', source=['network_test_env.cpp',], LIBDEPS=[ diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp index b4e2b22ddd6..4e8c427d12e 100644 --- a/src/mongo/executor/network_interface_mock.cpp +++ b/src/mongo/executor/network_interface_mock.cpp @@ -26,11 +26,17 @@ * it in the license file. */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork + #include "mongo/platform/basic.h" #include "mongo/executor/network_interface_mock.h" +#include <algorithm> +#include <iterator> + #include "mongo/stdx/functional.h" +#include "mongo/util/log.h" #include "mongo/util/time_support.h" namespace mongo { @@ -71,12 +77,26 @@ void NetworkInterfaceMock::startCommand(const TaskExecutor::CallbackHandle& cbHa stdx::lock_guard<stdx::mutex> lk(_mutex); invariant(!_inShutdown); const Date_t now = _now_inlock(); - NetworkOperationIterator insertBefore = _unscheduled.begin(); - while ((insertBefore != _unscheduled.end()) && - (insertBefore->getNextConsiderationDate() <= now)) { - ++insertBefore; + auto op = NetworkOperation(cbHandle, request, now, onFinish); + + // If we don't have a hook, or we have already 'connected' to this host, enqueue the op. + if (!_hook || _connections.count(request.target)) { + _enqueueOperation_inlock(std::move(op)); + } else { + _connectThenEnqueueOperation_inlock(request.target, std::move(op)); + } +} + +void NetworkInterfaceMock::setHandshakeReplyForHost( + const mongo::HostAndPort& host, mongo::executor::RemoteCommandResponse&& reply) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + auto it = _handshakeReplies.find(host); + if (it == std::end(_handshakeReplies)) { + auto res = _handshakeReplies.emplace(host, std::move(reply)); + invariant(res.second); + } else { + it->second = std::move(reply); } - _unscheduled.insert(insertBefore, NetworkOperation(cbHandle, request, now, onFinish)); } static bool findAndCancelIf( @@ -290,8 +310,91 @@ void NetworkInterfaceMock::waitForWorkUntil(Date_t when) { _waitForWork_inlock(&lk); } +void NetworkInterfaceMock::_enqueueOperation_inlock( + mongo::executor::NetworkInterfaceMock::NetworkOperation&& op) { + auto insertBefore = + std::upper_bound(std::begin(_unscheduled), + std::end(_unscheduled), + op, + [](const NetworkOperation& a, const NetworkOperation& b) { + return a.getNextConsiderationDate() < b.getNextConsiderationDate(); + }); + + _unscheduled.emplace(insertBefore, std::move(op)); +} + +void NetworkInterfaceMock::_connectThenEnqueueOperation_inlock(const HostAndPort& target, + NetworkOperation&& op) { + invariant(_hook); // if there is no hook, we shouldn't even hit this codepath + invariant(!_connections.count(target)); + + auto handshakeReplyIter = _handshakeReplies.find(target); + + auto handshakeReply = (handshakeReplyIter != std::end(_handshakeReplies)) + ? handshakeReplyIter->second + : RemoteCommandResponse(BSONObj(), BSONObj(), Milliseconds(0)); + + auto valid = _hook->validateHost(target, handshakeReply); + if (!valid.isOK()) { + op.setResponse(_now_inlock(), valid); + op.finishResponse(); + return; + } + + auto swHookPostconnectCommand = _hook->makeRequest(target); + + if (!swHookPostconnectCommand.isOK()) { + op.setResponse(_now_inlock(), swHookPostconnectCommand.getStatus()); + op.finishResponse(); + return; + } + + boost::optional<RemoteCommandRequest> hookPostconnectCommand = + std::move(swHookPostconnectCommand.getValue()); + + if (!hookPostconnectCommand) { + // If we don't have a post connect command, enqueue the actual command. + _enqueueOperation_inlock(std::move(op)); + _connections.emplace(op.getRequest().target); + return; + } + + // The completion handler for the postconnect command schedules the original command. + auto postconnectCompletionHandler = + [this, op](StatusWith<RemoteCommandResponse> response) mutable { + stdx::lock_guard<stdx::mutex> lk(_mutex); + if (!response.isOK()) { + op.setResponse(_now_inlock(), response.getStatus()); + op.finishResponse(); + return; + } + + auto handleStatus = + _hook->handleReply(op.getRequest().target, std::move(response.getValue())); + + if (!handleStatus.isOK()) { + op.setResponse(_now_inlock(), handleStatus); + op.finishResponse(); + return; + } + + _enqueueOperation_inlock(std::move(op)); + _connections.emplace(op.getRequest().target); + }; + + auto postconnectOp = NetworkOperation(op.getCallbackHandle(), + std::move(*hookPostconnectCommand), + _now_inlock(), + std::move(postconnectCompletionHandler)); + + _enqueueOperation_inlock(std::move(postconnectOp)); +} + void NetworkInterfaceMock::setConnectionHook(std::unique_ptr<ConnectionHook> hook) { - MONGO_UNREACHABLE; + stdx::lock_guard<stdx::mutex> lk(_mutex); + invariant(!_hasStarted); + invariant(!_hook); + _hook = std::move(hook); } void NetworkInterfaceMock::signalWorkAvailable() { diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h index 91491b578e0..f8fc61840f8 100644 --- a/src/mongo/executor/network_interface_mock.h +++ b/src/mongo/executor/network_interface_mock.h @@ -28,7 +28,10 @@ #pragma once +#include <memory> #include <queue> +#include <unordered_map> +#include <unordered_set> #include <utility> #include <vector> @@ -167,6 +170,19 @@ public: */ void runReadyNetworkOperations(); + /** + * Sets the reply of the 'isMaster' handshake for a specific host. This reply will only + * be given to the 'validateHost' method of the ConnectionHook set on this object - NOT + * to the completion handlers of any 'isMaster' commands scheduled with 'startCommand'. + * + * This reply will persist until it is changed again using this method. + * + * If the NetworkInterfaceMock conducts a handshake with a simulated host which has not + * had a handshake reply set, a default constructed RemoteCommandResponse will be passed + * to validateHost if a hook is set. + */ + void setHandshakeReplyForHost(const HostAndPort& host, RemoteCommandResponse&& reply); + private: /** * Information describing a scheduled alarm. @@ -218,6 +234,16 @@ private: bool _isExecutorThreadRunnable_inlock(); /** + * Enqueues a network operation to run in order of 'consideration date'. + */ + void _enqueueOperation_inlock(NetworkOperation&& op); + + /** + * "Connects" to a remote host, and then enqueues the provided operation. + */ + void _connectThenEnqueueOperation_inlock(const HostAndPort& target, NetworkOperation&& op); + + /** * Runs all ready network operations, called while holding "lk". May drop and * reaquire "lk" several times, but will not return until the executor has blocked * in waitFor*. @@ -273,6 +299,18 @@ private: // Heap of alarms, with the next alarm always on top. std::priority_queue<AlarmInfo, std::vector<AlarmInfo>, std::greater<AlarmInfo>> _alarms; // (M) + + // The connection hook. + std::unique_ptr<ConnectionHook> _hook; // (R) + + // The set of hosts we have seen so far. If we see a new host, we will execute the + // ConnectionHook's validation and post-connection logic. + // + // TODO: provide a way to simulate disconnections. + std::unordered_set<HostAndPort> _connections; // (M) + + // The handshake replies set for each host. + std::unordered_map<HostAndPort, RemoteCommandResponse> _handshakeReplies; // (M) }; /** @@ -306,6 +344,10 @@ public: return cbHandle == _cbHandle; } + const TaskExecutor::CallbackHandle& getCallbackHandle() const { + return _cbHandle; + } + /** * Gets the request that initiated this operation. */ diff --git a/src/mongo/executor/network_interface_mock_test.cpp b/src/mongo/executor/network_interface_mock_test.cpp new file mode 100644 index 00000000000..8e623778458 --- /dev/null +++ b/src/mongo/executor/network_interface_mock_test.cpp @@ -0,0 +1,401 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include <iostream> +#include <memory> +#include <utility> + +#include "mongo/base/status.h" +#include "mongo/executor/network_interface.h" +#include "mongo/executor/network_interface_mock.h" +#include "mongo/executor/thread_pool_mock.h" +#include "mongo/stdx/memory.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace executor { +namespace { + +template <typename ValidateFunc, typename RequestFunc, typename ReplyFunc> +class TestConnectionHook final : public NetworkInterface::ConnectionHook { +public: + TestConnectionHook(ValidateFunc&& validateFunc, + RequestFunc&& requestFunc, + ReplyFunc&& replyFunc) + : _validateFunc(std::forward<ValidateFunc>(validateFunc)), + _requestFunc(std::forward<RequestFunc>(requestFunc)), + _replyFunc(std::forward<ReplyFunc>(replyFunc)) {} + + Status validateHost(const HostAndPort& remoteHost, + const RemoteCommandResponse& isMasterReply) override { + return _validateFunc(remoteHost, isMasterReply); + } + + StatusWith<boost::optional<RemoteCommandRequest>> makeRequest(const HostAndPort& remoteHost) { + return _requestFunc(remoteHost); + } + + Status handleReply(const HostAndPort& remoteHost, RemoteCommandResponse&& response) { + return _replyFunc(remoteHost, std::move(response)); + } + +private: + ValidateFunc _validateFunc; + RequestFunc _requestFunc; + ReplyFunc _replyFunc; +}; + +template <typename Val, typename Req, typename Rep> +static std::unique_ptr<TestConnectionHook<Val, Req, Rep>> makeTestHook(Val&& validateFunc, + Req&& requestFunc, + Rep&& replyFunc) { + return stdx::make_unique<TestConnectionHook<Val, Req, Rep>>(std::forward<Val>(validateFunc), + std::forward<Req>(requestFunc), + std::forward<Rep>(replyFunc)); +} + +class NetworkInterfaceMockTest : public mongo::unittest::Test { +public: + NetworkInterfaceMockTest() : _net{}, _executor(&_net, 1) {} + + NetworkInterfaceMock& net() { + return _net; + } + + ThreadPoolMock& executor() { + return _executor; + } + + HostAndPort testHost() { + return {"localHost", 27017}; + } + + // intentionally not done in setUp as some methods need to be called prior to starting + // the network. + void startNetwork() { + net().startup(); + executor().startup(); + } + + virtual void tearDown() override { + net().exitNetwork(); + executor().shutdown(); + // Wake up sleeping executor threads so they clean up. + net().signalWorkAvailable(); + executor().join(); + net().shutdown(); + } + +private: + NetworkInterfaceMock _net; + ThreadPoolMock _executor; +}; + +TEST_F(NetworkInterfaceMockTest, ConnectionHook) { + bool validateCalled = false; + bool hostCorrectForValidate = false; + bool replyCorrectForValidate; + + bool makeRequestCalled = false; + bool hostCorrectForRequest = false; + + bool handleReplyCalled = false; + bool gotExpectedReply = false; + + RemoteCommandRequest expectedRequest{testHost(), + "test", + BSON("1" << 2), + BSON("some" + << "stuff")}; + + RemoteCommandResponse expectedResponse{BSON("foo" + << "bar" + << "baz" + << "garply"), + BSON("bar" + << "baz"), + Milliseconds(30)}; + + // need to copy as it will be moved + auto isMasterReplyData = BSON("iamyour" + << "father"); + + RemoteCommandResponse isMasterReply{ + isMasterReplyData.copy(), BSON("blah" << 2), Milliseconds(20)}; + + net().setHandshakeReplyForHost(testHost(), std::move(isMasterReply)); + + // Since the contract of these methods is that they do not throw, we run the ASSERTs in + // the test scope. + net().setConnectionHook(makeTestHook( + [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) { + validateCalled = true; + hostCorrectForValidate = (remoteHost == testHost()); + replyCorrectForValidate = (isMasterReply.data == isMasterReplyData); + return Status::OK(); + }, + [&](const HostAndPort& remoteHost) { + makeRequestCalled = true; + hostCorrectForRequest = (remoteHost == testHost()); + return boost::make_optional<RemoteCommandRequest>(expectedRequest); + }, + [&](const HostAndPort& remoteHost, RemoteCommandResponse&& response) { + handleReplyCalled = true; + hostCorrectForRequest = (remoteHost == testHost()); + gotExpectedReply = + (expectedResponse.data == response.data); // Don't bother checking all fields. + return Status::OK(); + })); + + startNetwork(); + + TaskExecutor::CallbackHandle cb{}; + + bool commandFinished = false; + bool gotCorrectCommandReply = false; + + RemoteCommandRequest actualCommandExpected{ + testHost(), "testDB", BSON("test" << 1), rpc::makeEmptyMetadata()}; + RemoteCommandResponse actualResponseExpected{BSON("1212121212" + << "12121212121212"), + BSONObj(), + Milliseconds(0)}; + + net().startCommand(cb, + actualCommandExpected, + [&](StatusWith<RemoteCommandResponse> resp) { + commandFinished = true; + if (resp.isOK()) { + gotCorrectCommandReply = (actualResponseExpected.toString() == + resp.getValue().toString()); + } + }); + + // At this point validate and makeRequest should have been called. + ASSERT(validateCalled); + ASSERT(hostCorrectForValidate); + ASSERT(replyCorrectForValidate); + ASSERT(makeRequestCalled); + ASSERT(hostCorrectForRequest); + + // handleReply should not have been called as we haven't responded to the reply yet. + ASSERT(!handleReplyCalled); + // we haven't gotten to the actual command yet + ASSERT(!commandFinished); + + { + net().enterNetwork(); + ASSERT(net().hasReadyRequests()); + auto req = net().getNextReadyRequest(); + ASSERT(req->getRequest().cmdObj == expectedRequest.cmdObj); + net().scheduleResponse(req, net().now(), expectedResponse); + net().runReadyNetworkOperations(); + net().exitNetwork(); + } + + // We should have responded to the post connect command. + ASSERT(handleReplyCalled); + ASSERT(gotExpectedReply); + + // We should not have responsed to the actual command. + ASSERT(!commandFinished); + + { + net().enterNetwork(); + ASSERT(net().hasReadyRequests()); + auto actualCommand = net().getNextReadyRequest(); + ASSERT(actualCommand->getRequest().cmdObj == actualCommandExpected.cmdObj); + net().scheduleResponse(actualCommand, net().now(), actualResponseExpected); + net().runReadyNetworkOperations(); + net().exitNetwork(); + } + + ASSERT(commandFinished); + ASSERT(gotCorrectCommandReply); +} + +TEST_F(NetworkInterfaceMockTest, ConnectionHookFailedValidation) { + net().setConnectionHook(makeTestHook( + [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) -> Status { + // We just need some obscure non-OK code. + return {ErrorCodes::ConflictingOperationInProgress, "blah"}; + }, + [&](const HostAndPort& remoteHost) + -> StatusWith<boost::optional<RemoteCommandRequest>> { MONGO_UNREACHABLE; }, + [&](const HostAndPort& remoteHost, RemoteCommandResponse&& response) + -> Status { MONGO_UNREACHABLE; })); + + startNetwork(); + + TaskExecutor::CallbackHandle cb{}; + + bool commandFinished = false; + bool statusPropagated = false; + + net().startCommand(cb, + RemoteCommandRequest{}, + [&](StatusWith<RemoteCommandResponse> resp) { + commandFinished = true; + + statusPropagated = resp.getStatus().code() == + ErrorCodes::ConflictingOperationInProgress; + }); + + { + net().enterNetwork(); + // We should have short-circuited the network and immediately called the callback. + // If we change isMaster replies to go through the normal network mechanism, + // this test will need to change. + ASSERT(!net().hasReadyRequests()); + net().exitNetwork(); + } + + ASSERT(commandFinished); + ASSERT(statusPropagated); +} + +TEST_F(NetworkInterfaceMockTest, ConnectionHookNoRequest) { + bool makeRequestCalled = false; + net().setConnectionHook(makeTestHook( + [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) + -> Status { return Status::OK(); }, + [&](const HostAndPort& remoteHost) -> StatusWith<boost::optional<RemoteCommandRequest>> { + makeRequestCalled = true; + return {boost::none}; + }, + [&](const HostAndPort& remoteHost, RemoteCommandResponse&& response) + -> Status { MONGO_UNREACHABLE; })); + + startNetwork(); + + TaskExecutor::CallbackHandle cb{}; + + bool commandFinished = false; + + net().startCommand(cb, + RemoteCommandRequest{}, + [&](StatusWith<RemoteCommandResponse> resp) { commandFinished = true; }); + + { + net().enterNetwork(); + ASSERT(net().hasReadyRequests()); + auto req = net().getNextReadyRequest(); + net().scheduleResponse(req, net().now(), RemoteCommandResponse{}); + net().runReadyNetworkOperations(); + net().exitNetwork(); + } + + ASSERT(commandFinished); +} + +TEST_F(NetworkInterfaceMockTest, ConnectionHookMakeRequestFails) { + bool makeRequestCalled = false; + net().setConnectionHook(makeTestHook( + [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) + -> Status { return Status::OK(); }, + [&](const HostAndPort& remoteHost) -> StatusWith<boost::optional<RemoteCommandRequest>> { + makeRequestCalled = true; + return {ErrorCodes::InvalidSyncSource, "blah"}; + }, + [&](const HostAndPort& remoteHost, RemoteCommandResponse&& response) + -> Status { MONGO_UNREACHABLE; })); + + startNetwork(); + + TaskExecutor::CallbackHandle cb{}; + + bool commandFinished = false; + bool errorPropagated = false; + + net().startCommand(cb, + RemoteCommandRequest{}, + [&](StatusWith<RemoteCommandResponse> resp) { + commandFinished = true; + errorPropagated = + resp.getStatus().code() == ErrorCodes::InvalidSyncSource; + }); + + { + net().enterNetwork(); + ASSERT(!net().hasReadyRequests()); + net().exitNetwork(); + } + + ASSERT(commandFinished); + ASSERT(errorPropagated); +} + +TEST_F(NetworkInterfaceMockTest, ConnectionHookHandleReplyFails) { + bool handleReplyCalled = false; + net().setConnectionHook(makeTestHook( + [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) + -> Status { return Status::OK(); }, + [&](const HostAndPort& remoteHost) -> StatusWith<boost::optional<RemoteCommandRequest>> { + return boost::make_optional<RemoteCommandRequest>({}); + }, + [&](const HostAndPort& remoteHost, RemoteCommandResponse&& response) -> Status { + handleReplyCalled = true; + return {ErrorCodes::CappedPositionLost, "woot"}; + })); + + startNetwork(); + + TaskExecutor::CallbackHandle cb{}; + + bool commandFinished = false; + bool errorPropagated = false; + + net().startCommand(cb, + RemoteCommandRequest{}, + [&](StatusWith<RemoteCommandResponse> resp) { + commandFinished = true; + errorPropagated = + resp.getStatus().code() == ErrorCodes::CappedPositionLost; + }); + + ASSERT(!handleReplyCalled); + + { + net().enterNetwork(); + ASSERT(net().hasReadyRequests()); + auto req = net().getNextReadyRequest(); + net().scheduleResponse(req, net().now(), RemoteCommandResponse{}); + net().runReadyNetworkOperations(); + net().exitNetwork(); + } + + ASSERT(handleReplyCalled); + ASSERT(commandFinished); + ASSERT(errorPropagated); +} + +} // namespace +} // namespace executor +} // namespace mongo |