// @file chunk.cpp
/**
* Copyright (C) 2008-2012 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#include "mongo/pch.h"
#include "mongo/s/chunk.h"
#include "mongo/client/connpool.h"
#include "mongo/client/dbclientcursor.h"
#include "mongo/db/query/lite_parsed_query.h"
#include "mongo/db/queryutil.h"
#include "mongo/platform/random.h"
#include "mongo/s/chunk_diff.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/client_info.h"
#include "mongo/s/config.h"
#include "mongo/s/config_server_checker_service.h"
#include "mongo/s/cursors.h"
#include "mongo/s/grid.h"
#include "mongo/s/strategy.h"
#include "mongo/s/type_collection.h"
#include "mongo/s/type_settings.h"
#include "mongo/util/concurrency/ticketholder.h"
#include "mongo/util/startup_test.h"
#include "mongo/util/timer.h"
namespace mongo {
inline bool allOfType(BSONType type, const BSONObj& o) {
BSONObjIterator it(o);
while(it.more()) {
if (it.next().type() != type)
return false;
}
return true;
}
// ------- Shard --------
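// Default chunk-size cap (64MB) and per-chunk object cap. MaxChunkSize can be
// changed at runtime via refreshChunkSize() / setMaxChunkSizeSizeMB().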
int Chunk::MaxChunkSize = 1024 * 1024 * 64;
int Chunk::MaxObjectPerChunk = 250000;
// Can be overridden from command line
bool Chunk::ShouldAutoSplit = true;
Chunk::Chunk(const ChunkManager * manager, BSONObj from)
: _manager(manager), _lastmod(0, OID()), _dataWritten(mkDataWritten())
{
string ns = from.getStringField(ChunkType::ns().c_str());
_shard.reset(from.getStringField(ChunkType::shard().c_str()));
_lastmod = ChunkVersion::fromBSON(from[ChunkType::DEPRECATED_lastmod()]);
verify( _lastmod.isSet() );
_min = from.getObjectField(ChunkType::min().c_str()).getOwned();
_max = from.getObjectField(ChunkType::max().c_str()).getOwned();
_jumbo = from[ChunkType::jumbo()].trueValue();
uassert( 10170 , "Chunk needs a ns" , ! ns.empty() );
uassert( 13327 , "Chunk ns must match server ns" , ns == _manager->getns() );
uassert( 10171 , "Chunk needs a server" , _shard.ok() );
uassert( 10172 , "Chunk needs a min" , ! _min.isEmpty() );
uassert( 10173 , "Chunk needs a max" , ! _max.isEmpty() );
}
Chunk::Chunk(const ChunkManager * info , const BSONObj& min, const BSONObj& max, const Shard& shard, ChunkVersion lastmod)
: _manager(info), _min(min), _max(max), _shard(shard), _lastmod(lastmod), _jumbo(false), _dataWritten(mkDataWritten())
{}
int Chunk::mkDataWritten() {
PseudoRandom r(static_cast<int64_t>(time(0)));
return r.nextInt32( MaxChunkSize / ChunkManager::SplitHeuristics::splitTestFactor );
}
string Chunk::getns() const {
verify( _manager );
return _manager->getns();
}
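// A chunk covers the half-open key range [min, max): the lower bound is inclusive,
// the upper bound exclusive.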
bool Chunk::containsPoint( const BSONObj& point ) const {
return getMin().woCompare( point ) <= 0 && point.woCompare( getMax() ) < 0;
}
bool ChunkRange::containsPoint( const BSONObj& point ) const {
// same as Chunk method
return getMin().woCompare( point ) <= 0 && point.woCompare( getMax() ) < 0;
}
bool Chunk::minIsInf() const {
return _manager->getShardKey().globalMin().woCompare( getMin() ) == 0;
}
bool Chunk::maxIsInf() const {
return _manager->getShardKey().globalMax().woCompare( getMax() ) == 0;
}
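// Returns the shard key of the first (sort == 1) or last (sort == -1) document
// on this chunk's shard, or an empty object if the collection is empty there.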
BSONObj Chunk::_getExtremeKey( int sort ) const {
Query q;
if ( sort == 1 ) {
q.sort( _manager->getShardKey().key() );
}
else {
// need to invert shard key pattern to sort backwards
// TODO: make a helper in ShardKeyPattern?
BSONObj k = _manager->getShardKey().key();
BSONObjBuilder r;
BSONObjIterator i(k);
while( i.more() ) {
BSONElement e = i.next();
uassert( 10163 , "can only handle numbers here - which i think is correct" , e.isNumber() );
r.append( e.fieldName() , -1 * e.number() );
}
q.sort( r.obj() );
}
// find the extreme key
ScopedDbConnection conn(getShard().getConnString());
BSONObj end = conn->findOne(_manager->getns(), q);
conn.done();
if ( end.isEmpty() )
return BSONObj();
return _manager->getShardKey().extractKey( end );
}
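// Asks the mongod that owns this chunk for the chunk's median key, using the
// splitVector command with force:true.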
void Chunk::pickMedianKey( BSONObj& medianKey ) const {
// Ask the mongod holding this chunk to figure out the split points.
ScopedDbConnection conn(getShard().getConnString());
BSONObj result;
BSONObjBuilder cmd;
cmd.append( "splitVector" , _manager->getns() );
cmd.append( "keyPattern" , _manager->getShardKey().key() );
cmd.append( "min" , getMin() );
cmd.append( "max" , getMax() );
cmd.appendBool( "force" , true );
BSONObj cmdObj = cmd.obj();
if ( ! conn->runCommand( "admin" , cmdObj , result )) {
conn.done();
ostringstream os;
os << "splitVector command (median key) failed: " << result;
uassert( 13503 , os.str() , 0 );
}
BSONObjIterator it( result.getObjectField( "splitKeys" ) );
if ( it.more() ) {
medianKey = it.next().Obj().getOwned();
}
conn.done();
}
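// Asks the mongod that owns this chunk for candidate split points, bounded by
// chunkSize (bytes), maxPoints and maxObjs.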
void Chunk::pickSplitVector( vector<BSONObj>& splitPoints , int chunkSize /* bytes */, int maxPoints, int maxObjs ) const {
// Ask the mongod holding this chunk to figure out the split points.
ScopedDbConnection conn(getShard().getConnString());
BSONObj result;
BSONObjBuilder cmd;
cmd.append( "splitVector" , _manager->getns() );
cmd.append( "keyPattern" , _manager->getShardKey().key() );
cmd.append( "min" , getMin() );
cmd.append( "max" , getMax() );
cmd.append( "maxChunkSizeBytes" , chunkSize );
cmd.append( "maxSplitPoints" , maxPoints );
cmd.append( "maxChunkObjects" , maxObjs );
BSONObj cmdObj = cmd.obj();
if ( ! conn->runCommand( "admin" , cmdObj , result )) {
conn.done();
ostringstream os;
os << "splitVector command failed: " << result;
uassert( 13345 , os.str() , 0 );
}
BSONObjIterator it( result.getObjectField( "splitKeys" ) );
while ( it.more() ) {
splitPoints.push_back( it.next().Obj().getOwned() );
}
conn.done();
}
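// Splits this chunk at a single point. When 'force' is false the split is skipped
// unless splitVector reports enough data; returns the split point used, or an
// empty object if no split happened.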
BSONObj Chunk::singleSplit( bool force , BSONObj& res ) const {
vector<BSONObj> splitPoint;
// if splitting is not obligatory, we may return early if there is not enough data
// we cap the number of objects that would fall in the first half (before the split point)
// the rationale is we'll find a split point without traversing all the data
if ( ! force ) {
vector<BSONObj> candidates;
const int maxPoints = 2;
pickSplitVector( candidates , getManager()->getCurrentDesiredChunkSize() , maxPoints , MaxObjectPerChunk );
if ( candidates.size() <= 1 ) {
// no split points means there isn't enough data to split on
// 1 split point means we have between half the chunk size and the full chunk size
// so we shouldn't split
LOG(1) << "chunk not full enough to trigger auto-split " << ( candidates.size() == 0 ? "no split entry" : candidates[0].toString() ) << endl;
return BSONObj();
}
splitPoint.push_back( candidates.front() );
}
else {
// if forcing a split, use the chunk's median key
BSONObj medianKey;
pickMedianKey( medianKey );
if ( ! medianKey.isEmpty() )
splitPoint.push_back( medianKey );
}
// We assume that if the chunk being split is the first (or last) one on the collection,
// this chunk is likely to see more insertions. Instead of splitting mid-chunk, we use
// the very first (or last) key as a split point.
// This heuristic is skipped for "special" shard key patterns that are not likely to
// produce monotonically increasing or decreasing values (e.g. hashed shard keys).
// TODO: need better way to detect when shard key values are increasing/decreasing, and
// use that better method to determine whether to apply heuristic here.
if ( ! skey().isSpecial() ){
if ( minIsInf() ) {
splitPoint.clear();
BSONObj key = _getExtremeKey( 1 );
if ( ! key.isEmpty() ) {
splitPoint.push_back( key );
}
}
else if ( maxIsInf() ) {
splitPoint.clear();
BSONObj key = _getExtremeKey( -1 );
if ( ! key.isEmpty() ) {
splitPoint.push_back( key );
}
}
}
// Normally, we'd have a sound split point here if the chunk is not empty. It's also a good place to
// sanity check.
if ( splitPoint.empty() || _min == splitPoint.front() || _max == splitPoint.front() ) {
log() << "want to split chunk, but can't find split point chunk " << toString()
<< " got: " << ( splitPoint.empty() ? "" : splitPoint.front().toString() ) << endl;
return BSONObj();
}
if (multiSplit( splitPoint , res ))
return splitPoint.front();
else
return BSONObj();
}
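// Asks the owning shard to split this chunk at the given points (splitChunk command)
// and reloads the chunk manager on success.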
bool Chunk::multiSplit( const vector<BSONObj>& m , BSONObj& res ) const {
const size_t maxSplitPoints = 8192;
uassert( 10165 , "can't split as shard doesn't have a manager" , _manager );
uassert( 13332 , "need a split key to split chunk" , !m.empty() );
uassert( 13333 , "can't split a chunk in that many parts", m.size() < maxSplitPoints );
uassert( 13003 , "can't split a chunk with only one distinct value" , _min.woCompare(_max) );
ScopedDbConnection conn(getShard().getConnString());
BSONObjBuilder cmd;
cmd.append( "splitChunk" , _manager->getns() );
cmd.append( "keyPattern" , _manager->getShardKey().key() );
cmd.append( "min" , getMin() );
cmd.append( "max" , getMax() );
cmd.append( "from" , getShard().getName() );
cmd.append( "splitKeys" , m );
cmd.append( "shardId" , genID() );
cmd.append( "configdb" , configServer.modelServer() );
BSONObj cmdObj = cmd.obj();
if ( ! conn->runCommand( "admin" , cmdObj , res )) {
warning() << "splitChunk failed - cmd: " << cmdObj << " result: " << res << endl;
conn.done();
// Mark the minor version for *eventual* reload
_manager->markMinorForReload( this->_lastmod );
return false;
}
conn.done();
// force reload of config
_manager->reload();
return true;
}
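// Runs the moveChunk command against the donor shard to migrate this chunk to 'to',
// then reloads the chunk manager to pick up the new location.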
bool Chunk::moveAndCommit(const Shard& to,
long long chunkSize /* bytes */,
bool secondaryThrottle,
bool waitForDelete,
int maxTimeMS,
BSONObj& res) const
{
uassert( 10167 , "can't move shard to its current location!" , getShard() != to );
log() << "moving chunk ns: " << _manager->getns() << " moving ( " << toString() << ") " << _shard.toString() << " -> " << to.toString() << endl;
Shard from = _shard;
ScopedDbConnection fromconn(from.getConnString());
bool worked = fromconn->runCommand( "admin" ,
BSON( "moveChunk" << _manager->getns() <<
"from" << from.getAddress().toString() <<
"to" << to.getAddress().toString() <<
// NEEDED FOR 2.0 COMPATIBILITY
"fromShard" << from.getName() <<
"toShard" << to.getName() <<
///////////////////////////////
"min" << _min <<
"max" << _max <<
"maxChunkSizeBytes" << chunkSize <<
"shardId" << genID() <<
"configdb" << configServer.modelServer() <<
"secondaryThrottle" << secondaryThrottle <<
"waitForDelete" << waitForDelete <<
LiteParsedQuery::cmdOptionMaxTimeMS << maxTimeMS
) ,
res);
fromconn.done();
LOG( worked ? 1 : 0 ) << "moveChunk result: " << res << endl;
// if succeeded, needs to reload to pick up the new location
// if failed, mongos may be stale
// reload is excessive here as the failure could be simply because collection metadata is taken
_manager->reload();
return worked;
}
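// Called after writes routed through this chunk. Once enough data has been written,
// attempts an auto-split and, if the shard suggests it and the balancer allows it,
// a follow-up migration of the resulting small chunk.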
bool Chunk::splitIfShould( long dataWritten ) const {
dassert( ShouldAutoSplit );
LastError::Disabled d( lastError.get() );
try {
_dataWritten += dataWritten;
int splitThreshold = getManager()->getCurrentDesiredChunkSize();
if ( minIsInf() || maxIsInf() ) {
splitThreshold = (int) ((double)splitThreshold * .9);
}
if ( _dataWritten < splitThreshold / ChunkManager::SplitHeuristics::splitTestFactor )
return false;
if ( ! getManager()->_splitHeuristics._splitTickets.tryAcquire() ) {
LOG(1) << "won't auto split because not enough tickets: " << getManager()->getns() << endl;
return false;
}
TicketHolderReleaser releaser( &(getManager()->_splitHeuristics._splitTickets) );
// this is a bit ugly
// we need it so that mongos blocks for the writes to actually be committed
// this does mean mongos has more back pressure than mongod alone
// since it's not 100% TCP queue bound
// this was implicit before since we did a splitVector on the same socket
ShardConnection::sync();
if ( !isConfigServerConsistent() ) {
RARELY warning() << "will not perform auto-split because "
<< "config servers are inconsistent" << endl;
return false;
}
LOG(1) << "about to initiate autosplit: " << *this << " dataWritten: " << _dataWritten << " splitThreshold: " << splitThreshold << endl;
BSONObj res;
BSONObj splitPoint = singleSplit( false /* does not force a split if not enough data */ , res );
if ( splitPoint.isEmpty() ) {
// singleSplit would have issued a message if we got here
_dataWritten = 0; // this means there wasn't enough data to split, so don't try again until considerably more data has been written
return false;
}
if ( maxIsInf() || minIsInf() ) {
// we don't want to reset _dataWritten since we kind of want to check the other side right away
}
else {
_dataWritten = 0; // we're splitting, so should wait a bit
}
bool shouldBalance = grid.shouldBalance( _manager->getns() );
log() << "autosplitted " << _manager->getns() << " shard: " << toString()
<< " on: " << splitPoint << " (splitThreshold " << splitThreshold << ")"
#ifdef _DEBUG
<< " size: " << getPhysicalSize() // slow - but can be useful when debugging
#endif
<< ( res["shouldMigrate"].eoo() ? "" : (string)" (migrate suggested" +
( shouldBalance ? ")" : ", but no migrations allowed)" ) ) << endl;
BSONElement shouldMigrate = res["shouldMigrate"]; // not in mongod < 1.9.1 but that is ok
if ( ! shouldMigrate.eoo() && shouldBalance ){
BSONObj range = shouldMigrate.embeddedObject();
BSONObj min = range["min"].embeddedObject();
BSONObj max = range["max"].embeddedObject();
// reload sharding metadata before starting migration
Shard::reloadShardInfo();
Shard newLocation = Shard::pick( getShard() );
if ( getShard() == newLocation ) {
// if this is the best shard, then we shouldn't do anything (Shard::pick already logged our shard).
LOG(1) << "recently split chunk: " << range << " already in the best shard: " << getShard() << endl;
return true; // we did split even if we didn't migrate
}
ChunkManagerPtr cm = _manager->reload(false/*just reloaded in multiSplit*/);
ChunkPtr toMove = cm->findIntersectingChunk(min);
if ( ! (toMove->getMin() == min && toMove->getMax() == max) ){
LOG(1).stream() << "recently split chunk: " << range << " modified before we could migrate " << toMove << endl;
return true;
}
log().stream() << "moving chunk (auto): " << toMove << " to: " << newLocation.toString() << endl;
BSONObj res;
massert( 10412 ,
str::stream() << "moveAndCommit failed: " << res ,
toMove->moveAndCommit( newLocation ,
MaxChunkSize ,
false , /* secondaryThrottle - small chunk, no need */
false, /* waitForDelete - small chunk, no need */
0, /* maxTimeMS - don't time out */
res ) );
// update our config
_manager->reload();
}
return true;
}
catch ( DBException& e ) {
// TODO: Make this better - there are lots of reasons a split could fail
// Random so that we don't sync up with other failed splits
_dataWritten = mkDataWritten();
// if the collection lock is taken (e.g. we're migrating), it is fine for the split to fail.
warning() << "could not autosplit collection " << _manager->getns() << causedBy( e ) << endl;
return false;
}
}
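// Estimates this chunk's on-disk size using the datasize command (estimate mode).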
long Chunk::getPhysicalSize() const {
ScopedDbConnection conn(getShard().getConnString());
BSONObj result;
uassert( 10169 , "datasize failed!" , conn->runCommand( "admin" ,
BSON( "datasize" << _manager->getns()
<< "keyPattern" << _manager->getShardKey().key()
<< "min" << getMin()
<< "max" << getMax()
<< "maxSize" << ( MaxChunkSize + 1 )
<< "estimate" << true
) , result ) );
conn.done();
return (long)result["size"].number();
}
void Chunk::appendShortVersion( const char * name , BSONObjBuilder& b ) const {
BSONObjBuilder bb( b.subobjStart( name ) );
bb.append(ChunkType::min(), _min);
bb.append(ChunkType::max(), _max);
bb.done();
}
bool Chunk::operator==( const Chunk& s ) const {
return _min.woCompare( s._min ) == 0 && _max.woCompare( s._max ) == 0;
}
void Chunk::serialize(BSONObjBuilder& to,ChunkVersion myLastMod) {
to.append( "_id" , genID( _manager->getns() , _min ) );
if ( myLastMod.isSet() ) {
myLastMod.addToBSON(to, ChunkType::DEPRECATED_lastmod());
}
else if ( _lastmod.isSet() ) {
_lastmod.addToBSON(to, ChunkType::DEPRECATED_lastmod());
}
else {
verify(0);
}
to << ChunkType::ns(_manager->getns());
to << ChunkType::min(_min);
to << ChunkType::max(_max);
to << ChunkType::shard(_shard.getName());
}
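// Builds the config.chunks _id for a chunk: the namespace followed by each field
// and value of the given key (in practice the chunk's min key).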
string Chunk::genID( const string& ns , const BSONObj& o ) {
StringBuilder buf;
buf << ns << "-";
BSONObjIterator i(o);
while ( i.more() ) {
BSONElement e = i.next();
buf << e.fieldName() << "_" << e.toString(false, true);
}
return buf.str();
}
string Chunk::toString() const {
stringstream ss;
ss << ChunkType::ns() << ": " << _manager->getns() << ", "
<< ChunkType::shard() << ": " << _shard.toString() << ", "
<< ChunkType::DEPRECATED_lastmod() << ": " << _lastmod.toString() << ", "
<< ChunkType::min() << ": " << _min << ", "
<< ChunkType::max() << ": " << _max;
return ss.str();
}
ShardKeyPattern Chunk::skey() const {
return _manager->getShardKey();
}
void Chunk::markAsJumbo() const {
// set this first
// even if we can't set it in the db
// at least this mongos won't try and keep moving
_jumbo = true;
try {
ScopedDbConnection conn(configServer.modelServer(), 30);
conn->update(ChunkType::ConfigNS,
BSON(ChunkType::name(genID())),
BSON("$set" << BSON(ChunkType::jumbo(true))));
conn.done();
}
catch ( DBException& e ) {
warning() << "couldn't set jumbo for chunk: " << genID() << causedBy( e ) << endl;
}
}
void Chunk::refreshChunkSize() {
BSONObj o = grid.getConfigSetting("chunksize");
if ( o.isEmpty() ) {
return;
}
int csize = o[SettingsType::chunksize()].numberInt();
// validate chunksize before proceeding
if ( csize == 0 ) {
// setting was not modified; mark as such
log() << "warning: invalid chunksize (" << csize << ") ignored" << endl;
return;
}
LOG(1) << "Refreshing MaxChunkSize: " << csize << "MB" << endl;
if (csize != Chunk::MaxChunkSize/(1024*1024)) {
log() << "MaxChunkSize changing from " << Chunk::MaxChunkSize/(1024*1024) << "MB"
<< " to " << csize << "MB" << endl;
}
if ( !setMaxChunkSizeSizeMB( csize ) ) {
warning() << "invalid MaxChunkSize: " << csize << endl;
}
}
bool Chunk::setMaxChunkSizeSizeMB( int newMaxChunkSize ) {
if ( newMaxChunkSize < 1 )
return false;
if ( newMaxChunkSize > 1024 )
return false;
MaxChunkSize = newMaxChunkSize * 1024 * 1024;
return true;
}
// ------- ChunkManager --------
AtomicUInt ChunkManager::NextSequenceNumber = 1;
ChunkManager::ChunkManager( const string& ns, const ShardKeyPattern& pattern , bool unique ) :
_ns( ns ),
_key( pattern ),
_unique( unique ),
_chunkRanges(),
_mutex("ChunkManager"),
_sequenceNumber(++NextSequenceNumber)
{
//
// Sets up a chunk manager from new data
//
}
ChunkManager::ChunkManager( const BSONObj& collDoc ) :
// Need the ns early, to construct the lock
// TODO: Construct lock on demand? Not sure why we need to keep it around
_ns(collDoc[CollectionType::ns()].type() == String ?
collDoc[CollectionType::ns()].String() :
""),
_key(collDoc[CollectionType::keyPattern()].type() == Object ?
collDoc[CollectionType::keyPattern()].Obj().getOwned() :
BSONObj()),
_unique(collDoc[CollectionType::unique()].trueValue()),
_chunkRanges(),
_mutex("ChunkManager"),
// The shard versioning mechanism hinges on keeping track of the number of times we reloaded ChunkManager's.
// Increasing this number here will prompt checkShardVersion() to refresh the connection-level versions to
// the most up to date value.
_sequenceNumber(++NextSequenceNumber)
{
//
// Sets up a chunk manager from an existing sharded collection document
//
verify( _ns != "" );
verify( ! _key.key().isEmpty() );
_version = ChunkVersion::fromBSON( collDoc );
}
ChunkManager::ChunkManager( ChunkManagerPtr oldManager ) :
_ns( oldManager->getns() ),
_key( oldManager->getShardKey() ),
_unique( oldManager->isUnique() ),
_chunkRanges(),
_mutex("ChunkManager"),
_sequenceNumber(++NextSequenceNumber)
{
//
// Sets up a chunk manager based on an older manager
//
_oldManager = oldManager;
}
void ChunkManager::loadExistingRanges( const string& config ){
int tries = 3;
while (tries--) {
ChunkMap chunkMap;
set<Shard> shards;
ShardVersionMap shardVersions;
Timer t;
bool success = _load( config, chunkMap, shards, shardVersions, _oldManager );
if( success ){
{
int ms = t.millis();
log() << "ChunkManager: time to load chunks for " << _ns << ": " << ms << "ms"
<< " sequenceNumber: " << _sequenceNumber
<< " version: " << _version.toString()
<< " based on: " <<
( _oldManager.get() ? _oldManager->getVersion().toString() : "(empty)" )
<< endl;
}
// TODO: Merge into diff code above, so we validate in one place
if (_isValid(chunkMap)) {
// These variables are const for thread-safety. Since the
// constructor can only be called from one thread, we don't have
// to worry about that here.
const_cast<ChunkMap&>(_chunkMap).swap(chunkMap);
const_cast<set<Shard>&>(_shards).swap(shards);
const_cast<ShardVersionMap&>(_shardVersions).swap(shardVersions);
const_cast<ChunkRangeManager&>(_chunkRanges).reloadAll(_chunkMap);
// Once we load data, clear reference to old manager
_oldManager.reset();
return;
}
}
if (_chunkMap.size() < 10) {
_printChunks();
}
warning() << "ChunkManager loaded an invalid config for " << _ns
<< ", trying again" << endl;
sleepmillis(10 * (3-tries));
}
// this will abort construction so we should never have a reference to an invalid config
msgasserted(13282, "Couldn't load a valid config for " + _ns + " after 3 attempts. Please try again.");
}
/**
* This is an adapter so we can use config diffs - mongos and mongod do them slightly
* differently
*
* The mongos adapter here tracks all shards, and stores ranges by (max, Chunk) in the map.
*/
class CMConfigDiffTracker : public ConfigDiffTracker<ChunkPtr,Shard> {
public:
CMConfigDiffTracker( ChunkManager* manager ) : _manager( manager ) {}
virtual bool isTracked( const BSONObj& chunkDoc ) const {
// Mongos tracks all shards
return true;
}
virtual BSONObj minFrom( const ChunkPtr& val ) const {
return val.get()->getMin();
}
virtual bool isMinKeyIndexed() const { return false; }
virtual pair<BSONObj,ChunkPtr> rangeFor( const BSONObj& chunkDoc, const BSONObj& min, const BSONObj& max ) const {
ChunkPtr c( new Chunk( _manager, chunkDoc ) );
return make_pair( max, c );
}
virtual Shard shardFor( const string& name ) const {
return Shard::make( name );
}
virtual string nameFrom( const Shard& shard ) const {
return shard.getName();
}
ChunkManager* _manager;
};
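// Loads chunk metadata from the config server into the supplied structures, applying
// a diff on top of 'oldManager' when one is available. Returns true if the resulting
// (possibly empty) chunk set is usable; false forces a full retry.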
bool ChunkManager::_load( const string& config,
ChunkMap& chunkMap,
set<Shard>& shards,
ShardVersionMap& shardVersions,
ChunkManagerPtr oldManager)
{
// Reset the max version, but not the epoch, when we aren't loading from the oldManager
_version = ChunkVersion( 0, _version.epoch() );
set<ChunkVersion> minorVersions;
// If we have a previous version of the ChunkManager to work from, use that info to reduce
// our config query
if( oldManager && oldManager->getVersion().isSet() ){
// Get the old max version
_version = oldManager->getVersion();
// Load a copy of the old versions
shardVersions = oldManager->_shardVersions;
// Load a copy of the chunk map, replacing the chunk manager with our own
const ChunkMap& oldChunkMap = oldManager->getChunkMap();
// Could be v.expensive
// TODO: If chunks were immutable and didn't reference the manager, we could do more
// interesting things here
for( ChunkMap::const_iterator it = oldChunkMap.begin(); it != oldChunkMap.end(); it++ ){
ChunkPtr oldC = it->second;
ChunkPtr c( new Chunk( this, oldC->getMin(),
oldC->getMax(),
oldC->getShard(),
oldC->getLastmod() ) );
c->setBytesWritten( oldC->getBytesWritten() );
chunkMap.insert( make_pair( oldC->getMax(), c ) );
}
// Also get any minor versions stored for reload
oldManager->getMarkedMinorVersions( minorVersions );
LOG(2) << "loading chunk manager for collection " << _ns
<< " using old chunk manager w/ version " << _version.toString()
<< " and " << oldChunkMap.size() << " chunks" << endl;
}
// Attach a diff tracker for the versioned chunk data
CMConfigDiffTracker differ( this );
differ.attach( _ns, chunkMap, _version, shardVersions );
// Diff tracker should *always* find at least one chunk if collection exists
int diffsApplied = differ.calculateConfigDiff( config, minorVersions );
if( diffsApplied > 0 ){
LOG(2) << "loaded " << diffsApplied << " chunks into new chunk manager for " << _ns
<< " with version " << _version << endl;
// Add all the shards we find to the shards set
for( ShardVersionMap::iterator it = shardVersions.begin(); it != shardVersions.end(); it++ ){
shards.insert( it->first );
}
return true;
}
else if( diffsApplied == 0 ){
// No chunks were found for the ns
warning() << "no chunks found when reloading " << _ns
<< ", previous version was " << _version << endl;
// Set all our data to empty
chunkMap.clear();
shardVersions.clear();
_version = ChunkVersion( 0, OID() );
return true;
}
else { // diffsApplied < 0
bool allInconsistent = differ.numValidDiffs() == 0;
if( allInconsistent ){
// All versions are different, this can be normal
warning() << "major change in chunk information found when reloading "
<< _ns << ", previous version was " << _version << endl;
}
else {
// Inconsistent load halfway through (due to yielding cursor during load)
// should be rare
warning() << "inconsistent chunks found when reloading "
<< _ns << ", previous version was " << _version
<< ", this should be rare" << endl;
}
// Set all our data to empty to be extra safe
chunkMap.clear();
shardVersions.clear();
_version = ChunkVersion( 0, OID() );
return allInconsistent;
}
}
ChunkManagerPtr ChunkManager::reload(bool force) const {
return grid.getDBConfig(getns())->getChunkManager(getns(), force);
}
void ChunkManager::markMinorForReload( ChunkVersion majorVersion ) const {
_splitHeuristics.markMinorForReload( getns(), majorVersion );
}
void ChunkManager::getMarkedMinorVersions( set<ChunkVersion>& minorVersions ) const {
_splitHeuristics.getMarkedMinorVersions( minorVersions );
}
void ChunkManager::SplitHeuristics::markMinorForReload( const string& ns, ChunkVersion majorVersion ) {
// When we get a stale minor version, it means that some *other* mongos has just split a
// chunk into a number of smaller parts, so we shouldn't need to reload the data needed to
// split it ourselves for a while. Don't be too aggressive reloading just because of this,
// since reloads are expensive and disrupt operations.
// *** Multiple threads could indicate a stale minor version simultaneously ***
// TODO: Ideally, this could be a single-threaded background service doing splits
// TODO: Ideally, we wouldn't need to care that this is stale at all
bool forceReload = false;
{
scoped_lock lk( _staleMinorSetMutex );
// The major version of the chunks which need to be reloaded
_staleMinorSet.insert( majorVersion );
// Increment the number of requests for minor version data
_staleMinorCount++;
if( _staleMinorCount >= staleMinorReloadThreshold ){
_staleMinorCount = 0;
// There's maxParallelSplits coming down this codepath at once -
// block as little as possible.
forceReload = true;
}
// There is no guarantee that a minor version change will be processed here, in the
// case where the request comes in "late" and the version's already getting reloaded -
// it's a heuristic anyway though, and we'll see requests multiple times.
}
if( forceReload )
grid.getDBConfig( ns )->getChunkManagerIfExists( ns, true, true );
}
void ChunkManager::SplitHeuristics::getMarkedMinorVersions( set<ChunkVersion>& minorVersions ) {
scoped_lock lk( _staleMinorSetMutex );
for( set<ChunkVersion>::iterator it = _staleMinorSet.begin(); it != _staleMinorSet.end(); it++ ){
minorVersions.insert( *it );
}
}
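// Validates a chunk map: the first chunk must start at MinKey, the last must end at
// MaxKey, and consecutive chunks must abut with no gaps or overlaps.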
bool ChunkManager::_isValid(const ChunkMap& chunkMap) {
#define ENSURE(x) do { if(!(x)) { log() << "ChunkManager::_isValid failed: " #x << endl; return false; } } while(0)
if (chunkMap.empty())
return true;
// Check endpoints
ENSURE(allOfType(MinKey, chunkMap.begin()->second->getMin()));
ENSURE(allOfType(MaxKey, boost::prior(chunkMap.end())->second->getMax()));
// Make sure there are no gaps or overlaps
for (ChunkMap::const_iterator it=boost::next(chunkMap.begin()), end=chunkMap.end(); it != end; ++it) {
ChunkMap::const_iterator last = boost::prior(it);
if (!(it->second->getMin() == last->second->getMax())) {
PRINT(it->second->toString());
PRINT(it->second->getMin());
PRINT(last->second->getMax());
}
ENSURE(it->second->getMin() == last->second->getMax());
}
return true;
#undef ENSURE
}
void ChunkManager::_printChunks() const {
for (ChunkMap::const_iterator it=_chunkMap.begin(), end=_chunkMap.end(); it != end; ++it) {
log() << *it->second << endl;
}
}
bool ChunkManager::hasShardKey( const BSONObj& obj ) const {
return _key.hasShardKey( obj );
}
void ChunkManager::calcInitSplitsAndShards( const Shard& primary,
const vector<BSONObj>* initPoints,
const vector<Shard>* initShards,
vector<BSONObj>* splitPoints,
vector<Shard>* shards ) const
{
verify( _chunkMap.size() == 0 );
unsigned long long numObjects = 0;
Chunk c(this, _key.globalMin(), _key.globalMax(), primary);
if ( !initPoints || !initPoints->size() ) {
// discover split points
{
// get stats to see if there is any data
ScopedDbConnection shardConn(primary.getConnString());
numObjects = shardConn->count( getns() );
shardConn.done();
}
if ( numObjects > 0 )
c.pickSplitVector( *splitPoints , Chunk::MaxChunkSize );
// since docs already exist, we must use the primary shard
shards->push_back( primary );
} else {
// make sure points are unique and ordered
set<BSONObj> orderedPts;
for ( unsigned i = 0; i < initPoints->size(); ++i ) {
BSONObj pt = (*initPoints)[i];
orderedPts.insert( pt );
}
for ( set<BSONObj>::iterator it = orderedPts.begin(); it != orderedPts.end(); ++it ) {
splitPoints->push_back( *it );
}
if ( !initShards || !initShards->size() ) {
// If not specified, only use the primary shard (note that it's not safe for mongos
// to put initial chunks on other shards without the primary mongod knowing).
shards->push_back( primary );
} else {
std::copy( initShards->begin() , initShards->end() , std::back_inserter(*shards) );
}
}
}
void ChunkManager::createFirstChunks( const string& config,
const Shard& primary,
const vector<BSONObj>* initPoints,
const vector<Shard>* initShards )
{
// TODO distlock?
// TODO: Race condition if we shard the collection and insert data while we split across
// the non-primary shard.
vector splitPoints;
vector shards;
calcInitSplitsAndShards( primary, initPoints, initShards,
&splitPoints, &shards );
// this is the first chunk; start the versioning from scratch
ChunkVersion version;
version.incEpoch();
version.incMajor();
log() << "going to create " << splitPoints.size() + 1 << " chunk(s) for: " << _ns
<< " using new epoch " << version.epoch() << endl;
ScopedDbConnection conn(config, 30);
// Make sure we don't have any chunks that already exist here
unsigned long long existingChunks =
conn->count(ChunkType::ConfigNS, BSON(ChunkType::ns(_ns)));
uassert( 13449 , str::stream() << "collection " << _ns << " already sharded with "
<< existingChunks << " chunks", existingChunks == 0 );
for ( unsigned i=0; i<=splitPoints.size(); i++ ) {
BSONObj min = i == 0 ? _key.globalMin() : splitPoints[i-1];
BSONObj max = i < splitPoints.size() ? splitPoints[i] : _key.globalMax();
Chunk temp( this , min , max , shards[ i % shards.size() ], version );
BSONObjBuilder chunkBuilder;
temp.serialize( chunkBuilder );
BSONObj chunkObj = chunkBuilder.obj();
conn->update(ChunkType::ConfigNS,
QUERY(ChunkType::name(temp.genID())),
chunkObj,
true,
false );
version.incMinor();
}
string errmsg = conn->getLastError();
if ( errmsg.size() ) {
string ss = str::stream() << "creating first chunks failed. result: " << errmsg;
error() << ss << endl;
msgasserted( 15903 , ss );
}
conn.done();
_version = ChunkVersion( 0, version.epoch() );
}
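// Finds the chunk containing 'point' by an upper_bound lookup on the chunk map,
// which is keyed by each chunk's max key.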
ChunkPtr ChunkManager::findIntersectingChunk( const BSONObj& point ) const {
{
BSONObj foo;
ChunkPtr c;
{
ChunkMap::const_iterator it = _chunkMap.upper_bound( point );
if (it != _chunkMap.end()) {
foo = it->first;
c = it->second;
}
}
if ( c ) {
if ( c->containsPoint( point ) ){
dassert( c->containsPoint( point ) ); // doesn't use fast-path in extractKey
return c;
}
PRINT(foo);
PRINT(*c);
PRINT( point );
reload();
massert(13141, "Chunk map pointed to incorrect chunk", false);
}
}
msgasserted( 8070 ,
str::stream() << "couldn't find a chunk intersecting: " << point
<< " for ns: " << _ns
<< " at version: " << _version.toString()
<< ", number of chunks: " << _chunkMap.size() );
}
ChunkPtr ChunkManager::findChunkForDoc( const BSONObj& doc ) const {
BSONObj key = _key.extractKey( doc );
return findIntersectingChunk( key );
}
ChunkPtr ChunkManager::findChunkOnServer( const Shard& shard ) const {
for ( ChunkMap::const_iterator i=_chunkMap.begin(); i!=_chunkMap.end(); ++i ) {
ChunkPtr c = i->second;
if ( c->getShard() == shard )
return c;
}
return ChunkPtr();
}
void ChunkManager::getShardsForQuery( set<Shard>& shards , const BSONObj& query ) const {
// TODO Determine if the third argument to OrRangeGenerator() is necessary, see SERVER-5165.
OrRangeGenerator org(_ns.c_str(), query, false);
const SpecialIndices special = org.getSpecial();
if (special.has("2d") || special.has("2dsphere")) {
BSONForEach(field, query) {
if (getGtLtOp(field) == BSONObj::opNEAR) {
uassert(13501, "use geoNear command rather than $near query", false);
// TODO: convert to geoNear rather than erroring out
}
// $within queries are fine
}
} else if (!special.empty()) {
uassert(13502, "unrecognized special query type: " + special.toString(), false);
}
do {
boost::scoped_ptr<FieldRangeSetPair> frsp (org.topFrsp());
// special case if most-significant field isn't in query
FieldRange range = frsp->shardKeyRange(_key.key().firstElementFieldName());
if ( range.universal() ) {
DEV PRINT(range.universal());
getShardsForRange( shards, _key.globalMin(), _key.globalMax() );
return;
}
if ( frsp->matchPossibleForSingleKeyFRS( _key.key() ) ) {
BoundList ranges = _key.keyBounds( frsp->getSingleKeyFRS() );
for ( BoundList::const_iterator it=ranges.begin(); it != ranges.end(); ++it ){
getShardsForRange( shards, it->first /*min*/, it->second /*max*/ );
// once we know we need to visit all shards no need to keep looping
if( shards.size() == _shards.size() ) return;
}
}
if (!org.orRangesExhausted())
org.popOrClauseSingleKey();
}
while (!org.orRangesExhausted());
// SERVER-4914 Some clients of getShardsForQuery() assume at least one shard will be
// returned. For now, we satisfy that assumption by adding a shard with no matches rather
// than return an empty set of shards.
if ( shards.empty() ) {
massert( 16068, "no chunk ranges available", !_chunkRanges.ranges().empty() );
shards.insert( _chunkRanges.ranges().begin()->second->getShard() );
}
}
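// Adds to 'shards' every shard owning a chunk range that overlaps [min, max].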
void ChunkManager::getShardsForRange( set<Shard>& shards,
const BSONObj& min,
const BSONObj& max ) const {
ChunkRangeMap::const_iterator it = _chunkRanges.upper_bound(min);
ChunkRangeMap::const_iterator end = _chunkRanges.upper_bound(max);
massert( 13507 , str::stream() << "no chunks found between bounds " << min << " and " << max , it != _chunkRanges.ranges().end() );
if( end != _chunkRanges.ranges().end() ) ++end;
for( ; it != end; ++it ){
shards.insert(it->second->getShard());
// once we know we need to visit all shards no need to keep looping
if (shards.size() == _shards.size()) break;
}
}
void ChunkManager::getAllShards( set<Shard>& all ) const {
all.insert(_shards.begin(), _shards.end());
}
bool ChunkManager::compatibleWith( const ChunkManager& other, const Shard& shard ) const {
// Return true if the shard version is the same in the two chunk managers
// TODO: This doesn't need to be so strong, just major vs
return other.getVersion( shard ).isEquivalentTo( getVersion( shard ) );
}
bool ChunkManager::compatibleWith( const Chunk& other ) const {
// Do this first, b/c findIntersectingChunk asserts if the key isn't similar
if( ! this->_key.hasShardKey( other.getMin() ) ) return false;
// We assume here that chunks will have consistent fields in min/max
ChunkPtr myChunk = this->findIntersectingChunk( other.getMin() );
if( other.getMin() != myChunk->getMin() ) return false;
if( other.getMax() != myChunk->getMax() ) return false;
if( other.getShard() != myChunk->getShard() ) return false;
return true;
}
void ChunkManager::drop( ChunkManagerPtr me ) const {
scoped_lock lk( _mutex );
configServer.logChange( "dropCollection.start" , _ns , BSONObj() );
DistributedLock nsLock( ConnectionString( configServer.modelServer(),
ConnectionString::SYNC ),
_ns );
dist_lock_try dlk;
try{
dlk = dist_lock_try( &nsLock , "drop" );
}
catch( LockException& e ){
uassert( 14022, str::stream() << "Error locking distributed lock for chunk drop." << causedBy( e ), false);
}
uassert( 13331 , "collection's metadata is undergoing changes. Please try again." , dlk.got() );
uassert( 10174 , "config servers not all up" , configServer.allUp() );
set<Shard> seen;
LOG(1) << "ChunkManager::drop : " << _ns << endl;
// lock all shards so no one can do a split/migrate
for ( ChunkMap::const_iterator i=_chunkMap.begin(); i!=_chunkMap.end(); ++i ) {
ChunkPtr c = i->second;
seen.insert( c->getShard() );
}
LOG(1) << "ChunkManager::drop : " << _ns << "\t all locked" << endl;
map<string,BSONObj> errors;
// delete data from mongod
for ( set<Shard>::iterator i=seen.begin(); i!=seen.end(); i++ ) {
ScopedDbConnection conn(i->getConnString());
BSONObj info;
if ( !conn->dropCollection( _ns, &info ) ) {
errors[ i->getConnString() ] = info;
}
conn.done();
}
if ( !errors.empty() ) {
stringstream ss;
ss << "Dropping collection failed on the following hosts: ";
for ( map<string,BSONObj>::const_iterator it = errors.begin(); it != errors.end(); ) {
ss << it->first << ": " << it->second;
++it;
if ( it != errors.end() ) {
ss << ", ";
}
}
uasserted( 16338, ss.str() );
}
LOG(1) << "ChunkManager::drop : " << _ns << "\t removed shard data" << endl;
// remove chunk data
ScopedDbConnection conn(configServer.modelServer());
conn->remove(ChunkType::ConfigNS, BSON(ChunkType::ns(_ns)));
// Make sure we're dropped on the config
string error = conn->getLastError();
uassert( 17001, str::stream() << "could not drop chunks for " << _ns
<< causedBy( error ),
error.size() == 0 );
conn.done();
LOG(1) << "ChunkManager::drop : " << _ns << "\t removed chunk data" << endl;
for ( set<Shard>::iterator i=seen.begin(); i!=seen.end(); i++ ) {
ScopedDbConnection conn(i->getConnString());
BSONObj res;
// this is horrible
// we need a special command for dropping on the d side
// this hack works for the moment
if ( ! setShardVersion( conn.conn(),
_ns,
ChunkVersion( 0, OID() ),
ChunkManagerPtr(),
true, res ) )
{
throw UserException( 8071 , str::stream() << "cleaning up after drop failed: " << res );
}
conn->simpleCommand( "admin", 0, "unsetSharding" );
conn.done();
}
LOG(1) << "ChunkManager::drop : " << _ns << "\t DONE" << endl;
configServer.logChange( "dropCollection" , _ns , BSONObj() );
}
ChunkVersion ChunkManager::getVersion( const StringData& shardName ) const {
// NOTE: The empty-address Shard constructor is needed to avoid triggering a reload
return getVersion( Shard( shardName.toString(), "" ) );
}
ChunkVersion ChunkManager::getVersion( const Shard& shard ) const {
ShardVersionMap::const_iterator i = _shardVersions.find( shard );
if ( i == _shardVersions.end() ) {
// Shards without explicitly tracked shard versions (meaning they have
// no chunks) always have a version of (0, 0, epoch). Note this is
// *different* from the dropped chunk version of (0, 0, OID(000...)).
// See s/chunk_version.h.
return ChunkVersion( 0, 0, _version.epoch() );
}
return i->second;
}
ChunkVersion ChunkManager::getVersion() const {
return _version;
}
void ChunkManager::getInfo( BSONObjBuilder& b ) const {
b.append(CollectionType::keyPattern(), _key.key());
b.appendBool(CollectionType::unique(), _unique);
_version.addEpochToBSON(b, CollectionType::DEPRECATED_lastmod());
}
string ChunkManager::toString() const {
stringstream ss;
ss << "ChunkManager: " << _ns << " key:" << _key.toString() << '\n';
for ( ChunkMap::const_iterator i=_chunkMap.begin(); i!=_chunkMap.end(); ++i ) {
const ChunkPtr c = i->second;
ss << "\t" << c->toString() << '\n';
}
return ss.str();
}
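// Checks the invariants of the range map: no null ranges, MinKey/MaxKey endpoints,
// contiguous ranges keyed by their max, and consistency with the underlying chunk map.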
void ChunkRangeManager::assertValid() const {
if (_ranges.empty())
return;
try {
// No Nulls
for (ChunkRangeMap::const_iterator it=_ranges.begin(), end=_ranges.end(); it != end; ++it) {
verify(it->second);
}
// Check endpoints
verify(allOfType(MinKey, _ranges.begin()->second->getMin()));
verify(allOfType(MaxKey, boost::prior(_ranges.end())->second->getMax()));
// Make sure there are no gaps or overlaps
for (ChunkRangeMap::const_iterator it=boost::next(_ranges.begin()), end=_ranges.end(); it != end; ++it) {
ChunkRangeMap::const_iterator last = boost::prior(it);
verify(it->second->getMin() == last->second->getMax());
}
// Check Map keys
for (ChunkRangeMap::const_iterator it=_ranges.begin(), end=_ranges.end(); it != end; ++it) {
verify(it->first == it->second->getMax());
}
// Make sure we match the original chunks
const ChunkMap chunks = _ranges.begin()->second->getManager()->_chunkMap;
for ( ChunkMap::const_iterator i=chunks.begin(); i!=chunks.end(); ++i ) {
const ChunkPtr chunk = i->second;
ChunkRangeMap::const_iterator min = _ranges.upper_bound(chunk->getMin());
ChunkRangeMap::const_iterator max = _ranges.lower_bound(chunk->getMax());
verify(min != _ranges.end());
verify(max != _ranges.end());
verify(min == max);
verify(min->second->getShard() == chunk->getShard());
verify(min->second->containsPoint( chunk->getMin() ));
verify(min->second->containsPoint( chunk->getMax() ) || (min->second->getMax() == chunk->getMax()));
}
}
catch (...) {
error() << "\t invalid ChunkRangeMap! printing ranges:" << endl;
for (ChunkRangeMap::const_iterator it=_ranges.begin(), end=_ranges.end(); it != end; ++it)
cout << it->first << ": " << *it->second << endl;
throw;
}
}
void ChunkRangeManager::reloadAll(const ChunkMap& chunks) {
_ranges.clear();
_insertRange(chunks.begin(), chunks.end());
DEV assertValid();
}
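// Collapses runs of consecutive chunks that live on the same shard into single
// ChunkRange entries, keyed by each range's max key.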
void ChunkRangeManager::_insertRange(ChunkMap::const_iterator begin, const ChunkMap::const_iterator end) {
while (begin != end) {
ChunkMap::const_iterator first = begin;
Shard shard = first->second->getShard();
while (begin != end && (begin->second->getShard() == shard))
++begin;
shared_ptr<ChunkRange> cr (new ChunkRange(first, begin));
_ranges[cr->getMax()] = cr;
}
}
int ChunkManager::getCurrentDesiredChunkSize() const {
// splitting faster in early chunks helps spread out an initial load better
const int minChunkSize = 1 << 20; // 1 MB
int splitThreshold = Chunk::MaxChunkSize;
int nc = numChunks();
if ( nc <= 1 ) {
return 1024;
}
else if ( nc < 3 ) {
return minChunkSize / 2;
}
else if ( nc < 10 ) {
splitThreshold = max( splitThreshold / 4 , minChunkSize );
}
else if ( nc < 20 ) {
splitThreshold = max( splitThreshold / 2 , minChunkSize );
}
return splitThreshold;
}
/** This is for testing only, just setting up minimal basic defaults. */
ChunkManager::ChunkManager() :
_unique(),
_chunkRanges(),
_mutex( "ChunkManager" ),
_sequenceNumber()
{}
class ChunkObjUnitTest : public StartupTest {
public:
void runChunkVersion() {
vector<ChunkVersion> all;
all.push_back( ChunkVersion(1,1, OID()) );
all.push_back( ChunkVersion(1,2, OID()) );
all.push_back( ChunkVersion(2,1, OID()) );
all.push_back( ChunkVersion(2,2, OID()) );
for ( unsigned i=0; i<all.size(); i++ ) {
for ( unsigned j=i+1; j<all.size(); j++ ) {
verify( all[i] < all[j] );
}
}
}
void run() {
runChunkVersion();
LOG(1) << "shardObjTest passed" << endl;
}
} shardObjTest;
// ----- to be removed -----
extern OID serverID;
// Informs a shard (mongod) which version of the sharded collection's metadata this
// mongos expects it to have, so the shard can detect stale configuration.
bool setShardVersion( DBClientBase & conn,
const string& ns,
ChunkVersion version,
ChunkManagerPtr manager, // Used only for reporting!
bool authoritative,
BSONObj& result )
{
BSONObjBuilder cmdBuilder;
cmdBuilder.append( "setShardVersion" , ns.c_str() );
cmdBuilder.append( "configdb" , configServer.modelServer() );
Shard s = Shard::make( conn.getServerAddress() );
version.addToBSON( cmdBuilder, "version" );
cmdBuilder.appendOID( "serverID" , &serverID );
if ( authoritative )
cmdBuilder.appendBool( "authoritative" , 1 );
cmdBuilder.append( "shard" , s.getName() );
cmdBuilder.append( "shardHost" , s.getConnString() );
BSONObj cmd = cmdBuilder.obj();
LOG(1) << "    setShardVersion  " << s.getName() << " " << conn.getServerAddress()
<< "  " << ns << "  " << cmd
<< (manager.get() ? string(str::stream() << " " << manager->getSequenceNumber()) :
"")
<< endl;
return conn.runCommand("admin", cmd, result, 0);
}
} // namespace mongo