summaryrefslogtreecommitdiff
path: root/src/mongo/s
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2017-03-28 18:38:18 -0400
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2017-04-13 07:49:54 -0400
commit48d1e1bd6f56f3c20c018332fabafc147bfafc3b (patch)
tree09ba40a790993b93b96dc905dea378b537ddbc94 /src/mongo/s
parentc431782e66416e7dbbd9db457d9507b65c6304ba (diff)
downloadmongo-48d1e1bd6f56f3c20c018332fabafc147bfafc3b.tar.gz
SERVER-20854 Refresh collection metadata on a separate thread
(cherry picked from commit 0885a0adbb813727cc5a2083224ac0d89763c276)
Diffstat (limited to 'src/mongo/s')
-rw-r--r--src/mongo/s/SConscript2
-rw-r--r--src/mongo/s/catalog_cache.cpp273
-rw-r--r--src/mongo/s/catalog_cache.h27
-rw-r--r--src/mongo/s/catalog_cache_loader.h92
-rw-r--r--src/mongo/s/config_server_catalog_cache_loader.cpp169
-rw-r--r--src/mongo/s/config_server_catalog_cache_loader.h52
6 files changed, 476 insertions, 139 deletions
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index 91765872709..1ae2b8dd244 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -201,6 +201,7 @@ env.Library(
'chunk.cpp',
'chunk_manager.cpp',
'cluster_identity_loader.cpp',
+ 'config_server_catalog_cache_loader.cpp',
'config_server_client.cpp',
'grid.cpp',
'shard_util.cpp',
@@ -213,6 +214,7 @@ env.Library(
'$BUILD_DIR/mongo/executor/task_executor_pool',
'$BUILD_DIR/mongo/s/catalog/sharding_catalog_client',
'$BUILD_DIR/mongo/s/query/cluster_cursor_manager',
+ '$BUILD_DIR/mongo/util/concurrency/thread_pool',
'client/sharding_client',
'common',
],
diff --git a/src/mongo/s/catalog_cache.cpp b/src/mongo/s/catalog_cache.cpp
index 28e1859f530..36d5e82ac49 100644
--- a/src/mongo/s/catalog_cache.cpp
+++ b/src/mongo/s/catalog_cache.cpp
@@ -41,6 +41,7 @@
#include "mongo/s/catalog/type_collection.h"
#include "mongo/s/catalog/type_database.h"
#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/config_server_catalog_cache_loader.h"
#include "mongo/s/grid.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/log.h"
@@ -54,114 +55,49 @@ namespace {
const int kMaxInconsistentRoutingInfoRefreshAttempts = 3;
/**
- * Structure representing the generated query and sort order for a chunk diffing operation.
- */
-struct QueryAndSort {
- const BSONObj query;
- const BSONObj sort;
-};
-
-/**
- * Returns the query needed to find incremental changes to a collection from the config server.
- */
-QueryAndSort createConfigDiffQuery(const NamespaceString& nss, ChunkVersion collectionVersion) {
- // The query has to find all the chunks $gte the current max version. Currently, any splits and
- // merges will increment the current max version.
- BSONObjBuilder queryB;
- queryB.append(ChunkType::ns(), nss.ns());
-
- {
- BSONObjBuilder tsBuilder(queryB.subobjStart(ChunkType::DEPRECATED_lastmod()));
- tsBuilder.appendTimestamp("$gte", collectionVersion.toLong());
- tsBuilder.done();
- }
-
- // NOTE: IT IS IMPORTANT FOR CONSISTENCY THAT WE SORT BY ASC VERSION, IN ORDER TO HANDLE CURSOR
- // YIELDING BETWEEN CHUNKS BEING MIGRATED
- //
- // This ensures that changes to chunk version (which will always be higher) will always come
- // *after* our current position in the chunk cursor
-
- return QueryAndSort{queryB.obj(), BSON(ChunkType::DEPRECATED_lastmod() << 1)};
-}
-
-/**
- * Blocking method, which refreshes the routing information for the specified collection. If
- * 'existingRoutingInfo' has been specified uses this as a basis to perform an 'incremental'
- * refresh, which only fetches the chunks which changed. Otherwise does a full refresh, fetching all
- * the chunks for the collection.
- *
- * Returns the refreshed routing information if the collection is still sharded or nullptr if it is
- * not. If refresh fails for any reason, throws a DBException.
+ * Given an (optional) initial routing table and a set of changed chunks returned by the catalog
+ * cache loader, produces a new routing table with the changes applied.
*
- * With the exception of ConflictingOperationInProgress, error codes thrown from this method are
- * final in that there is nothing that can be done to remedy them other than pass the error to the
- * user.
+ * If the collection is no longer sharded returns nullptr. If the epoch has changed, expects that
+ * the 'collectionChunksList' contains the full contents of the chunks collection for that namespace
+ * so that the routing table can be built from scratch.
*
- * ConflictingOperationInProgress indicates that the chunk metadata was found to be inconsistent.
- * Since this may be transient, due to the collection being dropped or recreated, the caller must
- * retry the reload up to some configurable number of attempts.
+ * Throws ConflictingOperationInProgress if the chunk metadata was found to be inconsistent (not
+ * containing all the necessary chunks, contains overlaps or chunks' epoch values are not the same
+ * as that of the collection). Since this situation may be transient, due to the collection being
+ * dropped or recreated concurrently, the caller must retry the reload up to some configurable
+ * number of attempts.
*/
std::shared_ptr<ChunkManager> refreshCollectionRoutingInfo(
OperationContext* opCtx,
const NamespaceString& nss,
- std::shared_ptr<ChunkManager> existingRoutingInfo) {
- Timer t;
-
- const auto catalogClient = Grid::get(opCtx)->catalogClient(opCtx);
-
- // Decide whether to do a full or partial load based on the state of the collection
- auto collStatus = catalogClient->getCollection(opCtx, nss.ns());
- if (collStatus == ErrorCodes::NamespaceNotFound) {
+ std::shared_ptr<ChunkManager> existingRoutingInfo,
+ StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollectionAndChangedChunks) {
+ if (swCollectionAndChangedChunks == ErrorCodes::NamespaceNotFound) {
return nullptr;
}
- const auto coll = uassertStatusOK(std::move(collStatus)).value;
- if (coll.getDropped()) {
- return nullptr;
- }
+ const auto collectionAndChunks = uassertStatusOK(std::move(swCollectionAndChangedChunks));
+ // Check whether the collection epoch might have changed
ChunkVersion startingCollectionVersion;
ChunkMap chunkMap =
SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<std::shared_ptr<Chunk>>();
if (!existingRoutingInfo) {
// If we don't have a basis chunk manager, do a full refresh
- startingCollectionVersion = ChunkVersion(0, 0, coll.getEpoch());
- } else if (existingRoutingInfo->getVersion().epoch() != coll.getEpoch()) {
+ startingCollectionVersion = ChunkVersion(0, 0, collectionAndChunks.epoch);
+ } else if (existingRoutingInfo->getVersion().epoch() != collectionAndChunks.epoch) {
// If the collection's epoch has changed, do a full refresh
- startingCollectionVersion = ChunkVersion(0, 0, coll.getEpoch());
+ startingCollectionVersion = ChunkVersion(0, 0, collectionAndChunks.epoch);
} else {
- // Otherwise do a partial refresh
startingCollectionVersion = existingRoutingInfo->getVersion();
chunkMap = existingRoutingInfo->chunkMap();
}
- log() << "Refreshing chunks for collection " << nss << " based on version "
- << startingCollectionVersion;
-
- // Diff tracker should *always* find at least one chunk if collection exists
- const auto diffQuery = createConfigDiffQuery(nss, startingCollectionVersion);
-
- // Query the chunks which have changed
- std::vector<ChunkType> newChunks;
- repl::OpTime opTime;
- uassertStatusOK(Grid::get(opCtx)->catalogClient(opCtx)->getChunks(
- opCtx,
- diffQuery.query,
- diffQuery.sort,
- boost::none,
- &newChunks,
- &opTime,
- repl::ReadConcernLevel::kMajorityReadConcern));
-
- uassert(ErrorCodes::ConflictingOperationInProgress,
- "No chunks were found for the collection",
- !newChunks.empty());
-
ChunkVersion collectionVersion = startingCollectionVersion;
- for (const auto& chunk : newChunks) {
+ for (const auto& chunk : collectionAndChunks.changedChunks) {
const auto& chunkVersion = chunk.getVersion();
uassert(ErrorCodes::ConflictingOperationInProgress,
@@ -200,42 +136,34 @@ std::shared_ptr<ChunkManager> refreshCollectionRoutingInfo(
// sequence number to detect batch writes not making progress because of chunks moving across
// shards too frequently.
if (collectionVersion == startingCollectionVersion) {
- log() << "Refresh for collection " << nss << " took " << t.millis()
- << " ms and didn't find any metadata changes";
-
return existingRoutingInfo;
}
std::unique_ptr<CollatorInterface> defaultCollator;
- if (!coll.getDefaultCollation().isEmpty()) {
+ if (!collectionAndChunks.defaultCollation.isEmpty()) {
// The collation should have been validated upon collection creation
defaultCollator = uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext())
- ->makeFromBSON(coll.getDefaultCollation()));
+ ->makeFromBSON(collectionAndChunks.defaultCollation));
}
- log() << "Refresh for collection " << nss << " took " << t.millis() << " ms and found version "
- << collectionVersion;
-
return stdx::make_unique<ChunkManager>(nss,
- coll.getKeyPattern(),
+ KeyPattern(collectionAndChunks.shardKeyPattern),
std::move(defaultCollator),
- coll.getUnique(),
+ collectionAndChunks.shardKeyIsUnique,
std::move(chunkMap),
collectionVersion);
}
} // namespace
-CatalogCache::CatalogCache() = default;
+CatalogCache::CatalogCache() : _cacheLoader(stdx::make_unique<ConfigServerCatalogCacheLoader>()) {}
CatalogCache::~CatalogCache() = default;
StatusWith<CachedDatabaseInfo> CatalogCache::getDatabase(OperationContext* opCtx,
StringData dbName) {
- stdx::lock_guard<stdx::mutex> lg(_mutex);
-
try {
- return {CachedDatabaseInfo(_getDatabase_inlock(opCtx, dbName))};
+ return {CachedDatabaseInfo(_getDatabase(opCtx, dbName))};
} catch (const DBException& ex) {
return ex.toStatus();
}
@@ -243,18 +171,16 @@ StatusWith<CachedDatabaseInfo> CatalogCache::getDatabase(OperationContext* opCtx
StatusWith<CachedCollectionRoutingInfo> CatalogCache::getCollectionRoutingInfo(
OperationContext* opCtx, const NamespaceString& nss) {
- int numRefreshAttempts = 0;
-
while (true) {
- stdx::unique_lock<stdx::mutex> ul(_mutex);
-
std::shared_ptr<DatabaseInfoEntry> dbEntry;
try {
- dbEntry = _getDatabase_inlock(opCtx, nss.db());
+ dbEntry = _getDatabase(opCtx, nss.db());
} catch (const DBException& ex) {
return ex.toStatus();
}
+ stdx::unique_lock<stdx::mutex> ul(_mutex);
+
auto& collections = dbEntry->collections;
auto it = collections.find(nss.ns());
@@ -275,37 +201,24 @@ StatusWith<CachedCollectionRoutingInfo> CatalogCache::getCollectionRoutingInfo(
auto& collEntry = it->second;
if (collEntry.needsRefresh) {
- numRefreshAttempts++;
-
- try {
- auto newRoutingInfo =
- refreshCollectionRoutingInfo(opCtx, nss, std::move(collEntry.routingInfo));
- if (newRoutingInfo == nullptr) {
- collections.erase(it);
-
- // Loop around so we can return an "unsharded" routing info
- continue;
- }
-
- collEntry.routingInfo = std::move(newRoutingInfo);
- collEntry.needsRefresh = false;
- } catch (const DBException& ex) {
- // It is possible that the metadata is being changed concurrently, so retry the
- // refresh with a wait
- if (ex.getCode() == ErrorCodes::ConflictingOperationInProgress &&
- numRefreshAttempts < kMaxInconsistentRoutingInfoRefreshAttempts) {
- ul.unlock();
-
- log() << "Metadata refresh for " << nss << " failed and will be retried"
- << causedBy(redact(ex));
-
- // Do the sleep outside of the mutex
- sleepFor(Milliseconds(10) * numRefreshAttempts);
- continue;
- }
-
- return ex.toStatus();
+ auto refreshNotification = collEntry.refreshCompletionNotification;
+ if (!refreshNotification) {
+ refreshNotification = (collEntry.refreshCompletionNotification =
+ std::make_shared<Notification<Status>>());
+ _scheduleCollectionRefresh_inlock(
+ dbEntry, std::move(collEntry.routingInfo), nss, 1);
}
+
+ // Wait on the notification outside of the mutex
+ ul.unlock();
+
+ auto refreshStatus = refreshNotification->get(opCtx);
+ if (!refreshStatus.isOK()) {
+ return refreshStatus;
+ }
+
+ // Once the refresh is complete, loop around to get the latest value
+ continue;
}
return {CachedCollectionRoutingInfo(dbEntry->primaryShardId, collEntry.routingInfo)};
@@ -407,8 +320,10 @@ void CatalogCache::purgeAllDatabases() {
_databases.clear();
}
-std::shared_ptr<CatalogCache::DatabaseInfoEntry> CatalogCache::_getDatabase_inlock(
- OperationContext* opCtx, StringData dbName) {
+std::shared_ptr<CatalogCache::DatabaseInfoEntry> CatalogCache::_getDatabase(OperationContext* opCtx,
+ StringData dbName) {
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+
auto it = _databases.find(dbName);
if (it != _databases.end()) {
return it->second;
@@ -441,6 +356,94 @@ std::shared_ptr<CatalogCache::DatabaseInfoEntry> CatalogCache::_getDatabase_inlo
dbDesc.getPrimary(), dbDesc.getSharded(), std::move(collectionEntries)});
}
+void CatalogCache::_scheduleCollectionRefresh_inlock(
+ std::shared_ptr<DatabaseInfoEntry> dbEntry,
+ std::shared_ptr<ChunkManager> existingRoutingInfo,
+ const NamespaceString& nss,
+ int refreshAttempt) {
+ Timer t;
+
+ const ChunkVersion startingCollectionVersion =
+ (existingRoutingInfo ? existingRoutingInfo->getVersion() : ChunkVersion::UNSHARDED());
+
+ const auto refreshFailed_inlock =
+ [ this, t, dbEntry, nss, refreshAttempt ](const Status& status) noexcept {
+ log() << "Refresh for collection " << nss << " took " << t.millis() << " ms and failed"
+ << causedBy(redact(status));
+
+ auto& collections = dbEntry->collections;
+ auto it = collections.find(nss.ns());
+ invariant(it != collections.end());
+ auto& collEntry = it->second;
+
+ // It is possible that the metadata is being changed concurrently, so retry the
+ // refresh again
+ if (status == ErrorCodes::ConflictingOperationInProgress &&
+ refreshAttempt < kMaxInconsistentRoutingInfoRefreshAttempts) {
+ _scheduleCollectionRefresh_inlock(dbEntry, nullptr, nss, refreshAttempt + 1);
+ } else {
+ // Leave needsRefresh to true so that any subsequent get attempts will kick off
+ // another round of refresh
+ collEntry.refreshCompletionNotification->set(status);
+ collEntry.refreshCompletionNotification = nullptr;
+ }
+ };
+
+ const auto refreshCallback =
+ [ this, t, dbEntry, nss, existingRoutingInfo, refreshFailed_inlock ](
+ OperationContext * opCtx,
+ StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept {
+ std::shared_ptr<ChunkManager> newRoutingInfo;
+ try {
+ newRoutingInfo = refreshCollectionRoutingInfo(
+ opCtx, nss, std::move(existingRoutingInfo), std::move(swCollAndChunks));
+ } catch (const DBException& ex) {
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ refreshFailed_inlock(ex.toStatus());
+ return;
+ }
+
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ auto& collections = dbEntry->collections;
+ auto it = collections.find(nss.ns());
+ invariant(it != collections.end());
+ auto& collEntry = it->second;
+
+ collEntry.needsRefresh = false;
+ collEntry.refreshCompletionNotification->set(Status::OK());
+ collEntry.refreshCompletionNotification = nullptr;
+
+ if (!newRoutingInfo) {
+ log() << "Refresh for collection " << nss << " took " << t.millis()
+ " ms and found the collection is not sharded";
+
+ collections.erase(it);
+ } else {
+ log() << "Refresh for collection " << nss << " took " << t.millis()
+ << " ms and found version " << newRoutingInfo->getVersion();
+
+ collEntry.routingInfo = std::move(newRoutingInfo);
+ }
+ };
+
+ log() << "Refreshing chunks for collection " << nss << " based on version "
+ << startingCollectionVersion;
+
+ try {
+ _cacheLoader->getChunksSince(nss, startingCollectionVersion, refreshCallback);
+ } catch (const DBException& ex) {
+ const auto status = ex.toStatus();
+
+ // ConflictingOperationInProgress errors trigger retry of the catalog cache reload logic. If
+ // we failed to schedule the asynchronous reload, there is no point in doing another
+ // attempt.
+ invariant(status != ErrorCodes::ConflictingOperationInProgress);
+
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ refreshFailed_inlock(status);
+ }
+}
+
CachedDatabaseInfo::CachedDatabaseInfo(std::shared_ptr<CatalogCache::DatabaseInfoEntry> db)
: _db(std::move(db)) {}
diff --git a/src/mongo/s/catalog_cache.h b/src/mongo/s/catalog_cache.h
index e76188303a9..5631c6fa00b 100644
--- a/src/mongo/s/catalog_cache.h
+++ b/src/mongo/s/catalog_cache.h
@@ -30,6 +30,7 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/base/string_data.h"
+#include "mongo/s/catalog_cache_loader.h"
#include "mongo/s/chunk_manager.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/client/shard.h"
@@ -121,9 +122,16 @@ private:
* Cache entry describing a collection.
*/
struct CollectionRoutingInfoEntry {
- std::shared_ptr<ChunkManager> routingInfo;
-
+ // Specifies whether this cache entry needs a refresh (in which case routingInfo should not
+ // be relied on) or it doesn't, in which case there should be a non-null routingInfo.
bool needsRefresh{true};
+
+ // Contains a notification to be waited on for the refresh to complete (only available if
+ // needsRefresh is true)
+ std::shared_ptr<Notification<Status>> refreshCompletionNotification;
+
+ // Contains the cached routing information (only available if needsRefresh is false)
+ std::shared_ptr<ChunkManager> routingInfo;
};
/**
@@ -143,8 +151,19 @@ private:
* Ensures that the specified database is in the cache, loading it if necessary. If the database
* was not in cache, all the sharded collections will be in the 'needsRefresh' state.
*/
- std::shared_ptr<DatabaseInfoEntry> _getDatabase_inlock(OperationContext* opCtx,
- StringData dbName);
+ std::shared_ptr<DatabaseInfoEntry> _getDatabase(OperationContext* opCtx, StringData dbName);
+
+ /**
+ * Non-blocking call which schedules an asynchronous refresh for the specified namespace. The
+ * namespace must be in the 'needsRefresh' state.
+ */
+ void _scheduleCollectionRefresh_inlock(std::shared_ptr<DatabaseInfoEntry> dbEntry,
+ std::shared_ptr<ChunkManager> existingRoutingInfo,
+ const NamespaceString& nss,
+ int refreshAttempt);
+
+ // Interface from which chunks will be retrieved
+ const std::unique_ptr<CatalogCacheLoader> _cacheLoader;
// Mutex to serialize access to the structures below
stdx::mutex _mutex;
diff --git a/src/mongo/s/catalog_cache_loader.h b/src/mongo/s/catalog_cache_loader.h
new file mode 100644
index 00000000000..8ffdb4a94e2
--- /dev/null
+++ b/src/mongo/s/catalog_cache_loader.h
@@ -0,0 +1,92 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <vector>
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/base/status_with.h"
+#include "mongo/base/string_data.h"
+#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/catalog/type_collection.h"
+#include "mongo/s/chunk_version.h"
+#include "mongo/stdx/functional.h"
+#include "mongo/util/concurrency/notification.h"
+
+namespace mongo {
+
+class NamespaceString;
+class OperationContext;
+
+/**
+ * Interface through which the sharding catalog cache requests the set of changed chunks to be
+ * retrieved from the persisted metadata store.
+ */
+class CatalogCacheLoader {
+public:
+ virtual ~CatalogCacheLoader() = default;
+
+ /**
+ * Used as a return value for getChunksSince.
+ */
+ struct CollectionAndChangedChunks {
+ // Information about the entire collection
+ OID epoch;
+ BSONObj shardKeyPattern;
+ BSONObj defaultCollation;
+ bool shardKeyIsUnique;
+
+ // The chunks which have changed sorted by their chunkVersion. This list might potentially
+ // contain all the chunks in the collection.
+ std::vector<ChunkType> changedChunks;
+ };
+
+ /**
+ * Non-blocking call, which requests the chunks changed since the specified version to be
+ * fetched from the persistent metadata store and invokes the callback function with the result.
+ * The callback function must never throw - it is a fatal error to do so.
+ *
+ * If for some reason the asynchronous fetch operation cannot be dispatched (for example on
+ * shutdown), throws a DBException. Otherwise it is guaranteed that the callback function will
+ * be invoked even on error and the returned notification will be signalled.
+ *
+ * The callbackFn object must not be destroyed until it has been called. The returned
+ * Notification object can be waited on in order to ensure that.
+ */
+ virtual std::shared_ptr<Notification<void>> getChunksSince(
+ const NamespaceString& nss,
+ ChunkVersion version,
+ stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)>
+ callbackFn) = 0;
+
+protected:
+ CatalogCacheLoader() = default;
+};
+
+} // namespace mongo
diff --git a/src/mongo/s/config_server_catalog_cache_loader.cpp b/src/mongo/s/config_server_catalog_cache_loader.cpp
new file mode 100644
index 00000000000..b26df2d73ae
--- /dev/null
+++ b/src/mongo/s/config_server_catalog_cache_loader.cpp
@@ -0,0 +1,169 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/config_server_catalog_cache_loader.h"
+
+#include "mongo/db/client.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/s/catalog/sharding_catalog_client.h"
+#include "mongo/s/grid.h"
+#include "mongo/stdx/memory.h"
+
+namespace mongo {
+
+using CollectionAndChangedChunks = CatalogCacheLoader::CollectionAndChangedChunks;
+
+namespace {
+
+/**
+ * Constructs the default options for the thread pool used by the cache loader.
+ */
+ThreadPool::Options makeDefaultThreadPoolOptions() {
+ ThreadPool::Options options;
+ options.poolName = "CatalogCacheLoader";
+ options.minThreads = 0;
+ options.maxThreads = 6;
+
+ // Ensure all threads have a client
+ options.onCreateThread = [](const std::string& threadName) {
+ Client::initThread(threadName.c_str());
+ };
+
+ return options;
+}
+
+/**
+ * Structure representing the generated query and sort order for a chunk diffing operation.
+ */
+struct QueryAndSort {
+ const BSONObj query;
+ const BSONObj sort;
+};
+
+/**
+ * Returns the query needed to find incremental changes to a collection from the config server.
+ *
+ * The query has to find all the chunks $gte the current max version. Currently, any splits and
+ * merges will increment the current max version.
+ *
+ * The sort needs to be by ascending version in order to pick up the chunks which changed most
+ * recent and also in order to handle cursor yields between chunks being migrated/split/merged. This
+ * ensures that changes to chunk version (which will always be higher) will always come *after* our
+ * current position in the chunk cursor.
+ */
+QueryAndSort createConfigDiffQuery(const NamespaceString& nss, ChunkVersion collectionVersion) {
+ return {BSON(ChunkType::ns() << nss.ns() << ChunkType::DEPRECATED_lastmod() << GTE
+ << Timestamp(collectionVersion.toLong())),
+ BSON(ChunkType::DEPRECATED_lastmod() << 1)};
+}
+
+/**
+ * Blocking method, which returns the chunks which changed since the specified version.
+ */
+CollectionAndChangedChunks getChangedChunks(OperationContext* opCtx,
+ const NamespaceString& nss,
+ ChunkVersion sinceVersion) {
+ const auto catalogClient = Grid::get(opCtx)->catalogClient(opCtx);
+
+ // Decide whether to do a full or partial load based on the state of the collection
+ const auto coll = uassertStatusOK(catalogClient->getCollection(opCtx, nss.ns())).value;
+ uassert(ErrorCodes::NamespaceNotFound,
+ str::stream() << "Collection " << nss.ns() << " is dropped.",
+ !coll.getDropped());
+
+ // If the collection's epoch has changed, do a full refresh
+ const ChunkVersion startingCollectionVersion = (sinceVersion.epoch() == coll.getEpoch())
+ ? sinceVersion
+ : ChunkVersion(0, 0, coll.getEpoch());
+
+ // Diff tracker should *always* find at least one chunk if collection exists
+ const auto diffQuery = createConfigDiffQuery(nss, startingCollectionVersion);
+
+ // Query the chunks which have changed
+ std::vector<ChunkType> changedChunks;
+ repl::OpTime opTime;
+ uassertStatusOK(Grid::get(opCtx)->catalogClient(opCtx)->getChunks(
+ opCtx,
+ diffQuery.query,
+ diffQuery.sort,
+ boost::none,
+ &changedChunks,
+ &opTime,
+ repl::ReadConcernLevel::kMajorityReadConcern));
+
+ uassert(ErrorCodes::ConflictingOperationInProgress,
+ "No chunks were found for the collection",
+ !changedChunks.empty());
+
+ return {coll.getEpoch(),
+ coll.getKeyPattern().toBSON(),
+ coll.getDefaultCollation(),
+ coll.getUnique(),
+ std::move(changedChunks)};
+}
+
+} // namespace
+
+ConfigServerCatalogCacheLoader::ConfigServerCatalogCacheLoader()
+ : _threadPool(makeDefaultThreadPoolOptions()) {
+ _threadPool.startup();
+}
+
+ConfigServerCatalogCacheLoader::~ConfigServerCatalogCacheLoader() {
+ _threadPool.shutdown();
+ _threadPool.join();
+}
+
+std::shared_ptr<Notification<void>> ConfigServerCatalogCacheLoader::getChunksSince(
+ const NamespaceString& nss,
+ ChunkVersion version,
+ stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) {
+
+ auto notify = std::make_shared<Notification<void>>();
+
+ uassertStatusOK(_threadPool.schedule([ this, nss, version, notify, callbackFn ]() noexcept {
+ auto opCtx = Client::getCurrent()->makeOperationContext();
+
+ auto swCollAndChunks = [&]() -> StatusWith<CollectionAndChangedChunks> {
+ try {
+ return getChangedChunks(opCtx.get(), nss, version);
+ } catch (const DBException& ex) {
+ return ex.toStatus();
+ }
+ }();
+
+ callbackFn(opCtx.get(), std::move(swCollAndChunks));
+ notify->set();
+ }));
+
+ return notify;
+}
+
+} // namespace mongo
diff --git a/src/mongo/s/config_server_catalog_cache_loader.h b/src/mongo/s/config_server_catalog_cache_loader.h
new file mode 100644
index 00000000000..09450c2ca2a
--- /dev/null
+++ b/src/mongo/s/config_server_catalog_cache_loader.h
@@ -0,0 +1,52 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/s/catalog_cache_loader.h"
+#include "mongo/util/concurrency/thread_pool.h"
+
+namespace mongo {
+
+class ConfigServerCatalogCacheLoader final : public CatalogCacheLoader {
+public:
+ ConfigServerCatalogCacheLoader();
+ ~ConfigServerCatalogCacheLoader();
+
+ std::shared_ptr<Notification<void>> getChunksSince(
+ const NamespaceString& nss,
+ ChunkVersion version,
+ stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn)
+ override;
+
+private:
+ // Thread pool to be used to perform metadata load
+ ThreadPool _threadPool;
+};
+
+} // namespace mongo