diff options
Diffstat (limited to 'src/mongo/util/read_through_cache.h')
-rw-r--r-- | src/mongo/util/read_through_cache.h | 268 |
1 files changed, 139 insertions, 129 deletions
diff --git a/src/mongo/util/read_through_cache.h b/src/mongo/util/read_through_cache.h index a8ab84ae19f..29365a6b72c 100644 --- a/src/mongo/util/read_through_cache.h +++ b/src/mongo/util/read_through_cache.h @@ -54,11 +54,10 @@ protected: virtual ~ReadThroughCacheBase(); /** - * This method is an extension of ThreadPoolInterface::schedule, which in addition creates a - * client and an operation context and executes the specified 'work' under that environment. The - * difference is that instead of passing a status to 'work' in order to indicate an in-line - * execution, the function will throw without actually calling 'work' (see 'schedule' for more - * details on in-line execution). + * This method is an extension of ThreadPoolInterface::schedule, with the following additions: + * - Creates a client and an operation context and executes the specified 'work' under that + * environment + * - Returns a CancelToken, which can be used to attempt to cancel 'work' * * If the task manages to get canceled before it is executed (through a call to tryCancel), * 'work' will be invoked out-of-line with a non-OK status, set to error code @@ -77,7 +76,9 @@ protected: std::shared_ptr<TaskInfo> _info; }; using WorkWithOpContext = unique_function<void(OperationContext*, const Status&)>; - CancelToken _asyncWork(WorkWithOpContext work); + CancelToken _asyncWork(WorkWithOpContext work) noexcept; + + Date_t _now(); // Service context under which this cache has been instantiated (used for access to service-wide // functionality, such as client/operation context creation) @@ -180,7 +181,19 @@ public: * The implementation must throw a uassertion to indicate an error while looking up the value, * return boost::none if the key is not found, or return an actual value. */ - using LookupFn = unique_function<boost::optional<Value>(OperationContext*, const Key&)>; + struct LookupResult { + explicit LookupResult(boost::optional<Value>&& v) : v(std::move(v)) {} + LookupResult(LookupResult&&) = default; + LookupResult& operator=(LookupResult&&) = default; + + // If boost::none, it means the '_lookupFn' did not find the key in the store + boost::optional<Value> v; + }; + using LookupFn = unique_function<LookupResult(OperationContext*, const Key&)>; + + // Exposed publicly so it can be unit-tested indepedently of the usages in this class. Must not + // be used independently. + class InProgressLookup; /** * If 'key' is found in the cache, returns a set ValueHandle (its operator bool will be true). @@ -195,8 +208,8 @@ public: * meaning that subsequent calls to 'acquireAsync' will kick-off 'lookup' again. * * NOTES: - * The returned value may be invalid by the time the caller gets access to it if invalidate is - * called for 'key'. + * The returned value may be invalid by the time the caller gets to access it, if 'invalidate' + * is called for 'key'. */ SharedSemiFuture<ValueHandle> acquireAsync(const Key& key) { // Fast path @@ -211,54 +224,18 @@ public: // Join an in-progress lookup if one has already been scheduled if (auto it = _inProgressLookups.find(key); it != _inProgressLookups.end()) - return it->second->sharedPromise.getFuture(); + return it->second->addWaiter(ul); - // Schedule an asynchronous lookup for the key and then loop around and wait for it to - // complete + // Schedule an asynchronous lookup for the key + auto [it, emplaced] = + _inProgressLookups.emplace(key, std::make_unique<InProgressLookup>(*this, key)); + invariant(emplaced); + auto& inProgressLookup = *it->second; + auto sharedFutureToReturn = inProgressLookup.addWaiter(ul); - auto [kickOffAsyncLookupPromise, f] = makePromiseFuture<void>(); - - auto emplaceResult = - _inProgressLookups.emplace(key, std::make_unique<InProgressLookup>(key)); - invariant(emplaceResult.second /* emplaced */); - auto& inProgressLookup = *emplaceResult.first->second; - auto sharedFutureToReturn = inProgressLookup.sharedPromise.getFuture(); ul.unlock(); - // Construct the future chain before scheduling the async work so it doesn't execute inline - // if it so happens that the async work completes by the time the future is constructed, or - // if it executes inline due to the task executor being shut down. - std::move(f) - .then([this, &inProgressLookup] { - stdx::unique_lock ul(_mutex); - return _asyncLookupWhileInvalidated(std::move(ul), inProgressLookup); - }) - .getAsync([this, key](StatusWith<Value> swValue) { - stdx::unique_lock ul(_mutex); - auto it = _inProgressLookups.find(key); - invariant(it != _inProgressLookups.end()); - auto inProgressLookup = std::move(it->second); - _inProgressLookups.erase(it); - - StatusWith<ValueHandle> swValueHandle(ErrorCodes::InternalError, - "ReadThroughCache"); - if (swValue.isOK()) { - swValueHandle = ValueHandle(_cache.insertOrAssignAndGet( - key, - {std::move(swValue.getValue()), - _serviceContext->getFastClockSource()->now()})); - } else if (swValue == ErrorCodes::ReadThroughCacheKeyNotFound) { - swValueHandle = ValueHandle(); - } else { - swValueHandle = swValue.getStatus(); - } - - ul.unlock(); - - inProgressLookup->sharedPromise.setFromStatusWith(std::move(swValueHandle)); - }); - - kickOffAsyncLookupPromise.emplaceValue(); + _doLookupWhileNotValid(key, Status(ErrorCodes::Error(461540), "")).getAsync([](auto) {}); return sharedFutureToReturn; } @@ -279,7 +256,7 @@ public: ValueHandle insertOrAssignAndGet(const Key& key, Value&& newValue, Date_t updateWallClockTime) { stdx::lock_guard lg(_mutex); if (auto it = _inProgressLookups.find(key); it != _inProgressLookups.end()) - it->second->invalidate(lg); + it->second->invalidateAndCancelCurrentLookupRound(lg); return _cache.insertOrAssignAndGet(key, {std::move(newValue), updateWallClockTime}); } @@ -296,7 +273,7 @@ public: void invalidate(const Key& key) { stdx::lock_guard lg(_mutex); if (auto it = _inProgressLookups.find(key); it != _inProgressLookups.end()) - it->second->invalidate(lg); + it->second->invalidateAndCancelCurrentLookupRound(lg); _cache.invalidate(key); } @@ -305,7 +282,7 @@ public: stdx::lock_guard lg(_mutex); for (auto& entry : _inProgressLookups) { if (predicate(entry.first)) - entry.second->invalidate(lg); + entry.second->invalidateAndCancelCurrentLookupRound(lg); } _cache.invalidateIf([&](const Key& key, const StoredValue*) { return predicate(key); }); } @@ -354,86 +331,42 @@ protected: } private: - // Refer to the comments on '_asyncLookupWhileInvalidated' for more detail on how this structure - // is used. - struct InProgressLookup { - InProgressLookup(Key key) : key(std::move(key)) {} - - void invalidate(WithLock) { - invalidated = true; - if (cancelToken) - cancelToken->tryCancel(); - } - - Key key; - SharedPromise<ValueHandle> sharedPromise; - - bool invalidated; - boost::optional<CancelToken> cancelToken; - }; using InProgressLookupsMap = stdx::unordered_map<Key, std::unique_ptr<InProgressLookup>>; /** - * This method is expected to be called with a constructed InProgressLookup object, emplaced on - * '_inProgressLookups' (represented by the 'inProgressLookup' argument). It implements an - * asynchronous "while (invalidated)" loop over the in-progress key referenced by - * 'inProgressLookup', which *must* be kept valid by the caller until the returned Future - * completes. - * - * The returned Future will be complete when that loop exists and will contain the latest value - * (or error) returned by 'lookup'. - * - * If thought of sequentially, the loop looks like this: - * - * while (true) { - * inProgressLookup.invalidated = false; - * inProgressLookup.cancelToken.reset(); - * valueOrError = lookup(key); - * if (!inProgressLookup.invalidated) - * return valueOrError; // signals the future - * } + * This method implements an asynchronous "while (!valid)" loop over 'key', which must be on the + * in-progress map. */ - Future<Value> _asyncLookupWhileInvalidated(stdx::unique_lock<Mutex> ul, - InProgressLookup& inProgressLookup) noexcept { - auto [promise, f] = makePromiseFuture<Value>(); - auto p = std::make_shared<Promise<Value>>(std::move(promise)); - - // Construct the future chain before scheduling the async work so it doesn't execute inline - auto future = - std::move(f).onCompletion([this, &inProgressLookup](StatusWith<Value> swValue) { - stdx::unique_lock ul(_mutex); - if (!inProgressLookup.invalidated) - return Future<Value>::makeReady(uassertStatusOK(std::move(swValue))); - - inProgressLookup.cancelToken.reset(); - return _asyncLookupWhileInvalidated(std::move(ul), inProgressLookup); - }); + Future<LookupResult> _doLookupWhileNotValid(Key key, StatusWith<LookupResult> sw) { + stdx::unique_lock ul(_mutex); + auto it = _inProgressLookups.find(key); + invariant(it != _inProgressLookups.end()); + if (!ErrorCodes::isCancelationError(sw.getStatus()) && !it->second->valid(ul)) { + ul.unlock(); // asyncLookupRound also acquires the mutex + return it->second->asyncLookupRound().onCompletion( + [this, key](auto sw) { return _doLookupWhileNotValid(key, std::move(sw)); }); + } - invariant(!inProgressLookup.cancelToken); - inProgressLookup.invalidated = false; - try { - inProgressLookup.cancelToken.emplace(_asyncWork([ this, p, &inProgressLookup ]( - OperationContext * opCtx, const Status& status) mutable noexcept { - p->setWith([&]() mutable { - uassertStatusOK(status); - auto value = _lookupFn(opCtx, inProgressLookup.key); - uassert(ErrorCodes::ReadThroughCacheKeyNotFound, - "Internal only: key not found", - value); - return std::move(*value); - }); - })); - } catch (const ExceptionForCat<ErrorCategory::CancelationError>& ex) { - // The thread pool is being shut down, so this is an inline execution - invariant(!inProgressLookup.invalidated); - invariant(!inProgressLookup.cancelToken); - - ul.unlock(); - p->setError(ex.toStatus()); + // The detachment of the currently active lookup and the placement of the result on the + // '_cache' has to be atomic with respect to a concurrent call to 'invalidate' + auto inProgressLookup(std::move(it->second)); + _inProgressLookups.erase(it); + + StatusWith<ValueHandle> swValueHandle(ErrorCodes::Error(461541), ""); + if (sw.isOK()) { + auto result = std::move(sw.getValue()); + swValueHandle = result.v + ? ValueHandle(_cache.insertOrAssignAndGet(key, {std::move(*result.v), _now()})) + : ValueHandle(); + } else { + swValueHandle = sw.getStatus(); } + ul.unlock(); - return std::move(future); - }; + inProgressLookup->signalWaiters(std::move(swValueHandle)); + + return Future<LookupResult>::makeReady(Status(ErrorCodes::Error(461542), "")); + } // Blocking function which will be invoked to retrieve entries from the backing store const LookupFn _lookupFn; @@ -454,4 +387,81 @@ private: InProgressLookupsMap _inProgressLookups; }; +/** + * This class represents an in-progress lookup for a specific key and implements the guarantees of + * the invalidation logic as described in the comments of 'ReadThroughCache::invalidate'. + * + * It is intended to be used in conjunction with the 'ReadThroughCache', which operates on it under + * its '_mutex' and ensures there is always at most a single active instance at a time active for + * each 'key'. + * + * The methods of this class are not thread-safe, unless indicated in the comments. + * + * Its lifecycle is intended to be like this: + * + * inProgressLookups.emplace(inProgress); + * while (true) { + * result = inProgress.asyncLookupRound(); + * if (!inProgress.valid()) { + * continue; + * } + * + * inProgressLookups.remove(inProgress) + * cachedValues.insert(result); + * inProgress.signalWaiters(result); + * } + */ +template <typename Key, typename Value> +class ReadThroughCache<Key, Value>::InProgressLookup { +public: + InProgressLookup(ReadThroughCache& cache, Key key) : _cache(cache), _key(std::move(key)) {} + + Future<LookupResult> asyncLookupRound() { + auto [promise, future] = makePromiseFuture<LookupResult>(); + + stdx::lock_guard lg(_cache._mutex); + _valid = true; + _cancelToken.emplace(_cache._asyncWork([ this, promise = std::move(promise) ]( + OperationContext * opCtx, const Status& status) mutable noexcept { + promise.setWith([&] { + uassertStatusOK(status); + return _cache._lookupFn(opCtx, _key); + }); + })); + + return std::move(future); + } + + SharedSemiFuture<ValueHandle> addWaiter(WithLock) { + return _sharedPromise.getFuture(); + } + + bool valid(WithLock) const { + return _valid; + } + + void invalidateAndCancelCurrentLookupRound(WithLock) { + _valid = false; + if (_cancelToken) + _cancelToken->tryCancel(); + } + + void signalWaiters(StatusWith<ValueHandle> swValueHandle) { + invariant(_valid); + _sharedPromise.setFromStatusWith(std::move(swValueHandle)); + } + +private: + // The owning cache, from which mutex, lookupFn, async task scheduling, etc. will be used. It is + // the responsibility of the owning cache to join all outstanding lookups at destruction time. + ReadThroughCache& _cache; + + const Key _key; + + bool _valid{false}; + boost::optional<CancelToken> _cancelToken; + + SharedPromise<ValueHandle> _sharedPromise; +}; + } // namespace mongo |