/**
* Copyright (C) 2008 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kBridge
#include "mongo/platform/basic.h"
#include
#include
#include "mongo/base/init.h"
#include "mongo/base/initializer.h"
#include "mongo/db/dbmessage.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/service_context.h"
#include "mongo/db/service_context_noop.h"
#include "mongo/db/service_context_registrar.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/platform/random.h"
#include "mongo/rpc/command_request.h"
#include "mongo/rpc/factory.h"
#include "mongo/rpc/message.h"
#include "mongo/rpc/reply_builder_interface.h"
#include "mongo/stdx/memory.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/thread.h"
#include "mongo/tools/bridge_commands.h"
#include "mongo/tools/mongobridge_options.h"
#include "mongo/transport/message_compressor_manager.h"
#include "mongo/transport/service_entry_point_impl.h"
#include "mongo/transport/service_executor_synchronous.h"
#include "mongo/transport/transport_layer_asio.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/exit.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/quick_exit.h"
#include "mongo/util/signal_handlers.h"
#include "mongo/util/text.h"
#include "mongo/util/time_support.h"
#include "mongo/util/timer.h"
namespace mongo {
namespace {
boost::optional extractHostInfo(const OpMsgRequest& request) {
// The initial isMaster request made by mongod and mongos processes should contain a hostInfo
// field that identifies the process by its host:port.
StringData cmdName = request.getCommandName();
if (cmdName != "isMaster" && cmdName != "ismaster") {
return boost::none;
}
if (auto hostInfoElem = request.body["hostInfo"]) {
if (hostInfoElem.type() == String) {
return HostAndPort{hostInfoElem.valueStringData()};
}
}
return boost::none;
}
ServiceContextRegistrar serviceContextCreator([]() {
return std::make_unique();
});
} // namespace
class BridgeContext {
public:
Status runBridgeCommand(StringData cmdName, BSONObj cmdObj) {
auto status = BridgeCommand::findCommand(cmdName);
if (!status.isOK()) {
return status.getStatus();
}
log() << "Processing bridge command: " << cmdName;
BridgeCommand* command = status.getValue();
return command->run(cmdObj, &_settingsMutex, &_settings);
}
boost::optional maybeProcessBridgeCommand(boost::optional cmdRequest) {
if (!cmdRequest) {
return boost::none;
}
if (auto forBridge = cmdRequest->body["$forBridge"]) {
if (forBridge.trueValue()) {
return runBridgeCommand(cmdRequest->getCommandName(), cmdRequest->body);
}
return boost::none;
}
return boost::none;
}
HostSettings getHostSettings(boost::optional host) {
if (host) {
stdx::lock_guard lk(_settingsMutex);
return (_settings)[*host];
}
return {};
}
PseudoRandom makeSeededPRNG() {
static PseudoRandom globalPRNG(mongoBridgeGlobalParams.seed);
return PseudoRandom(globalPRNG.nextInt64());
}
static BridgeContext* get();
private:
static const ServiceContext::Decoration _get;
stdx::mutex _settingsMutex;
HostSettingsMap _settings;
};
const ServiceContextNoop::Decoration BridgeContext::_get =
ServiceContext::declareDecoration();
BridgeContext* BridgeContext::get() {
return &_get(getGlobalServiceContext());
}
class ServiceEntryPointBridge;
class ProxiedConnection {
public:
ProxiedConnection() : _dest(nullptr), _prng(BridgeContext::get()->makeSeededPRNG()) {}
transport::Session* operator->() {
return _dest.get();
}
transport::SessionHandle& getSession() {
return _dest;
}
void setSession(transport::SessionHandle session) {
_dest = std::move(session);
}
const boost::optional& host() const {
return _host;
}
std::string toString() const {
if (_host) {
return _host->toString();
}
return "";
}
void setExhaust(bool val) {
_inExhaust = val;
}
bool inExhaust() const {
return _inExhaust;
}
void extractHostInfo(OpMsgRequest request) {
if (_seenFirstMessage)
return;
_seenFirstMessage = true;
// The initial isMaster request made by mongod and mongos processes should contain a
// hostInfo field that identifies the process by its host:port.
StringData cmdName = request.getCommandName();
if (cmdName != "isMaster" && cmdName != "ismaster") {
return;
}
if (auto hostInfoElem = request.body["hostInfo"]) {
if (hostInfoElem.type() == String) {
_host = HostAndPort{hostInfoElem.valueStringData()};
}
}
}
double nextCanonicalDouble() {
return _prng.nextCanonicalDouble();
}
static ProxiedConnection& get(const transport::SessionHandle& session);
private:
friend class ServiceEntryPointBridge;
static const transport::Session::Decoration _get;
transport::SessionHandle _dest;
PseudoRandom _prng;
boost::optional _host;
bool _seenFirstMessage = false;
bool _inExhaust = false;
};
const transport::Session::Decoration ProxiedConnection::_get =
transport::Session::declareDecoration();
ProxiedConnection& ProxiedConnection::get(const transport::SessionHandle& session) {
return _get(*session);
}
class ServiceEntryPointBridge final : public ServiceEntryPointImpl {
public:
explicit ServiceEntryPointBridge(ServiceContext* svcCtx) : ServiceEntryPointImpl(svcCtx) {}
DbResponse handleRequest(OperationContext* opCtx, const Message& request) final;
};
DbResponse ServiceEntryPointBridge::handleRequest(OperationContext* opCtx, const Message& request) {
const auto& source = opCtx->getClient()->session();
auto& dest = ProxiedConnection::get(source);
auto brCtx = BridgeContext::get();
if (!dest.getSession()) {
dest.setSession([]() -> transport::SessionHandle {
HostAndPort destAddr{mongoBridgeGlobalParams.destUri};
const Seconds kConnectTimeout(30);
auto now = getGlobalServiceContext()->getFastClockSource()->now();
const auto connectExpiration = now + kConnectTimeout;
while (now < connectExpiration) {
auto tl = getGlobalServiceContext()->getTransportLayer();
auto sws =
tl->connect(destAddr, transport::kGlobalSSLMode, connectExpiration - now);
auto status = sws.getStatus();
if (!status.isOK()) {
warning() << "Unable to establish connection to " << destAddr << ": " << status;
now = getGlobalServiceContext()->getFastClockSource()->now();
} else {
return std::move(sws.getValue());
}
sleepmillis(500);
}
return nullptr;
}());
if (!dest.getSession()) {
source->end();
uasserted(50861, str::stream() << "Unable to connect to " << source->remote());
}
}
if (dest.inExhaust()) {
DbMessage dbm(request);
auto response = uassertStatusOK(dest->sourceMessage());
if (response.operation() == dbCompressed) {
MessageCompressorManager compressorMgr;
response = uassertStatusOK(compressorMgr.decompressMessage(response));
}
MsgData::View header = response.header();
QueryResult::View qr = header.view2ptr();
if (qr.getCursorId()) {
return {std::move(response)};
} else {
dest.setExhaust(false);
return {Message(), dbm.getns()};
}
}
const bool isFireAndForgetCommand = OpMsg::isFlagSet(request, OpMsg::kMoreToCome);
boost::optional cmdRequest;
if ((request.operation() == dbQuery &&
NamespaceString(DbMessage(request).getns()).isCommand()) ||
request.operation() == dbCommand || request.operation() == dbMsg) {
cmdRequest = rpc::opMsgRequestFromAnyProtocol(request);
dest.extractHostInfo(*cmdRequest);
LOG(1) << "Received \"" << cmdRequest->getCommandName() << "\" command with arguments "
<< cmdRequest->body << " from " << dest;
}
// Handle a message intended to configure the mongobridge and return a response.
// The 'request' is consumed by the mongobridge and does not get forwarded to
// 'dest'.
if (auto status = brCtx->maybeProcessBridgeCommand(cmdRequest)) {
invariant(!isFireAndForgetCommand);
auto replyBuilder = rpc::makeReplyBuilder(rpc::protocolForMessage(request));
BSONObj metadata;
BSONObj reply;
StatusWith commandReply(reply);
if (!status->isOK()) {
commandReply = StatusWith(*status);
}
return {
replyBuilder->setCommandReply(std::move(commandReply)).setMetadata(metadata).done()};
}
// Get the message handling settings for 'host' if the source of the connection is
// known. By default, messages are forwarded to 'dest' without any additional delay.
HostSettings hostSettings = brCtx->getHostSettings(dest.host());
switch (hostSettings.state) {
// Close the connection to 'dest'.
case HostSettings::State::kHangUp:
log() << "Rejecting connection from " << dest << ", end connection "
<< source->remote().toString();
source->end();
return {Message()};
// Forward the message to 'dest' with probability '1 - hostSettings.loss'.
case HostSettings::State::kDiscard:
if (dest.nextCanonicalDouble() < hostSettings.loss) {
std::string hostName = dest.toString();
if (cmdRequest) {
log() << "Discarding \"" << cmdRequest->getCommandName()
<< "\" command with arguments " << cmdRequest->body << " from "
<< hostName;
} else {
log() << "Discarding " << networkOpToString(request.operation()) << " from "
<< hostName;
}
return {Message()};
}
// Forward the message to 'dest' after waiting for 'hostSettings.delay'
// milliseconds.
case HostSettings::State::kForward:
sleepmillis(durationCount(hostSettings.delay));
break;
}
uassertStatusOK(dest->sinkMessage(request));
// Send the message we received from 'source' to 'dest'. 'dest' returns a response for
// OP_QUERY, OP_GET_MORE, and OP_COMMAND messages that we respond with.
if (!isFireAndForgetCommand &&
(request.operation() == dbQuery || request.operation() == dbGetMore ||
request.operation() == dbCommand || request.operation() == dbMsg)) {
// TODO dbMsg moreToCome
// Forward the message to 'dest' and receive its reply in 'response'.
auto response = uassertStatusOK(dest->sourceMessage());
uassert(50765,
"Response ID did not match the sent message ID.",
response.header().getResponseToMsgId() == request.header().getId());
// Reload the message handling settings for 'host' in case they were changed
// while waiting for a response from 'dest'.
hostSettings = brCtx->getHostSettings(dest.host());
// It's possible that sending 'request' blocked until 'dest' had something to
// reply with. If the message handling settings were since changed to close
// connections from 'host', then do so now.
if (hostSettings.state == HostSettings::State::kHangUp) {
log() << "Closing connection from " << dest << ", end connection " << source->remote();
source->end();
return {Message()};
}
std::string exhaustNS;
if (request.operation() == dbQuery) {
DbMessage d(request);
QueryMessage q(d);
dest.setExhaust(q.queryOptions & QueryOption_Exhaust);
if (dest.inExhaust()) {
exhaustNS = d.getns();
}
} else {
dest.setExhaust(false);
}
return {std::move(response), exhaustNS};
} else {
return {Message()};
}
}
int bridgeMain(int argc, char** argv, char** envp) {
registerShutdownTask([&] {
// NOTE: This function may be called at any time. It must not
// depend on the prior execution of mongo initializers or the
// existence of threads.
if (hasGlobalServiceContext()) {
auto sc = getGlobalServiceContext();
if (sc->getTransportLayer())
sc->getTransportLayer()->shutdown();
if (sc->getServiceEntryPoint()) {
sc->getServiceEntryPoint()->endAllSessions(transport::Session::kEmptyTagMask);
sc->getServiceEntryPoint()->shutdown(Seconds{10});
}
}
});
setupSignalHandlers();
runGlobalInitializersOrDie(argc, argv, envp);
startSignalProcessingThread(LogFileStatus::kNoLogFileToRotate);
auto serviceContext = getGlobalServiceContext();
serviceContext->setServiceEntryPoint(std::make_unique(serviceContext));
serviceContext->setServiceExecutor(
std::make_unique(serviceContext));
fassert(50766, serviceContext->getServiceExecutor()->start());
transport::TransportLayerASIO::Options opts;
opts.ipList.emplace_back("0.0.0.0");
opts.port = mongoBridgeGlobalParams.port;
serviceContext->setTransportLayer(std::make_unique(
opts, serviceContext->getServiceEntryPoint()));
auto tl = serviceContext->getTransportLayer();
if (!tl->setup().isOK()) {
log() << "Error setting up transport layer";
return EXIT_NET_ERROR;
}
if (!tl->start().isOK()) {
log() << "Error starting transport layer";
return EXIT_NET_ERROR;
}
serviceContext->notifyStartupComplete();
return waitForShutdown();
}
} // namespace mongo
#if defined(_WIN32)
// In Windows, wmain() is an alternate entry point for main(), and receives the same parameters
// as main() but encoded in Windows Unicode (UTF-16); "wide" 16-bit wchar_t characters. The
// WindowsCommandLine object converts these wide character strings to a UTF-8 coded equivalent
// and makes them available through the argv() and envp() members. This enables bridgeMain()
// to process UTF-8 encoded arguments and environment variables without regard to platform.
int wmain(int argc, wchar_t* argvW[], wchar_t* envpW[]) {
mongo::WindowsCommandLine wcl(argc, argvW, envpW);
int exitCode = mongo::bridgeMain(argc, wcl.argv(), wcl.envp());
mongo::quickExit(exitCode);
}
#else
int main(int argc, char* argv[], char** envp) {
int exitCode = mongo::bridgeMain(argc, argv, envp);
mongo::quickExit(exitCode);
}
#endif