summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/document_source_sort.cpp
diff options
context:
space:
mode:
authorDan Larkin-York <dan.larkin-york@mongodb.com>2022-03-21 19:21:49 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-03-21 21:48:27 +0000
commit85409b314c52efaa1987d50291f2779dfa314328 (patch)
tree0975c4b9ab98ea336ced8f44c707d6f44b8c3117 /src/mongo/db/pipeline/document_source_sort.cpp
parent0f9843968b1656cd1287d33f16508368fd5d7b2a (diff)
downloadmongo-85409b314c52efaa1987d50291f2779dfa314328.tar.gz
SERVER-64602 Provide time-series bucket time bounds as document metadata for use by bounded sorter
Diffstat (limited to 'src/mongo/db/pipeline/document_source_sort.cpp')
-rw-r--r--src/mongo/db/pipeline/document_source_sort.cpp104
1 files changed, 78 insertions, 26 deletions
diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp
index b5ef461e697..a96c3876699 100644
--- a/src/mongo/db/pipeline/document_source_sort.cpp
+++ b/src/mongo/db/pipeline/document_source_sort.cpp
@@ -36,6 +36,7 @@
#include "mongo/db/exec/document_value/document.h"
#include "mongo/db/exec/document_value/document_comparator.h"
+#include "mongo/db/exec/document_value/document_metadata_fields.h"
#include "mongo/db/exec/document_value/value.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/pipeline/expression.h"
@@ -58,27 +59,34 @@ using std::unique_ptr;
using std::vector;
namespace {
-struct BoundMakerAsc {
- const Seconds bound;
+constexpr StringData kMin = "min"_sd;
+constexpr StringData kMax = "max"_sd;
- DocumentSourceSort::SortableDate operator()(DocumentSourceSort::SortableDate key) const {
- return {key.date - bound};
+struct BoundMakerMin {
+ const long long offset; // Offset in millis
+
+ DocumentSourceSort::SortableDate operator()(DocumentSourceSort::SortableDate key,
+ const Document& doc) const {
+ return {Date_t::fromMillisSinceEpoch(
+ doc.metadata().getTimeseriesBucketMinTime().toMillisSinceEpoch() + offset)};
}
- long long serialize() const {
- return duration_cast<Seconds>(bound).count();
+ Document serialize() const {
+ return Document{{{"base"_sd, kMin}, {"offset"_sd, offset}}};
}
};
-struct BoundMakerDesc {
- const Seconds bound;
+struct BoundMakerMax {
+ const long long offset;
- DocumentSourceSort::SortableDate operator()(DocumentSourceSort::SortableDate key) const {
- return {key.date + bound};
+ DocumentSourceSort::SortableDate operator()(DocumentSourceSort::SortableDate key,
+ const Document& doc) const {
+ return {Date_t::fromMillisSinceEpoch(
+ doc.metadata().getTimeseriesBucketMaxTime().toMillisSinceEpoch() + offset)};
}
- long long serialize() const {
- return duration_cast<Seconds>(bound).count();
+ Document serialize() const {
+ return Document{{{"base"_sd, kMax}, {"offset"_sd, offset}}};
}
};
struct CompAsc {
@@ -102,10 +110,14 @@ struct CompDesc {
}
};
-using TimeSorterAsc =
- BoundedSorter<DocumentSourceSort::SortableDate, Document, CompAsc, BoundMakerAsc>;
-using TimeSorterDesc =
- BoundedSorter<DocumentSourceSort::SortableDate, Document, CompDesc, BoundMakerDesc>;
+using TimeSorterAscMin =
+ BoundedSorter<DocumentSourceSort::SortableDate, Document, CompAsc, BoundMakerMin>;
+using TimeSorterAscMax =
+ BoundedSorter<DocumentSourceSort::SortableDate, Document, CompAsc, BoundMakerMax>;
+using TimeSorterDescMin =
+ BoundedSorter<DocumentSourceSort::SortableDate, Document, CompDesc, BoundMakerMin>;
+using TimeSorterDescMax =
+ BoundedSorter<DocumentSourceSort::SortableDate, Document, CompDesc, BoundMakerMax>;
} // namespace
constexpr StringData DocumentSourceSort::kStageName;
@@ -299,9 +311,8 @@ Pipeline::SourceContainer::iterator DocumentSourceSort::doOptimizeAt(
DepsTracker::State DocumentSourceSort::getDependencies(DepsTracker* deps) const {
_sortExecutor->sortPattern().addDependencies(deps);
- if (pExpCtx->needsMerge) {
- // Include the sort key if we will merge several sorted streams later.
- deps->setNeedsMetadata(DocumentMetadataFields::kSortKey, true);
+ if (_requiredMetadata.any()) {
+ deps->requestMetadata(_requiredMetadata);
}
return DepsTracker::State::SEE_NEXT;
@@ -345,7 +356,25 @@ intrusive_ptr<DocumentSourceSort> DocumentSourceSort::parseBoundedSort(
pat[0].fieldPath->getPathLength() == 1);
BSONElement bound = args["bound"];
- int boundN = uassertStatusOK(bound.parseIntegerElementToNonNegativeInt());
+ uassert(
+ 6460200, "$_internalBoundedSort bound must be an object", bound && bound.type() == Object);
+
+ BSONElement boundOffsetElem = bound.Obj()["offset"];
+ long long boundOffset = 0;
+ if (boundOffsetElem && boundOffsetElem.isNumber()) {
+ boundOffset = uassertStatusOK(boundOffsetElem.parseIntegerElementToLong()) *
+ 1000; // convert to millis
+ }
+
+ BSONElement boundBaseElem = bound.Obj()["base"];
+ uassert(6460201,
+ "$_internalBoundedSort bound.base must be a string",
+ boundBaseElem && boundBaseElem.type() == String);
+ StringData boundBase = boundBaseElem.valueStringData();
+ uassert(6460202,
+ str::stream() << "$_internalBoundedSort bound.base must be '" << kMin << "' or '"
+ << kMax << "'",
+ boundBase == kMin || boundBase == kMax);
SortOptions opts;
opts.maxMemoryUsageBytes = internalQueryMaxBlockingSortMemoryUsageBytes.load();
@@ -355,11 +384,26 @@ intrusive_ptr<DocumentSourceSort> DocumentSourceSort::parseBoundedSort(
}
auto ds = DocumentSourceSort::create(expCtx, pat);
- if (pat[0].isAscending) {
- ds->_timeSorter.reset(new TimeSorterAsc{opts, CompAsc{}, BoundMakerAsc{Seconds{boundN}}});
+ if (boundBase == kMin) {
+ if (pat[0].isAscending) {
+ ds->_timeSorter.reset(
+ new TimeSorterAscMin{opts, CompAsc{}, BoundMakerMin{boundOffset}});
+ } else {
+ ds->_timeSorter.reset(
+ new TimeSorterDescMin{opts, CompDesc{}, BoundMakerMin{boundOffset}});
+ }
+ ds->_requiredMetadata.set(DocumentMetadataFields::MetaType::kTimeseriesBucketMinTime);
+ } else if (boundBase == kMax) {
+ if (pat[0].isAscending) {
+ ds->_timeSorter.reset(
+ new TimeSorterAscMax{opts, CompAsc{}, BoundMakerMax{boundOffset}});
+ } else {
+ ds->_timeSorter.reset(
+ new TimeSorterDescMax{opts, CompDesc{}, BoundMakerMax{boundOffset}});
+ }
+ ds->_requiredMetadata.set(DocumentMetadataFields::MetaType::kTimeseriesBucketMaxTime);
} else {
- ds->_timeSorter.reset(
- new TimeSorterDesc{opts, CompDesc{}, BoundMakerDesc{Seconds{boundN}}});
+ MONGO_UNREACHABLE;
}
return ds;
}
@@ -463,8 +507,16 @@ std::string nextFileName() {
template class ::mongo::BoundedSorter<::mongo::DocumentSourceSort::SortableDate,
::mongo::Document,
::mongo::CompAsc,
- ::mongo::BoundMakerAsc>;
+ ::mongo::BoundMakerMin>;
+template class ::mongo::BoundedSorter<::mongo::DocumentSourceSort::SortableDate,
+ ::mongo::Document,
+ ::mongo::CompAsc,
+ ::mongo::BoundMakerMax>;
+template class ::mongo::BoundedSorter<::mongo::DocumentSourceSort::SortableDate,
+ ::mongo::Document,
+ ::mongo::CompDesc,
+ ::mongo::BoundMakerMin>;
template class ::mongo::BoundedSorter<::mongo::DocumentSourceSort::SortableDate,
::mongo::Document,
::mongo::CompDesc,
- ::mongo::BoundMakerDesc>;
+ ::mongo::BoundMakerMax>;