author    | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2017-01-26 14:57:32 -0500
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2017-01-30 16:43:13 -0500
commit    | 3942d88af18cd7a2d9fff8ea3800f8c7769e5c9f (patch)
tree      | b59c6fa5a7694b7b8a5d1a8c70d0752bc3236f66 /src/mongo/s/chunk.cpp
parent    | 2922aea4adf8e8fe9bab3a5d6e986e7aecba7228 (diff)
download  | mongo-3942d88af18cd7a2d9fff8ea3800f8c7769e5c9f.tar.gz
SERVER-27809 Move Chunk::splitIfShould to cluster_write.h/.cpp
Diffstat (limited to 'src/mongo/s/chunk.cpp')
-rw-r--r-- | src/mongo/s/chunk.cpp | 369
1 file changed, 25 insertions, 344 deletions
diff --git a/src/mongo/s/chunk.cpp b/src/mongo/s/chunk.cpp
index 8ccf0250115..ff4341c3f7f 100644
--- a/src/mongo/s/chunk.cpp
+++ b/src/mongo/s/chunk.cpp
@@ -32,35 +32,17 @@
 #include "mongo/s/chunk.h"
 
-#include "mongo/bson/simple_bsonobj_comparator.h"
-#include "mongo/client/connpool.h"
-#include "mongo/db/lasterror.h"
 #include "mongo/platform/random.h"
 #include "mongo/s/balancer_configuration.h"
 #include "mongo/s/catalog/sharding_catalog_client.h"
-#include "mongo/s/catalog/type_collection.h"
-#include "mongo/s/client/shard_registry.h"
-#include "mongo/s/config_server_client.h"
+#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/chunk_manager.h"
 #include "mongo/s/grid.h"
