From e6d0a3ee2b265f2e41454ac6abfded7c19d3c441 Mon Sep 17 00:00:00 2001
From: Tommaso Tocci
Date: Fri, 8 Jul 2022 08:15:24 +0000
Subject: SERVER-57519 Make ARS use causally consistent ShardRegistry::getShard() function

(cherry picked from commit 527dfe85a1771586339b34a177b3f7954aa2793b)
---
 src/mongo/db/s/sharding_ddl_util.cpp  |  9 ----
 src/mongo/s/async_requests_sender.cpp | 97 +++++++++++++++++------------------
 src/mongo/s/async_requests_sender.h   | 10 +++-
 src/mongo/s/client/shard_registry.cpp | 64 +++++++++++++++++++----
 src/mongo/s/client/shard_registry.h   |  6 ++-
 5 files changed, 116 insertions(+), 70 deletions(-)

diff --git a/src/mongo/db/s/sharding_ddl_util.cpp b/src/mongo/db/s/sharding_ddl_util.cpp
index d516fd5e668..1149eb9977c 100644
--- a/src/mongo/db/s/sharding_ddl_util.cpp
+++ b/src/mongo/db/s/sharding_ddl_util.cpp
@@ -193,15 +193,6 @@ std::vector<AsyncRequestsSender::Response> sendAuthenticatedCommandToShards(
     const BSONObj& command,
     const std::vector<ShardId>& shardIds,
     const std::shared_ptr<executor::TaskExecutor>& executor) {
-    // TODO SERVER-57519: remove the following scope
-    {
-        // Ensure ShardRegistry is initialized before using the AsyncRequestsSender that relies on
-        // unsafe functions (SERVER-57280)
-        auto shardRegistry = Grid::get(opCtx)->shardRegistry();
-        if (!shardRegistry->isUp()) {
-            shardRegistry->reload(opCtx);
-        }
-    }
 
     // The AsyncRequestsSender ignore impersonation metadata so we need to manually attach them to
     // the command
diff --git a/src/mongo/s/async_requests_sender.cpp b/src/mongo/s/async_requests_sender.cpp
index 1fceefc428e..9cf89c655c9 100644
--- a/src/mongo/s/async_requests_sender.cpp
+++ b/src/mongo/s/async_requests_sender.cpp
@@ -170,9 +170,10 @@ AsyncRequestsSender::RemoteData::RemoteData(AsyncRequestsSender* ars,
                                             BSONObj cmdObj)
     : _ars(ars), _shardId(std::move(shardId)), _cmdObj(std::move(cmdObj)) {}
 
-std::shared_ptr<Shard> AsyncRequestsSender::RemoteData::getShard() {
-    // TODO: Pass down an OperationContext* to use here.
-    return Grid::get(getGlobalServiceContext())->shardRegistry()->getShardNoReload(_shardId);
+SemiFuture<std::shared_ptr<Shard>> AsyncRequestsSender::RemoteData::getShard() noexcept {
+    return Grid::get(getGlobalServiceContext())
+        ->shardRegistry()
+        ->getShard(*_ars->_subBaton, _shardId);
 }
 
 void AsyncRequestsSender::RemoteData::executeRequest() {
@@ -192,7 +193,12 @@ void AsyncRequestsSender::RemoteData::executeRequest() {
 
 auto AsyncRequestsSender::RemoteData::scheduleRequest()
     -> SemiFuture<RemoteCommandOnAnyCallbackArgs> {
-    return resolveShardIdToHostAndPorts(_ars->_readPreference)
+    return getShard()
+        .thenRunOn(*_ars->_subBaton)
+        .then([this](auto&& shard) {
+            return shard->getTargeter()->findHosts(_ars->_readPreference,
+                                                   CancellationToken::uncancelable());
+        })
         .thenRunOn(*_ars->_subBaton)
         .then([this](auto&& hostAndPorts) {
             _shardHostAndPort.emplace(hostAndPorts.front());
@@ -202,17 +208,6 @@ auto AsyncRequestsSender::RemoteData::scheduleRequest()
         .semi();
 }
 
-SemiFuture<std::vector<HostAndPort>> AsyncRequestsSender::RemoteData::resolveShardIdToHostAndPorts(
-    const ReadPreferenceSetting& readPref) {
-    const auto shard = getShard();
-    if (!shard) {
-        return Status(ErrorCodes::ShardNotFound,
-                      str::stream() << "Could not find shard " << _shardId);
-    }
-
-    return shard->getTargeter()->findHosts(readPref, CancellationToken::uncancelable());
-}
-
 auto AsyncRequestsSender::RemoteData::scheduleRemoteCommand(std::vector<HostAndPort>&& hostAndPorts)
     -> SemiFuture<RemoteCommandOnAnyCallbackArgs> {
     hangBeforeSchedulingRemoteCommand.executeIf(
@@ -278,43 +273,47 @@ auto AsyncRequestsSender::RemoteData::handleResponse(RemoteCommandOnAnyCallbackA
     }
 
     // There was an error with either the response or the command.
-    auto shard = getShard();
-    if (!shard) {
-        uasserted(ErrorCodes::ShardNotFound, str::stream() << "Could not find shard " << _shardId);
-    } else {
-        std::vector<HostAndPort> failedTargets;
-
-        if (rcr.response.target) {
-            failedTargets = {*rcr.response.target};
-        } else {
-            failedTargets = rcr.request.target;
-        }
+    return getShard()
+        .thenRunOn(*_ars->_subBaton)
+        .then([this, status = std::move(status), rcr = std::move(rcr)](
+                  std::shared_ptr<Shard>&& shard) {
+            std::vector<HostAndPort> failedTargets;
 
-        shard->updateReplSetMonitor(failedTargets.front(), status);
-        bool isStartingTransaction = _cmdObj.getField("startTransaction").booleanSafe();
-        if (!_ars->_stopRetrying && shard->isRetriableError(status.code(), _ars->_retryPolicy) &&
-            _retryCount < kMaxNumFailedHostRetryAttempts && !isStartingTransaction) {
-
-            LOGV2_DEBUG(4615637,
-                        1,
-                        "Command to remote {shardId} for hosts {hosts} failed with retryable error "
-                        "{error} and will be retried",
-                        "Command to remote shard failed with retryable error and will be retried",
-                        "shardId"_attr = _shardId,
-                        "hosts"_attr = failedTargets,
-                        "error"_attr = redact(status));
-            ++_retryCount;
-            _shardHostAndPort.reset();
-            // retry through recursion
-            return scheduleRequest();
-        }
-    }
+            if (rcr.response.target) {
+                failedTargets = {*rcr.response.target};
+            } else {
+                failedTargets = rcr.request.target;
+            }
 
-    // Status' in the response.status field that aren't retried get converted to top level errors
-    uassertStatusOK(rcr.response.status);
+            shard->updateReplSetMonitor(failedTargets.front(), status);
+            bool isStartingTransaction = _cmdObj.getField("startTransaction").booleanSafe();
+            if (!_ars->_stopRetrying &&
+                shard->isRetriableError(status.code(), _ars->_retryPolicy) &&
+                _retryCount < kMaxNumFailedHostRetryAttempts && !isStartingTransaction) {
+
+                LOGV2_DEBUG(
+                    4615637,
+                    1,
+                    "Command to remote {shardId} for hosts {hosts} failed with retryable error "
+                    "{error} and will be retried",
"Command to remote shard failed with retryable error and will be retried", + "shardId"_attr = _shardId, + "hosts"_attr = failedTargets, + "error"_attr = redact(status)); + ++_retryCount; + _shardHostAndPort.reset(); + // retry through recursion + return scheduleRequest(); + } + + // Status' in the response.status field that aren't retried get converted to top level + // errors + uassertStatusOK(rcr.response.status); - // We're not okay (on the remote), but still not going to retry - return std::move(rcr); + // We're not okay (on the remote), but still not going to retry + return Future::makeReady(std::move(rcr)).semi(); + }) + .semi(); }; } // namespace mongo diff --git a/src/mongo/s/async_requests_sender.h b/src/mongo/s/async_requests_sender.h index 432227356cc..27d22bc5511 100644 --- a/src/mongo/s/async_requests_sender.h +++ b/src/mongo/s/async_requests_sender.h @@ -179,9 +179,15 @@ private: RemoteData(AsyncRequestsSender* ars, ShardId shardId, BSONObj cmdObj); /** - * Returns the Shard object associated with this remote. + * Returns a SemiFuture containing a shard object associated with this remote. + * + * This will return a SemiFuture with a ShardNotFound error status in case the shard is not + * found. + * + * Additionally this call can trigger a refresh of the ShardRegistry so it could possibly + * return other network error status related to the refresh. */ - std::shared_ptr getShard(); + SemiFuture> getShard() noexcept; /** * Returns true if we've already queued a response from the remote. diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp index 61e5c5bd294..0b1691af82b 100644 --- a/src/mongo/s/client/shard_registry.cpp +++ b/src/mongo/s/client/shard_registry.cpp @@ -207,9 +207,9 @@ void ShardRegistry::startupPeriodicReloader(OperationContext* opCtx) { AsyncTry([this] { LOGV2_DEBUG(22726, 1, "Reloading shardRegistry"); - return _reloadInternal(); + return _reloadAsyncNoRetry(); }) - .until([](auto sw) { + .until([](auto&& sw) { if (!sw.isOK()) { LOGV2(22727, "Error running periodic reload of shard registry", @@ -221,7 +221,7 @@ void ShardRegistry::startupPeriodicReloader(OperationContext* opCtx) { }) .withDelayBetweenIterations(kRefreshPeriod) // This call is optional. .on(_executor, CancellationToken::uncancelable()) - .getAsync([](auto sw) { + .getAsync([](auto&& sw) { LOGV2_DEBUG(22725, 1, "Exiting periodic shard registry reloader", @@ -284,6 +284,49 @@ StatusWith> ShardRegistry::getShard(OperationContext* opC return {ErrorCodes::ShardNotFound, str::stream() << "Shard " << shardId << " not found"}; } +SemiFuture> ShardRegistry::getShard(ExecutorPtr executor, + const ShardId& shardId) noexcept { + + // Fetch the shard registry data associated to the latest known topology time + return _getDataAsync() + .thenRunOn(executor) + .then([this, executor, shardId](auto&& cachedData) { + // First check if this is a non config shard lookup + if (auto shard = cachedData->findShard(shardId)) { + return SemiFuture>::makeReady(std::move(shard)); + } + + // then check if this is a config shard (this call is blocking in any case) + { + stdx::lock_guard lk(_mutex); + if (auto shard = _configShardData.findShard(shardId)) { + return SemiFuture>::makeReady(std::move(shard)); + } + } + + // If the shard was not found, force reload the shard regitry data and try again. + // + // This is to cover the following scenario: + // 1. Primary of the replicaset fetch the list of shards and store it on disk + // 2. 
+            // 2. Primary crashes before the latest VectorClock topology time is majority written to
+            //    disk
+            // 3. A new primary with a stale ShardRegistry is elected and reads the set of shards
+            //    from disk and calls ShardRegistry::getShard
+
+            return _reloadAsync()
+                .thenRunOn(executor)
+                .then([this, executor, shardId](auto&& cachedData) -> std::shared_ptr<Shard> {
+                    auto shard = cachedData->findShard(shardId);
+                    uassert(ErrorCodes::ShardNotFound,
+                            str::stream() << "Shard " << shardId << " not found",
+                            shard);
+                    return shard;
+                })
+                .semi();
+        })
+        .semi();
+}
+
 std::vector<ShardId> ShardRegistry::getAllShardIds(OperationContext* opCtx) {
     auto shardIds = _getData(opCtx)->getAllShardIds();
     if (shardIds.empty()) {
@@ -399,23 +442,26 @@ void ShardRegistry::toBSON(BSONObjBuilder* result) const {
 }
 
 void ShardRegistry::reload(OperationContext* opCtx) {
+    _reloadAsync().get(opCtx);
+}
+
+SharedSemiFuture<void> ShardRegistry::_reloadAsync() {
     if (MONGO_unlikely(TestingProctor::instance().isEnabled())) {
         // Some unit tests don't support running the reload's AsyncTry on the fixed executor.
-        _reloadInternal().get(opCtx);
+        return _reloadAsyncNoRetry();
     } else {
-        AsyncTry([=]() mutable { return _reloadInternal(); })
+        return AsyncTry([=]() mutable { return _reloadAsyncNoRetry(); })
             .until([](auto sw) mutable {
                 return sw.getStatus() != ErrorCodes::ReadConcernMajorityNotAvailableYet;
             })
             .withBackoffBetweenIterations(kExponentialBackoff)
-            .on(Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(),
+            .on(Grid::get(getGlobalServiceContext())->getExecutorPool()->getFixedExecutor(),
                 CancellationToken::uncancelable())
-            .semi()
-            .get(opCtx);
+            .share();
     }
 }
 
-SharedSemiFuture<void> ShardRegistry::_reloadInternal() {
+SharedSemiFuture<void> ShardRegistry::_reloadAsyncNoRetry() {
     // Make the next acquire do a lookup.
     auto value = _forceReloadIncrement.addAndFetch(1);
     LOGV2_DEBUG(4620253, 2, "Forcing ShardRegistry reload", "newForceReloadIncrement"_attr = value);
diff --git a/src/mongo/s/client/shard_registry.h b/src/mongo/s/client/shard_registry.h
index df7c5a75977..aceb45db9d2 100644
--- a/src/mongo/s/client/shard_registry.h
+++ b/src/mongo/s/client/shard_registry.h
@@ -239,6 +239,9 @@ public:
      */
     StatusWith<std::shared_ptr<Shard>> getShard(OperationContext* opCtx, const ShardId& shardId);
 
+    SemiFuture<std::shared_ptr<Shard>> getShard(ExecutorPtr executor,
+                                                const ShardId& shardId) noexcept;
+
     /**
      * Returns a vector containing all known shard IDs.
      * The order of the elements is not guaranteed.
@@ -438,7 +441,8 @@ private:
 
     void _initializeCacheIfNecessary() const;
 
-    SharedSemiFuture<void> _reloadInternal();
+    SharedSemiFuture<void> _reloadAsync();
+    SharedSemiFuture<void> _reloadAsyncNoRetry();
 
     /**
      * Factory to create shards. Never changed after startup so safe to access outside of _mutex.
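
For reference, here is a minimal caller-side sketch, not part of the patch above, of how the new future-returning ShardRegistry::getShard(ExecutorPtr, ShardId) overload can be consumed. The helper name resolveShardPrimaryHost, the choice of read preference, and the include list are assumptions for illustration; the chaining mirrors what RemoteData::scheduleRequest() does in the diff.

// Illustrative only -- not part of the commit above.
#include "mongo/client/read_preference.h"
#include "mongo/db/service_context.h"
#include "mongo/s/client/shard.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/util/cancellation.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/out_of_line_executor.h"

namespace mongo {

// Resolves a shard id to one target host without blocking the caller's thread: the lookup
// (and any ShardRegistry reload it triggers) runs on `executor`, and ShardNotFound or
// refresh-related errors surface through the returned SemiFuture.
SemiFuture<HostAndPort> resolveShardPrimaryHost(ExecutorPtr executor, ShardId shardId) {
    auto* shardRegistry = Grid::get(getGlobalServiceContext())->shardRegistry();

    return shardRegistry->getShard(executor, shardId)
        .thenRunOn(executor)
        .then([](std::shared_ptr<Shard>&& shard) {
            // Same targeting call the AsyncRequestsSender makes once getShard() resolves.
            return shard->getTargeter()->findHosts(
                ReadPreferenceSetting{ReadPreference::PrimaryOnly},
                CancellationToken::uncancelable());
        })
        .thenRunOn(executor)
        .then([](std::vector<HostAndPort>&& hosts) { return hosts.front(); })
        .semi();
}

}  // namespace mongo

Compared with the removed getShardNoReload()/resolveShardIdToHostAndPorts() path, this lookup is causally consistent: if the shard id belongs to a topology time the local cache has not seen yet, getShard() forces a registry reload before giving up with ShardNotFound.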