summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--s/shard.cpp73
-rw-r--r--s/shard.h27
-rw-r--r--s/shardconnection.cpp2
-rw-r--r--s/writeback_listener.cpp41
-rw-r--r--s/writeback_listener.h4
5 files changed, 113 insertions, 34 deletions
diff --git a/s/shard.cpp b/s/shard.cpp
index a3d72c4a367..ff3d8c374b0 100644
--- a/s/shard.cpp
+++ b/s/shard.cpp
@@ -86,13 +86,7 @@ namespace mongo {
}
}
-
- bool isMember( const string& addr ) {
- scoped_lock lk( _mutex );
- map<string,Shard>::iterator i = _lookup.find( addr );
- return i != _lookup.end();
- }
-
+
const Shard& find( const string& ident ) {
string mykey = ident;
@@ -128,7 +122,7 @@ namespace mongo {
if ( setAddr )
_lookup[addr] = s;
}
-
+
void remove( const string& name ) {
scoped_lock lk( _mutex );
for ( map<string,Shard>::iterator i = _lookup.begin(); i!=_lookup.end(); ) {
@@ -141,12 +135,12 @@ namespace mongo {
}
}
}
-
- void getAllShards( vector<Shard>& all ) {
+
+ void getAllShards( vector<Shard>& all ) const {
scoped_lock lk( _mutex );
std::set<string> seen;
- for ( map<string,Shard>::iterator i = _lookup.begin(); i!=_lookup.end(); ++i ) {
- Shard s = i->second;
+ for ( map<string,Shard>::const_iterator i = _lookup.begin(); i!=_lookup.end(); ++i ) {
+ const Shard& s = i->second;
if ( s.getName() == "config" )
continue;
if ( seen.count( s.getName() ) )
@@ -155,15 +149,46 @@ namespace mongo {
all.push_back( s );
}
}
+
+ bool isAShardNode( const string& addr ) const {
+ scoped_lock lk( _mutex );
+
+ // check direct nods or set names
+ map<string,Shard>::const_iterator i = _lookup.find( addr );
+ if ( i != _lookup.end() )
+ return true;
+
+ // check for set nodes
+ for ( map<string,Shard>::const_iterator i = _lookup.begin(); i!=_lookup.end(); ++i ) {
+ const Shard& s = i->second;
+ if ( s.containsNode( addr ) )
+ return true;
+ }
+
+ return false;
+ }
private:
map<string,Shard> _lookup;
- mongo::mutex _mutex;
+ mutable mongo::mutex _mutex;
} staticShardInfo;
+ void Shard::_setAddr( const string& addr ) {
+ _addr = addr;
+ if ( _addr.size() ) {
+ _cs = ConnectionString( addr , ConnectionString::SET );
+ if ( _cs.type() == ConnectionString::SET ) {
+ string x = _cs.getSetName();
+ if ( x.size() == 0 )
+ x = _name;
+ _rs = ReplicaSetMonitor::get( x , _cs.getServers() );
+ }
+ }
+ }
+
void Shard::setAddress( const string& addr , bool authoritative ) {
assert( _name.size() );
- _addr = addr;
+ _setAddr( addr );
if ( authoritative )
staticShardInfo.set( _name , _addr , true , false );
}
@@ -172,17 +197,27 @@ namespace mongo {
const Shard& s = staticShardInfo.find( ident );
uassert( 13128 , (string)"can't find shard for: " + ident , s.ok() );
_name = s._name;
- _addr = s._addr;
+ _setAddr( s._addr );
_maxSize = s._maxSize;
_isDraining = s._isDraining;
}
+ bool Shard::containsNode( const string& node ) const {
+ if ( _addr == node )
+ return true;
+
+ if ( _rs && _rs->contains( node ) )
+ return true;
+
+ return false;
+ }
+
void Shard::getAllShards( vector<Shard>& all ) {
staticShardInfo.getAllShards( all );
}
- bool Shard::isAShard( const string& ident ) {
- return staticShardInfo.isMember( ident );
+ bool Shard::isAShardNode( const string& ident ) {
+ return staticShardInfo.isAShardNode( ident );
}
void Shard::printShardInfo( ostream& out ) {
@@ -216,10 +251,6 @@ namespace mongo {
}
- bool Shard::isMember( const string& addr ) {
- return staticShardInfo.isMember( addr );
- }
-
void Shard::removeShard( const string& name ) {
staticShardInfo.remove( name );
}
diff --git a/s/shard.h b/s/shard.h
index 97f0339bb25..5898a6fcca7 100644
--- a/s/shard.h
+++ b/s/shard.h
@@ -39,6 +39,7 @@ namespace mongo {
Shard( const string& name , const string& addr, long long maxSize = 0 , bool isDraining = false )
: _name(name) , _addr( addr ) , _maxSize( maxSize ) , _isDraining( isDraining ) {
+ _setAddr( addr );
}
Shard( const string& ident ) {
@@ -46,11 +47,13 @@ namespace mongo {
}
Shard( const Shard& other )
- : _name( other._name ) , _addr( other._addr ) , _maxSize( other._maxSize ) , _isDraining( other._isDraining ) {
+ : _name( other._name ) , _addr( other._addr ) , _cs( other._cs ) ,
+ _maxSize( other._maxSize ) , _isDraining( other._isDraining ) , _rs( other._rs ) {
}
Shard( const Shard* other )
- : _name( other->_name ) , _addr( other->_addr ), _maxSize( other->_maxSize ) , _isDraining( other->_isDraining ) {
+ : _name( other->_name ) , _addr( other->_addr ), _cs( other->_cs ) ,
+ _maxSize( other->_maxSize ) , _isDraining( other->_isDraining ) , _rs( other->_rs ) {
}
static Shard make( const string& ident ) {
@@ -59,8 +62,6 @@ namespace mongo {
return s;
}
- static bool isAShard( const string& ident );
-
/**
* @param ident either name or address
*/
@@ -131,10 +132,17 @@ namespace mongo {
BSONObj runCommand( const string& db , const BSONObj& cmd ) const ;
ShardStatus getStatus() const ;
+
+ /**
+ * mostly for replica set
+ * retursn true if node is the shard
+ * of if the replica set contains node
+ */
+ bool containsNode( const string& node ) const;
static void getAllShards( vector<Shard>& all );
static void printShardInfo( ostream& out );
-
+
/**
* @parm current - shard where the chunk/database currently lives in
* @return the currently emptiest shard, if best then current, or EMPTY
@@ -145,15 +153,20 @@ namespace mongo {
static void removeShard( const string& name );
- static bool isMember( const string& addr );
+ static bool isAShardNode( const string& ident );
static Shard EMPTY;
-
+
private:
+
+ void _setAddr( const string& addr );
+
string _name;
string _addr;
+ ConnectionString _cs;
long long _maxSize; // in MBytes, 0 is unlimited
bool _isDraining; // shard is currently being removed
+ ReplicaSetMonitorPtr _rs;
};
class ShardStatus {
diff --git a/s/shardconnection.cpp b/s/shardconnection.cpp
index 20f808811e9..78f77bc8962 100644
--- a/s/shardconnection.cpp
+++ b/s/shardconnection.cpp
@@ -129,7 +129,7 @@ namespace mongo {
}
for ( HostMap::iterator i=_hosts.begin(); i!=_hosts.end(); ++i ) {
- if ( ! Shard::isAShard( i->first ) )
+ if ( ! Shard::isAShardNode( i->first ) )
continue;
Status* ss = i->second;
assert( ss );
diff --git a/s/writeback_listener.cpp b/s/writeback_listener.cpp
index eb2a5564e3a..d919d236e82 100644
--- a/s/writeback_listener.cpp
+++ b/s/writeback_listener.cpp
@@ -33,6 +33,7 @@
namespace mongo {
map<string,WriteBackListener*> WriteBackListener::_cache;
+ set<string> WriteBackListener::_seenSets;
mongo::mutex WriteBackListener::_cacheLock("WriteBackListener");
map<ConnectionId,WriteBackListener::WBStatus> WriteBackListener::_seenWritebacks;
@@ -44,11 +45,38 @@ namespace mongo {
/* static */
void WriteBackListener::init( DBClientBase& conn ) {
+
+ if ( conn.type() != ConnectionString::SET ) {
+ init( conn.getServerAddress() );
+ return;
+ }
+
+
+ {
+ scoped_lock lk( _cacheLock );
+ if ( _seenSets.count( conn.getServerAddress() ) )
+ return;
+ }
+
+ // we want to do writebacks on all rs nodes
+ string errmsg;
+ ConnectionString cs = ConnectionString::parse( conn.getServerAddress() , errmsg );
+ uassert( 13641 , str::stream() << "can't parse host [" << conn.getServerAddress() << "]" , cs.isValid() );
+
+ vector<HostAndPort> hosts = cs.getServers();
+
+ for ( unsigned i=0; i<hosts.size(); i++ )
+ init( hosts[i].toString() );
+
+ }
+
+ /* static */
+ void WriteBackListener::init( const string& host ) {
scoped_lock lk( _cacheLock );
- WriteBackListener*& l = _cache[conn.getServerAddress()];
+ WriteBackListener*& l = _cache[host];
if ( l )
return;
- l = new WriteBackListener( conn.getServerAddress() );
+ l = new WriteBackListener( host );
l->go();
}
@@ -79,8 +107,13 @@ namespace mongo {
void WriteBackListener::run() {
int secsToSleep = 0;
- while ( ! inShutdown() && Shard::isMember( _addr ) ) {
-
+ while ( ! inShutdown() ) {
+
+ if ( ! Shard::isAShardNode( _addr ) ) {
+ log(1) << _addr << " is not a shard node" << endl;
+ sleepsecs( 60 );
+ continue;
+ }
try {
ScopedDbConnection conn( _addr );
diff --git a/s/writeback_listener.h b/s/writeback_listener.h
index 5bd08f2a2b4..73359999b13 100644
--- a/s/writeback_listener.h
+++ b/s/writeback_listener.h
@@ -36,6 +36,7 @@ namespace mongo {
class WriteBackListener : public BackgroundJob {
public:
static void init( DBClientBase& conn );
+ static void init( const string& host );
static BSONObj waitFor( ConnectionId connectionId, const OID& oid );
@@ -47,9 +48,10 @@ namespace mongo {
private:
string _addr;
-
+
static mongo::mutex _cacheLock; // protects _cache
static map<string,WriteBackListener*> _cache; // server to listener
+ static set<string> _seenSets; // cache of set urls we've seen - note this is ever expanding for order, case, changes
struct WBStatus {
OID id;