| author | Dianna Hohensee <dianna.hohensee@10gen.com> | 2016-10-05 14:48:54 -0400 |
|---|---|---|
| committer | Dianna Hohensee <dianna.hohensee@10gen.com> | 2016-10-17 09:49:29 -0400 |
| commit | af2e05b0424b99ee162fe2ef0f1ecf6c687b06f9 (patch) | |
| tree | b7e53b434396e56cabd75a90fdf70d36cda6ef2e /src | |
| parent | cfb4014c9265992c8760e37938198d7dd8a38b7e (diff) | |
| download | mongo-af2e05b0424b99ee162fe2ef0f1ecf6c687b06f9.tar.gz | |
SERVER-26307 Differentiate between config and shard command errors in MigrationManager moveChunk commands
Diffstat (limited to 'src')

| -rw-r--r-- | src/mongo/db/s/balancer/migration_manager.cpp | 177 |
| -rw-r--r-- | src/mongo/db/s/balancer/migration_manager.h | 23 |

2 files changed, 110 insertions, 90 deletions
diff --git a/src/mongo/db/s/balancer/migration_manager.cpp b/src/mongo/db/s/balancer/migration_manager.cpp
index e1c033669d7..726137f38ff 100644
--- a/src/mongo/db/s/balancer/migration_manager.cpp
+++ b/src/mongo/db/s/balancer/migration_manager.cpp
@@ -68,30 +68,16 @@ const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority,
                                                 Seconds(15));
 
 /**
- * Parses the specified asynchronous command response and converts it to status to use as outcome of
- * an asynchronous migration command. It is necessary for two reasons:
- *  - Preserve backwards compatibility with 3.2 and earlier, where the move chunk command instead of
- *    returning a ChunkTooBig status includes an extra field in the response.
- *  - Convert CallbackCanceled errors into BalancerInterrupted for the cases where the migration
- *    manager is being stopped at replica set stepdown. This return code allows the mongos calling
- *    logic to retry the operation on a new primary.
+ * Parses the 'commandResponse' and converts it to a status to use as the outcome of the command.
+ * Preserves backwards compatibility with 3.2 and earlier shards that, rather than use a ChunkTooBig
+ * error code, include an extra field in the response.
  */
-Status extractMigrationStatusFromRemoteCommandResponse(const RemoteCommandResponse& response,
-                                                       bool isStopping) {
-    if (!response.isOK()) {
-        if (response.status == ErrorCodes::CallbackCanceled && isStopping) {
-            return {ErrorCodes::BalancerInterrupted,
-                    "Migration interrupted because the balancer is stopping"};
-        }
-
-        return response.status;
-    }
-
-    Status commandStatus = getStatusFromCommandResult(response.data);
+Status extractMigrationStatusFromCommandResponse(const BSONObj& commandResponse) {
+    Status commandStatus = getStatusFromCommandResult(commandResponse);
 
     if (!commandStatus.isOK()) {
         bool chunkTooBig = false;
-        bsonExtractBooleanFieldWithDefault(response.data, kChunkTooBig, false, &chunkTooBig);
+        bsonExtractBooleanFieldWithDefault(commandResponse, kChunkTooBig, false, &chunkTooBig);
         if (chunkTooBig) {
             commandStatus = {ErrorCodes::ChunkTooBig, commandStatus.reason()};
         }
@@ -136,11 +122,10 @@ StatusWith<DistLockHandle> acquireDistLock(OperationContext* txn,
  * Returns whether the specified status is an error caused by stepdown of the primary config node
  * currently running the balancer.
  */
-bool isErrorDueToBalancerStepdown(Status status) {
-    return (status == ErrorCodes::BalancerInterrupted ||
-            status == ErrorCodes::InterruptedAtShutdown ||
-            status == ErrorCodes::InterruptedDueToReplStateChange ||
-            ErrorCodes::isShutdownError(status.code()));
+bool isErrorDueToConfigStepdown(Status status, bool isStopping) {
+    return ((status == ErrorCodes::CallbackCanceled && isStopping) ||
+            status == ErrorCodes::BalancerInterrupted ||
+            status == ErrorCodes::InterruptedDueToReplStateChange);
 }
 
 }  // namespace
@@ -166,7 +151,7 @@ MigrationStatuses MigrationManager::executeMigrationsForAutoBalance(
     {
         std::map<MigrationIdentifier, ScopedMigrationRequest> scopedMigrationRequests;
 
-        vector<std::pair<shared_ptr<Notification<Status>>, MigrateInfo>> responses;
+        vector<std::pair<shared_ptr<Notification<RemoteCommandResponse>>, MigrateInfo>> responses;
 
         for (const auto& migrateInfo : migrateInfos) {
             // Write a document to the config.migrations collection, in case this migration must be
@@ -197,18 +182,16 @@ MigrationStatuses MigrationManager::executeMigrationsForAutoBalance(
             auto notification = std::move(response.first);
             auto migrateInfo = std::move(response.second);
 
-            Status responseStatus = notification->get();
+            const auto& remoteCommandResponse = notification->get();
 
-            if (responseStatus == ErrorCodes::LockBusy) {
+            auto it = scopedMigrationRequests.find(migrateInfo.getName());
+            invariant(it != scopedMigrationRequests.end());
+            Status commandStatus =
+                _processRemoteCommandResponse(remoteCommandResponse, &it->second);
+            if (commandStatus == ErrorCodes::LockBusy) {
                 rescheduledMigrations.emplace_back(std::move(migrateInfo));
             } else {
-                if (isErrorDueToBalancerStepdown(responseStatus)) {
-                    auto it = scopedMigrationRequests.find(migrateInfo.getName());
-                    invariant(it != scopedMigrationRequests.end());
-                    it->second.keepDocumentOnDestruct();
-                }
-
-                migrationStatuses.emplace(migrateInfo.getName(), std::move(responseStatus));
+                migrationStatuses.emplace(migrateInfo.getName(), std::move(commandStatus));
             }
         }
     }
@@ -225,19 +208,19 @@ MigrationStatuses MigrationManager::executeMigrationsForAutoBalance(
             continue;
         }
 
-        Status responseStatus = _schedule(txn,
-                                          migrateInfo,
-                                          true,  // Shard takes the collection dist lock
-                                          maxChunkSizeBytes,
-                                          secondaryThrottle,
-                                          waitForDelete)
-                                    ->get();
+        RemoteCommandResponse remoteCommandResponse =
+            _schedule(txn,
+                      migrateInfo,
+                      true,  // Shard takes the collection dist lock
+                      maxChunkSizeBytes,
+                      secondaryThrottle,
+                      waitForDelete)
+                ->get();
 
-        if (isErrorDueToBalancerStepdown(responseStatus)) {
-            statusWithScopedMigrationRequest.getValue().keepDocumentOnDestruct();
-        }
+        Status commandStatus = _processRemoteCommandResponse(
+            remoteCommandResponse, &statusWithScopedMigrationRequest.getValue());
 
-        migrationStatuses.emplace(migrateInfo.getName(), std::move(responseStatus));
+        migrationStatuses.emplace(migrateInfo.getName(), std::move(commandStatus));
     }
 
     invariant(migrationStatuses.size() == migrateInfos.size());
@@ -261,13 +244,14 @@ Status MigrationManager::executeManualMigration(
         return statusWithScopedMigrationRequest.getStatus();
     }
 
-    Status status = _schedule(txn,
-                              migrateInfo,
-                              false,  // Config server takes the collection dist lock
-                              maxChunkSizeBytes,
-                              secondaryThrottle,
-                              waitForDelete)
-                        ->get();
+    RemoteCommandResponse remoteCommandResponse =
+        _schedule(txn,
+                  migrateInfo,
+                  false,  // Config server takes the collection dist lock
+                  maxChunkSizeBytes,
+                  secondaryThrottle,
+                  waitForDelete)
+            ->get();
 
     auto scopedCMStatus = ScopedChunkManager::getExisting(txn, NamespaceString(migrateInfo.ns));
     if (!scopedCMStatus.isOK()) {
@@ -280,23 +264,18 @@ Status MigrationManager::executeManualMigration(
     auto chunk = cm->findIntersectingChunkWithSimpleCollation(txn, migrateInfo.minKey);
     invariant(chunk);
 
-    // The order of the checks below is important due to the need for interrupted migration calls to
-    // be able to join any possibly completed migrations, which are still running in the
-    // waitForDelete step.
-    if (isErrorDueToBalancerStepdown(status)) {
-        statusWithScopedMigrationRequest.getValue().keepDocumentOnDestruct();
-
-        // We want the mongos to get a retriable error, and not make its replica set monitor
-        // interpret something like InterruptedDueToReplStateChange as the config server when the
-        // error comes from the shard.
-        return {ErrorCodes::BalancerInterrupted, status.reason()};
-    } else if (chunk->getShardId() == migrateInfo.to) {
-        // Regardless of the status, if the chunk's current shard matches the destination, deem the
-        // move as success.
+    Status commandStatus = _processRemoteCommandResponse(
+        remoteCommandResponse, &statusWithScopedMigrationRequest.getValue());
+
+    // Migration calls can be interrupted after the metadata is committed but before the command
+    // finishes the waitForDelete stage. Any failovers, therefore, must always cause the moveChunk
+    // command to be retried so as to assure that the waitForDelete promise of a successful command
+    // has been fulfilled.
+    if (chunk->getShardId() == migrateInfo.to && commandStatus != ErrorCodes::BalancerInterrupted) {
         return Status::OK();
     }
 
-    return status;
+    return commandStatus;
 }
 
 void MigrationManager::startRecoveryAndAcquireDistLocks(OperationContext* txn) {
@@ -404,7 +383,7 @@ void MigrationManager::finishRecovery(OperationContext* txn,
 
     // Schedule recovered migrations.
     vector<ScopedMigrationRequest> scopedMigrationRequests;
-    vector<shared_ptr<Notification<Status>>> responses;
+    vector<shared_ptr<Notification<RemoteCommandResponse>>> responses;
 
     for (auto& nssAndMigrateInfos : _migrationRecoveryMap) {
         auto& nss = nssAndMigrateInfos.first;
@@ -521,7 +500,7 @@ void MigrationManager::drainActiveMigrations() {
     _state = State::kStopped;
 }
 
-shared_ptr<Notification<Status>> MigrationManager::_schedule(
+shared_ptr<Notification<RemoteCommandResponse>> MigrationManager::_schedule(
     OperationContext* txn,
     const MigrateInfo& migrateInfo,
     bool shardTakesCollectionDistLock,
@@ -534,7 +513,7 @@ shared_ptr<Notification<Status>> MigrationManager::_schedule(
     {
         stdx::lock_guard<stdx::mutex> lock(_mutex);
         if (_state != State::kEnabled && _state != State::kRecovering) {
-            return std::make_shared<Notification<Status>>(
+            return std::make_shared<Notification<RemoteCommandResponse>>(
                 Status(ErrorCodes::BalancerInterrupted,
                        "Migration cannot be executed because the balancer is not running"));
         }
@@ -545,7 +524,7 @@ shared_ptr<Notification<Status>> MigrationManager::_schedule(
    // shard as well, but doing them here saves an extra network call, which might otherwise fail.
    auto statusWithScopedChunkManager = ScopedChunkManager::getExisting(txn, nss);
    if (!statusWithScopedChunkManager.isOK()) {
-        return std::make_shared<Notification<Status>>(
+        return std::make_shared<Notification<RemoteCommandResponse>>(
            std::move(statusWithScopedChunkManager.getStatus()));
    }
 
@@ -557,7 +536,7 @@ shared_ptr<Notification<Status>> MigrationManager::_schedule(
    // If the chunk is not found exactly as requested, the caller must have stale data
    if (SimpleBSONObjComparator::kInstance.evaluate(chunk->getMin() != migrateInfo.minKey) ||
        SimpleBSONObjComparator::kInstance.evaluate(chunk->getMax() != migrateInfo.maxKey)) {
-        return std::make_shared<Notification<Status>>(Status(
+        return std::make_shared<Notification<RemoteCommandResponse>>(Status(
            ErrorCodes::IncompatibleShardingMetadata,
            stream() << "Chunk " << ChunkRange(migrateInfo.minKey, migrateInfo.maxKey).toString()
                     << " does not exist."));
@@ -565,14 +544,16 @@ shared_ptr<Notification<Status>> MigrationManager::_schedule(
 
    const auto fromShardStatus = Grid::get(txn)->shardRegistry()->getShard(txn, migrateInfo.from);
    if (!fromShardStatus.isOK()) {
-        return std::make_shared<Notification<Status>>(std::move(fromShardStatus.getStatus()));
+        return std::make_shared<Notification<RemoteCommandResponse>>(
+            std::move(fromShardStatus.getStatus()));
    }
 
    const auto fromShard = fromShardStatus.getValue();
    auto fromHostStatus =
        fromShard->getTargeter()->findHost(txn, ReadPreferenceSetting{ReadPreference::PrimaryOnly});
    if (!fromHostStatus.isOK()) {
-        return std::make_shared<Notification<Status>>(std::move(fromHostStatus.getStatus()));
+        return std::make_shared<Notification<RemoteCommandResponse>>(
+            std::move(fromHostStatus.getStatus()));
    }
 
    BSONObjBuilder builder;
@@ -593,7 +574,7 @@ shared_ptr<Notification<Status>> MigrationManager::_schedule(
 
        stdx::lock_guard<stdx::mutex> lock(_mutex);
        if (_state != State::kEnabled && _state != State::kRecovering) {
-            return std::make_shared<Notification<Status>>(
+            return std::make_shared<Notification<RemoteCommandResponse>>(
                Status(ErrorCodes::BalancerInterrupted,
                       "Migration cannot be executed because the balancer is not running"));
        }
@@ -652,11 +633,7 @@ void MigrationManager::_scheduleWithDistLock_inlock(OperationContext* txn,
            auto txn = cc().makeOperationContext();
            stdx::lock_guard<stdx::mutex> lock(_mutex);
 
-            _completeWithDistLock_inlock(
-                txn.get(),
-                itMigration,
-                extractMigrationStatusFromRemoteCommandResponse(
-                    args.response, _state != State::kEnabled && _state != State::kRecovering));
+            _completeWithDistLock_inlock(txn.get(), itMigration, args.response);
        });
 
    if (callbackHandleWithStatus.isOK()) {
@@ -667,9 +644,10 @@ void MigrationManager::_scheduleWithDistLock_inlock(OperationContext* txn,
    _completeWithDistLock_inlock(txn, itMigration, std::move(callbackHandleWithStatus.getStatus()));
 }
 
-void MigrationManager::_completeWithDistLock_inlock(OperationContext* txn,
-                                                    MigrationsList::iterator itMigration,
-                                                    Status status) {
+void MigrationManager::_completeWithDistLock_inlock(
+    OperationContext* txn,
+    MigrationsList::iterator itMigration,
+    const RemoteCommandResponse& remoteCommandResponse) {
    const NamespaceString nss(itMigration->nss);
 
    // Make sure to signal the notification last, after the distributed lock is freed, so that we
@@ -690,7 +668,7 @@ void MigrationManager::_completeWithDistLock_inlock(OperationContext* txn,
        _checkDrained_inlock();
    }
 
-    notificationToSignal->set(status);
+    notificationToSignal->set(remoteCommandResponse);
 }
 
 void MigrationManager::_scheduleWithoutDistLock_inlock(OperationContext* txn,
@@ -715,8 +693,7 @@ void MigrationManager::_scheduleWithoutDistLock_inlock(OperationContext* txn,
            _activeMigrationsWithoutDistLock.erase(itMigration);
            _checkDrained_inlock();
 
-            notificationToSignal->set(extractMigrationStatusFromRemoteCommandResponse(
-                args.response, _state != State::kEnabled && _state != State::kRecovering));
+            notificationToSignal->set(args.response);
        });
 
    if (callbackHandleWithStatus.isOK()) {
@@ -771,10 +748,36 @@ void MigrationManager::_abandonActiveMigrationsAndEnableManager(OperationContext
    _condVar.notify_all();
 }
 
+Status MigrationManager::_processRemoteCommandResponse(
+    const RemoteCommandResponse& remoteCommandResponse,
+    ScopedMigrationRequest* scopedMigrationRequest) {
+    stdx::lock_guard<stdx::mutex> lock(_mutex);
+    Status commandStatus(ErrorCodes::InternalError, "Uninitialized value.");
+    if (isErrorDueToConfigStepdown(remoteCommandResponse.status,
+                                   _state != State::kEnabled && _state != State::kRecovering)) {
+        scopedMigrationRequest->keepDocumentOnDestruct();
+        commandStatus = Status(ErrorCodes::BalancerInterrupted,
+                               stream() << "Migration interrupted because the balancer is stopping."
+                                        << causedBy(remoteCommandResponse.status));
+    } else if (!remoteCommandResponse.isOK()) {
+        commandStatus = remoteCommandResponse.status;
+    } else {
+        commandStatus = extractMigrationStatusFromCommandResponse(remoteCommandResponse.data);
+        if (!commandStatus.isOK() && commandStatus != ErrorCodes::LockBusy &&
+            commandStatus != ErrorCodes::ChunkTooBig) {
+            commandStatus = Status(ErrorCodes::OperationFailed,
+                                   stream() << "moveChunk command failed on source shard."
+                                            << causedBy(commandStatus));
+        }
+    }
+
+    return commandStatus;
+}
+
 MigrationManager::Migration::Migration(NamespaceString inNss, BSONObj inMoveChunkCmdObj)
    : nss(std::move(inNss)),
      moveChunkCmdObj(std::move(inMoveChunkCmdObj)),
-      completionNotification(std::make_shared<Notification<Status>>()) {}
+      completionNotification(std::make_shared<Notification<RemoteCommandResponse>>()) {}
 
 MigrationManager::Migration::~Migration() {
    invariant(completionNotification);
diff --git a/src/mongo/db/s/balancer/migration_manager.h b/src/mongo/db/s/balancer/migration_manager.h
index f7730ba65e1..0c80f4f569a 100644
--- a/src/mongo/db/s/balancer/migration_manager.h
+++ b/src/mongo/db/s/balancer/migration_manager.h
@@ -47,6 +47,7 @@ namespace mongo {
 
 class OperationContext;
+class ScopedMigrationRequest;
 class ServiceContext;
 class Status;
 template <typename T>
@@ -164,7 +165,7 @@ private:
        boost::optional<executor::TaskExecutor::CallbackHandle> callbackHandle;
 
        // Notification, which will be signaled when the migration completes
-        std::shared_ptr<Notification<Status>> completionNotification;
+        std::shared_ptr<Notification<executor::RemoteCommandResponse>> completionNotification;
    };
 
    // Used as a type in which to store a list of active migrations. The reason to choose list is
@@ -199,7 +200,7 @@ private:
     * The 'shardTakesCollectionDistLock' parameter controls whether the distributed lock is
     * acquired by the migration manager or by the shard executing the migration request.
     */
-    std::shared_ptr<Notification<Status>> _schedule(
+    std::shared_ptr<Notification<executor::RemoteCommandResponse>> _schedule(
        OperationContext* txn,
        const MigrateInfo& migrateInfo,
        bool shardTakesCollectionDistLock,
@@ -226,7 +227,7 @@ private:
     */
    void _completeWithDistLock_inlock(OperationContext* txn,
                                      MigrationsList::iterator itMigration,
-                                      Status status);
+                                      const executor::RemoteCommandResponse& remoteCommandResponse);
 
    /**
     * Immediately schedules the specified migration without attempting to acquire the collection
@@ -260,6 +261,22 @@ private:
     */
    void _abandonActiveMigrationsAndEnableManager(OperationContext* txn);
 
+    /**
+     * Parses a moveChunk RemoteCommandResponse's two levels of Status objects and distiguishes
+     * between errors generated by this config server and the shard primary to which the moveChunk
+     * command was sent.
+     *
+     * If the command failed because of stepdown of this config server, the migration document
+     * managed by 'scopedMigrationRequest' is saved for later balancer recovery and a
+     * BalancerInterrupted error is returned. If the command failed because the shard to which the
+     * command was sent returned an error, then the error is converted into OperationFailed.
+     * Shard LockBusy and ChunkTooBig errors are returned without any conversions, as special cases
+     * that can dictate further action. Otherwise, the status is returned as it is found.
+     */
+    Status _processRemoteCommandResponse(
+        const executor::RemoteCommandResponse& remoteCommandResponse,
+        ScopedMigrationRequest* scopedMigrationRequest);
+
    // The service context under which this migration manager runs.
    ServiceContext* const _serviceContext;
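The heart of the change is the new private helper _processRemoteCommandResponse(), which looks at the two levels of status in a moveChunk RemoteCommandResponse and decides whether a failure came from this config server or from the shard. Below is a minimal, self-contained C++ sketch of that classification; it is not MongoDB source code. ErrorCode, Status, ModelResponse, and classify() are simplified stand-ins for ErrorCodes, mongo::Status, executor::RemoteCommandResponse, and the private method in the diff, and the shard's BSON reply is modeled as an already-parsed status.

// Standalone model (not MongoDB source) of the error classification added in this commit.
#include <iostream>
#include <string>

enum class ErrorCode {
    OK,
    CallbackCanceled,
    BalancerInterrupted,
    InterruptedDueToReplStateChange,
    LockBusy,
    ChunkTooBig,
    OperationFailed,
    Other
};

struct Status {
    ErrorCode code;
    std::string reason;
    bool isOK() const { return code == ErrorCode::OK; }
};

// Models executor::RemoteCommandResponse: 'status' reflects whether the request itself could
// be run, while 'commandStatus' models the status parsed out of the shard's reply document
// (what extractMigrationStatusFromCommandResponse() produces in the diff).
struct ModelResponse {
    Status status;         // local/scheduling outcome on the config server side
    Status commandStatus;  // outcome reported by the shard, if the request ran
};

// Mirrors isErrorDueToConfigStepdown() from the diff: these codes are produced on the config
// server side when the balancer is being interrupted by a stepdown.
bool isErrorDueToConfigStepdown(const Status& s, bool isStopping) {
    return (s.code == ErrorCode::CallbackCanceled && isStopping) ||
        s.code == ErrorCode::BalancerInterrupted ||
        s.code == ErrorCode::InterruptedDueToReplStateChange;
}

// Classification order used by _processRemoteCommandResponse():
//   1. config-server stepdown  -> BalancerInterrupted (migration document is kept for recovery)
//   2. other local failure     -> returned as-is
//   3. shard-side failure      -> OperationFailed, except LockBusy / ChunkTooBig, which callers
//                                 handle specially
Status classify(const ModelResponse& response, bool balancerStopping) {
    if (isErrorDueToConfigStepdown(response.status, balancerStopping)) {
        return {ErrorCode::BalancerInterrupted,
                "Migration interrupted because the balancer is stopping"};
    }
    if (!response.status.isOK()) {
        return response.status;
    }
    Status commandStatus = response.commandStatus;
    if (!commandStatus.isOK() && commandStatus.code != ErrorCode::LockBusy &&
        commandStatus.code != ErrorCode::ChunkTooBig) {
        return {ErrorCode::OperationFailed,
                "moveChunk command failed on source shard: " + commandStatus.reason};
    }
    return commandStatus;
}

int main() {
    // A shard-side failure is reported as OperationFailed, so a mongos (or its replica set
    // monitor) cannot mistake it for a config server error.
    ModelResponse shardError{{ErrorCode::OK, ""},
                             {ErrorCode::InterruptedDueToReplStateChange, "shard stepped down"}};
    std::cout << static_cast<int>(classify(shardError, false).code) << "\n";

    // A config-server stepdown while the manager is stopping becomes BalancerInterrupted.
    ModelResponse configStepdown{{ErrorCode::CallbackCanceled, "callback canceled"},
                                 {ErrorCode::OK, ""}};
    std::cout << static_cast<int>(classify(configStepdown, true).code) << "\n";
}

Keeping LockBusy and ChunkTooBig untranslated is what lets executeMigrationsForAutoBalance() reschedule a LockBusy migration under the config server's own distributed lock, while every other shard error surfaces to mongos as OperationFailed rather than as a config server error.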
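The first hunk's backwards-compatibility rule can be sketched in the same style: a 3.2-or-earlier shard does not return a ChunkTooBig error code but instead sets an extra boolean field in its reply (the kChunkTooBig field read by extractMigrationStatusFromCommandResponse()), and that field is translated into a ChunkTooBig status before the classification above runs. The reply struct and field names below are simplified stand-ins for the actual BSON handling.

// Standalone model (not MongoDB source) of the 3.2 'chunkTooBig' compatibility translation.
#include <iostream>
#include <string>

enum class ErrorCode { OK, ChunkTooBig, Other };

struct Status {
    ErrorCode code;
    std::string reason;
    bool isOK() const { return code == ErrorCode::OK; }
};

// Models the fields of interest in the shard's moveChunk reply document.
struct ModelReply {
    bool ok;                   // the command-level 'ok' field
    std::string errmsg;        // error message, when ok == false
    bool chunkTooBig = false;  // legacy 3.2 field; absent (false) on newer shards
};

Status extractMigrationStatus(const ModelReply& reply) {
    if (reply.ok) {
        return {ErrorCode::OK, ""};
    }
    // Legacy shards signal an oversized chunk via the extra field; translate it so callers can
    // key off ChunkTooBig regardless of the shard's version.
    if (reply.chunkTooBig) {
        return {ErrorCode::ChunkTooBig, reply.errmsg};
    }
    return {ErrorCode::Other, reply.errmsg};
}

int main() {
    ModelReply legacy{false, "chunk is larger than the configured maximum", true};
    std::cout << static_cast<int>(extractMigrationStatus(legacy).code) << "\n";  // ChunkTooBig
}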