/**
 *    Copyright (C) 2008-2015 MongoDB Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the GNU Affero General Public License, version 3,
 *    as published by the Free Software Foundation.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    GNU Affero General Public License for more details.
 *
 *    You should have received a copy of the GNU Affero General Public License
 *    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the GNU Affero General Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding

#include "mongo/platform/basic.h"

#include "mongo/s/chunk.h"

#include "mongo/client/connpool.h"
#include "mongo/client/dbclientcursor.h"
#include "mongo/config.h"
#include "mongo/db/lasterror.h"
#include "mongo/db/query/query_solution.h"
#include "mongo/db/write_concern.h"
#include "mongo/db/write_concern_options.h"
#include "mongo/platform/random.h"
#include "mongo/s/balancer_policy.h"
#include "mongo/s/catalog/catalog_manager.h"
#include "mongo/s/catalog/type_collection.h"
#include "mongo/s/catalog/type_settings.h"
#include "mongo/s/chunk_manager.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/config.h"
#include "mongo/s/cursors.h"
#include "mongo/s/grid.h"
#include "mongo/s/shard_key_pattern.h"
#include "mongo/util/log.h"

namespace mongo {

using std::shared_ptr;
using std::unique_ptr;
using std::map;
using std::ostringstream;
using std::set;
using std::string;
using std::stringstream;
using std::vector;

namespace {

const int kTooManySplitPoints = 4;
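// A rough sense of how kTooManySplitPoints is used (illustrative numbers, not
// taken from any particular deployment): determineSplitPoints() below estimates
// about one split point per half chunk size of data written. With a 64MB desired
// chunk size and ~256MB of tracked writes, the estimate is 256 / 64 * 2 = 8 >= 4,
// so the split falls back to a larger effective chunk size rather than cutting
// the chunk into a flood of tiny pieces in a single pass.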
/**
 * Attempts to move the given chunk to another shard.
 *
 * Returns true if the chunk was actually moved.
 */
bool tryMoveToOtherShard(const ChunkManager& manager, const ChunkType& chunk) {
    // reload sharding metadata before starting migration
    ChunkManagerPtr chunkMgr = manager.reload(false /* just reloaded in multisplit */);

    ShardInfoMap shardInfo;
    Status loadStatus = DistributionStatus::populateShardInfoMap(&shardInfo);

    if (!loadStatus.isOK()) {
        warning() << "failed to load shard metadata while trying to moveChunk after "
                  << "auto-splitting" << causedBy(loadStatus);
        return false;
    }

    if (shardInfo.size() < 2) {
        LOG(0) << "no need to move top chunk since there's only 1 shard";
        return false;
    }

    map<string, vector<ChunkType>> shardToChunkMap;
    DistributionStatus::populateShardToChunksMap(shardInfo, *chunkMgr, &shardToChunkMap);

    StatusWith<string> tagStatus = grid.catalogManager()->getTagForChunk(manager.getns(), chunk);
    if (!tagStatus.isOK()) {
        warning() << "Not auto-moving chunk because of an error encountered while "
                  << "checking tag for chunk: " << tagStatus.getStatus();
        return false;
    }

    DistributionStatus chunkDistribution(shardInfo, shardToChunkMap);
    const string newLocation(chunkDistribution.getBestReceieverShard(tagStatus.getValue()));

    if (newLocation.empty()) {
        LOG(1) << "recently split chunk: " << chunk << " but no suitable shard to move to";
        return false;
    }

    if (chunk.getShard() == newLocation) {
        // if this is the best shard, then we shouldn't do anything.
        LOG(1) << "recently split chunk: " << chunk << " already in the best shard";
        return false;
    }

    ChunkPtr toMove = chunkMgr->findIntersectingChunk(chunk.getMin());

    if (!(toMove->getMin() == chunk.getMin() && toMove->getMax() == chunk.getMax())) {
        LOG(1) << "recently split chunk: " << chunk << " modified before we could migrate "
               << toMove->toString();
        return false;
    }

    log() << "moving chunk (auto): " << toMove->toString() << " to: " << newLocation;

    shared_ptr<Shard> newShard = grid.shardRegistry()->getShard(newLocation);
    if (!newShard) {
        warning() << "Newly selected shard " << newLocation << " could not be found.";
        return false;
    }

    BSONObj res;
    WriteConcernOptions noThrottle;
    if (!toMove->moveAndCommit(newShard->getId(),
                               Chunk::MaxChunkSize,
                               &noThrottle, /* secondaryThrottle */
                               false,       /* waitForDelete - small chunk, no need */
                               0,           /* maxTimeMS - don't time out */
                               res)) {
        msgassertedNoTrace(10412, str::stream() << "moveAndCommit failed: " << res);
    }

    // update our config
    manager.reload();

    return true;
}

}  // namespace

long long Chunk::MaxChunkSize = 1024 * 1024 * 64;
int Chunk::MaxObjectPerChunk = 250000;

// Can be overridden from command line
bool Chunk::ShouldAutoSplit = true;

Chunk::Chunk(const ChunkManager* manager, BSONObj from)
    : _manager(manager), _lastmod(0, 0, OID()), _dataWritten(mkDataWritten()) {
    string ns = from.getStringField(ChunkType::ns().c_str());
    _shardId = from.getStringField(ChunkType::shard().c_str());

    _lastmod = ChunkVersion::fromBSON(from[ChunkType::DEPRECATED_lastmod()]);
    verify(_lastmod.isSet());

    _min = from.getObjectField(ChunkType::min().c_str()).getOwned();
    _max = from.getObjectField(ChunkType::max().c_str()).getOwned();

    _jumbo = from[ChunkType::jumbo()].trueValue();

    uassert(10170, "Chunk needs a ns", !ns.empty());
    uassert(13327, "Chunk ns must match server ns", ns == _manager->getns());

    {
        const auto shard = grid.shardRegistry()->getShard(_shardId);
        uassert(10171, "Chunk needs a server", shard);
    }

    uassert(10172, "Chunk needs a min", !_min.isEmpty());
    uassert(10173, "Chunk needs a max", !_max.isEmpty());
}

Chunk::Chunk(const ChunkManager* info,
             const BSONObj& min,
             const BSONObj& max,
             const ShardId& shardId,
             ChunkVersion lastmod)
    : _manager(info),
      _min(min),
      _max(max),
      _shardId(shardId),
      _lastmod(lastmod),
      _jumbo(false),
      _dataWritten(mkDataWritten()) {}
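// Chunk ranges are half-open intervals [min, max): a chunk with bounds
// {x: 10}..{x: 20} owns the keys {x: 10} and {x: 19} but not {x: 20}, which
// belongs to the next chunk (illustrative bounds; see containsKey() below).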
int Chunk::mkDataWritten() {
    PseudoRandom r(static_cast<int64_t>(time(0)));
    return r.nextInt32(MaxChunkSize / ChunkManager::SplitHeuristics::splitTestFactor);
}

bool Chunk::containsKey(const BSONObj& shardKey) const {
    return getMin().woCompare(shardKey) <= 0 && shardKey.woCompare(getMax()) < 0;
}

bool ChunkRange::containsKey(const BSONObj& shardKey) const {
    // same as Chunk method
    return getMin().woCompare(shardKey) <= 0 && shardKey.woCompare(getMax()) < 0;
}

bool Chunk::_minIsInf() const {
    return 0 == _manager->getShardKeyPattern().getKeyPattern().globalMin().woCompare(getMin());
}

bool Chunk::_maxIsInf() const {
    return 0 == _manager->getShardKeyPattern().getKeyPattern().globalMax().woCompare(getMax());
}

BSONObj Chunk::_getExtremeKey(bool doSplitAtLower) const {
    Query q;
    if (doSplitAtLower) {
        q.sort(_manager->getShardKeyPattern().toBSON());
    } else {
        // need to invert shard key pattern to sort backwards
        // TODO: make a helper in ShardKeyPattern?
        BSONObj k = _manager->getShardKeyPattern().toBSON();
        BSONObjBuilder r;

        BSONObjIterator i(k);
        while (i.more()) {
            BSONElement e = i.next();
            uassert(10163, "can only handle numeric shard key fields here", e.isNumber());
            r.append(e.fieldName(), -1 * e.number());
        }

        q.sort(r.obj());
    }

    // find the extreme key
    ScopedDbConnection conn(_getShardConnectionString());
    BSONObj end;

    if (doSplitAtLower) {
        // Splitting close to the lower bound means that the split point will be the
        // upper bound. Chunk range upper bounds are exclusive so skip a document to
        // make the lower half of the split end up with a single document.
        unique_ptr<DBClientCursor> cursor = conn->query(_manager->getns(),
                                                        q,
                                                        1, /* nToReturn */
                                                        1 /* nToSkip */);
        if (cursor->more()) {
            end = cursor->next().getOwned();
        }
    } else {
        end = conn->findOne(_manager->getns(), q);
    }

    conn.done();
    if (end.isEmpty())
        return BSONObj();

    return _manager->getShardKeyPattern().extractShardKeyFromDoc(end);
}

void Chunk::pickMedianKey(BSONObj& medianKey) const {
    // Ask the mongod holding this chunk to figure out the split points.
    ScopedDbConnection conn(_getShardConnectionString());
    BSONObj result;
    BSONObjBuilder cmd;
    cmd.append("splitVector", _manager->getns());
    cmd.append("keyPattern", _manager->getShardKeyPattern().toBSON());
    cmd.append("min", getMin());
    cmd.append("max", getMax());
    cmd.appendBool("force", true);
    BSONObj cmdObj = cmd.obj();

    if (!conn->runCommand("admin", cmdObj, result)) {
        conn.done();
        ostringstream os;
        os << "splitVector command (median key) failed: " << result;
        uassert(13503, os.str(), 0);
    }

    BSONObjIterator it(result.getObjectField("splitKeys"));
    if (it.more()) {
        medianKey = it.next().Obj().getOwned();
    }

    conn.done();
}
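// For reference, the splitVector command assembled below looks roughly like
// this (namespace and field values are illustrative):
//
//   { splitVector: "test.users",
//     keyPattern: { x: 1 },
//     min: { x: 10 }, max: { x: 20 },
//     maxChunkSizeBytes: 67108864,
//     maxSplitPoints: 0, maxChunkObjects: 250000 }
//
// and a successful reply carries the chosen keys in "splitKeys":
//
//   { splitKeys: [ { x: 14 }, { x: 17 } ], ok: 1 }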
void Chunk::pickSplitVector(vector<BSONObj>& splitPoints,
                            long long chunkSize /* bytes */,
                            int maxPoints,
                            int maxObjs) const {
    // Ask the mongod holding this chunk to figure out the split points.
    ScopedDbConnection conn(_getShardConnectionString());
    BSONObj result;
    BSONObjBuilder cmd;
    cmd.append("splitVector", _manager->getns());
    cmd.append("keyPattern", _manager->getShardKeyPattern().toBSON());
    cmd.append("min", getMin());
    cmd.append("max", getMax());
    cmd.append("maxChunkSizeBytes", chunkSize);
    cmd.append("maxSplitPoints", maxPoints);
    cmd.append("maxChunkObjects", maxObjs);
    BSONObj cmdObj = cmd.obj();

    if (!conn->runCommand("admin", cmdObj, result)) {
        conn.done();
        ostringstream os;
        os << "splitVector command failed: " << result;
        uassert(13345, os.str(), 0);
    }

    BSONObjIterator it(result.getObjectField("splitKeys"));
    while (it.more()) {
        splitPoints.push_back(it.next().Obj().getOwned());
    }

    conn.done();
}

void Chunk::determineSplitPoints(bool atMedian, vector<BSONObj>* splitPoints) const {
    // If splitting is not obligatory, we may return early if there is not enough data.
    // We cap the number of objects that would fall in the first half (before the split
    // point); the rationale is that we'll find a split point without traversing all
    // the data.
    if (atMedian) {
        BSONObj medianKey;
        pickMedianKey(medianKey);

        if (!medianKey.isEmpty())
            splitPoints->push_back(medianKey);
    } else {
        long long chunkSize = _manager->getCurrentDesiredChunkSize();

        // Note: One split point for every 1/2 chunk size.
        const int estNumSplitPoints = _dataWritten / chunkSize * 2;

        if (estNumSplitPoints >= kTooManySplitPoints) {
            // The current desired chunk size will split the chunk into lots of small chunks
            // (in the worst case, this can result in thousands of chunks), so check and
            // see if a bigger value can be used.
            chunkSize = std::min(_dataWritten, Chunk::MaxChunkSize);
        }

        pickSplitVector(*splitPoints, chunkSize, 0, MaxObjectPerChunk);

        if (splitPoints->size() <= 1) {
            // No split points means there isn't enough data to split on; one split point
            // means we have between half the chunk size and the full chunk size, so there
            // is no need to split yet.
            splitPoints->clear();
        }
    }
}
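// Why split() below special-cases chunks that touch MinKey/MaxKey: with a
// monotonically increasing shard key such as {ts: 1} (illustrative pattern),
// inserts always land in the top chunk [X, MaxKey). Splitting at the largest
// key currently present (an "extreme key") rather than at the median leaves a
// nearly empty top chunk that is cheap to migrate off the hot shard: the chunk
// fills, autosplit cuts at the max key seen so far, and tryMoveToOtherShard()
// can then hand the new top chunk to a less loaded shard.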
Status Chunk::split(SplitPointMode mode, size_t* resultingSplits, BSONObj* res) const {
    size_t dummy;
    if (resultingSplits == NULL) {
        resultingSplits = &dummy;
    }

    bool atMedian = mode == Chunk::atMedian;
    vector<BSONObj> splitPoints;

    determineSplitPoints(atMedian, &splitPoints);
    if (splitPoints.empty()) {
        string msg;
        if (atMedian) {
            msg = "cannot find median in chunk, possibly empty";
        } else {
            msg = "chunk not full enough to trigger auto-split";
        }

        LOG(1) << msg;
        return Status(ErrorCodes::CannotSplit, msg);
    }

    // We assume that if the chunk being split is the first (or last) one on the collection,
    // this chunk is likely to see more insertions. Instead of splitting mid-chunk, we use
    // the very first (or last) key as a split point.
    //
    // This heuristic is skipped for "special" shard key patterns that are not likely to
    // produce monotonically increasing or decreasing values (e.g. hashed shard keys).
    if (mode == Chunk::autoSplitInternal &&
        KeyPattern::isOrderedKeyPattern(_manager->getShardKeyPattern().toBSON())) {
        if (_minIsInf()) {
            BSONObj key = _getExtremeKey(true);
            if (!key.isEmpty()) {
                splitPoints[0] = key.getOwned();
            }
        } else if (_maxIsInf()) {
            BSONObj key = _getExtremeKey(false);
            if (!key.isEmpty()) {
                splitPoints.pop_back();
                splitPoints.push_back(key);
            }
        }
    }

    // Normally, we'd have a sound split point here if the chunk is not empty.
    // It's also a good place to sanity check.
    if (_min == splitPoints.front()) {
        string msg(str::stream() << "not splitting chunk " << toString() << ", split point "
                                 << splitPoints.front() << " is exactly on chunk bounds");
        log() << msg;
        return Status(ErrorCodes::CannotSplit, msg);
    }

    if (_max == splitPoints.back()) {
        string msg(str::stream() << "not splitting chunk " << toString() << ", split point "
                                 << splitPoints.back() << " is exactly on chunk bounds");
        log() << msg;
        return Status(ErrorCodes::CannotSplit, msg);
    }

    Status status = multiSplit(splitPoints, res);
    *resultingSplits = splitPoints.size();
    return status;
}

Status Chunk::multiSplit(const vector<BSONObj>& m, BSONObj* res) const {
    const size_t maxSplitPoints = 8192;

    uassert(10165, "can't split as shard doesn't have a manager", _manager);
    uassert(13332, "need a split key to split chunk", !m.empty());
    uassert(13333, "can't split a chunk in that many parts", m.size() < maxSplitPoints);
    uassert(13003, "can't split a chunk with only one distinct value", _min.woCompare(_max));

    ScopedDbConnection conn(_getShardConnectionString());

    BSONObjBuilder cmd;
    cmd.append("splitChunk", _manager->getns());
    cmd.append("keyPattern", _manager->getShardKeyPattern().toBSON());
    cmd.append("min", getMin());
    cmd.append("max", getMax());
    cmd.append("from", getShardId());
    cmd.append("splitKeys", m);
    cmd.append("configdb", grid.catalogManager()->connectionString().toString());
    cmd.append("epoch", _manager->getVersion().epoch());
    BSONObj cmdObj = cmd.obj();

    BSONObj dummy;
    if (res == NULL) {
        res = &dummy;
    }

    if (!conn->runCommand("admin", cmdObj, *res)) {
        string msg(str::stream() << "splitChunk failed - cmd: " << cmdObj << " result: " << *res);
        warning() << msg;
        conn.done();

        return Status(ErrorCodes::SplitFailed, msg);
    }

    conn.done();

    // force reload of config
    _manager->reload();

    return Status::OK();
}
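// For reference, the moveChunk command that moveAndCommit() below sends to the
// donor shard's primary looks roughly like this (hosts, namespace, and values
// are illustrative):
//
//   { moveChunk: "test.users",
//     from: "rs0/a.example.net:27017", to: "rs1/b.example.net:27017",
//     fromShard: "shard0000", toShard: "shard0001",   // 2.0 compatibility
//     min: { x: 10 }, max: { x: 20 },
//     maxChunkSizeBytes: 67108864,
//     configdb: "...", secondaryThrottle: true,
//     waitForDelete: false, maxTimeMS: 0, epoch: ObjectId(...) }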
bool Chunk::moveAndCommit(const ShardId& toShardId,
                          long long chunkSize /* bytes */,
                          const WriteConcernOptions* writeConcern,
                          bool waitForDelete,
                          int maxTimeMS,
                          BSONObj& res) const {
    uassert(10167, "can't move shard to its current location!", getShardId() != toShardId);

    log() << "moving chunk ns: " << _manager->getns() << " moving ( " << toString() << ") "
          << getShardId() << " -> " << toShardId;

    const auto from = grid.shardRegistry()->getShard(getShardId());

    BSONObjBuilder builder;
    builder.append("moveChunk", _manager->getns());
    builder.append("from", from->getConnString().toString());
    {
        const auto toShard = grid.shardRegistry()->getShard(toShardId);
        builder.append("to", toShard->getConnString().toString());
    }

    // NEEDED FOR 2.0 COMPATIBILITY
    builder.append("fromShard", from->getId());
    builder.append("toShard", toShardId);
    ///////////////////////////////

    builder.append("min", _min);
    builder.append("max", _max);
    builder.append("maxChunkSizeBytes", chunkSize);
    builder.append("configdb", grid.catalogManager()->connectionString().toString());

    // For legacy secondary throttle setting.
    bool secondaryThrottle = true;
    if (writeConcern && writeConcern->wNumNodes <= 1 && writeConcern->wMode.empty()) {
        secondaryThrottle = false;
    }
    builder.append("secondaryThrottle", secondaryThrottle);

    if (secondaryThrottle && writeConcern) {
        builder.append("writeConcern", writeConcern->toBSON());
    }

    builder.append("waitForDelete", waitForDelete);
    builder.append(LiteParsedQuery::cmdOptionMaxTimeMS, maxTimeMS);
    builder.append("epoch", _manager->getVersion().epoch());

    ScopedDbConnection fromconn(from->getConnString());
    bool worked = fromconn->runCommand("admin", builder.done(), res);
    fromconn.done();

    LOG(worked ? 1 : 0) << "moveChunk result: " << res;

    // If it succeeded, we need to reload to pick up the new location.
    // If it failed, this mongos may be stale. A full reload is excessive here, as the
    // failure could simply be because the collection metadata lock is taken.
    _manager->reload();

    return worked;
}
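// A worked example of the autosplit trigger in splitIfShould() below
// (illustrative numbers; splitTestFactor lives in ChunkManager::SplitHeuristics):
// with a 64MB desired chunk size, an edge chunk's threshold is reduced to
// 64MB * 0.9 ~= 57.6MB. Assuming splitTestFactor is, say, 5, a split attempt
// kicks in once roughly 57.6MB / 5 ~= 11.5MB of writes have been tracked
// against the chunk since the last check.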
bool Chunk::splitIfShould(long dataWritten) const {
    dassert(ShouldAutoSplit);
    LastError::Disabled d(&LastError::get(cc()));

    try {
        _dataWritten += dataWritten;
        int splitThreshold = getManager()->getCurrentDesiredChunkSize();
        if (_minIsInf() || _maxIsInf()) {
            splitThreshold = (int)((double)splitThreshold * .9);
        }

        if (_dataWritten < splitThreshold / ChunkManager::SplitHeuristics::splitTestFactor)
            return false;

        if (!getManager()->_splitHeuristics._splitTickets.tryAcquire()) {
            LOG(1) << "won't auto split because not enough tickets: " << getManager()->getns();
            return false;
        }

        TicketHolderReleaser releaser(&(getManager()->_splitHeuristics._splitTickets));

        // This is a bit ugly, but we need it so that mongos blocks for the writes to
        // actually be committed. This does mean mongos has more back pressure than
        // mongod alone, since it's not 100% TCP-queue bound. This was implicit before,
        // since we did a splitVector on the same socket.
        ShardConnection::sync();

        LOG(1) << "about to initiate autosplit: " << *this << " dataWritten: " << _dataWritten
               << " splitThreshold: " << splitThreshold;

        BSONObj res;
        size_t splitCount = 0;
        Status status = split(Chunk::autoSplitInternal, &splitCount, &res);
        if (!status.isOK()) {
            // Split would have issued a message if we got here. This means there wasn't
            // enough data to split, so don't try again until considerably more data has
            // been written.
            _dataWritten = 0;
            return false;
        }

        if (_maxIsInf() || _minIsInf()) {
            // we don't want to reset _dataWritten since we want to check the other side
            // right away
        } else {
            // we're splitting, so should wait a bit
            _dataWritten = 0;
        }

        bool shouldBalance = grid.getConfigShouldBalance();
        if (shouldBalance) {
            auto status = grid.catalogManager()->getCollection(_manager->getns());
            if (!status.isOK()) {
                log() << "Auto-split for " << _manager->getns()
                      << " failed to load collection metadata due to " << status.getStatus();
                return false;
            }

            shouldBalance = status.getValue().getAllowBalance();
        }

        log() << "autosplitted " << _manager->getns() << " shard: " << toString() << " into "
              << (splitCount + 1) << " (splitThreshold " << splitThreshold << ")"
#ifdef MONGO_CONFIG_DEBUG_BUILD
              << " size: " << getPhysicalSize()  // slow - but can be useful when debugging
#endif
              << (res["shouldMigrate"].eoo() ? "" : (string) " (migrate suggested" +
                      (shouldBalance ? ")" : ", but no migrations allowed)"));

        // Top chunk optimization - try to move the top chunk out of this shard
        // to prevent the hot spot from staying on a single shard. This is based on
        // the assumption that succeeding inserts will fall on the top chunk.
        BSONElement shouldMigrate = res["shouldMigrate"];  // not in mongod < 1.9.1 but that is ok
        if (!shouldMigrate.eoo() && shouldBalance) {
            BSONObj range = shouldMigrate.embeddedObject();

            ChunkType chunkToMove;
            {
                const auto shard = grid.shardRegistry()->getShard(getShardId());
                chunkToMove.setShard(shard->toString());
            }
            chunkToMove.setMin(range["min"].embeddedObject());
            chunkToMove.setMax(range["max"].embeddedObject());

            tryMoveToOtherShard(*_manager, chunkToMove);
        }

        return true;
    } catch (DBException& e) {
        // TODO: Make this better - there are lots of reasons a split could fail.
        // Reset to a random value so that we don't sync up with other failed splits.
        _dataWritten = mkDataWritten();

        // if the collection lock is taken (e.g. we're migrating), it is fine for the split to fail.
        warning() << "could not autosplit collection " << _manager->getns() << causedBy(e);
        return false;
    }
}

const ConnectionString& Chunk::_getShardConnectionString() const {
    const auto shard = grid.shardRegistry()->getShard(getShardId());
    return shard->getConnString();
}

long Chunk::getPhysicalSize() const {
    ScopedDbConnection conn(_getShardConnectionString());

    BSONObj result;
    uassert(10169,
            "datasize failed!",
            conn->runCommand("admin",
                             BSON("datasize" << _manager->getns()
                                             << "keyPattern"
                                             << _manager->getShardKeyPattern().toBSON()
                                             << "min" << getMin()
                                             << "max" << getMax()
                                             << "maxSize" << (MaxChunkSize + 1)
                                             << "estimate" << true),
                             result));

    conn.done();
    return (long)result["size"].number();
}

void Chunk::appendShortVersion(const char* name, BSONObjBuilder& b) const {
    BSONObjBuilder bb(b.subobjStart(name));
    bb.append(ChunkType::min(), _min);
    bb.append(ChunkType::max(), _max);
    bb.done();
}

bool Chunk::operator==(const Chunk& s) const {
    return _min.woCompare(s._min) == 0 && _max.woCompare(s._max) == 0;
}

void Chunk::serialize(BSONObjBuilder& to, ChunkVersion myLastMod) {
    to.append("_id", genID(_manager->getns(), _min));

    if (myLastMod.isSet()) {
        myLastMod.addToBSON(to, ChunkType::DEPRECATED_lastmod());
    } else if (_lastmod.isSet()) {
        _lastmod.addToBSON(to, ChunkType::DEPRECATED_lastmod());
    } else {
        verify(0);
    }

    to << ChunkType::ns(_manager->getns());
    to << ChunkType::min(_min);
    to << ChunkType::max(_max);
    to << ChunkType::shard(_shardId);
}

string Chunk::genID() const {
    return genID(_manager->getns(), _min);
}

string Chunk::genID(const string& ns, const BSONObj& o) {
    StringBuilder buf;
    buf << ns << "-";

    BSONObjIterator i(o);
    while (i.more()) {
        BSONElement e = i.next();
        buf << e.fieldName() << "_" << e.toString(false, true);
    }

    return buf.str();
}

string Chunk::toString() const {
    stringstream ss;
    ss << ChunkType::ns() << ": " << _manager->getns() << ", " << ChunkType::shard() << ": "
       << _shardId << ", " << ChunkType::DEPRECATED_lastmod() << ": " << _lastmod.toString()
       << ", " << ChunkType::min() << ": " << _min << ", " << ChunkType::max() << ": " << _max;
    return ss.str();
}

void Chunk::markAsJumbo() const {
    // Set this first, so that even if we can't persist it in the config db,
    // at least this mongos won't keep trying to move the chunk.
    _jumbo = true;

    Status result = grid.catalogManager()->update(ChunkType::ConfigNS,
                                                  BSON(ChunkType::name(genID())),
                                                  BSON("$set" << BSON(ChunkType::jumbo(true))),
                                                  false,  // upsert
                                                  false,  // multi
                                                  NULL);
    if (!result.isOK()) {
        warning() << "couldn't set jumbo for chunk: " << genID() << causedBy(result.reason());
    }
}
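// The global chunk size consulted below lives in the config servers' settings
// collection; the document read via SettingsType::ChunkSizeDocKey looks roughly
// like { _id: "chunksize", value: 64 } (value in MB; 64 here is illustrative).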
void Chunk::refreshChunkSize() {
    auto chunkSizeSettingsResult =
        grid.catalogManager()->getGlobalSettings(SettingsType::ChunkSizeDocKey);
    if (!chunkSizeSettingsResult.isOK()) {
        log() << chunkSizeSettingsResult.getStatus();
        return;
    }
    SettingsType chunkSizeSettings = chunkSizeSettingsResult.getValue();
    int csize = chunkSizeSettings.getChunkSizeMB();

    LOG(1) << "Refreshing MaxChunkSize: " << csize << "MB";

    if (csize != Chunk::MaxChunkSize / (1024 * 1024)) {
        log() << "MaxChunkSize changing from " << Chunk::MaxChunkSize / (1024 * 1024) << "MB"
              << " to " << csize << "MB";
    }

    if (!setMaxChunkSizeSizeMB(csize)) {
        warning() << "invalid MaxChunkSize: " << csize;
    }
}

bool Chunk::setMaxChunkSizeSizeMB(int newMaxChunkSize) {
    if (newMaxChunkSize < 1)
        return false;
    if (newMaxChunkSize > 1024)
        return false;
    MaxChunkSize = newMaxChunkSize * 1024 * 1024;
    return true;
}

}  // namespace mongo