From 8c96607bfbb82f2cf161177b4546f0ebeb07042a Mon Sep 17 00:00:00 2001 From: Tommaso Tocci Date: Wed, 23 Mar 2022 08:01:38 +0000 Subject: SERVER-64725 Make ShardRegistry::periodicReloader interruptible (cherry picked from commit ecff61d82253c9103e94ef50327d5c6cf8d64c1c) --- src/mongo/s/client/shard_registry.cpp | 102 ++++++++++------------------------ src/mongo/s/client/shard_registry.h | 6 +- 2 files changed, 31 insertions(+), 77 deletions(-) diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp index b09d38fc342..fd0b4d56cc4 100644 --- a/src/mongo/s/client/shard_registry.cpp +++ b/src/mongo/s/client/shard_registry.cpp @@ -211,26 +211,33 @@ void ShardRegistry::startupPeriodicReloader(OperationContext* opCtx) { // construct task executor auto net = executor::makeNetworkInterface("ShardRegistryUpdater", nullptr, std::move(hookList)); auto netPtr = net.get(); - _executor = std::make_unique( + _executor = std::make_shared( std::make_unique(netPtr), std::move(net)); LOGV2_DEBUG(22724, 1, "Starting up task executor for periodic reloading of ShardRegistry"); _executor->startup(); - auto status = - _executor->scheduleWork([this](const CallbackArgs& cbArgs) { _periodicReload(cbArgs); }); - - if (status.getStatus() == ErrorCodes::ShutdownInProgress) { - LOGV2_DEBUG( - 22725, 1, "Can't schedule Shard Registry reload. Executor shutdown in progress"); - return; - } - - if (!status.isOK()) { - LOGV2_FATAL(40252, - "Error scheduling shard registry reload caused by {error}", - "Error scheduling shard registry reload", - "error"_attr = redact(status.getStatus())); - } + AsyncTry([this] { + LOGV2_DEBUG(22726, 1, "Reloading shardRegistry"); + return _reloadInternal(); + }) + .until([](auto sw) { + if (!sw.isOK()) { + LOGV2(22727, + "Error running periodic reload of shard registry", + "error"_attr = redact(sw.getStatus()), + "shardRegistryReloadInterval"_attr = kRefreshPeriod); + } + // Continue until the _executor will shutdown + return false; + }) + .withDelayBetweenIterations(kRefreshPeriod) // This call is optional. + .on(_executor, CancellationToken::uncancelable()) + .getAsync([](auto sw) { + LOGV2_DEBUG(22725, + 1, + "Exiting periodic shard registry reloader", + "reason"_attr = redact(sw.getStatus())); + }); } void ShardRegistry::shutdownPeriodicReloader() { @@ -254,52 +261,6 @@ void ShardRegistry::shutdown() { } } -void ShardRegistry::_periodicReload(const CallbackArgs& cbArgs) { - LOGV2_DEBUG(22726, 1, "Reloading shardRegistry"); - if (!cbArgs.status.isOK()) { - LOGV2_WARNING(22734, - "Error reloading shard registry caused by {error}", - "Error reloading shard registry", - "error"_attr = redact(cbArgs.status)); - return; - } - - ThreadClient tc("shard-registry-reload", getGlobalServiceContext()); - - auto opCtx = tc->makeOperationContext(); - - auto refreshPeriod = kRefreshPeriod; - - try { - reload(opCtx.get()); - } catch (const DBException& e) { - LOGV2(22727, - "Error running periodic reload of shard registry caused by {error}; will retry after " - "{shardRegistryReloadInterval}", - "Error running periodic reload of shard registry", - "error"_attr = redact(e), - "shardRegistryReloadInterval"_attr = refreshPeriod); - } - - // reschedule itself - auto status = - _executor->scheduleWorkAt(_executor->now() + refreshPeriod, - [this](const CallbackArgs& cbArgs) { _periodicReload(cbArgs); }); - - if (status.getStatus() == ErrorCodes::ShutdownInProgress) { - LOGV2_DEBUG( - 22728, 1, "Error scheduling shard registry reload. Executor shutdown in progress"); - return; - } - - if (!status.isOK()) { - LOGV2_FATAL(40253, - "Error scheduling shard registry reload caused by {error}", - "Error scheduling shard registry reload", - "error"_attr = redact(status.getStatus())); - } -} - ConnectionString ShardRegistry::getConfigServerConnectionString() const { return getConfigShard()->getConnString(); } @@ -442,16 +403,11 @@ void ShardRegistry::reload(OperationContext* opCtx) { if (MONGO_unlikely(TestingProctor::instance().isEnabled())) { // TODO SERVER-62152 investigate hang on reload in unit tests // Some unit tests don't support running the reload's AsyncTry on the fixed executor. - _reloadInternal(opCtx); + _reloadInternal().get(opCtx); } else { - AsyncTry([=]() mutable { - ThreadClient tc("ShardRegistry::reload", getGlobalServiceContext()); - auto innerOpCtx = tc->makeOperationContext(); - - _reloadInternal(innerOpCtx.get()); - }) - .until([](Status status) mutable { - return status != ErrorCodes::ReadConcernMajorityNotAvailableYet; + AsyncTry([=]() mutable { return _reloadInternal(); }) + .until([](auto sw) mutable { + return sw.getStatus() != ErrorCodes::ReadConcernMajorityNotAvailableYet; }) .withBackoffBetweenIterations(kExponentialBackoff) .on(Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(), @@ -461,13 +417,13 @@ void ShardRegistry::reload(OperationContext* opCtx) { } } -void ShardRegistry::_reloadInternal(OperationContext* opCtx) { +SharedSemiFuture ShardRegistry::_reloadInternal() { // Make the next acquire do a lookup. auto value = _forceReloadIncrement.addAndFetch(1); LOGV2_DEBUG(4620253, 2, "Forcing ShardRegistry reload", "newForceReloadIncrement"_attr = value); // Force it to actually happen now. - _getData(opCtx); + return _getDataAsync(); } void ShardRegistry::clearEntries() { diff --git a/src/mongo/s/client/shard_registry.h b/src/mongo/s/client/shard_registry.h index fa241007731..0bb6196d2c3 100644 --- a/src/mongo/s/client/shard_registry.h +++ b/src/mongo/s/client/shard_registry.h @@ -438,9 +438,7 @@ private: void _initializeCacheIfNecessary() const; - void _periodicReload(const executor::TaskExecutor::CallbackArgs& cbArgs); - - void _reloadInternal(OperationContext* opCtx); + SharedSemiFuture _reloadInternal(); /** * Factory to create shards. Never changed after startup so safe to access outside of _mutex. @@ -463,7 +461,7 @@ private: ThreadPool _threadPool; // Executor for periodically reloading the registry (ie. in which _periodicReload() runs). - std::unique_ptr _executor{}; + std::shared_ptr _executor{}; mutable Mutex _cacheMutex = MONGO_MAKE_LATCH("ShardRegistry::_cacheMutex"); std::unique_ptr _cache; -- cgit v1.2.1