/**
 *    Copyright (C) 2019-present MongoDB, Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the Server Side Public License, version 1,
 *    as published by MongoDB, Inc.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 *    Server Side Public License for more details.
 *
 *    You should have received a copy of the Server Side Public License
 *    along with this program. If not, see
 *    <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the Server Side Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

#pragma once

#include <algorithm>
#include <iterator>
#include <memory>
#include <type_traits>
#include <utility>
#include <vector>

#include <boost/optional.hpp>

#include "mongo/db/operation_context.h"
#include "mongo/platform/mutex.h"
#include "mongo/util/concurrency/thread_pool_interface.h"
#include "mongo/util/functional.h"
#include "mongo/util/future.h"
#include "mongo/util/invalidating_lru_cache.h"

namespace mongo {

/**
 * Serves as a container of the non-templatised parts of the ReadThroughCache class below.
*/ class ReadThroughCacheBase { ReadThroughCacheBase(const ReadThroughCacheBase&) = delete; ReadThroughCacheBase& operator=(const ReadThroughCacheBase&) = delete; protected: ReadThroughCacheBase(ServiceContext* service, ThreadPoolInterface& threadPool); virtual ~ReadThroughCacheBase(); /** * This method is an extension of ThreadPoolInterface::schedule, with the following additions: * - Creates a client and an operation context and executes the specified 'work' under that * environment * - Returns a CancelToken, which can be used to attempt to cancel 'work' * * If the task manages to get canceled before it is executed (through a call to tryCancel), * 'work' will be invoked out-of-line with a non-OK status, set to error code * ReadThroughCacheLookupCanceled. */ class CancelToken { public: struct TaskInfo; CancelToken(std::shared_ptr info); CancelToken(CancelToken&&); ~CancelToken(); void tryCancel(); private: std::shared_ptr _info; }; using WorkWithOpContext = unique_function; CancelToken _asyncWork(WorkWithOpContext work) noexcept; Date_t _now(); // Service context under which this cache has been instantiated (used for access to service-wide // functionality, such as client/operation context creation) ServiceContext* const _serviceContext; private: // Thread pool to be used for invoking the blocking 'lookup' calls ThreadPoolInterface& _threadPool; // Used to protect calls to 'tryCancel' above and is shared across all emitted CancelTokens. // Semantically, each CancelToken's interruption is independent from all the others so they // could have their own mutexes, but in the interest of not creating a mutex for each async task // spawned, we share the mutex here. // // Has a lock level of 2, meaning what while held, any code is only allowed to take the Client // lock. 
Mutex _cancelTokensMutex = MONGO_MAKE_LATCH("ReadThroughCacheBase::_cancelTokensMutex"); }; template struct ReadThroughCacheLookup { using Fn = unique_function; }; template struct ReadThroughCacheLookup { using Fn = unique_function; }; /** * Implements an (optionally) causally consistent read-through cache from Key to Value, built on top * of InvalidatingLRUCache. * * Causal consistency is provided by requiring the backing store to asociate every Value it returns * with a logical timestamp of type Time. */ template class ReadThroughCache : public ReadThroughCacheBase { /** * Data structure wrapping and expanding on the values stored in the cache. */ struct StoredValue { Value value; // Contains the wallclock time of when the value was fetched from the backing storage. This // value is not precise and should only be used for diagnostics purposes (i.e., it cannot be // relied on to perform any recency comparisons for example). Date_t updateWallClockTime; }; using Cache = InvalidatingLRUCache; public: template static constexpr bool IsComparable = Cache::template IsComparable; /** * Common type for values returned from the cache. */ class ValueHandle { public: // The three constructors below are present in order to offset the fact that the cache // doesn't support pinning items. Their only usage must be in the authorization mananager // for the internal authentication user. 
ValueHandle(Value&& value) : _valueHandle({std::move(value), Date_t::min()}) {} ValueHandle(Value&& value, const Time& t) : _valueHandle({std::move(value), Date_t::min()}, t) {} ValueHandle() = default; operator bool() const { return bool(_valueHandle); } bool isValid() const { return _valueHandle.isValid(); } const Time& getTime() const { return _valueHandle.getTime(); } Value* get() { return &_valueHandle->value; } const Value* get() const { return &_valueHandle->value; } Value& operator*() { return *get(); } const Value& operator*() const { return *get(); } Value* operator->() { return get(); } const Value* operator->() const { return get(); } /** * See the comments for `StoredValue::updateWallClockTime` above. */ Date_t updateWallClockTime() const { return _valueHandle->updateWallClockTime; } private: friend class ReadThroughCache; ValueHandle(typename Cache::ValueHandle&& valueHandle) : _valueHandle(std::move(valueHandle)) {} typename Cache::ValueHandle _valueHandle; }; /** * Signature for a blocking function to provide the value for a key when there is a cache miss. * * The implementation must throw a uassertion to indicate an error while looking up the value, * return boost::none if the key is not found, or return an actual value. * * See the comments on 'advanceTimeInStore' for additional requirements that this function must * fulfill with respect to causal consistency. */ struct LookupResult { // The 't' parameter is mandatory for causally-consistent caches, but not needed otherwise // (since the time never changes). Using a default of '= CacheNotCausallyConsistent()' // allows non-causally-consistent users to not have to pass a second parameter, but would // fail compilation if causally-consistent users forget to pass it. 
explicit LookupResult(boost::optional&& v, Time t = CacheNotCausallyConsistent()) : v(std::move(v)), t(std::move(t)) {} LookupResult(LookupResult&&) = default; LookupResult& operator=(LookupResult&&) = default; // If boost::none, it means the '_lookupFn' did not find the key in the store boost::optional v; // If value is boost::none, specifies the time which was passed to '_lookupFn', effectively // meaning, at least as of 'time', there was no entry in the store for the key. Otherwise // contains the time that the store returned for the 'value'. Time t; }; using LookupFn = typename ReadThroughCacheLookup::Fn; // Exposed publicly so it can be unit-tested indepedently of the usages in this class. Must not // be used independently. class InProgressLookup; /** * If 'key' is found in the cache and it fulfills the requested 'causalConsistency', returns a * set ValueHandle (its operator bool will be true). Otherwise, either causes the blocking * 'LookupFn' to be asynchronously invoked to fetch 'key' from the backing store or joins an * already scheduled invocation) and returns a future which will be signaled when the lookup * completes. * * If the lookup is successful and 'key' is found in the store, it will be cached (so subsequent * lookups won't have to re-fetch it) and the future will be set. If 'key' is not found in the * backing store, returns a not-set ValueHandle (it's bool operator will be false). If 'lookup' * fails, the future will be set to the appropriate exception and nothing will be cached, * meaning that subsequent calls to 'acquireAsync' will kick-off 'lookup' again. * * NOTES: * The returned value may be invalid by the time the caller gets to access it, if 'invalidate' * is called for 'key'. 
*/ template requires(IsComparable&& std::is_constructible_v) SharedSemiFuture acquireAsync( const KeyType& key, CacheCausalConsistency causalConsistency = CacheCausalConsistency::kLatestCached) { // Fast path if (auto cachedValue = _cache.get(key, causalConsistency)) return {std::move(cachedValue)}; stdx::unique_lock ul(_mutex); // Re-check the cache under a mutex, before kicking-off the asynchronous lookup if (auto cachedValue = _cache.get(key, causalConsistency)) return {std::move(cachedValue)}; // Join an in-progress lookup if one has already been scheduled if (auto it = _inProgressLookups.find(key); it != _inProgressLookups.end()) return it->second->addWaiter(ul); // Schedule an asynchronous lookup for the key auto [cachedValue, timeInStore] = _cache.getCachedValueAndTimeInStore(key); auto [it, emplaced] = _inProgressLookups.emplace( key, std::make_unique( *this, Key(key), ValueHandle(std::move(cachedValue)), std::move(timeInStore))); invariant(emplaced); auto& inProgressLookup = *it->second; auto sharedFutureToReturn = inProgressLookup.addWaiter(ul); ul.unlock(); _doLookupWhileNotValid(Key(key), Status(ErrorCodes::Error(461540), "")).getAsync([](auto) { }); return sharedFutureToReturn; } /** * A blocking variant of 'acquireAsync' above - refer to it for more details. * * NOTES: * This is a potentially blocking method. */ template requires IsComparable ValueHandle acquire(OperationContext* opCtx, const KeyType& key, CacheCausalConsistency causalConsistency = CacheCausalConsistency::kLatestCached) { return acquireAsync(key, causalConsistency).get(opCtx); } /** * Acquires the latest value from the cache, or an empty ValueHandle if the key is not present * in the cache. * * Doesn't attempt to lookup, and so doesn't block, but this means it will ignore any * in-progress keys or keys whose time in store is newer than what is currently cached. 
*/ template requires IsComparable ValueHandle peekLatestCached(const KeyType& key) { return {_cache.get(key, CacheCausalConsistency::kLatestCached)}; } /** * Returns a vector of the latest values from the cache which satisfy the predicate. * * Doesn't attempt to lookup, and so doesn't block, but this means it will ignore any * in-progress keys or keys whose time in store is newer than what is currently cached. */ template std::vector peekLatestCachedIf(const Pred& pred) { auto invalidatingCacheValues = [&] { stdx::lock_guard lg(_mutex); return _cache.getLatestCachedIf( [&](const Key& key, const StoredValue* value) { return pred(key, value->value); }); }(); std::vector valueHandles; valueHandles.reserve(invalidatingCacheValues.size()); std::transform(invalidatingCacheValues.begin(), invalidatingCacheValues.end(), std::back_inserter(valueHandles), [](auto& invalidatingCacheValue) { return ValueHandle(std::move(invalidatingCacheValue)); }); return valueHandles; } /** * Invalidates the given 'key' and immediately replaces it with a new value. * * The 'time' parameter is mandatory for causally-consistent caches, but not needed otherwise * (since the time never changes). */ void insertOrAssign(const Key& key, Value&& value, Date_t updateWallClockTime) { MONGO_STATIC_ASSERT_MSG( !isCausallyConsistent