summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline
diff options
context:
space:
mode:
authorDavid Percy <david.percy@mongodb.com>2022-01-28 23:33:34 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-02-16 18:12:45 +0000
commit65348d1c32d7b46eea6e8120e56337c4475afc49 (patch)
tree522c07d5d9f3da2b368db7bb3773c09f0994bfe4 /src/mongo/db/pipeline
parent729f18d0a3adef3c06ca4e15a04160c78c54e5ad (diff)
downloadmongo-65348d1c32d7b46eea6e8120e56337c4475afc49.tar.gz
SERVER-63699 Create a very limited bounded-sort stage for time-series
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r--src/mongo/db/pipeline/document_source_sort.cpp98
-rw-r--r--src/mongo/db/pipeline/document_source_sort.h21
2 files changed, 119 insertions, 0 deletions
diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp
index b47fb4036f0..9a9f6aeb7d0 100644
--- a/src/mongo/db/pipeline/document_source_sort.cpp
+++ b/src/mongo/db/pipeline/document_source_sort.cpp
@@ -26,6 +26,7 @@
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
#include "mongo/platform/basic.h"
@@ -43,8 +44,10 @@
#include "mongo/db/pipeline/skip_and_limit.h"
#include "mongo/db/query/collation/collation_index_key.h"
#include "mongo/db/stats/resource_consumption_metrics.h"
+#include "mongo/logv2/log.h"
#include "mongo/platform/overflow_arithmetic.h"
#include "mongo/s/query/document_source_merge_cursors.h"
+#include "mongo/util/assert_util.h"
namespace mongo {
@@ -75,8 +78,47 @@ REGISTER_DOCUMENT_SOURCE(sort,
LiteParsedDocumentSourceDefault::parse,
DocumentSourceSort::createFromBson,
AllowedWithApiStrict::kAlways);
+REGISTER_DOCUMENT_SOURCE_CONDITIONALLY(
+ _internalBoundedSort,
+ LiteParsedDocumentSourceDefault::parse,
+ DocumentSourceSort::parseBoundedSort,
+ AllowedWithApiStrict::kNeverInVersion1,
+ AllowedWithClientType::kAny,
+ boost::
+ none /* TODO SERVER-52286 feature_flags::gFeatureFlagBucketUnpackWithSort.getVersion() */,
+ feature_flags::gFeatureFlagBucketUnpackWithSort.isEnabledAndIgnoreFCV());
+
DocumentSource::GetNextResult DocumentSourceSort::doGetNext() {
+ if (_timeSorter) {
+ // Only pull input as necessary to get _timeSorter to have a result.
+ while (_timeSorter->getState() == TimeSorter::State::kWait) {
+ auto input = pSource->getNext();
+ switch (input.getStatus()) {
+ case GetNextResult::ReturnStatus::kPauseExecution:
+ return input;
+ case GetNextResult::ReturnStatus::kEOF:
+ // Tell _timeSorter there will be no more input. In response, its state
+ // will never be kWait again, and so we'll never call pSource->getNext() again.
+ _timeSorter->done();
+ continue;
+ case GetNextResult::ReturnStatus::kAdvanced:
+ Document doc = input.getDocument();
+ auto time = doc.getField(_sortExecutor->sortPattern()[0].fieldPath->fullPath());
+ uassert(6369909,
+ "$_internalBoundedSort only handles Date values",
+ time.getType() == Date);
+ _timeSorter->add(time.getDate(), doc);
+ continue;
+ }
+ }
+
+ if (_timeSorter->getState() == TimeSorter::State::kDone)
+ return GetNextResult::makeEOF();
+
+ return _timeSorter->next().second;
+ }
+
if (!_populated) {
const auto populationResult = populate();
if (populationResult.isPaused()) {
@@ -101,6 +143,25 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceSort::clone() const {
void DocumentSourceSort::serializeToArray(
std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const {
+ if (_timeSorter) {
+ tassert(6369900,
+ "$_internalBoundedSort should not absorb a $limit",
+ !_sortExecutor->hasLimit());
+
+ // TODO SERVER-63637 Implement execution stats.
+
+ // {$_internalBoundedSort: {sortKey, bound}}
+ auto sortKey = _sortExecutor->sortPattern().serialize(
+ SortPattern::SortKeySerialization::kForPipelineSerialization);
+ array.push_back(Value{Document{{
+ {"$_internalBoundedSort"_sd,
+ Document{{
+ {"sortKey"_sd, std::move(sortKey)},
+ {"bound"_sd, durationCount<Seconds>(_timeSorter->makeBound.bound)},
+ }}},
+ }}});
+ return;
+ }
uint64_t limit = _sortExecutor->getLimit();
@@ -145,6 +206,11 @@ Pipeline::SourceContainer::iterator DocumentSourceSort::doOptimizeAt(
Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
invariant(*itr == this);
+ if (_timeSorter) {
+ // Do not absorb a limit, or combine with other sort stages.
+ return std::next(itr);
+ }
+
auto stageItr = std::next(itr);
auto limit = extractLimitForPushdown(stageItr, container);
if (limit)
@@ -209,6 +275,33 @@ intrusive_ptr<DocumentSourceSort> DocumentSourceSort::create(
return pSort;
}
+intrusive_ptr<DocumentSourceSort> DocumentSourceSort::parseBoundedSort(
+ BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) {
+ uassert(6369905,
+ "the $_internalBoundedSort key specification must be an object",
+ elem.type() == Object);
+ BSONObj args = elem.embeddedObject();
+
+ BSONElement key = args["sortKey"];
+ uassert(6369904, "$_internalBoundedSort sortKey must be an object", key.type() == Object);
+ SortPattern pat{key.embeddedObject(), expCtx};
+ uassert(6369903, "$_internalBoundedSort doesn't support compound sort", pat.size() == 1);
+ uassert(6369902, "$_internalBoundedSort only handles ascending sort", pat[0].isAscending);
+ uassert(6369901,
+ "$_internalBoundedSort doesn't support an expression in the sortKey",
+ pat[0].expression == nullptr);
+ uassert(6369907,
+ "$_internalBoundedSort doesn't support dotted field names",
+ pat[0].fieldPath->getPathLength() == 1);
+
+ BSONElement bound = args["bound"];
+ int boundN = uassertStatusOK(bound.parseIntegerElementToNonNegativeInt());
+
+ auto ds = DocumentSourceSort::create(expCtx, pat);
+ ds->_timeSorter.reset(new TimeSorter{{}, BoundMaker{Seconds{boundN}}});
+ return ds;
+}
+
DocumentSource::GetNextResult DocumentSourceSort::populate() {
auto nextInput = pSource->getNext();
for (; nextInput.isAdvanced(); nextInput = pSource->getNext()) {
@@ -260,6 +353,11 @@ std::pair<Value, Document> DocumentSourceSort::extractSortKey(Document&& doc) co
}
boost::optional<DocumentSource::DistributedPlanLogic> DocumentSourceSort::distributedPlanLogic() {
+ uassert(6369906,
+ "$_internalBoundedSort cannot be the first stage on the merger, because it requires "
+ "almost-sorted input, which the shardsPart of a pipeline can't provide",
+ !_timeSorter);
+
DistributedPlanLogic split;
split.shardsStage = this;
split.inputSortPattern = _sortExecutor->sortPattern()
diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h
index 1660c91b9f2..1043f9a5d9a 100644
--- a/src/mongo/db/pipeline/document_source_sort.h
+++ b/src/mongo/db/pipeline/document_source_sort.h
@@ -113,6 +113,12 @@ public:
}
/**
+ * Parse a stage that uses BoundedSorter.
+ */
+ static boost::intrusive_ptr<DocumentSourceSort> parseBoundedSort(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+ /**
* Returns the the limit, if a subsequent $limit stage has been coalesced with this $sort stage.
* Otherwise, returns boost::none.
*/
@@ -189,6 +195,21 @@ private:
boost::optional<SortExecutor<Document>> _sortExecutor;
boost::optional<SortKeyGenerator> _sortKeyGen;
+
+ struct BoundMaker {
+ Seconds bound;
+
+ Date_t operator()(Date_t key) {
+ return key - bound;
+ }
+ };
+ struct Comp {
+ int operator()(Date_t x, Date_t y) const {
+ return x.toMillisSinceEpoch() - y.toMillisSinceEpoch();
+ }
+ };
+ using TimeSorter = BoundedSorter<Date_t, Document, Comp, BoundMaker>;
+ std::unique_ptr<TimeSorter> _timeSorter;
};
} // namespace mongo