summaryrefslogtreecommitdiff
path: root/src/mongo/s/catalog/sharding_catalog_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/s/catalog/sharding_catalog_test.cpp')
-rw-r--r--src/mongo/s/catalog/sharding_catalog_test.cpp2527
1 files changed, 2527 insertions, 0 deletions
diff --git a/src/mongo/s/catalog/sharding_catalog_test.cpp b/src/mongo/s/catalog/sharding_catalog_test.cpp
new file mode 100644
index 00000000000..384b9239276
--- /dev/null
+++ b/src/mongo/s/catalog/sharding_catalog_test.cpp
@@ -0,0 +1,2527 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include <pcrecpp.h>
+
+#include "mongo/bson/json.h"
+#include "mongo/client/remote_command_targeter_mock.h"
+#include "mongo/db/commands.h"
+#include "mongo/db/query/query_request.h"
+#include "mongo/db/repl/read_concern_args.h"
+#include "mongo/executor/network_interface_mock.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/rpc/metadata/repl_set_metadata.h"
+#include "mongo/rpc/metadata/server_selection_metadata.h"
+#include "mongo/s/catalog/dist_lock_manager_mock.h"
+#include "mongo/s/catalog/sharding_catalog_client_impl.h"
+#include "mongo/s/catalog/sharding_catalog_test_fixture.h"
+#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/catalog/type_collection.h"
+#include "mongo/s/catalog/type_database.h"
+#include "mongo/s/catalog/type_shard.h"
+#include "mongo/s/catalog/type_tags.h"
+#include "mongo/s/chunk_version.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/write_ops/batched_command_response.h"
+#include "mongo/s/write_ops/batched_insert_request.h"
+#include "mongo/s/write_ops/batched_update_request.h"
+#include "mongo/stdx/future.h"
+#include "mongo/util/log.h"
+#include "mongo/util/time_support.h"
+
+namespace mongo {
+namespace {
+
+using executor::NetworkInterfaceMock;
+using executor::RemoteCommandRequest;
+using executor::RemoteCommandResponse;
+using executor::TaskExecutor;
+using rpc::ReplSetMetadata;
+using repl::OpTime;
+using std::string;
+using std::vector;
+using unittest::assertGet;
+
+using ShardingCatalogClientTest = ShardingCatalogTestFixture;
+
+const int kMaxCommandRetry = 3;
+
+const BSONObj kReplSecondaryOkMetadata{[] {
+ BSONObjBuilder o;
+ o.appendElements(rpc::ServerSelectionMetadata(true, boost::none).toBSON());
+ o.append(rpc::kReplSetMetadataFieldName, 1);
+ return o.obj();
+}()};
+
+TEST_F(ShardingCatalogClientTest, GetCollectionExisting) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ CollectionType expectedColl;
+ expectedColl.setNs(NamespaceString("TestDB.TestNS"));
+ expectedColl.setKeyPattern(BSON("KeyName" << 1));
+ expectedColl.setUpdatedAt(Date_t());
+ expectedColl.setEpoch(OID::gen());
+
+ const OpTime newOpTime(Timestamp(7, 6), 5);
+
+ auto future = launchAsync([this, &expectedColl] {
+ return assertGet(
+ catalogClient()->getCollection(operationContext(), expectedColl.getNs().ns()));
+ });
+
+ onFindWithMetadataCommand(
+ [this, &expectedColl, newOpTime](const RemoteCommandRequest& request) {
+
+ ASSERT_BSONOBJ_EQ(kReplSecondaryOkMetadata, request.metadata);
+
+ const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
+ ASSERT_EQ(nss.ns(), CollectionType::ConfigNS);
+
+ auto query = assertGet(QueryRequest::makeFromFindCommand(nss, request.cmdObj, false));
+
+ // Ensure the query is correct
+ ASSERT_EQ(query->ns(), CollectionType::ConfigNS);
+ ASSERT_BSONOBJ_EQ(query->getFilter(),
+ BSON(CollectionType::fullNs(expectedColl.getNs().ns())));
+ ASSERT_BSONOBJ_EQ(query->getSort(), BSONObj());
+ ASSERT_EQ(query->getLimit().get(), 1);
+
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);
+
+ ReplSetMetadata metadata(10, newOpTime, newOpTime, 100, OID(), 30, -1);
+ BSONObjBuilder builder;
+ metadata.writeToMetadata(&builder);
+
+ return std::make_tuple(vector<BSONObj>{expectedColl.toBSON()}, builder.obj());
+ });
+
+ // Now wait for the getCollection call to return
+ const auto collOpTimePair = future.timed_get(kFutureTimeout);
+ ASSERT_EQ(newOpTime, collOpTimePair.opTime);
+ ASSERT_BSONOBJ_EQ(expectedColl.toBSON(), collOpTimePair.value.toBSON());
+}
+
+TEST_F(ShardingCatalogClientTest, GetCollectionNotExisting) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ auto future = launchAsync([this] {
+ auto status = catalogClient()->getCollection(operationContext(), "NonExistent");
+ ASSERT_EQUALS(status.getStatus(), ErrorCodes::NamespaceNotFound);
+ });
+
+ onFindCommand([](const RemoteCommandRequest& request) { return vector<BSONObj>{}; });
+
+ // Now wait for the getCollection call to return
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ShardingCatalogClientTest, GetDatabaseInvalidName) {
+ auto status = catalogClient()->getDatabase(operationContext(), "b.c").getStatus();
+ ASSERT_EQ(ErrorCodes::InvalidNamespace, status.code());
+ ASSERT_FALSE(status.reason().empty());
+}
+
+TEST_F(ShardingCatalogClientTest, GetDatabaseExisting) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ DatabaseType expectedDb;
+ expectedDb.setName("bigdata");
+ expectedDb.setPrimary(ShardId("shard0000"));
+ expectedDb.setSharded(true);
+
+ const OpTime newOpTime(Timestamp(7, 6), 5);
+
+ auto future = launchAsync([this, &expectedDb] {
+ return assertGet(catalogClient()->getDatabase(operationContext(), expectedDb.getName()));
+ });
+
+ onFindWithMetadataCommand([this, &expectedDb, newOpTime](const RemoteCommandRequest& request) {
+ const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
+ ASSERT_EQ(nss.ns(), DatabaseType::ConfigNS);
+
+ ASSERT_BSONOBJ_EQ(kReplSecondaryOkMetadata, request.metadata);
+
+ auto query = assertGet(QueryRequest::makeFromFindCommand(nss, request.cmdObj, false));
+
+ ASSERT_EQ(query->ns(), DatabaseType::ConfigNS);
+ ASSERT_BSONOBJ_EQ(query->getFilter(), BSON(DatabaseType::name(expectedDb.getName())));
+ ASSERT_BSONOBJ_EQ(query->getSort(), BSONObj());
+ ASSERT_EQ(query->getLimit().get(), 1);
+
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);
+
+ ReplSetMetadata metadata(10, newOpTime, newOpTime, 100, OID(), 30, -1);
+ BSONObjBuilder builder;
+ metadata.writeToMetadata(&builder);
+
+ return std::make_tuple(vector<BSONObj>{expectedDb.toBSON()}, builder.obj());
+ });
+
+ const auto dbOpTimePair = future.timed_get(kFutureTimeout);
+ ASSERT_EQ(newOpTime, dbOpTimePair.opTime);
+ ASSERT_BSONOBJ_EQ(expectedDb.toBSON(), dbOpTimePair.value.toBSON());
+}
+
+TEST_F(ShardingCatalogClientTest, GetDatabaseStaleSecondaryRetrySuccess) {
+ HostAndPort firstHost{"TestHost1"};
+ HostAndPort secondHost{"TestHost2"};
+ configTargeter()->setFindHostReturnValue(firstHost);
+
+ DatabaseType expectedDb;
+ expectedDb.setName("bigdata");
+ expectedDb.setPrimary(ShardId("shard0000"));
+ expectedDb.setSharded(true);
+
+ auto future = launchAsync([this, &expectedDb] {
+ return assertGet(catalogClient()->getDatabase(operationContext(), expectedDb.getName()));
+ });
+
+ // Return empty result set as if the database wasn't found
+ onFindCommand([this, &firstHost, &secondHost](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(firstHost, request.target);
+ configTargeter()->setFindHostReturnValue(secondHost);
+ return vector<BSONObj>{};
+ });
+
+ // Make sure we retarget and retry.
+ onFindCommand([this, &expectedDb, &secondHost](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(secondHost, request.target);
+ return vector<BSONObj>{expectedDb.toBSON()};
+ });
+
+ const auto dbOpTimePair = future.timed_get(kFutureTimeout);
+ ASSERT_BSONOBJ_EQ(expectedDb.toBSON(), dbOpTimePair.value.toBSON());
+}
+
+TEST_F(ShardingCatalogClientTest, GetDatabaseStaleSecondaryRetryNoPrimary) {
+ HostAndPort testHost{"TestHost1"};
+ configTargeter()->setFindHostReturnValue(testHost);
+
+ auto future = launchAsync([this] {
+ auto dbResult = catalogClient()->getDatabase(operationContext(), "NonExistent");
+ ASSERT_EQ(dbResult.getStatus(), ErrorCodes::NotMaster);
+ });
+
+ // Return empty result set as if the database wasn't found
+ onFindCommand([this, &testHost](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(testHost, request.target);
+ // Make it so when it attempts to retarget and retry it will get a NotMaster error.
+ configTargeter()->setFindHostReturnValue(Status(ErrorCodes::NotMaster, "no config master"));
+ return vector<BSONObj>{};
+ });
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ShardingCatalogClientTest, GetDatabaseNotExisting) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ auto future = launchAsync([this] {
+ auto dbResult = catalogClient()->getDatabase(operationContext(), "NonExistent");
+ ASSERT_EQ(dbResult.getStatus(), ErrorCodes::NamespaceNotFound);
+ });
+
+ onFindCommand([](const RemoteCommandRequest& request) { return vector<BSONObj>{}; });
+ onFindCommand([](const RemoteCommandRequest& request) { return vector<BSONObj>{}; });
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ShardingCatalogClientTest, UpdateCollection) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ CollectionType collection;
+ collection.setNs(NamespaceString("db.coll"));
+ collection.setUpdatedAt(network()->now());
+ collection.setUnique(true);
+ collection.setEpoch(OID::gen());
+ collection.setKeyPattern(KeyPattern(BSON("_id" << 1)));
+
+ auto future = launchAsync([this, collection] {
+ auto status = catalogClient()->updateCollection(
+ operationContext(), collection.getNs().toString(), collection);
+ ASSERT_OK(status);
+ });
+
+ expectUpdateCollection(HostAndPort("TestHost1"), collection);
+
+ // Now wait for the updateCollection call to return
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ShardingCatalogClientTest, UpdateCollectionNotMaster) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ CollectionType collection;
+ collection.setNs(NamespaceString("db.coll"));
+ collection.setUpdatedAt(network()->now());
+ collection.setUnique(true);
+ collection.setEpoch(OID::gen());
+ collection.setKeyPattern(KeyPattern(BSON("_id" << 1)));
+
+ auto future = launchAsync([this, collection] {
+ auto status = catalogClient()->updateCollection(
+ operationContext(), collection.getNs().toString(), collection);
+ ASSERT_EQUALS(ErrorCodes::NotMaster, status);
+ });
+
+ for (int i = 0; i < 3; ++i) {
+ onCommand([](const RemoteCommandRequest& request) {
+ BatchedCommandResponse response;
+ response.setOk(false);
+ response.setErrCode(ErrorCodes::NotMaster);
+ response.setErrMessage("not master");
+
+ return response.toBSON();
+ });
+ }
+
+ // Now wait for the updateCollection call to return
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ShardingCatalogClientTest, UpdateCollectionNotMasterFromTargeter) {
+ configTargeter()->setFindHostReturnValue(Status(ErrorCodes::NotMaster, "not master"));
+
+ CollectionType collection;
+ collection.setNs(NamespaceString("db.coll"));
+ collection.setUpdatedAt(network()->now());
+ collection.setUnique(true);
+ collection.setEpoch(OID::gen());
+ collection.setKeyPattern(KeyPattern(BSON("_id" << 1)));
+
+ auto future = launchAsync([this, collection] {
+ auto status = catalogClient()->updateCollection(
+ operationContext(), collection.getNs().toString(), collection);
+ ASSERT_EQUALS(ErrorCodes::NotMaster, status);
+ });
+
+ // Now wait for the updateCollection call to return
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ShardingCatalogClientTest, UpdateCollectionNotMasterRetrySuccess) {
+ HostAndPort host1("TestHost1");
+ HostAndPort host2("TestHost2");
+ configTargeter()->setFindHostReturnValue(host1);
+
+ CollectionType collection;
+ collection.setNs(NamespaceString("db.coll"));
+ collection.setUpdatedAt(network()->now());
+ collection.setUnique(true);
+ collection.setEpoch(OID::gen());
+ collection.setKeyPattern(KeyPattern(BSON("_id" << 1)));
+
+ auto future = launchAsync([this, collection] {
+ auto status = catalogClient()->updateCollection(
+ operationContext(), collection.getNs().toString(), collection);
+ ASSERT_OK(status);
+ });
+
+ onCommand([&](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(host1, request.target);
+
+ BatchedCommandResponse response;
+ response.setOk(false);
+ response.setErrCode(ErrorCodes::NotMaster);
+ response.setErrMessage("not master");
+
+ // Ensure that when the catalog manager tries to retarget after getting the
+ // NotMaster response, it will get back a new target.
+ configTargeter()->setFindHostReturnValue(host2);
+ return response.toBSON();
+ });
+
+ expectUpdateCollection(HostAndPort(host2), collection);
+
+ // Now wait for the updateCollection call to return
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ShardingCatalogClientTest, GetAllShardsValid) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ ShardType s1;
+ s1.setName("shard0000");
+ s1.setHost("ShardHost");
+ s1.setDraining(false);
+ s1.setMaxSizeMB(50);
+ s1.setTags({"tag1", "tag2", "tag3"});
+
+ ShardType s2;
+ s2.setName("shard0001");
+ s2.setHost("ShardHost");
+
+ ShardType s3;
+ s3.setName("shard0002");
+ s3.setHost("ShardHost");
+ s3.setMaxSizeMB(65);
+
+ const vector<ShardType> expectedShardsList = {s1, s2, s3};
+
+ auto future = launchAsync([this] {
+ auto shards = assertGet(catalogClient()->getAllShards(
+ operationContext(), repl::ReadConcernLevel::kMajorityReadConcern));
+ return shards.value;
+ });
+
+ onFindCommand([this, &s1, &s2, &s3](const RemoteCommandRequest& request) {
+ ASSERT_BSONOBJ_EQ(kReplSecondaryOkMetadata, request.metadata);
+
+ const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
+ ASSERT_EQ(nss.ns(), ShardType::ConfigNS);
+
+ auto query = assertGet(QueryRequest::makeFromFindCommand(nss, request.cmdObj, false));
+
+ ASSERT_EQ(query->ns(), ShardType::ConfigNS);
+ ASSERT_BSONOBJ_EQ(query->getFilter(), BSONObj());
+ ASSERT_BSONOBJ_EQ(query->getSort(), BSONObj());
+ ASSERT_FALSE(query->getLimit().is_initialized());
+
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);
+
+ return vector<BSONObj>{s1.toBSON(), s2.toBSON(), s3.toBSON()};
+ });
+
+ const vector<ShardType> actualShardsList = future.timed_get(kFutureTimeout);
+ ASSERT_EQ(actualShardsList.size(), expectedShardsList.size());
+
+ for (size_t i = 0; i < actualShardsList.size(); ++i) {
+ ASSERT_BSONOBJ_EQ(actualShardsList[i].toBSON(), expectedShardsList[i].toBSON());
+ }
+}
+
+TEST_F(ShardingCatalogClientTest, GetAllShardsWithInvalidShard) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ auto future = launchAsync([this] {
+ auto status = catalogClient()->getAllShards(operationContext(),
+ repl::ReadConcernLevel::kMajorityReadConcern);
+
+ ASSERT_EQ(ErrorCodes::FailedToParse, status.getStatus());
+ });
+
+ onFindCommand([](const RemoteCommandRequest& request) {
+ // Valid ShardType
+ ShardType s1;
+ s1.setName("shard0001");
+ s1.setHost("ShardHost");
+
+ return vector<BSONObj>{
+ s1.toBSON(),
+ BSONObj() // empty document is invalid
+ };
+ });
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ShardingCatalogClientTest, GetChunksForNSWithSortAndLimit) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ OID oid = OID::gen();
+
+ ChunkType chunkA;
+ chunkA.setNS("TestDB.TestColl");
+ chunkA.setMin(BSON("a" << 1));
+ chunkA.setMax(BSON("a" << 100));
+ chunkA.setVersion({1, 2, oid});
+ chunkA.setShard(ShardId("shard0000"));
+
+ ChunkType chunkB;
+ chunkB.setNS("TestDB.TestColl");
+ chunkB.setMin(BSON("a" << 100));
+ chunkB.setMax(BSON("a" << 200));
+ chunkB.setVersion({3, 4, oid});
+ chunkB.setShard(ShardId("shard0001"));
+
+ ChunkVersion queryChunkVersion({1, 2, oid});
+
+ const BSONObj chunksQuery(
+ BSON(ChunkType::ns("TestDB.TestColl")
+ << ChunkType::DEPRECATED_lastmod()
+ << BSON("$gte" << static_cast<long long>(queryChunkVersion.toLong()))));
+
+ const OpTime newOpTime(Timestamp(7, 6), 5);
+
+ auto future = launchAsync([this, &chunksQuery, newOpTime] {
+ vector<ChunkType> chunks;
+ OpTime opTime;
+
+ ASSERT_OK(catalogClient()->getChunks(operationContext(),
+ chunksQuery,
+ BSON(ChunkType::DEPRECATED_lastmod() << -1),
+ 1,
+ &chunks,
+ &opTime,
+ repl::ReadConcernLevel::kMajorityReadConcern));
+ ASSERT_EQ(2U, chunks.size());
+ ASSERT_EQ(newOpTime, opTime);
+
+ return chunks;
+ });
+
+ onFindWithMetadataCommand([this, &chunksQuery, chunkA, chunkB, newOpTime](
+ const RemoteCommandRequest& request) {
+ ASSERT_BSONOBJ_EQ(kReplSecondaryOkMetadata, request.metadata);
+
+ const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
+ ASSERT_EQ(nss.ns(), ChunkType::ConfigNS);
+
+ auto query = assertGet(QueryRequest::makeFromFindCommand(nss, request.cmdObj, false));
+
+ ASSERT_EQ(query->ns(), ChunkType::ConfigNS);
+ ASSERT_BSONOBJ_EQ(query->getFilter(), chunksQuery);
+ ASSERT_BSONOBJ_EQ(query->getSort(), BSON(ChunkType::DEPRECATED_lastmod() << -1));
+ ASSERT_EQ(query->getLimit().get(), 1);
+
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);
+
+ ReplSetMetadata metadata(10, newOpTime, newOpTime, 100, OID(), 30, -1);
+ BSONObjBuilder builder;
+ metadata.writeToMetadata(&builder);
+
+ return std::make_tuple(vector<BSONObj>{chunkA.toBSON(), chunkB.toBSON()}, builder.obj());
+ });
+
+ const auto& chunks = future.timed_get(kFutureTimeout);
+ ASSERT_BSONOBJ_EQ(chunkA.toBSON(), chunks[0].toBSON());
+ ASSERT_BSONOBJ_EQ(chunkB.toBSON(), chunks[1].toBSON());
+}
+
+TEST_F(ShardingCatalogClientTest, GetChunksForNSNoSortNoLimit) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ ChunkVersion queryChunkVersion({1, 2, OID::gen()});
+
+ const BSONObj chunksQuery(
+ BSON(ChunkType::ns("TestDB.TestColl")
+ << ChunkType::DEPRECATED_lastmod()
+ << BSON("$gte" << static_cast<long long>(queryChunkVersion.toLong()))));
+
+ auto future = launchAsync([this, &chunksQuery] {
+ vector<ChunkType> chunks;
+
+ ASSERT_OK(catalogClient()->getChunks(operationContext(),
+ chunksQuery,
+ BSONObj(),
+ boost::none,
+ &chunks,
+ nullptr,
+ repl::ReadConcernLevel::kMajorityReadConcern));
+ ASSERT_EQ(0U, chunks.size());
+
+ return chunks;
+ });
+
+ onFindCommand([this, &chunksQuery](const RemoteCommandRequest& request) {
+ ASSERT_BSONOBJ_EQ(kReplSecondaryOkMetadata, request.metadata);
+
+ const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
+ ASSERT_EQ(nss.ns(), ChunkType::ConfigNS);
+
+ auto query = assertGet(QueryRequest::makeFromFindCommand(nss, request.cmdObj, false));
+
+ ASSERT_EQ(query->ns(), ChunkType::ConfigNS);
+ ASSERT_BSONOBJ_EQ(query->getFilter(), chunksQuery);
+ ASSERT_BSONOBJ_EQ(query->getSort(), BSONObj());
+ ASSERT_FALSE(query->getLimit().is_initialized());
+
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);
+
+ return vector<BSONObj>{};
+ });
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ShardingCatalogClientTest, GetChunksForNSInvalidChunk) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ ChunkVersion queryChunkVersion({1, 2, OID::gen()});
+
+ const BSONObj chunksQuery(
+ BSON(ChunkType::ns("TestDB.TestColl")
+ << ChunkType::DEPRECATED_lastmod()
+ << BSON("$gte" << static_cast<long long>(queryChunkVersion.toLong()))));
+
+ auto future = launchAsync([this, &chunksQuery] {
+ vector<ChunkType> chunks;
+ Status status = catalogClient()->getChunks(operationContext(),
+ chunksQuery,
+ BSONObj(),
+ boost::none,
+ &chunks,
+ nullptr,
+ repl::ReadConcernLevel::kMajorityReadConcern);
+
+ ASSERT_EQUALS(ErrorCodes::FailedToParse, status);
+ ASSERT_EQ(0U, chunks.size());
+ });
+
+ onFindCommand([&chunksQuery](const RemoteCommandRequest& request) {
+ ChunkType chunkA;
+ chunkA.setNS("TestDB.TestColl");
+ chunkA.setMin(BSON("a" << 1));
+ chunkA.setMax(BSON("a" << 100));
+ chunkA.setVersion({1, 2, OID::gen()});
+ chunkA.setShard(ShardId("shard0000"));
+
+ ChunkType chunkB;
+ chunkB.setNS("TestDB.TestColl");
+ chunkB.setMin(BSON("a" << 100));
+ chunkB.setMax(BSON("a" << 200));
+ chunkB.setVersion({3, 4, OID::gen()});
+ // Missing shard id
+
+ return vector<BSONObj>{chunkA.toBSON(), chunkB.toBSON()};
+ });
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ShardingCatalogClientTest, RunUserManagementReadCommand) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ auto future = launchAsync([this] {
+ BSONObjBuilder responseBuilder;
+ bool ok = catalogClient()->runUserManagementReadCommand(
+ operationContext(), "test", BSON("usersInfo" << 1), &responseBuilder);
+ ASSERT_TRUE(ok);
+
+ BSONObj response = responseBuilder.obj();
+ ASSERT_TRUE(response["ok"].trueValue());
+ auto users = response["users"].Array();
+ ASSERT_EQUALS(0U, users.size());
+ });
+
+ onCommand([](const RemoteCommandRequest& request) {
+ ASSERT_BSONOBJ_EQ(kReplSecondaryOkMetadata, request.metadata);
+
+ ASSERT_EQUALS("test", request.dbname);
+ ASSERT_BSONOBJ_EQ(BSON("usersInfo" << 1 << "maxTimeMS" << 30000), request.cmdObj);
+
+ return BSON("ok" << 1 << "users" << BSONArrayBuilder().arr());
+ });
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ShardingCatalogClientTest, RunUserManagementReadCommandUnsatisfiedReadPref) {
+ configTargeter()->setFindHostReturnValue(
+ Status(ErrorCodes::FailedToSatisfyReadPreference, "no nodes up"));
+
+ BSONObjBuilder responseBuilder;
+ bool ok = catalogClient()->runUserManagementReadCommand(
+ operationContext(), "test", BSON("usersInfo" << 1), &responseBuilder);
+ ASSERT_FALSE(ok);
+
+ Status commandStatus = getStatusFromCommandResult(responseBuilder.obj());
+ ASSERT_EQUALS(ErrorCodes::FailedToSatisfyReadPreference, commandStatus);
+}
+
+TEST_F(ShardingCatalogClientTest, RunUserManagementWriteCommandSuccess) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ auto future = launchAsync([this] {
+ BSONObjBuilder responseBuilder;
+ bool ok = catalogClient()->runUserManagementWriteCommand(operationContext(),
+ "dropUser",
+ "test",
+ BSON("dropUser"
+ << "test"),
+ &responseBuilder);
+ ASSERT_FALSE(ok);
+
+ Status commandStatus = getStatusFromCommandResult(responseBuilder.obj());
+ ASSERT_EQUALS(ErrorCodes::UserNotFound, commandStatus);
+ });
+
+ onCommand([](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS("test", request.dbname);
+ // Since no write concern was sent we will add w:majority
+ ASSERT_BSONOBJ_EQ(BSON("dropUser"
+ << "test"
+ << "writeConcern"
+ << BSON("w"
+ << "majority"
+ << "wtimeout"
+ << 0)
+ << "maxTimeMS"
+ << 30000),
+ request.cmdObj);
+
+ ASSERT_BSONOBJ_EQ(BSON(rpc::kReplSetMetadataFieldName << 1), request.metadata);
+
+ BSONObjBuilder responseBuilder;
+ Command::appendCommandStatus(responseBuilder,
+ Status(ErrorCodes::UserNotFound, "User test@test not found"));
+ return responseBuilder.obj();
+ });
+
+ // Now wait for the runUserManagementWriteCommand call to return
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ShardingCatalogClientTest, RunUserManagementWriteCommandInvalidWriteConcern) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ BSONObjBuilder responseBuilder;
+ bool ok = catalogClient()->runUserManagementWriteCommand(operationContext(),
+ "dropUser",
+ "test",
+ BSON("dropUser"
+ << "test"
+ << "writeConcern"
+ << BSON("w" << 2)),
+ &responseBuilder);
+ ASSERT_FALSE(ok);
+
+ Status commandStatus = getStatusFromCommandResult(responseBuilder.obj());
+ ASSERT_EQUALS(ErrorCodes::InvalidOptions, commandStatus);
+ ASSERT_STRING_CONTAINS(commandStatus.reason(), "Invalid replication write concern");
+}
+
+TEST_F(ShardingCatalogClientTest, RunUserManagementWriteCommandRewriteWriteConcern) {
+ // Tests that if you send a w:1 write concern it gets replaced with w:majority
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ distLock()->expectLock(
+ [](StringData name, StringData whyMessage, Milliseconds waitFor) {
+ ASSERT_EQUALS("authorizationData", name);
+ ASSERT_EQUALS("dropUser", whyMessage);
+ },
+ Status::OK());
+
+ auto future = launchAsync([this] {
+ BSONObjBuilder responseBuilder;
+ bool ok = catalogClient()->runUserManagementWriteCommand(operationContext(),
+ "dropUser",
+ "test",
+ BSON("dropUser"
+ << "test"
+ << "writeConcern"
+ << BSON("w" << 1 << "wtimeout"
+ << 30)),
+ &responseBuilder);
+ ASSERT_FALSE(ok);
+
+ Status commandStatus = getStatusFromCommandResult(responseBuilder.obj());
+ ASSERT_EQUALS(ErrorCodes::UserNotFound, commandStatus);
+ });
+
+ onCommand([](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS("test", request.dbname);
+ ASSERT_BSONOBJ_EQ(BSON("dropUser"
+ << "test"
+ << "writeConcern"
+ << BSON("w"
+ << "majority"
+ << "wtimeout"
+ << 30)
+ << "maxTimeMS"
+ << 30000),
+ request.cmdObj);
+
+ ASSERT_BSONOBJ_EQ(BSON(rpc::kReplSetMetadataFieldName << 1), request.metadata);
+
+ BSONObjBuilder responseBuilder;
+ Command::appendCommandStatus(responseBuilder,
+ Status(ErrorCodes::UserNotFound, "User test@test not found"));
+ return responseBuilder.obj();
+ });
+
+ // Now wait for the runUserManagementWriteCommand call to return
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ShardingCatalogClientTest, RunUserManagementWriteCommandNotMaster) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ auto future = launchAsync([this] {
+ BSONObjBuilder responseBuilder;
+ bool ok = catalogClient()->runUserManagementWriteCommand(operationContext(),
+ "dropUser",
+ "test",
+ BSON("dropUser"
+ << "test"),
+ &responseBuilder);
+ ASSERT_FALSE(ok);
+
+ Status commandStatus = getStatusFromCommandResult(responseBuilder.obj());
+ ASSERT_EQUALS(ErrorCodes::NotMaster, commandStatus);
+ });
+
+ for (int i = 0; i < 3; ++i) {
+ onCommand([](const RemoteCommandRequest& request) {
+ BSONObjBuilder responseBuilder;
+ Command::appendCommandStatus(responseBuilder,
+ Status(ErrorCodes::NotMaster, "not master"));
+ return responseBuilder.obj();
+ });
+ }
+
+ // Now wait for the runUserManagementWriteCommand call to return
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ShardingCatalogClientTest, RunUserManagementWriteCommandNotMasterRetrySuccess) {
+ HostAndPort host1("TestHost1");
+ HostAndPort host2("TestHost2");
+
+ configTargeter()->setFindHostReturnValue(host1);
+
+ auto future = launchAsync([this] {
+ BSONObjBuilder responseBuilder;
+ bool ok = catalogClient()->runUserManagementWriteCommand(operationContext(),
+ "dropUser",
+ "test",
+ BSON("dropUser"
+ << "test"),
+ &responseBuilder);
+ ASSERT_TRUE(ok);
+
+ Status commandStatus = getStatusFromCommandResult(responseBuilder.obj());
+ ASSERT_OK(commandStatus);
+ });
+
+ onCommand([&](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(host1, request.target);
+
+ BSONObjBuilder responseBuilder;
+ Command::appendCommandStatus(responseBuilder, Status(ErrorCodes::NotMaster, "not master"));
+
+ // Ensure that when the catalog manager tries to retarget after getting the
+ // NotMaster response, it will get back a new target.
+ configTargeter()->setFindHostReturnValue(host2);
+ return responseBuilder.obj();
+ });
+
+ onCommand([host2](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(host2, request.target);
+ ASSERT_EQUALS("test", request.dbname);
+ // Since no write concern was sent we will add w:majority
+ ASSERT_BSONOBJ_EQ(BSON("dropUser"
+ << "test"
+ << "writeConcern"
+ << BSON("w"
+ << "majority"
+ << "wtimeout"
+ << 0)
+ << "maxTimeMS"
+ << 30000),
+ request.cmdObj);
+
+ ASSERT_BSONOBJ_EQ(BSON(rpc::kReplSetMetadataFieldName << 1), request.metadata);
+
+ return BSON("ok" << 1);
+ });
+
+ // Now wait for the runUserManagementWriteCommand call to return
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ShardingCatalogClientTest, GetCollectionsValidResultsNoDb) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ CollectionType coll1;
+ coll1.setNs(NamespaceString{"test.system.indexes"});
+ coll1.setUpdatedAt(network()->now());
+ coll1.setUnique(true);
+ coll1.setEpoch(OID::gen());
+ coll1.setKeyPattern(KeyPattern{BSON("_id" << 1)});
+ ASSERT_OK(coll1.validate());
+
+ CollectionType coll2;
+ coll2.setNs(NamespaceString{"test.coll1"});
+ coll2.setUpdatedAt(network()->now());
+ coll2.setUnique(false);
+ coll2.setEpoch(OID::gen());
+ coll2.setKeyPattern(KeyPattern{BSON("_id" << 1)});
+ ASSERT_OK(coll2.validate());
+
+ CollectionType coll3;
+ coll3.setNs(NamespaceString{"anotherdb.coll1"});
+ coll3.setUpdatedAt(network()->now());
+ coll3.setUnique(false);
+ coll3.setEpoch(OID::gen());
+ coll3.setKeyPattern(KeyPattern{BSON("_id" << 1)});
+ ASSERT_OK(coll3.validate());
+
+ const OpTime newOpTime(Timestamp(7, 6), 5);
+
+ auto future = launchAsync([this, newOpTime] {
+ vector<CollectionType> collections;
+
+ OpTime opTime;
+ const auto status =
+ catalogClient()->getCollections(operationContext(), nullptr, &collections, &opTime);
+
+ ASSERT_OK(status);
+ ASSERT_EQ(newOpTime, opTime);
+
+ return collections;
+ });
+
+ onFindWithMetadataCommand(
+ [this, coll1, coll2, coll3, newOpTime](const RemoteCommandRequest& request) {
+ ASSERT_BSONOBJ_EQ(kReplSecondaryOkMetadata, request.metadata);
+
+ const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
+ ASSERT_EQ(nss.ns(), CollectionType::ConfigNS);
+
+ auto query = assertGet(QueryRequest::makeFromFindCommand(nss, request.cmdObj, false));
+
+ ASSERT_EQ(query->ns(), CollectionType::ConfigNS);
+ ASSERT_BSONOBJ_EQ(query->getFilter(), BSONObj());
+ ASSERT_BSONOBJ_EQ(query->getSort(), BSONObj());
+
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);
+
+ ReplSetMetadata metadata(10, newOpTime, newOpTime, 100, OID(), 30, -1);
+ BSONObjBuilder builder;
+ metadata.writeToMetadata(&builder);
+
+ return std::make_tuple(vector<BSONObj>{coll1.toBSON(), coll2.toBSON(), coll3.toBSON()},
+ builder.obj());
+ });
+
+ const auto& actualColls = future.timed_get(kFutureTimeout);
+ ASSERT_EQ(3U, actualColls.size());
+ ASSERT_BSONOBJ_EQ(coll1.toBSON(), actualColls[0].toBSON());
+ ASSERT_BSONOBJ_EQ(coll2.toBSON(), actualColls[1].toBSON());
+ ASSERT_BSONOBJ_EQ(coll3.toBSON(), actualColls[2].toBSON());
+}
+
+TEST_F(ShardingCatalogClientTest, GetCollectionsValidResultsWithDb) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ CollectionType coll1;
+ coll1.setNs(NamespaceString{"test.system.indexes"});
+ coll1.setUpdatedAt(network()->now());
+ coll1.setUnique(true);
+ coll1.setEpoch(OID::gen());
+ coll1.setKeyPattern(KeyPattern{BSON("_id" << 1)});
+
+ CollectionType coll2;
+ coll2.setNs(NamespaceString{"test.coll1"});
+ coll2.setUpdatedAt(network()->now());
+ coll2.setUnique(false);
+ coll2.setEpoch(OID::gen());
+ coll2.setKeyPattern(KeyPattern{BSON("_id" << 1)});
+
+ auto future = launchAsync([this] {
+ string dbName = "test";
+ vector<CollectionType> collections;
+
+ const auto status =
+ catalogClient()->getCollections(operationContext(), &dbName, &collections, nullptr);
+
+ ASSERT_OK(status);
+ return collections;
+ });
+
+ onFindCommand([this, coll1, coll2](const RemoteCommandRequest& request) {
+ ASSERT_BSONOBJ_EQ(kReplSecondaryOkMetadata, request.metadata);
+
+ const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
+ ASSERT_EQ(nss.ns(), CollectionType::ConfigNS);
+
+ auto query = assertGet(QueryRequest::makeFromFindCommand(nss, request.cmdObj, false));
+
+ ASSERT_EQ(query->ns(), CollectionType::ConfigNS);
+ {
+ BSONObjBuilder b;
+ b.appendRegex(CollectionType::fullNs(), "^test\\.");
+ ASSERT_BSONOBJ_EQ(query->getFilter(), b.obj());
+ }
+
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);
+
+ return vector<BSONObj>{coll1.toBSON(), coll2.toBSON()};
+ });
+
+ const auto& actualColls = future.timed_get(kFutureTimeout);
+ ASSERT_EQ(2U, actualColls.size());
+ ASSERT_BSONOBJ_EQ(coll1.toBSON(), actualColls[0].toBSON());
+ ASSERT_BSONOBJ_EQ(coll2.toBSON(), actualColls[1].toBSON());
+}
+
+TEST_F(ShardingCatalogClientTest, GetCollectionsInvalidCollectionType) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ auto future = launchAsync([this] {
+ string dbName = "test";
+ vector<CollectionType> collections;
+
+ const auto status =
+ catalogClient()->getCollections(operationContext(), &dbName, &collections, nullptr);
+
+ ASSERT_EQ(ErrorCodes::FailedToParse, status);
+ ASSERT_EQ(0U, collections.size());
+ });
+
+ CollectionType validColl;
+ validColl.setNs(NamespaceString{"test.system.indexes"});
+ validColl.setUpdatedAt(network()->now());
+ validColl.setUnique(true);
+ validColl.setEpoch(OID::gen());
+ validColl.setKeyPattern(KeyPattern{BSON("_id" << 1)});
+ ASSERT_OK(validColl.validate());
+
+ onFindCommand([this, validColl](const RemoteCommandRequest& request) {
+ const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
+ ASSERT_EQ(nss.ns(), CollectionType::ConfigNS);
+
+ ASSERT_BSONOBJ_EQ(kReplSecondaryOkMetadata, request.metadata);
+
+ auto query = assertGet(QueryRequest::makeFromFindCommand(nss, request.cmdObj, false));
+
+ ASSERT_EQ(query->ns(), CollectionType::ConfigNS);
+ {
+ BSONObjBuilder b;
+ b.appendRegex(CollectionType::fullNs(), "^test\\.");
+ ASSERT_BSONOBJ_EQ(query->getFilter(), b.obj());
+ }
+
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);
+
+ return vector<BSONObj>{
+ validColl.toBSON(),
+ BSONObj() // empty document is invalid
+ };
+ });
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ShardingCatalogClientTest, GetDatabasesForShardValid) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ DatabaseType dbt1;
+ dbt1.setName("db1");
+ dbt1.setPrimary(ShardId("shard0000"));
+
+ DatabaseType dbt2;
+ dbt2.setName("db2");
+ dbt2.setPrimary(ShardId("shard0000"));
+
+ auto future = launchAsync([this] {
+ vector<string> dbs;
+ const auto status =
+ catalogClient()->getDatabasesForShard(operationContext(), ShardId("shard0000"), &dbs);
+
+ ASSERT_OK(status);
+ return dbs;
+ });
+
+ onFindCommand([this, dbt1, dbt2](const RemoteCommandRequest& request) {
+ ASSERT_BSONOBJ_EQ(kReplSecondaryOkMetadata, request.metadata);
+
+ const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
+ ASSERT_EQ(nss.ns(), DatabaseType::ConfigNS);
+
+ auto query = assertGet(QueryRequest::makeFromFindCommand(nss, request.cmdObj, false));
+
+ ASSERT_EQ(query->ns(), DatabaseType::ConfigNS);
+ ASSERT_BSONOBJ_EQ(query->getFilter(),
+ BSON(DatabaseType::primary(dbt1.getPrimary().toString())));
+ ASSERT_BSONOBJ_EQ(query->getSort(), BSONObj());
+
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);
+
+ return vector<BSONObj>{dbt1.toBSON(), dbt2.toBSON()};
+ });
+
+ const auto& actualDbNames = future.timed_get(kFutureTimeout);
+ ASSERT_EQ(2U, actualDbNames.size());
+ ASSERT_EQ(dbt1.getName(), actualDbNames[0]);
+ ASSERT_EQ(dbt2.getName(), actualDbNames[1]);
+}
+
+TEST_F(ShardingCatalogClientTest, GetDatabasesForShardInvalidDoc) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ auto future = launchAsync([this] {
+ vector<string> dbs;
+ const auto status =
+ catalogClient()->getDatabasesForShard(operationContext(), ShardId("shard0000"), &dbs);
+
+ ASSERT_EQ(ErrorCodes::TypeMismatch, status);
+ ASSERT_EQ(0U, dbs.size());
+ });
+
+ onFindCommand([](const RemoteCommandRequest& request) {
+ DatabaseType dbt1;
+ dbt1.setName("db1");
+ dbt1.setPrimary(ShardId("shard0000"));
+
+ return vector<BSONObj>{
+ dbt1.toBSON(),
+ BSON(DatabaseType::name() << 0) // DatabaseType::name() should be a string
+ };
+ });
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ShardingCatalogClientTest, GetTagsForCollection) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ TagsType tagA;
+ tagA.setNS("TestDB.TestColl");
+ tagA.setTag("TagA");
+ tagA.setMinKey(BSON("a" << 100));
+ tagA.setMaxKey(BSON("a" << 200));
+
+ TagsType tagB;
+ tagB.setNS("TestDB.TestColl");
+ tagB.setTag("TagB");
+ tagB.setMinKey(BSON("a" << 200));
+ tagB.setMaxKey(BSON("a" << 300));
+
+ auto future = launchAsync([this] {
+ vector<TagsType> tags;
+
+ ASSERT_OK(
+ catalogClient()->getTagsForCollection(operationContext(), "TestDB.TestColl", &tags));
+ ASSERT_EQ(2U, tags.size());
+
+ return tags;
+ });
+
+ onFindCommand([this, tagA, tagB](const RemoteCommandRequest& request) {
+ ASSERT_BSONOBJ_EQ(kReplSecondaryOkMetadata, request.metadata);
+
+ const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
+ ASSERT_EQ(nss.ns(), TagsType::ConfigNS);
+
+ auto query = assertGet(QueryRequest::makeFromFindCommand(nss, request.cmdObj, false));
+
+ ASSERT_EQ(query->ns(), TagsType::ConfigNS);
+ ASSERT_BSONOBJ_EQ(query->getFilter(), BSON(TagsType::ns("TestDB.TestColl")));
+ ASSERT_BSONOBJ_EQ(query->getSort(), BSON(TagsType::min() << 1));
+
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);
+
+ return vector<BSONObj>{tagA.toBSON(), tagB.toBSON()};
+ });
+
+ const auto& tags = future.timed_get(kFutureTimeout);
+ ASSERT_BSONOBJ_EQ(tagA.toBSON(), tags[0].toBSON());
+ ASSERT_BSONOBJ_EQ(tagB.toBSON(), tags[1].toBSON());
+}
+
+TEST_F(ShardingCatalogClientTest, GetTagsForCollectionNoTags) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ auto future = launchAsync([this] {
+ vector<TagsType> tags;
+
+ ASSERT_OK(
+ catalogClient()->getTagsForCollection(operationContext(), "TestDB.TestColl", &tags));
+ ASSERT_EQ(0U, tags.size());
+
+ return tags;
+ });
+
+ onFindCommand([](const RemoteCommandRequest& request) { return vector<BSONObj>{}; });
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ShardingCatalogClientTest, GetTagsForCollectionInvalidTag) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ auto future = launchAsync([this] {
+ vector<TagsType> tags;
+ Status status =
+ catalogClient()->getTagsForCollection(operationContext(), "TestDB.TestColl", &tags);
+
+ ASSERT_EQUALS(ErrorCodes::FailedToParse, status);
+ ASSERT_EQ(0U, tags.size());
+ });
+
+ onFindCommand([](const RemoteCommandRequest& request) {
+ TagsType tagA;
+ tagA.setNS("TestDB.TestColl");
+ tagA.setTag("TagA");
+ tagA.setMinKey(BSON("a" << 100));
+ tagA.setMaxKey(BSON("a" << 200));
+
+ TagsType tagB;
+ tagB.setNS("TestDB.TestColl");
+ tagB.setTag("TagB");
+ tagB.setMinKey(BSON("a" << 200));
+ // Missing maxKey
+
+ return vector<BSONObj>{tagA.toBSON(), tagB.toBSON()};
+ });
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ShardingCatalogClientTest, GetTagForChunkOneTagFound) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ ChunkType chunk;
+ chunk.setNS("test.coll");
+ chunk.setMin(BSON("a" << 1));
+ chunk.setMax(BSON("a" << 100));
+ chunk.setVersion({1, 2, OID::gen()});
+ chunk.setShard(ShardId("shard0000"));
+ ASSERT_OK(chunk.validate());
+
+ auto future = launchAsync([this, chunk] {
+ return assertGet(catalogClient()->getTagForChunk(operationContext(), "test.coll", chunk));
+ });
+
+ onFindCommand([this, chunk](const RemoteCommandRequest& request) {
+ ASSERT_BSONOBJ_EQ(kReplSecondaryOkMetadata, request.metadata);
+
+ const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
+ ASSERT_EQ(nss.ns(), TagsType::ConfigNS);
+
+ auto query = assertGet(QueryRequest::makeFromFindCommand(nss, request.cmdObj, false));
+
+ ASSERT_EQ(query->ns(), TagsType::ConfigNS);
+ ASSERT_BSONOBJ_EQ(query->getFilter(),
+ BSON(TagsType::ns(chunk.getNS()) << TagsType::min()
+ << BSON("$lte" << chunk.getMin())
+ << TagsType::max()
+ << BSON("$gte" << chunk.getMax())));
+
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);
+
+ TagsType tt;
+ tt.setNS("test.coll");
+ tt.setTag("tag");
+ tt.setMinKey(BSON("a" << 1));
+ tt.setMaxKey(BSON("a" << 100));
+
+ return vector<BSONObj>{tt.toBSON()};
+ });
+
+ const string& tagStr = future.timed_get(kFutureTimeout);
+ ASSERT_EQ("tag", tagStr);
+}
+
+TEST_F(ShardingCatalogClientTest, GetTagForChunkNoTagFound) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ ChunkType chunk;
+ chunk.setNS("test.coll");
+ chunk.setMin(BSON("a" << 1));
+ chunk.setMax(BSON("a" << 100));
+ chunk.setVersion({1, 2, OID::gen()});
+ chunk.setShard(ShardId("shard0000"));
+ ASSERT_OK(chunk.validate());
+
+ auto future = launchAsync([this, chunk] {
+ return assertGet(catalogClient()->getTagForChunk(operationContext(), "test.coll", chunk));
+ });
+
+ onFindCommand([this, chunk](const RemoteCommandRequest& request) {
+ ASSERT_BSONOBJ_EQ(kReplSecondaryOkMetadata, request.metadata);
+
+ const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
+ ASSERT_EQ(nss.ns(), TagsType::ConfigNS);
+
+ auto query = assertGet(QueryRequest::makeFromFindCommand(nss, request.cmdObj, false));
+
+ ASSERT_EQ(query->ns(), TagsType::ConfigNS);
+ ASSERT_BSONOBJ_EQ(query->getFilter(),
+ BSON(TagsType::ns(chunk.getNS()) << TagsType::min()
+ << BSON("$lte" << chunk.getMin())
+ << TagsType::max()
+ << BSON("$gte" << chunk.getMax())));
+
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);
+
+ return vector<BSONObj>{};
+ });
+
+ const string& tagStr = future.timed_get(kFutureTimeout);
+ ASSERT_EQ("", tagStr); // empty string returned when tag document not found
+}
+
+TEST_F(ShardingCatalogClientTest, GetTagForChunkInvalidTagDoc) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ ChunkType chunk;
+ chunk.setNS("test.coll");
+ chunk.setMin(BSON("a" << 1));
+ chunk.setMax(BSON("a" << 100));
+ chunk.setVersion({1, 2, OID::gen()});
+ chunk.setShard(ShardId("shard0000"));
+ ASSERT_OK(chunk.validate());
+
+ auto future = launchAsync([this, chunk] {
+ const auto tagResult =
+ catalogClient()->getTagForChunk(operationContext(), "test.coll", chunk);
+
+ ASSERT_EQ(ErrorCodes::FailedToParse, tagResult.getStatus());
+ });
+
+ onFindCommand([this, chunk](const RemoteCommandRequest& request) {
+ ASSERT_BSONOBJ_EQ(kReplSecondaryOkMetadata, request.metadata);
+
+ const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
+ ASSERT_EQ(nss.ns(), TagsType::ConfigNS);
+
+ auto query = assertGet(QueryRequest::makeFromFindCommand(nss, request.cmdObj, false));
+
+ ASSERT_EQ(query->ns(), TagsType::ConfigNS);
+ ASSERT_BSONOBJ_EQ(query->getFilter(),
+ BSON(TagsType::ns(chunk.getNS()) << TagsType::min()
+ << BSON("$lte" << chunk.getMin())
+ << TagsType::max()
+ << BSON("$gte" << chunk.getMax())));
+
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);
+
+ // Return a tag document missing the min key
+ return vector<BSONObj>{BSON(TagsType::ns("test.mycol") << TagsType::tag("tag")
+ << TagsType::max(BSON("a" << 20)))};
+ });
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ShardingCatalogClientTest, UpdateDatabase) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ DatabaseType dbt;
+ dbt.setName("test");
+ dbt.setPrimary(ShardId("shard0000"));
+ dbt.setSharded(true);
+
+ auto future = launchAsync([this, dbt] {
+ auto status = catalogClient()->updateDatabase(operationContext(), dbt.getName(), dbt);
+ ASSERT_OK(status);
+ });
+
+ onCommand([dbt](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS("config", request.dbname);
+
+ ASSERT_BSONOBJ_EQ(BSON(rpc::kReplSetMetadataFieldName << 1), request.metadata);
+
+ BatchedUpdateRequest actualBatchedUpdate;
+ std::string errmsg;
+ ASSERT_TRUE(actualBatchedUpdate.parseBSON(request.dbname, request.cmdObj, &errmsg));
+ ASSERT_EQUALS(DatabaseType::ConfigNS, actualBatchedUpdate.getNS().ns());
+ auto updates = actualBatchedUpdate.getUpdates();
+ ASSERT_EQUALS(1U, updates.size());
+ auto update = updates.front();
+
+ ASSERT_TRUE(update->getUpsert());
+ ASSERT_FALSE(update->getMulti());
+ ASSERT_BSONOBJ_EQ(update->getQuery(), BSON(DatabaseType::name(dbt.getName())));
+ ASSERT_BSONOBJ_EQ(update->getUpdateExpr(), dbt.toBSON());
+
+ BatchedCommandResponse response;
+ response.setOk(true);
+ response.setNModified(1);
+
+ return response.toBSON();
+ });
+
+ // Now wait for the updateDatabase call to return
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ShardingCatalogClientTest, UpdateDatabaseExceededTimeLimit) {
+ HostAndPort host1("TestHost1");
+ configTargeter()->setFindHostReturnValue(host1);
+
+ DatabaseType dbt;
+ dbt.setName("test");
+ dbt.setPrimary(ShardId("shard0001"));
+ dbt.setSharded(false);
+
+ auto future = launchAsync([this, dbt] {
+ auto status = catalogClient()->updateDatabase(operationContext(), dbt.getName(), dbt);
+ ASSERT_EQ(ErrorCodes::ExceededTimeLimit, status);
+ });
+
+ onCommand([host1](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(host1, request.target);
+
+ BatchedCommandResponse response;
+ response.setOk(false);
+ response.setErrCode(ErrorCodes::ExceededTimeLimit);
+ response.setErrMessage("operation timed out");
+
+ return response.toBSON();
+ });
+
+ // Now wait for the updateDatabase call to return
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ShardingCatalogClientTest, ApplyChunkOpsDeprecatedSuccessful) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ BSONArray updateOps = BSON_ARRAY(BSON("update1"
+ << "first update")
+ << BSON("update2"
+ << "second update"));
+ BSONArray preCondition = BSON_ARRAY(BSON("precondition1"
+ << "first precondition")
+ << BSON("precondition2"
+ << "second precondition"));
+ std::string nss = "config.chunks";
+ ChunkVersion lastChunkVersion(0, 0, OID());
+
+ auto future = launchAsync([this, updateOps, preCondition, nss, lastChunkVersion] {
+ auto status =
+ catalogClient()->applyChunkOpsDeprecated(operationContext(),
+ updateOps,
+ preCondition,
+ nss,
+ lastChunkVersion,
+ ShardingCatalogClient::kMajorityWriteConcern,
+ repl::ReadConcernLevel::kMajorityReadConcern);
+ ASSERT_OK(status);
+ });
+
+ onCommand([updateOps, preCondition, nss](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS("config", request.dbname);
+ ASSERT_BSONOBJ_EQ(BSON("w"
+ << "majority"
+ << "wtimeout"
+ << 15000),
+ request.cmdObj["writeConcern"].Obj());
+ ASSERT_BSONOBJ_EQ(BSON(rpc::kReplSetMetadataFieldName << 1), request.metadata);
+ ASSERT_BSONOBJ_EQ(updateOps, request.cmdObj["applyOps"].Obj());
+ ASSERT_BSONOBJ_EQ(preCondition, request.cmdObj["preCondition"].Obj());
+
+ return BSON("ok" << 1);
+ });
+
+ // Now wait for the applyChunkOpsDeprecated call to return
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ShardingCatalogClientTest, ApplyChunkOpsDeprecatedSuccessfulWithCheck) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ BSONArray updateOps = BSON_ARRAY(BSON("update1"
+ << "first update")
+ << BSON("update2"
+ << "second update"));
+ BSONArray preCondition = BSON_ARRAY(BSON("precondition1"
+ << "first precondition")
+ << BSON("precondition2"
+ << "second precondition"));
+ std::string nss = "config.chunks";
+ ChunkVersion lastChunkVersion(0, 0, OID());
+
+ auto future = launchAsync([this, updateOps, preCondition, nss, lastChunkVersion] {
+ auto status =
+ catalogClient()->applyChunkOpsDeprecated(operationContext(),
+ updateOps,
+ preCondition,
+ nss,
+ lastChunkVersion,
+ ShardingCatalogClient::kMajorityWriteConcern,
+ repl::ReadConcernLevel::kMajorityReadConcern);
+ ASSERT_OK(status);
+ });
+
+ onCommand([&](const RemoteCommandRequest& request) {
+ BSONObjBuilder responseBuilder;
+ Command::appendCommandStatus(responseBuilder,
+ Status(ErrorCodes::DuplicateKey, "precondition failed"));
+ return responseBuilder.obj();
+ });
+
+ onFindCommand([this](const RemoteCommandRequest& request) {
+ OID oid = OID::gen();
+ ChunkType chunk;
+ chunk.setNS("TestDB.TestColl");
+ chunk.setMin(BSON("a" << 1));
+ chunk.setMax(BSON("a" << 100));
+ chunk.setVersion({1, 2, oid});
+ chunk.setShard(ShardId("shard0000"));
+ return vector<BSONObj>{chunk.toBSON()};
+ });
+
+ // Now wait for the applyChunkOpsDeprecated call to return
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ShardingCatalogClientTest, ApplyChunkOpsDeprecatedFailedWithCheck) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ BSONArray updateOps = BSON_ARRAY(BSON("update1"
+ << "first update")
+ << BSON("update2"
+ << "second update"));
+ BSONArray preCondition = BSON_ARRAY(BSON("precondition1"
+ << "first precondition")
+ << BSON("precondition2"
+ << "second precondition"));
+ std::string nss = "config.chunks";
+ ChunkVersion lastChunkVersion(0, 0, OID());
+
+ auto future = launchAsync([this, updateOps, preCondition, nss, lastChunkVersion] {
+ auto status =
+ catalogClient()->applyChunkOpsDeprecated(operationContext(),
+ updateOps,
+ preCondition,
+ nss,
+ lastChunkVersion,
+ ShardingCatalogClient::kMajorityWriteConcern,
+ repl::ReadConcernLevel::kMajorityReadConcern);
+ ASSERT_EQUALS(ErrorCodes::NoMatchingDocument, status);
+ });
+
+ onCommand([&](const RemoteCommandRequest& request) {
+ BSONObjBuilder responseBuilder;
+ Command::appendCommandStatus(responseBuilder,
+ Status(ErrorCodes::NoMatchingDocument, "some error"));
+ return responseBuilder.obj();
+ });
+
+ onFindCommand([this](const RemoteCommandRequest& request) { return vector<BSONObj>{}; });
+
+ // Now wait for the applyChunkOpsDeprecated call to return
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ShardingCatalogClientTest, createDatabaseSuccess) {
+ const string dbname = "databaseToCreate";
+ const HostAndPort configHost("TestHost1");
+ configTargeter()->setFindHostReturnValue(configHost);
+
+ ShardType s0;
+ s0.setName("shard0000");
+ s0.setHost("ShardHost0:27017");
+
+ ShardType s1;
+ s1.setName("shard0001");
+ s1.setHost("ShardHost1:27017");
+
+ ShardType s2;
+ s2.setName("shard0002");
+ s2.setHost("ShardHost2:27017");
+
+ // Prime the shard registry with information about the existing shards
+ auto future = launchAsync([this] { shardRegistry()->reload(operationContext()); });
+
+ onFindCommand([&](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(configHost, request.target);
+ ASSERT_BSONOBJ_EQ(kReplSecondaryOkMetadata, request.metadata);
+
+ const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
+ auto query = assertGet(QueryRequest::makeFromFindCommand(nss, request.cmdObj, false));
+
+ ASSERT_EQ(ShardType::ConfigNS, query->ns());
+ ASSERT_BSONOBJ_EQ(BSONObj(), query->getFilter());
+ ASSERT_BSONOBJ_EQ(BSONObj(), query->getSort());
+ ASSERT_FALSE(query->getLimit().is_initialized());
+
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);
+
+ return vector<BSONObj>{s0.toBSON(), s1.toBSON(), s2.toBSON()};
+ });
+
+ future.timed_get(kFutureTimeout);
+
+ // Set up all the target mocks return values.
+ RemoteCommandTargeterMock::get(
+ uassertStatusOK(shardRegistry()->getShard(operationContext(), s0.getName()))->getTargeter())
+ ->setFindHostReturnValue(HostAndPort(s0.getHost()));
+ RemoteCommandTargeterMock::get(
+ uassertStatusOK(shardRegistry()->getShard(operationContext(), s1.getName()))->getTargeter())
+ ->setFindHostReturnValue(HostAndPort(s1.getHost()));
+ RemoteCommandTargeterMock::get(
+ uassertStatusOK(shardRegistry()->getShard(operationContext(), s2.getName()))->getTargeter())
+ ->setFindHostReturnValue(HostAndPort(s2.getHost()));
+
+ // Now actually start the createDatabase work.
+
+ distLock()->expectLock(
+ [dbname](StringData name, StringData whyMessage, Milliseconds waitFor) {}, Status::OK());
+
+
+ future = launchAsync([this, dbname] {
+ Status status = catalogClient()->createDatabase(operationContext(), dbname);
+ ASSERT_OK(status);
+ });
+
+ // Report no databases with the same name already exist
+ onFindCommand([&](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(configHost, request.target);
+ const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
+ ASSERT_BSONOBJ_EQ(kReplSecondaryOkMetadata, request.metadata);
+ ASSERT_EQ(DatabaseType::ConfigNS, nss.ns());
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);
+ return vector<BSONObj>{};
+ });
+
+ // Return size information about first shard
+ onCommand([&](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(s0.getHost(), request.target.toString());
+ ASSERT_EQUALS("admin", request.dbname);
+ string cmdName = request.cmdObj.firstElement().fieldName();
+ ASSERT_EQUALS("listDatabases", cmdName);
+ ASSERT_FALSE(request.cmdObj.hasField(repl::ReadConcernArgs::kReadConcernFieldName));
+
+ ASSERT_BSONOBJ_EQ(rpc::ServerSelectionMetadata(true, boost::none).toBSON(),
+ request.metadata);
+
+ return BSON("ok" << 1 << "totalSize" << 10);
+ });
+
+ // Return size information about second shard
+ onCommand([&](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(s1.getHost(), request.target.toString());
+ ASSERT_EQUALS("admin", request.dbname);
+ string cmdName = request.cmdObj.firstElement().fieldName();
+ ASSERT_EQUALS("listDatabases", cmdName);
+ ASSERT_FALSE(request.cmdObj.hasField(repl::ReadConcernArgs::kReadConcernFieldName));
+
+ ASSERT_BSONOBJ_EQ(rpc::ServerSelectionMetadata(true, boost::none).toBSON(),
+ request.metadata);
+
+ return BSON("ok" << 1 << "totalSize" << 1);
+ });
+
+ // Return size information about third shard
+ onCommand([&](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(s2.getHost(), request.target.toString());
+ ASSERT_EQUALS("admin", request.dbname);
+ string cmdName = request.cmdObj.firstElement().fieldName();
+ ASSERT_EQUALS("listDatabases", cmdName);
+
+ ASSERT_BSONOBJ_EQ(rpc::ServerSelectionMetadata(true, boost::none).toBSON(),
+ request.metadata);
+
+ return BSON("ok" << 1 << "totalSize" << 100);
+ });
+
+ // Process insert to config.databases collection
+ onCommand([&](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(configHost, request.target);
+ ASSERT_EQUALS("config", request.dbname);
+
+ ASSERT_BSONOBJ_EQ(BSON(rpc::kReplSetMetadataFieldName << 1), request.metadata);
+
+ BatchedInsertRequest actualBatchedInsert;
+ std::string errmsg;
+ ASSERT_TRUE(actualBatchedInsert.parseBSON(request.dbname, request.cmdObj, &errmsg));
+ ASSERT_EQUALS(DatabaseType::ConfigNS, actualBatchedInsert.getNS().ns());
+ auto inserts = actualBatchedInsert.getDocuments();
+ ASSERT_EQUALS(1U, inserts.size());
+ auto insert = inserts.front();
+
+ DatabaseType expectedDb;
+ expectedDb.setName(dbname);
+ expectedDb.setPrimary(
+ ShardId(s1.getName())); // This is the one we reported with the smallest size
+ expectedDb.setSharded(false);
+
+ ASSERT_BSONOBJ_EQ(expectedDb.toBSON(), insert);
+
+ BatchedCommandResponse response;
+ response.setOk(true);
+ response.setNModified(1);
+
+ return response.toBSON();
+ });
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ShardingCatalogClientTest, createDatabaseDistLockHeld) {
+ const string dbname = "databaseToCreate";
+
+
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ distLock()->expectLock(
+ [dbname](StringData name, StringData whyMessage, Milliseconds waitFor) {
+ ASSERT_EQUALS(dbname, name);
+ ASSERT_EQUALS("createDatabase", whyMessage);
+ },
+ Status(ErrorCodes::LockBusy, "lock already held"));
+
+ Status status = catalogClient()->createDatabase(operationContext(), dbname);
+ ASSERT_EQUALS(ErrorCodes::LockBusy, status);
+}
+
+TEST_F(ShardingCatalogClientTest, createDatabaseDBExists) {
+ const string dbname = "databaseToCreate";
+
+
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ distLock()->expectLock(
+ [dbname](StringData name, StringData whyMessage, Milliseconds waitFor) {}, Status::OK());
+
+
+ auto future = launchAsync([this, dbname] {
+ Status status = catalogClient()->createDatabase(operationContext(), dbname);
+ ASSERT_EQUALS(ErrorCodes::NamespaceExists, status);
+ });
+
+ onFindCommand([this, dbname](const RemoteCommandRequest& request) {
+ ASSERT_BSONOBJ_EQ(kReplSecondaryOkMetadata, request.metadata);
+
+ const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
+ auto query = assertGet(QueryRequest::makeFromFindCommand(nss, request.cmdObj, false));
+
+ BSONObjBuilder queryBuilder;
+ queryBuilder.appendRegex(
+ DatabaseType::name(), (string) "^" + pcrecpp::RE::QuoteMeta(dbname) + "$", "i");
+
+ ASSERT_EQ(DatabaseType::ConfigNS, query->ns());
+ ASSERT_BSONOBJ_EQ(queryBuilder.obj(), query->getFilter());
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);
+
+ return vector<BSONObj>{BSON("_id" << dbname)};
+ });
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ShardingCatalogClientTest, createDatabaseDBExistsDifferentCase) {
+ const string dbname = "databaseToCreate";
+ const string dbnameDiffCase = "databasetocreate";
+
+
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ distLock()->expectLock(
+ [dbname](StringData name, StringData whyMessage, Milliseconds waitFor) {}, Status::OK());
+
+
+ auto future = launchAsync([this, dbname] {
+ Status status = catalogClient()->createDatabase(operationContext(), dbname);
+ ASSERT_EQUALS(ErrorCodes::DatabaseDifferCase, status);
+ });
+
+ onFindCommand([this, dbname, dbnameDiffCase](const RemoteCommandRequest& request) {
+ ASSERT_BSONOBJ_EQ(kReplSecondaryOkMetadata, request.metadata);
+
+ const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
+ auto query = assertGet(QueryRequest::makeFromFindCommand(nss, request.cmdObj, false));
+
+ BSONObjBuilder queryBuilder;
+ queryBuilder.appendRegex(
+ DatabaseType::name(), (string) "^" + pcrecpp::RE::QuoteMeta(dbname) + "$", "i");
+
+ ASSERT_EQ(DatabaseType::ConfigNS, query->ns());
+ ASSERT_BSONOBJ_EQ(queryBuilder.obj(), query->getFilter());
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);
+
+ return vector<BSONObj>{BSON("_id" << dbnameDiffCase)};
+ });
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ShardingCatalogClientTest, createDatabaseNoShards) {
+ const string dbname = "databaseToCreate";
+
+
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ distLock()->expectLock(
+ [dbname](StringData name, StringData whyMessage, Milliseconds waitFor) {}, Status::OK());
+
+
+ auto future = launchAsync([this, dbname] {
+ Status status = catalogClient()->createDatabase(operationContext(), dbname);
+ ASSERT_EQUALS(ErrorCodes::ShardNotFound, status);
+ });
+
+ // Report no databases with the same name already exist
+ onFindCommand([this, dbname](const RemoteCommandRequest& request) {
+ ASSERT_BSONOBJ_EQ(kReplSecondaryOkMetadata, request.metadata);
+ const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
+ ASSERT_EQ(DatabaseType::ConfigNS, nss.ns());
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);
+ return vector<BSONObj>{};
+ });
+
+ // Report no shards exist
+ onFindCommand([this](const RemoteCommandRequest& request) {
+ ASSERT_BSONOBJ_EQ(kReplSecondaryOkMetadata, request.metadata);
+ const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
+ auto query = assertGet(QueryRequest::makeFromFindCommand(nss, request.cmdObj, false));
+
+ ASSERT_EQ(ShardType::ConfigNS, query->ns());
+ ASSERT_BSONOBJ_EQ(BSONObj(), query->getFilter());
+ ASSERT_BSONOBJ_EQ(BSONObj(), query->getSort());
+ ASSERT_FALSE(query->getLimit().is_initialized());
+
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);
+
+ return vector<BSONObj>{};
+ });
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ShardingCatalogClientTest, createDatabaseDuplicateKeyOnInsert) {
+ const string dbname = "databaseToCreate";
+ const HostAndPort configHost("TestHost1");
+ configTargeter()->setFindHostReturnValue(configHost);
+
+ ShardType s0;
+ s0.setName("shard0000");
+ s0.setHost("ShardHost0:27017");
+
+ ShardType s1;
+ s1.setName("shard0001");
+ s1.setHost("ShardHost1:27017");
+
+ ShardType s2;
+ s2.setName("shard0002");
+ s2.setHost("ShardHost2:27017");
+
+ // Prime the shard registry with information about the existing shards
+ auto future = launchAsync([this] { shardRegistry()->reload(operationContext()); });
+
+ onFindCommand([&](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(configHost, request.target);
+ ASSERT_BSONOBJ_EQ(kReplSecondaryOkMetadata, request.metadata);
+ const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
+ auto query = assertGet(QueryRequest::makeFromFindCommand(nss, request.cmdObj, false));
+
+ ASSERT_EQ(ShardType::ConfigNS, query->ns());
+ ASSERT_BSONOBJ_EQ(BSONObj(), query->getFilter());
+ ASSERT_BSONOBJ_EQ(BSONObj(), query->getSort());
+ ASSERT_FALSE(query->getLimit().is_initialized());
+
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);
+
+ return vector<BSONObj>{s0.toBSON(), s1.toBSON(), s2.toBSON()};
+ });
+
+ future.timed_get(kFutureTimeout);
+
+ // Set up all the target mocks return values.
+ RemoteCommandTargeterMock::get(
+ uassertStatusOK(shardRegistry()->getShard(operationContext(), s0.getName()))->getTargeter())
+ ->setFindHostReturnValue(HostAndPort(s0.getHost()));
+ RemoteCommandTargeterMock::get(
+ uassertStatusOK(shardRegistry()->getShard(operationContext(), s1.getName()))->getTargeter())
+ ->setFindHostReturnValue(HostAndPort(s1.getHost()));
+ RemoteCommandTargeterMock::get(
+ uassertStatusOK(shardRegistry()->getShard(operationContext(), s2.getName()))->getTargeter())
+ ->setFindHostReturnValue(HostAndPort(s2.getHost()));
+
+ // Now actually start the createDatabase work.
+
+ distLock()->expectLock(
+ [dbname](StringData name, StringData whyMessage, Milliseconds waitFor) {}, Status::OK());
+
+
+ future = launchAsync([this, dbname] {
+ Status status = catalogClient()->createDatabase(operationContext(), dbname);
+ ASSERT_EQUALS(ErrorCodes::NamespaceExists, status);
+ });
+
+ // Report no databases with the same name already exist
+ onFindCommand([&](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(configHost, request.target);
+ ASSERT_BSONOBJ_EQ(kReplSecondaryOkMetadata, request.metadata);
+ const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
+ ASSERT_EQ(DatabaseType::ConfigNS, nss.ns());
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);
+ return vector<BSONObj>{};
+ });
+
+ // Return size information about first shard
+ onCommand([&](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(s0.getHost(), request.target.toString());
+ ASSERT_EQUALS("admin", request.dbname);
+ string cmdName = request.cmdObj.firstElement().fieldName();
+ ASSERT_EQUALS("listDatabases", cmdName);
+ ASSERT_FALSE(request.cmdObj.hasField(repl::ReadConcernArgs::kReadConcernFieldName));
+
+ ASSERT_BSONOBJ_EQ(rpc::ServerSelectionMetadata(true, boost::none).toBSON(),
+ request.metadata);
+
+ return BSON("ok" << 1 << "totalSize" << 10);
+ });
+
+ // Return size information about second shard
+ onCommand([&](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(s1.getHost(), request.target.toString());
+ ASSERT_EQUALS("admin", request.dbname);
+ string cmdName = request.cmdObj.firstElement().fieldName();
+ ASSERT_EQUALS("listDatabases", cmdName);
+ ASSERT_FALSE(request.cmdObj.hasField(repl::ReadConcernArgs::kReadConcernFieldName));
+
+ ASSERT_BSONOBJ_EQ(rpc::ServerSelectionMetadata(true, boost::none).toBSON(),
+ request.metadata);
+
+ return BSON("ok" << 1 << "totalSize" << 1);
+ });
+
+ // Return size information about third shard
+ onCommand([&](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(s2.getHost(), request.target.toString());
+ ASSERT_EQUALS("admin", request.dbname);
+ string cmdName = request.cmdObj.firstElement().fieldName();
+ ASSERT_EQUALS("listDatabases", cmdName);
+ ASSERT_FALSE(request.cmdObj.hasField(repl::ReadConcernArgs::kReadConcernFieldName));
+
+ ASSERT_BSONOBJ_EQ(rpc::ServerSelectionMetadata(true, boost::none).toBSON(),
+ request.metadata);
+
+ return BSON("ok" << 1 << "totalSize" << 100);
+ });
+
+ // Process insert to config.databases collection
+ onCommand([&](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(configHost, request.target);
+ ASSERT_EQUALS("config", request.dbname);
+ ASSERT_FALSE(request.cmdObj.hasField(repl::ReadConcernArgs::kReadConcernFieldName));
+
+ ASSERT_BSONOBJ_EQ(BSON(rpc::kReplSetMetadataFieldName << 1), request.metadata);
+
+ BatchedInsertRequest actualBatchedInsert;
+ std::string errmsg;
+ ASSERT_TRUE(actualBatchedInsert.parseBSON(request.dbname, request.cmdObj, &errmsg));
+ ASSERT_EQUALS(DatabaseType::ConfigNS, actualBatchedInsert.getNS().ns());
+ auto inserts = actualBatchedInsert.getDocuments();
+ ASSERT_EQUALS(1U, inserts.size());
+ auto insert = inserts.front();
+
+ DatabaseType expectedDb;
+ expectedDb.setName(dbname);
+ expectedDb.setPrimary(
+ ShardId(s1.getName())); // This is the one we reported with the smallest size
+ expectedDb.setSharded(false);
+
+ ASSERT_BSONOBJ_EQ(expectedDb.toBSON(), insert);
+
+ BatchedCommandResponse response;
+ response.setOk(false);
+ response.setErrCode(ErrorCodes::DuplicateKey);
+ response.setErrMessage("duplicate key");
+
+ return response.toBSON();
+ });
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ShardingCatalogClientTest, EnableShardingNoDBExists) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("config:123"));
+
+ vector<ShardType> shards;
+ ShardType shard;
+ shard.setName("shard0");
+ shard.setHost("shard0:12");
+
+ setupShards(vector<ShardType>{shard});
+
+ auto shardTargeter = RemoteCommandTargeterMock::get(
+ uassertStatusOK(shardRegistry()->getShard(operationContext(), ShardId("shard0")))
+ ->getTargeter());
+ shardTargeter->setFindHostReturnValue(HostAndPort("shard0:12"));
+
+ distLock()->expectLock(
+ [](StringData name, StringData whyMessage, Milliseconds) {
+ ASSERT_EQ("test", name);
+ ASSERT_FALSE(whyMessage.empty());
+ },
+ Status::OK());
+
+ auto future = launchAsync([this] {
+ auto status = catalogClient()->enableSharding(operationContext(), "test");
+ ASSERT_OK(status);
+ });
+
+ // Query to find if db already exists in config.
+ onFindCommand([this](const RemoteCommandRequest& request) {
+ ASSERT_BSONOBJ_EQ(kReplSecondaryOkMetadata, request.metadata);
+ const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
+ ASSERT_EQ(DatabaseType::ConfigNS, nss.toString());
+
+ auto queryResult = QueryRequest::makeFromFindCommand(nss, request.cmdObj, false);
+ ASSERT_OK(queryResult.getStatus());
+
+ const auto& query = queryResult.getValue();
+ BSONObj expectedQuery(fromjson(R"({ _id: { $regex: "^test$", $options: "i" }})"));
+
+ ASSERT_EQ(DatabaseType::ConfigNS, query->ns());
+ ASSERT_BSONOBJ_EQ(expectedQuery, query->getFilter());
+ ASSERT_BSONOBJ_EQ(BSONObj(), query->getSort());
+ ASSERT_EQ(1, query->getLimit().get());
+
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);
+
+ return vector<BSONObj>{};
+ });
+
+ // list databases for checking shard size.
+ onCommand([](const RemoteCommandRequest& request) {
+ ASSERT_EQ(HostAndPort("shard0:12"), request.target);
+ ASSERT_EQ("admin", request.dbname);
+ ASSERT_BSONOBJ_EQ(BSON("listDatabases" << 1), request.cmdObj);
+
+ ASSERT_BSONOBJ_EQ(rpc::ServerSelectionMetadata(true, boost::none).toBSON(),
+ request.metadata);
+
+ return fromjson(R"({
+ databases: [],
+ totalSize: 1,
+ ok: 1
+ })");
+ });
+
+ onCommand([](const RemoteCommandRequest& request) {
+ ASSERT_EQ(HostAndPort("config:123"), request.target);
+ ASSERT_EQ("config", request.dbname);
+
+ ASSERT_BSONOBJ_EQ(BSON(rpc::kReplSetMetadataFieldName << 1), request.metadata);
+
+ BSONObj expectedCmd(fromjson(R"({
+ update: "databases",
+ updates: [{
+ q: { _id: "test" },
+ u: { _id: "test", primary: "shard0", partitioned: true },
+ multi: false,
+ upsert: true
+ }],
+ writeConcern: { w: "majority", wtimeout: 15000 },
+ maxTimeMS: 30000
+ })"));
+
+ ASSERT_BSONOBJ_EQ(expectedCmd, request.cmdObj);
+
+ return fromjson(R"({
+ nModified: 0,
+ n: 1,
+ upserted: [
+ { _id: "test", primary: "shard0", partitioned: true }
+ ],
+ ok: 1
+ })");
+ });
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ShardingCatalogClientTest, EnableShardingLockBusy) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("config:123"));
+
+ distLock()->expectLock([](StringData, StringData, Milliseconds) {},
+ {ErrorCodes::LockBusy, "lock taken"});
+
+ auto status = catalogClient()->enableSharding(operationContext(), "test");
+ ASSERT_EQ(ErrorCodes::LockBusy, status.code());
+}
+
+TEST_F(ShardingCatalogClientTest, EnableShardingDBExistsWithDifferentCase) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("config:123"));
+
+ vector<ShardType> shards;
+ ShardType shard;
+ shard.setName("shard0");
+ shard.setHost("shard0:12");
+
+ setupShards(vector<ShardType>{shard});
+
+ distLock()->expectLock([](StringData, StringData, Milliseconds) {}, Status::OK());
+
+ auto future = launchAsync([this] {
+ auto status = catalogClient()->enableSharding(operationContext(), "test");
+ ASSERT_EQ(ErrorCodes::DatabaseDifferCase, status.code());
+ ASSERT_FALSE(status.reason().empty());
+ });
+
+ // Query to find if db already exists in config.
+ onFindCommand([](const RemoteCommandRequest& request) {
+ BSONObj existingDoc(fromjson(R"({ _id: "Test", primary: "shard0", partitioned: true })"));
+ return vector<BSONObj>{existingDoc};
+ });
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ShardingCatalogClientTest, EnableShardingDBExists) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("config:123"));
+
+ vector<ShardType> shards;
+ ShardType shard;
+ shard.setName("shard0");
+ shard.setHost("shard0:12");
+
+ setupShards(vector<ShardType>{shard});
+
+ distLock()->expectLock([](StringData, StringData, Milliseconds) {}, Status::OK());
+
+ auto future = launchAsync([this] {
+ auto status = catalogClient()->enableSharding(operationContext(), "test");
+ ASSERT_OK(status);
+ });
+
+ // Query to find if db already exists in config.
+ onFindCommand([](const RemoteCommandRequest& request) {
+ BSONObj existingDoc(fromjson(R"({ _id: "test", primary: "shard2", partitioned: false })"));
+ return vector<BSONObj>{existingDoc};
+ });
+
+ onCommand([](const RemoteCommandRequest& request) {
+ ASSERT_EQ(HostAndPort("config:123"), request.target);
+ ASSERT_EQ("config", request.dbname);
+
+ ASSERT_BSONOBJ_EQ(BSON(rpc::kReplSetMetadataFieldName << 1), request.metadata);
+
+ BSONObj expectedCmd(fromjson(R"({
+ update: "databases",
+ updates: [{
+ q: { _id: "test" },
+ u: { _id: "test", primary: "shard2", partitioned: true },
+ multi: false,
+ upsert: true
+ }],
+ writeConcern: { w: "majority", wtimeout: 15000 },
+ maxTimeMS: 30000
+ })"));
+
+ ASSERT_BSONOBJ_EQ(expectedCmd, request.cmdObj);
+
+ return fromjson(R"({
+ nModified: 0,
+ n: 1,
+ upserted: [
+ { _id: "test", primary: "shard2", partitioned: true }
+ ],
+ ok: 1
+ })");
+ });
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ShardingCatalogClientTest, EnableShardingFailsWhenTheDatabaseIsAlreadySharded) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("config:123"));
+
+ vector<ShardType> shards;
+ ShardType shard;
+ shard.setName("shard0");
+ shard.setHost("shard0:12");
+
+ setupShards(vector<ShardType>{shard});
+
+ distLock()->expectLock([](StringData, StringData, Milliseconds) {}, Status::OK());
+
+ auto future = launchAsync([this] {
+ auto status = catalogClient()->enableSharding(operationContext(), "test");
+ ASSERT_EQ(status.code(), ErrorCodes::AlreadyInitialized);
+ });
+
+ // Query to find if db already exists in config and it is sharded.
+ onFindCommand([](const RemoteCommandRequest& request) {
+ BSONObj existingDoc(fromjson(R"({ _id: "test", primary: "shard2", partitioned: true })"));
+ return vector<BSONObj>{existingDoc};
+ });
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ShardingCatalogClientTest, EnableShardingDBExistsInvalidFormat) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("config:123"));
+
+ vector<ShardType> shards;
+ ShardType shard;
+ shard.setName("shard0");
+ shard.setHost("shard0:12");
+
+ setupShards(vector<ShardType>{shard});
+
+ distLock()->expectLock([](StringData, StringData, Milliseconds) {}, Status::OK());
+
+ auto future = launchAsync([this] {
+ auto status = catalogClient()->enableSharding(operationContext(), "test");
+ ASSERT_EQ(ErrorCodes::TypeMismatch, status.code());
+ });
+
+ // Query to find if db already exists in config.
+ onFindCommand([](const RemoteCommandRequest& request) {
+ // Bad type for primary field.
+ BSONObj existingDoc(fromjson(R"({ _id: "test", primary: 12, partitioned: false })"));
+ return vector<BSONObj>{existingDoc};
+ });
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ShardingCatalogClientTest, EnableShardingNoDBExistsNoShards) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("config:123"));
+
+ distLock()->expectLock([](StringData, StringData, Milliseconds) {}, Status::OK());
+
+ auto future = launchAsync([this] {
+ auto status = catalogClient()->enableSharding(operationContext(), "test");
+ ASSERT_EQ(ErrorCodes::ShardNotFound, status.code());
+ ASSERT_FALSE(status.reason().empty());
+ });
+
+ // Query to find if db already exists in config.
+ onFindCommand([](const RemoteCommandRequest& request) { return vector<BSONObj>{}; });
+
+ // Query for config.shards reload.
+ onFindCommand([](const RemoteCommandRequest& request) { return vector<BSONObj>{}; });
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ShardingCatalogClientTest, BasicReadAfterOpTime) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ OpTime lastOpTime;
+ for (int x = 0; x < 3; x++) {
+ auto future = launchAsync([this] {
+ BSONObjBuilder responseBuilder;
+ ASSERT_TRUE(getCatalogClient()->runReadCommandForTest(
+ operationContext(), "test", BSON("dummy" << 1), &responseBuilder));
+ });
+
+ const OpTime newOpTime(Timestamp(x + 2, x + 6), x + 5);
+
+ onCommandWithMetadata([this, &newOpTime, &lastOpTime](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS("test", request.dbname);
+
+ ASSERT_BSONOBJ_EQ(kReplSecondaryOkMetadata, request.metadata);
+
+ ASSERT_EQ(string("dummy"), request.cmdObj.firstElementFieldName());
+ checkReadConcern(request.cmdObj, lastOpTime.getTimestamp(), lastOpTime.getTerm());
+
+ ReplSetMetadata metadata(10, newOpTime, newOpTime, 100, OID(), 30, -1);
+ BSONObjBuilder builder;
+ metadata.writeToMetadata(&builder);
+
+ return RemoteCommandResponse(BSON("ok" << 1), builder.obj(), Milliseconds(1));
+ });
+
+ // Now wait for the runReadCommand call to return
+ future.timed_get(kFutureTimeout);
+
+ lastOpTime = newOpTime;
+ }
+}
+
+TEST_F(ShardingCatalogClientTest, ReadAfterOpTimeShouldNotGoBack) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ // Initialize the internal config OpTime
+ auto future1 = launchAsync([this] {
+ BSONObjBuilder responseBuilder;
+ ASSERT_TRUE(getCatalogClient()->runReadCommandForTest(
+ operationContext(), "test", BSON("dummy" << 1), &responseBuilder));
+ });
+
+ OpTime highestOpTime;
+ const OpTime newOpTime(Timestamp(7, 6), 5);
+
+ onCommandWithMetadata([this, &newOpTime, &highestOpTime](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS("test", request.dbname);
+
+ ASSERT_BSONOBJ_EQ(kReplSecondaryOkMetadata, request.metadata);
+
+ ASSERT_EQ(string("dummy"), request.cmdObj.firstElementFieldName());
+ checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm());
+
+ ReplSetMetadata metadata(10, newOpTime, newOpTime, 100, OID(), 30, -1);
+ BSONObjBuilder builder;
+ metadata.writeToMetadata(&builder);
+
+ return RemoteCommandResponse(BSON("ok" << 1), builder.obj(), Milliseconds(1));
+ });
+
+ future1.timed_get(kFutureTimeout);
+
+ highestOpTime = newOpTime;
+
+ // Return an older OpTime
+ auto future2 = launchAsync([this] {
+ BSONObjBuilder responseBuilder;
+ ASSERT_TRUE(getCatalogClient()->runReadCommandForTest(
+ operationContext(), "test", BSON("dummy" << 1), &responseBuilder));
+ });
+
+ const OpTime oldOpTime(Timestamp(3, 10), 5);
+
+ onCommandWithMetadata([this, &oldOpTime, &highestOpTime](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS("test", request.dbname);
+
+ ASSERT_BSONOBJ_EQ(kReplSecondaryOkMetadata, request.metadata);
+
+ ASSERT_EQ(string("dummy"), request.cmdObj.firstElementFieldName());
+ checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm());
+
+ ReplSetMetadata metadata(10, oldOpTime, oldOpTime, 100, OID(), 30, -1);
+ BSONObjBuilder builder;
+ metadata.writeToMetadata(&builder);
+
+ return RemoteCommandResponse(BSON("ok" << 1), builder.obj(), Milliseconds(1));
+ });
+
+ future2.timed_get(kFutureTimeout);
+
+ // Check that older OpTime does not override highest OpTime
+ auto future3 = launchAsync([this] {
+ BSONObjBuilder responseBuilder;
+ ASSERT_TRUE(getCatalogClient()->runReadCommandForTest(
+ operationContext(), "test", BSON("dummy" << 1), &responseBuilder));
+ });
+
+ onCommandWithMetadata([this, &oldOpTime, &highestOpTime](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS("test", request.dbname);
+
+ ASSERT_BSONOBJ_EQ(kReplSecondaryOkMetadata, request.metadata);
+
+ ASSERT_EQ(string("dummy"), request.cmdObj.firstElementFieldName());
+ checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm());
+
+ ReplSetMetadata metadata(10, oldOpTime, oldOpTime, 100, OID(), 30, -1);
+ BSONObjBuilder builder;
+ metadata.writeToMetadata(&builder);
+
+ return RemoteCommandResponse(BSON("ok" << 1), builder.obj(), Milliseconds(1));
+ });
+
+ future3.timed_get(kFutureTimeout);
+}
+
+TEST_F(ShardingCatalogClientTest, ReadAfterOpTimeFindThenCmd) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ auto future1 = launchAsync([this] {
+ ASSERT_OK(catalogClient()->getDatabase(operationContext(), "TestDB").getStatus());
+ });
+
+ OpTime highestOpTime;
+ const OpTime newOpTime(Timestamp(7, 6), 5);
+
+ onFindWithMetadataCommand(
+ [this, &newOpTime, &highestOpTime](const RemoteCommandRequest& request) {
+ ASSERT_BSONOBJ_EQ(kReplSecondaryOkMetadata, request.metadata);
+ checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm());
+
+ ReplSetMetadata metadata(10, newOpTime, newOpTime, 100, OID(), 30, -1);
+ BSONObjBuilder builder;
+ metadata.writeToMetadata(&builder);
+
+ DatabaseType dbType;
+ dbType.setName("TestDB");
+ dbType.setPrimary(ShardId("TestShard"));
+ dbType.setSharded("true");
+
+ return std::make_tuple(vector<BSONObj>{dbType.toBSON()}, builder.obj());
+ });
+
+ future1.timed_get(kFutureTimeout);
+
+ highestOpTime = newOpTime;
+
+ // Return an older OpTime
+ auto future2 = launchAsync([this] {
+ BSONObjBuilder responseBuilder;
+ ASSERT_TRUE(getCatalogClient()->runReadCommandForTest(
+ operationContext(), "test", BSON("dummy" << 1), &responseBuilder));
+ });
+
+ const OpTime oldOpTime(Timestamp(3, 10), 5);
+
+ onCommand([this, &oldOpTime, &highestOpTime](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS("test", request.dbname);
+
+ ASSERT_BSONOBJ_EQ(kReplSecondaryOkMetadata, request.metadata);
+
+ ASSERT_EQ(string("dummy"), request.cmdObj.firstElementFieldName());
+ checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm());
+
+ return BSON("ok" << 1);
+ });
+
+ future2.timed_get(kFutureTimeout);
+}
+
+TEST_F(ShardingCatalogClientTest, ReadAfterOpTimeCmdThenFind) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ // Initialize the internal config OpTime
+ auto future1 = launchAsync([this] {
+ BSONObjBuilder responseBuilder;
+ ASSERT_TRUE(getCatalogClient()->runReadCommandForTest(
+ operationContext(), "test", BSON("dummy" << 1), &responseBuilder));
+ });
+
+ OpTime highestOpTime;
+ const OpTime newOpTime(Timestamp(7, 6), 5);
+
+ onCommandWithMetadata([this, &newOpTime, &highestOpTime](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS("test", request.dbname);
+
+ ASSERT_BSONOBJ_EQ(kReplSecondaryOkMetadata, request.metadata);
+
+ ASSERT_EQ(string("dummy"), request.cmdObj.firstElementFieldName());
+ checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm());
+
+ ReplSetMetadata metadata(10, newOpTime, newOpTime, 100, OID(), 30, -1);
+ BSONObjBuilder builder;
+ metadata.writeToMetadata(&builder);
+
+ return RemoteCommandResponse(BSON("ok" << 1), builder.obj(), Milliseconds(1));
+ });
+
+ future1.timed_get(kFutureTimeout);
+
+ highestOpTime = newOpTime;
+
+ // Return an older OpTime
+ auto future2 = launchAsync([this] {
+ ASSERT_OK(catalogClient()->getDatabase(operationContext(), "TestDB").getStatus());
+ });
+
+ const OpTime oldOpTime(Timestamp(3, 10), 5);
+
+ onFindCommand([this, &oldOpTime, &highestOpTime](const RemoteCommandRequest& request) {
+ ASSERT_BSONOBJ_EQ(kReplSecondaryOkMetadata, request.metadata);
+
+ ASSERT_EQ(string("find"), request.cmdObj.firstElementFieldName());
+ checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm());
+
+ DatabaseType dbType;
+ dbType.setName("TestDB");
+ dbType.setPrimary(ShardId("TestShard"));
+ dbType.setSharded("true");
+
+ return vector<BSONObj>{dbType.toBSON()};
+ });
+
+ future2.timed_get(kFutureTimeout);
+}
+
+TEST_F(ShardingCatalogClientTest, RetryOnReadCommandNetworkErrorFailsAtMaxRetry) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ auto future1 = launchAsync([this] {
+ BSONObjBuilder responseBuilder;
+ auto ok = getCatalogClient()->runReadCommandForTest(
+ operationContext(), "test", BSON("dummy" << 1), &responseBuilder);
+ ASSERT_FALSE(ok);
+ auto status = getStatusFromCommandResult(responseBuilder.obj());
+ ASSERT_EQ(ErrorCodes::HostUnreachable, status.code());
+ });
+
+ for (int i = 0; i < kMaxCommandRetry; ++i) {
+ onCommand([](const RemoteCommandRequest&) {
+ return Status{ErrorCodes::HostUnreachable, "bad host"};
+ });
+ }
+
+ future1.timed_get(kFutureTimeout);
+}
+
+TEST_F(ShardingCatalogClientTest, RetryOnReadCommandNetworkErrorSucceedsAtMaxRetry) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ BSONObj expectedResult = BSON("ok" << 1 << "yes"
+ << "dummy");
+
+ auto future1 = launchAsync([this, expectedResult] {
+ BSONObjBuilder responseBuilder;
+ auto ok = getCatalogClient()->runReadCommandForTest(
+ operationContext(), "test", BSON("dummy" << 1), &responseBuilder);
+ ASSERT_TRUE(ok);
+ auto response = responseBuilder.obj();
+ ASSERT_BSONOBJ_EQ(expectedResult, response);
+ });
+
+ for (int i = 0; i < kMaxCommandRetry - 1; ++i) {
+ onCommand([](const RemoteCommandRequest&) {
+ return Status{ErrorCodes::HostUnreachable, "bad host"};
+ });
+ }
+
+ onCommand([expectedResult](const RemoteCommandRequest& request) { return expectedResult; });
+
+ future1.timed_get(kFutureTimeout);
+}
+
+TEST_F(ShardingCatalogClientTest, RetryOnFindCommandNetworkErrorFailsAtMaxRetry) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ auto future = launchAsync([this] {
+ auto status = catalogClient()->getDatabase(operationContext(), "TestDB");
+ ASSERT_EQ(ErrorCodes::HostUnreachable, status.getStatus().code());
+ });
+
+ for (int i = 0; i < kMaxCommandRetry; ++i) {
+ onFindCommand([](const RemoteCommandRequest&) {
+ return Status{ErrorCodes::HostUnreachable, "bad host"};
+ });
+ }
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ShardingCatalogClientTest, RetryOnFindCommandNetworkErrorSucceedsAtMaxRetry) {
+ configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
+
+ auto future = launchAsync(
+ [&] { ASSERT_OK(catalogClient()->getDatabase(operationContext(), "TestDB").getStatus()); });
+
+ for (int i = 0; i < kMaxCommandRetry - 1; ++i) {
+ onFindCommand([](const RemoteCommandRequest&) {
+ return Status{ErrorCodes::HostUnreachable, "bad host"};
+ });
+ }
+
+ onFindCommand([](const RemoteCommandRequest& request) {
+ DatabaseType dbType;
+ dbType.setName("TestDB");
+ dbType.setPrimary(ShardId("TestShard"));
+ dbType.setSharded("true");
+
+ return vector<BSONObj>{dbType.toBSON()};
+ });
+
+ future.timed_get(kFutureTimeout);
+}
+
+} // namespace
+} // namespace mongo