Diffstat (limited to 'src/mongo/db/s/balancer/migration_manager.cpp')
-rw-r--r--  src/mongo/db/s/balancer/migration_manager.cpp | 188
1 file changed, 37 insertions(+), 151 deletions(-)
diff --git a/src/mongo/db/s/balancer/migration_manager.cpp b/src/mongo/db/s/balancer/migration_manager.cpp
index bd8465338f0..6bccabf7326 100644
--- a/src/mongo/db/s/balancer/migration_manager.cpp
+++ b/src/mongo/db/s/balancer/migration_manager.cpp
@@ -61,15 +61,17 @@ using str::stream;
namespace {
-const char kChunkTooBig[] = "chunkTooBig";
+const char kChunkTooBig[] = "chunkTooBig"; // TODO: delete in 3.8
const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority,
WriteConcernOptions::SyncMode::UNSET,
Seconds(15));
/**
* Parses the 'commandResponse' and converts it to a status to use as the outcome of the command.
- * Preserves backwards compatibility with 3.2 and earlier shards that, rather than use a ChunkTooBig
+ * Preserves backwards compatibility with 3.4 and earlier shards that, rather than use a ChunkTooBig
* error code, include an extra field in the response.
+ *
+ * TODO: Delete in 3.8
*/
Status extractMigrationStatusFromCommandResponse(const BSONObj& commandResponse) {
Status commandStatus = getStatusFromCommandResult(commandResponse);
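Only the first line of this helper's body appears in the hunk; a minimal sketch of the legacy handling the comment describes, assuming older shards simply attach a boolean "chunkTooBig" field to a failed moveChunk response instead of returning the ChunkTooBig error code, might look like this:

    Status extractMigrationStatusFromCommandResponse(const BSONObj& commandResponse) {
        Status commandStatus = getStatusFromCommandResult(commandResponse);

        // Sketch only: 3.4 and earlier shards flag an oversized chunk with an extra
        // boolean field rather than the ChunkTooBig error code, so translate it here.
        if (!commandStatus.isOK() && commandResponse[kChunkTooBig].trueValue()) {
            commandStatus = {ErrorCodes::ChunkTooBig, commandStatus.reason()};
        }

        return commandStatus;
    }

The TODO markers added in this hunk let this translation, together with the kChunkTooBig constant, be deleted wholesale in 3.8 once the compatibility window closes.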
@@ -98,23 +100,13 @@ StatusWith<DistLockHandle> acquireDistLock(OperationContext* txn,
txn, nss.ns(), whyMessage, lockSessionID, DistLockManager::kSingleLockAttemptTimeout);
if (!statusWithDistLockHandle.isOK()) {
- // If we get LockBusy while trying to acquire the collection distributed lock, this implies
- // that a concurrent collection operation is running either on a 3.2 shard or on mongos.
- // Convert it to ConflictingOperationInProgress to better indicate the error.
- //
- // In addition, the code which re-schedules parallel migrations serially for 3.2 shard
- // compatibility uses the LockBusy code as a hint to do the reschedule.
- const ErrorCodes::Error code = (statusWithDistLockHandle == ErrorCodes::LockBusy
- ? ErrorCodes::ConflictingOperationInProgress
- : statusWithDistLockHandle.getStatus().code());
-
- return {code,
+ return {statusWithDistLockHandle.getStatus().code(),
stream() << "Could not acquire collection lock for " << nss.ns()
<< " to migrate chunks, due to "
<< statusWithDistLockHandle.getStatus().reason()};
}
- return std::move(statusWithDistLockHandle.getValue());
+ return statusWithDistLockHandle;
}
/**
@@ -134,7 +126,7 @@ MigrationManager::MigrationManager(ServiceContext* serviceContext)
MigrationManager::~MigrationManager() {
// The migration manager must be completely quiesced at destruction time
- invariant(_activeMigrationsWithoutDistLock.empty());
+ invariant(_activeMigrations.empty());
}
MigrationStatuses MigrationManager::executeMigrationsForAutoBalance(
@@ -146,8 +138,6 @@ MigrationStatuses MigrationManager::executeMigrationsForAutoBalance(
MigrationStatuses migrationStatuses;
- vector<MigrateInfo> rescheduledMigrations;
-
{
std::map<MigrationIdentifier, ScopedMigrationRequest> scopedMigrationRequests;
vector<std::pair<shared_ptr<Notification<RemoteCommandResponse>>, MigrateInfo>> responses;
@@ -165,18 +155,12 @@ MigrationStatuses MigrationManager::executeMigrationsForAutoBalance(
scopedMigrationRequests.emplace(migrateInfo.getName(),
std::move(statusWithScopedMigrationRequest.getValue()));
- responses.emplace_back(_schedule(txn,
- migrateInfo,
- false, // Config server takes the collection dist lock
- maxChunkSizeBytes,
- secondaryThrottle,
- waitForDelete),
- migrateInfo);
+ responses.emplace_back(
+ _schedule(txn, migrateInfo, maxChunkSizeBytes, secondaryThrottle, waitForDelete),
+ migrateInfo);
}
- // Wait for all the scheduled migrations to complete and note the ones, which failed with a
- // LockBusy error code. These need to be executed serially, without the distributed lock
- // being held by the config server for backwards compatibility with 3.2 shards.
+ // Wait for all the scheduled migrations to complete.
for (auto& response : responses) {
auto notification = std::move(response.first);
auto migrateInfo = std::move(response.second);
@@ -187,39 +171,8 @@ MigrationStatuses MigrationManager::executeMigrationsForAutoBalance(
invariant(it != scopedMigrationRequests.end());
Status commandStatus =
_processRemoteCommandResponse(remoteCommandResponse, &it->second);
- if (commandStatus == ErrorCodes::LockBusy) {
- rescheduledMigrations.emplace_back(std::move(migrateInfo));
- } else {
- migrationStatuses.emplace(migrateInfo.getName(), std::move(commandStatus));
- }
- }
- }
-
- // Schedule all 3.2 compatibility migrations sequentially
- for (const auto& migrateInfo : rescheduledMigrations) {
- // Write a document to the config.migrations collection, in case this migration must be
- // recovered by the Balancer. Fail if the chunk is already moving.
- auto statusWithScopedMigrationRequest =
- ScopedMigrationRequest::writeMigration(txn, migrateInfo, waitForDelete);
- if (!statusWithScopedMigrationRequest.isOK()) {
- migrationStatuses.emplace(migrateInfo.getName(),
- std::move(statusWithScopedMigrationRequest.getStatus()));
- continue;
+ migrationStatuses.emplace(migrateInfo.getName(), std::move(commandStatus));
}
-
- RemoteCommandResponse remoteCommandResponse =
- _schedule(txn,
- migrateInfo,
- true, // Shard takes the collection dist lock
- maxChunkSizeBytes,
- secondaryThrottle,
- waitForDelete)
- ->get();
-
- Status commandStatus = _processRemoteCommandResponse(
- remoteCommandResponse, &statusWithScopedMigrationRequest.getValue());
-
- migrationStatuses.emplace(migrateInfo.getName(), std::move(commandStatus));
}
invariant(migrationStatuses.size() == migrateInfos.size());
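With the LockBusy rescheduling removed, executeMigrationsForAutoBalance reduces to a plain fan-out/fan-in: every migration is scheduled first, and only then does the loop block on each completion notification. A self-contained sketch of that pattern, using std::future and placeholder types instead of the executor's Notification and MigrateInfo:

    #include <future>
    #include <map>
    #include <string>
    #include <utility>
    #include <vector>

    struct FakeMigration { std::string name; };   // placeholder for MigrateInfo

    std::map<std::string, int> executeAll(const std::vector<FakeMigration>& migrations) {
        // Fan out: start every migration before waiting on any of them.
        std::vector<std::pair<std::future<int>, FakeMigration>> responses;
        for (const auto& m : migrations) {
            // 0 stands in for an OK Status in this sketch.
            responses.emplace_back(std::async(std::launch::async, [] { return 0; }), m);
        }

        // Fan in: wait for each one and record its outcome, as the loop above does.
        std::map<std::string, int> statuses;
        for (auto& r : responses) {
            statuses.emplace(r.second.name, r.first.get());
        }
        return statuses;
    }

The invariant that migrationStatuses.size() equals migrateInfos.size() then holds trivially, since every scheduled migration contributes exactly one entry.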
@@ -244,13 +197,7 @@ Status MigrationManager::executeManualMigration(
}
RemoteCommandResponse remoteCommandResponse =
- _schedule(txn,
- migrateInfo,
- false, // Config server takes the collection dist lock
- maxChunkSizeBytes,
- secondaryThrottle,
- waitForDelete)
- ->get();
+ _schedule(txn, migrateInfo, maxChunkSizeBytes, secondaryThrottle, waitForDelete)->get();
auto scopedCMStatus = ScopedChunkManager::refreshAndGet(txn, NamespaceString(migrateInfo.ns));
if (!scopedCMStatus.isOK()) {
@@ -343,10 +290,7 @@ void MigrationManager::startRecoveryAndAcquireDistLocks(OperationContext* txn) {
auto statusWithDistLockHandle = distLockManager->tryLockWithLocalWriteConcern(
txn, migrateType.getNss().ns(), whyMessage, _lockSessionID);
- if (!statusWithDistLockHandle.isOK() &&
- statusWithDistLockHandle.getStatus() != ErrorCodes::LockBusy) {
- // LockBusy is alright because that should mean a 3.2 shard has it for the active
- // migration.
+ if (!statusWithDistLockHandle.isOK()) {
log() << "Failed to acquire distributed lock for collection '"
<< migrateType.getNss().ns()
<< "' during balancer recovery of an active migration. Abandoning"
@@ -432,12 +376,8 @@ void MigrationManager::finishRecovery(OperationContext* txn,
scheduledMigrations++;
- responses.emplace_back(_schedule(txn,
- migrationInfo,
- false, // Config server takes the collection dist lock
- maxChunkSizeBytes,
- secondaryThrottle,
- waitForDelete));
+ responses.emplace_back(
+ _schedule(txn, migrationInfo, maxChunkSizeBytes, secondaryThrottle, waitForDelete));
}
// If no migrations were scheduled for this namespace, free the dist lock
@@ -473,7 +413,7 @@ void MigrationManager::interruptAndDisableMigrations() {
_state = State::kStopping;
// Interrupt any active migrations with dist lock
- for (auto& cmsEntry : _activeMigrationsWithDistLock) {
+ for (auto& cmsEntry : _activeMigrations) {
auto* cms = &cmsEntry.second;
for (auto& migration : cms->migrations) {
@@ -483,13 +423,6 @@ void MigrationManager::interruptAndDisableMigrations() {
}
}
- // Interrupt any active migrations without dist lock
- for (auto& migration : _activeMigrationsWithoutDistLock) {
- if (migration.callbackHandle) {
- executor->cancel(*migration.callbackHandle);
- }
- }
-
_checkDrained_inlock();
}
@@ -499,18 +432,13 @@ void MigrationManager::drainActiveMigrations() {
if (_state == State::kStopped)
return;
invariant(_state == State::kStopping);
-
- _condVar.wait(lock, [this] {
- return _activeMigrationsWithDistLock.empty() && _activeMigrationsWithoutDistLock.empty();
- });
-
+ _condVar.wait(lock, [this] { return _activeMigrations.empty(); });
_state = State::kStopped;
}
shared_ptr<Notification<RemoteCommandResponse>> MigrationManager::_schedule(
OperationContext* txn,
const MigrateInfo& migrateInfo,
- bool shardTakesCollectionDistLock,
uint64_t maxChunkSizeBytes,
const MigrationSecondaryThrottleOptions& secondaryThrottle,
bool waitForDelete) {
@@ -575,8 +503,7 @@ shared_ptr<Notification<RemoteCommandResponse>> MigrationManager::_schedule(
chunk->getLastmod(),
maxChunkSizeBytes,
secondaryThrottle,
- waitForDelete,
- shardTakesCollectionDistLock);
+ waitForDelete);
stdx::lock_guard<stdx::mutex> lock(_mutex);
@@ -590,24 +517,20 @@ shared_ptr<Notification<RemoteCommandResponse>> MigrationManager::_schedule(
auto retVal = migration.completionNotification;
- if (shardTakesCollectionDistLock) {
- _scheduleWithoutDistLock_inlock(txn, fromHostStatus.getValue(), std::move(migration));
- } else {
- _scheduleWithDistLock_inlock(txn, fromHostStatus.getValue(), std::move(migration));
- }
+ _schedule_inlock(txn, fromHostStatus.getValue(), std::move(migration));
return retVal;
}
-void MigrationManager::_scheduleWithDistLock_inlock(OperationContext* txn,
- const HostAndPort& targetHost,
- Migration migration) {
+void MigrationManager::_schedule_inlock(OperationContext* txn,
+ const HostAndPort& targetHost,
+ Migration migration) {
executor::TaskExecutor* const executor = Grid::get(txn)->getExecutorPool()->getFixedExecutor();
const NamespaceString nss(migration.nss);
- auto it = _activeMigrationsWithDistLock.find(nss);
- if (it == _activeMigrationsWithDistLock.end()) {
+ auto it = _activeMigrations.find(nss);
+ if (it == _activeMigrations.end()) {
// Acquire the collection distributed lock (blocking call)
auto distLockHandleStatus = acquireDistLock(txn, _lockSessionID, nss);
if (!distLockHandleStatus.isOK()) {
@@ -615,7 +538,7 @@ void MigrationManager::_scheduleWithDistLock_inlock(OperationContext* txn,
return;
}
- it = _activeMigrationsWithDistLock
+ it = _activeMigrations
.insert(std::make_pair(
nss, CollectionMigrationsState(std::move(distLockHandleStatus.getValue()))))
.first;
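The renamed _schedule_inlock keeps the previous per-collection lock discipline: the first migration scheduled for a namespace acquires the collection distributed lock and creates the _activeMigrations entry, later migrations for the same namespace reuse it, and _complete_inlock below frees the lock once the last migration drains out. A compact sketch of that acquire-on-first-use / release-on-last bookkeeping, with placeholder types standing in for DistLockHandle and CollectionMigrationsState:

    #include <list>
    #include <map>
    #include <string>

    struct FakeLockHandle {};                                                 // placeholder
    FakeLockHandle acquireCollectionLock(const std::string&) { return {}; }   // placeholder
    void releaseCollectionLock(const std::string&, FakeLockHandle&) {}        // placeholder

    struct CollState {                        // placeholder for CollectionMigrationsState
        FakeLockHandle lock;
        std::list<int> migrations;            // one entry per in-flight migration
    };

    std::map<std::string, CollState> activeMigrations;

    void scheduleOne(const std::string& nss, int migrationId) {
        auto it = activeMigrations.find(nss);
        if (it == activeMigrations.end()) {
            // First migration for this namespace: take the collection lock now.
            it = activeMigrations.emplace(nss, CollState{acquireCollectionLock(nss), {}}).first;
        }
        it->second.migrations.push_back(migrationId);
    }

    void completeOne(const std::string& nss, int migrationId) {
        // Assumes the entry exists, as the real code asserts with an invariant.
        auto it = activeMigrations.find(nss);
        it->second.migrations.remove(migrationId);
        if (it->second.migrations.empty()) {
            // Last migration for the namespace finished: free the lock, drop the entry.
            releaseCollectionLock(nss, it->second.lock);
            activeMigrations.erase(it);
        }
    }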
@@ -640,7 +563,7 @@ void MigrationManager::_scheduleWithDistLock_inlock(OperationContext* txn,
auto txn = cc().makeOperationContext();
stdx::lock_guard<stdx::mutex> lock(_mutex);
- _completeWithDistLock_inlock(txn.get(), itMigration, args.response);
+ _complete_inlock(txn.get(), itMigration, args.response);
});
if (callbackHandleWithStatus.isOK()) {
@@ -648,13 +571,12 @@ void MigrationManager::_scheduleWithDistLock_inlock(OperationContext* txn,
return;
}
- _completeWithDistLock_inlock(txn, itMigration, std::move(callbackHandleWithStatus.getStatus()));
+ _complete_inlock(txn, itMigration, std::move(callbackHandleWithStatus.getStatus()));
}
-void MigrationManager::_completeWithDistLock_inlock(
- OperationContext* txn,
- MigrationsList::iterator itMigration,
- const RemoteCommandResponse& remoteCommandResponse) {
+void MigrationManager::_complete_inlock(OperationContext* txn,
+ MigrationsList::iterator itMigration,
+ const RemoteCommandResponse& remoteCommandResponse) {
const NamespaceString nss(itMigration->nss);
// Make sure to signal the notification last, after the distributed lock is freed, so that we
@@ -662,8 +584,8 @@ void MigrationManager::_completeWithDistLock_inlock(
// still acquired.
auto notificationToSignal = itMigration->completionNotification;
- auto it = _activeMigrationsWithDistLock.find(nss);
- invariant(it != _activeMigrationsWithDistLock.end());
+ auto it = _activeMigrations.find(nss);
+ invariant(it != _activeMigrations.end());
auto collectionMigrationState = &it->second;
collectionMigrationState->migrations.erase(itMigration);
@@ -671,58 +593,20 @@ void MigrationManager::_completeWithDistLock_inlock(
if (collectionMigrationState->migrations.empty()) {
Grid::get(txn)->catalogClient(txn)->getDistLockManager()->unlock(
txn, collectionMigrationState->distLockHandle, nss.ns());
- _activeMigrationsWithDistLock.erase(it);
+ _activeMigrations.erase(it);
_checkDrained_inlock();
}
notificationToSignal->set(remoteCommandResponse);
}
-void MigrationManager::_scheduleWithoutDistLock_inlock(OperationContext* txn,
- const HostAndPort& targetHost,
- Migration migration) {
- executor::TaskExecutor* const executor = Grid::get(txn)->getExecutorPool()->getFixedExecutor();
-
- _activeMigrationsWithoutDistLock.push_front(std::move(migration));
- auto itMigration = _activeMigrationsWithoutDistLock.begin();
-
- const RemoteCommandRequest remoteRequest(
- targetHost, NamespaceString::kAdminDb.toString(), itMigration->moveChunkCmdObj, txn);
-
- StatusWith<executor::TaskExecutor::CallbackHandle> callbackHandleWithStatus =
- executor->scheduleRemoteCommand(
- remoteRequest,
- [this, itMigration](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {
- auto notificationToSignal = itMigration->completionNotification;
-
- stdx::lock_guard<stdx::mutex> lock(_mutex);
-
- _activeMigrationsWithoutDistLock.erase(itMigration);
- _checkDrained_inlock();
-
- notificationToSignal->set(args.response);
- });
-
- if (callbackHandleWithStatus.isOK()) {
- itMigration->callbackHandle = std::move(callbackHandleWithStatus.getValue());
- return;
- }
-
- auto notificationToSignal = itMigration->completionNotification;
-
- _activeMigrationsWithoutDistLock.erase(itMigration);
- _checkDrained_inlock();
-
- notificationToSignal->set(std::move(callbackHandleWithStatus.getStatus()));
-}
-
void MigrationManager::_checkDrained_inlock() {
if (_state == State::kEnabled || _state == State::kRecovering) {
return;
}
invariant(_state == State::kStopping);
- if (_activeMigrationsWithDistLock.empty() && _activeMigrationsWithoutDistLock.empty()) {
+ if (_activeMigrations.empty()) {
_condVar.notify_all();
}
}
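After the merge of the two active-migration containers, the shutdown path is a single condition-variable drain: interruptAndDisableMigrations flips the state to kStopping, drainActiveMigrations waits until _activeMigrations is empty, and whichever completion erases the last entry calls _checkDrained_inlock to wake the waiter. A minimal standalone sketch of that drain pattern, with generic members rather than the MigrationManager ones:

    #include <condition_variable>
    #include <list>
    #include <mutex>

    class Drainer {
    public:
        void startOne() {
            std::lock_guard<std::mutex> lk(_mutex);
            _active.push_back(1);
        }

        void finishOne() {
            std::lock_guard<std::mutex> lk(_mutex);
            if (!_active.empty())
                _active.pop_front();
            // Mirrors _checkDrained_inlock: only a stopping drainer needs the wake-up.
            if (_stopping && _active.empty())
                _condVar.notify_all();
        }

        void drain() {
            std::unique_lock<std::mutex> lk(_mutex);
            _stopping = true;
            // Mirrors drainActiveMigrations: block until the active set is empty.
            _condVar.wait(lk, [this] { return _active.empty(); });
        }

    private:
        std::mutex _mutex;
        std::condition_variable _condVar;
        std::list<int> _active;
        bool _stopping = false;
    };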
@@ -758,6 +642,7 @@ void MigrationManager::_abandonActiveMigrationsAndEnableManager(OperationContext
Status MigrationManager::_processRemoteCommandResponse(
const RemoteCommandResponse& remoteCommandResponse,
ScopedMigrationRequest* scopedMigrationRequest) {
+
stdx::lock_guard<stdx::mutex> lock(_mutex);
Status commandStatus(ErrorCodes::InternalError, "Uninitialized value.");
@@ -774,6 +659,7 @@ Status MigrationManager::_processRemoteCommandResponse(
if (!remoteCommandResponse.isOK()) {
commandStatus = remoteCommandResponse.status;
} else {
+ // TODO: delete in 3.8
commandStatus = extractMigrationStatusFromCommandResponse(remoteCommandResponse.data);
}