summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--client/dbclient_rs.cpp205
-rw-r--r--client/dbclient_rs.h44
2 files changed, 185 insertions, 64 deletions
diff --git a/client/dbclient_rs.cpp b/client/dbclient_rs.cpp
index b40f1981099..52d857a8e44 100644
--- a/client/dbclient_rs.cpp
+++ b/client/dbclient_rs.cpp
@@ -211,6 +211,7 @@ namespace mongo {
void ReplicaSetMonitor::notifyFailure( const HostAndPort& server ) {
scoped_lock lk( _lock );
+
if ( _master >= 0 && _master < (int)_nodes.size() ) {
if ( server == _nodes[_master].addr ) {
_nodes[_master].ok = false;
@@ -292,7 +293,7 @@ namespace mongo {
}
/**
- * notify the monitor that server has faild
+ * notify the monitor that server has failed
*/
void ReplicaSetMonitor::notifySlaveFailure( const HostAndPort& server ) {
int x = _find( server );
@@ -402,15 +403,15 @@ namespace mongo {
// Our host list may have changed while waiting for another thread in the meantime,
// so double-check here
- // TODO: Do we really need this much protection, this should be pretty rare and not triggered
- // from lots of threads, duping old behavior for safety
+ // TODO: Do we really need this much protection, this should be pretty rare and not
+ // triggered from lots of threads, duping old behavior for safety
if( ! _shouldChangeHosts( hostList, true ) ){
changed = false;
return;
}
- // LogLevel can be pretty low, since replica set reconfiguration should be pretty rare and we
- // want to record our changes
+ // LogLevel can be pretty low, since replica set reconfiguration should be pretty rare and
+ // we want to record our changes
log() << "changing hosts to " << hostList << " from " << _getServerAddress_inlock() << endl;
NodeDiff diff = _getHostDiff_inlock( hostList );
@@ -424,7 +425,6 @@ namespace mongo {
for( set<int>::reverse_iterator i = removed.rbegin(), end = removed.rend(); i != end; ++i ){
log() << "erasing host " << _nodes[ *i ] << " from replica set " << this->_name << endl;
-
_nodes.erase( _nodes.begin() + *i );
}
@@ -450,29 +450,52 @@ namespace mongo {
_nodes.push_back( Node( h , newConn ) );
}
-
}
-
- bool ReplicaSetMonitor::_checkConnection( DBClientConnection * c , string& maybePrimary , bool verbose , int nodesOffset ) {
- assert( c );
+ bool ReplicaSetMonitor::_checkConnection( DBClientConnection* conn,
+ string& maybePrimary, bool verbose, int nodesOffset ) {
+
+ assert( conn );
+
scoped_lock lk( _checkConnectionLock );
bool isMaster = false;
bool changed = false;
+ bool errorOccured = false;
+
+ if ( nodesOffset >= 0 ){
+ scoped_lock lk( _lock );
+ if ( !_checkConnMatch_inlock( conn, nodesOffset )) {
+ /* Another thread modified _nodes -> invariant broken.
+ * This also implies that another thread just passed
+ * through here and refreshed _nodes. So no need to do
+ * duplicate work.
+ */
+ return false;
+ }
+ }
+
try {
Timer t;
BSONObj o;
- c->isMaster(isMaster, &o);
+ conn->isMaster( isMaster, &o );
+
if ( o["setName"].type() != String || o["setName"].String() != _name ) {
- warning() << "node: " << c->getServerAddress() << " isn't a part of set: " << _name
+ warning() << "node: " << conn->getServerAddress()
+ << " isn't a part of set: " << _name
<< " ismaster: " << o << endl;
- if ( nodesOffset >= 0 )
+
+ if ( nodesOffset >= 0 ) {
+ scoped_lock lk( _lock );
_nodes[nodesOffset].ok = false;
+ }
+
return false;
}
if ( nodesOffset >= 0 ) {
+ scoped_lock lk( _lock );
+
_nodes[nodesOffset].pingTimeMillis = t.millis();
_nodes[nodesOffset].hidden = o["hidden"].trueValue();
_nodes[nodesOffset].secondary = o["secondary"].trueValue();
@@ -481,7 +504,10 @@ namespace mongo {
_nodes[nodesOffset].lastIsMaster = o.copy();
}
- log( ! verbose ) << "ReplicaSetMonitor::_checkConnection: " << c->toString() << ' ' << o << endl;
+ if ( verbose ) {
+ log() << "ReplicaSetMonitor::_checkConnection: " << conn->toString()
+ << ' ' << o << endl;
+ }
// add other nodes
BSONArrayBuilder b;
@@ -492,18 +518,27 @@ namespace mongo {
BSONObjIterator it( o["hosts"].Obj() );
while( it.more() ) b.append( it.next() );
}
+
if (o.hasField("passives") && o["passives"].type() == Array) {
BSONObjIterator it( o["passives"].Obj() );
while( it.more() ) b.append( it.next() );
}
_checkHosts( b.arr(), changed);
- _checkStatus(c);
+ _checkStatus( conn );
-
}
catch ( std::exception& e ) {
- log( ! verbose ) << "ReplicaSetMonitor::_checkConnection: caught exception " << c->toString() << ' ' << e.what() << endl;
+ if ( verbose ) {
+ log() << "ReplicaSetMonitor::_checkConnection: caught exception "
+ << conn->toString() << ' ' << e.what() << endl;
+ }
+
+ errorOccured = true;
+ }
+
+ if ( errorOccured ) {
+ scoped_lock lk( _lock );
_nodes[nodesOffset].ok = false;
}
@@ -514,63 +549,114 @@ namespace mongo {
}
void ReplicaSetMonitor::_check( bool checkAllSecondaries ) {
-
- bool triedQuickCheck = false;
-
LOG(1) << "_check : " << getServerAddress() << endl;
int newMaster = -1;
+ shared_ptr<DBClientConnection> nodeConn;
for ( int retry = 0; retry < 2; retry++ ) {
- for ( unsigned i=0; i<_nodes.size(); i++ ) {
- shared_ptr<DBClientConnection> c;
+ bool triedQuickCheck = false;
+
+ if ( !checkAllSecondaries ) {
+ scoped_lock lk( _lock );
+ if ( _master >= 0 ) {
+ /* Nothing else to do since another thread already
+ * found the _master
+ */
+ return;
+ }
+ }
+
+ for ( unsigned i = 0; /* should not check while outside of lock! */ ; i++ ) {
{
scoped_lock lk( _lock );
- c = _nodes[i].conn;
+ if ( i >= _nodes.size() ) break;
+ nodeConn = _nodes[i].conn;
}
string maybePrimary;
- if ( _checkConnection( c.get() , maybePrimary , retry , i ) ) {
- _master = i;
- newMaster = i;
- if ( ! checkAllSecondaries )
- return;
+ if ( _checkConnection( nodeConn.get(), maybePrimary, retry, i ) ) {
+ scoped_lock lk( _lock );
+ if ( _checkConnMatch_inlock( nodeConn.get(), i )) {
+ _master = i;
+ newMaster = i;
+
+ if ( !checkAllSecondaries )
+ return;
+ }
+ else {
+ /*
+ * Somebody modified _nodes and most likely set the new
+ * _master, so try again.
+ */
+ break;
+ }
}
- if ( ! triedQuickCheck && maybePrimary.size() ) {
- int x = _find( maybePrimary );
- if ( x >= 0 ) {
+
+ if ( ! triedQuickCheck && ! maybePrimary.empty() ) {
+ int probablePrimaryIdx = -1;
+ shared_ptr<DBClientConnection> probablePrimaryConn;
+
+ {
+ scoped_lock lk( _lock );
+ probablePrimaryIdx = _find_inlock( maybePrimary );
+ probablePrimaryConn = _nodes[probablePrimaryIdx].conn;
+ }
+
+ if ( probablePrimaryIdx >= 0 ) {
triedQuickCheck = true;
+
string dummy;
- shared_ptr<DBClientConnection> testConn;
- {
+ if ( _checkConnection( probablePrimaryConn.get(), dummy,
+ false, probablePrimaryIdx ) ) {
+
scoped_lock lk( _lock );
- testConn = _nodes[x].conn;
- }
- if ( _checkConnection( testConn.get() , dummy , false , x ) ) {
- _master = x;
- newMaster = x;
- if ( ! checkAllSecondaries )
- return;
+
+ if ( _checkConnMatch_inlock( probablePrimaryConn.get(),
+ probablePrimaryIdx )) {
+
+ _master = probablePrimaryIdx;
+ newMaster = probablePrimaryIdx;
+
+ if ( ! checkAllSecondaries )
+ return;
+ }
+ else {
+ /*
+ * Somebody modified _nodes and most likely set the
+ * new _master, so try again.
+ */
+ break;
+ }
}
}
}
-
}
if ( newMaster >= 0 )
return;
- sleepsecs(1);
+ sleepsecs( 1 );
}
-
}
void ReplicaSetMonitor::check( bool checkAllSecondaries ) {
- // first see if the current master is fine
- if ( _master >= 0 ) {
+ shared_ptr<DBClientConnection> masterConn;
+
+ {
+ scoped_lock lk( _lock );
+
+ // first see if the current master is fine
+ if ( _master >= 0 ) {
+ masterConn = _nodes[_master].conn;
+ }
+ }
+
+ if ( masterConn.get() != NULL ) {
string temp;
- if ( _checkConnection( _nodes[_master].conn.get() , temp , false , _master ) ) {
+
+ if ( _checkConnection( masterConn.get(), temp, false, _master )) {
if ( ! checkAllSecondaries ) {
// current master is fine, so we're done
return;
@@ -588,21 +674,17 @@ namespace mongo {
}
int ReplicaSetMonitor::_find_inlock( const string& server ) const {
- for ( unsigned i=0; i<_nodes.size(); i++ )
- if ( _nodes[i].addr == server )
- return i;
- return -1;
- }
+ const size_t size = _nodes.size();
-
- int ReplicaSetMonitor::_find( const HostAndPort& server ) const {
- scoped_lock lk( _lock );
- for ( unsigned i=0; i<_nodes.size(); i++ )
- if ( _nodes[i].addr == server )
+ for ( unsigned i = 0; i < size; i++ ) {
+ if ( _nodes[i].addr == server ) {
return i;
+ }
+ }
+
return -1;
}
-
+
void ReplicaSetMonitor::appendInfo( BSONObjBuilder& b ) const {
scoped_lock lk( _lock );
BSONArrayBuilder hosts( b.subarrayStart( "hosts" ) );
@@ -622,6 +704,13 @@ namespace mongo {
b.append( "nextSlave" , _nextSlave );
}
+ bool ReplicaSetMonitor::_checkConnMatch_inlock( DBClientConnection* conn,
+ size_t nodeOffset ) const {
+
+ return ( nodeOffset < _nodes.size() &&
+ conn->getServerAddress() == _nodes[nodeOffset].conn->getServerAddress() );
+ }
+
mongo::mutex ReplicaSetMonitor::_setsLock( "ReplicaSetMonitor" );
map<string,ReplicaSetMonitorPtr> ReplicaSetMonitor::_sets;
@@ -645,6 +734,7 @@ namespace mongo {
// a master is selected. let's just make sure connection didn't die
if ( ! _master->isFailed() )
return _master.get();
+
_monitor->notifyFailure( _masterHost );
}
@@ -687,7 +777,6 @@ namespace mongo {
warning() << "cached auth failed for set: " << _monitor->getName() << " db: " << a.dbname << " user: " << a.username << endl;
}
-
}
DBClientConnection& DBClientReplicaSet::masterConn() {
diff --git a/client/dbclient_rs.h b/client/dbclient_rs.h
index b68af29ac0d..bf91f091822 100644
--- a/client/dbclient_rs.h
+++ b/client/dbclient_rs.h
@@ -107,6 +107,13 @@ namespace mongo {
*/
ReplicaSetMonitor( const string& name , const vector<HostAndPort>& servers );
+ /**
+ * Checks all connections from the host list and sets the current
+ * master.
+ *
+ * @param checkAllSecondaries if set to false, stop immediately when
+ * the master is found or when _master is not -1.
+ */
void _check( bool checkAllSecondaries );
/**
@@ -125,25 +132,50 @@ namespace mongo {
/**
* Updates host list.
- * @param c the connection to check
+ * Invariant: if nodesOffset is >= 0, _nodes[nodesOffset].conn should be
+ * equal to conn.
+ *
+ * @param conn the connection to check
* @param maybePrimary OUT
* @param verbose
* @param nodesOffset - offset into _nodes array, -1 for not in it
- * @return if the connection is good
+ *
+ * @return true if the connection is good or false if invariant
+ * is broken
*/
- bool _checkConnection( DBClientConnection * c , string& maybePrimary , bool verbose , int nodesOffset );
+ bool _checkConnection( DBClientConnection* conn, string& maybePrimary,
+ bool verbose, int nodesOffset );
string _getServerAddress_inlock() const;
NodeDiff _getHostDiff_inlock( const BSONObj& hostList );
bool _shouldChangeHosts( const BSONObj& hostList, bool inlock );
-
+ /**
+ * @return the index to _nodes corresponding to the server address.
+ */
int _find( const string& server ) const ;
int _find_inlock( const string& server ) const ;
- int _find( const HostAndPort& server ) const ;
- mutable mongo::mutex _lock; // protects _nodes
+ /**
+ * Checks whether the given connection matches the connection stored in _nodes.
+ * Mainly used for sanity checking to confirm that nodeOffset still
+ * refers to the right connection after releasing and reacquiring
+ * a mutex.
+ */
+ bool _checkConnMatch_inlock( DBClientConnection* conn, size_t nodeOffset ) const;
+
+ // protects _nodes and indices pointing to it (_master & _nextSlave)
+ mutable mongo::mutex _lock;
+
+ /**
+ * "Synchronizes" the _checkConnection method. Should ideally be one mutex per
+ * connection object being used. The purpose of this lock is to make sure that
+ * the reply from the connection the lock holder got is the actual response
+ * to what it sent.
+ *
+ * Deadlock WARNING: never acquire this while holding _lock
+ */
mutable mongo::mutex _checkConnectionLock;
string _name;