// shard.cpp

/**
*    Copyright (C) 2008 10gen Inc.
*
*    This program is free software: you can redistribute it and/or  modify
*    it under the terms of the GNU Affero General Public License, version 3,
*    as published by the Free Software Foundation.
*
*    This program is distributed in the hope that it will be useful,
*    but WITHOUT ANY WARRANTY; without even the implied warranty of
*    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
*    GNU Affero General Public License for more details.
*
*    You should have received a copy of the GNU Affero General Public License
*    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*
*    As a special exception, the copyright holders give permission to link the
*    code of portions of this program with the OpenSSL library under certain
*    conditions as described in each individual source file and distribute
*    linked combinations including the program with the OpenSSL library. You
*    must comply with the GNU Affero General Public License in all respects
*    for all of the code used other than as permitted herein. If you modify
*    file(s) with this exception, you may extend this exception to your
*    version of the file(s), but you are not obligated to do so. If you do not
*    wish to do so, delete this exception statement from your version. If you
*    delete this exception statement from all source files in the program,
*    then also delete it in the license file.
*/

#include "mongo/pch.h"

#include "mongo/s/shard.h"

// NOTE(review): the targets of the next three #include directives are missing from this
// copy. Text between '<' and '>' appears to have been stripped throughout this file, and
// template argument lists are missing for the same reason. As shown, the file does not
// compile; restore the missing text from upstream.
#include
#include
#include

#include "mongo/client/replica_set_monitor.h"
#include "mongo/client/dbclientcursor.h"
#include "mongo/db/audit.h"
#include "mongo/db/auth/action_set.h"
#include "mongo/db/auth/action_type.h"
#include "mongo/db/auth/authorization_manager_global.h"
#include "mongo/db/auth/privilege.h"
#include "mongo/db/auth/security_key.h"
#include "mongo/db/commands.h"
#include "mongo/db/jsobj.h"
#include "mongo/s/client_info.h"
#include "mongo/s/config.h"
#include "mongo/s/request.h"
#include "mongo/s/type_shard.h"
#include "mongo/s/version_manager.h"

namespace mongo {

    /**
     * Runs "isMaster" against the admin database of 'conn'.
     *
     * If the reply carries both "minWireVersion" and "maxWireVersion", they are recorded on
     * the connection via setWireVersions(). If either field is absent, the connection's
     * wire versions are left untouched and the call still succeeds.
     *
     * @return false, with *errMsg set to a message that includes the raw reply, when the
     *         isMaster command itself fails; true otherwise.
     */
    static bool initWireVersion( DBClientBase* conn, std::string* errMsg ) {
        BSONObj response;
        if ( !conn->runCommand( "admin", BSON("isMaster" << 1), response )) {
            *errMsg = str::stream() << "Failed to determine wire version "
                                    << "for internal connection: " << response;
            return false;
        }

        if ( response.hasField("minWireVersion") && response.hasField("maxWireVersion") ) {
            int minWireVersion = response["minWireVersion"].numberInt();
            int maxWireVersion = response["maxWireVersion"].numberInt();
            conn->setWireVersions( minWireVersion, maxWireVersion );
        }

        return true;
    }

    /**
     * Process-wide cache of shard metadata. The single instance is 'staticShardInfo',
     * defined at the end of this class.
     *
     * _lookup maps several kinds of key to the same ShardPtr:
     *  - the shard name;
     *  - the shard's host/connection string;
     *  - for replica-set connection strings, the set's servers (see _installHost).
     * It may also hold a "config" entry, which reload() preserves.
     *
     * _rsLookup maps a replica set name to its shard.
     *
     * Locking: the methods shown here that touch _lookup hold _mutex. _installHost relies
     * on its callers, reload() and set(), to hold it. _rsMutex is taken only when
     * _installHost() writes _rsLookup and when lookupRSName() reads it.
     *
     * NOTE(review): reload() and remove() modify _rsLookup while holding only _mutex, so a
     * concurrent lookupRSName() (which holds only _rsMutex) may race with them. Confirm.
     */
    class StaticShardInfo {
    public:
        StaticShardInfo() : _mutex("StaticShardInfo"), _rsMutex("RSNameMap") { }

