diff options
-rw-r--r-- | src/mongo/s/chunk.cpp | 369 | ||||
-rw-r--r-- | src/mongo/s/chunk.h | 125 | ||||
-rw-r--r-- | src/mongo/s/chunk_manager.cpp | 18 | ||||
-rw-r--r-- | src/mongo/s/chunk_manager.h | 26 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_find_and_modify_cmd.cpp | 15 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_map_reduce_cmd.cpp | 3 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_merge_chunks_cmd.cpp | 4 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_shard_collection_cmd.cpp | 115 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_write.cpp | 411 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_write.h | 17 |
10 files changed, 491 insertions, 612 deletions
diff --git a/src/mongo/s/chunk.cpp b/src/mongo/s/chunk.cpp index 8ccf0250115..ff4341c3f7f 100644 --- a/src/mongo/s/chunk.cpp +++ b/src/mongo/s/chunk.cpp @@ -32,35 +32,17 @@ #include "mongo/s/chunk.h" -#include "mongo/bson/simple_bsonobj_comparator.h" -#include "mongo/client/connpool.h" -#include "mongo/db/lasterror.h" #include "mongo/platform/random.h" #include "mongo/s/balancer_configuration.h" #include "mongo/s/catalog/sharding_catalog_client.h" -#include "mongo/s/catalog/type_collection.h" -#include "mongo/s/client/shard_registry.h" -#include "mongo/s/config_server_client.h" +#include "mongo/s/catalog/type_chunk.h" +#include "mongo/s/chunk_manager.h" #include "mongo/s/grid.h" -#include "mongo/s/shard_util.h" -#include "mongo/s/sharding_raii.h" #include "mongo/util/log.h" namespace mongo { - -using std::shared_ptr; -using std::unique_ptr; -using std::map; -using std::ostringstream; -using std::set; -using std::string; -using std::stringstream; -using std::vector; - namespace { -const uint64_t kTooManySplitPoints = 4; - // Test whether we should split once data * splitTestFactor > chunkSize (approximately) const int splitTestFactor = 5; @@ -68,37 +50,14 @@ const int splitTestFactor = 5; * Generates a random value for _dataWritten so that a mongos restart wouldn't cause delay in * splitting. */ -int mkDataWritten() { +int64_t mkDataWritten() { PseudoRandom r(static_cast<int64_t>(time(0))); return r.nextInt32(grid.getBalancerConfiguration()->getMaxChunkSizeBytes() / splitTestFactor); } -void reloadChunkManager(OperationContext* txn, const std::string ns) { - const NamespaceString nss(ns); - auto config = uassertStatusOK(ScopedShardDatabase::getExisting(txn, nss.db())); - config.db()->getChunkManagerIfExists(txn, nss.ns(), true); -} - -uint64_t calculateDesiredChunkSize(uint64_t maxChunkSizeBytes, uint64_t numChunks) { - // Splitting faster in early chunks helps spread out an initial load better - const uint64_t minChunkSize = 1 << 20; // 1 MBytes - - if (numChunks <= 1) { - return 1024; - } else if (numChunks < 3) { - return minChunkSize / 2; - } else if (numChunks < 10) { - return std::max(maxChunkSizeBytes / 4, minChunkSize); - } else if (numChunks < 20) { - return std::max(maxChunkSizeBytes / 2, minChunkSize); - } else { - return maxChunkSizeBytes; - } -} - } // namespace -Chunk::Chunk(OperationContext* txn, ChunkManager* manager, const ChunkType& from) +Chunk::Chunk(ChunkManager* manager, const ChunkType& from) : _manager(manager), _min(from.getMin().getOwned()), _max(from.getMax().getOwned()), @@ -109,13 +68,13 @@ Chunk::Chunk(OperationContext* txn, ChunkManager* manager, const ChunkType& from invariantOK(from.validate()); } -Chunk::Chunk(ChunkManager* info, +Chunk::Chunk(ChunkManager* manager, const BSONObj& min, const BSONObj& max, const ShardId& shardId, ChunkVersion lastmod, uint64_t initialDataWritten) - : _manager(info), + : _manager(manager), _min(min), _max(max), _shardId(shardId), @@ -127,306 +86,28 @@ bool Chunk::containsKey(const BSONObj& shardKey) const { return getMin().woCompare(shardKey) <= 0 && shardKey.woCompare(getMax()) < 0; } -bool Chunk::_minIsInf() const { - return 0 == _manager->getShardKeyPattern().getKeyPattern().globalMin().woCompare(getMin()); -} - -bool Chunk::_maxIsInf() const { - return 0 == _manager->getShardKeyPattern().getKeyPattern().globalMax().woCompare(getMax()); -} - -BSONObj Chunk::_getExtremeKey(OperationContext* txn, bool doSplitAtLower) const { - Query q; - if (doSplitAtLower) { - q.sort(_manager->getShardKeyPattern().toBSON()); - } else { - // need to invert shard key pattern to sort backwards - // TODO: make a helper in ShardKeyPattern? - - BSONObj k = _manager->getShardKeyPattern().toBSON(); - BSONObjBuilder r; - - BSONObjIterator i(k); - while (i.more()) { - BSONElement e = i.next(); - uassert(10163, "can only handle numbers here - which i think is correct", e.isNumber()); - r.append(e.fieldName(), -1 * e.number()); - } - - q.sort(r.obj()); - } - - // find the extreme key - ScopedDbConnection conn(_getShardConnectionString(txn)); - BSONObj end; - - if (doSplitAtLower) { - // Splitting close to the lower bound means that the split point will be the - // upper bound. Chunk range upper bounds are exclusive so skip a document to - // make the lower half of the split end up with a single document. - unique_ptr<DBClientCursor> cursor = conn->query(_manager->getns(), - q, - 1, /* nToReturn */ - 1 /* nToSkip */); - - uassert(28736, - str::stream() << "failed to initialize cursor during auto split due to " - << "connection problem with " - << conn->getServerAddress(), - cursor.get() != nullptr); - - if (cursor->more()) { - end = cursor->next().getOwned(); - } - } else { - end = conn->findOne(_manager->getns(), q); - } - - conn.done(); - if (end.isEmpty()) - return BSONObj(); - return _manager->getShardKeyPattern().extractShardKeyFromDoc(end); -} - -std::vector<BSONObj> Chunk::_determineSplitPoints(OperationContext* txn) const { - // If splitting is not obligatory we may return early if there are not enough data we cap the - // number of objects that would fall in the first half (before the split point) the rationale is - // we'll find a split point without traversing all the data. - - uint64_t chunkSize = calculateDesiredChunkSize( - Grid::get(txn)->getBalancerConfiguration()->getMaxChunkSizeBytes(), _manager->numChunks()); - - // Note: One split point for every 1/2 chunk size. - const uint64_t chunkBytesWritten = getBytesWritten(); - const uint64_t estNumSplitPoints = chunkBytesWritten / chunkSize * 2; - - if (estNumSplitPoints >= kTooManySplitPoints) { - // The current desired chunk size will split the chunk into lots of small chunk and at - // the worst case this can result into thousands of chunks. So check and see if a bigger - // value can be used. - chunkSize = std::min(chunkBytesWritten, - Grid::get(txn)->getBalancerConfiguration()->getMaxChunkSizeBytes()); - } - - vector<BSONObj> splitPoints = - uassertStatusOK(shardutil::selectChunkSplitPoints(txn, - _shardId, - NamespaceString(_manager->getns()), - _manager->getShardKeyPattern(), - ChunkRange(_min, _max), - chunkSize, - boost::none)); - if (splitPoints.size() <= 1) { - // No split points means there isn't enough data to split on 1 split point means we have - // between half the chunk size to full chunk size so we shouldn't split. - splitPoints.clear(); - } - - return splitPoints; -} - -StatusWith<boost::optional<ChunkRange>> Chunk::split(OperationContext* txn, - size_t* resultingSplits) const { - size_t dummy; - if (resultingSplits == NULL) { - resultingSplits = &dummy; - } - - vector<BSONObj> splitPoints = _determineSplitPoints(txn); - if (splitPoints.empty()) { - return {ErrorCodes::CannotSplit, "chunk not full enough to trigger auto-split"}; - } - - // We assume that if the chunk being split is the first (or last) one on the collection, - // this chunk is likely to see more insertions. Instead of splitting mid-chunk, we use - // the very first (or last) key as a split point. - // This heuristic is skipped for "special" shard key patterns that are not likely to - // produce monotonically increasing or decreasing values (e.g. hashed shard keys). - if (KeyPattern::isOrderedKeyPattern(_manager->getShardKeyPattern().toBSON())) { - if (_minIsInf()) { - BSONObj key = _getExtremeKey(txn, true); - if (!key.isEmpty()) { - splitPoints.front() = key.getOwned(); - } - } else if (_maxIsInf()) { - BSONObj key = _getExtremeKey(txn, false); - if (!key.isEmpty()) { - splitPoints.back() = key.getOwned(); - } - } - } - - auto splitStatus = shardutil::splitChunkAtMultiplePoints(txn, - _shardId, - NamespaceString(_manager->getns()), - _manager->getShardKeyPattern(), - _manager->getVersion(), - ChunkRange(_min, _max), - splitPoints); - if (!splitStatus.isOK()) { - return splitStatus.getStatus(); - } - - reloadChunkManager(txn, _manager->getns()); - - *resultingSplits = splitPoints.size(); - return splitStatus.getValue(); -} - uint64_t Chunk::getBytesWritten() const { return _dataWritten; } -void Chunk::addBytesWritten(uint64_t bytesWrittenIncrement) { +uint64_t Chunk::addBytesWritten(uint64_t bytesWrittenIncrement) { _dataWritten += bytesWrittenIncrement; + return _dataWritten; } -void Chunk::setBytesWritten(uint64_t newBytesWritten) { - _dataWritten = newBytesWritten; -} - -bool Chunk::splitIfShould(OperationContext* txn, long dataWritten) { - LastError::Disabled d(&LastError::get(cc())); - - addBytesWritten(dataWritten); - - const uint64_t chunkBytesWritten = getBytesWritten(); - - try { - uint64_t splitThreshold = calculateDesiredChunkSize( - Grid::get(txn)->getBalancerConfiguration()->getMaxChunkSizeBytes(), - _manager->numChunks()); - - if (_minIsInf() || _maxIsInf()) { - splitThreshold = static_cast<uint64_t>((double)splitThreshold * 0.9); - } - - if (chunkBytesWritten < splitThreshold / splitTestFactor) { - return false; - } - - if (!_manager->_splitHeuristics._splitTickets.tryAcquire()) { - LOG(1) << "won't auto split because not enough tickets: " << _manager->getns(); - return false; - } - - TicketHolderReleaser releaser(&(_manager->_splitHeuristics._splitTickets)); - - const auto balancerConfig = Grid::get(txn)->getBalancerConfiguration(); - - Status refreshStatus = balancerConfig->refreshAndCheck(txn); - if (!refreshStatus.isOK()) { - warning() << "Unable to refresh balancer settings" << causedBy(refreshStatus); - return false; - } - - bool shouldAutoSplit = balancerConfig->getShouldAutoSplit(); - if (!shouldAutoSplit) { - return false; - } - - LOG(1) << "about to initiate autosplit: " << *this << " dataWritten: " << chunkBytesWritten - << " splitThreshold: " << splitThreshold; - - size_t splitCount = 0; - auto splitStatus = split(txn, &splitCount); - if (!splitStatus.isOK()) { - // Split would have issued a message if we got here. This means there wasn't enough data - // to split, so don't want to try again until we have considerably more data. - setBytesWritten(0); - return false; - } - - if (_maxIsInf() || _minIsInf()) { - // we don't want to reset _dataWritten since we kind of want to check the other side - // right away - } else { - // we're splitting, so should wait a bit - setBytesWritten(0); - } - - bool shouldBalance = balancerConfig->shouldBalanceForAutoSplit(); - - if (shouldBalance) { - auto collStatus = grid.catalogClient(txn)->getCollection(txn, _manager->getns()); - if (!collStatus.isOK()) { - warning() << "Auto-split for " << _manager->getns() - << " failed to load collection metadata" - << causedBy(collStatus.getStatus()); - return false; - } - - shouldBalance = collStatus.getValue().value.getAllowBalance(); - } - - const auto suggestedMigrateChunk = std::move(splitStatus.getValue()); - - log() << "autosplitted " << _manager->getns() << " shard: " << toString() << " into " - << (splitCount + 1) << " (splitThreshold " << splitThreshold << ")" - << (suggestedMigrateChunk ? "" : (string) " (migrate suggested" + - (shouldBalance ? ")" : ", but no migrations allowed)")); - - // Top chunk optimization - try to move the top chunk out of this shard to prevent the hot - // spot from staying on a single shard. This is based on the assumption that succeeding - // inserts will fall on the top chunk. - if (suggestedMigrateChunk && shouldBalance) { - const NamespaceString nss(_manager->getns()); - - // We need to use the latest chunk manager (after the split) in order to have the most - // up-to-date view of the chunk we are about to move - auto scopedCM = uassertStatusOK(ScopedChunkManager::refreshAndGet(txn, nss)); - auto suggestedChunk = scopedCM.cm()->findIntersectingChunkWithSimpleCollation( - txn, suggestedMigrateChunk->getMin()); - - ChunkType chunkToMove; - chunkToMove.setNS(nss.ns()); - chunkToMove.setShard(suggestedChunk->getShardId()); - chunkToMove.setMin(suggestedChunk->getMin()); - chunkToMove.setMax(suggestedChunk->getMax()); - chunkToMove.setVersion(suggestedChunk->getLastmod()); - - Status rebalanceStatus = configsvr_client::rebalanceChunk(txn, chunkToMove); - if (!rebalanceStatus.isOK()) { - msgassertedNoTraceWithStatus(10412, rebalanceStatus); - } - - reloadChunkManager(txn, _manager->getns()); - } - - return true; - } catch (const DBException& e) { - // TODO: Make this better - there are lots of reasons a split could fail - // Random so that we don't sync up with other failed splits - setBytesWritten(mkDataWritten()); - - // if the collection lock is taken (e.g. we're migrating), it is fine for the split to fail. - warning() << "could not autosplit collection " << _manager->getns() << causedBy(e); - return false; - } -} - -ConnectionString Chunk::_getShardConnectionString(OperationContext* txn) const { - const auto shard = uassertStatusOK(grid.shardRegistry()->getShard(txn, getShardId())); - return shard->getConnString(); -} - -void Chunk::appendShortVersion(const char* name, BSONObjBuilder& b) const { - BSONObjBuilder bb(b.subobjStart(name)); - bb.append(ChunkType::min(), _min); - bb.append(ChunkType::max(), _max); - bb.done(); +void Chunk::clearBytesWritten() { + _dataWritten = 0; } -bool Chunk::operator==(const Chunk& s) const { - return _min.woCompare(s._min) == 0 && _max.woCompare(s._max) == 0; +void Chunk::randomizeBytesWritten() { + _dataWritten = mkDataWritten(); } -string Chunk::toString() const { - stringstream ss; - ss << ChunkType::ns() << ": " << _manager->getns() << ", " << ChunkType::shard() << ": " - << _shardId << ", " << ChunkType::DEPRECATED_lastmod() << ": " << _lastmod.toString() << ", " - << ChunkType::min() << ": " << _min << ", " << ChunkType::max() << ": " << _max; - return ss.str(); +std::string Chunk::toString() const { + return str::stream() << ChunkType::shard() << ": " << _shardId << ", " + << ChunkType::DEPRECATED_lastmod() << ": " << _lastmod.toString() << ", " + << ChunkType::min() << ": " << _min << ", " << ChunkType::max() << ": " + << _max; } void Chunk::markAsJumbo(OperationContext* txn) const { @@ -437,15 +118,15 @@ void Chunk::markAsJumbo(OperationContext* txn) const { // at least this mongos won't try and keep moving _jumbo = true; - const string chunkName = ChunkType::genID(_manager->getns(), _min); + const std::string chunkName = ChunkType::genID(_manager->getns(), _min); - auto status = - grid.catalogClient(txn)->updateConfigDocument(txn, - ChunkType::ConfigNS, - BSON(ChunkType::name(chunkName)), - BSON("$set" << BSON(ChunkType::jumbo(true))), - false, - ShardingCatalogClient::kMajorityWriteConcern); + auto status = Grid::get(txn)->catalogClient(txn)->updateConfigDocument( + txn, + ChunkType::ConfigNS, + BSON(ChunkType::name(chunkName)), + BSON("$set" << BSON(ChunkType::jumbo(true))), + false, + ShardingCatalogClient::kMajorityWriteConcern); if (!status.isOK()) { warning() << "couldn't set jumbo for chunk: " << chunkName << causedBy(status.getStatus()); } diff --git a/src/mongo/s/chunk.h b/src/mongo/s/chunk.h index dd97c751b87..4c107f0d2c2 100644 --- a/src/mongo/s/chunk.h +++ b/src/mongo/s/chunk.h @@ -28,11 +28,9 @@ #pragma once -#include <boost/optional.hpp> - -#include "mongo/s/catalog/type_chunk.h" +#include "mongo/base/disallow_copying.h" #include "mongo/s/chunk_version.h" -#include "mongo/s/client/shard.h" +#include "mongo/s/shard_id.h" namespace mongo { @@ -41,17 +39,13 @@ class ChunkType; class OperationContext; /** - config.chunks - { ns : "alleyinsider.fs.chunks" , min : {} , max : {} , server : "localhost:30001" } - - x is in a shard iff - min <= x < max + * Represents a cache entry for a single Chunk. Owned by a ChunkManager. */ class Chunk { MONGO_DISALLOW_COPYING(Chunk); public: - Chunk(OperationContext* txn, ChunkManager* manager, const ChunkType& from); + Chunk(ChunkManager* manager, const ChunkType& from); Chunk(ChunkManager* manager, const BSONObj& min, @@ -60,10 +54,6 @@ public: ChunkVersion lastmod, uint64_t initialDataWritten); - // - // chunk boundary support - // - const BSONObj& getMin() const { return _min; } @@ -72,53 +62,36 @@ public: return _max; } - // Returns true if this chunk contains the given shard key, and false otherwise - // - // Note: this function takes an extracted *key*, not an original document - // (the point may be computed by, say, hashing a given field or projecting - // to a subset of fields). - bool containsKey(const BSONObj& shardKey) const; - - // - // chunk version support - // - - void appendShortVersion(const char* name, BSONObjBuilder& b) const; + const ShardId& getShardId() const { + return _shardId; + } ChunkVersion getLastmod() const { return _lastmod; } - // - // split support - // + bool isJumbo() const { + return _jumbo; + } /** - * Get/increment/set the estimation of how much data was written for this chunk. + * Returns a string represenation of the chunk for logging. */ - uint64_t getBytesWritten() const; - void addBytesWritten(uint64_t bytesWrittenIncrement); - void setBytesWritten(uint64_t newBytesWritten); + std::string toString() const; - /** - * if the amount of data written nears the max size of a shard - * then we check the real size, and if its too big, we split - * @return if something was split - */ - bool splitIfShould(OperationContext* txn, long dataWritten); + // Returns true if this chunk contains the given shard key, and false otherwise + // + // Note: this function takes an extracted *key*, not an original document (the point may be + // computed by, say, hashing a given field or projecting to a subset of fields). + bool containsKey(const BSONObj& shardKey) const; /** - * Splits this chunk at a non-specificed split key to be chosen by the - * mongod holding this chunk. - * - * @param mode - * @param res the object containing details about the split execution - * @param resultingSplits the number of resulting split points. Set to NULL to ignore. - * - * @throws UserException + * Get/increment/set the estimation of how much data was written for this chunk. */ - StatusWith<boost::optional<ChunkRange>> split(OperationContext* txn, - size_t* resultingSplits) const; + uint64_t getBytesWritten() const; + uint64_t addBytesWritten(uint64_t bytesWrittenIncrement); + void clearBytesWritten(); + void randomizeBytesWritten(); /** * marks this chunk as a jumbo chunk @@ -126,40 +99,7 @@ public: */ void markAsJumbo(OperationContext* txn) const; - bool isJumbo() const { - return _jumbo; - } - - // - // accessors and helpers - // - - std::string toString() const; - - friend std::ostream& operator<<(std::ostream& out, const Chunk& c) { - return (out << c.toString()); - } - - // chunk equality is determined by comparing the min and max bounds of the chunk - bool operator==(const Chunk& s) const; - bool operator!=(const Chunk& s) const { - return !(*this == s); - } - - ShardId getShardId() const { - return _shardId; - } - private: - /** - * Returns the connection string for the shard on which this chunk resides. - */ - ConnectionString _getShardConnectionString(OperationContext* txn) const; - - // if min/max key is pos/neg infinity - bool _minIsInf() const; - bool _maxIsInf() const; - // The chunk manager, which owns this chunk. Not owned by the chunk. const ChunkManager* _manager; @@ -177,27 +117,6 @@ private: // Statistics for the approximate data written to this chunk mutable uint64_t _dataWritten; - - /** - * Returns the split point that will result in one of the chunk having exactly one - * document. Also returns an empty document if the split point cannot be determined. - * - * @param doSplitAtLower determines which side of the split will have exactly one document. - * True means that the split point chosen will be closer to the lower bound. - * - * Warning: this assumes that the shard key is not "special"- that is, the shardKeyPattern - * is simply an ordered list of ascending/descending field names. Examples: - * {a : 1, b : -1} is not special. {a : "hashed"} is. - */ - BSONObj _getExtremeKey(OperationContext* txn, bool doSplitAtLower) const; - - /** - * Determines the appropriate split points for this chunk. - * - * @param atMedian perform a single split at the middle of this chunk. - * @param splitPoints out parameter containing the chosen split points. Can be empty. - */ - std::vector<BSONObj> _determineSplitPoints(OperationContext* txn) const; }; } // namespace mongo diff --git a/src/mongo/s/chunk_manager.cpp b/src/mongo/s/chunk_manager.cpp index 2d466bbb07b..324be082da5 100644 --- a/src/mongo/s/chunk_manager.cpp +++ b/src/mongo/s/chunk_manager.cpp @@ -53,7 +53,6 @@ #include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog/type_collection.h" -#include "mongo/s/chunk.h" #include "mongo/s/chunk_diff.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/config.h" @@ -64,7 +63,6 @@ namespace mongo { -using std::make_pair; using std::map; using std::pair; using std::set; @@ -75,6 +73,9 @@ using std::vector; namespace { +// Used to generate sequence numbers to assign to each newly created ChunkManager +AtomicUInt32 nextCMSequenceNumber(0); + /** * This is an adapter so we can use config diffs - mongos and mongod do them slightly differently. * @@ -101,12 +102,11 @@ public: pair<BSONObj, shared_ptr<Chunk>> rangeFor(OperationContext* txn, const ChunkType& chunk) const final { - shared_ptr<Chunk> c(new Chunk(txn, _manager, chunk)); - return make_pair(chunk.getMax(), c); + return std::make_pair(chunk.getMax(), std::make_shared<Chunk>(_manager, chunk)); } ShardId shardFor(OperationContext* txn, const ShardId& shardId) const final { - const auto shard = uassertStatusOK(grid.shardRegistry()->getShard(txn, shardId)); + const auto shard = uassertStatusOK(Grid::get(txn)->shardRegistry()->getShard(txn, shardId)); return shard->getId(); } @@ -166,8 +166,6 @@ bool isChunkMapValid(const ChunkMap& chunkMap) { } // namespace -AtomicUInt32 ChunkManager::NextSequenceNumber(1U); - ChunkManager::ChunkManager(const string& ns, const ShardKeyPattern& pattern, std::unique_ptr<CollatorInterface> defaultCollator, @@ -176,7 +174,7 @@ ChunkManager::ChunkManager(const string& ns, _keyPattern(pattern.getKeyPattern()), _defaultCollator(std::move(defaultCollator)), _unique(unique), - _sequenceNumber(NextSequenceNumber.addAndFetch(1)), + _sequenceNumber(nextCMSequenceNumber.addAndFetch(1)), _chunkMap(SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<std::shared_ptr<Chunk>>()), _chunkRangeMap( SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<ShardAndChunkRange>()) {} @@ -185,7 +183,7 @@ ChunkManager::ChunkManager(OperationContext* txn, const CollectionType& coll) : _ns(coll.getNs().ns()), _keyPattern(coll.getKeyPattern()), _unique(coll.getUnique()), - _sequenceNumber(NextSequenceNumber.addAndFetch(1)), + _sequenceNumber(nextCMSequenceNumber.addAndFetch(1)), _chunkMap(SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<std::shared_ptr<Chunk>>()), _chunkRangeMap( SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<ShardAndChunkRange>()) { @@ -278,7 +276,7 @@ bool ChunkManager::_load(OperationContext* txn, oldC->getLastmod(), oldC->getBytesWritten())); - chunkMap.insert(make_pair(oldC->getMax(), newC)); + chunkMap.insert(std::make_pair(oldC->getMax(), newC)); } LOG(2) << "loading chunk manager for collection " << _ns diff --git a/src/mongo/s/chunk_manager.h b/src/mongo/s/chunk_manager.h index cbb0b6426ef..f809c6167c6 100644 --- a/src/mongo/s/chunk_manager.h +++ b/src/mongo/s/chunk_manager.h @@ -33,6 +33,7 @@ #include <string> #include <vector> +#include "mongo/base/disallow_copying.h" #include "mongo/db/query/collation/collator_interface.h" #include "mongo/db/repl/optime.h" #include "mongo/s/catalog/type_chunk.h" @@ -45,7 +46,6 @@ namespace mongo { class CanonicalQuery; -class Chunk; class CollectionType; struct QuerySolutionNode; class OperationContext; @@ -54,6 +54,8 @@ class OperationContext; typedef BSONObjIndexedMap<std::shared_ptr<Chunk>> ChunkMap; class ChunkManager { + MONGO_DISALLOW_COPYING(ChunkManager); + public: typedef std::map<ShardId, ChunkVersion> ShardVersionMap; @@ -270,27 +272,23 @@ private: // OpTime of config server the last time chunks were loaded. repl::OpTime _configOpTime; - // - // Split Heuristic info - // - class SplitHeuristics { + // Auto-split throttling state + struct AutoSplitThrottle { public: - SplitHeuristics() : _splitTickets(maxParallelSplits) {} + AutoSplitThrottle() : _splitTickets(maxParallelSplits) {} TicketHolder _splitTickets; // Maximum number of parallel threads requesting a split static const int maxParallelSplits = 5; - }; - mutable SplitHeuristics _splitHeuristics; - - // - // End split heuristics - // + } _autoSplitThrottle; - friend class Chunk; - static AtomicUInt32 NextSequenceNumber; + // This function needs to be able to access the auto-split throttle + friend void updateChunkWriteStatsAndSplitIfNeeded(OperationContext*, + ChunkManager*, + Chunk*, + long); friend class TestableChunkManager; }; diff --git a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp index e47aa1633f9..14cd8676917 100644 --- a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp +++ b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp @@ -44,6 +44,7 @@ #include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/commands/cluster_explain.h" +#include "mongo/s/commands/cluster_write.h" #include "mongo/s/commands/sharded_command_processing.h" #include "mongo/s/commands/strategy.h" #include "mongo/s/grid.h" @@ -207,19 +208,19 @@ public: } BSONObj shardKey = status.getValue(); - auto chunk = chunkMgr->findIntersectingChunk(txn, shardKey, collation); - - if (!chunk.isOK()) { + auto chunkStatus = chunkMgr->findIntersectingChunk(txn, shardKey, collation); + if (!chunkStatus.isOK()) { uasserted(ErrorCodes::ShardKeyNotFound, "findAndModify must target a single shard, but was not able to due to " "non-simple collation"); } - bool ok = - _runCommand(txn, conf, chunkMgr, chunk.getValue()->getShardId(), nss, cmdObj, result); + const auto& chunk = chunkStatus.getValue(); + + const bool ok = _runCommand(txn, conf, chunkMgr, chunk->getShardId(), nss, cmdObj, result); if (ok) { - // check whether split is necessary (using update object for size heuristic) - chunk.getValue()->splitIfShould(txn, cmdObj.getObjectField("update").objsize()); + updateChunkWriteStatsAndSplitIfNeeded( + txn, chunkMgr.get(), chunk.get(), cmdObj.getObjectField("update").objsize()); } return ok; diff --git a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp index be4b489ff39..232aefa5bd2 100644 --- a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp +++ b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp @@ -49,6 +49,7 @@ #include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/commands/cluster_commands_common.h" +#include "mongo/s/commands/cluster_write.h" #include "mongo/s/commands/sharded_command_processing.h" #include "mongo/s/commands/strategy.h" #include "mongo/s/config.h" @@ -614,7 +615,7 @@ public: warning() << "Mongod reported " << size << " bytes inserted for key " << key << " but can't find chunk"; } else { - c->splitIfShould(txn, size); + updateChunkWriteStatsAndSplitIfNeeded(txn, cm.get(), c.get(), size); } } } diff --git a/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp b/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp index 26dde5698c9..7dd1aa79852 100644 --- a/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp +++ b/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp @@ -159,8 +159,8 @@ public: remoteCmdObjB.append( ClusterMergeChunksCommand::configField(), Grid::get(txn)->shardRegistry()->getConfigServerConnectionString().toString()); - remoteCmdObjB.append(ClusterMergeChunksCommand::shardNameField(), firstChunk->getShardId()); - + remoteCmdObjB.append(ClusterMergeChunksCommand::shardNameField(), + firstChunk->getShardId().toString()); BSONObj remoteResult; diff --git a/src/mongo/s/commands/cluster_shard_collection_cmd.cpp b/src/mongo/s/commands/cluster_shard_collection_cmd.cpp index 508642e1117..a7bf3060752 100644 --- a/src/mongo/s/commands/cluster_shard_collection_cmd.cpp +++ b/src/mongo/s/commands/cluster_shard_collection_cmd.cpp @@ -46,6 +46,7 @@ #include "mongo/db/client.h" #include "mongo/db/commands.h" #include "mongo/db/hasher.h" +#include "mongo/db/index/index_descriptor.h" #include "mongo/db/operation_context.h" #include "mongo/db/query/collation/collator_factory_interface.h" #include "mongo/db/write_concern_options.h" @@ -64,14 +65,79 @@ #include "mongo/util/log.h" namespace mongo { +namespace { -using std::shared_ptr; -using std::list; -using std::set; -using std::string; -using std::vector; +/** + * Constructs the BSON specification document for the given namespace, index key and options. + */ +BSONObj createIndexDoc(const std::string& ns, + const BSONObj& keys, + const BSONObj& collation, + bool unique) { + BSONObjBuilder indexDoc; + indexDoc.append("ns", ns); + indexDoc.append("key", keys); + + StringBuilder indexName; + + bool isFirstKey = true; + for (BSONObjIterator keyIter(keys); keyIter.more();) { + BSONElement currentKey = keyIter.next(); + + if (isFirstKey) { + isFirstKey = false; + } else { + indexName << "_"; + } -namespace { + indexName << currentKey.fieldName() << "_"; + if (currentKey.isNumber()) { + indexName << currentKey.numberInt(); + } else { + indexName << currentKey.str(); // this should match up with shell command + } + } + + indexDoc.append("name", indexName.str()); + + if (!collation.isEmpty()) { + // Creating an index with the "collation" option requires a v=2 index. + indexDoc.append("v", static_cast<int>(IndexDescriptor::IndexVersion::kV2)); + indexDoc.append("collation", collation); + } + + if (unique && !IndexDescriptor::isIdIndexPattern(keys)) { + indexDoc.appendBool("unique", unique); + } + + return indexDoc.obj(); +} + +/** + * Used only for writes to the config server, config and admin databases. + */ +Status clusterCreateIndex(OperationContext* txn, + const std::string& ns, + const BSONObj& keys, + const BSONObj& collation, + bool unique) { + const NamespaceString nss(ns); + + // Go through the shard insert path + std::unique_ptr<BatchedInsertRequest> insert(new BatchedInsertRequest()); + insert->addToDocuments(createIndexDoc(ns, keys, collation, unique)); + + BatchedCommandRequest request(insert.release()); + request.setNS(NamespaceString(nss.getSystemIndexesCollection())); + request.setWriteConcern(WriteConcernOptions::Acknowledged); + + BatchedCommandResponse response; + + ClusterWriter writer(false, 0); + writer.write(txn, request, &response); + + return response.toStatus(); +} class ShardCollectionCmd : public Command { public: @@ -194,8 +260,9 @@ public: } } - vector<ShardId> shardIds; + std::vector<ShardId> shardIds; shardRegistry->getAllShardIds(&shardIds); + const int numShards = shardIds.size(); // Cannot have more than 8192 initial chunks per shard. Setting a maximum of 1,000,000 @@ -224,7 +291,7 @@ public: // collection. BSONObj res; { - list<BSONObj> all = + std::list<BSONObj> all = conn->getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll())); if (!all.empty()) { res = all.front().getOwned(); @@ -328,12 +395,12 @@ public: // 5. If the collection is empty, and it's still possible to create an index // on the proposed key, we go ahead and do so. - list<BSONObj> indexes = conn->getIndexSpecs(nss.ns()); + std::list<BSONObj> indexes = conn->getIndexSpecs(nss.ns()); // 1. Verify consistency with existing unique indexes ShardKeyPattern proposedShardKey(proposedKey); - for (list<BSONObj>::iterator it = indexes.begin(); it != indexes.end(); ++it) { - BSONObj idx = *it; + + for (const auto& idx : indexes) { BSONObj currentKey = idx["key"].embeddedObject(); bool isUnique = idx["unique"].trueValue(); @@ -351,8 +418,7 @@ public: // 2. Check for a useful index bool hasUsefulIndexForKey = false; - for (list<BSONObj>::iterator it = indexes.begin(); it != indexes.end(); ++it) { - BSONObj idx = *it; + for (const auto& idx : indexes) { BSONObj currentKey = idx["key"].embeddedObject(); // Check 2.i. and 2.ii. if (!idx["sparse"].trueValue() && idx["filter"].eoo() && idx["collation"].eoo() && @@ -377,12 +443,12 @@ public: // 3. If proposed key is required to be unique, additionally check for exact match. bool careAboutUnique = cmdObj["unique"].trueValue(); + if (hasUsefulIndexForKey && careAboutUnique) { BSONObj eqQuery = BSON("ns" << nss.ns() << "key" << proposedKey); BSONObj eqQueryResult; - for (list<BSONObj>::iterator it = indexes.begin(); it != indexes.end(); ++it) { - BSONObj idx = *it; + for (const auto& idx : indexes) { if (SimpleBSONObjComparator::kInstance.evaluate(idx["key"].embeddedObject() == proposedKey)) { eqQueryResult = idx; @@ -459,8 +525,8 @@ public: // 2. move them one at a time // 3. split the big chunks to achieve the desired total number of initial chunks - vector<BSONObj> initSplits; // there will be at most numShards-1 of these - vector<BSONObj> allSplits; // all of the initial desired split points + std::vector<BSONObj> initSplits; // there will be at most numShards-1 of these + std::vector<BSONObj> allSplits; // all of the initial desired split points // only pre-split when using a hashed shard key and collection is still empty if (isHashedShardKey && isEmpty) { @@ -516,16 +582,13 @@ public: audit::logShardCollection(Client::getCurrent(), nss.ns(), proposedKey, careAboutUnique); - Status status = catalogClient->shardCollection(txn, + uassertStatusOK(catalogClient->shardCollection(txn, nss.ns(), proposedShardKey, defaultCollation, careAboutUnique, initSplits, - std::set<ShardId>{}); - if (!status.isOK()) { - return appendCommandStatus(result, status); - } + std::set<ShardId>{})); // Make sure the cached metadata for the collection knows that we are now sharded config->getChunkManager(txn, nss.ns(), true /* reload */); @@ -536,7 +599,7 @@ public: if (isHashedShardKey && isEmpty) { // Reload the new config info. If we created more than one initial chunk, then // we need to move them around to balance. - shared_ptr<ChunkManager> chunkManager = config->getChunkManager(txn, nss.ns(), true); + auto chunkManager = config->getChunkManager(txn, nss.ns(), true); ChunkMap chunkMap = chunkManager->getChunkMap(); // 2. Move and commit each "big chunk" to a different shard. @@ -549,7 +612,7 @@ public: } const auto to = toStatus.getValue(); - shared_ptr<Chunk> chunk = c->second; + auto chunk = c->second; // Can't move chunk to shard it's already on if (to->getId() == chunk->getShardId()) { @@ -587,10 +650,10 @@ public: // 3. Subdivide the big chunks by splitting at each of the points in "allSplits" // that we haven't already split by. - shared_ptr<Chunk> currentChunk = + auto currentChunk = chunkManager->findIntersectingChunkWithSimpleCollation(txn, allSplits[0]); - vector<BSONObj> subSplits; + std::vector<BSONObj> subSplits; for (unsigned i = 0; i <= allSplits.size(); i++) { if (i == allSplits.size() || !currentChunk->containsKey(allSplits[i])) { if (!subSplits.empty()) { diff --git a/src/mongo/s/commands/cluster_write.cpp b/src/mongo/s/commands/cluster_write.cpp index 03d0306aebc..217c37e273b 100644 --- a/src/mongo/s/commands/cluster_write.cpp +++ b/src/mongo/s/commands/cluster_write.cpp @@ -32,108 +32,161 @@ #include "mongo/s/commands/cluster_write.h" -#include <string> -#include <vector> - #include "mongo/base/status.h" -#include "mongo/db/index/index_descriptor.h" +#include "mongo/client/connpool.h" +#include "mongo/db/lasterror.h" #include "mongo/db/write_concern_options.h" #include "mongo/s/balancer_configuration.h" #include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/catalog/sharding_catalog_client.h" +#include "mongo/s/catalog/type_collection.h" +#include "mongo/s/chunk.h" #include "mongo/s/chunk_manager.h" #include "mongo/s/chunk_manager_targeter.h" #include "mongo/s/client/dbclient_multi_command.h" +#include "mongo/s/client/shard_registry.h" #include "mongo/s/config.h" +#include "mongo/s/config_server_client.h" #include "mongo/s/grid.h" -#include "mongo/s/mongos_options.h" -#include "mongo/s/write_ops/batch_write_exec.h" +#include "mongo/s/shard_util.h" +#include "mongo/s/sharding_raii.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" namespace mongo { +namespace { -using std::shared_ptr; -using std::unique_ptr; -using std::vector; -using std::map; -using std::string; -using std::stringstream; +// Test whether we should split once data * splitTestFactor > chunkSize (approximately) +const uint64_t splitTestFactor = 5; -using IndexVersion = IndexDescriptor::IndexVersion; +const uint64_t kTooManySplitPoints = 4; -namespace { +void toBatchError(const Status& status, BatchedCommandResponse* response) { + response->clear(); + response->setErrCode(status.code()); + response->setErrMessage(status.reason()); + response->setOk(false); + dassert(response->isValid(NULL)); +} + +void reloadChunkManager(OperationContext* txn, const NamespaceString& nss) { + auto config = uassertStatusOK(ScopedShardDatabase::getExisting(txn, nss.db())); + config.db()->getChunkManagerIfExists(txn, nss.ns(), true); +} /** - * Constructs the BSON specification document for the given namespace, index key - * and options. + * Given a maxChunkSize configuration and the number of chunks in a particular sharded collection, + * returns an optimal chunk size to use in order to achieve a good ratio between number of chunks + * and their size. */ -BSONObj createIndexDoc(const string& ns, - const BSONObj& keys, - const BSONObj& collation, - bool unique) { - BSONObjBuilder indexDoc; - indexDoc.append("ns", ns); - indexDoc.append("key", keys); - - stringstream indexName; - - bool isFirstKey = true; - for (BSONObjIterator keyIter(keys); keyIter.more();) { - BSONElement currentKey = keyIter.next(); - - if (isFirstKey) { - isFirstKey = false; - } else { - indexName << "_"; - } - - indexName << currentKey.fieldName() << "_"; - if (currentKey.isNumber()) { - indexName << currentKey.numberInt(); - } else { - indexName << currentKey.str(); // this should match up with shell command - } +uint64_t calculateDesiredChunkSize(uint64_t maxChunkSizeBytes, uint64_t numChunks) { + // Splitting faster in early chunks helps spread out an initial load better + const uint64_t minChunkSize = 1 << 20; // 1 MBytes + + if (numChunks <= 1) { + return 1024; + } else if (numChunks < 3) { + return minChunkSize / 2; + } else if (numChunks < 10) { + return std::max(maxChunkSizeBytes / 4, minChunkSize); + } else if (numChunks < 20) { + return std::max(maxChunkSizeBytes / 2, minChunkSize); + } else { + return maxChunkSizeBytes; } +} - indexDoc.append("name", indexName.str()); +/** + * Returns the split point that will result in one of the chunk having exactly one document. Also + * returns an empty document if the split point cannot be determined. + * + * doSplitAtLower - determines which side of the split will have exactly one document. True means + * that the split point chosen will be closer to the lower bound. + * + * NOTE: this assumes that the shard key is not "special"- that is, the shardKeyPattern is simply an + * ordered list of ascending/descending field names. For example {a : 1, b : -1} is not special, but + * {a : "hashed"} is. + */ +BSONObj findExtremeKeyForShard(OperationContext* txn, + const NamespaceString& nss, + const ShardId& shardId, + const ShardKeyPattern& shardKeyPattern, + bool doSplitAtLower) { + Query q; + + if (doSplitAtLower) { + q.sort(shardKeyPattern.toBSON()); + } else { + // need to invert shard key pattern to sort backwards + // TODO: make a helper in ShardKeyPattern? + BSONObjBuilder r; + + BSONObjIterator i(shardKeyPattern.toBSON()); + while (i.more()) { + BSONElement e = i.next(); + uassert(10163, "can only handle numbers here - which i think is correct", e.isNumber()); + r.append(e.fieldName(), -1 * e.number()); + } - if (!collation.isEmpty()) { - // Creating an index with the "collation" option requires a v=2 index. - indexDoc.append("v", static_cast<int>(IndexVersion::kV2)); - indexDoc.append("collation", collation); + q.sort(r.obj()); } - if (unique && !IndexDescriptor::isIdIndexPattern(keys)) { - indexDoc.appendBool("unique", unique); + // Find the extreme key + const auto shardConnStr = [&]() { + const auto shard = uassertStatusOK(Grid::get(txn)->shardRegistry()->getShard(txn, shardId)); + return shard->getConnString(); + }(); + + ScopedDbConnection conn(shardConnStr); + + BSONObj end; + + if (doSplitAtLower) { + // Splitting close to the lower bound means that the split point will be the + // upper bound. Chunk range upper bounds are exclusive so skip a document to + // make the lower half of the split end up with a single document. + std::unique_ptr<DBClientCursor> cursor = conn->query(nss.ns(), + q, + 1, /* nToReturn */ + 1 /* nToSkip */); + + uassert(28736, + str::stream() << "failed to initialize cursor during auto split due to " + << "connection problem with " + << conn->getServerAddress(), + cursor.get() != nullptr); + + if (cursor->more()) { + end = cursor->next().getOwned(); + } + } else { + end = conn->findOne(nss.ns(), q); } - return indexDoc.obj(); -} + conn.done(); -void toBatchError(const Status& status, BatchedCommandResponse* response) { - response->clear(); - response->setErrCode(status.code()); - response->setErrMessage(status.reason()); - response->setOk(false); - dassert(response->isValid(NULL)); + if (end.isEmpty()) { + return BSONObj(); + } + + return shardKeyPattern.extractShardKeyFromDoc(end); } /** * Splits the chunks touched based from the targeter stats if needed. */ void splitIfNeeded(OperationContext* txn, const NamespaceString& nss, const TargeterStats& stats) { - auto status = grid.catalogCache()->getDatabase(txn, nss.db().toString()); + auto status = Grid::get(txn)->catalogCache()->getDatabase(txn, nss.db().toString()); if (!status.isOK()) { warning() << "failed to get database config for " << nss << " while checking for auto-split: " << status.getStatus(); return; } - shared_ptr<DBConfig> config = status.getValue(); + auto config = status.getValue(); - shared_ptr<ChunkManager> chunkManager; - shared_ptr<Shard> dummyShard; + std::shared_ptr<ChunkManager> chunkManager; + std::shared_ptr<Shard> dummyShard; config->getChunkManagerOrPrimary(txn, nss.ns(), chunkManager, dummyShard); if (!chunkManager) { @@ -141,7 +194,7 @@ void splitIfNeeded(OperationContext* txn, const NamespaceString& nss, const Targ } for (auto it = stats.chunkSizeDelta.cbegin(); it != stats.chunkSizeDelta.cend(); ++it) { - shared_ptr<Chunk> chunk; + std::shared_ptr<Chunk> chunk; try { chunk = chunkManager->findIntersectingChunkWithSimpleCollation(txn, it->first); } catch (const AssertionException& ex) { @@ -150,42 +203,23 @@ void splitIfNeeded(OperationContext* txn, const NamespaceString& nss, const Targ return; } - chunk->splitIfShould(txn, it->second); + updateChunkWriteStatsAndSplitIfNeeded(txn, chunkManager.get(), chunk.get(), it->second); } } } // namespace -Status clusterCreateIndex( - OperationContext* txn, const string& ns, BSONObj keys, BSONObj collation, bool unique) { - const NamespaceString nss(ns); - const std::string dbName = nss.db().toString(); - - BSONObj indexDoc = createIndexDoc(ns, keys, collation, unique); - - // Go through the shard insert path - std::unique_ptr<BatchedInsertRequest> insert(new BatchedInsertRequest()); - insert->addToDocuments(indexDoc); - - BatchedCommandRequest request(insert.release()); - request.setNS(NamespaceString(nss.getSystemIndexesCollection())); - request.setWriteConcern(WriteConcernOptions::Acknowledged); - - BatchedCommandResponse response; - - ClusterWriter writer(false, 0); - writer.write(txn, request, &response); - - return response.toStatus(); -} - +ClusterWriter::ClusterWriter(bool autoSplit, int timeoutMillis) + : _autoSplit(autoSplit), _timeoutMillis(timeoutMillis) {} void ClusterWriter::write(OperationContext* txn, const BatchedCommandRequest& origRequest, BatchedCommandResponse* response) { // Add _ids to insert request if req'd - unique_ptr<BatchedCommandRequest> idRequest(BatchedCommandRequest::cloneWithIds(origRequest)); - const BatchedCommandRequest* request = NULL != idRequest.get() ? idRequest.get() : &origRequest; + std::unique_ptr<BatchedCommandRequest> idRequest( + BatchedCommandRequest::cloneWithIds(origRequest)); + + const BatchedCommandRequest* request = idRequest ? idRequest.get() : &origRequest; const NamespaceString& nss = request->getNS(); if (!nss.isValid()) { @@ -215,28 +249,28 @@ void ClusterWriter::write(OperationContext* txn, return; } - string errMsg; + std::string errMsg; if (request->isInsertIndexRequest() && !request->isValidIndexRequest(&errMsg)) { toBatchError(Status(ErrorCodes::InvalidOptions, errMsg), response); return; } // Config writes and shard writes are done differently - const string dbName = nss.db().toString(); - - unique_ptr<BatchedCommandRequest> requestWithWriteConcern; - if (dbName == "config" || dbName == "admin") { - // w:majority is the only valid write concern for writes to the config servers. - // We also allow w:1 to come in on a user-initiated write, though we convert it here to - // w:majority before sending it to the config servers. + if (nss.db() == NamespaceString::kConfigDb || nss.db() == NamespaceString::kAdminDb) { + // w:majority is the only valid write concern for writes to the config servers. We also + // allow w:1 to come in on a user-initiated write, though we convert it here to w:majority + // before sending it to the config servers. bool rewriteCmdWithWriteConcern = false; + WriteConcernOptions writeConcern; + if (request->isWriteConcernSet()) { Status status = writeConcern.parse(request->getWriteConcern()); if (!status.isOK()) { toBatchError(status, response); return; } + if (!writeConcern.validForConfigServers()) { toBatchError(Status(ErrorCodes::InvalidOptions, "Invalid replication write concern. Writes to config servers " @@ -244,6 +278,7 @@ void ClusterWriter::write(OperationContext* txn, response); return; } + if (writeConcern.wMode == "") { invariant(writeConcern.wNumNodes == 1); rewriteCmdWithWriteConcern = true; @@ -252,6 +287,8 @@ void ClusterWriter::write(OperationContext* txn, rewriteCmdWithWriteConcern = true; } + std::unique_ptr<BatchedCommandRequest> requestWithWriteConcern; + if (rewriteCmdWithWriteConcern) { requestWithWriteConcern.reset(new BatchedCommandRequest(request->getBatchType())); request->cloneTo(requestWithWriteConcern.get()); @@ -261,7 +298,7 @@ void ClusterWriter::write(OperationContext* txn, request = requestWithWriteConcern.get(); } - grid.catalogClient(txn)->writeConfigServerDirect(txn, *request, response); + Grid::get(txn)->catalogClient(txn)->writeConfigServerDirect(txn, *request, response); } else { TargeterStats targeterStats; @@ -292,11 +329,185 @@ void ClusterWriter::write(OperationContext* txn, } } -ClusterWriter::ClusterWriter(bool autoSplit, int timeoutMillis) - : _autoSplit(autoSplit), _timeoutMillis(timeoutMillis) {} - const BatchWriteExecStats& ClusterWriter::getStats() { return _stats; } +void updateChunkWriteStatsAndSplitIfNeeded(OperationContext* txn, + ChunkManager* manager, + Chunk* chunk, + long dataWritten) { + // Disable lastError tracking so that any errors, which occur during auto-split do not get + // bubbled up on the client connection doing a write. + LastError::Disabled d(&LastError::get(cc())); + + const auto balancerConfig = Grid::get(txn)->getBalancerConfiguration(); + + const bool minIsInf = + (0 == manager->getShardKeyPattern().getKeyPattern().globalMin().woCompare(chunk->getMin())); + const bool maxIsInf = + (0 == manager->getShardKeyPattern().getKeyPattern().globalMax().woCompare(chunk->getMax())); + + const uint64_t chunkBytesWritten = chunk->addBytesWritten(dataWritten); + + const uint64_t desiredChunkSize = + calculateDesiredChunkSize(balancerConfig->getMaxChunkSizeBytes(), manager->numChunks()); + + // If this chunk is at either end of the range, trigger auto-split at 10% less data written in + // order to trigger the top-chunk optimization. + const uint64_t splitThreshold = (minIsInf || maxIsInf) + ? static_cast<uint64_t>((double)desiredChunkSize * 0.9) + : desiredChunkSize; + + // Check if there are enough estimated bytes written to warrant a split + if (chunkBytesWritten < splitThreshold / splitTestFactor) { + return; + } + + const NamespaceString nss(manager->getns()); + + if (!manager->_autoSplitThrottle._splitTickets.tryAcquire()) { + LOG(1) << "won't auto split because not enough tickets: " << nss; + return; + } + + TicketHolderReleaser releaser(&(manager->_autoSplitThrottle._splitTickets)); + + const ChunkRange chunkRange(chunk->getMin(), chunk->getMax()); + + try { + // Ensure we have the most up-to-date balancer configuration + uassertStatusOK(balancerConfig->refreshAndCheck(txn)); + + if (!balancerConfig->getShouldAutoSplit()) { + return; + } + + LOG(1) << "about to initiate autosplit: " << redact(chunk->toString()) + << " dataWritten: " << chunkBytesWritten << " splitThreshold: " << splitThreshold; + + const uint64_t chunkSizeToUse = [&]() { + const uint64_t estNumSplitPoints = chunkBytesWritten / desiredChunkSize * 2; + + if (estNumSplitPoints >= kTooManySplitPoints) { + // The current desired chunk size will split the chunk into lots of small chunk and + // at the worst case this can result into thousands of chunks. So check and see if a + // bigger value can be used. + return std::min(chunkBytesWritten, balancerConfig->getMaxChunkSizeBytes()); + } else { + return desiredChunkSize; + } + }(); + + auto splitPoints = + uassertStatusOK(shardutil::selectChunkSplitPoints(txn, + chunk->getShardId(), + nss, + manager->getShardKeyPattern(), + chunkRange, + chunkSizeToUse, + boost::none)); + + if (splitPoints.size() <= 1) { + // No split points means there isn't enough data to split on; 1 split point means we + // have + // between half the chunk size to full chunk size so there is no need to split yet + chunk->clearBytesWritten(); + return; + } + + if (minIsInf || maxIsInf) { + // We don't want to reset _dataWritten since we want to check the other side right away + } else { + // We're splitting, so should wait a bit + chunk->clearBytesWritten(); + } + + // We assume that if the chunk being split is the first (or last) one on the collection, + // this chunk is likely to see more insertions. Instead of splitting mid-chunk, we use the + // very first (or last) key as a split point. + // + // This heuristic is skipped for "special" shard key patterns that are not likely to produce + // monotonically increasing or decreasing values (e.g. hashed shard keys). + if (KeyPattern::isOrderedKeyPattern(manager->getShardKeyPattern().toBSON())) { + if (minIsInf) { + BSONObj key = findExtremeKeyForShard( + txn, nss, chunk->getShardId(), manager->getShardKeyPattern(), true); + if (!key.isEmpty()) { + splitPoints.front() = key.getOwned(); + } + } else if (maxIsInf) { + BSONObj key = findExtremeKeyForShard( + txn, nss, chunk->getShardId(), manager->getShardKeyPattern(), false); + if (!key.isEmpty()) { + splitPoints.back() = key.getOwned(); + } + } + } + + const auto suggestedMigrateChunk = + uassertStatusOK(shardutil::splitChunkAtMultiplePoints(txn, + chunk->getShardId(), + nss, + manager->getShardKeyPattern(), + manager->getVersion(), + chunkRange, + splitPoints)); + + // Balance the resulting chunks if the option is enabled and if the shard suggested a chunk + // to balance + const bool shouldBalance = [&]() { + if (!balancerConfig->shouldBalanceForAutoSplit()) + return false; + + auto collStatus = + Grid::get(txn)->catalogClient(txn)->getCollection(txn, manager->getns()); + if (!collStatus.isOK()) { + log() << "Auto-split for " << nss << " failed to load collection metadata" + << causedBy(redact(collStatus.getStatus())); + return false; + } + + return collStatus.getValue().value.getAllowBalance(); + }(); + + log() << "autosplitted " << nss << " chunk: " << redact(chunk->toString()) << " into " + << (splitPoints.size() + 1) << " parts (splitThreshold " << splitThreshold << ")" + << (suggestedMigrateChunk ? "" : (std::string) " (migrate suggested" + + (shouldBalance ? ")" : ", but no migrations allowed)")); + + if (!shouldBalance || !suggestedMigrateChunk) { + reloadChunkManager(txn, nss); + return; + } + + // Top chunk optimization - try to move the top chunk out of this shard to prevent the hot + // spot + // from staying on a single shard. This is based on the assumption that succeeding inserts + // will + // fall on the top chunk. + + // We need to use the latest chunk manager (after the split) in order to have the most + // up-to-date view of the chunk we are about to move + auto scopedCM = uassertStatusOK(ScopedChunkManager::refreshAndGet(txn, nss)); + auto suggestedChunk = scopedCM.cm()->findIntersectingChunkWithSimpleCollation( + txn, suggestedMigrateChunk->getMin()); + + ChunkType chunkToMove; + chunkToMove.setNS(nss.ns()); + chunkToMove.setShard(suggestedChunk->getShardId()); + chunkToMove.setMin(suggestedChunk->getMin()); + chunkToMove.setMax(suggestedChunk->getMax()); + chunkToMove.setVersion(suggestedChunk->getLastmod()); + + uassertStatusOK(configsvr_client::rebalanceChunk(txn, chunkToMove)); + + reloadChunkManager(txn, nss); + } catch (const DBException& ex) { + chunk->randomizeBytesWritten(); + + log() << "Unable to auto-split chunk " << redact(chunkRange.toString()) << causedBy(ex); + } +} + } // namespace mongo diff --git a/src/mongo/s/commands/cluster_write.h b/src/mongo/s/commands/cluster_write.h index a0d5e931104..80f4a325ddf 100644 --- a/src/mongo/s/commands/cluster_write.h +++ b/src/mongo/s/commands/cluster_write.h @@ -28,12 +28,15 @@ #pragma once +#include <string> + #include "mongo/s/write_ops/batch_write_exec.h" namespace mongo { -class BatchedCommandRequest; -class BatchedCommandResponse; +class BSONObj; +class Chunk; +class ChunkManager; class OperationContext; class ClusterWriter { @@ -54,9 +57,13 @@ private: }; /** - * Used only for writes to the config server, config and admin databases. + * Adds the specified amount of data written to the chunk's stats and if the total amount nears the + * max size of a shard attempt to split the chunk. This call is opportunistic and swallows any + * errors. */ -Status clusterCreateIndex( - OperationContext* txn, const std::string& ns, BSONObj keys, BSONObj collation, bool unique); +void updateChunkWriteStatsAndSplitIfNeeded(OperationContext* txn, + ChunkManager* manager, + Chunk* chunk, + long dataWritten); } // namespace mongo |