summary refs log tree commit diff
path: root/src/mongo/db/s/balancer/migration_manager.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/s/balancer/migration_manager.cpp')
-rw-r--r--  src/mongo/db/s/balancer/migration_manager.cpp  88
1 files changed, 45 insertions, 43 deletions
diff --git a/src/mongo/db/s/balancer/migration_manager.cpp b/src/mongo/db/s/balancer/migration_manager.cpp
index 7882201e8c2..7f267b97e67 100644
--- a/src/mongo/db/s/balancer/migration_manager.cpp
+++ b/src/mongo/db/s/balancer/migration_manager.cpp
@@ -111,7 +111,7 @@ MigrationManager::~MigrationManager() {
}
MigrationStatuses MigrationManager::executeMigrationsForAutoBalance(
- OperationContext* txn,
+ OperationContext* opCtx,
const vector<MigrateInfo>& migrateInfos,
uint64_t maxChunkSizeBytes,
const MigrationSecondaryThrottleOptions& secondaryThrottle,
@@ -127,7 +127,7 @@ MigrationStatuses MigrationManager::executeMigrationsForAutoBalance(
// Write a document to the config.migrations collection, in case this migration must be
// recovered by the Balancer. Fail if the chunk is already moving.
auto statusWithScopedMigrationRequest =
- ScopedMigrationRequest::writeMigration(txn, migrateInfo, waitForDelete);
+ ScopedMigrationRequest::writeMigration(opCtx, migrateInfo, waitForDelete);
if (!statusWithScopedMigrationRequest.isOK()) {
migrationStatuses.emplace(migrateInfo.getName(),
std::move(statusWithScopedMigrationRequest.getStatus()));
@@ -137,7 +137,7 @@ MigrationStatuses MigrationManager::executeMigrationsForAutoBalance(
std::move(statusWithScopedMigrationRequest.getValue()));
responses.emplace_back(
- _schedule(txn, migrateInfo, maxChunkSizeBytes, secondaryThrottle, waitForDelete),
+ _schedule(opCtx, migrateInfo, maxChunkSizeBytes, secondaryThrottle, waitForDelete),
migrateInfo);
}
@@ -162,7 +162,7 @@ MigrationStatuses MigrationManager::executeMigrationsForAutoBalance(
}
Status MigrationManager::executeManualMigration(
- OperationContext* txn,
+ OperationContext* opCtx,
const MigrateInfo& migrateInfo,
uint64_t maxChunkSizeBytes,
const MigrationSecondaryThrottleOptions& secondaryThrottle,
@@ -172,15 +172,15 @@ Status MigrationManager::executeManualMigration(
// Write a document to the config.migrations collection, in case this migration must be
// recovered by the Balancer. Fail if the chunk is already moving.
auto statusWithScopedMigrationRequest =
- ScopedMigrationRequest::writeMigration(txn, migrateInfo, waitForDelete);
+ ScopedMigrationRequest::writeMigration(opCtx, migrateInfo, waitForDelete);
if (!statusWithScopedMigrationRequest.isOK()) {
return statusWithScopedMigrationRequest.getStatus();
}
RemoteCommandResponse remoteCommandResponse =
- _schedule(txn, migrateInfo, maxChunkSizeBytes, secondaryThrottle, waitForDelete)->get();
+ _schedule(opCtx, migrateInfo, maxChunkSizeBytes, secondaryThrottle, waitForDelete)->get();
- auto scopedCMStatus = ScopedChunkManager::refreshAndGet(txn, NamespaceString(migrateInfo.ns));
+ auto scopedCMStatus = ScopedChunkManager::refreshAndGet(opCtx, NamespaceString(migrateInfo.ns));
if (!scopedCMStatus.isOK()) {
return scopedCMStatus.getStatus();
}
@@ -204,7 +204,7 @@ Status MigrationManager::executeManualMigration(
return commandStatus;
}
-void MigrationManager::startRecoveryAndAcquireDistLocks(OperationContext* txn) {
+void MigrationManager::startRecoveryAndAcquireDistLocks(OperationContext* opCtx) {
{
stdx::lock_guard<stdx::mutex> lock(_mutex);
invariant(_state == State::kStopped);
@@ -214,15 +214,15 @@ void MigrationManager::startRecoveryAndAcquireDistLocks(OperationContext* txn) {
auto scopedGuard = MakeGuard([&] {
_migrationRecoveryMap.clear();
- _abandonActiveMigrationsAndEnableManager(txn);
+ _abandonActiveMigrationsAndEnableManager(opCtx);
});
- auto distLockManager = Grid::get(txn)->catalogClient(txn)->getDistLockManager();
+ auto distLockManager = Grid::get(opCtx)->catalogClient(opCtx)->getDistLockManager();
// Load the active migrations from the config.migrations collection.
auto statusWithMigrationsQueryResponse =
- Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
- txn,
+ Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+ opCtx,
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
repl::ReadConcernLevel::kLocalReadConcern,
NamespaceString(MigrationType::ConfigNS),
@@ -260,7 +260,7 @@ void MigrationManager::startRecoveryAndAcquireDistLocks(OperationContext* txn) {
<< migrateType.getNss().ns());
auto statusWithDistLockHandle = distLockManager->tryLockWithLocalWriteConcern(
- txn, migrateType.getNss().ns(), whyMessage, _lockSessionID);
+ opCtx, migrateType.getNss().ns(), whyMessage, _lockSessionID);
if (!statusWithDistLockHandle.isOK()) {
log() << "Failed to acquire distributed lock for collection '"
<< migrateType.getNss().ns()
@@ -277,7 +277,7 @@ void MigrationManager::startRecoveryAndAcquireDistLocks(OperationContext* txn) {
scopedGuard.Dismiss();
}
-void MigrationManager::finishRecovery(OperationContext* txn,
+void MigrationManager::finishRecovery(OperationContext* opCtx,
uint64_t maxChunkSizeBytes,
const MigrationSecondaryThrottleOptions& secondaryThrottle) {
{
@@ -298,7 +298,7 @@ void MigrationManager::finishRecovery(OperationContext* txn,
auto scopedGuard = MakeGuard([&] {
_migrationRecoveryMap.clear();
- _abandonActiveMigrationsAndEnableManager(txn);
+ _abandonActiveMigrationsAndEnableManager(opCtx);
});
// Schedule recovered migrations.
@@ -310,7 +310,7 @@ void MigrationManager::finishRecovery(OperationContext* txn,
auto& migrateInfos = nssAndMigrateInfos.second;
invariant(!migrateInfos.empty());
- auto scopedCMStatus = ScopedChunkManager::refreshAndGet(txn, nss);
+ auto scopedCMStatus = ScopedChunkManager::refreshAndGet(opCtx, nss);
if (!scopedCMStatus.isOK()) {
// This shouldn't happen because the collection was intact and sharded when the previous
// config primary was active and the dist locks have been held by the balancer
@@ -338,23 +338,23 @@ void MigrationManager::finishRecovery(OperationContext* txn,
if (chunk->getShardId() != migrationInfo.from) {
// Chunk is no longer on the source shard specified by this migration. Erase the
// migration recovery document associated with it.
- ScopedMigrationRequest::createForRecovery(txn, nss, migrationInfo.minKey);
+ ScopedMigrationRequest::createForRecovery(opCtx, nss, migrationInfo.minKey);
continue;
}
scopedMigrationRequests.emplace_back(
- ScopedMigrationRequest::createForRecovery(txn, nss, migrationInfo.minKey));
+ ScopedMigrationRequest::createForRecovery(opCtx, nss, migrationInfo.minKey));
scheduledMigrations++;
- responses.emplace_back(
- _schedule(txn, migrationInfo, maxChunkSizeBytes, secondaryThrottle, waitForDelete));
+ responses.emplace_back(_schedule(
+ opCtx, migrationInfo, maxChunkSizeBytes, secondaryThrottle, waitForDelete));
}
// If no migrations were scheduled for this namespace, free the dist lock
if (!scheduledMigrations) {
- Grid::get(txn)->catalogClient(txn)->getDistLockManager()->unlock(
- txn, _lockSessionID, nss.ns());
+ Grid::get(opCtx)->catalogClient(opCtx)->getDistLockManager()->unlock(
+ opCtx, _lockSessionID, nss.ns());
}
}
@@ -408,7 +408,7 @@ void MigrationManager::drainActiveMigrations() {
}
shared_ptr<Notification<RemoteCommandResponse>> MigrationManager::_schedule(
- OperationContext* txn,
+ OperationContext* opCtx,
const MigrateInfo& migrateInfo,
uint64_t maxChunkSizeBytes,
const MigrationSecondaryThrottleOptions& secondaryThrottle,
@@ -425,15 +425,16 @@ shared_ptr<Notification<RemoteCommandResponse>> MigrationManager::_schedule(
}
}
- const auto fromShardStatus = Grid::get(txn)->shardRegistry()->getShard(txn, migrateInfo.from);
+ const auto fromShardStatus =
+ Grid::get(opCtx)->shardRegistry()->getShard(opCtx, migrateInfo.from);
if (!fromShardStatus.isOK()) {
return std::make_shared<Notification<RemoteCommandResponse>>(
std::move(fromShardStatus.getStatus()));
}
const auto fromShard = fromShardStatus.getValue();
- auto fromHostStatus =
- fromShard->getTargeter()->findHost(txn, ReadPreferenceSetting{ReadPreference::PrimaryOnly});
+ auto fromHostStatus = fromShard->getTargeter()->findHost(
+ opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly});
if (!fromHostStatus.isOK()) {
return std::make_shared<Notification<RemoteCommandResponse>>(
std::move(fromHostStatus.getStatus()));
@@ -444,7 +445,7 @@ shared_ptr<Notification<RemoteCommandResponse>> MigrationManager::_schedule(
&builder,
nss,
migrateInfo.version,
- repl::ReplicationCoordinator::get(txn)->getConfig().getConnectionString(),
+ repl::ReplicationCoordinator::get(opCtx)->getConfig().getConnectionString(),
migrateInfo.from,
migrateInfo.to,
ChunkRange(migrateInfo.minKey, migrateInfo.maxKey),
@@ -464,15 +465,16 @@ shared_ptr<Notification<RemoteCommandResponse>> MigrationManager::_schedule(
auto retVal = migration.completionNotification;
- _schedule_inlock(txn, fromHostStatus.getValue(), std::move(migration));
+ _schedule_inlock(opCtx, fromHostStatus.getValue(), std::move(migration));
return retVal;
}
-void MigrationManager::_schedule_inlock(OperationContext* txn,
+void MigrationManager::_schedule_inlock(OperationContext* opCtx,
const HostAndPort& targetHost,
Migration migration) {
- executor::TaskExecutor* const executor = Grid::get(txn)->getExecutorPool()->getFixedExecutor();
+ executor::TaskExecutor* const executor =
+ Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
const NamespaceString nss(migration.nss);
@@ -482,8 +484,8 @@ void MigrationManager::_schedule_inlock(OperationContext* txn,
// Acquire the collection distributed lock (blocking call)
auto statusWithDistLockHandle =
- Grid::get(txn)->catalogClient(txn)->getDistLockManager()->lockWithSessionID(
- txn,
+ Grid::get(opCtx)->catalogClient(opCtx)->getDistLockManager()->lockWithSessionID(
+ opCtx,
nss.ns(),
whyMessage,
_lockSessionID,
@@ -508,7 +510,7 @@ void MigrationManager::_schedule_inlock(OperationContext* txn,
auto itMigration = migrations->begin();
const RemoteCommandRequest remoteRequest(
- targetHost, NamespaceString::kAdminDb.toString(), itMigration->moveChunkCmdObj, txn);
+ targetHost, NamespaceString::kAdminDb.toString(), itMigration->moveChunkCmdObj, opCtx);
StatusWith<executor::TaskExecutor::CallbackHandle> callbackHandleWithStatus =
executor->scheduleRemoteCommand(
@@ -516,10 +518,10 @@ void MigrationManager::_schedule_inlock(OperationContext* txn,
[this, itMigration](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {
Client::initThread(getThreadName().c_str());
ON_BLOCK_EXIT([&] { Client::destroy(); });
- auto txn = cc().makeOperationContext();
+ auto opCtx = cc().makeOperationContext();
stdx::lock_guard<stdx::mutex> lock(_mutex);
- _complete_inlock(txn.get(), itMigration, args.response);
+ _complete_inlock(opCtx.get(), itMigration, args.response);
});
if (callbackHandleWithStatus.isOK()) {
@@ -527,10 +529,10 @@ void MigrationManager::_schedule_inlock(OperationContext* txn,
return;
}
- _complete_inlock(txn, itMigration, std::move(callbackHandleWithStatus.getStatus()));
+ _complete_inlock(opCtx, itMigration, std::move(callbackHandleWithStatus.getStatus()));
}
-void MigrationManager::_complete_inlock(OperationContext* txn,
+void MigrationManager::_complete_inlock(OperationContext* opCtx,
MigrationsList::iterator itMigration,
const RemoteCommandResponse& remoteCommandResponse) {
const NamespaceString nss(itMigration->nss);
@@ -547,8 +549,8 @@ void MigrationManager::_complete_inlock(OperationContext* txn,
migrations->erase(itMigration);
if (migrations->empty()) {
- Grid::get(txn)->catalogClient(txn)->getDistLockManager()->unlock(
- txn, _lockSessionID, nss.ns());
+ Grid::get(opCtx)->catalogClient(opCtx)->getDistLockManager()->unlock(
+ opCtx, _lockSessionID, nss.ns());
_activeMigrations.erase(it);
_checkDrained_inlock();
}
@@ -572,7 +574,7 @@ void MigrationManager::_waitForRecovery() {
_condVar.wait(lock, [this] { return _state != State::kRecovering; });
}
-void MigrationManager::_abandonActiveMigrationsAndEnableManager(OperationContext* txn) {
+void MigrationManager::_abandonActiveMigrationsAndEnableManager(OperationContext* opCtx) {
stdx::unique_lock<stdx::mutex> lock(_mutex);
if (_state == State::kStopping) {
// The balancer was interrupted. Let the next balancer recover the state.
@@ -580,16 +582,16 @@ void MigrationManager::_abandonActiveMigrationsAndEnableManager(OperationContext
}
invariant(_state == State::kRecovering);
- auto catalogClient = Grid::get(txn)->catalogClient(txn);
+ auto catalogClient = Grid::get(opCtx)->catalogClient(opCtx);
// Unlock all balancer distlocks we aren't using anymore.
auto distLockManager = catalogClient->getDistLockManager();
- distLockManager->unlockAll(txn, distLockManager->getProcessID());
+ distLockManager->unlockAll(opCtx, distLockManager->getProcessID());
// Clear the config.migrations collection so that those chunks can be scheduled for migration
// again.
catalogClient->removeConfigDocuments(
- txn, MigrationType::ConfigNS, BSONObj(), kMajorityWriteConcern);
+ opCtx, MigrationType::ConfigNS, BSONObj(), kMajorityWriteConcern);
_state = State::kEnabled;
_condVar.notify_all();