/* Copyright 2009 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
#include "mongo/platform/basic.h"
#include "mongo/client/dbclient_rs.h"
#include
#include
#include "mongo/bson/util/builder.h"
#include "mongo/client/connpool.h"
#include "mongo/client/dbclientcursor.h"
#include "mongo/client/replica_set_monitor.h"
#include "mongo/client/sasl_client_authenticate.h"
#include "mongo/db/dbmessage.h"
#include "mongo/db/jsobj.h"
#include "mongo/util/log.h"
namespace mongo {
using boost::shared_ptr;
using std::auto_ptr;
using std::endl;
using std::map;
using std::set;
using std::string;
using std::vector;
namespace {
/*
* Names of the commands that may be used with $readPreference. _isSecondaryQuery()
* consults this set to decide whether a command may be sent to a non-primary node.
*/
set _secOkCmdList;
// Helper whose constructor fills _secOkCmdList. The single namespace-scope instance
// declared together with the class (_populateReadPrefSecOkCmdList) runs that constructor
// when this translation unit's namespace-scope objects are initialized.
class PopulateReadPrefSecOkCmdList {
public:
PopulateReadPrefSecOkCmdList() {
_secOkCmdList.insert("aggregate");
_secOkCmdList.insert("collStats");
_secOkCmdList.insert("count");
_secOkCmdList.insert("distinct");
_secOkCmdList.insert("dbStats");
_secOkCmdList.insert("explain");
_secOkCmdList.insert("find");
_secOkCmdList.insert("geoNear");
_secOkCmdList.insert("geoSearch");
_secOkCmdList.insert("geoWalk");
_secOkCmdList.insert("group");
}
} _populateReadPrefSecOkCmdList;
/**
 * Builds a ReadPreferenceSetting from a raw query document and its query option flags.
 *
 * When the query carries a read preference, it is read from one of two layouts:
 *
 * Format A (official format):
 *     { query: <query>, $readPreference: <read pref obj> }
 *
 * Format B (unofficial internal format from mongos):
 *     { <query>, $queryOptions: { $readPreference: <read pref obj> }}
 *
 * The read preference object must contain a mode ("primary", "primaryPreferred",
 * "secondary", "secondaryPreferred" or "nearest") and may contain an array of tags.
 * With mode "primary", the tags array must be empty or start with an empty document.
 *
 * When the query carries no read preference, the result is ReadPreference_SecondaryPreferred
 * if QueryOption_SlaveOk is set in queryOptions and ReadPreference_PrimaryOnly otherwise.
 *
 * @param query the raw query document
 * @param queryOptions query option bit flags; only QueryOption_SlaveOk is examined, and
 *     only when the query carries no read preference
 *
 * @return a heap-allocated setting (allocated with new; the callers in this file hand it to
 *     a smart pointer). If no tags were given, the setting holds a default-constructed
 *     TagSet, i.e. one empty tag document {} which matches any tag.
 *
 * @throws AssertionException if the read preference object is malformed
 */
