diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2020-08-07 06:16:48 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-08-07 19:48:44 +0000 |
commit | 2931f3c764f8e093ae31adb4b2b62c1ce01d7421 (patch) | |
tree | 77c8f40a0db32708f106b6498e9479dc650037af /src/mongo/db/s | |
parent | f72268197eb8845b11ea24dcf458db4a0c463034 (diff) | |
download | mongo-2931f3c764f8e093ae31adb4b2b62c1ce01d7421.tar.gz |
SERVER-50174 Make MigrationCoordinator recovery acquire the MigrationBlockingGuard
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r-- | src/mongo/db/s/migration_coordinator.cpp | 40 | ||||
-rw-r--r-- | src/mongo/db/s/migration_source_manager.cpp | 20 | ||||
-rw-r--r-- | src/mongo/db/s/migration_source_manager.h | 13 | ||||
-rw-r--r-- | src/mongo/db/s/migration_util.cpp | 31 |
4 files changed, 61 insertions, 43 deletions
diff --git a/src/mongo/db/s/migration_coordinator.cpp b/src/mongo/db/s/migration_coordinator.cpp index a4fe84afb6d..22102845062 100644 --- a/src/mongo/db/s/migration_coordinator.cpp +++ b/src/mongo/db/s/migration_coordinator.cpp @@ -182,15 +182,17 @@ SemiFuture<void> MigrationCoordinator::_commitMigrationOnDonorAndRecipient( 23894, 2, "Making commit decision durable", "migrationId"_attr = _migrationInfo.getId()); migrationutil::persistCommitDecision(opCtx, _migrationInfo.getId()); - LOGV2_DEBUG(23895, - 2, - "Bumping transaction number with lsid {lsid} and current txnNumber " - "{currentTxnNumber} on recipient shard {recipientShardId}", - "Bumping transaction number on recipient shard", - "recipientShardId"_attr = _migrationInfo.getRecipientShardId(), - "lsid"_attr = _migrationInfo.getLsid().toBSON(), - "currentTxnNumber"_attr = _migrationInfo.getTxnNumber(), - "migrationId"_attr = _migrationInfo.getId()); + LOGV2_DEBUG( + 23895, + 2, + "Bumping transaction number with lsid {lsid} and current txnNumber {currentTxnNumber} on " + "recipient shard {recipientShardId} for commit of collection {nss}", + "Bumping transaction number on recipient shard for commit", + "namespace"_attr = _migrationInfo.getNss(), + "recipientShardId"_attr = _migrationInfo.getRecipientShardId(), + "lsid"_attr = _migrationInfo.getLsid(), + "currentTxnNumber"_attr = _migrationInfo.getTxnNumber(), + "migrationId"_attr = _migrationInfo.getId()); migrationutil::advanceTransactionOnRecipient(opCtx, _migrationInfo.getRecipientShardId(), _migrationInfo.getLsid(), @@ -233,15 +235,17 @@ void MigrationCoordinator::_abortMigrationOnDonorAndRecipient(OperationContext* 23899, 2, "Making abort decision durable", "migrationId"_attr = _migrationInfo.getId()); migrationutil::persistAbortDecision(opCtx, _migrationInfo.getId()); - LOGV2_DEBUG(23900, - 2, - "Bumping transaction number with lsid {lsid} and current txnNumber " - "{currentTxnNumber} on recipient shard {recipientShardId}", - "Bumping transaction number on recipient shard", - "recipientShardId"_attr = _migrationInfo.getRecipientShardId(), - "lsid"_attr = _migrationInfo.getLsid().toBSON(), - "currentTxnNumber"_attr = _migrationInfo.getTxnNumber(), - "migrationId"_attr = _migrationInfo.getId()); + LOGV2_DEBUG( + 23900, + 2, + "Bumping transaction number with lsid {lsid} and current txnNumber {currentTxnNumber} on " + "recipient shard {recipientShardId} for abort of collection {nss}", + "Bumping transaction number on recipient shard for abort", + "namespace"_attr = _migrationInfo.getNss(), + "recipientShardId"_attr = _migrationInfo.getRecipientShardId(), + "lsid"_attr = _migrationInfo.getLsid(), + "currentTxnNumber"_attr = _migrationInfo.getTxnNumber(), + "migrationId"_attr = _migrationInfo.getId()); migrationutil::advanceTransactionOnRecipient(opCtx, _migrationInfo.getRecipientShardId(), _migrationInfo.getLsid(), diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp index 5e59b07115f..02539348de4 100644 --- a/src/mongo/db/s/migration_source_manager.cpp +++ b/src/mongo/db/s/migration_source_manager.cpp @@ -145,15 +145,14 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx, onShardVersionMismatch(_opCtx, getNss(), boost::none); // Snapshot the committed metadata from the time the migration starts - const auto collectionMetadataAndUUID = [&] { + const auto [collectionMetadata, collectionUUID] = [&] { UninterruptibleLockGuard noInterrupt(_opCtx->lockState()); AutoGetCollection autoColl(_opCtx, getNss(), MODE_IS); uassert(ErrorCodes::InvalidOptions, "cannot move chunks for a collection that doesn't exist", autoColl.getCollection()); - boost::optional<UUID> collectionUUID; - collectionUUID = autoColl.getCollection()->uuid(); + UUID collectionUUID = autoColl.getCollection()->uuid(); auto optMetadata = CollectionShardingRuntime::get(_opCtx, getNss())->getCurrentMetadataIfKnown(); @@ -169,8 +168,6 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx, return std::make_tuple(std::move(metadata), std::move(collectionUUID)); }(); - const auto& collectionMetadata = std::get<0>(collectionMetadataAndUUID); - const auto collectionVersion = collectionMetadata.getCollVersion(); const auto shardVersion = collectionMetadata.getShardVersion(); @@ -195,11 +192,12 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx, str::stream() << "Unable to move chunk with arguments '" << redact(_args.toString())); + _collectionEpoch = collectionVersion.epoch(); + _collectionUUID = collectionUUID; + _chunkVersion = collectionMetadata.getChunkManager() ->findIntersectingChunkWithSimpleCollation(_args.getMinKey()) .getLastmod(); - _collectionEpoch = collectionVersion.epoch(); - _collectionUuid = std::get<1>(collectionMetadataAndUUID); } MigrationSourceManager::~MigrationSourceManager() { @@ -259,7 +257,7 @@ Status MigrationSourceManager::startClone() { _args.getFromShardId(), _args.getToShardId(), getNss(), - _collectionUuid.get(), + *_collectionUUID, ChunkRange(_args.getMinKey(), _args.getMaxKey()), _chunkVersion, _args.getWaitForDelete()); @@ -617,10 +615,10 @@ CollectionMetadata MigrationSourceManager::_getCurrentMetadataAndCheckEpoch() { uassert(ErrorCodes::ConflictingOperationInProgress, str::stream() << "The collection's epoch has changed since the migration began. " "Expected collection epoch: " - << _collectionEpoch.toString() << ", but found: " + << _collectionEpoch->toString() << ", but found: " << (metadata.isSharded() ? metadata.getCollVersion().epoch().toString() : "unsharded collection"), - metadata.isSharded() && metadata.getCollVersion().epoch() == _collectionEpoch); + metadata.isSharded() && metadata.getCollVersion().epoch() == *_collectionEpoch); return metadata; } @@ -649,7 +647,7 @@ void MigrationSourceManager::_notifyChangeStreamsOnRecipientFirstChunk( _opCtx, "migrateChunkToNewShard", NamespaceString::kRsOplogNamespace.ns(), [&] { WriteUnitOfWork uow(_opCtx); serviceContext->getOpObserver()->onInternalOpMessage( - _opCtx, getNss(), _collectionUuid, BSON("msg" << dbgMessage), o2Message); + _opCtx, getNss(), *_collectionUUID, BSON("msg" << dbgMessage), o2Message); uow.commit(); }); } diff --git a/src/mongo/db/s/migration_source_manager.h b/src/mongo/db/s/migration_source_manager.h index efefcda401e..3bbae9988af 100644 --- a/src/mongo/db/s/migration_source_manager.h +++ b/src/mongo/db/s/migration_source_manager.h @@ -242,16 +242,15 @@ private: // The current state. Used only for diagnostics and validation. State _state{kCreated}; + // The epoch of the collection being migrated and its UUID, as of the time the migration + // started. Values are boost::optional up until the constructor runs, because UUID doesn't have + // a default constructor. + boost::optional<OID> _collectionEpoch; + boost::optional<UUID> _collectionUUID; + // The version of the chunk at the time the migration started. ChunkVersion _chunkVersion; - // The version of the collection at the time migration started. - OID _collectionEpoch; - - // The UUID of the the collection whose chunks are being moved. Default to empty if the - // collection doesn't have UUID. - boost::optional<UUID> _collectionUuid; - // Contains logic for ensuring the donor's and recipient's config.rangeDeletions entries are // correctly updated based on whether the migration committed or aborted. std::unique_ptr<migrationutil::MigrationCoordinator> _coordinator; diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp index 1c2cc75c844..6d240a3a23b 100644 --- a/src/mongo/db/s/migration_util.cpp +++ b/src/mongo/db/s/migration_util.cpp @@ -776,15 +776,25 @@ void ensureChunkVersionIsGreaterThan(OperationContext* opCtx, } void resumeMigrationCoordinationsOnStepUp(OperationContext* opCtx) { - LOGV2_DEBUG(4798510, 2, "Starting migration coordinator stepup recovery"); + LOGV2_DEBUG(4798510, 2, "Starting migration coordinator step-up recovery"); unsigned long long unfinishedMigrationsCount = 0; + PersistentTaskStore<MigrationCoordinatorDocument> store( NamespaceString::kMigrationCoordinatorsNamespace); - Query query; store.forEach(opCtx, - query, + Query{}, [&opCtx, &unfinishedMigrationsCount](const MigrationCoordinatorDocument& doc) { + // MigrationCoordinators are only created under the MigrationBlockingGuard, + // which means that only one can possibly exist on an instance at a time. + // Furthermore, recovery of an incomplete MigrationCoordator also acquires the + // MigrationBlockingGuard. Because of this it is not possible to have more + // than one unfinished migration. + invariant(unfinishedMigrationsCount == 0, + str::stream() + << "Upon step-up a second migration coordinator was found" + << redact(doc.toBSON())); + unfinishedMigrationsCount++; LOGV2_DEBUG(4798511, 3, @@ -792,15 +802,21 @@ void resumeMigrationCoordinationsOnStepUp(OperationContext* opCtx) { "migrationCoordinatorDoc"_attr = redact(doc.toBSON()), "unfinishedMigrationsCount"_attr = unfinishedMigrationsCount); - const auto nss = doc.getNss(); + const auto& nss = doc.getNss(); + { AutoGetCollection autoColl(opCtx, nss, MODE_IX); CollectionShardingRuntime::get(opCtx, nss)->clearFilteringMetadata(opCtx); } - const auto serviceContext = opCtx->getServiceContext(); + auto mbg = std::make_shared<MigrationBlockingGuard>( + opCtx, + str::stream() << "Recovery of migration session " + << doc.getMigrationSessionId().toString() + << " on collection " << nss); + ExecutorFuture<void>(getMigrationUtilExecutor()) - .then([serviceContext, nss] { + .then([serviceContext = opCtx->getServiceContext(), nss, mbg] { ThreadClient tc("TriggerMigrationRecovery", serviceContext); { stdx::lock_guard<Client> lk(*tc.get()); @@ -826,9 +842,10 @@ void resumeMigrationCoordinationsOnStepUp(OperationContext* opCtx) { ShardingStatistics::get(opCtx).unfinishedMigrationFromPreviousPrimary.store( unfinishedMigrationsCount); + LOGV2_DEBUG(4798513, 2, - "Finished migration coordinator stepup recovery", + "Finished migration coordinator step-up recovery", "unfinishedMigrationsCount"_attr = unfinishedMigrationsCount); } |