diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2021-12-07 11:08:45 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-12-07 11:33:42 +0000 |
commit | afaf05e8ee1ffee77f33154799e4270449249cf4 (patch) | |
tree | 1ab158daa0e24dcab213c574cbece2903ba0d322 | |
parent | af27a14d78730524f4eaf2e22f3fcf625ddcdc27 (diff) | |
download | mongo-afaf05e8ee1ffee77f33154799e4270449249cf4.tar.gz |
SERVER-61759 Do not expose the cloner directly from the MigrationSourceManager
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/s/migration_source_manager.cpp | 70 | ||||
-rw-r--r-- | src/mongo/db/s/migration_source_manager.h | 42 | ||||
-rw-r--r-- | src/mongo/db/s/op_observer_sharding_impl.cpp | 22 |
5 files changed, 91 insertions, 58 deletions
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index d2489e0db29..bc827b59b93 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -172,12 +172,11 @@ void LogTransactionOperationsForShardingHandler::commit(boost::optional<Timestam UninterruptibleLockGuard noInterrupt(opCtx->lockState()); auto csrLock = CollectionShardingRuntime::CSRLock::lockShared(opCtx, csr); - auto msm = MigrationSourceManager::get(csr, csrLock); - if (!msm) { + const auto clonerPtr = MigrationSourceManager::getCurrentCloner(csr, csrLock); + if (!clonerPtr) { continue; } - - auto cloner = dynamic_cast<MigrationChunkClonerSourceLegacy*>(msm->getCloner().get()); + auto* const cloner = dynamic_cast<MigrationChunkClonerSourceLegacy*>(clonerPtr.get()); auto opType = stmt.getOpType(); auto documentKey = getDocumentKeyFromReplOperation(stmt, opType); diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp index 7112be9b31c..f1bd1269c86 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp @@ -78,11 +78,9 @@ public: auto csr = CollectionShardingRuntime::get(opCtx, *nss); auto csrLock = CollectionShardingRuntime::CSRLock::lockShared(opCtx, csr); - if (auto msm = MigrationSourceManager::get(csr, csrLock)) { - // It is now safe to access the cloner - _chunkCloner = - std::dynamic_pointer_cast<MigrationChunkClonerSourceLegacy, - MigrationChunkClonerSource>(msm->getCloner()); + if (auto cloner = MigrationSourceManager::getCurrentCloner(csr, csrLock)) { + _chunkCloner = std::dynamic_pointer_cast<MigrationChunkClonerSourceLegacy, + MigrationChunkClonerSource>(cloner); invariant(_chunkCloner); } else { uasserted(ErrorCodes::IllegalOperation, diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp index f86ebfe738f..7ee42d27e1b 100644 --- a/src/mongo/db/s/migration_source_manager.cpp +++ b/src/mongo/db/s/migration_source_manager.cpp @@ -115,19 +115,27 @@ MONGO_FAIL_POINT_DEFINE(moveChunkHangAtStep4); MONGO_FAIL_POINT_DEFINE(moveChunkHangAtStep5); MONGO_FAIL_POINT_DEFINE(moveChunkHangAtStep6); -} // namespace - MONGO_FAIL_POINT_DEFINE(doNotRefreshRecipientAfterCommit); MONGO_FAIL_POINT_DEFINE(failMigrationCommit); MONGO_FAIL_POINT_DEFINE(hangBeforeLeavingCriticalSection); MONGO_FAIL_POINT_DEFINE(migrationCommitNetworkError); MONGO_FAIL_POINT_DEFINE(hangBeforePostMigrationCommitRefresh); +} // namespace + MigrationSourceManager* MigrationSourceManager::get(CollectionShardingRuntime* csr, CollectionShardingRuntime::CSRLock& csrLock) { return msmForCsr(csr); } +std::shared_ptr<MigrationChunkClonerSource> MigrationSourceManager::getCurrentCloner( + CollectionShardingRuntime* csr, CollectionShardingRuntime::CSRLock& csrLock) { + auto msm = get(csr, csrLock); + if (!msm) + return nullptr; + return msm->_cloneDriver; +} + MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx, MoveChunkRequest request, ConnectionString donorConnStr, @@ -178,8 +186,10 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx, UUID collectionUUID = autoColl.getCollection()->uuid(); - auto optMetadata = - CollectionShardingRuntime::get(_opCtx, _args.getNss())->getCurrentMetadataIfKnown(); + auto* const csr = CollectionShardingRuntime::get(_opCtx, _args.getNss()); + const auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(_opCtx, csr); + + auto optMetadata = csr->getCurrentMetadataIfKnown(); uassert(ErrorCodes::ConflictingOperationInProgress, "The collection's sharding state was cleared by a concurrent operation", optMetadata); @@ -188,10 +198,17 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx, uassert(ErrorCodes::IncompatibleShardingMetadata, "Cannot move chunks for an unsharded collection", metadata.isSharded()); + + // Atomically (still under the CSR lock held above) check whether migrations are allowed and + // register the MigrationSourceManager on the CSR. This ensures that interruption due to the + // change of allowMigrations to false will properly serialise and not allow any new MSMs to + // be running after the change. uassert(ErrorCodes::ConflictingOperationInProgress, "Collection is undergoing changes so moveChunk is not allowed.", metadata.allowMigrations()); + _scopedRegisterer.emplace(this, csr, csrLock); + return std::make_tuple(std::move(metadata), std::move(collectionUUID)); }(); @@ -257,13 +274,6 @@ void MigrationSourceManager::startClone() { { const auto metadata = _getCurrentMetadataAndCheckEpoch(); - // Having the metadata manager registered on the collection sharding state is what indicates - // that a chunk on that collection is being migrated. With an active migration, write - // operations require the cloner to be present in order to track changes to the chunk which - // needs to be transmitted to the recipient. - _cloneDriver = std::make_unique<MigrationChunkClonerSourceLegacy>( - _args, metadata.getKeyPattern(), _donorConnStr, _recipientHost); - AutoGetCollection autoColl(_opCtx, _args.getNss(), replEnabled ? MODE_IX : MODE_X, @@ -271,9 +281,15 @@ void MigrationSourceManager::startClone() { _opCtx->getServiceContext()->getPreciseClockSource()->now() + Milliseconds(migrationLockAcquisitionMaxWaitMS.load())); - auto csr = CollectionShardingRuntime::get(_opCtx, _args.getNss()); - auto lockedCsr = CollectionShardingRuntime::CSRLock::lockExclusive(_opCtx, csr); - invariant(nullptr == std::exchange(msmForCsr(csr), this)); + auto* const csr = CollectionShardingRuntime::get(_opCtx, _args.getNss()); + const auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(_opCtx, csr); + + // Having the metadata manager registered on the collection sharding state is what indicates + // that a chunk on that collection is being migrated to the OpObservers. With an active + // migration, write operations require the cloner to be present in order to track changes to + // the chunk which needs to be transmitted to the recipient. + _cloneDriver = std::make_shared<MigrationChunkClonerSourceLegacy>( + _args, metadata.getKeyPattern(), _donorConnStr, _recipientHost); _coordinator.emplace(_cloneDriver->getSessionId(), _args.getFromShardId(), @@ -670,20 +686,12 @@ void MigrationSourceManager::_cleanup(bool completeMigration) noexcept { UninterruptibleLockGuard noInterrupt(_opCtx->lockState()); AutoGetCollection autoColl(_opCtx, _args.getNss(), MODE_IX); auto* const csr = CollectionShardingRuntime::get(_opCtx, _args.getNss()); - auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(_opCtx, csr); + const auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(_opCtx, csr); if (_state != kCreated) { - invariant(msmForCsr(csr)); invariant(_cloneDriver); } - // While we are in kCreated, the MigrationSourceManager may or may not be already be - // installed on the CollectionShardingRuntime. - if (_state != kCreated || (_state == kCreated && msmForCsr(csr))) { - auto oldMsmOnCsr = std::exchange(msmForCsr(csr), nullptr); - invariant(this == oldMsmOnCsr); - } - _critSec.reset(); return std::move(_cloneDriver); }(); @@ -775,4 +783,20 @@ BSONObj MigrationSourceManager::getMigrationStatusReport() const { _args.getMaxKey()); } +MigrationSourceManager::ScopedRegisterer::ScopedRegisterer( + MigrationSourceManager* msm, + CollectionShardingRuntime* csr, + const CollectionShardingRuntime::CSRLock& csrLock) + : _msm(msm) { + invariant(nullptr == std::exchange(msmForCsr(csr), msm)); +} + +MigrationSourceManager::ScopedRegisterer::~ScopedRegisterer() { + UninterruptibleLockGuard noInterrupt(_msm->_opCtx->lockState()); + AutoGetCollection autoColl(_msm->_opCtx, _msm->_args.getNss(), MODE_IX); + auto csr = CollectionShardingRuntime::get(_msm->_opCtx, _msm->_args.getNss()); + auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(_msm->_opCtx, csr); + invariant(_msm == std::exchange(msmForCsr(csr), nullptr)); +} + } // namespace mongo diff --git a/src/mongo/db/s/migration_source_manager.h b/src/mongo/db/s/migration_source_manager.h index 7b1b9b825fa..bc7462fb875 100644 --- a/src/mongo/db/s/migration_source_manager.h +++ b/src/mongo/db/s/migration_source_manager.h @@ -80,6 +80,15 @@ public: CollectionShardingRuntime::CSRLock& csrLock); /** + * If the currently installed migration has reached the cloning stage (i.e., after startClone), + * returns the cloner currently in use. + * + * Must be called with a both a collection lock and the CSRLock. + */ + static std::shared_ptr<MigrationChunkClonerSource> getCurrentCloner( + CollectionShardingRuntime* csr, CollectionShardingRuntime::CSRLock& csrLock); + + /** * Instantiates a new migration source manager with the specified migration parameters. Must be * called with the distributed lock acquired in advance (not asserted). * @@ -156,17 +165,6 @@ public: void abortDueToConflictingIndexOperation(OperationContext* opCtx); /** - * Returns the cloner which is being used for this migration. This value is available only if - * the migration source manager is currently in the clone phase (i.e. the previous call to - * startClone has succeeded). - * - * Must be called with a both a collection lock and the CSRLock. - */ - std::shared_ptr<MigrationChunkClonerSource> getCloner() const { - return _cloneDriver; - } - - /** * Returns a report on the active migration. * * Must be called with some form of lock on the collection namespace. @@ -246,6 +244,20 @@ private: // The current state. Used only for diagnostics and validation. State _state{kCreated}; + // Responsible for registering and unregistering the MigrationSourceManager from the collection + // sharding runtime for the collection + class ScopedRegisterer { + public: + ScopedRegisterer(MigrationSourceManager* msm, + CollectionShardingRuntime* csr, + const CollectionShardingRuntime::CSRLock& csrLock); + ~ScopedRegisterer(); + + private: + MigrationSourceManager* const _msm; + }; + boost::optional<ScopedRegisterer> _scopedRegisterer; + // The epoch of the collection being migrated and its UUID, as of the time the migration // started. Values are boost::optional only up until the constructor runs, because UUID doesn't // have a default constructor. @@ -255,16 +267,16 @@ private: // The version of the chunk at the time the migration started. boost::optional<ChunkVersion> _chunkVersion; - // Contains logic for ensuring the donor's and recipient's config.rangeDeletions entries are - // correctly updated based on whether the migration committed or aborted. - boost::optional<migrationutil::MigrationCoordinator> _coordinator; - // The chunk cloner source. Only available if there is an active migration going on. To set and // remove it, a collection lock and the CSRLock need to be acquired first in order to block all // logOp calls and then the mutex. To access it, only the mutex is necessary. Available after // cloning stage has completed. std::shared_ptr<MigrationChunkClonerSource> _cloneDriver; + // Contains logic for ensuring the donor's and recipient's config.rangeDeletions entries are + // correctly updated based on whether the migration committed or aborted. + boost::optional<migrationutil::MigrationCoordinator> _coordinator; + // Holds the in-memory critical section for the collection. Only set when migration has reached // the critical section phase. boost::optional<CollectionCriticalSection> _critSec; diff --git a/src/mongo/db/s/op_observer_sharding_impl.cpp b/src/mongo/db/s/op_observer_sharding_impl.cpp index 3f9b17ed138..80503f0920b 100644 --- a/src/mongo/db/s/op_observer_sharding_impl.cpp +++ b/src/mongo/db/s/op_observer_sharding_impl.cpp @@ -73,8 +73,8 @@ void assertIntersectingChunkHasNotMoved(OperationContext* opCtx, bool isMigratingWithCSRLock(CollectionShardingRuntime* csr, CollectionShardingRuntime::CSRLock& csrLock, BSONObj const& docToDelete) { - auto msm = MigrationSourceManager::get(csr, csrLock); - return msm && msm->getCloner()->isDocumentInMigratingChunk(docToDelete); + auto cloner = MigrationSourceManager::getCurrentCloner(csr, csrLock); + return cloner && cloner->isDocumentInMigratingChunk(docToDelete); } void assertMovePrimaryInProgress(OperationContext* opCtx, NamespaceString const& nss) { @@ -151,9 +151,9 @@ void OpObserverShardingImpl::shardObserveInsertOp(OperationContext* opCtx, } auto csrLock = CollectionShardingRuntime::CSRLock::lockShared(opCtx, csr); - auto msm = MigrationSourceManager::get(csr, csrLock); - if (msm) { - msm->getCloner()->onInsertOp(opCtx, insertedDoc, opTime); + auto cloner = MigrationSourceManager::getCurrentCloner(csr, csrLock); + if (cloner) { + cloner->onInsertOp(opCtx, insertedDoc, opTime); } } @@ -188,9 +188,9 @@ void OpObserverShardingImpl::shardObserveUpdateOp(OperationContext* opCtx, } auto csrLock = CollectionShardingRuntime::CSRLock::lockShared(opCtx, csr); - auto msm = MigrationSourceManager::get(csr, csrLock); - if (msm) { - msm->getCloner()->onUpdateOp(opCtx, preImageDoc, postImageDoc, opTime, prePostImageOpTime); + auto cloner = MigrationSourceManager::getCurrentCloner(csr, csrLock); + if (cloner) { + cloner->onUpdateOp(opCtx, preImageDoc, postImageDoc, opTime, prePostImageOpTime); } } @@ -224,10 +224,10 @@ void OpObserverShardingImpl::shardObserveDeleteOp(OperationContext* opCtx, } auto csrLock = CollectionShardingRuntime::CSRLock::lockShared(opCtx, csr); - auto msm = MigrationSourceManager::get(csr, csrLock); + auto cloner = MigrationSourceManager::getCurrentCloner(csr, csrLock); - if (msm && getIsMigrating(opCtx)) { - msm->getCloner()->onDeleteOp(opCtx, documentKey, opTime, preImageOpTime); + if (cloner && getIsMigrating(opCtx)) { + cloner->onDeleteOp(opCtx, documentKey, opTime, preImageOpTime); } } |