diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2021-06-03 06:34:00 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-06-12 08:38:13 +0000 |
commit | 792dcddddf6bfedc45208dbc1c513d2dea27546f (patch) | |
tree | d41824efa24964f9a7406f08e02ca5803f37bbfc | |
parent | e2a3ba458a82547101976aa80ee7242598e5d43e (diff) | |
download | mongo-792dcddddf6bfedc45208dbc1c513d2dea27546f.tar.gz |
SERVER-56779 Allow multiple concurrent merges for the same collection across the cluster
-rw-r--r-- | src/mongo/db/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/s/active_migrations_registry.cpp | 104 | ||||
-rw-r--r-- | src/mongo/db/s/active_migrations_registry.h | 88 | ||||
-rw-r--r-- | src/mongo/db/s/merge_chunks_command.cpp | 13 | ||||
-rw-r--r-- | src/mongo/s/SConscript | 1 |
5 files changed, 157 insertions, 50 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 7db1148890f..c30feb23528 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -2051,7 +2051,6 @@ env.Library( '$BUILD_DIR/mongo/executor/network_interface_factory', '$BUILD_DIR/mongo/rpc/rpc', '$BUILD_DIR/mongo/s/commands/shared_cluster_commands', - '$BUILD_DIR/mongo/s/sessions_collection_sharded', '$BUILD_DIR/mongo/scripting/scripting_server', '$BUILD_DIR/mongo/transport/message_compressor_options_server', '$BUILD_DIR/mongo/transport/service_entry_point', diff --git a/src/mongo/db/s/active_migrations_registry.cpp b/src/mongo/db/s/active_migrations_registry.cpp index 1fec861ffe2..3ec8706c32d 100644 --- a/src/mongo/db/s/active_migrations_registry.cpp +++ b/src/mongo/db/s/active_migrations_registry.cpp @@ -35,6 +35,7 @@ #include "mongo/db/catalog_raii.h" #include "mongo/db/s/collection_sharding_runtime.h" +#include "mongo/db/s/dist_lock_manager.h" #include "mongo/db/s/migration_session_id.h" #include "mongo/db/s/migration_source_manager.h" #include "mongo/db/service_context.h" @@ -64,36 +65,36 @@ ActiveMigrationsRegistry& ActiveMigrationsRegistry::get(OperationContext* opCtx) void ActiveMigrationsRegistry::lock(OperationContext* opCtx, StringData reason) { stdx::unique_lock<Latch> lock(_mutex); - // This wait is to hold back additional lock requests while there is already one in - // progress. - opCtx->waitForConditionOrInterrupt(_lockCond, lock, [this] { return !_migrationsBlocked; }); + // This wait is to hold back additional lock requests while there is already one in progress + opCtx->waitForConditionOrInterrupt( + _chunkOperationsStateChangedCV, lock, [this] { return !_migrationsBlocked; }); // Setting flag before condvar returns to block new migrations from starting. (Favoring writers) - LOGV2(4675601, "Going to start blocking migrations", "reason"_attr = reason); + LOGV2(467560, "Going to start blocking migrations", "reason"_attr = reason); _migrationsBlocked = true; - // Wait for any ongoing migrations to complete. - opCtx->waitForConditionOrInterrupt( - _lockCond, lock, [this] { return !(_activeMoveChunkState || _activeReceiveChunkState); }); + // Wait for any ongoing chunk modifications to complete + opCtx->waitForConditionOrInterrupt(_chunkOperationsStateChangedCV, lock, [this] { + return !(_activeMoveChunkState || _activeReceiveChunkState); + }); } void ActiveMigrationsRegistry::unlock(StringData reason) { stdx::lock_guard<Latch> lock(_mutex); - LOGV2(4675602, "Going to stop blocking migrations", "reason"_attr = reason); + LOGV2(467561, "Going to stop blocking migrations", "reason"_attr = reason); _migrationsBlocked = false; - _lockCond.notify_all(); + _chunkOperationsStateChangedCV.notify_all(); } StatusWith<ScopedDonateChunk> ActiveMigrationsRegistry::registerDonateChunk( OperationContext* opCtx, const MoveChunkRequest& args) { - stdx::unique_lock<Latch> lk(_mutex); + stdx::unique_lock<Latch> ul(_mutex); - if (_migrationsBlocked) { - LOGV2(4675603, "Register donate chunk waiting for migrations to be unblocked"); - opCtx->waitForConditionOrInterrupt(_lockCond, lk, [this] { return !_migrationsBlocked; }); - } + opCtx->waitForConditionOrInterrupt(_chunkOperationsStateChangedCV, ul, [&] { + return !_migrationsBlocked && !_activeSplitMergeChunkStates.count(args.getNss()); + }); if (_activeReceiveChunkState) { return _activeReceiveChunkState->constructErrorStatus(); @@ -110,7 +111,7 @@ StatusWith<ScopedDonateChunk> ActiveMigrationsRegistry::registerDonateChunk( } LOGV2(5004700, - "registerDonateChunk ", + "registerDonateChunk", "currentKeys"_attr = ChunkRange(_activeMoveChunkState->args.getMinKey(), _activeMoveChunkState->args.getMaxKey()) .toString(), @@ -118,6 +119,7 @@ StatusWith<ScopedDonateChunk> ActiveMigrationsRegistry::registerDonateChunk( "newKeys"_attr = ChunkRange(args.getMinKey(), args.getMaxKey()).toString(), "newToShardId"_attr = args.getToShardId(), "ns"_attr = args.getNss().ns()); + return _activeMoveChunkState->constructErrorStatus(); } @@ -131,12 +133,10 @@ StatusWith<ScopedReceiveChunk> ActiveMigrationsRegistry::registerReceiveChunk( const NamespaceString& nss, const ChunkRange& chunkRange, const ShardId& fromShardId) { - stdx::unique_lock<Latch> lk(_mutex); + stdx::unique_lock<Latch> ul(_mutex); - if (_migrationsBlocked) { - LOGV2(4675604, "Register receive chunk waiting for migrations to be unblocked"); - opCtx->waitForConditionOrInterrupt(_lockCond, lk, [this] { return !_migrationsBlocked; }); - } + opCtx->waitForConditionOrInterrupt( + _chunkOperationsStateChangedCV, ul, [this] { return !_migrationsBlocked; }); if (_activeReceiveChunkState) { return _activeReceiveChunkState->constructErrorStatus(); @@ -158,6 +158,22 @@ StatusWith<ScopedReceiveChunk> ActiveMigrationsRegistry::registerReceiveChunk( return {ScopedReceiveChunk(this)}; } +StatusWith<ScopedSplitMergeChunk> ActiveMigrationsRegistry::registerSplitOrMergeChunk( + OperationContext* opCtx, const NamespaceString& nss, const ChunkRange& chunkRange) { + stdx::unique_lock<Latch> ul(_mutex); + + opCtx->waitForConditionOrInterrupt(_chunkOperationsStateChangedCV, ul, [&] { + return !(_activeMoveChunkState && _activeMoveChunkState->args.getNss() == nss) && + !_activeSplitMergeChunkStates.count(nss); + }); + + auto [it, inserted] = + _activeSplitMergeChunkStates.emplace(nss, ActiveSplitMergeChunkState(nss, chunkRange)); + invariant(inserted); + + return {ScopedSplitMergeChunk(this, nss)}; +} + boost::optional<NamespaceString> ActiveMigrationsRegistry::getActiveDonateChunkNss() { stdx::lock_guard<Latch> lk(_mutex); if (_activeMoveChunkState) { @@ -205,20 +221,31 @@ void ActiveMigrationsRegistry::_clearDonateChunk() { .toString(), "currentToShardId"_attr = _activeMoveChunkState->args.getToShardId()); _activeMoveChunkState.reset(); - _lockCond.notify_all(); + _chunkOperationsStateChangedCV.notify_all(); } void ActiveMigrationsRegistry::_clearReceiveChunk() { stdx::lock_guard<Latch> lk(_mutex); invariant(_activeReceiveChunkState); + LOGV2(5004703, + "clearReceiveChunk ", + "currentKeys"_attr = ChunkRange(_activeReceiveChunkState->range.getMin(), + _activeReceiveChunkState->range.getMax()) + .toString()); _activeReceiveChunkState.reset(); - _lockCond.notify_all(); + _chunkOperationsStateChangedCV.notify_all(); +} + +void ActiveMigrationsRegistry::_clearSplitMergeChunk(const NamespaceString& nss) { + stdx::lock_guard<Latch> lk(_mutex); + invariant(_activeSplitMergeChunkStates.erase(nss)); + _chunkOperationsStateChangedCV.notify_all(); } Status ActiveMigrationsRegistry::ActiveMoveChunkState::constructErrorStatus() const { return {ErrorCodes::ConflictingOperationInProgress, - str::stream() << "Unable to start new migration because this shard is currently " - "donating chunk " + str::stream() << "Unable to start new balancer operation because this shard is " + "currently donating chunk " << ChunkRange(args.getMinKey(), args.getMaxKey()).toString() << " for namespace " << args.getNss().ns() << " to " << args.getToShardId()}; @@ -226,8 +253,8 @@ Status ActiveMigrationsRegistry::ActiveMoveChunkState::constructErrorStatus() co Status ActiveMigrationsRegistry::ActiveReceiveChunkState::constructErrorStatus() const { return {ErrorCodes::ConflictingOperationInProgress, - str::stream() << "Unable to start new migration because this shard is currently " - "receiving chunk " + str::stream() << "Unable to start new balancer operation because this shard is " + "currently receiving chunk " << range.toString() << " for namespace " << nss.ns() << " from " << fromShardId}; } @@ -245,7 +272,6 @@ ScopedDonateChunk::~ScopedDonateChunk() { invariant(*_completionNotification); _registry->_clearDonateChunk(); } - LOGV2(5004703, "~ScopedDonateChunk", "_shouldExecute"_attr = _shouldExecute); } ScopedDonateChunk::ScopedDonateChunk(ScopedDonateChunk&& other) { @@ -294,4 +320,28 @@ ScopedReceiveChunk& ScopedReceiveChunk::operator=(ScopedReceiveChunk&& other) { return *this; } +ScopedSplitMergeChunk::ScopedSplitMergeChunk(ActiveMigrationsRegistry* registry, + const NamespaceString& nss) + : _registry(registry), _nss(nss) {} + +ScopedSplitMergeChunk::~ScopedSplitMergeChunk() { + if (_registry) { + _registry->_clearSplitMergeChunk(_nss); + } +} + +ScopedSplitMergeChunk::ScopedSplitMergeChunk(ScopedSplitMergeChunk&& other) { + *this = std::move(other); +} + +ScopedSplitMergeChunk& ScopedSplitMergeChunk::operator=(ScopedSplitMergeChunk&& other) { + if (&other != this) { + _registry = other._registry; + other._registry = nullptr; + _nss = std::move(other._nss); + } + + return *this; +} + } // namespace mongo diff --git a/src/mongo/db/s/active_migrations_registry.h b/src/mongo/db/s/active_migrations_registry.h index 29b326cf4ef..ed6ee4c8f4e 100644 --- a/src/mongo/db/s/active_migrations_registry.h +++ b/src/mongo/db/s/active_migrations_registry.h @@ -30,7 +30,6 @@ #pragma once #include <boost/optional.hpp> -#include <memory> #include "mongo/db/s/migration_session_id.h" #include "mongo/platform/mutex.h" @@ -42,12 +41,19 @@ namespace mongo { class OperationContext; class ScopedDonateChunk; class ScopedReceiveChunk; -template <typename T> -class StatusWith; +class ScopedSplitMergeChunk; /** - * Thread-safe object that keeps track of the active migrations running on a node and limits them - * to only one per shard. There is only one instance of this object per shard. + * This class is used to synchronise all the active routing info operations for chunks owned by this + * shard. There is only one instance of it per ServiceContext. + * + * It implements a non-fair lock manager, which provides the following guarantees: + * + * - Move || Move (same chunk): The second move will join the first + * - Move || Move (different chunks or collections): The second move will result in a + * ConflictingOperationInProgress error + * - Move || Split/Merge (same collection): The second operation will block behind the first + * - Move/Split/Merge || Split/Merge (for different collections): Can proceed concurrently */ class ActiveMigrationsRegistry { ActiveMigrationsRegistry(const ActiveMigrationsRegistry&) = delete; @@ -70,9 +76,9 @@ public: void unlock(StringData reason); /** - * If there are no migrations running on this shard, registers an active migration with the - * specified arguments. Returns a ScopedDonateChunk, which must be signaled by the - * caller before it goes out of scope. + * If there are no migrations or split/merges running on this shard, registers an active + * migration with the specified arguments. Returns a ScopedDonateChunk, which must be signaled + * by the caller before it goes out of scope. * * If there is an active migration already running on this shard and it has the exact same * arguments, returns a ScopedDonateChunk. The ScopedDonateChunk can be used to join the @@ -84,9 +90,10 @@ public: const MoveChunkRequest& args); /** - * If there are no migrations running on this shard, registers an active receive operation with - * the specified session id and returns a ScopedReceiveChunk. The ScopedReceiveChunk will - * unregister the migration when the ScopedReceiveChunk goes out of scope. + * If there are no migrations or split/merges running on this shard, registers an active receive + * operation with the specified session id and returns a ScopedReceiveChunk. The + * ScopedReceiveChunk will unregister the migration when the ScopedReceiveChunk goes out of + * scope. * * Otherwise returns a ConflictingOperationInProgress error. */ @@ -96,6 +103,15 @@ public: const ShardId& fromShardId); /** + * If there are no migrations running on this shard, registers an active split or merge + * operation for the specified namespace and returns a scoped object which will in turn disallow + * other migrations or splits/merges for the same namespace (but not for other namespaces). + */ + StatusWith<ScopedSplitMergeChunk> registerSplitOrMergeChunk(OperationContext* opCtx, + const NamespaceString& nss, + const ChunkRange& chunkRange); + + /** * If a migration has been previously registered through a call to registerDonateChunk, returns * that namespace. Otherwise returns boost::none. */ @@ -112,6 +128,7 @@ public: private: friend class ScopedDonateChunk; friend class ScopedReceiveChunk; + friend class ScopedSplitMergeChunk; // Describes the state of a currently active moveChunk operation struct ActiveMoveChunkState { @@ -150,6 +167,19 @@ private: ShardId fromShardId; }; + // Describes the state of a currently active split or merge operation + struct ActiveSplitMergeChunkState { + ActiveSplitMergeChunkState(NamespaceString inNss, ChunkRange inRange) + : nss(std::move(inNss)), range(std::move(inRange)) {} + + // Namespace for which a chunk is being split or merged + NamespaceString nss; + + // If split, bounds of the chunk being split; if merge, the end bounds of the range being + // merged + ChunkRange range; + }; + /** * Unregisters a previously registered namespace with an ongoing migration. Must only be called * if a previous call to registerDonateChunk has succeeded. @@ -162,10 +192,21 @@ private: */ void _clearReceiveChunk(); + /** + * Unregisters a previously registered split/merge chunk operation. Must only be called if a + * previous call to registerSplitOrMergeChunk has succeeded. + */ + void _clearSplitMergeChunk(const NamespaceString& nss); + // Protects the state below Mutex _mutex = MONGO_MAKE_LATCH("ActiveMigrationsRegistry::_mutex"); - stdx::condition_variable _lockCond; + // Condition variable which will be signaled whenever any of the states below become false, + // boost::none or a specific namespace removed from the map. + stdx::condition_variable _chunkOperationsStateChangedCV; + + // Overarching block, which doesn't allow migrations to occur even when there isn't an active + // migration ongoing. Used during recovery and FCV changes. bool _migrationsBlocked{false}; // If there is an active moveChunk operation, this field contains the original request @@ -173,6 +214,10 @@ private: // If there is an active chunk receive operation, this field contains the original session id boost::optional<ActiveReceiveChunkState> _activeReceiveChunkState; + + // If there is an active split or merge chunk operation for a particular namespace, this map + // will contain an entry + stdx::unordered_map<NamespaceString, ActiveSplitMergeChunkState> _activeSplitMergeChunkStates; }; class MigrationBlockingGuard { @@ -269,4 +314,23 @@ private: ActiveMigrationsRegistry* _registry; }; +/** + * Object of this class is returned from the registerSplitOrMergeChunk call of the active migrations + * registry. + */ +class ScopedSplitMergeChunk { +public: + ScopedSplitMergeChunk(ActiveMigrationsRegistry* registry, const NamespaceString& nss); + ~ScopedSplitMergeChunk(); + + ScopedSplitMergeChunk(ScopedSplitMergeChunk&&); + ScopedSplitMergeChunk& operator=(ScopedSplitMergeChunk&&); + +private: + // Registry from which to unregister the split/merge. Not owned. + ActiveMigrationsRegistry* _registry; + + NamespaceString _nss; +}; + } // namespace mongo diff --git a/src/mongo/db/s/merge_chunks_command.cpp b/src/mongo/db/s/merge_chunks_command.cpp index 815536ff78d..f68afa19cac 100644 --- a/src/mongo/db/s/merge_chunks_command.cpp +++ b/src/mongo/db/s/merge_chunks_command.cpp @@ -38,8 +38,8 @@ #include "mongo/db/commands.h" #include "mongo/db/field_parser.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/s/active_migrations_registry.h" #include "mongo/db/s/collection_sharding_runtime.h" -#include "mongo/db/s/dist_lock_manager.h" #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/shard_filtering_metadata_refresh.h" #include "mongo/db/s/sharding_state.h" @@ -225,14 +225,9 @@ void mergeChunks(OperationContext* opCtx, const BSONObj& minKey, const BSONObj& maxKey, const OID& expectedEpoch) { - const std::string whyMessage = str::stream() << "merging chunks in " << nss.ns() << " from " - << redact(minKey) << " to " << redact(maxKey); - auto scopedDistLock = uassertStatusOKWithContext( - DistLockManager::get(opCtx)->lock( - opCtx, nss.ns(), whyMessage, DistLockManager::kDefaultLockTimeout), - str::stream() << "could not acquire collection lock for " << nss.ns() - << " to merge chunks in [" << redact(minKey) << ", " << redact(maxKey) - << ")"); + auto scopedSplitOrMergeChunk( + uassertStatusOK(ActiveMigrationsRegistry::get(opCtx).registerSplitOrMergeChunk( + opCtx, nss, ChunkRange(minKey, maxKey)))); const bool isVersioned = OperationShardingState::isOperationVersioned(opCtx); if (!isVersioned) { diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index d51c705a301..fa5dfd08e64 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -469,7 +469,6 @@ env.Library( 'mongos_server_parameters', 'mongos_topology_coordinator', 'query/cluster_cursor_cleanup_job', - 'sessions_collection_sharded', 'sharding_egress_metadata_hook_for_mongos', 'sharding_initialization', 'sharding_router_api', |