author    Marcos José Grillo Ramírez <marcos.grillo@mongodb.com>  2020-09-05 15:01:13 -0400
committer Evergreen Agent <no-reply@evergreen.mongodb.com>        2020-09-09 22:27:23 +0000
commit    377b8fe43916ff2c4e2ed35cb80548aeb8ba8c8d (patch)
tree      e2036dd117f4b5f2255faedc1ef02853d879ae2c
parent    f9d4a15397585a8f00ea0afa9864531e1f4ed5fb (diff)
download  mongo-377b8fe43916ff2c4e2ed35cb80548aeb8ba8c8d.tar.gz
SERVER-46199 Make the collection CatalogCache support causal consistency
This change implements the collection CatalogCache on top of the ReadThroughCache, giving it
the ability to support causal consistency. As part of this change, the 'forceRefreshFromThisThread'
flag has been removed from the shard's refresh methods.

Co-authored-by: Tommaso Tocci <tommaso.tocci@mongodb.com>
Co-authored-by: Pierlauro Sciarelli <pierlauro.sciarelli@mongodb.com>
Co-authored-by: Kaloian Manassiev <kaloian.manassiev@mongodb.com>
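At the core of the change, the collection routing table now lives in a ReadThroughCache keyed by
namespace, and causal consistency comes from advancing the cache's "time in store" to the version a
caller has learned about (advanceTimeInStore / acquire with kLatestKnown in the diff below). The
following is a minimal standalone sketch of that idea only; CausalCache, Versioned and the method
names are illustrative and are not the MongoDB ReadThroughCache API.

// Toy sketch of a causally consistent read-through cache: cached entries carry a
// comparable version, callers that learn of a newer version advance the required
// version for a key, and a later acquire refreshes through the loader until the
// cached value is at least that new.
#include <functional>
#include <map>
#include <string>

struct Versioned {
    long version;         // stands in for ComparableChunkVersion
    std::string routing;  // stands in for the routing table contents
};

class CausalCache {
public:
    explicit CausalCache(std::function<Versioned(const std::string&)> loader)
        : _loader(std::move(loader)) {}

    // Similar in spirit to advanceTimeInStore(): remember that at least 'version'
    // must be visible before a causally consistent read may return a cached entry.
    void advanceTimeInStore(const std::string& key, long version) {
        long& wanted = _wanted[key];
        if (version > wanted)
            wanted = version;
    }

    // Similar in spirit to acquire(..., kLatestKnown): refresh through the loader if
    // the cached entry is older than the advanced version, otherwise serve the cache.
    const Versioned& acquireLatestKnown(const std::string& key) {
        auto it = _cached.find(key);
        if (it == _cached.end() || it->second.version < _wanted[key])
            it = _cached.insert_or_assign(key, _loader(key)).first;
        return it->second;
    }

private:
    std::function<Versioned(const std::string&)> _loader;
    std::map<std::string, Versioned> _cached;
    std::map<std::string, long> _wanted;  // "time in store" per key
};

In the commit itself the role of the version is played by ComparableChunkVersion and the loader is
the CatalogCacheLoader::getChunksSince path wired up in CollectionCache::_lookupCollection below.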
-rw-r--r--  jstests/sharding/chunk_operations_invalidate_single_shard.js      |    1
-rw-r--r--  src/mongo/db/pipeline/process_interface/common_process_interface.cpp |   14
-rw-r--r--  src/mongo/db/pipeline/sharded_agg_helpers.h                        |    8
-rw-r--r--  src/mongo/db/s/README.md                                           |    1
-rw-r--r--  src/mongo/db/s/config/configsvr_drop_collection_command.cpp       |    4
-rw-r--r--  src/mongo/db/s/config/configsvr_shard_collection_command.cpp      |    2
-rw-r--r--  src/mongo/db/s/migration_source_manager.cpp                        |    2
-rw-r--r--  src/mongo/db/s/migration_util_test.cpp                             |    8
-rw-r--r--  src/mongo/db/s/set_shard_version_command.cpp                       |   14
-rw-r--r--  src/mongo/db/s/shard_filtering_metadata_refresh.cpp                |   15
-rw-r--r--  src/mongo/db/s/shard_filtering_metadata_refresh.h                  |    3
-rw-r--r--  src/mongo/db/s/shard_key_util.cpp                                  |   23
-rw-r--r--  src/mongo/db/s/shard_key_util.h                                    |    4
-rw-r--r--  src/mongo/s/catalog_cache.cpp                                      |  762
-rw-r--r--  src/mongo/s/catalog_cache.h                                        |  352
-rw-r--r--  src/mongo/s/catalog_cache_refresh_test.cpp                         |   16
-rw-r--r--  src/mongo/s/catalog_cache_test.cpp                                 |  129
-rw-r--r--  src/mongo/s/catalog_cache_test_fixture.cpp                         |   20
-rw-r--r--  src/mongo/s/catalog_cache_test_fixture.h                           |   11
-rw-r--r--  src/mongo/s/chunk_manager.cpp                                      |  108
-rw-r--r--  src/mongo/s/chunk_manager.h                                        |  152
-rw-r--r--  src/mongo/s/chunk_manager_refresh_bm.cpp                           |   13
-rw-r--r--  src/mongo/s/commands/cluster_drop_cmd.cpp                          |    4
-rw-r--r--  src/mongo/s/commands/cluster_merge_chunks_cmd.cpp                  |    6
-rw-r--r--  src/mongo/s/commands/cluster_move_chunk_cmd.cpp                    |   11
-rw-r--r--  src/mongo/s/commands/cluster_shard_collection_cmd.cpp              |    4
-rw-r--r--  src/mongo/s/commands/cluster_split_cmd.cpp                         |    6
-rw-r--r--  src/mongo/s/commands/flush_router_config_cmd.cpp                   |    2
-rw-r--r--  src/mongo/s/commands/strategy.cpp                                  |   16
-rw-r--r--  src/mongo/s/comparable_chunk_version_test.cpp                      |  133
-rw-r--r--  src/mongo/s/comparable_database_version_test.cpp                   |    8
-rw-r--r--  src/mongo/s/query/cluster_find.cpp                                 |   16
-rw-r--r--  src/mongo/s/request_types/set_shard_version_request.h              |    1
-rw-r--r--  src/mongo/s/sessions_collection_sharded.cpp                        |    2
-rw-r--r--  src/mongo/s/sharding_test_fixture_common.cpp                       |    6
-rw-r--r--  src/mongo/s/sharding_test_fixture_common.h                         |    3
-rw-r--r--  src/mongo/s/write_ops/chunk_manager_targeter.cpp                   |    2
-rw-r--r--  src/mongo/util/invalidating_lru_cache.h                            |   53
-rw-r--r--  src/mongo/util/invalidating_lru_cache_test.cpp                     |   21
-rw-r--r--  src/mongo/util/read_through_cache.h                                |   18
40 files changed, 1027 insertions, 947 deletions
diff --git a/jstests/sharding/chunk_operations_invalidate_single_shard.js b/jstests/sharding/chunk_operations_invalidate_single_shard.js
index e660cec2305..30a736fcdea 100644
--- a/jstests/sharding/chunk_operations_invalidate_single_shard.js
+++ b/jstests/sharding/chunk_operations_invalidate_single_shard.js
@@ -52,6 +52,7 @@ let testSplit = () => {
const mongosCollectionVersion = getMongosCollVersion(ns);
assert.commandWorked(st.s.adminCommand({split: ns, middle: {x: -500}}));
+ assert.eq(mongosCollectionVersion, getMongosCollVersion(ns));
testColl.findOne({x: 0});
testColl.findOne({x: 1000});
diff --git a/src/mongo/db/pipeline/process_interface/common_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_process_interface.cpp
index 330ef41693e..b6b304c348b 100644
--- a/src/mongo/db/pipeline/process_interface/common_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/common_process_interface.cpp
@@ -184,15 +184,11 @@ bool CommonProcessInterface::keyPatternNamesExactPaths(const BSONObj& keyPattern
boost::optional<ChunkVersion> CommonProcessInterface::refreshAndGetCollectionVersion(
const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& nss) const {
- const bool forceRefreshFromThisThread = false;
- auto cm = uassertStatusOK(
- Grid::get(expCtx->opCtx)
- ->catalogCache()
- ->getCollectionRoutingInfoWithRefresh(expCtx->opCtx, nss, forceRefreshFromThisThread));
- if (cm.isSharded()) {
- return cm.getVersion();
- }
- return boost::none;
+ const auto cm = uassertStatusOK(Grid::get(expCtx->opCtx)
+ ->catalogCache()
+ ->getCollectionRoutingInfoWithRefresh(expCtx->opCtx, nss));
+
+ return cm.isSharded() ? boost::make_optional(cm.getVersion()) : boost::none;
}
std::vector<FieldPath> CommonProcessInterface::_shardKeyToDocumentKeyFields(
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.h b/src/mongo/db/pipeline/sharded_agg_helpers.h
index 13a20fee607..c63ac997a32 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.h
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.h
@@ -245,13 +245,9 @@ auto shardVersionRetry(OperationContext* opCtx,
str::stream() << "StaleConfig error on unexpected namespace. Expected "
<< nss << ", received " << staleInfo->getNss());
catalogCache->invalidateShardOrEntireCollectionEntryForShardedCollection(
- opCtx,
- nss,
- staleInfo->getVersionWanted(),
- staleInfo->getVersionReceived(),
- staleInfo->getShardId());
+ nss, staleInfo->getVersionWanted(), staleInfo->getShardId());
} else {
- catalogCache->onEpochChange(nss);
+ catalogCache->invalidateCollectionEntry_LINEARIZABLE(nss);
}
if (!logAndTestMaxRetries(e)) {
throw;
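The hunk above switches the stale-routing retry path from onEpochChange to the new invalidation
entry points. For orientation, here is a simplified, hypothetical sketch of that retry pattern;
staleVersionRetry and StaleRoutingError are illustrative names, not the real shardVersionRetry or
StaleConfigInfo types, and CausalCache refers to the toy cache sketched near the top of this page.

#include <stdexcept>
#include <string>

// Hypothetical stand-in for StaleConfigInfo: carries the version the shard reported wanting.
struct StaleRoutingError : std::runtime_error {
    long wantedVersion;
    explicit StaleRoutingError(long wanted)
        : std::runtime_error("stale routing information"), wantedVersion(wanted) {}
};

template <typename Callback>
auto staleVersionRetry(CausalCache& cache, const std::string& nss, Callback&& cb) {
    constexpr int kMaxRetries = 10;  // mirrors kMaxNumStaleVersionRetries in the diff
    for (int attempt = 0;; ++attempt) {
        try {
            return cb();
        } catch (const StaleRoutingError& e) {
            if (attempt == kMaxRetries)
                throw;
            // Advancing the wanted version makes the next acquire refresh at least up
            // to what the shard reported, rather than reusing a known-stale entry.
            cache.advanceTimeInStore(nss, e.wantedVersion);
        }
    }
}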
diff --git a/src/mongo/db/s/README.md b/src/mongo/db/s/README.md
index bf23835067c..a2a4547f1f8 100644
--- a/src/mongo/db/s/README.md
+++ b/src/mongo/db/s/README.md
@@ -103,7 +103,6 @@ collection or database. A full refresh occurs when:
Methods that will mark routing table cache information as stale (sharded collection).
* [invalidateShardOrEntireCollectionEntryForShardedCollection](https://github.com/mongodb/mongo/blob/62d9485657717bf61fbb870cb3d09b52b1a614dd/src/mongo/s/catalog_cache.h#L226-L236)
-* [invalidateShardForShardedCollection](https://github.com/mongodb/mongo/blob/62d9485657717bf61fbb870cb3d09b52b1a614dd/src/mongo/s/catalog_cache.h#L262-L268)
* [invalidateEntriesThatReferenceShard](https://github.com/mongodb/mongo/blob/62d9485657717bf61fbb870cb3d09b52b1a614dd/src/mongo/s/catalog_cache.h#L270-L274)
* [purgeCollection](https://github.com/mongodb/mongo/blob/62d9485657717bf61fbb870cb3d09b52b1a614dd/src/mongo/s/catalog_cache.h#L276-L280)
diff --git a/src/mongo/db/s/config/configsvr_drop_collection_command.cpp b/src/mongo/db/s/config/configsvr_drop_collection_command.cpp
index fc74fafc0c5..29b15c82a9e 100644
--- a/src/mongo/db/s/config/configsvr_drop_collection_command.cpp
+++ b/src/mongo/db/s/config/configsvr_drop_collection_command.cpp
@@ -129,7 +129,9 @@ public:
auto collDistLock = uassertStatusOK(
catalogClient->getDistLockManager()->lock(opCtx, nss.ns(), "dropCollection", waitFor));
- ON_BLOCK_EXIT([opCtx, nss] { Grid::get(opCtx)->catalogCache()->onEpochChange(nss); });
+ ON_BLOCK_EXIT([opCtx, nss] {
+ Grid::get(opCtx)->catalogCache()->invalidateCollectionEntry_LINEARIZABLE(nss);
+ });
_dropCollection(opCtx, nss);
diff --git a/src/mongo/db/s/config/configsvr_shard_collection_command.cpp b/src/mongo/db/s/config/configsvr_shard_collection_command.cpp
index ee992bef2a9..3af7f601e95 100644
--- a/src/mongo/db/s/config/configsvr_shard_collection_command.cpp
+++ b/src/mongo/db/s/config/configsvr_shard_collection_command.cpp
@@ -357,7 +357,7 @@ public:
result << "collectionUUID" << *uuid;
}
- catalogCache->onEpochChange(nss);
+ catalogCache->invalidateCollectionEntry_LINEARIZABLE(nss);
return true;
}
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index 11bce269425..07f8f94daf9 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -466,7 +466,7 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig() {
"Starting post-migration commit refresh on the shard",
"migrationId"_attr = _coordinator->getMigrationId());
- forceShardFilteringMetadataRefresh(_opCtx, getNss(), true);
+ forceShardFilteringMetadataRefresh(_opCtx, getNss());
LOGV2_DEBUG_OPTIONS(4817405,
2,
diff --git a/src/mongo/db/s/migration_util_test.cpp b/src/mongo/db/s/migration_util_test.cpp
index 010f476773c..a2decb63c2d 100644
--- a/src/mongo/db/s/migration_util_test.cpp
+++ b/src/mongo/db/s/migration_util_test.cpp
@@ -522,7 +522,7 @@ TEST_F(SubmitRangeDeletionTaskTest,
_mockCatalogCacheLoader->setDatabaseRefreshReturnValue(kDefaultDatabaseType);
_mockCatalogCacheLoader->setCollectionRefreshReturnValue(
Status(ErrorCodes::NamespaceNotFound, "dummy errmsg"));
- forceShardFilteringMetadataRefresh(opCtx, kNss, true);
+ forceShardFilteringMetadataRefresh(opCtx, kNss);
auto cleanupCompleteFuture = migrationutil::submitRangeDeletionTask(opCtx, deletionTask);
@@ -553,7 +553,7 @@ TEST_F(SubmitRangeDeletionTaskTest, SucceedsIfFilteringMetadataUUIDMatchesTaskUU
_mockCatalogCacheLoader->setChunkRefreshReturnValue(
makeChangedChunks(ChunkVersion(1, 0, kEpoch)));
_mockCatalogClient->setCollections({coll});
- forceShardFilteringMetadataRefresh(opCtx, kNss, true);
+ forceShardFilteringMetadataRefresh(opCtx, kNss);
// The task should have been submitted successfully.
auto cleanupCompleteFuture = migrationutil::submitRangeDeletionTask(opCtx, deletionTask);
@@ -596,7 +596,7 @@ TEST_F(SubmitRangeDeletionTaskTest,
_mockCatalogCacheLoader->setDatabaseRefreshReturnValue(kDefaultDatabaseType);
_mockCatalogCacheLoader->setCollectionRefreshReturnValue(
Status(ErrorCodes::NamespaceNotFound, "dummy errmsg"));
- forceShardFilteringMetadataRefresh(opCtx, kNss, true);
+ forceShardFilteringMetadataRefresh(opCtx, kNss);
auto collectionUUID = createCollectionAndGetUUID(kNss);
auto deletionTask = createDeletionTask(kNss, collectionUUID, 0, 10, _myShardName);
@@ -633,7 +633,7 @@ TEST_F(SubmitRangeDeletionTaskTest,
_mockCatalogCacheLoader->setChunkRefreshReturnValue(
makeChangedChunks(ChunkVersion(1, 0, staleEpoch)));
_mockCatalogClient->setCollections({staleColl});
- forceShardFilteringMetadataRefresh(opCtx, kNss, true);
+ forceShardFilteringMetadataRefresh(opCtx, kNss);
auto collectionUUID = createCollectionAndGetUUID(kNss);
auto deletionTask = createDeletionTask(kNss, collectionUUID, 0, 10, _myShardName);
diff --git a/src/mongo/db/s/set_shard_version_command.cpp b/src/mongo/db/s/set_shard_version_command.cpp
index f8a321aea1a..aba2cd2f632 100644
--- a/src/mongo/db/s/set_shard_version_command.cpp
+++ b/src/mongo/db/s/set_shard_version_command.cpp
@@ -96,7 +96,7 @@ public:
uassertStatusOK(shardingState->canAcceptShardedCommands());
// Steps
- // 1. Set the `authoritative` and `forceRefresh` variables from the command object.
+ // 1. Set the `authoritative` variable from the command object.
//
// 2. Validate all command parameters against the info in our ShardingState, and return an
// error if they do not match.
@@ -117,12 +117,6 @@ public:
LastError::get(client).disable();
const bool authoritative = cmdObj.getBoolField("authoritative");
- // A flag that specifies whether the set shard version catalog refresh
- // is allowed to join an in-progress refresh triggered by an other
- // thread, or whether it's required to either a) trigger its own
- // refresh or b) wait for a refresh to be started after it has entered the
- // getCollectionRoutingInfoWithRefresh function
- const bool forceRefresh = cmdObj.getBoolField("forceRefresh");
// Step 2
@@ -241,11 +235,9 @@ public:
const auto status = [&] {
try {
- // TODO SERVER-48990 remove this if-else: just call onShardVersionMismatch
+ // TODO (SERVER-50812) remove this if-else: just call onShardVersionMismatch
if (requestedVersion == requestedVersion.DROPPED()) {
- // Note: The forceRefresh flag controls whether we make sure to do our own
- // refresh or if we're okay with joining another thread
- forceShardFilteringMetadataRefresh(opCtx, nss, forceRefresh);
+ forceShardFilteringMetadataRefresh(opCtx, nss);
} else {
onShardVersionMismatch(opCtx, nss, requestedVersion);
}
diff --git a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp
index 1e39cd26dc8..317d80f2ec4 100644
--- a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp
+++ b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp
@@ -284,7 +284,7 @@ ScopedShardVersionCriticalSection::ScopedShardVersionCriticalSection(OperationCo
migrationutil::recoverMigrationCoordinations(_opCtx, _nss);
}
- forceShardFilteringMetadataRefresh(_opCtx, _nss, true);
+ forceShardFilteringMetadataRefresh(_opCtx, _nss);
}
ScopedShardVersionCriticalSection::~ScopedShardVersionCriticalSection() {
@@ -334,9 +334,8 @@ CollectionMetadata forceGetCurrentMetadata(OperationContext* opCtx, const Namesp
invariant(shardingState->canAcceptShardedCommands());
try {
- const auto cm =
- uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfoWithRefresh(
- opCtx, nss, true));
+ const auto cm = uassertStatusOK(
+ Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfoWithRefresh(opCtx, nss));
if (!cm.isSharded()) {
return CollectionMetadata();
@@ -354,8 +353,7 @@ CollectionMetadata forceGetCurrentMetadata(OperationContext* opCtx, const Namesp
}
ChunkVersion forceShardFilteringMetadataRefresh(OperationContext* opCtx,
- const NamespaceString& nss,
- bool forceRefreshFromThisThread) {
+ const NamespaceString& nss) {
invariant(!opCtx->lockState()->isLocked());
invariant(!opCtx->getClient()->isInDirectClient());
@@ -366,9 +364,8 @@ ChunkVersion forceShardFilteringMetadataRefresh(OperationContext* opCtx,
auto* const shardingState = ShardingState::get(opCtx);
invariant(shardingState->canAcceptShardedCommands());
- const auto cm =
- uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfoWithRefresh(
- opCtx, nss, forceRefreshFromThisThread));
+ const auto cm = uassertStatusOK(
+ Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfoWithRefresh(opCtx, nss));
if (!cm.isSharded()) {
// The collection is not sharded. Avoid using AutoGetCollection() as it returns the
diff --git a/src/mongo/db/s/shard_filtering_metadata_refresh.h b/src/mongo/db/s/shard_filtering_metadata_refresh.h
index 774a370b9ef..317fab32f37 100644
--- a/src/mongo/db/s/shard_filtering_metadata_refresh.h
+++ b/src/mongo/db/s/shard_filtering_metadata_refresh.h
@@ -79,8 +79,7 @@ CollectionMetadata forceGetCurrentMetadata(OperationContext* opCtx, const Namesp
* called with a lock
*/
ChunkVersion forceShardFilteringMetadataRefresh(OperationContext* opCtx,
- const NamespaceString& nss,
- bool forceRefreshFromThisThread = false);
+ const NamespaceString& nss);
/**
* Should be called when any client request on this shard generates a StaleDbVersion exception.
diff --git a/src/mongo/db/s/shard_key_util.cpp b/src/mongo/db/s/shard_key_util.cpp
index e216f9f682d..9b71b8e1ec9 100644
--- a/src/mongo/db/s/shard_key_util.cpp
+++ b/src/mongo/db/s/shard_key_util.cpp
@@ -230,18 +230,12 @@ void ValidationBehaviorsShardCollection::createShardKeyIndex(
ValidationBehaviorsRefineShardKey::ValidationBehaviorsRefineShardKey(OperationContext* opCtx,
const NamespaceString& nss)
- : _opCtx(opCtx) {
- const auto cm = uassertStatusOK(
- Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, nss));
- uassert(ErrorCodes::NamespaceNotSharded,
- str::stream() << "refineCollectionShardKey namespace " << nss.toString()
- << " is not sharded",
- cm.isSharded());
- const auto minKeyShardId = cm.getMinKeyShardIdWithSimpleCollation();
- _indexShard =
- uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, minKeyShardId));
- _cm = std::move(cm);
-}
+ : _opCtx(opCtx),
+ _cm(uassertStatusOK(
+ Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx,
+ nss))),
+ _indexShard(uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(
+ opCtx, _cm.getMinKeyShardIdWithSimpleCollation()))) {}
std::vector<BSONObj> ValidationBehaviorsRefineShardKey::loadIndexes(
const NamespaceString& nss) const {
@@ -249,8 +243,7 @@ std::vector<BSONObj> ValidationBehaviorsRefineShardKey::loadIndexes(
_opCtx,
ReadPreferenceSetting(ReadPreference::PrimaryOnly),
nss.db().toString(),
- appendShardVersion(BSON("listIndexes" << nss.coll()),
- _cm->getVersion(_indexShard->getId())),
+ appendShardVersion(BSON("listIndexes" << nss.coll()), _cm.getVersion(_indexShard->getId())),
Milliseconds(-1));
if (indexesRes.getStatus().code() != ErrorCodes::NamespaceNotFound) {
return uassertStatusOK(indexesRes).docs;
@@ -266,7 +259,7 @@ void ValidationBehaviorsRefineShardKey::verifyUsefulNonMultiKeyIndex(
"admin",
appendShardVersion(
BSON(kCheckShardingIndexCmdName << nss.ns() << kKeyPatternField << proposedKey),
- _cm->getVersion(_indexShard->getId())),
+ _cm.getVersion(_indexShard->getId())),
Shard::RetryPolicy::kIdempotent));
if (checkShardingIndexRes.commandStatus == ErrorCodes::UnknownError) {
// CheckShardingIndex returns UnknownError if a compatible shard key index cannot be found,
diff --git a/src/mongo/db/s/shard_key_util.h b/src/mongo/db/s/shard_key_util.h
index d6e1802549c..e5ab23683eb 100644
--- a/src/mongo/db/s/shard_key_util.h
+++ b/src/mongo/db/s/shard_key_util.h
@@ -104,8 +104,10 @@ public:
private:
OperationContext* _opCtx;
+
+ ChunkManager _cm;
+
std::shared_ptr<Shard> _indexShard;
- boost::optional<ChunkManager> _cm;
};
/**
diff --git a/src/mongo/s/catalog_cache.cpp b/src/mongo/s/catalog_cache.cpp
index 19846e62b48..d9c2500f2d3 100644
--- a/src/mongo/s/catalog_cache.cpp
+++ b/src/mongo/s/catalog_cache.cpp
@@ -55,6 +55,7 @@
#include "mongo/util/timer.h"
namespace mongo {
+
const OperationContext::Decoration<bool> operationShouldBlockBehindCatalogCacheRefresh =
OperationContext::declareDecoration<bool>();
@@ -68,81 +69,8 @@ namespace {
const int kMaxInconsistentRoutingInfoRefreshAttempts = 3;
const int kDatabaseCacheSize = 10000;
-/**
- * Returns whether two shard versions have a matching epoch.
- */
-bool shardVersionsHaveMatchingEpoch(boost::optional<ChunkVersion> wanted,
- const ChunkVersion& received) {
- return wanted && wanted->epoch() == received.epoch();
-};
-
-/**
- * Given an (optional) initial routing table and a set of changed chunks returned by the catalog
- * cache loader, produces a new routing table with the changes applied.
- *
- * If the collection is no longer sharded returns nullptr. If the epoch has changed, expects that
- * the 'collectionChunksList' contains the full contents of the chunks collection for that namespace
- * so that the routing table can be built from scratch.
- *
- * Throws ConflictingOperationInProgress if the chunk metadata was found to be inconsistent (not
- * containing all the necessary chunks, contains overlaps or chunks' epoch values are not the same
- * as that of the collection). Since this situation may be transient, due to the collection being
- * dropped or having its shard key refined concurrently, the caller must retry the reload up to some
- * configurable number of attempts.
- */
-std::shared_ptr<RoutingTableHistory> refreshCollectionRoutingInfo(
- OperationContext* opCtx,
- const NamespaceString& nss,
- std::shared_ptr<RoutingTableHistory> existingRoutingInfo,
- StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollectionAndChangedChunks) {
- if (swCollectionAndChangedChunks == ErrorCodes::NamespaceNotFound) {
- return nullptr;
- }
- const auto collectionAndChunks = uassertStatusOK(std::move(swCollectionAndChangedChunks));
-
- auto chunkManager = [&] {
- // If we have routing info already and it's for the same collection epoch, we're updating.
- // Otherwise, we're making a whole new routing table.
- if (existingRoutingInfo &&
- existingRoutingInfo->getVersion().epoch() == collectionAndChunks.epoch) {
- if (collectionAndChunks.changedChunks.size() == 1 &&
- collectionAndChunks.changedChunks[0].getVersion() ==
- existingRoutingInfo->getVersion())
- return existingRoutingInfo;
-
- return std::make_shared<RoutingTableHistory>(
- existingRoutingInfo->makeUpdated(std::move(collectionAndChunks.reshardingFields),
- collectionAndChunks.changedChunks));
- }
-
- auto defaultCollator = [&]() -> std::unique_ptr<CollatorInterface> {
- if (!collectionAndChunks.defaultCollation.isEmpty()) {
- // The collation should have been validated upon collection creation
- return uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext())
- ->makeFromBSON(collectionAndChunks.defaultCollation));
- }
- return nullptr;
- }();
-
- return std::make_shared<RoutingTableHistory>(
- RoutingTableHistory::makeNew(nss,
- collectionAndChunks.uuid,
- KeyPattern(collectionAndChunks.shardKeyPattern),
- std::move(defaultCollator),
- collectionAndChunks.shardKeyIsUnique,
- collectionAndChunks.epoch,
- std::move(collectionAndChunks.reshardingFields),
- collectionAndChunks.changedChunks));
- }();
-
- std::set<ShardId> shardIds;
- chunkManager->getAllShardIds(&shardIds);
- for (const auto& shardId : shardIds) {
- uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId));
- }
- return chunkManager;
-}
+const int kCollectionCacheSize = 10000;
} // namespace
@@ -155,7 +83,8 @@ CatalogCache::CatalogCache(ServiceContext* const service, CatalogCacheLoader& ca
options.maxThreads = 6;
return options;
}())),
- _databaseCache(service, *_executor, _cacheLoader) {
+ _databaseCache(service, *_executor, _cacheLoader),
+ _collectionCache(service, *_executor, _cacheLoader) {
_executor->startup();
}
@@ -190,111 +119,89 @@ StatusWith<CachedDatabaseInfo> CatalogCache::getDatabase(OperationContext* opCtx
}
}
-StatusWith<ChunkManager> CatalogCache::getCollectionRoutingInfo(OperationContext* opCtx,
- const NamespaceString& nss) {
- return _getCollectionRoutingInfo(opCtx, nss).statusWithInfo;
-}
-
-CatalogCache::RefreshResult CatalogCache::_getCollectionRoutingInfoWithForcedRefresh(
- OperationContext* opCtx, const NamespaceString& nss) {
- setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, true);
- _createOrGetCollectionEntryAndMarkAsNeedsRefresh(nss);
- return _getCollectionRoutingInfo(opCtx, nss);
-}
-
-CatalogCache::RefreshResult CatalogCache::_getCollectionRoutingInfo(OperationContext* opCtx,
- const NamespaceString& nss) {
- return _getCollectionRoutingInfoAt(opCtx, nss, boost::none);
-}
-
-
-StatusWith<ChunkManager> CatalogCache::getCollectionRoutingInfoAt(OperationContext* opCtx,
- const NamespaceString& nss,
- Timestamp atClusterTime) {
- return _getCollectionRoutingInfoAt(opCtx, nss, atClusterTime).statusWithInfo;
-}
-
-CatalogCache::RefreshResult CatalogCache::_getCollectionRoutingInfoAt(
+StatusWith<ChunkManager> CatalogCache::_getCollectionRoutingInfoAt(
OperationContext* opCtx, const NamespaceString& nss, boost::optional<Timestamp> atClusterTime) {
- invariant(!opCtx->lockState() || !opCtx->lockState()->isLocked(),
- "Do not hold a lock while refreshing the catalog cache. Doing so would potentially "
- "hold the lock during a network call, and can lead to a deadlock as described in "
- "SERVER-37398.");
- // This default value can cause a single unnecessary extra refresh if this thread did do the
- // refresh but the refresh failed, or if the database or collection was not found, but only if
- // the caller is getCollectionRoutingInfoWithRefresh with the parameter
- // forceRefreshFromThisThread set to true
- RefreshAction refreshActionTaken(RefreshAction::kDidNotPerformRefresh);
- while (true) {
+ invariant(
+ !opCtx->lockState() || !opCtx->lockState()->isLocked(),
+ "Do not hold a lock while refreshing the catalog cache. Doing so would potentially hold "
+ "the lock during a network call, and can lead to a deadlock as described in SERVER-37398.");
+
+ try {
const auto swDbInfo = getDatabase(opCtx, nss.db());
+
if (!swDbInfo.isOK()) {
if (swDbInfo == ErrorCodes::NamespaceNotFound) {
LOGV2_FOR_CATALOG_REFRESH(
- 4947102,
+ 4947103,
2,
"Invalidating cached collection entry because its database has been dropped",
"namespace"_attr = nss);
- purgeCollection(nss);
+ invalidateCollectionEntry_LINEARIZABLE(nss);
}
- return {swDbInfo.getStatus(), refreshActionTaken};
+ return swDbInfo.getStatus();
}
const auto dbInfo = std::move(swDbInfo.getValue());
- stdx::unique_lock<Latch> ul(_mutex);
-
- auto collEntry = _createOrGetCollectionEntry(ul, nss);
+ const auto cacheConsistency = gEnableFinerGrainedCatalogCacheRefresh &&
+ !operationShouldBlockBehindCatalogCacheRefresh(opCtx)
+ ? CacheCausalConsistency::kLatestCached
+ : CacheCausalConsistency::kLatestKnown;
- if (collEntry->needsRefresh &&
- (!gEnableFinerGrainedCatalogCacheRefresh || collEntry->epochHasChanged ||
- operationShouldBlockBehindCatalogCacheRefresh(opCtx))) {
+ auto collEntryFuture = _collectionCache.acquireAsync(nss, cacheConsistency);
- operationBlockedBehindCatalogCacheRefresh(opCtx) = true;
+ // If the entry is in the cache return inmediately.
+ if (collEntryFuture.isReady()) {
+ setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, false);
+ return ChunkManager(dbInfo.primaryId(),
+ dbInfo.databaseVersion(),
+ collEntryFuture.get(opCtx),
+ atClusterTime);
+ }
- auto refreshNotification = collEntry->refreshCompletionNotification;
- if (!refreshNotification) {
- refreshNotification = (collEntry->refreshCompletionNotification =
- std::make_shared<Notification<Status>>());
- _scheduleCollectionRefresh(ul, opCtx->getServiceContext(), collEntry, nss, 1);
- refreshActionTaken = RefreshAction::kPerformedRefresh;
- }
+ operationBlockedBehindCatalogCacheRefresh(opCtx) = true;
- // Wait on the notification outside of the mutex
- ul.unlock();
+ size_t acquireTries = 0;
+ Timer t;
- auto refreshStatus = [&]() {
- Timer t;
- ON_BLOCK_EXIT([&] { _stats.totalRefreshWaitTimeMicros.addAndFetch(t.micros()); });
+ while (true) {
+ try {
+ auto collEntry = collEntryFuture.get(opCtx);
+ _stats.totalRefreshWaitTimeMicros.addAndFetch(t.micros());
- try {
- const Milliseconds kReportingInterval{250};
- while (!refreshNotification->waitFor(opCtx, kReportingInterval)) {
- _stats.totalRefreshWaitTimeMicros.addAndFetch(t.micros());
- t.reset();
- }
+ setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, false);
- return refreshNotification->get(opCtx);
- } catch (const DBException& ex) {
+ return ChunkManager(dbInfo.primaryId(),
+ dbInfo.databaseVersion(),
+ std::move(collEntry),
+ atClusterTime);
+ } catch (ExceptionFor<ErrorCodes::ConflictingOperationInProgress>& ex) {
+ _stats.totalRefreshWaitTimeMicros.addAndFetch(t.micros());
+ acquireTries++;
+ if (acquireTries == kMaxInconsistentRoutingInfoRefreshAttempts) {
return ex.toStatus();
}
- }();
-
- if (!refreshStatus.isOK()) {
- return {refreshStatus, refreshActionTaken};
}
- // Once the refresh is complete, loop around to get the latest value
- continue;
+ collEntryFuture = _collectionCache.acquireAsync(nss, cacheConsistency);
+ t.reset();
}
-
- return {ChunkManager(dbInfo.primaryId(),
- dbInfo.databaseVersion(),
- collEntry->routingInfo,
- atClusterTime),
- refreshActionTaken};
+ } catch (const DBException& ex) {
+ return ex.toStatus();
}
}
+StatusWith<ChunkManager> CatalogCache::getCollectionRoutingInfo(OperationContext* opCtx,
+ const NamespaceString& nss) {
+ return _getCollectionRoutingInfoAt(opCtx, nss, boost::none);
+}
+
+StatusWith<ChunkManager> CatalogCache::getCollectionRoutingInfoAt(OperationContext* opCtx,
+ const NamespaceString& nss,
+ Timestamp atClusterTime) {
+ return _getCollectionRoutingInfoAt(opCtx, nss, atClusterTime);
+}
+
StatusWith<CachedDatabaseInfo> CatalogCache::getDatabaseWithRefresh(OperationContext* opCtx,
StringData dbName) {
// TODO SERVER-49724: Make ReadThroughCache support StringData keys
@@ -303,32 +210,20 @@ StatusWith<CachedDatabaseInfo> CatalogCache::getDatabaseWithRefresh(OperationCon
}
StatusWith<ChunkManager> CatalogCache::getCollectionRoutingInfoWithRefresh(
- OperationContext* opCtx, const NamespaceString& nss, bool forceRefreshFromThisThread) {
- auto refreshResult = _getCollectionRoutingInfoWithForcedRefresh(opCtx, nss);
- // We want to ensure that we don't join an in-progress refresh because that
- // could violate causal consistency for this client. We don't need to actually perform the
- // refresh ourselves but we do need the refresh to begin *after* this function is
- // called, so calling it twice is enough regardless of what happens the
- // second time. See SERVER-33954 for reasoning.
- if (forceRefreshFromThisThread &&
- refreshResult.actionTaken == RefreshAction::kDidNotPerformRefresh) {
- refreshResult = _getCollectionRoutingInfoWithForcedRefresh(opCtx, nss);
- }
- return refreshResult.statusWithInfo;
+ OperationContext* opCtx, const NamespaceString& nss) {
+ _collectionCache.invalidate(nss);
+ setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, true);
+ return getCollectionRoutingInfo(opCtx, nss);
}
StatusWith<ChunkManager> CatalogCache::getShardedCollectionRoutingInfoWithRefresh(
OperationContext* opCtx, const NamespaceString& nss) {
- auto swRoutingInfo = _getCollectionRoutingInfoWithForcedRefresh(opCtx, nss).statusWithInfo;
- if (!swRoutingInfo.isOK())
- return swRoutingInfo;
-
- auto cri(std::move(swRoutingInfo.getValue()));
- if (!cri.isSharded())
+ auto routingInfoStatus = getCollectionRoutingInfoWithRefresh(opCtx, nss);
+ if (routingInfoStatus.isOK() && !routingInfoStatus.getValue().isSharded()) {
return {ErrorCodes::NamespaceNotSharded,
str::stream() << "Collection " << nss.ns() << " is not sharded."};
-
- return cri;
+ }
+ return routingInfoStatus;
}
void CatalogCache::onStaleDatabaseVersion(const StringData dbName,
@@ -350,48 +245,49 @@ void CatalogCache::setOperationShouldBlockBehindCatalogCacheRefresh(OperationCon
if (gEnableFinerGrainedCatalogCacheRefresh) {
operationShouldBlockBehindCatalogCacheRefresh(opCtx) = shouldBlock;
}
-};
+}
void CatalogCache::invalidateShardOrEntireCollectionEntryForShardedCollection(
- OperationContext* opCtx,
const NamespaceString& nss,
- boost::optional<ChunkVersion> wantedVersion,
- const ChunkVersion& receivedVersion,
- ShardId shardId) {
- if (shardVersionsHaveMatchingEpoch(wantedVersion, receivedVersion)) {
- _createOrGetCollectionEntryAndMarkShardStale(nss, shardId);
- } else {
- _createOrGetCollectionEntryAndMarkEpochStale(nss);
+ const boost::optional<ChunkVersion>& wantedVersion,
+ const ShardId& shardId) {
+ _stats.countStaleConfigErrors.addAndFetch(1);
+
+ auto collectionEntry = _collectionCache.peekLatestCached(nss);
+ if (collectionEntry && collectionEntry->optRt) {
+ collectionEntry->optRt->setShardStale(shardId);
}
-};
-void CatalogCache::onEpochChange(const NamespaceString& nss) {
- _createOrGetCollectionEntryAndMarkEpochStale(nss);
-};
+ if (wantedVersion) {
+ _collectionCache.advanceTimeInStore(
+ nss, ComparableChunkVersion::makeComparableChunkVersion(*wantedVersion));
+ } else {
+ _collectionCache.advanceTimeInStore(
+ nss, ComparableChunkVersion::makeComparableChunkVersionForForcedRefresh());
+ }
+}
void CatalogCache::checkEpochOrThrow(const NamespaceString& nss,
- ChunkVersion targetCollectionVersion,
- const ShardId& shardId) const {
- stdx::lock_guard<Latch> lg(_mutex);
- const auto itDb = _collectionsByDb.find(nss.db());
+ const ChunkVersion& targetCollectionVersion,
+ const ShardId& shardId) {
uassert(StaleConfigInfo(nss, targetCollectionVersion, boost::none, shardId),
str::stream() << "could not act as router for " << nss.ns()
<< ", no entry for database " << nss.db(),
- itDb != _collectionsByDb.end());
+ _databaseCache.peekLatestCached(nss.db().toString()));
- auto itColl = itDb->second.find(nss.ns());
+ auto collectionValueHandle = _collectionCache.peekLatestCached(nss);
uassert(StaleConfigInfo(nss, targetCollectionVersion, boost::none, shardId),
str::stream() << "could not act as router for " << nss.ns()
<< ", no entry for collection.",
- itColl != itDb->second.end());
+ collectionValueHandle);
uassert(StaleConfigInfo(nss, targetCollectionVersion, boost::none, shardId),
str::stream() << "could not act as router for " << nss.ns() << ", wanted "
<< targetCollectionVersion.toString()
<< ", but found the collection was unsharded",
- itColl->second->routingInfo);
+ collectionValueHandle->optRt);
- auto foundVersion = itColl->second->routingInfo->getVersion();
+ auto foundVersion = collectionValueHandle->optRt->getVersion();
uassert(StaleConfigInfo(nss, targetCollectionVersion, foundVersion, shardId),
str::stream() << "could not act as router for " << nss.ns() << ", wanted "
<< targetCollectionVersion.toString() << ", but found "
@@ -399,11 +295,6 @@ void CatalogCache::checkEpochOrThrow(const NamespaceString& nss,
foundVersion.epoch() == targetCollectionVersion.epoch());
}
-void CatalogCache::invalidateShardForShardedCollection(const NamespaceString& nss,
- const ShardId& staleShardId) {
- _createOrGetCollectionEntryAndMarkShardStale(nss, staleShardId);
-}
-
void CatalogCache::invalidateEntriesThatReferenceShard(const ShardId& shardId) {
LOGV2_DEBUG(4997600,
1,
@@ -413,32 +304,24 @@ void CatalogCache::invalidateEntriesThatReferenceShard(const ShardId& shardId) {
_databaseCache.invalidateCachedValueIf(
[&](const DatabaseType& dbt) { return dbt.getPrimary() == shardId; });
- stdx::lock_guard<Latch> lg(_mutex);
-
// Invalidate collections which contain data on this shard.
- for (const auto& [db, collInfoMap] : _collectionsByDb) {
- for (const auto& [collNs, collRoutingInfoEntry] : collInfoMap) {
- if (!collRoutingInfoEntry->needsRefresh && collRoutingInfoEntry->routingInfo) {
- // The set of shards on which this collection contains chunks.
- std::set<ShardId> shardsOwningDataForCollection;
- collRoutingInfoEntry->routingInfo->getAllShardIds(&shardsOwningDataForCollection);
-
- if (shardsOwningDataForCollection.find(shardId) !=
- shardsOwningDataForCollection.end()) {
- LOGV2_DEBUG(22647,
- 3,
- "Invalidating cached collection {namespace} that has data "
- "on shard {shardId}",
- "Invalidating cached collection",
- "namespace"_attr = collNs,
- "shardId"_attr = shardId);
-
- collRoutingInfoEntry->needsRefresh = true;
- collRoutingInfoEntry->routingInfo->setShardStale(shardId);
- }
- }
- }
- }
+ _collectionCache.invalidateCachedValueIf([&](const OptionalRoutingTableHistory& ort) {
+ if (!ort.optRt)
+ return false;
+ const auto& rt = *ort.optRt;
+
+ std::set<ShardId> shardIds;
+ rt.getAllShardIds(&shardIds);
+
+ LOGV2_DEBUG(22647,
+ 3,
+ "Invalidating cached collection {namespace} that has data "
+ "on shard {shardId}",
+ "Invalidating cached collection",
+ "namespace"_attr = rt.nss(),
+ "shardId"_attr = shardId);
+ return shardIds.find(shardId) != shardIds.end();
+ });
LOGV2(22648,
"Finished invalidating databases and collections with data on shard: {shardId}",
@@ -446,46 +329,28 @@ void CatalogCache::invalidateEntriesThatReferenceShard(const ShardId& shardId) {
"shardId"_attr = shardId);
}
-void CatalogCache::purgeCollection(const NamespaceString& nss) {
- stdx::lock_guard<Latch> lg(_mutex);
-
- auto itDb = _collectionsByDb.find(nss.db());
- if (itDb == _collectionsByDb.end()) {
- return;
- }
-
- itDb->second.erase(nss.ns());
-}
-
void CatalogCache::purgeDatabase(StringData dbName) {
_databaseCache.invalidate(dbName.toString());
- stdx::lock_guard<Latch> lg(_mutex);
- _collectionsByDb.erase(dbName);
+ _collectionCache.invalidateKeyIf(
+ [&](const NamespaceString& nss) { return nss.db() == dbName; });
}
void CatalogCache::purgeAllDatabases() {
_databaseCache.invalidateAll();
- stdx::lock_guard<Latch> lg(_mutex);
- _collectionsByDb.clear();
+ _collectionCache.invalidateAll();
}
void CatalogCache::report(BSONObjBuilder* builder) const {
BSONObjBuilder cacheStatsBuilder(builder->subobjStart("catalogCache"));
- size_t numDatabaseEntries;
- size_t numCollectionEntries{0};
- {
- numDatabaseEntries = _databaseCache.getCacheInfo().size();
- stdx::lock_guard<Latch> ul(_mutex);
- for (const auto& entry : _collectionsByDb) {
- numCollectionEntries += entry.second.size();
- }
- }
+ const size_t numDatabaseEntries = _databaseCache.getCacheInfo().size();
+ const size_t numCollectionEntries = _collectionCache.getCacheInfo().size();
cacheStatsBuilder.append("numDatabaseEntries", static_cast<long long>(numDatabaseEntries));
cacheStatsBuilder.append("numCollectionEntries", static_cast<long long>(numCollectionEntries));
_stats.report(&cacheStatsBuilder);
+ _collectionCache.reportStats(&cacheStatsBuilder);
}
void CatalogCache::checkAndRecordOperationBlockedByRefresh(OperationContext* opCtx,
@@ -519,188 +384,8 @@ void CatalogCache::checkAndRecordOperationBlockedByRefresh(OperationContext* opC
}
}
-void CatalogCache::_scheduleCollectionRefresh(WithLock lk,
- ServiceContext* service,
- std::shared_ptr<CollectionRoutingInfoEntry> collEntry,
- NamespaceString const& nss,
- int refreshAttempt) {
- const auto existingRoutingInfo = collEntry->routingInfo;
-
- // If we have an existing chunk manager, the refresh is considered "incremental", regardless of
- // how many chunks are in the differential
- const bool isIncremental(existingRoutingInfo);
-
- if (isIncremental) {
- _stats.numActiveIncrementalRefreshes.addAndFetch(1);
- _stats.countIncrementalRefreshesStarted.addAndFetch(1);
- } else {
- _stats.numActiveFullRefreshes.addAndFetch(1);
- _stats.countFullRefreshesStarted.addAndFetch(1);
- }
-
- // Invoked when one iteration of getChunksSince has completed, whether with success or error
- const auto onRefreshCompleted = [this, t = Timer(), nss, isIncremental, existingRoutingInfo](
- const Status& status,
- RoutingTableHistory* routingInfoAfterRefresh) {
- if (isIncremental) {
- _stats.numActiveIncrementalRefreshes.subtractAndFetch(1);
- } else {
- _stats.numActiveFullRefreshes.subtractAndFetch(1);
- }
-
- if (!status.isOK()) {
- _stats.countFailedRefreshes.addAndFetch(1);
-
- LOGV2_OPTIONS(24103,
- {logv2::LogComponent::kShardingCatalogRefresh},
- "Error refreshing cached collection {namespace}; Took {duration} and "
- "failed due to {error}",
- "Error refreshing cached collection",
- "namespace"_attr = nss,
- "duration"_attr = Milliseconds(t.millis()),
- "error"_attr = redact(status));
- } else if (routingInfoAfterRefresh) {
- const int logLevel =
- (!existingRoutingInfo ||
- (existingRoutingInfo &&
- routingInfoAfterRefresh->getVersion() != existingRoutingInfo->getVersion()))
- ? 0
- : 1;
- LOGV2_FOR_CATALOG_REFRESH(
- 24104,
- logLevel,
- "Refreshed cached collection {namespace} to version {newVersion} from version "
- "{oldVersion}. Took {duration}",
- "Refreshed cached collection",
- "namespace"_attr = nss,
- "newVersion"_attr = routingInfoAfterRefresh->getVersion(),
- "oldVersion"_attr =
- (existingRoutingInfo
- ? (" from version " + existingRoutingInfo->getVersion().toString())
- : ""),
- "duration"_attr = Milliseconds(t.millis()));
- } else {
- LOGV2_OPTIONS(24105,
- {logv2::LogComponent::kShardingCatalogRefresh},
- "Collection {namespace} was found to be unsharded after refresh that "
- "took {duration}",
- "Collection has found to be unsharded after refresh",
- "namespace"_attr = nss,
- "duration"_attr = Milliseconds(t.millis()));
- }
- };
-
- // Invoked if getChunksSince resulted in error or threw an exception
- const auto onRefreshFailed =
- [ this, service, collEntry, nss, refreshAttempt,
- onRefreshCompleted ](WithLock lk, const Status& status) noexcept {
- onRefreshCompleted(status, nullptr);
-
- // It is possible that the metadata is being changed concurrently, so retry the
- // refresh again
- if (status == ErrorCodes::ConflictingOperationInProgress &&
- refreshAttempt < kMaxInconsistentRoutingInfoRefreshAttempts) {
- _scheduleCollectionRefresh(lk, service, collEntry, nss, refreshAttempt + 1);
- } else {
- // Leave needsRefresh to true so that any subsequent get attempts will kick off
- // another round of refresh
- collEntry->refreshCompletionNotification->set(status);
- collEntry->refreshCompletionNotification = nullptr;
- }
- };
-
- const auto refreshCallback =
- [ this, service, collEntry, nss, existingRoutingInfo, onRefreshFailed, onRefreshCompleted ](
- StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept {
-
- ThreadClient tc("CatalogCache::collectionRefresh", service);
- auto opCtx = tc->makeOperationContext();
-
- std::shared_ptr<RoutingTableHistory> newRoutingInfo;
- try {
- newRoutingInfo = refreshCollectionRoutingInfo(
- opCtx.get(), nss, std::move(existingRoutingInfo), std::move(swCollAndChunks));
-
- onRefreshCompleted(Status::OK(), newRoutingInfo.get());
- } catch (const DBException& ex) {
- stdx::lock_guard<Latch> lg(_mutex);
- onRefreshFailed(lg, ex.toStatus());
- return;
- }
-
- stdx::lock_guard<Latch> lg(_mutex);
-
- collEntry->epochHasChanged = false;
- collEntry->needsRefresh = false;
- collEntry->refreshCompletionNotification->set(Status::OK());
- collEntry->refreshCompletionNotification = nullptr;
-
- setOperationShouldBlockBehindCatalogCacheRefresh(opCtx.get(), false);
-
- // TODO(SERVER-49876): remove clang-tidy NOLINT comments.
- if (existingRoutingInfo && newRoutingInfo && // NOLINT(bugprone-use-after-move)
- existingRoutingInfo->getVersion() == // NOLINT(bugprone-use-after-move)
- newRoutingInfo->getVersion()) { // NOLINT(bugprone-use-after-move)
- // If the routingInfo hasn't changed, we need to manually reset stale shards.
- newRoutingInfo->setAllShardsRefreshed();
- }
-
- collEntry->routingInfo = std::move(newRoutingInfo);
- };
-
- const ChunkVersion startingCollectionVersion =
- (existingRoutingInfo ? existingRoutingInfo->getVersion() : ChunkVersion::UNSHARDED());
-
- LOGV2_FOR_CATALOG_REFRESH(
- 24106,
- 1,
- "Refreshing cached collection {namespace} with version {currentCollectionVersion}",
- "namespace"_attr = nss,
- "currentCollectionVersion"_attr = startingCollectionVersion);
-
- _cacheLoader.getChunksSince(nss, startingCollectionVersion)
- .thenRunOn(_executor)
- .getAsync(refreshCallback);
-
- // The routing info for this collection shouldn't change, as other threads may try to use the
- // CatalogCache while we are waiting for the refresh to complete.
- invariant(collEntry->routingInfo.get() == existingRoutingInfo.get());
-}
-
-void CatalogCache::_createOrGetCollectionEntryAndMarkEpochStale(const NamespaceString& nss) {
- stdx::lock_guard<Latch> lg(_mutex);
- auto collRoutingInfoEntry = _createOrGetCollectionEntry(lg, nss);
- collRoutingInfoEntry->needsRefresh = true;
- collRoutingInfoEntry->epochHasChanged = true;
-}
-
-void CatalogCache::_createOrGetCollectionEntryAndMarkShardStale(const NamespaceString& nss,
- const ShardId& staleShardId) {
- stdx::lock_guard<Latch> lg(_mutex);
- auto collRoutingInfoEntry = _createOrGetCollectionEntry(lg, nss);
- collRoutingInfoEntry->needsRefresh = true;
- if (collRoutingInfoEntry->routingInfo) {
- collRoutingInfoEntry->routingInfo->setShardStale(staleShardId);
- }
-}
-
-void CatalogCache::_createOrGetCollectionEntryAndMarkAsNeedsRefresh(const NamespaceString& nss) {
- stdx::lock_guard<Latch> lg(_mutex);
- auto collRoutingInfoEntry = _createOrGetCollectionEntry(lg, nss);
- collRoutingInfoEntry->needsRefresh = true;
-}
-
-std::shared_ptr<CatalogCache::CollectionRoutingInfoEntry> CatalogCache::_createOrGetCollectionEntry(
- WithLock wl, const NamespaceString& nss) {
- auto& collectionsForDb = _collectionsByDb[nss.db()];
- if (!collectionsForDb.contains(nss.ns())) {
- // TODO SERVER-46199: ensure collections cache size is capped
- // currently no routine except for dropDatabase is removing cached collection entries and
- // the cache for a specific DB can grow indefinitely.
- collectionsForDb[nss.ns()] = std::make_shared<CollectionRoutingInfoEntry>();
- }
-
- return collectionsForDb[nss.ns()];
+void CatalogCache::invalidateCollectionEntry_LINEARIZABLE(const NamespaceString& nss) {
+ _collectionCache.invalidate(nss);
}
void CatalogCache::Stats::report(BSONObjBuilder* builder) const {
@@ -708,14 +393,6 @@ void CatalogCache::Stats::report(BSONObjBuilder* builder) const {
builder->append("totalRefreshWaitTimeMicros", totalRefreshWaitTimeMicros.load());
- builder->append("numActiveIncrementalRefreshes", numActiveIncrementalRefreshes.load());
- builder->append("countIncrementalRefreshesStarted", countIncrementalRefreshesStarted.load());
-
- builder->append("numActiveFullRefreshes", numActiveFullRefreshes.load());
- builder->append("countFullRefreshesStarted", countFullRefreshesStarted.load());
-
- builder->append("countFailedRefreshes", countFailedRefreshes.load());
-
if (isMongos()) {
BSONObjBuilder operationsBlockedByRefreshBuilder(
builder->subobjStart("operationsBlockedByRefresh"));
@@ -756,7 +433,6 @@ CatalogCache::DatabaseCache::LookupResult CatalogCache::DatabaseCache::_lookupDa
OperationContext* opCtx,
const std::string& dbName,
const ComparableDatabaseVersion& previousDbVersion) {
-
// TODO (SERVER-34164): Track and increment stats for database refreshes
LOGV2_FOR_CATALOG_REFRESH(24102, 2, "Refreshing cached database entry", "db"_attr = dbName);
@@ -788,73 +464,199 @@ CatalogCache::DatabaseCache::LookupResult CatalogCache::DatabaseCache::_lookupDa
}
}
-AtomicWord<uint64_t> ComparableDatabaseVersion::_localSequenceNumSource{1ULL};
+CatalogCache::CollectionCache::CollectionCache(ServiceContext* service,
+ ThreadPoolInterface& threadPool,
+ CatalogCacheLoader& catalogCacheLoader)
+ : ReadThroughCache(_mutex,
+ service,
+ threadPool,
+ [this](OperationContext* opCtx,
+ const NamespaceString& nss,
+ const ValueHandle& collectionHistory,
+ const ComparableChunkVersion& previousChunkVersion) {
+ return _lookupCollection(
+ opCtx, nss, collectionHistory, previousChunkVersion);
+ },
+ kCollectionCacheSize),
+ _catalogCacheLoader(catalogCacheLoader) {}
-ComparableDatabaseVersion ComparableDatabaseVersion::makeComparableDatabaseVersion(
- const DatabaseVersion& version) {
- return ComparableDatabaseVersion(version, _localSequenceNumSource.fetchAndAdd(1));
+void CatalogCache::CollectionCache::reportStats(BSONObjBuilder* builder) const {
+ _stats.report(builder);
}
-const DatabaseVersion& ComparableDatabaseVersion::getVersion() const {
- return _dbVersion;
+void CatalogCache::CollectionCache::_updateRefreshesStats(const bool isIncremental,
+ const bool add) {
+ if (add) {
+ if (isIncremental) {
+ _stats.numActiveIncrementalRefreshes.addAndFetch(1);
+ _stats.countIncrementalRefreshesStarted.addAndFetch(1);
+ } else {
+ _stats.numActiveFullRefreshes.addAndFetch(1);
+ _stats.countFullRefreshesStarted.addAndFetch(1);
+ }
+ } else {
+ if (isIncremental) {
+ _stats.numActiveIncrementalRefreshes.subtractAndFetch(1);
+ } else {
+ _stats.numActiveFullRefreshes.subtractAndFetch(1);
+ }
+ }
}
-uint64_t ComparableDatabaseVersion::getLocalSequenceNum() const {
- return _localSequenceNum;
-}
+void CatalogCache::CollectionCache::Stats::report(BSONObjBuilder* builder) const {
+ builder->append("numActiveIncrementalRefreshes", numActiveIncrementalRefreshes.load());
+ builder->append("countIncrementalRefreshesStarted", countIncrementalRefreshesStarted.load());
-BSONObj ComparableDatabaseVersion::toBSON() const {
- BSONObjBuilder builder;
- _dbVersion.getUuid().appendToBuilder(&builder, "uuid");
- builder.append("lastMod", _dbVersion.getLastMod());
- builder.append("localSequenceNum", std::to_string(_localSequenceNum));
- return builder.obj();
-}
+ builder->append("numActiveFullRefreshes", numActiveFullRefreshes.load());
+ builder->append("countFullRefreshesStarted", countFullRefreshesStarted.load());
-std::string ComparableDatabaseVersion::toString() const {
- return toBSON().toString();
+ builder->append("countFailedRefreshes", countFailedRefreshes.load());
}
+CatalogCache::CollectionCache::LookupResult CatalogCache::CollectionCache::_lookupCollection(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const RoutingTableHistoryValueHandle& existingHistory,
+ const ComparableChunkVersion& previousVersion) {
+ const bool isIncremental(existingHistory && existingHistory->optRt);
+ _updateRefreshesStats(isIncremental, true);
-CachedDatabaseInfo::CachedDatabaseInfo(DatabaseType dbt, std::shared_ptr<Shard> primaryShard)
- : _dbt(std::move(dbt)), _primaryShard(std::move(primaryShard)) {}
+ Timer t{};
+ try {
+ auto lookupVersion =
+ isIncremental ? existingHistory->optRt->getVersion() : ChunkVersion::UNSHARDED();
-const ShardId& CachedDatabaseInfo::primaryId() const {
- return _dbt.getPrimary();
+ LOGV2_FOR_CATALOG_REFRESH(4619900,
+ 1,
+ "Refreshing cached collection",
+ "namespace"_attr = nss,
+ "currentVersion"_attr = previousVersion);
+
+ auto collectionAndChunks = _catalogCacheLoader.getChunksSince(nss, lookupVersion).get();
+
+ auto newRoutingHistory = [&] {
+ // If we have routing info already and it's for the same collection epoch, we're
+ // updating. Otherwise, we're making a whole new routing table.
+ if (isIncremental &&
+ existingHistory->optRt->getVersion().epoch() == collectionAndChunks.epoch) {
+ return existingHistory->optRt->makeUpdated(collectionAndChunks.reshardingFields,
+ collectionAndChunks.changedChunks);
+ }
+
+ auto defaultCollator = [&]() -> std::unique_ptr<CollatorInterface> {
+ if (!collectionAndChunks.defaultCollation.isEmpty()) {
+ // The collation should have been validated upon collection creation
+ return uassertStatusOK(
+ CollatorFactoryInterface::get(opCtx->getServiceContext())
+ ->makeFromBSON(collectionAndChunks.defaultCollation));
+ }
+ return nullptr;
+ }();
+
+ return RoutingTableHistory::makeNew(nss,
+ collectionAndChunks.uuid,
+ KeyPattern(collectionAndChunks.shardKeyPattern),
+ std::move(defaultCollator),
+ collectionAndChunks.shardKeyIsUnique,
+ collectionAndChunks.epoch,
+ std::move(collectionAndChunks.reshardingFields),
+ collectionAndChunks.changedChunks);
+ }();
+
+ newRoutingHistory.setAllShardsRefreshed();
+
+ // Check that the shards all match with what is on the config server
+ std::set<ShardId> shardIds;
+ newRoutingHistory.getAllShardIds(&shardIds);
+ for (const auto& shardId : shardIds) {
+ uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId));
+ }
+
+ const auto newVersion =
+ ComparableChunkVersion::makeComparableChunkVersion(newRoutingHistory.getVersion());
+
+ LOGV2_FOR_CATALOG_REFRESH(4619901,
+ isIncremental || newVersion != previousVersion ? 0 : 1,
+ "Refreshed cached collection",
+ "namespace"_attr = nss,
+ "newVersion"_attr = newVersion,
+ "oldVersion"_attr = previousVersion,
+ "duration"_attr = Milliseconds(t.millis()));
+ _updateRefreshesStats(isIncremental, false);
+
+ return LookupResult(OptionalRoutingTableHistory(std::move(newRoutingHistory)), newVersion);
+ } catch (const DBException& ex) {
+ _stats.countFailedRefreshes.addAndFetch(1);
+ _updateRefreshesStats(isIncremental, false);
+
+ if (ex.code() == ErrorCodes::NamespaceNotFound) {
+ LOGV2_FOR_CATALOG_REFRESH(4619902,
+ 0,
+ "Collection has found to be unsharded after refresh",
+ "namespace"_attr = nss,
+ "duration"_attr = Milliseconds(t.millis()));
+
+ return LookupResult(
+ OptionalRoutingTableHistory(),
+ ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion::UNSHARDED()));
+ }
+
+ LOGV2_FOR_CATALOG_REFRESH(4619903,
+ 0,
+ "Error refreshing cached collection",
+ "namespace"_attr = nss,
+ "duration"_attr = Milliseconds(t.millis()),
+ "error"_attr = redact(ex));
+
+ throw;
+ }
}
-bool CachedDatabaseInfo::shardingEnabled() const {
- return _dbt.getSharded();
+AtomicWord<uint64_t> ComparableDatabaseVersion::_uuidDisambiguatingSequenceNumSource{1ULL};
+
+ComparableDatabaseVersion ComparableDatabaseVersion::makeComparableDatabaseVersion(
+ const DatabaseVersion& version) {
+ return ComparableDatabaseVersion(version, _uuidDisambiguatingSequenceNumSource.fetchAndAdd(1));
}
-DatabaseVersion CachedDatabaseInfo::databaseVersion() const {
- return _dbt.getVersion();
+std::string ComparableDatabaseVersion::toString() const {
+ return str::stream() << (_dbVersion ? _dbVersion->toBSON().toString() : "NONE") << "|"
+ << _uuidDisambiguatingSequenceNum;
}
-AtomicWord<uint64_t> ComparableChunkVersion::_localSequenceNumSource{1ULL};
+bool ComparableDatabaseVersion::operator==(const ComparableDatabaseVersion& other) const {
+ if (!_dbVersion && !other._dbVersion)
+ return true; // Default constructed value
+ if (_dbVersion.is_initialized() != other._dbVersion.is_initialized())
+ return false; // One side is default constructed value
-ComparableChunkVersion ComparableChunkVersion::makeComparableChunkVersion(
- const ChunkVersion& version) {
- return ComparableChunkVersion(version, _localSequenceNumSource.fetchAndAdd(1));
+ return sameUuid(other) && (_dbVersion->getLastMod() == other._dbVersion->getLastMod());
}
-const ChunkVersion& ComparableChunkVersion::getVersion() const {
- return _chunkVersion;
+bool ComparableDatabaseVersion::operator<(const ComparableDatabaseVersion& other) const {
+ if (!_dbVersion && !other._dbVersion)
+ return false; // Default constructed value
+
+ if (_dbVersion && other._dbVersion && sameUuid(other)) {
+ return _dbVersion->getLastMod() < other._dbVersion->getLastMod();
+ } else {
+ return _uuidDisambiguatingSequenceNum < other._uuidDisambiguatingSequenceNum;
+ }
}
-uint64_t ComparableChunkVersion::getLocalSequenceNum() const {
- return _localSequenceNum;
+CachedDatabaseInfo::CachedDatabaseInfo(DatabaseType dbt, std::shared_ptr<Shard> primaryShard)
+ : _dbt(std::move(dbt)), _primaryShard(std::move(primaryShard)) {}
+
+const ShardId& CachedDatabaseInfo::primaryId() const {
+ return _dbt.getPrimary();
}
-BSONObj ComparableChunkVersion::toBSON() const {
- BSONObjBuilder builder;
- _chunkVersion.appendToCommand(&builder);
- builder.append("localSequenceNum", std::to_string(_localSequenceNum));
- return builder.obj();
+bool CachedDatabaseInfo::shardingEnabled() const {
+ return _dbt.getSharded();
}
-std::string ComparableChunkVersion::toString() const {
- return toBSON().toString();
+DatabaseVersion CachedDatabaseInfo::databaseVersion() const {
+ return _dbt.getVersion();
}
} // namespace mongo
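The ComparableDatabaseVersion operators added above (and the ComparableChunkVersion they mirror)
rely on a node-local sequence number to keep versions from different UUIDs or epochs totally
ordered. A standalone, illustrative sketch of that trick follows; the names are made up and do not
correspond to the MongoDB classes.

// Versions from the same "generation" (UUID or epoch) compare by their real counters;
// versions from different generations fall back to a node-local, monotonically
// increasing sequence number taken at construction time, so any two instances are
// still totally ordered and the more recently constructed one wins.
#include <atomic>
#include <cstdint>

class ComparableGenerationVersion {
public:
    static ComparableGenerationVersion make(uint64_t generation, uint64_t counter) {
        return {generation, counter, _seqSource.fetch_add(1)};
    }

    bool operator<(const ComparableGenerationVersion& other) const {
        if (_generation == other._generation)
            return _counter < other._counter;  // same generation: compare the real version
        return _seq < other._seq;              // different generation: newer object is greater
    }

private:
    ComparableGenerationVersion(uint64_t generation, uint64_t counter, uint64_t seq)
        : _generation(generation), _counter(counter), _seq(seq) {}

    uint64_t _generation;  // stands in for the DatabaseVersion UUID / ChunkVersion epoch
    uint64_t _counter;     // stands in for lastMod / (majorVersion, minorVersion)
    uint64_t _seq;         // node-local disambiguating sequence number
    static std::atomic<uint64_t> _seqSource;
};

std::atomic<uint64_t> ComparableGenerationVersion::_seqSource{1};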
diff --git a/src/mongo/s/catalog_cache.h b/src/mongo/s/catalog_cache.h
index a957189183a..796b9e10136 100644
--- a/src/mongo/s/catalog_cache.h
+++ b/src/mongo/s/catalog_cache.h
@@ -45,8 +45,6 @@
namespace mongo {
class BSONObjBuilder;
-class CachedDatabaseInfo;
-class OperationContext;
static constexpr int kMaxNumStaleVersionRetries = 10;
@@ -64,21 +62,21 @@ extern const OperationContext::Decoration<bool> operationShouldBlockBehindCatalo
* in fact is impossible to compare two different DatabaseVersion that have different UUIDs.
*
* This class wrap a DatabaseVersion object to make it always comparable by timestamping it with a
- * node-local sequence number (_dbVersionLocalSequence).
+ * node-local sequence number (_uuidDisambiguatingSequenceNum).
*
* This class class should go away once a cluster-wide comparable DatabaseVersion will be
* implemented.
*/
class ComparableDatabaseVersion {
public:
- /*
- * Create a ComparableDatabaseVersion that wraps the given DatabaseVersion.
- * Each object created through this method will have a local sequence number grater then the
+ /**
+ * Creates a ComparableDatabaseVersion that wraps the given DatabaseVersion.
+ * Each object created through this method will have a local sequence number greater than the
* previously created ones.
*/
static ComparableDatabaseVersion makeComparableDatabaseVersion(const DatabaseVersion& version);
- /*
+ /**
* Empty constructor needed by the ReadThroughCache.
*
* Instances created through this constructor will be always less then the ones created through
@@ -86,39 +84,28 @@ public:
*/
ComparableDatabaseVersion() = default;
- const DatabaseVersion& getVersion() const;
-
- uint64_t getLocalSequenceNum() const;
-
- BSONObj toBSON() const;
+ const DatabaseVersion& getVersion() const {
+ return *_dbVersion;
+ }
std::string toString() const;
- // Rerturns true if the two versions have the same UUID
bool sameUuid(const ComparableDatabaseVersion& other) const {
- return _dbVersion.getUuid() == other._dbVersion.getUuid();
+ return _dbVersion->getUuid() == other._dbVersion->getUuid();
}
- bool operator==(const ComparableDatabaseVersion& other) const {
- return sameUuid(other) && (_dbVersion.getLastMod() == other._dbVersion.getLastMod());
- }
+ bool operator==(const ComparableDatabaseVersion& other) const;
bool operator!=(const ComparableDatabaseVersion& other) const {
return !(*this == other);
}
- /*
- * In the case the two compared instances have different UUIDs the most recently created one
- * will be grater, otherwise the comparision will be driven by the lastMod field of the
- * underlying DatabaseVersion.
+ /**
+ * In case the two compared instances have different UUIDs, the most recently created one will
+ * be greater, otherwise the comparison will be driven by the lastMod field of the underlying
+ * DatabaseVersion.
*/
- bool operator<(const ComparableDatabaseVersion& other) const {
- if (sameUuid(other)) {
- return _dbVersion.getLastMod() < other._dbVersion.getLastMod();
- } else {
- return _localSequenceNum < other._localSequenceNum;
- }
- }
+ bool operator<(const ComparableDatabaseVersion& other) const;
bool operator>(const ComparableDatabaseVersion& other) const {
return other < *this;
@@ -133,92 +120,18 @@ public:
}
private:
- static AtomicWord<uint64_t> _localSequenceNumSource;
+ static AtomicWord<uint64_t> _uuidDisambiguatingSequenceNumSource;
+
+ ComparableDatabaseVersion(const DatabaseVersion& version,
+ uint64_t uuidDisambiguatingSequenceNum)
+ : _dbVersion(version), _uuidDisambiguatingSequenceNum(uuidDisambiguatingSequenceNum) {}
- ComparableDatabaseVersion(const DatabaseVersion& version, uint64_t localSequenceNum)
- : _dbVersion(version), _localSequenceNum(localSequenceNum) {}
+ boost::optional<DatabaseVersion> _dbVersion;
- DatabaseVersion _dbVersion;
// Locally incremented sequence number that allows to compare two database versions with
// different UUIDs. Each new comparableDatabaseVersion will have a greater sequence number then
// the ones created before.
- uint64_t _localSequenceNum{0};
-};
-
-/**
- * Constructed to be used exclusively by the CatalogCache as a vector clock (Time) to drive
- * CollectionCache's lookups.
- *
- * The ChunkVersion class contains an non comparable epoch, which makes impossible to compare two
- * ChunkVersions when their epochs's differ.
- *
- * This class wraps a ChunkVersion object with a node-local sequence number (_localSequenceNum) that
- * allows the comparision.
- *
- * This class should go away once a cluster-wide comparable ChunkVersion is implemented.
- */
-class ComparableChunkVersion {
-public:
- static ComparableChunkVersion makeComparableChunkVersion(const ChunkVersion& version);
-
- ComparableChunkVersion() = default;
-
- const ChunkVersion& getVersion() const;
-
- uint64_t getLocalSequenceNum() const;
-
- BSONObj toBSON() const;
-
- std::string toString() const;
-
- bool sameEpoch(const ComparableChunkVersion& other) const {
- return _chunkVersion.epoch() == other._chunkVersion.epoch();
- }
-
- bool operator==(const ComparableChunkVersion& other) const {
- return sameEpoch(other) &&
- (_chunkVersion.majorVersion() == other._chunkVersion.majorVersion() &&
- _chunkVersion.minorVersion() == other._chunkVersion.minorVersion());
- }
-
- bool operator!=(const ComparableChunkVersion& other) const {
- return !(*this == other);
- }
-
- bool operator<(const ComparableChunkVersion& other) const {
- if (sameEpoch(other)) {
- return _chunkVersion.majorVersion() < other._chunkVersion.majorVersion() ||
- (_chunkVersion.majorVersion() == other._chunkVersion.majorVersion() &&
- _chunkVersion.minorVersion() < other._chunkVersion.minorVersion());
- } else {
- return _localSequenceNum < other._localSequenceNum;
- }
- }
-
- bool operator>(const ComparableChunkVersion& other) const {
- return other < *this;
- }
-
- bool operator<=(const ComparableChunkVersion& other) const {
- return !(*this > other);
- }
-
- bool operator>=(const ComparableChunkVersion& other) const {
- return !(*this < other);
- }
-
-private:
- static AtomicWord<uint64_t> _localSequenceNumSource;
-
- ComparableChunkVersion(const ChunkVersion& version, uint64_t localSequenceNum)
- : _chunkVersion(version), _localSequenceNum(localSequenceNum) {}
-
- ChunkVersion _chunkVersion;
-
- // Locally incremented sequence number that allows to compare two colection versions with
- // different epochs. Each new comparableChunkVersion will have a greater sequence number than
- // the ones created before.
- uint64_t _localSequenceNum{0};
+ uint64_t _uuidDisambiguatingSequenceNum{0};
};
/**
@@ -298,21 +211,9 @@ public:
/**
* Same as getCollectionRoutingInfo above, but in addition causes the namespace to be refreshed.
- *
- * When forceRefreshFromThisThread is false, it's possible for this call to
- * join an ongoing refresh from another thread forceRefreshFromThisThread.
- * forceRefreshFromThisThread checks whether it joined another thread and
- * then forces it to try again, which is necessary in cases where calls to
- * getCollectionRoutingInfoWithRefresh must be causally consistent
- *
- * TODO: Remove this parameter in favor of using collection creation time +
- * collection version to decide when a refresh is necessary and provide
- * proper causal consistency
*/
- StatusWith<ChunkManager> getCollectionRoutingInfoWithRefresh(
- OperationContext* opCtx,
- const NamespaceString& nss,
- bool forceRefreshFromThisThread = false);
+ StatusWith<ChunkManager> getCollectionRoutingInfoWithRefresh(OperationContext* opCtx,
+ const NamespaceString& nss);
/**
* Same as getCollectionRoutingInfoWithRefresh above, but in addition returns a
@@ -333,11 +234,6 @@ public:
const boost::optional<DatabaseVersion>& wantedVersion);
/**
- * Gets whether this operation should block behind a catalog cache refresh.
- */
- static bool getOperationShouldBlockBehindCatalogCacheRefresh(OperationContext* opCtx);
-
- /**
* Sets whether this operation should block behind a catalog cache refresh.
*/
static void setOperationShouldBlockBehindCatalogCacheRefresh(OperationContext* opCtx,
@@ -349,18 +245,9 @@ public:
* requests to block on an upcoming catalog cache refresh.
*/
void invalidateShardOrEntireCollectionEntryForShardedCollection(
- OperationContext* opCtx,
const NamespaceString& nss,
- boost::optional<ChunkVersion> wantedVersion,
- const ChunkVersion& receivedVersion,
- ShardId shardId);
-
- /**
- * Non-blocking method that marks the current collection entry for the namespace as needing
- * refresh due to an epoch change. Will cause all further targetting attempts for this
- * namespace to block on a catalog cache refresh.
- */
- void onEpochChange(const NamespaceString& nss);
+ const boost::optional<ChunkVersion>& wantedVersion,
+ const ShardId& shardId);
/**
* Throws a StaleConfigException if this catalog cache does not have an entry for the given
@@ -370,16 +257,8 @@ public:
* version to throw a StaleConfigException.
*/
void checkEpochOrThrow(const NamespaceString& nss,
- ChunkVersion targetCollectionVersion,
- const ShardId& shardId) const;
-
- /**
- * Non-blocking method, which invalidates the shard for the routing table for the specified
- * namespace. If that shard is targetted in the future, getCollectionRoutingInfo will wait on a
- * refresh.
- */
- void invalidateShardForShardedCollection(const NamespaceString& nss,
- const ShardId& staleShardId);
+ const ChunkVersion& targetCollectionVersion,
+ const ShardId& shardId);
/**
* Non-blocking method, which invalidates all namespaces which contain data on the specified
@@ -388,12 +267,6 @@ public:
void invalidateEntriesThatReferenceShard(const ShardId& shardId);
/**
- * Non-blocking method, which removes the entire specified collection from the cache (resulting
- * in full refresh on subsequent access)
- */
- void purgeCollection(const NamespaceString& nss);
-
- /**
* Non-blocking method, which removes the entire specified database (including its collections)
* from the cache.
*/
@@ -416,35 +289,17 @@ public:
*/
void checkAndRecordOperationBlockedByRefresh(OperationContext* opCtx, mongo::LogicalOp opType);
+ /**
+ * Non-blocking method that marks the current collection entry for the namespace as needing
+     * refresh. Will cause all further targeting attempts to block on a catalog cache refresh,
+ * even if they do not require causal consistency.
+ */
+ void invalidateCollectionEntry_LINEARIZABLE(const NamespaceString& nss);
+
private:
// Make the cache entries friends so they can access the private classes below
friend class CachedDatabaseInfo;
- /**
- * Cache entry describing a collection.
- */
- struct CollectionRoutingInfoEntry {
- CollectionRoutingInfoEntry() = default;
- // Disable copy (and move) semantics
- CollectionRoutingInfoEntry(const CollectionRoutingInfoEntry&) = delete;
- CollectionRoutingInfoEntry& operator=(const CollectionRoutingInfoEntry&) = delete;
-
- // Specifies whether this cache entry needs a refresh (in which case routingInfo should not
- // be relied on) or it doesn't, in which case there should be a non-null routingInfo.
- bool needsRefresh{true};
-
- // Specifies whether the namespace has had an epoch change, which indicates that every
- // shard should block on an upcoming refresh.
- bool epochHasChanged{true};
-
- // Contains a notification to be waited on for the refresh to complete (only available if
- // needsRefresh is true)
- std::shared_ptr<Notification<Status>> refreshCompletionNotification;
-
- // Contains the cached routing information (only available if needsRefresh is false)
- std::shared_ptr<RoutingTableHistory> routingInfo;
- };
-
class DatabaseCache
: public ReadThroughCache<std::string, DatabaseType, ComparableDatabaseVersion> {
public:
@@ -461,88 +316,54 @@ private:
Mutex _mutex = MONGO_MAKE_LATCH("DatabaseCache::_mutex");
};
- /**
- * Non-blocking call which schedules an asynchronous refresh for the specified namespace. The
- * namespace must be in the 'needRefresh' state.
- */
- void _scheduleCollectionRefresh(WithLock,
- ServiceContext* service,
- std::shared_ptr<CollectionRoutingInfoEntry> collEntry,
- NamespaceString const& nss,
- int refreshAttempt);
+ class CollectionCache : public RoutingTableHistoryCache {
+ public:
+ CollectionCache(ServiceContext* service,
+ ThreadPoolInterface& threadPool,
+ CatalogCacheLoader& catalogCacheLoader);
- /**
- * Marks a collection entry as needing refresh. Will create the collection entry if one does
- * not exist. Also marks the epoch as changed, which will cause all further targetting requests
- * against this namespace to block upon a catalog cache refresh.
- */
- void _createOrGetCollectionEntryAndMarkEpochStale(const NamespaceString& nss);
+ void reportStats(BSONObjBuilder* builder) const;
- /**
- * Marks a collection entry as needing refresh. Will create the collection entry if one does
- * not exist. Will mark the given shard ID as stale, which will cause all further targetting
- * requests for the given shard for this namespace to block upon a catalog cache refresh.
- */
- void _createOrGetCollectionEntryAndMarkShardStale(const NamespaceString& nss,
- const ShardId& shardId);
+ private:
+ LookupResult _lookupCollection(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const ValueHandle& collectionHistory,
+ const ComparableChunkVersion& previousChunkVersion);
- /**
- * Marks a collection entry as needing refresh. Will create the collection entry if one does
- * not exist.
- */
- void _createOrGetCollectionEntryAndMarkAsNeedsRefresh(const NamespaceString& nss);
+ CatalogCacheLoader& _catalogCacheLoader;
+ Mutex _mutex = MONGO_MAKE_LATCH("CollectionCache::_mutex");
- /**
- * Retrieves the collection entry for the given namespace, creating the entry if one does not
- * already exist.
- */
- std::shared_ptr<CollectionRoutingInfoEntry> _createOrGetCollectionEntry(
- WithLock wl, const NamespaceString& nss);
+ struct Stats {
+ // Tracks how many incremental refreshes are waiting to complete currently
+ AtomicWord<long long> numActiveIncrementalRefreshes{0};
- /**
- * Used as a flag to indicate whether or not this thread performed its own
- * refresh for certain helper functions
- *
- * kPerformedRefresh is used only when the calling thread performed the
- * refresh *itself*
- *
- * kDidNotPerformRefresh is used either when there was an error or when
- * this thread joined an ongoing refresh
- */
- enum class RefreshAction {
- kPerformedRefresh,
- kDidNotPerformRefresh,
- };
+ // Cumulative, always-increasing counter of how many incremental refreshes have been
+ // kicked off
+ AtomicWord<long long> countIncrementalRefreshesStarted{0};
- /**
- * Return type for helper functions performing refreshes so that they can
- * indicate both status and whether or not this thread performed its own
- * refresh
- */
- struct RefreshResult {
- // Status containing result of refresh
- StatusWith<ChunkManager> statusWithInfo;
- RefreshAction actionTaken;
- };
+ // Tracks how many full refreshes are waiting to complete currently
+ AtomicWord<long long> numActiveFullRefreshes{0};
- /**
- * Retrieves the collection routing info for this namespace after blocking on a catalog cache
- * refresh.
- */
- CatalogCache::RefreshResult _getCollectionRoutingInfoWithForcedRefresh(
- OperationContext* opctx, const NamespaceString& nss);
+ // Cumulative, always-increasing counter of how many full refreshes have been kicked off
+ AtomicWord<long long> countFullRefreshesStarted{0};
- /**
- * Helper function used when we need the refresh action taken (e.g. when we
- * want to force refresh)
- */
- CatalogCache::RefreshResult _getCollectionRoutingInfo(OperationContext* opCtx,
- const NamespaceString& nss);
+ // Cumulative, always-increasing counter of how many full or incremental refreshes
+ // failed for whatever reason
+ AtomicWord<long long> countFailedRefreshes{0};
- CatalogCache::RefreshResult _getCollectionRoutingInfoAt(
- OperationContext* opCtx,
- const NamespaceString& nss,
- boost::optional<Timestamp> atClusterTime);
+ /**
+ * Reports the accumulated statistics for serverStatus.
+ */
+ void report(BSONObjBuilder* builder) const;
+
+ } _stats;
+
+ void _updateRefreshesStats(const bool isIncremental, const bool add);
+ };
+
+ StatusWith<ChunkManager> _getCollectionRoutingInfoAt(OperationContext* opCtx,
+ const NamespaceString& nss,
+ boost::optional<Timestamp> atClusterTime);
// Interface from which chunks will be retrieved
CatalogCacheLoader& _cacheLoader;
@@ -557,23 +378,6 @@ private:
// combined
AtomicWord<long long> totalRefreshWaitTimeMicros{0};
- // Tracks how many incremental refreshes are waiting to complete currently
- AtomicWord<long long> numActiveIncrementalRefreshes{0};
-
- // Cumulative, always-increasing counter of how many incremental refreshes have been kicked
- // off
- AtomicWord<long long> countIncrementalRefreshesStarted{0};
-
- // Tracks how many full refreshes are waiting to complete currently
- AtomicWord<long long> numActiveFullRefreshes{0};
-
- // Cumulative, always-increasing counter of how many full refreshes have been kicked off
- AtomicWord<long long> countFullRefreshesStarted{0};
-
- // Cumulative, always-increasing counter of how many full or incremental refreshes failed
- // for whatever reason
- AtomicWord<long long> countFailedRefreshes{0};
-
// Cumulative, always-increasing counter of how many operations have been blocked by a
// catalog cache refresh. Broken down by operation type to match the operations tracked
// by the OpCounters class.
@@ -595,15 +399,9 @@ private:
std::shared_ptr<ThreadPool> _executor;
-
DatabaseCache _databaseCache;
- // Mutex to serialize access to the collection cache
- mutable Mutex _mutex = MONGO_MAKE_LATCH("CatalogCache::_mutex");
- // Map from full collection name to the routing info for that collection, grouped by database
- using CollectionInfoMap = StringMap<std::shared_ptr<CollectionRoutingInfoEntry>>;
- using CollectionsByDbMap = StringMap<CollectionInfoMap>;
- CollectionsByDbMap _collectionsByDb;
+ CollectionCache _collectionCache;
};
} // namespace mongo
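
Editor's note: the header above relies on a node-local "disambiguating" counter to order versions whose UUIDs (or epochs) differ. The following standalone C++ model is not MongoDB code; all names in it (ToyVersion, ComparableToyVersion) are invented for illustration of that comparison rule only.

    #include <atomic>
    #include <cassert>
    #include <cstdint>

    // Toy stand-in for DatabaseVersion (uuid/lastMod) or ChunkVersion (epoch/major-minor).
    struct ToyVersion {
        uint64_t uuid;
        uint64_t lastMod;
    };

    class ComparableToyVersion {
    public:
        static ComparableToyVersion make(ToyVersion v) {
            static std::atomic<uint64_t> seqSource{1};
            return ComparableToyVersion{v, seqSource.fetch_add(1)};
        }

        bool operator<(const ComparableToyVersion& other) const {
            if (_v.uuid == other._v.uuid)
                return _v.lastMod < other._v.lastMod;  // comparable: use the real version
            return _seq < other._seq;                  // incomparable: the newer wrapper wins
        }

    private:
        ComparableToyVersion(ToyVersion v, uint64_t seq) : _v(v), _seq(seq) {}
        ToyVersion _v;
        uint64_t _seq;
    };

    int main() {
        const auto a = ComparableToyVersion::make({/*uuid*/ 1, /*lastMod*/ 5});
        const auto b = ComparableToyVersion::make({/*uuid*/ 1, /*lastMod*/ 7});
        const auto c = ComparableToyVersion::make({/*uuid*/ 2, /*lastMod*/ 1});  // uuid changed (e.g. drop/recreate)
        assert(a < b);  // same uuid: ordered by lastMod
        assert(b < c);  // different uuid: the most recently observed version is considered newer
        return 0;
    }
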
diff --git a/src/mongo/s/catalog_cache_refresh_test.cpp b/src/mongo/s/catalog_cache_refresh_test.cpp
index 70b56845eb1..1e21135a15b 100644
--- a/src/mongo/s/catalog_cache_refresh_test.cpp
+++ b/src/mongo/s/catalog_cache_refresh_test.cpp
@@ -440,7 +440,7 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadMissingChunkWithLowestVersion) {
ASSERT_EQ(1, initialRoutingInfo.numChunks());
- auto future = scheduleRoutingInfoForcedRefresh(kNss);
+ auto future = scheduleRoutingInfoIncrementalRefresh(kNss);
const auto incompleteChunks = [&]() {
ChunkVersion version(1, 0, epoch);
@@ -497,7 +497,7 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadMissingChunkWithHighestVersion) {
ASSERT_EQ(1, initialRoutingInfo.numChunks());
- auto future = scheduleRoutingInfoForcedRefresh(kNss);
+ auto future = scheduleRoutingInfoIncrementalRefresh(kNss);
const auto incompleteChunks = [&]() {
ChunkVersion version(1, 0, epoch);
@@ -551,7 +551,7 @@ TEST_F(CatalogCacheRefreshTest, ChunkEpochChangeDuringIncrementalLoad) {
auto initialRoutingInfo(makeChunkManager(kNss, shardKeyPattern, nullptr, true, {}));
ASSERT_EQ(1, initialRoutingInfo.numChunks());
- auto future = scheduleRoutingInfoForcedRefresh(kNss);
+ auto future = scheduleRoutingInfoIncrementalRefresh(kNss);
ChunkVersion version = initialRoutingInfo.getVersion();
@@ -598,7 +598,7 @@ TEST_F(CatalogCacheRefreshTest, ChunkEpochChangeDuringIncrementalLoadRecoveryAft
setupNShards(2);
- auto future = scheduleRoutingInfoForcedRefresh(kNss);
+ auto future = scheduleRoutingInfoIncrementalRefresh(kNss);
ChunkVersion oldVersion = initialRoutingInfo.getVersion();
const OID newEpoch = OID::gen();
@@ -683,7 +683,7 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterCollectionEpochChange) {
setupNShards(2);
- auto future = scheduleRoutingInfoForcedRefresh(kNss);
+ auto future = scheduleRoutingInfoIncrementalRefresh(kNss);
ChunkVersion newVersion(1, 0, OID::gen());
@@ -730,7 +730,7 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterSplit) {
ChunkVersion version = initialRoutingInfo.getVersion();
- auto future = scheduleRoutingInfoForcedRefresh(kNss);
+ auto future = scheduleRoutingInfoIncrementalRefresh(kNss);
expectGetCollection(version.epoch(), shardKeyPattern);
@@ -776,7 +776,7 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterMoveWithReshardingFieldsAdde
ChunkVersion version = initialRoutingInfo.getVersion();
- auto future = scheduleRoutingInfoForcedRefresh(kNss);
+ auto future = scheduleRoutingInfoIncrementalRefresh(kNss);
ChunkVersion expectedDestShardVersion;
@@ -824,7 +824,7 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterMoveLastChunkWithReshardingF
ChunkVersion version = initialRoutingInfo.getVersion();
- auto future = scheduleRoutingInfoForcedRefresh(kNss);
+ auto future = scheduleRoutingInfoIncrementalRefresh(kNss);
// The collection type won't have resharding fields this time.
expectGetCollection(version.epoch(), shardKeyPattern);
diff --git a/src/mongo/s/catalog_cache_test.cpp b/src/mongo/s/catalog_cache_test.cpp
index fce177bdd4f..8fdb461aca3 100644
--- a/src/mongo/s/catalog_cache_test.cpp
+++ b/src/mongo/s/catalog_cache_test.cpp
@@ -35,6 +35,7 @@
#include "mongo/s/catalog_cache.h"
#include "mongo/s/catalog_cache_loader_mock.h"
#include "mongo/s/sharding_router_test_fixture.h"
+#include "mongo/s/stale_exception.h"
namespace mongo {
namespace {
@@ -72,7 +73,54 @@ protected:
_catalogCacheLoader->setDatabaseRefreshReturnValue(kErrorStatus);
}
+ void loadCollection(const ChunkVersion& version) {
+ const auto coll = makeCollectionType(version);
+ _catalogCacheLoader->setCollectionRefreshReturnValue(coll);
+ _catalogCacheLoader->setChunkRefreshReturnValue(makeChunks(version));
+
+ const auto swChunkManager =
+ _catalogCache->getCollectionRoutingInfo(operationContext(), coll.getNs());
+ ASSERT_OK(swChunkManager.getStatus());
+
+ // Reset the loader return values to avoid false positive results
+ _catalogCacheLoader->setCollectionRefreshReturnValue(kErrorStatus);
+ _catalogCacheLoader->setChunkRefreshReturnValue(kErrorStatus);
+ }
+
+ void loadUnshardedCollection(const NamespaceString& nss) {
+ _catalogCacheLoader->setCollectionRefreshReturnValue(
+ Status(ErrorCodes::NamespaceNotFound, "collection not found"));
+
+ const auto swChunkManager =
+ _catalogCache->getCollectionRoutingInfo(operationContext(), nss);
+ ASSERT_OK(swChunkManager.getStatus());
+
+ // Reset the loader return value to avoid false positive results
+ _catalogCacheLoader->setCollectionRefreshReturnValue(kErrorStatus);
+ }
+
+ std::vector<ChunkType> makeChunks(ChunkVersion version) {
+ ChunkType chunk(kNss,
+ {kShardKeyPattern.getKeyPattern().globalMin(),
+ kShardKeyPattern.getKeyPattern().globalMax()},
+ version,
+ {"0"});
+ chunk.setName(OID::gen());
+ return {chunk};
+ }
+
+ CollectionType makeCollectionType(const ChunkVersion& collVersion) {
+ CollectionType coll;
+ coll.setNs(kNss);
+ coll.setEpoch(collVersion.epoch());
+ coll.setKeyPattern(kShardKeyPattern.getKeyPattern());
+ coll.setUnique(false);
+ return coll;
+ }
+
const NamespaceString kNss{"catalgoCacheTestDB.foo"};
+ const std::string kPattern{"_id"};
+ const ShardKeyPattern kShardKeyPattern{BSON(kPattern << 1)};
const int kDummyPort{12345};
const HostAndPort kConfigHostAndPort{"DummyConfig", kDummyPort};
const std::vector<ShardId> kShards{{"0"}, {"1"}};
@@ -129,5 +177,86 @@ TEST_F(CatalogCacheTest, InvalidateSingleDbOnShardRemoval) {
ASSERT_EQ(cachedDb.primaryId(), kShards[1]);
}
+TEST_F(CatalogCacheTest, CheckEpochNoDatabase) {
+ const auto collVersion = ChunkVersion(1, 0, OID::gen());
+ ASSERT_THROWS_WITH_CHECK(_catalogCache->checkEpochOrThrow(kNss, collVersion, kShards[0]),
+ StaleConfigException,
+ [&](const StaleConfigException& ex) {
+ const auto staleInfo = ex.extraInfo<StaleConfigInfo>();
+ ASSERT(staleInfo);
+ ASSERT_EQ(staleInfo->getNss(), kNss);
+ ASSERT_EQ(staleInfo->getVersionReceived(), collVersion);
+ ASSERT_EQ(staleInfo->getShardId(), kShards[0]);
+ ASSERT(staleInfo->getVersionWanted() == boost::none);
+ });
+}
+
+TEST_F(CatalogCacheTest, CheckEpochNoCollection) {
+ const auto dbVersion = DatabaseVersion();
+ const auto collVersion = ChunkVersion(1, 0, OID::gen());
+
+ loadDatabases({DatabaseType(kNss.db().toString(), kShards[0], true, dbVersion)});
+ ASSERT_THROWS_WITH_CHECK(_catalogCache->checkEpochOrThrow(kNss, collVersion, kShards[0]),
+ StaleConfigException,
+ [&](const StaleConfigException& ex) {
+ const auto staleInfo = ex.extraInfo<StaleConfigInfo>();
+ ASSERT(staleInfo);
+ ASSERT_EQ(staleInfo->getNss(), kNss);
+ ASSERT_EQ(staleInfo->getVersionReceived(), collVersion);
+ ASSERT_EQ(staleInfo->getShardId(), kShards[0]);
+ ASSERT(staleInfo->getVersionWanted() == boost::none);
+ });
+}
+
+TEST_F(CatalogCacheTest, CheckEpochUnshardedCollection) {
+ const auto dbVersion = DatabaseVersion();
+ const auto collVersion = ChunkVersion(1, 0, OID::gen());
+
+ loadDatabases({DatabaseType(kNss.db().toString(), kShards[0], true, dbVersion)});
+ loadUnshardedCollection(kNss);
+ ASSERT_THROWS_WITH_CHECK(_catalogCache->checkEpochOrThrow(kNss, collVersion, kShards[0]),
+ StaleConfigException,
+ [&](const StaleConfigException& ex) {
+ const auto staleInfo = ex.extraInfo<StaleConfigInfo>();
+ ASSERT(staleInfo);
+ ASSERT_EQ(staleInfo->getNss(), kNss);
+ ASSERT_EQ(staleInfo->getVersionReceived(), collVersion);
+ ASSERT_EQ(staleInfo->getShardId(), kShards[0]);
+ ASSERT(staleInfo->getVersionWanted() == boost::none);
+ });
+}
+
+TEST_F(CatalogCacheTest, CheckEpochWithMismatch) {
+ const auto dbVersion = DatabaseVersion();
+ const auto wantedCollVersion = ChunkVersion(1, 0, OID::gen());
+ const auto receivedCollVersion = ChunkVersion(1, 0, OID::gen());
+
+ loadDatabases({DatabaseType(kNss.db().toString(), kShards[0], true, dbVersion)});
+ loadCollection(wantedCollVersion);
+
+ ASSERT_THROWS_WITH_CHECK(
+ _catalogCache->checkEpochOrThrow(kNss, receivedCollVersion, kShards[0]),
+ StaleConfigException,
+ [&](const StaleConfigException& ex) {
+ const auto staleInfo = ex.extraInfo<StaleConfigInfo>();
+ ASSERT(staleInfo);
+ ASSERT_EQ(staleInfo->getNss(), kNss);
+ ASSERT_EQ(staleInfo->getVersionReceived(), receivedCollVersion);
+ ASSERT(staleInfo->getVersionWanted() != boost::none);
+ ASSERT_EQ(*(staleInfo->getVersionWanted()), wantedCollVersion);
+ ASSERT_EQ(staleInfo->getShardId(), kShards[0]);
+ });
+}
+
+TEST_F(CatalogCacheTest, CheckEpochWithMatch) {
+ const auto dbVersion = DatabaseVersion();
+ const auto collVersion = ChunkVersion(1, 0, OID::gen());
+
+ loadDatabases({DatabaseType(kNss.db().toString(), kShards[0], true, dbVersion)});
+ loadCollection(collVersion);
+
+ _catalogCache->checkEpochOrThrow(kNss, collVersion, kShards[0]);
+}
+
} // namespace
} // namespace mongo
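
Editor's sketch, condensed from the strategy.cpp hunk later in this patch: how the StaleConfigInfo exercised by the tests above drives the new invalidation API. This assumes a mongo build environment (catalog_cache.h, stale_exception.h, <functional>); the function name and the 'versionedOp' callback are placeholders, not part of the patch.

    void runWithStaleConfigHandling(CatalogCache* catalogCache,
                                    const NamespaceString& nss,
                                    std::function<void()> versionedOp) {
        try {
            versionedOp();  // may throw StaleConfigException, e.g. from checkEpochOrThrow() or a shard reply
        } catch (const StaleConfigException& ex) {
            if (auto staleInfo = ex.extraInfo<StaleConfigInfo>()) {
                // Mark only the offending shard (or the whole collection when no wanted version is known).
                catalogCache->invalidateShardOrEntireCollectionEntryForShardedCollection(
                    staleInfo->getNss(), staleInfo->getVersionWanted(), staleInfo->getShardId());
            } else {
                // Without StaleConfigInfo the stale shard is unknown, so force every subsequent
                // targeting attempt for the namespace to block on a refresh.
                catalogCache->invalidateCollectionEntry_LINEARIZABLE(nss);
            }
            throw;  // let the caller's retry loop re-target with fresher routing info
        }
    }
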
diff --git a/src/mongo/s/catalog_cache_test_fixture.cpp b/src/mongo/s/catalog_cache_test_fixture.cpp
index 71e02e67fac..4f59eeaef8a 100644
--- a/src/mongo/s/catalog_cache_test_fixture.cpp
+++ b/src/mongo/s/catalog_cache_test_fixture.cpp
@@ -81,6 +81,26 @@ CatalogCacheTestFixture::scheduleRoutingInfoUnforcedRefresh(const NamespaceStrin
});
}
+executor::NetworkTestEnv::FutureHandle<boost::optional<ChunkManager>>
+CatalogCacheTestFixture::scheduleRoutingInfoIncrementalRefresh(const NamespaceString& nss) {
+ auto catalogCache = Grid::get(getServiceContext())->catalogCache();
+ const auto cm =
+ uassertStatusOK(catalogCache->getCollectionRoutingInfo(operationContext(), nss));
+ ASSERT(cm.isSharded());
+
+ // Simulates the shard wanting a higher version than the one sent by the router.
+ catalogCache->invalidateShardOrEntireCollectionEntryForShardedCollection(
+ nss, boost::none, cm.dbPrimary());
+
+ return launchAsync([this, nss] {
+ auto client = getServiceContext()->makeClient("Test");
+ auto const catalogCache = Grid::get(getServiceContext())->catalogCache();
+
+ return boost::make_optional(
+ uassertStatusOK(catalogCache->getCollectionRoutingInfo(operationContext(), nss)));
+ });
+}
+
std::vector<ShardType> CatalogCacheTestFixture::setupNShards(int numShards) {
std::vector<ShardType> shards;
for (int i = 0; i < numShards; i++) {
diff --git a/src/mongo/s/catalog_cache_test_fixture.h b/src/mongo/s/catalog_cache_test_fixture.h
index fb5238a2ba9..3d58f6a8557 100644
--- a/src/mongo/s/catalog_cache_test_fixture.h
+++ b/src/mongo/s/catalog_cache_test_fixture.h
@@ -84,6 +84,17 @@ protected:
scheduleRoutingInfoUnforcedRefresh(const NamespaceString& nss);
/**
+     * Advances the time in the cache for 'kNss' and schedules a thread to make an incremental
+ * refresh.
+ *
+ * NOTE: The returned value is always set. The reason to use optional is a deficiency of
+ * std::future with the MSVC STL library, which requires the templated type to be default
+ * constructible.
+ */
+ executor::NetworkTestEnv::FutureHandle<boost::optional<ChunkManager>>
+ scheduleRoutingInfoIncrementalRefresh(const NamespaceString& nss);
+
+ /**
* Ensures that there are 'numShards' available in the shard registry. The shard ids are
* generated as "0", "1", etc.
*
diff --git a/src/mongo/s/chunk_manager.cpp b/src/mongo/s/chunk_manager.cpp
index 5713855e01f..9ded562066c 100644
--- a/src/mongo/s/chunk_manager.cpp
+++ b/src/mongo/s/chunk_manager.cpp
@@ -336,22 +336,23 @@ void RoutingTableHistory::setAllShardsRefreshed() {
}
Chunk ChunkManager::findIntersectingChunk(const BSONObj& shardKey, const BSONObj& collation) const {
- const bool hasSimpleCollation = (collation.isEmpty() && !_rt->getDefaultCollator()) ||
+ const bool hasSimpleCollation = (collation.isEmpty() && !_rt->optRt->getDefaultCollator()) ||
SimpleBSONObjComparator::kInstance.evaluate(collation == CollationSpec::kSimpleSpec);
if (!hasSimpleCollation) {
for (BSONElement elt : shardKey) {
uassert(ErrorCodes::ShardKeyNotFound,
str::stream() << "Cannot target single shard due to collation of key "
- << elt.fieldNameStringData() << " for namespace " << _rt->nss(),
+ << elt.fieldNameStringData() << " for namespace "
+ << _rt->optRt->nss(),
!CollationIndexKey::isCollatableType(elt.type()));
}
}
- auto chunkInfo = _rt->findIntersectingChunk(shardKey);
+ auto chunkInfo = _rt->optRt->findIntersectingChunk(shardKey);
uassert(ErrorCodes::ShardKeyNotFound,
str::stream() << "Cannot target single shard using key " << shardKey
- << " for namespace " << _rt->nss(),
+ << " for namespace " << _rt->optRt->nss(),
chunkInfo && chunkInfo->containsKey(shardKey));
return Chunk(*chunkInfo, _clusterTime);
@@ -361,7 +362,7 @@ bool ChunkManager::keyBelongsToShard(const BSONObj& shardKey, const ShardId& sha
if (shardKey.isEmpty())
return false;
- auto chunkInfo = _rt->findIntersectingChunk(shardKey);
+ auto chunkInfo = _rt->optRt->findIntersectingChunk(shardKey);
if (!chunkInfo)
return false;
@@ -374,7 +375,7 @@ void ChunkManager::getShardIdsForQuery(boost::intrusive_ptr<ExpressionContext> e
const BSONObj& query,
const BSONObj& collation,
std::set<ShardId>* shardIds) const {
- auto qr = std::make_unique<QueryRequest>(_rt->nss());
+ auto qr = std::make_unique<QueryRequest>(_rt->optRt->nss());
qr->setFilter(query);
if (auto uuid = getUUID())
@@ -382,8 +383,8 @@ void ChunkManager::getShardIdsForQuery(boost::intrusive_ptr<ExpressionContext> e
if (!collation.isEmpty()) {
qr->setCollation(collation);
- } else if (_rt->getDefaultCollator()) {
- auto defaultCollator = _rt->getDefaultCollator();
+ } else if (_rt->optRt->getDefaultCollator()) {
+ auto defaultCollator = _rt->optRt->getDefaultCollator();
qr->setCollation(defaultCollator->getSpec().toBSON());
expCtx->setCollator(defaultCollator->clone());
}
@@ -396,7 +397,7 @@ void ChunkManager::getShardIdsForQuery(boost::intrusive_ptr<ExpressionContext> e
MatchExpressionParser::kAllowAllSpecialFeatures));
// Fast path for targeting equalities on the shard key.
- auto shardKeyToFind = _rt->getShardKeyPattern().extractShardKeyFromQuery(*cq);
+ auto shardKeyToFind = _rt->optRt->getShardKeyPattern().extractShardKeyFromQuery(*cq);
if (!shardKeyToFind.isEmpty()) {
try {
auto chunk = findIntersectingChunk(shardKeyToFind, collation);
@@ -413,14 +414,14 @@ void ChunkManager::getShardIdsForQuery(boost::intrusive_ptr<ExpressionContext> e
// Query { a : { $gte : 1, $lt : 2 },
// b : { $gte : 3, $lt : 4 } }
// => Bounds { a : [1, 2), b : [3, 4) }
- IndexBounds bounds = getIndexBoundsForQuery(_rt->getShardKeyPattern().toBSON(), *cq);
+ IndexBounds bounds = getIndexBoundsForQuery(_rt->optRt->getShardKeyPattern().toBSON(), *cq);
// Transforms bounds for each shard key field into full shard key ranges
// for example :
// Key { a : 1, b : 1 }
// Bounds { a : [1, 2), b : [3, 4) }
// => Ranges { a : 1, b : 3 } => { a : 2, b : 4 }
- BoundList ranges = _rt->getShardKeyPattern().flattenBounds(bounds);
+ BoundList ranges = _rt->optRt->getShardKeyPattern().flattenBounds(bounds);
for (BoundList::const_iterator it = ranges.begin(); it != ranges.end(); ++it) {
getShardIdsForRange(it->first /*min*/, it->second /*max*/, shardIds);
@@ -430,7 +431,7 @@ void ChunkManager::getShardIdsForQuery(boost::intrusive_ptr<ExpressionContext> e
// because _shardVersions contains shards with chunks and is built based on the last
// refresh. Therefore, it is possible for _shardVersions to have fewer entries if a shard
// no longer owns chunks when it used to at _clusterTime.
- if (!_clusterTime && shardIds->size() == _rt->_shardVersions.size()) {
+ if (!_clusterTime && shardIds->size() == _rt->optRt->_shardVersions.size()) {
break;
}
}
@@ -439,7 +440,7 @@ void ChunkManager::getShardIdsForQuery(boost::intrusive_ptr<ExpressionContext> e
// For now, we satisfy that assumption by adding a shard with no matches rather than returning
// an empty set of shards.
if (shardIds->empty()) {
- _rt->forEachChunk([&](const std::shared_ptr<ChunkInfo>& chunkInfo) {
+ _rt->optRt->forEachChunk([&](const std::shared_ptr<ChunkInfo>& chunkInfo) {
shardIds->insert(chunkInfo->getShardIdAt(_clusterTime));
return false;
});
@@ -459,7 +460,7 @@ void ChunkManager::getShardIdsForRange(const BSONObj& min,
return;
}
- _rt->forEachOverlappingChunk(min, max, true, [&](auto& chunkInfo) {
+ _rt->optRt->forEachOverlappingChunk(min, max, true, [&](auto& chunkInfo) {
shardIds->insert(chunkInfo->getShardIdAt(_clusterTime));
// No need to iterate through the rest of the ranges, because we already know we need to use
@@ -467,7 +468,7 @@ void ChunkManager::getShardIdsForRange(const BSONObj& min,
// because _shardVersions contains shards with chunks and is built based on the last
// refresh. Therefore, it is possible for _shardVersions to have fewer entries if a shard
// no longer owns chunks when it used to at _clusterTime.
- if (!_clusterTime && shardIds->size() == _rt->_shardVersions.size()) {
+ if (!_clusterTime && shardIds->size() == _rt->optRt->_shardVersions.size()) {
return false;
}
@@ -478,14 +479,15 @@ void ChunkManager::getShardIdsForRange(const BSONObj& min,
bool ChunkManager::rangeOverlapsShard(const ChunkRange& range, const ShardId& shardId) const {
bool overlapFound = false;
- _rt->forEachOverlappingChunk(range.getMin(), range.getMax(), false, [&](auto& chunkInfo) {
- if (chunkInfo->getShardIdAt(_clusterTime) == shardId) {
- overlapFound = true;
- return false;
- }
+ _rt->optRt->forEachOverlappingChunk(
+ range.getMin(), range.getMax(), false, [&](auto& chunkInfo) {
+ if (chunkInfo->getShardIdAt(_clusterTime) == shardId) {
+ overlapFound = true;
+ return false;
+ }
- return true;
- });
+ return true;
+ });
return overlapFound;
}
@@ -494,7 +496,7 @@ boost::optional<Chunk> ChunkManager::getNextChunkOnShard(const BSONObj& shardKey
const ShardId& shardId) const {
boost::optional<Chunk> chunk;
- _rt->forEachChunk(
+ _rt->optRt->forEachChunk(
[&](auto& chunkInfo) {
if (chunkInfo->getShardIdAt(_clusterTime) == shardId) {
chunk.emplace(*chunkInfo, _clusterTime);
@@ -654,7 +656,7 @@ ChunkManager ChunkManager::makeAtTime(const ChunkManager& cm, Timestamp clusterT
}
std::string ChunkManager::toString() const {
- return _rt ? _rt->toString() : "UNSHARDED";
+ return _rt->optRt ? _rt->optRt->toString() : "UNSHARDED";
}
bool RoutingTableHistory::compatibleWith(const RoutingTableHistory& other,
@@ -733,7 +735,7 @@ RoutingTableHistory RoutingTableHistory::makeUpdated(
auto changedChunkInfos = flatten(changedChunks);
auto chunkMap = _chunkMap.createMerged(changedChunkInfos);
- // If at least one diff was applied, the collection's version must have advanced
+    // The changes must all be for the same collection, i.e. the epoch must not have changed.
invariant(getVersion().epoch() == chunkMap.getVersion().epoch());
return RoutingTableHistory(_nss,
@@ -745,4 +747,60 @@ RoutingTableHistory RoutingTableHistory::makeUpdated(
std::move(chunkMap));
}
+AtomicWord<uint64_t> ComparableChunkVersion::_epochDisambiguatingSequenceNumSource{1ULL};
+AtomicWord<uint64_t> ComparableChunkVersion::_forcedRefreshSequenceNumSource{1ULL};
+
+ComparableChunkVersion ComparableChunkVersion::makeComparableChunkVersion(
+ const ChunkVersion& version) {
+ return ComparableChunkVersion(_forcedRefreshSequenceNumSource.load(),
+ version,
+ _epochDisambiguatingSequenceNumSource.fetchAndAdd(1));
+}
+
+ComparableChunkVersion ComparableChunkVersion::makeComparableChunkVersionForForcedRefresh() {
+ return ComparableChunkVersion(_forcedRefreshSequenceNumSource.addAndFetch(2) - 1,
+ boost::none,
+ _epochDisambiguatingSequenceNumSource.fetchAndAdd(1));
+}
+
+std::string ComparableChunkVersion::toString() const {
+ return str::stream() << _forcedRefreshSequenceNum << "|"
+ << (_chunkVersion ? _chunkVersion->toString() : "NONE") << "|"
+ << _epochDisambiguatingSequenceNum;
+}
+
+bool ComparableChunkVersion::operator==(const ComparableChunkVersion& other) const {
+ if (_forcedRefreshSequenceNum == other._forcedRefreshSequenceNum) {
+ if (_forcedRefreshSequenceNum == 0)
+ return true; // Default constructed value
+
+ if (sameEpoch(other)) {
+ if (_chunkVersion->majorVersion() == 0 && other._chunkVersion->majorVersion() == 0) {
+ return _chunkVersion->epoch() == OID();
+ }
+ return _chunkVersion->majorVersion() == other._chunkVersion->majorVersion() &&
+ _chunkVersion->minorVersion() == other._chunkVersion->minorVersion();
+ }
+ }
+ return false;
+}
+
+bool ComparableChunkVersion::operator<(const ComparableChunkVersion& other) const {
+ if (_forcedRefreshSequenceNum < other._forcedRefreshSequenceNum)
+ return true;
+ if (_forcedRefreshSequenceNum > other._forcedRefreshSequenceNum)
+ return false;
+ if (_forcedRefreshSequenceNum == 0)
+ return false; // Default constructed value
+
+ if (sameEpoch(other) && other._chunkVersion->epoch() != OID() &&
+ _chunkVersion->majorVersion() != 0 && other._chunkVersion->majorVersion() != 0) {
+ return _chunkVersion->majorVersion() < other._chunkVersion->majorVersion() ||
+ (_chunkVersion->majorVersion() == other._chunkVersion->majorVersion() &&
+ _chunkVersion->minorVersion() < other._chunkVersion->minorVersion());
+ } else {
+ return _epochDisambiguatingSequenceNum < other._epochDisambiguatingSequenceNum;
+ }
+}
+
} // namespace mongo
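
Editor's sketch of the ordering established by the operators above (the same behaviour is asserted in comparable_chunk_version_test.cpp further down). It assumes the mongo headers; the function name is illustrative only.

    void illustrateComparableChunkVersionOrdering() {
        const auto epoch = OID::gen();

        // majorVersion == 0 with a real epoch means "no chunks": the wrapped versions carry no
        // ordering information, so operator< falls through to _epochDisambiguatingSequenceNum and
        // the later-created value is greater.
        const auto noChunks1 =
            ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion(0, 0, epoch));
        const auto noChunks2 =
            ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion(0, 0, epoch));
        invariant(noChunks1 < noChunks2);

        // A forced-refresh value bumps _forcedRefreshSequenceNum, so it outranks everything
        // created before it but not values created after it.
        const auto before =
            ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion(100, 0, epoch));
        const auto forced = ComparableChunkVersion::makeComparableChunkVersionForForcedRefresh();
        const auto after =
            ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion(100, 0, epoch));
        invariant(before < forced);
        invariant(forced < after);
    }
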
diff --git a/src/mongo/s/chunk_manager.h b/src/mongo/s/chunk_manager.h
index 7f25a810a4a..e694a94c201 100644
--- a/src/mongo/s/chunk_manager.h
+++ b/src/mongo/s/chunk_manager.h
@@ -43,6 +43,7 @@
#include "mongo/s/shard_key_pattern.h"
#include "mongo/stdx/unordered_map.h"
#include "mongo/util/concurrency/ticketholder.h"
+#include "mongo/util/read_through_cache.h"
namespace mongo {
@@ -324,13 +325,128 @@ private:
};
/**
+ * Constructed to be used exclusively by the CatalogCache as a vector clock (Time) to drive
+ * CollectionCache's lookups.
+ *
+ * The ChunkVersion class contains a non-comparable epoch, which makes it impossible to compare two
+ * ChunkVersions when their epochs differ.
+ *
+ * This class wraps a ChunkVersion object with a node-local sequence number
+ * (_epochDisambiguatingSequenceNum) that allows the comparison.
+ *
+ * This class should go away once a cluster-wide comparable ChunkVersion is implemented.
+ */
+class ComparableChunkVersion {
+public:
+ /**
+ * Creates a ComparableChunkVersion that wraps the given ChunkVersion.
+ * Each object created through this method will have a local sequence number greater than the
+ * previously created ones.
+ */
+ static ComparableChunkVersion makeComparableChunkVersion(const ChunkVersion& version);
+
+ /**
+ * Creates a ComparableChunkVersion object, which will artificially be greater than any that
+     * were previously created by `makeComparableChunkVersion`. Used as a means to cause the
+ * collections cache to attempt a refresh in situations where causal consistency cannot be
+ * inferred.
+ */
+ static ComparableChunkVersion makeComparableChunkVersionForForcedRefresh();
+
+ /**
+ * Empty constructor needed by the ReadThroughCache.
+ *
+     * Instances created through this constructor will always be less than the ones created through
+ * the two static constructors, but they do not carry any meaningful value and can only be used
+ * for comparison purposes.
+ */
+ ComparableChunkVersion() = default;
+
+ const ChunkVersion& getVersion() const {
+ return *_chunkVersion;
+ }
+
+ std::string toString() const;
+
+ bool sameEpoch(const ComparableChunkVersion& other) const {
+ return _chunkVersion->epoch() == other._chunkVersion->epoch();
+ }
+
+ bool operator==(const ComparableChunkVersion& other) const;
+
+ bool operator!=(const ComparableChunkVersion& other) const {
+ return !(*this == other);
+ }
+
+ /**
+ * In case the two compared instances have different epochs, the most recently created one will
+     * be greater, otherwise the comparison will be driven by the major/minor versions of the
+ * underlying ChunkVersion.
+ */
+ bool operator<(const ComparableChunkVersion& other) const;
+
+ bool operator>(const ComparableChunkVersion& other) const {
+ return other < *this;
+ }
+
+ bool operator<=(const ComparableChunkVersion& other) const {
+ return !(*this > other);
+ }
+
+ bool operator>=(const ComparableChunkVersion& other) const {
+ return !(*this < other);
+ }
+
+private:
+ static AtomicWord<uint64_t> _epochDisambiguatingSequenceNumSource;
+ static AtomicWord<uint64_t> _forcedRefreshSequenceNumSource;
+
+ ComparableChunkVersion(uint64_t forcedRefreshSequenceNum,
+ boost::optional<ChunkVersion> version,
+ uint64_t epochDisambiguatingSequenceNum)
+ : _forcedRefreshSequenceNum(forcedRefreshSequenceNum),
+ _chunkVersion(std::move(version)),
+ _epochDisambiguatingSequenceNum(epochDisambiguatingSequenceNum) {}
+
+ uint64_t _forcedRefreshSequenceNum{0};
+
+ boost::optional<ChunkVersion> _chunkVersion;
+
+    // Locally incremented sequence number that allows comparing two collection versions with
+ // different epochs. Each new comparableChunkVersion will have a greater sequence number than
+ // the ones created before.
+ uint64_t _epochDisambiguatingSequenceNum{0};
+};
+
+/**
+ * This intermediate structure is necessary to be able to store UNSHARDED collections in the routing
+ * table history cache below. The reason is that currently the RoutingTableHistory class only
+ * supports sharded collections (i.e., collections which have entries in config.collections and
+ * config.chunks).
+ */
+struct OptionalRoutingTableHistory {
+ // UNSHARDED collection constructor
+ OptionalRoutingTableHistory() = default;
+
+ // SHARDED collection constructor
+ OptionalRoutingTableHistory(RoutingTableHistory&& rt) : optRt(std::move(rt)) {}
+
+ // If boost::none, the collection is UNSHARDED, otherwise it is SHARDED
+ boost::optional<RoutingTableHistory> optRt;
+};
+
+using RoutingTableHistoryCache =
+ ReadThroughCache<NamespaceString, OptionalRoutingTableHistory, ComparableChunkVersion>;
+using RoutingTableHistoryValueHandle = RoutingTableHistoryCache::ValueHandle;
+
+/**
* Wrapper around a RoutingTableHistory, which pins it to a particular point in time.
*/
class ChunkManager {
public:
ChunkManager(ShardId dbPrimary,
DatabaseVersion dbVersion,
- std::shared_ptr<RoutingTableHistory> rt,
+ RoutingTableHistoryValueHandle rt,
boost::optional<Timestamp> clusterTime)
: _dbPrimary(std::move(dbPrimary)),
_dbVersion(std::move(dbVersion)),
@@ -340,7 +456,7 @@ public:
// Methods supported on both sharded and unsharded collections
bool isSharded() const {
- return bool(_rt);
+ return bool(_rt->optRt);
}
const ShardId& dbPrimary() const {
@@ -352,7 +468,7 @@ public:
}
int numChunks() const {
- return _rt ? _rt->numChunks() : 1;
+ return _rt->optRt ? _rt->optRt->numChunks() : 1;
}
std::string toString() const;
@@ -360,32 +476,32 @@ public:
// Methods only supported on sharded collections (caller must check isSharded())
const ShardKeyPattern& getShardKeyPattern() const {
- return _rt->getShardKeyPattern();
+ return _rt->optRt->getShardKeyPattern();
}
const CollatorInterface* getDefaultCollator() const {
- return _rt->getDefaultCollator();
+ return _rt->optRt->getDefaultCollator();
}
bool isUnique() const {
- return _rt->isUnique();
+ return _rt->optRt->isUnique();
}
ChunkVersion getVersion() const {
- return _rt->getVersion();
+ return _rt->optRt->getVersion();
}
ChunkVersion getVersion(const ShardId& shardId) const {
- return _rt->getVersion(shardId);
+ return _rt->optRt->getVersion(shardId);
}
ChunkVersion getVersionForLogging(const ShardId& shardId) const {
- return _rt->getVersionForLogging(shardId);
+ return _rt->optRt->getVersionForLogging(shardId);
}
template <typename Callable>
void forEachChunk(Callable&& handler) const {
- _rt->forEachChunk(
+ _rt->optRt->forEachChunk(
[this, handler = std::forward<Callable>(handler)](const auto& chunkInfo) mutable {
if (!handler(Chunk{*chunkInfo, _clusterTime}))
return false;
@@ -461,14 +577,14 @@ public:
* Returns the ids of all shards on which the collection has any chunks.
*/
void getAllShardIds(std::set<ShardId>* all) const {
- _rt->getAllShardIds(all);
+ _rt->optRt->getAllShardIds(all);
}
/**
* Returns the number of shards on which the collection has any chunks
*/
int getNShardsOwningChunks() const {
- return _rt->getNShardsOwningChunks();
+ return _rt->optRt->getNShardsOwningChunks();
}
// Transforms query into bounds for each field in the shard key
@@ -500,30 +616,30 @@ public:
* Returns true if, for this shard, the chunks are identical in both chunk managers
*/
bool compatibleWith(const ChunkManager& other, const ShardId& shard) const {
- return _rt->compatibleWith(*other._rt, shard);
+ return _rt->optRt->compatibleWith(*other._rt->optRt, shard);
}
bool uuidMatches(UUID uuid) const {
- return _rt->uuidMatches(uuid);
+ return _rt->optRt->uuidMatches(uuid);
}
boost::optional<UUID> getUUID() const {
- return _rt->getUUID();
+ return _rt->optRt->getUUID();
}
const boost::optional<TypeCollectionReshardingFields>& getReshardingFields() const {
- return _rt->getReshardingFields();
+ return _rt->optRt->getReshardingFields();
}
const RoutingTableHistory& getRoutingTableHistory_ForTest() const {
- return *_rt;
+ return *_rt->optRt;
}
private:
ShardId _dbPrimary;
DatabaseVersion _dbVersion;
- std::shared_ptr<RoutingTableHistory> _rt;
+ RoutingTableHistoryValueHandle _rt;
boost::optional<Timestamp> _clusterTime;
};
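
Editor's sketch (assumes the mongo headers): the ChunkManager now holds the cache's ValueHandle, and callers keep gating the sharded-only accessors on isSharded() because optRt may be boost::none. The benchmark file below builds a standalone handle in the same way; the helper names here are hypothetical.

    // Hypothetical helpers; only APIs declared in this header are used.
    ChunkManager makeChunkManagerFromCacheEntry(ShardId dbPrimary,
                                                DatabaseVersion dbVersion,
                                                RoutingTableHistoryValueHandle routingTables) {
        // boost::none cluster time pins the manager to the latest cached routing table.
        return ChunkManager(
            std::move(dbPrimary), std::move(dbVersion), std::move(routingTables), boost::none);
    }

    void inspectRouting(const ChunkManager& cm) {
        if (!cm.isSharded()) {
            // UNSHARDED: only dbPrimary()/dbVersion() are meaningful.
            return;
        }
        const ShardKeyPattern& keyPattern = cm.getShardKeyPattern();  // sharded-only accessor
        const ChunkVersion collVersion = cm.getVersion();             // collection version at this node
        // ... use keyPattern/collVersion for targeting or logging ...
    }
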
diff --git a/src/mongo/s/chunk_manager_refresh_bm.cpp b/src/mongo/s/chunk_manager_refresh_bm.cpp
index a3feba2de1e..bd9b133301c 100644
--- a/src/mongo/s/chunk_manager_refresh_bm.cpp
+++ b/src/mongo/s/chunk_manager_refresh_bm.cpp
@@ -43,8 +43,10 @@ namespace {
const NamespaceString kNss("test", "foo");
-std::shared_ptr<RoutingTableHistory> makeStandaloneRoutingTableHistory(RoutingTableHistory rt) {
- return std::make_shared<RoutingTableHistory>(std::move(rt));
+RoutingTableHistoryValueHandle makeStandaloneRoutingTableHistory(RoutingTableHistory rt) {
+ const auto version = rt.getVersion();
+ return RoutingTableHistoryValueHandle(
+ std::move(rt), ComparableChunkVersion::makeComparableChunkVersion(version));
}
ChunkRange getRangeForChunk(int i, int nChunks) {
@@ -69,6 +71,7 @@ CollectionMetadata makeChunkManagerWithShardSelector(int nShards,
std::vector<ChunkType> chunks;
chunks.reserve(nChunks);
+
for (uint32_t i = 0; i < nChunks; ++i) {
chunks.emplace_back(kNss,
getRangeForChunk(i, nChunks),
@@ -144,13 +147,13 @@ auto BM_FullBuildOfChunkManager(benchmark::State& state, ShardSelectorFn selectS
const uint32_t nChunks = state.range(1);
const auto collEpoch = OID::gen();
- const auto collName = NamespaceString("test.foo");
const auto shardKeyPattern = KeyPattern(BSON("_id" << 1));
std::vector<ChunkType> chunks;
chunks.reserve(nChunks);
+
for (uint32_t i = 0; i < nChunks; ++i) {
- chunks.emplace_back(collName,
+ chunks.emplace_back(kNss,
getRangeForChunk(i, nChunks),
ChunkVersion{i + 1, 0, collEpoch},
selectShard(i, nShards, nChunks));
@@ -158,7 +161,7 @@ auto BM_FullBuildOfChunkManager(benchmark::State& state, ShardSelectorFn selectS
for (auto keepRunning : state) {
auto rt = RoutingTableHistory::makeNew(
- collName, UUID::gen(), shardKeyPattern, nullptr, true, collEpoch, boost::none, chunks);
+ kNss, UUID::gen(), shardKeyPattern, nullptr, true, collEpoch, boost::none, chunks);
benchmark::DoNotOptimize(
CollectionMetadata(ChunkManager(ShardId("shard0"),
DatabaseVersion(UUID::gen(), 1),
diff --git a/src/mongo/s/commands/cluster_drop_cmd.cpp b/src/mongo/s/commands/cluster_drop_cmd.cpp
index a69e3292597..f727489ccc0 100644
--- a/src/mongo/s/commands/cluster_drop_cmd.cpp
+++ b/src/mongo/s/commands/cluster_drop_cmd.cpp
@@ -88,7 +88,9 @@ public:
// Invalidate the routing table cache entry for this collection so that we reload it the
// next time it is accessed, even if sending the command to the config server fails due
// to e.g. a NetworkError.
- ON_BLOCK_EXIT([opCtx, nss] { Grid::get(opCtx)->catalogCache()->onEpochChange(nss); });
+ ON_BLOCK_EXIT([opCtx, nss] {
+ Grid::get(opCtx)->catalogCache()->invalidateCollectionEntry_LINEARIZABLE(nss);
+ });
auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
auto cmdResponse = uassertStatusOK(configShard->runCommandWithFixedRetryAttempts(
diff --git a/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp b/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp
index b4157bee9d9..531aa1ab41e 100644
--- a/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp
+++ b/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp
@@ -174,8 +174,10 @@ public:
Shard::RetryPolicy::kNotIdempotent));
uassertStatusOK(response.commandStatus);
- Grid::get(opCtx)->catalogCache()->invalidateShardForShardedCollection(
- nss, firstChunk.getShardId());
+ Grid::get(opCtx)
+ ->catalogCache()
+ ->invalidateShardOrEntireCollectionEntryForShardedCollection(
+ nss, boost::none, firstChunk.getShardId());
CommandHelpers::filterCommandReplyForPassthrough(response.response, &result);
return true;
diff --git a/src/mongo/s/commands/cluster_move_chunk_cmd.cpp b/src/mongo/s/commands/cluster_move_chunk_cmd.cpp
index 01cdb91234e..f6e2d27c80f 100644
--- a/src/mongo/s/commands/cluster_move_chunk_cmd.cpp
+++ b/src/mongo/s/commands/cluster_move_chunk_cmd.cpp
@@ -198,9 +198,14 @@ public:
cmdObj["waitForDelete"].trueValue(),
forceJumbo));
- Grid::get(opCtx)->catalogCache()->invalidateShardForShardedCollection(nss,
- chunk->getShardId());
- Grid::get(opCtx)->catalogCache()->invalidateShardForShardedCollection(nss, to->getId());
+ Grid::get(opCtx)
+ ->catalogCache()
+ ->invalidateShardOrEntireCollectionEntryForShardedCollection(
+ nss, boost::none, chunk->getShardId());
+ Grid::get(opCtx)
+ ->catalogCache()
+ ->invalidateShardOrEntireCollectionEntryForShardedCollection(
+ nss, boost::none, to->getId());
result.append("millis", t.millis());
return true;
diff --git a/src/mongo/s/commands/cluster_shard_collection_cmd.cpp b/src/mongo/s/commands/cluster_shard_collection_cmd.cpp
index d27fd037d30..d4c4d7901ad 100644
--- a/src/mongo/s/commands/cluster_shard_collection_cmd.cpp
+++ b/src/mongo/s/commands/cluster_shard_collection_cmd.cpp
@@ -105,7 +105,9 @@ public:
// Invalidate the routing table cache entry for this collection so that we reload the
// collection the next time it's accessed, even if we receive a failure, e.g. NetworkError.
- ON_BLOCK_EXIT([opCtx, nss] { Grid::get(opCtx)->catalogCache()->onEpochChange(nss); });
+ ON_BLOCK_EXIT([opCtx, nss] {
+ Grid::get(opCtx)->catalogCache()->invalidateCollectionEntry_LINEARIZABLE(nss);
+ });
auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
auto cmdResponse = uassertStatusOK(configShard->runCommandWithFixedRetryAttempts(
diff --git a/src/mongo/s/commands/cluster_split_cmd.cpp b/src/mongo/s/commands/cluster_split_cmd.cpp
index 19d33b3f10b..5532fac1daf 100644
--- a/src/mongo/s/commands/cluster_split_cmd.cpp
+++ b/src/mongo/s/commands/cluster_split_cmd.cpp
@@ -270,8 +270,10 @@ public:
ChunkRange(chunk->getMin(), chunk->getMax()),
{splitPoint}));
- Grid::get(opCtx)->catalogCache()->invalidateShardForShardedCollection(nss,
- chunk->getShardId());
+ Grid::get(opCtx)
+ ->catalogCache()
+ ->invalidateShardOrEntireCollectionEntryForShardedCollection(
+ nss, boost::none, chunk->getShardId());
return true;
}
diff --git a/src/mongo/s/commands/flush_router_config_cmd.cpp b/src/mongo/s/commands/flush_router_config_cmd.cpp
index bcc61a82a0a..d27b65a2c4d 100644
--- a/src/mongo/s/commands/flush_router_config_cmd.cpp
+++ b/src/mongo/s/commands/flush_router_config_cmd.cpp
@@ -102,7 +102,7 @@ public:
"Routing metadata flushed for collection {namespace}",
"Routing metadata flushed for collection",
"namespace"_attr = nss);
- catalogCache->purgeCollection(nss);
+ catalogCache->invalidateCollectionEntry_LINEARIZABLE(nss);
}
}
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp
index 644c10e6bcb..f83b490d0ef 100644
--- a/src/mongo/s/commands/strategy.cpp
+++ b/src/mongo/s/commands/strategy.cpp
@@ -722,16 +722,12 @@ void runCommand(OperationContext* opCtx,
auto catalogCache = Grid::get(opCtx)->catalogCache();
if (auto staleInfo = ex.extraInfo<StaleConfigInfo>()) {
catalogCache->invalidateShardOrEntireCollectionEntryForShardedCollection(
- opCtx,
- staleNs,
- staleInfo->getVersionWanted(),
- staleInfo->getVersionReceived(),
- staleInfo->getShardId());
+ staleNs, staleInfo->getVersionWanted(), staleInfo->getShardId());
} else {
// If we don't have the stale config info and therefore don't know the shard's
// id, we have to force all further targetting requests for the namespace to
// block on a refresh.
- catalogCache->onEpochChange(staleNs);
+ catalogCache->invalidateCollectionEntry_LINEARIZABLE(staleNs);
}
@@ -1301,16 +1297,12 @@ void Strategy::explainFind(OperationContext* opCtx,
Grid::get(opCtx)
->catalogCache()
->invalidateShardOrEntireCollectionEntryForShardedCollection(
- opCtx,
- staleNs,
- staleInfo->getVersionWanted(),
- staleInfo->getVersionReceived(),
- staleInfo->getShardId());
+ staleNs, staleInfo->getVersionWanted(), staleInfo->getShardId());
} else {
// If we don't have the stale config info and therefore don't know the shard's id,
// we have to force all further targetting requests for the namespace to block on
// a refresh.
- Grid::get(opCtx)->catalogCache()->onEpochChange(staleNs);
+ Grid::get(opCtx)->catalogCache()->invalidateCollectionEntry_LINEARIZABLE(staleNs);
}
if (canRetry) {
diff --git a/src/mongo/s/comparable_chunk_version_test.cpp b/src/mongo/s/comparable_chunk_version_test.cpp
index 941d9bad080..8c1fa71fce2 100644
--- a/src/mongo/s/comparable_chunk_version_test.cpp
+++ b/src/mongo/s/comparable_chunk_version_test.cpp
@@ -29,8 +29,7 @@
#include "mongo/platform/basic.h"
-#include "mongo/s/catalog_cache.h"
-#include "mongo/s/chunk_version.h"
+#include "mongo/s/chunk_manager.h"
#include "mongo/unittest/unittest.h"
namespace mongo {
@@ -95,9 +94,15 @@ TEST(ComparableChunkVersionTest, VersionLessSameEpoch) {
ASSERT_FALSE(version2 > version3);
}
+TEST(ComparableChunkVersionTest, DefaultConstructedVersionsAreEqual) {
+ const ComparableChunkVersion defaultVersion1{}, defaultVersion2{};
+ ASSERT(defaultVersion1 == defaultVersion2);
+ ASSERT_FALSE(defaultVersion1 < defaultVersion2);
+ ASSERT_FALSE(defaultVersion1 > defaultVersion2);
+}
+
TEST(ComparableChunkVersionTest, DefaultConstructedVersionIsAlwaysLess) {
const ComparableChunkVersion defaultVersion{};
- ASSERT_EQ(defaultVersion.getLocalSequenceNum(), 0);
const auto version1 =
ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion(0, 0, OID::gen()));
ASSERT(defaultVersion != version1);
@@ -105,5 +110,127 @@ TEST(ComparableChunkVersionTest, DefaultConstructedVersionIsAlwaysLess) {
ASSERT_FALSE(defaultVersion > version1);
}
+TEST(ComparableChunkVersionTest, DefaultConstructedVersionIsAlwaysLessThanUnsharded) {
+ const ComparableChunkVersion defaultVersion{};
+ const auto version1 =
+ ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion::UNSHARDED());
+ ASSERT(defaultVersion != version1);
+ ASSERT(defaultVersion < version1);
+ ASSERT_FALSE(defaultVersion > version1);
+}
+
+TEST(ComparableChunkVersionTest, DefaultConstructedVersionIsAlwaysLessThanDropped) {
+ const ComparableChunkVersion defaultVersion{};
+ const auto version1 =
+ ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion::DROPPED());
+ ASSERT(defaultVersion != version1);
+ ASSERT(defaultVersion < version1);
+ ASSERT_FALSE(defaultVersion > version1);
+}
+
+TEST(ComparableChunkVersionTest, UnshardedAndDroppedAreEqual) {
+ const auto version1 =
+ ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion::UNSHARDED());
+ const auto version2 =
+ ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion::DROPPED());
+ const auto version3 =
+ ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion::UNSHARDED());
+ const auto version4 =
+ ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion::DROPPED());
+ ASSERT(version1 == version2);
+ ASSERT(version1 == version3);
+ ASSERT(version2 == version4);
+}
+
+TEST(ComparableChunkVersionTest, NoChunksAreDifferent) {
+ const auto oid = OID::gen();
+ const auto version1 =
+ ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion(0, 0, oid));
+ const auto version2 =
+ ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion(0, 0, oid));
+ ASSERT(version1 != version2);
+ ASSERT(version1 < version2);
+ ASSERT_FALSE(version1 > version2);
+}
+
+TEST(ComparableChunkVersionTest, NoChunksCompareBySequenceNum) {
+ const auto oid = OID::gen();
+ const auto version1 =
+ ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion(1, 0, oid));
+ const auto noChunkSV1 =
+ ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion(0, 0, oid));
+
+ ASSERT(version1 != noChunkSV1);
+ ASSERT(noChunkSV1 > version1);
+
+ const auto noChunkSV2 =
+ ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion(0, 0, oid));
+
+ ASSERT(noChunkSV1 != noChunkSV2);
+ ASSERT_FALSE(noChunkSV1 > noChunkSV2);
+ ASSERT(noChunkSV2 > noChunkSV1);
+
+ const auto version2 =
+ ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion(2, 0, oid));
+
+ ASSERT(version2 != noChunkSV2);
+ ASSERT(version2 > noChunkSV2);
+}
+
+TEST(ComparableChunkVersionTest, NoChunksGreaterThanUnshardedBySequenceNum) {
+ const auto unsharded =
+ ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion::UNSHARDED());
+ const auto noChunkSV =
+ ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion(0, 0, OID::gen()));
+
+ ASSERT(noChunkSV != unsharded);
+ ASSERT(noChunkSV > unsharded);
+}
+
+TEST(ComparableChunkVersionTest, UnshardedGreaterThanNoChunksBySequenceNum) {
+ const auto noChunkSV =
+ ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion(0, 0, OID::gen()));
+ const auto unsharded =
+ ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion::UNSHARDED());
+
+ ASSERT(noChunkSV != unsharded);
+ ASSERT(unsharded > noChunkSV);
+}
+
+TEST(ComparableChunkVersionTest, NoChunksGreaterThanDefault) {
+ const auto noChunkSV =
+ ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion(0, 0, OID::gen()));
+ const ComparableChunkVersion defaultVersion{};
+
+ ASSERT(noChunkSV != defaultVersion);
+ ASSERT(noChunkSV > defaultVersion);
+}
+
+TEST(ComparableChunkVersionTest, ForcedRefreshSequenceNumber) {
+ auto oid = OID::gen();
+ const ComparableChunkVersion defaultVersionBeforeForce;
+ const auto versionBeforeForce =
+ ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion(100, 0, oid));
+
+ const auto forcedRefreshVersion =
+ ComparableChunkVersion::makeComparableChunkVersionForForcedRefresh();
+
+ const auto versionAfterForce =
+ ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion(100, 0, oid));
+ const ComparableChunkVersion defaultVersionAfterForce;
+
+ ASSERT(defaultVersionBeforeForce != forcedRefreshVersion);
+ ASSERT(defaultVersionBeforeForce < forcedRefreshVersion);
+
+ ASSERT(versionBeforeForce != forcedRefreshVersion);
+ ASSERT(versionBeforeForce < forcedRefreshVersion);
+
+ ASSERT(versionAfterForce != forcedRefreshVersion);
+ ASSERT(versionAfterForce > forcedRefreshVersion);
+
+ ASSERT(defaultVersionAfterForce != forcedRefreshVersion);
+ ASSERT(defaultVersionAfterForce < forcedRefreshVersion);
+}
+
} // namespace
} // namespace mongo
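The ComparableChunkVersion tests above pin down an ordering that is driven by a process-local sequence number rather than by the chunk version fields alone: default-constructed instances always compare lowest, while UNSHARDED, DROPPED, "no chunks" and forced-refresh instances win over anything created before them. The standalone sketch below only illustrates that idea; LocalSeqVersion and isOlderThan are invented names, the counter is assumed single-threaded, and none of this is the ComparableChunkVersion implementation.

    // seq_version_sketch.cpp -- illustrative only: LocalSeqVersion and isOlderThan are
    // invented names, not MongoDB's ComparableChunkVersion API.
    #include <cassert>
    #include <cstdint>
    #include <optional>

    class LocalSeqVersion {
    public:
        LocalSeqVersion() = default;  // default-constructed: carries no sequence number

        static LocalSeqVersion make(uint32_t major, uint32_t minor) {
            return LocalSeqVersion(major, minor, ++_nextSeq);
        }

        // True if 'this' describes an older refresh than 'other'.
        bool isOlderThan(const LocalSeqVersion& other) const {
            if (!_seq)
                return static_cast<bool>(other._seq);  // default loses to anything real
            if (!other._seq)
                return false;
            // Versions that actually describe chunks compare by (major, minor) ...
            if (_major != 0 && other._major != 0)
                return _major < other._major ||
                    (_major == other._major && _minor < other._minor);
            // ... but "no chunks" markers (UNSHARDED, DROPPED, forced refresh) only mean
            // something relative to when they were created, so they compare by the
            // process-local sequence number handed out at construction time.
            return *_seq < *other._seq;
        }

    private:
        LocalSeqVersion(uint32_t major, uint32_t minor, uint64_t seq)
            : _major(major), _minor(minor), _seq(seq) {}

        uint32_t _major = 0;
        uint32_t _minor = 0;
        std::optional<uint64_t> _seq;         // assigned monotonically per instance
        static inline uint64_t _nextSeq = 0;  // assumed single-threaded here
    };

    int main() {
        const LocalSeqVersion defaulted;              // always the oldest
        const auto v1 = LocalSeqVersion::make(1, 0);  // a "real" chunk version
        const auto noChunks = LocalSeqVersion::make(0, 0);  // created after v1
        assert(defaulted.isOlderThan(v1));
        assert(v1.isOlderThan(noChunks));             // creation order wins for "no chunks"
        assert(!noChunks.isOlderThan(v1));
        return 0;
    }
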
diff --git a/src/mongo/s/comparable_database_version_test.cpp b/src/mongo/s/comparable_database_version_test.cpp
index 3b2486a5ebd..d4201d56564 100644
--- a/src/mongo/s/comparable_database_version_test.cpp
+++ b/src/mongo/s/comparable_database_version_test.cpp
@@ -82,9 +82,15 @@ TEST(ComparableDatabaseVersionTest, VersionLessSameUuid) {
ASSERT_FALSE(version1 > version2);
}
+TEST(ComparableDatabaseVersionTest, DefaultConstructedVersionsAreEqual) {
+ const ComparableDatabaseVersion defaultVersion1{}, defaultVersion2{};
+ ASSERT(defaultVersion1 == defaultVersion2);
+ ASSERT_FALSE(defaultVersion1 < defaultVersion2);
+ ASSERT_FALSE(defaultVersion1 > defaultVersion2);
+}
+
TEST(ComparableDatabaseVersionTest, DefaultConstructedVersionIsAlwaysLess) {
const ComparableDatabaseVersion defaultVersion{};
- ASSERT_EQ(defaultVersion.getLocalSequenceNum(), 0);
const auto version1 =
ComparableDatabaseVersion::makeComparableDatabaseVersion(DatabaseVersion(UUID::gen(), 0));
ASSERT(defaultVersion != version1);
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index 3996e01c326..12073508642 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -504,18 +504,18 @@ CursorId ClusterFind::runQuery(OperationContext* opCtx,
// Re-target and re-send the initial find command to the shards until we have established the
// shard version.
for (size_t retries = 1; retries <= kMaxRetries; ++retries) {
- auto routingInfoStatus = getCollectionRoutingInfoForTxnCmd(opCtx, query.nss());
- if (routingInfoStatus == ErrorCodes::NamespaceNotFound) {
+ auto swCM = getCollectionRoutingInfoForTxnCmd(opCtx, query.nss());
+ if (swCM == ErrorCodes::NamespaceNotFound) {
// If the database doesn't exist, we successfully return an empty result set without
// creating a cursor.
return CursorId(0);
}
- auto routingInfo = uassertStatusOK(routingInfoStatus);
+ const auto cm = uassertStatusOK(std::move(swCM));
try {
return runQueryWithoutRetrying(
- opCtx, query, readPref, routingInfo, results, partialResultsReturned);
+ opCtx, query, readPref, cm, results, partialResultsReturned);
} catch (ExceptionFor<ErrorCodes::StaleDbVersion>& ex) {
if (retries >= kMaxRetries) {
// Check if there are no retries remaining, so the last received error can be
@@ -577,13 +577,9 @@ CursorId ClusterFind::runQuery(OperationContext* opCtx,
if (ex.code() != ErrorCodes::ShardInvalidatedForTargeting) {
if (auto staleInfo = ex.extraInfo<StaleConfigInfo>()) {
catalogCache->invalidateShardOrEntireCollectionEntryForShardedCollection(
- opCtx,
- query.nss(),
- staleInfo->getVersionWanted(),
- staleInfo->getVersionReceived(),
- staleInfo->getShardId());
+ query.nss(), staleInfo->getVersionWanted(), staleInfo->getShardId());
} else {
- catalogCache->onEpochChange(query.nss());
+ catalogCache->invalidateCollectionEntry_LINEARIZABLE(query.nss());
}
}
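The cluster_find.cpp hunks above follow a retry pattern: fetch the routing info, run the query, and on a stale-routing error either invalidate only the shard named in the StaleConfigInfo or, lacking that, invalidate the whole collection entry before retrying. A minimal stand-alone sketch of that control flow follows; RoutingCache, StaleRoutingError and runWithRoutingRetries are invented placeholder names, not the mongos API.

    // retry_on_stale_sketch.cpp -- hypothetical names throughout; this mirrors only the
    // control flow of the loop above, not mongos code.
    #include <iostream>
    #include <optional>
    #include <stdexcept>
    #include <string>

    struct StaleRoutingError : std::runtime_error {
        explicit StaleRoutingError(std::optional<std::string> shard)
            : std::runtime_error("stale routing info"), shardId(std::move(shard)) {}
        std::optional<std::string> shardId;  // present when the error names the stale shard
    };

    // Toy stand-in for the catalog cache: it only counts fetches and logs invalidations.
    struct RoutingCache {
        int getRoutingInfo(const std::string& nss) { return ++fetches; }  // pretend refresh
        void invalidateShardEntry(const std::string& nss, const std::string& shard) {
            std::cout << "invalidate " << nss << " on shard " << shard << "\n";
        }
        void invalidateCollectionEntry(const std::string& nss) {
            std::cout << "invalidate whole entry for " << nss << "\n";
        }
        int fetches = 0;
    };

    template <typename RunFn>
    int runWithRoutingRetries(RoutingCache& cache, const std::string& nss, RunFn run,
                              int maxRetries = 10) {
        for (int attempt = 1;; ++attempt) {
            auto routing = cache.getRoutingInfo(nss);  // re-fetched after every invalidation
            try {
                return run(routing);
            } catch (const StaleRoutingError& ex) {
                if (attempt >= maxRetries)
                    throw;  // give up and surface the last error
                if (ex.shardId)
                    cache.invalidateShardEntry(nss, *ex.shardId);  // targeted invalidation
                else
                    cache.invalidateCollectionEntry(nss);  // no shard info: drop the entry
            }
        }
    }

    int main() {
        RoutingCache cache;
        int result = runWithRoutingRetries(cache, "test.coll", [&](int routing) {
            if (routing < 2)
                throw StaleRoutingError{std::string("shard0")};  // first attempt is stale
            return 42;
        });
        std::cout << "result=" << result << " after " << cache.fetches << " fetches\n";
        return 0;
    }
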
diff --git a/src/mongo/s/request_types/set_shard_version_request.h b/src/mongo/s/request_types/set_shard_version_request.h
index bfd7385ffae..44cacff0415 100644
--- a/src/mongo/s/request_types/set_shard_version_request.h
+++ b/src/mongo/s/request_types/set_shard_version_request.h
@@ -98,6 +98,7 @@ private:
SetShardVersionRequest();
bool _isAuthoritative{false};
+ // TODO (SERVER-50812): remove this flag, which is no longer used
bool _forceRefresh{false};
boost::optional<NamespaceString> _nss;
diff --git a/src/mongo/s/sessions_collection_sharded.cpp b/src/mongo/s/sessions_collection_sharded.cpp
index 060c1158dbd..22915bd2c0a 100644
--- a/src/mongo/s/sessions_collection_sharded.cpp
+++ b/src/mongo/s/sessions_collection_sharded.cpp
@@ -123,8 +123,6 @@ void SessionsCollectionSharded::checkSessionsCollectionExists(OperationContext*
const auto cm = uassertStatusOK(
Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(
opCtx, NamespaceString::kLogicalSessionsNamespace));
-
- uassert(ErrorCodes::NamespaceNotFound, "config.system.sessions does not exist", cm.isSharded());
}
void SessionsCollectionSharded::refreshSessions(OperationContext* opCtx,
diff --git a/src/mongo/s/sharding_test_fixture_common.cpp b/src/mongo/s/sharding_test_fixture_common.cpp
index 95dd505687b..2ac936d3977 100644
--- a/src/mongo/s/sharding_test_fixture_common.cpp
+++ b/src/mongo/s/sharding_test_fixture_common.cpp
@@ -47,9 +47,11 @@ ShardingTestFixtureCommon::ShardingTestFixtureCommon() {
ShardingTestFixtureCommon::~ShardingTestFixtureCommon() = default;
-std::shared_ptr<RoutingTableHistory> ShardingTestFixtureCommon::makeStandaloneRoutingTableHistory(
+RoutingTableHistoryValueHandle ShardingTestFixtureCommon::makeStandaloneRoutingTableHistory(
RoutingTableHistory rt) {
- return std::make_shared<RoutingTableHistory>(std::move(rt));
+ const auto version = rt.getVersion();
+ return RoutingTableHistoryValueHandle(
+ std::move(rt), ComparableChunkVersion::makeComparableChunkVersion(version));
}
void ShardingTestFixtureCommon::onCommand(NetworkTestEnv::OnCommandFunction func) {
diff --git a/src/mongo/s/sharding_test_fixture_common.h b/src/mongo/s/sharding_test_fixture_common.h
index 0ecbbb30695..52377d7fbc5 100644
--- a/src/mongo/s/sharding_test_fixture_common.h
+++ b/src/mongo/s/sharding_test_fixture_common.h
@@ -55,8 +55,7 @@ public:
* which can be used to pass to ChunkManager for tests, which specifically target the behaviour
* of the ChunkManager.
*/
- static std::shared_ptr<RoutingTableHistory> makeStandaloneRoutingTableHistory(
- RoutingTableHistory rt);
+ static RoutingTableHistoryValueHandle makeStandaloneRoutingTableHistory(RoutingTableHistory rt);
protected:
ShardingTestFixtureCommon();
diff --git a/src/mongo/s/write_ops/chunk_manager_targeter.cpp b/src/mongo/s/write_ops/chunk_manager_targeter.cpp
index f7189efdfe9..6794dabc3ca 100644
--- a/src/mongo/s/write_ops/chunk_manager_targeter.cpp
+++ b/src/mongo/s/write_ops/chunk_manager_targeter.cpp
@@ -791,7 +791,7 @@ int ChunkManagerTargeter::getNShardsOwningChunks() const {
void ChunkManagerTargeter::_refreshShardVersionNow(OperationContext* opCtx) {
uassertStatusOK(
- Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfoWithRefresh(opCtx, _nss, true));
+ Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfoWithRefresh(opCtx, _nss));
_init(opCtx);
}
diff --git a/src/mongo/util/invalidating_lru_cache.h b/src/mongo/util/invalidating_lru_cache.h
index 18b9a94c9fa..c8ead4adecc 100644
--- a/src/mongo/util/invalidating_lru_cache.h
+++ b/src/mongo/util/invalidating_lru_cache.h
@@ -196,9 +196,9 @@ public:
*/
class ValueHandle {
public:
- // The two constructors below are present in order to offset the fact that the cache doesn't
- // support pinning items. Their only usage must be in the authorization mananager for the
- // internal authentication user.
+ // The three constructors below are present in order to offset the fact that the cache
+ // doesn't support pinning items. Their only usage must be in the authorization manager
+ // for the internal authentication user.
explicit ValueHandle(Value&& value)
: _value(std::make_shared<StoredValue>(nullptr,
0,
@@ -207,6 +207,10 @@ public:
CacheNotCausallyConsistent(),
CacheNotCausallyConsistent())) {}
+ explicit ValueHandle(Value&& value, const Time& t)
+ : _value(
+ std::make_shared<StoredValue>(nullptr, 0, boost::none, std::move(value), t, t)) {}
+
ValueHandle() = default;
operator bool() const {
@@ -264,15 +268,16 @@ public:
Value&& value,
const Time& time = CacheNotCausallyConsistent()) {
LockGuardWithPostUnlockDestructor guard(_mutex);
- Time timeInStore;
- _invalidate(&guard, key, _cache.find(key), &timeInStore);
- if (auto evicted = _cache.add(key,
- std::make_shared<StoredValue>(this,
- ++_epoch,
- key,
- std::forward<Value>(value),
- time,
- std::max(time, timeInStore)))) {
+ Time currentTime, currentTimeInStore;
+ _invalidate(&guard, key, _cache.find(key), &currentTime, &currentTimeInStore);
+ if (auto evicted =
+ _cache.add(key,
+ std::make_shared<StoredValue>(this,
+ ++_epoch,
+ key,
+ std::forward<Value>(value),
+ time,
+ std::max(time, currentTimeInStore)))) {
const auto& evictedKey = evicted->first;
auto& evictedValue = evicted->second;
@@ -310,15 +315,16 @@ public:
Value&& value,
const Time& time = CacheNotCausallyConsistent()) {
LockGuardWithPostUnlockDestructor guard(_mutex);
- Time timeInStore;
- _invalidate(&guard, key, _cache.find(key), &timeInStore);
- if (auto evicted = _cache.add(key,
- std::make_shared<StoredValue>(this,
- ++_epoch,
- key,
- std::forward<Value>(value),
- time,
- std::max(time, timeInStore)))) {
+ Time currentTime, currentTimeInStore;
+ _invalidate(&guard, key, _cache.find(key), &currentTime, &currentTimeInStore);
+ if (auto evicted =
+ _cache.add(key,
+ std::make_shared<StoredValue>(this,
+ ++_epoch,
+ key,
+ std::forward<Value>(value),
+ time,
+ std::max(time, currentTimeInStore)))) {
const auto& evictedKey = evicted->first;
auto& evictedValue = evicted->second;
@@ -526,10 +532,13 @@ private:
void _invalidate(LockGuardWithPostUnlockDestructor* guard,
const Key& key,
typename Cache::iterator it,
+ Time* outTime = nullptr,
Time* outTimeInStore = nullptr) {
if (it != _cache.end()) {
auto& storedValue = it->second;
storedValue->isValid.store(false);
+ if (outTime)
+ *outTime = storedValue->time;
if (outTimeInStore)
*outTimeInStore = storedValue->timeInStore;
guard->releasePtr(std::move(storedValue));
@@ -545,6 +554,8 @@ private:
// released and drops to zero
if (auto evictedValue = itEvicted->second.lock()) {
evictedValue->isValid.store(false);
+ if (outTime)
+ *outTime = evictedValue->time;
if (outTimeInStore)
*outTimeInStore = evictedValue->timeInStore;
guard->releasePtr(std::move(evictedValue));
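The invalidating_lru_cache.h hunks above make _invalidate report both the replaced entry's time and its timeInStore, so that insertOrAssign can stamp the replacement with max(time, previous timeInStore) and a pending advance is never lost. A toy sketch of why that max matters follows; TinyCache, CausalEntry and the integer Timestamp are assumptions for illustration, not the InvalidatingLRUCache interface.

    // causal_bookkeeping_sketch.cpp -- invented types; shows only the max(time, timeInStore)
    // rule, not the real cache.
    #include <algorithm>
    #include <cassert>
    #include <map>
    #include <string>

    using Timestamp = unsigned long long;

    struct CausalEntry {
        std::string value;
        Timestamp time;         // time of the value currently stored
        Timestamp timeInStore;  // newest time known to exist "in store" (may be ahead of 'time')
    };

    struct TinyCache {
        // Marks a newer time as available without providing the value yet.
        void advanceTimeInStore(int key, Timestamp newTime) {
            auto it = _entries.find(key);
            if (it != _entries.end())
                it->second.timeInStore = std::max(it->second.timeInStore, newTime);
        }

        // Replaces the entry; the replacement must not "forget" a pending advance,
        // hence timeInStore = max(time, previous timeInStore).
        void insertOrAssign(int key, std::string value, Timestamp time) {
            Timestamp previousInStore = 0;
            if (auto it = _entries.find(key); it != _entries.end())
                previousInStore = it->second.timeInStore;  // captured while dropping the old entry
            _entries[key] = CausalEntry{std::move(value), time, std::max(time, previousInStore)};
        }

        bool upToDate(int key) const {
            auto it = _entries.find(key);
            return it != _entries.end() && it->second.time == it->second.timeInStore;
        }

        std::map<int, CausalEntry> _entries;
    };

    int main() {
        TinyCache cache;
        cache.insertOrAssign(1, "v@10", 10);
        cache.advanceTimeInStore(1, 20);      // something newer exists, value not fetched yet
        cache.insertOrAssign(1, "v@15", 15);  // replacement is older than the pending advance
        assert(!cache.upToDate(1));           // timeInStore stayed at 20, so a refresh is still due
        cache.insertOrAssign(1, "v@20", 20);
        assert(cache.upToDate(1));
        return 0;
    }
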
diff --git a/src/mongo/util/invalidating_lru_cache_test.cpp b/src/mongo/util/invalidating_lru_cache_test.cpp
index 282a130af68..8476dfc5c9e 100644
--- a/src/mongo/util/invalidating_lru_cache_test.cpp
+++ b/src/mongo/util/invalidating_lru_cache_test.cpp
@@ -67,11 +67,14 @@ TEST(InvalidatingLRUCacheTest, ValueHandleOperators) {
TestValueCache cache(1);
cache.insertOrAssign(100, {"Test value"});
+ // Test non-const operators
{
auto valueHandle = cache.get(100);
ASSERT_EQ("Test value", valueHandle->value);
ASSERT_EQ("Test value", (*valueHandle).value);
}
+
+ // Test const operators
{
const auto valueHandle = cache.get(100);
ASSERT_EQ("Test value", valueHandle->value);
@@ -473,7 +476,7 @@ void parallelTest(size_t cacheSize, TestFunc doTest) {
}
TEST(InvalidatingLRUCacheParallelTest, InsertOrAssignThenGet) {
- parallelTest<TestValueCache>(1, [](auto& cache) mutable {
+ parallelTest<TestValueCache>(1, [](auto& cache) {
const int key = 100;
cache.insertOrAssign(key, TestValue{"Parallel tester value"});
@@ -501,7 +504,7 @@ TEST(InvalidatingLRUCacheParallelTest, InsertOrAssignAndGet) {
}
TEST(InvalidatingLRUCacheParallelTest, CacheSizeZeroInsertOrAssignAndGet) {
- parallelTest<TestValueCache>(0, [](auto& cache) mutable {
+ parallelTest<TestValueCache>(0, [](auto& cache) {
const int key = 300;
auto cachedItem = cache.insertOrAssignAndGet(key, TestValue{"Parallel tester value"});
ASSERT(cachedItem);
@@ -511,12 +514,18 @@ TEST(InvalidatingLRUCacheParallelTest, CacheSizeZeroInsertOrAssignAndGet) {
}
TEST(InvalidatingLRUCacheParallelTest, AdvanceTime) {
- AtomicWord<uint64_t> counter{0};
+ AtomicWord<uint64_t> counter{1};
+ Mutex insertOrAssignMutex = MONGO_MAKE_LATCH("InvalidatingLRUCacheParallelTest::insertOrAssignMutex");
- parallelTest<TestValueCacheCausallyConsistent>(0, [&counter](auto& cache) mutable {
+ parallelTest<TestValueCacheCausallyConsistent>(0, [&](auto& cache) {
const int key = 300;
- cache.insertOrAssign(
- key, TestValue{"Parallel tester value"}, Timestamp(counter.fetchAndAdd(1)));
+ {
+ // The calls to insertOrAssign must always be made with strictly increasing timestamps
+ stdx::lock_guard lg(insertOrAssignMutex);
+ cache.insertOrAssign(
+ key, TestValue{"Parallel tester value"}, Timestamp(counter.fetchAndAdd(1)));
+ }
+
auto latestCached = cache.get(key, CacheCausalConsistency::kLatestCached);
auto latestKnown = cache.get(key, CacheCausalConsistency::kLatestKnown);
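The test change above wraps the call to insertOrAssign in a mutex because counter.fetchAndAdd(1) only guarantees unique timestamps, not that concurrent threads reach insertOrAssign in timestamp order. The sketch below demonstrates the same guarded pattern with plain standard-library types; TinyTsStore is an invented stand-in whose only job is to assert that the timestamps it receives are strictly increasing.

    // strictly_increasing_ts_sketch.cpp -- TinyTsStore is hypothetical; only the locking
    // pattern mirrors the test above.
    #include <atomic>
    #include <cassert>
    #include <cstdint>
    #include <mutex>
    #include <thread>
    #include <vector>

    struct TinyTsStore {
        void insertOrAssign(uint64_t ts) {
            assert(ts > _lastTs);  // the store requires strictly increasing timestamps
            _lastTs = ts;
        }
        uint64_t _lastTs = 0;
    };

    int main() {
        std::atomic<uint64_t> counter{1};
        std::mutex insertMutex;
        TinyTsStore store;

        auto worker = [&] {
            for (int i = 0; i < 1000; ++i) {
                // fetch_add alone hands out unique timestamps, but two threads could still
                // reach insertOrAssign out of order; the lock makes "take timestamp, then
                // insert" a single atomic step.
                std::lock_guard<std::mutex> lg(insertMutex);
                store.insertOrAssign(counter.fetch_add(1));
            }
        };

        std::vector<std::thread> threads;
        for (int t = 0; t < 4; ++t)
            threads.emplace_back(worker);
        for (auto& t : threads)
            t.join();
        return 0;
    }
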
diff --git a/src/mongo/util/read_through_cache.h b/src/mongo/util/read_through_cache.h
index 3d5c7bf0923..72b3e7a5771 100644
--- a/src/mongo/util/read_through_cache.h
+++ b/src/mongo/util/read_through_cache.h
@@ -136,10 +136,12 @@ public:
*/
class ValueHandle {
public:
- // The two constructors below are present in order to offset the fact that the cache doesn't
- // support pinning items. Their only usage must be in the authorization mananager for the
- // internal authentication user.
+ // The three constructors below are present in order to offset the fact that the cache
+ // doesn't support pinning items. Their only usage must be in the authorization manager
+ // for the internal authentication user.
ValueHandle(Value&& value) : _valueHandle({std::move(value), Date_t::min()}) {}
+ ValueHandle(Value&& value, const Time& t)
+ : _valueHandle({std::move(value), Date_t::min()}, t) {}
ValueHandle() = default;
operator bool() const {
@@ -289,6 +291,16 @@ public:
}
/**
+ * Acquires the latest value from the cache, or an empty ValueHandle if the key is not present
+ * in the cache.
+ *
+ * Doesn't attempt a lookup, and so doesn't block.
+ */
+ ValueHandle peekLatestCached(const Key& key) {
+ return {_cache.get(key, CacheCausalConsistency::kLatestCached)};
+ }
+
+ /**
* Invalidates the given 'key' and immediately replaces it with a new value.
*/
ValueHandle insertOrAssignAndGet(const Key& key, Value&& newValue, Date_t updateWallClockTime) {