        /**
         * Re-reads every shard document from ShardType::ConfigNS on the config server
         * primary and rebuilds _lookup and _rsLookup from them.
         *
         * Shards that are no longer listed are dropped; an existing "config" entry is kept.
         * The config-server query runs before _mutex is taken, so the lock is not held
         * across that network I/O. massert 13632 fires if the query yields no cursor.
         */
        void reload() {

            // NOTE(review): template arguments are missing in this copy ('list all',
            // 'auto_ptr c', 'list::iterator', 'vector v'); restore from upstream.
            list all;
            {
                // NOTE(review): 30 is presumably a connection/socket timeout -- confirm.
                ScopedDbConnection conn(configServer.getPrimary().getConnString(), 30);
                auto_ptr c = conn->query(ShardType::ConfigNS , Query());
                massert( 13632 , "couldn't get updated shard list from config server" , c.get() );
                int numShards = 0;
                while ( c->more() ) {
                    // getOwned(): keep an independent copy of each document -- presumably so
                    // it stays valid after the cursor/connection is released below.
                    all.push_back( c->next().getOwned() );
                    ++numShards;
                }

                LOG( 1 ) << "found " << numShards << " shards listed on config server(s): "
                         << conn.get()->toString() << endl;

                conn.done();
            }

            scoped_lock lk( _mutex );

            // We use the _lookup table for all shards and for the primary config DB. The config DB info,
            // however, does not come from the ShardNS::shard. So when cleaning the _lookup table we leave
            // the config state intact. The rationale is that this way we could drop shards that
            // were removed without reinitializing the config DB information.

            ShardMap::iterator i = _lookup.find( "config" );
            if ( i != _lookup.end() ) {
                ShardPtr config = i->second;
                _lookup.clear();
                _lookup[ "config" ] = config;
            }
            else {
                _lookup.clear();
            }
            // NOTE(review): cleared while holding _mutex only; lookupRSName() reads
            // _rsLookup while holding only _rsMutex.
            _rsLookup.clear();

            for ( list::iterator i=all.begin(); i!=all.end(); ++i ) {
                BSONObj o = *i;
                string name = o[ ShardType::name() ].String();
                string host = o[ ShardType::host() ].String();

                // Optional fields: maxSize defaults to 0 and draining to false when absent.
                long long maxSize = 0;
                BSONElement maxSizeElem = o[ ShardType::maxSize.name() ];
                if ( ! maxSizeElem.eoo() ) {
                    maxSize = maxSizeElem.numberLong();
                }

                bool isDraining = false;
                BSONElement isDrainingElem = o[ ShardType::draining.name() ];
                if ( ! isDrainingElem.eoo() ) {
                    isDraining = isDrainingElem.Bool();
                }

                ShardPtr s( new Shard( name , host , maxSize , isDraining ) );

                if ( o[ ShardType::tags() ].type() == Array ) {
                    vector v = o[ ShardType::tags() ].Array();
                    // NOTE(review): part of this loop is missing in this copy (from a '<' in
                    // the condition through the next '>'). Presumably it iterated j over v
                    // and called addTag() on s for each element -- restore from upstream.
                    for ( unsigned j=0; jaddTag( v[j].String() ); }
                }

                _lookup[name] = s;
                _installHost( host , s );
            }

        }

        /**
         * Returns the ShardPtr stored in _lookup under 'shardName', or a null ShardPtr if
         * there is none. Never reloads.
         */
        ShardPtr findIfExists( const string& shardName ) {
            scoped_lock lk( _mutex );
            ShardMap::iterator i = _lookup.find( shardName );
            if ( i != _lookup.end() ) return i->second;
            return ShardPtr();
        }

