summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/s/shard_server_catalog_cache_loader.cpp')
-rw-r--r--src/mongo/db/s/shard_server_catalog_cache_loader.cpp109
1 files changed, 55 insertions, 54 deletions
diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
index 36700332edb..4384aeeed72 100644
--- a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
+++ b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
@@ -394,8 +394,10 @@ std::shared_ptr<Notification<void>> ShardServerCatalogCacheLoader::getChunksSinc
return std::make_tuple(_role == ReplicaSetRole::Primary, _term);
}();
- uassertStatusOK(_threadPool.schedule(
- [ this, nss, version, callbackFn, notify, isPrimary, term ]() noexcept {
+ _threadPool.schedule(
+ [ this, nss, version, callbackFn, notify, isPrimary, term ](auto status) noexcept {
+ invariant(status);
+
auto context = _contexts.makeOperationContext(*Client::getCurrent());
auto const opCtx = context.opCtx();
@@ -420,7 +422,7 @@ std::shared_ptr<Notification<void>> ShardServerCatalogCacheLoader::getChunksSinc
callbackFn(opCtx, ex.toStatus());
notify->set();
}
- }));
+ });
return notify;
}
@@ -441,36 +443,38 @@ void ShardServerCatalogCacheLoader::getDatabase(
isPrimary = (_role == ReplicaSetRole::Primary);
}
- uassertStatusOK(_threadPool.schedule(
- [ this, name = dbName.toString(), callbackFn, isPrimary, currentTerm ]() noexcept {
- auto context = _contexts.makeOperationContext(*Client::getCurrent());
+ _threadPool.schedule([ this, name = dbName.toString(), callbackFn, isPrimary, currentTerm ](
+ auto status) noexcept {
+ invariant(status);
- {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
-
- // We may have missed an OperationContextGroup interrupt since this operation began
- // but before the OperationContext was added to the group. So we'll check that
- // we're still in the same _term.
- if (_term != currentTerm) {
- callbackFn(context.opCtx(),
- Status{ErrorCodes::Interrupted,
- "Unable to refresh routing table because replica set state "
- "changed or node is shutting down."});
- return;
- }
+ auto context = _contexts.makeOperationContext(*Client::getCurrent());
+
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+
+ // We may have missed an OperationContextGroup interrupt since this operation began
+ // but before the OperationContext was added to the group. So we'll check that
+ // we're still in the same _term.
+ if (_term != currentTerm) {
+ callbackFn(context.opCtx(),
+ Status{ErrorCodes::Interrupted,
+ "Unable to refresh routing table because replica set state "
+ "changed or node is shutting down."});
+ return;
}
+ }
- try {
- if (isPrimary) {
- _schedulePrimaryGetDatabase(
- context.opCtx(), StringData(name), currentTerm, callbackFn);
- } else {
- _runSecondaryGetDatabase(context.opCtx(), StringData(name), callbackFn);
- }
- } catch (const DBException& ex) {
- callbackFn(context.opCtx(), ex.toStatus());
+ try {
+ if (isPrimary) {
+ _schedulePrimaryGetDatabase(
+ context.opCtx(), StringData(name), currentTerm, callbackFn);
+ } else {
+ _runSecondaryGetDatabase(context.opCtx(), StringData(name), callbackFn);
}
- }));
+ } catch (const DBException& ex) {
+ callbackFn(context.opCtx(), ex.toStatus());
+ }
+ });
}
void ShardServerCatalogCacheLoader::waitForCollectionFlush(OperationContext* opCtx,
@@ -680,9 +684,8 @@ void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince(
}();
if (termAfterRefresh != termScheduled) {
- // Raising a ConflictingOperationInProgress error here will cause the
- // CatalogCache to attempt the refresh as secondary instead of failing the
- // operation
+ // Raising a ConflictingOperationInProgress error here will cause the CatalogCache to
+ // attempt the refresh as secondary instead of failing the operation
return Status(ErrorCodes::ConflictingOperationInProgress,
str::stream() << "Replication stepdown occurred during refresh for '"
<< nss.toString());
@@ -877,7 +880,11 @@ void ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleCollAndChun
return;
}
- invariant(_threadPool.schedule([this, nss]() { _runCollAndChunksTasks(nss); }));
+ _threadPool.schedule([this, nss](auto status) {
+ invariant(status);
+
+ _runCollAndChunksTasks(nss);
+ });
}
void ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleDbTask(OperationContext* opCtx,
@@ -895,8 +902,11 @@ void ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleDbTask(Oper
return;
}
- auto name = dbName.toString();
- invariant(_threadPool.schedule([this, name]() { _runDbTasks(name); }));
+ _threadPool.schedule([ this, name = dbName.toString() ](auto status) {
+ invariant(status);
+
+ _runDbTasks(name);
+ });
}
void ShardServerCatalogCacheLoader::_runCollAndChunksTasks(const NamespaceString& nss) {
@@ -924,15 +934,11 @@ void ShardServerCatalogCacheLoader::_runCollAndChunksTasks(const NamespaceString
// Schedule more work if there is any
if (!_collAndChunkTaskLists[nss].empty()) {
- Status status = _threadPool.schedule([this, nss]() { _runCollAndChunksTasks(nss); });
- if (!status.isOK()) {
- LOG(0) << "Cache loader failed to schedule a persisted metadata update"
- << " task for namespace '" << nss << "' due to '" << redact(status)
- << "'. Clearing task list so that scheduling will be attempted by the next"
- << " caller to refresh this namespace.";
-
- _collAndChunkTaskLists.erase(nss);
- }
+ _threadPool.schedule([this, nss](auto status) {
+ invariant(status);
+
+ _runCollAndChunksTasks(nss);
+ });
} else {
_collAndChunkTaskLists.erase(nss);
}
@@ -962,16 +968,11 @@ void ShardServerCatalogCacheLoader::_runDbTasks(StringData dbName) {
// Schedule more work if there is any
if (!_dbTaskLists[dbName.toString()].empty()) {
- Status status =
- _threadPool.schedule([ this, name = dbName.toString() ]() { _runDbTasks(name); });
- if (!status.isOK()) {
- LOG(0) << "Cache loader failed to schedule a persisted metadata update"
- << " task for namespace '" << dbName << "' due to '" << redact(status)
- << "'. Clearing task list so that scheduling will be attempted by the next"
- << " caller to refresh this namespace.";
-
- _dbTaskLists.erase(dbName.toString());
- }
+ _threadPool.schedule([ this, name = dbName.toString() ](auto status) {
+ invariant(status);
+
+ _runDbTasks(name);
+ });
} else {
_dbTaskLists.erase(dbName.toString());
}