author     Paolo Polato <paolo.polato@mongodb.com>    2022-07-18 08:11:05 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>    2022-07-18 08:56:29 +0000
commit     44fe191f411ef26c982707f4c0577216e79a178b (patch)
tree       ab0b836d89d958e3cfb023709b5c36e0e22405cb /src/mongo/db/s/active_migrations_registry.cpp
parent     05f813248226f37f497021765b29b41295c5a60c (diff)
download   mongo-44fe191f411ef26c982707f4c0577216e79a178b.tar.gz
SERVER-67729 Reject new migrations when the ActiveMigrationsRegistry is locked
Diffstat (limited to 'src/mongo/db/s/active_migrations_registry.cpp')
-rw-r--r--  src/mongo/db/s/active_migrations_registry.cpp  37
1 file changed, 29 insertions(+), 8 deletions(-)
diff --git a/src/mongo/db/s/active_migrations_registry.cpp b/src/mongo/db/s/active_migrations_registry.cpp
index 55b0466fa3f..61646702239 100644
--- a/src/mongo/db/s/active_migrations_registry.cpp
+++ b/src/mongo/db/s/active_migrations_registry.cpp
@@ -65,6 +65,8 @@ ActiveMigrationsRegistry& ActiveMigrationsRegistry::get(OperationContext* opCtx)
}
void ActiveMigrationsRegistry::lock(OperationContext* opCtx, StringData reason) {
+ // The method requires the requesting operation to be interruptible
+ invariant(opCtx->shouldAlwaysInterruptAtStepDownOrUp());
stdx::unique_lock<Latch> lock(_mutex);
// This wait is to hold back additional lock requests while there is already one in progress
@@ -82,6 +84,17 @@ void ActiveMigrationsRegistry::lock(OperationContext* opCtx, StringData reason)
return !(_activeMoveChunkState || _activeReceiveChunkState);
});
+ // lock() may be called while the node is still completing its draining mode; if so, reject the
+ // request with a retriable error and allow the draining mode to invoke registerReceiveChunk()
+ // as part of its recovery sequence.
+ {
+ AutoGetDb autoDB(opCtx, NamespaceString::kAdminDb, MODE_IS);
+ uassert(ErrorCodes::NotWritablePrimary,
+ "Cannot lock the registry while the node is in draining mode",
+ repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase(
+ opCtx, NamespaceString::kAdminDb));
+ }
+
unblockMigrationsOnError.dismiss();
}
@@ -99,8 +112,7 @@ StatusWith<ScopedDonateChunk> ActiveMigrationsRegistry::registerDonateChunk(
stdx::unique_lock<Latch> ul(_mutex);
opCtx->waitForConditionOrInterrupt(_chunkOperationsStateChangedCV, ul, [&] {
- return !_migrationsBlocked &&
- !_activeSplitMergeChunkStates.count(args.getCommandParameter());
+ return !_activeSplitMergeChunkStates.count(args.getCommandParameter());
});
if (_activeReceiveChunkState) {
@@ -129,6 +141,12 @@ StatusWith<ScopedDonateChunk> ActiveMigrationsRegistry::registerDonateChunk(
return _activeMoveChunkState->constructErrorStatus();
}
+ if (_migrationsBlocked) {
+ return {ErrorCodes::ConflictingOperationInProgress,
+ "Unable to start new balancer operation because the ActiveMigrationsRegistry of "
+ "this shard is temporarily locked"};
+ }
+
_activeMoveChunkState.emplace(args);
return {ScopedDonateChunk(this, true, _activeMoveChunkState->notification)};
@@ -139,17 +157,14 @@ StatusWith<ScopedReceiveChunk> ActiveMigrationsRegistry::registerReceiveChunk(
const NamespaceString& nss,
const ChunkRange& chunkRange,
const ShardId& fromShardId,
- bool waitForOngoingMigrations) {
+ bool waitForCompletionOfConflictingOps) {
stdx::unique_lock<Latch> ul(_mutex);
- if (waitForOngoingMigrations) {
+ if (waitForCompletionOfConflictingOps) {
opCtx->waitForConditionOrInterrupt(_chunkOperationsStateChangedCV, ul, [this] {
return !_migrationsBlocked && !_activeMoveChunkState && !_activeReceiveChunkState;
});
} else {
- opCtx->waitForConditionOrInterrupt(
- _chunkOperationsStateChangedCV, ul, [this] { return !_migrationsBlocked; });
-
if (_activeReceiveChunkState) {
return _activeReceiveChunkState->constructErrorStatus();
}
@@ -161,10 +176,16 @@ StatusWith<ScopedReceiveChunk> ActiveMigrationsRegistry::registerReceiveChunk(
"runningMigration"_attr = _activeMoveChunkState->args.toBSON({}));
return _activeMoveChunkState->constructErrorStatus();
}
+
+ if (_migrationsBlocked) {
+ return {
+ ErrorCodes::ConflictingOperationInProgress,
+ "Unable to start new balancer operation because the ActiveMigrationsRegistry of "
+ "this shard is temporarily locked"};
+ }
}
_activeReceiveChunkState.emplace(nss, chunkRange, fromShardId);
-
return {ScopedReceiveChunk(this)};
}
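
Note on the pattern this commit adopts: before SERVER-67729, registerDonateChunk() and registerReceiveChunk() would wait on the registry's condition variable until a lock() holder released it; afterwards, they fail fast with the retriable ConflictingOperationInProgress error whenever _migrationsBlocked is set. Below is a minimal, self-contained sketch of that fail-fast shape in plain C++. It is not MongoDB code: Status and MigrationsRegistry here are illustrative stand-ins for mongo's StatusWith/ErrorCodes machinery, and the real registry tracks per-collection move/receive state that this sketch omits.

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <string>

// Illustrative stand-in for mongo's StatusWith<T>/ErrorCodes, not the real types.
struct Status {
    bool ok;
    std::string reason;
};

class MigrationsRegistry {
public:
    // lock(): hold back additional lock requests while one is already in
    // progress, then flip the blocked flag (mirrors the wait at the top of
    // ActiveMigrationsRegistry::lock()).
    void lock(const std::string& reason) {
        std::unique_lock<std::mutex> ul(_mutex);
        _cv.wait(ul, [&] { return !_blocked; });
        _blocked = true;
        _reason = reason;
    }

    void unlock() {
        {
            std::lock_guard<std::mutex> lg(_mutex);
            _blocked = false;
        }
        _cv.notify_all();
    }

    // After SERVER-67729: instead of waiting on the condition variable until
    // the registry is unlocked, a registration attempt is rejected immediately
    // with a retriable error, as in the diff above.
    Status registerDonateChunk() {
        std::lock_guard<std::mutex> lg(_mutex);
        if (_blocked) {
            return {false,
                    "Unable to start new balancer operation because the "
                    "registry is temporarily locked: " + _reason};
        }
        // ... the real registry records _activeMoveChunkState here ...
        return {true, ""};
    }

private:
    std::mutex _mutex;
    std::condition_variable _cv;
    bool _blocked = false;
    std::string _reason;
};

int main() {
    MigrationsRegistry registry;
    registry.lock("defragmentation");
    Status s = registry.registerDonateChunk();
    std::cout << (s.ok ? "registered" : s.reason) << '\n';  // prints the rejection
    registry.unlock();
}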
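On the caller's side, ConflictingOperationInProgress is conventionally treated as retriable, which is why failing fast is safe: the balancer can simply try again once the registry is unlocked. A hedged sketch of such a retry loop follows, reusing the MigrationsRegistry and Status stand-ins from the sketch above; submitWithRetry and its fixed backoff are hypothetical and not how MongoDB's balancer actually schedules retries.

#include <chrono>
#include <thread>

// Hypothetical caller-side retry loop. A real caller would use proper backoff
// and check for interruption between attempts.
Status submitWithRetry(MigrationsRegistry& registry, int maxAttempts) {
    Status s{false, "no attempts made"};
    for (int attempt = 0; attempt < maxAttempts; ++attempt) {
        s = registry.registerDonateChunk();
        if (s.ok) {
            return s;  // registered; proceed with the migration
        }
        // Registry is temporarily locked (e.g. during defragmentation):
        // back off briefly and retry.
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    return s;  // give up, surfacing the last rejection to the caller
}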