author     Tommaso Tocci <tommaso.tocci@mongodb.com>         2022-07-08 08:15:24 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-07-11 21:46:35 +0000
commit     4f9e10bb3019091f565188285db6717b65e4d74d (patch)
tree       550b52d439c3b8022e28aa4ce7b1495d58cad460
parent     894acad323188f784c0256192825d13e531e75f3 (diff)
download   mongo-4f9e10bb3019091f565188285db6717b65e4d74d.tar.gz
SERVER-57519 Make ARS use causally consistent ShardRegistry::getShard() function
(cherry picked from commit 527dfe85a1771586339b34a177b3f7954aa2793b)
-rw-r--r--  src/mongo/db/s/sharding_ddl_util.cpp   |  9
-rw-r--r--  src/mongo/s/async_requests_sender.cpp  | 97
-rw-r--r--  src/mongo/s/async_requests_sender.h    | 10
-rw-r--r--  src/mongo/s/client/shard_registry.cpp  | 64
-rw-r--r--  src/mongo/s/client/shard_registry.h    |  6
5 files changed, 116 insertions(+), 70 deletions(-)
diff --git a/src/mongo/db/s/sharding_ddl_util.cpp b/src/mongo/db/s/sharding_ddl_util.cpp
index ea068f5d654..b774f8478b8 100644
--- a/src/mongo/db/s/sharding_ddl_util.cpp
+++ b/src/mongo/db/s/sharding_ddl_util.cpp
@@ -195,15 +195,6 @@ std::vector<AsyncRequestsSender::Response> sendAuthenticatedCommandToShards(
const BSONObj& command,
const std::vector<ShardId>& shardIds,
const std::shared_ptr<executor::TaskExecutor>& executor) {
- // TODO SERVER-57519: remove the following scope
- {
- // Ensure ShardRegistry is initialized before using the AsyncRequestsSender that relies on
- // unsafe functions (SERVER-57280)
- auto shardRegistry = Grid::get(opCtx)->shardRegistry();
- if (!shardRegistry->isUp()) {
- shardRegistry->reload(opCtx);
- }
- }
// The AsyncRequestsSender ignores impersonation metadata, so we need to manually attach it to
// the command
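
The scope removed above existed only to pre-warm the ShardRegistry before handing work to the AsyncRequestsSender. Below is a minimal standalone sketch of that caller-side guard pattern (ToyRegistry and sendCommandToShards are hypothetical names, not MongoDB's API); with the causally consistent getShard() introduced further down, the sender resolves shards itself and call sites no longer need this boilerplate.

#include <atomic>
#include <iostream>

class ToyRegistry {
public:
    bool isUp() const { return _initialized.load(); }
    void reload() { _initialized.store(true); }  // stand-in for a blocking refresh
private:
    std::atomic<bool> _initialized{false};
};

void sendCommandToShards(ToyRegistry& registry) {
    // The old pattern, mirroring the deleted scope: every call site had to
    // pre-warm the registry before using it.
    if (!registry.isUp()) {
        registry.reload();
    }
    std::cout << "dispatching command, registry up: " << registry.isUp() << '\n';
}

int main() {
    ToyRegistry registry;
    sendCommandToShards(registry);
}
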
diff --git a/src/mongo/s/async_requests_sender.cpp b/src/mongo/s/async_requests_sender.cpp
index 045404c4fad..0eba85b1893 100644
--- a/src/mongo/s/async_requests_sender.cpp
+++ b/src/mongo/s/async_requests_sender.cpp
@@ -151,9 +151,10 @@ AsyncRequestsSender::RemoteData::RemoteData(AsyncRequestsSender* ars,
BSONObj cmdObj)
: _ars(ars), _shardId(std::move(shardId)), _cmdObj(std::move(cmdObj)) {}
-std::shared_ptr<Shard> AsyncRequestsSender::RemoteData::getShard() {
- // TODO: Pass down an OperationContext* to use here.
- return Grid::get(getGlobalServiceContext())->shardRegistry()->getShardNoReload(_shardId);
+SemiFuture<std::shared_ptr<Shard>> AsyncRequestsSender::RemoteData::getShard() noexcept {
+ return Grid::get(getGlobalServiceContext())
+ ->shardRegistry()
+ ->getShard(*_ars->_subBaton, _shardId);
}
void AsyncRequestsSender::RemoteData::executeRequest() {
@@ -173,7 +174,12 @@ void AsyncRequestsSender::RemoteData::executeRequest() {
auto AsyncRequestsSender::RemoteData::scheduleRequest()
-> SemiFuture<RemoteCommandOnAnyCallbackArgs> {
- return resolveShardIdToHostAndPorts(_ars->_readPreference)
+ return getShard()
+ .thenRunOn(*_ars->_subBaton)
+ .then([this](auto&& shard) {
+ return shard->getTargeter()->findHosts(_ars->_readPreference,
+ CancellationToken::uncancelable());
+ })
.thenRunOn(*_ars->_subBaton)
.then([this](auto&& hostAndPorts) {
_shardHostAndPort.emplace(hostAndPorts.front());
@@ -183,17 +189,6 @@ auto AsyncRequestsSender::RemoteData::scheduleRequest()
.semi();
}
-SemiFuture<std::vector<HostAndPort>> AsyncRequestsSender::RemoteData::resolveShardIdToHostAndPorts(
- const ReadPreferenceSetting& readPref) {
- const auto shard = getShard();
- if (!shard) {
- return Status(ErrorCodes::ShardNotFound,
- str::stream() << "Could not find shard " << _shardId);
- }
-
- return shard->getTargeter()->findHosts(readPref, CancellationToken::uncancelable());
-}
-
auto AsyncRequestsSender::RemoteData::scheduleRemoteCommand(std::vector<HostAndPort>&& hostAndPorts)
-> SemiFuture<RemoteCommandOnAnyCallbackArgs> {
hangBeforeSchedulingRemoteCommand.executeIf(
@@ -259,43 +254,47 @@ auto AsyncRequestsSender::RemoteData::handleResponse(RemoteCommandOnAnyCallbackA
}
// There was an error with either the response or the command.
- auto shard = getShard();
- if (!shard) {
- uasserted(ErrorCodes::ShardNotFound, str::stream() << "Could not find shard " << _shardId);
- } else {
- std::vector<HostAndPort> failedTargets;
-
- if (rcr.response.target) {
- failedTargets = {*rcr.response.target};
- } else {
- failedTargets = rcr.request.target;
- }
+ return getShard()
+ .thenRunOn(*_ars->_subBaton)
+ .then([this, status = std::move(status), rcr = std::move(rcr)](
+ std::shared_ptr<mongo::Shard>&& shard) {
+ std::vector<HostAndPort> failedTargets;
- shard->updateReplSetMonitor(failedTargets.front(), status);
- bool isStartingTransaction = _cmdObj.getField("startTransaction").booleanSafe();
- if (!_ars->_stopRetrying && shard->isRetriableError(status.code(), _ars->_retryPolicy) &&
- _retryCount < kMaxNumFailedHostRetryAttempts && !isStartingTransaction) {
-
- LOGV2_DEBUG(4615637,
- 1,
- "Command to remote {shardId} for hosts {hosts} failed with retryable error "
- "{error} and will be retried",
- "Command to remote shard failed with retryable error and will be retried",
- "shardId"_attr = _shardId,
- "hosts"_attr = failedTargets,
- "error"_attr = redact(status));
- ++_retryCount;
- _shardHostAndPort.reset();
- // retry through recursion
- return scheduleRequest();
- }
- }
+ if (rcr.response.target) {
+ failedTargets = {*rcr.response.target};
+ } else {
+ failedTargets = rcr.request.target;
+ }
- // Status' in the response.status field that aren't retried get converted to top level errors
- uassertStatusOK(rcr.response.status);
+ shard->updateReplSetMonitor(failedTargets.front(), status);
+ bool isStartingTransaction = _cmdObj.getField("startTransaction").booleanSafe();
+ if (!_ars->_stopRetrying &&
+ shard->isRetriableError(status.code(), _ars->_retryPolicy) &&
+ _retryCount < kMaxNumFailedHostRetryAttempts && !isStartingTransaction) {
+
+ LOGV2_DEBUG(
+ 4615637,
+ 1,
+ "Command to remote {shardId} for hosts {hosts} failed with retryable error "
+ "{error} and will be retried",
+ "Command to remote shard failed with retryable error and will be retried",
+ "shardId"_attr = _shardId,
+ "hosts"_attr = failedTargets,
+ "error"_attr = redact(status));
+ ++_retryCount;
+ _shardHostAndPort.reset();
+ // retry through recursion
+ return scheduleRequest();
+ }
+
+ // Statuses in the response.status field that aren't retried get converted to top-level
+ // errors
+ uassertStatusOK(rcr.response.status);
- // We're not okay (on the remote), but still not going to retry
- return std::move(rcr);
+ // We're not okay (on the remote), but still not going to retry
+ return Future<RemoteCommandOnAnyCallbackArgs>::makeReady(std::move(rcr)).semi();
+ })
+ .semi();
};
} // namespace mongo
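
For context on the retry path preserved in handleResponse() above, here is a minimal standalone sketch of retry-through-recursion with a bounded attempt count. ToyRemote, sendOnce, and the hard-coded retriable flag are illustrative stand-ins, not MongoDB's Future/SemiFuture machinery; the real code additionally consults shard->isRetriableError() and the sender's retry policy before retrying.

#include <functional>
#include <iostream>
#include <stdexcept>
#include <string>

constexpr int kMaxNumFailedHostRetryAttempts = 3;  // same bound the ARS uses

struct ToyRemote {
    int retryCount = 0;
    bool startingTransaction = false;

    // sendOnce is a stand-in for scheduleRemoteCommand(); it throws on failure.
    std::string scheduleRequest(const std::function<std::string()>& sendOnce) {
        try {
            return sendOnce();
        } catch (const std::runtime_error&) {
            const bool retriable = true;  // stand-in for shard->isRetriableError(...)
            if (retriable && retryCount < kMaxNumFailedHostRetryAttempts &&
                !startingTransaction) {
                ++retryCount;
                // Retry through recursion, mirroring the continuation above.
                return scheduleRequest(sendOnce);
            }
            throw;  // not retried: surfaces as a top-level error
        }
    }
};

int main() {
    ToyRemote remote;
    int failuresLeft = 2;
    auto flaky = [&]() -> std::string {
        if (failuresLeft-- > 0)
            throw std::runtime_error("HostUnreachable");
        return "{ok: 1}";
    };
    std::cout << remote.scheduleRequest(flaky) << " after " << remote.retryCount
              << " retries\n";
}
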
diff --git a/src/mongo/s/async_requests_sender.h b/src/mongo/s/async_requests_sender.h
index 3e14bec56d3..329bf9b4757 100644
--- a/src/mongo/s/async_requests_sender.h
+++ b/src/mongo/s/async_requests_sender.h
@@ -177,9 +177,15 @@ private:
RemoteData(AsyncRequestsSender* ars, ShardId shardId, BSONObj cmdObj);
/**
- * Returns the Shard object associated with this remote.
+ * Returns a SemiFuture containing a shard object associated with this remote.
+ *
+ * This returns a SemiFuture containing a ShardNotFound error status if the shard is not
+ * found.
+ *
+ * Additionally, this call can trigger a refresh of the ShardRegistry, so it may also
+ * return network error statuses related to that refresh.
*/
- std::shared_ptr<Shard> getShard();
+ SemiFuture<std::shared_ptr<Shard>> getShard() noexcept;
/**
* Returns true if we've already queued a response from the remote.
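
The doc comment above changes the error contract: failures no longer surface at the call site but through the returned future. A rough sketch of that contract follows, using std::future as a stand-in for mongo's SemiFuture (ToyShard and getShardAsync are hypothetical names).

#include <future>
#include <iostream>
#include <memory>
#include <stdexcept>
#include <string>

struct ToyShard {
    std::string id;
};

std::future<std::shared_ptr<ToyShard>> getShardAsync(const std::string& shardId) {
    return std::async(std::launch::async, [shardId]() -> std::shared_ptr<ToyShard> {
        if (shardId == "shard0")
            return std::make_shared<ToyShard>(ToyShard{shardId});
        // Both "shard not found" and refresh-time network errors travel this path.
        throw std::runtime_error("ShardNotFound: " + shardId);
    });
}

int main() {
    auto fut = getShardAsync("shardX");
    try {
        auto shard = fut.get();  // the error materializes here, not at call time
        std::cout << "resolved " << shard->id << '\n';
    } catch (const std::runtime_error& err) {
        std::cout << "lookup failed: " << err.what() << '\n';
    }
}
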
diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp
index fd0b4d56cc4..bff41fe973e 100644
--- a/src/mongo/s/client/shard_registry.cpp
+++ b/src/mongo/s/client/shard_registry.cpp
@@ -218,9 +218,9 @@ void ShardRegistry::startupPeriodicReloader(OperationContext* opCtx) {
AsyncTry([this] {
LOGV2_DEBUG(22726, 1, "Reloading shardRegistry");
- return _reloadInternal();
+ return _reloadAsyncNoRetry();
})
- .until([](auto sw) {
+ .until([](auto&& sw) {
if (!sw.isOK()) {
LOGV2(22727,
"Error running periodic reload of shard registry",
@@ -232,7 +232,7 @@ void ShardRegistry::startupPeriodicReloader(OperationContext* opCtx) {
})
.withDelayBetweenIterations(kRefreshPeriod) // This call is optional.
.on(_executor, CancellationToken::uncancelable())
- .getAsync([](auto sw) {
+ .getAsync([](auto&& sw) {
LOGV2_DEBUG(22725,
1,
"Exiting periodic shard registry reloader",
@@ -295,6 +295,49 @@ StatusWith<std::shared_ptr<Shard>> ShardRegistry::getShard(OperationContext* opC
return {ErrorCodes::ShardNotFound, str::stream() << "Shard " << shardId << " not found"};
}
+SemiFuture<std::shared_ptr<Shard>> ShardRegistry::getShard(ExecutorPtr executor,
+ const ShardId& shardId) noexcept {
+
+ // Fetch the shard registry data associated with the latest known topology time
+ return _getDataAsync()
+ .thenRunOn(executor)
+ .then([this, executor, shardId](auto&& cachedData) {
+ // First check if this is a non-config shard lookup
+ if (auto shard = cachedData->findShard(shardId)) {
+ return SemiFuture<std::shared_ptr<Shard>>::makeReady(std::move(shard));
+ }
+
+ // Then check if this is a config shard (this call is blocking in any case)
+ {
+ stdx::lock_guard<Latch> lk(_mutex);
+ if (auto shard = _configShardData.findShard(shardId)) {
+ return SemiFuture<std::shared_ptr<Shard>>::makeReady(std::move(shard));
+ }
+ }
+
+ // If the shard was not found, force a reload of the shard registry data and try again.
+ //
+ // This covers the following scenario:
+ // 1. The primary of the replica set fetches the list of shards and stores it on disk
+ // 2. The primary crashes before the latest VectorClock topology time is majority
+ //    written to disk
+ // 3. A new primary with a stale ShardRegistry is elected, reads the set of shards from
+ //    disk, and calls ShardRegistry::getShard
+
+ return _reloadAsync()
+ .thenRunOn(executor)
+ .then([this, executor, shardId](auto&& cachedData) -> std::shared_ptr<Shard> {
+ auto shard = cachedData->findShard(shardId);
+ uassert(ErrorCodes::ShardNotFound,
+ str::stream() << "Shard " << shardId << " not found",
+ shard);
+ return shard;
+ })
+ .semi();
+ })
+ .semi();
+}
+
std::vector<ShardId> ShardRegistry::getAllShardIds(OperationContext* opCtx) {
auto shardIds = _getData(opCtx)->getAllShardIds();
if (shardIds.empty()) {
@@ -400,24 +443,27 @@ void ShardRegistry::toBSON(BSONObjBuilder* result) const {
}
void ShardRegistry::reload(OperationContext* opCtx) {
+ _reloadAsync().get(opCtx);
+}
+
+SharedSemiFuture<ShardRegistry::Cache::ValueHandle> ShardRegistry::_reloadAsync() {
if (MONGO_unlikely(TestingProctor::instance().isEnabled())) {
// TODO SERVER-62152 investigate hang on reload in unit tests
// Some unit tests don't support running the reload's AsyncTry on the fixed executor.
- _reloadInternal().get(opCtx);
+ return _reloadAsyncNoRetry();
} else {
- AsyncTry([=]() mutable { return _reloadInternal(); })
+ return AsyncTry([=]() mutable { return _reloadAsyncNoRetry(); })
.until([](auto sw) mutable {
return sw.getStatus() != ErrorCodes::ReadConcernMajorityNotAvailableYet;
})
.withBackoffBetweenIterations(kExponentialBackoff)
- .on(Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(),
+ .on(Grid::get(getGlobalServiceContext())->getExecutorPool()->getFixedExecutor(),
CancellationToken::uncancelable())
- .semi()
- .get(opCtx);
+ .share();
}
}
-SharedSemiFuture<ShardRegistry::Cache::ValueHandle> ShardRegistry::_reloadInternal() {
+SharedSemiFuture<ShardRegistry::Cache::ValueHandle> ShardRegistry::_reloadAsyncNoRetry() {
// Make the next acquire do a lookup.
auto value = _forceReloadIncrement.addAndFetch(1);
LOGV2_DEBUG(4620253, 2, "Forcing ShardRegistry reload", "newForceReloadIncrement"_attr = value);
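
The heart of the new future-returning getShard() above is a two-step lookup: consult the cached topology first and, only on a miss, force a reload and look again, which covers the stale-newly-elected-primary scenario described in the comment. A minimal synchronous sketch of that shape follows (ToyRegistry is a hypothetical stand-in; the real implementation stays asynchronous via _getDataAsync()/_reloadAsync() and returns a SemiFuture).

#include <iostream>
#include <map>
#include <stdexcept>
#include <string>

class ToyRegistry {
public:
    std::string getShard(const std::string& shardId) {
        // Fast path: hit in the cached topology.
        if (auto it = _cachedTopology.find(shardId); it != _cachedTopology.end())
            return it->second;
        // Slow path: force a reload from the source of truth and retry the lookup.
        _forceReload();
        if (auto it = _cachedTopology.find(shardId); it != _cachedTopology.end())
            return it->second;
        throw std::runtime_error("Shard " + shardId + " not found");
    }

private:
    void _forceReload() {
        // Stand-in for fetching the authoritative shard list.
        _cachedTopology = {{"shard0", "rs0/a:27018"}, {"shard1", "rs1/b:27018"}};
    }
    std::map<std::string, std::string> _cachedTopology;
};

int main() {
    ToyRegistry registry;  // starts with a stale (empty) cache, like a fresh primary
    std::cout << registry.getShard("shard1") << '\n';
}
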
diff --git a/src/mongo/s/client/shard_registry.h b/src/mongo/s/client/shard_registry.h
index 0bb6196d2c3..02bcd9897c9 100644
--- a/src/mongo/s/client/shard_registry.h
+++ b/src/mongo/s/client/shard_registry.h
@@ -239,6 +239,9 @@ public:
*/
StatusWith<std::shared_ptr<Shard>> getShard(OperationContext* opCtx, const ShardId& shardId);
+ SemiFuture<std::shared_ptr<Shard>> getShard(ExecutorPtr executor,
+ const ShardId& shardId) noexcept;
+
/**
* Returns a vector containing all known shard IDs.
* The order of the elements is not guaranteed.
@@ -438,7 +441,8 @@ private:
void _initializeCacheIfNecessary() const;
- SharedSemiFuture<Cache::ValueHandle> _reloadInternal();
+ SharedSemiFuture<Cache::ValueHandle> _reloadAsync();
+ SharedSemiFuture<Cache::ValueHandle> _reloadAsyncNoRetry();
/**
* Factory to create shards. Never changed after startup so safe to access outside of _mutex.
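
For the _reloadAsync()/_reloadAsyncNoRetry() split declared above, the wrapper's job is to retry a single-shot reload with backoff only while the failure is ReadConcernMajorityNotAvailableYet. A minimal synchronous sketch of that policy follows (the enum, reloadOnce, and the fixed backoff values are illustrative; the real code uses AsyncTry with kExponentialBackoff on the fixed executor).

#include <chrono>
#include <iostream>
#include <thread>

enum class ErrorCode { OK, ReadConcernMajorityNotAvailableYet, NetworkTimeout };

// Stand-in for _reloadAsyncNoRetry(): performs a single reload attempt.
ErrorCode reloadOnce(int attempt) {
    return attempt < 2 ? ErrorCode::ReadConcernMajorityNotAvailableYet : ErrorCode::OK;
}

ErrorCode reloadWithRetry() {
    auto backoff = std::chrono::milliseconds(10);
    for (int attempt = 0;; ++attempt) {
        ErrorCode sw = reloadOnce(attempt);
        // Same predicate as the .until() clause: stop for any status other than
        // ReadConcernMajorityNotAvailableYet (success included).
        if (sw != ErrorCode::ReadConcernMajorityNotAvailableYet)
            return sw;
        std::this_thread::sleep_for(backoff);
        backoff *= 2;  // stand-in for kExponentialBackoff
    }
}

int main() {
    std::cout << "reload finished with code "
              << static_cast<int>(reloadWithRetry()) << '\n';
}
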