        /**
         * Returns the ShardPtr stored in _lookup under 'ident'. The key may be any _lookup
         * key: shard name, host string, set server, or "config".
         *
         * On a miss, calls reload() and retries once. massert 13129 fires if the key is
         * still unknown after the reload.
         */
        ShardPtr find( const string& ident ) {
            string mykey = ident;

            {
                // Scoped so that _mutex is released before reload(), which acquires it itself.
                scoped_lock lk( _mutex );
                ShardMap::iterator i = _lookup.find( mykey );

                if ( i != _lookup.end() ) return i->second;
            }

            // not in our maps, re-load all
            reload();

            scoped_lock lk( _mutex );
            ShardMap::iterator i = _lookup.find( mykey );
            massert( 13129 , (string)"can't find shard for: " + mykey , i != _lookup.end() );
            return i->second;
        }

        // Lookup shard by replica set name. Returns Shard::EMPTY if the name can't be found.
        // Note: this doesn't refresh the table if the name isn't found, so it's possible that
        // a newly added shard/Replica Set may not be found.
        // Only _rsMutex is held here, not _mutex.
        Shard lookupRSName( const string& name) {
            scoped_lock lk( _rsMutex );
            ShardMap::iterator i = _rsLookup.find( name );

            return (i == _rsLookup.end()) ? Shard::EMPTY : i->second.get();
        }

        // Useful for ensuring our shard data will not be modified while we use it.
        // Returns, by value, a copy of the shard located via find() (so a miss triggers a
        // reload), made while holding _mutex.
        // NOTE(review): find() already massert()s (13129) on an unknown key, so the null
        // check below only matters if a null ShardPtr was ever stored in _lookup.
        Shard findCopy( const string& ident ){
            ShardPtr found = find( ident );
            scoped_lock lk( _mutex );
            massert( 13128 , (string)"can't find shard for: " + ident , found.get() );
            return *found.get();
        }

        /**
         * Caches a fresh copy of 's'.
         *  - If setName is true, the copy is stored under 'name'.
         *  - If setAddr is true, it is also installed under s.getConnString() via
         *    _installHost(), together with its set name and servers.
         */
        void set( const string& name , const Shard& s , bool setName = true , bool setAddr = true ) {
            scoped_lock lk( _mutex );
            ShardPtr ss( new Shard( s ) );
            if ( setName )
                _lookup[name] = ss;
            if ( setAddr )
                _installHost( s.getConnString() , ss );
        }

