diff options
-rw-r--r-- | s/shard.cpp | 73 | ||||
-rw-r--r-- | s/shard.h | 27 | ||||
-rw-r--r-- | s/shardconnection.cpp | 2 | ||||
-rw-r--r-- | s/writeback_listener.cpp | 41 | ||||
-rw-r--r-- | s/writeback_listener.h | 4 |
5 files changed, 113 insertions, 34 deletions
diff --git a/s/shard.cpp b/s/shard.cpp index a3d72c4a367..ff3d8c374b0 100644 --- a/s/shard.cpp +++ b/s/shard.cpp @@ -86,13 +86,7 @@ namespace mongo { } } - - bool isMember( const string& addr ) { - scoped_lock lk( _mutex ); - map<string,Shard>::iterator i = _lookup.find( addr ); - return i != _lookup.end(); - } - + const Shard& find( const string& ident ) { string mykey = ident; @@ -128,7 +122,7 @@ namespace mongo { if ( setAddr ) _lookup[addr] = s; } - + void remove( const string& name ) { scoped_lock lk( _mutex ); for ( map<string,Shard>::iterator i = _lookup.begin(); i!=_lookup.end(); ) { @@ -141,12 +135,12 @@ namespace mongo { } } } - - void getAllShards( vector<Shard>& all ) { + + void getAllShards( vector<Shard>& all ) const { scoped_lock lk( _mutex ); std::set<string> seen; - for ( map<string,Shard>::iterator i = _lookup.begin(); i!=_lookup.end(); ++i ) { - Shard s = i->second; + for ( map<string,Shard>::const_iterator i = _lookup.begin(); i!=_lookup.end(); ++i ) { + const Shard& s = i->second; if ( s.getName() == "config" ) continue; if ( seen.count( s.getName() ) ) @@ -155,15 +149,46 @@ namespace mongo { all.push_back( s ); } } + + bool isAShardNode( const string& addr ) const { + scoped_lock lk( _mutex ); + + // check direct nods or set names + map<string,Shard>::const_iterator i = _lookup.find( addr ); + if ( i != _lookup.end() ) + return true; + + // check for set nodes + for ( map<string,Shard>::const_iterator i = _lookup.begin(); i!=_lookup.end(); ++i ) { + const Shard& s = i->second; + if ( s.containsNode( addr ) ) + return true; + } + + return false; + } private: map<string,Shard> _lookup; - mongo::mutex _mutex; + mutable mongo::mutex _mutex; } staticShardInfo; + void Shard::_setAddr( const string& addr ) { + _addr = addr; + if ( _addr.size() ) { + _cs = ConnectionString( addr , ConnectionString::SET ); + if ( _cs.type() == ConnectionString::SET ) { + string x = _cs.getSetName(); + if ( x.size() == 0 ) + x = _name; + _rs = ReplicaSetMonitor::get( x , _cs.getServers() ); + } + } + } + void Shard::setAddress( const string& addr , bool authoritative ) { assert( _name.size() ); - _addr = addr; + _setAddr( addr ); if ( authoritative ) staticShardInfo.set( _name , _addr , true , false ); } @@ -172,17 +197,27 @@ namespace mongo { const Shard& s = staticShardInfo.find( ident ); uassert( 13128 , (string)"can't find shard for: " + ident , s.ok() ); _name = s._name; - _addr = s._addr; + _setAddr( s._addr ); _maxSize = s._maxSize; _isDraining = s._isDraining; } + bool Shard::containsNode( const string& node ) const { + if ( _addr == node ) + return true; + + if ( _rs && _rs->contains( node ) ) + return true; + + return false; + } + void Shard::getAllShards( vector<Shard>& all ) { staticShardInfo.getAllShards( all ); } - bool Shard::isAShard( const string& ident ) { - return staticShardInfo.isMember( ident ); + bool Shard::isAShardNode( const string& ident ) { + return staticShardInfo.isAShardNode( ident ); } void Shard::printShardInfo( ostream& out ) { @@ -216,10 +251,6 @@ namespace mongo { } - bool Shard::isMember( const string& addr ) { - return staticShardInfo.isMember( addr ); - } - void Shard::removeShard( const string& name ) { staticShardInfo.remove( name ); } diff --git a/s/shard.h b/s/shard.h index 97f0339bb25..5898a6fcca7 100644 --- a/s/shard.h +++ b/s/shard.h @@ -39,6 +39,7 @@ namespace mongo { Shard( const string& name , const string& addr, long long maxSize = 0 , bool isDraining = false ) : _name(name) , _addr( addr ) , _maxSize( maxSize ) , _isDraining( isDraining ) { + _setAddr( addr ); } Shard( const string& ident ) { @@ -46,11 +47,13 @@ namespace mongo { } Shard( const Shard& other ) - : _name( other._name ) , _addr( other._addr ) , _maxSize( other._maxSize ) , _isDraining( other._isDraining ) { + : _name( other._name ) , _addr( other._addr ) , _cs( other._cs ) , + _maxSize( other._maxSize ) , _isDraining( other._isDraining ) , _rs( other._rs ) { } Shard( const Shard* other ) - : _name( other->_name ) , _addr( other->_addr ), _maxSize( other->_maxSize ) , _isDraining( other->_isDraining ) { + : _name( other->_name ) , _addr( other->_addr ), _cs( other->_cs ) , + _maxSize( other->_maxSize ) , _isDraining( other->_isDraining ) , _rs( other->_rs ) { } static Shard make( const string& ident ) { @@ -59,8 +62,6 @@ namespace mongo { return s; } - static bool isAShard( const string& ident ); - /** * @param ident either name or address */ @@ -131,10 +132,17 @@ namespace mongo { BSONObj runCommand( const string& db , const BSONObj& cmd ) const ; ShardStatus getStatus() const ; + + /** + * mostly for replica set + * retursn true if node is the shard + * of if the replica set contains node + */ + bool containsNode( const string& node ) const; static void getAllShards( vector<Shard>& all ); static void printShardInfo( ostream& out ); - + /** * @parm current - shard where the chunk/database currently lives in * @return the currently emptiest shard, if best then current, or EMPTY @@ -145,15 +153,20 @@ namespace mongo { static void removeShard( const string& name ); - static bool isMember( const string& addr ); + static bool isAShardNode( const string& ident ); static Shard EMPTY; - + private: + + void _setAddr( const string& addr ); + string _name; string _addr; + ConnectionString _cs; long long _maxSize; // in MBytes, 0 is unlimited bool _isDraining; // shard is currently being removed + ReplicaSetMonitorPtr _rs; }; class ShardStatus { diff --git a/s/shardconnection.cpp b/s/shardconnection.cpp index 20f808811e9..78f77bc8962 100644 --- a/s/shardconnection.cpp +++ b/s/shardconnection.cpp @@ -129,7 +129,7 @@ namespace mongo { } for ( HostMap::iterator i=_hosts.begin(); i!=_hosts.end(); ++i ) { - if ( ! Shard::isAShard( i->first ) ) + if ( ! Shard::isAShardNode( i->first ) ) continue; Status* ss = i->second; assert( ss ); diff --git a/s/writeback_listener.cpp b/s/writeback_listener.cpp index eb2a5564e3a..d919d236e82 100644 --- a/s/writeback_listener.cpp +++ b/s/writeback_listener.cpp @@ -33,6 +33,7 @@ namespace mongo { map<string,WriteBackListener*> WriteBackListener::_cache; + set<string> WriteBackListener::_seenSets; mongo::mutex WriteBackListener::_cacheLock("WriteBackListener"); map<ConnectionId,WriteBackListener::WBStatus> WriteBackListener::_seenWritebacks; @@ -44,11 +45,38 @@ namespace mongo { /* static */ void WriteBackListener::init( DBClientBase& conn ) { + + if ( conn.type() != ConnectionString::SET ) { + init( conn.getServerAddress() ); + return; + } + + + { + scoped_lock lk( _cacheLock ); + if ( _seenSets.count( conn.getServerAddress() ) ) + return; + } + + // we want to do writebacks on all rs nodes + string errmsg; + ConnectionString cs = ConnectionString::parse( conn.getServerAddress() , errmsg ); + uassert( 13641 , str::stream() << "can't parse host [" << conn.getServerAddress() << "]" , cs.isValid() ); + + vector<HostAndPort> hosts = cs.getServers(); + + for ( unsigned i=0; i<hosts.size(); i++ ) + init( hosts[i].toString() ); + + } + + /* static */ + void WriteBackListener::init( const string& host ) { scoped_lock lk( _cacheLock ); - WriteBackListener*& l = _cache[conn.getServerAddress()]; + WriteBackListener*& l = _cache[host]; if ( l ) return; - l = new WriteBackListener( conn.getServerAddress() ); + l = new WriteBackListener( host ); l->go(); } @@ -79,8 +107,13 @@ namespace mongo { void WriteBackListener::run() { int secsToSleep = 0; - while ( ! inShutdown() && Shard::isMember( _addr ) ) { - + while ( ! inShutdown() ) { + + if ( ! Shard::isAShardNode( _addr ) ) { + log(1) << _addr << " is not a shard node" << endl; + sleepsecs( 60 ); + continue; + } try { ScopedDbConnection conn( _addr ); diff --git a/s/writeback_listener.h b/s/writeback_listener.h index 5bd08f2a2b4..73359999b13 100644 --- a/s/writeback_listener.h +++ b/s/writeback_listener.h @@ -36,6 +36,7 @@ namespace mongo { class WriteBackListener : public BackgroundJob { public: static void init( DBClientBase& conn ); + static void init( const string& host ); static BSONObj waitFor( ConnectionId connectionId, const OID& oid ); @@ -47,9 +48,10 @@ namespace mongo { private: string _addr; - + static mongo::mutex _cacheLock; // protects _cache static map<string,WriteBackListener*> _cache; // server to listener + static set<string> _seenSets; // cache of set urls we've seen - note this is ever expanding for order, case, changes struct WBStatus { OID id; |