/**
* Copyright (C) 2017 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
#include "mongo/platform/basic.h"
#include "mongo/db/query/query_request.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/catalog/type_collection.h"
#include "mongo/s/catalog/type_database.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/catalog_cache_test_fixture.h"
#include "mongo/s/database_version_helpers.h"
namespace mongo {
namespace {
using executor::RemoteCommandRequest;
using unittest::assertGet;
// Namespace targeted by every refresh scheduled in the tests below.
const NamespaceString kNss("TestDB", "TestColl");
class CatalogCacheRefreshTest : public CatalogCacheTestFixture {
protected:
    void setUp() override {
        // Base fixture must be initialized before shards can be registered
        CatalogCacheTestFixture::setUp();
        setupNShards(2);
    }

    // Answers the next scheduled config server 'find' with a single database entry for
    // kNss's database, with "0" as the primary shard.
    void expectGetDatabase() {
        DatabaseType db(kNss.db().toString(), {"0"}, true, databaseVersion::makeNew());
        expectFindSendBSONObjVector(kConfigHostAndPort, std::vector{db.toBSON()});
    }

    // Answers the next scheduled config server 'find' with a single (non-unique) collection
    // entry for kNss carrying the given epoch and shard key pattern.
    void expectGetCollection(OID epoch, const ShardKeyPattern& shardKeyPattern) {
        CollectionType collType;
        collType.setNs(kNss);
        collType.setEpoch(epoch);
        collType.setKeyPattern(shardKeyPattern.toBSON());
        collType.setUnique(false);
        expectFindSendBSONObjVector(kConfigHostAndPort, std::vector{collType.toBSON()});
    }
};
// A full (cold) refresh: database entry, collection entry and all chunks are fetched and the
// resulting chunk manager reflects every chunk returned.
TEST_F(CatalogCacheRefreshTest, FullLoad) {
    const OID epoch = OID::gen();
    const ShardKeyPattern shardKeyPattern(BSON("_id" << 1));

    auto future = scheduleRoutingInfoRefresh(kNss);

    expectGetDatabase();

    // The refresh reads the collection entry twice, so two identical responses are queued
    expectGetCollection(epoch, shardKeyPattern);
    expectGetCollection(epoch, shardKeyPattern);

    // Respond with four contiguous chunks covering the entire shard key space, alternating
    // between shards "0" and "1", each chunk with an increasing minor version
    expectFindSendBSONObjVector(kConfigHostAndPort, [&]() {
        ChunkVersion version(1, 0, epoch);

        ChunkType chunk1(kNss,
                         {shardKeyPattern.getKeyPattern().globalMin(), BSON("_id" << -100)},
                         version,
                         {"0"});
        version.incMinor();

        ChunkType chunk2(kNss, {BSON("_id" << -100), BSON("_id" << 0)}, version, {"1"});
        version.incMinor();

        ChunkType chunk3(kNss, {BSON("_id" << 0), BSON("_id" << 100)}, version, {"0"});
        version.incMinor();

        // NOTE: no further incMinor() after the last chunk — the previous trailing bump was
        // dead code since 'version' is not read again
        ChunkType chunk4(kNss,
                         {BSON("_id" << 100), shardKeyPattern.getKeyPattern().globalMax()},
                         version,
                         {"1"});

        return std::vector{chunk1.toConfigBSON(),
                           chunk2.toConfigBSON(),
                           chunk3.toConfigBSON(),
                           chunk4.toConfigBSON()};
    }());

    auto routingInfo = future.timed_get(kFutureTimeout);
    ASSERT(routingInfo->cm());
    auto cm = routingInfo->cm();

    ASSERT_EQ(4, cm->numChunks());
}
// A refresh for a namespace whose database does not exist must fail with NamespaceNotFound.
TEST_F(CatalogCacheRefreshTest, DatabaseNotFound) {
    auto future = scheduleRoutingInfoRefresh(kNss);

    // For a missing database the CatalogClient performs the lookup twice, so two empty
    // responses must be queued
    expectFindSendBSONObjVector(kConfigHostAndPort, {});
    expectFindSendBSONObjVector(kConfigHostAndPort, {});

    try {
        auto cri = future.timed_get(kFutureTimeout);
        auto chunkManager = cri->cm();
        auto primaryShard = cri->db().primary();

        FAIL(str::stream() << "Returning no database did not fail and returned "
                           << (chunkManager ? chunkManager->toString()
                                            : cri->db().primaryId().toString()));
    } catch (const DBException& ex) {
        ASSERT_EQ(ErrorCodes::NamespaceNotFound, ex.code());
    }
}
// A database config document that fails to parse must surface NoSuchKey to the caller.
TEST_F(CatalogCacheRefreshTest, DatabaseBSONCorrupted) {
    auto future = scheduleRoutingInfoRefresh(kNss);

    // Answer the database lookup with a document that cannot be parsed as a database entry
    expectFindSendBSONObjVector(kConfigHostAndPort,
                                {BSON("BadValue"
                                      << "This value should not be in a database config document")});

    try {
        auto cri = future.timed_get(kFutureTimeout);
        auto chunkManager = cri->cm();
        auto primaryShard = cri->db().primary();

        FAIL(str::stream() << "Returning corrupted database entry did not fail and returned "
                           << (chunkManager ? chunkManager->toString()
                                            : cri->db().primaryId().toString()));
    } catch (const DBException& ex) {
        ASSERT_EQ(ErrorCodes::NoSuchKey, ex.code());
    }
}
// When the collection entry is absent, the refresh succeeds with an unsharded routing info
// (no chunk manager) pointing at the database's primary shard.
TEST_F(CatalogCacheRefreshTest, CollectionNotFound) {
    auto future = scheduleRoutingInfoRefresh(kNss);

    expectGetDatabase();

    // Respond to the collection lookup with nothing
    expectFindSendBSONObjVector(kConfigHostAndPort, {});

    auto cri = future.timed_get(kFutureTimeout);
    ASSERT(!cri->cm());
    ASSERT(cri->db().primary());
    ASSERT_EQ(ShardId{"0"}, cri->db().primaryId());
}
// A collection config document that fails to parse must surface FailedToParse to the caller.
TEST_F(CatalogCacheRefreshTest, CollectionBSONCorrupted) {
    auto future = scheduleRoutingInfoRefresh(kNss);

    expectGetDatabase();

    // Answer the collection lookup with a document that cannot be parsed as a collection entry
    expectFindSendBSONObjVector(
        kConfigHostAndPort,
        {BSON("BadValue"
              << "This value should not be in a collection config document")});

    try {
        auto cri = future.timed_get(kFutureTimeout);
        auto chunkManager = cri->cm();
        auto primaryShard = cri->db().primary();

        FAIL(str::stream() << "Returning corrupted collection entry did not fail and returned "
                           << (chunkManager ? chunkManager->toString()
                                            : cri->db().primaryId().toString()));
    } catch (const DBException& ex) {
        ASSERT_EQ(ErrorCodes::FailedToParse, ex.code());
    }
}
// A sharded collection whose chunks query keeps coming back empty must, after the retry
// budget is exhausted, fail with ConflictingOperationInProgress.
TEST_F(CatalogCacheRefreshTest, NoChunksFoundForCollection) {
    const OID epoch = OID::gen();
    const ShardKeyPattern shardKeyPattern(BSON("_id" << 1));

    auto future = scheduleRoutingInfoRefresh(kNss);

    expectGetDatabase();
    expectGetCollection(epoch, shardKeyPattern);

    // The catalog cache retries three times; respond with no chunks on each attempt
    for (int attempt = 0; attempt < 3; ++attempt) {
        expectGetCollection(epoch, shardKeyPattern);
        expectFindSendBSONObjVector(kConfigHostAndPort, {});
    }

    try {
        auto cri = future.timed_get(kFutureTimeout);
        auto chunkManager = cri->cm();
        auto primaryShard = cri->db().primary();

        FAIL(str::stream() << "Returning no chunks for collection did not fail and returned "
                           << (chunkManager ? chunkManager->toString()
                                            : cri->db().primaryId().toString()));
    } catch (const DBException& ex) {
        ASSERT_EQ(ErrorCodes::ConflictingOperationInProgress, ex.code());
    }
}
// A chunks response containing an unparseable chunk document must fail with NoSuchKey.
TEST_F(CatalogCacheRefreshTest, ChunksBSONCorrupted) {
    const OID epoch = OID::gen();
    const ShardKeyPattern shardKeyPattern(BSON("_id" << 1));

    auto future = scheduleRoutingInfoRefresh(kNss);

    expectGetDatabase();
    expectGetCollection(epoch, shardKeyPattern);

    // Return a response in which the first chunk document is valid, but the second one is
    // corrupted (parse errors abort the refresh immediately, so no retries are expected)
    expectGetCollection(epoch, shardKeyPattern);
    expectFindSendBSONObjVector(kConfigHostAndPort, [&] {
        return std::vector{ChunkType(kNss,
                                     {shardKeyPattern.getKeyPattern().globalMin(),
                                      BSON("_id" << 0)},
                                     ChunkVersion(1, 0, epoch),
                                     {"0"})
                               .toConfigBSON(),
                           BSON("BadValue"
                                << "This value should not be in a chunk config document")};
    }());

    try {
        auto routingInfo = future.timed_get(kFutureTimeout);
        auto cm = routingInfo->cm();
        auto primary = routingInfo->db().primary();

        // Message fixed: this scenario returns corrupted chunks, not "no chunks"
        FAIL(str::stream() << "Returning corrupted chunks for collection did not fail and returned "
                           << (cm ? cm->toString() : routingInfo->db().primaryId().toString()));
    } catch (const DBException& ex) {
        ASSERT_EQ(ErrorCodes::NoSuchKey, ex.code());
    }
}
// A chunks response that does not cover the whole shard key space must, after the retry
// budget is exhausted, fail with ConflictingOperationInProgress.
TEST_F(CatalogCacheRefreshTest, IncompleteChunksFoundForCollection) {
    const OID epoch = OID::gen();
    const ShardKeyPattern shardKeyPattern(BSON("_id" << 1));

    auto future = scheduleRoutingInfoRefresh(kNss);

    expectGetDatabase();
    expectGetCollection(epoch, shardKeyPattern);

    const auto incompleteChunks = [&]() {
        ChunkVersion version(1, 0, epoch);

        // Chunk from (MinKey, -100) is missing (as if someone is dropping the collection
        // concurrently)
        version.incMinor();

        ChunkType chunk2(kNss, {BSON("_id" << -100), BSON("_id" << 0)}, version, {"1"});
        version.incMinor();

        ChunkType chunk3(kNss, {BSON("_id" << 0), BSON("_id" << 100)}, version, {"0"});
        version.incMinor();

        // NOTE: no trailing incMinor() after the last chunk — the previous one was dead code
        // since 'version' is not read again
        ChunkType chunk4(kNss,
                         {BSON("_id" << 100), shardKeyPattern.getKeyPattern().globalMax()},
                         version,
                         {"1"});

        return std::vector{
            chunk2.toConfigBSON(), chunk3.toConfigBSON(), chunk4.toConfigBSON()};
    }();

    // Return incomplete set of chunks three times, which is how frequently the catalog cache
    // retries
    for (int attempt = 0; attempt < 3; ++attempt) {
        expectGetCollection(epoch, shardKeyPattern);
        expectFindSendBSONObjVector(kConfigHostAndPort, incompleteChunks);
    }

    try {
        auto routingInfo = future.timed_get(kFutureTimeout);
        auto cm = routingInfo->cm();
        auto primary = routingInfo->db().primary();

        FAIL(
            str::stream() << "Returning incomplete chunks for collection did not fail and returned "
                          << (cm ? cm->toString() : routingInfo->db().primaryId().toString()));
    } catch (const DBException& ex) {
        ASSERT_EQ(ErrorCodes::ConflictingOperationInProgress, ex.code());
    }
}
// If the chunks returned during an incremental refresh disagree on epoch, the refresh must
// keep retrying and eventually fail with ConflictingOperationInProgress.
TEST_F(CatalogCacheRefreshTest, ChunkEpochChangeDuringIncrementalLoad) {
    const ShardKeyPattern shardKeyPattern(BSON("_id" << 1));

    // Seed the cache with a single-chunk collection so the refresh below is incremental
    auto initialRoutingInfo(makeChunkManager(kNss, shardKeyPattern, nullptr, true, {}));
    ASSERT_EQ(1, initialRoutingInfo->numChunks());

    auto future = scheduleRoutingInfoRefresh(kNss);

    ChunkVersion version = initialRoutingInfo->getVersion();

    // Two chunks whose epochs disagree: the first continues the collection's epoch, the
    // second carries a brand-new one
    const auto mismatchedEpochChunks = [&]() {
        version.incMajor();
        ChunkType chunk1(
            kNss, {shardKeyPattern.getKeyPattern().globalMin(), BSON("_id" << 0)}, version, {"0"});

        ChunkType chunk2(kNss,
                         {BSON("_id" << 0), shardKeyPattern.getKeyPattern().globalMax()},
                         ChunkVersion(1, 0, OID::gen()),
                         {"1"});

        return std::vector{chunk1.toConfigBSON(), chunk2.toConfigBSON()};
    }();

    // The catalog cache retries three times; feed it the same inconsistent chunk set each time
    for (int attempt = 0; attempt < 3; ++attempt) {
        expectGetCollection(initialRoutingInfo->getVersion().epoch(), shardKeyPattern);
        expectFindSendBSONObjVector(kConfigHostAndPort, mismatchedEpochChunks);
    }

    try {
        auto cri = future.timed_get(kFutureTimeout);
        auto chunkManager = cri->cm();
        auto primaryShard = cri->db().primary();

        FAIL(str::stream()
             << "Returning chunks with different epoch for collection did not fail and returned "
             << (chunkManager ? chunkManager->toString() : cri->db().primaryId().toString()));
    } catch (const DBException& ex) {
        ASSERT_EQ(ErrorCodes::ConflictingOperationInProgress, ex.code());
    }
}
// An epoch mismatch detected mid-refresh must trigger a retry, and the retry (which re-reads
// everything from version zero) must succeed and produce the recreated collection's routing table.
TEST_F(CatalogCacheRefreshTest, ChunkEpochChangeDuringIncrementalLoadRecoveryAfterRetry) {
    const ShardKeyPattern shardKeyPattern(BSON("_id" << 1));
    // Seed the cache with a single-chunk collection so the refresh below starts incremental
    auto initialRoutingInfo(makeChunkManager(kNss, shardKeyPattern, nullptr, true, {}));
    ASSERT_EQ(1, initialRoutingInfo->numChunks());
    setupNShards(2);
    auto future = scheduleRoutingInfoRefresh(kNss);
    ChunkVersion oldVersion = initialRoutingInfo->getVersion();
    const OID newEpoch = OID::gen();
    // On the first attempt, return set of chunks, one of which has different epoch. This simulates
    // the situation where a collection existed with epoch0, we started a refresh for that
    // collection, the cursor yielded and while it yielded another node dropped the collection and
    // recreated it with different epoch and chunks.
    expectGetCollection(oldVersion.epoch(), shardKeyPattern);
    onFindCommand([&](const RemoteCommandRequest& request) {
        // The first chunks query must be differential: only chunks with lastmod at or past the
        // previously cached version
        const auto diffQuery =
            assertGet(QueryRequest::makeFromFindCommand(kNss, request.cmdObj, false));
        ASSERT_BSONOBJ_EQ(BSON("ns" << kNss.ns() << "lastmod"
                                    << BSON("$gte" << Timestamp(oldVersion.majorVersion(),
                                                                oldVersion.minorVersion()))),
                          diffQuery->getFilter());
        oldVersion.incMajor();
        ChunkType chunk1(kNss,
                         {shardKeyPattern.getKeyPattern().globalMin(), BSON("_id" << 0)},
                         oldVersion,
                         {"0"});
        // "Yield" happens here with drop and recreate in between. This is the "last" chunk from the
        // recreated collection.
        ChunkType chunk3(kNss,
                         {BSON("_id" << 100), shardKeyPattern.getKeyPattern().globalMax()},
                         ChunkVersion(5, 2, newEpoch),
                         {"1"});
        return std::vector{chunk1.toConfigBSON(), chunk3.toConfigBSON()};
    });
    // On the second retry attempt, return the correct set of chunks from the recreated collection
    expectGetCollection(newEpoch, shardKeyPattern);
    ChunkVersion newVersion(5, 0, newEpoch);
    onFindCommand([&](const RemoteCommandRequest& request) {
        // Ensure it is a differential query but starting from version zero (to fetch all the
        // chunks) since the incremental refresh above produced a different version
        const auto diffQuery =
            assertGet(QueryRequest::makeFromFindCommand(kNss, request.cmdObj, false));
        ASSERT_BSONOBJ_EQ(BSON("ns" << kNss.ns() << "lastmod" << BSON("$gte" << Timestamp(0, 0))),
                          diffQuery->getFilter());
        ChunkType chunk1(kNss,
                         {shardKeyPattern.getKeyPattern().globalMin(), BSON("_id" << 0)},
                         newVersion,
                         {"0"});
        // 'newVersion' is captured by reference, so these bumps are also what the assertions
        // below compare against
        newVersion.incMinor();
        ChunkType chunk2(kNss, {BSON("_id" << 0), BSON("_id" << 100)}, newVersion, {"0"});
        newVersion.incMinor();
        ChunkType chunk3(kNss,
                         {BSON("_id" << 100), shardKeyPattern.getKeyPattern().globalMax()},
                         newVersion,
                         {"1"});
        return std::vector{
            chunk1.toConfigBSON(), chunk2.toConfigBSON(), chunk3.toConfigBSON()};
    });
    auto routingInfo = future.timed_get(kFutureTimeout);
    ASSERT(routingInfo->cm());
    auto cm = routingInfo->cm();
    ASSERT_EQ(3, cm->numChunks());
    // Each shard's version is the highest chunk version placed on it: (5, 1) for "0" (chunk2)
    // and (5, 2) for "1" (chunk3)
    ASSERT_EQ(newVersion, cm->getVersion());
    ASSERT_EQ(ChunkVersion(5, 1, newVersion.epoch()), cm->getVersion({"0"}));
    ASSERT_EQ(ChunkVersion(5, 2, newVersion.epoch()), cm->getVersion({"1"}));
}
// An incremental refresh that discovers a new collection epoch must re-read the chunks from
// version zero and replace the cached routing table wholesale.
TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterCollectionEpochChange) {
    const ShardKeyPattern shardKeyPattern(BSON("_id" << 1));
    // Seed the cache with a single-chunk collection so the refresh below is incremental
    auto initialRoutingInfo(makeChunkManager(kNss, shardKeyPattern, nullptr, true, {}));
    ASSERT_EQ(1, initialRoutingInfo->numChunks());
    setupNShards(2);
    auto future = scheduleRoutingInfoRefresh(kNss);
    ChunkVersion newVersion(1, 0, OID::gen());
    // Return collection with a different epoch
    expectGetCollection(newVersion.epoch(), shardKeyPattern);
    // Return set of chunks, which represent a split
    onFindCommand([&](const RemoteCommandRequest& request) {
        // Ensure it is a differential query but starting from version zero
        const auto diffQuery =
            assertGet(QueryRequest::makeFromFindCommand(kNss, request.cmdObj, false));
        ASSERT_BSONOBJ_EQ(BSON("ns" << kNss.ns() << "lastmod" << BSON("$gte" << Timestamp(0, 0))),
                          diffQuery->getFilter());
        ChunkType chunk1(kNss,
                         {shardKeyPattern.getKeyPattern().globalMin(), BSON("_id" << 0)},
                         newVersion,
                         {"0"});
        // 'newVersion' is captured by reference, so this bump is visible to the assertions below
        newVersion.incMinor();
        ChunkType chunk2(kNss,
                         {BSON("_id" << 0), shardKeyPattern.getKeyPattern().globalMax()},
                         newVersion,
                         {"1"});
        return std::vector{chunk1.toConfigBSON(), chunk2.toConfigBSON()};
    });
    auto routingInfo = future.timed_get(kFutureTimeout);
    ASSERT(routingInfo->cm());
    auto cm = routingInfo->cm();
    ASSERT_EQ(2, cm->numChunks());
    ASSERT_EQ(newVersion, cm->getVersion());
    ASSERT_EQ(ChunkVersion(1, 0, newVersion.epoch()), cm->getVersion({"0"}));
    ASSERT_EQ(ChunkVersion(1, 1, newVersion.epoch()), cm->getVersion({"1"}));
}
// An incremental refresh that picks up a chunk split must issue a differential chunks query
// and incorporate both halves of the split.
TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterSplit) {
    const ShardKeyPattern shardKeyPattern(BSON("_id" << 1));
    // Seed the cache with a single-chunk collection
    auto initialRoutingInfo(makeChunkManager(kNss, shardKeyPattern, nullptr, true, {}));
    ASSERT_EQ(1, initialRoutingInfo->numChunks());
    ChunkVersion version = initialRoutingInfo->getVersion();
    auto future = scheduleRoutingInfoRefresh(kNss);
    expectGetCollection(version.epoch(), shardKeyPattern);
    // Return set of chunks, which represent a split
    onFindCommand([&](const RemoteCommandRequest& request) {
        // Ensure it is a differential query
        const auto diffQuery =
            assertGet(QueryRequest::makeFromFindCommand(kNss, request.cmdObj, false));
        ASSERT_BSONOBJ_EQ(
            BSON("ns" << kNss.ns() << "lastmod"
                      << BSON("$gte" << Timestamp(version.majorVersion(), version.minorVersion()))),
            diffQuery->getFilter());
        // Both halves of the split stay on shard "0"; 'version' is captured by reference, so
        // these bumps are also what the assertions below compare against
        version.incMajor();
        ChunkType chunk1(
            kNss, {shardKeyPattern.getKeyPattern().globalMin(), BSON("_id" << 0)}, version, {"0"});
        version.incMinor();
        ChunkType chunk2(
            kNss, {BSON("_id" << 0), shardKeyPattern.getKeyPattern().globalMax()}, version, {"0"});
        return std::vector{chunk1.toConfigBSON(), chunk2.toConfigBSON()};
    });
    auto routingInfo = future.timed_get(kFutureTimeout);
    ASSERT(routingInfo->cm());
    auto cm = routingInfo->cm();
    ASSERT_EQ(2, cm->numChunks());
    ASSERT_EQ(version, cm->getVersion());
    ASSERT_EQ(version, cm->getVersion({"0"}));
    // Shard "1" was given no chunks, so its version is the zero version with the collection epoch
    ASSERT_EQ(ChunkVersion(0, 0, version.epoch()), cm->getVersion({"1"}));
}
// An incremental refresh that picks up a chunk migration must update both the donor's and the
// recipient's shard versions.
TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterMove) {
    const ShardKeyPattern shardKeyPattern(BSON("_id" << 1));
    // Seed the cache with two chunks, split at _id: 0
    auto initialRoutingInfo(
        makeChunkManager(kNss, shardKeyPattern, nullptr, true, {BSON("_id" << 0)}));
    ASSERT_EQ(2, initialRoutingInfo->numChunks());
    ChunkVersion version = initialRoutingInfo->getVersion();
    auto future = scheduleRoutingInfoRefresh(kNss);
    ChunkVersion expectedDestShardVersion;
    expectGetCollection(version.epoch(), shardKeyPattern);
    // Return set of chunks, which represent a move
    expectFindSendBSONObjVector(kConfigHostAndPort, [&]() {
        // The chunk now on shard "1" carries the major-version bump; the chunk remaining on
        // shard "0" gets a further minor bump. Both 'version' and 'expectedDestShardVersion'
        // are captured by reference and read by the assertions below.
        version.incMajor();
        expectedDestShardVersion = version;
        ChunkType chunk1(
            kNss, {shardKeyPattern.getKeyPattern().globalMin(), BSON("_id" << 0)}, version, {"1"});
        version.incMinor();
        ChunkType chunk2(
            kNss, {BSON("_id" << 0), shardKeyPattern.getKeyPattern().globalMax()}, version, {"0"});
        return std::vector{chunk1.toConfigBSON(), chunk2.toConfigBSON()};
    }());
    auto routingInfo = future.timed_get(kFutureTimeout);
    ASSERT(routingInfo->cm());
    auto cm = routingInfo->cm();
    ASSERT_EQ(2, cm->numChunks());
    ASSERT_EQ(version, cm->getVersion());
    ASSERT_EQ(version, cm->getVersion({"0"}));
    ASSERT_EQ(expectedDestShardVersion, cm->getVersion({"1"}));
}
// An incremental refresh after the shard's only chunk migrates away must leave the donor with
// the zero version and hand the whole collection version to the recipient.
TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterMoveLastChunk) {
    const ShardKeyPattern shardKeyPattern(BSON("_id" << 1));
    // Seed the cache with a single chunk covering the whole key space
    auto initialRoutingInfo(makeChunkManager(kNss, shardKeyPattern, nullptr, true, {}));
    ASSERT_EQ(1, initialRoutingInfo->numChunks());
    setupNShards(2);
    ChunkVersion version = initialRoutingInfo->getVersion();
    auto future = scheduleRoutingInfoRefresh(kNss);
    expectGetCollection(version.epoch(), shardKeyPattern);
    // Return set of chunks, which represent a move
    expectFindSendBSONObjVector(kConfigHostAndPort, [&]() {
        // The only chunk moves to shard "1" with a major-version bump; 'version' is captured
        // by reference, so the bump is also what the assertions below compare against
        version.incMajor();
        ChunkType chunk1(kNss,
                         {shardKeyPattern.getKeyPattern().globalMin(),
                          shardKeyPattern.getKeyPattern().globalMax()},
                         version,
                         {"1"});
        return std::vector{chunk1.toConfigBSON()};
    }());
    auto routingInfo = future.timed_get(kFutureTimeout);
    ASSERT(routingInfo->cm());
    auto cm = routingInfo->cm();
    ASSERT_EQ(1, cm->numChunks());
    ASSERT_EQ(version, cm->getVersion());
    // The donor shard "0" no longer owns any chunks
    ASSERT_EQ(ChunkVersion(0, 0, version.epoch()), cm->getVersion({"0"}));
    ASSERT_EQ(version, cm->getVersion({"1"}));
}
} // namespace
} // namespace mongo