summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2016-09-28 14:05:36 -0400
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2016-09-29 14:31:59 -0400
commitba3dc420fe41f9c6ba718da5eb270161374839f3 (patch)
treed8eb3e68dd62bbc2bb0b982945a62e7d892e37b4
parentfd94476eb6ca0207dd69662d36f93eeaaa227073 (diff)
downloadmongo-ba3dc420fe41f9c6ba718da5eb270161374839f3.tar.gz
SERVER-26370 Register incoming migrations on the ActiveMigrationsRegistry
In order to avoid race conditions where a shard could serve both as a donor and recipient.
-rw-r--r--jstests/sharding/migration_ignore_interrupts_1.js5
-rw-r--r--src/mongo/db/s/active_migrations_registry.cpp59
-rw-r--r--src/mongo/db/s/active_migrations_registry.h49
-rw-r--r--src/mongo/db/s/migration_destination_manager.cpp22
-rw-r--r--src/mongo/db/s/migration_destination_manager.h3
-rw-r--r--src/mongo/db/s/migration_destination_manager_legacy_commands.cpp3
-rw-r--r--src/mongo/db/s/sharding_state.cpp11
-rw-r--r--src/mongo/db/s/sharding_state.h9
8 files changed, 141 insertions, 20 deletions
diff --git a/jstests/sharding/migration_ignore_interrupts_1.js b/jstests/sharding/migration_ignore_interrupts_1.js
index d2bcb6e3115..f02e560676f 100644
--- a/jstests/sharding/migration_ignore_interrupts_1.js
+++ b/jstests/sharding/migration_ignore_interrupts_1.js
@@ -58,6 +58,11 @@ load('./jstests/libs/chunk_manipulation_util.js');
ErrorCodes.ConflictingOperationInProgress,
"(2) A shard should not be able to be the recipient of two ongoing migrations.");
+    // assert.commandFailed(res, msg) has no error-code parameter, so a code passed in second
+    // position is never compared against the response. commandFailedWithCode(res, code, msg)
+    // verifies that the donor request is rejected with ConflictingOperationInProgress.
+    assert.commandFailedWithCode(
+        admin.runCommand({moveChunk: ns1, find: {a: 10}, to: st.shard0.shardName}),
+        ErrorCodes.ConflictingOperationInProgress,
+        "(3) A shard should not be able to be both a donor and recipient of migrations.");
+
// Finish migration
unpauseMigrateAtStep(shard1, migrateStepNames.deletedPriorDataInRange);
assert.doesNotThrow(function() {
diff --git a/src/mongo/db/s/active_migrations_registry.cpp b/src/mongo/db/s/active_migrations_registry.cpp
index 35a765a89c2..d455da49096 100644
--- a/src/mongo/db/s/active_migrations_registry.cpp
+++ b/src/mongo/db/s/active_migrations_registry.cpp
@@ -33,6 +33,7 @@
#include "mongo/base/status_with.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/s/collection_sharding_state.h"
+#include "mongo/db/s/migration_session_id.h"
#include "mongo/db/s/migration_source_manager.h"
#include "mongo/util/assert_util.h"
@@ -47,6 +48,13 @@ ActiveMigrationsRegistry::~ActiveMigrationsRegistry() {
StatusWith<ScopedRegisterDonateChunk> ActiveMigrationsRegistry::registerDonateChunk(
const MoveChunkRequest& args) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
+    // An incoming migration is already registered, so this shard must not also become a donor.
+    // The conflicting operation is a *receive*, so the message names the namespace being
+    // received rather than claiming that the shard is donating.
+    if (_activeReceiveChunkState) {
+        return {ErrorCodes::ConflictingOperationInProgress,
+                str::stream() << "Unable to start new migration because this shard is currently "
+                                 "receiving chunk for "
+                              << _activeReceiveChunkState->nss.ns()};
+    }
+
if (!_activeMoveChunkState) {
_activeMoveChunkState.emplace(args);
return {ScopedRegisterDonateChunk(this, true, _activeMoveChunkState->notification)};
@@ -63,6 +71,28 @@ StatusWith<ScopedRegisterDonateChunk> ActiveMigrationsRegistry::registerDonateCh
<< _activeMoveChunkState->args.getNss().ns()};
}
+/**
+ * Registers an incoming (receive) chunk migration for namespace 'nss'.
+ *
+ * Returns ConflictingOperationInProgress if this registry already has an active donate or an
+ * active receive registered. Otherwise records 'nss' as the active receive and returns a
+ * ScopedRegisterReceiveChunk bound to this registry.
+ */
+StatusWith<ScopedRegisterReceiveChunk> ActiveMigrationsRegistry::registerReceiveChunk(
+    const NamespaceString& nss) {
+    stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+    // The shard is currently acting as a donor
+    if (_activeMoveChunkState) {
+        return {ErrorCodes::ConflictingOperationInProgress,
+                str::stream() << "Unable to start new migration because this shard is currently "
+                                 "donating chunk for "
+                              << _activeMoveChunkState->args.getNss().ns()};
+    }
+
+    // The shard is already a recipient; report the namespace that is being received
+    if (_activeReceiveChunkState) {
+        return {ErrorCodes::ConflictingOperationInProgress,
+                str::stream() << "Unable to start new migration because this shard is currently "
+                                 "receiving chunk for "
+                              << _activeReceiveChunkState->nss.ns()};
+    }
+
+    _activeReceiveChunkState.emplace(nss);
+
+    return {ScopedRegisterReceiveChunk(this)};
+}
+
boost::optional<NamespaceString> ActiveMigrationsRegistry::getActiveDonateChunkNss() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
if (_activeMoveChunkState) {
@@ -105,6 +135,12 @@ void ActiveMigrationsRegistry::_clearDonateChunk() {
_activeMoveChunkState.reset();
}
+// Drops the active receive-chunk registration. A registration must currently exist; this is
+// enforced with an invariant.
+void ActiveMigrationsRegistry::_clearReceiveChunk() {
+    stdx::lock_guard<stdx::mutex> registryLock(_mutex);
+
+    invariant(_activeReceiveChunkState);
+    _activeReceiveChunkState = boost::none;
+}
+
ScopedRegisterDonateChunk::ScopedRegisterDonateChunk(
ActiveMigrationsRegistry* registry,
bool forUnregister,
@@ -146,4 +182,27 @@ Status ScopedRegisterDonateChunk::waitForCompletion(OperationContext* txn) {
return _completionNotification->get(txn);
}
+// Binds this scoped object to 'registry' (not owned). The destructor unregisters the receive
+// from it, unless the registration has been moved to another object in the meantime.
+ScopedRegisterReceiveChunk::ScopedRegisterReceiveChunk(ActiveMigrationsRegistry* registry)
+    : _registry(registry) {}
+
+ScopedRegisterReceiveChunk::~ScopedRegisterReceiveChunk() {
+    // A null registry means the registration was moved out, so there is nothing to release
+    if (_registry) {
+        _registry->_clearReceiveChunk();
+    }
+}
+
+// Takes over the registration held by 'other'. The member is initialized directly instead of
+// delegating to operator=, so the assignment operator never observes an uninitialized pointer.
+ScopedRegisterReceiveChunk::ScopedRegisterReceiveChunk(ScopedRegisterReceiveChunk&& other)
+    : _registry(other._registry) {
+    other._registry = nullptr;
+}
+
+// Takes over the registration held by 'other'. A registration already held by this object is
+// released first; overwriting the pointer without doing so would leave the registry marked as
+// receiving with no owner left to clear it.
+ScopedRegisterReceiveChunk& ScopedRegisterReceiveChunk::operator=(
+    ScopedRegisterReceiveChunk&& other) {
+    if (&other != this) {
+        if (_registry) {
+            _registry->_clearReceiveChunk();
+        }
+
+        _registry = other._registry;
+        other._registry = nullptr;
+    }
+
+    return *this;
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/active_migrations_registry.h b/src/mongo/db/s/active_migrations_registry.h
index 06ffeaece07..e2f7ad06b8b 100644
--- a/src/mongo/db/s/active_migrations_registry.h
+++ b/src/mongo/db/s/active_migrations_registry.h
@@ -31,6 +31,7 @@
#include <boost/optional.hpp>
#include "mongo/base/disallow_copying.h"
+#include "mongo/db/s/migration_session_id.h"
#include "mongo/s/move_chunk_request.h"
#include "mongo/stdx/memory.h"
#include "mongo/stdx/mutex.h"
@@ -40,6 +41,7 @@ namespace mongo {
class OperationContext;
class ScopedRegisterDonateChunk;
+class ScopedRegisterReceiveChunk;
template <typename T>
class StatusWith;
@@ -68,6 +70,15 @@ public:
StatusWith<ScopedRegisterDonateChunk> registerDonateChunk(const MoveChunkRequest& args);
/**
+ * If there are no migrations running on this shard, registers an active receive operation for
+ * the specified namespace and returns a ScopedRegisterReceiveChunk, which will unregister it
+ * when it goes out of scope.
+ *
+ * Otherwise returns a ConflictingOperationInProgress error.
+ */
+ StatusWith<ScopedRegisterReceiveChunk> registerReceiveChunk(const NamespaceString& nss);
+
+ /**
* If a migration has been previously registered through a call to registerDonateChunk returns
* that namespace. Otherwise returns boost::none.
*/
@@ -83,6 +94,7 @@ public:
private:
friend class ScopedRegisterDonateChunk;
+ friend class ScopedRegisterReceiveChunk;
// Describes the state of a currently active moveChunk operation
struct ActiveMoveChunkState {
@@ -96,18 +108,36 @@ private:
std::shared_ptr<Notification<Status>> notification;
};
+    // Describes the state of a currently active receive chunk operation
+    struct ActiveReceiveChunkState {
+        // Explicit, so that a NamespaceString never converts to a receive state implicitly
+        explicit ActiveReceiveChunkState(NamespaceString inNss) : nss(std::move(inNss)) {}
+
+        // Namespace for which a chunk is being received
+        NamespaceString nss;
+    };
+
/**
* Unregisters a previously registered namespace with ongoing migration. Must only be called if
* a previous call to registerDonateChunk has succeeded.
*/
void _clearDonateChunk();
+ /**
+ * Unregisters a previously registered incoming migration. Must only be called if a previous
+ * call to registerReceiveChunk has succeeded.
+ */
+ void _clearReceiveChunk();
+
// Protects the state below
stdx::mutex _mutex;
// If there is an active moveChunk operation going on, this field contains the request, which
// initiated it
boost::optional<ActiveMoveChunkState> _activeMoveChunkState;
+
+ // If there is an active receive of a chunk going on, this field contains the namespace for
+ // which the chunk is being received
+ boost::optional<ActiveReceiveChunkState> _activeReceiveChunkState;
};
/**
@@ -159,4 +189,23 @@ private:
std::shared_ptr<Notification<Status>> _completionNotification;
};
+/**
+ * Object of this class is returned from the registerReceiveChunk call of the active migrations
+ * registry. On destruction it unregisters the receive from that registry, unless the
+ * registration has been moved to another object. Movable, but not copyable.
+ */
+class ScopedRegisterReceiveChunk {
+    MONGO_DISALLOW_COPYING(ScopedRegisterReceiveChunk);
+
+public:
+    // Explicit, so that a raw registry pointer can never silently convert into an object whose
+    // destructor unregisters a receive.
+    explicit ScopedRegisterReceiveChunk(ActiveMigrationsRegistry* registry);
+    ~ScopedRegisterReceiveChunk();
+
+    // Moving transfers the registration; the moved-from object no longer unregisters anything.
+    ScopedRegisterReceiveChunk(ScopedRegisterReceiveChunk&&);
+    ScopedRegisterReceiveChunk& operator=(ScopedRegisterReceiveChunk&&);
+
+private:
+    // Registry from which to unregister the migration. Not owned. Null once moved from.
+    ActiveMigrationsRegistry* _registry;
+};
+
} // namespace mongo
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index 8110f684766..83f0e74c289 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -263,6 +263,7 @@ BSONObj MigrationDestinationManager::getMigrationStatusReport() {
}
Status MigrationDestinationManager::start(const NamespaceString& nss,
+ ScopedRegisterReceiveChunk scopedRegisterReceiveChunk,
const MigrationSessionId& sessionId,
const ConnectionString& fromShardConnString,
const ShardId& fromShard,
@@ -273,19 +274,8 @@ Status MigrationDestinationManager::start(const NamespaceString& nss,
const OID& epoch,
const WriteConcernOptions& writeConcern) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
-
- if (_sessionId) {
- return Status(ErrorCodes::ConflictingOperationInProgress,
- str::stream() << "Active migration already in progress "
- << "ns: "
- << _nss.ns()
- << ", from: "
- << _fromShardConnString.toString()
- << ", min: "
- << _min
- << ", max: "
- << _max);
- }
+ invariant(!_sessionId);
+ invariant(!_scopedRegisterReceiveChunk);
_state = READY;
_errmsg = "";
@@ -306,6 +296,8 @@ Status MigrationDestinationManager::start(const NamespaceString& nss,
_numSteady = 0;
_sessionId = sessionId;
+ _scopedRegisterReceiveChunk = std::move(scopedRegisterReceiveChunk);
+
// TODO: If we are here, the migrate thread must have completed, otherwise _active above would
// be false, so this would never block. There is no better place with the current implementation
@@ -432,7 +424,8 @@ void MigrationDestinationManager::_migrateThread(BSONObj min,
}
stdx::lock_guard<stdx::mutex> lk(_mutex);
- _sessionId = boost::none;
+ _sessionId.reset();
+ _scopedRegisterReceiveChunk.reset();
_isActiveCV.notify_all();
}
@@ -445,6 +438,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,
const WriteConcernOptions& writeConcern) {
invariant(isActive());
invariant(_sessionId);
+ invariant(_scopedRegisterReceiveChunk);
invariant(!min.isEmpty());
invariant(!max.isEmpty());
diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h
index 804ec2bf35a..0b16202d55e 100644
--- a/src/mongo/db/s/migration_destination_manager.h
+++ b/src/mongo/db/s/migration_destination_manager.h
@@ -36,6 +36,7 @@
#include "mongo/bson/oid.h"
#include "mongo/client/connection_string.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/s/active_migrations_registry.h"
#include "mongo/db/s/migration_session_id.h"
#include "mongo/s/shard_id.h"
#include "mongo/stdx/condition_variable.h"
@@ -88,6 +89,7 @@ public:
* Returns OK if migration started successfully.
*/
Status start(const NamespaceString& nss,
+ ScopedRegisterReceiveChunk scopedRegisterReceiveChunk,
const MigrationSessionId& sessionId,
const ConnectionString& fromShardConnString,
const ShardId& fromShard,
@@ -193,6 +195,7 @@ private:
// Migration session ID uniquely identifies the migration and indicates whether the prepare
// method has been called.
boost::optional<MigrationSessionId> _sessionId;
+ boost::optional<ScopedRegisterReceiveChunk> _scopedRegisterReceiveChunk;
// A condition variable on which to wait for the prepare method to be called.
stdx::condition_variable _isActiveCV;
diff --git a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
index 4216b30ef5e..7ea7ff56be1 100644
--- a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
+++ b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
@@ -166,8 +166,11 @@ public:
const MigrationSessionId migrationSessionId(
uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj)));
+ auto scopedRegisterReceiveChunk(uassertStatusOK(shardingState->registerReceiveChunk(nss)));
+
uassertStatusOK(shardingState->migrationDestinationManager()->start(
nss,
+ std::move(scopedRegisterReceiveChunk),
migrationSessionId,
statusWithFromShardConnectionString.getValue(),
fromShard,
diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp
index 8250ed4021f..a789b4ca3cf 100644
--- a/src/mongo/db/s/sharding_state.cpp
+++ b/src/mongo/db/s/sharding_state.cpp
@@ -733,15 +733,14 @@ StatusWith<ChunkVersion> ShardingState::_refreshMetadata(
StatusWith<ScopedRegisterDonateChunk> ShardingState::registerDonateChunk(
const MoveChunkRequest& args) {
- if (_migrationDestManager.isActive()) {
- return {ErrorCodes::ConflictingOperationInProgress,
- str::stream() << "Unable to start new migration because this shard is currently "
- "receiving a chunk"};
- }
-
return _activeMigrationsRegistry.registerDonateChunk(args);
}
+// Delegates registration of an incoming migration for 'nss' to the active migrations registry
+// and returns its result unchanged.
+StatusWith<ScopedRegisterReceiveChunk> ShardingState::registerReceiveChunk(
+    const NamespaceString& nss) {
+    auto& registry = _activeMigrationsRegistry;
+    return registry.registerReceiveChunk(nss);
+}
+
boost::optional<NamespaceString> ShardingState::getActiveDonateChunkNss() {
return _activeMigrationsRegistry.getActiveDonateChunkNss();
}
diff --git a/src/mongo/db/s/sharding_state.h b/src/mongo/db/s/sharding_state.h
index 8e8f0c20074..9cf94cd4526 100644
--- a/src/mongo/db/s/sharding_state.h
+++ b/src/mongo/db/s/sharding_state.h
@@ -214,6 +214,15 @@ public:
StatusWith<ScopedRegisterDonateChunk> registerDonateChunk(const MoveChunkRequest& args);
/**
+ * If there are no migrations running on this shard, registers an active receive operation for
+ * the specified namespace and returns a ScopedRegisterReceiveChunk, which will unregister it
+ * when it goes out of scope.
+ *
+ * Otherwise returns a ConflictingOperationInProgress error.
+ */
+ StatusWith<ScopedRegisterReceiveChunk> registerReceiveChunk(const NamespaceString& nss);
+
+ /**
* If a migration has been previously registered through a call to registerDonateChunk returns
* that namespace. Otherwise returns boost::none.
*