diff options
Diffstat (limited to 'src/mongo/db/commands/run_aggregate.cpp')
-rw-r--r-- | src/mongo/db/commands/run_aggregate.cpp | 66 |
1 files changed, 28 insertions, 38 deletions
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index f371ef7b1a0..8b7d7fd7863 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -400,8 +400,7 @@ Status runAggregate(OperationContext* opCtx, // where the collation has not yet been resolved, and where it has been resolved to nullptr. boost::optional<std::unique_ptr<CollatorInterface>> collatorToUse; - // The UUID of the collection for the execution namespace of this aggregation. For change - // streams, this will be the UUID of the original namespace instead of the oplog namespace. + // The UUID of the collection for the execution namespace of this aggregation. boost::optional<UUID> uuid; std::vector<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> execs; @@ -425,28 +424,6 @@ Status runAggregate(OperationContext* opCtx, return ex.toStatus(); } - if (liteParsedPipeline.hasChangeStream()) { - nss = NamespaceString::kRsOplogNamespace; - - // Upgrade and wait for read concern if necessary. - _adjustChangeStreamReadConcern(opCtx); - - if (liteParsedPipeline.shouldResolveUUIDAndCollation()) { - // AutoGetCollectionForReadCommand will raise an error if 'origNss' is a view. - AutoGetCollectionForReadCommand origNssCtx(opCtx, origNss); - - // Resolve the collator to either the user-specified collation or the default - // collation of the collection on which $changeStream was invoked, so that we do not - // end up resolving the collation on the oplog. - invariant(!collatorToUse); - Collection* origColl = origNssCtx.getCollection(); - collatorToUse.emplace(resolveCollator(opCtx, request, origColl)); - - // Get the collection UUID to be set on the expression context. - uuid = origColl ? origColl->uuid() : boost::none; - } - } - const auto& pipelineInvolvedNamespaces = liteParsedPipeline.getInvolvedNamespaces(); // If emplaced, AutoGetCollectionForReadCommand will throw if the sharding version for this @@ -458,31 +435,44 @@ Status runAggregate(OperationContext* opCtx, // AutoStatsTracker to record CurOp and Top entries. boost::optional<AutoStatsTracker> statsTracker; - // If this is a collectionless aggregation with no foreign namespaces, we don't want to - // acquire any locks. Otherwise, lock the collection or view. - if (nss.isCollectionlessAggregateNS() && pipelineInvolvedNamespaces.empty()) { + // If this is a change stream, perform special checks and change the execution namespace. + if (liteParsedPipeline.hasChangeStream()) { + // Replace the execution namespace with that of the oplog. + nss = NamespaceString::kRsOplogNamespace; + + // Upgrade and wait for read concern if necessary. + _adjustChangeStreamReadConcern(opCtx); + + // AutoGetCollectionForReadCommand will raise an error if 'origNss' is a view. We do not + // need to check this if we are opening a stream on an entire db or across the cluster. + if (!origNss.isCollectionlessAggregateNS()) { + AutoGetCollectionForReadCommand origNssCtx(opCtx, origNss); + } + + // If the user specified an explicit collation, adopt it; otherwise, use the simple + // collation. We do not inherit the collection's default collation or UUID, since + // the stream may be resuming from a point before the current UUID existed. + collatorToUse.emplace(resolveCollator(opCtx, request, nullptr)); + + // Obtain collection locks on the execution namespace; that is, the oplog. + ctx.emplace(opCtx, nss, AutoGetCollection::ViewMode::kViewsForbidden); + } else if (nss.isCollectionlessAggregateNS() && pipelineInvolvedNamespaces.empty()) { + // If this is a collectionless agg with no foreign namespaces, don't acquire any locks. statsTracker.emplace(opCtx, nss, Top::LockType::NotLocked, AutoStatsTracker::LogMode::kUpdateTopAndCurop, 0); + collatorToUse.emplace(resolveCollator(opCtx, request, nullptr)); } else { + // This is a regular aggregation. Lock the collection or view. ctx.emplace(opCtx, nss, AutoGetCollection::ViewMode::kViewsPermitted); + collatorToUse.emplace(resolveCollator(opCtx, request, ctx->getCollection())); + uuid = ctx->getCollection() ? ctx->getCollection()->uuid() : boost::none; } Collection* collection = ctx ? ctx->getCollection() : nullptr; - // For change streams, the UUID will already have been set for the original namespace. - if (!liteParsedPipeline.hasChangeStream()) { - uuid = collection ? collection->uuid() : boost::none; - } - - // The collator may already have been set if this is a $changeStream pipeline. If not, - // resolve the collator to either the user-specified collation or the collection default. - if (!collatorToUse) { - collatorToUse.emplace(resolveCollator(opCtx, request, collection)); - } - // If this is a view, resolve it by finding the underlying collection and stitching view // pipelines and this request's pipeline together. We then release our locks before // recursively calling runAggregate(), which will re-acquire locks on the underlying |