Diffstat (limited to 'src/mongo/util/read_through_cache.h')
 src/mongo/util/read_through_cache.h | 268
 1 file changed, 139 insertions(+), 129 deletions(-)
diff --git a/src/mongo/util/read_through_cache.h b/src/mongo/util/read_through_cache.h
index a8ab84ae19f..29365a6b72c 100644
--- a/src/mongo/util/read_through_cache.h
+++ b/src/mongo/util/read_through_cache.h
@@ -54,11 +54,10 @@ protected:
virtual ~ReadThroughCacheBase();
/**
- * This method is an extension of ThreadPoolInterface::schedule, which in addition creates a
- * client and an operation context and executes the specified 'work' under that environment. The
- * difference is that instead of passing a status to 'work' in order to indicate an in-line
- * execution, the function will throw without actually calling 'work' (see 'schedule' for more
- * details on in-line execution).
+ * This method is an extension of ThreadPoolInterface::schedule, with the following additions:
+ * - Creates a client and an operation context and executes the specified 'work' under that
+ * environment
+ * - Returns a CancelToken, which can be used to attempt to cancel 'work'
*
* If the task manages to get canceled before it is executed (through a call to tryCancel),
* 'work' will be invoked out-of-line with a non-OK status, set to error code
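As an aside on the cancelation contract described above: a minimal, self-contained sketch of the same pattern, using only the standard library, might look as follows ('MiniScheduler', 'CancelToken' and 'Status' here are illustrative stand-ins, not MongoDB's actual types). The point being illustrated is that 'work' always runs, and cancelation only changes the status it observes.

#include <atomic>
#include <functional>
#include <future>
#include <memory>
#include <string>
#include <vector>

struct Status {
    bool ok;
    std::string reason;
};

class MiniScheduler {
public:
    class CancelToken {
    public:
        explicit CancelToken(std::shared_ptr<std::atomic<bool>> flag) : _flag(std::move(flag)) {}

        // Best-effort: only has an effect if the work has not started running yet
        void tryCancel() {
            _flag->store(true);
        }

    private:
        std::shared_ptr<std::atomic<bool>> _flag;
    };

    // Schedules 'work' asynchronously. 'work' is always invoked, but receives a non-OK
    // status if the token was canceled before the work got a chance to execute.
    CancelToken schedule(std::function<void(const Status&)> work) {
        auto canceled = std::make_shared<std::atomic<bool>>(false);
        _tasks.push_back(std::async(std::launch::async, [canceled, work = std::move(work)] {
            if (canceled->load())
                work(Status{false, "canceled before execution"});
            else
                work(Status{true, ""});
        }));
        return CancelToken(canceled);
    }

private:
    std::vector<std::future<void>> _tasks;  // destroying the futures joins the outstanding tasks
};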
@@ -77,7 +76,9 @@ protected:
std::shared_ptr<TaskInfo> _info;
};
using WorkWithOpContext = unique_function<void(OperationContext*, const Status&)>;
- CancelToken _asyncWork(WorkWithOpContext work);
+ CancelToken _asyncWork(WorkWithOpContext work) noexcept;
+
+ Date_t _now();
// Service context under which this cache has been instantiated (used for access to service-wide
// functionality, such as client/operation context creation)
@@ -180,7 +181,19 @@ public:
* The implementation must throw a uassertion to indicate an error while looking up the value,
* return boost::none if the key is not found, or return an actual value.
*/
- using LookupFn = unique_function<boost::optional<Value>(OperationContext*, const Key&)>;
+ struct LookupResult {
+ explicit LookupResult(boost::optional<Value>&& v) : v(std::move(v)) {}
+ LookupResult(LookupResult&&) = default;
+ LookupResult& operator=(LookupResult&&) = default;
+
+ // If boost::none, it means the '_lookupFn' did not find the key in the store
+ boost::optional<Value> v;
+ };
+ using LookupFn = unique_function<LookupResult(OperationContext*, const Key&)>;
+
+ // Exposed publicly so it can be unit-tested independently of its usage in this class. Must not
+ // be used on its own outside of the cache.
+ class InProgressLookup;
/**
* If 'key' is found in the cache, returns a set ValueHandle (its operator bool will be true).
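For illustration, a self-contained analog of the LookupResult contract introduced above, with std::optional standing in for boost::optional ('Value' and 'lookupFromStore' are made-up names): an error is reported by throwing, an absent key by a result holding nullopt, and a hit by a result holding the value.

#include <optional>
#include <stdexcept>
#include <string>

struct Value {
    std::string data;
};

struct LookupResult {
    explicit LookupResult(std::optional<Value>&& v) : v(std::move(v)) {}

    // nullopt means the lookup ran successfully but the key does not exist in the store
    std::optional<Value> v;
};

LookupResult lookupFromStore(const std::string& key) {
    if (key.empty())
        throw std::runtime_error("invalid key");      // errors are reported by throwing
    if (key == "missing")
        return LookupResult(std::nullopt);            // key not found: not an error
    return LookupResult(Value{"value-for-" + key});   // found: the value itself
}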
@@ -195,8 +208,8 @@ public:
* meaning that subsequent calls to 'acquireAsync' will kick-off 'lookup' again.
*
* NOTES:
- * The returned value may be invalid by the time the caller gets access to it if invalidate is
- * called for 'key'.
+ * The returned value may be invalid by the time the caller gets to access it, if 'invalidate'
+ * is called for 'key'.
*/
SharedSemiFuture<ValueHandle> acquireAsync(const Key& key) {
// Fast path
@@ -211,54 +224,18 @@ public:
// Join an in-progress lookup if one has already been scheduled
if (auto it = _inProgressLookups.find(key); it != _inProgressLookups.end())
- return it->second->sharedPromise.getFuture();
+ return it->second->addWaiter(ul);
- // Schedule an asynchronous lookup for the key and then loop around and wait for it to
- // complete
+ // Schedule an asynchronous lookup for the key
+ auto [it, emplaced] =
+ _inProgressLookups.emplace(key, std::make_unique<InProgressLookup>(*this, key));
+ invariant(emplaced);
+ auto& inProgressLookup = *it->second;
+ auto sharedFutureToReturn = inProgressLookup.addWaiter(ul);
- auto [kickOffAsyncLookupPromise, f] = makePromiseFuture<void>();
-
- auto emplaceResult =
- _inProgressLookups.emplace(key, std::make_unique<InProgressLookup>(key));
- invariant(emplaceResult.second /* emplaced */);
- auto& inProgressLookup = *emplaceResult.first->second;
- auto sharedFutureToReturn = inProgressLookup.sharedPromise.getFuture();
ul.unlock();
- // Construct the future chain before scheduling the async work so it doesn't execute inline
- // if it so happens that the async work completes by the time the future is constructed, or
- // if it executes inline due to the task executor being shut down.
- std::move(f)
- .then([this, &inProgressLookup] {
- stdx::unique_lock ul(_mutex);
- return _asyncLookupWhileInvalidated(std::move(ul), inProgressLookup);
- })
- .getAsync([this, key](StatusWith<Value> swValue) {
- stdx::unique_lock ul(_mutex);
- auto it = _inProgressLookups.find(key);
- invariant(it != _inProgressLookups.end());
- auto inProgressLookup = std::move(it->second);
- _inProgressLookups.erase(it);
-
- StatusWith<ValueHandle> swValueHandle(ErrorCodes::InternalError,
- "ReadThroughCache");
- if (swValue.isOK()) {
- swValueHandle = ValueHandle(_cache.insertOrAssignAndGet(
- key,
- {std::move(swValue.getValue()),
- _serviceContext->getFastClockSource()->now()}));
- } else if (swValue == ErrorCodes::ReadThroughCacheKeyNotFound) {
- swValueHandle = ValueHandle();
- } else {
- swValueHandle = swValue.getStatus();
- }
-
- ul.unlock();
-
- inProgressLookup->sharedPromise.setFromStatusWith(std::move(swValueHandle));
- });
-
- kickOffAsyncLookupPromise.emplaceValue();
+ _doLookupWhileNotValid(key, Status(ErrorCodes::Error(461540), "")).getAsync([](auto) {});
return sharedFutureToReturn;
}
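The core behaviour of acquireAsync, stripped down to the standard library, is request coalescing: while a lookup for a key is in flight, every caller asking for that key joins the same future rather than scheduling another lookup. A rough, hypothetical sketch (not MongoDB's actual types) might be:

#include <future>
#include <mutex>
#include <string>
#include <unordered_map>

class CoalescingLookups {
public:
    std::shared_future<std::string> acquireAsync(const std::string& key) {
        std::lock_guard<std::mutex> lg(_mutex);

        // Join an in-progress lookup if one has already been scheduled for this key
        if (auto it = _inProgress.find(key); it != _inProgress.end())
            return it->second;

        // Otherwise kick off a single asynchronous lookup; later callers join its future
        auto future = std::async(std::launch::async, [key] {
                          return "value-for-" + key;  // stand-in for the blocking lookup function
                      }).share();
        _inProgress.emplace(key, future);
        return future;
    }

private:
    std::mutex _mutex;
    std::unordered_map<std::string, std::shared_future<std::string>> _inProgress;
};

The real class additionally removes the in-progress entry and moves the result into the cache when the lookup completes, and handles invalidation of a racing lookup round; both are omitted here.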
@@ -279,7 +256,7 @@ public:
ValueHandle insertOrAssignAndGet(const Key& key, Value&& newValue, Date_t updateWallClockTime) {
stdx::lock_guard lg(_mutex);
if (auto it = _inProgressLookups.find(key); it != _inProgressLookups.end())
- it->second->invalidate(lg);
+ it->second->invalidateAndCancelCurrentLookupRound(lg);
return _cache.insertOrAssignAndGet(key, {std::move(newValue), updateWallClockTime});
}
@@ -296,7 +273,7 @@ public:
void invalidate(const Key& key) {
stdx::lock_guard lg(_mutex);
if (auto it = _inProgressLookups.find(key); it != _inProgressLookups.end())
- it->second->invalidate(lg);
+ it->second->invalidateAndCancelCurrentLookupRound(lg);
_cache.invalidate(key);
}
@@ -305,7 +282,7 @@ public:
stdx::lock_guard lg(_mutex);
for (auto& entry : _inProgressLookups) {
if (predicate(entry.first))
- entry.second->invalidate(lg);
+ entry.second->invalidateAndCancelCurrentLookupRound(lg);
}
_cache.invalidateIf([&](const Key& key, const StoredValue*) { return predicate(key); });
}
@@ -354,86 +331,42 @@ protected:
}
private:
- // Refer to the comments on '_asyncLookupWhileInvalidated' for more detail on how this structure
- // is used.
- struct InProgressLookup {
- InProgressLookup(Key key) : key(std::move(key)) {}
-
- void invalidate(WithLock) {
- invalidated = true;
- if (cancelToken)
- cancelToken->tryCancel();
- }
-
- Key key;
- SharedPromise<ValueHandle> sharedPromise;
-
- bool invalidated;
- boost::optional<CancelToken> cancelToken;
- };
using InProgressLookupsMap = stdx::unordered_map<Key, std::unique_ptr<InProgressLookup>>;
/**
- * This method is expected to be called with a constructed InProgressLookup object, emplaced on
- * '_inProgressLookups' (represented by the 'inProgressLookup' argument). It implements an
- * asynchronous "while (invalidated)" loop over the in-progress key referenced by
- * 'inProgressLookup', which *must* be kept valid by the caller until the returned Future
- * completes.
- *
- * The returned Future will be complete when that loop exists and will contain the latest value
- * (or error) returned by 'lookup'.
- *
- * If thought of sequentially, the loop looks like this:
- *
- * while (true) {
- * inProgressLookup.invalidated = false;
- * inProgressLookup.cancelToken.reset();
- * valueOrError = lookup(key);
- * if (!inProgressLookup.invalidated)
- * return valueOrError; // signals the future
- * }
+ * This method implements an asynchronous "while (!valid)" loop over 'key', which must be on the
+ * in-progress map.
*/
- Future<Value> _asyncLookupWhileInvalidated(stdx::unique_lock<Mutex> ul,
- InProgressLookup& inProgressLookup) noexcept {
- auto [promise, f] = makePromiseFuture<Value>();
- auto p = std::make_shared<Promise<Value>>(std::move(promise));
-
- // Construct the future chain before scheduling the async work so it doesn't execute inline
- auto future =
- std::move(f).onCompletion([this, &inProgressLookup](StatusWith<Value> swValue) {
- stdx::unique_lock ul(_mutex);
- if (!inProgressLookup.invalidated)
- return Future<Value>::makeReady(uassertStatusOK(std::move(swValue)));
-
- inProgressLookup.cancelToken.reset();
- return _asyncLookupWhileInvalidated(std::move(ul), inProgressLookup);
- });
+ Future<LookupResult> _doLookupWhileNotValid(Key key, StatusWith<LookupResult> sw) {
+ stdx::unique_lock ul(_mutex);
+ auto it = _inProgressLookups.find(key);
+ invariant(it != _inProgressLookups.end());
+ if (!ErrorCodes::isCancelationError(sw.getStatus()) && !it->second->valid(ul)) {
+ ul.unlock(); // asyncLookupRound also acquires the mutex
+ return it->second->asyncLookupRound().onCompletion(
+ [this, key](auto sw) { return _doLookupWhileNotValid(key, std::move(sw)); });
+ }
- invariant(!inProgressLookup.cancelToken);
- inProgressLookup.invalidated = false;
- try {
- inProgressLookup.cancelToken.emplace(_asyncWork([ this, p, &inProgressLookup ](
- OperationContext * opCtx, const Status& status) mutable noexcept {
- p->setWith([&]() mutable {
- uassertStatusOK(status);
- auto value = _lookupFn(opCtx, inProgressLookup.key);
- uassert(ErrorCodes::ReadThroughCacheKeyNotFound,
- "Internal only: key not found",
- value);
- return std::move(*value);
- });
- }));
- } catch (const ExceptionForCat<ErrorCategory::CancelationError>& ex) {
- // The thread pool is being shut down, so this is an inline execution
- invariant(!inProgressLookup.invalidated);
- invariant(!inProgressLookup.cancelToken);
-
- ul.unlock();
- p->setError(ex.toStatus());
+ // The detachment of the currently active lookup and the placement of the result on the
+ // '_cache' have to be atomic with respect to a concurrent call to 'invalidate'
+ auto inProgressLookup(std::move(it->second));
+ _inProgressLookups.erase(it);
+
+ StatusWith<ValueHandle> swValueHandle(ErrorCodes::Error(461541), "");
+ if (sw.isOK()) {
+ auto result = std::move(sw.getValue());
+ swValueHandle = result.v
+ ? ValueHandle(_cache.insertOrAssignAndGet(key, {std::move(*result.v), _now()}))
+ : ValueHandle();
+ } else {
+ swValueHandle = sw.getStatus();
}
+ ul.unlock();
- return std::move(future);
- };
+ inProgressLookup->signalWaiters(std::move(swValueHandle));
+
+ return Future<LookupResult>::makeReady(Status(ErrorCodes::Error(461542), ""));
+ }
// Blocking function which will be invoked to retrieve entries from the backing store
const LookupFn _lookupFn;
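The loop in _doLookupWhileNotValid is expressed as a recursive continuation rather than a blocking while-loop: each completed round decides, in its completion handler, whether to schedule another round or to deliver the result. A minimal stand-alone sketch of that shape (illustrative names only, and synchronous where the real code uses the thread pool):

#include <functional>
#include <iostream>
#include <string>

// Runs one "round" and invokes 'onDone' with its result. Synchronous here for brevity;
// the real code schedules each round through the cache's async machinery.
void lookupRound(int attempt, const std::function<void(std::string)>& onDone) {
    onDone("result-of-round-" + std::to_string(attempt));
}

// Re-invokes itself from the completion handler until a round finishes without having
// been invalidated, mirroring the onCompletion() chaining above.
void doLookupWhileNotValid(int attempt,
                           const std::function<bool(int)>& wasInvalidated,
                           const std::function<void(std::string)>& deliver) {
    lookupRound(attempt, [&](std::string result) {
        if (wasInvalidated(attempt)) {
            doLookupWhileNotValid(attempt + 1, wasInvalidated, deliver);  // another round
            return;
        }
        deliver(std::move(result));  // no invalidation raced with this round
    });
}

int main() {
    // Pretend the first two rounds raced with an invalidation and had to be retried
    doLookupWhileNotValid(
        1,
        [](int attempt) { return attempt < 3; },
        [](std::string v) { std::cout << "final: " << v << "\n"; });
    return 0;
}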
@@ -454,4 +387,81 @@ private:
InProgressLookupsMap _inProgressLookups;
};
+/**
+ * This class represents an in-progress lookup for a specific key and implements the guarantees of
+ * the invalidation logic as described in the comments of 'ReadThroughCache::invalidate'.
+ *
+ * It is intended to be used in conjunction with the 'ReadThroughCache', which operates on it under
+ * its '_mutex' and ensures there is always at most a single active instance at a time for each
+ * 'key'.
+ *
+ * The methods of this class are not thread-safe, unless indicated in the comments.
+ *
+ * Its lifecycle is intended to be like this:
+ *
+ * inProgressLookups.emplace(inProgress);
+ * while (true) {
+ * result = inProgress.asyncLookupRound();
+ * if (!inProgress.valid()) {
+ * continue;
+ * }
+ *
+ * inProgressLookups.remove(inProgress);
+ * cachedValues.insert(result);
+ * inProgress.signalWaiters(result);
+ * break;
+ * }
+ */
+template <typename Key, typename Value>
+class ReadThroughCache<Key, Value>::InProgressLookup {
+public:
+ InProgressLookup(ReadThroughCache& cache, Key key) : _cache(cache), _key(std::move(key)) {}
+
+ Future<LookupResult> asyncLookupRound() {
+ auto [promise, future] = makePromiseFuture<LookupResult>();
+
+ stdx::lock_guard lg(_cache._mutex);
+ _valid = true;
+ _cancelToken.emplace(_cache._asyncWork([ this, promise = std::move(promise) ](
+ OperationContext * opCtx, const Status& status) mutable noexcept {
+ promise.setWith([&] {
+ uassertStatusOK(status);
+ return _cache._lookupFn(opCtx, _key);
+ });
+ }));
+
+ return std::move(future);
+ }
+
+ SharedSemiFuture<ValueHandle> addWaiter(WithLock) {
+ return _sharedPromise.getFuture();
+ }
+
+ bool valid(WithLock) const {
+ return _valid;
+ }
+
+ void invalidateAndCancelCurrentLookupRound(WithLock) {
+ _valid = false;
+ if (_cancelToken)
+ _cancelToken->tryCancel();
+ }
+
+ void signalWaiters(StatusWith<ValueHandle> swValueHandle) {
+ invariant(_valid);
+ _sharedPromise.setFromStatusWith(std::move(swValueHandle));
+ }
+
+private:
+ // The owning cache, whose mutex, lookup function and async task scheduling are used. It is
+ // the responsibility of the owning cache to join all outstanding lookups at destruction time.
+ ReadThroughCache& _cache;
+
+ const Key _key;
+
+ bool _valid{false};
+ boost::optional<CancelToken> _cancelToken;
+
+ SharedPromise<ValueHandle> _sharedPromise;
+};
+
} // namespace mongo
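Finally, the waiter mechanism InProgressLookup relies on can be pictured with std::promise/std::shared_future standing in for SharedPromise/SharedSemiFuture: any number of waiters register before the result is known, and a single signal completes all of them. A hypothetical, self-contained sketch:

#include <future>
#include <iostream>
#include <string>
#include <vector>

int main() {
    std::promise<std::string> sharedPromise;
    std::shared_future<std::string> sharedFuture = sharedPromise.get_future().share();

    // Several waiters join before the lookup has produced anything (cf. addWaiter)
    std::vector<std::shared_future<std::string>> waiters(3, sharedFuture);

    // One successful lookup round completes every waiter at once (cf. signalWaiters)
    sharedPromise.set_value("looked-up-value");

    for (auto& w : waiters)
        std::cout << w.get() << "\n";  // every waiter observes the same value
    return 0;
}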