/**
 *    Copyright (C) 2016 MongoDB Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the GNU Affero General Public License, version 3,
 *    as published by the Free Software Foundation.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 *    GNU Affero General Public License for more details.
 *
 *    You should have received a copy of the GNU Affero General Public License
 *    along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the GNU Affero General Public License in all respects
 *    for all of the code used other than as permitted herein. If you modify
 *    file(s) with this exception, you may extend this exception to your
 *    version of the file(s), but you are not obligated to do so. If you do not
 *    wish to do so, delete this exception statement from your version. If you
 *    delete this exception statement from all source files in the program,
 *    then also delete it in the license file.
 */

#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding

#include "mongo/platform/basic.h"

#include "mongo/db/s/balancer/migration_manager.h"

#include <memory>

#include "mongo/bson/simple_bsonobj_comparator.h"
#include "mongo/bson/util/bson_extract.h"
#include "mongo/client/remote_command_targeter.h"
#include "mongo/db/client.h"
#include "mongo/db/repl/repl_set_config.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/s/balancer/scoped_migration_request.h"
#include "mongo/db/s/balancer/type_migration.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/s/move_chunk_request.h"
#include "mongo/s/sharding_raii.h"
#include "mongo/util/log.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/scopeguard.h"

namespace mongo {

using executor::RemoteCommandRequest;
using executor::RemoteCommandResponse;
using std::shared_ptr;
using std::vector;
using str::stream;

namespace {

const char kChunkTooBig[] = "chunkTooBig";  // TODO: delete in 3.8

const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority,
                                                WriteConcernOptions::SyncMode::UNSET,
                                                Seconds(15));

/**
 * Parses the 'commandResponse' and converts it to a status to use as the outcome of the command.
 * Preserves backwards compatibility with 3.4 and earlier shards that, rather than use a
 * ChunkTooBig error code, include an extra field in the response.
 *
 * TODO: Delete in 3.8
 */
Status extractMigrationStatusFromCommandResponse(const BSONObj& commandResponse) {
    Status commandStatus = getStatusFromCommandResult(commandResponse);

    if (!commandStatus.isOK()) {
        bool chunkTooBig = false;
        bsonExtractBooleanFieldWithDefault(commandResponse, kChunkTooBig, false, &chunkTooBig);
        if (chunkTooBig) {
            commandStatus = {ErrorCodes::ChunkTooBig, commandStatus.reason()};
        }
    }

    return commandStatus;
}
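// Illustrative only: a pre-3.6 shard reports an oversized chunk with the extra 'chunkTooBig'
// response field rather than a dedicated error code. A response shaped like
//     { ok: 0, errmsg: "chunk too big to move", chunkTooBig: true }
// (hypothetical values) would therefore be converted by the helper above from a generic command
// error into {ErrorCodes::ChunkTooBig, "chunk too big to move"}.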
/**
 * Returns whether the specified status is an error caused by stepdown of the primary config node
 * currently running the balancer.
 */
bool isErrorDueToConfigStepdown(Status status, bool isStopping) {
    return ((status == ErrorCodes::CallbackCanceled && isStopping) ||
            status == ErrorCodes::BalancerInterrupted ||
            status == ErrorCodes::InterruptedDueToReplStateChange);
}

}  // namespace

MigrationManager::MigrationManager(ServiceContext* serviceContext)
    : _serviceContext(serviceContext) {}

MigrationManager::~MigrationManager() {
    // The migration manager must be completely quiesced at destruction time
    invariant(_activeMigrations.empty());
}

MigrationStatuses MigrationManager::executeMigrationsForAutoBalance(
    OperationContext* opCtx,
    const vector<MigrateInfo>& migrateInfos,
    uint64_t maxChunkSizeBytes,
    const MigrationSecondaryThrottleOptions& secondaryThrottle,
    bool waitForDelete) {
    MigrationStatuses migrationStatuses;

    {
        std::map<std::string, ScopedMigrationRequest> scopedMigrationRequests;
        vector<std::pair<shared_ptr<Notification<RemoteCommandResponse>>, MigrateInfo>> responses;

        for (const auto& migrateInfo : migrateInfos) {
            // Write a document to the config.migrations collection, in case this migration must be
            // recovered by the Balancer. Fail if the chunk is already moving.
            auto statusWithScopedMigrationRequest =
                ScopedMigrationRequest::writeMigration(opCtx, migrateInfo, waitForDelete);
            if (!statusWithScopedMigrationRequest.isOK()) {
                migrationStatuses.emplace(migrateInfo.getName(),
                                          std::move(statusWithScopedMigrationRequest.getStatus()));
                continue;
            }
            scopedMigrationRequests.emplace(
                migrateInfo.getName(), std::move(statusWithScopedMigrationRequest.getValue()));

            responses.emplace_back(
                _schedule(opCtx, migrateInfo, maxChunkSizeBytes, secondaryThrottle, waitForDelete),
                migrateInfo);
        }

        // Wait for all the scheduled migrations to complete.
        for (auto& response : responses) {
            auto notification = std::move(response.first);
            auto migrateInfo = std::move(response.second);

            const auto& remoteCommandResponse = notification->get();

            auto it = scopedMigrationRequests.find(migrateInfo.getName());
            invariant(it != scopedMigrationRequests.end());
            Status commandStatus =
                _processRemoteCommandResponse(remoteCommandResponse, &it->second);
            migrationStatuses.emplace(migrateInfo.getName(), std::move(commandStatus));
        }
    }

    invariant(migrationStatuses.size() == migrateInfos.size());

    return migrationStatuses;
}

Status MigrationManager::executeManualMigration(
    OperationContext* opCtx,
    const MigrateInfo& migrateInfo,
    uint64_t maxChunkSizeBytes,
    const MigrationSecondaryThrottleOptions& secondaryThrottle,
    bool waitForDelete) {
    _waitForRecovery();

    // Write a document to the config.migrations collection, in case this migration must be
    // recovered by the Balancer. Fail if the chunk is already moving.
    auto statusWithScopedMigrationRequest =
        ScopedMigrationRequest::writeMigration(opCtx, migrateInfo, waitForDelete);
    if (!statusWithScopedMigrationRequest.isOK()) {
        return statusWithScopedMigrationRequest.getStatus();
    }

    RemoteCommandResponse remoteCommandResponse =
        _schedule(opCtx, migrateInfo, maxChunkSizeBytes, secondaryThrottle, waitForDelete)->get();

    auto scopedCMStatus = ScopedChunkManager::refreshAndGet(opCtx, NamespaceString(migrateInfo.ns));
    if (!scopedCMStatus.isOK()) {
        return scopedCMStatus.getStatus();
    }

    const auto& scopedCM = scopedCMStatus.getValue();

    auto chunk = scopedCM.cm()->findIntersectingChunkWithSimpleCollation(migrateInfo.minKey);
    invariant(chunk);

    Status commandStatus = _processRemoteCommandResponse(
        remoteCommandResponse, &statusWithScopedMigrationRequest.getValue());

    // Migration calls can be interrupted after the metadata is committed but before the command
    // finishes the waitForDelete stage. Any failovers, therefore, must always cause the moveChunk
    // command to be retried so as to assure that the waitForDelete promise of a successful command
    // has been fulfilled.
    if (chunk->getShardId() == migrateInfo.to && commandStatus != ErrorCodes::BalancerInterrupted) {
        return Status::OK();
    }

    return commandStatus;
}
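// Illustrative call sequence (hypothetical caller code, not part of this file): a command layer
// driving a user-initiated moveChunk would invoke the method above roughly as
//     uassertStatusOK(migrationManager->executeManualMigration(
//         opCtx, migrateInfo, maxChunkSizeBytes, secondaryThrottle, waitForDelete));
// Note that the final check above deliberately reports success when the chunk has already landed
// on the destination shard, even if the command itself failed partway through a failover.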
void MigrationManager::startRecoveryAndAcquireDistLocks(OperationContext* opCtx) {
    {
        stdx::lock_guard<stdx::mutex> lock(_mutex);
        invariant(_state == State::kStopped);
        invariant(_migrationRecoveryMap.empty());
        _state = State::kRecovering;
    }

    auto scopedGuard = MakeGuard([&] {
        _migrationRecoveryMap.clear();
        _abandonActiveMigrationsAndEnableManager(opCtx);
    });

    auto distLockManager = Grid::get(opCtx)->catalogClient(opCtx)->getDistLockManager();

    // Load the active migrations from the config.migrations collection.
    auto statusWithMigrationsQueryResponse =
        Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
            opCtx,
            ReadPreferenceSetting{ReadPreference::PrimaryOnly},
            repl::ReadConcernLevel::kLocalReadConcern,
            NamespaceString(MigrationType::ConfigNS),
            BSONObj(),
            BSONObj(),
            boost::none);
    if (!statusWithMigrationsQueryResponse.isOK()) {
        log() << "Unable to read config.migrations collection documents for balancer migration"
              << " recovery. Abandoning balancer recovery."
              << causedBy(redact(statusWithMigrationsQueryResponse.getStatus()));
        return;
    }

    for (const BSONObj& migration : statusWithMigrationsQueryResponse.getValue().docs) {
        auto statusWithMigrationType = MigrationType::fromBSON(migration);
        if (!statusWithMigrationType.isOK()) {
            // The format of this migration document is incorrect. The balancer holds a distlock
            // for this migration, but without parsing the migration document we cannot identify
            // which distlock must be released. So we must release all distlocks.
            log() << "Unable to parse config.migrations document '" << redact(migration.toString())
                  << "' for balancer migration recovery. Abandoning balancer recovery."
                  << causedBy(redact(statusWithMigrationType.getStatus()));
            return;
        }
        MigrationType migrateType = std::move(statusWithMigrationType.getValue());

        auto it = _migrationRecoveryMap.find(NamespaceString(migrateType.getNss()));
        if (it == _migrationRecoveryMap.end()) {
            std::list<MigrationType> list;
            it = _migrationRecoveryMap.insert(std::make_pair(migrateType.getNss(), list)).first;

            // Reacquire the matching distributed lock for this namespace.
            const std::string whyMessage(stream() << "Migrating chunk(s) in collection "
                                                  << migrateType.getNss().ns());
            auto statusWithDistLockHandle = distLockManager->tryLockWithLocalWriteConcern(
                opCtx, migrateType.getNss().ns(), whyMessage, _lockSessionID);
            if (!statusWithDistLockHandle.isOK()) {
                log() << "Failed to acquire distributed lock for collection '"
                      << migrateType.getNss().ns()
                      << "' during balancer recovery of an active migration. Abandoning"
                      << " balancer recovery."
                      << causedBy(redact(statusWithDistLockHandle.getStatus()));
                return;
            }
        }

        it->second.push_back(std::move(migrateType));
    }

    scopedGuard.Dismiss();
}
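// The recovery handshake above and finishRecovery() below form a small state machine. A sketch of
// the intended transitions, as implied by the invariants in this file (not an authoritative spec):
//
//     kStopped    --startRecoveryAndAcquireDistLocks()-->            kRecovering
//     kRecovering --finishRecovery() succeeds or recovery abandoned--> kEnabled
//     kEnabled or kRecovering --interruptAndDisableMigrations()-->    kStopping
//     kStopping   --drainActiveMigrations()-->                        kStopped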
void MigrationManager::finishRecovery(OperationContext* opCtx,
                                      uint64_t maxChunkSizeBytes,
                                      const MigrationSecondaryThrottleOptions& secondaryThrottle) {
    {
        stdx::lock_guard<stdx::mutex> lock(_mutex);
        if (_state == State::kStopping) {
            _migrationRecoveryMap.clear();
            return;
        }

        // If recovery was abandoned in startRecovery, then there is no more to do.
        if (_state == State::kEnabled) {
            invariant(_migrationRecoveryMap.empty());
            return;
        }

        invariant(_state == State::kRecovering);
    }

    auto scopedGuard = MakeGuard([&] {
        _migrationRecoveryMap.clear();
        _abandonActiveMigrationsAndEnableManager(opCtx);
    });

    // Schedule recovered migrations.
    vector<ScopedMigrationRequest> scopedMigrationRequests;
    vector<shared_ptr<Notification<RemoteCommandResponse>>> responses;

    for (auto& nssAndMigrateInfos : _migrationRecoveryMap) {
        auto& nss = nssAndMigrateInfos.first;
        auto& migrateInfos = nssAndMigrateInfos.second;
        invariant(!migrateInfos.empty());

        auto scopedCMStatus = ScopedChunkManager::refreshAndGet(opCtx, nss);
        if (!scopedCMStatus.isOK()) {
            // This shouldn't happen because the collection was intact and sharded when the
            // previous config primary was active and the dist locks have been held by the
            // balancer throughout. Abort migration recovery.
            log() << "Unable to reload chunk metadata for collection '" << nss
                  << "' during balancer recovery. Abandoning recovery."
                  << causedBy(redact(scopedCMStatus.getStatus()));
            return;
        }

        const auto& scopedCM = scopedCMStatus.getValue();

        int scheduledMigrations = 0;

        while (!migrateInfos.empty()) {
            auto migrationType = std::move(migrateInfos.front());
            const auto migrationInfo = migrationType.toMigrateInfo();
            auto waitForDelete = migrationType.getWaitForDelete();
            migrateInfos.pop_front();

            auto chunk =
                scopedCM.cm()->findIntersectingChunkWithSimpleCollation(migrationInfo.minKey);
            invariant(chunk);

            if (chunk->getShardId() != migrationInfo.from) {
                // Chunk is no longer on the source shard specified by this migration. Erase the
                // migration recovery document associated with it.
                ScopedMigrationRequest::createForRecovery(opCtx, nss, migrationInfo.minKey);
                continue;
            }

            scopedMigrationRequests.emplace_back(
                ScopedMigrationRequest::createForRecovery(opCtx, nss, migrationInfo.minKey));

            scheduledMigrations++;

            responses.emplace_back(_schedule(
                opCtx, migrationInfo, maxChunkSizeBytes, secondaryThrottle, waitForDelete));
        }

        // If no migrations were scheduled for this namespace, free the dist lock
        if (!scheduledMigrations) {
            Grid::get(opCtx)->catalogClient(opCtx)->getDistLockManager()->unlock(
                opCtx, _lockSessionID, nss.ns());
        }
    }

    _migrationRecoveryMap.clear();
    scopedGuard.Dismiss();

    {
        stdx::lock_guard<stdx::mutex> lock(_mutex);
        if (_state == State::kRecovering) {
            _state = State::kEnabled;
            _condVar.notify_all();
        }
    }

    // Wait for each migration to finish, as usual.
    for (auto& response : responses) {
        response->get();
    }
}
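// Illustrative only: across a config server failover, the Balancer would be expected to drive
// this manager through roughly the following sequence (hypothetical driver code, not part of
// this file):
//
//     migrationManager->interruptAndDisableMigrations();          // outgoing primary, stepdown
//     migrationManager->drainActiveMigrations();                  // wait out in-flight moveChunks
//     migrationManager->startRecoveryAndAcquireDistLocks(opCtx);  // incoming primary
//     migrationManager->finishRecovery(opCtx, maxChunkSizeBytes, secondaryThrottle);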
void MigrationManager::interruptAndDisableMigrations() {
    executor::TaskExecutor* const executor =
        Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor();

    stdx::lock_guard<stdx::mutex> lock(_mutex);
    invariant(_state == State::kEnabled || _state == State::kRecovering);
    _state = State::kStopping;

    // Interrupt any active migrations with dist lock
    for (auto& cmsEntry : _activeMigrations) {
        auto& migrations = cmsEntry.second;

        for (auto& migration : migrations) {
            if (migration.callbackHandle) {
                executor->cancel(*migration.callbackHandle);
            }
        }
    }

    _checkDrained_inlock();
}

void MigrationManager::drainActiveMigrations() {
    stdx::unique_lock<stdx::mutex> lock(_mutex);

    if (_state == State::kStopped)
        return;
    invariant(_state == State::kStopping);

    _condVar.wait(lock, [this] { return _activeMigrations.empty(); });
    _state = State::kStopped;
}

shared_ptr<Notification<RemoteCommandResponse>> MigrationManager::_schedule(
    OperationContext* opCtx,
    const MigrateInfo& migrateInfo,
    uint64_t maxChunkSizeBytes,
    const MigrationSecondaryThrottleOptions& secondaryThrottle,
    bool waitForDelete) {
    const NamespaceString nss(migrateInfo.ns);

    // Ensure we are not stopped in order to avoid doing the extra work
    {
        stdx::lock_guard<stdx::mutex> lock(_mutex);
        if (_state != State::kEnabled && _state != State::kRecovering) {
            return std::make_shared<Notification<RemoteCommandResponse>>(
                Status(ErrorCodes::BalancerInterrupted,
                       "Migration cannot be executed because the balancer is not running"));
        }
    }

    const auto fromShardStatus =
        Grid::get(opCtx)->shardRegistry()->getShard(opCtx, migrateInfo.from);
    if (!fromShardStatus.isOK()) {
        return std::make_shared<Notification<RemoteCommandResponse>>(
            std::move(fromShardStatus.getStatus()));
    }

    const auto fromShard = fromShardStatus.getValue();
    auto fromHostStatus = fromShard->getTargeter()->findHost(
        opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly});
    if (!fromHostStatus.isOK()) {
        return std::make_shared<Notification<RemoteCommandResponse>>(
            std::move(fromHostStatus.getStatus()));
    }

    BSONObjBuilder builder;
    MoveChunkRequest::appendAsCommand(
        &builder,
        nss,
        migrateInfo.version,
        repl::ReplicationCoordinator::get(opCtx)->getConfig().getConnectionString(),
        migrateInfo.from,
        migrateInfo.to,
        ChunkRange(migrateInfo.minKey, migrateInfo.maxKey),
        maxChunkSizeBytes,
        secondaryThrottle,
        waitForDelete);

    stdx::lock_guard<stdx::mutex> lock(_mutex);

    if (_state != State::kEnabled && _state != State::kRecovering) {
        return std::make_shared<Notification<RemoteCommandResponse>>(
            Status(ErrorCodes::BalancerInterrupted,
                   "Migration cannot be executed because the balancer is not running"));
    }

    Migration migration(nss, builder.obj());

    auto retVal = migration.completionNotification;

    _schedule_inlock(opCtx, fromHostStatus.getValue(), std::move(migration));

    return retVal;
}
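// Illustrative only: MoveChunkRequest::appendAsCommand above serializes the scheduled migration
// into the moveChunk command sent to the source shard. The exact field set is owned by
// MoveChunkRequest, but the resulting document looks roughly like this (hypothetical values):
//
//     {
//         moveChunk: "test.coll",
//         fromShard: "shard0000",
//         toShard: "shard0001",
//         min: {x: 0},
//         max: {x: 100},
//         maxChunkSizeBytes: 67108864,
//         secondaryThrottle: false,
//         waitForDelete: false,
//         ...
//     }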
void MigrationManager::_schedule_inlock(OperationContext* opCtx,
                                        const HostAndPort& targetHost,
                                        Migration migration) {
    executor::TaskExecutor* const executor =
        Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();

    const NamespaceString nss(migration.nss);

    auto it = _activeMigrations.find(nss);
    if (it == _activeMigrations.end()) {
        const std::string whyMessage(stream() << "Migrating chunk(s) in collection " << nss.ns());

        // Acquire the collection distributed lock (blocking call)
        auto statusWithDistLockHandle =
            Grid::get(opCtx)->catalogClient(opCtx)->getDistLockManager()->lockWithSessionID(
                opCtx,
                nss.ns(),
                whyMessage,
                _lockSessionID,
                DistLockManager::kSingleLockAttemptTimeout);

        if (!statusWithDistLockHandle.isOK()) {
            migration.completionNotification->set(
                Status(statusWithDistLockHandle.getStatus().code(),
                       stream() << "Could not acquire collection lock for " << nss.ns()
                                << " to migrate chunks, due to "
                                << statusWithDistLockHandle.getStatus().reason()));
            return;
        }

        it = _activeMigrations.insert(std::make_pair(nss, MigrationsList())).first;
    }

    auto migrations = &it->second;

    // Add ourselves to the list of migrations on this collection
    migrations->push_front(std::move(migration));
    auto itMigration = migrations->begin();

    const RemoteCommandRequest remoteRequest(
        targetHost, NamespaceString::kAdminDb.toString(), itMigration->moveChunkCmdObj, opCtx);

    StatusWith<executor::TaskExecutor::CallbackHandle> callbackHandleWithStatus =
        executor->scheduleRemoteCommand(
            remoteRequest,
            [this, itMigration](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {
                Client::initThread(getThreadName().c_str());
                ON_BLOCK_EXIT([&] { Client::destroy(); });
                auto opCtx = cc().makeOperationContext();

                stdx::lock_guard<stdx::mutex> lock(_mutex);
                _complete_inlock(opCtx.get(), itMigration, args.response);
            });

    if (callbackHandleWithStatus.isOK()) {
        itMigration->callbackHandle = std::move(callbackHandleWithStatus.getValue());
        return;
    }

    _complete_inlock(opCtx, itMigration, std::move(callbackHandleWithStatus.getStatus()));
}

void MigrationManager::_complete_inlock(OperationContext* opCtx,
                                        MigrationsList::iterator itMigration,
                                        const RemoteCommandResponse& remoteCommandResponse) {
    const NamespaceString nss(itMigration->nss);

    // Make sure to signal the notification last, after the distributed lock is freed, so that we
    // don't have the race condition where a subsequently scheduled migration finds the dist lock
    // still acquired.
    auto notificationToSignal = itMigration->completionNotification;

    auto it = _activeMigrations.find(nss);
    invariant(it != _activeMigrations.end());

    auto migrations = &it->second;
    migrations->erase(itMigration);

    if (migrations->empty()) {
        Grid::get(opCtx)->catalogClient(opCtx)->getDistLockManager()->unlock(
            opCtx, _lockSessionID, nss.ns());
        _activeMigrations.erase(it);
        _checkDrained_inlock();
    }

    notificationToSignal->set(remoteCommandResponse);
}

void MigrationManager::_checkDrained_inlock() {
    if (_state == State::kEnabled || _state == State::kRecovering) {
        return;
    }
    invariant(_state == State::kStopping);

    if (_activeMigrations.empty()) {
        _condVar.notify_all();
    }
}

void MigrationManager::_waitForRecovery() {
    stdx::unique_lock<stdx::mutex> lock(_mutex);
    _condVar.wait(lock, [this] { return _state != State::kRecovering; });
}

void MigrationManager::_abandonActiveMigrationsAndEnableManager(OperationContext* opCtx) {
    stdx::unique_lock<stdx::mutex> lock(_mutex);
    if (_state == State::kStopping) {
        // The balancer was interrupted. Let the next balancer recover the state.
        return;
    }
    invariant(_state == State::kRecovering);

    auto catalogClient = Grid::get(opCtx)->catalogClient(opCtx);

    // Unlock all balancer distlocks we aren't using anymore.
    auto distLockManager = catalogClient->getDistLockManager();
    distLockManager->unlockAll(opCtx, distLockManager->getProcessID());

    // Clear the config.migrations collection so that those chunks can be scheduled for migration
    // again.
    catalogClient->removeConfigDocuments(
        opCtx, MigrationType::ConfigNS, BSONObj(), kMajorityWriteConcern);

    _state = State::kEnabled;
    _condVar.notify_all();
}
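// The response-processing helper below collapses a remote moveChunk outcome into one of three
// results (a summary of the code that follows, not additional behavior):
//   1. BalancerInterrupted, keeping the config.migrations document in place, when a
//      stepdown-related error is detected while the manager is no longer enabled or recovering;
//   2. OperationFailed wrapping any error that Shard::shouldErrorBePropagated rejects;
//   3. otherwise the command's own status, with BalancerInterrupted substituted if the
//      config.migrations document cannot be removed afterwards.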
Status MigrationManager::_processRemoteCommandResponse(
    const RemoteCommandResponse& remoteCommandResponse,
    ScopedMigrationRequest* scopedMigrationRequest) {
    stdx::lock_guard<stdx::mutex> lock(_mutex);
    Status commandStatus(ErrorCodes::InternalError, "Uninitialized value.");

    // Check for local errors sending the remote command caused by stepdown.
    if (isErrorDueToConfigStepdown(remoteCommandResponse.status,
                                   _state != State::kEnabled && _state != State::kRecovering)) {
        scopedMigrationRequest->keepDocumentOnDestruct();
        return {ErrorCodes::BalancerInterrupted,
                stream() << "Migration interrupted because the balancer is stopping."
                         << " Command status: " << remoteCommandResponse.status.toString()};
    }

    if (!remoteCommandResponse.isOK()) {
        commandStatus = remoteCommandResponse.status;
    } else {
        // TODO: delete in 3.8
        commandStatus = extractMigrationStatusFromCommandResponse(remoteCommandResponse.data);
    }

    if (!Shard::shouldErrorBePropagated(commandStatus.code())) {
        commandStatus = {ErrorCodes::OperationFailed,
                         stream() << "moveChunk command failed on source shard."
                                  << causedBy(commandStatus)};
    }

    // Any failure to remove the migration document should be because the config server is
    // stepping/shutting down. In this case we must fail the moveChunk command with a retryable
    // error so that the caller does not move on to other distlock requiring operations that could
    // fail when the balancer recovers and takes distlocks for migration recovery.
    Status status = scopedMigrationRequest->tryToRemoveMigration();
    if (!status.isOK()) {
        commandStatus = {
            ErrorCodes::BalancerInterrupted,
            stream() << "Migration interrupted because the balancer is stopping"
                     << " and failed to remove the config.migrations document."
                     << " Command status: "
                     << (commandStatus.isOK() ? status.toString() : commandStatus.toString())};
    }

    return commandStatus;
}

MigrationManager::Migration::Migration(NamespaceString inNss, BSONObj inMoveChunkCmdObj)
    : nss(std::move(inNss)),
      moveChunkCmdObj(std::move(inMoveChunkCmdObj)),
      completionNotification(std::make_shared<Notification<RemoteCommandResponse>>()) {}

MigrationManager::Migration::~Migration() {
    invariant(completionNotification);
}

}  // namespace mongo