diff options
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/s/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/db/s/catalog_cache_loader_mock.cpp | 137 | ||||
-rw-r--r-- | src/mongo/db/s/catalog_cache_loader_mock.h | 88 | ||||
-rw-r--r-- | src/mongo/db/s/collection_sharding_state.cpp | 32 | ||||
-rw-r--r-- | src/mongo/db/s/read_only_catalog_cache_loader.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/s/read_only_catalog_cache_loader.h | 4 | ||||
-rw-r--r-- | src/mongo/db/s/shard_server_catalog_cache_loader.cpp | 43 | ||||
-rw-r--r-- | src/mongo/db/s/shard_server_catalog_cache_loader.h | 16 | ||||
-rw-r--r-- | src/mongo/db/s/shard_server_catalog_cache_loader_test.cpp | 492 |
9 files changed, 771 insertions, 47 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 1a9c825aee3..6481e53415e 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -233,9 +233,11 @@ env.CppUnitTest( target='shard_test', source=[ 'active_migrations_registry_test.cpp', + 'catalog_cache_loader_mock.cpp', 'migration_chunk_cloner_source_legacy_test.cpp', 'namespace_metadata_change_notifications_test.cpp', 'sharding_state_test.cpp', + 'shard_server_catalog_cache_loader_test.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/db/query/query_request', diff --git a/src/mongo/db/s/catalog_cache_loader_mock.cpp b/src/mongo/db/s/catalog_cache_loader_mock.cpp new file mode 100644 index 00000000000..d5dbe15c50e --- /dev/null +++ b/src/mongo/db/s/catalog_cache_loader_mock.cpp @@ -0,0 +1,137 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/s/catalog_cache_loader_mock.h" + +#include "mongo/db/operation_context.h" +#include "mongo/s/catalog/type_chunk.h" +#include "mongo/s/catalog/type_collection.h" +#include "mongo/stdx/thread.h" + +namespace mongo { + +using CollectionAndChangedChunks = CatalogCacheLoader::CollectionAndChangedChunks; + +namespace { + +/** + * Constructs the options for the loader thread pool. + */ +ThreadPool::Options makeDefaultThreadPoolOptions() { + ThreadPool::Options options; + options.poolName = "CatalogCacheLoaderMock"; + options.minThreads = 0; + options.maxThreads = 1; + + // Ensure all threads have a client. + options.onCreateThread = [](const std::string& threadName) { + Client::initThread(threadName.c_str()); + }; + + return options; +} + +} // namespace + +CatalogCacheLoaderMock::CatalogCacheLoaderMock() : _threadPool(makeDefaultThreadPoolOptions()) { + _threadPool.startup(); +} + +CatalogCacheLoaderMock::~CatalogCacheLoaderMock() { + _threadPool.shutdown(); + _threadPool.join(); +} + +void CatalogCacheLoaderMock::initializeReplicaSetRole(bool isPrimary) { + MONGO_UNREACHABLE; +} + +void CatalogCacheLoaderMock::onStepDown() { + MONGO_UNREACHABLE; +} + +void CatalogCacheLoaderMock::onStepUp() { + MONGO_UNREACHABLE; +} + +void CatalogCacheLoaderMock::notifyOfCollectionVersionUpdate(const NamespaceString& nss) { + MONGO_UNREACHABLE; +} + +Status CatalogCacheLoaderMock::waitForCollectionVersion(OperationContext* opCtx, + const NamespaceString& nss, + const ChunkVersion& version) { + MONGO_UNREACHABLE; +} + +std::shared_ptr<Notification<void>> CatalogCacheLoaderMock::getChunksSince( + const NamespaceString& nss, + ChunkVersion version, + stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) { + + auto notify = std::make_shared<Notification<void>>(); + + uassertStatusOK(_threadPool.schedule([ this, notify, callbackFn ]() noexcept { + auto opCtx = Client::getCurrent()->makeOperationContext(); + + auto swCollAndChunks = [&]() -> StatusWith<CollectionAndChangedChunks> { + try { + uassertStatusOK(_swCollectionReturnValue); + uassertStatusOK(_swChunksReturnValue); + + return CollectionAndChangedChunks{ + _swCollectionReturnValue.getValue().getEpoch(), + _swCollectionReturnValue.getValue().getKeyPattern().toBSON(), + _swCollectionReturnValue.getValue().getDefaultCollation(), + _swCollectionReturnValue.getValue().getUnique(), + _swChunksReturnValue.getValue()}; + } catch (const DBException& ex) { + return ex.toStatus(); + } + }(); + + callbackFn(opCtx.get(), std::move(swCollAndChunks)); + notify->set(); + })); + + return notify; +} + +void CatalogCacheLoaderMock::setCollectionRefreshReturnValue( + StatusWith<CollectionType> statusWithCollectionType) { + _swCollectionReturnValue = std::move(statusWithCollectionType); +} + +void CatalogCacheLoaderMock::setChunkRefreshReturnValue( + StatusWith<std::vector<ChunkType>> statusWithChunks) { + _swChunksReturnValue = std::move(statusWithChunks); +} + +} // namespace mongo diff --git a/src/mongo/db/s/catalog_cache_loader_mock.h b/src/mongo/db/s/catalog_cache_loader_mock.h new file mode 100644 index 00000000000..0fb16efde8f --- /dev/null +++ b/src/mongo/db/s/catalog_cache_loader_mock.h @@ -0,0 +1,88 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/s/catalog_cache_loader.h" +#include "mongo/util/concurrency/thread_pool.h" + +namespace mongo { + +/** + * Mocks the metadata refresh results with settable return values. The purpose of this class is to + * facilitate testing of classes that use a CatalogCacheLoader. + */ +class CatalogCacheLoaderMock final : public CatalogCacheLoader { + MONGO_DISALLOW_COPYING(CatalogCacheLoaderMock); + +public: + CatalogCacheLoaderMock(); + ~CatalogCacheLoaderMock(); + + /** + * These functions should never be called. They trigger invariants if called. + */ + void initializeReplicaSetRole(bool isPrimary) override; + void onStepDown() override; + void onStepUp() override; + void notifyOfCollectionVersionUpdate(const NamespaceString& nss) override; + Status waitForCollectionVersion(OperationContext* opCtx, + const NamespaceString& nss, + const ChunkVersion& version) override; + + std::shared_ptr<Notification<void>> getChunksSince( + const NamespaceString& nss, + ChunkVersion version, + stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) + override; + + /** + * Sets the mocked collection entry result that getChunksSince will use to construct its return + * value. + */ + void setCollectionRefreshReturnValue(StatusWith<CollectionType> statusWithCollectionType); + + /** + * Sets the mocked chunk results that getChunksSince will use to construct its return value. + */ + void setChunkRefreshReturnValue(StatusWith<std::vector<ChunkType>> statusWithChunks); + +private: + // These variables hold the mocked chunks and collection entry results used to construct the + // return value of getChunksSince above. + StatusWith<CollectionType> _swCollectionReturnValue{Status( + ErrorCodes::InternalError, "config loader mock collection response is uninitialized")}; + + StatusWith<std::vector<ChunkType>> _swChunksReturnValue{ + Status(ErrorCodes::InternalError, "config loader mock chunks response is uninitialized")}; + + // Thread pool on which to mock load chunk metadata. + ThreadPool _threadPool; +}; + +} // namespace mongo diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp index 23e6e2c5b6a..00576f2e00b 100644 --- a/src/mongo/db/s/collection_sharding_state.cpp +++ b/src/mongo/db/s/collection_sharding_state.cpp @@ -54,6 +54,7 @@ #include "mongo/s/catalog/type_config_version.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/s/catalog/type_shard_collection.h" +#include "mongo/s/catalog_cache.h" #include "mongo/s/catalog_cache_loader.h" #include "mongo/s/chunk_version.h" #include "mongo/s/cluster_identity_loader.h" @@ -94,14 +95,12 @@ private: */ class CollectionVersionLogOpHandler final : public RecoveryUnit::Change { public: - CollectionVersionLogOpHandler(OperationContext* opCtx, - const NamespaceString& nss, - const ChunkVersion& updatedVersion) - : _opCtx(opCtx), _nss(nss), _updatedVersion(updatedVersion) {} + CollectionVersionLogOpHandler(OperationContext* opCtx, const NamespaceString& nss) + : _opCtx(opCtx), _nss(nss) {} void commit() override { - CatalogCacheLoader::get(_opCtx).notifyOfCollectionVersionUpdate( - _opCtx, _nss, _updatedVersion); + Grid::get(_opCtx)->catalogCache()->invalidateShardedCollection(_nss); + CatalogCacheLoader::get(_opCtx).notifyOfCollectionVersionUpdate(_nss); } void rollback() override {} @@ -109,7 +108,6 @@ public: private: OperationContext* _opCtx; const NamespaceString _nss; - const ChunkVersion _updatedVersion; }; } // unnamed namespace @@ -432,22 +430,8 @@ void CollectionShardingState::_onConfigRefreshCompleteInvalidateCachedMetadataAn // If 'lastRefreshedCollectionVersion' is present, then a refresh completed and the catalog // cache must be invalidated and the catalog cache loader notified of the new version. if (setField.hasField(ShardCollectionType::lastRefreshedCollectionVersion.name())) { - // Get the version epoch from the 'updatedDoc', since it didn't get updated and won't be - // in 'update'. - BSONElement oidElem; - fassert(40513, - bsonExtractTypedField( - updatedDoc, ShardCollectionType::epoch(), BSONType::jstOID, &oidElem)); - - // Get the new collection version. - auto statusWithLastRefreshedChunkVersion = ChunkVersion::parseFromBSONWithFieldAndSetEpoch( - updatedDoc, ShardCollectionType::lastRefreshedCollectionVersion(), oidElem.OID()); - fassert(40514, statusWithLastRefreshedChunkVersion.isOK()); - opCtx->recoveryUnit()->registerChange( - new CollectionVersionLogOpHandler(opCtx, - NamespaceString(refreshCollection), - statusWithLastRefreshedChunkVersion.getValue())); + new CollectionVersionLogOpHandler(opCtx, NamespaceString(refreshCollection))); } } @@ -461,8 +445,8 @@ void CollectionShardingState::_onConfigDeleteInvalidateCachedMetadataAndNotify( fassertStatusOK( 40479, bsonExtractStringField(query, ShardCollectionType::uuid.name(), &deletedCollection)); - opCtx->recoveryUnit()->registerChange(new CollectionVersionLogOpHandler( - opCtx, NamespaceString(deletedCollection), ChunkVersion::UNSHARDED())); + opCtx->recoveryUnit()->registerChange( + new CollectionVersionLogOpHandler(opCtx, NamespaceString(deletedCollection))); } bool CollectionShardingState::_checkShardVersionOk(OperationContext* opCtx, diff --git a/src/mongo/db/s/read_only_catalog_cache_loader.cpp b/src/mongo/db/s/read_only_catalog_cache_loader.cpp index 007a6203489..46cd792ddb7 100644 --- a/src/mongo/db/s/read_only_catalog_cache_loader.cpp +++ b/src/mongo/db/s/read_only_catalog_cache_loader.cpp @@ -48,9 +48,7 @@ void ReadOnlyCatalogCacheLoader::onStepUp() { return; } -void ReadOnlyCatalogCacheLoader::notifyOfCollectionVersionUpdate(OperationContext* opCtx, - const NamespaceString& nss, - const ChunkVersion& version) { +void ReadOnlyCatalogCacheLoader::notifyOfCollectionVersionUpdate(const NamespaceString& nss) { return; } diff --git a/src/mongo/db/s/read_only_catalog_cache_loader.h b/src/mongo/db/s/read_only_catalog_cache_loader.h index 1615ba55c16..6485538c127 100644 --- a/src/mongo/db/s/read_only_catalog_cache_loader.h +++ b/src/mongo/db/s/read_only_catalog_cache_loader.h @@ -46,9 +46,7 @@ public: void initializeReplicaSetRole(bool isPrimary) override; void onStepDown() override; void onStepUp() override; - void notifyOfCollectionVersionUpdate(OperationContext* opCtx, - const NamespaceString& nss, - const ChunkVersion& version) override; + void notifyOfCollectionVersionUpdate(const NamespaceString& nss) override; Status waitForCollectionVersion(OperationContext* opCtx, const NamespaceString& nss, const ChunkVersion& version) override; diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp index 59ee1da26c5..83596a1da3c 100644 --- a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp +++ b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp @@ -38,7 +38,6 @@ #include "mongo/db/s/shard_metadata_util.h" #include "mongo/db/s/sharding_state.h" #include "mongo/s/catalog/type_shard_collection.h" -#include "mongo/s/catalog_cache.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" #include "mongo/stdx/memory.h" @@ -286,11 +285,11 @@ ShardServerCatalogCacheLoader::~ShardServerCatalogCacheLoader() { invariant(_contexts.isEmpty()); } -void ShardServerCatalogCacheLoader::notifyOfCollectionVersionUpdate(OperationContext* opCtx, - const NamespaceString& nss, - const ChunkVersion& version) { - Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(nss); +void ShardServerCatalogCacheLoader::setForTesting() { + _testing = true; +} +void ShardServerCatalogCacheLoader::notifyOfCollectionVersionUpdate(const NamespaceString& nss) { _namespaceNotifications.notifyChange(nss); } @@ -505,7 +504,6 @@ void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince( } } - if (swCollectionAndChangedChunks.isOK()) { log() << "Cache loader remotely refreshed for collection " << nss << " from collection version " << maxLoaderVersion @@ -581,15 +579,15 @@ StatusWith<CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_getLoader if (!tasksAreEnqueued) { // There are no tasks in the queue. Return the persisted metadata. return persisted; - } else if (enqueued.changedChunks.empty() || enqueued.epoch != persisted.epoch) { - // There is a task queue and either: - // - nothing was returned, which means the last task enqueued is a drop task. + } else if (persisted.changedChunks.empty() || enqueued.changedChunks.empty() || + enqueued.epoch != persisted.epoch) { + // There is a task queue and: + // - nothing is persisted. + // - nothing was returned from enqueued, which means the last task enqueued is a drop task. // - the epoch changed in the enqueued metadata, which means there's a drop operation // enqueued somewhere. - // Either way, the persisted metadata is out-dated. Return enqueued results. - return enqueued; - } else if (persisted.changedChunks.empty()) { - // Nothing is persisted. Return enqueued results. + // Whichever the cause, the persisted metadata is out-dated/non-existent. Return enqueued + // results. return enqueued; } else { // There can be overlap between persisted and enqueued metadata because enqueued work can @@ -699,6 +697,10 @@ void ShardServerCatalogCacheLoader::_runTasks(const NamespaceString& nss) { _taskLists[nss].removeActiveTask(); } + if (_testing) { + notifyOfCollectionVersionUpdate(nss); + } + // Schedule more work if there is any if (!_taskLists[nss].empty()) { Status status = _threadPool.schedule([this, nss]() { _runTasks(nss); }); @@ -912,9 +914,22 @@ CollectionAndChangedChunks ShardServerCatalogCacheLoader::TaskList::getEnqueuedM // the chunks vector. This will be either reset by the next task with a total reload // with a new epoch, or cause the original getChunksSince caller to throw out the // results and refresh again. + + // Make sure we do not append a duplicate chunk. The diff query is GTE, so there can + // be duplicates of the same exact versioned chunk across tasks. This is no problem + // for our diff application algorithms, but it can return unpredictable numbers of + // chunks for testing purposes. Eliminate unpredicatable duplicates for testing + // stability. + auto taskCollectionAndChangedChunksIt = + task.collectionAndChangedChunks->changedChunks.begin(); + if (collAndChunks.changedChunks.back().getVersion() == + task.collectionAndChangedChunks->changedChunks.front().getVersion()) { + ++taskCollectionAndChangedChunksIt; + } + collAndChunks.changedChunks.insert( collAndChunks.changedChunks.end(), - task.collectionAndChangedChunks->changedChunks.begin(), + taskCollectionAndChangedChunksIt, task.collectionAndChangedChunks->changedChunks.end()); } } diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.h b/src/mongo/db/s/shard_server_catalog_cache_loader.h index 8eb863329c2..257791be7d0 100644 --- a/src/mongo/db/s/shard_server_catalog_cache_loader.h +++ b/src/mongo/db/s/shard_server_catalog_cache_loader.h @@ -51,6 +51,15 @@ public: ~ShardServerCatalogCacheLoader(); /** + * For testing use only. + * + * Currently this only sets a boolean such that after metadata updates the notification system + * is signaled internally, rather than depending on the OpObservers which are not connectted for + * unit testing. + */ + void setForTesting(); + + /** * Initializes internal state so that the loader behaves as a primary or secondary. This can * only be called once, when the sharding state is initialized. */ @@ -70,9 +79,7 @@ public: * Sets any notifications waiting for this version to arrive and invalidates the catalog cache's * chunk metadata for collection 'nss' so that the next caller provokes a refresh. */ - void notifyOfCollectionVersionUpdate(OperationContext* opCtx, - const NamespaceString& nss, - const ChunkVersion& version) override; + void notifyOfCollectionVersionUpdate(const NamespaceString& nss) override; /** * Waits for the persisted collection version to be gte to 'version', or an epoch change. Only @@ -355,6 +362,9 @@ private: // The collection of operation contexts in use by all threads. OperationContextGroup _contexts; + + // Gates additional actions needed when testing. + bool _testing{false}; }; } // namespace mongo diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader_test.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader_test.cpp new file mode 100644 index 00000000000..9ff8e837f71 --- /dev/null +++ b/src/mongo/db/s/shard_server_catalog_cache_loader_test.cpp @@ -0,0 +1,492 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/s/shard_server_catalog_cache_loader.h" + +#include "mongo/db/s/catalog_cache_loader_mock.h" +#include "mongo/s/catalog/type_chunk.h" +#include "mongo/s/catalog/type_collection.h" +#include "mongo/s/shard_server_test_fixture.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace { + +using executor::RemoteCommandRequest; +using executor::RemoteCommandResponse; +using std::vector; +using unittest::assertGet; +using CollectionAndChangedChunks = CatalogCacheLoader::CollectionAndChangedChunks; + +const NamespaceString kNss = NamespaceString("foo.bar"); +const std::string kPattern = "_id"; +const ShardId kShardId = ShardId("shard0"); + +class ShardServerCatalogCacheLoaderTest : public ShardServerTestFixture { +public: + /** + * Returns five chunks using collVersion as a starting version. + */ + vector<ChunkType> makeFiveChunks(const ChunkVersion& collectionVersion); + + /** + * Returns a chunk update diff GTE 'collVersion' for the chunks created by makeFiveChunks above. + */ + vector<ChunkType> makeThreeUpdatedChunksDiff(const ChunkVersion& collectionVersion); + + /** + * Returns a routing table applying 'threeUpdatedChunks' (the result of + * makeThreeUpdatedChunksDiff) to 'originalFiveChunks' (the result of makeFiveChunks). + */ + vector<ChunkType> makeCombinedOriginalFiveChunksAndThreeNewChunksDiff( + const vector<ChunkType>& originalFiveChunks, const vector<ChunkType>& threeUpdatedChunks); + + /** + * This helper makes a CollectionType with the current _maxCollVersion. + */ + CollectionType makeCollectionType(const ChunkVersion& collVersion); + + /** + * Sets up the _shardLoader with the results of makeFiveChunks(). + */ + vector<ChunkType> setUpChunkLoaderWithFiveChunks(); + + const KeyPattern kKeyPattern = KeyPattern(BSON(kPattern << 1)); + const stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> + kDoNothingCallbackFn = []( + OperationContext * opCtx, + StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept {}; + + CatalogCacheLoaderMock* _remoteLoaderMock; + std::unique_ptr<ShardServerCatalogCacheLoader> _shardLoader; + +private: + void setUp() override; + void tearDown() override; +}; + +void ShardServerCatalogCacheLoaderTest::setUp() { + ShardServerTestFixture::setUp(); + + // Create mock remote and real shard loader, retaining a pointer to the mock remote loader so + // that unit tests can manipulate it to return certain responses. + std::unique_ptr<CatalogCacheLoaderMock> mockLoader = + stdx::make_unique<CatalogCacheLoaderMock>(); + _remoteLoaderMock = mockLoader.get(); + _shardLoader = stdx::make_unique<ShardServerCatalogCacheLoader>(std::move(mockLoader)); + + // Set the shard loader to primary mode, and set it for testing. + _shardLoader->initializeReplicaSetRole(true); + _shardLoader->setForTesting(); +} + +void ShardServerCatalogCacheLoaderTest::tearDown() { + _shardLoader.reset(); + ShardServerTestFixture::tearDown(); +} + +vector<ChunkType> ShardServerCatalogCacheLoaderTest::makeFiveChunks( + const ChunkVersion& collectionVersion) { + ChunkVersion collVersion(collectionVersion); + vector<ChunkType> chunks; + + BSONObj mins[] = { + BSON("a" << MINKEY), BSON("a" << 10), BSON("a" << 50), BSON("a" << 100), BSON("a" << 200)}; + BSONObj maxs[] = { + BSON("a" << 10), BSON("a" << 50), BSON("a" << 100), BSON("a" << 200), BSON("a" << MAXKEY)}; + + for (int i = 0; i < 5; ++i) { + collVersion.incMajor(); + + ChunkType chunk; + chunk.setNS(kNss.ns()); + chunk.setMin(mins[i]); + chunk.setMax(maxs[i]); + chunk.setShard(kShardId); + chunk.setVersion(collVersion); + + chunks.push_back(chunk); + } + + return chunks; +} + +vector<ChunkType> ShardServerCatalogCacheLoaderTest::makeThreeUpdatedChunksDiff( + const ChunkVersion& collectionVersion) { + ChunkVersion collVersion(collectionVersion); + vector<ChunkType> chunks; + + // The diff query is for GTE a known version, so prepend the previous newest chunk, which is + // unmodified by this change and so should be found. Note: it is important for testing that the + // previous highest versioned chunk is unmodified. Otherwise the shard loader's results are + // dependent on a race between persistence and retrieving data because it combines enqueued and + // persisted results without applying modifications. + ChunkType oldChunk; + oldChunk.setNS(kNss.ns()); + oldChunk.setMin(BSON("a" << 200)); + oldChunk.setMax(BSON("a" << MAXKEY)); + oldChunk.setShard(kShardId); + oldChunk.setVersion(collVersion); + chunks.push_back(oldChunk); + + + // Make chunk updates + BSONObj mins[] = {BSON("a" << MINKEY), BSON("a" << 5), BSON("a" << 10)}; + BSONObj maxs[] = {BSON("a" << 5), BSON("a" << 10), BSON("a" << 100)}; + + for (int i = 0; i < 3; ++i) { + collVersion.incMinor(); + + ChunkType chunk; + chunk.setNS(kNss.ns()); + chunk.setMin(mins[i]); + chunk.setMax(maxs[i]); + chunk.setShard(kShardId); + chunk.setVersion(collVersion); + + chunks.push_back(chunk); + } + + return chunks; +} + +vector<ChunkType> +ShardServerCatalogCacheLoaderTest::makeCombinedOriginalFiveChunksAndThreeNewChunksDiff( + const vector<ChunkType>& originalFiveChunks, const vector<ChunkType>& threeUpdatedChunksDiff) { + vector<ChunkType> chunks; + + // Sorted by ascending chunk version, not range! Note, threeUpdatedChunksDiff already includes + // the last originalFiveChunks chunk because the diff query is GTE. + chunks.push_back(originalFiveChunks[3]); + chunks.insert(chunks.end(), threeUpdatedChunksDiff.begin(), threeUpdatedChunksDiff.end()); + + return chunks; +} + +CollectionType ShardServerCatalogCacheLoaderTest::makeCollectionType( + const ChunkVersion& collVersion) { + CollectionType coll; + coll.setNs(kNss); + coll.setEpoch(collVersion.epoch()); + coll.setUpdatedAt(Date_t::fromMillisSinceEpoch(collVersion.toLong())); + coll.setKeyPattern(kKeyPattern); + coll.setUnique(false); + return coll; +} + +vector<ChunkType> ShardServerCatalogCacheLoaderTest::setUpChunkLoaderWithFiveChunks() { + ChunkVersion collectionVersion(1, 0, OID::gen()); + + CollectionType collectionType = makeCollectionType(collectionVersion); + vector<ChunkType> chunks = makeFiveChunks(collectionVersion); + _remoteLoaderMock->setCollectionRefreshReturnValue(collectionType); + _remoteLoaderMock->setChunkRefreshReturnValue(chunks); + + StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> results{ + Status(ErrorCodes::InternalError, "")}; + const auto refreshCallbackFn = [&results]( + OperationContext * opCtx, + StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept { + results = std::move(swCollAndChunks); + }; + + auto notification = + _shardLoader->getChunksSince(kNss, ChunkVersion::UNSHARDED(), refreshCallbackFn); + notification->get(); + + // Check refreshCallbackFn thread results where we can safely throw. + ASSERT_OK(results.getStatus()); + auto collAndChunkRes = results.getValue(); + ASSERT_EQUALS(collAndChunkRes.epoch, collectionType.getEpoch()); + ASSERT_EQUALS(collAndChunkRes.changedChunks.size(), 5UL); + for (unsigned int i = 0; i < collAndChunkRes.changedChunks.size(); ++i) { + ASSERT_BSONOBJ_EQ(collAndChunkRes.changedChunks[i].toConfigBSON(), + chunks[i].toConfigBSON()); + } + + return chunks; +} + +TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromUnshardedToUnsharded) { + // Return a NamespaceNotFound error that means the collection doesn't exist. + + Status errorStatus = Status(ErrorCodes::NamespaceNotFound, "collection not found"); + _remoteLoaderMock->setCollectionRefreshReturnValue(errorStatus); + + StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> results{ + Status(ErrorCodes::InternalError, "")}; + const auto refreshCallbackFn = [&results]( + OperationContext * opCtx, + StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept { + results = std::move(swCollAndChunks); + }; + + auto notification = + _shardLoader->getChunksSince(kNss, ChunkVersion::UNSHARDED(), refreshCallbackFn); + notification->get(); + + ASSERT_EQUALS(results.getStatus(), errorStatus); +} + +TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromShardedToUnsharded) { + // First set up the shard chunk loader as sharded. + + auto chunks = setUpChunkLoaderWithFiveChunks(); + + // Then return a NamespaceNotFound error, which means the collection must have been dropped, + // clearing the chunk metadata. + + Status errorStatus = Status(ErrorCodes::NamespaceNotFound, "collection not found"); + _remoteLoaderMock->setCollectionRefreshReturnValue(errorStatus); + + StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> results{ + Status(ErrorCodes::InternalError, "")}; + const auto nextRefreshCallbackFn = [&results]( + OperationContext * opCtx, + StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept { + results = std::move(swCollAndChunks); + }; + + auto notification = + _shardLoader->getChunksSince(kNss, chunks.back().getVersion(), nextRefreshCallbackFn); + notification->get(); + + ASSERT_EQUALS(results.getStatus(), errorStatus); +} + +TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromShardedAndFindNoDiff) { + // First set up the shard chunk loader as sharded. + + vector<ChunkType> chunks = setUpChunkLoaderWithFiveChunks(); + + // Then set up the remote loader to return a single document we've already seen -- indicates + // there's nothing new. + + vector<ChunkType> lastChunk; + lastChunk.push_back(chunks.back()); + _remoteLoaderMock->setChunkRefreshReturnValue(lastChunk); + + StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> results{ + Status(ErrorCodes::InternalError, "")}; + const auto refreshCallbackFn = [&results]( + OperationContext * opCtx, + StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept { + results = std::move(swCollAndChunks); + }; + + auto notification = + _shardLoader->getChunksSince(kNss, chunks.back().getVersion(), refreshCallbackFn); + notification->get(); + + // Check that refreshing from the latest version returned a single document matching that + // version. + ASSERT_OK(results.getStatus()); + auto collAndChunksRes = results.getValue(); + ASSERT_EQUALS(collAndChunksRes.epoch, chunks.back().getVersion().epoch()); + ASSERT_EQUALS(collAndChunksRes.changedChunks.size(), 1UL); + ASSERT_BSONOBJ_EQ(collAndChunksRes.changedChunks.back().toConfigBSON(), + chunks.back().toConfigBSON()); +} + +// Same as the above unit test, PrimaryLoadFromShardedAndFindNoDiff, but caller requests complete +// routing table, rather than diff from a known version. +TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromShardedAndFindNoDiffRequestAll) { + // First set up the shard chunk loader as sharded. + + vector<ChunkType> chunks = setUpChunkLoaderWithFiveChunks(); + + // Then set up the remote loader to return a single document we've already seen -- indicates + // there's nothing new. + + vector<ChunkType> lastChunk; + lastChunk.push_back(chunks.back()); + _remoteLoaderMock->setChunkRefreshReturnValue(lastChunk); + + StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> results{ + Status(ErrorCodes::InternalError, "")}; + const auto completeRefreshCallbackFn = [&results]( + OperationContext * opCtx, + StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept { + results = std::move(swCollAndChunks); + }; + + auto notification = + _shardLoader->getChunksSince(kNss, ChunkVersion::UNSHARDED(), completeRefreshCallbackFn); + notification->get(); + + // Check that the complete routing table was returned successfully. + ASSERT_OK(results.getStatus()); + auto collAndChunksRes = results.getValue(); + ASSERT_EQUALS(collAndChunksRes.epoch, chunks.back().getVersion().epoch()); + ASSERT_EQUALS(collAndChunksRes.changedChunks.size(), 5UL); + for (unsigned int i = 0; i < collAndChunksRes.changedChunks.size(); ++i) { + ASSERT_BSONOBJ_EQ(collAndChunksRes.changedChunks[i].getMin(), chunks[i].getMin()); + ASSERT_BSONOBJ_EQ(collAndChunksRes.changedChunks[i].getMax(), chunks[i].getMax()); + ASSERT_EQUALS(collAndChunksRes.changedChunks[i].getVersion(), chunks[i].getVersion()); + } +} + +TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromShardedAndFindDiff) { + // First set up the shard chunk loader as sharded. + + vector<ChunkType> chunks = setUpChunkLoaderWithFiveChunks(); + + // Then refresh again and find updated chunks. + + ChunkVersion collVersion = chunks.back().getVersion(); + vector<ChunkType> updatedChunksDiff = makeThreeUpdatedChunksDiff(collVersion); + _remoteLoaderMock->setChunkRefreshReturnValue(updatedChunksDiff); + + StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> results{ + Status(ErrorCodes::InternalError, "")}; + const auto refreshCallbackFn = [&results]( + OperationContext * opCtx, + StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept { + results = std::move(swCollAndChunks); + }; + + auto notification = + _shardLoader->getChunksSince(kNss, chunks.back().getVersion(), refreshCallbackFn); + notification->get(); + + // Check that the diff was returned successfull. + ASSERT_OK(results.getStatus()); + auto collAndChunksRes = results.getValue(); + ASSERT_EQUALS(collAndChunksRes.epoch, updatedChunksDiff.front().getVersion().epoch()); + ASSERT_EQUALS(collAndChunksRes.changedChunks.size(), 4UL); + for (unsigned int i = 0; i < collAndChunksRes.changedChunks.size(); ++i) { + ASSERT_BSONOBJ_EQ(collAndChunksRes.changedChunks[i].getMin(), + updatedChunksDiff[i].getMin()); + ASSERT_BSONOBJ_EQ(collAndChunksRes.changedChunks[i].getMax(), + updatedChunksDiff[i].getMax()); + ASSERT_EQUALS(collAndChunksRes.changedChunks[i].getVersion(), + updatedChunksDiff[i].getVersion()); + } +} + +TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromShardedAndFindDiffRequestAll) { + // First set up the shard chunk loader as sharded. + + vector<ChunkType> chunks = setUpChunkLoaderWithFiveChunks(); + + // First cause a remote refresh to find the updated chunks. Then wait for persistence, so that + // we ensure that nothing is enqueued and the next getChunksSince call will return a predictable + // number of chunk documents: the result of applying the enqueued update diff. + + vector<ChunkType> updatedChunksDiff = makeThreeUpdatedChunksDiff(chunks.back().getVersion()); + _remoteLoaderMock->setChunkRefreshReturnValue(updatedChunksDiff); + + auto notification = + _shardLoader->getChunksSince(kNss, chunks.back().getVersion(), kDoNothingCallbackFn); + notification->get(); + + // Wait for persistence of update + ASSERT_OK(_shardLoader->waitForCollectionVersion( + operationContext(), kNss, updatedChunksDiff.back().getVersion())); + + // Set up the remote loader to return a single document we've already seen, indicating no change + // occurred. + vector<ChunkType> lastChunk; + lastChunk.push_back(updatedChunksDiff.back()); + _remoteLoaderMock->setChunkRefreshReturnValue(lastChunk); + + vector<ChunkType> completeRoutingTableWithDiffApplied = + makeCombinedOriginalFiveChunksAndThreeNewChunksDiff(chunks, updatedChunksDiff); + + StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> results{ + Status(ErrorCodes::InternalError, "")}; + const auto refreshCallbackFn = [&results]( + OperationContext * opCtx, + StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept { + results = std::move(swCollAndChunks); + }; + + auto nextNotification = + _shardLoader->getChunksSince(kNss, ChunkVersion::UNSHARDED(), refreshCallbackFn); + nextNotification->get(); + + // Check that the complete routing table, with diff applied, was returned. + ASSERT_OK(results.getStatus()); + auto collAndChunksRes = results.getValue(); + ASSERT_EQUALS(collAndChunksRes.epoch, + completeRoutingTableWithDiffApplied.front().getVersion().epoch()); + ASSERT_EQUALS(collAndChunksRes.changedChunks.size(), 5UL); + for (unsigned int i = 0; i < collAndChunksRes.changedChunks.size(); ++i) { + ASSERT_BSONOBJ_EQ(collAndChunksRes.changedChunks[i].getMin(), + completeRoutingTableWithDiffApplied[i].getMin()); + ASSERT_BSONOBJ_EQ(collAndChunksRes.changedChunks[i].getMax(), + completeRoutingTableWithDiffApplied[i].getMax()); + ASSERT_EQUALS(collAndChunksRes.changedChunks[i].getVersion(), + completeRoutingTableWithDiffApplied[i].getVersion()); + } +} + +TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromShardedAndFindNewEpoch) { + // First set up the shard chunk loader as sharded. + + vector<ChunkType> chunks = setUpChunkLoaderWithFiveChunks(); + + // Then refresh again and find that the collection has been dropped and recreated. + + ChunkVersion collVersionWithNewEpoch(1, 0, OID::gen()); + CollectionType collectionTypeWithNewEpoch = makeCollectionType(collVersionWithNewEpoch); + vector<ChunkType> chunksWithNewEpoch = makeFiveChunks(collVersionWithNewEpoch); + _remoteLoaderMock->setCollectionRefreshReturnValue(collectionTypeWithNewEpoch); + _remoteLoaderMock->setChunkRefreshReturnValue(chunksWithNewEpoch); + + StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> results{ + Status(ErrorCodes::InternalError, "")}; + const auto refreshCallbackFn = [&results]( + OperationContext * opCtx, + StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept { + results = std::move(swCollAndChunks); + }; + + auto notification = + _shardLoader->getChunksSince(kNss, chunks.back().getVersion(), refreshCallbackFn); + notification->get(); + + // Check that the complete routing table for the new epoch was returned. + ASSERT_OK(results.getStatus()); + auto collAndChunksRes = results.getValue(); + ASSERT_EQUALS(collAndChunksRes.epoch, collectionTypeWithNewEpoch.getEpoch()); + ASSERT_EQUALS(collAndChunksRes.changedChunks.size(), 5UL); + for (unsigned int i = 0; i < collAndChunksRes.changedChunks.size(); ++i) { + ASSERT_BSONOBJ_EQ(collAndChunksRes.changedChunks[i].getMin(), + chunksWithNewEpoch[i].getMin()); + ASSERT_BSONOBJ_EQ(collAndChunksRes.changedChunks[i].getMax(), + chunksWithNewEpoch[i].getMax()); + ASSERT_EQUALS(collAndChunksRes.changedChunks[i].getVersion(), + chunksWithNewEpoch[i].getVersion()); + } +} + +} // namespace +} // namespace mongo |