diff options
-rw-r--r-- | client/dbclient_rs.cpp | 205 | ||||
-rw-r--r-- | client/dbclient_rs.h | 44 |
2 files changed, 185 insertions, 64 deletions
diff --git a/client/dbclient_rs.cpp b/client/dbclient_rs.cpp index b40f1981099..52d857a8e44 100644 --- a/client/dbclient_rs.cpp +++ b/client/dbclient_rs.cpp @@ -211,6 +211,7 @@ namespace mongo { void ReplicaSetMonitor::notifyFailure( const HostAndPort& server ) { scoped_lock lk( _lock ); + if ( _master >= 0 && _master < (int)_nodes.size() ) { if ( server == _nodes[_master].addr ) { _nodes[_master].ok = false; @@ -292,7 +293,7 @@ namespace mongo { } /** - * notify the monitor that server has faild + * notify the monitor that server has failed */ void ReplicaSetMonitor::notifySlaveFailure( const HostAndPort& server ) { int x = _find( server ); @@ -402,15 +403,15 @@ namespace mongo { // Our host list may have changed while waiting for another thread in the meantime, // so double-check here - // TODO: Do we really need this much protection, this should be pretty rare and not triggered - // from lots of threads, duping old behavior for safety + // TODO: Do we really need this much protection, this should be pretty rare and not + // triggered from lots of threads, duping old behavior for safety if( ! _shouldChangeHosts( hostList, true ) ){ changed = false; return; } - // LogLevel can be pretty low, since replica set reconfiguration should be pretty rare and we - // want to record our changes + // LogLevel can be pretty low, since replica set reconfiguration should be pretty rare and + // we want to record our changes log() << "changing hosts to " << hostList << " from " << _getServerAddress_inlock() << endl; NodeDiff diff = _getHostDiff_inlock( hostList ); @@ -424,7 +425,6 @@ namespace mongo { for( set<int>::reverse_iterator i = removed.rbegin(), end = removed.rend(); i != end; ++i ){ log() << "erasing host " << _nodes[ *i ] << " from replica set " << this->_name << endl; - _nodes.erase( _nodes.begin() + *i ); } @@ -450,29 +450,52 @@ namespace mongo { _nodes.push_back( Node( h , newConn ) ); } - } - - bool ReplicaSetMonitor::_checkConnection( DBClientConnection * c , string& maybePrimary , bool verbose , int nodesOffset ) { - assert( c ); + bool ReplicaSetMonitor::_checkConnection( DBClientConnection* conn, + string& maybePrimary, bool verbose, int nodesOffset ) { + + assert( conn ); + scoped_lock lk( _checkConnectionLock ); bool isMaster = false; bool changed = false; + bool errorOccured = false; + + if ( nodesOffset >= 0 ){ + scoped_lock lk( _lock ); + if ( !_checkConnMatch_inlock( conn, nodesOffset )) { + /* Another thread modified _nodes -> invariant broken. + * This also implies that another thread just passed + * through here and refreshed _nodes. So no need to do + * duplicate work. + */ + return false; + } + } + try { Timer t; BSONObj o; - c->isMaster(isMaster, &o); + conn->isMaster( isMaster, &o ); + if ( o["setName"].type() != String || o["setName"].String() != _name ) { - warning() << "node: " << c->getServerAddress() << " isn't a part of set: " << _name + warning() << "node: " << conn->getServerAddress() + << " isn't a part of set: " << _name << " ismaster: " << o << endl; - if ( nodesOffset >= 0 ) + + if ( nodesOffset >= 0 ) { + scoped_lock lk( _lock ); _nodes[nodesOffset].ok = false; + } + return false; } if ( nodesOffset >= 0 ) { + scoped_lock lk( _lock ); + _nodes[nodesOffset].pingTimeMillis = t.millis(); _nodes[nodesOffset].hidden = o["hidden"].trueValue(); _nodes[nodesOffset].secondary = o["secondary"].trueValue(); @@ -481,7 +504,10 @@ namespace mongo { _nodes[nodesOffset].lastIsMaster = o.copy(); } - log( ! verbose ) << "ReplicaSetMonitor::_checkConnection: " << c->toString() << ' ' << o << endl; + if ( verbose ) { + log() << "ReplicaSetMonitor::_checkConnection: " << conn->toString() + << ' ' << o << endl; + } // add other nodes BSONArrayBuilder b; @@ -492,18 +518,27 @@ namespace mongo { BSONObjIterator it( o["hosts"].Obj() ); while( it.more() ) b.append( it.next() ); } + if (o.hasField("passives") && o["passives"].type() == Array) { BSONObjIterator it( o["passives"].Obj() ); while( it.more() ) b.append( it.next() ); } _checkHosts( b.arr(), changed); - _checkStatus(c); + _checkStatus( conn ); - } catch ( std::exception& e ) { - log( ! verbose ) << "ReplicaSetMonitor::_checkConnection: caught exception " << c->toString() << ' ' << e.what() << endl; + if ( verbose ) { + log() << "ReplicaSetMonitor::_checkConnection: caught exception " + << conn->toString() << ' ' << e.what() << endl; + } + + errorOccured = true; + } + + if ( errorOccured ) { + scoped_lock lk( _lock ); _nodes[nodesOffset].ok = false; } @@ -514,63 +549,114 @@ namespace mongo { } void ReplicaSetMonitor::_check( bool checkAllSecondaries ) { - - bool triedQuickCheck = false; - LOG(1) << "_check : " << getServerAddress() << endl; int newMaster = -1; + shared_ptr<DBClientConnection> nodeConn; for ( int retry = 0; retry < 2; retry++ ) { - for ( unsigned i=0; i<_nodes.size(); i++ ) { - shared_ptr<DBClientConnection> c; + bool triedQuickCheck = false; + + if ( !checkAllSecondaries ) { + scoped_lock lk( _lock ); + if ( _master >= 0 ) { + /* Nothing else to do since another thread already + * found the _master + */ + return; + } + } + + for ( unsigned i = 0; /* should not check while outside of lock! */ ; i++ ) { { scoped_lock lk( _lock ); - c = _nodes[i].conn; + if ( i >= _nodes.size() ) break; + nodeConn = _nodes[i].conn; } string maybePrimary; - if ( _checkConnection( c.get() , maybePrimary , retry , i ) ) { - _master = i; - newMaster = i; - if ( ! checkAllSecondaries ) - return; + if ( _checkConnection( nodeConn.get(), maybePrimary, retry, i ) ) { + scoped_lock lk( _lock ); + if ( _checkConnMatch_inlock( nodeConn.get(), i )) { + _master = i; + newMaster = i; + + if ( !checkAllSecondaries ) + return; + } + else { + /* + * Somebody modified _nodes and most likely set the new + * _master, so try again. + */ + break; + } } - if ( ! triedQuickCheck && maybePrimary.size() ) { - int x = _find( maybePrimary ); - if ( x >= 0 ) { + + if ( ! triedQuickCheck && ! maybePrimary.empty() ) { + int probablePrimaryIdx = -1; + shared_ptr<DBClientConnection> probablePrimaryConn; + + { + scoped_lock lk( _lock ); + probablePrimaryIdx = _find_inlock( maybePrimary ); + probablePrimaryConn = _nodes[probablePrimaryIdx].conn; + } + + if ( probablePrimaryIdx >= 0 ) { triedQuickCheck = true; + string dummy; - shared_ptr<DBClientConnection> testConn; - { + if ( _checkConnection( probablePrimaryConn.get(), dummy, + false, probablePrimaryIdx ) ) { + scoped_lock lk( _lock ); - testConn = _nodes[x].conn; - } - if ( _checkConnection( testConn.get() , dummy , false , x ) ) { - _master = x; - newMaster = x; - if ( ! checkAllSecondaries ) - return; + + if ( _checkConnMatch_inlock( probablePrimaryConn.get(), + probablePrimaryIdx )) { + + _master = probablePrimaryIdx; + newMaster = probablePrimaryIdx; + + if ( ! checkAllSecondaries ) + return; + } + else { + /* + * Somebody modified _nodes and most likely set the + * new _master, so try again. + */ + break; + } } } } - } if ( newMaster >= 0 ) return; - sleepsecs(1); + sleepsecs( 1 ); } - } void ReplicaSetMonitor::check( bool checkAllSecondaries ) { - // first see if the current master is fine - if ( _master >= 0 ) { + shared_ptr<DBClientConnection> masterConn; + + { + scoped_lock lk( _lock ); + + // first see if the current master is fine + if ( _master >= 0 ) { + masterConn = _nodes[_master].conn; + } + } + + if ( masterConn.get() != NULL ) { string temp; - if ( _checkConnection( _nodes[_master].conn.get() , temp , false , _master ) ) { + + if ( _checkConnection( masterConn.get(), temp, false, _master )) { if ( ! checkAllSecondaries ) { // current master is fine, so we're done return; @@ -588,21 +674,17 @@ namespace mongo { } int ReplicaSetMonitor::_find_inlock( const string& server ) const { - for ( unsigned i=0; i<_nodes.size(); i++ ) - if ( _nodes[i].addr == server ) - return i; - return -1; - } + const size_t size = _nodes.size(); - - int ReplicaSetMonitor::_find( const HostAndPort& server ) const { - scoped_lock lk( _lock ); - for ( unsigned i=0; i<_nodes.size(); i++ ) - if ( _nodes[i].addr == server ) + for ( unsigned i = 0; i < size; i++ ) { + if ( _nodes[i].addr == server ) { return i; + } + } + return -1; } - + void ReplicaSetMonitor::appendInfo( BSONObjBuilder& b ) const { scoped_lock lk( _lock ); BSONArrayBuilder hosts( b.subarrayStart( "hosts" ) ); @@ -622,6 +704,13 @@ namespace mongo { b.append( "nextSlave" , _nextSlave ); } + bool ReplicaSetMonitor::_checkConnMatch_inlock( DBClientConnection* conn, + size_t nodeOffset ) const { + + return ( nodeOffset < _nodes.size() && + conn->getServerAddress() == _nodes[nodeOffset].conn->getServerAddress() ); + } + mongo::mutex ReplicaSetMonitor::_setsLock( "ReplicaSetMonitor" ); map<string,ReplicaSetMonitorPtr> ReplicaSetMonitor::_sets; @@ -645,6 +734,7 @@ namespace mongo { // a master is selected. let's just make sure connection didn't die if ( ! _master->isFailed() ) return _master.get(); + _monitor->notifyFailure( _masterHost ); } @@ -687,7 +777,6 @@ namespace mongo { warning() << "cached auth failed for set: " << _monitor->getName() << " db: " << a.dbname << " user: " << a.username << endl; } - } DBClientConnection& DBClientReplicaSet::masterConn() { diff --git a/client/dbclient_rs.h b/client/dbclient_rs.h index b68af29ac0d..bf91f091822 100644 --- a/client/dbclient_rs.h +++ b/client/dbclient_rs.h @@ -107,6 +107,13 @@ namespace mongo { */ ReplicaSetMonitor( const string& name , const vector<HostAndPort>& servers ); + /** + * Checks all connections from the host list and sets the current + * master. + * + * @param checkAllSecondaries if set to false, stop immediately when + * the master is found or when _master is not -1. + */ void _check( bool checkAllSecondaries ); /** @@ -125,25 +132,50 @@ namespace mongo { /** * Updates host list. - * @param c the connection to check + * Invariant: if nodesOffset is >= 0, _nodes[nodesOffset].conn should be + * equal to conn. + * + * @param conn the connection to check * @param maybePrimary OUT * @param verbose * @param nodesOffset - offset into _nodes array, -1 for not in it - * @return if the connection is good + * + * @return true if the connection is good or false if invariant + * is broken */ - bool _checkConnection( DBClientConnection * c , string& maybePrimary , bool verbose , int nodesOffset ); + bool _checkConnection( DBClientConnection* conn, string& maybePrimary, + bool verbose, int nodesOffset ); string _getServerAddress_inlock() const; NodeDiff _getHostDiff_inlock( const BSONObj& hostList ); bool _shouldChangeHosts( const BSONObj& hostList, bool inlock ); - + /** + * @return the index to _nodes corresponding to the server address. + */ int _find( const string& server ) const ; int _find_inlock( const string& server ) const ; - int _find( const HostAndPort& server ) const ; - mutable mongo::mutex _lock; // protects _nodes + /** + * Checks whether the given connection matches the connection stored in _nodes. + * Mainly used for sanity checking to confirm that nodeOffset still + * refers to the right connection after releasing and reacquiring + * a mutex. + */ + bool _checkConnMatch_inlock( DBClientConnection* conn, size_t nodeOffset ) const; + + // protects _nodes and indices pointing to it (_master & _nextSlave) + mutable mongo::mutex _lock; + + /** + * "Synchronizes" the _checkConnection method. Should ideally be one mutex per + * connection object being used. The purpose of this lock is to make sure that + * the reply from the connection the lock holder got is the actual response + * to what it sent. + * + * Deadlock WARNING: never acquire this while holding _lock + */ mutable mongo::mutex _checkConnectionLock; string _name; |