diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2021-06-03 06:34:00 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-06-24 10:18:25 +0000 |
commit | 0e0c905d987713004f273ffb8504e98a988b9fbd (patch) | |
tree | 6e806fab3b48a0fd696dc9dc6a3653c86936d230 | |
parent | b9b407313d1afd49e820e2cbb6b75336641dbad9 (diff) | |
download | mongo-0e0c905d987713004f273ffb8504e98a988b9fbd.tar.gz |
SERVER-56779 Allow multiple concurrent merges for the same collection across the cluster
(cherry picked from commit e14e6ac61867ebe158fa455f012574ed7e570801)
-rw-r--r-- | src/mongo/db/s/active_migrations_registry.cpp | 56 | ||||
-rw-r--r-- | src/mongo/db/s/active_migrations_registry.h | 78 | ||||
-rw-r--r-- | src/mongo/db/s/merge_chunks_command.cpp | 17 |
3 files changed, 128 insertions, 23 deletions
diff --git a/src/mongo/db/s/active_migrations_registry.cpp b/src/mongo/db/s/active_migrations_registry.cpp index 78503d62090..058ffb4441e 100644 --- a/src/mongo/db/s/active_migrations_registry.cpp +++ b/src/mongo/db/s/active_migrations_registry.cpp @@ -95,6 +95,22 @@ StatusWith<ScopedReceiveChunk> ActiveMigrationsRegistry::registerReceiveChunk( return {ScopedReceiveChunk(this)}; } +StatusWith<ScopedSplitMergeChunk> ActiveMigrationsRegistry::registerSplitOrMergeChunk( + OperationContext* opCtx, const NamespaceString& nss, const ChunkRange& chunkRange) { + stdx::unique_lock<stdx::mutex> ul(_mutex); + + opCtx->waitForConditionOrInterrupt(_chunkOperationsStateChangedCV, ul, [&] { + return !(_activeMoveChunkState && _activeMoveChunkState->args.getNss() == nss) && + !_activeSplitMergeChunkStates.count(nss); + }); + + auto insertResult = + _activeSplitMergeChunkStates.emplace(nss, ActiveSplitMergeChunkState(nss, chunkRange)); + invariant(insertResult.second); + + return {ScopedSplitMergeChunk(this, nss)}; +} + boost::optional<NamespaceString> ActiveMigrationsRegistry::getActiveDonateChunkNss() { stdx::lock_guard<stdx::mutex> lk(_mutex); if (_activeMoveChunkState) { @@ -135,18 +151,26 @@ void ActiveMigrationsRegistry::_clearDonateChunk() { stdx::lock_guard<stdx::mutex> lk(_mutex); invariant(_activeMoveChunkState); _activeMoveChunkState.reset(); + _chunkOperationsStateChangedCV.notify_all(); } void ActiveMigrationsRegistry::_clearReceiveChunk() { stdx::lock_guard<stdx::mutex> lk(_mutex); invariant(_activeReceiveChunkState); _activeReceiveChunkState.reset(); + _chunkOperationsStateChangedCV.notify_all(); +} + +void ActiveMigrationsRegistry::_clearSplitMergeChunk(const NamespaceString& nss) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + invariant(_activeSplitMergeChunkStates.erase(nss)); + _chunkOperationsStateChangedCV.notify_all(); } Status ActiveMigrationsRegistry::ActiveMoveChunkState::constructErrorStatus() const { return {ErrorCodes::ConflictingOperationInProgress, - str::stream() << "Unable to start new migration because this shard is currently " - "donating chunk " + str::stream() << "Unable to start new balancer operation because this shard is " + "currently donating chunk " << ChunkRange(args.getMinKey(), args.getMaxKey()).toString() << " for namespace " << args.getNss().ns() @@ -156,8 +180,8 @@ Status ActiveMigrationsRegistry::ActiveMoveChunkState::constructErrorStatus() co Status ActiveMigrationsRegistry::ActiveReceiveChunkState::constructErrorStatus() const { return {ErrorCodes::ConflictingOperationInProgress, - str::stream() << "Unable to start new migration because this shard is currently " - "receiving chunk " + str::stream() << "Unable to start new balancer operation because this shard is " + "currently receiving chunk " << range.toString() << " for namespace " << nss.ns() @@ -226,4 +250,28 @@ ScopedReceiveChunk& ScopedReceiveChunk::operator=(ScopedReceiveChunk&& other) { return *this; } +ScopedSplitMergeChunk::ScopedSplitMergeChunk(ActiveMigrationsRegistry* registry, + const NamespaceString& nss) + : _registry(registry), _nss(nss) {} + +ScopedSplitMergeChunk::~ScopedSplitMergeChunk() { + if (_registry) { + _registry->_clearSplitMergeChunk(_nss); + } +} + +ScopedSplitMergeChunk::ScopedSplitMergeChunk(ScopedSplitMergeChunk&& other) { + *this = std::move(other); +} + +ScopedSplitMergeChunk& ScopedSplitMergeChunk::operator=(ScopedSplitMergeChunk&& other) { + if (&other != this) { + _registry = other._registry; + other._registry = nullptr; + _nss = std::move(other._nss); + } + + return *this; +} + } // namespace mongo diff --git a/src/mongo/db/s/active_migrations_registry.h b/src/mongo/db/s/active_migrations_registry.h index 29344432d2d..d6280daf4fa 100644 --- a/src/mongo/db/s/active_migrations_registry.h +++ b/src/mongo/db/s/active_migrations_registry.h @@ -44,12 +44,19 @@ namespace mongo { class OperationContext; class ScopedDonateChunk; class ScopedReceiveChunk; -template <typename T> -class StatusWith; +class ScopedSplitMergeChunk; /** - * Thread-safe object that keeps track of the active migrations running on a node and limits them - * to only one per shard. There is only one instance of this object per shard. + * This class is used to synchronise all the active routing info operations for chunks owned by this + * shard. There is only one instance of it per ServiceContext. + * + * It implements a non-fair lock manager, which provides the following guarantees: + * + * - Move || Move (same chunk): The second move will join the first + * - Move || Move (different chunks or collections): The second move will result in a + * ConflictingOperationInProgress error + * - Move || Split/Merge (same collection): The second operation will block behind the first + * - Move/Split/Merge || Split/Merge (for different collections): Can proceed concurrently */ class ActiveMigrationsRegistry { MONGO_DISALLOW_COPYING(ActiveMigrationsRegistry); @@ -75,9 +82,10 @@ public: StatusWith<ScopedDonateChunk> registerDonateChunk(const MoveChunkRequest& args); /** - * If there are no migrations running on this shard, registers an active receive operation with - * the specified session id and returns a ScopedReceiveChunk. The ScopedReceiveChunk will - * unregister the migration when the ScopedReceiveChunk goes out of scope. + * If there are no migrations or split/merges running on this shard, registers an active receive + * operation with the specified session id and returns a ScopedReceiveChunk. The + * ScopedReceiveChunk will unregister the migration when the ScopedReceiveChunk goes out of + * scope. * * Otherwise returns a ConflictingOperationInProgress error. */ @@ -86,6 +94,15 @@ public: const ShardId& fromShardId); /** + * If there are no migrations running on this shard, registers an active split or merge + * operation for the specified namespace and returns a scoped object which will in turn disallow + * other migrations or splits/merges for the same namespace (but not for other namespaces). + */ + StatusWith<ScopedSplitMergeChunk> registerSplitOrMergeChunk(OperationContext* opCtx, + const NamespaceString& nss, + const ChunkRange& chunkRange); + + /** * If a migration has been previously registered through a call to registerDonateChunk, returns * that namespace. Otherwise returns boost::none. */ @@ -102,6 +119,7 @@ public: private: friend class ScopedDonateChunk; friend class ScopedReceiveChunk; + friend class ScopedSplitMergeChunk; // Describes the state of a currently active moveChunk operation struct ActiveMoveChunkState { @@ -140,6 +158,19 @@ private: ShardId fromShardId; }; + // Describes the state of a currently active split or merge operation + struct ActiveSplitMergeChunkState { + ActiveSplitMergeChunkState(NamespaceString inNss, ChunkRange inRange) + : nss(std::move(inNss)), range(std::move(inRange)) {} + + // Namespace for which a chunk is being split or merged + NamespaceString nss; + + // If split, bounds of the chunk being split; if merge, the end bounds of the range being + // merged + ChunkRange range; + }; + /** * Unregisters a previously registered namespace with an ongoing migration. Must only be called * if a previous call to registerDonateChunk has succeeded. @@ -152,14 +183,28 @@ private: */ void _clearReceiveChunk(); + /** + * Unregisters a previously registered split/merge chunk operation. Must only be called if a + * previous call to registerSplitOrMergeChunk has succeeded. + */ + void _clearSplitMergeChunk(const NamespaceString& nss); + // Protects the state below stdx::mutex _mutex; + // Condition variable which will be signaled whenever any of the states below become false, + // boost::none or a specific namespace removed from the map. + stdx::condition_variable _chunkOperationsStateChangedCV; + // If there is an active moveChunk operation, this field contains the original request boost::optional<ActiveMoveChunkState> _activeMoveChunkState; // If there is an active chunk receive operation, this field contains the original session id boost::optional<ActiveReceiveChunkState> _activeReceiveChunkState; + + // If there is an active split or merge chunk operation for a particular namespace, this map + // will contain an entry + stdx::unordered_map<NamespaceString, ActiveSplitMergeChunkState> _activeSplitMergeChunkStates; }; /** @@ -235,4 +280,23 @@ private: ActiveMigrationsRegistry* _registry; }; +/** + * Object of this class is returned from the registerSplitOrMergeChunk call of the active migrations + * registry. + */ +class ScopedSplitMergeChunk { +public: + ScopedSplitMergeChunk(ActiveMigrationsRegistry* registry, const NamespaceString& nss); + ~ScopedSplitMergeChunk(); + + ScopedSplitMergeChunk(ScopedSplitMergeChunk&&); + ScopedSplitMergeChunk& operator=(ScopedSplitMergeChunk&&); + +private: + // Registry from which to unregister the split/merge. Not owned. + ActiveMigrationsRegistry* _registry; + + NamespaceString _nss; +}; + } // namespace mongo diff --git a/src/mongo/db/s/merge_chunks_command.cpp b/src/mongo/db/s/merge_chunks_command.cpp index 5760eade5d8..a8be891bc1c 100644 --- a/src/mongo/db/s/merge_chunks_command.cpp +++ b/src/mongo/db/s/merge_chunks_command.cpp @@ -40,6 +40,7 @@ #include "mongo/db/field_parser.h" #include "mongo/db/logical_clock.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/s/active_migrations_registry.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/shard_filtering_metadata_refresh.h" @@ -234,17 +235,10 @@ void mergeChunks(OperationContext* opCtx, const BSONObj& minKey, const BSONObj& maxKey, const OID& epoch) { - const std::string whyMessage = str::stream() << "merging chunks in " << nss.ns() << " from " - << redact(minKey) << " to " << redact(maxKey); - auto scopedDistLock = uassertStatusOKWithContext( - Grid::get(opCtx)->catalogClient()->getDistLockManager()->lock( - opCtx, nss.ns(), whyMessage, DistLockManager::kSingleLockAttemptTimeout), - str::stream() << "could not acquire collection lock for " << nss.ns() - << " to merge chunks in [" - << redact(minKey) - << ", " - << redact(maxKey) - << ")"); + const ChunkRange chunkRange(minKey, maxKey); + + auto scopedSplitOrMergeChunk(uassertStatusOK( + ActiveMigrationsRegistry::get(opCtx).registerSplitOrMergeChunk(opCtx, nss, chunkRange))); // We now have the collection distributed lock, refresh metadata to latest version and sanity // check @@ -268,7 +262,6 @@ void mergeChunks(OperationContext* opCtx, metadata->isSharded()); const auto shardVersion = metadata->getShardVersion(); - const ChunkRange chunkRange(minKey, maxKey); uassert(ErrorCodes::StaleEpoch, str::stream() << "could not merge chunks, collection " << nss.ns() << " has changed since merge was sent (sent epoch: " |