diff options
author | Greg Studer <greg@10gen.com> | 2011-12-07 17:15:16 -0500 |
---|---|---|
committer | Eliot Horowitz <eliot@10gen.com> | 2012-02-01 14:00:05 -0500 |
commit | 6b2231a15c45104c5e8ed292ab4eb5ad3fc94a29 (patch) | |
tree | 22e6c7c712020a1ab182e0bea9349d3f132b9aa9 | |
parent | 9a09c9f1af58f8c3f78848cf4e158c082d1bf6be (diff) | |
download | mongo-6b2231a15c45104c5e8ed292ab4eb5ad3fc94a29.tar.gz |
SERVER-4399 make replica set updating more reliable in c++ driver, remove nodes when no longer present in config
-rw-r--r-- | client/dbclient_rs.cpp | 192 | ||||
-rw-r--r-- | client/dbclient_rs.h | 10 |
2 files changed, 165 insertions, 37 deletions
diff --git a/client/dbclient_rs.cpp b/client/dbclient_rs.cpp index c57a52dd593..dccaf571a4f 100644 --- a/client/dbclient_rs.cpp +++ b/client/dbclient_rs.cpp @@ -72,6 +72,15 @@ namespace mongo { } replicaSetMonitorWatcher; + string seedString( const vector<HostAndPort>& servers ){ + string seedStr; + for ( unsigned i = 0; i < servers.size(); i++ ){ + seedStr += servers[i].toString(); + if( i < servers.size() - 1 ) seedStr += ","; + } + + return seedStr; + } ReplicaSetMonitor::ReplicaSetMonitor( const string& name , const vector<HostAndPort>& servers ) : _lock( "ReplicaSetMonitor instance" ) , _checkConnectionLock( "ReplicaSetMonitor check connection lock" ), _name( name ) , _master(-1), _nextSlave(0) { @@ -82,28 +91,36 @@ namespace mongo { warning() << "replica set name empty, first node: " << servers[0] << endl; } - string errmsg; + log() << "starting new replica set monitor for replica set " << _name << " with seed of " << seedString( servers ) << endl; - for ( unsigned i=0; i<servers.size(); i++ ) { + string errmsg; + for ( unsigned i = 0; i < servers.size(); i++ ) { - bool haveAlready = false; - for ( unsigned n = 0; n < _nodes.size() && ! haveAlready; n++ ) - haveAlready = ( _nodes[n].addr == servers[i] ); - if( haveAlready ) continue; + // Don't check servers we have already + if( _find_inlock( servers[i] ) >= 0 ) continue; auto_ptr<DBClientConnection> conn( new DBClientConnection( true , 0, 5.0 ) ); - if (!conn->connect( servers[i] , errmsg ) ) { - log(1) << "error connecting to seed " << servers[i] << ": " << errmsg << endl; + try{ + if( ! conn->connect( servers[i] , errmsg ) ){ + throw DBException( errmsg, 15928 ); + } + log() << "successfully connected to seed " << servers[i] << " for replica set " << this->_name << endl; + } + catch( DBException& e ){ + log() << "error connecting to seed " << servers[i] << causedBy( e ) << endl; // skip seeds that don't work continue; } - _nodes.push_back( Node( servers[i] , conn.release() ) ); - - int myLoc = _nodes.size() - 1; string maybePrimary; - _checkConnection( _nodes[myLoc].conn.get() , maybePrimary, false, myLoc ); + _checkConnection( conn.get(), maybePrimary, false, -1 ); } + + // Check everything to get the first data + _check( true ); + + log() << "replica set monitor for replica set " << _name << " started, address is " << getServerAddress() << endl; + } ReplicaSetMonitor::~ReplicaSetMonitor() { @@ -164,18 +181,21 @@ namespace mongo { } string ReplicaSetMonitor::getServerAddress() const { + scoped_lock lk( _lock ); + return _getServerAddress_inlock(); + } + + string ReplicaSetMonitor::_getServerAddress_inlock() const { StringBuilder ss; if ( _name.size() ) ss << _name << "/"; - { - scoped_lock lk( _lock ); - for ( unsigned i=0; i<_nodes.size(); i++ ) { - if ( i > 0 ) - ss << ","; - ss << _nodes[i].addr.toString(); - } + for ( unsigned i=0; i<_nodes.size(); i++ ) { + if ( i > 0 ) + ss << ","; + ss << _nodes[i].addr.toString(); } + return ss.str(); } @@ -313,29 +333,126 @@ namespace mongo { } } - void ReplicaSetMonitor::_checkHosts( const BSONObj& hostList, bool& changed ) { + NodeDiff ReplicaSetMonitor::_getHostDiff_inlock( const BSONObj& hostList ){ + + NodeDiff diff; + set<int> nodesFound; + + int index = 0; + BSONObjIterator hi( hostList ); + while( hi.more() ){ + + string toCheck = hi.next().String(); + int nodeIndex = _find_inlock( toCheck ); + + // Node-to-add + if( nodeIndex < 0 ) diff.first.insert( toCheck ); + else nodesFound.insert( nodeIndex ); + + index++; + } + + for( size_t i = 0; i < _nodes.size(); i++ ){ + if( nodesFound.find( static_cast<int>(i) ) == nodesFound.end() ) diff.second.insert( static_cast<int>(i) ); + } + + return diff; + } + + bool ReplicaSetMonitor::_shouldChangeHosts( const BSONObj& hostList, bool inlock ){ + + int origHosts = 0; + if( ! inlock ){ + scoped_lock lk( _lock ); + origHosts = _nodes.size(); + } + else origHosts = _nodes.size(); + int numHosts = 0; + bool changed = false; + BSONObjIterator hi(hostList); while ( hi.more() ) { string toCheck = hi.next().String(); - if ( _find( toCheck ) >= 0 ) - continue; + numHosts++; + int index = 0; + if( ! inlock ) index = _find( toCheck ); + else index = _find_inlock( toCheck ); + + if ( index >= 0 ) continue; + + changed = true; + break; + } + + return changed || origHosts != numHosts; + + } + + void ReplicaSetMonitor::_checkHosts( const BSONObj& hostList, bool& changed ) { + + // Fast path, still requires intermittent locking + if( ! _shouldChangeHosts( hostList, false ) ){ + changed = false; + return; + } + + // Slow path, double-checked though + scoped_lock lk( _lock ); + + // Our host list may have changed while waiting for another thread in the meantime, + // so double-check here + // TODO: Do we really need this much protection, this should be pretty rare and not triggered + // from lots of threads, duping old behavior for safety + if( ! _shouldChangeHosts( hostList, true ) ){ + changed = false; + return; + } + + // LogLevel can be pretty low, since replica set reconfiguration should be pretty rare and we + // want to record our changes + log() << "changing hosts to " << hostList << " from " << _getServerAddress_inlock() << endl; - HostAndPort h( toCheck ); + NodeDiff diff = _getHostDiff_inlock( hostList ); + set<string> added = diff.first; + set<int> removed = diff.second; + + assert( added.size() > 0 || removed.size() > 0 ); + changed = true; + + // Delete from the end so we don't invalidate as we delete, delete indices are ascending + for( set<int>::reverse_iterator i = removed.rbegin(), end = removed.rend(); i != end; ++i ){ + + log() << "erasing host " << _nodes[ *i ] << " from replica set " << this->_name << endl; + + _nodes.erase( _nodes.begin() + *i ); + } + + // Add new nodes + for( set<string>::iterator i = added.begin(), end = added.end(); i != end; ++i ){ + + log() << "trying to add new host " << *i << " to replica set " << this->_name << endl; + + // Connect to new node + HostAndPort h( *i ); DBClientConnection * newConn = new DBClientConnection( true, 0, 5.0 ); - string temp; - newConn->connect( h , temp ); - { - scoped_lock lk( _lock ); - if ( _find_inlock( toCheck ) >= 0 ) { - // we need this check inside the lock so there isn't thread contention on adding to vector - continue; + + string errmsg; + try{ + if( ! newConn->connect( h , errmsg ) ){ + throw DBException( errmsg, 15927 ); } - _nodes.push_back( Node( h , newConn ) ); + log() << "successfully connected to new host " << *i << " in replica set " << this->_name << endl; } - log() << "updated set (" << _name << ") to: " << getServerAddress() << endl; - changed = true; + catch( DBException& e ){ + warning() << "cannot connect to new host " << *i << " to replica set " << this->_name << causedBy( e ) << endl; + delete newConn; + newConn = NULL; + } + + _nodes.push_back( Node( h , newConn ) ); } + } @@ -348,7 +465,6 @@ namespace mongo { Timer t; BSONObj o; c->isMaster(isMaster, &o); - if ( o["setName"].type() != String || o["setName"].String() != _name ) { warning() << "node: " << c->getServerAddress() << " isn't a part of set: " << _name << " ismaster: " << o << endl; @@ -369,16 +485,20 @@ namespace mongo { log( ! verbose ) << "ReplicaSetMonitor::_checkConnection: " << c->toString() << ' ' << o << endl; // add other nodes + BSONArrayBuilder b; if ( o["hosts"].type() == Array ) { if ( o["primary"].type() == String ) maybePrimary = o["primary"].String(); - _checkHosts(o["hosts"].Obj(), changed); + BSONObjIterator it( o["hosts"].Obj() ); + while( it.more() ) b.append( it.next() ); } if (o.hasField("passives") && o["passives"].type() == Array) { - _checkHosts(o["passives"].Obj(), changed); + BSONObjIterator it( o["passives"].Obj() ); + while( it.more() ) b.append( it.next() ); } + _checkHosts( b.arr(), changed); _checkStatus(c); diff --git a/client/dbclient_rs.h b/client/dbclient_rs.h index b6948a05b80..b68af29ac0d 100644 --- a/client/dbclient_rs.h +++ b/client/dbclient_rs.h @@ -24,6 +24,7 @@ namespace mongo { class ReplicaSetMonitor; typedef shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorPtr; + typedef pair<set<string>,set<int> > NodeDiff; /** * manages state about a replica set for client @@ -92,7 +93,7 @@ namespace mongo { string getName() const { return _name; } string getServerAddress() const; - + bool contains( const string& server ) const; void appendInfo( BSONObjBuilder& b ) const; @@ -132,6 +133,12 @@ namespace mongo { */ bool _checkConnection( DBClientConnection * c , string& maybePrimary , bool verbose , int nodesOffset ); + string _getServerAddress_inlock() const; + + NodeDiff _getHostDiff_inlock( const BSONObj& hostList ); + bool _shouldChangeHosts( const BSONObj& hostList, bool inlock ); + + int _find( const string& server ) const ; int _find_inlock( const string& server ) const ; int _find( const HostAndPort& server ) const ; @@ -144,6 +151,7 @@ namespace mongo { Node( const HostAndPort& a , DBClientConnection* c ) : addr( a ) , conn(c) , ok(true) , ismaster(false), secondary( false ) , hidden( false ) , pingTimeMillis(0) { + ok = conn.get() == NULL; } bool okForSecondaryQueries() const { |