/**
* Copyright (C) 2015 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/platform/basic.h"
#include
#include
#include
#include "mongo/base/status.h"
#include "mongo/executor/network_connection_hook.h"
#include "mongo/executor/network_interface.h"
#include "mongo/executor/network_interface_mock.h"
#include "mongo/executor/test_network_connection_hook.h"
#include "mongo/executor/thread_pool_mock.h"
#include "mongo/stdx/memory.h"
#include "mongo/unittest/unittest.h"
namespace mongo {
namespace executor {
namespace {
class NetworkInterfaceMockTest : public mongo::unittest::Test {
public:
NetworkInterfaceMockTest()
: _net{}, _executor(&_net, 1, ThreadPoolMock::Options()), _tearDownCalled(false) {}
NetworkInterfaceMock& net() {
return _net;
}
ThreadPoolMock& executor() {
return _executor;
}
HostAndPort testHost() {
return {"localHost", 27017};
}
// intentionally not done in setUp as some methods need to be called prior to starting
// the network.
void startNetwork() {
net().startup();
executor().startup();
}
virtual void setUp() override {
_tearDownCalled = false;
}
virtual void tearDown() override {
// We're calling tearDown() manually in some tests so
// we can check post-conditions.
if (_tearDownCalled) {
return;
}
_tearDownCalled = true;
net().exitNetwork();
executor().shutdown();
// Wake up sleeping executor threads so they clean up.
net().signalWorkAvailable();
executor().join();
net().shutdown();
}
private:
NetworkInterfaceMock _net;
ThreadPoolMock _executor;
bool _tearDownCalled;
};
// Walks through the complete connection-hook flow for the first command sent to a host:
// the hook validates the handshake reply, supplies a post-connect request, receives the reply
// to that request, and only afterwards is the caller's actual command delivered and answered.
TEST_F(NetworkInterfaceMockTest, ConnectionHook) {
    // Flags recorded by the hook callbacks and checked from the test scope below.
    bool validateCalled = false;
    bool hostCorrectForValidate = false;
    bool replyCorrectForValidate = false;

    bool makeRequestCalled = false;
    bool hostCorrectForRequest = false;

    bool handleReplyCalled = false;
    // Kept separate from hostCorrectForRequest so the host passed to handleReply is
    // verified on its own instead of overwriting the makeRequest result.
    bool hostCorrectForReply = false;
    bool gotExpectedReply = false;

    // The post-connect request the hook hands out from makeRequest.
    RemoteCommandRequest expectedRequest{testHost(),
                                         "test",
                                         BSON("1" << 2),
                                         BSON("some"
                                              << "stuff"),
                                         nullptr};

    // The reply the test schedules for the post-connect request.
    RemoteCommandResponse expectedResponse{BSON("foo"
                                                << "bar"
                                                << "baz"
                                                << "garply"),
                                           BSON("bar"
                                                << "baz"),
                                           Milliseconds(30)};

    // need to copy as it will be moved
    auto isMasterReplyData = BSON("iamyour"
                                  << "father");

    RemoteCommandResponse isMasterReply{
        isMasterReplyData.copy(), BSON("blah" << 2), Milliseconds(20)};

    net().setHandshakeReplyForHost(testHost(), std::move(isMasterReply));

    // Since the contract of these methods is that they do not throw, we run the ASSERTs in
    // the test scope.
    net().setConnectionHook(makeTestHook(
        [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) {
            validateCalled = true;
            hostCorrectForValidate = (remoteHost == testHost());
            replyCorrectForValidate = SimpleBSONObjComparator::kInstance.evaluate(
                isMasterReply.data == isMasterReplyData);
            return Status::OK();
        },
        [&](const HostAndPort& remoteHost) {
            makeRequestCalled = true;
            hostCorrectForRequest = (remoteHost == testHost());
            return boost::make_optional(expectedRequest);
        },
        [&](const HostAndPort& remoteHost, RemoteCommandResponse&& response) {
            handleReplyCalled = true;
            hostCorrectForReply = (remoteHost == testHost());
            gotExpectedReply = SimpleBSONObjComparator::kInstance.evaluate(
                expectedResponse.data == response.data);  // Don't bother checking all fields.
            return Status::OK();
        }));

    startNetwork();

    TaskExecutor::CallbackHandle cb{};

    bool commandFinished = false;
    bool gotCorrectCommandReply = false;

    RemoteCommandRequest actualCommandExpected{
        testHost(), "testDB", BSON("test" << 1), rpc::makeEmptyMetadata(), nullptr};
    RemoteCommandResponse actualResponseExpected{BSON("1212121212"
                                                      << "12121212121212"),
                                                 BSONObj(),
                                                 Milliseconds(0)};

    ASSERT_OK(net().startCommand(cb, actualCommandExpected, [&](RemoteCommandResponse resp) {
        commandFinished = true;
        if (resp.isOK()) {
            gotCorrectCommandReply = (actualResponseExpected.toString() == resp.toString());
        }
    }));

    // At this point validate and makeRequest should have been called.
    ASSERT(validateCalled);
    ASSERT(hostCorrectForValidate);
    ASSERT(replyCorrectForValidate);
    ASSERT(makeRequestCalled);
    ASSERT(hostCorrectForRequest);

    // handleReply should not have been called as we haven't responded to the reply yet.
    ASSERT(!handleReplyCalled);
    // we haven't gotten to the actual command yet
    ASSERT(!commandFinished);

    // Answer the hook's post-connect request.
    {
        net().enterNetwork();
        ASSERT(net().hasReadyRequests());
        auto req = net().getNextReadyRequest();
        ASSERT_BSONOBJ_EQ(req->getRequest().cmdObj, expectedRequest.cmdObj);
        net().scheduleResponse(req, net().now(), expectedResponse);
        net().runReadyNetworkOperations();
        net().exitNetwork();
    }

    // We should have responded to the post connect command.
    ASSERT(handleReplyCalled);
    ASSERT(hostCorrectForReply);
    ASSERT(gotExpectedReply);

    // We should not have responded to the actual command.
    ASSERT(!commandFinished);

    // Answer the caller's actual command.
    {
        net().enterNetwork();
        ASSERT(net().hasReadyRequests());
        auto actualCommand = net().getNextReadyRequest();
        ASSERT_BSONOBJ_EQ(actualCommand->getRequest().cmdObj, actualCommandExpected.cmdObj);
        net().scheduleResponse(actualCommand, net().now(), actualResponseExpected);
        net().runReadyNetworkOperations();
        net().exitNetwork();
    }

    ASSERT(commandFinished);
    ASSERT(gotCorrectCommandReply);
}
// When the hook's validation callback returns a non-OK status, the command's completion
// callback must receive that status and no request may reach the network. The makeRequest
// and handleReply callbacks must never run.
TEST_F(NetworkInterfaceMockTest, ConnectionHookFailedValidation) {
    net().setConnectionHook(makeTestHook(
        [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) -> Status {
            // We just need some obscure non-OK code.
            return {ErrorCodes::ConflictingOperationInProgress, "blah"};
        },
        [&](const HostAndPort& remoteHost) -> StatusWith<boost::optional<RemoteCommandRequest>> {
            MONGO_UNREACHABLE;
        },
        [&](const HostAndPort& remoteHost, RemoteCommandResponse&& response) -> Status {
            MONGO_UNREACHABLE;
        }));

    startNetwork();

    TaskExecutor::CallbackHandle cb{};

    bool commandFinished = false;
    bool statusPropagated = false;

    RemoteCommandRequest request;
    ASSERT_OK(net().startCommand(cb, request, [&](RemoteCommandResponse resp) {
        commandFinished = true;
        statusPropagated = resp.status.code() == ErrorCodes::ConflictingOperationInProgress;
    }));

    {
        net().enterNetwork();
        // We should have short-circuited the network and immediately called the callback.
        // If we change isMaster replies to go through the normal network mechanism,
        // this test will need to change.
        ASSERT(!net().hasReadyRequests());
        net().exitNetwork();
    }

    ASSERT(commandFinished);
    ASSERT(statusPropagated);
}
// When the hook's makeRequest callback returns boost::none there is no post-connect command,
// so the first ready request on the network is the caller's command itself and handleReply
// must never run.
TEST_F(NetworkInterfaceMockTest, ConnectionHookNoRequest) {
    bool makeRequestCalled = false;
    net().setConnectionHook(makeTestHook(
        [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) -> Status {
            return Status::OK();
        },
        [&](const HostAndPort& remoteHost) -> StatusWith<boost::optional<RemoteCommandRequest>> {
            makeRequestCalled = true;
            return {boost::none};
        },
        [&](const HostAndPort& remoteHost, RemoteCommandResponse&& response) -> Status {
            MONGO_UNREACHABLE;
        }));

    startNetwork();

    TaskExecutor::CallbackHandle cb{};

    bool commandFinished = false;

    RemoteCommandRequest request;
    ASSERT_OK(net().startCommand(
        cb, request, [&](RemoteCommandResponse resp) { commandFinished = true; }));

    // The hook must have been consulted for a post-connect request, even though it
    // declined to provide one.
    ASSERT(makeRequestCalled);

    {
        net().enterNetwork();
        ASSERT(net().hasReadyRequests());
        auto req = net().getNextReadyRequest();
        net().scheduleResponse(req, net().now(), RemoteCommandResponse{});
        net().runReadyNetworkOperations();
        net().exitNetwork();
    }

    ASSERT(commandFinished);
}
// When the hook's makeRequest callback returns an error, that error must be delivered to the
// command's completion callback without any request reaching the network, and handleReply
// must never run.
TEST_F(NetworkInterfaceMockTest, ConnectionHookMakeRequestFails) {
    bool makeRequestCalled = false;
    net().setConnectionHook(makeTestHook(
        [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) -> Status {
            return Status::OK();
        },
        [&](const HostAndPort& remoteHost) -> StatusWith<boost::optional<RemoteCommandRequest>> {
            makeRequestCalled = true;
            return {ErrorCodes::InvalidSyncSource, "blah"};
        },
        [&](const HostAndPort& remoteHost, RemoteCommandResponse&& response) -> Status {
            MONGO_UNREACHABLE;
        }));

    startNetwork();

    TaskExecutor::CallbackHandle cb{};

    bool commandFinished = false;
    bool errorPropagated = false;

    RemoteCommandRequest request;
    ASSERT_OK(net().startCommand(cb, request, [&](RemoteCommandResponse resp) {
        commandFinished = true;
        errorPropagated = resp.status.code() == ErrorCodes::InvalidSyncSource;
    }));

    // The error observed below must come from makeRequest, so confirm it actually ran.
    ASSERT(makeRequestCalled);

    {
        net().enterNetwork();
        ASSERT(!net().hasReadyRequests());
        net().exitNetwork();
    }

    ASSERT(commandFinished);
    ASSERT(errorPropagated);
}
// When the hook's handleReply callback returns an error for the post-connect reply, that error
// must be delivered to the completion callback of the caller's command.
TEST_F(NetworkInterfaceMockTest, ConnectionHookHandleReplyFails) {
    bool handleReplyCalled = false;

    net().setConnectionHook(makeTestHook(
        [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) -> Status {
            return Status::OK();
        },
        [&](const HostAndPort& remoteHost) -> StatusWith<boost::optional<RemoteCommandRequest>> {
            // A default-constructed request is enough; only the reply handling matters here.
            return boost::make_optional<RemoteCommandRequest>({});
        },
        [&](const HostAndPort& remoteHost, RemoteCommandResponse&& response) -> Status {
            handleReplyCalled = true;
            return {ErrorCodes::CappedPositionLost, "woot"};
        }));

    startNetwork();

    TaskExecutor::CallbackHandle cb{};

    bool commandFinished = false;
    bool errorPropagated = false;

    RemoteCommandRequest request;
    ASSERT_OK(net().startCommand(cb, request, [&](RemoteCommandResponse resp) {
        commandFinished = true;
        errorPropagated = resp.status.code() == ErrorCodes::CappedPositionLost;
    }));

    // Nothing has replied to the post-connect request yet.
    ASSERT(!handleReplyCalled);

    // Reply to the post-connect request; this is what triggers handleReply.
    {
        net().enterNetwork();
        ASSERT(net().hasReadyRequests());
        auto req = net().getNextReadyRequest();
        net().scheduleResponse(req, net().now(), RemoteCommandResponse{});
        net().runReadyNetworkOperations();
        net().exitNetwork();
    }

    ASSERT(handleReplyCalled);
    ASSERT(commandFinished);
    ASSERT(errorPropagated);
}
// inShutdown() must report false on a started network and true once the fixture has been
// torn down.
TEST_F(NetworkInterfaceMockTest, InShutdown) {
    startNetwork();
    const bool shutdownBeforeTearDown = net().inShutdown();
    ASSERT_FALSE(shutdownBeforeTearDown);

    // Tear down by hand so the post-shutdown state can be inspected.
    tearDown();
    const bool shutdownAfterTearDown = net().inShutdown();
    ASSERT(shutdownAfterTearDown);
}
// startCommand() must return a non-OK status once the fixture has been torn down.
TEST_F(NetworkInterfaceMockTest, StartCommandReturnsNotOKIfShutdownHasStarted) {
    startNetwork();
    // Tear down by hand so the call below happens after shutdown.
    tearDown();

    TaskExecutor::CallbackHandle handle{};
    RemoteCommandRequest emptyRequest;
    auto ignoreResponse = [](RemoteCommandResponse resp) {};
    ASSERT_NOT_OK(net().startCommand(handle, emptyRequest, ignoreResponse));
}
// setAlarm() must return a non-OK status once the fixture has been torn down.
TEST_F(NetworkInterfaceMockTest, SetAlarmReturnsNotOKIfShutdownHasStarted) {
    startNetwork();
    // Tear down by hand so the call below happens after shutdown.
    tearDown();

    const auto alarmTime = net().now() + Milliseconds(100);
    auto noopAction = [] {};
    ASSERT_NOT_OK(net().setAlarm(alarmTime, noopAction));
}
// Checks request timeouts in two phases using the same 2000ms-timeout request:
//   1. the request is black-holed and the clock is advanced past the timeout, so the
//      completion callback must receive a non-OK status;
//   2. the request is answered 1000ms after it was started, so the completion callback must
//      not run before the answer and must then receive an OK status.
TEST_F(NetworkInterfaceMockTest, CommandTimeout) {
    startNetwork();

    TaskExecutor::CallbackHandle cb;
    RemoteCommandRequest request;
    request.timeout = Milliseconds(2000);

    ErrorCodes::Error statusPropagated = ErrorCodes::OK;
    // Tracked separately from statusPropagated: after the first phase statusPropagated holds
    // an error, so on its own it cannot show whether the callback ran again in the second
    // phase.
    bool callbackInvoked = false;
    auto finishFn = [&](RemoteCommandResponse resp) {
        callbackInvoked = true;
        statusPropagated = resp.status.code();
    };

    //
    // Command times out.
    //
    ASSERT_OK(net().startCommand(cb, request, finishFn));
    net().enterNetwork();
    ASSERT(net().hasReadyRequests());
    net().blackHole(net().getNextReadyRequest());
    net().runUntil(net().now() + Milliseconds(2010));
    net().exitNetwork();
    ASSERT(callbackInvoked);
    ASSERT_NOT_EQUALS(ErrorCodes::OK, statusPropagated);

    //
    // Command finishes before timeout.
    //
    callbackInvoked = false;
    Date_t start = net().now();

    ASSERT_OK(net().startCommand(cb, request, finishFn));
    net().enterNetwork();
    // Consume the request. We'll schedule a successful response later.
    ASSERT(net().hasReadyRequests());
    auto noi = net().getNextReadyRequest();

    // Assert the command hasn't timed out after 1000ms: the clock has advanced exactly that
    // far and the completion callback has not run for this command.
    net().runUntil(start + Milliseconds(1000));
    ASSERT_EQUALS(start + Milliseconds(1000), net().now());
    ASSERT_FALSE(callbackInvoked);

    // Reply with a successful response.
    net().scheduleResponse(noi, net().now(), {});
    net().runReadyNetworkOperations();
    net().exitNetwork();
    ASSERT(callbackInvoked);
    ASSERT_EQUALS(ErrorCodes::OK, statusPropagated);
    // Delivering the response must not have advanced the clock.
    ASSERT_EQUALS(start + Milliseconds(1000), net().now());
}
} // namespace
} // namespace executor
} // namespace mongo