diff options
author | Billy Donahue <billy.donahue@mongodb.com> | 2021-11-02 16:51:18 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-11-02 18:07:22 +0000 |
commit | 385b7a54f0e44807611321dacabda314a5a527b1 (patch) | |
tree | 25281d132331e904f548b6b53f2b73b662498995 | |
parent | 51a20c4844e082e9167e300dbb5f005e875c1053 (diff) | |
download | mongo-385b7a54f0e44807611321dacabda314a5a527b1.tar.gz |
SERVER-61095 improvements to transport_layer_asio_test
-rw-r--r-- | src/mongo/transport/transport_layer_asio.h | 9 | ||||
-rw-r--r-- | src/mongo/transport/transport_layer_asio_test.cpp | 544 | ||||
-rw-r--r-- | src/mongo/util/net/sock.cpp | 4 | ||||
-rw-r--r-- | src/mongo/util/net/sock.h | 4 |
4 files changed, 285 insertions, 276 deletions
diff --git a/src/mongo/transport/transport_layer_asio.h b/src/mongo/transport/transport_layer_asio.h index 665d0411167..8c48b3f6ec7 100644 --- a/src/mongo/transport/transport_layer_asio.h +++ b/src/mongo/transport/transport_layer_asio.h @@ -68,13 +68,14 @@ class ServiceEntryPoint; namespace transport { -// This fail point simulates reads and writes that always return 1 byte and fail with EAGAIN +// Simulates reads and writes that always return 1 byte and fail with EAGAIN extern FailPoint transportLayerASIOshortOpportunisticReadWrite; -// This fail point will cause an asyncConnect to timeout after it's successfully connected -// to the remote peer +// Cause an asyncConnect to timeout after it's successfully connected to the remote peer extern FailPoint transportLayerASIOasyncConnectTimesOut; +extern FailPoint transportLayerASIOhangBeforeAccept; + /** * A TransportLayer implementation based on ASIO networking primitives. */ @@ -115,7 +116,7 @@ public: ServiceEntryPoint* sep, const WireSpec& wireSpec = WireSpec::instance()); - virtual ~TransportLayerASIO(); + ~TransportLayerASIO() override; StatusWith<SessionHandle> connect(HostAndPort peer, ConnectSSLMode sslMode, diff --git a/src/mongo/transport/transport_layer_asio_test.cpp b/src/mongo/transport/transport_layer_asio_test.cpp index 7bfc2e3f5c4..df3124ca5ee 100644 --- a/src/mongo/transport/transport_layer_asio_test.cpp +++ b/src/mongo/transport/transport_layer_asio_test.cpp @@ -28,379 +28,387 @@ */ #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest -#include "mongo/platform/basic.h" - #include "mongo/transport/transport_layer_asio.h" +#include <queue> +#include <utility> +#include <vector> + +#include <asio.hpp> + #include "mongo/db/server_options.h" #include "mongo/logv2/log.h" +#include "mongo/platform/basic.h" #include "mongo/rpc/op_msg.h" #include "mongo/transport/service_entry_point.h" #include "mongo/unittest/unittest.h" #include "mongo/util/assert_util.h" +#include 
"mongo/util/concurrency/notification.h" #include "mongo/util/net/sock.h" - -#include "asio.hpp" +#include "mongo/util/synchronized_value.h" +#include "mongo/util/time_support.h" namespace mongo { namespace { -class ServiceEntryPointUtil : public ServiceEntryPoint { +#ifdef _WIN32 +using SetsockoptPtr = char*; +#else +using SetsockoptPtr = void*; +#endif + +template <typename T> +class BlockingQueue { public: - void startSession(transport::SessionHandle session) override { - stdx::unique_lock<Latch> lk(_mutex); - _sessions.push_back(std::move(session)); - LOGV2(2303201, "started session"); + void push(T t) { + stdx::unique_lock lk(_mu); + _q.push(std::move(t)); + lk.unlock(); _cv.notify_one(); } - void endAllSessions(transport::Session::TagMask tags) override { - LOGV2(2303301, "end all sessions"); - std::vector<transport::SessionHandle> old_sessions; - { - stdx::unique_lock<Latch> lock(_mutex); - old_sessions.swap(_sessions); - } - old_sessions.clear(); - } - - Status start() override { - return Status::OK(); - } - - bool shutdown(Milliseconds timeout) override { - return true; - } - - void appendStats(BSONObjBuilder*) const override {} - - size_t numOpenSessions() const override { - stdx::unique_lock<Latch> lock(_mutex); - return _sessions.size(); - } - - Future<DbResponse> handleRequest(OperationContext* opCtx, - const Message& request) noexcept override { - MONGO_UNREACHABLE; - } - - void setTransportLayer(transport::TransportLayer* tl) { - _transport = tl; - } - - void waitForConnect() { - stdx::unique_lock<Latch> lock(_mutex); - _cv.wait(lock, [&] { return !_sessions.empty(); }); + T pop() { + stdx::unique_lock lk(_mu); + _cv.wait(lk, [&] { return !_q.empty(); }); + T r = std::move(_q.front()); + _q.pop(); + return r; } private: - mutable Mutex _mutex = MONGO_MAKE_LATCH("::_mutex"); - stdx::condition_variable _cv; - std::vector<transport::SessionHandle> _sessions; - transport::TransportLayer* _transport = nullptr; + mutable Mutex _mu; + mutable 
stdx::condition_variable _cv; + std::queue<T> _q; }; -class SimpleConnectionThread { +class ConnectionThread { public: - explicit SimpleConnectionThread(int port) : _port(port) { - _thr = stdx::thread{[&] { - auto sa = SockAddr::create("localhost", _port, AF_INET); - _s.connect(sa); - LOGV2(23034, "connection: port {port}", "port"_attr = _port); - stdx::unique_lock<Latch> lk(_mutex); - _cv.wait(lk, [&] { return _stop; }); - LOGV2(23035, "connection: Rx stop request"); - }}; + explicit ConnectionThread(int port) : _port{port}, _thr{[this] { run(); }} {} + + ~ConnectionThread() { + if (!_stop) + stop(); } - void forceCloseSocket() { - // Setting linger on to a zero-value timeout causes the socket to send an RST packet to the - // recipient side when closing the connection. - struct linger sl = {1, 0}; -#ifdef _WIN32 - char* pval = reinterpret_cast<char*>(&sl); -#else - void* pval = &sl; -#endif - ASSERT(!setsockopt(_s.rawFD(), SOL_SOCKET, SO_LINGER, pval, sizeof(sl))); + void close() { _s.close(); } void stop() { - { - stdx::unique_lock<Latch> lk(_mutex); - _stop = true; - } - LOGV2(23036, "connection: Tx stop request"); - _cv.notify_one(); + LOGV2(6109500, "connection: Tx stop request"); + _stop.set(true); _thr.join(); - LOGV2(23037, "connection: stopped"); + LOGV2(6109501, "connection: joined"); + } + +protected: + Socket& socket() { + return _s; } private: - Mutex _mutex = MONGO_MAKE_LATCH("SimpleConnectionThread::_mutex"); - stdx::condition_variable _cv; - stdx::thread _thr; - bool _stop = false; + virtual void onConnect() {} + + void run() { + _s.connect(SockAddr::create("localhost", _port, AF_INET)); + LOGV2(6109502, "connection: port {port}", "port"_attr = _port); + onConnect(); + _stop.get(); + LOGV2(6109503, "connection: Rx stop request"); + } - Socket _s; int _port; + stdx::thread _thr; + Socket _s; + Notification<bool> _stop; }; -std::unique_ptr<transport::TransportLayerASIO> makeAndStartTL(ServiceEntryPoint* sep) { - auto options = [] { - 
ServerGlobalParams params; - params.noUnixSocket = true; - transport::TransportLayerASIO::Options opts(&params); +class SyncClient { +public: + explicit SyncClient(int port) { + std::error_code ec; + _sock.connect(asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port), ec); + ASSERT_EQ(ec, std::error_code()); + LOGV2(6109504, "sync client connected"); + } - // TODO SERVER-30212 should clean this up and assign a port from the supplied port range - // provided by resmoke. - opts.port = 0; - return opts; - }(); + std::error_code write(const char* buf, size_t bufSize) { + std::error_code ec; + asio::write(_sock, asio::buffer(buf, bufSize), ec); + return ec; + } - auto tla = std::make_unique<transport::TransportLayerASIO>(options, sep); - ASSERT_OK(tla->setup()); - ASSERT_OK(tla->start()); +private: + asio::io_context _ctx{}; + asio::ip::tcp::socket _sock{_ctx}; +}; - return tla; +void ping(SyncClient& client) { + OpMsgBuilder builder; + builder.setBody(BSON("ping" << 1)); + Message msg = builder.finish(); + msg.header().setResponseToMsgId(0); + msg.header().setId(0); + OpMsg::appendChecksum(&msg); + ASSERT_FALSE(client.write(msg.buf(), msg.size())); } -TEST(TransportLayerASIO, PortZeroConnect) { - ServiceEntryPointUtil sepu; - auto tla = makeAndStartTL(&sepu); +class MockSEP : public ServiceEntryPoint { +public: + struct StopException {}; - int port = tla->listenerPort(); - ASSERT_GT(port, 0); - LOGV2(23038, "TransportLayerASIO.listenerPort() is {port}", "port"_attr = port); + struct Session { + using Task = std::function<void(Session&)>; - SimpleConnectionThread connect_thread(port); - sepu.waitForConnect(); + ~Session() { + stop(); + } - ASSERT_EQ(sepu.numOpenSessions(), 1); - connect_thread.stop(); - sepu.endAllSessions({}); - tla->shutdown(); -} + void schedule(Task task) { + LOGV2(6109505, "scheduling task"); + tasks.push(std::move(task)); + } -TEST(TransportLayerASIO, TCPResetAfterConnectionIsSilentlySwallowed) { - ServiceEntryPointUtil sepu; - auto tla = 
makeAndStartTL(&sepu); + void start() { + thread = stdx::thread([this] { run(); }); + } - auto hangBeforeAcceptFp = globalFailPointRegistry().find("transportLayerASIOhangBeforeAccept"); - auto timesEntered = hangBeforeAcceptFp->setMode(FailPoint::alwaysOn); + void stop() { + if (thread.joinable()) { + schedule([](auto&&) { throw StopException{}; }); + thread.join(); + } + } - SimpleConnectionThread connect_thread(tla->listenerPort()); - hangBeforeAcceptFp->waitForTimesEntered(timesEntered + 1); + void run() { + LOGV2(6109506, "doSession"); + while (true) { + LOGV2(6109507, "polling for work"); + try { + LOGV2(6109508, "running a session task"); + tasks.pop()(*this); + } catch (const StopException&) { + LOGV2(6109509, "caught StopException"); + return; + } + } + } - connect_thread.forceCloseSocket(); - hangBeforeAcceptFp->setMode(FailPoint::off); + std::shared_ptr<transport::Session> session; + stdx::thread thread; + BlockingQueue<Task> tasks; + }; + using Task = Session::Task; - ASSERT_EQ(sepu.numOpenSessions(), 0); - connect_thread.stop(); - tla->shutdown(); -} + MockSEP() = default; + explicit MockSEP(std::function<void(Session&)> onStartSession) + : _onStartSession(std::move(onStartSession)) {} -class TimeoutSEP : public ServiceEntryPoint { -public: - ~TimeoutSEP() override { + ~MockSEP() override { + for (auto& s : **_sessions) + s->stop(); // This should shutdown immediately, so give the maximum timeout shutdown(Milliseconds::max()); } - void endAllSessions(transport::Session::TagMask tags) override { - MONGO_UNREACHABLE; - } - - bool shutdown(Milliseconds timeout) override { - LOGV2(23039, "Joining all worker threads"); - for (auto& thread : _workerThreads) { - thread.join(); - } - return true; - } - Status start() override { return Status::OK(); } void appendStats(BSONObjBuilder*) const override {} - size_t numOpenSessions() const override { - return 0; - } - Future<DbResponse> handleRequest(OperationContext* opCtx, const Message& request) noexcept override 
{ MONGO_UNREACHABLE; } - bool waitForTimeout(boost::optional<Milliseconds> timeout = boost::none) { - stdx::unique_lock<Latch> lk(_mutex); - bool ret = true; - if (timeout) { - ret = _cond.wait_for(lk, timeout->toSystemDuration(), [this] { return _finished; }); - } else { - _cond.wait(lk, [this] { return _finished; }); - } - - _finished = false; - return ret; + void startSession(std::shared_ptr<transport::Session> session) override { + LOGV2(6109510, "Accepted connection", "remote"_attr = session->remote()); + Session& newSession = *_sessions->emplace_back(new Session{std::move(session)}); + if (_onStartSession) + _onStartSession(newSession); + LOGV2(6109511, "started session"); } -protected: - void notifyComplete() { - stdx::unique_lock<Latch> lk(_mutex); - _finished = true; - _cond.notify_one(); + void endAllSessions(transport::Session::TagMask tags) override { + LOGV2(6109512, "end all sessions"); + _sessions->clear(); } - template <typename FunT> - void startWorkerThread(FunT&& fun) { - _workerThreads.emplace_back(std::forward<FunT>(fun)); + bool shutdown(Milliseconds timeout) override { + LOGV2(6109513, "Joining all worker threads"); + std::exchange(**_sessions, {}); + return true; } -private: - Mutex _mutex = MONGO_MAKE_LATCH("TimeoutSEP::_mutex"); + size_t numOpenSessions() const override { + return _sessions->size(); + } - stdx::condition_variable _cond; - bool _finished = false; + void mockOnStartSession(std::function<void(Session&)> cb) { + _onStartSession = std::move(cb); + } - std::vector<stdx::thread> _workerThreads; +private: + std::function<void(Session&)> _onStartSession; + synchronized_value<std::vector<std::unique_ptr<Session>>> _sessions; }; -class TimeoutSyncSEP : public TimeoutSEP { -public: - enum Mode { kShouldTimeout, kNoTimeout }; - TimeoutSyncSEP(Mode mode) : _mode(mode) {} - - void startSession(transport::SessionHandle session) override { - LOGV2(23040, "Accepted connection", "remote"_attr = session->remote()); - startWorkerThread([this, 
session = std::move(session)]() mutable { - LOGV2(23041, "waiting for message"); - session->setTimeout(Milliseconds{500}); - auto status = session->sourceMessage().getStatus(); - if (_mode == kShouldTimeout) { - ASSERT_EQ(status, ErrorCodes::NetworkTimeout); - LOGV2(23042, "message timed out"); - } else { - ASSERT_OK(status); - LOGV2(23043, "message received okay"); - } +std::unique_ptr<transport::TransportLayerASIO> makeAndStartTL(ServiceEntryPoint* sep) { + auto options = [] { + ServerGlobalParams params; + params.noUnixSocket = true; + transport::TransportLayerASIO::Options opts(&params); + // TODO SERVER-30212 should clean this up and assign a port from the supplied port range + // provided by resmoke. + opts.port = 0; + return opts; + }(); + auto tla = std::make_unique<transport::TransportLayerASIO>(options, sep); + ASSERT_OK(tla->setup()); + ASSERT_OK(tla->start()); + return tla; +} - session.reset(); - notifyComplete(); - }); - } +TEST(TransportLayerASIO, ListenerPortZeroTreatedAsEphemeral) { + Notification<bool> connected; + MockSEP sep; + sep.mockOnStartSession([&](auto&&) { connected.set(true); }); + auto tla = makeAndStartTL(&sep); -private: - Mode _mode; -}; + int port = tla->listenerPort(); + ASSERT_GT(port, 0); + LOGV2(6109514, "TransportLayerASIO listening", "port"_attr = port); -class TimeoutConnector { -public: - TimeoutConnector(int port, bool sendRequest) - : _ctx(), _sock(_ctx), _endpoint(asio::ip::address_v4::loopback(), port) { - std::error_code ec; - _sock.connect(_endpoint, ec); - ASSERT_EQ(ec, std::error_code()); + ConnectionThread connectThread(port); + connected.get(); - if (sendRequest) { - sendMessage(); + ASSERT_EQ(sep.numOpenSessions(), 1); + connectThread.stop(); + tla->shutdown(); +} + +TEST(TransportLayerASIO, TCPResetAfterConnectionIsSilentlySwallowed) { + class ResettingConnectionThread : public ConnectionThread { + using ConnectionThread::ConnectionThread; + void onConnect() override { + // Linger timeout = 0 causes a RST packet on 
close. + struct linger sl = {1, 0}; + ASSERT(!setsockopt(socket().rawFD(), + SOL_SOCKET, + SO_LINGER, + reinterpret_cast<SetsockoptPtr>(&sl), + sizeof(sl))); } - } + }; - void sendMessage() { - OpMsgBuilder builder; - builder.setBody(BSON("ping" << 1)); - Message msg = builder.finish(); - msg.header().setResponseToMsgId(0); - msg.header().setId(0); - OpMsg::appendChecksum(&msg); + MockSEP sep; + auto tla = makeAndStartTL(&sep); - std::error_code ec; - asio::write(_sock, asio::buffer(msg.buf(), msg.size()), ec); - ASSERT_FALSE(ec); - } + auto& fp = transport::transportLayerASIOhangBeforeAccept; + auto timesEntered = fp.setMode(FailPoint::alwaysOn); -private: - asio::io_context _ctx; - asio::ip::tcp::socket _sock; - asio::ip::tcp::endpoint _endpoint; -}; + LOGV2(6109515, "connecting"); + + ResettingConnectionThread connectThread(tla->listenerPort()); + fp.waitForTimesEntered(timesEntered + 1); + + LOGV2(6109516, "closing"); + connectThread.close(); + fp.setMode(FailPoint::off); + + LOGV2(6109517, "asserting"); + ASSERT_EQ(sep.numOpenSessions(), 0); + LOGV2(6109518, "past assert"); + connectThread.stop(); + tla->shutdown(); +} /* check that timeouts actually time out */ TEST(TransportLayerASIO, SourceSyncTimeoutTimesOut) { - TimeoutSyncSEP sep(TimeoutSyncSEP::kShouldTimeout); + Notification<StatusWith<Message>> received; + MockSEP sep; + sep.mockOnStartSession([&](MockSEP::Session& session) { + LOGV2(6109519, "setting timeout"); + session.session->setTimeout(Milliseconds{500}); + session.start(); + LOGV2(6109520, "waiting for message"); + session.schedule([&](MockSEP::Session& session) { + received.set(session.session->sourceMessage()); + LOGV2(6109521, "message receive op resolved"); + }); + }); auto tla = makeAndStartTL(&sep); + SyncClient conn(tla->listenerPort()); - TimeoutConnector connector(tla->listenerPort(), false); + LOGV2(6109522, "scheduled"); - sep.waitForTimeout(); + ASSERT_EQ(received.get().getStatus(), ErrorCodes::NetworkTimeout); + LOGV2(6109523, 
"received something"); tla->shutdown(); } /* check that timeouts don't time out unless there's an actual timeout */ TEST(TransportLayerASIO, SourceSyncTimeoutSucceeds) { - TimeoutSyncSEP sep(TimeoutSyncSEP::kNoTimeout); + MockSEP sep; + Notification<StatusWith<Message>> received; + sep.mockOnStartSession([&](MockSEP::Session& s) { + s.session->setTimeout(Milliseconds{500}); + s.start(); + s.schedule([&](auto&) { received.set(s.session->sourceMessage()); }); + }); auto tla = makeAndStartTL(&sep); + SyncClient conn(tla->listenerPort()); - TimeoutConnector connector(tla->listenerPort(), true); - - sep.waitForTimeout(); + ping(conn); // This time we send a message + ASSERT_OK(received.get().getStatus()); + LOGV2(6109524, "received something"); tla->shutdown(); } -/* check that switching from timeouts to no timeouts correctly resets the timeout to unlimited */ -class TimeoutSwitchModesSEP : public TimeoutSEP { -public: - void startSession(transport::SessionHandle session) override { - LOGV2(23044, "Accepted connection", "remote"_attr = session->remote()); - startWorkerThread([this, session = std::move(session)]() mutable { - LOGV2(23045, "waiting for message"); - auto sourceMessage = [&] { return session->sourceMessage().getStatus(); }; - - // the first message we source should time out. 
- session->setTimeout(Milliseconds{500}); - ASSERT_EQ(sourceMessage(), ErrorCodes::NetworkTimeout); - notifyComplete(); - - LOGV2(23046, "timed out successfully"); - - // get the session back in a known state with the timeout still in place - ASSERT_OK(sourceMessage()); - notifyComplete(); - - LOGV2(23047, "waiting for message without a timeout"); - - // this should block and timeout the waitForComplete mutex, and the session should wait - // for a while to make sure this isn't timing out and then send a message to unblock - // the this call to recv - session->setTimeout(boost::none); - ASSERT_OK(sourceMessage()); - - session.reset(); - notifyComplete(); - LOGV2(23048, "ending test"); - }); - } -}; - +/** Switching from timeouts to no timeouts must reset the timeout to unlimited. */ TEST(TransportLayerASIO, SwitchTimeoutModes) { - TimeoutSwitchModesSEP sep; + MockSEP sep; + Notification<MockSEP::Session*> mockSessionCreated; + sep.mockOnStartSession([&](MockSEP::Session& s) { + s.start(); + mockSessionCreated.set(&s); + }); auto tla = makeAndStartTL(&sep); - TimeoutConnector connector(tla->listenerPort(), false); - - ASSERT_TRUE(sep.waitForTimeout()); - - connector.sendMessage(); - ASSERT_TRUE(sep.waitForTimeout()); + SyncClient conn(tla->listenerPort()); - ASSERT_FALSE(sep.waitForTimeout(Milliseconds{1000})); - connector.sendMessage(); - ASSERT_TRUE(sep.waitForTimeout()); + auto& session = *mockSessionCreated.get(); + { + LOGV2(6109525, "The first message we source should time out"); + Notification<StatusWith<Message>> done; + session.schedule([&](const auto&) { + session.session->setTimeout(Milliseconds{500}); + done.set(session.session->sourceMessage()); + }); + ASSERT_EQ(done.get().getStatus(), ErrorCodes::NetworkTimeout); + LOGV2(6109526, "timed out successfully"); + } + { + LOGV2(6109527, "Verify a message can be successfully received"); + Notification<StatusWith<Message>> done; + session.schedule([&](const auto&) { done.set(session.session->sourceMessage()); 
}); + ping(conn); + ASSERT_OK(done.get().getStatus()); + } + { + LOGV2(6109528, "Clear the timeout and verify reception of a late message."); + Notification<StatusWith<Message>> done; + session.schedule([&](const auto&) { + LOGV2(6109529, "waiting for message without a timeout"); + session.session->setTimeout({}); + done.set(session.session->sourceMessage()); + }); + sleepFor(Seconds{1}); + ping(conn); + ASSERT_OK(done.get().getStatus()); + } tla->shutdown(); } diff --git a/src/mongo/util/net/sock.cpp b/src/mongo/util/net/sock.cpp index 35776fe4c73..3a42470524a 100644 --- a/src/mongo/util/net/sock.cpp +++ b/src/mongo/util/net/sock.cpp @@ -299,13 +299,13 @@ SSLPeerInfo Socket::doSSLHandshake(const char* firstBytes, int len) { #endif -bool Socket::connect(SockAddr& remote) { +bool Socket::connect(const SockAddr& remote) { const Milliseconds connectTimeoutMillis(static_cast<int64_t>( _timeout > 0 ? std::min(kMaxConnectTimeoutMS, (_timeout * 1000)) : kMaxConnectTimeoutMS)); return connect(remote, connectTimeoutMillis); } -bool Socket::connect(SockAddr& remote, Milliseconds connectTimeoutMillis) { +bool Socket::connect(const SockAddr& remote, Milliseconds connectTimeoutMillis) { _remote = remote; _fd = ::socket(remote.getType(), SOCK_STREAM, 0); diff --git a/src/mongo/util/net/sock.h b/src/mongo/util/net/sock.h index 2548a46804c..7257e5e16f6 100644 --- a/src/mongo/util/net/sock.h +++ b/src/mongo/util/net/sock.h @@ -109,12 +109,12 @@ public: * an error, or due to a timeout on connection, or due to the system socket deciding the * socket is invalid. */ - bool connect(SockAddr& remote, Milliseconds connectTimeoutMillis); + bool connect(const SockAddr& remote, Milliseconds connectTimeoutMillis); /** * Connect using a default connect timeout of min(_timeout * 1000, kMaxConnectTimeoutMS) */ - bool connect(SockAddr& remote); + bool connect(const SockAddr& remote); void close(); void send(const char* data, int len, const char* context); |