diff options
-rw-r--r-- | src/mongo/db/s/catalog_cache_loader_mock.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/s/catalog_cache_loader_mock.h | 3 | ||||
-rw-r--r-- | src/mongo/db/s/collection_sharding_state.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/s/read_only_catalog_cache_loader.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/s/read_only_catalog_cache_loader.h | 3 | ||||
-rw-r--r-- | src/mongo/db/s/shard_server_catalog_cache_loader.cpp | 65 | ||||
-rw-r--r-- | src/mongo/db/s/shard_server_catalog_cache_loader.h | 31 | ||||
-rw-r--r-- | src/mongo/s/catalog_cache_loader.h | 9 | ||||
-rw-r--r-- | src/mongo/s/config_server_catalog_cache_loader.cpp | 7 | ||||
-rw-r--r-- | src/mongo/s/config_server_catalog_cache_loader.h | 3 |
10 files changed, 55 insertions, 83 deletions
diff --git a/src/mongo/db/s/catalog_cache_loader_mock.cpp b/src/mongo/db/s/catalog_cache_loader_mock.cpp index 0a2685742d2..42aeb43775f 100644 --- a/src/mongo/db/s/catalog_cache_loader_mock.cpp +++ b/src/mongo/db/s/catalog_cache_loader_mock.cpp @@ -95,10 +95,7 @@ void CatalogCacheLoaderMock::waitForDatabaseFlush(OperationContext* opCtx, Strin } std::shared_ptr<Notification<void>> CatalogCacheLoaderMock::getChunksSince( - const NamespaceString& nss, - ChunkVersion version, - stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) { - + const NamespaceString& nss, ChunkVersion version, GetChunksSinceCallbackFn callbackFn) { auto notify = std::make_shared<Notification<void>>(); uassertStatusOK(_threadPool.schedule([ this, notify, callbackFn ]() noexcept { diff --git a/src/mongo/db/s/catalog_cache_loader_mock.h b/src/mongo/db/s/catalog_cache_loader_mock.h index 6709d841224..e9b3e3101fb 100644 --- a/src/mongo/db/s/catalog_cache_loader_mock.h +++ b/src/mongo/db/s/catalog_cache_loader_mock.h @@ -57,8 +57,7 @@ public: std::shared_ptr<Notification<void>> getChunksSince( const NamespaceString& nss, ChunkVersion version, - stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) - override; + GetChunksSinceCallbackFn callbackFn) override; void getDatabase( StringData dbName, diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp index 424ce3ce548..d457d260f09 100644 --- a/src/mongo/db/s/collection_sharding_state.cpp +++ b/src/mongo/db/s/collection_sharding_state.cpp @@ -46,6 +46,7 @@ #include "mongo/executor/thread_pool_task_executor.h" #include "mongo/s/stale_exception.h" #include "mongo/util/log.h" +#include "mongo/util/string_map.h" namespace mongo { namespace { @@ -114,9 +115,9 @@ public: auto it = _collections.find(ns); if (it == _collections.end()) { - auto inserted = _collections.emplace( + auto inserted = _collections.try_emplace( ns, - std::make_unique<CollectionShardingState>(get.owner(this), NamespaceString(ns))); + std::make_shared<CollectionShardingState>(get.owner(this), NamespaceString(ns))); invariant(inserted.second); it = std::move(inserted.first); } @@ -156,8 +157,7 @@ public: private: mutable stdx::mutex _mutex; - using CollectionsMap = - stdx::unordered_map<std::string, std::unique_ptr<CollectionShardingState>>; + using CollectionsMap = StringMap<std::shared_ptr<CollectionShardingState>>; CollectionsMap _collections; }; diff --git a/src/mongo/db/s/read_only_catalog_cache_loader.cpp b/src/mongo/db/s/read_only_catalog_cache_loader.cpp index 4c3c317bf15..9757a731e00 100644 --- a/src/mongo/db/s/read_only_catalog_cache_loader.cpp +++ b/src/mongo/db/s/read_only_catalog_cache_loader.cpp @@ -44,9 +44,7 @@ void ReadOnlyCatalogCacheLoader::waitForDatabaseFlush(OperationContext* opCtx, S } std::shared_ptr<Notification<void>> ReadOnlyCatalogCacheLoader::getChunksSince( - const NamespaceString& nss, - ChunkVersion version, - stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) { + const NamespaceString& nss, ChunkVersion version, GetChunksSinceCallbackFn callbackFn) { return _configServerLoader.getChunksSince(nss, version, callbackFn); } diff --git a/src/mongo/db/s/read_only_catalog_cache_loader.h b/src/mongo/db/s/read_only_catalog_cache_loader.h index 2ff0548aaed..73891e7979a 100644 --- a/src/mongo/db/s/read_only_catalog_cache_loader.h +++ b/src/mongo/db/s/read_only_catalog_cache_loader.h @@ -49,8 +49,7 @@ public: std::shared_ptr<Notification<void>> getChunksSince( const NamespaceString& nss, ChunkVersion version, - stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) - override; + GetChunksSinceCallbackFn callbackFn) override; void getDatabase( StringData dbName, diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp index ec7bea979b6..b06ae7565e0 100644 --- a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp +++ b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp @@ -381,7 +381,7 @@ void ShardServerCatalogCacheLoader::notifyOfCollectionVersionUpdate(const Namesp } void ShardServerCatalogCacheLoader::initializeReplicaSetRole(bool isPrimary) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<stdx::mutex> lg(_mutex); invariant(_role == ReplicaSetRole::None); if (isPrimary) { @@ -392,7 +392,7 @@ void ShardServerCatalogCacheLoader::initializeReplicaSetRole(bool isPrimary) { } void ShardServerCatalogCacheLoader::onStepDown() { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<stdx::mutex> lg(_mutex); invariant(_role != ReplicaSetRole::None); _contexts.interrupt(ErrorCodes::PrimarySteppedDown); ++_term; @@ -400,58 +400,47 @@ void ShardServerCatalogCacheLoader::onStepDown() { } void ShardServerCatalogCacheLoader::onStepUp() { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<stdx::mutex> lg(_mutex); invariant(_role != ReplicaSetRole::None); ++_term; _role = ReplicaSetRole::Primary; } std::shared_ptr<Notification<void>> ShardServerCatalogCacheLoader::getChunksSince( - const NamespaceString& nss, - ChunkVersion version, - stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) { - long long currentTerm; + const NamespaceString& nss, ChunkVersion version, GetChunksSinceCallbackFn callbackFn) { + auto notify = std::make_shared<Notification<void>>(); + bool isPrimary; - { - // Take the mutex so that we can discern whether we're primary or secondary and schedule a - // task with the corresponding _term value. + long long term; + std::tie(isPrimary, term) = [&] { stdx::lock_guard<stdx::mutex> lock(_mutex); - invariant(_role != ReplicaSetRole::None); - - currentTerm = _term; - isPrimary = (_role == ReplicaSetRole::Primary); - } - - auto notify = std::make_shared<Notification<void>>(); + return std::make_tuple(_role == ReplicaSetRole::Primary, _term); + }(); uassertStatusOK(_threadPool.schedule( - [ this, nss, version, callbackFn, notify, isPrimary, currentTerm ]() noexcept { + [ this, nss, version, callbackFn, notify, isPrimary, term ]() noexcept { auto context = _contexts.makeOperationContext(*Client::getCurrent()); + auto const opCtx = context.opCtx(); - { - stdx::lock_guard<stdx::mutex> lock(_mutex); - // We may have missed an OperationContextGroup interrupt since this operation began - // but before the OperationContext was added to the group. So we'll check that - // we're still in the same _term. - if (_term != currentTerm) { - callbackFn(context.opCtx(), - Status{ErrorCodes::Interrupted, - "Unable to refresh routing table because replica set state " - "changed or node is shutting down."}); - notify->set(); - return; + try { + { + // We may have missed an OperationContextGroup interrupt since this operation + // began but before the OperationContext was added to the group. So we'll check + // that we're still in the same _term. + stdx::lock_guard<stdx::mutex> lock(_mutex); + uassert(ErrorCodes::Interrupted, + "Unable to refresh routing table because replica set state changed or " + "the node is shutting down.", + _term == term); } - } - try { if (isPrimary) { - _schedulePrimaryGetChunksSince( - context.opCtx(), nss, version, currentTerm, callbackFn, notify); + _schedulePrimaryGetChunksSince(opCtx, nss, version, term, callbackFn, notify); } else { - _runSecondaryGetChunksSince(context.opCtx(), nss, version, callbackFn); + _runSecondaryGetChunksSince(opCtx, nss, version, callbackFn, notify); } } catch (const DBException& ex) { - callbackFn(context.opCtx(), ex.toStatus()); + callbackFn(opCtx, ex.toStatus()); notify->set(); } })); @@ -614,13 +603,15 @@ void ShardServerCatalogCacheLoader::_runSecondaryGetChunksSince( OperationContext* opCtx, const NamespaceString& nss, const ChunkVersion& catalogCacheSinceVersion, - stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) { + stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn, + std::shared_ptr<Notification<void>> notify) { forcePrimaryCollectionRefreshAndWaitForReplication(opCtx, nss); // Read the local metadata. auto swCollAndChunks = _getCompletePersistedMetadataForSecondarySinceVersion(opCtx, nss, catalogCacheSinceVersion); callbackFn(opCtx, std::move(swCollAndChunks)); + notify->set(); } void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince( diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.h b/src/mongo/db/s/shard_server_catalog_cache_loader.h index a3e908d85bf..52802a3c311 100644 --- a/src/mongo/db/s/shard_server_catalog_cache_loader.h +++ b/src/mongo/db/s/shard_server_catalog_cache_loader.h @@ -73,21 +73,10 @@ public: */ void notifyOfCollectionVersionUpdate(const NamespaceString& nss) override; - /** - * This must be called serially, never in parallel, including waiting for the returned - * Notification to be signalled. - * - * This function is robust to unexpected version requests from the CatalogCache. Requesting - * versions with epoches that do not match anything on the config server will not affect or - * clear the locally persisted metadata. Requesting versions higher than anything previous - * requested, or versions lower than already requested, will not mess up the locally persisted - * metadata, and will return what was requested if it exists. - */ std::shared_ptr<Notification<void>> getChunksSince( const NamespaceString& nss, ChunkVersion version, - stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) - override; + GetChunksSinceCallbackFn callbackFn) override; void getDatabase( StringData dbName, @@ -349,7 +338,8 @@ private: OperationContext* opCtx, const NamespaceString& nss, const ChunkVersion& catalogCacheSinceVersion, - stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn); + stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn, + std::shared_ptr<Notification<void>> notify); /** * Refreshes chunk metadata from the config server's metadata store, and schedules maintenance @@ -477,21 +467,19 @@ private: CollectionAndChangedChunks _getCompletePersistedMetadataForSecondarySinceVersion( OperationContext* opCtx, const NamespaceString& nss, const ChunkVersion& version); - // Used by the shard primary to retrieve chunk metadata from the config server. + // Loader used by the shard primary to retrieve the authoritative routing metadata from the + // config server const std::unique_ptr<CatalogCacheLoader> _configServerLoader; - // Thread pool used to load chunk metadata. + // Thread pool used to run blocking tasks which perform disk reads and writes ThreadPool _threadPool; + // Registry of notifications for changes happening to the shard's on-disk routing information NamespaceMetadataChangeNotifications _namespaceNotifications; - // Protects the class state below. + // Protects the class state below stdx::mutex _mutex; - // Map to track in progress persisted cache updates on the shard primary. - CollAndChunkTaskLists _collAndChunkTaskLists; - DbTaskLists _dbTaskLists; - // This value is bumped every time the set of currently scheduled tasks should no longer be // running. This includes, replica set state transitions and shutdown. long long _term{0}; @@ -502,6 +490,9 @@ private: // The collection of operation contexts in use by all threads. OperationContextGroup _contexts; + + CollAndChunkTaskLists _collAndChunkTaskLists; + DbTaskLists _dbTaskLists; }; } // namespace mongo diff --git a/src/mongo/s/catalog_cache_loader.h b/src/mongo/s/catalog_cache_loader.h index 90b2716fdfb..b5317390c1e 100644 --- a/src/mongo/s/catalog_cache_loader.h +++ b/src/mongo/s/catalog_cache_loader.h @@ -63,7 +63,6 @@ public: static CatalogCacheLoader& get(ServiceContext* serviceContext); static CatalogCacheLoader& get(OperationContext* opCtx); - /** * Used as a return value for getChunksSince. */ @@ -88,6 +87,9 @@ public: std::vector<ChunkType> changedChunks; }; + using GetChunksSinceCallbackFn = + stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)>; + /** * Initializes internal state. Must be called only once when sharding state is initialized. */ @@ -121,10 +123,7 @@ public: * Notification object can be waited on in order to ensure that. */ virtual std::shared_ptr<Notification<void>> getChunksSince( - const NamespaceString& nss, - ChunkVersion version, - stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> - callbackFn) = 0; + const NamespaceString& nss, ChunkVersion version, GetChunksSinceCallbackFn callbackFn) = 0; /** * Non-blocking call, which requests the most recent db version for the given dbName from the diff --git a/src/mongo/s/config_server_catalog_cache_loader.cpp b/src/mongo/s/config_server_catalog_cache_loader.cpp index e6c7ecc138c..d174c7b7e32 100644 --- a/src/mongo/s/config_server_catalog_cache_loader.cpp +++ b/src/mongo/s/config_server_catalog_cache_loader.cpp @@ -25,7 +25,9 @@ * exception statement from all source files in the program, then also delete * it in the license file. */ + #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding + #include "mongo/platform/basic.h" #include "mongo/s/config_server_catalog_cache_loader.h" @@ -170,10 +172,7 @@ void ConfigServerCatalogCacheLoader::waitForDatabaseFlush(OperationContext* opCt } std::shared_ptr<Notification<void>> ConfigServerCatalogCacheLoader::getChunksSince( - const NamespaceString& nss, - ChunkVersion version, - stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) { - + const NamespaceString& nss, ChunkVersion version, GetChunksSinceCallbackFn callbackFn) { auto notify = std::make_shared<Notification<void>>(); uassertStatusOK(_threadPool.schedule([ nss, version, notify, callbackFn ]() noexcept { diff --git a/src/mongo/s/config_server_catalog_cache_loader.h b/src/mongo/s/config_server_catalog_cache_loader.h index d0ff4e4daba..ad5b8e1b0ab 100644 --- a/src/mongo/s/config_server_catalog_cache_loader.h +++ b/src/mongo/s/config_server_catalog_cache_loader.h @@ -51,8 +51,7 @@ public: std::shared_ptr<Notification<void>> getChunksSince( const NamespaceString& nss, ChunkVersion version, - stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) - override; + GetChunksSinceCallbackFn callbackFn) override; void getDatabase( StringData dbName, |