summaryrefslogtreecommitdiff
path: root/src/mongo/s/chunk.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/s/chunk.cpp')
-rw-r--r--src/mongo/s/chunk.cpp1104
1 files changed, 1104 insertions, 0 deletions
diff --git a/src/mongo/s/chunk.cpp b/src/mongo/s/chunk.cpp
new file mode 100644
index 00000000000..e0e7edee9bd
--- /dev/null
+++ b/src/mongo/s/chunk.cpp
@@ -0,0 +1,1104 @@
+// @file chunk.cpp
+
+/**
+ * Copyright (C) 2008 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "pch.h"
+
+#include "../client/connpool.h"
+#include "../db/querypattern.h"
+#include "../db/queryutil.h"
+#include "../util/unittest.h"
+#include "../util/timer.h"
+
+#include "chunk.h"
+#include "config.h"
+#include "cursors.h"
+#include "grid.h"
+#include "strategy.h"
+#include "client.h"
+
+namespace mongo {
+
+ inline bool allOfType(BSONType type, const BSONObj& o) {
+ BSONObjIterator it(o);
+ while(it.more()) {
+ if (it.next().type() != type)
+ return false;
+ }
+ return true;
+ }
+
+ // ------- Shard --------
+
+ string Chunk::chunkMetadataNS = "config.chunks";
+
+ int Chunk::MaxChunkSize = 1024 * 1024 * 64;
+ int Chunk::MaxObjectPerChunk = 250000;
+
+
+ Chunk::Chunk(const ChunkManager * manager, BSONObj from)
+ : _manager(manager), _lastmod(0), _dataWritten(mkDataWritten())
+ {
+ string ns = from.getStringField( "ns" );
+ _shard.reset( from.getStringField( "shard" ) );
+
+ _lastmod = from["lastmod"];
+ assert( _lastmod > 0 );
+
+ _min = from.getObjectField( "min" ).getOwned();
+ _max = from.getObjectField( "max" ).getOwned();
+
+ _jumbo = from["jumbo"].trueValue();
+
+ uassert( 10170 , "Chunk needs a ns" , ! ns.empty() );
+ uassert( 13327 , "Chunk ns must match server ns" , ns == _manager->getns() );
+
+ uassert( 10171 , "Chunk needs a server" , _shard.ok() );
+
+ uassert( 10172 , "Chunk needs a min" , ! _min.isEmpty() );
+ uassert( 10173 , "Chunk needs a max" , ! _max.isEmpty() );
+ }
+
+
+ Chunk::Chunk(const ChunkManager * info , const BSONObj& min, const BSONObj& max, const Shard& shard)
+ : _manager(info), _min(min), _max(max), _shard(shard), _lastmod(0), _jumbo(false), _dataWritten(mkDataWritten())
+ {}
+
+ long Chunk::mkDataWritten() {
+ return rand() % ( MaxChunkSize / 5 );
+ }
+
+ string Chunk::getns() const {
+ assert( _manager );
+ return _manager->getns();
+ }
+
+ bool Chunk::contains( const BSONObj& obj ) const {
+ return
+ _manager->getShardKey().compare( getMin() , obj ) <= 0 &&
+ _manager->getShardKey().compare( obj , getMax() ) < 0;
+ }
+
+ bool ChunkRange::contains(const BSONObj& obj) const {
+ // same as Chunk method
+ return
+ _manager->getShardKey().compare( getMin() , obj ) <= 0 &&
+ _manager->getShardKey().compare( obj , getMax() ) < 0;
+ }
+
+ bool Chunk::minIsInf() const {
+ return _manager->getShardKey().globalMin().woCompare( getMin() ) == 0;
+ }
+
+ bool Chunk::maxIsInf() const {
+ return _manager->getShardKey().globalMax().woCompare( getMax() ) == 0;
+ }
+
+ BSONObj Chunk::_getExtremeKey( int sort ) const {
+ ShardConnection conn( getShard().getConnString() , _manager->getns() );
+ Query q;
+ if ( sort == 1 ) {
+ q.sort( _manager->getShardKey().key() );
+ }
+ else {
+ // need to invert shard key pattern to sort backwards
+ // TODO: make a helper in ShardKeyPattern?
+
+ BSONObj k = _manager->getShardKey().key();
+ BSONObjBuilder r;
+
+ BSONObjIterator i(k);
+ while( i.more() ) {
+ BSONElement e = i.next();
+ uassert( 10163 , "can only handle numbers here - which i think is correct" , e.isNumber() );
+ r.append( e.fieldName() , -1 * e.number() );
+ }
+
+ q.sort( r.obj() );
+ }
+
+ // find the extreme key
+ BSONObj end = conn->findOne( _manager->getns() , q );
+ conn.done();
+
+ if ( end.isEmpty() )
+ return BSONObj();
+
+ return _manager->getShardKey().extractKey( end );
+ }
+
+ void Chunk::pickMedianKey( BSONObj& medianKey ) const {
+ // Ask the mongod holding this chunk to figure out the split points.
+ ScopedDbConnection conn( getShard().getConnString() );
+ BSONObj result;
+ BSONObjBuilder cmd;
+ cmd.append( "splitVector" , _manager->getns() );
+ cmd.append( "keyPattern" , _manager->getShardKey().key() );
+ cmd.append( "min" , getMin() );
+ cmd.append( "max" , getMax() );
+ cmd.appendBool( "force" , true );
+ BSONObj cmdObj = cmd.obj();
+
+ if ( ! conn->runCommand( "admin" , cmdObj , result )) {
+ conn.done();
+ ostringstream os;
+ os << "splitVector command (median key) failed: " << result;
+ uassert( 13503 , os.str() , 0 );
+ }
+
+ BSONObjIterator it( result.getObjectField( "splitKeys" ) );
+ if ( it.more() ) {
+ medianKey = it.next().Obj().getOwned();
+ }
+
+ conn.done();
+ }
+
+ void Chunk::pickSplitVector( vector<BSONObj>& splitPoints , int chunkSize /* bytes */, int maxPoints, int maxObjs ) const {
+ // Ask the mongod holding this chunk to figure out the split points.
+ ScopedDbConnection conn( getShard().getConnString() );
+ BSONObj result;
+ BSONObjBuilder cmd;
+ cmd.append( "splitVector" , _manager->getns() );
+ cmd.append( "keyPattern" , _manager->getShardKey().key() );
+ cmd.append( "min" , getMin() );
+ cmd.append( "max" , getMax() );
+ cmd.append( "maxChunkSizeBytes" , chunkSize );
+ cmd.append( "maxSplitPoints" , maxPoints );
+ cmd.append( "maxChunkObjects" , maxObjs );
+ BSONObj cmdObj = cmd.obj();
+
+ if ( ! conn->runCommand( "admin" , cmdObj , result )) {
+ conn.done();
+ ostringstream os;
+ os << "splitVector command failed: " << result;
+ uassert( 13345 , os.str() , 0 );
+ }
+
+ BSONObjIterator it( result.getObjectField( "splitKeys" ) );
+ while ( it.more() ) {
+ splitPoints.push_back( it.next().Obj().getOwned() );
+ }
+ conn.done();
+ }
+
+ BSONObj Chunk::singleSplit( bool force , BSONObj& res ) const {
+ vector<BSONObj> splitPoint;
+
+ // if splitting is not obligatory we may return early if there are not enough data
+ // we cap the number of objects that would fall in the first half (before the split point)
+ // the rationale is we'll find a split point without traversing all the data
+ if ( ! force ) {
+ vector<BSONObj> candidates;
+ const int maxPoints = 2;
+ pickSplitVector( candidates , getManager()->getCurrentDesiredChunkSize() , maxPoints , MaxObjectPerChunk );
+ if ( candidates.size() <= 1 ) {
+ // no split points means there isn't enough data to split on
+ // 1 split point means we have between half the chunk size to full chunk size
+ // so we shouldn't split
+ LOG(1) << "chunk not full enough to trigger auto-split " << ( candidates.size() == 0 ? "no split entry" : candidates[0].toString() ) << endl;
+ return BSONObj();
+ }
+
+ splitPoint.push_back( candidates.front() );
+
+ }
+ else {
+ // if forcing a split, use the chunk's median key
+ BSONObj medianKey;
+ pickMedianKey( medianKey );
+ if ( ! medianKey.isEmpty() )
+ splitPoint.push_back( medianKey );
+ }
+
+ // We assume that if the chunk being split is the first (or last) one on the collection, this chunk is
+ // likely to see more insertions. Instead of splitting mid-chunk, we use the very first (or last) key
+ // as a split point.
+ if ( minIsInf() ) {
+ splitPoint.clear();
+ BSONObj key = _getExtremeKey( 1 );
+ if ( ! key.isEmpty() ) {
+ splitPoint.push_back( key );
+ }
+
+ }
+ else if ( maxIsInf() ) {
+ splitPoint.clear();
+ BSONObj key = _getExtremeKey( -1 );
+ if ( ! key.isEmpty() ) {
+ splitPoint.push_back( key );
+ }
+ }
+
+ // Normally, we'd have a sound split point here if the chunk is not empty. It's also a good place to
+ // sanity check.
+ if ( splitPoint.empty() || _min == splitPoint.front() || _max == splitPoint.front() ) {
+ log() << "want to split chunk, but can't find split point chunk " << toString()
+ << " got: " << ( splitPoint.empty() ? "<empty>" : splitPoint.front().toString() ) << endl;
+ return BSONObj();
+ }
+
+ if (multiSplit( splitPoint , res ))
+ return splitPoint.front();
+ else
+ return BSONObj();
+ }
+
+ bool Chunk::multiSplit( const vector<BSONObj>& m , BSONObj& res ) const {
+ const size_t maxSplitPoints = 8192;
+
+ uassert( 10165 , "can't split as shard doesn't have a manager" , _manager );
+ uassert( 13332 , "need a split key to split chunk" , !m.empty() );
+ uassert( 13333 , "can't split a chunk in that many parts", m.size() < maxSplitPoints );
+ uassert( 13003 , "can't split a chunk with only one distinct value" , _min.woCompare(_max) );
+
+ ScopedDbConnection conn( getShard().getConnString() );
+
+ BSONObjBuilder cmd;
+ cmd.append( "splitChunk" , _manager->getns() );
+ cmd.append( "keyPattern" , _manager->getShardKey().key() );
+ cmd.append( "min" , getMin() );
+ cmd.append( "max" , getMax() );
+ cmd.append( "from" , getShard().getConnString() );
+ cmd.append( "splitKeys" , m );
+ cmd.append( "shardId" , genID() );
+ cmd.append( "configdb" , configServer.modelServer() );
+ BSONObj cmdObj = cmd.obj();
+
+ if ( ! conn->runCommand( "admin" , cmdObj , res )) {
+ warning() << "splitChunk failed - cmd: " << cmdObj << " result: " << res << endl;
+ conn.done();
+
+ // reloading won't strictly solve all problems, e.g. the collection's metadata lock can be taken
+ // but we issue here so that mongos may refresh without needing to be written/read against
+ _manager->reload();
+
+ return false;
+ }
+
+ conn.done();
+
+ // force reload of config
+ _manager->reload();
+
+ return true;
+ }
+
+ bool Chunk::moveAndCommit( const Shard& to , long long chunkSize /* bytes */, BSONObj& res ) const {
+ uassert( 10167 , "can't move shard to its current location!" , getShard() != to );
+
+ log() << "moving chunk ns: " << _manager->getns() << " moving ( " << toString() << ") " << _shard.toString() << " -> " << to.toString() << endl;
+
+ Shard from = _shard;
+
+ ScopedDbConnection fromconn( from);
+
+ bool worked = fromconn->runCommand( "admin" ,
+ BSON( "moveChunk" << _manager->getns() <<
+ "from" << from.getConnString() <<
+ "to" << to.getConnString() <<
+ "min" << _min <<
+ "max" << _max <<
+ "maxChunkSizeBytes" << chunkSize <<
+ "shardId" << genID() <<
+ "configdb" << configServer.modelServer()
+ ) ,
+ res
+ );
+
+ fromconn.done();
+
+ log( worked ) << "moveChunk result: " << res << endl;
+
+ // if succeeded, needs to reload to pick up the new location
+ // if failed, mongos may be stale
+ // reload is excessive here as the failure could be simply because collection metadata is taken
+ _manager->reload();
+
+ return worked;
+ }
+
+ bool Chunk::splitIfShould( long dataWritten ) const {
+ LastError::Disabled d( lastError.get() );
+
+ try {
+ _dataWritten += dataWritten;
+ int splitThreshold = getManager()->getCurrentDesiredChunkSize();
+ if ( minIsInf() || maxIsInf() ) {
+ splitThreshold = (int) ((double)splitThreshold * .9);
+ }
+
+ if ( _dataWritten < splitThreshold / 5 )
+ return false;
+
+ if ( ! getManager()->_splitTickets.tryAcquire() ) {
+ LOG(1) << "won't auto split becaue not enough tickets: " << getManager()->getns() << endl;
+ return false;
+ }
+ TicketHolderReleaser releaser( &getManager()->_splitTickets );
+
+ // this is a bit ugly
+ // we need it so that mongos blocks for the writes to actually be committed
+ // this does mean mongos has more back pressure than mongod alone
+ // since it nots 100% tcp queue bound
+ // this was implicit before since we did a splitVector on the same socket
+ ShardConnection::sync();
+
+ LOG(1) << "about to initiate autosplit: " << *this << " dataWritten: " << _dataWritten << " splitThreshold: " << splitThreshold << endl;
+
+ BSONObj res;
+ BSONObj splitPoint = singleSplit( false /* does not force a split if not enough data */ , res );
+ if ( splitPoint.isEmpty() ) {
+ // singleSplit would have issued a message if we got here
+ _dataWritten = 0; // this means there wasn't enough data to split, so don't want to try again until considerable more data
+ return false;
+ }
+
+ if ( maxIsInf() || minIsInf() ) {
+ // we don't want to reset _dataWritten since we kind of want to check the other side right away
+ }
+ else {
+ _dataWritten = 0; // we're splitting, so should wait a bit
+ }
+
+
+
+ log() << "autosplitted " << _manager->getns() << " shard: " << toString()
+ << " on: " << splitPoint << " (splitThreshold " << splitThreshold << ")"
+#ifdef _DEBUG
+ << " size: " << getPhysicalSize() // slow - but can be useful when debugging
+#endif
+ << ( res["shouldMigrate"].eoo() ? "" : " (migrate suggested)" ) << endl;
+
+ BSONElement shouldMigrate = res["shouldMigrate"]; // not in mongod < 1.9.1 but that is ok
+ if (!shouldMigrate.eoo() && grid.shouldBalance()){
+ BSONObj range = shouldMigrate.embeddedObject();
+ BSONObj min = range["min"].embeddedObject();
+ BSONObj max = range["max"].embeddedObject();
+
+ Shard newLocation = Shard::pick( getShard() );
+ if ( getShard() == newLocation ) {
+ // if this is the best shard, then we shouldn't do anything (Shard::pick already logged our shard).
+ LOG(1) << "recently split chunk: " << range << " already in the best shard: " << getShard() << endl;
+ return true; // we did split even if we didn't migrate
+ }
+
+ ChunkManagerPtr cm = _manager->reload(false/*just reloaded in mulitsplit*/);
+ ChunkPtr toMove = cm->findChunk(min);
+
+ if ( ! (toMove->getMin() == min && toMove->getMax() == max) ){
+ LOG(1) << "recently split chunk: " << range << " modified before we could migrate " << toMove << endl;
+ return true;
+ }
+
+ log() << "moving chunk (auto): " << toMove << " to: " << newLocation.toString() << endl;
+
+ BSONObj res;
+ massert( 10412 ,
+ str::stream() << "moveAndCommit failed: " << res ,
+ toMove->moveAndCommit( newLocation , MaxChunkSize , res ) );
+
+ // update our config
+ _manager->reload();
+ }
+
+ return true;
+
+ }
+ catch ( std::exception& e ) {
+ // if the collection lock is taken (e.g. we're migrating), it is fine for the split to fail.
+ warning() << "could have autosplit on collection: " << _manager->getns() << " but: " << e.what() << endl;
+ return false;
+ }
+ }
+
+ long Chunk::getPhysicalSize() const {
+ ScopedDbConnection conn( getShard().getConnString() );
+
+ BSONObj result;
+ uassert( 10169 , "datasize failed!" , conn->runCommand( "admin" ,
+ BSON( "datasize" << _manager->getns()
+ << "keyPattern" << _manager->getShardKey().key()
+ << "min" << getMin()
+ << "max" << getMax()
+ << "maxSize" << ( MaxChunkSize + 1 )
+ << "estimate" << true
+ ) , result ) );
+
+ conn.done();
+ return (long)result["size"].number();
+ }
+
+ void Chunk::appendShortVersion( const char * name , BSONObjBuilder& b ) const {
+ BSONObjBuilder bb( b.subobjStart( name ) );
+ bb.append( "min" , _min );
+ bb.append( "max" , _max );
+ bb.done();
+ }
+
+ bool Chunk::operator==( const Chunk& s ) const {
+ return
+ _manager->getShardKey().compare( _min , s._min ) == 0 &&
+ _manager->getShardKey().compare( _max , s._max ) == 0
+ ;
+ }
+
+ void Chunk::serialize(BSONObjBuilder& to,ShardChunkVersion myLastMod) {
+
+ to.append( "_id" , genID( _manager->getns() , _min ) );
+
+ if ( myLastMod.isSet() ) {
+ to.appendTimestamp( "lastmod" , myLastMod );
+ }
+ else if ( _lastmod.isSet() ) {
+ assert( _lastmod > 0 && _lastmod < 1000 );
+ to.appendTimestamp( "lastmod" , _lastmod );
+ }
+ else {
+ assert(0);
+ }
+
+ to << "ns" << _manager->getns();
+ to << "min" << _min;
+ to << "max" << _max;
+ to << "shard" << _shard.getName();
+ }
+
+ string Chunk::genID( const string& ns , const BSONObj& o ) {
+ StringBuilder buf( ns.size() + o.objsize() + 16 );
+ buf << ns << "-";
+
+ BSONObjIterator i(o);
+ while ( i.more() ) {
+ BSONElement e = i.next();
+ buf << e.fieldName() << "_" << e.toString(false, true);
+ }
+
+ return buf.str();
+ }
+
+ string Chunk::toString() const {
+ stringstream ss;
+ ss << "ns:" << _manager->getns() << " at: " << _shard.toString() << " lastmod: " << _lastmod.toString() << " min: " << _min << " max: " << _max;
+ return ss.str();
+ }
+
+ ShardKeyPattern Chunk::skey() const {
+ return _manager->getShardKey();
+ }
+
+ void Chunk::markAsJumbo() const {
+ // set this first
+ // even if we can't set it in the db
+ // at least this mongos won't try and keep moving
+ _jumbo = true;
+
+ try {
+ ScopedDbConnection conn( configServer.modelServer() );
+ conn->update( chunkMetadataNS , BSON( "_id" << genID() ) , BSON( "$set" << BSON( "jumbo" << true ) ) );
+ conn.done();
+ }
+ catch ( std::exception& ) {
+ warning() << "couldn't set jumbo for chunk: " << genID() << endl;
+ }
+ }
+
+ void Chunk::refreshChunkSize() {
+ BSONObj o = grid.getConfigSetting("chunksize");
+
+ if ( o.isEmpty() ) {
+ return;
+ }
+
+ int csize = o["value"].numberInt();
+
+ // validate chunksize before proceeding
+ if ( csize == 0 ) {
+ // setting was not modified; mark as such
+ log() << "warning: invalid chunksize (" << csize << ") ignored" << endl;
+ return;
+ }
+
+ LOG(1) << "Refreshing MaxChunkSize: " << csize << endl;
+ Chunk::MaxChunkSize = csize * 1024 * 1024;
+ }
+
+ // ------- ChunkManager --------
+
+ AtomicUInt ChunkManager::NextSequenceNumber = 1;
+
+ ChunkManager::ChunkManager( string ns , ShardKeyPattern pattern , bool unique ) :
+ _ns( ns ) , _key( pattern ) , _unique( unique ) , _chunkRanges(), _mutex("ChunkManager"),
+ _nsLock( ConnectionString( configServer.modelServer() , ConnectionString::SYNC ) , ns ),
+
+ // The shard versioning mechanism hinges on keeping track of the number of times we reloaded ChunkManager's.
+ // Increasing this number here will prompt checkShardVersion() to refresh the connection-level versions to
+ // the most up to date value.
+ _sequenceNumber(++NextSequenceNumber),
+
+ _splitTickets( 5 )
+
+ {
+ int tries = 3;
+ while (tries--) {
+ ChunkMap chunkMap;
+ set<Shard> shards;
+ ShardVersionMap shardVersions;
+ Timer t;
+ _load(chunkMap, shards, shardVersions);
+ {
+ int ms = t.millis();
+ log() << "ChunkManager: time to load chunks for " << ns << ": " << ms << "ms"
+ << " sequenceNumber: " << _sequenceNumber
+ << " version: " << _version.toString()
+ << endl;
+ }
+
+ if (_isValid(chunkMap)) {
+ // These variables are const for thread-safety. Since the
+ // constructor can only be called from one thread, we don't have
+ // to worry about that here.
+ const_cast<ChunkMap&>(_chunkMap).swap(chunkMap);
+ const_cast<set<Shard>&>(_shards).swap(shards);
+ const_cast<ShardVersionMap&>(_shardVersions).swap(shardVersions);
+ const_cast<ChunkRangeManager&>(_chunkRanges).reloadAll(_chunkMap);
+ return;
+ }
+
+ if (_chunkMap.size() < 10) {
+ _printChunks();
+ }
+
+ warning() << "ChunkManager loaded an invalid config, trying again" << endl;
+
+ sleepmillis(10 * (3-tries));
+ }
+
+ // this will abort construction so we should never have a reference to an invalid config
+ msgasserted(13282, "Couldn't load a valid config for " + _ns + " after 3 attempts. Please try again.");
+ }
+
+ ChunkManagerPtr ChunkManager::reload(bool force) const {
+ return grid.getDBConfig(getns())->getChunkManager(getns(), force);
+ }
+
+ void ChunkManager::_load(ChunkMap& chunkMap, set<Shard>& shards, ShardVersionMap& shardVersions) {
+ ScopedDbConnection conn( configServer.modelServer() );
+
+ // TODO really need the sort?
+ auto_ptr<DBClientCursor> cursor = conn->query( Chunk::chunkMetadataNS, QUERY("ns" << _ns).sort("lastmod",-1), 0, 0, 0, 0,
+ (DEBUG_BUILD ? 2 : 1000000)); // batch size. Try to induce potential race conditions in debug builds
+ assert( cursor.get() );
+ while ( cursor->more() ) {
+ BSONObj d = cursor->next();
+ if ( d["isMaxMarker"].trueValue() ) {
+ continue;
+ }
+
+ ChunkPtr c( new Chunk( this, d ) );
+
+ chunkMap[c->getMax()] = c;
+ shards.insert(c->getShard());
+
+
+ // set global max
+ if ( c->getLastmod() > _version )
+ _version = c->getLastmod();
+
+ // set shard max
+ ShardChunkVersion& shardMax = shardVersions[c->getShard()];
+ if ( c->getLastmod() > shardMax )
+ shardMax = c->getLastmod();
+ }
+ conn.done();
+ }
+
+ bool ChunkManager::_isValid(const ChunkMap& chunkMap) {
+#define ENSURE(x) do { if(!(x)) { log() << "ChunkManager::_isValid failed: " #x << endl; return false; } } while(0)
+
+ if (chunkMap.empty())
+ return true;
+
+ // Check endpoints
+ ENSURE(allOfType(MinKey, chunkMap.begin()->second->getMin()));
+ ENSURE(allOfType(MaxKey, boost::prior(chunkMap.end())->second->getMax()));
+
+ // Make sure there are no gaps or overlaps
+ for (ChunkMap::const_iterator it=boost::next(chunkMap.begin()), end=chunkMap.end(); it != end; ++it) {
+ ChunkMap::const_iterator last = boost::prior(it);
+
+ if (!(it->second->getMin() == last->second->getMax())) {
+ PRINT(it->second->toString());
+ PRINT(it->second->getMin());
+ PRINT(last->second->getMax());
+ }
+ ENSURE(it->second->getMin() == last->second->getMax());
+ }
+
+ return true;
+
+#undef ENSURE
+ }
+
+ void ChunkManager::_printChunks() const {
+ for (ChunkMap::const_iterator it=_chunkMap.begin(), end=_chunkMap.end(); it != end; ++it) {
+ log() << *it->second << endl;
+ }
+ }
+
+ bool ChunkManager::hasShardKey( const BSONObj& obj ) const {
+ return _key.hasShardKey( obj );
+ }
+
+ void ChunkManager::createFirstChunks( const Shard& primary , vector<BSONObj>* initPoints , vector<Shard>* initShards ) const {
+ // TODO distlock?
+ assert( _chunkMap.size() == 0 );
+
+ vector<BSONObj> splitPoints;
+ vector<Shard> shards;
+ unsigned long long numObjects = 0;
+ Chunk c(this, _key.globalMin(), _key.globalMax(), primary);
+
+ if ( !initPoints || !initPoints->size() ) {
+ // discover split points
+ {
+ // get stats to see if there is any data
+ ScopedDbConnection shardConn( primary.getConnString() );
+ numObjects = shardConn->count( getns() );
+ shardConn.done();
+ }
+
+ if ( numObjects > 0 )
+ c.pickSplitVector( splitPoints , Chunk::MaxChunkSize );
+
+ // since docs alread exists, must use primary shard
+ shards.push_back( primary );
+ } else {
+ // make sure points are unique and ordered
+ set<BSONObj> orderedPts;
+ for ( unsigned i = 0; i < initPoints->size(); ++i ) {
+ BSONObj pt = (*initPoints)[i];
+ orderedPts.insert( pt );
+ }
+ for ( set<BSONObj>::iterator it = orderedPts.begin(); it != orderedPts.end(); ++it ) {
+ splitPoints.push_back( *it );
+ }
+
+ if ( !initShards || !initShards->size() ) {
+ // use all shards, starting with primary
+ shards.push_back( primary );
+ vector<Shard> tmp;
+ primary.getAllShards( tmp );
+ for ( unsigned i = 0; i < tmp.size(); ++i ) {
+ if ( tmp[i] != primary )
+ shards.push_back( tmp[i] );
+ }
+ }
+ }
+
+ // this is the first chunk; start the versioning from scratch
+ ShardChunkVersion version;
+ version.incMajor();
+
+ log() << "going to create " << splitPoints.size() + 1 << " chunk(s) for: " << _ns << endl;
+
+ ScopedDbConnection conn( configServer.modelServer() );
+
+ for ( unsigned i=0; i<=splitPoints.size(); i++ ) {
+ BSONObj min = i == 0 ? _key.globalMin() : splitPoints[i-1];
+ BSONObj max = i < splitPoints.size() ? splitPoints[i] : _key.globalMax();
+
+ Chunk temp( this , min , max , shards[ i % shards.size() ] );
+
+ BSONObjBuilder chunkBuilder;
+ temp.serialize( chunkBuilder , version );
+ BSONObj chunkObj = chunkBuilder.obj();
+
+ conn->update( Chunk::chunkMetadataNS, QUERY( "_id" << temp.genID() ), chunkObj, true, false );
+
+ version.incMinor();
+ }
+
+ string errmsg = conn->getLastError();
+ if ( errmsg.size() ) {
+ string ss = str::stream() << "creating first chunks failed. result: " << errmsg;
+ error() << ss << endl;
+ msgasserted( 15903 , ss );
+ }
+
+ conn.done();
+
+ if ( numObjects == 0 ) {
+ // the ensure index will have the (desired) indirect effect of creating the collection on the
+ // assigned shard, as it sets up the index over the sharding keys.
+ ScopedDbConnection shardConn( c.getShard().getConnString() );
+ shardConn->ensureIndex( getns() , getShardKey().key() , _unique , "" , false ); // do not cache ensureIndex SERVER-1691
+ shardConn.done();
+ }
+
+ }
+
+ ChunkPtr ChunkManager::findChunk( const BSONObj & obj ) const {
+ BSONObj key = _key.extractKey(obj);
+
+ {
+ BSONObj foo;
+ ChunkPtr c;
+ {
+ ChunkMap::const_iterator it = _chunkMap.upper_bound(key);
+ if (it != _chunkMap.end()) {
+ foo = it->first;
+ c = it->second;
+ }
+ }
+
+ if ( c ) {
+ if ( c->contains( key ) ){
+ dassert(c->contains(key)); // doesn't use fast-path in extractKey
+ return c;
+ }
+
+ PRINT(foo);
+ PRINT(*c);
+ PRINT(key);
+
+ reload();
+ massert(13141, "Chunk map pointed to incorrect chunk", false);
+ }
+ }
+
+ throw UserException( 8070 , str::stream() << "couldn't find a chunk which should be impossible: " << key );
+ }
+
+ ChunkPtr ChunkManager::findChunkOnServer( const Shard& shard ) const {
+ for ( ChunkMap::const_iterator i=_chunkMap.begin(); i!=_chunkMap.end(); ++i ) {
+ ChunkPtr c = i->second;
+ if ( c->getShard() == shard )
+ return c;
+ }
+
+ return ChunkPtr();
+ }
+
+ void ChunkManager::getShardsForQuery( set<Shard>& shards , const BSONObj& query ) const {
+ //TODO look into FieldRangeSetOr
+ OrRangeGenerator org(_ns.c_str(), query, false);
+
+ const string special = org.getSpecial();
+ if (special == "2d") {
+ BSONForEach(field, query) {
+ if (getGtLtOp(field) == BSONObj::opNEAR) {
+ uassert(13501, "use geoNear command rather than $near query", false);
+ // TODO: convert to geoNear rather than erroring out
+ }
+ // $within queries are fine
+ }
+ }
+ else if (!special.empty()) {
+ uassert(13502, "unrecognized special query type: " + special, false);
+ }
+
+ do {
+ boost::scoped_ptr<FieldRangeSetPair> frsp (org.topFrsp());
+ {
+ // special case if most-significant field isn't in query
+ FieldRange range = frsp->singleKeyRange(_key.key().firstElementFieldName());
+ if ( !range.nontrivial() ) {
+ DEV PRINT(range.nontrivial());
+ getShardsForRange( shards, _key.globalMin(), _key.globalMax() );
+ return;
+ }
+ }
+
+ BoundList ranges = frsp->singleKeyIndexBounds(_key.key(), 1);
+ for (BoundList::const_iterator it=ranges.begin(), end=ranges.end(); it != end; ++it) {
+
+ BSONObj minObj = it->first.replaceFieldNames(_key.key());
+ BSONObj maxObj = it->second.replaceFieldNames(_key.key());
+
+ getShardsForRange( shards, minObj, maxObj, false );
+
+ // once we know we need to visit all shards no need to keep looping
+ if( shards.size() == _shards.size() ) return;
+ }
+
+ if (org.moreOrClauses())
+ org.popOrClauseSingleKey();
+
+ }
+ while (org.moreOrClauses());
+ }
+
+ void ChunkManager::getShardsForRange(set<Shard>& shards, const BSONObj& min, const BSONObj& max, bool fullKeyReq ) const {
+
+ if( fullKeyReq ){
+ uassert(13405, str::stream() << "min value " << min << " does not have shard key", hasShardKey(min));
+ uassert(13406, str::stream() << "max value " << max << " does not have shard key", hasShardKey(max));
+ }
+
+ ChunkRangeMap::const_iterator it = _chunkRanges.upper_bound(min);
+ ChunkRangeMap::const_iterator end = _chunkRanges.upper_bound(max);
+
+ massert( 13507 , str::stream() << "no chunks found between bounds " << min << " and " << max , it != _chunkRanges.ranges().end() );
+
+ if( end != _chunkRanges.ranges().end() ) ++end;
+
+ for( ; it != end; ++it ){
+ shards.insert(it->second->getShard());
+
+ // once we know we need to visit all shards no need to keep looping
+ if (shards.size() == _shards.size()) break;
+ }
+ }
+
+ void ChunkManager::getAllShards( set<Shard>& all ) const {
+ all.insert(_shards.begin(), _shards.end());
+ }
+
+ bool ChunkManager::compatibleWith( const ChunkManager& other, const Shard& shard ) const {
+ // TODO: Make this much smarter - currently returns true only if we're the same chunk manager
+ return getns() == other.getns() && getSequenceNumber() == other.getSequenceNumber();
+ }
+
+ void ChunkManager::drop( ChunkManagerPtr me ) const {
+ scoped_lock lk( _mutex );
+
+ configServer.logChange( "dropCollection.start" , _ns , BSONObj() );
+
+ dist_lock_try dlk;
+ try{
+ dlk = dist_lock_try( &_nsLock , "drop" );
+ }
+ catch( LockException& e ){
+ uassert( 14022, str::stream() << "Error locking distributed lock for chunk drop." << causedBy( e ), false);
+ }
+
+ uassert( 13331 , "collection's metadata is undergoing changes. Please try again." , dlk.got() );
+
+ uassert( 10174 , "config servers not all up" , configServer.allUp() );
+
+ set<Shard> seen;
+
+ LOG(1) << "ChunkManager::drop : " << _ns << endl;
+
+ // lock all shards so no one can do a split/migrate
+ for ( ChunkMap::const_iterator i=_chunkMap.begin(); i!=_chunkMap.end(); ++i ) {
+ ChunkPtr c = i->second;
+ seen.insert( c->getShard() );
+ }
+
+ LOG(1) << "ChunkManager::drop : " << _ns << "\t all locked" << endl;
+
+ // delete data from mongod
+ for ( set<Shard>::iterator i=seen.begin(); i!=seen.end(); i++ ) {
+ ScopedDbConnection conn( *i );
+ conn->dropCollection( _ns );
+ conn.done();
+ }
+
+ LOG(1) << "ChunkManager::drop : " << _ns << "\t removed shard data" << endl;
+
+ // remove chunk data
+ ScopedDbConnection conn( configServer.modelServer() );
+ conn->remove( Chunk::chunkMetadataNS , BSON( "ns" << _ns ) );
+ conn.done();
+ LOG(1) << "ChunkManager::drop : " << _ns << "\t removed chunk data" << endl;
+
+ for ( set<Shard>::iterator i=seen.begin(); i!=seen.end(); i++ ) {
+ ScopedDbConnection conn( *i );
+ BSONObj res;
+
+ // this is horrible
+ // we need a special command for dropping on the d side
+ // this hack works for the moment
+
+ if ( ! setShardVersion( conn.conn() , _ns , 0 , true , res ) )
+ throw UserException( 8071 , str::stream() << "cleaning up after drop failed: " << res );
+ conn->simpleCommand( "admin", 0, "unsetSharding" );
+ conn.done();
+ }
+
+ LOG(1) << "ChunkManager::drop : " << _ns << "\t DONE" << endl;
+ configServer.logChange( "dropCollection" , _ns , BSONObj() );
+ }
+
+ ShardChunkVersion ChunkManager::getVersion( const Shard& shard ) const {
+ ShardVersionMap::const_iterator i = _shardVersions.find( shard );
+ if ( i == _shardVersions.end() )
+ return 0;
+ return i->second;
+ }
+
+ ShardChunkVersion ChunkManager::getVersion() const {
+ return _version;
+ }
+
+ string ChunkManager::toString() const {
+ stringstream ss;
+ ss << "ChunkManager: " << _ns << " key:" << _key.toString() << '\n';
+ for ( ChunkMap::const_iterator i=_chunkMap.begin(); i!=_chunkMap.end(); ++i ) {
+ const ChunkPtr c = i->second;
+ ss << "\t" << c->toString() << '\n';
+ }
+ return ss.str();
+ }
+
+ void ChunkRangeManager::assertValid() const {
+ if (_ranges.empty())
+ return;
+
+ try {
+ // No Nulls
+ for (ChunkRangeMap::const_iterator it=_ranges.begin(), end=_ranges.end(); it != end; ++it) {
+ assert(it->second);
+ }
+
+ // Check endpoints
+ assert(allOfType(MinKey, _ranges.begin()->second->getMin()));
+ assert(allOfType(MaxKey, boost::prior(_ranges.end())->second->getMax()));
+
+ // Make sure there are no gaps or overlaps
+ for (ChunkRangeMap::const_iterator it=boost::next(_ranges.begin()), end=_ranges.end(); it != end; ++it) {
+ ChunkRangeMap::const_iterator last = boost::prior(it);
+ assert(it->second->getMin() == last->second->getMax());
+ }
+
+ // Check Map keys
+ for (ChunkRangeMap::const_iterator it=_ranges.begin(), end=_ranges.end(); it != end; ++it) {
+ assert(it->first == it->second->getMax());
+ }
+
+ // Make sure we match the original chunks
+ const ChunkMap chunks = _ranges.begin()->second->getManager()->_chunkMap;
+ for ( ChunkMap::const_iterator i=chunks.begin(); i!=chunks.end(); ++i ) {
+ const ChunkPtr chunk = i->second;
+
+ ChunkRangeMap::const_iterator min = _ranges.upper_bound(chunk->getMin());
+ ChunkRangeMap::const_iterator max = _ranges.lower_bound(chunk->getMax());
+
+ assert(min != _ranges.end());
+ assert(max != _ranges.end());
+ assert(min == max);
+ assert(min->second->getShard() == chunk->getShard());
+ assert(min->second->contains( chunk->getMin() ));
+ assert(min->second->contains( chunk->getMax() ) || (min->second->getMax() == chunk->getMax()));
+ }
+
+ }
+ catch (...) {
+ log( LL_ERROR ) << "\t invalid ChunkRangeMap! printing ranges:" << endl;
+
+ for (ChunkRangeMap::const_iterator it=_ranges.begin(), end=_ranges.end(); it != end; ++it)
+ cout << it->first << ": " << *it->second << endl;
+
+ throw;
+ }
+ }
+
+ void ChunkRangeManager::reloadAll(const ChunkMap& chunks) {
+ _ranges.clear();
+ _insertRange(chunks.begin(), chunks.end());
+
+ DEV assertValid();
+ }
+
+ void ChunkRangeManager::_insertRange(ChunkMap::const_iterator begin, const ChunkMap::const_iterator end) {
+ while (begin != end) {
+ ChunkMap::const_iterator first = begin;
+ Shard shard = first->second->getShard();
+ while (begin != end && (begin->second->getShard() == shard))
+ ++begin;
+
+ shared_ptr<ChunkRange> cr (new ChunkRange(first, begin));
+ _ranges[cr->getMax()] = cr;
+ }
+ }
+
+ int ChunkManager::getCurrentDesiredChunkSize() const {
+ // split faster in early chunks helps spread out an initial load better
+ const int minChunkSize = 1 << 20; // 1 MBytes
+
+ int splitThreshold = Chunk::MaxChunkSize;
+
+ int nc = numChunks();
+
+ if ( nc <= 1 ) {
+ return 1024;
+ }
+ else if ( nc < 3 ) {
+ return minChunkSize / 2;
+ }
+ else if ( nc < 10 ) {
+ splitThreshold = max( splitThreshold / 4 , minChunkSize );
+ }
+ else if ( nc < 20 ) {
+ splitThreshold = max( splitThreshold / 2 , minChunkSize );
+ }
+
+ return splitThreshold;
+ }
+
+ class ChunkObjUnitTest : public UnitTest {
+ public:
+ void runShardChunkVersion() {
+ vector<ShardChunkVersion> all;
+ all.push_back( ShardChunkVersion(1,1) );
+ all.push_back( ShardChunkVersion(1,2) );
+ all.push_back( ShardChunkVersion(2,1) );
+ all.push_back( ShardChunkVersion(2,2) );
+
+ for ( unsigned i=0; i<all.size(); i++ ) {
+ for ( unsigned j=i+1; j<all.size(); j++ ) {
+ assert( all[i] < all[j] );
+ }
+ }
+
+ }
+
+ void run() {
+ runShardChunkVersion();
+ LOG(1) << "shardObjTest passed" << endl;
+ }
+ } shardObjTest;
+
+
+ // ----- to be removed ---
+ extern OID serverID;
+
+ // NOTE (careful when deprecating)
+ // currently the sharding is enabled because of a write or read (as opposed to a split or migrate), the shard learns
+ // its name and through the 'setShardVersion' command call
+ bool setShardVersion( DBClientBase & conn , const string& ns , ShardChunkVersion version , bool authoritative , BSONObj& result ) {
+ BSONObjBuilder cmdBuilder;
+ cmdBuilder.append( "setShardVersion" , ns.c_str() );
+ cmdBuilder.append( "configdb" , configServer.modelServer() );
+ cmdBuilder.appendTimestamp( "version" , version.toLong() );
+ cmdBuilder.appendOID( "serverID" , &serverID );
+ if ( authoritative )
+ cmdBuilder.appendBool( "authoritative" , 1 );
+
+ Shard s = Shard::make( conn.getServerAddress() );
+ cmdBuilder.append( "shard" , s.getName() );
+ cmdBuilder.append( "shardHost" , s.getConnString() );
+ BSONObj cmd = cmdBuilder.obj();
+
+ LOG(1) << " setShardVersion " << s.getName() << " " << conn.getServerAddress() << " " << ns << " " << cmd << " " << &conn << endl;
+
+ return conn.runCommand( "admin" , cmd , result );
+ }
+
+} // namespace mongo