diff options
Diffstat (limited to 'src/mongo/db/pipeline/sharded_agg_helpers.cpp')
-rw-r--r-- | src/mongo/db/pipeline/sharded_agg_helpers.cpp | 57 |
1 files changed, 57 insertions, 0 deletions
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index 21bb1d87f37..288c59814bf 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -1296,6 +1296,63 @@ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline( return expCtx->mongoProcessInterface->attachCursorSourceToPipelineForLocalRead( pipelineToTarget.release()); } + + auto cm = catalogCache->getCollectionRoutingInfo(expCtx->opCtx, expCtx->ns); + if (cm.isOK() && !cm.getValue().isSharded()) { + // If the collection is unsharded and we are on the primary, we should be able to + // do a local read. The primary may be moved right after the primary shard check, + // but the local read path will do a db version check before it establishes a cursor + // to catch this case and ensure we fail to read locally. + auto setDbVersion = false; + try { + expCtx->mongoProcessInterface->setExpectedShardVersion( + expCtx->opCtx, expCtx->ns, ChunkVersion::UNSHARDED()); + setDbVersion = expCtx->mongoProcessInterface->setExpectedDbVersion( + expCtx->opCtx, expCtx->ns, cm.getValue().dbVersion()); + + // During 'attachCursorSourceToPipelineForLocalRead', the expected db version + // must be set. Whether or not that call is succesful, to avoid affecting later + // operations on this db, we should unset the expected db version on the opCtx, + // if we set it above. + ScopeGuard dbVersionUnsetter([&]() { + if (setDbVersion) { + expCtx->mongoProcessInterface->unsetExpectedDbVersion(expCtx->opCtx, + expCtx->ns); + } + }); + + expCtx->mongoProcessInterface->checkOnPrimaryShardForDb(expCtx->opCtx, + expCtx->ns); + + LOGV2_DEBUG(5837600, + 3, + "Performing local read", + "ns"_attr = expCtx->ns, + "pipeline"_attr = pipelineToTarget->serializeToBson(), + "comment"_attr = expCtx->opCtx->getComment()); + + return expCtx->mongoProcessInterface->attachCursorSourceToPipelineForLocalRead( + pipelineToTarget.release()); + } catch (ExceptionFor<ErrorCodes::IllegalOperation>&) { + // The current node isn't the primary for or has stale information about this + // collection, proceed with shard targeting. + } catch (ExceptionFor<ErrorCodes::StaleDbVersion>&) { + // The current node has stale information about this collection, proceed with + // shard targeting, which has logic to handle refreshing that may be needed. + } catch (ExceptionForCat<ErrorCategory::StaleShardVersionError>&) { + // The current node has stale information about this collection, proceed with + // shard targeting, which has logic to handle refreshing that may be needed. + } catch (ExceptionFor<ErrorCodes::CommandNotSupportedOnView>&) { + // The current node may be trying to run a pipeline on a namespace which is an + // unresolved view, proceed with shard targeting, + } + + // The local read failed. Recreate 'pipelineToTarget' if it was released above. + if (!pipelineToTarget) { + pipelineToTarget = pipeline->clone(); + } + } + return targetShardsAndAddMergeCursors(expCtx, std::move(pipelineToTarget), boost::none, |