diff options
author | Jonathan Reams <jbreams@mongodb.com> | 2018-02-20 14:33:42 -0500 |
---|---|---|
committer | Jonathan Reams <jbreams@mongodb.com> | 2018-03-02 11:07:01 -0500 |
commit | b2d8bd06318e1fddf4f1579084bbda4fb556c176 (patch) | |
tree | f591c41a0100dc85b51177396e80b946822aa712 /src/mongo/tools | |
parent | 975d539ae068bd27ebb478b6f3673b89d2ad6beb (diff) | |
download | mongo-b2d8bd06318e1fddf4f1579084bbda4fb556c176.tar.gz |
SERVER-33300 Integrate TransportLayer with DBClient
Diffstat (limited to 'src/mongo/tools')
-rw-r--r-- | src/mongo/tools/bridge.cpp | 76 |
1 files changed, 49 insertions, 27 deletions
diff --git a/src/mongo/tools/bridge.cpp b/src/mongo/tools/bridge.cpp index 5f9eac1e4e0..375c88bbc1b 100644 --- a/src/mongo/tools/bridge.cpp +++ b/src/mongo/tools/bridge.cpp @@ -49,6 +49,7 @@ #include "mongo/stdx/thread.h" #include "mongo/tools/bridge_commands.h" #include "mongo/tools/mongobridge_options.h" +#include "mongo/transport/transport_layer_asio.h" #include "mongo/util/assert_util.h" #include "mongo/util/exit.h" #include "mongo/util/log.h" @@ -91,33 +92,33 @@ public: : _mp(mp), _settingsMutex(settingsMutex), _settings(settings), _prng(seed) {} void operator()() { - DBClientConnection dest; - - { + transport::SessionHandle dest = []() -> transport::SessionHandle { HostAndPort destAddr{mongoBridgeGlobalParams.destUri}; const Seconds kConnectTimeout(30); - Timer connectTimer; - while (true) { - // DBClientConnection::connectSocketOnly() is used instead of - // DBClientConnection::connect() to avoid sending an isMaster command when the - // connection is established. We'd otherwise trigger a socket timeout when - // forwarding an _isSelf command because dest's replication subsystem hasn't been - // initialized yet and so it cannot respond to the isMaster command. - auto status = dest.connectSocketOnly(destAddr); - if (status.isOK()) { - break; - } - Seconds elapsed{connectTimer.seconds()}; - if (elapsed >= kConnectTimeout) { - warning() << "Unable to establish connection to " - << mongoBridgeGlobalParams.destUri << " after " << elapsed - << " seconds: " << status; - log() << "end connection " << _mp->remote().toString(); - _mp->shutdown(); - return; + auto now = getGlobalServiceContext()->getFastClockSource()->now(); + const auto connectExpiration = now + kConnectTimeout; + while (now < connectExpiration) { + auto tl = getGlobalServiceContext()->getTransportLayer(); + auto sws = + tl->connect(destAddr, transport::kGlobalSSLMode, connectExpiration - now); + auto status = sws.getStatus(); + if (!status.isOK()) { + warning() << "Unable to establish connection to " << destAddr << ": " << status; + now = getGlobalServiceContext()->getFastClockSource()->now(); + } else { + return std::move(sws.getValue()); } + sleepmillis(500); } + + return nullptr; + }(); + + if (!dest) { + log() << "end connection " << _mp->remote(); + _mp->shutdown(); + return; } bool receivingFirstMessage = true; @@ -231,8 +232,11 @@ public: request.operation() == dbCommand || request.operation() == dbMsg)) { // TODO dbMsg moreToCome // Forward the message to 'dest' and receive its reply in 'response'. - response.reset(); - dest.port().call(request, response); + uassertStatusOK(dest->sinkMessage(request)); + response = uassertStatusOK(dest->sourceMessage()); + uassert(50727, + "Response ID did not match the sent message ID.", + response.header().getResponseToMsgId() == request.header().getId()); // If there's nothing to respond back to '_mp' with, then close the connection. if (response.empty()) { @@ -280,15 +284,14 @@ public: MsgData::View header = response.header(); QueryResult::View qr = header.view2ptr(); if (qr.getCursorId()) { - response.reset(); - dest.port().recv(response); + response = uassertStatusOK(dest->sourceMessage()); _mp->say(response); } else { exhaust = false; } } } else { - dest.port().say(request); + uassertStatusOK(dest->sinkMessage(request)); } } catch (const DBException& ex) { error() << "Caught DBException in Forwarder: " << ex << ", end connection " @@ -409,6 +412,25 @@ int bridgeMain(int argc, char** argv, char** envp) { runGlobalInitializersOrDie(argc, argv, envp); startSignalProcessingThread(LogFileStatus::kNoLogFileToRotate); + auto serviceContext = getGlobalServiceContext(); + transport::TransportLayerASIO::Options opts; + opts.mode = mongo::transport::TransportLayerASIO::Options::kEgress; + + serviceContext->setTransportLayer( + std::make_unique<mongo::transport::TransportLayerASIO>(opts, nullptr)); + auto tl = serviceContext->getTransportLayer(); + if (!tl->setup().isOK()) { + log() << "Error setting up transport layer"; + return EXIT_NET_ERROR; + } + + if (!tl->start().isOK()) { + log() << "Error starting transport layer"; + return EXIT_NET_ERROR; + } + + serviceContext->notifyStartupComplete(); + listener = stdx::make_unique<BridgeListener>(); listener->setupSockets(); listener->initAndListen(); |