author    Dianna Hohensee <dianna.hohensee@10gen.com>    2016-10-05 14:48:54 -0400
committer Dianna Hohensee <dianna.hohensee@10gen.com>    2016-10-17 09:49:29 -0400
commit    af2e05b0424b99ee162fe2ef0f1ecf6c687b06f9 (patch)
tree      b7e53b434396e56cabd75a90fdf70d36cda6ef2e /src
parent    cfb4014c9265992c8760e37938198d7dd8a38b7e (diff)
download  mongo-af2e05b0424b99ee162fe2ef0f1ecf6c687b06f9.tar.gz
SERVER-26307 Differentiate between config and shard command errors in MigrationManager moveChunk commands
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/s/balancer/migration_manager.cpp  177
-rw-r--r--  src/mongo/db/s/balancer/migration_manager.h     23
2 files changed, 110 insertions, 90 deletions
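
The heart of the change is the new MigrationManager::_processRemoteCommandResponse helper (see the
.cpp hunk near the end of this diff), which classifies each moveChunk outcome as either a
config-server-side interruption or a shard-side failure. The standalone sketch below summarizes the
decision order; the Response struct and classifyMoveChunkOutcome function are illustrative
stand-ins for this example only, not names from the MongoDB tree.

// Simplified sketch of the error classification this patch introduces.
enum class ErrorCode {
    OK,
    CallbackCanceled,
    BalancerInterrupted,
    InterruptedDueToReplStateChange,
    LockBusy,
    ChunkTooBig,
    OperationFailed
};

struct Response {
    ErrorCode localStatus;    // status of dispatching the command from the config server
    ErrorCode commandStatus;  // status parsed out of the shard's moveChunk reply
};

ErrorCode classifyMoveChunkOutcome(const Response& r, bool balancerStopping) {
    // 1. Failures caused by this config server stepping down (or the balancer being stopped)
    //    become BalancerInterrupted, so mongos retries against the new primary and the
    //    config.migrations document is kept for later recovery.
    if ((r.localStatus == ErrorCode::CallbackCanceled && balancerStopping) ||
        r.localStatus == ErrorCode::BalancerInterrupted ||
        r.localStatus == ErrorCode::InterruptedDueToReplStateChange) {
        return ErrorCode::BalancerInterrupted;
    }

    // 2. Any other local/dispatch failure is returned unchanged.
    if (r.localStatus != ErrorCode::OK) {
        return r.localStatus;
    }

    // 3. LockBusy and ChunkTooBig from the shard pass through untouched because callers act on
    //    them (reschedule the migration, split the chunk). Every other shard error is wrapped as
    //    OperationFailed so it cannot be mistaken for a config server error.
    if (r.commandStatus == ErrorCode::OK || r.commandStatus == ErrorCode::LockBusy ||
        r.commandStatus == ErrorCode::ChunkTooBig) {
        return r.commandStatus;
    }
    return ErrorCode::OperationFailed;
}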
diff --git a/src/mongo/db/s/balancer/migration_manager.cpp b/src/mongo/db/s/balancer/migration_manager.cpp
index e1c033669d7..726137f38ff 100644
--- a/src/mongo/db/s/balancer/migration_manager.cpp
+++ b/src/mongo/db/s/balancer/migration_manager.cpp
@@ -68,30 +68,16 @@ const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority,
Seconds(15));
/**
- * Parses the specified asynchronous command response and converts it to status to use as outcome of
- * an asynchronous migration command. It is necessary for two reasons:
- * - Preserve backwards compatibility with 3.2 and earlier, where the move chunk command instead of
- * returning a ChunkTooBig status includes an extra field in the response.
- * - Convert CallbackCanceled errors into BalancerInterrupted for the cases where the migration
- * manager is being stopped at replica set stepdown. This return code allows the mongos calling
- * logic to retry the operation on a new primary.
+ * Parses the 'commandResponse' and converts it to a status to use as the outcome of the command.
+ * Preserves backwards compatibility with 3.2 and earlier shards that, rather than use a ChunkTooBig
+ * error code, include an extra field in the response.
*/
-Status extractMigrationStatusFromRemoteCommandResponse(const RemoteCommandResponse& response,
- bool isStopping) {
- if (!response.isOK()) {
- if (response.status == ErrorCodes::CallbackCanceled && isStopping) {
- return {ErrorCodes::BalancerInterrupted,
- "Migration interrupted because the balancer is stopping"};
- }
-
- return response.status;
- }
-
- Status commandStatus = getStatusFromCommandResult(response.data);
+Status extractMigrationStatusFromCommandResponse(const BSONObj& commandResponse) {
+ Status commandStatus = getStatusFromCommandResult(commandResponse);
if (!commandStatus.isOK()) {
bool chunkTooBig = false;
- bsonExtractBooleanFieldWithDefault(response.data, kChunkTooBig, false, &chunkTooBig);
+ bsonExtractBooleanFieldWithDefault(commandResponse, kChunkTooBig, false, &chunkTooBig);
if (chunkTooBig) {
commandStatus = {ErrorCodes::ChunkTooBig, commandStatus.reason()};
}
@@ -136,11 +122,10 @@ StatusWith<DistLockHandle> acquireDistLock(OperationContext* txn,
* Returns whether the specified status is an error caused by stepdown of the primary config node
* currently running the balancer.
*/
-bool isErrorDueToBalancerStepdown(Status status) {
- return (status == ErrorCodes::BalancerInterrupted ||
- status == ErrorCodes::InterruptedAtShutdown ||
- status == ErrorCodes::InterruptedDueToReplStateChange ||
- ErrorCodes::isShutdownError(status.code()));
+bool isErrorDueToConfigStepdown(Status status, bool isStopping) {
+ return ((status == ErrorCodes::CallbackCanceled && isStopping) ||
+ status == ErrorCodes::BalancerInterrupted ||
+ status == ErrorCodes::InterruptedDueToReplStateChange);
}
} // namespace
@@ -166,7 +151,7 @@ MigrationStatuses MigrationManager::executeMigrationsForAutoBalance(
{
std::map<MigrationIdentifier, ScopedMigrationRequest> scopedMigrationRequests;
- vector<std::pair<shared_ptr<Notification<Status>>, MigrateInfo>> responses;
+ vector<std::pair<shared_ptr<Notification<RemoteCommandResponse>>, MigrateInfo>> responses;
for (const auto& migrateInfo : migrateInfos) {
// Write a document to the config.migrations collection, in case this migration must be
@@ -197,18 +182,16 @@ MigrationStatuses MigrationManager::executeMigrationsForAutoBalance(
auto notification = std::move(response.first);
auto migrateInfo = std::move(response.second);
- Status responseStatus = notification->get();
+ const auto& remoteCommandResponse = notification->get();
- if (responseStatus == ErrorCodes::LockBusy) {
+ auto it = scopedMigrationRequests.find(migrateInfo.getName());
+ invariant(it != scopedMigrationRequests.end());
+ Status commandStatus =
+ _processRemoteCommandResponse(remoteCommandResponse, &it->second);
+ if (commandStatus == ErrorCodes::LockBusy) {
rescheduledMigrations.emplace_back(std::move(migrateInfo));
} else {
- if (isErrorDueToBalancerStepdown(responseStatus)) {
- auto it = scopedMigrationRequests.find(migrateInfo.getName());
- invariant(it != scopedMigrationRequests.end());
- it->second.keepDocumentOnDestruct();
- }
-
- migrationStatuses.emplace(migrateInfo.getName(), std::move(responseStatus));
+ migrationStatuses.emplace(migrateInfo.getName(), std::move(commandStatus));
}
}
}
@@ -225,19 +208,19 @@ MigrationStatuses MigrationManager::executeMigrationsForAutoBalance(
continue;
}
- Status responseStatus = _schedule(txn,
- migrateInfo,
- true, // Shard takes the collection dist lock
- maxChunkSizeBytes,
- secondaryThrottle,
- waitForDelete)
- ->get();
+ RemoteCommandResponse remoteCommandResponse =
+ _schedule(txn,
+ migrateInfo,
+ true, // Shard takes the collection dist lock
+ maxChunkSizeBytes,
+ secondaryThrottle,
+ waitForDelete)
+ ->get();
- if (isErrorDueToBalancerStepdown(responseStatus)) {
- statusWithScopedMigrationRequest.getValue().keepDocumentOnDestruct();
- }
+ Status commandStatus = _processRemoteCommandResponse(
+ remoteCommandResponse, &statusWithScopedMigrationRequest.getValue());
- migrationStatuses.emplace(migrateInfo.getName(), std::move(responseStatus));
+ migrationStatuses.emplace(migrateInfo.getName(), std::move(commandStatus));
}
invariant(migrationStatuses.size() == migrateInfos.size());
@@ -261,13 +244,14 @@ Status MigrationManager::executeManualMigration(
return statusWithScopedMigrationRequest.getStatus();
}
- Status status = _schedule(txn,
- migrateInfo,
- false, // Config server takes the collection dist lock
- maxChunkSizeBytes,
- secondaryThrottle,
- waitForDelete)
- ->get();
+ RemoteCommandResponse remoteCommandResponse =
+ _schedule(txn,
+ migrateInfo,
+ false, // Config server takes the collection dist lock
+ maxChunkSizeBytes,
+ secondaryThrottle,
+ waitForDelete)
+ ->get();
auto scopedCMStatus = ScopedChunkManager::getExisting(txn, NamespaceString(migrateInfo.ns));
if (!scopedCMStatus.isOK()) {
@@ -280,23 +264,18 @@ Status MigrationManager::executeManualMigration(
auto chunk = cm->findIntersectingChunkWithSimpleCollation(txn, migrateInfo.minKey);
invariant(chunk);
- // The order of the checks below is important due to the need for interrupted migration calls to
- // be able to join any possibly completed migrations, which are still running in the
- // waitForDelete step.
- if (isErrorDueToBalancerStepdown(status)) {
- statusWithScopedMigrationRequest.getValue().keepDocumentOnDestruct();
-
- // We want the mongos to get a retriable error, and not make its replica set monitor
- // interpret something like InterruptedDueToReplStateChange as the config server when the
- // error comes from the shard.
- return {ErrorCodes::BalancerInterrupted, status.reason()};
- } else if (chunk->getShardId() == migrateInfo.to) {
- // Regardless of the status, if the chunk's current shard matches the destination, deem the
- // move as success.
+ Status commandStatus = _processRemoteCommandResponse(
+ remoteCommandResponse, &statusWithScopedMigrationRequest.getValue());
+
+ // Migration calls can be interrupted after the metadata is committed but before the command
+ // finishes the waitForDelete stage. Any failover must therefore cause the moveChunk command to
+ // be retried, to ensure that the waitForDelete promise of a successful command has been
+ // fulfilled.
+ if (chunk->getShardId() == migrateInfo.to && commandStatus != ErrorCodes::BalancerInterrupted) {
return Status::OK();
}
- return status;
+ return commandStatus;
}
void MigrationManager::startRecoveryAndAcquireDistLocks(OperationContext* txn) {
@@ -404,7 +383,7 @@ void MigrationManager::finishRecovery(OperationContext* txn,
// Schedule recovered migrations.
vector<ScopedMigrationRequest> scopedMigrationRequests;
- vector<shared_ptr<Notification<Status>>> responses;
+ vector<shared_ptr<Notification<RemoteCommandResponse>>> responses;
for (auto& nssAndMigrateInfos : _migrationRecoveryMap) {
auto& nss = nssAndMigrateInfos.first;
@@ -521,7 +500,7 @@ void MigrationManager::drainActiveMigrations() {
_state = State::kStopped;
}
-shared_ptr<Notification<Status>> MigrationManager::_schedule(
+shared_ptr<Notification<RemoteCommandResponse>> MigrationManager::_schedule(
OperationContext* txn,
const MigrateInfo& migrateInfo,
bool shardTakesCollectionDistLock,
@@ -534,7 +513,7 @@ shared_ptr<Notification<Status>> MigrationManager::_schedule(
{
stdx::lock_guard<stdx::mutex> lock(_mutex);
if (_state != State::kEnabled && _state != State::kRecovering) {
- return std::make_shared<Notification<Status>>(
+ return std::make_shared<Notification<RemoteCommandResponse>>(
Status(ErrorCodes::BalancerInterrupted,
"Migration cannot be executed because the balancer is not running"));
}
@@ -545,7 +524,7 @@ shared_ptr<Notification<Status>> MigrationManager::_schedule(
// shard as well, but doing them here saves an extra network call, which might otherwise fail.
auto statusWithScopedChunkManager = ScopedChunkManager::getExisting(txn, nss);
if (!statusWithScopedChunkManager.isOK()) {
- return std::make_shared<Notification<Status>>(
+ return std::make_shared<Notification<RemoteCommandResponse>>(
std::move(statusWithScopedChunkManager.getStatus()));
}
@@ -557,7 +536,7 @@ shared_ptr<Notification<Status>> MigrationManager::_schedule(
// If the chunk is not found exactly as requested, the caller must have stale data
if (SimpleBSONObjComparator::kInstance.evaluate(chunk->getMin() != migrateInfo.minKey) ||
SimpleBSONObjComparator::kInstance.evaluate(chunk->getMax() != migrateInfo.maxKey)) {
- return std::make_shared<Notification<Status>>(Status(
+ return std::make_shared<Notification<RemoteCommandResponse>>(Status(
ErrorCodes::IncompatibleShardingMetadata,
stream() << "Chunk " << ChunkRange(migrateInfo.minKey, migrateInfo.maxKey).toString()
<< " does not exist."));
@@ -565,14 +544,16 @@ shared_ptr<Notification<Status>> MigrationManager::_schedule(
const auto fromShardStatus = Grid::get(txn)->shardRegistry()->getShard(txn, migrateInfo.from);
if (!fromShardStatus.isOK()) {
- return std::make_shared<Notification<Status>>(std::move(fromShardStatus.getStatus()));
+ return std::make_shared<Notification<RemoteCommandResponse>>(
+ std::move(fromShardStatus.getStatus()));
}
const auto fromShard = fromShardStatus.getValue();
auto fromHostStatus =
fromShard->getTargeter()->findHost(txn, ReadPreferenceSetting{ReadPreference::PrimaryOnly});
if (!fromHostStatus.isOK()) {
- return std::make_shared<Notification<Status>>(std::move(fromHostStatus.getStatus()));
+ return std::make_shared<Notification<RemoteCommandResponse>>(
+ std::move(fromHostStatus.getStatus()));
}
BSONObjBuilder builder;
@@ -593,7 +574,7 @@ shared_ptr<Notification<Status>> MigrationManager::_schedule(
stdx::lock_guard<stdx::mutex> lock(_mutex);
if (_state != State::kEnabled && _state != State::kRecovering) {
- return std::make_shared<Notification<Status>>(
+ return std::make_shared<Notification<RemoteCommandResponse>>(
Status(ErrorCodes::BalancerInterrupted,
"Migration cannot be executed because the balancer is not running"));
}
@@ -652,11 +633,7 @@ void MigrationManager::_scheduleWithDistLock_inlock(OperationContext* txn,
auto txn = cc().makeOperationContext();
stdx::lock_guard<stdx::mutex> lock(_mutex);
- _completeWithDistLock_inlock(
- txn.get(),
- itMigration,
- extractMigrationStatusFromRemoteCommandResponse(
- args.response, _state != State::kEnabled && _state != State::kRecovering));
+ _completeWithDistLock_inlock(txn.get(), itMigration, args.response);
});
if (callbackHandleWithStatus.isOK()) {
@@ -667,9 +644,10 @@ void MigrationManager::_scheduleWithDistLock_inlock(OperationContext* txn,
_completeWithDistLock_inlock(txn, itMigration, std::move(callbackHandleWithStatus.getStatus()));
}
-void MigrationManager::_completeWithDistLock_inlock(OperationContext* txn,
- MigrationsList::iterator itMigration,
- Status status) {
+void MigrationManager::_completeWithDistLock_inlock(
+ OperationContext* txn,
+ MigrationsList::iterator itMigration,
+ const RemoteCommandResponse& remoteCommandResponse) {
const NamespaceString nss(itMigration->nss);
// Make sure to signal the notification last, after the distributed lock is freed, so that we
@@ -690,7 +668,7 @@ void MigrationManager::_completeWithDistLock_inlock(OperationContext* txn,
_checkDrained_inlock();
}
- notificationToSignal->set(status);
+ notificationToSignal->set(remoteCommandResponse);
}
void MigrationManager::_scheduleWithoutDistLock_inlock(OperationContext* txn,
@@ -715,8 +693,7 @@ void MigrationManager::_scheduleWithoutDistLock_inlock(OperationContext* txn,
_activeMigrationsWithoutDistLock.erase(itMigration);
_checkDrained_inlock();
- notificationToSignal->set(extractMigrationStatusFromRemoteCommandResponse(
- args.response, _state != State::kEnabled && _state != State::kRecovering));
+ notificationToSignal->set(args.response);
});
if (callbackHandleWithStatus.isOK()) {
@@ -771,10 +748,36 @@ void MigrationManager::_abandonActiveMigrationsAndEnableManager(OperationContext
_condVar.notify_all();
}
+Status MigrationManager::_processRemoteCommandResponse(
+ const RemoteCommandResponse& remoteCommandResponse,
+ ScopedMigrationRequest* scopedMigrationRequest) {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ Status commandStatus(ErrorCodes::InternalError, "Uninitialized value.");
+ if (isErrorDueToConfigStepdown(remoteCommandResponse.status,
+ _state != State::kEnabled && _state != State::kRecovering)) {
+ scopedMigrationRequest->keepDocumentOnDestruct();
+ commandStatus = Status(ErrorCodes::BalancerInterrupted,
+ stream() << "Migration interrupted because the balancer is stopping."
+ << causedBy(remoteCommandResponse.status));
+ } else if (!remoteCommandResponse.isOK()) {
+ commandStatus = remoteCommandResponse.status;
+ } else {
+ commandStatus = extractMigrationStatusFromCommandResponse(remoteCommandResponse.data);
+ if (!commandStatus.isOK() && commandStatus != ErrorCodes::LockBusy &&
+ commandStatus != ErrorCodes::ChunkTooBig) {
+ commandStatus = Status(ErrorCodes::OperationFailed,
+ stream() << "moveChunk command failed on source shard."
+ << causedBy(commandStatus));
+ }
+ }
+
+ return commandStatus;
+}
+
MigrationManager::Migration::Migration(NamespaceString inNss, BSONObj inMoveChunkCmdObj)
: nss(std::move(inNss)),
moveChunkCmdObj(std::move(inMoveChunkCmdObj)),
- completionNotification(std::make_shared<Notification<Status>>()) {}
+ completionNotification(std::make_shared<Notification<RemoteCommandResponse>>()) {}
MigrationManager::Migration::~Migration() {
invariant(completionNotification);
diff --git a/src/mongo/db/s/balancer/migration_manager.h b/src/mongo/db/s/balancer/migration_manager.h
index f7730ba65e1..0c80f4f569a 100644
--- a/src/mongo/db/s/balancer/migration_manager.h
+++ b/src/mongo/db/s/balancer/migration_manager.h
@@ -47,6 +47,7 @@
namespace mongo {
class OperationContext;
+class ScopedMigrationRequest;
class ServiceContext;
class Status;
template <typename T>
@@ -164,7 +165,7 @@ private:
boost::optional<executor::TaskExecutor::CallbackHandle> callbackHandle;
// Notification, which will be signaled when the migration completes
- std::shared_ptr<Notification<Status>> completionNotification;
+ std::shared_ptr<Notification<executor::RemoteCommandResponse>> completionNotification;
};
// Used as a type in which to store a list of active migrations. The reason to choose list is
@@ -199,7 +200,7 @@ private:
* The 'shardTakesCollectionDistLock' parameter controls whether the distributed lock is
* acquired by the migration manager or by the shard executing the migration request.
*/
- std::shared_ptr<Notification<Status>> _schedule(
+ std::shared_ptr<Notification<executor::RemoteCommandResponse>> _schedule(
OperationContext* txn,
const MigrateInfo& migrateInfo,
bool shardTakesCollectionDistLock,
@@ -226,7 +227,7 @@ private:
*/
void _completeWithDistLock_inlock(OperationContext* txn,
MigrationsList::iterator itMigration,
- Status status);
+ const executor::RemoteCommandResponse& remoteCommandResponse);
/**
* Immediately schedules the specified migration without attempting to acquire the collection
@@ -260,6 +261,22 @@ private:
*/
void _abandonActiveMigrationsAndEnableManager(OperationContext* txn);
+ /**
+ * Parses a moveChunk RemoteCommandResponse's two levels of Status objects and distinguishes
+ * between errors generated by this config server and the shard primary to which the moveChunk
+ * command was sent.
+ *
+ * If the command failed because of stepdown of this config server, the migration document
+ * managed by 'scopedMigrationRequest' is saved for later balancer recovery and a
+ * BalancerInterrupted error is returned. If the command failed because the shard to which the
+ * command was sent returned an error, then the error is converted into OperationFailed.
+ * Shard LockBusy and ChunkTooBig errors are returned without any conversions, as special cases
+ * that can dictate further action. Otherwise, the status is returned as it is found.
+ */
+ Status _processRemoteCommandResponse(
+ const executor::RemoteCommandResponse& remoteCommandResponse,
+ ScopedMigrationRequest* scopedMigrationRequest);
+
// The service context under which this migration manager runs.
ServiceContext* const _serviceContext;
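
As additional context for the 3.2 compatibility note in extractMigrationStatusFromCommandResponse:
shards running 3.2 or earlier report an oversized chunk by setting a boolean field in an otherwise
generic error reply instead of returning a ChunkTooBig error code, and the parser upgrades that
field into the modern code. The minimal sketch below illustrates the idea; the Reply struct, its
field layout, and the interpretLegacyReply name are simplified stand-ins for the real BSON
handling, not MongoDB API.

// Rough illustration of the legacy "chunkTooBig" shim, assuming a flattened command reply.
#include <string>

struct Reply {
    bool ok;                // the command reply's "ok" field
    bool chunkTooBig;       // legacy 3.2-era flag; treated as false when absent
    std::string errmsg;     // error message reported by the shard
};

enum class MigrationOutcome { OK, ChunkTooBig, OtherError };

MigrationOutcome interpretLegacyReply(const Reply& reply) {
    if (reply.ok) {
        return MigrationOutcome::OK;
    }
    // Upgrade the legacy boolean into the modern error code so that callers can react uniformly
    // (for example by splitting the chunk), regardless of the shard's version.
    return reply.chunkTooBig ? MigrationOutcome::ChunkTooBig : MigrationOutcome::OtherError;
}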