Diffstat (limited to 'src/mongo')
-rw-r--r--   src/mongo/base/error_codes.err                                     |   1
-rw-r--r--   src/mongo/client/remote_command_retry_scheduler.cpp                |   3
-rw-r--r--   src/mongo/db/repl/replication_coordinator_external_state_impl.cpp  |   4
-rw-r--r--   src/mongo/s/balancer/balancer.cpp                                  |  54
-rw-r--r--   src/mongo/s/balancer/balancer_policy.h                             |   7
-rw-r--r--   src/mongo/s/balancer/migration_manager.cpp                         | 321
-rw-r--r--   src/mongo/s/balancer/migration_manager.h                           |  72
-rw-r--r--   src/mongo/s/balancer/migration_manager_test.cpp                    | 226
-rw-r--r--   src/mongo/s/balancer/scoped_migration_request.cpp                  |  39
-rw-r--r--   src/mongo/s/balancer/scoped_migration_request.h                    |   5
-rw-r--r--   src/mongo/s/balancer/scoped_migration_request_test.cpp             |  53
-rw-r--r--   src/mongo/s/balancer/type_migration.cpp                            |  34
-rw-r--r--   src/mongo/s/balancer/type_migration.h                              |  11
-rw-r--r--   src/mongo/s/balancer/type_migration_test.cpp                       |  66
-rw-r--r--   src/mongo/s/move_chunk_request.cpp                                 |   8
15 files changed, 666 insertions(+), 238 deletions(-)
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err index 8f442c018a4..30be05d7444 100644 --- a/src/mongo/base/error_codes.err +++ b/src/mongo/base/error_codes.err @@ -192,6 +192,7 @@ error_code("MasterSlaveConnectionFailure", 190) error_code("BalancerLostDistributedLock", 191) error_code("FailPointEnabled", 192) error_code("NoShardingEnabled", 193) +error_code("BalancerInterrupted", 194) # Non-sequential error codes (for compatibility only) error_code("SocketException", 9001) diff --git a/src/mongo/client/remote_command_retry_scheduler.cpp b/src/mongo/client/remote_command_retry_scheduler.cpp index f72874a33a4..5bbd533ead3 100644 --- a/src/mongo/client/remote_command_retry_scheduler.cpp +++ b/src/mongo/client/remote_command_retry_scheduler.cpp @@ -96,7 +96,8 @@ const std::initializer_list<ErrorCodes::Error> RemoteCommandRetryScheduler::kAll ErrorCodes::HostNotFound, ErrorCodes::NetworkTimeout, ErrorCodes::PrimarySteppedDown, - ErrorCodes::InterruptedDueToReplStateChange}; + ErrorCodes::InterruptedDueToReplStateChange, + ErrorCodes::BalancerInterrupted}; std::unique_ptr<RemoteCommandRetryScheduler::RetryPolicy> RemoteCommandRetryScheduler::makeNoRetryPolicy() { diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index dbf9c305f0a..15484b6729f 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -694,10 +694,6 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook << causedBy(shardAwareInitializationStatus); } - // Free any leftover locks from previous instantiations - auto distLockManager = Grid::get(txn)->catalogClient(txn)->getDistLockManager(); - distLockManager->unlockAll(txn, distLockManager->getProcessID()); - // If this is a config server node becoming a primary, start the balancer Balancer::get(txn)->startThread(txn); } else if (ShardingState::get(txn)->enabled()) { diff --git a/src/mongo/s/balancer/balancer.cpp b/src/mongo/s/balancer/balancer.cpp index 3c04c3145ca..56d376e35b2 100644 --- a/src/mongo/s/balancer/balancer.cpp +++ b/src/mongo/s/balancer/balancer.cpp @@ -48,6 +48,7 @@ #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/client/shard.h" #include "mongo/s/client/shard_registry.h" +#include "mongo/s/cluster_identity_loader.h" #include "mongo/s/grid.h" #include "mongo/s/shard_util.h" #include "mongo/s/sharding_raii.h" @@ -181,9 +182,7 @@ Status Balancer::startThread(OperationContext* txn) { invariant(!_thread.joinable()); _state = kRunning; - // Allow new migrations to be scheduled - _migrationManager.enableMigrations(); - + _migrationManager.startRecovery(); _thread = stdx::thread([this] { _mainThread(); }); // Intentional fall through case kRunning: @@ -196,9 +195,6 @@ Status Balancer::startThread(OperationContext* txn) { void Balancer::stopThread() { stdx::lock_guard<stdx::mutex> scopedLock(_mutex); if (_state == kRunning) { - // Stop any active migrations and prevent any new migrations from getting scheduled - _migrationManager.interruptAndDisableMigrations(); - // Request the balancer thread to stop _state = kStopping; _condVar.notify_all(); @@ -218,9 +214,6 @@ void Balancer::joinThread() { if (_thread.joinable()) { _thread.join(); - // Wait for any scheduled migrations to finish draining - _migrationManager.drainActiveMigrations(); - stdx::lock_guard<stdx::mutex> scopedLock(_mutex); _state = kStopped; _thread = {}; 
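The retry-scheduler change above is the consumer side of the new error code: a moveChunk that fails with BalancerInterrupted (balancer stopping, typically at stepdown) must look retriable to the calling logic so the operation is retried against the new primary. A self-contained sketch of that predicate; ErrorCode, kAllRetriableErrors, and isRetriable are illustrative stand-ins, not the mongo::ErrorCodes machinery:

    #include <algorithm>
    #include <initializer_list>

    enum class ErrorCode {
        OK,
        HostUnreachable,
        HostNotFound,
        NetworkTimeout,
        PrimarySteppedDown,
        InterruptedDueToReplStateChange,
        BalancerInterrupted,  // New: balancer stopped mid-migration (e.g. at stepdown)
    };

    const std::initializer_list<ErrorCode> kAllRetriableErrors = {
        ErrorCode::HostUnreachable,
        ErrorCode::HostNotFound,
        ErrorCode::NetworkTimeout,
        ErrorCode::PrimarySteppedDown,
        ErrorCode::InterruptedDueToReplStateChange,
        ErrorCode::BalancerInterrupted,
    };

    // A scheduler keeps retrying the remote command while the error stays retriable.
    bool isRetriable(ErrorCode code) {
        return std::find(kAllRetriableErrors.begin(), kAllRetriableErrors.end(), code) !=
            kAllRetriableErrors.end();
    }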
@@ -291,36 +284,47 @@ void Balancer::report(OperationContext* txn, BSONObjBuilder* builder) { void Balancer::_mainThread() { Client::initThread("Balancer"); + auto txn = cc().makeOperationContext(); + auto shardingContext = Grid::get(txn.get()); log() << "CSRS balancer is starting"; const Seconds kInitBackoffInterval(10); + OID clusterIdentity = ClusterIdentityLoader::get(txn.get())->getClusterId(); + // Take the balancer distributed lock and hold it permanently while (!_stopRequested()) { - auto txn = cc().makeOperationContext(); - auto shardingContext = Grid::get(txn.get()); - auto distLockHandleStatus = shardingContext->catalogClient(txn.get())->getDistLockManager()->lockWithSessionID( - txn.get(), "balancer", "CSRS Balancer", OID::gen()); - if (distLockHandleStatus.isOK()) { - break; - } + txn.get(), "balancer", "CSRS Balancer", clusterIdentity); + if (!distLockHandleStatus.isOK()) { + warning() << "Balancer distributed lock could not be acquired and will be retried in " + << durationCount<Seconds>(kInitBackoffInterval) << " seconds" + << causedBy(distLockHandleStatus.getStatus()); - warning() << "Balancer distributed lock could not be acquired and will be retried in " - << durationCount<Seconds>(kInitBackoffInterval) << " seconds" - << causedBy(distLockHandleStatus.getStatus()); + _sleepFor(txn.get(), kInitBackoffInterval); + continue; + } - _sleepFor(txn.get(), kInitBackoffInterval); + break; } - log() << "CSRS balancer thread is now running"; + if (!_stopRequested()) { + log() << "CSRS balancer thread for cluster " << clusterIdentity << " is recovering"; + + auto balancerConfig = Grid::get(txn.get())->getBalancerConfiguration(); + _migrationManager.finishRecovery(txn.get(), + clusterIdentity, + balancerConfig->getMaxChunkSizeBytes(), + balancerConfig->getSecondaryThrottle(), + balancerConfig->waitForDelete()); + + log() << "CSRS balancer thread for cluster " << clusterIdentity << " is recovered"; + } // Main balancer loop while (!_stopRequested()) { - auto txn = cc().makeOperationContext(); - auto shardingContext = Grid::get(txn.get()); auto balancerConfig = shardingContext->getBalancerConfiguration(); BalanceRoundDetails roundDetails; @@ -400,6 +404,10 @@ void Balancer::_mainThread() { } } + // Stop any active migrations and prevent any new migrations from getting scheduled + _migrationManager.interruptAndDisableMigrations(); + _migrationManager.drainActiveMigrations(); + log() << "CSRS balancer is now stopped"; } diff --git a/src/mongo/s/balancer/balancer_policy.h b/src/mongo/s/balancer/balancer_policy.h index 965e6519be5..97a1f2e91b1 100644 --- a/src/mongo/s/balancer/balancer_policy.h +++ b/src/mongo/s/balancer/balancer_policy.h @@ -61,6 +61,13 @@ struct MigrateInfo { minKey(a_chunk.getMin()), maxKey(a_chunk.getMax()) {} + MigrateInfo(const std::string& a_ns, + const ShardId& a_to, + const ShardId& a_from, + const BSONObj& a_minKey, + const BSONObj& a_maxKey) + : ns(a_ns), to(a_to), from(a_from), minKey(a_minKey), maxKey(a_maxKey) {} + std::string getName() const; std::string toString() const; diff --git a/src/mongo/s/balancer/migration_manager.cpp b/src/mongo/s/balancer/migration_manager.cpp index ba4a488ffcb..aa5beb38729 100644 --- a/src/mongo/s/balancer/migration_manager.cpp +++ b/src/mongo/s/balancer/migration_manager.cpp @@ -40,12 +40,15 @@ #include "mongo/db/client.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/s/balancer/scoped_migration_request.h" +#include "mongo/s/balancer/type_migration.h" 
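Two details of the reworked _mainThread above are easy to miss: the session ID for the balancer dist lock is now the stable cluster identity rather than a throwaway OID::gen(), so a restarted config primary can re-enter a lock it still formally holds, and acquisition loops with a fixed backoff until either the lock is obtained or a stop is requested. A compilable sketch of that loop shape, where tryAcquire and stopRequested are assumed stand-ins (stub bodies) for lockWithSessionID(..., clusterIdentity) and _stopRequested():

    #include <chrono>
    #include <optional>
    #include <thread>

    struct LockHandle {};

    // Stub stand-ins, not the real API.
    std::optional<LockHandle> tryAcquire() { return std::nullopt; }
    bool stopRequested() { return true; }

    std::optional<LockHandle> acquireBalancerLock() {
        const auto kInitBackoffInterval = std::chrono::seconds(10);
        while (!stopRequested()) {
            if (auto handle = tryAcquire()) {
                return handle;  // Held for the lifetime of this primary's balancer.
            }
            // Could not acquire (e.g. the previous holder's lock has not expired yet);
            // retry after a fixed backoff instead of failing the balancer outright.
            std::this_thread::sleep_for(kInitBackoffInterval);
        }
        return std::nullopt;  // Stop requested before the lock was acquired.
    }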
#include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" #include "mongo/s/move_chunk_request.h" #include "mongo/s/sharding_raii.h" +#include "mongo/util/log.h" #include "mongo/util/net/hostandport.h" #include "mongo/util/scopeguard.h" @@ -60,21 +63,24 @@ using str::stream; namespace { const char kChunkTooBig[] = "chunkTooBig"; +const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority, + WriteConcernOptions::SyncMode::UNSET, + Seconds(15)); /** * Parses the specified asynchronous command response and converts it to status to use as outcome of * an asynchronous migration command. It is necessary for two reasons: * - Preserve backwards compatibility with 3.2 and earlier, where the move chunk command instead of * returning a ChunkTooBig status includes an extra field in the response. - * - Convert CallbackCanceled errors into InterruptedDueToReplStateChange for the cases where the - * migration manager is being stopped at replica set stepdown. This return code allows the mongos - * calling logic to retry the operation on a new primary. + * - Convert CallbackCanceled errors into BalancerInterrupted for the cases where the migration + * manager is being stopped at replica set stepdown. This return code allows the mongos calling + * logic to retry the operation on a new primary. */ Status extractMigrationStatusFromRemoteCommandResponse(const RemoteCommandResponse& response, bool isStopping) { if (!response.isOK()) { if (response.status == ErrorCodes::CallbackCanceled && isStopping) { - return {ErrorCodes::InterruptedDueToReplStateChange, + return {ErrorCodes::BalancerInterrupted, "Migration interrupted because the balancer is stopping"}; } @@ -97,12 +103,14 @@ Status extractMigrationStatusFromRemoteCommandResponse(const RemoteCommandRespon /** * Blocking call to acquire the distributed collection lock for the specified namespace. */ -StatusWith<DistLockHandle> acquireDistLock(OperationContext* txn, const NamespaceString& nss) { +StatusWith<DistLockHandle> acquireDistLock(OperationContext* txn, + const OID& clusterIdentity, + const NamespaceString& nss) { const std::string whyMessage(stream() << "Migrating chunk(s) in collection " << nss.ns()); auto statusWithDistLockHandle = Grid::get(txn)->catalogClient(txn)->getDistLockManager()->lockWithSessionID( - txn, nss.ns(), whyMessage, OID::gen(), DistLockManager::kSingleLockAttemptTimeout); + txn, nss.ns(), whyMessage, clusterIdentity, DistLockManager::kSingleLockAttemptTimeout); if (!statusWithDistLockHandle.isOK()) { // If we get LockBusy while trying to acquire the collection distributed lock, this implies @@ -141,40 +149,71 @@ MigrationStatuses MigrationManager::executeMigrationsForAutoBalance( const MigrationSecondaryThrottleOptions& secondaryThrottle, bool waitForDelete) { - vector<std::pair<shared_ptr<Notification<Status>>, MigrateInfo>> responses; - - for (const auto& migrateInfo : migrateInfos) { - responses.emplace_back(_schedule(txn, - migrateInfo, - false, // Config server takes the collection dist lock - maxChunkSizeBytes, - secondaryThrottle, - waitForDelete), - migrateInfo); - } - MigrationStatuses migrationStatuses; vector<MigrateInfo> rescheduledMigrations; - // Wait for all the scheduled migrations to complete and note the ones, which failed with a - // LockBusy error code. 
These need to be executed serially, without the distributed lock being - // held by the config server for backwards compatibility with 3.2 shards. - for (auto& response : responses) { - auto notification = std::move(response.first); - auto migrateInfo = std::move(response.second); - - Status responseStatus = notification->get(); + { + std::map<MigrationIdentifier, ScopedMigrationRequest> scopedMigrationRequests; + vector<std::pair<shared_ptr<Notification<Status>>, MigrateInfo>> responses; + + for (const auto& migrateInfo : migrateInfos) { + // Write a document to the config.migrations collection, in case this migration must be + // recovered by the Balancer. Fail if the chunk is already moving. + auto statusWithScopedMigrationRequest = + ScopedMigrationRequest::writeMigration(txn, migrateInfo); + if (!statusWithScopedMigrationRequest.isOK()) { + migrationStatuses.emplace(migrateInfo.getName(), + std::move(statusWithScopedMigrationRequest.getStatus())); + continue; + } + scopedMigrationRequests.emplace(migrateInfo.getName(), + std::move(statusWithScopedMigrationRequest.getValue())); + + responses.emplace_back(_schedule(txn, + migrateInfo, + false, // Config server takes the collection dist lock + maxChunkSizeBytes, + secondaryThrottle, + waitForDelete), + migrateInfo); + } - if (responseStatus == ErrorCodes::LockBusy) { - rescheduledMigrations.emplace_back(std::move(migrateInfo)); - } else { - migrationStatuses.emplace(migrateInfo.getName(), std::move(responseStatus)); + // Wait for all the scheduled migrations to complete and note the ones, which failed with a + // LockBusy error code. These need to be executed serially, without the distributed lock + // being held by the config server for backwards compatibility with 3.2 shards. + for (auto& response : responses) { + auto notification = std::move(response.first); + auto migrateInfo = std::move(response.second); + + Status responseStatus = notification->get(); + + if (responseStatus == ErrorCodes::LockBusy) { + rescheduledMigrations.emplace_back(std::move(migrateInfo)); + } else { + if (responseStatus == ErrorCodes::BalancerInterrupted) { + auto it = scopedMigrationRequests.find(migrateInfo.getName()); + invariant(it != scopedMigrationRequests.end()); + it->second.keepDocumentOnDestruct(); + } + + migrationStatuses.emplace(migrateInfo.getName(), std::move(responseStatus)); + } } } // Schedule all 3.2 compatibility migrations sequentially for (const auto& migrateInfo : rescheduledMigrations) { + // Write a document to the config.migrations collection, in case this migration must be + // recovered by the Balancer. Fail if the chunk is already moving. 
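Both scheduling paths above follow the same write-then-schedule discipline: persist a config.migrations document first, then schedule the migration, and let a scoped guard delete the document unless the balancer was interrupted mid-flight, in which case the document must outlive the process so the next primary can recover the migration. A minimal stand-alone model of that guard, where ScopedMigrationDoc is an invented name rather than the real ScopedMigrationRequest:

    #include <iostream>
    #include <string>

    class ScopedMigrationDoc {
    public:
        explicit ScopedMigrationDoc(std::string name) : _name(std::move(name)) {
            std::cout << "insert config.migrations doc: " << _name << '\n';
        }
        ~ScopedMigrationDoc() {
            if (_removeOnDestruct)
                std::cout << "remove config.migrations doc: " << _name << '\n';
        }
        // Mirrors keepDocumentOnDestruct(): on BalancerInterrupted the document
        // must survive so a new primary's recovery pass can resume the migration.
        void keepDocumentOnDestruct() { _removeOnDestruct = false; }

    private:
        std::string _name;
        bool _removeOnDestruct = true;
    };

    int main() {
        ScopedMigrationDoc doc("TestDB.TestColl-a_10");
        bool balancerInterrupted = true;  // e.g. a stepdown while the migration ran
        if (balancerInterrupted)
            doc.keepDocumentOnDestruct();  // leave the document for recovery
    }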
+ auto statusWithScopedMigrationRequest = + ScopedMigrationRequest::writeMigration(txn, migrateInfo); + if (!statusWithScopedMigrationRequest.isOK()) { + migrationStatuses.emplace(migrateInfo.getName(), + std::move(statusWithScopedMigrationRequest.getStatus())); + continue; + } + Status responseStatus = _schedule(txn, migrateInfo, true, // Shard takes the collection dist lock @@ -183,6 +222,10 @@ MigrationStatuses MigrationManager::executeMigrationsForAutoBalance( waitForDelete) ->get(); + if (responseStatus == ErrorCodes::BalancerInterrupted) { + statusWithScopedMigrationRequest.getValue().keepDocumentOnDestruct(); + } + migrationStatuses.emplace(migrateInfo.getName(), std::move(responseStatus)); } @@ -197,6 +240,16 @@ Status MigrationManager::executeManualMigration( uint64_t maxChunkSizeBytes, const MigrationSecondaryThrottleOptions& secondaryThrottle, bool waitForDelete) { + _waitForRecovery(); + + // Write a document to the config.migrations collection, in case this migration must be + // recovered by the Balancer. Fail if the chunk is already moving. + auto statusWithScopedMigrationRequest = + ScopedMigrationRequest::writeMigration(txn, migrateInfo); + if (!statusWithScopedMigrationRequest.isOK()) { + return statusWithScopedMigrationRequest.getStatus(); + } + Status status = _schedule(txn, migrateInfo, false, // Config server takes the collection dist lock @@ -223,13 +276,150 @@ Status MigrationManager::executeManualMigration( return Status::OK(); } + if (status == ErrorCodes::BalancerInterrupted) { + statusWithScopedMigrationRequest.getValue().keepDocumentOnDestruct(); + } + return status; } -void MigrationManager::enableMigrations() { +void MigrationManager::startRecovery() { stdx::lock_guard<stdx::mutex> lock(_mutex); - invariant(_state == kStopped); - _state = kEnabled; + invariant(_state == State::kStopped); + _state = State::kRecovering; +} + +void MigrationManager::finishRecovery(OperationContext* txn, + const OID& clusterIdentity, + uint64_t maxChunkSizeBytes, + const MigrationSecondaryThrottleOptions& secondaryThrottle, + bool waitForDelete) { + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + invariant(_state == State::kRecovering); + if (!_clusterIdentity.isSet()) { + _clusterIdentity = clusterIdentity; + } + invariant(_clusterIdentity == clusterIdentity); + } + + // Load the active migrations from the config.migrations collection. + vector<MigrateInfo> migrateInfos; + + auto statusWithMigrationsQueryResponse = + Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( + txn, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kMajorityReadConcern, + NamespaceString(MigrationType::ConfigNS), + BSONObj(), + BSONObj(), + boost::none); + + if (!statusWithMigrationsQueryResponse.isOK()) { + warning() << "Unable to read config.migrations collection documents for balancer migration" + << " recovery. Abandoning recovery." + << causedBy(redact(statusWithMigrationsQueryResponse.getStatus())); + + _abandonActiveMigrationsAndEnableManager(txn); + return; + } + + for (const BSONObj& migration : statusWithMigrationsQueryResponse.getValue().docs) { + auto statusWithMigrationType = MigrationType::fromBSON(migration); + if (!statusWithMigrationType.isOK()) { + // The format of this migration document is incorrect. The balancer holds a distlock for + // this migration, but without parsing the migration document we cannot identify which + // distlock must be released. So we must release all distlocks. 
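The _waitForRecovery call at the top of executeManualMigration is a standard wait-until-state-changes idiom on the manager's single condition variable: manual moveChunk requests block while recovery is in progress and are released by finishRecovery (or by the abandon path). The same idiom in isolation; RecoveryGate and its members are illustrative, not the mongo implementation:

    #include <condition_variable>
    #include <mutex>

    class RecoveryGate {
    public:
        // Block the caller until the manager has left the recovering state.
        void waitForRecovery() {
            std::unique_lock<std::mutex> lock(_mutex);
            _condVar.wait(lock, [this] { return !_recovering; });
        }

        // Called when recovery completes (or is abandoned): flips the state
        // under the mutex and wakes every blocked caller.
        void finishRecovery() {
            {
                std::lock_guard<std::mutex> lock(_mutex);
                _recovering = false;
            }
            _condVar.notify_all();
        }

    private:
        std::mutex _mutex;
        std::condition_variable _condVar;
        bool _recovering = true;
    };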
+ warning() << "Unable to parse config.migrations collection documents for balancer" + << " migration recovery. Abandoning recovery." + << causedBy(redact(statusWithMigrationType.getStatus())); + + _abandonActiveMigrationsAndEnableManager(txn); + return; + } + MigrateInfo migrateInfo = statusWithMigrationType.getValue().toMigrateInfo(); + + auto it = _migrationRecoveryMap.find(NamespaceString(migrateInfo.ns)); + if (it == _migrationRecoveryMap.end()) { + std::list<MigrateInfo> list; + it = _migrationRecoveryMap.insert(std::make_pair(NamespaceString(migrateInfo.ns), list)) + .first; + } + + it->second.push_back(std::move(migrateInfo)); + } + + // Schedule recovered migrations. + vector<ScopedMigrationRequest> scopedMigrationRequests; + vector<shared_ptr<Notification<Status>>> responses; + + for (auto& nssAndMigrateInfos : _migrationRecoveryMap) { + auto scopedCMStatus = ScopedChunkManager::getExisting(txn, nssAndMigrateInfos.first); + if (!scopedCMStatus.isOK()) { + // This shouldn't happen because the collection was intact and sharded when the previous + // config primary was active and the dist locks have been held by the balancer + // throughout. Abort migration recovery. + warning() << "Unable to reload chunk metadata for collection '" + << nssAndMigrateInfos.first << "' during balancer" + << " recovery. Abandoning recovery." + << causedBy(redact(scopedCMStatus.getStatus())); + + _abandonActiveMigrationsAndEnableManager(txn); + return; + } + + auto scopedCM = std::move(scopedCMStatus.getValue()); + ChunkManager* const cm = scopedCM.cm(); + cm->reload(txn); + + auto itMigrateInfo = nssAndMigrateInfos.second.begin(); + invariant(itMigrateInfo != nssAndMigrateInfos.second.end()); + while (itMigrateInfo != nssAndMigrateInfos.second.end()) { + auto chunk = cm->findIntersectingChunkWithSimpleCollation(txn, itMigrateInfo->minKey); + invariant(chunk); + + if (chunk->getShardId() != itMigrateInfo->from) { + // Chunk is no longer on the source shard of the migration. Erase the migration doc + // and drop the distlock if it is only held for this migration. + ScopedMigrationRequest::createForRecovery( + txn, NamespaceString(itMigrateInfo->ns), itMigrateInfo->minKey); + if (nssAndMigrateInfos.second.size() == 1) { + Grid::get(txn)->catalogClient(txn)->getDistLockManager()->unlock( + txn, _clusterIdentity, itMigrateInfo->ns); + } + itMigrateInfo = nssAndMigrateInfos.second.erase(itMigrateInfo); + } else { + scopedMigrationRequests.emplace_back(ScopedMigrationRequest::createForRecovery( + txn, NamespaceString(itMigrateInfo->ns), itMigrateInfo->minKey)); + + responses.emplace_back( + _schedule(txn, + *itMigrateInfo, + false, // Config server takes the collection dist lock + maxChunkSizeBytes, + secondaryThrottle, + waitForDelete)); + + ++itMigrateInfo; + } + } + } + + // The MigrationManager has now reacquired the state that it was in prior to any stepdown. + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + invariant(_state == State::kRecovering); + + _migrationRecoveryMap.clear(); + _state = State::kEnabled; + _condVar.notify_all(); + } + + // Wait for each migration to finish, as usual. 
+ for (auto& response : responses) { + response->get(); + } } void MigrationManager::interruptAndDisableMigrations() { @@ -237,11 +427,11 @@ void MigrationManager::interruptAndDisableMigrations() { Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor(); stdx::lock_guard<stdx::mutex> lock(_mutex); - if (_state != kEnabled) { + if (_state != State::kEnabled && _state != State::kRecovering) { return; } - _state = kStopping; + _state = State::kStopping; // Interrupt any active migrations with dist lock for (auto& cmsEntry : _activeMigrationsWithDistLock) { @@ -267,15 +457,15 @@ void MigrationManager::interruptAndDisableMigrations() { void MigrationManager::drainActiveMigrations() { stdx::unique_lock<stdx::mutex> lock(_mutex); - if (_state == kStopped) + if (_state == State::kStopped) return; - invariant(_state == kStopping); + invariant(_state == State::kStopping); - _stoppedCondVar.wait(lock, [this] { + _condVar.wait(lock, [this] { return _activeMigrationsWithDistLock.empty() && _activeMigrationsWithoutDistLock.empty(); }); - _state = kStopped; + _state = State::kStopped; } shared_ptr<Notification<Status>> MigrationManager::_schedule( @@ -290,9 +480,9 @@ shared_ptr<Notification<Status>> MigrationManager::_schedule( // Ensure we are not stopped in order to avoid doing the extra work { stdx::lock_guard<stdx::mutex> lock(_mutex); - if (_state != kEnabled) { + if (_state != State::kEnabled && _state != State::kRecovering) { return std::make_shared<Notification<Status>>( - Status(ErrorCodes::InterruptedDueToReplStateChange, + Status(ErrorCodes::BalancerInterrupted, "Migration cannot be executed because the balancer is not running")); } } @@ -349,9 +539,9 @@ shared_ptr<Notification<Status>> MigrationManager::_schedule( stdx::lock_guard<stdx::mutex> lock(_mutex); - if (_state != kEnabled) { + if (_state != State::kEnabled && _state != State::kRecovering) { return std::make_shared<Notification<Status>>( - Status(ErrorCodes::InterruptedDueToReplStateChange, + Status(ErrorCodes::BalancerInterrupted, "Migration cannot be executed because the balancer is not running")); } @@ -378,7 +568,7 @@ void MigrationManager::_scheduleWithDistLock_inlock(OperationContext* txn, auto it = _activeMigrationsWithDistLock.find(nss); if (it == _activeMigrationsWithDistLock.end()) { // Acquire the collection distributed lock (blocking call) - auto distLockHandleStatus = acquireDistLock(txn, nss); + auto distLockHandleStatus = acquireDistLock(txn, _clusterIdentity, nss); if (!distLockHandleStatus.isOK()) { migration.completionNotification->set(distLockHandleStatus.getStatus()); return; @@ -409,10 +599,11 @@ void MigrationManager::_scheduleWithDistLock_inlock(OperationContext* txn, auto txn = cc().makeOperationContext(); stdx::lock_guard<stdx::mutex> lock(_mutex); - _completeWithDistLock_inlock(txn.get(), - itMigration, - extractMigrationStatusFromRemoteCommandResponse( - args.response, _state != kEnabled)); + _completeWithDistLock_inlock( + txn.get(), + itMigration, + extractMigrationStatusFromRemoteCommandResponse( + args.response, _state != State::kEnabled && _state != State::kRecovering)); }); if (callbackHandleWithStatus.isOK()) { @@ -441,7 +632,7 @@ void MigrationManager::_completeWithDistLock_inlock(OperationContext* txn, if (collectionMigrationState->migrations.empty()) { Grid::get(txn)->catalogClient(txn)->getDistLockManager()->unlock( - txn, collectionMigrationState->distLockHandle); + txn, collectionMigrationState->distLockHandle, nss.ns()); _activeMigrationsWithDistLock.erase(it); 
_checkDrained_inlock(); } @@ -472,7 +663,7 @@ void MigrationManager::_scheduleWithoutDistLock_inlock(OperationContext* txn, _checkDrained_inlock(); notificationToSignal->set(extractMigrationStatusFromRemoteCommandResponse( - args.response, _state != kEnabled)); + args.response, _state != State::kEnabled && _state != State::kRecovering)); }); if (callbackHandleWithStatus.isOK()) { @@ -489,16 +680,40 @@ void MigrationManager::_scheduleWithoutDistLock_inlock(OperationContext* txn, } void MigrationManager::_checkDrained_inlock() { - if (_state == kEnabled) { + if (_state == State::kEnabled || _state == State::kRecovering) { return; } - invariant(_state == kStopping); + invariant(_state == State::kStopping); if (_activeMigrationsWithDistLock.empty() && _activeMigrationsWithoutDistLock.empty()) { - _stoppedCondVar.notify_all(); + _condVar.notify_all(); } } +void MigrationManager::_waitForRecovery() { + stdx::unique_lock<stdx::mutex> lock(_mutex); + _condVar.wait(lock, [this] { return _state != State::kRecovering; }); +} + +void MigrationManager::_abandonActiveMigrationsAndEnableManager(OperationContext* txn) { + stdx::unique_lock<stdx::mutex> lock(_mutex); + invariant(_state == State::kRecovering); + + auto catalogClient = Grid::get(txn)->catalogClient(txn); + + // Unlock all balancer distlocks we aren't using anymore. + auto distLockManager = catalogClient->getDistLockManager(); + distLockManager->unlockAll(txn, distLockManager->getProcessID()); + + // Clear the config.migrations collection so that those chunks can be scheduled for migration + // again. + catalogClient->removeConfigDocuments( + txn, MigrationType::ConfigNS, BSONObj(), kMajorityWriteConcern); + + _state = State::kEnabled; + _condVar.notify_all(); +} + MigrationManager::Migration::Migration(NamespaceString inNss, BSONObj inMoveChunkCmdObj) : nss(std::move(inNss)), moveChunkCmdObj(std::move(inMoveChunkCmdObj)), diff --git a/src/mongo/s/balancer/migration_manager.h b/src/mongo/s/balancer/migration_manager.h index 39fe4e7f0ee..456452b1efd 100644 --- a/src/mongo/s/balancer/migration_manager.h +++ b/src/mongo/s/balancer/migration_manager.h @@ -99,20 +99,33 @@ public: bool waitForDelete); /** - * Non-blocking method, which puts the migration manager in a state where new migrations can be - * scheduled (kEnabled). May only be called if the manager is in the kStopped state. + * Non-blocking method that puts the migration manager in the kRecovering state, in which + * new migration requests will block until finishRecovery is called. */ - void enableMigrations(); + void startRecovery(); /** - * Non-blocking method, which puts the manager in a state where all subsequently scheduled - * migrations will immediately fail (without ever getting scheduled) and all active ones will be - * cancelled. It has no effect if the migration manager is not enabled. + * Blocking method that must only be called after startRecovery has been called. Recovers the + * state of the migration manager (if necessary and able) and puts it in the kEnabled state, + * where it will accept new migrations. Any migrations waiting on the recovery state will be + * unblocked. + */ + void finishRecovery(OperationContext* txn, + const OID& clusterIdentity, + uint64_t maxChunkSizeBytes, + const MigrationSecondaryThrottleOptions& secondaryThrottle, + bool waitForDelete); + + /** + * Non-blocking method that should never be called concurrently with finishRecovery. 
Puts the + * manager in a state where all subsequently scheduled migrations will immediately fail (without + * ever getting scheduled) and all active ones will be cancelled. It has no effect if the + * migration manager is already stopping or stopped. */ void interruptAndDisableMigrations(); /** - * Blocking method, which waits for any currently scheduled migrations to complete. Must be + * Blocking method that waits for any currently scheduled migrations to complete. Must be * called after interruptAndDisableMigrations has been called in order to be able to re-enable * migrations again. */ @@ -120,7 +133,12 @@ public: private: // The current state of the migration manager - enum State { kEnabled, kStopping, kStopped }; + enum class State { // Allowed transitions: + kRecovering, // kEnabled + kEnabled, // kStopping + kStopping, // kStopped + kStopped, // kRecovering + }; /** * Tracks the execution state of a single migration. @@ -217,22 +235,41 @@ private: Migration migration); /** - * If the state of the migration manager is kStopping checks whether there are any outstanding - * scheduled requests and if there aren't any signals the 'stopped' conditional variable. + * If the state of the migration manager is kStopping, checks whether there are any outstanding + * scheduled requests and if there aren't any signals the class condition variable. */ void _checkDrained_inlock(); - // The service context under which this migration manager runs + /** + * Blocking call, which waits for the migration manager to leave the recovering state (if it is + * currently recovering). + */ + void _waitForRecovery(); + + /** + * Should only be called from within the finishRecovery function because the migration manager + * must be in the kRecovering state. Releases all the distributed locks that the balancer holds, + * clears the config.migrations collection, changes the state of the migration manager from + * kRecovering to kEnabled, and unblocks all processes waiting on the recovery state. + */ + void _abandonActiveMigrationsAndEnableManager(OperationContext* txn); + + // The service context under which this migration manager runs. ServiceContext* const _serviceContext; - // Protects the class state below + // Protects the class state below. stdx::mutex _mutex; - // Start the migration manager as stopped - State _state{kStopped}; + // Always start the migration manager in a stopped state. + State _state{State::kStopped}; + + // Condition variable, which is waited on when the migration manager's state is changing and + // signaled when the state change is complete. + stdx::condition_variable _condVar; - // Condition variable, which is signaled when the migration manager has no more active requests - stdx::condition_variable _stoppedCondVar; + // Identity of the cluster under which this migration manager runs. Used as a constant session + // ID for all distributed locks that the MigrationManager holds. + OID _clusterIdentity; // Holds information about each collection's distributed lock and active migrations via a // CollectionMigrationState object. @@ -241,6 +278,9 @@ private: // Holds information about migrations, which have been scheduled without the collection // distributed lock acquired (i.e., the shard is asked to acquire it). MigrationsList _activeMigrationsWithoutDistLock; + + // Carries migration information over from startRecovery to finishRecovery. 
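The transition comments on the State enum compress the lifecycle into one cycle: kStopped to kRecovering (startRecovery), kRecovering to kEnabled (finishRecovery), kEnabled to kStopping (interruptAndDisableMigrations), and kStopping to kStopped (drainActiveMigrations); the code additionally lets an interrupt fire while still kRecovering. One way to encode those transitions so they are checkable at compile time (a sketch, not MongoDB code):

    enum class State { kRecovering, kEnabled, kStopping, kStopped };

    // True when 'to' is a transition the migration manager performs from 'from'.
    constexpr bool isAllowedTransition(State from, State to) {
        switch (from) {
            case State::kStopped:
                return to == State::kRecovering;  // startRecovery()
            case State::kRecovering:
                // finishRecovery(), or an interrupt racing with recovery.
                return to == State::kEnabled || to == State::kStopping;
            case State::kEnabled:
                return to == State::kStopping;  // interruptAndDisableMigrations()
            case State::kStopping:
                return to == State::kStopped;  // drainActiveMigrations()
        }
        return false;
    }

    static_assert(isAllowedTransition(State::kStopped, State::kRecovering),
                  "startRecovery is only legal from the stopped state");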
+ stdx::unordered_map<NamespaceString, std::list<MigrateInfo>> _migrationRecoveryMap; }; } // namespace mongo diff --git a/src/mongo/s/balancer/migration_manager_test.cpp b/src/mongo/s/balancer/migration_manager_test.cpp index 696f208a95d..2e90d29f2e9 100644 --- a/src/mongo/s/balancer/migration_manager_test.cpp +++ b/src/mongo/s/balancer/migration_manager_test.cpp @@ -33,10 +33,12 @@ #include "mongo/db/commands.h" #include "mongo/db/write_concern_options.h" #include "mongo/s/balancer/migration_manager.h" +#include "mongo/s/balancer/type_migration.h" #include "mongo/s/catalog/dist_lock_manager_mock.h" #include "mongo/s/catalog/replset/sharding_catalog_client_impl.h" #include "mongo/s/catalog/type_collection.h" #include "mongo/s/catalog/type_database.h" +#include "mongo/s/catalog/type_locks.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/s/config_server_test_fixture.h" #include "mongo/s/move_chunk_request.h" @@ -48,6 +50,7 @@ namespace { using executor::RemoteCommandRequest; using executor::RemoteCommandResponse; using std::vector; +using unittest::assertGet; const auto kShardId0 = ShardId("shard0"); const auto kShardId1 = ShardId("shard1"); @@ -108,6 +111,21 @@ protected: const ChunkVersion& version); /** + * Inserts a document into the config.migrations collection as an active migration. + */ + void setUpMigration(const std::string& collName, + const BSONObj& minKey, + const BSONObj& maxKey, + const ShardId& toShard, + const ShardId& fromShard); + + /** + * Asserts that config.migrations is empty and config.locks contains no locked documents, both + * of which should be true if the MigrationManager is inactive and behaving properly. + */ + void checkMigrationsCollectionIsEmptyAndLocksAreUnlocked(); + + /** * Sets up mock network to expect a moveChunk command and return a fixed BSON response or a * "returnStatus". 
*/ @@ -137,6 +155,9 @@ protected: const KeyPattern kKeyPattern = KeyPattern(BSON(kPattern << 1)); + // Cluster identity to pass to the migration manager + const OID _clusterIdentity{OID::gen()}; + std::unique_ptr<MigrationManager> _migrationManager; private: @@ -147,10 +168,13 @@ private: void MigrationManagerTest::setUp() { ConfigServerTestFixture::setUp(); _migrationManager = stdx::make_unique<MigrationManager>(getServiceContext()); - _migrationManager->enableMigrations(); + _migrationManager->startRecovery(); + _migrationManager->finishRecovery( + operationContext(), _clusterIdentity, 0, kDefaultSecondaryThrottle, false); } void MigrationManagerTest::tearDown() { + checkMigrationsCollectionIsEmptyAndLocksAreUnlocked(); _migrationManager->interruptAndDisableMigrations(); _migrationManager->drainActiveMigrations(); _migrationManager.reset(); @@ -199,6 +223,50 @@ ChunkType MigrationManagerTest::setUpChunk(const std::string& collName, return chunk; } +void MigrationManagerTest::setUpMigration(const std::string& collName, + const BSONObj& minKey, + const BSONObj& maxKey, + const ShardId& toShard, + const ShardId& fromShard) { + BSONObjBuilder builder; + builder.append(MigrationType::ns(), collName); + builder.append(MigrationType::min(), minKey); + builder.append(MigrationType::max(), maxKey); + builder.append(MigrationType::toShard(), toShard.toString()); + builder.append(MigrationType::fromShard(), fromShard.toString()); + MigrationType migrationType = assertGet(MigrationType::fromBSON(builder.obj())); + ASSERT_OK(catalogClient()->insertConfigDocument(operationContext(), + MigrationType::ConfigNS, + migrationType.toBSON(), + kMajorityWriteConcern)); +} + +void MigrationManagerTest::checkMigrationsCollectionIsEmptyAndLocksAreUnlocked() { + auto statusWithMigrationsQueryResponse = + shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( + operationContext(), + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kMajorityReadConcern, + NamespaceString(MigrationType::ConfigNS), + BSONObj(), + BSONObj(), + boost::none); + Shard::QueryResponse migrationsQueryResponse = + uassertStatusOK(statusWithMigrationsQueryResponse); + ASSERT_EQUALS(0U, migrationsQueryResponse.docs.size()); + + auto statusWithLocksQueryResponse = shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( + operationContext(), + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kMajorityReadConcern, + NamespaceString(LocksType::ConfigNS), + BSON(LocksType::state(LocksType::LOCKED)), + BSONObj(), + boost::none); + Shard::QueryResponse locksQueryResponse = uassertStatusOK(statusWithLocksQueryResponse); + ASSERT_EQUALS(0U, locksQueryResponse.docs.size()); +} + void MigrationManagerTest::expectMoveChunkCommand(const ChunkType& chunk, const ShardId& toShardId, const bool& takeDistLock, @@ -559,7 +627,7 @@ TEST_F(MigrationManagerTest, FailToAcquireDistributedLock) { // Take the distributed lock for the collection before scheduling via the MigrationManager. 
const std::string whyMessage("FailToAcquireDistributedLock unit-test taking distributed lock"); DistLockManager::ScopedDistLock distLockStatus = - unittest::assertGet(catalogClient()->getDistLockManager()->lock( + assertGet(catalogClient()->getDistLockManager()->lock( operationContext(), chunk1.getNS(), whyMessage)); MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance( @@ -679,7 +747,7 @@ TEST_F(MigrationManagerTest, InterruptMigration) { setUpCollection(collName, version); // Set up a single chunk in the metadata. - ChunkType chunk1 = + ChunkType chunk = setUpChunk(collName, kKeyPattern.globalMin(), kKeyPattern.globalMax(), kShardId0, version); auto future = launchAsync([&] { @@ -691,7 +759,7 @@ TEST_F(MigrationManagerTest, InterruptMigration) { shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0); ASSERT_NOT_OK(_migrationManager->executeManualMigration( - txn.get(), {chunk1.getNS(), kShardId1, chunk1}, 0, kDefaultSecondaryThrottle, false)); + txn.get(), {chunk.getNS(), kShardId1, chunk}, 0, kDefaultSecondaryThrottle, false)); }); // Wait till the move chunk request gets sent and pretend that it is stuck by never responding @@ -714,13 +782,34 @@ TEST_F(MigrationManagerTest, InterruptMigration) { // Ensure that no new migrations can be scheduled ASSERT_NOT_OK(_migrationManager->executeManualMigration(operationContext(), - {chunk1.getNS(), kShardId1, chunk1}, + {chunk.getNS(), kShardId1, chunk}, 0, kDefaultSecondaryThrottle, false)); - // Ensure there are no active migrations left + // Ensure that the migration manager is no longer handling any migrations. _migrationManager->drainActiveMigrations(); + + // Check that the migration that was active when the migration manager was interrupted can be + // found in config.migrations (and thus would be recovered if a migration manager were to start + // up again). + auto statusWithMigrationsQueryResponse = + shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( + operationContext(), + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kMajorityReadConcern, + NamespaceString(MigrationType::ConfigNS), + BSON(MigrationType::name(chunk.getName())), + BSONObj(), + boost::none); + Shard::QueryResponse migrationsQueryResponse = + uassertStatusOK(statusWithMigrationsQueryResponse); + ASSERT_EQUALS(1U, migrationsQueryResponse.docs.size()); + + ASSERT_OK(catalogClient()->removeConfigDocuments(operationContext(), + MigrationType::ConfigNS, + BSON(MigrationType::name(chunk.getName())), + kMajorityWriteConcern)); } TEST_F(MigrationManagerTest, RestartMigrationManager) { @@ -743,7 +832,9 @@ TEST_F(MigrationManagerTest, RestartMigrationManager) { // Go through the lifecycle of the migration manager _migrationManager->interruptAndDisableMigrations(); _migrationManager->drainActiveMigrations(); - _migrationManager->enableMigrations(); + _migrationManager->startRecovery(); + _migrationManager->finishRecovery( + operationContext(), _clusterIdentity, 0, kDefaultSecondaryThrottle, false); auto future = launchAsync([&] { Client::initThreadIfNotAlready("Test"); @@ -764,5 +855,126 @@ TEST_F(MigrationManagerTest, RestartMigrationManager) { future.timed_get(kFutureTimeout); } +TEST_F(MigrationManagerTest, MigrationRecovery) { + // Set up two shards in the metadata. 
+ ASSERT_OK(catalogClient()->insertConfigDocument( + operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern)); + ASSERT_OK(catalogClient()->insertConfigDocument( + operationContext(), ShardType::ConfigNS, kShard2, kMajorityWriteConcern)); + + // Set up the database and collection as sharded in the metadata. + std::string dbName = "foo"; + std::string collName = "foo.bar"; + ChunkVersion version(2, 0, OID::gen()); + + setUpDatabase(dbName, kShardId0); + setUpCollection(collName, version); + + // Set up two chunks in the metadata. + ChunkType chunk1 = + setUpChunk(collName, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version); + version.incMinor(); + ChunkType chunk2 = + setUpChunk(collName, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version); + + _migrationManager->interruptAndDisableMigrations(); + _migrationManager->drainActiveMigrations(); + _migrationManager->startRecovery(); + + // Set up two fake active migrations by writing documents to the config.migrations collection. + setUpMigration(collName, + chunk1.getMin(), + chunk1.getMax(), + kShardId1.toString(), + chunk1.getShard().toString()); + setUpMigration(collName, + chunk2.getMin(), + chunk2.getMax(), + kShardId3.toString(), + chunk2.getShard().toString()); + + auto future = launchAsync([this] { + Client::initThreadIfNotAlready("Test"); + auto txn = cc().makeOperationContext(); + + // Scheduling the moveChunk commands requires finding hosts to which to send the commands. + // Set up dummy hosts for the source shards. + shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0); + shardTargeterMock(txn.get(), kShardId2)->setFindHostReturnValue(kShardHost2); + + _migrationManager->finishRecovery( + txn.get(), _clusterIdentity, 0, kDefaultSecondaryThrottle, false); + }); + + // Expect two moveChunk commands. + expectMoveChunkCommand(chunk1, kShardId1, false, Status::OK()); + expectMoveChunkCommand(chunk2, kShardId3, false, Status::OK()); + + // Run the MigrationManager code. + future.timed_get(kFutureTimeout); +} + +TEST_F(MigrationManagerTest, FailMigrationRecovery) { + // Set up two shards in the metadata. + ASSERT_OK(catalogClient()->insertConfigDocument( + operationContext(), ShardType::ConfigNS, kShard0, kMajorityWriteConcern)); + ASSERT_OK(catalogClient()->insertConfigDocument( + operationContext(), ShardType::ConfigNS, kShard2, kMajorityWriteConcern)); + + // Set up the database and collection as sharded in the metadata. + std::string dbName = "foo"; + std::string collName = "foo.bar"; + ChunkVersion version(2, 0, OID::gen()); + + setUpDatabase(dbName, kShardId0); + setUpCollection(collName, version); + + // Set up two chunks in the metadata. + ChunkType chunk1 = + setUpChunk(collName, kKeyPattern.globalMin(), BSON(kPattern << 49), kShardId0, version); + version.incMinor(); + ChunkType chunk2 = + setUpChunk(collName, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version); + + // Set up a parsable fake active migration document in the config.migrations collection. + setUpMigration(collName, + chunk1.getMin(), + chunk1.getMax(), + kShardId1.toString(), + chunk1.getShard().toString()); + + _migrationManager->interruptAndDisableMigrations(); + _migrationManager->drainActiveMigrations(); + _migrationManager->startRecovery(); + + // Set up a fake active migration document that will fail MigrationType parsing -- missing + // field. + BSONObjBuilder builder; + builder.append("_id", "testing"); + // No MigrationType::ns() field! 
+ builder.append(MigrationType::min(), chunk2.getMin()); + builder.append(MigrationType::max(), chunk2.getMax()); + builder.append(MigrationType::toShard(), kShardId3.toString()); + builder.append(MigrationType::fromShard(), chunk2.getShard().toString()); + ASSERT_OK(catalogClient()->insertConfigDocument( + operationContext(), MigrationType::ConfigNS, builder.obj(), kMajorityWriteConcern)); + + // Take the distributed lock for the collection, which should be released during recovery when + // it fails. Any dist lock held by the config server will be released via proccessId, so the + // session ID used here doesn't matter. + ASSERT_OK(catalogClient()->getDistLockManager()->lockWithSessionID( + operationContext(), + collName, + "MigrationManagerTest", + OID::gen(), + DistLockManager::kSingleLockAttemptTimeout)); + + _migrationManager->finishRecovery( + operationContext(), _clusterIdentity, 0, kDefaultSecondaryThrottle, false); + + // MigrationManagerTest::tearDown checks that the config.migrations collection is empty and all + // distributed locks are unlocked. +} + } // namespace } // namespace mongo diff --git a/src/mongo/s/balancer/scoped_migration_request.cpp b/src/mongo/s/balancer/scoped_migration_request.cpp index 538537079f8..29beacdbd7c 100644 --- a/src/mongo/s/balancer/scoped_migration_request.cpp +++ b/src/mongo/s/balancer/scoped_migration_request.cpp @@ -36,6 +36,7 @@ #include "mongo/db/write_concern_options.h" #include "mongo/s/balancer/type_migration.h" #include "mongo/s/catalog/sharding_catalog_client.h" +#include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" #include "mongo/util/log.h" @@ -90,18 +91,44 @@ ScopedMigrationRequest& ScopedMigrationRequest::operator=(ScopedMigrationRequest } StatusWith<ScopedMigrationRequest> ScopedMigrationRequest::writeMigration( - OperationContext* txn, - const MigrateInfo& migrateInfo, - const ChunkVersion& chunkVersion, - const ChunkVersion& collectionVersion) { + OperationContext* txn, const MigrateInfo& migrateInfo) { // Try to write a unique migration document to config.migrations. - MigrationType migrationType(migrateInfo, chunkVersion, collectionVersion); + MigrationType migrationType(migrateInfo); Status result = grid.catalogClient(txn)->insertConfigDocument( txn, MigrationType::ConfigNS, migrationType.toBSON(), kMajorityWriteConcern); if (result == ErrorCodes::DuplicateKey) { - return result; + // If the exact migration described by "migrateInfo" is active, return a scoped object for + // the request because this migration request will join the active one once scheduled. + auto statusWithMigrationQueryResult = + grid.shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( + txn, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kMajorityReadConcern, + NamespaceString(MigrationType::ConfigNS), + migrationType.toBSON(), + BSONObj(), + 1); + + if (!statusWithMigrationQueryResult.isOK()) { + return {statusWithMigrationQueryResult.getStatus().code(), + str::stream() << "Failed to verify whether conflicting migration is in " + << "progress for migration '" + << migrateInfo.toString() + << "' while trying to persist migration to config.migrations." 
+ << causedBy(redact(statusWithMigrationQueryResult.getStatus()))}; + } + + if (statusWithMigrationQueryResult.getValue().docs.size() != 1) { + invariant(statusWithMigrationQueryResult.getValue().docs.size() == 0); + log() << "Failed to write document '" << migrateInfo + << "' to config.migrations because there is already an active migration for " + << "this chunk" << causedBy(redact(result)); + return result; + } + + result = Status::OK(); } // As long as there isn't a DuplicateKey error, the document may have been written, and it's diff --git a/src/mongo/s/balancer/scoped_migration_request.h b/src/mongo/s/balancer/scoped_migration_request.h index 68f46cfdb50..8595671dc4d 100644 --- a/src/mongo/s/balancer/scoped_migration_request.h +++ b/src/mongo/s/balancer/scoped_migration_request.h @@ -30,7 +30,6 @@ #include "mongo/base/status_with.h" #include "mongo/s/balancer/balancer_policy.h" -#include "mongo/s/chunk_version.h" #include "mongo/s/migration_secondary_throttle_options.h" namespace mongo { @@ -68,9 +67,7 @@ public: * The destructor will handle removing the document when it is no longer needed. */ static StatusWith<ScopedMigrationRequest> writeMigration(OperationContext* txn, - const MigrateInfo& migrate, - const ChunkVersion& chunkVersion, - const ChunkVersion& collectionVersion); + const MigrateInfo& migrate); /** * Creates a ScopedMigrationRequest object without inserting a document into config.migrations. diff --git a/src/mongo/s/balancer/scoped_migration_request_test.cpp b/src/mongo/s/balancer/scoped_migration_request_test.cpp index e7bc010c654..48c0d501136 100644 --- a/src/mongo/s/balancer/scoped_migration_request_test.cpp +++ b/src/mongo/s/balancer/scoped_migration_request_test.cpp @@ -45,6 +45,7 @@ const BSONObj kMin = BSON("a" << 10); const BSONObj kMax = BSON("a" << 20); const ShardId kFromShard("shard0000"); const ShardId kToShard("shard0001"); +const ShardId kDifferentToShard("shard0002"); const std::string kName = "TestDB.TestColl-a_10"; class ScopedMigrationRequestTest : public ConfigServerTestFixture { @@ -82,10 +83,7 @@ void ScopedMigrationRequestTest::checkMigrationsCollectionForDocument( ScopedMigrationRequest ScopedMigrationRequestTest::makeScopedMigrationRequest( const MigrateInfo& migrateInfo) { ScopedMigrationRequest scopedMigrationRequest = - assertGet(ScopedMigrationRequest::writeMigration(operationContext(), - migrateInfo, - ChunkVersion(1, 2, OID::gen()), - ChunkVersion(1, 2, OID::gen()))); + assertGet(ScopedMigrationRequest::writeMigration(operationContext(), migrateInfo)); checkMigrationsCollectionForDocument(migrateInfo.getName(), 1); @@ -114,10 +112,7 @@ TEST_F(ScopedMigrationRequestTest, CreateScopedMigrationRequest) { { ScopedMigrationRequest scopedMigrationRequest = - assertGet(ScopedMigrationRequest::writeMigration(operationContext(), - migrateInfo, - ChunkVersion(1, 2, OID::gen()), - ChunkVersion(1, 2, OID::gen()))); + assertGet(ScopedMigrationRequest::writeMigration(operationContext(), migrateInfo)); checkMigrationsCollectionForDocument(migrateInfo.getName(), 1); } @@ -138,10 +133,7 @@ TEST_F(ScopedMigrationRequestTest, CreateScopedMigrationRequestOnRecovery) { // Insert the document for the MigrationRequest and then prevent its removal in the destructor. 
{ ScopedMigrationRequest scopedMigrationRequest = - assertGet(ScopedMigrationRequest::writeMigration(operationContext(), - migrateInfo, - ChunkVersion(1, 2, OID::gen()), - ChunkVersion(1, 2, OID::gen()))); + assertGet(ScopedMigrationRequest::writeMigration(operationContext(), migrateInfo)); checkMigrationsCollectionForDocument(migrateInfo.getName(), 1); @@ -150,13 +142,14 @@ TEST_F(ScopedMigrationRequestTest, CreateScopedMigrationRequestOnRecovery) { checkMigrationsCollectionForDocument(migrateInfo.getName(), 1); - // Trying to write a migration that already exists should fail. + // Fail to write a migration document if a migration document already exists for that chunk but + // with a different destination shard. (the migration request must have identical parameters). { + MigrateInfo differentToShardMigrateInfo = migrateInfo; + differentToShardMigrateInfo.to = kDifferentToShard; + StatusWith<ScopedMigrationRequest> statusWithScopedMigrationRequest = - ScopedMigrationRequest::writeMigration(operationContext(), - migrateInfo, - ChunkVersion(1, 2, OID::gen()), - ChunkVersion(1, 2, OID::gen())); + ScopedMigrationRequest::writeMigration(operationContext(), differentToShardMigrateInfo); ASSERT_EQUALS(ErrorCodes::DuplicateKey, statusWithScopedMigrationRequest.getStatus()); @@ -175,6 +168,32 @@ TEST_F(ScopedMigrationRequestTest, CreateScopedMigrationRequestOnRecovery) { checkMigrationsCollectionForDocument(migrateInfo.getName(), 0); } +TEST_F(ScopedMigrationRequestTest, CreateMultipleScopedMigrationRequestsForIdenticalMigration) { + MigrateInfo migrateInfo = makeMigrateInfo(); + + { + // Create a ScopedMigrationRequest, which will do the config.migrations write. + ScopedMigrationRequest scopedMigrationRequest = + assertGet(ScopedMigrationRequest::writeMigration(operationContext(), migrateInfo)); + + checkMigrationsCollectionForDocument(migrateInfo.getName(), 1); + + { + // Should be able to create another Scoped object if the request is identical. + ScopedMigrationRequest identicalScopedMigrationRequest = + assertGet(ScopedMigrationRequest::writeMigration(operationContext(), migrateInfo)); + + checkMigrationsCollectionForDocument(migrateInfo.getName(), 1); + } + + // If any scoped object goes out of scope, the migration should be over and the document + // removed. 
+ checkMigrationsCollectionForDocument(migrateInfo.getName(), 0); + } + + checkMigrationsCollectionForDocument(migrateInfo.getName(), 0); +} + TEST_F(ScopedMigrationRequestTest, MoveAndAssignmentConstructors) { MigrateInfo migrateInfo = makeMigrateInfo(); diff --git a/src/mongo/s/balancer/type_migration.cpp b/src/mongo/s/balancer/type_migration.cpp index 0fb97ba7b51..963c40b30e3 100644 --- a/src/mongo/s/balancer/type_migration.cpp +++ b/src/mongo/s/balancer/type_migration.cpp @@ -43,19 +43,13 @@ const BSONField<BSONObj> MigrationType::min("min"); const BSONField<BSONObj> MigrationType::max("max"); const BSONField<std::string> MigrationType::fromShard("fromShard"); const BSONField<std::string> MigrationType::toShard("toShard"); -const BSONField<std::string> MigrationType::chunkVersionField("chunkVersion"); -const BSONField<std::string> MigrationType::collectionVersionField("collectionVersion"); MigrationType::MigrationType() = default; -MigrationType::MigrationType(MigrateInfo info, - const ChunkVersion& chunkVersion, - const ChunkVersion& collectionVersion) +MigrationType::MigrationType(MigrateInfo info) : _nss(NamespaceString(info.ns)), _min(info.minKey), _max(info.maxKey), - _chunkVersion(chunkVersion), - _collectionVersion(collectionVersion), _fromShard(info.from), _toShard(info.to) {} @@ -81,24 +75,6 @@ StatusWith<MigrationType> MigrationType::fromBSON(const BSONObj& source) { } { - auto chunkVersionStatus = - ChunkVersion::parseFromBSONWithFieldForCommands(source, chunkVersionField.name()); - if (!chunkVersionStatus.isOK()) { - return chunkVersionStatus.getStatus(); - } - migrationType._chunkVersion = std::move(chunkVersionStatus.getValue()); - } - - { - auto collectionVersionStatus = - ChunkVersion::parseFromBSONWithFieldForCommands(source, collectionVersionField.name()); - if (!collectionVersionStatus.isOK()) { - return collectionVersionStatus.getStatus(); - } - migrationType._collectionVersion = std::move(collectionVersionStatus.getValue()); - } - - { std::string migrationToShard; Status status = bsonExtractStringField(source, toShard.name(), &migrationToShard); if (!status.isOK()) @@ -127,10 +103,6 @@ BSONObj MigrationType::toBSON() const { builder.append(min.name(), _min.get()); if (_max) builder.append(max.name(), _max.get()); - if (_chunkVersion) - _chunkVersion->appendWithFieldForCommands(&builder, chunkVersionField.name()); - if (_collectionVersion) - _collectionVersion->appendWithFieldForCommands(&builder, collectionVersionField.name()); if (_fromShard) builder.append(fromShard.name(), _fromShard->toString()); if (_toShard) @@ -139,6 +111,10 @@ BSONObj MigrationType::toBSON() const { return builder.obj(); } +MigrateInfo MigrationType::toMigrateInfo() const { + return MigrateInfo(_nss->ns(), _toShard.get(), _fromShard.get(), _min.get(), _max.get()); +} + std::string MigrationType::getName() const { return ChunkType::genID(_nss->ns(), _min.get()); } diff --git a/src/mongo/s/balancer/type_migration.h b/src/mongo/s/balancer/type_migration.h index f1ee489d916..5f2948e9dfe 100644 --- a/src/mongo/s/balancer/type_migration.h +++ b/src/mongo/s/balancer/type_migration.h @@ -59,9 +59,7 @@ public: * The Balancer encapsulates migration information in MigrateInfo objects, so this facilitates * conversion to a config.migrations entry format. */ - MigrationType(MigrateInfo info, - const ChunkVersion& chunkVersion, - const ChunkVersion& collectionVersion); + explicit MigrationType(MigrateInfo info); /** * Constructs a new MigrationType object from BSON. 
Expects all fields to be present, and errors @@ -75,6 +73,11 @@ public: BSONObj toBSON() const; /** + * Helper function for the Balancer that uses MigrateInfo objects to schedule migrations. + */ + MigrateInfo toMigrateInfo() const; + + /** * Uniquely identifies a chunk by collection and min key. */ std::string getName() const; @@ -86,8 +89,6 @@ private: boost::optional<NamespaceString> _nss; boost::optional<BSONObj> _min; boost::optional<BSONObj> _max; - boost::optional<ChunkVersion> _chunkVersion; - boost::optional<ChunkVersion> _collectionVersion; boost::optional<ShardId> _fromShard; boost::optional<ShardId> _toShard; }; diff --git a/src/mongo/s/balancer/type_migration_test.cpp b/src/mongo/s/balancer/type_migration_test.cpp index ff6c31ca9ae..d3a352301aa 100644 --- a/src/mongo/s/balancer/type_migration_test.cpp +++ b/src/mongo/s/balancer/type_migration_test.cpp @@ -61,15 +61,13 @@ TEST(MigrationTypeTest, ConvertFromMigrationInfo) { ASSERT_OK(chunkType.validate()); MigrateInfo migrateInfo(kNs, kToShard, chunkType); - MigrationType migrationType(migrateInfo, version, version); + MigrationType migrationType(migrateInfo); BSONObjBuilder builder; builder.append(MigrationType::name(), kName); builder.append(MigrationType::ns(), kNs); builder.append(MigrationType::min(), kMin); builder.append(MigrationType::max(), kMax); - version.appendWithFieldForCommands(&builder, MigrationType::chunkVersionField.name()); - version.appendWithFieldForCommands(&builder, MigrationType::collectionVersionField.name()); builder.append(MigrationType::fromShard(), kFromShard.toString()); builder.append(MigrationType::toShard(), kToShard.toString()); @@ -79,15 +77,11 @@ TEST(MigrationTypeTest, ConvertFromMigrationInfo) { } TEST(MigrationTypeTest, FromAndToBSON) { - const ChunkVersion version(1, 2, OID::gen()); - BSONObjBuilder builder; builder.append(MigrationType::name(), kName); builder.append(MigrationType::ns(), kNs); builder.append(MigrationType::min(), kMin); builder.append(MigrationType::max(), kMax); - version.appendWithFieldForCommands(&builder, MigrationType::chunkVersionField.name()); - version.appendWithFieldForCommands(&builder, MigrationType::collectionVersionField.name()); builder.append(MigrationType::fromShard(), kFromShard.toString()); builder.append(MigrationType::toShard(), kToShard.toString()); @@ -98,13 +92,9 @@ TEST(MigrationTypeTest, FromAndToBSON) { } TEST(MigrationTypeTest, MissingRequiredNamespaceField) { - const ChunkVersion version(1, 2, OID::gen()); - BSONObjBuilder builder; builder.append(MigrationType::min(), kMin); builder.append(MigrationType::max(), kMax); - version.appendWithFieldForCommands(&builder, MigrationType::chunkVersionField.name()); - version.appendWithFieldForCommands(&builder, MigrationType::collectionVersionField.name()); builder.append(MigrationType::fromShard(), kFromShard.toString()); builder.append(MigrationType::toShard(), kToShard.toString()); @@ -116,13 +106,9 @@ TEST(MigrationTypeTest, MissingRequiredNamespaceField) { } TEST(MigrationTypeTest, MissingRequiredMinField) { - const ChunkVersion version(1, 2, OID::gen()); - BSONObjBuilder builder; builder.append(MigrationType::ns(), kNs); builder.append(MigrationType::max(), kMax); - version.appendWithFieldForCommands(&builder, MigrationType::chunkVersionField.name()); - version.appendWithFieldForCommands(&builder, MigrationType::collectionVersionField.name()); builder.append(MigrationType::fromShard(), kFromShard.toString()); builder.append(MigrationType::toShard(), kToShard.toString()); @@ -134,13 +120,9 @@ 
TEST(MigrationTypeTest, MissingRequiredMinField) { } TEST(MigrationTypeTest, MissingRequiredMaxField) { - const ChunkVersion version(1, 2, OID::gen()); - BSONObjBuilder builder; builder.append(MigrationType::ns(), kNs); builder.append(MigrationType::min(), kMin); - version.appendWithFieldForCommands(&builder, MigrationType::chunkVersionField.name()); - version.appendWithFieldForCommands(&builder, MigrationType::collectionVersionField.name()); builder.append(MigrationType::fromShard(), kFromShard.toString()); builder.append(MigrationType::toShard(), kToShard.toString()); @@ -151,53 +133,11 @@ TEST(MigrationTypeTest, MissingRequiredMaxField) { ASSERT_STRING_CONTAINS(migrationType.getStatus().reason(), MigrationType::max.name()); } -TEST(MigrationTypeTest, MissingRequiredChunkVersionField) { - const ChunkVersion version(1, 2, OID::gen()); - - BSONObjBuilder builder; - builder.append(MigrationType::ns(), kNs); - builder.append(MigrationType::min(), kMin); - builder.append(MigrationType::max(), kMax); - version.appendWithFieldForCommands(&builder, MigrationType::collectionVersionField.name()); - builder.append(MigrationType::fromShard(), kFromShard.toString()); - builder.append(MigrationType::toShard(), kToShard.toString()); - - BSONObj obj = builder.obj(); - - StatusWith<MigrationType> migrationType = MigrationType::fromBSON(obj); - ASSERT_EQUALS(migrationType.getStatus(), ErrorCodes::NoSuchKey); - ASSERT_STRING_CONTAINS(migrationType.getStatus().reason(), - MigrationType::chunkVersionField.name()); -} - -TEST(MigrationTypeTest, MissingRequiredCollectionVersionField) { - const ChunkVersion version(1, 2, OID::gen()); - - BSONObjBuilder builder; - builder.append(MigrationType::ns(), kNs); - builder.append(MigrationType::min(), kMin); - builder.append(MigrationType::max(), kMax); - version.appendWithFieldForCommands(&builder, MigrationType::chunkVersionField.name()); - builder.append(MigrationType::fromShard(), kFromShard.toString()); - builder.append(MigrationType::toShard(), kToShard.toString()); - - BSONObj obj = builder.obj(); - - StatusWith<MigrationType> migrationType = MigrationType::fromBSON(obj); - ASSERT_EQUALS(migrationType.getStatus(), ErrorCodes::NoSuchKey); - ASSERT_STRING_CONTAINS(migrationType.getStatus().reason(), - MigrationType::collectionVersionField.name()); -} - TEST(MigrationTypeTest, MissingRequiredFromShardField) { - const ChunkVersion version(1, 2, OID::gen()); - BSONObjBuilder builder; builder.append(MigrationType::ns(), kNs); builder.append(MigrationType::min(), kMin); builder.append(MigrationType::max(), kMax); - version.appendWithFieldForCommands(&builder, MigrationType::chunkVersionField.name()); - version.appendWithFieldForCommands(&builder, MigrationType::collectionVersionField.name()); builder.append(MigrationType::toShard(), kToShard.toString()); BSONObj obj = builder.obj(); @@ -208,14 +148,10 @@ TEST(MigrationTypeTest, MissingRequiredFromShardField) { } TEST(MigrationTypeTest, MissingRequiredToShardField) { - const ChunkVersion version(1, 2, OID::gen()); - BSONObjBuilder builder; builder.append(MigrationType::ns(), kNs); builder.append(MigrationType::min(), kMin); builder.append(MigrationType::max(), kMax); - version.appendWithFieldForCommands(&builder, MigrationType::chunkVersionField.name()); - version.appendWithFieldForCommands(&builder, MigrationType::collectionVersionField.name()); builder.append(MigrationType::fromShard(), kFromShard.toString()); BSONObj obj = builder.obj(); diff --git a/src/mongo/s/move_chunk_request.cpp 
b/src/mongo/s/move_chunk_request.cpp index d26c0002d83..e16ae7307fd 100644 --- a/src/mongo/s/move_chunk_request.cpp +++ b/src/mongo/s/move_chunk_request.cpp @@ -184,14 +184,6 @@ bool MoveChunkRequest::operator==(const MoveChunkRequest& other) const { return false; if (_range != other._range) return false; - if (_maxChunkSizeBytes != other._maxChunkSizeBytes) - return false; - if (_secondaryThrottle != other._secondaryThrottle) - return false; - if (_waitForDelete != other._waitForDelete) - return false; - if (_takeDistLock != other._takeDistLock) - return false; return true; }
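With the chunk-size, throttle, wait-for-delete, and dist-lock fields dropped, MoveChunkRequest equality now means identity of the migration itself (namespace, shards, range), which is what allows a re-sent or recovered moveChunk command to be treated as the same operation as one already running. A small illustration of the effect, with invented types and field values:

    #include <cassert>
    #include <string>

    struct Range {
        int min, max;
        bool operator==(const Range& o) const { return min == o.min && max == o.max; }
    };

    struct Request {
        std::string ns, fromShard, toShard;
        Range range;
        long long maxChunkSizeBytes;  // execution tuning: no longer part of equality
        bool operator==(const Request& o) const {
            return ns == o.ns && fromShard == o.fromShard && toShard == o.toShard &&
                range == o.range;
        }
    };

    int main() {
        Request a{"test.coll", "shard0", "shard1", {0, 100}, 64 * 1024 * 1024};
        Request b = a;
        b.maxChunkSizeBytes = 1024;  // different tuning...
        assert(a == b);              // ...but still the same migration identity
    }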