diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2021-11-30 19:10:32 +0100 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-12-07 12:47:28 +0000 |
commit | 7879dd15deaa37f1f3a53a46e9877ef6b03b2f97 (patch) | |
tree | 1352fa51b77412f93270f04bab756eadb161b63d | |
parent | d69f240a64dea20ca163a93b6eeb6494ea1e4a24 (diff) | |
download | mongo-7879dd15deaa37f1f3a53a46e9877ef6b03b2f97.tar.gz |
SERVER-61759 Interrupt and join migrations when allowMigrations is set to false
-rw-r--r-- | jstests/libs/chunk_manipulation_util.js | 2 | ||||
-rw-r--r-- | jstests/sharding/move_chunk_allowMigrations.js | 4 | ||||
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source.h | 2 | ||||
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source_legacy.h | 4 | ||||
-rw-r--r-- | src/mongo/db/s/migration_source_manager.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/s/migration_source_manager.h | 8 | ||||
-rw-r--r-- | src/mongo/db/s/shard_filtering_metadata_refresh.cpp | 62 | ||||
-rw-r--r-- | src/mongo/db/s/shard_server_op_observer.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_ddl_util.cpp | 1 |
10 files changed, 68 insertions, 40 deletions
diff --git a/jstests/libs/chunk_manipulation_util.js b/jstests/libs/chunk_manipulation_util.js index 8f6b05ebf5d..3a7656420a3 100644 --- a/jstests/libs/chunk_manipulation_util.js +++ b/jstests/libs/chunk_manipulation_util.js @@ -138,7 +138,7 @@ function waitForMoveChunkStep(shardConnection, stepNumber) { assert.soon(function() { var inProgressStr = ''; - let in_progress = admin.aggregate([{$currentOp: {'allUsers': true}}]); + let in_progress = admin.aggregate([{$currentOp: {allUsers: true, idleConnections: true}}]); while (in_progress.hasNext()) { let op = in_progress.next(); diff --git a/jstests/sharding/move_chunk_allowMigrations.js b/jstests/sharding/move_chunk_allowMigrations.js index c07d3b9c0fa..83ca08daeeb 100644 --- a/jstests/sharding/move_chunk_allowMigrations.js +++ b/jstests/sharding/move_chunk_allowMigrations.js @@ -108,12 +108,12 @@ function setUpDatabaseAndEnableSharding(dbName) { assert.commandWorked(st.s.getDB(dbName).getCollection(collName).insert({_id: 1})); // Confirm that an inProgress moveChunk fails once {allowMigrations: false} - const fp = configureFailPoint(st.shard1, "migrateThreadHangAtStep4"); + const fp = configureFailPoint(st.shard0, "moveChunkHangAtStep4"); const awaitResult = startParallelShell( funWithArgs(function(ns, toShardName) { assert.commandFailedWithCode( db.adminCommand({moveChunk: ns, find: {_id: 0}, to: toShardName}), - ErrorCodes.ConflictingOperationInProgress); + [ErrorCodes.ConflictingOperationInProgress, ErrorCodes.Interrupted]); }, ns, st.shard1.shardName), st.s.port); fp.wait(); assert.commandWorked(st.configRS.getPrimary().adminCommand( diff --git a/src/mongo/db/s/migration_chunk_cloner_source.h b/src/mongo/db/s/migration_chunk_cloner_source.h index a911c1c5efe..9c1a2e908f4 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source.h +++ b/src/mongo/db/s/migration_chunk_cloner_source.h @@ -113,7 +113,7 @@ public: * * NOTE: Must be called without any locks. */ - virtual void cancelClone(OperationContext* opCtx) = 0; + virtual void cancelClone(OperationContext* opCtx) noexcept = 0; // These methods are only meaningful for the legacy cloner and they are used as a way to keep a // running list of changes, which need to be fetched. diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index bc827b59b93..bb8eb56e260 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -358,7 +358,6 @@ StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::commitClone(OperationConte _sessionCatalogSource->onCommitCloneStarted(); } - auto responseStatus = _callRecipient(opCtx, [&] { BSONObjBuilder builder; builder.append(kRecvChunkCommit, _args.getNss().ns()); @@ -368,7 +367,7 @@ StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::commitClone(OperationConte }()); if (responseStatus.isOK()) { - _cleanup(opCtx); + _cleanup(); if (_sessionCatalogSource && _sessionCatalogSource->hasMoreOplog()) { return {ErrorCodes::SessionTransferIncomplete, @@ -383,7 +382,7 @@ StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::commitClone(OperationConte return responseStatus.getStatus(); } -void MigrationChunkClonerSourceLegacy::cancelClone(OperationContext* opCtx) { +void MigrationChunkClonerSourceLegacy::cancelClone(OperationContext* opCtx) noexcept { invariant(!opCtx->lockState()->isLocked()); if (_sessionCatalogSource) { @@ -407,7 +406,7 @@ void MigrationChunkClonerSourceLegacy::cancelClone(OperationContext* opCtx) { } // Intentional fall through case kNew: - _cleanup(opCtx); + _cleanup(); break; default: MONGO_UNREACHABLE; @@ -778,7 +777,7 @@ Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* opCtx, return Status::OK(); } -void MigrationChunkClonerSourceLegacy::_cleanup(OperationContext* opCtx) { +void MigrationChunkClonerSourceLegacy::_cleanup() { stdx::unique_lock<Latch> lk(_mutex); _state = kDone; diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h index 08d8243acfe..f7c29efc16b 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h @@ -102,7 +102,7 @@ public: StatusWith<BSONObj> commitClone(OperationContext* opCtx, bool acquireCSOnRecipient) override; - void cancelClone(OperationContext* opCtx) override; + void cancelClone(OperationContext* opCtx) noexcept override; bool isDocumentInMigratingChunk(const BSONObj& doc) override; @@ -214,7 +214,7 @@ private: * Idempotent method, which cleans up any previously initialized state. It is safe to be called * at any time, but no methods should be called after it. */ - void _cleanup(OperationContext* opCtx); + void _cleanup(); /** * Synchronously invokes the recipient shard with the specified command and either returns the diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp index 7ee42d27e1b..0a0f0ab82e4 100644 --- a/src/mongo/db/s/migration_source_manager.cpp +++ b/src/mongo/db/s/migration_source_manager.cpp @@ -250,6 +250,8 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx, MigrationSourceManager::~MigrationSourceManager() { invariant(!_cloneDriver); _stats.totalDonorMoveChunkTimeMillis.addAndFetch(_entireOpTimer.millis()); + + _completion.emplaceValue(); } void MigrationSourceManager::startClone() { @@ -337,7 +339,7 @@ void MigrationSourceManager::awaitToCatchUp() { _state = kCloneCaughtUp; _moveTimingHelper.done(4); - moveChunkHangAtStep4.pauseWhileSet(); + moveChunkHangAtStep4.pauseWhileSet(_opCtx); scopedGuard.dismiss(); } @@ -596,7 +598,7 @@ void MigrationSourceManager::commitChunkMetadataOnConfig() { moveChunkHangAtStep6.pauseWhileSet(); } -void MigrationSourceManager::_cleanupOnError() { +void MigrationSourceManager::_cleanupOnError() noexcept { if (_state == kDone) { return; } @@ -612,10 +614,12 @@ void MigrationSourceManager::_cleanupOnError() { _cleanup(true); } -void MigrationSourceManager::abortDueToConflictingIndexOperation(OperationContext* opCtx) { +SharedSemiFuture<void> MigrationSourceManager::abort() { stdx::lock_guard<Client> lk(*_opCtx->getClient()); _opCtx->markKilled(); _stats.countDonorMoveChunkAbortConflictingIndexOperation.addAndFetch(1); + + return _completion.getFuture(); } CollectionMetadata MigrationSourceManager::_getCurrentMetadataAndCheckEpoch() { diff --git a/src/mongo/db/s/migration_source_manager.h b/src/mongo/db/s/migration_source_manager.h index bc7462fb875..d51d6265867 100644 --- a/src/mongo/db/s/migration_source_manager.h +++ b/src/mongo/db/s/migration_source_manager.h @@ -162,7 +162,7 @@ public: * Aborts the migration after observing a concurrent index operation by marking its operation * context as killed. */ - void abortDueToConflictingIndexOperation(OperationContext* opCtx); + SharedSemiFuture<void> abort(); /** * Returns a report on the active migration. @@ -207,7 +207,7 @@ private: * Expected state: Any * Resulting state: kDone */ - void _cleanupOnError(); + void _cleanupOnError() noexcept; // This is the opCtx of the moveChunk request that constructed the MigrationSourceManager. // The caller must guarantee it outlives the MigrationSourceManager. @@ -237,6 +237,10 @@ private: // Utility for constructing detailed logs for the steps of the chunk migration MoveTimingHelper _moveTimingHelper; + // Promise which will be signaled when the migration source manager has finished running and is + // ready to be destroyed + SharedPromise<void> _completion; + // Starts counting from creation time and is used to time various parts from the lifetime of the // move chunk sequence Timer _cloneAndCommitTimer; diff --git a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp index f16739f660d..14902d2ef0d 100644 --- a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp +++ b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp @@ -39,6 +39,7 @@ #include "mongo/db/operation_context.h" #include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/database_sharding_state.h" +#include "mongo/db/s/migration_source_manager.h" #include "mongo/db/s/migration_util.h" #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/resharding/resharding_donor_recipient_common.h" @@ -52,12 +53,12 @@ #include "mongo/util/fail_point.h" namespace mongo { +namespace { MONGO_FAIL_POINT_DEFINE(skipDatabaseVersionMetadataRefresh); MONGO_FAIL_POINT_DEFINE(skipShardFilteringMetadataRefresh); MONGO_FAIL_POINT_DEFINE(hangInRecoverRefreshThread); -namespace { void onDbVersionMismatch(OperationContext* opCtx, const StringData dbName, const DatabaseVersion& clientDbVersion, @@ -149,8 +150,7 @@ SharedSemiFuture<void> recoverRefreshShardVersion(ServiceContext* serviceContext // A view can potentially be created after spawning a thread to recover nss's shard // version. It is then ok to lock views in order to clear filtering metadata. // - // DBLock and CollectionLock are used here to avoid throwing further recursive stale - // config errors. + // DBLock and CollectionLock must be used in order to avoid shard version checks Lock::DBLock dbLock(opCtx, nss.db(), MODE_IX); Lock::CollectionLock collLock(opCtx, nss, MODE_IX); @@ -174,6 +174,28 @@ SharedSemiFuture<void> recoverRefreshShardVersion(ServiceContext* serviceContext auto currentMetadata = forceGetCurrentMetadata(opCtx, nss); if (currentMetadata.isSharded()) { + // If migrations are disallowed for the namespace, join any migrations which may be + // executing currently + if (!currentMetadata.allowMigrations()) { + boost::optional<SharedSemiFuture<void>> waitForMigrationAbort; + { + // DBLock and CollectionLock must be used in order to avoid shard version + // checks + Lock::DBLock dbLock(opCtx, nss.db(), MODE_IX); + Lock::CollectionLock collLock(opCtx, nss, MODE_IX); + + auto const& csr = CollectionShardingRuntime::get(opCtx, nss); + auto csrLock = CollectionShardingRuntime::CSRLock::lockShared(opCtx, csr); + if (auto msm = MigrationSourceManager::get(csr, csrLock)) { + waitForMigrationAbort.emplace(msm->abort()); + } + } + + if (waitForMigrationAbort) { + waitForMigrationAbort->get(opCtx); + } + } + // If the collection metadata after a refresh has 'reshardingFields', then pass it // to the resharding subsystem to process. const auto& reshardingFields = currentMetadata.getReshardingFields(); @@ -212,13 +234,13 @@ void onShardVersionMismatch(OperationContext* opCtx, "namespace"_attr = nss, "shardVersionReceived"_attr = shardVersionReceived); - // If we are in a transaction, limit the time we can wait behind the critical section. This is - // needed in order to prevent distributed deadlocks in situations where a DDL operation needs to - // acquire the critical section on several shards. In that case, a shard running a transaction - // could be waiting for the critical section to be exited, while on another shard the - // transaction has already executed some statement and stashed locks which prevent the critical - // section from being acquired in that node. Limiting the wait behind the critical section will - // ensure that the transaction will eventually get aborted. + // If we are in a transaction, limit the time we can wait behind the critical section. This + // is needed in order to prevent distributed deadlocks in situations where a DDL operation + // needs to acquire the critical section on several shards. In that case, a shard running a + // transaction could be waiting for the critical section to be exited, while on another + // shard the transaction has already executed some statement and stashed locks which prevent + // the critical section from being acquired in that node. Limiting the wait behind the + // critical section will ensure that the transaction will eventually get aborted. const auto criticalSectionMaxWait = opCtx->inMultiDocumentTransaction() ? Milliseconds(metadataRefreshInTransactionMaxWaitBehindCritSecMS.load()) : Milliseconds::max(); @@ -244,8 +266,8 @@ void onShardVersionMismatch(OperationContext* opCtx, // Check if the current shard version is fresh enough if (shardVersionReceived) { const auto currentShardVersion = metadata->getShardVersion(); - // Don't need to remotely reload if the requested version is smaller than the known - // one. This means that the remote side is behind. + // Don't need to remotely reload if the requested version is smaller than the + // known one. This means that the remote side is behind. if (shardVersionReceived->isOlderOrEqualThan(currentShardVersion)) { return; } @@ -333,9 +355,9 @@ ChunkVersion forceShardFilteringMetadataRefresh(OperationContext* opCtx, Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfoWithRefresh(opCtx, nss)); if (!cm.isSharded()) { - // DBLock and CollectionLock are used here to avoid throwing further recursive stale config - // errors, as well as a possible InvalidViewDefinition error if an invalid view is in the - // 'system.views' collection. + // DBLock and CollectionLock are used here to avoid throwing further recursive stale + // config errors, as well as a possible InvalidViewDefinition error if an invalid view + // is in the 'system.views' collection. Lock::DBLock dbLock(opCtx, nss.db(), MODE_IX); Lock::CollectionLock collLock(opCtx, nss, MODE_IX); CollectionShardingRuntime::get(opCtx, nss) @@ -344,12 +366,12 @@ ChunkVersion forceShardFilteringMetadataRefresh(OperationContext* opCtx, return ChunkVersion::UNSHARDED(); } - // Optimistic check with only IS lock in order to avoid threads piling up on the collection X - // lock below + // Optimistic check with only IS lock in order to avoid threads piling up on the collection + // X lock below { - // DBLock and CollectionLock are used here to avoid throwing further recursive stale config - // errors, as well as a possible InvalidViewDefinition error if an invalid view is in the - // 'system.views' collection. + // DBLock and CollectionLock are used here to avoid throwing further recursive stale + // config errors, as well as a possible InvalidViewDefinition error if an invalid view + // is in the 'system.views' collection. Lock::DBLock dbLock(opCtx, nss.db(), MODE_IS); Lock::CollectionLock collLock(opCtx, nss, MODE_IS); auto optMetadata = CollectionShardingRuntime::get(opCtx, nss)->getCurrentMetadataIfKnown(); diff --git a/src/mongo/db/s/shard_server_op_observer.cpp b/src/mongo/db/s/shard_server_op_observer.cpp index cff62e2d7a1..26365cb46ab 100644 --- a/src/mongo/db/s/shard_server_op_observer.cpp +++ b/src/mongo/db/s/shard_server_op_observer.cpp @@ -209,9 +209,9 @@ void incrementChunkOnInsertOrUpdate(OperationContext* opCtx, void abortOngoingMigrationIfNeeded(OperationContext* opCtx, const NamespaceString nss) { auto* const csr = CollectionShardingRuntime::get(opCtx, nss); auto csrLock = CollectionShardingRuntime::CSRLock::lockShared(opCtx, csr); - auto msm = MigrationSourceManager::get(csr, csrLock); - if (msm) { - msm->abortDueToConflictingIndexOperation(opCtx); + if (auto msm = MigrationSourceManager::get(csr, csrLock)) { + // Only interrupt the migration, but don't actually join + (void)msm->abort(); } } diff --git a/src/mongo/db/s/sharding_ddl_util.cpp b/src/mongo/db/s/sharding_ddl_util.cpp index 09db3271b07..3154339e52c 100644 --- a/src/mongo/db/s/sharding_ddl_util.cpp +++ b/src/mongo/db/s/sharding_ddl_util.cpp @@ -416,7 +416,6 @@ void stopMigrations(OperationContext* opCtx, // version to be bumped), it is safe to be retried. ); - try { uassertStatusOKWithContext( Shard::CommandResponse::getEffectiveStatus(std::move(swSetAllowMigrationsResult)), |