        /**
         * Maps 'host' to 's' in _lookup.
         *
         * If s's address is a replica-set connection string, also maps its (non-empty) set
         * name in _rsLookup under _rsMutex. It then walks cs.getServers(); presumably each
         * server is also mapped in _lookup, but the loop body is missing in this copy.
         *
         * The caller must hold _mutex; both callers shown here, reload() and set(), do.
         */
        void _installHost( const string& host , const ShardPtr& s ) {
            _lookup[host] = s;

            const ConnectionString& cs = s->getAddress();
            if ( cs.type() == ConnectionString::SET ) {
                if ( cs.getSetName().size() ) {
                    scoped_lock lk( _rsMutex);
                    _rsLookup[ cs.getSetName() ] = s;
                }
                vector servers = cs.getServers();
                // NOTE(review): a span is missing from this copy here (from a '<' in the
                // loop condition through the next '>'). It covers the rest of this loop and of
                // _installHost. It also covers the signature and first loop header of what
                // appears to be remove( const string& name ), which Shard::removeShard calls.
                // The statements below look like that method's body: they erase every
                // _lookup and _rsLookup entry whose shard name equals 'name'.
                // Restore from upstream.
                for ( unsigned i=0; isecond;
                if ( s->getName() == name ) {
                    // erase(i++) advances i before the erased element's iterator is invalidated.
                    _lookup.erase(i++);
                }
                else {
                    ++i;
                }
            }

            // NOTE(review): mutates _rsLookup without taking _rsMutex, while lookupRSName()
            // reads it holding only _rsMutex -- possible data race; confirm.
            for ( ShardMap::iterator i = _rsLookup.begin(); i!=_rsLookup.end(); ) {
                ShardPtr s = i->second;
                if ( s->getName() == name ) {
                    _rsLookup.erase(i++);
                }
                else {
                    ++i;
                }
            }
        }

        /**
         * Appends one ShardPtr per distinct shard name found in _lookup, skipping "config".
         * 'all' is not cleared first.
         */
        void getAllShards( vector& all ) const {
            scoped_lock lk( _mutex );
            std::set seen;
            for ( ShardMap::const_iterator i = _lookup.begin(); i!=_lookup.end(); ++i ) {
                const ShardPtr& s = i->second;
                // Several keys map to the same shard; report each shard name once.
                if ( s->getName() == "config" )
                    continue;
                if ( seen.count( s->getName() ) )
                    continue;
                seen.insert( s->getName() );
                all.push_back( s );
            }
        }

        /**
         * Same selection as the ShardPtr overload, but appends copies of the Shard objects.
         * 'all' is not cleared first.
         */
        void getAllShards( vector& all ) const {
            scoped_lock lk( _mutex );
            std::set seen;
            for ( ShardMap::const_iterator i = _lookup.begin(); i!=_lookup.end(); ++i ) {
                const ShardPtr& s = i->second;
                if ( s->getName() == "config" )
                    continue;
                if ( seen.count( s->getName() ) )
                    continue;
                seen.insert( s->getName() );
                all.push_back( *s );
            }
        }

        /**
         * Returns true in either of two cases:
         *  - 'addr' is any key of _lookup;
         *  - a cached non-"config" shard reports 'addr' via Shard::containsNode(). That call
         *    may consult a ReplicaSetMonitor and runs while _mutex is held.
         */
        bool isAShardNode( const string& addr ) const {
            scoped_lock lk( _mutex );

            // check direct nodes or set names
            ShardMap::const_iterator i = _lookup.find( addr );
            if ( i != _lookup.end() )
                return true;

            // check for set nodes
            for ( ShardMap::const_iterator i = _lookup.begin(); i!=_lookup.end(); ++i ) {
                if ( i->first == "config" )
                    continue;

                if ( i->second->containsNode( addr ) )
                    return true;
            }

            return false;
        }

        /**
         * Appends a "map" subobject to 'result' that maps every _lookup key to its shard's
         * connection string. Always returns true; 'errmsg' is not used.
         */
        bool getShardMap( BSONObjBuilder& result , string& errmsg ) const {
            scoped_lock lk( _mutex );

            // Passed to the builder's constructor as a size hint -- presumably its initial
            // buffer size; confirm.
            BSONObjBuilder b( _lookup.size() + 50 );

            for ( ShardMap::const_iterator i = _lookup.begin(); i!=_lookup.end(); ++i ) {
                b.append( i->first , i->second->getConnString() );
            }

            result.append( "map" , b.obj() );

            return true;
        }

    private:
        // NOTE(review): template arguments are missing in this copy (presumably a
        // string -> ShardPtr map); restore from upstream.
        typedef map ShardMap;
        ShardMap _lookup;
        ShardMap _rsLookup; // Map from ReplSet name to shard
        mutable mongo::mutex _mutex;
        mutable mongo::mutex _rsMutex;
    } staticShardInfo;


    /**
     * Internal, admin-only command "getShardMap" (slaveOk, not a config-server write).
     * It returns staticShardInfo's key -> connection-string map under "map".
     * Requires the getShardMap action on the cluster resource.
     */
    class CmdGetShardMap : public Command {
    public:
        CmdGetShardMap() : Command( "getShardMap" ){}
        virtual void help( stringstream &help ) const { help<<"internal"; }
        virtual bool isWriteCommandForConfigServer() const { return false; }
        virtual bool slaveOk() const { return true; }
        virtual bool adminOnly() const { return true; }
        virtual void addRequiredPrivileges(const std::string& dbname,
                                           const BSONObj& cmdObj,
                                           std::vector* out) {
            ActionSet actions;
            actions.addAction(ActionType::getShardMap);
            out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
        }

        virtual bool run(TransactionExperiment* txn, const string&, mongo::BSONObj&, int, std::string& errmsg , mongo::BSONObjBuilder& result, bool) {
            return staticShardInfo.getShardMap( result , errmsg );
        }
    } cmdGetShardMap;


