diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2020-05-10 06:48:54 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-05-17 15:16:51 +0000 |
commit | 7e2111ef33fc40959a254bd3109466176ae60718 (patch) | |
tree | 2a31ac8ddccccb24784b161839fa1ca92aeb10bf | |
parent | a7f769dd597e33e988832c43c99912c1d3139c9b (diff) | |
download | mongo-7e2111ef33fc40959a254bd3109466176ae60718.tar.gz |
SERVER-46154 Pull the InProgressLookup outside of ReadThroughCache
The InProgressLookup tracking already has quite complicated logic, so it
seems prudent to pull it into a separate class, outside of the
ReadThroughCache so it can be tested independently.
-rw-r--r-- | src/mongo/base/error_codes.yml | 1 | ||||
-rw-r--r-- | src/mongo/db/auth/authorization_manager_impl.cpp | 13 | ||||
-rw-r--r-- | src/mongo/db/auth/authorization_manager_impl.h | 4 | ||||
-rw-r--r-- | src/mongo/db/read_write_concern_defaults.cpp | 11 | ||||
-rw-r--r-- | src/mongo/util/invalidating_lru_cache.h | 3 | ||||
-rw-r--r-- | src/mongo/util/invalidating_lru_cache_test.cpp | 8 | ||||
-rw-r--r-- | src/mongo/util/net/ssl_manager_openssl.cpp | 11 | ||||
-rw-r--r-- | src/mongo/util/read_through_cache.cpp | 22 | ||||
-rw-r--r-- | src/mongo/util/read_through_cache.h | 268 | ||||
-rw-r--r-- | src/mongo/util/read_through_cache_test.cpp | 111 |
10 files changed, 260 insertions, 192 deletions
diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml index 3df82a37c51..29de011915d 100644 --- a/src/mongo/base/error_codes.yml +++ b/src/mongo/base/error_codes.yml @@ -352,7 +352,6 @@ error_codes: # Internal only codes used by the ReadThroughCache and must never be returned in a network # response - - {code: 305,name: ReadThroughCacheKeyNotFound} - {code: 306,name: ReadThroughCacheLookupCanceled} - {code: 307,name: RangeDeletionAbandonedBecauseTaskDocumentDoesNotExist} diff --git a/src/mongo/db/auth/authorization_manager_impl.cpp b/src/mongo/db/auth/authorization_manager_impl.cpp index e3552ae7332..5f6e9ab1380 100644 --- a/src/mongo/db/auth/authorization_manager_impl.cpp +++ b/src/mongo/db/auth/authorization_manager_impl.cpp @@ -663,14 +663,14 @@ AuthorizationManagerImpl::AuthSchemaVersionCache::AuthSchemaVersionCache( 1 /* cacheSize */), _externalState(externalState) {} -boost::optional<int> AuthorizationManagerImpl::AuthSchemaVersionCache::_lookup( - OperationContext* opCtx, int unusedKey) { +AuthorizationManagerImpl::AuthSchemaVersionCache::LookupResult +AuthorizationManagerImpl::AuthSchemaVersionCache::_lookup(OperationContext* opCtx, int unusedKey) { invariant(unusedKey == 0); int authzVersion; uassertStatusOK(_externalState->getStoredAuthorizationVersion(opCtx, &authzVersion)); - return authzVersion; + return LookupResult(authzVersion); } AuthorizationManagerImpl::UserCacheImpl::UserCacheImpl( @@ -689,8 +689,9 @@ AuthorizationManagerImpl::UserCacheImpl::UserCacheImpl( _authSchemaVersionCache(authSchemaVersionCache), _externalState(externalState) {} -boost::optional<User> AuthorizationManagerImpl::UserCacheImpl::_lookup(OperationContext* opCtx, - const UserRequest& userReq) { +AuthorizationManagerImpl::UserCacheImpl::LookupResult +AuthorizationManagerImpl::UserCacheImpl::_lookup(OperationContext* opCtx, + const UserRequest& userReq) { LOGV2_DEBUG(20238, 1, "Getting user record", "user"_attr = userReq.name); // Number of times to retry a user document that fetches due to transient AuthSchemaIncompatible @@ -713,7 +714,7 @@ boost::optional<User> AuthorizationManagerImpl::UserCacheImpl::_lookup(Operation User user(userReq.name); uassertStatusOK(initializeUserFromPrivilegeDocument(&user, userObj)); - return user; + return LookupResult(std::move(user)); } case schemaVersion24: _authSchemaVersionCache->invalidateAll(); diff --git a/src/mongo/db/auth/authorization_manager_impl.h b/src/mongo/db/auth/authorization_manager_impl.h index 17a3d731f1f..611aa427662 100644 --- a/src/mongo/db/auth/authorization_manager_impl.h +++ b/src/mongo/db/auth/authorization_manager_impl.h @@ -158,7 +158,7 @@ private: // Even though the dist cache permits for lookup to return boost::none for non-existent // values, the contract of the authorization manager is that it should throw an exception if // the value can not be loaded, so if it returns, the value will always be set. - boost::optional<int> _lookup(OperationContext* opCtx, int unusedKey); + LookupResult _lookup(OperationContext* opCtx, int unusedKey); Mutex _mutex = MONGO_MAKE_LATCH("AuthorizationManagerImpl::AuthSchemaVersionDistCache::_mutex"); @@ -181,7 +181,7 @@ private: // Even though the dist cache permits for lookup to return boost::none for non-existent // values, the contract of the authorization manager is that it should throw an exception if // the value can not be loaded, so if it returns, the value will always be set. - boost::optional<User> _lookup(OperationContext* opCtx, const UserRequest& user); + LookupResult _lookup(OperationContext* opCtx, const UserRequest& user); Mutex _mutex = MONGO_MAKE_LATCH("AuthorizationManagerImpl::UserDistCacheImpl::_mutex"); diff --git a/src/mongo/db/read_write_concern_defaults.cpp b/src/mongo/db/read_write_concern_defaults.cpp index f33c78215a3..3c69751348a 100644 --- a/src/mongo/db/read_write_concern_defaults.cpp +++ b/src/mongo/db/read_write_concern_defaults.cpp @@ -237,11 +237,12 @@ ReadWriteConcernDefaults::~ReadWriteConcernDefaults() = default; ReadWriteConcernDefaults::Cache::Cache(ServiceContext* service, ThreadPoolInterface& threadPool, FetchDefaultsFn fetchDefaultsFn) - : ReadThroughCache(_mutex, - service, - threadPool, - [this](OperationContext* opCtx, Type) { return lookup(opCtx); }, - 1 /* cacheSize */), + : ReadThroughCache( + _mutex, + service, + threadPool, + [this](OperationContext* opCtx, Type) { return LookupResult(lookup(opCtx)); }, + 1 /* cacheSize */), _fetchDefaultsFn(std::move(fetchDefaultsFn)) {} boost::optional<RWConcernDefault> ReadWriteConcernDefaults::Cache::lookup(OperationContext* opCtx) { diff --git a/src/mongo/util/invalidating_lru_cache.h b/src/mongo/util/invalidating_lru_cache.h index 8c80185f3f8..e1dced30e31 100644 --- a/src/mongo/util/invalidating_lru_cache.h +++ b/src/mongo/util/invalidating_lru_cache.h @@ -119,9 +119,6 @@ class InvalidatingLRUCache { using Cache = LRUCache<Key, std::shared_ptr<StoredValue>>; public: - using key_type = typename Cache::key_type; - using mapped_type = typename Cache::mapped_type; - /** * The 'cacheSize' parameter specifies the maximum size of the cache before the least recently * used entries start getting evicted. It is allowed to be zero, in which case no entries will diff --git a/src/mongo/util/invalidating_lru_cache_test.cpp b/src/mongo/util/invalidating_lru_cache_test.cpp index eea916ab4df..939892f0c1e 100644 --- a/src/mongo/util/invalidating_lru_cache_test.cpp +++ b/src/mongo/util/invalidating_lru_cache_test.cpp @@ -47,8 +47,6 @@ namespace { struct TestValue { TestValue(std::string in_value) : value(std::move(in_value)) {} TestValue(TestValue&&) = default; - TestValue(const TestValue&) = delete; - TestValue& operator=(const TestValue&) = delete; std::string value; }; @@ -56,6 +54,12 @@ struct TestValue { using TestValueCache = InvalidatingLRUCache<int, TestValue>; using TestValueHandle = TestValueCache::ValueHandle; +TEST(InvalidatingLRUCacheTest, StandaloneValueHandle) { + TestValueHandle standaloneHandle({"Standalone value"}); + ASSERT(standaloneHandle.isValid()); + ASSERT_EQ("Standalone value", standaloneHandle->value); +} + TEST(InvalidatingLRUCacheTest, ValueHandleOperators) { TestValueCache cache(1); cache.insertOrAssign(100, {"Test value"}); diff --git a/src/mongo/util/net/ssl_manager_openssl.cpp b/src/mongo/util/net/ssl_manager_openssl.cpp index 3d4274bc6e3..ae9e68a1c23 100644 --- a/src/mongo/util/net/ssl_manager_openssl.cpp +++ b/src/mongo/util/net/ssl_manager_openssl.cpp @@ -950,18 +950,17 @@ public: } private: - static boost::optional<OCSPFetchResponse> _lookup(OperationContext* opCtx, - const OCSPCacheKey& key) { + static LookupResult _lookup(OperationContext* opCtx, const OCSPCacheKey& key) { // If there is a CRL file, we expect the CRL file to cover the certificate status // information, and therefore we don't need to make a roundtrip. if (!getSSLGlobalParams().sslCRLFile.empty()) { - return boost::none; + return LookupResult(boost::none); } auto swOCSPContext = extractOcspUris(key.context, key.peerCert.get(), key.intermediateCerts.get()); if (!swOCSPContext.isOK()) { - return boost::none; + return LookupResult(boost::none); } auto ocspContext = std::move(swOCSPContext.getValue()); @@ -971,10 +970,10 @@ private: key.context, key.intermediateCerts, ocspContext, OCSPPurpose::kClientVerify) .getNoThrow(); if (!swResponse.isOK()) { - return boost::none; + return LookupResult(boost::none); } - return std::move(swResponse.getValue()); + return LookupResult(std::move(swResponse.getValue())); } static const ServiceContext::Decoration<boost::optional<OCSPCache>> getOCSPCache; diff --git a/src/mongo/util/read_through_cache.cpp b/src/mongo/util/read_through_cache.cpp index 4014647a13b..621c220b451 100644 --- a/src/mongo/util/read_through_cache.cpp +++ b/src/mongo/util/read_through_cache.cpp @@ -31,7 +31,6 @@ #include "mongo/util/read_through_cache.h" -#include "mongo/db/operation_context.h" #include "mongo/stdx/condition_variable.h" namespace mongo { @@ -70,19 +69,13 @@ void ReadThroughCacheBase::CancelToken::tryCancel() { } } -ReadThroughCacheBase::CancelToken ReadThroughCacheBase::_asyncWork(WorkWithOpContext work) { +ReadThroughCacheBase::CancelToken ReadThroughCacheBase::_asyncWork( + WorkWithOpContext work) noexcept { auto taskInfo = std::make_shared<CancelToken::TaskInfo>(_serviceContext, _cancelTokenMutex); - // This is workaround for the fact that the ThreadPool can execute inline. This variable is - // local to the function and will only be accessed if the call to 'schedule' below executes - // inline, therefore there is no need for synchronisation around it. - boost::optional<Status> inlineExecutionStatus; - - _threadPool.schedule([work = std::move(work), - taskInfo, - inlineExecutionStatus = &inlineExecutionStatus](Status status) mutable { + _threadPool.schedule([work = std::move(work), taskInfo](Status status) mutable { if (!status.isOK()) { - inlineExecutionStatus->emplace(std::move(status)); + work(nullptr, status); return; } @@ -103,10 +96,11 @@ ReadThroughCacheBase::CancelToken ReadThroughCacheBase::_asyncWork(WorkWithOpCon work(taskInfo->opCtxToCancel, cancelStatusAtTaskBegin); }); - if (inlineExecutionStatus) - uassertStatusOK(*inlineExecutionStatus); - return CancelToken(std::move(taskInfo)); } +Date_t ReadThroughCacheBase::_now() { + return _serviceContext->getFastClockSource()->now(); +} + } // namespace mongo diff --git a/src/mongo/util/read_through_cache.h b/src/mongo/util/read_through_cache.h index a8ab84ae19f..29365a6b72c 100644 --- a/src/mongo/util/read_through_cache.h +++ b/src/mongo/util/read_through_cache.h @@ -54,11 +54,10 @@ protected: virtual ~ReadThroughCacheBase(); /** - * This method is an extension of ThreadPoolInterface::schedule, which in addition creates a - * client and an operation context and executes the specified 'work' under that environment. The - * difference is that instead of passing a status to 'work' in order to indicate an in-line - * execution, the function will throw without actually calling 'work' (see 'schedule' for more - * details on in-line execution). + * This method is an extension of ThreadPoolInterface::schedule, with the following additions: + * - Creates a client and an operation context and executes the specified 'work' under that + * environment + * - Returns a CancelToken, which can be used to attempt to cancel 'work' * * If the task manages to get canceled before it is executed (through a call to tryCancel), * 'work' will be invoked out-of-line with a non-OK status, set to error code @@ -77,7 +76,9 @@ protected: std::shared_ptr<TaskInfo> _info; }; using WorkWithOpContext = unique_function<void(OperationContext*, const Status&)>; - CancelToken _asyncWork(WorkWithOpContext work); + CancelToken _asyncWork(WorkWithOpContext work) noexcept; + + Date_t _now(); // Service context under which this cache has been instantiated (used for access to service-wide // functionality, such as client/operation context creation) @@ -180,7 +181,19 @@ public: * The implementation must throw a uassertion to indicate an error while looking up the value, * return boost::none if the key is not found, or return an actual value. */ - using LookupFn = unique_function<boost::optional<Value>(OperationContext*, const Key&)>; + struct LookupResult { + explicit LookupResult(boost::optional<Value>&& v) : v(std::move(v)) {} + LookupResult(LookupResult&&) = default; + LookupResult& operator=(LookupResult&&) = default; + + // If boost::none, it means the '_lookupFn' did not find the key in the store + boost::optional<Value> v; + }; + using LookupFn = unique_function<LookupResult(OperationContext*, const Key&)>; + + // Exposed publicly so it can be unit-tested indepedently of the usages in this class. Must not + // be used independently. + class InProgressLookup; /** * If 'key' is found in the cache, returns a set ValueHandle (its operator bool will be true). @@ -195,8 +208,8 @@ public: * meaning that subsequent calls to 'acquireAsync' will kick-off 'lookup' again. * * NOTES: - * The returned value may be invalid by the time the caller gets access to it if invalidate is - * called for 'key'. + * The returned value may be invalid by the time the caller gets to access it, if 'invalidate' + * is called for 'key'. */ SharedSemiFuture<ValueHandle> acquireAsync(const Key& key) { // Fast path @@ -211,54 +224,18 @@ public: // Join an in-progress lookup if one has already been scheduled if (auto it = _inProgressLookups.find(key); it != _inProgressLookups.end()) - return it->second->sharedPromise.getFuture(); + return it->second->addWaiter(ul); - // Schedule an asynchronous lookup for the key and then loop around and wait for it to - // complete + // Schedule an asynchronous lookup for the key + auto [it, emplaced] = + _inProgressLookups.emplace(key, std::make_unique<InProgressLookup>(*this, key)); + invariant(emplaced); + auto& inProgressLookup = *it->second; + auto sharedFutureToReturn = inProgressLookup.addWaiter(ul); - auto [kickOffAsyncLookupPromise, f] = makePromiseFuture<void>(); - - auto emplaceResult = - _inProgressLookups.emplace(key, std::make_unique<InProgressLookup>(key)); - invariant(emplaceResult.second /* emplaced */); - auto& inProgressLookup = *emplaceResult.first->second; - auto sharedFutureToReturn = inProgressLookup.sharedPromise.getFuture(); ul.unlock(); - // Construct the future chain before scheduling the async work so it doesn't execute inline - // if it so happens that the async work completes by the time the future is constructed, or - // if it executes inline due to the task executor being shut down. - std::move(f) - .then([this, &inProgressLookup] { - stdx::unique_lock ul(_mutex); - return _asyncLookupWhileInvalidated(std::move(ul), inProgressLookup); - }) - .getAsync([this, key](StatusWith<Value> swValue) { - stdx::unique_lock ul(_mutex); - auto it = _inProgressLookups.find(key); - invariant(it != _inProgressLookups.end()); - auto inProgressLookup = std::move(it->second); - _inProgressLookups.erase(it); - - StatusWith<ValueHandle> swValueHandle(ErrorCodes::InternalError, - "ReadThroughCache"); - if (swValue.isOK()) { - swValueHandle = ValueHandle(_cache.insertOrAssignAndGet( - key, - {std::move(swValue.getValue()), - _serviceContext->getFastClockSource()->now()})); - } else if (swValue == ErrorCodes::ReadThroughCacheKeyNotFound) { - swValueHandle = ValueHandle(); - } else { - swValueHandle = swValue.getStatus(); - } - - ul.unlock(); - - inProgressLookup->sharedPromise.setFromStatusWith(std::move(swValueHandle)); - }); - - kickOffAsyncLookupPromise.emplaceValue(); + _doLookupWhileNotValid(key, Status(ErrorCodes::Error(461540), "")).getAsync([](auto) {}); return sharedFutureToReturn; } @@ -279,7 +256,7 @@ public: ValueHandle insertOrAssignAndGet(const Key& key, Value&& newValue, Date_t updateWallClockTime) { stdx::lock_guard lg(_mutex); if (auto it = _inProgressLookups.find(key); it != _inProgressLookups.end()) - it->second->invalidate(lg); + it->second->invalidateAndCancelCurrentLookupRound(lg); return _cache.insertOrAssignAndGet(key, {std::move(newValue), updateWallClockTime}); } @@ -296,7 +273,7 @@ public: void invalidate(const Key& key) { stdx::lock_guard lg(_mutex); if (auto it = _inProgressLookups.find(key); it != _inProgressLookups.end()) - it->second->invalidate(lg); + it->second->invalidateAndCancelCurrentLookupRound(lg); _cache.invalidate(key); } @@ -305,7 +282,7 @@ public: stdx::lock_guard lg(_mutex); for (auto& entry : _inProgressLookups) { if (predicate(entry.first)) - entry.second->invalidate(lg); + entry.second->invalidateAndCancelCurrentLookupRound(lg); } _cache.invalidateIf([&](const Key& key, const StoredValue*) { return predicate(key); }); } @@ -354,86 +331,42 @@ protected: } private: - // Refer to the comments on '_asyncLookupWhileInvalidated' for more detail on how this structure - // is used. - struct InProgressLookup { - InProgressLookup(Key key) : key(std::move(key)) {} - - void invalidate(WithLock) { - invalidated = true; - if (cancelToken) - cancelToken->tryCancel(); - } - - Key key; - SharedPromise<ValueHandle> sharedPromise; - - bool invalidated; - boost::optional<CancelToken> cancelToken; - }; using InProgressLookupsMap = stdx::unordered_map<Key, std::unique_ptr<InProgressLookup>>; /** - * This method is expected to be called with a constructed InProgressLookup object, emplaced on - * '_inProgressLookups' (represented by the 'inProgressLookup' argument). It implements an - * asynchronous "while (invalidated)" loop over the in-progress key referenced by - * 'inProgressLookup', which *must* be kept valid by the caller until the returned Future - * completes. - * - * The returned Future will be complete when that loop exists and will contain the latest value - * (or error) returned by 'lookup'. - * - * If thought of sequentially, the loop looks like this: - * - * while (true) { - * inProgressLookup.invalidated = false; - * inProgressLookup.cancelToken.reset(); - * valueOrError = lookup(key); - * if (!inProgressLookup.invalidated) - * return valueOrError; // signals the future - * } + * This method implements an asynchronous "while (!valid)" loop over 'key', which must be on the + * in-progress map. */ - Future<Value> _asyncLookupWhileInvalidated(stdx::unique_lock<Mutex> ul, - InProgressLookup& inProgressLookup) noexcept { - auto [promise, f] = makePromiseFuture<Value>(); - auto p = std::make_shared<Promise<Value>>(std::move(promise)); - - // Construct the future chain before scheduling the async work so it doesn't execute inline - auto future = - std::move(f).onCompletion([this, &inProgressLookup](StatusWith<Value> swValue) { - stdx::unique_lock ul(_mutex); - if (!inProgressLookup.invalidated) - return Future<Value>::makeReady(uassertStatusOK(std::move(swValue))); - - inProgressLookup.cancelToken.reset(); - return _asyncLookupWhileInvalidated(std::move(ul), inProgressLookup); - }); + Future<LookupResult> _doLookupWhileNotValid(Key key, StatusWith<LookupResult> sw) { + stdx::unique_lock ul(_mutex); + auto it = _inProgressLookups.find(key); + invariant(it != _inProgressLookups.end()); + if (!ErrorCodes::isCancelationError(sw.getStatus()) && !it->second->valid(ul)) { + ul.unlock(); // asyncLookupRound also acquires the mutex + return it->second->asyncLookupRound().onCompletion( + [this, key](auto sw) { return _doLookupWhileNotValid(key, std::move(sw)); }); + } - invariant(!inProgressLookup.cancelToken); - inProgressLookup.invalidated = false; - try { - inProgressLookup.cancelToken.emplace(_asyncWork([ this, p, &inProgressLookup ]( - OperationContext * opCtx, const Status& status) mutable noexcept { - p->setWith([&]() mutable { - uassertStatusOK(status); - auto value = _lookupFn(opCtx, inProgressLookup.key); - uassert(ErrorCodes::ReadThroughCacheKeyNotFound, - "Internal only: key not found", - value); - return std::move(*value); - }); - })); - } catch (const ExceptionForCat<ErrorCategory::CancelationError>& ex) { - // The thread pool is being shut down, so this is an inline execution - invariant(!inProgressLookup.invalidated); - invariant(!inProgressLookup.cancelToken); - - ul.unlock(); - p->setError(ex.toStatus()); + // The detachment of the currently active lookup and the placement of the result on the + // '_cache' has to be atomic with respect to a concurrent call to 'invalidate' + auto inProgressLookup(std::move(it->second)); + _inProgressLookups.erase(it); + + StatusWith<ValueHandle> swValueHandle(ErrorCodes::Error(461541), ""); + if (sw.isOK()) { + auto result = std::move(sw.getValue()); + swValueHandle = result.v + ? ValueHandle(_cache.insertOrAssignAndGet(key, {std::move(*result.v), _now()})) + : ValueHandle(); + } else { + swValueHandle = sw.getStatus(); } + ul.unlock(); - return std::move(future); - }; + inProgressLookup->signalWaiters(std::move(swValueHandle)); + + return Future<LookupResult>::makeReady(Status(ErrorCodes::Error(461542), "")); + } // Blocking function which will be invoked to retrieve entries from the backing store const LookupFn _lookupFn; @@ -454,4 +387,81 @@ private: InProgressLookupsMap _inProgressLookups; }; +/** + * This class represents an in-progress lookup for a specific key and implements the guarantees of + * the invalidation logic as described in the comments of 'ReadThroughCache::invalidate'. + * + * It is intended to be used in conjunction with the 'ReadThroughCache', which operates on it under + * its '_mutex' and ensures there is always at most a single active instance at a time active for + * each 'key'. + * + * The methods of this class are not thread-safe, unless indicated in the comments. + * + * Its lifecycle is intended to be like this: + * + * inProgressLookups.emplace(inProgress); + * while (true) { + * result = inProgress.asyncLookupRound(); + * if (!inProgress.valid()) { + * continue; + * } + * + * inProgressLookups.remove(inProgress) + * cachedValues.insert(result); + * inProgress.signalWaiters(result); + * } + */ +template <typename Key, typename Value> +class ReadThroughCache<Key, Value>::InProgressLookup { +public: + InProgressLookup(ReadThroughCache& cache, Key key) : _cache(cache), _key(std::move(key)) {} + + Future<LookupResult> asyncLookupRound() { + auto [promise, future] = makePromiseFuture<LookupResult>(); + + stdx::lock_guard lg(_cache._mutex); + _valid = true; + _cancelToken.emplace(_cache._asyncWork([ this, promise = std::move(promise) ]( + OperationContext * opCtx, const Status& status) mutable noexcept { + promise.setWith([&] { + uassertStatusOK(status); + return _cache._lookupFn(opCtx, _key); + }); + })); + + return std::move(future); + } + + SharedSemiFuture<ValueHandle> addWaiter(WithLock) { + return _sharedPromise.getFuture(); + } + + bool valid(WithLock) const { + return _valid; + } + + void invalidateAndCancelCurrentLookupRound(WithLock) { + _valid = false; + if (_cancelToken) + _cancelToken->tryCancel(); + } + + void signalWaiters(StatusWith<ValueHandle> swValueHandle) { + invariant(_valid); + _sharedPromise.setFromStatusWith(std::move(swValueHandle)); + } + +private: + // The owning cache, from which mutex, lookupFn, async task scheduling, etc. will be used. It is + // the responsibility of the owning cache to join all outstanding lookups at destruction time. + ReadThroughCache& _cache; + + const Key _key; + + bool _valid{false}; + boost::optional<CancelToken> _cancelToken; + + SharedPromise<ValueHandle> _sharedPromise; +}; + } // namespace mongo diff --git a/src/mongo/util/read_through_cache_test.cpp b/src/mongo/util/read_through_cache_test.cpp index 169606cfbdb..d196290bef8 100644 --- a/src/mongo/util/read_through_cache_test.cpp +++ b/src/mongo/util/read_through_cache_test.cpp @@ -41,10 +41,14 @@ namespace mongo { namespace { +using unittest::assertGet; + +// The structure for testing is intentionally made movable, but non-copyable struct CachedValue { CachedValue(int counter) : counter(counter) {} CachedValue(CachedValue&&) = default; CachedValue& operator=(CachedValue&&) = default; + int counter; }; @@ -86,14 +90,19 @@ protected: OperationContext* const _opCtx{_opCtxHolder.get()}; }; +TEST(ReadThroughCacheTest, StandaloneValueHandle) { + Cache::ValueHandle standaloneHandle(CachedValue(100)); + ASSERT(standaloneHandle.isValid()); + ASSERT_EQ(100, standaloneHandle->counter); +} + TEST_F(ReadThroughCacheTest, FetchInvalidateAndRefetch) { int countLookups = 0; CacheWithThreadPool cache( getServiceContext(), 1, [&](OperationContext*, const std::string& key) { ASSERT_EQ("TestKey", key); countLookups++; - - return CachedValue{100 * countLookups}; + return Cache::LookupResult(CachedValue(100 * countLookups)); }); for (int i = 1; i <= 3; i++) { @@ -109,14 +118,24 @@ TEST_F(ReadThroughCacheTest, FetchInvalidateAndRefetch) { } } +TEST_F(ReadThroughCacheTest, FailedLookup) { + CacheWithThreadPool cache( + getServiceContext(), + 1, + [&](OperationContext*, const std::string& key) -> Cache::LookupResult { + uasserted(ErrorCodes::InternalError, "Test error"); + }); + + ASSERT_THROWS_CODE(cache.acquire(_opCtx, "TestKey"), DBException, ErrorCodes::InternalError); +} + TEST_F(ReadThroughCacheTest, CacheSizeZero) { int countLookups = 0; CacheWithThreadPool cache( getServiceContext(), 0, [&](OperationContext*, const std::string& key) { ASSERT_EQ("TestKey", key); countLookups++; - - return CachedValue{100 * countLookups}; + return Cache::LookupResult(CachedValue(100 * countLookups)); }); for (int i = 1; i <= 3; i++) { @@ -133,8 +152,7 @@ TEST_F(ReadThroughCacheTest, InvalidateCacheSizeZeroReissuesLookup) { getServiceContext(), 0, [&](OperationContext*, const std::string& key) { ASSERT_EQ("TestKey", key); countLookups++; - - return CachedValue{1000 * countLookups}; + return Cache::LookupResult(CachedValue(1000 * countLookups)); }); auto value = cache.acquire(_opCtx, "TestKey"); @@ -158,7 +176,7 @@ TEST_F(ReadThroughCacheTest, KeyDoesNotExist) { CacheWithThreadPool cache( getServiceContext(), 1, [&](OperationContext*, const std::string& key) { ASSERT_EQ("TestKey", key); - return boost::none; + return Cache::LookupResult(boost::none); }); ASSERT(!cache.acquire(_opCtx, "TestKey")); @@ -167,12 +185,58 @@ TEST_F(ReadThroughCacheTest, KeyDoesNotExist) { /** * Fixture for tests, which need to control the creation/destruction of their operation contexts. */ -class ReadThroughCacheTestAsync : public unittest::Test, +class ReadThroughCacheAsyncTest : public unittest::Test, public ScopedGlobalServiceContextForTest {}; using Barrier = unittest::Barrier; -TEST_F(ReadThroughCacheTestAsync, AcquireObservesOperationContextDeadline) { +TEST_F(ReadThroughCacheAsyncTest, SuccessfulInProgressLookup) { + ThreadPool threadPool{ThreadPool::Options()}; + threadPool.startup(); + + Cache cache(getServiceContext(), threadPool, 1, [&](OperationContext*, const std::string& key) { + return Cache::LookupResult(CachedValue(500)); + }); + + Cache::InProgressLookup inProgress(cache, "TestKey"); + auto future = inProgress.addWaiter(WithLock::withoutLock()); + ASSERT(!future.isReady()); + + auto optVal = inProgress.asyncLookupRound().get(); + ASSERT(inProgress.valid(WithLock::withoutLock())); + ASSERT(optVal.v); + ASSERT_EQ(500, optVal.v->counter); + inProgress.signalWaiters(Cache::ValueHandle(std::move(*optVal.v))); + + ASSERT(future.isReady()); + ASSERT_EQ(500, future.get()->counter); +} + +TEST_F(ReadThroughCacheAsyncTest, FailedInProgressLookup) { + ThreadPool threadPool{ThreadPool::Options()}; + threadPool.startup(); + + Cache cache(getServiceContext(), + threadPool, + 1, + [&](OperationContext*, const std::string& key) -> Cache::LookupResult { + uasserted(ErrorCodes::InternalError, "Test error"); + }); + + Cache::InProgressLookup inProgress(cache, "TestKey"); + auto future = inProgress.addWaiter(WithLock::withoutLock()); + ASSERT(!future.isReady()); + + auto asyncLookupResult = inProgress.asyncLookupRound().getNoThrow(); + ASSERT_THROWS_CODE(inProgress.asyncLookupRound().get(), DBException, ErrorCodes::InternalError); + ASSERT(inProgress.valid(WithLock::withoutLock())); + inProgress.signalWaiters(asyncLookupResult.getStatus()); + + ASSERT(future.isReady()); + ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::InternalError); +} + +TEST_F(ReadThroughCacheAsyncTest, AcquireObservesOperationContextDeadline) { ThreadPool threadPool{ThreadPool::Options()}; threadPool.startup(); @@ -181,7 +245,7 @@ TEST_F(ReadThroughCacheTestAsync, AcquireObservesOperationContextDeadline) { Cache cache(getServiceContext(), threadPool, 1, [&](OperationContext*, const std::string& key) { lookupStartedBarrier.countDownAndWait(); completeLookupBarrier.countDownAndWait(); - return CachedValue(5); + return Cache::LookupResult(CachedValue(5)); }); { @@ -220,7 +284,7 @@ TEST_F(ReadThroughCacheTestAsync, AcquireObservesOperationContextDeadline) { } } -TEST_F(ReadThroughCacheTestAsync, InvalidateReissuesLookup) { +TEST_F(ReadThroughCacheAsyncTest, InvalidateReissuesLookup) { ThreadPool threadPool{ThreadPool::Options()}; threadPool.startup(); @@ -232,41 +296,40 @@ TEST_F(ReadThroughCacheTestAsync, InvalidateReissuesLookup) { int idx = countLookups.fetchAndAdd(1); lookupStartedBarriers[idx].countDownAndWait(); completeLookupBarriers[idx].countDownAndWait(); - return CachedValue(idx); + return Cache::LookupResult(CachedValue(idx)); }); // Kick off the first lookup, which will block auto future = cache.acquireAsync("TestKey"); ASSERT(!future.isReady()); - // Invalidate the first lookup attempt while it is still blocked + // Wait for the first lookup attempt to start and invalidate it before letting it proceed lookupStartedBarriers[0].countDownAndWait(); ASSERT_EQ(1, countLookups.load()); cache.invalidate("TestKey"); ASSERT(!future.isReady()); - - completeLookupBarriers[0].countDownAndWait(); + completeLookupBarriers[0].countDownAndWait(); // Lets lookup attempt 1 proceed ASSERT(!future.isReady()); - // Invalidate the second lookup attempt while it is still blocked + // Wait for the second lookup attempt to start and invalidate it before letting it proceed lookupStartedBarriers[1].countDownAndWait(); ASSERT_EQ(2, countLookups.load()); cache.invalidate("TestKey"); ASSERT(!future.isReady()); - - completeLookupBarriers[1].countDownAndWait(); + completeLookupBarriers[1].countDownAndWait(); // Lets lookup attempt 2 proceed ASSERT(!future.isReady()); - // Do not invalidate the third lookup and make sure it returns the correct value + // Wait for the third lookup attempt to start, but not do not invalidate it before letting it + // proceed lookupStartedBarriers[2].countDownAndWait(); ASSERT_EQ(3, countLookups.load()); ASSERT(!future.isReady()); + completeLookupBarriers[2].countDownAndWait(); // Lets lookup attempt 3 proceed - completeLookupBarriers[2].countDownAndWait(); ASSERT_EQ(2, future.get()->counter); } -TEST_F(ReadThroughCacheTestAsync, AcquireWithAShutdownThreadPool) { +TEST_F(ReadThroughCacheAsyncTest, AcquireWithAShutdownThreadPool) { ThreadPool threadPool{ThreadPool::Options()}; threadPool.startup(); threadPool.shutdown(); @@ -274,14 +337,14 @@ TEST_F(ReadThroughCacheTestAsync, AcquireWithAShutdownThreadPool) { Cache cache(getServiceContext(), threadPool, 1, [&](OperationContext*, const std::string&) { FAIL("Should not be called"); - return CachedValue(0); // Will never be reached + return Cache::LookupResult(CachedValue(0)); // Will never be reached }); auto future = cache.acquireAsync("TestKey"); ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::ShutdownInProgress); } -TEST_F(ReadThroughCacheTestAsync, InvalidateCalledBeforeLookupTaskExecutes) { +TEST_F(ReadThroughCacheAsyncTest, InvalidateCalledBeforeLookupTaskExecutes) { struct MockThreadPool : public ThreadPoolInterface { void startup() override {} void shutdown() override {} @@ -300,7 +363,7 @@ TEST_F(ReadThroughCacheTestAsync, InvalidateCalledBeforeLookupTaskExecutes) { } threadPool; Cache cache(getServiceContext(), threadPool, 1, [&](OperationContext*, const std::string&) { - return CachedValue(123); + return Cache::LookupResult(CachedValue(123)); }); auto future = cache.acquireAsync("TestKey"); |