/** * Copyright (C) 2015 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO #include "mongo/platform/basic.h" #include #include "mongo/base/status_with.h" #include "mongo/db/jsobj.h" #include "mongo/db/wire_version.h" #include "mongo/executor/async_mock_stream_factory.h" #include "mongo/executor/async_timer_mock.h" #include "mongo/executor/network_interface_asio.h" #include "mongo/executor/network_interface_asio_test_utils.h" #include "mongo/executor/test_network_connection_hook.h" #include "mongo/rpc/factory.h" #include "mongo/rpc/legacy_reply_builder.h" #include "mongo/rpc/metadata/egress_metadata_hook_list.h" #include "mongo/stdx/memory.h" #include "mongo/unittest/unittest.h" #include "mongo/util/log.h" #include "mongo/util/net/message.h" #include "mongo/util/scopeguard.h" namespace mongo { namespace executor { namespace { using ResponseStatus = TaskExecutor::ResponseStatus; HostAndPort testHost{"localhost", 20000}; void initWireSpecMongoD() { WireSpec& spec = WireSpec::instance(); // accept from latest internal version spec.incomingInternalClient.minWireVersion = LATEST_WIRE_VERSION; spec.incomingInternalClient.maxWireVersion = LATEST_WIRE_VERSION; // accept from any external version spec.incomingExternalClient.minWireVersion = RELEASE_2_4_AND_BEFORE; spec.incomingExternalClient.maxWireVersion = LATEST_WIRE_VERSION; // connect to latest spec.outgoing.minWireVersion = LATEST_WIRE_VERSION; spec.outgoing.maxWireVersion = LATEST_WIRE_VERSION; } // Utility function to use with mock streams RemoteCommandResponse simulateIsMaster(RemoteCommandRequest request) { ASSERT_EQ(std::string{request.cmdObj.firstElementFieldName()}, "isMaster"); ASSERT_EQ(request.dbname, "admin"); RemoteCommandResponse response; response.data = BSON("minWireVersion" << mongo::WireSpec::instance().incomingExternalClient.minWireVersion << "maxWireVersion" << mongo::WireSpec::instance().incomingExternalClient.maxWireVersion); return response; } BSONObj objConcat(std::initializer_list objs) { BSONObjBuilder bob; for (const auto& obj : objs) { bob.appendElements(obj); } return bob.obj(); } class NetworkInterfaceASIOTest : public mongo::unittest::Test { public: void setUp() override { initWireSpecMongoD(); NetworkInterfaceASIO::Options options; // Use mock timer factory auto timerFactory = stdx::make_unique(); _timerFactory = timerFactory.get(); options.timerFactory = std::move(timerFactory); auto factory = stdx::make_unique(); // keep unowned pointer, but pass ownership to NIA _streamFactory = factory.get(); options.streamFactory = std::move(factory); _net = stdx::make_unique(std::move(options)); _net->startup(); } void tearDown() override { if (!_net->inShutdown()) { _net->shutdown(); } } Deferred startCommand(const TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequest& request) { Deferred deferredResponse; ASSERT_OK(net().startCommand( cbHandle, request, [deferredResponse](ResponseStatus response) mutable { deferredResponse.emplace(std::move(response)); })); return deferredResponse; } // Helper to run startCommand and wait for it RemoteCommandResponse startCommandSync(RemoteCommandRequest& request) { auto deferred = startCommand(makeCallbackHandle(), request); // wait for the operation to complete auto& result = deferred.get(); return result; } NetworkInterfaceASIO& net() { return *_net; } AsyncMockStreamFactory& streamFactory() { return *_streamFactory; } AsyncTimerFactoryMock& timerFactory() { return *_timerFactory; } void assertNumOps(uint64_t canceled, uint64_t timedOut, uint64_t failed, uint64_t succeeded) { ASSERT_EQ(canceled, net().getNumCanceledOps()); ASSERT_EQ(timedOut, net().getNumTimedOutOps()); ASSERT_EQ(failed, net().getNumFailedOps()); ASSERT_EQ(succeeded, net().getNumSucceededOps()); } protected: AsyncTimerFactoryMock* _timerFactory; AsyncMockStreamFactory* _streamFactory; std::unique_ptr _net; }; TEST_F(NetworkInterfaceASIOTest, CancelMissingOperation) { // This is just a sanity check, this action should have no effect. net().cancelCommand(makeCallbackHandle()); assertNumOps(0u, 0u, 0u, 0u); } TEST_F(NetworkInterfaceASIOTest, CancelOperation) { auto cbh = makeCallbackHandle(); // Kick off our operation RemoteCommandRequest request{testHost, "testDB", BSON("a" << 1), BSONObj(), nullptr}; auto deferred = startCommand(cbh, request); // Create and initialize a stream so operation can begin auto stream = streamFactory().blockUntilStreamExists(testHost); ConnectEvent{stream}.skip(); // simulate isMaster reply. stream->simulateServer(rpc::Protocol::kOpQuery, [](RemoteCommandRequest request) -> RemoteCommandResponse { return simulateIsMaster(request); }); { // Cancel operation while blocked in the write for determinism. By calling cancel here we // ensure that it is not a no-op and that the asio::operation_aborted error will always // be returned to the NIA. WriteEvent write{stream}; net().cancelCommand(cbh); } // Wait for op to complete, assert that it was canceled. auto& result = deferred.get(); ASSERT_EQ(ErrorCodes::CallbackCanceled, result.status); ASSERT(result.elapsedMillis); assertNumOps(1u, 0u, 0u, 0u); } TEST_F(NetworkInterfaceASIOTest, ImmediateCancel) { auto cbh = makeCallbackHandle(); RemoteCommandRequest request{testHost, "testDB", BSON("a" << 1), BSONObj(), nullptr}; auto deferred = startCommand(cbh, request); // Cancel immediately net().cancelCommand(cbh); // Allow stream to connect so operation can return auto stream = streamFactory().blockUntilStreamExists(testHost); ConnectEvent{stream}.skip(); stream->simulateServer(rpc::Protocol::kOpQuery, [](RemoteCommandRequest request) -> RemoteCommandResponse { return simulateIsMaster(request); }); auto& result = deferred.get(); ASSERT_EQ(ErrorCodes::CallbackCanceled, result.status); ASSERT(result.elapsedMillis); // expect 0 completed ops because the op was canceled before getting a connection assertNumOps(1u, 0u, 0u, 0u); } TEST_F(NetworkInterfaceASIOTest, LateCancel) { auto cbh = makeCallbackHandle(); RemoteCommandRequest request{testHost, "testDB", BSON("a" << 1), BSONObj(), nullptr}; auto deferred = startCommand(cbh, request); // Allow stream to connect so operation can return auto stream = streamFactory().blockUntilStreamExists(testHost); ConnectEvent{stream}.skip(); stream->simulateServer(rpc::Protocol::kOpQuery, [](RemoteCommandRequest request) -> RemoteCommandResponse { return simulateIsMaster(request); }); // Simulate user command stream->simulateServer(rpc::Protocol::kOpMsg, [](RemoteCommandRequest request) -> RemoteCommandResponse { RemoteCommandResponse response; response.data = BSONObj(); response.metadata = BSONObj(); return response; }); // Allow to complete, then cancel, nothing should happen. auto& result = deferred.get(); net().cancelCommand(cbh); ASSERT(result.isOK()); ASSERT(result.elapsedMillis); assertNumOps(0u, 0u, 0u, 1u); } TEST_F(NetworkInterfaceASIOTest, CancelWithNetworkError) { auto cbh = makeCallbackHandle(); RemoteCommandRequest request{testHost, "testDB", BSON("a" << 1), BSONObj(), nullptr}; auto deferred = startCommand(cbh, request); // Create and initialize a stream so operation can begin auto stream = streamFactory().blockUntilStreamExists(testHost); ConnectEvent{stream}.skip(); // simulate isMaster reply. stream->simulateServer(rpc::Protocol::kOpQuery, [](RemoteCommandRequest request) -> RemoteCommandResponse { return simulateIsMaster(request); }); { WriteEvent{stream}.skip(); ReadEvent read{stream}; // Trigger both a cancellation and a network error stream->setError(make_error_code(ErrorCodes::HostUnreachable)); net().cancelCommand(cbh); } // Wait for op to complete, assert that cancellation error had precedence. auto& result = deferred.get(); ASSERT(result.status == ErrorCodes::CallbackCanceled); ASSERT(result.elapsedMillis); assertNumOps(1u, 0u, 0u, 0u); } TEST_F(NetworkInterfaceASIOTest, CancelWithTimeout) { auto cbh = makeCallbackHandle(); RemoteCommandRequest request{testHost, "testDB", BSON("a" << 1), BSONObj(), nullptr}; auto deferred = startCommand(cbh, request); // Create and initialize a stream so operation can begin auto stream = streamFactory().blockUntilStreamExists(testHost); ConnectEvent{stream}.skip(); stream->simulateServer(rpc::Protocol::kOpQuery, [](RemoteCommandRequest request) -> RemoteCommandResponse { return simulateIsMaster(request); }); { WriteEvent write{stream}; // Trigger both a cancellation and a timeout net().cancelCommand(cbh); timerFactory().fastForward(Milliseconds(500)); } // Wait for op to complete, assert that cancellation error had precedence. auto& result = deferred.get(); ASSERT_EQ(ErrorCodes::CallbackCanceled, result.status); ASSERT(result.elapsedMillis); assertNumOps(1u, 0u, 0u, 0u); } TEST_F(NetworkInterfaceASIOTest, TimeoutWithNetworkError) { auto cbh = makeCallbackHandle(); RemoteCommandRequest request{ testHost, "testDB", BSON("a" << 1), BSONObj(), nullptr, Milliseconds(1000)}; auto deferred = startCommand(cbh, request); // Create and initialize a stream so operation can begin auto stream = streamFactory().blockUntilStreamExists(testHost); ConnectEvent{stream}.skip(); stream->simulateServer(rpc::Protocol::kOpQuery, [](RemoteCommandRequest request) -> RemoteCommandResponse { return simulateIsMaster(request); }); { WriteEvent{stream}.skip(); ReadEvent read{stream}; // Trigger both a timeout and a network error stream->setError(make_error_code(ErrorCodes::HostUnreachable)); timerFactory().fastForward(Milliseconds(2000)); } // Wait for op to complete, assert that timeout had precedence. auto& result = deferred.get(); ASSERT_EQ(ErrorCodes::NetworkInterfaceExceededTimeLimit, result.status); ASSERT(result.elapsedMillis); assertNumOps(0u, 1u, 1u, 0u); } TEST_F(NetworkInterfaceASIOTest, CancelWithTimeoutAndNetworkError) { auto cbh = makeCallbackHandle(); RemoteCommandRequest request{ testHost, "testDB", BSON("a" << 1), BSONObj(), nullptr, Milliseconds(1000)}; auto deferred = startCommand(cbh, request); // Create and initialize a stream so operation can begin auto stream = streamFactory().blockUntilStreamExists(testHost); ConnectEvent{stream}.skip(); stream->simulateServer(rpc::Protocol::kOpQuery, [](RemoteCommandRequest request) -> RemoteCommandResponse { return simulateIsMaster(request); }); { WriteEvent{stream}.skip(); ReadEvent read{stream}; // Trigger a timeout, a cancellation, and a network error stream->setError(make_error_code(ErrorCodes::HostUnreachable)); timerFactory().fastForward(Milliseconds(2000)); net().cancelCommand(cbh); } // Wait for op to complete, assert that the cancellation had precedence. auto& result = deferred.get(); ASSERT_EQ(ErrorCodes::CallbackCanceled, result.status); ASSERT(result.elapsedMillis); assertNumOps(1u, 0u, 0u, 0u); } TEST_F(NetworkInterfaceASIOTest, AsyncOpTimeout) { // Kick off operation auto cb = makeCallbackHandle(); Milliseconds timeout(1000); RemoteCommandRequest request{testHost, "testDB", BSON("a" << 1), BSONObj(), nullptr, timeout}; auto deferred = startCommand(cb, request); // Create and initialize a stream so operation can begin auto stream = streamFactory().blockUntilStreamExists(testHost); ConnectEvent{stream}.skip(); // Simulate isMaster reply. stream->simulateServer(rpc::Protocol::kOpQuery, [](RemoteCommandRequest request) -> RemoteCommandResponse { return simulateIsMaster(request); }); { // Wait for the operation to block on write so we know it's been added. WriteEvent write{stream}; // Get the timer factory auto& factory = timerFactory(); // Advance clock but not enough to force a timeout, assert still active factory.fastForward(Milliseconds(500)); ASSERT(!deferred.hasCompleted()); // Advance clock and force timeout factory.fastForward(Milliseconds(500)); } auto& result = deferred.get(); ASSERT_EQ(ErrorCodes::NetworkInterfaceExceededTimeLimit, result.status); ASSERT(result.elapsedMillis); assertNumOps(0u, 1u, 1u, 0u); } TEST_F(NetworkInterfaceASIOTest, StartCommand) { RemoteCommandRequest request{testHost, "testDB", BSON("foo" << 1), BSON("bar" << 1), nullptr}; auto deferred = startCommand(makeCallbackHandle(), request); auto stream = streamFactory().blockUntilStreamExists(testHost); // Allow stream to connect. ConnectEvent{stream}.skip(); // simulate isMaster reply. stream->simulateServer(rpc::Protocol::kOpQuery, [](RemoteCommandRequest request) -> RemoteCommandResponse { return simulateIsMaster(request); }); auto expectedMetadata = BSON("meep" << "beep"); auto expectedCommandReply = BSON("boop" << "bop" << "ok" << 1.0); auto expectedCommandReplyWithMetadata = objConcat({expectedCommandReply, expectedMetadata}); // simulate user command stream->simulateServer( rpc::Protocol::kOpMsg, [&](RemoteCommandRequest request) -> RemoteCommandResponse { ASSERT_EQ(std::string{request.cmdObj.firstElementFieldName()}, "foo"); ASSERT_EQ(request.dbname, "testDB"); RemoteCommandResponse response; response.data = expectedCommandReply; response.metadata = expectedMetadata; return response; }); auto& res = deferred.get(); ASSERT(res.elapsedMillis); uassertStatusOK(res.status); ASSERT_BSONOBJ_EQ(res.data, expectedCommandReplyWithMetadata); ASSERT_BSONOBJ_EQ(res.metadata, expectedCommandReplyWithMetadata); assertNumOps(0u, 0u, 0u, 1u); } TEST_F(NetworkInterfaceASIOTest, InShutdown) { ASSERT_FALSE(net().inShutdown()); net().shutdown(); ASSERT(net().inShutdown()); } TEST_F(NetworkInterfaceASIOTest, StartCommandReturnsNotOKIfShutdownHasStarted) { net().shutdown(); RemoteCommandRequest request; ASSERT_NOT_OK( net().startCommand(makeCallbackHandle(), request, [&](RemoteCommandResponse resp) {})); } class MalformedMessageTest : public NetworkInterfaceASIOTest { public: using MessageHook = stdx::function; void runMessageTest(ErrorCodes::Error code, bool loadBody, MessageHook hook) { // Kick off our operation RemoteCommandRequest request{testHost, "testDB", BSON("ping" << 1), BSONObj(), nullptr}; auto deferred = startCommand(makeCallbackHandle(), request); // Wait for it to block waiting for a write auto stream = streamFactory().blockUntilStreamExists(testHost); ConnectEvent{stream}.skip(); stream->simulateServer(rpc::Protocol::kOpQuery, [](RemoteCommandRequest request) -> RemoteCommandResponse { return simulateIsMaster(request); }); uint32_t messageId = 0; { // Get the appropriate message id WriteEvent write{stream}; std::vector messageData = stream->popWrite(); messageId = MsgData::ConstView(reinterpret_cast(messageData.data())).getId(); } // Build a mock reply message auto replyBuilder = rpc::makeReplyBuilder(rpc::Protocol::kOpMsg); replyBuilder->setCommandReply(BSON("hello!" << 1)); replyBuilder->setMetadata(BSONObj()); auto message = replyBuilder->done(); message.header().setResponseToMsgId(messageId); auto actualSize = message.header().getLen(); // Allow caller to mess with the Message hook(message.header()); { // Load the header ReadEvent read{stream}; auto headerBytes = reinterpret_cast(message.header().view2ptr()); stream->pushRead({headerBytes, headerBytes + sizeof(MSGHEADER::Value)}); } if (loadBody) { // Load the body if we need to ReadEvent read{stream}; auto dataBytes = reinterpret_cast(message.buf()); auto body = dataBytes; std::advance(body, sizeof(MSGHEADER::Value)); stream->pushRead({body, dataBytes + static_cast(actualSize)}); } auto& response = deferred.get(); ASSERT_EQ(code, response.status); ASSERT(response.elapsedMillis); assertNumOps(0u, 0u, 1u, 0u); } }; TEST_F(MalformedMessageTest, messageHeaderWrongResponseTo) { runMessageTest(ErrorCodes::ProtocolError, false, [](MsgData::View message) { message.setResponseToMsgId(message.getResponseToMsgId() + 1); }); } TEST_F(MalformedMessageTest, messageHeaderlenZero) { runMessageTest( ErrorCodes::InvalidLength, false, [](MsgData::View message) { message.setLen(0); }); } TEST_F(MalformedMessageTest, MessageHeaderLenTooSmall) { runMessageTest(ErrorCodes::InvalidLength, false, [](MsgData::View message) { message.setLen(6); }); // min is 16 } TEST_F(MalformedMessageTest, MessageHeaderLenTooLarge) { runMessageTest(ErrorCodes::InvalidLength, false, [](MsgData::View message) { message.setLen(48000001); }); // max is 48000000 } TEST_F(MalformedMessageTest, MessageHeaderLenNegative) { runMessageTest( ErrorCodes::InvalidLength, false, [](MsgData::View message) { message.setLen(-1); }); } TEST_F(MalformedMessageTest, MessageLenSmallerThanActual) { runMessageTest(ErrorCodes::InvalidBSON, true, [](MsgData::View message) { message.setLen(message.getLen() - 10); }); } TEST_F(MalformedMessageTest, FailedToReadAllBytesForMessage) { runMessageTest(ErrorCodes::InvalidLength, true, [](MsgData::View message) { message.setLen(message.getLen() + 100); }); } TEST_F(MalformedMessageTest, UnsupportedOpcode) { runMessageTest(ErrorCodes::UnsupportedFormat, true, [](MsgData::View message) { message.setOperation(2222); }); } TEST_F(MalformedMessageTest, MismatchedOpcode) { runMessageTest(ErrorCodes::UnsupportedFormat, true, [](MsgData::View message) { message.setOperation(2006); }); } class NetworkInterfaceASIOConnectionHookTest : public NetworkInterfaceASIOTest { public: void setUp() override {} void start(std::unique_ptr hook) { auto factory = stdx::make_unique(); // keep unowned pointer, but pass ownership to NIA _streamFactory = factory.get(); NetworkInterfaceASIO::Options options{}; options.streamFactory = std::move(factory); options.networkConnectionHook = std::move(hook); options.timerFactory = stdx::make_unique(); _net = stdx::make_unique(std::move(options)); _net->startup(); } }; TEST_F(NetworkInterfaceASIOConnectionHookTest, InvalidIsMaster) { auto validationFailedStatus = Status(ErrorCodes::InterruptedDueToReplStateChange, "operation was interrupted"); start(makeTestHook( [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) { return Status(ErrorCodes::UnknownError, "unused"); }, [&](const HostAndPort& remoteHost) -> StatusWith> { return {boost::none}; }, [&](const HostAndPort& remoteHost, RemoteCommandResponse&& response) { return Status::OK(); })); RemoteCommandRequest request{testHost, "blah", BSON("foo" << "bar"), nullptr}; auto deferred = startCommand(makeCallbackHandle(), request); auto stream = streamFactory().blockUntilStreamExists(testHost); ConnectEvent{stream}.skip(); // simulate isMaster reply. stream->simulateServer(rpc::Protocol::kOpQuery, [](RemoteCommandRequest request) -> RemoteCommandResponse { RemoteCommandResponse response; response.data = BSON("ok" << 0.0 << "errmsg" << "operation was interrupted" << "code" << 11602); return response; }); // we should stop here. auto& res = deferred.get(); ASSERT_EQ(validationFailedStatus, res.status); ASSERT(res.elapsedMillis); assertNumOps(0u, 0u, 1u, 0u); } TEST_F(NetworkInterfaceASIOConnectionHookTest, ValidateHostInvalid) { bool validateCalled = false; bool hostCorrect = false; bool isMasterReplyCorrect = false; bool makeRequestCalled = false; bool handleReplyCalled = false; auto validationFailedStatus = Status(ErrorCodes::AlreadyInitialized, "blahhhhh"); start(makeTestHook( [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) { validateCalled = true; hostCorrect = (remoteHost == testHost); isMasterReplyCorrect = (isMasterReply.data["TESTKEY"].str() == "TESTVALUE"); return validationFailedStatus; }, [&](const HostAndPort& remoteHost) -> StatusWith> { makeRequestCalled = true; return {boost::none}; }, [&](const HostAndPort& remoteHost, RemoteCommandResponse&& response) { handleReplyCalled = true; return Status::OK(); })); RemoteCommandRequest request{testHost, "blah", BSON("foo" << "bar"), nullptr}; auto deferred = startCommand(makeCallbackHandle(), request); auto stream = streamFactory().blockUntilStreamExists(testHost); ConnectEvent{stream}.skip(); // simulate isMaster reply. stream->simulateServer( rpc::Protocol::kOpQuery, [](RemoteCommandRequest request) -> RemoteCommandResponse { RemoteCommandResponse response; response.data = BSON("minWireVersion" << mongo::WireSpec::instance().incomingExternalClient.minWireVersion << "maxWireVersion" << mongo::WireSpec::instance().incomingExternalClient.maxWireVersion << "TESTKEY" << "TESTVALUE"); return response; }); // we should stop here. auto& res = deferred.get(); ASSERT_EQ(validationFailedStatus, res.status); ASSERT(res.elapsedMillis); ASSERT(validateCalled); ASSERT(hostCorrect); ASSERT(isMasterReplyCorrect); ASSERT(!makeRequestCalled); ASSERT(!handleReplyCalled); assertNumOps(0u, 0u, 1u, 0u); } TEST_F(NetworkInterfaceASIOConnectionHookTest, MakeRequestReturnsError) { bool makeRequestCalled = false; bool handleReplyCalled = false; Status makeRequestError{ErrorCodes::DBPathInUse, "bloooh"}; start(makeTestHook( [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) -> Status { return Status::OK(); }, [&](const HostAndPort& remoteHost) -> StatusWith> { makeRequestCalled = true; return makeRequestError; }, [&](const HostAndPort& remoteHost, RemoteCommandResponse&& response) { handleReplyCalled = true; return Status::OK(); })); RemoteCommandRequest request{testHost, "blah", BSON("foo" << "bar"), nullptr}; auto deferred = startCommand(makeCallbackHandle(), request); auto stream = streamFactory().blockUntilStreamExists(testHost); ConnectEvent{stream}.skip(); // simulate isMaster reply. stream->simulateServer(rpc::Protocol::kOpQuery, [](RemoteCommandRequest request) -> RemoteCommandResponse { return simulateIsMaster(request); }); // We should stop here. auto& res = deferred.get(); ASSERT_EQ(makeRequestError, res.status); ASSERT(res.elapsedMillis); ASSERT(makeRequestCalled); ASSERT(!handleReplyCalled); assertNumOps(0u, 0u, 1u, 0u); } TEST_F(NetworkInterfaceASIOConnectionHookTest, MakeRequestReturnsNone) { bool makeRequestCalled = false; bool handleReplyCalled = false; start(makeTestHook( [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) -> Status { return Status::OK(); }, [&](const HostAndPort& remoteHost) -> StatusWith> { makeRequestCalled = true; return {boost::none}; }, [&](const HostAndPort& remoteHost, RemoteCommandResponse&& response) { handleReplyCalled = true; return Status::OK(); })); auto commandRequest = BSON("foo" << "bar"); auto commandReply = BSON("foo" << "boo" << "ok" << 1.0); auto metadata = BSON("aaa" << "bbb"); auto commandReplyWithMetadata = objConcat({commandReply, metadata}); RemoteCommandRequest request{testHost, "blah", commandRequest, nullptr}; auto deferred = startCommand(makeCallbackHandle(), request); auto stream = streamFactory().blockUntilStreamExists(testHost); ConnectEvent{stream}.skip(); // simulate isMaster reply. stream->simulateServer(rpc::Protocol::kOpQuery, [](RemoteCommandRequest request) -> RemoteCommandResponse { return simulateIsMaster(request); }); // Simulate user command. stream->simulateServer(rpc::Protocol::kOpMsg, [&](RemoteCommandRequest request) -> RemoteCommandResponse { ASSERT_BSONOBJ_EQ(commandRequest, request.cmdObj.removeField("$db")); RemoteCommandResponse response; response.data = commandReply; response.metadata = metadata; return response; }); // We should get back the reply now. auto& result = deferred.get(); ASSERT(result.isOK()); ASSERT_BSONOBJ_EQ(commandReplyWithMetadata, result.data); ASSERT(result.elapsedMillis); ASSERT_BSONOBJ_EQ(commandReplyWithMetadata, result.metadata); assertNumOps(0u, 0u, 0u, 1u); } TEST_F(NetworkInterfaceASIOConnectionHookTest, HandleReplyReturnsError) { bool makeRequestCalled = false; bool handleReplyCalled = false; bool handleReplyArgumentCorrect = false; BSONObj hookCommandRequest = BSON("1ddd" << "fff"); BSONObj hookRequestMetadata = BSON("wdwd" << 1212); BSONObj hookCommandReply = BSON("blah" << "blah" << "ok" << 1.0); BSONObj hookUnifiedRequest = ([&] { BSONObjBuilder bob; bob.appendElements(hookCommandRequest); bob.appendElements(hookRequestMetadata); bob.append("$db", "foo"); return bob.obj(); }()); BSONObj hookReplyMetadata = BSON("1111" << 2222); BSONObj hookCommandReplyWithMetadata = objConcat({hookCommandReply, hookReplyMetadata}); Status handleReplyError{ErrorCodes::AuthSchemaIncompatible, "daowdjkpowkdjpow"}; start(makeTestHook( [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) -> Status { return Status::OK(); }, [&](const HostAndPort& remoteHost) -> StatusWith> { makeRequestCalled = true; return {boost::make_optional( {testHost, "foo", hookCommandRequest, hookRequestMetadata, nullptr})}; }, [&](const HostAndPort& remoteHost, RemoteCommandResponse&& response) { handleReplyCalled = true; handleReplyArgumentCorrect = SimpleBSONObjComparator::kInstance.evaluate( response.data == hookCommandReplyWithMetadata) && SimpleBSONObjComparator::kInstance.evaluate(response.metadata == hookCommandReplyWithMetadata); return handleReplyError; })); auto commandRequest = BSON("foo" << "bar"); RemoteCommandRequest request{testHost, "blah", commandRequest, nullptr}; auto deferred = startCommand(makeCallbackHandle(), request); auto stream = streamFactory().blockUntilStreamExists(testHost); ConnectEvent{stream}.skip(); // simulate isMaster reply. stream->simulateServer(rpc::Protocol::kOpQuery, [](RemoteCommandRequest request) -> RemoteCommandResponse { return simulateIsMaster(request); }); // Simulate hook reply stream->simulateServer(rpc::Protocol::kOpMsg, [&](RemoteCommandRequest request) -> RemoteCommandResponse { ASSERT_BSONOBJ_EQ(request.cmdObj, hookUnifiedRequest); ASSERT_BSONOBJ_EQ(request.metadata, BSONObj()); RemoteCommandResponse response; response.data = hookCommandReply; response.metadata = hookReplyMetadata; return response; }); auto& result = deferred.get(); ASSERT_EQ(handleReplyError, result.status); ASSERT(result.elapsedMillis); ASSERT(makeRequestCalled); ASSERT(handleReplyCalled); ASSERT(handleReplyArgumentCorrect); assertNumOps(0u, 0u, 1u, 0u); } TEST_F(NetworkInterfaceASIOTest, SetAlarm) { // set a first alarm, to execute after "expiration" Date_t expiration = net().now() + Milliseconds(100); Deferred deferred; ASSERT_OK(net().setAlarm( expiration, [this, expiration, deferred]() mutable { deferred.emplace(net().now()); })); // Get our timer factory auto& factory = timerFactory(); // force the alarm to fire factory.fastForward(Milliseconds(5000)); // assert that it executed after "expiration" auto& result = deferred.get(); ASSERT(result >= expiration); expiration = net().now() + Milliseconds(99999999); Deferred deferred2; ASSERT_OK(net().setAlarm(expiration, [this, deferred2]() mutable { deferred2.emplace(true); })); net().shutdown(); ASSERT(!deferred2.hasCompleted()); } TEST_F(NetworkInterfaceASIOTest, SetAlarmReturnsNotOKIfShutdownHasStarted) { net().shutdown(); ASSERT_NOT_OK(net().setAlarm(net().now() + Milliseconds(100), [] {})); } TEST_F(NetworkInterfaceASIOTest, IsMasterRequestContainsOutgoingWireVersionInternalClientInfo) { WireSpec::instance().isInternalClient = true; RemoteCommandRequest request{testHost, "testDB", BSON("ping" << 1), BSONObj(), nullptr}; auto deferred = startCommand(makeCallbackHandle(), request); auto stream = streamFactory().blockUntilStreamExists(testHost); ConnectEvent{stream}.skip(); // Verify that the isMaster reply has the expected internalClient data. stream->simulateServer( rpc::Protocol::kOpQuery, [](RemoteCommandRequest request) -> RemoteCommandResponse { auto internalClientElem = request.cmdObj["internalClient"]; ASSERT_EQ(internalClientElem.type(), BSONType::Object); auto minWireVersionElem = internalClientElem.Obj()["minWireVersion"]; auto maxWireVersionElem = internalClientElem.Obj()["maxWireVersion"]; ASSERT_EQ(minWireVersionElem.type(), BSONType::NumberInt); ASSERT_EQ(maxWireVersionElem.type(), BSONType::NumberInt); ASSERT_EQ(minWireVersionElem.numberInt(), WireSpec::instance().outgoing.minWireVersion); ASSERT_EQ(maxWireVersionElem.numberInt(), WireSpec::instance().outgoing.maxWireVersion); return simulateIsMaster(request); }); // Simulate ping reply. stream->simulateServer(rpc::Protocol::kOpMsg, [&](RemoteCommandRequest request) -> RemoteCommandResponse { RemoteCommandResponse response; response.data = BSON("ok" << 1); return response; }); // Verify that the ping op is counted as a success. auto& res = deferred.get(); ASSERT(res.elapsedMillis); assertNumOps(0u, 0u, 0u, 1u); } TEST_F(NetworkInterfaceASIOTest, IsMasterRequestMissingInternalClientInfoWhenNotInternalClient) { WireSpec::instance().isInternalClient = false; RemoteCommandRequest request{testHost, "testDB", BSON("ping" << 1), BSONObj(), nullptr}; auto deferred = startCommand(makeCallbackHandle(), request); auto stream = streamFactory().blockUntilStreamExists(testHost); ConnectEvent{stream}.skip(); // Verify that the isMaster reply has the expected internalClient data. stream->simulateServer(rpc::Protocol::kOpQuery, [](RemoteCommandRequest request) -> RemoteCommandResponse { ASSERT_FALSE(request.cmdObj["internalClient"]); return simulateIsMaster(request); }); // Simulate ping reply. stream->simulateServer(rpc::Protocol::kOpMsg, [&](RemoteCommandRequest request) -> RemoteCommandResponse { RemoteCommandResponse response; response.data = BSON("ok" << 1); return response; }); // Verify that the ping op is counted as a success. auto& res = deferred.get(); ASSERT(res.elapsedMillis); assertNumOps(0u, 0u, 0u, 1u); } class NetworkInterfaceASIOMetadataTest : public NetworkInterfaceASIOTest { protected: void setUp() override {} void start(std::unique_ptr metadataHook) { auto factory = stdx::make_unique(); _streamFactory = factory.get(); NetworkInterfaceASIO::Options options{}; options.streamFactory = std::move(factory); options.metadataHook = std::move(metadataHook); options.timerFactory = stdx::make_unique(); _net = stdx::make_unique(std::move(options)); _net->startup(); } }; class TestMetadataHook : public rpc::EgressMetadataHook { public: TestMetadataHook(bool* wroteRequestMetadata, bool* gotReplyMetadata) : _wroteRequestMetadata(wroteRequestMetadata), _gotReplyMetadata(gotReplyMetadata) {} Status writeRequestMetadata(OperationContext* opCtx, BSONObjBuilder* metadataBob) override { metadataBob->append("foo", "bar"); *_wroteRequestMetadata = true; return Status::OK(); } Status readReplyMetadata(OperationContext* opCtx, StringData replySource, const BSONObj& metadataObj) override { *_gotReplyMetadata = (metadataObj["baz"].str() == "garply"); return Status::OK(); } private: bool* _wroteRequestMetadata; bool* _gotReplyMetadata; }; TEST_F(NetworkInterfaceASIOMetadataTest, Metadata) { bool wroteRequestMetadata = false; bool gotReplyMetadata = false; auto hookList = stdx::make_unique(); hookList->addHook( stdx::make_unique(&wroteRequestMetadata, &gotReplyMetadata)); start(std::move(hookList)); RemoteCommandRequest request{testHost, "blah", BSON("ping" << 1), nullptr}; auto deferred = startCommand(makeCallbackHandle(), request); auto stream = streamFactory().blockUntilStreamExists(testHost); ConnectEvent{stream}.skip(); // simulate isMaster reply. stream->simulateServer(rpc::Protocol::kOpQuery, [](RemoteCommandRequest request) -> RemoteCommandResponse { return simulateIsMaster(request); }); // Simulate hook reply stream->simulateServer(rpc::Protocol::kOpMsg, [&](RemoteCommandRequest request) -> RemoteCommandResponse { ASSERT_EQ("bar", request.cmdObj["foo"].str()); RemoteCommandResponse response; response.data = BSON("ok" << 1); response.metadata = BSON("baz" << "garply"); return response; }); auto& res = deferred.get(); ASSERT(res.elapsedMillis); ASSERT(wroteRequestMetadata); ASSERT(gotReplyMetadata); assertNumOps(0u, 0u, 0u, 1u); } } // namespace } // namespace executor } // namespace mongo