/* Copyright 2009 10gen Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see <http://www.gnu.org/licenses/>. * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects * for all of the code used other than as permitted herein. If you modify * file(s) with this exception, you may extend this exception to your * version of the file(s), but you are not obligated to do so. If you do not * wish to do so, delete this exception statement from your version. If you * delete this exception statement from all source files in the program, * then also delete it in the license file. 
*/ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork #include "mongo/platform/basic.h" #include "mongo/client/dbclient_rs.h" #include #include #include "mongo/bson/util/builder.h" #include "mongo/client/connpool.h" #include "mongo/client/dbclient_cursor.h" #include "mongo/client/global_conn_pool.h" #include "mongo/client/read_preference.h" #include "mongo/client/replica_set_monitor.h" #include "mongo/db/auth/sasl_command_constants.h" #include "mongo/db/dbmessage.h" #include "mongo/db/jsobj.h" #include "mongo/stdx/memory.h" #include "mongo/util/log.h" namespace mongo { using std::shared_ptr; using std::unique_ptr; using std::endl; using std::map; using std::set; using std::string; using std::vector; namespace { /* * Set of commands that can be used with $readPreference */ set _secOkCmdList; class PopulateReadPrefSecOkCmdList { public: PopulateReadPrefSecOkCmdList() { _secOkCmdList.insert("aggregate"); _secOkCmdList.insert("collStats"); _secOkCmdList.insert("count"); _secOkCmdList.insert("distinct"); _secOkCmdList.insert("dbStats"); _secOkCmdList.insert("explain"); _secOkCmdList.insert("find"); _secOkCmdList.insert("geoNear"); _secOkCmdList.insert("geoSearch"); _secOkCmdList.insert("group"); } } _populateReadPrefSecOkCmdList; /** * Maximum number of retries to make for auto-retry logic when performing a slave ok operation. */ const size_t MAX_RETRY = 3; /** * Extracts the read preference settings from the query document. Note that this method * assumes that the query is ok for secondaries so it defaults to * ReadPreference::SecondaryPreferred when nothing is specified. Supports the following * format: * * Format A (official format): * { query: , $readPreference: } * * Format B (unofficial internal format from mongos): * { , $queryOptions: { $readPreference: }} * * @param query the raw query document * * @return the read preference setting if a read preference exists, otherwise the default read * preference of Primary_Only. 
If the tags field was not present, it will contain one * empty tag document {} which matches any tag. * * @throws AssertionException if the read preference object is malformed */ std::unique_ptr _extractReadPref(const BSONObj& query, int queryOptions) { // Default read pref is primary only or secondary preferred with slaveOK const auto defaultReadPref = queryOptions & QueryOption_SlaveOk ? ReadPreference::SecondaryPreferred : ReadPreference::PrimaryOnly; auto readPrefContainingObj = query; if (auto elem = query["$queryOptions"]) { // The readPreference is embedded in the $queryOptions field. readPrefContainingObj = elem.Obj(); } return stdx::make_unique(uassertStatusOK( ReadPreferenceSetting::fromContainingBSON(readPrefContainingObj, defaultReadPref))); } } // namespace // -------------------------------- // ----- DBClientReplicaSet --------- // -------------------------------- bool DBClientReplicaSet::_authPooledSecondaryConn = true; DBClientReplicaSet::DBClientReplicaSet(const string& name, const vector& servers, StringData applicationName, double so_timeout, MongoURI uri) : _setName(name), _applicationName(applicationName.toString()), _so_timeout(so_timeout), _uri(std::move(uri)) { if (uri.isValid()) { _rsm = ReplicaSetMonitor::createIfNeeded(_uri); } else { _rsm = ReplicaSetMonitor::createIfNeeded(name, set(servers.begin(), servers.end())); } } ReplicaSetMonitorPtr DBClientReplicaSet::_getMonitor() { // If you can't get a ReplicaSetMonitor then this connection isn't valid uassert(16340, str::stream() << "No replica set monitor active and no cached seed " "found for set: " << _setName, _rsm); return _rsm; } // This can't throw an exception because it is called in the destructor of ScopedDbConnection string DBClientReplicaSet::getServerAddress() const { if (!_rsm) { warning() << "Trying to get server address for DBClientReplicaSet, but no " "ReplicaSetMonitor exists for " << _setName; return str::stream() << _setName << "/"; } return _rsm->getServerAddress(); } 
HostAndPort DBClientReplicaSet::getSuspectedPrimaryHostAndPort() const { if (!_master) { return HostAndPort(); } return _master->getServerHostAndPort(); } void DBClientReplicaSet::setRequestMetadataWriter(rpc::RequestMetadataWriter writer) { // Set the hooks in both our sub-connections and in ourselves. if (_master) { _master->setRequestMetadataWriter(writer); } if (_lastSlaveOkConn.get()) { _lastSlaveOkConn->setRequestMetadataWriter(writer); } DBClientBase::setRequestMetadataWriter(std::move(writer)); } void DBClientReplicaSet::setReplyMetadataReader(rpc::ReplyMetadataReader reader) { // Set the hooks in both our sub-connections and in ourselves. if (_master) { _master->setReplyMetadataReader(reader); } if (_lastSlaveOkConn.get()) { _lastSlaveOkConn->setReplyMetadataReader(reader); } DBClientBase::setReplyMetadataReader(std::move(reader)); } int DBClientReplicaSet::getMinWireVersion() { return _getMonitor()->getMinWireVersion(); } int DBClientReplicaSet::getMaxWireVersion() { return _getMonitor()->getMaxWireVersion(); } // A replica set connection is never disconnected, since it controls its own reconnection // logic. // // Has the side effect of proactively clearing any cached connections which have been // disconnected in the background. 
bool DBClientReplicaSet::isStillConnected() { if (_master && !_master->isStillConnected()) { resetMaster(); // Don't notify monitor of bg failure, since it's not clear how long ago it happened } if (_lastSlaveOkConn.get() && !_lastSlaveOkConn->isStillConnected()) { resetSlaveOkConn(); // Don't notify monitor of bg failure, since it's not clear how long ago it happened } return true; } namespace { bool _isSecondaryCommand(StringData commandName, const BSONObj& commandArgs) { if (_secOkCmdList.count(commandName.toString())) { return true; } if (commandName == "mapReduce" || commandName == "mapreduce") { if (!commandArgs.hasField("out")) { return false; } BSONElement outElem(commandArgs["out"]); if (outElem.isABSONObj() && outElem["inline"].ok()) { return true; } } return false; } // Internal implementation of isSecondaryQuery, takes previously-parsed read preference bool _isSecondaryQuery(const string& ns, const BSONObj& queryObj, const ReadPreferenceSetting& readPref) { // If the read pref is primary only, this is not a secondary query if (readPref.pref == ReadPreference::PrimaryOnly) return false; if (ns.find(".$cmd") == string::npos) { return true; } // This is a command with secondary-possible read pref // Only certain commands are supported for secondary operation. 
BSONObj actualQueryObj; if (strcmp(queryObj.firstElement().fieldName(), "$query") == 0) { actualQueryObj = queryObj["$query"].embeddedObject(); } else if (strcmp(queryObj.firstElement().fieldName(), "query") == 0) { actualQueryObj = queryObj["query"].embeddedObject(); } else { actualQueryObj = queryObj; } StringData commandName = actualQueryObj.firstElementFieldName(); return _isSecondaryCommand(commandName, actualQueryObj); } } // namespace bool DBClientReplicaSet::isSecondaryQuery(const string& ns, const BSONObj& queryObj, int queryOptions) { unique_ptr readPref(_extractReadPref(queryObj, queryOptions)); return _isSecondaryQuery(ns, queryObj, *readPref); } DBClientConnection* DBClientReplicaSet::checkMaster() { ReplicaSetMonitorPtr monitor = _getMonitor(); HostAndPort h = monitor->getMasterOrUassert(); if (h == _masterHost && _master) { // a master is selected. let's just make sure connection didn't die if (!_master->isFailed()) return _master.get(); monitor->failedHost(_masterHost, {ErrorCodes::Error(40657), "Last known master host cannot be reached"}); h = monitor->getMasterOrUassert(); // old master failed, try again. } _masterHost = h; MongoURI masterUri; if (!_uri.isValid()) masterUri = MongoURI(ConnectionString(_masterHost)); else masterUri = _uri.cloneURIForServer(_masterHost); string errmsg; DBClientConnection* newConn = NULL; boost::optional socketTimeout; if (_so_timeout > 0.0) socketTimeout = _so_timeout; try { // Needs to perform a dynamic_cast because we need to set the replSet // callback. We should eventually not need this after we remove the // callback. newConn = dynamic_cast( masterUri.connect(_applicationName, errmsg, socketTimeout)); } catch (const AssertionException& ex) { errmsg = ex.toString(); } if (newConn == NULL || !errmsg.empty()) { const std::string message = str::stream() << "can't connect to new replica set master [" << _masterHost.toString() << "]" << (errmsg.empty() ? 
"" : ", err: ") << errmsg; monitor->failedHost(_masterHost, {ErrorCodes::Error(40659), message}); uasserted(ErrorCodes::FailedToSatisfyReadPreference, message); } resetMaster(); _masterHost = h; _master.reset(newConn); _master->setParentReplSetName(_setName); _master->setRequestMetadataWriter(getRequestMetadataWriter()); _master->setReplyMetadataReader(getReplyMetadataReader()); _authConnection(_master.get()); return _master.get(); } bool DBClientReplicaSet::checkLastHost(const ReadPreferenceSetting* readPref) { // Can't use a cached host if we don't have one. if (!_lastSlaveOkConn.get() || _lastSlaveOkHost.empty()) { return false; } // Don't pin if the readPrefs differ. if (!_lastReadPref || !_lastReadPref->equals(*readPref)) { return false; } // Make sure we don't think the host is down. if (_lastSlaveOkConn->isFailed() || !_getMonitor()->isHostUp(_lastSlaveOkHost)) { _invalidateLastSlaveOkCache( {ErrorCodes::Error(40660), "Last slave connection is no longer available"}); return false; } return true; } void DBClientReplicaSet::_authConnection(DBClientConnection* conn) { for (map::const_iterator i = _auths.begin(); i != _auths.end(); ++i) { try { conn->auth(i->second); } catch (const AssertionException&) { warning() << "cached auth failed for set: " << _setName << " db: " << i->second[saslCommandUserDBFieldName].str() << " user: " << i->second[saslCommandUserFieldName].str() << endl; } } } void DBClientReplicaSet::logoutAll(DBClientConnection* conn) { for (map::const_iterator i = _auths.begin(); i != _auths.end(); ++i) { BSONObj response; try { conn->logout(i->first, response); } catch (const AssertionException& ex) { warning() << "Failed to logout: " << conn->getServerAddress() << " on db: " << i->first << causedBy(redact(ex)); } } } DBClientConnection& DBClientReplicaSet::masterConn() { return *checkMaster(); } DBClientConnection& DBClientReplicaSet::slaveConn() { shared_ptr readPref( new ReadPreferenceSetting(ReadPreference::SecondaryPreferred, TagSet())); 
DBClientConnection* conn = selectNodeUsingTags(readPref); uassert(16369, str::stream() << "No good nodes available for set: " << _getMonitor()->getName(), conn != NULL); return *conn; } bool DBClientReplicaSet::connect() { // Returns true if there are any up hosts. const ReadPreferenceSetting anyUpHost(ReadPreference::Nearest, TagSet()); return _getMonitor()->getHostOrRefresh(anyUpHost).getNoThrow().isOK(); } static bool isAuthenticationException(const DBException& ex) { return ex.code() == ErrorCodes::AuthenticationFailed; } void DBClientReplicaSet::_auth(const BSONObj& params) { // We prefer to authenticate against a primary, but otherwise a secondary is ok too // Empty tag matches every secondary shared_ptr readPref( new ReadPreferenceSetting(ReadPreference::PrimaryPreferred, TagSet())); LOG(3) << "dbclient_rs authentication of " << _getMonitor()->getName() << endl; // NOTE that we retry MAX_RETRY + 1 times, since we're always primary preferred we don't // fallback to the primary. Status lastNodeStatus = Status::OK(); for (size_t retry = 0; retry < MAX_RETRY + 1; retry++) { try { DBClientConnection* conn = selectNodeUsingTags(readPref); if (conn == NULL) { break; } conn->auth(params); // Cache the new auth information since we now validated it's good _auths[params[saslCommandUserDBFieldName].str()] = params.getOwned(); // Ensure the only child connection open is the one we authenticated against - other // child connections may not have full authentication information. // NOTE: _lastSlaveOkConn may or may not be the same as _master dassert(_lastSlaveOkConn.get() == conn || _master.get() == conn); if (conn != _lastSlaveOkConn.get()) { resetSlaveOkConn(); } if (conn != _master.get()) { resetMaster(); } return; } catch (const DBException& ex) { // We care if we can't authenticate (i.e. bad password) in credential params. 
if (isAuthenticationException(ex)) { throw; } lastNodeStatus = ex.toStatus(str::stream() << "can't authenticate against replica set node " << _lastSlaveOkHost); _invalidateLastSlaveOkCache(lastNodeStatus); } } if (lastNodeStatus.isOK()) { StringBuilder assertMsgB; assertMsgB << "Failed to authenticate, no good nodes in " << _getMonitor()->getName(); uasserted(ErrorCodes::HostNotFound, assertMsgB.str()); } else { uassertStatusOK(lastNodeStatus); } } void DBClientReplicaSet::logout(const string& dbname, BSONObj& info) { DBClientConnection* priConn = checkMaster(); priConn->logout(dbname, info); _auths.erase(dbname); /* Also logout the cached secondary connection. Note that this is only * needed when we actually have something cached and is last known to be * working. */ if (_lastSlaveOkConn.get() != NULL && !_lastSlaveOkConn->isFailed()) { try { BSONObj dummy; _lastSlaveOkConn->logout(dbname, dummy); } catch (const DBException&) { // Make sure we can't use this connection again. verify(_lastSlaveOkConn->isFailed()); } } } // ------------- simple functions ----------------- void DBClientReplicaSet::insert(const string& ns, BSONObj obj, int flags) { checkMaster()->insert(ns, obj, flags); } void DBClientReplicaSet::insert(const string& ns, const vector& v, int flags) { checkMaster()->insert(ns, v, flags); } void DBClientReplicaSet::remove(const string& ns, Query obj, int flags) { checkMaster()->remove(ns, obj, flags); } void DBClientReplicaSet::update(const string& ns, Query query, BSONObj obj, int flags) { return checkMaster()->update(ns, query, obj, flags); } unique_ptr DBClientReplicaSet::query(const NamespaceStringOrUUID& nsOrUuid, Query query, int nToReturn, int nToSkip, const BSONObj* fieldsToReturn, int queryOptions, int batchSize) { shared_ptr readPref(_extractReadPref(query.obj, queryOptions)); invariant(nsOrUuid.nss()); const string ns = nsOrUuid.nss()->ns(); if (_isSecondaryQuery(ns, query.obj, *readPref)) { LOG(3) << "dbclient_rs query using secondary or 
tagged node selection in " << _getMonitor()->getName() << ", read pref is " << readPref->toString() << " (primary : " << (_master.get() != NULL ? _master->getServerAddress() : "[not cached]") << ", lastTagged : " << (_lastSlaveOkConn.get() != NULL ? _lastSlaveOkConn->getServerAddress() : "[not cached]") << ")" << endl; string lastNodeErrMsg; for (size_t retry = 0; retry < MAX_RETRY; retry++) { try { DBClientConnection* conn = selectNodeUsingTags(readPref); if (conn == NULL) { break; } unique_ptr cursor = conn->query( nsOrUuid, query, nToReturn, nToSkip, fieldsToReturn, queryOptions, batchSize); return checkSlaveQueryResult(std::move(cursor)); } catch (const DBException& ex) { const Status status = ex.toStatus(str::stream() << "can't query replica set node " << _lastSlaveOkHost); lastNodeErrMsg = status.reason(); _invalidateLastSlaveOkCache(status); } } StringBuilder assertMsg; assertMsg << "Failed to do query, no good nodes in " << _getMonitor()->getName(); if (!lastNodeErrMsg.empty()) { assertMsg << ", last error: " << lastNodeErrMsg; } uasserted(16370, assertMsg.str()); } LOG(3) << "dbclient_rs query to primary node in " << _getMonitor()->getName() << endl; return checkMaster()->query( nsOrUuid, query, nToReturn, nToSkip, fieldsToReturn, queryOptions, batchSize); } BSONObj DBClientReplicaSet::findOne(const string& ns, const Query& query, const BSONObj* fieldsToReturn, int queryOptions) { shared_ptr readPref(_extractReadPref(query.obj, queryOptions)); if (_isSecondaryQuery(ns, query.obj, *readPref)) { LOG(3) << "dbclient_rs findOne using secondary or tagged node selection in " << _getMonitor()->getName() << ", read pref is " << readPref->toString() << " (primary : " << (_master.get() != NULL ? _master->getServerAddress() : "[not cached]") << ", lastTagged : " << (_lastSlaveOkConn.get() != NULL ? 
_lastSlaveOkConn->getServerAddress() : "[not cached]") << ")" << endl; string lastNodeErrMsg; for (size_t retry = 0; retry < MAX_RETRY; retry++) { try { DBClientConnection* conn = selectNodeUsingTags(readPref); if (conn == NULL) { break; } return conn->findOne(ns, query, fieldsToReturn, queryOptions); } catch (const DBException& ex) { const Status status = ex.toStatus(str::stream() << "can't findone replica set node " << _lastSlaveOkHost.toString()); lastNodeErrMsg = status.reason(); _invalidateLastSlaveOkCache(status); } } StringBuilder assertMsg; assertMsg << "Failed to call findOne, no good nodes in " << _getMonitor()->getName(); if (!lastNodeErrMsg.empty()) { assertMsg << ", last error: " << lastNodeErrMsg; } uasserted(16379, assertMsg.str()); } LOG(3) << "dbclient_rs findOne to primary node in " << _getMonitor()->getName() << endl; return checkMaster()->findOne(ns, query, fieldsToReturn, queryOptions); } void DBClientReplicaSet::killCursor(const NamespaceString& ns, long long cursorID) { // we should never call killCursor on a replica set connection // since we don't know which server it belongs to // can't assume master because of slave ok // and can have a cursor survive a master change verify(0); } void DBClientReplicaSet::isntMaster() { // Can't use _getMonitor because that will create a new monitor from the cached seed if the // monitor doesn't exist. 
_rsm->failedHost( _masterHost, {ErrorCodes::NotMaster, str::stream() << "got not master for: " << _masterHost}); resetMaster(); } unique_ptr DBClientReplicaSet::checkSlaveQueryResult( unique_ptr result) { if (result.get() == NULL) return result; BSONObj error; bool isError = result->peekError(&error); if (!isError) return result; // We only check for "not master or secondary" errors here // If the error code here ever changes, we need to change this code also BSONElement code = error["code"]; if (code.isNumber() && code.Int() == ErrorCodes::NotMasterOrSecondary) { isntSecondary(); uasserted(14812, str::stream() << "slave " << _lastSlaveOkHost.toString() << " is no longer secondary"); } return result; } void DBClientReplicaSet::isntSecondary() { // Failover to next slave _getMonitor()->failedHost( _lastSlaveOkHost, {ErrorCodes::NotMasterOrSecondary, str::stream() << "slave no longer has secondary status: " << _lastSlaveOkHost}); resetSlaveOkConn(); } DBClientConnection* DBClientReplicaSet::selectNodeUsingTags( shared_ptr readPref) { if (checkLastHost(readPref.get())) { LOG(3) << "dbclient_rs selecting compatible last used node " << _lastSlaveOkHost; return _lastSlaveOkConn.get(); } ReplicaSetMonitorPtr monitor = _getMonitor(); auto selectedNodeStatus = monitor->getHostOrRefresh(*readPref).getNoThrow(); if (!selectedNodeStatus.isOK()) { LOG(3) << "dbclient_rs no compatible node found" << causedBy(redact(selectedNodeStatus.getStatus())); return nullptr; } const HostAndPort selectedNode = std::move(selectedNodeStatus.getValue()); // We are now about to get a new connection from the pool, so cleanup // the current one and release it back to the pool. resetSlaveOkConn(); _lastReadPref = readPref; _lastSlaveOkHost = selectedNode; // Primary connection is special because it is the only connection that is // versioned in mongos. 
Therefore, we have to make sure that this object // maintains only one connection to the primary and use that connection // every time we need to talk to the primary. if (monitor->isPrimary(selectedNode)) { checkMaster(); LOG(3) << "dbclient_rs selecting primary node " << selectedNode << endl; _lastSlaveOkConn = _master; return _master.get(); } auto dtor = [host = _lastSlaveOkHost.toString()](DBClientConnection * ptr) { globalConnPool.release(host, ptr); }; // Needs to perform a dynamic_cast because we need to set the replSet // callback. We should eventually not need this after we remove the // callback. DBClientConnection* newConn = dynamic_cast( globalConnPool.get(_lastSlaveOkHost.toString(), _so_timeout)); // Assert here instead of returning NULL since the contract of this method is such // that returning NULL means none of the nodes were good, which is not the case here. uassert(16532, str::stream() << "Failed to connect to " << _lastSlaveOkHost.toString(), newConn != NULL); _lastSlaveOkConn = std::shared_ptr(newConn, std::move(dtor)); _lastSlaveOkConn->setParentReplSetName(_setName); _lastSlaveOkConn->setRequestMetadataWriter(getRequestMetadataWriter()); _lastSlaveOkConn->setReplyMetadataReader(getReplyMetadataReader()); if (_authPooledSecondaryConn) { _authConnection(_lastSlaveOkConn.get()); } else { // Mongos pooled connections are authenticated through ShardingConnectionHook::onCreate() } LOG(3) << "dbclient_rs selecting node " << _lastSlaveOkHost << endl; return _lastSlaveOkConn.get(); } void DBClientReplicaSet::say(Message& toSend, bool isRetry, string* actualServer) { if (!isRetry) _lazyState = LazyState(); const int lastOp = toSend.operation(); if (lastOp == dbQuery) { // TODO: might be possible to do this faster by changing api DbMessage dm(toSend); QueryMessage qm(dm); shared_ptr readPref(_extractReadPref(qm.query, qm.queryOptions)); if (_isSecondaryQuery(qm.ns, qm.query, *readPref)) { LOG(3) << "dbclient_rs say using secondary or tagged node 
selection in " << _getMonitor()->getName() << ", read pref is " << readPref->toString() << " (primary : " << (_master.get() != NULL ? _master->getServerAddress() : "[not cached]") << ", lastTagged : " << (_lastSlaveOkConn.get() != NULL ? _lastSlaveOkConn->getServerAddress() : "[not cached]") << ")" << endl; string lastNodeErrMsg; for (size_t retry = 0; retry < MAX_RETRY; retry++) { _lazyState._retries = retry; try { DBClientConnection* conn = selectNodeUsingTags(readPref); if (conn == NULL) { break; } if (actualServer != NULL) { *actualServer = conn->getServerAddress(); } conn->say(toSend); _lazyState._lastOp = lastOp; _lazyState._secondaryQueryOk = true; _lazyState._lastClient = conn; } catch (const DBException& ex) { const Status status = ex.toStatus(str::stream() << "can't callLazy replica set node " << _lastSlaveOkHost.toString()); lastNodeErrMsg = status.reason(); _invalidateLastSlaveOkCache(status); continue; } return; } StringBuilder assertMsg; assertMsg << "Failed to call say, no good nodes in " << _getMonitor()->getName(); if (!lastNodeErrMsg.empty()) { assertMsg << ", last error: " << lastNodeErrMsg; } uasserted(16380, assertMsg.str()); } } LOG(3) << "dbclient_rs say to primary node in " << _getMonitor()->getName() << endl; DBClientConnection* master = checkMaster(); if (actualServer) *actualServer = master->getServerAddress(); _lazyState._lastOp = lastOp; _lazyState._secondaryQueryOk = false; // Don't retry requests to primary since there is only one host to try _lazyState._retries = MAX_RETRY; _lazyState._lastClient = master; master->say(toSend); return; } bool DBClientReplicaSet::recv(Message& m, int lastRequestId) { verify(_lazyState._lastClient); // TODO: It would be nice if we could easily wrap a conn error as a result error try { return _lazyState._lastClient->recv(m, lastRequestId); } catch (DBException& e) { log() << "could not receive data from " << _lazyState._lastClient->toString() << causedBy(redact(e)); return false; } } void 
DBClientReplicaSet::checkResponse(const std::vector& batch, bool networkError, bool* retry, string* targetHost) { // For now, do exactly as we did before, so as not to break things. In general though, we // should fix this so checkResponse has a more consistent contract. if (!retry) { if (_lazyState._lastClient) return _lazyState._lastClient->checkResponse(batch, networkError); else return checkMaster()->checkResponse(batch, networkError); } *retry = false; if (targetHost && _lazyState._lastClient) *targetHost = _lazyState._lastClient->getServerAddress(); else if (targetHost) *targetHost = ""; if (!_lazyState._lastClient) return; // nReturned == 1 means that we got one result back, which might be an error // networkError is a sentinel value for "no data returned" aka (usually) network problem // If neither, this must be a query result so our response is ok wrt the replica set if (batch.size() != 1 && !networkError) return; BSONObj dataObj; if (batch.size() == 1) dataObj = batch[0]; // Check if we should retry here if (_lazyState._lastOp == dbQuery && _lazyState._secondaryQueryOk) { // query could potentially go to a secondary, so see if this is an error (or empty) and // retry if we're not past our retry limit. 
if (networkError || (hasErrField(dataObj) && !dataObj["code"].eoo() && dataObj["code"].Int() == ErrorCodes::NotMasterOrSecondary)) { if (_lazyState._lastClient == _lastSlaveOkConn.get()) { isntSecondary(); } else if (_lazyState._lastClient == _master.get()) { isntMaster(); } else { warning() << "passed " << redact(dataObj) << " but last rs client " << _lazyState._lastClient->toString() << " is not master or secondary" << endl; } if (_lazyState._retries < static_cast(MAX_RETRY)) { _lazyState._retries++; *retry = true; } else { log() << "too many retries (" << _lazyState._retries << "), could not get data from replica set" << endl; } } } else if (_lazyState._lastOp == dbQuery) { // if query could not potentially go to a secondary, just mark the master as bad if (networkError || (hasErrField(dataObj) && !dataObj["code"].eoo() && dataObj["code"].Int() == ErrorCodes::NotMasterNoSlaveOk)) { if (_lazyState._lastClient == _master.get()) { isntMaster(); } } } } DBClientBase* DBClientReplicaSet::runFireAndForgetCommand(OpMsgRequest request) { // Assume all fire-and-forget commands should go to the primary node. It is currently used // for writes which need to go to the primary and for killCursors which should be sent to a // specific host rather than through DBClientReplicaSet. return checkMaster()->runFireAndForgetCommand(std::move(request)); } std::pair DBClientReplicaSet::runCommandWithTarget( OpMsgRequest request) { // This overload exists so we can parse out the read preference and then use server // selection directly without having to re-parse the raw message. // TODO: eventually we will want to pass the metadata before serializing it to BSON // so we don't have to re-parse it, however, that will come with its own set of // complications (e.g. some kind of base class or concept for MetadataSerializable // objects). For now we do it the stupid way. 
auto readPref = uassertStatusOK(ReadPreferenceSetting::fromContainingBSON(request.body)); if (readPref.pref == ReadPreference::PrimaryOnly || // If the command is not runnable on a secondary, we run it on the primary // regardless of the read preference. !_isSecondaryCommand(request.getCommandName(), request.body)) { auto conn = checkMaster(); return conn->runCommandWithTarget(std::move(request)); } auto rpShared = std::make_shared(std::move(readPref)); for (size_t retry = 0; retry < MAX_RETRY; ++retry) { try { auto* conn = selectNodeUsingTags(rpShared); if (conn == nullptr) { break; } // We can't move the request since we need it to retry. return conn->runCommandWithTarget(request); } catch (const DBException& ex) { _invalidateLastSlaveOkCache(ex.toStatus()); } } uasserted(ErrorCodes::HostNotFound, str::stream() << "Could not satisfy $readPreference of '" << readPref.toString() << "' while attempting to run command " << request.getCommandName()); } std::pair> DBClientReplicaSet::runCommandWithTarget( OpMsgRequest request, std::shared_ptr me) { auto out = runCommandWithTarget(std::move(request)); std::shared_ptr conn = [&] { if (out.second == _lastSlaveOkConn.get()) { return _lastSlaveOkConn; } if (out.second == _master.get()) { return _master; } MONGO_UNREACHABLE; }(); return {std::move(out.first), std::move(conn)}; } bool DBClientReplicaSet::call(Message& toSend, Message& response, bool assertOk, string* actualServer) { const char* ns = 0; if (toSend.operation() == dbQuery) { // TODO: might be possible to do this faster by changing api DbMessage dm(toSend); QueryMessage qm(dm); ns = qm.ns; shared_ptr readPref(_extractReadPref(qm.query, qm.queryOptions)); if (_isSecondaryQuery(ns, qm.query, *readPref)) { LOG(3) << "dbclient_rs call using secondary or tagged node selection in " << _getMonitor()->getName() << ", read pref is " << readPref->toString() << " (primary : " << (_master.get() != NULL ? 
_master->getServerAddress() : "[not cached]") << ", lastTagged : " << (_lastSlaveOkConn.get() != NULL ? _lastSlaveOkConn->getServerAddress() : "[not cached]") << ")" << endl; for (size_t retry = 0; retry < MAX_RETRY; retry++) { try { DBClientConnection* conn = selectNodeUsingTags(readPref); if (conn == NULL) { return false; } if (actualServer != NULL) { *actualServer = conn->getServerAddress(); } return conn->call(toSend, response, assertOk, nullptr); } catch (const DBException& ex) { if (actualServer) *actualServer = ""; const Status status = ex.toStatus(); _invalidateLastSlaveOkCache(status.withContext( str::stream() << "can't call replica set node " << _lastSlaveOkHost)); } } // Was not able to successfully send after max retries return false; } } LOG(3) << "dbclient_rs call to primary node in " << _getMonitor()->getName() << endl; DBClientConnection* m = checkMaster(); if (actualServer) *actualServer = m->getServerAddress(); if (!m->call(toSend, response, assertOk, nullptr)) return false; if (ns) { QueryResult::View res = response.singleData().view2ptr(); if (res.getNReturned() == 1) { BSONObj x(res.data()); if (str::contains(ns, "$cmd")) { if (isNotMasterErrorString(x["errmsg"])) isntMaster(); } else { if (isNotMasterErrorString(getErrField(x))) isntMaster(); } } } return true; } void DBClientReplicaSet::_invalidateLastSlaveOkCache(const Status& status) { // This is not wrapped in with if (_lastSlaveOkConn && _lastSlaveOkConn->isFailed()) because // there are certain exceptions that will not make the connection be labeled as failed. 
For // example, asserts 13079, 13080, 16386 _getMonitor()->failedHost(_lastSlaveOkHost, status); resetSlaveOkConn(); } void DBClientReplicaSet::reset() { resetSlaveOkConn(); _lazyState._lastClient = NULL; _lastReadPref.reset(); } void DBClientReplicaSet::setAuthPooledSecondaryConn(bool setting) { _authPooledSecondaryConn = setting; } void DBClientReplicaSet::resetMaster() { if (_master.get() == _lastSlaveOkConn.get()) { _lastSlaveOkConn.reset(); _lastSlaveOkHost = HostAndPort(); } _master.reset(); _masterHost = HostAndPort(); } void DBClientReplicaSet::resetSlaveOkConn() { if (_lastSlaveOkConn.get() == _master.get()) { _lastSlaveOkConn.reset(); } else if (_lastSlaveOkConn.get() != NULL) { if (_authPooledSecondaryConn) { logoutAll(_lastSlaveOkConn.get()); } else { // Mongos pooled connections are all authenticated with the same credentials; // so no need to logout. } _lastSlaveOkConn.reset(); } _lastSlaveOkHost = HostAndPort(); } } // namespace mongo