diff options
author    | Adityavardhan Agrawal <adi.agrawal@mongodb.com> | 2023-04-17 13:55:39 +0000
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-04-17 14:46:20 +0000
commit    | 01beeb4c2895153914158b1b5691c8c8cd60356a (patch)
tree      | 9cbf16f84ba3176aafc4d930a9b116ea4b7016cc /src/mongo/db/views
parent    | f8968230334cca0e504035188445697fcc8088cf (diff)
download  | mongo-01beeb4c2895153914158b1b5691c8c8cd60356a.tar.gz
SERVER-72686: Support $collStats in agg pipeline for timeseries collections
Diffstat (limited to 'src/mongo/db/views')
-rw-r--r-- | src/mongo/db/views/resolved_view.cpp | 95
-rw-r--r-- | src/mongo/db/views/resolved_view.h   | 10
2 files changed, 63 insertions, 42 deletions
diff --git a/src/mongo/db/views/resolved_view.cpp b/src/mongo/db/views/resolved_view.cpp index ec3a7ae8a49..2fd2b4a17cc 100644 --- a/src/mongo/db/views/resolved_view.cpp +++ b/src/mongo/db/views/resolved_view.cpp @@ -33,6 +33,7 @@ #include "mongo/base/init.h" #include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/pipeline/document_source_coll_stats.h" #include "mongo/db/pipeline/document_source_index_stats.h" #include "mongo/db/pipeline/document_source_internal_convert_bucket_index_stats.h" #include "mongo/db/pipeline/document_source_internal_unpack_bucket.h" @@ -133,45 +134,47 @@ std::shared_ptr<const ErrorExtraInfo> ResolvedView::parse(const BSONObj& cmdRepl return std::make_shared<ResolvedView>(fromBSON(cmdReply)); } -AggregateCommandRequest ResolvedView::asExpandedViewAggregation( - const AggregateCommandRequest& request) const { - // Perform the aggregation on the resolved namespace. The new pipeline consists of two parts: - // first, 'pipeline' in this ResolvedView; then, the pipeline in 'request'. - std::vector<BSONObj> resolvedPipeline; - resolvedPipeline.reserve(_pipeline.size() + request.getPipeline().size()); - resolvedPipeline.insert(resolvedPipeline.end(), _pipeline.begin(), _pipeline.end()); - resolvedPipeline.insert( - resolvedPipeline.end(), request.getPipeline().begin(), request.getPipeline().end()); - - // $indexStats needs special handling for time-series-collections. Normally for a regular read, - // $_internalUnpackBucket unpacks the buckets entries into time-series document format and then - // passes the time-series documents on through the pipeline. Instead we need to read the buckets - // collection's index stats unmodified and then pass the results through an additional stage to - // specially convert them to the time-series collection's schema, and then onward. There is no - // need for the $_internalUnpackBucket stage with $indexStats, so we remove it. 
- if (resolvedPipeline.size() >= 2 && - resolvedPipeline[0][DocumentSourceInternalUnpackBucket::kStageNameInternal] && - resolvedPipeline[1][DocumentSourceIndexStats::kStageName]) { - // Clear the $_internalUnpackBucket stage. - auto unpackStage = resolvedPipeline[0]; - resolvedPipeline[0] = resolvedPipeline[1]; - - // Grab the $_internalUnpackBucket stage's time-series collection schema options and pass - // them into the $_internalConvertBucketIndexStats stage to use for schema conversion. - BSONObjBuilder builder; - for (const auto& elem : - unpackStage[DocumentSourceInternalUnpackBucket::kStageNameInternal].Obj()) { - if (elem.fieldNameStringData() == timeseries::kTimeFieldName || - elem.fieldNameStringData() == timeseries::kMetaFieldName) { +void ResolvedView::handleTimeseriesRewrites(std::vector<BSONObj>* resolvedPipeline) const { + // Stages that are constrained to be the first stage of the pipeline ($collStats, $indexStats) + // require special handling since $_internalUnpackBucket is the first stage. + if (resolvedPipeline->size() >= 2 && + (*resolvedPipeline)[0][DocumentSourceInternalUnpackBucket::kStageNameInternal] && + ((*resolvedPipeline)[1][DocumentSourceIndexStats::kStageName] || + (*resolvedPipeline)[1][DocumentSourceCollStats::kStageName])) { + // Normally for a regular read, $_internalUnpackBucket unpacks the buckets entries into + // time-series document format and then passes the time-series documents on through the + // pipeline. Instead, for $indexStats, we need to read the buckets collection's index + // stats unmodified and then pass the results through an additional stage to specially + // convert them to the time-series collection's schema, and then onward. We grab the + // $_internalUnpackBucket stage's time-series collection schema options and pass them + // into the $_internalConvertBucketIndexStats stage to use for schema conversion. 
+ if ((*resolvedPipeline)[1][DocumentSourceIndexStats::kStageName]) { + auto unpackStage = (*resolvedPipeline)[0]; + (*resolvedPipeline)[0] = (*resolvedPipeline)[1]; + BSONObjBuilder builder; + for (const auto& elem : + unpackStage[DocumentSourceInternalUnpackBucket::kStageNameInternal].Obj()) { + if (elem.fieldNameStringData() == timeseries::kTimeFieldName || + elem.fieldNameStringData() == timeseries::kMetaFieldName) { + builder.append(elem); + } + } + (*resolvedPipeline)[1] = + BSON(DocumentSourceInternalConvertBucketIndexStats::kStageName << builder.obj()); + } else { + auto collStatsStage = (*resolvedPipeline)[1]; + BSONObjBuilder builder; + for (const auto& elem : collStatsStage[DocumentSourceCollStats::kStageName].Obj()) { builder.append(elem); } + builder.append("$_requestOnTimeseriesView", true); + (*resolvedPipeline)[1] = BSON(DocumentSourceCollStats::kStageName << builder.obj()); + // For $collStats, we directly read the collection stats from the buckets + // collection, and skip $_internalUnpackBucket. 
+ resolvedPipeline->erase(resolvedPipeline->begin()); } - - resolvedPipeline[1] = - BSON(DocumentSourceInternalConvertBucketIndexStats::kStageName << builder.obj()); - } else if (resolvedPipeline.size() >= 1 && - resolvedPipeline[0][DocumentSourceInternalUnpackBucket::kStageNameInternal]) { - auto unpackStage = resolvedPipeline[0]; + } else { + auto unpackStage = (*resolvedPipeline)[0]; BSONObjBuilder builder; for (const auto& elem : @@ -184,11 +187,27 @@ AggregateCommandRequest ResolvedView::asExpandedViewAggregation( builder.append(DocumentSourceInternalUnpackBucket::kUsesExtendedRange, ((_timeseriesUsesExtendedRange && *_timeseriesUsesExtendedRange))); - resolvedPipeline[0] = + (*resolvedPipeline)[0] = BSON(DocumentSourceInternalUnpackBucket::kStageNameInternal << builder.obj()); } +} + +AggregateCommandRequest ResolvedView::asExpandedViewAggregation( + const AggregateCommandRequest& request) const { + // Perform the aggregation on the resolved namespace. The new pipeline consists of two parts: + // first, 'pipeline' in this ResolvedView; then, the pipeline in 'request'. 
+ std::vector<BSONObj> resolvedPipeline; + resolvedPipeline.reserve(_pipeline.size() + request.getPipeline().size()); + resolvedPipeline.insert(resolvedPipeline.end(), _pipeline.begin(), _pipeline.end()); + resolvedPipeline.insert( + resolvedPipeline.end(), request.getPipeline().begin(), request.getPipeline().end()); + + if (resolvedPipeline.size() >= 1 && + resolvedPipeline[0][DocumentSourceInternalUnpackBucket::kStageNameInternal]) { + handleTimeseriesRewrites(&resolvedPipeline); + } - AggregateCommandRequest expandedRequest{_namespace, resolvedPipeline}; + AggregateCommandRequest expandedRequest{_namespace, std::move(resolvedPipeline)}; if (request.getExplain()) { expandedRequest.setExplain(request.getExplain()); diff --git a/src/mongo/db/views/resolved_view.h b/src/mongo/db/views/resolved_view.h index 82238556288..bdceb93ac8e 100644 --- a/src/mongo/db/views/resolved_view.h +++ b/src/mongo/db/views/resolved_view.h @@ -59,6 +59,8 @@ public: static ResolvedView fromBSON(const BSONObj& commandResponseObj); + void handleTimeseriesRewrites(std::vector<BSONObj>* resolvedPipeline) const; + /** * Convert an aggregation command on a view to the equivalent command against the view's * underlying collection. @@ -91,12 +93,12 @@ private: NamespaceString _namespace; std::vector<BSONObj> _pipeline; - // The default collation associated with this view. An empty object means that the default is - // the simple collation. + // The default collation associated with this view. An empty object means that the default + // is the simple collation. // // Currently all operations which run over a view must use the default collation. This means - // that operations on the view which do not specify a collation inherit the default. Operations - // on the view which specify any other collation fail with a user error. + // that operations on the view which do not specify a collation inherit the default. + // Operations on the view which specify any other collation fail with a user error. 
BSONObj _defaultCollation; boost::optional<TimeseriesOptions> _timeseriesOptions; |