author    | Jonathan Reams <jbreams@mongodb.com> | 2018-06-07 12:11:56 -0400
committer | Jonathan Reams <jbreams@mongodb.com> | 2018-06-11 11:22:07 -0400
commit    | 0fd199005495c78cbdcfc28d189646a3aecb5c55 (patch)
tree      | f2fbfa7343dbdeb8b3dd2097138af61b2a7b03ef
parent    | 3a7e35704243c716222dfe5d4241bc73c4168484 (diff)
download  | mongo-0fd199005495c78cbdcfc28d189646a3aecb5c55.tar.gz
SERVER-35494 Add timeouts to TransportLayer::asyncConnect
-rw-r--r-- | src/mongo/client/async_client.cpp | 5
-rw-r--r-- | src/mongo/client/async_client.h | 3
-rw-r--r-- | src/mongo/executor/connection_pool_tl.cpp | 6
-rw-r--r-- | src/mongo/transport/session_asio.h | 14
-rw-r--r-- | src/mongo/transport/transport_layer.h | 3
-rw-r--r-- | src/mongo/transport/transport_layer_asio.cpp | 78
-rw-r--r-- | src/mongo/transport/transport_layer_asio.h | 7
-rw-r--r-- | src/mongo/transport/transport_layer_asio_integration_test.cpp | 26
-rw-r--r-- | src/mongo/transport/transport_layer_manager.cpp | 5
-rw-r--r-- | src/mongo/transport/transport_layer_manager.h | 3
-rw-r--r-- | src/mongo/transport/transport_layer_mock.cpp | 3
-rw-r--r-- | src/mongo/transport/transport_layer_mock.h | 3
12 files changed, 129 insertions, 27 deletions
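
The change threads a Milliseconds timeout parameter through AsyncDBClient::connect and every TransportLayer::asyncConnect override, and TransportLayerASIO arms a reactor timer that fails the pending connect with NetworkTimeout when it expires. A minimal sketch of what a caller looks like after this patch, modeled on the integration test below; the helper name connectWithDeadline and the 5-second value are illustrative and not part of the commit:

    #include "mongo/client/async_client.h"
    #include "mongo/db/service_context.h"
    #include "mongo/transport/transport_layer.h"
    #include "mongo/util/time_support.h"

    namespace mongo {

    // Hypothetical caller: ask for an egress connection to `server`, but let the
    // returned Future fail with ErrorCodes::NetworkTimeout if resolution, TCP
    // connect, and (when enabled) the SSL handshake do not finish within `timeout`.
    Future<AsyncDBClient::Handle> connectWithDeadline(const HostAndPort& server,
                                                      ServiceContext* sc,
                                                      transport::ReactorHandle reactor,
                                                      Milliseconds timeout = Milliseconds{5000}) {
        return AsyncDBClient::connect(server, transport::kGlobalSSLMode, sc, reactor, timeout);
    }

    }  // namespace mongo

Passing Milliseconds::max(), as the existing ShortReadsAndWritesWork test now does, effectively disables the timer: asyncConnect only arms it for values strictly between zero and Milliseconds::max().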
diff --git a/src/mongo/client/async_client.cpp b/src/mongo/client/async_client.cpp
index 0d8e4ec6a3a..dfa700eb4e6 100644
--- a/src/mongo/client/async_client.cpp
+++ b/src/mongo/client/async_client.cpp
@@ -56,9 +56,10 @@ namespace mongo {
 Future<AsyncDBClient::Handle> AsyncDBClient::connect(const HostAndPort& peer,
                                                      transport::ConnectSSLMode sslMode,
                                                      ServiceContext* const context,
-                                                     transport::ReactorHandle reactor) {
+                                                     transport::ReactorHandle reactor,
+                                                     Milliseconds timeout) {
     auto tl = context->getTransportLayer();
-    return tl->asyncConnect(peer, sslMode, std::move(reactor))
+    return tl->asyncConnect(peer, sslMode, std::move(reactor), timeout)
         .then([peer, context](transport::SessionHandle session) {
             return std::make_shared<AsyncDBClient>(peer, std::move(session), context);
         });
diff --git a/src/mongo/client/async_client.h b/src/mongo/client/async_client.h
index ec34dca021e..c915a7107f9 100644
--- a/src/mongo/client/async_client.h
+++ b/src/mongo/client/async_client.h
@@ -55,7 +55,8 @@ public:
     static Future<Handle> connect(const HostAndPort& peer,
                                   transport::ConnectSSLMode sslMode,
                                   ServiceContext* const context,
-                                  transport::ReactorHandle reactor);
+                                  transport::ReactorHandle reactor,
+                                  Milliseconds timeout);
 
     Future<executor::RemoteCommandResponse> runCommandRequest(
         executor::RemoteCommandRequest request, const transport::BatonHandle& baton = nullptr);
diff --git a/src/mongo/executor/connection_pool_tl.cpp b/src/mongo/executor/connection_pool_tl.cpp
index 63fff489e6b..bb41a687a6e 100644
--- a/src/mongo/executor/connection_pool_tl.cpp
+++ b/src/mongo/executor/connection_pool_tl.cpp
@@ -124,9 +124,13 @@ void TLConnection::setup(Milliseconds timeout, SetupCallback cb) {
                                     << timeout;
         handler->promise.setError(
             Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, std::move(reason)));
+
+        if (_client) {
+            _client->cancel();
+        }
     });
 
-    AsyncDBClient::connect(_peer, transport::kGlobalSSLMode, _serviceContext, _reactor)
+    AsyncDBClient::connect(_peer, transport::kGlobalSSLMode, _serviceContext, _reactor, timeout)
         .onError([](StatusWith<AsyncDBClient::Handle> swc) -> StatusWith<AsyncDBClient::Handle> {
             return Status(ErrorCodes::HostUnreachable, swc.getStatus().reason());
         })
diff --git a/src/mongo/transport/session_asio.h b/src/mongo/transport/session_asio.h
index f0766681dbe..f9fac4276a0 100644
--- a/src/mongo/transport/session_asio.h
+++ b/src/mongo/transport/session_asio.h
@@ -210,7 +210,10 @@ protected:
     friend TransportLayerASIO::BatonASIO;
 
 #ifdef MONGO_CONFIG_SSL
-    Future<void> handshakeSSLForEgress(const HostAndPort& target) {
+    // The unique_lock here is held by TransportLayerASIO to synchronize with the asyncConnect
+    // timeout callback. It will be unlocked before the SSL handshake actually begins.
+    Future<void> handshakeSSLForEgressWithLock(stdx::unique_lock<stdx::mutex> lk,
+                                               const HostAndPort& target) {
         if (!_tl->_egressSSLContext) {
             return Future<void>::makeReady(Status(ErrorCodes::SSLHandshakeFailed,
                                                   "SSL requested but SSL support is disabled"));
@@ -218,6 +221,8 @@ protected:
 
         _sslSocket.emplace(
             std::move(_socket), *_tl->_egressSSLContext, removeFQDNRoot(target.host()));
+        lk.unlock();
+
         auto doHandshake = [&] {
             if (_blockingMode == Sync) {
                 std::error_code ec;
@@ -239,6 +244,13 @@ protected:
             }
         });
     }
+
+    // For synchronous connections where we don't have an async timer, just take a dummy lock and
+    // pass it to the WithLock version of handshakeSSLForEgress
+    Future<void> handshakeSSLForEgress(const HostAndPort& target) {
+        stdx::mutex mutex;
+        return handshakeSSLForEgressWithLock(stdx::unique_lock<stdx::mutex>(mutex), target);
+    }
 #endif
 
     void ensureSync() {
diff --git a/src/mongo/transport/transport_layer.h b/src/mongo/transport/transport_layer.h
index c0138e18bdc..3080eceabac 100644
--- a/src/mongo/transport/transport_layer.h
+++ b/src/mongo/transport/transport_layer.h
@@ -79,7 +79,8 @@ public:
 
     virtual Future<SessionHandle> asyncConnect(HostAndPort peer,
                                                ConnectSSLMode sslMode,
-                                               const ReactorHandle& reactor) = 0;
+                                               const ReactorHandle& reactor,
+                                               Milliseconds timeout) = 0;
 
     /**
      * Start the TransportLayer. After this point, the TransportLayer will begin accepting active
diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp
index df997165621..57d760b57ff 100644
--- a/src/mongo/transport/transport_layer_asio.cpp
+++ b/src/mongo/transport/transport_layer_asio.cpp
@@ -62,6 +62,8 @@ namespace mongo {
 
 namespace transport {
 
+MONGO_FAIL_POINT_DEFINE(transportLayerASIOasyncConnectTimesOut);
+
 class ASIOReactorTimer final : public ReactorTimer {
 public:
     explicit ASIOReactorTimer(asio::io_context& ctx)
@@ -450,7 +452,7 @@ private:
 
 Status makeConnectError(Status status, const HostAndPort& peer, const WrappedEndpoint& endpoint) {
     std::string errmsg;
-    if (peer.toString() != endpoint.toString()) {
+    if (peer.toString() != endpoint.toString() && !endpoint.toString().empty()) {
        errmsg = str::stream() << "Error connecting to " << peer << " (" << endpoint.toString()
                               << ")";
     } else {
@@ -551,16 +553,19 @@ StatusWith<TransportLayerASIO::ASIOSessionHandle> TransportLayerASIO::_doSyncCon
 
 Future<SessionHandle> TransportLayerASIO::asyncConnect(HostAndPort peer,
                                                        ConnectSSLMode sslMode,
-                                                       const ReactorHandle& reactor) {
+                                                       const ReactorHandle& reactor,
+                                                       Milliseconds timeout) {
+
     struct AsyncConnectState {
         AsyncConnectState(HostAndPort peer, asio::io_context& context)
-            : socket(context), resolver(context), peer(std::move(peer)) {}
+            : socket(context), timeoutTimer(context), resolver(context), peer(std::move(peer)) {}
 
-        Future<SessionHandle> finish() {
-            return SessionHandle(std::move(session));
-        }
+        AtomicBool done{false};
+        Promise<SessionHandle> promise;
+        stdx::mutex mutex;
 
         GenericSocket socket;
+        ASIOReactorTimer timeoutTimer;
         WrappedResolver resolver;
         WrappedEndpoint resolvedEndpoint;
         const HostAndPort peer;
@@ -569,22 +574,50 @@ Future<SessionHandle> TransportLayerASIO::asyncConnect(HostAndPort peer,
     auto reactorImpl = checked_cast<ASIOReactor*>(reactor.get());
     auto connector = std::make_shared<AsyncConnectState>(std::move(peer), *reactorImpl);
+    Future<SessionHandle> mergedFuture = connector->promise.getFuture();
 
     if (connector->peer.host().empty()) {
         return Status{ErrorCodes::HostNotFound, "Hostname or IP address to connect to is empty"};
     }
 
-    return connector->resolver.asyncResolve(connector->peer, _listenerOptions.enableIPv6)
+    if (timeout > Milliseconds{0} && timeout < Milliseconds::max()) {
+        connector->timeoutTimer.waitFor(timeout).getAsync([connector](Status status) {
+            if (status == ErrorCodes::CallbackCanceled || connector->done.swap(true)) {
+                return;
+            }
+
+            connector->promise.setError(
+                makeConnectError({ErrorCodes::NetworkTimeout, "Connecting timed out"},
+                                 connector->peer,
+                                 connector->resolvedEndpoint));
+
+            std::error_code ec;
+            stdx::lock_guard<stdx::mutex> lk(connector->mutex);
+            connector->resolver.cancel();
+            if (connector->session) {
+                connector->session->end();
+            } else {
+                connector->socket.cancel(ec);
+            }
+        });
+    }
+
+    connector->resolver.asyncResolve(connector->peer, _listenerOptions.enableIPv6)
         .then([connector](WrappedResolver::EndpointVector results) {
+            stdx::unique_lock<stdx::mutex> lk(connector->mutex);
             connector->resolvedEndpoint = results.front();
 
             connector->socket.open(connector->resolvedEndpoint->protocol());
             connector->socket.non_blocking(true);
+            lk.unlock();
+
             return connector->socket.async_connect(*connector->resolvedEndpoint, UseFuture{});
         })
-        .then([this, connector, sslMode]() {
+        .then([this, connector, sslMode]() -> Future<void> {
+            stdx::unique_lock<stdx::mutex> lk(connector->mutex);
             connector->session =
                 std::make_shared<ASIOSession>(this, std::move(connector->socket), false);
             connector->session->ensureAsync();
+
 #ifndef MONGO_CONFIG_SSL
             if (sslMode == kEnableSSL) {
                 uasserted(ErrorCodes::InvalidSSLConfiguration, "SSL requested but not supported");
@@ -594,16 +627,35 @@ Future<SessionHandle> TransportLayerASIO::asyncConnect(HostAndPort peer,
             if (sslMode == kEnableSSL ||
                 (sslMode == kGlobalSSLMode && ((globalSSLMode == SSLParams::SSLMode_preferSSL) ||
                                                (globalSSLMode == SSLParams::SSLMode_requireSSL)))) {
-                return connector->session->handshakeSSLForEgress(connector->peer).then([connector] {
-                    return connector->finish();
-                });
+                return connector->session
+                    ->handshakeSSLForEgressWithLock(std::move(lk), connector->peer)
+                    .then([connector] { return Status::OK(); });
             }
 #endif
-            return connector->finish();
+            return Status::OK();
         })
-        .onError([connector](Status status) -> Future<SessionHandle> {
+        .onError([connector](Status status) -> Future<void> {
             return makeConnectError(status, connector->peer, connector->resolvedEndpoint);
+        })
+        .getAsync([connector](Status connectResult) {
+            if (MONGO_FAIL_POINT(transportLayerASIOasyncConnectTimesOut)) {
+                log() << "asyncConnectTimesOut fail point is active. simulating timeout.";
+                return;
+            }
+
+            if (connector->done.swap(true)) {
+                return;
+            }
+
+            connector->timeoutTimer.cancel();
+            if (connectResult.isOK()) {
+                connector->promise.emplaceValue(std::move(connector->session));
+            } else {
+                connector->promise.setError(connectResult);
+            }
         });
+
+    return mergedFuture;
 }
 
 Status TransportLayerASIO::setup() {
diff --git a/src/mongo/transport/transport_layer_asio.h b/src/mongo/transport/transport_layer_asio.h
index b554a56b5ed..e3950295357 100644
--- a/src/mongo/transport/transport_layer_asio.h
+++ b/src/mongo/transport/transport_layer_asio.h
@@ -70,6 +70,10 @@ namespace transport {
 // This fail point simulates reads and writes that always return 1 byte and fail with EAGAIN
 MONGO_FAIL_POINT_DECLARE(transportLayerASIOshortOpportunisticReadWrite);
 
+// This fail point will cause an asyncConnect to timeout after it's successfully connected
+// to the remote peer
+MONGO_FAIL_POINT_DECLARE(transportLayerASIOasyncConnectTimesOut);
+
 /**
  * A TransportLayer implementation based on ASIO networking primitives.
  */
@@ -115,7 +119,8 @@ public:
 
     Future<SessionHandle> asyncConnect(HostAndPort peer,
                                        ConnectSSLMode sslMode,
-                                       const ReactorHandle& reactor) final;
+                                       const ReactorHandle& reactor,
+                                       Milliseconds timeout) final;
 
     Status setup() final;
 
diff --git a/src/mongo/transport/transport_layer_asio_integration_test.cpp b/src/mongo/transport/transport_layer_asio_integration_test.cpp
index cebbdb64650..0e595449d54 100644
--- a/src/mongo/transport/transport_layer_asio_integration_test.cpp
+++ b/src/mongo/transport/transport_layer_asio_integration_test.cpp
@@ -100,7 +100,7 @@ TEST(TransportLayerASIO, ShortReadsAndWritesWork) {
     auto server = connectionString.getServers().front();
 
     auto sc = getGlobalServiceContext();
-    auto reactor = sc->getTransportLayer()->getReactor(transport::TransportLayer::kEgress);
+    auto reactor = sc->getTransportLayer()->getReactor(transport::TransportLayer::kNewReactor);
 
     stdx::thread thread([&] { reactor->run(); });
     const auto threadGuard = MakeGuard([&] {
@@ -109,7 +109,8 @@ TEST(TransportLayerASIO, ShortReadsAndWritesWork) {
     });
 
     AsyncDBClient::Handle handle =
-        AsyncDBClient::connect(server, transport::kGlobalSSLMode, sc, reactor).get();
+        AsyncDBClient::connect(server, transport::kGlobalSSLMode, sc, reactor, Milliseconds::max())
+            .get();
 
     handle->initWireVersion(__FILE__, nullptr).get();
 
@@ -135,5 +136,26 @@
     }
 }
 
+TEST(TransportLayerASIO, asyncConnectTimeoutCleansUpSocket) {
+    auto connectionString = unittest::getFixtureConnectionString();
+    auto server = connectionString.getServers().front();
+
+    auto sc = getGlobalServiceContext();
+    auto reactor = sc->getTransportLayer()->getReactor(transport::TransportLayer::kNewReactor);
+
+    stdx::thread thread([&] { reactor->run(); });
+
+    const auto threadGuard = MakeGuard([&] {
+        reactor->stop();
+        thread.join();
+    });
+
+    FailPointEnableBlock fp("transportLayerASIOasyncConnectTimesOut");
+    auto client =
+        AsyncDBClient::connect(server, transport::kGlobalSSLMode, sc, reactor, Milliseconds{500})
+            .getNoThrow();
+    ASSERT_EQ(client.getStatus(), ErrorCodes::NetworkTimeout);
+}
+
 }  // namespace
 }  // namespace mongo
diff --git a/src/mongo/transport/transport_layer_manager.cpp b/src/mongo/transport/transport_layer_manager.cpp
index f92538c773a..da3a0be9717 100644
--- a/src/mongo/transport/transport_layer_manager.cpp
+++ b/src/mongo/transport/transport_layer_manager.cpp
@@ -67,8 +67,9 @@ StatusWith<SessionHandle> TransportLayerManager::connect(HostAndPort peer,
 
 Future<SessionHandle> TransportLayerManager::asyncConnect(HostAndPort peer,
                                                           ConnectSSLMode sslMode,
-                                                          const ReactorHandle& reactor) {
-    return _tls.front()->asyncConnect(peer, sslMode, reactor);
+                                                          const ReactorHandle& reactor,
+                                                          Milliseconds timeout) {
+    return _tls.front()->asyncConnect(peer, sslMode, reactor, timeout);
 }
 
 ReactorHandle TransportLayerManager::getReactor(WhichReactor which) {
diff --git a/src/mongo/transport/transport_layer_manager.h b/src/mongo/transport/transport_layer_manager.h
index 69bfa6ff8c2..47927d53157 100644
--- a/src/mongo/transport/transport_layer_manager.h
+++ b/src/mongo/transport/transport_layer_manager.h
@@ -61,7 +61,8 @@ public:
                                       Milliseconds timeout) override;
     Future<SessionHandle> asyncConnect(HostAndPort peer,
                                        ConnectSSLMode sslMode,
-                                       const ReactorHandle& reactor) override;
+                                       const ReactorHandle& reactor,
+                                       Milliseconds timeout) override;
 
     Status start() override;
     void shutdown() override;
diff --git a/src/mongo/transport/transport_layer_mock.cpp b/src/mongo/transport/transport_layer_mock.cpp
index 50cf529d545..38c8f808d9b 100644
--- a/src/mongo/transport/transport_layer_mock.cpp
+++ b/src/mongo/transport/transport_layer_mock.cpp
@@ -69,7 +69,8 @@ StatusWith<SessionHandle> TransportLayerMock::connect(HostAndPort peer,
 
 Future<SessionHandle> TransportLayerMock::asyncConnect(HostAndPort peer,
                                                        ConnectSSLMode sslMode,
-                                                       const ReactorHandle& reactor) {
+                                                       const ReactorHandle& reactor,
+                                                       Milliseconds timeout) {
     MONGO_UNREACHABLE;
 }
 
diff --git a/src/mongo/transport/transport_layer_mock.h b/src/mongo/transport/transport_layer_mock.h
index 006269242eb..d0be4eece50 100644
--- a/src/mongo/transport/transport_layer_mock.h
+++ b/src/mongo/transport/transport_layer_mock.h
@@ -57,7 +57,8 @@ public:
                                       Milliseconds timeout) override;
     Future<SessionHandle> asyncConnect(HostAndPort peer,
                                        ConnectSSLMode sslMode,
-                                       const ReactorHandle& reactor) override;
+                                       const ReactorHandle& reactor,
+                                       Milliseconds timeout) override;
 
     Status setup() override;
     Status start() override;
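
Taken together, the transport_layer_asio.cpp hunks implement a race between the reactor timer and the resolve/connect/handshake chain: whichever side flips the shared done flag first settles the promise behind mergedFuture, and the loser returns without touching it, while the mutex keeps the timer's cancellation from firing in the middle of the socket-to-SSL handoff. The sketch below is a standalone illustration of that first-to-finish pattern using plain standard-library primitives rather than MongoDB's Future, Promise, and ASIOReactorTimer types; all names in it are invented for the example:

    #include <atomic>
    #include <chrono>
    #include <future>
    #include <iostream>
    #include <memory>
    #include <stdexcept>
    #include <thread>

    // Shared state: whoever flips `done` first gets to settle the promise,
    // playing the role of AsyncConnectState::done in the patch above.
    struct ConnectState {
        std::atomic<bool> done{false};
        std::promise<std::string> promise;
    };

    int main() {
        auto state = std::make_shared<ConnectState>();
        auto result = state->promise.get_future();

        // The "connect" work: pretend it needs two seconds to complete.
        std::thread work([state] {
            std::this_thread::sleep_for(std::chrono::seconds(2));
            if (!state->done.exchange(true)) {
                state->promise.set_value("connected");
            }  // else the timeout already fired; leave the promise alone
        });

        // The timeout "timer": fires after 500ms, like timeoutTimer.waitFor(timeout).
        std::thread timer([state] {
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
            if (!state->done.exchange(true)) {
                state->promise.set_exception(
                    std::make_exception_ptr(std::runtime_error("Connecting timed out")));
            }  // else the connect already finished; nothing to cancel in this sketch
        });

        try {
            std::cout << result.get() << "\n";
        } catch (const std::exception& e) {
            std::cout << "error: " << e.what() << "\n";  // prints "Connecting timed out"
        }
        work.join();
        timer.join();
        return 0;
    }

In the real patch the losing side additionally cancels the outstanding work (timeoutTimer.cancel() on success; resolver, socket, or session cancellation on timeout), and the connector->done.swap(true) check is what lets either side do so without double-settling the promise.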