/**
* Copyright (C) 2015 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
#include "mongo/platform/basic.h"

#include <string>
#include <vector>

#include "mongo/bson/json.h"
#include "mongo/client/remote_command_targeter_mock.h"
#include "mongo/db/commands.h"
#include "mongo/db/query/lite_parsed_query.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/executor/network_interface_mock.h"
#include "mongo/executor/task_executor.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/rpc/metadata/server_selection_metadata.h"
#include "mongo/s/catalog/dist_lock_manager_mock.h"
#include "mongo/s/catalog/replset/catalog_manager_replica_set.h"
#include "mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/catalog/type_collection.h"
#include "mongo/s/catalog/type_database.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/catalog/type_tags.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/write_ops/batched_command_response.h"
#include "mongo/s/write_ops/batched_insert_request.h"
#include "mongo/s/write_ops/batched_update_request.h"
#include "mongo/stdx/future.h"
#include "mongo/util/log.h"
#include "mongo/util/time_support.h"
namespace mongo {
namespace {
using executor::NetworkInterfaceMock;
using executor::RemoteCommandRequest;
using executor::RemoteCommandResponse;
using executor::TaskExecutor;
using rpc::ReplSetMetadata;
using repl::OpTime;
using std::string;
using std::vector;
using unittest::assertGet;
// Alias so the TEST_F declarations below read naturally.
using CatalogManagerReplSetTest = CatalogManagerReplSetTestFixture;

// Number of attempts the tests below expect for write commands that fail with a
// retriable error such as NotMaster (see the retry loops in the NotMaster tests).
const int kMaxCommandRetry = 3;

// Metadata attached to config-server read requests: secondaryOk server-selection
// metadata plus a request for ReplSetMetadata in the response.
const BSONObj kReplSecondaryOkMetadata{[] {
    BSONObjBuilder o;
    o.appendElements(rpc::ServerSelectionMetadata(true, boost::none).toBSON());
    o.append(rpc::kReplSetMetadataFieldName, 1);
    return o.obj();
}()};
// getCollection for an existing collection returns the document together with
// the opTime extracted from the config server's ReplSetMetadata.
// Fix: restore the element type on the result vector (vector<BSONObj>), lost in
// transcription.
TEST_F(CatalogManagerReplSetTest, GetCollectionExisting) {
    configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));

    CollectionType expectedColl;
    expectedColl.setNs(NamespaceString("TestDB.TestNS"));
    expectedColl.setKeyPattern(BSON("KeyName" << 1));
    expectedColl.setUpdatedAt(Date_t());
    expectedColl.setEpoch(OID::gen());

    const OpTime newOpTime(Timestamp(7, 6), 5);

    auto future = launchAsync([this, &expectedColl] {
        return assertGet(
            catalogManager()->getCollection(operationContext(), expectedColl.getNs().ns()));
    });

    onFindWithMetadataCommand([this, &expectedColl, newOpTime](
        const RemoteCommandRequest& request) {
        ASSERT_EQUALS(kReplSecondaryOkMetadata, request.metadata);

        const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
        ASSERT_EQ(nss.ns(), CollectionType::ConfigNS);

        auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false));

        // Ensure the query is correct
        ASSERT_EQ(query->ns(), CollectionType::ConfigNS);
        ASSERT_EQ(query->getFilter(), BSON(CollectionType::fullNs(expectedColl.getNs().ns())));
        ASSERT_EQ(query->getSort(), BSONObj());
        ASSERT_EQ(query->getLimit().get(), 1);

        checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);

        ReplSetMetadata metadata(10, OpTime(), newOpTime, 100, OID(), 30, -1);
        BSONObjBuilder builder;
        metadata.writeToMetadata(&builder);

        return std::make_tuple(vector<BSONObj>{expectedColl.toBSON()}, builder.obj());
    });

    // Now wait for the getCollection call to return
    const auto collOpTimePair = future.timed_get(kFutureTimeout);
    ASSERT_EQ(newOpTime, collOpTimePair.opTime);
    ASSERT_EQ(expectedColl.toBSON(), collOpTimePair.value.toBSON());
}
// getCollection for a namespace the config servers do not know about fails with
// NamespaceNotFound. Fix: restore the element type on the empty result
// (vector<BSONObj>{}), which cannot be deduced from empty braces.
TEST_F(CatalogManagerReplSetTest, GetCollectionNotExisting) {
    configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));

    auto future = launchAsync([this] {
        auto status = catalogManager()->getCollection(operationContext(), "NonExistent");
        ASSERT_EQUALS(status.getStatus(), ErrorCodes::NamespaceNotFound);
    });

    // Empty result set, as if the collection document was not found.
    onFindCommand([](const RemoteCommandRequest& request) { return vector<BSONObj>{}; });

    // Now wait for the getCollection call to return
    future.timed_get(kFutureTimeout);
}
// A database name containing '.' is not a valid database name; getDatabase must
// reject it with InvalidNamespace (and a non-empty reason) without any network I/O.
TEST_F(CatalogManagerReplSetTest, GetDatabaseInvalidName) {
    const auto resultStatus = catalogManager()->getDatabase(operationContext(), "b.c").getStatus();

    ASSERT_EQ(ErrorCodes::InvalidNamespace, resultStatus.code());
    ASSERT_FALSE(resultStatus.reason().empty());
}
// getDatabase for an existing database returns the document plus the opTime from
// the config server's ReplSetMetadata. Fix: restore vector<BSONObj> on the
// returned result set.
TEST_F(CatalogManagerReplSetTest, GetDatabaseExisting) {
    configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));

    DatabaseType expectedDb;
    expectedDb.setName("bigdata");
    expectedDb.setPrimary("shard0000");
    expectedDb.setSharded(true);

    const OpTime newOpTime(Timestamp(7, 6), 5);

    auto future = launchAsync([this, &expectedDb] {
        return assertGet(catalogManager()->getDatabase(operationContext(), expectedDb.getName()));
    });

    onFindWithMetadataCommand([this, &expectedDb, newOpTime](const RemoteCommandRequest& request) {
        const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
        ASSERT_EQ(nss.ns(), DatabaseType::ConfigNS);
        ASSERT_EQUALS(kReplSecondaryOkMetadata, request.metadata);

        auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false));

        // The query must target exactly one database document by name.
        ASSERT_EQ(query->ns(), DatabaseType::ConfigNS);
        ASSERT_EQ(query->getFilter(), BSON(DatabaseType::name(expectedDb.getName())));
        ASSERT_EQ(query->getSort(), BSONObj());
        ASSERT_EQ(query->getLimit().get(), 1);

        checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);

        ReplSetMetadata metadata(10, OpTime(), newOpTime, 100, OID(), 30, -1);
        BSONObjBuilder builder;
        metadata.writeToMetadata(&builder);

        return std::make_tuple(vector<BSONObj>{expectedDb.toBSON()}, builder.obj());
    });

    const auto dbOpTimePair = future.timed_get(kFutureTimeout);
    ASSERT_EQ(newOpTime, dbOpTimePair.opTime);
    ASSERT_EQ(expectedDb.toBSON(), dbOpTimePair.value.toBSON());
}
// When a stale secondary reports the database as missing, the catalog manager
// retargets and the retry against the new host succeeds. Fix: restore
// vector<BSONObj> on both mock result sets.
TEST_F(CatalogManagerReplSetTest, GetDatabaseStaleSecondaryRetrySuccess) {
    HostAndPort firstHost{"TestHost1"};
    HostAndPort secondHost{"TestHost2"};
    configTargeter()->setFindHostReturnValue(firstHost);

    DatabaseType expectedDb;
    expectedDb.setName("bigdata");
    expectedDb.setPrimary("shard0000");
    expectedDb.setSharded(true);

    auto future = launchAsync([this, &expectedDb] {
        return assertGet(catalogManager()->getDatabase(operationContext(), expectedDb.getName()));
    });

    // Return empty result set as if the database wasn't found
    onFindCommand([this, &firstHost, &secondHost](const RemoteCommandRequest& request) {
        ASSERT_EQUALS(firstHost, request.target);
        configTargeter()->setFindHostReturnValue(secondHost);
        return vector<BSONObj>{};
    });

    // Make sure we retarget and retry.
    onFindCommand([this, &expectedDb, &secondHost](const RemoteCommandRequest& request) {
        ASSERT_EQUALS(secondHost, request.target);
        return vector<BSONObj>{expectedDb.toBSON()};
    });

    const auto dbOpTimePair = future.timed_get(kFutureTimeout);
    ASSERT_EQ(expectedDb.toBSON(), dbOpTimePair.value.toBSON());
}
// If retargeting after an empty secondary read fails with NotMaster, getDatabase
// surfaces NotMaster to the caller. Fix: restore vector<BSONObj> on the empty
// mock result set.
TEST_F(CatalogManagerReplSetTest, GetDatabaseStaleSecondaryRetryNoPrimary) {
    HostAndPort testHost{"TestHost1"};
    configTargeter()->setFindHostReturnValue(testHost);

    auto future = launchAsync([this] {
        auto dbResult = catalogManager()->getDatabase(operationContext(), "NonExistent");
        ASSERT_EQ(dbResult.getStatus(), ErrorCodes::NotMaster);
    });

    // Return empty result set as if the database wasn't found
    onFindCommand([this, &testHost](const RemoteCommandRequest& request) {
        ASSERT_EQUALS(testHost, request.target);
        // Make it so when it attempts to retarget and retry it will get a NotMaster error.
        configTargeter()->setFindHostReturnValue(Status(ErrorCodes::NotMaster, "no config master"));
        return vector<BSONObj>{};
    });

    future.timed_get(kFutureTimeout);
}
// getDatabase returns NamespaceNotFound after both the initial read and the
// retry come back empty. Fix: restore vector<BSONObj> on both empty result sets.
TEST_F(CatalogManagerReplSetTest, GetDatabaseNotExisting) {
    configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));

    auto future = launchAsync([this] {
        auto dbResult = catalogManager()->getDatabase(operationContext(), "NonExistent");
        ASSERT_EQ(dbResult.getStatus(), ErrorCodes::NamespaceNotFound);
    });

    // Empty responses for the initial attempt and for the retry.
    onFindCommand([](const RemoteCommandRequest& request) { return vector<BSONObj>{}; });
    onFindCommand([](const RemoteCommandRequest& request) { return vector<BSONObj>{}; });

    future.timed_get(kFutureTimeout);
}
// Happy path: updateCollection sends an update for the collection document and
// returns OK once the config server acknowledges it.
TEST_F(CatalogManagerReplSetTest, UpdateCollection) {
    const HostAndPort configHost("TestHost1");
    configTargeter()->setFindHostReturnValue(configHost);

    CollectionType collection;
    collection.setNs(NamespaceString("db.coll"));
    collection.setUpdatedAt(network()->now());
    collection.setUnique(true);
    collection.setEpoch(OID::gen());
    collection.setKeyPattern(KeyPattern(BSON("_id" << 1)));

    auto future = launchAsync([this, collection] {
        ASSERT_OK(catalogManager()->updateCollection(
            operationContext(), collection.getNs().toString(), collection));
    });

    expectUpdateCollection(configHost, collection);

    // Block until updateCollection has completed
    future.timed_get(kFutureTimeout);
}
// When every attempt gets a NotMaster write response, updateCollection exhausts
// its retries and returns NotMaster. Fix: use the shared kMaxCommandRetry
// constant instead of a magic '3' so the test tracks the retry budget.
TEST_F(CatalogManagerReplSetTest, UpdateCollectionNotMaster) {
    configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));

    CollectionType collection;
    collection.setNs(NamespaceString("db.coll"));
    collection.setUpdatedAt(network()->now());
    collection.setUnique(true);
    collection.setEpoch(OID::gen());
    collection.setKeyPattern(KeyPattern(BSON("_id" << 1)));

    auto future = launchAsync([this, collection] {
        auto status = catalogManager()->updateCollection(
            operationContext(), collection.getNs().toString(), collection);
        ASSERT_EQUALS(ErrorCodes::NotMaster, status);
    });

    // Respond NotMaster to every retry attempt.
    for (int i = 0; i < kMaxCommandRetry; ++i) {
        onCommand([](const RemoteCommandRequest& request) {
            BatchedCommandResponse response;
            response.setOk(false);
            response.setErrCode(ErrorCodes::NotMaster);
            response.setErrMessage("not master");
            return response.toBSON();
        });
    }

    // Now wait for the updateCollection call to return
    future.timed_get(kFutureTimeout);
}
// When the targeter itself cannot produce a config host (NotMaster), the error
// propagates straight out of updateCollection without any network traffic.
TEST_F(CatalogManagerReplSetTest, UpdateCollectionNotMasterFromTargeter) {
    configTargeter()->setFindHostReturnValue(Status(ErrorCodes::NotMaster, "not master"));

    CollectionType coll;
    coll.setNs(NamespaceString("db.coll"));
    coll.setUpdatedAt(network()->now());
    coll.setUnique(true);
    coll.setEpoch(OID::gen());
    coll.setKeyPattern(KeyPattern(BSON("_id" << 1)));

    auto future = launchAsync([this, coll] {
        const auto status =
            catalogManager()->updateCollection(operationContext(), coll.getNs().toString(), coll);
        ASSERT_EQUALS(ErrorCodes::NotMaster, status);
    });

    // No network expectations: the call must fail during targeting.
    future.timed_get(kFutureTimeout);
}
// After a NotMaster write response, updateCollection retargets and the retry
// against the new primary succeeds. Fix: drop the redundant HostAndPort copy
// construction when calling expectUpdateCollection (host2 already is one).
TEST_F(CatalogManagerReplSetTest, UpdateCollectionNotMasterRetrySuccess) {
    HostAndPort host1("TestHost1");
    HostAndPort host2("TestHost2");
    configTargeter()->setFindHostReturnValue(host1);

    CollectionType collection;
    collection.setNs(NamespaceString("db.coll"));
    collection.setUpdatedAt(network()->now());
    collection.setUnique(true);
    collection.setEpoch(OID::gen());
    collection.setKeyPattern(KeyPattern(BSON("_id" << 1)));

    auto future = launchAsync([this, collection] {
        auto status = catalogManager()->updateCollection(
            operationContext(), collection.getNs().toString(), collection);
        ASSERT_OK(status);
    });

    onCommand([&](const RemoteCommandRequest& request) {
        ASSERT_EQUALS(host1, request.target);

        BatchedCommandResponse response;
        response.setOk(false);
        response.setErrCode(ErrorCodes::NotMaster);
        response.setErrMessage("not master");

        // Ensure that when the catalog manager tries to retarget after getting the
        // NotMaster response, it will get back a new target.
        configTargeter()->setFindHostReturnValue(host2);

        return response.toBSON();
    });

    expectUpdateCollection(host2, collection);

    // Now wait for the updateCollection call to return
    future.timed_get(kFutureTimeout);
}
// getAllShards returns every document in config.shards, in order. Fix: restore
// the element types lost in transcription (vector<ShardType> locals and the
// vector<BSONObj> mock response).
TEST_F(CatalogManagerReplSetTest, GetAllShardsValid) {
    configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));

    ShardType s1;
    s1.setName("shard0000");
    s1.setHost("ShardHost");
    s1.setDraining(false);
    s1.setMaxSizeMB(50);
    s1.setTags({"tag1", "tag2", "tag3"});

    ShardType s2;
    s2.setName("shard0001");
    s2.setHost("ShardHost");

    ShardType s3;
    s3.setName("shard0002");
    s3.setHost("ShardHost");
    s3.setMaxSizeMB(65);

    const vector<ShardType> expectedShardsList = {s1, s2, s3};

    auto future = launchAsync([this] {
        auto shards = assertGet(catalogManager()->getAllShards(operationContext()));
        return shards.value;
    });

    onFindCommand([this, &s1, &s2, &s3](const RemoteCommandRequest& request) {
        ASSERT_EQUALS(kReplSecondaryOkMetadata, request.metadata);

        const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
        ASSERT_EQ(nss.ns(), ShardType::ConfigNS);

        auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false));

        // The whole shards collection is requested: no filter, no sort, no limit.
        ASSERT_EQ(query->ns(), ShardType::ConfigNS);
        ASSERT_EQ(query->getFilter(), BSONObj());
        ASSERT_EQ(query->getSort(), BSONObj());
        ASSERT_FALSE(query->getLimit().is_initialized());

        checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);

        return vector<BSONObj>{s1.toBSON(), s2.toBSON(), s3.toBSON()};
    });

    const vector<ShardType> actualShardsList = future.timed_get(kFutureTimeout);
    ASSERT_EQ(actualShardsList.size(), expectedShardsList.size());

    for (size_t i = 0; i < actualShardsList.size(); ++i) {
        ASSERT_EQ(actualShardsList[i].toBSON(), expectedShardsList[i].toBSON());
    }
}
// A document in config.shards that fails to parse as a ShardType makes
// getAllShards fail with FailedToParse. Fix: restore vector<BSONObj> on the
// mock response.
TEST_F(CatalogManagerReplSetTest, GetAllShardsWithInvalidShard) {
    configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));

    auto future = launchAsync([this] {
        auto status = catalogManager()->getAllShards(operationContext());
        ASSERT_EQ(ErrorCodes::FailedToParse, status.getStatus());
    });

    onFindCommand([](const RemoteCommandRequest& request) {
        // Valid ShardType
        ShardType s1;
        s1.setName("shard0001");
        s1.setHost("ShardHost");

        return vector<BSONObj>{
            s1.toBSON(),
            BSONObj()  // empty document is invalid
        };
    });

    future.timed_get(kFutureTimeout);
}
// getChunks passes the caller's query, sort and limit through to the config
// server and surfaces the opTime from the response metadata. Fix: restore the
// template arguments lost in transcription (static_cast<long long>,
// vector<ChunkType>, vector<BSONObj>).
TEST_F(CatalogManagerReplSetTest, GetChunksForNSWithSortAndLimit) {
    configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));

    OID oid = OID::gen();

    ChunkType chunkA;
    chunkA.setNS("TestDB.TestColl");
    chunkA.setMin(BSON("a" << 1));
    chunkA.setMax(BSON("a" << 100));
    chunkA.setVersion({1, 2, oid});
    chunkA.setShard("shard0000");

    ChunkType chunkB;
    chunkB.setNS("TestDB.TestColl");
    chunkB.setMin(BSON("a" << 100));
    chunkB.setMax(BSON("a" << 200));
    chunkB.setVersion({3, 4, oid});
    chunkB.setShard("shard0001");

    ChunkVersion queryChunkVersion({1, 2, oid});

    const BSONObj chunksQuery(
        BSON(ChunkType::ns("TestDB.TestColl")
             << ChunkType::DEPRECATED_lastmod()
             << BSON("$gte" << static_cast<long long>(queryChunkVersion.toLong()))));

    const OpTime newOpTime(Timestamp(7, 6), 5);

    auto future = launchAsync([this, &chunksQuery, newOpTime] {
        vector<ChunkType> chunks;
        OpTime opTime;

        ASSERT_OK(catalogManager()->getChunks(operationContext(),
                                              chunksQuery,
                                              BSON(ChunkType::DEPRECATED_lastmod() << -1),
                                              1,
                                              &chunks,
                                              &opTime));
        ASSERT_EQ(2U, chunks.size());
        ASSERT_EQ(newOpTime, opTime);

        return chunks;
    });

    onFindWithMetadataCommand([this, &chunksQuery, chunkA, chunkB, newOpTime](
        const RemoteCommandRequest& request) {
        ASSERT_EQUALS(kReplSecondaryOkMetadata, request.metadata);

        const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
        ASSERT_EQ(nss.ns(), ChunkType::ConfigNS);

        auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false));

        // The caller's query, sort and limit must be forwarded unchanged.
        ASSERT_EQ(query->ns(), ChunkType::ConfigNS);
        ASSERT_EQ(query->getFilter(), chunksQuery);
        ASSERT_EQ(query->getSort(), BSON(ChunkType::DEPRECATED_lastmod() << -1));
        ASSERT_EQ(query->getLimit().get(), 1);

        checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);

        ReplSetMetadata metadata(10, OpTime(), newOpTime, 100, OID(), 30, -1);
        BSONObjBuilder builder;
        metadata.writeToMetadata(&builder);

        return std::make_tuple(vector<BSONObj>{chunkA.toBSON(), chunkB.toBSON()}, builder.obj());
    });

    const auto& chunks = future.timed_get(kFutureTimeout);
    ASSERT_EQ(chunkA.toBSON(), chunks[0].toBSON());
    ASSERT_EQ(chunkB.toBSON(), chunks[1].toBSON());
}
// getChunks with no sort and no limit forwards an unbounded query and succeeds
// on an empty result set. Fix: restore static_cast<long long>,
// vector<ChunkType> and vector<BSONObj> template arguments.
TEST_F(CatalogManagerReplSetTest, GetChunksForNSNoSortNoLimit) {
    configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));

    ChunkVersion queryChunkVersion({1, 2, OID::gen()});

    const BSONObj chunksQuery(
        BSON(ChunkType::ns("TestDB.TestColl")
             << ChunkType::DEPRECATED_lastmod()
             << BSON("$gte" << static_cast<long long>(queryChunkVersion.toLong()))));

    auto future = launchAsync([this, &chunksQuery] {
        vector<ChunkType> chunks;

        ASSERT_OK(catalogManager()->getChunks(
            operationContext(), chunksQuery, BSONObj(), boost::none, &chunks, nullptr));
        ASSERT_EQ(0U, chunks.size());

        return chunks;
    });

    onFindCommand([this, &chunksQuery](const RemoteCommandRequest& request) {
        ASSERT_EQUALS(kReplSecondaryOkMetadata, request.metadata);

        const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
        ASSERT_EQ(nss.ns(), ChunkType::ConfigNS);

        auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false));

        ASSERT_EQ(query->ns(), ChunkType::ConfigNS);
        ASSERT_EQ(query->getFilter(), chunksQuery);
        ASSERT_EQ(query->getSort(), BSONObj());
        ASSERT_FALSE(query->getLimit().is_initialized());

        checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);

        return vector<BSONObj>{};
    });

    future.timed_get(kFutureTimeout);
}
// A chunk document missing a required field (shard id) makes getChunks fail with
// FailedToParse, leaving the output vector empty. Fix: restore the stripped
// template arguments (static_cast<long long>, vector<ChunkType>, vector<BSONObj>).
TEST_F(CatalogManagerReplSetTest, GetChunksForNSInvalidChunk) {
    configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));

    ChunkVersion queryChunkVersion({1, 2, OID::gen()});

    const BSONObj chunksQuery(
        BSON(ChunkType::ns("TestDB.TestColl")
             << ChunkType::DEPRECATED_lastmod()
             << BSON("$gte" << static_cast<long long>(queryChunkVersion.toLong()))));

    auto future = launchAsync([this, &chunksQuery] {
        vector<ChunkType> chunks;
        Status status = catalogManager()->getChunks(
            operationContext(), chunksQuery, BSONObj(), boost::none, &chunks, nullptr);

        ASSERT_EQUALS(ErrorCodes::FailedToParse, status);
        ASSERT_EQ(0U, chunks.size());
    });

    onFindCommand([&chunksQuery](const RemoteCommandRequest& request) {
        ChunkType chunkA;
        chunkA.setNS("TestDB.TestColl");
        chunkA.setMin(BSON("a" << 1));
        chunkA.setMax(BSON("a" << 100));
        chunkA.setVersion({1, 2, OID::gen()});
        chunkA.setShard("shard0000");

        ChunkType chunkB;
        chunkB.setNS("TestDB.TestColl");
        chunkB.setMin(BSON("a" << 100));
        chunkB.setMax(BSON("a" << 200));
        chunkB.setVersion({3, 4, OID::gen()});
        // Missing shard id

        return vector<BSONObj>{chunkA.toBSON(), chunkB.toBSON()};
    });

    future.timed_get(kFutureTimeout);
}
// runUserManagementReadCommand forwards the command to the config server with
// secondaryOk metadata and a maxTimeMS of 30s, and relays the response verbatim.
TEST_F(CatalogManagerReplSetTest, RunUserManagementReadCommand) {
    configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));

    auto future = launchAsync([this] {
        BSONObjBuilder replyBuilder;
        const bool ok = catalogManager()->runUserManagementReadCommand(
            operationContext(), "test", BSON("usersInfo" << 1), &replyBuilder);
        ASSERT_TRUE(ok);

        const BSONObj reply = replyBuilder.obj();
        ASSERT_TRUE(reply["ok"].trueValue());
        ASSERT_EQUALS(0U, reply["users"].Array().size());
    });

    onCommand([](const RemoteCommandRequest& request) {
        ASSERT_EQUALS(kReplSecondaryOkMetadata, request.metadata);
        ASSERT_EQUALS("test", request.dbname);
        ASSERT_EQUALS(BSON("usersInfo" << 1 << "maxTimeMS" << 30000), request.cmdObj);

        return BSON("ok" << 1 << "users" << BSONArrayBuilder().arr());
    });

    future.timed_get(kFutureTimeout);
}
// If no config host satisfies the read preference, the read command fails
// immediately and the error appears in the response document.
TEST_F(CatalogManagerReplSetTest, RunUserManagementReadCommandUnsatisfiedReadPref) {
    configTargeter()->setFindHostReturnValue(
        Status(ErrorCodes::FailedToSatisfyReadPreference, "no nodes up"));

    BSONObjBuilder replyBuilder;
    const bool ok = catalogManager()->runUserManagementReadCommand(
        operationContext(), "test", BSON("usersInfo" << 1), &replyBuilder);
    ASSERT_FALSE(ok);

    ASSERT_EQUALS(ErrorCodes::FailedToSatisfyReadPreference,
                  getStatusFromCommandResult(replyBuilder.obj()));
}
// A user management write sent without a write concern gets w:majority appended,
// and the remote command's result (here UserNotFound) is relayed to the caller.
TEST_F(CatalogManagerReplSetTest, RunUserManagementWriteCommandSuccess) {
    configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));

    auto future = launchAsync([this] {
        BSONObjBuilder replyBuilder;
        const bool ok = catalogManager()->runUserManagementWriteCommand(operationContext(),
                                                                        "dropUser",
                                                                        "test",
                                                                        BSON("dropUser"
                                                                             << "test"),
                                                                        &replyBuilder);
        ASSERT_FALSE(ok);
        ASSERT_EQUALS(ErrorCodes::UserNotFound, getStatusFromCommandResult(replyBuilder.obj()));
    });

    onCommand([](const RemoteCommandRequest& request) {
        ASSERT_EQUALS("test", request.dbname);
        // Since no write concern was sent we will add w:majority
        ASSERT_EQUALS(BSON("dropUser"
                           << "test"
                           << "writeConcern" << BSON("w"
                                                     << "majority"
                                                     << "wtimeout" << 0) << "maxTimeMS" << 30000),
                      request.cmdObj);
        ASSERT_EQUALS(BSON(rpc::kReplSetMetadataFieldName << 1), request.metadata);

        BSONObjBuilder replyBuilder;
        Command::appendCommandStatus(replyBuilder,
                                     Status(ErrorCodes::UserNotFound, "User test@test not found"));
        return replyBuilder.obj();
    });

    // Wait for runUserManagementWriteCommand to complete
    future.timed_get(kFutureTimeout);
}
// A caller-supplied write concern other than w:1 is rejected up front with
// InvalidOptions; nothing goes over the network.
TEST_F(CatalogManagerReplSetTest, RunUserManagementWriteCommandInvalidWriteConcern) {
    configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));

    BSONObjBuilder replyBuilder;
    const bool ok =
        catalogManager()->runUserManagementWriteCommand(operationContext(),
                                                        "dropUser",
                                                        "test",
                                                        BSON("dropUser"
                                                             << "test"
                                                             << "writeConcern" << BSON("w" << 2)),
                                                        &replyBuilder);
    ASSERT_FALSE(ok);

    const Status commandStatus = getStatusFromCommandResult(replyBuilder.obj());
    ASSERT_EQUALS(ErrorCodes::InvalidOptions, commandStatus);
    ASSERT_STRING_CONTAINS(commandStatus.reason(), "Invalid replication write concern");
}
// Tests that if you send a w:1 write concern it gets replaced with w:majority
// while the caller's wtimeout is preserved.
TEST_F(CatalogManagerReplSetTest, RunUserManagementWriteCommandRewriteWriteConcern) {
    configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));

    distLock()->expectLock(
        [](StringData name,
           StringData whyMessage,
           Milliseconds waitFor,
           Milliseconds lockTryInterval) {
            ASSERT_EQUALS("authorizationData", name);
            ASSERT_EQUALS("dropUser", whyMessage);
        },
        Status::OK());

    auto future = launchAsync([this] {
        BSONObjBuilder replyBuilder;
        const bool ok = catalogManager()->runUserManagementWriteCommand(
            operationContext(),
            "dropUser",
            "test",
            BSON("dropUser"
                 << "test"
                 << "writeConcern" << BSON("w" << 1 << "wtimeout" << 30)),
            &replyBuilder);
        ASSERT_FALSE(ok);
        ASSERT_EQUALS(ErrorCodes::UserNotFound, getStatusFromCommandResult(replyBuilder.obj()));
    });

    onCommand([](const RemoteCommandRequest& request) {
        ASSERT_EQUALS("test", request.dbname);
        // w:1 must have been rewritten to w:majority, keeping wtimeout:30.
        ASSERT_EQUALS(BSON("dropUser"
                           << "test"
                           << "writeConcern" << BSON("w"
                                                     << "majority"
                                                     << "wtimeout" << 30) << "maxTimeMS" << 30000),
                      request.cmdObj);
        ASSERT_EQUALS(BSON(rpc::kReplSetMetadataFieldName << 1), request.metadata);

        BSONObjBuilder replyBuilder;
        Command::appendCommandStatus(replyBuilder,
                                     Status(ErrorCodes::UserNotFound, "User test@test not found"));
        return replyBuilder.obj();
    });

    // Wait for runUserManagementWriteCommand to complete
    future.timed_get(kFutureTimeout);
}
// A user management write that gets NotMaster on every attempt fails with
// NotMaster once the retry budget is exhausted. Fix: use the shared
// kMaxCommandRetry constant instead of a magic '3'.
TEST_F(CatalogManagerReplSetTest, RunUserManagementWriteCommandNotMaster) {
    configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));

    auto future = launchAsync([this] {
        BSONObjBuilder responseBuilder;
        bool ok = catalogManager()->runUserManagementWriteCommand(operationContext(),
                                                                  "dropUser",
                                                                  "test",
                                                                  BSON("dropUser"
                                                                       << "test"),
                                                                  &responseBuilder);
        ASSERT_FALSE(ok);

        Status commandStatus = getStatusFromCommandResult(responseBuilder.obj());
        ASSERT_EQUALS(ErrorCodes::NotMaster, commandStatus);
    });

    // Fail every retry attempt with NotMaster.
    for (int i = 0; i < kMaxCommandRetry; ++i) {
        onCommand([](const RemoteCommandRequest& request) {
            BSONObjBuilder responseBuilder;
            Command::appendCommandStatus(responseBuilder,
                                         Status(ErrorCodes::NotMaster, "not master"));
            return responseBuilder.obj();
        });
    }

    // Now wait for the runUserManagementWriteCommand call to return
    future.timed_get(kFutureTimeout);
}
// After a NotMaster response, the catalog manager retargets and retries the user
// management write against the newly elected config primary.
TEST_F(CatalogManagerReplSetTest, RunUserManagementWriteCommandNotMasterRetrySuccess) {
    const HostAndPort staleMaster("TestHost1");
    const HostAndPort newMaster("TestHost2");
    configTargeter()->setFindHostReturnValue(staleMaster);

    auto future = launchAsync([this] {
        BSONObjBuilder replyBuilder;
        const bool ok = catalogManager()->runUserManagementWriteCommand(operationContext(),
                                                                        "dropUser",
                                                                        "test",
                                                                        BSON("dropUser"
                                                                             << "test"),
                                                                        &replyBuilder);
        ASSERT_TRUE(ok);
        ASSERT_OK(getStatusFromCommandResult(replyBuilder.obj()));
    });

    onCommand([&](const RemoteCommandRequest& request) {
        ASSERT_EQUALS(staleMaster, request.target);

        BSONObjBuilder replyBuilder;
        Command::appendCommandStatus(replyBuilder, Status(ErrorCodes::NotMaster, "not master"));

        // Retargeting after the NotMaster response must find the new primary.
        configTargeter()->setFindHostReturnValue(newMaster);

        return replyBuilder.obj();
    });

    onCommand([newMaster](const RemoteCommandRequest& request) {
        ASSERT_EQUALS(newMaster, request.target);
        ASSERT_EQUALS("test", request.dbname);
        // Since no write concern was sent we will add w:majority
        ASSERT_EQUALS(BSON("dropUser"
                           << "test"
                           << "writeConcern" << BSON("w"
                                                     << "majority"
                                                     << "wtimeout" << 0) << "maxTimeMS" << 30000),
                      request.cmdObj);
        ASSERT_EQUALS(BSON(rpc::kReplSetMetadataFieldName << 1), request.metadata);

        return BSON("ok" << 1);
    });

    // Wait for runUserManagementWriteCommand to complete
    future.timed_get(kFutureTimeout);
}
// getCollections with a null dbName returns every collection document along with
// the opTime from the response metadata. Fix: restore vector<CollectionType> and
// vector<BSONObj> template arguments lost in transcription.
TEST_F(CatalogManagerReplSetTest, GetCollectionsValidResultsNoDb) {
    configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));

    CollectionType coll1;
    coll1.setNs(NamespaceString{"test.system.indexes"});
    coll1.setUpdatedAt(network()->now());
    coll1.setUnique(true);
    coll1.setEpoch(OID::gen());
    coll1.setKeyPattern(KeyPattern{BSON("_id" << 1)});
    ASSERT_OK(coll1.validate());

    CollectionType coll2;
    coll2.setNs(NamespaceString{"test.coll1"});
    coll2.setUpdatedAt(network()->now());
    coll2.setUnique(false);
    coll2.setEpoch(OID::gen());
    coll2.setKeyPattern(KeyPattern{BSON("_id" << 1)});
    ASSERT_OK(coll2.validate());

    CollectionType coll3;
    coll3.setNs(NamespaceString{"anotherdb.coll1"});
    coll3.setUpdatedAt(network()->now());
    coll3.setUnique(false);
    coll3.setEpoch(OID::gen());
    coll3.setKeyPattern(KeyPattern{BSON("_id" << 1)});
    ASSERT_OK(coll3.validate());

    const OpTime newOpTime(Timestamp(7, 6), 5);

    auto future = launchAsync([this, newOpTime] {
        vector<CollectionType> collections;
        OpTime opTime;

        const auto status =
            catalogManager()->getCollections(operationContext(), nullptr, &collections, &opTime);

        ASSERT_OK(status);
        ASSERT_EQ(newOpTime, opTime);

        return collections;
    });

    onFindWithMetadataCommand([this, coll1, coll2, coll3, newOpTime](
        const RemoteCommandRequest& request) {
        ASSERT_EQUALS(kReplSecondaryOkMetadata, request.metadata);

        const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
        ASSERT_EQ(nss.ns(), CollectionType::ConfigNS);

        auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false));

        // No dbName means no filter: all collections are requested.
        ASSERT_EQ(query->ns(), CollectionType::ConfigNS);
        ASSERT_EQ(query->getFilter(), BSONObj());
        ASSERT_EQ(query->getSort(), BSONObj());

        checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);

        ReplSetMetadata metadata(10, OpTime(), newOpTime, 100, OID(), 30, -1);
        BSONObjBuilder builder;
        metadata.writeToMetadata(&builder);

        return std::make_tuple(vector<BSONObj>{coll1.toBSON(), coll2.toBSON(), coll3.toBSON()},
                               builder.obj());
    });

    const auto& actualColls = future.timed_get(kFutureTimeout);
    ASSERT_EQ(3U, actualColls.size());
    ASSERT_EQ(coll1.toBSON(), actualColls[0].toBSON());
    ASSERT_EQ(coll2.toBSON(), actualColls[1].toBSON());
    ASSERT_EQ(coll3.toBSON(), actualColls[2].toBSON());
}
// getCollections with a dbName filters config.collections by an anchored
// '^test\.' regex on the full namespace. Fix: restore vector<CollectionType>
// and vector<BSONObj> template arguments.
TEST_F(CatalogManagerReplSetTest, GetCollectionsValidResultsWithDb) {
    configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));

    CollectionType coll1;
    coll1.setNs(NamespaceString{"test.system.indexes"});
    coll1.setUpdatedAt(network()->now());
    coll1.setUnique(true);
    coll1.setEpoch(OID::gen());
    coll1.setKeyPattern(KeyPattern{BSON("_id" << 1)});

    CollectionType coll2;
    coll2.setNs(NamespaceString{"test.coll1"});
    coll2.setUpdatedAt(network()->now());
    coll2.setUnique(false);
    coll2.setEpoch(OID::gen());
    coll2.setKeyPattern(KeyPattern{BSON("_id" << 1)});

    auto future = launchAsync([this] {
        string dbName = "test";
        vector<CollectionType> collections;

        const auto status =
            catalogManager()->getCollections(operationContext(), &dbName, &collections, nullptr);

        ASSERT_OK(status);
        return collections;
    });

    onFindCommand([this, coll1, coll2](const RemoteCommandRequest& request) {
        ASSERT_EQUALS(kReplSecondaryOkMetadata, request.metadata);

        const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
        ASSERT_EQ(nss.ns(), CollectionType::ConfigNS);

        auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false));

        ASSERT_EQ(query->ns(), CollectionType::ConfigNS);
        {
            // The filter must be an anchored prefix regex on the full namespace.
            BSONObjBuilder b;
            b.appendRegex(CollectionType::fullNs(), "^test\\.");
            ASSERT_EQ(query->getFilter(), b.obj());
        }

        checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);

        return vector<BSONObj>{coll1.toBSON(), coll2.toBSON()};
    });

    const auto& actualColls = future.timed_get(kFutureTimeout);
    ASSERT_EQ(2U, actualColls.size());
    ASSERT_EQ(coll1.toBSON(), actualColls[0].toBSON());
    ASSERT_EQ(coll2.toBSON(), actualColls[1].toBSON());
}
// An unparseable document in config.collections makes getCollections fail with
// FailedToParse and leaves the output vector empty. Fix: restore
// vector<CollectionType> and vector<BSONObj> template arguments.
TEST_F(CatalogManagerReplSetTest, GetCollectionsInvalidCollectionType) {
    configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));

    auto future = launchAsync([this] {
        string dbName = "test";
        vector<CollectionType> collections;

        const auto status =
            catalogManager()->getCollections(operationContext(), &dbName, &collections, nullptr);

        ASSERT_EQ(ErrorCodes::FailedToParse, status);
        ASSERT_EQ(0U, collections.size());
    });

    CollectionType validColl;
    validColl.setNs(NamespaceString{"test.system.indexes"});
    validColl.setUpdatedAt(network()->now());
    validColl.setUnique(true);
    validColl.setEpoch(OID::gen());
    validColl.setKeyPattern(KeyPattern{BSON("_id" << 1)});
    ASSERT_OK(validColl.validate());

    onFindCommand([this, validColl](const RemoteCommandRequest& request) {
        const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
        ASSERT_EQ(nss.ns(), CollectionType::ConfigNS);
        ASSERT_EQUALS(kReplSecondaryOkMetadata, request.metadata);

        auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false));

        ASSERT_EQ(query->ns(), CollectionType::ConfigNS);
        {
            BSONObjBuilder b;
            b.appendRegex(CollectionType::fullNs(), "^test\\.");
            ASSERT_EQ(query->getFilter(), b.obj());
        }

        checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);

        return vector<BSONObj>{
            validColl.toBSON(),
            BSONObj()  // empty document is invalid
        };
    });

    future.timed_get(kFutureTimeout);
}
// getDatabasesForShard queries config.databases by primary shard and returns the
// matching database names. Fix: restore vector<string> on the output parameter
// and vector<BSONObj> on the mock response.
TEST_F(CatalogManagerReplSetTest, GetDatabasesForShardValid) {
    configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));

    DatabaseType dbt1;
    dbt1.setName("db1");
    dbt1.setPrimary("shard0000");

    DatabaseType dbt2;
    dbt2.setName("db2");
    dbt2.setPrimary("shard0000");

    auto future = launchAsync([this] {
        vector<string> dbs;
        const auto status =
            catalogManager()->getDatabasesForShard(operationContext(), "shard0000", &dbs);

        ASSERT_OK(status);
        return dbs;
    });

    onFindCommand([this, dbt1, dbt2](const RemoteCommandRequest& request) {
        ASSERT_EQUALS(kReplSecondaryOkMetadata, request.metadata);

        const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
        ASSERT_EQ(nss.ns(), DatabaseType::ConfigNS);

        auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false));

        // The filter selects databases whose primary is the requested shard.
        ASSERT_EQ(query->ns(), DatabaseType::ConfigNS);
        ASSERT_EQ(query->getFilter(), BSON(DatabaseType::primary(dbt1.getPrimary())));
        ASSERT_EQ(query->getSort(), BSONObj());

        checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);

        return vector<BSONObj>{dbt1.toBSON(), dbt2.toBSON()};
    });

    const auto& actualDbNames = future.timed_get(kFutureTimeout);
    ASSERT_EQ(2U, actualDbNames.size());
    ASSERT_EQ(dbt1.getName(), actualDbNames[0]);
    ASSERT_EQ(dbt2.getName(), actualDbNames[1]);
}
// getDatabasesForShard should fail with TypeMismatch when one of the returned
// database documents has the wrong type for its name field, returning nothing.
TEST_F(CatalogManagerReplSetTest, GetDatabasesForShardInvalidDoc) {
    configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));

    auto future = launchAsync([this] {
        vector<string> dbs;
        const auto status =
            catalogManager()->getDatabasesForShard(operationContext(), "shard0000", &dbs);

        ASSERT_EQ(ErrorCodes::TypeMismatch, status);
        // On error, no partial results should be returned.
        ASSERT_EQ(0U, dbs.size());
    });

    onFindCommand([](const RemoteCommandRequest& request) {
        DatabaseType dbt1;
        dbt1.setName("db1");
        dbt1.setPrimary("shard0000");

        return vector<BSONObj>{
            dbt1.toBSON(),
            BSON(DatabaseType::name() << 0)  // DatabaseType::name() should be a string
        };
    });

    future.timed_get(kFutureTimeout);
}
// getTagsForCollection should return all tag documents for the collection,
// requested in ascending order of their min key.
TEST_F(CatalogManagerReplSetTest, GetTagsForCollection) {
    configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));

    TagsType tagA;
    tagA.setNS("TestDB.TestColl");
    tagA.setTag("TagA");
    tagA.setMinKey(BSON("a" << 100));
    tagA.setMaxKey(BSON("a" << 200));

    TagsType tagB;
    tagB.setNS("TestDB.TestColl");
    tagB.setTag("TagB");
    tagB.setMinKey(BSON("a" << 200));
    tagB.setMaxKey(BSON("a" << 300));

    auto future = launchAsync([this] {
        vector<TagsType> tags;

        ASSERT_OK(
            catalogManager()->getTagsForCollection(operationContext(), "TestDB.TestColl", &tags));
        ASSERT_EQ(2U, tags.size());

        return tags;
    });

    onFindCommand([this, tagA, tagB](const RemoteCommandRequest& request) {
        ASSERT_EQUALS(kReplSecondaryOkMetadata, request.metadata);

        const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
        ASSERT_EQ(nss.ns(), TagsType::ConfigNS);

        auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false));

        ASSERT_EQ(query->ns(), TagsType::ConfigNS);
        ASSERT_EQ(query->getFilter(), BSON(TagsType::ns("TestDB.TestColl")));
        // Tags must be requested sorted by min key.
        ASSERT_EQ(query->getSort(), BSON(TagsType::min() << 1));

        checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);

        return vector<BSONObj>{tagA.toBSON(), tagB.toBSON()};
    });

    const auto& tags = future.timed_get(kFutureTimeout);
    ASSERT_EQ(tagA.toBSON(), tags[0].toBSON());
    ASSERT_EQ(tagB.toBSON(), tags[1].toBSON());
}
// getTagsForCollection should succeed with an empty result when there are no
// tag documents for the collection.
TEST_F(CatalogManagerReplSetTest, GetTagsForCollectionNoTags) {
    configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));

    auto future = launchAsync([this] {
        vector<TagsType> tags;

        ASSERT_OK(
            catalogManager()->getTagsForCollection(operationContext(), "TestDB.TestColl", &tags));
        ASSERT_EQ(0U, tags.size());

        return tags;
    });

    onFindCommand([](const RemoteCommandRequest& request) { return vector<BSONObj>{}; });

    future.timed_get(kFutureTimeout);
}
// getTagsForCollection should fail with FailedToParse when one of the returned
// tag documents is incomplete, and must not return any partial results.
TEST_F(CatalogManagerReplSetTest, GetTagsForCollectionInvalidTag) {
    configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));

    auto future = launchAsync([this] {
        vector<TagsType> tags;
        Status status =
            catalogManager()->getTagsForCollection(operationContext(), "TestDB.TestColl", &tags);

        ASSERT_EQUALS(ErrorCodes::FailedToParse, status);
        ASSERT_EQ(0U, tags.size());
    });

    onFindCommand([](const RemoteCommandRequest& request) {
        TagsType tagA;
        tagA.setNS("TestDB.TestColl");
        tagA.setTag("TagA");
        tagA.setMinKey(BSON("a" << 100));
        tagA.setMaxKey(BSON("a" << 200));

        TagsType tagB;
        tagB.setNS("TestDB.TestColl");
        tagB.setTag("TagB");
        tagB.setMinKey(BSON("a" << 200));
        // Missing maxKey

        return vector<BSONObj>{tagA.toBSON(), tagB.toBSON()};
    });

    future.timed_get(kFutureTimeout);
}
// getTagForChunk should return the tag name when exactly one tag document
// covers the chunk's [min, max) range.
TEST_F(CatalogManagerReplSetTest, GetTagForChunkOneTagFound) {
    configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));

    ChunkType chunk;
    chunk.setNS("test.coll");
    chunk.setMin(BSON("a" << 1));
    chunk.setMax(BSON("a" << 100));
    chunk.setVersion({1, 2, OID::gen()});
    chunk.setShard("shard0000");
    ASSERT_OK(chunk.validate());

    auto future = launchAsync([this, chunk] {
        return assertGet(catalogManager()->getTagForChunk(operationContext(), "test.coll", chunk));
    });

    onFindCommand([this, chunk](const RemoteCommandRequest& request) {
        ASSERT_EQUALS(kReplSecondaryOkMetadata, request.metadata);

        const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
        ASSERT_EQ(nss.ns(), TagsType::ConfigNS);

        auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false));

        ASSERT_EQ(query->ns(), TagsType::ConfigNS);
        // A matching tag must fully contain the chunk's range.
        ASSERT_EQ(query->getFilter(),
                  BSON(TagsType::ns(chunk.getNS())
                       << TagsType::min() << BSON("$lte" << chunk.getMin()) << TagsType::max()
                       << BSON("$gte" << chunk.getMax())));

        checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);

        TagsType tt;
        tt.setNS("test.coll");
        tt.setTag("tag");
        tt.setMinKey(BSON("a" << 1));
        tt.setMaxKey(BSON("a" << 100));

        return vector<BSONObj>{tt.toBSON()};
    });

    const string& tagStr = future.timed_get(kFutureTimeout);
    ASSERT_EQ("tag", tagStr);
}
// getTagForChunk should return an empty string when no tag document covers the
// chunk's range.
TEST_F(CatalogManagerReplSetTest, GetTagForChunkNoTagFound) {
    configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));

    ChunkType chunk;
    chunk.setNS("test.coll");
    chunk.setMin(BSON("a" << 1));
    chunk.setMax(BSON("a" << 100));
    chunk.setVersion({1, 2, OID::gen()});
    chunk.setShard("shard0000");
    ASSERT_OK(chunk.validate());

    auto future = launchAsync([this, chunk] {
        return assertGet(catalogManager()->getTagForChunk(operationContext(), "test.coll", chunk));
    });

    onFindCommand([this, chunk](const RemoteCommandRequest& request) {
        ASSERT_EQUALS(kReplSecondaryOkMetadata, request.metadata);

        const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
        ASSERT_EQ(nss.ns(), TagsType::ConfigNS);

        auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false));

        ASSERT_EQ(query->ns(), TagsType::ConfigNS);
        ASSERT_EQ(query->getFilter(),
                  BSON(TagsType::ns(chunk.getNS())
                       << TagsType::min() << BSON("$lte" << chunk.getMin()) << TagsType::max()
                       << BSON("$gte" << chunk.getMax())));

        checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);

        return vector<BSONObj>{};
    });

    const string& tagStr = future.timed_get(kFutureTimeout);
    ASSERT_EQ("", tagStr);  // empty string returned when tag document not found
}
// getTagForChunk should fail with FailedToParse when the matching tag document
// cannot be parsed (here, it is missing its min key).
TEST_F(CatalogManagerReplSetTest, GetTagForChunkInvalidTagDoc) {
    configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));

    ChunkType chunk;
    chunk.setNS("test.coll");
    chunk.setMin(BSON("a" << 1));
    chunk.setMax(BSON("a" << 100));
    chunk.setVersion({1, 2, OID::gen()});
    chunk.setShard("shard0000");
    ASSERT_OK(chunk.validate());

    auto future = launchAsync([this, chunk] {
        const auto tagResult =
            catalogManager()->getTagForChunk(operationContext(), "test.coll", chunk);

        ASSERT_EQ(ErrorCodes::FailedToParse, tagResult.getStatus());
    });

    onFindCommand([this, chunk](const RemoteCommandRequest& request) {
        ASSERT_EQUALS(kReplSecondaryOkMetadata, request.metadata);

        const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
        ASSERT_EQ(nss.ns(), TagsType::ConfigNS);

        auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false));

        ASSERT_EQ(query->ns(), TagsType::ConfigNS);
        ASSERT_EQ(query->getFilter(),
                  BSON(TagsType::ns(chunk.getNS())
                       << TagsType::min() << BSON("$lte" << chunk.getMin()) << TagsType::max()
                       << BSON("$gte" << chunk.getMax())));

        checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);

        // Return a tag document missing the min key
        return vector<BSONObj>{BSON(TagsType::ns("test.mycol") << TagsType::tag("tag")
                                                               << TagsType::max(BSON("a" << 20)))};
    });

    future.timed_get(kFutureTimeout);
}
// Successful upsert of a database entry into config.databases through
// updateDatabase.
TEST_F(CatalogManagerReplSetTest, UpdateDatabase) {
    configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));

    DatabaseType updatedDb;
    updatedDb.setName("test");
    updatedDb.setPrimary("shard0000");
    updatedDb.setSharded(true);

    auto future = launchAsync([this, updatedDb] {
        ASSERT_OK(
            catalogManager()->updateDatabase(operationContext(), updatedDb.getName(), updatedDb));
    });

    onCommand([updatedDb](const RemoteCommandRequest& request) {
        ASSERT_EQUALS("config", request.dbname);
        ASSERT_EQUALS(BSON(rpc::kReplSetMetadataFieldName << 1), request.metadata);

        // The write should be a single upsert (non-multi) against
        // config.databases, keyed on the database name.
        BatchedUpdateRequest parsedUpdateRequest;
        std::string errmsg;
        ASSERT_TRUE(parsedUpdateRequest.parseBSON(request.dbname, request.cmdObj, &errmsg));
        ASSERT_EQUALS(DatabaseType::ConfigNS, parsedUpdateRequest.getNS().ns());

        auto updateDocs = parsedUpdateRequest.getUpdates();
        ASSERT_EQUALS(1U, updateDocs.size());

        auto updateDoc = updateDocs.front();
        ASSERT_TRUE(updateDoc->getUpsert());
        ASSERT_FALSE(updateDoc->getMulti());
        ASSERT_EQUALS(updateDoc->getQuery(), BSON(DatabaseType::name(updatedDb.getName())));
        ASSERT_EQUALS(updateDoc->getUpdateExpr(), updatedDb.toBSON());

        BatchedCommandResponse response;
        response.setOk(true);
        response.setNModified(1);
        return response.toBSON();
    });

    // Now wait for the updateDatabase call to return
    future.timed_get(kFutureTimeout);
}
// updateDatabase should propagate an ExceededTimeLimit error reported by the
// config server write.
TEST_F(CatalogManagerReplSetTest, UpdateDatabaseExceededTimeLimit) {
    const HostAndPort host1("TestHost1");
    configTargeter()->setFindHostReturnValue(host1);

    DatabaseType dbt;
    dbt.setName("test");
    dbt.setPrimary("shard0001");
    dbt.setSharded(false);

    auto future = launchAsync([this, dbt] {
        ASSERT_EQ(ErrorCodes::ExceededTimeLimit,
                  catalogManager()->updateDatabase(operationContext(), dbt.getName(), dbt));
    });

    onCommand([host1](const RemoteCommandRequest& request) {
        // The write must have been sent to the targeted config host.
        ASSERT_EQUALS(host1, request.target);

        BatchedCommandResponse response;
        response.setOk(false);
        response.setErrCode(ErrorCodes::ExceededTimeLimit);
        response.setErrMessage("operation timed out");
        return response.toBSON();
    });

    // Now wait for the updateDatabase call to return
    future.timed_get(kFutureTimeout);
}
// applyChunkOpsDeprecated should succeed when the applyOps command returns OK,
// forwarding the update ops, the precondition and a majority write concern.
TEST_F(CatalogManagerReplSetTest, ApplyChunkOpsDeprecatedSuccessful) {
    configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));

    const BSONArray updateOps = BSON_ARRAY(BSON("update1"
                                                << "first update")
                                           << BSON("update2"
                                                   << "second update"));
    const BSONArray preCondition = BSON_ARRAY(BSON("precondition1"
                                                   << "first precondition")
                                              << BSON("precondition2"
                                                      << "second precondition"));
    const std::string nss = "config.chunks";
    const ChunkVersion lastChunkVersion(0, 0, OID());

    auto future = launchAsync([this, updateOps, preCondition, nss, lastChunkVersion] {
        ASSERT_OK(catalogManager()->applyChunkOpsDeprecated(
            operationContext(), updateOps, preCondition, nss, lastChunkVersion));
    });

    onCommand([updateOps, preCondition, nss](const RemoteCommandRequest& request) {
        ASSERT_EQUALS("config", request.dbname);
        ASSERT_EQUALS(BSON(rpc::kReplSetMetadataFieldName << 1), request.metadata);

        // The applyOps write must use a majority write concern with a timeout.
        ASSERT_EQUALS(BSON("w"
                           << "majority"
                           << "wtimeout" << 15000),
                      request.cmdObj["writeConcern"].Obj());
        ASSERT_EQUALS(updateOps, request.cmdObj["applyOps"].Obj());
        ASSERT_EQUALS(preCondition, request.cmdObj["preCondition"].Obj());

        return BSON("ok" << 1);
    });

    // Now wait for the applyChunkOpsDeprecated call to return
    future.timed_get(kFutureTimeout);
}
// applyChunkOpsDeprecated should still succeed when applyOps reports a
// DuplicateKey precondition failure but the follow-up query shows the expected
// chunk document is present (i.e. the operation had actually been applied).
TEST_F(CatalogManagerReplSetTest, ApplyChunkOpsDeprecatedSuccessfulWithCheck) {
    configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));

    BSONArray updateOps = BSON_ARRAY(BSON("update1"
                                          << "first update")
                                     << BSON("update2"
                                             << "second update"));
    BSONArray preCondition = BSON_ARRAY(BSON("precondition1"
                                             << "first precondition")
                                        << BSON("precondition2"
                                                << "second precondition"));
    std::string nss = "config.chunks";
    ChunkVersion lastChunkVersion(0, 0, OID());

    auto future = launchAsync([this, updateOps, preCondition, nss, lastChunkVersion] {
        auto status = catalogManager()->applyChunkOpsDeprecated(
            operationContext(), updateOps, preCondition, nss, lastChunkVersion);
        ASSERT_OK(status);
    });

    // The applyOps command itself fails...
    onCommand([&](const RemoteCommandRequest& request) {
        BSONObjBuilder responseBuilder;
        Command::appendCommandStatus(responseBuilder,
                                     Status(ErrorCodes::DuplicateKey, "precondition failed"));
        return responseBuilder.obj();
    });

    // ...but the verification query finds a chunk document, so the overall
    // operation is treated as successful.
    onFindCommand([this](const RemoteCommandRequest& request) {
        OID oid = OID::gen();
        ChunkType chunk;
        chunk.setNS("TestDB.TestColl");
        chunk.setMin(BSON("a" << 1));
        chunk.setMax(BSON("a" << 100));
        chunk.setVersion({1, 2, oid});
        chunk.setShard("shard0000");
        return vector<BSONObj>{chunk.toBSON()};
    });

    // Now wait for the applyChunkOpsDeprecated call to return
    future.timed_get(kFutureTimeout);
}
// applyChunkOpsDeprecated should surface the error when applyOps fails and the
// follow-up verification query finds no chunk documents.
TEST_F(CatalogManagerReplSetTest, ApplyChunkOpsDeprecatedFailedWithCheck) {
    configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));

    BSONArray updateOps = BSON_ARRAY(BSON("update1"
                                          << "first update")
                                     << BSON("update2"
                                             << "second update"));
    BSONArray preCondition = BSON_ARRAY(BSON("precondition1"
                                             << "first precondition")
                                        << BSON("precondition2"
                                                << "second precondition"));
    std::string nss = "config.chunks";
    ChunkVersion lastChunkVersion(0, 0, OID());

    auto future = launchAsync([this, updateOps, preCondition, nss, lastChunkVersion] {
        auto status = catalogManager()->applyChunkOpsDeprecated(
            operationContext(), updateOps, preCondition, nss, lastChunkVersion);
        ASSERT_EQUALS(ErrorCodes::NoMatchingDocument, status);
    });

    // The applyOps command fails...
    onCommand([&](const RemoteCommandRequest& request) {
        BSONObjBuilder responseBuilder;
        Command::appendCommandStatus(responseBuilder,
                                     Status(ErrorCodes::NoMatchingDocument, "some error"));
        return responseBuilder.obj();
    });

    // ...and the verification query finds nothing, so the error stands.
    onFindCommand([this](const RemoteCommandRequest& request) { return vector<BSONObj>{}; });

    // Now wait for the applyChunkOpsDeprecated call to return
    future.timed_get(kFutureTimeout);
}
// Successful createDatabase: the shard registry is primed with three shards,
// each shard reports its total data size via listDatabases, and the new
// database is placed on the shard that reported the smallest size (shard0001).
TEST_F(CatalogManagerReplSetTest, createDatabaseSuccess) {
    const string dbname = "databaseToCreate";

    const HostAndPort configHost("TestHost1");
    configTargeter()->setFindHostReturnValue(configHost);

    ShardType s0;
    s0.setName("shard0000");
    s0.setHost("ShardHost0:27017");

    ShardType s1;
    s1.setName("shard0001");
    s1.setHost("ShardHost1:27017");

    ShardType s2;
    s2.setName("shard0002");
    s2.setHost("ShardHost2:27017");

    // Prime the shard registry with information about the existing shards
    auto future = launchAsync([this] { shardRegistry()->reload(operationContext()); });

    onFindCommand([&](const RemoteCommandRequest& request) {
        ASSERT_EQUALS(configHost, request.target);
        ASSERT_EQUALS(kReplSecondaryOkMetadata, request.metadata);

        const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
        auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false));

        // Shard reload fetches all shard documents, unfiltered and unsorted.
        ASSERT_EQ(ShardType::ConfigNS, query->ns());
        ASSERT_EQ(BSONObj(), query->getFilter());
        ASSERT_EQ(BSONObj(), query->getSort());
        ASSERT_FALSE(query->getLimit().is_initialized());

        checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);

        return vector<BSONObj>{s0.toBSON(), s1.toBSON(), s2.toBSON()};
    });

    future.timed_get(kFutureTimeout);

    // Set up all the target mocks return values.
    RemoteCommandTargeterMock::get(
        shardRegistry()->getShard(operationContext(), s0.getName())->getTargeter())
        ->setFindHostReturnValue(HostAndPort(s0.getHost()));
    RemoteCommandTargeterMock::get(
        shardRegistry()->getShard(operationContext(), s1.getName())->getTargeter())
        ->setFindHostReturnValue(HostAndPort(s1.getHost()));
    RemoteCommandTargeterMock::get(
        shardRegistry()->getShard(operationContext(), s2.getName())->getTargeter())
        ->setFindHostReturnValue(HostAndPort(s2.getHost()));

    // Now actually start the createDatabase work.
    distLock()->expectLock([dbname](StringData name,
                                    StringData whyMessage,
                                    Milliseconds waitFor,
                                    Milliseconds lockTryInterval) {},
                           Status::OK());

    future = launchAsync([this, dbname] {
        Status status = catalogManager()->createDatabase(operationContext(), dbname);
        ASSERT_OK(status);
    });

    // Report no databases with the same name already exist
    onFindCommand([&](const RemoteCommandRequest& request) {
        ASSERT_EQUALS(configHost, request.target);
        const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
        ASSERT_EQUALS(kReplSecondaryOkMetadata, request.metadata);
        ASSERT_EQ(DatabaseType::ConfigNS, nss.ns());
        checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);
        return vector<BSONObj>{};
    });

    // Return size information about first shard
    onCommand([&](const RemoteCommandRequest& request) {
        ASSERT_EQUALS(s0.getHost(), request.target.toString());
        ASSERT_EQUALS("admin", request.dbname);
        string cmdName = request.cmdObj.firstElement().fieldName();
        ASSERT_EQUALS("listDatabases", cmdName);
        ASSERT_FALSE(request.cmdObj.hasField(repl::ReadConcernArgs::kReadConcernFieldName));
        ASSERT_EQUALS(rpc::ServerSelectionMetadata(true, boost::none).toBSON(), request.metadata);
        return BSON("ok" << 1 << "totalSize" << 10);
    });

    // Return size information about second shard
    onCommand([&](const RemoteCommandRequest& request) {
        ASSERT_EQUALS(s1.getHost(), request.target.toString());
        ASSERT_EQUALS("admin", request.dbname);
        string cmdName = request.cmdObj.firstElement().fieldName();
        ASSERT_EQUALS("listDatabases", cmdName);
        ASSERT_FALSE(request.cmdObj.hasField(repl::ReadConcernArgs::kReadConcernFieldName));
        ASSERT_EQUALS(rpc::ServerSelectionMetadata(true, boost::none).toBSON(), request.metadata);
        return BSON("ok" << 1 << "totalSize" << 1);
    });

    // Return size information about third shard
    onCommand([&](const RemoteCommandRequest& request) {
        ASSERT_EQUALS(s2.getHost(), request.target.toString());
        ASSERT_EQUALS("admin", request.dbname);
        string cmdName = request.cmdObj.firstElement().fieldName();
        ASSERT_EQUALS("listDatabases", cmdName);
        // listDatabases must not carry a read concern (checked for all shards).
        ASSERT_FALSE(request.cmdObj.hasField(repl::ReadConcernArgs::kReadConcernFieldName));
        ASSERT_EQUALS(rpc::ServerSelectionMetadata(true, boost::none).toBSON(), request.metadata);
        return BSON("ok" << 1 << "totalSize" << 100);
    });

    // Process insert to config.databases collection
    onCommand([&](const RemoteCommandRequest& request) {
        ASSERT_EQUALS(configHost, request.target);
        ASSERT_EQUALS("config", request.dbname);
        // The insert is a write and must not carry a read concern either.
        ASSERT_FALSE(request.cmdObj.hasField(repl::ReadConcernArgs::kReadConcernFieldName));
        ASSERT_EQUALS(BSON(rpc::kReplSetMetadataFieldName << 1), request.metadata);

        BatchedInsertRequest actualBatchedInsert;
        std::string errmsg;
        ASSERT_TRUE(actualBatchedInsert.parseBSON(request.dbname, request.cmdObj, &errmsg));
        ASSERT_EQUALS(DatabaseType::ConfigNS, actualBatchedInsert.getNS().ns());

        auto inserts = actualBatchedInsert.getDocuments();
        ASSERT_EQUALS(1U, inserts.size());

        auto insert = inserts.front();

        DatabaseType expectedDb;
        expectedDb.setName(dbname);
        expectedDb.setPrimary(s1.getName());  // This is the one we reported with the smallest size
        expectedDb.setSharded(false);

        ASSERT_EQUALS(expectedDb.toBSON(), insert);

        BatchedCommandResponse response;
        response.setOk(true);
        response.setNModified(1);
        return response.toBSON();
    });

    future.timed_get(kFutureTimeout);
}
// createDatabase should fail with LockBusy when the distributed lock for the
// database name cannot be acquired.
TEST_F(CatalogManagerReplSetTest, createDatabaseDistLockHeld) {
    const string dbname = "databaseToCreate";
    configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));

    distLock()->expectLock(
        [dbname](StringData name,
                 StringData whyMessage,
                 Milliseconds waitFor,
                 Milliseconds lockTryInterval) {
            // The lock must be taken on the database name, with a descriptive
            // reason string.
            ASSERT_EQUALS(dbname, name);
            ASSERT_EQUALS("createDatabase", whyMessage);
        },
        Status(ErrorCodes::LockBusy, "lock already held"));

    const Status status = catalogManager()->createDatabase(operationContext(), dbname);
    ASSERT_EQUALS(ErrorCodes::LockBusy, status);
}
// createDatabase should fail with NamespaceExists when a database with exactly
// the same name is already registered in config.databases.
TEST_F(CatalogManagerReplSetTest, createDatabaseDBExists) {
    const string dbname = "databaseToCreate";
    configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));

    distLock()->expectLock([dbname](StringData name,
                                    StringData whyMessage,
                                    Milliseconds waitFor,
                                    Milliseconds lockTryInterval) {},
                           Status::OK());

    auto future = launchAsync([this, dbname] {
        Status status = catalogManager()->createDatabase(operationContext(), dbname);
        ASSERT_EQUALS(ErrorCodes::NamespaceExists, status);
    });

    onFindCommand([this, dbname](const RemoteCommandRequest& request) {
        ASSERT_EQUALS(kReplSecondaryOkMetadata, request.metadata);

        const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
        auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false));

        // The existence check is a case-insensitive anchored regex on the name.
        BSONObjBuilder queryBuilder;
        queryBuilder.appendRegex(
            DatabaseType::name(), (string) "^" + pcrecpp::RE::QuoteMeta(dbname) + "$", "i");

        ASSERT_EQ(DatabaseType::ConfigNS, query->ns());
        ASSERT_EQ(queryBuilder.obj(), query->getFilter());

        checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);

        return vector<BSONObj>{BSON("_id" << dbname)};
    });

    future.timed_get(kFutureTimeout);
}
// createDatabase should fail with DatabaseDifferCase when a database whose
// name differs only in case is already registered in config.databases.
TEST_F(CatalogManagerReplSetTest, createDatabaseDBExistsDifferentCase) {
    const string dbname = "databaseToCreate";
    const string dbnameDiffCase = "databasetocreate";
    configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));

    distLock()->expectLock([dbname](StringData name,
                                    StringData whyMessage,
                                    Milliseconds waitFor,
                                    Milliseconds lockTryInterval) {},
                           Status::OK());

    auto future = launchAsync([this, dbname] {
        Status status = catalogManager()->createDatabase(operationContext(), dbname);
        ASSERT_EQUALS(ErrorCodes::DatabaseDifferCase, status);
    });

    onFindCommand([this, dbname, dbnameDiffCase](const RemoteCommandRequest& request) {
        ASSERT_EQUALS(kReplSecondaryOkMetadata, request.metadata);

        const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
        auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false));

        // The case-insensitive regex is exactly what makes the differing-case
        // database match.
        BSONObjBuilder queryBuilder;
        queryBuilder.appendRegex(
            DatabaseType::name(), (string) "^" + pcrecpp::RE::QuoteMeta(dbname) + "$", "i");

        ASSERT_EQ(DatabaseType::ConfigNS, query->ns());
        ASSERT_EQ(queryBuilder.obj(), query->getFilter());

        checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);

        return vector<BSONObj>{BSON("_id" << dbnameDiffCase)};
    });

    future.timed_get(kFutureTimeout);
}
// createDatabase should fail with ShardNotFound when no shards are registered
// to host the new database.
TEST_F(CatalogManagerReplSetTest, createDatabaseNoShards) {
    const string dbname = "databaseToCreate";
    configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));

    distLock()->expectLock([dbname](StringData name,
                                    StringData whyMessage,
                                    Milliseconds waitFor,
                                    Milliseconds lockTryInterval) {},
                           Status::OK());

    auto future = launchAsync([this, dbname] {
        Status status = catalogManager()->createDatabase(operationContext(), dbname);
        ASSERT_EQUALS(ErrorCodes::ShardNotFound, status);
    });

    // Report no databases with the same name already exist
    onFindCommand([this, dbname](const RemoteCommandRequest& request) {
        ASSERT_EQUALS(kReplSecondaryOkMetadata, request.metadata);
        const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
        ASSERT_EQ(DatabaseType::ConfigNS, nss.ns());
        checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);
        return vector<BSONObj>{};
    });

    // Report no shards exist
    onFindCommand([this](const RemoteCommandRequest& request) {
        ASSERT_EQUALS(kReplSecondaryOkMetadata, request.metadata);

        const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
        auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false));

        ASSERT_EQ(ShardType::ConfigNS, query->ns());
        ASSERT_EQ(BSONObj(), query->getFilter());
        ASSERT_EQ(BSONObj(), query->getSort());
        ASSERT_FALSE(query->getLimit().is_initialized());

        checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);

        return vector<BSONObj>{};
    });

    future.timed_get(kFutureTimeout);
}
// createDatabase should translate a DuplicateKey error on the config.databases
// insert (another client created the database concurrently) into
// NamespaceExists.
TEST_F(CatalogManagerReplSetTest, createDatabaseDuplicateKeyOnInsert) {
    const string dbname = "databaseToCreate";

    const HostAndPort configHost("TestHost1");
    configTargeter()->setFindHostReturnValue(configHost);

    ShardType s0;
    s0.setName("shard0000");
    s0.setHost("ShardHost0:27017");

    ShardType s1;
    s1.setName("shard0001");
    s1.setHost("ShardHost1:27017");

    ShardType s2;
    s2.setName("shard0002");
    s2.setHost("ShardHost2:27017");

    // Prime the shard registry with information about the existing shards
    auto future = launchAsync([this] { shardRegistry()->reload(operationContext()); });

    onFindCommand([&](const RemoteCommandRequest& request) {
        ASSERT_EQUALS(configHost, request.target);
        ASSERT_EQUALS(kReplSecondaryOkMetadata, request.metadata);

        const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
        auto query = assertGet(LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false));

        ASSERT_EQ(ShardType::ConfigNS, query->ns());
        ASSERT_EQ(BSONObj(), query->getFilter());
        ASSERT_EQ(BSONObj(), query->getSort());
        ASSERT_FALSE(query->getLimit().is_initialized());

        checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);

        return vector<BSONObj>{s0.toBSON(), s1.toBSON(), s2.toBSON()};
    });

    future.timed_get(kFutureTimeout);

    // Set up all the target mocks return values.
    RemoteCommandTargeterMock::get(
        shardRegistry()->getShard(operationContext(), s0.getName())->getTargeter())
        ->setFindHostReturnValue(HostAndPort(s0.getHost()));
    RemoteCommandTargeterMock::get(
        shardRegistry()->getShard(operationContext(), s1.getName())->getTargeter())
        ->setFindHostReturnValue(HostAndPort(s1.getHost()));
    RemoteCommandTargeterMock::get(
        shardRegistry()->getShard(operationContext(), s2.getName())->getTargeter())
        ->setFindHostReturnValue(HostAndPort(s2.getHost()));

    // Now actually start the createDatabase work.
    distLock()->expectLock([dbname](StringData name,
                                    StringData whyMessage,
                                    Milliseconds waitFor,
                                    Milliseconds lockTryInterval) {},
                           Status::OK());

    future = launchAsync([this, dbname] {
        Status status = catalogManager()->createDatabase(operationContext(), dbname);
        ASSERT_EQUALS(ErrorCodes::NamespaceExists, status);
    });

    // Report no databases with the same name already exist
    onFindCommand([&](const RemoteCommandRequest& request) {
        ASSERT_EQUALS(configHost, request.target);
        ASSERT_EQUALS(kReplSecondaryOkMetadata, request.metadata);
        const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
        ASSERT_EQ(DatabaseType::ConfigNS, nss.ns());
        checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);
        return vector<BSONObj>{};
    });

    // Return size information about first shard
    onCommand([&](const RemoteCommandRequest& request) {
        ASSERT_EQUALS(s0.getHost(), request.target.toString());
        ASSERT_EQUALS("admin", request.dbname);
        string cmdName = request.cmdObj.firstElement().fieldName();
        ASSERT_EQUALS("listDatabases", cmdName);
        ASSERT_FALSE(request.cmdObj.hasField(repl::ReadConcernArgs::kReadConcernFieldName));
        ASSERT_EQUALS(rpc::ServerSelectionMetadata(true, boost::none).toBSON(), request.metadata);
        return BSON("ok" << 1 << "totalSize" << 10);
    });

    // Return size information about second shard
    onCommand([&](const RemoteCommandRequest& request) {
        ASSERT_EQUALS(s1.getHost(), request.target.toString());
        ASSERT_EQUALS("admin", request.dbname);
        string cmdName = request.cmdObj.firstElement().fieldName();
        ASSERT_EQUALS("listDatabases", cmdName);
        ASSERT_FALSE(request.cmdObj.hasField(repl::ReadConcernArgs::kReadConcernFieldName));
        ASSERT_EQUALS(rpc::ServerSelectionMetadata(true, boost::none).toBSON(), request.metadata);
        return BSON("ok" << 1 << "totalSize" << 1);
    });

    // Return size information about third shard
    onCommand([&](const RemoteCommandRequest& request) {
        ASSERT_EQUALS(s2.getHost(), request.target.toString());
        ASSERT_EQUALS("admin", request.dbname);
        string cmdName = request.cmdObj.firstElement().fieldName();
        ASSERT_EQUALS("listDatabases", cmdName);
        ASSERT_FALSE(request.cmdObj.hasField(repl::ReadConcernArgs::kReadConcernFieldName));
        ASSERT_EQUALS(rpc::ServerSelectionMetadata(true, boost::none).toBSON(), request.metadata);
        return BSON("ok" << 1 << "totalSize" << 100);
    });

    // Process insert to config.databases collection
    onCommand([&](const RemoteCommandRequest& request) {
        ASSERT_EQUALS(configHost, request.target);
        ASSERT_EQUALS("config", request.dbname);
        ASSERT_FALSE(request.cmdObj.hasField(repl::ReadConcernArgs::kReadConcernFieldName));
        ASSERT_EQUALS(BSON(rpc::kReplSetMetadataFieldName << 1), request.metadata);

        BatchedInsertRequest actualBatchedInsert;
        std::string errmsg;
        ASSERT_TRUE(actualBatchedInsert.parseBSON(request.dbname, request.cmdObj, &errmsg));
        ASSERT_EQUALS(DatabaseType::ConfigNS, actualBatchedInsert.getNS().ns());

        auto inserts = actualBatchedInsert.getDocuments();
        ASSERT_EQUALS(1U, inserts.size());

        auto insert = inserts.front();

        DatabaseType expectedDb;
        expectedDb.setName(dbname);
        expectedDb.setPrimary(s1.getName());  // This is the one we reported with the smallest size
        expectedDb.setSharded(false);

        ASSERT_EQUALS(expectedDb.toBSON(), insert);

        // Reject the insert as if another client just created the database.
        BatchedCommandResponse response;
        response.setOk(false);
        response.setErrCode(ErrorCodes::DuplicateKey);
        response.setErrMessage("duplicate key");
        return response.toBSON();
    });

    future.timed_get(kFutureTimeout);
}
// enableSharding on a database that does not exist yet: the database entry is
// created (placed on the only shard) with partitioned: true.
TEST_F(CatalogManagerReplSetTest, EnableShardingNoDBExists) {
    configTargeter()->setFindHostReturnValue(HostAndPort("config:123"));

    ShardType shard;
    shard.setName("shard0");
    shard.setHost("shard0:12");
    setupShards(vector<ShardType>{shard});

    auto shardTargeter = RemoteCommandTargeterMock::get(
        shardRegistry()->getShard(operationContext(), "shard0")->getTargeter());
    shardTargeter->setFindHostReturnValue(HostAndPort("shard0:12"));

    distLock()->expectLock([](StringData name, StringData whyMessage, Milliseconds, Milliseconds) {
        ASSERT_EQ("test", name);
        ASSERT_FALSE(whyMessage.empty());
    }, Status::OK());

    auto future = launchAsync([this] {
        auto status = catalogManager()->enableSharding(operationContext(), "test");
        ASSERT_OK(status);
    });

    // Query to find if db already exists in config.
    onFindCommand([this](const RemoteCommandRequest& request) {
        ASSERT_EQUALS(kReplSecondaryOkMetadata, request.metadata);

        const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
        ASSERT_EQ(DatabaseType::ConfigNS, nss.toString());

        auto queryResult = LiteParsedQuery::makeFromFindCommand(nss, request.cmdObj, false);
        ASSERT_OK(queryResult.getStatus());

        const auto& query = queryResult.getValue();

        // Case-insensitive lookup of the database name, limited to one result.
        BSONObj expectedQuery(fromjson(R"({ _id: { $regex: "^test$", $options: "i" }})"));

        ASSERT_EQ(DatabaseType::ConfigNS, query->ns());
        ASSERT_EQ(expectedQuery, query->getFilter());
        ASSERT_EQ(BSONObj(), query->getSort());
        ASSERT_EQ(1, query->getLimit().get());

        checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);

        return vector<BSONObj>{};
    });

    // list databases for checking shard size.
    onCommand([](const RemoteCommandRequest& request) {
        ASSERT_EQ(HostAndPort("shard0:12"), request.target);
        ASSERT_EQ("admin", request.dbname);
        ASSERT_EQ(BSON("listDatabases" << 1), request.cmdObj);
        ASSERT_EQUALS(rpc::ServerSelectionMetadata(true, boost::none).toBSON(), request.metadata);

        return fromjson(R"({
                databases: [],
                totalSize: 1,
                ok: 1
            })");
    });

    // Upsert of the new database entry into config.databases.
    onCommand([](const RemoteCommandRequest& request) {
        ASSERT_EQ(HostAndPort("config:123"), request.target);
        ASSERT_EQ("config", request.dbname);
        ASSERT_EQUALS(BSON(rpc::kReplSetMetadataFieldName << 1), request.metadata);

        BSONObj expectedCmd(fromjson(R"({
            update: "databases",
            updates: [{
                q: { _id: "test" },
                u: { _id: "test", primary: "shard0", partitioned: true },
                multi: false,
                upsert: true
            }],
            writeConcern: { w: "majority", wtimeout: 15000 },
            maxTimeMS: 30000
        })"));

        ASSERT_EQ(expectedCmd, request.cmdObj);

        return fromjson(R"({
                nModified: 0,
                n: 1,
                upserted: [
                    { _id: "test", primary: "shard0", partitioned: true }
                ],
                ok: 1
            })");
    });

    future.timed_get(kFutureTimeout);
}
// enableSharding should fail with LockBusy when the distributed lock is
// already held by someone else.
TEST_F(CatalogManagerReplSetTest, EnableShardingLockBusy) {
    configTargeter()->setFindHostReturnValue(HostAndPort("config:123"));

    distLock()->expectLock([](StringData, StringData, Milliseconds, Milliseconds) {},
                           Status(ErrorCodes::LockBusy, "lock taken"));

    const auto status = catalogManager()->enableSharding(operationContext(), "test");
    ASSERT_EQ(ErrorCodes::LockBusy, status.code());
}
// enableSharding should fail with DatabaseDifferCase when a database whose
// name differs only in case already exists in config.databases.
TEST_F(CatalogManagerReplSetTest, EnableShardingDBExistsWithDifferentCase) {
    configTargeter()->setFindHostReturnValue(HostAndPort("config:123"));

    ShardType shard;
    shard.setName("shard0");
    shard.setHost("shard0:12");
    setupShards(vector<ShardType>{shard});

    distLock()->expectLock([](StringData, StringData, Milliseconds, Milliseconds) {}, Status::OK());

    auto future = launchAsync([this] {
        auto status = catalogManager()->enableSharding(operationContext(), "test");
        ASSERT_EQ(ErrorCodes::DatabaseDifferCase, status.code());
        ASSERT_FALSE(status.reason().empty());
    });

    // Query to find if db already exists in config.
    onFindCommand([](const RemoteCommandRequest& request) {
        // "Test" differs from the requested "test" only by case.
        BSONObj existingDoc(fromjson(R"({ _id: "Test", primary: "shard0", partitioned: true })"));
        return vector<BSONObj>{existingDoc};
    });

    future.timed_get(kFutureTimeout);
}
TEST_F(CatalogManagerReplSetTest, EnableShardingDBExists) {
    configTargeter()->setFindHostReturnValue(HostAndPort("config:123"));

    // Register a single shard.  (The previously declared `vector shards` local
    // was never used and has been removed.)
    ShardType shard;
    shard.setName("shard0");
    shard.setHost("shard0:12");
    setupShards(vector<ShardType>{shard});

    distLock()->expectLock([](StringData, StringData, Milliseconds, Milliseconds) {}, Status::OK());

    auto future = launchAsync([this] {
        auto status = catalogManager()->enableSharding(operationContext(), "test");
        ASSERT_OK(status);
    });

    // Query to find if db already exists in config.  It does (primary shard2,
    // not yet partitioned); enableSharding should keep the primary and flip
    // the partitioned flag via the update below.
    onFindCommand([](const RemoteCommandRequest& request) {
        BSONObj existingDoc(fromjson(R"({ _id: "test", primary: "shard2", partitioned: false })"));
        return vector<BSONObj>{existingDoc};
    });

    // Expect the upsert into config.databases that marks the db as partitioned.
    onCommand([](const RemoteCommandRequest& request) {
        ASSERT_EQ(HostAndPort("config:123"), request.target);
        ASSERT_EQ("config", request.dbname);

        ASSERT_EQUALS(BSON(rpc::kReplSetMetadataFieldName << 1), request.metadata);

        BSONObj expectedCmd(fromjson(R"({
            update: "databases",
            updates: [{
                q: { _id: "test" },
                u: { _id: "test", primary: "shard2", partitioned: true },
                multi: false,
                upsert: true
            }],
            writeConcern: { w: "majority", wtimeout: 15000 },
            maxTimeMS: 30000
        })"));
        ASSERT_EQ(expectedCmd, request.cmdObj);

        return fromjson(R"({
            nModified: 0,
            n: 1,
            upserted: [
                { _id: "test", primary: "shard2", partitioned: true }
            ],
            ok: 1
        })");
    });

    future.timed_get(kFutureTimeout);
}
TEST_F(CatalogManagerReplSetTest, EnableShardingFailsWhenTheDatabaseIsAlreadySharded) {
    configTargeter()->setFindHostReturnValue(HostAndPort("config:123"));

    // Register a single shard.  (The previously declared `vector shards` local
    // was never used and has been removed.)
    ShardType shard;
    shard.setName("shard0");
    shard.setHost("shard0:12");
    setupShards(vector<ShardType>{shard});

    distLock()->expectLock([](StringData, StringData, Milliseconds, Milliseconds) {}, Status::OK());

    auto future = launchAsync([this] {
        auto status = catalogManager()->enableSharding(operationContext(), "test");
        // (expected, actual) argument order, consistent with the sibling tests.
        ASSERT_EQ(ErrorCodes::AlreadyInitialized, status.code());
    });

    // Query to find if db already exists in config and it is sharded.
    onFindCommand([](const RemoteCommandRequest& request) {
        BSONObj existingDoc(fromjson(R"({ _id: "test", primary: "shard2", partitioned: true })"));
        return vector<BSONObj>{existingDoc};
    });

    future.timed_get(kFutureTimeout);
}
TEST_F(CatalogManagerReplSetTest, EnableShardingDBExistsInvalidFormat) {
    configTargeter()->setFindHostReturnValue(HostAndPort("config:123"));

    // Register a single shard.  (The previously declared `vector shards` local
    // was never used and has been removed.)
    ShardType shard;
    shard.setName("shard0");
    shard.setHost("shard0:12");
    setupShards(vector<ShardType>{shard});

    distLock()->expectLock([](StringData, StringData, Milliseconds, Milliseconds) {}, Status::OK());

    auto future = launchAsync([this] {
        auto status = catalogManager()->enableSharding(operationContext(), "test");
        // A malformed config.databases document must surface as TypeMismatch.
        ASSERT_EQ(ErrorCodes::TypeMismatch, status.code());
    });

    // Query to find if db already exists in config.
    onFindCommand([](const RemoteCommandRequest& request) {
        // Bad type for primary field.
        BSONObj existingDoc(fromjson(R"({ _id: "test", primary: 12, partitioned: false })"));
        return vector<BSONObj>{existingDoc};
    });

    future.timed_get(kFutureTimeout);
}
TEST_F(CatalogManagerReplSetTest, EnableShardingNoDBExistsNoShards) {
    configTargeter()->setFindHostReturnValue(HostAndPort("config:123"));
    distLock()->expectLock([](StringData, StringData, Milliseconds, Milliseconds) {}, Status::OK());

    auto future = launchAsync([this] {
        const auto status = catalogManager()->enableSharding(operationContext(), "test");
        ASSERT_EQ(ErrorCodes::ShardNotFound, status.code());
        ASSERT_FALSE(status.reason().empty());
    });

    // No database entry exists yet in config.databases.
    onFindCommand([](const RemoteCommandRequest&) { return vector<BSONObj>{}; });

    // The subsequent config.shards reload comes back empty as well, so
    // enableSharding must fail with ShardNotFound.
    onFindCommand([](const RemoteCommandRequest&) { return vector<BSONObj>{}; });

    future.timed_get(kFutureTimeout);
}
// Issues three consecutive read commands and checks that each request's read
// concern carries the OpTime delivered by the previous response's
// ReplSetMetadata (uninitialized on the very first request).
TEST_F(CatalogManagerReplSetTest, BasicReadAfterOpTime) {
    configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));

    OpTime lastOpTime;
    for (int x = 0; x < 3; x++) {
        auto future = launchAsync([this] {
            BSONObjBuilder responseBuilder;
            ASSERT_TRUE(getCatalogManagerReplicaSet()->runReadCommandForTest(
                operationContext(), "test", BSON("dummy" << 1), &responseBuilder));
        });

        // A strictly newer OpTime for this iteration's mocked response.
        const OpTime newOpTime(Timestamp(x + 2, x + 6), x + 5);

        onCommandWithMetadata([this, &newOpTime, &lastOpTime](const RemoteCommandRequest& request) {
            ASSERT_EQUALS("test", request.dbname);
            ASSERT_EQUALS(kReplSecondaryOkMetadata, request.metadata);
            ASSERT_EQ(string("dummy"), request.cmdObj.firstElementFieldName());

            // The request must reflect the OpTime from the previous round.
            checkReadConcern(request.cmdObj, lastOpTime.getTimestamp(), lastOpTime.getTerm());

            // Attach newOpTime as the last visible OpTime in the response metadata.
            ReplSetMetadata metadata(10, repl::OpTime(), newOpTime, 100, OID(), 30, -1);
            BSONObjBuilder builder;
            metadata.writeToMetadata(&builder);

            return RemoteCommandResponse(BSON("ok" << 1), builder.obj(), Milliseconds(1));
        });

        // Now wait for the runReadCommand call to return
        future.timed_get(kFutureTimeout);

        lastOpTime = newOpTime;
    }
}
// Verifies that the catalog manager's tracked config OpTime is monotonic:
// after a response advances it, a later response carrying an older OpTime
// must not move it backwards.
TEST_F(CatalogManagerReplSetTest, ReadAfterOpTimeShouldNotGoBack) {
    configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));

    // Initialize the internal config OpTime
    auto future1 = launchAsync([this] {
        BSONObjBuilder responseBuilder;
        ASSERT_TRUE(getCatalogManagerReplicaSet()->runReadCommandForTest(
            operationContext(), "test", BSON("dummy" << 1), &responseBuilder));
    });

    OpTime highestOpTime;
    const OpTime newOpTime(Timestamp(7, 6), 5);

    // First round: response metadata advances the OpTime to newOpTime.
    onCommandWithMetadata([this, &newOpTime, &highestOpTime](const RemoteCommandRequest& request) {
        ASSERT_EQUALS("test", request.dbname);
        ASSERT_EQUALS(kReplSecondaryOkMetadata, request.metadata);
        ASSERT_EQ(string("dummy"), request.cmdObj.firstElementFieldName());
        checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm());

        ReplSetMetadata metadata(10, repl::OpTime(), newOpTime, 100, OID(), 30, -1);
        BSONObjBuilder builder;
        metadata.writeToMetadata(&builder);

        return RemoteCommandResponse(BSON("ok" << 1), builder.obj(), Milliseconds(1));
    });

    future1.timed_get(kFutureTimeout);
    highestOpTime = newOpTime;

    // Return an older OpTime
    auto future2 = launchAsync([this] {
        BSONObjBuilder responseBuilder;
        ASSERT_TRUE(getCatalogManagerReplicaSet()->runReadCommandForTest(
            operationContext(), "test", BSON("dummy" << 1), &responseBuilder));
    });

    const OpTime oldOpTime(Timestamp(3, 10), 5);

    // Second round: request must still use highestOpTime; the response then
    // reports the stale oldOpTime.
    onCommandWithMetadata([this, &oldOpTime, &highestOpTime](const RemoteCommandRequest& request) {
        ASSERT_EQUALS("test", request.dbname);
        ASSERT_EQUALS(kReplSecondaryOkMetadata, request.metadata);
        ASSERT_EQ(string("dummy"), request.cmdObj.firstElementFieldName());
        checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm());

        ReplSetMetadata metadata(10, repl::OpTime(), oldOpTime, 100, OID(), 30, -1);
        BSONObjBuilder builder;
        metadata.writeToMetadata(&builder);

        return RemoteCommandResponse(BSON("ok" << 1), builder.obj(), Milliseconds(1));
    });

    future2.timed_get(kFutureTimeout);

    // Check that older OpTime does not override highest OpTime
    auto future3 = launchAsync([this] {
        BSONObjBuilder responseBuilder;
        ASSERT_TRUE(getCatalogManagerReplicaSet()->runReadCommandForTest(
            operationContext(), "test", BSON("dummy" << 1), &responseBuilder));
    });

    // Third round: the read concern must still carry highestOpTime, proving
    // the stale metadata from round two did not regress the tracked OpTime.
    onCommandWithMetadata([this, &oldOpTime, &highestOpTime](const RemoteCommandRequest& request) {
        ASSERT_EQUALS("test", request.dbname);
        ASSERT_EQUALS(kReplSecondaryOkMetadata, request.metadata);
        ASSERT_EQ(string("dummy"), request.cmdObj.firstElementFieldName());
        checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm());

        ReplSetMetadata metadata(10, repl::OpTime(), oldOpTime, 100, OID(), 30, -1);
        BSONObjBuilder builder;
        metadata.writeToMetadata(&builder);

        return RemoteCommandResponse(BSON("ok" << 1), builder.obj(), Milliseconds(1));
    });

    future3.timed_get(kFutureTimeout);
}
// A find response's metadata advances the tracked config OpTime; a subsequent
// plain command must then carry that OpTime in its read concern.
TEST_F(CatalogManagerReplSetTest, ReadAfterOpTimeFindThenCmd) {
    configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));

    auto future1 = launchAsync([this] {
        ASSERT_OK(catalogManager()->getDatabase(operationContext(), "TestDB").getStatus());
    });

    OpTime highestOpTime;
    const OpTime newOpTime(Timestamp(7, 6), 5);

    onFindWithMetadataCommand(
        [this, &newOpTime, &highestOpTime](const RemoteCommandRequest& request) {
            ASSERT_EQUALS(kReplSecondaryOkMetadata, request.metadata);
            checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm());

            // Attach newOpTime as the last visible OpTime in the response metadata.
            ReplSetMetadata metadata(10, repl::OpTime(), newOpTime, 100, OID(), 30, -1);
            BSONObjBuilder builder;
            metadata.writeToMetadata(&builder);

            DatabaseType dbType;
            dbType.setName("TestDB");
            dbType.setPrimary("TestShard");
            // setSharded takes a bool; the previous "true" string literal only
            // worked via implicit pointer->bool conversion — TODO confirm
            // against DatabaseType's declaration.
            dbType.setSharded(true);

            return std::make_tuple(vector<BSONObj>{dbType.toBSON()}, builder.obj());
        });

    future1.timed_get(kFutureTimeout);
    highestOpTime = newOpTime;

    // Return an older OpTime
    auto future2 = launchAsync([this] {
        BSONObjBuilder responseBuilder;
        ASSERT_TRUE(getCatalogManagerReplicaSet()->runReadCommandForTest(
            operationContext(), "test", BSON("dummy" << 1), &responseBuilder));
    });

    const OpTime oldOpTime(Timestamp(3, 10), 5);

    onCommand([this, &oldOpTime, &highestOpTime](const RemoteCommandRequest& request) {
        ASSERT_EQUALS("test", request.dbname);
        ASSERT_EQUALS(kReplSecondaryOkMetadata, request.metadata);
        ASSERT_EQ(string("dummy"), request.cmdObj.firstElementFieldName());
        // The command must use the OpTime advanced by the earlier find.
        checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm());

        return BSON("ok" << 1);
    });

    future2.timed_get(kFutureTimeout);
}
// A command response's metadata advances the tracked config OpTime; a
// subsequent find must then carry that OpTime in its read concern.
TEST_F(CatalogManagerReplSetTest, ReadAfterOpTimeCmdThenFind) {
    configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));

    // Initialize the internal config OpTime
    auto future1 = launchAsync([this] {
        BSONObjBuilder responseBuilder;
        ASSERT_TRUE(getCatalogManagerReplicaSet()->runReadCommandForTest(
            operationContext(), "test", BSON("dummy" << 1), &responseBuilder));
    });

    OpTime highestOpTime;
    const OpTime newOpTime(Timestamp(7, 6), 5);

    onCommandWithMetadata([this, &newOpTime, &highestOpTime](const RemoteCommandRequest& request) {
        ASSERT_EQUALS("test", request.dbname);
        ASSERT_EQUALS(kReplSecondaryOkMetadata, request.metadata);
        ASSERT_EQ(string("dummy"), request.cmdObj.firstElementFieldName());
        checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm());

        // Attach newOpTime as the last visible OpTime in the response metadata.
        ReplSetMetadata metadata(10, repl::OpTime(), newOpTime, 100, OID(), 30, -1);
        BSONObjBuilder builder;
        metadata.writeToMetadata(&builder);

        return RemoteCommandResponse(BSON("ok" << 1), builder.obj(), Milliseconds(1));
    });

    future1.timed_get(kFutureTimeout);
    highestOpTime = newOpTime;

    // Return an older OpTime
    auto future2 = launchAsync([this] {
        ASSERT_OK(catalogManager()->getDatabase(operationContext(), "TestDB").getStatus());
    });

    const OpTime oldOpTime(Timestamp(3, 10), 5);

    onFindCommand([this, &oldOpTime, &highestOpTime](const RemoteCommandRequest& request) {
        ASSERT_EQUALS(kReplSecondaryOkMetadata, request.metadata);
        ASSERT_EQ(string("find"), request.cmdObj.firstElementFieldName());
        // The find must use the OpTime advanced by the earlier command.
        checkReadConcern(request.cmdObj, highestOpTime.getTimestamp(), highestOpTime.getTerm());

        DatabaseType dbType;
        dbType.setName("TestDB");
        dbType.setPrimary("TestShard");
        // setSharded takes a bool; the previous "true" string literal only
        // worked via implicit pointer->bool conversion — TODO confirm
        // against DatabaseType's declaration.
        dbType.setSharded(true);

        return vector<BSONObj>{dbType.toBSON()};
    });

    future2.timed_get(kFutureTimeout);
}
TEST_F(CatalogManagerReplSetTest, RetryOnReadCommandNetworkErrorFailsAtMaxRetry) {
    configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));

    auto future1 = launchAsync([this] {
        BSONObjBuilder responseBuilder;
        const bool ok = getCatalogManagerReplicaSet()->runReadCommandForTest(
            operationContext(), "test", BSON("dummy" << 1), &responseBuilder);
        ASSERT_FALSE(ok);

        const auto status = getStatusFromCommandResult(responseBuilder.obj());
        ASSERT_EQ(ErrorCodes::HostUnreachable, status.code());
    });

    // Every allowed attempt fails with a retriable network error, so the read
    // command gives up once the retry budget is exhausted.
    for (int attempt = 0; attempt < kMaxCommandRetry; ++attempt) {
        onCommand([](const RemoteCommandRequest&) {
            return Status{ErrorCodes::HostUnreachable, "bad host"};
        });
    }

    future1.timed_get(kFutureTimeout);
}
TEST_F(CatalogManagerReplSetTest, RetryOnReadCommandNetworkErrorSucceedsAtMaxRetry) {
    configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));

    BSONObj expectedResult = BSON("ok" << 1 << "yes"
                                       << "dummy");

    auto future1 = launchAsync([this, expectedResult] {
        BSONObjBuilder responseBuilder;
        const bool ok = getCatalogManagerReplicaSet()->runReadCommandForTest(
            operationContext(), "test", BSON("dummy" << 1), &responseBuilder);
        ASSERT_TRUE(ok);

        const BSONObj response = responseBuilder.obj();
        ASSERT_EQ(expectedResult, response);
    });

    // Fail every attempt except the last with a retriable network error.
    for (int attempt = 0; attempt < kMaxCommandRetry - 1; ++attempt) {
        onCommand([](const RemoteCommandRequest&) {
            return Status{ErrorCodes::HostUnreachable, "bad host"};
        });
    }

    // The final attempt succeeds and its response is surfaced unchanged.
    onCommand([expectedResult](const RemoteCommandRequest&) { return expectedResult; });

    future1.timed_get(kFutureTimeout);
}
TEST_F(CatalogManagerReplSetTest, RetryOnFindCommandNetworkErrorFailsAtMaxRetry) {
    configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));

    auto future = launchAsync([this] {
        const auto status = catalogManager()->getDatabase(operationContext(), "TestDB");
        ASSERT_EQ(ErrorCodes::HostUnreachable, status.getStatus().code());
    });

    // Every allowed attempt fails with a retriable network error, so the find
    // surfaces HostUnreachable once the retry budget is exhausted.
    for (int attempt = 0; attempt < kMaxCommandRetry; ++attempt) {
        onFindCommand([](const RemoteCommandRequest&) {
            return Status{ErrorCodes::HostUnreachable, "bad host"};
        });
    }

    future.timed_get(kFutureTimeout);
}
TEST_F(CatalogManagerReplSetTest, RetryOnFindCommandNetworkErrorSucceedsAtMaxRetry) {
    configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));

    auto future = launchAsync([&] {
        ASSERT_OK(catalogManager()->getDatabase(operationContext(), "TestDB").getStatus());
    });

    // Fail every attempt except the last with a retriable network error.
    for (int i = 0; i < kMaxCommandRetry - 1; ++i) {
        onFindCommand([](const RemoteCommandRequest&) {
            return Status{ErrorCodes::HostUnreachable, "bad host"};
        });
    }

    // The final attempt succeeds, so the overall getDatabase reports OK.
    onFindCommand([](const RemoteCommandRequest& request) {
        DatabaseType dbType;
        dbType.setName("TestDB");
        dbType.setPrimary("TestShard");
        // setSharded takes a bool; the previous "true" string literal only
        // worked via implicit pointer->bool conversion — TODO confirm
        // against DatabaseType's declaration.
        dbType.setSharded(true);

        return vector<BSONObj>{dbType.toBSON()};
    });

    future.timed_get(kFutureTimeout);
}
} // namespace
} // namespace mongo