summaryrefslogtreecommitdiff
path: root/src/mongo/db/s
diff options
context:
space:
mode:
authorMatthew Saltz <matthew.saltz@mongodb.com>2019-07-03 10:41:04 -0400
committerMatthew Saltz <matthew.saltz@mongodb.com>2019-08-13 16:14:40 -0400
commit6c6f2be0cba78f4e75400d6cd1c0acbd793a7edb (patch)
tree90c351e9ec111b7aab43debae70839f6a4d8082f /src/mongo/db/s
parent45df410133eeb9c4fb1e9f0920a94e861f8c4585 (diff)
downloadmongo-6c6f2be0cba78f4e75400d6cd1c0acbd793a7edb.tar.gz
SERVER-41480 Increment major version on splits where the shard version equals the collection version
(cherry picked from commit 790609ffb6bc1dac0a3c13d1898d5884e21a1b7a)
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp127
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager_split_chunk_test.cpp151
2 files changed, 213 insertions, 65 deletions
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
index 8b2c4b333d2..bd09a5cf5bd 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
@@ -271,6 +271,24 @@ boost::optional<ChunkType> getControlChunkForMigrate(OperationContext* opCtx,
return uassertStatusOK(ChunkType::fromConfigBSON(response.docs.front()));
}
+// Helper function to find collection version and shard version.
+StatusWith<ChunkVersion> getMaxChunkVersionFromQueryResponse(
+ const NamespaceString& nss, const StatusWith<Shard::QueryResponse>& queryResponse) {
+
+ if (!queryResponse.isOK()) {
+ return queryResponse.getStatus();
+ }
+
+ const auto& chunksVector = queryResponse.getValue().docs;
+ if (chunksVector.empty()) {
+ return {ErrorCodes::IllegalOperation,
+ str::stream() << "Collection '" << nss.ns()
+ << "' no longer either exists, is sharded, or has chunks"};
+ }
+
+ return ChunkVersion::parseLegacyWithField(chunksVector.front(), ChunkType::lastmod());
+}
+
} // namespace
Status ShardingCatalogManager::commitChunkSplit(OperationContext* opCtx,
@@ -285,42 +303,58 @@ Status ShardingCatalogManager::commitChunkSplit(OperationContext* opCtx,
// move chunks on different collections to proceed in parallel
Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock);
- std::string errmsg;
-
// Get the max chunk version for this namespace.
- auto findStatus = Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
- opCtx,
- ReadPreferenceSetting{ReadPreference::PrimaryOnly},
- repl::ReadConcernLevel::kLocalReadConcern,
- ChunkType::ConfigNS,
- BSON("ns" << nss.ns()),
- BSON(ChunkType::lastmod << -1),
- 1);
-
- if (!findStatus.isOK()) {
- return findStatus.getStatus();
- }
+ auto swCollVersion = getMaxChunkVersionFromQueryResponse(
+ nss,
+ Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+ opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ ChunkType::ConfigNS,
+ BSON("ns" << nss.ns()), // Query all chunks for this namespace.
+ BSON(ChunkType::lastmod << -1), // Sort by version.
+ 1)); // Limit 1.
- const auto& chunksVector = findStatus.getValue().docs;
- if (chunksVector.empty()) {
- errmsg = str::stream() << "splitChunk cannot split chunk " << range.toString()
- << ". Collection '" << nss.ns()
- << "' no longer either exists, is sharded, or has chunks";
- return {ErrorCodes::IllegalOperation, errmsg};
+ if (!swCollVersion.isOK()) {
+ return swCollVersion.getStatus().withContext(
+ str::stream() << "splitChunk cannot split chunk " << range.toString() << ".");
}
- ChunkVersion collVersion = uassertStatusOK(
- ChunkVersion::parseLegacyWithField(chunksVector.front(), ChunkType::lastmod()));
+ auto collVersion = swCollVersion.getValue();
// Return an error if collection epoch does not match epoch of request.
if (collVersion.epoch() != requestEpoch) {
- errmsg = str::stream() << "splitChunk cannot split chunk " << range.toString()
- << ". Collection '" << nss.ns() << "' was dropped and re-created."
- << " Current epoch: " << collVersion.epoch()
- << ", cmd epoch: " << requestEpoch;
- return {ErrorCodes::StaleEpoch, errmsg};
+ return {ErrorCodes::StaleEpoch,
+ str::stream() << "splitChunk cannot split chunk " << range.toString()
+ << ". Collection '"
+ << nss.ns()
+ << "' was dropped and re-created."
+ << " Current epoch: "
+ << collVersion.epoch()
+ << ", cmd epoch: "
+ << requestEpoch};
}
+ // Get the shard version (max chunk version) for the shard requesting the split.
+ auto swShardVersion = getMaxChunkVersionFromQueryResponse(
+ nss,
+ Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+ opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ ChunkType::ConfigNS,
+ BSON("ns" << nss.ns() << "shard"
+ << shardName), // Query all chunks for this namespace and shard.
+ BSON(ChunkType::lastmod << -1), // Sort by version.
+ 1)); // Limit 1.
+
+ if (!swShardVersion.isOK()) {
+ return swShardVersion.getStatus().withContext(
+ str::stream() << "splitChunk cannot split chunk " << range.toString() << ".");
+ }
+
+ auto shardVersion = swShardVersion.getValue();
+
// Find the chunk history.
const auto origChunk = _findChunkOnConfig(opCtx, nss, range.getMin());
if (!origChunk.isOK()) {
@@ -330,6 +364,11 @@ Status ShardingCatalogManager::commitChunkSplit(OperationContext* opCtx,
std::vector<ChunkType> newChunks;
ChunkVersion currentMaxVersion = collVersion;
+ // Increment the major version only if the shard that owns the chunk being split has version ==
+ // collection version. See SERVER-41480 for details.
+ if (shardVersion == collVersion) {
+ currentMaxVersion.incMajor();
+ }
auto startKey = range.getMin();
auto newChunkBounds(splitPoints);
@@ -513,27 +552,25 @@ Status ShardingCatalogManager::commitChunkMerge(OperationContext* opCtx,
!validAfter) {
return {ErrorCodes::IllegalOperation, "chunk operation requires validAfter timestamp"};
}
- // Get the chunk with the highest version for this namespace
- auto findStatus = Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
- opCtx,
- ReadPreferenceSetting{ReadPreference::PrimaryOnly},
- repl::ReadConcernLevel::kLocalReadConcern,
- ChunkType::ConfigNS,
- BSON("ns" << nss.ns()),
- BSON(ChunkType::lastmod << -1),
- 1);
- if (!findStatus.isOK()) {
- return findStatus.getStatus();
- }
+ // Get the max chunk version for this namespace.
+ auto swCollVersion = getMaxChunkVersionFromQueryResponse(
+ nss,
+ Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+ opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ ChunkType::ConfigNS,
+ BSON("ns" << nss.ns()), // Query all chunks for this namespace.
+ BSON(ChunkType::lastmod << -1), // Sort by version.
+ 1)); // Limit 1.
- const auto& chunksVector = findStatus.getValue().docs;
- if (chunksVector.empty())
- return {ErrorCodes::IllegalOperation,
- "collection does not exist, isn't sharded, or has no chunks"};
+ if (!swCollVersion.isOK()) {
+ return swCollVersion.getStatus().withContext(str::stream()
+ << "mergeChunk cannot merge chunks.");
+ }
- ChunkVersion collVersion = uassertStatusOK(
- ChunkVersion::parseLegacyWithField(chunksVector.front(), ChunkType::lastmod()));
+ auto collVersion = swCollVersion.getValue();
// Return an error if epoch of chunk does not match epoch of request
if (collVersion.epoch() != requestEpoch) {
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_split_chunk_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_split_chunk_test.cpp
index 1baa43be2d4..0140245652c 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_split_chunk_test.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_split_chunk_test.cpp
@@ -78,9 +78,9 @@ TEST_F(SplitChunkTest, SplitExistingChunkCorrectlyShouldSucceed) {
auto chunkDoc = chunkDocStatus.getValue();
ASSERT_BSONOBJ_EQ(chunkSplitPoint, chunkDoc.getMax());
- // Check for increment on first chunkDoc's minor version
- ASSERT_EQ(origVersion.majorVersion(), chunkDoc.getVersion().majorVersion());
- ASSERT_EQ(origVersion.minorVersion() + 1, chunkDoc.getVersion().minorVersion());
+ // Check for increment on first chunkDoc's major version.
+ ASSERT_EQ(origVersion.majorVersion() + 1, chunkDoc.getVersion().majorVersion());
+ ASSERT_EQ(1u, chunkDoc.getVersion().minorVersion());
// Make sure the history is there
ASSERT_EQ(2UL, chunkDoc.getHistory().size());
@@ -92,9 +92,9 @@ TEST_F(SplitChunkTest, SplitExistingChunkCorrectlyShouldSucceed) {
auto otherChunkDoc = otherChunkDocStatus.getValue();
ASSERT_BSONOBJ_EQ(chunkMax, otherChunkDoc.getMax());
- // Check for increment on second chunkDoc's minor version
- ASSERT_EQ(origVersion.majorVersion(), otherChunkDoc.getVersion().majorVersion());
- ASSERT_EQ(origVersion.minorVersion() + 2, otherChunkDoc.getVersion().minorVersion());
+ // Check for increment on second chunkDoc's minor version.
+ ASSERT_EQ(origVersion.majorVersion() + 1, otherChunkDoc.getVersion().majorVersion());
+ ASSERT_EQ(2u, otherChunkDoc.getVersion().minorVersion());
// Make sure the history is there
ASSERT_EQ(2UL, otherChunkDoc.getHistory().size());
@@ -139,9 +139,9 @@ TEST_F(SplitChunkTest, MultipleSplitsOnExistingChunkShouldSucceed) {
auto chunkDoc = chunkDocStatus.getValue();
ASSERT_BSONOBJ_EQ(chunkSplitPoint, chunkDoc.getMax());
- // Check for increment on first chunkDoc's minor version
- ASSERT_EQ(origVersion.majorVersion(), chunkDoc.getVersion().majorVersion());
- ASSERT_EQ(origVersion.minorVersion() + 1, chunkDoc.getVersion().minorVersion());
+ // Check for increment on first chunkDoc's major version.
+ ASSERT_EQ(origVersion.majorVersion() + 1, chunkDoc.getVersion().majorVersion());
+ ASSERT_EQ(1u, chunkDoc.getVersion().minorVersion());
// Make sure the history is there
ASSERT_EQ(2UL, chunkDoc.getHistory().size());
@@ -153,9 +153,9 @@ TEST_F(SplitChunkTest, MultipleSplitsOnExistingChunkShouldSucceed) {
auto midChunkDoc = midChunkDocStatus.getValue();
ASSERT_BSONOBJ_EQ(chunkSplitPoint2, midChunkDoc.getMax());
- // Check for increment on second chunkDoc's minor version
- ASSERT_EQ(origVersion.majorVersion(), midChunkDoc.getVersion().majorVersion());
- ASSERT_EQ(origVersion.minorVersion() + 2, midChunkDoc.getVersion().minorVersion());
+ // Check for increment on second chunkDoc's minor version.
+ ASSERT_EQ(origVersion.majorVersion() + 1, midChunkDoc.getVersion().majorVersion());
+ ASSERT_EQ(2u, midChunkDoc.getVersion().minorVersion());
// Make sure the history is there
ASSERT_EQ(2UL, midChunkDoc.getHistory().size());
@@ -167,9 +167,9 @@ TEST_F(SplitChunkTest, MultipleSplitsOnExistingChunkShouldSucceed) {
auto lastChunkDoc = lastChunkDocStatus.getValue();
ASSERT_BSONOBJ_EQ(chunkMax, lastChunkDoc.getMax());
- // Check for increment on third chunkDoc's minor version
- ASSERT_EQ(origVersion.majorVersion(), lastChunkDoc.getVersion().majorVersion());
- ASSERT_EQ(origVersion.minorVersion() + 3, lastChunkDoc.getVersion().minorVersion());
+ // Check for increment on third chunkDoc's minor version.
+ ASSERT_EQ(origVersion.majorVersion() + 1, lastChunkDoc.getVersion().majorVersion());
+ ASSERT_EQ(3u, lastChunkDoc.getVersion().minorVersion());
// Make sure the history is there
ASSERT_EQ(2UL, lastChunkDoc.getHistory().size());
@@ -223,9 +223,12 @@ TEST_F(SplitChunkTest, NewSplitShouldClaimHighestVersion) {
auto chunkDoc = chunkDocStatus.getValue();
ASSERT_BSONOBJ_EQ(chunkSplitPoint, chunkDoc.getMax());
- // Check for increment based on the competing chunk version
- ASSERT_EQ(competingVersion.majorVersion(), chunkDoc.getVersion().majorVersion());
- ASSERT_EQ(competingVersion.minorVersion() + 1, chunkDoc.getVersion().minorVersion());
+ // Check for major version increment based on the competing chunk version.
+ ASSERT_EQ(competingVersion.majorVersion() + 1, chunkDoc.getVersion().majorVersion());
+ // The minor version gets reset to 0 when the major version is incremented, and chunk splits
+ // increment the minor version after incrementing the major version, so we expect the minor
+ // version here to be 0 + 1 = 1.
+ ASSERT_EQ(1u, chunkDoc.getVersion().minorVersion());
// Second chunkDoc should have range [chunkSplitPoint, chunkMax]
auto otherChunkDocStatus = getChunkDoc(operationContext(), chunkSplitPoint);
@@ -234,9 +237,117 @@ TEST_F(SplitChunkTest, NewSplitShouldClaimHighestVersion) {
auto otherChunkDoc = otherChunkDocStatus.getValue();
ASSERT_BSONOBJ_EQ(chunkMax, otherChunkDoc.getMax());
- // Check for increment based on the competing chunk version
+ // Check for increment based on the competing chunk version.
+ ASSERT_EQ(competingVersion.majorVersion() + 1, otherChunkDoc.getVersion().majorVersion());
+ // The minor version gets reset to 0 when the major version is incremented, and chunk splits
+ // increment the minor version after incrementing the major version for the first chunk in the
+ // split vector, so we expect the minor version here to be 0 + 1 + 1 = 2.
+ ASSERT_EQ(2u, otherChunkDoc.getVersion().minorVersion());
+}
+
+TEST_F(SplitChunkTest, SplitsOnShardWithLowerShardVersionDoesNotIncreaseCollectionVersion) {
+ ChunkType chunk, chunk2;
+ chunk.setNS(kNamespace);
+ chunk2.setNS(kNamespace);
+ auto collEpoch = OID::gen();
+
+ // Set up first chunk with lower version on shard0000. Its shard will not have shard version ==
+ // collection version, so splits to it should give it the collection version plus a minor
+ // version bump.
+ auto origVersion = ChunkVersion(1, 2, collEpoch);
+ chunk.setVersion(origVersion);
+ chunk.setShard(ShardId("shard0000"));
+ chunk.setMin(BSON("a" << 1));
+ chunk.setMax(BSON("a" << 10));
+
+ // Set up second chunk (chunk2) on shard0001. This has the higher version.
+ auto competingVersion = ChunkVersion(2, 1, collEpoch);
+ chunk2.setVersion(competingVersion);
+ chunk2.setShard(ShardId("shard0001"));
+ chunk2.setMin(BSON("a" << 10));
+ chunk2.setMax(BSON("a" << 20));
+
+ setupChunks({chunk, chunk2});
+
+ std::vector<BSONObj> splitPoints;
+ auto chunkSplitPoint = BSON("a" << 5);
+ splitPoints.push_back(chunkSplitPoint);
+
+ ASSERT_OK(ShardingCatalogManager::get(operationContext())
+ ->commitChunkSplit(operationContext(),
+ kNamespace,
+ collEpoch,
+ ChunkRange(chunk.getMin(), chunk.getMax()),
+ splitPoints,
+ chunk.getShard().toString()));
+
+ // First chunkDoc should have range [chunk.getMin(), chunkSplitPoint]
+ auto chunkDocStatus = getChunkDoc(operationContext(), chunk.getMin());
+ auto chunkDoc = chunkDocStatus.getValue();
+
+ // The major version should not be incremented, since this shard's version is below the
+ // collection version; expect only a minor version bump on top of the competing chunk version.
+ ASSERT_EQ(competingVersion.majorVersion(), chunkDoc.getVersion().majorVersion());
+ ASSERT_EQ(competingVersion.minorVersion() + 1u, chunkDoc.getVersion().minorVersion());
+
+ // Second chunkDoc should have range [chunkSplitPoint, chunkMax]
+ auto otherChunkDocStatus = getChunkDoc(operationContext(), chunkSplitPoint);
+ auto otherChunkDoc = otherChunkDocStatus.getValue();
+ // Check for increment based on the competing chunk version.
ASSERT_EQ(competingVersion.majorVersion(), otherChunkDoc.getVersion().majorVersion());
- ASSERT_EQ(competingVersion.minorVersion() + 2, otherChunkDoc.getVersion().minorVersion());
+ ASSERT_EQ(competingVersion.minorVersion() + 2u, otherChunkDoc.getVersion().minorVersion());
+}
+
+TEST_F(SplitChunkTest, SplitsOnShardWithHighestShardVersionIncreasesCollectionVersion) {
+ ChunkType chunk, chunk2;
+ chunk.setNS(kNamespace);
+ chunk2.setNS(kNamespace);
+ auto collEpoch = OID::gen();
+
+ // Set up first chunk with lower version on shard0000. Its shard will not have shard version ==
+ // collection version.
+ auto origVersion = ChunkVersion(1, 2, collEpoch);
+ chunk.setVersion(origVersion);
+ chunk.setShard(ShardId("shard0000"));
+ chunk.setMin(BSON("a" << 1));
+ chunk.setMax(BSON("a" << 10));
+
+ // Set up second chunk (chunk2) on shard0001. This has the higher version, so its shard version
+ // == collection version. When we split it, its major version should increase.
+ auto competingVersion = ChunkVersion(2, 1, collEpoch);
+ chunk2.setVersion(competingVersion);
+ chunk2.setShard(ShardId("shard0001"));
+ chunk2.setMin(BSON("a" << 10));
+ chunk2.setMax(BSON("a" << 20));
+
+ setupChunks({chunk, chunk2});
+
+ std::vector<BSONObj> splitPoints;
+ // This will split the second chunk.
+ auto chunkSplitPoint = BSON("a" << 15);
+ splitPoints.push_back(chunkSplitPoint);
+
+ ASSERT_OK(ShardingCatalogManager::get(operationContext())
+ ->commitChunkSplit(operationContext(),
+ kNamespace,
+ collEpoch,
+ ChunkRange(chunk2.getMin(), chunk2.getMax()),
+ splitPoints,
+ chunk2.getShard().toString()));
+
+ // First chunkDoc should have range [chunk2.getMin(), chunkSplitPoint]
+ auto chunkDocStatus = getChunkDoc(operationContext(), chunk2.getMin());
+ auto chunkDoc = chunkDocStatus.getValue();
+
+ // Check for major version increment based on the competing chunk version.
+ ASSERT_EQ(competingVersion.majorVersion() + 1u, chunkDoc.getVersion().majorVersion());
+ ASSERT_EQ(1u, chunkDoc.getVersion().minorVersion());
+
+ // Second chunkDoc should have range [chunkSplitPoint, chunk2.getMax()]
+ auto otherChunkDocStatus = getChunkDoc(operationContext(), chunkSplitPoint);
+ auto otherChunkDoc = otherChunkDocStatus.getValue();
+ // Check for increment based on the competing chunk version.
+ ASSERT_EQ(competingVersion.majorVersion() + 1u, otherChunkDoc.getVersion().majorVersion());
+ ASSERT_EQ(2u, otherChunkDoc.getVersion().minorVersion());
}
TEST_F(SplitChunkTest, PreConditionFailErrors) {