summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBilly Donahue <billy.donahue@mongodb.com>2021-11-05 03:32:05 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-11-05 04:11:50 +0000
commit4dce840273127d4b2c86f8c790e081f19e44887a (patch)
tree5d0c9b875bbfdaaa3332b7305a53ef6245a50898
parent32064f88ed311247af2b707a3560e57543aebcc8 (diff)
downloadmongo-4dce840273127d4b2c86f8c790e081f19e44887a.tar.gz
SERVER-61095 improvements to transport_layer_asio_test (again)
-rw-r--r--src/mongo/transport/transport_layer_asio.h9
-rw-r--r--src/mongo/transport/transport_layer_asio_test.cpp553
-rw-r--r--src/mongo/util/net/sock.cpp4
-rw-r--r--src/mongo/util/net/sock.h4
4 files changed, 286 insertions, 284 deletions
diff --git a/src/mongo/transport/transport_layer_asio.h b/src/mongo/transport/transport_layer_asio.h
index 665d0411167..8c48b3f6ec7 100644
--- a/src/mongo/transport/transport_layer_asio.h
+++ b/src/mongo/transport/transport_layer_asio.h
@@ -68,13 +68,14 @@ class ServiceEntryPoint;
namespace transport {
-// This fail point simulates reads and writes that always return 1 byte and fail with EAGAIN
+// Simulates reads and writes that always return 1 byte and fail with EAGAIN
extern FailPoint transportLayerASIOshortOpportunisticReadWrite;
-// This fail point will cause an asyncConnect to timeout after it's successfully connected
-// to the remote peer
+// Cause an asyncConnect to timeout after it's successfully connected to the remote peer
extern FailPoint transportLayerASIOasyncConnectTimesOut;
+extern FailPoint transportLayerASIOhangBeforeAccept;
+
/**
* A TransportLayer implementation based on ASIO networking primitives.
*/
@@ -115,7 +116,7 @@ public:
ServiceEntryPoint* sep,
const WireSpec& wireSpec = WireSpec::instance());
- virtual ~TransportLayerASIO();
+ ~TransportLayerASIO() override;
StatusWith<SessionHandle> connect(HostAndPort peer,
ConnectSSLMode sslMode,
diff --git a/src/mongo/transport/transport_layer_asio_test.cpp b/src/mongo/transport/transport_layer_asio_test.cpp
index 7bfc2e3f5c4..740c6cbbd36 100644
--- a/src/mongo/transport/transport_layer_asio_test.cpp
+++ b/src/mongo/transport/transport_layer_asio_test.cpp
@@ -28,196 +28,180 @@
*/
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
-#include "mongo/platform/basic.h"
-
#include "mongo/transport/transport_layer_asio.h"
+#include <queue>
+#include <system_error>
+#include <utility>
+#include <vector>
+
+#include <asio.hpp>
+
#include "mongo/db/server_options.h"
#include "mongo/logv2/log.h"
+#include "mongo/platform/atomic_word.h"
+#include "mongo/platform/basic.h"
#include "mongo/rpc/op_msg.h"
#include "mongo/transport/service_entry_point.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/assert_util.h"
+#include "mongo/util/concurrency/notification.h"
#include "mongo/util/net/sock.h"
-
-#include "asio.hpp"
+#include "mongo/util/synchronized_value.h"
+#include "mongo/util/time_support.h"
namespace mongo {
namespace {
-class ServiceEntryPointUtil : public ServiceEntryPoint {
+#ifdef _WIN32
+using SetsockoptPtr = char*;
+#else
+using SetsockoptPtr = void*;
+#endif
+
+template <typename T>
+class BlockingQueue {
public:
- void startSession(transport::SessionHandle session) override {
- stdx::unique_lock<Latch> lk(_mutex);
- _sessions.push_back(std::move(session));
- LOGV2(2303201, "started session");
+ void push(T t) {
+ stdx::unique_lock lk(_mu);
+ _q.push(std::move(t));
+ lk.unlock();
_cv.notify_one();
}
- void endAllSessions(transport::Session::TagMask tags) override {
- LOGV2(2303301, "end all sessions");
- std::vector<transport::SessionHandle> old_sessions;
- {
- stdx::unique_lock<Latch> lock(_mutex);
- old_sessions.swap(_sessions);
- }
- old_sessions.clear();
- }
-
- Status start() override {
- return Status::OK();
+ T pop() {
+ stdx::unique_lock lk(_mu);
+ _cv.wait(lk, [&] { return !_q.empty(); });
+ T r = std::move(_q.front());
+ _q.pop();
+ return r;
}
- bool shutdown(Milliseconds timeout) override {
- return true;
- }
+private:
+ mutable Mutex _mu;
+ mutable stdx::condition_variable _cv;
+ std::queue<T> _q;
+};
- void appendStats(BSONObjBuilder*) const override {}
+class ConnectionThread {
+public:
+ explicit ConnectionThread(int port) : ConnectionThread(port, nullptr) {}
+ ConnectionThread(int port, std::function<void(ConnectionThread&)> onConnect)
+ : _port{port}, _onConnect{std::move(onConnect)}, _thr{[this] { _run(); }} {}
- size_t numOpenSessions() const override {
- stdx::unique_lock<Latch> lock(_mutex);
- return _sessions.size();
+ ~ConnectionThread() {
+ LOGV2(6109500, "connection: Tx stop request");
+ _stopRequest.set(true);
+ _thr.join();
+ LOGV2(6109501, "connection: joined");
}
- Future<DbResponse> handleRequest(OperationContext* opCtx,
- const Message& request) noexcept override {
- MONGO_UNREACHABLE;
+ void close() {
+ _s.close();
}
- void setTransportLayer(transport::TransportLayer* tl) {
- _transport = tl;
+ Socket& socket() {
+ return _s;
}
- void waitForConnect() {
- stdx::unique_lock<Latch> lock(_mutex);
- _cv.wait(lock, [&] { return !_sessions.empty(); });
+private:
+ void _run() {
+ _s.connect(SockAddr::create("localhost", _port, AF_INET));
+ LOGV2(6109502, "connected", "port"_attr = _port);
+ if (_onConnect)
+ _onConnect(*this);
+ _stopRequest.get();
+ LOGV2(6109503, "connection: Rx stop request");
}
-private:
- mutable Mutex _mutex = MONGO_MAKE_LATCH("::_mutex");
- stdx::condition_variable _cv;
- std::vector<transport::SessionHandle> _sessions;
- transport::TransportLayer* _transport = nullptr;
+ int _port;
+ std::function<void(ConnectionThread&)> _onConnect;
+ stdx::thread _thr;
+ Socket _s;
+ Notification<bool> _stopRequest;
};
-class SimpleConnectionThread {
+class SyncClient {
public:
- explicit SimpleConnectionThread(int port) : _port(port) {
- _thr = stdx::thread{[&] {
- auto sa = SockAddr::create("localhost", _port, AF_INET);
- _s.connect(sa);
- LOGV2(23034, "connection: port {port}", "port"_attr = _port);
- stdx::unique_lock<Latch> lk(_mutex);
- _cv.wait(lk, [&] { return _stop; });
- LOGV2(23035, "connection: Rx stop request");
- }};
- }
-
- void forceCloseSocket() {
- // Setting linger on to a zero-value timeout causes the socket to send an RST packet to the
- // recipient side when closing the connection.
- struct linger sl = {1, 0};
-#ifdef _WIN32
- char* pval = reinterpret_cast<char*>(&sl);
-#else
- void* pval = &sl;
-#endif
- ASSERT(!setsockopt(_s.rawFD(), SOL_SOCKET, SO_LINGER, pval, sizeof(sl)));
- _s.close();
+ explicit SyncClient(int port) {
+ std::error_code ec;
+ _sock.connect(asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port), ec);
+ ASSERT_EQ(ec, std::error_code());
+ LOGV2(6109504, "sync client connected");
}
- void stop() {
- {
- stdx::unique_lock<Latch> lk(_mutex);
- _stop = true;
- }
- LOGV2(23036, "connection: Tx stop request");
- _cv.notify_one();
- _thr.join();
- LOGV2(23037, "connection: stopped");
+ std::error_code write(const char* buf, size_t bufSize) {
+ std::error_code ec;
+ asio::write(_sock, asio::buffer(buf, bufSize), ec);
+ return ec;
}
private:
- Mutex _mutex = MONGO_MAKE_LATCH("SimpleConnectionThread::_mutex");
- stdx::condition_variable _cv;
- stdx::thread _thr;
- bool _stop = false;
-
- Socket _s;
- int _port;
+ asio::io_context _ctx{};
+ asio::ip::tcp::socket _sock{_ctx};
};
-std::unique_ptr<transport::TransportLayerASIO> makeAndStartTL(ServiceEntryPoint* sep) {
- auto options = [] {
- ServerGlobalParams params;
- params.noUnixSocket = true;
- transport::TransportLayerASIO::Options opts(&params);
-
- // TODO SERVER-30212 should clean this up and assign a port from the supplied port range
- // provided by resmoke.
- opts.port = 0;
- return opts;
- }();
-
- auto tla = std::make_unique<transport::TransportLayerASIO>(options, sep);
- ASSERT_OK(tla->setup());
- ASSERT_OK(tla->start());
-
- return tla;
+void ping(SyncClient& client) {
+ OpMsgBuilder builder;
+ builder.setBody(BSON("ping" << 1));
+ Message msg = builder.finish();
+ msg.header().setResponseToMsgId(0);
+ msg.header().setId(0);
+ OpMsg::appendChecksum(&msg);
+ ASSERT_EQ(client.write(msg.buf(), msg.size()), std::error_code{});
}
-TEST(TransportLayerASIO, PortZeroConnect) {
- ServiceEntryPointUtil sepu;
- auto tla = makeAndStartTL(&sepu);
-
- int port = tla->listenerPort();
- ASSERT_GT(port, 0);
- LOGV2(23038, "TransportLayerASIO.listenerPort() is {port}", "port"_attr = port);
+struct SessionThread {
+ struct StopException {};
- SimpleConnectionThread connect_thread(port);
- sepu.waitForConnect();
+ explicit SessionThread(std::shared_ptr<transport::Session> s)
+ : _session{std::move(s)}, _thread{[this] { run(); }} {}
- ASSERT_EQ(sepu.numOpenSessions(), 1);
- connect_thread.stop();
- sepu.endAllSessions({});
- tla->shutdown();
-}
+ ~SessionThread() {
+ _join();
+ }
-TEST(TransportLayerASIO, TCPResetAfterConnectionIsSilentlySwallowed) {
- ServiceEntryPointUtil sepu;
- auto tla = makeAndStartTL(&sepu);
+ void schedule(std::function<void(transport::Session&)> task) {
+ _tasks.push(std::move(task));
+ }
- auto hangBeforeAcceptFp = globalFailPointRegistry().find("transportLayerASIOhangBeforeAccept");
- auto timesEntered = hangBeforeAcceptFp->setMode(FailPoint::alwaysOn);
+ void run() {
+ while (true) {
+ try {
+ LOGV2(6109508, "SessionThread: pop and execute a task");
+ _tasks.pop()(*_session);
+ } catch (const StopException&) {
+ LOGV2(6109509, "SessionThread: stopping");
+ return;
+ }
+ }
+ }
- SimpleConnectionThread connect_thread(tla->listenerPort());
- hangBeforeAcceptFp->waitForTimesEntered(timesEntered + 1);
+ transport::Session& session() const {
+ return *_session;
+ }
- connect_thread.forceCloseSocket();
- hangBeforeAcceptFp->setMode(FailPoint::off);
+private:
+ void _join() {
+ if (!_thread.joinable())
+ return;
+ schedule([](auto&&) { throw StopException{}; });
+ _thread.join();
+ }
- ASSERT_EQ(sepu.numOpenSessions(), 0);
- connect_thread.stop();
- tla->shutdown();
-}
+ std::shared_ptr<transport::Session> _session;
+ stdx::thread _thread;
+ BlockingQueue<std::function<void(transport::Session&)>> _tasks;
+};
-class TimeoutSEP : public ServiceEntryPoint {
+class MockSEP : public ServiceEntryPoint {
public:
- ~TimeoutSEP() override {
- // This should shutdown immediately, so give the maximum timeout
- shutdown(Milliseconds::max());
- }
+ MockSEP() = default;
+ explicit MockSEP(std::function<void(SessionThread&)> onStartSession)
+ : _onStartSession(std::move(onStartSession)) {}
- void endAllSessions(transport::Session::TagMask tags) override {
- MONGO_UNREACHABLE;
- }
-
- bool shutdown(Milliseconds timeout) override {
- LOGV2(23039, "Joining all worker threads");
- for (auto& thread : _workerThreads) {
- thread.join();
- }
- return true;
+ ~MockSEP() override {
+ _join();
}
Status start() override {
@@ -226,182 +210,199 @@ public:
void appendStats(BSONObjBuilder*) const override {}
- size_t numOpenSessions() const override {
- return 0;
- }
-
Future<DbResponse> handleRequest(OperationContext* opCtx,
const Message& request) noexcept override {
MONGO_UNREACHABLE;
}
- bool waitForTimeout(boost::optional<Milliseconds> timeout = boost::none) {
- stdx::unique_lock<Latch> lk(_mutex);
- bool ret = true;
- if (timeout) {
- ret = _cond.wait_for(lk, timeout->toSystemDuration(), [this] { return _finished; });
- } else {
- _cond.wait(lk, [this] { return _finished; });
- }
-
- _finished = false;
- return ret;
+ void startSession(std::shared_ptr<transport::Session> session) override {
+ LOGV2(6109510, "Accepted connection", "remote"_attr = session->remote());
+ auto& newSession = [&]() -> SessionThread& {
+ auto vec = *_sessions;
+ vec->push_back(std::make_unique<SessionThread>(std::move(session)));
+ return *vec->back();
+ }();
+ if (_onStartSession)
+ _onStartSession(newSession);
+ LOGV2(6109511, "started session");
}
-protected:
- void notifyComplete() {
- stdx::unique_lock<Latch> lk(_mutex);
- _finished = true;
- _cond.notify_one();
+ void endAllSessions(transport::Session::TagMask tags) override {
+ _join();
}
- template <typename FunT>
- void startWorkerThread(FunT&& fun) {
- _workerThreads.emplace_back(std::forward<FunT>(fun));
+ bool shutdown(Milliseconds timeout) override {
+ _join();
+ return true;
}
-private:
- Mutex _mutex = MONGO_MAKE_LATCH("TimeoutSEP::_mutex");
-
- stdx::condition_variable _cond;
- bool _finished = false;
-
- std::vector<stdx::thread> _workerThreads;
-};
-
-class TimeoutSyncSEP : public TimeoutSEP {
-public:
- enum Mode { kShouldTimeout, kNoTimeout };
- TimeoutSyncSEP(Mode mode) : _mode(mode) {}
-
- void startSession(transport::SessionHandle session) override {
- LOGV2(23040, "Accepted connection", "remote"_attr = session->remote());
- startWorkerThread([this, session = std::move(session)]() mutable {
- LOGV2(23041, "waiting for message");
- session->setTimeout(Milliseconds{500});
- auto status = session->sourceMessage().getStatus();
- if (_mode == kShouldTimeout) {
- ASSERT_EQ(status, ErrorCodes::NetworkTimeout);
- LOGV2(23042, "message timed out");
- } else {
- ASSERT_OK(status);
- LOGV2(23043, "message received okay");
- }
+ size_t numOpenSessions() const override {
+ return _sessions->size();
+ }
- session.reset();
- notifyComplete();
- });
+ void setOnStartSession(std::function<void(SessionThread&)> cb) {
+ _onStartSession = std::move(cb);
}
private:
- Mode _mode;
+ void _join() {
+ LOGV2(6109513, "Joining all session threads");
+ _sessions->clear();
+ }
+
+ std::function<void(SessionThread&)> _onStartSession;
+ synchronized_value<std::vector<std::unique_ptr<SessionThread>>> _sessions;
};
-class TimeoutConnector {
+/**
+ * Properly setting up and tearing down the MockSEP and TransportLayerASIO is
+ * tricky. Most tests can delegate the details to this TestFixture.
+ */
+class TestFixture {
public:
- TimeoutConnector(int port, bool sendRequest)
- : _ctx(), _sock(_ctx), _endpoint(asio::ip::address_v4::loopback(), port) {
- std::error_code ec;
- _sock.connect(_endpoint, ec);
- ASSERT_EQ(ec, std::error_code());
+ TestFixture() : _tla{_makeTLA()} {}
- if (sendRequest) {
- sendMessage();
- }
+ ~TestFixture() {
+ _sep.endAllSessions({});
+ _tla->shutdown();
}
- void sendMessage() {
- OpMsgBuilder builder;
- builder.setBody(BSON("ping" << 1));
- Message msg = builder.finish();
- msg.header().setResponseToMsgId(0);
- msg.header().setId(0);
- OpMsg::appendChecksum(&msg);
+ MockSEP& sep() {
+ return _sep;
+ }
- std::error_code ec;
- asio::write(_sock, asio::buffer(msg.buf(), msg.size()), ec);
- ASSERT_FALSE(ec);
+ transport::TransportLayerASIO& tla() {
+ return *_tla;
}
private:
- asio::io_context _ctx;
- asio::ip::tcp::socket _sock;
- asio::ip::tcp::endpoint _endpoint;
+ std::unique_ptr<transport::TransportLayerASIO> _makeTLA() {
+ auto options = [] {
+ ServerGlobalParams params;
+ params.noUnixSocket = true;
+ transport::TransportLayerASIO::Options opts(&params);
+ // TODO SERVER-30212 should clean this up and assign a port from the supplied port range
+ // provided by resmoke.
+ opts.port = 0;
+ return opts;
+ }();
+ auto tla = std::make_unique<transport::TransportLayerASIO>(options, &_sep);
+ ASSERT_OK(tla->setup());
+ ASSERT_OK(tla->start());
+ return tla;
+ }
+
+ MockSEP _sep;
+ std::unique_ptr<transport::TransportLayerASIO> _tla;
};
-/* check that timeouts actually time out */
-TEST(TransportLayerASIO, SourceSyncTimeoutTimesOut) {
- TimeoutSyncSEP sep(TimeoutSyncSEP::kShouldTimeout);
- auto tla = makeAndStartTL(&sep);
+TEST(TransportLayerASIO, ListenerPortZeroTreatedAsEphemeral) {
+ Notification<bool> connected;
+ TestFixture tf;
+ tf.sep().setOnStartSession([&](auto&&) { connected.set(true); });
- TimeoutConnector connector(tla->listenerPort(), false);
+ int port = tf.tla().listenerPort();
+ ASSERT_GT(port, 0);
+ LOGV2(6109514, "TransportLayerASIO listening", "port"_attr = port);
- sep.waitForTimeout();
- tla->shutdown();
+ ConnectionThread connectThread(port);
+ connected.get();
}
-/* check that timeouts don't time out unless there's an actual timeout */
-TEST(TransportLayerASIO, SourceSyncTimeoutSucceeds) {
- TimeoutSyncSEP sep(TimeoutSyncSEP::kNoTimeout);
- auto tla = makeAndStartTL(&sep);
-
- TimeoutConnector connector(tla->listenerPort(), true);
+TEST(TransportLayerASIO, TCPResetAfterConnectionIsSilentlySwallowed) {
- sep.waitForTimeout();
- tla->shutdown();
-}
+ TestFixture tf;
-/* check that switching from timeouts to no timeouts correctly resets the timeout to unlimited */
-class TimeoutSwitchModesSEP : public TimeoutSEP {
-public:
- void startSession(transport::SessionHandle session) override {
- LOGV2(23044, "Accepted connection", "remote"_attr = session->remote());
- startWorkerThread([this, session = std::move(session)]() mutable {
- LOGV2(23045, "waiting for message");
- auto sourceMessage = [&] { return session->sourceMessage().getStatus(); };
-
- // the first message we source should time out.
- session->setTimeout(Milliseconds{500});
- ASSERT_EQ(sourceMessage(), ErrorCodes::NetworkTimeout);
- notifyComplete();
-
- LOGV2(23046, "timed out successfully");
-
- // get the session back in a known state with the timeout still in place
- ASSERT_OK(sourceMessage());
- notifyComplete();
-
- LOGV2(23047, "waiting for message without a timeout");
-
- // this should block and timeout the waitForComplete mutex, and the session should wait
- // for a while to make sure this isn't timing out and then send a message to unblock
- // the this call to recv
- session->setTimeout(boost::none);
- ASSERT_OK(sourceMessage());
-
- session.reset();
- notifyComplete();
- LOGV2(23048, "ending test");
- });
- }
-};
+ AtomicWord<int> sessionsCreated{0};
+ tf.sep().setOnStartSession([&](auto&&) { sessionsCreated.fetchAndAdd(1); });
-TEST(TransportLayerASIO, SwitchTimeoutModes) {
- TimeoutSwitchModesSEP sep;
- auto tla = makeAndStartTL(&sep);
+ LOGV2(6109515, "connecting");
+ auto& fp = transport::transportLayerASIOhangBeforeAccept;
+ auto timesEntered = fp.setMode(FailPoint::alwaysOn);
+ ConnectionThread connectThread(tf.tla().listenerPort(), [&](ConnectionThread& conn) {
+ // Linger timeout = 0 causes a RST packet on close.
+ struct linger sl = {1, 0};
+ if (setsockopt(conn.socket().rawFD(),
+ SOL_SOCKET,
+ SO_LINGER,
+ reinterpret_cast<SetsockoptPtr>(&sl),
+ sizeof(sl)) != 0) {
+ auto err = make_error_code(std::errc{errno});
+ LOGV2_ERROR(6109517, "setsockopt", "error"_attr = err.message());
+ }
+ });
+ fp.waitForTimesEntered(timesEntered + 1);
- TimeoutConnector connector(tla->listenerPort(), false);
+ LOGV2(6109516, "closing");
+ connectThread.close();
+ fp.setMode(FailPoint::off);
- ASSERT_TRUE(sep.waitForTimeout());
+ ASSERT_EQ(sessionsCreated.load(), 0);
+}
- connector.sendMessage();
- ASSERT_TRUE(sep.waitForTimeout());
+/* check that timeouts actually time out */
+TEST(TransportLayerASIO, SourceSyncTimeoutTimesOut) {
+ TestFixture tf;
+ Notification<StatusWith<Message>> received;
+ tf.sep().setOnStartSession([&](SessionThread& st) {
+ st.session().setTimeout(Milliseconds{500});
+ st.schedule([&](auto& session) { received.set(session.sourceMessage()); });
+ });
+ SyncClient conn(tf.tla().listenerPort());
+ ASSERT_EQ(received.get().getStatus(), ErrorCodes::NetworkTimeout);
+}
- ASSERT_FALSE(sep.waitForTimeout(Milliseconds{1000}));
- connector.sendMessage();
- ASSERT_TRUE(sep.waitForTimeout());
+/* check that timeouts don't time out unless there's an actual timeout */
+TEST(TransportLayerASIO, SourceSyncTimeoutSucceeds) {
+ TestFixture tf;
+ Notification<StatusWith<Message>> received;
+ tf.sep().setOnStartSession([&](SessionThread& st) {
+ st.session().setTimeout(Milliseconds{500});
+ st.schedule([&](auto& session) { received.set(session.sourceMessage()); });
+ });
+ SyncClient conn(tf.tla().listenerPort());
+ ping(conn); // This time we send a message
+ ASSERT_OK(received.get().getStatus());
+}
- tla->shutdown();
+/** Switching from timeouts to no timeouts must reset the timeout to unlimited. */
+TEST(TransportLayerASIO, SwitchTimeoutModes) {
+ TestFixture tf;
+ Notification<SessionThread*> mockSessionCreated;
+ tf.sep().setOnStartSession([&](SessionThread& st) { mockSessionCreated.set(&st); });
+
+ SyncClient conn(tf.tla().listenerPort());
+ auto& st = *mockSessionCreated.get();
+
+ {
+ LOGV2(6109525, "The first message we source should time out");
+ Notification<StatusWith<Message>> done;
+ st.schedule([&](auto& session) {
+ session.setTimeout(Milliseconds{500});
+ done.set(session.sourceMessage());
+ });
+ ASSERT_EQ(done.get().getStatus(), ErrorCodes::NetworkTimeout);
+ LOGV2(6109526, "timed out successfully");
+ }
+ {
+ LOGV2(6109527, "Verify a message can be successfully received");
+ Notification<StatusWith<Message>> done;
+ st.schedule([&](auto& session) { done.set(session.sourceMessage()); });
+ ping(conn);
+ ASSERT_OK(done.get().getStatus());
+ }
+ {
+ LOGV2(6109528, "Clear the timeout and verify reception of a late message.");
+ Notification<StatusWith<Message>> done;
+ st.schedule([&](auto& session) {
+ LOGV2(6109529, "waiting for message without a timeout");
+ session.setTimeout({});
+ done.set(session.sourceMessage());
+ });
+ sleepFor(Seconds{1});
+ ping(conn);
+ ASSERT_OK(done.get().getStatus());
+ }
}
} // namespace
diff --git a/src/mongo/util/net/sock.cpp b/src/mongo/util/net/sock.cpp
index 35776fe4c73..3a42470524a 100644
--- a/src/mongo/util/net/sock.cpp
+++ b/src/mongo/util/net/sock.cpp
@@ -299,13 +299,13 @@ SSLPeerInfo Socket::doSSLHandshake(const char* firstBytes, int len) {
#endif
-bool Socket::connect(SockAddr& remote) {
+bool Socket::connect(const SockAddr& remote) {
const Milliseconds connectTimeoutMillis(static_cast<int64_t>(
_timeout > 0 ? std::min(kMaxConnectTimeoutMS, (_timeout * 1000)) : kMaxConnectTimeoutMS));
return connect(remote, connectTimeoutMillis);
}
-bool Socket::connect(SockAddr& remote, Milliseconds connectTimeoutMillis) {
+bool Socket::connect(const SockAddr& remote, Milliseconds connectTimeoutMillis) {
_remote = remote;
_fd = ::socket(remote.getType(), SOCK_STREAM, 0);
diff --git a/src/mongo/util/net/sock.h b/src/mongo/util/net/sock.h
index 2548a46804c..7257e5e16f6 100644
--- a/src/mongo/util/net/sock.h
+++ b/src/mongo/util/net/sock.h
@@ -109,12 +109,12 @@ public:
* an error, or due to a timeout on connection, or due to the system socket deciding the
* socket is invalid.
*/
- bool connect(SockAddr& remote, Milliseconds connectTimeoutMillis);
+ bool connect(const SockAddr& remote, Milliseconds connectTimeoutMillis);
/**
* Connect using a default connect timeout of min(_timeout * 1000, kMaxConnectTimeoutMS)
*/
- bool connect(SockAddr& remote);
+ bool connect(const SockAddr& remote);
void close();
void send(const char* data, int len, const char* context);