summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Neupauer <martin.neupauer@10gen.com>2018-03-06 21:27:26 -0500
committerMartin Neupauer <martin.neupauer@mongodb.com>2018-03-27 13:18:55 -0400
commite6503239af00e7a7a5848ed7784142ac904e4218 (patch)
tree347220712d1e74f9d2ed9f41c6bc87e95eb9232f
parent15a1f6446d89fe30c63f1dc23ecc4c477b905168 (diff)
downloadmongo-e6503239af00e7a7a5848ed7784142ac904e4218.tar.gz
SERVER-33453 - Add timestamp support to the ChunkManager
-rw-r--r--src/mongo/base/error_codes.err3
-rw-r--r--src/mongo/db/s/collection_metadata_test.cpp50
-rw-r--r--src/mongo/db/s/collection_range_deleter_test.cpp3
-rw-r--r--src/mongo/db/s/collection_sharding_state_test.cpp4
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp4
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager_commit_chunk_migration_test.cpp8
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager_split_chunk_test.cpp4
-rw-r--r--src/mongo/db/s/metadata_manager_test.cpp9
-rw-r--r--src/mongo/s/catalog/type_chunk.h3
-rw-r--r--src/mongo/s/catalog_cache.cpp50
-rw-r--r--src/mongo/s/catalog_cache.h9
-rw-r--r--src/mongo/s/chunk.cpp28
-rw-r--r--src/mongo/s/chunk.h9
-rw-r--r--src/mongo/s/chunk_manager.cpp155
-rw-r--r--src/mongo/s/chunk_manager.h305
-rw-r--r--src/mongo/s/commands/cluster_map_reduce_cmd.cpp1
-rw-r--r--src/mongo/s/write_ops/cluster_write.cpp4
17 files changed, 421 insertions, 228 deletions
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err
index e43a7ccb131..0eb14979d0c 100644
--- a/src/mongo/base/error_codes.err
+++ b/src/mongo/base/error_codes.err
@@ -247,6 +247,7 @@ error_code("SnapshotUnavailable", 246)
error_code("ProducerConsumerQueueBatchTooLarge", 247)
error_code("ProducerConsumerQueueEndClosed", 248)
error_code("StaleDbVersion", 249, extra="StaleDbRoutingVersion");
+error_code("StaleChunkHistory", 250);
# Error codes 4000-8999 are reserved.
@@ -304,4 +305,4 @@ error_class("ConnectionFatalMessageParseError", ["IllegalOpMsgFlag",
error_class("ExceededTimeLimitError", ["ExceededTimeLimit", "NetworkInterfaceExceededTimeLimit"])
-error_class("SnapshotError", ["SnapshotTooOld", "SnapshotUnavailable"])
+error_class("SnapshotError", ["SnapshotTooOld", "SnapshotUnavailable", "StaleChunkHistory"])
diff --git a/src/mongo/db/s/collection_metadata_test.cpp b/src/mongo/db/s/collection_metadata_test.cpp
index 120c0b3ee58..862188f5509 100644
--- a/src/mongo/db/s/collection_metadata_test.cpp
+++ b/src/mongo/db/s/collection_metadata_test.cpp
@@ -43,13 +43,17 @@ using unittest::assertGet;
std::unique_ptr<CollectionMetadata> makeCollectionMetadataImpl(
const KeyPattern& shardKeyPattern,
- const std::vector<std::pair<BSONObj, BSONObj>>& thisShardsChunks) {
+ const std::vector<std::pair<BSONObj, BSONObj>>& thisShardsChunks,
+ bool staleChunkManager) {
const OID epoch = OID::gen();
const NamespaceString kNss("test.foo");
const ShardId kThisShard("thisShard");
const ShardId kOtherShard("otherShard");
+ const Timestamp kRouting(100, 0);
+ const Timestamp kChunkManager(staleChunkManager ? 99 : 100, 0);
+
std::vector<ChunkType> allChunks;
auto nextMinKey = shardKeyPattern.globalMin();
ChunkVersion version{1, 0, epoch};
@@ -58,20 +62,25 @@ std::unique_ptr<CollectionMetadata> makeCollectionMetadataImpl(
// Need to add a chunk to the other shard from nextMinKey to myNextChunk.first.
allChunks.emplace_back(
kNss, ChunkRange{nextMinKey, myNextChunk.first}, version, kOtherShard);
+ allChunks.back().setHistory({ChunkHistory(kRouting, kOtherShard)});
version.incMajor();
}
allChunks.emplace_back(
kNss, ChunkRange{myNextChunk.first, myNextChunk.second}, version, kThisShard);
+ allChunks.back().setHistory({ChunkHistory(kRouting, kThisShard)});
version.incMajor();
nextMinKey = myNextChunk.second;
}
if (SimpleBSONObjComparator::kInstance.evaluate(nextMinKey < shardKeyPattern.globalMax())) {
allChunks.emplace_back(
kNss, ChunkRange{nextMinKey, shardKeyPattern.globalMax()}, version, kOtherShard);
+ allChunks.back().setHistory({ChunkHistory(kRouting, kOtherShard)});
}
UUID uuid(UUID::gen());
- auto cm = ChunkManager::makeNew(kNss, uuid, shardKeyPattern, nullptr, false, epoch, allChunks);
+ auto rt =
+ RoutingTableHistory::makeNew(kNss, uuid, shardKeyPattern, nullptr, false, epoch, allChunks);
+ std::shared_ptr<ChunkManager> cm = std::make_shared<ChunkManager>(rt, kChunkManager);
return stdx::make_unique<CollectionMetadata>(cm, kThisShard);
}
@@ -83,7 +92,7 @@ struct ConstructedRangeMap : public RangeMap {
class NoChunkFixture : public unittest::Test {
protected:
std::unique_ptr<CollectionMetadata> makeCollectionMetadata() const {
- return makeCollectionMetadataImpl(KeyPattern(BSON("a" << 1)), {});
+ return makeCollectionMetadataImpl(KeyPattern(BSON("a" << 1)), {}, false);
}
};
@@ -168,8 +177,8 @@ TEST_F(NoChunkFixture, OrphanedDataRangeEnd) {
class SingleChunkFixture : public unittest::Test {
protected:
std::unique_ptr<CollectionMetadata> makeCollectionMetadata() const {
- return makeCollectionMetadataImpl(KeyPattern(BSON("a" << 1)),
- {std::make_pair(BSON("a" << 10), BSON("a" << 20))});
+ return makeCollectionMetadataImpl(
+ KeyPattern(BSON("a" << 1)), {std::make_pair(BSON("a" << 10), BSON("a" << 20))}, false);
}
};
@@ -247,7 +256,8 @@ protected:
const KeyPattern shardKeyPattern(BSON("a" << 1 << "b" << 1));
return makeCollectionMetadataImpl(
shardKeyPattern,
- {std::make_pair(shardKeyPattern.globalMin(), shardKeyPattern.globalMax())});
+ {std::make_pair(shardKeyPattern.globalMin(), shardKeyPattern.globalMax())},
+ false);
}
};
@@ -271,7 +281,8 @@ protected:
return makeCollectionMetadataImpl(
KeyPattern(BSON("a" << 1 << "b" << 1)),
{std::make_pair(BSON("a" << 10 << "b" << 0), BSON("a" << 20 << "b" << 0)),
- std::make_pair(BSON("a" << 30 << "b" << 0), BSON("a" << 40 << "b" << 0))});
+ std::make_pair(BSON("a" << 30 << "b" << 0), BSON("a" << 40 << "b" << 0))},
+ false);
}
};
@@ -308,7 +319,8 @@ protected:
return makeCollectionMetadataImpl(KeyPattern(BSON("a" << 1)),
{std::make_pair(BSON("a" << MINKEY), BSON("a" << 10)),
std::make_pair(BSON("a" << 10), BSON("a" << 20)),
- std::make_pair(BSON("a" << 30), BSON("a" << MAXKEY))});
+ std::make_pair(BSON("a" << 30), BSON("a" << MAXKEY))},
+ false);
}
};
@@ -390,5 +402,27 @@ TEST_F(ThreeChunkWithRangeGapFixture, GetDifferentChunkFromLast) {
ASSERT_EQUALS(0, differentChunk.getMax().woCompare(BSON("a" << 10)));
}
+/**
+ * Fixture with single chunk containing:
+ * [10->20)
+ */
+class StaleChunkFixture : public unittest::Test {
+protected:
+ std::unique_ptr<CollectionMetadata> makeCollectionMetadata() const {
+ return makeCollectionMetadataImpl(
+ KeyPattern(BSON("a" << 1)), {std::make_pair(BSON("a" << 10), BSON("a" << 20))}, true);
+ }
+};
+
+TEST_F(StaleChunkFixture, KeyBelongsToMe) {
+ ASSERT_THROWS_CODE(makeCollectionMetadata()->keyBelongsToMe(BSON("a" << 10)),
+ AssertionException,
+ ErrorCodes::StaleChunkHistory);
+
+ ASSERT_THROWS_CODE(makeCollectionMetadata()->keyBelongsToMe(BSON("a" << 0)),
+ AssertionException,
+ ErrorCodes::StaleChunkHistory);
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/s/collection_range_deleter_test.cpp b/src/mongo/db/s/collection_range_deleter_test.cpp
index beddabc66de..393d20c667b 100644
--- a/src/mongo/db/s/collection_range_deleter_test.cpp
+++ b/src/mongo/db/s/collection_range_deleter_test.cpp
@@ -70,7 +70,7 @@ protected:
client.createCollection(kNss.ns());
const KeyPattern keyPattern(kShardKeyPattern);
- auto cm = ChunkManager::makeNew(
+ auto rt = RoutingTableHistory::makeNew(
kNss,
UUID::gen(),
keyPattern,
@@ -81,6 +81,7 @@ protected:
ChunkRange{keyPattern.globalMin(), keyPattern.globalMax()},
ChunkVersion(1, 0, epoch()),
ShardId("otherShard"))});
+ std::shared_ptr<ChunkManager> cm = std::make_shared<ChunkManager>(rt, Timestamp(100, 0));
AutoGetCollection autoColl(operationContext(), kNss, MODE_IX);
auto const css = CollectionShardingState::get(operationContext(), kNss);
diff --git a/src/mongo/db/s/collection_sharding_state_test.cpp b/src/mongo/db/s/collection_sharding_state_test.cpp
index d50d0b10d86..63db860c50d 100644
--- a/src/mongo/db/s/collection_sharding_state_test.cpp
+++ b/src/mongo/db/s/collection_sharding_state_test.cpp
@@ -132,8 +132,10 @@ std::unique_ptr<CollectionMetadata> makeAMetadata(BSONObj const& keyPattern) {
const OID epoch = OID::gen();
auto range = ChunkRange(BSON("key" << MINKEY), BSON("key" << MAXKEY));
auto chunk = ChunkType(kTestNss, std::move(range), ChunkVersion(1, 0, epoch), ShardId("other"));
- auto cm = ChunkManager::makeNew(
+ auto rt = RoutingTableHistory::makeNew(
kTestNss, UUID::gen(), KeyPattern(keyPattern), nullptr, false, epoch, {std::move(chunk)});
+ std::shared_ptr<ChunkManager> cm = std::make_shared<ChunkManager>(rt, Timestamp(100, 0));
+
return stdx::make_unique<CollectionMetadata>(std::move(cm), ShardId("this"));
}
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
index c4fc7ca563e..50d25843d50 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
@@ -691,6 +691,7 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMigration(
// Generate the new versions of migratedChunk and controlChunk. Migrating chunk's minor version
// will be 0.
ChunkType newMigratedChunk = migratedChunk;
+ newMigratedChunk.setShard(toShard);
newMigratedChunk.setVersion(ChunkVersion(
currentCollectionVersion.majorVersion() + 1, 0, currentCollectionVersion.epoch()));
@@ -718,6 +719,9 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMigration(
<< validAfter.get().toString()};
}
newHistory.emplace(newHistory.begin(), ChunkHistory(validAfter.get(), toShard));
+ } else {
+ // TODO: SERVER-33781 FCV 3.6 should not have any history
+ newHistory.clear();
}
newMigratedChunk.setHistory(std::move(newHistory));
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_commit_chunk_migration_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_commit_chunk_migration_test.cpp
index dd1357e8b4a..5a817120c27 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_commit_chunk_migration_test.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_commit_chunk_migration_test.cpp
@@ -64,7 +64,7 @@ TEST_F(CommitChunkMigrate, CheckCorrectOpsCommandWithCtl) {
chunk0.setNS(kNamespace);
chunk0.setVersion(origVersion);
chunk0.setShard(shard0.getName());
- chunk0.setHistory({ChunkHistory(Timestamp(100, 0), ShardId("shardX"))});
+ chunk0.setHistory({ChunkHistory(Timestamp(100, 0), shard0.getName())});
// apportion
auto chunkMin = BSON("a" << 1);
@@ -143,7 +143,7 @@ TEST_F(CommitChunkMigrate, CheckCorrectOpsCommandNoCtl) {
chunk0.setNS(kNamespace);
chunk0.setVersion(origVersion);
chunk0.setShard(shard0.getName());
- chunk0.setHistory({ChunkHistory(Timestamp(100, 0), ShardId("shardX"))});
+ chunk0.setHistory({ChunkHistory(Timestamp(100, 0), shard0.getName())});
// apportion
auto chunkMin = BSON("a" << 1);
@@ -204,7 +204,7 @@ TEST_F(CommitChunkMigrate, CheckCorrectOpsCommandNoCtlTrimHistory) {
chunk0.setNS(kNamespace);
chunk0.setVersion(origVersion);
chunk0.setShard(shard0.getName());
- chunk0.setHistory({ChunkHistory(Timestamp(100, 0), ShardId("shardX"))});
+ chunk0.setHistory({ChunkHistory(Timestamp(100, 0), shard0.getName())});
// apportion
auto chunkMin = BSON("a" << 1);
@@ -266,7 +266,7 @@ TEST_F(CommitChunkMigrate, RejectOutOfOrderHistory) {
chunk0.setNS(kNamespace);
chunk0.setVersion(origVersion);
chunk0.setShard(shard0.getName());
- chunk0.setHistory({ChunkHistory(Timestamp(100, 0), ShardId("shardX"))});
+ chunk0.setHistory({ChunkHistory(Timestamp(100, 0), shard0.getName())});
// apportion
auto chunkMin = BSON("a" << 1);
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_split_chunk_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_split_chunk_test.cpp
index 932bc374745..336f1139e4e 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_split_chunk_test.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_split_chunk_test.cpp
@@ -53,7 +53,7 @@ TEST_F(SplitChunkTest, SplitExistingChunkCorrectlyShouldSucceed) {
auto chunkMax = BSON("a" << 10);
chunk.setMin(chunkMin);
chunk.setMax(chunkMax);
- chunk.setHistory({ChunkHistory(Timestamp(100, 0), ShardId("shardX")),
+ chunk.setHistory({ChunkHistory(Timestamp(100, 0), ShardId("shard0000")),
ChunkHistory(Timestamp(90, 0), ShardId("shardY"))});
auto chunkSplitPoint = BSON("a" << 5);
@@ -113,7 +113,7 @@ TEST_F(SplitChunkTest, MultipleSplitsOnExistingChunkShouldSucceed) {
auto chunkMax = BSON("a" << 10);
chunk.setMin(chunkMin);
chunk.setMax(chunkMax);
- chunk.setHistory({ChunkHistory(Timestamp(100, 0), ShardId("shardX")),
+ chunk.setHistory({ChunkHistory(Timestamp(100, 0), ShardId("shard0000")),
ChunkHistory(Timestamp(90, 0), ShardId("shardY"))});
auto chunkSplitPoint = BSON("a" << 5);
diff --git a/src/mongo/db/s/metadata_manager_test.cpp b/src/mongo/db/s/metadata_manager_test.cpp
index 9acd513a5ab..7a260c7f08f 100644
--- a/src/mongo/db/s/metadata_manager_test.cpp
+++ b/src/mongo/db/s/metadata_manager_test.cpp
@@ -71,7 +71,7 @@ protected:
static std::unique_ptr<CollectionMetadata> makeEmptyMetadata() {
const OID epoch = OID::gen();
- auto cm = ChunkManager::makeNew(
+ auto rt = RoutingTableHistory::makeNew(
kNss,
UUID::gen(),
kShardKeyPattern,
@@ -82,6 +82,9 @@ protected:
ChunkRange{BSON(kPattern << MINKEY), BSON(kPattern << MAXKEY)},
ChunkVersion(1, 0, epoch),
kOtherShard}});
+
+ std::shared_ptr<ChunkManager> cm = std::make_shared<ChunkManager>(rt, Timestamp(100, 0));
+
return stdx::make_unique<CollectionMetadata>(cm, kThisShard);
}
@@ -110,10 +113,12 @@ protected:
v2.incMajor();
auto v3 = v2;
v3.incMajor();
- cm = cm->makeUpdated(
+ auto rt = cm->getRoutingHistory().makeUpdated(
{ChunkType{kNss, ChunkRange{chunkToSplit->getMin(), minKey}, v1, kOtherShard},
ChunkType{kNss, ChunkRange{minKey, maxKey}, v2, kThisShard},
ChunkType{kNss, ChunkRange{maxKey, chunkToSplit->getMax()}, v3, kOtherShard}});
+ cm = std::make_shared<ChunkManager>(rt, Timestamp(100, 0));
+
return stdx::make_unique<CollectionMetadata>(cm, kThisShard);
}
diff --git a/src/mongo/s/catalog/type_chunk.h b/src/mongo/s/catalog/type_chunk.h
index 139e2d1dfa9..2e541690c2c 100644
--- a/src/mongo/s/catalog/type_chunk.h
+++ b/src/mongo/s/catalog/type_chunk.h
@@ -256,6 +256,9 @@ public:
void setHistory(std::vector<ChunkHistory>&& history) {
_history = std::move(history);
+ if (!_history.empty()) {
+ invariant(_shard == _history.front().getShard());
+ }
}
const std::vector<ChunkHistory>& getHistory() const {
return _history;
diff --git a/src/mongo/s/catalog_cache.cpp b/src/mongo/s/catalog_cache.cpp
index 85d454b702e..a70ceafc4ee 100644
--- a/src/mongo/s/catalog_cache.cpp
+++ b/src/mongo/s/catalog_cache.cpp
@@ -35,6 +35,7 @@
#include "mongo/base/status.h"
#include "mongo/base/status_with.h"
#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/logical_clock.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
#include "mongo/db/repl/optime_with.h"
#include "mongo/s/catalog/type_collection.h"
@@ -67,10 +68,10 @@ const int kMaxInconsistentRoutingInfoRefreshAttempts = 3;
* dropped or recreated concurrently, the caller must retry the reload up to some configurable
* number of attempts.
*/
-std::shared_ptr<ChunkManager> refreshCollectionRoutingInfo(
+std::shared_ptr<RoutingTableHistory> refreshCollectionRoutingInfo(
OperationContext* opCtx,
const NamespaceString& nss,
- std::shared_ptr<ChunkManager> existingRoutingInfo,
+ std::shared_ptr<RoutingTableHistory> existingRoutingInfo,
StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollectionAndChangedChunks) {
if (swCollectionAndChangedChunks == ErrorCodes::NamespaceNotFound) {
return nullptr;
@@ -94,13 +95,13 @@ std::shared_ptr<ChunkManager> refreshCollectionRoutingInfo(
}
return nullptr;
}();
- return ChunkManager::makeNew(nss,
- collectionAndChunks.uuid,
- KeyPattern(collectionAndChunks.shardKeyPattern),
- std::move(defaultCollator),
- collectionAndChunks.shardKeyIsUnique,
- collectionAndChunks.epoch,
- collectionAndChunks.changedChunks);
+ return RoutingTableHistory::makeNew(nss,
+ collectionAndChunks.uuid,
+ KeyPattern(collectionAndChunks.shardKeyPattern),
+ std::move(defaultCollator),
+ collectionAndChunks.shardKeyIsUnique,
+ collectionAndChunks.epoch,
+ collectionAndChunks.changedChunks);
}();
std::set<ShardId> shardIds;
@@ -129,14 +130,18 @@ StatusWith<CachedDatabaseInfo> CatalogCache::getDatabase(OperationContext* opCtx
}
}
+StatusWith<CachedCollectionRoutingInfo> CatalogCache::getCollectionRoutingInfo(
+ OperationContext* opCtx, const NamespaceString& nss) {
+ return _getCollectionRoutingInfoAt(opCtx, nss, boost::none);
+}
+
StatusWith<CachedCollectionRoutingInfo> CatalogCache::getCollectionRoutingInfoAt(
OperationContext* opCtx, const NamespaceString& nss, Timestamp atClusterTime) {
- // TODO (GPiTR): Implement retrieving collection routing info at time
- return getCollectionRoutingInfo(opCtx, nss);
+ return _getCollectionRoutingInfoAt(opCtx, nss, atClusterTime);
}
-StatusWith<CachedCollectionRoutingInfo> CatalogCache::getCollectionRoutingInfo(
- OperationContext* opCtx, const NamespaceString& nss) {
+StatusWith<CachedCollectionRoutingInfo> CatalogCache::_getCollectionRoutingInfoAt(
+ OperationContext* opCtx, const NamespaceString& nss, boost::optional<Timestamp> atClusterTime) {
while (true) {
std::shared_ptr<DatabaseInfoEntry> dbEntry;
try {
@@ -197,12 +202,14 @@ StatusWith<CachedCollectionRoutingInfo> CatalogCache::getCollectionRoutingInfo(
continue;
}
+ auto cm = std::make_shared<ChunkManager>(collEntry.routingInfo, atClusterTime);
+
return {CachedCollectionRoutingInfo(
nss,
{dbEntry,
uassertStatusOK(
Grid::get(opCtx)->shardRegistry()->getShard(opCtx, dbEntry->primaryShardId))},
- collEntry.routingInfo)};
+ std::move(cm))};
}
}
@@ -347,11 +354,12 @@ std::shared_ptr<CatalogCache::DatabaseInfoEntry> CatalogCache::_getDatabase(Oper
dbDesc.getVersion()});
}
-void CatalogCache::_scheduleCollectionRefresh(WithLock lk,
- std::shared_ptr<DatabaseInfoEntry> dbEntry,
- std::shared_ptr<ChunkManager> existingRoutingInfo,
- NamespaceString const& nss,
- int refreshAttempt) {
+void CatalogCache::_scheduleCollectionRefresh(
+ WithLock lk,
+ std::shared_ptr<DatabaseInfoEntry> dbEntry,
+ std::shared_ptr<RoutingTableHistory> existingRoutingInfo,
+ NamespaceString const& nss,
+ int refreshAttempt) {
// If we have an existing chunk manager, the refresh is considered "incremental", regardless of
// how many chunks are in the differential
const bool isIncremental(existingRoutingInfo);
@@ -366,7 +374,7 @@ void CatalogCache::_scheduleCollectionRefresh(WithLock lk,
// Invoked when one iteration of getChunksSince has completed, whether with success or error
const auto onRefreshCompleted = [ this, t = Timer(), nss, isIncremental ](
- const Status& status, ChunkManager* routingInfoAfterRefresh) {
+ const Status& status, RoutingTableHistory* routingInfoAfterRefresh) {
if (isIncremental) {
_stats.numActiveIncrementalRefreshes.subtractAndFetch(1);
} else {
@@ -414,7 +422,7 @@ void CatalogCache::_scheduleCollectionRefresh(WithLock lk,
[ this, dbEntry, nss, existingRoutingInfo, onRefreshFailed, onRefreshCompleted ](
OperationContext * opCtx,
StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept {
- std::shared_ptr<ChunkManager> newRoutingInfo;
+ std::shared_ptr<RoutingTableHistory> newRoutingInfo;
try {
newRoutingInfo = refreshCollectionRoutingInfo(
opCtx, nss, std::move(existingRoutingInfo), std::move(swCollAndChunks));
diff --git a/src/mongo/s/catalog_cache.h b/src/mongo/s/catalog_cache.h
index bd857a8b4b8..6e28f3c7167 100644
--- a/src/mongo/s/catalog_cache.h
+++ b/src/mongo/s/catalog_cache.h
@@ -160,7 +160,7 @@ private:
std::shared_ptr<Notification<Status>> refreshCompletionNotification;
// Contains the cached routing information (only available if needsRefresh is false)
- std::shared_ptr<ChunkManager> routingInfo;
+ std::shared_ptr<RoutingTableHistory> routingInfo;
};
/**
@@ -191,10 +191,15 @@ private:
*/
void _scheduleCollectionRefresh(WithLock,
std::shared_ptr<DatabaseInfoEntry> dbEntry,
- std::shared_ptr<ChunkManager> existingRoutingInfo,
+ std::shared_ptr<RoutingTableHistory> existingRoutingInfo,
NamespaceString const& nss,
int refreshAttempt);
+ StatusWith<CachedCollectionRoutingInfo> _getCollectionRoutingInfoAt(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ boost::optional<Timestamp> atClusterTime);
+
// Interface from which chunks will be retrieved
CatalogCacheLoader& _cacheLoader;
diff --git a/src/mongo/s/chunk.cpp b/src/mongo/s/chunk.cpp
index 21067acd7e0..95842124579 100644
--- a/src/mongo/s/chunk.cpp
+++ b/src/mongo/s/chunk.cpp
@@ -65,6 +65,34 @@ Chunk::Chunk(const ChunkType& from)
_jumbo(from.getJumbo()),
_dataWritten(mkDataWritten()) {
invariantOK(from.validate());
+ if (!_history.empty()) {
+ invariant(_shardId == _history.front().getShard());
+ }
+}
+
+const ShardId& Chunk::getShardIdAt(const boost::optional<Timestamp>& ts) const {
+ // This chunk was refreshed from FCV 3.6 config server so it doesn't have history
+ // TODO: SERVER-34100 consider removing getShardIdAt completely
+ // TODO: SERVER-33781 add uassert once we do upgrade/downgrade work
+ if (_history.empty()) {
+ return _shardId;
+ }
+
+ // If the tiemstamp is not provided than we return the latest shardid
+ if (!ts) {
+ invariant(_shardId == _history.front().getShard());
+ return _history.front().getShard();
+ }
+
+ for (const auto& h : _history) {
+ if (h.getValidAfter() <= ts) {
+ return h.getShard();
+ }
+ }
+
+ uasserted(ErrorCodes::StaleChunkHistory,
+ str::stream() << "Cant find shardId the chunk belonged to at cluster time "
+ << ts.get().toString());
}
bool Chunk::containsKey(const BSONObj& shardKey) const {
diff --git a/src/mongo/s/chunk.h b/src/mongo/s/chunk.h
index 31205aefc25..c6a6bbec550 100644
--- a/src/mongo/s/chunk.h
+++ b/src/mongo/s/chunk.h
@@ -55,13 +55,20 @@ public:
}
const ShardId& getShardId() const {
- return _shardId;
+ // TODO: SERVER-34100 - consolidate a usage of getShardAt and getShardIdAt
+ return getShardIdAt(boost::none);
}
+ const ShardId& getShardIdAt(const boost::optional<Timestamp>& ts) const;
+
ChunkVersion getLastmod() const {
return _lastmod;
}
+ const auto& getHistory() const {
+ return _history;
+ }
+
bool isJumbo() const {
return _jumbo;
}
diff --git a/src/mongo/s/chunk_manager.cpp b/src/mongo/s/chunk_manager.cpp
index 7bf687c1c1a..166f9ccda95 100644
--- a/src/mongo/s/chunk_manager.cpp
+++ b/src/mongo/s/chunk_manager.cpp
@@ -45,7 +45,7 @@
namespace mongo {
namespace {
-// Used to generate sequence numbers to assign to each newly created ChunkManager
+// Used to generate sequence numbers to assign to each newly created RoutingTableHistory
AtomicUInt32 nextCMSequenceNumber(0);
void checkAllElementsAreOfType(BSONType type, const BSONObj& o) {
@@ -68,13 +68,13 @@ std::string extractKeyStringInternal(const BSONObj& shardKeyValue, Ordering orde
} // namespace
-ChunkManager::ChunkManager(NamespaceString nss,
- boost::optional<UUID> uuid,
- KeyPattern shardKeyPattern,
- std::unique_ptr<CollatorInterface> defaultCollator,
- bool unique,
- ChunkMap chunkMap,
- ChunkVersion collectionVersion)
+RoutingTableHistory::RoutingTableHistory(NamespaceString nss,
+ boost::optional<UUID> uuid,
+ KeyPattern shardKeyPattern,
+ std::unique_ptr<CollatorInterface> defaultCollator,
+ bool unique,
+ ChunkMap chunkMap,
+ ChunkVersion collectionVersion)
: _sequenceNumber(nextCMSequenceNumber.addAndFetch(1)),
_nss(std::move(nss)),
_uuid(uuid),
@@ -89,7 +89,7 @@ ChunkManager::ChunkManager(NamespaceString nss,
std::shared_ptr<Chunk> ChunkManager::findIntersectingChunk(const BSONObj& shardKey,
const BSONObj& collation) const {
- const bool hasSimpleCollation = (collation.isEmpty() && !_defaultCollator) ||
+ const bool hasSimpleCollation = (collation.isEmpty() && !_rt->getDefaultCollator()) ||
SimpleBSONObjComparator::kInstance.evaluate(collation == CollationSpec::kSimpleSpec);
if (!hasSimpleCollation) {
for (BSONElement elt : shardKey) {
@@ -100,43 +100,38 @@ std::shared_ptr<Chunk> ChunkManager::findIntersectingChunk(const BSONObj& shardK
}
}
- const auto it = _chunkMap.upper_bound(_extractKeyString(shardKey));
+ const auto it = _rt->getChunkMap().upper_bound(_rt->_extractKeyString(shardKey));
uassert(ErrorCodes::ShardKeyNotFound,
str::stream() << "Cannot target single shard using key " << shardKey,
- it != _chunkMap.end() && it->second->containsKey(shardKey));
+ it != _rt->getChunkMap().end() && it->second->containsKey(shardKey));
return it->second;
}
-std::shared_ptr<Chunk> ChunkManager::findIntersectingChunkWithSimpleCollation(
- const BSONObj& shardKey) const {
- return findIntersectingChunk(shardKey, CollationSpec::kSimpleSpec);
-}
-
bool ChunkManager::keyBelongsToShard(const BSONObj& shardKey, const ShardId& shardId) const {
if (shardKey.isEmpty())
return false;
- const auto it = _chunkMap.upper_bound(_extractKeyString(shardKey));
- if (it == _chunkMap.end())
+ const auto it = _rt->getChunkMap().upper_bound(_rt->_extractKeyString(shardKey));
+ if (it == _rt->getChunkMap().end())
return false;
invariant(it->second->containsKey(shardKey));
- return it->second->getShardId() == shardId;
+ return it->second->getShardIdAt(_clusterTime) == shardId;
}
void ChunkManager::getShardIdsForQuery(OperationContext* opCtx,
const BSONObj& query,
const BSONObj& collation,
std::set<ShardId>* shardIds) const {
- auto qr = stdx::make_unique<QueryRequest>(_nss);
+ auto qr = stdx::make_unique<QueryRequest>(_rt->getns());
qr->setFilter(query);
if (!collation.isEmpty()) {
qr->setCollation(collation);
- } else if (_defaultCollator) {
- qr->setCollation(_defaultCollator->getSpec().toBSON());
+ } else if (_rt->getDefaultCollator()) {
+ qr->setCollation(_rt->getDefaultCollator()->getSpec().toBSON());
}
const boost::intrusive_ptr<ExpressionContext> expCtx;
@@ -148,11 +143,11 @@ void ChunkManager::getShardIdsForQuery(OperationContext* opCtx,
MatchExpressionParser::kAllowAllSpecialFeatures));
// Fast path for targeting equalities on the shard key.
- auto shardKeyToFind = _shardKeyPattern.extractShardKeyFromQuery(*cq);
+ auto shardKeyToFind = _rt->getShardKeyPattern().extractShardKeyFromQuery(*cq);
if (!shardKeyToFind.isEmpty()) {
try {
auto chunk = findIntersectingChunk(shardKeyToFind, collation);
- shardIds->insert(chunk->getShardId());
+ shardIds->insert(chunk->getShardIdAt(_clusterTime));
return;
} catch (const DBException&) {
// The query uses multiple shards
@@ -165,20 +160,20 @@ void ChunkManager::getShardIdsForQuery(OperationContext* opCtx,
// Query { a : { $gte : 1, $lt : 2 },
// b : { $gte : 3, $lt : 4 } }
// => Bounds { a : [1, 2), b : [3, 4) }
- IndexBounds bounds = getIndexBoundsForQuery(_shardKeyPattern.toBSON(), *cq);
+ IndexBounds bounds = getIndexBoundsForQuery(_rt->getShardKeyPattern().toBSON(), *cq);
// Transforms bounds for each shard key field into full shard key ranges
// for example :
// Key { a : 1, b : 1 }
// Bounds { a : [1, 2), b : [3, 4) }
// => Ranges { a : 1, b : 3 } => { a : 2, b : 4 }
- BoundList ranges = _shardKeyPattern.flattenBounds(bounds);
+ BoundList ranges = _rt->getShardKeyPattern().flattenBounds(bounds);
for (BoundList::const_iterator it = ranges.begin(); it != ranges.end(); ++it) {
getShardIdsForRange(it->first /*min*/, it->second /*max*/, shardIds);
// once we know we need to visit all shards no need to keep looping
- if (shardIds->size() == _shardVersions.size()) {
+ if (shardIds->size() == _rt->_shardVersions.size()) {
break;
}
}
@@ -187,47 +182,41 @@ void ChunkManager::getShardIdsForQuery(OperationContext* opCtx,
// For now, we satisfy that assumption by adding a shard with no matches rather than returning
// an empty set of shards.
if (shardIds->empty()) {
- shardIds->insert(_chunkMap.begin()->second->getShardId());
+ shardIds->insert(_rt->getChunkMap().begin()->second->getShardIdAt(_clusterTime));
}
}
void ChunkManager::getShardIdsForRange(const BSONObj& min,
const BSONObj& max,
std::set<ShardId>* shardIds) const {
- const auto itMin = _chunkMap.upper_bound(_extractKeyString(min));
- const auto itMax = [this, &max]() {
- auto it = _chunkMap.upper_bound(_extractKeyString(max));
- return it == _chunkMap.end() ? it : ++it;
- }();
-
- for (auto it = itMin; it != itMax; ++it) {
- shardIds->insert(it->second->getShardId());
+ const auto bounds = _rt->overlappingRanges(min, max, true);
+ for (auto it = bounds.first; it != bounds.second; ++it) {
+ shardIds->insert(it->second->getShardIdAt(_clusterTime));
// No need to iterate through the rest of the ranges, because we already know we need to use
// all shards.
- if (shardIds->size() == _shardVersions.size()) {
+ if (shardIds->size() == _rt->_shardVersions.size()) {
break;
}
}
}
bool ChunkManager::rangeOverlapsShard(const ChunkRange& range, const ShardId& shardId) const {
- const auto itMin = _chunkMap.upper_bound(_extractKeyString(range.getMin()));
- const auto itMax = [this, &range]() {
- auto it = _chunkMap.lower_bound(_extractKeyString(range.getMax()));
- return it == _chunkMap.end() ? it : ++it;
- }();
- const auto it = std::find_if(
- itMin, itMax, [&shardId](const auto& scr) { return scr.second->getShardId() == shardId; });
- return it != itMax;
+ const auto bounds = _rt->overlappingRanges(range.getMin(), range.getMax(), false);
+ const auto it = std::find_if(bounds.first, bounds.second, [this, &shardId](const auto& scr) {
+ return scr.second->getShardIdAt(_clusterTime) == shardId;
+ });
+
+ return it != bounds.second;
}
ChunkManager::ConstRangeOfChunks ChunkManager::getNextChunkOnShard(const BSONObj& shardKey,
const ShardId& shardId) const {
- for (auto it = _chunkMap.upper_bound(_extractKeyString(shardKey)); it != _chunkMap.end();
+ for (auto it = _rt->getChunkMap().upper_bound(_rt->_extractKeyString(shardKey));
+ it != _rt->getChunkMap().end();
++it) {
const auto& chunk = it->second;
- if (chunk->getShardId() == shardId) {
+ if (chunk->getShardIdAt(_clusterTime) == shardId) {
const auto begin = it;
const auto end = ++it;
return {ConstChunkIterator(begin), ConstChunkIterator(end)};
@@ -237,13 +226,28 @@ ChunkManager::ConstRangeOfChunks ChunkManager::getNextChunkOnShard(const BSONObj
return {ConstChunkIterator(), ConstChunkIterator()};
}
-void ChunkManager::getAllShardIds(std::set<ShardId>* all) const {
+void RoutingTableHistory::getAllShardIds(std::set<ShardId>* all) const {
std::transform(_shardVersions.begin(),
_shardVersions.end(),
std::inserter(*all, all->begin()),
[](const ShardVersionMap::value_type& pair) { return pair.first; });
}
+std::pair<ChunkMap::const_iterator, ChunkMap::const_iterator>
+RoutingTableHistory::overlappingRanges(const BSONObj& min,
+ const BSONObj& max,
+ bool isMaxInclusive) const {
+
+ const auto itMin = _chunkMap.upper_bound(_extractKeyString(min));
+ const auto itMax = [this, &max, isMaxInclusive]() {
+ auto it = isMaxInclusive ? _chunkMap.upper_bound(_extractKeyString(max))
+ : _chunkMap.lower_bound(_extractKeyString(max));
+ return it == _chunkMap.end() ? it : ++it;
+ }();
+
+ return {itMin, itMax};
+}
+
IndexBounds ChunkManager::getIndexBoundsForQuery(const BSONObj& key,
const CanonicalQuery& canonicalQuery) {
// $text is not allowed in planning since we don't have text index on mongos.
@@ -359,13 +363,14 @@ IndexBounds ChunkManager::collapseQuerySolution(const QuerySolutionNode* node) {
return bounds;
}
-bool ChunkManager::compatibleWith(const ChunkManager& other, const ShardId& shardName) const {
+bool RoutingTableHistory::compatibleWith(const RoutingTableHistory& other,
+ const ShardId& shardName) const {
// Return true if the shard version is the same in the two chunk managers
// TODO: This doesn't need to be so strong, just major vs
return other.getVersion(shardName).equals(getVersion(shardName));
}
-ChunkVersion ChunkManager::getVersion(const ShardId& shardName) const {
+ChunkVersion RoutingTableHistory::getVersion(const ShardId& shardName) const {
auto it = _shardVersions.find(shardName);
if (it == _shardVersions.end()) {
// Shards without explicitly tracked shard versions (meaning they have no chunks) always
@@ -376,13 +381,13 @@ ChunkVersion ChunkManager::getVersion(const ShardId& shardName) const {
return it->second;
}
-std::string ChunkManager::toString() const {
+std::string RoutingTableHistory::toString() const {
StringBuilder sb;
- sb << "ChunkManager: " << _nss.ns() << " key: " << _shardKeyPattern.toString() << '\n';
+ sb << "RoutingTableHistory: " << _nss.ns() << " key: " << _shardKeyPattern.toString() << '\n';
sb << "Chunks:\n";
- for (const auto& chunk : chunks()) {
- sb << "\t" << chunk->toString() << '\n';
+ for (const auto& chunk : _chunkMap) {
+ sb << "\t" << chunk.second->toString() << '\n';
}
sb << "Shard versions:\n";
@@ -393,9 +398,9 @@ std::string ChunkManager::toString() const {
return sb.str();
}
-ShardVersionMap ChunkManager::_constructShardVersionMap(const OID& epoch,
- const ChunkMap& chunkMap,
- Ordering shardKeyOrdering) {
+ShardVersionMap RoutingTableHistory::_constructShardVersionMap(const OID& epoch,
+ const ChunkMap& chunkMap,
+ Ordering shardKeyOrdering) {
ShardVersionMap shardVersions;
ChunkMap::const_iterator current = chunkMap.cbegin();
@@ -475,11 +480,11 @@ ShardVersionMap ChunkManager::_constructShardVersionMap(const OID& epoch,
return shardVersions;
}
-std::string ChunkManager::_extractKeyString(const BSONObj& shardKeyValue) const {
+std::string RoutingTableHistory::_extractKeyString(const BSONObj& shardKeyValue) const {
return extractKeyStringInternal(shardKeyValue, _shardKeyOrdering);
}
-std::shared_ptr<ChunkManager> ChunkManager::makeNew(
+std::shared_ptr<RoutingTableHistory> RoutingTableHistory::makeNew(
NamespaceString nss,
boost::optional<UUID> uuid,
KeyPattern shardKeyPattern,
@@ -487,17 +492,17 @@ std::shared_ptr<ChunkManager> ChunkManager::makeNew(
bool unique,
OID epoch,
const std::vector<ChunkType>& chunks) {
- return ChunkManager(std::move(nss),
- std::move(uuid),
- std::move(shardKeyPattern),
- std::move(defaultCollator),
- std::move(unique),
- {},
- {0, 0, epoch})
+ return RoutingTableHistory(std::move(nss),
+ std::move(uuid),
+ std::move(shardKeyPattern),
+ std::move(defaultCollator),
+ std::move(unique),
+ {},
+ {0, 0, epoch})
.makeUpdated(chunks);
}
-std::shared_ptr<ChunkManager> ChunkManager::makeUpdated(
+std::shared_ptr<RoutingTableHistory> RoutingTableHistory::makeUpdated(
const std::vector<ChunkType>& changedChunks) {
const auto startingCollectionVersion = getVersion();
@@ -546,14 +551,14 @@ std::shared_ptr<ChunkManager> ChunkManager::makeUpdated(
return shared_from_this();
}
- return std::shared_ptr<ChunkManager>(
- new ChunkManager(_nss,
- _uuid,
- KeyPattern(getShardKeyPattern().getKeyPattern()),
- CollatorInterface::cloneCollator(getDefaultCollator()),
- isUnique(),
- std::move(chunkMap),
- collectionVersion));
+ return std::shared_ptr<RoutingTableHistory>(
+ new RoutingTableHistory(_nss,
+ _uuid,
+ KeyPattern(getShardKeyPattern().getKeyPattern()),
+ CollatorInterface::cloneCollator(getDefaultCollator()),
+ isUnique(),
+ std::move(chunkMap),
+ collectionVersion));
}
} // namespace mongo
diff --git a/src/mongo/s/chunk_manager.h b/src/mongo/s/chunk_manager.h
index e56c7b0cb62..816934477d8 100644
--- a/src/mongo/s/chunk_manager.h
+++ b/src/mongo/s/chunk_manager.h
@@ -47,6 +47,7 @@ namespace mongo {
class CanonicalQuery;
struct QuerySolutionNode;
class OperationContext;
+class ChunkManager;
// Ordered map from the max for each chunk to an entry describing the chunk
using ChunkMap = std::map<std::string, std::shared_ptr<Chunk>>;
@@ -55,8 +56,167 @@ using ChunkMap = std::map<std::string, std::shared_ptr<Chunk>>;
using ShardVersionMap = std::map<ShardId, ChunkVersion>;
/**
- * In-memory representation of the routing table for a single sharded collection.
+ * In-memory representation of the routing table for a single sharded collection at various points
+ * in time.
*/
+class RoutingTableHistory : public std::enable_shared_from_this<RoutingTableHistory> {
+ MONGO_DISALLOW_COPYING(RoutingTableHistory);
+
+public:
+ /**
+ * Makes an instance with a routing table for collection "nss", sharded on
+ * "shardKeyPattern".
+ *
+ * "defaultCollator" is the default collation for the collection, "unique" indicates whether
+ * or not the shard key for each document will be globally unique, and "epoch" is the globally
+ * unique identifier for this version of the collection.
+ *
+ * The "chunks" vector must contain the chunk routing information sorted in ascending order by
+ * chunk version, and adhere to the requirements of the routing table update algorithm.
+ */
+ static std::shared_ptr<RoutingTableHistory> makeNew(
+ NamespaceString nss,
+ boost::optional<UUID>,
+ KeyPattern shardKeyPattern,
+ std::unique_ptr<CollatorInterface> defaultCollator,
+ bool unique,
+ OID epoch,
+ const std::vector<ChunkType>& chunks);
+
+ /**
+ * Constructs a new instance with a routing table updated according to the changes described
+ * in "changedChunks".
+ *
+ * The changes in "changedChunks" must be sorted in ascending order by chunk version, and adhere
+ * to the requirements of the routing table update algorithm.
+ */
+ std::shared_ptr<RoutingTableHistory> makeUpdated(const std::vector<ChunkType>& changedChunks);
+
+ /**
+ * Returns an increasing number of the reload sequence number of this chunk manager.
+ */
+ unsigned long long getSequenceNumber() const {
+ return _sequenceNumber;
+ }
+
+ const NamespaceString& getns() const {
+ return _nss;
+ }
+
+ const ShardKeyPattern& getShardKeyPattern() const {
+ return _shardKeyPattern;
+ }
+
+ const CollatorInterface* getDefaultCollator() const {
+ return _defaultCollator.get();
+ }
+
+ bool isUnique() const {
+ return _unique;
+ }
+
+ ChunkVersion getVersion() const {
+ return _collectionVersion;
+ }
+
+ ChunkVersion getVersion(const ShardId& shardId) const;
+
+ const ChunkMap& getChunkMap() const {
+ return _chunkMap;
+ }
+
+ /**
+ * Returns the ids of all shards on which the collection has any chunks.
+ */
+ void getAllShardIds(std::set<ShardId>* all) const;
+
+ /**
+ * Returns true if, for this shard, the chunks are identical in both chunk managers
+ */
+ bool compatibleWith(const RoutingTableHistory& other, const ShardId& shard) const;
+
+ std::string toString() const;
+
+ bool uuidMatches(UUID uuid) const {
+ return _uuid && *_uuid == uuid;
+ }
+
+ std::pair<ChunkMap::const_iterator, ChunkMap::const_iterator> overlappingRanges(
+ const BSONObj& min, const BSONObj& max, bool isMaxInclusive) const;
+
+
+private:
+ /**
+ * Does a single pass over the chunkMap and constructs the ShardVersionMap object.
+ */
+ static ShardVersionMap _constructShardVersionMap(const OID& epoch,
+ const ChunkMap& chunkMap,
+ Ordering shardKeyOrdering);
+
+ RoutingTableHistory(NamespaceString nss,
+ boost::optional<UUID> uuid,
+ KeyPattern shardKeyPattern,
+ std::unique_ptr<CollatorInterface> defaultCollator,
+ bool unique,
+ ChunkMap chunkMap,
+ ChunkVersion collectionVersion);
+
+ std::string _extractKeyString(const BSONObj& shardKeyValue) const;
+
+ // The shard versioning mechanism hinges on keeping track of the number of times we reload
+ // ChunkManagers.
+ const unsigned long long _sequenceNumber;
+
+ // Namespace to which this routing information corresponds
+ const NamespaceString _nss;
+
+ // The invariant UUID of the collection. This is optional in 3.6, except in change streams.
+ const boost::optional<UUID> _uuid;
+
+ // The key pattern used to shard the collection
+ const ShardKeyPattern _shardKeyPattern;
+
+ const Ordering _shardKeyOrdering;
+
+ // Default collation to use for routing data queries for this collection
+ const std::unique_ptr<CollatorInterface> _defaultCollator;
+
+ // Whether the sharding key is unique
+ const bool _unique;
+
+ // Map from the max for each chunk to an entry describing the chunk. The union of all chunks'
+ // ranges must cover the complete space from [MinKey, MaxKey).
+ const ChunkMap _chunkMap;
+
+ // Map from shard id to the maximum chunk version for that shard. If a shard contains no
+ // chunks, it won't be present in this map.
+ const ShardVersionMap _shardVersions;
+
+ // Max version across all chunks
+ const ChunkVersion _collectionVersion;
+
+ // Auto-split throttling state (state mutable by write commands)
+ struct AutoSplitThrottle {
+ public:
+ AutoSplitThrottle() : _splitTickets(maxParallelSplits) {}
+
+ TicketHolder _splitTickets;
+
+ // Maximum number of parallel threads requesting a split
+ static const int maxParallelSplits = 5;
+
+ } _autoSplitThrottle;
+
+ friend class ChunkManager;
+ // This function needs to be able to access the auto-split throttle
+ friend void updateChunkWriteStatsAndSplitIfNeeded(OperationContext*,
+ ChunkManager*,
+ Chunk*,
+ long);
+};
+
+// This will be renamed to RoutingTableHistory and the original RoutingTableHistory will be
+// ChunkHistoryMap
class ChunkManager : public std::enable_shared_from_this<ChunkManager> {
MONGO_DISALLOW_COPYING(ChunkManager);
@@ -104,69 +264,46 @@ public:
ConstChunkIterator _end;
};
- /**
- * Makes an instance with a routing table for collection "nss", sharded on
- * "shardKeyPattern".
- *
- * "defaultCollator" is the default collation for the collection, "unique" indicates whether
- * or not the shard key for each document will be globally unique, and "epoch" is the globally
- * unique identifier for this version of the collection.
- *
- * The "chunks" vector must contain the chunk routing information sorted in ascending order by
- * chunk version, and adhere to the requirements of the routing table update algorithm.
- */
- static std::shared_ptr<ChunkManager> makeNew(NamespaceString nss,
- boost::optional<UUID>,
- KeyPattern shardKeyPattern,
- std::unique_ptr<CollatorInterface> defaultCollator,
- bool unique,
- OID epoch,
- const std::vector<ChunkType>& chunks);
-
- /**
- * Constructs a new instance with a routing table updated according to the changes described
- * in "changedChunks".
- *
- * The changes in "changedChunks" must be sorted in ascending order by chunk version, and adhere
- * to the requirements of the routing table update algorithm.
- */
- std::shared_ptr<ChunkManager> makeUpdated(const std::vector<ChunkType>& changedChunks);
-
+ ChunkManager(std::shared_ptr<RoutingTableHistory> rt, boost::optional<Timestamp> clusterTime)
+ : _rt(std::move(rt)), _clusterTime(std::move(clusterTime)) {}
/**
* Returns an increasing number of the reload sequence number of this chunk manager.
*/
unsigned long long getSequenceNumber() const {
- return _sequenceNumber;
+ return _rt->getSequenceNumber();
}
const NamespaceString& getns() const {
- return _nss;
+ return _rt->getns();
}
const ShardKeyPattern& getShardKeyPattern() const {
- return _shardKeyPattern;
+ return _rt->getShardKeyPattern();
}
const CollatorInterface* getDefaultCollator() const {
- return _defaultCollator.get();
+ return _rt->getDefaultCollator();
}
bool isUnique() const {
- return _unique;
+ return _rt->isUnique();
}
ChunkVersion getVersion() const {
- return _collectionVersion;
+ return _rt->getVersion();
}
- ChunkVersion getVersion(const ShardId& shardId) const;
+ ChunkVersion getVersion(const ShardId& shardId) const {
+ return _rt->getVersion(shardId);
+ }
ConstRangeOfChunks chunks() const {
- return {ConstChunkIterator{_chunkMap.cbegin()}, ConstChunkIterator{_chunkMap.cend()}};
+ return {ConstChunkIterator{_rt->getChunkMap().cbegin()},
+ ConstChunkIterator{_rt->getChunkMap().cend()}};
}
int numChunks() const {
- return _chunkMap.size();
+ return _rt->getChunkMap().size();
}
/**
@@ -206,7 +343,9 @@ public:
/**
* Same as findIntersectingChunk, but assumes the simple collation.
*/
- std::shared_ptr<Chunk> findIntersectingChunkWithSimpleCollation(const BSONObj& shardKey) const;
+ std::shared_ptr<Chunk> findIntersectingChunkWithSimpleCollation(const BSONObj& shardKey) const {
+ return findIntersectingChunk(shardKey, CollationSpec::kSimpleSpec);
+ }
/**
* Finds the shard IDs for a given filter and collation. If collation is empty, we use the
@@ -228,7 +367,9 @@ public:
/**
* Returns the ids of all shards on which the collection has any chunks.
*/
- void getAllShardIds(std::set<ShardId>* all) const;
+ void getAllShardIds(std::set<ShardId>* all) const {
+ _rt->getAllShardIds(all);
+ }
// Transforms query into bounds for each field in the shard key
// for example :
@@ -252,81 +393,29 @@ public:
/**
* Returns true if, for this shard, the chunks are identical in both chunk managers
*/
- bool compatibleWith(const ChunkManager& other, const ShardId& shard) const;
+ bool compatibleWith(const ChunkManager& other, const ShardId& shard) const {
+ return _rt->compatibleWith(*other._rt, shard);
+ }
- std::string toString() const;
+ std::string toString() const {
+ return _rt->toString();
+ }
bool uuidMatches(UUID uuid) const {
- return _uuid && *_uuid == uuid;
+ return _rt->uuidMatches(uuid);
}
-private:
- /**
- * Does a single pass over the chunkMap and constructs the ShardVersionMap object.
- */
- static ShardVersionMap _constructShardVersionMap(const OID& epoch,
- const ChunkMap& chunkMap,
- Ordering shardKeyOrdering);
-
- ChunkManager(NamespaceString nss,
- boost::optional<UUID> uuid,
- KeyPattern shardKeyPattern,
- std::unique_ptr<CollatorInterface> defaultCollator,
- bool unique,
- ChunkMap chunkMap,
- ChunkVersion collectionVersion);
-
- std::string _extractKeyString(const BSONObj& shardKeyValue) const;
-
- // The shard versioning mechanism hinges on keeping track of the number of times we reload
- // ChunkManagers.
- const unsigned long long _sequenceNumber;
-
- // Namespace to which this routing information corresponds
- const NamespaceString _nss;
-
- // The invariant UUID of the collection. This is optional in 3.6, except in change streams.
- const boost::optional<UUID> _uuid;
-
- // The key pattern used to shard the collection
- const ShardKeyPattern _shardKeyPattern;
-
- const Ordering _shardKeyOrdering;
-
- // Default collation to use for routing data queries for this collection
- const std::unique_ptr<CollatorInterface> _defaultCollator;
-
- // Whether the sharding key is unique
- const bool _unique;
-
- // Map from the max for each chunk to an entry describing the chunk. The union of all chunks'
- // ranges must cover the complete space from [MinKey, MaxKey).
- const ChunkMap _chunkMap;
-
- // Map from shard id to the maximum chunk version for that shard. If a shard contains no
- // chunks, it won't be present in this map.
- const ShardVersionMap _shardVersions;
-
- // Max version across all chunks
- const ChunkVersion _collectionVersion;
-
- // Auto-split throttling state (state mutable by write commands)
- struct AutoSplitThrottle {
- public:
- AutoSplitThrottle() : _splitTickets(maxParallelSplits) {}
-
- TicketHolder _splitTickets;
-
- // Maximum number of parallel threads requesting a split
- static const int maxParallelSplits = 5;
+ auto& autoSplitThrottle() const {
+ return _rt->_autoSplitThrottle;
+ }
- } _autoSplitThrottle;
+ RoutingTableHistory& getRoutingHistory() const {
+ return *_rt;
+ }
- // This function needs to be able to access the auto-split throttle
- friend void updateChunkWriteStatsAndSplitIfNeeded(OperationContext*,
- ChunkManager*,
- Chunk*,
- long);
+private:
+ std::shared_ptr<RoutingTableHistory> _rt;
+ boost::optional<Timestamp> _clusterTime;
};
} // namespace mongo
diff --git a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
index fc51f62d325..6fe1d46770f 100644
--- a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
+++ b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
@@ -51,6 +51,7 @@
#include "mongo/s/commands/strategy.h"
#include "mongo/s/grid.h"
#include "mongo/s/request_types/shard_collection_gen.h"
+#include "mongo/s/write_ops/cluster_write.h"
#include "mongo/stdx/chrono.h"
#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"
diff --git a/src/mongo/s/write_ops/cluster_write.cpp b/src/mongo/s/write_ops/cluster_write.cpp
index fe288bfbd56..9eab15b47cd 100644
--- a/src/mongo/s/write_ops/cluster_write.cpp
+++ b/src/mongo/s/write_ops/cluster_write.cpp
@@ -259,12 +259,12 @@ void updateChunkWriteStatsAndSplitIfNeeded(OperationContext* opCtx,
const NamespaceString& nss = manager->getns();
- if (!manager->_autoSplitThrottle._splitTickets.tryAcquire()) {
+ if (!manager->autoSplitThrottle()._splitTickets.tryAcquire()) {
LOG(1) << "won't auto split because not enough tickets: " << nss;
return;
}
- TicketHolderReleaser releaser(&(manager->_autoSplitThrottle._splitTickets));
+ TicketHolderReleaser releaser(&(manager->autoSplitThrottle()._splitTickets));
const ChunkRange chunkRange(chunk->getMin(), chunk->getMax());