diff options
author | Tommaso Tocci <tommaso.tocci@mongodb.com> | 2022-03-23 08:01:38 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-07-11 17:00:38 +0000 |
commit | 5dcd267b09a726ef041be925a0b970bfa25ca6ff (patch) | |
tree | f475fafe4a243712911a108693945f72d00a7693 | |
parent | 47d2c0a6db3d4d2968b9268eae67dc8575dbad2c (diff) | |
download | mongo-5dcd267b09a726ef041be925a0b970bfa25ca6ff.tar.gz |
SERVER-64725 Make ShardRegistry::periodicReloader interruptible
(cherry picked from commit ecff61d82253c9103e94ef50327d5c6cf8d64c1c)
-rw-r--r-- | src/mongo/s/client/shard_registry.cpp | 102 | ||||
-rw-r--r-- | src/mongo/s/client/shard_registry.h | 6 |
2 files changed, 31 insertions, 77 deletions
diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp index 5e1a715199a..922f9f7a387 100644 --- a/src/mongo/s/client/shard_registry.cpp +++ b/src/mongo/s/client/shard_registry.cpp @@ -227,26 +227,33 @@ void ShardRegistry::startupPeriodicReloader(OperationContext* opCtx) { // construct task executor auto net = executor::makeNetworkInterface("ShardRegistryUpdater", nullptr, std::move(hookList)); auto netPtr = net.get(); - _executor = std::make_unique<executor::ThreadPoolTaskExecutor>( + _executor = std::make_shared<executor::ThreadPoolTaskExecutor>( std::make_unique<executor::NetworkInterfaceThreadPool>(netPtr), std::move(net)); LOGV2_DEBUG(22724, 1, "Starting up task executor for periodic reloading of ShardRegistry"); _executor->startup(); - auto status = - _executor->scheduleWork([this](const CallbackArgs& cbArgs) { _periodicReload(cbArgs); }); - - if (status.getStatus() == ErrorCodes::ShutdownInProgress) { - LOGV2_DEBUG( - 22725, 1, "Can't schedule Shard Registry reload. Executor shutdown in progress"); - return; - } - - if (!status.isOK()) { - LOGV2_FATAL(40252, - "Error scheduling shard registry reload caused by {error}", - "Error scheduling shard registry reload", - "error"_attr = redact(status.getStatus())); - } + AsyncTry([this] { + LOGV2_DEBUG(22726, 1, "Reloading shardRegistry"); + return _reloadInternal(); + }) + .until([](auto sw) { + if (!sw.isOK()) { + LOGV2(22727, + "Error running periodic reload of shard registry", + "error"_attr = redact(sw.getStatus()), + "shardRegistryReloadInterval"_attr = kRefreshPeriod); + } + // Continue until the _executor will shutdown + return false; + }) + .withDelayBetweenIterations(kRefreshPeriod) // This call is optional. + .on(_executor, CancellationToken::uncancelable()) + .getAsync([](auto sw) { + LOGV2_DEBUG(22725, + 1, + "Exiting periodic shard registry reloader", + "reason"_attr = redact(sw.getStatus())); + }); } void ShardRegistry::shutdownPeriodicReloader() { @@ -270,52 +277,6 @@ void ShardRegistry::shutdown() { } } -void ShardRegistry::_periodicReload(const CallbackArgs& cbArgs) { - LOGV2_DEBUG(22726, 1, "Reloading shardRegistry"); - if (!cbArgs.status.isOK()) { - LOGV2_WARNING(22734, - "Error reloading shard registry caused by {error}", - "Error reloading shard registry", - "error"_attr = redact(cbArgs.status)); - return; - } - - ThreadClient tc("shard-registry-reload", getGlobalServiceContext()); - - auto opCtx = tc->makeOperationContext(); - - auto refreshPeriod = kRefreshPeriod; - - try { - reload(opCtx.get()); - } catch (const DBException& e) { - LOGV2(22727, - "Error running periodic reload of shard registry caused by {error}; will retry after " - "{shardRegistryReloadInterval}", - "Error running periodic reload of shard registry", - "error"_attr = redact(e), - "shardRegistryReloadInterval"_attr = refreshPeriod); - } - - // reschedule itself - auto status = - _executor->scheduleWorkAt(_executor->now() + refreshPeriod, - [this](const CallbackArgs& cbArgs) { _periodicReload(cbArgs); }); - - if (status.getStatus() == ErrorCodes::ShutdownInProgress) { - LOGV2_DEBUG( - 22728, 1, "Error scheduling shard registry reload. Executor shutdown in progress"); - return; - } - - if (!status.isOK()) { - LOGV2_FATAL(40253, - "Error scheduling shard registry reload caused by {error}", - "Error scheduling shard registry reload", - "error"_attr = redact(status.getStatus())); - } -} - ConnectionString ShardRegistry::getConfigServerConnectionString() const { return getConfigShard()->getConnString(); } @@ -458,16 +419,11 @@ void ShardRegistry::reload(OperationContext* opCtx) { if (MONGO_unlikely(TestingProctor::instance().isEnabled())) { // TODO SERVER-62152 investigate hang on reload in unit tests // Some unit tests don't support running the reload's AsyncTry on the fixed executor. - _reloadInternal(opCtx); + _reloadInternal().get(opCtx); } else { - AsyncTry([=]() mutable { - ThreadClient tc("ShardRegistry::reload", getGlobalServiceContext()); - auto innerOpCtx = tc->makeOperationContext(); - - _reloadInternal(innerOpCtx.get()); - }) - .until([](Status status) mutable { - return status != ErrorCodes::ReadConcernMajorityNotAvailableYet; + AsyncTry([=]() mutable { return _reloadInternal(); }) + .until([](auto sw) mutable { + return sw.getStatus() != ErrorCodes::ReadConcernMajorityNotAvailableYet; }) .withBackoffBetweenIterations(kExponentialBackoff) .on(Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(), @@ -477,13 +433,13 @@ void ShardRegistry::reload(OperationContext* opCtx) { } } -void ShardRegistry::_reloadInternal(OperationContext* opCtx) { +SharedSemiFuture<ShardRegistry::Cache::ValueHandle> ShardRegistry::_reloadInternal() { // Make the next acquire do a lookup. auto value = _forceReloadIncrement.addAndFetch(1); LOGV2_DEBUG(4620253, 2, "Forcing ShardRegistry reload", "newForceReloadIncrement"_attr = value); // Force it to actually happen now. - _getData(opCtx); + return _getDataAsync(); } void ShardRegistry::clearEntries() { diff --git a/src/mongo/s/client/shard_registry.h b/src/mongo/s/client/shard_registry.h index fa241007731..0bb6196d2c3 100644 --- a/src/mongo/s/client/shard_registry.h +++ b/src/mongo/s/client/shard_registry.h @@ -438,9 +438,7 @@ private: void _initializeCacheIfNecessary() const; - void _periodicReload(const executor::TaskExecutor::CallbackArgs& cbArgs); - - void _reloadInternal(OperationContext* opCtx); + SharedSemiFuture<Cache::ValueHandle> _reloadInternal(); /** * Factory to create shards. Never changed after startup so safe to access outside of _mutex. @@ -463,7 +461,7 @@ private: ThreadPool _threadPool; // Executor for periodically reloading the registry (ie. in which _periodicReload() runs). - std::unique_ptr<executor::TaskExecutor> _executor{}; + std::shared_ptr<executor::TaskExecutor> _executor{}; mutable Mutex _cacheMutex = MONGO_MAKE_LATCH("ShardRegistry::_cacheMutex"); std::unique_ptr<Cache> _cache; |