author     Matthew Saltz <matthew.saltz@mongodb.com>    2018-08-08 16:11:56 -0400
committer  Matthew Saltz <matthew.saltz@mongodb.com>    2018-08-09 14:30:26 -0400
commit     b225e12c9759f88c7d7d4e30fa4e7c13be1dc95a (patch)
tree       02e89549c89129212b6921aff94ae71317a7fe5e /src/mongo/s
parent     4363a94cd3ebb72a125f25bcdee110c12a82f40b (diff)
SERVER-34448 Remove autosplitting logic from map-reduce on mongos
Diffstat (limited to 'src/mongo/s')
-rw-r--r--  src/mongo/s/chunk_manager.h                        5
-rw-r--r--  src/mongo/s/commands/cluster_map_reduce_cmd.cpp   27
-rw-r--r--  src/mongo/s/write_ops/cluster_write.cpp          257
-rw-r--r--  src/mongo/s/write_ops/cluster_write.h             12
4 files changed, 0 insertions, 301 deletions
diff --git a/src/mongo/s/chunk_manager.h b/src/mongo/s/chunk_manager.h
index 40baca2cd68..99b91954791 100644
--- a/src/mongo/s/chunk_manager.h
+++ b/src/mongo/s/chunk_manager.h
@@ -212,11 +212,6 @@ private:
} _autoSplitThrottle;
friend class ChunkManager;
- // This function needs to be able to access the auto-split throttle
- friend void updateChunkWriteStatsAndSplitIfNeeded(OperationContext*,
- ChunkManager*,
- Chunk,
- long);
};
// This will be renamed to RoutingTableHistory and the original RoutingTableHistory will be
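For context, the friend declaration removed above existed only so the free function updateChunkWriteStatsAndSplitIfNeeded could reach the private auto-split throttle inside the routing table class. A minimal standalone C++ sketch of that friend-free-function pattern (illustrative names only, not MongoDB code):

    #include <iostream>

    class Throttled;

    // Free function that needs access to a private member of Throttled.
    void touchThrottle(Throttled& t);

    class Throttled {
    public:
        Throttled() = default;

    private:
        // The friend declaration is the only reason touchThrottle can read this.
        friend void touchThrottle(Throttled& t);
        int _splitTickets = 3;
    };

    void touchThrottle(Throttled& t) {
        t._splitTickets--;  // legal only because of the friend declaration above
        std::cout << "tickets left: " << t._splitTickets << "\n";
    }

    int main() {
        Throttled throttle;
        touchThrottle(throttle);
        return 0;
    }

With the caller deleted, the friendship (and the coupling it implied) goes away too.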
diff --git a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
index 11de9f74faa..71b63d8b0ef 100644
--- a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
+++ b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
@@ -543,7 +543,6 @@ public:
shardedOutputCollUUID->appendToBuilder(&finalCmd, "shardedOutputCollUUID");
}
- auto chunkSizes = SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<int>();
{
// Take distributed lock to prevent split / migration.
auto scopedDistLock = catalogClient->getDistLockManager()->lock(
@@ -599,20 +598,6 @@ public:
reduceCount += counts.getIntField("reduce");
outputCount += counts.getIntField("output");
postCountsB.append(server, counts);
-
- // get the size inserted for each chunk
- // split cannot be called here since we already have the distributed lock
- if (singleResult.hasField("chunkSizes")) {
- std::vector<BSONElement> sizes =
- singleResult.getField("chunkSizes").Array();
- for (unsigned int i = 0; i < sizes.size(); i += 2) {
- BSONObj key = sizes[i].Obj().getOwned();
- const long long size = sizes[i + 1].numberLong();
-
- invariant(size < std::numeric_limits<int>::max());
- chunkSizes[key] = static_cast<int>(size);
- }
- }
}
}
@@ -624,18 +609,6 @@ public:
str::stream() << "Failed to write mapreduce output to " << outputCollNss.ns()
<< "; expected that collection to be sharded, but it was not",
outputRoutingInfo.cm());
-
- const auto outputCM = outputRoutingInfo.cm();
-
- for (const auto& chunkSize : chunkSizes) {
- BSONObj key = chunkSize.first;
- const int size = chunkSize.second;
- invariant(size < std::numeric_limits<int>::max());
-
- // Key reported should be the chunk's minimum
- auto chunkWritten = outputCM->findIntersectingChunkWithSimpleCollation(key);
- updateChunkWriteStatsAndSplitIfNeeded(opCtx, outputCM.get(), chunkWritten, size);
- }
}
cleanUp(servers, dbname, shardResultCollection);
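The block removed above read the shards' mapReduce replies for a "chunkSizes" field laid out as a flat array alternating chunk-min key and bytes written, i.e. [key, size, key, size, ...], and later replayed each pair into the per-chunk write statistics. A minimal standalone sketch of that parsing step, with std types standing in for BSON (names here are illustrative, not MongoDB APIs):

    #include <cassert>
    #include <iostream>
    #include <limits>
    #include <map>
    #include <string>
    #include <variant>
    #include <vector>

    // Stand-in for the BSON "chunkSizes" array: a flat list alternating a
    // chunk-min key (here a string) and the bytes written to that chunk.
    using Entry = std::variant<std::string, long long>;

    std::map<std::string, int> parseChunkSizes(const std::vector<Entry>& flat) {
        std::map<std::string, int> chunkSizes;
        // Walk the array two elements at a time: [key, size, key, size, ...]
        for (size_t i = 0; i + 1 < flat.size(); i += 2) {
            const auto& key = std::get<std::string>(flat[i]);
            const long long size = std::get<long long>(flat[i + 1]);
            assert(size < std::numeric_limits<int>::max());
            chunkSizes[key] = static_cast<int>(size);
        }
        return chunkSizes;
    }

    int main() {
        std::vector<Entry> flat{std::string("{x: 0}"), 1024LL,
                                std::string("{x: 100}"), 2048LL};
        for (const auto& [key, size] : parseChunkSizes(flat)) {
            std::cout << key << " -> " << size << " bytes\n";
        }
        return 0;
    }

The deleted code deferred the actual split because mongos still held the distributed lock at that point; only after releasing it did it feed these sizes to updateChunkWriteStatsAndSplitIfNeeded.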
diff --git a/src/mongo/s/write_ops/cluster_write.cpp b/src/mongo/s/write_ops/cluster_write.cpp
index cdf9b745b39..258f48308dd 100644
--- a/src/mongo/s/write_ops/cluster_write.cpp
+++ b/src/mongo/s/write_ops/cluster_write.cpp
@@ -54,91 +54,12 @@
namespace mongo {
namespace {
-const uint64_t kTooManySplitPoints = 4;
-
void toBatchError(const Status& status, BatchedCommandResponse* response) {
response->clear();
response->setStatus(status);
dassert(response->isValid(NULL));
}
-/**
- * Returns the split point that will result in one of the chunk having exactly one document. Also
- * returns an empty document if the split point cannot be determined.
- *
- * doSplitAtLower - determines which side of the split will have exactly one document. True means
- * that the split point chosen will be closer to the lower bound.
- *
- * NOTE: this assumes that the shard key is not "special"- that is, the shardKeyPattern is simply an
- * ordered list of ascending/descending field names. For example {a : 1, b : -1} is not special, but
- * {a : "hashed"} is.
- */
-BSONObj findExtremeKeyForShard(OperationContext* opCtx,
- const NamespaceString& nss,
- const ShardId& shardId,
- const ShardKeyPattern& shardKeyPattern,
- bool doSplitAtLower) {
- Query q;
-
- if (doSplitAtLower) {
- q.sort(shardKeyPattern.toBSON());
- } else {
- // need to invert shard key pattern to sort backwards
- // TODO: make a helper in ShardKeyPattern?
- BSONObjBuilder r;
-
- BSONObjIterator i(shardKeyPattern.toBSON());
- while (i.more()) {
- BSONElement e = i.next();
- uassert(10163, "can only handle numbers here - which i think is correct", e.isNumber());
- r.append(e.fieldName(), -1 * e.number());
- }
-
- q.sort(r.obj());
- }
-
- // Find the extreme key
- const auto shardConnStr = [&]() {
- const auto shard =
- uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId));
- return shard->getConnString();
- }();
-
- ScopedDbConnection conn(shardConnStr);
-
- BSONObj end;
-
- if (doSplitAtLower) {
- // Splitting close to the lower bound means that the split point will be the
- // upper bound. Chunk range upper bounds are exclusive so skip a document to
- // make the lower half of the split end up with a single document.
- std::unique_ptr<DBClientCursor> cursor = conn->query(nss.ns(),
- q,
- 1, /* nToReturn */
- 1 /* nToSkip */);
-
- uassert(28736,
- str::stream() << "failed to initialize cursor during auto split due to "
- << "connection problem with "
- << conn->getServerAddress(),
- cursor.get() != nullptr);
-
- if (cursor->more()) {
- end = cursor->next().getOwned();
- }
- } else {
- end = conn->findOne(nss.ns(), q);
- }
-
- conn.done();
-
- if (end.isEmpty()) {
- return BSONObj();
- }
-
- return shardKeyPattern.extractShardKeyFromDoc(end);
-}
-
} // namespace
void ClusterWriter::write(OperationContext* opCtx,
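The helper deleted in the hunk above located the extreme (lowest or highest) shard-key value on a shard by querying with a sort on the shard key pattern, inverting each field's direction when it needed the top end. A standalone sketch of that inversion step, with a plain std::map standing in for an ordered BSON key pattern (illustrative only; hashed patterns were excluded by the caller):

    #include <iostream>
    #include <map>
    #include <string>

    // Given an ordered key pattern such as {a: 1, b: -1}, build the reversed
    // pattern {a: -1, b: 1} so a query sorted on it returns documents from the
    // opposite end of the index.
    std::map<std::string, int> invertKeyPattern(const std::map<std::string, int>& pattern) {
        std::map<std::string, int> inverted;
        for (const auto& [field, direction] : pattern) {
            inverted[field] = -1 * direction;
        }
        return inverted;
    }

    int main() {
        std::map<std::string, int> shardKey{{"a", 1}, {"b", -1}};
        for (const auto& [field, direction] : invertKeyPattern(shardKey)) {
            std::cout << field << ": " << direction << "\n";
        }
        return 0;
    }

When splitting near the lower bound, the deleted code additionally skipped one document so the lower half of the split ended up with exactly one document (chunk upper bounds are exclusive).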
@@ -200,182 +121,4 @@ void ClusterWriter::write(OperationContext* opCtx,
}
}
-void updateChunkWriteStatsAndSplitIfNeeded(OperationContext* opCtx,
- ChunkManager* manager,
- Chunk chunk,
- long chunkBytesWritten) {
- // Disable lastError tracking so that any errors, which occur during auto-split do not get
- // bubbled up on the client connection doing a write
- LastError::Disabled disableLastError(&LastError::get(opCtx->getClient()));
-
- const auto balancerConfig = Grid::get(opCtx)->getBalancerConfiguration();
-
- const bool minIsInf =
- (0 == manager->getShardKeyPattern().getKeyPattern().globalMin().woCompare(chunk.getMin()));
- const bool maxIsInf =
- (0 == manager->getShardKeyPattern().getKeyPattern().globalMax().woCompare(chunk.getMax()));
-
- auto writesTracker = chunk.getWritesTracker();
- writesTracker->addBytesWritten(chunkBytesWritten);
-
- const uint64_t desiredChunkSize = balancerConfig->getMaxChunkSizeBytes();
-
- if (!writesTracker->shouldSplit(desiredChunkSize)) {
- return;
- }
-
- const NamespaceString& nss = manager->getns();
-
- if (!manager->autoSplitThrottle()._splitTickets.tryAcquire()) {
- LOG(1) << "won't auto split because not enough tickets: " << nss;
- return;
- }
-
- TicketHolderReleaser releaser(&(manager->autoSplitThrottle()._splitTickets));
-
- const ChunkRange chunkRange(chunk.getMin(), chunk.getMax());
-
- try {
- // Ensure we have the most up-to-date balancer configuration
- uassertStatusOK(balancerConfig->refreshAndCheck(opCtx));
-
- if (!balancerConfig->getShouldAutoSplit()) {
- return;
- }
-
- LOG(1) << "about to initiate autosplit: " << redact(chunk.toString())
- << " dataWritten: " << chunkBytesWritten
- << " desiredChunkSize: " << desiredChunkSize;
-
- const uint64_t chunkSizeToUse = [&]() {
- const uint64_t estNumSplitPoints = chunkBytesWritten / desiredChunkSize * 2;
-
- if (estNumSplitPoints >= kTooManySplitPoints) {
- // The current desired chunk size will split the chunk into lots of small chunk and
- // at the worst case this can result into thousands of chunks. So check and see if a
- // bigger value can be used.
- return std::min((uint64_t)chunkBytesWritten,
- balancerConfig->getMaxChunkSizeBytes());
- } else {
- return desiredChunkSize;
- }
- }();
-
- auto splitPoints =
- uassertStatusOK(shardutil::selectChunkSplitPoints(opCtx,
- chunk.getShardId(),
- nss,
- manager->getShardKeyPattern(),
- chunkRange,
- chunkSizeToUse,
- boost::none));
-
- if (splitPoints.size() <= 1) {
- // No split points means there isn't enough data to split on; 1 split point means we
- // have
- // between half the chunk size to full chunk size so there is no need to split yet
- writesTracker->clearBytesWritten();
- return;
- }
-
- if (minIsInf || maxIsInf) {
- // We don't want to reset _dataWritten since we want to check the other side right away
- } else {
- // We're splitting, so should wait a bit
- writesTracker->clearBytesWritten();
- }
-
- // We assume that if the chunk being split is the first (or last) one on the collection,
- // this chunk is likely to see more insertions. Instead of splitting mid-chunk, we use the
- // very first (or last) key as a split point.
- //
- // This heuristic is skipped for "special" shard key patterns that are not likely to produce
- // monotonically increasing or decreasing values (e.g. hashed shard keys).
- if (KeyPattern::isOrderedKeyPattern(manager->getShardKeyPattern().toBSON())) {
- if (minIsInf) {
- BSONObj key = findExtremeKeyForShard(
- opCtx, nss, chunk.getShardId(), manager->getShardKeyPattern(), true);
- if (!key.isEmpty()) {
- splitPoints.front() = key.getOwned();
- }
- } else if (maxIsInf) {
- BSONObj key = findExtremeKeyForShard(
- opCtx, nss, chunk.getShardId(), manager->getShardKeyPattern(), false);
- if (!key.isEmpty()) {
- splitPoints.back() = key.getOwned();
- }
- }
- }
-
- const auto suggestedMigrateChunk =
- uassertStatusOK(shardutil::splitChunkAtMultiplePoints(opCtx,
- chunk.getShardId(),
- nss,
- manager->getShardKeyPattern(),
- manager->getVersion(),
- chunkRange,
- splitPoints));
-
- // Balance the resulting chunks if the option is enabled and if the shard suggested a chunk
- // to balance
- const bool shouldBalance = [&]() {
- if (!balancerConfig->shouldBalanceForAutoSplit())
- return false;
-
- auto collStatus =
- Grid::get(opCtx)->catalogClient()->getCollection(opCtx, manager->getns());
- if (!collStatus.isOK()) {
- log() << "Auto-split for " << nss << " failed to load collection metadata"
- << causedBy(redact(collStatus.getStatus()));
- return false;
- }
-
- return collStatus.getValue().value.getAllowBalance();
- }();
-
- log() << "autosplitted " << nss << " chunk: " << redact(chunk.toString()) << " into "
- << (splitPoints.size() + 1) << " parts (desiredChunkSize " << desiredChunkSize << ")"
- << (suggestedMigrateChunk ? "" : (std::string) " (migrate suggested" +
- (shouldBalance ? ")" : ", but no migrations allowed)"));
-
- // Reload the chunk manager after the split
- auto routingInfo = uassertStatusOK(
- Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx,
- nss));
-
- if (!shouldBalance || !suggestedMigrateChunk) {
- return;
- }
-
- // Top chunk optimization - try to move the top chunk out of this shard to prevent the hot
- // spot from staying on a single shard. This is based on the assumption that succeeding
- // inserts will fall on the top chunk.
-
- // We need to use the latest chunk manager (after the split) in order to have the most
- // up-to-date view of the chunk we are about to move
- auto suggestedChunk = routingInfo.cm()->findIntersectingChunkWithSimpleCollation(
- suggestedMigrateChunk->getMin());
-
- ChunkType chunkToMove;
- chunkToMove.setNS(nss);
- chunkToMove.setShard(suggestedChunk.getShardId());
- chunkToMove.setMin(suggestedChunk.getMin());
- chunkToMove.setMax(suggestedChunk.getMax());
- chunkToMove.setVersion(suggestedChunk.getLastmod());
-
- uassertStatusOK(configsvr_client::rebalanceChunk(opCtx, chunkToMove));
-
- // Ensure the collection gets reloaded because of the move
- Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(nss);
- } catch (const DBException& ex) {
- chunk.getWritesTracker()->clearBytesWritten();
-
- if (ErrorCodes::isStaleShardVersionError(ex.code())) {
- log() << "Unable to auto-split chunk " << redact(chunkRange.toString()) << causedBy(ex)
- << ", going to invalidate routing table entry for " << nss;
- Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(nss);
- }
- }
-}
-
} // namespace mongo
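The bulk of this deletion is updateChunkWriteStatsAndSplitIfNeeded, whose central heuristic was: accumulate bytes written per chunk and, once a split is warranted, pick a split size that will not explode into thousands of tiny chunks in a single pass. A minimal standalone sketch of that size selection (plain integers stand in for the balancer configuration; note that the deleted code initialized desiredChunkSize from balancerConfig->getMaxChunkSizeBytes(), whereas the sketch keeps the two inputs separate for illustration):

    #include <algorithm>
    #include <cstdint>
    #include <iostream>

    // If the bytes written since the last split would produce "too many" split
    // points at the configured chunk size, fall back to a coarser split size.
    constexpr uint64_t kTooManySplitPoints = 4;

    uint64_t chooseSplitSize(uint64_t bytesWritten,
                             uint64_t desiredChunkSize,
                             uint64_t maxChunkSizeBytes) {
        const uint64_t estNumSplitPoints = bytesWritten / desiredChunkSize * 2;
        if (estNumSplitPoints >= kTooManySplitPoints) {
            return std::min(bytesWritten, maxChunkSizeBytes);
        }
        return desiredChunkSize;
    }

    int main() {
        const uint64_t kMB = 1024 * 1024;
        // Little data written: keep the configured chunk size.
        std::cout << chooseSplitSize(10 * kMB, 64 * kMB, 64 * kMB) << "\n";
        // A large backlog of writes: coarsen the split size so one pass does
        // not generate dozens of split points.
        std::cout << chooseSplitSize(512 * kMB, 16 * kMB, 64 * kMB) << "\n";
        return 0;
    }

The rest of the removed function handled the follow-on work: substituting an extreme key as the split point when the chunk touched MinKey or MaxKey, issuing the split, and optionally asking the balancer to migrate the resulting top chunk off the hot shard.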
diff --git a/src/mongo/s/write_ops/cluster_write.h b/src/mongo/s/write_ops/cluster_write.h
index 7e45dc27b45..f4604fc2040 100644
--- a/src/mongo/s/write_ops/cluster_write.h
+++ b/src/mongo/s/write_ops/cluster_write.h
@@ -35,8 +35,6 @@
namespace mongo {
class BSONObj;
-class Chunk;
-class ChunkManager;
class OperationContext;
class ClusterWriter {
@@ -47,14 +45,4 @@ public:
BatchedCommandResponse* response);
};
-/**
- * Adds the specified amount of data written to the chunk's stats and if the total amount nears the
- * max size of a shard attempt to split the chunk. This call is opportunistic and swallows any
- * errors.
- */
-void updateChunkWriteStatsAndSplitIfNeeded(OperationContext* opCtx,
- ChunkManager* manager,
- Chunk chunk,
- long dataWritten);
-
} // namespace mongo
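The header comment removed above documented the contract that the call was opportunistic and swallowed any errors, so an auto-split failure could never fail the client write that triggered it. A small sketch of that error-swallowing calling convention (the function name and body here are hypothetical stand-ins, not MongoDB APIs):

    #include <iostream>
    #include <stdexcept>

    // Hypothetical stand-in for the removed helper: errors are logged and
    // swallowed rather than surfaced to the write path that triggered the call.
    void updateStatsAndMaybeSplit(long bytesWritten) {
        try {
            if (bytesWritten < 0) {
                throw std::runtime_error("simulated split failure");
            }
            // ... record bytes written and possibly issue a split here ...
        } catch (const std::exception& ex) {
            std::cout << "auto-split skipped: " << ex.what() << "\n";
        }
    }

    int main() {
        updateStatsAndMaybeSplit(1024);  // succeeds silently
        updateStatsAndMaybeSplit(-1);    // failure is logged and swallowed
        return 0;
    }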