Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/commands/set_feature_compatibility_version_command.cpp | 10
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 2
-rw-r--r--  src/mongo/db/s/active_migrations_registry.cpp | 38
-rw-r--r--  src/mongo/db/s/active_migrations_registry.h | 23
-rw-r--r--  src/mongo/db/s/active_migrations_registry_test.cpp | 178
-rw-r--r--  src/mongo/db/s/migration_destination_manager_legacy_commands.cpp | 2
-rw-r--r--  src/mongo/db/s/migration_util.cpp | 13
-rw-r--r--  src/mongo/db/s/migration_util.h | 2
-rw-r--r--  src/mongo/db/s/move_chunk_command.cpp | 2
9 files changed, 190 insertions, 80 deletions
diff --git a/src/mongo/db/commands/set_feature_compatibility_version_command.cpp b/src/mongo/db/commands/set_feature_compatibility_version_command.cpp
index b2004f4d3e0..efe1c054aa5 100644
--- a/src/mongo/db/commands/set_feature_compatibility_version_command.cpp
+++ b/src/mongo/db/commands/set_feature_compatibility_version_command.cpp
@@ -167,11 +167,19 @@ public:
             CommandHelpers::appendCommandWCStatus(result, waitForWCStatus, res);
         });

+        {
+            // Acquire the global IX lock and then immediately release it to ensure this operation
+            // will be killed by the RstlKillOpThread during step-up or stepdown. Note that the
+            // RstlKillOpThread kills any operations on step-up or stepdown for which
+            // Locker::wasGlobalLockTakenInModeConflictingWithWrites() returns true.
+            Lock::GlobalLock lk(opCtx, MODE_IX);
+        }
+
         // Only allow one instance of setFeatureCompatibilityVersion to run at a time.
         invariant(!opCtx->lockState()->isLocked());
         Lock::ExclusiveLock lk(opCtx->lockState(), FeatureCompatibilityVersion::fcvLock);

-        MigrationBlockingGuard lock(opCtx, ActiveMigrationsRegistry::get(opCtx));
+        MigrationBlockingGuard migrationBlockingGuard(opCtx, "setFeatureCompatibilityVersion");

         const auto requestedVersion = uassertStatusOK(
             FeatureCompatibilityVersionCommandParser::extractVersionFromCommand(getName(), cmdObj));
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 1deb8b11d27..dbab6e4ca84 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -875,7 +875,7 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook
         // ShardingStateRecovery::recover above, because they may trigger filtering metadata
         // refreshes which should use the recovered configOpTime.
         migrationutil::resubmitRangeDeletionsOnStepUp(_service);
-        migrationutil::resumeMigrationCoordinationsOnStepUp(_service);
+        migrationutil::resumeMigrationCoordinationsOnStepUp(opCtx);

     } else {  // unsharded
         if (auto validator = LogicalTimeValidator::get(_service)) {
diff --git a/src/mongo/db/s/active_migrations_registry.cpp b/src/mongo/db/s/active_migrations_registry.cpp
index b5ee1c7850a..d58bcfe9805 100644
--- a/src/mongo/db/s/active_migrations_registry.cpp
+++ b/src/mongo/db/s/active_migrations_registry.cpp
@@ -27,6 +27,8 @@
  * it in the license file.
  */

+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kShardingMigration
+
 #include "mongo/platform/basic.h"

 #include "mongo/db/s/active_migrations_registry.h"
@@ -36,6 +38,7 @@
 #include "mongo/db/s/migration_session_id.h"
 #include "mongo/db/s/migration_source_manager.h"
 #include "mongo/db/service_context.h"
+#include "mongo/logv2/log.h"

 namespace mongo {
 namespace {
@@ -58,7 +61,7 @@ ActiveMigrationsRegistry& ActiveMigrationsRegistry::get(OperationContext* opCtx)
     return get(opCtx->getServiceContext());
 }

-void ActiveMigrationsRegistry::lock(OperationContext* opCtx) {
+void ActiveMigrationsRegistry::lock(OperationContext* opCtx, StringData reason) {
     stdx::unique_lock<Latch> lock(_mutex);

     // This wait is to hold back additional lock requests while there is already one in
@@ -66,6 +69,7 @@ void ActiveMigrationsRegistry::lock(OperationContext* opCtx) {
     opCtx->waitForConditionOrInterrupt(_lockCond, lock, [this] { return !_migrationsBlocked; });

     // Setting flag before condvar returns to block new migrations from starting. (Favoring writers)
+    LOGV2(4675601, "Going to start blocking migrations", "reason"_attr = reason);
     _migrationsBlocked = true;

     // Wait for any ongoing migrations to complete.
@@ -73,22 +77,23 @@ void ActiveMigrationsRegistry::lock(OperationContext* opCtx) {
         _lockCond, lock, [this] { return !(_activeMoveChunkState || _activeReceiveChunkState); });
 }

-void ActiveMigrationsRegistry::unlock() {
+void ActiveMigrationsRegistry::unlock(StringData reason) {
     stdx::lock_guard<Latch> lock(_mutex);

+    LOGV2(4675602, "Going to stop blocking migrations", "reason"_attr = reason);
     _migrationsBlocked = false;

     _lockCond.notify_all();
 }

 StatusWith<ScopedDonateChunk> ActiveMigrationsRegistry::registerDonateChunk(
-    const MoveChunkRequest& args) {
-    stdx::lock_guard<Latch> lk(_mutex);
+    OperationContext* opCtx, const MoveChunkRequest& args) {
+    stdx::unique_lock<Latch> lk(_mutex);

-    if (_migrationsBlocked)
-        return {ErrorCodes::ConflictingOperationInProgress,
-                str::stream() << "Unable to start new migration because this shard is currently "
-                                 "blocking all migrations."};
+    if (_migrationsBlocked) {
+        LOGV2(4675603, "Register donate chunk waiting for migrations to be unblocked");
+        opCtx->waitForConditionOrInterrupt(_lockCond, lk, [this] { return !_migrationsBlocked; });
+    }

     if (_activeReceiveChunkState) {
         return _activeReceiveChunkState->constructErrorStatus();
@@ -108,13 +113,16 @@ StatusWith<ScopedDonateChunk> ActiveMigrationsRegistry::registerDonateChunk(
 }

 StatusWith<ScopedReceiveChunk> ActiveMigrationsRegistry::registerReceiveChunk(
-    const NamespaceString& nss, const ChunkRange& chunkRange, const ShardId& fromShardId) {
-    stdx::lock_guard<Latch> lk(_mutex);
-
-    if (_migrationsBlocked)
-        return {ErrorCodes::ConflictingOperationInProgress,
-                str::stream() << "Unable to start new migration because this shard is currently "
-                                 "blocking all migrations."};
+    OperationContext* opCtx,
+    const NamespaceString& nss,
+    const ChunkRange& chunkRange,
+    const ShardId& fromShardId) {
+    stdx::unique_lock<Latch> lk(_mutex);
+
+    if (_migrationsBlocked) {
+        LOGV2(4675604, "Register receive chunk waiting for migrations to be unblocked");
+        opCtx->waitForConditionOrInterrupt(_lockCond, lk, [this] { return !_migrationsBlocked; });
+    }

     if (_activeReceiveChunkState) {
         return _activeReceiveChunkState->constructErrorStatus();
diff --git a/src/mongo/db/s/active_migrations_registry.h b/src/mongo/db/s/active_migrations_registry.h
index c0a8029c08b..29b326cf4ef 100644
--- a/src/mongo/db/s/active_migrations_registry.h
+++ b/src/mongo/db/s/active_migrations_registry.h
@@ -66,8 +66,8 @@ public:
     * subsequent migration operations will return ConflictingOperationInProgress until the unlock()
     * method is called.
     */
-    void lock(OperationContext* opCtx);
-    void unlock();
+    void lock(OperationContext* opCtx, StringData reason);
+    void unlock(StringData reason);

    /**
     * If there are no migrations running on this shard, registers an active migration with the
@@ -80,7 +80,8 @@ public:
     *
     * Otherwise returns a ConflictingOperationInProgress error.
     */
-    StatusWith<ScopedDonateChunk> registerDonateChunk(const MoveChunkRequest& args);
+    StatusWith<ScopedDonateChunk> registerDonateChunk(OperationContext* opCtx,
+                                                      const MoveChunkRequest& args);

    /**
     * If there are no migrations running on this shard, registers an active receive operation with
@@ -89,7 +90,8 @@ public:
     *
     * Otherwise returns a ConflictingOperationInProgress error.
     */
-    StatusWith<ScopedReceiveChunk> registerReceiveChunk(const NamespaceString& nss,
+    StatusWith<ScopedReceiveChunk> registerReceiveChunk(OperationContext* opCtx,
+                                                        const NamespaceString& nss,
                                                         const ChunkRange& chunkRange,
                                                         const ShardId& fromShardId);

@@ -175,18 +177,21 @@ private:

 class MigrationBlockingGuard {
 public:
-    MigrationBlockingGuard(OperationContext* opCtx, ActiveMigrationsRegistry& registry)
-        : _opCtx(opCtx), _registry(registry) {
-        _registry.lock(_opCtx);
+    MigrationBlockingGuard(OperationContext* opCtx, std::string reason)
+        : _registry(ActiveMigrationsRegistry::get(opCtx)), _reason(std::move(reason)) {
+        // Ensure any thread attempting to use a MigrationBlockingGuard will be interrupted by
+        // a stepdown.
+        invariant(opCtx->lockState()->wasGlobalLockTakenInModeConflictingWithWrites());
+        _registry.lock(opCtx, _reason);
     }

     ~MigrationBlockingGuard() {
-        _registry.unlock();
+        _registry.unlock(_reason);
     }

 private:
-    OperationContext* _opCtx{nullptr};
     ActiveMigrationsRegistry& _registry;
+    std::string _reason;
 };

 /**
diff --git a/src/mongo/db/s/active_migrations_registry_test.cpp b/src/mongo/db/s/active_migrations_registry_test.cpp
index cd777113159..0f268e23fb0 100644
--- a/src/mongo/db/s/active_migrations_registry_test.cpp
+++ b/src/mongo/db/s/active_migrations_registry_test.cpp
@@ -79,7 +79,7 @@ MoveChunkRequest createMoveChunkRequest(const NamespaceString& nss) {

 TEST_F(MoveChunkRegistration, ScopedDonateChunkMoveConstructorAndAssignment) {
     auto originalScopedDonateChunk = assertGet(_registry.registerDonateChunk(
-        createMoveChunkRequest(NamespaceString("TestDB", "TestColl"))));
+        operationContext(), createMoveChunkRequest(NamespaceString("TestDB", "TestColl"))));
     ASSERT(originalScopedDonateChunk.mustExecute());

     ScopedDonateChunk movedScopedDonateChunk(std::move(originalScopedDonateChunk));
@@ -98,7 +98,7 @@ TEST_F(MoveChunkRegistration, GetActiveMigrationNamespace) {
     const NamespaceString nss("TestDB", "TestColl");

     auto originalScopedDonateChunk =
-        assertGet(_registry.registerDonateChunk(createMoveChunkRequest(nss)));
+        assertGet(_registry.registerDonateChunk(operationContext(), createMoveChunkRequest(nss)));

     ASSERT_EQ(nss.ns(), _registry.getActiveDonateChunkNss()->ns());

@@ -108,10 +108,10 @@ TEST_F(MoveChunkRegistration, GetActiveMigrationNamespace) {

 TEST_F(MoveChunkRegistration, SecondMigrationReturnsConflictingOperationInProgress) {
     auto originalScopedDonateChunk = assertGet(_registry.registerDonateChunk(
-        createMoveChunkRequest(NamespaceString("TestDB", "TestColl1"))));
+        operationContext(), createMoveChunkRequest(NamespaceString("TestDB", "TestColl1"))));

     auto secondScopedDonateChunkStatus = _registry.registerDonateChunk(
-        createMoveChunkRequest(NamespaceString("TestDB", "TestColl2")));
+        operationContext(), createMoveChunkRequest(NamespaceString("TestDB", "TestColl2")));
     ASSERT_EQ(ErrorCodes::ConflictingOperationInProgress,
               secondScopedDonateChunkStatus.getStatus());

@@ -120,11 +120,11 @@ TEST_F(MoveChunkRegistration, SecondMigrationReturnsConflictingOperationInProgre

 TEST_F(MoveChunkRegistration, SecondMigrationWithSameArgumentsJoinsFirst) {
     auto originalScopedDonateChunk = assertGet(_registry.registerDonateChunk(
-        createMoveChunkRequest(NamespaceString("TestDB", "TestColl"))));
+        operationContext(), createMoveChunkRequest(NamespaceString("TestDB", "TestColl"))));
     ASSERT(originalScopedDonateChunk.mustExecute());

     auto secondScopedDonateChunk = assertGet(_registry.registerDonateChunk(
-        createMoveChunkRequest(NamespaceString("TestDB", "TestColl"))));
+        operationContext(), createMoveChunkRequest(NamespaceString("TestDB", "TestColl"))));
     ASSERT(!secondScopedDonateChunk.mustExecute());

     originalScopedDonateChunk.signalComplete({ErrorCodes::InternalError, "Test error"});
@@ -134,44 +134,121 @@ TEST_F(MoveChunkRegistration, SecondMigrationWithSameArgumentsJoinsFirst) {
 }

 TEST_F(MoveChunkRegistration, TestBlockingDonateChunk) {
-    auto opCtx = operationContext();
+    stdx::promise<void> blockDonate;
+    stdx::promise<void> readyToLock;
+    stdx::promise<void> inLock;

-    _registry.lock(opCtx);
+    // Registry thread.
+    auto result = stdx::async(stdx::launch::async, [&] {
+        // 2. Lock the registry so that starting to donate will block.
+        _registry.lock(operationContext(), "dummy");
+
+        // 3. Signal the donate thread that the donate is ready to be started.
+        readyToLock.set_value();

-    auto scopedDonateChunk = _registry.registerDonateChunk(
-        createMoveChunkRequest(NamespaceString("TestDB", "TestColl")));
+        // 4. Wait for the donate thread to start blocking because the registry is locked.
+        blockDonate.get_future().wait();

-    ASSERT_EQ(ErrorCodes::ConflictingOperationInProgress, scopedDonateChunk.getStatus());
+        // 9. Unlock the registry to signal the donate thread.
+        _registry.unlock("dummy");
+    });

-    _registry.unlock();
+    // Donate thread.
+    auto lockReleased = stdx::async(stdx::launch::async, [&] {
+        ThreadClient tc("donate thread", getGlobalServiceContext());
+        auto opCtx = tc->makeOperationContext();

-    auto scopedDonateChunk2 = _registry.registerDonateChunk(
-        createMoveChunkRequest(NamespaceString("TestDB", "TestColl")));
-    ASSERT_OK(scopedDonateChunk2.getStatus());
+        auto baton = opCtx->getBaton();
+        baton->schedule([&inLock](Status) {
+            // 7. This is called when the donate is blocking. We let the test method know
+            // that we're blocked on the donate so that it can tell the registry thread to unlock
+            // the registry.
+            inLock.set_value();
+        });

-    scopedDonateChunk2.getValue().signalComplete(Status::OK());
+        // 5. This is woken up by the registry thread.
+        readyToLock.get_future().wait();
+
+        // 6. Now that we're woken up by the registry thread, let's attempt to start to donate.
+        // This will block and call the lambda set on the baton above.
+        auto scopedDonateChunk = _registry.registerDonateChunk(
+            opCtx.get(), createMoveChunkRequest(NamespaceString("TestDB", "TestColl")));
+
+        ASSERT_OK(scopedDonateChunk.getStatus());
+        scopedDonateChunk.getValue().signalComplete(Status::OK());
+
+        // 10. Destroy the ScopedDonateChunk and return.
+    });
+
+    // 1. Wait for the donate thread to start blocking.
+    inLock.get_future().wait();
+
+    // 8. Tell the registry thread to unlock the registry. That will signal the donate thread to
+    // continue.
+    blockDonate.set_value();
+
+    // 11. The donate thread has returned and this future is set.
+    lockReleased.wait();
 }

 TEST_F(MoveChunkRegistration, TestBlockingReceiveChunk) {
-    auto opCtx = operationContext();
+    stdx::promise<void> blockReceive;
+    stdx::promise<void> readyToLock;
+    stdx::promise<void> inLock;
+
+    // Registry thread.
+    auto result = stdx::async(stdx::launch::async, [&] {
+        // 2. Lock the registry so that starting to receive will block.
+        _registry.lock(operationContext(), "dummy");
+
+        // 3. Signal the receive thread that the receive is ready to be started.
+        readyToLock.set_value();
+
+        // 4. Wait for the receive thread to start blocking because the registry is locked.
+        blockReceive.get_future().wait();
+
+        // 9. Unlock the registry to signal the receive thread.
+        _registry.unlock("dummy");
+    });
+
+    // Receive thread.
+    auto lockReleased = stdx::async(stdx::launch::async, [&] {
+        ThreadClient tc("receive thread", getGlobalServiceContext());
+        auto opCtx = tc->makeOperationContext();
+
+        auto baton = opCtx->getBaton();
+        baton->schedule([&inLock](Status) {
+            // 7. This is called when the receive is blocking. We let the test method know
+            // that we're blocked on the receive so that it can tell the registry thread to unlock
+            // the registry.
+            inLock.set_value();
+        });

-    _registry.lock(opCtx);
+        // 5. This is woken up by the registry thread.
+        readyToLock.get_future().wait();

-    auto scopedReceiveChunk =
-        _registry.registerReceiveChunk(NamespaceString("TestDB", "TestColl"),
-                                       ChunkRange(BSON("Key" << -100), BSON("Key" << 100)),
-                                       ShardId("shard0001"));
+        // 6. Now that we're woken up by the registry thread, let's attempt to start to receive.
+        // This will block and call the lambda set on the baton above.
+        auto scopedReceiveChunk =
+            _registry.registerReceiveChunk(opCtx.get(),
+                                           NamespaceString("TestDB", "TestColl"),
+                                           ChunkRange(BSON("Key" << -100), BSON("Key" << 100)),
+                                           ShardId("shard0001"));

-    ASSERT_EQ(ErrorCodes::ConflictingOperationInProgress, scopedReceiveChunk.getStatus());
+        ASSERT_OK(scopedReceiveChunk.getStatus());

-    _registry.unlock();
+        // 10. Destroy the ScopedReceiveChunk and return.
+    });

-    auto scopedReceiveChunk2 =
-        _registry.registerReceiveChunk(NamespaceString("TestDB", "TestColl"),
-                                       ChunkRange(BSON("Key" << -100), BSON("Key" << 100)),
-                                       ShardId("shard0001"));
+    // 1. Wait for the receive thread to start blocking.
+    inLock.get_future().wait();

-    ASSERT_OK(scopedReceiveChunk2.getStatus());
+    // 8. Tell the registry thread to unlock the registry. That will signal the receive thread to
+    // continue.
+    blockReceive.set_value();
+
+    // 11. The receive thread has returned and this future is set.
+    lockReleased.wait();
 }

 // This test validates that the ActiveMigrationsRegistry lock will block while there is a donation
@@ -186,17 +263,18 @@ TEST_F(MoveChunkRegistration, TestBlockingWhileDonateInProgress) {
     auto result = stdx::async(stdx::launch::async, [&] {
         // 2. Start a migration so that the registry lock will block when acquired.
         auto scopedDonateChunk = _registry.registerDonateChunk(
-            createMoveChunkRequest(NamespaceString("TestDB", "TestColl")));
+            operationContext(), createMoveChunkRequest(NamespaceString("TestDB", "TestColl")));
         ASSERT_OK(scopedDonateChunk.getStatus());

         // 3. Signal the registry locking thread that the registry is ready to be locked.
         readyToLock.set_value();

+        // 4. Wait for the registry thread to start blocking because there is an active donate.
         blockDonate.get_future().wait();

         scopedDonateChunk.getValue().signalComplete(Status::OK());

-        // 8. Destroy the ScopedDonateChunk to signal the registy lock.
+        // 9. Destroy the ScopedDonateChunk to signal the registy lock.
     });

     // Registry locking thread.
@@ -206,31 +284,31 @@ TEST_F(MoveChunkRegistration, TestBlockingWhileDonateInProgress) {

         auto baton = opCtx->getBaton();
         baton->schedule([&inLock](Status) {
-            // 6. This is called when the registry lock is blocking. We let the test method know
+            // 7. This is called when the registry lock is blocking. We let the test method know
             // that we're blocked on the registry lock so that it tell the migration thread to let
             // the donate operation complete.
             inLock.set_value();
         });

-        // 4. This is woken up by the migration thread.
+        // 5. This is woken up by the migration thread.
         readyToLock.get_future().wait();

-        // 5. Now that we're woken up by the migration thread, let's attempt to lock the registry.
+        // 6. Now that we're woken up by the migration thread, let's attempt to lock the registry.
         // This will block and call the lambda set on the baton above.
-        _registry.lock(opCtx.get());
+        _registry.lock(opCtx.get(), "dummy");

-        // 9. Unlock the registry and return.
-        _registry.unlock();
+        // 10. Unlock the registry and return.
+        _registry.unlock("dummy");
     });

     // 1. Wait for registry lock to be acquired.
     inLock.get_future().wait();

-    // 7. Let the donate operation complete so that the ScopedDonateChunk is destroyed. That will
+    // 8. Let the donate operation complete so that the ScopedDonateChunk is destroyed. That will
     // signal the registry lock.
     blockDonate.set_value();

-    // 10. The registy locking thread has returned and this future is set.
+    // 11. The registy locking thread has returned and this future is set.
     lockReleased.wait();
 }

@@ -246,7 +324,8 @@ TEST_F(MoveChunkRegistration, TestBlockingWhileReceiveInProgress) {
     auto result = stdx::async(stdx::launch::async, [&] {
         // 2. Start a migration so that the registry lock will block when acquired.
         auto scopedReceiveChunk =
-            _registry.registerReceiveChunk(NamespaceString("TestDB", "TestColl"),
+            _registry.registerReceiveChunk(operationContext(),
+                                           NamespaceString("TestDB", "TestColl"),
                                            ChunkRange(BSON("Key" << -100), BSON("Key" << 100)),
                                            ShardId("shard0001"));
         ASSERT_OK(scopedReceiveChunk.getStatus());
@@ -254,9 +333,10 @@ TEST_F(MoveChunkRegistration, TestBlockingWhileReceiveInProgress) {
         // 3. Signal the registry locking thread that the registry is ready to be locked.
         readyToLock.set_value();

+        // 4. Wait for the registry thread to start blocking because there is an active receive.
         blockReceive.get_future().wait();

-        // 8. Destroy the scopedReceiveChunk to signal the registy lock.
+        // 9. Destroy the scopedReceiveChunk to signal the registy lock.
     });

     // Registry locking thread.
@@ -266,31 +346,31 @@ TEST_F(MoveChunkRegistration, TestBlockingWhileReceiveInProgress) {

         auto baton = opCtx->getBaton();
         baton->schedule([&inLock](Status) {
-            // 6. This is called when the registry lock is blocking. We let the test method know
+            // 7. This is called when the registry lock is blocking. We let the test method know
             // that we're blocked on the registry lock so that it tell the migration thread to let
             // the receive operation complete.
             inLock.set_value();
         });

-        // 4. This is woken up by the migration thread.
+        // 5. This is woken up by the migration thread.
         readyToLock.get_future().wait();

-        // 5. Now that we're woken up by the migration thread, let's attempt to lock the registry.
+        // 6. Now that we're woken up by the migration thread, let's attempt to lock the registry.
         // This will block and call the lambda set on the baton above.
-        _registry.lock(opCtx.get());
+        _registry.lock(opCtx.get(), "dummy");

-        // 9. Unlock the registry and return.
-        _registry.unlock();
+        // 10. Unlock the registry and return.
+        _registry.unlock("dummy");
     });

     // 1. Wait for registry lock to be acquired.
     inLock.get_future().wait();

-    // 7. Let the receive operation complete so that the scopedReceiveChunk is destroyed. That will
+    // 8. Let the receive operation complete so that the scopedReceiveChunk is destroyed. That will
     // signal the registry lock.
     blockReceive.set_value();

-    // 10. The registy locking thread has returned and this future is set.
+    // 11. The registy locking thread has returned and this future is set.
     lockReleased.wait();
 }

diff --git a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
index 84064273757..4144291e7cd 100644
--- a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
+++ b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
@@ -105,7 +105,7 @@ public:
         // Ensure this shard is not currently receiving or donating any chunks.
         auto scopedReceiveChunk(
             uassertStatusOK(ActiveMigrationsRegistry::get(opCtx).registerReceiveChunk(
-                nss, chunkRange, cloneRequest.getFromShardId())));
+                opCtx, nss, chunkRange, cloneRequest.getFromShardId())));

         // We force a refresh immediately after registering this migration to guarantee that this
         // shard will not receive a chunk after refreshing.
diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp
index 6b4537ede2b..36b9c7b1177 100644
--- a/src/mongo/db/s/migration_util.cpp
+++ b/src/mongo/db/s/migration_util.cpp
@@ -735,9 +735,18 @@ void refreshFilteringMetadataUntilSuccess(OperationContext* opCtx, const Namespa
     });
 }

-void resumeMigrationCoordinationsOnStepUp(ServiceContext* serviceContext) {
+void resumeMigrationCoordinationsOnStepUp(OperationContext* opCtx) {
     LOGV2(22037, "Starting migration coordinator stepup recovery thread.");

+    // Don't allow migrations to start until the recovery is complete. Otherwise, the
+    // migration may end up inserting a migrationCoordinator doc that the recovery thread
+    // reads and attempts to recovery the decision for by bumping the chunkVersion, which
+    // will cause the migration to abort on trying to commit anyway.
+    // Store it as shared_ptr so that it can be captured in the async recovery task below.
+    const auto migrationBlockingGuard =
+        std::make_shared<MigrationBlockingGuard>(opCtx, "migration coordinator stepup recovery");
+
+    const auto serviceContext = opCtx->getServiceContext();
     ExecutorFuture<void>(getMigrationUtilExecutor())
         .then([serviceContext] {
             ThreadClient tc("MigrationCoordinatorStepupRecovery", serviceContext);
@@ -864,7 +873,7 @@ void resumeMigrationCoordinationsOnStepUp(ServiceContext* serviceContext) {
                 ShardingStatistics::get(opCtx).unfinishedMigrationFromPreviousPrimary.store(
                     migrationRecoveryCount);
             })
-        .getAsync([](const Status& status) {
+        .getAsync([migrationBlockingGuard](const Status& status) {
             if (!status.isOK()) {
                 LOGV2(22041,
                       "Failed to resume coordinating migrations on stepup {causedBy_status}",
diff --git a/src/mongo/db/s/migration_util.h b/src/mongo/db/s/migration_util.h
index caaf750d3fb..aa971a17e00 100644
--- a/src/mongo/db/s/migration_util.h
+++ b/src/mongo/db/s/migration_util.h
@@ -203,7 +203,7 @@ void refreshFilteringMetadataUntilSuccess(OperationContext* opCtx, const Namespa
 * Submits an asynchronous task to scan config.migrationCoordinators and drive each unfinished
 * migration coordination to completion.
 */
-void resumeMigrationCoordinationsOnStepUp(ServiceContext* serviceContext);
+void resumeMigrationCoordinationsOnStepUp(OperationContext* opCtx);

 }  // namespace migrationutil
 }  // namespace mongo
diff --git a/src/mongo/db/s/move_chunk_command.cpp b/src/mongo/db/s/move_chunk_command.cpp
index 63f1dd13507..60a7db4161c 100644
--- a/src/mongo/db/s/move_chunk_command.cpp
+++ b/src/mongo/db/s/move_chunk_command.cpp
@@ -135,7 +135,7 @@ public:
         Grid::get(opCtx)->shardRegistry()->reload(opCtx);

         auto scopedMigration = uassertStatusOK(
-            ActiveMigrationsRegistry::get(opCtx).registerDonateChunk(moveChunkRequest));
+            ActiveMigrationsRegistry::get(opCtx).registerDonateChunk(opCtx, moveChunkRequest));

         // Check if there is an existing migration running and if so, join it
         if (scopedMigration.mustExecute()) {
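
Editor's note, not part of the change above: the behavioral core of this diff is that ActiveMigrationsRegistry::lock() blocks new migrations and drains active ones, while registerDonateChunk()/registerReceiveChunk() now wait for the registry to be unlocked instead of failing with ConflictingOperationInProgress. The sketch below is a minimal, standalone illustration of that writer-favoring pattern using plain std:: primitives rather than MongoDB's OperationContext/Latch/stdx types; the BlockingRegistry class and its method names are invented for illustration only.

// blocking_registry_sketch.cpp -- illustrative only, not MongoDB code.
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

class BlockingRegistry {
public:
    // Block new registrations (favoring the blocker), then wait for active ones to drain.
    void lock() {
        std::unique_lock<std::mutex> lk(_mutex);
        _cond.wait(lk, [this] { return !_blocked; });  // only one blocker at a time
        _blocked = true;                               // set before waiting, so new work is held back
        _cond.wait(lk, [this] { return _active == 0; });
    }

    void unlock() {
        std::lock_guard<std::mutex> lk(_mutex);
        _blocked = false;
        _cond.notify_all();
    }

    // Before this change such a call would fail while blocked; now it waits until unblocked.
    void registerOperation() {
        std::unique_lock<std::mutex> lk(_mutex);
        _cond.wait(lk, [this] { return !_blocked; });
        ++_active;
    }

    void unregisterOperation() {
        std::lock_guard<std::mutex> lk(_mutex);
        --_active;
        _cond.notify_all();
    }

private:
    std::mutex _mutex;
    std::condition_variable _cond;
    bool _blocked = false;
    int _active = 0;
};

int main() {
    BlockingRegistry registry;

    registry.lock();  // e.g. setFCV or step-up recovery starts blocking migrations

    std::thread migration([&] {
        registry.registerOperation();  // waits here until unlock() below
        std::cout << "migration registered after the registry was unlocked\n";
        registry.unregisterOperation();
    });

    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    registry.unlock();  // recovery finished; the waiting migration proceeds

    migration.join();
    return 0;
}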