-rw-r--r--  src/mongo/db/SConscript                                  2
-rw-r--r--  src/mongo/db/s/SConscript                                22
-rw-r--r--  src/mongo/db/s/shard_metadata_util.cpp                   54
-rw-r--r--  src/mongo/db/s/shard_metadata_util.h                     41
-rw-r--r--  src/mongo/db/s/shard_metadata_util_test.cpp              31
-rw-r--r--  src/mongo/db/s/shard_server_catalog_cache_loader.cpp     630
-rw-r--r--  src/mongo/db/s/shard_server_catalog_cache_loader.h       245
-rw-r--r--  src/mongo/db/s/sharding_initialization_mongod.cpp        9
-rw-r--r--  src/mongo/s/catalog/type_chunk.cpp                       4
-rw-r--r--  src/mongo/s/catalog/type_chunk.h                         2
-rw-r--r--  src/mongo/s/catalog/type_shard_collection.h              14
-rw-r--r--  src/mongo/s/catalog_cache.cpp                            5
-rw-r--r--  src/mongo/s/catalog_cache.h                              8
-rw-r--r--  src/mongo/s/catalog_cache_loader.h                       2
-rw-r--r--  src/mongo/s/config_server_catalog_cache_loader.cpp       2
-rw-r--r--  src/mongo/s/config_server_test_fixture.cpp               11
-rw-r--r--  src/mongo/s/config_server_test_fixture.h                 5
-rw-r--r--  src/mongo/s/server.cpp                                   2
-rw-r--r--  src/mongo/s/shard_server_test_fixture.cpp                13
-rw-r--r--  src/mongo/s/shard_server_test_fixture.h                  8
-rw-r--r--  src/mongo/s/sharding_initialization.cpp                  3
-rw-r--r--  src/mongo/s/sharding_initialization.h                    3
-rw-r--r--  src/mongo/s/sharding_mongod_test_fixture.cpp             12
-rw-r--r--  src/mongo/s/sharding_mongod_test_fixture.h               9
24 files changed, 1050 insertions, 87 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 9f753761487..e99e43414b8 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -863,7 +863,7 @@ env.Library(
"run_commands",
"rw_concern_d",
"s/commands",
- "s/metadata",
+ "s/collection_metadata",
"s/sharding",
"service_context_d",
"startup_warnings_mongod",
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 840991cc59a..beeb2957f33 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -5,17 +5,16 @@ Import("env")
env = env.Clone()
env.Library(
- target='metadata',
+ target='collection_metadata',
source=[
'collection_metadata.cpp',
],
LIBDEPS=[
- 'shard_metadata_util',
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/common',
'$BUILD_DIR/mongo/db/range_arithmetic',
- '$BUILD_DIR/mongo/db/repl/repl_coordinator_impl',
'$BUILD_DIR/mongo/db/service_context',
+ '$BUILD_DIR/mongo/s/common',
],
)
@@ -85,6 +84,7 @@ env.Library(
'move_timing_helper.cpp',
'operation_sharding_state.cpp',
'shard_identity_rollback_notifier.cpp',
+ 'shard_server_catalog_cache_loader.cpp',
'sharded_connection_info.cpp',
'sharding_egress_metadata_hook_for_mongod.cpp',
'sharding_initialization_mongod.cpp',
@@ -106,8 +106,9 @@ env.Library(
'$BUILD_DIR/mongo/s/is_mongos',
'$BUILD_DIR/mongo/s/sharding_initialization',
'$BUILD_DIR/mongo/util/elapsed_tracker',
- 'metadata',
+ 'collection_metadata',
'migration_types',
+ 'shard_metadata_util',
'sharding_task_executor',
'type_shard_identity',
#'$BUILD_DIR/mongo/db/dbhelpers', # CYCLE
@@ -198,7 +199,7 @@ env.Library(
'$BUILD_DIR/mongo/db/index_d',
'$BUILD_DIR/mongo/db/repl/repl_coordinator_global',
'balancer',
- 'metadata',
+ 'collection_metadata',
'sharding',
],
)
@@ -227,7 +228,6 @@ env.CppUnitTest(
env.CppUnitTest(
target='shard_test',
source=[
- 'shard_metadata_util_test.cpp',
'active_migrations_registry_test.cpp',
'migration_chunk_cloner_source_legacy_test.cpp',
'sharding_state_test.cpp',
@@ -265,3 +265,13 @@ env.CppUnitTest(
],
)
+env.CppUnitTest(
+ target='shard_metadata_util_test',
+ source=[
+ 'shard_metadata_util_test.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/s/shard_server_test_fixture',
+ 'shard_metadata_util',
+ ],
+)
diff --git a/src/mongo/db/s/shard_metadata_util.cpp b/src/mongo/db/s/shard_metadata_util.cpp
index 45c34444de6..c0efea7fc4e 100644
--- a/src/mongo/db/s/shard_metadata_util.cpp
+++ b/src/mongo/db/s/shard_metadata_util.cpp
@@ -50,36 +50,14 @@ const WriteConcernOptions kLocalWriteConcern(1,
WriteConcernOptions::SyncMode::UNSET,
Milliseconds(0));
-/**
- * Structure representing the generated query and sort order for a chunk diffing operation.
- */
-struct QueryAndSort {
- const BSONObj query;
- const BSONObj sort;
-};
+} // namespace
-/**
- * Returns the query needed to find incremental changes to the chunks collection on a shard server.
- *
- * The query has to find all the chunks $gte the current max version. Currently, any splits, merges
- * and moves will increment the current max version. Querying by lastmod is essential because we
- * want to use the {lastmod} index on the chunks collection. This makes potential cursor yields to
- * apply split/merge/move updates safe: updates always move or insert documents at the end of the
- * index (because the document updates always have higher lastmod), so changed always come *after*
- * our current cursor position and are seen when the cursor recommences.
- *
- * The sort must be by ascending version so that the updates can be applied in-memory in order. This
- * is important because it is possible for a cursor to read updates to the same _id document twice,
- * due to the yield described above. If updates are applied in ascending version order, the later
- * update is applied last and remains.
- */
QueryAndSort createShardChunkDiffQuery(const ChunkVersion& collectionVersion) {
- return {BSON(ChunkType::DEPRECATED_lastmod() << GTE << Timestamp(collectionVersion.toLong())),
+ return {BSON(ChunkType::DEPRECATED_lastmod()
+ << BSON("$gte" << Timestamp(collectionVersion.toLong()))),
BSON(ChunkType::DEPRECATED_lastmod() << 1)};
}
-} // namespace
-
bool RefreshState::operator==(RefreshState& other) {
return (other.epoch == epoch) && (other.refreshing == refreshing) &&
(other.sequenceNumber == sequenceNumber);
@@ -123,8 +101,9 @@ StatusWith<RefreshState> getPersistedRefreshFlags(OperationContext* opCtx,
StatusWith<ShardCollectionType> readShardCollectionsEntry(OperationContext* opCtx,
const NamespaceString& nss) {
+
Query fullQuery(BSON(ShardCollectionType::uuid() << nss.ns()));
- fullQuery.readPref(ReadPreference::SecondaryOnly, BSONArray());
+
try {
DBDirectClient client(opCtx);
std::unique_ptr<DBClientCursor> cursor =
@@ -214,18 +193,20 @@ Status updateShardCollectionsEntry(OperationContext* opCtx,
StatusWith<std::vector<ChunkType>> readShardChunks(OperationContext* opCtx,
const NamespaceString& nss,
- const ChunkVersion& collectionVersion) {
+ const BSONObj& query,
+ const BSONObj& sort,
+ boost::optional<long long> limit,
+ const OID& epoch) {
// Query to retrieve the chunks.
- QueryAndSort diffQuery = createShardChunkDiffQuery(collectionVersion);
- Query fullQuery(diffQuery.query);
- fullQuery.sort(diffQuery.sort);
- fullQuery.readPref(ReadPreference::SecondaryOnly, BSONArray());
+ Query fullQuery(query);
+ fullQuery.sort(sort);
try {
DBDirectClient client(opCtx);
std::string chunkMetadataNs = ChunkType::ShardNSPrefix + nss.ns();
- std::unique_ptr<DBClientCursor> cursor = client.query(chunkMetadataNs, fullQuery, 0LL);
+ std::unique_ptr<DBClientCursor> cursor =
+ client.query(chunkMetadataNs, fullQuery, limit.get_value_or(0));
if (!cursor) {
return {ErrorCodes::OperationFailed,
@@ -236,7 +217,7 @@ StatusWith<std::vector<ChunkType>> readShardChunks(OperationContext* opCtx,
std::vector<ChunkType> chunks;
while (cursor->more()) {
BSONObj document = cursor->nextSafe().getOwned();
- auto statusWithChunk = ChunkType::fromShardBSON(document, collectionVersion.epoch());
+ auto statusWithChunk = ChunkType::fromShardBSON(document, epoch);
if (!statusWithChunk.isOK()) {
return {statusWithChunk.getStatus().code(),
str::stream() << "Failed to parse chunk '" << document.toString()
@@ -288,13 +269,6 @@ Status updateShardChunks(OperationContext* opCtx,
for (auto& chunk : chunks) {
// Check for a different epoch.
if (!chunk.getVersion().hasEqualEpoch(currEpoch)) {
- // This means the collection was dropped and recreated. Drop the chunk metadata
- // and return.
- Status status = dropChunksAndDeleteCollectionsEntry(opCtx, nss);
- if (!status.isOK()) {
- return status;
- }
-
return Status{ErrorCodes::ConflictingOperationInProgress,
str::stream() << "Invalid chunks found when reloading '"
<< nss.toString()
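
To make the relocated helper concrete, here is a minimal sketch (not part of the patch; assumes 'opCtx', 'nss', and 'collectionVersion' are in scope and the shardmetadatautil namespace is imported) of the query/sort pair it builds and how that pair feeds the generalized readShardChunks():

    // The helper expands to:
    //   query: { "lastmod": { "$gte": Timestamp(collectionVersion.toLong()) } }
    //   sort:  { "lastmod": 1 }   (ascending, so updates apply in version order)
    QueryAndSort diff = createShardChunkDiffQuery(collectionVersion);

    // Feed the pair to the generalized reader: no limit, epoch taken from the
    // caller's collection version.
    StatusWith<std::vector<ChunkType>> swChunks = readShardChunks(
        opCtx, nss, diff.query, diff.sort, boost::none, collectionVersion.epoch());
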
diff --git a/src/mongo/db/s/shard_metadata_util.h b/src/mongo/db/s/shard_metadata_util.h
index 3c5a684ff80..c29029df094 100644
--- a/src/mongo/db/s/shard_metadata_util.h
+++ b/src/mongo/db/s/shard_metadata_util.h
@@ -32,11 +32,11 @@
#include <vector>
#include "mongo/base/status.h"
+#include "mongo/bson/bsonobj.h"
#include "mongo/bson/oid.h"
namespace mongo {
-class BSONObj;
struct ChunkVersion;
class ChunkType;
class CollectionMetadata;
@@ -52,6 +52,14 @@ class StatusWith;
namespace shardmetadatautil {
/**
+ * Structure representing the generated query and sort order for a chunk diffing operation.
+ */
+struct QueryAndSort {
+ const BSONObj query;
+ const BSONObj sort;
+};
+
+/**
* Subset of the shard's collections collection related to refresh state.
*/
struct RefreshState {
@@ -63,6 +71,23 @@ struct RefreshState {
};
/**
+ * Returns the query needed to find incremental changes to the chunks collection on a shard server.
+ *
+ * The query has to find all the chunks $gte the current max version. Currently, any splits, merges
+ * and moves will increment the current max version. Querying by lastmod is essential because we
+ * want to use the {lastmod} index on the chunks collection. This makes potential cursor yields to
+ * apply split/merge/move updates safe: updates always move or insert documents at the end of the
+ * index (because the document updates always have higher lastmod), so changes always come *after*
+ * our current cursor position and are seen when the cursor recommences.
+ *
+ * The sort must be by ascending version so that the updates can be applied in-memory in order. This
+ * is important because it is possible for a cursor to read updates to the same _id document twice,
+ * due to the yield described above. If updates are applied in ascending version order, the newer
+ * update is applied last.
+ */
+QueryAndSort createShardChunkDiffQuery(const ChunkVersion& collectionVersion);
+
+/**
* Writes a persisted signal to indicate that the chunks collection is being updated. It is
* essential to call this before updating the chunks collection for 'nss' so that secondaries do not
* use incomplete metadata.
@@ -118,18 +143,22 @@ Status updateShardCollectionsEntry(OperationContext* opCtx,
const bool upsert);
/**
- * Reads the shard server's chunks collection corresponding to 'nss' for chunks with lastmod $gte
- * 'collectionVersion'.
+ * Reads the shard server's chunks collection corresponding to 'nss' for chunks matching 'query',
+ * returning at most 'limit' chunks in 'sort' order. 'epoch' populates the returned chunks' version
+ * fields, because we do not yet have UUIDs to replace epochs nor UUIDs associated with namespaces.
*/
StatusWith<std::vector<ChunkType>> readShardChunks(OperationContext* opCtx,
const NamespaceString& nss,
- const ChunkVersion& collectionVersion);
+ const BSONObj& query,
+ const BSONObj& sort,
+ boost::optional<long long> limit,
+ const OID& epoch);
/**
* Takes a vector of 'chunks' and updates the shard's chunks collection for 'nss'. Any chunk
* documents in config.chunks.ns that overlap with a chunk in 'chunks' is removed as the updated
- * chunk document is inserted. If the epoch of any chunk in 'chunks' does not match 'currEpoch',
- * the chunk metadata is dropped and a ConflictingOperationInProgress error is returned.
+ * chunk document is inserted. If the epoch of a chunk in 'chunks' does not match 'currEpoch',
+ * a ConflictingOperationInProgress error is returned and no more updates are applied.
*
* Note: two threads running this function in parallel for the same collection can corrupt the
* collection data!
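
The widened readShardChunks() signature also supports point reads; a sketch mirroring how the new shard server loader (below) finds the highest persisted chunk version, assuming 'opCtx', 'nss', and 'epoch' are in scope:

    // Fetch only the newest chunk: match everything, sort lastmod descending,
    // and cap the result at a single document.
    auto swChunk = shardmetadatautil::readShardChunks(
        opCtx,
        nss,
        BSONObj(),                                    // no filter
        BSON(ChunkType::DEPRECATED_lastmod() << -1),  // newest first
        1LL,                                          // limit 1
        epoch);
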
diff --git a/src/mongo/db/s/shard_metadata_util_test.cpp b/src/mongo/db/s/shard_metadata_util_test.cpp
index f21b3109a64..cae6de39283 100644
--- a/src/mongo/db/s/shard_metadata_util_test.cpp
+++ b/src/mongo/db/s/shard_metadata_util_test.cpp
@@ -248,8 +248,14 @@ TEST_F(ShardMetadataUtilTest, WriteAndReadChunks) {
checkChunks(kChunkMetadataNss, chunks);
// read all the chunks
- std::vector<ChunkType> readChunks = assertGet(readShardChunks(
- operationContext(), kNss, ChunkVersion(0, 0, getCollectionVersion().epoch())));
+ QueryAndSort allChunkDiff =
+ createShardChunkDiffQuery(ChunkVersion(0, 0, getCollectionVersion().epoch()));
+ std::vector<ChunkType> readChunks = assertGet(readShardChunks(operationContext(),
+ kNss,
+ allChunkDiff.query,
+ allChunkDiff.sort,
+ boost::none,
+ getCollectionVersion().epoch()));
for (auto chunkIt = chunks.begin(), readChunkIt = readChunks.begin();
chunkIt != chunks.end() && readChunkIt != readChunks.end();
++chunkIt, ++readChunkIt) {
@@ -257,29 +263,32 @@ TEST_F(ShardMetadataUtilTest, WriteAndReadChunks) {
}
// read only the highest version chunk
- readChunks = assertGet(readShardChunks(operationContext(), kNss, getCollectionVersion()));
+ QueryAndSort oneChunkDiff = createShardChunkDiffQuery(getCollectionVersion());
+ readChunks = assertGet(readShardChunks(operationContext(),
+ kNss,
+ oneChunkDiff.query,
+ oneChunkDiff.sort,
+ boost::none,
+ getCollectionVersion().epoch()));
ASSERT_TRUE(readChunks.size() == 1);
ASSERT_BSONOBJ_EQ(chunks.back().toShardBSON(), readChunks.front().toShardBSON());
}
-TEST_F(ShardMetadataUtilTest, UpdatingChunksFindsNewEpochAndClearsMetadata) {
- // Set up a collections document so we can make sure it's deleted correctly.
- setUpCollection();
-
+TEST_F(ShardMetadataUtilTest, UpdatingChunksFindsNewEpoch) {
std::vector<ChunkType> chunks = makeFourChunks();
ASSERT_OK(updateShardChunks(operationContext(), kNss, chunks, getCollectionVersion().epoch()));
checkChunks(kChunkMetadataNss, chunks);
+ ChunkVersion originalChunkVersion = chunks.back().getVersion();
chunks.back().setVersion(ChunkVersion(1, 0, OID::gen()));
ASSERT_EQUALS(
updateShardChunks(operationContext(), kNss, chunks, getCollectionVersion().epoch()).code(),
ErrorCodes::ConflictingOperationInProgress);
- // Finding a new epoch should have caused the metadata to be cleared for that namespace.
- checkCollectionIsEmpty(kChunkMetadataNss);
- // Collections collection should be empty because it only had one entry.
- checkCollectionIsEmpty(NamespaceString(ShardCollectionType::ConfigNS));
+ // Check that the chunk with a different epoch did not get written.
+ chunks.back().setVersion(std::move(originalChunkVersion));
+ checkChunks(kChunkMetadataNss, chunks);
}
TEST_F(ShardMetadataUtilTest, UpdateWithWriteNewChunks) {
diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
new file mode 100644
index 00000000000..368eb23abec
--- /dev/null
+++ b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
@@ -0,0 +1,630 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/s/shard_server_catalog_cache_loader.h"
+
+#include "mongo/db/client.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/s/shard_metadata_util.h"
+#include "mongo/s/catalog/type_shard_collection.h"
+#include "mongo/s/config_server_catalog_cache_loader.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/util/concurrency/thread_pool.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+
+using CollectionAndChangedChunks = CatalogCacheLoader::CollectionAndChangedChunks;
+using namespace shardmetadatautil;
+
+namespace {
+
+/**
+ * Constructs the options for the loader thread pool.
+ */
+ThreadPool::Options makeDefaultThreadPoolOptions() {
+ ThreadPool::Options options;
+ options.poolName = "ShardServerCatalogCacheLoader";
+ options.minThreads = 0;
+ options.maxThreads = 6;
+
+ // Ensure all threads have a client.
+ options.onCreateThread = [](const std::string& threadName) {
+ Client::initThread(threadName.c_str());
+ };
+
+ return options;
+}
+
+/**
+ * Takes a CollectionAndChangedChunks object and persists the changes to the shard's metadata
+ * collections.
+ *
+ * Returns ConflictingOperationInProgress if a chunk is found with a new epoch.
+ */
+Status persistCollectionAndChangedChunks(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const CollectionAndChangedChunks& collAndChunks) {
+ // Update the collections collection entry for 'nss' in case there are any new updates.
+ ShardCollectionType update = ShardCollectionType(nss,
+ nss,
+ collAndChunks.epoch,
+ collAndChunks.shardKeyPattern,
+ collAndChunks.defaultCollation,
+ collAndChunks.shardKeyIsUnique);
+ Status status = updateShardCollectionsEntry(opCtx,
+ BSON(ShardCollectionType::uuid() << nss.ns()),
+ update.toBSON(),
+ BSONObj(),
+ true /*upsert*/);
+ if (!status.isOK()) {
+ return status;
+ }
+
+    // Mark the chunk metadata as refreshing, so that secondaries are aware of the refresh.
+ status = setPersistedRefreshFlags(opCtx, nss);
+ if (!status.isOK()) {
+ return status;
+ }
+
+ // Update the chunks.
+ status = updateShardChunks(opCtx, nss, collAndChunks.changedChunks, collAndChunks.epoch);
+ if (!status.isOK()) {
+ return status;
+ }
+
+ // Mark the chunk metadata as done refreshing.
+ status = unsetPersistedRefreshFlags(opCtx, nss);
+ if (!status.isOK()) {
+ return status;
+ }
+
+ return Status::OK();
+}
+
+/**
+ * Retrieves the persisted max chunk version for 'nss', if there are any persisted chunks. If there
+ * are none (meaning there is no persisted metadata for 'nss'), returns a
+ * ChunkVersion::UNSHARDED() version.
+ *
+ * It is unsafe to call this when a task for 'nss' is running concurrently.
+ */
+ChunkVersion getPersistedMaxVersion(OperationContext* opCtx, const NamespaceString& nss) {
+    // Must read the collections entry to get the epoch to pass into ChunkType for the shard's
+    // chunk collection.
+ auto statusWithCollection = readShardCollectionsEntry(opCtx, nss);
+ if (statusWithCollection == ErrorCodes::NamespaceNotFound) {
+ // There is no persisted metadata.
+ return ChunkVersion::UNSHARDED();
+ }
+ uassert(ErrorCodes::OperationFailed,
+ str::stream() << "Failed to read persisted collections entry for collection '"
+ << nss.ns()
+ << "' due to '"
+ << statusWithCollection.getStatus().toString()
+ << "'.",
+ statusWithCollection.isOK());
+
+ auto statusWithChunk =
+ shardmetadatautil::readShardChunks(opCtx,
+ nss,
+ BSONObj(),
+ BSON(ChunkType::DEPRECATED_lastmod() << -1),
+ 1LL,
+ statusWithCollection.getValue().getEpoch());
+ uassert(ErrorCodes::OperationFailed,
+ str::stream() << "Failed to read highest version persisted chunk for collection '"
+ << nss.ns()
+ << "' due to '"
+ << statusWithChunk.getStatus().toString()
+ << "'.",
+ statusWithChunk.isOK());
+
+ return statusWithChunk.getValue().empty() ? ChunkVersion::UNSHARDED()
+ : statusWithChunk.getValue().front().getVersion();
+}
+
+/**
+ * Tries to find persisted chunk metadata with chunk versions GTE to 'version'. Should always
+ * return metadata if the collection exists.
+ *
+ * If 'version's epoch matches persisted metadata, returns GTE persisted metadata.
+ * If 'version's epoch doesn't match persisted metadata, returns all persisted metadata.
+ * If there is no persisted metadata, returns an empty CollectionAndChangedChunks object.
+ */
+StatusWith<CollectionAndChangedChunks> getPersistedMetadataSinceVersion(OperationContext* opCtx,
+ const NamespaceString& nss,
+ ChunkVersion version) {
+ auto swShardCollectionEntry = readShardCollectionsEntry(opCtx, nss);
+ if (swShardCollectionEntry == ErrorCodes::NamespaceNotFound) {
+ // If there is no metadata, collection does not exist. Return empty results.
+ return CollectionAndChangedChunks();
+ } else if (!swShardCollectionEntry.isOK()) {
+ return StatusWith<CollectionAndChangedChunks>(
+ ErrorCodes::OperationFailed,
+ str::stream() << "Failed to load local collection metadata due to '"
+ << swShardCollectionEntry.getStatus().toString()
+ << "'.");
+ }
+ auto shardCollectionEntry = std::move(swShardCollectionEntry.getValue());
+
+ // If the persisted epoch doesn't match what the CatalogCache requested, read everything.
+ ChunkVersion startingVersion;
+ if (shardCollectionEntry.getEpoch() != version.epoch()) {
+ startingVersion = ChunkVersion(0, 0, shardCollectionEntry.getEpoch());
+ } else {
+ startingVersion = version;
+ }
+
+ QueryAndSort diff = createShardChunkDiffQuery(startingVersion);
+
+ auto swChangedChunks =
+ readShardChunks(opCtx, nss, diff.query, diff.sort, boost::none, startingVersion.epoch());
+ if (!swChangedChunks.isOK()) {
+ return StatusWith<CollectionAndChangedChunks>(
+ ErrorCodes::OperationFailed,
+ str::stream() << "Failed to load local collection metadata due to '"
+ << swChangedChunks.getStatus().toString()
+ << "'.");
+ } else if (swChangedChunks.getValue().empty()) {
+ // No chunks were found, collection was dropped. Return empty results.
+ return CollectionAndChangedChunks();
+ }
+
+ // Make sure the collections entry epoch has not changed. Otherwise an epoch changing update was
+ // applied after we originally read the entry and the chunks may not match the original epoch.
+
+ auto swAfterShardCollectionEntry = readShardCollectionsEntry(opCtx, nss);
+ if (swAfterShardCollectionEntry == ErrorCodes::NamespaceNotFound) {
+ // The collection has been dropped since we began loading, return empty results.
+ return CollectionAndChangedChunks();
+ } else if (!swAfterShardCollectionEntry.isOK()) {
+ return StatusWith<CollectionAndChangedChunks>(
+ ErrorCodes::OperationFailed,
+ str::stream() << "Failed to reload local collection metadata due to '"
+ << swAfterShardCollectionEntry.getStatus().toString()
+ << "'.");
+ }
+
+ if (shardCollectionEntry.getEpoch() != swAfterShardCollectionEntry.getValue().getEpoch()) {
+ // The collection was dropped and recreated since we began. Return empty results.
+ return CollectionAndChangedChunks();
+ }
+
+ return CollectionAndChangedChunks{shardCollectionEntry.getEpoch(),
+ shardCollectionEntry.getKeyPattern().toBSON(),
+ shardCollectionEntry.getDefaultCollation(),
+ shardCollectionEntry.getUnique(),
+ std::move(swChangedChunks.getValue())};
+}
+
+} // namespace
+
+ShardServerCatalogCacheLoader::ShardServerCatalogCacheLoader(
+ std::unique_ptr<CatalogCacheLoader> configLoader)
+ : _configServerLoader(std::move(configLoader)), _threadPool(makeDefaultThreadPoolOptions()) {
+ _threadPool.startup();
+}
+
+ShardServerCatalogCacheLoader::~ShardServerCatalogCacheLoader() {
+ _threadPool.shutdown();
+ _threadPool.join();
+}
+
+
+std::shared_ptr<Notification<void>> ShardServerCatalogCacheLoader::getChunksSince(
+ const NamespaceString& nss,
+ ChunkVersion version,
+ stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) {
+
+ // TODO: plug in secondary machinery, with onStepDown and onBecomePrimary tasks: clear TaskLists
+ // and thread pool
+
+ auto notify = std::make_shared<Notification<void>>();
+
+ uassertStatusOK(_threadPool.schedule([ this, nss, version, callbackFn, notify ]() noexcept {
+ auto opCtx = Client::getCurrent()->makeOperationContext();
+ try {
+            _schedulePrimaryGetChunksSince(opCtx.get(), nss, version, callbackFn, notify);
+ } catch (const DBException& ex) {
+ callbackFn(opCtx.get(), ex.toStatus());
+ notify->set();
+ }
+ }));
+
+ return notify;
+}
+
+void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const ChunkVersion& catalogCacheSinceVersion,
+ stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn,
+ std::shared_ptr<Notification<void>> notify) {
+
+ // Get the max version the loader has.
+ const ChunkVersion maxLoaderVersion = [&] {
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ auto taskListIt = _taskLists.find(nss);
+
+ if (taskListIt != _taskLists.end()) {
+ // Enqueued tasks have the latest metadata
+ return taskListIt->second.getHighestVersionEnqueued();
+ }
+ }
+
+ // If there are no enqueued tasks, get the max persisted
+ return getPersistedMaxVersion(opCtx, nss);
+ }();
+
+ auto remoteRefreshCallbackFn =
+ [this, nss, catalogCacheSinceVersion, maxLoaderVersion, notify, callbackFn](
+ OperationContext* opCtx,
+ StatusWith<CollectionAndChangedChunks> swCollectionAndChangedChunks) {
+
+ if (!swCollectionAndChangedChunks.isOK() &&
+ swCollectionAndChangedChunks != ErrorCodes::NamespaceNotFound) {
+                // An error other than NamespaceNotFound occurred; there is nothing to
+                // enqueue, and the error is passed through to 'callbackFn' below.
+ } else {
+ // Enqueue a Task to apply the update retrieved from the config server.
+ Status scheduleStatus =
+ _scheduleTask(nss, Task{swCollectionAndChangedChunks, maxLoaderVersion});
+ if (!scheduleStatus.isOK()) {
+ callbackFn(opCtx, StatusWith<CollectionAndChangedChunks>(scheduleStatus));
+ notify->set();
+ return;
+ }
+
+ if (swCollectionAndChangedChunks.isOK()) {
+ // Create a response for the CatalogCache from the loader's metadata
+ // -- both persisted and enqueued.
+
+ swCollectionAndChangedChunks =
+ _getLoaderMetadata(opCtx, nss, catalogCacheSinceVersion);
+ // If no results were returned, convert the response into
+ // NamespaceNotFound.
+ if (swCollectionAndChangedChunks.isOK() &&
+ swCollectionAndChangedChunks.getValue().changedChunks.empty()) {
+ swCollectionAndChangedChunks =
+ Status(ErrorCodes::NamespaceNotFound, "collection was dropped");
+ }
+ }
+ }
+
+ // Complete the callbackFn work.
+ callbackFn(opCtx, std::move(swCollectionAndChangedChunks));
+ notify->set();
+ };
+
+ // Refresh the loader's metadata from the config server. The caller's request will
+ // then be serviced from the loader's up-to-date metadata.
+ _configServerLoader->getChunksSince(nss, maxLoaderVersion, remoteRefreshCallbackFn);
+}
+
+StatusWith<CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_getLoaderMetadata(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const ChunkVersion& catalogCacheSinceVersion) {
+
+ // Get the enqueued metadata first. Otherwise we could miss data between reading persisted and
+ // enqueued, if an enqueued task finished after the persisted read but before the enqueued read.
+
+ auto enqueuedRes = _getEnqueuedMetadata(nss, catalogCacheSinceVersion);
+    bool isEnqueued = enqueuedRes.first;
+ CollectionAndChangedChunks enqueued = std::move(enqueuedRes.second);
+
+ auto swPersisted = getPersistedMetadataSinceVersion(opCtx, nss, catalogCacheSinceVersion);
+ if (!swPersisted.isOK()) {
+ return swPersisted;
+ }
+ CollectionAndChangedChunks persisted = std::move(swPersisted.getValue());
+
+ if (!isEnqueued) {
+ // There are no tasks in the queue. Return the persisted metadata.
+ return persisted;
+ } else if (enqueued.changedChunks.empty() || enqueued.epoch != persisted.epoch) {
+ // There is a task queue. Either:
+ // - nothing was returned, which means the last task enqueued is a drop task.
+ // - the epoch changed in the enqueued metadata, which means there's a drop operation
+ // enqueued somewhere.
+        // Either way, the persisted metadata is outdated. Return enqueued results.
+ return enqueued;
+ } else if (persisted.changedChunks.empty()) {
+ // Nothing is persisted. Return enqueued results.
+ return enqueued;
+ } else {
+        // There can be overlap between persisted and enqueued metadata because enqueued work can
+        // be applied while the persisted metadata is being read. We must remove this overlap.
+
+ const ChunkVersion minEnqueuedVersion = enqueued.changedChunks.front().getVersion();
+
+ // Remove chunks from 'persisted' that are GTE the minimum in 'enqueued' -- this is
+ // the overlap.
+ auto persistedChangedChunksIt = persisted.changedChunks.begin();
+ while (persistedChangedChunksIt != persisted.changedChunks.end() &&
+ persistedChangedChunksIt->getVersion() < minEnqueuedVersion) {
+ ++persistedChangedChunksIt;
+ }
+ persisted.changedChunks.erase(persistedChangedChunksIt, persisted.changedChunks.end());
+
+ // Append 'enqueued's chunks to 'persisted', which no longer overlaps.
+ persisted.changedChunks.insert(persisted.changedChunks.end(),
+ enqueued.changedChunks.begin(),
+ enqueued.changedChunks.end());
+
+ return persisted;
+ }
+}
+
+std::pair<bool, CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_getEnqueuedMetadata(
+ const NamespaceString& nss, const ChunkVersion& catalogCacheSinceVersion) {
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+ auto taskListIt = _taskLists.find(nss);
+
+ if (taskListIt == _taskLists.end()) {
+ return std::make_pair(false, CollectionAndChangedChunks());
+ }
+
+ CollectionAndChangedChunks collAndChunks = taskListIt->second.getEnqueuedMetadata();
+
+    // Return all the results if 'catalogCacheSinceVersion's epoch does not match. Otherwise, trim
+    // the results to those GTE 'catalogCacheSinceVersion'.
+
+ if (collAndChunks.epoch != catalogCacheSinceVersion.epoch()) {
+ return std::make_pair(true, collAndChunks);
+ }
+
+ auto changedChunksIt = collAndChunks.changedChunks.begin();
+ while (changedChunksIt != collAndChunks.changedChunks.end() &&
+ changedChunksIt->getVersion() < catalogCacheSinceVersion) {
+ ++changedChunksIt;
+ }
+ collAndChunks.changedChunks.erase(collAndChunks.changedChunks.begin(), changedChunksIt);
+
+ return std::make_pair(true, collAndChunks);
+}
+
+Status ShardServerCatalogCacheLoader::_scheduleTask(const NamespaceString& nss, Task task) {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+
+ const bool wasEmpty = _taskLists[nss].empty();
+ _taskLists[nss].addTask(std::move(task));
+
+    if (wasEmpty) {
+        // Schedule the first run directly; '_mutex' is already held here, so on a
+        // scheduling failure the task list can be erased without re-locking.
+        Status status = _threadPool.schedule([this, nss]() { _runTasks(nss); });
+        if (!status.isOK()) {
+            log() << "CatalogCacheLoader failed to schedule persisted metadata update"
+                  << " task for namespace '" << nss << "' due to '" << redact(status)
+                  << "'. Clearing task list so that scheduling"
+                  << " will be attempted by the next caller to refresh this namespace.";
+            _taskLists.erase(nss);
+        }
+        return status;
+    }
+
+ return Status::OK();
+}
+
+void ShardServerCatalogCacheLoader::_runTasks(const NamespaceString& nss) {
+ auto opCtx = Client::getCurrent()->makeOperationContext();
+
+ // Run task
+ bool taskFinished = false;
+ try {
+ taskFinished = _updatePersistedMetadata(opCtx.get(), nss);
+ } catch (const DBException& ex) {
+ log() << redact(ex.toStatus());
+ }
+
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+
+ // If task completed successfully, remove it from work queue
+ if (taskFinished) {
+ invariant(!_taskLists[nss].empty());
+ _taskLists[nss].removeActiveTask();
+ }
+
+ // Schedule more work if there is any
+ if (!_taskLists[nss].empty()) {
+ Status status = _threadPool.schedule([this, nss]() { _runTasks(nss); });
+ if (!status.isOK()) {
+ log() << "CatalogCacheLoader failed to schedule more persisted metadata update"
+ << " tasks for namespace '" << nss << "' due to '" << redact(status)
+ << "'. Clearing task list so that scheduling will be attempted by the next"
+ << " caller to refresh this namespace.";
+            // '_mutex' is already locked above; re-acquiring it here would deadlock.
+            _taskLists.erase(nss);
+ }
+ } else {
+ _taskLists.erase(nss);
+ }
+}
+
+bool ShardServerCatalogCacheLoader::_updatePersistedMetadata(OperationContext* opCtx,
+ const NamespaceString& nss) {
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+
+ invariant(!_taskLists[nss].empty());
+ const Task task = _taskLists[nss].getActiveTask();
+ invariant(task.dropped || !task.collectionAndChangedChunks->changedChunks.empty());
+
+ lock.unlock();
+
+ // Check if this is a drop task.
+
+ if (task.dropped) {
+ // The namespace was dropped. The persisted metadata for the collection must be cleared.
+ Status status = dropChunksAndDeleteCollectionsEntry(opCtx, nss);
+ uassert(ErrorCodes::OperationFailed,
+ str::stream() << "Failed to clear persisted chunk metadata for collection '"
+ << nss.ns()
+ << "' due to '"
+ << status.toString()
+ << "'. Will be retried.",
+ status.isOK());
+
+ LOG(1) << "Successfully cleared persisted chunk metadata for collection '" << nss << "'.";
+ return true;
+ }
+
+ // This is an update task.
+
+ ChunkVersion persistedMaxVersion = getPersistedMaxVersion(opCtx, nss);
+
+ // If the epoch of the update task does not match the persisted metadata, the persisted metadata
+ // -- from an old collection that was recreated -- must be cleared before applying the changes.
+ if (persistedMaxVersion.isSet() &&
+ persistedMaxVersion.epoch() != task.maxQueryVersion.epoch()) {
+ Status status = dropChunksAndDeleteCollectionsEntry(opCtx, nss);
+ uassert(ErrorCodes::OperationFailed,
+ str::stream() << "Failed to clear persisted chunk metadata for collection '"
+ << nss.ns()
+ << "' due to '"
+ << status.toString()
+ << "'. Will be retried.",
+ status.isOK());
+ }
+
+ Status status =
+ persistCollectionAndChangedChunks(opCtx, nss, task.collectionAndChangedChunks.get());
+ if (status == ErrorCodes::ConflictingOperationInProgress) {
+ // A new epoch was discovered in the new chunks. The CatalogCache will retry refreshing the
+ // chunk metadata: clearing the persisted metadata will be handled then.
+ return true;
+ }
+ uassert(ErrorCodes::OperationFailed,
+ str::stream() << "Failed to update the persisted chunk metadata for collection '"
+ << nss.ns()
+ << "' from '"
+ << task.minQueryVersion.toString()
+ << "' to '"
+ << task.maxQueryVersion.toString()
+ << "' due to '"
+ << status.toString()
+ << "'. Will be retried.",
+ status.isOK());
+
+ LOG(1) << "Successfully updated persisted chunk metadata for collection '" << nss << "' from '"
+ << task.minQueryVersion << "' to collection version '" << task.maxQueryVersion << "'.";
+ return true;
+}
+
+ShardServerCatalogCacheLoader::Task::Task(
+ StatusWith<CollectionAndChangedChunks> statusWithCollectionAndChangedChunks,
+ ChunkVersion minimumQueryVersion) {
+
+ minQueryVersion = minimumQueryVersion;
+
+ if (statusWithCollectionAndChangedChunks.isOK()) {
+ collectionAndChangedChunks = statusWithCollectionAndChangedChunks.getValue();
+ invariant(!collectionAndChangedChunks->changedChunks.empty());
+ maxQueryVersion = collectionAndChangedChunks->changedChunks.back().getVersion();
+ } else {
+ invariant(statusWithCollectionAndChangedChunks == ErrorCodes::NamespaceNotFound);
+ dropped = true;
+ maxQueryVersion = ChunkVersion::UNSHARDED();
+ }
+}
+
+void ShardServerCatalogCacheLoader::TaskList::addTask(Task task) {
+ if (_tasks.empty()) {
+ _tasks.emplace_back(std::move(task));
+ return;
+ }
+
+ if (task.dropped) {
+ invariant(_tasks.back().maxQueryVersion.equals(task.minQueryVersion));
+
+ Task front = std::move(_tasks.front());
+ _tasks.clear();
+ _tasks.emplace_back(std::move(front));
+
+ // No need to schedule a drop if one is already currently active.
+ if (!_tasks.front().dropped) {
+ _tasks.emplace_back(std::move(task));
+ }
+ } else {
+ // Tasks must have contiguous versions, unless a complete reload occurs.
+ invariant(_tasks.back().maxQueryVersion.equals(task.minQueryVersion) ||
+ !task.minQueryVersion.isSet());
+
+ _tasks.emplace_back(std::move(task));
+ }
+}
+
+const ShardServerCatalogCacheLoader::Task& ShardServerCatalogCacheLoader::TaskList::getActiveTask()
+ const {
+ invariant(!_tasks.empty());
+ return _tasks.front();
+}
+
+void ShardServerCatalogCacheLoader::TaskList::removeActiveTask() {
+ invariant(!_tasks.empty());
+ _tasks.pop_front();
+}
+
+ChunkVersion ShardServerCatalogCacheLoader::TaskList::getHighestVersionEnqueued() const {
+ invariant(!_tasks.empty());
+ return _tasks.back().maxQueryVersion;
+}
+
+CollectionAndChangedChunks ShardServerCatalogCacheLoader::TaskList::getEnqueuedMetadata() const {
+ CollectionAndChangedChunks collAndChunks;
+ for (const auto& task : _tasks) {
+ if (task.dropped) {
+ // A drop task should reset the metadata.
+ collAndChunks = CollectionAndChangedChunks();
+ } else {
+ if (task.collectionAndChangedChunks->epoch != collAndChunks.epoch) {
+ // An epoch change should reset the metadata and start from the new.
+ collAndChunks = task.collectionAndChangedChunks.get();
+ } else {
+ // Epochs match, so the new results should be appended.
+ //
+ // Note: it's okay if the new chunks change to a new version epoch in the middle of
+                // the chunks vector. This will either be reset by a later task doing a total reload
+                // with the new epoch, or cause the original getChunksSince caller to throw out the
+ // results and refresh again.
+ collAndChunks.changedChunks.insert(
+ collAndChunks.changedChunks.end(),
+ task.collectionAndChangedChunks->changedChunks.begin(),
+ task.collectionAndChangedChunks->changedChunks.end());
+ }
+ }
+ }
+ return collAndChunks;
+}
+
+} // namespace mongo
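
A usage sketch for the loader above ('loader', 'nss', and 'sinceVersion' are illustrative names; in the real wiring the CatalogCache is the only caller):

    auto notification = loader->getChunksSince(
        nss,
        sinceVersion,
        [](OperationContext* opCtx, StatusWith<CollectionAndChangedChunks> swColl) {
            if (swColl == ErrorCodes::NamespaceNotFound) {
                return;  // collection was dropped; the caller clears its cache
            }
            // Apply swColl.getValue().changedChunks to the in-memory routing info.
        });

    // Per the header contract, wait for the callback to finish before issuing
    // another getChunksSince() call.
    notification->get(opCtx);
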
diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.h b/src/mongo/db/s/shard_server_catalog_cache_loader.h
new file mode 100644
index 00000000000..5be9eb51727
--- /dev/null
+++ b/src/mongo/db/s/shard_server_catalog_cache_loader.h
@@ -0,0 +1,245 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/s/catalog_cache_loader.h"
+#include "mongo/util/concurrency/thread_pool.h"
+
+namespace mongo {
+
+class ConfigServerCatalogCacheLoader;
+class ThreadPoolInterface;
+
+/**
+ * Shard implementation of the CatalogCacheLoader used by the CatalogCache. Retrieves chunk metadata
+ * for the CatalogCache on shards.
+ *
+ * If a shard primary, retrieves chunk metadata from the config server and maintains a persisted
+ * copy of that chunk metadata so shard secondaries can access the metadata. If a shard secondary,
+ * retrieves chunk metadata from the shard persisted chunk metadata.
+ */
+class ShardServerCatalogCacheLoader : public CatalogCacheLoader {
+public:
+ ShardServerCatalogCacheLoader(std::unique_ptr<CatalogCacheLoader> configLoader);
+ ~ShardServerCatalogCacheLoader();
+
+ /**
+ * This must be called serially, never in parallel, including waiting for the returned
+ * Notification to be signalled.
+ *
+ * This function is robust to unexpected version requests from the CatalogCache. Requesting
+     * versions with epochs that do not match anything on the config server will not affect or
+     * clear the locally persisted metadata. Requesting versions higher than anything previously
+     * requested, or versions lower than already requested, will not corrupt the locally persisted
+     * metadata, and will return what was requested if it exists.
+ */
+ std::shared_ptr<Notification<void>> getChunksSince(
+ const NamespaceString& nss,
+ ChunkVersion version,
+ stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn)
+ override;
+
+private:
+ /**
+ * This represents an update task for the persisted chunk metadata. The task will either be to
+     * apply a set of updated chunks to the shard persisted metadata store or to drop the persisted
+ * metadata for a specific collection.
+ */
+ struct Task {
+ /**
+ * Initializes a task for either dropping or updating the persisted metadata for the
+ * associated collection. Which type of task is determined by the Status of
+ * 'statusWithCollectionAndChangedChunks', whether it is NamespaceNotFound or OK.
+ *
+ * Note: statusWithCollectionAndChangedChunks must always be NamespaceNotFound or
+ * OK, otherwise the constructor will invariant because there is no task to complete.
+ *
+ * 'collectionAndChangedChunks' is only initialized if 'dropped' is false.
+ * 'minimumQueryVersion' sets 'minQueryVersion'.
+ * 'maxQueryVersion' is either set to the highest chunk version in
+ * 'collectionAndChangedChunks' or ChunkVersion::UNSHARDED().
+ */
+ Task(StatusWith<CollectionAndChangedChunks> statusWithCollectionAndChangedChunks,
+ ChunkVersion minimumQueryVersion);
+
+ // Chunks and Collection updates to be applied to the shard persisted metadata store.
+ boost::optional<CollectionAndChangedChunks> collectionAndChangedChunks{boost::none};
+
+ // The highest version that the loader had before going to the config server's metadata
+ // store for updated chunks.
+        // Used by the TaskList below to enforce that consistent updates are applied.
+ ChunkVersion minQueryVersion;
+
+ // Either the highest chunk version in 'collectionAndChangedChunks' or the same as
+ // 'minQueryVersion' if 'dropped' is true.
+        // Used by the TaskList below to enforce that consistent updates are applied.
+ ChunkVersion maxQueryVersion;
+
+ // Indicates whether the collection metadata must be cleared.
+ bool dropped{false};
+ };
+
+ /**
+ * A list (work queue) of updates to apply to the shard persisted metadata store for a specific
+     * collection. Enforces that each task added to the list is either consistent with its predecessor:
+ *
+ * tasks[i].minQueryVersion == tasks[i-1].maxQueryVersion.
+ *
+     * or applies a complete update from the minimum version, where
+ *
+ * minQueryVersion == ChunkVersion::UNSHARDED().
+ */
+ class TaskList {
+ public:
+ /**
+ * Adds 'task' to the back of the 'tasks' list.
+ *
+ * If 'task' is a drop task, clears 'tasks' except for the front active task, so that we
+ * don't waste time applying changes we will just delete. If the one remaining task in the
+ * list is already a drop task, the new one isn't added because it is redundant.
+ */
+ void addTask(Task task);
+
+ /**
+ * Returns the front of the 'tasks' list. Invariants if 'tasks' is empty.
+ */
+ const Task& getActiveTask() const;
+
+ /**
+         * Erases the current active task; the next task in 'tasks', if any, becomes active.
+ */
+ void removeActiveTask();
+
+ /**
+ * Checks whether there are any tasks left.
+ */
+        bool empty() const {
+ return _tasks.empty();
+ }
+
+ /**
+         * Gets the last task's highest version -- this is the most up-to-date version.
+ */
+ ChunkVersion getHighestVersionEnqueued() const;
+
+ /**
+ * Iterates over the task list to retrieve the enqueued metadata.
+ */
+ CollectionAndChangedChunks getEnqueuedMetadata() const;
+
+ private:
+ std::list<Task> _tasks{};
+ };
+
+ typedef std::map<NamespaceString, TaskList> TaskLists;
+
+ /**
+ * Refreshes chunk metadata from the config server's metadata store, and schedules maintenance
+ * of the shard's persisted metadata store with the latest updates retrieved from the config
+ * server.
+ *
+ * Then calls 'callbackFn' with metadata loaded from the shard persisted metadata store, and any
+     * in-memory task enqueued to update that store, GTE to 'catalogCacheSinceVersion'.
+ *
+ * Only run on the shard primary.
+ */
+    void _schedulePrimaryGetChunksSince(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const ChunkVersion& catalogCacheSinceVersion,
+ stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn,
+ std::shared_ptr<Notification<void>> notify);
+
+
+ /**
+ * Loads chunk metadata from the shard persisted metadata store, and any in-memory task enqueued
+ * to update that store, GTE to 'catalogCacheSinceVersion'.
+ *
+ * Will return an empty CollectionAndChangedChunks object if no metadata is found (collection
+ * was dropped).
+ *
+ * Only run on the shard primary.
+ */
+ StatusWith<CollectionAndChangedChunks> _getLoaderMetadata(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const ChunkVersion& catalogCacheSinceVersion);
+
+ /**
+     * Loads chunk metadata GTE 'catalogCacheSinceVersion' from all in-memory tasks enqueued to
+     * update the shard persisted metadata store for collection 'nss'. If
+     * 'catalogCacheSinceVersion's epoch does not match that of the enqueued metadata, returns all
+     * of the enqueued metadata.
+ *
+ * The bool returned in the pair indicates whether there are any tasks enqueued. If none are, it
+ * is false. If it is true, and the CollectionAndChangedChunks returned is empty, this indicates
+ * a drop was enqueued and there is no metadata.
+ *
+ * Only run on the shard primary.
+ */
+ std::pair<bool, CollectionAndChangedChunks> _getEnqueuedMetadata(
+ const NamespaceString& nss, const ChunkVersion& catalogCacheSinceVersion);
+
+ /**
+ * Adds 'task' to the task list for 'nss'. If this creates a new task list, then '_runTasks' is
+ * started on another thread to execute the tasks.
+ *
+ * Only run on the shard primary.
+ */
+ Status _scheduleTask(const NamespaceString& nss, Task task);
+
+ /**
+ * Schedules tasks in the 'nss' task list to execute until the task list is depleted.
+ *
+ * Only run on the shard primary.
+ */
+ void _runTasks(const NamespaceString& nss);
+
+ /**
+ * Executes the task at the front of the task list for 'nss'. The task will either drop 'nss's
+ * metadata or apply a set of updates.
+ *
+ * Only run on the shard primary.
+ */
+ bool _updatePersistedMetadata(OperationContext* opCtx, const NamespaceString& nss);
+
+ // Used by the shard primary to retrieve chunk metadata from the config server.
+ const std::unique_ptr<CatalogCacheLoader> _configServerLoader;
+
+ // Thread pool used to load chunk metadata.
+ ThreadPool _threadPool;
+
+ // Protects the class state below.
+ stdx::mutex _mutex;
+
+    // Map to track in-progress persisted cache updates on the shard primary.
+ TaskLists _taskLists;
+};
+
+} // namespace mongo
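
To illustrate the version-chaining rule TaskList::addTask() enforces, a worked example with made-up versions (major|minor, shared epoch E):

    // task[0]: minQueryVersion = 4|0, maxQueryVersion = 6|1  (incremental update)
    // task[1]: minQueryVersion = 6|1, maxQueryVersion = 8|0  (contiguous: OK)
    // task[2]: minQueryVersion = 0|0 (UNSHARDED), maxQueryVersion = 2|0
    //          (complete reload after a drop/recreate: also OK)
    // A task whose minQueryVersion is set but does not equal the previous
    // task's maxQueryVersion trips the invariant in addTask().
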
diff --git a/src/mongo/db/s/sharding_initialization_mongod.cpp b/src/mongo/db/s/sharding_initialization_mongod.cpp
index b329a325423..46dd675fc13 100644
--- a/src/mongo/db/s/sharding_initialization_mongod.cpp
+++ b/src/mongo/db/s/sharding_initialization_mongod.cpp
@@ -38,11 +38,13 @@
#include "mongo/client/remote_command_targeter_factory_impl.h"
#include "mongo/db/logical_time_metadata_hook.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/s/shard_server_catalog_cache_loader.h"
#include "mongo/db/s/sharding_egress_metadata_hook_for_mongod.h"
#include "mongo/db/server_options.h"
#include "mongo/executor/task_executor.h"
#include "mongo/rpc/metadata/egress_metadata_hook_list.h"
#include "mongo/s/catalog/sharding_catalog_manager_impl.h"
+#include "mongo/s/catalog_cache.h"
#include "mongo/s/client/shard_factory.h"
#include "mongo/s/client/shard_local.h"
#include "mongo/s/client/shard_remote.h"
@@ -83,11 +85,18 @@ Status initializeGlobalShardingStateForMongod(OperationContext* opCtx,
auto shardFactory =
stdx::make_unique<ShardFactory>(std::move(buildersMap), std::move(targeterFactory));
+ std::unique_ptr<CatalogCache> catalogCache =
+ (serverGlobalParams.clusterRole == ClusterRole::ConfigServer)
+ ? stdx::make_unique<CatalogCache>()
+ : stdx::make_unique<CatalogCache>(stdx::make_unique<ShardServerCatalogCacheLoader>(
+ stdx::make_unique<ConfigServerCatalogCacheLoader>()));
+
return initializeGlobalShardingState(
opCtx,
configCS,
distLockProcessId,
std::move(shardFactory),
+ std::move(catalogCache),
[opCtx] {
auto hookList = stdx::make_unique<rpc::EgressMetadataHookList>();
hookList->addHook(
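
For clarity, the shard server branch above assembles a three-layer stack, each layer wrapping the next:

    // CatalogCache (in-memory routing cache)
    //   -> ShardServerCatalogCacheLoader (maintains the shard's persisted copy)
    //     -> ConfigServerCatalogCacheLoader (fetches from the config server)
    auto catalogCache = stdx::make_unique<CatalogCache>(
        stdx::make_unique<ShardServerCatalogCacheLoader>(
            stdx::make_unique<ConfigServerCatalogCacheLoader>()));
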
diff --git a/src/mongo/s/catalog/type_chunk.cpp b/src/mongo/s/catalog/type_chunk.cpp
index eb286b7a631..5d40cc2ecdf 100644
--- a/src/mongo/s/catalog/type_chunk.cpp
+++ b/src/mongo/s/catalog/type_chunk.cpp
@@ -343,10 +343,6 @@ std::string ChunkType::genID(StringData ns, const BSONObj& o) {
}
Status ChunkType::validate() const {
- if (!_ns.is_initialized() || _ns->empty()) {
- return Status(ErrorCodes::NoSuchKey, str::stream() << "missing " << ns.name() << " field");
- }
-
if (!_min.is_initialized() || _min->isEmpty()) {
return Status(ErrorCodes::NoSuchKey, str::stream() << "missing " << min.name() << " field");
}
diff --git a/src/mongo/s/catalog/type_chunk.h b/src/mongo/s/catalog/type_chunk.h
index 96ee1588a6a..e80a7df3029 100644
--- a/src/mongo/s/catalog/type_chunk.h
+++ b/src/mongo/s/catalog/type_chunk.h
@@ -255,7 +255,7 @@ public:
private:
// Convention: (M)andatory, (O)ptional, (S)pecial; (C)onfig, (S)hard.
- // (M)(C) collection this chunk is in
+ // (O)(C) collection this chunk is in
boost::optional<std::string> _ns;
// (M)(C)(S) first key of the range, inclusive
boost::optional<BSONObj> _min;
diff --git a/src/mongo/s/catalog/type_shard_collection.h b/src/mongo/s/catalog/type_shard_collection.h
index 43118b4c0d5..c3637e2c5aa 100644
--- a/src/mongo/s/catalog/type_shard_collection.h
+++ b/src/mongo/s/catalog/type_shard_collection.h
@@ -79,6 +79,13 @@ public:
static const BSONField<bool> refreshing;
static const BSONField<long long> refreshSequenceNumber;
+ explicit ShardCollectionType(const NamespaceString& uuid,
+ const NamespaceString& nss,
+ const OID& epoch,
+ const KeyPattern& keyPattern,
+ const BSONObj& defaultCollation,
+ const bool& unique);
+
/**
* Constructs a new ShardCollectionType object from BSON. Also does validation of the contents.
*/
@@ -145,13 +152,6 @@ public:
}
private:
- ShardCollectionType(const NamespaceString& uuid,
- const NamespaceString& nss,
- const OID& epoch,
- const KeyPattern& keyPattern,
- const BSONObj& defaultCollation,
- const bool& unique);
-
// Will become the UUID when available. Currently a duplicate of '_nss'.
NamespaceString _uuid;
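
With the constructor now public, callers such as the new shard server loader can build entries directly; a minimal sketch mirroring persistCollectionAndChangedChunks() above (the shard key and uniqueness values are illustrative; 'opCtx', 'nss', and 'epoch' assumed in scope):

    // 'uuid' currently duplicates the namespace until collection UUIDs exist.
    ShardCollectionType entry(nss,                           // uuid (placeholder)
                              nss,                           // namespace
                              epoch,                         // collection epoch
                              KeyPattern(BSON("_id" << 1)),  // shard key (example)
                              BSONObj(),                     // simple default collation
                              false);                        // shard key not unique

    Status status = shardmetadatautil::updateShardCollectionsEntry(
        opCtx,
        BSON(ShardCollectionType::uuid() << nss.ns()),  // query on the uuid field
        entry.toBSON(),
        BSONObj(),
        true /*upsert*/);
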
diff --git a/src/mongo/s/catalog_cache.cpp b/src/mongo/s/catalog_cache.cpp
index 87f53ebadad..d81f222bfde 100644
--- a/src/mongo/s/catalog_cache.cpp
+++ b/src/mongo/s/catalog_cache.cpp
@@ -40,9 +40,7 @@
#include "mongo/s/catalog/type_collection.h"
#include "mongo/s/catalog/type_database.h"
#include "mongo/s/client/shard_registry.h"
-#include "mongo/s/config_server_catalog_cache_loader.h"
#include "mongo/s/grid.h"
-#include "mongo/stdx/memory.h"
#include "mongo/util/log.h"
#include "mongo/util/timer.h"
@@ -157,6 +155,9 @@ std::shared_ptr<ChunkManager> refreshCollectionRoutingInfo(
CatalogCache::CatalogCache() : _cacheLoader(stdx::make_unique<ConfigServerCatalogCacheLoader>()) {}
+CatalogCache::CatalogCache(std::unique_ptr<CatalogCacheLoader> cacheLoader)
+ : _cacheLoader(std::move(cacheLoader)) {}
+
CatalogCache::~CatalogCache() = default;
StatusWith<CachedDatabaseInfo> CatalogCache::getDatabase(OperationContext* opCtx,
diff --git a/src/mongo/s/catalog_cache.h b/src/mongo/s/catalog_cache.h
index e13a2b0c557..fa81d0a7100 100644
--- a/src/mongo/s/catalog_cache.h
+++ b/src/mongo/s/catalog_cache.h
@@ -34,6 +34,8 @@
#include "mongo/s/chunk_manager.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/client/shard.h"
+#include "mongo/s/config_server_catalog_cache_loader.h"
+#include "mongo/stdx/memory.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/concurrency/notification.h"
#include "mongo/util/string_map.h"
@@ -55,7 +57,13 @@ class CatalogCache {
MONGO_DISALLOW_COPYING(CatalogCache);
public:
+ /**
+ * Defaults to instantiating a ConfigServerCatalogCacheLoader.
+ */
CatalogCache();
+
+ CatalogCache(std::unique_ptr<CatalogCacheLoader> cacheLoader);
+
~CatalogCache();
/**
diff --git a/src/mongo/s/catalog_cache_loader.h b/src/mongo/s/catalog_cache_loader.h
index 6cdfc487513..21702da77e3 100644
--- a/src/mongo/s/catalog_cache_loader.h
+++ b/src/mongo/s/catalog_cache_loader.h
@@ -68,7 +68,7 @@ public:
/**
* Non-blocking call, which requests the chunks changed since the specified version to be
- * fetched from the persistent matadata store and invokes the callback function with the result.
+ * fetched from the persistent metadata store and invokes the callback function with the result.
* The callback function must never throw - it is a fatal error to do so.
*
* If for some reason the asynchronous fetch operation cannot be dispatched (for example on
diff --git a/src/mongo/s/config_server_catalog_cache_loader.cpp b/src/mongo/s/config_server_catalog_cache_loader.cpp
index b26df2d73ae..6b63846e592 100644
--- a/src/mongo/s/config_server_catalog_cache_loader.cpp
+++ b/src/mongo/s/config_server_catalog_cache_loader.cpp
@@ -47,7 +47,7 @@ namespace {
*/
ThreadPool::Options makeDefaultThreadPoolOptions() {
ThreadPool::Options options;
- options.poolName = "CatalogCacheLoader";
+ options.poolName = "ConfigServerCatalogCacheLoader";
options.minThreads = 0;
options.maxThreads = 6;
diff --git a/src/mongo/s/config_server_test_fixture.cpp b/src/mongo/s/config_server_test_fixture.cpp
index c5640333022..5bd8d4a9aa1 100644
--- a/src/mongo/s/config_server_test_fixture.cpp
+++ b/src/mongo/s/config_server_test_fixture.cpp
@@ -66,6 +66,7 @@
#include "mongo/s/client/shard_local.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/client/shard_remote.h"
+#include "mongo/s/config_server_catalog_cache_loader.h"
#include "mongo/s/grid.h"
#include "mongo/s/query/cluster_cursor_manager.h"
#include "mongo/s/set_shard_version_request.h"
@@ -144,8 +145,14 @@ std::unique_ptr<ShardingCatalogManager> ConfigServerTestFixture::makeShardingCat
return stdx::make_unique<ShardingCatalogManagerImpl>(std::move(specialExec));
}
-std::unique_ptr<CatalogCache> ConfigServerTestFixture::makeCatalogCache() {
- return stdx::make_unique<CatalogCache>();
+std::unique_ptr<CatalogCacheLoader> ConfigServerTestFixture::makeCatalogCacheLoader() {
+ return stdx::make_unique<ConfigServerCatalogCacheLoader>();
+}
+
+std::unique_ptr<CatalogCache> ConfigServerTestFixture::makeCatalogCache(
+ std::unique_ptr<CatalogCacheLoader> catalogCacheLoader) {
+ invariant(catalogCacheLoader);
+ return stdx::make_unique<CatalogCache>(std::move(catalogCacheLoader));
}
std::unique_ptr<BalancerConfiguration> ConfigServerTestFixture::makeBalancerConfiguration() {
diff --git a/src/mongo/s/config_server_test_fixture.h b/src/mongo/s/config_server_test_fixture.h
index 69d08afe17d..9eeb0fa7398 100644
--- a/src/mongo/s/config_server_test_fixture.h
+++ b/src/mongo/s/config_server_test_fixture.h
@@ -132,7 +132,10 @@ protected:
std::unique_ptr<ShardingCatalogManager> makeShardingCatalogManager(
ShardingCatalogClient* catalogClient) override;
- std::unique_ptr<CatalogCache> makeCatalogCache() override;
+ std::unique_ptr<CatalogCacheLoader> makeCatalogCacheLoader() override;
+
+ std::unique_ptr<CatalogCache> makeCatalogCache(
+ std::unique_ptr<CatalogCacheLoader> catalogCacheLoader) override;
std::unique_ptr<ClusterCursorManager> makeClusterCursorManager() override;
diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp
index a88d123e535..7e4c84bd01f 100644
--- a/src/mongo/s/server.cpp
+++ b/src/mongo/s/server.cpp
@@ -67,6 +67,7 @@
#include "mongo/s/balancer_configuration.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/catalog/sharding_catalog_manager.h"
+#include "mongo/s/catalog_cache.h"
#include "mongo/s/client/shard_connection.h"
#include "mongo/s/client/shard_factory.h"
#include "mongo/s/client/shard_registry.h"
@@ -203,6 +204,7 @@ static Status initializeSharding(OperationContext* opCtx) {
mongosGlobalParams.configdbs,
generateDistLockProcessId(opCtx),
std::move(shardFactory),
+ stdx::make_unique<CatalogCache>(),
[opCtx]() {
auto hookList = stdx::make_unique<rpc::EgressMetadataHookList>();
hookList->addHook(
diff --git a/src/mongo/s/shard_server_test_fixture.cpp b/src/mongo/s/shard_server_test_fixture.cpp
index 272d6a00641..2e0396248e7 100644
--- a/src/mongo/s/shard_server_test_fixture.cpp
+++ b/src/mongo/s/shard_server_test_fixture.cpp
@@ -33,9 +33,11 @@
#include "mongo/client/remote_command_targeter_mock.h"
#include "mongo/db/commands.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
+#include "mongo/db/s/shard_server_catalog_cache_loader.h"
#include "mongo/s/catalog/dist_lock_catalog_mock.h"
#include "mongo/s/catalog/dist_lock_manager_mock.h"
#include "mongo/s/catalog/sharding_catalog_client_impl.h"
+#include "mongo/s/catalog_cache.h"
#include "mongo/stdx/memory.h"
namespace mongo {
@@ -106,4 +108,15 @@ std::unique_ptr<ShardingCatalogClient> ShardServerTestFixture::makeShardingCatal
return stdx::make_unique<ShardingCatalogClientImpl>(std::move(distLockManager));
}
+std::unique_ptr<CatalogCacheLoader> ShardServerTestFixture::makeCatalogCacheLoader() {
+ return stdx::make_unique<ShardServerCatalogCacheLoader>(
+ stdx::make_unique<ConfigServerCatalogCacheLoader>());
+}
+
+std::unique_ptr<CatalogCache> ShardServerTestFixture::makeCatalogCache(
+ std::unique_ptr<CatalogCacheLoader> catalogCacheLoader) {
+ invariant(catalogCacheLoader);
+ return stdx::make_unique<CatalogCache>(std::move(catalogCacheLoader));
+}
+
} // namespace mongo
diff --git a/src/mongo/s/shard_server_test_fixture.h b/src/mongo/s/shard_server_test_fixture.h
index e3e15a4d780..52e202536d3 100644
--- a/src/mongo/s/shard_server_test_fixture.h
+++ b/src/mongo/s/shard_server_test_fixture.h
@@ -83,6 +83,14 @@ protected:
*/
std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient(
std::unique_ptr<DistLockManager> distLockManager) override;
+
+ /**
+ * Creates a ShardServerCatalogCacheLoader.
+ */
+    std::unique_ptr<CatalogCacheLoader> makeCatalogCacheLoader() override;
+
+    std::unique_ptr<CatalogCache> makeCatalogCache(
+        std::unique_ptr<CatalogCacheLoader> catalogCacheLoader) override;
};
} // namespace mongo
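
Fixtures deriving from ShardServerTestFixture can swap in a different loader by overriding the new hook; a hypothetical sketch:

    // Hypothetical fixture that bypasses shard persistence by handing the
    // CatalogCache the config server loader directly.
    class NoPersistenceShardTestFixture : public ShardServerTestFixture {
    protected:
        std::unique_ptr<CatalogCacheLoader> makeCatalogCacheLoader() override {
            return stdx::make_unique<ConfigServerCatalogCacheLoader>();
        }
    };
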
diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp
index 9220176f29e..570440f817b 100644
--- a/src/mongo/s/sharding_initialization.cpp
+++ b/src/mongo/s/sharding_initialization.cpp
@@ -160,6 +160,7 @@ Status initializeGlobalShardingState(OperationContext* opCtx,
const ConnectionString& configCS,
StringData distLockProcessId,
std::unique_ptr<ShardFactory> shardFactory,
+ std::unique_ptr<CatalogCache> catalogCache,
rpc::ShardingEgressMetadataHookBuilder hookBuilder,
ShardingCatalogManagerBuilder catalogManagerBuilder) {
if (configCS.type() == ConnectionString::INVALID) {
@@ -202,7 +203,7 @@ Status initializeGlobalShardingState(OperationContext* opCtx,
grid.init(
std::move(catalogClient),
std::move(catalogManager),
- stdx::make_unique<CatalogCache>(),
+ std::move(catalogCache),
std::move(shardRegistry),
stdx::make_unique<ClusterCursorManager>(getGlobalServiceContext()->getPreciseClockSource()),
stdx::make_unique<BalancerConfiguration>(),
diff --git a/src/mongo/s/sharding_initialization.h b/src/mongo/s/sharding_initialization.h
index f2e3d12db8b..6d4629c7994 100644
--- a/src/mongo/s/sharding_initialization.h
+++ b/src/mongo/s/sharding_initialization.h
@@ -41,12 +41,14 @@ namespace executor {
class TaskExecutor;
} // namespace executor
+class CatalogCache;
class ConnectionString;
class OperationContext;
class ShardFactory;
class Status;
class ShardingCatalogClient;
class ShardingCatalogManager;
+
using ShardingCatalogManagerBuilder = stdx::function<std::unique_ptr<ShardingCatalogManager>(
ShardingCatalogClient*, std::unique_ptr<executor::TaskExecutor>)>;
@@ -73,6 +75,7 @@ Status initializeGlobalShardingState(OperationContext* opCtx,
const ConnectionString& configCS,
StringData distLockProcessId,
std::unique_ptr<ShardFactory> shardFactory,
+ std::unique_ptr<CatalogCache> catalogCache,
rpc::ShardingEgressMetadataHookBuilder hookBuilder,
ShardingCatalogManagerBuilder catalogManagerBuilder);
diff --git a/src/mongo/s/sharding_mongod_test_fixture.cpp b/src/mongo/s/sharding_mongod_test_fixture.cpp
index 8685b7ebc6d..5c4369f5fa3 100644
--- a/src/mongo/s/sharding_mongod_test_fixture.cpp
+++ b/src/mongo/s/sharding_mongod_test_fixture.cpp
@@ -65,6 +65,7 @@
#include "mongo/s/catalog/type_collection.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/catalog_cache.h"
+#include "mongo/s/catalog_cache_loader.h"
#include "mongo/s/client/shard_factory.h"
#include "mongo/s/client/shard_local.h"
#include "mongo/s/client/shard_registry.h"
@@ -235,7 +236,12 @@ std::unique_ptr<ShardingCatalogManager> ShardingMongodTestFixture::makeShardingC
return nullptr;
}
-std::unique_ptr<CatalogCache> ShardingMongodTestFixture::makeCatalogCache() {
+std::unique_ptr<CatalogCacheLoader> ShardingMongodTestFixture::makeCatalogCacheLoader() {
+ return nullptr;
+}
+
+std::unique_ptr<CatalogCache> ShardingMongodTestFixture::makeCatalogCache(
+ std::unique_ptr<CatalogCacheLoader> catalogCacheLoader) {
return nullptr;
}
@@ -269,7 +275,9 @@ Status ShardingMongodTestFixture::initializeGlobalShardingStateForMongodForTest(
auto catalogClientPtr = makeShardingCatalogClient(std::move(distLockManagerPtr));
auto catalogManagerPtr = makeShardingCatalogManager(catalogClientPtr.get());
- auto catalogCachePtr = makeCatalogCache();
+
+ auto catalogCacheLoaderPtr = makeCatalogCacheLoader();
+ auto catalogCachePtr = makeCatalogCache(std::move(catalogCacheLoaderPtr));
auto clusterCursorManagerPtr = makeClusterCursorManager();
diff --git a/src/mongo/s/sharding_mongod_test_fixture.h b/src/mongo/s/sharding_mongod_test_fixture.h
index 62435c97e79..fca43abd45d 100644
--- a/src/mongo/s/sharding_mongod_test_fixture.h
+++ b/src/mongo/s/sharding_mongod_test_fixture.h
@@ -40,6 +40,7 @@ namespace mongo {
class BalancerConfiguration;
class CatalogCache;
+class CatalogCacheLoader;
class ConnectionString;
class ClusterCursorManager;
class DistLockCatalog;
@@ -229,7 +230,13 @@ protected:
/**
* Base class returns nullptr.
*/
- virtual std::unique_ptr<CatalogCache> makeCatalogCache();
+ virtual std::unique_ptr<CatalogCacheLoader> makeCatalogCacheLoader();
+
+ /**
+ * Base class returns nullptr.
+ */
+ virtual std::unique_ptr<CatalogCache> makeCatalogCache(
+ std::unique_ptr<CatalogCacheLoader> catalogCacheLoader);
/**
* Base class returns nullptr.