author     Marcos José Grillo Ramírez <marcos.grillo@mongodb.com>  2020-06-16 17:15:16 +0200
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>        2020-06-17 12:42:00 +0000
commit     1edd4798f8e72f226ad69269a8b0154b247a8049 (patch)
tree       d1304ce4dd9bb175c5c789edbaa11d469ef275ff /src/mongo/db/s
parent     1bfaa4c1ff3be51bea5e63b9b0f0f6d693d9e36c (diff)
download   mongo-1edd4798f8e72f226ad69269a8b0154b247a8049.tar.gz
SERVER-47982 Change the shard version update procedure of the migration source manager
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r--  src/mongo/db/s/migration_source_manager.cpp          |  63
-rw-r--r--  src/mongo/db/s/migration_source_manager.h            |   2
-rw-r--r--  src/mongo/db/s/migration_util.cpp                    | 136
-rw-r--r--  src/mongo/db/s/shard_filtering_metadata_refresh.cpp  |   2
-rw-r--r--  src/mongo/db/s/shard_filtering_metadata_refresh.h    |   2
5 files changed, 97 insertions, 108 deletions
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index 643aa08f353..dfdaf253b1a 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -140,7 +140,8 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx,
"requestParameters"_attr = redact(_args.toString()),
"collectionEpoch"_attr = _args.getVersionEpoch());
- // Force refresh of the metadata to ensure we have the latest
+ // Make sure the latest shard version is recovered as of the time of the invocation of the
+ // command.
onShardVersionMismatch(_opCtx, getNss(), boost::none);
// Snapshot the committed metadata from the time the migration starts
@@ -440,29 +441,43 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig() {
Shard::CommandResponse::getEffectiveStatus(commitChunkMigrationResponse);
if (!migrationCommitStatus.isOK()) {
- migrationutil::ensureChunkVersionIsGreaterThan(_opCtx, _args.getRange(), _chunkVersion);
- }
-
- migrationutil::refreshFilteringMetadataUntilSuccess(_opCtx, getNss());
-
- const auto refreshedMetadata = _getCurrentMetadataAndCheckEpoch();
+ {
+ UninterruptibleLockGuard noInterrupt(_opCtx->lockState());
+ AutoGetCollection autoColl(_opCtx, getNss(), MODE_IX);
+ auto* const csr = CollectionShardingRuntime::get(_opCtx, getNss());
+ auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(_opCtx, csr);
- if (refreshedMetadata.keyBelongsToMe(_args.getMinKey())) {
- // This condition may only happen if the migration commit has failed for any reason
- if (migrationCommitStatus.isOK()) {
- return {ErrorCodes::ConflictingOperationInProgress,
- "Migration commit succeeded but refresh found that the chunk is still owned; "
- "this node may be a stale primary of its replica set, and the new primary may "
- "have re-received the chunk"};
+ CollectionShardingRuntime::get(_opCtx, getNss())->clearFilteringMetadata();
}
+ scopedGuard.dismiss();
+ _cleanup(false);
+ // Best-effort recovery of the shard version.
+ onShardVersionMismatchNoExcept(_opCtx, getNss(), boost::none).ignore();
+ return migrationCommitStatus;
+ }
- _coordinator->setMigrationDecision(migrationutil::MigrationCoordinator::Decision::kAborted);
+ try {
+ forceShardFilteringMetadataRefresh(_opCtx, getNss(), true);
+ } catch (const DBException& ex) {
+ {
+ UninterruptibleLockGuard noInterrupt(_opCtx->lockState());
+ AutoGetCollection autoColl(_opCtx, getNss(), MODE_IX);
+ auto* const csr = CollectionShardingRuntime::get(_opCtx, getNss());
+ auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(_opCtx, csr);
- // The chunk modification was not applied, so report the original error
- return migrationCommitStatus.withContext("Chunk move was not successful");
+ CollectionShardingRuntime::get(_opCtx, getNss())->clearFilteringMetadata();
+ }
+ scopedGuard.dismiss();
+ _cleanup(false);
+ // Best-effort recovery of the shard version.
+ onShardVersionMismatchNoExcept(_opCtx, getNss(), boost::none).ignore();
+ return ex.toStatus();
}
// Migration succeeded
+
+ const auto refreshedMetadata = _getCurrentMetadataAndCheckEpoch();
+
LOGV2(22018,
"Migration succeeded and updated collection version to {updatedCollectionVersion}",
"Migration succeeded and updated collection version",
@@ -479,7 +494,7 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig() {
// Exit the critical section and ensure that all the necessary state is fully persisted before
// scheduling orphan cleanup.
- _cleanup();
+ _cleanup(true);
ShardingLogging::get(_opCtx)->logChange(
_opCtx,
@@ -537,7 +552,7 @@ void MigrationSourceManager::cleanupOnError() {
ShardingCatalogClient::kMajorityWriteConcern);
try {
- _cleanup();
+ _cleanup(true);
} catch (const DBException& ex) {
LOGV2_WARNING(22022,
"Failed to clean up migration with request parameters "
@@ -613,7 +628,7 @@ void MigrationSourceManager::_notifyChangeStreamsOnRecipientFirstChunk(
});
}
-void MigrationSourceManager::_cleanup() {
+void MigrationSourceManager::_cleanup(bool completeMigration) {
invariant(_state != kDone);
auto cloneDriver = [&]() {
@@ -668,15 +683,15 @@ void MigrationSourceManager::_cleanup() {
ShardingStateRecovery::endMetadataOp(_opCtx);
}
- if (_state >= kCloning) {
+ if (completeMigration && _state >= kCloning) {
invariant(_coordinator);
if (_state < kCommittingOnConfig) {
_coordinator->setMigrationDecision(
migrationutil::MigrationCoordinator::Decision::kAborted);
}
- // This can be called on an exception path after the OperationContext has been
- // interrupted, so use a new OperationContext. Note, it's valid to call
- // getServiceContext on an interrupted OperationContext.
+ // This can be called on an exception path after the OperationContext has been interrupted,
+ // so use a new OperationContext. Note, it's valid to call getServiceContext on an
+ // interrupted OperationContext.
auto newClient = _opCtx->getServiceContext()->makeClient("MigrationCoordinator");
{
stdx::lock_guard<Client> lk(*newClient.get());
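
Note: both failure branches in the commitChunkMetadataOnConfig() hunks above (the config server rejecting the commit, and the post-commit forceShardFilteringMetadataRefresh throwing) now share one recovery pattern. The function below is a condensed, hypothetical sketch of that pattern, not a verbatim excerpt from the tree: the name recoverShardVersionAfterFailedCommit is invented, includes are omitted, and the scopedGuard.dismiss() / _cleanup(false) bookkeeping and the returned status are elided.

    void recoverShardVersionAfterFailedCommit(OperationContext* opCtx,
                                              const NamespaceString& nss) {
        {
            // Hold the collection lock and the exclusive CSR lock, and block
            // interruption, while wiping the possibly-stale filtering metadata.
            UninterruptibleLockGuard noInterrupt(opCtx->lockState());
            AutoGetCollection autoColl(opCtx, nss, MODE_IX);
            auto* const csr = CollectionShardingRuntime::get(opCtx, nss);
            auto csrLock =
                CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr);
            csr->clearFilteringMetadata();
        }

        // Best-effort recovery of the shard version: passing boost::none asks
        // for an unconditional refresh, and any error is ignored because the
        // caller is already returning the original failure status.
        onShardVersionMismatchNoExcept(opCtx, nss, boost::none).ignore();
    }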
diff --git a/src/mongo/db/s/migration_source_manager.h b/src/mongo/db/s/migration_source_manager.h
index 5f43c9f9780..efefcda401e 100644
--- a/src/mongo/db/s/migration_source_manager.h
+++ b/src/mongo/db/s/migration_source_manager.h
@@ -214,7 +214,7 @@ private:
* Called when any of the states fails. May only be called once and will put the migration
* manager into the kDone state.
*/
- void _cleanup();
+ void _cleanup(bool completeMigration);
// This is the opCtx of the moveChunk request that constructed the MigrationSourceManager.
// The caller must guarantee it outlives the MigrationSourceManager.
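
Note: the new flag decides whether _cleanup() lets the MigrationCoordinator drive the migration to completion. A condensed, hypothetical sketch of the gating added in the .cpp hunk above; everything else the method does is elided, and the final kDone transition is inferred from the header comment rather than shown in the diff.

    void MigrationSourceManager::_cleanup(bool completeMigration) {
        invariant(_state != kDone);
        // ... shut down the clone driver, exit the critical section, and end
        // the metadata operation, exactly as before ...

        if (completeMigration && _state >= kCloning) {
            invariant(_coordinator);
            // A failure before the config server commit means the migration
            // must be recorded as aborted.
            if (_state < kCommittingOnConfig) {
                _coordinator->setMigrationDecision(
                    migrationutil::MigrationCoordinator::Decision::kAborted);
            }
            // ... complete the migration on a fresh OperationContext, since
            // this can run after the original one was interrupted ...
        }

        // Per the header comment, the manager always ends in the kDone state.
        _state = kDone;
    }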
diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp
index 9347e31f506..90c3b9a9f31 100644
--- a/src/mongo/db/s/migration_util.cpp
+++ b/src/mongo/db/s/migration_util.cpp
@@ -580,24 +580,20 @@ void persistRangeDeletionTaskLocally(OperationContext* opCtx,
}
void persistCommitDecision(OperationContext* opCtx, const UUID& migrationId) {
- retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown(
- opCtx, "persist migrate commit decision", [&](OperationContext* newOpCtx) {
- hangInPersistMigrateCommitDecisionInterruptible.pauseWhileSet(newOpCtx);
+ hangInPersistMigrateCommitDecisionInterruptible.pauseWhileSet(opCtx);
- PersistentTaskStore<MigrationCoordinatorDocument> store(
- NamespaceString::kMigrationCoordinatorsNamespace);
- store.update(newOpCtx,
- QUERY(MigrationCoordinatorDocument::kIdFieldName << migrationId),
- BSON("$set" << BSON(MigrationCoordinatorDocument::kDecisionFieldName
- << "committed")));
+ PersistentTaskStore<MigrationCoordinatorDocument> store(
+ NamespaceString::kMigrationCoordinatorsNamespace);
+ store.update(
+ opCtx,
+ QUERY(MigrationCoordinatorDocument::kIdFieldName << migrationId),
+ BSON("$set" << BSON(MigrationCoordinatorDocument::kDecisionFieldName << "committed")));
- if (hangInPersistMigrateCommitDecisionThenSimulateErrorUninterruptible.shouldFail()) {
- hangInPersistMigrateCommitDecisionThenSimulateErrorUninterruptible.pauseWhileSet(
- newOpCtx);
- uasserted(ErrorCodes::InternalError,
- "simulate an error response when persisting migrate commit decision");
- }
- });
+ if (hangInPersistMigrateCommitDecisionThenSimulateErrorUninterruptible.shouldFail()) {
+ hangInPersistMigrateCommitDecisionThenSimulateErrorUninterruptible.pauseWhileSet(opCtx);
+ uasserted(ErrorCodes::InternalError,
+ "simulate an error response when persisting migrate commit decision");
+ }
}
void persistAbortDecision(OperationContext* opCtx, const UUID& migrationId) {
@@ -629,23 +625,18 @@ void deleteRangeDeletionTaskOnRecipient(OperationContext* opCtx,
false /*multi*/);
deleteOp.setDeletes({query});
- retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown(
- opCtx, "cancel range deletion on recipient", [&](OperationContext* newOpCtx) {
- hangInDeleteRangeDeletionOnRecipientInterruptible.pauseWhileSet(newOpCtx);
+ hangInDeleteRangeDeletionOnRecipientInterruptible.pauseWhileSet(opCtx);
- sendToRecipient(
- newOpCtx,
- recipientId,
- deleteOp,
- BSON(WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority));
+ sendToRecipient(opCtx,
+ recipientId,
+ deleteOp,
+ BSON(WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority));
- if (hangInDeleteRangeDeletionOnRecipientThenSimulateErrorUninterruptible.shouldFail()) {
- hangInDeleteRangeDeletionOnRecipientThenSimulateErrorUninterruptible.pauseWhileSet(
- newOpCtx);
- uasserted(ErrorCodes::InternalError,
- "simulate an error response when deleting range deletion on recipient");
- }
- });
+ if (hangInDeleteRangeDeletionOnRecipientThenSimulateErrorUninterruptible.shouldFail()) {
+ hangInDeleteRangeDeletionOnRecipientThenSimulateErrorUninterruptible.pauseWhileSet(opCtx);
+ uasserted(ErrorCodes::InternalError,
+ "simulate an error response when deleting range deletion on recipient");
+ }
}
void deleteRangeDeletionTaskLocally(OperationContext* opCtx,
@@ -716,17 +707,14 @@ void advanceTransactionOnRecipient(OperationContext* opCtx,
<< WriteConcernOptions::Majority << "lsid" << lsid.toBSON()
<< "txnNumber" << currentTxnNumber + 1);
- retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown(
- opCtx, "advance migration txn number", [&](OperationContext* newOpCtx) {
- hangInAdvanceTxnNumInterruptible.pauseWhileSet(newOpCtx);
- sendToRecipient(newOpCtx, recipientId, updateOp, passthroughFields);
+ hangInAdvanceTxnNumInterruptible.pauseWhileSet(opCtx);
+ sendToRecipient(opCtx, recipientId, updateOp, passthroughFields);
- if (hangInAdvanceTxnNumThenSimulateErrorUninterruptible.shouldFail()) {
- hangInAdvanceTxnNumThenSimulateErrorUninterruptible.pauseWhileSet(newOpCtx);
- uasserted(ErrorCodes::InternalError,
- "simulate an error response when initiating range deletion locally");
- }
- });
+ if (hangInAdvanceTxnNumThenSimulateErrorUninterruptible.shouldFail()) {
+ hangInAdvanceTxnNumThenSimulateErrorUninterruptible.pauseWhileSet(opCtx);
+ uasserted(ErrorCodes::InternalError,
+ "simulate an error response when initiating range deletion locally");
+ }
}
void markAsReadyRangeDeletionTaskLocally(OperationContext* opCtx, const UUID& migrationId) {
@@ -734,18 +722,14 @@ void markAsReadyRangeDeletionTaskLocally(OperationContext* opCtx, const UUID& mi
auto query = QUERY(RangeDeletionTask::kIdFieldName << migrationId);
auto update = BSON("$unset" << BSON(RangeDeletionTask::kPendingFieldName << ""));
- retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown(
- opCtx, "ready local range deletion", [&](OperationContext* newOpCtx) {
- hangInReadyRangeDeletionLocallyInterruptible.pauseWhileSet(newOpCtx);
- store.update(newOpCtx, query, update);
+ hangInReadyRangeDeletionLocallyInterruptible.pauseWhileSet(opCtx);
+ store.update(opCtx, query, update);
- if (hangInReadyRangeDeletionLocallyThenSimulateErrorUninterruptible.shouldFail()) {
- hangInReadyRangeDeletionLocallyThenSimulateErrorUninterruptible.pauseWhileSet(
- newOpCtx);
- uasserted(ErrorCodes::InternalError,
- "simulate an error response when initiating range deletion locally");
- }
- });
+ if (hangInReadyRangeDeletionLocallyThenSimulateErrorUninterruptible.shouldFail()) {
+ hangInReadyRangeDeletionLocallyThenSimulateErrorUninterruptible.pauseWhileSet(opCtx);
+ uasserted(ErrorCodes::InternalError,
+ "simulate an error response when initiating range deletion locally");
+ }
}
void deleteMigrationCoordinatorDocumentLocally(OperationContext* opCtx, const UUID& migrationId) {
@@ -767,35 +751,25 @@ void ensureChunkVersionIsGreaterThan(OperationContext* opCtx,
const auto ensureChunkVersionIsGreaterThanRequestBSON =
ensureChunkVersionIsGreaterThanRequest.toBSON({});
- retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown(
- opCtx, "ensureChunkVersionIsGreaterThan", [&](OperationContext* newOpCtx) {
- hangInEnsureChunkVersionIsGreaterThanInterruptible.pauseWhileSet(newOpCtx);
-
- const auto ensureChunkVersionIsGreaterThanResponse =
- Grid::get(newOpCtx)
- ->shardRegistry()
- ->getConfigShard()
- ->runCommandWithFixedRetryAttempts(
- newOpCtx,
- ReadPreferenceSetting{ReadPreference::PrimaryOnly},
- "admin",
- CommandHelpers::appendMajorityWriteConcern(
- ensureChunkVersionIsGreaterThanRequestBSON),
- Shard::RetryPolicy::kIdempotent);
- const auto ensureChunkVersionIsGreaterThanStatus =
- Shard::CommandResponse::getEffectiveStatus(ensureChunkVersionIsGreaterThanResponse);
-
- uassertStatusOK(ensureChunkVersionIsGreaterThanStatus);
-
- if (hangInEnsureChunkVersionIsGreaterThanThenSimulateErrorUninterruptible
- .shouldFail()) {
- hangInEnsureChunkVersionIsGreaterThanThenSimulateErrorUninterruptible
- .pauseWhileSet();
- uasserted(
- ErrorCodes::InternalError,
- "simulate an error response for _configsvrEnsureChunkVersionIsGreaterThan");
- }
- });
+ hangInEnsureChunkVersionIsGreaterThanInterruptible.pauseWhileSet(opCtx);
+
+ const auto ensureChunkVersionIsGreaterThanResponse =
+ Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts(
+ opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ "admin",
+ CommandHelpers::appendMajorityWriteConcern(ensureChunkVersionIsGreaterThanRequestBSON),
+ Shard::RetryPolicy::kIdempotent);
+ const auto ensureChunkVersionIsGreaterThanStatus =
+ Shard::CommandResponse::getEffectiveStatus(ensureChunkVersionIsGreaterThanResponse);
+
+ uassertStatusOK(ensureChunkVersionIsGreaterThanStatus);
+
+ if (hangInEnsureChunkVersionIsGreaterThanThenSimulateErrorUninterruptible.shouldFail()) {
+ hangInEnsureChunkVersionIsGreaterThanThenSimulateErrorUninterruptible.pauseWhileSet();
+ uasserted(ErrorCodes::InternalError,
+ "simulate an error response for _configsvrEnsureChunkVersionIsGreaterThan");
+ }
}
void refreshFilteringMetadataUntilSuccess(OperationContext* opCtx, const NamespaceString& nss) {
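
Note: every helper changed in this file changes the same way: its body is unwrapped from retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown and now executes exactly once on the caller's own opCtx. A schematic before/after using persistCommitDecision as the example; query and update stand in for the BSON arguments shown in the hunk, and the failpoint logic is elided.

    // Before: the write ran inside a retry loop, on a fresh OperationContext
    // (newOpCtx) supplied by the retry helper, until success or stepdown.
    retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown(
        opCtx, "persist migrate commit decision", [&](OperationContext* newOpCtx) {
            PersistentTaskStore<MigrationCoordinatorDocument> store(
                NamespaceString::kMigrationCoordinatorsNamespace);
            store.update(newOpCtx, query, update);
        });

    // After: one attempt on the caller's opCtx; interruption and errors now
    // propagate to the caller instead of being retried until stepdown.
    PersistentTaskStore<MigrationCoordinatorDocument> store(
        NamespaceString::kMigrationCoordinatorsNamespace);
    store.update(opCtx, query, update);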
diff --git a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp
index b4081648073..ec549acf8de 100644
--- a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp
+++ b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp
@@ -289,7 +289,7 @@ void ScopedShardVersionCriticalSection::enterCommitPhase() {
Status onShardVersionMismatchNoExcept(OperationContext* opCtx,
const NamespaceString& nss,
- ChunkVersion shardVersionReceived) noexcept {
+ boost::optional<ChunkVersion> shardVersionReceived) noexcept {
try {
onShardVersionMismatch(opCtx, nss, shardVersionReceived);
return Status::OK();
diff --git a/src/mongo/db/s/shard_filtering_metadata_refresh.h b/src/mongo/db/s/shard_filtering_metadata_refresh.h
index 5f29105bb8c..d475efeddf0 100644
--- a/src/mongo/db/s/shard_filtering_metadata_refresh.h
+++ b/src/mongo/db/s/shard_filtering_metadata_refresh.h
@@ -57,7 +57,7 @@ class OperationContext;
*/
Status onShardVersionMismatchNoExcept(OperationContext* opCtx,
const NamespaceString& nss,
- ChunkVersion shardVersionReceived) noexcept;
+ boost::optional<ChunkVersion> shardVersionReceived) noexcept;
void onShardVersionMismatch(OperationContext* opCtx,
const NamespaceString& nss,
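
Note: the widened signature is what lets the migration source manager call sites above pass boost::none. A brief, hypothetical usage sketch, assuming only the declarations in this header:

    // A concrete version was received (e.g. from a router): hand it to the
    // refresh machinery so it can decide whether the locally installed
    // filtering metadata is stale.
    onShardVersionMismatchNoExcept(opCtx, nss, shardVersionReceived).ignore();

    // No version in hand (e.g. recovering after a failed migration commit):
    // boost::none requests an unconditional, best-effort recovery of the
    // latest shard version.
    onShardVersionMismatchNoExcept(opCtx, nss, boost::none).ignore();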