summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Matthew Saltz <matthew.saltz@mongodb.com> 2018-04-18 13:43:02 -0400
committer Matthew Saltz <matthew.saltz@mongodb.com> 2018-04-23 11:07:29 -0400
commit a000fcd684216a331356a3c1568ef7fa99ea4907 (patch)
tree 37c8299de6ba69800601cad0dd81d4e22d5e6df5
parent 179985c786cea234b65946ff647debfdfdbed511 (diff)
download mongo-a000fcd684216a331356a3c1568ef7fa99ea4907.tar.gz
SERVER-33954 Modified getDatabaseWithRefresh/getCollectionRoutingInfoWithRefresh to refresh twice if the first refresh is not performed by its own thread
-rw-r--r-- src/mongo/s/catalog_cache.cpp 73
-rw-r--r-- src/mongo/s/catalog_cache.h 37
2 files changed, 97 insertions, 13 deletions
diff --git a/src/mongo/s/catalog_cache.cpp b/src/mongo/s/catalog_cache.cpp
index 434f3848e1b..18ba4dd823b 100644
--- a/src/mongo/s/catalog_cache.cpp
+++ b/src/mongo/s/catalog_cache.cpp
@@ -118,10 +118,19 @@ std::shared_ptr<RoutingTableHistory> refreshCollectionRoutingInfo(
CatalogCache::CatalogCache(CatalogCacheLoader& cacheLoader) : _cacheLoader(cacheLoader) {}
CatalogCache::~CatalogCache() = default;
-
StatusWith<CachedDatabaseInfo> CatalogCache::getDatabase(OperationContext* opCtx,
StringData dbName) {
+ return _getDatabase(opCtx, dbName).status;
+}
+
+CatalogCache::RefreshResult<CachedDatabaseInfo> CatalogCache::_getDatabase(OperationContext* opCtx,
+ StringData dbName) {
+ using DatabaseInfoRefreshResult = RefreshResult<CachedDatabaseInfo>;
+ using DatabaseInfoRefreshAction = RefreshResult<CachedDatabaseInfo>::RefreshAction;
+
+ DatabaseInfoRefreshAction refreshActionTaken;
try {
+ // refreshActionTaken: whether this thread performed the refresh, another thread did, or none happened
while (true) {
stdx::unique_lock<stdx::mutex> ul(_mutex);
@@ -136,6 +145,9 @@ StatusWith<CachedDatabaseInfo> CatalogCache::getDatabase(OperationContext* opCtx
refreshNotification = (dbEntry->refreshCompletionNotification =
std::make_shared<Notification<Status>>());
_scheduleDatabaseRefresh(ul, dbName.toString(), dbEntry);
+ refreshActionTaken = DatabaseInfoRefreshAction::kPerformedRefresh;
+ } else {
+ refreshActionTaken = DatabaseInfoRefreshAction::kJoinedInProgressRefresh;
}
// Wait on the notification outside of the mutex.
@@ -171,29 +183,39 @@ StatusWith<CachedDatabaseInfo> CatalogCache::getDatabase(OperationContext* opCtx
auto primaryShard = uassertStatusOK(
Grid::get(opCtx)->shardRegistry()->getShard(opCtx, dbEntry->dbt->getPrimary()));
- return {CachedDatabaseInfo(*dbEntry->dbt, std::move(primaryShard))};
+ return {CachedDatabaseInfo(*dbEntry->dbt, std::move(primaryShard)), refreshActionTaken};
}
} catch (const DBException& ex) {
- return ex.toStatus();
+ return {ex.toStatus(), refreshActionTaken};
}
}
StatusWith<CachedCollectionRoutingInfo> CatalogCache::getCollectionRoutingInfo(
OperationContext* opCtx, const NamespaceString& nss) {
+ return _getCollectionRoutingInfo(opCtx, nss).status;
+}
+
+CatalogCache::RefreshResult<CachedCollectionRoutingInfo> CatalogCache::_getCollectionRoutingInfo(
+ OperationContext* opCtx, const NamespaceString& nss) {
return _getCollectionRoutingInfoAt(opCtx, nss, boost::none);
}
+
StatusWith<CachedCollectionRoutingInfo> CatalogCache::getCollectionRoutingInfoAt(
OperationContext* opCtx, const NamespaceString& nss, Timestamp atClusterTime) {
- return _getCollectionRoutingInfoAt(opCtx, nss, atClusterTime);
+ return _getCollectionRoutingInfoAt(opCtx, nss, atClusterTime).status;
}
-StatusWith<CachedCollectionRoutingInfo> CatalogCache::_getCollectionRoutingInfoAt(
+CatalogCache::RefreshResult<CachedCollectionRoutingInfo> CatalogCache::_getCollectionRoutingInfoAt(
OperationContext* opCtx, const NamespaceString& nss, boost::optional<Timestamp> atClusterTime) {
+ using CollectionRoutingInfoRefreshResult = RefreshResult<CachedCollectionRoutingInfo>;
+ using CollectionRoutingInfoRefreshAction =
+ RefreshResult<CachedCollectionRoutingInfo>::RefreshAction;
+ CollectionRoutingInfoRefreshAction refreshActionTaken;
while (true) {
const auto swDbInfo = getDatabase(opCtx, nss.db());
if (!swDbInfo.isOK()) {
- return swDbInfo.getStatus();
+ return {swDbInfo.getStatus(), CollectionRoutingInfoRefreshAction::kPerformedRefresh};
}
const auto dbInfo = std::move(swDbInfo.getValue());
@@ -201,11 +223,13 @@ StatusWith<CachedCollectionRoutingInfo> CatalogCache::_getCollectionRoutingInfoA
const auto itDb = _collectionsByDb.find(nss.db());
if (itDb == _collectionsByDb.end()) {
- return {CachedCollectionRoutingInfo(nss, dbInfo, nullptr)};
+ return {CachedCollectionRoutingInfo(nss, dbInfo, nullptr),
+ CollectionRoutingInfoRefreshAction::kPerformedRefresh};
}
const auto itColl = itDb->second.find(nss.ns());
if (itColl == itDb->second.end()) {
- return {CachedCollectionRoutingInfo(nss, dbInfo, nullptr)};
+ return {CachedCollectionRoutingInfo(nss, dbInfo, nullptr),
+ CollectionRoutingInfoRefreshAction::kPerformedRefresh};
}
auto& collEntry = itColl->second;
@@ -215,6 +239,9 @@ StatusWith<CachedCollectionRoutingInfo> CatalogCache::_getCollectionRoutingInfoA
refreshNotification = (collEntry->refreshCompletionNotification =
std::make_shared<Notification<Status>>());
_scheduleCollectionRefresh(ul, collEntry, nss, 1);
+ refreshActionTaken = CollectionRoutingInfoRefreshAction::kPerformedRefresh;
+ } else {
+ refreshActionTaken = CollectionRoutingInfoRefreshAction::kJoinedInProgressRefresh;
}
// Wait on the notification outside of the mutex
@@ -238,7 +265,7 @@ StatusWith<CachedCollectionRoutingInfo> CatalogCache::_getCollectionRoutingInfoA
}();
if (!refreshStatus.isOK()) {
- return refreshStatus;
+ return {refreshStatus, refreshActionTaken};
}
// Once the refresh is complete, loop around to get the latest value
@@ -247,20 +274,42 @@ StatusWith<CachedCollectionRoutingInfo> CatalogCache::_getCollectionRoutingInfoA
auto cm = std::make_shared<ChunkManager>(collEntry->routingInfo, atClusterTime);
- return {CachedCollectionRoutingInfo(nss, dbInfo, std::move(cm))};
+ return {CachedCollectionRoutingInfo(nss, dbInfo, std::move(cm)), refreshActionTaken};
}
}
StatusWith<CachedDatabaseInfo> CatalogCache::getDatabaseWithRefresh(OperationContext* opCtx,
StringData dbName) {
invalidateDatabaseEntry(dbName);
- return getDatabase(opCtx, dbName);
+ auto refreshResult = _getDatabase(opCtx, dbName);
+ // We want to ensure that we don't join an in-progress refresh because that
+ // could violate causal consistency for this client. We don't need to actually perform the
+ // refresh ourselves but we do need the refresh to begin *after* this function is
+ // called, so calling it twice is enough regardless of what happens the
+ // second time. See SERVER-33954 for reasoning.
+ if (refreshResult.actionTaken ==
+ RefreshResult<CachedDatabaseInfo>::RefreshAction::kJoinedInProgressRefresh) {
+ invalidateDatabaseEntry(dbName);
+ refreshResult = _getDatabase(opCtx, dbName);
+ }
+ return refreshResult.status;
}
StatusWith<CachedCollectionRoutingInfo> CatalogCache::getCollectionRoutingInfoWithRefresh(
OperationContext* opCtx, const NamespaceString& nss) {
invalidateShardedCollection(nss);
- return getCollectionRoutingInfo(opCtx, nss);
+ auto refreshResult = _getCollectionRoutingInfo(opCtx, nss);
+ // We want to ensure that we don't join an in-progress refresh because that
+ // could violate causal consistency for this client. We don't need to actually perform the
+ // refresh ourselves but we do need the refresh to begin *after* this function is
+ // called, so calling it twice is enough regardless of what happens the
+ // second time. See SERVER-33954 for reasoning.
+ if (refreshResult.actionTaken ==
+ RefreshResult<CachedCollectionRoutingInfo>::RefreshAction::kJoinedInProgressRefresh) {
+ invalidateShardedCollection(nss);
+ refreshResult = _getCollectionRoutingInfo(opCtx, nss);
+ }
+ return refreshResult.status;
}
StatusWith<CachedCollectionRoutingInfo> CatalogCache::getShardedCollectionRoutingInfoWithRefresh(
diff --git a/src/mongo/s/catalog_cache.h b/src/mongo/s/catalog_cache.h
index 483f86ef164..bb0ee65e5c9 100644
--- a/src/mongo/s/catalog_cache.h
+++ b/src/mongo/s/catalog_cache.h
@@ -212,6 +212,33 @@ private:
};
/**
+ * Return type for helper functions performing refreshes so that they can
+ * indicate both status and whether or not this thread joined an in
+ * progress refresh.
+ */
+ template <class T>
+ struct RefreshResult {
+ // Status containing result of refresh
+ StatusWith<T> status;
+
+ // Flag indicating whether this thread joined a refresh that was already
+ // in progress (kJoinedInProgressRefresh) or not (kPerformedRefresh). Some
+ // early returns that never wait on a refresh at all (the outcome is then
+ // indicated by the status) are also reported as kPerformedRefresh.
+ enum class RefreshAction {
+ kPerformedRefresh,
+ kJoinedInProgressRefresh,
+ } actionTaken;
+ };
+
+ /**
+ * Helper function for getDatabase that includes in its result what refresh
+ * action was taken
+ */
+ RefreshResult<CachedDatabaseInfo> _getDatabase(OperationContext* opCtx, StringData dbName);
+
+
+ /**
* Non-blocking call which schedules an asynchronous refresh for the specified database. The
* database entry must be in the 'needsRefresh' state.
*/
@@ -228,7 +255,15 @@ private:
NamespaceString const& nss,
int refreshAttempt);
- StatusWith<CachedCollectionRoutingInfo> _getCollectionRoutingInfoAt(
+
+ /**
+ * Helper function used when we need the refresh action taken (e.g. when we
+ * want to force refresh)
+ */
+ RefreshResult<CachedCollectionRoutingInfo> _getCollectionRoutingInfo(
+ OperationContext* opCtx, const NamespaceString& nss);
+
+ RefreshResult<CachedCollectionRoutingInfo> _getCollectionRoutingInfoAt(
OperationContext* opCtx,
const NamespaceString& nss,
boost::optional<Timestamp> atClusterTime);