-rw-r--r--   src/mongo/db/s/shard_server_catalog_cache_loader.cpp   229
-rw-r--r--   src/mongo/db/s/shard_server_catalog_cache_loader.h      12
2 files changed, 111 insertions, 130 deletions
diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
index 70b0c445ec3..36700332edb 100644
--- a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
+++ b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
@@ -616,99 +616,94 @@ void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince(
return getPersistedMaxChunkVersion(opCtx, nss);
}();
- auto remoteRefreshCallbackFn = [this,
- nss,
- catalogCacheSinceVersion,
- maxLoaderVersion,
- termScheduled,
- callbackFn,
- notify](
+ auto remoteRefreshFn = [this, nss, catalogCacheSinceVersion, maxLoaderVersion, termScheduled](
OperationContext* opCtx,
- StatusWith<CollectionAndChangedChunks> swCollectionAndChangedChunks) {
+ StatusWith<CollectionAndChangedChunks>
+ swCollectionAndChangedChunks) -> StatusWith<CollectionAndChangedChunks> {
if (swCollectionAndChangedChunks == ErrorCodes::NamespaceNotFound) {
- Status scheduleStatus = _ensureMajorityPrimaryAndScheduleCollAndChunksTask(
+ _ensureMajorityPrimaryAndScheduleCollAndChunksTask(
opCtx,
nss,
collAndChunkTask{swCollectionAndChangedChunks, maxLoaderVersion, termScheduled});
- if (!scheduleStatus.isOK()) {
- callbackFn(opCtx, scheduleStatus);
- notify->set();
- return;
- }
LOG_CATALOG_REFRESH(1) << "Cache loader remotely refreshed for collection " << nss
<< " from version " << maxLoaderVersion
<< " and no metadata was found.";
- } else if (swCollectionAndChangedChunks.isOK()) {
- auto& collAndChunks = swCollectionAndChangedChunks.getValue();
-
- if (collAndChunks.changedChunks.back().getVersion().epoch() != collAndChunks.epoch) {
- swCollectionAndChangedChunks =
- Status{ErrorCodes::ConflictingOperationInProgress,
- str::stream()
- << "Invalid chunks found when reloading '"
- << nss.toString()
- << "' Previous collection epoch was '"
- << collAndChunks.epoch.toString()
- << "', but found a new epoch '"
- << collAndChunks.changedChunks.back().getVersion().epoch().toString()
- << "'. Collection was dropped and recreated."};
- } else {
- if ((collAndChunks.epoch != maxLoaderVersion.epoch()) ||
- (collAndChunks.changedChunks.back().getVersion() > maxLoaderVersion)) {
- Status scheduleStatus = _ensureMajorityPrimaryAndScheduleCollAndChunksTask(
- opCtx,
- nss,
- collAndChunkTask{
- swCollectionAndChangedChunks, maxLoaderVersion, termScheduled});
- if (!scheduleStatus.isOK()) {
- callbackFn(opCtx, scheduleStatus);
- notify->set();
- return;
- }
- }
+ return swCollectionAndChangedChunks;
+ }
+
+ if (!swCollectionAndChangedChunks.isOK()) {
+ return swCollectionAndChangedChunks;
+ }
+
+ auto& collAndChunks = swCollectionAndChangedChunks.getValue();
+
+ if (collAndChunks.changedChunks.back().getVersion().epoch() != collAndChunks.epoch) {
+ return Status{
+ ErrorCodes::ConflictingOperationInProgress,
+ str::stream() << "Invalid chunks found when reloading '" << nss.toString()
+ << "' Previous collection epoch was '"
+ << collAndChunks.epoch.toString()
+ << "', but found a new epoch '"
+ << collAndChunks.changedChunks.back().getVersion().epoch().toString()
+ << "'. Collection was dropped and recreated."};
+ }
+
+ if ((collAndChunks.epoch != maxLoaderVersion.epoch()) ||
+ (collAndChunks.changedChunks.back().getVersion() > maxLoaderVersion)) {
+ _ensureMajorityPrimaryAndScheduleCollAndChunksTask(
+ opCtx,
+ nss,
+ collAndChunkTask{swCollectionAndChangedChunks, maxLoaderVersion, termScheduled});
+ }
- LOG_CATALOG_REFRESH(1) << "Cache loader remotely refreshed for collection " << nss
- << " from collection version " << maxLoaderVersion
- << " and found collection version "
- << collAndChunks.changedChunks.back().getVersion();
+ LOG_CATALOG_REFRESH(1) << "Cache loader remotely refreshed for collection " << nss
+ << " from collection version " << maxLoaderVersion
+ << " and found collection version "
+ << collAndChunks.changedChunks.back().getVersion();
- // Metadata was found remotely -- otherwise would have received NamespaceNotFound
- // rather than Status::OK(). Return metadata for CatalogCache that's GTE
- // catalogCacheSinceVersion, from the loader's persisted and enqueued metadata.
+ // Metadata was found remotely
+ // -- otherwise we would have received NamespaceNotFound rather than Status::OK().
+ // Return metadata for CatalogCache that's GTE catalogCacheSinceVersion,
+ // from the loader's persisted and enqueued metadata.
- swCollectionAndChangedChunks =
- _getLoaderMetadata(opCtx, nss, catalogCacheSinceVersion, termScheduled);
+ swCollectionAndChangedChunks =
+ _getLoaderMetadata(opCtx, nss, catalogCacheSinceVersion, termScheduled);
+ if (!swCollectionAndChangedChunks.isOK()) {
+ return swCollectionAndChangedChunks;
+ }
- const auto termAfterRefresh = [&] {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- return _term;
- }();
-
- if (termAfterRefresh != termScheduled) {
- // Raising a ConflictingOperationInProgress error here will cause the
- // CatalogCache to attempt the refresh as secondary instead of failing the
- // operation
- swCollectionAndChangedChunks = Status(
- ErrorCodes::ConflictingOperationInProgress,
- str::stream() << "Replication stepdown occurred during refresh for '"
- << nss.toString());
- } else if (swCollectionAndChangedChunks.isOK()) {
- // After finding metadata remotely, we must have found metadata locally.
- invariant(!collAndChunks.changedChunks.empty());
- }
- }
+ const auto termAfterRefresh = [&] {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ return _term;
+ }();
+
+ if (termAfterRefresh != termScheduled) {
+ // Raising a ConflictingOperationInProgress error here will cause the
+ // CatalogCache to attempt the refresh as secondary instead of failing the
+ // operation
+ return Status(ErrorCodes::ConflictingOperationInProgress,
+ str::stream() << "Replication stepdown occurred during refresh for '"
+ << nss.toString());
}
- // Complete the callbackFn work.
- callbackFn(opCtx, std::move(swCollectionAndChangedChunks));
- notify->set();
+ // After finding metadata remotely, we must have found metadata locally.
+ invariant(!collAndChunks.changedChunks.empty());
+
+ return swCollectionAndChangedChunks;
};
// Refresh the loader's metadata from the config server. The caller's request will
// then be serviced from the loader's up-to-date metadata.
- _configServerLoader->getChunksSince(nss, maxLoaderVersion, remoteRefreshCallbackFn);
+ _configServerLoader->getChunksSince(
+ nss,
+ maxLoaderVersion,
+ [this, remoteRefreshFn, callbackFn, notify](auto opCtx, auto swCollectionAndChangedChunks) {
+ // Complete the callbackFn work.
+ callbackFn(opCtx, remoteRefreshFn(opCtx, std::move(swCollectionAndChangedChunks)));
+ notify->set();
+ });
}
void ShardServerCatalogCacheLoader::_runSecondaryGetDatabase(
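The hunk above replaces the callback-invoking remoteRefreshCallbackFn with a remoteRefreshFn that returns a StatusWith and uses early returns for every error path; a thin wrapper passed to getChunksSince is now the only place that touches callbackFn and notify. Below is a minimal, self-contained sketch of that shape. Status, StatusWith, Notification, and Chunks are simplified stand-ins here, not the real MongoDB types, and the OperationContext parameter is omitted for brevity.

#include <functional>
#include <iostream>
#include <optional>
#include <string>
#include <utility>

struct Status {
    bool ok;
    std::string reason;
};

template <typename T>
struct StatusWith {
    Status status;
    std::optional<T> value;  // engaged only when status.ok
};

struct Notification {
    void set() { std::cout << "notified\n"; }
};

using Chunks = std::string;  // placeholder for CollectionAndChangedChunks

int main() {
    // Core refresh logic: errors propagate via early returns instead of
    // being routed through the callback at each failure site.
    auto remoteRefreshFn = [](StatusWith<Chunks> sw) -> StatusWith<Chunks> {
        if (!sw.status.ok)
            return sw;  // propagate the fetch error unchanged
        if (sw.value->empty())
            return {{false, "stale epoch"}, std::nullopt};  // validation error
        return sw;  // success path
    };

    auto callbackFn = [](const StatusWith<Chunks>& sw) {
        std::cout << (sw.status.ok ? *sw.value : sw.status.reason) << '\n';
    };

    Notification notify;

    // Wrapper: the single place that completes the callback and notifies,
    // mirroring the lambda the diff passes to getChunksSince.
    auto wrapper = [&](StatusWith<Chunks> sw) {
        callbackFn(remoteRefreshFn(std::move(sw)));
        notify.set();
    };

    wrapper({{true, ""}, Chunks{"chunks v5"}});         // prints "chunks v5", "notified"
    wrapper({{false, "network error"}, std::nullopt});  // prints "network error", "notified"
}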
@@ -728,36 +723,33 @@ void ShardServerCatalogCacheLoader::_schedulePrimaryGetDatabase(
StringData dbName,
long long termScheduled,
stdx::function<void(OperationContext*, StatusWith<DatabaseType>)> callbackFn) {
- auto remoteRefreshCallbackFn = [ this, name = dbName.toString(), termScheduled, callbackFn ](
+ auto remoteRefreshFn = [ this, name = dbName.toString(), termScheduled ](
OperationContext * opCtx, StatusWith<DatabaseType> swDatabaseType) {
if (swDatabaseType == ErrorCodes::NamespaceNotFound) {
- Status scheduleStatus = _ensureMajorityPrimaryAndScheduleDbTask(
+ _ensureMajorityPrimaryAndScheduleDbTask(
opCtx, name, DBTask{swDatabaseType, termScheduled});
- if (!scheduleStatus.isOK()) {
- callbackFn(opCtx, scheduleStatus);
- return;
- }
LOG_CATALOG_REFRESH(1) << "Cache loader remotely refreshed for database " << name
<< " and found the database has been dropped.";
+ return swDatabaseType;
+ }
- } else if (swDatabaseType.isOK()) {
- Status scheduleStatus = _ensureMajorityPrimaryAndScheduleDbTask(
- opCtx, name, DBTask{swDatabaseType, termScheduled});
- if (!scheduleStatus.isOK()) {
- callbackFn(opCtx, scheduleStatus);
- return;
- }
-
- LOG_CATALOG_REFRESH(1) << "Cache loader remotely refreshed for database " << name
- << " and found " << swDatabaseType.getValue().toBSON();
+ if (!swDatabaseType.isOK()) {
+ return swDatabaseType;
}
- // Complete the callbackFn work.
- callbackFn(opCtx, std::move(swDatabaseType));
+ _ensureMajorityPrimaryAndScheduleDbTask(opCtx, name, DBTask{swDatabaseType, termScheduled});
+
+ LOG_CATALOG_REFRESH(1) << "Cache loader remotely refreshed for database " << name
+ << " and found " << swDatabaseType.getValue().toBSON();
+
+ return swDatabaseType;
};
- _configServerLoader->getDatabase(dbName, remoteRefreshCallbackFn);
+ _configServerLoader->getDatabase(
+ dbName, [this, remoteRefreshFn, callbackFn](auto opCtx, auto swDatabaseType) {
+ callbackFn(opCtx, remoteRefreshFn(opCtx, std::move(swDatabaseType)));
+ });
}
StatusWith<CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_getLoaderMetadata(
@@ -871,51 +863,40 @@ std::pair<bool, CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_getE
return std::make_pair(true, collAndChunks);
}
-Status ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleCollAndChunksTask(
+void ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleCollAndChunksTask(
OperationContext* opCtx, const NamespaceString& nss, collAndChunkTask task) {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- const bool wasEmpty = _collAndChunkTaskLists[nss].empty();
- _collAndChunkTaskLists[nss].addTask(std::move(task));
- if (!wasEmpty) {
- return Status::OK();
- }
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
- Status status = _threadPool.schedule([this, nss]() { _runCollAndChunksTasks(nss); });
- if (!status.isOK()) {
- LOG(0) << "Cache loader failed to schedule persisted metadata update"
- << " task for namespace '" << nss << "' due to '" << redact(status)
- << "'. Clearing task list so that scheduling"
- << " will be attempted by the next caller to refresh this namespace.";
+ auto& list = _collAndChunkTaskLists[nss];
+ auto wasEmpty = list.empty();
+ list.addTask(std::move(task));
- _collAndChunkTaskLists.erase(nss);
+ if (!wasEmpty)
+ return;
}
- return status;
+ invariant(_threadPool.schedule([this, nss]() { _runCollAndChunksTasks(nss); }));
}
-Status ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleDbTask(
- OperationContext* opCtx, StringData dbName, DBTask task) {
+void ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleDbTask(OperationContext* opCtx,
+ StringData dbName,
+ DBTask task) {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- const bool wasEmpty = _dbTaskLists[dbName.toString()].empty();
- _dbTaskLists[dbName.toString()].addTask(std::move(task));
- if (!wasEmpty) {
- return Status::OK();
- }
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
- Status status =
- _threadPool.schedule([ this, name = dbName.toString() ]() { _runDbTasks(name); });
- if (!status.isOK()) {
- LOG(0) << "Cache loader failed to schedule persisted metadata update"
- << " task for db '" << dbName << "' due to '" << redact(status)
- << "'. Clearing task list so that scheduling"
- << " will be attempted by the next caller to refresh this namespace.";
+ auto& list = _dbTaskLists[dbName.toString()];
+ auto wasEmpty = list.empty();
+ list.addTask(std::move(task));
- _dbTaskLists.erase(dbName.toString());
+ if (!wasEmpty)
+ return;
}
- return status;
+ auto name = dbName.toString();
+ invariant(_threadPool.schedule([this, name]() { _runDbTasks(name); }));
}
void ShardServerCatalogCacheLoader::_runCollAndChunksTasks(const NamespaceString& nss) {
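The hunk above changes both _ensureMajorityPrimaryAndScheduleCollAndChunksTask and _ensureMajorityPrimaryAndScheduleDbTask to return void: the task is appended under the mutex, only the caller that finds the list empty schedules the runner, and a _threadPool.schedule() failure becomes an invariant rather than a Status handed back to the caller. Below is a minimal sketch of that shape using standard-library stand-ins (std::thread for the thread pool, assert for invariant; all names hypothetical).

#include <cassert>
#include <chrono>
#include <deque>
#include <functional>
#include <iostream>
#include <mutex>
#include <thread>

std::mutex g_mutex;
std::deque<std::function<void()>> g_taskList;  // stands in for _collAndChunkTaskLists[nss]

// Stand-in for _threadPool.schedule(); returns false only on failure.
bool schedule(std::function<void()> fn) {
    std::thread(std::move(fn)).detach();
    return true;
}

void runTasks() {
    // Drain the list, holding the lock only to pop each task. (A simplification:
    // the real loader keeps the active task in its list until it completes.)
    for (;;) {
        std::function<void()> task;
        {
            std::lock_guard<std::mutex> lock(g_mutex);
            if (g_taskList.empty())
                return;
            task = std::move(g_taskList.front());
            g_taskList.pop_front();
        }
        task();
    }
}

void ensureScheduled(std::function<void()> task) {
    {
        std::lock_guard<std::mutex> lock(g_mutex);
        const bool wasEmpty = g_taskList.empty();
        g_taskList.push_back(std::move(task));
        if (!wasEmpty)
            return;  // a runner already exists and will pick this task up
    }
    // Scheduled outside the lock, as in the diff. Unlike MongoDB's invariant(),
    // assert() is compiled out under NDEBUG, so the call stays outside it.
    const bool ok = schedule(runTasks);
    assert(ok);
    (void)ok;
}

int main() {
    ensureScheduled([] { std::cout << "task 1\n"; });
    ensureScheduled([] { std::cout << "task 2\n"; });
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
}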
diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.h b/src/mongo/db/s/shard_server_catalog_cache_loader.h
index 62abc9802e4..c578217a3d5 100644
--- a/src/mongo/db/s/shard_server_catalog_cache_loader.h
+++ b/src/mongo/db/s/shard_server_catalog_cache_loader.h
@@ -430,13 +430,13 @@ private:
*
* Only run on the shard primary.
*/
- Status _ensureMajorityPrimaryAndScheduleCollAndChunksTask(OperationContext* opCtx,
- const NamespaceString& nss,
- collAndChunkTask task);
+ void _ensureMajorityPrimaryAndScheduleCollAndChunksTask(OperationContext* opCtx,
+ const NamespaceString& nss,
+ collAndChunkTask task);
- Status _ensureMajorityPrimaryAndScheduleDbTask(OperationContext* opCtx,
- StringData dbName,
- DBTask task);
+ void _ensureMajorityPrimaryAndScheduleDbTask(OperationContext* opCtx,
+ StringData dbName,
+ DBTask task);
/**
* Schedules tasks in the 'nss' task list to execute until the task list is depleted.
*