summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2020-05-06 14:16:36 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-05-21 05:50:39 +0000
commita4c3cbd4060dc99e4aaa4a4472882e7995125431 (patch)
tree7fa1359778988ad1bad7bc6d136f9bd62b29fabf
parent0ab12830d07bc60523d4a21eb216ba4ab70a2be2 (diff)
downloadmongo-a4c3cbd4060dc99e4aaa4a4472882e7995125431.tar.gz
SERVER-46154 Make ReadThroughCache support causal consistency
-rw-r--r--src/mongo/util/invalidating_lru_cache.h210
-rw-r--r--src/mongo/util/invalidating_lru_cache_test.cpp128
-rw-r--r--src/mongo/util/read_through_cache.h193
-rw-r--r--src/mongo/util/read_through_cache_test.cpp325
4 files changed, 690 insertions, 166 deletions
diff --git a/src/mongo/util/invalidating_lru_cache.h b/src/mongo/util/invalidating_lru_cache.h
index e1dced30e31..60c2341a295 100644
--- a/src/mongo/util/invalidating_lru_cache.h
+++ b/src/mongo/util/invalidating_lru_cache.h
@@ -41,11 +41,56 @@
namespace mongo {
/**
- * Extension over 'LRUCache', which provides thread-safety, introspection and most importantly the
- * ability to mark entries as invalid to indicate to potential callers that they should not be used
- * anymore.
+ * Type indicating that the specific cache instance does not support causal consistency. To be used
+ * as the default 'Time' parameter to the 'InvalidatingLRUCache' template, indicating that the cache
+ * is not causally consistent.
*/
-template <typename Key, typename Value>
+struct CacheNotCausallyConsistent {
+ bool operator==(const CacheNotCausallyConsistent&) const {
+ return true;
+ }
+ bool operator!=(const CacheNotCausallyConsistent&) const {
+ return false;
+ }
+ bool operator>(const CacheNotCausallyConsistent&) const {
+ return false;
+ }
+ bool operator>=(const CacheNotCausallyConsistent&) const {
+ return true;
+ }
+ bool operator<(const CacheNotCausallyConsistent&) const {
+ return false;
+ }
+ bool operator<=(const CacheNotCausallyConsistent&) const {
+ return true;
+ }
+};
+
+/**
+ * Specifies the desired causal consistency for calls to 'get' (and 'acquire', respectively in the
+ * ReadThroughCache, which is its main consumer).
+ */
+enum class CacheCausalConsistency {
+ // Provides the fastest acquire semantics, where if the cache already contains a
+ // (non-invalidated) value cached, it will be immediately returned. Otherwise, the 'acquire'
+ // call will block.
+ kLatestCached,
+
+ // Provides a causally-consistent semantics with respect to a previous call to
+ // 'advanceTimeInStore', where if the cache's (non-invalidated) value has time == timeInStore,
+ // the value will be immediately returned. Otherwise, the 'acquire' call will block.
+ kLatestKnown,
+};
+
+/**
+ * Extension built on top of 'LRUCache', which provides thread-safety, introspection and most
+ * importantly the ability to invalidate each entry and/or associate a logical timestamp in order to
+ * indicate to potential callers that the entry should not be used anymore.
+ *
+ * The type for 'Time' must support 'operator <' and its default constructor 'Time()' must provide
+ * the lowest possible value for the time.
+ */
+template <typename Key, typename Value, typename Time = CacheNotCausallyConsistent>
class InvalidatingLRUCache {
/**
* Data structure representing the values stored in the cache.
@@ -58,11 +103,18 @@ class InvalidatingLRUCache {
StoredValue(InvalidatingLRUCache* owningCache,
uint64_t epoch,
boost::optional<Key>&& key,
- Value&& value)
+ Value&& value,
+ const Time& time,
+ const Time& timeInStore)
: owningCache(owningCache),
epoch(epoch),
key(std::move(key)),
- value(std::move(value)) {}
+ value(std::move(value)),
+ time(time),
+ timeInStore(timeInStore),
+ isValid(time == timeInStore) {
+ invariant(time <= timeInStore);
+ }
~StoredValue() {
if (!owningCache)
@@ -111,10 +163,19 @@ class InvalidatingLRUCache {
const boost::optional<Key> key;
Value value;
- // Initially set to true to indicate that the entry is valid and can be read without
- // synchronisation. Transitions to false only once, under `_mutex` in order to mark the
- // entry as invalid.
- AtomicWord<bool> isValid{true};
+ // Timestamp associated with the current 'value'. The semantics of the time is entirely up
+ // to the user of the cache, but it must be monotonically increasing for the same key.
+ const Time time;
+
+ // Timestamp which the store has indicated as available for 'key' (through a call to
+ // 'advanceTimeInStore'). Starts as equal to 'time' and always moves forward, under
+ // '_mutex'.
+ Time timeInStore;
+
+ // Can be read without synchronisation. Transitions to false only once, under `_mutex` in
+ // order to mark the entry as invalid either as a result of 'invalidate' or
+ // 'advanceTimeInStore'.
+ AtomicWord<bool> isValid;
};
using Cache = LRUCache<Key, std::shared_ptr<StoredValue>>;
@@ -139,7 +200,12 @@ public:
// support pinning items. Their only usage must be in the authorization manager for the
// internal authentication user.
explicit ValueHandle(Value&& value)
- : _value(std::make_shared<StoredValue>(nullptr, 0, boost::none, std::move(value))) {}
+ : _value(std::make_shared<StoredValue>(nullptr,
+ 0,
+ boost::none,
+ std::move(value),
+ CacheNotCausallyConsistent(),
+ CacheNotCausallyConsistent())) {}
ValueHandle() = default;
@@ -188,13 +254,25 @@ public:
/**
* Inserts or updates a key with a new value. If 'key' was checked-out at the time this method
* was called, it will become invalidated.
+ *
+ * The 'time' parameter is mandatory for causally-consistent caches, but not needed otherwise
+ * (since the time never changes). Using a default of '= CacheNotCausallyConsistent()' allows
+ * non-causally-consistent users to not have to pass a second parameter, but would fail
+ * compilation if causally-consistent users forget to pass it.
*/
- void insertOrAssign(const Key& key, Value&& value) {
+ void insertOrAssign(const Key& key,
+ Value&& value,
+ const Time& time = CacheNotCausallyConsistent()) {
LockGuardWithPostUnlockDestructor guard(_mutex);
- _invalidate(&guard, key, _cache.find(key));
- if (auto evicted = _cache.add(
- key,
- std::make_shared<StoredValue>(this, ++_epoch, key, std::forward<Value>(value)))) {
+ Time timeInStore;
+ _invalidate(&guard, key, _cache.find(key), &timeInStore);
+ if (auto evicted = _cache.add(key,
+ std::make_shared<StoredValue>(this,
+ ++_epoch,
+ key,
+ std::forward<Value>(value),
+ time,
+ std::max(time, timeInStore)))) {
const auto& evictedKey = evicted->first;
auto& evictedValue = evicted->second;
@@ -222,13 +300,25 @@ public:
* For caches of size zero, this method will not cache the passed-in value, but it will be
* returned and the `get` method will continue returning it until all returned handles are
* destroyed.
+ *
+ * The 'time' parameter is mandatory for causally-consistent caches, but not needed otherwise
+ * (since the time never changes). Using a default of '= CacheNotCausallyConsistent()' allows
+ * non-causally-consistent users to not have to pass a second parameter, but would fail
+ * compilation if causally-consistent users forget to pass it.
*/
- ValueHandle insertOrAssignAndGet(const Key& key, Value&& value) {
+ ValueHandle insertOrAssignAndGet(const Key& key,
+ Value&& value,
+ const Time& time = CacheNotCausallyConsistent()) {
LockGuardWithPostUnlockDestructor guard(_mutex);
- _invalidate(&guard, key, _cache.find(key));
- if (auto evicted = _cache.add(
- key,
- std::make_shared<StoredValue>(this, ++_epoch, key, std::forward<Value>(value)))) {
+ Time timeInStore;
+ _invalidate(&guard, key, _cache.find(key), &timeInStore);
+ if (auto evicted = _cache.add(key,
+ std::make_shared<StoredValue>(this,
+ ++_epoch,
+ key,
+ std::forward<Value>(value),
+ time,
+ std::max(time, timeInStore)))) {
const auto& evictedKey = evicted->first;
auto& evictedValue = evicted->second;
@@ -266,19 +356,78 @@ public:
* it could still get evicted if the cache is under pressure. The returned handle must be
* destroyed before the owning cache object itself is destroyed.
*/
- ValueHandle get(const Key& key) {
+ ValueHandle get(
+ const Key& key,
+ CacheCausalConsistency causalConsistency = CacheCausalConsistency::kLatestCached) {
stdx::lock_guard<Latch> lg(_mutex);
- if (auto it = _cache.find(key); it != _cache.end())
- return ValueHandle(it->second);
+ std::shared_ptr<StoredValue> storedValue;
+ if (auto it = _cache.find(key); it != _cache.end()) {
+ storedValue = it->second;
+ } else if (auto it = _evictedCheckedOutValues.find(key);
+ it != _evictedCheckedOutValues.end()) {
+ storedValue = it->second.lock();
+ }
- if (auto it = _evictedCheckedOutValues.find(key); it != _evictedCheckedOutValues.end())
- return ValueHandle(it->second.lock());
+ if (causalConsistency == CacheCausalConsistency::kLatestKnown && storedValue &&
+ storedValue->time < storedValue->timeInStore)
+ return ValueHandle(nullptr);
+ return ValueHandle(std::move(storedValue));
+ }
- return ValueHandle(nullptr);
+ /**
+ * Indicates to the cache that the backing store contains a new value for the specified key,
+ * with a timestamp of 'newTimeInStore'.
+ *
+ * Any already returned ValueHandles will start returning isValid = false. Subsequent calls to
+ * 'get' with a causal consistency of 'kLatestCached' will continue to return the value, which
+ * is currently cached, but with isValid = false. Calls to 'get' with a causal consistency of
+ * 'kLatestKnown' will return no value. It is up to the caller of this function to subsequently
+ * either 'insertOrAssign' a new value for the 'key', or to call 'invalidate'.
+ */
+ void advanceTimeInStore(const Key& key, const Time& newTimeInStore) {
+ stdx::lock_guard<Latch> lg(_mutex);
+ std::shared_ptr<StoredValue> storedValue;
+ if (auto it = _cache.find(key); it != _cache.end()) {
+ storedValue = it->second;
+ } else if (auto it = _evictedCheckedOutValues.find(key);
+ it != _evictedCheckedOutValues.end()) {
+ storedValue = it->second.lock();
+ }
+
+ if (!storedValue)
+ return;
+
+ if (newTimeInStore > storedValue->timeInStore) {
+ storedValue->timeInStore = newTimeInStore;
+ storedValue->isValid.store(false);
+ }
+ }
+
+ /**
+ * If 'key' is in the store, returns its latest 'timeInStore', which can either be from the time
+ * of insertion or from the latest call to 'advanceTimeInStore'. Otherwise, returns Time().
+ */
+ Time getTimeInStore(const Key& key) {
+ stdx::lock_guard<Latch> lg(_mutex);
+ std::shared_ptr<StoredValue> storedValue;
+ if (auto it = _cache.find(key); it != _cache.end()) {
+ storedValue = it->second;
+ } else if (auto it = _evictedCheckedOutValues.find(key);
+ it != _evictedCheckedOutValues.end()) {
+ storedValue = it->second.lock();
+ }
+
+ if (storedValue)
+ return storedValue->timeInStore;
+
+ return Time();
}
/**
* Marks 'key' as invalid if it is found in the cache (whether checked-out or not).
+ *
+ * Any already returned ValueHandles will start returning isValid = false. Subsequent calls to
+ * 'get' will *not* return value for 'key' until the next call to 'insertOrAssign'.
*/
void invalidate(const Key& key) {
LockGuardWithPostUnlockDestructor guard(_mutex);
@@ -373,10 +522,13 @@ private:
*/
void _invalidate(LockGuardWithPostUnlockDestructor* guard,
const Key& key,
- typename Cache::iterator it) {
+ typename Cache::iterator it,
+ Time* outTimeInStore = nullptr) {
if (it != _cache.end()) {
auto& storedValue = it->second;
storedValue->isValid.store(false);
+ if (outTimeInStore)
+ *outTimeInStore = storedValue->timeInStore;
guard->releasePtr(std::move(storedValue));
_cache.erase(it);
return;
@@ -390,6 +542,8 @@ private:
// released and drops to zero
if (auto evictedValue = itEvicted->second.lock()) {
evictedValue->isValid.store(false);
+ if (outTimeInStore)
+ *outTimeInStore = evictedValue->timeInStore;
guard->releasePtr(std::move(evictedValue));
}
diff --git a/src/mongo/util/invalidating_lru_cache_test.cpp b/src/mongo/util/invalidating_lru_cache_test.cpp
index 939892f0c1e..5d6f53470db 100644
--- a/src/mongo/util/invalidating_lru_cache_test.cpp
+++ b/src/mongo/util/invalidating_lru_cache_test.cpp
@@ -54,6 +54,9 @@ struct TestValue {
using TestValueCache = InvalidatingLRUCache<int, TestValue>;
using TestValueHandle = TestValueCache::ValueHandle;
+using TestValueCacheCausallyConsistent = InvalidatingLRUCache<int, TestValue, Timestamp>;
+using TestValueHandleCausallyConsistent = TestValueCacheCausallyConsistent::ValueHandle;
+
TEST(InvalidatingLRUCacheTest, StandaloneValueHandle) {
TestValueHandle standaloneHandle({"Standalone value"});
ASSERT(standaloneHandle.isValid());
@@ -76,6 +79,29 @@ TEST(InvalidatingLRUCacheTest, ValueHandleOperators) {
}
}
+TEST(InvalidatingLRUCacheTest, CausalConsistency) {
+ TestValueCacheCausallyConsistent cache(1);
+
+ cache.insertOrAssign(2, TestValue("Value @ TS 100"), Timestamp(100));
+ ASSERT_EQ("Value @ TS 100", cache.get(2, CacheCausalConsistency::kLatestCached)->value);
+ ASSERT_EQ("Value @ TS 100", cache.get(2, CacheCausalConsistency::kLatestKnown)->value);
+
+ auto value = cache.get(2, CacheCausalConsistency::kLatestCached);
+ cache.advanceTimeInStore(2, Timestamp(200));
+ ASSERT_EQ("Value @ TS 100", value->value);
+ ASSERT(!value.isValid());
+ ASSERT_EQ("Value @ TS 100", cache.get(2, CacheCausalConsistency::kLatestCached)->value);
+ ASSERT(!cache.get(2, CacheCausalConsistency::kLatestCached).isValid());
+ ASSERT(!cache.get(2, CacheCausalConsistency::kLatestKnown));
+
+ // Intentionally push value for key with a timestamp higher than the one passed to
+ // advanceTimeInStore
+ cache.insertOrAssign(2, TestValue("Value @ TS 300"), Timestamp(300));
+ ASSERT_EQ("Value @ TS 100", value->value);
+ ASSERT(!value.isValid());
+ ASSERT_EQ("Value @ TS 300", cache.get(2, CacheCausalConsistency::kLatestCached)->value);
+ ASSERT_EQ("Value @ TS 300", cache.get(2, CacheCausalConsistency::kLatestKnown)->value);
+}
+
TEST(InvalidatingLRUCacheTest, InvalidateNonCheckedOutValue) {
TestValueCache cache(3);
@@ -233,6 +259,63 @@ TEST(InvalidatingLRUCacheTest, CheckedOutItemsAreInvalidatedWithPredicateWhenEvi
}
}
+TEST(InvalidatingLRUCacheTest, CausalConsistencyPreservedForEvictedCheckedOutKeys) {
+ TestValueCacheCausallyConsistent cache(1);
+
+ auto key1ValueAtTS10 =
+ cache.insertOrAssignAndGet(1, TestValue("Key 1 - Value @ TS 10"), Timestamp(10));
+
+ // This will evict key 1, but we have a handle to it, so it will stay accessible on the evicted
+ // list
+ cache.insertOrAssign(2, TestValue("Key 2 - Value @ TS 20"), Timestamp(20));
+
+ ASSERT_EQ(Timestamp(10), cache.getTimeInStore(1));
+ ASSERT_EQ("Key 1 - Value @ TS 10", key1ValueAtTS10->value);
+ ASSERT_EQ("Key 1 - Value @ TS 10", cache.get(1, CacheCausalConsistency::kLatestCached)->value);
+ ASSERT_EQ("Key 1 - Value @ TS 10", cache.get(1, CacheCausalConsistency::kLatestKnown)->value);
+
+ cache.advanceTimeInStore(1, Timestamp(11));
+ ASSERT_EQ(Timestamp(11), cache.getTimeInStore(1));
+ ASSERT(!key1ValueAtTS10.isValid());
+ ASSERT_EQ("Key 1 - Value @ TS 10", key1ValueAtTS10->value);
+ ASSERT_EQ("Key 1 - Value @ TS 10", cache.get(1, CacheCausalConsistency::kLatestCached)->value);
+ ASSERT(!cache.get(1, CacheCausalConsistency::kLatestKnown));
+
+ cache.insertOrAssign(1, TestValue("Key 1 - Value @ TS 12"), Timestamp(12));
+ ASSERT_EQ("Key 1 - Value @ TS 12", cache.get(1, CacheCausalConsistency::kLatestCached)->value);
+ ASSERT_EQ("Key 1 - Value @ TS 12", cache.get(1, CacheCausalConsistency::kLatestKnown)->value);
+}
+
+TEST(InvalidatingLRUCacheTest, InvalidateAfterAdvanceTime) {
+ TestValueCacheCausallyConsistent cache(1);
+
+ cache.insertOrAssign(20, TestValue("Value @ TS 200"), Timestamp(200));
+ cache.advanceTimeInStore(20, Timestamp(250));
+ ASSERT_EQ("Value @ TS 200", cache.get(20, CacheCausalConsistency::kLatestCached)->value);
+ ASSERT(!cache.get(20, CacheCausalConsistency::kLatestKnown));
+
+ cache.invalidate(20);
+ ASSERT(!cache.get(20, CacheCausalConsistency::kLatestCached));
+ ASSERT(!cache.get(20, CacheCausalConsistency::kLatestKnown));
+}
+
+TEST(InvalidatingLRUCacheTest, InsertEntryAtTimeLessThanAdvanceTime) {
+ TestValueCacheCausallyConsistent cache(1);
+
+ cache.insertOrAssign(20, TestValue("Value @ TS 200"), Timestamp(200));
+ cache.advanceTimeInStore(20, Timestamp(300));
+ ASSERT_EQ("Value @ TS 200", cache.get(20, CacheCausalConsistency::kLatestCached)->value);
+ ASSERT(!cache.get(20, CacheCausalConsistency::kLatestKnown));
+
+ cache.insertOrAssign(20, TestValue("Value @ TS 250"), Timestamp(250));
+ ASSERT_EQ("Value @ TS 250", cache.get(20, CacheCausalConsistency::kLatestCached)->value);
+ ASSERT(!cache.get(20, CacheCausalConsistency::kLatestKnown));
+
+ cache.insertOrAssign(20, TestValue("Value @ TS 300"), Timestamp(300));
+ ASSERT_EQ("Value @ TS 300", cache.get(20, CacheCausalConsistency::kLatestCached)->value);
+ ASSERT_EQ("Value @ TS 300", cache.get(20, CacheCausalConsistency::kLatestKnown)->value);
+}
+
TEST(InvalidatingLRUCacheTest, OrderOfDestructionOfHandlesDiffersFromOrderOfInsertion) {
TestValueCache cache(1);
@@ -339,12 +422,33 @@ TEST(InvalidatingLRUCacheTest, CacheSizeZeroInvalidateAllEntries) {
}
}
-template <typename TestFunc>
+TEST(InvalidatingLRUCacheTest, CacheSizeZeroCausalConsistency) {
+ TestValueCacheCausallyConsistent cache(0);
+
+ cache.advanceTimeInStore(100, Timestamp(30));
+ cache.insertOrAssign(100, TestValue("Value @ TS 30"), Timestamp(30));
+ ASSERT_EQ(Timestamp(), cache.getTimeInStore(100));
+
+ auto valueAtTS30 = cache.insertOrAssignAndGet(100, TestValue("Value @ TS 30"), Timestamp(30));
+ ASSERT_EQ("Value @ TS 30", cache.get(100, CacheCausalConsistency::kLatestCached)->value);
+ ASSERT_EQ("Value @ TS 30", cache.get(100, CacheCausalConsistency::kLatestKnown)->value);
+
+ cache.advanceTimeInStore(100, Timestamp(35));
+ ASSERT_EQ(Timestamp(35), cache.getTimeInStore(100));
+ ASSERT_EQ("Value @ TS 30", cache.get(100, CacheCausalConsistency::kLatestCached)->value);
+ ASSERT(!cache.get(100, CacheCausalConsistency::kLatestKnown));
+
+ auto valueAtTS40 = cache.insertOrAssignAndGet(100, TestValue("Value @ TS 40"), Timestamp(40));
+ ASSERT_EQ("Value @ TS 40", cache.get(100, CacheCausalConsistency::kLatestCached)->value);
+ ASSERT_EQ("Value @ TS 40", cache.get(100, CacheCausalConsistency::kLatestKnown)->value);
+}
+
+template <class TCache, typename TestFunc>
void parallelTest(size_t cacheSize, TestFunc doTest) {
constexpr auto kNumIterations = 100'000;
constexpr auto kNumThreads = 4;
- TestValueCache cache(cacheSize);
+ TCache cache(cacheSize);
std::vector<stdx::thread> threads;
for (int i = 0; i < kNumThreads; i++) {
@@ -361,7 +465,7 @@ void parallelTest(size_t cacheSize, TestFunc doTest) {
}
TEST(InvalidatingLRUCacheParallelTest, InsertOrAssignThenGet) {
- parallelTest(1, [](TestValueCache& cache) mutable {
+ parallelTest<TestValueCache>(1, [](auto& cache) mutable {
const int key = 100;
cache.insertOrAssign(key, TestValue{"Parallel tester value"});
@@ -378,7 +482,7 @@ TEST(InvalidatingLRUCacheParallelTest, InsertOrAssignThenGet) {
}
TEST(InvalidatingLRUCacheParallelTest, InsertOrAssignAndGet) {
- parallelTest(1, [](auto& cache) {
+ parallelTest<TestValueCache>(1, [](auto& cache) {
const int key = 200;
auto cachedItem = cache.insertOrAssignAndGet(key, TestValue{"Parallel tester value"});
ASSERT(cachedItem);
@@ -389,7 +493,7 @@ TEST(InvalidatingLRUCacheParallelTest, InsertOrAssignAndGet) {
}
TEST(InvalidatingLRUCacheParallelTest, CacheSizeZeroInsertOrAssignAndGet) {
- parallelTest(0, [](TestValueCache& cache) mutable {
+ parallelTest<TestValueCache>(0, [](auto& cache) mutable {
const int key = 300;
auto cachedItem = cache.insertOrAssignAndGet(key, TestValue{"Parallel tester value"});
ASSERT(cachedItem);
@@ -398,5 +502,19 @@ TEST(InvalidatingLRUCacheParallelTest, CacheSizeZeroInsertOrAssignAndGet) {
});
}
+TEST(InvalidatingLRUCacheParallelTest, AdvanceTime) {
+ AtomicWord<uint64_t> counter{0};
+
+ parallelTest<TestValueCacheCausallyConsistent>(0, [&counter](auto& cache) mutable {
+ const int key = 300;
+ cache.insertOrAssign(
+ key, TestValue{"Parallel tester value"}, Timestamp(counter.fetchAndAdd(1)));
+ auto latestCached = cache.get(key, CacheCausalConsistency::kLatestCached);
+ auto latestKnown = cache.get(key, CacheCausalConsistency::kLatestKnown);
+
+ cache.advanceTimeInStore(key, Timestamp(counter.fetchAndAdd(1)));
+ });
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/util/read_through_cache.h b/src/mongo/util/read_through_cache.h
index 29365a6b72c..5d0b356be86 100644
--- a/src/mongo/util/read_through_cache.h
+++ b/src/mongo/util/read_through_cache.h
@@ -98,9 +98,13 @@ protected:
};
/**
- * Implements a generic read-through cache built on top of InvalidatingLRUCache.
+ * Implements an (optionally) causally consistent read-through cache from Key to Value, built on top
+ * of InvalidatingLRUCache.
+ *
+ * Causal consistency is provided by requiring the backing store to associate every Value it returns
+ * with a logical timestamp of type Time.
*/
-template <typename Key, typename Value>
+template <typename Key, typename Value, typename Time = CacheNotCausallyConsistent>
class ReadThroughCache : public ReadThroughCacheBase {
/**
* Data structure wrapping and expanding on the values stored in the cache.
@@ -113,7 +117,7 @@ class ReadThroughCache : public ReadThroughCacheBase {
// relied on to perform any recency comparisons for example).
Date_t updateWallClockTime;
};
- using Cache = InvalidatingLRUCache<Key, StoredValue>;
+ using Cache = InvalidatingLRUCache<Key, StoredValue, Time>;
public:
/**
@@ -180,14 +184,27 @@ public:
*
* The implementation must throw a uassertion to indicate an error while looking up the value,
* return boost::none if the key is not found, or return an actual value.
+ *
+ * See the comments on 'advanceTimeInStore' for additional requirements that this function must
+ * fulfill with respect to causal consistency.
*/
struct LookupResult {
- explicit LookupResult(boost::optional<Value>&& v) : v(std::move(v)) {}
+ // The 't' parameter is mandatory for causally-consistent caches, but not needed otherwise
+ // (since the time never changes). Using a default of '= CacheNotCausallyConsistent()'
+ // allows non-causally-consistent users to not have to pass a second parameter, but would
+ // fail compilation if causally-consistent users forget to pass it.
+ explicit LookupResult(boost::optional<Value>&& v, Time t = CacheNotCausallyConsistent())
+ : v(std::move(v)), t(std::move(t)) {}
LookupResult(LookupResult&&) = default;
LookupResult& operator=(LookupResult&&) = default;
// If boost::none, it means the '_lookupFn' did not find the key in the store
boost::optional<Value> v;
+
+ // If value is boost::none, specifies the time which was passed to '_lookupFn', effectively
+ // meaning, at least as of 'time', there was no entry in the store for the key. Otherwise
+ // contains the time that the store returned for the 'value'.
+ Time t;
};
using LookupFn = unique_function<LookupResult(OperationContext*, const Key&)>;
@@ -196,10 +213,11 @@ public:
class InProgressLookup;
/**
- * If 'key' is found in the cache, returns a set ValueHandle (its operator bool will be true).
- * Otherwise, either causes the blocking 'lookup' below to be asynchronously invoked to fetch
- * 'key' from the backing store (or joins an already scheduled invocation) and returns a future
- * which will be signaled when the lookup completes.
+ * If 'key' is found in the cache and it fulfills the requested 'causalConsistency', returns a
+ * set ValueHandle (its operator bool will be true). Otherwise, either causes the blocking
+ * 'LookupFn' to be asynchronously invoked to fetch 'key' from the backing store (or joins an
+ * already scheduled invocation) and returns a future which will be signaled when the lookup
+ * completes.
*
* If the lookup is successful and 'key' is found in the store, it will be cached (so subsequent
* lookups won't have to re-fetch it) and the future will be set. If 'key' is not found in the
@@ -211,27 +229,31 @@ public:
* The returned value may be invalid by the time the caller gets to access it, if 'invalidate'
* is called for 'key'.
*/
- SharedSemiFuture<ValueHandle> acquireAsync(const Key& key) {
+ SharedSemiFuture<ValueHandle> acquireAsync(
+ const Key& key,
+ CacheCausalConsistency causalConsistency = CacheCausalConsistency::kLatestCached) {
// Fast path
- if (auto cachedValue = _cache.get(key))
+ if (auto cachedValue = _cache.get(key, causalConsistency))
return {std::move(cachedValue)};
stdx::unique_lock ul(_mutex);
// Re-check the cache under a mutex, before kicking-off the asynchronous lookup
- if (auto cachedValue = _cache.get(key))
+ if (auto cachedValue = _cache.get(key, causalConsistency))
return {std::move(cachedValue)};
+ Time minTime = _cache.getTimeInStore(key);
+
// Join an in-progress lookup if one has already been scheduled
if (auto it = _inProgressLookups.find(key); it != _inProgressLookups.end())
- return it->second->addWaiter(ul);
+ return it->second->addWaiter(ul, minTime);
// Schedule an asynchronous lookup for the key
auto [it, emplaced] =
_inProgressLookups.emplace(key, std::make_unique<InProgressLookup>(*this, key));
invariant(emplaced);
auto& inProgressLookup = *it->second;
- auto sharedFutureToReturn = inProgressLookup.addWaiter(ul);
+ auto sharedFutureToReturn = inProgressLookup.addWaiter(ul, minTime);
ul.unlock();
@@ -246,8 +268,11 @@ public:
* NOTES:
* This is a potentially blocking method.
*/
- ValueHandle acquire(OperationContext* opCtx, const Key& key) {
- return acquireAsync(key).get(opCtx);
+ ValueHandle acquire(
+ OperationContext* opCtx,
+ const Key& key,
+ CacheCausalConsistency causalConsistency = CacheCausalConsistency::kLatestCached) {
+ return acquireAsync(key, causalConsistency).get(opCtx);
}
/**
@@ -261,6 +286,20 @@ public:
}
/**
+ * Indicates to the cache that the backing store has a newer version of 'key', corresponding to
+ * 'newTime'. Subsequent calls to 'acquireAsync' with a causal consistency set to 'LatestKnown'
+ * will block and perform refresh until the cached value reaches 'newTime'.
+ *
+ * With respect to causal consistency, the 'LookupFn' used for this cache must provide the
+ * guarantee that if 'advanceTimeInStore' is called with a 'newTime', a subsequent call to
+ * 'LookupFn' for 'key' must return at least 'newTime' or later.
+ */
+ void advanceTimeInStore(const Key& key, const Time& newTime) {
+ stdx::lock_guard lg(_mutex);
+ _cache.advanceTimeInStore(key, newTime);
+ }
+
+ /**
* The invalidate methods below guarantee the following:
* - All affected keys already in the cache (or returned to callers) will be invalidated and
* removed from the cache
@@ -341,31 +380,62 @@ private:
stdx::unique_lock ul(_mutex);
auto it = _inProgressLookups.find(key);
invariant(it != _inProgressLookups.end());
- if (!ErrorCodes::isCancelationError(sw.getStatus()) && !it->second->valid(ul)) {
- ul.unlock(); // asyncLookupRound also acquires the mutex
- return it->second->asyncLookupRound().onCompletion(
- [this, key](auto sw) { return _doLookupWhileNotValid(key, std::move(sw)); });
- }
-
- // The detachment of the currently active lookup and the placement of the result on the
- // '_cache' has to be atomic with respect to a concurrent call to 'invalidate'
- auto inProgressLookup(std::move(it->second));
- _inProgressLookups.erase(it);
-
- StatusWith<ValueHandle> swValueHandle(ErrorCodes::Error(461541), "");
- if (sw.isOK()) {
- auto result = std::move(sw.getValue());
- swValueHandle = result.v
- ? ValueHandle(_cache.insertOrAssignAndGet(key, {std::move(*result.v), _now()}))
- : ValueHandle();
- } else {
- swValueHandle = sw.getStatus();
- }
+ auto& inProgressLookup = *it->second;
+ auto [promisesToSet, result, mustDoAnotherLoop] = [&] {
+ // The thread pool is shutting down, so terminate the loop
+ if (ErrorCodes::isCancelationError(sw.getStatus()))
+ return std::make_tuple(inProgressLookup.getAllPromisesOnError(ul),
+ StatusWith<ValueHandle>(sw.getStatus()),
+ false);
+
+ // There was a concurrent call to 'invalidate', so start all over
+ if (!inProgressLookup.valid(ul))
+ return std::make_tuple(
+ std::vector<std::unique_ptr<SharedPromise<ValueHandle>>>{},
+ StatusWith<ValueHandle>(Status(ErrorCodes::Error(461541), "")),
+ true);
+
+ // Lookup resulted in an error, which is not cancelation
+ if (!sw.isOK())
+ return std::make_tuple(inProgressLookup.getAllPromisesOnError(ul),
+ StatusWith<ValueHandle>(sw.getStatus()),
+ false);
+
+ // Value (or boost::none) was returned by lookup and there was no concurrent call to
+ // 'invalidate'. Place the value on the cache and return the necessary promises to
+ // signal (those which are waiting for time < time at the store).
+ auto& result = sw.getValue();
+ auto promisesToSet = inProgressLookup.getPromisesLessThanTime(ul, result.t);
+ return std::make_tuple(
+ std::move(promisesToSet),
+ StatusWith<ValueHandle>(result.v
+ ? ValueHandle(_cache.insertOrAssignAndGet(
+ key, {std::move(*result.v), _now()}, result.t))
+ : ValueHandle()),
+ !inProgressLookup.empty(ul));
+ }();
+
+ if (!mustDoAnotherLoop)
+ _inProgressLookups.erase(it);
ul.unlock();
- inProgressLookup->signalWaiters(std::move(swValueHandle));
+ // The only reason this loop pops the values as it goes and std::moves into the last value
+ // is to support the CacheSizeZero unit-test, which requires that once the future it waits
+ // on is set, it contains the last reference on the returned ValueHandle
+ while (!promisesToSet.empty()) {
+ auto p(std::move(promisesToSet.back()));
+ promisesToSet.pop_back();
+
+ if (promisesToSet.empty())
+ p->setFromStatusWith(std::move(result));
+ else
+ p->setFromStatusWith(result);
+ }
- return Future<LookupResult>::makeReady(Status(ErrorCodes::Error(461542), ""));
+ return mustDoAnotherLoop
+ ? inProgressLookup.asyncLookupRound().onCompletion(
+ [this, key](auto sw) { return _doLookupWhileNotValid(key, std::move(sw)); })
+ : Future<LookupResult>::makeReady(Status(ErrorCodes::Error(461542), ""));
}
// Blocking function which will be invoked to retrieve entries from the backing store
@@ -381,7 +451,8 @@ private:
// Keeps track of all the keys, which were attempted to be 'acquireAsync'-ed, weren't found in
// the cache and are currently in the process of being looked up from the backing store. A
- // single key may only be on this map or in '_cache', but never in both.
+ // single key may be missing from '_cache', or contain an old 'kLatestCached' and have an active
+ // lookup on this map for 'kLatestKnown'.
//
// This map is protected by '_mutex'.
InProgressLookupsMap _inProgressLookups;
@@ -411,8 +482,8 @@ private:
* inProgress.signalWaiters(result);
* }
*/
-template <typename Key, typename Value>
-class ReadThroughCache<Key, Value>::InProgressLookup {
+template <typename Key, typename Value, typename Time>
+class ReadThroughCache<Key, Value, Time>::InProgressLookup {
public:
InProgressLookup(ReadThroughCache& cache, Key key) : _cache(cache), _key(std::move(key)) {}
@@ -432,25 +503,50 @@ public:
return std::move(future);
}
- SharedSemiFuture<ValueHandle> addWaiter(WithLock) {
- return _sharedPromise.getFuture();
+ SharedSemiFuture<ValueHandle> addWaiter(WithLock, Time time) {
+ auto [it, unusedEmplaced] =
+ _outstanding.try_emplace(time, std::make_unique<SharedPromise<ValueHandle>>());
+ return it->second->getFuture();
}
bool valid(WithLock) const {
return _valid;
}
+ std::vector<std::unique_ptr<SharedPromise<ValueHandle>>> getAllPromisesOnError(WithLock) {
+ invariant(_valid);
+ std::vector<std::unique_ptr<SharedPromise<ValueHandle>>> ret;
+ for (auto it = _outstanding.begin(); it != _outstanding.end();) {
+ ret.emplace_back(std::move(it->second));
+ it = _outstanding.erase(it);
+ }
+ return ret;
+ }
+
+ std::vector<std::unique_ptr<SharedPromise<ValueHandle>>> getPromisesLessThanTime(WithLock,
+ Time time) {
+ invariant(_valid);
+ std::vector<std::unique_ptr<SharedPromise<ValueHandle>>> ret;
+ for (auto it = _outstanding.begin(); it != _outstanding.end();) {
+ if (it->first > time)
+ break;
+ ret.emplace_back(std::move(it->second));
+ it = _outstanding.erase(it);
+ }
+ return ret;
+ }
+
+ bool empty(WithLock) const {
+ invariant(_valid);
+ return _outstanding.empty();
+ }
+
void invalidateAndCancelCurrentLookupRound(WithLock) {
_valid = false;
if (_cancelToken)
_cancelToken->tryCancel();
}
- void signalWaiters(StatusWith<ValueHandle> swValueHandle) {
- invariant(_valid);
- _sharedPromise.setFromStatusWith(std::move(swValueHandle));
- }
-
private:
// The owning cache, from which mutex, lookupFn, async task scheduling, etc. will be used. It is
// the responsibility of the owning cache to join all outstanding lookups at destruction time.
@@ -461,7 +557,8 @@ private:
bool _valid{false};
boost::optional<CancelToken> _cancelToken;
- SharedPromise<ValueHandle> _sharedPromise;
+ using TimeAndPromiseMap = std::map<Time, std::unique_ptr<SharedPromise<ValueHandle>>>;
+ TimeAndPromiseMap _outstanding;
};
} // namespace mongo
diff --git a/src/mongo/util/read_through_cache_test.cpp b/src/mongo/util/read_through_cache_test.cpp
index d196290bef8..8f479f4aea8 100644
--- a/src/mongo/util/read_through_cache_test.cpp
+++ b/src/mongo/util/read_through_cache_test.cpp
@@ -55,7 +55,39 @@ struct CachedValue {
class Cache : public ReadThroughCache<std::string, CachedValue> {
public:
Cache(ServiceContext* service, ThreadPoolInterface& threadPool, size_t size, LookupFn lookupFn)
- : ReadThroughCache(_mutex, service, threadPool, std::move(lookupFn), size) {}
+ : ReadThroughCache(_mutex,
+ service,
+ threadPool,
+ [this, lookupFn = std::move(lookupFn)](OperationContext* opCtx,
+ const std::string& key) {
+ ++countLookups;
+ return lookupFn(opCtx, key);
+ },
+ size) {}
+
+ int countLookups{0};
+
+private:
+ Mutex _mutex = MONGO_MAKE_LATCH("ReadThroughCacheTest::Cache");
+};
+
+class CausallyConsistentCache : public ReadThroughCache<std::string, CachedValue, Timestamp> {
+public:
+ CausallyConsistentCache(ServiceContext* service,
+ ThreadPoolInterface& threadPool,
+ size_t size,
+ LookupFn lookupFn)
+ : ReadThroughCache(_mutex,
+ service,
+ threadPool,
+ [this, lookupFn = std::move(lookupFn)](OperationContext* opCtx,
+ const std::string& key) {
+ ++countLookups;
+ return lookupFn(opCtx, key);
+ },
+ size) {}
+
+ int countLookups{0};
private:
Mutex _mutex = MONGO_MAKE_LATCH("ReadThroughCacheTest::Cache");
@@ -67,12 +99,14 @@ private:
*/
class ReadThroughCacheTest : public ServiceContextTest {
protected:
- // Extends Cache and automatically provides it with a thread pool, which will be shutdown and
- // joined before the Cache is destroyed (which is part of the contract of ReadThroughCache)
- class CacheWithThreadPool : public Cache {
+ // Extends any of Cache/CausallyConsistentCache and automatically provides it with a thread
+ // pool, which will be shutdown and joined before the Cache is destroyed (which is part of the
+ // contract of ReadThroughCache)
+ template <class T>
+ class CacheWithThreadPool : public T {
public:
- CacheWithThreadPool(ServiceContext* service, size_t size, LookupFn lookupFn)
- : Cache(service, _threadPool, size, std::move(lookupFn)) {
+ CacheWithThreadPool(ServiceContext* service, size_t size, typename T::LookupFn lookupFn)
+ : T(service, _threadPool, size, std::move(lookupFn)) {
_threadPool.startup();
}
@@ -97,89 +131,162 @@ TEST(ReadThroughCacheTest, StandaloneValueHandle) {
}
TEST_F(ReadThroughCacheTest, FetchInvalidateAndRefetch) {
- int countLookups = 0;
- CacheWithThreadPool cache(
- getServiceContext(), 1, [&](OperationContext*, const std::string& key) {
- ASSERT_EQ("TestKey", key);
- countLookups++;
- return Cache::LookupResult(CachedValue(100 * countLookups));
- });
+ auto fnTest = [&](auto cache) {
+ for (int i = 1; i <= 3; i++) {
+ auto value = cache.acquire(_opCtx, "TestKey");
+ ASSERT(value);
+ ASSERT_EQ(100 * i, value->counter);
+ ASSERT_EQ(i, cache.countLookups);
- for (int i = 1; i <= 3; i++) {
- auto value = cache.acquire(_opCtx, "TestKey");
- ASSERT(value);
- ASSERT_EQ(100 * i, value->counter);
- ASSERT_EQ(i, countLookups);
+ ASSERT(cache.acquire(_opCtx, "TestKey"));
+ ASSERT_EQ(i, cache.countLookups);
- ASSERT(cache.acquire(_opCtx, "TestKey"));
- ASSERT_EQ(i, countLookups);
+ cache.invalidate("TestKey");
+ }
+ };
- cache.invalidate("TestKey");
- }
+ fnTest(CacheWithThreadPool<Cache>(
+ getServiceContext(),
+ 1,
+ [&, nextValue = 0](OperationContext*, const std::string& key) mutable {
+ ASSERT_EQ("TestKey", key);
+ return Cache::LookupResult(CachedValue(100 * ++nextValue));
+ }));
+
+ fnTest(CacheWithThreadPool<CausallyConsistentCache>(
+ getServiceContext(),
+ 1,
+ [&, nextValue = 0](OperationContext*, const std::string& key) mutable {
+ ASSERT_EQ("TestKey", key);
+ ++nextValue;
+ return CausallyConsistentCache::LookupResult(CachedValue(100 * nextValue),
+ Timestamp(nextValue));
+ }));
}
TEST_F(ReadThroughCacheTest, FailedLookup) {
- CacheWithThreadPool cache(
+ auto fnTest = [&](auto cache) {
+ ASSERT_THROWS_CODE(
+ cache.acquire(_opCtx, "TestKey"), DBException, ErrorCodes::InternalError);
+ };
+
+ fnTest(CacheWithThreadPool<Cache>(
getServiceContext(),
1,
[&](OperationContext*, const std::string& key) -> Cache::LookupResult {
uasserted(ErrorCodes::InternalError, "Test error");
- });
+ }));
- ASSERT_THROWS_CODE(cache.acquire(_opCtx, "TestKey"), DBException, ErrorCodes::InternalError);
+ fnTest(CacheWithThreadPool<CausallyConsistentCache>(
+ getServiceContext(),
+ 1,
+ [&](OperationContext*, const std::string& key) -> CausallyConsistentCache::LookupResult {
+ uasserted(ErrorCodes::InternalError, "Test error");
+ }));
}
TEST_F(ReadThroughCacheTest, CacheSizeZero) {
- int countLookups = 0;
- CacheWithThreadPool cache(
- getServiceContext(), 0, [&](OperationContext*, const std::string& key) {
+ auto fnTest = [&](auto cache) {
+ for (int i = 1; i <= 3; i++) {
+ auto value = cache.acquire(_opCtx, "TestKey", CacheCausalConsistency::kLatestKnown);
+ ASSERT(value);
+ ASSERT_EQ(100 * i, value->counter);
+ ASSERT_EQ(i, cache.countLookups);
+ }
+ };
+
+ fnTest(CacheWithThreadPool<Cache>(
+ getServiceContext(),
+ 0,
+ [&, nextValue = 0](OperationContext*, const std::string& key) mutable {
ASSERT_EQ("TestKey", key);
- countLookups++;
- return Cache::LookupResult(CachedValue(100 * countLookups));
- });
+ return Cache::LookupResult(CachedValue(100 * ++nextValue));
+ }));
- for (int i = 1; i <= 3; i++) {
- auto value = cache.acquire(_opCtx, "TestKey");
- ASSERT(value);
- ASSERT_EQ(100 * i, value->counter);
- ASSERT_EQ(i, countLookups);
- }
+ fnTest(CacheWithThreadPool<CausallyConsistentCache>(
+ getServiceContext(),
+ 0,
+ [&, nextValue = 0](OperationContext*, const std::string& key) mutable {
+ ASSERT_EQ("TestKey", key);
+ ++nextValue;
+ return CausallyConsistentCache::LookupResult(CachedValue(100 * nextValue),
+ Timestamp(nextValue));
+ }));
}
TEST_F(ReadThroughCacheTest, InvalidateCacheSizeZeroReissuesLookup) {
- int countLookups = 0;
- CacheWithThreadPool cache(
- getServiceContext(), 0, [&](OperationContext*, const std::string& key) {
- ASSERT_EQ("TestKey", key);
- countLookups++;
- return Cache::LookupResult(CachedValue(1000 * countLookups));
- });
+ auto fnTest = [&](auto cache) {
+ auto value = cache.acquire(_opCtx, "TestKey");
+ ASSERT(value);
+ ASSERT_EQ(1000, value->counter);
+ ASSERT_EQ(1, cache.countLookups);
- auto value = cache.acquire(_opCtx, "TestKey");
- ASSERT(value);
- ASSERT_EQ(1000, value->counter);
- ASSERT_EQ(1, countLookups);
+ // Because 'value' above is held alive, the cache will not perform lookup until it is
+ // destroyed
+ ASSERT_EQ(1000, cache.acquire(_opCtx, "TestKey")->counter);
+ ASSERT_EQ(1, cache.countLookups);
- // Because 'value' above is held alive, the cache will not perform lookup until it is destroyed
- ASSERT_EQ(1000, cache.acquire(_opCtx, "TestKey")->counter);
- ASSERT_EQ(1, countLookups);
+ cache.invalidate("TestKey");
+ auto valueAfterInvalidate = cache.acquire(_opCtx, "TestKey");
+ ASSERT(!value.isValid());
+ ASSERT(valueAfterInvalidate);
+ ASSERT_EQ(2000, valueAfterInvalidate->counter);
+ ASSERT_EQ(2, cache.countLookups);
+ };
- cache.invalidate("TestKey");
- auto valueAfterInvalidate = cache.acquire(_opCtx, "TestKey");
- ASSERT(!value.isValid());
- ASSERT(valueAfterInvalidate);
- ASSERT_EQ(2000, valueAfterInvalidate->counter);
- ASSERT_EQ(2, countLookups);
+ fnTest(CacheWithThreadPool<Cache>(
+ getServiceContext(),
+ 0,
+ [&, nextValue = 0](OperationContext*, const std::string& key) mutable {
+ ASSERT_EQ("TestKey", key);
+ return Cache::LookupResult(CachedValue(1000 * ++nextValue));
+ }));
+
+ fnTest(CacheWithThreadPool<CausallyConsistentCache>(
+ getServiceContext(),
+ 0,
+ [&, nextValue = 0](OperationContext*, const std::string& key) mutable {
+ ASSERT_EQ("TestKey", key);
+ ++nextValue;
+ return CausallyConsistentCache::LookupResult(CachedValue(1000 * nextValue),
+ Timestamp(nextValue));
+ }));
}
TEST_F(ReadThroughCacheTest, KeyDoesNotExist) {
- CacheWithThreadPool cache(
+ auto fnTest = [&](auto cache) { ASSERT(!cache.acquire(_opCtx, "TestKey")); };
+
+ fnTest(CacheWithThreadPool<Cache>(
getServiceContext(), 1, [&](OperationContext*, const std::string& key) {
ASSERT_EQ("TestKey", key);
return Cache::LookupResult(boost::none);
+ }));
+
+ fnTest(CacheWithThreadPool<CausallyConsistentCache>(
+ getServiceContext(), 1, [&](OperationContext*, const std::string& key) {
+ ASSERT_EQ("TestKey", key);
+ return CausallyConsistentCache::LookupResult(boost::none, Timestamp(10));
+ }));
+}
+
+TEST_F(ReadThroughCacheTest, CausalConsistency) {
+ boost::optional<CausallyConsistentCache::LookupResult> nextToReturn;
+ CacheWithThreadPool<CausallyConsistentCache> cache(
+ getServiceContext(), 1, [&](OperationContext*, const std::string& key) {
+ ASSERT_EQ("TestKey", key);
+ return CausallyConsistentCache::LookupResult(std::move(*nextToReturn));
});
- ASSERT(!cache.acquire(_opCtx, "TestKey"));
+ nextToReturn.emplace(CachedValue(10), Timestamp(10));
+ ASSERT_EQ(10, cache.acquire(_opCtx, "TestKey", CacheCausalConsistency::kLatestCached)->counter);
+ ASSERT_EQ(10, cache.acquire(_opCtx, "TestKey", CacheCausalConsistency::kLatestKnown)->counter);
+
+ nextToReturn.emplace(CachedValue(20), Timestamp(20));
+ cache.advanceTimeInStore("TestKey", Timestamp(20));
+ ASSERT_EQ(10, cache.acquire(_opCtx, "TestKey", CacheCausalConsistency::kLatestCached)->counter);
+ ASSERT(!cache.acquire(_opCtx, "TestKey", CacheCausalConsistency::kLatestCached).isValid());
+ ASSERT_EQ(20, cache.acquire(_opCtx, "TestKey", CacheCausalConsistency::kLatestKnown)->counter);
+ ASSERT(cache.acquire(_opCtx, "TestKey", CacheCausalConsistency::kLatestKnown).isValid());
}
/**
@@ -190,7 +297,7 @@ class ReadThroughCacheAsyncTest : public unittest::Test,
using Barrier = unittest::Barrier;
-TEST_F(ReadThroughCacheAsyncTest, SuccessfulInProgressLookup) {
+TEST_F(ReadThroughCacheAsyncTest, SuccessfulInProgressLookupForNotCausallyConsistentCache) {
ThreadPool threadPool{ThreadPool::Options()};
threadPool.startup();
@@ -199,20 +306,23 @@ TEST_F(ReadThroughCacheAsyncTest, SuccessfulInProgressLookup) {
});
Cache::InProgressLookup inProgress(cache, "TestKey");
- auto future = inProgress.addWaiter(WithLock::withoutLock());
+ auto future = inProgress.addWaiter(WithLock::withoutLock(), CacheNotCausallyConsistent());
ASSERT(!future.isReady());
- auto optVal = inProgress.asyncLookupRound().get();
+ auto res = inProgress.asyncLookupRound().get();
ASSERT(inProgress.valid(WithLock::withoutLock()));
- ASSERT(optVal.v);
- ASSERT_EQ(500, optVal.v->counter);
- inProgress.signalWaiters(Cache::ValueHandle(std::move(*optVal.v)));
+ ASSERT(res.v);
+ ASSERT_EQ(500, res.v->counter);
+ auto promisesToSet =
+ inProgress.getPromisesLessThanTime(WithLock::withoutLock(), CacheNotCausallyConsistent());
+ ASSERT_EQ(1U, promisesToSet.size());
+ promisesToSet.front()->emplaceValue(std::move(*res.v));
ASSERT(future.isReady());
ASSERT_EQ(500, future.get()->counter);
}
-TEST_F(ReadThroughCacheAsyncTest, FailedInProgressLookup) {
+TEST_F(ReadThroughCacheAsyncTest, FailedInProgressLookupForNotCausallyConsistentCache) {
ThreadPool threadPool{ThreadPool::Options()};
threadPool.startup();
@@ -224,13 +334,15 @@ TEST_F(ReadThroughCacheAsyncTest, FailedInProgressLookup) {
});
Cache::InProgressLookup inProgress(cache, "TestKey");
- auto future = inProgress.addWaiter(WithLock::withoutLock());
+ auto future = inProgress.addWaiter(WithLock::withoutLock(), CacheNotCausallyConsistent());
ASSERT(!future.isReady());
auto asyncLookupResult = inProgress.asyncLookupRound().getNoThrow();
ASSERT_THROWS_CODE(inProgress.asyncLookupRound().get(), DBException, ErrorCodes::InternalError);
ASSERT(inProgress.valid(WithLock::withoutLock()));
- inProgress.signalWaiters(asyncLookupResult.getStatus());
+ auto promisesToSet = inProgress.getAllPromisesOnError(WithLock::withoutLock());
+ ASSERT_EQ(1U, promisesToSet.size());
+ promisesToSet.front()->setFromStatusWith(asyncLookupResult.getStatus());
ASSERT(future.isReady());
ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::InternalError);
@@ -337,31 +449,74 @@ TEST_F(ReadThroughCacheAsyncTest, AcquireWithAShutdownThreadPool) {
Cache cache(getServiceContext(), threadPool, 1, [&](OperationContext*, const std::string&) {
FAIL("Should not be called");
- return Cache::LookupResult(CachedValue(0)); // Will never be reached
+ return Cache::LookupResult(boost::none); // Will never be reached
});
auto future = cache.acquireAsync("TestKey");
ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::ShutdownInProgress);
}
-TEST_F(ReadThroughCacheAsyncTest, InvalidateCalledBeforeLookupTaskExecutes) {
- struct MockThreadPool : public ThreadPoolInterface {
- void startup() override {}
- void shutdown() override {}
- void join() override {}
- void schedule(Task task) override {
- ASSERT(!mostRecentTask);
- mostRecentTask = std::move(task);
- }
- void runMostRecentTask() {
- ASSERT(mostRecentTask);
- auto f = std::move(mostRecentTask);
- f(Status::OK());
- }
+class MockThreadPool : public ThreadPoolInterface {
+public:
+ ~MockThreadPool() {
+ ASSERT(!_mostRecentTask);
+ }
+ void startup() override {}
+ void shutdown() override {}
+ void join() override {}
+ void schedule(Task task) override {
+ ASSERT(!_mostRecentTask);
+ _mostRecentTask = std::move(task);
+ }
+ void runMostRecentTask() {
+ ASSERT(_mostRecentTask);
+ auto f = std::move(_mostRecentTask);
+ f(Status::OK());
+ }
+
+private:
+ Task _mostRecentTask;
+};
+
+TEST_F(ReadThroughCacheAsyncTest, AcquireAsyncAndAdvanceTimeInterleave) {
+ MockThreadPool threadPool;
+ boost::optional<CausallyConsistentCache::LookupResult> nextToReturn;
+ CausallyConsistentCache cache(
+ getServiceContext(), threadPool, 1, [&](OperationContext*, const std::string&) {
+ return std::move(*nextToReturn);
+ });
+
+ auto futureAtTS100 = cache.acquireAsync("TestKey");
+ nextToReturn.emplace(CachedValue(100), Timestamp(100));
+ threadPool.runMostRecentTask();
+ ASSERT_EQ(100, futureAtTS100.get()->counter);
+ ASSERT(futureAtTS100.get().isValid());
+
+ cache.advanceTimeInStore("TestKey", Timestamp(150));
+ auto futureAtTS150 = cache.acquireAsync("TestKey", CacheCausalConsistency::kLatestKnown);
+ ASSERT(!futureAtTS100.get().isValid());
+ ASSERT(!futureAtTS150.isReady());
+
+ cache.advanceTimeInStore("TestKey", Timestamp(250));
+ auto futureAtTS250 = cache.acquireAsync("TestKey", CacheCausalConsistency::kLatestKnown);
+ ASSERT(!futureAtTS100.get().isValid());
+ ASSERT(!futureAtTS150.isReady());
+ ASSERT(!futureAtTS250.isReady());
- Task mostRecentTask;
- } threadPool;
+ nextToReturn.emplace(CachedValue(150), Timestamp(150));
+ threadPool.runMostRecentTask();
+ ASSERT_EQ(150, futureAtTS150.get()->counter);
+ ASSERT(!futureAtTS150.get().isValid());
+ ASSERT(!futureAtTS250.isReady());
+ nextToReturn.emplace(CachedValue(250), Timestamp(250));
+ threadPool.runMostRecentTask();
+ ASSERT_EQ(250, futureAtTS250.get()->counter);
+ ASSERT(futureAtTS250.get().isValid());
+}
+
+TEST_F(ReadThroughCacheAsyncTest, InvalidateCalledBeforeLookupTaskExecutes) {
+ MockThreadPool threadPool;
Cache cache(getServiceContext(), threadPool, 1, [&](OperationContext*, const std::string&) {
return Cache::LookupResult(CachedValue(123));
});