diff options
-rw-r--r-- | src/mongo/util/invalidating_lru_cache.h | 210 | ||||
-rw-r--r-- | src/mongo/util/invalidating_lru_cache_test.cpp | 128 | ||||
-rw-r--r-- | src/mongo/util/read_through_cache.h | 193 | ||||
-rw-r--r-- | src/mongo/util/read_through_cache_test.cpp | 325 |
4 files changed, 690 insertions, 166 deletions
diff --git a/src/mongo/util/invalidating_lru_cache.h b/src/mongo/util/invalidating_lru_cache.h index e1dced30e31..60c2341a295 100644 --- a/src/mongo/util/invalidating_lru_cache.h +++ b/src/mongo/util/invalidating_lru_cache.h @@ -41,11 +41,56 @@ namespace mongo { /** - * Extension over 'LRUCache', which provides thread-safety, introspection and most importantly the - * ability to mark entries as invalid to indicate to potential callers that they should not be used - * anymore. + * Type indicating that the specific cache instance does not support causal consistency. To be used + * as the default 'Time' parameter to the 'InvalidatingLRUCache' template, indicating that the cache + * is not causally consistent. */ -template <typename Key, typename Value> +struct CacheNotCausallyConsistent { + bool operator==(const CacheNotCausallyConsistent&) const { + return true; + } + bool operator!=(const CacheNotCausallyConsistent&) const { + return false; + } + bool operator>(const CacheNotCausallyConsistent&) const { + return false; + } + bool operator>=(const CacheNotCausallyConsistent&) const { + return true; + } + bool operator<(const CacheNotCausallyConsistent&) const { + return false; + } + bool operator<=(const CacheNotCausallyConsistent&) const { + return true; + } +}; + +/** + * Specifies the desired causal consistency for calls to 'get' (and 'acquire', respectively in the + * ReadThroughCache, which is its main consumer). + */ +enum class CacheCausalConsistency { + // Provides the fastest acquire semantics, where if the cache already contains a + // (non-invalidated) value cached, it will be immediately returned. Otherwise, the 'acquire' + // call will block. + kLatestCached, + + // Provides a causally-consistent semantics with respect to a previous call to + // 'advanceTimeInStore', where if the cache's (non-invalidated) value has time == timeInStore, + // the value will be immediately returned. Otherwise, the 'acquire' call will block. + kLatestKnown, +}; + +/** + * Extension built on top of 'LRUCache', which provides thread-safety, introspection and most + * importantly the ability to invalidate each entry and/or associate a logical timestamp in order to + * indicate to potential callers that the entry should not be used anymore. + * + * The type for 'Time' must support 'operator <' and its default constructor 'Time()' must provide + * the lowest possible value for the time. + */ +template <typename Key, typename Value, typename Time = CacheNotCausallyConsistent> class InvalidatingLRUCache { /** * Data structure representing the values stored in the cache. @@ -58,11 +103,18 @@ class InvalidatingLRUCache { StoredValue(InvalidatingLRUCache* owningCache, uint64_t epoch, boost::optional<Key>&& key, - Value&& value) + Value&& value, + const Time& time, + const Time& timeInStore) : owningCache(owningCache), epoch(epoch), key(std::move(key)), - value(std::move(value)) {} + value(std::move(value)), + time(time), + timeInStore(timeInStore), + isValid(time == timeInStore) { + invariant(time <= timeInStore); + } ~StoredValue() { if (!owningCache) @@ -111,10 +163,19 @@ class InvalidatingLRUCache { const boost::optional<Key> key; Value value; - // Initially set to true to indicate that the entry is valid and can be read without - // synchronisation. Transitions to false only once, under `_mutex` in order to mark the - // entry as invalid. - AtomicWord<bool> isValid{true}; + // Timestamp associated with the current 'value'. The semantics of the time is entirely up + // to the user of the cache, but it must be monotonically increasing for the same key. + const Time time; + + // Timestamp which the store has indicated as available for 'key' (through a call to + // 'advanceTimeInStore'). Starts as equal to 'time' and always moves forward, under + // '_mutex'. + Time timeInStore; + + // Can be read without synchronisation. Transitions to false only once, under `_mutex` in + // order to mark the entry as invalid either as a result of 'invalidate' or + // 'advanceTimeInStore'. + AtomicWord<bool> isValid; }; using Cache = LRUCache<Key, std::shared_ptr<StoredValue>>; @@ -139,7 +200,12 @@ public: // support pinning items. Their only usage must be in the authorization mananager for the // internal authentication user. explicit ValueHandle(Value&& value) - : _value(std::make_shared<StoredValue>(nullptr, 0, boost::none, std::move(value))) {} + : _value(std::make_shared<StoredValue>(nullptr, + 0, + boost::none, + std::move(value), + CacheNotCausallyConsistent(), + CacheNotCausallyConsistent())) {} ValueHandle() = default; @@ -188,13 +254,25 @@ public: /** * Inserts or updates a key with a new value. If 'key' was checked-out at the time this method * was called, it will become invalidated. + * + * The 'time' parameter is mandatory for causally-consistent caches, but not needed otherwise + * (since the time never changes). Using a default of '= CacheNotCausallyConsistent()' allows + * non-causally-consistent users to not have to pass a second parameter, but would fail + * compilation if causally-consistent users forget to pass it. */ - void insertOrAssign(const Key& key, Value&& value) { + void insertOrAssign(const Key& key, + Value&& value, + const Time& time = CacheNotCausallyConsistent()) { LockGuardWithPostUnlockDestructor guard(_mutex); - _invalidate(&guard, key, _cache.find(key)); - if (auto evicted = _cache.add( - key, - std::make_shared<StoredValue>(this, ++_epoch, key, std::forward<Value>(value)))) { + Time timeInStore; + _invalidate(&guard, key, _cache.find(key), &timeInStore); + if (auto evicted = _cache.add(key, + std::make_shared<StoredValue>(this, + ++_epoch, + key, + std::forward<Value>(value), + time, + std::max(time, timeInStore)))) { const auto& evictedKey = evicted->first; auto& evictedValue = evicted->second; @@ -222,13 +300,25 @@ public: * For caches of size zero, this method will not cache the passed-in value, but it will be * returned and the `get` method will continue returning it until all returned handles are * destroyed. + * + * The 'time' parameter is mandatory for causally-consistent caches, but not needed otherwise + * (since the time never changes). Using a default of '= CacheNotCausallyConsistent()' allows + * non-causally-consistent users to not have to pass a second parameter, but would fail + * compilation if causally-consistent users forget to pass it. */ - ValueHandle insertOrAssignAndGet(const Key& key, Value&& value) { + ValueHandle insertOrAssignAndGet(const Key& key, + Value&& value, + const Time& time = CacheNotCausallyConsistent()) { LockGuardWithPostUnlockDestructor guard(_mutex); - _invalidate(&guard, key, _cache.find(key)); - if (auto evicted = _cache.add( - key, - std::make_shared<StoredValue>(this, ++_epoch, key, std::forward<Value>(value)))) { + Time timeInStore; + _invalidate(&guard, key, _cache.find(key), &timeInStore); + if (auto evicted = _cache.add(key, + std::make_shared<StoredValue>(this, + ++_epoch, + key, + std::forward<Value>(value), + time, + std::max(time, timeInStore)))) { const auto& evictedKey = evicted->first; auto& evictedValue = evicted->second; @@ -266,19 +356,78 @@ public: * it could still get evicted if the cache is under pressure. The returned handle must be * destroyed before the owning cache object itself is destroyed. */ - ValueHandle get(const Key& key) { + ValueHandle get( + const Key& key, + CacheCausalConsistency causalConsistency = CacheCausalConsistency::kLatestCached) { stdx::lock_guard<Latch> lg(_mutex); - if (auto it = _cache.find(key); it != _cache.end()) - return ValueHandle(it->second); + std::shared_ptr<StoredValue> storedValue; + if (auto it = _cache.find(key); it != _cache.end()) { + storedValue = it->second; + } else if (auto it = _evictedCheckedOutValues.find(key); + it != _evictedCheckedOutValues.end()) { + storedValue = it->second.lock(); + } - if (auto it = _evictedCheckedOutValues.find(key); it != _evictedCheckedOutValues.end()) - return ValueHandle(it->second.lock()); + if (causalConsistency == CacheCausalConsistency::kLatestKnown && storedValue && + storedValue->time < storedValue->timeInStore) + return ValueHandle(nullptr); + return ValueHandle(std::move(storedValue)); + } - return ValueHandle(nullptr); + /** + * Indicates to the cache that the backing store contains a new value for the specified key, + * with a timestamp of 'newTimeInStore'. + * + * Any already returned ValueHandles will start returning isValid = false. Subsequent calls to + * 'get' with a causal consistency of 'kLatestCached' will continue to return the value, which + * is currently cached, but with isValid = false. Calls to 'get' with a causal consistency of + * 'kLatestKnown' will return no value. It is up to the caller to this function to subsequently + * either 'insertOrAssign' a new value for the 'key', or to call 'invalidate'. + */ + void advanceTimeInStore(const Key& key, const Time& newTimeInStore) { + stdx::lock_guard<Latch> lg(_mutex); + std::shared_ptr<StoredValue> storedValue; + if (auto it = _cache.find(key); it != _cache.end()) { + storedValue = it->second; + } else if (auto it = _evictedCheckedOutValues.find(key); + it != _evictedCheckedOutValues.end()) { + storedValue = it->second.lock(); + } + + if (!storedValue) + return; + + if (newTimeInStore > storedValue->timeInStore) { + storedValue->timeInStore = newTimeInStore; + storedValue->isValid.store(false); + } + } + + /** + * If 'key' is in the store, returns its latest 'timeInStore', which can either be from the time + * of insertion or from the latest call to 'advanceTimeInStore'. Otherwise, returns Time(). + */ + Time getTimeInStore(const Key& key) { + stdx::lock_guard<Latch> lg(_mutex); + std::shared_ptr<StoredValue> storedValue; + if (auto it = _cache.find(key); it != _cache.end()) { + storedValue = it->second; + } else if (auto it = _evictedCheckedOutValues.find(key); + it != _evictedCheckedOutValues.end()) { + storedValue = it->second.lock(); + } + + if (storedValue) + return storedValue->timeInStore; + + return Time(); } /** * Marks 'key' as invalid if it is found in the cache (whether checked-out or not). + * + * Any already returned ValueHandles will start returning isValid = false. Subsequent calls to + * 'get' will *not* return value for 'key' until the next call to 'insertOrAssign'. */ void invalidate(const Key& key) { LockGuardWithPostUnlockDestructor guard(_mutex); @@ -373,10 +522,13 @@ private: */ void _invalidate(LockGuardWithPostUnlockDestructor* guard, const Key& key, - typename Cache::iterator it) { + typename Cache::iterator it, + Time* outTimeInStore = nullptr) { if (it != _cache.end()) { auto& storedValue = it->second; storedValue->isValid.store(false); + if (outTimeInStore) + *outTimeInStore = storedValue->timeInStore; guard->releasePtr(std::move(storedValue)); _cache.erase(it); return; @@ -390,6 +542,8 @@ private: // released and drops to zero if (auto evictedValue = itEvicted->second.lock()) { evictedValue->isValid.store(false); + if (outTimeInStore) + *outTimeInStore = evictedValue->timeInStore; guard->releasePtr(std::move(evictedValue)); } diff --git a/src/mongo/util/invalidating_lru_cache_test.cpp b/src/mongo/util/invalidating_lru_cache_test.cpp index 939892f0c1e..5d6f53470db 100644 --- a/src/mongo/util/invalidating_lru_cache_test.cpp +++ b/src/mongo/util/invalidating_lru_cache_test.cpp @@ -54,6 +54,9 @@ struct TestValue { using TestValueCache = InvalidatingLRUCache<int, TestValue>; using TestValueHandle = TestValueCache::ValueHandle; +using TestValueCacheCausallyConsistent = InvalidatingLRUCache<int, TestValue, Timestamp>; +using TestValueHandleCausallyConsistent = TestValueCacheCausallyConsistent::ValueHandle; + TEST(InvalidatingLRUCacheTest, StandaloneValueHandle) { TestValueHandle standaloneHandle({"Standalone value"}); ASSERT(standaloneHandle.isValid()); @@ -76,6 +79,29 @@ TEST(InvalidatingLRUCacheTest, ValueHandleOperators) { } } +TEST(InvalidatingLRUCacheTest, CausalConsistency) { + TestValueCacheCausallyConsistent cache(1); + + cache.insertOrAssign(2, TestValue("Value @ TS 100"), Timestamp(100)); + ASSERT_EQ("Value @ TS 100", cache.get(2, CacheCausalConsistency::kLatestCached)->value); + ASSERT_EQ("Value @ TS 100", cache.get(2, CacheCausalConsistency::kLatestKnown)->value); + + auto value = cache.get(2, CacheCausalConsistency::kLatestCached); + cache.advanceTimeInStore(2, Timestamp(200)); + ASSERT_EQ("Value @ TS 100", value->value); + ASSERT(!value.isValid()); + ASSERT_EQ("Value @ TS 100", cache.get(2, CacheCausalConsistency::kLatestCached)->value); + ASSERT(!cache.get(2, CacheCausalConsistency::kLatestCached).isValid()); + ASSERT(!cache.get(2, CacheCausalConsistency::kLatestKnown)); + + // Intentionally push value for key with a timestamp higher than the one passed to advanceTime + cache.insertOrAssign(2, TestValue("Value @ TS 300"), Timestamp(300)); + ASSERT_EQ("Value @ TS 100", value->value); + ASSERT(!value.isValid()); + ASSERT_EQ("Value @ TS 300", cache.get(2, CacheCausalConsistency::kLatestCached)->value); + ASSERT_EQ("Value @ TS 300", cache.get(2, CacheCausalConsistency::kLatestKnown)->value); +} + TEST(InvalidatingLRUCacheTest, InvalidateNonCheckedOutValue) { TestValueCache cache(3); @@ -233,6 +259,63 @@ TEST(InvalidatingLRUCacheTest, CheckedOutItemsAreInvalidatedWithPredicateWhenEvi } } +TEST(InvalidatingLRUCacheTest, CausalConsistencyPreservedForEvictedCheckedOutKeys) { + TestValueCacheCausallyConsistent cache(1); + + auto key1ValueAtTS10 = + cache.insertOrAssignAndGet(1, TestValue("Key 1 - Value @ TS 10"), Timestamp(10)); + + // This will evict key 1, but we have a handle to it, so it will stay accessible on the evicted + // list + cache.insertOrAssign(2, TestValue("Key 2 - Value @ TS 20"), Timestamp(20)); + + ASSERT_EQ(Timestamp(10), cache.getTimeInStore(1)); + ASSERT_EQ("Key 1 - Value @ TS 10", key1ValueAtTS10->value); + ASSERT_EQ("Key 1 - Value @ TS 10", cache.get(1, CacheCausalConsistency::kLatestCached)->value); + ASSERT_EQ("Key 1 - Value @ TS 10", cache.get(1, CacheCausalConsistency::kLatestKnown)->value); + + cache.advanceTimeInStore(1, Timestamp(11)); + ASSERT_EQ(Timestamp(11), cache.getTimeInStore(1)); + ASSERT(!key1ValueAtTS10.isValid()); + ASSERT_EQ("Key 1 - Value @ TS 10", key1ValueAtTS10->value); + ASSERT_EQ("Key 1 - Value @ TS 10", cache.get(1, CacheCausalConsistency::kLatestCached)->value); + ASSERT(!cache.get(1, CacheCausalConsistency::kLatestKnown)); + + cache.insertOrAssign(1, TestValue("Key 1 - Value @ TS 12"), Timestamp(12)); + ASSERT_EQ("Key 1 - Value @ TS 12", cache.get(1, CacheCausalConsistency::kLatestCached)->value); + ASSERT_EQ("Key 1 - Value @ TS 12", cache.get(1, CacheCausalConsistency::kLatestKnown)->value); +} + +TEST(InvalidatingLRUCacheTest, InvalidateAfterAdvanceTime) { + TestValueCacheCausallyConsistent cache(1); + + cache.insertOrAssign(20, TestValue("Value @ TS 200"), Timestamp(200)); + cache.advanceTimeInStore(20, Timestamp(250)); + ASSERT_EQ("Value @ TS 200", cache.get(20, CacheCausalConsistency::kLatestCached)->value); + ASSERT(!cache.get(20, CacheCausalConsistency::kLatestKnown)); + + cache.invalidate(20); + ASSERT(!cache.get(20, CacheCausalConsistency::kLatestCached)); + ASSERT(!cache.get(20, CacheCausalConsistency::kLatestKnown)); +} + +TEST(InvalidatingLRUCacheTest, InsertEntryAtTimeLessThanAdvanceTime) { + TestValueCacheCausallyConsistent cache(1); + + cache.insertOrAssign(20, TestValue("Value @ TS 200"), Timestamp(200)); + cache.advanceTimeInStore(20, Timestamp(300)); + ASSERT_EQ("Value @ TS 200", cache.get(20, CacheCausalConsistency::kLatestCached)->value); + ASSERT(!cache.get(20, CacheCausalConsistency::kLatestKnown)); + + cache.insertOrAssign(20, TestValue("Value @ TS 250"), Timestamp(250)); + ASSERT_EQ("Value @ TS 250", cache.get(20, CacheCausalConsistency::kLatestCached)->value); + ASSERT(!cache.get(20, CacheCausalConsistency::kLatestKnown)); + + cache.insertOrAssign(20, TestValue("Value @ TS 300"), Timestamp(300)); + ASSERT_EQ("Value @ TS 300", cache.get(20, CacheCausalConsistency::kLatestCached)->value); + ASSERT_EQ("Value @ TS 300", cache.get(20, CacheCausalConsistency::kLatestKnown)->value); +} + TEST(InvalidatingLRUCacheTest, OrderOfDestructionOfHandlesDiffersFromOrderOfInsertion) { TestValueCache cache(1); @@ -339,12 +422,33 @@ TEST(InvalidatingLRUCacheTest, CacheSizeZeroInvalidateAllEntries) { } } -template <typename TestFunc> +TEST(InvalidatingLRUCacheTest, CacheSizeZeroCausalConsistency) { + TestValueCacheCausallyConsistent cache(0); + + cache.advanceTimeInStore(100, Timestamp(30)); + cache.insertOrAssign(100, TestValue("Value @ TS 30"), Timestamp(30)); + ASSERT_EQ(Timestamp(), cache.getTimeInStore(100)); + + auto valueAtTS30 = cache.insertOrAssignAndGet(100, TestValue("Value @ TS 30"), Timestamp(30)); + ASSERT_EQ("Value @ TS 30", cache.get(100, CacheCausalConsistency::kLatestCached)->value); + ASSERT_EQ("Value @ TS 30", cache.get(100, CacheCausalConsistency::kLatestKnown)->value); + + cache.advanceTimeInStore(100, Timestamp(35)); + ASSERT_EQ(Timestamp(35), cache.getTimeInStore(100)); + ASSERT_EQ("Value @ TS 30", cache.get(100, CacheCausalConsistency::kLatestCached)->value); + ASSERT(!cache.get(100, CacheCausalConsistency::kLatestKnown)); + + auto valueAtTS40 = cache.insertOrAssignAndGet(100, TestValue("Value @ TS 40"), Timestamp(40)); + ASSERT_EQ("Value @ TS 40", cache.get(100, CacheCausalConsistency::kLatestCached)->value); + ASSERT_EQ("Value @ TS 40", cache.get(100, CacheCausalConsistency::kLatestKnown)->value); +} + +template <class TCache, typename TestFunc> void parallelTest(size_t cacheSize, TestFunc doTest) { constexpr auto kNumIterations = 100'000; constexpr auto kNumThreads = 4; - TestValueCache cache(cacheSize); + TCache cache(cacheSize); std::vector<stdx::thread> threads; for (int i = 0; i < kNumThreads; i++) { @@ -361,7 +465,7 @@ void parallelTest(size_t cacheSize, TestFunc doTest) { } TEST(InvalidatingLRUCacheParallelTest, InsertOrAssignThenGet) { - parallelTest(1, [](TestValueCache& cache) mutable { + parallelTest<TestValueCache>(1, [](auto& cache) mutable { const int key = 100; cache.insertOrAssign(key, TestValue{"Parallel tester value"}); @@ -378,7 +482,7 @@ TEST(InvalidatingLRUCacheParallelTest, InsertOrAssignThenGet) { } TEST(InvalidatingLRUCacheParallelTest, InsertOrAssignAndGet) { - parallelTest(1, [](auto& cache) { + parallelTest<TestValueCache>(1, [](auto& cache) { const int key = 200; auto cachedItem = cache.insertOrAssignAndGet(key, TestValue{"Parallel tester value"}); ASSERT(cachedItem); @@ -389,7 +493,7 @@ TEST(InvalidatingLRUCacheParallelTest, InsertOrAssignAndGet) { } TEST(InvalidatingLRUCacheParallelTest, CacheSizeZeroInsertOrAssignAndGet) { - parallelTest(0, [](TestValueCache& cache) mutable { + parallelTest<TestValueCache>(0, [](auto& cache) mutable { const int key = 300; auto cachedItem = cache.insertOrAssignAndGet(key, TestValue{"Parallel tester value"}); ASSERT(cachedItem); @@ -398,5 +502,19 @@ TEST(InvalidatingLRUCacheParallelTest, CacheSizeZeroInsertOrAssignAndGet) { }); } +TEST(InvalidatingLRUCacheParallelTest, AdvanceTime) { + AtomicWord<uint64_t> counter{0}; + + parallelTest<TestValueCacheCausallyConsistent>(0, [&counter](auto& cache) mutable { + const int key = 300; + cache.insertOrAssign( + key, TestValue{"Parallel tester value"}, Timestamp(counter.fetchAndAdd(1))); + auto latestCached = cache.get(key, CacheCausalConsistency::kLatestCached); + auto latestKnown = cache.get(key, CacheCausalConsistency::kLatestKnown); + + cache.advanceTimeInStore(key, Timestamp(counter.fetchAndAdd(1))); + }); +} + } // namespace } // namespace mongo diff --git a/src/mongo/util/read_through_cache.h b/src/mongo/util/read_through_cache.h index 29365a6b72c..5d0b356be86 100644 --- a/src/mongo/util/read_through_cache.h +++ b/src/mongo/util/read_through_cache.h @@ -98,9 +98,13 @@ protected: }; /** - * Implements a generic read-through cache built on top of InvalidatingLRUCache. + * Implements an (optionally) causally consistent read-through cache from Key to Value, built on top + * of InvalidatingLRUCache. + * + * Causal consistency is provided by requiring the backing store to asociate every Value it returns + * with a logical timestamp of type Time. */ -template <typename Key, typename Value> +template <typename Key, typename Value, typename Time = CacheNotCausallyConsistent> class ReadThroughCache : public ReadThroughCacheBase { /** * Data structure wrapping and expanding on the values stored in the cache. @@ -113,7 +117,7 @@ class ReadThroughCache : public ReadThroughCacheBase { // relied on to perform any recency comparisons for example). Date_t updateWallClockTime; }; - using Cache = InvalidatingLRUCache<Key, StoredValue>; + using Cache = InvalidatingLRUCache<Key, StoredValue, Time>; public: /** @@ -180,14 +184,27 @@ public: * * The implementation must throw a uassertion to indicate an error while looking up the value, * return boost::none if the key is not found, or return an actual value. + * + * See the comments on 'advanceTimeInStore' for additional requirements that this function must + * fulfill with respect to causal consistency. */ struct LookupResult { - explicit LookupResult(boost::optional<Value>&& v) : v(std::move(v)) {} + // The 't' parameter is mandatory for causally-consistent caches, but not needed otherwise + // (since the time never changes). Using a default of '= CacheNotCausallyConsistent()' + // allows non-causally-consistent users to not have to pass a second parameter, but would + // fail compilation if causally-consistent users forget to pass it. + explicit LookupResult(boost::optional<Value>&& v, Time t = CacheNotCausallyConsistent()) + : v(std::move(v)), t(std::move(t)) {} LookupResult(LookupResult&&) = default; LookupResult& operator=(LookupResult&&) = default; // If boost::none, it means the '_lookupFn' did not find the key in the store boost::optional<Value> v; + + // If value is boost::none, specifies the time which was passed to '_lookupFn', effectively + // meaning, at least as of 'time', there was no entry in the store for the key. Otherwise + // contains the time that the store returned for the 'value'. + Time t; }; using LookupFn = unique_function<LookupResult(OperationContext*, const Key&)>; @@ -196,10 +213,11 @@ public: class InProgressLookup; /** - * If 'key' is found in the cache, returns a set ValueHandle (its operator bool will be true). - * Otherwise, either causes the blocking 'lookup' below to be asynchronously invoked to fetch - * 'key' from the backing store (or joins an already scheduled invocation) and returns a future - * which will be signaled when the lookup completes. + * If 'key' is found in the cache and it fulfills the requested 'causalConsistency', returns a + * set ValueHandle (its operator bool will be true). Otherwise, either causes the blocking + * 'LookupFn' to be asynchronously invoked to fetch 'key' from the backing store or joins an + * already scheduled invocation) and returns a future which will be signaled when the lookup + * completes. * * If the lookup is successful and 'key' is found in the store, it will be cached (so subsequent * lookups won't have to re-fetch it) and the future will be set. If 'key' is not found in the @@ -211,27 +229,31 @@ public: * The returned value may be invalid by the time the caller gets to access it, if 'invalidate' * is called for 'key'. */ - SharedSemiFuture<ValueHandle> acquireAsync(const Key& key) { + SharedSemiFuture<ValueHandle> acquireAsync( + const Key& key, + CacheCausalConsistency causalConsistency = CacheCausalConsistency::kLatestCached) { // Fast path - if (auto cachedValue = _cache.get(key)) + if (auto cachedValue = _cache.get(key, causalConsistency)) return {std::move(cachedValue)}; stdx::unique_lock ul(_mutex); // Re-check the cache under a mutex, before kicking-off the asynchronous lookup - if (auto cachedValue = _cache.get(key)) + if (auto cachedValue = _cache.get(key, causalConsistency)) return {std::move(cachedValue)}; + Time minTime = _cache.getTimeInStore(key); + // Join an in-progress lookup if one has already been scheduled if (auto it = _inProgressLookups.find(key); it != _inProgressLookups.end()) - return it->second->addWaiter(ul); + return it->second->addWaiter(ul, minTime); // Schedule an asynchronous lookup for the key auto [it, emplaced] = _inProgressLookups.emplace(key, std::make_unique<InProgressLookup>(*this, key)); invariant(emplaced); auto& inProgressLookup = *it->second; - auto sharedFutureToReturn = inProgressLookup.addWaiter(ul); + auto sharedFutureToReturn = inProgressLookup.addWaiter(ul, minTime); ul.unlock(); @@ -246,8 +268,11 @@ public: * NOTES: * This is a potentially blocking method. */ - ValueHandle acquire(OperationContext* opCtx, const Key& key) { - return acquireAsync(key).get(opCtx); + ValueHandle acquire( + OperationContext* opCtx, + const Key& key, + CacheCausalConsistency causalConsistency = CacheCausalConsistency::kLatestCached) { + return acquireAsync(key, causalConsistency).get(opCtx); } /** @@ -261,6 +286,20 @@ public: } /** + * Indicates to the cache that the backing store has a newer version of 'key', corresponding to + * 'newTime'. Subsequent calls to 'acquireAsync' with a causal consistency set to 'LatestKnown' + * will block and perform refresh until the cached value reaches 'newTime'. + * + * With respect to causal consistency, the 'LookupFn' used for this cache must provide the + * guarantee that if 'advanceTimeInStore' is called with a 'newTime', a subsequent call to + * 'LookupFn' for 'key' must return at least 'newTime' or later. + */ + void advanceTimeInStore(const Key& key, const Time& newTime) { + stdx::lock_guard lg(_mutex); + _cache.advanceTimeInStore(key, newTime); + } + + /** * The invalidate methods below guarantee the following: * - All affected keys already in the cache (or returned to callers) will be invalidated and * removed from the cache @@ -341,31 +380,62 @@ private: stdx::unique_lock ul(_mutex); auto it = _inProgressLookups.find(key); invariant(it != _inProgressLookups.end()); - if (!ErrorCodes::isCancelationError(sw.getStatus()) && !it->second->valid(ul)) { - ul.unlock(); // asyncLookupRound also acquires the mutex - return it->second->asyncLookupRound().onCompletion( - [this, key](auto sw) { return _doLookupWhileNotValid(key, std::move(sw)); }); - } - - // The detachment of the currently active lookup and the placement of the result on the - // '_cache' has to be atomic with respect to a concurrent call to 'invalidate' - auto inProgressLookup(std::move(it->second)); - _inProgressLookups.erase(it); - - StatusWith<ValueHandle> swValueHandle(ErrorCodes::Error(461541), ""); - if (sw.isOK()) { - auto result = std::move(sw.getValue()); - swValueHandle = result.v - ? ValueHandle(_cache.insertOrAssignAndGet(key, {std::move(*result.v), _now()})) - : ValueHandle(); - } else { - swValueHandle = sw.getStatus(); - } + auto& inProgressLookup = *it->second; + auto [promisesToSet, result, mustDoAnotherLoop] = [&] { + // The thread pool is shutting down, so terminate the loop + if (ErrorCodes::isCancelationError(sw.getStatus())) + return std::make_tuple(inProgressLookup.getAllPromisesOnError(ul), + StatusWith<ValueHandle>(sw.getStatus()), + false); + + // There was a concurrent call to 'invalidate', so start all over + if (!inProgressLookup.valid(ul)) + return std::make_tuple( + std::vector<std::unique_ptr<SharedPromise<ValueHandle>>>{}, + StatusWith<ValueHandle>(Status(ErrorCodes::Error(461541), "")), + true); + + // Lookup resulted in an error, which is not cancelation + if (!sw.isOK()) + return std::make_tuple(inProgressLookup.getAllPromisesOnError(ul), + StatusWith<ValueHandle>(sw.getStatus()), + false); + + // Value (or boost::none) was returned by lookup and there was no concurrent call to + // 'invalidate'. Place the value on the cache and return the necessary promises to + // signal (those which are waiting for time < time at the store). + auto& result = sw.getValue(); + auto promisesToSet = inProgressLookup.getPromisesLessThanTime(ul, result.t); + return std::make_tuple( + std::move(promisesToSet), + StatusWith<ValueHandle>(result.v + ? ValueHandle(_cache.insertOrAssignAndGet( + key, {std::move(*result.v), _now()}, result.t)) + : ValueHandle()), + !inProgressLookup.empty(ul)); + }(); + + if (!mustDoAnotherLoop) + _inProgressLookups.erase(it); ul.unlock(); - inProgressLookup->signalWaiters(std::move(swValueHandle)); + // The only reason this loop pops the values as it goes and std::moves into the last value + // is to support the CacheSizeZero unit-test, which requires that once the future it waits + // on is set, it contains the last reference on the returned ValueHandle + while (!promisesToSet.empty()) { + auto p(std::move(promisesToSet.back())); + promisesToSet.pop_back(); + + if (promisesToSet.empty()) + p->setFromStatusWith(std::move(result)); + else + p->setFromStatusWith(result); + } - return Future<LookupResult>::makeReady(Status(ErrorCodes::Error(461542), "")); + return mustDoAnotherLoop + ? inProgressLookup.asyncLookupRound().onCompletion( + [this, key](auto sw) { return _doLookupWhileNotValid(key, std::move(sw)); }) + : Future<LookupResult>::makeReady(Status(ErrorCodes::Error(461542), "")); } // Blocking function which will be invoked to retrieve entries from the backing store @@ -381,7 +451,8 @@ private: // Keeps track of all the keys, which were attempted to be 'acquireAsync'-ed, weren't found in // the cache and are currently in the process of being looked up from the backing store. A - // single key may only be on this map or in '_cache', but never in both. + // single key may be missing from '_cache', or contain an old 'kLatestCached' and have an active + // lookup on this map for 'kLatestKnown'. // // This map is protected by '_mutex'. InProgressLookupsMap _inProgressLookups; @@ -411,8 +482,8 @@ private: * inProgress.signalWaiters(result); * } */ -template <typename Key, typename Value> -class ReadThroughCache<Key, Value>::InProgressLookup { +template <typename Key, typename Value, typename Time> +class ReadThroughCache<Key, Value, Time>::InProgressLookup { public: InProgressLookup(ReadThroughCache& cache, Key key) : _cache(cache), _key(std::move(key)) {} @@ -432,25 +503,50 @@ public: return std::move(future); } - SharedSemiFuture<ValueHandle> addWaiter(WithLock) { - return _sharedPromise.getFuture(); + SharedSemiFuture<ValueHandle> addWaiter(WithLock, Time time) { + auto [it, unusedEmplaced] = + _outstanding.try_emplace(time, std::make_unique<SharedPromise<ValueHandle>>()); + return it->second->getFuture(); } bool valid(WithLock) const { return _valid; } + std::vector<std::unique_ptr<SharedPromise<ValueHandle>>> getAllPromisesOnError(WithLock) { + invariant(_valid); + std::vector<std::unique_ptr<SharedPromise<ValueHandle>>> ret; + for (auto it = _outstanding.begin(); it != _outstanding.end();) { + ret.emplace_back(std::move(it->second)); + it = _outstanding.erase(it); + } + return ret; + } + + std::vector<std::unique_ptr<SharedPromise<ValueHandle>>> getPromisesLessThanTime(WithLock, + Time time) { + invariant(_valid); + std::vector<std::unique_ptr<SharedPromise<ValueHandle>>> ret; + for (auto it = _outstanding.begin(); it != _outstanding.end();) { + if (it->first > time) + break; + ret.emplace_back(std::move(it->second)); + it = _outstanding.erase(it); + } + return ret; + } + + bool empty(WithLock) const { + invariant(_valid); + return _outstanding.empty(); + } + void invalidateAndCancelCurrentLookupRound(WithLock) { _valid = false; if (_cancelToken) _cancelToken->tryCancel(); } - void signalWaiters(StatusWith<ValueHandle> swValueHandle) { - invariant(_valid); - _sharedPromise.setFromStatusWith(std::move(swValueHandle)); - } - private: // The owning cache, from which mutex, lookupFn, async task scheduling, etc. will be used. It is // the responsibility of the owning cache to join all outstanding lookups at destruction time. @@ -461,7 +557,8 @@ private: bool _valid{false}; boost::optional<CancelToken> _cancelToken; - SharedPromise<ValueHandle> _sharedPromise; + using TimeAndPromiseMap = std::map<Time, std::unique_ptr<SharedPromise<ValueHandle>>>; + TimeAndPromiseMap _outstanding; }; } // namespace mongo diff --git a/src/mongo/util/read_through_cache_test.cpp b/src/mongo/util/read_through_cache_test.cpp index d196290bef8..8f479f4aea8 100644 --- a/src/mongo/util/read_through_cache_test.cpp +++ b/src/mongo/util/read_through_cache_test.cpp @@ -55,7 +55,39 @@ struct CachedValue { class Cache : public ReadThroughCache<std::string, CachedValue> { public: Cache(ServiceContext* service, ThreadPoolInterface& threadPool, size_t size, LookupFn lookupFn) - : ReadThroughCache(_mutex, service, threadPool, std::move(lookupFn), size) {} + : ReadThroughCache(_mutex, + service, + threadPool, + [this, lookupFn = std::move(lookupFn)](OperationContext* opCtx, + const std::string& key) { + ++countLookups; + return lookupFn(opCtx, key); + }, + size) {} + + int countLookups{0}; + +private: + Mutex _mutex = MONGO_MAKE_LATCH("ReadThroughCacheTest::Cache"); +}; + +class CausallyConsistentCache : public ReadThroughCache<std::string, CachedValue, Timestamp> { +public: + CausallyConsistentCache(ServiceContext* service, + ThreadPoolInterface& threadPool, + size_t size, + LookupFn lookupFn) + : ReadThroughCache(_mutex, + service, + threadPool, + [this, lookupFn = std::move(lookupFn)](OperationContext* opCtx, + const std::string& key) { + ++countLookups; + return lookupFn(opCtx, key); + }, + size) {} + + int countLookups{0}; private: Mutex _mutex = MONGO_MAKE_LATCH("ReadThroughCacheTest::Cache"); @@ -67,12 +99,14 @@ private: */ class ReadThroughCacheTest : public ServiceContextTest { protected: - // Extends Cache and automatically provides it with a thread pool, which will be shutdown and - // joined before the Cache is destroyed (which is part of the contract of ReadThroughCache) - class CacheWithThreadPool : public Cache { + // Extends any of Cache/CausallyConsistentCache and automatically provides it with a thread + // pool, which will be shutdown and joined before the Cache is destroyed (which is part of the + // contract of ReadThroughCache) + template <class T> + class CacheWithThreadPool : public T { public: - CacheWithThreadPool(ServiceContext* service, size_t size, LookupFn lookupFn) - : Cache(service, _threadPool, size, std::move(lookupFn)) { + CacheWithThreadPool(ServiceContext* service, size_t size, typename T::LookupFn lookupFn) + : T(service, _threadPool, size, std::move(lookupFn)) { _threadPool.startup(); } @@ -97,89 +131,162 @@ TEST(ReadThroughCacheTest, StandaloneValueHandle) { } TEST_F(ReadThroughCacheTest, FetchInvalidateAndRefetch) { - int countLookups = 0; - CacheWithThreadPool cache( - getServiceContext(), 1, [&](OperationContext*, const std::string& key) { - ASSERT_EQ("TestKey", key); - countLookups++; - return Cache::LookupResult(CachedValue(100 * countLookups)); - }); + auto fnTest = [&](auto cache) { + for (int i = 1; i <= 3; i++) { + auto value = cache.acquire(_opCtx, "TestKey"); + ASSERT(value); + ASSERT_EQ(100 * i, value->counter); + ASSERT_EQ(i, cache.countLookups); - for (int i = 1; i <= 3; i++) { - auto value = cache.acquire(_opCtx, "TestKey"); - ASSERT(value); - ASSERT_EQ(100 * i, value->counter); - ASSERT_EQ(i, countLookups); + ASSERT(cache.acquire(_opCtx, "TestKey")); + ASSERT_EQ(i, cache.countLookups); - ASSERT(cache.acquire(_opCtx, "TestKey")); - ASSERT_EQ(i, countLookups); + cache.invalidate("TestKey"); + } + }; - cache.invalidate("TestKey"); - } + fnTest(CacheWithThreadPool<Cache>( + getServiceContext(), + 1, + [&, nextValue = 0](OperationContext*, const std::string& key) mutable { + ASSERT_EQ("TestKey", key); + return Cache::LookupResult(CachedValue(100 * ++nextValue)); + })); + + fnTest(CacheWithThreadPool<CausallyConsistentCache>( + getServiceContext(), + 1, + [&, nextValue = 0](OperationContext*, const std::string& key) mutable { + ASSERT_EQ("TestKey", key); + ++nextValue; + return CausallyConsistentCache::LookupResult(CachedValue(100 * nextValue), + Timestamp(nextValue)); + })); } TEST_F(ReadThroughCacheTest, FailedLookup) { - CacheWithThreadPool cache( + auto fnTest = [&](auto cache) { + ASSERT_THROWS_CODE( + cache.acquire(_opCtx, "TestKey"), DBException, ErrorCodes::InternalError); + }; + + fnTest(CacheWithThreadPool<Cache>( getServiceContext(), 1, [&](OperationContext*, const std::string& key) -> Cache::LookupResult { uasserted(ErrorCodes::InternalError, "Test error"); - }); + })); - ASSERT_THROWS_CODE(cache.acquire(_opCtx, "TestKey"), DBException, ErrorCodes::InternalError); + fnTest(CacheWithThreadPool<CausallyConsistentCache>( + getServiceContext(), + 1, + [&](OperationContext*, const std::string& key) -> CausallyConsistentCache::LookupResult { + uasserted(ErrorCodes::InternalError, "Test error"); + })); } TEST_F(ReadThroughCacheTest, CacheSizeZero) { - int countLookups = 0; - CacheWithThreadPool cache( - getServiceContext(), 0, [&](OperationContext*, const std::string& key) { + auto fnTest = [&](auto cache) { + for (int i = 1; i <= 3; i++) { + auto value = cache.acquire(_opCtx, "TestKey", CacheCausalConsistency::kLatestKnown); + ASSERT(value); + ASSERT_EQ(100 * i, value->counter); + ASSERT_EQ(i, cache.countLookups); + } + }; + + fnTest(CacheWithThreadPool<Cache>( + getServiceContext(), + 0, + [&, nextValue = 0](OperationContext*, const std::string& key) mutable { ASSERT_EQ("TestKey", key); - countLookups++; - return Cache::LookupResult(CachedValue(100 * countLookups)); - }); + return Cache::LookupResult(CachedValue(100 * ++nextValue)); + })); - for (int i = 1; i <= 3; i++) { - auto value = cache.acquire(_opCtx, "TestKey"); - ASSERT(value); - ASSERT_EQ(100 * i, value->counter); - ASSERT_EQ(i, countLookups); - } + fnTest(CacheWithThreadPool<CausallyConsistentCache>( + getServiceContext(), + 0, + [&, nextValue = 0](OperationContext*, const std::string& key) mutable { + ASSERT_EQ("TestKey", key); + ++nextValue; + return CausallyConsistentCache::LookupResult(CachedValue(100 * nextValue), + Timestamp(nextValue)); + })); } TEST_F(ReadThroughCacheTest, InvalidateCacheSizeZeroReissuesLookup) { - int countLookups = 0; - CacheWithThreadPool cache( - getServiceContext(), 0, [&](OperationContext*, const std::string& key) { - ASSERT_EQ("TestKey", key); - countLookups++; - return Cache::LookupResult(CachedValue(1000 * countLookups)); - }); + auto fnTest = [&](auto cache) { + auto value = cache.acquire(_opCtx, "TestKey"); + ASSERT(value); + ASSERT_EQ(1000, value->counter); + ASSERT_EQ(1, cache.countLookups); - auto value = cache.acquire(_opCtx, "TestKey"); - ASSERT(value); - ASSERT_EQ(1000, value->counter); - ASSERT_EQ(1, countLookups); + // Because 'value' above is held alive, the cache will not perform lookup until it is + // destroyed + ASSERT_EQ(1000, cache.acquire(_opCtx, "TestKey")->counter); + ASSERT_EQ(1, cache.countLookups); - // Because 'value' above is held alive, the cache will not perform lookup until it is destroyed - ASSERT_EQ(1000, cache.acquire(_opCtx, "TestKey")->counter); - ASSERT_EQ(1, countLookups); + cache.invalidate("TestKey"); + auto valueAfterInvalidate = cache.acquire(_opCtx, "TestKey"); + ASSERT(!value.isValid()); + ASSERT(valueAfterInvalidate); + ASSERT_EQ(2000, valueAfterInvalidate->counter); + ASSERT_EQ(2, cache.countLookups); + }; - cache.invalidate("TestKey"); - auto valueAfterInvalidate = cache.acquire(_opCtx, "TestKey"); - ASSERT(!value.isValid()); - ASSERT(valueAfterInvalidate); - ASSERT_EQ(2000, valueAfterInvalidate->counter); - ASSERT_EQ(2, countLookups); + fnTest(CacheWithThreadPool<Cache>( + getServiceContext(), + 0, + [&, nextValue = 0](OperationContext*, const std::string& key) mutable { + ASSERT_EQ("TestKey", key); + return Cache::LookupResult(CachedValue(1000 * ++nextValue)); + })); + + fnTest(CacheWithThreadPool<CausallyConsistentCache>( + getServiceContext(), + 0, + [&, nextValue = 0](OperationContext*, const std::string& key) mutable { + ASSERT_EQ("TestKey", key); + ++nextValue; + return CausallyConsistentCache::LookupResult(CachedValue(1000 * nextValue), + Timestamp(nextValue)); + })); } TEST_F(ReadThroughCacheTest, KeyDoesNotExist) { - CacheWithThreadPool cache( + auto fnTest = [&](auto cache) { ASSERT(!cache.acquire(_opCtx, "TestKey")); }; + + fnTest(CacheWithThreadPool<Cache>( getServiceContext(), 1, [&](OperationContext*, const std::string& key) { ASSERT_EQ("TestKey", key); return Cache::LookupResult(boost::none); + })); + + fnTest(CacheWithThreadPool<CausallyConsistentCache>( + getServiceContext(), 1, [&](OperationContext*, const std::string& key) { + ASSERT_EQ("TestKey", key); + return CausallyConsistentCache::LookupResult(boost::none, Timestamp(10)); + })); +} + +TEST_F(ReadThroughCacheTest, CausalConsistency) { + boost::optional<CausallyConsistentCache::LookupResult> nextToReturn; + CacheWithThreadPool<CausallyConsistentCache> cache( + getServiceContext(), 1, [&](OperationContext*, const std::string& key) { + ASSERT_EQ("TestKey", key); + return CausallyConsistentCache::LookupResult(std::move(*nextToReturn)); }); - ASSERT(!cache.acquire(_opCtx, "TestKey")); + nextToReturn.emplace(CachedValue(10), Timestamp(10)); + ASSERT_EQ(10, cache.acquire(_opCtx, "TestKey", CacheCausalConsistency::kLatestCached)->counter); + ASSERT_EQ(10, cache.acquire(_opCtx, "TestKey", CacheCausalConsistency::kLatestKnown)->counter); + + nextToReturn.emplace(CachedValue(20), Timestamp(20)); + cache.advanceTimeInStore("TestKey", Timestamp(20)); + ASSERT_EQ(10, cache.acquire(_opCtx, "TestKey", CacheCausalConsistency::kLatestCached)->counter); + ASSERT(!cache.acquire(_opCtx, "TestKey", CacheCausalConsistency::kLatestCached).isValid()); + ASSERT_EQ(20, cache.acquire(_opCtx, "TestKey", CacheCausalConsistency::kLatestKnown)->counter); + ASSERT(cache.acquire(_opCtx, "TestKey", CacheCausalConsistency::kLatestKnown).isValid()); } /** @@ -190,7 +297,7 @@ class ReadThroughCacheAsyncTest : public unittest::Test, using Barrier = unittest::Barrier; -TEST_F(ReadThroughCacheAsyncTest, SuccessfulInProgressLookup) { +TEST_F(ReadThroughCacheAsyncTest, SuccessfulInProgressLookupForNotCausallyConsistentCache) { ThreadPool threadPool{ThreadPool::Options()}; threadPool.startup(); @@ -199,20 +306,23 @@ TEST_F(ReadThroughCacheAsyncTest, SuccessfulInProgressLookup) { }); Cache::InProgressLookup inProgress(cache, "TestKey"); - auto future = inProgress.addWaiter(WithLock::withoutLock()); + auto future = inProgress.addWaiter(WithLock::withoutLock(), CacheNotCausallyConsistent()); ASSERT(!future.isReady()); - auto optVal = inProgress.asyncLookupRound().get(); + auto res = inProgress.asyncLookupRound().get(); ASSERT(inProgress.valid(WithLock::withoutLock())); - ASSERT(optVal.v); - ASSERT_EQ(500, optVal.v->counter); - inProgress.signalWaiters(Cache::ValueHandle(std::move(*optVal.v))); + ASSERT(res.v); + ASSERT_EQ(500, res.v->counter); + auto promisesToSet = + inProgress.getPromisesLessThanTime(WithLock::withoutLock(), CacheNotCausallyConsistent()); + ASSERT_EQ(1U, promisesToSet.size()); + promisesToSet.front()->emplaceValue(std::move(*res.v)); ASSERT(future.isReady()); ASSERT_EQ(500, future.get()->counter); } -TEST_F(ReadThroughCacheAsyncTest, FailedInProgressLookup) { +TEST_F(ReadThroughCacheAsyncTest, FailedInProgressLookupForNotCausallyConsistentCache) { ThreadPool threadPool{ThreadPool::Options()}; threadPool.startup(); @@ -224,13 +334,15 @@ TEST_F(ReadThroughCacheAsyncTest, FailedInProgressLookup) { }); Cache::InProgressLookup inProgress(cache, "TestKey"); - auto future = inProgress.addWaiter(WithLock::withoutLock()); + auto future = inProgress.addWaiter(WithLock::withoutLock(), CacheNotCausallyConsistent()); ASSERT(!future.isReady()); auto asyncLookupResult = inProgress.asyncLookupRound().getNoThrow(); ASSERT_THROWS_CODE(inProgress.asyncLookupRound().get(), DBException, ErrorCodes::InternalError); ASSERT(inProgress.valid(WithLock::withoutLock())); - inProgress.signalWaiters(asyncLookupResult.getStatus()); + auto promisesToSet = inProgress.getAllPromisesOnError(WithLock::withoutLock()); + ASSERT_EQ(1U, promisesToSet.size()); + promisesToSet.front()->setFromStatusWith(asyncLookupResult.getStatus()); ASSERT(future.isReady()); ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::InternalError); @@ -337,31 +449,74 @@ TEST_F(ReadThroughCacheAsyncTest, AcquireWithAShutdownThreadPool) { Cache cache(getServiceContext(), threadPool, 1, [&](OperationContext*, const std::string&) { FAIL("Should not be called"); - return Cache::LookupResult(CachedValue(0)); // Will never be reached + return Cache::LookupResult(boost::none); // Will never be reached }); auto future = cache.acquireAsync("TestKey"); ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::ShutdownInProgress); } -TEST_F(ReadThroughCacheAsyncTest, InvalidateCalledBeforeLookupTaskExecutes) { - struct MockThreadPool : public ThreadPoolInterface { - void startup() override {} - void shutdown() override {} - void join() override {} - void schedule(Task task) override { - ASSERT(!mostRecentTask); - mostRecentTask = std::move(task); - } - void runMostRecentTask() { - ASSERT(mostRecentTask); - auto f = std::move(mostRecentTask); - f(Status::OK()); - } +class MockThreadPool : public ThreadPoolInterface { +public: + ~MockThreadPool() { + ASSERT(!_mostRecentTask); + } + void startup() override {} + void shutdown() override {} + void join() override {} + void schedule(Task task) override { + ASSERT(!_mostRecentTask); + _mostRecentTask = std::move(task); + } + void runMostRecentTask() { + ASSERT(_mostRecentTask); + auto f = std::move(_mostRecentTask); + f(Status::OK()); + } + +private: + Task _mostRecentTask; +}; + +TEST_F(ReadThroughCacheAsyncTest, AcquireAsyncAndAdvanceTimeInterleave) { + MockThreadPool threadPool; + boost::optional<CausallyConsistentCache::LookupResult> nextToReturn; + CausallyConsistentCache cache( + getServiceContext(), threadPool, 1, [&](OperationContext*, const std::string&) { + return std::move(*nextToReturn); + }); + + auto futureAtTS100 = cache.acquireAsync("TestKey"); + nextToReturn.emplace(CachedValue(100), Timestamp(100)); + threadPool.runMostRecentTask(); + ASSERT_EQ(100, futureAtTS100.get()->counter); + ASSERT(futureAtTS100.get().isValid()); + + cache.advanceTimeInStore("TestKey", Timestamp(150)); + auto futureAtTS150 = cache.acquireAsync("TestKey", CacheCausalConsistency::kLatestKnown); + ASSERT(!futureAtTS100.get().isValid()); + ASSERT(!futureAtTS150.isReady()); + + cache.advanceTimeInStore("TestKey", Timestamp(250)); + auto futureAtTS250 = cache.acquireAsync("TestKey", CacheCausalConsistency::kLatestKnown); + ASSERT(!futureAtTS100.get().isValid()); + ASSERT(!futureAtTS150.isReady()); + ASSERT(!futureAtTS250.isReady()); - Task mostRecentTask; - } threadPool; + nextToReturn.emplace(CachedValue(150), Timestamp(150)); + threadPool.runMostRecentTask(); + ASSERT_EQ(150, futureAtTS150.get()->counter); + ASSERT(!futureAtTS150.get().isValid()); + ASSERT(!futureAtTS250.isReady()); + nextToReturn.emplace(CachedValue(250), Timestamp(250)); + threadPool.runMostRecentTask(); + ASSERT_EQ(250, futureAtTS250.get()->counter); + ASSERT(futureAtTS250.get().isValid()); +} + +TEST_F(ReadThroughCacheAsyncTest, InvalidateCalledBeforeLookupTaskExecutes) { + MockThreadPool threadPool; Cache cache(getServiceContext(), threadPool, 1, [&](OperationContext*, const std::string&) { return Cache::LookupResult(CachedValue(123)); }); |