author | Cheahuychou Mao <cheahuychou.mao@mongodb.com> | 2019-12-11 21:46:37 +0000
---|---|---
committer | evergreen <evergreen@mongodb.com> | 2019-12-11 21:46:37 +0000
commit | 6dfa4748f017315300a32982b63fe71e8a68d42d (patch) |
tree | bb089ddc8ccd769339bb1dfc2263f47020db86fe /src |
parent | 57acc8b666b8c9dfc34eaf03c226ab26ac225781 (diff) |
download | mongo-6dfa4748f017315300a32982b63fe71e8a68d42d.tar.gz |
SERVER-44911 Make index operations abort concurrent outgoing migrations
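For orientation, here is a minimal, self-contained sketch of the control flow this commit wires up: an index DDL hook on the shard server looks up the active outgoing migration for the namespace and kills the operation context driving the moveChunk. The names `abortOngoingMigration`, `abortDueToConflictingIndexOperation`, and `markKilled` come from the diff below; every other type here is a simplified stand-in for illustration, not the real server API.

```cpp
// Simplified, self-contained model of the pattern this commit implements: index
// DDL hooks on the shard-server op observer look up any active outgoing migration
// for the namespace and kill the operation context that drives it. The real server
// types (OperationContext, CollectionShardingRuntime, ...) are stand-ins here;
// only the overall control flow mirrors the diff.
#include <atomic>
#include <iostream>
#include <map>
#include <string>

class OperationContext {
public:
    void markKilled() { _killed.store(true); }   // analogous to opCtx->markKilled()
    bool isKilled() const { return _killed.load(); }
private:
    std::atomic<bool> _killed{false};
};

class MigrationSourceManager {
public:
    explicit MigrationSourceManager(OperationContext* opCtx) : _opCtx(opCtx) {}
    // Mirrors MigrationSourceManager::abortDueToConflictingIndexOperation() in the diff.
    void abortDueToConflictingIndexOperation() { _opCtx->markKilled(); }
private:
    OperationContext* const _opCtx;  // the moveChunk command's opCtx; must outlive the manager
};

// Stand-in for the per-collection sharding runtime that tracks the active migration.
class CollectionShardingRuntime {
public:
    static std::map<std::string, MigrationSourceManager*>& registry() {
        static std::map<std::string, MigrationSourceManager*> r;
        return r;
    }
};

// Mirrors the free function added to shard_server_op_observer.cpp: called from
// onStartIndexBuild, onStartIndexBuildSinglePhase, onDropIndex and onCollMod.
void abortOngoingMigration(OperationContext*, const std::string& nss) {
    auto& reg = CollectionShardingRuntime::registry();
    auto it = reg.find(nss);
    if (it != reg.end() && it->second)
        it->second->abortDueToConflictingIndexOperation();
}

int main() {
    OperationContext moveChunkOpCtx;
    MigrationSourceManager msm(&moveChunkOpCtx);
    CollectionShardingRuntime::registry()["test.coll"] = &msm;

    // An index build starting on the same namespace aborts the outgoing migration.
    OperationContext indexBuildOpCtx;
    abortOngoingMigration(&indexBuildOpCtx, "test.coll");

    std::cout << "migration opCtx killed: " << std::boolalpha
              << moveChunkOpCtx.isKilled() << '\n';  // prints: true
}
```

In the actual change the lookup goes through `CollectionShardingRuntime` under a shared CSR lock, and the killed opCtx makes the moveChunk fail at its next interruption point, at which point its `cleanupOnError()` path runs.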
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/auth/auth_op_observer.h | 2
-rw-r--r-- | src/mongo/db/catalog/multi_index_block.cpp | 24
-rw-r--r-- | src/mongo/db/free_mon/free_mon_op_observer.h | 2
-rw-r--r-- | src/mongo/db/op_observer.h | 6
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 31
-rw-r--r-- | src/mongo/db/op_observer_impl.h | 1
-rw-r--r-- | src/mongo/db/op_observer_noop.h | 3
-rw-r--r-- | src/mongo/db/op_observer_registry.h | 8
-rw-r--r-- | src/mongo/db/s/config_server_op_observer.h | 3
-rw-r--r-- | src/mongo/db/s/migration_source_manager.cpp | 186
-rw-r--r-- | src/mongo/db/s/migration_source_manager.h | 29
-rw-r--r-- | src/mongo/db/s/move_chunk_command.cpp | 10
-rw-r--r-- | src/mongo/db/s/shard_server_op_observer.cpp | 45
-rw-r--r-- | src/mongo/db/s/shard_server_op_observer.h | 8
14 files changed, 226 insertions, 132 deletions
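The other half of the change is plumbing: a new `onStartIndexBuildSinglePhase()` hook is added to the `OpObserver` interface so that `OpObserverImpl` (which now owns the timestamping no-op oplog write previously done inline in `MultiIndexBlock`) and `ShardServerOpObserver` (which aborts migrations) are both notified. A compilable toy sketch of that registry fan-out, with stand-in types and assuming nothing beyond what the diff below shows:

```cpp
// Toy model of the OpObserverRegistry fan-out used in the diff below: every
// registered observer receives onStartIndexBuildSinglePhase(), so the oplog
// writer and the shard-server observer can react independently. All types are
// simplified stand-ins for illustration.
#include <iostream>
#include <memory>
#include <string>
#include <vector>

struct OpObserver {
    virtual ~OpObserver() = default;
    virtual void onStartIndexBuildSinglePhase(const std::string& nss) = 0;
};

// Corresponds to OpObserverImpl: writes the "Creating indexes" no-op oplog entry.
struct OplogWritingObserver : OpObserver {
    void onStartIndexBuildSinglePhase(const std::string& nss) override {
        std::cout << "no-op oplog entry: Creating indexes. Coll: " << nss << '\n';
    }
};

// Corresponds to ShardServerOpObserver: aborts a concurrent outgoing migration.
struct ShardServerObserver : OpObserver {
    void onStartIndexBuildSinglePhase(const std::string& nss) override {
        std::cout << "aborting ongoing migration for " << nss << '\n';
    }
};

// Corresponds to OpObserverRegistry: forwards each event to every observer.
struct OpObserverRegistry : OpObserver {
    void addObserver(std::unique_ptr<OpObserver> o) { _observers.push_back(std::move(o)); }
    void onStartIndexBuildSinglePhase(const std::string& nss) override {
        for (auto& o : _observers)
            o->onStartIndexBuildSinglePhase(nss);
    }
    std::vector<std::unique_ptr<OpObserver>> _observers;
};

int main() {
    OpObserverRegistry registry;
    registry.addObserver(std::make_unique<OplogWritingObserver>());
    registry.addObserver(std::make_unique<ShardServerObserver>());
    // A single-phase index build on a sharded collection triggers both behaviors.
    registry.onStartIndexBuildSinglePhase("test.coll");
}
```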
diff --git a/src/mongo/db/auth/auth_op_observer.h b/src/mongo/db/auth/auth_op_observer.h index 931cb7e0747..7287876df0b 100644 --- a/src/mongo/db/auth/auth_op_observer.h +++ b/src/mongo/db/auth/auth_op_observer.h @@ -58,6 +58,8 @@ public: const std::vector<BSONObj>& indexes, bool fromMigrate) final {} + void onStartIndexBuildSinglePhase(OperationContext* opCtx, const NamespaceString& nss) final {} + void onCommitIndexBuild(OperationContext* opCtx, const NamespaceString& nss, CollectionUUID collUUID, diff --git a/src/mongo/db/catalog/multi_index_block.cpp b/src/mongo/db/catalog/multi_index_block.cpp index f60bbc40147..6ee21034efc 100644 --- a/src/mongo/db/catalog/multi_index_block.cpp +++ b/src/mongo/db/catalog/multi_index_block.cpp @@ -186,29 +186,7 @@ MultiIndexBlock::OnInitFn MultiIndexBlock::kNoopOnInitFn = MultiIndexBlock::OnInitFn MultiIndexBlock::makeTimestampedIndexOnInitFn(OperationContext* opCtx, const Collection* coll) { return [opCtx, ns = coll->ns()](std::vector<BSONObj>& specs) -> Status { - // This function sets a timestamp for the initial catalog write when beginning an index - // build, if necessary. There are four scenarios: - - // 1. A timestamp is already set -- replication application sets a timestamp ahead of time. - // This could include the phase of initial sync where it applies oplog entries. Also, - // primaries performing an index build via `applyOps` may have a wrapping commit timestamp. - if (!opCtx->recoveryUnit()->getCommitTimestamp().isNull()) - return Status::OK(); - - // 2. If the node is initial syncing, we do not set a timestamp. - auto replCoord = repl::ReplicationCoordinator::get(opCtx); - if (replCoord->isReplEnabled() && replCoord->getMemberState().startup2()) - return Status::OK(); - - // 3. If the index build is on the local database, do not timestamp. - if (ns.isLocal()) - return Status::OK(); - - // 4. All other cases, we generate a timestamp by writing a no-op oplog entry. This is - // better than using a ghost timestamp. Writing an oplog entry ensures this node is - // primary. - opCtx->getServiceContext()->getOpObserver()->onOpMessage( - opCtx, BSON("msg" << std::string(str::stream() << "Creating indexes. Coll: " << ns))); + opCtx->getServiceContext()->getOpObserver()->onStartIndexBuildSinglePhase(opCtx, ns); return Status::OK(); }; } diff --git a/src/mongo/db/free_mon/free_mon_op_observer.h b/src/mongo/db/free_mon/free_mon_op_observer.h index 00c565d0dea..3220c2b3985 100644 --- a/src/mongo/db/free_mon/free_mon_op_observer.h +++ b/src/mongo/db/free_mon/free_mon_op_observer.h @@ -58,6 +58,8 @@ public: const std::vector<BSONObj>& indexes, bool fromMigrate) final {} + void onStartIndexBuildSinglePhase(OperationContext* opCtx, const NamespaceString& nss) final {} + void onCommitIndexBuild(OperationContext* opCtx, const NamespaceString& nss, CollectionUUID collUUID, diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h index acb478fc68c..2c98b853cd9 100644 --- a/src/mongo/db/op_observer.h +++ b/src/mongo/db/op_observer.h @@ -101,6 +101,12 @@ public: const std::vector<BSONObj>& indexes, bool fromMigrate) = 0; + /** + * TODO (SERVER-45017): Remove when v4.4 becomes last-stable. 
+ */ + virtual void onStartIndexBuildSinglePhase(OperationContext* opCtx, + const NamespaceString& nss) = 0; + virtual void onCommitIndexBuild(OperationContext* opCtx, const NamespaceString& nss, CollectionUUID collUUID, diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 1b012b4a8ad..b47dabd1d01 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -281,6 +281,37 @@ void OpObserverImpl::onStartIndexBuild(OperationContext* opCtx, logOperation(opCtx, &oplogEntry); } +void OpObserverImpl::onStartIndexBuildSinglePhase(OperationContext* opCtx, + const NamespaceString& nss) { + // This function sets a timestamp for the initial catalog write when beginning an index + // build, if necessary. There are four scenarios: + + // 1. A timestamp is already set -- replication application sets a timestamp ahead of time. + // This could include the phase of initial sync where it applies oplog entries. Also, + // primaries performing an index build via `applyOps` may have a wrapping commit timestamp. + if (!opCtx->recoveryUnit()->getCommitTimestamp().isNull()) + return; + + // 2. If the node is initial syncing, we do not set a timestamp. + auto replCoord = repl::ReplicationCoordinator::get(opCtx); + if (replCoord->isReplEnabled() && replCoord->getMemberState().startup2()) + return; + + // 3. If the index build is on the local database, do not timestamp. + if (nss.isLocal()) + return; + + // 4. All other cases, we generate a timestamp by writing a no-op oplog entry. This is + // better than using a ghost timestamp. Writing an oplog entry ensures this node is + // primary. + onInternalOpMessage( + opCtx, + {}, + boost::none, + BSON("msg" << std::string(str::stream() << "Creating indexes. Coll: " << nss)), + boost::none); +} + void OpObserverImpl::onCommitIndexBuild(OperationContext* opCtx, const NamespaceString& nss, CollectionUUID collUUID, diff --git a/src/mongo/db/op_observer_impl.h b/src/mongo/db/op_observer_impl.h index 1a512eb7f3a..b2d09b6d58c 100644 --- a/src/mongo/db/op_observer_impl.h +++ b/src/mongo/db/op_observer_impl.h @@ -53,6 +53,7 @@ public: const UUID& indexBuildUUID, const std::vector<BSONObj>& indexes, bool fromMigrate) final; + void onStartIndexBuildSinglePhase(OperationContext* opCtx, const NamespaceString& nss) final; void onCommitIndexBuild(OperationContext* opCtx, const NamespaceString& nss, diff --git a/src/mongo/db/op_observer_noop.h b/src/mongo/db/op_observer_noop.h index b45fe089338..e22a3d9097b 100644 --- a/src/mongo/db/op_observer_noop.h +++ b/src/mongo/db/op_observer_noop.h @@ -48,6 +48,9 @@ public: const std::vector<BSONObj>& indexes, bool fromMigrate) override {} + void onStartIndexBuildSinglePhase(OperationContext* opCtx, + const NamespaceString& nss) override {} + void onCommitIndexBuild(OperationContext* opCtx, const NamespaceString& nss, CollectionUUID collUUID, diff --git a/src/mongo/db/op_observer_registry.h b/src/mongo/db/op_observer_registry.h index 12370e0b561..8da173e1c91 100644 --- a/src/mongo/db/op_observer_registry.h +++ b/src/mongo/db/op_observer_registry.h @@ -80,6 +80,14 @@ public: } } + virtual void onStartIndexBuildSinglePhase(OperationContext* opCtx, + const NamespaceString& nss) override { + ReservedTimes times{opCtx}; + for (auto& o : _observers) { + o->onStartIndexBuildSinglePhase(opCtx, nss); + } + } + virtual void onCommitIndexBuild(OperationContext* opCtx, const NamespaceString& nss, CollectionUUID collUUID, diff --git a/src/mongo/db/s/config_server_op_observer.h 
b/src/mongo/db/s/config_server_op_observer.h index 89c420b356c..38f31c6caef 100644 --- a/src/mongo/db/s/config_server_op_observer.h +++ b/src/mongo/db/s/config_server_op_observer.h @@ -58,6 +58,9 @@ public: const std::vector<BSONObj>& indexes, bool fromMigrate) override {} + void onStartIndexBuildSinglePhase(OperationContext* opCtx, + const NamespaceString& nss) override {} + void onCommitIndexBuild(OperationContext* opCtx, const NamespaceString& nss, CollectionUUID collUUID, diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp index 42583b3baaa..31f451fa00d 100644 --- a/src/mongo/db/s/migration_source_manager.cpp +++ b/src/mongo/db/s/migration_source_manager.cpp @@ -134,11 +134,12 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx, MoveChunkRequest request, ConnectionString donorConnStr, HostAndPort recipientHost) - : _args(std::move(request)), + : _opCtx(opCtx), + _args(std::move(request)), _donorConnStr(std::move(donorConnStr)), _recipientHost(std::move(recipientHost)), - _stats(ShardingStatistics::get(opCtx)) { - invariant(!opCtx->lockState()->isLocked()); + _stats(ShardingStatistics::get(_opCtx)) { + invariant(!_opCtx->lockState()->isLocked()); // Disallow moving a chunk to ourselves uassert(ErrorCodes::InvalidOptions, @@ -149,12 +150,12 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx, << " with expected collection version epoch " << _args.getVersionEpoch(); // Force refresh of the metadata to ensure we have the latest - forceShardFilteringMetadataRefresh(opCtx, getNss()); + forceShardFilteringMetadataRefresh(_opCtx, getNss()); // Snapshot the committed metadata from the time the migration starts const auto collectionMetadataAndUUID = [&] { - UninterruptibleLockGuard noInterrupt(opCtx->lockState()); - AutoGetCollection autoColl(opCtx, getNss(), MODE_IS); + UninterruptibleLockGuard noInterrupt(_opCtx->lockState()); + AutoGetCollection autoColl(_opCtx, getNss(), MODE_IS); uassert(ErrorCodes::InvalidOptions, "cannot move chunks for a collection that doesn't exist", autoColl.getCollection()); @@ -163,7 +164,7 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx, collectionUUID = autoColl.getCollection()->uuid(); auto optMetadata = - CollectionShardingState::get(opCtx, getNss())->getCurrentMetadataIfKnown(); + CollectionShardingState::get(_opCtx, getNss())->getCurrentMetadataIfKnown(); uassert(ErrorCodes::ConflictingOperationInProgress, "The collection's sharding state was cleared by a concurrent operation", optMetadata); @@ -215,14 +216,14 @@ NamespaceString MigrationSourceManager::getNss() const { return _args.getNss(); } -Status MigrationSourceManager::startClone(OperationContext* opCtx) { - invariant(!opCtx->lockState()->isLocked()); +Status MigrationSourceManager::startClone() { + invariant(!_opCtx->lockState()->isLocked()); invariant(_state == kCreated); - auto scopedGuard = makeGuard([&] { cleanupOnError(opCtx); }); + auto scopedGuard = makeGuard([&] { cleanupOnError(); }); _stats.countDonorMoveChunkStarted.addAndFetch(1); - const Status logStatus = ShardingLogging::get(opCtx)->logChangeChecked( - opCtx, + const Status logStatus = ShardingLogging::get(_opCtx)->logChangeChecked( + _opCtx, "moveChunk.start", getNss().ns(), BSON("min" << _args.getMinKey() << "max" << _args.getMaxKey() << "from" @@ -234,11 +235,11 @@ Status MigrationSourceManager::startClone(OperationContext* opCtx) { _cloneAndCommitTimer.reset(); - auto replCoord = 
repl::ReplicationCoordinator::get(opCtx); + auto replCoord = repl::ReplicationCoordinator::get(_opCtx); auto replEnabled = replCoord->isReplEnabled(); { - const auto metadata = _getCurrentMetadataAndCheckEpoch(opCtx); + const auto metadata = _getCurrentMetadataAndCheckEpoch(); // Having the metadata manager registered on the collection sharding state is what indicates // that a chunk on that collection is being migrated. With an active migration, write @@ -249,23 +250,23 @@ Status MigrationSourceManager::startClone(OperationContext* opCtx) { boost::optional<AutoGetCollection> autoColl; if (replEnabled) { - autoColl.emplace(opCtx, + autoColl.emplace(_opCtx, getNss(), MODE_IX, AutoGetCollection::ViewMode::kViewsForbidden, - opCtx->getServiceContext()->getPreciseClockSource()->now() + + _opCtx->getServiceContext()->getPreciseClockSource()->now() + Milliseconds(migrationLockAcquisitionMaxWaitMS.load())); } else { - autoColl.emplace(opCtx, + autoColl.emplace(_opCtx, getNss(), MODE_X, AutoGetCollection::ViewMode::kViewsForbidden, - opCtx->getServiceContext()->getPreciseClockSource()->now() + + _opCtx->getServiceContext()->getPreciseClockSource()->now() + Milliseconds(migrationLockAcquisitionMaxWaitMS.load())); } - auto csr = CollectionShardingRuntime::get(opCtx, getNss()); - auto lockedCsr = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr); + auto csr = CollectionShardingRuntime::get(_opCtx, getNss()); + auto lockedCsr = CollectionShardingRuntime::CSRLock::lockExclusive(_opCtx, csr); invariant(nullptr == std::exchange(msmForCsr(csr), this)); _state = kCloning; @@ -275,12 +276,12 @@ Status MigrationSourceManager::startClone(OperationContext* opCtx) { auto const readConcernArgs = repl::ReadConcernArgs( replCoord->getMyLastAppliedOpTime(), repl::ReadConcernLevel::kLocalReadConcern); - uassertStatusOK(waitForReadConcern(opCtx, readConcernArgs, false)); + uassertStatusOK(waitForReadConcern(_opCtx, readConcernArgs, false)); setPrepareConflictBehaviorForReadConcern( - opCtx, readConcernArgs, PrepareConflictBehavior::kEnforce); + _opCtx, readConcernArgs, PrepareConflictBehavior::kEnforce); } - Status startCloneStatus = _cloneDriver->startClone(opCtx); + Status startCloneStatus = _cloneDriver->startClone(_opCtx); if (!startCloneStatus.isOK()) { return startCloneStatus; } @@ -289,16 +290,16 @@ Status MigrationSourceManager::startClone(OperationContext* opCtx) { return Status::OK(); } -Status MigrationSourceManager::awaitToCatchUp(OperationContext* opCtx) { - invariant(!opCtx->lockState()->isLocked()); +Status MigrationSourceManager::awaitToCatchUp() { + invariant(!_opCtx->lockState()->isLocked()); invariant(_state == kCloning); - auto scopedGuard = makeGuard([&] { cleanupOnError(opCtx); }); + auto scopedGuard = makeGuard([&] { cleanupOnError(); }); _stats.totalDonorChunkCloneTimeMillis.addAndFetch(_cloneAndCommitTimer.millis()); _cloneAndCommitTimer.reset(); // Block until the cloner deems it appropriate to enter the critical section. 
Status catchUpStatus = _cloneDriver->awaitUntilCriticalSectionIsAppropriate( - opCtx, kMaxWaitToEnterCriticalSectionTimeout); + _opCtx, kMaxWaitToEnterCriticalSectionTimeout); if (!catchUpStatus.isOK()) { return catchUpStatus; } @@ -308,26 +309,26 @@ Status MigrationSourceManager::awaitToCatchUp(OperationContext* opCtx) { return Status::OK(); } -Status MigrationSourceManager::enterCriticalSection(OperationContext* opCtx) { - invariant(!opCtx->lockState()->isLocked()); +Status MigrationSourceManager::enterCriticalSection() { + invariant(!_opCtx->lockState()->isLocked()); invariant(_state == kCloneCaughtUp); - auto scopedGuard = makeGuard([&] { cleanupOnError(opCtx); }); + auto scopedGuard = makeGuard([&] { cleanupOnError(); }); _stats.totalDonorChunkCloneTimeMillis.addAndFetch(_cloneAndCommitTimer.millis()); _cloneAndCommitTimer.reset(); - _notifyChangeStreamsOnRecipientFirstChunk(opCtx, _getCurrentMetadataAndCheckEpoch(opCtx)); + _notifyChangeStreamsOnRecipientFirstChunk(_getCurrentMetadataAndCheckEpoch()); // Mark the shard as running critical operation, which requires recovery on crash. // // NOTE: The 'migrateChunkToNewShard' oplog message written by the above call to // '_notifyChangeStreamsOnRecipientFirstChunk' depends on this majority write to carry its local // write to majority committed. - Status status = ShardingStateRecovery::startMetadataOp(opCtx); + Status status = ShardingStateRecovery::startMetadataOp(_opCtx); if (!status.isOK()) { return status; } - _critSec.emplace(opCtx, _args.getNss()); + _critSec.emplace(_opCtx, _args.getNss()); _state = kCriticalSection; @@ -338,7 +339,7 @@ Status MigrationSourceManager::enterCriticalSection(OperationContext* opCtx) { // Note: this write must occur after the critSec flag is set, to ensure the secondary refresh // will stall behind the flag. Status signalStatus = updateShardCollectionsEntry( - opCtx, + _opCtx, BSON(ShardCollectionType::kNssFieldName << getNss().ns()), BSONObj(), BSON(ShardCollectionType::kEnterCriticalSectionCounterFieldName << 1), @@ -356,13 +357,13 @@ Status MigrationSourceManager::enterCriticalSection(OperationContext* opCtx) { return Status::OK(); } -Status MigrationSourceManager::commitChunkOnRecipient(OperationContext* opCtx) { - invariant(!opCtx->lockState()->isLocked()); +Status MigrationSourceManager::commitChunkOnRecipient() { + invariant(!_opCtx->lockState()->isLocked()); invariant(_state == kCriticalSection); - auto scopedGuard = makeGuard([&] { cleanupOnError(opCtx); }); + auto scopedGuard = makeGuard([&] { cleanupOnError(); }); // Tell the recipient shard to fetch the latest changes. - auto commitCloneStatus = _cloneDriver->commitClone(opCtx); + auto commitCloneStatus = _cloneDriver->commitClone(_opCtx); if (MONGO_unlikely(failMigrationCommit.shouldFail()) && commitCloneStatus.isOK()) { commitCloneStatus = {ErrorCodes::InternalError, @@ -380,10 +381,10 @@ Status MigrationSourceManager::commitChunkOnRecipient(OperationContext* opCtx) { return Status::OK(); } -Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opCtx) { - invariant(!opCtx->lockState()->isLocked()); +Status MigrationSourceManager::commitChunkMetadataOnConfig() { + invariant(!_opCtx->lockState()->isLocked()); invariant(_state == kCloneCompleted); - auto scopedGuard = makeGuard([&] { cleanupOnError(opCtx); }); + auto scopedGuard = makeGuard([&] { cleanupOnError(); }); // If we have chunks left on the FROM shard, bump the version of one of them as well. 
This will // change the local collection major version, which indicates to other processes that the chunk @@ -391,7 +392,7 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC BSONObjBuilder builder; { - const auto metadata = _getCurrentMetadataAndCheckEpoch(opCtx); + const auto metadata = _getCurrentMetadataAndCheckEpoch(); ChunkType migratedChunkType; migratedChunkType.setMin(_args.getMinKey()); @@ -404,7 +405,7 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC _args.getToShardId(), migratedChunkType, metadata->getCollVersion(), - LogicalClock::get(opCtx)->getClusterTime().asTimestamp()); + LogicalClock::get(_opCtx)->getClusterTime().asTimestamp()); builder.append(kWriteConcernField, kMajorityWriteConcern.toBSON()); } @@ -416,8 +417,8 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC Timer t; auto commitChunkMigrationResponse = - Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts( - opCtx, + Grid::get(_opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts( + _opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, "admin", builder.obj(), @@ -439,8 +440,8 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC "against the config server to obtain its latest optime" << causedBy(redact(migrationCommitStatus)); - Status status = ShardingLogging::get(opCtx)->logChangeChecked( - opCtx, + Status status = ShardingLogging::get(_opCtx)->logChangeChecked( + _opCtx, "moveChunk.validating", getNss().ns(), BSON("min" << _args.getMinKey() << "max" << _args.getMaxKey() << "from" @@ -462,10 +463,10 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC // metadata for this collection, forcing subsequent callers to do a full refresh. Check if // this node can accept writes for this collection as a proxy for it being primary. if (!status.isOK()) { - UninterruptibleLockGuard noInterrupt(opCtx->lockState()); - AutoGetCollection autoColl(opCtx, getNss(), MODE_IX); - if (!repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, getNss())) { - CollectionShardingRuntime::get(opCtx, getNss())->clearFilteringMetadata(); + UninterruptibleLockGuard noInterrupt(_opCtx->lockState()); + AutoGetCollection autoColl(_opCtx, getNss(), MODE_IX); + if (!repl::ReplicationCoordinator::get(_opCtx)->canAcceptWritesFor(_opCtx, getNss())) { + CollectionShardingRuntime::get(_opCtx, getNss())->clearFilteringMetadata(); uassertStatusOK(status.withContext( str::stream() << "Unable to verify migration commit for chunk: " << redact(_args.toString()) @@ -488,12 +489,12 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC // section. It is okay if the refresh fails because that will cause the metadata to be cleared // and subsequent callers will try to do a full refresh. 
try { - forceShardFilteringMetadataRefresh(opCtx, getNss(), true); + forceShardFilteringMetadataRefresh(_opCtx, getNss(), true); } catch (const DBException& ex) { - UninterruptibleLockGuard noInterrupt(opCtx->lockState()); - AutoGetCollection autoColl(opCtx, getNss(), MODE_IX); + UninterruptibleLockGuard noInterrupt(_opCtx->lockState()); + AutoGetCollection autoColl(_opCtx, getNss(), MODE_IX); - CollectionShardingRuntime::get(opCtx, getNss())->clearFilteringMetadata(); + CollectionShardingRuntime::get(_opCtx, getNss())->clearFilteringMetadata(); log() << "Failed to refresh metadata after a " << (migrationCommitStatus.isOK() ? "failed commit attempt" : "successful commit") @@ -509,7 +510,7 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC << ex.toString() << "' after commit failed"); } - const auto refreshedMetadata = _getCurrentMetadataAndCheckEpoch(opCtx); + const auto refreshedMetadata = _getCurrentMetadataAndCheckEpoch(); if (refreshedMetadata->keyBelongsToMe(_args.getMinKey())) { // This condition may only happen if the migration commit has failed for any reason @@ -517,7 +518,7 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC severe() << "The migration commit succeeded, but the new chunk placement was not " "reflected after metadata refresh, which is an indication of an " "afterOpTime bug."; - severe() << "The current config server opTime is " << Grid::get(opCtx)->configOpTime(); + severe() << "The current config server opTime is " << Grid::get(_opCtx)->configOpTime(); severe() << "The commit response came from " << redact(commitChunkMigrationResponse.getValue().hostAndPort->toString()) << " and contained"; @@ -543,10 +544,10 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC // Exit the critical section and ensure that all the necessary state is fully persisted before // scheduling orphan cleanup. - _cleanup(opCtx); + _cleanup(); - ShardingLogging::get(opCtx)->logChange( - opCtx, + ShardingLogging::get(_opCtx)->logChange( + _opCtx, "moveChunk.commit", getNss().ns(), BSON("min" << _args.getMinKey() << "max" << _args.getMaxKey() << "from" @@ -559,14 +560,14 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC auto notification = [&] { auto const whenToClean = _args.getWaitForDelete() ? CollectionShardingRuntime::kNow : CollectionShardingRuntime::kDelayed; - UninterruptibleLockGuard noInterrupt(opCtx->lockState()); - AutoGetCollection autoColl(opCtx, getNss(), MODE_IS); - return CollectionShardingRuntime::get(opCtx, getNss())->cleanUpRange(range, whenToClean); + UninterruptibleLockGuard noInterrupt(_opCtx->lockState()); + AutoGetCollection autoColl(_opCtx, getNss(), MODE_IS); + return CollectionShardingRuntime::get(_opCtx, getNss())->cleanUpRange(range, whenToClean); }(); if (!MONGO_unlikely(doNotRefreshRecipientAfterCommit.shouldFail())) { // Best-effort make the recipient refresh its routing table to the new collection version. 
- refreshRecipientRoutingTable(opCtx, + refreshRecipientRoutingTable(_opCtx, getNss(), _args.getToShardId(), _recipientHost, @@ -580,7 +581,7 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC if (_args.getWaitForDelete()) { log() << "Waiting for cleanup of " << getNss().ns() << " range " << redact(range.toString()); - auto deleteStatus = notification.waitStatus(opCtx); + auto deleteStatus = notification.waitStatus(_opCtx); if (!deleteStatus.isOK()) { return {ErrorCodes::OrphanedRangeCleanUpFailed, orphanedRangeCleanUpErrMsg + redact(deleteStatus)}; @@ -588,9 +589,9 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC return Status::OK(); } - if (notification.ready() && !notification.waitStatus(opCtx).isOK()) { + if (notification.ready() && !notification.waitStatus(_opCtx).isOK()) { return {ErrorCodes::OrphanedRangeCleanUpFailed, - orphanedRangeCleanUpErrMsg + redact(notification.waitStatus(opCtx))}; + orphanedRangeCleanUpErrMsg + redact(notification.waitStatus(_opCtx))}; } else { log() << "Leaving cleanup of " << getNss().ns() << " range " << redact(range.toString()) << " to complete in background"; @@ -600,13 +601,13 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC return Status::OK(); } -void MigrationSourceManager::cleanupOnError(OperationContext* opCtx) { +void MigrationSourceManager::cleanupOnError() { if (_state == kDone) { return; } - ShardingLogging::get(opCtx)->logChange( - opCtx, + ShardingLogging::get(_opCtx)->logChange( + _opCtx, "moveChunk.error", getNss().ns(), BSON("min" << _args.getMinKey() << "max" << _args.getMaxKey() << "from" @@ -614,19 +615,22 @@ void MigrationSourceManager::cleanupOnError(OperationContext* opCtx) { ShardingCatalogClient::kMajorityWriteConcern); try { - _cleanup(opCtx); + _cleanup(); } catch (const ExceptionForCat<ErrorCategory::NotMasterError>& ex) { warning() << "Failed to clean up migration: " << redact(_args.toString()) << "due to: " << redact(ex); } } -ScopedCollectionMetadata MigrationSourceManager::_getCurrentMetadataAndCheckEpoch( - OperationContext* opCtx) { +void MigrationSourceManager::abortDueToConflictingIndexOperation() { + _opCtx->markKilled(); +} + +ScopedCollectionMetadata MigrationSourceManager::_getCurrentMetadataAndCheckEpoch() { auto metadata = [&] { - UninterruptibleLockGuard noInterrupt(opCtx->lockState()); - AutoGetCollection autoColl(opCtx, getNss(), MODE_IS); - auto* const css = CollectionShardingRuntime::get(opCtx, getNss()); + UninterruptibleLockGuard noInterrupt(_opCtx->lockState()); + AutoGetCollection autoColl(_opCtx, getNss(), MODE_IS); + auto* const css = CollectionShardingRuntime::get(_opCtx, getNss()); const auto optMetadata = css->getCurrentMetadataIfKnown(); uassert(ErrorCodes::ConflictingOperationInProgress, @@ -647,7 +651,7 @@ ScopedCollectionMetadata MigrationSourceManager::_getCurrentMetadataAndCheckEpoc } void MigrationSourceManager::_notifyChangeStreamsOnRecipientFirstChunk( - OperationContext* opCtx, const ScopedCollectionMetadata& metadata) { + const ScopedCollectionMetadata& metadata) { // If this is not the first donation, there is nothing to be done if (metadata->getChunkManager()->getVersion(_args.getToShardId()).isSet()) return; @@ -662,28 +666,28 @@ void MigrationSourceManager::_notifyChangeStreamsOnRecipientFirstChunk( << "from" << _args.getFromShardId() << "to" << _args.getToShardId()); - auto const serviceContext = opCtx->getClient()->getServiceContext(); + auto const serviceContext = 
_opCtx->getClient()->getServiceContext(); - UninterruptibleLockGuard noInterrupt(opCtx->lockState()); - AutoGetCollection autoColl(opCtx, NamespaceString::kRsOplogNamespace, MODE_IX); + UninterruptibleLockGuard noInterrupt(_opCtx->lockState()); + AutoGetCollection autoColl(_opCtx, NamespaceString::kRsOplogNamespace, MODE_IX); writeConflictRetry( - opCtx, "migrateChunkToNewShard", NamespaceString::kRsOplogNamespace.ns(), [&] { - WriteUnitOfWork uow(opCtx); + _opCtx, "migrateChunkToNewShard", NamespaceString::kRsOplogNamespace.ns(), [&] { + WriteUnitOfWork uow(_opCtx); serviceContext->getOpObserver()->onInternalOpMessage( - opCtx, getNss(), _collectionUuid, BSON("msg" << dbgMessage), o2Message); + _opCtx, getNss(), _collectionUuid, BSON("msg" << dbgMessage), o2Message); uow.commit(); }); } -void MigrationSourceManager::_cleanup(OperationContext* opCtx) { +void MigrationSourceManager::_cleanup() { invariant(_state != kDone); auto cloneDriver = [&]() { // Unregister from the collection's sharding state and exit the migration critical section. - UninterruptibleLockGuard noInterrupt(opCtx->lockState()); - AutoGetCollection autoColl(opCtx, getNss(), MODE_IX); - auto* const csr = CollectionShardingRuntime::get(opCtx, getNss()); - auto csrLock = CollectionShardingState::CSRLock::lockExclusive(opCtx, csr); + UninterruptibleLockGuard noInterrupt(_opCtx->lockState()); + AutoGetCollection autoColl(_opCtx, getNss(), MODE_IX); + auto* const csr = CollectionShardingRuntime::get(_opCtx, getNss()); + auto csrLock = CollectionShardingState::CSRLock::lockExclusive(_opCtx, csr); if (_state != kCreated) { invariant(msmForCsr(csr)); @@ -705,7 +709,7 @@ void MigrationSourceManager::_cleanup(OperationContext* opCtx) { // outside of the collection X lock if (cloneDriver) { - cloneDriver->cancelClone(opCtx); + cloneDriver->cancelClone(_opCtx); } if (_state == kCriticalSection || _state == kCloneCompleted) { @@ -723,11 +727,11 @@ void MigrationSourceManager::_cleanup(OperationContext* opCtx) { // possible that the persisted metadata is rolled back after step down, but the write which // cleared the 'inMigration' flag is not, a secondary node will report itself at an older // shard version. - CatalogCacheLoader::get(opCtx).waitForCollectionFlush(opCtx, getNss()); + CatalogCacheLoader::get(_opCtx).waitForCollectionFlush(_opCtx, getNss()); // Clear the 'minOpTime recovery' document so that the next time a node from this shard // becomes a primary, it won't have to recover the config server optime. - ShardingStateRecovery::endMetadataOp(opCtx); + ShardingStateRecovery::endMetadataOp(_opCtx); } _state = kDone; diff --git a/src/mongo/db/s/migration_source_manager.h b/src/mongo/db/s/migration_source_manager.h index cf0a14fd9df..fbb0c73fab5 100644 --- a/src/mongo/db/s/migration_source_manager.h +++ b/src/mongo/db/s/migration_source_manager.h @@ -113,7 +113,7 @@ public: * Expected state: kCreated * Resulting state: kCloning on success, kDone on failure */ - Status startClone(OperationContext* opCtx); + Status startClone(); /** * Waits for the cloning to catch up sufficiently so we won't have to stay in the critical @@ -123,7 +123,7 @@ public: * Expected state: kCloning * Resulting state: kCloneCaughtUp on success, kDone on failure */ - Status awaitToCatchUp(OperationContext* opCtx); + Status awaitToCatchUp(); /** * Waits for the active clone operation to catch up and enters critical section. 
Once this call @@ -134,7 +134,7 @@ public: * Expected state: kCloneCaughtUp * Resulting state: kCriticalSection on success, kDone on failure */ - Status enterCriticalSection(OperationContext* opCtx); + Status enterCriticalSection(); /** * Tells the recipient of the chunk to commit the chunk contents, which it received. @@ -142,7 +142,7 @@ public: * Expected state: kCriticalSection * Resulting state: kCloneCompleted on success, kDone on failure */ - Status commitChunkOnRecipient(OperationContext* opCtx); + Status commitChunkOnRecipient(); /** * Tells the recipient shard to fetch the latest portion of data from the donor and to commit it @@ -156,7 +156,7 @@ public: * Expected state: kCloneCompleted * Resulting state: kDone */ - Status commitChunkMetadataOnConfig(OperationContext* opCtx); + Status commitChunkMetadataOnConfig(); /** * May be called at any time. Unregisters the migration source manager from the collection, @@ -166,7 +166,13 @@ public: * Expected state: Any * Resulting state: kDone */ - void cleanupOnError(OperationContext* opCtx); + void cleanupOnError(); + + /** + * Aborts the migration after observing a concurrent index operation by marking its operation + * context as killed. + */ + void abortDueToConflictingIndexOperation(); /** * Returns the cloner which is being used for this migration. This value is available only if @@ -191,21 +197,24 @@ private: // comments explaining the various state transitions. enum State { kCreated, kCloning, kCloneCaughtUp, kCriticalSection, kCloneCompleted, kDone }; - ScopedCollectionMetadata _getCurrentMetadataAndCheckEpoch(OperationContext* opCtx); + ScopedCollectionMetadata _getCurrentMetadataAndCheckEpoch(); /** * If this donation moves the first chunk to the recipient (i.e., the recipient didn't have any * chunks), this function writes a no-op message to the oplog, so that change stream will notice * that and close the cursor in order to notify mongos to target the new shard as well. */ - void _notifyChangeStreamsOnRecipientFirstChunk(OperationContext* opCtx, - const ScopedCollectionMetadata& metadata); + void _notifyChangeStreamsOnRecipientFirstChunk(const ScopedCollectionMetadata& metadata); /** * Called when any of the states fails. May only be called once and will put the migration * manager into the kDone state. */ - void _cleanup(OperationContext* opCtx); + void _cleanup(); + + // This is the opCtx of the moveChunk request that constructed the MigrationSourceManager. + // The caller must guarantee it outlives the MigrationSourceManager. 
+ OperationContext* const _opCtx; // The parameters to the moveChunk command const MoveChunkRequest _args; diff --git a/src/mongo/db/s/move_chunk_command.cpp b/src/mongo/db/s/move_chunk_command.cpp index 8afc52f0a7f..1fa019e7ebd 100644 --- a/src/mongo/db/s/move_chunk_command.cpp +++ b/src/mongo/db/s/move_chunk_command.cpp @@ -225,20 +225,20 @@ private: moveTimingHelper.done(2); moveChunkHangAtStep2.pauseWhileSet(); - uassertStatusOKWithWarning(migrationSourceManager.startClone(opCtx)); + uassertStatusOKWithWarning(migrationSourceManager.startClone()); moveTimingHelper.done(3); moveChunkHangAtStep3.pauseWhileSet(); - uassertStatusOKWithWarning(migrationSourceManager.awaitToCatchUp(opCtx)); + uassertStatusOKWithWarning(migrationSourceManager.awaitToCatchUp()); moveTimingHelper.done(4); moveChunkHangAtStep4.pauseWhileSet(); - uassertStatusOKWithWarning(migrationSourceManager.enterCriticalSection(opCtx)); - uassertStatusOKWithWarning(migrationSourceManager.commitChunkOnRecipient(opCtx)); + uassertStatusOKWithWarning(migrationSourceManager.enterCriticalSection()); + uassertStatusOKWithWarning(migrationSourceManager.commitChunkOnRecipient()); moveTimingHelper.done(5); moveChunkHangAtStep5.pauseWhileSet(); - uassertStatusOKWithWarning(migrationSourceManager.commitChunkMetadataOnConfig(opCtx)); + uassertStatusOKWithWarning(migrationSourceManager.commitChunkMetadataOnConfig()); moveTimingHelper.done(6); moveChunkHangAtStep6.pauseWhileSet(); } diff --git a/src/mongo/db/s/shard_server_op_observer.cpp b/src/mongo/db/s/shard_server_op_observer.cpp index f8df5dc8e6b..b58479cbcde 100644 --- a/src/mongo/db/s/shard_server_op_observer.cpp +++ b/src/mongo/db/s/shard_server_op_observer.cpp @@ -188,6 +188,19 @@ void incrementChunkOnInsertOrUpdate(OperationContext* opCtx, } } +/** + * Aborts any ongoing migration for the given namespace. Should only be called when observing index + * operations. 
+ */ +void abortOngoingMigration(OperationContext* opCtx, const NamespaceString nss) { + auto* const csr = CollectionShardingRuntime::get(opCtx, nss); + auto csrLock = CollectionShardingRuntime::CSRLock::lockShared(opCtx, csr); + auto msm = MigrationSourceManager::get(csr, csrLock); + if (msm) { + msm->abortDueToConflictingIndexOperation(); + } +} + } // namespace ShardServerOpObserver::ShardServerOpObserver() = default; @@ -429,4 +442,36 @@ repl::OpTime ShardServerOpObserver::onDropCollection(OperationContext* opCtx, return {}; } +void ShardServerOpObserver::onStartIndexBuild(OperationContext* opCtx, + const NamespaceString& nss, + CollectionUUID collUUID, + const UUID& indexBuildUUID, + const std::vector<BSONObj>& indexes, + bool fromMigrate) { + abortOngoingMigration(opCtx, nss); +}; + +void ShardServerOpObserver::onStartIndexBuildSinglePhase(OperationContext* opCtx, + const NamespaceString& nss) { + abortOngoingMigration(opCtx, nss); +} + +void ShardServerOpObserver::onDropIndex(OperationContext* opCtx, + const NamespaceString& nss, + OptionalCollectionUUID uuid, + const std::string& indexName, + const BSONObj& indexInfo) { + abortOngoingMigration(opCtx, nss); +}; + +void ShardServerOpObserver::onCollMod(OperationContext* opCtx, + const NamespaceString& nss, + OptionalCollectionUUID uuid, + const BSONObj& collModCmd, + const CollectionOptions& oldCollOptions, + boost::optional<TTLCollModInfo> ttlInfo) { + abortOngoingMigration(opCtx, nss); +}; + + } // namespace mongo diff --git a/src/mongo/db/s/shard_server_op_observer.h b/src/mongo/db/s/shard_server_op_observer.h index 08822c4e33a..b834fb751f7 100644 --- a/src/mongo/db/s/shard_server_op_observer.h +++ b/src/mongo/db/s/shard_server_op_observer.h @@ -57,7 +57,9 @@ public: CollectionUUID collUUID, const UUID& indexBuildUUID, const std::vector<BSONObj>& indexes, - bool fromMigrate) override {} + bool fromMigrate) override; + + void onStartIndexBuildSinglePhase(OperationContext* opCtx, const NamespaceString& nss) override; void onCommitIndexBuild(OperationContext* opCtx, const NamespaceString& nss, @@ -112,7 +114,7 @@ public: OptionalCollectionUUID uuid, const BSONObj& collModCmd, const CollectionOptions& oldCollOptions, - boost::optional<TTLCollModInfo> ttlInfo) override {} + boost::optional<TTLCollModInfo> ttlInfo) override; void onDropDatabase(OperationContext* opCtx, const std::string& dbName) override {} @@ -126,7 +128,7 @@ public: const NamespaceString& nss, OptionalCollectionUUID uuid, const std::string& indexName, - const BSONObj& indexInfo) override {} + const BSONObj& indexInfo) override; void onRenameCollection(OperationContext* opCtx, const NamespaceString& fromCollection, |