diff options
author | Billy Donahue <billy.donahue@mongodb.com> | 2021-11-05 03:32:05 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-06-08 16:02:03 +0000 |
commit | 1e4c6f23678343729a953b849da530946ca6e100 (patch) | |
tree | 855b37656105e217046792c37c8105ef617793fb | |
parent | 224c25e0d3073978ae531c47ace605affae02664 (diff) | |
download | mongo-1e4c6f23678343729a953b849da530946ca6e100.tar.gz |
SERVER-61095 improvements to transport_layer_asio_test (again)
(cherry picked from commit 4dce840273127d4b2c86f8c790e081f19e44887a)
-rw-r--r-- | src/mongo/transport/transport_layer_asio.h | 9 | ||||
-rw-r--r-- | src/mongo/transport/transport_layer_asio_test.cpp | 503 | ||||
-rw-r--r-- | src/mongo/util/net/sock.cpp | 4 | ||||
-rw-r--r-- | src/mongo/util/net/sock.h | 4 |
4 files changed, 255 insertions, 265 deletions
diff --git a/src/mongo/transport/transport_layer_asio.h b/src/mongo/transport/transport_layer_asio.h index f551dac5420..f11b0dbc274 100644 --- a/src/mongo/transport/transport_layer_asio.h +++ b/src/mongo/transport/transport_layer_asio.h @@ -69,13 +69,14 @@ class ServiceEntryPoint; namespace transport { -// This fail point simulates reads and writes that always return 1 byte and fail with EAGAIN +// Simulates reads and writes that always return 1 byte and fail with EAGAIN extern FailPoint transportLayerASIOshortOpportunisticReadWrite; -// This fail point will cause an asyncConnect to timeout after it's successfully connected -// to the remote peer +// Cause an asyncConnect to timeout after it's successfully connected to the remote peer extern FailPoint transportLayerASIOasyncConnectTimesOut; +extern FailPoint transportLayerASIOhangBeforeAccept; + /** * A TransportLayer implementation based on ASIO networking primitives. */ @@ -116,7 +117,7 @@ public: TransportLayerASIO(const Options& opts, ServiceEntryPoint* sep); - virtual ~TransportLayerASIO(); + ~TransportLayerASIO() override; StatusWith<SessionHandle> connect(HostAndPort peer, ConnectSSLMode sslMode, diff --git a/src/mongo/transport/transport_layer_asio_test.cpp b/src/mongo/transport/transport_layer_asio_test.cpp index 660f14852f3..eafc0fc777b 100644 --- a/src/mongo/transport/transport_layer_asio_test.cpp +++ b/src/mongo/transport/transport_layer_asio_test.cpp @@ -28,157 +28,180 @@ */ #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest -#include "mongo/platform/basic.h" - #include "mongo/transport/transport_layer_asio.h" +#include <queue> +#include <system_error> +#include <utility> +#include <vector> + +#include <asio.hpp> + #include "mongo/db/server_options.h" #include "mongo/logv2/log.h" +#include "mongo/platform/atomic_word.h" +#include "mongo/platform/basic.h" #include "mongo/rpc/op_msg.h" #include "mongo/transport/service_entry_point.h" #include "mongo/unittest/unittest.h" #include "mongo/util/assert_util.h" +#include "mongo/util/concurrency/notification.h" #include "mongo/util/net/sock.h" - -#include "asio.hpp" +#include "mongo/util/synchronized_value.h" +#include "mongo/util/time_support.h" namespace mongo { namespace { -class ServiceEntryPointUtil : public ServiceEntryPoint { +#ifdef _WIN32 +using SetsockoptPtr = char*; +#else +using SetsockoptPtr = void*; +#endif + +template <typename T> +class BlockingQueue { public: - void startSession(transport::SessionHandle session) override { - stdx::unique_lock<Latch> lk(_mutex); - _sessions.push_back(std::move(session)); - LOGV2(23032, "started session"); + void push(T t) { + stdx::unique_lock lk(_mu); + _q.push(std::move(t)); + lk.unlock(); _cv.notify_one(); } - void endAllSessions(transport::Session::TagMask tags) override { - LOGV2(23033, "end all sessions"); - std::vector<transport::SessionHandle> old_sessions; - { - stdx::unique_lock<Latch> lock(_mutex); - old_sessions.swap(_sessions); - } - old_sessions.clear(); - } - - Status start() override { - return Status::OK(); + T pop() { + stdx::unique_lock lk(_mu); + _cv.wait(lk, [&] { return !_q.empty(); }); + T r = std::move(_q.front()); + _q.pop(); + return r; } - bool shutdown(Milliseconds timeout) override { - return true; - } +private: + mutable Mutex _mu; + mutable stdx::condition_variable _cv; + std::queue<T> _q; +}; - void appendStats(BSONObjBuilder*) const override {} +class ConnectionThread { +public: + explicit ConnectionThread(int port) : ConnectionThread(port, nullptr) {} + ConnectionThread(int port, std::function<void(ConnectionThread&)> onConnect) + : _port{port}, _onConnect{std::move(onConnect)}, _thr{[this] { _run(); }} {} - size_t numOpenSessions() const override { - stdx::unique_lock<Latch> lock(_mutex); - return _sessions.size(); + ~ConnectionThread() { + LOGV2(6109500, "connection: Tx stop request"); + _stopRequest.set(true); + _thr.join(); + LOGV2(6109501, "connection: joined"); } - DbResponse handleRequest(OperationContext* opCtx, const Message& request) override { - MONGO_UNREACHABLE; + void close() { + _s.close(); } - void setTransportLayer(transport::TransportLayer* tl) { - _transport = tl; + Socket& socket() { + return _s; } - void waitForConnect() { - stdx::unique_lock<Latch> lock(_mutex); - _cv.wait(lock, [&] { return !_sessions.empty(); }); +private: + void _run() { + _s.connect(SockAddr("localhost", _port, AF_INET)); + LOGV2(6109502, "connected", "port"_attr = _port); + if (_onConnect) + _onConnect(*this); + _stopRequest.get(); + LOGV2(6109503, "connection: Rx stop request"); } -private: - mutable Mutex _mutex = MONGO_MAKE_LATCH("::_mutex"); - stdx::condition_variable _cv; - std::vector<transport::SessionHandle> _sessions; - transport::TransportLayer* _transport = nullptr; + int _port; + std::function<void(ConnectionThread&)> _onConnect; + stdx::thread _thr; + Socket _s; + Notification<bool> _stopRequest; }; -class SimpleConnectionThread { +class SyncClient { public: - explicit SimpleConnectionThread(int port) : _port(port) { - _thr = stdx::thread{[&] { - Socket s; - SockAddr sa{"localhost", _port, AF_INET}; - s.connect(sa); - LOGV2(23034, "connection: port {port}", "port"_attr = _port); - stdx::unique_lock<Latch> lk(_mutex); - _cv.wait(lk, [&] { return _stop; }); - LOGV2(23035, "connection: Rx stop request"); - }}; + explicit SyncClient(int port) { + std::error_code ec; + _sock.connect(asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port), ec); + ASSERT_EQ(ec, std::error_code()); + LOGV2(6109504, "sync client connected"); } - void stop() { - { - stdx::unique_lock<Latch> lk(_mutex); - _stop = true; - } - LOGV2(23036, "connection: Tx stop request"); - _cv.notify_one(); - _thr.join(); - LOGV2(23037, "connection: stopped"); + std::error_code write(const char* buf, size_t bufSize) { + std::error_code ec; + asio::write(_sock, asio::buffer(buf, bufSize), ec); + return ec; } private: - Mutex _mutex = MONGO_MAKE_LATCH("SimpleConnectionThread::_mutex"); - stdx::condition_variable _cv; - stdx::thread _thr; - bool _stop = false; - int _port; + asio::io_context _ctx{}; + asio::ip::tcp::socket _sock{_ctx}; }; -TEST(TransportLayerASIO, PortZeroConnect) { - ServiceEntryPointUtil sepu; +void ping(SyncClient& client) { + OpMsgBuilder builder; + builder.setBody(BSON("ping" << 1)); + Message msg = builder.finish(); + msg.header().setResponseToMsgId(0); + msg.header().setId(0); + OpMsg::appendChecksum(&msg); + ASSERT_EQ(client.write(msg.buf(), msg.size()), std::error_code{}); +} - auto options = [] { - ServerGlobalParams params; - params.noUnixSocket = true; - transport::TransportLayerASIO::Options opts(¶ms); +struct SessionThread { + struct StopException {}; - // TODO SERVER-30212 should clean this up and assign a port from the supplied port range - // provided by resmoke. - opts.port = 0; - return opts; - }(); + explicit SessionThread(std::shared_ptr<transport::Session> s) + : _session{std::move(s)}, _thread{[this] { run(); }} {} - transport::TransportLayerASIO tla(options, &sepu); - sepu.setTransportLayer(&tla); + ~SessionThread() { + _join(); + } - ASSERT_OK(tla.setup()); - ASSERT_OK(tla.start()); - int port = tla.listenerPort(); - ASSERT_GT(port, 0); - LOGV2(23038, "TransportLayerASIO.listenerPort() is {port}", "port"_attr = port); + void schedule(std::function<void(transport::Session&)> task) { + _tasks.push(std::move(task)); + } - SimpleConnectionThread connect_thread(port); - sepu.waitForConnect(); - connect_thread.stop(); - sepu.endAllSessions({}); - tla.shutdown(); -} + void run() { + while (true) { + try { + LOGV2(6109508, "SessionThread: pop and execute a task"); + _tasks.pop()(*_session); + } catch (const StopException&) { + LOGV2(6109509, "SessionThread: stopping"); + return; + } + } + } -class TimeoutSEP : public ServiceEntryPoint { -public: - ~TimeoutSEP() override { - // This should shutdown immediately, so give the maximum timeout - shutdown(Milliseconds::max()); + transport::Session& session() const { + return *_session; } - void endAllSessions(transport::Session::TagMask tags) override { - MONGO_UNREACHABLE; +private: + void _join() { + if (!_thread.joinable()) + return; + schedule([](auto&&) { throw StopException{}; }); + _thread.join(); } - bool shutdown(Milliseconds timeout) override { - LOGV2(23039, "Joining all worker threads"); - for (auto& thread : _workerThreads) { - thread.join(); - } - return true; + std::shared_ptr<transport::Session> _session; + stdx::thread _thread; + BlockingQueue<std::function<void(transport::Session&)>> _tasks; +}; + +class MockSEP : public ServiceEntryPoint { +public: + MockSEP() = default; + explicit MockSEP(std::function<void(SessionThread&)> onStartSession) + : _onStartSession(std::move(onStartSession)) {} + + ~MockSEP() override { + _join(); } Status start() override { @@ -187,201 +210,167 @@ public: void appendStats(BSONObjBuilder*) const override {} - size_t numOpenSessions() const override { - return 0; - } - DbResponse handleRequest(OperationContext* opCtx, const Message& request) override { MONGO_UNREACHABLE; } - bool waitForTimeout(boost::optional<Milliseconds> timeout = boost::none) { - stdx::unique_lock<Latch> lk(_mutex); - bool ret = true; - if (timeout) { - ret = _cond.wait_for(lk, timeout->toSystemDuration(), [this] { return _finished; }); - } else { - _cond.wait(lk, [this] { return _finished; }); - } - - _finished = false; - return ret; + void startSession(std::shared_ptr<transport::Session> session) override { + LOGV2(6109510, "Accepted connection", "remote"_attr = session->remote()); + auto& newSession = [&]() -> SessionThread& { + auto vec = *_sessions; + vec->push_back(std::make_unique<SessionThread>(std::move(session))); + return *vec->back(); + }(); + if (_onStartSession) + _onStartSession(newSession); + LOGV2(6109511, "started session"); } -protected: - void notifyComplete() { - stdx::unique_lock<Latch> lk(_mutex); - _finished = true; - _cond.notify_one(); + void endAllSessions(transport::Session::TagMask tags) override { + _join(); } - template <typename FunT> - void startWorkerThread(FunT&& fun) { - _workerThreads.emplace_back(std::forward<FunT>(fun)); + bool shutdown(Milliseconds timeout) override { + _join(); + return true; } -private: - Mutex _mutex = MONGO_MAKE_LATCH("TimeoutSEP::_mutex"); - - stdx::condition_variable _cond; - bool _finished = false; - - std::vector<stdx::thread> _workerThreads; -}; - -class TimeoutSyncSEP : public TimeoutSEP { -public: - enum Mode { kShouldTimeout, kNoTimeout }; - TimeoutSyncSEP(Mode mode) : _mode(mode) {} - - void startSession(transport::SessionHandle session) override { - LOGV2(23040, - "Accepted connection from {session_remote}", - "session_remote"_attr = session->remote()); - startWorkerThread([this, session = std::move(session)]() mutable { - LOGV2(23041, "waiting for message"); - session->setTimeout(Milliseconds{500}); - auto status = session->sourceMessage().getStatus(); - if (_mode == kShouldTimeout) { - ASSERT_EQ(status, ErrorCodes::NetworkTimeout); - LOGV2(23042, "message timed out"); - } else { - ASSERT_OK(status); - LOGV2(23043, "message received okay"); - } + size_t numOpenSessions() const override { + MONGO_UNREACHABLE; + } - session.reset(); - notifyComplete(); - }); + void setOnStartSession(std::function<void(SessionThread&)> cb) { + _onStartSession = std::move(cb); } private: - Mode _mode; + void _join() { + LOGV2(6109513, "Joining all session threads"); + _sessions->clear(); + } + + std::function<void(SessionThread&)> _onStartSession; + synchronized_value<std::vector<std::unique_ptr<SessionThread>>> _sessions; }; -class TimeoutConnector { +/** + * Properly setting up and tearing down the MockSEP and TransportLayerASIO is + * tricky. Most tests can delegate the details to this TestFixture. + */ +class TestFixture { public: - TimeoutConnector(int port, bool sendRequest) - : _ctx(), _sock(_ctx), _endpoint(asio::ip::address_v4::loopback(), port) { - std::error_code ec; - _sock.connect(_endpoint, ec); - ASSERT_EQ(ec, std::error_code()); + TestFixture() : _tla{_makeTLA()} {} - if (sendRequest) { - sendMessage(); - } + ~TestFixture() { + _sep.endAllSessions({}); + _tla->shutdown(); } - void sendMessage() { - OpMsgBuilder builder; - builder.setBody(BSON("ping" << 1)); - Message msg = builder.finish(); - msg.header().setResponseToMsgId(0); - msg.header().setId(0); - OpMsg::appendChecksum(&msg); + MockSEP& sep() { + return _sep; + } - std::error_code ec; - asio::write(_sock, asio::buffer(msg.buf(), msg.size()), ec); - ASSERT_FALSE(ec); + transport::TransportLayerASIO& tla() { + return *_tla; } private: - asio::io_context _ctx; - asio::ip::tcp::socket _sock; - asio::ip::tcp::endpoint _endpoint; + std::unique_ptr<transport::TransportLayerASIO> _makeTLA() { + auto options = [] { + ServerGlobalParams params; + params.noUnixSocket = true; + transport::TransportLayerASIO::Options opts(¶ms); + // TODO SERVER-30212 should clean this up and assign a port from the supplied port range + // provided by resmoke. + opts.port = 0; + return opts; + }(); + auto tla = std::make_unique<transport::TransportLayerASIO>(options, &_sep); + ASSERT_OK(tla->setup()); + ASSERT_OK(tla->start()); + return tla; + } + + MockSEP _sep; + std::unique_ptr<transport::TransportLayerASIO> _tla; }; -std::unique_ptr<transport::TransportLayerASIO> makeAndStartTL(ServiceEntryPoint* sep) { - auto options = [] { - ServerGlobalParams params; - params.noUnixSocket = true; - transport::TransportLayerASIO::Options opts(¶ms); - opts.port = 0; - return opts; - }(); +TEST(TransportLayerASIO, ListenerPortZeroTreatedAsEphemeral) { + Notification<bool> connected; + TestFixture tf; + tf.sep().setOnStartSession([&](auto&&) { connected.set(true); }); - auto tla = std::make_unique<transport::TransportLayerASIO>(options, sep); - ASSERT_OK(tla->setup()); - ASSERT_OK(tla->start()); + int port = tf.tla().listenerPort(); + ASSERT_GT(port, 0); + LOGV2(6109514, "TransportLayerASIO listening", "port"_attr = port); - return tla; + ConnectionThread connectThread(port); + connected.get(); } /* check that timeouts actually time out */ TEST(TransportLayerASIO, SourceSyncTimeoutTimesOut) { - TimeoutSyncSEP sep(TimeoutSyncSEP::kShouldTimeout); - auto tla = makeAndStartTL(&sep); - - TimeoutConnector connector(tla->listenerPort(), false); - - sep.waitForTimeout(); - tla->shutdown(); + TestFixture tf; + Notification<StatusWith<Message>> received; + tf.sep().setOnStartSession([&](SessionThread& st) { + st.session().setTimeout(Milliseconds{500}); + st.schedule([&](auto& session) { received.set(session.sourceMessage()); }); + }); + SyncClient conn(tf.tla().listenerPort()); + ASSERT_EQ(received.get().getStatus(), ErrorCodes::NetworkTimeout); } /* check that timeouts don't time out unless there's an actual timeout */ TEST(TransportLayerASIO, SourceSyncTimeoutSucceeds) { - TimeoutSyncSEP sep(TimeoutSyncSEP::kNoTimeout); - auto tla = makeAndStartTL(&sep); - - TimeoutConnector connector(tla->listenerPort(), true); - - sep.waitForTimeout(); - tla->shutdown(); + TestFixture tf; + Notification<StatusWith<Message>> received; + tf.sep().setOnStartSession([&](SessionThread& st) { + st.session().setTimeout(Milliseconds{500}); + st.schedule([&](auto& session) { received.set(session.sourceMessage()); }); + }); + SyncClient conn(tf.tla().listenerPort()); + ping(conn); // This time we send a message + ASSERT_OK(received.get().getStatus()); } -/* check that switching from timeouts to no timeouts correctly resets the timeout to unlimited */ -class TimeoutSwitchModesSEP : public TimeoutSEP { -public: - void startSession(transport::SessionHandle session) override { - LOGV2(23044, - "Accepted connection from {session_remote}", - "session_remote"_attr = session->remote()); - startWorkerThread([this, session = std::move(session)]() mutable { - LOGV2(23045, "waiting for message"); - auto sourceMessage = [&] { return session->sourceMessage().getStatus(); }; - - // the first message we source should time out. - session->setTimeout(Milliseconds{500}); - ASSERT_EQ(sourceMessage(), ErrorCodes::NetworkTimeout); - notifyComplete(); - - LOGV2(23046, "timed out successfully"); - - // get the session back in a known state with the timeout still in place - ASSERT_OK(sourceMessage()); - notifyComplete(); - - LOGV2(23047, "waiting for message without a timeout"); - - // this should block and timeout the waitForComplete mutex, and the session should wait - // for a while to make sure this isn't timing out and then send a message to unblock - // the this call to recv - session->setTimeout(boost::none); - ASSERT_OK(sourceMessage()); - - session.reset(); - notifyComplete(); - LOGV2(23048, "ending test"); +/** Switching from timeouts to no timeouts must reset the timeout to unlimited. */ +TEST(TransportLayerASIO, SwitchTimeoutModes) { + TestFixture tf; + Notification<SessionThread*> mockSessionCreated; + tf.sep().setOnStartSession([&](SessionThread& st) { mockSessionCreated.set(&st); }); + + SyncClient conn(tf.tla().listenerPort()); + auto& st = *mockSessionCreated.get(); + + { + LOGV2(6109525, "The first message we source should time out"); + Notification<StatusWith<Message>> done; + st.schedule([&](auto& session) { + session.setTimeout(Milliseconds{500}); + done.set(session.sourceMessage()); }); + ASSERT_EQ(done.get().getStatus(), ErrorCodes::NetworkTimeout); + LOGV2(6109526, "timed out successfully"); + } + { + LOGV2(6109527, "Verify a message can be successfully received"); + Notification<StatusWith<Message>> done; + st.schedule([&](auto& session) { done.set(session.sourceMessage()); }); + ping(conn); + ASSERT_OK(done.get().getStatus()); + } + { + LOGV2(6109528, "Clear the timeout and verify reception of a late message."); + Notification<StatusWith<Message>> done; + st.schedule([&](auto& session) { + LOGV2(6109529, "waiting for message without a timeout"); + session.setTimeout({}); + done.set(session.sourceMessage()); + }); + sleepFor(Seconds{1}); + ping(conn); + ASSERT_OK(done.get().getStatus()); } -}; - -TEST(TransportLayerASIO, SwitchTimeoutModes) { - TimeoutSwitchModesSEP sep; - auto tla = makeAndStartTL(&sep); - - TimeoutConnector connector(tla->listenerPort(), false); - - ASSERT_TRUE(sep.waitForTimeout()); - - connector.sendMessage(); - ASSERT_TRUE(sep.waitForTimeout()); - - ASSERT_FALSE(sep.waitForTimeout(Milliseconds{1000})); - connector.sendMessage(); - ASSERT_TRUE(sep.waitForTimeout()); - - tla->shutdown(); } } // namespace diff --git a/src/mongo/util/net/sock.cpp b/src/mongo/util/net/sock.cpp index 206e0b65e22..89b7b89c67b 100644 --- a/src/mongo/util/net/sock.cpp +++ b/src/mongo/util/net/sock.cpp @@ -297,13 +297,13 @@ SSLPeerInfo Socket::doSSLHandshake(const char* firstBytes, int len) { #endif -bool Socket::connect(SockAddr& remote) { +bool Socket::connect(const SockAddr& remote) { const Milliseconds connectTimeoutMillis(static_cast<int64_t>( _timeout > 0 ? std::min(kMaxConnectTimeoutMS, (_timeout * 1000)) : kMaxConnectTimeoutMS)); return connect(remote, connectTimeoutMillis); } -bool Socket::connect(SockAddr& remote, Milliseconds connectTimeoutMillis) { +bool Socket::connect(const SockAddr& remote, Milliseconds connectTimeoutMillis) { _remote = remote; _fd = ::socket(remote.getType(), SOCK_STREAM, 0); diff --git a/src/mongo/util/net/sock.h b/src/mongo/util/net/sock.h index acb059bd5b3..b0b6de3f69b 100644 --- a/src/mongo/util/net/sock.h +++ b/src/mongo/util/net/sock.h @@ -109,12 +109,12 @@ public: * an error, or due to a timeout on connection, or due to the system socket deciding the * socket is invalid. */ - bool connect(SockAddr& remote, Milliseconds connectTimeoutMillis); + bool connect(const SockAddr& remote, Milliseconds connectTimeoutMillis); /** * Connect using a default connect timeout of min(_timeout * 1000, kMaxConnectTimeoutMS) */ - bool connect(SockAddr& remote); + bool connect(const SockAddr& remote); void close(); void send(const char* data, int len, const char* context); |