diff options
Diffstat (limited to 'src/mongo/db/s/balancer/migration_manager.cpp')
-rw-r--r-- | src/mongo/db/s/balancer/migration_manager.cpp | 88 |
1 files changed, 45 insertions, 43 deletions
diff --git a/src/mongo/db/s/balancer/migration_manager.cpp b/src/mongo/db/s/balancer/migration_manager.cpp index 7882201e8c2..7f267b97e67 100644 --- a/src/mongo/db/s/balancer/migration_manager.cpp +++ b/src/mongo/db/s/balancer/migration_manager.cpp @@ -111,7 +111,7 @@ MigrationManager::~MigrationManager() { } MigrationStatuses MigrationManager::executeMigrationsForAutoBalance( - OperationContext* txn, + OperationContext* opCtx, const vector<MigrateInfo>& migrateInfos, uint64_t maxChunkSizeBytes, const MigrationSecondaryThrottleOptions& secondaryThrottle, @@ -127,7 +127,7 @@ MigrationStatuses MigrationManager::executeMigrationsForAutoBalance( // Write a document to the config.migrations collection, in case this migration must be // recovered by the Balancer. Fail if the chunk is already moving. auto statusWithScopedMigrationRequest = - ScopedMigrationRequest::writeMigration(txn, migrateInfo, waitForDelete); + ScopedMigrationRequest::writeMigration(opCtx, migrateInfo, waitForDelete); if (!statusWithScopedMigrationRequest.isOK()) { migrationStatuses.emplace(migrateInfo.getName(), std::move(statusWithScopedMigrationRequest.getStatus())); @@ -137,7 +137,7 @@ MigrationStatuses MigrationManager::executeMigrationsForAutoBalance( std::move(statusWithScopedMigrationRequest.getValue())); responses.emplace_back( - _schedule(txn, migrateInfo, maxChunkSizeBytes, secondaryThrottle, waitForDelete), + _schedule(opCtx, migrateInfo, maxChunkSizeBytes, secondaryThrottle, waitForDelete), migrateInfo); } @@ -162,7 +162,7 @@ MigrationStatuses MigrationManager::executeMigrationsForAutoBalance( } Status MigrationManager::executeManualMigration( - OperationContext* txn, + OperationContext* opCtx, const MigrateInfo& migrateInfo, uint64_t maxChunkSizeBytes, const MigrationSecondaryThrottleOptions& secondaryThrottle, @@ -172,15 +172,15 @@ Status MigrationManager::executeManualMigration( // Write a document to the config.migrations collection, in case this migration must be // recovered by the Balancer. Fail if the chunk is already moving. auto statusWithScopedMigrationRequest = - ScopedMigrationRequest::writeMigration(txn, migrateInfo, waitForDelete); + ScopedMigrationRequest::writeMigration(opCtx, migrateInfo, waitForDelete); if (!statusWithScopedMigrationRequest.isOK()) { return statusWithScopedMigrationRequest.getStatus(); } RemoteCommandResponse remoteCommandResponse = - _schedule(txn, migrateInfo, maxChunkSizeBytes, secondaryThrottle, waitForDelete)->get(); + _schedule(opCtx, migrateInfo, maxChunkSizeBytes, secondaryThrottle, waitForDelete)->get(); - auto scopedCMStatus = ScopedChunkManager::refreshAndGet(txn, NamespaceString(migrateInfo.ns)); + auto scopedCMStatus = ScopedChunkManager::refreshAndGet(opCtx, NamespaceString(migrateInfo.ns)); if (!scopedCMStatus.isOK()) { return scopedCMStatus.getStatus(); } @@ -204,7 +204,7 @@ Status MigrationManager::executeManualMigration( return commandStatus; } -void MigrationManager::startRecoveryAndAcquireDistLocks(OperationContext* txn) { +void MigrationManager::startRecoveryAndAcquireDistLocks(OperationContext* opCtx) { { stdx::lock_guard<stdx::mutex> lock(_mutex); invariant(_state == State::kStopped); @@ -214,15 +214,15 @@ void MigrationManager::startRecoveryAndAcquireDistLocks(OperationContext* txn) { auto scopedGuard = MakeGuard([&] { _migrationRecoveryMap.clear(); - _abandonActiveMigrationsAndEnableManager(txn); + _abandonActiveMigrationsAndEnableManager(opCtx); }); - auto distLockManager = Grid::get(txn)->catalogClient(txn)->getDistLockManager(); + auto distLockManager = Grid::get(opCtx)->catalogClient(opCtx)->getDistLockManager(); // Load the active migrations from the config.migrations collection. auto statusWithMigrationsQueryResponse = - Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( - txn, + Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( + opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, repl::ReadConcernLevel::kLocalReadConcern, NamespaceString(MigrationType::ConfigNS), @@ -260,7 +260,7 @@ void MigrationManager::startRecoveryAndAcquireDistLocks(OperationContext* txn) { << migrateType.getNss().ns()); auto statusWithDistLockHandle = distLockManager->tryLockWithLocalWriteConcern( - txn, migrateType.getNss().ns(), whyMessage, _lockSessionID); + opCtx, migrateType.getNss().ns(), whyMessage, _lockSessionID); if (!statusWithDistLockHandle.isOK()) { log() << "Failed to acquire distributed lock for collection '" << migrateType.getNss().ns() @@ -277,7 +277,7 @@ void MigrationManager::startRecoveryAndAcquireDistLocks(OperationContext* txn) { scopedGuard.Dismiss(); } -void MigrationManager::finishRecovery(OperationContext* txn, +void MigrationManager::finishRecovery(OperationContext* opCtx, uint64_t maxChunkSizeBytes, const MigrationSecondaryThrottleOptions& secondaryThrottle) { { @@ -298,7 +298,7 @@ void MigrationManager::finishRecovery(OperationContext* txn, auto scopedGuard = MakeGuard([&] { _migrationRecoveryMap.clear(); - _abandonActiveMigrationsAndEnableManager(txn); + _abandonActiveMigrationsAndEnableManager(opCtx); }); // Schedule recovered migrations. @@ -310,7 +310,7 @@ void MigrationManager::finishRecovery(OperationContext* txn, auto& migrateInfos = nssAndMigrateInfos.second; invariant(!migrateInfos.empty()); - auto scopedCMStatus = ScopedChunkManager::refreshAndGet(txn, nss); + auto scopedCMStatus = ScopedChunkManager::refreshAndGet(opCtx, nss); if (!scopedCMStatus.isOK()) { // This shouldn't happen because the collection was intact and sharded when the previous // config primary was active and the dist locks have been held by the balancer @@ -338,23 +338,23 @@ void MigrationManager::finishRecovery(OperationContext* txn, if (chunk->getShardId() != migrationInfo.from) { // Chunk is no longer on the source shard specified by this migration. Erase the // migration recovery document associated with it. - ScopedMigrationRequest::createForRecovery(txn, nss, migrationInfo.minKey); + ScopedMigrationRequest::createForRecovery(opCtx, nss, migrationInfo.minKey); continue; } scopedMigrationRequests.emplace_back( - ScopedMigrationRequest::createForRecovery(txn, nss, migrationInfo.minKey)); + ScopedMigrationRequest::createForRecovery(opCtx, nss, migrationInfo.minKey)); scheduledMigrations++; - responses.emplace_back( - _schedule(txn, migrationInfo, maxChunkSizeBytes, secondaryThrottle, waitForDelete)); + responses.emplace_back(_schedule( + opCtx, migrationInfo, maxChunkSizeBytes, secondaryThrottle, waitForDelete)); } // If no migrations were scheduled for this namespace, free the dist lock if (!scheduledMigrations) { - Grid::get(txn)->catalogClient(txn)->getDistLockManager()->unlock( - txn, _lockSessionID, nss.ns()); + Grid::get(opCtx)->catalogClient(opCtx)->getDistLockManager()->unlock( + opCtx, _lockSessionID, nss.ns()); } } @@ -408,7 +408,7 @@ void MigrationManager::drainActiveMigrations() { } shared_ptr<Notification<RemoteCommandResponse>> MigrationManager::_schedule( - OperationContext* txn, + OperationContext* opCtx, const MigrateInfo& migrateInfo, uint64_t maxChunkSizeBytes, const MigrationSecondaryThrottleOptions& secondaryThrottle, @@ -425,15 +425,16 @@ shared_ptr<Notification<RemoteCommandResponse>> MigrationManager::_schedule( } } - const auto fromShardStatus = Grid::get(txn)->shardRegistry()->getShard(txn, migrateInfo.from); + const auto fromShardStatus = + Grid::get(opCtx)->shardRegistry()->getShard(opCtx, migrateInfo.from); if (!fromShardStatus.isOK()) { return std::make_shared<Notification<RemoteCommandResponse>>( std::move(fromShardStatus.getStatus())); } const auto fromShard = fromShardStatus.getValue(); - auto fromHostStatus = - fromShard->getTargeter()->findHost(txn, ReadPreferenceSetting{ReadPreference::PrimaryOnly}); + auto fromHostStatus = fromShard->getTargeter()->findHost( + opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly}); if (!fromHostStatus.isOK()) { return std::make_shared<Notification<RemoteCommandResponse>>( std::move(fromHostStatus.getStatus())); @@ -444,7 +445,7 @@ shared_ptr<Notification<RemoteCommandResponse>> MigrationManager::_schedule( &builder, nss, migrateInfo.version, - repl::ReplicationCoordinator::get(txn)->getConfig().getConnectionString(), + repl::ReplicationCoordinator::get(opCtx)->getConfig().getConnectionString(), migrateInfo.from, migrateInfo.to, ChunkRange(migrateInfo.minKey, migrateInfo.maxKey), @@ -464,15 +465,16 @@ shared_ptr<Notification<RemoteCommandResponse>> MigrationManager::_schedule( auto retVal = migration.completionNotification; - _schedule_inlock(txn, fromHostStatus.getValue(), std::move(migration)); + _schedule_inlock(opCtx, fromHostStatus.getValue(), std::move(migration)); return retVal; } -void MigrationManager::_schedule_inlock(OperationContext* txn, +void MigrationManager::_schedule_inlock(OperationContext* opCtx, const HostAndPort& targetHost, Migration migration) { - executor::TaskExecutor* const executor = Grid::get(txn)->getExecutorPool()->getFixedExecutor(); + executor::TaskExecutor* const executor = + Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); const NamespaceString nss(migration.nss); @@ -482,8 +484,8 @@ void MigrationManager::_schedule_inlock(OperationContext* txn, // Acquire the collection distributed lock (blocking call) auto statusWithDistLockHandle = - Grid::get(txn)->catalogClient(txn)->getDistLockManager()->lockWithSessionID( - txn, + Grid::get(opCtx)->catalogClient(opCtx)->getDistLockManager()->lockWithSessionID( + opCtx, nss.ns(), whyMessage, _lockSessionID, @@ -508,7 +510,7 @@ void MigrationManager::_schedule_inlock(OperationContext* txn, auto itMigration = migrations->begin(); const RemoteCommandRequest remoteRequest( - targetHost, NamespaceString::kAdminDb.toString(), itMigration->moveChunkCmdObj, txn); + targetHost, NamespaceString::kAdminDb.toString(), itMigration->moveChunkCmdObj, opCtx); StatusWith<executor::TaskExecutor::CallbackHandle> callbackHandleWithStatus = executor->scheduleRemoteCommand( @@ -516,10 +518,10 @@ void MigrationManager::_schedule_inlock(OperationContext* txn, [this, itMigration](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) { Client::initThread(getThreadName().c_str()); ON_BLOCK_EXIT([&] { Client::destroy(); }); - auto txn = cc().makeOperationContext(); + auto opCtx = cc().makeOperationContext(); stdx::lock_guard<stdx::mutex> lock(_mutex); - _complete_inlock(txn.get(), itMigration, args.response); + _complete_inlock(opCtx.get(), itMigration, args.response); }); if (callbackHandleWithStatus.isOK()) { @@ -527,10 +529,10 @@ void MigrationManager::_schedule_inlock(OperationContext* txn, return; } - _complete_inlock(txn, itMigration, std::move(callbackHandleWithStatus.getStatus())); + _complete_inlock(opCtx, itMigration, std::move(callbackHandleWithStatus.getStatus())); } -void MigrationManager::_complete_inlock(OperationContext* txn, +void MigrationManager::_complete_inlock(OperationContext* opCtx, MigrationsList::iterator itMigration, const RemoteCommandResponse& remoteCommandResponse) { const NamespaceString nss(itMigration->nss); @@ -547,8 +549,8 @@ void MigrationManager::_complete_inlock(OperationContext* txn, migrations->erase(itMigration); if (migrations->empty()) { - Grid::get(txn)->catalogClient(txn)->getDistLockManager()->unlock( - txn, _lockSessionID, nss.ns()); + Grid::get(opCtx)->catalogClient(opCtx)->getDistLockManager()->unlock( + opCtx, _lockSessionID, nss.ns()); _activeMigrations.erase(it); _checkDrained_inlock(); } @@ -572,7 +574,7 @@ void MigrationManager::_waitForRecovery() { _condVar.wait(lock, [this] { return _state != State::kRecovering; }); } -void MigrationManager::_abandonActiveMigrationsAndEnableManager(OperationContext* txn) { +void MigrationManager::_abandonActiveMigrationsAndEnableManager(OperationContext* opCtx) { stdx::unique_lock<stdx::mutex> lock(_mutex); if (_state == State::kStopping) { // The balancer was interrupted. Let the next balancer recover the state. @@ -580,16 +582,16 @@ void MigrationManager::_abandonActiveMigrationsAndEnableManager(OperationContext } invariant(_state == State::kRecovering); - auto catalogClient = Grid::get(txn)->catalogClient(txn); + auto catalogClient = Grid::get(opCtx)->catalogClient(opCtx); // Unlock all balancer distlocks we aren't using anymore. auto distLockManager = catalogClient->getDistLockManager(); - distLockManager->unlockAll(txn, distLockManager->getProcessID()); + distLockManager->unlockAll(opCtx, distLockManager->getProcessID()); // Clear the config.migrations collection so that those chunks can be scheduled for migration // again. catalogClient->removeConfigDocuments( - txn, MigrationType::ConfigNS, BSONObj(), kMajorityWriteConcern); + opCtx, MigrationType::ConfigNS, BSONObj(), kMajorityWriteConcern); _state = State::kEnabled; _condVar.notify_all(); |