ReadPreferenceSetting* _extractReadPref(const BSONObj& query, int queryOptions) {
    // No explicit read preference: fall back to the slaveOk bit.
    if (!Query::hasReadPreference(query)) {
        if (queryOptions & QueryOption_SlaveOk) {
            return new ReadPreferenceSetting(mongo::ReadPreference_SecondaryPreferred,
                                             TagSet());
        }
        return new ReadPreferenceSetting(mongo::ReadPreference_PrimaryOnly, TagSet());
    }

    // Format A keeps the read preference at the top level, format B under $queryOptions.
    const bool isTopLevel = query.hasField(Query::ReadPrefField.name());
    const BSONElement readPrefElement = isTopLevel
        ? query[Query::ReadPrefField.name()]
        : query["$queryOptions"][Query::ReadPrefField.name()];

    uassert(16381, "$readPreference should be an object",
            readPrefElement.isABSONObj());
    const BSONObj prefDoc = readPrefElement.Obj();

    uassert(16382, "mode not specified for read preference",
            prefDoc.hasField(Query::ReadPrefModeField.name()));
    const string modeName = prefDoc[Query::ReadPrefModeField.name()].String();

    // Every accepted mode assigns selectedPref; an unknown mode throws below.
    ReadPreference selectedPref = mongo::ReadPreference_SecondaryPreferred;
    if (modeName == "primary") {
        selectedPref = mongo::ReadPreference_PrimaryOnly;
    }
    else if (modeName == "primaryPreferred") {
        selectedPref = mongo::ReadPreference_PrimaryPreferred;
    }
    else if (modeName == "secondary") {
        selectedPref = mongo::ReadPreference_SecondaryOnly;
    }
    else if (modeName == "secondaryPreferred") {
        selectedPref = mongo::ReadPreference_SecondaryPreferred;
    }
    else if (modeName == "nearest") {
        selectedPref = mongo::ReadPreference_Nearest;
    }
    else {
        uasserted(16383, str::stream() << "Unknown read preference mode: " << modeName);
    }

    // Without a tags field the default TagSet (matches everything) is used.
    if (!prefDoc.hasField(Query::ReadPrefTagsField.name())) {
        return new ReadPreferenceSetting(selectedPref, TagSet());
    }

    const BSONElement tagsElem = prefDoc[Query::ReadPrefTagsField.name()];
    uassert(16385, "tags for read preference should be an array",
            tagsElem.type() == mongo::Array);

    TagSet tagSet(BSONArray(tagsElem.Obj().getOwned()));
    if (selectedPref == mongo::ReadPreference_PrimaryOnly && !tagSet.getTagBSON().isEmpty()) {
        uassert(16384, "Only empty tags are allowed with primary read preference",
                tagSet.getTagBSON().firstElement().Obj().isEmpty());
    }
    return new ReadPreferenceSetting(selectedPref, tagSet);
}
} // namespace
// --------------------------------
// ----- DBClientReplicaSet ---------
// --------------------------------
// Bound used by the retry loops in this file when an operation is sent to a secondary or
// tagged node (query, findOne, say, call). _auth(const BSONObj&) loops MAX_RETRY + 1 times.
const size_t DBClientReplicaSet::MAX_RETRY = 3;
// When true, selectNodeUsingTags() re-applies the cached credentials to a freshly pooled
// secondary connection and resetSlaveOkConn() logs that connection out before releasing it
// to the pool. Changed through setAuthPooledSecondaryConn().
bool DBClientReplicaSet::_authPooledSecondaryConn = true;
// Constructs a client for the replica set called `name`.
//
// Stores the set name and the socket timeout, and asks ReplicaSetMonitor to create a
// monitor for the set (if needed), seeded with the hosts in `servers`. No connection is
// opened here.
DBClientReplicaSet::DBClientReplicaSet( const string& name , const vector& servers, double so_timeout )
: _setName( name ), _so_timeout( so_timeout ) {
ReplicaSetMonitor::createIfNeeded( name, set(servers.begin(), servers.end()) );
}
// Destructor. When the cached slaveOk connection is the very same object as the master
// connection, the slaveOk holder gives up its pointer without deleting it.
// NOTE(review): presumably this prevents both members from deleting the same connection;
// confirm against the member declarations in the header.
DBClientReplicaSet::~DBClientReplicaSet() {
    const bool sharesMasterConn = ( _master.get() == _lastSlaveOkConn.get() );
    if ( sharesMasterConn ) {
        _lastSlaveOkConn.release();
    }
}
// Returns the ReplicaSetMonitor for this set, obtained through
// ReplicaSetMonitor::get( _setName, true ).
//
// Throws (uassert 16340) when no monitor can be obtained, because without a monitor this
// connection isn't valid.
ReplicaSetMonitorPtr DBClientReplicaSet::_getMonitor() const {
    ReplicaSetMonitorPtr monitor = ReplicaSetMonitor::get( _setName, true );
    if ( !monitor ) {
        uasserted( 16340, str::stream() << "No replica set monitor active and no cached seed "
                                           "found for set: " << _setName );
    }
    return monitor;
}
// Returns the server address reported by the set's monitor. If no monitor is available, a
// warning is logged and "<set name>/" is returned instead.
//
// This can't throw an exception because it is called in the destructor of
// ScopedDbConnection, which is why it does not go through _getMonitor().
string DBClientReplicaSet::getServerAddress() const {
    ReplicaSetMonitorPtr monitor = ReplicaSetMonitor::get( _setName, true );
    if ( monitor ) {
        return monitor->getServerAddress();
    }

    warning() << "Trying to get server address for DBClientReplicaSet, but no "
                 "ReplicaSetMonitor exists for " << _setName << endl;
    return str::stream() << _setName << "/" ;
}
// Returns the host of the currently cached master connection, or a default-constructed
// HostAndPort when no master connection is cached. Does not contact the monitor.
HostAndPort DBClientReplicaSet::getSuspectedPrimaryHostAndPort() const {
    if ( _master ) {
        return _master->getServerHostAndPort();
    }
    return HostAndPort();
}
// Installs `func` as the run-command hook on the cached master connection (if any), on the
// cached slaveOk connection (if any), and finally on this object, so connections created
// later receive it as well (see checkMaster / selectNodeUsingTags).
void DBClientReplicaSet::setRunCommandHook(DBClientWithCommands::RunCommandHookFunc func) {
    if ( _master ) {
        _master->setRunCommandHook( func );
    }

    DBClientConnection* const cachedSecondary = _lastSlaveOkConn.get();
    if ( cachedSecondary != NULL ) {
        cachedSecondary->setRunCommandHook( func );
    }

    _runCommandHook = func;
}
// Installs `func` as the post-run-command hook on the cached master connection (if any),
// on the cached slaveOk connection (if any), and finally on this object, so connections
// created later receive it as well (see checkMaster / selectNodeUsingTags).
void DBClientReplicaSet::setPostRunCommandHook
(DBClientWithCommands::PostRunCommandHookFunc func) {
    if ( _master ) {
        _master->setPostRunCommandHook( func );
    }

    DBClientConnection* const cachedSecondary = _lastSlaveOkConn.get();
    if ( cachedSecondary != NULL ) {
        cachedSecondary->setPostRunCommandHook( func );
    }

    _postRunCommandHook = func;
}
// A replica set connection is never disconnected, since it controls its own reconnection
// logic.
//
// Has the side effect of proactively clearing any cached connections which have been
// disconnected in the background.
bool DBClientReplicaSet::isStillConnected() {
if ( _master && !_master->isStillConnected() ) {
resetMaster();
// Don't notify monitor of bg failure, since it's not clear how long ago it happened
}
if ( _lastSlaveOkConn.get() && !_lastSlaveOkConn->isStillConnected() ) {
resetSlaveOkConn();
// Don't notify monitor of bg failure, since it's not clear how long ago it happened
}
return true;
}
// Internal implementation of isSecondaryQuery, takes previously-parsed read preference
static bool _isSecondaryQuery( const string& ns,
const BSONObj& queryObj,
const ReadPreferenceSetting& readPref ) {
// If the read pref is primary only, this is not a secondary query
if (readPref.pref == ReadPreference_PrimaryOnly) return false;
if (ns.find(".$cmd") == string::npos) {
return true;
}
// This is a command with secondary-possible read pref
// Only certain commands are supported for secondary operation.
BSONObj actualQueryObj;
if (strcmp(queryObj.firstElement().fieldName(), "query") == 0) {
actualQueryObj = queryObj["query"].embeddedObject();
}
else {
actualQueryObj = queryObj;
}
const string cmdName = actualQueryObj.firstElementFieldName();
if (_secOkCmdList.count(cmdName) == 1) {
return true;
}
if (cmdName == "mapReduce" || cmdName == "mapreduce") {
if (!actualQueryObj.hasField("out")) {
return false;
}
BSONElement outElem(actualQueryObj["out"]);
if (outElem.isABSONObj() && outElem["inline"].trueValue()) {
return true;
}
}
return false;
}
// Returns whether the given query/command on `ns` may be routed by read preference rather
// than sent to the primary. Parses the read preference out of `queryObj`/`queryOptions`
// with _extractReadPref() (which throws on a malformed read preference) and delegates the
// decision to _isSecondaryQuery().
bool DBClientReplicaSet::isSecondaryQuery( const string& ns,
const BSONObj& queryObj,
int queryOptions ) {
// The smart pointer takes ownership of the setting allocated by _extractReadPref().
auto_ptr readPref( _extractReadPref( queryObj, queryOptions ) );
return _isSecondaryQuery( ns, queryObj, *readPref );
}
// Returns a connection to the current primary, reusing the cached one when possible.
//
// The monitor is asked for the primary (getMasterOrUassert throws if there is none). If it
// matches the cached host and the cached connection has not failed, that connection is
// returned. Otherwise a new connection is opened, the old master state is dropped, and the
// new connection gets this object as replSet callback, the command hooks, and the cached
// credentials.
//
// Throws (uassert 13639) when the new primary cannot be connected to; in that case the host
// is reported to the monitor as failed and the cached master state (_master / _masterHost)
// is left untouched, so the two can never describe different hosts.
DBClientConnection * DBClientReplicaSet::checkMaster() {
    ReplicaSetMonitorPtr monitor = _getMonitor();
    HostAndPort h = monitor->getMasterOrUassert();

    if ( h == _masterHost && _master ) {
        // a master is selected. let's just make sure connection didn't die
        if ( ! _master->isFailed() )
            return _master.get();

        monitor->failedHost( _masterHost );
        h = monitor->getMasterOrUassert(); // old master failed, try again.
    }

    // Connect using the local candidate host; members are only updated after success.
    ConnectionString connStr(h);
    string errmsg;
    DBClientConnection* newConn = NULL;

    try {
        // Needs to perform a dynamic_cast because we need to set the replSet
        // callback. We should eventually not need this after we remove the
        // callback.
        newConn = dynamic_cast<DBClientConnection*>(
                connStr.connect(errmsg, _so_timeout));
    }
    catch (const AssertionException& ex) {
        errmsg = ex.toString();
    }

    if (newConn == NULL || !errmsg.empty()) {
        // A connection object may have been returned together with an error message;
        // free it before throwing so it is not leaked (deleting NULL is a no-op).
        delete newConn;
        monitor->failedHost(h);
        uasserted(13639, str::stream() << "can't connect to new replica set master ["
                  << h.toString() << "]"
                  << (errmsg.empty()? "" : ", err: ") << errmsg);
    }

    // resetMaster() clears _masterHost, so it is assigned afterwards.
    resetMaster();
    _masterHost = h;
    _master.reset(newConn);
    _master->setReplSetClientCallback(this);
    _master->setRunCommandHook(_runCommandHook);
    _master->setPostRunCommandHook(_postRunCommandHook);
    _auth( _master.get() );
    return _master.get();
}
bool DBClientReplicaSet::checkLastHost(const ReadPreferenceSetting* readPref) {
// Can't use a cached host if we don't have one.
if (!_lastSlaveOkConn.get() || _lastSlaveOkHost.empty()) {
return false;
}
// Don't pin if the readPrefs differ.
if (!_lastReadPref || !_lastReadPref->equals(*readPref)) {
return false;
}
// Make sure we don't think the host is down.
if (_lastSlaveOkConn->isFailed() || !_getMonitor()->isHostUp(_lastSlaveOkHost)) {
invalidateLastSlaveOkCache();
return false;
}
return true;
}
// Re-applies every credential cached in _auths to `conn`.
//
// Best effort: a UserException thrown while authenticating one entry is logged as a warning
// and the remaining entries are still tried. Other exception types propagate.
void DBClientReplicaSet::_auth( DBClientConnection * conn ) {
for (map::const_iterator i = _auths.begin(); i != _auths.end(); ++i) {
try {
// i->second holds the auth parameters stored by _auth( const BSONObj& ).
conn->auth(i->second);
}
catch (const UserException&) {
warning() << "cached auth failed for set: " << _setName <<
" db: " << i->second[saslCommandUserDBFieldName].str() <<
" user: " << i->second[saslCommandUserFieldName].str() << endl;
}
}
}
// Logs `conn` out of every database for which credentials are cached in _auths.
//
// Best effort: a UserException from one logout is logged as a warning and the loop
// continues. The cached credentials themselves are not modified.
void DBClientReplicaSet::logoutAll(DBClientConnection* conn) {
for (map::const_iterator i = _auths.begin(); i != _auths.end(); ++i) {
// The server's reply is collected but not inspected.
BSONObj response;
try {
// i->first is the database name the credentials were stored under.
conn->logout(i->first, response);
}
catch (const UserException& ex) {
warning() << "Failed to logout: " << conn->getServerAddress() <<
" on db: " << i->first << endl;
}
}
}
// Returns a reference to the primary connection obtained from checkMaster(), which throws
// if no primary can be found or connected to.
DBClientConnection& DBClientReplicaSet::masterConn() {
    DBClientConnection* const primary = checkMaster();
    return *primary;
}
// Returns a connection selected with a secondary-preferred read preference and the default
// TagSet. Throws (uassert 16369) when selectNodeUsingTags() finds no usable node.
DBClientConnection& DBClientReplicaSet::slaveConn() {
shared_ptr readPref(
new ReadPreferenceSetting(ReadPreference_SecondaryPreferred, TagSet()));
DBClientConnection* conn = selectNodeUsingTags(readPref);
uassert( 16369, str::stream() << "No good nodes available for set: "
<< _getMonitor()->getName(), conn != NULL );
return *conn;
}
bool DBClientReplicaSet::connect() {
// Returns true if there are any up hosts.
const ReadPreferenceSetting anyUpHost(ReadPreference_Nearest, TagSet());
return !_getMonitor()->getHostOrRefresh(anyUpHost).empty();
}
// True when the exception's code is ErrorCodes::AuthenticationFailed.
static bool isAuthenticationException( const DBException& ex ) {
    const bool authFailed = ( ex.getCode() == ErrorCodes::AuthenticationFailed );
    return authFailed;
}
// Authenticates this replica set connection with the credentials in `params`.
//
// A node is selected with a primary-preferred read preference and authenticated against.
// On success the credentials are cached in _auths (keyed by the user's database) and any
// child connection other than the one just authenticated is dropped.
//
// Throws:
//  - the original exception, when the node rejects the credentials
//    (ErrorCodes::AuthenticationFailed);
//  - ErrorCodes::NodeNotFound, when no node could be selected and no other error was seen;
//  - otherwise the last non-authentication error encountered while trying nodes.
void DBClientReplicaSet::_auth( const BSONObj& params ) {
// We prefer to authenticate against a primary, but otherwise a secondary is ok too
// Empty tag matches every secondary
shared_ptr readPref(
new ReadPreferenceSetting( ReadPreference_PrimaryPreferred, TagSet() ) );
LOG(3) << "dbclient_rs authentication of " << _getMonitor()->getName() << endl;
// NOTE that we retry MAX_RETRY + 1 times, since we're always primary preferred we don't
// fallback to the primary.
Status lastNodeStatus = Status::OK();
for ( size_t retry = 0; retry < MAX_RETRY + 1; retry++ ) {
try {
DBClientConnection* conn = selectNodeUsingTags( readPref );
// NULL means no node matched; stop retrying and report below.
if ( conn == NULL ) {
break;
}
conn->auth( params );
// Cache the new auth information since we now validated it's good
_auths[params[saslCommandUserDBFieldName].str()] = params.getOwned();
// Ensure the only child connection open is the one we authenticated against - other
// child connections may not have full authentication information.
// NOTE: _lastSlaveOkConn may or may not be the same as _master
dassert(_lastSlaveOkConn.get() == conn || _master.get() == conn);
if ( conn != _lastSlaveOkConn.get() ) {
resetSlaveOkConn();
}
if ( conn != _master.get() ) {
resetMaster();
}
return;
}
catch ( const DBException &ex ) {
// We care if we can't authenticate (i.e. bad password) in credential params.
if ( isAuthenticationException( ex ) ) {
throw;
}
// Any other failure: remember it, mark the node as failed and try another one.
StringBuilder errMsgB;
errMsgB << "can't authenticate against replica set node "
<< _lastSlaveOkHost.toString();
lastNodeStatus = ex.toStatus( errMsgB.str() );
LOG(1) << lastNodeStatus.reason() << endl;
invalidateLastSlaveOkCache();
}
}
// Reached only when authentication did not succeed.
if ( lastNodeStatus.isOK() ) {
// No error was recorded, so no node could be selected at all.
StringBuilder assertMsgB;
assertMsgB << "Failed to authenticate, no good nodes in " << _getMonitor()->getName();
uasserted( ErrorCodes::NodeNotFound, assertMsgB.str() );
}
else {
uasserted( lastNodeStatus.code(), lastNodeStatus.reason() );
}
}
void DBClientReplicaSet::logout(const string &dbname, BSONObj& info) {
DBClientConnection* priConn = checkMaster();
priConn->logout(dbname, info);
_auths.erase(dbname);
/* Also logout the cached secondary connection. Note that this is only
* needed when we actually have something cached and is last known to be
* working.
*/
if (_lastSlaveOkConn.get() != NULL && !_lastSlaveOkConn->isFailed()) {
try {
BSONObj dummy;
_lastSlaveOkConn->logout(dbname, dummy);
}
catch (const DBException&) {
// Make sure we can't use this connection again.
verify(_lastSlaveOkConn->isFailed());
}
}
}
// ------------- simple functions -----------------
void DBClientReplicaSet::insert( const string &ns , BSONObj obj , int flags) {
checkMaster()->insert(ns, obj, flags);
}
void DBClientReplicaSet::insert( const string &ns, const vector< BSONObj >& v , int flags) {
checkMaster()->insert(ns, v, flags);
}
void DBClientReplicaSet::remove( const string &ns , Query obj , int flags ) {
checkMaster()->remove(ns, obj, flags);
}
// Applies the update `obj` to documents matching `query`; always sent to the primary
// returned by checkMaster(). Written as a plain call statement, like the sibling
// insert/remove forwarders, rather than returning a void expression.
void DBClientReplicaSet::update( const string &ns , Query query , BSONObj obj , int flags ) {
    checkMaster()->update( ns, query, obj, flags );
}
// Runs a query and returns the resulting cursor.
//
// The read preference is extracted from `query`/`queryOptions`. If the operation may go to
// a secondary (see _isSecondaryQuery), a node is selected by read preference and queried,
// retrying on another node up to MAX_RETRY times when a DBException is thrown. Otherwise
// the query is sent to the primary.
//
// Throws (uassert 16370) when no node could be selected or every attempt failed; the
// message includes the last per-node error, if any. Primary-path errors propagate.
auto_ptr DBClientReplicaSet::query(const string &ns,
Query query,
int nToReturn,
int nToSkip,
const BSONObj *fieldsToReturn,
int queryOptions,
int batchSize) {
shared_ptr readPref( _extractReadPref( query.obj, queryOptions ) );
if ( _isSecondaryQuery( ns, query.obj, *readPref ) ) {
LOG( 3 ) << "dbclient_rs query using secondary or tagged node selection in "
<< _getMonitor()->getName() << ", read pref is "
<< readPref->toBSON() << " (primary : "
<< ( _master.get() != NULL ?
_master->getServerAddress() : "[not cached]" )
<< ", lastTagged : "
<< ( _lastSlaveOkConn.get() != NULL ?
_lastSlaveOkConn->getServerAddress() : "[not cached]" )
<< ")" << endl;
string lastNodeErrMsg;
for (size_t retry = 0; retry < MAX_RETRY; retry++) {
try {
DBClientConnection* conn = selectNodeUsingTags(readPref);
// NULL means no node matched the read preference; give up and report below.
if (conn == NULL) {
break;
}
auto_ptr cursor = conn->query(ns, query,
nToReturn, nToSkip, fieldsToReturn, queryOptions,
batchSize);
// Throws if the node answered that it is no longer master or secondary.
return checkSlaveQueryResult(cursor);
}
catch (const DBException &dbExcep) {
// Remember the error, mark the node as failed, and try another node.
StringBuilder errMsgBuilder;
errMsgBuilder << "can't query replica set node "
<< _lastSlaveOkHost.toString() << ": " << causedBy( dbExcep );
lastNodeErrMsg = errMsgBuilder.str();
LOG(1) << lastNodeErrMsg << endl;
invalidateLastSlaveOkCache();
}
}
StringBuilder assertMsg;
assertMsg << "Failed to do query, no good nodes in " << _getMonitor()->getName();
if ( !lastNodeErrMsg.empty() ) {
assertMsg << ", last error: " << lastNodeErrMsg;
}
uasserted( 16370, assertMsg.str() );
}
LOG( 3 ) << "dbclient_rs query to primary node in " << _getMonitor()->getName() << endl;
return checkMaster()->query(ns, query, nToReturn, nToSkip, fieldsToReturn,
queryOptions, batchSize);
}
// Returns a single document matching `query`.
//
// The read preference is extracted from `query`/`queryOptions`. If the operation may go to
// a secondary (see _isSecondaryQuery), a node is selected by read preference and asked,
// retrying on another node up to MAX_RETRY times when a DBException is thrown. Otherwise
// the request is sent to the primary.
//
// Throws (uassert 16379) when no node could be selected or every attempt failed; the
// message includes the last per-node error, if any. Primary-path errors propagate.
BSONObj DBClientReplicaSet::findOne(const string &ns,
const Query& query,
const BSONObj *fieldsToReturn,
int queryOptions) {
shared_ptr readPref( _extractReadPref( query.obj, queryOptions ) );
if ( _isSecondaryQuery( ns, query.obj, *readPref ) ) {
LOG( 3 ) << "dbclient_rs findOne using secondary or tagged node selection in "
<< _getMonitor()->getName() << ", read pref is "
<< readPref->toBSON() << " (primary : "
<< ( _master.get() != NULL ?
_master->getServerAddress() : "[not cached]" )
<< ", lastTagged : "
<< ( _lastSlaveOkConn.get() != NULL ?
_lastSlaveOkConn->getServerAddress() : "[not cached]" )
<< ")" << endl;
string lastNodeErrMsg;
for (size_t retry = 0; retry < MAX_RETRY; retry++) {
try {
DBClientConnection* conn = selectNodeUsingTags(readPref);
// NULL means no node matched the read preference; give up and report below.
if (conn == NULL) {
break;
}
return conn->findOne(ns,query,fieldsToReturn,queryOptions);
}
catch ( const DBException &dbExcep ) {
// Remember the error, mark the node as failed, and try another node.
StringBuilder errMsgBuilder;
errMsgBuilder << "can't findone replica set node "
<< _lastSlaveOkHost.toString() << ": " << causedBy( dbExcep );
lastNodeErrMsg = errMsgBuilder.str();
LOG(1) << lastNodeErrMsg << endl;
invalidateLastSlaveOkCache();
}
}
StringBuilder assertMsg;
assertMsg << "Failed to call findOne, no good nodes in " << _getMonitor()->getName();
if ( !lastNodeErrMsg.empty() ) {
assertMsg << ", last error: " << lastNodeErrMsg;
}
uasserted(16379, assertMsg.str());
}
LOG( 3 ) << "dbclient_rs findOne to primary node in " << _getMonitor()->getName()
<< endl;
return checkMaster()->findOne(ns,query,fieldsToReturn,queryOptions);
}
// Not supported on a replica set connection: this always fails the verify() below.
// `cursorID` is ignored.
void DBClientReplicaSet::killCursor( long long cursorID ) {
// we should never call killCursor on a replica set connection
// since we don't know which server it belongs to
// can't assume master because of slave ok
// and can have a cursor survive a master change
verify(0);
}
void DBClientReplicaSet::isntMaster() {
log() << "got not master for: " << _masterHost << endl;
// Can't use _getMonitor because that will create a new monitor from the cached seed if
// the monitor doesn't exist.
ReplicaSetMonitorPtr monitor = ReplicaSetMonitor::get( _setName );
if ( monitor ) {
monitor->failedHost( _masterHost );
}
resetMaster();
}
// Inspects the cursor returned by a query sent to a secondary/tagged node.
//
// Returns `result` unchanged when it is NULL, when it holds no error, or when the error is
// not the "not master or secondary" code. For that code the node is failed over via
// isntSecondary() and a DBException with code 14812 is thrown.
auto_ptr DBClientReplicaSet::checkSlaveQueryResult( auto_ptr result ){
if ( result.get() == NULL ) return result;
BSONObj error;
// peekError() reports whether the cursor holds an error and copies it into `error`.
bool isError = result->peekError( &error );
if( ! isError ) return result;
// We only check for "not master or secondary" errors here
// If the error code here ever changes, we need to change this code also
BSONElement code = error["code"];
// NOTE(review): Int() presumably requires a 32-bit integer element - confirm how a
// "code" stored as another numeric type is handled.
if( code.isNumber() &&
code.Int() == NotMasterOrSecondaryCode /* not master or secondary */ ) {
isntSecondary();
throw DBException( str::stream() << "slave " << _lastSlaveOkHost.toString()
<< " is no longer secondary", 14812 );
}
return result;
}
void DBClientReplicaSet::isntSecondary() {
log() << "slave no longer has secondary status: " << _lastSlaveOkHost << endl;
// Failover to next slave
_getMonitor()->failedHost( _lastSlaveOkHost );
resetSlaveOkConn();
}
// Selects a node matching `readPref` and returns a connection to it.
//
// The cached slaveOk connection is reused when checkLastHost() accepts it. Otherwise the
// monitor picks a host, the previous slaveOk connection is released, and either the
// primary connection (when the host is the primary) or a pooled connection is cached as
// the new slaveOk connection.
//
// Returns NULL when the monitor finds no node matching the read preference.
// Throws (uassert 16532) when a pooled connection to the selected host cannot be obtained;
// checkMaster() may also throw on the primary path.
DBClientConnection* DBClientReplicaSet::selectNodeUsingTags(
shared_ptr readPref) {
if (checkLastHost(readPref.get())) {
LOG( 3 ) << "dbclient_rs selecting compatible last used node " << _lastSlaveOkHost
<< endl;
return _lastSlaveOkConn.get();
}
ReplicaSetMonitorPtr monitor = _getMonitor();
HostAndPort selectedNode = monitor->getHostOrRefresh(*readPref);
if ( selectedNode.empty() ){
LOG( 3 ) << "dbclient_rs no compatible node found" << endl;
return NULL;
}
// We are now about to get a new connection from the pool, so cleanup
// the current one and release it back to the pool.
resetSlaveOkConn();
// Recorded before connecting, so callers that catch a connection failure can report
// and invalidate the host that was being tried.
_lastReadPref = readPref;
_lastSlaveOkHost = selectedNode;
// Primary connection is special because it is the only connection that is
// versioned in mongos. Therefore, we have to make sure that this object
// maintains only one connection to the primary and use that connection
// every time we need to talk to the primary.
if (monitor->isPrimary(selectedNode)) {
checkMaster();
LOG( 3 ) << "dbclient_rs selecting primary node " << selectedNode << endl;
// Both members now refer to the same connection object; the reset/destructor code
// releases _lastSlaveOkConn (without deleting) whenever the two are equal.
_lastSlaveOkConn.reset(_master.get());
return _master.get();
}
// Needs to perform a dynamic_cast because we need to set the replSet
// callback. We should eventually not need this after we remove the
// callback.
DBClientConnection* newConn = dynamic_cast(
pool.get(_lastSlaveOkHost.toString(), _so_timeout));
// Assert here instead of returning NULL since the contract of this method is such
// that returning NULL means none of the nodes were good, which is not the case here.
uassert(16532, str::stream() << "Failed to connect to " << _lastSlaveOkHost.toString(),
newConn != NULL);
_lastSlaveOkConn.reset(newConn);
_lastSlaveOkConn->setReplSetClientCallback(this);
_lastSlaveOkConn->setRunCommandHook(_runCommandHook);
_lastSlaveOkConn->setPostRunCommandHook(_postRunCommandHook);
if (_authPooledSecondaryConn) {
_auth(_lastSlaveOkConn.get());
}
else {
// Mongos pooled connections are authenticated through
// ShardingConnectionHook::onCreate().
}
LOG( 3 ) << "dbclient_rs selecting node " << _lastSlaveOkHost << endl;
return _lastSlaveOkConn.get();
}
// Sends `toSend` without waiting for a reply (the reply is fetched later with recv() and
// examined with checkResponse()).
//
// For a dbQuery message that may go to a secondary (see _isSecondaryQuery), a node is
// selected by read preference, retrying on another node up to MAX_RETRY times when a
// DBException is thrown. Everything else is sent to the primary.
//
// _lazyState records the operation, the connection used, whether a secondary was allowed,
// and the retry count. It is reset first unless `isRetry` is true. If `actualServer` is
// non-NULL it receives the address of the node the message was sent to.
//
// Throws (uassert 16380) when a secondary-eligible query could not be sent to any node.
void DBClientReplicaSet::say(Message& toSend, bool isRetry, string* actualServer) {
if (!isRetry)
_lazyState = LazyState();
const int lastOp = toSend.operation();
if (lastOp == dbQuery) {
// TODO: might be possible to do this faster by changing api
DbMessage dm(toSend);
QueryMessage qm(dm);
shared_ptr readPref( _extractReadPref( qm.query,
qm.queryOptions ) );
if ( _isSecondaryQuery( qm.ns, qm.query, *readPref ) ) {
LOG( 3 ) << "dbclient_rs say using secondary or tagged node selection in "
<< _getMonitor()->getName() << ", read pref is "
<< readPref->toBSON() << " (primary : "
<< ( _master.get() != NULL ?
_master->getServerAddress() : "[not cached]" )
<< ", lastTagged : "
<< ( _lastSlaveOkConn.get() != NULL ?
_lastSlaveOkConn->getServerAddress() : "[not cached]" )
<< ")" << endl;
string lastNodeErrMsg;
for (size_t retry = 0; retry < MAX_RETRY; retry++) {
_lazyState._retries = retry;
try {
DBClientConnection* conn = selectNodeUsingTags(readPref);
// NULL means no node matched the read preference; give up and report below.
if (conn == NULL) {
break;
}
if (actualServer != NULL) {
*actualServer = conn->getServerAddress();
}
conn->say(toSend);
_lazyState._lastOp = lastOp;
_lazyState._secondaryQueryOk = true;
_lazyState._lastClient = conn;
}
catch ( const DBException& DBExcep ) {
// Remember the error, mark the node as failed, and try another node.
StringBuilder errMsgBuilder;
errMsgBuilder << "can't callLazy replica set node "
<< _lastSlaveOkHost.toString() << ": " << causedBy( DBExcep );
lastNodeErrMsg = errMsgBuilder.str();
LOG(1) << lastNodeErrMsg << endl;
invalidateLastSlaveOkCache();
continue;
}
// Reached only when the message was sent successfully.
return;
}
StringBuilder assertMsg;
assertMsg << "Failed to call say, no good nodes in " << _getMonitor()->getName();
if ( !lastNodeErrMsg.empty() ) {
assertMsg << ", last error: " << lastNodeErrMsg;
}
uasserted(16380, assertMsg.str());
}
}
LOG( 3 ) << "dbclient_rs say to primary node in " << _getMonitor()->getName()
<< endl;
DBClientConnection* master = checkMaster();
if (actualServer)
*actualServer = master->getServerAddress();
_lazyState._lastOp = lastOp;
_lazyState._secondaryQueryOk = false;
// Don't retry requests to primary since there is only one host to try
_lazyState._retries = MAX_RETRY;
_lazyState._lastClient = master;
master->say(toSend);
return;
}
// Receives the reply for the message previously sent with say() into `m`, using the
// connection recorded in _lazyState.
//
// Requires that say() recorded a connection (verified). Returns the result of the
// underlying recv(); a DBException raised while receiving is logged and reported as false
// instead of propagating.
bool DBClientReplicaSet::recv( Message& m ) {
    verify( _lazyState._lastClient );

    // TODO: It would be nice if we could easily wrap a conn error as a result error
    try {
        return _lazyState._lastClient->recv( m );
    }
    // Caught by const reference, matching every other handler in this file.
    catch( const DBException& e ){
        log() << "could not receive data from " << _lazyState._lastClient->toString() << causedBy( e ) << endl;
        return false;
    }
}
// Examines the reply to a message sent with say() and decides whether it should be retried.
//
// @param data raw reply data; interpreted as one BSON object when nReturned == 1
// @param nReturned number of results; -1 is a sentinel for "no data returned"
// @param retry if NULL, the call is simply forwarded to the last used connection (or to the
//     primary when there is none). Otherwise set to true when the caller should resend.
// @param targetHost if non-NULL, receives the address of the last used connection, or ""
//     when there is none
//
// Side effects: on a "not master" style error or a missing reply, the node involved is
// reported as failed through isntSecondary() / isntMaster().
void DBClientReplicaSet::checkResponse( const char* data, int nReturned, bool* retry, string* targetHost ){
// For now, do exactly as we did before, so as not to break things. In general though, we
// should fix this so checkResponse has a more consistent contract.
if( ! retry ){
if( _lazyState._lastClient )
return _lazyState._lastClient->checkResponse( data, nReturned );
else
return checkMaster()->checkResponse( data, nReturned );
}
*retry = false;
if( targetHost && _lazyState._lastClient ) *targetHost = _lazyState._lastClient->getServerAddress();
else if (targetHost) *targetHost = "";
if( ! _lazyState._lastClient ) return;
// nReturned == 1 means that we got one result back, which might be an error
// nReturned == -1 is a sentinel value for "no data returned" aka (usually) network problem
// If neither, this must be a query result so our response is ok wrt the replica set
if( nReturned != 1 && nReturned != -1 ) return;
BSONObj dataObj;
if( nReturned == 1 ) dataObj = BSONObj( data );
// Check if we should retry here
if( _lazyState._lastOp == dbQuery && _lazyState._secondaryQueryOk ){
// query could potentially go to a secondary, so see if this is an error (or empty) and
// retry if we're not past our retry limit.
// NOTE(review): Int() presumably requires a 32-bit integer "code" element - confirm how
// other element types are handled.
if( nReturned == -1 /* no result, maybe network problem */ ||
( hasErrField( dataObj ) && ! dataObj["code"].eoo()
&& dataObj["code"].Int() == NotMasterOrSecondaryCode ) ){
// Fail over whichever cached connection the request was sent on.
if( _lazyState._lastClient == _lastSlaveOkConn.get() ){
isntSecondary();
}
else if( _lazyState._lastClient == _master.get() ){
isntMaster();
}
else {
warning() << "passed " << dataObj << " but last rs client "
<< _lazyState._lastClient->toString() << " is not master or secondary"
<< endl;
}
if ( _lazyState._retries < static_cast( MAX_RETRY ) ) {
_lazyState._retries++;
*retry = true;
}
else{
log() << "too many retries (" << _lazyState._retries
<< "), could not get data from replica set" << endl;
}
}
}
else if( _lazyState._lastOp == dbQuery ){
// if query could not potentially go to a secondary, just mark the master as bad
// (no retry is requested on this path)
if( nReturned == -1 /* no result, maybe network problem */ ||
( hasErrField( dataObj ) && ! dataObj["code"].eoo()
&& dataObj["code"].Int() == NotMasterNoSlaveOkCode ) )
{
if( _lazyState._lastClient == _master.get() ){
isntMaster();
}
}
}
}
// Sends `toSend` and waits for `response`.
//
// For a dbQuery message that may go to a secondary (see _isSecondaryQuery), a node is
// selected by read preference and called, retrying on another node up to MAX_RETRY times
// when a DBException is thrown. Unlike query()/findOne(), this path reports failure by
// returning false (no node available, or all retries exhausted) rather than throwing.
//
// Everything else goes to the primary. After a successful primary call to a dbQuery
// message, a single-document reply is checked for a "not master" error, in which case the
// cached master is dropped via isntMaster(); the call still returns true.
//
// If `actualServer` is non-NULL it receives the address of the node used ("" after a
// failed secondary attempt).
bool DBClientReplicaSet::call(Message &toSend,
Message &response,
bool assertOk,
string * actualServer) {
// Stays NULL unless the message is a dbQuery.
const char * ns = 0;
if (toSend.operation() == dbQuery) {
// TODO: might be possible to do this faster by changing api
DbMessage dm(toSend);
QueryMessage qm(dm);
ns = qm.ns;
shared_ptr readPref( _extractReadPref( qm.query,
qm.queryOptions ) );
if ( _isSecondaryQuery( ns, qm.query, *readPref ) ) {
LOG( 3 ) << "dbclient_rs call using secondary or tagged node selection in "
<< _getMonitor()->getName() << ", read pref is "
<< readPref->toBSON() << " (primary : "
<< ( _master.get() != NULL ?
_master->getServerAddress() : "[not cached]" )
<< ", lastTagged : "
<< ( _lastSlaveOkConn.get() != NULL ?
_lastSlaveOkConn->getServerAddress() : "[not cached]" )
<< ")" << endl;
for (size_t retry = 0; retry < MAX_RETRY; retry++) {
try {
DBClientConnection* conn = selectNodeUsingTags(readPref);
// NULL means no node matched the read preference.
if (conn == NULL) {
return false;
}
if (actualServer != NULL) {
*actualServer = conn->getServerAddress();
}
return conn->call(toSend, response, assertOk);
}
catch ( const DBException& dbExcep ) {
// Log, clear the reported server, mark the node as failed, and try another node.
LOG(1) << "can't call replica set node " << _lastSlaveOkHost << ": "
<< causedBy( dbExcep ) << endl;
if ( actualServer ) *actualServer = "";
invalidateLastSlaveOkCache();
}
}
// Was not able to successfully send after max retries
return false;
}
}
LOG( 3 ) << "dbclient_rs call to primary node in " << _getMonitor()->getName() << endl;
DBClientConnection* m = checkMaster();
if ( actualServer )
*actualServer = m->getServerAddress();
if ( ! m->call( toSend , response , assertOk ) )
return false;
if ( ns ) {
QueryResult::View res = response.singleData().view2ptr();
if ( res.getNReturned() == 1 ) {
BSONObj x(res.data() );
// Commands report the error in "errmsg"; other queries through getErrField().
if ( str::contains( ns , "$cmd" ) ) {
if ( isNotMasterErrorString( x["errmsg"] ) )
isntMaster();
}
else {
if ( isNotMasterErrorString( getErrField( x ) ) )
isntMaster();
}
}
}
return true;
}
void DBClientReplicaSet::invalidateLastSlaveOkCache() {
/* This is not wrapped in with if (_lastSlaveOkConn && _lastSlaveOkConn->isFailed())
* because there are certain exceptions that will not make the connection be labeled
* as failed. For example, asserts 13079, 13080, 16386
*/
_getMonitor()->failedHost(_lastSlaveOkHost);
resetSlaveOkConn();
}
// Clears the per-operation secondary state: drops the cached slaveOk connection, forgets
// the connection recorded for lazy say()/recv(), and clears the cached read preference.
// The cached master connection is not touched here (except that resetSlaveOkConn() only
// detaches, and does not delete, a slaveOk pointer that aliases the master).
void DBClientReplicaSet::reset() {
resetSlaveOkConn();
_lazyState._lastClient = NULL;
_lastReadPref.reset();
}
// Sets the class-wide flag that controls whether pooled secondary connections are
// authenticated in selectNodeUsingTags() and logged out in resetSlaveOkConn().
void DBClientReplicaSet::setAuthPooledSecondaryConn(bool setting) {
_authPooledSecondaryConn = setting;
}
// Drops the cached master connection and clears _masterHost. When the cached slaveOk
// connection is the same object as the master connection, the slaveOk holder first gives
// up its pointer (without deleting it) and _lastSlaveOkHost is cleared as well.
void DBClientReplicaSet::resetMaster() {
    const bool secondaryAliasesMaster = ( _lastSlaveOkConn.get() == _master.get() );
    if ( secondaryAliasesMaster ) {
        _lastSlaveOkConn.release();
        _lastSlaveOkHost = HostAndPort();
    }

    _master.reset();
    _masterHost = HostAndPort();
}
void DBClientReplicaSet::resetSlaveOkConn() {
if (_lastSlaveOkConn.get() == _master.get()) {
_lastSlaveOkConn.release();
}
else if (_lastSlaveOkConn.get() != NULL) {
if (_authPooledSecondaryConn) {
logoutAll(_lastSlaveOkConn.get());
}
else {
// Mongos pooled connections are all authenticated with the same credentials;
// so no need to logout.
}
// If the connection was bad, the pool will clean it up.
pool.release(_lastSlaveOkHost.toString(), _lastSlaveOkConn.release());
}
_lastSlaveOkHost = HostAndPort();
}
// trying to optimize for the common dont-care-about-tags case.
// A single shared array holding one empty document; per the _extractReadPref contract an
// empty tag document {} matches any tag.
static const BSONArray tagsMatchesAll = BSON_ARRAY(BSONObj());
// Default constructor: initializes _tags from the shared match-everything array above.
TagSet::TagSet() : _tags(tagsMatchesAll) {}
// Returns a short human-readable label for a read preference mode, or "Unknown" for a
// value that is not one of the known enumerators.
string readPrefToString( ReadPreference pref ) {
    const char* label = "Unknown";

    switch ( pref ) {
    case ReadPreference_PrimaryOnly:
        label = "primary only";
        break;
    case ReadPreference_PrimaryPreferred:
        label = "primary pref";
        break;
    case ReadPreference_SecondaryOnly:
        label = "secondary only";
        break;
    case ReadPreference_SecondaryPreferred:
        label = "secondary pref";
        break;
    case ReadPreference_Nearest:
        label = "nearest";
        break;
    default:
        break;
    }

    return label;
}
// Serializes this setting as { pref: <label from readPrefToString>, tags: <tag BSON> }.
BSONObj ReadPreferenceSetting::toBSON() const {
    const string prefLabel = readPrefToString( pref );

    BSONObjBuilder builder;
    builder.append( "pref", prefLabel );
    builder.append( "tags", tags.getTagBSON() );
    return builder.obj();
}
}