diff options
author | Adam Midvidy <amidvidy@gmail.com> | 2015-06-24 13:54:45 -0400 |
---|---|---|
committer | Adam Midvidy <amidvidy@gmail.com> | 2015-06-24 15:50:30 -0400 |
commit | 1fc9cba6988ab1b600be1a0549caf6146619e4df (patch) | |
tree | a5fcf53a04666b1bfc2ca43332cd824894154985 /src/mongo/client/dbclient.cpp | |
parent | 313c3bdc2547f2746639e84f8668a756ad95d8f3 (diff) | |
download | mongo-1fc9cba6988ab1b600be1a0549caf6146619e4df.tar.gz |
SERVER-19035 autodetect support for OP_COMMAND in remote servers
Diffstat (limited to 'src/mongo/client/dbclient.cpp')
-rw-r--r-- | src/mongo/client/dbclient.cpp | 161 |
1 files changed, 121 insertions, 40 deletions
diff --git a/src/mongo/client/dbclient.cpp b/src/mongo/client/dbclient.cpp index 53e0ddc548d..e25645489c5 100644 --- a/src/mongo/client/dbclient.cpp +++ b/src/mongo/client/dbclient.cpp @@ -45,6 +45,7 @@ #include "mongo/db/auth/internal_user_auth.h" #include "mongo/db/json.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/wire_version.h" #include "mongo/rpc/factory.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/rpc/metadata.h" @@ -902,46 +903,122 @@ BSONObj DBClientInterface::findOne(const string& ns, return v.empty() ? BSONObj() : v[0]; } -bool DBClientConnection::connect(const HostAndPort& server, string& errmsg) { - _server = server; - _serverString = _server.toString(); - return _connect(errmsg); -} +namespace { -bool DBClientConnection::_connect(string& errmsg) { - _serverString = _server.toString(); - _serverAddrString.clear(); +/** + * RAII class to force usage of OP_QUERY on a connection. + */ +class ScopedForceOpQuery { +public: + ScopedForceOpQuery(DBClientBase* conn) + : _conn(conn), _oldProtos(conn->getClientRPCProtocols()) { + _conn->setClientRPCProtocols(rpc::supports::kOpQueryOnly); + } - // we keep around SockAddr for connection life -- maybe MessagingPort - // requires that? - std::unique_ptr<SockAddr> serverSockAddr(new SockAddr(_server.host().c_str(), _server.port())); - if (!serverSockAddr->isValid()) { - errmsg = str::stream() << "couldn't initialize connection to host " - << _server.host().c_str() << ", address is invalid"; - return false; + ~ScopedForceOpQuery() { + _conn->setClientRPCProtocols(_oldProtos); } - server.reset(serverSockAddr.release()); - p.reset(new MessagingPort(_so_timeout, _logLevel)); +private: + DBClientBase* const _conn; + const rpc::ProtocolSet _oldProtos; +}; - if (_server.host().empty()) { - errmsg = str::stream() << "couldn't connect to server " << toString() << ", host is empty"; - return false; +/** +* Initializes the wire version of conn, and returns the isMaster reply. +*/ +StatusWith<BSONObj> initWireVersion(DBClientBase* conn) { + try { + // We need to force the usage of OP_QUERY on this command, even if we have previously + // detected support for OP_COMMAND on a connection. This is necessary to handle the case + // where we reconnect to an older version of MongoDB running at the same host/port. + ScopedForceOpQuery forceOpQuery{conn}; + + auto result = conn->runCommandWithMetadata( + "admin", "isMaster", rpc::makeEmptyMetadata(), BSON("isMaster" << 1)); + + BSONObj isMasterObj = result->getCommandReply().getOwned(); + + if (isMasterObj.hasField("minWireVersion") && isMasterObj.hasField("maxWireVersion")) { + int minWireVersion = isMasterObj["minWireVersion"].numberInt(); + int maxWireVersion = isMasterObj["maxWireVersion"].numberInt(); + conn->setWireVersions(minWireVersion, maxWireVersion); + } + + return isMasterObj; + + } catch (...) { + return exceptionToStatus(); } +} - _serverAddrString = server->getAddr(); +} // namespace - if (_serverAddrString == "0.0.0.0") { - errmsg = str::stream() << "couldn't connect to server " << toString() - << ", address resolved to 0.0.0.0"; +bool DBClientConnection::connect(const HostAndPort& server, std::string& errmsg) { + auto connectStatus = connect(server); + if (!connectStatus.isOK()) { + errmsg = connectStatus.reason(); return false; } + return true; +} + + +Status DBClientConnection::connect(const HostAndPort& serverAddress) { + auto connectStatus = connectSocketOnly(serverAddress); + if (!connectStatus.isOK()) { + return connectStatus; + } - if (!p->connect(*server)) { - errmsg = str::stream() << "couldn't connect to server " << toString() - << ", connection attempt failed"; + auto swIsMasterReply = initWireVersion(this); + if (!swIsMasterReply.isOK()) { _failed = true; - return false; + return swIsMasterReply.getStatus(); + } + + auto swProtocolSet = rpc::parseProtocolSetFromIsMasterReply(swIsMasterReply.getValue()); + if (!swProtocolSet.isOK()) { + return swProtocolSet.getStatus(); + } + + _setServerRPCProtocols(swProtocolSet.getValue()); + + return Status::OK(); +} + +Status DBClientConnection::connectSocketOnly(const HostAndPort& serverAddress) { + _serverAddress = serverAddress; + _failed = true; + + // We need to construct a SockAddr so we can resolve the address. + SockAddr osAddr{serverAddress.host().c_str(), serverAddress.port()}; + + if (!osAddr.isValid()) { + return Status(ErrorCodes::InvalidOptions, + str::stream() << "couldn't initialize connection to host " + << serverAddress.host() << ", address is invalid"); + } + + _port.reset(new MessagingPort(_so_timeout, _logLevel)); + + if (serverAddress.host().empty()) { + return Status(ErrorCodes::InvalidOptions, + str::stream() << "couldn't connect to server " << _serverAddress.toString() + << ", host is empty"); + } + + if (osAddr.getAddr() == "0.0.0.0") { + return Status(ErrorCodes::InvalidOptions, + str::stream() << "couldn't connect to server " << _serverAddress.toString() + << ", address resolved to 0.0.0.0"); + } + + _resolvedAddress = osAddr.getAddr(); + + if (!_port->connect(osAddr)) { + return Status(ErrorCodes::OperationFailed, + str::stream() << "couldn't connect to server " << _serverAddress.toString() + << ", connection attempt failed"); } else { LOG(1) << "connected to server " << toString() << endl; } @@ -949,11 +1026,14 @@ bool DBClientConnection::_connect(string& errmsg) { #ifdef MONGO_CONFIG_SSL int sslModeVal = sslGlobalParams.sslMode.load(); if (sslModeVal == SSLParams::SSLMode_preferSSL || sslModeVal == SSLParams::SSLMode_requireSSL) { - return p->secure(sslManager(), _server.host()); + if (!_port->secure(sslManager(), serverAddress.host())) { + return Status(ErrorCodes::OperationFailed, "Failed to initialize SSL on connection"); + } } #endif - return true; + _failed = false; + return Status::OK(); } void DBClientConnection::logout(const string& dbname, BSONObj& info) { @@ -988,10 +1068,11 @@ void DBClientConnection::_checkConnection() { LOG(_logLevel) << "trying reconnect to " << toString() << endl; string errmsg; _failed = false; - if (!_connect(errmsg)) { + auto connectStatus = connect(_serverAddress); + if (!connectStatus.isOK()) { _failed = true; LOG(_logLevel) << "reconnect " << toString() << " failed " << errmsg << endl; - throw SocketException(SocketException::CONNECT_ERROR, toString()); + throw SocketException(SocketException::CONNECT_ERROR, connectStatus.reason()); } LOG(_logLevel) << "reconnect " << toString() << " ok" << endl; @@ -1009,14 +1090,14 @@ void DBClientConnection::_checkConnection() { void DBClientConnection::setSoTimeout(double timeout) { _so_timeout = timeout; - if (p) { - p->setSocketTimeout(timeout); + if (_port) { + _port->setSocketTimeout(timeout); } } uint64_t DBClientConnection::getSockCreationMicroSec() const { - if (p) { - return p->getSockCreationMicroSec(); + if (_port) { + return _port->getSockCreationMicroSec(); } else { return INVALID_SOCK_CREATION_TIME; } @@ -1126,7 +1207,7 @@ unsigned long long DBClientConnection::query(stdx::function<void(DBClientCursorB we have to reconnect. */ _failed = true; - p->shutdown(); + _port->shutdown(); throw; } @@ -1491,7 +1572,7 @@ void DBClientConnection::checkResponse(const char* data, int nReturned, bool* re */ *retry = false; - *host = _serverString; + *host = _serverAddress.toString(); if (!_parentReplSetName.empty() && nReturned) { verify(data); @@ -1525,11 +1606,11 @@ void DBClientConnection::handleNotMasterResponse(const BSONElement& elemToCheck) } MONGO_LOG_COMPONENT(1, logger::LogComponent::kReplication) - << "got not master from: " << _serverString << " of repl set: " << _parentReplSetName; + << "got not master from: " << _serverAddress << " of repl set: " << _parentReplSetName; ReplicaSetMonitorPtr monitor = ReplicaSetMonitor::get(_parentReplSetName); if (monitor) { - monitor->failedHost(_server); + monitor->failedHost(_serverAddress); } _failed = true; |