summaryrefslogtreecommitdiff
path: root/src/mongo/db/commands/run_aggregate.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/commands/run_aggregate.cpp')
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp66
1 files changed, 28 insertions, 38 deletions
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index f371ef7b1a0..8b7d7fd7863 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -400,8 +400,7 @@ Status runAggregate(OperationContext* opCtx,
// where the collation has not yet been resolved, and where it has been resolved to nullptr.
boost::optional<std::unique_ptr<CollatorInterface>> collatorToUse;
- // The UUID of the collection for the execution namespace of this aggregation. For change
- // streams, this will be the UUID of the original namespace instead of the oplog namespace.
+ // The UUID of the collection for the execution namespace of this aggregation.
boost::optional<UUID> uuid;
std::vector<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> execs;
@@ -425,28 +424,6 @@ Status runAggregate(OperationContext* opCtx,
return ex.toStatus();
}
- if (liteParsedPipeline.hasChangeStream()) {
- nss = NamespaceString::kRsOplogNamespace;
-
- // Upgrade and wait for read concern if necessary.
- _adjustChangeStreamReadConcern(opCtx);
-
- if (liteParsedPipeline.shouldResolveUUIDAndCollation()) {
- // AutoGetCollectionForReadCommand will raise an error if 'origNss' is a view.
- AutoGetCollectionForReadCommand origNssCtx(opCtx, origNss);
-
- // Resolve the collator to either the user-specified collation or the default
- // collation of the collection on which $changeStream was invoked, so that we do not
- // end up resolving the collation on the oplog.
- invariant(!collatorToUse);
- Collection* origColl = origNssCtx.getCollection();
- collatorToUse.emplace(resolveCollator(opCtx, request, origColl));
-
- // Get the collection UUID to be set on the expression context.
- uuid = origColl ? origColl->uuid() : boost::none;
- }
- }
-
const auto& pipelineInvolvedNamespaces = liteParsedPipeline.getInvolvedNamespaces();
// If emplaced, AutoGetCollectionForReadCommand will throw if the sharding version for this
@@ -458,31 +435,44 @@ Status runAggregate(OperationContext* opCtx,
// AutoStatsTracker to record CurOp and Top entries.
boost::optional<AutoStatsTracker> statsTracker;
- // If this is a collectionless aggregation with no foreign namespaces, we don't want to
- // acquire any locks. Otherwise, lock the collection or view.
- if (nss.isCollectionlessAggregateNS() && pipelineInvolvedNamespaces.empty()) {
+ // If this is a change stream, perform special checks and change the execution namespace.
+ if (liteParsedPipeline.hasChangeStream()) {
+ // Replace the execution namespace with that of the oplog.
+ nss = NamespaceString::kRsOplogNamespace;
+
+ // Upgrade and wait for read concern if necessary.
+ _adjustChangeStreamReadConcern(opCtx);
+
+ // AutoGetCollectionForReadCommand will raise an error if 'origNss' is a view. We do not
+ // need to check this if we are opening a stream on an entire db or across the cluster.
+ if (!origNss.isCollectionlessAggregateNS()) {
+ AutoGetCollectionForReadCommand origNssCtx(opCtx, origNss);
+ }
+
+ // If the user specified an explicit collation, adopt it; otherwise, use the simple
+ // collation. We do not inherit the collection's default collation or UUID, since
+ // the stream may be resuming from a point before the current UUID existed.
+ collatorToUse.emplace(resolveCollator(opCtx, request, nullptr));
+
+ // Obtain collection locks on the execution namespace; that is, the oplog.
+ ctx.emplace(opCtx, nss, AutoGetCollection::ViewMode::kViewsForbidden);
+ } else if (nss.isCollectionlessAggregateNS() && pipelineInvolvedNamespaces.empty()) {
+ // If this is a collectionless agg with no foreign namespaces, don't acquire any locks.
statsTracker.emplace(opCtx,
nss,
Top::LockType::NotLocked,
AutoStatsTracker::LogMode::kUpdateTopAndCurop,
0);
+ collatorToUse.emplace(resolveCollator(opCtx, request, nullptr));
} else {
+ // This is a regular aggregation. Lock the collection or view.
ctx.emplace(opCtx, nss, AutoGetCollection::ViewMode::kViewsPermitted);
+ collatorToUse.emplace(resolveCollator(opCtx, request, ctx->getCollection()));
+ uuid = ctx->getCollection() ? ctx->getCollection()->uuid() : boost::none;
}
Collection* collection = ctx ? ctx->getCollection() : nullptr;
- // For change streams, the UUID will already have been set for the original namespace.
- if (!liteParsedPipeline.hasChangeStream()) {
- uuid = collection ? collection->uuid() : boost::none;
- }
-
- // The collator may already have been set if this is a $changeStream pipeline. If not,
- // resolve the collator to either the user-specified collation or the collection default.
- if (!collatorToUse) {
- collatorToUse.emplace(resolveCollator(opCtx, request, collection));
- }
-
// If this is a view, resolve it by finding the underlying collection and stitching view
// pipelines and this request's pipeline together. We then release our locks before
// recursively calling runAggregate(), which will re-acquire locks on the underlying