diff options
author | Blake Oler <blake.oler@mongodb.com> | 2018-12-11 13:21:07 -0500 |
---|---|---|
committer | Blake Oler <blake.oler@mongodb.com> | 2018-12-28 00:13:08 -0500 |
commit | 84a0dd98f9bedec0d104b912f23b3a1221ae456e (patch) | |
tree | eaea33ef45ad285977b06c9f180b748c28fc93c2 /src | |
parent | 86b6aca9fa1940e85bba87261d1494ef2c208a4a (diff) | |
download | mongo-84a0dd98f9bedec0d104b912f23b3a1221ae456e.tar.gz |
SERVER-38284 Create concurrency lock for CollectionShardingRuntime
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/s/active_migrations_registry.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/s/collection_sharding_runtime.cpp | 21 | ||||
-rw-r--r-- | src/mongo/db/s/collection_sharding_runtime.h | 50 | ||||
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/s/migration_source_manager.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/s/migration_source_manager.h | 10 | ||||
-rw-r--r-- | src/mongo/db/s/op_observer_sharding_impl.cpp | 38 |
7 files changed, 118 insertions, 24 deletions
diff --git a/src/mongo/db/s/active_migrations_registry.cpp b/src/mongo/db/s/active_migrations_registry.cpp index 78503d62090..6f12e0119ec 100644 --- a/src/mongo/db/s/active_migrations_registry.cpp +++ b/src/mongo/db/s/active_migrations_registry.cpp @@ -121,9 +121,10 @@ BSONObj ActiveMigrationsRegistry::getActiveMigrationStatusReport(OperationContex if (nss) { // Lock the collection so nothing changes while we're getting the migration report. AutoGetCollection autoColl(opCtx, nss.get(), MODE_IS); + auto csr = CollectionShardingRuntime::get(opCtx, nss.get()); + auto csrLock = CollectionShardingRuntimeLock::lock(opCtx, csr); - if (auto msm = - MigrationSourceManager::get(CollectionShardingRuntime::get(opCtx, nss.get()))) { + if (auto msm = MigrationSourceManager::get(csr, csrLock)) { return msm->getMigrationStatusReport(); } } diff --git a/src/mongo/db/s/collection_sharding_runtime.cpp b/src/mongo/db/s/collection_sharding_runtime.cpp index 6fc514ca2a7..968905645d7 100644 --- a/src/mongo/db/s/collection_sharding_runtime.cpp +++ b/src/mongo/db/s/collection_sharding_runtime.cpp @@ -75,6 +75,7 @@ CollectionShardingRuntime::CollectionShardingRuntime(ServiceContext* sc, NamespaceString nss, executor::TaskExecutor* rangeDeleterExecutor) : CollectionShardingState(nss), + _stateChangeMutex(nss.toString()), _nss(std::move(nss)), _metadataManager(std::make_shared<MetadataManager>(sc, _nss, rangeDeleterExecutor)) { if (isNamespaceAlwaysUnsharded(_nss)) { @@ -180,6 +181,26 @@ boost::optional<ScopedCollectionMetadata> CollectionShardingRuntime::_getMetadat return _metadataManager->getActiveMetadata(_metadataManager, atClusterTime); } +CollectionShardingRuntimeLock::CollectionShardingRuntimeLock(OperationContext* opCtx, + CollectionShardingRuntime* csr, + LockMode lockMode) + : _lock([&]() -> CSRLock { + invariant(lockMode == MODE_IS || lockMode == MODE_X); + return (lockMode == MODE_IS + ? CSRLock(Lock::SharedLock(opCtx->lockState(), csr->_stateChangeMutex)) + : CSRLock(Lock::ExclusiveLock(opCtx->lockState(), csr->_stateChangeMutex))); + }()) {} + +CollectionShardingRuntimeLock CollectionShardingRuntimeLock::lock(OperationContext* opCtx, + CollectionShardingRuntime* csr) { + return CollectionShardingRuntimeLock(opCtx, csr, MODE_IS); +} + +CollectionShardingRuntimeLock CollectionShardingRuntimeLock::lockExclusive( + OperationContext* opCtx, CollectionShardingRuntime* csr) { + return CollectionShardingRuntimeLock(opCtx, csr, MODE_X); +} + CollectionCriticalSection::CollectionCriticalSection(OperationContext* opCtx, NamespaceString ns) : _nss(std::move(ns)), _opCtx(opCtx) { AutoGetCollection autoColl(_opCtx, _nss, MODE_IX, MODE_X); diff --git a/src/mongo/db/s/collection_sharding_runtime.h b/src/mongo/db/s/collection_sharding_runtime.h index fe678268ca6..5acfa6c4258 100644 --- a/src/mongo/db/s/collection_sharding_runtime.h +++ b/src/mongo/db/s/collection_sharding_runtime.h @@ -31,13 +31,17 @@ #pragma once #include "mongo/base/disallow_copying.h" +#include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/namespace_string.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/metadata_manager.h" +#include "mongo/stdx/variant.h" #include "mongo/util/decorable.h" namespace mongo { +class CollectionShardingRuntimeLock; + /** * See the comments for CollectionShardingState for more information on how this class fits in the * sharding architecture. @@ -52,8 +56,8 @@ public: executor::TaskExecutor* rangeDeleterExecutor); /** - * Obtains the sharding state for the specified collection. If it does not exist, it will be - * created and will remain active until the collection is dropped or unsharded. + * Obtains the sharding runtime state for the specified collection. If it does not exist, it + * will be created and will remain active until the collection is dropped or unsharded. * * Must be called with some lock held on the specific collection being looked up and the * returned pointer should never be stored. @@ -142,9 +146,15 @@ public: } private: + friend CollectionShardingRuntimeLock; + friend boost::optional<Date_t> CollectionRangeDeleter::cleanUpNextRange( OperationContext*, NamespaceString const&, OID const&, int, CollectionRangeDeleter*); + // Object-wide ResourceMutex to protect changes to the CollectionShardingRuntime or objects + // held within. + Lock::ResourceMutex _stateChangeMutex; + // Namespace this state belongs to. const NamespaceString _nss; @@ -156,6 +166,42 @@ private: }; /** + * RAII-style class that locks the CollectionShardingRuntime using the CollectionShardingRuntime's + * ResourceMutex. The lock will be created and acquired on construction. The lock will be dismissed + * upon destruction of the CollectionShardingRuntimeLock object. + */ +class CollectionShardingRuntimeLock { + +public: + using CSRLock = stdx::variant<Lock::SharedLock, Lock::ExclusiveLock>; + + /** + * Locks the sharding runtime state for the specified collection with the + * CollectionShardingRuntime object's ResourceMutex in MODE_IS. When the object goes out of + * scope, the ResourceMutex will be unlocked. + */ + static CollectionShardingRuntimeLock lock(OperationContext* opCtx, + CollectionShardingRuntime* csr); + + /** + * Follows the same functionality as the CollectionShardingRuntimeLock lock method, except + * that lockExclusive takes the ResourceMutex in MODE_X. + */ + static CollectionShardingRuntimeLock lockExclusive(OperationContext* opCtx, + CollectionShardingRuntime* csr); + +private: + CollectionShardingRuntimeLock(OperationContext* opCtx, + CollectionShardingRuntime* csr, + LockMode lockMode); + + // The lock created and locked upon construction of a CollectionShardingRuntimeLock object. + // It locks the ResourceMutex taken from the CollectionShardingRuntime class, passed in on + // construction. + CSRLock _lock; +}; + +/** * RAII-style class, which obtains a reference to the critical section for the specified collection. */ class CollectionCriticalSection { diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp index 46dff5f9fb8..423158365a4 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp @@ -72,7 +72,10 @@ public: str::stream() << "Collection " << nss->ns() << " does not exist", _autoColl->getCollection()); - if (auto msm = MigrationSourceManager::get(CollectionShardingRuntime::get(opCtx, *nss))) { + auto csr = CollectionShardingRuntime::get(opCtx, *nss); + _csrLock.emplace(CollectionShardingRuntimeLock::lock(opCtx, csr)); + + if (auto msm = MigrationSourceManager::get(csr, *_csrLock)) { // It is now safe to access the cloner _chunkCloner = dynamic_cast<MigrationChunkClonerSourceLegacy*>(msm->getCloner()); invariant(_chunkCloner); @@ -110,6 +113,10 @@ private: // Scoped database + collection lock boost::optional<AutoGetCollection> _autoColl; + // The CollectionShardingRuntimeLock corresponding to the collection to which this + // migration belongs. + boost::optional<CollectionShardingRuntimeLock> _csrLock; + // Contains the active cloner for the namespace MigrationChunkClonerSourceLegacy* _chunkCloner; }; diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp index 80a4e75fc56..e5ef3fe1d57 100644 --- a/src/mongo/db/s/migration_source_manager.cpp +++ b/src/mongo/db/s/migration_source_manager.cpp @@ -120,7 +120,8 @@ MONGO_FAIL_POINT_DEFINE(failMigrationCommit); MONGO_FAIL_POINT_DEFINE(hangBeforeLeavingCriticalSection); MONGO_FAIL_POINT_DEFINE(migrationCommitNetworkError); -MigrationSourceManager* MigrationSourceManager::get(CollectionShardingRuntime& csr) { +MigrationSourceManager* MigrationSourceManager::get(CollectionShardingRuntime* csr, + CollectionShardingRuntimeLock& csrLock) { return msmForCsr(csr); } @@ -235,7 +236,6 @@ Status MigrationSourceManager::startClone(OperationContext* opCtx) { _cloneAndCommitTimer.reset(); { - // Register for notifications from the replication subsystem const auto metadata = _getCurrentMetadataAndCheckEpoch(opCtx); // Having the metadata manager registered on the collection sharding state is what indicates @@ -246,8 +246,9 @@ Status MigrationSourceManager::startClone(OperationContext* opCtx) { _args, metadata->getKeyPattern(), _donorConnStr, _recipientHost); AutoGetCollection autoColl(opCtx, getNss(), MODE_IX, MODE_X); - auto* const css = CollectionShardingRuntime::get(opCtx, getNss()); - invariant(nullptr == std::exchange(msmForCsr(css), this)); + auto csr = CollectionShardingRuntime::get(opCtx, getNss()); + auto lockedCsr = CollectionShardingRuntimeLock::lockExclusive(opCtx, csr); + invariant(nullptr == std::exchange(msmForCsr(csr), this)); _state = kCloning; } diff --git a/src/mongo/db/s/migration_source_manager.h b/src/mongo/db/s/migration_source_manager.h index eaaacdf0590..b2c8546b265 100644 --- a/src/mongo/db/s/migration_source_manager.h +++ b/src/mongo/db/s/migration_source_manager.h @@ -72,10 +72,12 @@ class MigrationSourceManager { MONGO_DISALLOW_COPYING(MigrationSourceManager); public: - static MigrationSourceManager* get(CollectionShardingRuntime& csr); - static MigrationSourceManager* get(CollectionShardingRuntime* csr) { - return get(*csr); - } + /** + * Retrieves the MigrationSourceManager pointer that corresponds to the given collection under + * a CollectionShardingRuntime that has its ResourceMutex locked. + */ + static MigrationSourceManager* get(CollectionShardingRuntime* csr, + CollectionShardingRuntimeLock& csrLock); /** * Instantiates a new migration source manager with the specified migration parameters. Must be diff --git a/src/mongo/db/s/op_observer_sharding_impl.cpp b/src/mongo/db/s/op_observer_sharding_impl.cpp index 8c990377d17..134727d46b9 100644 --- a/src/mongo/db/s/op_observer_sharding_impl.cpp +++ b/src/mongo/db/s/op_observer_sharding_impl.cpp @@ -60,8 +60,9 @@ void assertIntersectingChunkHasNotMoved(OperationContext* opCtx, bool OpObserverShardingImpl::isMigrating(OperationContext* opCtx, NamespaceString const& nss, BSONObj const& docToDelete) { - auto css = CollectionShardingRuntime::get(opCtx, nss); - auto msm = MigrationSourceManager::get(css); + auto csr = CollectionShardingRuntime::get(opCtx, nss); + auto csrLock = CollectionShardingRuntimeLock::lock(opCtx, csr); + auto msm = MigrationSourceManager::get(csr, csrLock); return msm && msm->getCloner()->isDocumentInMigratingChunk(docToDelete); } @@ -83,9 +84,14 @@ void OpObserverShardingImpl::shardObserveInsertOp(OperationContext* opCtx, if (css) { css->checkShardVersionOrThrow(opCtx); - auto msm = MigrationSourceManager::get(css); - if (msm) { - msm->getCloner()->onInsertOp(opCtx, insertedDoc, opTime); + { + auto csr = CollectionShardingRuntime::get(opCtx, nss); + auto csrLock = CollectionShardingRuntimeLock::lock(opCtx, csr); + + auto msm = MigrationSourceManager::get(csr, csrLock); + if (msm) { + msm->getCloner()->onInsertOp(opCtx, insertedDoc, opTime); + } } if (inMultiDocumentTransaction && @@ -104,9 +110,14 @@ void OpObserverShardingImpl::shardObserveUpdateOp(OperationContext* opCtx, auto* const css = CollectionShardingRuntime::get(opCtx, nss); css->checkShardVersionOrThrow(opCtx); - auto msm = MigrationSourceManager::get(css); - if (msm) { - msm->getCloner()->onUpdateOp(opCtx, updatedDoc, opTime, prePostImageOpTime); + { + auto csr = CollectionShardingRuntime::get(opCtx, nss); + auto csrLock = CollectionShardingRuntimeLock::lock(opCtx, csr); + + auto msm = MigrationSourceManager::get(csr, csrLock); + if (msm) { + msm->getCloner()->onUpdateOp(opCtx, updatedDoc, opTime, prePostImageOpTime); + } } if (inMultiDocumentTransaction && repl::ReadConcernArgs::get(opCtx).getArgsAtClusterTime()) { @@ -124,9 +135,14 @@ void OpObserverShardingImpl::shardObserveDeleteOp(OperationContext* opCtx, auto* const css = CollectionShardingRuntime::get(opCtx, nss); css->checkShardVersionOrThrow(opCtx); - auto msm = MigrationSourceManager::get(css); - if (msm && isMigrating) { - msm->getCloner()->onDeleteOp(opCtx, documentKey, opTime, preImageOpTime); + { + auto csr = CollectionShardingRuntime::get(opCtx, nss); + auto csrLock = CollectionShardingRuntimeLock::lock(opCtx, csr); + + auto msm = MigrationSourceManager::get(csr, csrLock); + if (msm && isMigrating) { + msm->getCloner()->onDeleteOp(opCtx, documentKey, opTime, preImageOpTime); + } } if (inMultiDocumentTransaction && repl::ReadConcernArgs::get(opCtx).getArgsAtClusterTime()) { |