From 4dce840273127d4b2c86f8c790e081f19e44887a Mon Sep 17 00:00:00 2001 From: Billy Donahue Date: Fri, 5 Nov 2021 03:32:05 +0000 Subject: SERVER-61095 improvements to transport_layer_asio_test (again) --- src/mongo/transport/transport_layer_asio.h | 9 +- src/mongo/transport/transport_layer_asio_test.cpp | 553 +++++++++++----------- src/mongo/util/net/sock.cpp | 4 +- src/mongo/util/net/sock.h | 4 +- 4 files changed, 286 insertions(+), 284 deletions(-) diff --git a/src/mongo/transport/transport_layer_asio.h b/src/mongo/transport/transport_layer_asio.h index 665d0411167..8c48b3f6ec7 100644 --- a/src/mongo/transport/transport_layer_asio.h +++ b/src/mongo/transport/transport_layer_asio.h @@ -68,13 +68,14 @@ class ServiceEntryPoint; namespace transport { -// This fail point simulates reads and writes that always return 1 byte and fail with EAGAIN +// Simulates reads and writes that always return 1 byte and fail with EAGAIN extern FailPoint transportLayerASIOshortOpportunisticReadWrite; -// This fail point will cause an asyncConnect to timeout after it's successfully connected -// to the remote peer +// Cause an asyncConnect to timeout after it's successfully connected to the remote peer extern FailPoint transportLayerASIOasyncConnectTimesOut; +extern FailPoint transportLayerASIOhangBeforeAccept; + /** * A TransportLayer implementation based on ASIO networking primitives. */ @@ -115,7 +116,7 @@ public: ServiceEntryPoint* sep, const WireSpec& wireSpec = WireSpec::instance()); - virtual ~TransportLayerASIO(); + ~TransportLayerASIO() override; StatusWith connect(HostAndPort peer, ConnectSSLMode sslMode, diff --git a/src/mongo/transport/transport_layer_asio_test.cpp b/src/mongo/transport/transport_layer_asio_test.cpp index 7bfc2e3f5c4..740c6cbbd36 100644 --- a/src/mongo/transport/transport_layer_asio_test.cpp +++ b/src/mongo/transport/transport_layer_asio_test.cpp @@ -28,196 +28,180 @@ */ #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest -#include "mongo/platform/basic.h" - #include "mongo/transport/transport_layer_asio.h" +#include +#include +#include +#include + +#include + #include "mongo/db/server_options.h" #include "mongo/logv2/log.h" +#include "mongo/platform/atomic_word.h" +#include "mongo/platform/basic.h" #include "mongo/rpc/op_msg.h" #include "mongo/transport/service_entry_point.h" #include "mongo/unittest/unittest.h" #include "mongo/util/assert_util.h" +#include "mongo/util/concurrency/notification.h" #include "mongo/util/net/sock.h" - -#include "asio.hpp" +#include "mongo/util/synchronized_value.h" +#include "mongo/util/time_support.h" namespace mongo { namespace { -class ServiceEntryPointUtil : public ServiceEntryPoint { +#ifdef _WIN32 +using SetsockoptPtr = char*; +#else +using SetsockoptPtr = void*; +#endif + +template +class BlockingQueue { public: - void startSession(transport::SessionHandle session) override { - stdx::unique_lock lk(_mutex); - _sessions.push_back(std::move(session)); - LOGV2(2303201, "started session"); + void push(T t) { + stdx::unique_lock lk(_mu); + _q.push(std::move(t)); + lk.unlock(); _cv.notify_one(); } - void endAllSessions(transport::Session::TagMask tags) override { - LOGV2(2303301, "end all sessions"); - std::vector old_sessions; - { - stdx::unique_lock lock(_mutex); - old_sessions.swap(_sessions); - } - old_sessions.clear(); - } - - Status start() override { - return Status::OK(); + T pop() { + stdx::unique_lock lk(_mu); + _cv.wait(lk, [&] { return !_q.empty(); }); + T r = std::move(_q.front()); + _q.pop(); + return r; } - bool shutdown(Milliseconds timeout) override { - return true; - } +private: + mutable Mutex _mu; + mutable stdx::condition_variable _cv; + std::queue _q; +}; - void appendStats(BSONObjBuilder*) const override {} +class ConnectionThread { +public: + explicit ConnectionThread(int port) : ConnectionThread(port, nullptr) {} + ConnectionThread(int port, std::function onConnect) + : _port{port}, _onConnect{std::move(onConnect)}, _thr{[this] { _run(); }} {} - size_t numOpenSessions() const override { - stdx::unique_lock lock(_mutex); - return _sessions.size(); + ~ConnectionThread() { + LOGV2(6109500, "connection: Tx stop request"); + _stopRequest.set(true); + _thr.join(); + LOGV2(6109501, "connection: joined"); } - Future handleRequest(OperationContext* opCtx, - const Message& request) noexcept override { - MONGO_UNREACHABLE; + void close() { + _s.close(); } - void setTransportLayer(transport::TransportLayer* tl) { - _transport = tl; + Socket& socket() { + return _s; } - void waitForConnect() { - stdx::unique_lock lock(_mutex); - _cv.wait(lock, [&] { return !_sessions.empty(); }); +private: + void _run() { + _s.connect(SockAddr::create("localhost", _port, AF_INET)); + LOGV2(6109502, "connected", "port"_attr = _port); + if (_onConnect) + _onConnect(*this); + _stopRequest.get(); + LOGV2(6109503, "connection: Rx stop request"); } -private: - mutable Mutex _mutex = MONGO_MAKE_LATCH("::_mutex"); - stdx::condition_variable _cv; - std::vector _sessions; - transport::TransportLayer* _transport = nullptr; + int _port; + std::function _onConnect; + stdx::thread _thr; + Socket _s; + Notification _stopRequest; }; -class SimpleConnectionThread { +class SyncClient { public: - explicit SimpleConnectionThread(int port) : _port(port) { - _thr = stdx::thread{[&] { - auto sa = SockAddr::create("localhost", _port, AF_INET); - _s.connect(sa); - LOGV2(23034, "connection: port {port}", "port"_attr = _port); - stdx::unique_lock lk(_mutex); - _cv.wait(lk, [&] { return _stop; }); - LOGV2(23035, "connection: Rx stop request"); - }}; - } - - void forceCloseSocket() { - // Setting linger on to a zero-value timeout causes the socket to send an RST packet to the - // recipient side when closing the connection. - struct linger sl = {1, 0}; -#ifdef _WIN32 - char* pval = reinterpret_cast(&sl); -#else - void* pval = &sl; -#endif - ASSERT(!setsockopt(_s.rawFD(), SOL_SOCKET, SO_LINGER, pval, sizeof(sl))); - _s.close(); + explicit SyncClient(int port) { + std::error_code ec; + _sock.connect(asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port), ec); + ASSERT_EQ(ec, std::error_code()); + LOGV2(6109504, "sync client connected"); } - void stop() { - { - stdx::unique_lock lk(_mutex); - _stop = true; - } - LOGV2(23036, "connection: Tx stop request"); - _cv.notify_one(); - _thr.join(); - LOGV2(23037, "connection: stopped"); + std::error_code write(const char* buf, size_t bufSize) { + std::error_code ec; + asio::write(_sock, asio::buffer(buf, bufSize), ec); + return ec; } private: - Mutex _mutex = MONGO_MAKE_LATCH("SimpleConnectionThread::_mutex"); - stdx::condition_variable _cv; - stdx::thread _thr; - bool _stop = false; - - Socket _s; - int _port; + asio::io_context _ctx{}; + asio::ip::tcp::socket _sock{_ctx}; }; -std::unique_ptr makeAndStartTL(ServiceEntryPoint* sep) { - auto options = [] { - ServerGlobalParams params; - params.noUnixSocket = true; - transport::TransportLayerASIO::Options opts(¶ms); - - // TODO SERVER-30212 should clean this up and assign a port from the supplied port range - // provided by resmoke. - opts.port = 0; - return opts; - }(); - - auto tla = std::make_unique(options, sep); - ASSERT_OK(tla->setup()); - ASSERT_OK(tla->start()); - - return tla; +void ping(SyncClient& client) { + OpMsgBuilder builder; + builder.setBody(BSON("ping" << 1)); + Message msg = builder.finish(); + msg.header().setResponseToMsgId(0); + msg.header().setId(0); + OpMsg::appendChecksum(&msg); + ASSERT_EQ(client.write(msg.buf(), msg.size()), std::error_code{}); } -TEST(TransportLayerASIO, PortZeroConnect) { - ServiceEntryPointUtil sepu; - auto tla = makeAndStartTL(&sepu); - - int port = tla->listenerPort(); - ASSERT_GT(port, 0); - LOGV2(23038, "TransportLayerASIO.listenerPort() is {port}", "port"_attr = port); +struct SessionThread { + struct StopException {}; - SimpleConnectionThread connect_thread(port); - sepu.waitForConnect(); + explicit SessionThread(std::shared_ptr s) + : _session{std::move(s)}, _thread{[this] { run(); }} {} - ASSERT_EQ(sepu.numOpenSessions(), 1); - connect_thread.stop(); - sepu.endAllSessions({}); - tla->shutdown(); -} + ~SessionThread() { + _join(); + } -TEST(TransportLayerASIO, TCPResetAfterConnectionIsSilentlySwallowed) { - ServiceEntryPointUtil sepu; - auto tla = makeAndStartTL(&sepu); + void schedule(std::function task) { + _tasks.push(std::move(task)); + } - auto hangBeforeAcceptFp = globalFailPointRegistry().find("transportLayerASIOhangBeforeAccept"); - auto timesEntered = hangBeforeAcceptFp->setMode(FailPoint::alwaysOn); + void run() { + while (true) { + try { + LOGV2(6109508, "SessionThread: pop and execute a task"); + _tasks.pop()(*_session); + } catch (const StopException&) { + LOGV2(6109509, "SessionThread: stopping"); + return; + } + } + } - SimpleConnectionThread connect_thread(tla->listenerPort()); - hangBeforeAcceptFp->waitForTimesEntered(timesEntered + 1); + transport::Session& session() const { + return *_session; + } - connect_thread.forceCloseSocket(); - hangBeforeAcceptFp->setMode(FailPoint::off); +private: + void _join() { + if (!_thread.joinable()) + return; + schedule([](auto&&) { throw StopException{}; }); + _thread.join(); + } - ASSERT_EQ(sepu.numOpenSessions(), 0); - connect_thread.stop(); - tla->shutdown(); -} + std::shared_ptr _session; + stdx::thread _thread; + BlockingQueue> _tasks; +}; -class TimeoutSEP : public ServiceEntryPoint { +class MockSEP : public ServiceEntryPoint { public: - ~TimeoutSEP() override { - // This should shutdown immediately, so give the maximum timeout - shutdown(Milliseconds::max()); - } + MockSEP() = default; + explicit MockSEP(std::function onStartSession) + : _onStartSession(std::move(onStartSession)) {} - void endAllSessions(transport::Session::TagMask tags) override { - MONGO_UNREACHABLE; - } - - bool shutdown(Milliseconds timeout) override { - LOGV2(23039, "Joining all worker threads"); - for (auto& thread : _workerThreads) { - thread.join(); - } - return true; + ~MockSEP() override { + _join(); } Status start() override { @@ -226,182 +210,199 @@ public: void appendStats(BSONObjBuilder*) const override {} - size_t numOpenSessions() const override { - return 0; - } - Future handleRequest(OperationContext* opCtx, const Message& request) noexcept override { MONGO_UNREACHABLE; } - bool waitForTimeout(boost::optional timeout = boost::none) { - stdx::unique_lock lk(_mutex); - bool ret = true; - if (timeout) { - ret = _cond.wait_for(lk, timeout->toSystemDuration(), [this] { return _finished; }); - } else { - _cond.wait(lk, [this] { return _finished; }); - } - - _finished = false; - return ret; + void startSession(std::shared_ptr session) override { + LOGV2(6109510, "Accepted connection", "remote"_attr = session->remote()); + auto& newSession = [&]() -> SessionThread& { + auto vec = *_sessions; + vec->push_back(std::make_unique(std::move(session))); + return *vec->back(); + }(); + if (_onStartSession) + _onStartSession(newSession); + LOGV2(6109511, "started session"); } -protected: - void notifyComplete() { - stdx::unique_lock lk(_mutex); - _finished = true; - _cond.notify_one(); + void endAllSessions(transport::Session::TagMask tags) override { + _join(); } - template - void startWorkerThread(FunT&& fun) { - _workerThreads.emplace_back(std::forward(fun)); + bool shutdown(Milliseconds timeout) override { + _join(); + return true; } -private: - Mutex _mutex = MONGO_MAKE_LATCH("TimeoutSEP::_mutex"); - - stdx::condition_variable _cond; - bool _finished = false; - - std::vector _workerThreads; -}; - -class TimeoutSyncSEP : public TimeoutSEP { -public: - enum Mode { kShouldTimeout, kNoTimeout }; - TimeoutSyncSEP(Mode mode) : _mode(mode) {} - - void startSession(transport::SessionHandle session) override { - LOGV2(23040, "Accepted connection", "remote"_attr = session->remote()); - startWorkerThread([this, session = std::move(session)]() mutable { - LOGV2(23041, "waiting for message"); - session->setTimeout(Milliseconds{500}); - auto status = session->sourceMessage().getStatus(); - if (_mode == kShouldTimeout) { - ASSERT_EQ(status, ErrorCodes::NetworkTimeout); - LOGV2(23042, "message timed out"); - } else { - ASSERT_OK(status); - LOGV2(23043, "message received okay"); - } + size_t numOpenSessions() const override { + return _sessions->size(); + } - session.reset(); - notifyComplete(); - }); + void setOnStartSession(std::function cb) { + _onStartSession = std::move(cb); } private: - Mode _mode; + void _join() { + LOGV2(6109513, "Joining all session threads"); + _sessions->clear(); + } + + std::function _onStartSession; + synchronized_value>> _sessions; }; -class TimeoutConnector { +/** + * Properly setting up and tearing down the MockSEP and TransportLayerASIO is + * tricky. Most tests can delegate the details to this TestFixture. + */ +class TestFixture { public: - TimeoutConnector(int port, bool sendRequest) - : _ctx(), _sock(_ctx), _endpoint(asio::ip::address_v4::loopback(), port) { - std::error_code ec; - _sock.connect(_endpoint, ec); - ASSERT_EQ(ec, std::error_code()); + TestFixture() : _tla{_makeTLA()} {} - if (sendRequest) { - sendMessage(); - } + ~TestFixture() { + _sep.endAllSessions({}); + _tla->shutdown(); } - void sendMessage() { - OpMsgBuilder builder; - builder.setBody(BSON("ping" << 1)); - Message msg = builder.finish(); - msg.header().setResponseToMsgId(0); - msg.header().setId(0); - OpMsg::appendChecksum(&msg); + MockSEP& sep() { + return _sep; + } - std::error_code ec; - asio::write(_sock, asio::buffer(msg.buf(), msg.size()), ec); - ASSERT_FALSE(ec); + transport::TransportLayerASIO& tla() { + return *_tla; } private: - asio::io_context _ctx; - asio::ip::tcp::socket _sock; - asio::ip::tcp::endpoint _endpoint; + std::unique_ptr _makeTLA() { + auto options = [] { + ServerGlobalParams params; + params.noUnixSocket = true; + transport::TransportLayerASIO::Options opts(¶ms); + // TODO SERVER-30212 should clean this up and assign a port from the supplied port range + // provided by resmoke. + opts.port = 0; + return opts; + }(); + auto tla = std::make_unique(options, &_sep); + ASSERT_OK(tla->setup()); + ASSERT_OK(tla->start()); + return tla; + } + + MockSEP _sep; + std::unique_ptr _tla; }; -/* check that timeouts actually time out */ -TEST(TransportLayerASIO, SourceSyncTimeoutTimesOut) { - TimeoutSyncSEP sep(TimeoutSyncSEP::kShouldTimeout); - auto tla = makeAndStartTL(&sep); +TEST(TransportLayerASIO, ListenerPortZeroTreatedAsEphemeral) { + Notification connected; + TestFixture tf; + tf.sep().setOnStartSession([&](auto&&) { connected.set(true); }); - TimeoutConnector connector(tla->listenerPort(), false); + int port = tf.tla().listenerPort(); + ASSERT_GT(port, 0); + LOGV2(6109514, "TransportLayerASIO listening", "port"_attr = port); - sep.waitForTimeout(); - tla->shutdown(); + ConnectionThread connectThread(port); + connected.get(); } -/* check that timeouts don't time out unless there's an actual timeout */ -TEST(TransportLayerASIO, SourceSyncTimeoutSucceeds) { - TimeoutSyncSEP sep(TimeoutSyncSEP::kNoTimeout); - auto tla = makeAndStartTL(&sep); - - TimeoutConnector connector(tla->listenerPort(), true); +TEST(TransportLayerASIO, TCPResetAfterConnectionIsSilentlySwallowed) { - sep.waitForTimeout(); - tla->shutdown(); -} + TestFixture tf; -/* check that switching from timeouts to no timeouts correctly resets the timeout to unlimited */ -class TimeoutSwitchModesSEP : public TimeoutSEP { -public: - void startSession(transport::SessionHandle session) override { - LOGV2(23044, "Accepted connection", "remote"_attr = session->remote()); - startWorkerThread([this, session = std::move(session)]() mutable { - LOGV2(23045, "waiting for message"); - auto sourceMessage = [&] { return session->sourceMessage().getStatus(); }; - - // the first message we source should time out. - session->setTimeout(Milliseconds{500}); - ASSERT_EQ(sourceMessage(), ErrorCodes::NetworkTimeout); - notifyComplete(); - - LOGV2(23046, "timed out successfully"); - - // get the session back in a known state with the timeout still in place - ASSERT_OK(sourceMessage()); - notifyComplete(); - - LOGV2(23047, "waiting for message without a timeout"); - - // this should block and timeout the waitForComplete mutex, and the session should wait - // for a while to make sure this isn't timing out and then send a message to unblock - // the this call to recv - session->setTimeout(boost::none); - ASSERT_OK(sourceMessage()); - - session.reset(); - notifyComplete(); - LOGV2(23048, "ending test"); - }); - } -}; + AtomicWord sessionsCreated{0}; + tf.sep().setOnStartSession([&](auto&&) { sessionsCreated.fetchAndAdd(1); }); -TEST(TransportLayerASIO, SwitchTimeoutModes) { - TimeoutSwitchModesSEP sep; - auto tla = makeAndStartTL(&sep); + LOGV2(6109515, "connecting"); + auto& fp = transport::transportLayerASIOhangBeforeAccept; + auto timesEntered = fp.setMode(FailPoint::alwaysOn); + ConnectionThread connectThread(tf.tla().listenerPort(), [&](ConnectionThread& conn) { + // Linger timeout = 0 causes a RST packet on close. + struct linger sl = {1, 0}; + if (setsockopt(conn.socket().rawFD(), + SOL_SOCKET, + SO_LINGER, + reinterpret_cast(&sl), + sizeof(sl)) != 0) { + auto err = make_error_code(std::errc{errno}); + LOGV2_ERROR(6109517, "setsockopt", "error"_attr = err.message()); + } + }); + fp.waitForTimesEntered(timesEntered + 1); - TimeoutConnector connector(tla->listenerPort(), false); + LOGV2(6109516, "closing"); + connectThread.close(); + fp.setMode(FailPoint::off); - ASSERT_TRUE(sep.waitForTimeout()); + ASSERT_EQ(sessionsCreated.load(), 0); +} - connector.sendMessage(); - ASSERT_TRUE(sep.waitForTimeout()); +/* check that timeouts actually time out */ +TEST(TransportLayerASIO, SourceSyncTimeoutTimesOut) { + TestFixture tf; + Notification> received; + tf.sep().setOnStartSession([&](SessionThread& st) { + st.session().setTimeout(Milliseconds{500}); + st.schedule([&](auto& session) { received.set(session.sourceMessage()); }); + }); + SyncClient conn(tf.tla().listenerPort()); + ASSERT_EQ(received.get().getStatus(), ErrorCodes::NetworkTimeout); +} - ASSERT_FALSE(sep.waitForTimeout(Milliseconds{1000})); - connector.sendMessage(); - ASSERT_TRUE(sep.waitForTimeout()); +/* check that timeouts don't time out unless there's an actual timeout */ +TEST(TransportLayerASIO, SourceSyncTimeoutSucceeds) { + TestFixture tf; + Notification> received; + tf.sep().setOnStartSession([&](SessionThread& st) { + st.session().setTimeout(Milliseconds{500}); + st.schedule([&](auto& session) { received.set(session.sourceMessage()); }); + }); + SyncClient conn(tf.tla().listenerPort()); + ping(conn); // This time we send a message + ASSERT_OK(received.get().getStatus()); +} - tla->shutdown(); +/** Switching from timeouts to no timeouts must reset the timeout to unlimited. */ +TEST(TransportLayerASIO, SwitchTimeoutModes) { + TestFixture tf; + Notification mockSessionCreated; + tf.sep().setOnStartSession([&](SessionThread& st) { mockSessionCreated.set(&st); }); + + SyncClient conn(tf.tla().listenerPort()); + auto& st = *mockSessionCreated.get(); + + { + LOGV2(6109525, "The first message we source should time out"); + Notification> done; + st.schedule([&](auto& session) { + session.setTimeout(Milliseconds{500}); + done.set(session.sourceMessage()); + }); + ASSERT_EQ(done.get().getStatus(), ErrorCodes::NetworkTimeout); + LOGV2(6109526, "timed out successfully"); + } + { + LOGV2(6109527, "Verify a message can be successfully received"); + Notification> done; + st.schedule([&](auto& session) { done.set(session.sourceMessage()); }); + ping(conn); + ASSERT_OK(done.get().getStatus()); + } + { + LOGV2(6109528, "Clear the timeout and verify reception of a late message."); + Notification> done; + st.schedule([&](auto& session) { + LOGV2(6109529, "waiting for message without a timeout"); + session.setTimeout({}); + done.set(session.sourceMessage()); + }); + sleepFor(Seconds{1}); + ping(conn); + ASSERT_OK(done.get().getStatus()); + } } } // namespace diff --git a/src/mongo/util/net/sock.cpp b/src/mongo/util/net/sock.cpp index 35776fe4c73..3a42470524a 100644 --- a/src/mongo/util/net/sock.cpp +++ b/src/mongo/util/net/sock.cpp @@ -299,13 +299,13 @@ SSLPeerInfo Socket::doSSLHandshake(const char* firstBytes, int len) { #endif -bool Socket::connect(SockAddr& remote) { +bool Socket::connect(const SockAddr& remote) { const Milliseconds connectTimeoutMillis(static_cast( _timeout > 0 ? std::min(kMaxConnectTimeoutMS, (_timeout * 1000)) : kMaxConnectTimeoutMS)); return connect(remote, connectTimeoutMillis); } -bool Socket::connect(SockAddr& remote, Milliseconds connectTimeoutMillis) { +bool Socket::connect(const SockAddr& remote, Milliseconds connectTimeoutMillis) { _remote = remote; _fd = ::socket(remote.getType(), SOCK_STREAM, 0); diff --git a/src/mongo/util/net/sock.h b/src/mongo/util/net/sock.h index 2548a46804c..7257e5e16f6 100644 --- a/src/mongo/util/net/sock.h +++ b/src/mongo/util/net/sock.h @@ -109,12 +109,12 @@ public: * an error, or due to a timeout on connection, or due to the system socket deciding the * socket is invalid. */ - bool connect(SockAddr& remote, Milliseconds connectTimeoutMillis); + bool connect(const SockAddr& remote, Milliseconds connectTimeoutMillis); /** * Connect using a default connect timeout of min(_timeout * 1000, kMaxConnectTimeoutMS) */ - bool connect(SockAddr& remote); + bool connect(const SockAddr& remote); void close(); void send(const char* data, int len, const char* context); -- cgit v1.2.1