diff options
author | Dianna Hohensee <dianna.hohensee@10gen.com> | 2016-10-17 10:20:37 -0400 |
---|---|---|
committer | Dianna Hohensee <dianna.hohensee@10gen.com> | 2016-10-17 10:20:37 -0400 |
commit | d89ff664c25bdc3537cfd08af03d14d2d96815bb (patch) | |
tree | 4acc85dbbcc03bd2144f63210a6f51f36e4acb27 | |
parent | af2e05b0424b99ee162fe2ef0f1ecf6c687b06f9 (diff) | |
download | mongo-d89ff664c25bdc3537cfd08af03d14d2d96815bb.tar.gz |
Revert "SERVER-26307 Differentiate between config and shard command errors in MigrationManager moveChunk commands"
This reverts commit af2e05b0424b99ee162fe2ef0f1ecf6c687b06f9.
-rw-r--r-- | src/mongo/db/s/balancer/migration_manager.cpp | 177 | ||||
-rw-r--r-- | src/mongo/db/s/balancer/migration_manager.h | 23 |
2 files changed, 90 insertions, 110 deletions
diff --git a/src/mongo/db/s/balancer/migration_manager.cpp b/src/mongo/db/s/balancer/migration_manager.cpp index 726137f38ff..e1c033669d7 100644 --- a/src/mongo/db/s/balancer/migration_manager.cpp +++ b/src/mongo/db/s/balancer/migration_manager.cpp @@ -68,16 +68,30 @@ const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority, Seconds(15)); /** - * Parses the 'commandResponse' and converts it to a status to use as the outcome of the command. - * Preserves backwards compatibility with 3.2 and earlier shards that, rather than use a ChunkTooBig - * error code, include an extra field in the response. + * Parses the specified asynchronous command response and converts it to status to use as outcome of + * an asynchronous migration command. It is necessary for two reasons: + * - Preserve backwards compatibility with 3.2 and earlier, where the move chunk command instead of + * returning a ChunkTooBig status includes an extra field in the response. + * - Convert CallbackCanceled errors into BalancerInterrupted for the cases where the migration + * manager is being stopped at replica set stepdown. This return code allows the mongos calling + * logic to retry the operation on a new primary. */ -Status extractMigrationStatusFromCommandResponse(const BSONObj& commandResponse) { - Status commandStatus = getStatusFromCommandResult(commandResponse); +Status extractMigrationStatusFromRemoteCommandResponse(const RemoteCommandResponse& response, + bool isStopping) { + if (!response.isOK()) { + if (response.status == ErrorCodes::CallbackCanceled && isStopping) { + return {ErrorCodes::BalancerInterrupted, + "Migration interrupted because the balancer is stopping"}; + } + + return response.status; + } + + Status commandStatus = getStatusFromCommandResult(response.data); if (!commandStatus.isOK()) { bool chunkTooBig = false; - bsonExtractBooleanFieldWithDefault(commandResponse, kChunkTooBig, false, &chunkTooBig); + bsonExtractBooleanFieldWithDefault(response.data, kChunkTooBig, false, &chunkTooBig); if (chunkTooBig) { commandStatus = {ErrorCodes::ChunkTooBig, commandStatus.reason()}; } @@ -122,10 +136,11 @@ StatusWith<DistLockHandle> acquireDistLock(OperationContext* txn, * Returns whether the specified status is an error caused by stepdown of the primary config node * currently running the balancer. */ -bool isErrorDueToConfigStepdown(Status status, bool isStopping) { - return ((status == ErrorCodes::CallbackCanceled && isStopping) || - status == ErrorCodes::BalancerInterrupted || - status == ErrorCodes::InterruptedDueToReplStateChange); +bool isErrorDueToBalancerStepdown(Status status) { + return (status == ErrorCodes::BalancerInterrupted || + status == ErrorCodes::InterruptedAtShutdown || + status == ErrorCodes::InterruptedDueToReplStateChange || + ErrorCodes::isShutdownError(status.code())); } } // namespace @@ -151,7 +166,7 @@ MigrationStatuses MigrationManager::executeMigrationsForAutoBalance( { std::map<MigrationIdentifier, ScopedMigrationRequest> scopedMigrationRequests; - vector<std::pair<shared_ptr<Notification<RemoteCommandResponse>>, MigrateInfo>> responses; + vector<std::pair<shared_ptr<Notification<Status>>, MigrateInfo>> responses; for (const auto& migrateInfo : migrateInfos) { // Write a document to the config.migrations collection, in case this migration must be @@ -182,16 +197,18 @@ MigrationStatuses MigrationManager::executeMigrationsForAutoBalance( auto notification = std::move(response.first); auto migrateInfo = std::move(response.second); - const auto& remoteCommandResponse = notification->get(); + Status responseStatus = notification->get(); - auto it = scopedMigrationRequests.find(migrateInfo.getName()); - invariant(it != scopedMigrationRequests.end()); - Status commandStatus = - _processRemoteCommandResponse(remoteCommandResponse, &it->second); - if (commandStatus == ErrorCodes::LockBusy) { + if (responseStatus == ErrorCodes::LockBusy) { rescheduledMigrations.emplace_back(std::move(migrateInfo)); } else { - migrationStatuses.emplace(migrateInfo.getName(), std::move(commandStatus)); + if (isErrorDueToBalancerStepdown(responseStatus)) { + auto it = scopedMigrationRequests.find(migrateInfo.getName()); + invariant(it != scopedMigrationRequests.end()); + it->second.keepDocumentOnDestruct(); + } + + migrationStatuses.emplace(migrateInfo.getName(), std::move(responseStatus)); } } } @@ -208,19 +225,19 @@ MigrationStatuses MigrationManager::executeMigrationsForAutoBalance( continue; } - RemoteCommandResponse remoteCommandResponse = - _schedule(txn, - migrateInfo, - true, // Shard takes the collection dist lock - maxChunkSizeBytes, - secondaryThrottle, - waitForDelete) - ->get(); + Status responseStatus = _schedule(txn, + migrateInfo, + true, // Shard takes the collection dist lock + maxChunkSizeBytes, + secondaryThrottle, + waitForDelete) + ->get(); - Status commandStatus = _processRemoteCommandResponse( - remoteCommandResponse, &statusWithScopedMigrationRequest.getValue()); + if (isErrorDueToBalancerStepdown(responseStatus)) { + statusWithScopedMigrationRequest.getValue().keepDocumentOnDestruct(); + } - migrationStatuses.emplace(migrateInfo.getName(), std::move(commandStatus)); + migrationStatuses.emplace(migrateInfo.getName(), std::move(responseStatus)); } invariant(migrationStatuses.size() == migrateInfos.size()); @@ -244,14 +261,13 @@ Status MigrationManager::executeManualMigration( return statusWithScopedMigrationRequest.getStatus(); } - RemoteCommandResponse remoteCommandResponse = - _schedule(txn, - migrateInfo, - false, // Config server takes the collection dist lock - maxChunkSizeBytes, - secondaryThrottle, - waitForDelete) - ->get(); + Status status = _schedule(txn, + migrateInfo, + false, // Config server takes the collection dist lock + maxChunkSizeBytes, + secondaryThrottle, + waitForDelete) + ->get(); auto scopedCMStatus = ScopedChunkManager::getExisting(txn, NamespaceString(migrateInfo.ns)); if (!scopedCMStatus.isOK()) { @@ -264,18 +280,23 @@ Status MigrationManager::executeManualMigration( auto chunk = cm->findIntersectingChunkWithSimpleCollation(txn, migrateInfo.minKey); invariant(chunk); - Status commandStatus = _processRemoteCommandResponse( - remoteCommandResponse, &statusWithScopedMigrationRequest.getValue()); - - // Migration calls can be interrupted after the metadata is committed but before the command - // finishes the waitForDelete stage. Any failovers, therefore, must always cause the moveChunk - // command to be retried so as to assure that the waitForDelete promise of a successful command - // has been fulfilled. - if (chunk->getShardId() == migrateInfo.to && commandStatus != ErrorCodes::BalancerInterrupted) { + // The order of the checks below is important due to the need for interrupted migration calls to + // be able to join any possibly completed migrations, which are still running in the + // waitForDelete step. + if (isErrorDueToBalancerStepdown(status)) { + statusWithScopedMigrationRequest.getValue().keepDocumentOnDestruct(); + + // We want the mongos to get a retriable error, and not make its replica set monitor + // interpret something like InterruptedDueToReplStateChange as the config server when the + // error comes from the shard. + return {ErrorCodes::BalancerInterrupted, status.reason()}; + } else if (chunk->getShardId() == migrateInfo.to) { + // Regardless of the status, if the chunk's current shard matches the destination, deem the + // move as success. return Status::OK(); } - return commandStatus; + return status; } void MigrationManager::startRecoveryAndAcquireDistLocks(OperationContext* txn) { @@ -383,7 +404,7 @@ void MigrationManager::finishRecovery(OperationContext* txn, // Schedule recovered migrations. vector<ScopedMigrationRequest> scopedMigrationRequests; - vector<shared_ptr<Notification<RemoteCommandResponse>>> responses; + vector<shared_ptr<Notification<Status>>> responses; for (auto& nssAndMigrateInfos : _migrationRecoveryMap) { auto& nss = nssAndMigrateInfos.first; @@ -500,7 +521,7 @@ void MigrationManager::drainActiveMigrations() { _state = State::kStopped; } -shared_ptr<Notification<RemoteCommandResponse>> MigrationManager::_schedule( +shared_ptr<Notification<Status>> MigrationManager::_schedule( OperationContext* txn, const MigrateInfo& migrateInfo, bool shardTakesCollectionDistLock, @@ -513,7 +534,7 @@ shared_ptr<Notification<RemoteCommandResponse>> MigrationManager::_schedule( { stdx::lock_guard<stdx::mutex> lock(_mutex); if (_state != State::kEnabled && _state != State::kRecovering) { - return std::make_shared<Notification<RemoteCommandResponse>>( + return std::make_shared<Notification<Status>>( Status(ErrorCodes::BalancerInterrupted, "Migration cannot be executed because the balancer is not running")); } @@ -524,7 +545,7 @@ shared_ptr<Notification<RemoteCommandResponse>> MigrationManager::_schedule( // shard as well, but doing them here saves an extra network call, which might otherwise fail. auto statusWithScopedChunkManager = ScopedChunkManager::getExisting(txn, nss); if (!statusWithScopedChunkManager.isOK()) { - return std::make_shared<Notification<RemoteCommandResponse>>( + return std::make_shared<Notification<Status>>( std::move(statusWithScopedChunkManager.getStatus())); } @@ -536,7 +557,7 @@ shared_ptr<Notification<RemoteCommandResponse>> MigrationManager::_schedule( // If the chunk is not found exactly as requested, the caller must have stale data if (SimpleBSONObjComparator::kInstance.evaluate(chunk->getMin() != migrateInfo.minKey) || SimpleBSONObjComparator::kInstance.evaluate(chunk->getMax() != migrateInfo.maxKey)) { - return std::make_shared<Notification<RemoteCommandResponse>>(Status( + return std::make_shared<Notification<Status>>(Status( ErrorCodes::IncompatibleShardingMetadata, stream() << "Chunk " << ChunkRange(migrateInfo.minKey, migrateInfo.maxKey).toString() << " does not exist.")); @@ -544,16 +565,14 @@ shared_ptr<Notification<RemoteCommandResponse>> MigrationManager::_schedule( const auto fromShardStatus = Grid::get(txn)->shardRegistry()->getShard(txn, migrateInfo.from); if (!fromShardStatus.isOK()) { - return std::make_shared<Notification<RemoteCommandResponse>>( - std::move(fromShardStatus.getStatus())); + return std::make_shared<Notification<Status>>(std::move(fromShardStatus.getStatus())); } const auto fromShard = fromShardStatus.getValue(); auto fromHostStatus = fromShard->getTargeter()->findHost(txn, ReadPreferenceSetting{ReadPreference::PrimaryOnly}); if (!fromHostStatus.isOK()) { - return std::make_shared<Notification<RemoteCommandResponse>>( - std::move(fromHostStatus.getStatus())); + return std::make_shared<Notification<Status>>(std::move(fromHostStatus.getStatus())); } BSONObjBuilder builder; @@ -574,7 +593,7 @@ shared_ptr<Notification<RemoteCommandResponse>> MigrationManager::_schedule( stdx::lock_guard<stdx::mutex> lock(_mutex); if (_state != State::kEnabled && _state != State::kRecovering) { - return std::make_shared<Notification<RemoteCommandResponse>>( + return std::make_shared<Notification<Status>>( Status(ErrorCodes::BalancerInterrupted, "Migration cannot be executed because the balancer is not running")); } @@ -633,7 +652,11 @@ void MigrationManager::_scheduleWithDistLock_inlock(OperationContext* txn, auto txn = cc().makeOperationContext(); stdx::lock_guard<stdx::mutex> lock(_mutex); - _completeWithDistLock_inlock(txn.get(), itMigration, args.response); + _completeWithDistLock_inlock( + txn.get(), + itMigration, + extractMigrationStatusFromRemoteCommandResponse( + args.response, _state != State::kEnabled && _state != State::kRecovering)); }); if (callbackHandleWithStatus.isOK()) { @@ -644,10 +667,9 @@ void MigrationManager::_scheduleWithDistLock_inlock(OperationContext* txn, _completeWithDistLock_inlock(txn, itMigration, std::move(callbackHandleWithStatus.getStatus())); } -void MigrationManager::_completeWithDistLock_inlock( - OperationContext* txn, - MigrationsList::iterator itMigration, - const RemoteCommandResponse& remoteCommandResponse) { +void MigrationManager::_completeWithDistLock_inlock(OperationContext* txn, + MigrationsList::iterator itMigration, + Status status) { const NamespaceString nss(itMigration->nss); // Make sure to signal the notification last, after the distributed lock is freed, so that we @@ -668,7 +690,7 @@ void MigrationManager::_completeWithDistLock_inlock( _checkDrained_inlock(); } - notificationToSignal->set(remoteCommandResponse); + notificationToSignal->set(status); } void MigrationManager::_scheduleWithoutDistLock_inlock(OperationContext* txn, @@ -693,7 +715,8 @@ void MigrationManager::_scheduleWithoutDistLock_inlock(OperationContext* txn, _activeMigrationsWithoutDistLock.erase(itMigration); _checkDrained_inlock(); - notificationToSignal->set(args.response); + notificationToSignal->set(extractMigrationStatusFromRemoteCommandResponse( + args.response, _state != State::kEnabled && _state != State::kRecovering)); }); if (callbackHandleWithStatus.isOK()) { @@ -748,36 +771,10 @@ void MigrationManager::_abandonActiveMigrationsAndEnableManager(OperationContext _condVar.notify_all(); } -Status MigrationManager::_processRemoteCommandResponse( - const RemoteCommandResponse& remoteCommandResponse, - ScopedMigrationRequest* scopedMigrationRequest) { - stdx::lock_guard<stdx::mutex> lock(_mutex); - Status commandStatus(ErrorCodes::InternalError, "Uninitialized value."); - if (isErrorDueToConfigStepdown(remoteCommandResponse.status, - _state != State::kEnabled && _state != State::kRecovering)) { - scopedMigrationRequest->keepDocumentOnDestruct(); - commandStatus = Status(ErrorCodes::BalancerInterrupted, - stream() << "Migration interrupted because the balancer is stopping." - << causedBy(remoteCommandResponse.status)); - } else if (!remoteCommandResponse.isOK()) { - commandStatus = remoteCommandResponse.status; - } else { - commandStatus = extractMigrationStatusFromCommandResponse(remoteCommandResponse.data); - if (!commandStatus.isOK() && commandStatus != ErrorCodes::LockBusy && - commandStatus != ErrorCodes::ChunkTooBig) { - commandStatus = Status(ErrorCodes::OperationFailed, - stream() << "moveChunk command failed on source shard." - << causedBy(commandStatus)); - } - } - - return commandStatus; -} - MigrationManager::Migration::Migration(NamespaceString inNss, BSONObj inMoveChunkCmdObj) : nss(std::move(inNss)), moveChunkCmdObj(std::move(inMoveChunkCmdObj)), - completionNotification(std::make_shared<Notification<RemoteCommandResponse>>()) {} + completionNotification(std::make_shared<Notification<Status>>()) {} MigrationManager::Migration::~Migration() { invariant(completionNotification); diff --git a/src/mongo/db/s/balancer/migration_manager.h b/src/mongo/db/s/balancer/migration_manager.h index 0c80f4f569a..f7730ba65e1 100644 --- a/src/mongo/db/s/balancer/migration_manager.h +++ b/src/mongo/db/s/balancer/migration_manager.h @@ -47,7 +47,6 @@ namespace mongo { class OperationContext; -class ScopedMigrationRequest; class ServiceContext; class Status; template <typename T> @@ -165,7 +164,7 @@ private: boost::optional<executor::TaskExecutor::CallbackHandle> callbackHandle; // Notification, which will be signaled when the migration completes - std::shared_ptr<Notification<executor::RemoteCommandResponse>> completionNotification; + std::shared_ptr<Notification<Status>> completionNotification; }; // Used as a type in which to store a list of active migrations. The reason to choose list is @@ -200,7 +199,7 @@ private: * The 'shardTakesCollectionDistLock' parameter controls whether the distributed lock is * acquired by the migration manager or by the shard executing the migration request. */ - std::shared_ptr<Notification<executor::RemoteCommandResponse>> _schedule( + std::shared_ptr<Notification<Status>> _schedule( OperationContext* txn, const MigrateInfo& migrateInfo, bool shardTakesCollectionDistLock, @@ -227,7 +226,7 @@ private: */ void _completeWithDistLock_inlock(OperationContext* txn, MigrationsList::iterator itMigration, - const executor::RemoteCommandResponse& remoteCommandResponse); + Status status); /** * Immediately schedules the specified migration without attempting to acquire the collection @@ -261,22 +260,6 @@ private: */ void _abandonActiveMigrationsAndEnableManager(OperationContext* txn); - /** - * Parses a moveChunk RemoteCommandResponse's two levels of Status objects and distiguishes - * between errors generated by this config server and the shard primary to which the moveChunk - * command was sent. - * - * If the command failed because of stepdown of this config server, the migration document - * managed by 'scopedMigrationRequest' is saved for later balancer recovery and a - * BalancerInterrupted error is returned. If the command failed because the shard to which the - * command was sent returned an error, then the error is converted into OperationFailed. - * Shard LockBusy and ChunkTooBig errors are returned without any conversions, as special cases - * that can dictate further action. Otherwise, the status is returned as it is found. - */ - Status _processRemoteCommandResponse( - const executor::RemoteCommandResponse& remoteCommandResponse, - ScopedMigrationRequest* scopedMigrationRequest); - // The service context under which this migration manager runs. ServiceContext* const _serviceContext; |