/**
 * Copyright (C) 2008-2015 MongoDB Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License, version 3,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 * As a special exception, the copyright holders give permission to link the
 * code of portions of this program with the OpenSSL library under certain
 * conditions as described in each individual source file and distribute
 * linked combinations including the program with the OpenSSL library. You
 * must comply with the GNU Affero General Public License in all respects for
 * all of the code used other than as permitted herein. If you modify file(s)
 * with this exception, you may extend this exception to your version of the
 * file(s), but you are not obligated to do so. If you do not wish to do so,
 * delete this exception statement from your version. If you delete this
 * exception statement from all source files in the program, then also delete
 * it in the license file.
 */

#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding

#include "mongo/platform/basic.h"

#include "mongo/s/chunk.h"

#include "mongo/client/connpool.h"
#include "mongo/db/commands.h"
#include "mongo/db/lasterror.h"
#include "mongo/platform/random.h"
#include "mongo/s/balancer/balancer.h"
#include "mongo/s/balancer/balancer_configuration.h"
#include "mongo/s/catalog/catalog_manager.h"
#include "mongo/s/catalog/type_collection.h"
#include "mongo/s/chunk_manager.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/s/shard_util.h"
#include "mongo/s/sharding_raii.h"
#include "mongo/util/log.h"

namespace mongo {

using std::shared_ptr;
using std::unique_ptr;
using std::map;
using std::ostringstream;
using std::set;
using std::string;
using std::stringstream;
using std::vector;

namespace {

const uint64_t kTooManySplitPoints = 4;

}  // namespace

Chunk::Chunk(OperationContext* txn, ChunkManager* manager, const ChunkType& from)
    : _manager(manager), _lastmod(from.getVersion()), _dataWritten(mkDataWritten()) {
    string ns = from.getNS();
    _shardId = from.getShard();

    verify(_lastmod.isSet());

    _min = from.getMin().getOwned();
    _max = from.getMax().getOwned();

    _jumbo = from.getJumbo();

    uassert(10170, "Chunk needs a ns", !ns.empty());
    uassert(13327, "Chunk ns must match server ns", ns == _manager->getns());
    uassert(10172, "Chunk needs a min", !_min.isEmpty());
    uassert(10173, "Chunk needs a max", !_max.isEmpty());
    uassert(10171, "Chunk needs a server", grid.shardRegistry()->getShard(txn, _shardId));
}

Chunk::Chunk(ChunkManager* info,
             const BSONObj& min,
             const BSONObj& max,
             const ShardId& shardId,
             ChunkVersion lastmod,
             uint64_t initialDataWritten)
    : _manager(info),
      _min(min),
      _max(max),
      _shardId(shardId),
      _lastmod(lastmod),
      _jumbo(false),
      _dataWritten(initialDataWritten) {}

int Chunk::mkDataWritten() {
    PseudoRandom r(static_cast<int64_t>(time(0)));
    return r.nextInt32(grid.getBalancerConfiguration()->getMaxChunkSizeBytes() /
                       ChunkManager::SplitHeuristics::splitTestFactor);
}
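// containsKey() treats the chunk as a half-open range: the lower bound is inclusive and the
// upper bound is exclusive, i.e. the chunk [min, max) owns min but not max. Illustrative
// example (hypothetical values, not taken from this file): for a chunk with min {x: 10} and
// max {x: 20}, containsKey(BSON("x" << 10)) and containsKey(BSON("x" << 19)) both return true,
// while containsKey(BSON("x" << 20)) returns false because {x: 20} belongs to the next chunk.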
bool Chunk::containsKey(const BSONObj& shardKey) const {
    return getMin().woCompare(shardKey) <= 0 && shardKey.woCompare(getMax()) < 0;
}

bool Chunk::_minIsInf() const {
    return 0 == _manager->getShardKeyPattern().getKeyPattern().globalMin().woCompare(getMin());
}

bool Chunk::_maxIsInf() const {
    return 0 == _manager->getShardKeyPattern().getKeyPattern().globalMax().woCompare(getMax());
}

BSONObj Chunk::_getExtremeKey(OperationContext* txn, bool doSplitAtLower) const {
    Query q;
    if (doSplitAtLower) {
        q.sort(_manager->getShardKeyPattern().toBSON());
    } else {
        // need to invert shard key pattern to sort backwards
        // TODO: make a helper in ShardKeyPattern?
        BSONObj k = _manager->getShardKeyPattern().toBSON();

        BSONObjBuilder r;

        BSONObjIterator i(k);
        while (i.more()) {
            BSONElement e = i.next();
            uassert(10163,
                    "can only handle numbers here - which i think is correct",
                    e.isNumber());
            r.append(e.fieldName(), -1 * e.number());
        }

        q.sort(r.obj());
    }

    // find the extreme key
    ScopedDbConnection conn(_getShardConnectionString(txn));
    BSONObj end;

    if (doSplitAtLower) {
        // Splitting close to the lower bound means that the split point will be the
        // upper bound. Chunk range upper bounds are exclusive so skip a document to
        // make the lower half of the split end up with a single document.
        unique_ptr<DBClientCursor> cursor = conn->query(_manager->getns(),
                                                        q,
                                                        1, /* nToReturn */
                                                        1 /* nToSkip */);
        uassert(28736,
                str::stream() << "failed to initialize cursor during auto split due to "
                              << "connection problem with " << conn->getServerAddress(),
                cursor.get() != nullptr);

        if (cursor->more()) {
            end = cursor->next().getOwned();
        }
    } else {
        end = conn->findOne(_manager->getns(), q);
    }

    conn.done();

    if (end.isEmpty())
        return BSONObj();

    return _manager->getShardKeyPattern().extractShardKeyFromDoc(end);
}
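// Sketch of the estimate used below for auto-splits (illustrative numbers, not taken from the
// source): with a desired chunk size of 64MB and _dataWritten of 160MB, estNumSplitPoints is
// 160 / 64 * 2 = 4 (integer division), which reaches kTooManySplitPoints, so chunkSize is
// raised to min(_dataWritten, maxChunkSizeBytes) to avoid carving the chunk into a large
// number of very small chunks.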
std::vector<BSONObj> Chunk::_determineSplitPoints(OperationContext* txn, bool atMedian) const {
    // If splitting is not obligatory, we may return early if there is not enough data. We cap
    // the number of objects that would fall in the first half (before the split point); the
    // rationale is that we'll find a split point without traversing all the data.
    vector<BSONObj> splitPoints;

    if (atMedian) {
        BSONObj medianKey =
            uassertStatusOK(shardutil::selectMedianKey(txn,
                                                       _shardId,
                                                       NamespaceString(_manager->getns()),
                                                       _manager->getShardKeyPattern(),
                                                       _min,
                                                       _max));
        if (!medianKey.isEmpty()) {
            splitPoints.push_back(medianKey);
        }
    } else {
        uint64_t chunkSize = _manager->getCurrentDesiredChunkSize();

        // Note: One split point for every 1/2 chunk size.
        const uint64_t estNumSplitPoints = _dataWritten / chunkSize * 2;

        if (estNumSplitPoints >= kTooManySplitPoints) {
            // The current desired chunk size will split the chunk into lots of small chunks and
            // in the worst case this can result in thousands of chunks. So check and see if a
            // bigger value can be used.
            chunkSize = std::min(
                _dataWritten, Grid::get(txn)->getBalancerConfiguration()->getMaxChunkSizeBytes());
        }

        splitPoints =
            uassertStatusOK(shardutil::selectChunkSplitPoints(txn,
                                                              _shardId,
                                                              NamespaceString(_manager->getns()),
                                                              _manager->getShardKeyPattern(),
                                                              _min,
                                                              _max,
                                                              chunkSize,
                                                              0,
                                                              MaxObjectPerChunk));
        if (splitPoints.size() <= 1) {
            // No split points means there isn't enough data to split on; 1 split point means we
            // have between half the chunk size and the full chunk size, so we shouldn't split.
            splitPoints.clear();
        }
    }

    return splitPoints;
}
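// split() asks the shard that owns this chunk to split it at the points chosen by
// _determineSplitPoints(). For internal auto-splits on an ordered (non-hashed) shard key, a
// chunk that borders MinKey (or MaxKey) has its first (or last) split point replaced with the
// chunk's very first (or last) key, so monotonic inserts leave the hot end of the key space in
// a small chunk. On success the returned boost::optional<ChunkRange> holds the range, if any,
// that the split suggests migrating off this shard (used by the top chunk optimization in
// splitIfShould()).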
StatusWith<boost::optional<ChunkRange>> Chunk::split(OperationContext* txn,
                                                     SplitPointMode mode,
                                                     size_t* resultingSplits) const {
    size_t dummy;
    if (resultingSplits == NULL) {
        resultingSplits = &dummy;
    }

    bool atMedian = mode == Chunk::atMedian;
    vector<BSONObj> splitPoints = _determineSplitPoints(txn, atMedian);
    if (splitPoints.empty()) {
        string msg;
        if (atMedian) {
            msg = "cannot find median in chunk, possibly empty";
        } else {
            msg = "chunk not full enough to trigger auto-split";
        }

        LOG(1) << msg;
        return Status(ErrorCodes::CannotSplit, msg);
    }

    // We assume that if the chunk being split is the first (or last) one on the collection,
    // this chunk is likely to see more insertions. Instead of splitting mid-chunk, we use
    // the very first (or last) key as a split point.
    //
    // This heuristic is skipped for "special" shard key patterns that are not likely to
    // produce monotonically increasing or decreasing values (e.g. hashed shard keys).
    if (mode == Chunk::autoSplitInternal &&
        KeyPattern::isOrderedKeyPattern(_manager->getShardKeyPattern().toBSON())) {
        if (_minIsInf()) {
            BSONObj key = _getExtremeKey(txn, true);
            if (!key.isEmpty()) {
                splitPoints[0] = key.getOwned();
            }
        } else if (_maxIsInf()) {
            BSONObj key = _getExtremeKey(txn, false);
            if (!key.isEmpty()) {
                splitPoints.pop_back();
                splitPoints.push_back(key);
            }
        }
    }

    // Normally, we'd have a sound split point here if the chunk is not empty.
    // It's also a good place to sanity check.
    if (_min == splitPoints.front()) {
        string msg(str::stream() << "not splitting chunk " << toString() << ", split point "
                                 << splitPoints.front() << " is exactly on chunk bounds");
        log() << msg;
        return Status(ErrorCodes::CannotSplit, msg);
    }

    if (_max == splitPoints.back()) {
        string msg(str::stream() << "not splitting chunk " << toString() << ", split point "
                                 << splitPoints.back() << " is exactly on chunk bounds");
        log() << msg;
        return Status(ErrorCodes::CannotSplit, msg);
    }

    auto splitStatus = shardutil::splitChunkAtMultiplePoints(txn,
                                                             _shardId,
                                                             NamespaceString(_manager->getns()),
                                                             _manager->getShardKeyPattern(),
                                                             _manager->getVersion(),
                                                             _min,
                                                             _max,
                                                             splitPoints);
    if (!splitStatus.isOK()) {
        return splitStatus.getStatus();
    }

    _manager->reload(txn);

    *resultingSplits = splitPoints.size();
    return splitStatus.getValue();
}
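// Worked example of the threshold check in splitIfShould() (illustrative numbers; the value of
// ChunkManager::SplitHeuristics::splitTestFactor is assumed to be 5 here): with a desired
// chunk size of 64MB, an auto-split attempt is only considered once roughly 64MB / 5 ~= 12.8MB
// has been written to this chunk since the last attempt. Chunks that touch MinKey or MaxKey
// use 90% of the normal threshold, so the hot edge chunks are examined slightly sooner.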
bool Chunk::splitIfShould(OperationContext* txn, long dataWritten) {
    LastError::Disabled d(&LastError::get(cc()));

    try {
        _dataWritten += dataWritten;
        uint64_t splitThreshold = _manager->getCurrentDesiredChunkSize();
        if (_minIsInf() || _maxIsInf()) {
            splitThreshold = static_cast<uint64_t>((double)splitThreshold * 0.9);
        }

        if (_dataWritten < splitThreshold / ChunkManager::SplitHeuristics::splitTestFactor) {
            return false;
        }

        if (!_manager->_splitHeuristics._splitTickets.tryAcquire()) {
            LOG(1) << "won't auto split because not enough tickets: " << _manager->getns();
            return false;
        }

        TicketHolderReleaser releaser(&(_manager->_splitHeuristics._splitTickets));

        LOG(1) << "about to initiate autosplit: " << *this << " dataWritten: " << _dataWritten
               << " splitThreshold: " << splitThreshold;

        size_t splitCount = 0;
        auto splitStatus = split(txn, Chunk::autoSplitInternal, &splitCount);
        if (!splitStatus.isOK()) {
            // Split would have issued a message if we got here. This means there wasn't enough
            // data to split, so don't try again until considerably more data has been written.
            _dataWritten = 0;
            return false;
        }

        if (_maxIsInf() || _minIsInf()) {
            // we don't want to reset _dataWritten since we kind of want to check the other side
            // right away
        } else {
            // we're splitting, so should wait a bit
            _dataWritten = 0;
        }

        const auto balancerConfig = Grid::get(txn)->getBalancerConfiguration();

        Status refreshStatus = balancerConfig->refreshAndCheck(txn);
        if (!refreshStatus.isOK()) {
            warning() << "Unable to refresh balancer settings" << causedBy(refreshStatus);
            return false;
        }

        bool shouldBalance = balancerConfig->isBalancerActive();
        if (shouldBalance) {
            auto collStatus = grid.catalogManager(txn)->getCollection(txn, _manager->getns());
            if (!collStatus.isOK()) {
                warning() << "Auto-split for " << _manager->getns()
                          << " failed to load collection metadata"
                          << causedBy(collStatus.getStatus());
                return false;
            }

            shouldBalance = collStatus.getValue().value.getAllowBalance();
        }

        const auto suggestedMigrateChunk = std::move(splitStatus.getValue());

        log() << "autosplitted " << _manager->getns() << " shard: " << toString() << " into "
              << (splitCount + 1) << " (splitThreshold " << splitThreshold << ")"
              << (suggestedMigrateChunk ? "" : (string) " (migrate suggested" +
                          (shouldBalance ? ")" : ", but no migrations allowed)"));

        // Top chunk optimization - try to move the top chunk out of this shard to prevent the
        // hot spot from staying on a single shard. This is based on the assumption that
        // succeeding inserts will fall on the top chunk.
        if (suggestedMigrateChunk && shouldBalance) {
            const NamespaceString nss(_manager->getns());

            // We need to use the latest chunk manager (after the split) in order to have the
            // most up-to-date view of the chunk we are about to move
            auto scopedCM = uassertStatusOK(ScopedChunkManager::getExisting(txn, nss));
            auto suggestedChunk =
                scopedCM.cm()->findIntersectingChunk(txn, suggestedMigrateChunk->getMin());

            ChunkType chunkToMove;
            chunkToMove.setNS(nss.ns());
            chunkToMove.setShard(suggestedChunk->getShardId());
            chunkToMove.setMin(suggestedChunk->getMin());
            chunkToMove.setMax(suggestedChunk->getMax());
            chunkToMove.setVersion(suggestedChunk->getLastmod());

            Status rebalanceStatus = Balancer::get(txn)->rebalanceSingleChunk(txn, chunkToMove);
            if (!rebalanceStatus.isOK()) {
                msgassertedNoTraceWithStatus(10412, rebalanceStatus);
            }

            _manager->reload(txn);
        }

        return true;
    } catch (const DBException& e) {
        // TODO: Make this better - there are lots of reasons a split could fail
        // Random so that we don't sync up with other failed splits
        _dataWritten = mkDataWritten();

        // if the collection lock is taken (e.g. we're migrating), it is fine for the split to
        // fail
        warning() << "could not autosplit collection " << _manager->getns() << causedBy(e);
        return false;
    }
}

ConnectionString Chunk::_getShardConnectionString(OperationContext* txn) const {
    const auto shard = grid.shardRegistry()->getShard(txn, getShardId());
    return shard->getConnString();
}

void Chunk::appendShortVersion(const char* name, BSONObjBuilder& b) const {
    BSONObjBuilder bb(b.subobjStart(name));
    bb.append(ChunkType::min(), _min);
    bb.append(ChunkType::max(), _max);
    bb.done();
}

bool Chunk::operator==(const Chunk& s) const {
    return _min.woCompare(s._min) == 0 && _max.woCompare(s._max) == 0;
}

string Chunk::toString() const {
    stringstream ss;
    ss << ChunkType::ns() << ": " << _manager->getns() << ", " << ChunkType::shard() << ": "
       << _shardId << ", " << ChunkType::DEPRECATED_lastmod() << ": " << _lastmod.toString()
       << ", " << ChunkType::min() << ": " << _min << ", " << ChunkType::max() << ": " << _max;
    return ss.str();
}

void Chunk::markAsJumbo(OperationContext* txn) const {
    // Set the jumbo flag in memory first, even if we fail to persist it to the config servers,
    // so that at least this mongos won't keep trying to move the chunk.
    _jumbo = true;

    const string chunkName = ChunkType::genID(_manager->getns(), _min);

    auto status =
        grid.catalogManager(txn)->updateConfigDocument(txn,
                                                       ChunkType::ConfigNS,
                                                       BSON(ChunkType::name(chunkName)),
                                                       BSON("$set" << BSON(ChunkType::jumbo(true))),
                                                       false);
    if (!status.isOK()) {
        warning() << "couldn't set jumbo for chunk: " << chunkName << causedBy(status.getStatus());
    }
}

}  // namespace mongo