summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBlake Oler <blake.oler@mongodb.com>2018-02-26 15:50:19 -0500
committerBlake Oler <blake.oler@mongodb.com>2018-02-27 16:05:14 -0500
commitcbe3e978d9fd41b85f2f394ce6182e6579667fd1 (patch)
tree0655752d8862b0588eb6261a5d9c6a6a580e990d
parent044b03fc7e01c3cc0df135566432837568cb78a3 (diff)
downloadmongo-cbe3e978d9fd41b85f2f394ce6182e6579667fd1.tar.gz
SERVER-33197 Implement joining behavior for movePrimary on shards
-rw-r--r--src/mongo/db/s/SConscript2
-rw-r--r--src/mongo/db/s/active_migrations_registry.cpp47
-rw-r--r--src/mongo/db/s/active_migrations_registry.h101
-rw-r--r--src/mongo/db/s/active_migrations_registry_test.cpp41
-rw-r--r--src/mongo/db/s/active_move_primaries_registry.cpp124
-rw-r--r--src/mongo/db/s/active_move_primaries_registry.h157
-rw-r--r--src/mongo/db/s/active_move_primaries_registry_test.cpp128
-rw-r--r--src/mongo/db/s/migration_destination_manager.cpp10
-rw-r--r--src/mongo/db/s/migration_destination_manager.h4
-rw-r--r--src/mongo/db/s/migration_destination_manager_legacy_commands.cpp4
-rw-r--r--src/mongo/db/s/move_chunk_command.cpp10
-rw-r--r--src/mongo/db/s/move_primary_command.cpp27
-rw-r--r--src/mongo/db/s/sharding_state.cpp17
-rw-r--r--src/mongo/db/s/sharding_state.h40
-rw-r--r--src/mongo/s/request_types/move_primary.idl1
15 files changed, 591 insertions, 122 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 30c89725877..b894268d9a6 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -70,6 +70,7 @@ env.Library(
target='sharding',
source=[
'active_migrations_registry.cpp',
+ 'active_move_primaries_registry.cpp',
'chunk_move_write_concern_options.cpp',
'chunk_splitter.cpp',
'collection_range_deleter.cpp',
@@ -262,6 +263,7 @@ env.CppUnitTest(
target='shard_test',
source=[
'active_migrations_registry_test.cpp',
+ 'active_move_primaries_registry_test.cpp',
'catalog_cache_loader_mock.cpp',
'migration_chunk_cloner_source_legacy_test.cpp',
'namespace_metadata_change_notifications_test.cpp',
diff --git a/src/mongo/db/s/active_migrations_registry.cpp b/src/mongo/db/s/active_migrations_registry.cpp
index a8006a06e6f..17f5bb1e94f 100644
--- a/src/mongo/db/s/active_migrations_registry.cpp
+++ b/src/mongo/db/s/active_migrations_registry.cpp
@@ -45,7 +45,7 @@ ActiveMigrationsRegistry::~ActiveMigrationsRegistry() {
invariant(!_activeMoveChunkState);
}
-StatusWith<ScopedRegisterDonateChunk> ActiveMigrationsRegistry::registerDonateChunk(
+StatusWith<ScopedDonateChunk> ActiveMigrationsRegistry::registerDonateChunk(
const MoveChunkRequest& args) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
if (_activeReceiveChunkState) {
@@ -54,7 +54,7 @@ StatusWith<ScopedRegisterDonateChunk> ActiveMigrationsRegistry::registerDonateCh
if (_activeMoveChunkState) {
if (_activeMoveChunkState->args == args) {
- return {ScopedRegisterDonateChunk(nullptr, false, _activeMoveChunkState->notification)};
+ return {ScopedDonateChunk(nullptr, false, _activeMoveChunkState->notification)};
}
return _activeMoveChunkState->constructErrorStatus();
@@ -62,10 +62,10 @@ StatusWith<ScopedRegisterDonateChunk> ActiveMigrationsRegistry::registerDonateCh
_activeMoveChunkState.emplace(args);
- return {ScopedRegisterDonateChunk(this, true, _activeMoveChunkState->notification)};
+ return {ScopedDonateChunk(this, true, _activeMoveChunkState->notification)};
}
-StatusWith<ScopedRegisterReceiveChunk> ActiveMigrationsRegistry::registerReceiveChunk(
+StatusWith<ScopedReceiveChunk> ActiveMigrationsRegistry::registerReceiveChunk(
const NamespaceString& nss, const ChunkRange& chunkRange, const ShardId& fromShardId) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
if (_activeReceiveChunkState) {
@@ -78,7 +78,7 @@ StatusWith<ScopedRegisterReceiveChunk> ActiveMigrationsRegistry::registerReceive
_activeReceiveChunkState.emplace(nss, chunkRange, fromShardId);
- return {ScopedRegisterReceiveChunk(this)};
+ return {ScopedReceiveChunk(this)};
}
boost::optional<NamespaceString> ActiveMigrationsRegistry::getActiveDonateChunkNss() {
@@ -151,62 +151,59 @@ Status ActiveMigrationsRegistry::ActiveReceiveChunkState::constructErrorStatus()
<< fromShardId};
}
-ScopedRegisterDonateChunk::ScopedRegisterDonateChunk(
- ActiveMigrationsRegistry* registry,
- bool forUnregister,
- std::shared_ptr<Notification<Status>> completionNotification)
+ScopedDonateChunk::ScopedDonateChunk(ActiveMigrationsRegistry* registry,
+ bool shouldExecute,
+ std::shared_ptr<Notification<Status>> completionNotification)
: _registry(registry),
- _forUnregister(forUnregister),
+ _shouldExecute(shouldExecute),
_completionNotification(std::move(completionNotification)) {}
-ScopedRegisterDonateChunk::~ScopedRegisterDonateChunk() {
- if (_registry && _forUnregister) {
+ScopedDonateChunk::~ScopedDonateChunk() {
+ if (_registry && _shouldExecute) {
// If this is a newly started migration the caller must always signal on completion
invariant(*_completionNotification);
_registry->_clearDonateChunk();
}
}
-ScopedRegisterDonateChunk::ScopedRegisterDonateChunk(ScopedRegisterDonateChunk&& other) {
+ScopedDonateChunk::ScopedDonateChunk(ScopedDonateChunk&& other) {
*this = std::move(other);
}
-ScopedRegisterDonateChunk& ScopedRegisterDonateChunk::operator=(ScopedRegisterDonateChunk&& other) {
+ScopedDonateChunk& ScopedDonateChunk::operator=(ScopedDonateChunk&& other) {
if (&other != this) {
_registry = other._registry;
other._registry = nullptr;
- _forUnregister = other._forUnregister;
+ _shouldExecute = other._shouldExecute;
_completionNotification = std::move(other._completionNotification);
}
return *this;
}
-void ScopedRegisterDonateChunk::complete(Status status) {
- invariant(_forUnregister);
+void ScopedDonateChunk::signalComplete(Status status) {
+ invariant(_shouldExecute);
_completionNotification->set(status);
}
-Status ScopedRegisterDonateChunk::waitForCompletion(OperationContext* opCtx) {
- invariant(!_forUnregister);
+Status ScopedDonateChunk::waitForCompletion(OperationContext* opCtx) {
+ invariant(!_shouldExecute);
return _completionNotification->get(opCtx);
}
-ScopedRegisterReceiveChunk::ScopedRegisterReceiveChunk(ActiveMigrationsRegistry* registry)
- : _registry(registry) {}
+ScopedReceiveChunk::ScopedReceiveChunk(ActiveMigrationsRegistry* registry) : _registry(registry) {}
-ScopedRegisterReceiveChunk::~ScopedRegisterReceiveChunk() {
+ScopedReceiveChunk::~ScopedReceiveChunk() {
if (_registry) {
_registry->_clearReceiveChunk();
}
}
-ScopedRegisterReceiveChunk::ScopedRegisterReceiveChunk(ScopedRegisterReceiveChunk&& other) {
+ScopedReceiveChunk::ScopedReceiveChunk(ScopedReceiveChunk&& other) {
*this = std::move(other);
}
-ScopedRegisterReceiveChunk& ScopedRegisterReceiveChunk::operator=(
- ScopedRegisterReceiveChunk&& other) {
+ScopedReceiveChunk& ScopedReceiveChunk::operator=(ScopedReceiveChunk&& other) {
if (&other != this) {
_registry = other._registry;
other._registry = nullptr;
diff --git a/src/mongo/db/s/active_migrations_registry.h b/src/mongo/db/s/active_migrations_registry.h
index a36699d09cf..6acd4678927 100644
--- a/src/mongo/db/s/active_migrations_registry.h
+++ b/src/mongo/db/s/active_migrations_registry.h
@@ -40,14 +40,14 @@
namespace mongo {
class OperationContext;
-class ScopedRegisterDonateChunk;
-class ScopedRegisterReceiveChunk;
+class ScopedDonateChunk;
+class ScopedReceiveChunk;
template <typename T>
class StatusWith;
/**
- * Thread-safe object, which keeps track of the active migrations running on a node and limits them
- * to only one per-shard. There is only one instance of this object per shard.
+ * Thread-safe object that keeps track of the active migrations running on a node and limits them
+ * to only one per shard. There is only one instance of this object per shard.
*/
class ActiveMigrationsRegistry {
MONGO_DISALLOW_COPYING(ActiveMigrationsRegistry);
@@ -58,30 +58,30 @@ public:
/**
* If there are no migrations running on this shard, registers an active migration with the
- * specified arguments and returns a ScopedRegisterDonateChunk, which must be signaled by the
+ * specified arguments. Returns a ScopedDonateChunk, which must be signaled by the
* caller before it goes out of scope.
*
* If there is an active migration already running on this shard and it has the exact same
- * arguments, returns a ScopedRegisterDonateChunk, which can be used to join the already running
- * migration.
+ * arguments, returns a ScopedDonateChunk. The ScopedDonateChunk can be used to join the
+ * already running migration.
*
* Otherwise returns a ConflictingOperationInProgress error.
*/
- StatusWith<ScopedRegisterDonateChunk> registerDonateChunk(const MoveChunkRequest& args);
+ StatusWith<ScopedDonateChunk> registerDonateChunk(const MoveChunkRequest& args);
/**
* If there are no migrations running on this shard, registers an active receive operation with
- * the specified session id and returns a ScopedRegisterReceiveChunk, which will unregister it
- * when it goes out of scope.
+ * the specified session id and returns a ScopedReceiveChunk. The ScopedReceiveChunk will
+ * unregister the migration when the ScopedReceiveChunk goes out of scope.
*
* Otherwise returns a ConflictingOperationInProgress error.
*/
- StatusWith<ScopedRegisterReceiveChunk> registerReceiveChunk(const NamespaceString& nss,
- const ChunkRange& chunkRange,
- const ShardId& fromShardId);
+ StatusWith<ScopedReceiveChunk> registerReceiveChunk(const NamespaceString& nss,
+ const ChunkRange& chunkRange,
+ const ShardId& fromShardId);
/**
- * If a migration has been previously registered through a call to registerDonateChunk returns
+ * If a migration has been previously registered through a call to registerDonateChunk, returns
* that namespace. Otherwise returns boost::none.
*/
boost::optional<NamespaceString> getActiveDonateChunkNss();
@@ -95,8 +95,8 @@ public:
BSONObj getActiveMigrationStatusReport(OperationContext* opCtx);
private:
- friend class ScopedRegisterDonateChunk;
- friend class ScopedRegisterReceiveChunk;
+ friend class ScopedDonateChunk;
+ friend class ScopedReceiveChunk;
// Describes the state of a currently active moveChunk operation
struct ActiveMoveChunkState {
@@ -111,7 +111,7 @@ private:
// Exact arguments of the currently active operation
MoveChunkRequest args;
- // Notification event, which will be signaled when the currently active operation completes
+ // Notification event that will be signaled when the currently active operation completes
std::shared_ptr<Notification<Status>> notification;
};
@@ -125,7 +125,7 @@ private:
*/
Status constructErrorStatus() const;
- // Namesspace for which a chunk is being received
+ // Namespace for which a chunk is being received
NamespaceString nss;
// Bounds of the chunk being migrated
@@ -136,8 +136,8 @@ private:
};
/**
- * Unregisters a previously registered namespace with ongoing migration. Must only be called if
- * a previous call to registerDonateChunk has succeeded.
+ * Unregisters a previously registered namespace with an ongoing migration. Must only be called
+ * if a previous call to registerDonateChunk has succeeded.
*/
void _clearDonateChunk();
@@ -150,49 +150,49 @@ private:
// Protects the state below
stdx::mutex _mutex;
- // If there is an active moveChunk operation going on, this field contains the request, which
- // initiated it
+ // If there is an active moveChunk operation, this field contains the original request
boost::optional<ActiveMoveChunkState> _activeMoveChunkState;
- // If there is an active receive of a chunk going on, this field contains the session id, which
- // initiated it
+ // If there is an active chunk receive operation, this field contains the original session id
boost::optional<ActiveReceiveChunkState> _activeReceiveChunkState;
};
/**
* Object of this class is returned from the registerDonateChunk call of the active migrations
- * registry. It can exist in two modes - 'unregister' and 'join'. See the comments for
+ * registry. It can exist in two modes - 'execute' and 'join'. See the comments for
* registerDonateChunk method for more details.
*/
-class ScopedRegisterDonateChunk {
- MONGO_DISALLOW_COPYING(ScopedRegisterDonateChunk);
+class ScopedDonateChunk {
+ MONGO_DISALLOW_COPYING(ScopedDonateChunk);
public:
- ScopedRegisterDonateChunk(ActiveMigrationsRegistry* registry,
- bool forUnregister,
- std::shared_ptr<Notification<Status>> completionNotification);
- ~ScopedRegisterDonateChunk();
+ ScopedDonateChunk(ActiveMigrationsRegistry* registry,
+ bool shouldExecute,
+ std::shared_ptr<Notification<Status>> completionNotification);
+ ~ScopedDonateChunk();
- ScopedRegisterDonateChunk(ScopedRegisterDonateChunk&&);
- ScopedRegisterDonateChunk& operator=(ScopedRegisterDonateChunk&&);
+ ScopedDonateChunk(ScopedDonateChunk&&);
+ ScopedDonateChunk& operator=(ScopedDonateChunk&&);
/**
- * Returns true if the migration object is in the 'unregister' mode, which means that the holder
- * must execute the moveChunk command and complete with a status.
+ * Returns true if the migration object is in the 'execute' mode. This means that the migration
+ * object holder is next in line to execute the moveChunk command. The holder must execute the
+ * command and call signalComplete with a status.
*/
bool mustExecute() const {
- return _forUnregister;
+ return _shouldExecute;
}
/**
- * Must only be called if the object is in the 'unregister' mode. Signals any callers, which
- * might be blocked in waitForCompletion.
+ * Must only be called if the object is in the 'execute' mode when the moveChunk command was
+ * invoked (the command immediately executed). Signals any callers that might be blocked in
+ * waitForCompletion.
*/
- void complete(Status status);
+ void signalComplete(Status status);
/**
* Must only be called if the object is in the 'join' mode. Blocks until the main executor of
- * the moveChunk command calls complete.
+ * the moveChunk command calls signalComplete.
*/
Status waitForCompletion(OperationContext* opCtx);
@@ -200,9 +200,12 @@ private:
// Registry from which to unregister the migration. Not owned.
ActiveMigrationsRegistry* _registry;
- // Whether this is a newly started migration (in which case the destructor must unregister) or
- // joining an existing one (in which case the caller must wait for completion).
- bool _forUnregister;
+ /**
+ * Whether the holder is the first in line for a newly started migration (in which case the
+ * destructor must unregister) or the caller is joining on an already-running migration
+ * (in which case the caller must block and wait for completion).
+ */
+ bool _shouldExecute;
// This is the future, which will be signaled at the end of a migration
std::shared_ptr<Notification<Status>> _completionNotification;
@@ -212,15 +215,15 @@ private:
* Object of this class is returned from the registerReceiveChunk call of the active migrations
* registry.
*/
-class ScopedRegisterReceiveChunk {
- MONGO_DISALLOW_COPYING(ScopedRegisterReceiveChunk);
+class ScopedReceiveChunk {
+ MONGO_DISALLOW_COPYING(ScopedReceiveChunk);
public:
- ScopedRegisterReceiveChunk(ActiveMigrationsRegistry* registry);
- ~ScopedRegisterReceiveChunk();
+ ScopedReceiveChunk(ActiveMigrationsRegistry* registry);
+ ~ScopedReceiveChunk();
- ScopedRegisterReceiveChunk(ScopedRegisterReceiveChunk&&);
- ScopedRegisterReceiveChunk& operator=(ScopedRegisterReceiveChunk&&);
+ ScopedReceiveChunk(ScopedReceiveChunk&&);
+ ScopedReceiveChunk& operator=(ScopedReceiveChunk&&);
private:
// Registry from which to unregister the migration. Not owned.
diff --git a/src/mongo/db/s/active_migrations_registry_test.cpp b/src/mongo/db/s/active_migrations_registry_test.cpp
index d336ba0362d..5450296d505 100644
--- a/src/mongo/db/s/active_migrations_registry_test.cpp
+++ b/src/mongo/db/s/active_migrations_registry_test.cpp
@@ -82,20 +82,19 @@ MoveChunkRequest createMoveChunkRequest(const NamespaceString& nss) {
return assertGet(MoveChunkRequest::createFromCommand(nss, builder.obj()));
}
-TEST_F(MoveChunkRegistration, ScopedRegisterDonateChunkMoveConstructorAndAssignment) {
- auto originalScopedRegisterDonateChunk = assertGet(_registry.registerDonateChunk(
+TEST_F(MoveChunkRegistration, ScopedDonateChunkMoveConstructorAndAssignment) {
+ auto originalScopedDonateChunk = assertGet(_registry.registerDonateChunk(
createMoveChunkRequest(NamespaceString("TestDB", "TestColl"))));
- ASSERT(originalScopedRegisterDonateChunk.mustExecute());
+ ASSERT(originalScopedDonateChunk.mustExecute());
- ScopedRegisterDonateChunk movedScopedRegisterDonateChunk(
- std::move(originalScopedRegisterDonateChunk));
- ASSERT(movedScopedRegisterDonateChunk.mustExecute());
+ ScopedDonateChunk movedScopedDonateChunk(std::move(originalScopedDonateChunk));
+ ASSERT(movedScopedDonateChunk.mustExecute());
- originalScopedRegisterDonateChunk = std::move(movedScopedRegisterDonateChunk);
- ASSERT(originalScopedRegisterDonateChunk.mustExecute());
+ originalScopedDonateChunk = std::move(movedScopedDonateChunk);
+ ASSERT(originalScopedDonateChunk.mustExecute());
// Need to signal the registered migration so the destructor doesn't invariant
- originalScopedRegisterDonateChunk.complete(Status::OK());
+ originalScopedDonateChunk.signalComplete(Status::OK());
}
TEST_F(MoveChunkRegistration, GetActiveMigrationNamespace) {
@@ -103,39 +102,39 @@ TEST_F(MoveChunkRegistration, GetActiveMigrationNamespace) {
const NamespaceString nss("TestDB", "TestColl");
- auto originalScopedRegisterDonateChunk =
+ auto originalScopedDonateChunk =
assertGet(_registry.registerDonateChunk(createMoveChunkRequest(nss)));
ASSERT_EQ(nss.ns(), _registry.getActiveDonateChunkNss()->ns());
// Need to signal the registered migration so the destructor doesn't invariant
- originalScopedRegisterDonateChunk.complete(Status::OK());
+ originalScopedDonateChunk.signalComplete(Status::OK());
}
TEST_F(MoveChunkRegistration, SecondMigrationReturnsConflictingOperationInProgress) {
- auto originalScopedRegisterDonateChunk = assertGet(_registry.registerDonateChunk(
+ auto originalScopedDonateChunk = assertGet(_registry.registerDonateChunk(
createMoveChunkRequest(NamespaceString("TestDB", "TestColl1"))));
- auto secondScopedRegisterDonateChunkStatus = _registry.registerDonateChunk(
+ auto secondScopedDonateChunkStatus = _registry.registerDonateChunk(
createMoveChunkRequest(NamespaceString("TestDB", "TestColl2")));
ASSERT_EQ(ErrorCodes::ConflictingOperationInProgress,
- secondScopedRegisterDonateChunkStatus.getStatus());
+ secondScopedDonateChunkStatus.getStatus());
- originalScopedRegisterDonateChunk.complete(Status::OK());
+ originalScopedDonateChunk.signalComplete(Status::OK());
}
TEST_F(MoveChunkRegistration, SecondMigrationWithSameArgumentsJoinsFirst) {
- auto originalScopedRegisterDonateChunk = assertGet(_registry.registerDonateChunk(
+ auto originalScopedDonateChunk = assertGet(_registry.registerDonateChunk(
createMoveChunkRequest(NamespaceString("TestDB", "TestColl"))));
- ASSERT(originalScopedRegisterDonateChunk.mustExecute());
+ ASSERT(originalScopedDonateChunk.mustExecute());
- auto secondScopedRegisterDonateChunk = assertGet(_registry.registerDonateChunk(
+ auto secondScopedDonateChunk = assertGet(_registry.registerDonateChunk(
createMoveChunkRequest(NamespaceString("TestDB", "TestColl"))));
- ASSERT(!secondScopedRegisterDonateChunk.mustExecute());
+ ASSERT(!secondScopedDonateChunk.mustExecute());
- originalScopedRegisterDonateChunk.complete({ErrorCodes::InternalError, "Test error"});
+ originalScopedDonateChunk.signalComplete({ErrorCodes::InternalError, "Test error"});
ASSERT_EQ(Status(ErrorCodes::InternalError, "Test error"),
- secondScopedRegisterDonateChunk.waitForCompletion(getTxn()));
+ secondScopedDonateChunk.waitForCompletion(getTxn()));
}
} // namespace
diff --git a/src/mongo/db/s/active_move_primaries_registry.cpp b/src/mongo/db/s/active_move_primaries_registry.cpp
new file mode 100644
index 00000000000..eaf2a01a74e
--- /dev/null
+++ b/src/mongo/db/s/active_move_primaries_registry.cpp
@@ -0,0 +1,124 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/db/s/active_move_primaries_registry.h"
+
+#include "mongo/util/assert_util.h"
+
+namespace mongo {
+
+ActiveMovePrimariesRegistry::ActiveMovePrimariesRegistry() = default;
+
+ActiveMovePrimariesRegistry::~ActiveMovePrimariesRegistry() {
+ invariant(!_activeMovePrimaryState);
+}
+
+StatusWith<ScopedMovePrimary> ActiveMovePrimariesRegistry::registerMovePrimary(
+ const ShardMovePrimary& requestArgs) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ if (_activeMovePrimaryState) {
+ if (_activeMovePrimaryState->requestArgs == requestArgs) {
+ return {ScopedMovePrimary(nullptr, false, _activeMovePrimaryState->notification)};
+ }
+
+ return _activeMovePrimaryState->constructErrorStatus();
+ }
+
+ _activeMovePrimaryState.emplace(requestArgs);
+
+ return {ScopedMovePrimary(this, true, _activeMovePrimaryState->notification)};
+}
+
+boost::optional<NamespaceString> ActiveMovePrimariesRegistry::getActiveMovePrimaryNss() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ if (_activeMovePrimaryState) {
+ return _activeMovePrimaryState->requestArgs.get_movePrimary();
+ }
+
+ return boost::none;
+}
+
+void ActiveMovePrimariesRegistry::_clearMovePrimary() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ invariant(_activeMovePrimaryState);
+ _activeMovePrimaryState.reset();
+}
+
+Status ActiveMovePrimariesRegistry::ActiveMovePrimaryState::constructErrorStatus() const {
+ return {ErrorCodes::ConflictingOperationInProgress,
+ str::stream()
+ << "Unable to start new movePrimary operation because this shard is currently "
+ "moving its primary for namespace "
+ << requestArgs.get_movePrimary().ns()
+ << " to "
+ << requestArgs.getTo()};
+}
+
+ScopedMovePrimary::ScopedMovePrimary(ActiveMovePrimariesRegistry* registry,
+ bool shouldExecute,
+ std::shared_ptr<Notification<Status>> completionNotification)
+ : _registry(registry),
+ _shouldExecute(shouldExecute),
+ _completionNotification(std::move(completionNotification)) {}
+
+ScopedMovePrimary::~ScopedMovePrimary() {
+ if (_registry) {
+ // If this is a movePrimary that didn't join an existing movePrimary, the caller must
+ // signal on completion.
+ invariant(_shouldExecute);
+ invariant(*_completionNotification);
+ _registry->_clearMovePrimary();
+ }
+}
+
+ScopedMovePrimary::ScopedMovePrimary(ScopedMovePrimary&& other) {
+ *this = std::move(other);
+}
+
+ScopedMovePrimary& ScopedMovePrimary::operator=(ScopedMovePrimary&& other) {
+ if (&other != this) {
+ _registry = other._registry;
+ other._registry = nullptr;
+ _shouldExecute = other._shouldExecute;
+ _completionNotification = std::move(other._completionNotification);
+ }
+
+ return *this;
+}
+
+void ScopedMovePrimary::signalComplete(Status status) {
+ invariant(_shouldExecute);
+ _completionNotification->set(status);
+}
+
+Status ScopedMovePrimary::waitForCompletion(OperationContext* opCtx) {
+ invariant(!_shouldExecute);
+ return _completionNotification->get(opCtx);
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/s/active_move_primaries_registry.h b/src/mongo/db/s/active_move_primaries_registry.h
new file mode 100644
index 00000000000..a2172a36499
--- /dev/null
+++ b/src/mongo/db/s/active_move_primaries_registry.h
@@ -0,0 +1,157 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/s/request_types/move_primary_gen.h"
+#include "mongo/util/concurrency/notification.h"
+
+namespace mongo {
+
+class ScopedMovePrimary;
+
+/**
+ * Thread-safe object that keeps track of active movePrimary commands running on a node and limits
+ * them to only one per shard. There is only one instance of this object per shard.
+ */
+
+class ActiveMovePrimariesRegistry {
+ MONGO_DISALLOW_COPYING(ActiveMovePrimariesRegistry);
+
+public:
+ ActiveMovePrimariesRegistry();
+ ~ActiveMovePrimariesRegistry();
+
+ /**
+ * If there are no movePrimary operations running on this shard, registers an active
+ * movePrimary operation with the specified arguments. Returns a ScopedMovePrimary, which must
+ * be signaled by the caller before it goes out of scope.
+ *
+ * If there is an active movePrimary operation already running on this shard and it has the
+ * exact same arguments, returns a ScopedMovePrimary, which can be used to join the already
+ * running movePrimary command.
+ *
+ * Otherwise returns a ConflictingOperationInProgress error.
+ */
+ StatusWith<ScopedMovePrimary> registerMovePrimary(const ShardMovePrimary& requestArgs);
+
+ /**
+ * If a movePrimary command has been previously registered through a call to
+ * registerMovePrimary, returns that namespace. Otherwise returns boost::none.
+ */
+ boost::optional<NamespaceString> getActiveMovePrimaryNss();
+
+private:
+ friend class ScopedMovePrimary;
+
+ // Describes the state of a current active movePrimary operation.
+ struct ActiveMovePrimaryState {
+ ActiveMovePrimaryState(ShardMovePrimary inRequestArgs)
+ : requestArgs(std::move(inRequestArgs)),
+ notification(std::make_shared<Notification<Status>>()) {}
+
+ /**
+ * Constructs an error status to return in the case of conflicting operations.
+ */
+ Status constructErrorStatus() const;
+
+ // Exact arguments of the currently active operation
+ ShardMovePrimary requestArgs;
+
+ // Notification event, which will be signaled when the currently active operation completes
+ std::shared_ptr<Notification<Status>> notification;
+ };
+
+ /**
+ * Unregisters a previously registered namespace with an ongoing movePrimary operation. Must
+ * only be called if a previous call to registerMovePrimary has succeeded.
+ */
+ void _clearMovePrimary();
+
+ // Protects the state below
+ stdx::mutex _mutex;
+
+ // If there is an active movePrimary operation going on, this field contains the request that
+ // initiated it.
+ boost::optional<ActiveMovePrimaryState> _activeMovePrimaryState;
+};
+
+/**
+ * An object of this class is returned from the registerMovePrimary call of the current active
+ * movePrimaries registry. It can exist in two modes - 'execute' and 'join.' See the comments for
+ * registerMovePrimary for more details.
+ */
+class ScopedMovePrimary {
+ MONGO_DISALLOW_COPYING(ScopedMovePrimary);
+
+public:
+ ScopedMovePrimary(ActiveMovePrimariesRegistry* registry,
+ bool shouldExecute,
+ std::shared_ptr<Notification<Status>> completionNotification);
+ ~ScopedMovePrimary();
+
+ ScopedMovePrimary(ScopedMovePrimary&&);
+ ScopedMovePrimary& operator=(ScopedMovePrimary&&);
+
+ /**
+ * Returns true if the registerMovePrimary object is in the 'execute' mode. This means that
+ * the registerMovePrimary object holder is next in line to execute the movePrimary command.
+ * The holder must execute the command and call signalComplete with a status.
+ */
+ bool mustExecute() const {
+ return _shouldExecute;
+ }
+
+ /**
+ * Must only be called if the object was in the 'execute' mode when the movePrimary command
+ * was invoked (the command immediately executed). Signals any callers that might be blocked in
+ * waitForCompletion.
+ */
+ void signalComplete(Status status);
+
+ /**
+ * Must only be called if the object is in the 'join' mode. Blocks until the main executor of
+ * the movePrimary command calls signalComplete.
+ */
+ Status waitForCompletion(OperationContext* opCtx);
+
+private:
+ // Registry from which to unregister the movePrimary. Not owned.
+ ActiveMovePrimariesRegistry* _registry;
+
+ /* Whether the holder is the first in line to call the movePrimary command (in which case the
+ * destructor must unregister) or the caller is joining on an already-running movePrimary
+ * operation (in which case the caller must block and wait for completion).
+ */
+ bool _shouldExecute;
+
+ // This is the future, which will be signaled at the end of a movePrimary command.
+ std::shared_ptr<Notification<Status>> _completionNotification;
+};
+}
diff --git a/src/mongo/db/s/active_move_primaries_registry_test.cpp b/src/mongo/db/s/active_move_primaries_registry_test.cpp
new file mode 100644
index 00000000000..b196fdc2621
--- /dev/null
+++ b/src/mongo/db/s/active_move_primaries_registry_test.cpp
@@ -0,0 +1,128 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/db/s/active_move_primaries_registry.h"
+#include "mongo/bson/bsonmisc.h"
+#include "mongo/db/client.h"
+#include "mongo/db/service_context_noop.h"
+#include "mongo/s/request_types/move_primary_gen.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+using unittest::assertGet;
+
+class MovePrimaryRegistration : public unittest::Test {
+protected:
+ void setUp() override {
+ _client = _serviceContext.makeClient("MovePrimaryRegistrationTest");
+ _opCtx = _serviceContext.makeOperationContext(_client.get());
+ }
+
+ void tearDown() override {
+ _opCtx.reset();
+ _client.reset();
+ }
+
+ OperationContext* getTxn() const {
+ return _opCtx.get();
+ }
+
+ ServiceContextNoop _serviceContext;
+ ServiceContext::UniqueClient _client;
+ ServiceContext::UniqueOperationContext _opCtx;
+
+ ActiveMovePrimariesRegistry _registry;
+};
+
+ShardMovePrimary createMovePrimaryRequest(const NamespaceString& nss) {
+ ShardMovePrimary request;
+ request.set_movePrimary(std::move(nss));
+ request.setTo("shard0001");
+
+ return request;
+}
+
+TEST_F(MovePrimaryRegistration, ScopedPrimaryMoveConstructorAndAssignment) {
+ auto originalScopedMovePrimary = assertGet(
+ _registry.registerMovePrimary(createMovePrimaryRequest(NamespaceString("TestDB"))));
+ ASSERT(originalScopedMovePrimary.mustExecute());
+
+ ScopedMovePrimary movedScopedMovePrimary(std::move(originalScopedMovePrimary));
+ ASSERT(movedScopedMovePrimary.mustExecute());
+
+ originalScopedMovePrimary = std::move(movedScopedMovePrimary);
+ ASSERT(originalScopedMovePrimary.mustExecute());
+
+ // Need to signal the registered migration so the destructor doesn't invariant
+ originalScopedMovePrimary.signalComplete(Status::OK());
+}
+
+TEST_F(MovePrimaryRegistration, GetActiveMovePrimaryNamespace) {
+ ASSERT(!_registry.getActiveMovePrimaryNss());
+
+ const NamespaceString nss("TestDB");
+
+ auto originalScopedMovePrimary =
+ assertGet(_registry.registerMovePrimary(createMovePrimaryRequest(nss)));
+
+ ASSERT_EQ(nss.ns(), _registry.getActiveMovePrimaryNss()->ns());
+
+ // Need to signal the registered migration so the destructor doesn't invariant
+ originalScopedMovePrimary.signalComplete(Status::OK());
+}
+
+TEST_F(MovePrimaryRegistration, SecondMovePrimaryReturnsConflictingOperationInProgress) {
+ auto originalScopedMovePrimary = assertGet(
+ _registry.registerMovePrimary(createMovePrimaryRequest(NamespaceString("TestDB"))));
+
+ auto secondScopedMovePrimaryStatus =
+ _registry.registerMovePrimary(createMovePrimaryRequest(NamespaceString("TestDB2")));
+ ASSERT_EQ(ErrorCodes::ConflictingOperationInProgress,
+ secondScopedMovePrimaryStatus.getStatus());
+
+ originalScopedMovePrimary.signalComplete(Status::OK());
+}
+
+TEST_F(MovePrimaryRegistration, SecondMovePrimaryWithSameArgumentsJoinsFirst) {
+ auto originalScopedMovePrimary = assertGet(
+ _registry.registerMovePrimary(createMovePrimaryRequest(NamespaceString("TestDB"))));
+ ASSERT(originalScopedMovePrimary.mustExecute());
+
+ auto secondScopedMovePrimary = assertGet(
+ _registry.registerMovePrimary(createMovePrimaryRequest(NamespaceString("TestDB"))));
+ ASSERT(!secondScopedMovePrimary.mustExecute());
+
+ originalScopedMovePrimary.signalComplete({ErrorCodes::InternalError, "Test error"});
+ ASSERT_EQ(Status(ErrorCodes::InternalError, "Test error"),
+ secondScopedMovePrimary.waitForCompletion(getTxn()));
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index ac2746f180a..c4a8d99f24e 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -314,7 +314,7 @@ BSONObj MigrationDestinationManager::getMigrationStatusReport() {
}
Status MigrationDestinationManager::start(const NamespaceString& nss,
- ScopedRegisterReceiveChunk scopedRegisterReceiveChunk,
+ ScopedReceiveChunk scopedReceiveChunk,
const MigrationSessionId& sessionId,
const ConnectionString& fromShardConnString,
const ShardId& fromShard,
@@ -326,7 +326,7 @@ Status MigrationDestinationManager::start(const NamespaceString& nss,
const WriteConcernOptions& writeConcern) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
invariant(!_sessionId);
- invariant(!_scopedRegisterReceiveChunk);
+ invariant(!_scopedReceiveChunk);
_state = READY;
_stateChangedCV.notify_all();
@@ -348,7 +348,7 @@ Status MigrationDestinationManager::start(const NamespaceString& nss,
_numSteady = 0;
_sessionId = sessionId;
- _scopedRegisterReceiveChunk = std::move(scopedRegisterReceiveChunk);
+ _scopedReceiveChunk = std::move(scopedReceiveChunk);
// TODO: If we are here, the migrate thread must have completed, otherwise _active above
// would be false, so this would never block. There is no better place with the current
@@ -474,7 +474,7 @@ void MigrationDestinationManager::_migrateThread(BSONObj min,
stdx::lock_guard<stdx::mutex> lk(_mutex);
_sessionId.reset();
- _scopedRegisterReceiveChunk.reset();
+ _scopedReceiveChunk.reset();
_isActiveCV.notify_all();
}
@@ -487,7 +487,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
const WriteConcernOptions& writeConcern) {
invariant(isActive());
invariant(_sessionId);
- invariant(_scopedRegisterReceiveChunk);
+ invariant(_scopedReceiveChunk);
invariant(!min.isEmpty());
invariant(!max.isEmpty());
diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h
index 1a6304f8010..290c7f2218c 100644
--- a/src/mongo/db/s/migration_destination_manager.h
+++ b/src/mongo/db/s/migration_destination_manager.h
@@ -106,7 +106,7 @@ public:
* Returns OK if migration started successfully.
*/
Status start(const NamespaceString& nss,
- ScopedRegisterReceiveChunk scopedRegisterReceiveChunk,
+ ScopedReceiveChunk scopedReceiveChunk,
const MigrationSessionId& sessionId,
const ConnectionString& fromShardConnString,
const ShardId& fromShard,
@@ -194,7 +194,7 @@ private:
// Migration session ID uniquely identifies the migration and indicates whether the prepare
// method has been called.
boost::optional<MigrationSessionId> _sessionId;
- boost::optional<ScopedRegisterReceiveChunk> _scopedRegisterReceiveChunk;
+ boost::optional<ScopedReceiveChunk> _scopedReceiveChunk;
// A condition variable on which to wait for the prepare method to be called.
stdx::condition_variable _isActiveCV;
diff --git a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
index 72d51ab2037..6600c3c1b30 100644
--- a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
+++ b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
@@ -117,12 +117,12 @@ public:
uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj)));
// Ensure this shard is not currently receiving or donating any chunks.
- auto scopedRegisterReceiveChunk(
+ auto scopedReceiveChunk(
uassertStatusOK(shardingState->registerReceiveChunk(nss, chunkRange, fromShard)));
uassertStatusOK(MigrationDestinationManager::get(opCtx)->start(
nss,
- std::move(scopedRegisterReceiveChunk),
+ std::move(scopedReceiveChunk),
migrationSessionId,
statusWithFromShardConnectionString.getValue(),
fromShard,
diff --git a/src/mongo/db/s/move_chunk_command.cpp b/src/mongo/db/s/move_chunk_command.cpp
index 37fdf0065f3..f818da93283 100644
--- a/src/mongo/db/s/move_chunk_command.cpp
+++ b/src/mongo/db/s/move_chunk_command.cpp
@@ -120,29 +120,29 @@ public:
// where we might have changed a shard's host by removing/adding a shard with the same name.
Grid::get(opCtx)->shardRegistry()->reload(opCtx);
- auto scopedRegisterMigration =
+ auto scopedMigration =
uassertStatusOK(shardingState->registerDonateChunk(moveChunkRequest));
Status status = {ErrorCodes::InternalError, "Uninitialized value"};
// Check if there is an existing migration running and if so, join it
- if (scopedRegisterMigration.mustExecute()) {
+ if (scopedMigration.mustExecute()) {
try {
_runImpl(opCtx, moveChunkRequest);
status = Status::OK();
} catch (const DBException& e) {
status = e.toStatus();
} catch (const std::exception& e) {
- scopedRegisterMigration.complete(
+ scopedMigration.signalComplete(
{ErrorCodes::InternalError,
str::stream() << "Severe error occurred while running moveChunk command: "
<< e.what()});
throw;
}
- scopedRegisterMigration.complete(status);
+ scopedMigration.signalComplete(status);
} else {
- status = scopedRegisterMigration.waitForCompletion(opCtx);
+ status = scopedMigration.waitForCompletion(opCtx);
}
if (status == ErrorCodes::ChunkTooBig) {
diff --git a/src/mongo/db/s/move_primary_command.cpp b/src/mongo/db/s/move_primary_command.cpp
index 6375c3e9bf4..807dc0316cb 100644
--- a/src/mongo/db/s/move_primary_command.cpp
+++ b/src/mongo/db/s/move_primary_command.cpp
@@ -32,6 +32,8 @@
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/commands.h"
+#include "mongo/db/s/sharding_state.h"
+#include "mongo/s/grid.h"
#include "mongo/s/request_types/move_primary_gen.h"
#include "mongo/util/log.h"
@@ -87,7 +89,11 @@ public:
"_movePrimary can only be run on shard servers"));
}
- auto movePrimaryRequest = MovePrimary::parse(IDLParserErrorContext("_movePrimary"), cmdObj);
+ auto shardingState = ShardingState::get(opCtx);
+ uassertStatusOK(shardingState->canAcceptShardedCommands());
+
+ const auto movePrimaryRequest =
+ ShardMovePrimary::parse(IDLParserErrorContext("_movePrimary"), cmdObj);
const auto dbname = parseNs("", cmdObj);
uassert(
@@ -117,6 +123,25 @@ public:
str::stream() << "you have to specify where you want to move it"});
}
+ // Make sure we're as up-to-date as possible with shard information. This catches the case
+ // where we might have changed a shard's host by removing/adding a shard with the same name.
+ Grid::get(opCtx)->shardRegistry()->reload(opCtx);
+
+ auto scopedMovePrimary =
+ uassertStatusOK(shardingState->registerMovePrimary(movePrimaryRequest));
+
+ Status status = {ErrorCodes::InternalError, "Uninitialized value"};
+
+ // Check if there is an existing movePrimary running and if so, join it
+ if (scopedMovePrimary.mustExecute()) {
+ status = Status::OK();
+ scopedMovePrimary.signalComplete(status);
+ } else {
+ status = scopedMovePrimary.waitForCompletion(opCtx);
+ }
+
+ uassertStatusOK(status);
+
return true;
}
diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp
index d8ad3de98e7..467db2051a0 100644
--- a/src/mongo/db/s/sharding_state.cpp
+++ b/src/mongo/db/s/sharding_state.cpp
@@ -390,13 +390,13 @@ StatusWith<bool> ShardingState::initializeShardingAwarenessIfNeeded(OperationCon
}
}
-StatusWith<ScopedRegisterDonateChunk> ShardingState::registerDonateChunk(
- const MoveChunkRequest& args) {
+StatusWith<ScopedDonateChunk> ShardingState::registerDonateChunk(const MoveChunkRequest& args) {
return _activeMigrationsRegistry.registerDonateChunk(args);
}
-StatusWith<ScopedRegisterReceiveChunk> ShardingState::registerReceiveChunk(
- const NamespaceString& nss, const ChunkRange& chunkRange, const ShardId& fromShardId) {
+StatusWith<ScopedReceiveChunk> ShardingState::registerReceiveChunk(const NamespaceString& nss,
+ const ChunkRange& chunkRange,
+ const ShardId& fromShardId) {
return _activeMigrationsRegistry.registerReceiveChunk(nss, chunkRange, fromShardId);
}
@@ -408,6 +408,15 @@ BSONObj ShardingState::getActiveMigrationStatusReport(OperationContext* opCtx) {
return _activeMigrationsRegistry.getActiveMigrationStatusReport(opCtx);
}
+StatusWith<ScopedMovePrimary> ShardingState::registerMovePrimary(
+ const ShardMovePrimary& requestArgs) {
+ return _activeMovePrimariesRegistry.registerMovePrimary(requestArgs);
+}
+
+boost::optional<NamespaceString> ShardingState::getActiveMovePrimaryNss() {
+ return _activeMovePrimariesRegistry.getActiveMovePrimaryNss();
+}
+
void ShardingState::appendInfo(OperationContext* opCtx, BSONObjBuilder& builder) {
const bool isEnabled = enabled();
builder.appendBool("enabled", isEnabled);
diff --git a/src/mongo/db/s/sharding_state.h b/src/mongo/db/s/sharding_state.h
index 5d1910de055..cf5cc2945f9 100644
--- a/src/mongo/db/s/sharding_state.h
+++ b/src/mongo/db/s/sharding_state.h
@@ -35,6 +35,7 @@
#include "mongo/bson/oid.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/s/active_migrations_registry.h"
+#include "mongo/db/s/active_move_primaries_registry.h"
#include "mongo/db/s/chunk_splitter.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/stdx/functional.h"
@@ -156,26 +157,26 @@ public:
/**
* If there are no migrations running on this shard, registers an active migration with the
- * specified arguments and returns a ScopedRegisterDonateChunk, which must be signaled by the
+ * specified arguments and returns a ScopedDonateChunk, which must be signaled by the
* caller before it goes out of scope.
*
* If there is an active migration already running on this shard and it has the exact same
- * arguments, returns a ScopedRegisterDonateChunk, which can be used to join the existing one.
+ * arguments, returns a ScopedDonateChunk, which can be used to join the existing one.
*
- * Othwerwise returns a ConflictingOperationInProgress error.
+ * Otherwise returns a ConflictingOperationInProgress error.
*/
- StatusWith<ScopedRegisterDonateChunk> registerDonateChunk(const MoveChunkRequest& args);
+ StatusWith<ScopedDonateChunk> registerDonateChunk(const MoveChunkRequest& args);
/**
* If there are no migrations running on this shard, registers an active receive operation with
- * the specified session id and returns a ScopedRegisterReceiveChunk, which will unregister it
+ * the specified session id and returns a ScopedReceiveChunk, which will unregister it
* when it goes out of scope.
*
* Otherwise returns a ConflictingOperationInProgress error.
*/
- StatusWith<ScopedRegisterReceiveChunk> registerReceiveChunk(const NamespaceString& nss,
- const ChunkRange& chunkRange,
- const ShardId& fromShardId);
+ StatusWith<ScopedReceiveChunk> registerReceiveChunk(const NamespaceString& nss,
+ const ChunkRange& chunkRange,
+ const ShardId& fromShardId);
/**
* If a migration has been previously registered through a call to registerDonateChunk returns
@@ -195,6 +196,26 @@ public:
BSONObj getActiveMigrationStatusReport(OperationContext* opCtx);
/**
+ * If there are no movePrimary operations running on this shard, registers an active
+ * movePrimary operation with the specified arguments. Returns a ScopedMovePrimary, which must
+ * be signaled by the caller before it goes out of scope.
+ *
+ * If there is an active movePrimary operation already running on this shard and it has the
+ * exact same arguments, returns a ScopedMovePrimary, which can be used to join the already
+ * running movePrimary command.
+ *
+ * Otherwise returns a ConflictingOperationInProgress error.
+ */
+ StatusWith<ScopedMovePrimary> registerMovePrimary(const ShardMovePrimary& requestArgs);
+
+ /**
+ * If a movePrimary command has been previously registered through a call to
+ * registerMovePrimary,
+ * returns that namespace. Otherwise returns boost::none.
+ */
+ boost::optional<NamespaceString> getActiveMovePrimaryNss();
+
+ /**
* For testing only. Mock the initialization method used by initializeFromConfigConnString and
* initializeFromShardIdentity after all checks are performed.
*/
@@ -245,6 +266,9 @@ private:
// Tracks the active move chunk operations running on this shard
ActiveMigrationsRegistry _activeMigrationsRegistry;
+ // Tracks the active move primary operations running on this shard
+ ActiveMovePrimariesRegistry _activeMovePrimariesRegistry;
+
// Handles asynchronous auto-splitting of chunks
std::unique_ptr<ChunkSplitter> _chunkSplitter;
diff --git a/src/mongo/s/request_types/move_primary.idl b/src/mongo/s/request_types/move_primary.idl
index 2e944cbb8ec..c55b17ce526 100644
--- a/src/mongo/s/request_types/move_primary.idl
+++ b/src/mongo/s/request_types/move_primary.idl
@@ -70,6 +70,7 @@ structs:
ShardMovePrimary:
description: "The internal movePrimary command on a primary shard"
+ generate_comparison_operators: true
strict: false
fields:
_movePrimary: