summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2020-05-10 06:48:54 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-05-17 15:16:51 +0000
commit7e2111ef33fc40959a254bd3109466176ae60718 (patch)
tree2a31ac8ddccccb24784b161839fa1ca92aeb10bf
parenta7f769dd597e33e988832c43c99912c1d3139c9b (diff)
downloadmongo-7e2111ef33fc40959a254bd3109466176ae60718.tar.gz
SERVER-46154 Pull the InProgressLookup outside of ReadThroughCache
The InProgressLookup tracking already has quite complicated logic, so it seems prudent to pull it into a separate class, outside of the ReadThroughCache so it can be tested independently.
-rw-r--r--src/mongo/base/error_codes.yml1
-rw-r--r--src/mongo/db/auth/authorization_manager_impl.cpp13
-rw-r--r--src/mongo/db/auth/authorization_manager_impl.h4
-rw-r--r--src/mongo/db/read_write_concern_defaults.cpp11
-rw-r--r--src/mongo/util/invalidating_lru_cache.h3
-rw-r--r--src/mongo/util/invalidating_lru_cache_test.cpp8
-rw-r--r--src/mongo/util/net/ssl_manager_openssl.cpp11
-rw-r--r--src/mongo/util/read_through_cache.cpp22
-rw-r--r--src/mongo/util/read_through_cache.h268
-rw-r--r--src/mongo/util/read_through_cache_test.cpp111
10 files changed, 260 insertions, 192 deletions
diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml
index 3df82a37c51..29de011915d 100644
--- a/src/mongo/base/error_codes.yml
+++ b/src/mongo/base/error_codes.yml
@@ -352,7 +352,6 @@ error_codes:
# Internal only codes used by the ReadThroughCache and must never be returned in a network
# response
- - {code: 305,name: ReadThroughCacheKeyNotFound}
- {code: 306,name: ReadThroughCacheLookupCanceled}
- {code: 307,name: RangeDeletionAbandonedBecauseTaskDocumentDoesNotExist}
diff --git a/src/mongo/db/auth/authorization_manager_impl.cpp b/src/mongo/db/auth/authorization_manager_impl.cpp
index e3552ae7332..5f6e9ab1380 100644
--- a/src/mongo/db/auth/authorization_manager_impl.cpp
+++ b/src/mongo/db/auth/authorization_manager_impl.cpp
@@ -663,14 +663,14 @@ AuthorizationManagerImpl::AuthSchemaVersionCache::AuthSchemaVersionCache(
1 /* cacheSize */),
_externalState(externalState) {}
-boost::optional<int> AuthorizationManagerImpl::AuthSchemaVersionCache::_lookup(
- OperationContext* opCtx, int unusedKey) {
+AuthorizationManagerImpl::AuthSchemaVersionCache::LookupResult
+AuthorizationManagerImpl::AuthSchemaVersionCache::_lookup(OperationContext* opCtx, int unusedKey) {
invariant(unusedKey == 0);
int authzVersion;
uassertStatusOK(_externalState->getStoredAuthorizationVersion(opCtx, &authzVersion));
- return authzVersion;
+ return LookupResult(authzVersion);
}
AuthorizationManagerImpl::UserCacheImpl::UserCacheImpl(
@@ -689,8 +689,9 @@ AuthorizationManagerImpl::UserCacheImpl::UserCacheImpl(
_authSchemaVersionCache(authSchemaVersionCache),
_externalState(externalState) {}
-boost::optional<User> AuthorizationManagerImpl::UserCacheImpl::_lookup(OperationContext* opCtx,
- const UserRequest& userReq) {
+AuthorizationManagerImpl::UserCacheImpl::LookupResult
+AuthorizationManagerImpl::UserCacheImpl::_lookup(OperationContext* opCtx,
+ const UserRequest& userReq) {
LOGV2_DEBUG(20238, 1, "Getting user record", "user"_attr = userReq.name);
// Number of times to retry a user document that fetches due to transient AuthSchemaIncompatible
@@ -713,7 +714,7 @@ boost::optional<User> AuthorizationManagerImpl::UserCacheImpl::_lookup(Operation
User user(userReq.name);
uassertStatusOK(initializeUserFromPrivilegeDocument(&user, userObj));
- return user;
+ return LookupResult(std::move(user));
}
case schemaVersion24:
_authSchemaVersionCache->invalidateAll();
diff --git a/src/mongo/db/auth/authorization_manager_impl.h b/src/mongo/db/auth/authorization_manager_impl.h
index 17a3d731f1f..611aa427662 100644
--- a/src/mongo/db/auth/authorization_manager_impl.h
+++ b/src/mongo/db/auth/authorization_manager_impl.h
@@ -158,7 +158,7 @@ private:
// Even though the dist cache permits for lookup to return boost::none for non-existent
// values, the contract of the authorization manager is that it should throw an exception if
// the value can not be loaded, so if it returns, the value will always be set.
- boost::optional<int> _lookup(OperationContext* opCtx, int unusedKey);
+ LookupResult _lookup(OperationContext* opCtx, int unusedKey);
Mutex _mutex =
MONGO_MAKE_LATCH("AuthorizationManagerImpl::AuthSchemaVersionDistCache::_mutex");
@@ -181,7 +181,7 @@ private:
// Even though the dist cache permits for lookup to return boost::none for non-existent
// values, the contract of the authorization manager is that it should throw an exception if
// the value can not be loaded, so if it returns, the value will always be set.
- boost::optional<User> _lookup(OperationContext* opCtx, const UserRequest& user);
+ LookupResult _lookup(OperationContext* opCtx, const UserRequest& user);
Mutex _mutex = MONGO_MAKE_LATCH("AuthorizationManagerImpl::UserDistCacheImpl::_mutex");
diff --git a/src/mongo/db/read_write_concern_defaults.cpp b/src/mongo/db/read_write_concern_defaults.cpp
index f33c78215a3..3c69751348a 100644
--- a/src/mongo/db/read_write_concern_defaults.cpp
+++ b/src/mongo/db/read_write_concern_defaults.cpp
@@ -237,11 +237,12 @@ ReadWriteConcernDefaults::~ReadWriteConcernDefaults() = default;
ReadWriteConcernDefaults::Cache::Cache(ServiceContext* service,
ThreadPoolInterface& threadPool,
FetchDefaultsFn fetchDefaultsFn)
- : ReadThroughCache(_mutex,
- service,
- threadPool,
- [this](OperationContext* opCtx, Type) { return lookup(opCtx); },
- 1 /* cacheSize */),
+ : ReadThroughCache(
+ _mutex,
+ service,
+ threadPool,
+ [this](OperationContext* opCtx, Type) { return LookupResult(lookup(opCtx)); },
+ 1 /* cacheSize */),
_fetchDefaultsFn(std::move(fetchDefaultsFn)) {}
boost::optional<RWConcernDefault> ReadWriteConcernDefaults::Cache::lookup(OperationContext* opCtx) {
diff --git a/src/mongo/util/invalidating_lru_cache.h b/src/mongo/util/invalidating_lru_cache.h
index 8c80185f3f8..e1dced30e31 100644
--- a/src/mongo/util/invalidating_lru_cache.h
+++ b/src/mongo/util/invalidating_lru_cache.h
@@ -119,9 +119,6 @@ class InvalidatingLRUCache {
using Cache = LRUCache<Key, std::shared_ptr<StoredValue>>;
public:
- using key_type = typename Cache::key_type;
- using mapped_type = typename Cache::mapped_type;
-
/**
* The 'cacheSize' parameter specifies the maximum size of the cache before the least recently
* used entries start getting evicted. It is allowed to be zero, in which case no entries will
diff --git a/src/mongo/util/invalidating_lru_cache_test.cpp b/src/mongo/util/invalidating_lru_cache_test.cpp
index eea916ab4df..939892f0c1e 100644
--- a/src/mongo/util/invalidating_lru_cache_test.cpp
+++ b/src/mongo/util/invalidating_lru_cache_test.cpp
@@ -47,8 +47,6 @@ namespace {
struct TestValue {
TestValue(std::string in_value) : value(std::move(in_value)) {}
TestValue(TestValue&&) = default;
- TestValue(const TestValue&) = delete;
- TestValue& operator=(const TestValue&) = delete;
std::string value;
};
@@ -56,6 +54,12 @@ struct TestValue {
using TestValueCache = InvalidatingLRUCache<int, TestValue>;
using TestValueHandle = TestValueCache::ValueHandle;
+TEST(InvalidatingLRUCacheTest, StandaloneValueHandle) {
+ TestValueHandle standaloneHandle({"Standalone value"});
+ ASSERT(standaloneHandle.isValid());
+ ASSERT_EQ("Standalone value", standaloneHandle->value);
+}
+
TEST(InvalidatingLRUCacheTest, ValueHandleOperators) {
TestValueCache cache(1);
cache.insertOrAssign(100, {"Test value"});
diff --git a/src/mongo/util/net/ssl_manager_openssl.cpp b/src/mongo/util/net/ssl_manager_openssl.cpp
index 3d4274bc6e3..ae9e68a1c23 100644
--- a/src/mongo/util/net/ssl_manager_openssl.cpp
+++ b/src/mongo/util/net/ssl_manager_openssl.cpp
@@ -950,18 +950,17 @@ public:
}
private:
- static boost::optional<OCSPFetchResponse> _lookup(OperationContext* opCtx,
- const OCSPCacheKey& key) {
+ static LookupResult _lookup(OperationContext* opCtx, const OCSPCacheKey& key) {
// If there is a CRL file, we expect the CRL file to cover the certificate status
// information, and therefore we don't need to make a roundtrip.
if (!getSSLGlobalParams().sslCRLFile.empty()) {
- return boost::none;
+ return LookupResult(boost::none);
}
auto swOCSPContext =
extractOcspUris(key.context, key.peerCert.get(), key.intermediateCerts.get());
if (!swOCSPContext.isOK()) {
- return boost::none;
+ return LookupResult(boost::none);
}
auto ocspContext = std::move(swOCSPContext.getValue());
@@ -971,10 +970,10 @@ private:
key.context, key.intermediateCerts, ocspContext, OCSPPurpose::kClientVerify)
.getNoThrow();
if (!swResponse.isOK()) {
- return boost::none;
+ return LookupResult(boost::none);
}
- return std::move(swResponse.getValue());
+ return LookupResult(std::move(swResponse.getValue()));
}
static const ServiceContext::Decoration<boost::optional<OCSPCache>> getOCSPCache;
diff --git a/src/mongo/util/read_through_cache.cpp b/src/mongo/util/read_through_cache.cpp
index 4014647a13b..621c220b451 100644
--- a/src/mongo/util/read_through_cache.cpp
+++ b/src/mongo/util/read_through_cache.cpp
@@ -31,7 +31,6 @@
#include "mongo/util/read_through_cache.h"
-#include "mongo/db/operation_context.h"
#include "mongo/stdx/condition_variable.h"
namespace mongo {
@@ -70,19 +69,13 @@ void ReadThroughCacheBase::CancelToken::tryCancel() {
}
}
-ReadThroughCacheBase::CancelToken ReadThroughCacheBase::_asyncWork(WorkWithOpContext work) {
+ReadThroughCacheBase::CancelToken ReadThroughCacheBase::_asyncWork(
+ WorkWithOpContext work) noexcept {
auto taskInfo = std::make_shared<CancelToken::TaskInfo>(_serviceContext, _cancelTokenMutex);
- // This is workaround for the fact that the ThreadPool can execute inline. This variable is
- // local to the function and will only be accessed if the call to 'schedule' below executes
- // inline, therefore there is no need for synchronisation around it.
- boost::optional<Status> inlineExecutionStatus;
-
- _threadPool.schedule([work = std::move(work),
- taskInfo,
- inlineExecutionStatus = &inlineExecutionStatus](Status status) mutable {
+ _threadPool.schedule([work = std::move(work), taskInfo](Status status) mutable {
if (!status.isOK()) {
- inlineExecutionStatus->emplace(std::move(status));
+ work(nullptr, status);
return;
}
@@ -103,10 +96,11 @@ ReadThroughCacheBase::CancelToken ReadThroughCacheBase::_asyncWork(WorkWithOpCon
work(taskInfo->opCtxToCancel, cancelStatusAtTaskBegin);
});
- if (inlineExecutionStatus)
- uassertStatusOK(*inlineExecutionStatus);
-
return CancelToken(std::move(taskInfo));
}
+Date_t ReadThroughCacheBase::_now() {
+ return _serviceContext->getFastClockSource()->now();
+}
+
} // namespace mongo
diff --git a/src/mongo/util/read_through_cache.h b/src/mongo/util/read_through_cache.h
index a8ab84ae19f..29365a6b72c 100644
--- a/src/mongo/util/read_through_cache.h
+++ b/src/mongo/util/read_through_cache.h
@@ -54,11 +54,10 @@ protected:
virtual ~ReadThroughCacheBase();
/**
- * This method is an extension of ThreadPoolInterface::schedule, which in addition creates a
- * client and an operation context and executes the specified 'work' under that environment. The
- * difference is that instead of passing a status to 'work' in order to indicate an in-line
- * execution, the function will throw without actually calling 'work' (see 'schedule' for more
- * details on in-line execution).
+ * This method is an extension of ThreadPoolInterface::schedule, with the following additions:
+ * - Creates a client and an operation context and executes the specified 'work' under that
+ * environment
+ * - Returns a CancelToken, which can be used to attempt to cancel 'work'
*
* If the task manages to get canceled before it is executed (through a call to tryCancel),
* 'work' will be invoked out-of-line with a non-OK status, set to error code
@@ -77,7 +76,9 @@ protected:
std::shared_ptr<TaskInfo> _info;
};
using WorkWithOpContext = unique_function<void(OperationContext*, const Status&)>;
- CancelToken _asyncWork(WorkWithOpContext work);
+ CancelToken _asyncWork(WorkWithOpContext work) noexcept;
+
+ Date_t _now();
// Service context under which this cache has been instantiated (used for access to service-wide
// functionality, such as client/operation context creation)
@@ -180,7 +181,19 @@ public:
* The implementation must throw a uassertion to indicate an error while looking up the value,
* return boost::none if the key is not found, or return an actual value.
*/
- using LookupFn = unique_function<boost::optional<Value>(OperationContext*, const Key&)>;
+ struct LookupResult {
+ explicit LookupResult(boost::optional<Value>&& v) : v(std::move(v)) {}
+ LookupResult(LookupResult&&) = default;
+ LookupResult& operator=(LookupResult&&) = default;
+
+ // If boost::none, it means the '_lookupFn' did not find the key in the store
+ boost::optional<Value> v;
+ };
+ using LookupFn = unique_function<LookupResult(OperationContext*, const Key&)>;
+
+ // Exposed publicly so it can be unit-tested indepedently of the usages in this class. Must not
+ // be used independently.
+ class InProgressLookup;
/**
* If 'key' is found in the cache, returns a set ValueHandle (its operator bool will be true).
@@ -195,8 +208,8 @@ public:
* meaning that subsequent calls to 'acquireAsync' will kick-off 'lookup' again.
*
* NOTES:
- * The returned value may be invalid by the time the caller gets access to it if invalidate is
- * called for 'key'.
+ * The returned value may be invalid by the time the caller gets to access it, if 'invalidate'
+ * is called for 'key'.
*/
SharedSemiFuture<ValueHandle> acquireAsync(const Key& key) {
// Fast path
@@ -211,54 +224,18 @@ public:
// Join an in-progress lookup if one has already been scheduled
if (auto it = _inProgressLookups.find(key); it != _inProgressLookups.end())
- return it->second->sharedPromise.getFuture();
+ return it->second->addWaiter(ul);
- // Schedule an asynchronous lookup for the key and then loop around and wait for it to
- // complete
+ // Schedule an asynchronous lookup for the key
+ auto [it, emplaced] =
+ _inProgressLookups.emplace(key, std::make_unique<InProgressLookup>(*this, key));
+ invariant(emplaced);
+ auto& inProgressLookup = *it->second;
+ auto sharedFutureToReturn = inProgressLookup.addWaiter(ul);
- auto [kickOffAsyncLookupPromise, f] = makePromiseFuture<void>();
-
- auto emplaceResult =
- _inProgressLookups.emplace(key, std::make_unique<InProgressLookup>(key));
- invariant(emplaceResult.second /* emplaced */);
- auto& inProgressLookup = *emplaceResult.first->second;
- auto sharedFutureToReturn = inProgressLookup.sharedPromise.getFuture();
ul.unlock();
- // Construct the future chain before scheduling the async work so it doesn't execute inline
- // if it so happens that the async work completes by the time the future is constructed, or
- // if it executes inline due to the task executor being shut down.
- std::move(f)
- .then([this, &inProgressLookup] {
- stdx::unique_lock ul(_mutex);
- return _asyncLookupWhileInvalidated(std::move(ul), inProgressLookup);
- })
- .getAsync([this, key](StatusWith<Value> swValue) {
- stdx::unique_lock ul(_mutex);
- auto it = _inProgressLookups.find(key);
- invariant(it != _inProgressLookups.end());
- auto inProgressLookup = std::move(it->second);
- _inProgressLookups.erase(it);
-
- StatusWith<ValueHandle> swValueHandle(ErrorCodes::InternalError,
- "ReadThroughCache");
- if (swValue.isOK()) {
- swValueHandle = ValueHandle(_cache.insertOrAssignAndGet(
- key,
- {std::move(swValue.getValue()),
- _serviceContext->getFastClockSource()->now()}));
- } else if (swValue == ErrorCodes::ReadThroughCacheKeyNotFound) {
- swValueHandle = ValueHandle();
- } else {
- swValueHandle = swValue.getStatus();
- }
-
- ul.unlock();
-
- inProgressLookup->sharedPromise.setFromStatusWith(std::move(swValueHandle));
- });
-
- kickOffAsyncLookupPromise.emplaceValue();
+ _doLookupWhileNotValid(key, Status(ErrorCodes::Error(461540), "")).getAsync([](auto) {});
return sharedFutureToReturn;
}
@@ -279,7 +256,7 @@ public:
ValueHandle insertOrAssignAndGet(const Key& key, Value&& newValue, Date_t updateWallClockTime) {
stdx::lock_guard lg(_mutex);
if (auto it = _inProgressLookups.find(key); it != _inProgressLookups.end())
- it->second->invalidate(lg);
+ it->second->invalidateAndCancelCurrentLookupRound(lg);
return _cache.insertOrAssignAndGet(key, {std::move(newValue), updateWallClockTime});
}
@@ -296,7 +273,7 @@ public:
void invalidate(const Key& key) {
stdx::lock_guard lg(_mutex);
if (auto it = _inProgressLookups.find(key); it != _inProgressLookups.end())
- it->second->invalidate(lg);
+ it->second->invalidateAndCancelCurrentLookupRound(lg);
_cache.invalidate(key);
}
@@ -305,7 +282,7 @@ public:
stdx::lock_guard lg(_mutex);
for (auto& entry : _inProgressLookups) {
if (predicate(entry.first))
- entry.second->invalidate(lg);
+ entry.second->invalidateAndCancelCurrentLookupRound(lg);
}
_cache.invalidateIf([&](const Key& key, const StoredValue*) { return predicate(key); });
}
@@ -354,86 +331,42 @@ protected:
}
private:
- // Refer to the comments on '_asyncLookupWhileInvalidated' for more detail on how this structure
- // is used.
- struct InProgressLookup {
- InProgressLookup(Key key) : key(std::move(key)) {}
-
- void invalidate(WithLock) {
- invalidated = true;
- if (cancelToken)
- cancelToken->tryCancel();
- }
-
- Key key;
- SharedPromise<ValueHandle> sharedPromise;
-
- bool invalidated;
- boost::optional<CancelToken> cancelToken;
- };
using InProgressLookupsMap = stdx::unordered_map<Key, std::unique_ptr<InProgressLookup>>;
/**
- * This method is expected to be called with a constructed InProgressLookup object, emplaced on
- * '_inProgressLookups' (represented by the 'inProgressLookup' argument). It implements an
- * asynchronous "while (invalidated)" loop over the in-progress key referenced by
- * 'inProgressLookup', which *must* be kept valid by the caller until the returned Future
- * completes.
- *
- * The returned Future will be complete when that loop exists and will contain the latest value
- * (or error) returned by 'lookup'.
- *
- * If thought of sequentially, the loop looks like this:
- *
- * while (true) {
- * inProgressLookup.invalidated = false;
- * inProgressLookup.cancelToken.reset();
- * valueOrError = lookup(key);
- * if (!inProgressLookup.invalidated)
- * return valueOrError; // signals the future
- * }
+ * This method implements an asynchronous "while (!valid)" loop over 'key', which must be on the
+ * in-progress map.
*/
- Future<Value> _asyncLookupWhileInvalidated(stdx::unique_lock<Mutex> ul,
- InProgressLookup& inProgressLookup) noexcept {
- auto [promise, f] = makePromiseFuture<Value>();
- auto p = std::make_shared<Promise<Value>>(std::move(promise));
-
- // Construct the future chain before scheduling the async work so it doesn't execute inline
- auto future =
- std::move(f).onCompletion([this, &inProgressLookup](StatusWith<Value> swValue) {
- stdx::unique_lock ul(_mutex);
- if (!inProgressLookup.invalidated)
- return Future<Value>::makeReady(uassertStatusOK(std::move(swValue)));
-
- inProgressLookup.cancelToken.reset();
- return _asyncLookupWhileInvalidated(std::move(ul), inProgressLookup);
- });
+ Future<LookupResult> _doLookupWhileNotValid(Key key, StatusWith<LookupResult> sw) {
+ stdx::unique_lock ul(_mutex);
+ auto it = _inProgressLookups.find(key);
+ invariant(it != _inProgressLookups.end());
+ if (!ErrorCodes::isCancelationError(sw.getStatus()) && !it->second->valid(ul)) {
+ ul.unlock(); // asyncLookupRound also acquires the mutex
+ return it->second->asyncLookupRound().onCompletion(
+ [this, key](auto sw) { return _doLookupWhileNotValid(key, std::move(sw)); });
+ }
- invariant(!inProgressLookup.cancelToken);
- inProgressLookup.invalidated = false;
- try {
- inProgressLookup.cancelToken.emplace(_asyncWork([ this, p, &inProgressLookup ](
- OperationContext * opCtx, const Status& status) mutable noexcept {
- p->setWith([&]() mutable {
- uassertStatusOK(status);
- auto value = _lookupFn(opCtx, inProgressLookup.key);
- uassert(ErrorCodes::ReadThroughCacheKeyNotFound,
- "Internal only: key not found",
- value);
- return std::move(*value);
- });
- }));
- } catch (const ExceptionForCat<ErrorCategory::CancelationError>& ex) {
- // The thread pool is being shut down, so this is an inline execution
- invariant(!inProgressLookup.invalidated);
- invariant(!inProgressLookup.cancelToken);
-
- ul.unlock();
- p->setError(ex.toStatus());
+ // The detachment of the currently active lookup and the placement of the result on the
+ // '_cache' has to be atomic with respect to a concurrent call to 'invalidate'
+ auto inProgressLookup(std::move(it->second));
+ _inProgressLookups.erase(it);
+
+ StatusWith<ValueHandle> swValueHandle(ErrorCodes::Error(461541), "");
+ if (sw.isOK()) {
+ auto result = std::move(sw.getValue());
+ swValueHandle = result.v
+ ? ValueHandle(_cache.insertOrAssignAndGet(key, {std::move(*result.v), _now()}))
+ : ValueHandle();
+ } else {
+ swValueHandle = sw.getStatus();
}
+ ul.unlock();
- return std::move(future);
- };
+ inProgressLookup->signalWaiters(std::move(swValueHandle));
+
+ return Future<LookupResult>::makeReady(Status(ErrorCodes::Error(461542), ""));
+ }
// Blocking function which will be invoked to retrieve entries from the backing store
const LookupFn _lookupFn;
@@ -454,4 +387,81 @@ private:
InProgressLookupsMap _inProgressLookups;
};
+/**
+ * This class represents an in-progress lookup for a specific key and implements the guarantees of
+ * the invalidation logic as described in the comments of 'ReadThroughCache::invalidate'.
+ *
+ * It is intended to be used in conjunction with the 'ReadThroughCache', which operates on it under
+ * its '_mutex' and ensures there is always at most a single active instance at a time active for
+ * each 'key'.
+ *
+ * The methods of this class are not thread-safe, unless indicated in the comments.
+ *
+ * Its lifecycle is intended to be like this:
+ *
+ * inProgressLookups.emplace(inProgress);
+ * while (true) {
+ * result = inProgress.asyncLookupRound();
+ * if (!inProgress.valid()) {
+ * continue;
+ * }
+ *
+ * inProgressLookups.remove(inProgress)
+ * cachedValues.insert(result);
+ * inProgress.signalWaiters(result);
+ * }
+ */
+template <typename Key, typename Value>
+class ReadThroughCache<Key, Value>::InProgressLookup {
+public:
+ InProgressLookup(ReadThroughCache& cache, Key key) : _cache(cache), _key(std::move(key)) {}
+
+ Future<LookupResult> asyncLookupRound() {
+ auto [promise, future] = makePromiseFuture<LookupResult>();
+
+ stdx::lock_guard lg(_cache._mutex);
+ _valid = true;
+ _cancelToken.emplace(_cache._asyncWork([ this, promise = std::move(promise) ](
+ OperationContext * opCtx, const Status& status) mutable noexcept {
+ promise.setWith([&] {
+ uassertStatusOK(status);
+ return _cache._lookupFn(opCtx, _key);
+ });
+ }));
+
+ return std::move(future);
+ }
+
+ SharedSemiFuture<ValueHandle> addWaiter(WithLock) {
+ return _sharedPromise.getFuture();
+ }
+
+ bool valid(WithLock) const {
+ return _valid;
+ }
+
+ void invalidateAndCancelCurrentLookupRound(WithLock) {
+ _valid = false;
+ if (_cancelToken)
+ _cancelToken->tryCancel();
+ }
+
+ void signalWaiters(StatusWith<ValueHandle> swValueHandle) {
+ invariant(_valid);
+ _sharedPromise.setFromStatusWith(std::move(swValueHandle));
+ }
+
+private:
+ // The owning cache, from which mutex, lookupFn, async task scheduling, etc. will be used. It is
+ // the responsibility of the owning cache to join all outstanding lookups at destruction time.
+ ReadThroughCache& _cache;
+
+ const Key _key;
+
+ bool _valid{false};
+ boost::optional<CancelToken> _cancelToken;
+
+ SharedPromise<ValueHandle> _sharedPromise;
+};
+
} // namespace mongo
diff --git a/src/mongo/util/read_through_cache_test.cpp b/src/mongo/util/read_through_cache_test.cpp
index 169606cfbdb..d196290bef8 100644
--- a/src/mongo/util/read_through_cache_test.cpp
+++ b/src/mongo/util/read_through_cache_test.cpp
@@ -41,10 +41,14 @@
namespace mongo {
namespace {
+using unittest::assertGet;
+
+// The structure for testing is intentionally made movable, but non-copyable
struct CachedValue {
CachedValue(int counter) : counter(counter) {}
CachedValue(CachedValue&&) = default;
CachedValue& operator=(CachedValue&&) = default;
+
int counter;
};
@@ -86,14 +90,19 @@ protected:
OperationContext* const _opCtx{_opCtxHolder.get()};
};
+TEST(ReadThroughCacheTest, StandaloneValueHandle) {
+ Cache::ValueHandle standaloneHandle(CachedValue(100));
+ ASSERT(standaloneHandle.isValid());
+ ASSERT_EQ(100, standaloneHandle->counter);
+}
+
TEST_F(ReadThroughCacheTest, FetchInvalidateAndRefetch) {
int countLookups = 0;
CacheWithThreadPool cache(
getServiceContext(), 1, [&](OperationContext*, const std::string& key) {
ASSERT_EQ("TestKey", key);
countLookups++;
-
- return CachedValue{100 * countLookups};
+ return Cache::LookupResult(CachedValue(100 * countLookups));
});
for (int i = 1; i <= 3; i++) {
@@ -109,14 +118,24 @@ TEST_F(ReadThroughCacheTest, FetchInvalidateAndRefetch) {
}
}
+TEST_F(ReadThroughCacheTest, FailedLookup) {
+ CacheWithThreadPool cache(
+ getServiceContext(),
+ 1,
+ [&](OperationContext*, const std::string& key) -> Cache::LookupResult {
+ uasserted(ErrorCodes::InternalError, "Test error");
+ });
+
+ ASSERT_THROWS_CODE(cache.acquire(_opCtx, "TestKey"), DBException, ErrorCodes::InternalError);
+}
+
TEST_F(ReadThroughCacheTest, CacheSizeZero) {
int countLookups = 0;
CacheWithThreadPool cache(
getServiceContext(), 0, [&](OperationContext*, const std::string& key) {
ASSERT_EQ("TestKey", key);
countLookups++;
-
- return CachedValue{100 * countLookups};
+ return Cache::LookupResult(CachedValue(100 * countLookups));
});
for (int i = 1; i <= 3; i++) {
@@ -133,8 +152,7 @@ TEST_F(ReadThroughCacheTest, InvalidateCacheSizeZeroReissuesLookup) {
getServiceContext(), 0, [&](OperationContext*, const std::string& key) {
ASSERT_EQ("TestKey", key);
countLookups++;
-
- return CachedValue{1000 * countLookups};
+ return Cache::LookupResult(CachedValue(1000 * countLookups));
});
auto value = cache.acquire(_opCtx, "TestKey");
@@ -158,7 +176,7 @@ TEST_F(ReadThroughCacheTest, KeyDoesNotExist) {
CacheWithThreadPool cache(
getServiceContext(), 1, [&](OperationContext*, const std::string& key) {
ASSERT_EQ("TestKey", key);
- return boost::none;
+ return Cache::LookupResult(boost::none);
});
ASSERT(!cache.acquire(_opCtx, "TestKey"));
@@ -167,12 +185,58 @@ TEST_F(ReadThroughCacheTest, KeyDoesNotExist) {
/**
* Fixture for tests, which need to control the creation/destruction of their operation contexts.
*/
-class ReadThroughCacheTestAsync : public unittest::Test,
+class ReadThroughCacheAsyncTest : public unittest::Test,
public ScopedGlobalServiceContextForTest {};
using Barrier = unittest::Barrier;
-TEST_F(ReadThroughCacheTestAsync, AcquireObservesOperationContextDeadline) {
+TEST_F(ReadThroughCacheAsyncTest, SuccessfulInProgressLookup) {
+ ThreadPool threadPool{ThreadPool::Options()};
+ threadPool.startup();
+
+ Cache cache(getServiceContext(), threadPool, 1, [&](OperationContext*, const std::string& key) {
+ return Cache::LookupResult(CachedValue(500));
+ });
+
+ Cache::InProgressLookup inProgress(cache, "TestKey");
+ auto future = inProgress.addWaiter(WithLock::withoutLock());
+ ASSERT(!future.isReady());
+
+ auto optVal = inProgress.asyncLookupRound().get();
+ ASSERT(inProgress.valid(WithLock::withoutLock()));
+ ASSERT(optVal.v);
+ ASSERT_EQ(500, optVal.v->counter);
+ inProgress.signalWaiters(Cache::ValueHandle(std::move(*optVal.v)));
+
+ ASSERT(future.isReady());
+ ASSERT_EQ(500, future.get()->counter);
+}
+
+TEST_F(ReadThroughCacheAsyncTest, FailedInProgressLookup) {
+ ThreadPool threadPool{ThreadPool::Options()};
+ threadPool.startup();
+
+ Cache cache(getServiceContext(),
+ threadPool,
+ 1,
+ [&](OperationContext*, const std::string& key) -> Cache::LookupResult {
+ uasserted(ErrorCodes::InternalError, "Test error");
+ });
+
+ Cache::InProgressLookup inProgress(cache, "TestKey");
+ auto future = inProgress.addWaiter(WithLock::withoutLock());
+ ASSERT(!future.isReady());
+
+ auto asyncLookupResult = inProgress.asyncLookupRound().getNoThrow();
+ ASSERT_THROWS_CODE(inProgress.asyncLookupRound().get(), DBException, ErrorCodes::InternalError);
+ ASSERT(inProgress.valid(WithLock::withoutLock()));
+ inProgress.signalWaiters(asyncLookupResult.getStatus());
+
+ ASSERT(future.isReady());
+ ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::InternalError);
+}
+
+TEST_F(ReadThroughCacheAsyncTest, AcquireObservesOperationContextDeadline) {
ThreadPool threadPool{ThreadPool::Options()};
threadPool.startup();
@@ -181,7 +245,7 @@ TEST_F(ReadThroughCacheTestAsync, AcquireObservesOperationContextDeadline) {
Cache cache(getServiceContext(), threadPool, 1, [&](OperationContext*, const std::string& key) {
lookupStartedBarrier.countDownAndWait();
completeLookupBarrier.countDownAndWait();
- return CachedValue(5);
+ return Cache::LookupResult(CachedValue(5));
});
{
@@ -220,7 +284,7 @@ TEST_F(ReadThroughCacheTestAsync, AcquireObservesOperationContextDeadline) {
}
}
-TEST_F(ReadThroughCacheTestAsync, InvalidateReissuesLookup) {
+TEST_F(ReadThroughCacheAsyncTest, InvalidateReissuesLookup) {
ThreadPool threadPool{ThreadPool::Options()};
threadPool.startup();
@@ -232,41 +296,40 @@ TEST_F(ReadThroughCacheTestAsync, InvalidateReissuesLookup) {
int idx = countLookups.fetchAndAdd(1);
lookupStartedBarriers[idx].countDownAndWait();
completeLookupBarriers[idx].countDownAndWait();
- return CachedValue(idx);
+ return Cache::LookupResult(CachedValue(idx));
});
// Kick off the first lookup, which will block
auto future = cache.acquireAsync("TestKey");
ASSERT(!future.isReady());
- // Invalidate the first lookup attempt while it is still blocked
+ // Wait for the first lookup attempt to start and invalidate it before letting it proceed
lookupStartedBarriers[0].countDownAndWait();
ASSERT_EQ(1, countLookups.load());
cache.invalidate("TestKey");
ASSERT(!future.isReady());
-
- completeLookupBarriers[0].countDownAndWait();
+ completeLookupBarriers[0].countDownAndWait(); // Lets lookup attempt 1 proceed
ASSERT(!future.isReady());
- // Invalidate the second lookup attempt while it is still blocked
+ // Wait for the second lookup attempt to start and invalidate it before letting it proceed
lookupStartedBarriers[1].countDownAndWait();
ASSERT_EQ(2, countLookups.load());
cache.invalidate("TestKey");
ASSERT(!future.isReady());
-
- completeLookupBarriers[1].countDownAndWait();
+ completeLookupBarriers[1].countDownAndWait(); // Lets lookup attempt 2 proceed
ASSERT(!future.isReady());
- // Do not invalidate the third lookup and make sure it returns the correct value
+ // Wait for the third lookup attempt to start, but not do not invalidate it before letting it
+ // proceed
lookupStartedBarriers[2].countDownAndWait();
ASSERT_EQ(3, countLookups.load());
ASSERT(!future.isReady());
+ completeLookupBarriers[2].countDownAndWait(); // Lets lookup attempt 3 proceed
- completeLookupBarriers[2].countDownAndWait();
ASSERT_EQ(2, future.get()->counter);
}
-TEST_F(ReadThroughCacheTestAsync, AcquireWithAShutdownThreadPool) {
+TEST_F(ReadThroughCacheAsyncTest, AcquireWithAShutdownThreadPool) {
ThreadPool threadPool{ThreadPool::Options()};
threadPool.startup();
threadPool.shutdown();
@@ -274,14 +337,14 @@ TEST_F(ReadThroughCacheTestAsync, AcquireWithAShutdownThreadPool) {
Cache cache(getServiceContext(), threadPool, 1, [&](OperationContext*, const std::string&) {
FAIL("Should not be called");
- return CachedValue(0); // Will never be reached
+ return Cache::LookupResult(CachedValue(0)); // Will never be reached
});
auto future = cache.acquireAsync("TestKey");
ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::ShutdownInProgress);
}
-TEST_F(ReadThroughCacheTestAsync, InvalidateCalledBeforeLookupTaskExecutes) {
+TEST_F(ReadThroughCacheAsyncTest, InvalidateCalledBeforeLookupTaskExecutes) {
struct MockThreadPool : public ThreadPoolInterface {
void startup() override {}
void shutdown() override {}
@@ -300,7 +363,7 @@ TEST_F(ReadThroughCacheTestAsync, InvalidateCalledBeforeLookupTaskExecutes) {
} threadPool;
Cache cache(getServiceContext(), threadPool, 1, [&](OperationContext*, const std::string&) {
- return CachedValue(123);
+ return Cache::LookupResult(CachedValue(123));
});
auto future = cache.acquireAsync("TestKey");