author     Martin Neupauer <martin.neupauer@10gen.com>      2018-03-06 21:27:26 -0500
committer  Martin Neupauer <martin.neupauer@mongodb.com>    2018-03-27 13:18:55 -0400
commit     e6503239af00e7a7a5848ed7784142ac904e4218 (patch)
tree       347220712d1e74f9d2ed9f41c6bc87e95eb9232f
parent     15a1f6446d89fe30c63f1dc23ecc4c477b905168 (diff)
download   mongo-e6503239af00e7a7a5848ed7784142ac904e4218.tar.gz
SERVER-33453 - Add timestamp support to the ChunkManager
 src/mongo/base/error_codes.err                                                  |   3
 src/mongo/db/s/collection_metadata_test.cpp                                     |  50
 src/mongo/db/s/collection_range_deleter_test.cpp                                |   3
 src/mongo/db/s/collection_sharding_state_test.cpp                               |   4
 src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp             |   4
 src/mongo/db/s/config/sharding_catalog_manager_commit_chunk_migration_test.cpp  |   8
 src/mongo/db/s/config/sharding_catalog_manager_split_chunk_test.cpp             |   4
 src/mongo/db/s/metadata_manager_test.cpp                                        |   9
 src/mongo/s/catalog/type_chunk.h                                                |   3
 src/mongo/s/catalog_cache.cpp                                                   |  50
 src/mongo/s/catalog_cache.h                                                     |   9
 src/mongo/s/chunk.cpp                                                           |  28
 src/mongo/s/chunk.h                                                             |   9
 src/mongo/s/chunk_manager.cpp                                                   | 155
 src/mongo/s/chunk_manager.h                                                     | 305
 src/mongo/s/commands/cluster_map_reduce_cmd.cpp                                 |   1
 src/mongo/s/write_ops/cluster_write.cpp                                         |   4
17 files changed, 421 insertions(+), 228 deletions(-)
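At its core, this commit splits the old ChunkManager in two: an immutable RoutingTableHistory that owns the chunk map and the per-chunk placement history, and a thin ChunkManager that pairs a shared RoutingTableHistory with an optional cluster time at which placement questions are answered. The following is a minimal sketch of that shape using simplified stand-in types, not the real classes from src/mongo/s/chunk_manager.h:

    #include <boost/optional.hpp>
    #include <memory>

    namespace sketch {

    struct Timestamp {
        unsigned long long secs = 0, inc = 0;
    };

    // Immutable snapshot of the routing table; shared between all views of it.
    class RoutingTableHistory { /* chunk map, shard versions, chunk history ... */ };

    // Cheap, per-operation view: the routing table plus an optional cluster
    // time. boost::none means "answer with the latest placement".
    class ChunkManager {
    public:
        ChunkManager(std::shared_ptr<RoutingTableHistory> rt,
                     boost::optional<Timestamp> clusterTime)
            : _rt(std::move(rt)), _clusterTime(std::move(clusterTime)) {}

    private:
        std::shared_ptr<RoutingTableHistory> _rt;
        boost::optional<Timestamp> _clusterTime;
    };

    }  // namespace sketch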
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err
index e43a7ccb131..0eb14979d0c 100644
--- a/src/mongo/base/error_codes.err
+++ b/src/mongo/base/error_codes.err
@@ -247,6 +247,7 @@ error_code("SnapshotUnavailable", 246)
 error_code("ProducerConsumerQueueBatchTooLarge", 247)
 error_code("ProducerConsumerQueueEndClosed", 248)
 error_code("StaleDbVersion", 249, extra="StaleDbRoutingVersion");
+error_code("StaleChunkHistory", 250);
 
 # Error codes 4000-8999 are reserved.
 
@@ -304,4 +305,4 @@ error_class("ConnectionFatalMessageParseError", ["IllegalOpMsgFlag",
 error_class("ExceededTimeLimitError", ["ExceededTimeLimit",
                                        "NetworkInterfaceExceededTimeLimit"])
 
-error_class("SnapshotError", ["SnapshotTooOld", "SnapshotUnavailable"])
+error_class("SnapshotError", ["SnapshotTooOld", "SnapshotUnavailable", "StaleChunkHistory"])
diff --git a/src/mongo/db/s/collection_metadata_test.cpp b/src/mongo/db/s/collection_metadata_test.cpp
index 120c0b3ee58..862188f5509 100644
--- a/src/mongo/db/s/collection_metadata_test.cpp
+++ b/src/mongo/db/s/collection_metadata_test.cpp
@@ -43,13 +43,17 @@ using unittest::assertGet;
 
 std::unique_ptr<CollectionMetadata> makeCollectionMetadataImpl(
     const KeyPattern& shardKeyPattern,
-    const std::vector<std::pair<BSONObj, BSONObj>>& thisShardsChunks) {
+    const std::vector<std::pair<BSONObj, BSONObj>>& thisShardsChunks,
+    bool staleChunkManager) {
     const OID epoch = OID::gen();
 
     const NamespaceString kNss("test.foo");
     const ShardId kThisShard("thisShard");
     const ShardId kOtherShard("otherShard");
 
+    const Timestamp kRouting(100, 0);
+    const Timestamp kChunkManager(staleChunkManager ? 99 : 100, 0);
+
     std::vector<ChunkType> allChunks;
     auto nextMinKey = shardKeyPattern.globalMin();
     ChunkVersion version{1, 0, epoch};
@@ -58,20 +62,25 @@ std::unique_ptr<CollectionMetadata> makeCollectionMetadataImpl(
             // Need to add a chunk to the other shard from nextMinKey to myNextChunk.first.
             allChunks.emplace_back(
                 kNss, ChunkRange{nextMinKey, myNextChunk.first}, version, kOtherShard);
+            allChunks.back().setHistory({ChunkHistory(kRouting, kOtherShard)});
             version.incMajor();
         }
 
         allChunks.emplace_back(
             kNss, ChunkRange{myNextChunk.first, myNextChunk.second}, version, kThisShard);
+        allChunks.back().setHistory({ChunkHistory(kRouting, kThisShard)});
         version.incMajor();
 
         nextMinKey = myNextChunk.second;
     }
 
     if (SimpleBSONObjComparator::kInstance.evaluate(nextMinKey < shardKeyPattern.globalMax())) {
         allChunks.emplace_back(
             kNss, ChunkRange{nextMinKey, shardKeyPattern.globalMax()}, version, kOtherShard);
+        allChunks.back().setHistory({ChunkHistory(kRouting, kOtherShard)});
     }
 
     UUID uuid(UUID::gen());
-    auto cm = ChunkManager::makeNew(kNss, uuid, shardKeyPattern, nullptr, false, epoch, allChunks);
+    auto rt =
+        RoutingTableHistory::makeNew(kNss, uuid, shardKeyPattern, nullptr, false, epoch, allChunks);
+    std::shared_ptr<ChunkManager> cm = std::make_shared<ChunkManager>(rt, kChunkManager);
 
     return stdx::make_unique<CollectionMetadata>(cm, kThisShard);
 }
@@ -83,7 +92,7 @@ struct ConstructedRangeMap : public RangeMap {
 class NoChunkFixture : public unittest::Test {
 protected:
     std::unique_ptr<CollectionMetadata> makeCollectionMetadata() const {
-        return makeCollectionMetadataImpl(KeyPattern(BSON("a" << 1)), {});
+        return makeCollectionMetadataImpl(KeyPattern(BSON("a" << 1)), {}, false);
     }
 };
 
@@ -168,8 +177,8 @@ TEST_F(NoChunkFixture, OrphanedDataRangeEnd) {
 class SingleChunkFixture : public unittest::Test {
 protected:
     std::unique_ptr<CollectionMetadata> makeCollectionMetadata() const {
-        return makeCollectionMetadataImpl(KeyPattern(BSON("a" << 1)),
-                                          {std::make_pair(BSON("a" << 10), BSON("a" << 20))});
+        return makeCollectionMetadataImpl(
+            KeyPattern(BSON("a" << 1)), {std::make_pair(BSON("a" << 10), BSON("a" << 20))}, false);
     }
 };
 
@@ -247,7 +256,8 @@ protected:
         const KeyPattern shardKeyPattern(BSON("a" << 1 << "b" << 1));
         return makeCollectionMetadataImpl(
             shardKeyPattern,
-            {std::make_pair(shardKeyPattern.globalMin(), shardKeyPattern.globalMax())});
+            {std::make_pair(shardKeyPattern.globalMin(), shardKeyPattern.globalMax())},
+            false);
     }
 };
 
@@ -271,7 +281,8 @@ protected:
         return makeCollectionMetadataImpl(
             KeyPattern(BSON("a" << 1 << "b" << 1)),
             {std::make_pair(BSON("a" << 10 << "b" << 0), BSON("a" << 20 << "b" << 0)),
-             std::make_pair(BSON("a" << 30 << "b" << 0), BSON("a" << 40 << "b" << 0))});
+             std::make_pair(BSON("a" << 30 << "b" << 0), BSON("a" << 40 << "b" << 0))},
+            false);
     }
 };
 
@@ -308,7 +319,8 @@ protected:
         return makeCollectionMetadataImpl(KeyPattern(BSON("a" << 1)),
                                           {std::make_pair(BSON("a" << MINKEY), BSON("a" << 10)),
                                            std::make_pair(BSON("a" << 10), BSON("a" << 20)),
-                                           std::make_pair(BSON("a" << 30), BSON("a" << MAXKEY))});
+                                           std::make_pair(BSON("a" << 30), BSON("a" << MAXKEY))},
+                                          false);
     }
 };
 
@@ -390,5 +402,27 @@ TEST_F(ThreeChunkWithRangeGapFixture, GetDifferentChunkFromLast) {
     ASSERT_EQUALS(0, differentChunk.getMax().woCompare(BSON("a" << 10)));
 }
 
+/**
+ * Fixture with single chunk containing:
+ * [10->20)
+ */
+class StaleChunkFixture : public unittest::Test {
+protected:
+    std::unique_ptr<CollectionMetadata> makeCollectionMetadata() const {
+        return makeCollectionMetadataImpl(
+            KeyPattern(BSON("a" << 1)), {std::make_pair(BSON("a" << 10), BSON("a" << 20))}, true);
+    }
+};
+
+TEST_F(StaleChunkFixture, KeyBelongsToMe) {
+    ASSERT_THROWS_CODE(makeCollectionMetadata()->keyBelongsToMe(BSON("a" << 10)),
+                       AssertionException,
+                       ErrorCodes::StaleChunkHistory);
+
+    ASSERT_THROWS_CODE(makeCollectionMetadata()->keyBelongsToMe(BSON("a" << 0)),
+                       AssertionException,
+                       ErrorCodes::StaleChunkHistory);
+}
+
 }  // namespace
 }  // namespace mongo
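The staleChunkManager flag above works by pinning the ChunkManager at cluster time (99, 0) while every chunk's only history entry becomes valid at (100, 0); no history entry satisfies validAfter <= ts, so any ownership lookup throws StaleChunkHistory. A self-contained sketch of that lookup predicate, with illustrative stand-in types rather than the real ShardId and Timestamp:

    #include <stdexcept>
    #include <vector>

    struct Timestamp {
        unsigned long long secs, inc;
        bool operator<=(const Timestamp& o) const {
            return secs < o.secs || (secs == o.secs && inc <= o.inc);
        }
    };

    struct HistoryEntry {
        Timestamp validAfter;  // placement is valid from this time onward
        int shardId;           // stand-in for ShardId
    };

    // History is ordered newest-first, as ChunkType::setHistory expects.
    int shardAt(const std::vector<HistoryEntry>& history, Timestamp at) {
        for (const auto& h : history) {
            if (h.validAfter <= at)
                return h.shardId;
        }
        // Mirrors the StaleChunkHistory uassert in the real code.
        throw std::runtime_error("StaleChunkHistory");
    }

    // With history = {{Timestamp{100, 0}, /*shard*/ 1}}, calling
    // shardAt(history, Timestamp{99, 0}) throws -- exactly the situation
    // the StaleChunkFixture sets up.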
diff --git a/src/mongo/db/s/collection_range_deleter_test.cpp b/src/mongo/db/s/collection_range_deleter_test.cpp
index beddabc66de..393d20c667b 100644
--- a/src/mongo/db/s/collection_range_deleter_test.cpp
+++ b/src/mongo/db/s/collection_range_deleter_test.cpp
@@ -70,7 +70,7 @@ protected:
         client.createCollection(kNss.ns());
 
         const KeyPattern keyPattern(kShardKeyPattern);
-        auto cm = ChunkManager::makeNew(
+        auto rt = RoutingTableHistory::makeNew(
             kNss,
             UUID::gen(),
             keyPattern,
@@ -81,6 +81,7 @@ protected:
                        ChunkRange{keyPattern.globalMin(), keyPattern.globalMax()},
                        ChunkVersion(1, 0, epoch()),
                        ShardId("otherShard"))});
+        std::shared_ptr<ChunkManager> cm = std::make_shared<ChunkManager>(rt, Timestamp(100, 0));
 
         AutoGetCollection autoColl(operationContext(), kNss, MODE_IX);
         auto const css = CollectionShardingState::get(operationContext(), kNss);
diff --git a/src/mongo/db/s/collection_sharding_state_test.cpp b/src/mongo/db/s/collection_sharding_state_test.cpp
index d50d0b10d86..63db860c50d 100644
--- a/src/mongo/db/s/collection_sharding_state_test.cpp
+++ b/src/mongo/db/s/collection_sharding_state_test.cpp
@@ -132,8 +132,10 @@ std::unique_ptr<CollectionMetadata> makeAMetadata(BSONObj const& keyPattern) {
     const OID epoch = OID::gen();
     auto range = ChunkRange(BSON("key" << MINKEY), BSON("key" << MAXKEY));
     auto chunk = ChunkType(kTestNss, std::move(range), ChunkVersion(1, 0, epoch), ShardId("other"));
-    auto cm = ChunkManager::makeNew(
+    auto rt = RoutingTableHistory::makeNew(
         kTestNss, UUID::gen(), KeyPattern(keyPattern), nullptr, false, epoch, {std::move(chunk)});
+    std::shared_ptr<ChunkManager> cm = std::make_shared<ChunkManager>(rt, Timestamp(100, 0));
+
     return stdx::make_unique<CollectionMetadata>(std::move(cm), ShardId("this"));
 }
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
index c4fc7ca563e..50d25843d50 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
@@ -691,6 +691,7 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMigration(
     // Generate the new versions of migratedChunk and controlChunk. Migrating chunk's minor version
     // will be 0.
     ChunkType newMigratedChunk = migratedChunk;
+    newMigratedChunk.setShard(toShard);
     newMigratedChunk.setVersion(ChunkVersion(
         currentCollectionVersion.majorVersion() + 1, 0, currentCollectionVersion.epoch()));
 
@@ -718,6 +719,9 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMigration(
                     << validAfter.get().toString()};
         }
         newHistory.emplace(newHistory.begin(), ChunkHistory(validAfter.get(), toShard));
+    } else {
+        // TODO: SERVER-33781 FCV 3.6 should not have any history
+        newHistory.clear();
     }
     newMigratedChunk.setHistory(std::move(newHistory));
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_commit_chunk_migration_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_commit_chunk_migration_test.cpp
index dd1357e8b4a..5a817120c27 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_commit_chunk_migration_test.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_commit_chunk_migration_test.cpp
@@ -64,7 +64,7 @@ TEST_F(CommitChunkMigrate, CheckCorrectOpsCommandWithCtl) {
     chunk0.setNS(kNamespace);
     chunk0.setVersion(origVersion);
     chunk0.setShard(shard0.getName());
-    chunk0.setHistory({ChunkHistory(Timestamp(100, 0), ShardId("shardX"))});
+    chunk0.setHistory({ChunkHistory(Timestamp(100, 0), shard0.getName())});
 
     // apportion
     auto chunkMin = BSON("a" << 1);
@@ -143,7 +143,7 @@ TEST_F(CommitChunkMigrate, CheckCorrectOpsCommandNoCtl) {
     chunk0.setNS(kNamespace);
     chunk0.setVersion(origVersion);
     chunk0.setShard(shard0.getName());
-    chunk0.setHistory({ChunkHistory(Timestamp(100, 0), ShardId("shardX"))});
+    chunk0.setHistory({ChunkHistory(Timestamp(100, 0), shard0.getName())});
 
     // apportion
     auto chunkMin = BSON("a" << 1);
@@ -204,7 +204,7 @@ TEST_F(CommitChunkMigrate, CheckCorrectOpsCommandNoCtlTrimHistory) {
     chunk0.setNS(kNamespace);
     chunk0.setVersion(origVersion);
     chunk0.setShard(shard0.getName());
-    chunk0.setHistory({ChunkHistory(Timestamp(100, 0), ShardId("shardX"))});
+    chunk0.setHistory({ChunkHistory(Timestamp(100, 0), shard0.getName())});
 
     // apportion
     auto chunkMin = BSON("a" << 1);
@@ -266,7 +266,7 @@ TEST_F(CommitChunkMigrate, RejectOutOfOrderHistory) {
     chunk0.setNS(kNamespace);
     chunk0.setVersion(origVersion);
     chunk0.setShard(shard0.getName());
-    chunk0.setHistory({ChunkHistory(Timestamp(100, 0), ShardId("shardX"))});
+    chunk0.setHistory({ChunkHistory(Timestamp(100, 0), shard0.getName())});
 
     // apportion
     auto chunkMin = BSON("a" << 1);
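Two invariants drive the test changes above: a committed migration prepends a new history entry for the recipient shard (so setShard(toShard) keeps the chunk's shard equal to the front of its history), and a commit whose validAfter does not advance past the newest entry is rejected, as RejectOutOfOrderHistory exercises. A hedged sketch of that commit step, with hypothetical stand-in types:

    #include <deque>
    #include <stdexcept>
    #include <string>

    struct ChunkHistoryEntry {
        unsigned long long validAfter;  // stand-in for Timestamp
        std::string shard;
    };

    // Prepend the recipient shard; reject commits whose validAfter does not
    // advance, mirroring the out-of-order check in commitChunkMigration.
    void commitMigration(std::deque<ChunkHistoryEntry>& history,
                         const std::string& toShard,
                         unsigned long long validAfter) {
        if (!history.empty() && validAfter <= history.front().validAfter)
            throw std::runtime_error("rejected: out-of-order chunk history");
        history.push_front({validAfter, toShard});  // newest entry first
    }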
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_split_chunk_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_split_chunk_test.cpp
index 932bc374745..336f1139e4e 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_split_chunk_test.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_split_chunk_test.cpp
@@ -53,7 +53,7 @@ TEST_F(SplitChunkTest, SplitExistingChunkCorrectlyShouldSucceed) {
     auto chunkMax = BSON("a" << 10);
     chunk.setMin(chunkMin);
     chunk.setMax(chunkMax);
-    chunk.setHistory({ChunkHistory(Timestamp(100, 0), ShardId("shardX")),
+    chunk.setHistory({ChunkHistory(Timestamp(100, 0), ShardId("shard0000")),
                       ChunkHistory(Timestamp(90, 0), ShardId("shardY"))});
 
     auto chunkSplitPoint = BSON("a" << 5);
@@ -113,7 +113,7 @@ TEST_F(SplitChunkTest, MultipleSplitsOnExistingChunkShouldSucceed) {
     auto chunkMax = BSON("a" << 10);
     chunk.setMin(chunkMin);
     chunk.setMax(chunkMax);
-    chunk.setHistory({ChunkHistory(Timestamp(100, 0), ShardId("shardX")),
+    chunk.setHistory({ChunkHistory(Timestamp(100, 0), ShardId("shard0000")),
                       ChunkHistory(Timestamp(90, 0), ShardId("shardY"))});
 
     auto chunkSplitPoint = BSON("a" << 5);
diff --git a/src/mongo/db/s/metadata_manager_test.cpp b/src/mongo/db/s/metadata_manager_test.cpp
index 9acd513a5ab..7a260c7f08f 100644
--- a/src/mongo/db/s/metadata_manager_test.cpp
+++ b/src/mongo/db/s/metadata_manager_test.cpp
@@ -71,7 +71,7 @@ protected:
     static std::unique_ptr<CollectionMetadata> makeEmptyMetadata() {
         const OID epoch = OID::gen();
 
-        auto cm = ChunkManager::makeNew(
+        auto rt = RoutingTableHistory::makeNew(
             kNss,
             UUID::gen(),
             kShardKeyPattern,
@@ -82,6 +82,9 @@ protected:
                        ChunkRange{BSON(kPattern << MINKEY), BSON(kPattern << MAXKEY)},
                        ChunkVersion(1, 0, epoch),
                        kOtherShard}});
+
+        std::shared_ptr<ChunkManager> cm = std::make_shared<ChunkManager>(rt, Timestamp(100, 0));
+
         return stdx::make_unique<CollectionMetadata>(cm, kThisShard);
     }
 
@@ -110,10 +113,12 @@ protected:
         v2.incMajor();
         auto v3 = v2;
         v3.incMajor();
-        cm = cm->makeUpdated(
+        auto rt = cm->getRoutingHistory().makeUpdated(
            {ChunkType{kNss, ChunkRange{chunkToSplit->getMin(), minKey}, v1, kOtherShard},
             ChunkType{kNss, ChunkRange{minKey, maxKey}, v2, kThisShard},
             ChunkType{kNss, ChunkRange{maxKey, chunkToSplit->getMax()}, v3, kOtherShard}});
+        cm = std::make_shared<ChunkManager>(rt, Timestamp(100, 0));
+
         return stdx::make_unique<CollectionMetadata>(cm, kThisShard);
     }
diff --git a/src/mongo/s/catalog/type_chunk.h b/src/mongo/s/catalog/type_chunk.h
index 139e2d1dfa9..2e541690c2c 100644
--- a/src/mongo/s/catalog/type_chunk.h
+++ b/src/mongo/s/catalog/type_chunk.h
@@ -256,6 +256,9 @@ public:
 
     void setHistory(std::vector<ChunkHistory>&& history) {
         _history = std::move(history);
+        if (!_history.empty()) {
+            invariant(_shard == _history.front().getShard());
+        }
     }
     const std::vector<ChunkHistory>& getHistory() const {
         return _history;
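The metadata_manager_test changes show the new update idiom: instead of cm = cm->makeUpdated(...), callers update the underlying routing table and re-wrap it in a ChunkManager at a chosen cluster time. A compilable sketch of the pattern with simplified stand-in types; the Timestamp(100, 0) mirrors the tests' convention:

    #include <memory>
    #include <vector>

    struct ChunkType {};
    struct Timestamp { unsigned long long secs, inc; };

    struct RoutingTableHistory {
        // Returns a fresh immutable routing table incorporating the changes.
        std::shared_ptr<RoutingTableHistory> makeUpdated(const std::vector<ChunkType>&) {
            return std::make_shared<RoutingTableHistory>();
        }
    };

    struct ChunkManager {
        ChunkManager(std::shared_ptr<RoutingTableHistory> rt, Timestamp t) : _rt(rt), _t(t) {}
        RoutingTableHistory& getRoutingHistory() { return *_rt; }
        std::shared_ptr<RoutingTableHistory> _rt;
        Timestamp _t;
    };

    int main() {
        auto cm = std::make_shared<ChunkManager>(std::make_shared<RoutingTableHistory>(),
                                                 Timestamp{100, 0});
        std::vector<ChunkType> changedChunks;
        // The new idiom: update the shared routing table, then re-wrap it.
        auto rt = cm->getRoutingHistory().makeUpdated(changedChunks);
        cm = std::make_shared<ChunkManager>(rt, Timestamp{100, 0});
    }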
diff --git a/src/mongo/s/catalog_cache.cpp b/src/mongo/s/catalog_cache.cpp
index 85d454b702e..a70ceafc4ee 100644
--- a/src/mongo/s/catalog_cache.cpp
+++ b/src/mongo/s/catalog_cache.cpp
@@ -35,6 +35,7 @@
 #include "mongo/base/status.h"
 #include "mongo/base/status_with.h"
 #include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/logical_clock.h"
 #include "mongo/db/query/collation/collator_factory_interface.h"
 #include "mongo/db/repl/optime_with.h"
 #include "mongo/s/catalog/type_collection.h"
@@ -67,10 +68,10 @@ const int kMaxInconsistentRoutingInfoRefreshAttempts = 3;
  * dropped or recreated concurrently, the caller must retry the reload up to some configurable
  * number of attempts.
  */
-std::shared_ptr<ChunkManager> refreshCollectionRoutingInfo(
+std::shared_ptr<RoutingTableHistory> refreshCollectionRoutingInfo(
     OperationContext* opCtx,
     const NamespaceString& nss,
-    std::shared_ptr<ChunkManager> existingRoutingInfo,
+    std::shared_ptr<RoutingTableHistory> existingRoutingInfo,
     StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollectionAndChangedChunks) {
     if (swCollectionAndChangedChunks == ErrorCodes::NamespaceNotFound) {
         return nullptr;
@@ -94,13 +95,13 @@ std::shared_ptr<ChunkManager> refreshCollectionRoutingInfo(
             }
             return nullptr;
         }();
 
-        return ChunkManager::makeNew(nss,
-                                     collectionAndChunks.uuid,
-                                     KeyPattern(collectionAndChunks.shardKeyPattern),
-                                     std::move(defaultCollator),
-                                     collectionAndChunks.shardKeyIsUnique,
-                                     collectionAndChunks.epoch,
-                                     collectionAndChunks.changedChunks);
+        return RoutingTableHistory::makeNew(nss,
+                                            collectionAndChunks.uuid,
+                                            KeyPattern(collectionAndChunks.shardKeyPattern),
+                                            std::move(defaultCollator),
+                                            collectionAndChunks.shardKeyIsUnique,
+                                            collectionAndChunks.epoch,
+                                            collectionAndChunks.changedChunks);
     }();
 
     std::set<ShardId> shardIds;
@@ -129,14 +130,18 @@ StatusWith<CachedDatabaseInfo> CatalogCache::getDatabase(OperationContext* opCtx
     }
 }
 
+StatusWith<CachedCollectionRoutingInfo> CatalogCache::getCollectionRoutingInfo(
+    OperationContext* opCtx, const NamespaceString& nss) {
+    return _getCollectionRoutingInfoAt(opCtx, nss, boost::none);
+}
+
 StatusWith<CachedCollectionRoutingInfo> CatalogCache::getCollectionRoutingInfoAt(
     OperationContext* opCtx, const NamespaceString& nss, Timestamp atClusterTime) {
-    // TODO (GPiTR): Implement retrieving collection routing info at time
-    return getCollectionRoutingInfo(opCtx, nss);
+    return _getCollectionRoutingInfoAt(opCtx, nss, atClusterTime);
 }
 
-StatusWith<CachedCollectionRoutingInfo> CatalogCache::getCollectionRoutingInfo(
-    OperationContext* opCtx, const NamespaceString& nss) {
+StatusWith<CachedCollectionRoutingInfo> CatalogCache::_getCollectionRoutingInfoAt(
+    OperationContext* opCtx, const NamespaceString& nss, boost::optional<Timestamp> atClusterTime) {
     while (true) {
         std::shared_ptr<DatabaseInfoEntry> dbEntry;
         try {
@@ -197,12 +202,14 @@ StatusWith<CachedCollectionRoutingInfo> CatalogCache::getCollectionRoutingInfo(
             continue;
         }
 
+        auto cm = std::make_shared<ChunkManager>(collEntry.routingInfo, atClusterTime);
+
         return {CachedCollectionRoutingInfo(
             nss,
             {dbEntry,
              uassertStatusOK(
                 Grid::get(opCtx)->shardRegistry()->getShard(opCtx, dbEntry->primaryShardId))},
-            collEntry.routingInfo)};
+            std::move(cm))};
     }
 }
 
@@ -347,11 +354,12 @@ std::shared_ptr<CatalogCache::DatabaseInfoEntry> CatalogCache::_getDatabase(Oper
                                    dbDesc.getVersion()});
 }
 
-void CatalogCache::_scheduleCollectionRefresh(WithLock lk,
-                                              std::shared_ptr<DatabaseInfoEntry> dbEntry,
-                                              std::shared_ptr<ChunkManager> existingRoutingInfo,
-                                              NamespaceString const& nss,
-                                              int refreshAttempt) {
+void CatalogCache::_scheduleCollectionRefresh(
+    WithLock lk,
+    std::shared_ptr<DatabaseInfoEntry> dbEntry,
+    std::shared_ptr<RoutingTableHistory> existingRoutingInfo,
+    NamespaceString const& nss,
+    int refreshAttempt) {
     // If we have an existing chunk manager, the refresh is considered "incremental", regardless of
     // how many chunks are in the differential
     const bool isIncremental(existingRoutingInfo);
@@ -366,7 +374,7 @@ void CatalogCache::_scheduleCollectionRefresh(WithLock lk,
     // Invoked when one iteration of getChunksSince has completed, whether with success or error
     const auto onRefreshCompleted = [ this, t = Timer(), nss, isIncremental ](
-        const Status& status, ChunkManager* routingInfoAfterRefresh) {
+        const Status& status, RoutingTableHistory* routingInfoAfterRefresh) {
         if (isIncremental) {
             _stats.numActiveIncrementalRefreshes.subtractAndFetch(1);
         } else {
@@ -414,7 +422,7 @@ void CatalogCache::_scheduleCollectionRefresh(WithLock lk,
         [ this, dbEntry, nss, existingRoutingInfo, onRefreshFailed, onRefreshCompleted ](
             OperationContext * opCtx,
             StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept {
-            std::shared_ptr<ChunkManager> newRoutingInfo;
+            std::shared_ptr<RoutingTableHistory> newRoutingInfo;
             try {
                 newRoutingInfo = refreshCollectionRoutingInfo(
                     opCtx, nss, std::move(existingRoutingInfo), std::move(swCollAndChunks));
diff --git a/src/mongo/s/catalog_cache.h b/src/mongo/s/catalog_cache.h
index bd857a8b4b8..6e28f3c7167 100644
--- a/src/mongo/s/catalog_cache.h
+++ b/src/mongo/s/catalog_cache.h
@@ -160,7 +160,7 @@ private:
         std::shared_ptr<Notification<Status>> refreshCompletionNotification;
 
         // Contains the cached routing information (only available if needsRefresh is false)
-        std::shared_ptr<ChunkManager> routingInfo;
+        std::shared_ptr<RoutingTableHistory> routingInfo;
     };
 
     /**
@@ -191,10 +191,15 @@ private:
     */
     void _scheduleCollectionRefresh(WithLock,
                                     std::shared_ptr<DatabaseInfoEntry> dbEntry,
-                                    std::shared_ptr<ChunkManager> existingRoutingInfo,
+                                    std::shared_ptr<RoutingTableHistory> existingRoutingInfo,
                                     NamespaceString const& nss,
                                     int refreshAttempt);
 
+    StatusWith<CachedCollectionRoutingInfo> _getCollectionRoutingInfoAt(
+        OperationContext* opCtx,
+        const NamespaceString& nss,
+        boost::optional<Timestamp> atClusterTime);
+
     // Interface from which chunks will be retrieved
     CatalogCacheLoader& _cacheLoader;
diff --git a/src/mongo/s/chunk.cpp b/src/mongo/s/chunk.cpp
index 21067acd7e0..95842124579 100644
--- a/src/mongo/s/chunk.cpp
+++ b/src/mongo/s/chunk.cpp
@@ -65,6 +65,34 @@ Chunk::Chunk(const ChunkType& from)
       _jumbo(from.getJumbo()),
       _dataWritten(mkDataWritten()) {
     invariantOK(from.validate());
+    if (!_history.empty()) {
+        invariant(_shardId == _history.front().getShard());
+    }
+}
+
+const ShardId& Chunk::getShardIdAt(const boost::optional<Timestamp>& ts) const {
+    // This chunk was refreshed from an FCV 3.6 config server, so it doesn't have history
+    // TODO: SERVER-34100 consider removing getShardIdAt completely
+    // TODO: SERVER-33781 add uassert once we do upgrade/downgrade work
+    if (_history.empty()) {
+        return _shardId;
+    }
+
+    // If the timestamp is not provided, then we return the latest shardId
+    if (!ts) {
+        invariant(_shardId == _history.front().getShard());
+        return _history.front().getShard();
+    }
+
+    for (const auto& h : _history) {
+        if (h.getValidAfter() <= ts) {
+            return h.getShard();
+        }
+    }
+
+    uasserted(ErrorCodes::StaleChunkHistory,
+              str::stream() << "Can't find the shardId the chunk belonged to at cluster time "
+                            << ts.get().toString());
 }
 
 bool Chunk::containsKey(const BSONObj& shardKey) const {
diff --git a/src/mongo/s/chunk.h b/src/mongo/s/chunk.h
index 31205aefc25..c6a6bbec550 100644
--- a/src/mongo/s/chunk.h
+++ b/src/mongo/s/chunk.h
@@ -55,13 +55,20 @@ public:
     }
 
     const ShardId& getShardId() const {
-        return _shardId;
+        // TODO: SERVER-34100 - consolidate usage of getShardId and getShardIdAt
+        return getShardIdAt(boost::none);
     }
 
+    const ShardId& getShardIdAt(const boost::optional<Timestamp>& ts) const;
+
     ChunkVersion getLastmod() const {
         return _lastmod;
     }
 
+    const auto& getHistory() const {
+        return _history;
+    }
+
     bool isJumbo() const {
         return _jumbo;
    }
diff --git a/src/mongo/s/chunk_manager.cpp b/src/mongo/s/chunk_manager.cpp
index 7bf687c1c1a..166f9ccda95 100644
--- a/src/mongo/s/chunk_manager.cpp
+++ b/src/mongo/s/chunk_manager.cpp
@@ -45,7 +45,7 @@ namespace mongo {
 namespace {
 
-// Used to generate sequence numbers to assign to each newly created ChunkManager
+// Used to generate sequence numbers to assign to each newly created RoutingTableHistory
 AtomicUInt32 nextCMSequenceNumber(0);
 
 void checkAllElementsAreOfType(BSONType type, const BSONObj& o) {
@@ -68,13 +68,13 @@ std::string extractKeyStringInternal(const BSONObj& shardKeyValue, Ordering orde
 
 }  // namespace
 
-ChunkManager::ChunkManager(NamespaceString nss,
-                           boost::optional<UUID> uuid,
-                           KeyPattern shardKeyPattern,
-                           std::unique_ptr<CollatorInterface> defaultCollator,
-                           bool unique,
-                           ChunkMap chunkMap,
-                           ChunkVersion collectionVersion)
+RoutingTableHistory::RoutingTableHistory(NamespaceString nss,
+                                         boost::optional<UUID> uuid,
+                                         KeyPattern shardKeyPattern,
+                                         std::unique_ptr<CollatorInterface> defaultCollator,
+                                         bool unique,
+                                         ChunkMap chunkMap,
+                                         ChunkVersion collectionVersion)
     : _sequenceNumber(nextCMSequenceNumber.addAndFetch(1)),
       _nss(std::move(nss)),
       _uuid(uuid),
@@ -89,7 +89,7 @@ ChunkManager::ChunkManager(NamespaceString nss,
 
 std::shared_ptr<Chunk> ChunkManager::findIntersectingChunk(const BSONObj& shardKey,
                                                            const BSONObj& collation) const {
-    const bool hasSimpleCollation = (collation.isEmpty() && !_defaultCollator) ||
+    const bool hasSimpleCollation = (collation.isEmpty() && !_rt->getDefaultCollator()) ||
         SimpleBSONObjComparator::kInstance.evaluate(collation == CollationSpec::kSimpleSpec);
     if (!hasSimpleCollation) {
         for (BSONElement elt : shardKey) {
@@ -100,43 +100,38 @@ std::shared_ptr<Chunk> ChunkManager::findIntersectingChunk(const BSONObj& shardK
         }
     }
 
-    const auto it = _chunkMap.upper_bound(_extractKeyString(shardKey));
+    const auto it = _rt->getChunkMap().upper_bound(_rt->_extractKeyString(shardKey));
     uassert(ErrorCodes::ShardKeyNotFound,
             str::stream() << "Cannot target single shard using key " << shardKey,
-            it != _chunkMap.end() && it->second->containsKey(shardKey));
+            it != _rt->getChunkMap().end() && it->second->containsKey(shardKey));
 
     return it->second;
 }
 
-std::shared_ptr<Chunk> ChunkManager::findIntersectingChunkWithSimpleCollation(
-    const BSONObj& shardKey) const {
-    return findIntersectingChunk(shardKey, CollationSpec::kSimpleSpec);
-}
-
 bool ChunkManager::keyBelongsToShard(const BSONObj& shardKey, const ShardId& shardId) const {
     if (shardKey.isEmpty())
         return false;
 
-    const auto it = _chunkMap.upper_bound(_extractKeyString(shardKey));
-    if (it == _chunkMap.end())
+    const auto it = _rt->getChunkMap().upper_bound(_rt->_extractKeyString(shardKey));
+    if (it == _rt->getChunkMap().end())
         return false;
 
     invariant(it->second->containsKey(shardKey));
 
-    return it->second->getShardId() == shardId;
+    return it->second->getShardIdAt(_clusterTime) == shardId;
 }
 
 void ChunkManager::getShardIdsForQuery(OperationContext* opCtx,
                                        const BSONObj& query,
                                        const BSONObj& collation,
                                        std::set<ShardId>* shardIds) const {
-    auto qr = stdx::make_unique<QueryRequest>(_nss);
+    auto qr = stdx::make_unique<QueryRequest>(_rt->getns());
     qr->setFilter(query);
 
     if (!collation.isEmpty()) {
         qr->setCollation(collation);
-    } else if (_defaultCollator) {
-        qr->setCollation(_defaultCollator->getSpec().toBSON());
+    } else if (_rt->getDefaultCollator()) {
+        qr->setCollation(_rt->getDefaultCollator()->getSpec().toBSON());
     }
 
     const boost::intrusive_ptr<ExpressionContext> expCtx;
@@ -148,11 +143,11 @@ void ChunkManager::getShardIdsForQuery(OperationContext* opCtx,
         MatchExpressionParser::kAllowAllSpecialFeatures));
 
     // Fast path for targeting equalities on the shard key.
-    auto shardKeyToFind = _shardKeyPattern.extractShardKeyFromQuery(*cq);
+    auto shardKeyToFind = _rt->getShardKeyPattern().extractShardKeyFromQuery(*cq);
     if (!shardKeyToFind.isEmpty()) {
         try {
             auto chunk = findIntersectingChunk(shardKeyToFind, collation);
-            shardIds->insert(chunk->getShardId());
+            shardIds->insert(chunk->getShardIdAt(_clusterTime));
             return;
         } catch (const DBException&) {
             // The query uses multiple shards
@@ -165,20 +160,20 @@ void ChunkManager::getShardIdsForQuery(OperationContext* opCtx,
     // Query { a : { $gte : 1, $lt : 2 },
     //         b : { $gte : 3, $lt : 4 } }
     // => Bounds { a : [1, 2), b : [3, 4) }
-    IndexBounds bounds = getIndexBoundsForQuery(_shardKeyPattern.toBSON(), *cq);
+    IndexBounds bounds = getIndexBoundsForQuery(_rt->getShardKeyPattern().toBSON(), *cq);
 
     // Transforms bounds for each shard key field into full shard key ranges
     // for example :
     //   Key { a : 1, b : 1 }
     //   Bounds { a : [1, 2), b : [3, 4) }
     //   => Ranges { a : 1, b : 3 } => { a : 2, b : 4 }
-    BoundList ranges = _shardKeyPattern.flattenBounds(bounds);
+    BoundList ranges = _rt->getShardKeyPattern().flattenBounds(bounds);
 
     for (BoundList::const_iterator it = ranges.begin(); it != ranges.end(); ++it) {
         getShardIdsForRange(it->first /*min*/, it->second /*max*/, shardIds);
 
         // once we know we need to visit all shards no need to keep looping
-        if (shardIds->size() == _shardVersions.size()) {
+        if (shardIds->size() == _rt->_shardVersions.size()) {
             break;
         }
     }
@@ -187,47 +182,41 @@ void ChunkManager::getShardIdsForQuery(OperationContext* opCtx,
     // For now, we satisfy that assumption by adding a shard with no matches rather than returning
     // an empty set of shards.
     if (shardIds->empty()) {
-        shardIds->insert(_chunkMap.begin()->second->getShardId());
+        shardIds->insert(_rt->getChunkMap().begin()->second->getShardIdAt(_clusterTime));
     }
 }
 
 void ChunkManager::getShardIdsForRange(const BSONObj& min,
                                        const BSONObj& max,
                                        std::set<ShardId>* shardIds) const {
-    const auto itMin = _chunkMap.upper_bound(_extractKeyString(min));
-    const auto itMax = [this, &max]() {
-        auto it = _chunkMap.upper_bound(_extractKeyString(max));
-        return it == _chunkMap.end() ? it : ++it;
-    }();
-
-    for (auto it = itMin; it != itMax; ++it) {
-        shardIds->insert(it->second->getShardId());
+    const auto bounds = _rt->overlappingRanges(min, max, true);
+    for (auto it = bounds.first; it != bounds.second; ++it) {
+        shardIds->insert(it->second->getShardIdAt(_clusterTime));
 
         // No need to iterate through the rest of the ranges, because we already know we need to
         // use all shards.
-        if (shardIds->size() == _shardVersions.size()) {
+        if (shardIds->size() == _rt->_shardVersions.size()) {
             break;
         }
     }
 }
 
 bool ChunkManager::rangeOverlapsShard(const ChunkRange& range, const ShardId& shardId) const {
-    const auto itMin = _chunkMap.upper_bound(_extractKeyString(range.getMin()));
-    const auto itMax = [this, &range]() {
-        auto it = _chunkMap.lower_bound(_extractKeyString(range.getMax()));
-        return it == _chunkMap.end() ? it : ++it;
-    }();
-    const auto it = std::find_if(
-        itMin, itMax, [&shardId](const auto& scr) { return scr.second->getShardId() == shardId; });
-    return it != itMax;
+    const auto bounds = _rt->overlappingRanges(range.getMin(), range.getMax(), false);
+    const auto it = std::find_if(bounds.first, bounds.second, [this, &shardId](const auto& scr) {
+        return scr.second->getShardIdAt(_clusterTime) == shardId;
+    });
+
+    return it != bounds.second;
 }
 
 ChunkManager::ConstRangeOfChunks ChunkManager::getNextChunkOnShard(const BSONObj& shardKey,
                                                                    const ShardId& shardId) const {
-    for (auto it = _chunkMap.upper_bound(_extractKeyString(shardKey)); it != _chunkMap.end();
+    for (auto it = _rt->getChunkMap().upper_bound(_rt->_extractKeyString(shardKey));
+         it != _rt->getChunkMap().end();
          ++it) {
         const auto& chunk = it->second;
-        if (chunk->getShardId() == shardId) {
+        if (chunk->getShardIdAt(_clusterTime) == shardId) {
             const auto begin = it;
             const auto end = ++it;
             return {ConstChunkIterator(begin), ConstChunkIterator(end)};
@@ -237,13 +226,28 @@ ChunkManager::ConstRangeOfChunks ChunkManager::getNextChunkOnShard(const BSONObj
     return {ConstChunkIterator(), ConstChunkIterator()};
 }
 
-void ChunkManager::getAllShardIds(std::set<ShardId>* all) const {
+void RoutingTableHistory::getAllShardIds(std::set<ShardId>* all) const {
     std::transform(_shardVersions.begin(),
                    _shardVersions.end(),
                    std::inserter(*all, all->begin()),
                    [](const ShardVersionMap::value_type& pair) { return pair.first; });
 }
 
+std::pair<ChunkMap::const_iterator, ChunkMap::const_iterator>
+RoutingTableHistory::overlappingRanges(const BSONObj& min,
+                                       const BSONObj& max,
+                                       bool isMaxInclusive) const {
+
+    const auto itMin = _chunkMap.upper_bound(_extractKeyString(min));
+    const auto itMax = [this, &max, isMaxInclusive]() {
+        auto it = isMaxInclusive ? _chunkMap.upper_bound(_extractKeyString(max))
+                                 : _chunkMap.lower_bound(_extractKeyString(max));
+        return it == _chunkMap.end() ? it : ++it;
+    }();
+
+    return {itMin, itMax};
+}
+
 IndexBounds ChunkManager::getIndexBoundsForQuery(const BSONObj& key,
                                                  const CanonicalQuery& canonicalQuery) {
     // $text is not allowed in planning since we don't have text index on mongos.
@@ -359,13 +363,14 @@ IndexBounds ChunkManager::collapseQuerySolution(const QuerySolutionNode* node) {
     return bounds;
 }
 
-bool ChunkManager::compatibleWith(const ChunkManager& other, const ShardId& shardName) const {
+bool RoutingTableHistory::compatibleWith(const RoutingTableHistory& other,
+                                         const ShardId& shardName) const {
     // Return true if the shard version is the same in the two chunk managers
     // TODO: This doesn't need to be so strong, just major vs
     return other.getVersion(shardName).equals(getVersion(shardName));
 }
 
-ChunkVersion ChunkManager::getVersion(const ShardId& shardName) const {
+ChunkVersion RoutingTableHistory::getVersion(const ShardId& shardName) const {
     auto it = _shardVersions.find(shardName);
     if (it == _shardVersions.end()) {
         // Shards without explicitly tracked shard versions (meaning they have no chunks) always
@@ -376,13 +381,13 @@ ChunkVersion ChunkManager::getVersion(const ShardId& shardName) const {
     return it->second;
 }
 
-std::string ChunkManager::toString() const {
+std::string RoutingTableHistory::toString() const {
     StringBuilder sb;
-    sb << "ChunkManager: " << _nss.ns() << " key: " << _shardKeyPattern.toString() << '\n';
+    sb << "RoutingTableHistory: " << _nss.ns() << " key: " << _shardKeyPattern.toString() << '\n';
 
     sb << "Chunks:\n";
-    for (const auto& chunk : chunks()) {
-        sb << "\t" << chunk->toString() << '\n';
+    for (const auto& chunk : _chunkMap) {
+        sb << "\t" << chunk.second->toString() << '\n';
     }
 
     sb << "Shard versions:\n";
@@ -393,9 +398,9 @@ std::string ChunkManager::toString() const {
     return sb.str();
 }
 
-ShardVersionMap ChunkManager::_constructShardVersionMap(const OID& epoch,
-                                                        const ChunkMap& chunkMap,
-                                                        Ordering shardKeyOrdering) {
+ShardVersionMap RoutingTableHistory::_constructShardVersionMap(const OID& epoch,
+                                                               const ChunkMap& chunkMap,
+                                                               Ordering shardKeyOrdering) {
     ShardVersionMap shardVersions;
 
     ChunkMap::const_iterator current = chunkMap.cbegin();
@@ -475,11 +480,11 @@ ShardVersionMap ChunkManager::_constructShardVersionMap(const OID& epoch,
     return shardVersions;
 }
 
-std::string ChunkManager::_extractKeyString(const BSONObj& shardKeyValue) const {
+std::string RoutingTableHistory::_extractKeyString(const BSONObj& shardKeyValue) const {
     return extractKeyStringInternal(shardKeyValue, _shardKeyOrdering);
 }
 
-std::shared_ptr<ChunkManager> ChunkManager::makeNew(
+std::shared_ptr<RoutingTableHistory> RoutingTableHistory::makeNew(
     NamespaceString nss,
     boost::optional<UUID> uuid,
     KeyPattern shardKeyPattern,
@@ -487,17 +492,17 @@ std::shared_ptr<ChunkManager> ChunkManager::makeNew(
     bool unique,
     OID epoch,
     const std::vector<ChunkType>& chunks) {
-    return ChunkManager(std::move(nss),
-                        std::move(uuid),
-                        std::move(shardKeyPattern),
-                        std::move(defaultCollator),
-                        std::move(unique),
-                        {},
-                        {0, 0, epoch})
+    return RoutingTableHistory(std::move(nss),
+                               std::move(uuid),
+                               std::move(shardKeyPattern),
+                               std::move(defaultCollator),
+                               std::move(unique),
+                               {},
+                               {0, 0, epoch})
         .makeUpdated(chunks);
 }
 
-std::shared_ptr<ChunkManager> ChunkManager::makeUpdated(
+std::shared_ptr<RoutingTableHistory> RoutingTableHistory::makeUpdated(
     const std::vector<ChunkType>& changedChunks) {
     const auto startingCollectionVersion = getVersion();
@@ -546,14 +551,14 @@ std::shared_ptr<ChunkManager> ChunkManager::makeUpdated(
         return shared_from_this();
     }
 
-    return std::shared_ptr<ChunkManager>(
-        new ChunkManager(_nss,
-                         _uuid,
-                         KeyPattern(getShardKeyPattern().getKeyPattern()),
-                         CollatorInterface::cloneCollator(getDefaultCollator()),
-                         isUnique(),
-                         std::move(chunkMap),
-                         collectionVersion));
+    return std::shared_ptr<RoutingTableHistory>(
+        new RoutingTableHistory(_nss,
+                                _uuid,
+                                KeyPattern(getShardKeyPattern().getKeyPattern()),
+                                CollatorInterface::cloneCollator(getDefaultCollator()),
+                                isUnique(),
+                                std::move(chunkMap),
+                                collectionVersion));
 }
 
 }  // namespace mongo
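getShardIdsForRange and rangeOverlapsShard previously each computed their own iterator bounds; they now share RoutingTableHistory::overlappingRanges. Because the chunk map is keyed by each chunk's max bound, the first overlapping chunk is upper_bound(min), and isMaxInclusive selects whether the chunk containing max itself joins the walk. A self-contained analog over std::map:

    #include <map>
    #include <string>
    #include <utility>

    // Stand-in chunk map: each chunk's *max* key maps to a chunk id.
    using ChunkMapSketch = std::map<std::string, int>;

    std::pair<ChunkMapSketch::const_iterator, ChunkMapSketch::const_iterator>
    overlappingRanges(const ChunkMapSketch& chunkMap,
                      const std::string& min,
                      const std::string& max,
                      bool isMaxInclusive) {
        // The first chunk whose max is strictly greater than 'min' is the
        // first one whose range can contain it.
        const auto itMin = chunkMap.upper_bound(min);
        auto itMax = isMaxInclusive ? chunkMap.upper_bound(max) : chunkMap.lower_bound(max);
        if (itMax != chunkMap.end())
            ++itMax;  // also cover the chunk whose range contains 'max'
        return {itMin, itMax};
    }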
diff --git a/src/mongo/s/chunk_manager.h b/src/mongo/s/chunk_manager.h
index e56c7b0cb62..816934477d8 100644
--- a/src/mongo/s/chunk_manager.h
+++ b/src/mongo/s/chunk_manager.h
@@ -47,6 +47,7 @@ namespace mongo {
 class CanonicalQuery;
 struct QuerySolutionNode;
 class OperationContext;
+class ChunkManager;
 
 // Ordered map from the max for each chunk to an entry describing the chunk
 using ChunkMap = std::map<std::string, std::shared_ptr<Chunk>>;
@@ -55,8 +56,167 @@ using ChunkMap = std::map<std::string, std::shared_ptr<Chunk>>;
 using ShardVersionMap = std::map<ShardId, ChunkVersion>;
 
 /**
- * In-memory representation of the routing table for a single sharded collection.
+ * In-memory representation of the routing table for a single sharded collection at various points
+ * in time.
 */
+class RoutingTableHistory : public std::enable_shared_from_this<RoutingTableHistory> {
+    MONGO_DISALLOW_COPYING(RoutingTableHistory);
+
+public:
+    /**
+     * Makes an instance with a routing table for collection "nss", sharded on
+     * "shardKeyPattern".
+     *
+     * "defaultCollator" is the default collation for the collection, "unique" indicates whether
+     * or not the shard key for each document will be globally unique, and "epoch" is the globally
+     * unique identifier for this version of the collection.
+     *
+     * The "chunks" vector must contain the chunk routing information sorted in ascending order by
+     * chunk version, and adhere to the requirements of the routing table update algorithm.
+     */
+    static std::shared_ptr<RoutingTableHistory> makeNew(
+        NamespaceString nss,
+        boost::optional<UUID>,
+        KeyPattern shardKeyPattern,
+        std::unique_ptr<CollatorInterface> defaultCollator,
+        bool unique,
+        OID epoch,
+        const std::vector<ChunkType>& chunks);
+
+    /**
+     * Constructs a new instance with a routing table updated according to the changes described
+     * in "changedChunks".
+     *
+     * The changes in "changedChunks" must be sorted in ascending order by chunk version, and adhere
+     * to the requirements of the routing table update algorithm.
+     */
+    std::shared_ptr<RoutingTableHistory> makeUpdated(const std::vector<ChunkType>& changedChunks);
+
+    /**
+     * Returns an increasing number of the reload sequence number of this chunk manager.
+     */
+    unsigned long long getSequenceNumber() const {
+        return _sequenceNumber;
+    }
+
+    const NamespaceString& getns() const {
+        return _nss;
+    }
+
+    const ShardKeyPattern& getShardKeyPattern() const {
+        return _shardKeyPattern;
+    }
+
+    const CollatorInterface* getDefaultCollator() const {
+        return _defaultCollator.get();
+    }
+
+    bool isUnique() const {
+        return _unique;
+    }
+
+    ChunkVersion getVersion() const {
+        return _collectionVersion;
+    }
+
+    ChunkVersion getVersion(const ShardId& shardId) const;
+
+    const ChunkMap& getChunkMap() const {
+        return _chunkMap;
+    }
+
+    /**
+     * Returns the ids of all shards on which the collection has any chunks.
+     */
+    void getAllShardIds(std::set<ShardId>* all) const;
+
+    /**
+     * Returns true if, for this shard, the chunks are identical in both chunk managers
+     */
+    bool compatibleWith(const RoutingTableHistory& other, const ShardId& shard) const;
+
+    std::string toString() const;
+
+    bool uuidMatches(UUID uuid) const {
+        return _uuid && *_uuid == uuid;
+    }
+
+    std::pair<ChunkMap::const_iterator, ChunkMap::const_iterator> overlappingRanges(
+        const BSONObj& min, const BSONObj& max, bool isMaxInclusive) const;
+
+
+private:
+    /**
+     * Does a single pass over the chunkMap and constructs the ShardVersionMap object.
+     */
+    static ShardVersionMap _constructShardVersionMap(const OID& epoch,
+                                                     const ChunkMap& chunkMap,
+                                                     Ordering shardKeyOrdering);
+
+    RoutingTableHistory(NamespaceString nss,
+                        boost::optional<UUID> uuid,
+                        KeyPattern shardKeyPattern,
+                        std::unique_ptr<CollatorInterface> defaultCollator,
+                        bool unique,
+                        ChunkMap chunkMap,
+                        ChunkVersion collectionVersion);
+
+    std::string _extractKeyString(const BSONObj& shardKeyValue) const;
+
+    // The shard versioning mechanism hinges on keeping track of the number of times we reload
+    // ChunkManagers.
+    const unsigned long long _sequenceNumber;
+
+    // Namespace to which this routing information corresponds
+    const NamespaceString _nss;
+
+    // The invariant UUID of the collection. This is optional in 3.6, except in change streams.
+    const boost::optional<UUID> _uuid;
+
+    // The key pattern used to shard the collection
+    const ShardKeyPattern _shardKeyPattern;
+
+    const Ordering _shardKeyOrdering;
+
+    // Default collation to use for routing data queries for this collection
+    const std::unique_ptr<CollatorInterface> _defaultCollator;
+
+    // Whether the sharding key is unique
+    const bool _unique;
+
+    // Map from the max for each chunk to an entry describing the chunk. The union of all chunks'
+    // ranges must cover the complete space from [MinKey, MaxKey).
+    const ChunkMap _chunkMap;
+
+    // Map from shard id to the maximum chunk version for that shard. If a shard contains no
+    // chunks, it won't be present in this map.
+    const ShardVersionMap _shardVersions;
+
+    // Max version across all chunks
+    const ChunkVersion _collectionVersion;
+
+    // Auto-split throttling state (state mutable by write commands)
+    struct AutoSplitThrottle {
+    public:
+        AutoSplitThrottle() : _splitTickets(maxParallelSplits) {}
+
+        TicketHolder _splitTickets;
+
+        // Maximum number of parallel threads requesting a split
+        static const int maxParallelSplits = 5;
+
+    } _autoSplitThrottle;
+
+    friend class ChunkManager;
+
+    // This function needs to be able to access the auto-split throttle
+    friend void updateChunkWriteStatsAndSplitIfNeeded(OperationContext*,
+                                                      ChunkManager*,
+                                                      Chunk*,
+                                                      long);
+};
+
+// This will be renamed to RoutingTableHistory and the original RoutingTableHistory will be
+// ChunkHistoryMap
 class ChunkManager : public std::enable_shared_from_this<ChunkManager> {
     MONGO_DISALLOW_COPYING(ChunkManager);
 
@@ -104,69 +264,46 @@ public:
         ConstChunkIterator _end;
     };
 
-    /**
-     * Makes an instance with a routing table for collection "nss", sharded on
-     * "shardKeyPattern".
-     *
-     * "defaultCollator" is the default collation for the collection, "unique" indicates whether
-     * or not the shard key for each document will be globally unique, and "epoch" is the globally
-     * unique identifier for this version of the collection.
-     *
-     * The "chunks" vector must contain the chunk routing information sorted in ascending order by
-     * chunk version, and adhere to the requirements of the routing table update algorithm.
-     */
-    static std::shared_ptr<ChunkManager> makeNew(NamespaceString nss,
-                                                 boost::optional<UUID>,
-                                                 KeyPattern shardKeyPattern,
-                                                 std::unique_ptr<CollatorInterface> defaultCollator,
-                                                 bool unique,
-                                                 OID epoch,
-                                                 const std::vector<ChunkType>& chunks);
-
-    /**
-     * Constructs a new instance with a routing table updated according to the changes described
-     * in "changedChunks".
-     *
-     * The changes in "changedChunks" must be sorted in ascending order by chunk version, and adhere
-     * to the requirements of the routing table update algorithm.
-     */
-    std::shared_ptr<ChunkManager> makeUpdated(const std::vector<ChunkType>& changedChunks);
-
+    ChunkManager(std::shared_ptr<RoutingTableHistory> rt, boost::optional<Timestamp> clusterTime)
+        : _rt(std::move(rt)), _clusterTime(std::move(clusterTime)) {}
+
     /**
     * Returns an increasing number of the reload sequence number of this chunk manager.
     */
     unsigned long long getSequenceNumber() const {
-        return _sequenceNumber;
+        return _rt->getSequenceNumber();
     }
 
     const NamespaceString& getns() const {
-        return _nss;
+        return _rt->getns();
     }
 
     const ShardKeyPattern& getShardKeyPattern() const {
-        return _shardKeyPattern;
+        return _rt->getShardKeyPattern();
     }
 
     const CollatorInterface* getDefaultCollator() const {
-        return _defaultCollator.get();
+        return _rt->getDefaultCollator();
     }
 
     bool isUnique() const {
-        return _unique;
+        return _rt->isUnique();
     }
 
     ChunkVersion getVersion() const {
-        return _collectionVersion;
+        return _rt->getVersion();
     }
 
-    ChunkVersion getVersion(const ShardId& shardId) const;
+    ChunkVersion getVersion(const ShardId& shardId) const {
+        return _rt->getVersion(shardId);
+    }
 
     ConstRangeOfChunks chunks() const {
-        return {ConstChunkIterator{_chunkMap.cbegin()}, ConstChunkIterator{_chunkMap.cend()}};
+        return {ConstChunkIterator{_rt->getChunkMap().cbegin()},
+                ConstChunkIterator{_rt->getChunkMap().cend()}};
     }
 
     int numChunks() const {
-        return _chunkMap.size();
+        return _rt->getChunkMap().size();
     }
 
     /**
@@ -206,7 +343,9 @@ public:
     /**
     * Same as findIntersectingChunk, but assumes the simple collation.
     */
-    std::shared_ptr<Chunk> findIntersectingChunkWithSimpleCollation(const BSONObj& shardKey) const;
+    std::shared_ptr<Chunk> findIntersectingChunkWithSimpleCollation(const BSONObj& shardKey) const {
+        return findIntersectingChunk(shardKey, CollationSpec::kSimpleSpec);
+    }
 
     /**
     * Finds the shard IDs for a given filter and collation. If collation is empty, we use the
@@ -228,7 +367,9 @@ public:
     /**
     * Returns the ids of all shards on which the collection has any chunks.
     */
-    void getAllShardIds(std::set<ShardId>* all) const;
+    void getAllShardIds(std::set<ShardId>* all) const {
+        _rt->getAllShardIds(all);
+    }
 
     // Transforms query into bounds for each field in the shard key
     // for example :
@@ -252,81 +393,29 @@ public:
     /**
     * Returns true if, for this shard, the chunks are identical in both chunk managers
     */
-    bool compatibleWith(const ChunkManager& other, const ShardId& shard) const;
+    bool compatibleWith(const ChunkManager& other, const ShardId& shard) const {
+        return _rt->compatibleWith(*other._rt, shard);
+    }
 
-    std::string toString() const;
+    std::string toString() const {
+        return _rt->toString();
+    }
 
     bool uuidMatches(UUID uuid) const {
-        return _uuid && *_uuid == uuid;
+        return _rt->uuidMatches(uuid);
     }
 
-private:
-    /**
-     * Does a single pass over the chunkMap and constructs the ShardVersionMap object.
-     */
-    static ShardVersionMap _constructShardVersionMap(const OID& epoch,
-                                                     const ChunkMap& chunkMap,
-                                                     Ordering shardKeyOrdering);
-
-    ChunkManager(NamespaceString nss,
-                 boost::optional<UUID> uuid,
-                 KeyPattern shardKeyPattern,
-                 std::unique_ptr<CollatorInterface> defaultCollator,
-                 bool unique,
-                 ChunkMap chunkMap,
-                 ChunkVersion collectionVersion);
-
-    std::string _extractKeyString(const BSONObj& shardKeyValue) const;
-
-    // The shard versioning mechanism hinges on keeping track of the number of times we reload
-    // ChunkManagers.
-    const unsigned long long _sequenceNumber;
-
-    // Namespace to which this routing information corresponds
-    const NamespaceString _nss;
-
-    // The invariant UUID of the collection. This is optional in 3.6, except in change streams.
-    const boost::optional<UUID> _uuid;
-
-    // The key pattern used to shard the collection
-    const ShardKeyPattern _shardKeyPattern;
-
-    const Ordering _shardKeyOrdering;
-
-    // Default collation to use for routing data queries for this collection
-    const std::unique_ptr<CollatorInterface> _defaultCollator;
-
-    // Whether the sharding key is unique
-    const bool _unique;
-
-    // Map from the max for each chunk to an entry describing the chunk. The union of all chunks'
-    // ranges must cover the complete space from [MinKey, MaxKey).
-    const ChunkMap _chunkMap;
-
-    // Map from shard id to the maximum chunk version for that shard. If a shard contains no
-    // chunks, it won't be present in this map.
-    const ShardVersionMap _shardVersions;
-
-    // Max version across all chunks
-    const ChunkVersion _collectionVersion;
-
-    // Auto-split throttling state (state mutable by write commands)
-    struct AutoSplitThrottle {
-    public:
-        AutoSplitThrottle() : _splitTickets(maxParallelSplits) {}
-
-        TicketHolder _splitTickets;
-
-        // Maximum number of parallel threads requesting a split
-        static const int maxParallelSplits = 5;
+    auto& autoSplitThrottle() const {
+        return _rt->_autoSplitThrottle;
+    }
 
-    } _autoSplitThrottle;
+    RoutingTableHistory& getRoutingHistory() const {
+        return *_rt;
+    }
 
-    // This function needs to be able to access the auto-split throttle
-    friend void updateChunkWriteStatsAndSplitIfNeeded(OperationContext*,
-                                                      ChunkManager*,
-                                                      Chunk*,
-                                                      long);
+private:
+    std::shared_ptr<RoutingTableHistory> _rt;
+
+    boost::optional<Timestamp> _clusterTime;
 };
 
 }  // namespace mongo
diff --git a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
index fc51f62d325..6fe1d46770f 100644
--- a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
+++ b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
@@ -51,6 +51,7 @@
 #include "mongo/s/commands/strategy.h"
 #include "mongo/s/grid.h"
 #include "mongo/s/request_types/shard_collection_gen.h"
+#include "mongo/s/write_ops/cluster_write.h"
 #include "mongo/stdx/chrono.h"
 #include "mongo/util/log.h"
 #include "mongo/util/scopeguard.h"
diff --git a/src/mongo/s/write_ops/cluster_write.cpp b/src/mongo/s/write_ops/cluster_write.cpp
index fe288bfbd56..9eab15b47cd 100644
--- a/src/mongo/s/write_ops/cluster_write.cpp
+++ b/src/mongo/s/write_ops/cluster_write.cpp
@@ -259,12 +259,12 @@ void updateChunkWriteStatsAndSplitIfNeeded(OperationContext* opCtx,
 
     const NamespaceString& nss = manager->getns();
 
-    if (!manager->_autoSplitThrottle._splitTickets.tryAcquire()) {
+    if (!manager->autoSplitThrottle()._splitTickets.tryAcquire()) {
         LOG(1) << "won't auto split because not enough tickets: " << nss;
         return;
     }
 
-    TicketHolderReleaser releaser(&(manager->_autoSplitThrottle._splitTickets));
+    TicketHolderReleaser releaser(&(manager->autoSplitThrottle()._splitTickets));
 
     const ChunkRange chunkRange(chunk->getMin(), chunk->getMax());
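Finally, note the error_codes.err change at the top: StaleChunkHistory joins the SnapshotError class, so any caller that already retries snapshot errors (typically at a newer read timestamp) transparently recovers when routing is attempted at a cluster time older than the retained chunk history. A hedged illustration of that contract, with hypothetical exception types rather than MongoDB's real error machinery:

    #include <stdexcept>

    struct SnapshotError : std::runtime_error {
        using std::runtime_error::runtime_error;
    };

    // StaleChunkHistory is-a SnapshotError, so generic snapshot retry
    // loops handle it without knowing about chunk history at all.
    struct StaleChunkHistory : SnapshotError {
        StaleChunkHistory() : SnapshotError("StaleChunkHistory") {}
    };

    template <typename F>
    auto retryOnSnapshotError(F readAtClusterTime, int attempts = 3) {
        for (int i = 0;; ++i) {
            try {
                return readAtClusterTime();
            } catch (const SnapshotError&) {
                if (i + 1 >= attempts)
                    throw;  // give up; the caller should pick a newer time
            }
        }
    }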