diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2016-09-28 14:05:36 -0400 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2016-09-29 14:31:59 -0400 |
commit | ba3dc420fe41f9c6ba718da5eb270161374839f3 (patch) | |
tree | d8eb3e68dd62bbc2bb0b982945a62e7d892e37b4 /src | |
parent | fd94476eb6ca0207dd69662d36f93eeaaa227073 (diff) | |
download | mongo-ba3dc420fe41f9c6ba718da5eb270161374839f3.tar.gz |
SERVER-26370 Register incoming migrations on the ActiveMigrationsRegistry
This avoids race conditions in which a shard could serve as both a donor
and a recipient at the same time.
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/s/active_migrations_registry.cpp | 59 | ||||
-rw-r--r-- | src/mongo/db/s/active_migrations_registry.h | 49 | ||||
-rw-r--r-- | src/mongo/db/s/migration_destination_manager.cpp | 22 | ||||
-rw-r--r-- | src/mongo/db/s/migration_destination_manager.h | 3 | ||||
-rw-r--r-- | src/mongo/db/s/migration_destination_manager_legacy_commands.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_state.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_state.h | 9 |
7 files changed, 136 insertions, 20 deletions
diff --git a/src/mongo/db/s/active_migrations_registry.cpp b/src/mongo/db/s/active_migrations_registry.cpp index 35a765a89c2..d455da49096 100644 --- a/src/mongo/db/s/active_migrations_registry.cpp +++ b/src/mongo/db/s/active_migrations_registry.cpp @@ -33,6 +33,7 @@ #include "mongo/base/status_with.h" #include "mongo/db/db_raii.h" #include "mongo/db/s/collection_sharding_state.h" +#include "mongo/db/s/migration_session_id.h" #include "mongo/db/s/migration_source_manager.h" #include "mongo/util/assert_util.h" @@ -47,6 +48,13 @@ ActiveMigrationsRegistry::~ActiveMigrationsRegistry() { StatusWith<ScopedRegisterDonateChunk> ActiveMigrationsRegistry::registerDonateChunk( const MoveChunkRequest& args) { stdx::lock_guard<stdx::mutex> lk(_mutex); + if (_activeReceiveChunkState) { + return {ErrorCodes::ConflictingOperationInProgress, + str::stream() << "Unable to start new migration because this shard is currently " + "donating chunk for " + << _activeReceiveChunkState->nss.ns()}; + } + if (!_activeMoveChunkState) { _activeMoveChunkState.emplace(args); return {ScopedRegisterDonateChunk(this, true, _activeMoveChunkState->notification)}; @@ -63,6 +71,28 @@ StatusWith<ScopedRegisterDonateChunk> ActiveMigrationsRegistry::registerDonateCh << _activeMoveChunkState->args.getNss().ns()}; } +StatusWith<ScopedRegisterReceiveChunk> ActiveMigrationsRegistry::registerReceiveChunk( + const NamespaceString& nss) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + if (_activeMoveChunkState) { + return {ErrorCodes::ConflictingOperationInProgress, + str::stream() << "Unable to start new migration because this shard is currently " + "donating chunk for " + << _activeMoveChunkState->args.getNss().ns()}; + } + + if (_activeReceiveChunkState) { + return {ErrorCodes::ConflictingOperationInProgress, + str::stream() << "Unable to start new migration because this shard is currently " + "donating chunk for " + << _activeReceiveChunkState->nss.ns()}; + } + + _activeReceiveChunkState.emplace(nss); + + 
return {ScopedRegisterReceiveChunk(this)}; +} + boost::optional<NamespaceString> ActiveMigrationsRegistry::getActiveDonateChunkNss() { stdx::lock_guard<stdx::mutex> lk(_mutex); if (_activeMoveChunkState) { @@ -105,6 +135,12 @@ void ActiveMigrationsRegistry::_clearDonateChunk() { _activeMoveChunkState.reset(); } +void ActiveMigrationsRegistry::_clearReceiveChunk() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + invariant(_activeReceiveChunkState); + _activeReceiveChunkState.reset(); +} + ScopedRegisterDonateChunk::ScopedRegisterDonateChunk( ActiveMigrationsRegistry* registry, bool forUnregister, @@ -146,4 +182,27 @@ Status ScopedRegisterDonateChunk::waitForCompletion(OperationContext* txn) { return _completionNotification->get(txn); } +ScopedRegisterReceiveChunk::ScopedRegisterReceiveChunk(ActiveMigrationsRegistry* registry) + : _registry(registry) {} + +ScopedRegisterReceiveChunk::~ScopedRegisterReceiveChunk() { + if (_registry) { + _registry->_clearReceiveChunk(); + } +} + +ScopedRegisterReceiveChunk::ScopedRegisterReceiveChunk(ScopedRegisterReceiveChunk&& other) { + *this = std::move(other); +} + +ScopedRegisterReceiveChunk& ScopedRegisterReceiveChunk::operator=( + ScopedRegisterReceiveChunk&& other) { + if (&other != this) { + _registry = other._registry; + other._registry = nullptr; + } + + return *this; +} + } // namespace mongo diff --git a/src/mongo/db/s/active_migrations_registry.h b/src/mongo/db/s/active_migrations_registry.h index 06ffeaece07..e2f7ad06b8b 100644 --- a/src/mongo/db/s/active_migrations_registry.h +++ b/src/mongo/db/s/active_migrations_registry.h @@ -31,6 +31,7 @@ #include <boost/optional.hpp> #include "mongo/base/disallow_copying.h" +#include "mongo/db/s/migration_session_id.h" #include "mongo/s/move_chunk_request.h" #include "mongo/stdx/memory.h" #include "mongo/stdx/mutex.h" @@ -40,6 +41,7 @@ namespace mongo { class OperationContext; class ScopedRegisterDonateChunk; +class ScopedRegisterReceiveChunk; template <typename T> class 
StatusWith; @@ -68,6 +70,15 @@ public: StatusWith<ScopedRegisterDonateChunk> registerDonateChunk(const MoveChunkRequest& args); /** + * If there are no migrations running on this shard, registers an active receive operation with + * the specified session id and returns a ScopedRegisterReceiveChunk, which will unregister it + * when it goes out of scope. + * + * Otherwise returns a ConflictingOperationInProgress error. + */ + StatusWith<ScopedRegisterReceiveChunk> registerReceiveChunk(const NamespaceString& nss); + + /** * If a migration has been previously registered through a call to registerDonateChunk returns * that namespace. Otherwise returns boost::none. */ @@ -83,6 +94,7 @@ public: private: friend class ScopedRegisterDonateChunk; + friend class ScopedRegisterReceiveChunk; // Describes the state of a currently active moveChunk operation struct ActiveMoveChunkState { @@ -96,18 +108,36 @@ private: std::shared_ptr<Notification<Status>> notification; }; + // Describes the state of a currently active receive chunk operation + struct ActiveReceiveChunkState { + ActiveReceiveChunkState(NamespaceString inNss) : nss(std::move(inNss)) {} + + // Namesspace for which a chunk is being received + NamespaceString nss; + }; + /** * Unregisters a previously registered namespace with ongoing migration. Must only be called if * a previous call to registerDonateChunk has succeeded. */ void _clearDonateChunk(); + /** + * Unregisters a previously registered incoming migration. Must only be called if a previous + * call to registerReceiveChunk has succeeded. 
+ */ + void _clearReceiveChunk(); + // Protects the state below stdx::mutex _mutex; // If there is an active moveChunk operation going on, this field contains the request, which // initiated it boost::optional<ActiveMoveChunkState> _activeMoveChunkState; + + // If there is an active receive of a chunk going on, this field contains the session id, which + // initiated it + boost::optional<ActiveReceiveChunkState> _activeReceiveChunkState; }; /** @@ -159,4 +189,23 @@ private: std::shared_ptr<Notification<Status>> _completionNotification; }; +/** + * Object of this class is returned from the registerReceiveChunk call of the active migrations + * registry. + */ +class ScopedRegisterReceiveChunk { + MONGO_DISALLOW_COPYING(ScopedRegisterReceiveChunk); + +public: + ScopedRegisterReceiveChunk(ActiveMigrationsRegistry* registry); + ~ScopedRegisterReceiveChunk(); + + ScopedRegisterReceiveChunk(ScopedRegisterReceiveChunk&&); + ScopedRegisterReceiveChunk& operator=(ScopedRegisterReceiveChunk&&); + +private: + // Registry from which to unregister the migration. Not owned. 
+ ActiveMigrationsRegistry* _registry; +}; + } // namespace mongo diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index 8110f684766..83f0e74c289 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -263,6 +263,7 @@ BSONObj MigrationDestinationManager::getMigrationStatusReport() { } Status MigrationDestinationManager::start(const NamespaceString& nss, + ScopedRegisterReceiveChunk scopedRegisterReceiveChunk, const MigrationSessionId& sessionId, const ConnectionString& fromShardConnString, const ShardId& fromShard, @@ -273,19 +274,8 @@ Status MigrationDestinationManager::start(const NamespaceString& nss, const OID& epoch, const WriteConcernOptions& writeConcern) { stdx::lock_guard<stdx::mutex> lk(_mutex); - - if (_sessionId) { - return Status(ErrorCodes::ConflictingOperationInProgress, - str::stream() << "Active migration already in progress " - << "ns: " - << _nss.ns() - << ", from: " - << _fromShardConnString.toString() - << ", min: " - << _min - << ", max: " - << _max); - } + invariant(!_sessionId); + invariant(!_scopedRegisterReceiveChunk); _state = READY; _errmsg = ""; @@ -306,6 +296,8 @@ Status MigrationDestinationManager::start(const NamespaceString& nss, _numSteady = 0; _sessionId = sessionId; + _scopedRegisterReceiveChunk = std::move(scopedRegisterReceiveChunk); + // TODO: If we are here, the migrate thread must have completed, otherwise _active above would // be false, so this would never block. 
There is no better place with the current implementation @@ -432,7 +424,8 @@ void MigrationDestinationManager::_migrateThread(BSONObj min, } stdx::lock_guard<stdx::mutex> lk(_mutex); - _sessionId = boost::none; + _sessionId.reset(); + _scopedRegisterReceiveChunk.reset(); _isActiveCV.notify_all(); } @@ -445,6 +438,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn, const WriteConcernOptions& writeConcern) { invariant(isActive()); invariant(_sessionId); + invariant(_scopedRegisterReceiveChunk); invariant(!min.isEmpty()); invariant(!max.isEmpty()); diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h index 804ec2bf35a..0b16202d55e 100644 --- a/src/mongo/db/s/migration_destination_manager.h +++ b/src/mongo/db/s/migration_destination_manager.h @@ -36,6 +36,7 @@ #include "mongo/bson/oid.h" #include "mongo/client/connection_string.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/s/active_migrations_registry.h" #include "mongo/db/s/migration_session_id.h" #include "mongo/s/shard_id.h" #include "mongo/stdx/condition_variable.h" @@ -88,6 +89,7 @@ public: * Returns OK if migration started successfully. */ Status start(const NamespaceString& nss, + ScopedRegisterReceiveChunk scopedRegisterReceiveChunk, const MigrationSessionId& sessionId, const ConnectionString& fromShardConnString, const ShardId& fromShard, @@ -193,6 +195,7 @@ private: // Migration session ID uniquely identifies the migration and indicates whether the prepare // method has been called. boost::optional<MigrationSessionId> _sessionId; + boost::optional<ScopedRegisterReceiveChunk> _scopedRegisterReceiveChunk; // A condition variable on which to wait for the prepare method to be called. 
stdx::condition_variable _isActiveCV; diff --git a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp index 4216b30ef5e..7ea7ff56be1 100644 --- a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp +++ b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp @@ -166,8 +166,11 @@ public: const MigrationSessionId migrationSessionId( uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj))); + auto scopedRegisterReceiveChunk(uassertStatusOK(shardingState->registerReceiveChunk(nss))); + uassertStatusOK(shardingState->migrationDestinationManager()->start( nss, + std::move(scopedRegisterReceiveChunk), migrationSessionId, statusWithFromShardConnectionString.getValue(), fromShard, diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp index 8250ed4021f..a789b4ca3cf 100644 --- a/src/mongo/db/s/sharding_state.cpp +++ b/src/mongo/db/s/sharding_state.cpp @@ -733,15 +733,14 @@ StatusWith<ChunkVersion> ShardingState::_refreshMetadata( StatusWith<ScopedRegisterDonateChunk> ShardingState::registerDonateChunk( const MoveChunkRequest& args) { - if (_migrationDestManager.isActive()) { - return {ErrorCodes::ConflictingOperationInProgress, - str::stream() << "Unable to start new migration because this shard is currently " - "receiving a chunk"}; - } - return _activeMigrationsRegistry.registerDonateChunk(args); } +StatusWith<ScopedRegisterReceiveChunk> ShardingState::registerReceiveChunk( + const NamespaceString& nss) { + return _activeMigrationsRegistry.registerReceiveChunk(nss); +} + boost::optional<NamespaceString> ShardingState::getActiveDonateChunkNss() { return _activeMigrationsRegistry.getActiveDonateChunkNss(); } diff --git a/src/mongo/db/s/sharding_state.h b/src/mongo/db/s/sharding_state.h index 8e8f0c20074..9cf94cd4526 100644 --- a/src/mongo/db/s/sharding_state.h +++ b/src/mongo/db/s/sharding_state.h @@ -214,6 +214,15 @@ public: 
 StatusWith<ScopedRegisterDonateChunk> registerDonateChunk(const MoveChunkRequest& args); /** + * If there are no migrations running on this shard, registers an active receive operation for + * the specified namespace and returns a ScopedRegisterReceiveChunk, which will unregister it + * when it goes out of scope. + * + * Otherwise returns a ConflictingOperationInProgress error. + */ + StatusWith<ScopedRegisterReceiveChunk> registerReceiveChunk(const NamespaceString& nss); + + /** * If a migration has been previously registered through a call to registerDonateChunk returns * that namespace. Otherwise returns boost::none. * |