diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2021-12-14 10:56:00 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-12-14 11:20:58 +0000 |
commit | ed7cca61938ee12f5a9cbe870af096987c662f5c (patch) | |
tree | bf385542317de682f3b8c488bc5d8975921cb884 | |
parent | 23cdc9223e5eee9367e2aa13358de1b87f845c75 (diff) | |
download | mongo-ed7cca61938ee12f5a9cbe870af096987c662f5c.tar.gz |
SERVER-59929 Limit the blocking of split/merge behind other metadata operations to 5 seconds
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/s/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/s/active_migrations_registry.cpp | 15 | ||||
-rw-r--r-- | src/mongo/db/s/chunk_splitter.cpp | 405 | ||||
-rw-r--r-- | src/mongo/db/s/chunk_splitter.h | 109 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_initialization_mongod.cpp | 2 |
6 files changed, 11 insertions, 524 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index b38448ef80c..fb53906333a 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -74,7 +74,6 @@ #include "mongo/db/repl/storage_interface.h" #include "mongo/db/repl/sync_tail.h" #include "mongo/db/s/balancer/balancer.h" -#include "mongo/db/s/chunk_splitter.h" #include "mongo/db/s/config/sharding_catalog_manager.h" #include "mongo/db/s/sharding_initialization_mongod.h" #include "mongo/db/s/sharding_state_recovery.h" @@ -776,7 +775,6 @@ void ReplicationCoordinatorExternalStateImpl::shardingOnStepDownHook() { if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { Balancer::get(_service)->interruptBalancer(); } else if (ShardingState::get(_service)->enabled()) { - ChunkSplitter::get(_service).onStepDown(); CatalogCacheLoader::get(_service).onStepDown(); } @@ -860,7 +858,6 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook } CatalogCacheLoader::get(_service).onStepUp(); - ChunkSplitter::get(_service).onStepUp(); } else { // unsharded if (auto validator = LogicalTimeValidator::get(_service)) { validator->enableKeyGenerator(opCtx, true); diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 7f96b5499ff..d8c920756f5 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -35,7 +35,6 @@ env.Library( 'active_shard_collection_registry.cpp', 'auto_split_vector.cpp', 'chunk_move_write_concern_options.cpp', - 'chunk_splitter.cpp', 'collection_range_deleter.cpp', 'collection_sharding_runtime.cpp', 'collection_sharding_state_factory_shard.cpp', diff --git a/src/mongo/db/s/active_migrations_registry.cpp b/src/mongo/db/s/active_migrations_registry.cpp index 058ffb4441e..8d0cffacfeb 100644 --- a/src/mongo/db/s/active_migrations_registry.cpp +++ b/src/mongo/db/s/active_migrations_registry.cpp @@ -99,10 +99,17 @@ StatusWith<ScopedSplitMergeChunk> ActiveMigrationsRegistry::registerSplitOrMerge OperationContext* opCtx, const NamespaceString& nss, const ChunkRange& chunkRange) { stdx::unique_lock<stdx::mutex> ul(_mutex); - opCtx->waitForConditionOrInterrupt(_chunkOperationsStateChangedCV, ul, [&] { - return !(_activeMoveChunkState && _activeMoveChunkState->args.getNss() == nss) && - !_activeSplitMergeChunkStates.count(nss); - }); + // In order for splits to not block for too long behind a potential chunk migration, limit the + // duration of waiting for conflicting operations to at most 5 seconds. Otherwise, due to the + // fact that chunk splits block write operations on the MongoS it is possible that the write + // workload gets a really long stall. + const auto deadline = opCtx->getServiceContext()->getFastClockSource()->now() + Seconds{5}; + if (!opCtx->waitForConditionOrInterruptUntil(_chunkOperationsStateChangedCV, ul, deadline, [&] { + return !(_activeMoveChunkState && _activeMoveChunkState->args.getNss() == nss) && + !_activeSplitMergeChunkStates.count(nss); + })) { + return {ErrorCodes::LockBusy, "Timed out waiting for concurrent migration to complete"}; + } auto insertResult = _activeSplitMergeChunkStates.emplace(nss, ActiveSplitMergeChunkState(nss, chunkRange)); diff --git a/src/mongo/db/s/chunk_splitter.cpp b/src/mongo/db/s/chunk_splitter.cpp deleted file mode 100644 index 73eb4266aa5..00000000000 --- a/src/mongo/db/s/chunk_splitter.cpp +++ /dev/null @@ -1,405 +0,0 @@ - -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding - -#include "mongo/platform/basic.h" - -#include "mongo/db/s/chunk_splitter.h" - -#include "mongo/client/dbclientcursor.h" -#include "mongo/client/query.h" -#include "mongo/db/client.h" -#include "mongo/db/dbdirectclient.h" -#include "mongo/db/namespace_string.h" -#include "mongo/db/s/auto_split_vector.h" -#include "mongo/db/s/sharding_state.h" -#include "mongo/db/s/split_chunk.h" -#include "mongo/db/service_context.h" -#include "mongo/s/balancer_configuration.h" -#include "mongo/s/catalog/type_chunk.h" -#include "mongo/s/catalog_cache.h" -#include "mongo/s/chunk_manager.h" -#include "mongo/s/config_server_client.h" -#include "mongo/s/grid.h" -#include "mongo/s/shard_key_pattern.h" -#include "mongo/util/assert_util.h" -#include "mongo/util/log.h" - -namespace mongo { -namespace { - -/** - * Constructs the default options for the thread pool used to schedule splits. - */ -ThreadPool::Options makeDefaultThreadPoolOptions() { - ThreadPool::Options options; - options.poolName = "ChunkSplitter"; - options.minThreads = 0; - options.maxThreads = 20; - - // Ensure all threads have a client - options.onCreateThread = [](const std::string& threadName) { - Client::initThread(threadName.c_str()); - }; - return options; -} - -/** - * Attempts to split the chunk described by min/maxKey at the split points provided. - */ -Status splitChunkAtMultiplePoints(OperationContext* opCtx, - const ShardId& shardId, - const NamespaceString& nss, - const ShardKeyPattern& shardKeyPattern, - const ChunkVersion& collectionVersion, - const ChunkRange& chunkRange, - const std::vector<BSONObj>& splitPoints) { - invariant(!splitPoints.empty()); - - const size_t kMaxSplitPoints = 8192; - - if (splitPoints.size() > kMaxSplitPoints) { - return {ErrorCodes::BadValue, - str::stream() << "Cannot split chunk in more than " << kMaxSplitPoints - << " parts at a time."}; - } - - return splitChunk(opCtx, - nss, - shardKeyPattern.toBSON(), - chunkRange, - splitPoints, - shardId.toString(), - collectionVersion.epoch()) - .getStatus() - .withContext("split failed"); -} - -/** - * Attempts to move the chunk specified by minKey away from its current shard. - */ -void moveChunk(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& minKey) { - // We need to have the most up-to-date view of the chunk we are about to move. - const auto routingInfo = - uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); - - uassert(ErrorCodes::NamespaceNotSharded, - "Could not move chunk. Collection is no longer sharded", - routingInfo.cm()); - - const auto suggestedChunk = routingInfo.cm()->findIntersectingChunkWithSimpleCollation(minKey); - - ChunkType chunkToMove; - chunkToMove.setNS(nss); - chunkToMove.setShard(suggestedChunk.getShardId()); - chunkToMove.setMin(suggestedChunk.getMin()); - chunkToMove.setMax(suggestedChunk.getMax()); - chunkToMove.setVersion(suggestedChunk.getLastmod()); - - uassertStatusOK(configsvr_client::rebalanceChunk(opCtx, chunkToMove)); -} - -/** - * Returns the split point that will result in one of the chunks having exactly one document. Also - * returns an empty document if the split point cannot be determined. - * - * doSplitAtLower - determines which side of the split will have exactly one document. True means - * that the split point chosen will be closer to the lower bound. - * - * NOTE: this assumes that the shard key is not "special"- that is, the shardKeyPattern is simply an - * ordered list of ascending/descending field names. For example {a : 1, b : -1} is not special, but - * {a : "hashed"} is. - */ -BSONObj findExtremeKeyForShard(OperationContext* opCtx, - const NamespaceString& nss, - const ShardKeyPattern& shardKeyPattern, - bool doSplitAtLower) { - Query q; - - if (doSplitAtLower) { - q.sort(shardKeyPattern.toBSON()); - } else { - // need to invert shard key pattern to sort backwards - BSONObjBuilder r; - - BSONObjIterator i(shardKeyPattern.toBSON()); - while (i.more()) { - BSONElement e = i.next(); - uassert(40617, "can only handle numbers here - which i think is correct", e.isNumber()); - r.append(e.fieldName(), -1 * e.number()); - } - - q.sort(r.obj()); - } - - DBDirectClient client(opCtx); - - BSONObj end; - - if (doSplitAtLower) { - // Splitting close to the lower bound means that the split point will be the - // upper bound. Chunk range upper bounds are exclusive so skip a document to - // make the lower half of the split end up with a single document. - std::unique_ptr<DBClientCursor> cursor = client.query(nss.ns(), - q, - 1, /* nToReturn */ - 1 /* nToSkip */); - - uassert(40618, - str::stream() << "failed to initialize cursor during auto split due to " - << "connection problem with " - << client.getServerAddress(), - cursor.get() != nullptr); - - if (cursor->more()) { - end = cursor->next().getOwned(); - } - } else { - end = client.findOne(nss.ns(), q); - } - - if (end.isEmpty()) { - return BSONObj(); - } - - return shardKeyPattern.extractShardKeyFromDoc(end); -} - -/** - * Checks if autobalance is enabled on the current sharded collection. - */ -bool isAutoBalanceEnabled(OperationContext* opCtx, - const NamespaceString& nss, - BalancerConfiguration* balancerConfig) { - if (!balancerConfig->shouldBalanceForAutoSplit()) - return false; - - auto collStatus = Grid::get(opCtx)->catalogClient()->getCollection(opCtx, nss); - if (!collStatus.isOK()) { - log() << "Auto-split for " << nss << " failed to load collection metadata" - << causedBy(redact(collStatus.getStatus())); - return false; - } - - return collStatus.getValue().value.getAllowBalance(); -} - -const auto getChunkSplitter = ServiceContext::declareDecoration<ChunkSplitter>(); - -} // namespace - -ChunkSplitter::ChunkSplitter() : _threadPool(makeDefaultThreadPoolOptions()) { - _threadPool.startup(); -} - -ChunkSplitter::~ChunkSplitter() { - _threadPool.shutdown(); - _threadPool.join(); -} - -ChunkSplitter& ChunkSplitter::get(OperationContext* opCtx) { - return get(opCtx->getServiceContext()); -} - -ChunkSplitter& ChunkSplitter::get(ServiceContext* serviceContext) { - return getChunkSplitter(serviceContext); -} - -void ChunkSplitter::setReplicaSetMode(bool isPrimary) { - stdx::lock_guard<stdx::mutex> scopedLock(_mutex); - _isPrimary = isPrimary; -} - -void ChunkSplitter::onStepUp() { - stdx::lock_guard<stdx::mutex> lg(_mutex); - if (_isPrimary) { - return; - } - _isPrimary = true; - - // log() << "The ChunkSplitter has started and will accept autosplit tasks."; - // TODO: Re-enable this log line when auto split is actively running on shards. -} - -void ChunkSplitter::onStepDown() { - stdx::lock_guard<stdx::mutex> lg(_mutex); - if (!_isPrimary) { - return; - } - _isPrimary = false; - - // log() << "The ChunkSplitter has stopped and will no longer run new autosplit tasks. Any " - // << "autosplit tasks that have already started will be allowed to finish."; - // TODO: Re-enable this log when auto split is actively running on shards. -} - -void ChunkSplitter::trySplitting(const NamespaceString& nss, - const BSONObj& min, - const BSONObj& max, - long dataWritten) { - if (!_isPrimary) { - return; - } - uassertStatusOK(_threadPool.schedule([ this, nss, min, max, dataWritten ]() noexcept { - _runAutosplit(nss, min, max, dataWritten); - })); -} - -void ChunkSplitter::_runAutosplit(const NamespaceString& nss, - const BSONObj& min, - const BSONObj& max, - long dataWritten) { - if (!_isPrimary) { - return; - } - - try { - const auto opCtx = cc().makeOperationContext(); - const auto routingInfo = uassertStatusOK( - Grid::get(opCtx.get())->catalogCache()->getCollectionRoutingInfo(opCtx.get(), nss)); - - uassert(ErrorCodes::NamespaceNotSharded, - "Could not split chunk. Collection is no longer sharded", - routingInfo.cm()); - - const auto cm = routingInfo.cm(); - const auto chunk = cm->findIntersectingChunkWithSimpleCollation(min); - - // Stop if chunk's range differs from the range we were expecting to split. - if ((0 != chunk.getMin().woCompare(min)) || (0 != chunk.getMax().woCompare(max)) || - (chunk.getShardId() != ShardingState::get(opCtx.get())->shardId())) { - LOG(1) << "Cannot auto-split chunk with range '" - << redact(ChunkRange(min, max).toString()) << "' for nss '" << nss - << "' on shard '" << ShardingState::get(opCtx.get())->shardId() - << "' because since scheduling auto-split the chunk has been changed to '" - << redact(chunk.toString()) << "'"; - return; - } - - const ChunkRange chunkRange(chunk.getMin(), chunk.getMax()); - - const auto balancerConfig = Grid::get(opCtx.get())->getBalancerConfiguration(); - // Ensure we have the most up-to-date balancer configuration - uassertStatusOK(balancerConfig->refreshAndCheck(opCtx.get())); - - if (!balancerConfig->getShouldAutoSplit()) { - return; - } - - const uint64_t maxChunkSizeBytes = balancerConfig->getMaxChunkSizeBytes(); - - LOG(1) << "about to initiate autosplit: " << redact(chunk.toString()) - << " dataWritten since last check: " << dataWritten - << " maxChunkSizeBytes: " << maxChunkSizeBytes; - - const auto& shardKeyPattern = cm->getShardKeyPattern(); - auto splitPoints = autoSplitVector(opCtx.get(), - nss, - shardKeyPattern.toBSON(), - chunk.getMin(), - chunk.getMax(), - maxChunkSizeBytes); - - if (splitPoints.empty()) { - // No split points means there isn't enough data to split on; 1 split point means we - // have between half the chunk size to full chunk size so there is no need to split yet - return; - } - - // We assume that if the chunk being split is the first (or last) one on the collection, - // this chunk is likely to see more insertions. Instead of splitting mid-chunk, we use the - // very first (or last) key as a split point. - // - // This heuristic is skipped for "special" shard key patterns that are not likely to produce - // monotonically increasing or decreasing values (e.g. hashed shard keys). - - // Keeps track of the minKey of the top chunk after the split so we can migrate the chunk. - BSONObj topChunkMinKey; - - if (KeyPattern::isOrderedKeyPattern(cm->getShardKeyPattern().toBSON())) { - if (0 == - cm->getShardKeyPattern().getKeyPattern().globalMin().woCompare(chunk.getMin())) { - // MinKey is infinity (This is the first chunk on the collection) - BSONObj key = - findExtremeKeyForShard(opCtx.get(), nss, cm->getShardKeyPattern(), true); - if (!key.isEmpty()) { - splitPoints.front() = key.getOwned(); - topChunkMinKey = cm->getShardKeyPattern().getKeyPattern().globalMin(); - } - } else if (0 == - cm->getShardKeyPattern().getKeyPattern().globalMax().woCompare( - chunk.getMax())) { - // MaxKey is infinity (This is the last chunk on the collection) - BSONObj key = - findExtremeKeyForShard(opCtx.get(), nss, cm->getShardKeyPattern(), false); - if (!key.isEmpty()) { - splitPoints.back() = key.getOwned(); - topChunkMinKey = key.getOwned(); - } - } - } - - uassertStatusOK(splitChunkAtMultiplePoints(opCtx.get(), - chunk.getShardId(), - nss, - cm->getShardKeyPattern(), - cm->getVersion(), - chunkRange, - splitPoints)); - - const bool shouldBalance = isAutoBalanceEnabled(opCtx.get(), nss, balancerConfig); - - log() << "autosplitted " << nss << " chunk: " << redact(chunk.toString()) << " into " - << (splitPoints.size() + 1) << " parts (maxChunkSizeBytes " << maxChunkSizeBytes - << ")" - << (topChunkMinKey.isEmpty() ? "" : " (top chunk migration suggested" + - (std::string)(shouldBalance ? ")" : ", but no migrations allowed)")); - - // Balance the resulting chunks if the autobalance option is enabled and if we split at the - // first or last chunk on the collection as part of top chunk optimization. - - if (!shouldBalance || topChunkMinKey.isEmpty()) { - return; - } - - // Tries to move the top chunk out of the shard to prevent the hot spot from staying on a - // single shard. This is based on the assumption that succeeding inserts will fall on the - // top chunk. - moveChunk(opCtx.get(), nss, topChunkMinKey); - } catch (const DBException& ex) { - log() << "Unable to auto-split chunk " << redact(ChunkRange(min, max).toString()) - << " in nss " << nss << causedBy(redact(ex.toStatus())); - } catch (const std::exception& e) { - log() << "caught exception while splitting chunk: " << redact(e.what()); - } -} - -} // namespace mongo diff --git a/src/mongo/db/s/chunk_splitter.h b/src/mongo/db/s/chunk_splitter.h deleted file mode 100644 index a634ae46ce2..00000000000 --- a/src/mongo/db/s/chunk_splitter.h +++ /dev/null @@ -1,109 +0,0 @@ - -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include "mongo/util/concurrency/thread_pool.h" - -namespace mongo { - -class NamespaceString; -class OperationContext; -class ServiceContext; - -/** - * Handles asynchronous auto-splitting of chunks. - */ -class ChunkSplitter { - MONGO_DISALLOW_COPYING(ChunkSplitter); - -public: - ChunkSplitter(); - ~ChunkSplitter(); - - /** - * Obtains the service-wide chunk splitter instance. - */ - static ChunkSplitter& get(OperationContext* opCtx); - static ChunkSplitter& get(ServiceContext* serviceContext); - - /** - * Sets the mode of the ChunkSplitter to either primary or secondary. - * The ChunkSplitter is only active when primary. - */ - void setReplicaSetMode(bool isPrimary); - - /** - * Invoked when the shard server primary enters the 'PRIMARY' state to set up the ChunkSplitter - * to begin accepting split requests. - */ - void onStepUp(); - - /** - * Invoked when this node which is currently serving as a 'PRIMARY' steps down. - * - * This method might be called multiple times in succession, which is what happens as a result - * of incomplete transition to primary so it is resilient to that. - */ - void onStepDown(); - - /** - * Schedules an autosplit task. This function throws on scheduling failure. - */ - void trySplitting(const NamespaceString& nss, - const BSONObj& min, - const BSONObj& max, - long dataWritten); - -private: - /** - * Determines if the specified chunk should be split and then performs any necessary splits. - * - * It may also perform a 'top chunk' optimization where a resulting chunk that contains either - * MaxKey or MinKey as a range extreme will be moved off to another shard to relieve load on the - * original owner. This optimization presumes that the user is doing writes with increasing or - * decreasing shard key values. - */ - void _runAutosplit(const NamespaceString& nss, - const BSONObj& min, - const BSONObj& max, - long dataWritten); - - // Protects the state below. - stdx::mutex _mutex; - - // The ChunkSplitter is only active on a primary node. - bool _isPrimary{false}; - - // Thread pool for parallelizing splits. - ThreadPool _threadPool; -}; - -} // namespace mongo diff --git a/src/mongo/db/s/sharding_initialization_mongod.cpp b/src/mongo/db/s/sharding_initialization_mongod.cpp index d5f6b0c32dd..6e04d369f7d 100644 --- a/src/mongo/db/s/sharding_initialization_mongod.cpp +++ b/src/mongo/db/s/sharding_initialization_mongod.cpp @@ -49,7 +49,6 @@ #include "mongo/db/ops/update.h" #include "mongo/db/ops/update_lifecycle_impl.h" #include "mongo/db/repl/replication_coordinator.h" -#include "mongo/db/s/chunk_splitter.h" #include "mongo/db/s/read_only_catalog_cache_loader.h" #include "mongo/db/s/shard_server_catalog_cache_loader.h" #include "mongo/db/s/sharding_config_optime_gossip.h" @@ -129,7 +128,6 @@ void initializeShardingEnvironmentOnShardServer(OperationContext* opCtx, repl::MemberState::RS_PRIMARY); CatalogCacheLoader::get(opCtx).initializeReplicaSetRole(isStandaloneOrPrimary); - ChunkSplitter::get(opCtx).setReplicaSetMode(isStandaloneOrPrimary); Grid::get(opCtx)->setShardingInitialized(); |