diff options
author | Matthew Saltz <matthew.saltz@mongodb.com> | 2018-04-18 13:43:02 -0400 |
---|---|---|
committer | Matthew Saltz <matthew.saltz@mongodb.com> | 2018-04-23 11:07:29 -0400 |
commit | a000fcd684216a331356a3c1568ef7fa99ea4907 (patch) | |
tree | 37c8299de6ba69800601cad0dd81d4e22d5e6df5 | |
parent | 179985c786cea234b65946ff647debfdfdbed511 (diff) | |
download | mongo-a000fcd684216a331356a3c1568ef7fa99ea4907.tar.gz |
SERVER-33954 Modified getDatabaseWithRefresh/getCollectionRoutingInfoWithRefresh to refresh twice if the first refresh is not performed by its own thread
-rw-r--r-- | src/mongo/s/catalog_cache.cpp | 73 | ||||
-rw-r--r-- | src/mongo/s/catalog_cache.h | 37 |
2 files changed, 97 insertions, 13 deletions
diff --git a/src/mongo/s/catalog_cache.cpp b/src/mongo/s/catalog_cache.cpp index 434f3848e1b..18ba4dd823b 100644 --- a/src/mongo/s/catalog_cache.cpp +++ b/src/mongo/s/catalog_cache.cpp @@ -118,10 +118,19 @@ std::shared_ptr<RoutingTableHistory> refreshCollectionRoutingInfo( CatalogCache::CatalogCache(CatalogCacheLoader& cacheLoader) : _cacheLoader(cacheLoader) {} CatalogCache::~CatalogCache() = default; - StatusWith<CachedDatabaseInfo> CatalogCache::getDatabase(OperationContext* opCtx, StringData dbName) { + return _getDatabase(opCtx, dbName).status; +} + +CatalogCache::RefreshResult<CachedDatabaseInfo> CatalogCache::_getDatabase(OperationContext* opCtx, + StringData dbName) { + using DatabaseInfoRefreshResult = RefreshResult<CachedDatabaseInfo>; + using DatabaseInfoRefreshAction = RefreshResult<CachedDatabaseInfo>::RefreshAction; + + DatabaseInfoRefreshAction refreshActionTaken; try { + // Tracks whether we performed the refresh, someone else did, or no refresh happened at all while (true) { stdx::unique_lock<stdx::mutex> ul(_mutex); @@ -136,6 +145,9 @@ StatusWith<CachedDatabaseInfo> CatalogCache::getDatabase(OperationContext* opCtx refreshNotification = (dbEntry->refreshCompletionNotification = std::make_shared<Notification<Status>>()); _scheduleDatabaseRefresh(ul, dbName.toString(), dbEntry); + refreshActionTaken = DatabaseInfoRefreshAction::kPerformedRefresh; + } else { + refreshActionTaken = DatabaseInfoRefreshAction::kJoinedInProgressRefresh; } // Wait on the notification outside of the mutex.
@@ -171,29 +183,39 @@ StatusWith<CachedDatabaseInfo> CatalogCache::getDatabase(OperationContext* opCtx auto primaryShard = uassertStatusOK( Grid::get(opCtx)->shardRegistry()->getShard(opCtx, dbEntry->dbt->getPrimary())); - return {CachedDatabaseInfo(*dbEntry->dbt, std::move(primaryShard))}; + return {CachedDatabaseInfo(*dbEntry->dbt, std::move(primaryShard)), refreshActionTaken}; } } catch (const DBException& ex) { - return ex.toStatus(); + return {ex.toStatus(), refreshActionTaken}; } } StatusWith<CachedCollectionRoutingInfo> CatalogCache::getCollectionRoutingInfo( OperationContext* opCtx, const NamespaceString& nss) { + return _getCollectionRoutingInfo(opCtx, nss).status; +} + +CatalogCache::RefreshResult<CachedCollectionRoutingInfo> CatalogCache::_getCollectionRoutingInfo( + OperationContext* opCtx, const NamespaceString& nss) { return _getCollectionRoutingInfoAt(opCtx, nss, boost::none); } + StatusWith<CachedCollectionRoutingInfo> CatalogCache::getCollectionRoutingInfoAt( OperationContext* opCtx, const NamespaceString& nss, Timestamp atClusterTime) { - return _getCollectionRoutingInfoAt(opCtx, nss, atClusterTime); + return _getCollectionRoutingInfoAt(opCtx, nss, atClusterTime).status; } -StatusWith<CachedCollectionRoutingInfo> CatalogCache::_getCollectionRoutingInfoAt( +CatalogCache::RefreshResult<CachedCollectionRoutingInfo> CatalogCache::_getCollectionRoutingInfoAt( OperationContext* opCtx, const NamespaceString& nss, boost::optional<Timestamp> atClusterTime) { + using CollectionRoutingInfoRefreshResult = RefreshResult<CachedCollectionRoutingInfo>; + using CollectionRoutingInfoRefreshAction = + RefreshResult<CachedCollectionRoutingInfo>::RefreshAction; + CollectionRoutingInfoRefreshAction refreshActionTaken; while (true) { const auto swDbInfo = getDatabase(opCtx, nss.db()); if (!swDbInfo.isOK()) { - return swDbInfo.getStatus(); + return {swDbInfo.getStatus(), CollectionRoutingInfoRefreshAction::kPerformedRefresh}; } const auto dbInfo = 
std::move(swDbInfo.getValue()); @@ -201,11 +223,13 @@ StatusWith<CachedCollectionRoutingInfo> CatalogCache::_getCollectionRoutingInfoA const auto itDb = _collectionsByDb.find(nss.db()); if (itDb == _collectionsByDb.end()) { - return {CachedCollectionRoutingInfo(nss, dbInfo, nullptr)}; + return {CachedCollectionRoutingInfo(nss, dbInfo, nullptr), + CollectionRoutingInfoRefreshAction::kPerformedRefresh}; } const auto itColl = itDb->second.find(nss.ns()); if (itColl == itDb->second.end()) { - return {CachedCollectionRoutingInfo(nss, dbInfo, nullptr)}; + return {CachedCollectionRoutingInfo(nss, dbInfo, nullptr), + CollectionRoutingInfoRefreshAction::kPerformedRefresh}; } auto& collEntry = itColl->second; @@ -215,6 +239,9 @@ StatusWith<CachedCollectionRoutingInfo> CatalogCache::_getCollectionRoutingInfoA refreshNotification = (collEntry->refreshCompletionNotification = std::make_shared<Notification<Status>>()); _scheduleCollectionRefresh(ul, collEntry, nss, 1); + refreshActionTaken = CollectionRoutingInfoRefreshAction::kPerformedRefresh; + } else { + refreshActionTaken = CollectionRoutingInfoRefreshAction::kJoinedInProgressRefresh; } // Wait on the notification outside of the mutex @@ -238,7 +265,7 @@ StatusWith<CachedCollectionRoutingInfo> CatalogCache::_getCollectionRoutingInfoA }(); if (!refreshStatus.isOK()) { - return refreshStatus; + return {refreshStatus, refreshActionTaken}; } // Once the refresh is complete, loop around to get the latest value @@ -247,20 +274,42 @@ StatusWith<CachedCollectionRoutingInfo> CatalogCache::_getCollectionRoutingInfoA auto cm = std::make_shared<ChunkManager>(collEntry->routingInfo, atClusterTime); - return {CachedCollectionRoutingInfo(nss, dbInfo, std::move(cm))}; + return {CachedCollectionRoutingInfo(nss, dbInfo, std::move(cm)), refreshActionTaken}; } } StatusWith<CachedDatabaseInfo> CatalogCache::getDatabaseWithRefresh(OperationContext* opCtx, StringData dbName) { invalidateDatabaseEntry(dbName); - return getDatabase(opCtx, dbName); 
+ auto refreshResult = _getDatabase(opCtx, dbName); + // We want to ensure that we don't join an in-progress refresh because that + // could violate causal consistency for this client. We don't need to actually perform the + // refresh ourselves but we do need the refresh to begin *after* this function is + // called, so calling it twice is enough regardless of what happens the + // second time. See SERVER-33954 for reasoning. + if (refreshResult.actionTaken == + RefreshResult<CachedDatabaseInfo>::RefreshAction::kJoinedInProgressRefresh) { + invalidateDatabaseEntry(dbName); + refreshResult = _getDatabase(opCtx, dbName); + } + return refreshResult.status; } StatusWith<CachedCollectionRoutingInfo> CatalogCache::getCollectionRoutingInfoWithRefresh( OperationContext* opCtx, const NamespaceString& nss) { invalidateShardedCollection(nss); - return getCollectionRoutingInfo(opCtx, nss); + auto refreshResult = _getCollectionRoutingInfo(opCtx, nss); + // We want to ensure that we don't join an in-progress refresh because that + // could violate causal consistency for this client. We don't need to actually perform the + // refresh ourselves but we do need the refresh to begin *after* this function is + // called, so calling it twice is enough regardless of what happens the + // second time. See SERVER-33954 for reasoning. 
+ if (refreshResult.actionTaken == + RefreshResult<CachedCollectionRoutingInfo>::RefreshAction::kJoinedInProgressRefresh) { + invalidateShardedCollection(nss); + refreshResult = _getCollectionRoutingInfo(opCtx, nss); + } + return refreshResult.status; } StatusWith<CachedCollectionRoutingInfo> CatalogCache::getShardedCollectionRoutingInfoWithRefresh( diff --git a/src/mongo/s/catalog_cache.h b/src/mongo/s/catalog_cache.h index 483f86ef164..bb0ee65e5c9 100644 --- a/src/mongo/s/catalog_cache.h +++ b/src/mongo/s/catalog_cache.h @@ -212,6 +212,33 @@ private: }; /** + * Return type for helper functions performing refreshes so that they can + * indicate both status and whether or not this thread joined an + * in-progress refresh. + */ + template <class T> + struct RefreshResult { + // Status containing result of refresh + StatusWith<T> status; + + // Flag indicating whether this thread scheduled the refresh itself + // (kPerformedRefresh) or waited on a refresh that was already in + // progress (kJoinedInProgressRefresh). There is no separate value for + // the case where no refresh was performed at all. + enum class RefreshAction { + kPerformedRefresh, + kJoinedInProgressRefresh, + } actionTaken; + }; + + /** + * Helper function for getDatabase that includes in its result what refresh + * action was taken + */ + RefreshResult<CachedDatabaseInfo> _getDatabase(OperationContext* opCtx, StringData dbName); + + + /** * Non-blocking call which schedules an asynchronous refresh for the specified database. The * database entry must be in the 'needsRefresh' state. */ @@ -228,7 +255,15 @@ private: NamespaceString const& nss, int refreshAttempt); - StatusWith<CachedCollectionRoutingInfo> _getCollectionRoutingInfoAt( + + /** + * Helper function used when we need the refresh action taken (e.g.
when we + * want to force refresh) + */ + RefreshResult<CachedCollectionRoutingInfo> _getCollectionRoutingInfo( + OperationContext* opCtx, const NamespaceString& nss); + + RefreshResult<CachedCollectionRoutingInfo> _getCollectionRoutingInfoAt( OperationContext* opCtx, const NamespaceString& nss, boost::optional<Timestamp> atClusterTime); |