summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJonathan Reams <jbreams@mongodb.com>2018-06-07 12:11:56 -0400
committerJonathan Reams <jbreams@mongodb.com>2018-06-11 11:22:07 -0400
commit0fd199005495c78cbdcfc28d189646a3aecb5c55 (patch)
treef2fbfa7343dbdeb8b3dd2097138af61b2a7b03ef
parent3a7e35704243c716222dfe5d4241bc73c4168484 (diff)
downloadmongo-0fd199005495c78cbdcfc28d189646a3aecb5c55.tar.gz
SERVER-35494 Add timeouts to TransportLayer::asyncConnect
-rw-r--r--src/mongo/client/async_client.cpp5
-rw-r--r--src/mongo/client/async_client.h3
-rw-r--r--src/mongo/executor/connection_pool_tl.cpp6
-rw-r--r--src/mongo/transport/session_asio.h14
-rw-r--r--src/mongo/transport/transport_layer.h3
-rw-r--r--src/mongo/transport/transport_layer_asio.cpp78
-rw-r--r--src/mongo/transport/transport_layer_asio.h7
-rw-r--r--src/mongo/transport/transport_layer_asio_integration_test.cpp26
-rw-r--r--src/mongo/transport/transport_layer_manager.cpp5
-rw-r--r--src/mongo/transport/transport_layer_manager.h3
-rw-r--r--src/mongo/transport/transport_layer_mock.cpp3
-rw-r--r--src/mongo/transport/transport_layer_mock.h3
12 files changed, 129 insertions, 27 deletions
diff --git a/src/mongo/client/async_client.cpp b/src/mongo/client/async_client.cpp
index 0d8e4ec6a3a..dfa700eb4e6 100644
--- a/src/mongo/client/async_client.cpp
+++ b/src/mongo/client/async_client.cpp
@@ -56,9 +56,10 @@ namespace mongo {
Future<AsyncDBClient::Handle> AsyncDBClient::connect(const HostAndPort& peer,
transport::ConnectSSLMode sslMode,
ServiceContext* const context,
- transport::ReactorHandle reactor) {
+ transport::ReactorHandle reactor,
+ Milliseconds timeout) {
auto tl = context->getTransportLayer();
- return tl->asyncConnect(peer, sslMode, std::move(reactor))
+ return tl->asyncConnect(peer, sslMode, std::move(reactor), timeout)
.then([peer, context](transport::SessionHandle session) {
return std::make_shared<AsyncDBClient>(peer, std::move(session), context);
});
diff --git a/src/mongo/client/async_client.h b/src/mongo/client/async_client.h
index ec34dca021e..c915a7107f9 100644
--- a/src/mongo/client/async_client.h
+++ b/src/mongo/client/async_client.h
@@ -55,7 +55,8 @@ public:
static Future<Handle> connect(const HostAndPort& peer,
transport::ConnectSSLMode sslMode,
ServiceContext* const context,
- transport::ReactorHandle reactor);
+ transport::ReactorHandle reactor,
+ Milliseconds timeout);
Future<executor::RemoteCommandResponse> runCommandRequest(
executor::RemoteCommandRequest request, const transport::BatonHandle& baton = nullptr);
diff --git a/src/mongo/executor/connection_pool_tl.cpp b/src/mongo/executor/connection_pool_tl.cpp
index 63fff489e6b..bb41a687a6e 100644
--- a/src/mongo/executor/connection_pool_tl.cpp
+++ b/src/mongo/executor/connection_pool_tl.cpp
@@ -124,9 +124,13 @@ void TLConnection::setup(Milliseconds timeout, SetupCallback cb) {
<< timeout;
handler->promise.setError(
Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, std::move(reason)));
+
+ if (_client) {
+ _client->cancel();
+ }
});
- AsyncDBClient::connect(_peer, transport::kGlobalSSLMode, _serviceContext, _reactor)
+ AsyncDBClient::connect(_peer, transport::kGlobalSSLMode, _serviceContext, _reactor, timeout)
.onError([](StatusWith<AsyncDBClient::Handle> swc) -> StatusWith<AsyncDBClient::Handle> {
return Status(ErrorCodes::HostUnreachable, swc.getStatus().reason());
})
diff --git a/src/mongo/transport/session_asio.h b/src/mongo/transport/session_asio.h
index f0766681dbe..f9fac4276a0 100644
--- a/src/mongo/transport/session_asio.h
+++ b/src/mongo/transport/session_asio.h
@@ -210,7 +210,10 @@ protected:
friend TransportLayerASIO::BatonASIO;
#ifdef MONGO_CONFIG_SSL
- Future<void> handshakeSSLForEgress(const HostAndPort& target) {
+ // The unique_lock here is held by TransportLayerASIO to synchronize with the asyncConnect
+ // timeout callback. It will be unlocked before the SSL actually handshake begins.
+ Future<void> handshakeSSLForEgressWithLock(stdx::unique_lock<stdx::mutex> lk,
+ const HostAndPort& target) {
if (!_tl->_egressSSLContext) {
return Future<void>::makeReady(Status(ErrorCodes::SSLHandshakeFailed,
"SSL requested but SSL support is disabled"));
@@ -218,6 +221,8 @@ protected:
_sslSocket.emplace(
std::move(_socket), *_tl->_egressSSLContext, removeFQDNRoot(target.host()));
+ lk.unlock();
+
auto doHandshake = [&] {
if (_blockingMode == Sync) {
std::error_code ec;
@@ -239,6 +244,13 @@ protected:
}
});
}
+
+ // For synchronous connections where we don't have an async timer, just take a dummy lock and
+ // pass it to the WithLock version of handshakeSSLForEgress
+ Future<void> handshakeSSLForEgress(const HostAndPort& target) {
+ stdx::mutex mutex;
+ return handshakeSSLForEgressWithLock(stdx::unique_lock<stdx::mutex>(mutex), target);
+ }
#endif
void ensureSync() {
diff --git a/src/mongo/transport/transport_layer.h b/src/mongo/transport/transport_layer.h
index c0138e18bdc..3080eceabac 100644
--- a/src/mongo/transport/transport_layer.h
+++ b/src/mongo/transport/transport_layer.h
@@ -79,7 +79,8 @@ public:
virtual Future<SessionHandle> asyncConnect(HostAndPort peer,
ConnectSSLMode sslMode,
- const ReactorHandle& reactor) = 0;
+ const ReactorHandle& reactor,
+ Milliseconds timeout) = 0;
/**
* Start the TransportLayer. After this point, the TransportLayer will begin accepting active
diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp
index df997165621..57d760b57ff 100644
--- a/src/mongo/transport/transport_layer_asio.cpp
+++ b/src/mongo/transport/transport_layer_asio.cpp
@@ -62,6 +62,8 @@
namespace mongo {
namespace transport {
+MONGO_FAIL_POINT_DEFINE(transportLayerASIOasyncConnectTimesOut);
+
class ASIOReactorTimer final : public ReactorTimer {
public:
explicit ASIOReactorTimer(asio::io_context& ctx)
@@ -450,7 +452,7 @@ private:
Status makeConnectError(Status status, const HostAndPort& peer, const WrappedEndpoint& endpoint) {
std::string errmsg;
- if (peer.toString() != endpoint.toString()) {
+ if (peer.toString() != endpoint.toString() && !endpoint.toString().empty()) {
errmsg = str::stream() << "Error connecting to " << peer << " (" << endpoint.toString()
<< ")";
} else {
@@ -551,16 +553,19 @@ StatusWith<TransportLayerASIO::ASIOSessionHandle> TransportLayerASIO::_doSyncCon
Future<SessionHandle> TransportLayerASIO::asyncConnect(HostAndPort peer,
ConnectSSLMode sslMode,
- const ReactorHandle& reactor) {
+ const ReactorHandle& reactor,
+ Milliseconds timeout) {
+
struct AsyncConnectState {
AsyncConnectState(HostAndPort peer, asio::io_context& context)
- : socket(context), resolver(context), peer(std::move(peer)) {}
+ : socket(context), timeoutTimer(context), resolver(context), peer(std::move(peer)) {}
- Future<SessionHandle> finish() {
- return SessionHandle(std::move(session));
- }
+ AtomicBool done{false};
+ Promise<SessionHandle> promise;
+ stdx::mutex mutex;
GenericSocket socket;
+ ASIOReactorTimer timeoutTimer;
WrappedResolver resolver;
WrappedEndpoint resolvedEndpoint;
const HostAndPort peer;
@@ -569,22 +574,50 @@ Future<SessionHandle> TransportLayerASIO::asyncConnect(HostAndPort peer,
auto reactorImpl = checked_cast<ASIOReactor*>(reactor.get());
auto connector = std::make_shared<AsyncConnectState>(std::move(peer), *reactorImpl);
+ Future<SessionHandle> mergedFuture = connector->promise.getFuture();
if (connector->peer.host().empty()) {
return Status{ErrorCodes::HostNotFound, "Hostname or IP address to connect to is empty"};
}
- return connector->resolver.asyncResolve(connector->peer, _listenerOptions.enableIPv6)
+ if (timeout > Milliseconds{0} && timeout < Milliseconds::max()) {
+ connector->timeoutTimer.waitFor(timeout).getAsync([connector](Status status) {
+ if (status == ErrorCodes::CallbackCanceled || connector->done.swap(true)) {
+ return;
+ }
+
+ connector->promise.setError(
+ makeConnectError({ErrorCodes::NetworkTimeout, "Connecting timed out"},
+ connector->peer,
+ connector->resolvedEndpoint));
+
+ std::error_code ec;
+ stdx::lock_guard<stdx::mutex> lk(connector->mutex);
+ connector->resolver.cancel();
+ if (connector->session) {
+ connector->session->end();
+ } else {
+ connector->socket.cancel(ec);
+ }
+ });
+ }
+
+ connector->resolver.asyncResolve(connector->peer, _listenerOptions.enableIPv6)
.then([connector](WrappedResolver::EndpointVector results) {
+ stdx::unique_lock<stdx::mutex> lk(connector->mutex);
connector->resolvedEndpoint = results.front();
connector->socket.open(connector->resolvedEndpoint->protocol());
connector->socket.non_blocking(true);
+ lk.unlock();
+
return connector->socket.async_connect(*connector->resolvedEndpoint, UseFuture{});
})
- .then([this, connector, sslMode]() {
+ .then([this, connector, sslMode]() -> Future<void> {
+ stdx::unique_lock<stdx::mutex> lk(connector->mutex);
connector->session =
std::make_shared<ASIOSession>(this, std::move(connector->socket), false);
connector->session->ensureAsync();
+
#ifndef MONGO_CONFIG_SSL
if (sslMode == kEnableSSL) {
uasserted(ErrorCodes::InvalidSSLConfiguration, "SSL requested but not supported");
@@ -594,16 +627,35 @@ Future<SessionHandle> TransportLayerASIO::asyncConnect(HostAndPort peer,
if (sslMode == kEnableSSL ||
(sslMode == kGlobalSSLMode && ((globalSSLMode == SSLParams::SSLMode_preferSSL) ||
(globalSSLMode == SSLParams::SSLMode_requireSSL)))) {
- return connector->session->handshakeSSLForEgress(connector->peer).then([connector] {
- return connector->finish();
- });
+ return connector->session
+ ->handshakeSSLForEgressWithLock(std::move(lk), connector->peer)
+ .then([connector] { return Status::OK(); });
}
#endif
- return connector->finish();
+ return Status::OK();
})
- .onError([connector](Status status) -> Future<SessionHandle> {
+ .onError([connector](Status status) -> Future<void> {
return makeConnectError(status, connector->peer, connector->resolvedEndpoint);
+ })
+ .getAsync([connector](Status connectResult) {
+ if (MONGO_FAIL_POINT(transportLayerASIOasyncConnectTimesOut)) {
+ log() << "asyncConnectTimesOut fail point is active. simulating timeout.";
+ return;
+ }
+
+ if (connector->done.swap(true)) {
+ return;
+ }
+
+ connector->timeoutTimer.cancel();
+ if (connectResult.isOK()) {
+ connector->promise.emplaceValue(std::move(connector->session));
+ } else {
+ connector->promise.setError(connectResult);
+ }
});
+
+ return mergedFuture;
}
Status TransportLayerASIO::setup() {
diff --git a/src/mongo/transport/transport_layer_asio.h b/src/mongo/transport/transport_layer_asio.h
index b554a56b5ed..e3950295357 100644
--- a/src/mongo/transport/transport_layer_asio.h
+++ b/src/mongo/transport/transport_layer_asio.h
@@ -70,6 +70,10 @@ namespace transport {
// This fail point simulates reads and writes that always return 1 byte and fail with EAGAIN
MONGO_FAIL_POINT_DECLARE(transportLayerASIOshortOpportunisticReadWrite);
+// This fail point will cause an asyncConnect to timeout after it's successfully connected
+// to the remote peer
+MONGO_FAIL_POINT_DECLARE(transportLayerASIOasyncConnectTimesOut);
+
/**
* A TransportLayer implementation based on ASIO networking primitives.
*/
@@ -115,7 +119,8 @@ public:
Future<SessionHandle> asyncConnect(HostAndPort peer,
ConnectSSLMode sslMode,
- const ReactorHandle& reactor) final;
+ const ReactorHandle& reactor,
+ Milliseconds timeout) final;
Status setup() final;
diff --git a/src/mongo/transport/transport_layer_asio_integration_test.cpp b/src/mongo/transport/transport_layer_asio_integration_test.cpp
index cebbdb64650..0e595449d54 100644
--- a/src/mongo/transport/transport_layer_asio_integration_test.cpp
+++ b/src/mongo/transport/transport_layer_asio_integration_test.cpp
@@ -100,7 +100,7 @@ TEST(TransportLayerASIO, ShortReadsAndWritesWork) {
auto server = connectionString.getServers().front();
auto sc = getGlobalServiceContext();
- auto reactor = sc->getTransportLayer()->getReactor(transport::TransportLayer::kEgress);
+ auto reactor = sc->getTransportLayer()->getReactor(transport::TransportLayer::kNewReactor);
stdx::thread thread([&] { reactor->run(); });
const auto threadGuard = MakeGuard([&] {
@@ -109,7 +109,8 @@ TEST(TransportLayerASIO, ShortReadsAndWritesWork) {
});
AsyncDBClient::Handle handle =
- AsyncDBClient::connect(server, transport::kGlobalSSLMode, sc, reactor).get();
+ AsyncDBClient::connect(server, transport::kGlobalSSLMode, sc, reactor, Milliseconds::max())
+ .get();
handle->initWireVersion(__FILE__, nullptr).get();
@@ -135,5 +136,26 @@ TEST(TransportLayerASIO, ShortReadsAndWritesWork) {
}
}
+TEST(TransportLayerASIO, asyncConnectTimeoutCleansUpSocket) {
+ auto connectionString = unittest::getFixtureConnectionString();
+ auto server = connectionString.getServers().front();
+
+ auto sc = getGlobalServiceContext();
+ auto reactor = sc->getTransportLayer()->getReactor(transport::TransportLayer::kNewReactor);
+
+ stdx::thread thread([&] { reactor->run(); });
+
+ const auto threadGuard = MakeGuard([&] {
+ reactor->stop();
+ thread.join();
+ });
+
+ FailPointEnableBlock fp("transportLayerASIOasyncConnectTimesOut");
+ auto client =
+ AsyncDBClient::connect(server, transport::kGlobalSSLMode, sc, reactor, Milliseconds{500})
+ .getNoThrow();
+ ASSERT_EQ(client.getStatus(), ErrorCodes::NetworkTimeout);
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/transport/transport_layer_manager.cpp b/src/mongo/transport/transport_layer_manager.cpp
index f92538c773a..da3a0be9717 100644
--- a/src/mongo/transport/transport_layer_manager.cpp
+++ b/src/mongo/transport/transport_layer_manager.cpp
@@ -67,8 +67,9 @@ StatusWith<SessionHandle> TransportLayerManager::connect(HostAndPort peer,
Future<SessionHandle> TransportLayerManager::asyncConnect(HostAndPort peer,
ConnectSSLMode sslMode,
- const ReactorHandle& reactor) {
- return _tls.front()->asyncConnect(peer, sslMode, reactor);
+ const ReactorHandle& reactor,
+ Milliseconds timeout) {
+ return _tls.front()->asyncConnect(peer, sslMode, reactor, timeout);
}
ReactorHandle TransportLayerManager::getReactor(WhichReactor which) {
diff --git a/src/mongo/transport/transport_layer_manager.h b/src/mongo/transport/transport_layer_manager.h
index 69bfa6ff8c2..47927d53157 100644
--- a/src/mongo/transport/transport_layer_manager.h
+++ b/src/mongo/transport/transport_layer_manager.h
@@ -61,7 +61,8 @@ public:
Milliseconds timeout) override;
Future<SessionHandle> asyncConnect(HostAndPort peer,
ConnectSSLMode sslMode,
- const ReactorHandle& reactor) override;
+ const ReactorHandle& reactor,
+ Milliseconds timeout) override;
Status start() override;
void shutdown() override;
diff --git a/src/mongo/transport/transport_layer_mock.cpp b/src/mongo/transport/transport_layer_mock.cpp
index 50cf529d545..38c8f808d9b 100644
--- a/src/mongo/transport/transport_layer_mock.cpp
+++ b/src/mongo/transport/transport_layer_mock.cpp
@@ -69,7 +69,8 @@ StatusWith<SessionHandle> TransportLayerMock::connect(HostAndPort peer,
Future<SessionHandle> TransportLayerMock::asyncConnect(HostAndPort peer,
ConnectSSLMode sslMode,
- const ReactorHandle& reactor) {
+ const ReactorHandle& reactor,
+ Milliseconds timeout) {
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/transport/transport_layer_mock.h b/src/mongo/transport/transport_layer_mock.h
index 006269242eb..d0be4eece50 100644
--- a/src/mongo/transport/transport_layer_mock.h
+++ b/src/mongo/transport/transport_layer_mock.h
@@ -57,7 +57,8 @@ public:
Milliseconds timeout) override;
Future<SessionHandle> asyncConnect(HostAndPort peer,
ConnectSSLMode sslMode,
- const ReactorHandle& reactor) override;
+ const ReactorHandle& reactor,
+ Milliseconds timeout) override;
Status setup() override;
Status start() override;