diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2021-05-07 15:56:25 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-05-14 16:26:21 +0000 |
commit | 6c37e83e56bf2ceea19a4de59a5aba38e28de65a (patch) | |
tree | 3ef80002c03ded23cef5217ea0f63e6a1f0c91f9 | |
parent | fc57a64f959668b04ff7c7f2efdd68ce95a11736 (diff) | |
download | mongo-6c37e83e56bf2ceea19a4de59a5aba38e28de65a.tar.gz |
SERVER-56779 Allow multiple concurrent merges for the same collection across the cluster
-rw-r--r-- | src/mongo/db/s/active_migrations_registry.cpp | 91 | ||||
-rw-r--r-- | src/mongo/db/s/active_migrations_registry.h | 74 | ||||
-rw-r--r-- | src/mongo/db/s/merge_chunks_command.cpp | 19 |
3 files changed, 149 insertions, 35 deletions
diff --git a/src/mongo/db/s/active_migrations_registry.cpp b/src/mongo/db/s/active_migrations_registry.cpp index 78503d62090..6a05fb1f460 100644 --- a/src/mongo/db/s/active_migrations_registry.cpp +++ b/src/mongo/db/s/active_migrations_registry.cpp @@ -74,6 +74,11 @@ StatusWith<ScopedDonateChunk> ActiveMigrationsRegistry::registerDonateChunk( return _activeMoveChunkState->constructErrorStatus(); } + auto itSplitMerge = _activeSplitMergeChunkStates.find(args.getNss()); + if (itSplitMerge != _activeSplitMergeChunkStates.end()) { + return itSplitMerge->second.constructErrorStatus(); + } + _activeMoveChunkState.emplace(args); return {ScopedDonateChunk(this, true, _activeMoveChunkState->notification)}; @@ -95,6 +100,26 @@ StatusWith<ScopedReceiveChunk> ActiveMigrationsRegistry::registerReceiveChunk( return {ScopedReceiveChunk(this)}; } +StatusWith<ScopedSplitMergeChunk> ActiveMigrationsRegistry::registerSplitOrMergeChunk( + const NamespaceString& nss, const ChunkRange& chunkRange) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + if (_activeReceiveChunkState) { + return _activeReceiveChunkState->constructErrorStatus(); + } + + if (_activeMoveChunkState) { + return _activeMoveChunkState->constructErrorStatus(); + } + + auto emplaceResult = + _activeSplitMergeChunkStates.emplace(nss, ActiveSplitMergeChunkState(nss, chunkRange)); + if (!emplaceResult.second) { + return emplaceResult.first->second.constructErrorStatus(); + } + + return {ScopedSplitMergeChunk(this, nss)}; +} + boost::optional<NamespaceString> ActiveMigrationsRegistry::getActiveDonateChunkNss() { stdx::lock_guard<stdx::mutex> lk(_mutex); if (_activeMoveChunkState) { @@ -143,26 +168,42 @@ void ActiveMigrationsRegistry::_clearReceiveChunk() { _activeReceiveChunkState.reset(); } +void ActiveMigrationsRegistry::_clearSplitMergeChunk(const NamespaceString& nss) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + invariant(_activeSplitMergeChunkStates.erase(nss)); +} + Status ActiveMigrationsRegistry::ActiveMoveChunkState::constructErrorStatus() const { - return {ErrorCodes::ConflictingOperationInProgress, - str::stream() << "Unable to start new migration because this shard is currently " - "donating chunk " - << ChunkRange(args.getMinKey(), args.getMaxKey()).toString() - << " for namespace " - << args.getNss().ns() - << " to " - << args.getToShardId()}; + return { + ErrorCodes::ConflictingOperationInProgress, + str::stream() << "Unable to start new balancer operation because this shard is currently " + "donating chunk " + << ChunkRange(args.getMinKey(), args.getMaxKey()).toString() + << " for namespace " + << args.getNss().ns() + << " to " + << args.getToShardId()}; } Status ActiveMigrationsRegistry::ActiveReceiveChunkState::constructErrorStatus() const { + return { + ErrorCodes::ConflictingOperationInProgress, + str::stream() << "Unable to start new balancer operation because this shard is currently " + "receiving chunk " + << range.toString() + << " for namespace " + << nss.ns() + << " from " + << fromShardId}; +} + +Status ActiveMigrationsRegistry::ActiveSplitMergeChunkState::constructErrorStatus() const { return {ErrorCodes::ConflictingOperationInProgress, - str::stream() << "Unable to start new migration because this shard is currently " - "receiving chunk " + str::stream() << "Unable to start new balancer operation because this shard is " + "currently splitting or merging chunk " << range.toString() << " for namespace " - << nss.ns() - << " from " - << fromShardId}; + << nss.ns()}; } ScopedDonateChunk::ScopedDonateChunk(ActiveMigrationsRegistry* registry, @@ -226,4 +267,28 @@ ScopedReceiveChunk& ScopedReceiveChunk::operator=(ScopedReceiveChunk&& other) { return *this; } +ScopedSplitMergeChunk::ScopedSplitMergeChunk(ActiveMigrationsRegistry* registry, + const NamespaceString& nss) + : _registry(registry), _nss(nss) {} + +ScopedSplitMergeChunk::~ScopedSplitMergeChunk() { + if (_registry) { + _registry->_clearSplitMergeChunk(_nss); + } +} + +ScopedSplitMergeChunk::ScopedSplitMergeChunk(ScopedSplitMergeChunk&& other) { + *this = std::move(other); +} + +ScopedSplitMergeChunk& ScopedSplitMergeChunk::operator=(ScopedSplitMergeChunk&& other) { + if (&other != this) { + _registry = other._registry; + other._registry = nullptr; + _nss = std::move(other._nss); + } + + return *this; +} + } // namespace mongo diff --git a/src/mongo/db/s/active_migrations_registry.h b/src/mongo/db/s/active_migrations_registry.h index 29344432d2d..48c55b01883 100644 --- a/src/mongo/db/s/active_migrations_registry.h +++ b/src/mongo/db/s/active_migrations_registry.h @@ -35,8 +35,8 @@ #include "mongo/base/disallow_copying.h" #include "mongo/db/s/migration_session_id.h" #include "mongo/s/request_types/move_chunk_request.h" -#include "mongo/stdx/memory.h" #include "mongo/stdx/mutex.h" +#include "mongo/stdx/unordered_map.h" #include "mongo/util/concurrency/notification.h" namespace mongo { @@ -44,8 +44,7 @@ namespace mongo { class OperationContext; class ScopedDonateChunk; class ScopedReceiveChunk; -template <typename T> -class StatusWith; +class ScopedSplitMergeChunk; /** * Thread-safe object that keeps track of the active migrations running on a node and limits them @@ -62,9 +61,9 @@ public: static ActiveMigrationsRegistry& get(OperationContext* opCtx); /** - * If there are no migrations running on this shard, registers an active migration with the - * specified arguments. Returns a ScopedDonateChunk, which must be signaled by the - * caller before it goes out of scope. + * If there are no migrations or split/merges running on this shard, registers an active + * migration with the specified arguments. Returns a ScopedDonateChunk, which must be signaled + * by the caller before it goes out of scope. * * If there is an active migration already running on this shard and it has the exact same * arguments, returns a ScopedDonateChunk. The ScopedDonateChunk can be used to join the @@ -75,9 +74,10 @@ public: StatusWith<ScopedDonateChunk> registerDonateChunk(const MoveChunkRequest& args); /** - * If there are no migrations running on this shard, registers an active receive operation with - * the specified session id and returns a ScopedReceiveChunk. The ScopedReceiveChunk will - * unregister the migration when the ScopedReceiveChunk goes out of scope. + * If there are no migrations or split/merges running on this shard, registers an active receive + * operation with the specified session id and returns a ScopedReceiveChunk. The + * ScopedReceiveChunk will unregister the migration when the ScopedReceiveChunk goes out of + * scope. * * Otherwise returns a ConflictingOperationInProgress error. */ @@ -86,6 +86,14 @@ public: const ShardId& fromShardId); /** + * If there are no migrations running on this shard, registers an active split or merge + * operation for the specified namespace and returns a scoped object which will in turn disallow + * other migrations or splits/merges for the same namespace (but not for other namespaces). + */ + StatusWith<ScopedSplitMergeChunk> registerSplitOrMergeChunk(const NamespaceString& nss, + const ChunkRange& chunkRange); + + /** * If a migration has been previously registered through a call to registerDonateChunk, returns * that namespace. Otherwise returns boost::none. */ @@ -102,6 +110,7 @@ public: private: friend class ScopedDonateChunk; friend class ScopedReceiveChunk; + friend class ScopedSplitMergeChunk; // Describes the state of a currently active moveChunk operation struct ActiveMoveChunkState { @@ -140,6 +149,24 @@ private: ShardId fromShardId; }; + // Describes the state of a currently active split or merge operation + struct ActiveSplitMergeChunkState { + ActiveSplitMergeChunkState(NamespaceString inNss, ChunkRange inRange) + : nss(std::move(inNss)), range(std::move(inRange)) {} + + /** + * Constructs an error status to return in the case of conflicting operations. + */ + Status constructErrorStatus() const; + + // Namespace for which a chunk is being split or merged + NamespaceString nss; + + // If split, bounds of the chunk being split; if merge, the end bounds of the range being + // merged + ChunkRange range; + }; + /** * Unregisters a previously registered namespace with an ongoing migration. Must only be called * if a previous call to registerDonateChunk has succeeded. @@ -152,6 +179,12 @@ private: */ void _clearReceiveChunk(); + /** + * Unregisters a previously registered split/merge chunk operation. Must only be called if a + * previous call to registerSplitOrMergeChunk has succeeded. + */ + void _clearSplitMergeChunk(const NamespaceString& nss); + // Protects the state below stdx::mutex _mutex; @@ -160,6 +193,10 @@ private: // If there is an active chunk receive operation, this field contains the original session id boost::optional<ActiveReceiveChunkState> _activeReceiveChunkState; + + // If there is an active split or merge chunk operation for a particular namespace, this map + // will contain an entry + stdx::unordered_map<NamespaceString, ActiveSplitMergeChunkState> _activeSplitMergeChunkStates; }; /** @@ -235,4 +272,23 @@ private: ActiveMigrationsRegistry* _registry; }; +/** + * Object of this class is returned from the registerSplitOrMergeChunk call of the active migrations + * registry. + */ +class ScopedSplitMergeChunk { +public: + ScopedSplitMergeChunk(ActiveMigrationsRegistry* registry, const NamespaceString& nss); + ~ScopedSplitMergeChunk(); + + ScopedSplitMergeChunk(ScopedSplitMergeChunk&&); + ScopedSplitMergeChunk& operator=(ScopedSplitMergeChunk&&); + +private: + // Registry from which to unregister the split/merge. Not owned. + ActiveMigrationsRegistry* _registry; + + NamespaceString _nss; +}; + } // namespace mongo diff --git a/src/mongo/db/s/merge_chunks_command.cpp b/src/mongo/db/s/merge_chunks_command.cpp index 83137ad4950..a3e611ebcca 100644 --- a/src/mongo/db/s/merge_chunks_command.cpp +++ b/src/mongo/db/s/merge_chunks_command.cpp @@ -40,6 +40,7 @@ #include "mongo/db/field_parser.h" #include "mongo/db/logical_clock.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/s/active_migrations_registry.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/shard_filtering_metadata_refresh.h" #include "mongo/db/s/sharding_state.h" @@ -80,20 +81,12 @@ void mergeChunks(OperationContext* opCtx, const BSONObj& minKey, const BSONObj& maxKey, const OID& epoch) { - const std::string whyMessage = str::stream() << "merging chunks in " << nss.ns() << " from " - << redact(minKey) << " to " << redact(maxKey); - auto scopedDistLock = uassertStatusOKWithContext( - Grid::get(opCtx)->catalogClient()->getDistLockManager()->lock( - opCtx, nss.ns(), whyMessage, DistLockManager::kSingleLockAttemptTimeout), - str::stream() << "could not acquire collection lock for " << nss.ns() - << " to merge chunks in [" - << redact(minKey) - << ", " - << redact(maxKey) - << ")"); - auto const shardingState = ShardingState::get(opCtx); + auto scopedSplitOrMergeChunk( + uassertStatusOK(ActiveMigrationsRegistry::get(opCtx).registerSplitOrMergeChunk( + nss, ChunkRange(minKey, maxKey)))); + // We now have the collection distributed lock, refresh metadata to latest version and sanity // check forceShardFilteringMetadataRefresh(opCtx, nss, true /* forceRefreshFromThisThread */); @@ -155,7 +148,7 @@ void mergeChunks(OperationContext* opCtx, !chunksToMerge.empty()); // - // Validate the range starts and ends at chunks and has no holes, error if not valid + // Validate the range starts and ends at chunk boundaries and has no holes, error if not valid // BSONObj firstDocMin = chunksToMerge.front().getMin(); |