author | Marcos José Grillo Ramírez <marcos.grillo@mongodb.com> | 2020-06-16 17:15:16 +0200
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-06-17 12:42:00 +0000
commit | 1edd4798f8e72f226ad69269a8b0154b247a8049 (patch)
tree | d1304ce4dd9bb175c5c789edbaa11d469ef275ff /src/mongo/db/s
parent | 1bfaa4c1ff3be51bea5e63b9b0f0f6d693d9e36c (diff)
download | mongo-1edd4798f8e72f226ad69269a8b0154b247a8049.tar.gz
SERVER-47982 Change the shard version update procedure of the migration source manager
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r-- | src/mongo/db/s/migration_source_manager.cpp | 63
-rw-r--r-- | src/mongo/db/s/migration_source_manager.h | 2
-rw-r--r-- | src/mongo/db/s/migration_util.cpp | 136
-rw-r--r-- | src/mongo/db/s/shard_filtering_metadata_refresh.cpp | 2
-rw-r--r-- | src/mongo/db/s/shard_filtering_metadata_refresh.h | 2
5 files changed, 97 insertions, 108 deletions
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index 643aa08f353..dfdaf253b1a 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -140,7 +140,8 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx,
           "requestParameters"_attr = redact(_args.toString()),
           "collectionEpoch"_attr = _args.getVersionEpoch());
 
-    // Force refresh of the metadata to ensure we have the latest
+    // Make sure the latest shard version is recovered as of the time of the invocation of the
+    // command.
     onShardVersionMismatch(_opCtx, getNss(), boost::none);
 
     // Snapshot the committed metadata from the time the migration starts
@@ -440,29 +441,43 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig() {
         Shard::CommandResponse::getEffectiveStatus(commitChunkMigrationResponse);
 
     if (!migrationCommitStatus.isOK()) {
-        migrationutil::ensureChunkVersionIsGreaterThan(_opCtx, _args.getRange(), _chunkVersion);
-    }
-
-    migrationutil::refreshFilteringMetadataUntilSuccess(_opCtx, getNss());
-
-    const auto refreshedMetadata = _getCurrentMetadataAndCheckEpoch();
+        {
+            UninterruptibleLockGuard noInterrupt(_opCtx->lockState());
+            AutoGetCollection autoColl(_opCtx, getNss(), MODE_IX);
+            auto* const csr = CollectionShardingRuntime::get(_opCtx, getNss());
+            auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(_opCtx, csr);
 
-    if (refreshedMetadata.keyBelongsToMe(_args.getMinKey())) {
-        // This condition may only happen if the migration commit has failed for any reason
-        if (migrationCommitStatus.isOK()) {
-            return {ErrorCodes::ConflictingOperationInProgress,
-                    "Migration commit succeeded but refresh found that the chunk is still owned; "
-                    "this node may be a stale primary of its replica set, and the new primary may "
-                    "have re-received the chunk"};
+            CollectionShardingRuntime::get(_opCtx, getNss())->clearFilteringMetadata();
         }
+        scopedGuard.dismiss();
+        _cleanup(false);
+        // Best-effort recover of the shard version.
+        onShardVersionMismatchNoExcept(_opCtx, getNss(), boost::none).ignore();
+        return migrationCommitStatus;
+    }
 
-        _coordinator->setMigrationDecision(migrationutil::MigrationCoordinator::Decision::kAborted);
+    try {
+        forceShardFilteringMetadataRefresh(_opCtx, getNss(), true);
+    } catch (const DBException& ex) {
+        {
+            UninterruptibleLockGuard noInterrupt(_opCtx->lockState());
+            AutoGetCollection autoColl(_opCtx, getNss(), MODE_IX);
+            auto* const csr = CollectionShardingRuntime::get(_opCtx, getNss());
+            auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(_opCtx, csr);
 
-        // The chunk modification was not applied, so report the original error
-        return migrationCommitStatus.withContext("Chunk move was not successful");
+            CollectionShardingRuntime::get(_opCtx, getNss())->clearFilteringMetadata();
+        }
+        scopedGuard.dismiss();
+        _cleanup(false);
+        // Best-effort recover of the shard version.
+        onShardVersionMismatchNoExcept(_opCtx, getNss(), boost::none).ignore();
+        return ex.toStatus();
    }
 
     // Migration succeeded
+
+    const auto refreshedMetadata = _getCurrentMetadataAndCheckEpoch();
+
     LOGV2(22018,
           "Migration succeeded and updated collection version to {updatedCollectionVersion}",
           "Migration succeeded and updated collection version",
@@ -479,7 +494,7 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig() {
 
     // Exit the critical section and ensure that all the necessary state is fully persisted before
     // scheduling orphan cleanup.
-    _cleanup();
+    _cleanup(true);
 
     ShardingLogging::get(_opCtx)->logChange(
         _opCtx,
@@ -537,7 +552,7 @@ void MigrationSourceManager::cleanupOnError() {
         ShardingCatalogClient::kMajorityWriteConcern);
 
     try {
-        _cleanup();
+        _cleanup(true);
     } catch (const DBException& ex) {
         LOGV2_WARNING(22022,
                       "Failed to clean up migration with request parameters "
@@ -613,7 +628,7 @@ void MigrationSourceManager::_notifyChangeStreamsOnRecipientFirstChunk(
     });
 }
 
-void MigrationSourceManager::_cleanup() {
+void MigrationSourceManager::_cleanup(bool completeMigration) {
     invariant(_state != kDone);
 
     auto cloneDriver = [&]() {
@@ -668,15 +683,15 @@ void MigrationSourceManager::_cleanup() {
         ShardingStateRecovery::endMetadataOp(_opCtx);
     }
 
-    if (_state >= kCloning) {
+    if (completeMigration && _state >= kCloning) {
         invariant(_coordinator);
         if (_state < kCommittingOnConfig) {
             _coordinator->setMigrationDecision(
                 migrationutil::MigrationCoordinator::Decision::kAborted);
         }
-        // This can be called on an exception path after the OperationContext has been
-        // interrupted, so use a new OperationContext. Note, it's valid to call
-        // getServiceContext on an interrupted OperationContext.
+        // This can be called on an exception path after the OperationContext has been interrupted,
+        // so use a new OperationContext. Note, it's valid to call getServiceContext on an
+        // interrupted OperationContext.
         auto newClient = _opCtx->getServiceContext()->makeClient("MigrationCoordinator");
         {
             stdx::lock_guard<Client> lk(*newClient.get());
diff --git a/src/mongo/db/s/migration_source_manager.h b/src/mongo/db/s/migration_source_manager.h
index 5f43c9f9780..efefcda401e 100644
--- a/src/mongo/db/s/migration_source_manager.h
+++ b/src/mongo/db/s/migration_source_manager.h
@@ -214,7 +214,7 @@ private:
     * Called when any of the states fails. May only be called once and will put the migration
     * manager into the kDone state.
     */
-    void _cleanup();
+    void _cleanup(bool completeMigration);
 
    // This is the opCtx of the moveChunk request that constructed the MigrationSourceManager.
    // The caller must guarantee it outlives the MigrationSourceManager.
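The core of the change above, condensed here as an editor's sketch rather than a verbatim excerpt of the new commitChunkMetadataOnConfig(): when the config server rejects the commit, or the subsequent filtering metadata refresh throws, the donor no longer tries to reconcile its cached metadata in place. It clears the filtering metadata under the collection's CSR lock, dismisses the cleanup guard, runs _cleanup(false) so the MigrationCoordinator is left to drive the final decision, and then attempts a best-effort shard version recovery. The `handleCommitFailure` lambda below is an illustrative helper introduced only for this sketch; the committed code repeats the block inline in both failure branches.

```cpp
// Editor's sketch of the new failure path in commitChunkMetadataOnConfig().
auto handleCommitFailure = [&] {
    {
        UninterruptibleLockGuard noInterrupt(_opCtx->lockState());
        AutoGetCollection autoColl(_opCtx, getNss(), MODE_IX);
        auto* const csr = CollectionShardingRuntime::get(_opCtx, getNss());
        auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(_opCtx, csr);

        // Leave the shard with no filtering metadata rather than possibly stale metadata.
        csr->clearFilteringMetadata();
    }
    scopedGuard.dismiss();
    _cleanup(false);  // do not finalize the migration decision on this path
    // Best-effort recovery of the shard version; any error is ignored.
    onShardVersionMismatchNoExcept(_opCtx, getNss(), boost::none).ignore();
};

if (!migrationCommitStatus.isOK()) {
    handleCommitFailure();
    return migrationCommitStatus;
}

try {
    forceShardFilteringMetadataRefresh(_opCtx, getNss(), true);
} catch (const DBException& ex) {
    handleCommitFailure();
    return ex.toStatus();
}
```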
diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp
index 9347e31f506..90c3b9a9f31 100644
--- a/src/mongo/db/s/migration_util.cpp
+++ b/src/mongo/db/s/migration_util.cpp
@@ -580,24 +580,20 @@ void persistRangeDeletionTaskLocally(OperationContext* opCtx,
 }
 
 void persistCommitDecision(OperationContext* opCtx, const UUID& migrationId) {
-    retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown(
-        opCtx, "persist migrate commit decision", [&](OperationContext* newOpCtx) {
-            hangInPersistMigrateCommitDecisionInterruptible.pauseWhileSet(newOpCtx);
+    hangInPersistMigrateCommitDecisionInterruptible.pauseWhileSet(opCtx);
 
-            PersistentTaskStore<MigrationCoordinatorDocument> store(
-                NamespaceString::kMigrationCoordinatorsNamespace);
-            store.update(newOpCtx,
-                         QUERY(MigrationCoordinatorDocument::kIdFieldName << migrationId),
-                         BSON("$set" << BSON(MigrationCoordinatorDocument::kDecisionFieldName
-                                             << "committed")));
+    PersistentTaskStore<MigrationCoordinatorDocument> store(
+        NamespaceString::kMigrationCoordinatorsNamespace);
+    store.update(
+        opCtx,
+        QUERY(MigrationCoordinatorDocument::kIdFieldName << migrationId),
+        BSON("$set" << BSON(MigrationCoordinatorDocument::kDecisionFieldName << "committed")));
 
-            if (hangInPersistMigrateCommitDecisionThenSimulateErrorUninterruptible.shouldFail()) {
-                hangInPersistMigrateCommitDecisionThenSimulateErrorUninterruptible.pauseWhileSet(
-                    newOpCtx);
-                uasserted(ErrorCodes::InternalError,
-                          "simulate an error response when persisting migrate commit decision");
-            }
-        });
+    if (hangInPersistMigrateCommitDecisionThenSimulateErrorUninterruptible.shouldFail()) {
+        hangInPersistMigrateCommitDecisionThenSimulateErrorUninterruptible.pauseWhileSet(opCtx);
+        uasserted(ErrorCodes::InternalError,
+                  "simulate an error response when persisting migrate commit decision");
+    }
 }
 
 void persistAbortDecision(OperationContext* opCtx, const UUID& migrationId) {
@@ -629,23 +625,18 @@ void deleteRangeDeletionTaskOnRecipient(OperationContext* opCtx,
                                    false /*multi*/);
     deleteOp.setDeletes({query});
 
-    retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown(
-        opCtx, "cancel range deletion on recipient", [&](OperationContext* newOpCtx) {
-            hangInDeleteRangeDeletionOnRecipientInterruptible.pauseWhileSet(newOpCtx);
+    hangInDeleteRangeDeletionOnRecipientInterruptible.pauseWhileSet(opCtx);
 
-            sendToRecipient(
-                newOpCtx,
-                recipientId,
-                deleteOp,
-                BSON(WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority));
+    sendToRecipient(opCtx,
+                    recipientId,
+                    deleteOp,
+                    BSON(WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority));
 
-            if (hangInDeleteRangeDeletionOnRecipientThenSimulateErrorUninterruptible.shouldFail()) {
-                hangInDeleteRangeDeletionOnRecipientThenSimulateErrorUninterruptible.pauseWhileSet(
-                    newOpCtx);
-                uasserted(ErrorCodes::InternalError,
-                          "simulate an error response when deleting range deletion on recipient");
-            }
-        });
+    if (hangInDeleteRangeDeletionOnRecipientThenSimulateErrorUninterruptible.shouldFail()) {
+        hangInDeleteRangeDeletionOnRecipientThenSimulateErrorUninterruptible.pauseWhileSet(opCtx);
+        uasserted(ErrorCodes::InternalError,
+                  "simulate an error response when deleting range deletion on recipient");
+    }
 }
 
 void deleteRangeDeletionTaskLocally(OperationContext* opCtx,
@@ -716,17 +707,14 @@ void advanceTransactionOnRecipient(OperationContext* opCtx,
                                   << WriteConcernOptions::Majority << "lsid" << lsid.toBSON()
                                   << "txnNumber" << currentTxnNumber + 1);
 
-    retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown(
-        opCtx, "advance migration txn number", [&](OperationContext* newOpCtx) {
-            hangInAdvanceTxnNumInterruptible.pauseWhileSet(newOpCtx);
-            sendToRecipient(newOpCtx, recipientId, updateOp, passthroughFields);
+    hangInAdvanceTxnNumInterruptible.pauseWhileSet(opCtx);
+    sendToRecipient(opCtx, recipientId, updateOp, passthroughFields);
 
-            if (hangInAdvanceTxnNumThenSimulateErrorUninterruptible.shouldFail()) {
-                hangInAdvanceTxnNumThenSimulateErrorUninterruptible.pauseWhileSet(newOpCtx);
-                uasserted(ErrorCodes::InternalError,
-                          "simulate an error response when initiating range deletion locally");
-            }
-        });
+    if (hangInAdvanceTxnNumThenSimulateErrorUninterruptible.shouldFail()) {
+        hangInAdvanceTxnNumThenSimulateErrorUninterruptible.pauseWhileSet(opCtx);
+        uasserted(ErrorCodes::InternalError,
+                  "simulate an error response when initiating range deletion locally");
+    }
 }
 
 void markAsReadyRangeDeletionTaskLocally(OperationContext* opCtx, const UUID& migrationId) {
@@ -734,18 +722,14 @@ void markAsReadyRangeDeletionTaskLocally(OperationContext* opCtx, const UUID& mi
     auto query = QUERY(RangeDeletionTask::kIdFieldName << migrationId);
     auto update = BSON("$unset" << BSON(RangeDeletionTask::kPendingFieldName << ""));
 
-    retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown(
-        opCtx, "ready local range deletion", [&](OperationContext* newOpCtx) {
-            hangInReadyRangeDeletionLocallyInterruptible.pauseWhileSet(newOpCtx);
-            store.update(newOpCtx, query, update);
+    hangInReadyRangeDeletionLocallyInterruptible.pauseWhileSet(opCtx);
+    store.update(opCtx, query, update);
 
-            if (hangInReadyRangeDeletionLocallyThenSimulateErrorUninterruptible.shouldFail()) {
-                hangInReadyRangeDeletionLocallyThenSimulateErrorUninterruptible.pauseWhileSet(
-                    newOpCtx);
-                uasserted(ErrorCodes::InternalError,
-                          "simulate an error response when initiating range deletion locally");
-            }
-        });
+    if (hangInReadyRangeDeletionLocallyThenSimulateErrorUninterruptible.shouldFail()) {
+        hangInReadyRangeDeletionLocallyThenSimulateErrorUninterruptible.pauseWhileSet(opCtx);
+        uasserted(ErrorCodes::InternalError,
+                  "simulate an error response when initiating range deletion locally");
+    }
 }
 
 void deleteMigrationCoordinatorDocumentLocally(OperationContext* opCtx, const UUID& migrationId) {
@@ -767,35 +751,25 @@ void ensureChunkVersionIsGreaterThan(OperationContext* opCtx,
     const auto ensureChunkVersionIsGreaterThanRequestBSON =
         ensureChunkVersionIsGreaterThanRequest.toBSON({});
 
-    retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown(
-        opCtx, "ensureChunkVersionIsGreaterThan", [&](OperationContext* newOpCtx) {
-            hangInEnsureChunkVersionIsGreaterThanInterruptible.pauseWhileSet(newOpCtx);
-
-            const auto ensureChunkVersionIsGreaterThanResponse =
-                Grid::get(newOpCtx)
-                    ->shardRegistry()
-                    ->getConfigShard()
-                    ->runCommandWithFixedRetryAttempts(
-                        newOpCtx,
-                        ReadPreferenceSetting{ReadPreference::PrimaryOnly},
-                        "admin",
-                        CommandHelpers::appendMajorityWriteConcern(
-                            ensureChunkVersionIsGreaterThanRequestBSON),
-                        Shard::RetryPolicy::kIdempotent);
-            const auto ensureChunkVersionIsGreaterThanStatus =
-                Shard::CommandResponse::getEffectiveStatus(ensureChunkVersionIsGreaterThanResponse);
-
-            uassertStatusOK(ensureChunkVersionIsGreaterThanStatus);
-
-            if (hangInEnsureChunkVersionIsGreaterThanThenSimulateErrorUninterruptible
-                    .shouldFail()) {
-                hangInEnsureChunkVersionIsGreaterThanThenSimulateErrorUninterruptible
-                    .pauseWhileSet();
-                uasserted(
-                    ErrorCodes::InternalError,
-                    "simulate an error response for _configsvrEnsureChunkVersionIsGreaterThan");
-            }
-        });
+    hangInEnsureChunkVersionIsGreaterThanInterruptible.pauseWhileSet(opCtx);
+
+    const auto ensureChunkVersionIsGreaterThanResponse =
+        Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts(
+            opCtx,
+            ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+            "admin",
+            CommandHelpers::appendMajorityWriteConcern(ensureChunkVersionIsGreaterThanRequestBSON),
+            Shard::RetryPolicy::kIdempotent);
+    const auto ensureChunkVersionIsGreaterThanStatus =
+        Shard::CommandResponse::getEffectiveStatus(ensureChunkVersionIsGreaterThanResponse);
+
+    uassertStatusOK(ensureChunkVersionIsGreaterThanStatus);
+
+    if (hangInEnsureChunkVersionIsGreaterThanThenSimulateErrorUninterruptible.shouldFail()) {
+        hangInEnsureChunkVersionIsGreaterThanThenSimulateErrorUninterruptible.pauseWhileSet();
+        uasserted(ErrorCodes::InternalError,
+                  "simulate an error response for _configsvrEnsureChunkVersionIsGreaterThan");
+    }
 }
 
 void refreshFilteringMetadataUntilSuccess(OperationContext* opCtx, const NamespaceString& nss) {
diff --git a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp
index b4081648073..ec549acf8de 100644
--- a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp
+++ b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp
@@ -289,7 +289,7 @@ void ScopedShardVersionCriticalSection::enterCommitPhase() {
 
 Status onShardVersionMismatchNoExcept(OperationContext* opCtx,
                                       const NamespaceString& nss,
-                                      ChunkVersion shardVersionReceived) noexcept {
+                                      boost::optional<ChunkVersion> shardVersionReceived) noexcept {
     try {
         onShardVersionMismatch(opCtx, nss, shardVersionReceived);
         return Status::OK();
diff --git a/src/mongo/db/s/shard_filtering_metadata_refresh.h b/src/mongo/db/s/shard_filtering_metadata_refresh.h
index 5f29105bb8c..d475efeddf0 100644
--- a/src/mongo/db/s/shard_filtering_metadata_refresh.h
+++ b/src/mongo/db/s/shard_filtering_metadata_refresh.h
@@ -57,7 +57,7 @@ class OperationContext;
  */
 Status onShardVersionMismatchNoExcept(OperationContext* opCtx,
                                       const NamespaceString& nss,
-                                      ChunkVersion shardVersionReceived) noexcept;
+                                      boost::optional<ChunkVersion> shardVersionReceived) noexcept;
 
 void onShardVersionMismatch(OperationContext* opCtx,
                             const NamespaceString& nss,
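A side effect of the new recovery path is the relaxed signature of onShardVersionMismatchNoExcept(), which now takes boost::optional<ChunkVersion>. The call sites below are illustrative assumptions, not code from this commit; they only show the two intended uses of the optional parameter, the second being the pattern MigrationSourceManager now relies on when a commit fails.

```cpp
// Declaration after this change (from shard_filtering_metadata_refresh.h):
Status onShardVersionMismatchNoExcept(OperationContext* opCtx,
                                      const NamespaceString& nss,
                                      boost::optional<ChunkVersion> shardVersionReceived) noexcept;

// Hypothetical call sites for illustration only:
// 1. A router sent a specific shard version that did not match ours.
onShardVersionMismatchNoExcept(opCtx, nss, receivedShardVersion).ignore();
// 2. No version to compare against; just recover the latest filtering metadata.
onShardVersionMismatchNoExcept(opCtx, nss, boost::none).ignore();
```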