    /**
     * Returns a copy of the cached shard stored under 'shardName', or Shard::EMPTY.
     * Never reloads.
     */
    Shard Shard::findIfExists( const string& shardName ) {
        ShardPtr shard = staticShardInfo.findIfExists( shardName );
        return shard ? *shard : Shard::EMPTY;
    }

    /**
     * Stores 'addr'. If it is non-empty, also rebuilds _cs from it, passing
     * ConnectionString::SET to the ConnectionString constructor.
     */
    void Shard::_setAddr( const string& addr ) {
        _addr = addr;
        if ( !_addr.empty() ) {
            _cs = ConnectionString( addr , ConnectionString::SET );
        }
    }

    /**
     * Replaces this shard's address with 'cs' and re-caches a copy under the shard's name.
     * Requires a non-empty name (verify).
     *
     * set() is called with setAddr=false, so the new address (and its set name/servers)
     * is not installed as a lookup key here.
     */
    void Shard::setAddress( const ConnectionString& cs) {
        verify( _name.size() );
        _addr = cs.toString();
        _cs = cs;
        staticShardInfo.set( _name , *this , true , false );
    }

    /**
     * Overwrites *this with a copy of the cached shard for 'ident'. This may trigger a
     * reload, and throws (massert) if the key is still unknown afterwards.
     */
    void Shard::reset( const string& ident ) {
        *this = staticShardInfo.findCopy( ident );
    }

    /**
     * Returns true in either of two cases:
     *  - 'node' equals this shard's address string;
     *  - for replica-set shards, the set's ReplicaSetMonitor reports 'node' via contains().
     * Logs a warning and returns false when no monitor is found for the set.
     */
    bool Shard::containsNode( const string& node ) const {
        if ( _addr == node )
            return true;

        if ( _cs.type() == ConnectionString::SET ) {
            ReplicaSetMonitorPtr rs = ReplicaSetMonitor::get( _cs.getSetName(), true );

            if (!rs) {
                // Possibly still yet to be initialized. See SERVER-8194.
                warning() << "Monitor not found for a known shard: " << _cs.getSetName() << endl;
                return false;
            }

            return rs->contains(HostAndPort(node));
        }

        return false;
    }

    /** Forwards to staticShardInfo.getAllShards(); appends to 'all' without clearing it. */
    void Shard::getAllShards( vector& all ) {
        staticShardInfo.getAllShards( all );
    }

    /** Forwards to staticShardInfo.isAShardNode(). */
    bool Shard::isAShardNode( const string& ident ) {
        return staticShardInfo.isAShardNode( ident );
    }

    /** Forwards to staticShardInfo.lookupRSName(); returns Shard::EMPTY when unknown. */
    Shard Shard::lookupRSName( const string& name) {
        return staticShardInfo.lookupRSName(name);
    }

