summaryrefslogtreecommitdiff
path: root/src/mongo/tools
diff options
context:
space:
mode:
authorJonathan Reams <jbreams@mongodb.com>2018-06-11 16:17:35 -0400
committerJonathan Reams <jbreams@mongodb.com>2018-06-19 10:16:14 -0400
commitc11dd5e6ff3f70200d1cfd6a5e3dc6966beac3b2 (patch)
treed402b02e98995eae5bb7fd1583e09d30ab702cfe /src/mongo/tools
parent26ac71b68b48e01ee07701a4401dabd003ab4896 (diff)
downloadmongo-c11dd5e6ff3f70200d1cfd6a5e3dc6966beac3b2.tar.gz
SERVER-35550 Connect back to server from bridge in client thread
Diffstat (limited to 'src/mongo/tools')
-rw-r--r--src/mongo/tools/bridge.cpp73
1 files changed, 38 insertions, 35 deletions
diff --git a/src/mongo/tools/bridge.cpp b/src/mongo/tools/bridge.cpp
index a2893fbc172..cc249bdeed9 100644
--- a/src/mongo/tools/bridge.cpp
+++ b/src/mongo/tools/bridge.cpp
@@ -158,6 +158,14 @@ public:
return _dest.get();
}
+ transport::SessionHandle& getSession() {
+ return _dest;
+ }
+
+ void setSession(transport::SessionHandle session) {
+ _dest = std::move(session);
+ }
+
const boost::optional<HostAndPort>& host() const {
return _host;
}
@@ -224,49 +232,44 @@ class ServiceEntryPointBridge final : public ServiceEntryPointImpl {
public:
explicit ServiceEntryPointBridge(ServiceContext* svcCtx) : ServiceEntryPointImpl(svcCtx) {}
- void startSession(transport::SessionHandle session) final;
DbResponse handleRequest(OperationContext* opCtx, const Message& request) final;
};
-void ServiceEntryPointBridge::startSession(transport::SessionHandle session) {
- transport::SessionHandle dest = []() -> transport::SessionHandle {
- HostAndPort destAddr{mongoBridgeGlobalParams.destUri};
- const Seconds kConnectTimeout(30);
- auto now = getGlobalServiceContext()->getFastClockSource()->now();
- const auto connectExpiration = now + kConnectTimeout;
- while (now < connectExpiration) {
- auto tl = getGlobalServiceContext()->getTransportLayer();
- auto sws = tl->connect(destAddr, transport::kGlobalSSLMode, connectExpiration - now);
- auto status = sws.getStatus();
- if (!status.isOK()) {
- warning() << "Unable to establish connection to " << destAddr << ": " << status;
- now = getGlobalServiceContext()->getFastClockSource()->now();
- } else {
- return std::move(sws.getValue());
- }
-
- sleepmillis(500);
- }
-
- return nullptr;
- }();
-
- if (!dest) {
- log() << "end connection " << session->remote();
- return;
- }
-
- auto& proxiedConn = ProxiedConnection::get(session);
- proxiedConn._dest = std::move(dest);
-
- ServiceEntryPointImpl::startSession(std::move(session));
-}
-
DbResponse ServiceEntryPointBridge::handleRequest(OperationContext* opCtx, const Message& request) {
const auto& source = opCtx->getClient()->session();
auto& dest = ProxiedConnection::get(source);
auto brCtx = BridgeContext::get();
+ if (!dest.getSession()) {
+ dest.setSession([]() -> transport::SessionHandle {
+ HostAndPort destAddr{mongoBridgeGlobalParams.destUri};
+ const Seconds kConnectTimeout(30);
+ auto now = getGlobalServiceContext()->getFastClockSource()->now();
+ const auto connectExpiration = now + kConnectTimeout;
+ while (now < connectExpiration) {
+ auto tl = getGlobalServiceContext()->getTransportLayer();
+ auto sws =
+ tl->connect(destAddr, transport::kGlobalSSLMode, connectExpiration - now);
+ auto status = sws.getStatus();
+ if (!status.isOK()) {
+ warning() << "Unable to establish connection to " << destAddr << ": " << status;
+ now = getGlobalServiceContext()->getFastClockSource()->now();
+ } else {
+ return std::move(sws.getValue());
+ }
+
+ sleepmillis(500);
+ }
+
+ return nullptr;
+ }());
+
+ if (!dest.getSession()) {
+ source->end();
+ uasserted(50854, str::stream() << "Unable to connect to " << source->remote());
+ }
+ }
+
if (dest.inExhaust()) {
DbMessage dbm(request);