author     Paolo Polato <paolo.polato@mongodb.com>           2021-10-26 10:44:47 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2021-10-26 11:28:32 +0000
commit     4a8881d3f61be8e57b55a33e076bace92b3e6aad (patch)
tree       43d1e28811b14d3b450dbe1b7c5e20f4759a9674 /src
parent     6675c4f7819ae73f6cdc6658bd3ea318382ef325 (diff)
download   mongo-4a8881d3f61be8e57b55a33e076bace92b3e6aad.tar.gz
SERVER-60336 Replace the balancer's MigrationManager with a BalancerCommandsScheduler
Diffstat (limited to 'src')
 -rw-r--r--  src/mongo/db/s/SConscript                                    |  21
 -rw-r--r--  src/mongo/db/s/balancer/balancer.cpp                         | 151
 -rw-r--r--  src/mongo/db/s/balancer/balancer.h                           |  14
 -rw-r--r--  src/mongo/db/s/balancer/balancer_commands_scheduler.h        |   3
 -rw-r--r--  src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp | 216
 -rw-r--r--  src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h   | 104
 -rw-r--r--  src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp |  96
 -rw-r--r--  src/mongo/db/s/balancer/balancer_dist_locks.cpp              |  78
 -rw-r--r--  src/mongo/db/s/balancer/balancer_dist_locks.h                |  66
 -rw-r--r--  src/mongo/db/s/balancer/migration_manager.cpp                | 709
 -rw-r--r--  src/mongo/db/s/balancer/migration_manager.h                  | 286
 -rw-r--r--  src/mongo/db/s/balancer/migration_manager_test.cpp           | 700
 -rw-r--r--  src/mongo/db/s/balancer/scoped_migration_request.cpp         | 217
 -rw-r--r--  src/mongo/db/s/balancer/scoped_migration_request.h           | 120
 -rw-r--r--  src/mongo/db/s/balancer/scoped_migration_request_test.cpp    | 246
15 files changed, 482 insertions(+), 2545 deletions(-)
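
In short: the commit deletes the balancer's MigrationManager (together with its ScopedMigrationRequest persistence layer) and routes both automatic and user-requested chunk migrations through the BalancerCommandsScheduler. Callers enqueue a request and receive a response handle whose getOutcome() blocks until the remote command completes. A minimal standalone sketch of that request/response shape, using simplified stand-in types rather than the actual MongoDB classes:

```cpp
#include <future>
#include <memory>
#include <string>

// Simplified stand-in for mongo::Status; enough to show the shape.
struct Status {
    bool ok;
    std::string reason;
};

// Caller-facing handle: getOutcome() blocks until the scheduler's worker
// thread has applied the remote command's result.
class MoveChunkResponse {
public:
    explicit MoveChunkResponse(std::future<Status> outcome) : _outcome(std::move(outcome)) {}
    Status getOutcome() {
        return _outcome.get();  // blocks until the promise is fulfilled
    }

private:
    std::future<Status> _outcome;
};

class Scheduler {
public:
    std::unique_ptr<MoveChunkResponse> requestMoveChunk(/* nss, chunk, recipient, settings */) {
        std::promise<Status> outcome;
        auto response = std::make_unique<MoveChunkResponse>(outcome.get_future());
        // The real scheduler persists a recovery document, queues the request
        // and fulfils the promise from its worker thread; here we complete
        // synchronously to keep the sketch self-contained.
        outcome.set_value(Status{true, ""});
        return response;
    }
};
```

The real scheduler adds crash-recovery persistence, distributed-lock bookkeeping and a dedicated worker thread; those pieces appear in the hunks below.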
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index b0e48daf3c2..a7ff9dc9c9f 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -202,6 +202,18 @@ env.Library(
)
env.Library(
+ target='forwardable_operation_metadata',
+ source=[
+ 'forwardable_operation_metadata.cpp',
+ 'forwardable_operation_metadata.idl'
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/s/grid',
+ ]
+)
+
+env.Library(
target='sharding_catalog_manager',
source=[
'add_shard_cmd.idl',
@@ -212,12 +224,11 @@ env.Library(
'balancer/balancer_chunk_selection_policy.cpp',
'balancer/balancer_command_document.idl',
'balancer/balancer_commands_scheduler_impl.cpp',
+ 'balancer/balancer_dist_locks.cpp',
'balancer/balancer_policy.cpp',
'balancer/balancer.cpp',
'balancer/cluster_statistics_impl.cpp',
'balancer/cluster_statistics.cpp',
- 'balancer/migration_manager.cpp',
- 'balancer/scoped_migration_request.cpp',
'balancer/type_migration.cpp',
'config/initial_split_policy.cpp',
'config/sharding_catalog_manager_chunk_operations.cpp',
@@ -263,6 +274,7 @@ env.Library(
'$BUILD_DIR/mongo/s/coreshard',
'$BUILD_DIR/mongo/s/query/cluster_aggregate',
'$BUILD_DIR/mongo/util/log_and_backoff',
+ 'forwardable_operation_metadata',
'sharding_logging',
],
)
@@ -308,8 +320,6 @@ env.Library(
'drop_database_coordinator_document.idl',
'flush_database_cache_updates_command.cpp',
'flush_routing_table_cache_updates_command.cpp',
- 'forwardable_operation_metadata.cpp',
- 'forwardable_operation_metadata.idl',
'get_database_version_command.cpp',
'get_shard_version_command.cpp',
'merge_chunks_command.cpp',
@@ -374,6 +384,7 @@ env.Library(
'$BUILD_DIR/mongo/s/commands/sharded_cluster_sharding_commands',
'$BUILD_DIR/mongo/s/sharding_initialization',
'$BUILD_DIR/mongo/s/sharding_router_api',
+ 'forwardable_operation_metadata',
'sharding_runtime_d',
],
)
@@ -538,9 +549,7 @@ env.CppUnitTest(
'balancer/cluster_statistics_test.cpp',
'balancer/core_options_stub.cpp',
'balancer/balancer_commands_scheduler_test.cpp',
- 'balancer/migration_manager_test.cpp',
'balancer/migration_test_fixture.cpp',
- 'balancer/scoped_migration_request_test.cpp',
'balancer/type_migration_test.cpp',
'config_server_op_observer_test.cpp',
'config/initial_split_policy_test.cpp',
diff --git a/src/mongo/db/s/balancer/balancer.cpp b/src/mongo/db/s/balancer/balancer.cpp
index 6071f12be7a..cdd27339a84 100644
--- a/src/mongo/db/s/balancer/balancer.cpp
+++ b/src/mongo/db/s/balancer/balancer.cpp
@@ -151,14 +151,46 @@ void warnOnMultiVersion(const vector<ClusterStatistics::ShardStatistics>& cluste
return;
BSONObjBuilder shardVersions;
- for (const auto& stat : clusterStats)
+ for (const auto& stat : clusterStats) {
shardVersions << stat.shardId << stat.mongoVersion;
+ }
+
LOGV2_WARNING(21875,
"Multiversion cluster detected",
"localVersion"_attr = vii.version(),
"shardVersions"_attr = shardVersions.done());
}
+Status processManualMigrationOutcome(OperationContext* opCtx,
+ const BSONObj& chunkMin,
+ const NamespaceString& nss,
+ const ShardId& destination,
+ Status outcome) {
+ if (outcome.isOK()) {
+ return outcome;
+ }
+
+ auto swCM =
+ Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, nss);
+ if (!swCM.isOK()) {
+ return swCM.getStatus();
+ }
+
+ const auto currentChunkInfo =
+ swCM.getValue().findIntersectingChunkWithSimpleCollation(chunkMin);
+ if (currentChunkInfo.getShardId() == destination &&
+ outcome != ErrorCodes::BalancerInterrupted) {
+ // Migration calls can be interrupted after the metadata is committed but before the command
+ // finishes the waitForDelete stage. Any failovers, therefore, must always cause the
+ moveChunk command to be retried so as to ensure that the waitForDelete promise of a
+ // successful command has been fulfilled.
+ LOGV2(6036622,
+ "Migration outcome is not OK, but the transaction was committed. Returning success");
+ outcome = Status::OK();
+ }
+ return outcome;
+}
+
const auto _balancerDecoration = ServiceContext::declareDecoration<Balancer>();
const ReplicaSetAwareServiceRegistry::Registerer<Balancer> _balancerRegisterer("Balancer");
@@ -180,9 +212,7 @@ Balancer::Balancer()
_chunkSelectionPolicy(
std::make_unique<BalancerChunkSelectionPolicyImpl>(_clusterStats.get(), _random)),
_commandScheduler(std::make_unique<BalancerCommandsSchedulerImpl>()),
- _chunkMerger(std::make_unique<BalancerChunkMergerImpl>(*_commandScheduler, *_clusterStats)),
- _migrationManager(_balancerDecoration.owner(this)) {}
-
+ _chunkMerger(std::make_unique<BalancerChunkMergerImpl>(*_commandScheduler, *_clusterStats)) {}
Balancer::~Balancer() {
// Terminate the balancer thread so it doesn't leak memory.
@@ -191,9 +221,6 @@ Balancer::~Balancer() {
if (_chunkMerger) {
_chunkMerger->waitForStop();
}
- if (_commandScheduler) {
- _commandScheduler->stop();
- }
}
void Balancer::onStepUpBegin(OperationContext* opCtx, long long term) {
@@ -212,9 +239,6 @@ void Balancer::onStepDown() {
if (_chunkMerger) {
_chunkMerger->onStepDown();
}
- if (_commandScheduler) {
- _commandScheduler->stop();
- }
}
void Balancer::onBecomeArbiter() {
@@ -228,8 +252,6 @@ void Balancer::initiateBalancer(OperationContext* opCtx) {
invariant(_state == kStopped);
_state = kRunning;
- _migrationManager.startRecoveryAndAcquireDistLocks(opCtx);
-
invariant(!_thread.joinable());
invariant(!_threadOperationContext);
_thread = stdx::thread([this] { _mainThread(); });
@@ -250,12 +272,6 @@ void Balancer::interruptBalancer() {
_threadOperationContext->markKilled(ErrorCodes::InterruptedDueToReplStateChange);
}
- // Schedule a separate thread to shutdown the migration manager in order to avoid deadlock with
- // replication step down
- invariant(!_migrationManagerInterruptThread.joinable());
- _migrationManagerInterruptThread =
- stdx::thread([this] { _migrationManager.interruptAndDisableMigrations(); });
-
_condVar.notify_all();
}
@@ -301,11 +317,14 @@ Status Balancer::rebalanceSingleChunk(OperationContext* opCtx,
return refreshStatus;
}
- return _migrationManager.executeManualMigration(opCtx,
- *migrateInfo,
- balancerConfig->getMaxChunkSizeBytes(),
- balancerConfig->getSecondaryThrottle(),
- balancerConfig->waitForDelete());
+ MoveChunkSettings settings(balancerConfig->getMaxChunkSizeBytes(),
+ balancerConfig->getSecondaryThrottle(),
+ balancerConfig->waitForDelete(),
+ migrateInfo->forceJumbo);
+ auto response = _commandScheduler->requestMoveChunk(
+ opCtx, nss, chunk, migrateInfo->to, settings, true /* issuedByRemoteUser */);
+ return processManualMigrationOutcome(
+ opCtx, chunk.getMin(), nss, migrateInfo->to, response->getOutcome());
}
Status Balancer::moveSingleChunk(OperationContext* opCtx,
@@ -321,17 +340,15 @@ Status Balancer::moveSingleChunk(OperationContext* opCtx,
return moveAllowedStatus;
}
- return _migrationManager.executeManualMigration(
- opCtx,
- MigrateInfo(newShardId,
- nss,
- chunk,
- forceJumbo ? MoveChunkRequest::ForceJumbo::kForceManual
- : MoveChunkRequest::ForceJumbo::kDoNotForce,
- MigrateInfo::chunksImbalance),
- maxChunkSizeBytes,
- secondaryThrottle,
- waitForDelete);
+ MoveChunkSettings settings(maxChunkSizeBytes,
+ secondaryThrottle,
+ waitForDelete,
+ forceJumbo ? MoveChunkRequest::ForceJumbo::kForceManual
+ : MoveChunkRequest::ForceJumbo::kDoNotForce);
+ auto response = _commandScheduler->requestMoveChunk(
+ opCtx, nss, chunk, newShardId, settings, true /* issuedByRemoteUser */);
+ return processManualMigrationOutcome(
+ opCtx, chunk.getMin(), nss, newShardId, response->getOutcome());
}
void Balancer::report(OperationContext* opCtx, BSONObjBuilder* builder) {
@@ -388,13 +405,11 @@ void Balancer::_mainThread() {
break;
}
- LOGV2(21857, "CSRS balancer thread is recovering");
+ LOGV2(6036605, "Starting command scheduler");
- _migrationManager.finishRecovery(opCtx.get(),
- balancerConfig->getMaxChunkSizeBytes(),
- balancerConfig->getSecondaryThrottle());
+ _commandScheduler->start(opCtx.get());
- LOGV2(21858, "CSRS balancer thread is recovered");
+ LOGV2(6036606, "Balancer worker thread initialised. Entering main loop.");
// Main balancer loop
while (!_stopRequested()) {
@@ -507,17 +522,13 @@ void Balancer::_mainThread() {
{
stdx::lock_guard<Latch> scopedLock(_mutex);
invariant(_state == kStopping);
- invariant(_migrationManagerInterruptThread.joinable());
}
+ // TODO(SERVER-60459) ensure that the merger gets consistently stopped with the scheduler.
_commandScheduler->stop();
- _migrationManagerInterruptThread.join();
- _migrationManager.drainActiveMigrations();
-
{
stdx::lock_guard<Latch> scopedLock(_mutex);
- _migrationManagerInterruptThread = {};
_threadOperationContext = nullptr;
}
@@ -693,30 +704,36 @@ int Balancer::_moveChunks(OperationContext* opCtx,
return 0;
}
- auto migrationStatuses =
- _migrationManager.executeMigrationsForAutoBalance(opCtx,
- candidateChunks,
- balancerConfig->getMaxChunkSizeBytes(),
- balancerConfig->getSecondaryThrottle(),
- balancerConfig->waitForDelete());
+ std::vector<std::pair<size_t, std::unique_ptr<MoveChunkResponse>>> migrateInfosAndResponses;
+ migrateInfosAndResponses.reserve(candidateChunks.size());
+ for (size_t migrateInfoIndex = 0; migrateInfoIndex < candidateChunks.size();
+ ++migrateInfoIndex) {
+ const auto& migrateInfo = candidateChunks[migrateInfoIndex];
+
+ ChunkType chunk;
+ chunk.setMin(migrateInfo.minKey);
+ chunk.setMax(migrateInfo.maxKey);
+ chunk.setShard(migrateInfo.from);
+ chunk.setVersion(migrateInfo.version);
+
+ MoveChunkSettings settings(balancerConfig->getMaxChunkSizeBytes(),
+ balancerConfig->getSecondaryThrottle(),
+ balancerConfig->waitForDelete(),
+ migrateInfo.forceJumbo);
+ auto response = _commandScheduler->requestMoveChunk(
+ opCtx, migrateInfo.nss, chunk, migrateInfo.to, settings);
+ migrateInfosAndResponses.emplace_back(
+ std::make_pair(migrateInfoIndex, std::move(response)));
+ }
int numChunksProcessed = 0;
-
- for (const auto& migrationStatusEntry : migrationStatuses) {
- const Status& status = migrationStatusEntry.second;
+ for (const auto& migrateInfoAndResponse : migrateInfosAndResponses) {
+ const Status status = migrateInfoAndResponse.second->getOutcome();
if (status.isOK()) {
numChunksProcessed++;
continue;
}
-
- const MigrationIdentifier& migrationId = migrationStatusEntry.first;
-
- const auto requestIt = std::find_if(candidateChunks.begin(),
- candidateChunks.end(),
- [&migrationId](const MigrateInfo& migrateInfo) {
- return migrateInfo.getName() == migrationId;
- });
- invariant(requestIt != candidateChunks.end());
+ const auto& migrateInfo = candidateChunks[migrateInfoAndResponse.first];
// ChunkTooBig is returned by the source shard during the cloning phase if the migration
// manager finds that the chunk is larger than some calculated size, the source shard is
@@ -730,21 +747,21 @@ int Balancer::_moveChunks(OperationContext* opCtx,
LOGV2(21871,
"Migration {migrateInfo} failed with {error}, going to try splitting the chunk",
"Migration failed, going to try splitting the chunk",
- "migrateInfo"_attr = redact(requestIt->toString()),
+ "migrateInfo"_attr = redact(migrateInfo.toString()),
"error"_attr = redact(status));
const CollectionType collection = catalogClient->getCollection(
- opCtx, requestIt->uuid, repl::ReadConcernLevel::kLocalReadConcern);
+ opCtx, migrateInfo.uuid, repl::ReadConcernLevel::kLocalReadConcern);
ShardingCatalogManager::get(opCtx)->splitOrMarkJumbo(
- opCtx, collection.getNss(), requestIt->minKey);
+ opCtx, collection.getNss(), migrateInfo.minKey);
continue;
}
LOGV2(21872,
"Migration {migrateInfo} failed with {error}",
"Migration failed",
- "migrateInfo"_attr = redact(requestIt->toString()),
+ "migrateInfo"_attr = redact(migrateInfo.toString()),
"error"_attr = redact(status));
}
@@ -774,8 +791,6 @@ void Balancer::_mergeChunksIfNeeded(OperationContext* opCtx) {
LOGV2(8423320, "Balancer will perform chunk merges");
- _commandScheduler->start(opCtx);
-
auto* manager = ShardingCatalogManager::get(opCtx);
for (CollectionType const& coll : collections) {
auto progress = _chunkMerger->mergeChunksOnShards(opCtx, coll);
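
The new processManualMigrationOutcome helper above encodes the retry contract for user-issued migrations: a failure reported after the metadata commit (for example during waitForDelete) is converted into success when a routing refresh shows the chunk already on the requested destination, except when the balancer itself was interrupted, in which case the error must surface so the command gets retried. A condensed, self-contained restatement of that decision, with the catalog-cache lookup abstracted into a plain string parameter:

```cpp
#include <string>

// 'ownerAfterRefresh' stands in for the routing-table refresh plus the
// findIntersectingChunkWithSimpleCollation() lookup done in the real code.
enum class Code { OK, BalancerInterrupted, NetworkTimeout };

Code processManualMigrationOutcomeSketch(Code outcome,
                                         const std::string& destination,
                                         const std::string& ownerAfterRefresh) {
    if (outcome == Code::OK) {
        return outcome;
    }
    // The migration can fail after the metadata commit (e.g. in the
    // waitForDelete stage). If the chunk already lives on the requested
    // destination, report success -- unless the balancer itself was
    // interrupted, in which case the caller is expected to retry.
    if (ownerAfterRefresh == destination && outcome != Code::BalancerInterrupted) {
        return Code::OK;
    }
    return outcome;
}
```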
diff --git a/src/mongo/db/s/balancer/balancer.h b/src/mongo/db/s/balancer/balancer.h
index 7433b2ca495..7d1c8069b72 100644
--- a/src/mongo/db/s/balancer/balancer.h
+++ b/src/mongo/db/s/balancer/balancer.h
@@ -32,7 +32,6 @@
#include "mongo/db/repl/replica_set_aware_service.h"
#include "mongo/db/s/balancer/balancer_chunk_selection_policy.h"
#include "mongo/db/s/balancer/balancer_random.h"
-#include "mongo/db/s/balancer/migration_manager.h"
#include "mongo/platform/mutex.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/thread.h"
@@ -290,16 +289,6 @@ private:
// thread.
OperationContext* _threadOperationContext{nullptr};
- // This thread is only available in the kStopping state and is necessary for the migration
- // manager shutdown to not deadlock with replica set step down. In particular, the migration
- // manager's order of lock acquisition is mutex, then collection lock, whereas stepdown first
- // acquires the global S lock and then acquires the migration manager's mutex.
- //
- // The interrupt thread is scheduled when the balancer enters the kStopping state (which is at
- // step down) and is joined outside of lock, when the replica set leaves draining mode, outside
- // of the global X lock.
- stdx::thread _migrationManagerInterruptThread;
-
// Indicates whether the balancer is currently executing a balancer round
bool _inBalancerRound{false};
@@ -330,9 +319,6 @@ private:
std::unique_ptr<BalancerCommandsScheduler> _commandScheduler;
std::unique_ptr<BalancerChunkMerger> _chunkMerger;
-
- // Migration manager used to schedule and manage migrations
- MigrationManager _migrationManager;
};
} // namespace mongo
diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler.h b/src/mongo/db/s/balancer/balancer_commands_scheduler.h
index 2083972413b..7766950b9d2 100644
--- a/src/mongo/db/s/balancer/balancer_commands_scheduler.h
+++ b/src/mongo/db/s/balancer/balancer_commands_scheduler.h
@@ -145,7 +145,8 @@ public:
const NamespaceString& nss,
const ChunkType& chunk,
const ShardId& recipient,
- const MoveChunkSettings& commandSettings) = 0;
+ const MoveChunkSettings& commandSettings,
+ bool issuedByRemoteUser = false) = 0;
virtual std::unique_ptr<MergeChunksResponse> requestMergeChunks(
OperationContext* opCtx,
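
The only signature change in the public interface is the issuedByRemoteUser flag, defaulted to false so existing internal callers keep compiling unchanged; it marks requests that originate from a user command and whose client metadata must therefore be forwarded to the target shard. The two resulting call shapes, as they appear in the balancer.cpp hunks above:

```cpp
// Balancer-initiated move (automatic round): default issuedByRemoteUser == false.
auto autoResp = _commandScheduler->requestMoveChunk(
    opCtx, migrateInfo.nss, chunk, migrateInfo.to, settings);

// User-initiated moveChunk/rebalance: the remote client's metadata is forwarded.
auto manualResp = _commandScheduler->requestMoveChunk(
    opCtx, nss, chunk, newShardId, settings, true /* issuedByRemoteUser */);
```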
diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp
index 734c6ba8f04..1f923f4edbc 100644
--- a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp
+++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp
@@ -43,6 +43,7 @@
namespace mongo {
MONGO_FAIL_POINT_DEFINE(pauseBalancerWorkerThread);
+MONGO_FAIL_POINT_DEFINE(deferredCleanupCompletedCheckpoint);
const std::string MergeChunksCommandInfo::kCommandName = "mergeChunks";
const std::string MergeChunksCommandInfo::kBounds = "bounds";
@@ -87,8 +88,6 @@ void BalancerCommandsSchedulerImpl::start(OperationContext* opCtx) {
}
invariant(!_workerThreadHandle.joinable());
- _incompleteRequests.reserve(_maxRunningRequests * 10);
- _runningRequestIds.reserve(_maxRunningRequests);
auto requestsToRecover = _loadRequestsToRecover(opCtx);
_state = requestsToRecover.empty() ? SchedulerState::Running : SchedulerState::Recovering;
@@ -120,7 +119,11 @@ std::unique_ptr<MoveChunkResponse> BalancerCommandsSchedulerImpl::requestMoveChu
const NamespaceString& nss,
const ChunkType& chunk,
const ShardId& recipient,
- const MoveChunkSettings& commandSettings) {
+ const MoveChunkSettings& commandSettings,
+ bool issuedByRemoteUser) {
+
+ auto externalClientInfo =
+ issuedByRemoteUser ? boost::optional<ExternalClientInfo>(opCtx) : boost::none;
auto commandInfo = std::make_shared<MoveChunkCommandInfo>(nss,
chunk.getShard(),
@@ -131,7 +134,8 @@ std::unique_ptr<MoveChunkResponse> BalancerCommandsSchedulerImpl::requestMoveChu
commandSettings.secondaryThrottle,
commandSettings.waitForDelete,
commandSettings.forceJumbo,
- chunk.getVersion());
+ chunk.getVersion(),
+ std::move(externalClientInfo));
auto requestCollectionInfo = _buildAndEnqueueNewRequest(opCtx, std::move(commandInfo));
return std::make_unique<MoveChunkResponseImpl>(std::move(requestCollectionInfo));
@@ -215,10 +219,9 @@ ResponseHandle BalancerCommandsSchedulerImpl::_buildAndEnqueueNewRequest(
OperationContext* opCtx, std::shared_ptr<CommandInfo>&& commandInfo) {
const auto newRequestId = UUID::gen();
LOGV2(5847202,
- "Enqueuing new Balancer command request with id {reqId}. Details: {command}",
"Enqueuing new Balancer command request",
"reqId"_attr = newRequestId,
- "command"_attr = commandInfo->serialise().toString(),
+ "command"_attr = redact(commandInfo->serialise().toString()),
"recoveryDocRequired"_attr = commandInfo->requiresRecoveryOnCrash());
if (commandInfo->requiresRecoveryOnCrash()) {
@@ -238,7 +241,6 @@ ResponseHandle BalancerCommandsSchedulerImpl::_buildAndEnqueueNewRequest(
if (auto writeStatus = getStatusFromWriteCommandReply(reply); !writeStatus.isOK()) {
LOGV2(5847210,
- "Failed to persist command document for reqId {reqId}. Error status: {status}",
"Failed to persist request command document",
"reqId"_attr = newRequestId,
"status"_attr = writeStatus);
@@ -263,61 +265,23 @@ ResponseHandle BalancerCommandsSchedulerImpl::_enqueueRequest(WithLock, RequestD
_stateUpdatedCV.notify_all();
} else {
deferredResponseHandle.handle->set(Status(
- ErrorCodes::CallbackCanceled, "Request rejected - balancer scheduler is stopped"));
+ ErrorCodes::BalancerInterrupted, "Request rejected - balancer scheduler is stopped"));
}
return deferredResponseHandle;
}
bool BalancerCommandsSchedulerImpl::_canSubmitNewRequests(WithLock) {
- return (!_pendingRequestIds.empty() && _runningRequestIds.size() < _maxRunningRequests &&
- MONGO_likely(!pauseBalancerWorkerThread.shouldFail()));
+ return (!_pendingRequestIds.empty() && MONGO_likely(!pauseBalancerWorkerThread.shouldFail()));
}
-Status BalancerCommandsSchedulerImpl::_acquireDistLock(OperationContext* opCtx,
- NamespaceString nss) {
- auto it = _migrationLocks.find(nss);
- if (it != _migrationLocks.end()) {
- ++it->second.numMigrations;
- return Status::OK();
- } else {
- boost::optional<DistLockManager::ScopedLock> scopedLock;
- try {
- scopedLock.emplace(DistLockManager::get(opCtx)->lockDirectLocally(
- opCtx, nss.ns(), DistLockManager::kSingleLockAttemptTimeout));
-
- const std::string whyMessage(str::stream()
- << "Migrating chunk(s) in collection " << nss.ns());
- uassertStatusOK(DistLockManager::get(opCtx)->lockDirect(
- opCtx, nss.ns(), whyMessage, DistLockManager::kSingleLockAttemptTimeout));
- } catch (const DBException& ex) {
- return ex.toStatus(str::stream() << "Could not acquire collection lock for " << nss.ns()
- << " to migrate chunks");
- }
- Migrations migrationData(std::move(*scopedLock));
- _migrationLocks.insert(std::make_pair(nss, std::move(migrationData)));
- }
- return Status::OK();
-}
-
-void BalancerCommandsSchedulerImpl::_releaseDistLock(OperationContext* opCtx, NamespaceString nss) {
- auto it = _migrationLocks.find(nss);
- if (it == _migrationLocks.end()) {
- return;
- } else if (it->second.numMigrations == 1) {
- DistLockManager::get(opCtx)->unlock(opCtx, nss.ns());
- _migrationLocks.erase(it);
- } else {
- --it->second.numMigrations;
- }
+bool BalancerCommandsSchedulerImpl::_deferredCleanupRequired(WithLock) {
+ return (!_obsoleteRecoveryDocumentIds.empty() || !_lockedReferencesToRelease.empty());
}
CommandSubmissionResult BalancerCommandsSchedulerImpl::_submit(
OperationContext* opCtx, const CommandSubmissionHandle& handle) {
- LOGV2(5847203,
- "Balancer command request id {reqId} submitted for execution",
- "Balancer command request submitted for execution",
- "reqId"_attr = handle.id);
+ LOGV2(5847203, "Balancer command request submitted for execution", "reqId"_attr = handle.id);
auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
const auto shardWithStatus =
@@ -338,14 +302,14 @@ CommandSubmissionResult BalancerCommandsSchedulerImpl::_submit(
opCtx);
auto onRemoteResponseReceived =
- [this, opCtx, requestId = handle.id](
- const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {
- _applyCommandResponse(opCtx, requestId, args.response);
+ [this,
+ requestId = handle.id](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {
+ _applyCommandResponse(requestId, args.response);
};
if (handle.commandInfo->requiresDistributedLock()) {
Status lockAcquisitionResponse =
- _acquireDistLock(opCtx, handle.commandInfo->getNameSpace());
+ _distributedLocks.acquireFor(opCtx, handle.commandInfo->getNameSpace());
if (!lockAcquisitionResponse.isOK()) {
return CommandSubmissionResult(handle.id, false, lockAcquisitionResponse);
}
@@ -360,12 +324,11 @@ CommandSubmissionResult BalancerCommandsSchedulerImpl::_submit(
}
void BalancerCommandsSchedulerImpl::_applySubmissionResult(
- WithLock, OperationContext* opCtx, CommandSubmissionResult&& submissionResult) {
+ WithLock, CommandSubmissionResult&& submissionResult) {
auto requestToUpdateIt = _incompleteRequests.find(submissionResult.id);
if (requestToUpdateIt == _incompleteRequests.end()) {
LOGV2(5847209,
- "Skipping _applySubmissionResult: reqId {reqId} already completed/canceled",
- "Skipping _applySubmissionResult: reqId already completed/canceled",
+ "Skipping _applySubmissionResult: request already completed/canceled",
"reqId"_attr = submissionResult.id);
return;
}
@@ -385,7 +348,7 @@ void BalancerCommandsSchedulerImpl::_applySubmissionResult(
} else {
const auto& submittedCommandInfo = requestToUpdate.getCommandInfo();
if (submissionResult.acquiredDistLock) {
- _releaseDistLock(opCtx, submittedCommandInfo.getNameSpace());
+ _lockedReferencesToRelease.emplace_back(submittedCommandInfo.getNameSpace());
}
if (submittedCommandInfo.requiresRecoveryOnCrash()) {
_obsoleteRecoveryDocumentIds.push_back(submissionResult.id);
@@ -396,9 +359,7 @@ void BalancerCommandsSchedulerImpl::_applySubmissionResult(
}
void BalancerCommandsSchedulerImpl::_applyCommandResponse(
- OperationContext* opCtx,
- UUID requestId,
- const executor::TaskExecutor::ResponseStatus& response) {
+ UUID requestId, const executor::TaskExecutor::ResponseStatus& response) {
{
stdx::lock_guard<Latch> lg(_mutex);
auto requestToCompleteIt = _incompleteRequests.find(requestId);
@@ -411,7 +372,7 @@ void BalancerCommandsSchedulerImpl::_applyCommandResponse(
requestToComplete.setOutcome(response);
auto& commandInfo = requestToComplete.getCommandInfo();
if (commandInfo.requiresDistributedLock()) {
- _releaseDistLock(opCtx, commandInfo.getNameSpace());
+ _lockedReferencesToRelease.emplace_back(commandInfo.getNameSpace());
}
if (commandInfo.requiresRecoveryOnCrash()) {
_obsoleteRecoveryDocumentIds.push_back(requestId);
@@ -425,7 +386,6 @@ void BalancerCommandsSchedulerImpl::_applyCommandResponse(
_stateUpdatedCV.notify_all();
}
LOGV2(5847204,
- "Execution of balancer command request id {reqId} completed",
"Execution of balancer command request completed",
"reqId"_attr = requestId,
"response"_attr = response.toString());
@@ -434,61 +394,78 @@ void BalancerCommandsSchedulerImpl::_applyCommandResponse(
std::vector<RequestData> BalancerCommandsSchedulerImpl::_loadRequestsToRecover(
OperationContext* opCtx) {
std::vector<RequestData> requestsToRecover;
- auto documentProcessor = [&requestsToRecover](const BSONObj& commandToRecoverDoc) {
+ auto documentProcessor = [&requestsToRecover, opCtx](const BSONObj& commandToRecoverDoc) {
auto originalCommand = PersistedBalancerCommand::parse(
IDLParserErrorContext("BalancerCommandsScheduler"), commandToRecoverDoc);
auto recoveryCommand = std::make_shared<RecoveryCommandInfo>(originalCommand);
LOGV2(5847212,
- "Recovered request id {reqId}, which command will be rescheduled. Details: "
- "{command}",
"Command request recovered and set for rescheduling",
"reqId"_attr = originalCommand.getRequestId(),
- "command"_attr = recoveryCommand->serialise());
+ "command"_attr = redact(recoveryCommand->serialise()));
requestsToRecover.emplace_back(originalCommand.getRequestId(), std::move(recoveryCommand));
};
DBDirectClient dbClient(opCtx);
- dbClient.query(documentProcessor, NamespaceString::kConfigBalancerCommandsNamespace, BSONObj());
+ try {
+ dbClient.query(
+ documentProcessor, NamespaceString::kConfigBalancerCommandsNamespace, BSONObj());
+ } catch (const DBException& e) {
+ LOGV2(5847225, "Failed to load requests to recover", "error"_attr = redact(e));
+ }
+
return requestsToRecover;
}
-void BalancerCommandsSchedulerImpl::_cleanUpObsoleteRecoveryInfo(WithLock,
- OperationContext* opCtx) {
- if (_obsoleteRecoveryDocumentIds.empty()) {
- return;
- }
- BSONObjBuilder queryBuilder;
- if (_obsoleteRecoveryDocumentIds.size() == 1) {
- _obsoleteRecoveryDocumentIds[0].appendToBuilder(
- &queryBuilder, PersistedBalancerCommand::kRequestIdFieldName);
- } else {
- BSONObjBuilder requestIdClause(
- queryBuilder.subobjStart(PersistedBalancerCommand::kRequestIdFieldName));
- BSONArrayBuilder valuesBuilder(requestIdClause.subarrayStart("$in"));
- for (const auto& requestId : _obsoleteRecoveryDocumentIds) {
- requestId.appendToArrayBuilder(&valuesBuilder);
+void BalancerCommandsSchedulerImpl::_performDeferredCleanup(WithLock, OperationContext* opCtx) {
+ auto recoveryDocsDeleted = [this, opCtx] {
+ if (_obsoleteRecoveryDocumentIds.empty()) {
+ return false;
+ }
+ BSONObjBuilder queryBuilder;
+ if (_obsoleteRecoveryDocumentIds.size() == 1) {
+ _obsoleteRecoveryDocumentIds[0].appendToBuilder(
+ &queryBuilder, PersistedBalancerCommand::kRequestIdFieldName);
+ } else {
+ BSONObjBuilder requestIdClause(
+ queryBuilder.subobjStart(PersistedBalancerCommand::kRequestIdFieldName));
+ BSONArrayBuilder valuesBuilder(requestIdClause.subarrayStart("$in"));
+ for (const auto& requestId : _obsoleteRecoveryDocumentIds) {
+ requestId.appendToArrayBuilder(&valuesBuilder);
+ }
}
- }
- _obsoleteRecoveryDocumentIds.clear();
+ _obsoleteRecoveryDocumentIds.clear();
- auto query = queryBuilder.obj();
- DBDirectClient dbClient(opCtx);
+ auto query = queryBuilder.obj();
+ DBDirectClient dbClient(opCtx);
- auto reply = dbClient.removeAcknowledged(
- NamespaceString::kConfigBalancerCommandsNamespace.toString(), query);
+ auto reply = dbClient.removeAcknowledged(
+ NamespaceString::kConfigBalancerCommandsNamespace.toString(), query);
- LOGV2(5847211,
- "Clean up of obsolete document info performed with outcome {reply}",
- "Clean up of obsolete document info performed",
- "query"_attr = query,
- "reply"_attr = reply);
-}
+ LOGV2(5847211,
+ "Clean up of obsolete document info performed",
+ "query"_attr = query,
+ "reply"_attr = reply);
+ return true;
+ }();
+ auto distributedLocksReleased = [this, opCtx] {
+ if (_lockedReferencesToRelease.empty()) {
+ return false;
+ }
+ for (const auto& nss : _lockedReferencesToRelease) {
+ _distributedLocks.releaseFor(opCtx, nss);
+ }
+ _lockedReferencesToRelease.clear();
+ return true;
+ }();
+
+ if (recoveryDocsDeleted || distributedLocksReleased) {
+ deferredCleanupCompletedCheckpoint.pauseWhileSet();
+ }
+}
void BalancerCommandsSchedulerImpl::_workerThread() {
ON_BLOCK_EXIT([this] {
- invariant(_migrationLocks.empty(),
- "BalancerCommandsScheduler worker thread failed to release all locks on exit");
LOGV2(5847208, "Leaving balancer command scheduler thread");
stdx::lock_guard<Latch> lg(_mutex);
_state = SchedulerState::Stopped;
@@ -496,7 +473,6 @@ void BalancerCommandsSchedulerImpl::_workerThread() {
});
Client::initThread("BalancerCommandsScheduler");
- auto opCtxHolder = cc().makeOperationContext();
stdx::unordered_map<UUID, RequestData, UUID::Hash> requestsToCleanUpOnExit;
LOGV2(5847205, "Balancer scheduler thread started");
@@ -507,10 +483,14 @@ void BalancerCommandsSchedulerImpl::_workerThread() {
invariant(_state != SchedulerState::Stopped);
_stateUpdatedCV.wait(ul, [this, &ul] {
return (_state == SchedulerState::Stopping || _canSubmitNewRequests(ul) ||
- !_obsoleteRecoveryDocumentIds.empty());
+ _deferredCleanupRequired(ul));
});
- _cleanUpObsoleteRecoveryInfo(ul, opCtxHolder.get());
+ // Completed commands defer the clean up of acquired resources
+ {
+ auto opCtxHolder = cc().makeOperationContext();
+ _performDeferredCleanup(ul, opCtxHolder.get());
+ }
if (_state == SchedulerState::Stopping) {
_runningRequestIds.clear();
@@ -523,51 +503,61 @@ void BalancerCommandsSchedulerImpl::_workerThread() {
continue;
}
- // 1. Pick up new requests to be submitted from the pending list, if possible.
- const auto availableSubmissionSlots =
- static_cast<size_t>(_maxRunningRequests - _runningRequestIds.size());
- while (!_pendingRequestIds.empty() &&
- commandsToSubmit.size() < availableSubmissionSlots) {
- const auto& requestData = _incompleteRequests.at(_pendingRequestIds.front());
+ // 1. Pick up new requests to be submitted from the pending list
+ for (auto pendingRequestId : _pendingRequestIds) {
+ const auto& requestData = _incompleteRequests.at(pendingRequestId);
commandsToSubmit.push_back(requestData.getSubmissionInfo());
- _pendingRequestIds.pop_front();
}
+ _pendingRequestIds.clear();
}
// 2. Serve the picked up requests, submitting their related commands.
std::vector<CommandSubmissionResult> submissionResults;
for (auto& submissionInfo : commandsToSubmit) {
+ auto opCtxHolder = cc().makeOperationContext();
+ if (submissionInfo.commandInfo) {
+ submissionInfo.commandInfo.get()->attachOperationMetadataTo(opCtxHolder.get());
+ }
submissionResults.push_back(_submit(opCtxHolder.get(), submissionInfo));
if (!submissionResults.back().context.isOK()) {
LOGV2(5847206,
- "Submission for scheduler command request {reqId} failed: cause {cause}",
- "Submission for scheduler command request {reqId} failed",
+ "Submission for scheduler command request failed",
"reqId"_attr = submissionResults.back().id,
"cause"_attr = submissionResults.back().context.getStatus());
}
}
// 3. Process the outcome of each submission.
+ int numRunningRequests = 0;
+ int numPendingRequests = 0;
{
stdx::lock_guard<Latch> lg(_mutex);
for (auto& submissionResult : submissionResults) {
- _applySubmissionResult(lg, opCtxHolder.get(), std::move(submissionResult));
+ _applySubmissionResult(lg, std::move(submissionResult));
}
+ numRunningRequests = _runningRequestIds.size();
+ numPendingRequests = _pendingRequestIds.size();
}
- LOGV2(5847207, "Ending balancer command scheduler round");
+ LOGV2_DEBUG(5847207,
+ 1,
+ "Ending balancer command scheduler round",
+ "numRunningRequests"_attr = numRunningRequests,
+ "numPendingRequests"_attr = numPendingRequests);
}
// In case of clean exit, cancel all the pending/running command requests
// (but keep the related descriptor documents to ensure they will be reissued on recovery).
+ auto opCtxHolder = cc().makeOperationContext();
auto executor = Grid::get(opCtxHolder.get())->getExecutorPool()->getFixedExecutor();
for (auto& idAndRequest : requestsToCleanUpOnExit) {
idAndRequest.second.setOutcome(Status(
- ErrorCodes::CallbackCanceled, "Request cancelled - balancer scheduler is stopping"));
+ ErrorCodes::BalancerInterrupted, "Request cancelled - balancer scheduler is stopping"));
const auto& cancelHandle = idAndRequest.second.getExecutionContext();
if (cancelHandle) {
executor->cancel(*cancelHandle);
}
- _releaseDistLock(opCtxHolder.get(), idAndRequest.second.getCommandInfo().getNameSpace());
+ _distributedLocks.releaseFor(opCtxHolder.get(),
+ idAndRequest.second.getCommandInfo().getNameSpace());
}
}
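
A notable concurrency change in this file: distributed-lock releases and recovery-document deletions are no longer performed inline by the executor's response callbacks. Instead the callbacks only record the work under the mutex (_lockedReferencesToRelease, _obsoleteRecoveryDocumentIds), and the worker thread drains it in _performDeferredCleanup() using a short-lived OperationContext that it owns, which is why _applyCommandResponse() no longer needs an opCtx parameter. A minimal sketch of that hand-off pattern, with simplified types in place of the actual scheduler classes:

```cpp
#include <mutex>
#include <string>
#include <vector>

class DeferredCleanupSketch {
public:
    // Runs on a task-executor thread when a command response arrives:
    // only records what must be released, never touches the lock manager.
    void onCommandCompleted(const std::string& nss) {
        std::lock_guard<std::mutex> lg(_mutex);
        _locksToRelease.push_back(nss);
        // The real code also wakes the worker via _stateUpdatedCV here.
    }

    // Runs on the worker thread, which creates its own OperationContext.
    void performDeferredCleanup(/* OperationContext* opCtx */) {
        std::vector<std::string> pending;
        {
            std::lock_guard<std::mutex> lg(_mutex);
            pending.swap(_locksToRelease);
        }
        for (const auto& nss : pending) {
            (void)nss;  // real code: _distributedLocks.releaseFor(opCtx, nss);
        }
    }

private:
    std::mutex _mutex;
    std::vector<std::string> _locksToRelease;
};
```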
diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h
index 62c8f4b2b82..79fc7aac793 100644
--- a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h
+++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h
@@ -29,15 +29,14 @@
#pragma once
-#include <list>
-#include <unordered_map>
-
#include "mongo/db/s/balancer/balancer_command_document_gen.h"
#include "mongo/db/s/balancer/balancer_commands_scheduler.h"
-#include "mongo/db/s/dist_lock_manager.h"
+#include "mongo/db/s/balancer/balancer_dist_locks.h"
+#include "mongo/db/s/forwardable_operation_metadata.h"
#include "mongo/db/service_context.h"
#include "mongo/platform/mutex.h"
#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/s/client/shard.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/thread.h"
#include "mongo/util/concurrency/notification.h"
@@ -91,7 +90,12 @@ public:
if (!response.status.isOK()) {
return response.status;
}
- return getStatusFromCommandResult(response.data);
+ auto remoteStatus = getStatusFromCommandResult(response.data);
+ return Shard::shouldErrorBePropagated(remoteStatus.code())
+ ? remoteStatus
+ : Status(ErrorCodes::OperationFailed,
+ str::stream() << "Command request" << getRequestId().toString()
+ << "failed on source shard." << causedBy(remoteStatus));
}
executor::RemoteCommandResponse getRemoteResponse() {
@@ -244,6 +248,18 @@ public:
}
};
+/**
+ * Utility class to extract and hold information describing the remote client that submitted a
+ * command.
+ */
+struct ExternalClientInfo {
+ ExternalClientInfo(OperationContext* opCtx)
+ : operationMetadata(opCtx), apiParameters(APIParameters::get(opCtx)) {}
+
+ const ForwardableOperationMetadata operationMetadata;
+ const APIParameters apiParameters;
+};
+
/**
* Base class describing the common traits of a shard command associated to a Request
@@ -251,8 +267,10 @@ public:
*/
class CommandInfo {
public:
- CommandInfo(const ShardId& targetShardId, const NamespaceString& nss)
- : _targetShardId(targetShardId), _nss(nss) {}
+ CommandInfo(const ShardId& targetShardId,
+ const NamespaceString& nss,
+ boost::optional<ExternalClientInfo>&& clientInfo)
+ : _targetShardId(targetShardId), _nss(nss), _clientInfo(clientInfo) {}
virtual ~CommandInfo() {}
@@ -274,9 +292,22 @@ public:
return _nss;
}
+ void attachOperationMetadataTo(OperationContext* opCtx) {
+ if (_clientInfo) {
+ _clientInfo.get().operationMetadata.setOn(opCtx);
+ }
+ }
+
+ void appendCommandMetadataTo(BSONObjBuilder* commandBuilder) const {
+ if (_clientInfo && _clientInfo.get().apiParameters.getParamsPassed()) {
+ _clientInfo.get().apiParameters.appendInfo(commandBuilder);
+ }
+ }
+
private:
ShardId _targetShardId;
NamespaceString _nss;
+ boost::optional<ExternalClientInfo> _clientInfo;
};
/**
@@ -293,8 +324,9 @@ public:
const MigrationSecondaryThrottleOptions& secondaryThrottle,
bool waitForDelete,
MoveChunkRequest::ForceJumbo forceJumbo,
- const ChunkVersion& version)
- : CommandInfo(origin, nss),
+ const ChunkVersion& version,
+ boost::optional<ExternalClientInfo>&& clientInfo)
+ : CommandInfo(origin, nss, std::move(clientInfo)),
_chunkBoundaries(lowerBoundKey, upperBoundKey),
_recipient(recipient),
_version(version),
@@ -315,6 +347,7 @@ public:
_secondaryThrottle,
_waitForDelete,
_forceJumbo);
+ appendCommandMetadataTo(&commandBuilder);
return commandBuilder.obj();
}
@@ -343,7 +376,7 @@ public:
const BSONObj& lowerBoundKey,
const BSONObj& upperBoundKey,
const ChunkVersion& version)
- : CommandInfo(shardId, nss),
+ : CommandInfo(shardId, nss, boost::none),
_lowerBoundKey(lowerBoundKey),
_upperBoundKey(upperBoundKey),
_version(version) {}
@@ -386,7 +419,7 @@ public:
boost::optional<long long> maxChunkObjects,
boost::optional<long long> maxChunkSizeBytes,
bool force)
- : CommandInfo(shardId, nss),
+ : CommandInfo(shardId, nss, boost::none),
_shardKeyPattern(shardKeyPattern),
_lowerBoundKey(lowerBoundKey),
_upperBoundKey(upperBoundKey),
@@ -443,7 +476,7 @@ public:
const BSONObj& upperBoundKey,
bool estimatedValue,
const ChunkVersion& version)
- : CommandInfo(shardId, nss),
+ : CommandInfo(shardId, nss, boost::none),
_shardKeyPattern(shardKeyPattern),
_lowerBoundKey(lowerBoundKey),
_upperBoundKey(upperBoundKey),
@@ -487,7 +520,7 @@ public:
const BSONObj& upperBoundKey,
const ChunkVersion& version,
const std::vector<BSONObj>& splitPoints)
- : CommandInfo(shardId, nss),
+ : CommandInfo(shardId, nss, boost::none),
_shardKeyPattern(shardKeyPattern),
_lowerBoundKey(lowerBoundKey),
_upperBoundKey(upperBoundKey),
@@ -525,7 +558,7 @@ private:
class RecoveryCommandInfo : public CommandInfo {
public:
RecoveryCommandInfo(const PersistedBalancerCommand& persistedCommand)
- : CommandInfo(persistedCommand.getTarget(), persistedCommand.getNss()),
+ : CommandInfo(persistedCommand.getTarget(), persistedCommand.getNss(), boost::none),
_serialisedCommand(persistedCommand.getRemoteCommand()),
_requiresDistributedLock(persistedCommand.getRequiresDistributedLock()) {}
@@ -657,12 +690,12 @@ public:
void stop() override;
- std::unique_ptr<MoveChunkResponse> requestMoveChunk(
- OperationContext* opCtx,
- const NamespaceString& nss,
- const ChunkType& chunk,
- const ShardId& destination,
- const MoveChunkSettings& commandSettings) override;
+ std::unique_ptr<MoveChunkResponse> requestMoveChunk(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const ChunkType& chunk,
+ const ShardId& destination,
+ const MoveChunkSettings& commandSettings,
+ bool issuedByRemoteUser) override;
std::unique_ptr<MergeChunksResponse> requestMergeChunks(OperationContext* opCtx,
const NamespaceString& nss,
@@ -695,9 +728,10 @@ public:
private:
enum class SchedulerState { Recovering, Running, Stopping, Stopped };
- static const int32_t _maxRunningRequests{10};
-
+ // Protects the in-memory state of the Scheduler
Mutex _mutex = MONGO_MAKE_LATCH("BalancerCommandsSchedulerImpl::_mutex");
+
+ // Ensures that concurrent calls to start() and stop() get serialised
Mutex _startStopMutex = MONGO_MAKE_LATCH("BalancerCommandsSchedulerImpl::_startStopMutex");
SchedulerState _state{SchedulerState::Stopped};
@@ -724,16 +758,13 @@ private:
std::vector<UUID> _obsoleteRecoveryDocumentIds;
+ std::vector<NamespaceString> _lockedReferencesToRelease;
+
/**
- * State to acquire and release DistLocks on a per namespace basis
+ * Centralised accessor for all the distributed locks required by the Scheduler.
+ * Only _workerThread() is supposed to interact with this class.
*/
- struct Migrations {
- Migrations(DistLockManager::ScopedLock lock) : lock(std::move(lock)), numMigrations(1) {}
-
- DistLockManager::ScopedLock lock;
- int numMigrations;
- };
- stdx::unordered_map<NamespaceString, Migrations> _migrationLocks;
+ BalancerDistLocks _distributedLocks;
ResponseHandle _buildAndEnqueueNewRequest(OperationContext* opCtx,
std::shared_ptr<CommandInfo>&& commandInfo);
@@ -742,24 +773,19 @@ private:
bool _canSubmitNewRequests(WithLock);
- Status _acquireDistLock(OperationContext* opCtx, NamespaceString nss);
+ bool _deferredCleanupRequired(WithLock);
- void _releaseDistLock(OperationContext* opCtx, NamespaceString nss);
+ void _performDeferredCleanup(WithLock, OperationContext* opCtx);
CommandSubmissionResult _submit(OperationContext* opCtx, const CommandSubmissionHandle& data);
- void _applySubmissionResult(WithLock,
- OperationContext* opCtx,
- CommandSubmissionResult&& submissionResult);
+ void _applySubmissionResult(WithLock, CommandSubmissionResult&& submissionResult);
- void _applyCommandResponse(OperationContext* opCtx,
- UUID requestId,
+ void _applyCommandResponse(UUID requestId,
const executor::TaskExecutor::ResponseStatus& response);
std::vector<RequestData> _loadRequestsToRecover(OperationContext* opCtx);
- void _cleanUpObsoleteRecoveryInfo(WithLock, OperationContext* opCtx);
-
void _workerThread();
};
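
ExternalClientInfo ties the pieces together: at request time the submitting client's forwardable metadata and API parameters are captured; on the worker thread, attachOperationMetadataTo() replays that metadata onto the freshly created OperationContext before submission, and appendCommandMetadataTo() adds the API parameters while the outgoing command is serialised. A runnable toy model of that capture-and-replay flow (the struct fields are simplified stand-ins, not the real ForwardableOperationMetadata/APIParameters):

```cpp
#include <iostream>
#include <optional>
#include <string>

struct ExternalClientInfoSketch {
    std::string operationMetadata;          // e.g. impersonated client identity
    std::optional<std::string> apiVersion;  // only set when API params were passed
};

struct CommandInfoSketch {
    std::optional<ExternalClientInfoSketch> clientInfo;  // empty for internal requests

    // Mirrors attachOperationMetadataTo(): run on the worker thread before
    // submission, against the worker's own freshly created context.
    void attachOperationMetadataTo(std::string* workerContext) const {
        if (clientInfo)
            *workerContext = clientInfo->operationMetadata;
    }

    // Mirrors appendCommandMetadataTo(): run while serialising the command.
    void appendCommandMetadataTo(std::string* command) const {
        if (clientInfo && clientInfo->apiVersion)
            *command += " apiVersion:" + *clientInfo->apiVersion;
    }
};

int main() {
    CommandInfoSketch manualMove{ExternalClientInfoSketch{"user@mongos", "1"}};
    std::string workerContext;
    std::string command = "{moveChunk: ...}";
    manualMove.attachOperationMetadataTo(&workerContext);
    manualMove.appendCommandMetadataTo(&command);
    std::cout << workerContext << " -> " << command << "\n";
}
```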
diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp b/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp
index 41db6025b56..51ea5d95995 100644
--- a/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp
+++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp
@@ -123,16 +123,25 @@ TEST_F(BalancerCommandsSchedulerTest, ResilientToMultipleStarts) {
}
TEST_F(BalancerCommandsSchedulerTest, SuccessfulMoveChunkCommand) {
+ auto deferredCleanupCompletedCheckpoint =
+ globalFailPointRegistry().find("deferredCleanupCompletedCheckpoint");
+ auto timesEnteredFailPoint =
+ deferredCleanupCompletedCheckpoint->setMode(FailPoint::alwaysOn, 0);
_scheduler.start(operationContext());
ChunkType moveChunk = makeChunk(0, kShardId0);
auto networkResponseFuture = launchAsync([&]() {
onCommand(
[&](const executor::RemoteCommandRequest& request) { return BSON("ok" << true); });
});
- auto resp = _scheduler.requestMoveChunk(
- operationContext(), kNss, moveChunk, ShardId(kShardId1), getDefaultMoveChunkSettings());
+ auto resp = _scheduler.requestMoveChunk(operationContext(),
+ kNss,
+ moveChunk,
+ ShardId(kShardId1),
+ getDefaultMoveChunkSettings(),
+ false /* issuedByRemoteUser */);
ASSERT_OK(resp->getOutcome());
networkResponseFuture.default_timed_get();
+ deferredCleanupCompletedCheckpoint->waitForTimesEntered(timesEnteredFailPoint + 1);
// Ensure DistLock is released correctly
{
auto opCtx = Client::getCurrent()->getOperationContext();
@@ -142,6 +151,7 @@ TEST_F(BalancerCommandsSchedulerTest, SuccessfulMoveChunkCommand) {
opCtx, kNss.ns(), whyMessage, DistLockManager::kSingleLockAttemptTimeout);
ASSERT_OK(scopedDistLock.getStatus());
}
+ deferredCleanupCompletedCheckpoint->setMode(FailPoint::off, 0);
_scheduler.stop();
}
@@ -228,7 +238,7 @@ TEST_F(BalancerCommandsSchedulerTest, SuccessfulRequestChunkDataSizeCommand) {
chunk.getRange(),
chunk.getVersion(),
KeyPattern(BSON("x" << 1)),
- false);
+ false /* estimatedValue */);
ASSERT_OK(resp->getOutcome());
ASSERT_OK(resp->getSize().getStatus());
ASSERT_EQ(resp->getSize().getValue(), 156);
@@ -239,16 +249,27 @@ TEST_F(BalancerCommandsSchedulerTest, SuccessfulRequestChunkDataSizeCommand) {
}
TEST_F(BalancerCommandsSchedulerTest, CommandFailsWhenNetworkReturnsError) {
+ auto deferredCleanupCompletedCheckpoint =
+ globalFailPointRegistry().find("deferredCleanupCompletedCheckpoint");
+ auto timesEnteredFailPoint =
+ deferredCleanupCompletedCheckpoint->setMode(FailPoint::alwaysOn, 0);
+
_scheduler.start(operationContext());
ChunkType moveChunk = makeChunk(0, kShardId0);
auto timeoutError = Status{ErrorCodes::NetworkTimeout, "Mock error: network timed out"};
auto networkResponseFuture = launchAsync([&]() {
onCommand([&](const executor::RemoteCommandRequest& request) { return timeoutError; });
});
- auto resp = _scheduler.requestMoveChunk(
- operationContext(), kNss, moveChunk, ShardId(kShardId1), getDefaultMoveChunkSettings());
+ auto resp = _scheduler.requestMoveChunk(operationContext(),
+ kNss,
+ moveChunk,
+ ShardId(kShardId1),
+ getDefaultMoveChunkSettings(),
+ false /* issuedByRemoteUser */);
ASSERT_EQUALS(resp->getOutcome(), timeoutError);
networkResponseFuture.default_timed_get();
+ deferredCleanupCompletedCheckpoint->waitForTimesEntered(timesEnteredFailPoint + 1);
+
// Ensure DistLock is released correctly
{
auto opCtx = Client::getCurrent()->getOperationContext();
@@ -258,16 +279,21 @@ TEST_F(BalancerCommandsSchedulerTest, CommandFailsWhenNetworkReturnsError) {
opCtx, kNss.ns(), whyMessage, DistLockManager::kSingleLockAttemptTimeout);
ASSERT_OK(scopedDistLock.getStatus());
}
+ deferredCleanupCompletedCheckpoint->setMode(FailPoint::off, 0);
_scheduler.stop();
}
TEST_F(BalancerCommandsSchedulerTest, CommandFailsWhenSchedulerIsStopped) {
ChunkType moveChunk = makeChunk(0, kShardId0);
- auto resp = _scheduler.requestMoveChunk(
- operationContext(), kNss, moveChunk, ShardId(kShardId1), getDefaultMoveChunkSettings());
- ASSERT_EQUALS(
- resp->getOutcome(),
- Status(ErrorCodes::CallbackCanceled, "Request rejected - balancer scheduler is stopped"));
+ auto resp = _scheduler.requestMoveChunk(operationContext(),
+ kNss,
+ moveChunk,
+ ShardId(kShardId1),
+ getDefaultMoveChunkSettings(),
+ false /* issuedByRemoteUser */);
+ ASSERT_EQUALS(resp->getOutcome(),
+ Status(ErrorCodes::BalancerInterrupted,
+ "Request rejected - balancer scheduler is stopped"));
// Ensure DistLock is not taken
{
auto opCtx = Client::getCurrent()->getOperationContext();
@@ -285,13 +311,17 @@ TEST_F(BalancerCommandsSchedulerTest, CommandCanceledIfBalancerStops) {
FailPointEnableBlock failPoint("pauseBalancerWorkerThread");
_scheduler.start(operationContext());
ChunkType moveChunk = makeChunk(0, kShardId0);
- resp = _scheduler.requestMoveChunk(
- operationContext(), kNss, moveChunk, ShardId(kShardId1), getDefaultMoveChunkSettings());
+ resp = _scheduler.requestMoveChunk(operationContext(),
+ kNss,
+ moveChunk,
+ ShardId(kShardId1),
+ getDefaultMoveChunkSettings(),
+ false /* issuedByRemoteUser */);
_scheduler.stop();
}
- ASSERT_EQUALS(
- resp->getOutcome(),
- Status(ErrorCodes::CallbackCanceled, "Request cancelled - balancer scheduler is stopping"));
+ ASSERT_EQUALS(resp->getOutcome(),
+ Status(ErrorCodes::BalancerInterrupted,
+ "Request cancelled - balancer scheduler is stopping"));
// Ensure DistLock is released correctly
{
auto opCtx = Client::getCurrent()->getOperationContext();
@@ -312,8 +342,12 @@ TEST_F(BalancerCommandsSchedulerTest, MoveChunkCommandGetsPersistedOnDiskWhenReq
ChunkType moveChunk = makeChunk(0, kShardId0);
auto requestSettings = getDefaultMoveChunkSettings();
- auto deferredResponse = _scheduler.requestMoveChunk(
- operationContext(), kNss, moveChunk, ShardId(kShardId1), requestSettings);
+ auto deferredResponse = _scheduler.requestMoveChunk(operationContext(),
+ kNss,
+ moveChunk,
+ ShardId(kShardId1),
+ requestSettings,
+ false /* issuedByRemoteUser */);
// The command is persisted...
auto persistedCommandDocs = getPersistedCommandDocuments(opCtx);
@@ -334,7 +368,8 @@ TEST_F(BalancerCommandsSchedulerTest, MoveChunkCommandGetsPersistedOnDiskWhenReq
requestSettings.secondaryThrottle,
requestSettings.waitForDelete,
requestSettings.forceJumbo,
- moveChunk.getVersion());
+ moveChunk.getVersion(),
+ boost::none);
ASSERT_BSONOBJ_EQ(originalCommandInfo.serialise(), persistedCommand.getRemoteCommand());
}
@@ -356,7 +391,8 @@ TEST_F(BalancerCommandsSchedulerTest, PersistedCommandsAreReissuedWhenRecovering
requestSettings.secondaryThrottle,
requestSettings.waitForDelete,
requestSettings.forceJumbo,
- moveChunk.getVersion());
+ moveChunk.getVersion(),
+ boost::none);
// 4. ... Which is inspected here.
ASSERT_BSONOBJ_EQ(originalCommandInfo.serialise(), request.cmdObj);
@@ -364,15 +400,19 @@ TEST_F(BalancerCommandsSchedulerTest, PersistedCommandsAreReissuedWhenRecovering
});
});
- auto resp = _scheduler.requestMoveChunk(
- operationContext(), kNss, moveChunk, ShardId(kShardId1), getDefaultMoveChunkSettings());
+ auto resp = _scheduler.requestMoveChunk(operationContext(),
+ kNss,
+ moveChunk,
+ ShardId(kShardId1),
+ getDefaultMoveChunkSettings(),
+ false /* issuedByRemoteUser */);
_scheduler.stop();
failpoint->setMode(FailPoint::Mode::off);
// 1. The original submission is expected to fail...
- ASSERT_EQUALS(
- resp->getOutcome(),
- Status(ErrorCodes::CallbackCanceled, "Request cancelled - balancer scheduler is stopping"));
+ ASSERT_EQUALS(resp->getOutcome(),
+ Status(ErrorCodes::BalancerInterrupted,
+ "Request cancelled - balancer scheduler is stopping"));
// 2. ... And a recovery document to be persisted
auto persistedCommandDocs = getPersistedCommandDocuments(operationContext());
@@ -403,8 +443,12 @@ TEST_F(BalancerCommandsSchedulerTest, DistLockPreventsMoveChunkWithConcurrentDDL
ASSERT_OK(scopedDistLock.getStatus());
failpoint->setMode(FailPoint::Mode::off);
ChunkType moveChunk = makeChunk(0, kShardId0);
- auto resp = _scheduler.requestMoveChunk(
- operationContext(), kNss, moveChunk, ShardId(kShardId1), getDefaultMoveChunkSettings());
+ auto resp = _scheduler.requestMoveChunk(operationContext(),
+ kNss,
+ moveChunk,
+ ShardId(kShardId1),
+ getDefaultMoveChunkSettings(),
+ false /* issuedByRemoteUser */);
ASSERT_EQ(
resp->getOutcome(),
Status(ErrorCodes::LockBusy, "Failed to acquire dist lock testDb.testColl locally"));
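
Because resource release is now deferred to the worker thread, the tests can no longer assert on the dist lock immediately after getOutcome() returns; they synchronise on the new deferredCleanupCompletedCheckpoint fail point instead. The recurring pattern, extracted from the tests above:

```cpp
auto deferredCleanupCompletedCheckpoint =
    globalFailPointRegistry().find("deferredCleanupCompletedCheckpoint");
auto timesEnteredFailPoint =
    deferredCleanupCompletedCheckpoint->setMode(FailPoint::alwaysOn, 0);

// ... start the scheduler, issue the request, consume the mocked response ...

// Block until the worker thread has run one deferred-cleanup pass, which
// guarantees the dist lock has been released before the test asserts on it.
deferredCleanupCompletedCheckpoint->waitForTimesEntered(timesEnteredFailPoint + 1);
deferredCleanupCompletedCheckpoint->setMode(FailPoint::off, 0);
```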
diff --git a/src/mongo/db/s/balancer/balancer_dist_locks.cpp b/src/mongo/db/s/balancer/balancer_dist_locks.cpp
new file mode 100644
index 00000000000..a7384f2d1eb
--- /dev/null
+++ b/src/mongo/db/s/balancer/balancer_dist_locks.cpp
@@ -0,0 +1,78 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/s/balancer/balancer_dist_locks.h"
+#include "mongo/db/operation_context.h"
+
+namespace mongo {
+
+BalancerDistLocks::~BalancerDistLocks() {
+ invariant(_distLocksByCollection.empty(),
+ "Attempting to destroy the keychain while still holding distributed locks");
+}
+
+Status BalancerDistLocks::acquireFor(OperationContext* opCtx, const NamespaceString& nss) {
+ auto it = _distLocksByCollection.find(nss);
+ if (it != _distLocksByCollection.end()) {
+ ++it->second.references;
+ return Status::OK();
+ } else {
+ boost::optional<DistLockManager::ScopedLock> scopedLock;
+ try {
+ scopedLock.emplace(DistLockManager::get(opCtx)->lockDirectLocally(
+ opCtx, nss.ns(), DistLockManager::kSingleLockAttemptTimeout));
+
+ const std::string whyMessage(str::stream()
+ << "Migrating chunk(s) in collection " << nss.ns());
+ uassertStatusOK(DistLockManager::get(opCtx)->lockDirect(
+ opCtx, nss.ns(), whyMessage, DistLockManager::kSingleLockAttemptTimeout));
+ } catch (const DBException& ex) {
+ return ex.toStatus(str::stream() << "Could not acquire collection lock for " << nss.ns()
+ << " to migrate chunks");
+ }
+ ReferenceCountedLock refCountedLock(std::move(*scopedLock));
+ _distLocksByCollection.insert(std::make_pair(nss, std::move(refCountedLock)));
+ }
+ return Status::OK();
+}
+
+void BalancerDistLocks::releaseFor(OperationContext* opCtx, const NamespaceString& nss) {
+ auto it = _distLocksByCollection.find(nss);
+ if (it == _distLocksByCollection.end()) {
+ return;
+ } else if (it->second.references == 1) {
+ DistLockManager::get(opCtx)->unlock(opCtx, nss.ns());
+ _distLocksByCollection.erase(it);
+ } else {
+ --it->second.references;
+ }
+}
+
+
+} // namespace mongo
diff --git a/src/mongo/db/s/balancer/balancer_dist_locks.h b/src/mongo/db/s/balancer/balancer_dist_locks.h
new file mode 100644
index 00000000000..891d5a4e323
--- /dev/null
+++ b/src/mongo/db/s/balancer/balancer_dist_locks.h
@@ -0,0 +1,66 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/s/dist_lock_manager.h"
+
+namespace mongo {
+
+/**
+ * Utility class for centralising the control of any distributed lock required by a client.
+ * Its usage is not thread-safe.
+ */
+class BalancerDistLocks {
+
+public:
+ BalancerDistLocks() = default;
+
+ ~BalancerDistLocks();
+
+ Status acquireFor(OperationContext* opCtx, const NamespaceString& nss);
+
+ void releaseFor(OperationContext* opCtx, const NamespaceString& nss);
+
+private:
+ struct ReferenceCountedLock {
+ ReferenceCountedLock(DistLockManager::ScopedLock&& lock)
+ : lock(std::move(lock)), references(1) {}
+
+ DistLockManager::ScopedLock lock;
+ int references;
+ };
+
+ stdx::unordered_map<NamespaceString, ReferenceCountedLock> _distLocksByCollection;
+
+ BalancerDistLocks(const BalancerDistLocks&) = delete;
+ BalancerDistLocks& operator=(const BalancerDistLocks&) = delete;
+};
+
+} // namespace mongo
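
BalancerDistLocks extracts the reference-counting that previously lived inside the scheduler (the removed Migrations/_migrationLocks machinery): the first request for a namespace takes the distributed lock, concurrent requests for the same namespace only bump a counter, and the lock is dropped when the last reference is released. A standalone sketch of the same idea over generic types, without the MongoDB lock machinery:

```cpp
#include <map>
#include <string>

// Reference-counted resource keyed by namespace; a toy stand-in for
// BalancerDistLocks over DistLockManager::ScopedLock.
class RefCountedLocksSketch {
public:
    void acquireFor(const std::string& nss) {
        auto it = _locks.find(nss);
        if (it != _locks.end()) {
            ++it->second;  // lock already held: just bump the reference count
            return;
        }
        // First acquisition: this is where the real class takes the local
        // lock and then the distributed lock, returning a Status on failure.
        _locks.emplace(nss, 1);
    }

    void releaseFor(const std::string& nss) {
        auto it = _locks.find(nss);
        if (it == _locks.end()) {
            return;  // not held: release is a no-op, mirroring the real class
        }
        if (--it->second == 0) {
            _locks.erase(it);  // last reference: drop the real lock here
        }
    }

private:
    std::map<std::string, int> _locks;  // namespace -> reference count
};
```

Two concurrent migrations on the same collection therefore share a single dist lock: the second acquireFor() only increments the count, and the lock is actually released on the second releaseFor().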
diff --git a/src/mongo/db/s/balancer/migration_manager.cpp b/src/mongo/db/s/balancer/migration_manager.cpp
deleted file mode 100644
index 040615acd6a..00000000000
--- a/src/mongo/db/s/balancer/migration_manager.cpp
+++ /dev/null
@@ -1,709 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/s/balancer/migration_manager.h"
-
-#include "mongo/bson/simple_bsonobj_comparator.h"
-#include "mongo/bson/util/bson_extract.h"
-#include "mongo/client/remote_command_targeter.h"
-#include "mongo/db/client.h"
-#include "mongo/db/repl/repl_set_config.h"
-#include "mongo/db/repl/replication_coordinator.h"
-#include "mongo/db/s/balancer/scoped_migration_request.h"
-#include "mongo/db/s/dist_lock_manager.h"
-#include "mongo/executor/task_executor_pool.h"
-#include "mongo/logv2/log.h"
-#include "mongo/rpc/get_status_from_command_result.h"
-#include "mongo/s/catalog_cache.h"
-#include "mongo/s/client/shard_registry.h"
-#include "mongo/s/grid.h"
-#include "mongo/s/request_types/move_chunk_request.h"
-#include "mongo/util/net/hostandport.h"
-#include "mongo/util/scopeguard.h"
-
-namespace mongo {
-namespace {
-
-using executor::RemoteCommandRequest;
-using executor::RemoteCommandResponse;
-
-const char kChunkTooBig[] = "chunkTooBig"; // TODO: delete in 3.8
-
-/**
- * Parses the 'commandResponse' and converts it to a status to use as the outcome of the command.
- * Preserves backwards compatibility with 3.4 and earlier shards that, rather than use a ChunkTooBig
- * error code, include an extra field in the response.
- *
- * TODO: Delete in 3.8
- */
-Status extractMigrationStatusFromCommandResponse(const BSONObj& commandResponse) {
- Status commandStatus = getStatusFromCommandResult(commandResponse);
-
- if (!commandStatus.isOK()) {
- bool chunkTooBig = false;
- bsonExtractBooleanFieldWithDefault(commandResponse, kChunkTooBig, false, &chunkTooBig)
- .transitional_ignore();
- if (chunkTooBig) {
- commandStatus = {ErrorCodes::ChunkTooBig, commandStatus.reason()};
- }
- }
-
- return commandStatus;
-}
-
-/**
- * Returns whether the specified status is an error caused by stepdown of the primary config node
- * currently running the balancer.
- */
-bool isErrorDueToConfigStepdown(Status status, bool isStopping) {
- return ((status == ErrorCodes::CallbackCanceled && isStopping) ||
- status == ErrorCodes::BalancerInterrupted ||
- status == ErrorCodes::InterruptedDueToReplStateChange);
-}
-
-} // namespace
-
-MigrationManager::MigrationManager(ServiceContext* serviceContext)
- : _serviceContext(serviceContext) {}
-
-MigrationManager::~MigrationManager() {
- // The migration manager must be completely quiesced at destruction time
- invariant(_activeMigrations.empty());
-}
-
-MigrationStatuses MigrationManager::executeMigrationsForAutoBalance(
- OperationContext* opCtx,
- const std::vector<MigrateInfo>& migrateInfos,
- uint64_t maxChunkSizeBytes,
- const MigrationSecondaryThrottleOptions& secondaryThrottle,
- bool waitForDelete) {
-
- MigrationStatuses migrationStatuses;
-
- ScopedMigrationRequestsMap scopedMigrationRequests;
- std::vector<std::pair<std::shared_ptr<Notification<RemoteCommandResponse>>, MigrateInfo>>
- responses;
-
- for (const auto& migrateInfo : migrateInfos) {
- responses.emplace_back(_schedule(opCtx,
- migrateInfo,
- maxChunkSizeBytes,
- secondaryThrottle,
- waitForDelete,
- &scopedMigrationRequests),
- migrateInfo);
- }
-
- // Wait for all the scheduled migrations to complete.
- for (auto& response : responses) {
- auto notification = std::move(response.first);
- auto migrateInfo = std::move(response.second);
-
- const auto& remoteCommandResponse = notification->get();
- const auto migrationInfoName = migrateInfo.getName();
-
- auto it = scopedMigrationRequests.find(migrationInfoName);
- if (it == scopedMigrationRequests.end()) {
- invariant(!remoteCommandResponse.status.isOK());
- migrationStatuses.emplace(migrationInfoName, std::move(remoteCommandResponse.status));
- continue;
- }
-
- auto statusWithScopedMigrationRequest = std::move(it->second);
-
- if (!statusWithScopedMigrationRequest.isOK()) {
- invariant(!remoteCommandResponse.status.isOK());
- migrationStatuses.emplace(migrationInfoName,
- std::move(statusWithScopedMigrationRequest.getStatus()));
- continue;
- }
-
- Status commandStatus = _processRemoteCommandResponse(
- remoteCommandResponse, &statusWithScopedMigrationRequest.getValue());
- migrationStatuses.emplace(migrationInfoName, std::move(commandStatus));
- }
-
- invariant(migrationStatuses.size() == migrateInfos.size());
-
- return migrationStatuses;
-}
-
-Status MigrationManager::executeManualMigration(
- OperationContext* opCtx,
- const MigrateInfo& migrateInfo,
- uint64_t maxChunkSizeBytes,
- const MigrationSecondaryThrottleOptions& secondaryThrottle,
- bool waitForDelete) {
-
- _waitForRecovery();
-
- ScopedMigrationRequestsMap scopedMigrationRequests;
-
- RemoteCommandResponse remoteCommandResponse = _schedule(opCtx,
- migrateInfo,
- maxChunkSizeBytes,
- secondaryThrottle,
- waitForDelete,
- &scopedMigrationRequests)
- ->get();
-
- auto swCM = Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(
- opCtx, migrateInfo.nss);
- if (!swCM.isOK()) {
- return swCM.getStatus();
- }
-
- const auto& cm = swCM.getValue();
-
- const auto chunk = cm.findIntersectingChunkWithSimpleCollation(migrateInfo.minKey);
-
- Status commandStatus = remoteCommandResponse.status;
-
- const auto migrationInfoName = migrateInfo.getName();
-
- auto it = scopedMigrationRequests.find(migrationInfoName);
-
- if (it != scopedMigrationRequests.end()) {
- invariant(scopedMigrationRequests.size() == 1);
- auto statusWithScopedMigrationRequest = &it->second;
- commandStatus = _processRemoteCommandResponse(
- remoteCommandResponse, &statusWithScopedMigrationRequest->getValue());
- }
-
- // Migration calls can be interrupted after the metadata is committed but before the command
- // finishes the waitForDelete stage. Any failovers, therefore, must always cause the moveChunk
- // command to be retried so as to ensure that the waitForDelete promise of a successful command
- // has been fulfilled.
- if (chunk.getShardId() == migrateInfo.to && commandStatus != ErrorCodes::BalancerInterrupted) {
- return Status::OK();
- }
-
- return commandStatus;
-}
-
-void MigrationManager::startRecoveryAndAcquireDistLocks(OperationContext* opCtx) {
- {
- stdx::lock_guard<Latch> lock(_mutex);
- invariant(_state == State::kStopped);
- invariant(_migrationRecoveryMap.empty());
- _state = State::kRecovering;
- }
-
- ScopeGuard scopedGuard([&] {
- _migrationRecoveryMap.clear();
- _abandonActiveMigrationsAndEnableManager(opCtx);
- });
-
- // Load the active migrations from the config.migrations collection.
- auto statusWithMigrationsQueryResponse =
- Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
- opCtx,
- ReadPreferenceSetting{ReadPreference::PrimaryOnly},
- repl::ReadConcernLevel::kLocalReadConcern,
- MigrationType::ConfigNS,
- BSONObj(),
- BSONObj(),
- boost::none);
-
- if (!statusWithMigrationsQueryResponse.isOK()) {
- LOGV2(21896,
- "Unable to read config.migrations collection documents for balancer migration "
- "recovery. Abandoning balancer recovery: {error}",
- "Unable to read config.migrations documents for balancer migration recovery",
- "error"_attr = redact(statusWithMigrationsQueryResponse.getStatus()));
- return;
- }
-
- for (const BSONObj& migration : statusWithMigrationsQueryResponse.getValue().docs) {
- auto statusWithMigrationType = MigrationType::fromBSON(migration);
- if (!statusWithMigrationType.isOK()) {
- // The format of this migration document is incorrect. The balancer holds a distlock for
- // this migration, but without parsing the migration document we cannot identify which
- // distlock must be released. So we must release all distlocks.
- LOGV2(21897,
- "Unable to parse config.migrations document '{migration}' for balancer"
- "migration recovery. Abandoning balancer recovery: {error}",
- "Unable to parse config.migrations document for balancer migration recovery",
- "migration"_attr = redact(migration.toString()),
- "error"_attr = redact(statusWithMigrationType.getStatus()));
- return;
- }
- MigrationType migrateType = std::move(statusWithMigrationType.getValue());
-
- auto it = _migrationRecoveryMap.find(NamespaceString(migrateType.getNss()));
- if (it == _migrationRecoveryMap.end()) {
- std::list<MigrationType> list;
- it = _migrationRecoveryMap.insert(std::make_pair(migrateType.getNss(), list)).first;
-
- // Reacquire the matching distributed lock for this namespace.
- const std::string whyMessage(str::stream() << "Migrating chunk(s) in collection "
- << migrateType.getNss().ns());
-
- auto status = DistLockManager::get(opCtx)->tryLockDirectWithLocalWriteConcern(
- opCtx, migrateType.getNss().ns(), whyMessage);
- if (!status.isOK()) {
- LOGV2(21898,
- "Failed to acquire distributed lock for collection {namespace} "
- "during balancer recovery of an active migration. Abandoning balancer "
- "recovery: {error}",
- "Failed to acquire distributed lock for collection "
- "during balancer recovery of an active migration",
- "namespace"_attr = migrateType.getNss().ns(),
- "error"_attr = redact(status));
- return;
- }
- }
-
- it->second.push_back(std::move(migrateType));
- }
-
- scopedGuard.dismiss();
-}
-
-void MigrationManager::finishRecovery(OperationContext* opCtx,
- uint64_t maxChunkSizeBytes,
- const MigrationSecondaryThrottleOptions& secondaryThrottle) {
- {
- stdx::lock_guard<Latch> lock(_mutex);
- if (_state == State::kStopping) {
- _migrationRecoveryMap.clear();
- return;
- }
-
- // If recovery was abandoned in startRecovery, then there is nothing more to do.
- if (_state == State::kEnabled) {
- invariant(_migrationRecoveryMap.empty());
- return;
- }
-
- invariant(_state == State::kRecovering);
- }
-
- ScopeGuard scopedGuard([&] {
- _migrationRecoveryMap.clear();
- _abandonActiveMigrationsAndEnableManager(opCtx);
- });
-
- // Schedule recovered migrations.
- std::vector<ScopedMigrationRequest> scopedMigrationRequests;
- std::vector<std::shared_ptr<Notification<RemoteCommandResponse>>> responses;
-
- for (auto& nssAndMigrateInfos : _migrationRecoveryMap) {
- auto& nss = nssAndMigrateInfos.first;
- auto& migrateInfos = nssAndMigrateInfos.second;
- invariant(!migrateInfos.empty());
-
- auto swCM = Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(
- opCtx, nss);
- if (!swCM.isOK()) {
- // This shouldn't happen because the collection was intact and sharded when the previous
- // config primary was active and the dist locks have been held by the balancer
- // throughout. Abort migration recovery.
- LOGV2(21899,
- "Unable to reload chunk metadata for collection {namespace} during balancer "
- "recovery. Abandoning recovery: {error}",
- "Unable to reload chunk metadata for collection during balancer recovery",
- "namespace"_attr = nss,
- "error"_attr = redact(swCM.getStatus()));
- return;
- }
-
- const auto& cm = swCM.getValue();
- int scheduledMigrations = 0;
-
- while (!migrateInfos.empty()) {
- auto migrationType = std::move(migrateInfos.front());
- const auto migrationInfo = migrationType.toMigrateInfo(cm.getUUID());
- auto waitForDelete = migrationType.getWaitForDelete();
- migrateInfos.pop_front();
-
- try {
- const auto chunk =
- cm.findIntersectingChunkWithSimpleCollation(migrationInfo.minKey);
-
- if (chunk.getShardId() != migrationInfo.from) {
- // Chunk is no longer on the source shard specified by this migration. Erase the
- // migration recovery document associated with it.
- ScopedMigrationRequest::createForRecovery(opCtx, nss, migrationInfo.minKey);
- continue;
- }
- } catch (const ExceptionFor<ErrorCodes::ShardKeyNotFound>&) {
- // The shard key for the collection has changed.
- // Abandon this migration and remove the document associated with it.
- ScopedMigrationRequest::createForRecovery(opCtx, nss, migrationInfo.minKey);
- continue;
- }
-
- scopedMigrationRequests.emplace_back(
- ScopedMigrationRequest::createForRecovery(opCtx, nss, migrationInfo.minKey));
-
- scheduledMigrations++;
-
- responses.emplace_back(_schedule(opCtx,
- migrationInfo,
- maxChunkSizeBytes,
- secondaryThrottle,
- waitForDelete,
- nullptr));
- }
-
- // If no migrations were scheduled for this namespace, free the dist lock
- if (!scheduledMigrations) {
- DistLockManager::get(opCtx)->unlock(opCtx, nss.ns());
- }
- }
-
- _migrationRecoveryMap.clear();
- scopedGuard.dismiss();
-
- {
- stdx::lock_guard<Latch> lock(_mutex);
- if (_state == State::kRecovering) {
- _state = State::kEnabled;
- _condVar.notify_all();
- }
- }
-
- // Wait for each migration to finish, as usual.
- for (auto& response : responses) {
- response->get();
- }
-}
-
-void MigrationManager::interruptAndDisableMigrations() {
- auto executor = Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor();
-
- stdx::lock_guard<Latch> lock(_mutex);
- invariant(_state == State::kEnabled || _state == State::kRecovering);
- _state = State::kStopping;
-
- // Interrupt any active migrations with dist lock
- for (auto& cmsEntry : _activeMigrations) {
- auto& migrations = cmsEntry.second.migrationsList;
-
- for (auto& migration : migrations) {
- if (migration.callbackHandle) {
- executor->cancel(*migration.callbackHandle);
- }
- }
- }
-
- _checkDrained(lock);
-}
-
-void MigrationManager::drainActiveMigrations() {
- stdx::unique_lock<Latch> lock(_mutex);
-
- if (_state == State::kStopped)
- return;
- invariant(_state == State::kStopping);
- _condVar.wait(lock, [this] { return _activeMigrations.empty(); });
- _state = State::kStopped;
-}
-
-std::shared_ptr<Notification<RemoteCommandResponse>> MigrationManager::_schedule(
- OperationContext* opCtx,
- const MigrateInfo& migrateInfo,
- uint64_t maxChunkSizeBytes,
- const MigrationSecondaryThrottleOptions& secondaryThrottle,
- bool waitForDelete,
- ScopedMigrationRequestsMap* scopedMigrationRequests) {
-
- // Ensure we are not stopped in order to avoid doing the extra work
- {
- stdx::lock_guard<Latch> lock(_mutex);
- if (_state != State::kEnabled && _state != State::kRecovering) {
- return std::make_shared<Notification<RemoteCommandResponse>>(
- Status(ErrorCodes::BalancerInterrupted,
- "Migration cannot be executed because the balancer is not running"));
- }
- }
-
- const auto fromShardStatus =
- Grid::get(opCtx)->shardRegistry()->getShard(opCtx, migrateInfo.from);
- if (!fromShardStatus.isOK()) {
- return std::make_shared<Notification<RemoteCommandResponse>>(
- std::move(fromShardStatus.getStatus()));
- }
-
- const auto fromShard = fromShardStatus.getValue();
- auto fromHostStatus = fromShard->getTargeter()->findHost(
- opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly});
- if (!fromHostStatus.isOK()) {
- return std::make_shared<Notification<RemoteCommandResponse>>(
- std::move(fromHostStatus.getStatus()));
- }
-
- BSONObjBuilder builder;
- MoveChunkRequest::appendAsCommand(&builder,
- migrateInfo.nss,
- migrateInfo.version,
- migrateInfo.from,
- migrateInfo.to,
- ChunkRange(migrateInfo.minKey, migrateInfo.maxKey),
- maxChunkSizeBytes,
- secondaryThrottle,
- waitForDelete,
- migrateInfo.forceJumbo);
-
- stdx::lock_guard<Latch> lock(_mutex);
-
- if (_state != State::kEnabled && _state != State::kRecovering) {
- return std::make_shared<Notification<RemoteCommandResponse>>(
- Status(ErrorCodes::BalancerInterrupted,
- "Migration cannot be executed because the balancer is not running"));
- }
-
- Migration migration(migrateInfo.nss, builder.obj());
-
- auto retVal = migration.completionNotification;
-
- _acquireDistLockAndSchedule(lock,
- opCtx,
- fromHostStatus.getValue(),
- std::move(migration),
- migrateInfo,
- waitForDelete,
- scopedMigrationRequests);
-
- return retVal;
-}
-
-void MigrationManager::_acquireDistLockAndSchedule(
- WithLock lock,
- OperationContext* opCtx,
- const HostAndPort& targetHost,
- Migration migration,
- const MigrateInfo& migrateInfo,
- bool waitForDelete,
- ScopedMigrationRequestsMap* scopedMigrationRequests) noexcept {
- auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
-
- const NamespaceString nss(migration.nss);
-
- auto it = _activeMigrations.find(nss);
- if (it == _activeMigrations.end()) {
- boost::optional<DistLockManager::ScopedLock> scopedLock;
- try {
- scopedLock.emplace(DistLockManager::get(opCtx)->lockDirectLocally(
- opCtx, nss.ns(), DistLockManager::kSingleLockAttemptTimeout));
-
- const std::string whyMessage(str::stream()
- << "Migrating chunk(s) in collection " << nss.ns());
- uassertStatusOK(DistLockManager::get(opCtx)->lockDirect(
- opCtx, nss.ns(), whyMessage, DistLockManager::kSingleLockAttemptTimeout));
- } catch (const DBException& ex) {
- migration.completionNotification->set(
- ex.toStatus(str::stream() << "Could not acquire collection lock for " << nss.ns()
- << " to migrate chunks"));
- return;
- }
-
- MigrationsState migrationsState(std::move(*scopedLock));
- it = _activeMigrations.insert(std::make_pair(nss, std::move(migrationsState))).first;
- }
-
- auto migrationRequestStatus = Status::OK();
-
- if (scopedMigrationRequests) {
- // Write a document to the config.migrations collection, in case this migration must be
- // recovered by the Balancer.
- auto statusWithScopedMigrationRequest =
- ScopedMigrationRequest::writeMigration(opCtx, migrateInfo, waitForDelete);
- if (!statusWithScopedMigrationRequest.isOK()) {
- migrationRequestStatus = std::move(statusWithScopedMigrationRequest.getStatus());
- } else {
- scopedMigrationRequests->emplace(migrateInfo.getName(),
- std::move(statusWithScopedMigrationRequest));
- }
- }
-
- auto migrations = &it->second.migrationsList;
-
- // Add ourselves to the list of migrations on this collection. From this point onwards, the
- // request must call _complete, regardless of success or failure, to remove itself from the list.
- migrations->push_front(std::move(migration));
- auto itMigration = migrations->begin();
-
- if (!migrationRequestStatus.isOK()) {
- _complete(lock, opCtx, itMigration, std::move(migrationRequestStatus));
- return;
- }
-
- const RemoteCommandRequest remoteRequest(
- targetHost, NamespaceString::kAdminDb.toString(), itMigration->moveChunkCmdObj, opCtx);
-
- StatusWith<executor::TaskExecutor::CallbackHandle> callbackHandleWithStatus =
- executor->scheduleRemoteCommand(
- remoteRequest,
- [this, service = opCtx->getServiceContext(), itMigration](
- const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {
- ThreadClient tc(getThreadName(), service);
- auto opCtx = cc().makeOperationContext();
-
- stdx::lock_guard<Latch> lock(_mutex);
- _complete(lock, opCtx.get(), itMigration, args.response);
- });
-
- if (callbackHandleWithStatus.isOK()) {
- itMigration->callbackHandle = std::move(callbackHandleWithStatus.getValue());
- return;
- }
-
- _complete(lock, opCtx, itMigration, std::move(callbackHandleWithStatus.getStatus()));
-}
-
-void MigrationManager::_complete(WithLock lock,
- OperationContext* opCtx,
- MigrationsList::iterator itMigration,
- const RemoteCommandResponse& remoteCommandResponse) {
- const NamespaceString nss(itMigration->nss);
-
- // Make sure to signal the notification last, after the distributed lock is freed, so that we
- // don't have the race condition where a subsequently scheduled migration finds the dist lock
- // still acquired.
- auto notificationToSignal = itMigration->completionNotification;
-
- auto it = _activeMigrations.find(nss);
- invariant(it != _activeMigrations.end());
-
- auto migrations = &it->second.migrationsList;
- migrations->erase(itMigration);
-
- if (migrations->empty()) {
- DistLockManager::get(opCtx)->unlock(opCtx, nss.ns());
- _activeMigrations.erase(it);
- _checkDrained(lock);
- }
-
- notificationToSignal->set(remoteCommandResponse);
-}
-
-void MigrationManager::_checkDrained(WithLock) {
- if (_state == State::kEnabled || _state == State::kRecovering) {
- return;
- }
- invariant(_state == State::kStopping);
-
- if (_activeMigrations.empty()) {
- _condVar.notify_all();
- }
-}
-
-void MigrationManager::_waitForRecovery() {
- stdx::unique_lock<Latch> lock(_mutex);
- _condVar.wait(lock, [this] { return _state != State::kRecovering; });
-}
-
-void MigrationManager::_abandonActiveMigrationsAndEnableManager(OperationContext* opCtx) {
- stdx::unique_lock<Latch> lock(_mutex);
- if (_state == State::kStopping) {
- // The balancer was interrupted. Let the next balancer recover the state.
- return;
- }
- invariant(_state == State::kRecovering);
-
- // Unlock all balancer distlocks we aren't using anymore.
- DistLockManager::get(opCtx)->unlockAll(opCtx);
-
- // Clear the config.migrations collection so that those chunks can be scheduled for migration
- // again.
- Grid::get(opCtx)
- ->catalogClient()
- ->removeConfigDocuments(
- opCtx, MigrationType::ConfigNS, BSONObj(), ShardingCatalogClient::kLocalWriteConcern)
- .transitional_ignore();
-
- _state = State::kEnabled;
- _condVar.notify_all();
-}
-
-Status MigrationManager::_processRemoteCommandResponse(
- const RemoteCommandResponse& remoteCommandResponse,
- ScopedMigrationRequest* scopedMigrationRequest) {
-
- stdx::lock_guard<Latch> lock(_mutex);
- Status commandStatus(ErrorCodes::InternalError, "Uninitialized value.");
-
- // Check for local errors sending the remote command caused by stepdown.
- if (isErrorDueToConfigStepdown(remoteCommandResponse.status,
- _state != State::kEnabled && _state != State::kRecovering)) {
- scopedMigrationRequest->keepDocumentOnDestruct();
- return {ErrorCodes::BalancerInterrupted,
- str::stream() << "Migration interrupted because the balancer is stopping."
- << " Command status: " << remoteCommandResponse.status.toString()};
- }
-
- if (!remoteCommandResponse.isOK()) {
- commandStatus = remoteCommandResponse.status;
- } else {
- // TODO: delete in 3.8
- commandStatus = extractMigrationStatusFromCommandResponse(remoteCommandResponse.data);
- }
-
- if (!Shard::shouldErrorBePropagated(commandStatus.code())) {
- commandStatus = {ErrorCodes::OperationFailed,
- str::stream() << "moveChunk command failed on source shard."
- << causedBy(commandStatus)};
- }
-
- // Any failure to remove the migration document should be because the config server is
- // stepping/shutting down. In this case we must fail the moveChunk command with a retryable
- // error so that the caller does not move on to other distlock-requiring operations that could
- // fail when the balancer recovers and takes distlocks for migration recovery.
- Status status = scopedMigrationRequest->tryToRemoveMigration();
- if (!status.isOK()) {
- commandStatus = {
- ErrorCodes::BalancerInterrupted,
- str::stream() << "Migration interrupted because the balancer is stopping"
- << " and failed to remove the config.migrations document."
- << " Command status: "
- << (commandStatus.isOK() ? status.toString() : commandStatus.toString())};
- }
-
- return commandStatus;
-}
-
-MigrationManager::Migration::Migration(NamespaceString inNss, BSONObj inMoveChunkCmdObj)
- : nss(std::move(inNss)),
- moveChunkCmdObj(std::move(inMoveChunkCmdObj)),
- completionNotification(std::make_shared<Notification<RemoteCommandResponse>>()) {}
-
-MigrationManager::Migration::~Migration() {
- invariant(completionNotification);
-}
-
-MigrationManager::MigrationsState::MigrationsState(DistLockManager::ScopedLock lock)
- : lock(std::move(lock)) {}
-
-} // namespace mongo
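
The file deleted above was built around a schedule-then-wait pattern: _schedule() returned a
std::shared_ptr<Notification<RemoteCommandResponse>> without blocking on the remote command, and
callers such as executeMigrationsForAutoBalance() blocked on get() only after everything was
scheduled. A condensed sketch of that pattern (illustrative only; 'requestsMap' and the loop
variables are stand-ins, not the replacement scheduler's API):

    std::vector<std::shared_ptr<Notification<executor::RemoteCommandResponse>>> notifications;
    for (const auto& migrateInfo : migrateInfos) {
        // Scheduling never blocks on the remote command; each call hands back a notification.
        notifications.push_back(_schedule(
            opCtx, migrateInfo, maxChunkSizeBytes, secondaryThrottle, waitForDelete, &requestsMap));
    }
    for (auto& notification : notifications) {
        // Blocks until the corresponding moveChunk command completes (or fails to schedule).
        const auto& remoteResponse = notification->get();
        // ... translate 'remoteResponse' into a per-migration Status ...
    }
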
diff --git a/src/mongo/db/s/balancer/migration_manager.h b/src/mongo/db/s/balancer/migration_manager.h
deleted file mode 100644
index 073353adccb..00000000000
--- a/src/mongo/db/s/balancer/migration_manager.h
+++ /dev/null
@@ -1,286 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <list>
-#include <map>
-#include <vector>
-
-#include "mongo/bson/bsonobj.h"
-#include "mongo/db/namespace_string.h"
-#include "mongo/db/s/balancer/balancer_policy.h"
-#include "mongo/db/s/balancer/type_migration.h"
-#include "mongo/db/s/dist_lock_manager.h"
-#include "mongo/executor/task_executor.h"
-#include "mongo/platform/mutex.h"
-#include "mongo/s/request_types/migration_secondary_throttle_options.h"
-#include "mongo/stdx/condition_variable.h"
-#include "mongo/stdx/unordered_map.h"
-#include "mongo/util/concurrency/notification.h"
-#include "mongo/util/concurrency/with_lock.h"
-
-namespace mongo {
-
-class ScopedMigrationRequest;
-
-// Uniquely identifies a migration, regardless of shard and version.
-typedef std::string MigrationIdentifier;
-typedef std::map<MigrationIdentifier, Status> MigrationStatuses;
-
-/**
- * Manages and executes parallel migrations for the balancer.
- */
-class MigrationManager {
- MigrationManager(const MigrationManager&) = delete;
- MigrationManager& operator=(const MigrationManager&) = delete;
-
-public:
- MigrationManager(ServiceContext* serviceContext);
- ~MigrationManager();
-
- /**
- * A blocking method that attempts to schedule all the migrations specified in
- * "migrateInfos" and waits for them to complete. Takes the distributed lock for each
- * collection with a chunk being migrated.
- *
- * If any of the migrations scheduled in parallel fails with a LockBusy error reported by the
- * shard, it is retried serially without the distributed lock.
- *
- * Returns a map of migration Status objects to indicate the success/failure of each migration.
- */
- MigrationStatuses executeMigrationsForAutoBalance(
- OperationContext* opCtx,
- const std::vector<MigrateInfo>& migrateInfos,
- uint64_t maxChunkSizeBytes,
- const MigrationSecondaryThrottleOptions& secondaryThrottle,
- bool waitForDelete);
-
- /**
- * A blocking method that attempts to schedule the migration specified in "migrateInfo" and
- * waits for it to complete. Takes the distributed lock for the namespace which is being
- * migrated.
- *
- * Returns the status of the migration.
- */
- Status executeManualMigration(OperationContext* opCtx,
- const MigrateInfo& migrateInfo,
- uint64_t maxChunkSizeBytes,
- const MigrationSecondaryThrottleOptions& secondaryThrottle,
- bool waitForDelete);
-
- /**
- * Non-blocking method that puts the migration manager in the kRecovering state, in which
- * new migration requests will block until finishRecovery is called. Then reacquires distributed
- * locks for the balancer and any active migrations. The distributed locks are taken with local
- * write concern, since this is called in drain mode where majority writes are not yet possible.
- *
- * The active migration recovery may fail and be abandoned, setting the state to kEnabled.
- */
- void startRecoveryAndAcquireDistLocks(OperationContext* opCtx);
-
- /**
- * Blocking method that must only be called after startRecovery has been called. Recovers the
- * state of the migration manager (if necessary and able) and puts it in the kEnabled state,
- * where it will accept new migrations. Any migrations waiting on the recovery state will be
- * unblocked once the state is kEnabled, and then this function waits for the recovered active
- * migrations to finish before returning.
- *
- * The active migration recovery may fail and be abandoned, setting the state to kEnabled and
- * unblocking any process waiting on the recovery state.
- */
- void finishRecovery(OperationContext* opCtx,
- uint64_t maxChunkSizeBytes,
- const MigrationSecondaryThrottleOptions& secondaryThrottle);
-
- /**
- * Non-blocking method that should never be called concurrently with finishRecovery. Puts the
- * manager in a state where all subsequently requested migrations will immediately fail (without
- * ever being scheduled) and all active ones will be cancelled. It has no effect if the
- * migration manager is already stopping or stopped.
- */
- void interruptAndDisableMigrations();
-
- /**
- * Blocking method that waits for any currently scheduled migrations to complete. Must be
- * called after interruptAndDisableMigrations has been called in order to be able to re-enable
- * migrations again.
- */
- void drainActiveMigrations();
-
-private:
- // The current state of the migration manager
- enum class State { // Allowed transitions:
- kStopped, // kRecovering
- kRecovering, // kEnabled, kStopping
- kEnabled, // kStopping
- kStopping, // kStopped
- };
-
- /**
- * Tracks the execution state of a single migration.
- */
- struct Migration {
- Migration(NamespaceString nss, BSONObj moveChunkCmdObj);
- ~Migration();
-
- // Namespace for which this migration applies
- NamespaceString nss;
-
- // Command object representing the migration
- BSONObj moveChunkCmdObj;
-
- // Callback handle for the migration network request. If the migration has not yet been sent
- // on the network, this value is not set.
- boost::optional<executor::TaskExecutor::CallbackHandle> callbackHandle;
-
- // Notification, which will be signaled when the migration completes
- std::shared_ptr<Notification<executor::RemoteCommandResponse>> completionNotification;
- };
-
- // Type used to store the list of active migrations. std::list is chosen because its
- // iterators are not invalidated when surrounding entries are removed, which allows O(1)
- // removal time.
- using MigrationsList = std::list<Migration>;
-
- // Tracks the execution of all of the active migrations within a collection. It holds a
- // DistLockManager::ScopedLock for the corresponding nss, which will be released when all of
- // the scheduled chunk migrations for this collection have completed.
- struct MigrationsState {
- MigrationsState(DistLockManager::ScopedLock lock);
-
- MigrationsList migrationsList;
- DistLockManager::ScopedLock lock;
- };
- using CollectionMigrationsStateMap = stdx::unordered_map<NamespaceString, MigrationsState>;
-
- using ScopedMigrationRequestsMap =
- std::map<MigrationIdentifier, StatusWith<ScopedMigrationRequest>>;
-
- /**
- * Optionally takes the collection distributed lock and schedules a chunk migration with the
- * specified parameters. May block for distributed lock acquisition. If dist lock acquisition is
- * successful (or not done), schedules the migration request and returns a notification which
- * can be used to obtain the outcome of the operation.
- */
- std::shared_ptr<Notification<executor::RemoteCommandResponse>> _schedule(
- OperationContext* opCtx,
- const MigrateInfo& migrateInfo,
- uint64_t maxChunkSizeBytes,
- const MigrationSecondaryThrottleOptions& secondaryThrottle,
- bool waitForDelete,
- ScopedMigrationRequestsMap* scopedMigrationRequests);
-
- /**
- * Acquires the collection distributed lock for the specified namespace and if it succeeds,
- * schedules the migration.
- *
- * The distributed lock is acquired before scheduling the first migration for the collection and
- * is only released when all active migrations on the collection have finished.
- *
- * Assumes that the migration document has already been written if no ScopedMigrationRequestsMap
- * pointer is passed. Otherwise, writes the migration document under the collection distributed
- * lock and adds it to the map.
- */
- void _acquireDistLockAndSchedule(WithLock,
- OperationContext* opCtx,
- const HostAndPort& targetHost,
- Migration migration,
- const MigrateInfo& migrateInfo,
- bool waitForDelete,
- ScopedMigrationRequestsMap* scopedMigrationRequests) noexcept;
-
- /**
- * Used internally for migrations scheduled with the distributed lock acquired by the config
- * server. Called exactly once for each scheduled migration, it will signal the migration in the
- * passed iterator and, if this is the last migration for the collection, will free the collection
- * distributed lock.
- */
- void _complete(WithLock,
- OperationContext* opCtx,
- MigrationsList::iterator itMigration,
- const executor::RemoteCommandResponse& remoteCommandResponse);
-
- /**
- * If the state of the migration manager is kStopping, checks whether there are any outstanding
- * scheduled requests and if there aren't any signals the class condition variable.
- */
- void _checkDrained(WithLock);
-
- /**
- * Blocking call, which waits for the migration manager to leave the recovering state (if it is
- * currently recovering).
- */
- void _waitForRecovery();
-
- /**
- * Should only be called from startRecovery or finishRecovery functions when the migration
- * manager is in either the kStopped or kRecovering state. Releases all the distributed locks
- * that the balancer holds, clears the config.migrations collection, changes the state of the
- * migration manager to kEnabled. Then unblocks all processes waiting for kEnabled state.
- */
- void _abandonActiveMigrationsAndEnableManager(OperationContext* opCtx);
-
- /**
- * Parses a moveChunk RemoteCommandResponse's two levels of Status objects and distinguishes
- * between errors generated by this config server and the shard primary to which the moveChunk
- * command was sent.
- *
- * If the command failed because of stepdown of this config server, the migration document
- * managed by 'scopedMigrationRequest' is saved for later balancer recovery and a
- * BalancerInterrupted error is returned. If the command failed because the shard to which the
- * command was sent returned an error, the migration document is not saved and the error is
- * returned without conversion.
- */
- Status _processRemoteCommandResponse(
- const executor::RemoteCommandResponse& remoteCommandResponse,
- ScopedMigrationRequest* scopedMigrationRequest);
-
- // The service context under which this migration manager runs.
- ServiceContext* const _serviceContext;
-
- // Carries migration information over from startRecovery to finishRecovery. Should only be set
- // in startRecovery and then accessed in finishRecovery.
- stdx::unordered_map<NamespaceString, std::list<MigrationType>> _migrationRecoveryMap;
-
- // Protects the class state below.
- Mutex _mutex = MONGO_MAKE_LATCH("MigrationManager::_mutex");
-
- // Always start the migration manager in a stopped state.
- State _state{State::kStopped};
-
- // Condition variable, which is waited on when the migration manager's state is changing and
- // signaled when the state change is complete.
- stdx::condition_variable _condVar;
-
- // Maps collection namespaces to that collection's active migrations.
- CollectionMigrationsStateMap _activeMigrations;
-};
-
-} // namespace mongo
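
The State enum in the header deleted above documents its allowed transitions inline
(kStopped -> kRecovering -> {kEnabled, kStopping}, kEnabled -> kStopping -> kStopped). A small
hypothetical checker that encodes that table, for illustration only (the deleted class enforced
these transitions with invariants rather than a helper like this):

    bool isValidTransition(State from, State to) {
        switch (from) {
            case State::kStopped:
                return to == State::kRecovering;
            case State::kRecovering:
                return to == State::kEnabled || to == State::kStopping;
            case State::kEnabled:
                return to == State::kStopping;
            case State::kStopping:
                return to == State::kStopped;
        }
        MONGO_UNREACHABLE;
    }
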
diff --git a/src/mongo/db/s/balancer/migration_manager_test.cpp b/src/mongo/db/s/balancer/migration_manager_test.cpp
deleted file mode 100644
index ed04770685e..00000000000
--- a/src/mongo/db/s/balancer/migration_manager_test.cpp
+++ /dev/null
@@ -1,700 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/commands.h"
-#include "mongo/db/read_write_concern_defaults.h"
-#include "mongo/db/read_write_concern_defaults_cache_lookup_mock.h"
-#include "mongo/db/s/balancer/migration_manager.h"
-#include "mongo/db/s/balancer/migration_test_fixture.h"
-#include "mongo/db/s/dist_lock_manager.h"
-#include "mongo/s/request_types/move_chunk_request.h"
-
-namespace mongo {
-namespace {
-
-using executor::RemoteCommandRequest;
-using unittest::assertGet;
-
-const MigrationSecondaryThrottleOptions kDefaultSecondaryThrottle =
- MigrationSecondaryThrottleOptions::create(MigrationSecondaryThrottleOptions::kDefault);
-
-class MigrationManagerTest : public MigrationTestFixture {
-protected:
- void setUp() override {
- MigrationTestFixture::setUp();
- _migrationManager = std::make_unique<MigrationManager>(getServiceContext());
- _migrationManager->startRecoveryAndAcquireDistLocks(operationContext());
- _migrationManager->finishRecovery(operationContext(), 0, kDefaultSecondaryThrottle);
-
- // Necessary because the migration manager may take a dist lock, which calls serverStatus
- // and will attempt to return the latest read write concern defaults.
- ReadWriteConcernDefaults::create(getServiceContext(), _lookupMock.getFetchDefaultsFn());
- }
-
- void tearDown() override {
- checkMigrationsCollectionIsEmptyAndLocksAreUnlocked();
- _migrationManager->interruptAndDisableMigrations();
- _migrationManager->drainActiveMigrations();
- _migrationManager.reset();
- ConfigServerTestFixture::tearDown();
- }
-
- /**
- * Sets up the mock network to expect a moveChunk command and to return either a fixed BSON
- * response or the given "returnStatus".
- */
- void expectMoveChunkCommand(const NamespaceString& nss,
- const ChunkType& chunk,
- const ShardId& toShardId,
- const BSONObj& response) {
- onCommand([&nss, &chunk, &toShardId, &response](const RemoteCommandRequest& request) {
- NamespaceString nss(request.cmdObj.firstElement().valueStringData());
-
- const StatusWith<MoveChunkRequest> moveChunkRequestWithStatus =
- MoveChunkRequest::createFromCommand(nss, request.cmdObj);
- ASSERT_OK(moveChunkRequestWithStatus.getStatus());
-
- ASSERT_EQ(nss, moveChunkRequestWithStatus.getValue().getNss());
- ASSERT_BSONOBJ_EQ(chunk.getMin(), moveChunkRequestWithStatus.getValue().getMinKey());
- ASSERT_BSONOBJ_EQ(chunk.getMax(), moveChunkRequestWithStatus.getValue().getMaxKey());
- ASSERT_EQ(chunk.getShard(), moveChunkRequestWithStatus.getValue().getFromShardId());
-
- ASSERT_EQ(toShardId, moveChunkRequestWithStatus.getValue().getToShardId());
-
- return response;
- });
- }
-
- void expectMoveChunkCommand(const NamespaceString& nss,
- const ChunkType& chunk,
- const ShardId& toShardId,
- const Status& returnStatus) {
- BSONObjBuilder resultBuilder;
- CommandHelpers::appendCommandStatusNoThrow(resultBuilder, returnStatus);
- expectMoveChunkCommand(nss, chunk, toShardId, resultBuilder.obj());
- }
-
- std::unique_ptr<MigrationManager> _migrationManager;
- ReadWriteConcernDefaultsLookupMock _lookupMock;
-};
-
-TEST_F(MigrationManagerTest, OneCollectionTwoMigrations) {
- // Set up two shards in the metadata.
- ASSERT_OK(catalogClient()->insertConfigDocument(
- operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern));
- ASSERT_OK(catalogClient()->insertConfigDocument(
- operationContext(), ShardType::ConfigNS, kShard2, kMajorityWriteConcern));
-
- // Set up the database and collection as sharded in the metadata.
- const std::string dbName = "foo";
- const NamespaceString collName(dbName, "bar");
- const auto collUUID = UUID::gen();
- ChunkVersion version(2, 0, OID::gen(), Timestamp(42));
-
- setUpDatabase(dbName, kShardId0);
- setUpCollection(collName, collUUID, version);
-
- // Set up two chunks in the metadata.
- ChunkType chunk1 =
- setUpChunk(collUUID, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version);
- version.incMinor();
- ChunkType chunk2 =
- setUpChunk(collUUID, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version);
-
- // Going to request that these two chunks get migrated.
- const std::vector<MigrateInfo> migrationRequests{{kShardId1,
- collName,
- chunk1,
- MoveChunkRequest::ForceJumbo::kDoNotForce,
- MigrateInfo::chunksImbalance},
- {kShardId3,
- collName,
- chunk2,
- MoveChunkRequest::ForceJumbo::kDoNotForce,
- MigrateInfo::chunksImbalance}};
-
- auto future = launchAsync([this, migrationRequests] {
- ThreadClient tc("Test", getServiceContext());
- auto opCtx = cc().makeOperationContext();
-
- // Scheduling the moveChunk commands requires finding a host to which to send the command.
- // Set up dummy hosts for the source shards.
- shardTargeterMock(opCtx.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
- shardTargeterMock(opCtx.get(), kShardId2)->setFindHostReturnValue(kShardHost2);
-
- MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance(
- opCtx.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false);
-
- for (const auto& migrateInfo : migrationRequests) {
- ASSERT_OK(migrationStatuses.at(migrateInfo.getName()));
- }
- });
-
- // Expect two moveChunk commands.
- expectMoveChunkCommand(collName, chunk1, kShardId1, Status::OK());
- expectMoveChunkCommand(collName, chunk2, kShardId3, Status::OK());
-
- // Run the MigrationManager code.
- future.default_timed_get();
-}
-
-TEST_F(MigrationManagerTest, TwoCollectionsTwoMigrationsEach) {
- // Set up two shards in the metadata.
- ASSERT_OK(catalogClient()->insertConfigDocument(
- operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern));
- ASSERT_OK(catalogClient()->insertConfigDocument(
- operationContext(), ShardType::ConfigNS, kShard2, kMajorityWriteConcern));
-
- // Set up a database and two collections as sharded in the metadata.
- std::string dbName = "foo";
- const NamespaceString collName1(dbName, "bar");
- const auto collUUID1 = UUID::gen();
- const NamespaceString collName2(dbName, "baz");
- const auto collUUID2 = UUID::gen();
- ChunkVersion version1(2, 0, OID::gen(), Timestamp(12));
- ChunkVersion version2(2, 0, OID::gen(), Timestamp(24));
-
- setUpDatabase(dbName, kShardId0);
- setUpCollection(collName1, collUUID1, version1);
- setUpCollection(collName2, collUUID2, version2);
-
- // Set up two chunks in the metadata for each collection.
- ChunkType chunk1coll1 =
- setUpChunk(collUUID1, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version1);
- version1.incMinor();
- ChunkType chunk2coll1 =
- setUpChunk(collUUID1, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version1);
-
- ChunkType chunk1coll2 =
- setUpChunk(collUUID2, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version2);
- version2.incMinor();
- ChunkType chunk2coll2 =
- setUpChunk(collUUID2, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version2);
-
- // Going to request that these four chunks get migrated.
- const std::vector<MigrateInfo> migrationRequests{{kShardId1,
- collName1,
- chunk1coll1,
- MoveChunkRequest::ForceJumbo::kDoNotForce,
- MigrateInfo::chunksImbalance},
- {kShardId3,
- collName1,
- chunk2coll1,
- MoveChunkRequest::ForceJumbo::kDoNotForce,
- MigrateInfo::chunksImbalance},
- {kShardId1,
- collName2,
- chunk1coll2,
- MoveChunkRequest::ForceJumbo::kDoNotForce,
- MigrateInfo::chunksImbalance},
- {kShardId3,
- collName2,
- chunk2coll2,
- MoveChunkRequest::ForceJumbo::kDoNotForce,
- MigrateInfo::chunksImbalance}};
-
- auto future = launchAsync([this, migrationRequests] {
- ThreadClient tc("Test", getServiceContext());
- auto opCtx = cc().makeOperationContext();
-
- // Scheduling the moveChunk commands requires finding a host to which to send the command.
- // Set up dummy hosts for the source shards.
- shardTargeterMock(opCtx.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
- shardTargeterMock(opCtx.get(), kShardId2)->setFindHostReturnValue(kShardHost2);
-
- MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance(
- opCtx.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false);
-
- for (const auto& migrateInfo : migrationRequests) {
- ASSERT_OK(migrationStatuses.at(migrateInfo.getName()));
- }
- });
-
- // Expect four moveChunk commands.
- expectMoveChunkCommand(collName1, chunk1coll1, kShardId1, Status::OK());
- expectMoveChunkCommand(collName1, chunk2coll1, kShardId3, Status::OK());
- expectMoveChunkCommand(collName2, chunk1coll2, kShardId1, Status::OK());
- expectMoveChunkCommand(collName2, chunk2coll2, kShardId3, Status::OK());
-
- // Run the MigrationManager code.
- future.default_timed_get();
-}
-
-// The MigrationManager should fail the migration if a host is not found for the source shard.
-// Scheduling a moveChunk command requires finding a host to which to send the command.
-TEST_F(MigrationManagerTest, SourceShardNotFound) {
- // Set up two shards in the metadata.
- ASSERT_OK(catalogClient()->insertConfigDocument(
- operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern));
- ASSERT_OK(catalogClient()->insertConfigDocument(
- operationContext(), ShardType::ConfigNS, kShard2, kMajorityWriteConcern));
-
- // Set up the database and collection as sharded in the metadata.
- const std::string dbName = "foo";
- const NamespaceString collName(dbName, "bar");
- const auto collUUID = UUID::gen();
- ChunkVersion version(2, 0, OID::gen(), Timestamp(42));
-
- setUpDatabase(dbName, kShardId0);
- setUpCollection(collName, collUUID, version);
-
- // Set up two chunks in the metadata.
- ChunkType chunk1 =
- setUpChunk(collUUID, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version);
- version.incMinor();
- ChunkType chunk2 =
- setUpChunk(collUUID, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version);
-
- // Going to request that these two chunks get migrated.
- const std::vector<MigrateInfo> migrationRequests{{kShardId1,
- collName,
- chunk1,
- MoveChunkRequest::ForceJumbo::kDoNotForce,
- MigrateInfo::chunksImbalance},
- {kShardId3,
- collName,
- chunk2,
- MoveChunkRequest::ForceJumbo::kDoNotForce,
- MigrateInfo::chunksImbalance}};
-
- auto future = launchAsync([this, chunk1, chunk2, migrationRequests] {
- ThreadClient tc("Test", getServiceContext());
- auto opCtx = cc().makeOperationContext();
-
- // Scheduling a moveChunk command requires finding a host to which to send the command. Set
- // up a dummy host for kShardHost0, and return an error for kShardHost3.
- shardTargeterMock(opCtx.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
- shardTargeterMock(opCtx.get(), kShardId2)
- ->setFindHostReturnValue(
- Status(ErrorCodes::ReplicaSetNotFound, "SourceShardNotFound generated error."));
-
- MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance(
- opCtx.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false);
-
- ASSERT_OK(migrationStatuses.at(migrationRequests.front().getName()));
- ASSERT_EQ(ErrorCodes::ReplicaSetNotFound,
- migrationStatuses.at(migrationRequests.back().getName()));
- });
-
- // Expect only one moveChunk command to be called.
- expectMoveChunkCommand(collName, chunk1, kShardId1, Status::OK());
-
- // Run the MigrationManager code.
- future.default_timed_get();
-}
-
-// TODO: Delete in 3.8
-TEST_F(MigrationManagerTest, JumboChunkResponseBackwardsCompatibility) {
- // Set up one shard in the metadata.
- ASSERT_OK(catalogClient()->insertConfigDocument(
- operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern));
-
- // Set up the database and collection as sharded in the metadata.
- const std::string dbName = "foo";
- const NamespaceString collName(dbName, "bar");
- const auto collUUID = UUID::gen();
- ChunkVersion version(2, 0, OID::gen(), Timestamp(42));
-
- setUpDatabase(dbName, kShardId0);
- setUpCollection(collName, collUUID, version);
-
- // Set up a single chunk in the metadata.
- ChunkType chunk1 =
- setUpChunk(collUUID, kKeyPattern.globalMin(), kKeyPattern.globalMax(), kShardId0, version);
-
- // Going to request that this chunk gets migrated.
- const std::vector<MigrateInfo> migrationRequests{{kShardId1,
- collName,
- chunk1,
- MoveChunkRequest::ForceJumbo::kDoNotForce,
- MigrateInfo::chunksImbalance}};
-
- auto future = launchAsync([this, chunk1, migrationRequests] {
- ThreadClient tc("Test", getServiceContext());
- auto opCtx = cc().makeOperationContext();
-
- // Scheduling a moveChunk command requires finding a host to which to send the command. Set
- // up a dummy host for kShardHost0.
- shardTargeterMock(opCtx.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
-
- MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance(
- opCtx.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false);
-
- ASSERT_EQ(ErrorCodes::ChunkTooBig,
- migrationStatuses.at(migrationRequests.front().getName()));
- });
-
- // Expect only one moveChunk command to be called.
- expectMoveChunkCommand(collName, chunk1, kShardId1, BSON("ok" << 0 << "chunkTooBig" << true));
-
- // Run the MigrationManager code.
- future.default_timed_get();
-}
-
-TEST_F(MigrationManagerTest, InterruptMigration) {
- // Set up one shard in the metadata.
- ASSERT_OK(catalogClient()->insertConfigDocument(
- operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern));
-
- // Set up the database and collection as sharded in the metadata.
- const std::string dbName = "foo";
- const NamespaceString collName(dbName, "bar");
- const auto collUUID = UUID::gen();
- ChunkVersion version(2, 0, OID::gen(), Timestamp(42));
-
- setUpDatabase(dbName, kShardId0);
- setUpCollection(collName, collUUID, version);
-
- // Set up a single chunk in the metadata.
- ChunkType chunk =
- setUpChunk(collUUID, kKeyPattern.globalMin(), kKeyPattern.globalMax(), kShardId0, version);
-
- auto future = launchAsync([&] {
- ThreadClient tc("Test", getServiceContext());
- auto opCtx = cc().makeOperationContext();
-
- // Scheduling a moveChunk command requires finding a host to which to send the command. Set
- // up a dummy host for kShardHost0.
- shardTargeterMock(opCtx.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
-
- ASSERT_EQ(
- ErrorCodes::BalancerInterrupted,
- _migrationManager->executeManualMigration(opCtx.get(),
- {kShardId1,
- collName,
- chunk,
- MoveChunkRequest::ForceJumbo::kDoNotForce,
- MigrateInfo::chunksImbalance},
- 0,
- kDefaultSecondaryThrottle,
- false));
- });
-
- // Wait until the moveChunk request gets sent, then simulate it being stuck by never responding
- // to the request.
- network()->enterNetwork();
- network()->blackHole(network()->getNextReadyRequest());
- network()->exitNetwork();
-
- // Now that the migration request is 'pending', try to cancel the migration manager. This should
- // succeed.
- _migrationManager->interruptAndDisableMigrations();
-
- // Ensure that cancellations get processed
- network()->enterNetwork();
- network()->runReadyNetworkOperations();
- network()->exitNetwork();
-
- // Ensure that the previously scheduled migration is cancelled
- future.default_timed_get();
-
- // Ensure that no new migrations can be scheduled
- ASSERT_EQ(ErrorCodes::BalancerInterrupted,
- _migrationManager->executeManualMigration(operationContext(),
- {kShardId1,
- collName,
- chunk,
- MoveChunkRequest::ForceJumbo::kDoNotForce,
- MigrateInfo::chunksImbalance},
- 0,
- kDefaultSecondaryThrottle,
- false));
-
- // Ensure that the migration manager is no longer handling any migrations.
- _migrationManager->drainActiveMigrations();
-
- // Check that the migration that was active when the migration manager was interrupted can be
- // found in config.migrations (and thus would be recovered if a migration manager were to start
- // up again).
- auto statusWithMigrationsQueryResponse =
- shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
- operationContext(),
- ReadPreferenceSetting{ReadPreference::PrimaryOnly},
- repl::ReadConcernLevel::kMajorityReadConcern,
- MigrationType::ConfigNS,
- BSON(MigrationType::ns(collName.ns()) << MigrationType::min(chunk.getMin())),
- BSONObj(),
- boost::none);
- Shard::QueryResponse migrationsQueryResponse =
- uassertStatusOK(statusWithMigrationsQueryResponse);
- ASSERT_EQUALS(1U, migrationsQueryResponse.docs.size());
-
- ASSERT_OK(catalogClient()->removeConfigDocuments(
- operationContext(),
- MigrationType::ConfigNS,
- BSON(MigrationType::ns(collName.ns()) << MigrationType::min(chunk.getMin())),
- kMajorityWriteConcern));
-
- // Restore the migration manager back to the started state, which is expected by tearDown
- _migrationManager->startRecoveryAndAcquireDistLocks(operationContext());
- _migrationManager->finishRecovery(operationContext(), 0, kDefaultSecondaryThrottle);
-}
-
-TEST_F(MigrationManagerTest, RestartMigrationManager) {
- // Set up one shard in the metadata.
- ASSERT_OK(catalogClient()->insertConfigDocument(
- operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern));
-
- // Set up the database and collection as sharded in the metadata.
- const std::string dbName = "foo";
- const NamespaceString collName(dbName, "bar");
- const auto collUUID = UUID::gen();
- ChunkVersion version(2, 0, OID::gen(), Timestamp(42));
-
- setUpDatabase(dbName, kShardId0);
- setUpCollection(collName, collUUID, version);
-
- // Set up a single chunk in the metadata.
- ChunkType chunk1 =
- setUpChunk(collUUID, kKeyPattern.globalMin(), kKeyPattern.globalMax(), kShardId0, version);
-
- // Go through the lifecycle of the migration manager
- _migrationManager->interruptAndDisableMigrations();
- _migrationManager->drainActiveMigrations();
- _migrationManager->startRecoveryAndAcquireDistLocks(operationContext());
- _migrationManager->finishRecovery(operationContext(), 0, kDefaultSecondaryThrottle);
-
- auto future = launchAsync([&] {
- ThreadClient tc("Test", getServiceContext());
- auto opCtx = cc().makeOperationContext();
-
- // Scheduling a moveChunk command requires finding a host to which to send the command. Set
- // up a dummy host for kShardHost0.
- shardTargeterMock(opCtx.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
-
- ASSERT_OK(
- _migrationManager->executeManualMigration(opCtx.get(),
- {kShardId1,
- collName,
- chunk1,
- MoveChunkRequest::ForceJumbo::kDoNotForce,
- MigrateInfo::chunksImbalance},
- 0,
- kDefaultSecondaryThrottle,
- false));
- });
-
- // Expect only one moveChunk command to be called.
- expectMoveChunkCommand(collName, chunk1, kShardId1, Status::OK());
-
- // Run the MigrationManager code.
- future.default_timed_get();
-}
-
-TEST_F(MigrationManagerTest, MigrationRecovery) {
- // Set up two shards in the metadata.
- ASSERT_OK(catalogClient()->insertConfigDocument(
- operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern));
- ASSERT_OK(catalogClient()->insertConfigDocument(
- operationContext(), ShardType::ConfigNS, kShard2, kMajorityWriteConcern));
-
- // Set up the database and collection as sharded in the metadata.
- const std::string dbName = "foo";
- const NamespaceString collName(dbName, "bar");
- const auto collUUID = UUID::gen();
- ChunkVersion version(1, 0, OID::gen(), Timestamp(42));
-
- setUpDatabase(dbName, kShardId0);
- setUpCollection(collName, collUUID, version);
-
- // Set up two chunks in the metadata and set up two fake active migrations by writing documents
- // to the config.migrations collection.
- ChunkType chunk1 =
- setUpChunk(collUUID, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version);
- version.incMinor();
- ChunkType chunk2 =
- setUpChunk(collUUID, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version);
-
- _migrationManager->interruptAndDisableMigrations();
- _migrationManager->drainActiveMigrations();
-
- setUpMigration(collName, chunk1, kShardId1.toString());
- setUpMigration(collName, chunk2, kShardId3.toString());
-
- // Mimic all config distlocks being released on config server stepup to primary.
- DistLockManager::get(operationContext())->unlockAll(operationContext());
-
- _migrationManager->startRecoveryAndAcquireDistLocks(operationContext());
-
- auto future = launchAsync([this] {
- ThreadClient tc("Test", getServiceContext());
- auto opCtx = cc().makeOperationContext();
-
- // Scheduling the moveChunk commands requires finding hosts to which to send the commands.
- // Set up dummy hosts for the source shards.
- shardTargeterMock(opCtx.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
- shardTargeterMock(opCtx.get(), kShardId2)->setFindHostReturnValue(kShardHost2);
-
- _migrationManager->finishRecovery(opCtx.get(), 0, kDefaultSecondaryThrottle);
- });
-
- // Expect two moveChunk commands.
- expectMoveChunkCommand(collName, chunk1, kShardId1, Status::OK());
- expectMoveChunkCommand(collName, chunk2, kShardId3, Status::OK());
-
- // Run the MigrationManager code.
- future.default_timed_get();
-}
-
-TEST_F(MigrationManagerTest, FailMigrationRecovery) {
- // Set up two shards in the metadata.
- ASSERT_OK(catalogClient()->insertConfigDocument(
- operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern));
- ASSERT_OK(catalogClient()->insertConfigDocument(
- operationContext(), ShardType::ConfigNS, kShard2, kMajorityWriteConcern));
-
- // Set up the database and collection as sharded in the metadata.
- const std::string dbName = "foo";
- const NamespaceString collName(dbName, "bar");
- const auto collUUID = UUID::gen();
- ChunkVersion version(1, 0, OID::gen(), Timestamp(42));
-
- setUpDatabase(dbName, kShardId0);
- setUpCollection(collName, collUUID, version);
-
- // Set up two chunks in the metadata.
- ChunkType chunk1 =
- setUpChunk(collUUID, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version);
- version.incMinor();
- ChunkType chunk2 =
- setUpChunk(collUUID, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version);
-
- _migrationManager->interruptAndDisableMigrations();
- _migrationManager->drainActiveMigrations();
-
- setUpMigration(collName, chunk1, kShardId1.toString());
-
- // Set up a fake active migration document that will fail MigrationType parsing -- missing
- // field.
- BSONObjBuilder builder;
- builder.append("_id", "testing");
- // No MigrationType::ns() field!
- builder.append(MigrationType::min(), chunk2.getMin());
- builder.append(MigrationType::max(), chunk2.getMax());
- builder.append(MigrationType::toShard(), kShardId3.toString());
- builder.append(MigrationType::fromShard(), chunk2.getShard().toString());
- ASSERT_OK(catalogClient()->insertConfigDocument(
- operationContext(), MigrationType::ConfigNS, builder.obj(), kMajorityWriteConcern));
-
- // Take the distributed lock for the collection, which should be released during recovery when
- it fails. Any dist lock held by the config server will be released via processId, so the
- // session ID used here doesn't matter.
- ASSERT_OK(DistLockManager::get(operationContext())
- ->lockDirect(operationContext(),
- collName.ns(),
- "MigrationManagerTest",
- DistLockManager::kSingleLockAttemptTimeout));
-
- _migrationManager->startRecoveryAndAcquireDistLocks(operationContext());
- _migrationManager->finishRecovery(operationContext(), 0, kDefaultSecondaryThrottle);
-
- // MigrationManagerTest::tearDown checks that the config.migrations collection is empty and all
- // distributed locks are unlocked.
-}
-
-// Check that retriable errors and replica-set-monitor-updating errors returned from remote
-// moveChunk commands sent to source shards are not surfaced to the caller (mongos), but are
-// instead converted into OperationFailed errors.
-TEST_F(MigrationManagerTest, RemoteCallErrorConversionToOperationFailed) {
- // Set up two shards in the metadata.
- ASSERT_OK(catalogClient()->insertConfigDocument(
- operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern));
- ASSERT_OK(catalogClient()->insertConfigDocument(
- operationContext(), ShardType::ConfigNS, kShard2, kMajorityWriteConcern));
-
- // Set up the database and collection as sharded in the metadata.
- const std::string dbName = "foo";
- const NamespaceString collName(dbName, "bar");
- const auto collUUID = UUID::gen();
- ChunkVersion version(1, 0, OID::gen(), Timestamp(42));
-
- setUpDatabase(dbName, kShardId0);
- setUpCollection(collName, collUUID, version);
-
- // Set up two chunks in the metadata.
- ChunkType chunk1 =
- setUpChunk(collUUID, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version);
- version.incMinor();
- ChunkType chunk2 =
- setUpChunk(collUUID, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version);
-
- // Going to request that these two chunks get migrated.
- const std::vector<MigrateInfo> migrationRequests{{kShardId1,
- collName,
- chunk1,
- MoveChunkRequest::ForceJumbo::kDoNotForce,
- MigrateInfo::chunksImbalance},
- {kShardId3,
- collName,
- chunk2,
- MoveChunkRequest::ForceJumbo::kDoNotForce,
- MigrateInfo::chunksImbalance}};
-
- auto future = launchAsync([&] {
- ThreadClient tc("Test", getServiceContext());
- auto opCtx = cc().makeOperationContext();
-
- // Scheduling the moveChunk commands requires finding hosts to which to send the commands.
- // Set up dummy hosts for the source shards.
- shardTargeterMock(opCtx.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
- shardTargeterMock(opCtx.get(), kShardId2)->setFindHostReturnValue(kShardHost2);
-
- MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance(
- opCtx.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false);
-
- ASSERT_EQ(ErrorCodes::OperationFailed,
- migrationStatuses.at(migrationRequests.front().getName()));
- ASSERT_EQ(ErrorCodes::OperationFailed,
- migrationStatuses.at(migrationRequests.back().getName()));
- });
-
- // Expect a moveChunk command that will fail with a retriable error.
- expectMoveChunkCommand(
- collName,
- chunk1,
- kShardId1,
- Status(ErrorCodes::NotPrimaryOrSecondary,
- "RemoteCallErrorConversionToOperationFailedCheck generated error."));
-
- // Expect a moveChunk command that will fail with a replset monitor updating error.
- expectMoveChunkCommand(
- collName,
- chunk2,
- kShardId3,
- Status(ErrorCodes::NetworkInterfaceExceededTimeLimit,
- "RemoteCallErrorConversionToOperationFailedCheck generated error."));
-
- // Run the MigrationManager code.
- future.default_timed_get();
-}
-
-} // namespace
-} // namespace mongo
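
The RemoteCallErrorConversionToOperationFailed test above relies on the balancer collapsing retriable and replica-set-monitor-updating remote errors into OperationFailed. A minimal sketch of that conversion follows; the enum, Status struct, and helper names are illustrative stand-ins, not the deleted MigrationManager's API.

// Hedged sketch: folding transient remote moveChunk errors into OperationFailed.
#include <string>

namespace sketch {

enum class ErrorCode {
    OK,
    NotPrimaryOrSecondary,
    NetworkInterfaceExceededTimeLimit,
    OperationFailed,
};

struct Status {
    ErrorCode code;
    std::string reason;
};

// Both codes below either permit a retry or force a replica set monitor
// refresh; neither is actionable for the caller who requested the migration.
bool isRetriableOrMonitorUpdating(ErrorCode code) {
    return code == ErrorCode::NotPrimaryOrSecondary ||
        code == ErrorCode::NetworkInterfaceExceededTimeLimit;
}

Status convertRemoteMoveChunkError(Status remote) {
    if (isRetriableOrMonitorUpdating(remote.code)) {
        return {ErrorCode::OperationFailed,
                "Migration failed with a transient error: " + remote.reason};
    }
    return remote;  // All other errors are surfaced to the caller unchanged.
}

}  // namespace sketch
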
diff --git a/src/mongo/db/s/balancer/scoped_migration_request.cpp b/src/mongo/db/s/balancer/scoped_migration_request.cpp
deleted file mode 100644
index 4e44b2d7e7e..00000000000
--- a/src/mongo/db/s/balancer/scoped_migration_request.cpp
+++ /dev/null
@@ -1,217 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/s/balancer/scoped_migration_request.h"
-
-#include "mongo/db/s/balancer/type_migration.h"
-#include "mongo/db/write_concern_options.h"
-#include "mongo/logv2/log.h"
-#include "mongo/s/client/shard_registry.h"
-#include "mongo/s/grid.h"
-
-namespace mongo {
-namespace {
-
-const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority,
- WriteConcernOptions::SyncMode::UNSET,
- WriteConcernOptions::kWriteConcernTimeoutMigration);
-const int kDuplicateKeyErrorMaxRetries = 2;
-
-} // namespace
-
-ScopedMigrationRequest::ScopedMigrationRequest(OperationContext* opCtx,
- const NamespaceString& nss,
- const BSONObj& minKey)
- : _opCtx(opCtx), _nss(nss), _minKey(minKey) {}
-
-ScopedMigrationRequest::~ScopedMigrationRequest() {
- if (!_opCtx) {
- // If the opCtx object was cleared, nothing should happen in the destructor.
- return;
- }
-
- // Try to delete the entry in the config.migrations collection. If the command fails, that is
- // okay.
- BSONObj migrationDocumentIdentifier =
- BSON(MigrationType::ns(_nss.ns()) << MigrationType::min(_minKey));
- Status result = Grid::get(_opCtx)->catalogClient()->removeConfigDocuments(
- _opCtx, MigrationType::ConfigNS, migrationDocumentIdentifier, kMajorityWriteConcern);
-
- if (!result.isOK()) {
- LOGV2(21900,
- "Failed to remove config.migrations document for migration '{migration}': {error}",
- "Failed to remove config.migrations document for migration",
- "migration"_attr = migrationDocumentIdentifier,
- "error"_attr = redact(result));
- }
-}
-
-ScopedMigrationRequest::ScopedMigrationRequest(ScopedMigrationRequest&& other) {
- // This function relies on the move assignment to nullify 'other._opCtx'. If this is no longer
- // the case, this function should be updated to ensure that it nulls out 'other._opCtx'.
- *this = std::move(other);
-}
-
-ScopedMigrationRequest& ScopedMigrationRequest::operator=(ScopedMigrationRequest&& other) {
- if (this != &other) {
- _opCtx = other._opCtx;
- _nss = other._nss;
- _minKey = other._minKey;
- // Set opCtx to null so that the destructor will do nothing.
- other._opCtx = nullptr;
- }
-
- return *this;
-}
-
-StatusWith<ScopedMigrationRequest> ScopedMigrationRequest::writeMigration(
- OperationContext* opCtx, const MigrateInfo& migrateInfo, bool waitForDelete) {
- auto const grid = Grid::get(opCtx);
-
- // Try to write a unique migration document to config.migrations.
- const MigrationType migrationType(migrateInfo, waitForDelete);
-
- for (int retry = 0; retry < kDuplicateKeyErrorMaxRetries; ++retry) {
- Status result = grid->catalogClient()->insertConfigDocument(
- opCtx, MigrationType::ConfigNS, migrationType.toBSON(), kMajorityWriteConcern);
-
- if (result == ErrorCodes::DuplicateKey) {
- // If the exact migration described by "migrateInfo" is active, return a scoped object
- // for the request because this migration request will join the active one once
- // scheduled.
- auto statusWithMigrationQueryResult =
- grid->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
- opCtx,
- ReadPreferenceSetting{ReadPreference::PrimaryOnly},
- repl::ReadConcernLevel::kLocalReadConcern,
- MigrationType::ConfigNS,
- migrateInfo.getMigrationTypeQuery(),
- BSONObj(),
- boost::none);
- if (!statusWithMigrationQueryResult.isOK()) {
- return statusWithMigrationQueryResult.getStatus().withContext(
- str::stream() << "Failed to verify whether conflicting migration is in "
- << "progress for migration '" << redact(migrateInfo.toString())
- << "' while trying to query config.migrations.");
- }
- if (statusWithMigrationQueryResult.getValue().docs.empty()) {
- // The document that caused the DuplicateKey error is no longer in the collection,
- // so retrying the insert might succeed.
- continue;
- }
- invariant(statusWithMigrationQueryResult.getValue().docs.size() == 1);
-
- BSONObj activeMigrationBSON = statusWithMigrationQueryResult.getValue().docs.front();
- auto statusWithActiveMigration = MigrationType::fromBSON(activeMigrationBSON);
- if (!statusWithActiveMigration.isOK()) {
- return statusWithActiveMigration.getStatus().withContext(
- str::stream() << "Failed to verify whether conflicting migration is in "
- << "progress for migration '" << redact(migrateInfo.toString())
- << "' while trying to parse active migration document '"
- << redact(activeMigrationBSON.toString()) << "'.");
- }
-
- MigrateInfo activeMigrateInfo =
- statusWithActiveMigration.getValue().toMigrateInfo(migrateInfo.uuid);
- if (activeMigrateInfo.to != migrateInfo.to ||
- activeMigrateInfo.from != migrateInfo.from) {
- LOGV2(
- 21901,
- "Failed to write document '{newMigration}' to config.migrations because there "
- "is already an active migration for that chunk: "
- "'{activeMigration}': {error}",
- "Failed to write document to config.migrations because there "
- "is already an active migration for that chunk",
- "newMigration"_attr = redact(migrateInfo.toString()),
- "activeMigration"_attr = redact(activeMigrateInfo.toString()),
- "error"_attr = redact(result));
- return result;
- }
-
- result = Status::OK();
- }
-
- // As long as there isn't a DuplicateKey error, the document may have been written, and it's
- // safe (won't delete another migration's document) and necessary to try to clean up the
- // document via the destructor.
- ScopedMigrationRequest scopedMigrationRequest(opCtx, migrateInfo.nss, migrateInfo.minKey);
-
- // If there was a write error, let the object go out of scope and clean up in the
- // destructor.
- if (!result.isOK()) {
- return result;
- }
-
- return std::move(scopedMigrationRequest);
- }
-
- return Status(ErrorCodes::OperationFailed,
- str::stream() << "Failed to insert the config.migrations document after max "
- << "number of retries. Chunk '"
- << ChunkRange(migrateInfo.minKey, migrateInfo.maxKey).toString()
- << "' in collection '" << migrateInfo.nss
- << "' was being moved (somewhere) by another operation.");
-}
-
-ScopedMigrationRequest ScopedMigrationRequest::createForRecovery(OperationContext* opCtx,
- const NamespaceString& nss,
- const BSONObj& minKey) {
- return ScopedMigrationRequest(opCtx, nss, minKey);
-}
-
-Status ScopedMigrationRequest::tryToRemoveMigration() {
- invariant(_opCtx);
- BSONObj migrationDocumentIdentifier =
- BSON(MigrationType::ns(_nss.ns()) << MigrationType::min(_minKey));
- Status status = Grid::get(_opCtx)->catalogClient()->removeConfigDocuments(
- _opCtx, MigrationType::ConfigNS, migrationDocumentIdentifier, kMajorityWriteConcern);
- if (status.isOK()) {
- // Don't try to do a no-op remove in the destructor.
- _opCtx = nullptr;
- }
- return status;
-}
-
-void ScopedMigrationRequest::keepDocumentOnDestruct() {
- invariant(_opCtx);
- _opCtx = nullptr;
- LOGV2_DEBUG(21902,
- 1,
- "Keeping config.migrations document with namespace {namespace} and minKey "
- "{minKey} for balancer recovery",
- "Keeping config.migrations document for balancer recovery",
- "namespace"_attr = _nss,
- "minKey"_attr = _minKey);
-}
-
-} // namespace mongo
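
The DuplicateKey handling inside writeMigration() above folds three outcomes into one retry loop: the conflicting document disappeared (retry the insert), it describes the identical migration (join it), or it names different shards (fail). A distilled sketch of that decision, assuming tryInsert/findExistingDoc/isSameMigration as hypothetical stand-ins for the insertConfigDocument, exhaustiveFindOnConfig, and MigrationType parsing calls:

// Hedged distillation of the DuplicateKey retry loop; not the deleted API.
#include <functional>
#include <optional>
#include <string>

enum class Outcome { Inserted, JoinedActiveMigration, ConflictingMigration, RetriesExhausted };

Outcome writeMigrationSketch(const std::function<bool()>& tryInsert,
                             const std::function<std::optional<std::string>()>& findExistingDoc,
                             const std::function<bool(const std::string&)>& isSameMigration) {
    constexpr int kDuplicateKeyErrorMaxRetries = 2;
    for (int retry = 0; retry < kDuplicateKeyErrorMaxRetries; ++retry) {
        if (tryInsert())  // The unique index on config.migrations arbitrates the race.
            return Outcome::Inserted;

        auto existing = findExistingDoc();
        if (!existing)
            continue;  // The competing document vanished, so a retry may succeed.
        if (isSameMigration(*existing))
            return Outcome::JoinedActiveMigration;  // Identical request joins the active one.
        return Outcome::ConflictingMigration;  // Same chunk, different source or destination.
    }
    return Outcome::RetriesExhausted;
}

The unique index acts as the arbiter of the race: the insert either wins outright or yields a DuplicateKey that identifies exactly one competing document to inspect.
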
diff --git a/src/mongo/db/s/balancer/scoped_migration_request.h b/src/mongo/db/s/balancer/scoped_migration_request.h
deleted file mode 100644
index bfb58ba103c..00000000000
--- a/src/mongo/db/s/balancer/scoped_migration_request.h
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include "mongo/base/status_with.h"
-#include "mongo/db/s/balancer/balancer_policy.h"
-#include "mongo/s/request_types/migration_secondary_throttle_options.h"
-
-namespace mongo {
-
-/**
- * RAII class that handles writes to the config.migrations collection for a migration that comes
- * through the balancer.
- *
- * A migration must have an entry in the config.migrations collection so that the Balancer can
- * recover from stepdown/crash. This entry must be entered before a migration begins and then
- * removed once the migration has finished.
- *
- * This class should only be used by the Balancer!
- */
-class ScopedMigrationRequest {
- ScopedMigrationRequest(const ScopedMigrationRequest&) = delete;
- ScopedMigrationRequest& operator=(const ScopedMigrationRequest&) = delete;
-
-public:
- /**
- * Deletes this migration's entry in the config.migrations collection, using majority write
- * concern. If there is a balancer stepdown/crash before the write propagates to a majority of
- * servers, that is alright because the balancer recovery process will handle it.
- *
- * If keepDocumentOnDestruct() has been called, then no attempt to remove the document is made.
- */
- ~ScopedMigrationRequest();
-
- ScopedMigrationRequest(ScopedMigrationRequest&& other);
- ScopedMigrationRequest& operator=(ScopedMigrationRequest&& other);
-
- /**
- * Inserts a unique migration entry into the config.migrations collection. If the write is
- * successful, a ScopedMigrationRequest object is returned; otherwise, the write error is
- * returned.
- *
- * The destructor will handle removing the document when it is no longer needed.
- */
- static StatusWith<ScopedMigrationRequest> writeMigration(OperationContext* opCtx,
- const MigrateInfo& migrate,
- bool waitForDelete);
-
- /**
- * Creates a ScopedMigrationRequest object without inserting a document into config.migrations.
- * The destructor will handle removing the migration document when it is no longer needed.
- *
- * This should only be used during Balancer recovery, when a config.migrations document
- * already exists for the migration.
- */
- static ScopedMigrationRequest createForRecovery(OperationContext* opCtx,
- const NamespaceString& nss,
- const BSONObj& minKey);
-
- /**
- * Do not call if keepDocumentOnDestruct has been called previously: it will invariant.
- *
- * Attempts to delete this migration's entry in the config.migrations collection using majority
- * write concern. If successful, clears the operation context so that the destructor will not
- * redundantly try to remove an already successfully deleted document.
- */
- Status tryToRemoveMigration();
-
- /**
- * Do not call if tryToRemoveMigration has been called previously: it may invariant.
- *
- * Clears the operation context so that the destructor will not remove the config.migrations
- * document for the migration.
- *
- * This should only be used on the Balancer when it is interrupted and must leave entries in
- * config.migrations so that ongoing migrations can be recovered later.
- */
- void keepDocumentOnDestruct();
-
-private:
- ScopedMigrationRequest(OperationContext* opCtx,
- const NamespaceString& nss,
- const BSONObj& minKey);
-
- // Need an operation context with which to do a write in the destructor.
- OperationContext* _opCtx;
-
- // ns and minkey are needed to identify the migration document when it is removed from
- // config.migrations by the destructor.
- NamespaceString _nss;
- BSONObj _minKey;
-};
-
-} // namespace mongo
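
A hedged usage sketch of the RAII contract declared above; the control flow and the balancerIsInterrupted() predicate are illustrative, not code from the tree.

// Sketch: the three ways a ScopedMigrationRequest can release its document.
void runBalancerMigrationSketch(OperationContext* opCtx, const MigrateInfo& migrateInfo) {
    auto swRequest =
        ScopedMigrationRequest::writeMigration(opCtx, migrateInfo, /*waitForDelete=*/false);
    if (!swRequest.isOK()) {
        return;  // e.g. DuplicateKey: a conflicting migration already owns the chunk.
    }
    ScopedMigrationRequest request = std::move(swRequest.getValue());

    if (balancerIsInterrupted()) {         // Hypothetical predicate.
        request.keepDocumentOnDestruct();  // Leave the document behind for recovery.
        return;
    }

    // ... schedule and await the moveChunk here ...

    // Either remove the document eagerly and inspect the status, or simply let
    // the destructor remove it with majority write concern when 'request' dies.
    (void)request.tryToRemoveMigration();
}
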
diff --git a/src/mongo/db/s/balancer/scoped_migration_request_test.cpp b/src/mongo/db/s/balancer/scoped_migration_request_test.cpp
deleted file mode 100644
index 414c2005db5..00000000000
--- a/src/mongo/db/s/balancer/scoped_migration_request_test.cpp
+++ /dev/null
@@ -1,246 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/s/balancer/scoped_migration_request.h"
-#include "mongo/db/s/balancer/type_migration.h"
-#include "mongo/db/s/config/config_server_test_fixture.h"
-#include "mongo/s/client/shard_registry.h"
-#include "mongo/s/request_types/migration_secondary_throttle_options.h"
-
-namespace mongo {
-namespace {
-
-using unittest::assertGet;
-
-const std::string kNs = "TestDB.TestColl";
-const BSONObj kMin = BSON("a" << 10);
-const BSONObj kMax = BSON("a" << 20);
-const ShardId kFromShard("shard0000");
-const ShardId kToShard("shard0001");
-const ShardId kDifferentToShard("shard0002");
-
-class ScopedMigrationRequestTest : public ConfigServerTestFixture {
-public:
- /**
- * Queries config.migrations for the document pertaining to migrateInfo and asserts that the
- * number of documents returned equals "expectedNumberOfDocuments".
- */
- void checkMigrationsCollectionForDocument(const MigrateInfo& migrateInfo,
- unsigned long expectedNumberOfDocuments);
-
- /**
- * Makes a ScopedMigrationRequest and checks that the migration was written to
- * config.migrations. This exercises the ScopedMigrationRequest move constructor and move
- * assignment operator.
- */
- ScopedMigrationRequest makeScopedMigrationRequest(const MigrateInfo& migrateInfo);
-
- MigrateInfo makeMigrateInfo();
-
-private:
- void setUp() override;
-};
-
-void ScopedMigrationRequestTest::setUp() {
- setUpAndInitializeConfigDb();
- setupShards({ShardType{"shard", "shard:12"}});
-}
-
-void ScopedMigrationRequestTest::checkMigrationsCollectionForDocument(
- const MigrateInfo& migrateInfo, unsigned long expectedNumberOfDocuments) {
- auto const query = BSON(MigrationType::ns(kNs) << MigrationType::min(migrateInfo.minKey));
- auto response = shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
- operationContext(),
- ReadPreferenceSetting{ReadPreference::PrimaryOnly},
- repl::ReadConcernLevel::kMajorityReadConcern,
- MigrationType::ConfigNS,
- query,
- BSONObj(),
- boost::none);
- Shard::QueryResponse queryResponse = unittest::assertGet(response);
- std::vector<BSONObj> docs = queryResponse.docs;
- ASSERT_EQUALS(expectedNumberOfDocuments, docs.size());
-}
-
-ScopedMigrationRequest ScopedMigrationRequestTest::makeScopedMigrationRequest(
- const MigrateInfo& migrateInfo) {
- ScopedMigrationRequest scopedMigrationRequest =
- assertGet(ScopedMigrationRequest::writeMigration(operationContext(), migrateInfo, false));
-
- checkMigrationsCollectionForDocument(migrateInfo, 1);
-
- return scopedMigrationRequest;
-}
-
-MigrateInfo ScopedMigrationRequestTest::makeMigrateInfo() {
- const NamespaceString collNss{kNs};
- const auto collUuid = UUID::gen();
- const ChunkVersion kChunkVersion{1, 2, OID::gen(), Timestamp(1, 1)};
-
- BSONObjBuilder chunkBuilder;
- collUuid.appendToBuilder(&chunkBuilder, ChunkType::collectionUUID.name());
- chunkBuilder.append(ChunkType::min(), kMin);
- chunkBuilder.append(ChunkType::max(), kMax);
- kChunkVersion.appendLegacyWithField(&chunkBuilder, ChunkType::lastmod());
- chunkBuilder.append(ChunkType::shard(), kFromShard.toString());
-
- ChunkType chunkType = assertGet(ChunkType::parseFromConfigBSONCommand(chunkBuilder.obj()));
- ASSERT_OK(chunkType.validate());
-
- // Initialize the sharded TO collection
- setupCollection(collNss, KeyPattern(BSON("_id" << 1)), {chunkType});
-
- return MigrateInfo(kToShard,
- collNss,
- chunkType,
- MoveChunkRequest::ForceJumbo::kDoNotForce,
- MigrateInfo::chunksImbalance);
-}
-
-TEST_F(ScopedMigrationRequestTest, CreateScopedMigrationRequest) {
- MigrateInfo migrateInfo = makeMigrateInfo();
-
- {
- ScopedMigrationRequest scopedMigrationRequest = assertGet(
- ScopedMigrationRequest::writeMigration(operationContext(), migrateInfo, false));
-
- checkMigrationsCollectionForDocument(migrateInfo, 1);
- }
-
- checkMigrationsCollectionForDocument(migrateInfo, 0);
-}
-
-/**
- * A document is created via the scoped object, but the document is not removed in the
- * destructor because keepDocumentOnDestruct() is called beforehand. The scoped object is then
- * recreated without writing to the migrations collection, and the document is removed on
- * destruct.
- *
- * Simulates (mostly) Balancer recovery.
- */
-TEST_F(ScopedMigrationRequestTest, CreateScopedMigrationRequestOnRecovery) {
- MigrateInfo migrateInfo = makeMigrateInfo();
-
- // Insert the document for the MigrationRequest and then prevent its removal in the destructor.
- {
- ScopedMigrationRequest scopedMigrationRequest = assertGet(
- ScopedMigrationRequest::writeMigration(operationContext(), migrateInfo, false));
-
- checkMigrationsCollectionForDocument(migrateInfo, 1);
-
- scopedMigrationRequest.keepDocumentOnDestruct();
- }
-
- checkMigrationsCollectionForDocument(migrateInfo, 1);
-
- // Fail to write a migration document if a migration document already exists for that chunk
- // but with a different destination shard (the migration request must have identical
- // parameters to join the active migration).
- {
- MigrateInfo differentToShardMigrateInfo = migrateInfo;
- differentToShardMigrateInfo.to = kDifferentToShard;
-
- StatusWith<ScopedMigrationRequest> statusWithScopedMigrationRequest =
- ScopedMigrationRequest::writeMigration(
- operationContext(), differentToShardMigrateInfo, false);
-
- ASSERT_EQUALS(ErrorCodes::DuplicateKey, statusWithScopedMigrationRequest.getStatus());
-
- checkMigrationsCollectionForDocument(migrateInfo, 1);
- }
-
- // Create a new scoped object without inserting a document, and check that the destructor
- // still removes the document corresponding to the MigrationRequest.
- {
- ScopedMigrationRequest scopedMigrationRequest = ScopedMigrationRequest::createForRecovery(
- operationContext(), NamespaceString(kNs), migrateInfo.minKey);
-
- checkMigrationsCollectionForDocument(migrateInfo, 1);
- }
-
- checkMigrationsCollectionForDocument(migrateInfo, 0);
-}
-
-TEST_F(ScopedMigrationRequestTest, CreateMultipleScopedMigrationRequestsForIdenticalMigration) {
- MigrateInfo migrateInfo = makeMigrateInfo();
-
- {
- // Create a ScopedMigrationRequest, which will do the config.migrations write.
- ScopedMigrationRequest scopedMigrationRequest = assertGet(
- ScopedMigrationRequest::writeMigration(operationContext(), migrateInfo, false));
-
- checkMigrationsCollectionForDocument(migrateInfo, 1);
-
- {
- // Should be able to create another Scoped object if the request is identical.
- ScopedMigrationRequest identicalScopedMigrationRequest = assertGet(
- ScopedMigrationRequest::writeMigration(operationContext(), migrateInfo, false));
-
- checkMigrationsCollectionForDocument(migrateInfo, 1);
- }
-
- // If any scoped object goes out of scope, the migration should be over and the document
- // removed.
- checkMigrationsCollectionForDocument(migrateInfo, 0);
- }
-
- checkMigrationsCollectionForDocument(migrateInfo, 0);
-}
-
-TEST_F(ScopedMigrationRequestTest, TryToRemoveScopedMigrationRequestBeforeDestruct) {
- MigrateInfo migrateInfo = makeMigrateInfo();
-
- // Remove the migration document with tryToRemoveMigration().
- ScopedMigrationRequest scopedMigrationRequest =
- assertGet(ScopedMigrationRequest::writeMigration(operationContext(), migrateInfo, false));
-
- checkMigrationsCollectionForDocument(migrateInfo, 1);
-
- ASSERT_OK(scopedMigrationRequest.tryToRemoveMigration());
-
- checkMigrationsCollectionForDocument(migrateInfo, 0);
-}
-
-TEST_F(ScopedMigrationRequestTest, MoveAndAssignmentConstructors) {
- MigrateInfo migrateInfo = makeMigrateInfo();
-
- // Test that when the move constructor and move assignment operator are used and the original
- // variable goes out of scope, the original object's destructor does not remove the migration
- // document.
- {
- ScopedMigrationRequest anotherScopedMigrationRequest =
- makeScopedMigrationRequest(migrateInfo);
-
- checkMigrationsCollectionForDocument(migrateInfo, 1);
- }
-
- checkMigrationsCollectionForDocument(migrateInfo, 0);
-}
-
-} // namespace
-} // namespace mongo
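
The MoveAndAssignmentConstructors test above pins down one rule of the deleted implementation: moving from a ScopedMigrationRequest must disarm the source so that only the final owner's destructor removes the document. A generic sketch of that rule, with an illustrative ScopedDoc standing in for the real class:

// Hedged sketch of move-transfers-cleanup-ownership; not tree code.
#include <utility>

class ScopedDoc {
public:
    explicit ScopedDoc(int docId) : _docId(docId) {}
    ~ScopedDoc() {
        if (_docId != kReleased)
            remove(_docId);  // Only the final owner deletes the document.
    }
    ScopedDoc(ScopedDoc&& other) noexcept : _docId(std::exchange(other._docId, kReleased)) {}
    ScopedDoc& operator=(ScopedDoc&& other) noexcept {
        if (this != &other)
            _docId = std::exchange(other._docId, kReleased);  // Source is disarmed.
        return *this;
    }

private:
    static void remove(int /*docId*/) {}  // Stand-in for the config.migrations delete.
    static constexpr int kReleased = -1;
    int _docId;
};

Mirroring the deleted code, the moved-from object is nulled rather than cleaned up, so a double delete of another migration's document is impossible by construction.
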