summaryrefslogtreecommitdiff
path: root/src/mongo/client/dbclient.cpp
diff options
context:
space:
mode:
authorAdam Midvidy <amidvidy@gmail.com>2015-06-24 13:54:45 -0400
committerAdam Midvidy <amidvidy@gmail.com>2015-06-24 15:50:30 -0400
commit1fc9cba6988ab1b600be1a0549caf6146619e4df (patch)
treea5fcf53a04666b1bfc2ca43332cd824894154985 /src/mongo/client/dbclient.cpp
parent313c3bdc2547f2746639e84f8668a756ad95d8f3 (diff)
downloadmongo-1fc9cba6988ab1b600be1a0549caf6146619e4df.tar.gz
SERVER-19035 autodetect support for OP_COMMAND in remote servers
Diffstat (limited to 'src/mongo/client/dbclient.cpp')
-rw-r--r--src/mongo/client/dbclient.cpp161
1 files changed, 121 insertions, 40 deletions
diff --git a/src/mongo/client/dbclient.cpp b/src/mongo/client/dbclient.cpp
index 53e0ddc548d..e25645489c5 100644
--- a/src/mongo/client/dbclient.cpp
+++ b/src/mongo/client/dbclient.cpp
@@ -45,6 +45,7 @@
#include "mongo/db/auth/internal_user_auth.h"
#include "mongo/db/json.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/wire_version.h"
#include "mongo/rpc/factory.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/metadata.h"
@@ -902,46 +903,122 @@ BSONObj DBClientInterface::findOne(const string& ns,
return v.empty() ? BSONObj() : v[0];
}
-bool DBClientConnection::connect(const HostAndPort& server, string& errmsg) {
- _server = server;
- _serverString = _server.toString();
- return _connect(errmsg);
-}
+namespace {
-bool DBClientConnection::_connect(string& errmsg) {
- _serverString = _server.toString();
- _serverAddrString.clear();
+/**
+ * RAII class to force usage of OP_QUERY on a connection.
+ */
+class ScopedForceOpQuery {
+public:
+ ScopedForceOpQuery(DBClientBase* conn)
+ : _conn(conn), _oldProtos(conn->getClientRPCProtocols()) {
+ _conn->setClientRPCProtocols(rpc::supports::kOpQueryOnly);
+ }
- // we keep around SockAddr for connection life -- maybe MessagingPort
- // requires that?
- std::unique_ptr<SockAddr> serverSockAddr(new SockAddr(_server.host().c_str(), _server.port()));
- if (!serverSockAddr->isValid()) {
- errmsg = str::stream() << "couldn't initialize connection to host "
- << _server.host().c_str() << ", address is invalid";
- return false;
+ ~ScopedForceOpQuery() {
+ _conn->setClientRPCProtocols(_oldProtos);
}
- server.reset(serverSockAddr.release());
- p.reset(new MessagingPort(_so_timeout, _logLevel));
+private:
+ DBClientBase* const _conn;
+ const rpc::ProtocolSet _oldProtos;
+};
- if (_server.host().empty()) {
- errmsg = str::stream() << "couldn't connect to server " << toString() << ", host is empty";
- return false;
+/**
+* Initializes the wire version of conn, and returns the isMaster reply.
+*/
+StatusWith<BSONObj> initWireVersion(DBClientBase* conn) {
+ try {
+ // We need to force the usage of OP_QUERY on this command, even if we have previously
+ // detected support for OP_COMMAND on a connection. This is necessary to handle the case
+ // where we reconnect to an older version of MongoDB running at the same host/port.
+ ScopedForceOpQuery forceOpQuery{conn};
+
+ auto result = conn->runCommandWithMetadata(
+ "admin", "isMaster", rpc::makeEmptyMetadata(), BSON("isMaster" << 1));
+
+ BSONObj isMasterObj = result->getCommandReply().getOwned();
+
+ if (isMasterObj.hasField("minWireVersion") && isMasterObj.hasField("maxWireVersion")) {
+ int minWireVersion = isMasterObj["minWireVersion"].numberInt();
+ int maxWireVersion = isMasterObj["maxWireVersion"].numberInt();
+ conn->setWireVersions(minWireVersion, maxWireVersion);
+ }
+
+ return isMasterObj;
+
+ } catch (...) {
+ return exceptionToStatus();
}
+}
- _serverAddrString = server->getAddr();
+} // namespace
- if (_serverAddrString == "0.0.0.0") {
- errmsg = str::stream() << "couldn't connect to server " << toString()
- << ", address resolved to 0.0.0.0";
+bool DBClientConnection::connect(const HostAndPort& server, std::string& errmsg) {
+ auto connectStatus = connect(server);
+ if (!connectStatus.isOK()) {
+ errmsg = connectStatus.reason();
return false;
}
+ return true;
+}
+
+
+Status DBClientConnection::connect(const HostAndPort& serverAddress) {
+ auto connectStatus = connectSocketOnly(serverAddress);
+ if (!connectStatus.isOK()) {
+ return connectStatus;
+ }
- if (!p->connect(*server)) {
- errmsg = str::stream() << "couldn't connect to server " << toString()
- << ", connection attempt failed";
+ auto swIsMasterReply = initWireVersion(this);
+ if (!swIsMasterReply.isOK()) {
_failed = true;
- return false;
+ return swIsMasterReply.getStatus();
+ }
+
+ auto swProtocolSet = rpc::parseProtocolSetFromIsMasterReply(swIsMasterReply.getValue());
+ if (!swProtocolSet.isOK()) {
+ return swProtocolSet.getStatus();
+ }
+
+ _setServerRPCProtocols(swProtocolSet.getValue());
+
+ return Status::OK();
+}
+
+Status DBClientConnection::connectSocketOnly(const HostAndPort& serverAddress) {
+ _serverAddress = serverAddress;
+ _failed = true;
+
+ // We need to construct a SockAddr so we can resolve the address.
+ SockAddr osAddr{serverAddress.host().c_str(), serverAddress.port()};
+
+ if (!osAddr.isValid()) {
+ return Status(ErrorCodes::InvalidOptions,
+ str::stream() << "couldn't initialize connection to host "
+ << serverAddress.host() << ", address is invalid");
+ }
+
+ _port.reset(new MessagingPort(_so_timeout, _logLevel));
+
+ if (serverAddress.host().empty()) {
+ return Status(ErrorCodes::InvalidOptions,
+ str::stream() << "couldn't connect to server " << _serverAddress.toString()
+ << ", host is empty");
+ }
+
+ if (osAddr.getAddr() == "0.0.0.0") {
+ return Status(ErrorCodes::InvalidOptions,
+ str::stream() << "couldn't connect to server " << _serverAddress.toString()
+ << ", address resolved to 0.0.0.0");
+ }
+
+ _resolvedAddress = osAddr.getAddr();
+
+ if (!_port->connect(osAddr)) {
+ return Status(ErrorCodes::OperationFailed,
+ str::stream() << "couldn't connect to server " << _serverAddress.toString()
+ << ", connection attempt failed");
} else {
LOG(1) << "connected to server " << toString() << endl;
}
@@ -949,11 +1026,14 @@ bool DBClientConnection::_connect(string& errmsg) {
#ifdef MONGO_CONFIG_SSL
int sslModeVal = sslGlobalParams.sslMode.load();
if (sslModeVal == SSLParams::SSLMode_preferSSL || sslModeVal == SSLParams::SSLMode_requireSSL) {
- return p->secure(sslManager(), _server.host());
+ if (!_port->secure(sslManager(), serverAddress.host())) {
+ return Status(ErrorCodes::OperationFailed, "Failed to initialize SSL on connection");
+ }
}
#endif
- return true;
+ _failed = false;
+ return Status::OK();
}
void DBClientConnection::logout(const string& dbname, BSONObj& info) {
@@ -988,10 +1068,11 @@ void DBClientConnection::_checkConnection() {
LOG(_logLevel) << "trying reconnect to " << toString() << endl;
string errmsg;
_failed = false;
- if (!_connect(errmsg)) {
+ auto connectStatus = connect(_serverAddress);
+ if (!connectStatus.isOK()) {
_failed = true;
LOG(_logLevel) << "reconnect " << toString() << " failed " << errmsg << endl;
- throw SocketException(SocketException::CONNECT_ERROR, toString());
+ throw SocketException(SocketException::CONNECT_ERROR, connectStatus.reason());
}
LOG(_logLevel) << "reconnect " << toString() << " ok" << endl;
@@ -1009,14 +1090,14 @@ void DBClientConnection::_checkConnection() {
void DBClientConnection::setSoTimeout(double timeout) {
_so_timeout = timeout;
- if (p) {
- p->setSocketTimeout(timeout);
+ if (_port) {
+ _port->setSocketTimeout(timeout);
}
}
uint64_t DBClientConnection::getSockCreationMicroSec() const {
- if (p) {
- return p->getSockCreationMicroSec();
+ if (_port) {
+ return _port->getSockCreationMicroSec();
} else {
return INVALID_SOCK_CREATION_TIME;
}
@@ -1126,7 +1207,7 @@ unsigned long long DBClientConnection::query(stdx::function<void(DBClientCursorB
we have to reconnect.
*/
_failed = true;
- p->shutdown();
+ _port->shutdown();
throw;
}
@@ -1491,7 +1572,7 @@ void DBClientConnection::checkResponse(const char* data, int nReturned, bool* re
*/
*retry = false;
- *host = _serverString;
+ *host = _serverAddress.toString();
if (!_parentReplSetName.empty() && nReturned) {
verify(data);
@@ -1525,11 +1606,11 @@ void DBClientConnection::handleNotMasterResponse(const BSONElement& elemToCheck)
}
MONGO_LOG_COMPONENT(1, logger::LogComponent::kReplication)
- << "got not master from: " << _serverString << " of repl set: " << _parentReplSetName;
+ << "got not master from: " << _serverAddress << " of repl set: " << _parentReplSetName;
ReplicaSetMonitorPtr monitor = ReplicaSetMonitor::get(_parentReplSetName);
if (monitor) {
- monitor->failedHost(_server);
+ monitor->failedHost(_serverAddress);
}
_failed = true;