diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2017-03-28 18:38:18 -0400 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2017-04-13 07:49:54 -0400 |
commit | 48d1e1bd6f56f3c20c018332fabafc147bfafc3b (patch) | |
tree | 09ba40a790993b93b96dc905dea378b537ddbc94 /src/mongo/s | |
parent | c431782e66416e7dbbd9db457d9507b65c6304ba (diff) | |
download | mongo-48d1e1bd6f56f3c20c018332fabafc147bfafc3b.tar.gz |
SERVER-20854 Refresh collection metadata on a separate thread
(cherry picked from commit 0885a0adbb813727cc5a2083224ac0d89763c276)
Diffstat (limited to 'src/mongo/s')
-rw-r--r-- | src/mongo/s/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/s/catalog_cache.cpp | 273 | ||||
-rw-r--r-- | src/mongo/s/catalog_cache.h | 27 | ||||
-rw-r--r-- | src/mongo/s/catalog_cache_loader.h | 92 | ||||
-rw-r--r-- | src/mongo/s/config_server_catalog_cache_loader.cpp | 169 | ||||
-rw-r--r-- | src/mongo/s/config_server_catalog_cache_loader.h | 52 |
6 files changed, 476 insertions, 139 deletions
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index 91765872709..1ae2b8dd244 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -201,6 +201,7 @@ env.Library( 'chunk.cpp', 'chunk_manager.cpp', 'cluster_identity_loader.cpp', + 'config_server_catalog_cache_loader.cpp', 'config_server_client.cpp', 'grid.cpp', 'shard_util.cpp', @@ -213,6 +214,7 @@ env.Library( '$BUILD_DIR/mongo/executor/task_executor_pool', '$BUILD_DIR/mongo/s/catalog/sharding_catalog_client', '$BUILD_DIR/mongo/s/query/cluster_cursor_manager', + '$BUILD_DIR/mongo/util/concurrency/thread_pool', 'client/sharding_client', 'common', ], diff --git a/src/mongo/s/catalog_cache.cpp b/src/mongo/s/catalog_cache.cpp index 28e1859f530..36d5e82ac49 100644 --- a/src/mongo/s/catalog_cache.cpp +++ b/src/mongo/s/catalog_cache.cpp @@ -41,6 +41,7 @@ #include "mongo/s/catalog/type_collection.h" #include "mongo/s/catalog/type_database.h" #include "mongo/s/client/shard_registry.h" +#include "mongo/s/config_server_catalog_cache_loader.h" #include "mongo/s/grid.h" #include "mongo/stdx/memory.h" #include "mongo/util/log.h" @@ -54,114 +55,49 @@ namespace { const int kMaxInconsistentRoutingInfoRefreshAttempts = 3; /** - * Structure representing the generated query and sort order for a chunk diffing operation. - */ -struct QueryAndSort { - const BSONObj query; - const BSONObj sort; -}; - -/** - * Returns the query needed to find incremental changes to a collection from the config server. - */ -QueryAndSort createConfigDiffQuery(const NamespaceString& nss, ChunkVersion collectionVersion) { - // The query has to find all the chunks $gte the current max version. Currently, any splits and - // merges will increment the current max version. - BSONObjBuilder queryB; - queryB.append(ChunkType::ns(), nss.ns()); - - { - BSONObjBuilder tsBuilder(queryB.subobjStart(ChunkType::DEPRECATED_lastmod())); - tsBuilder.appendTimestamp("$gte", collectionVersion.toLong()); - tsBuilder.done(); - } - - // NOTE: IT IS IMPORTANT FOR CONSISTENCY THAT WE SORT BY ASC VERSION, IN ORDER TO HANDLE CURSOR - // YIELDING BETWEEN CHUNKS BEING MIGRATED - // - // This ensures that changes to chunk version (which will always be higher) will always come - // *after* our current position in the chunk cursor - - return QueryAndSort{queryB.obj(), BSON(ChunkType::DEPRECATED_lastmod() << 1)}; -} - -/** - * Blocking method, which refreshes the routing information for the specified collection. If - * 'existingRoutingInfo' has been specified uses this as a basis to perform an 'incremental' - * refresh, which only fetches the chunks which changed. Otherwise does a full refresh, fetching all - * the chunks for the collection. - * - * Returns the refreshed routing information if the collection is still sharded or nullptr if it is - * not. If refresh fails for any reason, throws a DBException. + * Given an (optional) initial routing table and a set of changed chunks returned by the catalog + * cache loader, produces a new routing table with the changes applied. * - * With the exception of ConflictingOperationInProgress, error codes thrown from this method are - * final in that there is nothing that can be done to remedy them other than pass the error to the - * user. + * If the collection is no longer sharded returns nullptr. If the epoch has changed, expects that + * the 'collectionChunksList' contains the full contents of the chunks collection for that namespace + * so that the routing table can be built from scratch. * - * ConflictingOperationInProgress indicates that the chunk metadata was found to be inconsistent. - * Since this may be transient, due to the collection being dropped or recreated, the caller must - * retry the reload up to some configurable number of attempts. + * Throws ConflictingOperationInProgress if the chunk metadata was found to be inconsistent (not + * containing all the necessary chunks, contains overlaps or chunks' epoch values are not the same + * as that of the collection). Since this situation may be transient, due to the collection being + * dropped or recreated concurrently, the caller must retry the reload up to some configurable + * number of attempts. */ std::shared_ptr<ChunkManager> refreshCollectionRoutingInfo( OperationContext* opCtx, const NamespaceString& nss, - std::shared_ptr<ChunkManager> existingRoutingInfo) { - Timer t; - - const auto catalogClient = Grid::get(opCtx)->catalogClient(opCtx); - - // Decide whether to do a full or partial load based on the state of the collection - auto collStatus = catalogClient->getCollection(opCtx, nss.ns()); - if (collStatus == ErrorCodes::NamespaceNotFound) { + std::shared_ptr<ChunkManager> existingRoutingInfo, + StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollectionAndChangedChunks) { + if (swCollectionAndChangedChunks == ErrorCodes::NamespaceNotFound) { return nullptr; } - const auto coll = uassertStatusOK(std::move(collStatus)).value; - if (coll.getDropped()) { - return nullptr; - } + const auto collectionAndChunks = uassertStatusOK(std::move(swCollectionAndChangedChunks)); + // Check whether the collection epoch might have changed ChunkVersion startingCollectionVersion; ChunkMap chunkMap = SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<std::shared_ptr<Chunk>>(); if (!existingRoutingInfo) { // If we don't have a basis chunk manager, do a full refresh - startingCollectionVersion = ChunkVersion(0, 0, coll.getEpoch()); - } else if (existingRoutingInfo->getVersion().epoch() != coll.getEpoch()) { + startingCollectionVersion = ChunkVersion(0, 0, collectionAndChunks.epoch); + } else if (existingRoutingInfo->getVersion().epoch() != collectionAndChunks.epoch) { // If the collection's epoch has changed, do a full refresh - startingCollectionVersion = ChunkVersion(0, 0, coll.getEpoch()); + startingCollectionVersion = ChunkVersion(0, 0, collectionAndChunks.epoch); } else { - // Otherwise do a partial refresh startingCollectionVersion = existingRoutingInfo->getVersion(); chunkMap = existingRoutingInfo->chunkMap(); } - log() << "Refreshing chunks for collection " << nss << " based on version " - << startingCollectionVersion; - - // Diff tracker should *always* find at least one chunk if collection exists - const auto diffQuery = createConfigDiffQuery(nss, startingCollectionVersion); - - // Query the chunks which have changed - std::vector<ChunkType> newChunks; - repl::OpTime opTime; - uassertStatusOK(Grid::get(opCtx)->catalogClient(opCtx)->getChunks( - opCtx, - diffQuery.query, - diffQuery.sort, - boost::none, - &newChunks, - &opTime, - repl::ReadConcernLevel::kMajorityReadConcern)); - - uassert(ErrorCodes::ConflictingOperationInProgress, - "No chunks were found for the collection", - !newChunks.empty()); - ChunkVersion collectionVersion = startingCollectionVersion; - for (const auto& chunk : newChunks) { + for (const auto& chunk : collectionAndChunks.changedChunks) { const auto& chunkVersion = chunk.getVersion(); uassert(ErrorCodes::ConflictingOperationInProgress, @@ -200,42 +136,34 @@ std::shared_ptr<ChunkManager> refreshCollectionRoutingInfo( // sequence number to detect batch writes not making progress because of chunks moving across // shards too frequently. if (collectionVersion == startingCollectionVersion) { - log() << "Refresh for collection " << nss << " took " << t.millis() - << " ms and didn't find any metadata changes"; - return existingRoutingInfo; } std::unique_ptr<CollatorInterface> defaultCollator; - if (!coll.getDefaultCollation().isEmpty()) { + if (!collectionAndChunks.defaultCollation.isEmpty()) { // The collation should have been validated upon collection creation defaultCollator = uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext()) - ->makeFromBSON(coll.getDefaultCollation())); + ->makeFromBSON(collectionAndChunks.defaultCollation)); } - log() << "Refresh for collection " << nss << " took " << t.millis() << " ms and found version " - << collectionVersion; - return stdx::make_unique<ChunkManager>(nss, - coll.getKeyPattern(), + KeyPattern(collectionAndChunks.shardKeyPattern), std::move(defaultCollator), - coll.getUnique(), + collectionAndChunks.shardKeyIsUnique, std::move(chunkMap), collectionVersion); } } // namespace -CatalogCache::CatalogCache() = default; +CatalogCache::CatalogCache() : _cacheLoader(stdx::make_unique<ConfigServerCatalogCacheLoader>()) {} CatalogCache::~CatalogCache() = default; StatusWith<CachedDatabaseInfo> CatalogCache::getDatabase(OperationContext* opCtx, StringData dbName) { - stdx::lock_guard<stdx::mutex> lg(_mutex); - try { - return {CachedDatabaseInfo(_getDatabase_inlock(opCtx, dbName))}; + return {CachedDatabaseInfo(_getDatabase(opCtx, dbName))}; } catch (const DBException& ex) { return ex.toStatus(); } @@ -243,18 +171,16 @@ StatusWith<CachedDatabaseInfo> CatalogCache::getDatabase(OperationContext* opCtx StatusWith<CachedCollectionRoutingInfo> CatalogCache::getCollectionRoutingInfo( OperationContext* opCtx, const NamespaceString& nss) { - int numRefreshAttempts = 0; - while (true) { - stdx::unique_lock<stdx::mutex> ul(_mutex); - std::shared_ptr<DatabaseInfoEntry> dbEntry; try { - dbEntry = _getDatabase_inlock(opCtx, nss.db()); + dbEntry = _getDatabase(opCtx, nss.db()); } catch (const DBException& ex) { return ex.toStatus(); } + stdx::unique_lock<stdx::mutex> ul(_mutex); + auto& collections = dbEntry->collections; auto it = collections.find(nss.ns()); @@ -275,37 +201,24 @@ StatusWith<CachedCollectionRoutingInfo> CatalogCache::getCollectionRoutingInfo( auto& collEntry = it->second; if (collEntry.needsRefresh) { - numRefreshAttempts++; - - try { - auto newRoutingInfo = - refreshCollectionRoutingInfo(opCtx, nss, std::move(collEntry.routingInfo)); - if (newRoutingInfo == nullptr) { - collections.erase(it); - - // Loop around so we can return an "unsharded" routing info - continue; - } - - collEntry.routingInfo = std::move(newRoutingInfo); - collEntry.needsRefresh = false; - } catch (const DBException& ex) { - // It is possible that the metadata is being changed concurrently, so retry the - // refresh with a wait - if (ex.getCode() == ErrorCodes::ConflictingOperationInProgress && - numRefreshAttempts < kMaxInconsistentRoutingInfoRefreshAttempts) { - ul.unlock(); - - log() << "Metadata refresh for " << nss << " failed and will be retried" - << causedBy(redact(ex)); - - // Do the sleep outside of the mutex - sleepFor(Milliseconds(10) * numRefreshAttempts); - continue; - } - - return ex.toStatus(); + auto refreshNotification = collEntry.refreshCompletionNotification; + if (!refreshNotification) { + refreshNotification = (collEntry.refreshCompletionNotification = + std::make_shared<Notification<Status>>()); + _scheduleCollectionRefresh_inlock( + dbEntry, std::move(collEntry.routingInfo), nss, 1); } + + // Wait on the notification outside of the mutex + ul.unlock(); + + auto refreshStatus = refreshNotification->get(opCtx); + if (!refreshStatus.isOK()) { + return refreshStatus; + } + + // Once the refresh is complete, loop around to get the latest value + continue; } return {CachedCollectionRoutingInfo(dbEntry->primaryShardId, collEntry.routingInfo)}; @@ -407,8 +320,10 @@ void CatalogCache::purgeAllDatabases() { _databases.clear(); } -std::shared_ptr<CatalogCache::DatabaseInfoEntry> CatalogCache::_getDatabase_inlock( - OperationContext* opCtx, StringData dbName) { +std::shared_ptr<CatalogCache::DatabaseInfoEntry> CatalogCache::_getDatabase(OperationContext* opCtx, + StringData dbName) { + stdx::lock_guard<stdx::mutex> lg(_mutex); + auto it = _databases.find(dbName); if (it != _databases.end()) { return it->second; @@ -441,6 +356,94 @@ std::shared_ptr<CatalogCache::DatabaseInfoEntry> CatalogCache::_getDatabase_inlo dbDesc.getPrimary(), dbDesc.getSharded(), std::move(collectionEntries)}); } +void CatalogCache::_scheduleCollectionRefresh_inlock( + std::shared_ptr<DatabaseInfoEntry> dbEntry, + std::shared_ptr<ChunkManager> existingRoutingInfo, + const NamespaceString& nss, + int refreshAttempt) { + Timer t; + + const ChunkVersion startingCollectionVersion = + (existingRoutingInfo ? existingRoutingInfo->getVersion() : ChunkVersion::UNSHARDED()); + + const auto refreshFailed_inlock = + [ this, t, dbEntry, nss, refreshAttempt ](const Status& status) noexcept { + log() << "Refresh for collection " << nss << " took " << t.millis() << " ms and failed" + << causedBy(redact(status)); + + auto& collections = dbEntry->collections; + auto it = collections.find(nss.ns()); + invariant(it != collections.end()); + auto& collEntry = it->second; + + // It is possible that the metadata is being changed concurrently, so retry the + // refresh again + if (status == ErrorCodes::ConflictingOperationInProgress && + refreshAttempt < kMaxInconsistentRoutingInfoRefreshAttempts) { + _scheduleCollectionRefresh_inlock(dbEntry, nullptr, nss, refreshAttempt + 1); + } else { + // Leave needsRefresh to true so that any subsequent get attempts will kick off + // another round of refresh + collEntry.refreshCompletionNotification->set(status); + collEntry.refreshCompletionNotification = nullptr; + } + }; + + const auto refreshCallback = + [ this, t, dbEntry, nss, existingRoutingInfo, refreshFailed_inlock ]( + OperationContext * opCtx, + StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept { + std::shared_ptr<ChunkManager> newRoutingInfo; + try { + newRoutingInfo = refreshCollectionRoutingInfo( + opCtx, nss, std::move(existingRoutingInfo), std::move(swCollAndChunks)); + } catch (const DBException& ex) { + stdx::lock_guard<stdx::mutex> lg(_mutex); + refreshFailed_inlock(ex.toStatus()); + return; + } + + stdx::lock_guard<stdx::mutex> lg(_mutex); + auto& collections = dbEntry->collections; + auto it = collections.find(nss.ns()); + invariant(it != collections.end()); + auto& collEntry = it->second; + + collEntry.needsRefresh = false; + collEntry.refreshCompletionNotification->set(Status::OK()); + collEntry.refreshCompletionNotification = nullptr; + + if (!newRoutingInfo) { + log() << "Refresh for collection " << nss << " took " << t.millis() + << " and found the collection is not sharded"; + + collections.erase(it); + } else { + log() << "Refresh for collection " << nss << " took " << t.millis() + << " ms and found version " << newRoutingInfo->getVersion(); + + collEntry.routingInfo = std::move(newRoutingInfo); + } + }; + + log() << "Refreshing chunks for collection " << nss << " based on version " + << startingCollectionVersion; + + try { + _cacheLoader->getChunksSince(nss, startingCollectionVersion, refreshCallback); + } catch (const DBException& ex) { + const auto status = ex.toStatus(); + + // ConflictingOperationInProgress errors trigger retry of the catalog cache reload logic. If + // we failed to schedule the asynchronous reload, there is no point in doing another + // attempt. + invariant(status != ErrorCodes::ConflictingOperationInProgress); + + stdx::lock_guard<stdx::mutex> lg(_mutex); + refreshFailed_inlock(status); + } +} + CachedDatabaseInfo::CachedDatabaseInfo(std::shared_ptr<CatalogCache::DatabaseInfoEntry> db) : _db(std::move(db)) {} diff --git a/src/mongo/s/catalog_cache.h b/src/mongo/s/catalog_cache.h index e76188303a9..5631c6fa00b 100644 --- a/src/mongo/s/catalog_cache.h +++ b/src/mongo/s/catalog_cache.h @@ -30,6 +30,7 @@ #include "mongo/base/disallow_copying.h" #include "mongo/base/string_data.h" +#include "mongo/s/catalog_cache_loader.h" #include "mongo/s/chunk_manager.h" #include "mongo/s/chunk_version.h" #include "mongo/s/client/shard.h" @@ -121,9 +122,16 @@ private: * Cache entry describing a collection. */ struct CollectionRoutingInfoEntry { - std::shared_ptr<ChunkManager> routingInfo; - + // Specifies whether this cache entry needs a refresh (in which case routingInfo should not + // be relied on) or it doesn't, in which case there should be a non-null routingInfo. bool needsRefresh{true}; + + // Contains a notification to be waited on for the refresh to complete (only available if + // needsRefresh is true) + std::shared_ptr<Notification<Status>> refreshCompletionNotification; + + // Contains the cached routing information (only available if needsRefresh is false) + std::shared_ptr<ChunkManager> routingInfo; }; /** @@ -143,8 +151,19 @@ private: * Ensures that the specified database is in the cache, loading it if necessary. If the database * was not in cache, all the sharded collections will be in the 'needsRefresh' state. */ - std::shared_ptr<DatabaseInfoEntry> _getDatabase_inlock(OperationContext* opCtx, - StringData dbName); + std::shared_ptr<DatabaseInfoEntry> _getDatabase(OperationContext* opCtx, StringData dbName); + + /** + * Non-blocking call which schedules an asynchronous refresh for the specified namespace. The + * namespace must be in the 'needRefresh' state. + */ + void _scheduleCollectionRefresh_inlock(std::shared_ptr<DatabaseInfoEntry> dbEntry, + std::shared_ptr<ChunkManager> existingRoutingInfo, + const NamespaceString& nss, + int refreshAttempt); + + // Interface from which chunks will be retrieved + const std::unique_ptr<CatalogCacheLoader> _cacheLoader; // Mutex to serialize access to the structures below stdx::mutex _mutex; diff --git a/src/mongo/s/catalog_cache_loader.h b/src/mongo/s/catalog_cache_loader.h new file mode 100644 index 00000000000..8ffdb4a94e2 --- /dev/null +++ b/src/mongo/s/catalog_cache_loader.h @@ -0,0 +1,92 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <vector> + +#include "mongo/base/disallow_copying.h" +#include "mongo/base/status_with.h" +#include "mongo/base/string_data.h" +#include "mongo/s/catalog/type_chunk.h" +#include "mongo/s/catalog/type_collection.h" +#include "mongo/s/chunk_version.h" +#include "mongo/stdx/functional.h" +#include "mongo/util/concurrency/notification.h" + +namespace mongo { + +class NamespaceString; +class OperationContext; + +/** + * Interface through which the sharding catalog cache requests the set of changed chunks to be + * retrieved from the persisted metadata store. + */ +class CatalogCacheLoader { +public: + virtual ~CatalogCacheLoader() = default; + + /** + * Used as a return value for getChunksSince. + */ + struct CollectionAndChangedChunks { + // Information about the entire collection + OID epoch; + BSONObj shardKeyPattern; + BSONObj defaultCollation; + bool shardKeyIsUnique; + + // The chunks which have changed sorted by their chunkVersion. This list might potentially + // contain all the chunks in the collection. + std::vector<ChunkType> changedChunks; + }; + + /** + * Non-blocking call, which requests the chunks changed since the specified version to be + * fetched from the persistent matadata store and invokes the callback function with the result. + * The callback function must never throw - it is a fatal error to do so. + * + * If for some reason the asynchronous fetch operation cannot be dispatched (for example on + * shutdown), throws a DBException. Otherwise it is guaranteed that the callback function will + * be invoked even on error and the returned notification will be signalled. + * + * The callbackFn object must not be destroyed until it has been called. The returned + * Notification object can be waited on in order to ensure that. + */ + virtual std::shared_ptr<Notification<void>> getChunksSince( + const NamespaceString& nss, + ChunkVersion version, + stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> + callbackFn) = 0; + +protected: + CatalogCacheLoader() = default; +}; + +} // namespace mongo diff --git a/src/mongo/s/config_server_catalog_cache_loader.cpp b/src/mongo/s/config_server_catalog_cache_loader.cpp new file mode 100644 index 00000000000..b26df2d73ae --- /dev/null +++ b/src/mongo/s/config_server_catalog_cache_loader.cpp @@ -0,0 +1,169 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/s/config_server_catalog_cache_loader.h" + +#include "mongo/db/client.h" +#include "mongo/db/operation_context.h" +#include "mongo/s/catalog/sharding_catalog_client.h" +#include "mongo/s/grid.h" +#include "mongo/stdx/memory.h" + +namespace mongo { + +using CollectionAndChangedChunks = CatalogCacheLoader::CollectionAndChangedChunks; + +namespace { + +/** + * Constructs the default options for the thread pool used by the cache loader. + */ +ThreadPool::Options makeDefaultThreadPoolOptions() { + ThreadPool::Options options; + options.poolName = "CatalogCacheLoader"; + options.minThreads = 0; + options.maxThreads = 6; + + // Ensure all threads have a client + options.onCreateThread = [](const std::string& threadName) { + Client::initThread(threadName.c_str()); + }; + + return options; +} + +/** + * Structure repsenting the generated query and sort order for a chunk diffing operation. + */ +struct QueryAndSort { + const BSONObj query; + const BSONObj sort; +}; + +/** + * Returns the query needed to find incremental changes to a collection from the config server. + * + * The query has to find all the chunks $gte the current max version. Currently, any splits and + * merges will increment the current max version. + * + * The sort needs to be by ascending version in order to pick up the chunks which changed most + * recent and also in order to handle cursor yields between chunks being migrated/split/merged. This + * ensures that changes to chunk version (which will always be higher) will always come *after* our + * current position in the chunk cursor. + */ +QueryAndSort createConfigDiffQuery(const NamespaceString& nss, ChunkVersion collectionVersion) { + return {BSON(ChunkType::ns() << nss.ns() << ChunkType::DEPRECATED_lastmod() << GTE + << Timestamp(collectionVersion.toLong())), + BSON(ChunkType::DEPRECATED_lastmod() << 1)}; +} + +/** + * Blocking method, which returns the chunks which changed since the specified version. + */ +CollectionAndChangedChunks getChangedChunks(OperationContext* opCtx, + const NamespaceString& nss, + ChunkVersion sinceVersion) { + const auto catalogClient = Grid::get(opCtx)->catalogClient(opCtx); + + // Decide whether to do a full or partial load based on the state of the collection + const auto coll = uassertStatusOK(catalogClient->getCollection(opCtx, nss.ns())).value; + uassert(ErrorCodes::NamespaceNotFound, + str::stream() << "Collection " << nss.ns() << " is dropped.", + !coll.getDropped()); + + // If the collection's epoch has changed, do a full refresh + const ChunkVersion startingCollectionVersion = (sinceVersion.epoch() == coll.getEpoch()) + ? sinceVersion + : ChunkVersion(0, 0, coll.getEpoch()); + + // Diff tracker should *always* find at least one chunk if collection exists + const auto diffQuery = createConfigDiffQuery(nss, startingCollectionVersion); + + // Query the chunks which have changed + std::vector<ChunkType> changedChunks; + repl::OpTime opTime; + uassertStatusOK(Grid::get(opCtx)->catalogClient(opCtx)->getChunks( + opCtx, + diffQuery.query, + diffQuery.sort, + boost::none, + &changedChunks, + &opTime, + repl::ReadConcernLevel::kMajorityReadConcern)); + + uassert(ErrorCodes::ConflictingOperationInProgress, + "No chunks were found for the collection", + !changedChunks.empty()); + + return {coll.getEpoch(), + coll.getKeyPattern().toBSON(), + coll.getDefaultCollation(), + coll.getUnique(), + std::move(changedChunks)}; +} + +} // namespace + +ConfigServerCatalogCacheLoader::ConfigServerCatalogCacheLoader() + : _threadPool(makeDefaultThreadPoolOptions()) { + _threadPool.startup(); +} + +ConfigServerCatalogCacheLoader::~ConfigServerCatalogCacheLoader() { + _threadPool.shutdown(); + _threadPool.join(); +} + +std::shared_ptr<Notification<void>> ConfigServerCatalogCacheLoader::getChunksSince( + const NamespaceString& nss, + ChunkVersion version, + stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) { + + auto notify = std::make_shared<Notification<void>>(); + + uassertStatusOK(_threadPool.schedule([ this, nss, version, notify, callbackFn ]() noexcept { + auto opCtx = Client::getCurrent()->makeOperationContext(); + + auto swCollAndChunks = [&]() -> StatusWith<CollectionAndChangedChunks> { + try { + return getChangedChunks(opCtx.get(), nss, version); + } catch (const DBException& ex) { + return ex.toStatus(); + } + }(); + + callbackFn(opCtx.get(), std::move(swCollAndChunks)); + notify->set(); + })); + + return notify; +} + +} // namespace mongo diff --git a/src/mongo/s/config_server_catalog_cache_loader.h b/src/mongo/s/config_server_catalog_cache_loader.h new file mode 100644 index 00000000000..09450c2ca2a --- /dev/null +++ b/src/mongo/s/config_server_catalog_cache_loader.h @@ -0,0 +1,52 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/s/catalog_cache_loader.h" +#include "mongo/util/concurrency/thread_pool.h" + +namespace mongo { + +class ConfigServerCatalogCacheLoader final : public CatalogCacheLoader { +public: + ConfigServerCatalogCacheLoader(); + ~ConfigServerCatalogCacheLoader(); + + std::shared_ptr<Notification<void>> getChunksSince( + const NamespaceString& nss, + ChunkVersion version, + stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) + override; + +private: + // Thread pool to be used to perform metadata load + ThreadPool _threadPool; +}; + +} // namespace mongo |