summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2021-12-07 11:08:45 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-12-07 11:33:42 +0000
commitafaf05e8ee1ffee77f33154799e4270449249cf4 (patch)
tree1ab158daa0e24dcab213c574cbece2903ba0d322
parentaf27a14d78730524f4eaf2e22f3fcf625ddcdc27 (diff)
downloadmongo-afaf05e8ee1ffee77f33154799e4270449249cf4.tar.gz
SERVER-61759 Do not expose the cloner directly from the MigrationSourceManager
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp7
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp8
-rw-r--r--src/mongo/db/s/migration_source_manager.cpp70
-rw-r--r--src/mongo/db/s/migration_source_manager.h42
-rw-r--r--src/mongo/db/s/op_observer_sharding_impl.cpp22
5 files changed, 91 insertions, 58 deletions
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index d2489e0db29..bc827b59b93 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -172,12 +172,11 @@ void LogTransactionOperationsForShardingHandler::commit(boost::optional<Timestam
UninterruptibleLockGuard noInterrupt(opCtx->lockState());
auto csrLock = CollectionShardingRuntime::CSRLock::lockShared(opCtx, csr);
- auto msm = MigrationSourceManager::get(csr, csrLock);
- if (!msm) {
+ const auto clonerPtr = MigrationSourceManager::getCurrentCloner(csr, csrLock);
+ if (!clonerPtr) {
continue;
}
-
- auto cloner = dynamic_cast<MigrationChunkClonerSourceLegacy*>(msm->getCloner().get());
+ auto* const cloner = dynamic_cast<MigrationChunkClonerSourceLegacy*>(clonerPtr.get());
auto opType = stmt.getOpType();
auto documentKey = getDocumentKeyFromReplOperation(stmt, opType);
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp
index 7112be9b31c..f1bd1269c86 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp
@@ -78,11 +78,9 @@ public:
auto csr = CollectionShardingRuntime::get(opCtx, *nss);
auto csrLock = CollectionShardingRuntime::CSRLock::lockShared(opCtx, csr);
- if (auto msm = MigrationSourceManager::get(csr, csrLock)) {
- // It is now safe to access the cloner
- _chunkCloner =
- std::dynamic_pointer_cast<MigrationChunkClonerSourceLegacy,
- MigrationChunkClonerSource>(msm->getCloner());
+ if (auto cloner = MigrationSourceManager::getCurrentCloner(csr, csrLock)) {
+ _chunkCloner = std::dynamic_pointer_cast<MigrationChunkClonerSourceLegacy,
+ MigrationChunkClonerSource>(cloner);
invariant(_chunkCloner);
} else {
uasserted(ErrorCodes::IllegalOperation,
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index f86ebfe738f..7ee42d27e1b 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -115,19 +115,27 @@ MONGO_FAIL_POINT_DEFINE(moveChunkHangAtStep4);
MONGO_FAIL_POINT_DEFINE(moveChunkHangAtStep5);
MONGO_FAIL_POINT_DEFINE(moveChunkHangAtStep6);
-} // namespace
-
MONGO_FAIL_POINT_DEFINE(doNotRefreshRecipientAfterCommit);
MONGO_FAIL_POINT_DEFINE(failMigrationCommit);
MONGO_FAIL_POINT_DEFINE(hangBeforeLeavingCriticalSection);
MONGO_FAIL_POINT_DEFINE(migrationCommitNetworkError);
MONGO_FAIL_POINT_DEFINE(hangBeforePostMigrationCommitRefresh);
+} // namespace
+
MigrationSourceManager* MigrationSourceManager::get(CollectionShardingRuntime* csr,
CollectionShardingRuntime::CSRLock& csrLock) {
return msmForCsr(csr);
}
+std::shared_ptr<MigrationChunkClonerSource> MigrationSourceManager::getCurrentCloner(
+ CollectionShardingRuntime* csr, CollectionShardingRuntime::CSRLock& csrLock) {
+ auto msm = get(csr, csrLock);
+ if (!msm)
+ return nullptr;
+ return msm->_cloneDriver;
+}
+
MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx,
MoveChunkRequest request,
ConnectionString donorConnStr,
@@ -178,8 +186,10 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx,
UUID collectionUUID = autoColl.getCollection()->uuid();
- auto optMetadata =
- CollectionShardingRuntime::get(_opCtx, _args.getNss())->getCurrentMetadataIfKnown();
+ auto* const csr = CollectionShardingRuntime::get(_opCtx, _args.getNss());
+ const auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(_opCtx, csr);
+
+ auto optMetadata = csr->getCurrentMetadataIfKnown();
uassert(ErrorCodes::ConflictingOperationInProgress,
"The collection's sharding state was cleared by a concurrent operation",
optMetadata);
@@ -188,10 +198,17 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx,
uassert(ErrorCodes::IncompatibleShardingMetadata,
"Cannot move chunks for an unsharded collection",
metadata.isSharded());
+
+ // Atomically (still under the CSR lock held above) check whether migrations are allowed and
+ // register the MigrationSourceManager on the CSR. This ensures that interruption due to the
+ // change of allowMigrations to false will properly serialise and not allow any new MSMs to
+ // be running after the change.
uassert(ErrorCodes::ConflictingOperationInProgress,
"Collection is undergoing changes so moveChunk is not allowed.",
metadata.allowMigrations());
+ _scopedRegisterer.emplace(this, csr, csrLock);
+
return std::make_tuple(std::move(metadata), std::move(collectionUUID));
}();
@@ -257,13 +274,6 @@ void MigrationSourceManager::startClone() {
{
const auto metadata = _getCurrentMetadataAndCheckEpoch();
- // Having the metadata manager registered on the collection sharding state is what indicates
- // that a chunk on that collection is being migrated. With an active migration, write
- // operations require the cloner to be present in order to track changes to the chunk which
- // needs to be transmitted to the recipient.
- _cloneDriver = std::make_unique<MigrationChunkClonerSourceLegacy>(
- _args, metadata.getKeyPattern(), _donorConnStr, _recipientHost);
-
AutoGetCollection autoColl(_opCtx,
_args.getNss(),
replEnabled ? MODE_IX : MODE_X,
@@ -271,9 +281,15 @@ void MigrationSourceManager::startClone() {
_opCtx->getServiceContext()->getPreciseClockSource()->now() +
Milliseconds(migrationLockAcquisitionMaxWaitMS.load()));
- auto csr = CollectionShardingRuntime::get(_opCtx, _args.getNss());
- auto lockedCsr = CollectionShardingRuntime::CSRLock::lockExclusive(_opCtx, csr);
- invariant(nullptr == std::exchange(msmForCsr(csr), this));
+ auto* const csr = CollectionShardingRuntime::get(_opCtx, _args.getNss());
+ const auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(_opCtx, csr);
+
+ // Having the metadata manager registered on the collection sharding state is what indicates
+ // that a chunk on that collection is being migrated to the OpObservers. With an active
+ // migration, write operations require the cloner to be present in order to track changes to
+ // the chunk which needs to be transmitted to the recipient.
+ _cloneDriver = std::make_shared<MigrationChunkClonerSourceLegacy>(
+ _args, metadata.getKeyPattern(), _donorConnStr, _recipientHost);
_coordinator.emplace(_cloneDriver->getSessionId(),
_args.getFromShardId(),
@@ -670,20 +686,12 @@ void MigrationSourceManager::_cleanup(bool completeMigration) noexcept {
UninterruptibleLockGuard noInterrupt(_opCtx->lockState());
AutoGetCollection autoColl(_opCtx, _args.getNss(), MODE_IX);
auto* const csr = CollectionShardingRuntime::get(_opCtx, _args.getNss());
- auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(_opCtx, csr);
+ const auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(_opCtx, csr);
if (_state != kCreated) {
- invariant(msmForCsr(csr));
invariant(_cloneDriver);
}
- // While we are in kCreated, the MigrationSourceManager may or may not be already be
- // installed on the CollectionShardingRuntime.
- if (_state != kCreated || (_state == kCreated && msmForCsr(csr))) {
- auto oldMsmOnCsr = std::exchange(msmForCsr(csr), nullptr);
- invariant(this == oldMsmOnCsr);
- }
-
_critSec.reset();
return std::move(_cloneDriver);
}();
@@ -775,4 +783,20 @@ BSONObj MigrationSourceManager::getMigrationStatusReport() const {
_args.getMaxKey());
}
+MigrationSourceManager::ScopedRegisterer::ScopedRegisterer(
+ MigrationSourceManager* msm,
+ CollectionShardingRuntime* csr,
+ const CollectionShardingRuntime::CSRLock& csrLock)
+ : _msm(msm) {
+ invariant(nullptr == std::exchange(msmForCsr(csr), msm));
+}
+
+MigrationSourceManager::ScopedRegisterer::~ScopedRegisterer() {
+ UninterruptibleLockGuard noInterrupt(_msm->_opCtx->lockState());
+ AutoGetCollection autoColl(_msm->_opCtx, _msm->_args.getNss(), MODE_IX);
+ auto csr = CollectionShardingRuntime::get(_msm->_opCtx, _msm->_args.getNss());
+ auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(_msm->_opCtx, csr);
+ invariant(_msm == std::exchange(msmForCsr(csr), nullptr));
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/migration_source_manager.h b/src/mongo/db/s/migration_source_manager.h
index 7b1b9b825fa..bc7462fb875 100644
--- a/src/mongo/db/s/migration_source_manager.h
+++ b/src/mongo/db/s/migration_source_manager.h
@@ -80,6 +80,15 @@ public:
CollectionShardingRuntime::CSRLock& csrLock);
/**
+ * If the currently installed migration has reached the cloning stage (i.e., after startClone),
+ * returns the cloner currently in use.
+ *
+ * Must be called with a both a collection lock and the CSRLock.
+ */
+ static std::shared_ptr<MigrationChunkClonerSource> getCurrentCloner(
+ CollectionShardingRuntime* csr, CollectionShardingRuntime::CSRLock& csrLock);
+
+ /**
* Instantiates a new migration source manager with the specified migration parameters. Must be
* called with the distributed lock acquired in advance (not asserted).
*
@@ -156,17 +165,6 @@ public:
void abortDueToConflictingIndexOperation(OperationContext* opCtx);
/**
- * Returns the cloner which is being used for this migration. This value is available only if
- * the migration source manager is currently in the clone phase (i.e. the previous call to
- * startClone has succeeded).
- *
- * Must be called with a both a collection lock and the CSRLock.
- */
- std::shared_ptr<MigrationChunkClonerSource> getCloner() const {
- return _cloneDriver;
- }
-
- /**
* Returns a report on the active migration.
*
* Must be called with some form of lock on the collection namespace.
@@ -246,6 +244,20 @@ private:
// The current state. Used only for diagnostics and validation.
State _state{kCreated};
+ // Responsible for registering and unregistering the MigrationSourceManager from the collection
+ // sharding runtime for the collection
+ class ScopedRegisterer {
+ public:
+ ScopedRegisterer(MigrationSourceManager* msm,
+ CollectionShardingRuntime* csr,
+ const CollectionShardingRuntime::CSRLock& csrLock);
+ ~ScopedRegisterer();
+
+ private:
+ MigrationSourceManager* const _msm;
+ };
+ boost::optional<ScopedRegisterer> _scopedRegisterer;
+
// The epoch of the collection being migrated and its UUID, as of the time the migration
// started. Values are boost::optional only up until the constructor runs, because UUID doesn't
// have a default constructor.
@@ -255,16 +267,16 @@ private:
// The version of the chunk at the time the migration started.
boost::optional<ChunkVersion> _chunkVersion;
- // Contains logic for ensuring the donor's and recipient's config.rangeDeletions entries are
- // correctly updated based on whether the migration committed or aborted.
- boost::optional<migrationutil::MigrationCoordinator> _coordinator;
-
// The chunk cloner source. Only available if there is an active migration going on. To set and
// remove it, a collection lock and the CSRLock need to be acquired first in order to block all
// logOp calls and then the mutex. To access it, only the mutex is necessary. Available after
// cloning stage has completed.
std::shared_ptr<MigrationChunkClonerSource> _cloneDriver;
+ // Contains logic for ensuring the donor's and recipient's config.rangeDeletions entries are
+ // correctly updated based on whether the migration committed or aborted.
+ boost::optional<migrationutil::MigrationCoordinator> _coordinator;
+
// Holds the in-memory critical section for the collection. Only set when migration has reached
// the critical section phase.
boost::optional<CollectionCriticalSection> _critSec;
diff --git a/src/mongo/db/s/op_observer_sharding_impl.cpp b/src/mongo/db/s/op_observer_sharding_impl.cpp
index 3f9b17ed138..80503f0920b 100644
--- a/src/mongo/db/s/op_observer_sharding_impl.cpp
+++ b/src/mongo/db/s/op_observer_sharding_impl.cpp
@@ -73,8 +73,8 @@ void assertIntersectingChunkHasNotMoved(OperationContext* opCtx,
bool isMigratingWithCSRLock(CollectionShardingRuntime* csr,
CollectionShardingRuntime::CSRLock& csrLock,
BSONObj const& docToDelete) {
- auto msm = MigrationSourceManager::get(csr, csrLock);
- return msm && msm->getCloner()->isDocumentInMigratingChunk(docToDelete);
+ auto cloner = MigrationSourceManager::getCurrentCloner(csr, csrLock);
+ return cloner && cloner->isDocumentInMigratingChunk(docToDelete);
}
void assertMovePrimaryInProgress(OperationContext* opCtx, NamespaceString const& nss) {
@@ -151,9 +151,9 @@ void OpObserverShardingImpl::shardObserveInsertOp(OperationContext* opCtx,
}
auto csrLock = CollectionShardingRuntime::CSRLock::lockShared(opCtx, csr);
- auto msm = MigrationSourceManager::get(csr, csrLock);
- if (msm) {
- msm->getCloner()->onInsertOp(opCtx, insertedDoc, opTime);
+ auto cloner = MigrationSourceManager::getCurrentCloner(csr, csrLock);
+ if (cloner) {
+ cloner->onInsertOp(opCtx, insertedDoc, opTime);
}
}
@@ -188,9 +188,9 @@ void OpObserverShardingImpl::shardObserveUpdateOp(OperationContext* opCtx,
}
auto csrLock = CollectionShardingRuntime::CSRLock::lockShared(opCtx, csr);
- auto msm = MigrationSourceManager::get(csr, csrLock);
- if (msm) {
- msm->getCloner()->onUpdateOp(opCtx, preImageDoc, postImageDoc, opTime, prePostImageOpTime);
+ auto cloner = MigrationSourceManager::getCurrentCloner(csr, csrLock);
+ if (cloner) {
+ cloner->onUpdateOp(opCtx, preImageDoc, postImageDoc, opTime, prePostImageOpTime);
}
}
@@ -224,10 +224,10 @@ void OpObserverShardingImpl::shardObserveDeleteOp(OperationContext* opCtx,
}
auto csrLock = CollectionShardingRuntime::CSRLock::lockShared(opCtx, csr);
- auto msm = MigrationSourceManager::get(csr, csrLock);
+ auto cloner = MigrationSourceManager::getCurrentCloner(csr, csrLock);
- if (msm && getIsMigrating(opCtx)) {
- msm->getCloner()->onDeleteOp(opCtx, documentKey, opTime, preImageOpTime);
+ if (cloner && getIsMigrating(opCtx)) {
+ cloner->onDeleteOp(opCtx, documentKey, opTime, preImageOpTime);
}
}