summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/sharded_agg_helpers.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/pipeline/sharded_agg_helpers.cpp')
-rw-r--r--src/mongo/db/pipeline/sharded_agg_helpers.cpp57
1 files changed, 57 insertions, 0 deletions
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index 21bb1d87f37..288c59814bf 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -1296,6 +1296,63 @@ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline(
return expCtx->mongoProcessInterface->attachCursorSourceToPipelineForLocalRead(
pipelineToTarget.release());
}
+
+ auto cm = catalogCache->getCollectionRoutingInfo(expCtx->opCtx, expCtx->ns);
+ if (cm.isOK() && !cm.getValue().isSharded()) {
+ // If the collection is unsharded and we are on the primary, we should be able to
+ // do a local read. The primary may be moved right after the primary shard check,
+ // but the local read path will do a db version check before it establishes a cursor
+ // to catch this case and ensure we fail to read locally.
+ auto setDbVersion = false;
+ try {
+ expCtx->mongoProcessInterface->setExpectedShardVersion(
+ expCtx->opCtx, expCtx->ns, ChunkVersion::UNSHARDED());
+ setDbVersion = expCtx->mongoProcessInterface->setExpectedDbVersion(
+ expCtx->opCtx, expCtx->ns, cm.getValue().dbVersion());
+
+ // During 'attachCursorSourceToPipelineForLocalRead', the expected db version
+ // must be set. Whether or not that call is succesful, to avoid affecting later
+ // operations on this db, we should unset the expected db version on the opCtx,
+ // if we set it above.
+ ScopeGuard dbVersionUnsetter([&]() {
+ if (setDbVersion) {
+ expCtx->mongoProcessInterface->unsetExpectedDbVersion(expCtx->opCtx,
+ expCtx->ns);
+ }
+ });
+
+ expCtx->mongoProcessInterface->checkOnPrimaryShardForDb(expCtx->opCtx,
+ expCtx->ns);
+
+ LOGV2_DEBUG(5837600,
+ 3,
+ "Performing local read",
+ "ns"_attr = expCtx->ns,
+ "pipeline"_attr = pipelineToTarget->serializeToBson(),
+ "comment"_attr = expCtx->opCtx->getComment());
+
+ return expCtx->mongoProcessInterface->attachCursorSourceToPipelineForLocalRead(
+ pipelineToTarget.release());
+ } catch (ExceptionFor<ErrorCodes::IllegalOperation>&) {
+ // The current node isn't the primary for or has stale information about this
+ // collection, proceed with shard targeting.
+ } catch (ExceptionFor<ErrorCodes::StaleDbVersion>&) {
+ // The current node has stale information about this collection, proceed with
+ // shard targeting, which has logic to handle refreshing that may be needed.
+ } catch (ExceptionForCat<ErrorCategory::StaleShardVersionError>&) {
+ // The current node has stale information about this collection, proceed with
+ // shard targeting, which has logic to handle refreshing that may be needed.
+ } catch (ExceptionFor<ErrorCodes::CommandNotSupportedOnView>&) {
+ // The current node may be trying to run a pipeline on a namespace which is an
+ // unresolved view, proceed with shard targeting,
+ }
+
+ // The local read failed. Recreate 'pipelineToTarget' if it was released above.
+ if (!pipelineToTarget) {
+ pipelineToTarget = pipeline->clone();
+ }
+ }
+
return targetShardsAndAddMergeCursors(expCtx,
std::move(pipelineToTarget),
boost::none,