Diffstat (limited to 'src/mongo/s/balancer/migration_manager.cpp')
-rw-r--r-- src/mongo/s/balancer/migration_manager.cpp | 83 +++++++++++++++++----------
1 file changed, 53 insertions(+), 30 deletions(-)
diff --git a/src/mongo/s/balancer/migration_manager.cpp b/src/mongo/s/balancer/migration_manager.cpp
index aa5beb38729..4bd9ae0950a 100644
--- a/src/mongo/s/balancer/migration_manager.cpp
+++ b/src/mongo/s/balancer/migration_manager.cpp
@@ -104,13 +104,13 @@ Status extractMigrationStatusFromRemoteCommandResponse(const RemoteCommandRespon
* Blocking call to acquire the distributed collection lock for the specified namespace.
*/
StatusWith<DistLockHandle> acquireDistLock(OperationContext* txn,
- const OID& clusterIdentity,
+ const OID& lockSessionID,
const NamespaceString& nss) {
const std::string whyMessage(stream() << "Migrating chunk(s) in collection " << nss.ns());
auto statusWithDistLockHandle =
Grid::get(txn)->catalogClient(txn)->getDistLockManager()->lockWithSessionID(
- txn, nss.ns(), whyMessage, clusterIdentity, DistLockManager::kSingleLockAttemptTimeout);
+ txn, nss.ns(), whyMessage, lockSessionID, DistLockManager::kSingleLockAttemptTimeout);
if (!statusWithDistLockHandle.isOK()) {
// If we get LockBusy while trying to acquire the collection distributed lock, this implies
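Context for the rename above: the dist lock is keyed by a session ID that identifies the holder, so the same session can later re-enter and release exactly the locks it took. A minimal, self-contained sketch of that idea follows; the types and names are illustrative stand-ins, not MongoDB's DistLockManager API.

    // Session-keyed locking sketch: each resource records the session that
    // holds it, so a holder can re-enter and release only its own locks.
    #include <map>
    #include <string>

    struct SessionLockTable {
        std::map<std::string, std::string> holders;  // resource -> owning session ID

        // Returns true if 'sessionID' acquired (or already holds) the lock.
        bool lockWithSessionID(const std::string& resource, const std::string& sessionID) {
            auto [it, inserted] = holders.try_emplace(resource, sessionID);
            return inserted || it->second == sessionID;  // idempotent for the owner
        }

        void unlock(const std::string& resource, const std::string& sessionID) {
            auto it = holders.find(resource);
            if (it != holders.end() && it->second == sessionID)
                holders.erase(it);  // only the owning session may release
        }
    };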
@@ -135,7 +135,7 @@ StatusWith<DistLockHandle> acquireDistLock(OperationContext* txn,
} // namespace
MigrationManager::MigrationManager(ServiceContext* serviceContext)
- : _serviceContext(serviceContext) {}
+ : _serviceContext(serviceContext), _lockSessionID(OID::gen()) {}
MigrationManager::~MigrationManager() {
// The migration manager must be completely quiesced at destruction time
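The constructor change above gives each MigrationManager instance a single lock session ID, generated once at construction and reused for every acquisition and release. A simplified sketch of that ownership pattern, where genSessionID is a hypothetical stand-in for OID::gen():

    #include <atomic>
    #include <cstdint>
    #include <string>

    // Hypothetical stand-in for OID::gen(): unique value per call.
    static std::string genSessionID() {
        static std::atomic<uint64_t> counter{0};
        return "session-" + std::to_string(++counter);
    }

    class Manager {
    public:
        Manager() : _lockSessionID(genSessionID()) {}  // fixed at construction
        const std::string& lockSessionID() const { return _lockSessionID; }

    private:
        const std::string _lockSessionID;  // reused for every lock/unlock
    };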
@@ -283,29 +283,20 @@ Status MigrationManager::executeManualMigration(
return status;
}
-void MigrationManager::startRecovery() {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- invariant(_state == State::kStopped);
- _state = State::kRecovering;
-}
-
-void MigrationManager::finishRecovery(OperationContext* txn,
- const OID& clusterIdentity,
- uint64_t maxChunkSizeBytes,
- const MigrationSecondaryThrottleOptions& secondaryThrottle,
- bool waitForDelete) {
+void MigrationManager::startRecoveryAndAcquireDistLocks(OperationContext* txn) {
{
stdx::lock_guard<stdx::mutex> lock(_mutex);
- invariant(_state == State::kRecovering);
- if (!_clusterIdentity.isSet()) {
- _clusterIdentity = clusterIdentity;
- }
- invariant(_clusterIdentity == clusterIdentity);
+ invariant(_state == State::kStopped);
+ invariant(_migrationRecoveryMap.empty());
+ _state = State::kRecovering;
}
- // Load the active migrations from the config.migrations collection.
- vector<MigrateInfo> migrateInfos;
+ auto scopedGuard = MakeGuard([&] {
+ _migrationRecoveryMap.clear();
+ _abandonActiveMigrationsAndEnableManager(txn);
+ });
+ // Load the active migrations from the config.migrations collection.
auto statusWithMigrationsQueryResponse =
Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
txn,
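The MakeGuard block above arranges for the recovery state to be torn down on every early-return path in this function, and is dismissed only once all dist locks have been reacquired (see scopedGuard.Dismiss() at the end of the function). A self-contained sketch of the scope-guard idiom, with illustrative names rather than the MakeGuard utility itself:

    #include <functional>
    #include <utility>

    class ScopeGuard {
    public:
        explicit ScopeGuard(std::function<void()> onExit) : _onExit(std::move(onExit)) {}
        ~ScopeGuard() {
            if (_active) _onExit();  // fires on early return or exception
        }
        void dismiss() { _active = false; }  // happy path: skip the cleanup

        ScopeGuard(const ScopeGuard&) = delete;
        ScopeGuard& operator=(const ScopeGuard&) = delete;

    private:
        std::function<void()> _onExit;
        bool _active = true;
    };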
@@ -318,10 +309,8 @@ void MigrationManager::finishRecovery(OperationContext* txn,
if (!statusWithMigrationsQueryResponse.isOK()) {
warning() << "Unable to read config.migrations collection documents for balancer migration"
- << " recovery. Abandoning recovery."
+ << " recovery. Abandoning balancer recovery."
<< causedBy(redact(statusWithMigrationsQueryResponse.getStatus()));
-
- _abandonActiveMigrationsAndEnableManager(txn);
return;
}
@@ -331,11 +320,9 @@ void MigrationManager::finishRecovery(OperationContext* txn,
// The format of this migration document is incorrect. The balancer holds a distlock for
// this migration, but without parsing the migration document we cannot identify which
// distlock must be released. So we must release all distlocks.
- warning() << "Unable to parse config.migrations collection documents for balancer"
- << " migration recovery. Abandoning recovery."
+ warning() << "Unable to parse config.migrations document '" << migration
+ << "' for balancer migration recovery. Abandoning balancer recovery."
<< causedBy(redact(statusWithMigrationType.getStatus()));
-
- _abandonActiveMigrationsAndEnableManager(txn);
return;
}
MigrateInfo migrateInfo = statusWithMigrationType.getValue().toMigrateInfo();
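The parse-failure branch above abandons the entire recovery rather than skipping one document, because an unparseable document cannot be mapped back to the dist lock it holds. A small sketch of that all-or-nothing contract, where parseAll and the empty-string check are illustrative stand-ins for MigrationType parsing:

    #include <optional>
    #include <string>
    #include <vector>

    // All-or-nothing parsing: one bad document abandons the whole batch,
    // since a partial result cannot say which dist locks remain accounted for.
    std::optional<std::vector<std::string>> parseAll(const std::vector<std::string>& docs) {
        std::vector<std::string> parsed;
        for (const auto& doc : docs) {
            if (doc.empty())          // stand-in for a failed parse
                return std::nullopt;  // caller must release all dist locks
            parsed.push_back(doc);
        }
        return parsed;
    }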
@@ -345,11 +332,47 @@ void MigrationManager::finishRecovery(OperationContext* txn,
std::list<MigrateInfo> list;
it = _migrationRecoveryMap.insert(std::make_pair(NamespaceString(migrateInfo.ns), list))
.first;
+
+ // Reacquire the matching distributed lock for this namespace.
+ const std::string whyMessage(stream() << "Migrating chunk(s) in collection "
+ << migrateInfo.ns);
+ auto statusWithDistLockHandle =
+ Grid::get(txn)
+ ->catalogClient(txn)
+ ->getDistLockManager()
+ ->tryLockWithLocalWriteConcern(txn, migrateInfo.ns, whyMessage, _lockSessionID);
+ if (!statusWithDistLockHandle.isOK() &&
+ statusWithDistLockHandle.getStatus() != ErrorCodes::LockStateChangeFailed) {
+ // LockStateChangeFailed is alright because that should mean a 3.2 shard has it for
+ // the active migration.
+ warning() << "Failed to acquire distributed lock for collection '" << migrateInfo.ns
+ << "' during balancer recovery of an active migration. Abandoning"
+ << " balancer recovery."
+ << causedBy(redact(statusWithDistLockHandle.getStatus()));
+ return;
+ }
}
it->second.push_back(std::move(migrateInfo));
}
+ scopedGuard.Dismiss();
+}
+
+void MigrationManager::finishRecovery(OperationContext* txn,
+ uint64_t maxChunkSizeBytes,
+ const MigrationSecondaryThrottleOptions& secondaryThrottle,
+ bool waitForDelete) {
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ // Check if recovery was abandoned in startRecovery, in which case there is no more to do.
+ if (_state == State::kEnabled) {
+ invariant(_migrationRecoveryMap.empty());
+ return;
+ }
+ invariant(_state == State::kRecovering);
+ }
+
// Schedule recovered migrations.
vector<ScopedMigrationRequest> scopedMigrationRequests;
vector<shared_ptr<Notification<Status>>> responses;
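The tryLockWithLocalWriteConcern branch in the hunk above treats exactly one failure code as benign: LockStateChangeFailed, meaning a 3.2 shard still holds the lock for its own active migration. A sketch of that error-tolerance rule, with an enum and helper that are illustrative rather than MongoDB's error handling:

    // Recovery proceeds on success or on the one code that means a legacy
    // 3.2 shard owns the lock; anything else aborts recovery.
    enum class LockStatus { OK, LockStateChangeFailed, NetworkError };

    bool isRecoverableLockResult(LockStatus s) {
        return s == LockStatus::OK || s == LockStatus::LockStateChangeFailed;
    }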
@@ -386,7 +409,7 @@ void MigrationManager::finishRecovery(OperationContext* txn,
txn, NamespaceString(itMigrateInfo->ns), itMigrateInfo->minKey);
if (nssAndMigrateInfos.second.size() == 1) {
Grid::get(txn)->catalogClient(txn)->getDistLockManager()->unlock(
- txn, _clusterIdentity, itMigrateInfo->ns);
+ txn, _lockSessionID, itMigrateInfo->ns);
}
itMigrateInfo = nssAndMigrateInfos.second.erase(itMigrateInfo);
} else {
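The unlock call above releases a namespace's dist lock exactly when the last recovered migration for that namespace is being erased. A simplified sketch of that release rule, with types reduced to standard containers:

    #include <list>
    #include <map>
    #include <string>

    // The dist lock for 'ns' is held while its migration list is non-empty
    // and released when the last entry is removed.
    void dropMigration(std::map<std::string, std::list<int>>& byNamespace,
                       const std::string& ns) {
        auto it = byNamespace.find(ns);
        if (it == byNamespace.end())
            return;
        if (it->second.size() == 1) {
            // Last migration for this namespace: the unlock would happen here.
            byNamespace.erase(it);
        } else {
            it->second.pop_front();
        }
    }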
@@ -568,7 +591,7 @@ void MigrationManager::_scheduleWithDistLock_inlock(OperationContext* txn,
auto it = _activeMigrationsWithDistLock.find(nss);
if (it == _activeMigrationsWithDistLock.end()) {
// Acquire the collection distributed lock (blocking call)
- auto distLockHandleStatus = acquireDistLock(txn, _clusterIdentity, nss);
+ auto distLockHandleStatus = acquireDistLock(txn, _lockSessionID, nss);
if (!distLockHandleStatus.isOK()) {
migration.completionNotification->set(distLockHandleStatus.getStatus());
return;
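Finally, _scheduleWithDistLock_inlock acquires the collection dist lock lazily: only when the first migration for a namespace is scheduled, caching the result in _activeMigrationsWithDistLock so later migrations on the same namespace reuse it. A minimal sketch of that acquire-on-first-use pattern, where LockHandle and lockForNamespace are hypothetical:

    #include <map>
    #include <string>

    struct LockHandle { int id; };  // hypothetical stand-in for DistLockHandle

    std::map<std::string, LockHandle> activeLocks;  // namespace -> held lock

    // Take the lock only for the first migration scheduled on a namespace;
    // subsequent migrations reuse the cached handle.
    LockHandle lockForNamespace(const std::string& nss) {
        auto it = activeLocks.find(nss);
        if (it != activeLocks.end())
            return it->second;  // already held for this namespace
        LockHandle h{static_cast<int>(activeLocks.size()) + 1};  // acquire (stub)
        activeLocks.emplace(nss, h);
        return h;
    }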