diff options
author | David Percy <david.percy@mongodb.com> | 2022-01-28 23:33:34 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-02-16 18:12:45 +0000 |
commit | 65348d1c32d7b46eea6e8120e56337c4475afc49 (patch) | |
tree | 522c07d5d9f3da2b368db7bb3773c09f0994bfe4 /src/mongo/db/pipeline | |
parent | 729f18d0a3adef3c06ca4e15a04160c78c54e5ad (diff) | |
download | mongo-65348d1c32d7b46eea6e8120e56337c4475afc49.tar.gz |
SERVER-63699 Create a very limited bounded-sort stage for time-series
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r-- | src/mongo/db/pipeline/document_source_sort.cpp | 98 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_sort.h | 21 |
2 files changed, 119 insertions, 0 deletions
diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp index b47fb4036f0..9a9f6aeb7d0 100644 --- a/src/mongo/db/pipeline/document_source_sort.cpp +++ b/src/mongo/db/pipeline/document_source_sort.cpp @@ -26,6 +26,7 @@ * exception statement from all source files in the program, then also delete * it in the license file. */ +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery #include "mongo/platform/basic.h" @@ -43,8 +44,10 @@ #include "mongo/db/pipeline/skip_and_limit.h" #include "mongo/db/query/collation/collation_index_key.h" #include "mongo/db/stats/resource_consumption_metrics.h" +#include "mongo/logv2/log.h" #include "mongo/platform/overflow_arithmetic.h" #include "mongo/s/query/document_source_merge_cursors.h" +#include "mongo/util/assert_util.h" namespace mongo { @@ -75,8 +78,47 @@ REGISTER_DOCUMENT_SOURCE(sort, LiteParsedDocumentSourceDefault::parse, DocumentSourceSort::createFromBson, AllowedWithApiStrict::kAlways); +REGISTER_DOCUMENT_SOURCE_CONDITIONALLY( + _internalBoundedSort, + LiteParsedDocumentSourceDefault::parse, + DocumentSourceSort::parseBoundedSort, + AllowedWithApiStrict::kNeverInVersion1, + AllowedWithClientType::kAny, + boost:: + none /* TODO SERVER-52286 feature_flags::gFeatureFlagBucketUnpackWithSort.getVersion() */, + feature_flags::gFeatureFlagBucketUnpackWithSort.isEnabledAndIgnoreFCV()); + DocumentSource::GetNextResult DocumentSourceSort::doGetNext() { + if (_timeSorter) { + // Only pull input as necessary to get _timeSorter to have a result. + while (_timeSorter->getState() == TimeSorter::State::kWait) { + auto input = pSource->getNext(); + switch (input.getStatus()) { + case GetNextResult::ReturnStatus::kPauseExecution: + return input; + case GetNextResult::ReturnStatus::kEOF: + // Tell _timeSorter there will be no more input. In response, its state + // will never be kWait again, and so we'll never call pSource->getNext() again. + _timeSorter->done(); + continue; + case GetNextResult::ReturnStatus::kAdvanced: + Document doc = input.getDocument(); + auto time = doc.getField(_sortExecutor->sortPattern()[0].fieldPath->fullPath()); + uassert(6369909, + "$_internalBoundedSort only handles Date values", + time.getType() == Date); + _timeSorter->add(time.getDate(), doc); + continue; + } + } + + if (_timeSorter->getState() == TimeSorter::State::kDone) + return GetNextResult::makeEOF(); + + return _timeSorter->next().second; + } + if (!_populated) { const auto populationResult = populate(); if (populationResult.isPaused()) { @@ -101,6 +143,25 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceSort::clone() const { void DocumentSourceSort::serializeToArray( std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const { + if (_timeSorter) { + tassert(6369900, + "$_internalBoundedSort should not absorb a $limit", + !_sortExecutor->hasLimit()); + + // TODO SERVER-63637 Implement execution stats. + + // {$_internalBoundedSort: {sortKey, bound}} + auto sortKey = _sortExecutor->sortPattern().serialize( + SortPattern::SortKeySerialization::kForPipelineSerialization); + array.push_back(Value{Document{{ + {"$_internalBoundedSort"_sd, + Document{{ + {"sortKey"_sd, std::move(sortKey)}, + {"bound"_sd, durationCount<Seconds>(_timeSorter->makeBound.bound)}, + }}}, + }}}); + return; + } uint64_t limit = _sortExecutor->getLimit(); @@ -145,6 +206,11 @@ Pipeline::SourceContainer::iterator DocumentSourceSort::doOptimizeAt( Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { invariant(*itr == this); + if (_timeSorter) { + // Do not absorb a limit, or combine with other sort stages. + return std::next(itr); + } + auto stageItr = std::next(itr); auto limit = extractLimitForPushdown(stageItr, container); if (limit) @@ -209,6 +275,33 @@ intrusive_ptr<DocumentSourceSort> DocumentSourceSort::create( return pSort; } +intrusive_ptr<DocumentSourceSort> DocumentSourceSort::parseBoundedSort( + BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) { + uassert(6369905, + "the $_internalBoundedSort key specification must be an object", + elem.type() == Object); + BSONObj args = elem.embeddedObject(); + + BSONElement key = args["sortKey"]; + uassert(6369904, "$_internalBoundedSort sortKey must be an object", key.type() == Object); + SortPattern pat{key.embeddedObject(), expCtx}; + uassert(6369903, "$_internalBoundedSort doesn't support compound sort", pat.size() == 1); + uassert(6369902, "$_internalBoundedSort only handles ascending sort", pat[0].isAscending); + uassert(6369901, + "$_internalBoundedSort doesn't support an expression in the sortKey", + pat[0].expression == nullptr); + uassert(6369907, + "$_internalBoundedSort doesn't support dotted field names", + pat[0].fieldPath->getPathLength() == 1); + + BSONElement bound = args["bound"]; + int boundN = uassertStatusOK(bound.parseIntegerElementToNonNegativeInt()); + + auto ds = DocumentSourceSort::create(expCtx, pat); + ds->_timeSorter.reset(new TimeSorter{{}, BoundMaker{Seconds{boundN}}}); + return ds; +} + DocumentSource::GetNextResult DocumentSourceSort::populate() { auto nextInput = pSource->getNext(); for (; nextInput.isAdvanced(); nextInput = pSource->getNext()) { @@ -260,6 +353,11 @@ std::pair<Value, Document> DocumentSourceSort::extractSortKey(Document&& doc) co } boost::optional<DocumentSource::DistributedPlanLogic> DocumentSourceSort::distributedPlanLogic() { + uassert(6369906, + "$_internalBoundedSort cannot be the first stage on the merger, because it requires " + "almost-sorted input, which the shardsPart of a pipeline can't provide", + !_timeSorter); + DistributedPlanLogic split; split.shardsStage = this; split.inputSortPattern = _sortExecutor->sortPattern() diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h index 1660c91b9f2..1043f9a5d9a 100644 --- a/src/mongo/db/pipeline/document_source_sort.h +++ b/src/mongo/db/pipeline/document_source_sort.h @@ -113,6 +113,12 @@ public: } /** + * Parse a stage that uses BoundedSorter. + */ + static boost::intrusive_ptr<DocumentSourceSort> parseBoundedSort( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + + /** * Returns the the limit, if a subsequent $limit stage has been coalesced with this $sort stage. * Otherwise, returns boost::none. */ @@ -189,6 +195,21 @@ private: boost::optional<SortExecutor<Document>> _sortExecutor; boost::optional<SortKeyGenerator> _sortKeyGen; + + struct BoundMaker { + Seconds bound; + + Date_t operator()(Date_t key) { + return key - bound; + } + }; + struct Comp { + int operator()(Date_t x, Date_t y) const { + return x.toMillisSinceEpoch() - y.toMillisSinceEpoch(); + } + }; + using TimeSorter = BoundedSorter<Date_t, Document, Comp, BoundMaker>; + std::unique_ptr<TimeSorter> _timeSorter; }; } // namespace mongo |