author     Kaloian Manassiev <kaloian.manassiev@mongodb.com>    2017-01-26 14:57:32 -0500
committer  Kaloian Manassiev <kaloian.manassiev@mongodb.com>    2017-01-30 16:43:13 -0500
commit     3942d88af18cd7a2d9fff8ea3800f8c7769e5c9f (patch)
tree       b59c6fa5a7694b7b8a5d1a8c70d0752bc3236f66 /src/mongo
parent     2922aea4adf8e8fe9bab3a5d6e986e7aecba7228 (diff)
download   mongo-3942d88af18cd7a2d9fff8ea3800f8c7769e5c9f.tar.gz
SERVER-27809 Move Chunk::splitIfShould to cluster_write.h/.cpp
Diffstat (limited to 'src/mongo')
-rw-r--r--  src/mongo/s/chunk.cpp                                  369
-rw-r--r--  src/mongo/s/chunk.h                                    125
-rw-r--r--  src/mongo/s/chunk_manager.cpp                           18
-rw-r--r--  src/mongo/s/chunk_manager.h                             26
-rw-r--r--  src/mongo/s/commands/cluster_find_and_modify_cmd.cpp    15
-rw-r--r--  src/mongo/s/commands/cluster_map_reduce_cmd.cpp          3
-rw-r--r--  src/mongo/s/commands/cluster_merge_chunks_cmd.cpp        4
-rw-r--r--  src/mongo/s/commands/cluster_shard_collection_cmd.cpp   115
-rw-r--r--  src/mongo/s/commands/cluster_write.cpp                  411
-rw-r--r--  src/mongo/s/commands/cluster_write.h                     17
10 files changed, 491 insertions, 612 deletions
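
The hunks below replace the Chunk::splitIfShould member function with the free function
updateChunkWriteStatsAndSplitIfNeeded, declared in cluster_write.h and implemented in
cluster_write.cpp. A minimal before/after sketch of the caller-side change, taken from the
cluster_find_and_modify_cmd.cpp hunk below (the txn, chunkMgr, chunk and cmdObj variables come
from that caller and are shown only for context):

    // Before: the chunk tracked its own write stats and split itself
    chunk.getValue()->splitIfShould(txn, cmdObj.getObjectField("update").objsize());

    // After: callers pass the owning ChunkManager explicitly; the auto-split logic now
    // lives in cluster_write.cpp
    updateChunkWriteStatsAndSplitIfNeeded(
        txn, chunkMgr.get(), chunk.get(), cmdObj.getObjectField("update").objsize());
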
diff --git a/src/mongo/s/chunk.cpp b/src/mongo/s/chunk.cpp
index 8ccf0250115..ff4341c3f7f 100644
--- a/src/mongo/s/chunk.cpp
+++ b/src/mongo/s/chunk.cpp
@@ -32,35 +32,17 @@
#include "mongo/s/chunk.h"
-#include "mongo/bson/simple_bsonobj_comparator.h"
-#include "mongo/client/connpool.h"
-#include "mongo/db/lasterror.h"
#include "mongo/platform/random.h"
#include "mongo/s/balancer_configuration.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
-#include "mongo/s/catalog/type_collection.h"
-#include "mongo/s/client/shard_registry.h"
-#include "mongo/s/config_server_client.h"
+#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/chunk_manager.h"
#include "mongo/s/grid.h"
-#include "mongo/s/shard_util.h"
-#include "mongo/s/sharding_raii.h"
#include "mongo/util/log.h"
namespace mongo {
-
-using std::shared_ptr;
-using std::unique_ptr;
-using std::map;
-using std::ostringstream;
-using std::set;
-using std::string;
-using std::stringstream;
-using std::vector;
-
namespace {
-const uint64_t kTooManySplitPoints = 4;
-
// Test whether we should split once data * splitTestFactor > chunkSize (approximately)
const int splitTestFactor = 5;
@@ -68,37 +50,14 @@ const int splitTestFactor = 5;
* Generates a random value for _dataWritten so that a mongos restart wouldn't cause delay in
* splitting.
*/
-int mkDataWritten() {
+int64_t mkDataWritten() {
PseudoRandom r(static_cast<int64_t>(time(0)));
return r.nextInt32(grid.getBalancerConfiguration()->getMaxChunkSizeBytes() / splitTestFactor);
}
-void reloadChunkManager(OperationContext* txn, const std::string ns) {
- const NamespaceString nss(ns);
- auto config = uassertStatusOK(ScopedShardDatabase::getExisting(txn, nss.db()));
- config.db()->getChunkManagerIfExists(txn, nss.ns(), true);
-}
-
-uint64_t calculateDesiredChunkSize(uint64_t maxChunkSizeBytes, uint64_t numChunks) {
- // Splitting faster in early chunks helps spread out an initial load better
- const uint64_t minChunkSize = 1 << 20; // 1 MBytes
-
- if (numChunks <= 1) {
- return 1024;
- } else if (numChunks < 3) {
- return minChunkSize / 2;
- } else if (numChunks < 10) {
- return std::max(maxChunkSizeBytes / 4, minChunkSize);
- } else if (numChunks < 20) {
- return std::max(maxChunkSizeBytes / 2, minChunkSize);
- } else {
- return maxChunkSizeBytes;
- }
-}
-
} // namespace
-Chunk::Chunk(OperationContext* txn, ChunkManager* manager, const ChunkType& from)
+Chunk::Chunk(ChunkManager* manager, const ChunkType& from)
: _manager(manager),
_min(from.getMin().getOwned()),
_max(from.getMax().getOwned()),
@@ -109,13 +68,13 @@ Chunk::Chunk(OperationContext* txn, ChunkManager* manager, const ChunkType& from
invariantOK(from.validate());
}
-Chunk::Chunk(ChunkManager* info,
+Chunk::Chunk(ChunkManager* manager,
const BSONObj& min,
const BSONObj& max,
const ShardId& shardId,
ChunkVersion lastmod,
uint64_t initialDataWritten)
- : _manager(info),
+ : _manager(manager),
_min(min),
_max(max),
_shardId(shardId),
@@ -127,306 +86,28 @@ bool Chunk::containsKey(const BSONObj& shardKey) const {
return getMin().woCompare(shardKey) <= 0 && shardKey.woCompare(getMax()) < 0;
}
-bool Chunk::_minIsInf() const {
- return 0 == _manager->getShardKeyPattern().getKeyPattern().globalMin().woCompare(getMin());
-}
-
-bool Chunk::_maxIsInf() const {
- return 0 == _manager->getShardKeyPattern().getKeyPattern().globalMax().woCompare(getMax());
-}
-
-BSONObj Chunk::_getExtremeKey(OperationContext* txn, bool doSplitAtLower) const {
- Query q;
- if (doSplitAtLower) {
- q.sort(_manager->getShardKeyPattern().toBSON());
- } else {
- // need to invert shard key pattern to sort backwards
- // TODO: make a helper in ShardKeyPattern?
-
- BSONObj k = _manager->getShardKeyPattern().toBSON();
- BSONObjBuilder r;
-
- BSONObjIterator i(k);
- while (i.more()) {
- BSONElement e = i.next();
- uassert(10163, "can only handle numbers here - which i think is correct", e.isNumber());
- r.append(e.fieldName(), -1 * e.number());
- }
-
- q.sort(r.obj());
- }
-
- // find the extreme key
- ScopedDbConnection conn(_getShardConnectionString(txn));
- BSONObj end;
-
- if (doSplitAtLower) {
- // Splitting close to the lower bound means that the split point will be the
- // upper bound. Chunk range upper bounds are exclusive so skip a document to
- // make the lower half of the split end up with a single document.
- unique_ptr<DBClientCursor> cursor = conn->query(_manager->getns(),
- q,
- 1, /* nToReturn */
- 1 /* nToSkip */);
-
- uassert(28736,
- str::stream() << "failed to initialize cursor during auto split due to "
- << "connection problem with "
- << conn->getServerAddress(),
- cursor.get() != nullptr);
-
- if (cursor->more()) {
- end = cursor->next().getOwned();
- }
- } else {
- end = conn->findOne(_manager->getns(), q);
- }
-
- conn.done();
- if (end.isEmpty())
- return BSONObj();
- return _manager->getShardKeyPattern().extractShardKeyFromDoc(end);
-}
-
-std::vector<BSONObj> Chunk::_determineSplitPoints(OperationContext* txn) const {
- // If splitting is not obligatory we may return early if there are not enough data we cap the
- // number of objects that would fall in the first half (before the split point) the rationale is
- // we'll find a split point without traversing all the data.
-
- uint64_t chunkSize = calculateDesiredChunkSize(
- Grid::get(txn)->getBalancerConfiguration()->getMaxChunkSizeBytes(), _manager->numChunks());
-
- // Note: One split point for every 1/2 chunk size.
- const uint64_t chunkBytesWritten = getBytesWritten();
- const uint64_t estNumSplitPoints = chunkBytesWritten / chunkSize * 2;
-
- if (estNumSplitPoints >= kTooManySplitPoints) {
- // The current desired chunk size will split the chunk into lots of small chunk and at
- // the worst case this can result into thousands of chunks. So check and see if a bigger
- // value can be used.
- chunkSize = std::min(chunkBytesWritten,
- Grid::get(txn)->getBalancerConfiguration()->getMaxChunkSizeBytes());
- }
-
- vector<BSONObj> splitPoints =
- uassertStatusOK(shardutil::selectChunkSplitPoints(txn,
- _shardId,
- NamespaceString(_manager->getns()),
- _manager->getShardKeyPattern(),
- ChunkRange(_min, _max),
- chunkSize,
- boost::none));
- if (splitPoints.size() <= 1) {
- // No split points means there isn't enough data to split on 1 split point means we have
- // between half the chunk size to full chunk size so we shouldn't split.
- splitPoints.clear();
- }
-
- return splitPoints;
-}
-
-StatusWith<boost::optional<ChunkRange>> Chunk::split(OperationContext* txn,
- size_t* resultingSplits) const {
- size_t dummy;
- if (resultingSplits == NULL) {
- resultingSplits = &dummy;
- }
-
- vector<BSONObj> splitPoints = _determineSplitPoints(txn);
- if (splitPoints.empty()) {
- return {ErrorCodes::CannotSplit, "chunk not full enough to trigger auto-split"};
- }
-
- // We assume that if the chunk being split is the first (or last) one on the collection,
- // this chunk is likely to see more insertions. Instead of splitting mid-chunk, we use
- // the very first (or last) key as a split point.
- // This heuristic is skipped for "special" shard key patterns that are not likely to
- // produce monotonically increasing or decreasing values (e.g. hashed shard keys).
- if (KeyPattern::isOrderedKeyPattern(_manager->getShardKeyPattern().toBSON())) {
- if (_minIsInf()) {
- BSONObj key = _getExtremeKey(txn, true);
- if (!key.isEmpty()) {
- splitPoints.front() = key.getOwned();
- }
- } else if (_maxIsInf()) {
- BSONObj key = _getExtremeKey(txn, false);
- if (!key.isEmpty()) {
- splitPoints.back() = key.getOwned();
- }
- }
- }
-
- auto splitStatus = shardutil::splitChunkAtMultiplePoints(txn,
- _shardId,
- NamespaceString(_manager->getns()),
- _manager->getShardKeyPattern(),
- _manager->getVersion(),
- ChunkRange(_min, _max),
- splitPoints);
- if (!splitStatus.isOK()) {
- return splitStatus.getStatus();
- }
-
- reloadChunkManager(txn, _manager->getns());
-
- *resultingSplits = splitPoints.size();
- return splitStatus.getValue();
-}
-
uint64_t Chunk::getBytesWritten() const {
return _dataWritten;
}
-void Chunk::addBytesWritten(uint64_t bytesWrittenIncrement) {
+uint64_t Chunk::addBytesWritten(uint64_t bytesWrittenIncrement) {
_dataWritten += bytesWrittenIncrement;
+ return _dataWritten;
}
-void Chunk::setBytesWritten(uint64_t newBytesWritten) {
- _dataWritten = newBytesWritten;
-}
-
-bool Chunk::splitIfShould(OperationContext* txn, long dataWritten) {
- LastError::Disabled d(&LastError::get(cc()));
-
- addBytesWritten(dataWritten);
-
- const uint64_t chunkBytesWritten = getBytesWritten();
-
- try {
- uint64_t splitThreshold = calculateDesiredChunkSize(
- Grid::get(txn)->getBalancerConfiguration()->getMaxChunkSizeBytes(),
- _manager->numChunks());
-
- if (_minIsInf() || _maxIsInf()) {
- splitThreshold = static_cast<uint64_t>((double)splitThreshold * 0.9);
- }
-
- if (chunkBytesWritten < splitThreshold / splitTestFactor) {
- return false;
- }
-
- if (!_manager->_splitHeuristics._splitTickets.tryAcquire()) {
- LOG(1) << "won't auto split because not enough tickets: " << _manager->getns();
- return false;
- }
-
- TicketHolderReleaser releaser(&(_manager->_splitHeuristics._splitTickets));
-
- const auto balancerConfig = Grid::get(txn)->getBalancerConfiguration();
-
- Status refreshStatus = balancerConfig->refreshAndCheck(txn);
- if (!refreshStatus.isOK()) {
- warning() << "Unable to refresh balancer settings" << causedBy(refreshStatus);
- return false;
- }
-
- bool shouldAutoSplit = balancerConfig->getShouldAutoSplit();
- if (!shouldAutoSplit) {
- return false;
- }
-
- LOG(1) << "about to initiate autosplit: " << *this << " dataWritten: " << chunkBytesWritten
- << " splitThreshold: " << splitThreshold;
-
- size_t splitCount = 0;
- auto splitStatus = split(txn, &splitCount);
- if (!splitStatus.isOK()) {
- // Split would have issued a message if we got here. This means there wasn't enough data
- // to split, so don't want to try again until we have considerably more data.
- setBytesWritten(0);
- return false;
- }
-
- if (_maxIsInf() || _minIsInf()) {
- // we don't want to reset _dataWritten since we kind of want to check the other side
- // right away
- } else {
- // we're splitting, so should wait a bit
- setBytesWritten(0);
- }
-
- bool shouldBalance = balancerConfig->shouldBalanceForAutoSplit();
-
- if (shouldBalance) {
- auto collStatus = grid.catalogClient(txn)->getCollection(txn, _manager->getns());
- if (!collStatus.isOK()) {
- warning() << "Auto-split for " << _manager->getns()
- << " failed to load collection metadata"
- << causedBy(collStatus.getStatus());
- return false;
- }
-
- shouldBalance = collStatus.getValue().value.getAllowBalance();
- }
-
- const auto suggestedMigrateChunk = std::move(splitStatus.getValue());
-
- log() << "autosplitted " << _manager->getns() << " shard: " << toString() << " into "
- << (splitCount + 1) << " (splitThreshold " << splitThreshold << ")"
- << (suggestedMigrateChunk ? "" : (string) " (migrate suggested" +
- (shouldBalance ? ")" : ", but no migrations allowed)"));
-
- // Top chunk optimization - try to move the top chunk out of this shard to prevent the hot
- // spot from staying on a single shard. This is based on the assumption that succeeding
- // inserts will fall on the top chunk.
- if (suggestedMigrateChunk && shouldBalance) {
- const NamespaceString nss(_manager->getns());
-
- // We need to use the latest chunk manager (after the split) in order to have the most
- // up-to-date view of the chunk we are about to move
- auto scopedCM = uassertStatusOK(ScopedChunkManager::refreshAndGet(txn, nss));
- auto suggestedChunk = scopedCM.cm()->findIntersectingChunkWithSimpleCollation(
- txn, suggestedMigrateChunk->getMin());
-
- ChunkType chunkToMove;
- chunkToMove.setNS(nss.ns());
- chunkToMove.setShard(suggestedChunk->getShardId());
- chunkToMove.setMin(suggestedChunk->getMin());
- chunkToMove.setMax(suggestedChunk->getMax());
- chunkToMove.setVersion(suggestedChunk->getLastmod());
-
- Status rebalanceStatus = configsvr_client::rebalanceChunk(txn, chunkToMove);
- if (!rebalanceStatus.isOK()) {
- msgassertedNoTraceWithStatus(10412, rebalanceStatus);
- }
-
- reloadChunkManager(txn, _manager->getns());
- }
-
- return true;
- } catch (const DBException& e) {
- // TODO: Make this better - there are lots of reasons a split could fail
- // Random so that we don't sync up with other failed splits
- setBytesWritten(mkDataWritten());
-
- // if the collection lock is taken (e.g. we're migrating), it is fine for the split to fail.
- warning() << "could not autosplit collection " << _manager->getns() << causedBy(e);
- return false;
- }
-}
-
-ConnectionString Chunk::_getShardConnectionString(OperationContext* txn) const {
- const auto shard = uassertStatusOK(grid.shardRegistry()->getShard(txn, getShardId()));
- return shard->getConnString();
-}
-
-void Chunk::appendShortVersion(const char* name, BSONObjBuilder& b) const {
- BSONObjBuilder bb(b.subobjStart(name));
- bb.append(ChunkType::min(), _min);
- bb.append(ChunkType::max(), _max);
- bb.done();
+void Chunk::clearBytesWritten() {
+ _dataWritten = 0;
}
-bool Chunk::operator==(const Chunk& s) const {
- return _min.woCompare(s._min) == 0 && _max.woCompare(s._max) == 0;
+void Chunk::randomizeBytesWritten() {
+ _dataWritten = mkDataWritten();
}
-string Chunk::toString() const {
- stringstream ss;
- ss << ChunkType::ns() << ": " << _manager->getns() << ", " << ChunkType::shard() << ": "
- << _shardId << ", " << ChunkType::DEPRECATED_lastmod() << ": " << _lastmod.toString() << ", "
- << ChunkType::min() << ": " << _min << ", " << ChunkType::max() << ": " << _max;
- return ss.str();
+std::string Chunk::toString() const {
+ return str::stream() << ChunkType::shard() << ": " << _shardId << ", "
+ << ChunkType::DEPRECATED_lastmod() << ": " << _lastmod.toString() << ", "
+ << ChunkType::min() << ": " << _min << ", " << ChunkType::max() << ": "
+ << _max;
}
void Chunk::markAsJumbo(OperationContext* txn) const {
@@ -437,15 +118,15 @@ void Chunk::markAsJumbo(OperationContext* txn) const {
// at least this mongos won't try and keep moving
_jumbo = true;
- const string chunkName = ChunkType::genID(_manager->getns(), _min);
+ const std::string chunkName = ChunkType::genID(_manager->getns(), _min);
- auto status =
- grid.catalogClient(txn)->updateConfigDocument(txn,
- ChunkType::ConfigNS,
- BSON(ChunkType::name(chunkName)),
- BSON("$set" << BSON(ChunkType::jumbo(true))),
- false,
- ShardingCatalogClient::kMajorityWriteConcern);
+ auto status = Grid::get(txn)->catalogClient(txn)->updateConfigDocument(
+ txn,
+ ChunkType::ConfigNS,
+ BSON(ChunkType::name(chunkName)),
+ BSON("$set" << BSON(ChunkType::jumbo(true))),
+ false,
+ ShardingCatalogClient::kMajorityWriteConcern);
if (!status.isOK()) {
warning() << "couldn't set jumbo for chunk: " << chunkName << causedBy(status.getStatus());
}
diff --git a/src/mongo/s/chunk.h b/src/mongo/s/chunk.h
index dd97c751b87..4c107f0d2c2 100644
--- a/src/mongo/s/chunk.h
+++ b/src/mongo/s/chunk.h
@@ -28,11 +28,9 @@
#pragma once
-#include <boost/optional.hpp>
-
-#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/base/disallow_copying.h"
#include "mongo/s/chunk_version.h"
-#include "mongo/s/client/shard.h"
+#include "mongo/s/shard_id.h"
namespace mongo {
@@ -41,17 +39,13 @@ class ChunkType;
class OperationContext;
/**
- config.chunks
- { ns : "alleyinsider.fs.chunks" , min : {} , max : {} , server : "localhost:30001" }
-
- x is in a shard iff
- min <= x < max
+ * Represents a cache entry for a single Chunk. Owned by a ChunkManager.
*/
class Chunk {
MONGO_DISALLOW_COPYING(Chunk);
public:
- Chunk(OperationContext* txn, ChunkManager* manager, const ChunkType& from);
+ Chunk(ChunkManager* manager, const ChunkType& from);
Chunk(ChunkManager* manager,
const BSONObj& min,
@@ -60,10 +54,6 @@ public:
ChunkVersion lastmod,
uint64_t initialDataWritten);
- //
- // chunk boundary support
- //
-
const BSONObj& getMin() const {
return _min;
}
@@ -72,53 +62,36 @@ public:
return _max;
}
- // Returns true if this chunk contains the given shard key, and false otherwise
- //
- // Note: this function takes an extracted *key*, not an original document
- // (the point may be computed by, say, hashing a given field or projecting
- // to a subset of fields).
- bool containsKey(const BSONObj& shardKey) const;
-
- //
- // chunk version support
- //
-
- void appendShortVersion(const char* name, BSONObjBuilder& b) const;
+ const ShardId& getShardId() const {
+ return _shardId;
+ }
ChunkVersion getLastmod() const {
return _lastmod;
}
- //
- // split support
- //
+ bool isJumbo() const {
+ return _jumbo;
+ }
/**
- * Get/increment/set the estimation of how much data was written for this chunk.
+     * Returns a string representation of the chunk for logging.
*/
- uint64_t getBytesWritten() const;
- void addBytesWritten(uint64_t bytesWrittenIncrement);
- void setBytesWritten(uint64_t newBytesWritten);
+ std::string toString() const;
- /**
- * if the amount of data written nears the max size of a shard
- * then we check the real size, and if its too big, we split
- * @return if something was split
- */
- bool splitIfShould(OperationContext* txn, long dataWritten);
+ // Returns true if this chunk contains the given shard key, and false otherwise
+ //
+ // Note: this function takes an extracted *key*, not an original document (the point may be
+ // computed by, say, hashing a given field or projecting to a subset of fields).
+ bool containsKey(const BSONObj& shardKey) const;
/**
- * Splits this chunk at a non-specificed split key to be chosen by the
- * mongod holding this chunk.
- *
- * @param mode
- * @param res the object containing details about the split execution
- * @param resultingSplits the number of resulting split points. Set to NULL to ignore.
- *
- * @throws UserException
+ * Get/increment/set the estimation of how much data was written for this chunk.
*/
- StatusWith<boost::optional<ChunkRange>> split(OperationContext* txn,
- size_t* resultingSplits) const;
+ uint64_t getBytesWritten() const;
+ uint64_t addBytesWritten(uint64_t bytesWrittenIncrement);
+ void clearBytesWritten();
+ void randomizeBytesWritten();
/**
* marks this chunk as a jumbo chunk
@@ -126,40 +99,7 @@ public:
*/
void markAsJumbo(OperationContext* txn) const;
- bool isJumbo() const {
- return _jumbo;
- }
-
- //
- // accessors and helpers
- //
-
- std::string toString() const;
-
- friend std::ostream& operator<<(std::ostream& out, const Chunk& c) {
- return (out << c.toString());
- }
-
- // chunk equality is determined by comparing the min and max bounds of the chunk
- bool operator==(const Chunk& s) const;
- bool operator!=(const Chunk& s) const {
- return !(*this == s);
- }
-
- ShardId getShardId() const {
- return _shardId;
- }
-
private:
- /**
- * Returns the connection string for the shard on which this chunk resides.
- */
- ConnectionString _getShardConnectionString(OperationContext* txn) const;
-
- // if min/max key is pos/neg infinity
- bool _minIsInf() const;
- bool _maxIsInf() const;
-
// The chunk manager, which owns this chunk. Not owned by the chunk.
const ChunkManager* _manager;
@@ -177,27 +117,6 @@ private:
// Statistics for the approximate data written to this chunk
mutable uint64_t _dataWritten;
-
- /**
- * Returns the split point that will result in one of the chunk having exactly one
- * document. Also returns an empty document if the split point cannot be determined.
- *
- * @param doSplitAtLower determines which side of the split will have exactly one document.
- * True means that the split point chosen will be closer to the lower bound.
- *
- * Warning: this assumes that the shard key is not "special"- that is, the shardKeyPattern
- * is simply an ordered list of ascending/descending field names. Examples:
- * {a : 1, b : -1} is not special. {a : "hashed"} is.
- */
- BSONObj _getExtremeKey(OperationContext* txn, bool doSplitAtLower) const;
-
- /**
- * Determines the appropriate split points for this chunk.
- *
- * @param atMedian perform a single split at the middle of this chunk.
- * @param splitPoints out parameter containing the chosen split points. Can be empty.
- */
- std::vector<BSONObj> _determineSplitPoints(OperationContext* txn) const;
};
} // namespace mongo
diff --git a/src/mongo/s/chunk_manager.cpp b/src/mongo/s/chunk_manager.cpp
index 2d466bbb07b..324be082da5 100644
--- a/src/mongo/s/chunk_manager.cpp
+++ b/src/mongo/s/chunk_manager.cpp
@@ -53,7 +53,6 @@
#include "mongo/s/catalog/catalog_cache.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/catalog/type_collection.h"
-#include "mongo/s/chunk.h"
#include "mongo/s/chunk_diff.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/config.h"
@@ -64,7 +63,6 @@
namespace mongo {
-using std::make_pair;
using std::map;
using std::pair;
using std::set;
@@ -75,6 +73,9 @@ using std::vector;
namespace {
+// Used to generate sequence numbers to assign to each newly created ChunkManager
+AtomicUInt32 nextCMSequenceNumber(0);
+
/**
* This is an adapter so we can use config diffs - mongos and mongod do them slightly differently.
*
@@ -101,12 +102,11 @@ public:
pair<BSONObj, shared_ptr<Chunk>> rangeFor(OperationContext* txn,
const ChunkType& chunk) const final {
- shared_ptr<Chunk> c(new Chunk(txn, _manager, chunk));
- return make_pair(chunk.getMax(), c);
+ return std::make_pair(chunk.getMax(), std::make_shared<Chunk>(_manager, chunk));
}
ShardId shardFor(OperationContext* txn, const ShardId& shardId) const final {
- const auto shard = uassertStatusOK(grid.shardRegistry()->getShard(txn, shardId));
+ const auto shard = uassertStatusOK(Grid::get(txn)->shardRegistry()->getShard(txn, shardId));
return shard->getId();
}
@@ -166,8 +166,6 @@ bool isChunkMapValid(const ChunkMap& chunkMap) {
} // namespace
-AtomicUInt32 ChunkManager::NextSequenceNumber(1U);
-
ChunkManager::ChunkManager(const string& ns,
const ShardKeyPattern& pattern,
std::unique_ptr<CollatorInterface> defaultCollator,
@@ -176,7 +174,7 @@ ChunkManager::ChunkManager(const string& ns,
_keyPattern(pattern.getKeyPattern()),
_defaultCollator(std::move(defaultCollator)),
_unique(unique),
- _sequenceNumber(NextSequenceNumber.addAndFetch(1)),
+ _sequenceNumber(nextCMSequenceNumber.addAndFetch(1)),
_chunkMap(SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<std::shared_ptr<Chunk>>()),
_chunkRangeMap(
SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<ShardAndChunkRange>()) {}
@@ -185,7 +183,7 @@ ChunkManager::ChunkManager(OperationContext* txn, const CollectionType& coll)
: _ns(coll.getNs().ns()),
_keyPattern(coll.getKeyPattern()),
_unique(coll.getUnique()),
- _sequenceNumber(NextSequenceNumber.addAndFetch(1)),
+ _sequenceNumber(nextCMSequenceNumber.addAndFetch(1)),
_chunkMap(SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<std::shared_ptr<Chunk>>()),
_chunkRangeMap(
SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<ShardAndChunkRange>()) {
@@ -278,7 +276,7 @@ bool ChunkManager::_load(OperationContext* txn,
oldC->getLastmod(),
oldC->getBytesWritten()));
- chunkMap.insert(make_pair(oldC->getMax(), newC));
+ chunkMap.insert(std::make_pair(oldC->getMax(), newC));
}
LOG(2) << "loading chunk manager for collection " << _ns
diff --git a/src/mongo/s/chunk_manager.h b/src/mongo/s/chunk_manager.h
index cbb0b6426ef..f809c6167c6 100644
--- a/src/mongo/s/chunk_manager.h
+++ b/src/mongo/s/chunk_manager.h
@@ -33,6 +33,7 @@
#include <string>
#include <vector>
+#include "mongo/base/disallow_copying.h"
#include "mongo/db/query/collation/collator_interface.h"
#include "mongo/db/repl/optime.h"
#include "mongo/s/catalog/type_chunk.h"
@@ -45,7 +46,6 @@
namespace mongo {
class CanonicalQuery;
-class Chunk;
class CollectionType;
struct QuerySolutionNode;
class OperationContext;
@@ -54,6 +54,8 @@ class OperationContext;
typedef BSONObjIndexedMap<std::shared_ptr<Chunk>> ChunkMap;
class ChunkManager {
+ MONGO_DISALLOW_COPYING(ChunkManager);
+
public:
typedef std::map<ShardId, ChunkVersion> ShardVersionMap;
@@ -270,27 +272,23 @@ private:
// OpTime of config server the last time chunks were loaded.
repl::OpTime _configOpTime;
- //
- // Split Heuristic info
- //
- class SplitHeuristics {
+ // Auto-split throttling state
+ struct AutoSplitThrottle {
public:
- SplitHeuristics() : _splitTickets(maxParallelSplits) {}
+ AutoSplitThrottle() : _splitTickets(maxParallelSplits) {}
TicketHolder _splitTickets;
// Maximum number of parallel threads requesting a split
static const int maxParallelSplits = 5;
- };
- mutable SplitHeuristics _splitHeuristics;
-
- //
- // End split heuristics
- //
+ } _autoSplitThrottle;
- friend class Chunk;
- static AtomicUInt32 NextSequenceNumber;
+ // This function needs to be able to access the auto-split throttle
+ friend void updateChunkWriteStatsAndSplitIfNeeded(OperationContext*,
+ ChunkManager*,
+ Chunk*,
+ long);
friend class TestableChunkManager;
};
diff --git a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
index e47aa1633f9..14cd8676917 100644
--- a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
+++ b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
@@ -44,6 +44,7 @@
#include "mongo/s/client/shard_connection.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/commands/cluster_explain.h"
+#include "mongo/s/commands/cluster_write.h"
#include "mongo/s/commands/sharded_command_processing.h"
#include "mongo/s/commands/strategy.h"
#include "mongo/s/grid.h"
@@ -207,19 +208,19 @@ public:
}
BSONObj shardKey = status.getValue();
- auto chunk = chunkMgr->findIntersectingChunk(txn, shardKey, collation);
-
- if (!chunk.isOK()) {
+ auto chunkStatus = chunkMgr->findIntersectingChunk(txn, shardKey, collation);
+ if (!chunkStatus.isOK()) {
uasserted(ErrorCodes::ShardKeyNotFound,
"findAndModify must target a single shard, but was not able to due to "
"non-simple collation");
}
- bool ok =
- _runCommand(txn, conf, chunkMgr, chunk.getValue()->getShardId(), nss, cmdObj, result);
+ const auto& chunk = chunkStatus.getValue();
+
+ const bool ok = _runCommand(txn, conf, chunkMgr, chunk->getShardId(), nss, cmdObj, result);
if (ok) {
- // check whether split is necessary (using update object for size heuristic)
- chunk.getValue()->splitIfShould(txn, cmdObj.getObjectField("update").objsize());
+ updateChunkWriteStatsAndSplitIfNeeded(
+ txn, chunkMgr.get(), chunk.get(), cmdObj.getObjectField("update").objsize());
}
return ok;
diff --git a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
index be4b489ff39..232aefa5bd2 100644
--- a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
+++ b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
@@ -49,6 +49,7 @@
#include "mongo/s/client/shard_connection.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/commands/cluster_commands_common.h"
+#include "mongo/s/commands/cluster_write.h"
#include "mongo/s/commands/sharded_command_processing.h"
#include "mongo/s/commands/strategy.h"
#include "mongo/s/config.h"
@@ -614,7 +615,7 @@ public:
warning() << "Mongod reported " << size << " bytes inserted for key " << key
<< " but can't find chunk";
} else {
- c->splitIfShould(txn, size);
+ updateChunkWriteStatsAndSplitIfNeeded(txn, cm.get(), c.get(), size);
}
}
}
diff --git a/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp b/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp
index 26dde5698c9..7dd1aa79852 100644
--- a/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp
+++ b/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp
@@ -159,8 +159,8 @@ public:
remoteCmdObjB.append(
ClusterMergeChunksCommand::configField(),
Grid::get(txn)->shardRegistry()->getConfigServerConnectionString().toString());
- remoteCmdObjB.append(ClusterMergeChunksCommand::shardNameField(), firstChunk->getShardId());
-
+ remoteCmdObjB.append(ClusterMergeChunksCommand::shardNameField(),
+ firstChunk->getShardId().toString());
BSONObj remoteResult;
diff --git a/src/mongo/s/commands/cluster_shard_collection_cmd.cpp b/src/mongo/s/commands/cluster_shard_collection_cmd.cpp
index 508642e1117..a7bf3060752 100644
--- a/src/mongo/s/commands/cluster_shard_collection_cmd.cpp
+++ b/src/mongo/s/commands/cluster_shard_collection_cmd.cpp
@@ -46,6 +46,7 @@
#include "mongo/db/client.h"
#include "mongo/db/commands.h"
#include "mongo/db/hasher.h"
+#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
#include "mongo/db/write_concern_options.h"
@@ -64,14 +65,79 @@
#include "mongo/util/log.h"
namespace mongo {
+namespace {
-using std::shared_ptr;
-using std::list;
-using std::set;
-using std::string;
-using std::vector;
+/**
+ * Constructs the BSON specification document for the given namespace, index key and options.
+ */
+BSONObj createIndexDoc(const std::string& ns,
+ const BSONObj& keys,
+ const BSONObj& collation,
+ bool unique) {
+ BSONObjBuilder indexDoc;
+ indexDoc.append("ns", ns);
+ indexDoc.append("key", keys);
+
+ StringBuilder indexName;
+
+ bool isFirstKey = true;
+ for (BSONObjIterator keyIter(keys); keyIter.more();) {
+ BSONElement currentKey = keyIter.next();
+
+ if (isFirstKey) {
+ isFirstKey = false;
+ } else {
+ indexName << "_";
+ }
-namespace {
+ indexName << currentKey.fieldName() << "_";
+ if (currentKey.isNumber()) {
+ indexName << currentKey.numberInt();
+ } else {
+ indexName << currentKey.str(); // this should match up with shell command
+ }
+ }
+
+ indexDoc.append("name", indexName.str());
+
+ if (!collation.isEmpty()) {
+ // Creating an index with the "collation" option requires a v=2 index.
+ indexDoc.append("v", static_cast<int>(IndexDescriptor::IndexVersion::kV2));
+ indexDoc.append("collation", collation);
+ }
+
+ if (unique && !IndexDescriptor::isIdIndexPattern(keys)) {
+ indexDoc.appendBool("unique", unique);
+ }
+
+ return indexDoc.obj();
+}
+
+/**
+ * Used only for writes to the config server, config and admin databases.
+ */
+Status clusterCreateIndex(OperationContext* txn,
+ const std::string& ns,
+ const BSONObj& keys,
+ const BSONObj& collation,
+ bool unique) {
+ const NamespaceString nss(ns);
+
+ // Go through the shard insert path
+ std::unique_ptr<BatchedInsertRequest> insert(new BatchedInsertRequest());
+ insert->addToDocuments(createIndexDoc(ns, keys, collation, unique));
+
+ BatchedCommandRequest request(insert.release());
+ request.setNS(NamespaceString(nss.getSystemIndexesCollection()));
+ request.setWriteConcern(WriteConcernOptions::Acknowledged);
+
+ BatchedCommandResponse response;
+
+ ClusterWriter writer(false, 0);
+ writer.write(txn, request, &response);
+
+ return response.toStatus();
+}
class ShardCollectionCmd : public Command {
public:
@@ -194,8 +260,9 @@ public:
}
}
- vector<ShardId> shardIds;
+ std::vector<ShardId> shardIds;
shardRegistry->getAllShardIds(&shardIds);
+
const int numShards = shardIds.size();
// Cannot have more than 8192 initial chunks per shard. Setting a maximum of 1,000,000
@@ -224,7 +291,7 @@ public:
// collection.
BSONObj res;
{
- list<BSONObj> all =
+ std::list<BSONObj> all =
conn->getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll()));
if (!all.empty()) {
res = all.front().getOwned();
@@ -328,12 +395,12 @@ public:
// 5. If the collection is empty, and it's still possible to create an index
// on the proposed key, we go ahead and do so.
- list<BSONObj> indexes = conn->getIndexSpecs(nss.ns());
+ std::list<BSONObj> indexes = conn->getIndexSpecs(nss.ns());
// 1. Verify consistency with existing unique indexes
ShardKeyPattern proposedShardKey(proposedKey);
- for (list<BSONObj>::iterator it = indexes.begin(); it != indexes.end(); ++it) {
- BSONObj idx = *it;
+
+ for (const auto& idx : indexes) {
BSONObj currentKey = idx["key"].embeddedObject();
bool isUnique = idx["unique"].trueValue();
@@ -351,8 +418,7 @@ public:
// 2. Check for a useful index
bool hasUsefulIndexForKey = false;
- for (list<BSONObj>::iterator it = indexes.begin(); it != indexes.end(); ++it) {
- BSONObj idx = *it;
+ for (const auto& idx : indexes) {
BSONObj currentKey = idx["key"].embeddedObject();
// Check 2.i. and 2.ii.
if (!idx["sparse"].trueValue() && idx["filter"].eoo() && idx["collation"].eoo() &&
@@ -377,12 +443,12 @@ public:
// 3. If proposed key is required to be unique, additionally check for exact match.
bool careAboutUnique = cmdObj["unique"].trueValue();
+
if (hasUsefulIndexForKey && careAboutUnique) {
BSONObj eqQuery = BSON("ns" << nss.ns() << "key" << proposedKey);
BSONObj eqQueryResult;
- for (list<BSONObj>::iterator it = indexes.begin(); it != indexes.end(); ++it) {
- BSONObj idx = *it;
+ for (const auto& idx : indexes) {
if (SimpleBSONObjComparator::kInstance.evaluate(idx["key"].embeddedObject() ==
proposedKey)) {
eqQueryResult = idx;
@@ -459,8 +525,8 @@ public:
// 2. move them one at a time
// 3. split the big chunks to achieve the desired total number of initial chunks
- vector<BSONObj> initSplits; // there will be at most numShards-1 of these
- vector<BSONObj> allSplits; // all of the initial desired split points
+ std::vector<BSONObj> initSplits; // there will be at most numShards-1 of these
+ std::vector<BSONObj> allSplits; // all of the initial desired split points
// only pre-split when using a hashed shard key and collection is still empty
if (isHashedShardKey && isEmpty) {
@@ -516,16 +582,13 @@ public:
audit::logShardCollection(Client::getCurrent(), nss.ns(), proposedKey, careAboutUnique);
- Status status = catalogClient->shardCollection(txn,
+ uassertStatusOK(catalogClient->shardCollection(txn,
nss.ns(),
proposedShardKey,
defaultCollation,
careAboutUnique,
initSplits,
- std::set<ShardId>{});
- if (!status.isOK()) {
- return appendCommandStatus(result, status);
- }
+ std::set<ShardId>{}));
// Make sure the cached metadata for the collection knows that we are now sharded
config->getChunkManager(txn, nss.ns(), true /* reload */);
@@ -536,7 +599,7 @@ public:
if (isHashedShardKey && isEmpty) {
// Reload the new config info. If we created more than one initial chunk, then
// we need to move them around to balance.
- shared_ptr<ChunkManager> chunkManager = config->getChunkManager(txn, nss.ns(), true);
+ auto chunkManager = config->getChunkManager(txn, nss.ns(), true);
ChunkMap chunkMap = chunkManager->getChunkMap();
// 2. Move and commit each "big chunk" to a different shard.
@@ -549,7 +612,7 @@ public:
}
const auto to = toStatus.getValue();
- shared_ptr<Chunk> chunk = c->second;
+ auto chunk = c->second;
// Can't move chunk to shard it's already on
if (to->getId() == chunk->getShardId()) {
@@ -587,10 +650,10 @@ public:
// 3. Subdivide the big chunks by splitting at each of the points in "allSplits"
// that we haven't already split by.
- shared_ptr<Chunk> currentChunk =
+ auto currentChunk =
chunkManager->findIntersectingChunkWithSimpleCollation(txn, allSplits[0]);
- vector<BSONObj> subSplits;
+ std::vector<BSONObj> subSplits;
for (unsigned i = 0; i <= allSplits.size(); i++) {
if (i == allSplits.size() || !currentChunk->containsKey(allSplits[i])) {
if (!subSplits.empty()) {
diff --git a/src/mongo/s/commands/cluster_write.cpp b/src/mongo/s/commands/cluster_write.cpp
index 03d0306aebc..217c37e273b 100644
--- a/src/mongo/s/commands/cluster_write.cpp
+++ b/src/mongo/s/commands/cluster_write.cpp
@@ -32,108 +32,161 @@
#include "mongo/s/commands/cluster_write.h"
-#include <string>
-#include <vector>
-
#include "mongo/base/status.h"
-#include "mongo/db/index/index_descriptor.h"
+#include "mongo/client/connpool.h"
+#include "mongo/db/lasterror.h"
#include "mongo/db/write_concern_options.h"
#include "mongo/s/balancer_configuration.h"
#include "mongo/s/catalog/catalog_cache.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
+#include "mongo/s/catalog/type_collection.h"
+#include "mongo/s/chunk.h"
#include "mongo/s/chunk_manager.h"
#include "mongo/s/chunk_manager_targeter.h"
#include "mongo/s/client/dbclient_multi_command.h"
+#include "mongo/s/client/shard_registry.h"
#include "mongo/s/config.h"
+#include "mongo/s/config_server_client.h"
#include "mongo/s/grid.h"
-#include "mongo/s/mongos_options.h"
-#include "mongo/s/write_ops/batch_write_exec.h"
+#include "mongo/s/shard_util.h"
+#include "mongo/s/sharding_raii.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
namespace mongo {
+namespace {
-using std::shared_ptr;
-using std::unique_ptr;
-using std::vector;
-using std::map;
-using std::string;
-using std::stringstream;
+// Test whether we should split once data * splitTestFactor > chunkSize (approximately)
+const uint64_t splitTestFactor = 5;
-using IndexVersion = IndexDescriptor::IndexVersion;
+const uint64_t kTooManySplitPoints = 4;
-namespace {
+void toBatchError(const Status& status, BatchedCommandResponse* response) {
+ response->clear();
+ response->setErrCode(status.code());
+ response->setErrMessage(status.reason());
+ response->setOk(false);
+ dassert(response->isValid(NULL));
+}
+
+void reloadChunkManager(OperationContext* txn, const NamespaceString& nss) {
+ auto config = uassertStatusOK(ScopedShardDatabase::getExisting(txn, nss.db()));
+ config.db()->getChunkManagerIfExists(txn, nss.ns(), true);
+}
/**
- * Constructs the BSON specification document for the given namespace, index key
- * and options.
+ * Given a maxChunkSize configuration and the number of chunks in a particular sharded collection,
+ * returns an optimal chunk size to use in order to achieve a good ratio between number of chunks
+ * and their size.
*/
-BSONObj createIndexDoc(const string& ns,
- const BSONObj& keys,
- const BSONObj& collation,
- bool unique) {
- BSONObjBuilder indexDoc;
- indexDoc.append("ns", ns);
- indexDoc.append("key", keys);
-
- stringstream indexName;
-
- bool isFirstKey = true;
- for (BSONObjIterator keyIter(keys); keyIter.more();) {
- BSONElement currentKey = keyIter.next();
-
- if (isFirstKey) {
- isFirstKey = false;
- } else {
- indexName << "_";
- }
-
- indexName << currentKey.fieldName() << "_";
- if (currentKey.isNumber()) {
- indexName << currentKey.numberInt();
- } else {
- indexName << currentKey.str(); // this should match up with shell command
- }
+uint64_t calculateDesiredChunkSize(uint64_t maxChunkSizeBytes, uint64_t numChunks) {
+ // Splitting faster in early chunks helps spread out an initial load better
+ const uint64_t minChunkSize = 1 << 20; // 1 MBytes
+
+ if (numChunks <= 1) {
+ return 1024;
+ } else if (numChunks < 3) {
+ return minChunkSize / 2;
+ } else if (numChunks < 10) {
+ return std::max(maxChunkSizeBytes / 4, minChunkSize);
+ } else if (numChunks < 20) {
+ return std::max(maxChunkSizeBytes / 2, minChunkSize);
+ } else {
+ return maxChunkSizeBytes;
}
+}
- indexDoc.append("name", indexName.str());
+/**
+ * Returns the split point that will result in one of the chunk having exactly one document. Also
+ * returns an empty document if the split point cannot be determined.
+ *
+ * doSplitAtLower - determines which side of the split will have exactly one document. True means
+ * that the split point chosen will be closer to the lower bound.
+ *
+ * NOTE: this assumes that the shard key is not "special"- that is, the shardKeyPattern is simply an
+ * ordered list of ascending/descending field names. For example {a : 1, b : -1} is not special, but
+ * {a : "hashed"} is.
+ */
+BSONObj findExtremeKeyForShard(OperationContext* txn,
+ const NamespaceString& nss,
+ const ShardId& shardId,
+ const ShardKeyPattern& shardKeyPattern,
+ bool doSplitAtLower) {
+ Query q;
+
+ if (doSplitAtLower) {
+ q.sort(shardKeyPattern.toBSON());
+ } else {
+ // need to invert shard key pattern to sort backwards
+ // TODO: make a helper in ShardKeyPattern?
+ BSONObjBuilder r;
+
+ BSONObjIterator i(shardKeyPattern.toBSON());
+ while (i.more()) {
+ BSONElement e = i.next();
+ uassert(10163, "can only handle numbers here - which i think is correct", e.isNumber());
+ r.append(e.fieldName(), -1 * e.number());
+ }
- if (!collation.isEmpty()) {
- // Creating an index with the "collation" option requires a v=2 index.
- indexDoc.append("v", static_cast<int>(IndexVersion::kV2));
- indexDoc.append("collation", collation);
+ q.sort(r.obj());
}
- if (unique && !IndexDescriptor::isIdIndexPattern(keys)) {
- indexDoc.appendBool("unique", unique);
+ // Find the extreme key
+ const auto shardConnStr = [&]() {
+ const auto shard = uassertStatusOK(Grid::get(txn)->shardRegistry()->getShard(txn, shardId));
+ return shard->getConnString();
+ }();
+
+ ScopedDbConnection conn(shardConnStr);
+
+ BSONObj end;
+
+ if (doSplitAtLower) {
+ // Splitting close to the lower bound means that the split point will be the
+ // upper bound. Chunk range upper bounds are exclusive so skip a document to
+ // make the lower half of the split end up with a single document.
+ std::unique_ptr<DBClientCursor> cursor = conn->query(nss.ns(),
+ q,
+ 1, /* nToReturn */
+ 1 /* nToSkip */);
+
+ uassert(28736,
+ str::stream() << "failed to initialize cursor during auto split due to "
+ << "connection problem with "
+ << conn->getServerAddress(),
+ cursor.get() != nullptr);
+
+ if (cursor->more()) {
+ end = cursor->next().getOwned();
+ }
+ } else {
+ end = conn->findOne(nss.ns(), q);
}
- return indexDoc.obj();
-}
+ conn.done();
-void toBatchError(const Status& status, BatchedCommandResponse* response) {
- response->clear();
- response->setErrCode(status.code());
- response->setErrMessage(status.reason());
- response->setOk(false);
- dassert(response->isValid(NULL));
+ if (end.isEmpty()) {
+ return BSONObj();
+ }
+
+ return shardKeyPattern.extractShardKeyFromDoc(end);
}
/**
* Splits the chunks touched based from the targeter stats if needed.
*/
void splitIfNeeded(OperationContext* txn, const NamespaceString& nss, const TargeterStats& stats) {
- auto status = grid.catalogCache()->getDatabase(txn, nss.db().toString());
+ auto status = Grid::get(txn)->catalogCache()->getDatabase(txn, nss.db().toString());
if (!status.isOK()) {
warning() << "failed to get database config for " << nss
<< " while checking for auto-split: " << status.getStatus();
return;
}
- shared_ptr<DBConfig> config = status.getValue();
+ auto config = status.getValue();
- shared_ptr<ChunkManager> chunkManager;
- shared_ptr<Shard> dummyShard;
+ std::shared_ptr<ChunkManager> chunkManager;
+ std::shared_ptr<Shard> dummyShard;
config->getChunkManagerOrPrimary(txn, nss.ns(), chunkManager, dummyShard);
if (!chunkManager) {
@@ -141,7 +194,7 @@ void splitIfNeeded(OperationContext* txn, const NamespaceString& nss, const Targ
}
for (auto it = stats.chunkSizeDelta.cbegin(); it != stats.chunkSizeDelta.cend(); ++it) {
- shared_ptr<Chunk> chunk;
+ std::shared_ptr<Chunk> chunk;
try {
chunk = chunkManager->findIntersectingChunkWithSimpleCollation(txn, it->first);
} catch (const AssertionException& ex) {
@@ -150,42 +203,23 @@ void splitIfNeeded(OperationContext* txn, const NamespaceString& nss, const Targ
return;
}
- chunk->splitIfShould(txn, it->second);
+ updateChunkWriteStatsAndSplitIfNeeded(txn, chunkManager.get(), chunk.get(), it->second);
}
}
} // namespace
-Status clusterCreateIndex(
- OperationContext* txn, const string& ns, BSONObj keys, BSONObj collation, bool unique) {
- const NamespaceString nss(ns);
- const std::string dbName = nss.db().toString();
-
- BSONObj indexDoc = createIndexDoc(ns, keys, collation, unique);
-
- // Go through the shard insert path
- std::unique_ptr<BatchedInsertRequest> insert(new BatchedInsertRequest());
- insert->addToDocuments(indexDoc);
-
- BatchedCommandRequest request(insert.release());
- request.setNS(NamespaceString(nss.getSystemIndexesCollection()));
- request.setWriteConcern(WriteConcernOptions::Acknowledged);
-
- BatchedCommandResponse response;
-
- ClusterWriter writer(false, 0);
- writer.write(txn, request, &response);
-
- return response.toStatus();
-}
-
+ClusterWriter::ClusterWriter(bool autoSplit, int timeoutMillis)
+ : _autoSplit(autoSplit), _timeoutMillis(timeoutMillis) {}
void ClusterWriter::write(OperationContext* txn,
const BatchedCommandRequest& origRequest,
BatchedCommandResponse* response) {
// Add _ids to insert request if req'd
- unique_ptr<BatchedCommandRequest> idRequest(BatchedCommandRequest::cloneWithIds(origRequest));
- const BatchedCommandRequest* request = NULL != idRequest.get() ? idRequest.get() : &origRequest;
+ std::unique_ptr<BatchedCommandRequest> idRequest(
+ BatchedCommandRequest::cloneWithIds(origRequest));
+
+ const BatchedCommandRequest* request = idRequest ? idRequest.get() : &origRequest;
const NamespaceString& nss = request->getNS();
if (!nss.isValid()) {
@@ -215,28 +249,28 @@ void ClusterWriter::write(OperationContext* txn,
return;
}
- string errMsg;
+ std::string errMsg;
if (request->isInsertIndexRequest() && !request->isValidIndexRequest(&errMsg)) {
toBatchError(Status(ErrorCodes::InvalidOptions, errMsg), response);
return;
}
// Config writes and shard writes are done differently
- const string dbName = nss.db().toString();
-
- unique_ptr<BatchedCommandRequest> requestWithWriteConcern;
- if (dbName == "config" || dbName == "admin") {
- // w:majority is the only valid write concern for writes to the config servers.
- // We also allow w:1 to come in on a user-initiated write, though we convert it here to
- // w:majority before sending it to the config servers.
+ if (nss.db() == NamespaceString::kConfigDb || nss.db() == NamespaceString::kAdminDb) {
+ // w:majority is the only valid write concern for writes to the config servers. We also
+ // allow w:1 to come in on a user-initiated write, though we convert it here to w:majority
+ // before sending it to the config servers.
bool rewriteCmdWithWriteConcern = false;
+
WriteConcernOptions writeConcern;
+
if (request->isWriteConcernSet()) {
Status status = writeConcern.parse(request->getWriteConcern());
if (!status.isOK()) {
toBatchError(status, response);
return;
}
+
if (!writeConcern.validForConfigServers()) {
toBatchError(Status(ErrorCodes::InvalidOptions,
"Invalid replication write concern. Writes to config servers "
@@ -244,6 +278,7 @@ void ClusterWriter::write(OperationContext* txn,
response);
return;
}
+
if (writeConcern.wMode == "") {
invariant(writeConcern.wNumNodes == 1);
rewriteCmdWithWriteConcern = true;
@@ -252,6 +287,8 @@ void ClusterWriter::write(OperationContext* txn,
rewriteCmdWithWriteConcern = true;
}
+ std::unique_ptr<BatchedCommandRequest> requestWithWriteConcern;
+
if (rewriteCmdWithWriteConcern) {
requestWithWriteConcern.reset(new BatchedCommandRequest(request->getBatchType()));
request->cloneTo(requestWithWriteConcern.get());
@@ -261,7 +298,7 @@ void ClusterWriter::write(OperationContext* txn,
request = requestWithWriteConcern.get();
}
- grid.catalogClient(txn)->writeConfigServerDirect(txn, *request, response);
+ Grid::get(txn)->catalogClient(txn)->writeConfigServerDirect(txn, *request, response);
} else {
TargeterStats targeterStats;
@@ -292,11 +329,185 @@ void ClusterWriter::write(OperationContext* txn,
}
}
-ClusterWriter::ClusterWriter(bool autoSplit, int timeoutMillis)
- : _autoSplit(autoSplit), _timeoutMillis(timeoutMillis) {}
-
const BatchWriteExecStats& ClusterWriter::getStats() {
return _stats;
}
+void updateChunkWriteStatsAndSplitIfNeeded(OperationContext* txn,
+ ChunkManager* manager,
+ Chunk* chunk,
+ long dataWritten) {
+ // Disable lastError tracking so that any errors, which occur during auto-split do not get
+ // bubbled up on the client connection doing a write.
+ LastError::Disabled d(&LastError::get(cc()));
+
+ const auto balancerConfig = Grid::get(txn)->getBalancerConfiguration();
+
+ const bool minIsInf =
+ (0 == manager->getShardKeyPattern().getKeyPattern().globalMin().woCompare(chunk->getMin()));
+ const bool maxIsInf =
+ (0 == manager->getShardKeyPattern().getKeyPattern().globalMax().woCompare(chunk->getMax()));
+
+ const uint64_t chunkBytesWritten = chunk->addBytesWritten(dataWritten);
+
+ const uint64_t desiredChunkSize =
+ calculateDesiredChunkSize(balancerConfig->getMaxChunkSizeBytes(), manager->numChunks());
+
+ // If this chunk is at either end of the range, trigger auto-split at 10% less data written in
+ // order to trigger the top-chunk optimization.
+ const uint64_t splitThreshold = (minIsInf || maxIsInf)
+ ? static_cast<uint64_t>((double)desiredChunkSize * 0.9)
+ : desiredChunkSize;
+
+ // Check if there are enough estimated bytes written to warrant a split
+ if (chunkBytesWritten < splitThreshold / splitTestFactor) {
+ return;
+ }
+
+ const NamespaceString nss(manager->getns());
+
+ if (!manager->_autoSplitThrottle._splitTickets.tryAcquire()) {
+ LOG(1) << "won't auto split because not enough tickets: " << nss;
+ return;
+ }
+
+ TicketHolderReleaser releaser(&(manager->_autoSplitThrottle._splitTickets));
+
+ const ChunkRange chunkRange(chunk->getMin(), chunk->getMax());
+
+ try {
+ // Ensure we have the most up-to-date balancer configuration
+ uassertStatusOK(balancerConfig->refreshAndCheck(txn));
+
+ if (!balancerConfig->getShouldAutoSplit()) {
+ return;
+ }
+
+ LOG(1) << "about to initiate autosplit: " << redact(chunk->toString())
+ << " dataWritten: " << chunkBytesWritten << " splitThreshold: " << splitThreshold;
+
+ const uint64_t chunkSizeToUse = [&]() {
+ const uint64_t estNumSplitPoints = chunkBytesWritten / desiredChunkSize * 2;
+
+ if (estNumSplitPoints >= kTooManySplitPoints) {
+ // The current desired chunk size will split the chunk into lots of small chunk and
+ // at the worst case this can result into thousands of chunks. So check and see if a
+ // bigger value can be used.
+ return std::min(chunkBytesWritten, balancerConfig->getMaxChunkSizeBytes());
+ } else {
+ return desiredChunkSize;
+ }
+ }();
+
+ auto splitPoints =
+ uassertStatusOK(shardutil::selectChunkSplitPoints(txn,
+ chunk->getShardId(),
+ nss,
+ manager->getShardKeyPattern(),
+ chunkRange,
+ chunkSizeToUse,
+ boost::none));
+
+ if (splitPoints.size() <= 1) {
+            // No split points means there isn't enough data to split on; one split point means we
+            // have between half the chunk size and the full chunk size, so there is no need to
+            // split yet
+ chunk->clearBytesWritten();
+ return;
+ }
+
+ if (minIsInf || maxIsInf) {
+ // We don't want to reset _dataWritten since we want to check the other side right away
+ } else {
+ // We're splitting, so should wait a bit
+ chunk->clearBytesWritten();
+ }
+
+ // We assume that if the chunk being split is the first (or last) one on the collection,
+ // this chunk is likely to see more insertions. Instead of splitting mid-chunk, we use the
+ // very first (or last) key as a split point.
+ //
+ // This heuristic is skipped for "special" shard key patterns that are not likely to produce
+ // monotonically increasing or decreasing values (e.g. hashed shard keys).
+ if (KeyPattern::isOrderedKeyPattern(manager->getShardKeyPattern().toBSON())) {
+ if (minIsInf) {
+ BSONObj key = findExtremeKeyForShard(
+ txn, nss, chunk->getShardId(), manager->getShardKeyPattern(), true);
+ if (!key.isEmpty()) {
+ splitPoints.front() = key.getOwned();
+ }
+ } else if (maxIsInf) {
+ BSONObj key = findExtremeKeyForShard(
+ txn, nss, chunk->getShardId(), manager->getShardKeyPattern(), false);
+ if (!key.isEmpty()) {
+ splitPoints.back() = key.getOwned();
+ }
+ }
+ }
+
+ const auto suggestedMigrateChunk =
+ uassertStatusOK(shardutil::splitChunkAtMultiplePoints(txn,
+ chunk->getShardId(),
+ nss,
+ manager->getShardKeyPattern(),
+ manager->getVersion(),
+ chunkRange,
+ splitPoints));
+
+ // Balance the resulting chunks if the option is enabled and if the shard suggested a chunk
+ // to balance
+ const bool shouldBalance = [&]() {
+ if (!balancerConfig->shouldBalanceForAutoSplit())
+ return false;
+
+ auto collStatus =
+ Grid::get(txn)->catalogClient(txn)->getCollection(txn, manager->getns());
+ if (!collStatus.isOK()) {
+ log() << "Auto-split for " << nss << " failed to load collection metadata"
+ << causedBy(redact(collStatus.getStatus()));
+ return false;
+ }
+
+ return collStatus.getValue().value.getAllowBalance();
+ }();
+
+ log() << "autosplitted " << nss << " chunk: " << redact(chunk->toString()) << " into "
+ << (splitPoints.size() + 1) << " parts (splitThreshold " << splitThreshold << ")"
+ << (suggestedMigrateChunk ? "" : (std::string) " (migrate suggested" +
+ (shouldBalance ? ")" : ", but no migrations allowed)"));
+
+ if (!shouldBalance || !suggestedMigrateChunk) {
+ reloadChunkManager(txn, nss);
+ return;
+ }
+
+        // Top chunk optimization - try to move the top chunk out of this shard to prevent the
+        // hot spot from staying on a single shard. This is based on the assumption that
+        // succeeding inserts will fall on the top chunk.
+
+ // We need to use the latest chunk manager (after the split) in order to have the most
+ // up-to-date view of the chunk we are about to move
+ auto scopedCM = uassertStatusOK(ScopedChunkManager::refreshAndGet(txn, nss));
+ auto suggestedChunk = scopedCM.cm()->findIntersectingChunkWithSimpleCollation(
+ txn, suggestedMigrateChunk->getMin());
+
+ ChunkType chunkToMove;
+ chunkToMove.setNS(nss.ns());
+ chunkToMove.setShard(suggestedChunk->getShardId());
+ chunkToMove.setMin(suggestedChunk->getMin());
+ chunkToMove.setMax(suggestedChunk->getMax());
+ chunkToMove.setVersion(suggestedChunk->getLastmod());
+
+ uassertStatusOK(configsvr_client::rebalanceChunk(txn, chunkToMove));
+
+ reloadChunkManager(txn, nss);
+ } catch (const DBException& ex) {
+ chunk->randomizeBytesWritten();
+
+ log() << "Unable to auto-split chunk " << redact(chunkRange.toString()) << causedBy(ex);
+ }
+}
+
} // namespace mongo
diff --git a/src/mongo/s/commands/cluster_write.h b/src/mongo/s/commands/cluster_write.h
index a0d5e931104..80f4a325ddf 100644
--- a/src/mongo/s/commands/cluster_write.h
+++ b/src/mongo/s/commands/cluster_write.h
@@ -28,12 +28,15 @@
#pragma once
+#include <string>
+
#include "mongo/s/write_ops/batch_write_exec.h"
namespace mongo {
-class BatchedCommandRequest;
-class BatchedCommandResponse;
+class BSONObj;
+class Chunk;
+class ChunkManager;
class OperationContext;
class ClusterWriter {
@@ -54,9 +57,13 @@ private:
};
/**
- * Used only for writes to the config server, config and admin databases.
+ * Adds the specified amount of data written to the chunk's stats and, if the total amount nears
+ * the max size of a shard, attempts to split the chunk. This call is opportunistic and swallows
+ * any errors.
*/
-Status clusterCreateIndex(
- OperationContext* txn, const std::string& ns, BSONObj keys, BSONObj collation, bool unique);
+void updateChunkWriteStatsAndSplitIfNeeded(OperationContext* txn,
+ ChunkManager* manager,
+ Chunk* chunk,
+ long dataWritten);
} // namespace mongo