summaryrefslogtreecommitdiff
path: root/src/mongo/tools
diff options
context:
space:
mode:
authorJonathan Reams <jbreams@mongodb.com>2018-02-20 14:33:42 -0500
committerJonathan Reams <jbreams@mongodb.com>2018-03-02 11:07:01 -0500
commitb2d8bd06318e1fddf4f1579084bbda4fb556c176 (patch)
treef591c41a0100dc85b51177396e80b946822aa712 /src/mongo/tools
parent975d539ae068bd27ebb478b6f3673b89d2ad6beb (diff)
downloadmongo-b2d8bd06318e1fddf4f1579084bbda4fb556c176.tar.gz
SERVER-33300 Integrate TransportLayer with DBClient
Diffstat (limited to 'src/mongo/tools')
-rw-r--r--src/mongo/tools/bridge.cpp76
1 files changed, 49 insertions, 27 deletions
diff --git a/src/mongo/tools/bridge.cpp b/src/mongo/tools/bridge.cpp
index 5f9eac1e4e0..375c88bbc1b 100644
--- a/src/mongo/tools/bridge.cpp
+++ b/src/mongo/tools/bridge.cpp
@@ -49,6 +49,7 @@
#include "mongo/stdx/thread.h"
#include "mongo/tools/bridge_commands.h"
#include "mongo/tools/mongobridge_options.h"
+#include "mongo/transport/transport_layer_asio.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/exit.h"
#include "mongo/util/log.h"
@@ -91,33 +92,33 @@ public:
: _mp(mp), _settingsMutex(settingsMutex), _settings(settings), _prng(seed) {}
void operator()() {
- DBClientConnection dest;
-
- {
+ transport::SessionHandle dest = []() -> transport::SessionHandle {
HostAndPort destAddr{mongoBridgeGlobalParams.destUri};
const Seconds kConnectTimeout(30);
- Timer connectTimer;
- while (true) {
- // DBClientConnection::connectSocketOnly() is used instead of
- // DBClientConnection::connect() to avoid sending an isMaster command when the
- // connection is established. We'd otherwise trigger a socket timeout when
- // forwarding an _isSelf command because dest's replication subsystem hasn't been
- // initialized yet and so it cannot respond to the isMaster command.
- auto status = dest.connectSocketOnly(destAddr);
- if (status.isOK()) {
- break;
- }
- Seconds elapsed{connectTimer.seconds()};
- if (elapsed >= kConnectTimeout) {
- warning() << "Unable to establish connection to "
- << mongoBridgeGlobalParams.destUri << " after " << elapsed
- << " seconds: " << status;
- log() << "end connection " << _mp->remote().toString();
- _mp->shutdown();
- return;
+ auto now = getGlobalServiceContext()->getFastClockSource()->now();
+ const auto connectExpiration = now + kConnectTimeout;
+ while (now < connectExpiration) {
+ auto tl = getGlobalServiceContext()->getTransportLayer();
+ auto sws =
+ tl->connect(destAddr, transport::kGlobalSSLMode, connectExpiration - now);
+ auto status = sws.getStatus();
+ if (!status.isOK()) {
+ warning() << "Unable to establish connection to " << destAddr << ": " << status;
+ now = getGlobalServiceContext()->getFastClockSource()->now();
+ } else {
+ return std::move(sws.getValue());
}
+
sleepmillis(500);
}
+
+ return nullptr;
+ }();
+
+ if (!dest) {
+ log() << "end connection " << _mp->remote();
+ _mp->shutdown();
+ return;
}
bool receivingFirstMessage = true;
@@ -231,8 +232,11 @@ public:
request.operation() == dbCommand || request.operation() == dbMsg)) {
// TODO dbMsg moreToCome
// Forward the message to 'dest' and receive its reply in 'response'.
- response.reset();
- dest.port().call(request, response);
+ uassertStatusOK(dest->sinkMessage(request));
+ response = uassertStatusOK(dest->sourceMessage());
+ uassert(50727,
+ "Response ID did not match the sent message ID.",
+ response.header().getResponseToMsgId() == request.header().getId());
// If there's nothing to respond back to '_mp' with, then close the connection.
if (response.empty()) {
@@ -280,15 +284,14 @@ public:
MsgData::View header = response.header();
QueryResult::View qr = header.view2ptr();
if (qr.getCursorId()) {
- response.reset();
- dest.port().recv(response);
+ response = uassertStatusOK(dest->sourceMessage());
_mp->say(response);
} else {
exhaust = false;
}
}
} else {
- dest.port().say(request);
+ uassertStatusOK(dest->sinkMessage(request));
}
} catch (const DBException& ex) {
error() << "Caught DBException in Forwarder: " << ex << ", end connection "
@@ -409,6 +412,25 @@ int bridgeMain(int argc, char** argv, char** envp) {
runGlobalInitializersOrDie(argc, argv, envp);
startSignalProcessingThread(LogFileStatus::kNoLogFileToRotate);
+ auto serviceContext = getGlobalServiceContext();
+ transport::TransportLayerASIO::Options opts;
+ opts.mode = mongo::transport::TransportLayerASIO::Options::kEgress;
+
+ serviceContext->setTransportLayer(
+ std::make_unique<mongo::transport::TransportLayerASIO>(opts, nullptr));
+ auto tl = serviceContext->getTransportLayer();
+ if (!tl->setup().isOK()) {
+ log() << "Error setting up transport layer";
+ return EXIT_NET_ERROR;
+ }
+
+ if (!tl->start().isOK()) {
+ log() << "Error starting transport layer";
+ return EXIT_NET_ERROR;
+ }
+
+ serviceContext->notifyStartupComplete();
+
listener = stdx::make_unique<BridgeListener>();
listener->setupSockets();
listener->initAndListen();