24 files changed, 1050 insertions, 87 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 9f753761487..e99e43414b8 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -863,7 +863,7 @@ env.Library( "run_commands", "rw_concern_d", "s/commands", - "s/metadata", + "s/collection_metadata", "s/sharding", "service_context_d", "startup_warnings_mongod", diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 840991cc59a..beeb2957f33 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -5,17 +5,16 @@ Import("env") env = env.Clone() env.Library( - target='metadata', + target='collection_metadata', source=[ 'collection_metadata.cpp', ], LIBDEPS=[ - 'shard_metadata_util', '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/common', '$BUILD_DIR/mongo/db/range_arithmetic', - '$BUILD_DIR/mongo/db/repl/repl_coordinator_impl', '$BUILD_DIR/mongo/db/service_context', + '$BUILD_DIR/mongo/s/common', ], ) @@ -85,6 +84,7 @@ env.Library( 'move_timing_helper.cpp', 'operation_sharding_state.cpp', 'shard_identity_rollback_notifier.cpp', + 'shard_server_catalog_cache_loader.cpp', 'sharded_connection_info.cpp', 'sharding_egress_metadata_hook_for_mongod.cpp', 'sharding_initialization_mongod.cpp', @@ -106,8 +106,9 @@ env.Library( '$BUILD_DIR/mongo/s/is_mongos', '$BUILD_DIR/mongo/s/sharding_initialization', '$BUILD_DIR/mongo/util/elapsed_tracker', - 'metadata', + 'collection_metadata', 'migration_types', + 'shard_metadata_util', 'sharding_task_executor', 'type_shard_identity', #'$BUILD_DIR/mongo/db/dbhelpers', # CYCLE @@ -198,7 +199,7 @@ env.Library( '$BUILD_DIR/mongo/db/index_d', '$BUILD_DIR/mongo/db/repl/repl_coordinator_global', 'balancer', - 'metadata', + 'collection_metadata', 'sharding', ], ) @@ -227,7 +228,6 @@ env.CppUnitTest( env.CppUnitTest( target='shard_test', source=[ - 'shard_metadata_util_test.cpp', 'active_migrations_registry_test.cpp', 'migration_chunk_cloner_source_legacy_test.cpp', 'sharding_state_test.cpp', @@ -265,3 +265,13 @@ env.CppUnitTest( ], ) +env.CppUnitTest( + target='shard_metadata_util_test', + source=[ + 'shard_metadata_util_test.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/s/shard_server_test_fixture', + 'shard_metadata_util', + ], +) diff --git a/src/mongo/db/s/shard_metadata_util.cpp b/src/mongo/db/s/shard_metadata_util.cpp index 45c34444de6..c0efea7fc4e 100644 --- a/src/mongo/db/s/shard_metadata_util.cpp +++ b/src/mongo/db/s/shard_metadata_util.cpp @@ -50,36 +50,14 @@ const WriteConcernOptions kLocalWriteConcern(1, WriteConcernOptions::SyncMode::UNSET, Milliseconds(0)); -/** - * Structure representing the generated query and sort order for a chunk diffing operation. - */ -struct QueryAndSort { - const BSONObj query; - const BSONObj sort; -}; +} // namespace -/** - * Returns the query needed to find incremental changes to the chunks collection on a shard server. - * - * The query has to find all the chunks $gte the current max version. Currently, any splits, merges - * and moves will increment the current max version. Querying by lastmod is essential because we - * want to use the {lastmod} index on the chunks collection. This makes potential cursor yields to - * apply split/merge/move updates safe: updates always move or insert documents at the end of the - * index (because the document updates always have higher lastmod), so changed always come *after* - * our current cursor position and are seen when the cursor recommences. - * - * The sort must be by ascending version so that the updates can be applied in-memory in order. 
This - * is important because it is possible for a cursor to read updates to the same _id document twice, - * due to the yield described above. If updates are applied in ascending version order, the later - * update is applied last and remains. - */ QueryAndSort createShardChunkDiffQuery(const ChunkVersion& collectionVersion) { - return {BSON(ChunkType::DEPRECATED_lastmod() << GTE << Timestamp(collectionVersion.toLong())), + return {BSON(ChunkType::DEPRECATED_lastmod() + << BSON("$gte" << Timestamp(collectionVersion.toLong()))), BSON(ChunkType::DEPRECATED_lastmod() << 1)}; } -} // namespace - bool RefreshState::operator==(RefreshState& other) { return (other.epoch == epoch) && (other.refreshing == refreshing) && (other.sequenceNumber == sequenceNumber); @@ -123,8 +101,9 @@ StatusWith<RefreshState> getPersistedRefreshFlags(OperationContext* opCtx, StatusWith<ShardCollectionType> readShardCollectionsEntry(OperationContext* opCtx, const NamespaceString& nss) { + Query fullQuery(BSON(ShardCollectionType::uuid() << nss.ns())); - fullQuery.readPref(ReadPreference::SecondaryOnly, BSONArray()); + try { DBDirectClient client(opCtx); std::unique_ptr<DBClientCursor> cursor = @@ -214,18 +193,20 @@ Status updateShardCollectionsEntry(OperationContext* opCtx, StatusWith<std::vector<ChunkType>> readShardChunks(OperationContext* opCtx, const NamespaceString& nss, - const ChunkVersion& collectionVersion) { + const BSONObj& query, + const BSONObj& sort, + boost::optional<long long> limit, + const OID& epoch) { // Query to retrieve the chunks. - QueryAndSort diffQuery = createShardChunkDiffQuery(collectionVersion); - Query fullQuery(diffQuery.query); - fullQuery.sort(diffQuery.sort); - fullQuery.readPref(ReadPreference::SecondaryOnly, BSONArray()); + Query fullQuery(query); + fullQuery.sort(sort); try { DBDirectClient client(opCtx); std::string chunkMetadataNs = ChunkType::ShardNSPrefix + nss.ns(); - std::unique_ptr<DBClientCursor> cursor = client.query(chunkMetadataNs, fullQuery, 0LL); + std::unique_ptr<DBClientCursor> cursor = + client.query(chunkMetadataNs, fullQuery, limit.get_value_or(0)); if (!cursor) { return {ErrorCodes::OperationFailed, @@ -236,7 +217,7 @@ StatusWith<std::vector<ChunkType>> readShardChunks(OperationContext* opCtx, std::vector<ChunkType> chunks; while (cursor->more()) { BSONObj document = cursor->nextSafe().getOwned(); - auto statusWithChunk = ChunkType::fromShardBSON(document, collectionVersion.epoch()); + auto statusWithChunk = ChunkType::fromShardBSON(document, epoch); if (!statusWithChunk.isOK()) { return {statusWithChunk.getStatus().code(), str::stream() << "Failed to parse chunk '" << document.toString() @@ -288,13 +269,6 @@ Status updateShardChunks(OperationContext* opCtx, for (auto& chunk : chunks) { // Check for a different epoch. if (!chunk.getVersion().hasEqualEpoch(currEpoch)) { - // This means the collection was dropped and recreated. Drop the chunk metadata - // and return. 
-            Status status = dropChunksAndDeleteCollectionsEntry(opCtx, nss);
-            if (!status.isOK()) {
-                return status;
-            }
-
             return Status{ErrorCodes::ConflictingOperationInProgress,
                           str::stream() << "Invalid chunks found when reloading '"
                                         << nss.toString()
diff --git a/src/mongo/db/s/shard_metadata_util.h b/src/mongo/db/s/shard_metadata_util.h
index 3c5a684ff80..c29029df094 100644
--- a/src/mongo/db/s/shard_metadata_util.h
+++ b/src/mongo/db/s/shard_metadata_util.h
@@ -32,11 +32,11 @@
 #include <vector>
 
 #include "mongo/base/status.h"
+#include "mongo/bson/bsonobj.h"
 #include "mongo/bson/oid.h"
 
 namespace mongo {
 
-class BSONObj;
 struct ChunkVersion;
 class ChunkType;
 class CollectionMetadata;
@@ -52,6 +52,14 @@ class StatusWith;
 namespace shardmetadatautil {
 
 /**
+ * Structure representing the generated query and sort order for a chunk diffing operation.
+ */
+struct QueryAndSort {
+    const BSONObj query;
+    const BSONObj sort;
+};
+
+/**
  * Subset of the shard's collections collection related to refresh state.
  */
 struct RefreshState {
@@ -63,6 +71,23 @@ struct RefreshState {
 };
 
 /**
+ * Returns the query needed to find incremental changes to the chunks collection on a shard server.
+ *
+ * The query has to find all the chunks $gte the current max version. Currently, any splits, merges
+ * and moves will increment the current max version. Querying by lastmod is essential because we
+ * want to use the {lastmod} index on the chunks collection. This makes potential cursor yields to
+ * apply split/merge/move updates safe: updates always move or insert documents at the end of the
+ * index (because the document updates always have higher lastmod), so changes always come *after*
+ * our current cursor position and are seen when the cursor recommences.
+ *
+ * The sort must be by ascending version so that the updates can be applied in-memory in order. This
+ * is important because it is possible for a cursor to read updates to the same _id document twice,
+ * due to the yield described above. If updates are applied in ascending version order, the newer
+ * update is applied last.
+ */
+QueryAndSort createShardChunkDiffQuery(const ChunkVersion& collectionVersion);
+
+/**
 * Writes a persisted signal to indicate that the chunks collection is being updated. It is
 * essential to call this before updating the chunks collection for 'nss' so that secondaries do not
 * use incomplete metadata.
@@ -118,18 +143,22 @@ Status updateShardCollectionsEntry(OperationContext* opCtx,
                                    const bool upsert);
 
 /**
- * Reads the shard server's chunks collection corresponding to 'nss' for chunks with lastmod $gte
- * 'collectionVersion'.
+ * Reads the shard server's chunks collection corresponding to 'nss' for chunks matching 'query',
+ * returning at most 'limit' chunks in 'sort' order. 'epoch' populates the returned chunks' version
+ * fields, because we do not yet have UUIDs to replace epochs nor UUIDs associated with namespaces.
 */
StatusWith<std::vector<ChunkType>> readShardChunks(OperationContext* opCtx,
                                                    const NamespaceString& nss,
-                                                   const ChunkVersion& collectionVersion);
+                                                   const BSONObj& query,
+                                                   const BSONObj& sort,
+                                                   boost::optional<long long> limit,
+                                                   const OID& epoch);
 
 /**
 * Takes a vector of 'chunks' and updates the shard's chunks collection for 'nss'. Any chunk
 * documents in config.chunks.ns that overlap with a chunk in 'chunks' are removed as the updated
- * chunk document is inserted.
If the epoch of any chunk in 'chunks' does not match 'currEpoch', - * the chunk metadata is dropped and a ConflictingOperationInProgress error is returned. + * chunk document is inserted. If the epoch of a chunk in 'chunks' does not match 'currEpoch', + * a ConflictingOperationInProgress error is returned and no more updates are applied. * * Note: two threads running this function in parallel for the same collection can corrupt the * collection data! diff --git a/src/mongo/db/s/shard_metadata_util_test.cpp b/src/mongo/db/s/shard_metadata_util_test.cpp index f21b3109a64..cae6de39283 100644 --- a/src/mongo/db/s/shard_metadata_util_test.cpp +++ b/src/mongo/db/s/shard_metadata_util_test.cpp @@ -248,8 +248,14 @@ TEST_F(ShardMetadataUtilTest, WriteAndReadChunks) { checkChunks(kChunkMetadataNss, chunks); // read all the chunks - std::vector<ChunkType> readChunks = assertGet(readShardChunks( - operationContext(), kNss, ChunkVersion(0, 0, getCollectionVersion().epoch()))); + QueryAndSort allChunkDiff = + createShardChunkDiffQuery(ChunkVersion(0, 0, getCollectionVersion().epoch())); + std::vector<ChunkType> readChunks = assertGet(readShardChunks(operationContext(), + kNss, + allChunkDiff.query, + allChunkDiff.sort, + boost::none, + getCollectionVersion().epoch())); for (auto chunkIt = chunks.begin(), readChunkIt = readChunks.begin(); chunkIt != chunks.end() && readChunkIt != readChunks.end(); ++chunkIt, ++readChunkIt) { @@ -257,29 +263,32 @@ TEST_F(ShardMetadataUtilTest, WriteAndReadChunks) { } // read only the highest version chunk - readChunks = assertGet(readShardChunks(operationContext(), kNss, getCollectionVersion())); + QueryAndSort oneChunkDiff = createShardChunkDiffQuery(getCollectionVersion()); + readChunks = assertGet(readShardChunks(operationContext(), + kNss, + oneChunkDiff.query, + oneChunkDiff.sort, + boost::none, + getCollectionVersion().epoch())); ASSERT_TRUE(readChunks.size() == 1); ASSERT_BSONOBJ_EQ(chunks.back().toShardBSON(), readChunks.front().toShardBSON()); } -TEST_F(ShardMetadataUtilTest, UpdatingChunksFindsNewEpochAndClearsMetadata) { - // Set up a collections document so we can make sure it's deleted correctly. - setUpCollection(); - +TEST_F(ShardMetadataUtilTest, UpdatingChunksFindsNewEpoch) { std::vector<ChunkType> chunks = makeFourChunks(); ASSERT_OK(updateShardChunks(operationContext(), kNss, chunks, getCollectionVersion().epoch())); checkChunks(kChunkMetadataNss, chunks); + ChunkVersion originalChunkVersion = chunks.back().getVersion(); chunks.back().setVersion(ChunkVersion(1, 0, OID::gen())); ASSERT_EQUALS( updateShardChunks(operationContext(), kNss, chunks, getCollectionVersion().epoch()).code(), ErrorCodes::ConflictingOperationInProgress); - // Finding a new epoch should have caused the metadata to be cleared for that namespace. - checkCollectionIsEmpty(kChunkMetadataNss); - // Collections collection should be empty because it only had one entry. - checkCollectionIsEmpty(NamespaceString(ShardCollectionType::ConfigNS)); + // Check that the chunk with a different epoch did not get written. + chunks.back().setVersion(std::move(originalChunkVersion)); + checkChunks(kChunkMetadataNss, chunks); } TEST_F(ShardMetadataUtilTest, UpdateWithWriteNewChunks) { diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp new file mode 100644 index 00000000000..368eb23abec --- /dev/null +++ b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp @@ -0,0 +1,630 @@ +/** + * Copyright (C) 2017 MongoDB Inc. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include "mongo/db/s/shard_server_catalog_cache_loader.h" + +#include "mongo/db/client.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/s/shard_metadata_util.h" +#include "mongo/s/catalog/type_shard_collection.h" +#include "mongo/s/config_server_catalog_cache_loader.h" +#include "mongo/stdx/memory.h" +#include "mongo/util/concurrency/thread_pool.h" +#include "mongo/util/log.h" + +namespace mongo { + +using CollectionAndChangedChunks = CatalogCacheLoader::CollectionAndChangedChunks; +using namespace shardmetadatautil; + +namespace { + +/** + * Constructs the options for the loader thread pool. + */ +ThreadPool::Options makeDefaultThreadPoolOptions() { + ThreadPool::Options options; + options.poolName = "ShardServerCatalogCacheLoader"; + options.minThreads = 0; + options.maxThreads = 6; + + // Ensure all threads have a client. + options.onCreateThread = [](const std::string& threadName) { + Client::initThread(threadName.c_str()); + }; + + return options; +} + +/** + * Takes a CollectionAndChangedChunks object and persists the changes to the shard's metadata + * collections. + * + * Returns ConflictingOperationInProgress if a chunk is found with a new epoch. + */ +Status persistCollectionAndChangedChunks(OperationContext* opCtx, + const NamespaceString& nss, + const CollectionAndChangedChunks& collAndChunks) { + // Update the collections collection entry for 'nss' in case there are any new updates. + ShardCollectionType update = ShardCollectionType(nss, + nss, + collAndChunks.epoch, + collAndChunks.shardKeyPattern, + collAndChunks.defaultCollation, + collAndChunks.shardKeyIsUnique); + Status status = updateShardCollectionsEntry(opCtx, + BSON(ShardCollectionType::uuid() << nss.ns()), + update.toBSON(), + BSONObj(), + true /*upsert*/); + if (!status.isOK()) { + return status; + } + + // Mark the chunk metadata as refreshing, so that secondaries are aware of refresh. 
+    status = setPersistedRefreshFlags(opCtx, nss);
+    if (!status.isOK()) {
+        return status;
+    }
+
+    // Update the chunks.
+    status = updateShardChunks(opCtx, nss, collAndChunks.changedChunks, collAndChunks.epoch);
+    if (!status.isOK()) {
+        return status;
+    }
+
+    // Mark the chunk metadata as done refreshing.
+    status = unsetPersistedRefreshFlags(opCtx, nss);
+    if (!status.isOK()) {
+        return status;
+    }
+
+    return Status::OK();
+}
+
+/**
+ * Retrieves the persisted max chunk version for 'nss', if there are any persisted chunks. If there
+ * are none -- meaning there's no persisted metadata for 'nss' -- returns a
+ * ChunkVersion::UNSHARDED() version.
+ *
+ * It is unsafe to call this when a task for 'nss' is running concurrently.
+ */
+ChunkVersion getPersistedMaxVersion(OperationContext* opCtx, const NamespaceString& nss) {
+    // Must read the collections entry to get the epoch to pass into ChunkType for shard's chunk
+    // collection.
+    auto statusWithCollection = readShardCollectionsEntry(opCtx, nss);
+    if (statusWithCollection == ErrorCodes::NamespaceNotFound) {
+        // There is no persisted metadata.
+        return ChunkVersion::UNSHARDED();
+    }
+    uassert(ErrorCodes::OperationFailed,
+            str::stream() << "Failed to read persisted collections entry for collection '"
+                          << nss.ns()
+                          << "' due to '"
+                          << statusWithCollection.getStatus().toString()
+                          << "'.",
+            statusWithCollection.isOK());
+
+    auto statusWithChunk =
+        shardmetadatautil::readShardChunks(opCtx,
+                                           nss,
+                                           BSONObj(),
+                                           BSON(ChunkType::DEPRECATED_lastmod() << -1),
+                                           1LL,
+                                           statusWithCollection.getValue().getEpoch());
+    uassert(ErrorCodes::OperationFailed,
+            str::stream() << "Failed to read highest version persisted chunk for collection '"
+                          << nss.ns()
+                          << "' due to '"
+                          << statusWithChunk.getStatus().toString()
+                          << "'.",
+            statusWithChunk.isOK());
+
+    return statusWithChunk.getValue().empty() ? ChunkVersion::UNSHARDED()
+                                              : statusWithChunk.getValue().front().getVersion();
+}
+
+/**
+ * Tries to find persisted chunk metadata with chunk versions GTE to 'version'. Should always
+ * return metadata if the collection exists.
+ *
+ * If 'version's epoch matches persisted metadata, returns GTE persisted metadata.
+ * If 'version's epoch doesn't match persisted metadata, returns all persisted metadata.
+ * If there is no persisted metadata, returns an empty CollectionAndChangedChunks object.
+ */
+StatusWith<CollectionAndChangedChunks> getPersistedMetadataSinceVersion(OperationContext* opCtx,
+                                                                        const NamespaceString& nss,
+                                                                        ChunkVersion version) {
+    auto swShardCollectionEntry = readShardCollectionsEntry(opCtx, nss);
+    if (swShardCollectionEntry == ErrorCodes::NamespaceNotFound) {
+        // If there is no metadata, collection does not exist. Return empty results.
+        return CollectionAndChangedChunks();
+    } else if (!swShardCollectionEntry.isOK()) {
+        return StatusWith<CollectionAndChangedChunks>(
+            ErrorCodes::OperationFailed,
+            str::stream() << "Failed to load local collection metadata due to '"
+                          << swShardCollectionEntry.getStatus().toString()
+                          << "'.");
+    }
+    auto shardCollectionEntry = std::move(swShardCollectionEntry.getValue());
+
+    // If the persisted epoch doesn't match what the CatalogCache requested, read everything.
+ ChunkVersion startingVersion; + if (shardCollectionEntry.getEpoch() != version.epoch()) { + startingVersion = ChunkVersion(0, 0, shardCollectionEntry.getEpoch()); + } else { + startingVersion = version; + } + + QueryAndSort diff = createShardChunkDiffQuery(startingVersion); + + auto swChangedChunks = + readShardChunks(opCtx, nss, diff.query, diff.sort, boost::none, startingVersion.epoch()); + if (!swChangedChunks.isOK()) { + return StatusWith<CollectionAndChangedChunks>( + ErrorCodes::OperationFailed, + str::stream() << "Failed to load local collection metadata due to '" + << swChangedChunks.getStatus().toString() + << "'."); + } else if (swChangedChunks.getValue().empty()) { + // No chunks were found, collection was dropped. Return empty results. + return CollectionAndChangedChunks(); + } + + // Make sure the collections entry epoch has not changed. Otherwise an epoch changing update was + // applied after we originally read the entry and the chunks may not match the original epoch. + + auto swAfterShardCollectionEntry = readShardCollectionsEntry(opCtx, nss); + if (swAfterShardCollectionEntry == ErrorCodes::NamespaceNotFound) { + // The collection has been dropped since we began loading, return empty results. + return CollectionAndChangedChunks(); + } else if (!swAfterShardCollectionEntry.isOK()) { + return StatusWith<CollectionAndChangedChunks>( + ErrorCodes::OperationFailed, + str::stream() << "Failed to reload local collection metadata due to '" + << swAfterShardCollectionEntry.getStatus().toString() + << "'."); + } + + if (shardCollectionEntry.getEpoch() != swAfterShardCollectionEntry.getValue().getEpoch()) { + // The collection was dropped and recreated since we began. Return empty results. + return CollectionAndChangedChunks(); + } + + return CollectionAndChangedChunks{shardCollectionEntry.getEpoch(), + shardCollectionEntry.getKeyPattern().toBSON(), + shardCollectionEntry.getDefaultCollation(), + shardCollectionEntry.getUnique(), + std::move(swChangedChunks.getValue())}; +} + +} // namespace + +ShardServerCatalogCacheLoader::ShardServerCatalogCacheLoader( + std::unique_ptr<CatalogCacheLoader> configLoader) + : _configServerLoader(std::move(configLoader)), _threadPool(makeDefaultThreadPoolOptions()) { + _threadPool.startup(); +} + +ShardServerCatalogCacheLoader::~ShardServerCatalogCacheLoader() { + _threadPool.shutdown(); + _threadPool.join(); +} + + +std::shared_ptr<Notification<void>> ShardServerCatalogCacheLoader::getChunksSince( + const NamespaceString& nss, + ChunkVersion version, + stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) { + + // TODO: plug in secondary machinery, with onStepDown and onBecomePrimary tasks: clear TaskLists + // and thread pool + + auto notify = std::make_shared<Notification<void>>(); + + uassertStatusOK(_threadPool.schedule([ this, nss, version, callbackFn, notify ]() noexcept { + auto opCtx = Client::getCurrent()->makeOperationContext(); + try { + _schedulePrimayGetChunksSince(opCtx.get(), nss, version, callbackFn, notify); + } catch (const DBException& ex) { + callbackFn(opCtx.get(), ex.toStatus()); + notify->set(); + } + })); + + return notify; +} + +void ShardServerCatalogCacheLoader::_schedulePrimayGetChunksSince( + OperationContext* opCtx, + const NamespaceString& nss, + const ChunkVersion& catalogCacheSinceVersion, + stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn, + std::shared_ptr<Notification<void>> notify) { + + // Get the max version the loader has. 
+ const ChunkVersion maxLoaderVersion = [&] { + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + auto taskListIt = _taskLists.find(nss); + + if (taskListIt != _taskLists.end()) { + // Enqueued tasks have the latest metadata + return taskListIt->second.getHighestVersionEnqueued(); + } + } + + // If there are no enqueued tasks, get the max persisted + return getPersistedMaxVersion(opCtx, nss); + }(); + + auto remoteRefreshCallbackFn = + [this, nss, catalogCacheSinceVersion, maxLoaderVersion, notify, callbackFn]( + OperationContext* opCtx, + StatusWith<CollectionAndChangedChunks> swCollectionAndChangedChunks) { + + if (!swCollectionAndChangedChunks.isOK() && + swCollectionAndChangedChunks != ErrorCodes::NamespaceNotFound) { + // No updates to apply. Do nothing. + } else { + // Enqueue a Task to apply the update retrieved from the config server. + Status scheduleStatus = + _scheduleTask(nss, Task{swCollectionAndChangedChunks, maxLoaderVersion}); + if (!scheduleStatus.isOK()) { + callbackFn(opCtx, StatusWith<CollectionAndChangedChunks>(scheduleStatus)); + notify->set(); + return; + } + + if (swCollectionAndChangedChunks.isOK()) { + // Create a response for the CatalogCache from the loader's metadata + // -- both persisted and enqueued. + + swCollectionAndChangedChunks = + _getLoaderMetadata(opCtx, nss, catalogCacheSinceVersion); + // If no results were returned, convert the response into + // NamespaceNotFound. + if (swCollectionAndChangedChunks.isOK() && + swCollectionAndChangedChunks.getValue().changedChunks.empty()) { + swCollectionAndChangedChunks = + Status(ErrorCodes::NamespaceNotFound, "collection was dropped"); + } + } + } + + // Complete the callbackFn work. + callbackFn(opCtx, std::move(swCollectionAndChangedChunks)); + notify->set(); + }; + + // Refresh the loader's metadata from the config server. The caller's request will + // then be serviced from the loader's up-to-date metadata. + _configServerLoader->getChunksSince(nss, maxLoaderVersion, remoteRefreshCallbackFn); +} + +StatusWith<CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_getLoaderMetadata( + OperationContext* opCtx, + const NamespaceString& nss, + const ChunkVersion& catalogCacheSinceVersion) { + + // Get the enqueued metadata first. Otherwise we could miss data between reading persisted and + // enqueued, if an enqueued task finished after the persisted read but before the enqueued read. + + auto enqueuedRes = _getEnqueuedMetadata(nss, catalogCacheSinceVersion); + bool isEnqueued = std::move(enqueuedRes.first); + CollectionAndChangedChunks enqueued = std::move(enqueuedRes.second); + + auto swPersisted = getPersistedMetadataSinceVersion(opCtx, nss, catalogCacheSinceVersion); + if (!swPersisted.isOK()) { + return swPersisted; + } + CollectionAndChangedChunks persisted = std::move(swPersisted.getValue()); + + if (!isEnqueued) { + // There are no tasks in the queue. Return the persisted metadata. + return persisted; + } else if (enqueued.changedChunks.empty() || enqueued.epoch != persisted.epoch) { + // There is a task queue. Either: + // - nothing was returned, which means the last task enqueued is a drop task. + // - the epoch changed in the enqueued metadata, which means there's a drop operation + // enqueued somewhere. + // Either way, the persisted metadata is out-dated. Return enqueued results. + return enqueued; + } else if (persisted.changedChunks.empty()) { + // Nothing is persisted. Return enqueued results. 
+            return enqueued;
+        } else {
+            // There can be overlap between persisted and enqueued metadata because enqueued work
+            // can be applied while persisted was read. We must remove this overlap.
+
+            const ChunkVersion minEnqueuedVersion = enqueued.changedChunks.front().getVersion();
+
+            // Remove chunks from 'persisted' that are GTE the minimum in 'enqueued' -- this is
+            // the overlap.
+            auto persistedChangedChunksIt = persisted.changedChunks.begin();
+            while (persistedChangedChunksIt != persisted.changedChunks.end() &&
+                   persistedChangedChunksIt->getVersion() < minEnqueuedVersion) {
+                ++persistedChangedChunksIt;
+            }
+            persisted.changedChunks.erase(persistedChangedChunksIt, persisted.changedChunks.end());
+
+            // Append 'enqueued's chunks to 'persisted', which no longer overlaps.
+            persisted.changedChunks.insert(persisted.changedChunks.end(),
+                                           enqueued.changedChunks.begin(),
+                                           enqueued.changedChunks.end());
+
+            return persisted;
+        }
+}
+
+std::pair<bool, CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_getEnqueuedMetadata(
+    const NamespaceString& nss, const ChunkVersion& catalogCacheSinceVersion) {
+    stdx::unique_lock<stdx::mutex> lock(_mutex);
+    auto taskListIt = _taskLists.find(nss);
+
+    if (taskListIt == _taskLists.end()) {
+        return std::make_pair(false, CollectionAndChangedChunks());
+    }
+
+    CollectionAndChangedChunks collAndChunks = taskListIt->second.getEnqueuedMetadata();
+
+    // Returns all the results if 'catalogCacheSinceVersion's epoch does not match. Otherwise, trim
+    // the results to be GTE to 'catalogCacheSinceVersion'.
+
+    if (collAndChunks.epoch != catalogCacheSinceVersion.epoch()) {
+        return std::make_pair(true, collAndChunks);
+    }
+
+    auto changedChunksIt = collAndChunks.changedChunks.begin();
+    while (changedChunksIt != collAndChunks.changedChunks.end() &&
+           changedChunksIt->getVersion() < catalogCacheSinceVersion) {
+        ++changedChunksIt;
+    }
+    collAndChunks.changedChunks.erase(collAndChunks.changedChunks.begin(), changedChunksIt);
+
+    return std::make_pair(true, collAndChunks);
+}
+
+Status ShardServerCatalogCacheLoader::_scheduleTask(const NamespaceString& nss, Task task) {
+    stdx::lock_guard<stdx::mutex> lock(_mutex);
+
+    const bool wasEmpty = _taskLists[nss].empty();
+    _taskLists[nss].addTask(std::move(task));
+
+    if (wasEmpty) {
+        // '_runTasks' handles cleanup itself if it fails to reschedule follow-on work.
+        return _threadPool.schedule([this, nss]() { _runTasks(nss); });
+    }
+
+    return Status::OK();
+}
+
+void ShardServerCatalogCacheLoader::_runTasks(const NamespaceString& nss) {
+    auto opCtx = Client::getCurrent()->makeOperationContext();
+
+    // Run task
+    bool taskFinished = false;
+    try {
+        taskFinished = _updatePersistedMetadata(opCtx.get(), nss);
+    } catch (const DBException& ex) {
+        log() << redact(ex.toStatus());
+    }
+
+    stdx::lock_guard<stdx::mutex> lock(_mutex);
+
+    // If task completed successfully, remove it from work queue
+    if (taskFinished) {
+        invariant(!_taskLists[nss].empty());
+        _taskLists[nss].removeActiveTask();
+    }
+
+    // Schedule more work if there is any
+    if (!_taskLists[nss].empty()) {
+        Status status = _threadPool.schedule([this, nss]() { _runTasks(nss); });
+        if (!status.isOK()) {
+            log() << "CatalogCacheLoader failed to schedule more persisted metadata update"
+                  << " tasks for namespace '" << nss << "' due to '" << redact(status)
+                  << "'. Clearing task list so that scheduling will be attempted by the next"
+                  << " caller to refresh this namespace.";
+            _taskLists.erase(nss);  // '_mutex' is already held here.
+        }
+    } else {
+        _taskLists.erase(nss);
+    }
+}
+
+bool ShardServerCatalogCacheLoader::_updatePersistedMetadata(OperationContext* opCtx,
+                                                             const NamespaceString& nss) {
+    stdx::unique_lock<stdx::mutex> lock(_mutex);
+
+    invariant(!_taskLists[nss].empty());
+    const Task task = _taskLists[nss].getActiveTask();
+    invariant(task.dropped || !task.collectionAndChangedChunks->changedChunks.empty());
+
+    lock.unlock();
+
+    // Check if this is a drop task.
+
+    if (task.dropped) {
+        // The namespace was dropped. The persisted metadata for the collection must be cleared.
+        Status status = dropChunksAndDeleteCollectionsEntry(opCtx, nss);
+        uassert(ErrorCodes::OperationFailed,
+                str::stream() << "Failed to clear persisted chunk metadata for collection '"
+                              << nss.ns()
+                              << "' due to '"
+                              << status.toString()
+                              << "'. Will be retried.",
+                status.isOK());
+
+        LOG(1) << "Successfully cleared persisted chunk metadata for collection '" << nss << "'.";
+        return true;
+    }
+
+    // This is an update task.
+
+    ChunkVersion persistedMaxVersion = getPersistedMaxVersion(opCtx, nss);
+
+    // If the epoch of the update task does not match the persisted metadata, the persisted
+    // metadata -- from an old collection that was recreated -- must be cleared before applying
+    // the changes.
+    if (persistedMaxVersion.isSet() &&
+        persistedMaxVersion.epoch() != task.maxQueryVersion.epoch()) {
+        Status status = dropChunksAndDeleteCollectionsEntry(opCtx, nss);
+        uassert(ErrorCodes::OperationFailed,
+                str::stream() << "Failed to clear persisted chunk metadata for collection '"
+                              << nss.ns()
+                              << "' due to '"
+                              << status.toString()
+                              << "'. Will be retried.",
+                status.isOK());
+    }
+
+    Status status =
+        persistCollectionAndChangedChunks(opCtx, nss, task.collectionAndChangedChunks.get());
+    if (status == ErrorCodes::ConflictingOperationInProgress) {
+        // A new epoch was discovered in the new chunks. The CatalogCache will retry refreshing
+        // the chunk metadata: clearing the persisted metadata will be handled then.
+ return true; + } + uassert(ErrorCodes::OperationFailed, + str::stream() << "Failed to update the persisted chunk metadata for collection '" + << nss.ns() + << "' from '" + << task.minQueryVersion.toString() + << "' to '" + << task.maxQueryVersion.toString() + << "' due to '" + << status.toString() + << "'. Will be retried.", + status.isOK()); + + LOG(1) << "Successfully updated persisted chunk metadata for collection '" << nss << "' from '" + << task.minQueryVersion << "' to collection version '" << task.maxQueryVersion << "'."; + return true; +} + +ShardServerCatalogCacheLoader::Task::Task( + StatusWith<CollectionAndChangedChunks> statusWithCollectionAndChangedChunks, + ChunkVersion minimumQueryVersion) { + + minQueryVersion = minimumQueryVersion; + + if (statusWithCollectionAndChangedChunks.isOK()) { + collectionAndChangedChunks = statusWithCollectionAndChangedChunks.getValue(); + invariant(!collectionAndChangedChunks->changedChunks.empty()); + maxQueryVersion = collectionAndChangedChunks->changedChunks.back().getVersion(); + } else { + invariant(statusWithCollectionAndChangedChunks == ErrorCodes::NamespaceNotFound); + dropped = true; + maxQueryVersion = ChunkVersion::UNSHARDED(); + } +} + +void ShardServerCatalogCacheLoader::TaskList::addTask(Task task) { + if (_tasks.empty()) { + _tasks.emplace_back(std::move(task)); + return; + } + + if (task.dropped) { + invariant(_tasks.back().maxQueryVersion.equals(task.minQueryVersion)); + + Task front = std::move(_tasks.front()); + _tasks.clear(); + _tasks.emplace_back(std::move(front)); + + // No need to schedule a drop if one is already currently active. + if (!_tasks.front().dropped) { + _tasks.emplace_back(std::move(task)); + } + } else { + // Tasks must have contiguous versions, unless a complete reload occurs. + invariant(_tasks.back().maxQueryVersion.equals(task.minQueryVersion) || + !task.minQueryVersion.isSet()); + + _tasks.emplace_back(std::move(task)); + } +} + +const ShardServerCatalogCacheLoader::Task& ShardServerCatalogCacheLoader::TaskList::getActiveTask() + const { + invariant(!_tasks.empty()); + return _tasks.front(); +} + +void ShardServerCatalogCacheLoader::TaskList::removeActiveTask() { + invariant(!_tasks.empty()); + _tasks.pop_front(); +} + +ChunkVersion ShardServerCatalogCacheLoader::TaskList::getHighestVersionEnqueued() const { + invariant(!_tasks.empty()); + return _tasks.back().maxQueryVersion; +} + +CollectionAndChangedChunks ShardServerCatalogCacheLoader::TaskList::getEnqueuedMetadata() const { + CollectionAndChangedChunks collAndChunks; + for (const auto& task : _tasks) { + if (task.dropped) { + // A drop task should reset the metadata. + collAndChunks = CollectionAndChangedChunks(); + } else { + if (task.collectionAndChangedChunks->epoch != collAndChunks.epoch) { + // An epoch change should reset the metadata and start from the new. + collAndChunks = task.collectionAndChangedChunks.get(); + } else { + // Epochs match, so the new results should be appended. + // + // Note: it's okay if the new chunks change to a new version epoch in the middle of + // the chunks vector. This will be either reset by the next task with a total reload + // with a new epoch, or cause the original getChunksSince caller to throw out the + // results and refresh again. 
+                collAndChunks.changedChunks.insert(
+                    collAndChunks.changedChunks.end(),
+                    task.collectionAndChangedChunks->changedChunks.begin(),
+                    task.collectionAndChangedChunks->changedChunks.end());
+            }
+        }
+    }
+    return collAndChunks;
+}
+
+}  // namespace mongo
diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.h b/src/mongo/db/s/shard_server_catalog_cache_loader.h
new file mode 100644
index 00000000000..5be9eb51727
--- /dev/null
+++ b/src/mongo/db/s/shard_server_catalog_cache_loader.h
@@ -0,0 +1,245 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/s/catalog_cache_loader.h"
+#include "mongo/util/concurrency/thread_pool.h"
+
+namespace mongo {
+
+class ConfigServerCatalogCacheLoader;
+class ThreadPoolInterface;
+
+/**
+ * Shard implementation of the CatalogCacheLoader used by the CatalogCache. Retrieves chunk metadata
+ * for the CatalogCache on shards.
+ *
+ * If a shard primary, retrieves chunk metadata from the config server and maintains a persisted
+ * copy of that chunk metadata so shard secondaries can access the metadata. If a shard secondary,
+ * retrieves chunk metadata from the shard persisted chunk metadata.
+ */
+class ShardServerCatalogCacheLoader : public CatalogCacheLoader {
+public:
+    ShardServerCatalogCacheLoader(std::unique_ptr<CatalogCacheLoader> configLoader);
+    ~ShardServerCatalogCacheLoader();
+
+    /**
+     * This must be called serially, never in parallel, including waiting for the returned
+     * Notification to be signalled.
+     *
+     * This function is robust to unexpected version requests from the CatalogCache. Requesting
+     * versions with epochs that do not match anything on the config server will not affect or
+     * clear the locally persisted metadata. Requesting versions higher than anything previously
+     * requested, or versions lower than already requested, will not corrupt the locally persisted
+     * metadata, and will return what was requested if it exists.
+ */ + std::shared_ptr<Notification<void>> getChunksSince( + const NamespaceString& nss, + ChunkVersion version, + stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) + override; + +private: + /** + * This represents an update task for the persisted chunk metadata. The task will either be to + * apply a set up updated chunks to the shard persisted metadata store or to drop the persisted + * metadata for a specific collection. + */ + struct Task { + /** + * Initializes a task for either dropping or updating the persisted metadata for the + * associated collection. Which type of task is determined by the Status of + * 'statusWithCollectionAndChangedChunks', whether it is NamespaceNotFound or OK. + * + * Note: statusWithCollectionAndChangedChunks must always be NamespaceNotFound or + * OK, otherwise the constructor will invariant because there is no task to complete. + * + * 'collectionAndChangedChunks' is only initialized if 'dropped' is false. + * 'minimumQueryVersion' sets 'minQueryVersion'. + * 'maxQueryVersion' is either set to the highest chunk version in + * 'collectionAndChangedChunks' or ChunkVersion::UNSHARDED(). + */ + Task(StatusWith<CollectionAndChangedChunks> statusWithCollectionAndChangedChunks, + ChunkVersion minimumQueryVersion); + + // Chunks and Collection updates to be applied to the shard persisted metadata store. + boost::optional<CollectionAndChangedChunks> collectionAndChangedChunks{boost::none}; + + // The highest version that the loader had before going to the config server's metadata + // store for updated chunks. + // Used by the TaskList below to enforce consistent updates are applied. + ChunkVersion minQueryVersion; + + // Either the highest chunk version in 'collectionAndChangedChunks' or the same as + // 'minQueryVersion' if 'dropped' is true. + // Used by the TaskList below to enforce consistent updates are applied. + ChunkVersion maxQueryVersion; + + // Indicates whether the collection metadata must be cleared. + bool dropped{false}; + }; + + /** + * A list (work queue) of updates to apply to the shard persisted metadata store for a specific + * collection. Enforces that tasks that are added to the list are either consistent: + * + * tasks[i].minQueryVersion == tasks[i-1].maxQueryVersion. + * + * or applying a complete update from the minumum version, where + * + * minQueryVersion == ChunkVersion::UNSHARDED(). + */ + class TaskList { + public: + /** + * Adds 'task' to the back of the 'tasks' list. + * + * If 'task' is a drop task, clears 'tasks' except for the front active task, so that we + * don't waste time applying changes we will just delete. If the one remaining task in the + * list is already a drop task, the new one isn't added because it is redundant. + */ + void addTask(Task task); + + /** + * Returns the front of the 'tasks' list. Invariants if 'tasks' is empty. + */ + const Task& getActiveTask() const; + + /** + * Erases the current active task and updates 'activeTask' to the next task in 'tasks'. + */ + void removeActiveTask(); + + /** + * Checks whether there are any tasks left. + */ + const bool empty() { + return _tasks.empty(); + } + + /** + * Gets the last task's highest version -- this is the most up to date version. + */ + ChunkVersion getHighestVersionEnqueued() const; + + /** + * Iterates over the task list to retrieve the enqueued metadata. 
+ */ + CollectionAndChangedChunks getEnqueuedMetadata() const; + + private: + std::list<Task> _tasks{}; + }; + + typedef std::map<NamespaceString, TaskList> TaskLists; + + /** + * Refreshes chunk metadata from the config server's metadata store, and schedules maintenance + * of the shard's persisted metadata store with the latest updates retrieved from the config + * server. + * + * Then calls 'callbackFn' with metadata loaded from the shard persisted metadata store, and any + * in-memory task enqueued to update that store, GTE to 'catalogCacheSinceVersion' + * + * Only run on the shard primary. + */ + void _schedulePrimayGetChunksSince( + OperationContext* opCtx, + const NamespaceString& nss, + const ChunkVersion& catalogCacheSinceVersion, + stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn, + std::shared_ptr<Notification<void>> notify); + + + /** + * Loads chunk metadata from the shard persisted metadata store, and any in-memory task enqueued + * to update that store, GTE to 'catalogCacheSinceVersion'. + * + * Will return an empty CollectionAndChangedChunks object if no metadata is found (collection + * was dropped). + * + * Only run on the shard primary. + */ + StatusWith<CollectionAndChangedChunks> _getLoaderMetadata( + OperationContext* opCtx, + const NamespaceString& nss, + const ChunkVersion& catalogCacheSinceVersion); + + /** + * Loads chunk metadata from all in-memory tasks enqueued to update the shard persisted metadata + * store for collection 'nss' that is GTE 'catalogCacheSinceVersion'. If + * 'catalogCacheSinceVersion's epoch does not match that of the metadata enqueued, returns all + * metadata. + * + * The bool returned in the pair indicates whether there are any tasks enqueued. If none are, it + * is false. If it is true, and the CollectionAndChangedChunks returned is empty, this indicates + * a drop was enqueued and there is no metadata. + * + * Only run on the shard primary. + */ + std::pair<bool, CollectionAndChangedChunks> _getEnqueuedMetadata( + const NamespaceString& nss, const ChunkVersion& catalogCacheSinceVersion); + + /** + * Adds 'task' to the task list for 'nss'. If this creates a new task list, then '_runTasks' is + * started on another thread to execute the tasks. + * + * Only run on the shard primary. + */ + Status _scheduleTask(const NamespaceString& nss, Task task); + + /** + * Schedules tasks in the 'nss' task list to execute until the task list is depleted. + * + * Only run on the shard primary. + */ + void _runTasks(const NamespaceString& nss); + + /** + * Executes the task at the front of the task list for 'nss'. The task will either drop 'nss's + * metadata or apply a set of updates. + * + * Only run on the shard primary. + */ + bool _updatePersistedMetadata(OperationContext* opCtx, const NamespaceString& nss); + + // Used by the shard primary to retrieve chunk metadata from the config server. + const std::unique_ptr<CatalogCacheLoader> _configServerLoader; + + // Thread pool used to load chunk metadata. + ThreadPool _threadPool; + + // Protects the class state below. + stdx::mutex _mutex; + + // Map to track in progress persisted cache updates on the shard primary. 
+ TaskLists _taskLists; +}; + +} // namespace mongo diff --git a/src/mongo/db/s/sharding_initialization_mongod.cpp b/src/mongo/db/s/sharding_initialization_mongod.cpp index b329a325423..46dd675fc13 100644 --- a/src/mongo/db/s/sharding_initialization_mongod.cpp +++ b/src/mongo/db/s/sharding_initialization_mongod.cpp @@ -38,11 +38,13 @@ #include "mongo/client/remote_command_targeter_factory_impl.h" #include "mongo/db/logical_time_metadata_hook.h" #include "mongo/db/operation_context.h" +#include "mongo/db/s/shard_server_catalog_cache_loader.h" #include "mongo/db/s/sharding_egress_metadata_hook_for_mongod.h" #include "mongo/db/server_options.h" #include "mongo/executor/task_executor.h" #include "mongo/rpc/metadata/egress_metadata_hook_list.h" #include "mongo/s/catalog/sharding_catalog_manager_impl.h" +#include "mongo/s/catalog_cache.h" #include "mongo/s/client/shard_factory.h" #include "mongo/s/client/shard_local.h" #include "mongo/s/client/shard_remote.h" @@ -83,11 +85,18 @@ Status initializeGlobalShardingStateForMongod(OperationContext* opCtx, auto shardFactory = stdx::make_unique<ShardFactory>(std::move(buildersMap), std::move(targeterFactory)); + std::unique_ptr<CatalogCache> catalogCache = + (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) + ? stdx::make_unique<CatalogCache>() + : stdx::make_unique<CatalogCache>(stdx::make_unique<ShardServerCatalogCacheLoader>( + stdx::make_unique<ConfigServerCatalogCacheLoader>())); + return initializeGlobalShardingState( opCtx, configCS, distLockProcessId, std::move(shardFactory), + std::move(catalogCache), [opCtx] { auto hookList = stdx::make_unique<rpc::EgressMetadataHookList>(); hookList->addHook( diff --git a/src/mongo/s/catalog/type_chunk.cpp b/src/mongo/s/catalog/type_chunk.cpp index eb286b7a631..5d40cc2ecdf 100644 --- a/src/mongo/s/catalog/type_chunk.cpp +++ b/src/mongo/s/catalog/type_chunk.cpp @@ -343,10 +343,6 @@ std::string ChunkType::genID(StringData ns, const BSONObj& o) { } Status ChunkType::validate() const { - if (!_ns.is_initialized() || _ns->empty()) { - return Status(ErrorCodes::NoSuchKey, str::stream() << "missing " << ns.name() << " field"); - } - if (!_min.is_initialized() || _min->isEmpty()) { return Status(ErrorCodes::NoSuchKey, str::stream() << "missing " << min.name() << " field"); } diff --git a/src/mongo/s/catalog/type_chunk.h b/src/mongo/s/catalog/type_chunk.h index 96ee1588a6a..e80a7df3029 100644 --- a/src/mongo/s/catalog/type_chunk.h +++ b/src/mongo/s/catalog/type_chunk.h @@ -255,7 +255,7 @@ public: private: // Convention: (M)andatory, (O)ptional, (S)pecial; (C)onfig, (S)hard. - // (M)(C) collection this chunk is in + // (O)(C) collection this chunk is in boost::optional<std::string> _ns; // (M)(C)(S) first key of the range, inclusive boost::optional<BSONObj> _min; diff --git a/src/mongo/s/catalog/type_shard_collection.h b/src/mongo/s/catalog/type_shard_collection.h index 43118b4c0d5..c3637e2c5aa 100644 --- a/src/mongo/s/catalog/type_shard_collection.h +++ b/src/mongo/s/catalog/type_shard_collection.h @@ -79,6 +79,13 @@ public: static const BSONField<bool> refreshing; static const BSONField<long long> refreshSequenceNumber; + explicit ShardCollectionType(const NamespaceString& uuid, + const NamespaceString& nss, + const OID& epoch, + const KeyPattern& keyPattern, + const BSONObj& defaultCollation, + const bool& unique); + /** * Constructs a new ShardCollectionType object from BSON. Also does validation of the contents. 
*/ @@ -145,13 +152,6 @@ public: } private: - ShardCollectionType(const NamespaceString& uuid, - const NamespaceString& nss, - const OID& epoch, - const KeyPattern& keyPattern, - const BSONObj& defaultCollation, - const bool& unique); - // Will become the UUID when available. Currently a duplicate of '_nss'. NamespaceString _uuid; diff --git a/src/mongo/s/catalog_cache.cpp b/src/mongo/s/catalog_cache.cpp index 87f53ebadad..d81f222bfde 100644 --- a/src/mongo/s/catalog_cache.cpp +++ b/src/mongo/s/catalog_cache.cpp @@ -40,9 +40,7 @@ #include "mongo/s/catalog/type_collection.h" #include "mongo/s/catalog/type_database.h" #include "mongo/s/client/shard_registry.h" -#include "mongo/s/config_server_catalog_cache_loader.h" #include "mongo/s/grid.h" -#include "mongo/stdx/memory.h" #include "mongo/util/log.h" #include "mongo/util/timer.h" @@ -157,6 +155,9 @@ std::shared_ptr<ChunkManager> refreshCollectionRoutingInfo( CatalogCache::CatalogCache() : _cacheLoader(stdx::make_unique<ConfigServerCatalogCacheLoader>()) {} +CatalogCache::CatalogCache(std::unique_ptr<CatalogCacheLoader> cacheLoader) + : _cacheLoader(std::move(cacheLoader)) {} + CatalogCache::~CatalogCache() = default; StatusWith<CachedDatabaseInfo> CatalogCache::getDatabase(OperationContext* opCtx, diff --git a/src/mongo/s/catalog_cache.h b/src/mongo/s/catalog_cache.h index e13a2b0c557..fa81d0a7100 100644 --- a/src/mongo/s/catalog_cache.h +++ b/src/mongo/s/catalog_cache.h @@ -34,6 +34,8 @@ #include "mongo/s/chunk_manager.h" #include "mongo/s/chunk_version.h" #include "mongo/s/client/shard.h" +#include "mongo/s/config_server_catalog_cache_loader.h" +#include "mongo/stdx/memory.h" #include "mongo/stdx/mutex.h" #include "mongo/util/concurrency/notification.h" #include "mongo/util/string_map.h" @@ -55,7 +57,13 @@ class CatalogCache { MONGO_DISALLOW_COPYING(CatalogCache); public: + /** + * Defaults to instantiating a ConfigServerCatalogCacheLoader. + */ CatalogCache(); + + CatalogCache(std::unique_ptr<CatalogCacheLoader> cacheLoader); + ~CatalogCache(); /** diff --git a/src/mongo/s/catalog_cache_loader.h b/src/mongo/s/catalog_cache_loader.h index 6cdfc487513..21702da77e3 100644 --- a/src/mongo/s/catalog_cache_loader.h +++ b/src/mongo/s/catalog_cache_loader.h @@ -68,7 +68,7 @@ public: /** * Non-blocking call, which requests the chunks changed since the specified version to be - * fetched from the persistent matadata store and invokes the callback function with the result. + * fetched from the persistent metadata store and invokes the callback function with the result. * The callback function must never throw - it is a fatal error to do so. 
* * If for some reason the asynchronous fetch operation cannot be dispatched (for example on diff --git a/src/mongo/s/config_server_catalog_cache_loader.cpp b/src/mongo/s/config_server_catalog_cache_loader.cpp index b26df2d73ae..6b63846e592 100644 --- a/src/mongo/s/config_server_catalog_cache_loader.cpp +++ b/src/mongo/s/config_server_catalog_cache_loader.cpp @@ -47,7 +47,7 @@ namespace { */ ThreadPool::Options makeDefaultThreadPoolOptions() { ThreadPool::Options options; - options.poolName = "CatalogCacheLoader"; + options.poolName = "ConfigServerCatalogCacheLoader"; options.minThreads = 0; options.maxThreads = 6; diff --git a/src/mongo/s/config_server_test_fixture.cpp b/src/mongo/s/config_server_test_fixture.cpp index c5640333022..5bd8d4a9aa1 100644 --- a/src/mongo/s/config_server_test_fixture.cpp +++ b/src/mongo/s/config_server_test_fixture.cpp @@ -66,6 +66,7 @@ #include "mongo/s/client/shard_local.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/client/shard_remote.h" +#include "mongo/s/config_server_catalog_cache_loader.h" #include "mongo/s/grid.h" #include "mongo/s/query/cluster_cursor_manager.h" #include "mongo/s/set_shard_version_request.h" @@ -144,8 +145,14 @@ std::unique_ptr<ShardingCatalogManager> ConfigServerTestFixture::makeShardingCat return stdx::make_unique<ShardingCatalogManagerImpl>(std::move(specialExec)); } -std::unique_ptr<CatalogCache> ConfigServerTestFixture::makeCatalogCache() { - return stdx::make_unique<CatalogCache>(); +std::unique_ptr<CatalogCacheLoader> ConfigServerTestFixture::makeCatalogCacheLoader() { + return stdx::make_unique<ConfigServerCatalogCacheLoader>(); +} + +std::unique_ptr<CatalogCache> ConfigServerTestFixture::makeCatalogCache( + std::unique_ptr<CatalogCacheLoader> catalogCacheLoader) { + invariant(catalogCacheLoader); + return stdx::make_unique<CatalogCache>(std::move(catalogCacheLoader)); } std::unique_ptr<BalancerConfiguration> ConfigServerTestFixture::makeBalancerConfiguration() { diff --git a/src/mongo/s/config_server_test_fixture.h b/src/mongo/s/config_server_test_fixture.h index 69d08afe17d..9eeb0fa7398 100644 --- a/src/mongo/s/config_server_test_fixture.h +++ b/src/mongo/s/config_server_test_fixture.h @@ -132,7 +132,10 @@ protected: std::unique_ptr<ShardingCatalogManager> makeShardingCatalogManager( ShardingCatalogClient* catalogClient) override; - std::unique_ptr<CatalogCache> makeCatalogCache() override; + std::unique_ptr<CatalogCacheLoader> makeCatalogCacheLoader() override; + + std::unique_ptr<CatalogCache> makeCatalogCache( + std::unique_ptr<CatalogCacheLoader> catalogCacheLoader) override; std::unique_ptr<ClusterCursorManager> makeClusterCursorManager() override; diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp index a88d123e535..7e4c84bd01f 100644 --- a/src/mongo/s/server.cpp +++ b/src/mongo/s/server.cpp @@ -67,6 +67,7 @@ #include "mongo/s/balancer_configuration.h" #include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog/sharding_catalog_manager.h" +#include "mongo/s/catalog_cache.h" #include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_factory.h" #include "mongo/s/client/shard_registry.h" @@ -203,6 +204,7 @@ static Status initializeSharding(OperationContext* opCtx) { mongosGlobalParams.configdbs, generateDistLockProcessId(opCtx), std::move(shardFactory), + stdx::make_unique<CatalogCache>(), [opCtx]() { auto hookList = stdx::make_unique<rpc::EgressMetadataHookList>(); hookList->addHook( diff --git a/src/mongo/s/shard_server_test_fixture.cpp 
b/src/mongo/s/shard_server_test_fixture.cpp index 272d6a00641..2e0396248e7 100644 --- a/src/mongo/s/shard_server_test_fixture.cpp +++ b/src/mongo/s/shard_server_test_fixture.cpp @@ -33,9 +33,11 @@ #include "mongo/client/remote_command_targeter_mock.h" #include "mongo/db/commands.h" #include "mongo/db/repl/replication_coordinator_mock.h" +#include "mongo/db/s/shard_server_catalog_cache_loader.h" #include "mongo/s/catalog/dist_lock_catalog_mock.h" #include "mongo/s/catalog/dist_lock_manager_mock.h" #include "mongo/s/catalog/sharding_catalog_client_impl.h" +#include "mongo/s/catalog_cache.h" #include "mongo/stdx/memory.h" namespace mongo { @@ -106,4 +108,15 @@ std::unique_ptr<ShardingCatalogClient> ShardServerTestFixture::makeShardingCatal return stdx::make_unique<ShardingCatalogClientImpl>(std::move(distLockManager)); } +std::unique_ptr<CatalogCacheLoader> ShardServerTestFixture::makeCatalogCacheLoader() { + return stdx::make_unique<ShardServerCatalogCacheLoader>( + stdx::make_unique<ConfigServerCatalogCacheLoader>()); +} + +std::unique_ptr<CatalogCache> ShardServerTestFixture::makeCatalogCache( + std::unique_ptr<CatalogCacheLoader> catalogCacheLoader) { + invariant(catalogCacheLoader); + return stdx::make_unique<CatalogCache>(std::move(catalogCacheLoader)); +} + } // namespace mongo diff --git a/src/mongo/s/shard_server_test_fixture.h b/src/mongo/s/shard_server_test_fixture.h index e3e15a4d780..52e202536d3 100644 --- a/src/mongo/s/shard_server_test_fixture.h +++ b/src/mongo/s/shard_server_test_fixture.h @@ -83,6 +83,14 @@ protected: */ std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient( std::unique_ptr<DistLockManager> distLockManager) override; + + /** + * Creates a ShardServerCatalogCacheLoader. + */ + std::unique_ptr<CatalogCacheLoader> makeCatalogCacheLoader(); + + std::unique_ptr<CatalogCache> makeCatalogCache( + std::unique_ptr<CatalogCacheLoader> catalogCacheLoader); }; } // namespace mongo diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp index 9220176f29e..570440f817b 100644 --- a/src/mongo/s/sharding_initialization.cpp +++ b/src/mongo/s/sharding_initialization.cpp @@ -160,6 +160,7 @@ Status initializeGlobalShardingState(OperationContext* opCtx, const ConnectionString& configCS, StringData distLockProcessId, std::unique_ptr<ShardFactory> shardFactory, + std::unique_ptr<CatalogCache> catalogCache, rpc::ShardingEgressMetadataHookBuilder hookBuilder, ShardingCatalogManagerBuilder catalogManagerBuilder) { if (configCS.type() == ConnectionString::INVALID) { @@ -202,7 +203,7 @@ Status initializeGlobalShardingState(OperationContext* opCtx, grid.init( std::move(catalogClient), std::move(catalogManager), - stdx::make_unique<CatalogCache>(), + std::move(catalogCache), std::move(shardRegistry), stdx::make_unique<ClusterCursorManager>(getGlobalServiceContext()->getPreciseClockSource()), stdx::make_unique<BalancerConfiguration>(), diff --git a/src/mongo/s/sharding_initialization.h b/src/mongo/s/sharding_initialization.h index f2e3d12db8b..6d4629c7994 100644 --- a/src/mongo/s/sharding_initialization.h +++ b/src/mongo/s/sharding_initialization.h @@ -41,12 +41,14 @@ namespace executor { class TaskExecutor; } // namespace executor +class CatalogCache; class ConnectionString; class OperationContext; class ShardFactory; class Status; class ShardingCatalogClient; class ShardingCatalogManager; + using ShardingCatalogManagerBuilder = stdx::function<std::unique_ptr<ShardingCatalogManager>( ShardingCatalogClient*, 
 std::unique_ptr<executor::TaskExecutor>)>;

@@ -73,6 +75,7 @@ Status initializeGlobalShardingState(OperationContext* opCtx,
                                      const ConnectionString& configCS,
                                      StringData distLockProcessId,
                                      std::unique_ptr<ShardFactory> shardFactory,
+                                     std::unique_ptr<CatalogCache> catalogCache,
                                      rpc::ShardingEgressMetadataHookBuilder hookBuilder,
                                      ShardingCatalogManagerBuilder catalogManagerBuilder);
diff --git a/src/mongo/s/sharding_mongod_test_fixture.cpp b/src/mongo/s/sharding_mongod_test_fixture.cpp
index 8685b7ebc6d..5c4369f5fa3 100644
--- a/src/mongo/s/sharding_mongod_test_fixture.cpp
+++ b/src/mongo/s/sharding_mongod_test_fixture.cpp
@@ -65,6 +65,7 @@
 #include "mongo/s/catalog/type_collection.h"
 #include "mongo/s/catalog/type_shard.h"
 #include "mongo/s/catalog_cache.h"
+#include "mongo/s/catalog_cache_loader.h"
 #include "mongo/s/client/shard_factory.h"
 #include "mongo/s/client/shard_local.h"
 #include "mongo/s/client/shard_registry.h"
@@ -235,7 +236,12 @@ std::unique_ptr<ShardingCatalogManager> ShardingMongodTestFixture::makeShardingC
     return nullptr;
 }
 
-std::unique_ptr<CatalogCache> ShardingMongodTestFixture::makeCatalogCache() {
+std::unique_ptr<CatalogCacheLoader> ShardingMongodTestFixture::makeCatalogCacheLoader() {
+    return nullptr;
+}
+
+std::unique_ptr<CatalogCache> ShardingMongodTestFixture::makeCatalogCache(
+    std::unique_ptr<CatalogCacheLoader> catalogCacheLoader) {
     return nullptr;
 }
 
@@ -269,7 +275,9 @@ Status ShardingMongodTestFixture::initializeGlobalShardingStateForMongodForTest(
     auto catalogClientPtr = makeShardingCatalogClient(std::move(distLockManagerPtr));
     auto catalogManagerPtr = makeShardingCatalogManager(catalogClientPtr.get());
-    auto catalogCachePtr = makeCatalogCache();
+
+    auto catalogCacheLoaderPtr = makeCatalogCacheLoader();
+    auto catalogCachePtr = makeCatalogCache(std::move(catalogCacheLoaderPtr));
 
     auto clusterCursorManagerPtr = makeClusterCursorManager();
diff --git a/src/mongo/s/sharding_mongod_test_fixture.h b/src/mongo/s/sharding_mongod_test_fixture.h
index 62435c97e79..fca43abd45d 100644
--- a/src/mongo/s/sharding_mongod_test_fixture.h
+++ b/src/mongo/s/sharding_mongod_test_fixture.h
@@ -40,6 +40,7 @@ namespace mongo {
 class BalancerConfiguration;
 class CatalogCache;
+class CatalogCacheLoader;
 class ConnectionString;
 class ClusterCursorManager;
 class DistLockCatalog;
@@ -229,7 +230,13 @@ protected:
     /**
      * Base class returns nullptr.
      */
-    virtual std::unique_ptr<CatalogCache> makeCatalogCache();
+    virtual std::unique_ptr<CatalogCacheLoader> makeCatalogCacheLoader();
+
+    /**
+     * Base class returns nullptr.
+     */
+    virtual std::unique_ptr<CatalogCache> makeCatalogCache(
+        std::unique_ptr<CatalogCacheLoader> catalogCacheLoader);
 
     /**
      * Base class returns nullptr.
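A few notes on the mechanics this patch relies on, each with a small standalone C++ sketch. All names and types in the sketches (ChunkUpdate, integer versions, and so on) are simplified stand-ins, not MongoDB's. First, the ordering guarantee documented on createShardChunkDiffQuery: a yielding cursor may surface the same _id twice after a split/merge/move bumps its lastmod, but because results are sorted by ascending lastmod, the newer duplicate is read -- and applied -- last. A minimal simulation of that in-memory application:

#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <vector>

// Simplified stand-ins for ChunkType and ChunkVersion::toLong().
struct ChunkUpdate {
    std::string chunkId;
    std::uint64_t lastmod;
    std::string shard;
};

int main() {
    // The diff query returns documents in ascending lastmod order, so a
    // duplicate produced by a cursor yield always carries the higher version
    // and arrives after the stale copy.
    std::vector<ChunkUpdate> fetchedInLastmodOrder = {
        {"chunk-a", 5, "shard0"},
        {"chunk-a", 7, "shard1"},  // re-read after a move; newer version wins
    };

    std::map<std::string, ChunkUpdate> applied;
    for (const auto& update : fetchedInLastmodOrder) {
        applied[update.chunkId] = update;  // later (higher) version overwrites
    }

    std::cout << applied["chunk-a"].shard << " at version "
              << applied["chunk-a"].lastmod << "\n";  // prints: shard1 at version 7
    return 0;
}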
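Second, the full-versus-incremental reload decision in getPersistedMetadataSinceVersion: when the requested epoch no longer matches the persisted collections entry, the diff restarts from version (0, 0) so that every persisted chunk is returned. A sketch of that choice, assuming a two-field stand-in for ChunkVersion:

#include <cstdint>

// Two-field stand-in for ChunkVersion: a combined (major, minor) counter plus
// an epoch identifying one incarnation of the collection.
struct Version {
    std::uint64_t combined;
    int epoch;
};

// Mirrors the startingVersion choice: an epoch mismatch means the collection
// was dropped and recreated, so diff from (0, 0) to pick up all persisted
// chunks; otherwise diff incrementally from the requested version.
Version chooseStartingVersion(Version requested, int persistedEpoch) {
    if (requested.epoch != persistedEpoch) {
        return Version{0, persistedEpoch};  // full reload
    }
    return requested;  // incremental diff
}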
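Third, the merge in _getLoaderMetadata: persisted chunks at or above the lowest enqueued version are about to be rewritten by pending tasks, so they are trimmed away and the enqueued copies appended. The helper below mirrors that trim-and-append step over plain vectors; mergePersistedAndEnqueued is an illustrative name, not the patch's signature:

#include <cstdint>
#include <vector>

// Single-integer stand-in for a chunk's ChunkVersion.
struct Chunk {
    std::uint64_t version;
};

// Both inputs are sorted ascending by version, as the loader guarantees.
std::vector<Chunk> mergePersistedAndEnqueued(std::vector<Chunk> persisted,
                                             const std::vector<Chunk>& enqueued) {
    if (enqueued.empty()) {
        return persisted;  // nothing pending; persisted is authoritative
    }
    const std::uint64_t minEnqueued = enqueued.front().version;

    // Drop the persisted tail that the enqueued chunks supersede.
    auto firstOverlapping = persisted.begin();
    while (firstOverlapping != persisted.end() && firstOverlapping->version < minEnqueued) {
        ++firstOverlapping;
    }
    persisted.erase(firstOverlapping, persisted.end());

    // Append the enqueued chunks, which no longer overlap.
    persisted.insert(persisted.end(), enqueued.begin(), enqueued.end());
    return persisted;
}

Reading the enqueued metadata before the persisted metadata, as _getLoaderMetadata does, is what makes this safe: a task that completes between the two reads can only move chunks from the enqueued set into the persisted set, and the trim removes the resulting duplicates.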
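Fourth, the TaskList invariants: tasks chain, with each task's minQueryVersion equal to the previous task's maxQueryVersion (unless it is a full reload from the unsharded version), and a drop task flushes any queued-but-unapplied updates. A runnable restatement, with asserts standing in for invariant() and everything else simplified for illustration:

#include <cassert>
#include <cstdint>
#include <list>

// Reduced Task: the version range [minVersion, maxVersion] it covers, plus
// whether it drops the collection's metadata. 0 stands in for UNSHARDED().
struct Task {
    std::uint64_t minVersion;
    std::uint64_t maxVersion;
    bool dropped;
};

class TaskList {
public:
    void addTask(Task task) {
        if (_tasks.empty()) {
            _tasks.push_back(task);
            return;
        }
        if (task.dropped) {
            // A drop must pick up exactly where the queue left off.
            assert(_tasks.back().maxVersion == task.minVersion);
            // Pending updates are pointless once a drop is queued: keep only
            // the front task (it may already be running), then enqueue the
            // drop once -- never twice.
            Task active = _tasks.front();
            _tasks.clear();
            _tasks.push_back(active);
            if (!_tasks.front().dropped) {
                _tasks.push_back(task);
            }
        } else {
            // Contiguous versions, unless this is a complete reload.
            assert(_tasks.back().maxVersion == task.minVersion || task.minVersion == 0);
            _tasks.push_back(task);
        }
    }

    // Highest version the queue will leave behind; callers check empty() first.
    std::uint64_t highestVersionEnqueued() const {
        assert(!_tasks.empty());
        return _tasks.back().maxVersion;
    }

    bool empty() const {
        return _tasks.empty();
    }

private:
    std::list<Task> _tasks;
};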
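Finally, the getChunksSince calling contract: the refresh runs on a thread pool, the callback receives the result, and the returned Notification is signalled only afterwards; the caller must wait on it before issuing the next request. A sketch of the same shape, using std::promise and a detached thread in place of mongo's Notification and ThreadPool:

#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <string>
#include <thread>

// Schedule the (fake) refresh on another thread, deliver the result through
// the callback, then signal completion so a single, serial caller can wait.
std::shared_future<void> getChunksSince(const std::string& nss,
                                        std::function<void(const std::string&)> callbackFn) {
    auto done = std::make_shared<std::promise<void>>();
    std::shared_future<void> notify = done->get_future().share();

    std::thread([nss, callbackFn, done] {
        callbackFn("changed chunks for " + nss);  // deliver the refresh result
        done->set_value();                        // signal only after the callback ran
    }).detach();

    return notify;
}

int main() {
    auto notify = getChunksSince("test.foo", [](const std::string& result) {
        std::cout << result << "\n";
    });
    notify.wait();  // the contract: wait before issuing the next getChunksSince
    return 0;
}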