Diffstat (limited to 'src/mongo/s')
-rw-r--r--  src/mongo/s/chunk_manager.h                     |   5 -
-rw-r--r--  src/mongo/s/commands/cluster_map_reduce_cmd.cpp |  27 --
-rw-r--r--  src/mongo/s/write_ops/cluster_write.cpp         | 257 ----------
-rw-r--r--  src/mongo/s/write_ops/cluster_write.h           |  12 -
4 files changed, 0 insertions, 301 deletions
diff --git a/src/mongo/s/chunk_manager.h b/src/mongo/s/chunk_manager.h
index 40baca2cd68..99b91954791 100644
--- a/src/mongo/s/chunk_manager.h
+++ b/src/mongo/s/chunk_manager.h
@@ -212,11 +212,6 @@ private:
     } _autoSplitThrottle;
 
     friend class ChunkManager;
-    // This function needs to be able to access the auto-split throttle
-    friend void updateChunkWriteStatsAndSplitIfNeeded(OperationContext*,
-                                                      ChunkManager*,
-                                                      Chunk,
-                                                      long);
 };
 
 // This will be renamed to RoutingTableHistory and the original RoutingTableHistory will be
diff --git a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
index 11de9f74faa..71b63d8b0ef 100644
--- a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
+++ b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
@@ -543,7 +543,6 @@ public:
             shardedOutputCollUUID->appendToBuilder(&finalCmd, "shardedOutputCollUUID");
         }
 
-        auto chunkSizes = SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<int>();
         {
             // Take distributed lock to prevent split / migration.
             auto scopedDistLock = catalogClient->getDistLockManager()->lock(
@@ -599,20 +598,6 @@ public:
                     reduceCount += counts.getIntField("reduce");
                     outputCount += counts.getIntField("output");
                     postCountsB.append(server, counts);
-
-                    // get the size inserted for each chunk
-                    // split cannot be called here since we already have the distributed lock
-                    if (singleResult.hasField("chunkSizes")) {
-                        std::vector<BSONElement> sizes =
-                            singleResult.getField("chunkSizes").Array();
-                        for (unsigned int i = 0; i < sizes.size(); i += 2) {
-                            BSONObj key = sizes[i].Obj().getOwned();
-                            const long long size = sizes[i + 1].numberLong();
-
-                            invariant(size < std::numeric_limits<int>::max());
-                            chunkSizes[key] = static_cast<int>(size);
-                        }
-                    }
                 }
             }
 
@@ -624,18 +609,6 @@ public:
                     str::stream() << "Failed to write mapreduce output to " << outputCollNss.ns()
                                   << "; expected that collection to be sharded, but it was not",
                     outputRoutingInfo.cm());
-
-            const auto outputCM = outputRoutingInfo.cm();
-
-            for (const auto& chunkSize : chunkSizes) {
-                BSONObj key = chunkSize.first;
-                const int size = chunkSize.second;
-                invariant(size < std::numeric_limits<int>::max());
-
-                // Key reported should be the chunk's minimum
-                auto chunkWritten = outputCM->findIntersectingChunkWithSimpleCollation(key);
-                updateChunkWriteStatsAndSplitIfNeeded(opCtx, outputCM.get(), chunkWritten, size);
-            }
         }
 
         cleanUp(servers, dbname, shardResultCollection);
diff --git a/src/mongo/s/write_ops/cluster_write.cpp b/src/mongo/s/write_ops/cluster_write.cpp
index cdf9b745b39..258f48308dd 100644
--- a/src/mongo/s/write_ops/cluster_write.cpp
+++ b/src/mongo/s/write_ops/cluster_write.cpp
@@ -54,91 +54,12 @@ namespace mongo {
 namespace {
 
-const uint64_t kTooManySplitPoints = 4;
-
 void toBatchError(const Status& status, BatchedCommandResponse* response) {
     response->clear();
     response->setStatus(status);
     dassert(response->isValid(NULL));
 }
 
-/**
- * Returns the split point that will result in one of the chunk having exactly one document. Also
- * returns an empty document if the split point cannot be determined.
- *
- * doSplitAtLower - determines which side of the split will have exactly one document. True means
- * that the split point chosen will be closer to the lower bound.
- *
- * NOTE: this assumes that the shard key is not "special"- that is, the shardKeyPattern is simply an
- * ordered list of ascending/descending field names. For example {a : 1, b : -1} is not special, but
- * {a : "hashed"} is.
- */
-BSONObj findExtremeKeyForShard(OperationContext* opCtx,
-                               const NamespaceString& nss,
-                               const ShardId& shardId,
-                               const ShardKeyPattern& shardKeyPattern,
-                               bool doSplitAtLower) {
-    Query q;
-
-    if (doSplitAtLower) {
-        q.sort(shardKeyPattern.toBSON());
-    } else {
-        // need to invert shard key pattern to sort backwards
-        // TODO: make a helper in ShardKeyPattern?
-        BSONObjBuilder r;
-
-        BSONObjIterator i(shardKeyPattern.toBSON());
-        while (i.more()) {
-            BSONElement e = i.next();
-            uassert(10163, "can only handle numbers here - which i think is correct", e.isNumber());
-            r.append(e.fieldName(), -1 * e.number());
-        }
-
-        q.sort(r.obj());
-    }
-
-    // Find the extreme key
-    const auto shardConnStr = [&]() {
-        const auto shard =
-            uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId));
-        return shard->getConnString();
-    }();
-
-    ScopedDbConnection conn(shardConnStr);
-
-    BSONObj end;
-
-    if (doSplitAtLower) {
-        // Splitting close to the lower bound means that the split point will be the
-        // upper bound. Chunk range upper bounds are exclusive so skip a document to
-        // make the lower half of the split end up with a single document.
-        std::unique_ptr<DBClientCursor> cursor = conn->query(nss.ns(),
-                                                             q,
-                                                             1, /* nToReturn */
-                                                             1 /* nToSkip */);
-
-        uassert(28736,
-                str::stream() << "failed to initialize cursor during auto split due to "
-                              << "connection problem with "
-                              << conn->getServerAddress(),
-                cursor.get() != nullptr);
-
-        if (cursor->more()) {
-            end = cursor->next().getOwned();
-        }
-    } else {
-        end = conn->findOne(nss.ns(), q);
-    }
-
-    conn.done();
-
-    if (end.isEmpty()) {
-        return BSONObj();
-    }
-
-    return shardKeyPattern.extractShardKeyFromDoc(end);
-}
-
 }  // namespace
 
 void ClusterWriter::write(OperationContext* opCtx,
@@ -200,182 +121,4 @@ void ClusterWriter::write(OperationContext* opCtx,
     }
 }
 
-void updateChunkWriteStatsAndSplitIfNeeded(OperationContext* opCtx,
-                                           ChunkManager* manager,
-                                           Chunk chunk,
-                                           long chunkBytesWritten) {
-    // Disable lastError tracking so that any errors, which occur during auto-split do not get
-    // bubbled up on the client connection doing a write
-    LastError::Disabled disableLastError(&LastError::get(opCtx->getClient()));
-
-    const auto balancerConfig = Grid::get(opCtx)->getBalancerConfiguration();
-
-    const bool minIsInf =
-        (0 == manager->getShardKeyPattern().getKeyPattern().globalMin().woCompare(chunk.getMin()));
-    const bool maxIsInf =
-        (0 == manager->getShardKeyPattern().getKeyPattern().globalMax().woCompare(chunk.getMax()));
-
-    auto writesTracker = chunk.getWritesTracker();
-    writesTracker->addBytesWritten(chunkBytesWritten);
-
-    const uint64_t desiredChunkSize = balancerConfig->getMaxChunkSizeBytes();
-
-    if (!writesTracker->shouldSplit(desiredChunkSize)) {
-        return;
-    }
-
-    const NamespaceString& nss = manager->getns();
-
-    if (!manager->autoSplitThrottle()._splitTickets.tryAcquire()) {
-        LOG(1) << "won't auto split because not enough tickets: " << nss;
-        return;
-    }
-
-    TicketHolderReleaser releaser(&(manager->autoSplitThrottle()._splitTickets));
-
-    const ChunkRange chunkRange(chunk.getMin(), chunk.getMax());
-
-    try {
-        // Ensure we have the most up-to-date balancer configuration
-        uassertStatusOK(balancerConfig->refreshAndCheck(opCtx));
-
-        if (!balancerConfig->getShouldAutoSplit()) {
-            return;
-        }
-
-        LOG(1) << "about to initiate autosplit: " << redact(chunk.toString())
-               << " dataWritten: " << chunkBytesWritten
-               << " desiredChunkSize: " << desiredChunkSize;
-
-        const uint64_t chunkSizeToUse = [&]() {
-            const uint64_t estNumSplitPoints = chunkBytesWritten / desiredChunkSize * 2;
-
-            if (estNumSplitPoints >= kTooManySplitPoints) {
-                // The current desired chunk size will split the chunk into lots of small chunk and
-                // at the worst case this can result into thousands of chunks. So check and see if a
-                // bigger value can be used.
-                return std::min((uint64_t)chunkBytesWritten,
-                                balancerConfig->getMaxChunkSizeBytes());
-            } else {
-                return desiredChunkSize;
-            }
-        }();
-
-        auto splitPoints =
-            uassertStatusOK(shardutil::selectChunkSplitPoints(opCtx,
-                                                              chunk.getShardId(),
-                                                              nss,
-                                                              manager->getShardKeyPattern(),
-                                                              chunkRange,
-                                                              chunkSizeToUse,
-                                                              boost::none));
-
-        if (splitPoints.size() <= 1) {
-            // No split points means there isn't enough data to split on; 1 split point means we
-            // have
-            // between half the chunk size to full chunk size so there is no need to split yet
-            writesTracker->clearBytesWritten();
-            return;
-        }
-
-        if (minIsInf || maxIsInf) {
-            // We don't want to reset _dataWritten since we want to check the other side right away
-        } else {
-            // We're splitting, so should wait a bit
-            writesTracker->clearBytesWritten();
-        }
-
-        // We assume that if the chunk being split is the first (or last) one on the collection,
-        // this chunk is likely to see more insertions. Instead of splitting mid-chunk, we use the
-        // very first (or last) key as a split point.
-        //
-        // This heuristic is skipped for "special" shard key patterns that are not likely to produce
-        // monotonically increasing or decreasing values (e.g. hashed shard keys).
-        if (KeyPattern::isOrderedKeyPattern(manager->getShardKeyPattern().toBSON())) {
-            if (minIsInf) {
-                BSONObj key = findExtremeKeyForShard(
-                    opCtx, nss, chunk.getShardId(), manager->getShardKeyPattern(), true);
-                if (!key.isEmpty()) {
-                    splitPoints.front() = key.getOwned();
-                }
-            } else if (maxIsInf) {
-                BSONObj key = findExtremeKeyForShard(
-                    opCtx, nss, chunk.getShardId(), manager->getShardKeyPattern(), false);
-                if (!key.isEmpty()) {
-                    splitPoints.back() = key.getOwned();
-                }
-            }
-        }
-
-        const auto suggestedMigrateChunk =
-            uassertStatusOK(shardutil::splitChunkAtMultiplePoints(opCtx,
-                                                                  chunk.getShardId(),
-                                                                  nss,
-                                                                  manager->getShardKeyPattern(),
-                                                                  manager->getVersion(),
-                                                                  chunkRange,
-                                                                  splitPoints));
-
-        // Balance the resulting chunks if the option is enabled and if the shard suggested a chunk
-        // to balance
-        const bool shouldBalance = [&]() {
-            if (!balancerConfig->shouldBalanceForAutoSplit())
-                return false;
-
-            auto collStatus =
-                Grid::get(opCtx)->catalogClient()->getCollection(opCtx, manager->getns());
-            if (!collStatus.isOK()) {
-                log() << "Auto-split for " << nss << " failed to load collection metadata"
-                      << causedBy(redact(collStatus.getStatus()));
-                return false;
-            }
-
-            return collStatus.getValue().value.getAllowBalance();
-        }();
-
-        log() << "autosplitted " << nss << " chunk: " << redact(chunk.toString()) << " into "
-              << (splitPoints.size() + 1) << " parts (desiredChunkSize " << desiredChunkSize << ")"
-              << (suggestedMigrateChunk ? "" : (std::string) " (migrate suggested" +
-                      (shouldBalance ? ")" : ", but no migrations allowed)"));
-
-        // Reload the chunk manager after the split
-        auto routingInfo = uassertStatusOK(
-            Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx,
-                                                                                         nss));
-
-        if (!shouldBalance || !suggestedMigrateChunk) {
-            return;
-        }
-
-        // Top chunk optimization - try to move the top chunk out of this shard to prevent the hot
-        // spot from staying on a single shard. This is based on the assumption that succeeding
-        // inserts will fall on the top chunk.
-
-        // We need to use the latest chunk manager (after the split) in order to have the most
-        // up-to-date view of the chunk we are about to move
-        auto suggestedChunk = routingInfo.cm()->findIntersectingChunkWithSimpleCollation(
-            suggestedMigrateChunk->getMin());
-
-        ChunkType chunkToMove;
-        chunkToMove.setNS(nss);
-        chunkToMove.setShard(suggestedChunk.getShardId());
-        chunkToMove.setMin(suggestedChunk.getMin());
-        chunkToMove.setMax(suggestedChunk.getMax());
-        chunkToMove.setVersion(suggestedChunk.getLastmod());
-
-        uassertStatusOK(configsvr_client::rebalanceChunk(opCtx, chunkToMove));
-
-        // Ensure the collection gets reloaded because of the move
-        Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(nss);
-    } catch (const DBException& ex) {
-        chunk.getWritesTracker()->clearBytesWritten();
-
-        if (ErrorCodes::isStaleShardVersionError(ex.code())) {
-            log() << "Unable to auto-split chunk " << redact(chunkRange.toString()) << causedBy(ex)
-                  << ", going to invalidate routing table entry for " << nss;
-            Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(nss);
-        }
-    }
-}
-
 }  // namespace mongo
diff --git a/src/mongo/s/write_ops/cluster_write.h b/src/mongo/s/write_ops/cluster_write.h
index 7e45dc27b45..f4604fc2040 100644
--- a/src/mongo/s/write_ops/cluster_write.h
+++ b/src/mongo/s/write_ops/cluster_write.h
@@ -35,8 +35,6 @@
 namespace mongo {
 
 class BSONObj;
-class Chunk;
-class ChunkManager;
 class OperationContext;
 
 class ClusterWriter {
@@ -47,14 +45,4 @@ public:
                       BatchedCommandResponse* response);
 };
 
-/**
- * Adds the specified amount of data written to the chunk's stats and if the total amount nears the
- * max size of a shard attempt to split the chunk. This call is opportunistic and swallows any
- * errors.
- */
-void updateChunkWriteStatsAndSplitIfNeeded(OperationContext* opCtx,
-                                           ChunkManager* manager,
-                                           Chunk chunk,
-                                           long dataWritten);
-
 }  // namespace mongo
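For context on what this commit deletes: findExtremeKeyForShard reads the extreme document by negating every component of an ordered shard key pattern, so that {a : 1, b : -1} sorts as {a : -1, b : 1}. A minimal standalone sketch of that inversion, using a vector of (field, direction) pairs as a hypothetical stand-in for the BSON key pattern:

#include <iostream>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a BSON shard key pattern such as {a : 1, b : -1}:
// an ordered list of (field name, sort direction) pairs.
using KeyPattern = std::vector<std::pair<std::string, int>>;

// Mirrors the inversion loop in the removed findExtremeKeyForShard: negate
// every direction so a query sorted by the result walks the shard key from
// its maximum instead of its minimum.
KeyPattern invertKeyPattern(const KeyPattern& pattern) {
    KeyPattern inverted;
    inverted.reserve(pattern.size());
    for (const auto& elem : pattern) {
        inverted.emplace_back(elem.first, -1 * elem.second);
    }
    return inverted;
}

int main() {
    for (const auto& elem : invertKeyPattern({{"a", 1}, {"b", -1}})) {
        std::cout << elem.first << " : " << elem.second << "\n";  // a : -1, b : 1
    }
    return 0;
}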
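The same helper picks the low-end split point by skipping exactly one document: chunk ranges are half-open [min, max), so taking the key of the second document as the exclusive upper bound leaves a single document in the lower chunk. A sketch of that arithmetic, with plain integers standing in for shard keys:

#include <iostream>
#include <vector>

// Mirrors the cursor logic in the removed findExtremeKeyForShard: to leave
// exactly one document in the lower half, the split point is the key of the
// *second* document in shard key order, which the removed code fetched with
// a query using nToSkip = 1 and nToReturn = 1.
int lowEndSplitPoint(const std::vector<int>& keysInShardKeyOrder) {
    return keysInShardKeyOrder.at(1);  // skip one, take one
}

int main() {
    // Keys 3, 7, 9, 12: splitting at 7 puts only key 3 in [min, 7).
    std::cout << lowEndSplitPoint({3, 7, 9, 12}) << "\n";  // prints 7
    return 0;
}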
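Finally, the removed updateChunkWriteStatsAndSplitIfNeeded guarded against over-splitting: it estimated roughly two split points per desiredChunkSize of data written and, at kTooManySplitPoints or more, split at a coarser size instead. A standalone sketch of that fallback (the free-function wrapper and the toy byte counts are illustrative; in the removed code desiredChunkSize was itself the balancer's max chunk size):

#include <algorithm>
#include <cstdint>
#include <iostream>

// Threshold from the removed code: at this many estimated split points the
// default split size is considered too fine-grained.
const std::uint64_t kTooManySplitPoints = 4;

// Mirrors the removed chunkSizeToUse lambda: estimate the number of split
// points and, if splitting at the default size would shatter the chunk into
// many small pieces, use the smaller of bytes-written and the configured
// maximum chunk size instead.
std::uint64_t chunkSizeToUse(std::uint64_t chunkBytesWritten,
                             std::uint64_t desiredChunkSize,
                             std::uint64_t maxChunkSizeBytes) {
    const std::uint64_t estNumSplitPoints = chunkBytesWritten / desiredChunkSize * 2;
    if (estNumSplitPoints >= kTooManySplitPoints) {
        return std::min(chunkBytesWritten, maxChunkSizeBytes);
    }
    return desiredChunkSize;
}

int main() {
    std::cout << chunkSizeToUse(512, 64, 1024) << "\n";  // 16 points estimated -> 512
    std::cout << chunkSizeToUse(64, 64, 1024) << "\n";   // 2 points estimated -> 64
    return 0;
}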