summaryrefslogtreecommitdiff
path: root/src/mongo/db/service_entry_point_common.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/service_entry_point_common.cpp')
-rw-r--r--src/mongo/db/service_entry_point_common.cpp55
1 files changed, 50 insertions, 5 deletions
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index ab449e2b9a7..1ce882c43cb 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -666,6 +666,8 @@ private:
boost::optional<ResourceConsumption::ScopedMetricsCollector> _scopedMetrics;
boost::optional<ImpersonationSessionGuard> _impersonationSessionGuard;
std::unique_ptr<PolymorphicScoped> _scoped;
+ bool _refreshedDatabase = false;
+ bool _refreshedCollection = false;
};
class RunCommandImpl : public std::enable_shared_from_this<RunCommandImpl> {
@@ -1556,15 +1558,60 @@ Future<void> ExecCommandDatabase::_initiateCommand() try {
rpc::TrackingMetadata::get(opCtx).setIsLogged(true);
}
- _execContext->behaviors->waitForReadConcern(opCtx, _invocation.get(), request);
- _execContext->behaviors->setPrepareConflictBehaviorForReadConcern(opCtx, _invocation.get());
return Status::OK();
} catch (const DBException& ex) {
return ex.toStatus();
}
Future<void> ExecCommandDatabase::_commandExec() {
- return RunCommandImpl::run(shared_from_this());
+ auto opCtx = _execContext->getOpCtx();
+ auto& request = _execContext->getRequest();
+
+ _execContext->behaviors->waitForReadConcern(opCtx, _invocation.get(), request);
+ _execContext->behaviors->setPrepareConflictBehaviorForReadConcern(opCtx, _invocation.get());
+ _execContext->getReplyBuilder()->reset();
+
+ return RunCommandImpl::run(shared_from_this())
+ .onError<ErrorCodes::StaleDbVersion>(
+ [this, anchor = shared_from_this()](Status s) -> Future<void> {
+ auto opCtx = _execContext->getOpCtx();
+
+ if (!opCtx->getClient()->isInDirectClient() &&
+ serverGlobalParams.clusterRole != ClusterRole::ConfigServer &&
+ !_refreshedDatabase) {
+ auto sce = s.extraInfo<StaleDbRoutingVersion>();
+ invariant(sce);
+ // TODO SERVER-52784 refresh only if wantedVersion is empty or less then
+ // received
+ const auto refreshed = _execContext->behaviors->refreshDatabase(opCtx, *sce);
+ if (refreshed) {
+ _refreshedDatabase = true;
+ return _commandExec();
+ }
+ }
+
+ return s;
+ })
+ .onErrorCategory<ErrorCategory::StaleShardVersionError>([this, anchor = shared_from_this()](
+ Status s) -> Future<void> {
+ auto opCtx = _execContext->getOpCtx();
+
+ if (!opCtx->getClient()->isInDirectClient() &&
+ serverGlobalParams.clusterRole != ClusterRole::ConfigServer &&
+ !_refreshedCollection) {
+ if (auto sce = s.extraInfo<StaleConfigInfo>()) {
+ // TODO SERVER-52784 refresh only if wantedVersion is empty or less then
+ // received
+ const auto refreshed = _execContext->behaviors->refreshCollection(opCtx, *sce);
+ if (refreshed) {
+ _refreshedCollection = true;
+ return _commandExec();
+ }
+ }
+ }
+
+ return s;
+ });
}
void ExecCommandDatabase::_handleFailure(Status status) {
@@ -1578,8 +1625,6 @@ void ExecCommandDatabase::_handleFailure(Status status) {
auto replyBuilder = _execContext->getReplyBuilder();
const auto& behaviors = *_execContext->behaviors;
- behaviors.handleException(status, opCtx);
-
// Append the error labels for transient transaction errors.
auto response = _extraFieldsBuilder.asTempObj();
boost::optional<ErrorCodes::Error> wcCode;