summaryrefslogtreecommitdiff
path: root/src/mongo/tools/bridge.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/tools/bridge.cpp')
-rw-r--r--src/mongo/tools/bridge.cpp554
1 files changed, 280 insertions, 274 deletions
diff --git a/src/mongo/tools/bridge.cpp b/src/mongo/tools/bridge.cpp
index 5e5fcbf7417..7d4a1f10e8c 100644
--- a/src/mongo/tools/bridge.cpp
+++ b/src/mongo/tools/bridge.cpp
@@ -35,8 +35,8 @@
#include "mongo/base/init.h"
#include "mongo/base/initializer.h"
-#include "mongo/client/dbclientinterface.h"
#include "mongo/db/dbmessage.h"
+#include "mongo/db/operation_context.h"
#include "mongo/db/service_context.h"
#include "mongo/db/service_context_noop.h"
#include "mongo/db/service_context_registrar.h"
@@ -50,13 +50,14 @@
#include "mongo/stdx/thread.h"
#include "mongo/tools/bridge_commands.h"
#include "mongo/tools/mongobridge_options.h"
+#include "mongo/transport/message_compressor_manager.h"
+#include "mongo/transport/service_entry_point_impl.h"
+#include "mongo/transport/service_executor_synchronous.h"
#include "mongo/transport/transport_layer_asio.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/exit.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
-#include "mongo/util/net/abstract_message_port.h"
-#include "mongo/util/net/listen.h"
#include "mongo/util/net/message.h"
#include "mongo/util/quick_exit.h"
#include "mongo/util/signal_handlers.h"
@@ -84,237 +85,24 @@ boost::optional<HostAndPort> extractHostInfo(const OpMsgRequest& request) {
return boost::none;
}
-class Forwarder {
-public:
- Forwarder(AbstractMessagingPort* mp,
- stdx::mutex* settingsMutex,
- HostSettingsMap* settings,
- int64_t seed)
- : _mp(mp), _settingsMutex(settingsMutex), _settings(settings), _prng(seed) {}
-
- void operator()() {
- transport::SessionHandle dest = []() -> transport::SessionHandle {
- HostAndPort destAddr{mongoBridgeGlobalParams.destUri};
- const Seconds kConnectTimeout(30);
- auto now = getGlobalServiceContext()->getFastClockSource()->now();
- const auto connectExpiration = now + kConnectTimeout;
- while (now < connectExpiration) {
- auto tl = getGlobalServiceContext()->getTransportLayer();
- auto sws =
- tl->connect(destAddr, transport::kGlobalSSLMode, connectExpiration - now);
- auto status = sws.getStatus();
- if (!status.isOK()) {
- warning() << "Unable to establish connection to " << destAddr << ": " << status;
- now = getGlobalServiceContext()->getFastClockSource()->now();
- } else {
- return std::move(sws.getValue());
- }
-
- sleepmillis(500);
- }
-
- return nullptr;
- }();
-
- if (!dest) {
- log() << "end connection " << _mp->remote();
- _mp->shutdown();
- return;
- }
-
- bool receivingFirstMessage = true;
- boost::optional<HostAndPort> host;
-
- Message request;
- Message response;
- MessageCompressorManager compressorManager;
-
- while (true) {
- try {
- request.reset();
- if (!_mp->recv(request)) {
- log() << "end connection " << _mp->remote().toString();
- _mp->shutdown();
- break;
- }
-
- uassert(ErrorCodes::IllegalOperation,
- str::stream() << "Unsupported network op " << request.operation(),
- isSupportedRequestNetworkOp(request.operation()));
-
- if (request.operation() == dbCompressed) {
- auto swm = compressorManager.decompressMessage(request);
- if (!swm.isOK()) {
- error() << "Error decompressing message: " << swm.getStatus();
- _mp->shutdown();
- return;
- }
- request = std::move(swm.getValue());
- }
-
- const bool isFireAndForgetCommand = OpMsg::isFlagSet(request, OpMsg::kMoreToCome);
-
- boost::optional<OpMsgRequest> cmdRequest;
- if ((request.operation() == dbQuery &&
- NamespaceString(DbMessage(request).getns()).isCommand()) ||
- request.operation() == dbCommand || request.operation() == dbMsg) {
- cmdRequest = rpc::opMsgRequestFromAnyProtocol(request);
- if (receivingFirstMessage) {
- host = extractHostInfo(*cmdRequest);
- }
-
- std::string hostName = host ? (host->toString()) : "<unknown>";
- LOG(1) << "Received \"" << cmdRequest->getCommandName()
- << "\" command with arguments " << cmdRequest->body << " from "
- << hostName;
- }
- receivingFirstMessage = false;
-
- // Handle a message intended to configure the mongobridge and return a response.
- // The 'request' is consumed by the mongobridge and does not get forwarded to
- // 'dest'.
- if (auto status = maybeProcessBridgeCommand(cmdRequest)) {
- invariant(!isFireAndForgetCommand);
-
- auto replyBuilder = rpc::makeReplyBuilder(rpc::protocolForMessage(request));
- BSONObj metadata;
- BSONObj reply;
- StatusWith<BSONObj> commandReply(reply);
- if (!status->isOK()) {
- commandReply = StatusWith<BSONObj>(*status);
- }
- auto cmdResponse = replyBuilder->setCommandReply(std::move(commandReply))
- .setMetadata(metadata)
- .done();
- cmdResponse.header().setId(nextMessageId());
- cmdResponse.header().setResponseToMsgId(request.header().getId());
- _mp->say(cmdResponse);
- continue;
- }
-
- // Get the message handling settings for 'host' if the source of _mp's connection is
- // known. By default, messages are forwarded to 'dest' without any additional delay.
- HostSettings hostSettings = getHostSettings(host);
-
- switch (hostSettings.state) {
- // Forward the message to 'dest' after waiting for 'hostSettings.delay'
- // milliseconds.
- case HostSettings::State::kForward:
- sleepmillis(durationCount<Milliseconds>(hostSettings.delay));
- break;
- // Close the connection to 'dest'.
- case HostSettings::State::kHangUp:
- log() << "Rejecting connection from " << host->toString()
- << ", end connection " << _mp->remote().toString();
- _mp->shutdown();
- return;
- // Forward the message to 'dest' with probability '1 - hostSettings.loss'.
- case HostSettings::State::kDiscard:
- if (_prng.nextCanonicalDouble() < hostSettings.loss) {
- std::string hostName = host ? (host->toString()) : "<unknown>";
- if (cmdRequest) {
- log() << "Discarding \"" << cmdRequest->getCommandName()
- << "\" command with arguments " << cmdRequest->body
- << " from " << hostName;
- } else {
- log() << "Discarding " << networkOpToString(request.operation())
- << " from " << hostName;
- }
- continue;
- }
- break;
- }
+ServiceContextRegistrar serviceContextCreator([]() {
+ return std::make_unique<ServiceContextNoop>();
+});
- // Send the message we received from '_mp' to 'dest'. 'dest' returns a response for
- // OP_QUERY, OP_GET_MORE, and OP_COMMAND messages that we respond back to
- // '_mp' with.
- if (!isFireAndForgetCommand &&
- (request.operation() == dbQuery || request.operation() == dbGetMore ||
- request.operation() == dbCommand || request.operation() == dbMsg)) {
- // TODO dbMsg moreToCome
- // Forward the message to 'dest' and receive its reply in 'response'.
- uassertStatusOK(dest->sinkMessage(request));
- response = uassertStatusOK(dest->sourceMessage());
- uassert(50727,
- "Response ID did not match the sent message ID.",
- response.header().getResponseToMsgId() == request.header().getId());
-
- // If there's nothing to respond back to '_mp' with, then close the connection.
- if (response.empty()) {
- log() << "Received an empty response, end connection "
- << _mp->remote().toString();
- _mp->shutdown();
- break;
- }
-
- // Reload the message handling settings for 'host' in case they were changed
- // while waiting for a response from 'dest'.
- hostSettings = getHostSettings(host);
-
- // It's possible that sending 'request' blocked until 'dest' had something to
- // reply with. If the message handling settings were since changed to close
- // connections from 'host', then do so now.
- if (hostSettings.state == HostSettings::State::kHangUp) {
- log() << "Closing connection from " << host->toString()
- << ", end connection " << _mp->remote().toString();
- _mp->shutdown();
- break;
- }
-
- _mp->say(response);
-
- // If 'exhaust' is true, then instead of trying to receive another message from
- // '_mp', receive messages from 'dest' until it returns a cursor id of zero.
- bool exhaust = false;
- if (request.operation() == dbQuery) {
- DbMessage d(request);
- QueryMessage q(d);
- exhaust = q.queryOptions & QueryOption_Exhaust;
- }
- while (exhaust) {
- if (response.operation() == dbCompressed) {
- auto swm = compressorManager.decompressMessage(response);
- if (!swm.isOK()) {
- error() << "Error decompressing message: " << swm.getStatus();
- _mp->shutdown();
- return;
- }
- response = std::move(swm.getValue());
- }
-
- MsgData::View header = response.header();
- QueryResult::View qr = header.view2ptr();
- if (qr.getCursorId()) {
- response = uassertStatusOK(dest->sourceMessage());
- _mp->say(response);
- } else {
- exhaust = false;
- }
- }
- } else {
- uassertStatusOK(dest->sinkMessage(request));
- }
- } catch (const DBException& ex) {
- error() << "Caught DBException in Forwarder: " << ex << ", end connection "
- << _mp->remote().toString();
- _mp->shutdown();
- break;
- } catch (...) {
- severe() << exceptionToStatus() << ", terminating";
- quickExit(EXIT_UNCAUGHT);
- }
- }
- }
+} // namespace
-private:
+class BridgeContext {
+public:
Status runBridgeCommand(StringData cmdName, BSONObj cmdObj) {
auto status = BridgeCommand::findCommand(cmdName);
if (!status.isOK()) {
return status.getStatus();
}
+ log() << "Processing bridge command: " << cmdName;
+
BridgeCommand* command = status.getValue();
- return command->run(cmdObj, _settingsMutex, _settings);
+ return command->run(cmdObj, &_settingsMutex, &_settings);
}
boost::optional<Status> maybeProcessBridgeCommand(boost::optional<OpMsgRequest> cmdRequest) {
@@ -334,69 +122,277 @@ private:
HostSettings getHostSettings(boost::optional<HostAndPort> host) {
if (host) {
- stdx::lock_guard<stdx::mutex> lk(*_settingsMutex);
- return (*_settings)[*host];
+ stdx::lock_guard<stdx::mutex> lk(_settingsMutex);
+ return (_settings)[*host];
}
return {};
}
- AbstractMessagingPort* _mp;
+ PseudoRandom makeSeededPRNG() {
+ static PseudoRandom globalPRNG(mongoBridgeGlobalParams.seed);
+ return PseudoRandom(globalPRNG.nextInt64());
+ }
- stdx::mutex* _settingsMutex;
- HostSettingsMap* _settings;
+ static BridgeContext* get();
- PseudoRandom _prng;
+private:
+ static const ServiceContext::Decoration<BridgeContext> _get;
+
+ stdx::mutex _settingsMutex;
+ HostSettingsMap _settings;
};
-class BridgeListener final : public Listener {
+const ServiceContextNoop::Decoration<BridgeContext> BridgeContext::_get =
+ ServiceContext::declareDecoration<BridgeContext>();
+
+BridgeContext* BridgeContext::get() {
+ return &_get(getGlobalServiceContext());
+}
+
+class ServiceEntryPointBridge;
+class ProxiedConnection {
public:
- BridgeListener()
- : Listener(
- "bridge", "0.0.0.0", mongoBridgeGlobalParams.port, getGlobalServiceContext(), false),
- _seedSource(mongoBridgeGlobalParams.seed) {
- log() << "Setting random seed: " << mongoBridgeGlobalParams.seed;
+ ProxiedConnection() : _dest(nullptr), _prng(BridgeContext::get()->makeSeededPRNG()) {}
+
+ transport::Session* operator->() {
+ return _dest.get();
}
- void accepted(std::unique_ptr<AbstractMessagingPort> mp) override final {
- {
- stdx::lock_guard<stdx::mutex> lk(_portsMutex);
- if (_inShutdown.load()) {
- mp->shutdown();
- return;
- }
- _ports.insert(mp.get());
+ const boost::optional<HostAndPort>& host() const {
+ return _host;
+ }
+
+ std::string toString() const {
+ if (_host) {
+ return _host->toString();
}
+ return "<unknown>";
+ }
- Forwarder f(mp.release(), &_settingsMutex, &_settings, _seedSource.nextInt64());
- stdx::thread t(f);
- t.detach();
+ void setExhaust(bool val) {
+ _inExhaust = val;
}
- void shutdownAll() {
- stdx::lock_guard<stdx::mutex> lk(_portsMutex);
- for (auto mp : _ports) {
- mp->shutdown();
+ bool inExhaust() const {
+ return _inExhaust;
+ }
+
+ void extractHostInfo(OpMsgRequest request) {
+ if (_seenFirstMessage)
+ return;
+ _seenFirstMessage = true;
+
+ // The initial isMaster request made by mongod and mongos processes should contain a
+ // hostInfo field that identifies the process by its host:port.
+ StringData cmdName = request.getCommandName();
+ if (cmdName != "isMaster" && cmdName != "ismaster") {
+ return;
}
+
+ if (auto hostInfoElem = request.body["hostInfo"]) {
+ if (hostInfoElem.type() == String) {
+ _host = HostAndPort{hostInfoElem.valueStringData()};
+ }
+ }
+ }
+
+ double nextCanonicalDouble() {
+ return _prng.nextCanonicalDouble();
}
+ static ProxiedConnection& get(const transport::SessionHandle& session);
+
private:
- stdx::mutex _portsMutex;
- std::set<AbstractMessagingPort*> _ports;
- AtomicWord<bool> _inShutdown{false};
+ friend class ServiceEntryPointBridge;
- stdx::mutex _settingsMutex;
- HostSettingsMap _settings;
+ static const transport::Session::Decoration<ProxiedConnection> _get;
+ transport::SessionHandle _dest;
+ PseudoRandom _prng;
+ boost::optional<HostAndPort> _host;
+ bool _seenFirstMessage = false;
+ bool _inExhaust = false;
+};
+
+const transport::Session::Decoration<ProxiedConnection> ProxiedConnection::_get =
+ transport::Session::declareDecoration<ProxiedConnection>();
+
+ProxiedConnection& ProxiedConnection::get(const transport::SessionHandle& session) {
+ return _get(*session);
+}
+
+class ServiceEntryPointBridge final : public ServiceEntryPointImpl {
+public:
+ explicit ServiceEntryPointBridge(ServiceContext* svcCtx) : ServiceEntryPointImpl(svcCtx) {}
- PseudoRandom _seedSource;
+ void startSession(transport::SessionHandle session) final;
+ DbResponse handleRequest(OperationContext* opCtx, const Message& request) final;
};
-std::unique_ptr<mongo::BridgeListener> listener;
+void ServiceEntryPointBridge::startSession(transport::SessionHandle session) {
+ transport::SessionHandle dest = []() -> transport::SessionHandle {
+ HostAndPort destAddr{mongoBridgeGlobalParams.destUri};
+ const Seconds kConnectTimeout(30);
+ auto now = getGlobalServiceContext()->getFastClockSource()->now();
+ const auto connectExpiration = now + kConnectTimeout;
+ while (now < connectExpiration) {
+ auto tl = getGlobalServiceContext()->getTransportLayer();
+ auto sws = tl->connect(destAddr, transport::kGlobalSSLMode, connectExpiration - now);
+ auto status = sws.getStatus();
+ if (!status.isOK()) {
+ warning() << "Unable to establish connection to " << destAddr << ": " << status;
+ now = getGlobalServiceContext()->getFastClockSource()->now();
+ } else {
+ return std::move(sws.getValue());
+ }
-ServiceContextRegistrar serviceContextCreator([]() {
- return std::make_unique<ServiceContextNoop>();
-});
+ sleepmillis(500);
+ }
-} // namespace
+ return nullptr;
+ }();
+
+ if (!dest) {
+ log() << "end connection " << session->remote();
+ return;
+ }
+
+ auto& proxiedConn = ProxiedConnection::get(session);
+ proxiedConn._dest = std::move(dest);
+
+ ServiceEntryPointImpl::startSession(std::move(session));
+}
+
+DbResponse ServiceEntryPointBridge::handleRequest(OperationContext* opCtx, const Message& request) {
+ const auto& source = opCtx->getClient()->session();
+ auto& dest = ProxiedConnection::get(source);
+ auto brCtx = BridgeContext::get();
+
+ if (dest.inExhaust()) {
+ DbMessage dbm(request);
+
+ auto response = uassertStatusOK(dest->sourceMessage());
+ if (response.operation() == dbCompressed) {
+ MessageCompressorManager compressorMgr;
+ response = uassertStatusOK(compressorMgr.decompressMessage(response));
+ }
+
+ MsgData::View header = response.header();
+ QueryResult::View qr = header.view2ptr();
+ if (qr.getCursorId()) {
+ return {std::move(response)};
+ } else {
+ dest.setExhaust(false);
+ return {Message(), dbm.getns()};
+ }
+ }
+
+ const bool isFireAndForgetCommand = OpMsg::isFlagSet(request, OpMsg::kMoreToCome);
+
+ boost::optional<OpMsgRequest> cmdRequest;
+ if ((request.operation() == dbQuery &&
+ NamespaceString(DbMessage(request).getns()).isCommand()) ||
+ request.operation() == dbCommand || request.operation() == dbMsg) {
+ cmdRequest = rpc::opMsgRequestFromAnyProtocol(request);
+
+ dest.extractHostInfo(*cmdRequest);
+
+ LOG(1) << "Received \"" << cmdRequest->getCommandName() << "\" command with arguments "
+ << cmdRequest->body << " from " << dest;
+ }
+
+ // Handle a message intended to configure the mongobridge and return a response.
+ // The 'request' is consumed by the mongobridge and does not get forwarded to
+ // 'dest'.
+ if (auto status = brCtx->maybeProcessBridgeCommand(cmdRequest)) {
+ invariant(!isFireAndForgetCommand);
+
+ auto replyBuilder = rpc::makeReplyBuilder(rpc::protocolForMessage(request));
+ BSONObj metadata;
+ BSONObj reply;
+ StatusWith<BSONObj> commandReply(reply);
+ if (!status->isOK()) {
+ commandReply = StatusWith<BSONObj>(*status);
+ }
+ return {
+ replyBuilder->setCommandReply(std::move(commandReply)).setMetadata(metadata).done()};
+ }
+
+
+ // Get the message handling settings for 'host' if the source of the connection is
+ // known. By default, messages are forwarded to 'dest' without any additional delay.
+ HostSettings hostSettings = brCtx->getHostSettings(dest.host());
+
+ switch (hostSettings.state) {
+ // Close the connection to 'dest'.
+ case HostSettings::State::kHangUp:
+ log() << "Rejecting connection from " << dest << ", end connection "
+ << source->remote().toString();
+ source->end();
+ return {Message()};
+ // Forward the message to 'dest' with probability '1 - hostSettings.loss'.
+ case HostSettings::State::kDiscard:
+ if (dest.nextCanonicalDouble() < hostSettings.loss) {
+ std::string hostName = dest.toString();
+ if (cmdRequest) {
+ log() << "Discarding \"" << cmdRequest->getCommandName()
+ << "\" command with arguments " << cmdRequest->body << " from "
+ << hostName;
+ } else {
+ log() << "Discarding " << networkOpToString(request.operation()) << " from "
+ << hostName;
+ }
+ return {Message()};
+ }
+ // Forward the message to 'dest' after waiting for 'hostSettings.delay'
+ // milliseconds.
+ case HostSettings::State::kForward:
+ sleepmillis(durationCount<Milliseconds>(hostSettings.delay));
+ break;
+ }
+
+ uassertStatusOK(dest->sinkMessage(request));
+
+ // Send the message we received from 'source' to 'dest'. 'dest' returns a response for
+ // OP_QUERY, OP_GET_MORE, and OP_COMMAND messages that we respond with.
+ if (!isFireAndForgetCommand &&
+ (request.operation() == dbQuery || request.operation() == dbGetMore ||
+ request.operation() == dbCommand || request.operation() == dbMsg)) {
+ // TODO dbMsg moreToCome
+ // Forward the message to 'dest' and receive its reply in 'response'.
+ auto response = uassertStatusOK(dest->sourceMessage());
+ uassert(50765,
+ "Response ID did not match the sent message ID.",
+ response.header().getResponseToMsgId() == request.header().getId());
+
+ // Reload the message handling settings for 'host' in case they were changed
+ // while waiting for a response from 'dest'.
+ hostSettings = brCtx->getHostSettings(dest.host());
+
+ // It's possible that sending 'request' blocked until 'dest' had something to
+ // reply with. If the message handling settings were since changed to close
+ // connections from 'host', then do so now.
+ if (hostSettings.state == HostSettings::State::kHangUp) {
+ log() << "Closing connection from " << dest << ", end connection " << source->remote();
+ source->end();
+ return {Message()};
+ }
+
+ std::string exhaustNS;
+ if (request.operation() == dbQuery) {
+ DbMessage d(request);
+ QueryMessage q(d);
+ dest.setExhaust(q.queryOptions & QueryOption_Exhaust);
+ if (dest.inExhaust()) {
+ exhaustNS = d.getns();
+ }
+ } else {
+ dest.setExhaust(false);
+ }
+ return {std::move(response), exhaustNS};
+ } else {
+ return {Message()};
+ }
+}
int bridgeMain(int argc, char** argv, char** envp) {
@@ -404,8 +400,16 @@ int bridgeMain(int argc, char** argv, char** envp) {
// NOTE: This function may be called at any time. It must not
// depend on the prior execution of mongo initializers or the
// existence of threads.
- ListeningSockets::get()->closeAll();
- listener->shutdownAll();
+ if (hasGlobalServiceContext()) {
+ auto sc = getGlobalServiceContext();
+ if (sc->getTransportLayer())
+ sc->getTransportLayer()->shutdown();
+
+ if (sc->getServiceEntryPoint()) {
+ sc->getServiceEntryPoint()->endAllSessions(transport::Session::kEmptyTagMask);
+ sc->getServiceEntryPoint()->shutdown(Seconds{10});
+ }
+ }
});
setupSignalHandlers();
@@ -414,11 +418,18 @@ int bridgeMain(int argc, char** argv, char** envp) {
startSignalProcessingThread(LogFileStatus::kNoLogFileToRotate);
auto serviceContext = getGlobalServiceContext();
+ serviceContext->setServiceEntryPoint(std::make_unique<ServiceEntryPointBridge>(serviceContext));
+ serviceContext->setServiceExecutor(
+ std::make_unique<transport::ServiceExecutorSynchronous>(serviceContext));
+
+ fassert(50766, serviceContext->getServiceExecutor()->start());
+
transport::TransportLayerASIO::Options opts;
- opts.mode = mongo::transport::TransportLayerASIO::Options::kEgress;
+ opts.ipList = "0.0.0.0";
+ opts.port = mongoBridgeGlobalParams.port;
- serviceContext->setTransportLayer(
- std::make_unique<mongo::transport::TransportLayerASIO>(opts, nullptr));
+ serviceContext->setTransportLayer(std::make_unique<mongo::transport::TransportLayerASIO>(
+ opts, serviceContext->getServiceEntryPoint()));
auto tl = serviceContext->getTransportLayer();
if (!tl->setup().isOK()) {
log() << "Error setting up transport layer";
@@ -431,12 +442,7 @@ int bridgeMain(int argc, char** argv, char** envp) {
}
serviceContext->notifyStartupComplete();
-
- listener = stdx::make_unique<BridgeListener>();
- listener->setupSockets();
- listener->initAndListen();
-
- return EXIT_CLEAN;
+ return waitForShutdown();
}
} // namespace mongo