diff options
author | auto-revert-processor <dev-prod-dag@mongodb.com> | 2021-11-03 05:54:08 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-11-03 06:33:42 +0000 |
commit | e176e92d0d3c97154bedde797a2d791e6f3e9ee6 (patch) | |
tree | 9ddfe53e8a77d050fa5a8d5954aef843ce94722f | |
parent | 84c2c6eb8a73edb7daa1e253bd3abc4764fda1d0 (diff) | |
download | mongo-e176e92d0d3c97154bedde797a2d791e6f3e9ee6.tar.gz |
Revert "SERVER-61095 improvements to transport_layer_asio_test"
This reverts commit 385b7a54f0e44807611321dacabda314a5a527b1.
-rw-r--r-- | src/mongo/transport/transport_layer_asio.h | 9 | ||||
-rw-r--r-- | src/mongo/transport/transport_layer_asio_test.cpp | 544 | ||||
-rw-r--r-- | src/mongo/util/net/sock.cpp | 4 | ||||
-rw-r--r-- | src/mongo/util/net/sock.h | 4 |
4 files changed, 276 insertions, 285 deletions
diff --git a/src/mongo/transport/transport_layer_asio.h b/src/mongo/transport/transport_layer_asio.h index 8c48b3f6ec7..665d0411167 100644 --- a/src/mongo/transport/transport_layer_asio.h +++ b/src/mongo/transport/transport_layer_asio.h @@ -68,14 +68,13 @@ class ServiceEntryPoint; namespace transport { -// Simulates reads and writes that always return 1 byte and fail with EAGAIN +// This fail point simulates reads and writes that always return 1 byte and fail with EAGAIN extern FailPoint transportLayerASIOshortOpportunisticReadWrite; -// Cause an asyncConnect to timeout after it's successfully connected to the remote peer +// This fail point will cause an asyncConnect to timeout after it's successfully connected +// to the remote peer extern FailPoint transportLayerASIOasyncConnectTimesOut; -extern FailPoint transportLayerASIOhangBeforeAccept; - /** * A TransportLayer implementation based on ASIO networking primitives. */ @@ -116,7 +115,7 @@ public: ServiceEntryPoint* sep, const WireSpec& wireSpec = WireSpec::instance()); - ~TransportLayerASIO() override; + virtual ~TransportLayerASIO(); StatusWith<SessionHandle> connect(HostAndPort peer, ConnectSSLMode sslMode, diff --git a/src/mongo/transport/transport_layer_asio_test.cpp b/src/mongo/transport/transport_layer_asio_test.cpp index df3124ca5ee..7bfc2e3f5c4 100644 --- a/src/mongo/transport/transport_layer_asio_test.cpp +++ b/src/mongo/transport/transport_layer_asio_test.cpp @@ -28,387 +28,379 @@ */ #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest -#include "mongo/transport/transport_layer_asio.h" - -#include <queue> -#include <utility> -#include <vector> +#include "mongo/platform/basic.h" -#include <asio.hpp> +#include "mongo/transport/transport_layer_asio.h" #include "mongo/db/server_options.h" #include "mongo/logv2/log.h" -#include "mongo/platform/basic.h" #include "mongo/rpc/op_msg.h" #include "mongo/transport/service_entry_point.h" #include "mongo/unittest/unittest.h" #include "mongo/util/assert_util.h" -#include "mongo/util/concurrency/notification.h" #include "mongo/util/net/sock.h" -#include "mongo/util/synchronized_value.h" -#include "mongo/util/time_support.h" + +#include "asio.hpp" namespace mongo { namespace { -#ifdef _WIN32 -using SetsockoptPtr = char*; -#else -using SetsockoptPtr = void*; -#endif - -template <typename T> -class BlockingQueue { +class ServiceEntryPointUtil : public ServiceEntryPoint { public: - void push(T t) { - stdx::unique_lock lk(_mu); - _q.push(std::move(t)); - lk.unlock(); + void startSession(transport::SessionHandle session) override { + stdx::unique_lock<Latch> lk(_mutex); + _sessions.push_back(std::move(session)); + LOGV2(2303201, "started session"); _cv.notify_one(); } - T pop() { - stdx::unique_lock lk(_mu); - _cv.wait(lk, [&] { return !_q.empty(); }); - T r = std::move(_q.front()); - _q.pop(); - return r; + void endAllSessions(transport::Session::TagMask tags) override { + LOGV2(2303301, "end all sessions"); + std::vector<transport::SessionHandle> old_sessions; + { + stdx::unique_lock<Latch> lock(_mutex); + old_sessions.swap(_sessions); + } + old_sessions.clear(); } -private: - mutable Mutex _mu; - mutable stdx::condition_variable _cv; - std::queue<T> _q; -}; - -class ConnectionThread { -public: - explicit ConnectionThread(int port) : _port{port}, _thr{[this] { run(); }} {} + Status start() override { + return Status::OK(); + } - ~ConnectionThread() { - if (!_stop) - stop(); + bool shutdown(Milliseconds timeout) override { + return true; } - void close() { - _s.close(); + void appendStats(BSONObjBuilder*) const override {} + + size_t numOpenSessions() const override { + stdx::unique_lock<Latch> lock(_mutex); + return _sessions.size(); } - void stop() { - LOGV2(6109500, "connection: Tx stop request"); - _stop.set(true); - _thr.join(); - LOGV2(6109501, "connection: joined"); + Future<DbResponse> handleRequest(OperationContext* opCtx, + const Message& request) noexcept override { + MONGO_UNREACHABLE; } -protected: - Socket& socket() { - return _s; + void setTransportLayer(transport::TransportLayer* tl) { + _transport = tl; } -private: - virtual void onConnect() {} - - void run() { - _s.connect(SockAddr::create("localhost", _port, AF_INET)); - LOGV2(6109502, "connection: port {port}", "port"_attr = _port); - onConnect(); - _stop.get(); - LOGV2(6109503, "connection: Rx stop request"); + void waitForConnect() { + stdx::unique_lock<Latch> lock(_mutex); + _cv.wait(lock, [&] { return !_sessions.empty(); }); } - int _port; - stdx::thread _thr; - Socket _s; - Notification<bool> _stop; +private: + mutable Mutex _mutex = MONGO_MAKE_LATCH("::_mutex"); + stdx::condition_variable _cv; + std::vector<transport::SessionHandle> _sessions; + transport::TransportLayer* _transport = nullptr; }; -class SyncClient { +class SimpleConnectionThread { public: - explicit SyncClient(int port) { - std::error_code ec; - _sock.connect(asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port), ec); - ASSERT_EQ(ec, std::error_code()); - LOGV2(6109504, "sync client connected"); + explicit SimpleConnectionThread(int port) : _port(port) { + _thr = stdx::thread{[&] { + auto sa = SockAddr::create("localhost", _port, AF_INET); + _s.connect(sa); + LOGV2(23034, "connection: port {port}", "port"_attr = _port); + stdx::unique_lock<Latch> lk(_mutex); + _cv.wait(lk, [&] { return _stop; }); + LOGV2(23035, "connection: Rx stop request"); + }}; } - std::error_code write(const char* buf, size_t bufSize) { - std::error_code ec; - asio::write(_sock, asio::buffer(buf, bufSize), ec); - return ec; + void forceCloseSocket() { + // Setting linger on to a zero-value timeout causes the socket to send an RST packet to the + // recipient side when closing the connection. + struct linger sl = {1, 0}; +#ifdef _WIN32 + char* pval = reinterpret_cast<char*>(&sl); +#else + void* pval = &sl; +#endif + ASSERT(!setsockopt(_s.rawFD(), SOL_SOCKET, SO_LINGER, pval, sizeof(sl))); + _s.close(); + } + + void stop() { + { + stdx::unique_lock<Latch> lk(_mutex); + _stop = true; + } + LOGV2(23036, "connection: Tx stop request"); + _cv.notify_one(); + _thr.join(); + LOGV2(23037, "connection: stopped"); } private: - asio::io_context _ctx{}; - asio::ip::tcp::socket _sock{_ctx}; + Mutex _mutex = MONGO_MAKE_LATCH("SimpleConnectionThread::_mutex"); + stdx::condition_variable _cv; + stdx::thread _thr; + bool _stop = false; + + Socket _s; + int _port; }; -void ping(SyncClient& client) { - OpMsgBuilder builder; - builder.setBody(BSON("ping" << 1)); - Message msg = builder.finish(); - msg.header().setResponseToMsgId(0); - msg.header().setId(0); - OpMsg::appendChecksum(&msg); - ASSERT_FALSE(client.write(msg.buf(), msg.size())); +std::unique_ptr<transport::TransportLayerASIO> makeAndStartTL(ServiceEntryPoint* sep) { + auto options = [] { + ServerGlobalParams params; + params.noUnixSocket = true; + transport::TransportLayerASIO::Options opts(¶ms); + + // TODO SERVER-30212 should clean this up and assign a port from the supplied port range + // provided by resmoke. + opts.port = 0; + return opts; + }(); + + auto tla = std::make_unique<transport::TransportLayerASIO>(options, sep); + ASSERT_OK(tla->setup()); + ASSERT_OK(tla->start()); + + return tla; } -class MockSEP : public ServiceEntryPoint { -public: - struct StopException {}; +TEST(TransportLayerASIO, PortZeroConnect) { + ServiceEntryPointUtil sepu; + auto tla = makeAndStartTL(&sepu); - struct Session { - using Task = std::function<void(Session&)>; + int port = tla->listenerPort(); + ASSERT_GT(port, 0); + LOGV2(23038, "TransportLayerASIO.listenerPort() is {port}", "port"_attr = port); - ~Session() { - stop(); - } + SimpleConnectionThread connect_thread(port); + sepu.waitForConnect(); - void schedule(Task task) { - LOGV2(6109505, "scheduling task"); - tasks.push(std::move(task)); - } + ASSERT_EQ(sepu.numOpenSessions(), 1); + connect_thread.stop(); + sepu.endAllSessions({}); + tla->shutdown(); +} - void start() { - thread = stdx::thread([this] { run(); }); - } +TEST(TransportLayerASIO, TCPResetAfterConnectionIsSilentlySwallowed) { + ServiceEntryPointUtil sepu; + auto tla = makeAndStartTL(&sepu); - void stop() { - if (thread.joinable()) { - schedule([](auto&&) { throw StopException{}; }); - thread.join(); - } - } + auto hangBeforeAcceptFp = globalFailPointRegistry().find("transportLayerASIOhangBeforeAccept"); + auto timesEntered = hangBeforeAcceptFp->setMode(FailPoint::alwaysOn); - void run() { - LOGV2(6109506, "doSession"); - while (true) { - LOGV2(6109507, "polling for work"); - try { - LOGV2(6109508, "running a session task"); - tasks.pop()(*this); - } catch (const StopException&) { - LOGV2(6109509, "caught StopException"); - return; - } - } - } + SimpleConnectionThread connect_thread(tla->listenerPort()); + hangBeforeAcceptFp->waitForTimesEntered(timesEntered + 1); - std::shared_ptr<transport::Session> session; - stdx::thread thread; - BlockingQueue<Task> tasks; - }; - using Task = Session::Task; + connect_thread.forceCloseSocket(); + hangBeforeAcceptFp->setMode(FailPoint::off); - MockSEP() = default; - explicit MockSEP(std::function<void(Session&)> onStartSession) - : _onStartSession(std::move(onStartSession)) {} + ASSERT_EQ(sepu.numOpenSessions(), 0); + connect_thread.stop(); + tla->shutdown(); +} - ~MockSEP() override { - for (auto& s : **_sessions) - s->stop(); +class TimeoutSEP : public ServiceEntryPoint { +public: + ~TimeoutSEP() override { // This should shutdown immediately, so give the maximum timeout shutdown(Milliseconds::max()); } + void endAllSessions(transport::Session::TagMask tags) override { + MONGO_UNREACHABLE; + } + + bool shutdown(Milliseconds timeout) override { + LOGV2(23039, "Joining all worker threads"); + for (auto& thread : _workerThreads) { + thread.join(); + } + return true; + } + Status start() override { return Status::OK(); } void appendStats(BSONObjBuilder*) const override {} + size_t numOpenSessions() const override { + return 0; + } + Future<DbResponse> handleRequest(OperationContext* opCtx, const Message& request) noexcept override { MONGO_UNREACHABLE; } - void startSession(std::shared_ptr<transport::Session> session) override { - LOGV2(6109510, "Accepted connection", "remote"_attr = session->remote()); - Session& newSession = *_sessions->emplace_back(new Session{std::move(session)}); - if (_onStartSession) - _onStartSession(newSession); - LOGV2(6109511, "started session"); - } + bool waitForTimeout(boost::optional<Milliseconds> timeout = boost::none) { + stdx::unique_lock<Latch> lk(_mutex); + bool ret = true; + if (timeout) { + ret = _cond.wait_for(lk, timeout->toSystemDuration(), [this] { return _finished; }); + } else { + _cond.wait(lk, [this] { return _finished; }); + } - void endAllSessions(transport::Session::TagMask tags) override { - LOGV2(6109512, "end all sessions"); - _sessions->clear(); + _finished = false; + return ret; } - bool shutdown(Milliseconds timeout) override { - LOGV2(6109513, "Joining all worker threads"); - std::exchange(**_sessions, {}); - return true; - } - - size_t numOpenSessions() const override { - return _sessions->size(); +protected: + void notifyComplete() { + stdx::unique_lock<Latch> lk(_mutex); + _finished = true; + _cond.notify_one(); } - void mockOnStartSession(std::function<void(Session&)> cb) { - _onStartSession = std::move(cb); + template <typename FunT> + void startWorkerThread(FunT&& fun) { + _workerThreads.emplace_back(std::forward<FunT>(fun)); } private: - std::function<void(Session&)> _onStartSession; - synchronized_value<std::vector<std::unique_ptr<Session>>> _sessions; -}; - -std::unique_ptr<transport::TransportLayerASIO> makeAndStartTL(ServiceEntryPoint* sep) { - auto options = [] { - ServerGlobalParams params; - params.noUnixSocket = true; - transport::TransportLayerASIO::Options opts(¶ms); - // TODO SERVER-30212 should clean this up and assign a port from the supplied port range - // provided by resmoke. - opts.port = 0; - return opts; - }(); - auto tla = std::make_unique<transport::TransportLayerASIO>(options, sep); - ASSERT_OK(tla->setup()); - ASSERT_OK(tla->start()); - return tla; -} + Mutex _mutex = MONGO_MAKE_LATCH("TimeoutSEP::_mutex"); -TEST(TransportLayerASIO, ListenerPortZeroTreatedAsEphemeral) { - Notification<bool> connected; - MockSEP sep; - sep.mockOnStartSession([&](auto&&) { connected.set(true); }); - auto tla = makeAndStartTL(&sep); - - int port = tla->listenerPort(); - ASSERT_GT(port, 0); - LOGV2(6109514, "TransportLayerASIO listening", "port"_attr = port); + stdx::condition_variable _cond; + bool _finished = false; - ConnectionThread connectThread(port); - connected.get(); + std::vector<stdx::thread> _workerThreads; +}; - ASSERT_EQ(sep.numOpenSessions(), 1); - connectThread.stop(); - tla->shutdown(); -} +class TimeoutSyncSEP : public TimeoutSEP { +public: + enum Mode { kShouldTimeout, kNoTimeout }; + TimeoutSyncSEP(Mode mode) : _mode(mode) {} + + void startSession(transport::SessionHandle session) override { + LOGV2(23040, "Accepted connection", "remote"_attr = session->remote()); + startWorkerThread([this, session = std::move(session)]() mutable { + LOGV2(23041, "waiting for message"); + session->setTimeout(Milliseconds{500}); + auto status = session->sourceMessage().getStatus(); + if (_mode == kShouldTimeout) { + ASSERT_EQ(status, ErrorCodes::NetworkTimeout); + LOGV2(23042, "message timed out"); + } else { + ASSERT_OK(status); + LOGV2(23043, "message received okay"); + } -TEST(TransportLayerASIO, TCPResetAfterConnectionIsSilentlySwallowed) { - class ResettingConnectionThread : public ConnectionThread { - using ConnectionThread::ConnectionThread; - void onConnect() override { - // Linger timeout = 0 causes a RST packet on close. - struct linger sl = {1, 0}; - ASSERT(!setsockopt(socket().rawFD(), - SOL_SOCKET, - SO_LINGER, - reinterpret_cast<SetsockoptPtr>(&sl), - sizeof(sl))); - } - }; + session.reset(); + notifyComplete(); + }); + } - MockSEP sep; - auto tla = makeAndStartTL(&sep); +private: + Mode _mode; +}; - auto& fp = transport::transportLayerASIOhangBeforeAccept; - auto timesEntered = fp.setMode(FailPoint::alwaysOn); +class TimeoutConnector { +public: + TimeoutConnector(int port, bool sendRequest) + : _ctx(), _sock(_ctx), _endpoint(asio::ip::address_v4::loopback(), port) { + std::error_code ec; + _sock.connect(_endpoint, ec); + ASSERT_EQ(ec, std::error_code()); - LOGV2(6109515, "connecting"); + if (sendRequest) { + sendMessage(); + } + } - ResettingConnectionThread connectThread(tla->listenerPort()); - fp.waitForTimesEntered(timesEntered + 1); + void sendMessage() { + OpMsgBuilder builder; + builder.setBody(BSON("ping" << 1)); + Message msg = builder.finish(); + msg.header().setResponseToMsgId(0); + msg.header().setId(0); + OpMsg::appendChecksum(&msg); - LOGV2(6109516, "closing"); - connectThread.close(); - fp.setMode(FailPoint::off); + std::error_code ec; + asio::write(_sock, asio::buffer(msg.buf(), msg.size()), ec); + ASSERT_FALSE(ec); + } - LOGV2(6109517, "asserting"); - ASSERT_EQ(sep.numOpenSessions(), 0); - LOGV2(6109518, "past assert"); - connectThread.stop(); - tla->shutdown(); -} +private: + asio::io_context _ctx; + asio::ip::tcp::socket _sock; + asio::ip::tcp::endpoint _endpoint; +}; /* check that timeouts actually time out */ TEST(TransportLayerASIO, SourceSyncTimeoutTimesOut) { - Notification<StatusWith<Message>> received; - MockSEP sep; - sep.mockOnStartSession([&](MockSEP::Session& session) { - LOGV2(6109519, "setting timeout"); - session.session->setTimeout(Milliseconds{500}); - session.start(); - LOGV2(6109520, "waiting for message"); - session.schedule([&](MockSEP::Session& session) { - received.set(session.session->sourceMessage()); - LOGV2(6109521, "message receive op resolved"); - }); - }); + TimeoutSyncSEP sep(TimeoutSyncSEP::kShouldTimeout); auto tla = makeAndStartTL(&sep); - SyncClient conn(tla->listenerPort()); - LOGV2(6109522, "scheduled"); + TimeoutConnector connector(tla->listenerPort(), false); - ASSERT_EQ(received.get().getStatus(), ErrorCodes::NetworkTimeout); - LOGV2(6109523, "received something"); + sep.waitForTimeout(); tla->shutdown(); } /* check that timeouts don't time out unless there's an actual timeout */ TEST(TransportLayerASIO, SourceSyncTimeoutSucceeds) { - MockSEP sep; - Notification<StatusWith<Message>> received; - sep.mockOnStartSession([&](MockSEP::Session& s) { - s.session->setTimeout(Milliseconds{500}); - s.start(); - s.schedule([&](auto&) { received.set(s.session->sourceMessage()); }); - }); + TimeoutSyncSEP sep(TimeoutSyncSEP::kNoTimeout); auto tla = makeAndStartTL(&sep); - SyncClient conn(tla->listenerPort()); - ping(conn); // This time we send a message - ASSERT_OK(received.get().getStatus()); - LOGV2(6109524, "received something"); + TimeoutConnector connector(tla->listenerPort(), true); + + sep.waitForTimeout(); tla->shutdown(); } -/** Switching from timeouts to no timeouts must reset the timeout to unlimited. */ +/* check that switching from timeouts to no timeouts correctly resets the timeout to unlimited */ +class TimeoutSwitchModesSEP : public TimeoutSEP { +public: + void startSession(transport::SessionHandle session) override { + LOGV2(23044, "Accepted connection", "remote"_attr = session->remote()); + startWorkerThread([this, session = std::move(session)]() mutable { + LOGV2(23045, "waiting for message"); + auto sourceMessage = [&] { return session->sourceMessage().getStatus(); }; + + // the first message we source should time out. + session->setTimeout(Milliseconds{500}); + ASSERT_EQ(sourceMessage(), ErrorCodes::NetworkTimeout); + notifyComplete(); + + LOGV2(23046, "timed out successfully"); + + // get the session back in a known state with the timeout still in place + ASSERT_OK(sourceMessage()); + notifyComplete(); + + LOGV2(23047, "waiting for message without a timeout"); + + // this should block and timeout the waitForComplete mutex, and the session should wait + // for a while to make sure this isn't timing out and then send a message to unblock + // the this call to recv + session->setTimeout(boost::none); + ASSERT_OK(sourceMessage()); + + session.reset(); + notifyComplete(); + LOGV2(23048, "ending test"); + }); + } +}; + TEST(TransportLayerASIO, SwitchTimeoutModes) { - MockSEP sep; - Notification<MockSEP::Session*> mockSessionCreated; - sep.mockOnStartSession([&](MockSEP::Session& s) { - s.start(); - mockSessionCreated.set(&s); - }); + TimeoutSwitchModesSEP sep; auto tla = makeAndStartTL(&sep); - SyncClient conn(tla->listenerPort()); + TimeoutConnector connector(tla->listenerPort(), false); - auto& session = *mockSessionCreated.get(); + ASSERT_TRUE(sep.waitForTimeout()); + + connector.sendMessage(); + ASSERT_TRUE(sep.waitForTimeout()); + + ASSERT_FALSE(sep.waitForTimeout(Milliseconds{1000})); + connector.sendMessage(); + ASSERT_TRUE(sep.waitForTimeout()); - { - LOGV2(6109525, "The first message we source should time out"); - Notification<StatusWith<Message>> done; - session.schedule([&](const auto&) { - session.session->setTimeout(Milliseconds{500}); - done.set(session.session->sourceMessage()); - }); - ASSERT_EQ(done.get().getStatus(), ErrorCodes::NetworkTimeout); - LOGV2(6109526, "timed out successfully"); - } - { - LOGV2(6109527, "Verify a message can be successfully received"); - Notification<StatusWith<Message>> done; - session.schedule([&](const auto&) { done.set(session.session->sourceMessage()); }); - ping(conn); - ASSERT_OK(done.get().getStatus()); - } - { - LOGV2(6109528, "Clear the timeout and verify reception of a late message."); - Notification<StatusWith<Message>> done; - session.schedule([&](const auto&) { - LOGV2(6109529, "waiting for message without a timeout"); - session.session->setTimeout({}); - done.set(session.session->sourceMessage()); - }); - sleepFor(Seconds{1}); - ping(conn); - ASSERT_OK(done.get().getStatus()); - } tla->shutdown(); } diff --git a/src/mongo/util/net/sock.cpp b/src/mongo/util/net/sock.cpp index 3a42470524a..35776fe4c73 100644 --- a/src/mongo/util/net/sock.cpp +++ b/src/mongo/util/net/sock.cpp @@ -299,13 +299,13 @@ SSLPeerInfo Socket::doSSLHandshake(const char* firstBytes, int len) { #endif -bool Socket::connect(const SockAddr& remote) { +bool Socket::connect(SockAddr& remote) { const Milliseconds connectTimeoutMillis(static_cast<int64_t>( _timeout > 0 ? std::min(kMaxConnectTimeoutMS, (_timeout * 1000)) : kMaxConnectTimeoutMS)); return connect(remote, connectTimeoutMillis); } -bool Socket::connect(const SockAddr& remote, Milliseconds connectTimeoutMillis) { +bool Socket::connect(SockAddr& remote, Milliseconds connectTimeoutMillis) { _remote = remote; _fd = ::socket(remote.getType(), SOCK_STREAM, 0); diff --git a/src/mongo/util/net/sock.h b/src/mongo/util/net/sock.h index 7257e5e16f6..2548a46804c 100644 --- a/src/mongo/util/net/sock.h +++ b/src/mongo/util/net/sock.h @@ -109,12 +109,12 @@ public: * an error, or due to a timeout on connection, or due to the system socket deciding the * socket is invalid. */ - bool connect(const SockAddr& remote, Milliseconds connectTimeoutMillis); + bool connect(SockAddr& remote, Milliseconds connectTimeoutMillis); /** * Connect using a default connect timeout of min(_timeout * 1000, kMaxConnectTimeoutMS) */ - bool connect(const SockAddr& remote); + bool connect(SockAddr& remote); void close(); void send(const char* data, int len, const char* context); |