// shard.cpp

/**
 *    Copyright (C) 2008 10gen Inc.
 *
 *    This program is free software: you can redistribute it and/or  modify
 *    it under the terms of the GNU Affero General Public License, version 3,
 *    as published by the Free Software Foundation.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    GNU Affero General Public License for more details.
 *
 *    You should have received a copy of the GNU Affero General Public License
 *    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

// NOTE(review): this listing appears to have lost every "<...>" span: the
// template arguments on shared_ptr / list / auto_ptr / vector / std::set / map
// below, and the header named by the last #include. Three loops are also cut
// off mid-statement (each is flagged where it occurs). The code tokens are
// left exactly as listed; confirm all of these against the repository copy.

#include "pch.h"
#include "shard.h"
#include "config.h"
#include "request.h"
#include "client.h"
#include "../db/commands.h"
// NOTE(review): the target of this include is missing from the listing.
#include

namespace mongo {

    typedef shared_ptr ShardPtr;

    /**
     * Lookup table from string keys to Shard objects, guarded by _mutex.
     * Exactly one instance, staticShardInfo, is declared right after the class;
     * the Shard:: static helpers further down forward to it.
     *
     * One shard can be reachable through several keys: its name, its host
     * string and, for ConnectionString::SET addresses, its set name (see
     * _installHost). The key "config" is treated specially by reload(),
     * getAllShards() and isAShardNode().
     */
    class StaticShardInfo {
    public:
        StaticShardInfo() : _mutex("StaticShardInfo") { }

        /**
         * Re-reads the shard list from the config server and rebuilds _lookup.
         *
         * The query runs before _mutex is taken; the documents are copied with
         * getOwned() into a local list and the connection is released with
         * done() first. Fails with massert 13632 if query() yields no cursor.
         */
        void reload() {

            list all;
            {
                ScopedDbConnection conn( configServer.getPrimary() );
                auto_ptr c = conn->query( ShardNS::shard , Query() );
                massert( 13632 , "couldn't get updated shard list from config server" , c.get() );
                while ( c->more() ) {
                    all.push_back( c->next().getOwned() );
                }
                conn.done();
            }

            scoped_lock lk( _mutex );

            // We use the _lookup table for all shards and for the primary config DB. The config DB info,
            // however, does not come from the ShardNS::shard. So when cleaning the _lookup table we leave
            // the config state intact. The rationale is that this way we could drop shards that
            // were removed without reinitializing the config DB information.

            ShardMap::iterator i = _lookup.find( "config" );
            if ( i != _lookup.end() ) {
                ShardPtr config = i->second;
                _lookup.clear();
                _lookup[ "config" ] = config;
            }
            else {
                _lookup.clear();
            }

            // One Shard per document: "_id" is the name, "host" the address.
            // maxSize stays 0 and isDraining stays false when the corresponding
            // field is absent (eoo).
            for ( list::iterator i=all.begin(); i!=all.end(); ++i ) {
                BSONObj o = *i;
                string name = o["_id"].String();
                string host = o["host"].String();

                long long maxSize = 0;
                BSONElement maxSizeElem = o[ ShardFields::maxSize.name() ];
                if ( ! maxSizeElem.eoo() ) {
                    maxSize = maxSizeElem.numberLong();
                }

                bool isDraining = false;
                BSONElement isDrainingElem = o[ ShardFields::draining.name() ];
                if ( ! isDrainingElem.eoo() ) {
                    isDraining = isDrainingElem.Bool();
                }

                ShardPtr s( new Shard( name , host , maxSize , isDraining ) );
                _lookup[name] = s;
                _installHost( host , s );
            }

        }

        /**
         * Looks up a shard by identifier.
         *
         * Everything from the first '/' onward is dropped from ident before the
         * lookup. On a miss the table is reloaded once and the lookup retried;
         * if the key is still absent, massert 13129 fires.
         */
        ShardPtr find( const string& ident ) {
            string mykey = ident;

            {
                // if it's a replica set, just use set name
                size_t pos = mykey.find( '/' );
                if ( pos != string::npos )
                    mykey = mykey.substr(0,pos);
            }

            {
                // The lock is scoped to this block so that it is released
                // before reload(), which takes _mutex itself.
                scoped_lock lk( _mutex );
                ShardMap::iterator i = _lookup.find( mykey );

                if ( i != _lookup.end() )
                    return i->second;
            }

            // not in our maps, re-load all
            reload();

            scoped_lock lk( _mutex );
            ShardMap::iterator i = _lookup.find( mykey );
            massert( 13129 , (string)"can't find shard for: " + mykey , i != _lookup.end() );
            return i->second;
        }

        // Useful for ensuring our shard data will not be modified while we use it.
        // Returns the Shard by value; the copy is taken while _mutex is held.
        // massert 13128 fires if find() returned an empty pointer.
        Shard findCopy( const string& ident ){
            ShardPtr found = find( ident );
            scoped_lock lk( _mutex );
            massert( 13128 , (string)"can't find shard for: " + ident , found.get() );
            return *found.get();
        }

        /**
         * Stores a newly allocated copy of s in the table.
         *
         * @param name     key used when setName is true
         * @param s        shard to copy
         * @param setName  register the copy under name
         * @param setAddr  register the copy under s.getConnString() and the
         *                 other keys _installHost derives from its address
         */
        void set( const string& name , const Shard& s , bool setName = true , bool setAddr = true ) {
            scoped_lock lk( _mutex );
            ShardPtr ss( new Shard( s ) );
            if ( setName )
                _lookup[name] = ss;
            if ( setAddr )
                _installHost( s.getConnString() , ss );
        }

        /**
         * Registers s under host and, when its address is of type
         * ConnectionString::SET, also under the set name if that is non-empty.
         *
         * Does not take _mutex itself; both callers visible here (reload() and
         * set()) already hold it.
         */
        void _installHost( const string& host , const ShardPtr& s ) {
            _lookup[host] = s;

            const ConnectionString& cs = s->getAddress();
            if ( cs.type() == ConnectionString::SET ) {
                if ( cs.getSetName().size() )
                    _lookup[ cs.getSetName() ] = s;

                vector servers = cs.getServers();
                // NOTE(review): the listing is truncated inside the next statement.
                // The rest of this loop, the end of _installHost and the opening
                // of the following member are missing. The surviving tail walks
                // _lookup and erases every entry whose shard name equals `name`;
                // it is presumably the remove( name ) member that
                // Shard::removeShard() calls - TODO confirm.
                for ( unsigned i=0; isecond;
                if ( s->getName() == name ) {
                    // Post-increment: the iterator is advanced before erase()
                    // runs on the old position.
                    _lookup.erase(i++);
                }
                else {
                    ++i;
                }
            }
        }

        /**
         * Appends each distinct shard in the table to all, one entry per shard
         * name, skipping shards named "config". This overload pushes the
         * ShardPtr itself.
         */
        void getAllShards( vector& all ) const {
            scoped_lock lk( _mutex );
            std::set seen;
            for ( ShardMap::const_iterator i = _lookup.begin(); i!=_lookup.end(); ++i ) {
                const ShardPtr& s = i->second;
                if ( s->getName() == "config" )
                    continue;
                // The same shard is stored under several keys; report it once.
                if ( seen.count( s->getName() ) )
                    continue;
                seen.insert( s->getName() );
                all.push_back( s );
            }
        }

        /**
         * Same traversal as the overload above, but pushes *s (the pointee)
         * instead of the pointer.
         * NOTE(review): the two parameter lists look identical only because the
         * element types were lost from this listing.
         */
        void getAllShards( vector& all ) const {
            scoped_lock lk( _mutex );
            std::set seen;
            for ( ShardMap::const_iterator i = _lookup.begin(); i!=_lookup.end(); ++i ) {
                const ShardPtr& s = i->second;
                if ( s->getName() == "config" )
                    continue;
                if ( seen.count( s->getName() ) )
                    continue;
                seen.insert( s->getName() );
                all.push_back( *s );
            }
        }

        /**
         * Returns true if addr is a key of the table, or if any entry other
         * than the one keyed "config" reports containsNode( addr ).
         */
        bool isAShardNode( const string& addr ) const {
            scoped_lock lk( _mutex );

            // check direct nodes or set names
            ShardMap::const_iterator i = _lookup.find( addr );
            if ( i != _lookup.end() )
                return true;

            // check for set nodes
            for ( ShardMap::const_iterator i = _lookup.begin(); i!=_lookup.end(); ++i ) {
                if ( i->first == "config" )
                    continue;

                if ( i->second->containsNode( addr ) )
                    return true;
            }

            return false;
        }

        /**
         * Appends a sub-object "map" to result, holding one field per table key
         * whose value is that shard's getConnString().
         * errmsg is not written and the function always returns true.
         */
        bool getShardMap( BSONObjBuilder& result , string& errmsg ) const {
            scoped_lock lk( _mutex );

            BSONObjBuilder b( _lookup.size() + 50 );

            for ( ShardMap::const_iterator i = _lookup.begin(); i!=_lookup.end(); ++i ) {
                b.append( i->first , i->second->getConnString() );
            }

            result.append( "map" , b.obj() );

            return true;
        }

    private:
        typedef map ShardMap;
        ShardMap _lookup;
        // mutable so that the const accessors above can lock it
        mutable mongo::mutex _mutex;
    } staticShardInfo;


    /**
     * Command "getShardMap": returns the contents of staticShardInfo's table.
     * Declared admin-only, allowed on slaves, and taking no lock.
     */
    class CmdGetShardMap : public Command {
    public:
        CmdGetShardMap() : Command( "getShardMap" ){}
        virtual void help( stringstream &help ) const { help<<"internal"; }
        virtual LockType locktype() const { return NONE; }
        virtual bool slaveOk() const { return true; }
        virtual bool adminOnly() const { return true; }

        // Only errmsg and result are used; the other arguments are ignored.
        virtual bool run(const string&, mongo::BSONObj&, int, std::string& errmsg , mongo::BSONObjBuilder& result, bool) {
            return staticShardInfo.getShardMap( result , errmsg );
        }
    } cmdGetShardMap;


    /**
     * Stores addr; when it is non-empty, also rebuilds _cs from it (passing
     * ConnectionString::SET) and calls _rsInit().
     */
    void Shard::_setAddr( const string& addr ) {
        _addr = addr;
        if ( _addr.size() ) {
            _cs = ConnectionString( addr , ConnectionString::SET );
            _rsInit();
        }
    }

    /**
     * For addresses of type ConnectionString::SET, fetches the
     * ReplicaSetMonitor for the set into _rs. massert 14807 fires if the set
     * name is empty. Does nothing for other address types.
     */
    void Shard::_rsInit() {
        if ( _cs.type() == ConnectionString::SET ) {
            string x = _cs.getSetName();
            massert( 14807 , str::stream() << "no set name for shard: " << _name << " " << _cs.toString() , x.size() );
            _rs = ReplicaSetMonitor::get( x , _cs.getServers() );
        }
    }

    /**
     * Replaces this shard's address with cs and publishes a copy of the shard
     * to staticShardInfo under _name only (setName = true, setAddr = false).
     * Asserts that _name is non-empty.
     */
    void Shard::setAddress( const ConnectionString& cs) {
        assert( _name.size() );
        _addr = cs.toString();
        _cs = cs;
        _rsInit();
        staticShardInfo.set( _name , *this , true , false );
    }

    /**
     * Overwrites this object with a copy of the shard registered for ident,
     * then clears _rs and runs _rsInit() again.
     */
    void Shard::reset( const string& ident ) {
        *this = staticShardInfo.findCopy( ident );
        _rs.reset();
        _rsInit();
    }

    /**
     * Returns true if node equals this shard's address string, or if _rs is
     * set and reports that it contains node.
     */
    bool Shard::containsNode( const string& node ) const {
        if ( _addr == node )
            return true;

        if ( _rs && _rs->contains( node ) )
            return true;

        return false;
    }

    // Forwards to staticShardInfo.getAllShards().
    void Shard::getAllShards( vector& all ) {
        staticShardInfo.getAllShards( all );
    }

    // Forwards to staticShardInfo.isAShardNode().
    bool Shard::isAShardNode( const string& ident ) {
        return staticShardInfo.isAShardNode( ident );
    }

    void Shard::printShardInfo( ostream& out ) {
        vector all;
        staticShardInfo.getAllShards( all );
        // NOTE(review): the listing is truncated inside the next statement.
        // The rest of printShardInfo and the opening of the following function
        // are missing. The surviving tail runs a command through `conn`, throws
        // UserException 13136 (with the command, shard name and response) if
        // runCommand returns false, and otherwise returns res after getOwned()
        // and conn.done(). It is presumably the body of the
        // runCommand( db , cmd ) member called from getStatus() - TODO confirm.
        for ( unsigned i=0; irunCommand( db , cmd , res );
        if ( ! ok ) {
            stringstream ss;
            ss << "runCommand (" << cmd << ") on shard (" << _name << ") failed : " << res;
            throw UserException( 13136 , ss.str() );
        }
        res = res.getOwned();
        conn.done();
        return res;
    }

    // Builds a ShardStatus from this shard and the result of running
    // { serverStatus : 1 } against its "admin" database.
    ShardStatus Shard::getStatus() const {
        return ShardStatus( *this , runCommand( "admin" , BSON( "serverStatus" << 1 ) ) );
    }

    // Forwards to staticShardInfo.reload().
    void Shard::reloadShardInfo() {
        staticShardInfo.reload();
    }


    // Forwards to staticShardInfo.remove().
    void Shard::removeShard( const string& name ) {
        staticShardInfo.remove( name );
    }

    /**
     * Chooses a shard from the known shards.
     *
     * If the table is empty it is reloaded once; EMPTY is returned if it is
     * still empty. The starting candidate is the status of the first shard, or
     * of current when current != EMPTY.
     */
    Shard Shard::pick( const Shard& current ) {
        vector all;
        staticShardInfo.getAllShards( all );
        if ( all.size() == 0 ) {
            staticShardInfo.reload();
            staticShardInfo.getAllShards( all );
            if ( all.size() == 0 )
                return EMPTY;
        }

        // if current shard was provided, pick a different shard only if it is a better choice
        ShardStatus best = all[0].getStatus();
        if ( current != EMPTY ) {
            best = current.getStatus();
        }

        // NOTE(review): the listing is truncated inside the next statement.
        // The rest of pick() and the opening of the following function(s) are
        // missing. The surviving tail authenticates `conn` against "local"
        // (uassert 15847) and then, for versionable connections when
        // _shardedConnections is set, initializes the shard version (uassert
        // 15907). It is presumably the connection-creation hook that pairs with
        // ShardingConnectionHook::onDestroy below - TODO confirm.
        for ( size_t i=0; itoString() << endl;
            uassert( 15847, "can't authenticate to shard server",
                     conn->auth("local", internalSecurity.user, internalSecurity.pwd, err, false));
        }

        if ( _shardedConnections && isVersionableCB( conn ) ) {

            // We must initialize sharding on all connections, so that we get exceptions if sharding is enabled on
            // the collection.
            BSONObj result;
            bool ok = initShardVersionCB( *conn, result );

            // assert that we actually successfully setup sharding
            // The message uses result["errmsg"] when it is a String, otherwise
            // the whole result document.
            uassert( 15907, str::stream() << "could not initialize sharding on connection " << (*conn).toString() <<
                     ( result["errmsg"].type() == String ? causedBy( result["errmsg"].String() ) :
                       causedBy( (string)"unknown failure : " + result.toString() ) ), ok );

        }
    }

    /**
     * Calls resetShardVersionCB( conn ) when _shardedConnections is set and
     * the connection is versionable; otherwise does nothing.
     */
    void ShardingConnectionHook::onDestroy( DBClientBase * conn ) {

        if( _shardedConnections && isVersionableCB( conn ) ){
            resetShardVersionCB( conn );
        }

    }

}