diff options
Diffstat (limited to 'src/mongo/tools/bridge.cpp')
-rw-r--r-- | src/mongo/tools/bridge.cpp | 554 |
1 files changed, 280 insertions, 274 deletions
diff --git a/src/mongo/tools/bridge.cpp b/src/mongo/tools/bridge.cpp index 5e5fcbf7417..7d4a1f10e8c 100644 --- a/src/mongo/tools/bridge.cpp +++ b/src/mongo/tools/bridge.cpp @@ -35,8 +35,8 @@ #include "mongo/base/init.h" #include "mongo/base/initializer.h" -#include "mongo/client/dbclientinterface.h" #include "mongo/db/dbmessage.h" +#include "mongo/db/operation_context.h" #include "mongo/db/service_context.h" #include "mongo/db/service_context_noop.h" #include "mongo/db/service_context_registrar.h" @@ -50,13 +50,14 @@ #include "mongo/stdx/thread.h" #include "mongo/tools/bridge_commands.h" #include "mongo/tools/mongobridge_options.h" +#include "mongo/transport/message_compressor_manager.h" +#include "mongo/transport/service_entry_point_impl.h" +#include "mongo/transport/service_executor_synchronous.h" #include "mongo/transport/transport_layer_asio.h" #include "mongo/util/assert_util.h" #include "mongo/util/exit.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" -#include "mongo/util/net/abstract_message_port.h" -#include "mongo/util/net/listen.h" #include "mongo/util/net/message.h" #include "mongo/util/quick_exit.h" #include "mongo/util/signal_handlers.h" @@ -84,237 +85,24 @@ boost::optional<HostAndPort> extractHostInfo(const OpMsgRequest& request) { return boost::none; } -class Forwarder { -public: - Forwarder(AbstractMessagingPort* mp, - stdx::mutex* settingsMutex, - HostSettingsMap* settings, - int64_t seed) - : _mp(mp), _settingsMutex(settingsMutex), _settings(settings), _prng(seed) {} - - void operator()() { - transport::SessionHandle dest = []() -> transport::SessionHandle { - HostAndPort destAddr{mongoBridgeGlobalParams.destUri}; - const Seconds kConnectTimeout(30); - auto now = getGlobalServiceContext()->getFastClockSource()->now(); - const auto connectExpiration = now + kConnectTimeout; - while (now < connectExpiration) { - auto tl = getGlobalServiceContext()->getTransportLayer(); - auto sws = - tl->connect(destAddr, transport::kGlobalSSLMode, connectExpiration - now); - auto status = sws.getStatus(); - if (!status.isOK()) { - warning() << "Unable to establish connection to " << destAddr << ": " << status; - now = getGlobalServiceContext()->getFastClockSource()->now(); - } else { - return std::move(sws.getValue()); - } - - sleepmillis(500); - } - - return nullptr; - }(); - - if (!dest) { - log() << "end connection " << _mp->remote(); - _mp->shutdown(); - return; - } - - bool receivingFirstMessage = true; - boost::optional<HostAndPort> host; - - Message request; - Message response; - MessageCompressorManager compressorManager; - - while (true) { - try { - request.reset(); - if (!_mp->recv(request)) { - log() << "end connection " << _mp->remote().toString(); - _mp->shutdown(); - break; - } - - uassert(ErrorCodes::IllegalOperation, - str::stream() << "Unsupported network op " << request.operation(), - isSupportedRequestNetworkOp(request.operation())); - - if (request.operation() == dbCompressed) { - auto swm = compressorManager.decompressMessage(request); - if (!swm.isOK()) { - error() << "Error decompressing message: " << swm.getStatus(); - _mp->shutdown(); - return; - } - request = std::move(swm.getValue()); - } - - const bool isFireAndForgetCommand = OpMsg::isFlagSet(request, OpMsg::kMoreToCome); - - boost::optional<OpMsgRequest> cmdRequest; - if ((request.operation() == dbQuery && - NamespaceString(DbMessage(request).getns()).isCommand()) || - request.operation() == dbCommand || request.operation() == dbMsg) { - cmdRequest = rpc::opMsgRequestFromAnyProtocol(request); - if (receivingFirstMessage) { - host = extractHostInfo(*cmdRequest); - } - - std::string hostName = host ? (host->toString()) : "<unknown>"; - LOG(1) << "Received \"" << cmdRequest->getCommandName() - << "\" command with arguments " << cmdRequest->body << " from " - << hostName; - } - receivingFirstMessage = false; - - // Handle a message intended to configure the mongobridge and return a response. - // The 'request' is consumed by the mongobridge and does not get forwarded to - // 'dest'. - if (auto status = maybeProcessBridgeCommand(cmdRequest)) { - invariant(!isFireAndForgetCommand); - - auto replyBuilder = rpc::makeReplyBuilder(rpc::protocolForMessage(request)); - BSONObj metadata; - BSONObj reply; - StatusWith<BSONObj> commandReply(reply); - if (!status->isOK()) { - commandReply = StatusWith<BSONObj>(*status); - } - auto cmdResponse = replyBuilder->setCommandReply(std::move(commandReply)) - .setMetadata(metadata) - .done(); - cmdResponse.header().setId(nextMessageId()); - cmdResponse.header().setResponseToMsgId(request.header().getId()); - _mp->say(cmdResponse); - continue; - } - - // Get the message handling settings for 'host' if the source of _mp's connection is - // known. By default, messages are forwarded to 'dest' without any additional delay. - HostSettings hostSettings = getHostSettings(host); - - switch (hostSettings.state) { - // Forward the message to 'dest' after waiting for 'hostSettings.delay' - // milliseconds. - case HostSettings::State::kForward: - sleepmillis(durationCount<Milliseconds>(hostSettings.delay)); - break; - // Close the connection to 'dest'. - case HostSettings::State::kHangUp: - log() << "Rejecting connection from " << host->toString() - << ", end connection " << _mp->remote().toString(); - _mp->shutdown(); - return; - // Forward the message to 'dest' with probability '1 - hostSettings.loss'. - case HostSettings::State::kDiscard: - if (_prng.nextCanonicalDouble() < hostSettings.loss) { - std::string hostName = host ? (host->toString()) : "<unknown>"; - if (cmdRequest) { - log() << "Discarding \"" << cmdRequest->getCommandName() - << "\" command with arguments " << cmdRequest->body - << " from " << hostName; - } else { - log() << "Discarding " << networkOpToString(request.operation()) - << " from " << hostName; - } - continue; - } - break; - } +ServiceContextRegistrar serviceContextCreator([]() { + return std::make_unique<ServiceContextNoop>(); +}); - // Send the message we received from '_mp' to 'dest'. 'dest' returns a response for - // OP_QUERY, OP_GET_MORE, and OP_COMMAND messages that we respond back to - // '_mp' with. - if (!isFireAndForgetCommand && - (request.operation() == dbQuery || request.operation() == dbGetMore || - request.operation() == dbCommand || request.operation() == dbMsg)) { - // TODO dbMsg moreToCome - // Forward the message to 'dest' and receive its reply in 'response'. - uassertStatusOK(dest->sinkMessage(request)); - response = uassertStatusOK(dest->sourceMessage()); - uassert(50727, - "Response ID did not match the sent message ID.", - response.header().getResponseToMsgId() == request.header().getId()); - - // If there's nothing to respond back to '_mp' with, then close the connection. - if (response.empty()) { - log() << "Received an empty response, end connection " - << _mp->remote().toString(); - _mp->shutdown(); - break; - } - - // Reload the message handling settings for 'host' in case they were changed - // while waiting for a response from 'dest'. - hostSettings = getHostSettings(host); - - // It's possible that sending 'request' blocked until 'dest' had something to - // reply with. If the message handling settings were since changed to close - // connections from 'host', then do so now. - if (hostSettings.state == HostSettings::State::kHangUp) { - log() << "Closing connection from " << host->toString() - << ", end connection " << _mp->remote().toString(); - _mp->shutdown(); - break; - } - - _mp->say(response); - - // If 'exhaust' is true, then instead of trying to receive another message from - // '_mp', receive messages from 'dest' until it returns a cursor id of zero. - bool exhaust = false; - if (request.operation() == dbQuery) { - DbMessage d(request); - QueryMessage q(d); - exhaust = q.queryOptions & QueryOption_Exhaust; - } - while (exhaust) { - if (response.operation() == dbCompressed) { - auto swm = compressorManager.decompressMessage(response); - if (!swm.isOK()) { - error() << "Error decompressing message: " << swm.getStatus(); - _mp->shutdown(); - return; - } - response = std::move(swm.getValue()); - } - - MsgData::View header = response.header(); - QueryResult::View qr = header.view2ptr(); - if (qr.getCursorId()) { - response = uassertStatusOK(dest->sourceMessage()); - _mp->say(response); - } else { - exhaust = false; - } - } - } else { - uassertStatusOK(dest->sinkMessage(request)); - } - } catch (const DBException& ex) { - error() << "Caught DBException in Forwarder: " << ex << ", end connection " - << _mp->remote().toString(); - _mp->shutdown(); - break; - } catch (...) { - severe() << exceptionToStatus() << ", terminating"; - quickExit(EXIT_UNCAUGHT); - } - } - } +} // namespace -private: +class BridgeContext { +public: Status runBridgeCommand(StringData cmdName, BSONObj cmdObj) { auto status = BridgeCommand::findCommand(cmdName); if (!status.isOK()) { return status.getStatus(); } + log() << "Processing bridge command: " << cmdName; + BridgeCommand* command = status.getValue(); - return command->run(cmdObj, _settingsMutex, _settings); + return command->run(cmdObj, &_settingsMutex, &_settings); } boost::optional<Status> maybeProcessBridgeCommand(boost::optional<OpMsgRequest> cmdRequest) { @@ -334,69 +122,277 @@ private: HostSettings getHostSettings(boost::optional<HostAndPort> host) { if (host) { - stdx::lock_guard<stdx::mutex> lk(*_settingsMutex); - return (*_settings)[*host]; + stdx::lock_guard<stdx::mutex> lk(_settingsMutex); + return (_settings)[*host]; } return {}; } - AbstractMessagingPort* _mp; + PseudoRandom makeSeededPRNG() { + static PseudoRandom globalPRNG(mongoBridgeGlobalParams.seed); + return PseudoRandom(globalPRNG.nextInt64()); + } - stdx::mutex* _settingsMutex; - HostSettingsMap* _settings; + static BridgeContext* get(); - PseudoRandom _prng; +private: + static const ServiceContext::Decoration<BridgeContext> _get; + + stdx::mutex _settingsMutex; + HostSettingsMap _settings; }; -class BridgeListener final : public Listener { +const ServiceContextNoop::Decoration<BridgeContext> BridgeContext::_get = + ServiceContext::declareDecoration<BridgeContext>(); + +BridgeContext* BridgeContext::get() { + return &_get(getGlobalServiceContext()); +} + +class ServiceEntryPointBridge; +class ProxiedConnection { public: - BridgeListener() - : Listener( - "bridge", "0.0.0.0", mongoBridgeGlobalParams.port, getGlobalServiceContext(), false), - _seedSource(mongoBridgeGlobalParams.seed) { - log() << "Setting random seed: " << mongoBridgeGlobalParams.seed; + ProxiedConnection() : _dest(nullptr), _prng(BridgeContext::get()->makeSeededPRNG()) {} + + transport::Session* operator->() { + return _dest.get(); } - void accepted(std::unique_ptr<AbstractMessagingPort> mp) override final { - { - stdx::lock_guard<stdx::mutex> lk(_portsMutex); - if (_inShutdown.load()) { - mp->shutdown(); - return; - } - _ports.insert(mp.get()); + const boost::optional<HostAndPort>& host() const { + return _host; + } + + std::string toString() const { + if (_host) { + return _host->toString(); } + return "<unknown>"; + } - Forwarder f(mp.release(), &_settingsMutex, &_settings, _seedSource.nextInt64()); - stdx::thread t(f); - t.detach(); + void setExhaust(bool val) { + _inExhaust = val; } - void shutdownAll() { - stdx::lock_guard<stdx::mutex> lk(_portsMutex); - for (auto mp : _ports) { - mp->shutdown(); + bool inExhaust() const { + return _inExhaust; + } + + void extractHostInfo(OpMsgRequest request) { + if (_seenFirstMessage) + return; + _seenFirstMessage = true; + + // The initial isMaster request made by mongod and mongos processes should contain a + // hostInfo field that identifies the process by its host:port. + StringData cmdName = request.getCommandName(); + if (cmdName != "isMaster" && cmdName != "ismaster") { + return; } + + if (auto hostInfoElem = request.body["hostInfo"]) { + if (hostInfoElem.type() == String) { + _host = HostAndPort{hostInfoElem.valueStringData()}; + } + } + } + + double nextCanonicalDouble() { + return _prng.nextCanonicalDouble(); } + static ProxiedConnection& get(const transport::SessionHandle& session); + private: - stdx::mutex _portsMutex; - std::set<AbstractMessagingPort*> _ports; - AtomicWord<bool> _inShutdown{false}; + friend class ServiceEntryPointBridge; - stdx::mutex _settingsMutex; - HostSettingsMap _settings; + static const transport::Session::Decoration<ProxiedConnection> _get; + transport::SessionHandle _dest; + PseudoRandom _prng; + boost::optional<HostAndPort> _host; + bool _seenFirstMessage = false; + bool _inExhaust = false; +}; + +const transport::Session::Decoration<ProxiedConnection> ProxiedConnection::_get = + transport::Session::declareDecoration<ProxiedConnection>(); + +ProxiedConnection& ProxiedConnection::get(const transport::SessionHandle& session) { + return _get(*session); +} + +class ServiceEntryPointBridge final : public ServiceEntryPointImpl { +public: + explicit ServiceEntryPointBridge(ServiceContext* svcCtx) : ServiceEntryPointImpl(svcCtx) {} - PseudoRandom _seedSource; + void startSession(transport::SessionHandle session) final; + DbResponse handleRequest(OperationContext* opCtx, const Message& request) final; }; -std::unique_ptr<mongo::BridgeListener> listener; +void ServiceEntryPointBridge::startSession(transport::SessionHandle session) { + transport::SessionHandle dest = []() -> transport::SessionHandle { + HostAndPort destAddr{mongoBridgeGlobalParams.destUri}; + const Seconds kConnectTimeout(30); + auto now = getGlobalServiceContext()->getFastClockSource()->now(); + const auto connectExpiration = now + kConnectTimeout; + while (now < connectExpiration) { + auto tl = getGlobalServiceContext()->getTransportLayer(); + auto sws = tl->connect(destAddr, transport::kGlobalSSLMode, connectExpiration - now); + auto status = sws.getStatus(); + if (!status.isOK()) { + warning() << "Unable to establish connection to " << destAddr << ": " << status; + now = getGlobalServiceContext()->getFastClockSource()->now(); + } else { + return std::move(sws.getValue()); + } -ServiceContextRegistrar serviceContextCreator([]() { - return std::make_unique<ServiceContextNoop>(); -}); + sleepmillis(500); + } -} // namespace + return nullptr; + }(); + + if (!dest) { + log() << "end connection " << session->remote(); + return; + } + + auto& proxiedConn = ProxiedConnection::get(session); + proxiedConn._dest = std::move(dest); + + ServiceEntryPointImpl::startSession(std::move(session)); +} + +DbResponse ServiceEntryPointBridge::handleRequest(OperationContext* opCtx, const Message& request) { + const auto& source = opCtx->getClient()->session(); + auto& dest = ProxiedConnection::get(source); + auto brCtx = BridgeContext::get(); + + if (dest.inExhaust()) { + DbMessage dbm(request); + + auto response = uassertStatusOK(dest->sourceMessage()); + if (response.operation() == dbCompressed) { + MessageCompressorManager compressorMgr; + response = uassertStatusOK(compressorMgr.decompressMessage(response)); + } + + MsgData::View header = response.header(); + QueryResult::View qr = header.view2ptr(); + if (qr.getCursorId()) { + return {std::move(response)}; + } else { + dest.setExhaust(false); + return {Message(), dbm.getns()}; + } + } + + const bool isFireAndForgetCommand = OpMsg::isFlagSet(request, OpMsg::kMoreToCome); + + boost::optional<OpMsgRequest> cmdRequest; + if ((request.operation() == dbQuery && + NamespaceString(DbMessage(request).getns()).isCommand()) || + request.operation() == dbCommand || request.operation() == dbMsg) { + cmdRequest = rpc::opMsgRequestFromAnyProtocol(request); + + dest.extractHostInfo(*cmdRequest); + + LOG(1) << "Received \"" << cmdRequest->getCommandName() << "\" command with arguments " + << cmdRequest->body << " from " << dest; + } + + // Handle a message intended to configure the mongobridge and return a response. + // The 'request' is consumed by the mongobridge and does not get forwarded to + // 'dest'. + if (auto status = brCtx->maybeProcessBridgeCommand(cmdRequest)) { + invariant(!isFireAndForgetCommand); + + auto replyBuilder = rpc::makeReplyBuilder(rpc::protocolForMessage(request)); + BSONObj metadata; + BSONObj reply; + StatusWith<BSONObj> commandReply(reply); + if (!status->isOK()) { + commandReply = StatusWith<BSONObj>(*status); + } + return { + replyBuilder->setCommandReply(std::move(commandReply)).setMetadata(metadata).done()}; + } + + + // Get the message handling settings for 'host' if the source of the connection is + // known. By default, messages are forwarded to 'dest' without any additional delay. + HostSettings hostSettings = brCtx->getHostSettings(dest.host()); + + switch (hostSettings.state) { + // Close the connection to 'dest'. + case HostSettings::State::kHangUp: + log() << "Rejecting connection from " << dest << ", end connection " + << source->remote().toString(); + source->end(); + return {Message()}; + // Forward the message to 'dest' with probability '1 - hostSettings.loss'. + case HostSettings::State::kDiscard: + if (dest.nextCanonicalDouble() < hostSettings.loss) { + std::string hostName = dest.toString(); + if (cmdRequest) { + log() << "Discarding \"" << cmdRequest->getCommandName() + << "\" command with arguments " << cmdRequest->body << " from " + << hostName; + } else { + log() << "Discarding " << networkOpToString(request.operation()) << " from " + << hostName; + } + return {Message()}; + } + // Forward the message to 'dest' after waiting for 'hostSettings.delay' + // milliseconds. + case HostSettings::State::kForward: + sleepmillis(durationCount<Milliseconds>(hostSettings.delay)); + break; + } + + uassertStatusOK(dest->sinkMessage(request)); + + // Send the message we received from 'source' to 'dest'. 'dest' returns a response for + // OP_QUERY, OP_GET_MORE, and OP_COMMAND messages that we respond with. + if (!isFireAndForgetCommand && + (request.operation() == dbQuery || request.operation() == dbGetMore || + request.operation() == dbCommand || request.operation() == dbMsg)) { + // TODO dbMsg moreToCome + // Forward the message to 'dest' and receive its reply in 'response'. + auto response = uassertStatusOK(dest->sourceMessage()); + uassert(50765, + "Response ID did not match the sent message ID.", + response.header().getResponseToMsgId() == request.header().getId()); + + // Reload the message handling settings for 'host' in case they were changed + // while waiting for a response from 'dest'. + hostSettings = brCtx->getHostSettings(dest.host()); + + // It's possible that sending 'request' blocked until 'dest' had something to + // reply with. If the message handling settings were since changed to close + // connections from 'host', then do so now. + if (hostSettings.state == HostSettings::State::kHangUp) { + log() << "Closing connection from " << dest << ", end connection " << source->remote(); + source->end(); + return {Message()}; + } + + std::string exhaustNS; + if (request.operation() == dbQuery) { + DbMessage d(request); + QueryMessage q(d); + dest.setExhaust(q.queryOptions & QueryOption_Exhaust); + if (dest.inExhaust()) { + exhaustNS = d.getns(); + } + } else { + dest.setExhaust(false); + } + return {std::move(response), exhaustNS}; + } else { + return {Message()}; + } +} int bridgeMain(int argc, char** argv, char** envp) { @@ -404,8 +400,16 @@ int bridgeMain(int argc, char** argv, char** envp) { // NOTE: This function may be called at any time. It must not // depend on the prior execution of mongo initializers or the // existence of threads. - ListeningSockets::get()->closeAll(); - listener->shutdownAll(); + if (hasGlobalServiceContext()) { + auto sc = getGlobalServiceContext(); + if (sc->getTransportLayer()) + sc->getTransportLayer()->shutdown(); + + if (sc->getServiceEntryPoint()) { + sc->getServiceEntryPoint()->endAllSessions(transport::Session::kEmptyTagMask); + sc->getServiceEntryPoint()->shutdown(Seconds{10}); + } + } }); setupSignalHandlers(); @@ -414,11 +418,18 @@ int bridgeMain(int argc, char** argv, char** envp) { startSignalProcessingThread(LogFileStatus::kNoLogFileToRotate); auto serviceContext = getGlobalServiceContext(); + serviceContext->setServiceEntryPoint(std::make_unique<ServiceEntryPointBridge>(serviceContext)); + serviceContext->setServiceExecutor( + std::make_unique<transport::ServiceExecutorSynchronous>(serviceContext)); + + fassert(50766, serviceContext->getServiceExecutor()->start()); + transport::TransportLayerASIO::Options opts; - opts.mode = mongo::transport::TransportLayerASIO::Options::kEgress; + opts.ipList = "0.0.0.0"; + opts.port = mongoBridgeGlobalParams.port; - serviceContext->setTransportLayer( - std::make_unique<mongo::transport::TransportLayerASIO>(opts, nullptr)); + serviceContext->setTransportLayer(std::make_unique<mongo::transport::TransportLayerASIO>( + opts, serviceContext->getServiceEntryPoint())); auto tl = serviceContext->getTransportLayer(); if (!tl->setup().isOK()) { log() << "Error setting up transport layer"; @@ -431,12 +442,7 @@ int bridgeMain(int argc, char** argv, char** envp) { } serviceContext->notifyStartupComplete(); - - listener = stdx::make_unique<BridgeListener>(); - listener->setupSockets(); - listener->initAndListen(); - - return EXIT_CLEAN; + return waitForShutdown(); } } // namespace mongo |