summary refs log tree commit diff
diff options
context:
space:
mode:
author Dianna Hohensee <dianna.hohensee@10gen.com> 2016-10-17 10:20:37 -0400
committer Dianna Hohensee <dianna.hohensee@10gen.com> 2016-10-17 10:20:37 -0400
commit d89ff664c25bdc3537cfd08af03d14d2d96815bb (patch)
tree 4acc85dbbcc03bd2144f63210a6f51f36e4acb27
parent af2e05b0424b99ee162fe2ef0f1ecf6c687b06f9 (diff)
download mongo-d89ff664c25bdc3537cfd08af03d14d2d96815bb.tar.gz
Revert "SERVER-26307 Differentiate between config and shard command errors in MigrationManager moveChunk commands"
This reverts commit af2e05b0424b99ee162fe2ef0f1ecf6c687b06f9.
-rw-r--r-- src/mongo/db/s/balancer/migration_manager.cpp 177
-rw-r--r-- src/mongo/db/s/balancer/migration_manager.h 23
2 files changed, 90 insertions, 110 deletions
diff --git a/src/mongo/db/s/balancer/migration_manager.cpp b/src/mongo/db/s/balancer/migration_manager.cpp
index 726137f38ff..e1c033669d7 100644
--- a/src/mongo/db/s/balancer/migration_manager.cpp
+++ b/src/mongo/db/s/balancer/migration_manager.cpp
@@ -68,16 +68,30 @@ const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority,
Seconds(15));
/**
- * Parses the 'commandResponse' and converts it to a status to use as the outcome of the command.
- * Preserves backwards compatibility with 3.2 and earlier shards that, rather than use a ChunkTooBig
- * error code, include an extra field in the response.
+ * Parses the specified asynchronous command response and converts it to status to use as outcome of
+ * an asynchronous migration command. It is necessary for two reasons:
+ * - Preserve backwards compatibility with 3.2 and earlier, where the move chunk command instead of
+ * returning a ChunkTooBig status includes an extra field in the response.
+ * - Convert CallbackCanceled errors into BalancerInterrupted for the cases where the migration
+ * manager is being stopped at replica set stepdown. This return code allows the mongos calling
+ * logic to retry the operation on a new primary.
*/
-Status extractMigrationStatusFromCommandResponse(const BSONObj& commandResponse) {
- Status commandStatus = getStatusFromCommandResult(commandResponse);
+Status extractMigrationStatusFromRemoteCommandResponse(const RemoteCommandResponse& response,
+ bool isStopping) {
+ if (!response.isOK()) {
+ if (response.status == ErrorCodes::CallbackCanceled && isStopping) {
+ return {ErrorCodes::BalancerInterrupted,
+ "Migration interrupted because the balancer is stopping"};
+ }
+
+ return response.status;
+ }
+
+ Status commandStatus = getStatusFromCommandResult(response.data);
if (!commandStatus.isOK()) {
bool chunkTooBig = false;
- bsonExtractBooleanFieldWithDefault(commandResponse, kChunkTooBig, false, &chunkTooBig);
+ bsonExtractBooleanFieldWithDefault(response.data, kChunkTooBig, false, &chunkTooBig);
if (chunkTooBig) {
commandStatus = {ErrorCodes::ChunkTooBig, commandStatus.reason()};
}
@@ -122,10 +136,11 @@ StatusWith<DistLockHandle> acquireDistLock(OperationContext* txn,
* Returns whether the specified status is an error caused by stepdown of the primary config node
* currently running the balancer.
*/
-bool isErrorDueToConfigStepdown(Status status, bool isStopping) {
- return ((status == ErrorCodes::CallbackCanceled && isStopping) ||
- status == ErrorCodes::BalancerInterrupted ||
- status == ErrorCodes::InterruptedDueToReplStateChange);
+bool isErrorDueToBalancerStepdown(Status status) {
+ return (status == ErrorCodes::BalancerInterrupted ||
+ status == ErrorCodes::InterruptedAtShutdown ||
+ status == ErrorCodes::InterruptedDueToReplStateChange ||
+ ErrorCodes::isShutdownError(status.code()));
}
} // namespace
@@ -151,7 +166,7 @@ MigrationStatuses MigrationManager::executeMigrationsForAutoBalance(
{
std::map<MigrationIdentifier, ScopedMigrationRequest> scopedMigrationRequests;
- vector<std::pair<shared_ptr<Notification<RemoteCommandResponse>>, MigrateInfo>> responses;
+ vector<std::pair<shared_ptr<Notification<Status>>, MigrateInfo>> responses;
for (const auto& migrateInfo : migrateInfos) {
// Write a document to the config.migrations collection, in case this migration must be
@@ -182,16 +197,18 @@ MigrationStatuses MigrationManager::executeMigrationsForAutoBalance(
auto notification = std::move(response.first);
auto migrateInfo = std::move(response.second);
- const auto& remoteCommandResponse = notification->get();
+ Status responseStatus = notification->get();
- auto it = scopedMigrationRequests.find(migrateInfo.getName());
- invariant(it != scopedMigrationRequests.end());
- Status commandStatus =
- _processRemoteCommandResponse(remoteCommandResponse, &it->second);
- if (commandStatus == ErrorCodes::LockBusy) {
+ if (responseStatus == ErrorCodes::LockBusy) {
rescheduledMigrations.emplace_back(std::move(migrateInfo));
} else {
- migrationStatuses.emplace(migrateInfo.getName(), std::move(commandStatus));
+ if (isErrorDueToBalancerStepdown(responseStatus)) {
+ auto it = scopedMigrationRequests.find(migrateInfo.getName());
+ invariant(it != scopedMigrationRequests.end());
+ it->second.keepDocumentOnDestruct();
+ }
+
+ migrationStatuses.emplace(migrateInfo.getName(), std::move(responseStatus));
}
}
}
@@ -208,19 +225,19 @@ MigrationStatuses MigrationManager::executeMigrationsForAutoBalance(
continue;
}
- RemoteCommandResponse remoteCommandResponse =
- _schedule(txn,
- migrateInfo,
- true, // Shard takes the collection dist lock
- maxChunkSizeBytes,
- secondaryThrottle,
- waitForDelete)
- ->get();
+ Status responseStatus = _schedule(txn,
+ migrateInfo,
+ true, // Shard takes the collection dist lock
+ maxChunkSizeBytes,
+ secondaryThrottle,
+ waitForDelete)
+ ->get();
- Status commandStatus = _processRemoteCommandResponse(
- remoteCommandResponse, &statusWithScopedMigrationRequest.getValue());
+ if (isErrorDueToBalancerStepdown(responseStatus)) {
+ statusWithScopedMigrationRequest.getValue().keepDocumentOnDestruct();
+ }
- migrationStatuses.emplace(migrateInfo.getName(), std::move(commandStatus));
+ migrationStatuses.emplace(migrateInfo.getName(), std::move(responseStatus));
}
invariant(migrationStatuses.size() == migrateInfos.size());
@@ -244,14 +261,13 @@ Status MigrationManager::executeManualMigration(
return statusWithScopedMigrationRequest.getStatus();
}
- RemoteCommandResponse remoteCommandResponse =
- _schedule(txn,
- migrateInfo,
- false, // Config server takes the collection dist lock
- maxChunkSizeBytes,
- secondaryThrottle,
- waitForDelete)
- ->get();
+ Status status = _schedule(txn,
+ migrateInfo,
+ false, // Config server takes the collection dist lock
+ maxChunkSizeBytes,
+ secondaryThrottle,
+ waitForDelete)
+ ->get();
auto scopedCMStatus = ScopedChunkManager::getExisting(txn, NamespaceString(migrateInfo.ns));
if (!scopedCMStatus.isOK()) {
@@ -264,18 +280,23 @@ Status MigrationManager::executeManualMigration(
auto chunk = cm->findIntersectingChunkWithSimpleCollation(txn, migrateInfo.minKey);
invariant(chunk);
- Status commandStatus = _processRemoteCommandResponse(
- remoteCommandResponse, &statusWithScopedMigrationRequest.getValue());
-
- // Migration calls can be interrupted after the metadata is committed but before the command
- // finishes the waitForDelete stage. Any failovers, therefore, must always cause the moveChunk
- // command to be retried so as to assure that the waitForDelete promise of a successful command
- // has been fulfilled.
- if (chunk->getShardId() == migrateInfo.to && commandStatus != ErrorCodes::BalancerInterrupted) {
+ // The order of the checks below is important due to the need for interrupted migration calls to
+ // be able to join any possibly completed migrations, which are still running in the
+ // waitForDelete step.
+ if (isErrorDueToBalancerStepdown(status)) {
+ statusWithScopedMigrationRequest.getValue().keepDocumentOnDestruct();
+
+ // We want the mongos to get a retriable error, and not make its replica set monitor
+ // interpret something like InterruptedDueToReplStateChange as the config server when the
+ // error comes from the shard.
+ return {ErrorCodes::BalancerInterrupted, status.reason()};
+ } else if (chunk->getShardId() == migrateInfo.to) {
+ // Regardless of the status, if the chunk's current shard matches the destination, deem the
+ // move as success.
return Status::OK();
}
- return commandStatus;
+ return status;
}
void MigrationManager::startRecoveryAndAcquireDistLocks(OperationContext* txn) {
@@ -383,7 +404,7 @@ void MigrationManager::finishRecovery(OperationContext* txn,
// Schedule recovered migrations.
vector<ScopedMigrationRequest> scopedMigrationRequests;
- vector<shared_ptr<Notification<RemoteCommandResponse>>> responses;
+ vector<shared_ptr<Notification<Status>>> responses;
for (auto& nssAndMigrateInfos : _migrationRecoveryMap) {
auto& nss = nssAndMigrateInfos.first;
@@ -500,7 +521,7 @@ void MigrationManager::drainActiveMigrations() {
_state = State::kStopped;
}
-shared_ptr<Notification<RemoteCommandResponse>> MigrationManager::_schedule(
+shared_ptr<Notification<Status>> MigrationManager::_schedule(
OperationContext* txn,
const MigrateInfo& migrateInfo,
bool shardTakesCollectionDistLock,
@@ -513,7 +534,7 @@ shared_ptr<Notification<RemoteCommandResponse>> MigrationManager::_schedule(
{
stdx::lock_guard<stdx::mutex> lock(_mutex);
if (_state != State::kEnabled && _state != State::kRecovering) {
- return std::make_shared<Notification<RemoteCommandResponse>>(
+ return std::make_shared<Notification<Status>>(
Status(ErrorCodes::BalancerInterrupted,
"Migration cannot be executed because the balancer is not running"));
}
@@ -524,7 +545,7 @@ shared_ptr<Notification<RemoteCommandResponse>> MigrationManager::_schedule(
// shard as well, but doing them here saves an extra network call, which might otherwise fail.
auto statusWithScopedChunkManager = ScopedChunkManager::getExisting(txn, nss);
if (!statusWithScopedChunkManager.isOK()) {
- return std::make_shared<Notification<RemoteCommandResponse>>(
+ return std::make_shared<Notification<Status>>(
std::move(statusWithScopedChunkManager.getStatus()));
}
@@ -536,7 +557,7 @@ shared_ptr<Notification<RemoteCommandResponse>> MigrationManager::_schedule(
// If the chunk is not found exactly as requested, the caller must have stale data
if (SimpleBSONObjComparator::kInstance.evaluate(chunk->getMin() != migrateInfo.minKey) ||
SimpleBSONObjComparator::kInstance.evaluate(chunk->getMax() != migrateInfo.maxKey)) {
- return std::make_shared<Notification<RemoteCommandResponse>>(Status(
+ return std::make_shared<Notification<Status>>(Status(
ErrorCodes::IncompatibleShardingMetadata,
stream() << "Chunk " << ChunkRange(migrateInfo.minKey, migrateInfo.maxKey).toString()
<< " does not exist."));
@@ -544,16 +565,14 @@ shared_ptr<Notification<RemoteCommandResponse>> MigrationManager::_schedule(
const auto fromShardStatus = Grid::get(txn)->shardRegistry()->getShard(txn, migrateInfo.from);
if (!fromShardStatus.isOK()) {
- return std::make_shared<Notification<RemoteCommandResponse>>(
- std::move(fromShardStatus.getStatus()));
+ return std::make_shared<Notification<Status>>(std::move(fromShardStatus.getStatus()));
}
const auto fromShard = fromShardStatus.getValue();
auto fromHostStatus =
fromShard->getTargeter()->findHost(txn, ReadPreferenceSetting{ReadPreference::PrimaryOnly});
if (!fromHostStatus.isOK()) {
- return std::make_shared<Notification<RemoteCommandResponse>>(
- std::move(fromHostStatus.getStatus()));
+ return std::make_shared<Notification<Status>>(std::move(fromHostStatus.getStatus()));
}
BSONObjBuilder builder;
@@ -574,7 +593,7 @@ shared_ptr<Notification<RemoteCommandResponse>> MigrationManager::_schedule(
stdx::lock_guard<stdx::mutex> lock(_mutex);
if (_state != State::kEnabled && _state != State::kRecovering) {
- return std::make_shared<Notification<RemoteCommandResponse>>(
+ return std::make_shared<Notification<Status>>(
Status(ErrorCodes::BalancerInterrupted,
"Migration cannot be executed because the balancer is not running"));
}
@@ -633,7 +652,11 @@ void MigrationManager::_scheduleWithDistLock_inlock(OperationContext* txn,
auto txn = cc().makeOperationContext();
stdx::lock_guard<stdx::mutex> lock(_mutex);
- _completeWithDistLock_inlock(txn.get(), itMigration, args.response);
+ _completeWithDistLock_inlock(
+ txn.get(),
+ itMigration,
+ extractMigrationStatusFromRemoteCommandResponse(
+ args.response, _state != State::kEnabled && _state != State::kRecovering));
});
if (callbackHandleWithStatus.isOK()) {
@@ -644,10 +667,9 @@ void MigrationManager::_scheduleWithDistLock_inlock(OperationContext* txn,
_completeWithDistLock_inlock(txn, itMigration, std::move(callbackHandleWithStatus.getStatus()));
}
-void MigrationManager::_completeWithDistLock_inlock(
- OperationContext* txn,
- MigrationsList::iterator itMigration,
- const RemoteCommandResponse& remoteCommandResponse) {
+void MigrationManager::_completeWithDistLock_inlock(OperationContext* txn,
+ MigrationsList::iterator itMigration,
+ Status status) {
const NamespaceString nss(itMigration->nss);
// Make sure to signal the notification last, after the distributed lock is freed, so that we
@@ -668,7 +690,7 @@ void MigrationManager::_completeWithDistLock_inlock(
_checkDrained_inlock();
}
- notificationToSignal->set(remoteCommandResponse);
+ notificationToSignal->set(status);
}
void MigrationManager::_scheduleWithoutDistLock_inlock(OperationContext* txn,
@@ -693,7 +715,8 @@ void MigrationManager::_scheduleWithoutDistLock_inlock(OperationContext* txn,
_activeMigrationsWithoutDistLock.erase(itMigration);
_checkDrained_inlock();
- notificationToSignal->set(args.response);
+ notificationToSignal->set(extractMigrationStatusFromRemoteCommandResponse(
+ args.response, _state != State::kEnabled && _state != State::kRecovering));
});
if (callbackHandleWithStatus.isOK()) {
@@ -748,36 +771,10 @@ void MigrationManager::_abandonActiveMigrationsAndEnableManager(OperationContext
_condVar.notify_all();
}
-Status MigrationManager::_processRemoteCommandResponse(
- const RemoteCommandResponse& remoteCommandResponse,
- ScopedMigrationRequest* scopedMigrationRequest) {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- Status commandStatus(ErrorCodes::InternalError, "Uninitialized value.");
- if (isErrorDueToConfigStepdown(remoteCommandResponse.status,
- _state != State::kEnabled && _state != State::kRecovering)) {
- scopedMigrationRequest->keepDocumentOnDestruct();
- commandStatus = Status(ErrorCodes::BalancerInterrupted,
- stream() << "Migration interrupted because the balancer is stopping."
- << causedBy(remoteCommandResponse.status));
- } else if (!remoteCommandResponse.isOK()) {
- commandStatus = remoteCommandResponse.status;
- } else {
- commandStatus = extractMigrationStatusFromCommandResponse(remoteCommandResponse.data);
- if (!commandStatus.isOK() && commandStatus != ErrorCodes::LockBusy &&
- commandStatus != ErrorCodes::ChunkTooBig) {
- commandStatus = Status(ErrorCodes::OperationFailed,
- stream() << "moveChunk command failed on source shard."
- << causedBy(commandStatus));
- }
- }
-
- return commandStatus;
-}
-
MigrationManager::Migration::Migration(NamespaceString inNss, BSONObj inMoveChunkCmdObj)
: nss(std::move(inNss)),
moveChunkCmdObj(std::move(inMoveChunkCmdObj)),
- completionNotification(std::make_shared<Notification<RemoteCommandResponse>>()) {}
+ completionNotification(std::make_shared<Notification<Status>>()) {}
MigrationManager::Migration::~Migration() {
invariant(completionNotification);
diff --git a/src/mongo/db/s/balancer/migration_manager.h b/src/mongo/db/s/balancer/migration_manager.h
index 0c80f4f569a..f7730ba65e1 100644
--- a/src/mongo/db/s/balancer/migration_manager.h
+++ b/src/mongo/db/s/balancer/migration_manager.h
@@ -47,7 +47,6 @@
namespace mongo {
class OperationContext;
-class ScopedMigrationRequest;
class ServiceContext;
class Status;
template <typename T>
@@ -165,7 +164,7 @@ private:
boost::optional<executor::TaskExecutor::CallbackHandle> callbackHandle;
// Notification, which will be signaled when the migration completes
- std::shared_ptr<Notification<executor::RemoteCommandResponse>> completionNotification;
+ std::shared_ptr<Notification<Status>> completionNotification;
};
// Used as a type in which to store a list of active migrations. The reason to choose list is
@@ -200,7 +199,7 @@ private:
* The 'shardTakesCollectionDistLock' parameter controls whether the distributed lock is
* acquired by the migration manager or by the shard executing the migration request.
*/
- std::shared_ptr<Notification<executor::RemoteCommandResponse>> _schedule(
+ std::shared_ptr<Notification<Status>> _schedule(
OperationContext* txn,
const MigrateInfo& migrateInfo,
bool shardTakesCollectionDistLock,
@@ -227,7 +226,7 @@ private:
*/
void _completeWithDistLock_inlock(OperationContext* txn,
MigrationsList::iterator itMigration,
- const executor::RemoteCommandResponse& remoteCommandResponse);
+ Status status);
/**
* Immediately schedules the specified migration without attempting to acquire the collection
@@ -261,22 +260,6 @@ private:
*/
void _abandonActiveMigrationsAndEnableManager(OperationContext* txn);
- /**
- * Parses a moveChunk RemoteCommandResponse's two levels of Status objects and distiguishes
- * between errors generated by this config server and the shard primary to which the moveChunk
- * command was sent.
- *
- * If the command failed because of stepdown of this config server, the migration document
- * managed by 'scopedMigrationRequest' is saved for later balancer recovery and a
- * BalancerInterrupted error is returned. If the command failed because the shard to which the
- * command was sent returned an error, then the error is converted into OperationFailed.
- * Shard LockBusy and ChunkTooBig errors are returned without any conversions, as special cases
- * that can dictate further action. Otherwise, the status is returned as it is found.
- */
- Status _processRemoteCommandResponse(
- const executor::RemoteCommandResponse& remoteCommandResponse,
- ScopedMigrationRequest* scopedMigrationRequest);
-
// The service context under which this migration manager runs.
ServiceContext* const _serviceContext;