summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Tommaso Tocci <tommaso.tocci@mongodb.com> 2022-03-23 08:01:38 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com> 2022-04-14 14:36:28 +0000
commit 8c96607bfbb82f2cf161177b4546f0ebeb07042a (patch)
tree bdd1bee1c4eb4b37f8b9a36d470aae3f7f0e1d83
parent 6a1b4f309d741385eb49b1f87ebe94a2781c1411 (diff)
download mongo-8c96607bfbb82f2cf161177b4546f0ebeb07042a.tar.gz
SERVER-64725 Make ShardRegistry::periodicReloader interruptible
(cherry picked from commit ecff61d82253c9103e94ef50327d5c6cf8d64c1c)
-rw-r--r-- src/mongo/s/client/shard_registry.cpp 102
-rw-r--r-- src/mongo/s/client/shard_registry.h 6
2 files changed, 31 insertions, 77 deletions
diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp
index b09d38fc342..fd0b4d56cc4 100644
--- a/src/mongo/s/client/shard_registry.cpp
+++ b/src/mongo/s/client/shard_registry.cpp
@@ -211,26 +211,33 @@ void ShardRegistry::startupPeriodicReloader(OperationContext* opCtx) {
// construct task executor
auto net = executor::makeNetworkInterface("ShardRegistryUpdater", nullptr, std::move(hookList));
auto netPtr = net.get();
- _executor = std::make_unique<executor::ThreadPoolTaskExecutor>(
+ _executor = std::make_shared<executor::ThreadPoolTaskExecutor>(
std::make_unique<executor::NetworkInterfaceThreadPool>(netPtr), std::move(net));
LOGV2_DEBUG(22724, 1, "Starting up task executor for periodic reloading of ShardRegistry");
_executor->startup();
- auto status =
- _executor->scheduleWork([this](const CallbackArgs& cbArgs) { _periodicReload(cbArgs); });
-
- if (status.getStatus() == ErrorCodes::ShutdownInProgress) {
- LOGV2_DEBUG(
- 22725, 1, "Can't schedule Shard Registry reload. Executor shutdown in progress");
- return;
- }
-
- if (!status.isOK()) {
- LOGV2_FATAL(40252,
- "Error scheduling shard registry reload caused by {error}",
- "Error scheduling shard registry reload",
- "error"_attr = redact(status.getStatus()));
- }
+ AsyncTry([this] {
+ LOGV2_DEBUG(22726, 1, "Reloading shardRegistry");
+ return _reloadInternal();
+ })
+ .until([](auto sw) {
+ if (!sw.isOK()) {
+ LOGV2(22727,
+ "Error running periodic reload of shard registry",
+ "error"_attr = redact(sw.getStatus()),
+ "shardRegistryReloadInterval"_attr = kRefreshPeriod);
+ }
+ // Continue until the _executor will shutdown
+ return false;
+ })
+ .withDelayBetweenIterations(kRefreshPeriod) // This call is optional.
+ .on(_executor, CancellationToken::uncancelable())
+ .getAsync([](auto sw) {
+ LOGV2_DEBUG(22725,
+ 1,
+ "Exiting periodic shard registry reloader",
+ "reason"_attr = redact(sw.getStatus()));
+ });
}
void ShardRegistry::shutdownPeriodicReloader() {
@@ -254,52 +261,6 @@ void ShardRegistry::shutdown() {
}
}
-void ShardRegistry::_periodicReload(const CallbackArgs& cbArgs) {
- LOGV2_DEBUG(22726, 1, "Reloading shardRegistry");
- if (!cbArgs.status.isOK()) {
- LOGV2_WARNING(22734,
- "Error reloading shard registry caused by {error}",
- "Error reloading shard registry",
- "error"_attr = redact(cbArgs.status));
- return;
- }
-
- ThreadClient tc("shard-registry-reload", getGlobalServiceContext());
-
- auto opCtx = tc->makeOperationContext();
-
- auto refreshPeriod = kRefreshPeriod;
-
- try {
- reload(opCtx.get());
- } catch (const DBException& e) {
- LOGV2(22727,
- "Error running periodic reload of shard registry caused by {error}; will retry after "
- "{shardRegistryReloadInterval}",
- "Error running periodic reload of shard registry",
- "error"_attr = redact(e),
- "shardRegistryReloadInterval"_attr = refreshPeriod);
- }
-
- // reschedule itself
- auto status =
- _executor->scheduleWorkAt(_executor->now() + refreshPeriod,
- [this](const CallbackArgs& cbArgs) { _periodicReload(cbArgs); });
-
- if (status.getStatus() == ErrorCodes::ShutdownInProgress) {
- LOGV2_DEBUG(
- 22728, 1, "Error scheduling shard registry reload. Executor shutdown in progress");
- return;
- }
-
- if (!status.isOK()) {
- LOGV2_FATAL(40253,
- "Error scheduling shard registry reload caused by {error}",
- "Error scheduling shard registry reload",
- "error"_attr = redact(status.getStatus()));
- }
-}
-
ConnectionString ShardRegistry::getConfigServerConnectionString() const {
return getConfigShard()->getConnString();
}
@@ -442,16 +403,11 @@ void ShardRegistry::reload(OperationContext* opCtx) {
if (MONGO_unlikely(TestingProctor::instance().isEnabled())) {
// TODO SERVER-62152 investigate hang on reload in unit tests
// Some unit tests don't support running the reload's AsyncTry on the fixed executor.
- _reloadInternal(opCtx);
+ _reloadInternal().get(opCtx);
} else {
- AsyncTry([=]() mutable {
- ThreadClient tc("ShardRegistry::reload", getGlobalServiceContext());
- auto innerOpCtx = tc->makeOperationContext();
-
- _reloadInternal(innerOpCtx.get());
- })
- .until([](Status status) mutable {
- return status != ErrorCodes::ReadConcernMajorityNotAvailableYet;
+ AsyncTry([=]() mutable { return _reloadInternal(); })
+ .until([](auto sw) mutable {
+ return sw.getStatus() != ErrorCodes::ReadConcernMajorityNotAvailableYet;
})
.withBackoffBetweenIterations(kExponentialBackoff)
.on(Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(),
@@ -461,13 +417,13 @@ void ShardRegistry::reload(OperationContext* opCtx) {
}
}
-void ShardRegistry::_reloadInternal(OperationContext* opCtx) {
+SharedSemiFuture<ShardRegistry::Cache::ValueHandle> ShardRegistry::_reloadInternal() {
// Make the next acquire do a lookup.
auto value = _forceReloadIncrement.addAndFetch(1);
LOGV2_DEBUG(4620253, 2, "Forcing ShardRegistry reload", "newForceReloadIncrement"_attr = value);
// Force it to actually happen now.
- _getData(opCtx);
+ return _getDataAsync();
}
void ShardRegistry::clearEntries() {
diff --git a/src/mongo/s/client/shard_registry.h b/src/mongo/s/client/shard_registry.h
index fa241007731..0bb6196d2c3 100644
--- a/src/mongo/s/client/shard_registry.h
+++ b/src/mongo/s/client/shard_registry.h
@@ -438,9 +438,7 @@ private:
void _initializeCacheIfNecessary() const;
- void _periodicReload(const executor::TaskExecutor::CallbackArgs& cbArgs);
-
- void _reloadInternal(OperationContext* opCtx);
+ SharedSemiFuture<Cache::ValueHandle> _reloadInternal();
/**
* Factory to create shards. Never changed after startup so safe to access outside of _mutex.
@@ -463,7 +461,7 @@ private:
ThreadPool _threadPool;
// Executor for periodically reloading the registry (ie. in which _periodicReload() runs).
- std::unique_ptr<executor::TaskExecutor> _executor{};
+ std::shared_ptr<executor::TaskExecutor> _executor{};
mutable Mutex _cacheMutex = MONGO_MAKE_LATCH("ShardRegistry::_cacheMutex");
std::unique_ptr<Cache> _cache;