summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/s/catalog_cache_loader_mock.cpp25
-rw-r--r--src/mongo/db/s/catalog_cache_loader_mock.h9
-rw-r--r--src/mongo/db/s/migration_util.cpp8
-rw-r--r--src/mongo/db/s/migration_util_test.cpp199
4 files changed, 210 insertions, 31 deletions
diff --git a/src/mongo/db/s/catalog_cache_loader_mock.cpp b/src/mongo/db/s/catalog_cache_loader_mock.cpp
index 406a16c26f6..7d3c7b1f80b 100644
--- a/src/mongo/db/s/catalog_cache_loader_mock.cpp
+++ b/src/mongo/db/s/catalog_cache_loader_mock.cpp
@@ -133,7 +133,26 @@ std::shared_ptr<Notification<void>> CatalogCacheLoaderMock::getChunksSince(
void CatalogCacheLoaderMock::getDatabase(
StringData dbName,
std::function<void(OperationContext*, StatusWith<DatabaseType>)> callbackFn) {
- // Not implemented
+ _threadPool.schedule([ this, callbackFn ](auto status) noexcept {
+ invariant(status);
+
+ auto opCtx = Client::getCurrent()->makeOperationContext();
+
+ auto swDatabase = [&]() -> StatusWith<DatabaseType> {
+ try {
+ uassertStatusOK(_swDatabaseReturnValue);
+
+ return DatabaseType(_swDatabaseReturnValue.getValue().getName(),
+ _swDatabaseReturnValue.getValue().getPrimary(),
+ _swDatabaseReturnValue.getValue().getSharded(),
+ _swDatabaseReturnValue.getValue().getVersion());
+ } catch (const DBException& ex) {
+ return ex.toStatus();
+ }
+ }();
+
+ callbackFn(opCtx.get(), std::move(swDatabase));
+ });
}
void CatalogCacheLoaderMock::setCollectionRefreshReturnValue(
@@ -146,4 +165,8 @@ void CatalogCacheLoaderMock::setChunkRefreshReturnValue(
_swChunksReturnValue = std::move(statusWithChunks);
}
+void CatalogCacheLoaderMock::setDatabaseRefreshReturnValue(StatusWith<DatabaseType> swDatabase) {
+ _swDatabaseReturnValue = std::move(swDatabase);
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/catalog_cache_loader_mock.h b/src/mongo/db/s/catalog_cache_loader_mock.h
index b5a2f64a6b7..ce566a02fd7 100644
--- a/src/mongo/db/s/catalog_cache_loader_mock.h
+++ b/src/mongo/db/s/catalog_cache_loader_mock.h
@@ -77,7 +77,16 @@ public:
*/
void setChunkRefreshReturnValue(StatusWith<std::vector<ChunkType>> statusWithChunks);
+ /**
+ * Sets the mocked database entry result that getDatabase will use to construct its return
+ * value.
+ */
+ void setDatabaseRefreshReturnValue(StatusWith<DatabaseType> swDatabase);
+
private:
+ StatusWith<DatabaseType> _swDatabaseReturnValue{
+ Status(ErrorCodes::InternalError, "config loader database response is uninitialized")};
+
// These variables hold the mocked chunks and collection entry results used to construct the
// return value of getChunksSince above.
StatusWith<CollectionType> _swCollectionReturnValue{Status(
diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp
index 18c06472062..ff3d10e6537 100644
--- a/src/mongo/db/s/migration_util.cpp
+++ b/src/mongo/db/s/migration_util.cpp
@@ -130,11 +130,7 @@ bool checkForConflictingDeletions(OperationContext* opCtx,
ExecutorFuture<bool> submitRangeDeletionTask(OperationContext* opCtx,
const RangeDeletionTask& deletionTask) {
const auto serviceContext = opCtx->getServiceContext();
- // TODO (SERVER-45577): Use the Grid's fixed executor once the refresh is done asynchronously.
- // An arbitrary executor is being used temporarily because unit tests have only one thread in
- // the fixed executor, and that thread is needed to respond to the refresh.
- return ExecutorFuture<void>(
- Grid::get(serviceContext)->getExecutorPool()->getArbitraryExecutor())
+ return ExecutorFuture<void>(Grid::get(serviceContext)->getExecutorPool()->getFixedExecutor())
.then([=] {
ThreadClient tc(kRangeDeletionThreadName, serviceContext);
{
@@ -160,7 +156,7 @@ ExecutorFuture<bool> submitRangeDeletionTask(OperationContext* opCtx,
: " is not known")
<< ", forcing a refresh of " << deletionTask.getNss();
- // TODO (SERVER-45577): Add an asynchronous version of
+ // TODO (SERVER-46075): Add an asynchronous version of
// forceShardFilteringMetadataRefresh to avoid blocking on the network in the
// thread pool.
autoColl.reset();
diff --git a/src/mongo/db/s/migration_util_test.cpp b/src/mongo/db/s/migration_util_test.cpp
index 2aafa4e6c5f..14547873f36 100644
--- a/src/mongo/db/s/migration_util_test.cpp
+++ b/src/mongo/db/s/migration_util_test.cpp
@@ -30,12 +30,16 @@
#include "mongo/client/remote_command_targeter_factory_mock.h"
#include "mongo/client/remote_command_targeter_mock.h"
#include "mongo/db/catalog_raii.h"
+#include "mongo/db/s/catalog_cache_loader_mock.h"
#include "mongo/db/s/collection_sharding_runtime.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/migration_util.h"
#include "mongo/db/s/persistent_task_store.h"
#include "mongo/db/s/shard_filtering_metadata_refresh.h"
+#include "mongo/db/s/shard_server_catalog_cache_loader.h"
+#include "mongo/db/s/sharding_state.h"
#include "mongo/db/s/wait_for_majority_service.h"
+#include "mongo/s/catalog/sharding_catalog_client_mock.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/database_version_helpers.h"
#include "mongo/s/shard_server_test_fixture.h"
@@ -376,22 +380,151 @@ TEST_F(MigrationUtilsTest, TestInvalidUUID) {
ASSERT_FALSE(migrationutil::checkForConflictingDeletions(opCtx, range, wrongUuid));
}
-using SubmitRangeDeletionTaskTest = MigrationUtilsTest;
+// Fixture that uses a mocked CatalogCacheLoader and CatalogClient to allow metadata refreshes
+// without using the mock network.
+class SubmitRangeDeletionTaskTest : public ShardServerTestFixture {
+public:
+ const HostAndPort kConfigHostAndPort{"dummy", 123};
+ const NamespaceString kNss{"test.foo"};
+ const ShardKeyPattern kShardKeyPattern = ShardKeyPattern(BSON("_id" << 1));
+ const UUID kDefaultUUID = UUID::gen();
+ const OID kEpoch = OID::gen();
+ const DatabaseType kDefaultDatabaseType =
+ DatabaseType(kNss.db().toString(), ShardId("0"), true, DatabaseVersion(kDefaultUUID, 1));
+ const std::vector<ShardType> kShardList = {ShardType("0", "Host0:12345"),
+ ShardType("1", "Host1:12345")};
+
+ void setUp() override {
+ // Don't call ShardServerTestFixture::setUp so we can install a mock catalog cache loader.
+ ShardingMongodTestFixture::setUp();
+
+ replicationCoordinator()->alwaysAllowWrites(true);
+ serverGlobalParams.clusterRole = ClusterRole::ShardServer;
+
+ _clusterId = OID::gen();
+ ShardingState::get(getServiceContext())->setInitialized(_myShardName, _clusterId);
+
+ std::unique_ptr<CatalogCacheLoaderMock> mockLoader =
+ std::make_unique<CatalogCacheLoaderMock>();
+ _mockCatalogCacheLoader = mockLoader.get();
+ CatalogCacheLoader::set(getServiceContext(), std::move(mockLoader));
+
+ uassertStatusOK(
+ initializeGlobalShardingStateForMongodForTest(ConnectionString(kConfigHostAndPort)));
+
+ configTargeterMock()->setFindHostReturnValue(kConfigHostAndPort);
+
+ WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext());
+
+ // Set up 2 default shards.
+ for (const auto& shard : kShardList) {
+ std::unique_ptr<RemoteCommandTargeterMock> targeter(
+ std::make_unique<RemoteCommandTargeterMock>());
+ HostAndPort host(shard.getHost());
+ targeter->setConnectionStringReturnValue(ConnectionString(host));
+ targeter->setFindHostReturnValue(host);
+ targeterFactory()->addTargeterToReturn(ConnectionString(host), std::move(targeter));
+ }
+ }
+
+ void tearDown() override {
+ WaitForMajorityService::get(getServiceContext()).shutDown();
+ CatalogCacheLoader::clearForTests(getServiceContext());
+ ShardingMongodTestFixture::tearDown();
+ CollectionShardingStateFactory::clear(getServiceContext());
+ }
+
+ // Mock for the ShardingCatalogClient used to satisfy loading all shards for the ShardRegistry
+ // and loading all collections when a database is loaded for the first time by the CatalogCache.
+ class StaticCatalogClient final : public ShardingCatalogClientMock {
+ public:
+ StaticCatalogClient(std::vector<ShardType> shards)
+ : ShardingCatalogClientMock(nullptr), _shards(std::move(shards)) {}
+
+ StatusWith<repl::OpTimeWith<std::vector<ShardType>>> getAllShards(
+ OperationContext* opCtx, repl::ReadConcernLevel readConcern) override {
+ return repl::OpTimeWith<std::vector<ShardType>>(_shards);
+ }
+
+ StatusWith<std::vector<CollectionType>> getCollections(
+ OperationContext* opCtx,
+ const std::string* dbName,
+ repl::OpTime* optime,
+ repl::ReadConcernLevel readConcernLevel) override {
+ return _colls;
+ }
+
+ void setCollections(std::vector<CollectionType> colls) {
+ _colls = std::move(colls);
+ }
+
+ private:
+ const std::vector<ShardType> _shards;
+ std::vector<CollectionType> _colls;
+ };
+
+ std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient(
+ std::unique_ptr<DistLockManager> distLockManager) override {
+ auto mockCatalogClient = std::make_unique<StaticCatalogClient>(kShardList);
+ // Stash a pointer to the mock so its return values can be set.
+ _mockCatalogClient = mockCatalogClient.get();
+ return mockCatalogClient;
+ }
+
+ CollectionType makeCollectionType(UUID uuid, OID epoch) {
+ CollectionType coll;
+ coll.setNs(kNss);
+ coll.setEpoch(epoch);
+ coll.setKeyPattern(kShardKeyPattern.getKeyPattern());
+ coll.setUnique(true);
+ coll.setUUID(uuid);
+ return coll;
+ }
+
+ std::vector<ChunkType> makeChangedChunks(ChunkVersion startingVersion) {
+ ChunkType chunk1(kNss,
+ {kShardKeyPattern.getKeyPattern().globalMin(), BSON("_id" << -100)},
+ startingVersion,
+ {"0"});
+ chunk1.setName(OID::gen());
+ startingVersion.incMinor();
+
+ ChunkType chunk2(kNss, {BSON("_id" << -100), BSON("_id" << 0)}, startingVersion, {"1"});
+ chunk2.setName(OID::gen());
+ startingVersion.incMinor();
+
+ ChunkType chunk3(kNss, {BSON("_id" << 0), BSON("_id" << 100)}, startingVersion, {"0"});
+ chunk3.setName(OID::gen());
+ startingVersion.incMinor();
+
+ ChunkType chunk4(kNss,
+ {BSON("_id" << 100), kShardKeyPattern.getKeyPattern().globalMax()},
+ startingVersion,
+ {"1"});
+ chunk4.setName(OID::gen());
+ startingVersion.incMinor();
+
+ return std::vector<ChunkType>{chunk1, chunk2, chunk3, chunk4};
+ }
+
+ CatalogCacheLoaderMock* _mockCatalogCacheLoader;
+ StaticCatalogClient* _mockCatalogClient;
+};
TEST_F(SubmitRangeDeletionTaskTest,
FailsAndDeletesTaskIfFilteringMetadataIsUnknownEvenAfterRefresh) {
auto opCtx = operationContext();
- const auto uuid = UUID::gen();
- auto deletionTask = createDeletionTask(kNss, uuid, 0, 10);
+ auto deletionTask = createDeletionTask(kNss, kDefaultUUID, 0, 10);
PersistentTaskStore<RangeDeletionTask> store(opCtx, NamespaceString::kRangeDeletionNamespace);
store.add(opCtx, deletionTask);
ASSERT_EQ(store.count(opCtx), 1);
- // Make the refresh triggered by submitting the task return an empty result.
- auto result = stdx::async(stdx::launch::async,
- [this, uuid] { respondToMetadataRefreshRequestsWithError(); });
+ // Make the refresh triggered by submitting the task return an empty result when loading the
+ // database.
+ _mockCatalogCacheLoader->setDatabaseRefreshReturnValue(
+ Status(ErrorCodes::NamespaceNotFound, "dummy errmsg"));
auto submitTaskFuture = migrationutil::submitRangeDeletionTask(opCtx, deletionTask);
@@ -404,12 +537,15 @@ TEST_F(SubmitRangeDeletionTaskTest,
TEST_F(SubmitRangeDeletionTaskTest, SucceedsIfFilteringMetadataUUIDMatchesTaskUUID) {
auto opCtx = operationContext();
- const auto uuid = UUID::gen();
- auto deletionTask = createDeletionTask(kNss, uuid, 0, 10);
+ auto deletionTask = createDeletionTask(kNss, kDefaultUUID, 0, 10);
// Force a metadata refresh with the task's UUID before the task is submitted.
- auto result =
- stdx::async(stdx::launch::async, [this, uuid] { respondToMetadataRefreshRequests(uuid); });
+ auto coll = makeCollectionType(kDefaultUUID, kEpoch);
+ _mockCatalogCacheLoader->setDatabaseRefreshReturnValue(kDefaultDatabaseType);
+ _mockCatalogCacheLoader->setCollectionRefreshReturnValue(coll);
+ _mockCatalogCacheLoader->setChunkRefreshReturnValue(
+ makeChangedChunks(ChunkVersion(1, 0, kEpoch)));
+ _mockCatalogClient->setCollections({coll});
forceShardFilteringMetadataRefresh(opCtx, kNss, true);
// The task should have been submitted successfully.
@@ -422,12 +558,15 @@ TEST_F(
SucceedsIfFilteringMetadataInitiallyUnknownButFilteringMetadataUUIDMatchesTaskUUIDAfterRefresh) {
auto opCtx = operationContext();
- const auto uuid = UUID::gen();
- auto deletionTask = createDeletionTask(kNss, uuid, 0, 10);
+ auto deletionTask = createDeletionTask(kNss, kDefaultUUID, 0, 10);
// Make the refresh triggered by submitting the task return a UUID that matches the task's UUID.
- auto result =
- stdx::async(stdx::launch::async, [this, uuid] { respondToMetadataRefreshRequests(uuid); });
+ auto coll = makeCollectionType(kDefaultUUID, kEpoch);
+ _mockCatalogCacheLoader->setDatabaseRefreshReturnValue(kDefaultDatabaseType);
+ _mockCatalogCacheLoader->setCollectionRefreshReturnValue(coll);
+ _mockCatalogCacheLoader->setChunkRefreshReturnValue(
+ makeChangedChunks(ChunkVersion(1, 0, kEpoch)));
+ _mockCatalogClient->setCollections({coll});
// The task should have been submitted successfully.
auto submitTaskFuture = migrationutil::submitRangeDeletionTask(opCtx, deletionTask);
@@ -440,16 +579,24 @@ TEST_F(SubmitRangeDeletionTaskTest,
// Force a metadata refresh with an arbitrary UUID so that the node's filtering metadata is
// stale when the task is submitted.
- auto result1 = stdx::async(stdx::launch::async, [this] { respondToMetadataRefreshRequests(); });
+ const auto staleUUID = UUID::gen();
+ const auto staleEpoch = OID::gen();
+ auto staleColl = makeCollectionType(staleUUID, staleEpoch);
+ _mockCatalogCacheLoader->setDatabaseRefreshReturnValue(kDefaultDatabaseType);
+ _mockCatalogCacheLoader->setCollectionRefreshReturnValue(staleColl);
+ _mockCatalogCacheLoader->setChunkRefreshReturnValue(
+ makeChangedChunks(ChunkVersion(1, 0, staleEpoch)));
+ _mockCatalogClient->setCollections({staleColl});
forceShardFilteringMetadataRefresh(opCtx, kNss, true);
- const auto uuid = UUID::gen();
- auto deletionTask = createDeletionTask(kNss, uuid, 0, 10);
+ auto deletionTask = createDeletionTask(kNss, kDefaultUUID, 0, 10);
// Make the refresh triggered by submitting the task return a UUID that matches the task's UUID.
- auto result2 = stdx::async(stdx::launch::async, [this, uuid] {
- respondToMetadataRefreshRequests(uuid, true /* incrementalRefresh */);
- });
+ auto matchingColl = makeCollectionType(kDefaultUUID, kEpoch);
+ _mockCatalogCacheLoader->setCollectionRefreshReturnValue(matchingColl);
+ _mockCatalogCacheLoader->setChunkRefreshReturnValue(
+ makeChangedChunks(ChunkVersion(10, 0, kEpoch)));
+ _mockCatalogClient->setCollections({matchingColl});
// The task should have been submitted successfully.
auto submitTaskFuture = migrationutil::submitRangeDeletionTask(opCtx, deletionTask);
@@ -460,16 +607,20 @@ TEST_F(SubmitRangeDeletionTaskTest,
FailsAndDeletesTaskIfFilteringMetadataUUIDDifferentFromTaskUUIDEvenAfterRefresh) {
auto opCtx = operationContext();
- const auto uuid = UUID::gen();
- auto deletionTask = createDeletionTask(kNss, uuid, 0, 10);
+ auto deletionTask = createDeletionTask(kNss, kDefaultUUID, 0, 10);
PersistentTaskStore<RangeDeletionTask> store(opCtx, NamespaceString::kRangeDeletionNamespace);
store.add(opCtx, deletionTask);
ASSERT_EQ(store.count(opCtx), 1);
// Make the refresh triggered by submitting the task return an arbitrary UUID.
- auto result2 =
- stdx::async(stdx::launch::async, [this, uuid] { respondToMetadataRefreshRequests(); });
+ const auto otherEpoch = OID::gen();
+ auto otherColl = makeCollectionType(UUID::gen(), otherEpoch);
+ _mockCatalogCacheLoader->setDatabaseRefreshReturnValue(kDefaultDatabaseType);
+ _mockCatalogCacheLoader->setCollectionRefreshReturnValue(otherColl);
+ _mockCatalogCacheLoader->setChunkRefreshReturnValue(
+ makeChangedChunks(ChunkVersion(1, 0, otherEpoch)));
+ _mockCatalogClient->setCollections({otherColl});
// The task should not have been submitted, and the task's entry should have been removed from
// the persistent store.