    void Shard::printShardInfo( ostream& out ) {
        vector all;
        staticShardInfo.getAllShards( all );
        // NOTE(review): a span is missing from this copy here (from a '<' in the loop
        // condition through the next '>'). It covers the rest of printShardInfo() and the
        // start of the function whose body follows -- apparently the runCommand( db, cmd )
        // that getStatus() calls -- including where 'conn', 'res' and 'ok' are declared.
        // The surviving tail throws UserException 13136 on failure, after releasing the
        // connection; on success it returns an owned copy of the reply.
        // Restore from upstream.
        for ( unsigned i=0; irunCommand( db , cmd , res );
        if ( ! ok ) {
            stringstream ss;
            ss << "runCommand (" << cmd << ") on shard (" << _name << ") failed : " << res;
            conn.done();
            throw UserException( 13136 , ss.str() );
        }
        // Made owned before the connection is released -- presumably so the reply does not
        // reference memory tied to the connection.
        res = res.getOwned();
        conn.done();
        return res;
    }

    /** Runs "serverStatus" on this shard's admin database and wraps the reply in a ShardStatus. */
    ShardStatus Shard::getStatus() const {
        return ShardStatus( *this , runCommand( "admin" , BSON( "serverStatus" << 1 ) ) );
    }

    /** Forces a full reload of the shard cache from the config server. */
    void Shard::reloadShardInfo() {
        staticShardInfo.reload();
    }


    /** Removes every cache entry belonging to the shard named 'name'. */
    void Shard::removeShard( const string& name ) {
        staticShardInfo.remove( name );
    }

    /**
     * Chooses a shard from the cached list. If the cache is empty it reloads once, and
     * returns EMPTY if there are still no shards. The selection loop is incomplete in this
     * copy.
     */
    Shard Shard::pick( const Shard& current ) {
        vector all;
        staticShardInfo.getAllShards( all );
        if ( all.size() == 0 ) {
            staticShardInfo.reload();
            staticShardInfo.getAllShards( all );
            if ( all.size() == 0 )
                return EMPTY;
        }

        // if current shard was provided, pick a different shard only if it is a better choice
        // NOTE(review): all[0].getStatus() issues a serverStatus round trip even when
        // 'current' is provided, and that result is then immediately overwritten.
        ShardStatus best = all[0].getStatus();
        if ( current != EMPTY ) {
            best = current.getStatus();
        }

        // NOTE(review): a large span is missing from this copy here (from a '<' in the loop
        // condition through the next '>'). It covers:
        //  - the rest of pick();
        //  - any definitions that followed pick();
        //  - the opening of what appears to be ShardingConnectionHook::onCreate(), cf. the
        //    "calling onCreate auth" log message and onDestroy below, presumably taking
        //    DBClientBase* conn;
        //  - the receiver of isAuthEnabled().
        // Restore from upstream.
        //
        // The surviving code proceeds as follows:
        //  1. If auth is enabled, authenticate 'conn' as the internal user (uassert 15847).
        //  2. For sharded connections, either initialize the shard version (uassert 15907)
        //     or, for non-versionable connections, read the wire version via
        //     initWireVersion() (uasserted 17363 on failure).
        //  3. For sharded connections, install the GLE-capturing post-run hook.
        //  4. For every connection, install the audit hook.
        for ( size_t i=0; iisAuthEnabled()) {
            LOG(2) << "calling onCreate auth for " << conn->toString() << endl;

            bool result = authenticateInternalUser(conn);

            uassert( 15847, str::stream() << "can't authenticate to server "
                                          << conn->getServerAddress(),
                     result );
        }

        if ( _shardedConnections ) {
            if ( versionManager.isVersionableCB( conn )) {
                // We must initialize sharding on all connections, so that we get exceptions
                // if sharding is enabled on the collection.
                BSONObj result;
                bool ok = versionManager.initShardVersionCB( conn, result );

                // assert that we actually successfully setup sharding
                uassert( 15907, str::stream() << "could not initialize sharding on connection "
                                              << conn->toString()
                                              << ( result["errmsg"].type() == String ?
                                                   causedBy( result["errmsg"].String() ) :
                                                   causedBy( (string)"unknown failure : " + result.toString() ) ),
                         ok );
            }
            else {
                // Initialize the wire protocol version of the connection to find out if we
                // can send write commands to this connection.
                string errMsg;
                if ( !initWireVersion( conn, &errMsg )) {
                    uasserted( 17363, errMsg );
                }
            }

            // For every DBClient created by mongos, add a hook that will capture the response from
            // commands, so that we can target the correct node when subsequent getLastError calls
            // are made by mongos.
            conn->setPostRunCommandHook(boost::bind(&saveGLEStats, _1, _2));
        }

        // For every DBClient created by mongos, add a hook that will append impersonated users
        // to the end of every runCommand. mongod uses this information to produce auditing
        // records attributed to the proper authenticated user(s).
        conn->setRunCommandHook(boost::bind(&audit::appendImpersonatedUsers, _1));
    }

    /**
     * If sharded connections are enabled and 'conn' is versionable, resets its shard
     * version through versionManager.
     */
    void ShardingConnectionHook::onDestroy( DBClientBase * conn ) {
        if( _shardedConnections && versionManager.isVersionableCB( conn ) ){
            versionManager.resetShardVersionCB( conn );
        }
    }
}