diff options
Diffstat (limited to 'src/mongo/s/balancer/migration_manager.cpp')
-rw-r--r-- | src/mongo/s/balancer/migration_manager.cpp | 83 |
1 files changed, 53 insertions, 30 deletions
diff --git a/src/mongo/s/balancer/migration_manager.cpp b/src/mongo/s/balancer/migration_manager.cpp index aa5beb38729..4bd9ae0950a 100644 --- a/src/mongo/s/balancer/migration_manager.cpp +++ b/src/mongo/s/balancer/migration_manager.cpp @@ -104,13 +104,13 @@ Status extractMigrationStatusFromRemoteCommandResponse(const RemoteCommandRespon * Blocking call to acquire the distributed collection lock for the specified namespace. */ StatusWith<DistLockHandle> acquireDistLock(OperationContext* txn, - const OID& clusterIdentity, + const OID& lockSessionID, const NamespaceString& nss) { const std::string whyMessage(stream() << "Migrating chunk(s) in collection " << nss.ns()); auto statusWithDistLockHandle = Grid::get(txn)->catalogClient(txn)->getDistLockManager()->lockWithSessionID( - txn, nss.ns(), whyMessage, clusterIdentity, DistLockManager::kSingleLockAttemptTimeout); + txn, nss.ns(), whyMessage, lockSessionID, DistLockManager::kSingleLockAttemptTimeout); if (!statusWithDistLockHandle.isOK()) { // If we get LockBusy while trying to acquire the collection distributed lock, this implies @@ -135,7 +135,7 @@ StatusWith<DistLockHandle> acquireDistLock(OperationContext* txn, } // namespace MigrationManager::MigrationManager(ServiceContext* serviceContext) - : _serviceContext(serviceContext) {} + : _serviceContext(serviceContext), _lockSessionID(OID::gen()) {} MigrationManager::~MigrationManager() { // The migration manager must be completely quiesced at destruction time @@ -283,29 +283,20 @@ Status MigrationManager::executeManualMigration( return status; } -void MigrationManager::startRecovery() { - stdx::lock_guard<stdx::mutex> lock(_mutex); - invariant(_state == State::kStopped); - _state = State::kRecovering; -} - -void MigrationManager::finishRecovery(OperationContext* txn, - const OID& clusterIdentity, - uint64_t maxChunkSizeBytes, - const MigrationSecondaryThrottleOptions& secondaryThrottle, - bool waitForDelete) { +void MigrationManager::startRecoveryAndAcquireDistLocks(OperationContext* txn) { { stdx::lock_guard<stdx::mutex> lock(_mutex); - invariant(_state == State::kRecovering); - if (!_clusterIdentity.isSet()) { - _clusterIdentity = clusterIdentity; - } - invariant(_clusterIdentity == clusterIdentity); + invariant(_state == State::kStopped); + invariant(_migrationRecoveryMap.empty()); + _state = State::kRecovering; } - // Load the active migrations from the config.migrations collection. - vector<MigrateInfo> migrateInfos; + auto scopedGuard = MakeGuard([&] { + _migrationRecoveryMap.clear(); + _abandonActiveMigrationsAndEnableManager(txn); + }); + // Load the active migrations from the config.migrations collection. auto statusWithMigrationsQueryResponse = Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( txn, @@ -318,10 +309,8 @@ void MigrationManager::finishRecovery(OperationContext* txn, if (!statusWithMigrationsQueryResponse.isOK()) { warning() << "Unable to read config.migrations collection documents for balancer migration" - << " recovery. Abandoning recovery." + << " recovery. Abandoning balancer recovery." << causedBy(redact(statusWithMigrationsQueryResponse.getStatus())); - - _abandonActiveMigrationsAndEnableManager(txn); return; } @@ -331,11 +320,9 @@ void MigrationManager::finishRecovery(OperationContext* txn, // The format of this migration document is incorrect. The balancer holds a distlock for // this migration, but without parsing the migration document we cannot identify which // distlock must be released. So we must release all distlocks. - warning() << "Unable to parse config.migrations collection documents for balancer" - << " migration recovery. Abandoning recovery." + warning() << "Unable to parse config.migrations document '" << migration + << "' for balancer migration recovery. Abandoning balancer recovery." << causedBy(redact(statusWithMigrationType.getStatus())); - - _abandonActiveMigrationsAndEnableManager(txn); return; } MigrateInfo migrateInfo = statusWithMigrationType.getValue().toMigrateInfo(); @@ -345,11 +332,47 @@ void MigrationManager::finishRecovery(OperationContext* txn, std::list<MigrateInfo> list; it = _migrationRecoveryMap.insert(std::make_pair(NamespaceString(migrateInfo.ns), list)) .first; + + // Reacquire the matching distributed lock for this namespace. + const std::string whyMessage(stream() << "Migrating chunk(s) in collection " + << migrateInfo.ns); + auto statusWithDistLockHandle = + Grid::get(txn) + ->catalogClient(txn) + ->getDistLockManager() + ->tryLockWithLocalWriteConcern(txn, migrateInfo.ns, whyMessage, _lockSessionID); + if (!statusWithDistLockHandle.isOK() && + statusWithDistLockHandle.getStatus() != ErrorCodes::LockStateChangeFailed) { + // LockStateChangeFailed is alright because that should mean a 3.2 shard has it for + // the active migration. + warning() << "Failed to acquire distributed lock for collection '" << migrateInfo.ns + << "' during balancer recovery of an active migration. Abandoning" + << " balancer recovery." + << causedBy(redact(statusWithDistLockHandle.getStatus())); + return; + } } it->second.push_back(std::move(migrateInfo)); } + scopedGuard.Dismiss(); +} + +void MigrationManager::finishRecovery(OperationContext* txn, + uint64_t maxChunkSizeBytes, + const MigrationSecondaryThrottleOptions& secondaryThrottle, + bool waitForDelete) { + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + // Check if recovery was abandoned in startRecovery, in which case there is no more to do. + if (_state == State::kEnabled) { + invariant(_migrationRecoveryMap.empty()); + return; + } + invariant(_state == State::kRecovering); + } + // Schedule recovered migrations. vector<ScopedMigrationRequest> scopedMigrationRequests; vector<shared_ptr<Notification<Status>>> responses; @@ -386,7 +409,7 @@ void MigrationManager::finishRecovery(OperationContext* txn, txn, NamespaceString(itMigrateInfo->ns), itMigrateInfo->minKey); if (nssAndMigrateInfos.second.size() == 1) { Grid::get(txn)->catalogClient(txn)->getDistLockManager()->unlock( - txn, _clusterIdentity, itMigrateInfo->ns); + txn, _lockSessionID, itMigrateInfo->ns); } itMigrateInfo = nssAndMigrateInfos.second.erase(itMigrateInfo); } else { @@ -568,7 +591,7 @@ void MigrationManager::_scheduleWithDistLock_inlock(OperationContext* txn, auto it = _activeMigrationsWithDistLock.find(nss); if (it == _activeMigrationsWithDistLock.end()) { // Acquire the collection distributed lock (blocking call) - auto distLockHandleStatus = acquireDistLock(txn, _clusterIdentity, nss); + auto distLockHandleStatus = acquireDistLock(txn, _lockSessionID, nss); if (!distLockHandleStatus.isOK()) { migration.completionNotification->set(distLockHandleStatus.getStatus()); return; |