/** * Copyright (C) 2008-2015 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding #include "mongo/platform/basic.h" #include "mongo/s/chunk.h" #include #include "mongo/base/owned_pointer_map.h" #include "mongo/client/connpool.h" #include "mongo/client/dbclientcursor.h" #include "mongo/config.h" #include "mongo/db/lasterror.h" #include "mongo/db/query/query_solution.h" #include "mongo/db/write_concern.h" #include "mongo/db/write_concern_options.h" #include "mongo/platform/random.h" #include "mongo/s/balancer_policy.h" #include "mongo/s/catalog/catalog_manager.h" #include "mongo/s/catalog/type_collection.h" #include "mongo/s/catalog/type_settings.h" #include "mongo/s/chunk_manager.h" #include "mongo/s/config.h" #include "mongo/s/config_server_checker_service.h" #include "mongo/s/cursors.h" #include "mongo/s/grid.h" #include "mongo/util/concurrency/ticketholder.h" #include "mongo/util/log.h" namespace mongo { using boost::shared_ptr; using std::auto_ptr; using std::map; using std::ostringstream; using std::set; using std::string; using std::stringstream; using std::vector; namespace { const int kTooManySplitPoints = 4; /** * Attempts to move the given chunk to another shard. * * Returns true if the chunk was actually moved. */ bool tryMoveToOtherShard(const ChunkManager& manager, const ChunkType& chunk) { // reload sharding metadata before starting migration ChunkManagerPtr chunkMgr = manager.reload(false /* just reloaded in mulitsplit */); ShardInfoMap shardInfo; Status loadStatus = DistributionStatus::populateShardInfoMap(&shardInfo); if (!loadStatus.isOK()) { warning() << "failed to load shard metadata while trying to moveChunk after " << "auto-splitting" << causedBy(loadStatus); return false; } if (shardInfo.size() < 2) { LOG(0) << "no need to move top chunk since there's only 1 shard"; return false; } OwnedPointerMap > shardToChunkMap; DistributionStatus::populateShardToChunksMap(shardInfo, *chunkMgr, &shardToChunkMap.mutableMap()); DistributionStatus chunkDistribution(shardInfo, shardToChunkMap.map()); const string configServerStr = configServer.getConnectionString().toString(); StatusWith tagStatus = DistributionStatus::getTagForSingleChunk(configServerStr, manager.getns(), chunk); if (!tagStatus.isOK()) { warning() << "Not auto-moving chunk because of an error encountered while " << "checking tag for chunk: " << tagStatus.getStatus(); return false; } const string newLocation( chunkDistribution.getBestReceieverShard(tagStatus.getValue())); if (newLocation.empty()) { LOG(1) << "recently split chunk: " << chunk << " but no suitable shard to move to"; return false; } if (chunk.getShard() == newLocation) { // if this is the best shard, then we shouldn't do anything. LOG(1) << "recently split chunk: " << chunk << " already in the best shard"; return false; } ChunkPtr toMove = chunkMgr->findIntersectingChunk(chunk.getMin()); if (!(toMove->getMin() == chunk.getMin() && toMove->getMax() == chunk.getMax())) { LOG(1) << "recently split chunk: " << chunk << " modified before we could migrate " << toMove->toString(); return false; } log() << "moving chunk (auto): " << toMove->toString() << " to: " << newLocation; BSONObj res; WriteConcernOptions noThrottle; if (!toMove->moveAndCommit(newLocation, Chunk::MaxChunkSize, &noThrottle, /* secondaryThrottle */ false, /* waitForDelete - small chunk, no need */ 0, /* maxTimeMS - don't time out */ res)) { msgassertedNoTrace(10412, str::stream() << "moveAndCommit failed: " << res); } // update our config manager.reload(); return true; } } // namespace long long Chunk::MaxChunkSize = 1024 * 1024 * 64; int Chunk::MaxObjectPerChunk = 250000; // Can be overridden from command line bool Chunk::ShouldAutoSplit = true; Chunk::Chunk(const ChunkManager * manager, BSONObj from) : _manager(manager), _lastmod(0, 0, OID()), _dataWritten(mkDataWritten()) { string ns = from.getStringField(ChunkType::ns().c_str()); _shard.reset(from.getStringField(ChunkType::shard().c_str())); _lastmod = ChunkVersion::fromBSON(from[ChunkType::DEPRECATED_lastmod()]); verify( _lastmod.isSet() ); _min = from.getObjectField(ChunkType::min().c_str()).getOwned(); _max = from.getObjectField(ChunkType::max().c_str()).getOwned(); _jumbo = from[ChunkType::jumbo()].trueValue(); uassert( 10170 , "Chunk needs a ns" , ! ns.empty() ); uassert( 13327 , "Chunk ns must match server ns" , ns == _manager->getns() ); uassert( 10171 , "Chunk needs a server" , _shard.ok() ); uassert( 10172 , "Chunk needs a min" , ! _min.isEmpty() ); uassert( 10173 , "Chunk needs a max" , ! _max.isEmpty() ); } Chunk::Chunk(const ChunkManager * info , const BSONObj& min, const BSONObj& max, const Shard& shard, ChunkVersion lastmod) : _manager(info), _min(min), _max(max), _shard(shard), _lastmod(lastmod), _jumbo(false), _dataWritten(mkDataWritten()) {} int Chunk::mkDataWritten() { PseudoRandom r(static_cast(time(0))); return r.nextInt32( MaxChunkSize / ChunkManager::SplitHeuristics::splitTestFactor ); } string Chunk::getns() const { verify( _manager ); return _manager->getns(); } bool Chunk::containsKey( const BSONObj& shardKey ) const { return getMin().woCompare( shardKey ) <= 0 && shardKey.woCompare( getMax() ) < 0; } bool ChunkRange::containsKey( const BSONObj& shardKey ) const { // same as Chunk method return getMin().woCompare( shardKey ) <= 0 && shardKey.woCompare( getMax() ) < 0; } bool Chunk::_minIsInf() const { return 0 == _manager->getShardKeyPattern().getKeyPattern().globalMin().woCompare(getMin()); } bool Chunk::_maxIsInf() const { return 0 == _manager->getShardKeyPattern().getKeyPattern().globalMax().woCompare(getMax()); } BSONObj Chunk::_getExtremeKey(bool doSplitAtLower) const { Query q; if (doSplitAtLower) { q.sort( _manager->getShardKeyPattern().toBSON() ); } else { // need to invert shard key pattern to sort backwards // TODO: make a helper in ShardKeyPattern? BSONObj k = _manager->getShardKeyPattern().toBSON(); BSONObjBuilder r; BSONObjIterator i(k); while( i.more() ) { BSONElement e = i.next(); uassert( 10163 , "can only handle numbers here - which i think is correct" , e.isNumber() ); r.append( e.fieldName() , -1 * e.number() ); } q.sort( r.obj() ); } // find the extreme key ScopedDbConnection conn(getShard().getConnString()); BSONObj end; if (doSplitAtLower) { // Splitting close to the lower bound means that the split point will be the // upper bound. Chunk range upper bounds are exclusive so skip a document to // make the lower half of the split end up with a single document. auto_ptr cursor = conn->query(_manager->getns(), q, 1, /* nToReturn */ 1 /* nToSkip */); if (cursor->more()) { end = cursor->next().getOwned(); } } else { end = conn->findOne(_manager->getns(), q); } conn.done(); if ( end.isEmpty() ) return BSONObj(); return _manager->getShardKeyPattern().extractShardKeyFromDoc(end); } void Chunk::pickMedianKey( BSONObj& medianKey ) const { // Ask the mongod holding this chunk to figure out the split points. ScopedDbConnection conn(getShard().getConnString()); BSONObj result; BSONObjBuilder cmd; cmd.append( "splitVector" , _manager->getns() ); cmd.append( "keyPattern" , _manager->getShardKeyPattern().toBSON() ); cmd.append( "min" , getMin() ); cmd.append( "max" , getMax() ); cmd.appendBool( "force" , true ); BSONObj cmdObj = cmd.obj(); if ( ! conn->runCommand( "admin" , cmdObj , result )) { conn.done(); ostringstream os; os << "splitVector command (median key) failed: " << result; uassert( 13503 , os.str() , 0 ); } BSONObjIterator it( result.getObjectField( "splitKeys" ) ); if ( it.more() ) { medianKey = it.next().Obj().getOwned(); } conn.done(); } void Chunk::pickSplitVector(vector& splitPoints, long long chunkSize /* bytes */, int maxPoints, int maxObjs) const { // Ask the mongod holding this chunk to figure out the split points. ScopedDbConnection conn(getShard().getConnString()); BSONObj result; BSONObjBuilder cmd; cmd.append( "splitVector" , _manager->getns() ); cmd.append( "keyPattern" , _manager->getShardKeyPattern().toBSON() ); cmd.append( "min" , getMin() ); cmd.append( "max" , getMax() ); cmd.append( "maxChunkSizeBytes" , chunkSize ); cmd.append( "maxSplitPoints" , maxPoints ); cmd.append( "maxChunkObjects" , maxObjs ); BSONObj cmdObj = cmd.obj(); if ( ! conn->runCommand( "admin" , cmdObj , result )) { conn.done(); ostringstream os; os << "splitVector command failed: " << result; uassert( 13345 , os.str() , 0 ); } BSONObjIterator it( result.getObjectField( "splitKeys" ) ); while ( it.more() ) { splitPoints.push_back( it.next().Obj().getOwned() ); } conn.done(); } void Chunk::determineSplitPoints(bool atMedian, vector* splitPoints) const { // if splitting is not obligatory we may return early if there are not enough data // we cap the number of objects that would fall in the first half (before the split point) // the rationale is we'll find a split point without traversing all the data if ( atMedian ) { BSONObj medianKey; pickMedianKey( medianKey ); if ( ! medianKey.isEmpty() ) splitPoints->push_back( medianKey ); } else { long long chunkSize = _manager->getCurrentDesiredChunkSize(); // Note: One split point for every 1/2 chunk size. const int estNumSplitPoints = _dataWritten / chunkSize * 2; if (estNumSplitPoints >= kTooManySplitPoints) { // The current desired chunk size will split the chunk into lots of small chunks // (At the worst case, this can result into thousands of chunks); so check and // see if a bigger value can be used. chunkSize = std::min(_dataWritten, Chunk::MaxChunkSize); } pickSplitVector(*splitPoints, chunkSize, 0, MaxObjectPerChunk); if ( splitPoints->size() <= 1 ) { // no split points means there isn't enough data to split on // 1 split point means we have between half the chunk size to full chunk size // so we shouldn't split splitPoints->clear(); } } } Status Chunk::split(SplitPointMode mode, size_t* resultingSplits, BSONObj* res) const { size_t dummy; if (resultingSplits == NULL) { resultingSplits = &dummy; } bool atMedian = mode == Chunk::atMedian; vector splitPoints; determineSplitPoints( atMedian, &splitPoints ); if (splitPoints.empty()) { string msg; if (atMedian) { msg = "cannot find median in chunk, possibly empty"; } else { msg = "chunk not full enough to trigger auto-split"; } LOG(1) << msg; return Status(ErrorCodes::CannotSplit, msg); } // We assume that if the chunk being split is the first (or last) one on the collection, // this chunk is likely to see more insertions. Instead of splitting mid-chunk, we use // the very first (or last) key as a split point. // This heuristic is skipped for "special" shard key patterns that are not likely to // produce monotonically increasing or decreasing values (e.g. hashed shard keys). if (mode == Chunk::autoSplitInternal && KeyPattern::isOrderedKeyPattern(_manager->getShardKeyPattern().toBSON())) { if (_minIsInf()) { BSONObj key = _getExtremeKey(true); if (!key.isEmpty()) { splitPoints[0] = key.getOwned(); } } else if (_maxIsInf()) { BSONObj key = _getExtremeKey(false); if (!key.isEmpty()) { splitPoints.pop_back(); splitPoints.push_back(key); } } } // Normally, we'd have a sound split point here if the chunk is not empty. // It's also a good place to sanity check. if ( _min == splitPoints.front() ) { string msg(str::stream() << "not splitting chunk " << toString() << ", split point " << splitPoints.front() << " is exactly on chunk bounds"); log() << msg; return Status(ErrorCodes::CannotSplit, msg); } if ( _max == splitPoints.back() ) { string msg(str::stream() << "not splitting chunk " << toString() << ", split point " << splitPoints.back() << " is exactly on chunk bounds"); log() << msg; return Status(ErrorCodes::CannotSplit, msg); } Status status = multiSplit(splitPoints, res); *resultingSplits = splitPoints.size(); return status; } Status Chunk::multiSplit(const vector& m, BSONObj* res) const { const size_t maxSplitPoints = 8192; uassert( 10165 , "can't split as shard doesn't have a manager" , _manager ); uassert( 13332 , "need a split key to split chunk" , !m.empty() ); uassert( 13333 , "can't split a chunk in that many parts", m.size() < maxSplitPoints ); uassert( 13003 , "can't split a chunk with only one distinct value" , _min.woCompare(_max) ); ScopedDbConnection conn(getShard().getConnString()); BSONObjBuilder cmd; cmd.append( "splitChunk" , _manager->getns() ); cmd.append( "keyPattern" , _manager->getShardKeyPattern().toBSON() ); cmd.append( "min" , getMin() ); cmd.append( "max" , getMax() ); cmd.append( "from" , getShard().getName() ); cmd.append( "splitKeys" , m ); cmd.append( "configdb" , configServer.modelServer() ); cmd.append("epoch", _manager->getVersion().epoch()); BSONObj cmdObj = cmd.obj(); BSONObj dummy; if (res == NULL) { res = &dummy; } if (!conn->runCommand("admin", cmdObj, *res)) { string msg(str::stream() << "splitChunk failed - cmd: " << cmdObj << " result: " << *res); warning() << msg; conn.done(); return Status(ErrorCodes::SplitFailed, msg); } conn.done(); // force reload of config _manager->reload(); return Status::OK(); } bool Chunk::moveAndCommit(const Shard& to, long long chunkSize /* bytes */, const WriteConcernOptions* writeConcern, bool waitForDelete, int maxTimeMS, BSONObj& res) const { uassert( 10167 , "can't move shard to its current location!" , getShard() != to ); log() << "moving chunk ns: " << _manager->getns() << " moving ( " << toString() << ") " << _shard.toString() << " -> " << to.toString(); Shard from = _shard; ScopedDbConnection fromconn(from.getConnString()); BSONObjBuilder builder; builder.append("moveChunk", _manager->getns()); builder.append("from", from.getAddress().toString()); builder.append("to", to.getAddress().toString()); // NEEDED FOR 2.0 COMPATIBILITY builder.append("fromShard", from.getName()); builder.append("toShard", to.getName()); /////////////////////////////// builder.append("min", _min); builder.append("max", _max); builder.append("maxChunkSizeBytes", chunkSize); builder.append("configdb", configServer.modelServer()); // For legacy secondary throttle setting. bool secondaryThrottle = true; if (writeConcern && writeConcern->wNumNodes <= 1 && writeConcern->wMode.empty()) { secondaryThrottle = false; } builder.append("secondaryThrottle", secondaryThrottle); if (secondaryThrottle && writeConcern) { builder.append("writeConcern", writeConcern->toBSON()); } builder.append("waitForDelete", waitForDelete); builder.append(LiteParsedQuery::cmdOptionMaxTimeMS, maxTimeMS); builder.append("epoch", _manager->getVersion().epoch()); bool worked = fromconn->runCommand("admin", builder.done(), res); fromconn.done(); LOG( worked ? 1 : 0 ) << "moveChunk result: " << res; // if succeeded, needs to reload to pick up the new location // if failed, mongos may be stale // reload is excessive here as the failure could be simply because collection metadata is taken _manager->reload(); return worked; } bool Chunk::splitIfShould( long dataWritten ) const { dassert( ShouldAutoSplit ); LastError::Disabled d( lastError.get() ); try { _dataWritten += dataWritten; int splitThreshold = getManager()->getCurrentDesiredChunkSize(); if (_minIsInf() || _maxIsInf()) { splitThreshold = (int)((double)splitThreshold * .9); } if ( _dataWritten < splitThreshold / ChunkManager::SplitHeuristics::splitTestFactor ) return false; if ( ! getManager()->_splitHeuristics._splitTickets.tryAcquire() ) { LOG(1) << "won't auto split because not enough tickets: " << getManager()->getns(); return false; } TicketHolderReleaser releaser( &(getManager()->_splitHeuristics._splitTickets) ); // this is a bit ugly // we need it so that mongos blocks for the writes to actually be committed // this does mean mongos has more back pressure than mongod alone // since it nots 100% tcp queue bound // this was implicit before since we did a splitVector on the same socket ShardConnection::sync(); if ( !isConfigServerConsistent() ) { RARELY warning() << "will not perform auto-split because " << "config servers are inconsistent"; return false; } LOG(1) << "about to initiate autosplit: " << *this << " dataWritten: " << _dataWritten << " splitThreshold: " << splitThreshold; BSONObj res; size_t splitCount = 0; Status status = split(Chunk::autoSplitInternal, &splitCount, &res); if (!status.isOK()) { // Split would have issued a message if we got here. This means there wasn't enough // data to split, so don't want to try again until considerable more data _dataWritten = 0; return false; } if (_maxIsInf() || _minIsInf()) { // we don't want to reset _dataWritten since we kind of want to check the other side right away } else { // we're splitting, so should wait a bit _dataWritten = 0; } bool shouldBalance = grid.getConfigShouldBalance(); if (shouldBalance) { auto status = grid.catalogManager()->getCollection(_manager->getns()); if (!status.isOK()) { log() << "Auto-split for " << _manager->getns() << " failed to load collection metadata due to " << status.getStatus(); return false; } shouldBalance = status.getValue().getAllowBalance(); } log() << "autosplitted " << _manager->getns() << " shard: " << toString() << " into " << (splitCount + 1) << " (splitThreshold " << splitThreshold << ")" #ifdef MONGO_CONFIG_DEBUG_BUILD << " size: " << getPhysicalSize() // slow - but can be useful when debugging #endif << ( res["shouldMigrate"].eoo() ? "" : (string)" (migrate suggested" + ( shouldBalance ? ")" : ", but no migrations allowed)" ) ); // Top chunk optimization - try to move the top chunk out of this shard // to prevent the hot spot from staying on a single shard. This is based on // the assumption that succeeding inserts will fall on the top chunk. BSONElement shouldMigrate = res["shouldMigrate"]; // not in mongod < 1.9.1 but that is ok if ( ! shouldMigrate.eoo() && shouldBalance ){ BSONObj range = shouldMigrate.embeddedObject(); ChunkType chunkToMove; chunkToMove.setShard(getShard().toString()); chunkToMove.setMin(range["min"].embeddedObject()); chunkToMove.setMax(range["max"].embeddedObject()); tryMoveToOtherShard(*_manager, chunkToMove); } return true; } catch ( DBException& e ) { // TODO: Make this better - there are lots of reasons a split could fail // Random so that we don't sync up with other failed splits _dataWritten = mkDataWritten(); // if the collection lock is taken (e.g. we're migrating), it is fine for the split to fail. warning() << "could not autosplit collection " << _manager->getns() << causedBy( e ); return false; } } long Chunk::getPhysicalSize() const { ScopedDbConnection conn(getShard().getConnString()); BSONObj result; uassert( 10169 , "datasize failed!" , conn->runCommand( "admin" , BSON( "datasize" << _manager->getns() << "keyPattern" << _manager->getShardKeyPattern().toBSON() << "min" << getMin() << "max" << getMax() << "maxSize" << ( MaxChunkSize + 1 ) << "estimate" << true ) , result ) ); conn.done(); return (long)result["size"].number(); } void Chunk::appendShortVersion( const char * name , BSONObjBuilder& b ) const { BSONObjBuilder bb( b.subobjStart( name ) ); bb.append(ChunkType::min(), _min); bb.append(ChunkType::max(), _max); bb.done(); } bool Chunk::operator==( const Chunk& s ) const { return _min.woCompare( s._min ) == 0 && _max.woCompare( s._max ) == 0; } void Chunk::serialize(BSONObjBuilder& to,ChunkVersion myLastMod) { to.append( "_id" , genID( _manager->getns() , _min ) ); if ( myLastMod.isSet() ) { myLastMod.addToBSON(to, ChunkType::DEPRECATED_lastmod()); } else if ( _lastmod.isSet() ) { _lastmod.addToBSON(to, ChunkType::DEPRECATED_lastmod()); } else { verify(0); } to << ChunkType::ns(_manager->getns()); to << ChunkType::min(_min); to << ChunkType::max(_max); to << ChunkType::shard(_shard.getName()); } string Chunk::genID() const { return genID(_manager->getns(), _min); } string Chunk::genID( const string& ns , const BSONObj& o ) { StringBuilder buf; buf << ns << "-"; BSONObjIterator i(o); while ( i.more() ) { BSONElement e = i.next(); buf << e.fieldName() << "_" << e.toString(false, true); } return buf.str(); } string Chunk::toString() const { stringstream ss; ss << ChunkType::ns() << ": " << _manager->getns() << ", " << ChunkType::shard() << ": " << _shard.toString() << ", " << ChunkType::DEPRECATED_lastmod() << ": " << _lastmod.toString() << ", " << ChunkType::min() << ": " << _min << ", " << ChunkType::max() << ": " << _max; return ss.str(); } void Chunk::markAsJumbo() const { // set this first // even if we can't set it in the db // at least this mongos won't try and keep moving _jumbo = true; Status result = grid.catalogManager()->update(ChunkType::ConfigNS, BSON(ChunkType::name(genID())), BSON("$set" << BSON(ChunkType::jumbo(true))), false, // upsert false, // multi NULL); if (!result.isOK()) { warning() << "couldn't set jumbo for chunk: " << genID() << result.reason(); } } void Chunk::refreshChunkSize() { auto chunkSizeSettingsResult = grid.catalogManager()->getGlobalSettings(SettingsType::ChunkSizeDocKey); if (!chunkSizeSettingsResult.isOK()) { log() << chunkSizeSettingsResult.getStatus(); return; } SettingsType chunkSizeSettings = chunkSizeSettingsResult.getValue(); string errMsg; if (!chunkSizeSettings.isValid(&errMsg)) { log() << errMsg; return; } int csize = chunkSizeSettings.getChunksize(); LOG(1) << "Refreshing MaxChunkSize: " << csize << "MB"; if (csize != Chunk::MaxChunkSize/(1024*1024)) { log() << "MaxChunkSize changing from " << Chunk::MaxChunkSize/(1024*1024) << "MB" << " to " << csize << "MB"; } if ( !setMaxChunkSizeSizeMB( csize ) ) { warning() << "invalid MaxChunkSize: " << csize; } } bool Chunk::setMaxChunkSizeSizeMB( int newMaxChunkSize ) { if ( newMaxChunkSize < 1 ) return false; if ( newMaxChunkSize > 1024 ) return false; MaxChunkSize = newMaxChunkSize * 1024 * 1024; return true; } } // namespace mongo