From 44fe191f411ef26c982707f4c0577216e79a178b Mon Sep 17 00:00:00 2001
From: Paolo Polato
Date: Mon, 18 Jul 2022 08:11:05 +0000
Subject: SERVER-67729 Reject new migrations when the ActiveMigrationsRegistry is locked

---
 src/mongo/db/s/active_migrations_registry.cpp      | 37 ++++++++++++++-----
 src/mongo/db/s/active_migrations_registry.h        | 18 +++++-----
 src/mongo/db/s/active_migrations_registry_test.cpp | 42 +++++++++++++---------
 ...gration_destination_manager_legacy_commands.cpp |  6 +++-
 src/mongo/db/s/migration_util.cpp                  |  2 +-
 5 files changed, 70 insertions(+), 35 deletions(-)

diff --git a/src/mongo/db/s/active_migrations_registry.cpp b/src/mongo/db/s/active_migrations_registry.cpp
index 55b0466fa3f..61646702239 100644
--- a/src/mongo/db/s/active_migrations_registry.cpp
+++ b/src/mongo/db/s/active_migrations_registry.cpp
@@ -65,6 +65,8 @@ ActiveMigrationsRegistry& ActiveMigrationsRegistry::get(OperationContext* opCtx)
 }
 
 void ActiveMigrationsRegistry::lock(OperationContext* opCtx, StringData reason) {
+    // The method requires the requesting operation to be interruptible
+    invariant(opCtx->shouldAlwaysInterruptAtStepDownOrUp());
     stdx::unique_lock lock(_mutex);
 
     // This wait is to hold back additional lock requests while there is already one in progress
@@ -82,6 +84,17 @@ void ActiveMigrationsRegistry::lock(OperationContext* opCtx, StringData reason)
         return !(_activeMoveChunkState || _activeReceiveChunkState);
     });
 
+    // lock() may be called while the node is still completing its draining mode; if so, reject the
+    // request with a retriable error and allow the draining mode to invoke registerReceiveChunk()
+    // as part of its recovery sequence.
+    {
+        AutoGetDb autoDB(opCtx, NamespaceString::kAdminDb, MODE_IS);
+        uassert(ErrorCodes::NotWritablePrimary,
+                "Cannot lock the registry while the node is in draining mode",
+                repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase(
+                    opCtx, NamespaceString::kAdminDb));
+    }
+
     unblockMigrationsOnError.dismiss();
 }
 
@@ -99,8 +112,7 @@ StatusWith<ScopedDonateChunk> ActiveMigrationsRegistry::registerDonateChunk(
     stdx::unique_lock ul(_mutex);
 
     opCtx->waitForConditionOrInterrupt(_chunkOperationsStateChangedCV, ul, [&] {
-        return !_migrationsBlocked &&
-            !_activeSplitMergeChunkStates.count(args.getCommandParameter());
+        return !_activeSplitMergeChunkStates.count(args.getCommandParameter());
     });
 
     if (_activeReceiveChunkState) {
@@ -129,6 +141,12 @@ StatusWith<ScopedDonateChunk> ActiveMigrationsRegistry::registerDonateChunk(
         return _activeMoveChunkState->constructErrorStatus();
     }
 
+    if (_migrationsBlocked) {
+        return {ErrorCodes::ConflictingOperationInProgress,
+                "Unable to start new balancer operation because the ActiveMigrationsRegistry of "
+                "this shard is temporarily locked"};
+    }
+
     _activeMoveChunkState.emplace(args);
 
     return {ScopedDonateChunk(this, true, _activeMoveChunkState->notification)};
@@ -139,17 +157,14 @@ StatusWith<ScopedReceiveChunk> ActiveMigrationsRegistry::registerReceiveChunk(
     const NamespaceString& nss,
     const ChunkRange& chunkRange,
     const ShardId& fromShardId,
-    bool waitForOngoingMigrations) {
+    bool waitForCompletionOfConflictingOps) {
     stdx::unique_lock ul(_mutex);
 
-    if (waitForOngoingMigrations) {
+    if (waitForCompletionOfConflictingOps) {
         opCtx->waitForConditionOrInterrupt(_chunkOperationsStateChangedCV, ul, [this] {
             return !_migrationsBlocked && !_activeMoveChunkState && !_activeReceiveChunkState;
         });
     } else {
-        opCtx->waitForConditionOrInterrupt(
-            _chunkOperationsStateChangedCV, ul, [this] { return !_migrationsBlocked; });
-
         if (_activeReceiveChunkState) {
             return _activeReceiveChunkState->constructErrorStatus();
         }
@@ -161,10 +176,16 @@ StatusWith<ScopedReceiveChunk> ActiveMigrationsRegistry::registerReceiveChunk(
                   "runningMigration"_attr = _activeMoveChunkState->args.toBSON({}));
             return _activeMoveChunkState->constructErrorStatus();
         }
+
+        if (_migrationsBlocked) {
+            return {
+                ErrorCodes::ConflictingOperationInProgress,
+                "Unable to start new balancer operation because the ActiveMigrationsRegistry of "
+                "this shard is temporarily locked"};
+        }
     }
 
     _activeReceiveChunkState.emplace(nss, chunkRange, fromShardId);
-
     return {ScopedReceiveChunk(this)};
 }
 
diff --git a/src/mongo/db/s/active_migrations_registry.h b/src/mongo/db/s/active_migrations_registry.h
index 4d75a6845b0..ea61210782a 100644
--- a/src/mongo/db/s/active_migrations_registry.h
+++ b/src/mongo/db/s/active_migrations_registry.h
@@ -91,20 +91,22 @@ public:
                                                       const ShardsvrMoveRange& args);
 
     /**
-     * If there are no migrations or split/merges running on this shard, registers an active receive
-     * operation with the specified session id and returns a ScopedReceiveChunk. The
-     * ScopedReceiveChunk will unregister the migration when the ScopedReceiveChunk goes out of
-     * scope.
+     * Registers an active receive chunk operation with the specified session id and returns a
+     * ScopedReceiveChunk. The returned ScopedReceiveChunk object will unregister the migration when
+     * it goes out of scope.
      *
-     * Otherwise returns a ConflictingOperationInProgress error if waitForOngoingMigrations is false
-     * or waits for the ongoing migration/split/merge to finish and then registers the migration if
-     * waitForOngoingMigrations is true.
+     * In case registerReceiveChunk() is called while other operations (a second migration or a
+     * registry lock()) are already holding resources of the ActiveMigrationsRegistry, the function
+     * will either
+     * - wait for such operations to complete and then perform the registration
+     * - return a ConflictingOperationInProgress error
+     * based on the value of the waitForCompletionOfConflictingOps parameter
      */
     StatusWith<ScopedReceiveChunk> registerReceiveChunk(OperationContext* opCtx,
                                                         const NamespaceString& nss,
                                                         const ChunkRange& chunkRange,
                                                         const ShardId& fromShardId,
-                                                        bool waitForOngoingMigrations);
+                                                        bool waitForCompletionOfConflictingOps);
 
     /**
      * If there are no migrations running on this shard, registers an active split or merge
diff --git a/src/mongo/db/s/active_migrations_registry_test.cpp b/src/mongo/db/s/active_migrations_registry_test.cpp
index bc285b35f81..acdd2bfaf38 100644
--- a/src/mongo/db/s/active_migrations_registry_test.cpp
+++ b/src/mongo/db/s/active_migrations_registry_test.cpp
@@ -33,7 +33,7 @@
 #include "mongo/bson/bsonobjbuilder.h"
 #include "mongo/db/client.h"
 #include "mongo/db/s/active_migrations_registry.h"
-#include "mongo/db/service_context_d_test_fixture.h"
+#include "mongo/db/s/shard_server_test_fixture.h"
 #include "mongo/s/commands/cluster_commands_gen.h"
 #include "mongo/stdx/future.h"
 #include "mongo/unittest/unittest.h"
@@ -43,19 +43,17 @@ namespace {
 
 using unittest::assertGet;
 
-class MoveChunkRegistration : public ServiceContextMongoDTest {
+class MoveChunkRegistration : public ShardServerTestFixture {
 public:
     void setUp() override {
-        _opCtx = getClient()->makeOperationContext();
-    }
-
-    OperationContext* operationContext() {
-        return _opCtx.get();
+        ShardServerTestFixture::setUp();
+        _opCtx = operationContext();
+        _opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE();
     }
 
 protected:
     ActiveMigrationsRegistry _registry;
-    ServiceContext::UniqueOperationContext _opCtx;
+    OperationContext* _opCtx;
 };
 
 ShardsvrMoveRange createMoveRangeRequest(const NamespaceString& nss,
@@ -73,7 +71,7 @@ ShardsvrMoveRange createMoveRangeRequest(const NamespaceString& nss,
 
 TEST_F(MoveChunkRegistration, ScopedDonateChunkMoveConstructorAndAssignment) {
     auto originalScopedDonateChunk = assertGet(_registry.registerDonateChunk(
-        operationContext(), createMoveRangeRequest(NamespaceString("TestDB", "TestColl"))));
+        _opCtx, createMoveRangeRequest(NamespaceString("TestDB", "TestColl"))));
     ASSERT(originalScopedDonateChunk.mustExecute());
 
     ScopedDonateChunk movedScopedDonateChunk(std::move(originalScopedDonateChunk));
@@ -144,7 +142,11 @@ TEST_F(MoveChunkRegistration, TestBlockingDonateChunk) {
     // Registry thread.
     auto result = stdx::async(stdx::launch::async, [&] {
         // 2. Lock the registry so that starting to donate will block.
-        _registry.lock(operationContext(), "dummy");
+        ThreadClient tc("ActiveMigrationsRegistryTest", getGlobalServiceContext());
+        auto opCtxHolder = tc->makeOperationContext();
+        opCtxHolder->setAlwaysInterruptAtStepDownOrUp_UNSAFE();
+
+        _registry.lock(opCtxHolder.get(), "dummy");
 
         // 3. Signal the donate thread that the donate is ready to be started.
         readyToLock.set_value();
@@ -202,7 +204,11 @@ TEST_F(MoveChunkRegistration, TestBlockingReceiveChunk) {
     // Registry thread.
    auto result = stdx::async(stdx::launch::async, [&] {
         // 2. Lock the registry so that starting to receive will block.
-        _registry.lock(operationContext(), "dummy");
+        ThreadClient tc("ActiveMigrationsRegistryTest", getGlobalServiceContext());
+        auto opCtxHolder = tc->makeOperationContext();
+        opCtxHolder->setAlwaysInterruptAtStepDownOrUp_UNSAFE();
+
+        _registry.lock(opCtxHolder.get(), "dummy");
 
         // 3. Signal the receive thread that the receive is ready to be started.
         readyToLock.set_value();
@@ -284,9 +290,10 @@ TEST_F(MoveChunkRegistration, TestBlockingWhileDonateInProgress) {
     // Registry locking thread.
     auto lockReleased = stdx::async(stdx::launch::async, [&] {
         ThreadClient tc("ActiveMigrationsRegistryTest", getGlobalServiceContext());
-        auto opCtx = tc->makeOperationContext();
+        auto opCtxHolder = tc->makeOperationContext();
+        opCtxHolder->setAlwaysInterruptAtStepDownOrUp_UNSAFE();
 
-        auto baton = opCtx->getBaton();
+        auto baton = opCtxHolder->getBaton();
         baton->schedule([&inLock](Status) {
             // 7. This is called when the registry lock is blocking. We let the test method know
             // that we're blocked on the registry lock so that it tell the migration thread to let
@@ -299,7 +306,7 @@ TEST_F(MoveChunkRegistration, TestBlockingWhileDonateInProgress) {
 
         // 6. Now that we're woken up by the migration thread, let's attempt to lock the registry.
         // This will block and call the lambda set on the baton above.
-        _registry.lock(opCtx.get(), "dummy");
+        _registry.lock(opCtxHolder.get(), "dummy");
 
         // 10. Unlock the registry and return.
         _registry.unlock("dummy");
@@ -347,9 +354,10 @@ TEST_F(MoveChunkRegistration, TestBlockingWhileReceiveInProgress) {
     // Registry locking thread.
     auto lockReleased = stdx::async(stdx::launch::async, [&] {
         ThreadClient tc("ActiveMigrationsRegistryTest", getGlobalServiceContext());
-        auto opCtx = tc->makeOperationContext();
+        auto opCtxHolder = tc->makeOperationContext();
+        opCtxHolder->setAlwaysInterruptAtStepDownOrUp_UNSAFE();
 
-        auto baton = opCtx->getBaton();
+        auto baton = opCtxHolder->getBaton();
         baton->schedule([&inLock](Status) {
             // 7. This is called when the registry lock is blocking. We let the test method know
             // that we're blocked on the registry lock so that it tell the migration thread to let
@@ -362,7 +370,7 @@ TEST_F(MoveChunkRegistration, TestBlockingWhileReceiveInProgress) {
 
         // 6. Now that we're woken up by the migration thread, let's attempt to lock the registry.
        // This will block and call the lambda set on the baton above.
-        _registry.lock(opCtx.get(), "dummy");
+        _registry.lock(opCtxHolder.get(), "dummy");
 
         // 10. Unlock the registry and return.
         _registry.unlock("dummy");
diff --git a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
index a3b59a4c6cc..1959befa719 100644
--- a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
+++ b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
@@ -120,7 +120,11 @@ public:
         // Ensure this shard is not currently receiving or donating any chunks.
         auto scopedReceiveChunk(
             uassertStatusOK(ActiveMigrationsRegistry::get(opCtx).registerReceiveChunk(
-                opCtx, nss, chunkRange, cloneRequest.getFromShardId(), false)));
+                opCtx,
+                nss,
+                chunkRange,
+                cloneRequest.getFromShardId(),
+                false /* waitForCompletionOfConflictingOps*/)));
 
         // We force a refresh immediately after registering this migration to guarantee that this
         // shard will not receive a chunk after refreshing.
diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp
index e71dfe04d89..f0d85e91d0a 100644
--- a/src/mongo/db/s/migration_util.cpp
+++ b/src/mongo/db/s/migration_util.cpp
@@ -1243,7 +1243,7 @@ void resumeMigrationRecipientsOnStepUp(OperationContext* opCtx) {
                     nss,
                     doc.getRange(),
                     doc.getDonorShardIdForLoggingPurposesOnly(),
-                    true /* waitForOngoingMigrations */)));
+                    true /* waitForCompletionOfConflictingOps */)));
 
             const auto mdm = MigrationDestinationManager::get(opCtx);
             uassertStatusOK(
-- 
cgit v1.2.1
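
The behaviour this patch establishes can be summarized as: lock() waits for in-flight chunk operations to drain, and while the registry stays locked any new registerDonateChunk()/registerReceiveChunk() call that is not allowed to wait fails with ConflictingOperationInProgress, whereas callers that pass waitForCompletionOfConflictingOps block until unlock(). The sketch below is a simplified, standalone C++ model of that behaviour, not the real ActiveMigrationsRegistry API: the MiniRegistry name, the plain std::mutex/std::condition_variable primitives and the string-based error reporting are assumptions made for the example, and the draining-mode (NotWritablePrimary) check as well as the donate/split/merge state tracking are omitted.

// Simplified, standalone model of the locking behaviour added by this patch.
// Names (MiniRegistry, registerReceiveChunk's optional<string> error) are illustrative only.
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <optional>
#include <string>

class MiniRegistry {
public:
    // Rough analogue of ActiveMigrationsRegistry::lock(): allow only one lock
    // request at a time, then wait until in-flight operations have drained.
    void lock() {
        std::unique_lock<std::mutex> ul(_mutex);
        _cv.wait(ul, [&] { return !_blocked; });        // hold back additional lock requests
        _blocked = true;
        _cv.wait(ul, [&] { return !_activeReceive; });  // drain in-flight operations
    }

    void unlock() {
        {
            std::lock_guard<std::mutex> lg(_mutex);
            _blocked = false;
        }
        _cv.notify_all();
    }

    // Rough analogue of registerReceiveChunk(). Returns an error string on conflict,
    // std::nullopt on success (the real code returns StatusWith<ScopedReceiveChunk>).
    std::optional<std::string> registerReceiveChunk(bool waitForCompletionOfConflictingOps) {
        std::unique_lock<std::mutex> ul(_mutex);
        if (waitForCompletionOfConflictingOps) {
            // Recovery path: wait for the lock holder and any active operation to finish.
            _cv.wait(ul, [&] { return !_blocked && !_activeReceive; });
        } else {
            if (_activeReceive)
                return "ConflictingOperationInProgress: another migration is active";
            if (_blocked)
                return "ConflictingOperationInProgress: registry is temporarily locked";
        }
        _activeReceive = true;
        return std::nullopt;
    }

    void unregisterReceiveChunk() {
        {
            std::lock_guard<std::mutex> lg(_mutex);
            _activeReceive = false;
        }
        _cv.notify_all();
    }

private:
    std::mutex _mutex;
    std::condition_variable _cv;
    bool _blocked = false;        // set while the registry is lock()-ed
    bool _activeReceive = false;  // an incoming migration is currently registered
};

int main() {
    MiniRegistry registry;

    registry.lock();  // e.g. taken before draining the shard
    if (auto err = registry.registerReceiveChunk(false /* waitForCompletionOfConflictingOps */))
        std::cout << "rejected: " << *err << '\n';  // new migrations are refused while locked

    registry.unlock();
    if (!registry.registerReceiveChunk(false))
        std::cout << "registered after unlock\n";
    registry.unregisterReceiveChunk();
    return 0;
}

Compiled as C++17, the model prints the rejection while the registry is locked and the successful registration after unlock(), mirroring the new ConflictingOperationInProgress paths added in active_migrations_registry.cpp.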