author    | Matthew Saltz <matthew.saltz@mongodb.com> | 2019-07-03 10:41:04 -0400
committer | Matthew Saltz <matthew.saltz@mongodb.com> | 2019-08-13 16:14:40 -0400
commit    | 6c6f2be0cba78f4e75400d6cd1c0acbd793a7edb (patch)
tree      | 90c351e9ec111b7aab43debae70839f6a4d8082f /src/mongo/db/s
parent    | 45df410133eeb9c4fb1e9f0920a94e861f8c4585 (diff)
download  | mongo-6c6f2be0cba78f4e75400d6cd1c0acbd793a7edb.tar.gz
SERVER-41480 Increment major version on splits where the shard version equals the collection version
(cherry picked from commit 790609ffb6bc1dac0a3c13d1898d5884e21a1b7a)
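
In brief, the rule this change introduces: when a chunk is split, the config server now bumps the collection's major version if and only if the shard that owns the chunk being split already holds the highest chunk version in the collection (its shard version equals the collection version); otherwise the split only adds minor versions on top of the current collection version, as before. Below is a minimal, self-contained C++ sketch of that rule, not MongoDB code: ChunkVersion here is a hypothetical stand-in (major/minor only, epoch omitted) and versionsAfterSplit is an illustrative helper that is not part of the patch.

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

// Hypothetical stand-in for mongo::ChunkVersion (major/minor only; the epoch is omitted).
struct ChunkVersion {
    uint32_t major;
    uint32_t minor;
    bool operator==(const ChunkVersion& other) const {
        return major == other.major && minor == other.minor;
    }
};

// Illustrative helper (not part of the patch): returns the versions assigned to the chunks
// produced by one split, following the rule commitChunkSplit applies in the diff below --
// start from the collection version, bump the major version once if the splitting shard's
// version equals the collection version (which also resets the minor version), then give
// each new chunk the next minor version.
std::vector<ChunkVersion> versionsAfterSplit(ChunkVersion collVersion,
                                             ChunkVersion shardVersion,
                                             std::size_t numNewChunks) {
    ChunkVersion currentMaxVersion = collVersion;
    if (shardVersion == collVersion) {
        currentMaxVersion.major += 1;  // ChunkVersion::incMajor() also resets the minor version.
        currentMaxVersion.minor = 0;
    }
    std::vector<ChunkVersion> result;
    for (std::size_t i = 0; i < numNewChunks; ++i) {
        currentMaxVersion.minor += 1;  // Each chunk written by the split gets the next minor version.
        result.push_back(currentMaxVersion);
    }
    return result;
}

int main() {
    // Splitting a chunk on the shard that already owns the collection version (2|1)
    // bumps the major version: the two new chunks get 3|1 and 3|2.
    for (const auto& v : versionsAfterSplit({2, 1}, {2, 1}, 2))
        std::cout << v.major << "|" << v.minor << "\n";

    // Splitting a chunk on a shard that is behind the collection version (shard version 1|2)
    // leaves the major version alone: the two new chunks get 2|2 and 2|3.
    for (const auto& v : versionsAfterSplit({2, 1}, {1, 2}, 2))
        std::cout << v.major << "|" << v.minor << "\n";
}

Run with the versions used in the new tests (collection version 2|1), this reproduces their expectations: splitting on the shard holding 2|1 yields 3|1 and 3|2, while splitting on a shard holding only 1|2 yields 2|2 and 2|3.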
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r-- | src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp | 127
-rw-r--r-- | src/mongo/db/s/config/sharding_catalog_manager_split_chunk_test.cpp | 151
2 files changed, 213 insertions, 65 deletions
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
index 8b2c4b333d2..bd09a5cf5bd 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
@@ -271,6 +271,24 @@ boost::optional<ChunkType> getControlChunkForMigrate(OperationContext* opCtx,
     return uassertStatusOK(ChunkType::fromConfigBSON(response.docs.front()));
 }

+// Helper function to find collection version and shard version.
+StatusWith<ChunkVersion> getMaxChunkVersionFromQueryResponse(
+    const NamespaceString& nss, const StatusWith<Shard::QueryResponse>& queryResponse) {
+
+    if (!queryResponse.isOK()) {
+        return queryResponse.getStatus();
+    }
+
+    const auto& chunksVector = queryResponse.getValue().docs;
+    if (chunksVector.empty()) {
+        return {ErrorCodes::IllegalOperation,
+                str::stream() << "Collection '" << nss.ns()
+                              << "' no longer either exists, is sharded, or has chunks"};
+    }
+
+    return ChunkVersion::parseLegacyWithField(chunksVector.front(), ChunkType::lastmod());
+}
+
 }  // namespace

 Status ShardingCatalogManager::commitChunkSplit(OperationContext* opCtx,
@@ -285,42 +303,58 @@ Status ShardingCatalogManager::commitChunkSplit(OperationContext* opCtx,
     // move chunks on different collections to proceed in parallel
     Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock);

-    std::string errmsg;
-
     // Get the max chunk version for this namespace.
-    auto findStatus = Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
-        opCtx,
-        ReadPreferenceSetting{ReadPreference::PrimaryOnly},
-        repl::ReadConcernLevel::kLocalReadConcern,
-        ChunkType::ConfigNS,
-        BSON("ns" << nss.ns()),
-        BSON(ChunkType::lastmod << -1),
-        1);
-
-    if (!findStatus.isOK()) {
-        return findStatus.getStatus();
-    }
+    auto swCollVersion = getMaxChunkVersionFromQueryResponse(
+        nss,
+        Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+            opCtx,
+            ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+            repl::ReadConcernLevel::kLocalReadConcern,
+            ChunkType::ConfigNS,
+            BSON("ns" << nss.ns()),          // Query all chunks for this namespace.
+            BSON(ChunkType::lastmod << -1),  // Sort by version.
+            1));                             // Limit 1.

-    const auto& chunksVector = findStatus.getValue().docs;
-    if (chunksVector.empty()) {
-        errmsg = str::stream() << "splitChunk cannot split chunk " << range.toString()
-                               << ". Collection '" << nss.ns()
-                               << "' no longer either exists, is sharded, or has chunks";
-        return {ErrorCodes::IllegalOperation, errmsg};
+    if (!swCollVersion.isOK()) {
+        return swCollVersion.getStatus().withContext(
+            str::stream() << "splitChunk cannot split chunk " << range.toString() << ".");
     }

-    ChunkVersion collVersion = uassertStatusOK(
-        ChunkVersion::parseLegacyWithField(chunksVector.front(), ChunkType::lastmod()));
+    auto collVersion = swCollVersion.getValue();

     // Return an error if collection epoch does not match epoch of request.
     if (collVersion.epoch() != requestEpoch) {
-        errmsg = str::stream() << "splitChunk cannot split chunk " << range.toString()
-                               << ". Collection '" << nss.ns() << "' was dropped and re-created."
-                               << " Current epoch: " << collVersion.epoch()
-                               << ", cmd epoch: " << requestEpoch;
-        return {ErrorCodes::StaleEpoch, errmsg};
+        return {ErrorCodes::StaleEpoch,
+                str::stream() << "splitChunk cannot split chunk " << range.toString()
+                              << ". Collection '"
+                              << nss.ns()
+                              << "' was dropped and re-created."
+                              << " Current epoch: "
+                              << collVersion.epoch()
+                              << ", cmd epoch: "
+                              << requestEpoch};
     }

+    // Get the shard version (max chunk version) for the shard requesting the split.
+    auto swShardVersion = getMaxChunkVersionFromQueryResponse(
+        nss,
+        Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+            opCtx,
+            ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+            repl::ReadConcernLevel::kLocalReadConcern,
+            ChunkType::ConfigNS,
+            BSON("ns" << nss.ns() << "shard"
+                      << shardName),  // Query all chunks for this namespace and shard.
+            BSON(ChunkType::lastmod << -1),  // Sort by version.
+            1));                             // Limit 1.
+
+    if (!swShardVersion.isOK()) {
+        return swShardVersion.getStatus().withContext(
+            str::stream() << "splitChunk cannot split chunk " << range.toString() << ".");
+    }
+
+    auto shardVersion = swShardVersion.getValue();
+
     // Find the chunk history.
     const auto origChunk = _findChunkOnConfig(opCtx, nss, range.getMin());
     if (!origChunk.isOK()) {
@@ -330,6 +364,11 @@ Status ShardingCatalogManager::commitChunkSplit(OperationContext* opCtx,
     std::vector<ChunkType> newChunks;

     ChunkVersion currentMaxVersion = collVersion;
+    // Increment the major version only if the shard that owns the chunk being split has version ==
+    // collection version. See SERVER-41480 for details.
+    if (shardVersion == collVersion) {
+        currentMaxVersion.incMajor();
+    }

     auto startKey = range.getMin();
     auto newChunkBounds(splitPoints);
@@ -513,27 +552,25 @@ Status ShardingCatalogManager::commitChunkMerge(OperationContext* opCtx,
         !validAfter) {
         return {ErrorCodes::IllegalOperation, "chunk operation requires validAfter timestamp"};
     }

-    // Get the chunk with the highest version for this namespace
-    auto findStatus = Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
-        opCtx,
-        ReadPreferenceSetting{ReadPreference::PrimaryOnly},
-        repl::ReadConcernLevel::kLocalReadConcern,
-        ChunkType::ConfigNS,
-        BSON("ns" << nss.ns()),
-        BSON(ChunkType::lastmod << -1),
-        1);
-    if (!findStatus.isOK()) {
-        return findStatus.getStatus();
-    }
+    // Get the max chunk version for this namespace.
+    auto swCollVersion = getMaxChunkVersionFromQueryResponse(
+        nss,
+        Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+            opCtx,
+            ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+            repl::ReadConcernLevel::kLocalReadConcern,
+            ChunkType::ConfigNS,
+            BSON("ns" << nss.ns()),          // Query all chunks for this namespace.
+            BSON(ChunkType::lastmod << -1),  // Sort by version.
+            1));                             // Limit 1.

-    const auto& chunksVector = findStatus.getValue().docs;
-    if (chunksVector.empty())
-        return {ErrorCodes::IllegalOperation,
-                "collection does not exist, isn't sharded, or has no chunks"};
+    if (!swCollVersion.isOK()) {
+        return swCollVersion.getStatus().withContext(str::stream()
+                                                     << "mergeChunk cannot merge chunks.");
+    }

-    ChunkVersion collVersion = uassertStatusOK(
-        ChunkVersion::parseLegacyWithField(chunksVector.front(), ChunkType::lastmod()));
+    auto collVersion = swCollVersion.getValue();

     // Return an error if epoch of chunk does not match epoch of request
     if (collVersion.epoch() != requestEpoch) {
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_split_chunk_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_split_chunk_test.cpp
index 1baa43be2d4..0140245652c 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_split_chunk_test.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_split_chunk_test.cpp
@@ -78,9 +78,9 @@ TEST_F(SplitChunkTest, SplitExistingChunkCorrectlyShouldSucceed) {
     auto chunkDoc = chunkDocStatus.getValue();
     ASSERT_BSONOBJ_EQ(chunkSplitPoint, chunkDoc.getMax());

-    // Check for increment on first chunkDoc's minor version
-    ASSERT_EQ(origVersion.majorVersion(), chunkDoc.getVersion().majorVersion());
-    ASSERT_EQ(origVersion.minorVersion() + 1, chunkDoc.getVersion().minorVersion());
+    // Check for increment on first chunkDoc's major version.
+    ASSERT_EQ(origVersion.majorVersion() + 1, chunkDoc.getVersion().majorVersion());
+    ASSERT_EQ(1u, chunkDoc.getVersion().minorVersion());

     // Make sure the history is there
     ASSERT_EQ(2UL, chunkDoc.getHistory().size());
@@ -92,9 +92,9 @@ TEST_F(SplitChunkTest, SplitExistingChunkCorrectlyShouldSucceed) {
     auto otherChunkDoc = otherChunkDocStatus.getValue();
     ASSERT_BSONOBJ_EQ(chunkMax, otherChunkDoc.getMax());

-    // Check for increment on second chunkDoc's minor version
-    ASSERT_EQ(origVersion.majorVersion(), otherChunkDoc.getVersion().majorVersion());
-    ASSERT_EQ(origVersion.minorVersion() + 2, otherChunkDoc.getVersion().minorVersion());
+    // Check for increment on second chunkDoc's minor version.
+    ASSERT_EQ(origVersion.majorVersion() + 1, otherChunkDoc.getVersion().majorVersion());
+    ASSERT_EQ(2u, otherChunkDoc.getVersion().minorVersion());

     // Make sure the history is there
     ASSERT_EQ(2UL, otherChunkDoc.getHistory().size());
@@ -139,9 +139,9 @@ TEST_F(SplitChunkTest, MultipleSplitsOnExistingChunkShouldSucceed) {
     auto chunkDoc = chunkDocStatus.getValue();
     ASSERT_BSONOBJ_EQ(chunkSplitPoint, chunkDoc.getMax());

-    // Check for increment on first chunkDoc's minor version
-    ASSERT_EQ(origVersion.majorVersion(), chunkDoc.getVersion().majorVersion());
-    ASSERT_EQ(origVersion.minorVersion() + 1, chunkDoc.getVersion().minorVersion());
+    // Check for increment on first chunkDoc's major version.
+    ASSERT_EQ(origVersion.majorVersion() + 1, chunkDoc.getVersion().majorVersion());
+    ASSERT_EQ(1u, chunkDoc.getVersion().minorVersion());

     // Make sure the history is there
     ASSERT_EQ(2UL, chunkDoc.getHistory().size());
@@ -153,9 +153,9 @@ TEST_F(SplitChunkTest, MultipleSplitsOnExistingChunkShouldSucceed) {
     auto midChunkDoc = midChunkDocStatus.getValue();
     ASSERT_BSONOBJ_EQ(chunkSplitPoint2, midChunkDoc.getMax());

-    // Check for increment on second chunkDoc's minor version
-    ASSERT_EQ(origVersion.majorVersion(), midChunkDoc.getVersion().majorVersion());
-    ASSERT_EQ(origVersion.minorVersion() + 2, midChunkDoc.getVersion().minorVersion());
+    // Check for increment on second chunkDoc's minor version.
+    ASSERT_EQ(origVersion.majorVersion() + 1, midChunkDoc.getVersion().majorVersion());
+    ASSERT_EQ(2u, midChunkDoc.getVersion().minorVersion());

     // Make sure the history is there
     ASSERT_EQ(2UL, midChunkDoc.getHistory().size());
@@ -167,9 +167,9 @@ TEST_F(SplitChunkTest, MultipleSplitsOnExistingChunkShouldSucceed) {
     auto lastChunkDoc = lastChunkDocStatus.getValue();
     ASSERT_BSONOBJ_EQ(chunkMax, lastChunkDoc.getMax());

-    // Check for increment on third chunkDoc's minor version
-    ASSERT_EQ(origVersion.majorVersion(), lastChunkDoc.getVersion().majorVersion());
-    ASSERT_EQ(origVersion.minorVersion() + 3, lastChunkDoc.getVersion().minorVersion());
+    // Check for increment on third chunkDoc's minor version.
+    ASSERT_EQ(origVersion.majorVersion() + 1, lastChunkDoc.getVersion().majorVersion());
+    ASSERT_EQ(3u, lastChunkDoc.getVersion().minorVersion());

     // Make sure the history is there
     ASSERT_EQ(2UL, lastChunkDoc.getHistory().size());
@@ -223,9 +223,12 @@ TEST_F(SplitChunkTest, NewSplitShouldClaimHighestVersion) {
     auto chunkDoc = chunkDocStatus.getValue();
     ASSERT_BSONOBJ_EQ(chunkSplitPoint, chunkDoc.getMax());

-    // Check for increment based on the competing chunk version
-    ASSERT_EQ(competingVersion.majorVersion(), chunkDoc.getVersion().majorVersion());
-    ASSERT_EQ(competingVersion.minorVersion() + 1, chunkDoc.getVersion().minorVersion());
+    // Check for major version increment based on the competing chunk version.
+    ASSERT_EQ(competingVersion.majorVersion() + 1, chunkDoc.getVersion().majorVersion());
+    // The minor version gets reset to 0 when the major version is incremented, and chunk splits
+    // increment the minor version after incrementing the major version, so we expect the minor
+    // version here to be 0 + 1 = 1.
+    ASSERT_EQ(1u, chunkDoc.getVersion().minorVersion());

     // Second chunkDoc should have range [chunkSplitPoint, chunkMax]
     auto otherChunkDocStatus = getChunkDoc(operationContext(), chunkSplitPoint);
@@ -234,9 +237,117 @@ TEST_F(SplitChunkTest, NewSplitShouldClaimHighestVersion) {
     auto otherChunkDoc = otherChunkDocStatus.getValue();
     ASSERT_BSONOBJ_EQ(chunkMax, otherChunkDoc.getMax());

-    // Check for increment based on the competing chunk version
+    // Check for increment based on the competing chunk version.
+    ASSERT_EQ(competingVersion.majorVersion() + 1, otherChunkDoc.getVersion().majorVersion());
+    // The minor version gets reset to 0 when the major version is incremented, and chunk splits
+    // increment the minor version after incrementing the major version for the first chunk in the
+    // split vector, so we expect the minor version here to be 0 + 1 + 1 = 2.
+    ASSERT_EQ(2u, otherChunkDoc.getVersion().minorVersion());
+}
+
+TEST_F(SplitChunkTest, SplitsOnShardWithLowerShardVersionDoesNotIncreaseCollectionVersion) {
+    ChunkType chunk, chunk2;
+    chunk.setNS(kNamespace);
+    chunk2.setNS(kNamespace);
+    auto collEpoch = OID::gen();
+
+    // Set up first chunk with lower version on shard0001. Its shard will not have shard version ==
+    // collection version, so splits to it should give it the collection version plus a minor
+    // version bump.
+    auto origVersion = ChunkVersion(1, 2, collEpoch);
+    chunk.setVersion(origVersion);
+    chunk.setShard(ShardId("shard0000"));
+    chunk.setMin(BSON("a" << 1));
+    chunk.setMax(BSON("a" << 10));
+
+    // Set up second chunk (chunk2) on shard0001. This has the higher version.
+    auto competingVersion = ChunkVersion(2, 1, collEpoch);
+    chunk2.setVersion(competingVersion);
+    chunk2.setShard(ShardId("shard0001"));
+    chunk2.setMin(BSON("a" << 10));
+    chunk2.setMax(BSON("a" << 20));
+
+    setupChunks({chunk, chunk2});
+
+    std::vector<BSONObj> splitPoints;
+    auto chunkSplitPoint = BSON("a" << 5);
+    splitPoints.push_back(chunkSplitPoint);
+
+    ASSERT_OK(ShardingCatalogManager::get(operationContext())
+                  ->commitChunkSplit(operationContext(),
+                                     kNamespace,
+                                     collEpoch,
+                                     ChunkRange(chunk.getMin(), chunk.getMax()),
+                                     splitPoints,
+                                     chunk.getShard().toString()));
+
+    // First chunkDoc should have range [chunk.getMin(), chunkSplitPoint]
+    auto chunkDocStatus = getChunkDoc(operationContext(), chunk.getMin());
+    auto chunkDoc = chunkDocStatus.getValue();
+
+    // Check for major version increment based on the competing chunk version.
+    ASSERT_EQ(competingVersion.majorVersion(), chunkDoc.getVersion().majorVersion());
+    ASSERT_EQ(competingVersion.minorVersion() + 1u, chunkDoc.getVersion().minorVersion());
+
+    // Second chunkDoc should have range [chunkSplitPoint, chunkMax]
+    auto otherChunkDocStatus = getChunkDoc(operationContext(), chunkSplitPoint);
+    auto otherChunkDoc = otherChunkDocStatus.getValue();
+    // Check for increment based on the competing chunk version.
     ASSERT_EQ(competingVersion.majorVersion(), otherChunkDoc.getVersion().majorVersion());
-    ASSERT_EQ(competingVersion.minorVersion() + 2, otherChunkDoc.getVersion().minorVersion());
+    ASSERT_EQ(competingVersion.minorVersion() + 2u, otherChunkDoc.getVersion().minorVersion());
+}
+
+TEST_F(SplitChunkTest, SplitsOnShardWithHighestShardVersionIncreasesCollectionVersion) {
+    ChunkType chunk, chunk2;
+    chunk.setNS(kNamespace);
+    chunk2.setNS(kNamespace);
+    auto collEpoch = OID::gen();
+
+    // Set up first chunk with lower version on shard0001. Its shard will not have shard version ==
+    // collection version.
+    auto origVersion = ChunkVersion(1, 2, collEpoch);
+    chunk.setVersion(origVersion);
+    chunk.setShard(ShardId("shard0000"));
+    chunk.setMin(BSON("a" << 1));
+    chunk.setMax(BSON("a" << 10));
+
+    // Set up second chunk (chunk2) on shard0001. This has the higher version, so its shard version
+    // == collection version. When we split it, its major version should increase.
+    auto competingVersion = ChunkVersion(2, 1, collEpoch);
+    chunk2.setVersion(competingVersion);
+    chunk2.setShard(ShardId("shard0001"));
+    chunk2.setMin(BSON("a" << 10));
+    chunk2.setMax(BSON("a" << 20));
+
+    setupChunks({chunk, chunk2});
+
+    std::vector<BSONObj> splitPoints;
+    // This will split the second chunk.
+    auto chunkSplitPoint = BSON("a" << 15);
+    splitPoints.push_back(chunkSplitPoint);
+
+    ASSERT_OK(ShardingCatalogManager::get(operationContext())
+                  ->commitChunkSplit(operationContext(),
+                                     kNamespace,
+                                     collEpoch,
+                                     ChunkRange(chunk2.getMin(), chunk2.getMax()),
+                                     splitPoints,
+                                     chunk2.getShard().toString()));
+
+    // First chunkDoc should have range [chunk2.getMin(), chunkSplitPoint]
+    auto chunkDocStatus = getChunkDoc(operationContext(), chunk2.getMin());
+    auto chunkDoc = chunkDocStatus.getValue();
+
+    // Check for major version increment based on the competing chunk version.
+    ASSERT_EQ(competingVersion.majorVersion() + 1u, chunkDoc.getVersion().majorVersion());
+    ASSERT_EQ(1u, chunkDoc.getVersion().minorVersion());
+
+    // Second chunkDoc should have range [chunkSplitPoint, chunk2.getMax()]
+    auto otherChunkDocStatus = getChunkDoc(operationContext(), chunkSplitPoint);
+    auto otherChunkDoc = otherChunkDocStatus.getValue();
+    // Check for increment based on the competing chunk version.
+    ASSERT_EQ(competingVersion.majorVersion() + 1u, otherChunkDoc.getVersion().majorVersion());
+    ASSERT_EQ(2u, otherChunkDoc.getVersion().minorVersion());
 }

 TEST_F(SplitChunkTest, PreConditionFailErrors) {