// grid.cpp
/**
* Copyright (C) 2010 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#include "mongo/pch.h"
#include "mongo/s/grid.h"
#include "pcrecpp.h"
#include
#include "mongo/client/connpool.h"
#include "mongo/db/json.h"
#include "mongo/db/namespace_string.h"
#include "mongo/s/grid.h"
#include "mongo/s/mongos_options.h"
#include "mongo/s/shard.h"
#include "mongo/s/type_collection.h"
#include "mongo/s/type_database.h"
#include "mongo/s/type_settings.h"
#include "mongo/s/type_shard.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/startup_test.h"
#include "mongo/util/stringutils.h"
namespace mongo {
MONGO_FP_DECLARE(neverBalance);
DBConfigPtr Grid::getDBConfig( const StringData& ns , bool create , const string& shardNameHint ) {
string database = nsToDatabase( ns );
if ( database == "config" )
return configServerPtr;
uassert( 15918,
str::stream() << "invalid database name: " << database,
NamespaceString::validDBName( database ) );
scoped_lock l( _lock );
DBConfigPtr& dbConfig = _databases[database];
if( ! dbConfig ){
dbConfig.reset(new DBConfig( database ));
// Protect initial load from connectivity errors
bool loaded = false;
try {
loaded = dbConfig->load();
}
catch( DBException& e ){
e.addContext( "error loading initial database config information" );
warning() << e.what() << endl;
dbConfig.reset();
throw;
}
if( ! loaded ){
if( create ){
// Protect creation of initial db doc from connectivity errors
try{
// note here that cc->primary == 0.
log() << "couldn't find database [" << database << "] in config db" << endl;
{
// lets check case
ScopedDbConnection conn(configServer.modelServer(), 30);
BSONObjBuilder b;
b.appendRegex( "_id" , (string)"^" +
pcrecpp::RE::QuoteMeta( database ) + "$" , "i" );
BSONObj dbObj = conn->findOne( DatabaseType::ConfigNS , b.obj() );
conn.done();
// If our name is exactly the same as the name we want, try loading
// the database again.
if (!dbObj.isEmpty() &&
dbObj[DatabaseType::name()].String() == database)
{
if (dbConfig->load()) return dbConfig;
}
// TODO: This really shouldn't fall through, but without metadata
// management there's no good way to make sure this works all the time
// when the database is getting rapidly created and dropped.
// For now, just do exactly what we used to do.
if ( ! dbObj.isEmpty() ) {
uasserted( DatabaseDifferCaseCode, str::stream()
<< "can't have 2 databases that just differ on case "
<< " have: " << dbObj[DatabaseType::name()].String()
<< " want to add: " << database );
}
}
Shard primary;
if ( database == "admin" ) {
primary = configServer.getPrimary();
}
else if ( shardNameHint.empty() ) {
primary = Shard::pick();
}
else {
// use the shard name if provided
Shard shard;
shard.reset( shardNameHint );
primary = shard;
}
if ( primary.ok() ) {
dbConfig->setPrimary( primary.getName() ); // saves 'cc' to configDB
log() << "\t put [" << database << "] on: " << primary << endl;
}
else {
uasserted( 10185 , "can't find a shard to put new db on" );
}
}
catch( DBException& e ){
e.addContext( "error creating initial database config information" );
warning() << e.what() << endl;
dbConfig.reset();
throw;
}
}
else {
dbConfig.reset();
}
}
}
return dbConfig;
}
void Grid::removeDB( const std::string& database ) {
uassert( 10186 , "removeDB expects db name" , database.find( '.' ) == string::npos );
scoped_lock l( _lock );
_databases.erase( database );
}
void Grid::removeDBIfExists( const DBConfig& database ) {
scoped_lock l( _lock );
map::iterator it = _databases.find( database.getName() );
if( it != _databases.end() && it->second.get() == &database ){
_databases.erase( it );
log() << "erased database " << database.getName() << " from local registry" << endl;
}
else{
log() << database.getName() << "already erased from local registry" << endl;
}
}
bool Grid::allowLocalHost() const {
return _allowLocalShard;
}
void Grid::setAllowLocalHost( bool allow ) {
_allowLocalShard = allow;
}
bool Grid::addShard( string* name , const ConnectionString& servers , long long maxSize , string& errMsg ) {
// name can be NULL, so provide a dummy one here to avoid testing it elsewhere
string nameInternal;
if ( ! name ) {
name = &nameInternal;
}
ReplicaSetMonitorPtr rsMonitor;
// Check whether the host (or set) exists and run several sanity checks on this request.
// There are two set of sanity checks: making sure adding this particular shard is consistent
// with the replica set state (if it exists) and making sure this shards databases can be
// brought into the grid without conflict.
vector dbNames;
try {
ScopedDbConnection newShardConn(servers.toString());
newShardConn->getLastError();
if ( newShardConn->type() == ConnectionString::SYNC ) {
newShardConn.done();
errMsg = "can't use sync cluster as a shard. for replica set, have to use /,,...";
return false;
}
BSONObj resIsMongos;
bool ok = newShardConn->runCommand( "admin" , BSON( "isdbgrid" << 1 ) , resIsMongos );
// should return ok=0, cmd not found if it's a normal mongod
if ( ok ) {
errMsg = "can't add a mongos process as a shard";
newShardConn.done();
return false;
}
BSONObj resIsMaster;
ok = newShardConn->runCommand( "admin" , BSON( "isMaster" << 1 ) , resIsMaster );
if ( !ok ) {
ostringstream ss;
ss << "failed running isMaster: " << resIsMaster;
errMsg = ss.str();
newShardConn.done();
return false;
}
// if the shard has only one host, make sure it is not part of a replica set
string setName = resIsMaster["setName"].str();
string commandSetName = servers.getSetName();
if ( commandSetName.empty() && ! setName.empty() ) {
ostringstream ss;
ss << "host is part of set " << setName << ", use replica set url format /,,....";
errMsg = ss.str();
newShardConn.done();
return false;
}
if ( !commandSetName.empty() && setName.empty() ) {
ostringstream ss;
ss << "host did not return a set name, is the replica set still initializing? " << resIsMaster;
errMsg = ss.str();
newShardConn.done();
return false;
}
// if the shard is part of replica set, make sure it is the right one
if ( ! commandSetName.empty() && ( commandSetName != setName ) ) {
ostringstream ss;
ss << "host is part of a different set: " << setName;
errMsg = ss.str();
newShardConn.done();
return false;
}
if( setName.empty() ) {
// check this isn't a --configsvr
BSONObj res;
bool ok = newShardConn->runCommand("admin",BSON("replSetGetStatus"<<1),res);
ostringstream ss;
if( !ok && res["info"].type() == String && res["info"].String() == "configsvr" ) {
errMsg = "the specified mongod is a --configsvr and should thus not be a shard server";
newShardConn.done();
return false;
}
}
// if the shard is part of a replica set, make sure all the hosts mentioned in 'servers' are part of
// the set. It is fine if not all members of the set are present in 'servers'.
bool foundAll = true;
string offendingHost;
if ( ! commandSetName.empty() ) {
set hostSet;
BSONObjIterator iter( resIsMaster["hosts"].Obj() );
while ( iter.more() ) {
hostSet.insert( iter.next().String() ); // host:port
}
if ( resIsMaster["passives"].isABSONObj() ) {
BSONObjIterator piter( resIsMaster["passives"].Obj() );
while ( piter.more() ) {
hostSet.insert( piter.next().String() ); // host:port
}
}
if ( resIsMaster["arbiters"].isABSONObj() ) {
BSONObjIterator piter( resIsMaster["arbiters"].Obj() );
while ( piter.more() ) {
hostSet.insert( piter.next().String() ); // host:port
}
}
vector hosts = servers.getServers();
for ( size_t i = 0 ; i < hosts.size() ; i++ ) {
if (!hosts[i].hasPort()) {
hosts[i].setPort(ServerGlobalParams::DefaultDBPort);
}
string host = hosts[i].toString(); // host:port
if ( hostSet.find( host ) == hostSet.end() ) {
offendingHost = host;
foundAll = false;
break;
}
}
}
if ( ! foundAll ) {
ostringstream ss;
ss << "in seed list " << servers.toString() << ", host " << offendingHost
<< " does not belong to replica set " << setName;
errMsg = ss.str();
newShardConn.done();
return false;
}
// shard name defaults to the name of the replica set
if ( name->empty() && ! setName.empty() )
*name = setName;
// In order to be accepted as a new shard, that mongod must not have any database name that exists already
// in any other shards. If that test passes, the new shard's databases are going to be entered as
// non-sharded db's whose primary is the newly added shard.
BSONObj resListDB;
ok = newShardConn->runCommand( "admin" , BSON( "listDatabases" << 1 ) , resListDB );
if ( !ok ) {
ostringstream ss;
ss << "failed listing " << servers.toString() << "'s databases:" << resListDB;
errMsg = ss.str();
newShardConn.done();
return false;
}
BSONObjIterator i( resListDB["databases"].Obj() );
while ( i.more() ) {
BSONObj dbEntry = i.next().Obj();
const string& dbName = dbEntry["name"].String();
if ( _isSpecialLocalDB( dbName ) ) {
// 'local', 'admin', and 'config' are system DBs and should be excluded here
continue;
}
else {
dbNames.push_back( dbName );
}
}
if ( newShardConn->type() == ConnectionString::SET )
rsMonitor = ReplicaSetMonitor::get( setName );
newShardConn.done();
}
catch ( DBException& e ) {
if ( servers.type() == ConnectionString::SET ) {
ReplicaSetMonitor::remove( servers.getSetName() );
}
ostringstream ss;
ss << "couldn't connect to new shard ";
ss << e.what();
errMsg = ss.str();
return false;
}
// check that none of the existing shard candidate's db's exist elsewhere
for ( vector::const_iterator it = dbNames.begin(); it != dbNames.end(); ++it ) {
DBConfigPtr config = getDBConfig( *it , false );
if ( config.get() != NULL ) {
ostringstream ss;
ss << "can't add shard " << servers.toString() << " because a local database '" << *it;
ss << "' exists in another " << config->getPrimary().toString();
errMsg = ss.str();
return false;
}
}
// if a name for a shard wasn't provided, pick one.
if ( name->empty() && ! _getNewShardName( name ) ) {
errMsg = "error generating new shard name";
return false;
}
// build the ConfigDB shard document
BSONObjBuilder b;
b.append(ShardType::name(), *name);
b.append(ShardType::host(),
rsMonitor ? rsMonitor->getServerAddress() : servers.toString());
if (maxSize > 0) {
b.append(ShardType::maxSize(), maxSize);
}
BSONObj shardDoc = b.obj();
{
ScopedDbConnection conn(configServer.getPrimary().getConnString(), 30);
// check whether the set of hosts (or single host) is not an already a known shard
BSONObj old = conn->findOne(ShardType::ConfigNS,
BSON(ShardType::host(servers.toString())));
if ( ! old.isEmpty() ) {
errMsg = "host already used";
conn.done();
return false;
}
log() << "going to add shard: " << shardDoc << endl;
conn->insert(ShardType::ConfigNS , shardDoc);
errMsg = conn->getLastError();
if ( ! errMsg.empty() ) {
log() << "error adding shard: " << shardDoc << " err: " << errMsg << endl;
conn.done();
return false;
}
conn.done();
}
Shard::reloadShardInfo();
// add all databases of the new shard
for ( vector::const_iterator it = dbNames.begin(); it != dbNames.end(); ++it ) {
DBConfigPtr config = getDBConfig( *it , true , *name );
if ( ! config ) {
log() << "adding shard " << servers << " even though could not add database " << *it << endl;
}
}
// Record in changelog
BSONObjBuilder shardDetails;
shardDetails.append("name", *name);
shardDetails.append("host", servers.toString());
configServer.logChange("addShard", "", shardDetails.obj());
return true;
}
bool Grid::knowAboutShard( const string& name ) const {
ScopedDbConnection conn(configServer.getPrimary().getConnString(), 30);
BSONObj shard = conn->findOne(ShardType::ConfigNS, BSON(ShardType::host(name)));
conn.done();
return ! shard.isEmpty();
}
bool Grid::_getNewShardName( string* name ) const {
DEV verify( name );
bool ok = false;
int count = 0;
ScopedDbConnection conn(configServer.getPrimary().getConnString(), 30);
BSONObj o = conn->findOne(ShardType::ConfigNS,
Query(fromjson("{" + ShardType::name() + ": /^shard/}"))
.sort(BSON(ShardType::name() << -1 )));
if ( ! o.isEmpty() ) {
string last = o[ShardType::name()].String();
istringstream is( last.substr( 5 ) );
is >> count;
count++;
}
if (count < 9999) {
stringstream ss;
ss << "shard" << setfill('0') << setw(4) << count;
*name = ss.str();
ok = true;
}
conn.done();
return ok;
}
/*
* Returns whether balancing is enabled, with optional namespace "ns" parameter for balancing on a particular
* collection.
*/
bool Grid::shouldBalance( const string& ns, BSONObj* balancerDocOut ) const {
// Allow disabling the balancer for testing
if ( MONGO_FAIL_POINT(neverBalance) ) return false;
ScopedDbConnection conn(configServer.getPrimary().getConnString(), 30);
BSONObj balancerDoc;
BSONObj collDoc;
try {
// look for the stop balancer marker
balancerDoc = conn->findOne( SettingsType::ConfigNS,
BSON( SettingsType::key("balancer") ) );
if( ns.size() > 0 ) collDoc = conn->findOne(CollectionType::ConfigNS,
BSON( CollectionType::ns(ns)));
conn.done();
}
catch( DBException& e ){
conn.kill();
warning() << "could not determine whether balancer should be running, error getting"
"config data from " << conn.getHost() << causedBy( e ) << endl;
// if anything goes wrong, we shouldn't try balancing
return false;
}
if ( balancerDocOut )
*balancerDocOut = balancerDoc;
boost::posix_time::ptime now = boost::posix_time::second_clock::local_time();
if ( _balancerStopped( balancerDoc ) || ! _inBalancingWindow( balancerDoc , now ) ) {
return false;
}
if( collDoc["noBalance"].trueValue() ) return false;
return true;
}
bool Grid::_balancerStopped( const BSONObj& balancerDoc ) {
// check the 'stopped' marker maker
// if present, it is a simple bool
BSONElement stoppedElem = balancerDoc[SettingsType::balancerStopped()];
return stoppedElem.trueValue();
}
bool Grid::_inBalancingWindow( const BSONObj& balancerDoc , const boost::posix_time::ptime& now ) {
// check the 'activeWindow' marker
// if present, it is an interval during the day when the balancer should be active
// { start: "08:00" , stop: "19:30" }, strftime format is %H:%M
BSONElement windowElem = balancerDoc[SettingsType::balancerActiveWindow()];
if ( windowElem.eoo() ) {
return true;
}
// check if both 'start' and 'stop' are present
if ( ! windowElem.isABSONObj() ) {
warning() << "'activeWindow' format is { start: \"hh:mm\" , stop: ... }" << balancerDoc << endl;
return true;
}
BSONObj intervalDoc = windowElem.Obj();
const string start = intervalDoc["start"].str();
const string stop = intervalDoc["stop"].str();
if ( start.empty() || stop.empty() ) {
warning() << "must specify both start and end of balancing window: " << intervalDoc << endl;
return true;
}
// check that both 'start' and 'stop' are valid time-of-day
boost::posix_time::ptime startTime, stopTime;
if ( ! toPointInTime( start , &startTime ) || ! toPointInTime( stop , &stopTime ) ) {
warning() << "cannot parse active window (use hh:mm 24hs format): " << intervalDoc << endl;
return true;
}
LOG(1).stream() << "_inBalancingWindow: "
<< " now: " << now
<< " startTime: " << startTime
<< " stopTime: " << stopTime;
// allow balancing if during the activeWindow
// note that a window may be open during the night
if ( stopTime > startTime ) {
if ( ( now >= startTime ) && ( now <= stopTime ) ) {
return true;
}
}
else if ( startTime > stopTime ) {
if ( ( now >=startTime ) || ( now <= stopTime ) ) {
return true;
}
}
return false;
}
unsigned long long Grid::getNextOpTime() const {
ScopedDbConnection conn(configServer.getPrimary().getConnString(), 30);
BSONObj result;
massert( 10421,
"getoptime failed",
conn->simpleCommand( "admin" , &result , "getoptime" ) );
conn.done();
return result["optime"]._numberLong();
}
bool Grid::_isSpecialLocalDB( const string& dbName ) {
return ( dbName == "local" ) || ( dbName == "admin" ) || ( dbName == "config" );
}
void Grid::flushConfig() {
scoped_lock lk( _lock );
_databases.clear();
}
BSONObj Grid::getConfigSetting( const std::string& name ) const {
ScopedDbConnection conn(configServer.getPrimary().getConnString(), 30);
BSONObj result = conn->findOne( SettingsType::ConfigNS,
BSON( SettingsType::key(name) ) );
conn.done();
return result;
}
Grid grid;
// unit tests
class BalancingWindowUnitTest : public StartupTest {
public:
void run() {
if (!isMongos())
return;
// T0 < T1 < now < T2 < T3 and Error
const string T0 = "9:00";
const string T1 = "11:00";
boost::posix_time::ptime now( currentDate(), boost::posix_time::hours( 13 ) + boost::posix_time::minutes( 48 ) );
const string T2 = "17:00";
const string T3 = "21:30";
const string E = "28:35";
// closed in the past
BSONObj w1 = BSON( SettingsType::balancerActiveWindow( BSON( "start" << T0 <<
"stop" << T1 ) ) );
// not opened until the future
BSONObj w2 = BSON( SettingsType::balancerActiveWindow( BSON( "start" << T2 <<
"stop" << T3 ) ) );
// open now
BSONObj w3 = BSON( SettingsType::balancerActiveWindow( BSON( "start" << T1 <<
"stop" << T2 ) ) );
// open since last day
BSONObj w4 = BSON( SettingsType::balancerActiveWindow( BSON( "start" << T3 <<
"stop" << T2 ) ) );
verify( ! Grid::_inBalancingWindow( w1 , now ) );
verify( ! Grid::_inBalancingWindow( w2 , now ) );
verify( Grid::_inBalancingWindow( w3 , now ) );
verify( Grid::_inBalancingWindow( w4 , now ) );
// bad input should not stop the balancer
// empty window
BSONObj w5;
// missing stop
BSONObj w6 = BSON( SettingsType::balancerActiveWindow( BSON( "start" << 1 ) ) );
// missing start
BSONObj w7 = BSON( SettingsType::balancerActiveWindow( BSON( "stop" << 1 ) ) );
// active window marker missing
BSONObj w8 = BSON( "wrongMarker" << 1 << "start" << 1 << "stop" << 1 );
// garbage in window
BSONObj w9 = BSON( SettingsType::balancerActiveWindow( BSON( "start" << T3 <<
"stop" << E ) ) );
verify( Grid::_inBalancingWindow( w5 , now ) );
verify( Grid::_inBalancingWindow( w6 , now ) );
verify( Grid::_inBalancingWindow( w7 , now ) );
verify( Grid::_inBalancingWindow( w8 , now ) );
verify( Grid::_inBalancingWindow( w9 , now ) );
LOG(1) << "BalancingWidowObjTest passed" << endl;
}
} BalancingWindowObjTest;
}