diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2021-05-31 12:58:41 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-06-01 13:22:26 +0000 |
commit | 5339712ac3d71198ca06fa6894c787010d2d5d96 (patch) | |
tree | be376e5ccf80ffe15e6814553f644c8b5e668d57 | |
parent | 8b565a174c9edf5451b6325085cebe0f1afe1e3a (diff) | |
download | mongo-5339712ac3d71198ca06fa6894c787010d2d5d96.tar.gz |
Revert "SERVER-56779 Allow multiple concurrent merges for the same collection across the cluster"
This reverts commit 6c37e83e56bf2ceea19a4de59a5aba38e28de65a.
-rw-r--r-- | src/mongo/db/s/active_migrations_registry.cpp | 91 | ||||
-rw-r--r-- | src/mongo/db/s/active_migrations_registry.h | 74 | ||||
-rw-r--r-- | src/mongo/db/s/merge_chunks_command.cpp | 19 |
3 files changed, 35 insertions, 149 deletions
diff --git a/src/mongo/db/s/active_migrations_registry.cpp b/src/mongo/db/s/active_migrations_registry.cpp index 6a05fb1f460..78503d62090 100644 --- a/src/mongo/db/s/active_migrations_registry.cpp +++ b/src/mongo/db/s/active_migrations_registry.cpp @@ -74,11 +74,6 @@ StatusWith<ScopedDonateChunk> ActiveMigrationsRegistry::registerDonateChunk( return _activeMoveChunkState->constructErrorStatus(); } - auto itSplitMerge = _activeSplitMergeChunkStates.find(args.getNss()); - if (itSplitMerge != _activeSplitMergeChunkStates.end()) { - return itSplitMerge->second.constructErrorStatus(); - } - _activeMoveChunkState.emplace(args); return {ScopedDonateChunk(this, true, _activeMoveChunkState->notification)}; @@ -100,26 +95,6 @@ StatusWith<ScopedReceiveChunk> ActiveMigrationsRegistry::registerReceiveChunk( return {ScopedReceiveChunk(this)}; } -StatusWith<ScopedSplitMergeChunk> ActiveMigrationsRegistry::registerSplitOrMergeChunk( - const NamespaceString& nss, const ChunkRange& chunkRange) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - if (_activeReceiveChunkState) { - return _activeReceiveChunkState->constructErrorStatus(); - } - - if (_activeMoveChunkState) { - return _activeMoveChunkState->constructErrorStatus(); - } - - auto emplaceResult = - _activeSplitMergeChunkStates.emplace(nss, ActiveSplitMergeChunkState(nss, chunkRange)); - if (!emplaceResult.second) { - return emplaceResult.first->second.constructErrorStatus(); - } - - return {ScopedSplitMergeChunk(this, nss)}; -} - boost::optional<NamespaceString> ActiveMigrationsRegistry::getActiveDonateChunkNss() { stdx::lock_guard<stdx::mutex> lk(_mutex); if (_activeMoveChunkState) { @@ -168,42 +143,26 @@ void ActiveMigrationsRegistry::_clearReceiveChunk() { _activeReceiveChunkState.reset(); } -void ActiveMigrationsRegistry::_clearSplitMergeChunk(const NamespaceString& nss) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - invariant(_activeSplitMergeChunkStates.erase(nss)); -} - Status ActiveMigrationsRegistry::ActiveMoveChunkState::constructErrorStatus() const { - return { - ErrorCodes::ConflictingOperationInProgress, - str::stream() << "Unable to start new balancer operation because this shard is currently " - "donating chunk " - << ChunkRange(args.getMinKey(), args.getMaxKey()).toString() - << " for namespace " - << args.getNss().ns() - << " to " - << args.getToShardId()}; + return {ErrorCodes::ConflictingOperationInProgress, + str::stream() << "Unable to start new migration because this shard is currently " + "donating chunk " + << ChunkRange(args.getMinKey(), args.getMaxKey()).toString() + << " for namespace " + << args.getNss().ns() + << " to " + << args.getToShardId()}; } Status ActiveMigrationsRegistry::ActiveReceiveChunkState::constructErrorStatus() const { - return { - ErrorCodes::ConflictingOperationInProgress, - str::stream() << "Unable to start new balancer operation because this shard is currently " - "receiving chunk " - << range.toString() - << " for namespace " - << nss.ns() - << " from " - << fromShardId}; -} - -Status ActiveMigrationsRegistry::ActiveSplitMergeChunkState::constructErrorStatus() const { return {ErrorCodes::ConflictingOperationInProgress, - str::stream() << "Unable to start new balancer operation because this shard is " - "currently splitting or merging chunk " + str::stream() << "Unable to start new migration because this shard is currently " + "receiving chunk " << range.toString() << " for namespace " - << nss.ns()}; + << nss.ns() + << " from " + << fromShardId}; } ScopedDonateChunk::ScopedDonateChunk(ActiveMigrationsRegistry* registry, @@ -267,28 +226,4 @@ ScopedReceiveChunk& ScopedReceiveChunk::operator=(ScopedReceiveChunk&& other) { return *this; } -ScopedSplitMergeChunk::ScopedSplitMergeChunk(ActiveMigrationsRegistry* registry, - const NamespaceString& nss) - : _registry(registry), _nss(nss) {} - -ScopedSplitMergeChunk::~ScopedSplitMergeChunk() { - if (_registry) { - _registry->_clearSplitMergeChunk(_nss); - } -} - -ScopedSplitMergeChunk::ScopedSplitMergeChunk(ScopedSplitMergeChunk&& other) { - *this = std::move(other); -} - -ScopedSplitMergeChunk& ScopedSplitMergeChunk::operator=(ScopedSplitMergeChunk&& other) { - if (&other != this) { - _registry = other._registry; - other._registry = nullptr; - _nss = std::move(other._nss); - } - - return *this; -} - } // namespace mongo diff --git a/src/mongo/db/s/active_migrations_registry.h b/src/mongo/db/s/active_migrations_registry.h index 48c55b01883..29344432d2d 100644 --- a/src/mongo/db/s/active_migrations_registry.h +++ b/src/mongo/db/s/active_migrations_registry.h @@ -35,8 +35,8 @@ #include "mongo/base/disallow_copying.h" #include "mongo/db/s/migration_session_id.h" #include "mongo/s/request_types/move_chunk_request.h" +#include "mongo/stdx/memory.h" #include "mongo/stdx/mutex.h" -#include "mongo/stdx/unordered_map.h" #include "mongo/util/concurrency/notification.h" namespace mongo { @@ -44,7 +44,8 @@ namespace mongo { class OperationContext; class ScopedDonateChunk; class ScopedReceiveChunk; -class ScopedSplitMergeChunk; +template <typename T> +class StatusWith; /** * Thread-safe object that keeps track of the active migrations running on a node and limits them @@ -61,9 +62,9 @@ public: static ActiveMigrationsRegistry& get(OperationContext* opCtx); /** - * If there are no migrations or split/merges running on this shard, registers an active - * migration with the specified arguments. Returns a ScopedDonateChunk, which must be signaled - * by the caller before it goes out of scope. + * If there are no migrations running on this shard, registers an active migration with the + * specified arguments. Returns a ScopedDonateChunk, which must be signaled by the + * caller before it goes out of scope. * * If there is an active migration already running on this shard and it has the exact same * arguments, returns a ScopedDonateChunk. The ScopedDonateChunk can be used to join the @@ -74,10 +75,9 @@ public: StatusWith<ScopedDonateChunk> registerDonateChunk(const MoveChunkRequest& args); /** - * If there are no migrations or split/merges running on this shard, registers an active receive - * operation with the specified session id and returns a ScopedReceiveChunk. The - * ScopedReceiveChunk will unregister the migration when the ScopedReceiveChunk goes out of - * scope. + * If there are no migrations running on this shard, registers an active receive operation with + * the specified session id and returns a ScopedReceiveChunk. The ScopedReceiveChunk will + * unregister the migration when the ScopedReceiveChunk goes out of scope. * * Otherwise returns a ConflictingOperationInProgress error. */ @@ -86,14 +86,6 @@ public: const ShardId& fromShardId); /** - * If there are no migrations running on this shard, registers an active split or merge - * operation for the specified namespace and returns a scoped object which will in turn disallow - * other migrations or splits/merges for the same namespace (but not for other namespaces). - */ - StatusWith<ScopedSplitMergeChunk> registerSplitOrMergeChunk(const NamespaceString& nss, - const ChunkRange& chunkRange); - - /** * If a migration has been previously registered through a call to registerDonateChunk, returns * that namespace. Otherwise returns boost::none. */ @@ -110,7 +102,6 @@ public: private: friend class ScopedDonateChunk; friend class ScopedReceiveChunk; - friend class ScopedSplitMergeChunk; // Describes the state of a currently active moveChunk operation struct ActiveMoveChunkState { @@ -149,24 +140,6 @@ private: ShardId fromShardId; }; - // Describes the state of a currently active split or merge operation - struct ActiveSplitMergeChunkState { - ActiveSplitMergeChunkState(NamespaceString inNss, ChunkRange inRange) - : nss(std::move(inNss)), range(std::move(inRange)) {} - - /** - * Constructs an error status to return in the case of conflicting operations. - */ - Status constructErrorStatus() const; - - // Namespace for which a chunk is being split or merged - NamespaceString nss; - - // If split, bounds of the chunk being split; if merge, the end bounds of the range being - // merged - ChunkRange range; - }; - /** * Unregisters a previously registered namespace with an ongoing migration. Must only be called * if a previous call to registerDonateChunk has succeeded. @@ -179,12 +152,6 @@ private: */ void _clearReceiveChunk(); - /** - * Unregisters a previously registered split/merge chunk operation. Must only be called if a - * previous call to registerSplitOrMergeChunk has succeeded. - */ - void _clearSplitMergeChunk(const NamespaceString& nss); - // Protects the state below stdx::mutex _mutex; @@ -193,10 +160,6 @@ private: // If there is an active chunk receive operation, this field contains the original session id boost::optional<ActiveReceiveChunkState> _activeReceiveChunkState; - - // If there is an active split or merge chunk operation for a particular namespace, this map - // will contain an entry - stdx::unordered_map<NamespaceString, ActiveSplitMergeChunkState> _activeSplitMergeChunkStates; }; /** @@ -272,23 +235,4 @@ private: ActiveMigrationsRegistry* _registry; }; -/** - * Object of this class is returned from the registerSplitOrMergeChunk call of the active migrations - * registry. - */ -class ScopedSplitMergeChunk { -public: - ScopedSplitMergeChunk(ActiveMigrationsRegistry* registry, const NamespaceString& nss); - ~ScopedSplitMergeChunk(); - - ScopedSplitMergeChunk(ScopedSplitMergeChunk&&); - ScopedSplitMergeChunk& operator=(ScopedSplitMergeChunk&&); - -private: - // Registry from which to unregister the split/merge. Not owned. - ActiveMigrationsRegistry* _registry; - - NamespaceString _nss; -}; - } // namespace mongo diff --git a/src/mongo/db/s/merge_chunks_command.cpp b/src/mongo/db/s/merge_chunks_command.cpp index a3e611ebcca..83137ad4950 100644 --- a/src/mongo/db/s/merge_chunks_command.cpp +++ b/src/mongo/db/s/merge_chunks_command.cpp @@ -40,7 +40,6 @@ #include "mongo/db/field_parser.h" #include "mongo/db/logical_clock.h" #include "mongo/db/namespace_string.h" -#include "mongo/db/s/active_migrations_registry.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/shard_filtering_metadata_refresh.h" #include "mongo/db/s/sharding_state.h" @@ -81,11 +80,19 @@ void mergeChunks(OperationContext* opCtx, const BSONObj& minKey, const BSONObj& maxKey, const OID& epoch) { - auto const shardingState = ShardingState::get(opCtx); + const std::string whyMessage = str::stream() << "merging chunks in " << nss.ns() << " from " + << redact(minKey) << " to " << redact(maxKey); + auto scopedDistLock = uassertStatusOKWithContext( + Grid::get(opCtx)->catalogClient()->getDistLockManager()->lock( + opCtx, nss.ns(), whyMessage, DistLockManager::kSingleLockAttemptTimeout), + str::stream() << "could not acquire collection lock for " << nss.ns() + << " to merge chunks in [" + << redact(minKey) + << ", " + << redact(maxKey) + << ")"); - auto scopedSplitOrMergeChunk( - uassertStatusOK(ActiveMigrationsRegistry::get(opCtx).registerSplitOrMergeChunk( - nss, ChunkRange(minKey, maxKey)))); + auto const shardingState = ShardingState::get(opCtx); // We now have the collection distributed lock, refresh metadata to latest version and sanity // check @@ -148,7 +155,7 @@ void mergeChunks(OperationContext* opCtx, !chunksToMerge.empty()); // - // Validate the range starts and ends at chunk boundaries and has no holes, error if not valid + // Validate the range starts and ends at chunks and has no holes, error if not valid // BSONObj firstDocMin = chunksToMerge.front().getMin(); |