-#include "mongo/s/shard_util.h"
-#include "mongo/s/sharding_raii.h"
 #include "mongo/util/log.h"
 
 namespace mongo {
-
-using std::shared_ptr;
-using std::unique_ptr;
-using std::map;
-using std::ostringstream;
-using std::set;
-using std::string;
-using std::stringstream;
-using std::vector;
-
 namespace {
 
-const uint64_t kTooManySplitPoints = 4;
-
 // Test whether we should split once data * splitTestFactor > chunkSize (approximately)
 const int splitTestFactor = 5;
 
@@ -68,37 +50,14 @@ const int splitTestFactor = 5;
  * Generates a random value for _dataWritten so that a mongos restart wouldn't cause delay in
  * splitting.
  */
-int mkDataWritten() {
+int64_t mkDataWritten() {
     PseudoRandom r(static_cast<int64_t>(time(0)));
     return r.nextInt32(grid.getBalancerConfiguration()->getMaxChunkSizeBytes() / splitTestFactor);
 }
 
-void reloadChunkManager(OperationContext* txn, const std::string ns) {
-    const NamespaceString nss(ns);
-    auto config = uassertStatusOK(ScopedShardDatabase::getExisting(txn, nss.db()));
-    config.db()->getChunkManagerIfExists(txn, nss.ns(), true);
-}
-
-uint64_t calculateDesiredChunkSize(uint64_t maxChunkSizeBytes, uint64_t numChunks) {
-    // Splitting faster in early chunks helps spread out an initial load better
-    const uint64_t minChunkSize = 1 << 20;  // 1 MBytes
-
-    if (numChunks <= 1) {
-        return 1024;
-    } else if (numChunks < 3) {
-        return minChunkSize / 2;
-    } else if (numChunks < 10) {
-        return std::max(maxChunkSizeBytes / 4, minChunkSize);
-    } else if (numChunks < 20) {
-        return std::max(maxChunkSizeBytes / 2, minChunkSize);
-    } else {
-        return maxChunkSizeBytes;
-    }
-}
-
 }  // namespace
 
-Chunk::Chunk(OperationContext* txn, ChunkManager* manager, const ChunkType& from)
+Chunk::Chunk(ChunkManager* manager, const ChunkType& from)
     : _manager(manager),
       _min(from.getMin().getOwned()),
       _max(from.getMax().getOwned()),
@@ -109,13 +68,13 @@ Chunk::Chunk(OperationContext* txn, ChunkManager* manager, const ChunkType& from
     invariantOK(from.validate());
 }
 
-Chunk::Chunk(ChunkManager* info,
+Chunk::Chunk(ChunkManager* manager,
              const BSONObj& min,
              const BSONObj& max,
              const ShardId& shardId,
              ChunkVersion lastmod,
              uint64_t initialDataWritten)
-    : _manager(info),
+    : _manager(manager),
       _min(min),
       _max(max),
       _shardId(shardId),
@@ -127,306 +86,28 @@ bool Chunk::containsKey(const BSONObj& shardKey) const {
     return getMin().woCompare(shardKey) <= 0 && shardKey.woCompare(getMax()) < 0;
 }
 
-bool Chunk::_minIsInf() const {
-    return 0 == _manager->getShardKeyPattern().getKeyPattern().globalMin().woCompare(getMin());
-}
-
-bool Chunk::_maxIsInf() const {
-    return 0 == _manager->getShardKeyPattern().getKeyPattern().globalMax().woCompare(getMax());
-}
-
-BSONObj Chunk::_getExtremeKey(OperationContext* txn, bool doSplitAtLower) const {
-    Query q;
-    if (doSplitAtLower) {
-        q.sort(_manager->getShardKeyPattern().toBSON());
-    } else {
-        // need to invert shard key pattern to sort backwards
-        // TODO: make a helper in ShardKeyPattern?
-
-        BSONObj k = _manager->getShardKeyPattern().toBSON();
-        BSONObjBuilder r;
-
-        BSONObjIterator i(k);
-        while (i.more()) {
-            BSONElement e = i.next();
-            uassert(10163, "can only handle numbers here - which i think is correct", e.isNumber());
-            r.append(e.fieldName(), -1 * e.number());
-        }
-
-        q.sort(r.obj());
-    }
-
-    // find the extreme key
-    ScopedDbConnection conn(_getShardConnectionString(txn));
-    BSONObj end;
-
-    if (doSplitAtLower) {
-        // Splitting close to the lower bound means that the split point will be the
-        // upper bound. Chunk range upper bounds are exclusive so skip a document to
-        // make the lower half of the split end up with a single document.
-        unique_ptr<DBClientCursor> cursor = conn->query(_manager->getns(),
-                                                        q,
-                                                        1, /* nToReturn */
-                                                        1 /* nToSkip */);
-
-        uassert(28736,
-                str::stream() << "failed to initialize cursor during auto split due to "
-                              << "connection problem with "
-                              << conn->getServerAddress(),
-                cursor.get() != nullptr);
-
-        if (cursor->more()) {
-            end = cursor->next().getOwned();
-        }
-    } else {
-        end = conn->findOne(_manager->getns(), q);
-    }
-
-    conn.done();
-    if (end.isEmpty())
-        return BSONObj();
-    return _manager->getShardKeyPattern().extractShardKeyFromDoc(end);
-}
-
-std::vector<BSONObj> Chunk::_determineSplitPoints(OperationContext* txn) const {
-    // If splitting is not obligatory we may return early if there are not enough data we cap the
-    // number of objects that would fall in the first half (before the split point) the rationale
-    // is we'll find a split point without traversing all the data.
-    uint64_t chunkSize = calculateDesiredChunkSize(
-        Grid::get(txn)->getBalancerConfiguration()->getMaxChunkSizeBytes(), _manager->numChunks());
-
-    // Note: One split point for every 1/2 chunk size.
-    const uint64_t chunkBytesWritten = getBytesWritten();
-    const uint64_t estNumSplitPoints = chunkBytesWritten / chunkSize * 2;
-
-    if (estNumSplitPoints >= kTooManySplitPoints) {
-        // The current desired chunk size will split the chunk into lots of small chunk and at
-        // the worst case this can result into thousands of chunks. So check and see if a bigger
-        // value can be used.
-        chunkSize = std::min(chunkBytesWritten,
-                             Grid::get(txn)->getBalancerConfiguration()->getMaxChunkSizeBytes());
-    }
-
-    vector<BSONObj> splitPoints =
-        uassertStatusOK(shardutil::selectChunkSplitPoints(txn,
-                                                          _shardId,
-                                                          NamespaceString(_manager->getns()),
-                                                          _manager->getShardKeyPattern(),
-                                                          ChunkRange(_min, _max),
-                                                          chunkSize,
-                                                          boost::none));
-    if (splitPoints.size() <= 1) {
-        // No split points means there isn't enough data to split on 1 split point means we have
-        // between half the chunk size to full chunk size so we shouldn't split.
-        splitPoints.clear();
-    }
-
-    return splitPoints;
-}
-
-StatusWith<boost::optional<ChunkRange>> Chunk::split(OperationContext* txn,
-                                                     size_t* resultingSplits) const {
-    size_t dummy;
-    if (resultingSplits == NULL) {
-        resultingSplits = &dummy;
-    }
-
-    vector<BSONObj> splitPoints = _determineSplitPoints(txn);
-    if (splitPoints.empty()) {
-        return {ErrorCodes::CannotSplit, "chunk not full enough to trigger auto-split"};
-    }
-
-    // We assume that if the chunk being split is the first (or last) one on the collection,
-    // this chunk is likely to see more insertions. Instead of splitting mid-chunk, we use
-    // the very first (or last) key as a split point.
-    // This heuristic is skipped for "special" shard key patterns that are not likely to
-    // produce monotonically increasing or decreasing values (e.g. hashed shard keys).
-    if (KeyPattern::isOrderedKeyPattern(_manager->getShardKeyPattern().toBSON())) {
-        if (_minIsInf()) {
-            BSONObj key = _getExtremeKey(txn, true);
-            if (!key.isEmpty()) {
-                splitPoints.front() = key.getOwned();
-            }
-        } else if (_maxIsInf()) {
-            BSONObj key = _getExtremeKey(txn, false);
-            if (!key.isEmpty()) {
-                splitPoints.back() = key.getOwned();
-            }
-        }
-    }
-
-    auto splitStatus = shardutil::splitChunkAtMultiplePoints(txn,
-                                                             _shardId,
-                                                             NamespaceString(_manager->getns()),
-                                                             _manager->getShardKeyPattern(),
-                                                             _manager->getVersion(),
-                                                             ChunkRange(_min, _max),
-                                                             splitPoints);
-    if (!splitStatus.isOK()) {
-        return splitStatus.getStatus();
-    }
-
-    reloadChunkManager(txn, _manager->getns());
-
-    *resultingSplits = splitPoints.size();
-    return splitStatus.getValue();
-}
-
 uint64_t Chunk::getBytesWritten() const {
     return _dataWritten;
 }
 
-void Chunk::addBytesWritten(uint64_t bytesWrittenIncrement) {
+uint64_t Chunk::addBytesWritten(uint64_t bytesWrittenIncrement) {
     _dataWritten += bytesWrittenIncrement;
+    return _dataWritten;
 }
 
-void Chunk::setBytesWritten(uint64_t newBytesWritten) {
-    _dataWritten = newBytesWritten;
-}
-
-bool Chunk::splitIfShould(OperationContext* txn, long dataWritten) {
-    LastError::Disabled d(&LastError::get(cc()));
-
-    addBytesWritten(dataWritten);
-
-    const uint64_t chunkBytesWritten = getBytesWritten();
-
-    try {
-        uint64_t splitThreshold = calculateDesiredChunkSize(
-            Grid::get(txn)->getBalancerConfiguration()->getMaxChunkSizeBytes(),
-            _manager->numChunks());
-
-        if (_minIsInf() || _maxIsInf()) {
-            splitThreshold = static_cast<uint64_t>((double)splitThreshold * 0.9);
-        }
-
-        if (chunkBytesWritten < splitThreshold / splitTestFactor) {
-            return false;
-        }
-
-        if (!_manager->_splitHeuristics._splitTickets.tryAcquire()) {
-            LOG(1) << "won't auto split because not enough tickets: " << _manager->getns();
-            return false;
-        }
-
-        TicketHolderReleaser releaser(&(_manager->_splitHeuristics._splitTickets));
-
-        const auto balancerConfig = Grid::get(txn)->getBalancerConfiguration();
-
-        Status refreshStatus = balancerConfig->refreshAndCheck(txn);
-        if (!refreshStatus.isOK()) {
-            warning() << "Unable to refresh balancer settings" << causedBy(refreshStatus);
-            return false;
-        }
-
-        bool shouldAutoSplit = balancerConfig->getShouldAutoSplit();
-        if (!shouldAutoSplit) {
-            return false;
-        }
-
-        LOG(1) << "about to initiate autosplit: " << *this << " dataWritten: " << chunkBytesWritten
-               << " splitThreshold: " << splitThreshold;
-
-        size_t splitCount = 0;
-        auto splitStatus = split(txn, &splitCount);
-        if (!splitStatus.isOK()) {
-            // Split would have issued a message if we got here. This means there wasn't enough
-            // data to split, so don't want to try again until we have considerably more data.
-            setBytesWritten(0);
-            return false;
-        }
-
-        if (_maxIsInf() || _minIsInf()) {
-            // we don't want to reset _dataWritten since we kind of want to check the other side
-            // right away
-        } else {
-            // we're splitting, so should wait a bit
-            setBytesWritten(0);
-        }
-
-        bool shouldBalance = balancerConfig->shouldBalanceForAutoSplit();
-
-        if (shouldBalance) {
-            auto collStatus = grid.catalogClient(txn)->getCollection(txn, _manager->getns());
-            if (!collStatus.isOK()) {
-                warning() << "Auto-split for " << _manager->getns()
-                          << " failed to load collection metadata"
-                          << causedBy(collStatus.getStatus());
-                return false;
-            }
-
-            shouldBalance = collStatus.getValue().value.getAllowBalance();
-        }
-
-        const auto suggestedMigrateChunk = std::move(splitStatus.getValue());
-
-        log() << "autosplitted " << _manager->getns() << " shard: " << toString() << " into "
-              << (splitCount + 1) << " (splitThreshold " << splitThreshold << ")"
-              << (suggestedMigrateChunk ? "" : (string) " (migrate suggested" +
-                      (shouldBalance ? ")" : ", but no migrations allowed)"));
-
-        // Top chunk optimization - try to move the top chunk out of this shard to prevent the hot
-        // spot from staying on a single shard. This is based on the assumption that succeeding
-        // inserts will fall on the top chunk.
-        if (suggestedMigrateChunk && shouldBalance) {
-            const NamespaceString nss(_manager->getns());
-
-            // We need to use the latest chunk manager (after the split) in order to have the most
-            // up-to-date view of the chunk we are about to move
-            auto scopedCM = uassertStatusOK(ScopedChunkManager::refreshAndGet(txn, nss));
-            auto suggestedChunk = scopedCM.cm()->findIntersectingChunkWithSimpleCollation(
-                txn, suggestedMigrateChunk->getMin());
-
-            ChunkType chunkToMove;
-            chunkToMove.setNS(nss.ns());
-            chunkToMove.setShard(suggestedChunk->getShardId());
-            chunkToMove.setMin(suggestedChunk->getMin());
-            chunkToMove.setMax(suggestedChunk->getMax());
-            chunkToMove.setVersion(suggestedChunk->getLastmod());
-
-            Status rebalanceStatus = configsvr_client::rebalanceChunk(txn, chunkToMove);
-            if (!rebalanceStatus.isOK()) {
-                msgassertedNoTraceWithStatus(10412, rebalanceStatus);
-            }
-
-            reloadChunkManager(txn, _manager->getns());
-        }
-
-        return true;
-    } catch (const DBException& e) {
-        // TODO: Make this better - there are lots of reasons a split could fail
-        // Random so that we don't sync up with other failed splits
-        setBytesWritten(mkDataWritten());
-
-        // if the collection lock is taken (e.g. we're migrating), it is fine for the split to
-        // fail.
-        warning() << "could not autosplit collection " << _manager->getns() << causedBy(e);
-        return false;
-    }
-}
-
-ConnectionString Chunk::_getShardConnectionString(OperationContext* txn) const {
-    const auto shard = uassertStatusOK(grid.shardRegistry()->getShard(txn, getShardId()));
-    return shard->getConnString();
-}
-
-void Chunk::appendShortVersion(const char* name, BSONObjBuilder& b) const {
-    BSONObjBuilder bb(b.subobjStart(name));
-    bb.append(ChunkType::min(), _min);
-    bb.append(ChunkType::max(), _max);
-    bb.done();
+void Chunk::clearBytesWritten() {
+    _dataWritten = 0;
 }
 
-bool Chunk::operator==(const Chunk& s) const {
-    return _min.woCompare(s._min) == 0 && _max.woCompare(s._max) == 0;
+void Chunk::randomizeBytesWritten() {
+    _dataWritten = mkDataWritten();
 }
 
-string Chunk::toString() const {
-    stringstream ss;
-    ss << ChunkType::ns() << ": " << _manager->getns() << ", " << ChunkType::shard() << ": "
-       << _shardId << ", " << ChunkType::DEPRECATED_lastmod() << ": " << _lastmod.toString()
-       << ", " << ChunkType::min() << ": " << _min << ", " << ChunkType::max() << ": " << _max;
-    return ss.str();
+std::string Chunk::toString() const {
+    return str::stream() << ChunkType::shard() << ": " << _shardId << ", "
+                         << ChunkType::DEPRECATED_lastmod() << ": " << _lastmod.toString() << ", "
+                         << ChunkType::min() << ": " << _min << ", " << ChunkType::max() << ": "
+                         << _max;
 }
 
 void Chunk::markAsJumbo(OperationContext* txn) const {
@@ -437,15 +118,15 @@ void Chunk::markAsJumbo(OperationContext* txn) const {
     // at least this mongos won't try and keep moving
    _jumbo = true;
 
-    const string chunkName = ChunkType::genID(_manager->getns(), _min);
+    const std::string chunkName = ChunkType::genID(_manager->getns(), _min);
 
-    auto status =
-        grid.catalogClient(txn)->updateConfigDocument(txn,
-                                                      ChunkType::ConfigNS,
-                                                      BSON(ChunkType::name(chunkName)),
-                                                      BSON("$set" << BSON(ChunkType::jumbo(true))),
-                                                      false,
-                                                      ShardingCatalogClient::kMajorityWriteConcern);
+    auto status = Grid::get(txn)->catalogClient(txn)->updateConfigDocument(
+        txn,
+        ChunkType::ConfigNS,
+        BSON(ChunkType::name(chunkName)),
+        BSON("$set" << BSON(ChunkType::jumbo(true))),
+        false,
+        ShardingCatalogClient::kMajorityWriteConcern);
     if (!status.isOK()) {
         warning() << "couldn't set jumbo for chunk: " << chunkName << causedBy(status.getStatus());
    }