author     Paolo Polato <paolo.polato@mongodb.com>            2022-07-18 08:11:05 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>   2022-07-18 08:56:29 +0000
commit     44fe191f411ef26c982707f4c0577216e79a178b (patch)
tree       ab0b836d89d958e3cfb023709b5c36e0e22405cb
parent     05f813248226f37f497021765b29b41295c5a60c (diff)
SERVER-67729 Reject new migrations when the ActiveMigrationsRegistry is locked
-rw-r--r--  src/mongo/db/s/active_migrations_registry.cpp                      37
-rw-r--r--  src/mongo/db/s/active_migrations_registry.h                        18
-rw-r--r--  src/mongo/db/s/active_migrations_registry_test.cpp                 42
-rw-r--r--  src/mongo/db/s/migration_destination_manager_legacy_commands.cpp    6
-rw-r--r--  src/mongo/db/s/migration_util.cpp                                    2
5 files changed, 70 insertions, 35 deletions
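
At a glance, the change makes a locked ActiveMigrationsRegistry reject new balancer registrations with a conflicting-operation error instead of parking them on the registry's condition variable. The following is a minimal standalone sketch of that behaviour in plain C++17 (not MongoDB code; MiniMigrationsRegistry and its members are illustrative stand-ins for the real registry):

// Minimal standalone sketch: once the registry is locked, a new donate
// registration fails fast instead of waiting for the lock to be released.
#include <iostream>
#include <mutex>
#include <optional>
#include <string>

class MiniMigrationsRegistry {
public:
    void lock() {
        std::lock_guard<std::mutex> lg(_mutex);
        _migrationsBlocked = true;  // analogous to _migrationsBlocked in the real registry
    }

    void unlock() {
        std::lock_guard<std::mutex> lg(_mutex);
        _migrationsBlocked = false;
    }

    // Returns std::nullopt on success, or an error message otherwise.
    std::optional<std::string> registerDonateChunk() {
        std::lock_guard<std::mutex> lg(_mutex);
        if (_activeMoveChunk)
            return std::string("a moveChunk operation is already in progress");
        if (_migrationsBlocked)
            return std::string("the registry is temporarily locked");  // fail fast
        _activeMoveChunk = true;
        return std::nullopt;
    }

private:
    std::mutex _mutex;
    bool _migrationsBlocked{false};
    bool _activeMoveChunk{false};
};

int main() {
    MiniMigrationsRegistry registry;
    registry.lock();
    if (auto err = registry.registerDonateChunk())
        std::cout << "rejected: " << *err << '\n';  // the locked registry rejects the request
    registry.unlock();
    return 0;
}
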
diff --git a/src/mongo/db/s/active_migrations_registry.cpp b/src/mongo/db/s/active_migrations_registry.cpp
index 55b0466fa3f..61646702239 100644
--- a/src/mongo/db/s/active_migrations_registry.cpp
+++ b/src/mongo/db/s/active_migrations_registry.cpp
@@ -65,6 +65,8 @@ ActiveMigrationsRegistry& ActiveMigrationsRegistry::get(OperationContext* opCtx)
}
void ActiveMigrationsRegistry::lock(OperationContext* opCtx, StringData reason) {
+ // The method requires the requesting operation to be interruptible
+ invariant(opCtx->shouldAlwaysInterruptAtStepDownOrUp());
stdx::unique_lock<Latch> lock(_mutex);
// This wait is to hold back additional lock requests while there is already one in progress
@@ -82,6 +84,17 @@ void ActiveMigrationsRegistry::lock(OperationContext* opCtx, StringData reason)
return !(_activeMoveChunkState || _activeReceiveChunkState);
});
+ // lock() may be called while the node is still completing its draining mode; if so, reject the
+ // request with a retriable error and allow the draining mode to invoke registerReceiveChunk()
+ // as part of its recovery sequence.
+ {
+ AutoGetDb autoDB(opCtx, NamespaceString::kAdminDb, MODE_IS);
+ uassert(ErrorCodes::NotWritablePrimary,
+ "Cannot lock the registry while the node is in draining mode",
+ repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase(
+ opCtx, NamespaceString::kAdminDb));
+ }
+
unblockMigrationsOnError.dismiss();
}
@@ -99,8 +112,7 @@ StatusWith<ScopedDonateChunk> ActiveMigrationsRegistry::registerDonateChunk(
stdx::unique_lock<Latch> ul(_mutex);
opCtx->waitForConditionOrInterrupt(_chunkOperationsStateChangedCV, ul, [&] {
- return !_migrationsBlocked &&
- !_activeSplitMergeChunkStates.count(args.getCommandParameter());
+ return !_activeSplitMergeChunkStates.count(args.getCommandParameter());
});
if (_activeReceiveChunkState) {
@@ -129,6 +141,12 @@ StatusWith<ScopedDonateChunk> ActiveMigrationsRegistry::registerDonateChunk(
return _activeMoveChunkState->constructErrorStatus();
}
+ if (_migrationsBlocked) {
+ return {ErrorCodes::ConflictingOperationInProgress,
+ "Unable to start new balancer operation because the ActiveMigrationsRegistry of "
+ "this shard is temporarily locked"};
+ }
+
_activeMoveChunkState.emplace(args);
return {ScopedDonateChunk(this, true, _activeMoveChunkState->notification)};
@@ -139,17 +157,14 @@ StatusWith<ScopedReceiveChunk> ActiveMigrationsRegistry::registerReceiveChunk(
const NamespaceString& nss,
const ChunkRange& chunkRange,
const ShardId& fromShardId,
- bool waitForOngoingMigrations) {
+ bool waitForCompletionOfConflictingOps) {
stdx::unique_lock<Latch> ul(_mutex);
- if (waitForOngoingMigrations) {
+ if (waitForCompletionOfConflictingOps) {
opCtx->waitForConditionOrInterrupt(_chunkOperationsStateChangedCV, ul, [this] {
return !_migrationsBlocked && !_activeMoveChunkState && !_activeReceiveChunkState;
});
} else {
- opCtx->waitForConditionOrInterrupt(
- _chunkOperationsStateChangedCV, ul, [this] { return !_migrationsBlocked; });
-
if (_activeReceiveChunkState) {
return _activeReceiveChunkState->constructErrorStatus();
}
@@ -161,10 +176,16 @@ StatusWith<ScopedReceiveChunk> ActiveMigrationsRegistry::registerReceiveChunk(
"runningMigration"_attr = _activeMoveChunkState->args.toBSON({}));
return _activeMoveChunkState->constructErrorStatus();
}
+
+ if (_migrationsBlocked) {
+ return {
+ ErrorCodes::ConflictingOperationInProgress,
+ "Unable to start new balancer operation because the ActiveMigrationsRegistry of "
+ "this shard is temporarily locked"};
+ }
}
_activeReceiveChunkState.emplace(nss, chunkRange, fromShardId);
-
return {ScopedReceiveChunk(this)};
}
diff --git a/src/mongo/db/s/active_migrations_registry.h b/src/mongo/db/s/active_migrations_registry.h
index 4d75a6845b0..ea61210782a 100644
--- a/src/mongo/db/s/active_migrations_registry.h
+++ b/src/mongo/db/s/active_migrations_registry.h
@@ -91,20 +91,22 @@ public:
const ShardsvrMoveRange& args);
/**
- * If there are no migrations or split/merges running on this shard, registers an active receive
- * operation with the specified session id and returns a ScopedReceiveChunk. The
- * ScopedReceiveChunk will unregister the migration when the ScopedReceiveChunk goes out of
- * scope.
+ * Registers an active receive chunk operation with the specified session id and returns a
+ * ScopedReceiveChunk. The returned ScopedReceiveChunk object will unregister the migration when
+ * it goes out of scope.
*
- * Otherwise returns a ConflictingOperationInProgress error if waitForOngoingMigrations is false
- * or waits for the ongoing migration/split/merge to finish and then registers the migration if
- * waitForOngoingMigrations is true.
+ * In case registerReceiveChunk() is called while other operations (a second migration or a
+ * registry lock()) are already holding resources of the ActiveMigrationsRegistry, the function
+ * will either
+ * - wait for such operations to complete and then perform the registration
+ * - return a ConflictingOperationInProgress error
+ * based on the value of the waitForCompletionOfConflictingOps parameter
*/
StatusWith<ScopedReceiveChunk> registerReceiveChunk(OperationContext* opCtx,
const NamespaceString& nss,
const ChunkRange& chunkRange,
const ShardId& fromShardId,
- bool waitForOngoingMigrations);
+ bool waitForCompletionOfConflictingOps);
/**
* If there are no migrations running on this shard, registers an active split or merge
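
The updated registerReceiveChunk() comment above documents two modes selected by waitForCompletionOfConflictingOps. Below is a minimal standalone sketch of that contract in plain C++17 (illustrative names only, no MongoDB types), assuming the same predicate used in active_migrations_registry.cpp: when the flag is true the caller waits for the lock and any other migration to clear, when it is false a conflicting state is reported back immediately.

#include <condition_variable>
#include <mutex>
#include <optional>
#include <string>

class MiniMigrationsRegistry {
public:
    std::optional<std::string> registerReceiveChunk(bool waitForCompletionOfConflictingOps) {
        std::unique_lock<std::mutex> ul(_mutex);
        if (waitForCompletionOfConflictingOps) {
            // Block until no lock and no other migration holds the registry.
            _stateChangedCV.wait(ul, [this] {
                return !_migrationsBlocked && !_activeMoveChunk && !_activeReceiveChunk;
            });
        } else {
            if (_activeReceiveChunk || _activeMoveChunk)
                return std::string("another migration is already in progress");
            if (_migrationsBlocked)
                return std::string("the registry is temporarily locked");
        }
        _activeReceiveChunk = true;
        return std::nullopt;
    }

    void unregisterReceiveChunk() {
        {
            std::lock_guard<std::mutex> lg(_mutex);
            _activeReceiveChunk = false;
        }
        _stateChangedCV.notify_all();  // wake waiters so they re-evaluate the predicate
    }

private:
    std::mutex _mutex;
    std::condition_variable _stateChangedCV;
    bool _migrationsBlocked{false};
    bool _activeMoveChunk{false};
    bool _activeReceiveChunk{false};
};

int main() {
    MiniMigrationsRegistry registry;
    // First registration succeeds; a second non-waiting attempt is rejected.
    registry.registerReceiveChunk(false /* waitForCompletionOfConflictingOps */);
    auto err = registry.registerReceiveChunk(false);
    return err ? 0 : 1;
}
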
diff --git a/src/mongo/db/s/active_migrations_registry_test.cpp b/src/mongo/db/s/active_migrations_registry_test.cpp
index bc285b35f81..acdd2bfaf38 100644
--- a/src/mongo/db/s/active_migrations_registry_test.cpp
+++ b/src/mongo/db/s/active_migrations_registry_test.cpp
@@ -33,7 +33,7 @@
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/client.h"
#include "mongo/db/s/active_migrations_registry.h"
-#include "mongo/db/service_context_d_test_fixture.h"
+#include "mongo/db/s/shard_server_test_fixture.h"
#include "mongo/s/commands/cluster_commands_gen.h"
#include "mongo/stdx/future.h"
#include "mongo/unittest/unittest.h"
@@ -43,19 +43,17 @@ namespace {
using unittest::assertGet;
-class MoveChunkRegistration : public ServiceContextMongoDTest {
+class MoveChunkRegistration : public ShardServerTestFixture {
public:
void setUp() override {
- _opCtx = getClient()->makeOperationContext();
- }
-
- OperationContext* operationContext() {
- return _opCtx.get();
+ ShardServerTestFixture::setUp();
+ _opCtx = operationContext();
+ _opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE();
}
protected:
ActiveMigrationsRegistry _registry;
- ServiceContext::UniqueOperationContext _opCtx;
+ OperationContext* _opCtx;
};
ShardsvrMoveRange createMoveRangeRequest(const NamespaceString& nss,
@@ -73,7 +71,7 @@ ShardsvrMoveRange createMoveRangeRequest(const NamespaceString& nss,
TEST_F(MoveChunkRegistration, ScopedDonateChunkMoveConstructorAndAssignment) {
auto originalScopedDonateChunk = assertGet(_registry.registerDonateChunk(
- operationContext(), createMoveRangeRequest(NamespaceString("TestDB", "TestColl"))));
+ _opCtx, createMoveRangeRequest(NamespaceString("TestDB", "TestColl"))));
ASSERT(originalScopedDonateChunk.mustExecute());
ScopedDonateChunk movedScopedDonateChunk(std::move(originalScopedDonateChunk));
@@ -144,7 +142,11 @@ TEST_F(MoveChunkRegistration, TestBlockingDonateChunk) {
// Registry thread.
auto result = stdx::async(stdx::launch::async, [&] {
// 2. Lock the registry so that starting to donate will block.
- _registry.lock(operationContext(), "dummy");
+ ThreadClient tc("ActiveMigrationsRegistryTest", getGlobalServiceContext());
+ auto opCtxHolder = tc->makeOperationContext();
+ opCtxHolder->setAlwaysInterruptAtStepDownOrUp_UNSAFE();
+
+ _registry.lock(opCtxHolder.get(), "dummy");
// 3. Signal the donate thread that the donate is ready to be started.
readyToLock.set_value();
@@ -202,7 +204,11 @@ TEST_F(MoveChunkRegistration, TestBlockingReceiveChunk) {
// Registry thread.
auto result = stdx::async(stdx::launch::async, [&] {
// 2. Lock the registry so that starting to receive will block.
- _registry.lock(operationContext(), "dummy");
+ ThreadClient tc("ActiveMigrationsRegistryTest", getGlobalServiceContext());
+ auto opCtxHolder = tc->makeOperationContext();
+ opCtxHolder->setAlwaysInterruptAtStepDownOrUp_UNSAFE();
+
+ _registry.lock(opCtxHolder.get(), "dummy");
// 3. Signal the receive thread that the receive is ready to be started.
readyToLock.set_value();
@@ -284,9 +290,10 @@ TEST_F(MoveChunkRegistration, TestBlockingWhileDonateInProgress) {
// Registry locking thread.
auto lockReleased = stdx::async(stdx::launch::async, [&] {
ThreadClient tc("ActiveMigrationsRegistryTest", getGlobalServiceContext());
- auto opCtx = tc->makeOperationContext();
+ auto opCtxHolder = tc->makeOperationContext();
+ opCtxHolder->setAlwaysInterruptAtStepDownOrUp_UNSAFE();
- auto baton = opCtx->getBaton();
+ auto baton = opCtxHolder->getBaton();
baton->schedule([&inLock](Status) {
// 7. This is called when the registry lock is blocking. We let the test method know
// that we're blocked on the registry lock so that it tell the migration thread to let
@@ -299,7 +306,7 @@ TEST_F(MoveChunkRegistration, TestBlockingWhileDonateInProgress) {
// 6. Now that we're woken up by the migration thread, let's attempt to lock the registry.
// This will block and call the lambda set on the baton above.
- _registry.lock(opCtx.get(), "dummy");
+ _registry.lock(opCtxHolder.get(), "dummy");
// 10. Unlock the registry and return.
_registry.unlock("dummy");
@@ -347,9 +354,10 @@ TEST_F(MoveChunkRegistration, TestBlockingWhileReceiveInProgress) {
// Registry locking thread.
auto lockReleased = stdx::async(stdx::launch::async, [&] {
ThreadClient tc("ActiveMigrationsRegistryTest", getGlobalServiceContext());
- auto opCtx = tc->makeOperationContext();
+ auto opCtxHolder = tc->makeOperationContext();
+ opCtxHolder->setAlwaysInterruptAtStepDownOrUp_UNSAFE();
- auto baton = opCtx->getBaton();
+ auto baton = opCtxHolder->getBaton();
baton->schedule([&inLock](Status) {
// 7. This is called when the registry lock is blocking. We let the test method know
// that we're blocked on the registry lock so that it tell the migration thread to let
@@ -362,7 +370,7 @@ TEST_F(MoveChunkRegistration, TestBlockingWhileReceiveInProgress) {
// 6. Now that we're woken up by the migration thread, let's attempt to lock the registry.
// This will block and call the lambda set on the baton above.
- _registry.lock(opCtx.get(), "dummy");
+ _registry.lock(opCtxHolder.get(), "dummy");
// 10. Unlock the registry and return.
_registry.unlock("dummy");
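
The tests above all follow the same handshake: a separate thread locks the registry, signals readiness, and the test thread then exercises the blocked or rejected path. Below is a condensed standalone sketch of that pattern using std::async and std::promise in place of MongoDB's stdx:: wrappers and ThreadClient/OperationContext machinery (all names illustrative):

#include <cassert>
#include <future>
#include <mutex>

struct TinyRegistry {
    std::mutex mutex;
    bool blocked = false;

    void lock()   { std::lock_guard<std::mutex> lg(mutex); blocked = true; }
    void unlock() { std::lock_guard<std::mutex> lg(mutex); blocked = false; }

    // Returns false when the registry is locked, mirroring the rejected registration.
    bool tryRegisterDonateChunk() {
        std::lock_guard<std::mutex> lg(mutex);
        return !blocked;
    }
};

int main() {
    TinyRegistry registry;
    std::promise<void> readyToLock;

    // "Registry thread": takes the registry lock, then signals the test thread.
    auto locker = std::async(std::launch::async, [&] {
        registry.lock();
        readyToLock.set_value();
    });

    readyToLock.get_future().wait();             // wait until the lock is held
    assert(!registry.tryRegisterDonateChunk());  // registration is rejected while locked

    locker.get();
    registry.unlock();
    assert(registry.tryRegisterDonateChunk());   // accepted again once unlocked
    return 0;
}
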
diff --git a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
index a3b59a4c6cc..1959befa719 100644
--- a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
+++ b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
@@ -120,7 +120,11 @@ public:
// Ensure this shard is not currently receiving or donating any chunks.
auto scopedReceiveChunk(
uassertStatusOK(ActiveMigrationsRegistry::get(opCtx).registerReceiveChunk(
- opCtx, nss, chunkRange, cloneRequest.getFromShardId(), false)));
+ opCtx,
+ nss,
+ chunkRange,
+ cloneRequest.getFromShardId(),
+ false /* waitForCompletionOfConflictingOps*/)));
// We force a refresh immediately after registering this migration to guarantee that this
// shard will not receive a chunk after refreshing.
diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp
index e71dfe04d89..f0d85e91d0a 100644
--- a/src/mongo/db/s/migration_util.cpp
+++ b/src/mongo/db/s/migration_util.cpp
@@ -1243,7 +1243,7 @@ void resumeMigrationRecipientsOnStepUp(OperationContext* opCtx) {
nss,
doc.getRange(),
doc.getDonorShardIdForLoggingPurposesOnly(),
- true /* waitForOngoingMigrations */)));
+ true /* waitForCompletionOfConflictingOps */)));
const auto mdm = MigrationDestinationManager::get(opCtx);
uassertStatusOK(