diff options
author | Paolo Polato <paolo.polato@mongodb.com> | 2021-10-26 10:44:47 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-10-26 11:28:32 +0000 |
commit | 4a8881d3f61be8e57b55a33e076bace92b3e6aad (patch) | |
tree | 43d1e28811b14d3b450dbe1b7c5e20f4759a9674 /src | |
parent | 6675c4f7819ae73f6cdc6658bd3ea318382ef325 (diff) | |
download | mongo-4a8881d3f61be8e57b55a33e076bace92b3e6aad.tar.gz |
SERVER-60336 Replace the balancer's MigrationScheduler with a BalancerCommandsScheduler
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/s/SConscript | 21 | ||||
-rw-r--r-- | src/mongo/db/s/balancer/balancer.cpp | 151 | ||||
-rw-r--r-- | src/mongo/db/s/balancer/balancer.h | 14 | ||||
-rw-r--r-- | src/mongo/db/s/balancer/balancer_commands_scheduler.h | 3 | ||||
-rw-r--r-- | src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp | 216 | ||||
-rw-r--r-- | src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h | 104 | ||||
-rw-r--r-- | src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp | 96 | ||||
-rw-r--r-- | src/mongo/db/s/balancer/balancer_dist_locks.cpp | 78 | ||||
-rw-r--r-- | src/mongo/db/s/balancer/balancer_dist_locks.h | 66 | ||||
-rw-r--r-- | src/mongo/db/s/balancer/migration_manager.cpp | 709 | ||||
-rw-r--r-- | src/mongo/db/s/balancer/migration_manager.h | 286 | ||||
-rw-r--r-- | src/mongo/db/s/balancer/migration_manager_test.cpp | 700 | ||||
-rw-r--r-- | src/mongo/db/s/balancer/scoped_migration_request.cpp | 217 | ||||
-rw-r--r-- | src/mongo/db/s/balancer/scoped_migration_request.h | 120 | ||||
-rw-r--r-- | src/mongo/db/s/balancer/scoped_migration_request_test.cpp | 246 |
15 files changed, 482 insertions, 2545 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index b0e48daf3c2..a7ff9dc9c9f 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -202,6 +202,18 @@ env.Library( ) env.Library( + target='forwardable_operation_metadata', + source=[ + 'forwardable_operation_metadata.cpp', + 'forwardable_operation_metadata.idl' + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/s/grid', + ] +) + +env.Library( target='sharding_catalog_manager', source=[ 'add_shard_cmd.idl', @@ -212,12 +224,11 @@ env.Library( 'balancer/balancer_chunk_selection_policy.cpp', 'balancer/balancer_command_document.idl', 'balancer/balancer_commands_scheduler_impl.cpp', + 'balancer/balancer_dist_locks.cpp', 'balancer/balancer_policy.cpp', 'balancer/balancer.cpp', 'balancer/cluster_statistics_impl.cpp', 'balancer/cluster_statistics.cpp', - 'balancer/migration_manager.cpp', - 'balancer/scoped_migration_request.cpp', 'balancer/type_migration.cpp', 'config/initial_split_policy.cpp', 'config/sharding_catalog_manager_chunk_operations.cpp', @@ -263,6 +274,7 @@ env.Library( '$BUILD_DIR/mongo/s/coreshard', '$BUILD_DIR/mongo/s/query/cluster_aggregate', '$BUILD_DIR/mongo/util/log_and_backoff', + 'forwardable_operation_metadata', 'sharding_logging', ], ) @@ -308,8 +320,6 @@ env.Library( 'drop_database_coordinator_document.idl', 'flush_database_cache_updates_command.cpp', 'flush_routing_table_cache_updates_command.cpp', - 'forwardable_operation_metadata.cpp', - 'forwardable_operation_metadata.idl', 'get_database_version_command.cpp', 'get_shard_version_command.cpp', 'merge_chunks_command.cpp', @@ -374,6 +384,7 @@ env.Library( '$BUILD_DIR/mongo/s/commands/sharded_cluster_sharding_commands', '$BUILD_DIR/mongo/s/sharding_initialization', '$BUILD_DIR/mongo/s/sharding_router_api', + 'forwardable_operation_metadata', 'sharding_runtime_d', ], ) @@ -538,9 +549,7 @@ env.CppUnitTest( 'balancer/cluster_statistics_test.cpp', 'balancer/core_options_stub.cpp', 'balancer/balancer_commands_scheduler_test.cpp', - 'balancer/migration_manager_test.cpp', 'balancer/migration_test_fixture.cpp', - 'balancer/scoped_migration_request_test.cpp', 'balancer/type_migration_test.cpp', 'config_server_op_observer_test.cpp', 'config/initial_split_policy_test.cpp', diff --git a/src/mongo/db/s/balancer/balancer.cpp b/src/mongo/db/s/balancer/balancer.cpp index 6071f12be7a..cdd27339a84 100644 --- a/src/mongo/db/s/balancer/balancer.cpp +++ b/src/mongo/db/s/balancer/balancer.cpp @@ -151,14 +151,46 @@ void warnOnMultiVersion(const vector<ClusterStatistics::ShardStatistics>& cluste return; BSONObjBuilder shardVersions; - for (const auto& stat : clusterStats) + for (const auto& stat : clusterStats) { shardVersions << stat.shardId << stat.mongoVersion; + } + LOGV2_WARNING(21875, "Multiversion cluster detected", "localVersion"_attr = vii.version(), "shardVersions"_attr = shardVersions.done()); } +Status processManualMigrationOutcome(OperationContext* opCtx, + const BSONObj& chunkMin, + const NamespaceString& nss, + const ShardId& destination, + Status outcome) { + if (outcome.isOK()) { + return outcome; + } + + auto swCM = + Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, nss); + if (!swCM.isOK()) { + return swCM.getStatus(); + } + + const auto currentChunkInfo = + swCM.getValue().findIntersectingChunkWithSimpleCollation(chunkMin); + if (currentChunkInfo.getShardId() == destination && + outcome != ErrorCodes::BalancerInterrupted) { + // Migration calls can be interrupted after the metadata is committed but before the command + // finishes the waitForDelete stage. Any failovers, therefore, must always cause the + // moveChunk command to be retried so as to assure that the waitForDelete promise of a + // successful command has been fulfilled. + LOGV2(6036622, + "Migration outcome is not OK, but the transaction was committed. Returning success"); + outcome = Status::OK(); + } + return outcome; +} + const auto _balancerDecoration = ServiceContext::declareDecoration<Balancer>(); const ReplicaSetAwareServiceRegistry::Registerer<Balancer> _balancerRegisterer("Balancer"); @@ -180,9 +212,7 @@ Balancer::Balancer() _chunkSelectionPolicy( std::make_unique<BalancerChunkSelectionPolicyImpl>(_clusterStats.get(), _random)), _commandScheduler(std::make_unique<BalancerCommandsSchedulerImpl>()), - _chunkMerger(std::make_unique<BalancerChunkMergerImpl>(*_commandScheduler, *_clusterStats)), - _migrationManager(_balancerDecoration.owner(this)) {} - + _chunkMerger(std::make_unique<BalancerChunkMergerImpl>(*_commandScheduler, *_clusterStats)) {} Balancer::~Balancer() { // Terminate the balancer thread so it doesn't leak memory. @@ -191,9 +221,6 @@ Balancer::~Balancer() { if (_chunkMerger) { _chunkMerger->waitForStop(); } - if (_commandScheduler) { - _commandScheduler->stop(); - } } void Balancer::onStepUpBegin(OperationContext* opCtx, long long term) { @@ -212,9 +239,6 @@ void Balancer::onStepDown() { if (_chunkMerger) { _chunkMerger->onStepDown(); } - if (_commandScheduler) { - _commandScheduler->stop(); - } } void Balancer::onBecomeArbiter() { @@ -228,8 +252,6 @@ void Balancer::initiateBalancer(OperationContext* opCtx) { invariant(_state == kStopped); _state = kRunning; - _migrationManager.startRecoveryAndAcquireDistLocks(opCtx); - invariant(!_thread.joinable()); invariant(!_threadOperationContext); _thread = stdx::thread([this] { _mainThread(); }); @@ -250,12 +272,6 @@ void Balancer::interruptBalancer() { _threadOperationContext->markKilled(ErrorCodes::InterruptedDueToReplStateChange); } - // Schedule a separate thread to shutdown the migration manager in order to avoid deadlock with - // replication step down - invariant(!_migrationManagerInterruptThread.joinable()); - _migrationManagerInterruptThread = - stdx::thread([this] { _migrationManager.interruptAndDisableMigrations(); }); - _condVar.notify_all(); } @@ -301,11 +317,14 @@ Status Balancer::rebalanceSingleChunk(OperationContext* opCtx, return refreshStatus; } - return _migrationManager.executeManualMigration(opCtx, - *migrateInfo, - balancerConfig->getMaxChunkSizeBytes(), - balancerConfig->getSecondaryThrottle(), - balancerConfig->waitForDelete()); + MoveChunkSettings settings(balancerConfig->getMaxChunkSizeBytes(), + balancerConfig->getSecondaryThrottle(), + balancerConfig->waitForDelete(), + migrateInfo->forceJumbo); + auto response = _commandScheduler->requestMoveChunk( + opCtx, nss, chunk, migrateInfo->to, settings, true /* issuedByRemoteUser */); + return processManualMigrationOutcome( + opCtx, chunk.getMin(), nss, migrateInfo->to, response->getOutcome()); } Status Balancer::moveSingleChunk(OperationContext* opCtx, @@ -321,17 +340,15 @@ Status Balancer::moveSingleChunk(OperationContext* opCtx, return moveAllowedStatus; } - return _migrationManager.executeManualMigration( - opCtx, - MigrateInfo(newShardId, - nss, - chunk, - forceJumbo ? MoveChunkRequest::ForceJumbo::kForceManual - : MoveChunkRequest::ForceJumbo::kDoNotForce, - MigrateInfo::chunksImbalance), - maxChunkSizeBytes, - secondaryThrottle, - waitForDelete); + MoveChunkSettings settings(maxChunkSizeBytes, + secondaryThrottle, + waitForDelete, + forceJumbo ? MoveChunkRequest::ForceJumbo::kForceManual + : MoveChunkRequest::ForceJumbo::kDoNotForce); + auto response = _commandScheduler->requestMoveChunk( + opCtx, nss, chunk, newShardId, settings, true /* issuedByRemoteUser */); + return processManualMigrationOutcome( + opCtx, chunk.getMin(), nss, newShardId, response->getOutcome()); } void Balancer::report(OperationContext* opCtx, BSONObjBuilder* builder) { @@ -388,13 +405,11 @@ void Balancer::_mainThread() { break; } - LOGV2(21857, "CSRS balancer thread is recovering"); + LOGV2(6036605, "Starting command scheduler"); - _migrationManager.finishRecovery(opCtx.get(), - balancerConfig->getMaxChunkSizeBytes(), - balancerConfig->getSecondaryThrottle()); + _commandScheduler->start(opCtx.get()); - LOGV2(21858, "CSRS balancer thread is recovered"); + LOGV2(6036606, "Balancer worker thread initialised. Entering main loop."); // Main balancer loop while (!_stopRequested()) { @@ -507,17 +522,13 @@ void Balancer::_mainThread() { { stdx::lock_guard<Latch> scopedLock(_mutex); invariant(_state == kStopping); - invariant(_migrationManagerInterruptThread.joinable()); } + // TODO(SERVER-60459) ensure that the merger gets consistently stopped with the scheduler. _commandScheduler->stop(); - _migrationManagerInterruptThread.join(); - _migrationManager.drainActiveMigrations(); - { stdx::lock_guard<Latch> scopedLock(_mutex); - _migrationManagerInterruptThread = {}; _threadOperationContext = nullptr; } @@ -693,30 +704,36 @@ int Balancer::_moveChunks(OperationContext* opCtx, return 0; } - auto migrationStatuses = - _migrationManager.executeMigrationsForAutoBalance(opCtx, - candidateChunks, - balancerConfig->getMaxChunkSizeBytes(), - balancerConfig->getSecondaryThrottle(), - balancerConfig->waitForDelete()); + std::vector<std::pair<size_t, std::unique_ptr<MoveChunkResponse>>> migrateInfosAndResponses; + migrateInfosAndResponses.reserve(candidateChunks.size()); + for (size_t migrateInfoIndex = 0; migrateInfoIndex < candidateChunks.size(); + ++migrateInfoIndex) { + const auto& migrateInfo = candidateChunks[migrateInfoIndex]; + + ChunkType chunk; + chunk.setMin(migrateInfo.minKey); + chunk.setMax(migrateInfo.maxKey); + chunk.setShard(migrateInfo.from); + chunk.setVersion(migrateInfo.version); + + MoveChunkSettings settings(balancerConfig->getMaxChunkSizeBytes(), + balancerConfig->getSecondaryThrottle(), + balancerConfig->waitForDelete(), + migrateInfo.forceJumbo); + auto response = _commandScheduler->requestMoveChunk( + opCtx, migrateInfo.nss, chunk, migrateInfo.to, settings); + migrateInfosAndResponses.emplace_back( + std::make_pair(migrateInfoIndex, std::move(response))); + } int numChunksProcessed = 0; - - for (const auto& migrationStatusEntry : migrationStatuses) { - const Status& status = migrationStatusEntry.second; + for (const auto& migrateInfoAndResponse : migrateInfosAndResponses) { + const Status status = migrateInfoAndResponse.second->getOutcome(); if (status.isOK()) { numChunksProcessed++; continue; } - - const MigrationIdentifier& migrationId = migrationStatusEntry.first; - - const auto requestIt = std::find_if(candidateChunks.begin(), - candidateChunks.end(), - [&migrationId](const MigrateInfo& migrateInfo) { - return migrateInfo.getName() == migrationId; - }); - invariant(requestIt != candidateChunks.end()); + const auto& migrateInfo = candidateChunks[migrateInfoAndResponse.first]; // ChunkTooBig is returned by the source shard during the cloning phase if the migration // manager finds that the chunk is larger than some calculated size, the source shard is @@ -730,21 +747,21 @@ int Balancer::_moveChunks(OperationContext* opCtx, LOGV2(21871, "Migration {migrateInfo} failed with {error}, going to try splitting the chunk", "Migration failed, going to try splitting the chunk", - "migrateInfo"_attr = redact(requestIt->toString()), + "migrateInfo"_attr = redact(migrateInfo.toString()), "error"_attr = redact(status)); const CollectionType collection = catalogClient->getCollection( - opCtx, requestIt->uuid, repl::ReadConcernLevel::kLocalReadConcern); + opCtx, migrateInfo.uuid, repl::ReadConcernLevel::kLocalReadConcern); ShardingCatalogManager::get(opCtx)->splitOrMarkJumbo( - opCtx, collection.getNss(), requestIt->minKey); + opCtx, collection.getNss(), migrateInfo.minKey); continue; } LOGV2(21872, "Migration {migrateInfo} failed with {error}", "Migration failed", - "migrateInfo"_attr = redact(requestIt->toString()), + "migrateInfo"_attr = redact(migrateInfo.toString()), "error"_attr = redact(status)); } @@ -774,8 +791,6 @@ void Balancer::_mergeChunksIfNeeded(OperationContext* opCtx) { LOGV2(8423320, "Balancer will perform chunk merges"); - _commandScheduler->start(opCtx); - auto* manager = ShardingCatalogManager::get(opCtx); for (CollectionType const& coll : collections) { auto progress = _chunkMerger->mergeChunksOnShards(opCtx, coll); diff --git a/src/mongo/db/s/balancer/balancer.h b/src/mongo/db/s/balancer/balancer.h index 7433b2ca495..7d1c8069b72 100644 --- a/src/mongo/db/s/balancer/balancer.h +++ b/src/mongo/db/s/balancer/balancer.h @@ -32,7 +32,6 @@ #include "mongo/db/repl/replica_set_aware_service.h" #include "mongo/db/s/balancer/balancer_chunk_selection_policy.h" #include "mongo/db/s/balancer/balancer_random.h" -#include "mongo/db/s/balancer/migration_manager.h" #include "mongo/platform/mutex.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/thread.h" @@ -290,16 +289,6 @@ private: // thread. OperationContext* _threadOperationContext{nullptr}; - // This thread is only available in the kStopping state and is necessary for the migration - // manager shutdown to not deadlock with replica set step down. In particular, the migration - // manager's order of lock acquisition is mutex, then collection lock, whereas stepdown first - // acquires the global S lock and then acquires the migration manager's mutex. - // - // The interrupt thread is scheduled when the balancer enters the kStopping state (which is at - // step down) and is joined outside of lock, when the replica set leaves draining mode, outside - // of the global X lock. - stdx::thread _migrationManagerInterruptThread; - // Indicates whether the balancer is currently executing a balancer round bool _inBalancerRound{false}; @@ -330,9 +319,6 @@ private: std::unique_ptr<BalancerCommandsScheduler> _commandScheduler; std::unique_ptr<BalancerChunkMerger> _chunkMerger; - - // Migration manager used to schedule and manage migrations - MigrationManager _migrationManager; }; } // namespace mongo diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler.h b/src/mongo/db/s/balancer/balancer_commands_scheduler.h index 2083972413b..7766950b9d2 100644 --- a/src/mongo/db/s/balancer/balancer_commands_scheduler.h +++ b/src/mongo/db/s/balancer/balancer_commands_scheduler.h @@ -145,7 +145,8 @@ public: const NamespaceString& nss, const ChunkType& chunk, const ShardId& recipient, - const MoveChunkSettings& commandSettings) = 0; + const MoveChunkSettings& commandSettings, + bool issuedByRemoteUser = false) = 0; virtual std::unique_ptr<MergeChunksResponse> requestMergeChunks( OperationContext* opCtx, diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp index 734c6ba8f04..1f923f4edbc 100644 --- a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp +++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp @@ -43,6 +43,7 @@ namespace mongo { MONGO_FAIL_POINT_DEFINE(pauseBalancerWorkerThread); +MONGO_FAIL_POINT_DEFINE(deferredCleanupCompletedCheckpoint); const std::string MergeChunksCommandInfo::kCommandName = "mergeChunks"; const std::string MergeChunksCommandInfo::kBounds = "bounds"; @@ -87,8 +88,6 @@ void BalancerCommandsSchedulerImpl::start(OperationContext* opCtx) { } invariant(!_workerThreadHandle.joinable()); - _incompleteRequests.reserve(_maxRunningRequests * 10); - _runningRequestIds.reserve(_maxRunningRequests); auto requestsToRecover = _loadRequestsToRecover(opCtx); _state = requestsToRecover.empty() ? SchedulerState::Running : SchedulerState::Recovering; @@ -120,7 +119,11 @@ std::unique_ptr<MoveChunkResponse> BalancerCommandsSchedulerImpl::requestMoveChu const NamespaceString& nss, const ChunkType& chunk, const ShardId& recipient, - const MoveChunkSettings& commandSettings) { + const MoveChunkSettings& commandSettings, + bool issuedByRemoteUser) { + + auto externalClientInfo = + issuedByRemoteUser ? boost::optional<ExternalClientInfo>(opCtx) : boost::none; auto commandInfo = std::make_shared<MoveChunkCommandInfo>(nss, chunk.getShard(), @@ -131,7 +134,8 @@ std::unique_ptr<MoveChunkResponse> BalancerCommandsSchedulerImpl::requestMoveChu commandSettings.secondaryThrottle, commandSettings.waitForDelete, commandSettings.forceJumbo, - chunk.getVersion()); + chunk.getVersion(), + std::move(externalClientInfo)); auto requestCollectionInfo = _buildAndEnqueueNewRequest(opCtx, std::move(commandInfo)); return std::make_unique<MoveChunkResponseImpl>(std::move(requestCollectionInfo)); @@ -215,10 +219,9 @@ ResponseHandle BalancerCommandsSchedulerImpl::_buildAndEnqueueNewRequest( OperationContext* opCtx, std::shared_ptr<CommandInfo>&& commandInfo) { const auto newRequestId = UUID::gen(); LOGV2(5847202, - "Enqueuing new Balancer command request with id {reqId}. Details: {command}", "Enqueuing new Balancer command request", "reqId"_attr = newRequestId, - "command"_attr = commandInfo->serialise().toString(), + "command"_attr = redact(commandInfo->serialise().toString()), "recoveryDocRequired"_attr = commandInfo->requiresRecoveryOnCrash()); if (commandInfo->requiresRecoveryOnCrash()) { @@ -238,7 +241,6 @@ ResponseHandle BalancerCommandsSchedulerImpl::_buildAndEnqueueNewRequest( if (auto writeStatus = getStatusFromWriteCommandReply(reply); !writeStatus.isOK()) { LOGV2(5847210, - "Failed to persist command document for reqId {reqId}. Error status: {status}", "Failed to persist request command document", "reqId"_attr = newRequestId, "status"_attr = writeStatus); @@ -263,61 +265,23 @@ ResponseHandle BalancerCommandsSchedulerImpl::_enqueueRequest(WithLock, RequestD _stateUpdatedCV.notify_all(); } else { deferredResponseHandle.handle->set(Status( - ErrorCodes::CallbackCanceled, "Request rejected - balancer scheduler is stopped")); + ErrorCodes::BalancerInterrupted, "Request rejected - balancer scheduler is stopped")); } return deferredResponseHandle; } bool BalancerCommandsSchedulerImpl::_canSubmitNewRequests(WithLock) { - return (!_pendingRequestIds.empty() && _runningRequestIds.size() < _maxRunningRequests && - MONGO_likely(!pauseBalancerWorkerThread.shouldFail())); + return (!_pendingRequestIds.empty() && MONGO_likely(!pauseBalancerWorkerThread.shouldFail())); } -Status BalancerCommandsSchedulerImpl::_acquireDistLock(OperationContext* opCtx, - NamespaceString nss) { - auto it = _migrationLocks.find(nss); - if (it != _migrationLocks.end()) { - ++it->second.numMigrations; - return Status::OK(); - } else { - boost::optional<DistLockManager::ScopedLock> scopedLock; - try { - scopedLock.emplace(DistLockManager::get(opCtx)->lockDirectLocally( - opCtx, nss.ns(), DistLockManager::kSingleLockAttemptTimeout)); - - const std::string whyMessage(str::stream() - << "Migrating chunk(s) in collection " << nss.ns()); - uassertStatusOK(DistLockManager::get(opCtx)->lockDirect( - opCtx, nss.ns(), whyMessage, DistLockManager::kSingleLockAttemptTimeout)); - } catch (const DBException& ex) { - return ex.toStatus(str::stream() << "Could not acquire collection lock for " << nss.ns() - << " to migrate chunks"); - } - Migrations migrationData(std::move(*scopedLock)); - _migrationLocks.insert(std::make_pair(nss, std::move(migrationData))); - } - return Status::OK(); -} - -void BalancerCommandsSchedulerImpl::_releaseDistLock(OperationContext* opCtx, NamespaceString nss) { - auto it = _migrationLocks.find(nss); - if (it == _migrationLocks.end()) { - return; - } else if (it->second.numMigrations == 1) { - DistLockManager::get(opCtx)->unlock(opCtx, nss.ns()); - _migrationLocks.erase(it); - } else { - --it->second.numMigrations; - } +bool BalancerCommandsSchedulerImpl::_deferredCleanupRequired(WithLock) { + return (!_obsoleteRecoveryDocumentIds.empty() || !_lockedReferencesToRelease.empty()); } CommandSubmissionResult BalancerCommandsSchedulerImpl::_submit( OperationContext* opCtx, const CommandSubmissionHandle& handle) { - LOGV2(5847203, - "Balancer command request id {reqId} submitted for execution", - "Balancer command request submitted for execution", - "reqId"_attr = handle.id); + LOGV2(5847203, "Balancer command request submitted for execution", "reqId"_attr = handle.id); auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); const auto shardWithStatus = @@ -338,14 +302,14 @@ CommandSubmissionResult BalancerCommandsSchedulerImpl::_submit( opCtx); auto onRemoteResponseReceived = - [this, opCtx, requestId = handle.id]( - const executor::TaskExecutor::RemoteCommandCallbackArgs& args) { - _applyCommandResponse(opCtx, requestId, args.response); + [this, + requestId = handle.id](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) { + _applyCommandResponse(requestId, args.response); }; if (handle.commandInfo->requiresDistributedLock()) { Status lockAcquisitionResponse = - _acquireDistLock(opCtx, handle.commandInfo->getNameSpace()); + _distributedLocks.acquireFor(opCtx, handle.commandInfo->getNameSpace()); if (!lockAcquisitionResponse.isOK()) { return CommandSubmissionResult(handle.id, false, lockAcquisitionResponse); } @@ -360,12 +324,11 @@ CommandSubmissionResult BalancerCommandsSchedulerImpl::_submit( } void BalancerCommandsSchedulerImpl::_applySubmissionResult( - WithLock, OperationContext* opCtx, CommandSubmissionResult&& submissionResult) { + WithLock, CommandSubmissionResult&& submissionResult) { auto requestToUpdateIt = _incompleteRequests.find(submissionResult.id); if (requestToUpdateIt == _incompleteRequests.end()) { LOGV2(5847209, - "Skipping _applySubmissionResult: reqId {reqId} already completed/canceled", - "Skipping _applySubmissionResult: reqId already completed/canceled", + "Skipping _applySubmissionResult: request already completed/canceled", "reqId"_attr = submissionResult.id); return; } @@ -385,7 +348,7 @@ void BalancerCommandsSchedulerImpl::_applySubmissionResult( } else { const auto& submittedCommandInfo = requestToUpdate.getCommandInfo(); if (submissionResult.acquiredDistLock) { - _releaseDistLock(opCtx, submittedCommandInfo.getNameSpace()); + _lockedReferencesToRelease.emplace_back(submittedCommandInfo.getNameSpace()); } if (submittedCommandInfo.requiresRecoveryOnCrash()) { _obsoleteRecoveryDocumentIds.push_back(submissionResult.id); @@ -396,9 +359,7 @@ void BalancerCommandsSchedulerImpl::_applySubmissionResult( } void BalancerCommandsSchedulerImpl::_applyCommandResponse( - OperationContext* opCtx, - UUID requestId, - const executor::TaskExecutor::ResponseStatus& response) { + UUID requestId, const executor::TaskExecutor::ResponseStatus& response) { { stdx::lock_guard<Latch> lg(_mutex); auto requestToCompleteIt = _incompleteRequests.find(requestId); @@ -411,7 +372,7 @@ void BalancerCommandsSchedulerImpl::_applyCommandResponse( requestToComplete.setOutcome(response); auto& commandInfo = requestToComplete.getCommandInfo(); if (commandInfo.requiresDistributedLock()) { - _releaseDistLock(opCtx, commandInfo.getNameSpace()); + _lockedReferencesToRelease.emplace_back(commandInfo.getNameSpace()); } if (commandInfo.requiresRecoveryOnCrash()) { _obsoleteRecoveryDocumentIds.push_back(requestId); @@ -425,7 +386,6 @@ void BalancerCommandsSchedulerImpl::_applyCommandResponse( _stateUpdatedCV.notify_all(); } LOGV2(5847204, - "Execution of balancer command request id {reqId} completed", "Execution of balancer command request completed", "reqId"_attr = requestId, "response"_attr = response.toString()); @@ -434,61 +394,78 @@ void BalancerCommandsSchedulerImpl::_applyCommandResponse( std::vector<RequestData> BalancerCommandsSchedulerImpl::_loadRequestsToRecover( OperationContext* opCtx) { std::vector<RequestData> requestsToRecover; - auto documentProcessor = [&requestsToRecover](const BSONObj& commandToRecoverDoc) { + auto documentProcessor = [&requestsToRecover, opCtx](const BSONObj& commandToRecoverDoc) { auto originalCommand = PersistedBalancerCommand::parse( IDLParserErrorContext("BalancerCommandsScheduler"), commandToRecoverDoc); auto recoveryCommand = std::make_shared<RecoveryCommandInfo>(originalCommand); LOGV2(5847212, - "Recovered request id {reqId}, which command will be rescheduled. Details: " - "{command}", "Command request recovered and set for rescheduling", "reqId"_attr = originalCommand.getRequestId(), - "command"_attr = recoveryCommand->serialise()); + "command"_attr = redact(recoveryCommand->serialise())); requestsToRecover.emplace_back(originalCommand.getRequestId(), std::move(recoveryCommand)); }; DBDirectClient dbClient(opCtx); - dbClient.query(documentProcessor, NamespaceString::kConfigBalancerCommandsNamespace, BSONObj()); + try { + dbClient.query( + documentProcessor, NamespaceString::kConfigBalancerCommandsNamespace, BSONObj()); + } catch (const DBException& e) { + LOGV2(5847225, "Failed to load requests to recover", "error"_attr = redact(e)); + } + return requestsToRecover; } -void BalancerCommandsSchedulerImpl::_cleanUpObsoleteRecoveryInfo(WithLock, - OperationContext* opCtx) { - if (_obsoleteRecoveryDocumentIds.empty()) { - return; - } - BSONObjBuilder queryBuilder; - if (_obsoleteRecoveryDocumentIds.size() == 1) { - _obsoleteRecoveryDocumentIds[0].appendToBuilder( - &queryBuilder, PersistedBalancerCommand::kRequestIdFieldName); - } else { - BSONObjBuilder requestIdClause( - queryBuilder.subobjStart(PersistedBalancerCommand::kRequestIdFieldName)); - BSONArrayBuilder valuesBuilder(requestIdClause.subarrayStart("$in")); - for (const auto& requestId : _obsoleteRecoveryDocumentIds) { - requestId.appendToArrayBuilder(&valuesBuilder); +void BalancerCommandsSchedulerImpl::_performDeferredCleanup(WithLock, OperationContext* opCtx) { + auto recoveryDocsDeleted = [this, opCtx] { + if (_obsoleteRecoveryDocumentIds.empty()) { + return false; + } + BSONObjBuilder queryBuilder; + if (_obsoleteRecoveryDocumentIds.size() == 1) { + _obsoleteRecoveryDocumentIds[0].appendToBuilder( + &queryBuilder, PersistedBalancerCommand::kRequestIdFieldName); + } else { + BSONObjBuilder requestIdClause( + queryBuilder.subobjStart(PersistedBalancerCommand::kRequestIdFieldName)); + BSONArrayBuilder valuesBuilder(requestIdClause.subarrayStart("$in")); + for (const auto& requestId : _obsoleteRecoveryDocumentIds) { + requestId.appendToArrayBuilder(&valuesBuilder); + } } - } - _obsoleteRecoveryDocumentIds.clear(); + _obsoleteRecoveryDocumentIds.clear(); - auto query = queryBuilder.obj(); - DBDirectClient dbClient(opCtx); + auto query = queryBuilder.obj(); + DBDirectClient dbClient(opCtx); - auto reply = dbClient.removeAcknowledged( - NamespaceString::kConfigBalancerCommandsNamespace.toString(), query); + auto reply = dbClient.removeAcknowledged( + NamespaceString::kConfigBalancerCommandsNamespace.toString(), query); - LOGV2(5847211, - "Clean up of obsolete document info performed with outcome {reply}", - "Clean up of obsolete document info performed", - "query"_attr = query, - "reply"_attr = reply); -} + LOGV2(5847211, + "Clean up of obsolete document info performed", + "query"_attr = query, + "reply"_attr = reply); + return true; + }(); + auto distributedLocksReleased = [this, opCtx] { + if (_lockedReferencesToRelease.empty()) { + return false; + } + for (const auto& nss : _lockedReferencesToRelease) { + _distributedLocks.releaseFor(opCtx, nss); + } + _lockedReferencesToRelease.clear(); + return true; + }(); + + if (recoveryDocsDeleted || distributedLocksReleased) { + deferredCleanupCompletedCheckpoint.pauseWhileSet(); + } +} void BalancerCommandsSchedulerImpl::_workerThread() { ON_BLOCK_EXIT([this] { - invariant(_migrationLocks.empty(), - "BalancerCommandsScheduler worker thread failed to release all locks on exit"); LOGV2(5847208, "Leaving balancer command scheduler thread"); stdx::lock_guard<Latch> lg(_mutex); _state = SchedulerState::Stopped; @@ -496,7 +473,6 @@ void BalancerCommandsSchedulerImpl::_workerThread() { }); Client::initThread("BalancerCommandsScheduler"); - auto opCtxHolder = cc().makeOperationContext(); stdx::unordered_map<UUID, RequestData, UUID::Hash> requestsToCleanUpOnExit; LOGV2(5847205, "Balancer scheduler thread started"); @@ -507,10 +483,14 @@ void BalancerCommandsSchedulerImpl::_workerThread() { invariant(_state != SchedulerState::Stopped); _stateUpdatedCV.wait(ul, [this, &ul] { return (_state == SchedulerState::Stopping || _canSubmitNewRequests(ul) || - !_obsoleteRecoveryDocumentIds.empty()); + _deferredCleanupRequired(ul)); }); - _cleanUpObsoleteRecoveryInfo(ul, opCtxHolder.get()); + // Completed commands defer the clean up of acquired resources + { + auto opCtxHolder = cc().makeOperationContext(); + _performDeferredCleanup(ul, opCtxHolder.get()); + } if (_state == SchedulerState::Stopping) { _runningRequestIds.clear(); @@ -523,51 +503,61 @@ void BalancerCommandsSchedulerImpl::_workerThread() { continue; } - // 1. Pick up new requests to be submitted from the pending list, if possible. - const auto availableSubmissionSlots = - static_cast<size_t>(_maxRunningRequests - _runningRequestIds.size()); - while (!_pendingRequestIds.empty() && - commandsToSubmit.size() < availableSubmissionSlots) { - const auto& requestData = _incompleteRequests.at(_pendingRequestIds.front()); + // 1. Pick up new requests to be submitted from the pending list + for (auto pendingRequestId : _pendingRequestIds) { + const auto& requestData = _incompleteRequests.at(pendingRequestId); commandsToSubmit.push_back(requestData.getSubmissionInfo()); - _pendingRequestIds.pop_front(); } + _pendingRequestIds.clear(); } // 2. Serve the picked up requests, submitting their related commands. std::vector<CommandSubmissionResult> submissionResults; for (auto& submissionInfo : commandsToSubmit) { + auto opCtxHolder = cc().makeOperationContext(); + if (submissionInfo.commandInfo) { + submissionInfo.commandInfo.get()->attachOperationMetadataTo(opCtxHolder.get()); + } submissionResults.push_back(_submit(opCtxHolder.get(), submissionInfo)); if (!submissionResults.back().context.isOK()) { LOGV2(5847206, - "Submission for scheduler command request {reqId} failed: cause {cause}", - "Submission for scheduler command request {reqId} failed", + "Submission for scheduler command request failed", "reqId"_attr = submissionResults.back().id, "cause"_attr = submissionResults.back().context.getStatus()); } } // 3. Process the outcome of each submission. + int numRunningRequests = 0; + int numPendingRequests = 0; { stdx::lock_guard<Latch> lg(_mutex); for (auto& submissionResult : submissionResults) { - _applySubmissionResult(lg, opCtxHolder.get(), std::move(submissionResult)); + _applySubmissionResult(lg, std::move(submissionResult)); } + numRunningRequests = _runningRequestIds.size(); + numPendingRequests = _pendingRequestIds.size(); } - LOGV2(5847207, "Ending balancer command scheduler round"); + LOGV2_DEBUG(5847207, + 1, + "Ending balancer command scheduler round", + "numRunningRequests"_attr = numRunningRequests, + "numPendingRequests"_attr = numPendingRequests); } // In case of clean exit, cancel all the pending/running command requests // (but keep the related descriptor documents to ensure they will be reissued on recovery). + auto opCtxHolder = cc().makeOperationContext(); auto executor = Grid::get(opCtxHolder.get())->getExecutorPool()->getFixedExecutor(); for (auto& idAndRequest : requestsToCleanUpOnExit) { idAndRequest.second.setOutcome(Status( - ErrorCodes::CallbackCanceled, "Request cancelled - balancer scheduler is stopping")); + ErrorCodes::BalancerInterrupted, "Request cancelled - balancer scheduler is stopping")); const auto& cancelHandle = idAndRequest.second.getExecutionContext(); if (cancelHandle) { executor->cancel(*cancelHandle); } - _releaseDistLock(opCtxHolder.get(), idAndRequest.second.getCommandInfo().getNameSpace()); + _distributedLocks.releaseFor(opCtxHolder.get(), + idAndRequest.second.getCommandInfo().getNameSpace()); } } diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h index 62c8f4b2b82..79fc7aac793 100644 --- a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h +++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.h @@ -29,15 +29,14 @@ #pragma once -#include <list> -#include <unordered_map> - #include "mongo/db/s/balancer/balancer_command_document_gen.h" #include "mongo/db/s/balancer/balancer_commands_scheduler.h" -#include "mongo/db/s/dist_lock_manager.h" +#include "mongo/db/s/balancer/balancer_dist_locks.h" +#include "mongo/db/s/forwardable_operation_metadata.h" #include "mongo/db/service_context.h" #include "mongo/platform/mutex.h" #include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/s/client/shard.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/thread.h" #include "mongo/util/concurrency/notification.h" @@ -91,7 +90,12 @@ public: if (!response.status.isOK()) { return response.status; } - return getStatusFromCommandResult(response.data); + auto remoteStatus = getStatusFromCommandResult(response.data); + return Shard::shouldErrorBePropagated(remoteStatus.code()) + ? remoteStatus + : Status(ErrorCodes::OperationFailed, + str::stream() << "Command request" << getRequestId().toString() + << "failed on source shard." << causedBy(remoteStatus)); } executor::RemoteCommandResponse getRemoteResponse() { @@ -244,6 +248,18 @@ public: } }; +/** + * Utility class to extract and hold information describing the remote client that submitted a + * command. + */ +struct ExternalClientInfo { + ExternalClientInfo(OperationContext* opCtx) + : operationMetadata(opCtx), apiParameters(APIParameters::get(opCtx)) {} + + const ForwardableOperationMetadata operationMetadata; + const APIParameters apiParameters; +}; + /** * Base class describing the common traits of a shard command associated to a Request @@ -251,8 +267,10 @@ public: */ class CommandInfo { public: - CommandInfo(const ShardId& targetShardId, const NamespaceString& nss) - : _targetShardId(targetShardId), _nss(nss) {} + CommandInfo(const ShardId& targetShardId, + const NamespaceString& nss, + boost::optional<ExternalClientInfo>&& clientInfo) + : _targetShardId(targetShardId), _nss(nss), _clientInfo(clientInfo) {} virtual ~CommandInfo() {} @@ -274,9 +292,22 @@ public: return _nss; } + void attachOperationMetadataTo(OperationContext* opCtx) { + if (_clientInfo) { + _clientInfo.get().operationMetadata.setOn(opCtx); + } + } + + void appendCommandMetadataTo(BSONObjBuilder* commandBuilder) const { + if (_clientInfo && _clientInfo.get().apiParameters.getParamsPassed()) { + _clientInfo.get().apiParameters.appendInfo(commandBuilder); + } + } + private: ShardId _targetShardId; NamespaceString _nss; + boost::optional<ExternalClientInfo> _clientInfo; }; /** @@ -293,8 +324,9 @@ public: const MigrationSecondaryThrottleOptions& secondaryThrottle, bool waitForDelete, MoveChunkRequest::ForceJumbo forceJumbo, - const ChunkVersion& version) - : CommandInfo(origin, nss), + const ChunkVersion& version, + boost::optional<ExternalClientInfo>&& clientInfo) + : CommandInfo(origin, nss, std::move(clientInfo)), _chunkBoundaries(lowerBoundKey, upperBoundKey), _recipient(recipient), _version(version), @@ -315,6 +347,7 @@ public: _secondaryThrottle, _waitForDelete, _forceJumbo); + appendCommandMetadataTo(&commandBuilder); return commandBuilder.obj(); } @@ -343,7 +376,7 @@ public: const BSONObj& lowerBoundKey, const BSONObj& upperBoundKey, const ChunkVersion& version) - : CommandInfo(shardId, nss), + : CommandInfo(shardId, nss, boost::none), _lowerBoundKey(lowerBoundKey), _upperBoundKey(upperBoundKey), _version(version) {} @@ -386,7 +419,7 @@ public: boost::optional<long long> maxChunkObjects, boost::optional<long long> maxChunkSizeBytes, bool force) - : CommandInfo(shardId, nss), + : CommandInfo(shardId, nss, boost::none), _shardKeyPattern(shardKeyPattern), _lowerBoundKey(lowerBoundKey), _upperBoundKey(upperBoundKey), @@ -443,7 +476,7 @@ public: const BSONObj& upperBoundKey, bool estimatedValue, const ChunkVersion& version) - : CommandInfo(shardId, nss), + : CommandInfo(shardId, nss, boost::none), _shardKeyPattern(shardKeyPattern), _lowerBoundKey(lowerBoundKey), _upperBoundKey(upperBoundKey), @@ -487,7 +520,7 @@ public: const BSONObj& upperBoundKey, const ChunkVersion& version, const std::vector<BSONObj>& splitPoints) - : CommandInfo(shardId, nss), + : CommandInfo(shardId, nss, boost::none), _shardKeyPattern(shardKeyPattern), _lowerBoundKey(lowerBoundKey), _upperBoundKey(upperBoundKey), @@ -525,7 +558,7 @@ private: class RecoveryCommandInfo : public CommandInfo { public: RecoveryCommandInfo(const PersistedBalancerCommand& persistedCommand) - : CommandInfo(persistedCommand.getTarget(), persistedCommand.getNss()), + : CommandInfo(persistedCommand.getTarget(), persistedCommand.getNss(), boost::none), _serialisedCommand(persistedCommand.getRemoteCommand()), _requiresDistributedLock(persistedCommand.getRequiresDistributedLock()) {} @@ -657,12 +690,12 @@ public: void stop() override; - std::unique_ptr<MoveChunkResponse> requestMoveChunk( - OperationContext* opCtx, - const NamespaceString& nss, - const ChunkType& chunk, - const ShardId& destination, - const MoveChunkSettings& commandSettings) override; + std::unique_ptr<MoveChunkResponse> requestMoveChunk(OperationContext* opCtx, + const NamespaceString& nss, + const ChunkType& chunk, + const ShardId& destination, + const MoveChunkSettings& commandSettings, + bool issuedByRemoteUser) override; std::unique_ptr<MergeChunksResponse> requestMergeChunks(OperationContext* opCtx, const NamespaceString& nss, @@ -695,9 +728,10 @@ public: private: enum class SchedulerState { Recovering, Running, Stopping, Stopped }; - static const int32_t _maxRunningRequests{10}; - + // Protects the in-memory state of the Scheduler Mutex _mutex = MONGO_MAKE_LATCH("BalancerCommandsSchedulerImpl::_mutex"); + + // Ensures that concurrent calls to start() and stop() get serialised Mutex _startStopMutex = MONGO_MAKE_LATCH("BalancerCommandsSchedulerImpl::_startStopMutex"); SchedulerState _state{SchedulerState::Stopped}; @@ -724,16 +758,13 @@ private: std::vector<UUID> _obsoleteRecoveryDocumentIds; + std::vector<NamespaceString> _lockedReferencesToRelease; + /** - * State to acquire and release DistLocks on a per namespace basis + * Centralised accessor for all the distributed locks required by the Scheduler. + * Only _workerThread() is supposed to interact with this class. */ - struct Migrations { - Migrations(DistLockManager::ScopedLock lock) : lock(std::move(lock)), numMigrations(1) {} - - DistLockManager::ScopedLock lock; - int numMigrations; - }; - stdx::unordered_map<NamespaceString, Migrations> _migrationLocks; + BalancerDistLocks _distributedLocks; ResponseHandle _buildAndEnqueueNewRequest(OperationContext* opCtx, std::shared_ptr<CommandInfo>&& commandInfo); @@ -742,24 +773,19 @@ private: bool _canSubmitNewRequests(WithLock); - Status _acquireDistLock(OperationContext* opCtx, NamespaceString nss); + bool _deferredCleanupRequired(WithLock); - void _releaseDistLock(OperationContext* opCtx, NamespaceString nss); + void _performDeferredCleanup(WithLock, OperationContext* opCtx); CommandSubmissionResult _submit(OperationContext* opCtx, const CommandSubmissionHandle& data); - void _applySubmissionResult(WithLock, - OperationContext* opCtx, - CommandSubmissionResult&& submissionResult); + void _applySubmissionResult(WithLock, CommandSubmissionResult&& submissionResult); - void _applyCommandResponse(OperationContext* opCtx, - UUID requestId, + void _applyCommandResponse(UUID requestId, const executor::TaskExecutor::ResponseStatus& response); std::vector<RequestData> _loadRequestsToRecover(OperationContext* opCtx); - void _cleanUpObsoleteRecoveryInfo(WithLock, OperationContext* opCtx); - void _workerThread(); }; diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp b/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp index 41db6025b56..51ea5d95995 100644 --- a/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp +++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp @@ -123,16 +123,25 @@ TEST_F(BalancerCommandsSchedulerTest, ResilientToMultipleStarts) { } TEST_F(BalancerCommandsSchedulerTest, SuccessfulMoveChunkCommand) { + auto deferredCleanupCompletedCheckpoint = + globalFailPointRegistry().find("deferredCleanupCompletedCheckpoint"); + auto timesEnteredFailPoint = + deferredCleanupCompletedCheckpoint->setMode(FailPoint::alwaysOn, 0); _scheduler.start(operationContext()); ChunkType moveChunk = makeChunk(0, kShardId0); auto networkResponseFuture = launchAsync([&]() { onCommand( [&](const executor::RemoteCommandRequest& request) { return BSON("ok" << true); }); }); - auto resp = _scheduler.requestMoveChunk( - operationContext(), kNss, moveChunk, ShardId(kShardId1), getDefaultMoveChunkSettings()); + auto resp = _scheduler.requestMoveChunk(operationContext(), + kNss, + moveChunk, + ShardId(kShardId1), + getDefaultMoveChunkSettings(), + false /* issuedByRemoteUser */); ASSERT_OK(resp->getOutcome()); networkResponseFuture.default_timed_get(); + deferredCleanupCompletedCheckpoint->waitForTimesEntered(timesEnteredFailPoint + 1); // Ensure DistLock is released correctly { auto opCtx = Client::getCurrent()->getOperationContext(); @@ -142,6 +151,7 @@ TEST_F(BalancerCommandsSchedulerTest, SuccessfulMoveChunkCommand) { opCtx, kNss.ns(), whyMessage, DistLockManager::kSingleLockAttemptTimeout); ASSERT_OK(scopedDistLock.getStatus()); } + deferredCleanupCompletedCheckpoint->setMode(FailPoint::off, 0); _scheduler.stop(); } @@ -228,7 +238,7 @@ TEST_F(BalancerCommandsSchedulerTest, SuccessfulRequestChunkDataSizeCommand) { chunk.getRange(), chunk.getVersion(), KeyPattern(BSON("x" << 1)), - false); + false /* issuedByRemoteUser */); ASSERT_OK(resp->getOutcome()); ASSERT_OK(resp->getSize().getStatus()); ASSERT_EQ(resp->getSize().getValue(), 156); @@ -239,16 +249,27 @@ TEST_F(BalancerCommandsSchedulerTest, SuccessfulRequestChunkDataSizeCommand) { } TEST_F(BalancerCommandsSchedulerTest, CommandFailsWhenNetworkReturnsError) { + auto deferredCleanupCompletedCheckpoint = + globalFailPointRegistry().find("deferredCleanupCompletedCheckpoint"); + auto timesEnteredFailPoint = + deferredCleanupCompletedCheckpoint->setMode(FailPoint::alwaysOn, 0); + _scheduler.start(operationContext()); ChunkType moveChunk = makeChunk(0, kShardId0); auto timeoutError = Status{ErrorCodes::NetworkTimeout, "Mock error: network timed out"}; auto networkResponseFuture = launchAsync([&]() { onCommand([&](const executor::RemoteCommandRequest& request) { return timeoutError; }); }); - auto resp = _scheduler.requestMoveChunk( - operationContext(), kNss, moveChunk, ShardId(kShardId1), getDefaultMoveChunkSettings()); + auto resp = _scheduler.requestMoveChunk(operationContext(), + kNss, + moveChunk, + ShardId(kShardId1), + getDefaultMoveChunkSettings(), + false /* issuedByRemoteUser */); ASSERT_EQUALS(resp->getOutcome(), timeoutError); networkResponseFuture.default_timed_get(); + deferredCleanupCompletedCheckpoint->waitForTimesEntered(timesEnteredFailPoint + 1); + // Ensure DistLock is released correctly { auto opCtx = Client::getCurrent()->getOperationContext(); @@ -258,16 +279,21 @@ TEST_F(BalancerCommandsSchedulerTest, CommandFailsWhenNetworkReturnsError) { opCtx, kNss.ns(), whyMessage, DistLockManager::kSingleLockAttemptTimeout); ASSERT_OK(scopedDistLock.getStatus()); } + deferredCleanupCompletedCheckpoint->setMode(FailPoint::off, 0); _scheduler.stop(); } TEST_F(BalancerCommandsSchedulerTest, CommandFailsWhenSchedulerIsStopped) { ChunkType moveChunk = makeChunk(0, kShardId0); - auto resp = _scheduler.requestMoveChunk( - operationContext(), kNss, moveChunk, ShardId(kShardId1), getDefaultMoveChunkSettings()); - ASSERT_EQUALS( - resp->getOutcome(), - Status(ErrorCodes::CallbackCanceled, "Request rejected - balancer scheduler is stopped")); + auto resp = _scheduler.requestMoveChunk(operationContext(), + kNss, + moveChunk, + ShardId(kShardId1), + getDefaultMoveChunkSettings(), + false /* issuedByRemoteUser */); + ASSERT_EQUALS(resp->getOutcome(), + Status(ErrorCodes::BalancerInterrupted, + "Request rejected - balancer scheduler is stopped")); // Ensure DistLock is not taken { auto opCtx = Client::getCurrent()->getOperationContext(); @@ -285,13 +311,17 @@ TEST_F(BalancerCommandsSchedulerTest, CommandCanceledIfBalancerStops) { FailPointEnableBlock failPoint("pauseBalancerWorkerThread"); _scheduler.start(operationContext()); ChunkType moveChunk = makeChunk(0, kShardId0); - resp = _scheduler.requestMoveChunk( - operationContext(), kNss, moveChunk, ShardId(kShardId1), getDefaultMoveChunkSettings()); + resp = _scheduler.requestMoveChunk(operationContext(), + kNss, + moveChunk, + ShardId(kShardId1), + getDefaultMoveChunkSettings(), + false /* issuedByRemoteUser */); _scheduler.stop(); } - ASSERT_EQUALS( - resp->getOutcome(), - Status(ErrorCodes::CallbackCanceled, "Request cancelled - balancer scheduler is stopping")); + ASSERT_EQUALS(resp->getOutcome(), + Status(ErrorCodes::BalancerInterrupted, + "Request cancelled - balancer scheduler is stopping")); // Ensure DistLock is released correctly { auto opCtx = Client::getCurrent()->getOperationContext(); @@ -312,8 +342,12 @@ TEST_F(BalancerCommandsSchedulerTest, MoveChunkCommandGetsPersistedOnDiskWhenReq ChunkType moveChunk = makeChunk(0, kShardId0); auto requestSettings = getDefaultMoveChunkSettings(); - auto deferredResponse = _scheduler.requestMoveChunk( - operationContext(), kNss, moveChunk, ShardId(kShardId1), requestSettings); + auto deferredResponse = _scheduler.requestMoveChunk(operationContext(), + kNss, + moveChunk, + ShardId(kShardId1), + requestSettings, + false /* issuedByRemoteUser */); // The command is persisted... auto persistedCommandDocs = getPersistedCommandDocuments(opCtx); @@ -334,7 +368,8 @@ TEST_F(BalancerCommandsSchedulerTest, MoveChunkCommandGetsPersistedOnDiskWhenReq requestSettings.secondaryThrottle, requestSettings.waitForDelete, requestSettings.forceJumbo, - moveChunk.getVersion()); + moveChunk.getVersion(), + boost::none); ASSERT_BSONOBJ_EQ(originalCommandInfo.serialise(), persistedCommand.getRemoteCommand()); } @@ -356,7 +391,8 @@ TEST_F(BalancerCommandsSchedulerTest, PersistedCommandsAreReissuedWhenRecovering requestSettings.secondaryThrottle, requestSettings.waitForDelete, requestSettings.forceJumbo, - moveChunk.getVersion()); + moveChunk.getVersion(), + boost::none); // 4. ... Which is inspected here. ASSERT_BSONOBJ_EQ(originalCommandInfo.serialise(), request.cmdObj); @@ -364,15 +400,19 @@ TEST_F(BalancerCommandsSchedulerTest, PersistedCommandsAreReissuedWhenRecovering }); }); - auto resp = _scheduler.requestMoveChunk( - operationContext(), kNss, moveChunk, ShardId(kShardId1), getDefaultMoveChunkSettings()); + auto resp = _scheduler.requestMoveChunk(operationContext(), + kNss, + moveChunk, + ShardId(kShardId1), + getDefaultMoveChunkSettings(), + false /* issuedByRemoteUser */); _scheduler.stop(); failpoint->setMode(FailPoint::Mode::off); // 1. The original submission is expected to fail... - ASSERT_EQUALS( - resp->getOutcome(), - Status(ErrorCodes::CallbackCanceled, "Request cancelled - balancer scheduler is stopping")); + ASSERT_EQUALS(resp->getOutcome(), + Status(ErrorCodes::BalancerInterrupted, + "Request cancelled - balancer scheduler is stopping")); // 2. ... And a recovery document to be persisted auto persistedCommandDocs = getPersistedCommandDocuments(operationContext()); @@ -403,8 +443,12 @@ TEST_F(BalancerCommandsSchedulerTest, DistLockPreventsMoveChunkWithConcurrentDDL ASSERT_OK(scopedDistLock.getStatus()); failpoint->setMode(FailPoint::Mode::off); ChunkType moveChunk = makeChunk(0, kShardId0); - auto resp = _scheduler.requestMoveChunk( - operationContext(), kNss, moveChunk, ShardId(kShardId1), getDefaultMoveChunkSettings()); + auto resp = _scheduler.requestMoveChunk(operationContext(), + kNss, + moveChunk, + ShardId(kShardId1), + getDefaultMoveChunkSettings(), + false /* issuedByRemoteUser */); ASSERT_EQ( resp->getOutcome(), Status(ErrorCodes::LockBusy, "Failed to acquire dist lock testDb.testColl locally")); diff --git a/src/mongo/db/s/balancer/balancer_dist_locks.cpp b/src/mongo/db/s/balancer/balancer_dist_locks.cpp new file mode 100644 index 00000000000..a7384f2d1eb --- /dev/null +++ b/src/mongo/db/s/balancer/balancer_dist_locks.cpp @@ -0,0 +1,78 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/s/balancer/balancer_dist_locks.h" +#include "mongo/db/operation_context.h" + +namespace mongo { + +BalancerDistLocks::~BalancerDistLocks() { + invariant(_distLocksByCollection.empty(), + "Attempting to destroy the keychain while still holding distributed locks"); +} + +Status BalancerDistLocks::acquireFor(OperationContext* opCtx, const NamespaceString& nss) { + auto it = _distLocksByCollection.find(nss); + if (it != _distLocksByCollection.end()) { + ++it->second.references; + return Status::OK(); + } else { + boost::optional<DistLockManager::ScopedLock> scopedLock; + try { + scopedLock.emplace(DistLockManager::get(opCtx)->lockDirectLocally( + opCtx, nss.ns(), DistLockManager::kSingleLockAttemptTimeout)); + + const std::string whyMessage(str::stream() + << "Migrating chunk(s) in collection " << nss.ns()); + uassertStatusOK(DistLockManager::get(opCtx)->lockDirect( + opCtx, nss.ns(), whyMessage, DistLockManager::kSingleLockAttemptTimeout)); + } catch (const DBException& ex) { + return ex.toStatus(str::stream() << "Could not acquire collection lock for " << nss.ns() + << " to migrate chunks"); + } + ReferenceCountedLock refCountedLock(std::move(*scopedLock)); + _distLocksByCollection.insert(std::make_pair(nss, std::move(refCountedLock))); + } + return Status::OK(); +} + +void BalancerDistLocks::releaseFor(OperationContext* opCtx, const NamespaceString& nss) { + auto it = _distLocksByCollection.find(nss); + if (it == _distLocksByCollection.end()) { + return; + } else if (it->second.references == 1) { + DistLockManager::get(opCtx)->unlock(opCtx, nss.ns()); + _distLocksByCollection.erase(it); + } else { + --it->second.references; + } +} + + +} // namespace mongo diff --git a/src/mongo/db/s/balancer/balancer_dist_locks.h b/src/mongo/db/s/balancer/balancer_dist_locks.h new file mode 100644 index 00000000000..891d5a4e323 --- /dev/null +++ b/src/mongo/db/s/balancer/balancer_dist_locks.h @@ -0,0 +1,66 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/s/dist_lock_manager.h" + +namespace mongo { + +/** + * Utility class for centralising the control of any distributed lock required by a client. + * Its usage is not thread-safe. + */ +class BalancerDistLocks { + +public: + BalancerDistLocks() = default; + + ~BalancerDistLocks(); + + Status acquireFor(OperationContext* opCtx, const NamespaceString& nss); + + void releaseFor(OperationContext* opCtx, const NamespaceString& nss); + +private: + struct ReferenceCountedLock { + ReferenceCountedLock(DistLockManager::ScopedLock&& lock) + : lock(std::move(lock)), references(1) {} + + DistLockManager::ScopedLock lock; + int references; + }; + + stdx::unordered_map<NamespaceString, ReferenceCountedLock> _distLocksByCollection; + + BalancerDistLocks(const BalancerDistLocks&) = delete; + BalancerDistLocks& operator=(const BalancerDistLocks&) = delete; +}; + +} // namespace mongo diff --git a/src/mongo/db/s/balancer/migration_manager.cpp b/src/mongo/db/s/balancer/migration_manager.cpp deleted file mode 100644 index 040615acd6a..00000000000 --- a/src/mongo/db/s/balancer/migration_manager.cpp +++ /dev/null @@ -1,709 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding - -#include "mongo/platform/basic.h" - -#include "mongo/db/s/balancer/migration_manager.h" - -#include "mongo/bson/simple_bsonobj_comparator.h" -#include "mongo/bson/util/bson_extract.h" -#include "mongo/client/remote_command_targeter.h" -#include "mongo/db/client.h" -#include "mongo/db/repl/repl_set_config.h" -#include "mongo/db/repl/replication_coordinator.h" -#include "mongo/db/s/balancer/scoped_migration_request.h" -#include "mongo/db/s/dist_lock_manager.h" -#include "mongo/executor/task_executor_pool.h" -#include "mongo/logv2/log.h" -#include "mongo/rpc/get_status_from_command_result.h" -#include "mongo/s/catalog_cache.h" -#include "mongo/s/client/shard_registry.h" -#include "mongo/s/grid.h" -#include "mongo/s/request_types/move_chunk_request.h" -#include "mongo/util/net/hostandport.h" -#include "mongo/util/scopeguard.h" - -namespace mongo { -namespace { - -using executor::RemoteCommandRequest; -using executor::RemoteCommandResponse; - -const char kChunkTooBig[] = "chunkTooBig"; // TODO: delete in 3.8 - -/** - * Parses the 'commandResponse' and converts it to a status to use as the outcome of the command. - * Preserves backwards compatibility with 3.4 and earlier shards that, rather than use a ChunkTooBig - * error code, include an extra field in the response. - * - * TODO: Delete in 3.8 - */ -Status extractMigrationStatusFromCommandResponse(const BSONObj& commandResponse) { - Status commandStatus = getStatusFromCommandResult(commandResponse); - - if (!commandStatus.isOK()) { - bool chunkTooBig = false; - bsonExtractBooleanFieldWithDefault(commandResponse, kChunkTooBig, false, &chunkTooBig) - .transitional_ignore(); - if (chunkTooBig) { - commandStatus = {ErrorCodes::ChunkTooBig, commandStatus.reason()}; - } - } - - return commandStatus; -} - -/** - * Returns whether the specified status is an error caused by stepdown of the primary config node - * currently running the balancer. - */ -bool isErrorDueToConfigStepdown(Status status, bool isStopping) { - return ((status == ErrorCodes::CallbackCanceled && isStopping) || - status == ErrorCodes::BalancerInterrupted || - status == ErrorCodes::InterruptedDueToReplStateChange); -} - -} // namespace - -MigrationManager::MigrationManager(ServiceContext* serviceContext) - : _serviceContext(serviceContext) {} - -MigrationManager::~MigrationManager() { - // The migration manager must be completely quiesced at destruction time - invariant(_activeMigrations.empty()); -} - -MigrationStatuses MigrationManager::executeMigrationsForAutoBalance( - OperationContext* opCtx, - const std::vector<MigrateInfo>& migrateInfos, - uint64_t maxChunkSizeBytes, - const MigrationSecondaryThrottleOptions& secondaryThrottle, - bool waitForDelete) { - - MigrationStatuses migrationStatuses; - - ScopedMigrationRequestsMap scopedMigrationRequests; - std::vector<std::pair<std::shared_ptr<Notification<RemoteCommandResponse>>, MigrateInfo>> - responses; - - for (const auto& migrateInfo : migrateInfos) { - responses.emplace_back(_schedule(opCtx, - migrateInfo, - maxChunkSizeBytes, - secondaryThrottle, - waitForDelete, - &scopedMigrationRequests), - migrateInfo); - } - - // Wait for all the scheduled migrations to complete. - for (auto& response : responses) { - auto notification = std::move(response.first); - auto migrateInfo = std::move(response.second); - - const auto& remoteCommandResponse = notification->get(); - const auto migrationInfoName = migrateInfo.getName(); - - auto it = scopedMigrationRequests.find(migrationInfoName); - if (it == scopedMigrationRequests.end()) { - invariant(!remoteCommandResponse.status.isOK()); - migrationStatuses.emplace(migrationInfoName, std::move(remoteCommandResponse.status)); - continue; - } - - auto statusWithScopedMigrationRequest = std::move(it->second); - - if (!statusWithScopedMigrationRequest.isOK()) { - invariant(!remoteCommandResponse.status.isOK()); - migrationStatuses.emplace(migrationInfoName, - std::move(statusWithScopedMigrationRequest.getStatus())); - continue; - } - - Status commandStatus = _processRemoteCommandResponse( - remoteCommandResponse, &statusWithScopedMigrationRequest.getValue()); - migrationStatuses.emplace(migrationInfoName, std::move(commandStatus)); - } - - invariant(migrationStatuses.size() == migrateInfos.size()); - - return migrationStatuses; -} - -Status MigrationManager::executeManualMigration( - OperationContext* opCtx, - const MigrateInfo& migrateInfo, - uint64_t maxChunkSizeBytes, - const MigrationSecondaryThrottleOptions& secondaryThrottle, - bool waitForDelete) { - - _waitForRecovery(); - - ScopedMigrationRequestsMap scopedMigrationRequests; - - RemoteCommandResponse remoteCommandResponse = _schedule(opCtx, - migrateInfo, - maxChunkSizeBytes, - secondaryThrottle, - waitForDelete, - &scopedMigrationRequests) - ->get(); - - auto swCM = Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh( - opCtx, migrateInfo.nss); - if (!swCM.isOK()) { - return swCM.getStatus(); - } - - const auto& cm = swCM.getValue(); - - const auto chunk = cm.findIntersectingChunkWithSimpleCollation(migrateInfo.minKey); - - Status commandStatus = remoteCommandResponse.status; - - const auto migrationInfoName = migrateInfo.getName(); - - auto it = scopedMigrationRequests.find(migrationInfoName); - - if (it != scopedMigrationRequests.end()) { - invariant(scopedMigrationRequests.size() == 1); - auto statusWithScopedMigrationRequest = &it->second; - commandStatus = _processRemoteCommandResponse( - remoteCommandResponse, &statusWithScopedMigrationRequest->getValue()); - } - - // Migration calls can be interrupted after the metadata is committed but before the command - // finishes the waitForDelete stage. Any failovers, therefore, must always cause the moveChunk - // command to be retried so as to assure that the waitForDelete promise of a successful command - // has been fulfilled. - if (chunk.getShardId() == migrateInfo.to && commandStatus != ErrorCodes::BalancerInterrupted) { - return Status::OK(); - } - - return commandStatus; -} - -void MigrationManager::startRecoveryAndAcquireDistLocks(OperationContext* opCtx) { - { - stdx::lock_guard<Latch> lock(_mutex); - invariant(_state == State::kStopped); - invariant(_migrationRecoveryMap.empty()); - _state = State::kRecovering; - } - - ScopeGuard scopedGuard([&] { - _migrationRecoveryMap.clear(); - _abandonActiveMigrationsAndEnableManager(opCtx); - }); - - // Load the active migrations from the config.migrations collection. - auto statusWithMigrationsQueryResponse = - Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( - opCtx, - ReadPreferenceSetting{ReadPreference::PrimaryOnly}, - repl::ReadConcernLevel::kLocalReadConcern, - MigrationType::ConfigNS, - BSONObj(), - BSONObj(), - boost::none); - - if (!statusWithMigrationsQueryResponse.isOK()) { - LOGV2(21896, - "Unable to read config.migrations collection documents for balancer migration " - "recovery. Abandoning balancer recovery: {error}", - "Unable to read config.migrations documents for balancer migration recovery", - "error"_attr = redact(statusWithMigrationsQueryResponse.getStatus())); - return; - } - - for (const BSONObj& migration : statusWithMigrationsQueryResponse.getValue().docs) { - auto statusWithMigrationType = MigrationType::fromBSON(migration); - if (!statusWithMigrationType.isOK()) { - // The format of this migration document is incorrect. The balancer holds a distlock for - // this migration, but without parsing the migration document we cannot identify which - // distlock must be released. So we must release all distlocks. - LOGV2(21897, - "Unable to parse config.migrations document '{migration}' for balancer" - "migration recovery. Abandoning balancer recovery: {error}", - "Unable to parse config.migrations document for balancer migration recovery", - "migration"_attr = redact(migration.toString()), - "error"_attr = redact(statusWithMigrationType.getStatus())); - return; - } - MigrationType migrateType = std::move(statusWithMigrationType.getValue()); - - auto it = _migrationRecoveryMap.find(NamespaceString(migrateType.getNss())); - if (it == _migrationRecoveryMap.end()) { - std::list<MigrationType> list; - it = _migrationRecoveryMap.insert(std::make_pair(migrateType.getNss(), list)).first; - - // Reacquire the matching distributed lock for this namespace. - const std::string whyMessage(str::stream() << "Migrating chunk(s) in collection " - << migrateType.getNss().ns()); - - auto status = DistLockManager::get(opCtx)->tryLockDirectWithLocalWriteConcern( - opCtx, migrateType.getNss().ns(), whyMessage); - if (!status.isOK()) { - LOGV2(21898, - "Failed to acquire distributed lock for collection {namespace} " - "during balancer recovery of an active migration. Abandoning balancer " - "recovery: {error}", - "Failed to acquire distributed lock for collection " - "during balancer recovery of an active migration", - "namespace"_attr = migrateType.getNss().ns(), - "error"_attr = redact(status)); - return; - } - } - - it->second.push_back(std::move(migrateType)); - } - - scopedGuard.dismiss(); -} - -void MigrationManager::finishRecovery(OperationContext* opCtx, - uint64_t maxChunkSizeBytes, - const MigrationSecondaryThrottleOptions& secondaryThrottle) { - { - stdx::lock_guard<Latch> lock(_mutex); - if (_state == State::kStopping) { - _migrationRecoveryMap.clear(); - return; - } - - // If recovery was abandoned in startRecovery, then there is no more to do. - if (_state == State::kEnabled) { - invariant(_migrationRecoveryMap.empty()); - return; - } - - invariant(_state == State::kRecovering); - } - - ScopeGuard scopedGuard([&] { - _migrationRecoveryMap.clear(); - _abandonActiveMigrationsAndEnableManager(opCtx); - }); - - // Schedule recovered migrations. - std::vector<ScopedMigrationRequest> scopedMigrationRequests; - std::vector<std::shared_ptr<Notification<RemoteCommandResponse>>> responses; - - for (auto& nssAndMigrateInfos : _migrationRecoveryMap) { - auto& nss = nssAndMigrateInfos.first; - auto& migrateInfos = nssAndMigrateInfos.second; - invariant(!migrateInfos.empty()); - - auto swCM = Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh( - opCtx, nss); - if (!swCM.isOK()) { - // This shouldn't happen because the collection was intact and sharded when the previous - // config primary was active and the dist locks have been held by the balancer - // throughout. Abort migration recovery. - LOGV2(21899, - "Unable to reload chunk metadata for collection {namespace} during balancer " - "recovery. Abandoning recovery: {error}", - "Unable to reload chunk metadata for collection during balancer recovery", - "namespace"_attr = nss, - "error"_attr = redact(swCM.getStatus())); - return; - } - - const auto& cm = swCM.getValue(); - int scheduledMigrations = 0; - - while (!migrateInfos.empty()) { - auto migrationType = std::move(migrateInfos.front()); - const auto migrationInfo = migrationType.toMigrateInfo(cm.getUUID()); - auto waitForDelete = migrationType.getWaitForDelete(); - migrateInfos.pop_front(); - - try { - const auto chunk = - cm.findIntersectingChunkWithSimpleCollation(migrationInfo.minKey); - - if (chunk.getShardId() != migrationInfo.from) { - // Chunk is no longer on the source shard specified by this migration. Erase the - // migration recovery document associated with it. - ScopedMigrationRequest::createForRecovery(opCtx, nss, migrationInfo.minKey); - continue; - } - } catch (const ExceptionFor<ErrorCodes::ShardKeyNotFound>&) { - // The shard key for the collection has changed. - // Abandon this migration and remove the document associated with it. - ScopedMigrationRequest::createForRecovery(opCtx, nss, migrationInfo.minKey); - continue; - } - - scopedMigrationRequests.emplace_back( - ScopedMigrationRequest::createForRecovery(opCtx, nss, migrationInfo.minKey)); - - scheduledMigrations++; - - responses.emplace_back(_schedule(opCtx, - migrationInfo, - maxChunkSizeBytes, - secondaryThrottle, - waitForDelete, - nullptr)); - } - - // If no migrations were scheduled for this namespace, free the dist lock - if (!scheduledMigrations) { - DistLockManager::get(opCtx)->unlock(opCtx, nss.ns()); - } - } - - _migrationRecoveryMap.clear(); - scopedGuard.dismiss(); - - { - stdx::lock_guard<Latch> lock(_mutex); - if (_state == State::kRecovering) { - _state = State::kEnabled; - _condVar.notify_all(); - } - } - - // Wait for each migration to finish, as usual. - for (auto& response : responses) { - response->get(); - } -} - -void MigrationManager::interruptAndDisableMigrations() { - auto executor = Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor(); - - stdx::lock_guard<Latch> lock(_mutex); - invariant(_state == State::kEnabled || _state == State::kRecovering); - _state = State::kStopping; - - // Interrupt any active migrations with dist lock - for (auto& cmsEntry : _activeMigrations) { - auto& migrations = cmsEntry.second.migrationsList; - - for (auto& migration : migrations) { - if (migration.callbackHandle) { - executor->cancel(*migration.callbackHandle); - } - } - } - - _checkDrained(lock); -} - -void MigrationManager::drainActiveMigrations() { - stdx::unique_lock<Latch> lock(_mutex); - - if (_state == State::kStopped) - return; - invariant(_state == State::kStopping); - _condVar.wait(lock, [this] { return _activeMigrations.empty(); }); - _state = State::kStopped; -} - -std::shared_ptr<Notification<RemoteCommandResponse>> MigrationManager::_schedule( - OperationContext* opCtx, - const MigrateInfo& migrateInfo, - uint64_t maxChunkSizeBytes, - const MigrationSecondaryThrottleOptions& secondaryThrottle, - bool waitForDelete, - ScopedMigrationRequestsMap* scopedMigrationRequests) { - - // Ensure we are not stopped in order to avoid doing the extra work - { - stdx::lock_guard<Latch> lock(_mutex); - if (_state != State::kEnabled && _state != State::kRecovering) { - return std::make_shared<Notification<RemoteCommandResponse>>( - Status(ErrorCodes::BalancerInterrupted, - "Migration cannot be executed because the balancer is not running")); - } - } - - const auto fromShardStatus = - Grid::get(opCtx)->shardRegistry()->getShard(opCtx, migrateInfo.from); - if (!fromShardStatus.isOK()) { - return std::make_shared<Notification<RemoteCommandResponse>>( - std::move(fromShardStatus.getStatus())); - } - - const auto fromShard = fromShardStatus.getValue(); - auto fromHostStatus = fromShard->getTargeter()->findHost( - opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly}); - if (!fromHostStatus.isOK()) { - return std::make_shared<Notification<RemoteCommandResponse>>( - std::move(fromHostStatus.getStatus())); - } - - BSONObjBuilder builder; - MoveChunkRequest::appendAsCommand(&builder, - migrateInfo.nss, - migrateInfo.version, - migrateInfo.from, - migrateInfo.to, - ChunkRange(migrateInfo.minKey, migrateInfo.maxKey), - maxChunkSizeBytes, - secondaryThrottle, - waitForDelete, - migrateInfo.forceJumbo); - - stdx::lock_guard<Latch> lock(_mutex); - - if (_state != State::kEnabled && _state != State::kRecovering) { - return std::make_shared<Notification<RemoteCommandResponse>>( - Status(ErrorCodes::BalancerInterrupted, - "Migration cannot be executed because the balancer is not running")); - } - - Migration migration(migrateInfo.nss, builder.obj()); - - auto retVal = migration.completionNotification; - - _acquireDistLockAndSchedule(lock, - opCtx, - fromHostStatus.getValue(), - std::move(migration), - migrateInfo, - waitForDelete, - scopedMigrationRequests); - - return retVal; -} - -void MigrationManager::_acquireDistLockAndSchedule( - WithLock lock, - OperationContext* opCtx, - const HostAndPort& targetHost, - Migration migration, - const MigrateInfo& migrateInfo, - bool waitForDelete, - ScopedMigrationRequestsMap* scopedMigrationRequests) noexcept { - auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); - - const NamespaceString nss(migration.nss); - - auto it = _activeMigrations.find(nss); - if (it == _activeMigrations.end()) { - boost::optional<DistLockManager::ScopedLock> scopedLock; - try { - scopedLock.emplace(DistLockManager::get(opCtx)->lockDirectLocally( - opCtx, nss.ns(), DistLockManager::kSingleLockAttemptTimeout)); - - const std::string whyMessage(str::stream() - << "Migrating chunk(s) in collection " << nss.ns()); - uassertStatusOK(DistLockManager::get(opCtx)->lockDirect( - opCtx, nss.ns(), whyMessage, DistLockManager::kSingleLockAttemptTimeout)); - } catch (const DBException& ex) { - migration.completionNotification->set( - ex.toStatus(str::stream() << "Could not acquire collection lock for " << nss.ns() - << " to migrate chunks")); - return; - } - - MigrationsState migrationsState(std::move(*scopedLock)); - it = _activeMigrations.insert(std::make_pair(nss, std::move(migrationsState))).first; - } - - auto migrationRequestStatus = Status::OK(); - - if (scopedMigrationRequests) { - // Write a document to the config.migrations collection, in case this migration must be - // recovered by the Balancer. - auto statusWithScopedMigrationRequest = - ScopedMigrationRequest::writeMigration(opCtx, migrateInfo, waitForDelete); - if (!statusWithScopedMigrationRequest.isOK()) { - migrationRequestStatus = std::move(statusWithScopedMigrationRequest.getStatus()); - } else { - scopedMigrationRequests->emplace(migrateInfo.getName(), - std::move(statusWithScopedMigrationRequest)); - } - } - - auto migrations = &it->second.migrationsList; - - // Add ourselves to the list of migrations on this collection. From that point onwards, requests - // must call _complete regardless of success or failure in order to remove it from the list. - migrations->push_front(std::move(migration)); - auto itMigration = migrations->begin(); - - if (!migrationRequestStatus.isOK()) { - _complete(lock, opCtx, itMigration, std::move(migrationRequestStatus)); - return; - } - - const RemoteCommandRequest remoteRequest( - targetHost, NamespaceString::kAdminDb.toString(), itMigration->moveChunkCmdObj, opCtx); - - StatusWith<executor::TaskExecutor::CallbackHandle> callbackHandleWithStatus = - executor->scheduleRemoteCommand( - remoteRequest, - [this, service = opCtx->getServiceContext(), itMigration]( - const executor::TaskExecutor::RemoteCommandCallbackArgs& args) { - ThreadClient tc(getThreadName(), service); - auto opCtx = cc().makeOperationContext(); - - stdx::lock_guard<Latch> lock(_mutex); - _complete(lock, opCtx.get(), itMigration, args.response); - }); - - if (callbackHandleWithStatus.isOK()) { - itMigration->callbackHandle = std::move(callbackHandleWithStatus.getValue()); - return; - } - - _complete(lock, opCtx, itMigration, std::move(callbackHandleWithStatus.getStatus())); -} - -void MigrationManager::_complete(WithLock lock, - OperationContext* opCtx, - MigrationsList::iterator itMigration, - const RemoteCommandResponse& remoteCommandResponse) { - const NamespaceString nss(itMigration->nss); - - // Make sure to signal the notification last, after the distributed lock is freed, so that we - // don't have the race condition where a subsequently scheduled migration finds the dist lock - // still acquired. - auto notificationToSignal = itMigration->completionNotification; - - auto it = _activeMigrations.find(nss); - invariant(it != _activeMigrations.end()); - - auto migrations = &it->second.migrationsList; - migrations->erase(itMigration); - - if (migrations->empty()) { - DistLockManager::get(opCtx)->unlock(opCtx, nss.ns()); - _activeMigrations.erase(it); - _checkDrained(lock); - } - - notificationToSignal->set(remoteCommandResponse); -} - -void MigrationManager::_checkDrained(WithLock) { - if (_state == State::kEnabled || _state == State::kRecovering) { - return; - } - invariant(_state == State::kStopping); - - if (_activeMigrations.empty()) { - _condVar.notify_all(); - } -} - -void MigrationManager::_waitForRecovery() { - stdx::unique_lock<Latch> lock(_mutex); - _condVar.wait(lock, [this] { return _state != State::kRecovering; }); -} - -void MigrationManager::_abandonActiveMigrationsAndEnableManager(OperationContext* opCtx) { - stdx::unique_lock<Latch> lock(_mutex); - if (_state == State::kStopping) { - // The balancer was interrupted. Let the next balancer recover the state. - return; - } - invariant(_state == State::kRecovering); - - // Unlock all balancer distlocks we aren't using anymore. - DistLockManager::get(opCtx)->unlockAll(opCtx); - - // Clear the config.migrations collection so that those chunks can be scheduled for migration - // again. - Grid::get(opCtx) - ->catalogClient() - ->removeConfigDocuments( - opCtx, MigrationType::ConfigNS, BSONObj(), ShardingCatalogClient::kLocalWriteConcern) - .transitional_ignore(); - - _state = State::kEnabled; - _condVar.notify_all(); -} - -Status MigrationManager::_processRemoteCommandResponse( - const RemoteCommandResponse& remoteCommandResponse, - ScopedMigrationRequest* scopedMigrationRequest) { - - stdx::lock_guard<Latch> lock(_mutex); - Status commandStatus(ErrorCodes::InternalError, "Uninitialized value."); - - // Check for local errors sending the remote command caused by stepdown. - if (isErrorDueToConfigStepdown(remoteCommandResponse.status, - _state != State::kEnabled && _state != State::kRecovering)) { - scopedMigrationRequest->keepDocumentOnDestruct(); - return {ErrorCodes::BalancerInterrupted, - str::stream() << "Migration interrupted because the balancer is stopping." - << " Command status: " << remoteCommandResponse.status.toString()}; - } - - if (!remoteCommandResponse.isOK()) { - commandStatus = remoteCommandResponse.status; - } else { - // TODO: delete in 3.8 - commandStatus = extractMigrationStatusFromCommandResponse(remoteCommandResponse.data); - } - - if (!Shard::shouldErrorBePropagated(commandStatus.code())) { - commandStatus = {ErrorCodes::OperationFailed, - str::stream() << "moveChunk command failed on source shard." - << causedBy(commandStatus)}; - } - - // Any failure to remove the migration document should be because the config server is - // stepping/shutting down. In this case we must fail the moveChunk command with a retryable - // error so that the caller does not move on to other distlock requiring operations that could - // fail when the balancer recovers and takes distlocks for migration recovery. - Status status = scopedMigrationRequest->tryToRemoveMigration(); - if (!status.isOK()) { - commandStatus = { - ErrorCodes::BalancerInterrupted, - str::stream() << "Migration interrupted because the balancer is stopping" - << " and failed to remove the config.migrations document." - << " Command status: " - << (commandStatus.isOK() ? status.toString() : commandStatus.toString())}; - } - - return commandStatus; -} - -MigrationManager::Migration::Migration(NamespaceString inNss, BSONObj inMoveChunkCmdObj) - : nss(std::move(inNss)), - moveChunkCmdObj(std::move(inMoveChunkCmdObj)), - completionNotification(std::make_shared<Notification<RemoteCommandResponse>>()) {} - -MigrationManager::Migration::~Migration() { - invariant(completionNotification); -} - -MigrationManager::MigrationsState::MigrationsState(DistLockManager::ScopedLock lock) - : lock(std::move(lock)) {} - -} // namespace mongo diff --git a/src/mongo/db/s/balancer/migration_manager.h b/src/mongo/db/s/balancer/migration_manager.h deleted file mode 100644 index 073353adccb..00000000000 --- a/src/mongo/db/s/balancer/migration_manager.h +++ /dev/null @@ -1,286 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <list> -#include <map> -#include <vector> - -#include "mongo/bson/bsonobj.h" -#include "mongo/db/namespace_string.h" -#include "mongo/db/s/balancer/balancer_policy.h" -#include "mongo/db/s/balancer/type_migration.h" -#include "mongo/db/s/dist_lock_manager.h" -#include "mongo/executor/task_executor.h" -#include "mongo/platform/mutex.h" -#include "mongo/s/request_types/migration_secondary_throttle_options.h" -#include "mongo/stdx/condition_variable.h" -#include "mongo/stdx/unordered_map.h" -#include "mongo/util/concurrency/notification.h" -#include "mongo/util/concurrency/with_lock.h" - -namespace mongo { - -class ScopedMigrationRequest; - -// Uniquely identifies a migration, regardless of shard and version. -typedef std::string MigrationIdentifier; -typedef std::map<MigrationIdentifier, Status> MigrationStatuses; - -/** - * Manages and executes parallel migrations for the balancer. - */ -class MigrationManager { - MigrationManager(const MigrationManager&) = delete; - MigrationManager& operator=(const MigrationManager&) = delete; - -public: - MigrationManager(ServiceContext* serviceContext); - ~MigrationManager(); - - /** - * A blocking method that attempts to schedule all the migrations specified in - * "candidateMigrations" and wait for them to complete. Takes the distributed lock for each - * collection with a chunk being migrated. - * - * If any of the migrations, which were scheduled in parallel fails with a LockBusy error - * reported from the shard, retries it serially without the distributed lock. - * - * Returns a map of migration Status objects to indicate the success/failure of each migration. - */ - MigrationStatuses executeMigrationsForAutoBalance( - OperationContext* opCtx, - const std::vector<MigrateInfo>& migrateInfos, - uint64_t maxChunkSizeBytes, - const MigrationSecondaryThrottleOptions& secondaryThrottle, - bool waitForDelete); - - /** - * A blocking method that attempts to schedule the migration specified in "migrateInfo" and - * waits for it to complete. Takes the distributed lock for the namespace which is being - * migrated. - * - * Returns the status of the migration. - */ - Status executeManualMigration(OperationContext* opCtx, - const MigrateInfo& migrateInfo, - uint64_t maxChunkSizeBytes, - const MigrationSecondaryThrottleOptions& secondaryThrottle, - bool waitForDelete); - - /** - * Non-blocking method that puts the migration manager in the kRecovering state, in which - * new migration requests will block until finishRecovery is called. Then reacquires distributed - * locks for the balancer and any active migrations. The distributed locks are taken with local - * write concern, since this is called in drain mode where majority writes are not yet possible. - * - * The active migration recovery may fail and be abandoned, setting the state to kEnabled. - */ - void startRecoveryAndAcquireDistLocks(OperationContext* opCtx); - - /** - * Blocking method that must only be called after startRecovery has been called. Recovers the - * state of the migration manager (if necessary and able) and puts it in the kEnabled state, - * where it will accept new migrations. Any migrations waiting on the recovery state will be - * unblocked once the state is kEnabled, and then this function waits for the recovered active - * migrations to finish before returning. - * - * The active migration recovery may fail and be abandoned, setting the state to kEnabled and - * unblocking any process waiting on the recovery state. - */ - void finishRecovery(OperationContext* opCtx, - uint64_t maxChunkSizeBytes, - const MigrationSecondaryThrottleOptions& secondaryThrottle); - - /** - * Non-blocking method that should never be called concurrently with finishRecovery. Puts the - * manager in a state where all subsequently scheduled migrations will immediately fail (without - * ever getting scheduled) and all active ones will be cancelled. It has no effect if the - * migration manager is already stopping or stopped. - */ - void interruptAndDisableMigrations(); - - /** - * Blocking method that waits for any currently scheduled migrations to complete. Must be - * called after interruptAndDisableMigrations has been called in order to be able to re-enable - * migrations again. - */ - void drainActiveMigrations(); - -private: - // The current state of the migration manager - enum class State { // Allowed transitions: - kStopped, // kRecovering - kRecovering, // kEnabled, kStopping - kEnabled, // kStopping - kStopping, // kStopped - }; - - /** - * Tracks the execution state of a single migration. - */ - struct Migration { - Migration(NamespaceString nss, BSONObj moveChunkCmdObj); - ~Migration(); - - // Namespace for which this migration applies - NamespaceString nss; - - // Command object representing the migration - BSONObj moveChunkCmdObj; - - // Callback handle for the migration network request. If the migration has not yet been sent - // on the network, this value is not set. - boost::optional<executor::TaskExecutor::CallbackHandle> callbackHandle; - - // Notification, which will be signaled when the migration completes - std::shared_ptr<Notification<executor::RemoteCommandResponse>> completionNotification; - }; - - // Used as a type in which to store a list of active migrations. The reason to choose list is - // that its iterators do not get invalidated when entries are removed around them. This allows - // O(1) removal time. - using MigrationsList = std::list<Migration>; - - // Tracks the execution of all of the active migrations within a collection. It holds a - // NamespaceSerializer lock for the corresponding nss, which will be released when all of the - // scheduled chunk migrations for this collection have completed. - struct MigrationsState { - MigrationsState(DistLockManager::ScopedLock lock); - - MigrationsList migrationsList; - DistLockManager::ScopedLock lock; - }; - using CollectionMigrationsStateMap = stdx::unordered_map<NamespaceString, MigrationsState>; - - using ScopedMigrationRequestsMap = - std::map<MigrationIdentifier, StatusWith<ScopedMigrationRequest>>; - - /** - * Optionally takes the collection distributed lock and schedules a chunk migration with the - * specified parameters. May block for distributed lock acquisition. If dist lock acquisition is - * successful (or not done), schedules the migration request and returns a notification which - * can be used to obtain the outcome of the operation. - */ - std::shared_ptr<Notification<executor::RemoteCommandResponse>> _schedule( - OperationContext* opCtx, - const MigrateInfo& migrateInfo, - uint64_t maxChunkSizeBytes, - const MigrationSecondaryThrottleOptions& secondaryThrottle, - bool waitForDelete, - ScopedMigrationRequestsMap* scopedMigrationRequests); - - /** - * Acquires the collection distributed lock for the specified namespace and if it succeeds, - * schedules the migration. - * - * The distributed lock is acquired before scheduling the first migration for the collection and - * is only released when all active migrations on the collection have finished. - * - * Assumes that the migration document has already been written if no ScopedMigrationRequestsMap - * pointer is passed. Otherwise, writes the migration document under the collection distributed - * lock and adds it to the map. - */ - void _acquireDistLockAndSchedule(WithLock, - OperationContext* opCtx, - const HostAndPort& targetHost, - Migration migration, - const MigrateInfo& migrateInfo, - bool waitForDelete, - ScopedMigrationRequestsMap* scopedMigrationRequests) noexcept; - - /** - * Used internally for migrations scheduled with the distributed lock acquired by the config - * server. Called exactly once for each scheduled migration, it will signal the migration in the - * passed iterator and if this is the last migration for the collection will free the collection - * distributed lock. - */ - void _complete(WithLock, - OperationContext* opCtx, - MigrationsList::iterator itMigration, - const executor::RemoteCommandResponse& remoteCommandResponse); - - /** - * If the state of the migration manager is kStopping, checks whether there are any outstanding - * scheduled requests and if there aren't any signals the class condition variable. - */ - void _checkDrained(WithLock); - - /** - * Blocking call, which waits for the migration manager to leave the recovering state (if it is - * currently recovering). - */ - void _waitForRecovery(); - - /** - * Should only be called from startRecovery or finishRecovery functions when the migration - * manager is in either the kStopped or kRecovering state. Releases all the distributed locks - * that the balancer holds, clears the config.migrations collection, changes the state of the - * migration manager to kEnabled. Then unblocks all processes waiting for kEnabled state. - */ - void _abandonActiveMigrationsAndEnableManager(OperationContext* opCtx); - - /** - * Parses a moveChunk RemoteCommandResponse's two levels of Status objects and distiguishes - * between errors generated by this config server and the shard primary to which the moveChunk - * command was sent. - * - * If the command failed because of stepdown of this config server, the migration document - * managed by 'scopedMigrationRequest' is saved for later balancer recovery and a - * BalancerInterrupted error is returned. If the command failed because the shard to which the - * command was sent returned an error, the migration document is not saved and the error is - * returned without conversion. - */ - Status _processRemoteCommandResponse( - const executor::RemoteCommandResponse& remoteCommandResponse, - ScopedMigrationRequest* scopedMigrationRequest); - - // The service context under which this migration manager runs. - ServiceContext* const _serviceContext; - - // Carries migration information over from startRecovery to finishRecovery. Should only be set - // in startRecovery and then accessed in finishRecovery. - stdx::unordered_map<NamespaceString, std::list<MigrationType>> _migrationRecoveryMap; - - // Protects the class state below. - Mutex _mutex = MONGO_MAKE_LATCH("MigrationManager::_mutex"); - - // Always start the migration manager in a stopped state. - State _state{State::kStopped}; - - // Condition variable, which is waited on when the migration manager's state is changing and - // signaled when the state change is complete. - stdx::condition_variable _condVar; - - // Maps collection namespaces to that collection's active migrations. - CollectionMigrationsStateMap _activeMigrations; -}; - -} // namespace mongo diff --git a/src/mongo/db/s/balancer/migration_manager_test.cpp b/src/mongo/db/s/balancer/migration_manager_test.cpp deleted file mode 100644 index ed04770685e..00000000000 --- a/src/mongo/db/s/balancer/migration_manager_test.cpp +++ /dev/null @@ -1,700 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include "mongo/db/commands.h" -#include "mongo/db/read_write_concern_defaults.h" -#include "mongo/db/read_write_concern_defaults_cache_lookup_mock.h" -#include "mongo/db/s/balancer/migration_manager.h" -#include "mongo/db/s/balancer/migration_test_fixture.h" -#include "mongo/db/s/dist_lock_manager.h" -#include "mongo/s/request_types/move_chunk_request.h" - -namespace mongo { -namespace { - -using executor::RemoteCommandRequest; -using unittest::assertGet; - -const MigrationSecondaryThrottleOptions kDefaultSecondaryThrottle = - MigrationSecondaryThrottleOptions::create(MigrationSecondaryThrottleOptions::kDefault); - -class MigrationManagerTest : public MigrationTestFixture { -protected: - void setUp() override { - MigrationTestFixture::setUp(); - _migrationManager = std::make_unique<MigrationManager>(getServiceContext()); - _migrationManager->startRecoveryAndAcquireDistLocks(operationContext()); - _migrationManager->finishRecovery(operationContext(), 0, kDefaultSecondaryThrottle); - - // Necessary because the migration manager may take a dist lock, which calls serverStatus - // and will attempt to return the latest read write concern defaults. - ReadWriteConcernDefaults::create(getServiceContext(), _lookupMock.getFetchDefaultsFn()); - } - - void tearDown() override { - checkMigrationsCollectionIsEmptyAndLocksAreUnlocked(); - _migrationManager->interruptAndDisableMigrations(); - _migrationManager->drainActiveMigrations(); - _migrationManager.reset(); - ConfigServerTestFixture::tearDown(); - } - - /** - * Sets up mock network to expect a moveChunk command and returns a fixed BSON response or a - * "returnStatus". - */ - void expectMoveChunkCommand(const NamespaceString& nss, - const ChunkType& chunk, - const ShardId& toShardId, - const BSONObj& response) { - onCommand([&nss, &chunk, &toShardId, &response](const RemoteCommandRequest& request) { - NamespaceString nss(request.cmdObj.firstElement().valueStringData()); - - const StatusWith<MoveChunkRequest> moveChunkRequestWithStatus = - MoveChunkRequest::createFromCommand(nss, request.cmdObj); - ASSERT_OK(moveChunkRequestWithStatus.getStatus()); - - ASSERT_EQ(nss, moveChunkRequestWithStatus.getValue().getNss()); - ASSERT_BSONOBJ_EQ(chunk.getMin(), moveChunkRequestWithStatus.getValue().getMinKey()); - ASSERT_BSONOBJ_EQ(chunk.getMax(), moveChunkRequestWithStatus.getValue().getMaxKey()); - ASSERT_EQ(chunk.getShard(), moveChunkRequestWithStatus.getValue().getFromShardId()); - - ASSERT_EQ(toShardId, moveChunkRequestWithStatus.getValue().getToShardId()); - - return response; - }); - } - - void expectMoveChunkCommand(const NamespaceString& nss, - const ChunkType& chunk, - const ShardId& toShardId, - const Status& returnStatus) { - BSONObjBuilder resultBuilder; - CommandHelpers::appendCommandStatusNoThrow(resultBuilder, returnStatus); - expectMoveChunkCommand(nss, chunk, toShardId, resultBuilder.obj()); - } - - std::unique_ptr<MigrationManager> _migrationManager; - ReadWriteConcernDefaultsLookupMock _lookupMock; -}; - -TEST_F(MigrationManagerTest, OneCollectionTwoMigrations) { - // Set up two shards in the metadata. - ASSERT_OK(catalogClient()->insertConfigDocument( - operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern)); - ASSERT_OK(catalogClient()->insertConfigDocument( - operationContext(), ShardType::ConfigNS, kShard2, kMajorityWriteConcern)); - - // Set up the database and collection as sharded in the metadata. - const std::string dbName = "foo"; - const NamespaceString collName(dbName, "bar"); - const auto collUUID = UUID::gen(); - ChunkVersion version(2, 0, OID::gen(), Timestamp(42)); - - setUpDatabase(dbName, kShardId0); - setUpCollection(collName, collUUID, version); - - // Set up two chunks in the metadata. - ChunkType chunk1 = - setUpChunk(collUUID, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version); - version.incMinor(); - ChunkType chunk2 = - setUpChunk(collUUID, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version); - - // Going to request that these two chunks get migrated. - const std::vector<MigrateInfo> migrationRequests{{kShardId1, - collName, - chunk1, - MoveChunkRequest::ForceJumbo::kDoNotForce, - MigrateInfo::chunksImbalance}, - {kShardId3, - collName, - chunk2, - MoveChunkRequest::ForceJumbo::kDoNotForce, - MigrateInfo::chunksImbalance}}; - - auto future = launchAsync([this, migrationRequests] { - ThreadClient tc("Test", getServiceContext()); - auto opCtx = cc().makeOperationContext(); - - // Scheduling the moveChunk commands requires finding a host to which to send the command. - // Set up dummy hosts for the source shards. - shardTargeterMock(opCtx.get(), kShardId0)->setFindHostReturnValue(kShardHost0); - shardTargeterMock(opCtx.get(), kShardId2)->setFindHostReturnValue(kShardHost2); - - MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance( - opCtx.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false); - - for (const auto& migrateInfo : migrationRequests) { - ASSERT_OK(migrationStatuses.at(migrateInfo.getName())); - } - }); - - // Expect two moveChunk commands. - expectMoveChunkCommand(collName, chunk1, kShardId1, Status::OK()); - expectMoveChunkCommand(collName, chunk2, kShardId3, Status::OK()); - - // Run the MigrationManager code. - future.default_timed_get(); -} - -TEST_F(MigrationManagerTest, TwoCollectionsTwoMigrationsEach) { - // Set up two shards in the metadata. - ASSERT_OK(catalogClient()->insertConfigDocument( - operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern)); - ASSERT_OK(catalogClient()->insertConfigDocument( - operationContext(), ShardType::ConfigNS, kShard2, kMajorityWriteConcern)); - - // Set up a database and two collections as sharded in the metadata. - std::string dbName = "foo"; - const NamespaceString collName1(dbName, "bar"); - const auto collUUID1 = UUID::gen(); - const NamespaceString collName2(dbName, "baz"); - const auto collUUID2 = UUID::gen(); - ChunkVersion version1(2, 0, OID::gen(), Timestamp(12)); - ChunkVersion version2(2, 0, OID::gen(), Timestamp(24)); - - setUpDatabase(dbName, kShardId0); - setUpCollection(collName1, collUUID1, version1); - setUpCollection(collName2, collUUID2, version2); - - // Set up two chunks in the metadata for each collection. - ChunkType chunk1coll1 = - setUpChunk(collUUID1, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version1); - version1.incMinor(); - ChunkType chunk2coll1 = - setUpChunk(collUUID1, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version1); - - ChunkType chunk1coll2 = - setUpChunk(collUUID2, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version2); - version2.incMinor(); - ChunkType chunk2coll2 = - setUpChunk(collUUID2, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version2); - - // Going to request that these four chunks get migrated. - const std::vector<MigrateInfo> migrationRequests{{kShardId1, - collName1, - chunk1coll1, - MoveChunkRequest::ForceJumbo::kDoNotForce, - MigrateInfo::chunksImbalance}, - {kShardId3, - collName1, - chunk2coll1, - MoveChunkRequest::ForceJumbo::kDoNotForce, - MigrateInfo::chunksImbalance}, - {kShardId1, - collName2, - chunk1coll2, - MoveChunkRequest::ForceJumbo::kDoNotForce, - MigrateInfo::chunksImbalance}, - {kShardId3, - collName2, - chunk2coll2, - MoveChunkRequest::ForceJumbo::kDoNotForce, - MigrateInfo::chunksImbalance}}; - - auto future = launchAsync([this, migrationRequests] { - ThreadClient tc("Test", getServiceContext()); - auto opCtx = cc().makeOperationContext(); - - // Scheduling the moveChunk commands requires finding a host to which to send the command. - // Set up dummy hosts for the source shards. - shardTargeterMock(opCtx.get(), kShardId0)->setFindHostReturnValue(kShardHost0); - shardTargeterMock(opCtx.get(), kShardId2)->setFindHostReturnValue(kShardHost2); - - MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance( - opCtx.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false); - - for (const auto& migrateInfo : migrationRequests) { - ASSERT_OK(migrationStatuses.at(migrateInfo.getName())); - } - }); - - // Expect four moveChunk commands. - expectMoveChunkCommand(collName1, chunk1coll1, kShardId1, Status::OK()); - expectMoveChunkCommand(collName1, chunk2coll1, kShardId3, Status::OK()); - expectMoveChunkCommand(collName2, chunk1coll2, kShardId1, Status::OK()); - expectMoveChunkCommand(collName2, chunk2coll2, kShardId3, Status::OK()); - - // Run the MigrationManager code. - future.default_timed_get(); -} - -// The MigrationManager should fail the migration if a host is not found for the source shard. -// Scheduling a moveChunk command requires finding a host to which to send the command. -TEST_F(MigrationManagerTest, SourceShardNotFound) { - // Set up two shards in the metadata. - ASSERT_OK(catalogClient()->insertConfigDocument( - operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern)); - ASSERT_OK(catalogClient()->insertConfigDocument( - operationContext(), ShardType::ConfigNS, kShard2, kMajorityWriteConcern)); - - // Set up the database and collection as sharded in the metadata. - const std::string dbName = "foo"; - const NamespaceString collName(dbName, "bar"); - const auto collUUID = UUID::gen(); - ChunkVersion version(2, 0, OID::gen(), Timestamp(42)); - - setUpDatabase(dbName, kShardId0); - setUpCollection(collName, collUUID, version); - - // Set up two chunks in the metadata. - ChunkType chunk1 = - setUpChunk(collUUID, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version); - version.incMinor(); - ChunkType chunk2 = - setUpChunk(collUUID, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version); - - // Going to request that these two chunks get migrated. - const std::vector<MigrateInfo> migrationRequests{{kShardId1, - collName, - chunk1, - MoveChunkRequest::ForceJumbo::kDoNotForce, - MigrateInfo::chunksImbalance}, - {kShardId3, - collName, - chunk2, - MoveChunkRequest::ForceJumbo::kDoNotForce, - MigrateInfo::chunksImbalance}}; - - auto future = launchAsync([this, chunk1, chunk2, migrationRequests] { - ThreadClient tc("Test", getServiceContext()); - auto opCtx = cc().makeOperationContext(); - - // Scheduling a moveChunk command requires finding a host to which to send the command. Set - // up a dummy host for kShardHost0, and return an error for kShardHost3. - shardTargeterMock(opCtx.get(), kShardId0)->setFindHostReturnValue(kShardHost0); - shardTargeterMock(opCtx.get(), kShardId2) - ->setFindHostReturnValue( - Status(ErrorCodes::ReplicaSetNotFound, "SourceShardNotFound generated error.")); - - MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance( - opCtx.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false); - - ASSERT_OK(migrationStatuses.at(migrationRequests.front().getName())); - ASSERT_EQ(ErrorCodes::ReplicaSetNotFound, - migrationStatuses.at(migrationRequests.back().getName())); - }); - - // Expect only one moveChunk command to be called. - expectMoveChunkCommand(collName, chunk1, kShardId1, Status::OK()); - - // Run the MigrationManager code. - future.default_timed_get(); -} - -// TODO: Delete in 3.8 -TEST_F(MigrationManagerTest, JumboChunkResponseBackwardsCompatibility) { - // Set up one shard in the metadata. - ASSERT_OK(catalogClient()->insertConfigDocument( - operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern)); - - // Set up the database and collection as sharded in the metadata. - const std::string dbName = "foo"; - const NamespaceString collName(dbName, "bar"); - const auto collUUID = UUID::gen(); - ChunkVersion version(2, 0, OID::gen(), Timestamp(42)); - - setUpDatabase(dbName, kShardId0); - setUpCollection(collName, collUUID, version); - - // Set up a single chunk in the metadata. - ChunkType chunk1 = - setUpChunk(collUUID, kKeyPattern.globalMin(), kKeyPattern.globalMax(), kShardId0, version); - - // Going to request that this chunk gets migrated. - const std::vector<MigrateInfo> migrationRequests{{kShardId1, - collName, - chunk1, - MoveChunkRequest::ForceJumbo::kDoNotForce, - MigrateInfo::chunksImbalance}}; - - auto future = launchAsync([this, chunk1, migrationRequests] { - ThreadClient tc("Test", getServiceContext()); - auto opCtx = cc().makeOperationContext(); - - // Scheduling a moveChunk command requires finding a host to which to send the command. Set - // up a dummy host for kShardHost0. - shardTargeterMock(opCtx.get(), kShardId0)->setFindHostReturnValue(kShardHost0); - - MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance( - opCtx.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false); - - ASSERT_EQ(ErrorCodes::ChunkTooBig, - migrationStatuses.at(migrationRequests.front().getName())); - }); - - // Expect only one moveChunk command to be called. - expectMoveChunkCommand(collName, chunk1, kShardId1, BSON("ok" << 0 << "chunkTooBig" << true)); - - // Run the MigrationManager code. - future.default_timed_get(); -} - -TEST_F(MigrationManagerTest, InterruptMigration) { - // Set up one shard in the metadata. - ASSERT_OK(catalogClient()->insertConfigDocument( - operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern)); - - // Set up the database and collection as sharded in the metadata. - const std::string dbName = "foo"; - const NamespaceString collName(dbName, "bar"); - const auto collUUID = UUID::gen(); - ChunkVersion version(2, 0, OID::gen(), Timestamp(42)); - - setUpDatabase(dbName, kShardId0); - setUpCollection(collName, collUUID, version); - - // Set up a single chunk in the metadata. - ChunkType chunk = - setUpChunk(collUUID, kKeyPattern.globalMin(), kKeyPattern.globalMax(), kShardId0, version); - - auto future = launchAsync([&] { - ThreadClient tc("Test", getServiceContext()); - auto opCtx = cc().makeOperationContext(); - - // Scheduling a moveChunk command requires finding a host to which to send the command. Set - // up a dummy host for kShardHost0. - shardTargeterMock(opCtx.get(), kShardId0)->setFindHostReturnValue(kShardHost0); - - ASSERT_EQ( - ErrorCodes::BalancerInterrupted, - _migrationManager->executeManualMigration(opCtx.get(), - {kShardId1, - collName, - chunk, - MoveChunkRequest::ForceJumbo::kDoNotForce, - MigrateInfo::chunksImbalance}, - 0, - kDefaultSecondaryThrottle, - false)); - }); - - // Wait till the move chunk request gets sent and pretend that it is stuck by never responding - // to the request - network()->enterNetwork(); - network()->blackHole(network()->getNextReadyRequest()); - network()->exitNetwork(); - - // Now that the migration request is 'pending', try to cancel the migration manager. This should - // succeed. - _migrationManager->interruptAndDisableMigrations(); - - // Ensure that cancellations get processed - network()->enterNetwork(); - network()->runReadyNetworkOperations(); - network()->exitNetwork(); - - // Ensure that the previously scheduled migration is cancelled - future.default_timed_get(); - - // Ensure that no new migrations can be scheduled - ASSERT_EQ(ErrorCodes::BalancerInterrupted, - _migrationManager->executeManualMigration(operationContext(), - {kShardId1, - collName, - chunk, - MoveChunkRequest::ForceJumbo::kDoNotForce, - MigrateInfo::chunksImbalance}, - 0, - kDefaultSecondaryThrottle, - false)); - - // Ensure that the migration manager is no longer handling any migrations. - _migrationManager->drainActiveMigrations(); - - // Check that the migration that was active when the migration manager was interrupted can be - // found in config.migrations (and thus would be recovered if a migration manager were to start - // up again). - auto statusWithMigrationsQueryResponse = - shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( - operationContext(), - ReadPreferenceSetting{ReadPreference::PrimaryOnly}, - repl::ReadConcernLevel::kMajorityReadConcern, - MigrationType::ConfigNS, - BSON(MigrationType::ns(collName.ns()) << MigrationType::min(chunk.getMin())), - BSONObj(), - boost::none); - Shard::QueryResponse migrationsQueryResponse = - uassertStatusOK(statusWithMigrationsQueryResponse); - ASSERT_EQUALS(1U, migrationsQueryResponse.docs.size()); - - ASSERT_OK(catalogClient()->removeConfigDocuments( - operationContext(), - MigrationType::ConfigNS, - BSON(MigrationType::ns(collName.ns()) << MigrationType::min(chunk.getMin())), - kMajorityWriteConcern)); - - // Restore the migration manager back to the started state, which is expected by tearDown - _migrationManager->startRecoveryAndAcquireDistLocks(operationContext()); - _migrationManager->finishRecovery(operationContext(), 0, kDefaultSecondaryThrottle); -} - -TEST_F(MigrationManagerTest, RestartMigrationManager) { - // Set up one shard in the metadata. - ASSERT_OK(catalogClient()->insertConfigDocument( - operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern)); - - // Set up the database and collection as sharded in the metadata. - const std::string dbName = "foo"; - const NamespaceString collName(dbName, "bar"); - const auto collUUID = UUID::gen(); - ChunkVersion version(2, 0, OID::gen(), Timestamp(42)); - - setUpDatabase(dbName, kShardId0); - setUpCollection(collName, collUUID, version); - - // Set up a single chunk in the metadata. - ChunkType chunk1 = - setUpChunk(collUUID, kKeyPattern.globalMin(), kKeyPattern.globalMax(), kShardId0, version); - - // Go through the lifecycle of the migration manager - _migrationManager->interruptAndDisableMigrations(); - _migrationManager->drainActiveMigrations(); - _migrationManager->startRecoveryAndAcquireDistLocks(operationContext()); - _migrationManager->finishRecovery(operationContext(), 0, kDefaultSecondaryThrottle); - - auto future = launchAsync([&] { - ThreadClient tc("Test", getServiceContext()); - auto opCtx = cc().makeOperationContext(); - - // Scheduling a moveChunk command requires finding a host to which to send the command. Set - // up a dummy host for kShardHost0. - shardTargeterMock(opCtx.get(), kShardId0)->setFindHostReturnValue(kShardHost0); - - ASSERT_OK( - _migrationManager->executeManualMigration(opCtx.get(), - {kShardId1, - collName, - chunk1, - MoveChunkRequest::ForceJumbo::kDoNotForce, - MigrateInfo::chunksImbalance}, - 0, - kDefaultSecondaryThrottle, - false)); - }); - - // Expect only one moveChunk command to be called. - expectMoveChunkCommand(collName, chunk1, kShardId1, Status::OK()); - - // Run the MigrationManager code. - future.default_timed_get(); -} - -TEST_F(MigrationManagerTest, MigrationRecovery) { - // Set up two shards in the metadata. - ASSERT_OK(catalogClient()->insertConfigDocument( - operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern)); - ASSERT_OK(catalogClient()->insertConfigDocument( - operationContext(), ShardType::ConfigNS, kShard2, kMajorityWriteConcern)); - - // Set up the database and collection as sharded in the metadata. - const std::string dbName = "foo"; - const NamespaceString collName(dbName, "bar"); - const auto collUUID = UUID::gen(); - ChunkVersion version(1, 0, OID::gen(), Timestamp(42)); - - setUpDatabase(dbName, kShardId0); - setUpCollection(collName, collUUID, version); - - // Set up two chunks in the metadata and set up two fake active migrations by writing documents - // to the config.migrations collection. - ChunkType chunk1 = - setUpChunk(collUUID, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version); - version.incMinor(); - ChunkType chunk2 = - setUpChunk(collUUID, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version); - - _migrationManager->interruptAndDisableMigrations(); - _migrationManager->drainActiveMigrations(); - - setUpMigration(collName, chunk1, kShardId1.toString()); - setUpMigration(collName, chunk2, kShardId3.toString()); - - // Mimic all config distlocks being released on config server stepup to primary. - DistLockManager::get(operationContext())->unlockAll(operationContext()); - - _migrationManager->startRecoveryAndAcquireDistLocks(operationContext()); - - auto future = launchAsync([this] { - ThreadClient tc("Test", getServiceContext()); - auto opCtx = cc().makeOperationContext(); - - // Scheduling the moveChunk commands requires finding hosts to which to send the commands. - // Set up dummy hosts for the source shards. - shardTargeterMock(opCtx.get(), kShardId0)->setFindHostReturnValue(kShardHost0); - shardTargeterMock(opCtx.get(), kShardId2)->setFindHostReturnValue(kShardHost2); - - _migrationManager->finishRecovery(opCtx.get(), 0, kDefaultSecondaryThrottle); - }); - - // Expect two moveChunk commands. - expectMoveChunkCommand(collName, chunk1, kShardId1, Status::OK()); - expectMoveChunkCommand(collName, chunk2, kShardId3, Status::OK()); - - // Run the MigrationManager code. - future.default_timed_get(); -} - -TEST_F(MigrationManagerTest, FailMigrationRecovery) { - // Set up two shards in the metadata. - ASSERT_OK(catalogClient()->insertConfigDocument( - operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern)); - ASSERT_OK(catalogClient()->insertConfigDocument( - operationContext(), ShardType::ConfigNS, kShard2, kMajorityWriteConcern)); - - // Set up the database and collection as sharded in the metadata. - const std::string dbName = "foo"; - const NamespaceString collName(dbName, "bar"); - const auto collUUID = UUID::gen(); - ChunkVersion version(1, 0, OID::gen(), Timestamp(42)); - - setUpDatabase(dbName, kShardId0); - setUpCollection(collName, collUUID, version); - - // Set up two chunks in the metadata. - ChunkType chunk1 = - setUpChunk(collUUID, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version); - version.incMinor(); - ChunkType chunk2 = - setUpChunk(collUUID, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version); - - _migrationManager->interruptAndDisableMigrations(); - _migrationManager->drainActiveMigrations(); - - setUpMigration(collName, chunk1, kShardId1.toString()); - - // Set up a fake active migration document that will fail MigrationType parsing -- missing - // field. - BSONObjBuilder builder; - builder.append("_id", "testing"); - // No MigrationType::ns() field! - builder.append(MigrationType::min(), chunk2.getMin()); - builder.append(MigrationType::max(), chunk2.getMax()); - builder.append(MigrationType::toShard(), kShardId3.toString()); - builder.append(MigrationType::fromShard(), chunk2.getShard().toString()); - ASSERT_OK(catalogClient()->insertConfigDocument( - operationContext(), MigrationType::ConfigNS, builder.obj(), kMajorityWriteConcern)); - - // Take the distributed lock for the collection, which should be released during recovery when - // it fails. Any dist lock held by the config server will be released via proccessId, so the - // session ID used here doesn't matter. - ASSERT_OK(DistLockManager::get(operationContext()) - ->lockDirect(operationContext(), - collName.ns(), - "MigrationManagerTest", - DistLockManager::kSingleLockAttemptTimeout)); - - _migrationManager->startRecoveryAndAcquireDistLocks(operationContext()); - _migrationManager->finishRecovery(operationContext(), 0, kDefaultSecondaryThrottle); - - // MigrationManagerTest::tearDown checks that the config.migrations collection is empty and all - // distributed locks are unlocked. -} - -// Check that retriable / replset monitor altering errors returned from remote moveChunk commands -// sent to source shards are not returned to the caller (mongos), but instead converted into -// OperationFailed errors. -TEST_F(MigrationManagerTest, RemoteCallErrorConversionToOperationFailed) { - // Set up two shards in the metadata. - ASSERT_OK(catalogClient()->insertConfigDocument( - operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern)); - ASSERT_OK(catalogClient()->insertConfigDocument( - operationContext(), ShardType::ConfigNS, kShard2, kMajorityWriteConcern)); - - // Set up the database and collection as sharded in the metadata. - const std::string dbName = "foo"; - const NamespaceString collName(dbName, "bar"); - const auto collUUID = UUID::gen(); - ChunkVersion version(1, 0, OID::gen(), Timestamp(42)); - - setUpDatabase(dbName, kShardId0); - setUpCollection(collName, collUUID, version); - - // Set up two chunks in the metadata. - ChunkType chunk1 = - setUpChunk(collUUID, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version); - version.incMinor(); - ChunkType chunk2 = - setUpChunk(collUUID, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version); - - // Going to request that these two chunks get migrated. - const std::vector<MigrateInfo> migrationRequests{{kShardId1, - collName, - chunk1, - MoveChunkRequest::ForceJumbo::kDoNotForce, - MigrateInfo::chunksImbalance}, - {kShardId3, - collName, - chunk2, - MoveChunkRequest::ForceJumbo::kDoNotForce, - MigrateInfo::chunksImbalance}}; - - auto future = launchAsync([&] { - ThreadClient tc("Test", getServiceContext()); - auto opCtx = cc().makeOperationContext(); - - // Scheduling the moveChunk commands requires finding a host to which to send the command. - // Set up dummy hosts for the source shards. - shardTargeterMock(opCtx.get(), kShardId0)->setFindHostReturnValue(kShardHost0); - shardTargeterMock(opCtx.get(), kShardId2)->setFindHostReturnValue(kShardHost2); - - MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance( - opCtx.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false); - - ASSERT_EQ(ErrorCodes::OperationFailed, - migrationStatuses.at(migrationRequests.front().getName())); - ASSERT_EQ(ErrorCodes::OperationFailed, - migrationStatuses.at(migrationRequests.back().getName())); - }); - - // Expect a moveChunk command that will fail with a retriable error. - expectMoveChunkCommand( - collName, - chunk1, - kShardId1, - Status(ErrorCodes::NotPrimaryOrSecondary, - "RemoteCallErrorConversionToOperationFailedCheck generated error.")); - - // Expect a moveChunk command that will fail with a replset monitor updating error. - expectMoveChunkCommand( - collName, - chunk2, - kShardId3, - Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, - "RemoteCallErrorConversionToOperationFailedCheck generated error.")); - - // Run the MigrationManager code. - future.default_timed_get(); -} - -} // namespace -} // namespace mongo diff --git a/src/mongo/db/s/balancer/scoped_migration_request.cpp b/src/mongo/db/s/balancer/scoped_migration_request.cpp deleted file mode 100644 index 4e44b2d7e7e..00000000000 --- a/src/mongo/db/s/balancer/scoped_migration_request.cpp +++ /dev/null @@ -1,217 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding - -#include "mongo/platform/basic.h" - -#include "mongo/db/s/balancer/scoped_migration_request.h" - -#include "mongo/db/s/balancer/type_migration.h" -#include "mongo/db/write_concern_options.h" -#include "mongo/logv2/log.h" -#include "mongo/s/client/shard_registry.h" -#include "mongo/s/grid.h" - -namespace mongo { -namespace { - -const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority, - WriteConcernOptions::SyncMode::UNSET, - WriteConcernOptions::kWriteConcernTimeoutMigration); -const int kDuplicateKeyErrorMaxRetries = 2; - -} // namespace - -ScopedMigrationRequest::ScopedMigrationRequest(OperationContext* opCtx, - const NamespaceString& nss, - const BSONObj& minKey) - : _opCtx(opCtx), _nss(nss), _minKey(minKey) {} - -ScopedMigrationRequest::~ScopedMigrationRequest() { - if (!_opCtx) { - // If the opCtx object was cleared, nothing should happen in the destructor. - return; - } - - // Try to delete the entry in the config.migrations collection. If the command fails, that is - // okay. - BSONObj migrationDocumentIdentifier = - BSON(MigrationType::ns(_nss.ns()) << MigrationType::min(_minKey)); - Status result = Grid::get(_opCtx)->catalogClient()->removeConfigDocuments( - _opCtx, MigrationType::ConfigNS, migrationDocumentIdentifier, kMajorityWriteConcern); - - if (!result.isOK()) { - LOGV2(21900, - "Failed to remove config.migrations document for migration '{migration}': {error}", - "Failed to remove config.migrations document for migration", - "migration"_attr = migrationDocumentIdentifier, - "error"_attr = redact(result)); - } -} - -ScopedMigrationRequest::ScopedMigrationRequest(ScopedMigrationRequest&& other) { - // This function relies on the move assigment to nullify 'other._opCtx'. If this is no longer - // the case, this function should be updated to ensure that it nulls out 'other._opCtx'. - *this = std::move(other); -} - -ScopedMigrationRequest& ScopedMigrationRequest::operator=(ScopedMigrationRequest&& other) { - if (this != &other) { - _opCtx = other._opCtx; - _nss = other._nss; - _minKey = other._minKey; - // Set opCtx to null so that the destructor will do nothing. - other._opCtx = nullptr; - } - - return *this; -} - -StatusWith<ScopedMigrationRequest> ScopedMigrationRequest::writeMigration( - OperationContext* opCtx, const MigrateInfo& migrateInfo, bool waitForDelete) { - auto const grid = Grid::get(opCtx); - - // Try to write a unique migration document to config.migrations. - const MigrationType migrationType(migrateInfo, waitForDelete); - - for (int retry = 0; retry < kDuplicateKeyErrorMaxRetries; ++retry) { - Status result = grid->catalogClient()->insertConfigDocument( - opCtx, MigrationType::ConfigNS, migrationType.toBSON(), kMajorityWriteConcern); - - if (result == ErrorCodes::DuplicateKey) { - // If the exact migration described by "migrateInfo" is active, return a scoped object - // for the request because this migration request will join the active one once - // scheduled. - auto statusWithMigrationQueryResult = - grid->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( - opCtx, - ReadPreferenceSetting{ReadPreference::PrimaryOnly}, - repl::ReadConcernLevel::kLocalReadConcern, - MigrationType::ConfigNS, - migrateInfo.getMigrationTypeQuery(), - BSONObj(), - boost::none); - if (!statusWithMigrationQueryResult.isOK()) { - return statusWithMigrationQueryResult.getStatus().withContext( - str::stream() << "Failed to verify whether conflicting migration is in " - << "progress for migration '" << redact(migrateInfo.toString()) - << "' while trying to query config.migrations."); - } - if (statusWithMigrationQueryResult.getValue().docs.empty()) { - // The document that caused the DuplicateKey error is no longer in the collection, - // so retrying the insert might succeed. - continue; - } - invariant(statusWithMigrationQueryResult.getValue().docs.size() == 1); - - BSONObj activeMigrationBSON = statusWithMigrationQueryResult.getValue().docs.front(); - auto statusWithActiveMigration = MigrationType::fromBSON(activeMigrationBSON); - if (!statusWithActiveMigration.isOK()) { - return statusWithActiveMigration.getStatus().withContext( - str::stream() << "Failed to verify whether conflicting migration is in " - << "progress for migration '" << redact(migrateInfo.toString()) - << "' while trying to parse active migration document '" - << redact(activeMigrationBSON.toString()) << "'."); - } - - MigrateInfo activeMigrateInfo = - statusWithActiveMigration.getValue().toMigrateInfo(migrateInfo.uuid); - if (activeMigrateInfo.to != migrateInfo.to || - activeMigrateInfo.from != migrateInfo.from) { - LOGV2( - 21901, - "Failed to write document '{newMigration}' to config.migrations because there " - "is already an active migration for that chunk: " - "'{activeMigration}': {error}", - "Failed to write document to config.migrations because there " - "is already an active migration for that chunk", - "newMigration"_attr = redact(migrateInfo.toString()), - "activeMigration"_attr = redact(activeMigrateInfo.toString()), - "error"_attr = redact(result)); - return result; - } - - result = Status::OK(); - } - - // As long as there isn't a DuplicateKey error, the document may have been written, and it's - // safe (won't delete another migration's document) and necessary to try to clean up the - // document via the destructor. - ScopedMigrationRequest scopedMigrationRequest(opCtx, migrateInfo.nss, migrateInfo.minKey); - - // If there was a write error, let the object go out of scope and clean up in the - // destructor. - if (!result.isOK()) { - return result; - } - - return std::move(scopedMigrationRequest); - } - - return Status(ErrorCodes::OperationFailed, - str::stream() << "Failed to insert the config.migrations document after max " - << "number of retries. Chunk '" - << ChunkRange(migrateInfo.minKey, migrateInfo.maxKey).toString() - << "' in collection '" << migrateInfo.nss - << "' was being moved (somewhere) by another operation."); -} - -ScopedMigrationRequest ScopedMigrationRequest::createForRecovery(OperationContext* opCtx, - const NamespaceString& nss, - const BSONObj& minKey) { - return ScopedMigrationRequest(opCtx, nss, minKey); -} - -Status ScopedMigrationRequest::tryToRemoveMigration() { - invariant(_opCtx); - BSONObj migrationDocumentIdentifier = - BSON(MigrationType::ns(_nss.ns()) << MigrationType::min(_minKey)); - Status status = Grid::get(_opCtx)->catalogClient()->removeConfigDocuments( - _opCtx, MigrationType::ConfigNS, migrationDocumentIdentifier, kMajorityWriteConcern); - if (status.isOK()) { - // Don't try to do a no-op remove in the destructor. - _opCtx = nullptr; - } - return status; -} - -void ScopedMigrationRequest::keepDocumentOnDestruct() { - invariant(_opCtx); - _opCtx = nullptr; - LOGV2_DEBUG(21902, - 1, - "Keeping config.migrations document with namespace {namespace} and minKey " - "{minKey} for balancer recovery", - "Keeping config.migrations document for balancer recovery", - "namespace"_attr = _nss, - "minKey"_attr = _minKey); -} - -} // namespace mongo diff --git a/src/mongo/db/s/balancer/scoped_migration_request.h b/src/mongo/db/s/balancer/scoped_migration_request.h deleted file mode 100644 index bfb58ba103c..00000000000 --- a/src/mongo/db/s/balancer/scoped_migration_request.h +++ /dev/null @@ -1,120 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include "mongo/base/status_with.h" -#include "mongo/db/s/balancer/balancer_policy.h" -#include "mongo/s/request_types/migration_secondary_throttle_options.h" - -namespace mongo { - -/** - * RAII class that handles writes to the config.migrations collection for a migration that comes - * through the balancer. - * - * A migration must have an entry in the config.migrations collection so that the Balancer can - * recover from stepdown/crash. This entry must be entered before a migration begins and then - * removed once the migration has finished. - * - * This class should only be used by the Balancer! - */ -class ScopedMigrationRequest { - ScopedMigrationRequest(const ScopedMigrationRequest&) = delete; - ScopedMigrationRequest& operator=(const ScopedMigrationRequest&) = delete; - -public: - /** - * Deletes this migration's entry in the config.migrations collection, using majority write - * concern. If there is a balancer stepdown/crash before the write propagates to a majority of - * servers, that is alright because the balancer recovery process will handle it. - * - * If keepDocumentOnDestruct() has been called, then no attempt to remove the document is made. - */ - ~ScopedMigrationRequest(); - - ScopedMigrationRequest(ScopedMigrationRequest&& other); - ScopedMigrationRequest& operator=(ScopedMigrationRequest&& other); - - /** - * Inserts an unique migration entry in the config.migrations collection. If the write is - * successful, a ScopedMigrationRequest object is returned; otherwise, the write error. - * - * The destructor will handle removing the document when it is no longer needed. - */ - static StatusWith<ScopedMigrationRequest> writeMigration(OperationContext* opCtx, - const MigrateInfo& migrate, - bool waitForDelete); - - /** - * Creates a ScopedMigrationRequest object without inserting a document into config.migrations. - * The destructor will handle removing the migration document when it is no longer needed. - * - * This should only be used on Balancer recovery when a config.migrations document already - * exists for the migration. - */ - static ScopedMigrationRequest createForRecovery(OperationContext* opCtx, - const NamespaceString& nss, - const BSONObj& minKey); - - /** - * Do not call if keepDocumentOnDestruct has been called previously: it will invariant. - * - * Attempts to delete this migration's entry in the config.migrations collection using majority - * write concern. If successful, clears the operation context so that the destructor will not - * redundantly try to remove an already successfully deleted document. - */ - Status tryToRemoveMigration(); - - /** - * Do not call if tryToRemoveMigration has been called previously: it may invariant. - * - * Clears the operation context so that the destructor will not remove the config.migrations - * document for the migration. - * - * This should only be used on the Balancer when it is interrupted and must leave entries in - * config.migrations so that ongoing migrations can be recovered later. - */ - void keepDocumentOnDestruct(); - -private: - ScopedMigrationRequest(OperationContext* opCtx, - const NamespaceString& nss, - const BSONObj& minKey); - - // Need an operation context with which to do a write in the destructor. - OperationContext* _opCtx; - - // ns and minkey are needed to identify the migration document when it is removed from - // config.migrations by the destructor. - NamespaceString _nss; - BSONObj _minKey; -}; - -} // namespace mongo diff --git a/src/mongo/db/s/balancer/scoped_migration_request_test.cpp b/src/mongo/db/s/balancer/scoped_migration_request_test.cpp deleted file mode 100644 index 414c2005db5..00000000000 --- a/src/mongo/db/s/balancer/scoped_migration_request_test.cpp +++ /dev/null @@ -1,246 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include "mongo/db/s/balancer/scoped_migration_request.h" -#include "mongo/db/s/balancer/type_migration.h" -#include "mongo/db/s/config/config_server_test_fixture.h" -#include "mongo/s/client/shard_registry.h" -#include "mongo/s/request_types/migration_secondary_throttle_options.h" - -namespace mongo { -namespace { - -using unittest::assertGet; - -const std::string kNs = "TestDB.TestColl"; -const BSONObj kMin = BSON("a" << 10); -const BSONObj kMax = BSON("a" << 20); -const ShardId kFromShard("shard0000"); -const ShardId kToShard("shard0001"); -const ShardId kDifferentToShard("shard0002"); - -class ScopedMigrationRequestTest : public ConfigServerTestFixture { -public: - /** - * Queries config.migrations for the document pertaining to migrateInfo and asserts that the - * number of documents returned equals "expectedNumberOfDocuments". - */ - void checkMigrationsCollectionForDocument(const MigrateInfo& migrateInfo, - unsigned long expectedNumberOfDocuments); - - /** - * Makes a ScopedMigrationRequest and checks that the migration was written to - * config.migrations. This exercises the ScopedMigrationRequest move and assignment - * constructors. - */ - ScopedMigrationRequest makeScopedMigrationRequest(const MigrateInfo& migrateInfo); - - MigrateInfo makeMigrateInfo(); - -private: - void setUp() override; -}; - -void ScopedMigrationRequestTest::setUp() { - setUpAndInitializeConfigDb(); - setupShards({ShardType{"shard", "shard:12"}}); -} - -void ScopedMigrationRequestTest::checkMigrationsCollectionForDocument( - const MigrateInfo& migrateInfo, unsigned long expectedNumberOfDocuments) { - auto const query = BSON(MigrationType::ns(kNs) << MigrationType::min(migrateInfo.minKey)); - auto response = shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( - operationContext(), - ReadPreferenceSetting{ReadPreference::PrimaryOnly}, - repl::ReadConcernLevel::kMajorityReadConcern, - MigrationType::ConfigNS, - query, - BSONObj(), - boost::none); - Shard::QueryResponse queryResponse = unittest::assertGet(response); - std::vector<BSONObj> docs = queryResponse.docs; - ASSERT_EQUALS(expectedNumberOfDocuments, docs.size()); -} - -ScopedMigrationRequest ScopedMigrationRequestTest::makeScopedMigrationRequest( - const MigrateInfo& migrateInfo) { - ScopedMigrationRequest scopedMigrationRequest = - assertGet(ScopedMigrationRequest::writeMigration(operationContext(), migrateInfo, false)); - - checkMigrationsCollectionForDocument(migrateInfo, 1); - - return scopedMigrationRequest; -} - -MigrateInfo ScopedMigrationRequestTest::makeMigrateInfo() { - const NamespaceString collNss{kNs}; - const auto collUuid = UUID::gen(); - const ChunkVersion kChunkVersion{1, 2, OID::gen(), Timestamp(1, 1)}; - - BSONObjBuilder chunkBuilder; - collUuid.appendToBuilder(&chunkBuilder, ChunkType::collectionUUID.name()); - chunkBuilder.append(ChunkType::min(), kMin); - chunkBuilder.append(ChunkType::max(), kMax); - kChunkVersion.appendLegacyWithField(&chunkBuilder, ChunkType::lastmod()); - chunkBuilder.append(ChunkType::shard(), kFromShard.toString()); - - ChunkType chunkType = assertGet(ChunkType::parseFromConfigBSONCommand(chunkBuilder.obj())); - ASSERT_OK(chunkType.validate()); - - // Initialize the sharded TO collection - setupCollection(collNss, KeyPattern(BSON("_id" << 1)), {chunkType}); - - return MigrateInfo(kToShard, - collNss, - chunkType, - MoveChunkRequest::ForceJumbo::kDoNotForce, - MigrateInfo::chunksImbalance); -} - -TEST_F(ScopedMigrationRequestTest, CreateScopedMigrationRequest) { - MigrateInfo migrateInfo = makeMigrateInfo(); - - { - ScopedMigrationRequest scopedMigrationRequest = assertGet( - ScopedMigrationRequest::writeMigration(operationContext(), migrateInfo, false)); - - checkMigrationsCollectionForDocument(migrateInfo, 1); - } - - checkMigrationsCollectionForDocument(migrateInfo, 0); -} - -/** - * A document is created via scoped object, but document is not removed in destructor because - * keepDocumentOnDestruct() is called beforehand. Then recreate the scoped object without writing to - * the migraitons collection, and remove on destruct. - * - * Simulates (mostly) Balancer recovery. - */ -TEST_F(ScopedMigrationRequestTest, CreateScopedMigrationRequestOnRecovery) { - MigrateInfo migrateInfo = makeMigrateInfo(); - - // Insert the document for the MigrationRequest and then prevent its removal in the destructor. - { - ScopedMigrationRequest scopedMigrationRequest = assertGet( - ScopedMigrationRequest::writeMigration(operationContext(), migrateInfo, false)); - - checkMigrationsCollectionForDocument(migrateInfo, 1); - - scopedMigrationRequest.keepDocumentOnDestruct(); - } - - checkMigrationsCollectionForDocument(migrateInfo, 1); - - // Fail to write a migration document if a migration document already exists for that chunk but - // with a different destination shard. (the migration request must have identical parameters). - { - MigrateInfo differentToShardMigrateInfo = migrateInfo; - differentToShardMigrateInfo.to = kDifferentToShard; - - StatusWith<ScopedMigrationRequest> statusWithScopedMigrationRequest = - ScopedMigrationRequest::writeMigration( - operationContext(), differentToShardMigrateInfo, false); - - ASSERT_EQUALS(ErrorCodes::DuplicateKey, statusWithScopedMigrationRequest.getStatus()); - - checkMigrationsCollectionForDocument(migrateInfo, 1); - } - - // Create a new scoped object without inserting a document, and check that the destructor - // still removes the document corresponding to the MigrationRequest. - { - ScopedMigrationRequest scopedMigrationRequest = ScopedMigrationRequest::createForRecovery( - operationContext(), NamespaceString(kNs), migrateInfo.minKey); - - checkMigrationsCollectionForDocument(migrateInfo, 1); - } - - checkMigrationsCollectionForDocument(migrateInfo, 0); -} - -TEST_F(ScopedMigrationRequestTest, CreateMultipleScopedMigrationRequestsForIdenticalMigration) { - MigrateInfo migrateInfo = makeMigrateInfo(); - - { - // Create a ScopedMigrationRequest, which will do the config.migrations write. - ScopedMigrationRequest scopedMigrationRequest = assertGet( - ScopedMigrationRequest::writeMigration(operationContext(), migrateInfo, false)); - - checkMigrationsCollectionForDocument(migrateInfo, 1); - - { - // Should be able to create another Scoped object if the request is identical. - ScopedMigrationRequest identicalScopedMigrationRequest = assertGet( - ScopedMigrationRequest::writeMigration(operationContext(), migrateInfo, false)); - - checkMigrationsCollectionForDocument(migrateInfo, 1); - } - - // If any scoped object goes out of scope, the migration should be over and the document - // removed. - checkMigrationsCollectionForDocument(migrateInfo, 0); - } - - checkMigrationsCollectionForDocument(migrateInfo, 0); -} - -TEST_F(ScopedMigrationRequestTest, TryToRemoveScopedMigrationRequestBeforeDestruct) { - MigrateInfo migrateInfo = makeMigrateInfo(); - - // Remove the migration document with tryToRemoveMigration(). - ScopedMigrationRequest scopedMigrationRequest = - assertGet(ScopedMigrationRequest::writeMigration(operationContext(), migrateInfo, false)); - - checkMigrationsCollectionForDocument(migrateInfo, 1); - - ASSERT_OK(scopedMigrationRequest.tryToRemoveMigration()); - - checkMigrationsCollectionForDocument(migrateInfo, 0); -} - -TEST_F(ScopedMigrationRequestTest, MoveAndAssignmentConstructors) { - MigrateInfo migrateInfo = makeMigrateInfo(); - - // Test that when the move and assignment constructors are used and the original variable goes - // out of scope, the original object's destructor does not remove the migration document. - { - ScopedMigrationRequest anotherScopedMigrationRequest = - makeScopedMigrationRequest(migrateInfo); - - checkMigrationsCollectionForDocument(migrateInfo, 1); - } - - checkMigrationsCollectionForDocument(migrateInfo, 0); -} - -} // namespace -} // namespace mongo |