/**
 *    Copyright (C) 2018-present MongoDB, Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the Server Side Public License, version 1,
 *    as published by MongoDB, Inc.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 *    Server Side Public License for more details.
 *
 *    You should have received a copy of the Server Side Public License
 *    along with this program. If not, see
 *    <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the Server Side Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
*/ #pragma once #include "mongo/base/string_data.h" #include "mongo/db/jsobj.h" #include "mongo/platform/atomic_word.h" #include "mongo/s/catalog/type_database_gen.h" #include "mongo/s/catalog/type_index_catalog.h" #include "mongo/s/catalog_cache_loader.h" #include "mongo/s/chunk_manager.h" #include "mongo/s/shard_version.h" #include "mongo/s/sharding_index_catalog_cache.h" #include "mongo/s/type_collection_common_types_gen.h" #include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/read_through_cache.h" namespace mongo { static constexpr int kMaxNumStaleVersionRetries = 10; class ComparableDatabaseVersion; using DatabaseTypeCache = ReadThroughCache; using DatabaseTypeValueHandle = DatabaseTypeCache::ValueHandle; using CachedDatabaseInfo = DatabaseTypeValueHandle; struct CollectionRoutingInfo { CollectionRoutingInfo(ChunkManager&& chunkManager, boost::optional&& shardingIndexesCatalog) : cm(std::move(chunkManager)), sii(std::move(shardingIndexesCatalog)) {} ChunkManager cm; boost::optional sii; ShardVersion getCollectionVersion() const; ShardVersion getShardVersion(const ShardId& shardId) const; }; /** * This class wrap a DatabaseVersion object augmenting it with: * - a sequence number to allow for forced catalog cache refreshes * - a sequence number to disambiguate scenarios in which the DatabaseVersion isn't valid */ class ComparableDatabaseVersion { public: /** * Creates a ComparableDatabaseVersion that wraps the given DatabaseVersion. * * If version is boost::none it creates a ComparableDatabaseVersion that doesn't have a valid * version. 
This is useful in some scenarios in which the DatabaseVersion is provided later * through ComparableDatabaseVersion::setVersion or to represent that a Database doesn't exist */ static ComparableDatabaseVersion makeComparableDatabaseVersion( const boost::optional& version); /** * Creates a new instance which will artificially be greater than any * previously created ComparableDatabaseVersion and smaller than any instance * created afterwards. Used as means to cause the collections cache to * attempt a refresh in situations where causal consistency cannot be * inferred. */ static ComparableDatabaseVersion makeComparableDatabaseVersionForForcedRefresh(); /** * Empty constructor needed by the ReadThroughCache. * * Instances created through this constructor will be always less than the ones created through * the static constructor. */ ComparableDatabaseVersion() = default; std::string toString() const; bool operator==(const ComparableDatabaseVersion& other) const; bool operator!=(const ComparableDatabaseVersion& other) const { return !(*this == other); } bool operator<(const ComparableDatabaseVersion& other) const; bool operator>(const ComparableDatabaseVersion& other) const { return other < *this; } bool operator<=(const ComparableDatabaseVersion& other) const { return !(*this > other); } bool operator>=(const ComparableDatabaseVersion& other) const { return !(*this < other); } private: friend class CatalogCache; static AtomicWord _disambiguatingSequenceNumSource; static AtomicWord _forcedRefreshSequenceNumSource; ComparableDatabaseVersion(boost::optional version, uint64_t disambiguatingSequenceNum, uint64_t forcedRefreshSequenceNum) : _dbVersion(std::move(version)), _disambiguatingSequenceNum(disambiguatingSequenceNum), _forcedRefreshSequenceNum(forcedRefreshSequenceNum) {} void setDatabaseVersion(const DatabaseVersion& version); boost::optional _dbVersion; uint64_t _disambiguatingSequenceNum{0}; uint64_t _forcedRefreshSequenceNum{0}; }; /** * This is the root of the 
"read-only" hierarchy of cached catalog metadata. It is read only * in the sense that it only reads from the persistent store, but never writes to it. Instead * writes happen through the ShardingCatalogManager and the cache hierarchy needs to be invalidated. */ class CatalogCache { CatalogCache(const CatalogCache&) = delete; CatalogCache& operator=(const CatalogCache&) = delete; public: CatalogCache(ServiceContext* service, CatalogCacheLoader& cacheLoader); virtual ~CatalogCache(); /** * Shuts down and joins the executor used by all the caches to run their blocking work. */ void shutDownAndJoin(); /** * Blocking method that ensures the specified database is in the cache, loading it if necessary, * and returns it. If the database was not in cache, all the sharded collections will be in the * 'needsRefresh' state. */ StatusWith getDatabase(OperationContext* opCtx, StringData dbName, bool allowLocks = false); /** * Blocking method to get both the placement information and the index information for a * collection. * * If the collection is sharded, returns placement info initialized with a ChunkManager and a * list of global indexes that may be empty if no global indexes exist. If the collection is not * sharded, returns placement info initialized with the primary shard for the specified database * and an empty list representing no global indexes. If an error occurs while loading the * metadata, returns a failed status. * * If the given atClusterTime is so far in the past that it is not possible to construct * placement info, returns a StaleClusterTime error. */ StatusWith getCollectionRoutingInfoAt(OperationContext* opCtx, const NamespaceString& nss, Timestamp atClusterTime); /** * Same as the getCollectionRoutingInfoAt call above, but returns the latest known routing * information for the specified namespace. 
* * While this method may fail under the same circumstances as getCollectionRoutingInfoAt, it is * guaranteed to never throw StaleClusterTime, because the latest routing information should * always be available. */ virtual StatusWith getCollectionRoutingInfo(OperationContext* opCtx, const NamespaceString& nss, bool allowLocks = false); /** * Same as getDatbase above, but in addition forces the database entry to be refreshed. */ StatusWith getDatabaseWithRefresh(OperationContext* opCtx, StringData dbName); /** * Same as getCollectionRoutingInfo above, but in addition causes the namespace to be refreshed. */ StatusWith getCollectionRoutingInfoWithRefresh( OperationContext* opCtx, const NamespaceString& nss); /** * Same as getCollectionRoutingInfo above, but in addition, causes the placement information for * the namespace to be refreshed. Will only refresh the index information if the collection * uuid from the placement information does not match the collection uuid from the cached index * information. */ StatusWith getCollectionRoutingInfoWithPlacementRefresh( OperationContext* opCtx, const NamespaceString& nss); /** * Same as getCollectionRoutingInfo above, but in addition, causes the index information for the * namespace to be refreshed. Will only refresh the placement information if the collection uuid * from the index information does not match the collection uuid from the cached placement * information. */ StatusWith getCollectionRoutingInfoWithIndexRefresh( OperationContext* opCtx, const NamespaceString& nss); /** * Same as getCollectionRoutingInfo above, but throws NamespaceNotSharded error if the namespace * is not sharded. */ CollectionRoutingInfo getShardedCollectionRoutingInfo(OperationContext* opCtx, const NamespaceString& nss); /** * Same as getCollectionRoutingInfoWithRefresh above, but in addition returns a * NamespaceNotSharded error if the collection is not sharded. 
*/ StatusWith getShardedCollectionRoutingInfoWithRefresh( OperationContext* opCtx, const NamespaceString& nss); /** * Same as getCollectionRoutingInfoWithPlacementRefresh above, but in addition returns a * NamespaceNotSharded error if the collection is not sharded. */ StatusWith getShardedCollectionRoutingInfoWithPlacementRefresh( OperationContext* opCtx, const NamespaceString& nss); /** * Advances the version in the cache for the given database. * * To be called with the wantedVersion returned by a targeted node in case of a * StaleDatabaseVersion response. * * In the case the passed version is boost::none, invalidates the cache for the given database. */ void onStaleDatabaseVersion(StringData dbName, const boost::optional& wantedVersion); /** * Sets whether this operation should block behind a catalog cache refresh. */ static void setOperationShouldBlockBehindCatalogCacheRefresh(OperationContext* opCtx, bool shouldBlock); /** * Invalidates a single shard for the current collection if the epochs given in the chunk * versions match. Otherwise, invalidates the entire collection, causing any future targetting * requests to block on an upcoming catalog cache refresh. */ void invalidateShardOrEntireCollectionEntryForShardedCollection( const NamespaceString& nss, const boost::optional& wantedVersion, const ShardId& shardId); /** * Non-blocking method, which invalidates all namespaces which contain data on the specified * shard and all databases which have the shard listed as their primary shard. */ void invalidateEntriesThatReferenceShard(const ShardId& shardId); /** * Non-blocking method, which removes the entire specified database (including its collections) * from the cache. */ void purgeDatabase(StringData dbName); /** * Non-blocking method, which removes all databases (including their collections) from the * cache. 
*/ void purgeAllDatabases(); /** * Reports statistics about the catalog cache to be used by serverStatus */ void report(BSONObjBuilder* builder) const; /** * Non-blocking method that marks the current database entry for the dbName as needing * refresh. Will cause all further targetting attempts to block on a catalog cache refresh, * even if they do not require causal consistency. */ void invalidateDatabaseEntry_LINEARIZABLE(const StringData& dbName); /** * Non-blocking method that marks the current collection entry for the namespace as needing * refresh. Will cause all further targetting attempts to block on a catalog cache refresh, * even if they do not require causal consistency. */ void invalidateCollectionEntry_LINEARIZABLE(const NamespaceString& nss); void invalidateIndexEntry_LINEARIZABLE(const NamespaceString& nss); private: class DatabaseCache : public DatabaseTypeCache { public: DatabaseCache(ServiceContext* service, ThreadPoolInterface& threadPool, CatalogCacheLoader& catalogCacheLoader); private: LookupResult _lookupDatabase(OperationContext* opCtx, const std::string& dbName, const ValueHandle& dbType, const ComparableDatabaseVersion& previousDbVersion); CatalogCacheLoader& _catalogCacheLoader; Mutex _mutex = MONGO_MAKE_LATCH("DatabaseCache::_mutex"); }; class CollectionCache : public RoutingTableHistoryCache { public: CollectionCache(ServiceContext* service, ThreadPoolInterface& threadPool, CatalogCacheLoader& catalogCacheLoader); void reportStats(BSONObjBuilder* builder) const; private: LookupResult _lookupCollection(OperationContext* opCtx, const NamespaceString& nss, const ValueHandle& collectionHistory, const ComparableChunkVersion& previousChunkVersion); CatalogCacheLoader& _catalogCacheLoader; Mutex _mutex = MONGO_MAKE_LATCH("CollectionCache::_mutex"); struct Stats { // Tracks how many incremental refreshes are waiting to complete currently AtomicWord numActiveIncrementalRefreshes{0}; // Cumulative, always-increasing counter of how many 
incremental refreshes have been // kicked off AtomicWord countIncrementalRefreshesStarted{0}; // Tracks how many full refreshes are waiting to complete currently AtomicWord numActiveFullRefreshes{0}; // Cumulative, always-increasing counter of how many full refreshes have been kicked off AtomicWord countFullRefreshesStarted{0}; // Cumulative, always-increasing counter of how many full or incremental refreshes // failed for whatever reason AtomicWord countFailedRefreshes{0}; /** * Reports the accumulated statistics for serverStatus. */ void report(BSONObjBuilder* builder) const; } _stats; void _updateRefreshesStats(bool isIncremental, bool add); }; class IndexCache : public ShardingIndexesCatalogRTCBase { public: IndexCache(ServiceContext* service, ThreadPoolInterface& threadPool); private: LookupResult _lookupIndexes(OperationContext* opCtx, const NamespaceString& nss, const ValueHandle& indexes, const ComparableIndexVersion& previousIndexVersion); Mutex _mutex = MONGO_MAKE_LATCH("IndexCache::_mutex"); }; StatusWith _getCollectionPlacementInfoAt(OperationContext* opCtx, const NamespaceString& nss, boost::optional atClusterTime, bool allowLocks = false); boost::optional _getCollectionIndexInfoAt( OperationContext* opCtx, const NamespaceString& nss, bool allowLocks = false); void _triggerPlacementVersionRefresh(OperationContext* opCtx, const NamespaceString& nss); void _triggerIndexVersionRefresh(OperationContext* opCtx, const NamespaceString& nss); // Same as getCollectionRoutingInfo but will fetch the index information from the cache even if // the placement information is not sharded. Used internally when the a refresh is requested for // the index component. 
StatusWith _getCollectionRoutingInfoWithoutOptimization( OperationContext* opCtx, const NamespaceString& nss); // Interface from which chunks will be retrieved CatalogCacheLoader& _cacheLoader; // Executor on which the caches below will execute their blocking work ThreadPool _executor; DatabaseCache _databaseCache; CollectionCache _collectionCache; IndexCache _indexCache; /** * Encapsulates runtime statistics across all databases and collections in this catalog cache */ struct Stats { // Counts how many times threads hit stale config exception (which is what triggers metadata // refreshes) AtomicWord countStaleConfigErrors{0}; // Cumulative, always-increasing counter of how much time threads waiting for refresh // combined AtomicWord totalRefreshWaitTimeMicros{0}; /** * Reports the accumulated statistics for serverStatus. */ void report(BSONObjBuilder* builder) const; } _stats; }; } // namespace mongo