summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/s/SConscript2
-rw-r--r--src/mongo/db/s/catalog_cache_loader_mock.cpp137
-rw-r--r--src/mongo/db/s/catalog_cache_loader_mock.h88
-rw-r--r--src/mongo/db/s/collection_sharding_state.cpp32
-rw-r--r--src/mongo/db/s/read_only_catalog_cache_loader.cpp4
-rw-r--r--src/mongo/db/s/read_only_catalog_cache_loader.h4
-rw-r--r--src/mongo/db/s/shard_server_catalog_cache_loader.cpp43
-rw-r--r--src/mongo/db/s/shard_server_catalog_cache_loader.h16
-rw-r--r--src/mongo/db/s/shard_server_catalog_cache_loader_test.cpp492
9 files changed, 771 insertions, 47 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 1a9c825aee3..6481e53415e 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -233,9 +233,11 @@ env.CppUnitTest(
target='shard_test',
source=[
'active_migrations_registry_test.cpp',
+ 'catalog_cache_loader_mock.cpp',
'migration_chunk_cloner_source_legacy_test.cpp',
'namespace_metadata_change_notifications_test.cpp',
'sharding_state_test.cpp',
+ 'shard_server_catalog_cache_loader_test.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/query/query_request',
diff --git a/src/mongo/db/s/catalog_cache_loader_mock.cpp b/src/mongo/db/s/catalog_cache_loader_mock.cpp
new file mode 100644
index 00000000000..d5dbe15c50e
--- /dev/null
+++ b/src/mongo/db/s/catalog_cache_loader_mock.cpp
@@ -0,0 +1,137 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/s/catalog_cache_loader_mock.h"
+
+#include "mongo/db/operation_context.h"
+#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/catalog/type_collection.h"
+#include "mongo/stdx/thread.h"
+
+namespace mongo {
+
+using CollectionAndChangedChunks = CatalogCacheLoader::CollectionAndChangedChunks;
+
+namespace {
+
+/**
+ * Constructs the options for the loader thread pool.
+ */
+ThreadPool::Options makeDefaultThreadPoolOptions() {
+ ThreadPool::Options options;
+ options.poolName = "CatalogCacheLoaderMock";
+ options.minThreads = 0;
+ options.maxThreads = 1;
+
+ // Ensure all threads have a client.
+ options.onCreateThread = [](const std::string& threadName) {
+ Client::initThread(threadName.c_str());
+ };
+
+ return options;
+}
+
+} // namespace
+
+CatalogCacheLoaderMock::CatalogCacheLoaderMock() : _threadPool(makeDefaultThreadPoolOptions()) {
+ _threadPool.startup();
+}
+
+CatalogCacheLoaderMock::~CatalogCacheLoaderMock() {
+ _threadPool.shutdown();
+ _threadPool.join();
+}
+
+void CatalogCacheLoaderMock::initializeReplicaSetRole(bool isPrimary) {
+ MONGO_UNREACHABLE;
+}
+
+void CatalogCacheLoaderMock::onStepDown() {
+ MONGO_UNREACHABLE;
+}
+
+void CatalogCacheLoaderMock::onStepUp() {
+ MONGO_UNREACHABLE;
+}
+
+void CatalogCacheLoaderMock::notifyOfCollectionVersionUpdate(const NamespaceString& nss) {
+ MONGO_UNREACHABLE;
+}
+
+Status CatalogCacheLoaderMock::waitForCollectionVersion(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const ChunkVersion& version) {
+ MONGO_UNREACHABLE;
+}
+
+std::shared_ptr<Notification<void>> CatalogCacheLoaderMock::getChunksSince(
+ const NamespaceString& nss,
+ ChunkVersion version,
+ stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn) {
+
+ auto notify = std::make_shared<Notification<void>>();
+
+ uassertStatusOK(_threadPool.schedule([ this, notify, callbackFn ]() noexcept {
+ auto opCtx = Client::getCurrent()->makeOperationContext();
+
+ auto swCollAndChunks = [&]() -> StatusWith<CollectionAndChangedChunks> {
+ try {
+ uassertStatusOK(_swCollectionReturnValue);
+ uassertStatusOK(_swChunksReturnValue);
+
+ return CollectionAndChangedChunks{
+ _swCollectionReturnValue.getValue().getEpoch(),
+ _swCollectionReturnValue.getValue().getKeyPattern().toBSON(),
+ _swCollectionReturnValue.getValue().getDefaultCollation(),
+ _swCollectionReturnValue.getValue().getUnique(),
+ _swChunksReturnValue.getValue()};
+ } catch (const DBException& ex) {
+ return ex.toStatus();
+ }
+ }();
+
+ callbackFn(opCtx.get(), std::move(swCollAndChunks));
+ notify->set();
+ }));
+
+ return notify;
+}
+
+void CatalogCacheLoaderMock::setCollectionRefreshReturnValue(
+ StatusWith<CollectionType> statusWithCollectionType) {
+ _swCollectionReturnValue = std::move(statusWithCollectionType);
+}
+
+void CatalogCacheLoaderMock::setChunkRefreshReturnValue(
+ StatusWith<std::vector<ChunkType>> statusWithChunks) {
+ _swChunksReturnValue = std::move(statusWithChunks);
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/s/catalog_cache_loader_mock.h b/src/mongo/db/s/catalog_cache_loader_mock.h
new file mode 100644
index 00000000000..0fb16efde8f
--- /dev/null
+++ b/src/mongo/db/s/catalog_cache_loader_mock.h
@@ -0,0 +1,88 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/s/catalog_cache_loader.h"
+#include "mongo/util/concurrency/thread_pool.h"
+
+namespace mongo {
+
+/**
+ * Mocks the metadata refresh results with settable return values. The purpose of this class is to
+ * facilitate testing of classes that use a CatalogCacheLoader.
+ */
+class CatalogCacheLoaderMock final : public CatalogCacheLoader {
+ MONGO_DISALLOW_COPYING(CatalogCacheLoaderMock);
+
+public:
+ CatalogCacheLoaderMock();
+ ~CatalogCacheLoaderMock();
+
+ /**
+ * These functions should never be called. They trigger invariants if called.
+ */
+ void initializeReplicaSetRole(bool isPrimary) override;
+ void onStepDown() override;
+ void onStepUp() override;
+ void notifyOfCollectionVersionUpdate(const NamespaceString& nss) override;
+ Status waitForCollectionVersion(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const ChunkVersion& version) override;
+
+ std::shared_ptr<Notification<void>> getChunksSince(
+ const NamespaceString& nss,
+ ChunkVersion version,
+ stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn)
+ override;
+
+ /**
+ * Sets the mocked collection entry result that getChunksSince will use to construct its return
+ * value.
+ */
+ void setCollectionRefreshReturnValue(StatusWith<CollectionType> statusWithCollectionType);
+
+ /**
+ * Sets the mocked chunk results that getChunksSince will use to construct its return value.
+ */
+ void setChunkRefreshReturnValue(StatusWith<std::vector<ChunkType>> statusWithChunks);
+
+private:
+ // These variables hold the mocked chunks and collection entry results used to construct the
+ // return value of getChunksSince above.
+ StatusWith<CollectionType> _swCollectionReturnValue{Status(
+ ErrorCodes::InternalError, "config loader mock collection response is uninitialized")};
+
+ StatusWith<std::vector<ChunkType>> _swChunksReturnValue{
+ Status(ErrorCodes::InternalError, "config loader mock chunks response is uninitialized")};
+
+ // Thread pool on which to mock load chunk metadata.
+ ThreadPool _threadPool;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp
index 23e6e2c5b6a..00576f2e00b 100644
--- a/src/mongo/db/s/collection_sharding_state.cpp
+++ b/src/mongo/db/s/collection_sharding_state.cpp
@@ -54,6 +54,7 @@
#include "mongo/s/catalog/type_config_version.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/catalog/type_shard_collection.h"
+#include "mongo/s/catalog_cache.h"
#include "mongo/s/catalog_cache_loader.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/cluster_identity_loader.h"
@@ -94,14 +95,12 @@ private:
*/
class CollectionVersionLogOpHandler final : public RecoveryUnit::Change {
public:
- CollectionVersionLogOpHandler(OperationContext* opCtx,
- const NamespaceString& nss,
- const ChunkVersion& updatedVersion)
- : _opCtx(opCtx), _nss(nss), _updatedVersion(updatedVersion) {}
+ CollectionVersionLogOpHandler(OperationContext* opCtx, const NamespaceString& nss)
+ : _opCtx(opCtx), _nss(nss) {}
void commit() override {
- CatalogCacheLoader::get(_opCtx).notifyOfCollectionVersionUpdate(
- _opCtx, _nss, _updatedVersion);
+ Grid::get(_opCtx)->catalogCache()->invalidateShardedCollection(_nss);
+ CatalogCacheLoader::get(_opCtx).notifyOfCollectionVersionUpdate(_nss);
}
void rollback() override {}
@@ -109,7 +108,6 @@ public:
private:
OperationContext* _opCtx;
const NamespaceString _nss;
- const ChunkVersion _updatedVersion;
};
} // unnamed namespace
@@ -432,22 +430,8 @@ void CollectionShardingState::_onConfigRefreshCompleteInvalidateCachedMetadataAn
// If 'lastRefreshedCollectionVersion' is present, then a refresh completed and the catalog
// cache must be invalidated and the catalog cache loader notified of the new version.
if (setField.hasField(ShardCollectionType::lastRefreshedCollectionVersion.name())) {
- // Get the version epoch from the 'updatedDoc', since it didn't get updated and won't be
- // in 'update'.
- BSONElement oidElem;
- fassert(40513,
- bsonExtractTypedField(
- updatedDoc, ShardCollectionType::epoch(), BSONType::jstOID, &oidElem));
-
- // Get the new collection version.
- auto statusWithLastRefreshedChunkVersion = ChunkVersion::parseFromBSONWithFieldAndSetEpoch(
- updatedDoc, ShardCollectionType::lastRefreshedCollectionVersion(), oidElem.OID());
- fassert(40514, statusWithLastRefreshedChunkVersion.isOK());
-
opCtx->recoveryUnit()->registerChange(
- new CollectionVersionLogOpHandler(opCtx,
- NamespaceString(refreshCollection),
- statusWithLastRefreshedChunkVersion.getValue()));
+ new CollectionVersionLogOpHandler(opCtx, NamespaceString(refreshCollection)));
}
}
@@ -461,8 +445,8 @@ void CollectionShardingState::_onConfigDeleteInvalidateCachedMetadataAndNotify(
fassertStatusOK(
40479, bsonExtractStringField(query, ShardCollectionType::uuid.name(), &deletedCollection));
- opCtx->recoveryUnit()->registerChange(new CollectionVersionLogOpHandler(
- opCtx, NamespaceString(deletedCollection), ChunkVersion::UNSHARDED()));
+ opCtx->recoveryUnit()->registerChange(
+ new CollectionVersionLogOpHandler(opCtx, NamespaceString(deletedCollection)));
}
bool CollectionShardingState::_checkShardVersionOk(OperationContext* opCtx,
diff --git a/src/mongo/db/s/read_only_catalog_cache_loader.cpp b/src/mongo/db/s/read_only_catalog_cache_loader.cpp
index 007a6203489..46cd792ddb7 100644
--- a/src/mongo/db/s/read_only_catalog_cache_loader.cpp
+++ b/src/mongo/db/s/read_only_catalog_cache_loader.cpp
@@ -48,9 +48,7 @@ void ReadOnlyCatalogCacheLoader::onStepUp() {
return;
}
-void ReadOnlyCatalogCacheLoader::notifyOfCollectionVersionUpdate(OperationContext* opCtx,
- const NamespaceString& nss,
- const ChunkVersion& version) {
+void ReadOnlyCatalogCacheLoader::notifyOfCollectionVersionUpdate(const NamespaceString& nss) {
return;
}
diff --git a/src/mongo/db/s/read_only_catalog_cache_loader.h b/src/mongo/db/s/read_only_catalog_cache_loader.h
index 1615ba55c16..6485538c127 100644
--- a/src/mongo/db/s/read_only_catalog_cache_loader.h
+++ b/src/mongo/db/s/read_only_catalog_cache_loader.h
@@ -46,9 +46,7 @@ public:
void initializeReplicaSetRole(bool isPrimary) override;
void onStepDown() override;
void onStepUp() override;
- void notifyOfCollectionVersionUpdate(OperationContext* opCtx,
- const NamespaceString& nss,
- const ChunkVersion& version) override;
+ void notifyOfCollectionVersionUpdate(const NamespaceString& nss) override;
Status waitForCollectionVersion(OperationContext* opCtx,
const NamespaceString& nss,
const ChunkVersion& version) override;
diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
index 59ee1da26c5..83596a1da3c 100644
--- a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
+++ b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
@@ -38,7 +38,6 @@
#include "mongo/db/s/shard_metadata_util.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/s/catalog/type_shard_collection.h"
-#include "mongo/s/catalog_cache.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/stdx/memory.h"
@@ -286,11 +285,11 @@ ShardServerCatalogCacheLoader::~ShardServerCatalogCacheLoader() {
invariant(_contexts.isEmpty());
}
-void ShardServerCatalogCacheLoader::notifyOfCollectionVersionUpdate(OperationContext* opCtx,
- const NamespaceString& nss,
- const ChunkVersion& version) {
- Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(nss);
+void ShardServerCatalogCacheLoader::setForTesting() {
+ _testing = true;
+}
+void ShardServerCatalogCacheLoader::notifyOfCollectionVersionUpdate(const NamespaceString& nss) {
_namespaceNotifications.notifyChange(nss);
}
@@ -505,7 +504,6 @@ void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince(
}
}
-
if (swCollectionAndChangedChunks.isOK()) {
log() << "Cache loader remotely refreshed for collection " << nss
<< " from collection version " << maxLoaderVersion
@@ -581,15 +579,15 @@ StatusWith<CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_getLoader
if (!tasksAreEnqueued) {
// There are no tasks in the queue. Return the persisted metadata.
return persisted;
- } else if (enqueued.changedChunks.empty() || enqueued.epoch != persisted.epoch) {
- // There is a task queue and either:
- // - nothing was returned, which means the last task enqueued is a drop task.
+ } else if (persisted.changedChunks.empty() || enqueued.changedChunks.empty() ||
+ enqueued.epoch != persisted.epoch) {
+ // There is a task queue and:
+ // - nothing is persisted.
+ // - nothing was returned from enqueued, which means the last task enqueued is a drop task.
// - the epoch changed in the enqueued metadata, which means there's a drop operation
// enqueued somewhere.
- // Either way, the persisted metadata is out-dated. Return enqueued results.
- return enqueued;
- } else if (persisted.changedChunks.empty()) {
- // Nothing is persisted. Return enqueued results.
+ // Whichever the cause, the persisted metadata is out-dated/non-existent. Return enqueued
+ // results.
return enqueued;
} else {
// There can be overlap between persisted and enqueued metadata because enqueued work can
@@ -699,6 +697,10 @@ void ShardServerCatalogCacheLoader::_runTasks(const NamespaceString& nss) {
_taskLists[nss].removeActiveTask();
}
+ if (_testing) {
+ notifyOfCollectionVersionUpdate(nss);
+ }
+
// Schedule more work if there is any
if (!_taskLists[nss].empty()) {
Status status = _threadPool.schedule([this, nss]() { _runTasks(nss); });
@@ -912,9 +914,22 @@ CollectionAndChangedChunks ShardServerCatalogCacheLoader::TaskList::getEnqueuedM
// the chunks vector. This will be either reset by the next task with a total reload
// with a new epoch, or cause the original getChunksSince caller to throw out the
// results and refresh again.
+
+ // Make sure we do not append a duplicate chunk. The diff query is GTE, so there can
+ // be duplicates of the same exact versioned chunk across tasks. This is no problem
+ // for our diff application algorithms, but it can return unpredictable numbers of
+ * chunks for testing purposes. Eliminate unpredictable duplicates for testing
+ // stability.
+ auto taskCollectionAndChangedChunksIt =
+ task.collectionAndChangedChunks->changedChunks.begin();
+ if (collAndChunks.changedChunks.back().getVersion() ==
+ task.collectionAndChangedChunks->changedChunks.front().getVersion()) {
+ ++taskCollectionAndChangedChunksIt;
+ }
+
collAndChunks.changedChunks.insert(
collAndChunks.changedChunks.end(),
- task.collectionAndChangedChunks->changedChunks.begin(),
+ taskCollectionAndChangedChunksIt,
task.collectionAndChangedChunks->changedChunks.end());
}
}
diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.h b/src/mongo/db/s/shard_server_catalog_cache_loader.h
index 8eb863329c2..257791be7d0 100644
--- a/src/mongo/db/s/shard_server_catalog_cache_loader.h
+++ b/src/mongo/db/s/shard_server_catalog_cache_loader.h
@@ -51,6 +51,15 @@ public:
~ShardServerCatalogCacheLoader();
/**
+ * For testing use only.
+ *
+ * Currently this only sets a boolean such that after metadata updates the notification system
+ * is signaled internally, rather than depending on the OpObservers which are not connected for
+ * unit testing.
+ */
+ void setForTesting();
+
+ /**
* Initializes internal state so that the loader behaves as a primary or secondary. This can
* only be called once, when the sharding state is initialized.
*/
@@ -70,9 +79,7 @@ public:
* Sets any notifications waiting for this version to arrive and invalidates the catalog cache's
* chunk metadata for collection 'nss' so that the next caller provokes a refresh.
*/
- void notifyOfCollectionVersionUpdate(OperationContext* opCtx,
- const NamespaceString& nss,
- const ChunkVersion& version) override;
+ void notifyOfCollectionVersionUpdate(const NamespaceString& nss) override;
/**
* Waits for the persisted collection version to be gte to 'version', or an epoch change. Only
@@ -355,6 +362,9 @@ private:
// The collection of operation contexts in use by all threads.
OperationContextGroup _contexts;
+
+ // Gates additional actions needed when testing.
+ bool _testing{false};
};
} // namespace mongo
diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader_test.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader_test.cpp
new file mode 100644
index 00000000000..9ff8e837f71
--- /dev/null
+++ b/src/mongo/db/s/shard_server_catalog_cache_loader_test.cpp
@@ -0,0 +1,492 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/s/shard_server_catalog_cache_loader.h"
+
+#include "mongo/db/s/catalog_cache_loader_mock.h"
+#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/catalog/type_collection.h"
+#include "mongo/s/shard_server_test_fixture.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+using executor::RemoteCommandRequest;
+using executor::RemoteCommandResponse;
+using std::vector;
+using unittest::assertGet;
+using CollectionAndChangedChunks = CatalogCacheLoader::CollectionAndChangedChunks;
+
+const NamespaceString kNss = NamespaceString("foo.bar");
+const std::string kPattern = "_id";
+const ShardId kShardId = ShardId("shard0");
+
+class ShardServerCatalogCacheLoaderTest : public ShardServerTestFixture {
+public:
+ /**
+ * Returns five chunks using collVersion as a starting version.
+ */
+ vector<ChunkType> makeFiveChunks(const ChunkVersion& collectionVersion);
+
+ /**
+ * Returns a chunk update diff GTE 'collVersion' for the chunks created by makeFiveChunks above.
+ */
+ vector<ChunkType> makeThreeUpdatedChunksDiff(const ChunkVersion& collectionVersion);
+
+ /**
+ * Returns a routing table applying 'threeUpdatedChunks' (the result of
+ * makeThreeUpdatedChunksDiff) to 'originalFiveChunks' (the result of makeFiveChunks).
+ */
+ vector<ChunkType> makeCombinedOriginalFiveChunksAndThreeNewChunksDiff(
+ const vector<ChunkType>& originalFiveChunks, const vector<ChunkType>& threeUpdatedChunks);
+
+ /**
+ * This helper makes a CollectionType with the current _maxCollVersion.
+ */
+ CollectionType makeCollectionType(const ChunkVersion& collVersion);
+
+ /**
+ * Sets up the _shardLoader with the results of makeFiveChunks().
+ */
+ vector<ChunkType> setUpChunkLoaderWithFiveChunks();
+
+ const KeyPattern kKeyPattern = KeyPattern(BSON(kPattern << 1));
+ const stdx::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)>
+ kDoNothingCallbackFn = [](
+ OperationContext * opCtx,
+ StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept {};
+
+ CatalogCacheLoaderMock* _remoteLoaderMock;
+ std::unique_ptr<ShardServerCatalogCacheLoader> _shardLoader;
+
+private:
+ void setUp() override;
+ void tearDown() override;
+};
+
+void ShardServerCatalogCacheLoaderTest::setUp() {
+ ShardServerTestFixture::setUp();
+
+ // Create mock remote and real shard loader, retaining a pointer to the mock remote loader so
+ // that unit tests can manipulate it to return certain responses.
+ std::unique_ptr<CatalogCacheLoaderMock> mockLoader =
+ stdx::make_unique<CatalogCacheLoaderMock>();
+ _remoteLoaderMock = mockLoader.get();
+ _shardLoader = stdx::make_unique<ShardServerCatalogCacheLoader>(std::move(mockLoader));
+
+ // Set the shard loader to primary mode, and set it for testing.
+ _shardLoader->initializeReplicaSetRole(true);
+ _shardLoader->setForTesting();
+}
+
+void ShardServerCatalogCacheLoaderTest::tearDown() {
+ _shardLoader.reset();
+ ShardServerTestFixture::tearDown();
+}
+
+vector<ChunkType> ShardServerCatalogCacheLoaderTest::makeFiveChunks(
+ const ChunkVersion& collectionVersion) {
+ ChunkVersion collVersion(collectionVersion);
+ vector<ChunkType> chunks;
+
+ BSONObj mins[] = {
+ BSON("a" << MINKEY), BSON("a" << 10), BSON("a" << 50), BSON("a" << 100), BSON("a" << 200)};
+ BSONObj maxs[] = {
+ BSON("a" << 10), BSON("a" << 50), BSON("a" << 100), BSON("a" << 200), BSON("a" << MAXKEY)};
+
+ for (int i = 0; i < 5; ++i) {
+ collVersion.incMajor();
+
+ ChunkType chunk;
+ chunk.setNS(kNss.ns());
+ chunk.setMin(mins[i]);
+ chunk.setMax(maxs[i]);
+ chunk.setShard(kShardId);
+ chunk.setVersion(collVersion);
+
+ chunks.push_back(chunk);
+ }
+
+ return chunks;
+}
+
+vector<ChunkType> ShardServerCatalogCacheLoaderTest::makeThreeUpdatedChunksDiff(
+ const ChunkVersion& collectionVersion) {
+ ChunkVersion collVersion(collectionVersion);
+ vector<ChunkType> chunks;
+
+ // The diff query is for GTE a known version, so prepend the previous newest chunk, which is
+ // unmodified by this change and so should be found. Note: it is important for testing that the
+ // previous highest versioned chunk is unmodified. Otherwise the shard loader's results are
+ // dependent on a race between persistence and retrieving data because it combines enqueued and
+ // persisted results without applying modifications.
+ ChunkType oldChunk;
+ oldChunk.setNS(kNss.ns());
+ oldChunk.setMin(BSON("a" << 200));
+ oldChunk.setMax(BSON("a" << MAXKEY));
+ oldChunk.setShard(kShardId);
+ oldChunk.setVersion(collVersion);
+ chunks.push_back(oldChunk);
+
+
+ // Make chunk updates
+ BSONObj mins[] = {BSON("a" << MINKEY), BSON("a" << 5), BSON("a" << 10)};
+ BSONObj maxs[] = {BSON("a" << 5), BSON("a" << 10), BSON("a" << 100)};
+
+ for (int i = 0; i < 3; ++i) {
+ collVersion.incMinor();
+
+ ChunkType chunk;
+ chunk.setNS(kNss.ns());
+ chunk.setMin(mins[i]);
+ chunk.setMax(maxs[i]);
+ chunk.setShard(kShardId);
+ chunk.setVersion(collVersion);
+
+ chunks.push_back(chunk);
+ }
+
+ return chunks;
+}
+
+vector<ChunkType>
+ShardServerCatalogCacheLoaderTest::makeCombinedOriginalFiveChunksAndThreeNewChunksDiff(
+ const vector<ChunkType>& originalFiveChunks, const vector<ChunkType>& threeUpdatedChunksDiff) {
+ vector<ChunkType> chunks;
+
+ // Sorted by ascending chunk version, not range! Note, threeUpdatedChunksDiff already includes
+ // the last originalFiveChunks chunk because the diff query is GTE.
+ chunks.push_back(originalFiveChunks[3]);
+ chunks.insert(chunks.end(), threeUpdatedChunksDiff.begin(), threeUpdatedChunksDiff.end());
+
+ return chunks;
+}
+
+CollectionType ShardServerCatalogCacheLoaderTest::makeCollectionType(
+ const ChunkVersion& collVersion) {
+ CollectionType coll;
+ coll.setNs(kNss);
+ coll.setEpoch(collVersion.epoch());
+ coll.setUpdatedAt(Date_t::fromMillisSinceEpoch(collVersion.toLong()));
+ coll.setKeyPattern(kKeyPattern);
+ coll.setUnique(false);
+ return coll;
+}
+
+vector<ChunkType> ShardServerCatalogCacheLoaderTest::setUpChunkLoaderWithFiveChunks() {
+ ChunkVersion collectionVersion(1, 0, OID::gen());
+
+ CollectionType collectionType = makeCollectionType(collectionVersion);
+ vector<ChunkType> chunks = makeFiveChunks(collectionVersion);
+ _remoteLoaderMock->setCollectionRefreshReturnValue(collectionType);
+ _remoteLoaderMock->setChunkRefreshReturnValue(chunks);
+
+ StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> results{
+ Status(ErrorCodes::InternalError, "")};
+ const auto refreshCallbackFn = [&results](
+ OperationContext * opCtx,
+ StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept {
+ results = std::move(swCollAndChunks);
+ };
+
+ auto notification =
+ _shardLoader->getChunksSince(kNss, ChunkVersion::UNSHARDED(), refreshCallbackFn);
+ notification->get();
+
+ // Check refreshCallbackFn thread results where we can safely throw.
+ ASSERT_OK(results.getStatus());
+ auto collAndChunkRes = results.getValue();
+ ASSERT_EQUALS(collAndChunkRes.epoch, collectionType.getEpoch());
+ ASSERT_EQUALS(collAndChunkRes.changedChunks.size(), 5UL);
+ for (unsigned int i = 0; i < collAndChunkRes.changedChunks.size(); ++i) {
+ ASSERT_BSONOBJ_EQ(collAndChunkRes.changedChunks[i].toConfigBSON(),
+ chunks[i].toConfigBSON());
+ }
+
+ return chunks;
+}
+
+TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromUnshardedToUnsharded) {
+ // Return a NamespaceNotFound error that means the collection doesn't exist.
+
+ Status errorStatus = Status(ErrorCodes::NamespaceNotFound, "collection not found");
+ _remoteLoaderMock->setCollectionRefreshReturnValue(errorStatus);
+
+ StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> results{
+ Status(ErrorCodes::InternalError, "")};
+ const auto refreshCallbackFn = [&results](
+ OperationContext * opCtx,
+ StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept {
+ results = std::move(swCollAndChunks);
+ };
+
+ auto notification =
+ _shardLoader->getChunksSince(kNss, ChunkVersion::UNSHARDED(), refreshCallbackFn);
+ notification->get();
+
+ ASSERT_EQUALS(results.getStatus(), errorStatus);
+}
+
+TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromShardedToUnsharded) {
+ // First set up the shard chunk loader as sharded.
+
+ auto chunks = setUpChunkLoaderWithFiveChunks();
+
+ // Then return a NamespaceNotFound error, which means the collection must have been dropped,
+ // clearing the chunk metadata.
+
+ Status errorStatus = Status(ErrorCodes::NamespaceNotFound, "collection not found");
+ _remoteLoaderMock->setCollectionRefreshReturnValue(errorStatus);
+
+ StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> results{
+ Status(ErrorCodes::InternalError, "")};
+ const auto nextRefreshCallbackFn = [&results](
+ OperationContext * opCtx,
+ StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept {
+ results = std::move(swCollAndChunks);
+ };
+
+ auto notification =
+ _shardLoader->getChunksSince(kNss, chunks.back().getVersion(), nextRefreshCallbackFn);
+ notification->get();
+
+ ASSERT_EQUALS(results.getStatus(), errorStatus);
+}
+
+TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromShardedAndFindNoDiff) {
+    // Seed the shard chunk loader with an initial sharded collection of five chunks.
+    vector<ChunkType> chunks = setUpChunkLoaderWithFiveChunks();
+
+    // Make the remote loader report only the newest chunk we already know about,
+    // which signals that no new metadata exists upstream.
+    vector<ChunkType> alreadySeenChunk;
+    alreadySeenChunk.push_back(chunks.back());
+    _remoteLoaderMock->setChunkRefreshReturnValue(alreadySeenChunk);
+
+    // Seed 'results' with a failure status so the callback observably overwrites it.
+    StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> results{
+        Status(ErrorCodes::InternalError, "")};
+    const auto refreshCallbackFn = [&results](
+        OperationContext * opCtx,
+        StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept {
+        results = std::move(swCollAndChunks);
+    };
+
+    auto notification =
+        _shardLoader->getChunksSince(kNss, chunks.back().getVersion(), refreshCallbackFn);
+    notification->get();
+
+    // Refreshing from the latest known version must yield exactly one document:
+    // the chunk at that version.
+    ASSERT_OK(results.getStatus());
+    const auto& collAndChunksRes = results.getValue();
+    ASSERT_EQUALS(collAndChunksRes.epoch, chunks.back().getVersion().epoch());
+    ASSERT_EQUALS(collAndChunksRes.changedChunks.size(), 1UL);
+    ASSERT_BSONOBJ_EQ(collAndChunksRes.changedChunks.back().toConfigBSON(),
+                      chunks.back().toConfigBSON());
+}
+
+// Same as PrimaryLoadFromShardedAndFindNoDiff above, except the caller asks for the
+// complete routing table (via the UNSHARDED sentinel) instead of a diff from a known version.
+TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromShardedAndFindNoDiffRequestAll) {
+    // Seed the shard chunk loader with an initial sharded collection of five chunks.
+    vector<ChunkType> chunks = setUpChunkLoaderWithFiveChunks();
+
+    // Make the remote loader report only the newest chunk we already know about,
+    // signalling that nothing has changed upstream.
+    vector<ChunkType> alreadySeenChunk;
+    alreadySeenChunk.push_back(chunks.back());
+    _remoteLoaderMock->setChunkRefreshReturnValue(alreadySeenChunk);
+
+    // Seed 'results' with a failure status so the callback observably overwrites it.
+    StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> results{
+        Status(ErrorCodes::InternalError, "")};
+    const auto completeRefreshCallbackFn = [&results](
+        OperationContext * opCtx,
+        StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept {
+        results = std::move(swCollAndChunks);
+    };
+
+    auto notification =
+        _shardLoader->getChunksSince(kNss, ChunkVersion::UNSHARDED(), completeRefreshCallbackFn);
+    notification->get();
+
+    // The loader should hand back the full, unchanged five-chunk routing table.
+    ASSERT_OK(results.getStatus());
+    const auto& collAndChunksRes = results.getValue();
+    ASSERT_EQUALS(collAndChunksRes.epoch, chunks.back().getVersion().epoch());
+    ASSERT_EQUALS(collAndChunksRes.changedChunks.size(), 5UL);
+    for (size_t i = 0; i < collAndChunksRes.changedChunks.size(); ++i) {
+        const auto& changedChunk = collAndChunksRes.changedChunks[i];
+        ASSERT_BSONOBJ_EQ(changedChunk.getMin(), chunks[i].getMin());
+        ASSERT_BSONOBJ_EQ(changedChunk.getMax(), chunks[i].getMax());
+        ASSERT_EQUALS(changedChunk.getVersion(), chunks[i].getVersion());
+    }
+}
+
+TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromShardedAndFindDiff) {
+    // First set up the shard chunk loader as sharded.
+
+    vector<ChunkType> chunks = setUpChunkLoaderWithFiveChunks();
+
+    // Then refresh again and find updated chunks.
+
+    ChunkVersion collVersion = chunks.back().getVersion();
+    vector<ChunkType> updatedChunksDiff = makeThreeUpdatedChunksDiff(collVersion);
+    _remoteLoaderMock->setChunkRefreshReturnValue(updatedChunksDiff);
+
+    // Seed 'results' with a failure status so the callback observably overwrites it.
+    StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> results{
+        Status(ErrorCodes::InternalError, "")};
+    const auto refreshCallbackFn = [&results](
+        OperationContext * opCtx,
+        StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept {
+        results = std::move(swCollAndChunks);
+    };
+
+    auto notification =
+        _shardLoader->getChunksSince(kNss, chunks.back().getVersion(), refreshCallbackFn);
+    notification->get();
+
+    // Check that the diff was returned successfully. Note the 4UL expectation together with
+    // the element-wise loop implies makeThreeUpdatedChunksDiff() returns four chunk documents
+    // (presumably the already-known chunk at 'collVersion' plus three updates -- confirm
+    // against the helper's definition, which is outside this view).
+    ASSERT_OK(results.getStatus());
+    auto collAndChunksRes = results.getValue();
+    ASSERT_EQUALS(collAndChunksRes.epoch, updatedChunksDiff.front().getVersion().epoch());
+    ASSERT_EQUALS(collAndChunksRes.changedChunks.size(), 4UL);
+    for (unsigned int i = 0; i < collAndChunksRes.changedChunks.size(); ++i) {
+        ASSERT_BSONOBJ_EQ(collAndChunksRes.changedChunks[i].getMin(),
+                          updatedChunksDiff[i].getMin());
+        ASSERT_BSONOBJ_EQ(collAndChunksRes.changedChunks[i].getMax(),
+                          updatedChunksDiff[i].getMax());
+        ASSERT_EQUALS(collAndChunksRes.changedChunks[i].getVersion(),
+                      updatedChunksDiff[i].getVersion());
+    }
+}
+
+TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromShardedAndFindDiffRequestAll) {
+    // First set up the shard chunk loader as sharded.
+
+    vector<ChunkType> chunks = setUpChunkLoaderWithFiveChunks();
+
+    // First cause a remote refresh to find the updated chunks. Then wait for persistence, so that
+    // we ensure that nothing is enqueued and the next getChunksSince call will return a predictable
+    // number of chunk documents: the result of applying the enqueued update diff.
+
+    vector<ChunkType> updatedChunksDiff = makeThreeUpdatedChunksDiff(chunks.back().getVersion());
+    _remoteLoaderMock->setChunkRefreshReturnValue(updatedChunksDiff);
+
+    // This refresh only exists to drive the diff into the loader; its result is discarded
+    // via kDoNothingCallbackFn.
+    auto notification =
+        _shardLoader->getChunksSince(kNss, chunks.back().getVersion(), kDoNothingCallbackFn);
+    notification->get();
+
+    // Wait for persistence of update. Blocking here is what makes the later 5-document
+    // expectation deterministic: the diff must be flushed before the next refresh reads it.
+    ASSERT_OK(_shardLoader->waitForCollectionVersion(
+        operationContext(), kNss, updatedChunksDiff.back().getVersion()));
+
+    // Set up the remote loader to return a single document we've already seen, indicating no change
+    // occurred.
+    vector<ChunkType> lastChunk;
+    lastChunk.push_back(updatedChunksDiff.back());
+    _remoteLoaderMock->setChunkRefreshReturnValue(lastChunk);
+
+    // Expected final state: the original five chunks with the three-chunk diff merged in.
+    vector<ChunkType> completeRoutingTableWithDiffApplied =
+        makeCombinedOriginalFiveChunksAndThreeNewChunksDiff(chunks, updatedChunksDiff);
+
+    // Seed 'results' with a failure status so the callback observably overwrites it.
+    StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> results{
+        Status(ErrorCodes::InternalError, "")};
+    const auto refreshCallbackFn = [&results](
+        OperationContext * opCtx,
+        StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept {
+        results = std::move(swCollAndChunks);
+    };
+
+    // UNSHARDED acts as the "no known version" sentinel, requesting the full routing table.
+    auto nextNotification =
+        _shardLoader->getChunksSince(kNss, ChunkVersion::UNSHARDED(), refreshCallbackFn);
+    nextNotification->get();
+
+    // Check that the complete routing table, with diff applied, was returned.
+    ASSERT_OK(results.getStatus());
+    auto collAndChunksRes = results.getValue();
+    ASSERT_EQUALS(collAndChunksRes.epoch,
+                  completeRoutingTableWithDiffApplied.front().getVersion().epoch());
+    ASSERT_EQUALS(collAndChunksRes.changedChunks.size(), 5UL);
+    for (unsigned int i = 0; i < collAndChunksRes.changedChunks.size(); ++i) {
+        ASSERT_BSONOBJ_EQ(collAndChunksRes.changedChunks[i].getMin(),
+                          completeRoutingTableWithDiffApplied[i].getMin());
+        ASSERT_BSONOBJ_EQ(collAndChunksRes.changedChunks[i].getMax(),
+                          completeRoutingTableWithDiffApplied[i].getMax());
+        ASSERT_EQUALS(collAndChunksRes.changedChunks[i].getVersion(),
+                      completeRoutingTableWithDiffApplied[i].getVersion());
+    }
+}
+
+TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromShardedAndFindNewEpoch) {
+    // Seed the shard chunk loader with an initial sharded collection of five chunks.
+    vector<ChunkType> chunks = setUpChunkLoaderWithFiveChunks();
+
+    // Simulate a drop/recreate: the remote now reports a collection entry with a brand
+    // new epoch and a fresh set of five chunks under that epoch.
+    ChunkVersion collVersionWithNewEpoch(1, 0, OID::gen());
+    CollectionType collectionTypeWithNewEpoch = makeCollectionType(collVersionWithNewEpoch);
+    vector<ChunkType> chunksWithNewEpoch = makeFiveChunks(collVersionWithNewEpoch);
+    _remoteLoaderMock->setCollectionRefreshReturnValue(collectionTypeWithNewEpoch);
+    _remoteLoaderMock->setChunkRefreshReturnValue(chunksWithNewEpoch);
+
+    // Seed 'results' with a failure status so the callback observably overwrites it.
+    StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> results{
+        Status(ErrorCodes::InternalError, "")};
+    const auto refreshCallbackFn = [&results](
+        OperationContext * opCtx,
+        StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept {
+        results = std::move(swCollAndChunks);
+    };
+
+    auto notification =
+        _shardLoader->getChunksSince(kNss, chunks.back().getVersion(), refreshCallbackFn);
+    notification->get();
+
+    // On an epoch change the loader must return the complete routing table for the
+    // new epoch rather than a diff against the stale one.
+    ASSERT_OK(results.getStatus());
+    const auto& collAndChunksRes = results.getValue();
+    ASSERT_EQUALS(collAndChunksRes.epoch, collectionTypeWithNewEpoch.getEpoch());
+    ASSERT_EQUALS(collAndChunksRes.changedChunks.size(), 5UL);
+    for (size_t i = 0; i < collAndChunksRes.changedChunks.size(); ++i) {
+        const auto& changedChunk = collAndChunksRes.changedChunks[i];
+        ASSERT_BSONOBJ_EQ(changedChunk.getMin(), chunksWithNewEpoch[i].getMin());
+        ASSERT_BSONOBJ_EQ(changedChunk.getMax(), chunksWithNewEpoch[i].getMax());
+        ASSERT_EQUALS(changedChunk.getVersion(), chunksWithNewEpoch[i].getVersion());
+    }
+}
+
+} // namespace
+} // namespace mongo