diff options
Diffstat (limited to 'src/mongo/db/s/shard_server_catalog_cache_loader.cpp')
-rw-r--r-- | src/mongo/db/s/shard_server_catalog_cache_loader.cpp | 109 |
1 files changed, 55 insertions, 54 deletions
diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp index 36700332edb..4384aeeed72 100644 --- a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp +++ b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp @@ -394,8 +394,10 @@ std::shared_ptr<Notification<void>> ShardServerCatalogCacheLoader::getChunksSinc return std::make_tuple(_role == ReplicaSetRole::Primary, _term); }(); - uassertStatusOK(_threadPool.schedule( - [ this, nss, version, callbackFn, notify, isPrimary, term ]() noexcept { + _threadPool.schedule( + [ this, nss, version, callbackFn, notify, isPrimary, term ](auto status) noexcept { + invariant(status); + auto context = _contexts.makeOperationContext(*Client::getCurrent()); auto const opCtx = context.opCtx(); @@ -420,7 +422,7 @@ std::shared_ptr<Notification<void>> ShardServerCatalogCacheLoader::getChunksSinc callbackFn(opCtx, ex.toStatus()); notify->set(); } - })); + }); return notify; } @@ -441,36 +443,38 @@ void ShardServerCatalogCacheLoader::getDatabase( isPrimary = (_role == ReplicaSetRole::Primary); } - uassertStatusOK(_threadPool.schedule( - [ this, name = dbName.toString(), callbackFn, isPrimary, currentTerm ]() noexcept { - auto context = _contexts.makeOperationContext(*Client::getCurrent()); + _threadPool.schedule([ this, name = dbName.toString(), callbackFn, isPrimary, currentTerm ]( + auto status) noexcept { + invariant(status); - { - stdx::lock_guard<stdx::mutex> lock(_mutex); - - // We may have missed an OperationContextGroup interrupt since this operation began - // but before the OperationContext was added to the group. So we'll check that - // we're still in the same _term. - if (_term != currentTerm) { - callbackFn(context.opCtx(), - Status{ErrorCodes::Interrupted, - "Unable to refresh routing table because replica set state " - "changed or node is shutting down."}); - return; - } + auto context = _contexts.makeOperationContext(*Client::getCurrent()); + + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + + // We may have missed an OperationContextGroup interrupt since this operation began + // but before the OperationContext was added to the group. So we'll check that + // we're still in the same _term. + if (_term != currentTerm) { + callbackFn(context.opCtx(), + Status{ErrorCodes::Interrupted, + "Unable to refresh routing table because replica set state " + "changed or node is shutting down."}); + return; } + } - try { - if (isPrimary) { - _schedulePrimaryGetDatabase( - context.opCtx(), StringData(name), currentTerm, callbackFn); - } else { - _runSecondaryGetDatabase(context.opCtx(), StringData(name), callbackFn); - } - } catch (const DBException& ex) { - callbackFn(context.opCtx(), ex.toStatus()); + try { + if (isPrimary) { + _schedulePrimaryGetDatabase( + context.opCtx(), StringData(name), currentTerm, callbackFn); + } else { + _runSecondaryGetDatabase(context.opCtx(), StringData(name), callbackFn); } - })); + } catch (const DBException& ex) { + callbackFn(context.opCtx(), ex.toStatus()); + } + }); } void ShardServerCatalogCacheLoader::waitForCollectionFlush(OperationContext* opCtx, @@ -680,9 +684,8 @@ void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince( }(); if (termAfterRefresh != termScheduled) { - // Raising a ConflictingOperationInProgress error here will cause the - // CatalogCache to attempt the refresh as secondary instead of failing the - // operation + // Raising a ConflictingOperationInProgress error here will cause the CatalogCache to + // attempt the refresh as secondary instead of failing the operation return Status(ErrorCodes::ConflictingOperationInProgress, str::stream() << "Replication stepdown occurred during refresh for '" << nss.toString()); @@ -877,7 +880,11 @@ void ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleCollAndChun return; } - invariant(_threadPool.schedule([this, nss]() { _runCollAndChunksTasks(nss); })); + _threadPool.schedule([this, nss](auto status) { + invariant(status); + + _runCollAndChunksTasks(nss); + }); } void ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleDbTask(OperationContext* opCtx, @@ -895,8 +902,11 @@ void ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleDbTask(Oper return; } - auto name = dbName.toString(); - invariant(_threadPool.schedule([this, name]() { _runDbTasks(name); })); + _threadPool.schedule([ this, name = dbName.toString() ](auto status) { + invariant(status); + + _runDbTasks(name); + }); } void ShardServerCatalogCacheLoader::_runCollAndChunksTasks(const NamespaceString& nss) { @@ -924,15 +934,11 @@ void ShardServerCatalogCacheLoader::_runCollAndChunksTasks(const NamespaceString // Schedule more work if there is any if (!_collAndChunkTaskLists[nss].empty()) { - Status status = _threadPool.schedule([this, nss]() { _runCollAndChunksTasks(nss); }); - if (!status.isOK()) { - LOG(0) << "Cache loader failed to schedule a persisted metadata update" - << " task for namespace '" << nss << "' due to '" << redact(status) - << "'. Clearing task list so that scheduling will be attempted by the next" - << " caller to refresh this namespace."; - - _collAndChunkTaskLists.erase(nss); - } + _threadPool.schedule([this, nss](auto status) { + invariant(status); + + _runCollAndChunksTasks(nss); + }); } else { _collAndChunkTaskLists.erase(nss); } @@ -962,16 +968,11 @@ void ShardServerCatalogCacheLoader::_runDbTasks(StringData dbName) { // Schedule more work if there is any if (!_dbTaskLists[dbName.toString()].empty()) { - Status status = - _threadPool.schedule([ this, name = dbName.toString() ]() { _runDbTasks(name); }); - if (!status.isOK()) { - LOG(0) << "Cache loader failed to schedule a persisted metadata update" - << " task for namespace '" << dbName << "' due to '" << redact(status) - << "'. Clearing task list so that scheduling will be attempted by the next" - << " caller to refresh this namespace."; - - _dbTaskLists.erase(dbName.toString()); - } + _threadPool.schedule([ this, name = dbName.toString() ](auto status) { + invariant(status); + + _runDbTasks(name); + }); } else { _dbTaskLists.erase(dbName.toString()); } |