diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2021-06-03 06:34:00 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-06-14 13:05:32 +0000 |
commit | 3a42f34b634eb9e5b0ccde551f6d8439ed1700c2 (patch) | |
tree | 35eea4d7c25a4ddb3867196895cde2aebdbb3f45 | |
parent | 6cd2d27dbcd1522da59b61b5aea0378f72371c7d (diff) | |
download | mongo-3a42f34b634eb9e5b0ccde551f6d8439ed1700c2.tar.gz |
SERVER-56779 Allow multiple concurrent merges for the same collection across the cluster
(cherry picked from commit d0c6ab09ee3c2726d92b1044577f2c5ebd22b52c)
-rw-r--r-- | src/mongo/db/s/active_migrations_registry.cpp | 99 | ||||
-rw-r--r-- | src/mongo/db/s/active_migrations_registry.h | 88 | ||||
-rw-r--r-- | src/mongo/db/s/merge_chunks_command.cpp | 12 |
3 files changed, 154 insertions, 45 deletions
diff --git a/src/mongo/db/s/active_migrations_registry.cpp b/src/mongo/db/s/active_migrations_registry.cpp index d58bcfe9805..70cab089b64 100644 --- a/src/mongo/db/s/active_migrations_registry.cpp +++ b/src/mongo/db/s/active_migrations_registry.cpp @@ -64,36 +64,36 @@ ActiveMigrationsRegistry& ActiveMigrationsRegistry::get(OperationContext* opCtx) void ActiveMigrationsRegistry::lock(OperationContext* opCtx, StringData reason) { stdx::unique_lock<Latch> lock(_mutex); - // This wait is to hold back additional lock requests while there is already one in - // progress. - opCtx->waitForConditionOrInterrupt(_lockCond, lock, [this] { return !_migrationsBlocked; }); + // This wait is to hold back additional lock requests while there is already one in progress + opCtx->waitForConditionOrInterrupt( + _chunkOperationsStateChangedCV, lock, [this] { return !_migrationsBlocked; }); // Setting flag before condvar returns to block new migrations from starting. (Favoring writers) - LOGV2(4675601, "Going to start blocking migrations", "reason"_attr = reason); + LOGV2(467560, "Going to start blocking migrations", "reason"_attr = reason); _migrationsBlocked = true; - // Wait for any ongoing migrations to complete. - opCtx->waitForConditionOrInterrupt( - _lockCond, lock, [this] { return !(_activeMoveChunkState || _activeReceiveChunkState); }); + // Wait for any ongoing chunk modifications to complete + opCtx->waitForConditionOrInterrupt(_chunkOperationsStateChangedCV, lock, [this] { + return !(_activeMoveChunkState || _activeReceiveChunkState); + }); } void ActiveMigrationsRegistry::unlock(StringData reason) { stdx::lock_guard<Latch> lock(_mutex); - LOGV2(4675602, "Going to stop blocking migrations", "reason"_attr = reason); + LOGV2(467561, "Going to stop blocking migrations", "reason"_attr = reason); _migrationsBlocked = false; - _lockCond.notify_all(); + _chunkOperationsStateChangedCV.notify_all(); } StatusWith<ScopedDonateChunk> ActiveMigrationsRegistry::registerDonateChunk( OperationContext* opCtx, const MoveChunkRequest& args) { - stdx::unique_lock<Latch> lk(_mutex); + stdx::unique_lock<Latch> ul(_mutex); - if (_migrationsBlocked) { - LOGV2(4675603, "Register donate chunk waiting for migrations to be unblocked"); - opCtx->waitForConditionOrInterrupt(_lockCond, lk, [this] { return !_migrationsBlocked; }); - } + opCtx->waitForConditionOrInterrupt(_chunkOperationsStateChangedCV, ul, [&] { + return !_migrationsBlocked && !_activeSplitMergeChunkStates.count(args.getNss()); + }); if (_activeReceiveChunkState) { return _activeReceiveChunkState->constructErrorStatus(); @@ -117,12 +117,10 @@ StatusWith<ScopedReceiveChunk> ActiveMigrationsRegistry::registerReceiveChunk( const NamespaceString& nss, const ChunkRange& chunkRange, const ShardId& fromShardId) { - stdx::unique_lock<Latch> lk(_mutex); + stdx::unique_lock<Latch> ul(_mutex); - if (_migrationsBlocked) { - LOGV2(4675604, "Register receive chunk waiting for migrations to be unblocked"); - opCtx->waitForConditionOrInterrupt(_lockCond, lk, [this] { return !_migrationsBlocked; }); - } + opCtx->waitForConditionOrInterrupt( + _chunkOperationsStateChangedCV, ul, [this] { return !_migrationsBlocked; }); if (_activeReceiveChunkState) { return _activeReceiveChunkState->constructErrorStatus(); @@ -137,6 +135,22 @@ StatusWith<ScopedReceiveChunk> ActiveMigrationsRegistry::registerReceiveChunk( return {ScopedReceiveChunk(this)}; } +StatusWith<ScopedSplitMergeChunk> ActiveMigrationsRegistry::registerSplitOrMergeChunk( + OperationContext* opCtx, const NamespaceString& nss, const ChunkRange& chunkRange) { + stdx::unique_lock<Latch> ul(_mutex); + + opCtx->waitForConditionOrInterrupt(_chunkOperationsStateChangedCV, ul, [&] { + return !(_activeMoveChunkState && _activeMoveChunkState->args.getNss() == nss) && + !_activeSplitMergeChunkStates.count(nss); + }); + + auto [it, inserted] = + _activeSplitMergeChunkStates.emplace(nss, ActiveSplitMergeChunkState(nss, chunkRange)); + invariant(inserted); + + return {ScopedSplitMergeChunk(this, nss)}; +} + boost::optional<NamespaceString> ActiveMigrationsRegistry::getActiveDonateChunkNss() { stdx::lock_guard<Latch> lk(_mutex); if (_activeMoveChunkState) { @@ -178,20 +192,31 @@ void ActiveMigrationsRegistry::_clearDonateChunk() { stdx::lock_guard<Latch> lk(_mutex); invariant(_activeMoveChunkState); _activeMoveChunkState.reset(); - _lockCond.notify_all(); + _chunkOperationsStateChangedCV.notify_all(); } void ActiveMigrationsRegistry::_clearReceiveChunk() { stdx::lock_guard<Latch> lk(_mutex); invariant(_activeReceiveChunkState); + LOGV2(5004703, + "clearReceiveChunk ", + "currentKeys"_attr = ChunkRange(_activeReceiveChunkState->range.getMin(), + _activeReceiveChunkState->range.getMax()) + .toString()); _activeReceiveChunkState.reset(); - _lockCond.notify_all(); + _chunkOperationsStateChangedCV.notify_all(); +} + +void ActiveMigrationsRegistry::_clearSplitMergeChunk(const NamespaceString& nss) { + stdx::lock_guard<Latch> lk(_mutex); + invariant(_activeSplitMergeChunkStates.erase(nss)); + _chunkOperationsStateChangedCV.notify_all(); } Status ActiveMigrationsRegistry::ActiveMoveChunkState::constructErrorStatus() const { return {ErrorCodes::ConflictingOperationInProgress, - str::stream() << "Unable to start new migration because this shard is currently " - "donating chunk " + str::stream() << "Unable to start new balancer operation because this shard is " + "currently donating chunk " << ChunkRange(args.getMinKey(), args.getMaxKey()).toString() << " for namespace " << args.getNss().ns() << " to " << args.getToShardId()}; @@ -199,8 +224,8 @@ Status ActiveMigrationsRegistry::ActiveMoveChunkState::constructErrorStatus() co Status ActiveMigrationsRegistry::ActiveReceiveChunkState::constructErrorStatus() const { return {ErrorCodes::ConflictingOperationInProgress, - str::stream() << "Unable to start new migration because this shard is currently " - "receiving chunk " + str::stream() << "Unable to start new balancer operation because this shard is " + "currently receiving chunk " << range.toString() << " for namespace " << nss.ns() << " from " << fromShardId}; } @@ -266,4 +291,28 @@ ScopedReceiveChunk& ScopedReceiveChunk::operator=(ScopedReceiveChunk&& other) { return *this; } +ScopedSplitMergeChunk::ScopedSplitMergeChunk(ActiveMigrationsRegistry* registry, + const NamespaceString& nss) + : _registry(registry), _nss(nss) {} + +ScopedSplitMergeChunk::~ScopedSplitMergeChunk() { + if (_registry) { + _registry->_clearSplitMergeChunk(_nss); + } +} + +ScopedSplitMergeChunk::ScopedSplitMergeChunk(ScopedSplitMergeChunk&& other) { + *this = std::move(other); +} + +ScopedSplitMergeChunk& ScopedSplitMergeChunk::operator=(ScopedSplitMergeChunk&& other) { + if (&other != this) { + _registry = other._registry; + other._registry = nullptr; + _nss = std::move(other._nss); + } + + return *this; +} + } // namespace mongo diff --git a/src/mongo/db/s/active_migrations_registry.h b/src/mongo/db/s/active_migrations_registry.h index 4a8bd2d48aa..f6a99185d4a 100644 --- a/src/mongo/db/s/active_migrations_registry.h +++ b/src/mongo/db/s/active_migrations_registry.h @@ -30,7 +30,6 @@ #pragma once #include <boost/optional.hpp> -#include <memory> #include "mongo/db/s/migration_session_id.h" #include "mongo/platform/mutex.h" @@ -42,12 +41,19 @@ namespace mongo { class OperationContext; class ScopedDonateChunk; class ScopedReceiveChunk; -template <typename T> -class StatusWith; +class ScopedSplitMergeChunk; /** - * Thread-safe object that keeps track of the active migrations running on a node and limits them - * to only one per shard. There is only one instance of this object per shard. + * This class is used to synchronise all the active routing info operations for chunks owned by this + * shard. There is only one instance of it per ServiceContext. + * + * It implements a non-fair lock manager, which provides the following guarantees: + * + * - Move || Move (same chunk): The second move will join the first + * - Move || Move (different chunks or collections): The second move will result in a + * ConflictingOperationInProgress error + * - Move || Split/Merge (same collection): The second operation will block behind the first + * - Move/Split/Merge || Split/Merge (for different collections): Can proceed concurrently */ class ActiveMigrationsRegistry { ActiveMigrationsRegistry(const ActiveMigrationsRegistry&) = delete; @@ -70,9 +76,9 @@ public: void unlock(StringData reason); /** - * If there are no migrations running on this shard, registers an active migration with the - * specified arguments. Returns a ScopedDonateChunk, which must be signaled by the - * caller before it goes out of scope. + * If there are no migrations or split/merges running on this shard, registers an active + * migration with the specified arguments. Returns a ScopedDonateChunk, which must be signaled + * by the caller before it goes out of scope. * * If there is an active migration already running on this shard and it has the exact same * arguments, returns a ScopedDonateChunk. The ScopedDonateChunk can be used to join the @@ -84,9 +90,10 @@ public: const MoveChunkRequest& args); /** - * If there are no migrations running on this shard, registers an active receive operation with - * the specified session id and returns a ScopedReceiveChunk. The ScopedReceiveChunk will - * unregister the migration when the ScopedReceiveChunk goes out of scope. + * If there are no migrations or split/merges running on this shard, registers an active receive + * operation with the specified session id and returns a ScopedReceiveChunk. The + * ScopedReceiveChunk will unregister the migration when the ScopedReceiveChunk goes out of + * scope. * * Otherwise returns a ConflictingOperationInProgress error. */ @@ -96,6 +103,15 @@ public: const ShardId& fromShardId); /** + * If there are no migrations running on this shard, registers an active split or merge + * operation for the specified namespace and returns a scoped object which will in turn disallow + * other migrations or splits/merges for the same namespace (but not for other namespaces). + */ + StatusWith<ScopedSplitMergeChunk> registerSplitOrMergeChunk(OperationContext* opCtx, + const NamespaceString& nss, + const ChunkRange& chunkRange); + + /** * If a migration has been previously registered through a call to registerDonateChunk, returns * that namespace. Otherwise returns boost::none. */ @@ -112,6 +128,7 @@ public: private: friend class ScopedDonateChunk; friend class ScopedReceiveChunk; + friend class ScopedSplitMergeChunk; // Describes the state of a currently active moveChunk operation struct ActiveMoveChunkState { @@ -150,6 +167,19 @@ private: ShardId fromShardId; }; + // Describes the state of a currently active split or merge operation + struct ActiveSplitMergeChunkState { + ActiveSplitMergeChunkState(NamespaceString inNss, ChunkRange inRange) + : nss(std::move(inNss)), range(std::move(inRange)) {} + + // Namespace for which a chunk is being split or merged + NamespaceString nss; + + // If split, bounds of the chunk being split; if merge, the end bounds of the range being + // merged + ChunkRange range; + }; + /** * Unregisters a previously registered namespace with an ongoing migration. Must only be called * if a previous call to registerDonateChunk has succeeded. @@ -162,10 +192,21 @@ private: */ void _clearReceiveChunk(); + /** + * Unregisters a previously registered split/merge chunk operation. Must only be called if a + * previous call to registerSplitOrMergeChunk has succeeded. + */ + void _clearSplitMergeChunk(const NamespaceString& nss); + // Protects the state below Mutex _mutex = MONGO_MAKE_LATCH("ActiveMigrationsRegistry::_mutex"); - stdx::condition_variable _lockCond; + // Condition variable which will be signaled whenever any of the states below become false, + // boost::none or a specific namespace removed from the map. + stdx::condition_variable _chunkOperationsStateChangedCV; + + // Overarching block, which doesn't allow migrations to occur even when there isn't an active + // migration ongoing. Used during recovery and FCV changes. bool _migrationsBlocked{false}; // If there is an active moveChunk operation, this field contains the original request @@ -173,6 +214,10 @@ private: // If there is an active chunk receive operation, this field contains the original session id boost::optional<ActiveReceiveChunkState> _activeReceiveChunkState; + + // If there is an active split or merge chunk operation for a particular namespace, this map + // will contain an entry + stdx::unordered_map<NamespaceString, ActiveSplitMergeChunkState> _activeSplitMergeChunkStates; }; class MigrationBlockingGuard { @@ -270,4 +315,23 @@ private: ActiveMigrationsRegistry* _registry; }; +/** + * Object of this class is returned from the registerSplitOrMergeChunk call of the active migrations + * registry. + */ +class ScopedSplitMergeChunk { +public: + ScopedSplitMergeChunk(ActiveMigrationsRegistry* registry, const NamespaceString& nss); + ~ScopedSplitMergeChunk(); + + ScopedSplitMergeChunk(ScopedSplitMergeChunk&&); + ScopedSplitMergeChunk& operator=(ScopedSplitMergeChunk&&); + +private: + // Registry from which to unregister the split/merge. Not owned. + ActiveMigrationsRegistry* _registry; + + NamespaceString _nss; +}; + } // namespace mongo diff --git a/src/mongo/db/s/merge_chunks_command.cpp b/src/mongo/db/s/merge_chunks_command.cpp index a79727716dc..041cfec28bc 100644 --- a/src/mongo/db/s/merge_chunks_command.cpp +++ b/src/mongo/db/s/merge_chunks_command.cpp @@ -39,6 +39,7 @@ #include "mongo/db/field_parser.h" #include "mongo/db/logical_clock.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/s/active_migrations_registry.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/shard_filtering_metadata_refresh.h" @@ -224,14 +225,9 @@ void mergeChunks(OperationContext* opCtx, const BSONObj& minKey, const BSONObj& maxKey, const OID& epoch) { - const std::string whyMessage = str::stream() << "merging chunks in " << nss.ns() << " from " - << redact(minKey) << " to " << redact(maxKey); - auto scopedDistLock = uassertStatusOKWithContext( - Grid::get(opCtx)->catalogClient()->getDistLockManager()->lock( - opCtx, nss.ns(), whyMessage, DistLockManager::kSingleLockAttemptTimeout), - str::stream() << "could not acquire collection lock for " << nss.ns() - << " to merge chunks in [" << redact(minKey) << ", " << redact(maxKey) - << ")"); + auto scopedSplitOrMergeChunk( + uassertStatusOK(ActiveMigrationsRegistry::get(opCtx).registerSplitOrMergeChunk( + opCtx, nss, ChunkRange(minKey, maxKey)))); // We now have the collection distributed lock, refresh metadata to latest version and sanity // check |