From 1fc9cba6988ab1b600be1a0549caf6146619e4df Mon Sep 17 00:00:00 2001 From: Adam Midvidy Date: Wed, 24 Jun 2015 13:54:45 -0400 Subject: SERVER-19035 autodetect support for OP_COMMAND in remote servers --- src/mongo/client/dbclient.cpp | 161 ++++++++++++++++++------ src/mongo/client/dbclientinterface.h | 79 ++++++------ src/mongo/client/scoped_db_conn_test.cpp | 40 ++++-- src/mongo/db/repl/isself.cpp | 9 +- src/mongo/rpc/factory.cpp | 28 +++++ src/mongo/rpc/factory.h | 13 ++ src/mongo/rpc/protocol.cpp | 43 +++++++ src/mongo/rpc/protocol.h | 6 + src/mongo/rpc/protocol_test.cpp | 36 ++++++ src/mongo/s/client/sharding_connection_hook.cpp | 34 ----- src/mongo/shell/shell_options.h | 5 +- src/mongo/shell/shell_utils.cpp | 7 +- src/mongo/shell/shell_utils_launcher.cpp | 3 +- src/mongo/tools/bridge.cpp | 2 +- src/mongo/tools/sniffer.cpp | 4 +- 15 files changed, 343 insertions(+), 127 deletions(-) (limited to 'src') diff --git a/src/mongo/client/dbclient.cpp b/src/mongo/client/dbclient.cpp index 53e0ddc548d..e25645489c5 100644 --- a/src/mongo/client/dbclient.cpp +++ b/src/mongo/client/dbclient.cpp @@ -45,6 +45,7 @@ #include "mongo/db/auth/internal_user_auth.h" #include "mongo/db/json.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/wire_version.h" #include "mongo/rpc/factory.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/rpc/metadata.h" @@ -902,46 +903,122 @@ BSONObj DBClientInterface::findOne(const string& ns, return v.empty() ? BSONObj() : v[0]; } -bool DBClientConnection::connect(const HostAndPort& server, string& errmsg) { - _server = server; - _serverString = _server.toString(); - return _connect(errmsg); -} +namespace { -bool DBClientConnection::_connect(string& errmsg) { - _serverString = _server.toString(); - _serverAddrString.clear(); +/** + * RAII class to force usage of OP_QUERY on a connection. + */ +class ScopedForceOpQuery { +public: + ScopedForceOpQuery(DBClientBase* conn) + : _conn(conn), _oldProtos(conn->getClientRPCProtocols()) { + _conn->setClientRPCProtocols(rpc::supports::kOpQueryOnly); + } - // we keep around SockAddr for connection life -- maybe MessagingPort - // requires that? - std::unique_ptr serverSockAddr(new SockAddr(_server.host().c_str(), _server.port())); - if (!serverSockAddr->isValid()) { - errmsg = str::stream() << "couldn't initialize connection to host " - << _server.host().c_str() << ", address is invalid"; - return false; + ~ScopedForceOpQuery() { + _conn->setClientRPCProtocols(_oldProtos); } - server.reset(serverSockAddr.release()); - p.reset(new MessagingPort(_so_timeout, _logLevel)); +private: + DBClientBase* const _conn; + const rpc::ProtocolSet _oldProtos; +}; - if (_server.host().empty()) { - errmsg = str::stream() << "couldn't connect to server " << toString() << ", host is empty"; - return false; +/** +* Initializes the wire version of conn, and returns the isMaster reply. +*/ +StatusWith initWireVersion(DBClientBase* conn) { + try { + // We need to force the usage of OP_QUERY on this command, even if we have previously + // detected support for OP_COMMAND on a connection. This is necessary to handle the case + // where we reconnect to an older version of MongoDB running at the same host/port. + ScopedForceOpQuery forceOpQuery{conn}; + + auto result = conn->runCommandWithMetadata( + "admin", "isMaster", rpc::makeEmptyMetadata(), BSON("isMaster" << 1)); + + BSONObj isMasterObj = result->getCommandReply().getOwned(); + + if (isMasterObj.hasField("minWireVersion") && isMasterObj.hasField("maxWireVersion")) { + int minWireVersion = isMasterObj["minWireVersion"].numberInt(); + int maxWireVersion = isMasterObj["maxWireVersion"].numberInt(); + conn->setWireVersions(minWireVersion, maxWireVersion); + } + + return isMasterObj; + + } catch (...) { + return exceptionToStatus(); } +} - _serverAddrString = server->getAddr(); +} // namespace - if (_serverAddrString == "0.0.0.0") { - errmsg = str::stream() << "couldn't connect to server " << toString() - << ", address resolved to 0.0.0.0"; +bool DBClientConnection::connect(const HostAndPort& server, std::string& errmsg) { + auto connectStatus = connect(server); + if (!connectStatus.isOK()) { + errmsg = connectStatus.reason(); return false; } + return true; +} + + +Status DBClientConnection::connect(const HostAndPort& serverAddress) { + auto connectStatus = connectSocketOnly(serverAddress); + if (!connectStatus.isOK()) { + return connectStatus; + } - if (!p->connect(*server)) { - errmsg = str::stream() << "couldn't connect to server " << toString() - << ", connection attempt failed"; + auto swIsMasterReply = initWireVersion(this); + if (!swIsMasterReply.isOK()) { _failed = true; - return false; + return swIsMasterReply.getStatus(); + } + + auto swProtocolSet = rpc::parseProtocolSetFromIsMasterReply(swIsMasterReply.getValue()); + if (!swProtocolSet.isOK()) { + return swProtocolSet.getStatus(); + } + + _setServerRPCProtocols(swProtocolSet.getValue()); + + return Status::OK(); +} + +Status DBClientConnection::connectSocketOnly(const HostAndPort& serverAddress) { + _serverAddress = serverAddress; + _failed = true; + + // We need to construct a SockAddr so we can resolve the address. + SockAddr osAddr{serverAddress.host().c_str(), serverAddress.port()}; + + if (!osAddr.isValid()) { + return Status(ErrorCodes::InvalidOptions, + str::stream() << "couldn't initialize connection to host " + << serverAddress.host() << ", address is invalid"); + } + + _port.reset(new MessagingPort(_so_timeout, _logLevel)); + + if (serverAddress.host().empty()) { + return Status(ErrorCodes::InvalidOptions, + str::stream() << "couldn't connect to server " << _serverAddress.toString() + << ", host is empty"); + } + + if (osAddr.getAddr() == "0.0.0.0") { + return Status(ErrorCodes::InvalidOptions, + str::stream() << "couldn't connect to server " << _serverAddress.toString() + << ", address resolved to 0.0.0.0"); + } + + _resolvedAddress = osAddr.getAddr(); + + if (!_port->connect(osAddr)) { + return Status(ErrorCodes::OperationFailed, + str::stream() << "couldn't connect to server " << _serverAddress.toString() + << ", connection attempt failed"); } else { LOG(1) << "connected to server " << toString() << endl; } @@ -949,11 +1026,14 @@ bool DBClientConnection::_connect(string& errmsg) { #ifdef MONGO_CONFIG_SSL int sslModeVal = sslGlobalParams.sslMode.load(); if (sslModeVal == SSLParams::SSLMode_preferSSL || sslModeVal == SSLParams::SSLMode_requireSSL) { - return p->secure(sslManager(), _server.host()); + if (!_port->secure(sslManager(), serverAddress.host())) { + return Status(ErrorCodes::OperationFailed, "Failed to initialize SSL on connection"); + } } #endif - return true; + _failed = false; + return Status::OK(); } void DBClientConnection::logout(const string& dbname, BSONObj& info) { @@ -988,10 +1068,11 @@ void DBClientConnection::_checkConnection() { LOG(_logLevel) << "trying reconnect to " << toString() << endl; string errmsg; _failed = false; - if (!_connect(errmsg)) { + auto connectStatus = connect(_serverAddress); + if (!connectStatus.isOK()) { _failed = true; LOG(_logLevel) << "reconnect " << toString() << " failed " << errmsg << endl; - throw SocketException(SocketException::CONNECT_ERROR, toString()); + throw SocketException(SocketException::CONNECT_ERROR, connectStatus.reason()); } LOG(_logLevel) << "reconnect " << toString() << " ok" << endl; @@ -1009,14 +1090,14 @@ void DBClientConnection::_checkConnection() { void DBClientConnection::setSoTimeout(double timeout) { _so_timeout = timeout; - if (p) { - p->setSocketTimeout(timeout); + if (_port) { + _port->setSocketTimeout(timeout); } } uint64_t DBClientConnection::getSockCreationMicroSec() const { - if (p) { - return p->getSockCreationMicroSec(); + if (_port) { + return _port->getSockCreationMicroSec(); } else { return INVALID_SOCK_CREATION_TIME; } @@ -1126,7 +1207,7 @@ unsigned long long DBClientConnection::query(stdx::functionshutdown(); + _port->shutdown(); throw; } @@ -1491,7 +1572,7 @@ void DBClientConnection::checkResponse(const char* data, int nReturned, bool* re */ *retry = false; - *host = _serverString; + *host = _serverAddress.toString(); if (!_parentReplSetName.empty() && nReturned) { verify(data); @@ -1525,11 +1606,11 @@ void DBClientConnection::handleNotMasterResponse(const BSONElement& elemToCheck) } MONGO_LOG_COMPONENT(1, logger::LogComponent::kReplication) - << "got not master from: " << _serverString << " of repl set: " << _parentReplSetName; + << "got not master from: " << _serverAddress << " of repl set: " << _parentReplSetName; ReplicaSetMonitorPtr monitor = ReplicaSetMonitor::get(_parentReplSetName); if (monitor) { - monitor->failedHost(_server); + monitor->failedHost(_serverAddress); } _failed = true; diff --git a/src/mongo/client/dbclientinterface.h b/src/mongo/client/dbclientinterface.h index bcbaa42d1da..7db99d0a096 100644 --- a/src/mongo/client/dbclientinterface.h +++ b/src/mongo/client/dbclientinterface.h @@ -946,18 +946,14 @@ private: /** * The rpc protocols this client supports. * - * TODO: Change to rpc::supports::kAll once OP_COMMAND is implemented in - * mongos (SERVER-18292). */ - rpc::ProtocolSet _clientRPCProtocols{rpc::supports::kOpQueryOnly}; + rpc::ProtocolSet _clientRPCProtocols{rpc::supports::kAll}; /** - * The rpc protocol the remote server(s) support. - * - * TODO: implement proper detection of RPC protocol support when OP_COMMAND - * is implemented in mongos (SERVER-18292). + * The rpc protocol the remote server(s) support. We support 'opQueryOnly' by default unless + * we detect support for OP_COMMAND at connection time. */ - rpc::ProtocolSet _serverRPCProtocols{rpc::supports::kAll}; + rpc::ProtocolSet _serverRPCProtocols{rpc::supports::kOpQueryOnly}; rpc::RequestMetadataWriter _metadataWriter; rpc::ReplyMetadataReader _metadataReader; @@ -1128,16 +1124,34 @@ public: _numConnections.fetchAndAdd(-1); } - /** Connect to a Mongo database server. + /** + * Connect to a Mongo database server. + * + * If autoReconnect is true, you can try to use the DBClientConnection even when + * false was returned -- it will try to connect again. + * + * @param server server to connect to. + * @param errmsg any relevant error message will appended to the string + * @return false if fails to connect. + */ + virtual bool connect(const HostAndPort& server, std::string& errmsg); - If autoReconnect is true, you can try to use the DBClientConnection even when - false was returned -- it will try to connect again. + /** + * Semantically equivalent to the previous connect method, but returns a Status + * instead of taking an errmsg out parameter. + * + * @param server The server to connect to. + */ + Status connect(const HostAndPort& server); - @param server server to connect to. - @param errmsg any relevant error message will appended to the string - @return false if fails to connect. - */ - virtual bool connect(const HostAndPort& server, std::string& errmsg); + /** + * This version of connect does not run 'isMaster' after creating a TCP connection to the + * remote host. This method should be used only when calling 'isMaster' would create a deadlock, + * such as in 'isSelf'. + * + * @param server The server to connect to. + */ + Status connectSocketOnly(const HostAndPort& server); /** Connect to a Mongo database server. Exception throwing version. Throws a UserException if cannot connect. @@ -1148,11 +1162,6 @@ public: @param serverHostname host to connect to. can include port number ( 127.0.0.1 , 127.0.0.1:5555 ) */ - void connect(const std::string& serverHostname) { - std::string errmsg; - if (!connect(HostAndPort(serverHostname), errmsg)) - throw ConnectException(std::string("can't connect ") + errmsg); - } /** * Logs out the connection for the given database. @@ -1195,29 +1204,29 @@ public: } bool isStillConnected() { - return p ? p->isStillConnected() : true; + return _port ? _port->isStillConnected() : true; } MessagingPort& port() { - verify(p); - return *p; + verify(_port); + return *_port; } std::string toString() const { std::stringstream ss; - ss << _serverString; - if (!_serverAddrString.empty()) - ss << " (" << _serverAddrString << ")"; + ss << _serverAddress; + if (!_resolvedAddress.empty()) + ss << " (" << _resolvedAddress << ")"; if (_failed) ss << " failed"; return ss.str(); } std::string getServerAddress() const { - return _serverString; + return _serverAddress.toString(); } const HostAndPort& getServerHostAndPort() const { - return _server; + return _serverAddress; } virtual void killCursor(long long cursorID); @@ -1270,14 +1279,15 @@ protected: virtual void _auth(const BSONObj& params); virtual void sayPiggyBack(Message& toSend); - std::unique_ptr p; - std::unique_ptr server; + std::unique_ptr _port; + bool _failed; const bool autoReconnect; Backoff autoReconnectBackoff; - HostAndPort _server; // remember for reconnects - std::string _serverString; // server host and port - std::string _serverAddrString; // resolved ip of server + + HostAndPort _serverAddress; + std::string _resolvedAddress; + void _checkConnection(); // throws SocketException if in failed state and not reconnecting or if waiting to reconnect @@ -1288,7 +1298,6 @@ protected: std::map authCache; double _so_timeout; - bool _connect(std::string& errmsg); static AtomicInt32 _numConnections; static bool _lazyKillCursor; // lazy means we piggy back kill cursors on next op diff --git a/src/mongo/client/scoped_db_conn_test.cpp b/src/mongo/client/scoped_db_conn_test.cpp index bfd57ecbb77..52bfb8241db 100644 --- a/src/mongo/client/scoped_db_conn_test.cpp +++ b/src/mongo/client/scoped_db_conn_test.cpp @@ -36,15 +36,19 @@ #include "mongo/client/connpool.h" #include "mongo/client/global_conn_pool.h" +#include "mongo/db/wire_version.h" +#include "mongo/rpc/factory.h" +#include "mongo/rpc/reply_builder_interface.h" +#include "mongo/rpc/request_interface.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/fail_point_service.h" +#include "mongo/util/log.h" #include "mongo/util/net/listen.h" #include "mongo/util/net/message_port.h" #include "mongo/util/net/message_server.h" -#include "mongo/util/fail_point_service.h" -#include "mongo/util/log.h" #include "mongo/util/quick_exit.h" #include "mongo/util/time_support.h" #include "mongo/util/timer.h" -#include "mongo/unittest/unittest.h" /** * Tests for ScopedDbConnection, particularly in connection pool management. @@ -107,7 +111,23 @@ class DummyMessageHandler final : public MessageHandler { public: virtual void connected(AbstractMessagingPort* p) {} - virtual void process(Message& m, AbstractMessagingPort* por) {} + virtual void process(Message& m, AbstractMessagingPort* port) { + auto request = rpc::makeRequest(&m); + auto reply = rpc::makeReplyBuilder(request->getProtocol()); + + BSONObjBuilder commandResponse; + + // We need to handle the isMaster received during connection. + if (request->getCommandName() == "isMaster") { + commandResponse.append("maxWireVersion", WireVersion::RELEASE_3_1_5); + commandResponse.append("minWireVersion", WireVersion::RELEASE_2_4_AND_BEFORE); + } + + port->reply(m, + *reply->setMetadata(rpc::makeEmptyMetadata()) + .setCommandReply(commandResponse.done()) + .done()); + } } dummyHandler; @@ -221,13 +241,13 @@ public: // Make sure the dummy server is up and running before proceeding while (true) { - try { - conn.connect(TARGET_HOST); + auto connectStatus = conn.connect(HostAndPort{TARGET_HOST}); + if (connectStatus.isOK()) { break; - } catch (const ConnectException&) { - if (timer.seconds() > 20) { - FAIL("Timed out connecting to dummy server"); - } + } + if (timer.seconds() > 20) { + FAIL(str::stream() + << "Timed out connecting to dummy server: " << connectStatus.toString()); } } } diff --git a/src/mongo/db/repl/isself.cpp b/src/mongo/db/repl/isself.cpp index e34b1cc9660..2146a834588 100644 --- a/src/mongo/db/repl/isself.cpp +++ b/src/mongo/db/repl/isself.cpp @@ -191,9 +191,14 @@ bool isSelf(const HostAndPort& hostAndPort) { try { DBClientConnection conn; - std::string errmsg; conn.setSoTimeout(30); // 30 second timeout - if (!conn.connect(hostAndPort, errmsg)) { + + // We need to avoid the isMaster call triggered by a normal connect, which would + // cause a deadlock. 'isSelf' is called by the Replication Coordinator when validating + // a replica set configuration document, but the 'isMaster' command requires a lock on the + // replication coordinator to execute. As such we call we call 'connectSocketOnly', which + // does not call 'isMaster'. + if (!conn.connectSocketOnly(hostAndPort).isOK()) { return false; } diff --git a/src/mongo/rpc/factory.cpp b/src/mongo/rpc/factory.cpp index 085fa9e7a7b..ae16d16c62e 100644 --- a/src/mongo/rpc/factory.cpp +++ b/src/mongo/rpc/factory.cpp @@ -31,8 +31,12 @@ #include "mongo/rpc/factory.h" #include "mongo/rpc/command_reply.h" +#include "mongo/rpc/command_reply_builder.h" +#include "mongo/rpc/command_request.h" #include "mongo/rpc/command_request_builder.h" #include "mongo/rpc/legacy_reply.h" +#include "mongo/rpc/legacy_reply_builder.h" +#include "mongo/rpc/legacy_request.h" #include "mongo/rpc/legacy_request_builder.h" #include "mongo/rpc/protocol.h" #include "mongo/stdx/memory.h" @@ -68,5 +72,29 @@ std::unique_ptr makeReply(const Message* unownedMessage) { } } +std::unique_ptr makeRequest(const Message* unownedMessage) { + switch (unownedMessage->operation()) { + case mongo::dbQuery: + return stdx::make_unique(unownedMessage); + case mongo::dbCommand: + return stdx::make_unique(unownedMessage); + default: + uasserted(ErrorCodes::UnsupportedFormat, + str::stream() << "Received a reply message with unexpected opcode: " + << unownedMessage->operation()); + } +} + +std::unique_ptr makeReplyBuilder(Protocol protocol) { + switch (protocol) { + case Protocol::kOpQuery: + return stdx::make_unique(); + case Protocol::kOpCommandV1: + return stdx::make_unique(); + default: + MONGO_UNREACHABLE; + } +} + } // namespace rpc } // namespace mongo diff --git a/src/mongo/rpc/factory.h b/src/mongo/rpc/factory.h index 7de92664b69..2776ffacc47 100644 --- a/src/mongo/rpc/factory.h +++ b/src/mongo/rpc/factory.h @@ -41,8 +41,10 @@ namespace mongo { class Message; namespace rpc { +class ReplyBuilderInterface; class ReplyInterface; class RequestBuilderInterface; +class RequestInterface; /** * Returns the appropriate concrete RequestBuilder. Throws if one cannot be chosen. @@ -56,5 +58,16 @@ std::unique_ptr makeRequestBuilder(ProtocolSet clientPr */ std::unique_ptr makeReply(const Message* unownedMessage); +/** + * Returns the appropriate concrete Request according to the contents of the message. + * Throws if one cannot be chosen. + */ +std::unique_ptr makeRequest(const Message* unownedMessage); + +/** + * Returns the appropriate concrete ReplyBuilder. + */ +std::unique_ptr makeReplyBuilder(Protocol protocol); + } // namespace rpc } // namespace mongo diff --git a/src/mongo/rpc/protocol.cpp b/src/mongo/rpc/protocol.cpp index f50860b85fd..3bb68f77672 100644 --- a/src/mongo/rpc/protocol.cpp +++ b/src/mongo/rpc/protocol.cpp @@ -34,6 +34,9 @@ #include #include "mongo/base/string_data.h" +#include "mongo/bson/util/bson_extract.h" +#include "mongo/db/jsobj.h" +#include "mongo/db/wire_version.h" #include "mongo/util/mongoutils/str.h" namespace mongo { @@ -105,5 +108,45 @@ StatusWith parseProtocolSet(StringData repr) { << "and 'all' (0x3) are supported."); } +StatusWith parseProtocolSetFromIsMasterReply(const BSONObj& isMasterReply) { + long long maxWireVersion; + auto maxWireExtractStatus = + bsonExtractIntegerField(isMasterReply, "maxWireVersion", &maxWireVersion); + + long long minWireVersion; + auto minWireExtractStatus = + bsonExtractIntegerField(isMasterReply, "minWireVersion", &minWireVersion); + + // MongoDB 2.4 and earlier do not have maxWireVersion/minWireVersion in their 'isMaster' replies + if ((maxWireExtractStatus == minWireExtractStatus) && + (maxWireExtractStatus == ErrorCodes::NoSuchKey)) { + return supports::kOpQueryOnly; + } else if (!maxWireExtractStatus.isOK()) { + return maxWireExtractStatus; + } else if (!minWireExtractStatus.isOK()) { + return minWireExtractStatus; + } + + bool hasWireVersionForOpCommandInMongod = (minWireVersion <= WireVersion::RELEASE_3_1_5) && + (maxWireVersion >= WireVersion::RELEASE_3_1_5); + + bool isMongos = false; + + std::string msgField; + auto msgFieldExtractStatus = bsonExtractStringField(isMasterReply, "msg", &msgField); + + if (msgFieldExtractStatus == ErrorCodes::NoSuchKey) { + isMongos = false; + } else if (!msgFieldExtractStatus.isOK()) { + return msgFieldExtractStatus; + } else { + isMongos = (msgField == "isdbgrid"); + } + + return (!isMongos && hasWireVersionForOpCommandInMongod) ? supports::kAll + : supports::kOpQueryOnly; +} + + } // namespace rpc } // namespace mongo diff --git a/src/mongo/rpc/protocol.h b/src/mongo/rpc/protocol.h index 99e6cacc92c..8999f0b40d2 100644 --- a/src/mongo/rpc/protocol.h +++ b/src/mongo/rpc/protocol.h @@ -35,6 +35,7 @@ #include "mongo/platform/cstdint.h" namespace mongo { +class BSONObj; namespace rpc { /** @@ -95,5 +96,10 @@ StatusWith toString(ProtocolSet protocols); */ StatusWith parseProtocolSet(StringData repr); +/** + * Determines the ProtocolSet of a remote server from an isMaster reply. + */ +StatusWith parseProtocolSetFromIsMasterReply(const BSONObj& isMasterReply); + } // namespace rpc } // namespace mongo diff --git a/src/mongo/rpc/protocol_test.cpp b/src/mongo/rpc/protocol_test.cpp index 71ff1559fda..0f26cc586b2 100644 --- a/src/mongo/rpc/protocol_test.cpp +++ b/src/mongo/rpc/protocol_test.cpp @@ -29,12 +29,17 @@ #include "mongo/platform/basic.h" #include "mongo/base/status.h" +#include "mongo/db/jsobj.h" +#include "mongo/db/wire_version.h" #include "mongo/rpc/protocol.h" #include "mongo/unittest/unittest.h" namespace { +using mongo::WireVersion; using namespace mongo::rpc; +using mongo::unittest::assertGet; +using mongo::BSONObj; // Checks if negotiation of the first to protocol sets results in the 'proto' const auto assert_negotiated = [](ProtocolSet fst, ProtocolSet snd, Protocol proto) { @@ -63,4 +68,35 @@ TEST(Protocol, FailedNegotiation) { assert_not_negotiated(supports::kOpCommandOnly, supports::kNone); } +TEST(Protocol, parseProtocolSetFromIsMasterReply) { + { + // MongoDB 3.2 (mongod) + auto mongod32 = BSON("maxWireVersion" + << static_cast(WireVersion::RELEASE_3_1_5) << "minWireVersion" + << static_cast(WireVersion::RELEASE_2_4_AND_BEFORE)); + + ASSERT_EQ(assertGet(parseProtocolSetFromIsMasterReply(mongod32)), supports::kAll); + } + { + // MongoDB 3.2 (mongos) + auto mongos32 = BSON("maxWireVersion" + << static_cast(WireVersion::RELEASE_3_1_5) << "minWireVersion" + << static_cast(WireVersion::RELEASE_2_4_AND_BEFORE) << "msg" + << "isdbgrid"); + + ASSERT_EQ(assertGet(parseProtocolSetFromIsMasterReply(mongos32)), supports::kOpQueryOnly); + } + { + // MongoDB 3.0 (mongod) + auto mongod30 = BSON("maxWireVersion" + << static_cast(WireVersion::RELEASE_2_7_7) << "minWireVersion" + << static_cast(WireVersion::RELEASE_2_4_AND_BEFORE)); + ASSERT_EQ(assertGet(parseProtocolSetFromIsMasterReply(mongod30)), supports::kOpQueryOnly); + } + { + auto mongod24 = BSONObj(); + ASSERT_EQ(assertGet(parseProtocolSetFromIsMasterReply(mongod24)), supports::kOpQueryOnly); + } +} + } // namespace diff --git a/src/mongo/s/client/sharding_connection_hook.cpp b/src/mongo/s/client/sharding_connection_hook.cpp index 80e924e1121..176491f198a 100644 --- a/src/mongo/s/client/sharding_connection_hook.cpp +++ b/src/mongo/s/client/sharding_connection_hook.cpp @@ -49,28 +49,6 @@ namespace mongo { using std::string; -namespace { - -bool initWireVersion(DBClientBase* conn, std::string* errMsg) { - BSONObj response; - if (!conn->runCommand("admin", BSON("isMaster" << 1), response)) { - *errMsg = str::stream() << "Failed to determine wire version " - << "for internal connection: " << response; - return false; - } - - if (response.hasField("minWireVersion") && response.hasField("maxWireVersion")) { - int minWireVersion = response["minWireVersion"].numberInt(); - int maxWireVersion = response["maxWireVersion"].numberInt(); - conn->setWireVersions(minWireVersion, maxWireVersion); - } - - return true; -} - -} // namespace - - ShardingConnectionHook::ShardingConnectionHook(bool shardedConnections) : _shardedConnections(shardedConnections) {} @@ -87,18 +65,6 @@ void ShardingConnectionHook::onCreate(DBClientBase* conn) { result); } - // Initialize the wire version of single connections - if (conn->type() == ConnectionString::MASTER) { - LOG(2) << "checking wire version of new connection " << conn->toString(); - - // Initialize the wire protocol version of the connection to find out if we - // can send write commands to this connection. - string errMsg; - if (!initWireVersion(conn, &errMsg)) { - uasserted(17363, errMsg); - } - } - if (_shardedConnections) { // For every DBClient created by mongos, add a hook that will capture the response from // commands we pass along from the client, so that we can target the correct node when diff --git a/src/mongo/shell/shell_options.h b/src/mongo/shell/shell_options.h index cc4edb8e3e5..c9326b85547 100644 --- a/src/mongo/shell/shell_options.h +++ b/src/mongo/shell/shell_options.h @@ -28,6 +28,7 @@ #pragma once +#include #include #include @@ -69,14 +70,14 @@ struct ShellGlobalParams { std::string readMode; - rpc::ProtocolSet rpcProtocols; + boost::optional rpcProtocols; ShellGlobalParams() : autoKillOp(false), useWriteCommandsDefault(true), writeMode("commands"), readMode("compatibility"), - rpcProtocols(rpc::supports::kOpQueryOnly) {} + rpcProtocols() {} }; extern ShellGlobalParams shellGlobalParams; diff --git a/src/mongo/shell/shell_utils.cpp b/src/mongo/shell/shell_utils.cpp index 81dd1d5225f..36f6daa4d2d 100644 --- a/src/mongo/shell/shell_utils.cpp +++ b/src/mongo/shell/shell_utils.cpp @@ -352,7 +352,12 @@ void onConnect(DBClientWithCommands& c) { if (_nokillop) { return; } - c.setClientRPCProtocols(shellGlobalParams.rpcProtocols); + + // Only override the default rpcProtocols if they were set on the command line. + if (shellGlobalParams.rpcProtocols) { + c.setClientRPCProtocols(*shellGlobalParams.rpcProtocols); + } + connectionRegistry.registerConnection(c); } diff --git a/src/mongo/shell/shell_utils_launcher.cpp b/src/mongo/shell/shell_utils_launcher.cpp index 370a0925397..56356ba4c6c 100644 --- a/src/mongo/shell/shell_utils_launcher.cpp +++ b/src/mongo/shell/shell_utils_launcher.cpp @@ -55,6 +55,7 @@ #include "mongo/shell/shell_utils.h" #include "mongo/stdx/thread.h" #include "mongo/util/log.h" +#include "mongo/util/net/hostandport.h" #include "mongo/util/quick_exit.h" #include "mongo/util/scopeguard.h" #include "mongo/util/signal_win32.h" @@ -660,7 +661,7 @@ inline void kill_wrapper(ProcessId pid, int sig, int port, const BSONObj& opt) { // try { DBClientConnection conn; - conn.connect("127.0.0.1:" + BSONObjBuilder::numStr(port)); + conn.connect(HostAndPort{"127.0.0.1:" + BSONObjBuilder::numStr(port)}); BSONElement authObj = opt["auth"]; diff --git a/src/mongo/tools/bridge.cpp b/src/mongo/tools/bridge.cpp index dafc44b272b..b22153a3db9 100644 --- a/src/mongo/tools/bridge.cpp +++ b/src/mongo/tools/bridge.cpp @@ -94,7 +94,7 @@ public: int oldId = m.header().getId(); if (m.operation() == dbQuery || m.operation() == dbMsg || - m.operation() == dbGetMore) { + m.operation() == dbGetMore || m.operation() == dbCommand) { bool exhaust = false; if (m.operation() == dbQuery) { DbMessage d(m); diff --git a/src/mongo/tools/sniffer.cpp b/src/mongo/tools/sniffer.cpp index 3e4712c1223..a3c10c23d18 100644 --- a/src/mongo/tools/sniffer.cpp +++ b/src/mongo/tools/sniffer.cpp @@ -67,6 +67,8 @@ #include "mongo/db/dbmessage.h" #include "mongo/util/net/message.h" #include "mongo/db/storage/mmap_v1/mmap.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/net/message.h" #include "mongo/util/quick_exit.h" #include "mongo/util/text.h" @@ -362,7 +364,7 @@ void processMessage(Connection& c, Message& m) { std::shared_ptr conn = forwarder[c]; if (!conn) { conn.reset(new DBClientConnection(true)); - conn->connect(forwardAddress); + uassertStatusOK(conn->connect(mongo::HostAndPort{forwardAddress})); forwarder[c] = conn; } if (m.operation() == mongo::dbQuery || m.operation() == mongo::dbGetMore) { -- cgit v1.2.1