Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/commands/set_feature_compatibility_version_command.cpp | 10
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 2
-rw-r--r--  src/mongo/db/s/active_migrations_registry.cpp | 38
-rw-r--r--  src/mongo/db/s/active_migrations_registry.h | 23
-rw-r--r--  src/mongo/db/s/active_migrations_registry_test.cpp | 178
-rw-r--r--  src/mongo/db/s/migration_destination_manager_legacy_commands.cpp | 2
-rw-r--r--  src/mongo/db/s/migration_util.cpp | 13
-rw-r--r--  src/mongo/db/s/migration_util.h | 2
-rw-r--r--  src/mongo/db/s/move_chunk_command.cpp | 2
9 files changed, 190 insertions, 80 deletions
diff --git a/src/mongo/db/commands/set_feature_compatibility_version_command.cpp b/src/mongo/db/commands/set_feature_compatibility_version_command.cpp
index b2004f4d3e0..efe1c054aa5 100644
--- a/src/mongo/db/commands/set_feature_compatibility_version_command.cpp
+++ b/src/mongo/db/commands/set_feature_compatibility_version_command.cpp
@@ -167,11 +167,19 @@ public:
CommandHelpers::appendCommandWCStatus(result, waitForWCStatus, res);
});
+ {
+ // Acquire the global IX lock and then immediately release it to ensure this operation
+ // will be killed by the RstlKillOpThread during step-up or stepdown. Note that the
+ // RstlKillOpThread kills any operations on step-up or stepdown for which
+ // Locker::wasGlobalLockTakenInModeConflictingWithWrites() returns true.
+ Lock::GlobalLock lk(opCtx, MODE_IX);
+ }
+
// Only allow one instance of setFeatureCompatibilityVersion to run at a time.
invariant(!opCtx->lockState()->isLocked());
Lock::ExclusiveLock lk(opCtx->lockState(), FeatureCompatibilityVersion::fcvLock);
- MigrationBlockingGuard lock(opCtx, ActiveMigrationsRegistry::get(opCtx));
+ MigrationBlockingGuard migrationBlockingGuard(opCtx, "setFeatureCompatibilityVersion");
const auto requestedVersion = uassertStatusOK(
FeatureCompatibilityVersionCommandParser::extractVersionFromCommand(getName(), cmdObj));
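
The hunk above combines two steps that must happen in this order: acquiring and immediately releasing the Global IX lock marks the operation, via its Locker, as one the RstlKillOpThread will interrupt on step-up or stepdown, and only then is the MigrationBlockingGuard constructed, since the guard may now wait indefinitely for in-flight migrations to drain. A minimal sketch of that pattern, assuming MongoDB's internal locking and sharding headers (the function name is illustrative, not part of this change):

    #include "mongo/db/concurrency/d_concurrency.h"
    #include "mongo/db/operation_context.h"
    #include "mongo/db/s/active_migrations_registry.h"

    namespace mongo {

    // Illustrative only: shows the ordering the command relies on.
    void runWhileBlockingMigrations(OperationContext* opCtx) {
        {
            // Taken and dropped immediately; only the Locker-side bookkeeping matters.
            Lock::GlobalLock lk(opCtx, MODE_IX);
        }
        // MigrationBlockingGuard's constructor invariants on exactly this condition.
        invariant(opCtx->lockState()->wasGlobalLockTakenInModeConflictingWithWrites());

        // Safe to wait here: a step-up or stepdown interrupts the wait instead of hanging.
        MigrationBlockingGuard guard(opCtx, "illustrative long-running admin operation");
        // ... work that must not run concurrently with chunk migrations ...
    }

    }  // namespace mongo
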
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 1deb8b11d27..dbab6e4ca84 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -875,7 +875,7 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook
// ShardingStateRecovery::recover above, because they may trigger filtering metadata
// refreshes which should use the recovered configOpTime.
migrationutil::resubmitRangeDeletionsOnStepUp(_service);
- migrationutil::resumeMigrationCoordinationsOnStepUp(_service);
+ migrationutil::resumeMigrationCoordinationsOnStepUp(opCtx);
} else { // unsharded
if (auto validator = LogicalTimeValidator::get(_service)) {
diff --git a/src/mongo/db/s/active_migrations_registry.cpp b/src/mongo/db/s/active_migrations_registry.cpp
index b5ee1c7850a..d58bcfe9805 100644
--- a/src/mongo/db/s/active_migrations_registry.cpp
+++ b/src/mongo/db/s/active_migrations_registry.cpp
@@ -27,6 +27,8 @@
* it in the license file.
*/
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kShardingMigration
+
#include "mongo/platform/basic.h"
#include "mongo/db/s/active_migrations_registry.h"
@@ -36,6 +38,7 @@
#include "mongo/db/s/migration_session_id.h"
#include "mongo/db/s/migration_source_manager.h"
#include "mongo/db/service_context.h"
+#include "mongo/logv2/log.h"
namespace mongo {
namespace {
@@ -58,7 +61,7 @@ ActiveMigrationsRegistry& ActiveMigrationsRegistry::get(OperationContext* opCtx)
return get(opCtx->getServiceContext());
}
-void ActiveMigrationsRegistry::lock(OperationContext* opCtx) {
+void ActiveMigrationsRegistry::lock(OperationContext* opCtx, StringData reason) {
stdx::unique_lock<Latch> lock(_mutex);
// This wait is to hold back additional lock requests while there is already one in
@@ -66,6 +69,7 @@ void ActiveMigrationsRegistry::lock(OperationContext* opCtx) {
opCtx->waitForConditionOrInterrupt(_lockCond, lock, [this] { return !_migrationsBlocked; });
// Setting flag before condvar returns to block new migrations from starting. (Favoring writers)
+ LOGV2(4675601, "Going to start blocking migrations", "reason"_attr = reason);
_migrationsBlocked = true;
// Wait for any ongoing migrations to complete.
@@ -73,22 +77,23 @@ void ActiveMigrationsRegistry::lock(OperationContext* opCtx) {
_lockCond, lock, [this] { return !(_activeMoveChunkState || _activeReceiveChunkState); });
}
-void ActiveMigrationsRegistry::unlock() {
+void ActiveMigrationsRegistry::unlock(StringData reason) {
stdx::lock_guard<Latch> lock(_mutex);
+ LOGV2(4675602, "Going to stop blocking migrations", "reason"_attr = reason);
_migrationsBlocked = false;
_lockCond.notify_all();
}
StatusWith<ScopedDonateChunk> ActiveMigrationsRegistry::registerDonateChunk(
- const MoveChunkRequest& args) {
- stdx::lock_guard<Latch> lk(_mutex);
+ OperationContext* opCtx, const MoveChunkRequest& args) {
+ stdx::unique_lock<Latch> lk(_mutex);
- if (_migrationsBlocked)
- return {ErrorCodes::ConflictingOperationInProgress,
- str::stream() << "Unable to start new migration because this shard is currently "
- "blocking all migrations."};
+ if (_migrationsBlocked) {
+ LOGV2(4675603, "Register donate chunk waiting for migrations to be unblocked");
+ opCtx->waitForConditionOrInterrupt(_lockCond, lk, [this] { return !_migrationsBlocked; });
+ }
if (_activeReceiveChunkState) {
return _activeReceiveChunkState->constructErrorStatus();
@@ -108,13 +113,16 @@ StatusWith<ScopedDonateChunk> ActiveMigrationsRegistry::registerDonateChunk(
}
StatusWith<ScopedReceiveChunk> ActiveMigrationsRegistry::registerReceiveChunk(
- const NamespaceString& nss, const ChunkRange& chunkRange, const ShardId& fromShardId) {
- stdx::lock_guard<Latch> lk(_mutex);
-
- if (_migrationsBlocked)
- return {ErrorCodes::ConflictingOperationInProgress,
- str::stream() << "Unable to start new migration because this shard is currently "
- "blocking all migrations."};
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const ChunkRange& chunkRange,
+ const ShardId& fromShardId) {
+ stdx::unique_lock<Latch> lk(_mutex);
+
+ if (_migrationsBlocked) {
+ LOGV2(4675604, "Register receive chunk waiting for migrations to be unblocked");
+ opCtx->waitForConditionOrInterrupt(_lockCond, lk, [this] { return !_migrationsBlocked; });
+ }
if (_activeReceiveChunkState) {
return _activeReceiveChunkState->constructErrorStatus();
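
The behavioral change in this file is that registerDonateChunk and registerReceiveChunk now park, interruptibly, until the registry is unblocked instead of failing with ConflictingOperationInProgress, while lock() still sets the blocked flag before draining so new arrivals cannot starve the blocker. A standalone sketch of that scheme in plain C++ (illustrative names; the real code routes every wait through OperationContext::waitForConditionOrInterrupt so a stepdown can kill the waiter):

    #include <condition_variable>
    #include <mutex>

    class BlockableRegistry {
    public:
        void lock() {
            std::unique_lock<std::mutex> lk(_mutex);
            _cond.wait(lk, [this] { return !_blocked; });     // one blocker at a time
            _blocked = true;                                   // new registrations now park
            _cond.wait(lk, [this] { return _active == 0; });   // drain in-flight operations
        }

        void unlock() {
            std::lock_guard<std::mutex> lk(_mutex);
            _blocked = false;
            _cond.notify_all();
        }

        void registerOp() {
            std::unique_lock<std::mutex> lk(_mutex);
            _cond.wait(lk, [this] { return !_blocked; });      // wait instead of erroring out
            ++_active;
        }

        void unregisterOp() {
            std::lock_guard<std::mutex> lk(_mutex);
            --_active;
            _cond.notify_all();                                // may wake a draining lock()
        }

    private:
        std::mutex _mutex;
        std::condition_variable _cond;
        bool _blocked{false};
        int _active{0};
    };
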
diff --git a/src/mongo/db/s/active_migrations_registry.h b/src/mongo/db/s/active_migrations_registry.h
index c0a8029c08b..29b326cf4ef 100644
--- a/src/mongo/db/s/active_migrations_registry.h
+++ b/src/mongo/db/s/active_migrations_registry.h
@@ -66,8 +66,8 @@ public:
* subsequent migration operations will return ConflictingOperationInProgress until the unlock()
* method is called.
*/
- void lock(OperationContext* opCtx);
- void unlock();
+ void lock(OperationContext* opCtx, StringData reason);
+ void unlock(StringData reason);
/**
* If there are no migrations running on this shard, registers an active migration with the
@@ -80,7 +80,8 @@ public:
*
* Otherwise returns a ConflictingOperationInProgress error.
*/
- StatusWith<ScopedDonateChunk> registerDonateChunk(const MoveChunkRequest& args);
+ StatusWith<ScopedDonateChunk> registerDonateChunk(OperationContext* opCtx,
+ const MoveChunkRequest& args);
/**
* If there are no migrations running on this shard, registers an active receive operation with
@@ -89,7 +90,8 @@ public:
*
* Otherwise returns a ConflictingOperationInProgress error.
*/
- StatusWith<ScopedReceiveChunk> registerReceiveChunk(const NamespaceString& nss,
+ StatusWith<ScopedReceiveChunk> registerReceiveChunk(OperationContext* opCtx,
+ const NamespaceString& nss,
const ChunkRange& chunkRange,
const ShardId& fromShardId);
@@ -175,18 +177,21 @@ private:
class MigrationBlockingGuard {
public:
- MigrationBlockingGuard(OperationContext* opCtx, ActiveMigrationsRegistry& registry)
- : _opCtx(opCtx), _registry(registry) {
- _registry.lock(_opCtx);
+ MigrationBlockingGuard(OperationContext* opCtx, std::string reason)
+ : _registry(ActiveMigrationsRegistry::get(opCtx)), _reason(std::move(reason)) {
+ // Ensure any thread attempting to use a MigrationBlockingGuard will be interrupted by
+ // a stepdown.
+ invariant(opCtx->lockState()->wasGlobalLockTakenInModeConflictingWithWrites());
+ _registry.lock(opCtx, _reason);
}
~MigrationBlockingGuard() {
- _registry.unlock();
+ _registry.unlock(_reason);
}
private:
- OperationContext* _opCtx{nullptr};
ActiveMigrationsRegistry& _registry;
+ std::string _reason;
};
/**
diff --git a/src/mongo/db/s/active_migrations_registry_test.cpp b/src/mongo/db/s/active_migrations_registry_test.cpp
index cd777113159..0f268e23fb0 100644
--- a/src/mongo/db/s/active_migrations_registry_test.cpp
+++ b/src/mongo/db/s/active_migrations_registry_test.cpp
@@ -79,7 +79,7 @@ MoveChunkRequest createMoveChunkRequest(const NamespaceString& nss) {
TEST_F(MoveChunkRegistration, ScopedDonateChunkMoveConstructorAndAssignment) {
auto originalScopedDonateChunk = assertGet(_registry.registerDonateChunk(
- createMoveChunkRequest(NamespaceString("TestDB", "TestColl"))));
+ operationContext(), createMoveChunkRequest(NamespaceString("TestDB", "TestColl"))));
ASSERT(originalScopedDonateChunk.mustExecute());
ScopedDonateChunk movedScopedDonateChunk(std::move(originalScopedDonateChunk));
@@ -98,7 +98,7 @@ TEST_F(MoveChunkRegistration, GetActiveMigrationNamespace) {
const NamespaceString nss("TestDB", "TestColl");
auto originalScopedDonateChunk =
- assertGet(_registry.registerDonateChunk(createMoveChunkRequest(nss)));
+ assertGet(_registry.registerDonateChunk(operationContext(), createMoveChunkRequest(nss)));
ASSERT_EQ(nss.ns(), _registry.getActiveDonateChunkNss()->ns());
@@ -108,10 +108,10 @@ TEST_F(MoveChunkRegistration, GetActiveMigrationNamespace) {
TEST_F(MoveChunkRegistration, SecondMigrationReturnsConflictingOperationInProgress) {
auto originalScopedDonateChunk = assertGet(_registry.registerDonateChunk(
- createMoveChunkRequest(NamespaceString("TestDB", "TestColl1"))));
+ operationContext(), createMoveChunkRequest(NamespaceString("TestDB", "TestColl1"))));
auto secondScopedDonateChunkStatus = _registry.registerDonateChunk(
- createMoveChunkRequest(NamespaceString("TestDB", "TestColl2")));
+ operationContext(), createMoveChunkRequest(NamespaceString("TestDB", "TestColl2")));
ASSERT_EQ(ErrorCodes::ConflictingOperationInProgress,
secondScopedDonateChunkStatus.getStatus());
@@ -120,11 +120,11 @@ TEST_F(MoveChunkRegistration, SecondMigrationReturnsConflictingOperationInProgre
TEST_F(MoveChunkRegistration, SecondMigrationWithSameArgumentsJoinsFirst) {
auto originalScopedDonateChunk = assertGet(_registry.registerDonateChunk(
- createMoveChunkRequest(NamespaceString("TestDB", "TestColl"))));
+ operationContext(), createMoveChunkRequest(NamespaceString("TestDB", "TestColl"))));
ASSERT(originalScopedDonateChunk.mustExecute());
auto secondScopedDonateChunk = assertGet(_registry.registerDonateChunk(
- createMoveChunkRequest(NamespaceString("TestDB", "TestColl"))));
+ operationContext(), createMoveChunkRequest(NamespaceString("TestDB", "TestColl"))));
ASSERT(!secondScopedDonateChunk.mustExecute());
originalScopedDonateChunk.signalComplete({ErrorCodes::InternalError, "Test error"});
@@ -134,44 +134,121 @@ TEST_F(MoveChunkRegistration, SecondMigrationWithSameArgumentsJoinsFirst) {
}
TEST_F(MoveChunkRegistration, TestBlockingDonateChunk) {
- auto opCtx = operationContext();
+ stdx::promise<void> blockDonate;
+ stdx::promise<void> readyToLock;
+ stdx::promise<void> inLock;
- _registry.lock(opCtx);
+ // Registry thread.
+ auto result = stdx::async(stdx::launch::async, [&] {
+ // 2. Lock the registry so that starting to donate will block.
+ _registry.lock(operationContext(), "dummy");
+
+ // 3. Signal the donate thread that the donate is ready to be started.
+ readyToLock.set_value();
- auto scopedDonateChunk = _registry.registerDonateChunk(
- createMoveChunkRequest(NamespaceString("TestDB", "TestColl")));
+ // 4. Wait for the donate thread to start blocking because the registry is locked.
+ blockDonate.get_future().wait();
- ASSERT_EQ(ErrorCodes::ConflictingOperationInProgress, scopedDonateChunk.getStatus());
+ // 9. Unlock the registry to signal the donate thread.
+ _registry.unlock("dummy");
+ });
- _registry.unlock();
+ // Donate thread.
+ auto lockReleased = stdx::async(stdx::launch::async, [&] {
+ ThreadClient tc("donate thread", getGlobalServiceContext());
+ auto opCtx = tc->makeOperationContext();
- auto scopedDonateChunk2 = _registry.registerDonateChunk(
- createMoveChunkRequest(NamespaceString("TestDB", "TestColl")));
- ASSERT_OK(scopedDonateChunk2.getStatus());
+ auto baton = opCtx->getBaton();
+ baton->schedule([&inLock](Status) {
+ // 7. This is called when the donate is blocking. We let the test method know
+ // that we're blocked on the donate so that it can tell the registry thread to unlock
+ // the registry.
+ inLock.set_value();
+ });
- scopedDonateChunk2.getValue().signalComplete(Status::OK());
+ // 5. This is woken up by the registry thread.
+ readyToLock.get_future().wait();
+
+ // 6. Now that we're woken up by the registry thread, let's attempt to start to donate.
+ // This will block and call the lambda set on the baton above.
+ auto scopedDonateChunk = _registry.registerDonateChunk(
+ opCtx.get(), createMoveChunkRequest(NamespaceString("TestDB", "TestColl")));
+
+ ASSERT_OK(scopedDonateChunk.getStatus());
+ scopedDonateChunk.getValue().signalComplete(Status::OK());
+
+ // 10. Destroy the ScopedDonateChunk and return.
+ });
+
+ // 1. Wait for the donate thread to start blocking.
+ inLock.get_future().wait();
+
+ // 8. Tell the registry thread to unlock the registry. That will signal the donate thread to
+ // continue.
+ blockDonate.set_value();
+
+ // 11. The donate thread has returned and this future is set.
+ lockReleased.wait();
}
TEST_F(MoveChunkRegistration, TestBlockingReceiveChunk) {
- auto opCtx = operationContext();
+ stdx::promise<void> blockReceive;
+ stdx::promise<void> readyToLock;
+ stdx::promise<void> inLock;
+
+ // Registry thread.
+ auto result = stdx::async(stdx::launch::async, [&] {
+ // 2. Lock the registry so that starting to receive will block.
+ _registry.lock(operationContext(), "dummy");
+
+ // 3. Signal the receive thread that the receive is ready to be started.
+ readyToLock.set_value();
+
+ // 4. Wait for the receive thread to start blocking because the registry is locked.
+ blockReceive.get_future().wait();
+
+ // 9. Unlock the registry to signal the receive thread.
+ _registry.unlock("dummy");
+ });
+
+ // Receive thread.
+ auto lockReleased = stdx::async(stdx::launch::async, [&] {
+ ThreadClient tc("receive thread", getGlobalServiceContext());
+ auto opCtx = tc->makeOperationContext();
+
+ auto baton = opCtx->getBaton();
+ baton->schedule([&inLock](Status) {
+ // 7. This is called when the receive is blocking. We let the test method know
+ // that we're blocked on the receive so that it can tell the registry thread to unlock
+ // the registry.
+ inLock.set_value();
+ });
- _registry.lock(opCtx);
+ // 5. This is woken up by the registry thread.
+ readyToLock.get_future().wait();
- auto scopedReceiveChunk =
- _registry.registerReceiveChunk(NamespaceString("TestDB", "TestColl"),
- ChunkRange(BSON("Key" << -100), BSON("Key" << 100)),
- ShardId("shard0001"));
+ // 6. Now that we're woken up by the registry thread, let's attempt to start to receive.
+ // This will block and call the lambda set on the baton above.
+ auto scopedReceiveChunk =
+ _registry.registerReceiveChunk(opCtx.get(),
+ NamespaceString("TestDB", "TestColl"),
+ ChunkRange(BSON("Key" << -100), BSON("Key" << 100)),
+ ShardId("shard0001"));
- ASSERT_EQ(ErrorCodes::ConflictingOperationInProgress, scopedReceiveChunk.getStatus());
+ ASSERT_OK(scopedReceiveChunk.getStatus());
- _registry.unlock();
+ // 10. Destroy the ScopedReceiveChunk and return.
+ });
- auto scopedReceiveChunk2 =
- _registry.registerReceiveChunk(NamespaceString("TestDB", "TestColl"),
- ChunkRange(BSON("Key" << -100), BSON("Key" << 100)),
- ShardId("shard0001"));
+ // 1. Wait for the receive thread to start blocking.
+ inLock.get_future().wait();
- ASSERT_OK(scopedReceiveChunk2.getStatus());
+ // 8. Tell the registry thread to unlock the registry. That will signal the receive thread to
+ // continue.
+ blockReceive.set_value();
+
+ // 11. The receive thread has returned and this future is set.
+ lockReleased.wait();
}
// This test validates that the ActiveMigrationsRegistry lock will block while there is a donation
@@ -186,17 +263,18 @@ TEST_F(MoveChunkRegistration, TestBlockingWhileDonateInProgress) {
auto result = stdx::async(stdx::launch::async, [&] {
// 2. Start a migration so that the registry lock will block when acquired.
auto scopedDonateChunk = _registry.registerDonateChunk(
- createMoveChunkRequest(NamespaceString("TestDB", "TestColl")));
+ operationContext(), createMoveChunkRequest(NamespaceString("TestDB", "TestColl")));
ASSERT_OK(scopedDonateChunk.getStatus());
// 3. Signal the registry locking thread that the registry is ready to be locked.
readyToLock.set_value();
+ // 4. Wait for the registry thread to start blocking because there is an active donate.
blockDonate.get_future().wait();
scopedDonateChunk.getValue().signalComplete(Status::OK());
- // 8. Destroy the ScopedDonateChunk to signal the registy lock.
+ // 9. Destroy the ScopedDonateChunk to signal the registry lock.
});
// Registry locking thread.
@@ -206,31 +284,31 @@ TEST_F(MoveChunkRegistration, TestBlockingWhileDonateInProgress) {
auto baton = opCtx->getBaton();
baton->schedule([&inLock](Status) {
- // 6. This is called when the registry lock is blocking. We let the test method know
+ // 7. This is called when the registry lock is blocking. We let the test method know
// that we're blocked on the registry lock so that it can tell the migration thread to let
// the donate operation complete.
inLock.set_value();
});
- // 4. This is woken up by the migration thread.
+ // 5. This is woken up by the migration thread.
readyToLock.get_future().wait();
- // 5. Now that we're woken up by the migration thread, let's attempt to lock the registry.
+ // 6. Now that we're woken up by the migration thread, let's attempt to lock the registry.
// This will block and call the lambda set on the baton above.
- _registry.lock(opCtx.get());
+ _registry.lock(opCtx.get(), "dummy");
- // 9. Unlock the registry and return.
- _registry.unlock();
+ // 10. Unlock the registry and return.
+ _registry.unlock("dummy");
});
// 1. Wait for registry lock to be acquired.
inLock.get_future().wait();
- // 7. Let the donate operation complete so that the ScopedDonateChunk is destroyed. That will
+ // 8. Let the donate operation complete so that the ScopedDonateChunk is destroyed. That will
// signal the registry lock.
blockDonate.set_value();
- // 10. The registy locking thread has returned and this future is set.
+ // 11. The registry locking thread has returned and this future is set.
lockReleased.wait();
}
@@ -246,7 +324,8 @@ TEST_F(MoveChunkRegistration, TestBlockingWhileReceiveInProgress) {
auto result = stdx::async(stdx::launch::async, [&] {
// 2. Start a migration so that the registry lock will block when acquired.
auto scopedReceiveChunk =
- _registry.registerReceiveChunk(NamespaceString("TestDB", "TestColl"),
+ _registry.registerReceiveChunk(operationContext(),
+ NamespaceString("TestDB", "TestColl"),
ChunkRange(BSON("Key" << -100), BSON("Key" << 100)),
ShardId("shard0001"));
ASSERT_OK(scopedReceiveChunk.getStatus());
@@ -254,9 +333,10 @@ TEST_F(MoveChunkRegistration, TestBlockingWhileReceiveInProgress) {
// 3. Signal the registry locking thread that the registry is ready to be locked.
readyToLock.set_value();
+ // 4. Wait for the registry thread to start blocking because there is an active receive.
blockReceive.get_future().wait();
- // 8. Destroy the scopedReceiveChunk to signal the registy lock.
+ // 9. Destroy the scopedReceiveChunk to signal the registry lock.
});
// Registry locking thread.
@@ -266,31 +346,31 @@ TEST_F(MoveChunkRegistration, TestBlockingWhileReceiveInProgress) {
auto baton = opCtx->getBaton();
baton->schedule([&inLock](Status) {
- // 6. This is called when the registry lock is blocking. We let the test method know
+ // 7. This is called when the registry lock is blocking. We let the test method know
// that we're blocked on the registry lock so that it can tell the migration thread to let
// the receive operation complete.
inLock.set_value();
});
- // 4. This is woken up by the migration thread.
+ // 5. This is woken up by the migration thread.
readyToLock.get_future().wait();
- // 5. Now that we're woken up by the migration thread, let's attempt to lock the registry.
+ // 6. Now that we're woken up by the migration thread, let's attempt to lock the registry.
// This will block and call the lambda set on the baton above.
- _registry.lock(opCtx.get());
+ _registry.lock(opCtx.get(), "dummy");
- // 9. Unlock the registry and return.
- _registry.unlock();
+ // 10. Unlock the registry and return.
+ _registry.unlock("dummy");
});
// 1. Wait for registry lock to be acquired.
inLock.get_future().wait();
- // 7. Let the receive operation complete so that the scopedReceiveChunk is destroyed. That will
+ // 8. Let the receive operation complete so that the scopedReceiveChunk is destroyed. That will
// signal the registry lock.
blockReceive.set_value();
- // 10. The registy locking thread has returned and this future is set.
+ // 11. The registry locking thread has returned and this future is set.
lockReleased.wait();
}
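
These tests all follow the same choreography: stdx::promise/stdx::async pairs order the two threads, and a callback scheduled on the waiting operation's baton is, per the numbered comments above, invoked once that operation starts blocking in the registry, which is how the test learns it is safe to unblock. A fragment isolating just that wiring (it assumes the surrounding test fixture and that another thread already holds the registry lock, so the registration below blocks):

    stdx::promise<void> parked;

    auto waiter = stdx::async(stdx::launch::async, [&] {
        ThreadClient tc("waiter", getGlobalServiceContext());
        auto opCtx = tc->makeOperationContext();

        // Per the comments in the tests above, this runs once the operation is blocking.
        opCtx->getBaton()->schedule([&parked](Status) { parked.set_value(); });

        auto scopedDonateChunk = _registry.registerDonateChunk(
            opCtx.get(), createMoveChunkRequest(NamespaceString("TestDB", "TestColl")));
        ASSERT_OK(scopedDonateChunk.getStatus());
        scopedDonateChunk.getValue().signalComplete(Status::OK());
    });

    parked.get_future().wait();   // the waiter is now parked behind the registry lock
    // ... unlock the registry from the thread holding it, then waiter.wait() ...
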
diff --git a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
index 84064273757..4144291e7cd 100644
--- a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
+++ b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
@@ -105,7 +105,7 @@ public:
// Ensure this shard is not currently receiving or donating any chunks.
auto scopedReceiveChunk(
uassertStatusOK(ActiveMigrationsRegistry::get(opCtx).registerReceiveChunk(
- nss, chunkRange, cloneRequest.getFromShardId())));
+ opCtx, nss, chunkRange, cloneRequest.getFromShardId())));
// We force a refresh immediately after registering this migration to guarantee that this
// shard will not receive a chunk after refreshing.
diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp
index 6b4537ede2b..36b9c7b1177 100644
--- a/src/mongo/db/s/migration_util.cpp
+++ b/src/mongo/db/s/migration_util.cpp
@@ -735,9 +735,18 @@ void refreshFilteringMetadataUntilSuccess(OperationContext* opCtx, const Namespa
});
}
-void resumeMigrationCoordinationsOnStepUp(ServiceContext* serviceContext) {
+void resumeMigrationCoordinationsOnStepUp(OperationContext* opCtx) {
LOGV2(22037, "Starting migration coordinator stepup recovery thread.");
+ // Don't allow migrations to start until the recovery is complete. Otherwise, the
+ // migration may end up inserting a migrationCoordinator doc that the recovery thread
+ // reads and attempts to recovery the decision for by bumping the chunkVersion, which
+ // will cause the migration to abort on trying to commit anyway.
+ // Store it as shared_ptr so that it can be captured in the async recovery task below.
+ const auto migrationBlockingGuard =
+ std::make_shared<MigrationBlockingGuard>(opCtx, "migration coordinator stepup recovery");
+
+ const auto serviceContext = opCtx->getServiceContext();
ExecutorFuture<void>(getMigrationUtilExecutor())
.then([serviceContext] {
ThreadClient tc("MigrationCoordinatorStepupRecovery", serviceContext);
@@ -864,7 +873,7 @@ void resumeMigrationCoordinationsOnStepUp(ServiceContext* serviceContext) {
ShardingStatistics::get(opCtx).unfinishedMigrationFromPreviousPrimary.store(
migrationRecoveryCount);
})
- .getAsync([](const Status& status) {
+ .getAsync([migrationBlockingGuard](const Status& status) {
if (!status.isOK()) {
LOGV2(22041,
"Failed to resume coordinating migrations on stepup {causedBy_status}",
diff --git a/src/mongo/db/s/migration_util.h b/src/mongo/db/s/migration_util.h
index caaf750d3fb..aa971a17e00 100644
--- a/src/mongo/db/s/migration_util.h
+++ b/src/mongo/db/s/migration_util.h
@@ -203,7 +203,7 @@ void refreshFilteringMetadataUntilSuccess(OperationContext* opCtx, const Namespa
* Submits an asynchronous task to scan config.migrationCoordinators and drive each unfinished
* migration coordination to completion.
*/
-void resumeMigrationCoordinationsOnStepUp(ServiceContext* serviceContext);
+void resumeMigrationCoordinationsOnStepUp(OperationContext* opCtx);
} // namespace migrationutil
} // namespace mongo
diff --git a/src/mongo/db/s/move_chunk_command.cpp b/src/mongo/db/s/move_chunk_command.cpp
index 63f1dd13507..60a7db4161c 100644
--- a/src/mongo/db/s/move_chunk_command.cpp
+++ b/src/mongo/db/s/move_chunk_command.cpp
@@ -135,7 +135,7 @@ public:
Grid::get(opCtx)->shardRegistry()->reload(opCtx);
auto scopedMigration = uassertStatusOK(
- ActiveMigrationsRegistry::get(opCtx).registerDonateChunk(moveChunkRequest));
+ ActiveMigrationsRegistry::get(opCtx).registerDonateChunk(opCtx, moveChunkRequest));
// Check if there is an existing migration running and if so, join it
if (scopedMigration.mustExecute()) {