author     Dianna Hohensee <dianna.hohensee@10gen.com>    2016-08-17 19:08:15 -0400
committer  Dianna Hohensee <dianna.hohensee@10gen.com>    2016-09-14 09:43:08 -0400
commit     3596a4fb09b31c955d95fbe17db0492589c96f03 (patch)
tree       b95df2efd363ba54f2fb9771a1f0b72a9fc5caab
parent     6471618952c8727bc5b06039ed2cf861e1a36436 (diff)
download   mongo-3596a4fb09b31c955d95fbe17db0492589c96f03.tar.gz
SERVER-24866 Balancer recovery
-rw-r--r--  src/mongo/base/error_codes.err   1
-rw-r--r--  src/mongo/client/remote_command_retry_scheduler.cpp   3
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_impl.cpp   4
-rw-r--r--  src/mongo/s/balancer/balancer.cpp   54
-rw-r--r--  src/mongo/s/balancer/balancer_policy.h   7
-rw-r--r--  src/mongo/s/balancer/migration_manager.cpp   321
-rw-r--r--  src/mongo/s/balancer/migration_manager.h   72
-rw-r--r--  src/mongo/s/balancer/migration_manager_test.cpp   226
-rw-r--r--  src/mongo/s/balancer/scoped_migration_request.cpp   39
-rw-r--r--  src/mongo/s/balancer/scoped_migration_request.h   5
-rw-r--r--  src/mongo/s/balancer/scoped_migration_request_test.cpp   53
-rw-r--r--  src/mongo/s/balancer/type_migration.cpp   34
-rw-r--r--  src/mongo/s/balancer/type_migration.h   11
-rw-r--r--  src/mongo/s/balancer/type_migration_test.cpp   66
-rw-r--r--  src/mongo/s/move_chunk_request.cpp   8
15 files changed, 666 insertions(+), 238 deletions(-)
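
The change set below reworks the balancer lifecycle around a recovery phase: Balancer::startThread() now puts the MigrationManager into a kRecovering state, the balancer main thread calls finishRecovery() once it holds the balancer distributed lock, and only then are migration requests allowed through. As orientation before the per-file diffs, here is a minimal standalone sketch of that gating idea; RecoveryGate and submit() are invented names for illustration, not types from the patch, and only the state machine and the _waitForRecovery()-style blocking are modeled.

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>

// Simplified model of the MigrationManager lifecycle introduced by this commit.
// Allowed transitions: kStopped -> kRecovering -> kEnabled -> kStopping -> kStopped.
enum class State { kStopped, kRecovering, kEnabled, kStopping };

class RecoveryGate {
public:
    void startRecovery() {  // analogue of Balancer::startThread()
        std::lock_guard<std::mutex> lk(_mutex);
        _state = State::kRecovering;
    }

    void finishRecovery() {  // analogue of the balancer main thread, after the dist lock
        std::lock_guard<std::mutex> lk(_mutex);
        _state = State::kEnabled;
        _condVar.notify_all();  // unblock migration requests that were waiting on recovery
    }

    bool submit(const std::string& what) {  // analogue of executeManualMigration()
        std::unique_lock<std::mutex> lk(_mutex);
        _condVar.wait(lk, [this] { return _state != State::kRecovering; });
        if (_state != State::kEnabled) {
            std::cout << what << ": rejected, balancer not running\n";
            return false;
        }
        std::cout << what << ": scheduled\n";
        return true;
    }

private:
    std::mutex _mutex;
    std::condition_variable _condVar;
    State _state{State::kStopped};
};

int main() {
    RecoveryGate gate;
    gate.startRecovery();
    std::thread requester([&] { gate.submit("manual moveChunk"); });  // blocks until recovery ends
    gate.finishRecovery();
    requester.join();
}
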
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err
index 8f442c018a4..30be05d7444 100644
--- a/src/mongo/base/error_codes.err
+++ b/src/mongo/base/error_codes.err
@@ -192,6 +192,7 @@ error_code("MasterSlaveConnectionFailure", 190)
error_code("BalancerLostDistributedLock", 191)
error_code("FailPointEnabled", 192)
error_code("NoShardingEnabled", 193)
+error_code("BalancerInterrupted", 194)
# Non-sequential error codes (for compatibility only)
error_code("SocketException", 9001)
diff --git a/src/mongo/client/remote_command_retry_scheduler.cpp b/src/mongo/client/remote_command_retry_scheduler.cpp
index f72874a33a4..5bbd533ead3 100644
--- a/src/mongo/client/remote_command_retry_scheduler.cpp
+++ b/src/mongo/client/remote_command_retry_scheduler.cpp
@@ -96,7 +96,8 @@ const std::initializer_list<ErrorCodes::Error> RemoteCommandRetryScheduler::kAll
ErrorCodes::HostNotFound,
ErrorCodes::NetworkTimeout,
ErrorCodes::PrimarySteppedDown,
- ErrorCodes::InterruptedDueToReplStateChange};
+ ErrorCodes::InterruptedDueToReplStateChange,
+ ErrorCodes::BalancerInterrupted};
std::unique_ptr<RemoteCommandRetryScheduler::RetryPolicy>
RemoteCommandRetryScheduler::makeNoRetryPolicy() {
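
The new BalancerInterrupted code is added to the retry scheduler's list of retriable errors, so a moveChunk interrupted by the balancer stopping (for example at config-server stepdown) is retried rather than surfaced as a hard failure. A small standalone sketch of that retry decision, assuming hypothetical isRetriable()/runWithRetry() helpers rather than the real RemoteCommandRetryScheduler:

#include <functional>
#include <initializer_list>
#include <iostream>

// Hypothetical error codes; only the retry decision is modeled here.
enum class Error { OK, HostUnreachable, BalancerInterrupted, ChunkTooBig };

bool isRetriable(Error e) {
    // Mirrors the idea of kAllRetriableErrors: transient conditions, now also
    // including an interruption caused by the balancer stopping.
    for (Error r : {Error::HostUnreachable, Error::BalancerInterrupted}) {
        if (e == r)
            return true;
    }
    return false;
}

Error runWithRetry(const std::function<Error()>& op, int maxAttempts) {
    Error last = Error::OK;
    for (int attempt = 0; attempt < maxAttempts; ++attempt) {
        last = op();
        if (last == Error::OK || !isRetriable(last))
            return last;
        std::cout << "retrying after transient error\n";
    }
    return last;
}

int main() {
    int calls = 0;
    // First attempt fails as if the balancer stepped down mid-migration; the retry succeeds.
    Error result = runWithRetry(
        [&] { return ++calls == 1 ? Error::BalancerInterrupted : Error::OK; }, 3);
    std::cout << (result == Error::OK ? "ok\n" : "failed\n");

    // A non-retriable error (e.g. the chunk being too big) is returned immediately.
    Error fatal = runWithRetry([] { return Error::ChunkTooBig; }, 3);
    std::cout << (fatal == Error::ChunkTooBig ? "not retried\n" : "?\n");
}
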
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index dbf9c305f0a..15484b6729f 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -694,10 +694,6 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook
<< causedBy(shardAwareInitializationStatus);
}
- // Free any leftover locks from previous instantiations
- auto distLockManager = Grid::get(txn)->catalogClient(txn)->getDistLockManager();
- distLockManager->unlockAll(txn, distLockManager->getProcessID());
-
// If this is a config server node becoming a primary, start the balancer
Balancer::get(txn)->startThread(txn);
} else if (ShardingState::get(txn)->enabled()) {
diff --git a/src/mongo/s/balancer/balancer.cpp b/src/mongo/s/balancer/balancer.cpp
index 3c04c3145ca..56d376e35b2 100644
--- a/src/mongo/s/balancer/balancer.cpp
+++ b/src/mongo/s/balancer/balancer.cpp
@@ -48,6 +48,7 @@
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/client/shard.h"
#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/cluster_identity_loader.h"
#include "mongo/s/grid.h"
#include "mongo/s/shard_util.h"
#include "mongo/s/sharding_raii.h"
@@ -181,9 +182,7 @@ Status Balancer::startThread(OperationContext* txn) {
invariant(!_thread.joinable());
_state = kRunning;
- // Allow new migrations to be scheduled
- _migrationManager.enableMigrations();
-
+ _migrationManager.startRecovery();
_thread = stdx::thread([this] { _mainThread(); });
// Intentional fall through
case kRunning:
@@ -196,9 +195,6 @@ Status Balancer::startThread(OperationContext* txn) {
void Balancer::stopThread() {
stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
if (_state == kRunning) {
- // Stop any active migrations and prevent any new migrations from getting scheduled
- _migrationManager.interruptAndDisableMigrations();
-
// Request the balancer thread to stop
_state = kStopping;
_condVar.notify_all();
@@ -218,9 +214,6 @@ void Balancer::joinThread() {
if (_thread.joinable()) {
_thread.join();
- // Wait for any scheduled migrations to finish draining
- _migrationManager.drainActiveMigrations();
-
stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
_state = kStopped;
_thread = {};
@@ -291,36 +284,47 @@ void Balancer::report(OperationContext* txn, BSONObjBuilder* builder) {
void Balancer::_mainThread() {
Client::initThread("Balancer");
+ auto txn = cc().makeOperationContext();
+ auto shardingContext = Grid::get(txn.get());
log() << "CSRS balancer is starting";
const Seconds kInitBackoffInterval(10);
+ OID clusterIdentity = ClusterIdentityLoader::get(txn.get())->getClusterId();
+
// Take the balancer distributed lock and hold it permanently
while (!_stopRequested()) {
- auto txn = cc().makeOperationContext();
- auto shardingContext = Grid::get(txn.get());
-
auto distLockHandleStatus =
shardingContext->catalogClient(txn.get())->getDistLockManager()->lockWithSessionID(
- txn.get(), "balancer", "CSRS Balancer", OID::gen());
- if (distLockHandleStatus.isOK()) {
- break;
- }
+ txn.get(), "balancer", "CSRS Balancer", clusterIdentity);
+ if (!distLockHandleStatus.isOK()) {
+ warning() << "Balancer distributed lock could not be acquired and will be retried in "
+ << durationCount<Seconds>(kInitBackoffInterval) << " seconds"
+ << causedBy(distLockHandleStatus.getStatus());
- warning() << "Balancer distributed lock could not be acquired and will be retried in "
- << durationCount<Seconds>(kInitBackoffInterval) << " seconds"
- << causedBy(distLockHandleStatus.getStatus());
+ _sleepFor(txn.get(), kInitBackoffInterval);
+ continue;
+ }
- _sleepFor(txn.get(), kInitBackoffInterval);
+ break;
}
- log() << "CSRS balancer thread is now running";
+ if (!_stopRequested()) {
+ log() << "CSRS balancer thread for cluster " << clusterIdentity << " is recovering";
+
+ auto balancerConfig = Grid::get(txn.get())->getBalancerConfiguration();
+ _migrationManager.finishRecovery(txn.get(),
+ clusterIdentity,
+ balancerConfig->getMaxChunkSizeBytes(),
+ balancerConfig->getSecondaryThrottle(),
+ balancerConfig->waitForDelete());
+
+ log() << "CSRS balancer thread for cluster " << clusterIdentity << " is recovered";
+ }
// Main balancer loop
while (!_stopRequested()) {
- auto txn = cc().makeOperationContext();
- auto shardingContext = Grid::get(txn.get());
auto balancerConfig = shardingContext->getBalancerConfiguration();
BalanceRoundDetails roundDetails;
@@ -400,6 +404,10 @@ void Balancer::_mainThread() {
}
}
+ // Stop any active migrations and prevent any new migrations from getting scheduled
+ _migrationManager.interruptAndDisableMigrations();
+ _migrationManager.drainActiveMigrations();
+
log() << "CSRS balancer is now stopped";
}
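
Note that the balancer lock, and (further down in this patch) the per-collection migration locks, are now taken with the cluster identity as the lock session ID instead of a freshly generated OID. The header comment later in the patch describes this as a constant session ID for all distributed locks the MigrationManager holds; presumably the point is that a lock taken under that identity before a stepdown can be recognised as the balancer's own afterwards. A toy sketch of why a stable session ID behaves differently from a per-attempt one (ToyLockTable is invented for illustration and is not the real DistLockManager):

#include <iostream>
#include <map>
#include <string>

// Toy distributed-lock table: lock name -> session id that holds it.
class ToyLockTable {
public:
    // Succeeds if the lock is free or already held by the same session id, which is
    // how a stable session id lets a lock taken before a stepdown be picked back up.
    bool lockWithSessionID(const std::string& name, const std::string& sessionId) {
        auto it = _locks.find(name);
        if (it != _locks.end() && it->second != sessionId)
            return false;
        _locks[name] = sessionId;
        return true;
    }

private:
    std::map<std::string, std::string> _locks;
};

int main() {
    ToyLockTable locks;
    const std::string clusterIdentity = "constant-cluster-id";  // stands in for the cluster OID

    // The previous primary took the collection lock for an in-flight migration...
    locks.lockWithSessionID("test.coll", clusterIdentity);

    // ...and a caller using the same cluster identity still matches it, whereas a
    // random per-attempt id (the old OID::gen() behaviour) would be locked out.
    std::cout << locks.lockWithSessionID("test.coll", clusterIdentity) << "\n";  // 1
    std::cout << locks.lockWithSessionID("test.coll", "random-oid") << "\n";     // 0
}
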
diff --git a/src/mongo/s/balancer/balancer_policy.h b/src/mongo/s/balancer/balancer_policy.h
index 965e6519be5..97a1f2e91b1 100644
--- a/src/mongo/s/balancer/balancer_policy.h
+++ b/src/mongo/s/balancer/balancer_policy.h
@@ -61,6 +61,13 @@ struct MigrateInfo {
minKey(a_chunk.getMin()),
maxKey(a_chunk.getMax()) {}
+ MigrateInfo(const std::string& a_ns,
+ const ShardId& a_to,
+ const ShardId& a_from,
+ const BSONObj& a_minKey,
+ const BSONObj& a_maxKey)
+ : ns(a_ns), to(a_to), from(a_from), minKey(a_minKey), maxKey(a_maxKey) {}
+
std::string getName() const;
std::string toString() const;
diff --git a/src/mongo/s/balancer/migration_manager.cpp b/src/mongo/s/balancer/migration_manager.cpp
index ba4a488ffcb..aa5beb38729 100644
--- a/src/mongo/s/balancer/migration_manager.cpp
+++ b/src/mongo/s/balancer/migration_manager.cpp
@@ -40,12 +40,15 @@
#include "mongo/db/client.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/s/balancer/scoped_migration_request.h"
+#include "mongo/s/balancer/type_migration.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/s/move_chunk_request.h"
#include "mongo/s/sharding_raii.h"
+#include "mongo/util/log.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/scopeguard.h"
@@ -60,21 +63,24 @@ using str::stream;
namespace {
const char kChunkTooBig[] = "chunkTooBig";
+const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority,
+ WriteConcernOptions::SyncMode::UNSET,
+ Seconds(15));
/**
* Parses the specified asynchronous command response and converts it to status to use as outcome of
* an asynchronous migration command. It is necessary for two reasons:
* - Preserve backwards compatibility with 3.2 and earlier, where the move chunk command instead of
* returning a ChunkTooBig status includes an extra field in the response.
- * - Convert CallbackCanceled errors into InterruptedDueToReplStateChange for the cases where the
- * migration manager is being stopped at replica set stepdown. This return code allows the mongos
- * calling logic to retry the operation on a new primary.
+ * - Convert CallbackCanceled errors into BalancerInterrupted for the cases where the migration
+ * manager is being stopped at replica set stepdown. This return code allows the mongos calling
+ * logic to retry the operation on a new primary.
*/
Status extractMigrationStatusFromRemoteCommandResponse(const RemoteCommandResponse& response,
bool isStopping) {
if (!response.isOK()) {
if (response.status == ErrorCodes::CallbackCanceled && isStopping) {
- return {ErrorCodes::InterruptedDueToReplStateChange,
+ return {ErrorCodes::BalancerInterrupted,
"Migration interrupted because the balancer is stopping"};
}
@@ -97,12 +103,14 @@ Status extractMigrationStatusFromRemoteCommandResponse(const RemoteCommandRespon
/**
* Blocking call to acquire the distributed collection lock for the specified namespace.
*/
-StatusWith<DistLockHandle> acquireDistLock(OperationContext* txn, const NamespaceString& nss) {
+StatusWith<DistLockHandle> acquireDistLock(OperationContext* txn,
+ const OID& clusterIdentity,
+ const NamespaceString& nss) {
const std::string whyMessage(stream() << "Migrating chunk(s) in collection " << nss.ns());
auto statusWithDistLockHandle =
Grid::get(txn)->catalogClient(txn)->getDistLockManager()->lockWithSessionID(
- txn, nss.ns(), whyMessage, OID::gen(), DistLockManager::kSingleLockAttemptTimeout);
+ txn, nss.ns(), whyMessage, clusterIdentity, DistLockManager::kSingleLockAttemptTimeout);
if (!statusWithDistLockHandle.isOK()) {
// If we get LockBusy while trying to acquire the collection distributed lock, this implies
@@ -141,40 +149,71 @@ MigrationStatuses MigrationManager::executeMigrationsForAutoBalance(
const MigrationSecondaryThrottleOptions& secondaryThrottle,
bool waitForDelete) {
- vector<std::pair<shared_ptr<Notification<Status>>, MigrateInfo>> responses;
-
- for (const auto& migrateInfo : migrateInfos) {
- responses.emplace_back(_schedule(txn,
- migrateInfo,
- false, // Config server takes the collection dist lock
- maxChunkSizeBytes,
- secondaryThrottle,
- waitForDelete),
- migrateInfo);
- }
-
MigrationStatuses migrationStatuses;
vector<MigrateInfo> rescheduledMigrations;
- // Wait for all the scheduled migrations to complete and note the ones, which failed with a
- // LockBusy error code. These need to be executed serially, without the distributed lock being
- // held by the config server for backwards compatibility with 3.2 shards.
- for (auto& response : responses) {
- auto notification = std::move(response.first);
- auto migrateInfo = std::move(response.second);
-
- Status responseStatus = notification->get();
+ {
+ std::map<MigrationIdentifier, ScopedMigrationRequest> scopedMigrationRequests;
+ vector<std::pair<shared_ptr<Notification<Status>>, MigrateInfo>> responses;
+
+ for (const auto& migrateInfo : migrateInfos) {
+ // Write a document to the config.migrations collection, in case this migration must be
+ // recovered by the Balancer. Fail if the chunk is already moving.
+ auto statusWithScopedMigrationRequest =
+ ScopedMigrationRequest::writeMigration(txn, migrateInfo);
+ if (!statusWithScopedMigrationRequest.isOK()) {
+ migrationStatuses.emplace(migrateInfo.getName(),
+ std::move(statusWithScopedMigrationRequest.getStatus()));
+ continue;
+ }
+ scopedMigrationRequests.emplace(migrateInfo.getName(),
+ std::move(statusWithScopedMigrationRequest.getValue()));
+
+ responses.emplace_back(_schedule(txn,
+ migrateInfo,
+ false, // Config server takes the collection dist lock
+ maxChunkSizeBytes,
+ secondaryThrottle,
+ waitForDelete),
+ migrateInfo);
+ }
- if (responseStatus == ErrorCodes::LockBusy) {
- rescheduledMigrations.emplace_back(std::move(migrateInfo));
- } else {
- migrationStatuses.emplace(migrateInfo.getName(), std::move(responseStatus));
+ // Wait for all the scheduled migrations to complete and note the ones, which failed with a
+ // LockBusy error code. These need to be executed serially, without the distributed lock
+ // being held by the config server for backwards compatibility with 3.2 shards.
+ for (auto& response : responses) {
+ auto notification = std::move(response.first);
+ auto migrateInfo = std::move(response.second);
+
+ Status responseStatus = notification->get();
+
+ if (responseStatus == ErrorCodes::LockBusy) {
+ rescheduledMigrations.emplace_back(std::move(migrateInfo));
+ } else {
+ if (responseStatus == ErrorCodes::BalancerInterrupted) {
+ auto it = scopedMigrationRequests.find(migrateInfo.getName());
+ invariant(it != scopedMigrationRequests.end());
+ it->second.keepDocumentOnDestruct();
+ }
+
+ migrationStatuses.emplace(migrateInfo.getName(), std::move(responseStatus));
+ }
}
}
// Schedule all 3.2 compatibility migrations sequentially
for (const auto& migrateInfo : rescheduledMigrations) {
+ // Write a document to the config.migrations collection, in case this migration must be
+ // recovered by the Balancer. Fail if the chunk is already moving.
+ auto statusWithScopedMigrationRequest =
+ ScopedMigrationRequest::writeMigration(txn, migrateInfo);
+ if (!statusWithScopedMigrationRequest.isOK()) {
+ migrationStatuses.emplace(migrateInfo.getName(),
+ std::move(statusWithScopedMigrationRequest.getStatus()));
+ continue;
+ }
+
Status responseStatus = _schedule(txn,
migrateInfo,
true, // Shard takes the collection dist lock
@@ -183,6 +222,10 @@ MigrationStatuses MigrationManager::executeMigrationsForAutoBalance(
waitForDelete)
->get();
+ if (responseStatus == ErrorCodes::BalancerInterrupted) {
+ statusWithScopedMigrationRequest.getValue().keepDocumentOnDestruct();
+ }
+
migrationStatuses.emplace(migrateInfo.getName(), std::move(responseStatus));
}
@@ -197,6 +240,16 @@ Status MigrationManager::executeManualMigration(
uint64_t maxChunkSizeBytes,
const MigrationSecondaryThrottleOptions& secondaryThrottle,
bool waitForDelete) {
+ _waitForRecovery();
+
+ // Write a document to the config.migrations collection, in case this migration must be
+ // recovered by the Balancer. Fail if the chunk is already moving.
+ auto statusWithScopedMigrationRequest =
+ ScopedMigrationRequest::writeMigration(txn, migrateInfo);
+ if (!statusWithScopedMigrationRequest.isOK()) {
+ return statusWithScopedMigrationRequest.getStatus();
+ }
+
Status status = _schedule(txn,
migrateInfo,
false, // Config server takes the collection dist lock
@@ -223,13 +276,150 @@ Status MigrationManager::executeManualMigration(
return Status::OK();
}
+ if (status == ErrorCodes::BalancerInterrupted) {
+ statusWithScopedMigrationRequest.getValue().keepDocumentOnDestruct();
+ }
+
return status;
}
-void MigrationManager::enableMigrations() {
+void MigrationManager::startRecovery() {
stdx::lock_guard<stdx::mutex> lock(_mutex);
- invariant(_state == kStopped);
- _state = kEnabled;
+ invariant(_state == State::kStopped);
+ _state = State::kRecovering;
+}
+
+void MigrationManager::finishRecovery(OperationContext* txn,
+ const OID& clusterIdentity,
+ uint64_t maxChunkSizeBytes,
+ const MigrationSecondaryThrottleOptions& secondaryThrottle,
+ bool waitForDelete) {
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ invariant(_state == State::kRecovering);
+ if (!_clusterIdentity.isSet()) {
+ _clusterIdentity = clusterIdentity;
+ }
+ invariant(_clusterIdentity == clusterIdentity);
+ }
+
+ // Load the active migrations from the config.migrations collection.
+ vector<MigrateInfo> migrateInfos;
+
+ auto statusWithMigrationsQueryResponse =
+ Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+ txn,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kMajorityReadConcern,
+ NamespaceString(MigrationType::ConfigNS),
+ BSONObj(),
+ BSONObj(),
+ boost::none);
+
+ if (!statusWithMigrationsQueryResponse.isOK()) {
+ warning() << "Unable to read config.migrations collection documents for balancer migration"
+ << " recovery. Abandoning recovery."
+ << causedBy(redact(statusWithMigrationsQueryResponse.getStatus()));
+
+ _abandonActiveMigrationsAndEnableManager(txn);
+ return;
+ }
+
+ for (const BSONObj& migration : statusWithMigrationsQueryResponse.getValue().docs) {
+ auto statusWithMigrationType = MigrationType::fromBSON(migration);
+ if (!statusWithMigrationType.isOK()) {
+ // The format of this migration document is incorrect. The balancer holds a distlock for
+ // this migration, but without parsing the migration document we cannot identify which
+ // distlock must be released. So we must release all distlocks.
+ warning() << "Unable to parse config.migrations collection documents for balancer"
+ << " migration recovery. Abandoning recovery."
+ << causedBy(redact(statusWithMigrationType.getStatus()));
+
+ _abandonActiveMigrationsAndEnableManager(txn);
+ return;
+ }
+ MigrateInfo migrateInfo = statusWithMigrationType.getValue().toMigrateInfo();
+
+ auto it = _migrationRecoveryMap.find(NamespaceString(migrateInfo.ns));
+ if (it == _migrationRecoveryMap.end()) {
+ std::list<MigrateInfo> list;
+ it = _migrationRecoveryMap.insert(std::make_pair(NamespaceString(migrateInfo.ns), list))
+ .first;
+ }
+
+ it->second.push_back(std::move(migrateInfo));
+ }
+
+ // Schedule recovered migrations.
+ vector<ScopedMigrationRequest> scopedMigrationRequests;
+ vector<shared_ptr<Notification<Status>>> responses;
+
+ for (auto& nssAndMigrateInfos : _migrationRecoveryMap) {
+ auto scopedCMStatus = ScopedChunkManager::getExisting(txn, nssAndMigrateInfos.first);
+ if (!scopedCMStatus.isOK()) {
+ // This shouldn't happen because the collection was intact and sharded when the previous
+ // config primary was active and the dist locks have been held by the balancer
+ // throughout. Abort migration recovery.
+ warning() << "Unable to reload chunk metadata for collection '"
+ << nssAndMigrateInfos.first << "' during balancer"
+ << " recovery. Abandoning recovery."
+ << causedBy(redact(scopedCMStatus.getStatus()));
+
+ _abandonActiveMigrationsAndEnableManager(txn);
+ return;
+ }
+
+ auto scopedCM = std::move(scopedCMStatus.getValue());
+ ChunkManager* const cm = scopedCM.cm();
+ cm->reload(txn);
+
+ auto itMigrateInfo = nssAndMigrateInfos.second.begin();
+ invariant(itMigrateInfo != nssAndMigrateInfos.second.end());
+ while (itMigrateInfo != nssAndMigrateInfos.second.end()) {
+ auto chunk = cm->findIntersectingChunkWithSimpleCollation(txn, itMigrateInfo->minKey);
+ invariant(chunk);
+
+ if (chunk->getShardId() != itMigrateInfo->from) {
+ // Chunk is no longer on the source shard of the migration. Erase the migration doc
+ // and drop the distlock if it is only held for this migration.
+ ScopedMigrationRequest::createForRecovery(
+ txn, NamespaceString(itMigrateInfo->ns), itMigrateInfo->minKey);
+ if (nssAndMigrateInfos.second.size() == 1) {
+ Grid::get(txn)->catalogClient(txn)->getDistLockManager()->unlock(
+ txn, _clusterIdentity, itMigrateInfo->ns);
+ }
+ itMigrateInfo = nssAndMigrateInfos.second.erase(itMigrateInfo);
+ } else {
+ scopedMigrationRequests.emplace_back(ScopedMigrationRequest::createForRecovery(
+ txn, NamespaceString(itMigrateInfo->ns), itMigrateInfo->minKey));
+
+ responses.emplace_back(
+ _schedule(txn,
+ *itMigrateInfo,
+ false, // Config server takes the collection dist lock
+ maxChunkSizeBytes,
+ secondaryThrottle,
+ waitForDelete));
+
+ ++itMigrateInfo;
+ }
+ }
+ }
+
+ // The MigrationManager has now reacquired the state that it was in prior to any stepdown.
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ invariant(_state == State::kRecovering);
+
+ _migrationRecoveryMap.clear();
+ _state = State::kEnabled;
+ _condVar.notify_all();
+ }
+
+ // Wait for each migration to finish, as usual.
+ for (auto& response : responses) {
+ response->get();
+ }
}
void MigrationManager::interruptAndDisableMigrations() {
@@ -237,11 +427,11 @@ void MigrationManager::interruptAndDisableMigrations() {
Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor();
stdx::lock_guard<stdx::mutex> lock(_mutex);
- if (_state != kEnabled) {
+ if (_state != State::kEnabled && _state != State::kRecovering) {
return;
}
- _state = kStopping;
+ _state = State::kStopping;
// Interrupt any active migrations with dist lock
for (auto& cmsEntry : _activeMigrationsWithDistLock) {
@@ -267,15 +457,15 @@ void MigrationManager::interruptAndDisableMigrations() {
void MigrationManager::drainActiveMigrations() {
stdx::unique_lock<stdx::mutex> lock(_mutex);
- if (_state == kStopped)
+ if (_state == State::kStopped)
return;
- invariant(_state == kStopping);
+ invariant(_state == State::kStopping);
- _stoppedCondVar.wait(lock, [this] {
+ _condVar.wait(lock, [this] {
return _activeMigrationsWithDistLock.empty() && _activeMigrationsWithoutDistLock.empty();
});
- _state = kStopped;
+ _state = State::kStopped;
}
shared_ptr<Notification<Status>> MigrationManager::_schedule(
@@ -290,9 +480,9 @@ shared_ptr<Notification<Status>> MigrationManager::_schedule(
// Ensure we are not stopped in order to avoid doing the extra work
{
stdx::lock_guard<stdx::mutex> lock(_mutex);
- if (_state != kEnabled) {
+ if (_state != State::kEnabled && _state != State::kRecovering) {
return std::make_shared<Notification<Status>>(
- Status(ErrorCodes::InterruptedDueToReplStateChange,
+ Status(ErrorCodes::BalancerInterrupted,
"Migration cannot be executed because the balancer is not running"));
}
}
@@ -349,9 +539,9 @@ shared_ptr<Notification<Status>> MigrationManager::_schedule(
stdx::lock_guard<stdx::mutex> lock(_mutex);
- if (_state != kEnabled) {
+ if (_state != State::kEnabled && _state != State::kRecovering) {
return std::make_shared<Notification<Status>>(
- Status(ErrorCodes::InterruptedDueToReplStateChange,
+ Status(ErrorCodes::BalancerInterrupted,
"Migration cannot be executed because the balancer is not running"));
}
@@ -378,7 +568,7 @@ void MigrationManager::_scheduleWithDistLock_inlock(OperationContext* txn,
auto it = _activeMigrationsWithDistLock.find(nss);
if (it == _activeMigrationsWithDistLock.end()) {
// Acquire the collection distributed lock (blocking call)
- auto distLockHandleStatus = acquireDistLock(txn, nss);
+ auto distLockHandleStatus = acquireDistLock(txn, _clusterIdentity, nss);
if (!distLockHandleStatus.isOK()) {
migration.completionNotification->set(distLockHandleStatus.getStatus());
return;
@@ -409,10 +599,11 @@ void MigrationManager::_scheduleWithDistLock_inlock(OperationContext* txn,
auto txn = cc().makeOperationContext();
stdx::lock_guard<stdx::mutex> lock(_mutex);
- _completeWithDistLock_inlock(txn.get(),
- itMigration,
- extractMigrationStatusFromRemoteCommandResponse(
- args.response, _state != kEnabled));
+ _completeWithDistLock_inlock(
+ txn.get(),
+ itMigration,
+ extractMigrationStatusFromRemoteCommandResponse(
+ args.response, _state != State::kEnabled && _state != State::kRecovering));
});
if (callbackHandleWithStatus.isOK()) {
@@ -441,7 +632,7 @@ void MigrationManager::_completeWithDistLock_inlock(OperationContext* txn,
if (collectionMigrationState->migrations.empty()) {
Grid::get(txn)->catalogClient(txn)->getDistLockManager()->unlock(
- txn, collectionMigrationState->distLockHandle);
+ txn, collectionMigrationState->distLockHandle, nss.ns());
_activeMigrationsWithDistLock.erase(it);
_checkDrained_inlock();
}
@@ -472,7 +663,7 @@ void MigrationManager::_scheduleWithoutDistLock_inlock(OperationContext* txn,
_checkDrained_inlock();
notificationToSignal->set(extractMigrationStatusFromRemoteCommandResponse(
- args.response, _state != kEnabled));
+ args.response, _state != State::kEnabled && _state != State::kRecovering));
});
if (callbackHandleWithStatus.isOK()) {
@@ -489,16 +680,40 @@ void MigrationManager::_scheduleWithoutDistLock_inlock(OperationContext* txn,
}
void MigrationManager::_checkDrained_inlock() {
- if (_state == kEnabled) {
+ if (_state == State::kEnabled || _state == State::kRecovering) {
return;
}
- invariant(_state == kStopping);
+ invariant(_state == State::kStopping);
if (_activeMigrationsWithDistLock.empty() && _activeMigrationsWithoutDistLock.empty()) {
- _stoppedCondVar.notify_all();
+ _condVar.notify_all();
}
}
+void MigrationManager::_waitForRecovery() {
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+ _condVar.wait(lock, [this] { return _state != State::kRecovering; });
+}
+
+void MigrationManager::_abandonActiveMigrationsAndEnableManager(OperationContext* txn) {
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+ invariant(_state == State::kRecovering);
+
+ auto catalogClient = Grid::get(txn)->catalogClient(txn);
+
+ // Unlock all balancer distlocks we aren't using anymore.
+ auto distLockManager = catalogClient->getDistLockManager();
+ distLockManager->unlockAll(txn, distLockManager->getProcessID());
+
+ // Clear the config.migrations collection so that those chunks can be scheduled for migration
+ // again.
+ catalogClient->removeConfigDocuments(
+ txn, MigrationType::ConfigNS, BSONObj(), kMajorityWriteConcern);
+
+ _state = State::kEnabled;
+ _condVar.notify_all();
+}
+
MigrationManager::Migration::Migration(NamespaceString inNss, BSONObj inMoveChunkCmdObj)
: nss(std::move(inNss)),
moveChunkCmdObj(std::move(inMoveChunkCmdObj)),
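
The core of finishRecovery() above is a filtering pass: reload config.migrations, group the documents by namespace, and for each one check whether the chunk still lives on the recorded source shard; if it does not, the migration document is erased (and the collection dist lock released when it was held only for that migration), otherwise the migration is rescheduled. A standalone sketch of just that filtering step over plain structs; RecoveredMigration and chunkOwner are illustrative stand-ins, not the real types:

#include <iostream>
#include <list>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-in for a config.migrations entry.
struct RecoveredMigration {
    std::string ns;
    std::string minKey;
    std::string fromShard;
    std::string toShard;
};

int main() {
    // What the previous primary left behind in config.migrations.
    std::vector<RecoveredMigration> docs = {
        {"test.coll", "{a: 0}", "shard0", "shard1"},
        {"test.coll", "{a: 50}", "shard2", "shard3"},
    };

    // Current chunk ownership according to the reloaded chunk metadata.
    std::map<std::string, std::string> chunkOwner = {
        {"{a: 0}", "shard0"},   // still on the source shard: reschedule
        {"{a: 50}", "shard3"},  // already moved: nothing left to recover
    };

    // Group by namespace, as finishRecovery does with _migrationRecoveryMap.
    std::map<std::string, std::list<RecoveredMigration>> byNamespace;
    for (auto& doc : docs)
        byNamespace[doc.ns].push_back(doc);

    for (auto& nsAndMigrations : byNamespace) {
        for (auto it = nsAndMigrations.second.begin(); it != nsAndMigrations.second.end();) {
            if (chunkOwner[it->minKey] != it->fromShard) {
                // Chunk no longer on the source shard: drop the migration document
                // (and the collection dist lock if this was the only migration for it).
                std::cout << "dropping stale migration at " << it->minKey << "\n";
                it = nsAndMigrations.second.erase(it);
            } else {
                std::cout << "rescheduling migration at " << it->minKey << " from "
                          << it->fromShard << " to " << it->toShard << "\n";
                ++it;
            }
        }
    }
}
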
diff --git a/src/mongo/s/balancer/migration_manager.h b/src/mongo/s/balancer/migration_manager.h
index 39fe4e7f0ee..456452b1efd 100644
--- a/src/mongo/s/balancer/migration_manager.h
+++ b/src/mongo/s/balancer/migration_manager.h
@@ -99,20 +99,33 @@ public:
bool waitForDelete);
/**
- * Non-blocking method, which puts the migration manager in a state where new migrations can be
- * scheduled (kEnabled). May only be called if the manager is in the kStopped state.
+ * Non-blocking method that puts the migration manager in the kRecovering state, in which
+ * new migration requests will block until finishRecovery is called.
*/
- void enableMigrations();
+ void startRecovery();
/**
- * Non-blocking method, which puts the manager in a state where all subsequently scheduled
- * migrations will immediately fail (without ever getting scheduled) and all active ones will be
- * cancelled. It has no effect if the migration manager is not enabled.
+ * Blocking method that must only be called after startRecovery has been called. Recovers the
+ * state of the migration manager (if necessary and able) and puts it in the kEnabled state,
+ * where it will accept new migrations. Any migrations waiting on the recovery state will be
+ * unblocked.
+ */
+ void finishRecovery(OperationContext* txn,
+ const OID& clusterIdentity,
+ uint64_t maxChunkSizeBytes,
+ const MigrationSecondaryThrottleOptions& secondaryThrottle,
+ bool waitForDelete);
+
+ /**
+ * Non-blocking method that should never be called concurrently with finishRecovery. Puts the
+ * manager in a state where all subsequently scheduled migrations will immediately fail (without
+ * ever getting scheduled) and all active ones will be cancelled. It has no effect if the
+ * migration manager is already stopping or stopped.
*/
void interruptAndDisableMigrations();
/**
- * Blocking method, which waits for any currently scheduled migrations to complete. Must be
+ * Blocking method that waits for any currently scheduled migrations to complete. Must be
* called after interruptAndDisableMigrations has been called in order to be able to re-enable
* migrations again.
*/
@@ -120,7 +133,12 @@ public:
private:
// The current state of the migration manager
- enum State { kEnabled, kStopping, kStopped };
+ enum class State { // Allowed transitions:
+ kRecovering, // kEnabled
+ kEnabled, // kStopping
+ kStopping, // kStopped
+ kStopped, // kRecovering
+ };
/**
* Tracks the execution state of a single migration.
@@ -217,22 +235,41 @@ private:
Migration migration);
/**
- * If the state of the migration manager is kStopping checks whether there are any outstanding
- * scheduled requests and if there aren't any signals the 'stopped' conditional variable.
+ * If the state of the migration manager is kStopping, checks whether there are any outstanding
+ * scheduled requests and if there aren't any signals the class condition variable.
*/
void _checkDrained_inlock();
- // The service context under which this migration manager runs
+ /**
+ * Blocking call, which waits for the migration manager to leave the recovering state (if it is
+ * currently recovering).
+ */
+ void _waitForRecovery();
+
+ /**
+ * Should only be called from within the finishRecovery function because the migration manager
+ * must be in the kRecovering state. Releases all the distributed locks that the balancer holds,
+ * clears the config.migrations collection, changes the state of the migration manager from
+ * kRecovering to kEnabled, and unblocks all processes waiting on the recovery state.
+ */
+ void _abandonActiveMigrationsAndEnableManager(OperationContext* txn);
+
+ // The service context under which this migration manager runs.
ServiceContext* const _serviceContext;
- // Protects the class state below
+ // Protects the class state below.
stdx::mutex _mutex;
- // Start the migration manager as stopped
- State _state{kStopped};
+ // Always start the migration manager in a stopped state.
+ State _state{State::kStopped};
+
+ // Condition variable, which is waited on when the migration manager's state is changing and
+ // signaled when the state change is complete.
+ stdx::condition_variable _condVar;
- // Condition variable, which is signaled when the migration manager has no more active requests
- stdx::condition_variable _stoppedCondVar;
+ // Identity of the cluster under which this migration manager runs. Used as a constant session
+ // ID for all distributed locks that the MigrationManager holds.
+ OID _clusterIdentity;
// Holds information about each collection's distributed lock and active migrations via a
// CollectionMigrationState object.
@@ -241,6 +278,9 @@ private:
// Holds information about migrations, which have been scheduled without the collection
// distributed lock acquired (i.e., the shard is asked to acquire it).
MigrationsList _activeMigrationsWithoutDistLock;
+
+ // Carries migration information over from startRecovery to finishRecovery.
+ stdx::unordered_map<NamespaceString, std::list<MigrateInfo>> _migrationRecoveryMap;
};
} // namespace mongo
diff --git a/src/mongo/s/balancer/migration_manager_test.cpp b/src/mongo/s/balancer/migration_manager_test.cpp
index 696f208a95d..2e90d29f2e9 100644
--- a/src/mongo/s/balancer/migration_manager_test.cpp
+++ b/src/mongo/s/balancer/migration_manager_test.cpp
@@ -33,10 +33,12 @@
#include "mongo/db/commands.h"
#include "mongo/db/write_concern_options.h"
#include "mongo/s/balancer/migration_manager.h"
+#include "mongo/s/balancer/type_migration.h"
#include "mongo/s/catalog/dist_lock_manager_mock.h"
#include "mongo/s/catalog/replset/sharding_catalog_client_impl.h"
#include "mongo/s/catalog/type_collection.h"
#include "mongo/s/catalog/type_database.h"
+#include "mongo/s/catalog/type_locks.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/config_server_test_fixture.h"
#include "mongo/s/move_chunk_request.h"
@@ -48,6 +50,7 @@ namespace {
using executor::RemoteCommandRequest;
using executor::RemoteCommandResponse;
using std::vector;
+using unittest::assertGet;
const auto kShardId0 = ShardId("shard0");
const auto kShardId1 = ShardId("shard1");
@@ -108,6 +111,21 @@ protected:
const ChunkVersion& version);
/**
+ * Inserts a document into the config.migrations collection as an active migration.
+ */
+ void setUpMigration(const std::string& collName,
+ const BSONObj& minKey,
+ const BSONObj& maxKey,
+ const ShardId& toShard,
+ const ShardId& fromShard);
+
+ /**
+ * Asserts that config.migrations is empty and config.locks contains no locked documents, both
+ * of which should be true if the MigrationManager is inactive and behaving properly.
+ */
+ void checkMigrationsCollectionIsEmptyAndLocksAreUnlocked();
+
+ /**
* Sets up mock network to expect a moveChunk command and return a fixed BSON response or a
* "returnStatus".
*/
@@ -137,6 +155,9 @@ protected:
const KeyPattern kKeyPattern = KeyPattern(BSON(kPattern << 1));
+ // Cluster identity to pass to the migration manager
+ const OID _clusterIdentity{OID::gen()};
+
std::unique_ptr<MigrationManager> _migrationManager;
private:
@@ -147,10 +168,13 @@ private:
void MigrationManagerTest::setUp() {
ConfigServerTestFixture::setUp();
_migrationManager = stdx::make_unique<MigrationManager>(getServiceContext());
- _migrationManager->enableMigrations();
+ _migrationManager->startRecovery();
+ _migrationManager->finishRecovery(
+ operationContext(), _clusterIdentity, 0, kDefaultSecondaryThrottle, false);
}
void MigrationManagerTest::tearDown() {
+ checkMigrationsCollectionIsEmptyAndLocksAreUnlocked();
_migrationManager->interruptAndDisableMigrations();
_migrationManager->drainActiveMigrations();
_migrationManager.reset();
@@ -199,6 +223,50 @@ ChunkType MigrationManagerTest::setUpChunk(const std::string& collName,
return chunk;
}
+void MigrationManagerTest::setUpMigration(const std::string& collName,
+ const BSONObj& minKey,
+ const BSONObj& maxKey,
+ const ShardId& toShard,
+ const ShardId& fromShard) {
+ BSONObjBuilder builder;
+ builder.append(MigrationType::ns(), collName);
+ builder.append(MigrationType::min(), minKey);
+ builder.append(MigrationType::max(), maxKey);
+ builder.append(MigrationType::toShard(), toShard.toString());
+ builder.append(MigrationType::fromShard(), fromShard.toString());
+ MigrationType migrationType = assertGet(MigrationType::fromBSON(builder.obj()));
+ ASSERT_OK(catalogClient()->insertConfigDocument(operationContext(),
+ MigrationType::ConfigNS,
+ migrationType.toBSON(),
+ kMajorityWriteConcern));
+}
+
+void MigrationManagerTest::checkMigrationsCollectionIsEmptyAndLocksAreUnlocked() {
+ auto statusWithMigrationsQueryResponse =
+ shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+ operationContext(),
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kMajorityReadConcern,
+ NamespaceString(MigrationType::ConfigNS),
+ BSONObj(),
+ BSONObj(),
+ boost::none);
+ Shard::QueryResponse migrationsQueryResponse =
+ uassertStatusOK(statusWithMigrationsQueryResponse);
+ ASSERT_EQUALS(0U, migrationsQueryResponse.docs.size());
+
+ auto statusWithLocksQueryResponse = shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+ operationContext(),
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kMajorityReadConcern,
+ NamespaceString(LocksType::ConfigNS),
+ BSON(LocksType::state(LocksType::LOCKED)),
+ BSONObj(),
+ boost::none);
+ Shard::QueryResponse locksQueryResponse = uassertStatusOK(statusWithLocksQueryResponse);
+ ASSERT_EQUALS(0U, locksQueryResponse.docs.size());
+}
+
void MigrationManagerTest::expectMoveChunkCommand(const ChunkType& chunk,
const ShardId& toShardId,
const bool& takeDistLock,
@@ -559,7 +627,7 @@ TEST_F(MigrationManagerTest, FailToAcquireDistributedLock) {
// Take the distributed lock for the collection before scheduling via the MigrationManager.
const std::string whyMessage("FailToAcquireDistributedLock unit-test taking distributed lock");
DistLockManager::ScopedDistLock distLockStatus =
- unittest::assertGet(catalogClient()->getDistLockManager()->lock(
+ assertGet(catalogClient()->getDistLockManager()->lock(
operationContext(), chunk1.getNS(), whyMessage));
MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance(
@@ -679,7 +747,7 @@ TEST_F(MigrationManagerTest, InterruptMigration) {
setUpCollection(collName, version);
// Set up a single chunk in the metadata.
- ChunkType chunk1 =
+ ChunkType chunk =
setUpChunk(collName, kKeyPattern.globalMin(), kKeyPattern.globalMax(), kShardId0, version);
auto future = launchAsync([&] {
@@ -691,7 +759,7 @@ TEST_F(MigrationManagerTest, InterruptMigration) {
shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
ASSERT_NOT_OK(_migrationManager->executeManualMigration(
- txn.get(), {chunk1.getNS(), kShardId1, chunk1}, 0, kDefaultSecondaryThrottle, false));
+ txn.get(), {chunk.getNS(), kShardId1, chunk}, 0, kDefaultSecondaryThrottle, false));
});
// Wait till the move chunk request gets sent and pretend that it is stuck by never responding
@@ -714,13 +782,34 @@ TEST_F(MigrationManagerTest, InterruptMigration) {
// Ensure that no new migrations can be scheduled
ASSERT_NOT_OK(_migrationManager->executeManualMigration(operationContext(),
- {chunk1.getNS(), kShardId1, chunk1},
+ {chunk.getNS(), kShardId1, chunk},
0,
kDefaultSecondaryThrottle,
false));
- // Ensure there are no active migrations left
+ // Ensure that the migration manager is no longer handling any migrations.
_migrationManager->drainActiveMigrations();
+
+ // Check that the migration that was active when the migration manager was interrupted can be
+ // found in config.migrations (and thus would be recovered if a migration manager were to start
+ // up again).
+ auto statusWithMigrationsQueryResponse =
+ shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+ operationContext(),
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kMajorityReadConcern,
+ NamespaceString(MigrationType::ConfigNS),
+ BSON(MigrationType::name(chunk.getName())),
+ BSONObj(),
+ boost::none);
+ Shard::QueryResponse migrationsQueryResponse =
+ uassertStatusOK(statusWithMigrationsQueryResponse);
+ ASSERT_EQUALS(1U, migrationsQueryResponse.docs.size());
+
+ ASSERT_OK(catalogClient()->removeConfigDocuments(operationContext(),
+ MigrationType::ConfigNS,
+ BSON(MigrationType::name(chunk.getName())),
+ kMajorityWriteConcern));
}
TEST_F(MigrationManagerTest, RestartMigrationManager) {
@@ -743,7 +832,9 @@ TEST_F(MigrationManagerTest, RestartMigrationManager) {
// Go through the lifecycle of the migration manager
_migrationManager->interruptAndDisableMigrations();
_migrationManager->drainActiveMigrations();
- _migrationManager->enableMigrations();
+ _migrationManager->startRecovery();
+ _migrationManager->finishRecovery(
+ operationContext(), _clusterIdentity, 0, kDefaultSecondaryThrottle, false);
auto future = launchAsync([&] {
Client::initThreadIfNotAlready("Test");
@@ -764,5 +855,126 @@ TEST_F(MigrationManagerTest, RestartMigrationManager) {
future.timed_get(kFutureTimeout);
}
+TEST_F(MigrationManagerTest, MigrationRecovery) {
+ // Set up two shards in the metadata.
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern));
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, kShard2, kMajorityWriteConcern));
+
+ // Set up the database and collection as sharded in the metadata.
+ std::string dbName = "foo";
+ std::string collName = "foo.bar";
+ ChunkVersion version(2, 0, OID::gen());
+
+ setUpDatabase(dbName, kShardId0);
+ setUpCollection(collName, version);
+
+ // Set up two chunks in the metadata.
+ ChunkType chunk1 =
+ setUpChunk(collName, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version);
+ version.incMinor();
+ ChunkType chunk2 =
+ setUpChunk(collName, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version);
+
+ _migrationManager->interruptAndDisableMigrations();
+ _migrationManager->drainActiveMigrations();
+ _migrationManager->startRecovery();
+
+ // Set up two fake active migrations by writing documents to the config.migrations collection.
+ setUpMigration(collName,
+ chunk1.getMin(),
+ chunk1.getMax(),
+ kShardId1.toString(),
+ chunk1.getShard().toString());
+ setUpMigration(collName,
+ chunk2.getMin(),
+ chunk2.getMax(),
+ kShardId3.toString(),
+ chunk2.getShard().toString());
+
+ auto future = launchAsync([this] {
+ Client::initThreadIfNotAlready("Test");
+ auto txn = cc().makeOperationContext();
+
+ // Scheduling the moveChunk commands requires finding hosts to which to send the commands.
+ // Set up dummy hosts for the source shards.
+ shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
+ shardTargeterMock(txn.get(), kShardId2)->setFindHostReturnValue(kShardHost2);
+
+ _migrationManager->finishRecovery(
+ txn.get(), _clusterIdentity, 0, kDefaultSecondaryThrottle, false);
+ });
+
+ // Expect two moveChunk commands.
+ expectMoveChunkCommand(chunk1, kShardId1, false, Status::OK());
+ expectMoveChunkCommand(chunk2, kShardId3, false, Status::OK());
+
+ // Run the MigrationManager code.
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(MigrationManagerTest, FailMigrationRecovery) {
+ // Set up two shards in the metadata.
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern));
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), ShardType::ConfigNS, kShard2, kMajorityWriteConcern));
+
+ // Set up the database and collection as sharded in the metadata.
+ std::string dbName = "foo";
+ std::string collName = "foo.bar";
+ ChunkVersion version(2, 0, OID::gen());
+
+ setUpDatabase(dbName, kShardId0);
+ setUpCollection(collName, version);
+
+ // Set up two chunks in the metadata.
+ ChunkType chunk1 =
+ setUpChunk(collName, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version);
+ version.incMinor();
+ ChunkType chunk2 =
+ setUpChunk(collName, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version);
+
+ // Set up a parsable fake active migration document in the config.migrations collection.
+ setUpMigration(collName,
+ chunk1.getMin(),
+ chunk1.getMax(),
+ kShardId1.toString(),
+ chunk1.getShard().toString());
+
+ _migrationManager->interruptAndDisableMigrations();
+ _migrationManager->drainActiveMigrations();
+ _migrationManager->startRecovery();
+
+ // Set up a fake active migration document that will fail MigrationType parsing -- missing
+ // field.
+ BSONObjBuilder builder;
+ builder.append("_id", "testing");
+ // No MigrationType::ns() field!
+ builder.append(MigrationType::min(), chunk2.getMin());
+ builder.append(MigrationType::max(), chunk2.getMax());
+ builder.append(MigrationType::toShard(), kShardId3.toString());
+ builder.append(MigrationType::fromShard(), chunk2.getShard().toString());
+ ASSERT_OK(catalogClient()->insertConfigDocument(
+ operationContext(), MigrationType::ConfigNS, builder.obj(), kMajorityWriteConcern));
+
+ // Take the distributed lock for the collection, which should be released during recovery when
+ // it fails. Any dist lock held by the config server will be released via processID, so the
+ // session ID used here doesn't matter.
+ ASSERT_OK(catalogClient()->getDistLockManager()->lockWithSessionID(
+ operationContext(),
+ collName,
+ "MigrationManagerTest",
+ OID::gen(),
+ DistLockManager::kSingleLockAttemptTimeout));
+
+ _migrationManager->finishRecovery(
+ operationContext(), _clusterIdentity, 0, kDefaultSecondaryThrottle, false);
+
+ // MigrationManagerTest::tearDown checks that the config.migrations collection is empty and all
+ // distributed locks are unlocked.
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/s/balancer/scoped_migration_request.cpp b/src/mongo/s/balancer/scoped_migration_request.cpp
index 538537079f8..29beacdbd7c 100644
--- a/src/mongo/s/balancer/scoped_migration_request.cpp
+++ b/src/mongo/s/balancer/scoped_migration_request.cpp
@@ -36,6 +36,7 @@
#include "mongo/db/write_concern_options.h"
#include "mongo/s/balancer/type_migration.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
+#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/util/log.h"
@@ -90,18 +91,44 @@ ScopedMigrationRequest& ScopedMigrationRequest::operator=(ScopedMigrationRequest
}
StatusWith<ScopedMigrationRequest> ScopedMigrationRequest::writeMigration(
- OperationContext* txn,
- const MigrateInfo& migrateInfo,
- const ChunkVersion& chunkVersion,
- const ChunkVersion& collectionVersion) {
+ OperationContext* txn, const MigrateInfo& migrateInfo) {
// Try to write a unique migration document to config.migrations.
- MigrationType migrationType(migrateInfo, chunkVersion, collectionVersion);
+ MigrationType migrationType(migrateInfo);
Status result = grid.catalogClient(txn)->insertConfigDocument(
txn, MigrationType::ConfigNS, migrationType.toBSON(), kMajorityWriteConcern);
if (result == ErrorCodes::DuplicateKey) {
- return result;
+ // If the exact migration described by "migrateInfo" is active, return a scoped object for
+ // the request because this migration request will join the active one once scheduled.
+ auto statusWithMigrationQueryResult =
+ grid.shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+ txn,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kMajorityReadConcern,
+ NamespaceString(MigrationType::ConfigNS),
+ migrationType.toBSON(),
+ BSONObj(),
+ 1);
+
+ if (!statusWithMigrationQueryResult.isOK()) {
+ return {statusWithMigrationQueryResult.getStatus().code(),
+ str::stream() << "Failed to verify whether conflicting migration is in "
+ << "progress for migration '"
+ << migrateInfo.toString()
+ << "' while trying to persist migration to config.migrations."
+ << causedBy(redact(statusWithMigrationQueryResult.getStatus()))};
+ }
+
+ if (statusWithMigrationQueryResult.getValue().docs.size() != 1) {
+ invariant(statusWithMigrationQueryResult.getValue().docs.size() == 0);
+ log() << "Failed to write document '" << migrateInfo
+ << "' to config.migrations because there is already an active migration for "
+ << "this chunk" << causedBy(redact(result));
+ return result;
+ }
+
+ result = Status::OK();
}
// As long as there isn't a DuplicateKey error, the document may have been written, and it's
diff --git a/src/mongo/s/balancer/scoped_migration_request.h b/src/mongo/s/balancer/scoped_migration_request.h
index 68f46cfdb50..8595671dc4d 100644
--- a/src/mongo/s/balancer/scoped_migration_request.h
+++ b/src/mongo/s/balancer/scoped_migration_request.h
@@ -30,7 +30,6 @@
#include "mongo/base/status_with.h"
#include "mongo/s/balancer/balancer_policy.h"
-#include "mongo/s/chunk_version.h"
#include "mongo/s/migration_secondary_throttle_options.h"
namespace mongo {
@@ -68,9 +67,7 @@ public:
* The destructor will handle removing the document when it is no longer needed.
*/
static StatusWith<ScopedMigrationRequest> writeMigration(OperationContext* txn,
- const MigrateInfo& migrate,
- const ChunkVersion& chunkVersion,
- const ChunkVersion& collectionVersion);
+ const MigrateInfo& migrate);
/**
* Creates a ScopedMigrationRequest object without inserting a document into config.migrations.
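
ScopedMigrationRequest is the RAII piece that keeps config.migrations in step with in-flight migrations: writeMigration() persists (or, after this change, joins) the document, the destructor removes it, and keepDocumentOnDestruct() leaves it behind when the migration ended with BalancerInterrupted so a later recovery can pick it up. A minimal standalone sketch of that pattern, with an in-memory set standing in for config.migrations; the class here only mimics the shape of the real one:

#include <iostream>
#include <set>
#include <string>

// Stand-in for the config.migrations collection.
std::set<std::string> migrationsCollection;

class ScopedMigrationRequestSketch {
public:
    explicit ScopedMigrationRequestSketch(std::string name) : _name(std::move(name)) {
        migrationsCollection.insert(_name);  // writeMigration(): persist the intent
    }

    ~ScopedMigrationRequestSketch() {
        if (!_keepOnDestruct)
            migrationsCollection.erase(_name);  // normal completion: clean up the document
    }

    // Called when the migration failed with BalancerInterrupted: leave the document
    // behind so finishRecovery() on the next primary can reschedule it.
    void keepDocumentOnDestruct() {
        _keepOnDestruct = true;
    }

private:
    std::string _name;
    bool _keepOnDestruct = false;
};

int main() {
    {
        ScopedMigrationRequestSketch request("test.coll-a_0");
        // ... migration runs; pretend the balancer was interrupted mid-flight ...
        request.keepDocumentOnDestruct();
    }
    // The document survives, which is exactly what recovery relies on.
    std::cout << "documents left for recovery: " << migrationsCollection.size() << "\n";  // 1
}
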
diff --git a/src/mongo/s/balancer/scoped_migration_request_test.cpp b/src/mongo/s/balancer/scoped_migration_request_test.cpp
index e7bc010c654..48c0d501136 100644
--- a/src/mongo/s/balancer/scoped_migration_request_test.cpp
+++ b/src/mongo/s/balancer/scoped_migration_request_test.cpp
@@ -45,6 +45,7 @@ const BSONObj kMin = BSON("a" << 10);
const BSONObj kMax = BSON("a" << 20);
const ShardId kFromShard("shard0000");
const ShardId kToShard("shard0001");
+const ShardId kDifferentToShard("shard0002");
const std::string kName = "TestDB.TestColl-a_10";
class ScopedMigrationRequestTest : public ConfigServerTestFixture {
@@ -82,10 +83,7 @@ void ScopedMigrationRequestTest::checkMigrationsCollectionForDocument(
ScopedMigrationRequest ScopedMigrationRequestTest::makeScopedMigrationRequest(
const MigrateInfo& migrateInfo) {
ScopedMigrationRequest scopedMigrationRequest =
- assertGet(ScopedMigrationRequest::writeMigration(operationContext(),
- migrateInfo,
- ChunkVersion(1, 2, OID::gen()),
- ChunkVersion(1, 2, OID::gen())));
+ assertGet(ScopedMigrationRequest::writeMigration(operationContext(), migrateInfo));
checkMigrationsCollectionForDocument(migrateInfo.getName(), 1);
@@ -114,10 +112,7 @@ TEST_F(ScopedMigrationRequestTest, CreateScopedMigrationRequest) {
{
ScopedMigrationRequest scopedMigrationRequest =
- assertGet(ScopedMigrationRequest::writeMigration(operationContext(),
- migrateInfo,
- ChunkVersion(1, 2, OID::gen()),
- ChunkVersion(1, 2, OID::gen())));
+ assertGet(ScopedMigrationRequest::writeMigration(operationContext(), migrateInfo));
checkMigrationsCollectionForDocument(migrateInfo.getName(), 1);
}
@@ -138,10 +133,7 @@ TEST_F(ScopedMigrationRequestTest, CreateScopedMigrationRequestOnRecovery) {
// Insert the document for the MigrationRequest and then prevent its removal in the destructor.
{
ScopedMigrationRequest scopedMigrationRequest =
- assertGet(ScopedMigrationRequest::writeMigration(operationContext(),
- migrateInfo,
- ChunkVersion(1, 2, OID::gen()),
- ChunkVersion(1, 2, OID::gen())));
+ assertGet(ScopedMigrationRequest::writeMigration(operationContext(), migrateInfo));
checkMigrationsCollectionForDocument(migrateInfo.getName(), 1);
@@ -150,13 +142,14 @@ TEST_F(ScopedMigrationRequestTest, CreateScopedMigrationRequestOnRecovery) {
checkMigrationsCollectionForDocument(migrateInfo.getName(), 1);
- // Trying to write a migration that already exists should fail.
+ // Fail to write a migration document if a migration document already exists for that chunk but
+ // with a different destination shard (the migration request must have identical parameters).
{
+ MigrateInfo differentToShardMigrateInfo = migrateInfo;
+ differentToShardMigrateInfo.to = kDifferentToShard;
+
StatusWith<ScopedMigrationRequest> statusWithScopedMigrationRequest =
- ScopedMigrationRequest::writeMigration(operationContext(),
- migrateInfo,
- ChunkVersion(1, 2, OID::gen()),
- ChunkVersion(1, 2, OID::gen()));
+ ScopedMigrationRequest::writeMigration(operationContext(), differentToShardMigrateInfo);
ASSERT_EQUALS(ErrorCodes::DuplicateKey, statusWithScopedMigrationRequest.getStatus());
@@ -175,6 +168,32 @@ TEST_F(ScopedMigrationRequestTest, CreateScopedMigrationRequestOnRecovery) {
checkMigrationsCollectionForDocument(migrateInfo.getName(), 0);
}
+TEST_F(ScopedMigrationRequestTest, CreateMultipleScopedMigrationRequestsForIdenticalMigration) {
+ MigrateInfo migrateInfo = makeMigrateInfo();
+
+ {
+ // Create a ScopedMigrationRequest, which will do the config.migrations write.
+ ScopedMigrationRequest scopedMigrationRequest =
+ assertGet(ScopedMigrationRequest::writeMigration(operationContext(), migrateInfo));
+
+ checkMigrationsCollectionForDocument(migrateInfo.getName(), 1);
+
+ {
+ // Should be able to create another Scoped object if the request is identical.
+ ScopedMigrationRequest identicalScopedMigrationRequest =
+ assertGet(ScopedMigrationRequest::writeMigration(operationContext(), migrateInfo));
+
+ checkMigrationsCollectionForDocument(migrateInfo.getName(), 1);
+ }
+
+ // If any scoped object goes out of scope, the migration should be over and the document
+ // removed.
+ checkMigrationsCollectionForDocument(migrateInfo.getName(), 0);
+ }
+
+ checkMigrationsCollectionForDocument(migrateInfo.getName(), 0);
+}
+
TEST_F(ScopedMigrationRequestTest, MoveAndAssignmentConstructors) {
MigrateInfo migrateInfo = makeMigrateInfo();
diff --git a/src/mongo/s/balancer/type_migration.cpp b/src/mongo/s/balancer/type_migration.cpp
index 0fb97ba7b51..963c40b30e3 100644
--- a/src/mongo/s/balancer/type_migration.cpp
+++ b/src/mongo/s/balancer/type_migration.cpp
@@ -43,19 +43,13 @@ const BSONField<BSONObj> MigrationType::min("min");
const BSONField<BSONObj> MigrationType::max("max");
const BSONField<std::string> MigrationType::fromShard("fromShard");
const BSONField<std::string> MigrationType::toShard("toShard");
-const BSONField<std::string> MigrationType::chunkVersionField("chunkVersion");
-const BSONField<std::string> MigrationType::collectionVersionField("collectionVersion");
MigrationType::MigrationType() = default;
-MigrationType::MigrationType(MigrateInfo info,
- const ChunkVersion& chunkVersion,
- const ChunkVersion& collectionVersion)
+MigrationType::MigrationType(MigrateInfo info)
: _nss(NamespaceString(info.ns)),
_min(info.minKey),
_max(info.maxKey),
- _chunkVersion(chunkVersion),
- _collectionVersion(collectionVersion),
_fromShard(info.from),
_toShard(info.to) {}
@@ -81,24 +75,6 @@ StatusWith<MigrationType> MigrationType::fromBSON(const BSONObj& source) {
}
{
- auto chunkVersionStatus =
- ChunkVersion::parseFromBSONWithFieldForCommands(source, chunkVersionField.name());
- if (!chunkVersionStatus.isOK()) {
- return chunkVersionStatus.getStatus();
- }
- migrationType._chunkVersion = std::move(chunkVersionStatus.getValue());
- }
-
- {
- auto collectionVersionStatus =
- ChunkVersion::parseFromBSONWithFieldForCommands(source, collectionVersionField.name());
- if (!collectionVersionStatus.isOK()) {
- return collectionVersionStatus.getStatus();
- }
- migrationType._collectionVersion = std::move(collectionVersionStatus.getValue());
- }
-
- {
std::string migrationToShard;
Status status = bsonExtractStringField(source, toShard.name(), &migrationToShard);
if (!status.isOK())
@@ -127,10 +103,6 @@ BSONObj MigrationType::toBSON() const {
builder.append(min.name(), _min.get());
if (_max)
builder.append(max.name(), _max.get());
- if (_chunkVersion)
- _chunkVersion->appendWithFieldForCommands(&builder, chunkVersionField.name());
- if (_collectionVersion)
- _collectionVersion->appendWithFieldForCommands(&builder, collectionVersionField.name());
if (_fromShard)
builder.append(fromShard.name(), _fromShard->toString());
if (_toShard)
@@ -139,6 +111,10 @@ BSONObj MigrationType::toBSON() const {
return builder.obj();
}
+MigrateInfo MigrationType::toMigrateInfo() const {
+ return MigrateInfo(_nss->ns(), _toShard.get(), _fromShard.get(), _min.get(), _max.get());
+}
+
std::string MigrationType::getName() const {
return ChunkType::genID(_nss->ns(), _min.get());
}
diff --git a/src/mongo/s/balancer/type_migration.h b/src/mongo/s/balancer/type_migration.h
index f1ee489d916..5f2948e9dfe 100644
--- a/src/mongo/s/balancer/type_migration.h
+++ b/src/mongo/s/balancer/type_migration.h
@@ -59,9 +59,7 @@ public:
* The Balancer encapsulates migration information in MigrateInfo objects, so this facilitates
* conversion to a config.migrations entry format.
*/
- MigrationType(MigrateInfo info,
- const ChunkVersion& chunkVersion,
- const ChunkVersion& collectionVersion);
+ explicit MigrationType(MigrateInfo info);
/**
* Constructs a new MigrationType object from BSON. Expects all fields to be present, and errors
@@ -75,6 +73,11 @@ public:
BSONObj toBSON() const;
/**
+ * Helper function for the Balancer that uses MigrateInfo objects to schedule migrations.
+ */
+ MigrateInfo toMigrateInfo() const;
+
+ /**
* Uniquely identifies a chunk by collection and min key.
*/
std::string getName() const;
@@ -86,8 +89,6 @@ private:
boost::optional<NamespaceString> _nss;
boost::optional<BSONObj> _min;
boost::optional<BSONObj> _max;
- boost::optional<ChunkVersion> _chunkVersion;
- boost::optional<ChunkVersion> _collectionVersion;
boost::optional<ShardId> _fromShard;
boost::optional<ShardId> _toShard;
};
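
With the chunk and collection versions removed, a config.migrations entry reduces to the namespace, the chunk bounds and the two shards, and toMigrateInfo() is a field-for-field repackaging for the balancer's scheduling code. A simplified standalone model of that conversion, using plain strings where the real types are NamespaceString, BSONObj and ShardId (the sample values follow the test fixtures elsewhere in this patch):

#include <iostream>
#include <string>

// Simplified MigrateInfo: what the balancer schedules (see balancer_policy.h above).
struct MigrateInfoSketch {
    std::string ns, to, from, minKey, maxKey;
};

// Simplified MigrationType: what gets persisted in config.migrations.
struct MigrationTypeSketch {
    std::string ns, min, max, fromShard, toShard;

    // Mirrors MigrationType::toMigrateInfo(): no versions are carried any more,
    // so the conversion is a plain repackaging of the five persisted fields.
    MigrateInfoSketch toMigrateInfo() const {
        return {ns, toShard, fromShard, min, max};
    }
};

int main() {
    MigrationTypeSketch doc{"TestDB.TestColl", "{a: 10}", "{a: 20}", "shard0000", "shard0001"};
    MigrateInfoSketch info = doc.toMigrateInfo();
    std::cout << "move " << info.ns << " " << info.minKey << " -> " << info.maxKey
              << " from " << info.from << " to " << info.to << "\n";
}
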
diff --git a/src/mongo/s/balancer/type_migration_test.cpp b/src/mongo/s/balancer/type_migration_test.cpp
index ff6c31ca9ae..d3a352301aa 100644
--- a/src/mongo/s/balancer/type_migration_test.cpp
+++ b/src/mongo/s/balancer/type_migration_test.cpp
@@ -61,15 +61,13 @@ TEST(MigrationTypeTest, ConvertFromMigrationInfo) {
ASSERT_OK(chunkType.validate());
MigrateInfo migrateInfo(kNs, kToShard, chunkType);
- MigrationType migrationType(migrateInfo, version, version);
+ MigrationType migrationType(migrateInfo);
BSONObjBuilder builder;
builder.append(MigrationType::name(), kName);
builder.append(MigrationType::ns(), kNs);
builder.append(MigrationType::min(), kMin);
builder.append(MigrationType::max(), kMax);
- version.appendWithFieldForCommands(&builder, MigrationType::chunkVersionField.name());
- version.appendWithFieldForCommands(&builder, MigrationType::collectionVersionField.name());
builder.append(MigrationType::fromShard(), kFromShard.toString());
builder.append(MigrationType::toShard(), kToShard.toString());
@@ -79,15 +77,11 @@ TEST(MigrationTypeTest, ConvertFromMigrationInfo) {
}
TEST(MigrationTypeTest, FromAndToBSON) {
- const ChunkVersion version(1, 2, OID::gen());
-
BSONObjBuilder builder;
builder.append(MigrationType::name(), kName);
builder.append(MigrationType::ns(), kNs);
builder.append(MigrationType::min(), kMin);
builder.append(MigrationType::max(), kMax);
- version.appendWithFieldForCommands(&builder, MigrationType::chunkVersionField.name());
- version.appendWithFieldForCommands(&builder, MigrationType::collectionVersionField.name());
builder.append(MigrationType::fromShard(), kFromShard.toString());
builder.append(MigrationType::toShard(), kToShard.toString());
@@ -98,13 +92,9 @@ TEST(MigrationTypeTest, FromAndToBSON) {
}
TEST(MigrationTypeTest, MissingRequiredNamespaceField) {
- const ChunkVersion version(1, 2, OID::gen());
-
BSONObjBuilder builder;
builder.append(MigrationType::min(), kMin);
builder.append(MigrationType::max(), kMax);
- version.appendWithFieldForCommands(&builder, MigrationType::chunkVersionField.name());
- version.appendWithFieldForCommands(&builder, MigrationType::collectionVersionField.name());
builder.append(MigrationType::fromShard(), kFromShard.toString());
builder.append(MigrationType::toShard(), kToShard.toString());
@@ -116,13 +106,9 @@ TEST(MigrationTypeTest, MissingRequiredNamespaceField) {
}
TEST(MigrationTypeTest, MissingRequiredMinField) {
- const ChunkVersion version(1, 2, OID::gen());
-
BSONObjBuilder builder;
builder.append(MigrationType::ns(), kNs);
builder.append(MigrationType::max(), kMax);
- version.appendWithFieldForCommands(&builder, MigrationType::chunkVersionField.name());
- version.appendWithFieldForCommands(&builder, MigrationType::collectionVersionField.name());
builder.append(MigrationType::fromShard(), kFromShard.toString());
builder.append(MigrationType::toShard(), kToShard.toString());
@@ -134,13 +120,9 @@ TEST(MigrationTypeTest, MissingRequiredMinField) {
}
TEST(MigrationTypeTest, MissingRequiredMaxField) {
- const ChunkVersion version(1, 2, OID::gen());
-
BSONObjBuilder builder;
builder.append(MigrationType::ns(), kNs);
builder.append(MigrationType::min(), kMin);
- version.appendWithFieldForCommands(&builder, MigrationType::chunkVersionField.name());
- version.appendWithFieldForCommands(&builder, MigrationType::collectionVersionField.name());
builder.append(MigrationType::fromShard(), kFromShard.toString());
builder.append(MigrationType::toShard(), kToShard.toString());
@@ -151,53 +133,11 @@ TEST(MigrationTypeTest, MissingRequiredMaxField) {
ASSERT_STRING_CONTAINS(migrationType.getStatus().reason(), MigrationType::max.name());
}
-TEST(MigrationTypeTest, MissingRequiredChunkVersionField) {
- const ChunkVersion version(1, 2, OID::gen());
-
- BSONObjBuilder builder;
- builder.append(MigrationType::ns(), kNs);
- builder.append(MigrationType::min(), kMin);
- builder.append(MigrationType::max(), kMax);
- version.appendWithFieldForCommands(&builder, MigrationType::collectionVersionField.name());
- builder.append(MigrationType::fromShard(), kFromShard.toString());
- builder.append(MigrationType::toShard(), kToShard.toString());
-
- BSONObj obj = builder.obj();
-
- StatusWith<MigrationType> migrationType = MigrationType::fromBSON(obj);
- ASSERT_EQUALS(migrationType.getStatus(), ErrorCodes::NoSuchKey);
- ASSERT_STRING_CONTAINS(migrationType.getStatus().reason(),
- MigrationType::chunkVersionField.name());
-}
-
-TEST(MigrationTypeTest, MissingRequiredCollectionVersionField) {
- const ChunkVersion version(1, 2, OID::gen());
-
- BSONObjBuilder builder;
- builder.append(MigrationType::ns(), kNs);
- builder.append(MigrationType::min(), kMin);
- builder.append(MigrationType::max(), kMax);
- version.appendWithFieldForCommands(&builder, MigrationType::chunkVersionField.name());
- builder.append(MigrationType::fromShard(), kFromShard.toString());
- builder.append(MigrationType::toShard(), kToShard.toString());
-
- BSONObj obj = builder.obj();
-
- StatusWith<MigrationType> migrationType = MigrationType::fromBSON(obj);
- ASSERT_EQUALS(migrationType.getStatus(), ErrorCodes::NoSuchKey);
- ASSERT_STRING_CONTAINS(migrationType.getStatus().reason(),
- MigrationType::collectionVersionField.name());
-}
-
TEST(MigrationTypeTest, MissingRequiredFromShardField) {
- const ChunkVersion version(1, 2, OID::gen());
-
BSONObjBuilder builder;
builder.append(MigrationType::ns(), kNs);
builder.append(MigrationType::min(), kMin);
builder.append(MigrationType::max(), kMax);
- version.appendWithFieldForCommands(&builder, MigrationType::chunkVersionField.name());
- version.appendWithFieldForCommands(&builder, MigrationType::collectionVersionField.name());
builder.append(MigrationType::toShard(), kToShard.toString());
BSONObj obj = builder.obj();
@@ -208,14 +148,10 @@ TEST(MigrationTypeTest, MissingRequiredFromShardField) {
}
TEST(MigrationTypeTest, MissingRequiredToShardField) {
- const ChunkVersion version(1, 2, OID::gen());
-
BSONObjBuilder builder;
builder.append(MigrationType::ns(), kNs);
builder.append(MigrationType::min(), kMin);
builder.append(MigrationType::max(), kMax);
- version.appendWithFieldForCommands(&builder, MigrationType::chunkVersionField.name());
- version.appendWithFieldForCommands(&builder, MigrationType::collectionVersionField.name());
builder.append(MigrationType::fromShard(), kFromShard.toString());
BSONObj obj = builder.obj();
diff --git a/src/mongo/s/move_chunk_request.cpp b/src/mongo/s/move_chunk_request.cpp
index d26c0002d83..e16ae7307fd 100644
--- a/src/mongo/s/move_chunk_request.cpp
+++ b/src/mongo/s/move_chunk_request.cpp
@@ -184,14 +184,6 @@ bool MoveChunkRequest::operator==(const MoveChunkRequest& other) const {
return false;
if (_range != other._range)
return false;
- if (_maxChunkSizeBytes != other._maxChunkSizeBytes)
- return false;
- if (_secondaryThrottle != other._secondaryThrottle)
- return false;
- if (_waitForDelete != other._waitForDelete)
- return false;
- if (_takeDistLock != other._takeDistLock)
- return false;
return true;
}
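
Finally, MoveChunkRequest::operator== now compares only the identity of the migration (namespace, shards, chunk range) and no longer the tuning parameters maxChunkSizeBytes, secondaryThrottle, waitForDelete or takeDistLock. Presumably this is so a rescheduled or recovered request can still be matched against a moveChunk that is already running with slightly different settings. A standalone sketch of that looser equality, with a trimmed-down struct standing in for the real MoveChunkRequest:

#include <iostream>
#include <string>

struct MoveChunkRequestSketch {
    // Fields that identify the migration.
    std::string ns, fromShard, toShard, minKey, maxKey;
    // Tuning parameters that operator== deliberately ignores after this change.
    long long maxChunkSizeBytes;
    bool waitForDelete;

    bool operator==(const MoveChunkRequestSketch& other) const {
        return ns == other.ns && fromShard == other.fromShard && toShard == other.toShard &&
            minKey == other.minKey && maxKey == other.maxKey;
    }
};

int main() {
    MoveChunkRequestSketch original{"test.coll", "shard0", "shard1", "{a: 0}", "{a: 50}", 64, false};
    MoveChunkRequestSketch recovered = original;
    recovered.waitForDelete = true;  // recovered with different settings...
    std::cout << (original == recovered) << "\n";  // ...still recognised as the same migration: 1
}
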