diff options
Diffstat (limited to 'src/mongo/client/dbclient_rs.cpp')
-rw-r--r-- | src/mongo/client/dbclient_rs.cpp | 993 |
1 files changed, 993 insertions, 0 deletions
diff --git a/src/mongo/client/dbclient_rs.cpp b/src/mongo/client/dbclient_rs.cpp new file mode 100644 index 00000000000..2d9e0fbabba --- /dev/null +++ b/src/mongo/client/dbclient_rs.cpp @@ -0,0 +1,993 @@ +// dbclient.cpp - connect to a Mongo database as a database, from C++ + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "pch.h" +#include "dbclient.h" +#include "../bson/util/builder.h" +#include "../db/jsobj.h" +#include "../db/json.h" +#include "../db/dbmessage.h" +#include "connpool.h" +#include "dbclient_rs.h" +#include "../util/background.h" +#include "../util/timer.h" + +namespace mongo { + + // -------------------------------- + // ----- ReplicaSetMonitor --------- + // -------------------------------- + + // global background job responsible for checking every X amount of time + class ReplicaSetMonitorWatcher : public BackgroundJob { + public: + ReplicaSetMonitorWatcher() : _safego("ReplicaSetMonitorWatcher::_safego") , _started(false) {} + + virtual string name() const { return "ReplicaSetMonitorWatcher"; } + + void safeGo() { + // check outside of lock for speed + if ( _started ) + return; + + scoped_lock lk( _safego ); + if ( _started ) + return; + _started = true; + + go(); + } + protected: + void run() { + log() << "starting" << endl; + while ( ! inShutdown() ) { + sleepsecs( 10 ); + try { + ReplicaSetMonitor::checkAll( true ); + } + catch ( std::exception& e ) { + error() << "check failed: " << e.what() << endl; + } + catch ( ... ) { + error() << "unkown error" << endl; + } + } + } + + mongo::mutex _safego; + bool _started; + + } replicaSetMonitorWatcher; + + string seedString( const vector<HostAndPort>& servers ){ + string seedStr; + for ( unsigned i = 0; i < servers.size(); i++ ){ + seedStr += servers[i].toString(); + if( i < servers.size() - 1 ) seedStr += ","; + } + + return seedStr; + } + + ReplicaSetMonitor::ReplicaSetMonitor( const string& name , const vector<HostAndPort>& servers ) + : _lock( "ReplicaSetMonitor instance" ) , _checkConnectionLock( "ReplicaSetMonitor check connection lock" ), _name( name ) , _master(-1), _nextSlave(0) { + + uassert( 13642 , "need at least 1 node for a replica set" , servers.size() > 0 ); + + if ( _name.size() == 0 ) { + warning() << "replica set name empty, first node: " << servers[0] << endl; + } + + log() << "starting new replica set monitor for replica set " << _name << " with seed of " << seedString( servers ) << endl; + + string errmsg; + for ( unsigned i = 0; i < servers.size(); i++ ) { + + // Don't check servers we have already + if( _find_inlock( servers[i] ) >= 0 ) continue; + + auto_ptr<DBClientConnection> conn( new DBClientConnection( true , 0, 5.0 ) ); + try{ + if( ! conn->connect( servers[i] , errmsg ) ){ + throw DBException( errmsg, 15928 ); + } + log() << "successfully connected to seed " << servers[i] << " for replica set " << this->_name << endl; + } + catch( DBException& e ){ + log() << "error connecting to seed " << servers[i] << causedBy( e ) << endl; + // skip seeds that don't work + continue; + } + + string maybePrimary; + _checkConnection( conn.get(), maybePrimary, false, -1 ); + } + + // Check everything to get the first data + _check( true ); + + log() << "replica set monitor for replica set " << _name << " started, address is " << getServerAddress() << endl; + + } + + ReplicaSetMonitor::~ReplicaSetMonitor() { + _nodes.clear(); + _master = -1; + } + + ReplicaSetMonitorPtr ReplicaSetMonitor::get( const string& name , const vector<HostAndPort>& servers ) { + scoped_lock lk( _setsLock ); + ReplicaSetMonitorPtr& m = _sets[name]; + if ( ! m ) + m.reset( new ReplicaSetMonitor( name , servers ) ); + + replicaSetMonitorWatcher.safeGo(); + + return m; + } + + ReplicaSetMonitorPtr ReplicaSetMonitor::get( const string& name ) { + scoped_lock lk( _setsLock ); + map<string,ReplicaSetMonitorPtr>::const_iterator i = _sets.find( name ); + if ( i == _sets.end() ) + return ReplicaSetMonitorPtr(); + return i->second; + } + + + void ReplicaSetMonitor::checkAll( bool checkAllSecondaries ) { + set<string> seen; + + while ( true ) { + ReplicaSetMonitorPtr m; + { + scoped_lock lk( _setsLock ); + for ( map<string,ReplicaSetMonitorPtr>::iterator i=_sets.begin(); i!=_sets.end(); ++i ) { + string name = i->first; + if ( seen.count( name ) ) + continue; + LOG(1) << "checking replica set: " << name << endl; + seen.insert( name ); + m = i->second; + break; + } + } + + if ( ! m ) + break; + + m->check( checkAllSecondaries ); + } + + + } + + void ReplicaSetMonitor::setConfigChangeHook( ConfigChangeHook hook ) { + massert( 13610 , "ConfigChangeHook already specified" , _hook == 0 ); + _hook = hook; + } + + string ReplicaSetMonitor::getServerAddress() const { + scoped_lock lk( _lock ); + return _getServerAddress_inlock(); + } + + string ReplicaSetMonitor::_getServerAddress_inlock() const { + StringBuilder ss; + if ( _name.size() ) + ss << _name << "/"; + + for ( unsigned i=0; i<_nodes.size(); i++ ) { + if ( i > 0 ) + ss << ","; + ss << _nodes[i].addr.toString(); + } + + return ss.str(); + } + + bool ReplicaSetMonitor::contains( const string& server ) const { + scoped_lock lk( _lock ); + for ( unsigned i=0; i<_nodes.size(); i++ ) { + if ( _nodes[i].addr == server ) + return true; + } + return false; + } + + + void ReplicaSetMonitor::notifyFailure( const HostAndPort& server ) { + scoped_lock lk( _lock ); + if ( _master >= 0 && _master < (int)_nodes.size() ) { + if ( server == _nodes[_master].addr ) { + _nodes[_master].ok = false; + _master = -1; + } + } + } + + + + HostAndPort ReplicaSetMonitor::getMaster() { + { + scoped_lock lk( _lock ); + if ( _master >= 0 && _nodes[_master].ok ) + return _nodes[_master].addr; + } + + _check( false ); + + scoped_lock lk( _lock ); + uassert( 10009 , str::stream() << "ReplicaSetMonitor no master found for set: " << _name , _master >= 0 ); + return _nodes[_master].addr; + } + + HostAndPort ReplicaSetMonitor::getSlave( const HostAndPort& prev ) { + // make sure its valid + + bool wasFound = false; + bool wasMaster = false; + + // This is always true, since checked in port() + assert( prev.port() >= 0 ); + if( prev.host().size() ){ + scoped_lock lk( _lock ); + for ( unsigned i=0; i<_nodes.size(); i++ ) { + if ( prev != _nodes[i].addr ) + continue; + + wasFound = true; + + if ( _nodes[i].okForSecondaryQueries() ) + return prev; + + wasMaster = _nodes[i].ok && ! _nodes[i].secondary; + + break; + } + } + + if( prev.host().size() ){ + if( wasFound ){ LOG(1) << "slave '" << prev << ( wasMaster ? "' is master node, trying to find another node" : + "' is no longer ok to use" ) << endl; } + else{ LOG(1) << "slave '" << prev << "' was not found in the replica set" << endl; } + } + else LOG(1) << "slave '" << prev << "' is not initialized or invalid" << endl; + + return getSlave(); + } + + HostAndPort ReplicaSetMonitor::getSlave() { + LOG(2) << "dbclient_rs getSlave " << getServerAddress() << endl; + + scoped_lock lk( _lock ); + + for ( unsigned ii = 0; ii < _nodes.size(); ii++ ) { + _nextSlave = ( _nextSlave + 1 ) % _nodes.size(); + if ( _nextSlave != _master ) { + if ( _nodes[ _nextSlave ].okForSecondaryQueries() ) + return _nodes[ _nextSlave ].addr; + LOG(2) << "dbclient_rs getSlave not selecting " << _nodes[_nextSlave] << ", not currently okForSecondaryQueries" << endl; + } + } + uassert(15899, str::stream() << "No suitable member found for slaveOk query in replica set: " << _name, _master >= 0 && _nodes[_master].ok); + + // Fall back to primary + assert( static_cast<unsigned>(_master) < _nodes.size() ); + LOG(2) << "dbclient_rs getSlave no member in secondary state found, returning primary " << _nodes[ _master ] << endl; + return _nodes[_master].addr; + } + + /** + * notify the monitor that server has faild + */ + void ReplicaSetMonitor::notifySlaveFailure( const HostAndPort& server ) { + int x = _find( server ); + if ( x >= 0 ) { + scoped_lock lk( _lock ); + _nodes[x].ok = false; + } + } + + void ReplicaSetMonitor::_checkStatus(DBClientConnection *conn) { + BSONObj status; + + if (!conn->runCommand("admin", BSON("replSetGetStatus" << 1), status) ) { + LOG(1) << "dbclient_rs replSetGetStatus failed" << endl; + return; + } + if( !status.hasField("members") ) { + log() << "dbclient_rs error expected members field in replSetGetStatus result" << endl; + return; + } + if( status["members"].type() != Array) { + log() << "dbclient_rs error expected members field in replSetGetStatus result to be an array" << endl; + return; + } + + BSONObjIterator hi(status["members"].Obj()); + while (hi.more()) { + BSONObj member = hi.next().Obj(); + string host = member["name"].String(); + + int m = -1; + if ((m = _find(host)) < 0) { + LOG(1) << "dbclient_rs _checkStatus couldn't _find(" << host << ')' << endl; + continue; + } + + double state = member["state"].Number(); + if (member["health"].Number() == 1 && (state == 1 || state == 2)) { + LOG(1) << "dbclient_rs nodes["<<m<<"].ok = true " << host << endl; + scoped_lock lk( _lock ); + _nodes[m].ok = true; + } + else { + LOG(1) << "dbclient_rs nodes["<<m<<"].ok = false " << host << endl; + scoped_lock lk( _lock ); + _nodes[m].ok = false; + } + } + } + + NodeDiff ReplicaSetMonitor::_getHostDiff_inlock( const BSONObj& hostList ){ + + NodeDiff diff; + set<int> nodesFound; + + int index = 0; + BSONObjIterator hi( hostList ); + while( hi.more() ){ + + string toCheck = hi.next().String(); + int nodeIndex = _find_inlock( toCheck ); + + // Node-to-add + if( nodeIndex < 0 ) diff.first.insert( toCheck ); + else nodesFound.insert( nodeIndex ); + + index++; + } + + for( size_t i = 0; i < _nodes.size(); i++ ){ + if( nodesFound.find( static_cast<int>(i) ) == nodesFound.end() ) diff.second.insert( static_cast<int>(i) ); + } + + return diff; + } + + bool ReplicaSetMonitor::_shouldChangeHosts( const BSONObj& hostList, bool inlock ){ + + int origHosts = 0; + if( ! inlock ){ + scoped_lock lk( _lock ); + origHosts = _nodes.size(); + } + else origHosts = _nodes.size(); + int numHosts = 0; + bool changed = false; + + BSONObjIterator hi(hostList); + while ( hi.more() ) { + string toCheck = hi.next().String(); + + numHosts++; + int index = 0; + if( ! inlock ) index = _find( toCheck ); + else index = _find_inlock( toCheck ); + + if ( index >= 0 ) continue; + + changed = true; + break; + } + + return changed || origHosts != numHosts; + + } + + void ReplicaSetMonitor::_checkHosts( const BSONObj& hostList, bool& changed ) { + + // Fast path, still requires intermittent locking + if( ! _shouldChangeHosts( hostList, false ) ){ + changed = false; + return; + } + + // Slow path, double-checked though + scoped_lock lk( _lock ); + + // Our host list may have changed while waiting for another thread in the meantime, + // so double-check here + // TODO: Do we really need this much protection, this should be pretty rare and not triggered + // from lots of threads, duping old behavior for safety + if( ! _shouldChangeHosts( hostList, true ) ){ + changed = false; + return; + } + + // LogLevel can be pretty low, since replica set reconfiguration should be pretty rare and we + // want to record our changes + log() << "changing hosts to " << hostList << " from " << _getServerAddress_inlock() << endl; + + NodeDiff diff = _getHostDiff_inlock( hostList ); + set<string> added = diff.first; + set<int> removed = diff.second; + + assert( added.size() > 0 || removed.size() > 0 ); + changed = true; + + // Delete from the end so we don't invalidate as we delete, delete indices are ascending + for( set<int>::reverse_iterator i = removed.rbegin(), end = removed.rend(); i != end; ++i ){ + + log() << "erasing host " << _nodes[ *i ] << " from replica set " << this->_name << endl; + + _nodes.erase( _nodes.begin() + *i ); + } + + // Add new nodes + for( set<string>::iterator i = added.begin(), end = added.end(); i != end; ++i ){ + + log() << "trying to add new host " << *i << " to replica set " << this->_name << endl; + + // Connect to new node + HostAndPort h( *i ); + DBClientConnection * newConn = new DBClientConnection( true, 0, 5.0 ); + + string errmsg; + try{ + if( ! newConn->connect( h , errmsg ) ){ + throw DBException( errmsg, 15927 ); + } + log() << "successfully connected to new host " << *i << " in replica set " << this->_name << endl; + } + catch( DBException& e ){ + warning() << "cannot connect to new host " << *i << " to replica set " << this->_name << causedBy( e ) << endl; + delete newConn; + newConn = NULL; + } + + _nodes.push_back( Node( h , newConn ) ); + } + + } + + + + bool ReplicaSetMonitor::_checkConnection( DBClientConnection * c , string& maybePrimary , bool verbose , int nodesOffset ) { + scoped_lock lk( _checkConnectionLock ); + bool isMaster = false; + bool changed = false; + try { + Timer t; + BSONObj o; + c->isMaster(isMaster, &o); + if ( o["setName"].type() != String || o["setName"].String() != _name ) { + warning() << "node: " << c->getServerAddress() << " isn't a part of set: " << _name + << " ismaster: " << o << endl; + if ( nodesOffset >= 0 ) + _nodes[nodesOffset].ok = false; + return false; + } + + if ( nodesOffset >= 0 ) { + _nodes[nodesOffset].pingTimeMillis = t.millis(); + _nodes[nodesOffset].hidden = o["hidden"].trueValue(); + _nodes[nodesOffset].secondary = o["secondary"].trueValue(); + _nodes[nodesOffset].ismaster = o["ismaster"].trueValue(); + + _nodes[nodesOffset].lastIsMaster = o.copy(); + } + + log( ! verbose ) << "ReplicaSetMonitor::_checkConnection: " << c->toString() << ' ' << o << endl; + + // add other nodes + BSONArrayBuilder b; + if ( o["hosts"].type() == Array ) { + if ( o["primary"].type() == String ) + maybePrimary = o["primary"].String(); + + BSONObjIterator it( o["hosts"].Obj() ); + while( it.more() ) b.append( it.next() ); + } + if (o.hasField("passives") && o["passives"].type() == Array) { + BSONObjIterator it( o["passives"].Obj() ); + while( it.more() ) b.append( it.next() ); + } + + _checkHosts( b.arr(), changed); + _checkStatus(c); + + + } + catch ( std::exception& e ) { + log( ! verbose ) << "ReplicaSetMonitor::_checkConnection: caught exception " << c->toString() << ' ' << e.what() << endl; + _nodes[nodesOffset].ok = false; + } + + if ( changed && _hook ) + _hook( this ); + + return isMaster; + } + + void ReplicaSetMonitor::_check( bool checkAllSecondaries ) { + + bool triedQuickCheck = false; + + LOG(1) << "_check : " << getServerAddress() << endl; + + int newMaster = -1; + + for ( int retry = 0; retry < 2; retry++ ) { + for ( unsigned i=0; i<_nodes.size(); i++ ) { + shared_ptr<DBClientConnection> c; + { + scoped_lock lk( _lock ); + c = _nodes[i].conn; + } + + string maybePrimary; + if ( _checkConnection( c.get() , maybePrimary , retry , i ) ) { + _master = i; + newMaster = i; + if ( ! checkAllSecondaries ) + return; + } + + if ( ! triedQuickCheck && maybePrimary.size() ) { + int x = _find( maybePrimary ); + if ( x >= 0 ) { + triedQuickCheck = true; + string dummy; + shared_ptr<DBClientConnection> testConn; + { + scoped_lock lk( _lock ); + testConn = _nodes[x].conn; + } + if ( _checkConnection( testConn.get() , dummy , false , x ) ) { + _master = x; + newMaster = x; + if ( ! checkAllSecondaries ) + return; + } + } + } + + } + + if ( newMaster >= 0 ) + return; + + sleepsecs(1); + } + + } + + void ReplicaSetMonitor::check( bool checkAllSecondaries ) { + // first see if the current master is fine + if ( _master >= 0 ) { + string temp; + if ( _checkConnection( _nodes[_master].conn.get() , temp , false , _master ) ) { + if ( ! checkAllSecondaries ) { + // current master is fine, so we're done + return; + } + } + } + + // we either have no master, or the current is dead + _check( checkAllSecondaries ); + } + + int ReplicaSetMonitor::_find( const string& server ) const { + scoped_lock lk( _lock ); + return _find_inlock( server ); + } + + int ReplicaSetMonitor::_find_inlock( const string& server ) const { + for ( unsigned i=0; i<_nodes.size(); i++ ) + if ( _nodes[i].addr == server ) + return i; + return -1; + } + + + int ReplicaSetMonitor::_find( const HostAndPort& server ) const { + scoped_lock lk( _lock ); + for ( unsigned i=0; i<_nodes.size(); i++ ) + if ( _nodes[i].addr == server ) + return i; + return -1; + } + + void ReplicaSetMonitor::appendInfo( BSONObjBuilder& b ) const { + scoped_lock lk( _lock ); + BSONArrayBuilder hosts( b.subarrayStart( "hosts" ) ); + for ( unsigned i=0; i<_nodes.size(); i++ ) { + hosts.append( BSON( "addr" << _nodes[i].addr << + // "lastIsMaster" << _nodes[i].lastIsMaster << // this is a potential race, so only used when debugging + "ok" << _nodes[i].ok << + "ismaster" << _nodes[i].ismaster << + "hidden" << _nodes[i].hidden << + "secondary" << _nodes[i].secondary << + "pingTimeMillis" << _nodes[i].pingTimeMillis ) ); + + } + hosts.done(); + + b.append( "master" , _master ); + b.append( "nextSlave" , _nextSlave ); + } + + + mongo::mutex ReplicaSetMonitor::_setsLock( "ReplicaSetMonitor" ); + map<string,ReplicaSetMonitorPtr> ReplicaSetMonitor::_sets; + ReplicaSetMonitor::ConfigChangeHook ReplicaSetMonitor::_hook; + // -------------------------------- + // ----- DBClientReplicaSet --------- + // -------------------------------- + + DBClientReplicaSet::DBClientReplicaSet( const string& name , const vector<HostAndPort>& servers, double so_timeout ) + : _monitor( ReplicaSetMonitor::get( name , servers ) ), + _so_timeout( so_timeout ) { + } + + DBClientReplicaSet::~DBClientReplicaSet() { + } + + DBClientConnection * DBClientReplicaSet::checkMaster() { + HostAndPort h = _monitor->getMaster(); + + if ( h == _masterHost && _master ) { + // a master is selected. let's just make sure connection didn't die + if ( ! _master->isFailed() ) + return _master.get(); + _monitor->notifyFailure( _masterHost ); + } + + _masterHost = _monitor->getMaster(); + _master.reset( new DBClientConnection( true , this , _so_timeout ) ); + string errmsg; + if ( ! _master->connect( _masterHost , errmsg ) ) { + _monitor->notifyFailure( _masterHost ); + uasserted( 13639 , str::stream() << "can't connect to new replica set master [" << _masterHost.toString() << "] err: " << errmsg ); + } + _auth( _master.get() ); + return _master.get(); + } + + DBClientConnection * DBClientReplicaSet::checkSlave() { + HostAndPort h = _monitor->getSlave( _slaveHost ); + + if ( h == _slaveHost && _slave ) { + if ( ! _slave->isFailed() ) + return _slave.get(); + _monitor->notifySlaveFailure( _slaveHost ); + _slaveHost = _monitor->getSlave(); + } + else { + _slaveHost = h; + } + + _slave.reset( new DBClientConnection( true , this , _so_timeout ) ); + _slave->connect( _slaveHost ); + _auth( _slave.get() ); + return _slave.get(); + } + + + void DBClientReplicaSet::_auth( DBClientConnection * conn ) { + for ( list<AuthInfo>::iterator i=_auths.begin(); i!=_auths.end(); ++i ) { + const AuthInfo& a = *i; + string errmsg; + if ( ! conn->auth( a.dbname , a.username , a.pwd , errmsg, a.digestPassword ) ) + warning() << "cached auth failed for set: " << _monitor->getName() << " db: " << a.dbname << " user: " << a.username << endl; + + } + + } + + DBClientConnection& DBClientReplicaSet::masterConn() { + return *checkMaster(); + } + + DBClientConnection& DBClientReplicaSet::slaveConn() { + return *checkSlave(); + } + + bool DBClientReplicaSet::connect() { + try { + checkMaster(); + } + catch (AssertionException&) { + if (_master && _monitor) { + _monitor->notifyFailure(_masterHost); + } + return false; + } + return true; + } + + bool DBClientReplicaSet::auth(const string &dbname, const string &username, const string &pwd, string& errmsg, bool digestPassword ) { + DBClientConnection * m = checkMaster(); + + // first make sure it actually works + if( ! m->auth(dbname, username, pwd, errmsg, digestPassword ) ) + return false; + + // now that it does, we should save so that for a new node we can auth + _auths.push_back( AuthInfo( dbname , username , pwd , digestPassword ) ); + return true; + } + + // ------------- simple functions ----------------- + + void DBClientReplicaSet::insert( const string &ns , BSONObj obj , int flags) { + checkMaster()->insert(ns, obj, flags); + } + + void DBClientReplicaSet::insert( const string &ns, const vector< BSONObj >& v , int flags) { + checkMaster()->insert(ns, v, flags); + } + + void DBClientReplicaSet::remove( const string &ns , Query obj , bool justOne ) { + checkMaster()->remove(ns, obj, justOne); + } + + void DBClientReplicaSet::update( const string &ns , Query query , BSONObj obj , bool upsert , bool multi ) { + return checkMaster()->update(ns, query, obj, upsert,multi); + } + + auto_ptr<DBClientCursor> DBClientReplicaSet::query(const string &ns, Query query, int nToReturn, int nToSkip, + const BSONObj *fieldsToReturn, int queryOptions, int batchSize) { + + if ( queryOptions & QueryOption_SlaveOk ) { + // we're ok sending to a slave + // we'll try 2 slaves before just using master + // checkSlave will try a different slave automatically after a failure + for ( int i=0; i<3; i++ ) { + try { + return checkSlaveQueryResult( checkSlave()->query(ns,query,nToReturn,nToSkip,fieldsToReturn,queryOptions,batchSize) ); + } + catch ( DBException &e ) { + LOG(1) << "can't query replica set slave " << i << " : " << _slaveHost << causedBy( e ) << endl; + } + } + } + + return checkMaster()->query(ns,query,nToReturn,nToSkip,fieldsToReturn,queryOptions,batchSize); + } + + BSONObj DBClientReplicaSet::findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn, int queryOptions) { + if ( queryOptions & QueryOption_SlaveOk ) { + // we're ok sending to a slave + // we'll try 2 slaves before just using master + // checkSlave will try a different slave automatically after a failure + for ( int i=0; i<3; i++ ) { + try { + return checkSlave()->findOne(ns,query,fieldsToReturn,queryOptions); + } + catch ( DBException &e ) { + LOG(1) << "can't findone replica set slave " << i << " : " << _slaveHost << causedBy( e ) << endl; + } + } + } + + return checkMaster()->findOne(ns,query,fieldsToReturn,queryOptions); + } + + void DBClientReplicaSet::killCursor( long long cursorID ) { + // we should neve call killCursor on a replica set conncetion + // since we don't know which server it belongs to + // can't assume master because of slave ok + // and can have a cursor survive a master change + assert(0); + } + + void DBClientReplicaSet::isntMaster() { + log() << "got not master for: " << _masterHost << endl; + _monitor->notifyFailure( _masterHost ); + _master.reset(); + } + + auto_ptr<DBClientCursor> DBClientReplicaSet::checkSlaveQueryResult( auto_ptr<DBClientCursor> result ){ + BSONObj error; + bool isError = result->peekError( &error ); + if( ! isError ) return result; + + // We only check for "not master or secondary" errors here + + // If the error code here ever changes, we need to change this code also + BSONElement code = error["code"]; + if( code.isNumber() && code.Int() == 13436 /* not master or secondary */ ){ + isntSecondary(); + throw DBException( str::stream() << "slave " << _slaveHost.toString() << " is no longer secondary", 14812 ); + } + + return result; + } + + void DBClientReplicaSet::isntSecondary() { + log() << "slave no longer has secondary status: " << _slaveHost << endl; + // Failover to next slave + _monitor->notifySlaveFailure( _slaveHost ); + _slave.reset(); + } + + void DBClientReplicaSet::say( Message& toSend, bool isRetry ) { + + if( ! isRetry ) + _lazyState = LazyState(); + + int lastOp = -1; + bool slaveOk = false; + + if ( ( lastOp = toSend.operation() ) == dbQuery ) { + // TODO: might be possible to do this faster by changing api + DbMessage dm( toSend ); + QueryMessage qm( dm ); + if ( ( slaveOk = ( qm.queryOptions & QueryOption_SlaveOk ) ) ) { + + for ( int i = _lazyState._retries; i < 3; i++ ) { + try { + DBClientConnection* slave = checkSlave(); + slave->say( toSend ); + + _lazyState._lastOp = lastOp; + _lazyState._slaveOk = slaveOk; + _lazyState._retries = i; + _lazyState._lastClient = slave; + return; + } + catch ( DBException &e ) { + LOG(1) << "can't callLazy replica set slave " << i << " : " << _slaveHost << causedBy( e ) << endl; + } + } + } + } + + DBClientConnection* master = checkMaster(); + master->say( toSend ); + + _lazyState._lastOp = lastOp; + _lazyState._slaveOk = slaveOk; + _lazyState._retries = 3; + _lazyState._lastClient = master; + return; + } + + bool DBClientReplicaSet::recv( Message& m ) { + + assert( _lazyState._lastClient ); + + // TODO: It would be nice if we could easily wrap a conn error as a result error + try { + return _lazyState._lastClient->recv( m ); + } + catch( DBException& e ){ + log() << "could not receive data from " << _lazyState._lastClient << causedBy( e ) << endl; + return false; + } + } + + void DBClientReplicaSet::checkResponse( const char* data, int nReturned, bool* retry, string* targetHost ){ + + // For now, do exactly as we did before, so as not to break things. In general though, we + // should fix this so checkResponse has a more consistent contract. + if( ! retry ){ + if( _lazyState._lastClient ) + return _lazyState._lastClient->checkResponse( data, nReturned ); + else + return checkMaster()->checkResponse( data, nReturned ); + } + + *retry = false; + if( targetHost && _lazyState._lastClient ) *targetHost = _lazyState._lastClient->getServerAddress(); + else if (targetHost) *targetHost = ""; + + if( ! _lazyState._lastClient ) return; + if( nReturned != 1 && nReturned != -1 ) return; + + BSONObj dataObj; + if( nReturned == 1 ) dataObj = BSONObj( data ); + + // Check if we should retry here + if( _lazyState._lastOp == dbQuery && _lazyState._slaveOk ){ + + // Check the error code for a slave not secondary error + if( nReturned == -1 || + ( hasErrField( dataObj ) && ! dataObj["code"].eoo() && dataObj["code"].Int() == 13436 ) ){ + + bool wasMaster = false; + if( _lazyState._lastClient == _slave.get() ){ + isntSecondary(); + } + else if( _lazyState._lastClient == _master.get() ){ + wasMaster = true; + isntMaster(); + } + else + warning() << "passed " << dataObj << " but last rs client " << _lazyState._lastClient->toString() << " is not master or secondary" << endl; + + if( _lazyState._retries < 3 ){ + _lazyState._retries++; + *retry = true; + } + else{ + (void)wasMaster; // silence set-but-not-used warning + // assert( wasMaster ); + // printStackTrace(); + log() << "too many retries (" << _lazyState._retries << "), could not get data from replica set" << endl; + } + } + } + } + + + bool DBClientReplicaSet::call( Message &toSend, Message &response, bool assertOk , string * actualServer ) { + const char * ns = 0; + + if ( toSend.operation() == dbQuery ) { + // TODO: might be possible to do this faster by changing api + DbMessage dm( toSend ); + QueryMessage qm( dm ); + ns = qm.ns; + + if ( qm.queryOptions & QueryOption_SlaveOk ) { + for ( int i=0; i<3; i++ ) { + try { + DBClientConnection* s = checkSlave(); + if ( actualServer ) + *actualServer = s->getServerAddress(); + return s->call( toSend , response , assertOk ); + } + catch ( DBException &e ) { + LOG(1) << "can't call replica set slave " << i << " : " << _slaveHost << causedBy( e ) << endl; + if ( actualServer ) + *actualServer = ""; + } + } + } + } + + DBClientConnection* m = checkMaster(); + if ( actualServer ) + *actualServer = m->getServerAddress(); + + if ( ! m->call( toSend , response , assertOk ) ) + return false; + + if ( ns ) { + QueryResult * res = (QueryResult*)response.singleData(); + if ( res->nReturned == 1 ) { + BSONObj x(res->data() ); + if ( str::contains( ns , "$cmd" ) ) { + if ( isNotMasterErrorString( x["errmsg"] ) ) + isntMaster(); + } + else { + if ( isNotMasterErrorString( getErrField( x ) ) ) + isntMaster(); + } + } + } + + return true; + } + +} |