From f8212b6b37bea1bead354df86e8485761a519339 Mon Sep 17 00:00:00 2001
From: Dianna Hohensee
Date: Wed, 14 Sep 2016 17:44:43 -0400
Subject: SERVER-25905 Release all config held distlocks and reacquire balancer
 distlocks in drain mode on config step up to primary

---
 ...replication_coordinator_external_state_impl.cpp |  4 ++
 src/mongo/s/balancer/balancer.cpp                  | 10 ++-
 src/mongo/s/balancer/migration_manager.cpp         | 83 ++++++++++++++--------
 src/mongo/s/balancer/migration_manager.h           | 38 ++++++----
 src/mongo/s/balancer/migration_manager_test.cpp    | 30 ++++----
 src/mongo/s/catalog/dist_lock_manager.h            |  3 +-
 src/mongo/s/catalog/dist_lock_manager_mock.cpp     |  2 +-
 src/mongo/s/catalog/dist_lock_manager_mock.h       |  3 +-
 .../catalog/replset/replset_dist_lock_manager.cpp  |  3 +-
 .../s/catalog/replset/replset_dist_lock_manager.h  |  3 +-
 .../replset/replset_dist_lock_manager_test.cpp     |  8 +--
 11 files changed, 108 insertions(+), 79 deletions(-)

diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 15484b6729f..f1e0e16e399 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -694,6 +694,10 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook
                   << causedBy(shardAwareInitializationStatus);
         }
 
+        // Free any leftover locks from previous instantiations.
+        auto distLockManager = Grid::get(txn)->catalogClient(txn)->getDistLockManager();
+        distLockManager->unlockAll(txn, distLockManager->getProcessID());
+
         // If this is a config server node becoming a primary, start the balancer
         Balancer::get(txn)->startThread(txn);
     } else if (ShardingState::get(txn)->enabled()) {
diff --git a/src/mongo/s/balancer/balancer.cpp b/src/mongo/s/balancer/balancer.cpp
index f2a4c52146c..c3a77dee494 100644
--- a/src/mongo/s/balancer/balancer.cpp
+++ b/src/mongo/s/balancer/balancer.cpp
@@ -182,7 +182,7 @@ Status Balancer::startThread(OperationContext* txn) {
             invariant(!_thread.joinable());
 
             _state = kRunning;
-            _migrationManager.startRecovery();
+            _migrationManager.startRecoveryAndAcquireDistLocks(txn);
             _thread = stdx::thread([this] { _mainThread(); });
         // Intentional fall through
         case kRunning:
@@ -291,7 +291,6 @@ void Balancer::_mainThread() {
 
     const Seconds kInitBackoffInterval(10);
 
-    OID clusterIdentity = ClusterIdentityLoader::get(txn.get())->getClusterId();
 
     // Take the balancer distributed lock and hold it permanently. Do the attempts with single
     // attempts in order to not block the thread and be able to check for interrupt more frequently.
@@ -301,7 +300,7 @@ void Balancer::_mainThread() {
                 txn.get(),
                 "balancer",
                 "CSRS Balancer",
-                clusterIdentity,
+                OID::gen(),
                 DistLockManager::kSingleLockAttemptTimeout);
         if (!distLockHandleStatus.isOK()) {
             warning() << "Balancer distributed lock could not be acquired and will be retried in "
@@ -316,16 +315,15 @@ void Balancer::_mainThread() {
     }
 
     if (!_stopRequested()) {
-        log() << "CSRS balancer thread for cluster " << clusterIdentity << " is recovering";
+        log() << "CSRS balancer thread is recovering";
 
         auto balancerConfig = Grid::get(txn.get())->getBalancerConfiguration();
        _migrationManager.finishRecovery(txn.get(),
-                                         clusterIdentity,
                                          balancerConfig->getMaxChunkSizeBytes(),
                                          balancerConfig->getSecondaryThrottle(),
                                          balancerConfig->waitForDelete());
 
-        log() << "CSRS balancer thread for cluster " << clusterIdentity << " is recovered";
+        log() << "CSRS balancer thread is recovered";
     }
 
     // Main balancer loop
diff --git a/src/mongo/s/balancer/migration_manager.cpp b/src/mongo/s/balancer/migration_manager.cpp
index aa5beb38729..4bd9ae0950a 100644
--- a/src/mongo/s/balancer/migration_manager.cpp
+++ b/src/mongo/s/balancer/migration_manager.cpp
@@ -104,13 +104,13 @@ Status extractMigrationStatusFromRemoteCommandResponse(const RemoteCommandRespon
  * Blocking call to acquire the distributed collection lock for the specified namespace.
  */
 StatusWith<DistLockHandle> acquireDistLock(OperationContext* txn,
-                                           const OID& clusterIdentity,
+                                           const OID& lockSessionID,
                                            const NamespaceString& nss) {
     const std::string whyMessage(stream() << "Migrating chunk(s) in collection " << nss.ns());
 
     auto statusWithDistLockHandle =
         Grid::get(txn)->catalogClient(txn)->getDistLockManager()->lockWithSessionID(
-            txn, nss.ns(), whyMessage, clusterIdentity, DistLockManager::kSingleLockAttemptTimeout);
+            txn, nss.ns(), whyMessage, lockSessionID, DistLockManager::kSingleLockAttemptTimeout);
 
     if (!statusWithDistLockHandle.isOK()) {
         // If we get LockBusy while trying to acquire the collection distributed lock, this implies
@@ -135,7 +135,7 @@ StatusWith<DistLockHandle> acquireDistLock(OperationContext* txn,
 }  // namespace
 
 MigrationManager::MigrationManager(ServiceContext* serviceContext)
-    : _serviceContext(serviceContext) {}
+    : _serviceContext(serviceContext), _lockSessionID(OID::gen()) {}
 
 MigrationManager::~MigrationManager() {
     // The migration manager must be completely quiesced at destruction time
@@ -283,29 +283,20 @@ Status MigrationManager::executeManualMigration(
     return status;
 }
 
-void MigrationManager::startRecovery() {
-    stdx::lock_guard<stdx::mutex> lock(_mutex);
-    invariant(_state == State::kStopped);
-    _state = State::kRecovering;
-}
-
-void MigrationManager::finishRecovery(OperationContext* txn,
-                                      const OID& clusterIdentity,
-                                      uint64_t maxChunkSizeBytes,
-                                      const MigrationSecondaryThrottleOptions& secondaryThrottle,
-                                      bool waitForDelete) {
+void MigrationManager::startRecoveryAndAcquireDistLocks(OperationContext* txn) {
     {
         stdx::lock_guard<stdx::mutex> lock(_mutex);
-        invariant(_state == State::kRecovering);
-        if (!_clusterIdentity.isSet()) {
-            _clusterIdentity = clusterIdentity;
-        }
-        invariant(_clusterIdentity == clusterIdentity);
+        invariant(_state == State::kStopped);
+        invariant(_migrationRecoveryMap.empty());
+        _state = State::kRecovering;
     }
 
-    // Load the active migrations from the config.migrations collection.
-    vector<MigrateInfo> migrateInfos;
+    auto scopedGuard = MakeGuard([&] {
+        _migrationRecoveryMap.clear();
+        _abandonActiveMigrationsAndEnableManager(txn);
+    });
 
+    // Load the active migrations from the config.migrations collection.
     auto statusWithMigrationsQueryResponse =
         Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
             txn,
@@ -318,10 +309,8 @@ void MigrationManager::finishRecovery(OperationContext* txn,
 
     if (!statusWithMigrationsQueryResponse.isOK()) {
         warning() << "Unable to read config.migrations collection documents for balancer migration"
-                  << " recovery. Abandoning recovery."
+                  << " recovery. Abandoning balancer recovery."
                   << causedBy(redact(statusWithMigrationsQueryResponse.getStatus()));
-
-        _abandonActiveMigrationsAndEnableManager(txn);
         return;
     }
 
@@ -331,11 +320,9 @@ void MigrationManager::finishRecovery(OperationContext* txn,
             // The format of this migration document is incorrect. The balancer holds a distlock for
             // this migration, but without parsing the migration document we cannot identify which
             // distlock must be released. So we must release all distlocks.
-            warning() << "Unable to parse config.migrations collection documents for balancer"
-                      << " migration recovery. Abandoning recovery."
+            warning() << "Unable to parse config.migrations document '" << migration
+                      << "' for balancer migration recovery. Abandoning balancer recovery."
                       << causedBy(redact(statusWithMigrationType.getStatus()));
-
-            _abandonActiveMigrationsAndEnableManager(txn);
             return;
         }
         MigrateInfo migrateInfo = statusWithMigrationType.getValue().toMigrateInfo();
@@ -345,11 +332,47 @@ void MigrationManager::finishRecovery(OperationContext* txn,
             std::list<MigrateInfo> list;
             it = _migrationRecoveryMap.insert(std::make_pair(NamespaceString(migrateInfo.ns), list))
                      .first;
+
+            // Reacquire the matching distributed lock for this namespace.
+            const std::string whyMessage(stream() << "Migrating chunk(s) in collection "
+                                                  << migrateInfo.ns);
+            auto statusWithDistLockHandle =
+                Grid::get(txn)
+                    ->catalogClient(txn)
+                    ->getDistLockManager()
+                    ->tryLockWithLocalWriteConcern(txn, migrateInfo.ns, whyMessage, _lockSessionID);
+            if (!statusWithDistLockHandle.isOK() &&
+                statusWithDistLockHandle.getStatus() != ErrorCodes::LockStateChangeFailed) {
+                // LockStateChangeFailed is alright because that should mean a 3.2 shard has it for
+                // the active migration.
+                warning() << "Failed to acquire distributed lock for collection '" << migrateInfo.ns
+                          << "' during balancer recovery of an active migration. Abandoning"
+                          << " balancer recovery."
+                          << causedBy(redact(statusWithDistLockHandle.getStatus()));
+                return;
+            }
         }
 
         it->second.push_back(std::move(migrateInfo));
     }
 
+    scopedGuard.Dismiss();
+}
+
+void MigrationManager::finishRecovery(OperationContext* txn,
+                                      uint64_t maxChunkSizeBytes,
+                                      const MigrationSecondaryThrottleOptions& secondaryThrottle,
+                                      bool waitForDelete) {
+    {
+        stdx::lock_guard<stdx::mutex> lock(_mutex);
+        // Check if recovery was abandoned in startRecovery, in which case there is no more to do.
+        if (_state == State::kEnabled) {
+            invariant(_migrationRecoveryMap.empty());
+            return;
+        }
+        invariant(_state == State::kRecovering);
+    }
+
+    // Schedule recovered migrations.
     vector<ScopedMigrationRequest> scopedMigrationRequests;
     vector>> responses;
@@ -386,7 +409,7 @@ void MigrationManager::finishRecovery(OperationContext* txn,
                 txn, NamespaceString(itMigrateInfo->ns), itMigrateInfo->minKey);
             if (nssAndMigrateInfos.second.size() == 1) {
                 Grid::get(txn)->catalogClient(txn)->getDistLockManager()->unlock(
-                    txn, _clusterIdentity, itMigrateInfo->ns);
+                    txn, _lockSessionID, itMigrateInfo->ns);
             }
             itMigrateInfo = nssAndMigrateInfos.second.erase(itMigrateInfo);
         } else {
@@ -568,7 +591,7 @@ void MigrationManager::_scheduleWithDistLock_inlock(OperationContext* txn,
     auto it = _activeMigrationsWithDistLock.find(nss);
     if (it == _activeMigrationsWithDistLock.end()) {
         // Acquire the collection distributed lock (blocking call)
-        auto distLockHandleStatus = acquireDistLock(txn, _clusterIdentity, nss);
+        auto distLockHandleStatus = acquireDistLock(txn, _lockSessionID, nss);
         if (!distLockHandleStatus.isOK()) {
             migration.completionNotification->set(distLockHandleStatus.getStatus());
             return;
diff --git a/src/mongo/s/balancer/migration_manager.h b/src/mongo/s/balancer/migration_manager.h
index 456452b1efd..2c6d716b862 100644
--- a/src/mongo/s/balancer/migration_manager.h
+++ b/src/mongo/s/balancer/migration_manager.h
@@ -100,18 +100,24 @@ public:
 
     /**
      * Non-blocking method that puts the migration manager in the kRecovering state, in which
-     * new migration requests will block until finishRecovery is called.
+     * new migration requests will block until finishRecovery is called. Then does local writes to
+     * reacquire the distributed locks for active migrations.
+     *
+     * The active migration recovery may fail and be abandoned, setting the state to kEnabled.
      */
-    void startRecovery();
+    void startRecoveryAndAcquireDistLocks(OperationContext* txn);
 
     /**
      * Blocking method that must only be called after startRecovery has been called. Recovers the
     * state of the migration manager (if necessary and able) and puts it in the kEnabled state,
     * where it will accept new migrations. Any migrations waiting on the recovery state will be
-     * unblocked.
+     * unblocked once the state is kEnabled, and then this function waits for the recovered active
+     * migrations to finish before returning.
+     *
+     * The active migration recovery may fail and be abandoned, setting the state to kEnabled and
+     * unblocking any process waiting on the recovery state.
      */
     void finishRecovery(OperationContext* txn,
-                        const OID& clusterIdentity,
                         uint64_t maxChunkSizeBytes,
                         const MigrationSecondaryThrottleOptions& secondaryThrottle,
                         bool waitForDelete);
@@ -247,16 +253,25 @@ private:
     void _waitForRecovery();
 
     /**
-     * Should only be called from within the finishRecovery function because the migration manager
-     * must be in the kRecovering state. Releases all the distributed locks that the balancer holds,
-     * clears the config.migrations collection, changes the state of the migration manager from
-     * kRecovering to kEnabled, and unblocks all processes waiting on the recovery state.
+     * Should only be called from startRecovery or finishRecovery functions when the migration
+     * manager is in either the kStopped or kRecovering state. Releases all the distributed locks
+     * that the balancer holds, clears the config.migrations collection, changes the state of the
+     * migration manager to kEnabled. Then unblocks all processes waiting for kEnabled state.
      */
     void _abandonActiveMigrationsAndEnableManager(OperationContext* txn);
 
     // The service context under which this migration manager runs.
     ServiceContext* const _serviceContext;
 
+    // Used as a constant session ID for all distributed locks that this MigrationManager holds.
+    // Currently required so that locks can be reacquired for the balancer in startRecovery and then
+    // overtaken in later operations.
+    OID _lockSessionID;
+
+    // Carries migration information over from startRecovery to finishRecovery. Should only be set
+    // in startRecovery and then accessed in finishRecovery.
+    stdx::unordered_map<NamespaceString, std::list<MigrateInfo>> _migrationRecoveryMap;
+
     // Protects the class state below.
     stdx::mutex _mutex;
 
@@ -267,10 +282,6 @@ private:
     // signaled when the state change is complete.
     stdx::condition_variable _condVar;
 
-    // Identity of the cluster under which this migration manager runs. Used as a constant session
-    // ID for all distributed locks that the MigrationManager holds.
-    OID _clusterIdentity;
-
     // Holds information about each collection's distributed lock and active migrations via a
     // CollectionMigrationState object.
     CollectionMigrationsStateMap _activeMigrationsWithDistLock;
@@ -278,9 +289,6 @@ private:
     // Holds information about migrations, which have been scheduled without the collection
     // distributed lock acquired (i.e., the shard is asked to acquire it).
     MigrationsList _activeMigrationsWithoutDistLock;
-
-    // Carries migration information over from startRecovery to finishRecovery.
-    stdx::unordered_map<NamespaceString, std::list<MigrateInfo>> _migrationRecoveryMap;
 };
 
 }  // namespace mongo
diff --git a/src/mongo/s/balancer/migration_manager_test.cpp b/src/mongo/s/balancer/migration_manager_test.cpp
index 645de866c96..bd11a48521f 100644
--- a/src/mongo/s/balancer/migration_manager_test.cpp
+++ b/src/mongo/s/balancer/migration_manager_test.cpp
@@ -155,9 +155,6 @@ protected:
 
     const KeyPattern kKeyPattern = KeyPattern(BSON(kPattern << 1));
 
-    // Cluster identity to pass to the migration manager
-    const OID _clusterIdentity{OID::gen()};
-
     std::unique_ptr<MigrationManager> _migrationManager;
 
 private:
@@ -168,9 +165,8 @@ private:
 void MigrationManagerTest::setUp() {
     ConfigServerTestFixture::setUp();
     _migrationManager = stdx::make_unique<MigrationManager>(getServiceContext());
-    _migrationManager->startRecovery();
-    _migrationManager->finishRecovery(
-        operationContext(), _clusterIdentity, 0, kDefaultSecondaryThrottle, false);
+    _migrationManager->startRecoveryAndAcquireDistLocks(operationContext());
+    _migrationManager->finishRecovery(operationContext(), 0, kDefaultSecondaryThrottle, false);
 }
 
 void MigrationManagerTest::tearDown() {
@@ -834,9 +830,8 @@ TEST_F(MigrationManagerTest, RestartMigrationManager) {
     // Go through the lifecycle of the migration manager
     _migrationManager->interruptAndDisableMigrations();
     _migrationManager->drainActiveMigrations();
-    _migrationManager->startRecovery();
-    _migrationManager->finishRecovery(
-        operationContext(), _clusterIdentity, 0, kDefaultSecondaryThrottle, false);
+    _migrationManager->startRecoveryAndAcquireDistLocks(operationContext());
+    _migrationManager->finishRecovery(operationContext(), 0, kDefaultSecondaryThrottle, false);
 
     auto future = launchAsync([&] {
         Client::initThreadIfNotAlready("Test");
@@ -881,7 +876,6 @@ TEST_F(MigrationManagerTest, MigrationRecovery) {
 
     _migrationManager->interruptAndDisableMigrations();
     _migrationManager->drainActiveMigrations();
-    _migrationManager->startRecovery();
 
     // Set up two fake active migrations by writing documents to the config.migrations collection.
     setUpMigration(collName,
@@ -895,6 +889,8 @@ TEST_F(MigrationManagerTest, MigrationRecovery) {
                    kShardId3.toString(),
                    chunk2.getShard().toString());
 
+    _migrationManager->startRecoveryAndAcquireDistLocks(operationContext());
+
     auto future = launchAsync([this] {
         Client::initThreadIfNotAlready("Test");
         auto txn = cc().makeOperationContext();
@@ -904,8 +900,7 @@ TEST_F(MigrationManagerTest, MigrationRecovery) {
         shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
         shardTargeterMock(txn.get(), kShardId2)->setFindHostReturnValue(kShardHost2);
 
-        _migrationManager->finishRecovery(
-            txn.get(), _clusterIdentity, 0, kDefaultSecondaryThrottle, false);
+        _migrationManager->finishRecovery(txn.get(), 0, kDefaultSecondaryThrottle, false);
     });
 
     // Expect two moveChunk commands.
@@ -938,6 +933,9 @@ TEST_F(MigrationManagerTest, FailMigrationRecovery) {
     ChunkType chunk2 =
         setUpChunk(collName, BSON(kPattern << 49), kKeyPattern.globalMax(), kShardId2, version);
 
+    _migrationManager->interruptAndDisableMigrations();
+    _migrationManager->drainActiveMigrations();
+
     // Set up a parsable fake active migration document in the config.migrations collection.
     setUpMigration(collName,
                    chunk1.getMin(),
@@ -945,10 +943,6 @@ TEST_F(MigrationManagerTest, FailMigrationRecovery) {
                    kShardId1.toString(),
                    chunk1.getShard().toString());
 
-    _migrationManager->interruptAndDisableMigrations();
-    _migrationManager->drainActiveMigrations();
-    _migrationManager->startRecovery();
-
     // Set up a fake active migration document that will fail MigrationType parsing -- missing
     // field.
     BSONObjBuilder builder;
@@ -971,8 +965,8 @@ TEST_F(MigrationManagerTest, FailMigrationRecovery) {
                                       OID::gen(),
                                       DistLockManager::kSingleLockAttemptTimeout));
 
-    _migrationManager->finishRecovery(
-        operationContext(), _clusterIdentity, 0, kDefaultSecondaryThrottle, false);
+    _migrationManager->startRecoveryAndAcquireDistLocks(operationContext());
+    _migrationManager->finishRecovery(operationContext(), 0, kDefaultSecondaryThrottle, false);
 
     // MigrationManagerTest::tearDown checks that the config.migrations collection is empty and all
     // distributed locks are unlocked.
diff --git a/src/mongo/s/catalog/dist_lock_manager.h b/src/mongo/s/catalog/dist_lock_manager.h
index 400490b8cd1..0512a5dc481 100644
--- a/src/mongo/s/catalog/dist_lock_manager.h
+++ b/src/mongo/s/catalog/dist_lock_manager.h
@@ -153,7 +153,8 @@ public:
      */
     virtual StatusWith<DistLockHandle> tryLockWithLocalWriteConcern(OperationContext* txn,
                                                                     StringData name,
-                                                                    StringData whyMessage) = 0;
+                                                                    StringData whyMessage,
+                                                                    const OID& lockSessionID) = 0;
 
     /**
      * Unlocks the given lockHandle. Will attempt to retry again later if the config
diff --git a/src/mongo/s/catalog/dist_lock_manager_mock.cpp b/src/mongo/s/catalog/dist_lock_manager_mock.cpp
index 194ec4b7918..18bd8a8ba6c 100644
--- a/src/mongo/s/catalog/dist_lock_manager_mock.cpp
+++ b/src/mongo/s/catalog/dist_lock_manager_mock.cpp
@@ -95,7 +95,7 @@ StatusWith<DistLockHandle> DistLockManagerMock::lockWithSessionID(OperationConte
 }
 
 StatusWith<DistLockHandle> DistLockManagerMock::tryLockWithLocalWriteConcern(
-    OperationContext* txn, StringData name, StringData whyMessage) {
+    OperationContext* txn, StringData name, StringData whyMessage, const OID& lockSessionID) {
     // Not yet implemented
     MONGO_UNREACHABLE;
 }
diff --git a/src/mongo/s/catalog/dist_lock_manager_mock.h b/src/mongo/s/catalog/dist_lock_manager_mock.h
index 19a0eb1c146..d137b0239e4 100644
--- a/src/mongo/s/catalog/dist_lock_manager_mock.h
+++ b/src/mongo/s/catalog/dist_lock_manager_mock.h
@@ -56,7 +56,8 @@ public:
 
     StatusWith<DistLockHandle> tryLockWithLocalWriteConcern(OperationContext* txn,
                                                             StringData name,
-                                                            StringData whyMessage) override;
+                                                            StringData whyMessage,
+                                                            const OID& lockSessionID) override;
 
     void unlockAll(OperationContext* txn, const std::string& processID) override;
 
diff --git a/src/mongo/s/catalog/replset/replset_dist_lock_manager.cpp b/src/mongo/s/catalog/replset/replset_dist_lock_manager.cpp
index 27b80627647..6eab41a6bef 100644
--- a/src/mongo/s/catalog/replset/replset_dist_lock_manager.cpp
+++ b/src/mongo/s/catalog/replset/replset_dist_lock_manager.cpp
@@ -426,8 +426,7 @@ StatusWith<DistLockHandle> ReplSetDistLockManager::lockWithSessionID(OperationCo
 }
 
 StatusWith<DistLockHandle> ReplSetDistLockManager::tryLockWithLocalWriteConcern(
-    OperationContext* txn, StringData name, StringData whyMessage) {
-    const DistLockHandle lockSessionID = OID::gen();
+    OperationContext* txn, StringData name, StringData whyMessage, const OID& lockSessionID) {
     const string who = str::stream() << _processID << ":" << getThreadName();
 
     auto lockStatus = _catalog->grabLock(txn,
diff --git a/src/mongo/s/catalog/replset/replset_dist_lock_manager.h b/src/mongo/s/catalog/replset/replset_dist_lock_manager.h
index 4731cd0bf3a..365f768f52b 100644
--- a/src/mongo/s/catalog/replset/replset_dist_lock_manager.h
+++ b/src/mongo/s/catalog/replset/replset_dist_lock_manager.h
@@ -76,7 +76,8 @@ public:
 
     StatusWith<DistLockHandle> tryLockWithLocalWriteConcern(OperationContext* txn,
                                                             StringData name,
-                                                            StringData whyMessage) override;
+                                                            StringData whyMessage,
+                                                            const OID& lockSessionID) override;
 
     void unlock(OperationContext* txn, const DistLockHandle& lockSessionID) override;
 
diff --git a/src/mongo/s/catalog/replset/replset_dist_lock_manager_test.cpp b/src/mongo/s/catalog/replset/replset_dist_lock_manager_test.cpp
index cadd4f6cfe9..d46a20c2913 100644
--- a/src/mongo/s/catalog/replset/replset_dist_lock_manager_test.cpp
+++ b/src/mongo/s/catalog/replset/replset_dist_lock_manager_test.cpp
@@ -1987,7 +1987,7 @@ TEST_F(ReplSetDistLockManagerFixture, TryLockWithLocalWriteConcernBusy) {
     // Will be different from the actual lock session id. For testing only.
     retLockDoc.setLockID(OID::gen());
 
-    OID lockSessionIDPassed;
+    OID lockSessionIDPassed = OID::gen();
 
     getMockCatalog()->expectGrabLock(
         [this, &lockName, &now, &whyMsg, &lockSessionIDPassed](StringData lockID,
@@ -2001,14 +2001,14 @@ TEST_F(ReplSetDistLockManagerFixture, TryLockWithLocalWriteConcernBusy) {
             ASSERT_EQUALS(getProcessID(), processId);
             ASSERT_GREATER_THAN_OR_EQUALS(time, now);
             ASSERT_EQUALS(whyMsg, why);
+            ASSERT_EQUALS(lockSessionIDPassed, lockSessionID);
 
-            lockSessionIDPassed = lockSessionID;
             getMockCatalog()->expectNoGrabLock();  // Call only once.
         },
         {ErrorCodes::LockStateChangeFailed, "Unable to take lock"});
 
-    auto lockStatus =
-        distLock()->tryLockWithLocalWriteConcern(operationContext(), lockName, whyMsg);
+    auto lockStatus = distLock()->tryLockWithLocalWriteConcern(
+        operationContext(), lockName, whyMsg, lockSessionIDPassed);
     ASSERT_EQ(ErrorCodes::LockBusy, lockStatus.getStatus());
 }
-- 
cgit v1.2.1
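As a reading aid for the recovery flow above, here is a minimal, self-contained C++ sketch of the idea the patch relies on: the balancer takes every distributed lock under one constant session ID, so on step-up the locks recorded in config.migrations can be reclaimed per namespace, and a half-finished recovery can always be abandoned simply by releasing everything owned by that session. All names in the sketch (InMemoryDistLockCatalog, MigrationEntry, recoverActiveMigrations) are hypothetical stand-ins rather than MongoDB classes, and it deliberately simplifies one detail: the real startRecoveryAndAcquireDistLocks tolerates LockStateChangeFailed (a 3.2 shard may legitimately hold the collection lock), whereas this model abandons recovery on any conflict.

// Illustrative model only -- none of these types exist in the MongoDB tree.
#include <iostream>
#include <iterator>
#include <list>
#include <map>
#include <string>
#include <unordered_map>
#include <vector>

using SessionId = std::string;  // stands in for an OID lock session id

// Toy dist-lock catalog: one lock per collection namespace, owned by a session.
class InMemoryDistLockCatalog {
public:
    // Single local attempt, in the spirit of tryLockWithLocalWriteConcern: succeeds
    // only if the lock is free or already owned by the same session.
    bool tryLock(const std::string& ns, const SessionId& session) {
        auto it = _owners.find(ns);
        if (it == _owners.end() || it->second == session) {
            _owners[ns] = session;
            return true;
        }
        return false;  // held by another session
    }

    // In the spirit of unlockAll(processID): drop every lock owned by this session.
    void unlockAll(const SessionId& session) {
        for (auto it = _owners.begin(); it != _owners.end();) {
            it = (it->second == session) ? _owners.erase(it) : std::next(it);
        }
    }

private:
    std::map<std::string, SessionId> _owners;
};

struct MigrationEntry {
    std::string ns;       // collection being migrated
    std::string chunkId;  // identifies the chunk
};

// Models startRecoveryAndAcquireDistLocks: group config.migrations documents by
// namespace and take each collection lock once under a constant session id.
// Returns false, holding nothing, if recovery had to be abandoned.
bool recoverActiveMigrations(InMemoryDistLockCatalog& catalog,
                             const std::vector<MigrationEntry>& configMigrations,
                             const SessionId& lockSessionId,
                             std::unordered_map<std::string, std::list<MigrationEntry>>* recoveryMap) {
    for (const auto& entry : configMigrations) {
        auto inserted = recoveryMap->emplace(entry.ns, std::list<MigrationEntry>{});
        if (inserted.second && !catalog.tryLock(entry.ns, lockSessionId)) {
            // Analogue of _abandonActiveMigrationsAndEnableManager(): release every
            // lock this session grabbed and start from a clean slate.
            recoveryMap->clear();
            catalog.unlockAll(lockSessionId);
            return false;
        }
        inserted.first->second.push_back(entry);
    }
    return true;
}

int main() {
    InMemoryDistLockCatalog catalog;
    const SessionId balancerSession = "balancer-session";

    // Two in-flight migrations recorded before the previous primary stepped down.
    std::vector<MigrationEntry> docs = {{"test.coll1", "chunk-A"}, {"test.coll2", "chunk-B"}};

    std::unordered_map<std::string, std::list<MigrationEntry>> recoveryMap;
    bool recovered = recoverActiveMigrations(catalog, docs, balancerSession, &recoveryMap);
    std::cout << (recovered ? "recovery prepared for " : "recovery abandoned, ")
              << recoveryMap.size() << " namespace(s) held\n";
    return 0;
}

Read this way, unlockAll on step-up is always safe: either the collection locks are reacquired under the same constant session during recovery, or recovery is abandoned and the manager starts from a clean slate, which mirrors the behavior the new startRecoveryAndAcquireDistLocks/finishRecovery split aims for.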