diff options
author | Ted Tuckman <ted.tuckman@mongodb.com> | 2021-09-20 16:21:48 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-09-20 17:09:42 +0000 |
commit | 7ee2e01e5d95a9b925fa1964b4f9f6151a9806bd (patch) | |
tree | 68419dc507ae5466b0793a035501a5bfdd1abb2b | |
parent | f408ec2ca68e0cc1ee51b6971d688daf2c513ef4 (diff) | |
download | mongo-7ee2e01e5d95a9b925fa1964b4f9f6151a9806bd.tar.gz |
SERVER-57344 Enable partitioning in $densify
-rw-r--r-- | jstests/aggregation/sources/densify/libs/parse_util.js | 16 | ||||
-rw-r--r-- | jstests/aggregation/sources/densify/partitions.js | 392 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_densify.cpp | 389 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_densify.h | 98 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_densify_test.cpp | 38 |
5 files changed, 813 insertions, 120 deletions
diff --git a/jstests/aggregation/sources/densify/libs/parse_util.js b/jstests/aggregation/sources/densify/libs/parse_util.js index 9e51df53445..2a41de28f7e 100644 --- a/jstests/aggregation/sources/densify/libs/parse_util.js +++ b/jstests/aggregation/sources/densify/libs/parse_util.js @@ -188,21 +188,19 @@ let parseUtil = (function(db, coll, stageName, options = {}) { * range: {step: 1.0, bounds: "full", unit: "second"} * } * })); */ - // TODO SERVER-57344: Enable this parsing test. - /* assert.commandWorked(run({ - * [stageName]: - * {field: "a", partitionByFields: ["b", "c"], range: {step: 1.0, bounds: "full"}} - * })); */ - // TODO SERVER-57344: Enable this parsing test. - /* assert.commandWorked(run({ + assert.commandWorked(run({ + [stageName]: + {field: "a", partitionByFields: ["b", "c"], range: {step: 1.0, bounds: "full"}} + })); + assert.commandWorked(run({ [stageName]: { field: "a", partitionByFields: [ "b", ], - range: { step: 1.0, bounds: "partition" } + range: {step: 1.0, bounds: "partition"} } - })); */ + })); assert.commandWorked(run({[stageName]: {field: "a", range: {step: 1.0, bounds: "full"}}})); } diff --git a/jstests/aggregation/sources/densify/partitions.js b/jstests/aggregation/sources/densify/partitions.js new file mode 100644 index 00000000000..f7df2c4b257 --- /dev/null +++ b/jstests/aggregation/sources/densify/partitions.js @@ -0,0 +1,392 @@ +/** + * Test that densify works for partitions. + * @tags: [ + * # Needed as $densify is a 51 feature. + * requires_fcv_51, + * ] + */ + +(function() { +"use strict"; + +load("jstests/aggregation/extras/utils.js"); // arrayEq + +const featureEnabled = + assert.commandWorked(db.adminCommand({getParameter: 1, featureFlagDensify: 1})) + .featureFlagDensify.value; +if (!featureEnabled) { + jsTestLog("Skipping test because the densify feature flag is disabled"); + return; +} + +const coll = db[jsTestName()]; + +function buildErrorString(found, expected) { + return "Expected:\n" + tojson(expected) + "\nGot:\n" + tojson(found); +} + +// Two docs in each of two partitions. +function testOne() { + coll.drop(); + + const testDocs = [ + {val: 0, partition: 0}, + {val: 2, partition: 0}, + {val: 0, partition: 1}, + {val: 2, partition: 1} + ]; + assert.commandWorked(coll.insert(testDocs)); + + let result = coll.aggregate([ + {$project: {_id: 0}}, + { + $densify: { + field: "val", + partitionByFields: ["partition"], + range: {step: 1, bounds: "partition"} + } + } + ]); + const resultArray = result.toArray(); + const testExpected = testDocs.concat([{val: 1, partition: 0}, {val: 1, partition: 1}]); + assert(arrayEq(resultArray, testExpected), buildErrorString(resultArray, testExpected)); + coll.drop(); +} + +// Same as test one, but partitions are interleaved. +function testTwo() { + coll.drop(); + const testDocs = [ + {val: 0, partition: 0}, + {val: 0, partition: 1}, + {val: 2, partition: 1}, + {val: 2, partition: 0} + ]; + assert.commandWorked(coll.insert(testDocs)); + + let result = coll.aggregate([ + {$project: {_id: 0}}, + { + $densify: { + field: "val", + partitionByFields: ["partition"], + range: {step: 1, bounds: "partition"} + } + } + ]); + const resultArray = result.toArray(); + const testExpected = testDocs.concat([{val: 1, partition: 0}, {val: 1, partition: 1}]); + assert(arrayEq(resultArray, testExpected), buildErrorString(resultArray, testExpected)); + coll.drop(); +} + +// Two larger partitions, interleaved. +function testThree() { + coll.drop(); + let testDocs = []; + let testExpected = []; + for (let i = 0; i < 20; i++) { + for (let j = 0; j < 2; j++) { + if (i % 4 == 2 && j == 0) { + testDocs.push({val: i, part: j}); + } else if (i % 5 == 0 && j == 1) { + testDocs.push({val: i, part: j}); + } + // Should have every document below 16 in first partition and 15 in the second. + if (i >= 2 && i <= 18 && j == 0) { + testExpected.push({val: i, part: j}); + } + if (i <= 15 && j == 1) { + testExpected.push({val: i, part: j}); + } + } + } + assert.commandWorked(coll.insert(testDocs)); + let result = coll.aggregate([ + {$project: {_id: 0}}, + { + $densify: + {field: "val", partitionByFields: ["part"], range: {step: 1, bounds: "partition"}} + } + ]); + const resultArray = result.toArray(); + assert(arrayEq(resultArray, testExpected), buildErrorString(resultArray, testExpected)); +} + +// Five small partitions. +function testFour() { + coll.drop(); + let testDocs = []; + let testExpected = []; + for (let partVal = 0; partVal < 5; partVal++) { + // Add an initial document to each partition. + testDocs.push({val: 0, part: partVal}); + testExpected.push({val: 0, part: partVal}); + for (let densifyVal = 1; densifyVal < 10; densifyVal++) { + if (partVal > 0 && densifyVal % partVal == 0) { + testDocs.push({val: densifyVal, part: partVal}); + } + testExpected.push({val: densifyVal, part: partVal}); + } + // Add a top document to each partition. + testDocs.push({val: 10, part: partVal}); + testExpected.push({val: 10, part: partVal}); + } + assert.commandWorked(coll.insert(testDocs)); + let result = coll.aggregate([ + {$project: {_id: 0}}, + { + $densify: + {field: "val", partitionByFields: ["part"], range: {step: 1, bounds: "partition"}} + } + ]); + const resultArray = result.toArray(); + assert(arrayEq(resultArray, testExpected), buildErrorString(resultArray, testExpected)); +} + +// One partition doesn't need densifying. +function testFive() { + coll.drop(); + + const testDocs = [ + {val: 0, partition: 0}, + {val: 2, partition: 0}, + {val: 123, partition: 1}, + ]; + assert.commandWorked(coll.insert(testDocs)); + + let result = coll.aggregate([ + {$project: {_id: 0}}, + { + $densify: { + field: "val", + partitionByFields: ["partition"], + range: {step: 1, bounds: "partition"} + } + } + ]); + const resultArray = result.toArray(); + const testExpected = testDocs.concat([{val: 1, partition: 0}]); + assert(arrayEq(resultArray, testExpected), buildErrorString(resultArray, testExpected)); + coll.drop(); +} + +// Verify the following test works in the full case without partitions. +function fullTestOne(stepVal = 1) { + coll.drop(); + let testDocs = []; + let testExpected = []; + // Add an initial document. + testDocs.push({val: 0}); + testExpected.push({val: 0}); + for (let densifyVal = 1; densifyVal < 11; densifyVal++) { + if (densifyVal % 2 == 0) { + testDocs.push({val: densifyVal}); + testExpected.push({val: densifyVal}); + } else if (densifyVal % stepVal == 0) { + testExpected.push({val: densifyVal}); + } + } + testDocs.push({val: 11}); + testExpected.push({val: 11}); + assert.commandWorked(coll.insert(testDocs)); + let result = coll.aggregate( + [{$project: {_id: 0}}, {$densify: {field: "val", range: {step: stepVal, bounds: "full"}}}]); + const resultArray = result.toArray(); + assert(arrayEq(resultArray, testExpected), buildErrorString(resultArray, testExpected)); +} + +// Multiple partition fields. +function testFive(stepVal = 1) { + coll.drop(); + let testDocs = []; + let testExpected = []; + for (let partValA = 0; partValA < 2; partValA++) { + for (let partValB = 0; partValB < 2; partValB++) { + // Add an initial document to each partition. + testDocs.push({val: 0, partA: partValA, partB: partValB}); + testExpected.push({val: 0, partA: partValA, partB: partValB}); + for (let densifyVal = 1; densifyVal < 11; densifyVal++) { + if (densifyVal % 2 == 0) { + testDocs.push({val: densifyVal, partA: partValA, partB: partValB}); + testExpected.push({val: densifyVal, partA: partValA, partB: partValB}); + } else if (densifyVal % stepVal == 0) { + testExpected.push({val: densifyVal, partA: partValA, partB: partValB}); + } + } + // Add a max document to each partition. + testDocs.push({val: 11, partA: partValA, partB: partValB}); + testExpected.push({val: 11, partA: partValA, partB: partValB}); + } + } + assert.commandWorked(coll.insert(testDocs)); + let result = coll.aggregate([ + {$project: {_id: 0}}, + { + $densify: { + field: "val", + partitionByFields: ["partA", "partB"], + range: {step: stepVal, bounds: "partition"} + } + } + ]); + const resultArray = result.toArray(); + assert(arrayEq(resultArray, testExpected), buildErrorString(resultArray, testExpected)); +} + +// Test partitioning with full where partitions need to be densified at the end. +// Three partitions, each with only one document. +function fullTestTwo(stepVal = 2) { + coll.drop(); + let testDocs = []; + let testExpected = []; + // Add an initial document. + testDocs.push({val: 0, part: 0}); + testDocs.push({val: 0, part: 1}); + testDocs.push({val: 10, part: 2}); + for (let densifyVal = 0; densifyVal < 11; densifyVal += stepVal) { + for (let partitionVal = 0; partitionVal <= 2; partitionVal++) { + testExpected.push({val: densifyVal, part: partitionVal}); + } + } + assert.commandWorked(coll.insert(testDocs)); + let result = coll.aggregate([ + {$project: {_id: 0}}, + { + $densify: + {field: "val", range: {step: stepVal, bounds: "full"}, partitionByFields: ["part"]} + }, + {$sort: {val: 1, part: 1}} + ]); + const resultArray = result.toArray(); + assert(arrayEq(resultArray, testExpected), buildErrorString(resultArray, testExpected)); +} + +// Same as above, but with extra documents in the middle of each partition somewhere. +function fullTestThree(stepVal = 2) { + coll.drop(); + let testDocs = []; + let testExpected = []; + // Add an initial document. + testDocs.push({val: 0, part: 0}); + testDocs.push({val: 4, part: 0}); + testDocs.push({val: 0, part: 1}); + testDocs.push({val: 5, part: 1}); + testExpected.push({val: 5, part: 1}); + testDocs.push({val: 10, part: 2}); + for (let densifyVal = 0; densifyVal < 11; densifyVal += stepVal) { + for (let partitionVal = 0; partitionVal <= 2; partitionVal++) { + testExpected.push({val: densifyVal, part: partitionVal}); + } + } + assert.commandWorked(coll.insert(testDocs)); + let result = coll.aggregate([ + {$project: {_id: 0}}, + { + $densify: + {field: "val", range: {step: stepVal, bounds: "full"}, partitionByFields: ["part"]} + }, + {$sort: {val: 1, part: 1}} + ]); + const resultArray = result.toArray(); + assert(arrayEq(resultArray, testExpected), buildErrorString(resultArray, testExpected)); +} + +// Two partitions with no documents in the range. +function rangeTestOne() { + coll.drop(); + + const testDocs = [ + {val: 0, partition: 0}, + {val: 4, partition: 1}, + ]; + + const expectedDocs = [ + {val: 0, partition: 0}, + {val: 4, partition: 1}, + {val: 2, partition: 0}, + {val: 2, partition: 1}, + {val: 3, partition: 0}, + {val: 3, partition: 1} + ]; + assert.commandWorked(coll.insert(testDocs)); + + let result = coll.aggregate([ + {$project: {_id: 0}}, + { + $densify: + {field: "val", partitionByFields: ["partition"], range: {step: 1, bounds: [2, 3]}} + } + ]); + const resultArray = result.toArray(); + assert(arrayEq(resultArray, expectedDocs), buildErrorString(resultArray, expectedDocs)); + coll.drop(); +} + +// Three partitions, each with different documents w/respect to the range. +function rangeTestTwo() { + coll.drop(); + let testDocs = []; + let testExpected = []; + testDocs.push({val: 0, part: 0}); + testExpected.push({val: 0, part: 0}); + testDocs.push({val: 5, part: 1}); + testExpected.push({val: 5, part: 1}); + testDocs.push({val: 10, part: 2}); + testExpected.push({val: 10, part: 2}); + for (let densifyVal = 4; densifyVal <= 8; densifyVal += 2) { + for (let partitionVal = 0; partitionVal <= 2; partitionVal++) { + testExpected.push({val: densifyVal, part: partitionVal}); + } + } + assert.commandWorked(coll.insert(testDocs)); + let result = coll.aggregate([ + {$project: {_id: 0}}, + {$densify: {field: "val", range: {step: 2, bounds: [4, 8]}, partitionByFields: ["part"]}}, + {$sort: {val: 1, part: 1}} + ]); + const resultArray = result.toArray(); + assert(arrayEq(resultArray, testExpected), buildErrorString(resultArray, testExpected)); +} + +// Test negative numbers. +function fullTestFour() { + coll.drop(); + let testDocs = []; + let testExpected = []; + // Add an initial document. + testDocs.push({val: -10, part: 0}); + testExpected.push({val: -10, part: 0}); + testDocs.push({val: 10, part: 0}); + testExpected.push({val: 10, part: 0}); + testDocs.push({val: -2, part: 1}); + testExpected.push({val: -2, part: 1}); + testExpected.push({val: -10, part: 1}); + for (let densifyVal = -7; densifyVal < 10; densifyVal += 3) { + testExpected.push({val: densifyVal, part: 0}); + testExpected.push({val: densifyVal, part: 1}); + } + assert.commandWorked(coll.insert(testDocs)); + let result = coll.aggregate([ + {$project: {_id: 0}}, + {$densify: {field: "val", range: {step: 3, bounds: "full"}, partitionByFields: ["part"]}} + ]); + const resultArray = result.toArray(); + assert(arrayEq(resultArray, testExpected), buildErrorString(resultArray, testExpected)); +} + +testOne(); +testTwo(); +testThree(); +testFour(); +testFive(); +fullTestOne(); +testFive(); +fullTestOne(3); +testFive(3); +fullTestTwo(); +fullTestThree(); +rangeTestOne(); +rangeTestTwo(); +fullTestFour(); +})(); diff --git a/src/mongo/db/pipeline/document_source_densify.cpp b/src/mongo/db/pipeline/document_source_densify.cpp index a82646ed1a2..53d72a8429c 100644 --- a/src/mongo/db/pipeline/document_source_densify.cpp +++ b/src/mongo/db/pipeline/document_source_densify.cpp @@ -203,12 +203,6 @@ list<intrusive_ptr<DocumentSource>> create(const intrusive_ptr<ExpressionContext } // namespace document_source_densify namespace { -Value getDensifyValue(const Document& doc, const FieldPath& path) { - Value val = doc.getNestedField(path); - uassert(5733201, "Densify field type must be numeric", val.numeric()); - return val; -} - Value addValues(Value lhs, Value rhs) { return uassertStatusOK(ExpressionAdd::apply(lhs, rhs)); } @@ -343,27 +337,37 @@ bool DocumentSourceInternalDensify::DocGenerator::done() const { return _state == GeneratorState::kDone; } -// TODO SERVER-57344: Execution flow should be refactored such that std::visits are done in these -// functions instead of the doGetNext(). This is to avoid the need to pass NumericBounds to avoid -// duplicate std::visits in functions like this. DocumentSource::GetNextResult DocumentSourceInternalDensify::densifyAfterEOF(NumericBounds bounds) { // Once we have hit an EOF, if the last seen value (_current) plus the step is greater // than or equal to the rangeMax, that means we have finished densifying // over the explicit range so we just return an EOF. Otherwise, we finish // densifying over the rest of the range. - if (compareValues(addValues(stdx::get<Value>(*_current), _range.getStep()) >= bounds.second)) { + if (!_current) { + // We've seen no documents yet. + auto bounds = _range.getBounds(); + tassert(5734403, + "Expected numeric explicit range", + stdx::holds_alternative<NumericBounds>(bounds)); + auto lowerBound = stdx::get<NumericBounds>(bounds).first; + _current = lowerBound; + _docGenerator = DocGenerator( + lowerBound, _range, _field, boost::none, boost::none, pExpCtx->getValueComparator()); + } else if (compareValues(addValues(stdx::get<Value>(*_current), _range.getStep()) >= + bounds.second)) { _densifyState = DensifyState::kDensifyDone; return DocumentSource::GetNextResult::makeEOF(); } else { - _docGenerator = DocGenerator(addValues(stdx::get<Value>(*_current), _range.getStep()), - _range, - _field, - boost::none, - boost::none, - pExpCtx->getValueComparator()); - _densifyState = DensifyState::kHaveGenerator; - return _docGenerator->getNextDocument(); + auto lowerBound = addValues(stdx::get<Value>(*_current), _range.getStep()); + _docGenerator = DocGenerator( + lowerBound, _range, _field, boost::none, boost::none, pExpCtx->getValueComparator()); + } + _densifyState = DensifyState::kHaveGenerator; + auto generatedDoc = _docGenerator->getNextDocument(); + if (_docGenerator->done()) { + _densifyState = DensifyState::kDensifyDone; + _docGenerator = boost::none; } + return generatedDoc; } DocumentSource::GetNextResult DocumentSourceInternalDensify::processDocAboveMinBound( @@ -373,35 +377,72 @@ DocumentSource::GetNextResult DocumentSourceInternalDensify::processDocAboveMinB tassert(8423306, "Cannot be in this state if _current is greater than the upper bound.", compareValues(stdx::get<Value>(*_current) <= bounds.second)); + // _current is the last seen value, don't generate it again. + Value lowerBound = addValues(stdx::get<Value>(*_current), _range.getStep()); + auto rem = valOffsetFromStep(val, stdx::get<Value>(*_current), _range.getStep()); // If val is on the step we need to subtract the step to avoid returning the doc twice. if (compareValues(rem == Value(0))) { + // If val is the next value to be generated, just return it. + if (compareValues(val == lowerBound)) { + setPartitionValue(doc); + _current = lowerBound; + return doc; + } val = subtractValues(val, _range.getStep()); } Value upperBound = (compareValues(val <= bounds.second)) ? val : bounds.second; - - _docGenerator = - DocGenerator(*_current, - RangeStatement(_range.getStep(), - NumericBounds(stdx::get<Value>(*_current), upperBound), - _range.getUnit()), - _field, - boost::none, - std::move(doc), - pExpCtx->getValueComparator()); + _docGenerator = DocGenerator( + lowerBound, + RangeStatement(_range.getStep(), NumericBounds(lowerBound, upperBound), _range.getUnit()), + _field, + _partitionExpr ? boost::make_optional<Document>(getDensifyPartition(doc).getDocument()) + : boost::none, + doc, + pExpCtx->getValueComparator()); Document nextFromGen = _docGenerator->getNextDocument(); - _current = getDensifyValue(nextFromGen, _field); + _current = getDensifyValue(nextFromGen); _densifyState = DensifyState::kHaveGenerator; // If the doc generator is done it will be deleted and the state will be kNeedGen. resetDocGen(bounds); + setPartitionValue(nextFromGen); return nextFromGen; } +DocumentSource::GetNextResult DocumentSourceInternalDensify::processFirstDocForExplicitRange( + Value val, NumericBounds bounds, Document doc) { + // For the first document in a partition, '_current' is the minimum value - step. + if (!_current) { + _current = subtractValues(bounds.first, _range.getStep()); + } + auto where = processRange(val, stdx::get<Value>(*_current), bounds); + switch (where) { + case ValComparedToRange::kInside: { + return processDocAboveMinBound(val, bounds, doc); + } + case ValComparedToRange::kAbove: { + return processDocAboveMinBound(val, bounds, doc); + } + case ValComparedToRange::kRangeMin: { + _densifyState = DensifyState::kNeedGen; + _current = val; + return doc; + } + case ValComparedToRange::kBelow: { + return doc; + } + } + MONGO_UNREACHABLE_TASSERT(5733414); + return DocumentSource::GetNextResult::makeEOF(); +} + /** Checks if the generator is done, changes states accordingly. */ void DocumentSourceInternalDensify::resetDocGen(NumericBounds bounds) { if (_docGenerator->done()) { - if (compareValues(stdx::get<Value>(*_current) >= bounds.second)) { + if (compareValues(stdx::get<Value>(*_current) >= bounds.second) && !_partitionExpr) { _densifyState = DensifyState::kDensifyDone; + } else if (_partitionExpr && _eof) { + _densifyState = DensifyState::kFinishingDensify; } else { _densifyState = DensifyState::kNeedGen; } @@ -424,30 +465,103 @@ DocumentSourceInternalDensify::ValComparedToRange DocumentSourceInternalDensify: } } + +DocumentSource::GetNextResult DocumentSourceInternalDensify::finishDensifyingPartitionedInputHelper( + Value max, boost::optional<Value> minOverride) { + while (_partitionTable.size() != 0) { + auto firstPartitionKeyVal = _partitionTable.begin(); + Value firstPartition = firstPartitionKeyVal->first; + DensifyValueType firstPartitionVal = firstPartitionKeyVal->second; + // We've already seen the stored value, we want to start generating on the next + // one. + auto valToGenerate = addValues(stdx::get<Value>(firstPartitionVal), _range.getStep()); + // If the valToGenerate is > max seen, skip this partition. It is done. + if (compareValues(valToGenerate, max) > 0) { + _partitionTable.erase(firstPartitionKeyVal); + continue; + } + // If the valToGenerate is < 'minOverride', use the override instead. + if (minOverride && compareValues(valToGenerate, *minOverride) < 0) { + valToGenerate = *minOverride; + } + _docGenerator = DocGenerator( + valToGenerate, + RangeStatement(_range.getStep(), NumericBounds(valToGenerate, max), _range.getUnit()), + _field, + firstPartition.getDocument(), + boost::none, // final doc. + pExpCtx->getValueComparator()); + // Remove this partition from the table, we're done with it. + _partitionTable.erase(firstPartitionKeyVal); + _densifyState = DensifyState::kHaveGenerator; + auto nextDoc = _docGenerator->getNextDocument(); + if (_docGenerator->done()) { + _docGenerator = boost::none; + _densifyState = DensifyState::kFinishingDensify; + } + return DocumentSource::GetNextResult(std::move(nextDoc)); + } + _densifyState = DensifyState::kDensifyDone; + return DocumentSource::GetNextResult::makeEOF(); +} +DocumentSource::GetNextResult DocumentSourceInternalDensify::finishDensifyingPartitionedInput() { + // If the partition map is empty, we're done. + if (_partitionTable.size() == 0) { + _densifyState = DensifyState::kDensifyDone; + return DocumentSource::GetNextResult::makeEOF(); + } + return stdx::visit(visit_helper::Overloaded{[&](Full full) { + // Densify between partitions's last seen value + // and global max. + return finishDensifyingPartitionedInputHelper( + stdx::get<Value>(*_globalMax)); + }, + [&](Partition partition) { + // Partition bounds don't do any extra work + // after EOF; + MONGO_UNREACHABLE; + return DocumentSource::GetNextResult::makeEOF(); + }, + [&](DateBounds bounds) { + // Not implemented. + MONGO_UNREACHABLE; + return DocumentSource::GetNextResult::makeEOF(); + }, + [&](NumericBounds bounds) { + // Densify between partitions's last seen value + // and global max. Use the override for the + // global min. + return finishDensifyingPartitionedInputHelper( + bounds.second, bounds.first); + }}, + _range.getBounds()); +} + DocumentSource::GetNextResult DocumentSourceInternalDensify::handleSourceExhausted() { _eof = true; return stdx::visit( visit_helper::Overloaded{ [&](RangeStatement::Full full) { - _densifyState = DensifyState::kDensifyDone; - return DocumentSource::GetNextResult::makeEOF(); + if (_partitionExpr) { + return finishDensifyingPartitionedInput(); + } else { + _densifyState = DensifyState::kDensifyDone; + return DocumentSource::GetNextResult::makeEOF(); + } }, [&](RangeStatement::Partition part) { - MONGO_UNREACHABLE; + // We have already densified up to the last document in each partition. + _densifyState = DensifyState::kDensifyDone; return DocumentSource::GetNextResult::makeEOF(); }, [&](RangeStatement::DateBounds bounds) { // TODO SERVER-57340 and SERVER-57342 - tasserted(5734000, "Type of densify should not be kPartition"); + tasserted(5734000, "Type of densify should not be explicit date bounds"); return DocumentSource::GetNextResult::makeEOF(); }, [&](RangeStatement::NumericBounds bounds) { - // The _current is treated as the last seen value. Therefore, when creating document - // generators we pass in the _current + the step in order to avoid returning the - // same document twice. However, if we have yet to densify, we do not want to skip - // the current value of _current, so the step is decremented here to avoid that. - if (_densifyState == DensifyState::kUninitializedOrBelowRange) { - _current = subtractValues(stdx::get<Value>(*_current), _range.getStep()); + if (_partitionExpr) { + return finishDensifyingPartitionedInput(); } return densifyAfterEOF(bounds); }, @@ -455,16 +569,32 @@ DocumentSource::GetNextResult DocumentSourceInternalDensify::handleSourceExhaust _range.getBounds()); } -DocumentSource::GetNextResult DocumentSourceInternalDensify::handleNeedGenFull(Document currentDoc, - Value max) { - // Note that max is not the global max, its only the max up to the current document. + +int DocumentSourceInternalDensify::compareToNextStep(const Value& val) { Value currentPlusStep = addValues(stdx::get<Value>(*_current), _range.getStep()); + return compareValues(currentPlusStep, val); +} + +DocumentSource::GetNextResult DocumentSourceInternalDensify::handleNeedGen(Document currentDoc, + Value max) { + // Note that max is not the global max, its only the max up to the current document. + auto compRes = compareToNextStep(max); - if (compareValues(currentPlusStep >= max)) { + // If the current value is the next value to be generated, save it as the current (last seen) + // value. + if (compRes == 0) { + setPartitionValue(currentDoc); _current = max; + } + // If the current value is before the next generated step, don't save it so we can correctly + // generate the next value later. + if (compRes >= 0) { return currentDoc; } + // Falling through the above conditions means the currentDoc is strictly greater than the last + // seen document plus the step value. + // This line checks if we are aligned on the step by checking if the current // value in the document minus the min is divisible by the step. If it is we // subtract step from max. This is neccessary so we don't generate the final @@ -478,8 +608,10 @@ DocumentSource::GetNextResult DocumentSourceInternalDensify::handleNeedGenFull(D DensifyValueType(newCurrent), RangeStatement(_range.getStep(), NumericBounds(newCurrent, maxAdjusted), _range.getUnit()), _field, - boost::none, - std::move(currentDoc), + _partitionExpr + ? boost::make_optional<Document>(getDensifyPartition(currentDoc).getDocument()) + : boost::none, + currentDoc, pExpCtx->getValueComparator()); _densifyState = DensifyState::kHaveGenerator; @@ -488,8 +620,10 @@ DocumentSource::GetNextResult DocumentSourceInternalDensify::handleNeedGenFull(D _docGenerator = boost::none; _densifyState = DensifyState::kNeedGen; } - _current = getDensifyValue(nextDoc, _field); - + // Documents generated by the generator are always on the step. + _current = getDensifyValue(nextDoc); + // If we are partitioned, save the most recent doc. + setPartitionValue(nextDoc); return nextDoc; } @@ -498,26 +632,36 @@ DocumentSource::GetNextResult DocumentSourceInternalDensify::handleNeedGenExplic auto where = processRange(val, stdx::get<Value>(*_current), bounds); switch (where) { case ValComparedToRange::kInside: { - _current = addValues(stdx::get<Value>(*_current), _range.getStep()); - if (compareValues(stdx::get<Value>(*_current) == val)) { + auto nextStep = addValues(stdx::get<Value>(*_current), _range.getStep()); + if (compareValues(val == nextStep)) { + _current = val; + setPartitionValue(currentDoc); + return currentDoc; + } else if (compareValues(val < nextStep)) { return currentDoc; } return processDocAboveMinBound(val, bounds, currentDoc); } case ValComparedToRange::kAbove: { - _current = addValues(stdx::get<Value>(*_current), _range.getStep()); - if (compareValues(stdx::get<Value>(*_current) > bounds.second)) { - _densifyState = DensifyState::kDensifyDone; + auto nextStep = addValues(stdx::get<Value>(*_current), _range.getStep()); + if (compareValues(nextStep > bounds.second)) { + _current = nextStep; + // If we are partitioning other partitions may still need to densify. + setPartitionValue(currentDoc); + if (!_partitionExpr) { + _densifyState = DensifyState::kDensifyDone; + } return currentDoc; } return processDocAboveMinBound(val, bounds, currentDoc); } case ValComparedToRange::kRangeMin: { - _current = addValues(stdx::get<Value>(*_current), _range.getStep()); - _densifyState = DensifyState::kUninitializedOrBelowRange; + setPartitionValue(currentDoc); + _current = val; return currentDoc; } case ValComparedToRange::kBelow: { + setPartitionValue(currentDoc); _densifyState = DensifyState::kUninitializedOrBelowRange; return currentDoc; } @@ -550,6 +694,18 @@ Value DocumentSourceInternalDensify::serialize( return Value(out.freezeToValue()); } +void DocumentSourceInternalDensify::initializePartitionState(Document initialDoc) { + // We check whether there is anything in _partitions during parsing. + std::vector<std::pair<std::string, boost::intrusive_ptr<mongo::Expression>>> partitionExp; + for (FieldPath p : _partitions) { + partitionExp.push_back({p.fullPath(), + ExpressionFieldPath::createPathFromString( + pExpCtx.get(), p.fullPath(), pExpCtx->variablesParseState)}); + } + _partitionExpr = ExpressionObject::create(pExpCtx.get(), std::move(partitionExp)); + setPartitionValue(initialDoc); +} + DocumentSource::GetNextResult DocumentSourceInternalDensify::doGetNext() { switch (_densifyState) { case DensifyState::kUninitializedOrBelowRange: { @@ -570,42 +726,34 @@ DocumentSource::GetNextResult DocumentSourceInternalDensify::doGetNext() { return nextDoc; } - Value val = getDensifyValue(doc, _field); + Value val = getDensifyValue(doc); + + // If we have partitions specified, setup the partition expression and table. + if (_partitions.size() != 0 && !_partitionExpr) { + initializePartitionState(doc); + } return stdx::visit( visit_helper::Overloaded{ [&](Full full) { _current = val; + _globalMin = val; _densifyState = DensifyState::kNeedGen; return nextDoc; }, [&](Partition partition) { - tasserted(5734001, "Type of densify should not be 'partition'"); - return DocumentSource::GetNextResult::makeEOF(); + tassert(5734400, + "Partition state must be initialized for partition bounds", + _partitionExpr); + _densifyState = DensifyState::kNeedGen; + return nextDoc; }, [&](DateBounds bounds) { tasserted(5733412, "Type of densify should not be date bounds"); return DocumentSource::GetNextResult::makeEOF(); }, [&](NumericBounds bounds) { - auto where = processRange(val, stdx::get<Value>(*_current), bounds); - switch (where) { - case ValComparedToRange::kInside: { - return processDocAboveMinBound(val, bounds, nextDoc.getDocument()); - } - case ValComparedToRange::kAbove: { - return processDocAboveMinBound(val, bounds, nextDoc.getDocument()); - } - case ValComparedToRange::kRangeMin: { - _current = addValues(stdx::get<Value>(*_current), _range.getStep()); - return nextDoc; - } - case ValComparedToRange::kBelow: { - return nextDoc; - } - } - tasserted(5733414, "One of the switch statements should have been hit."); - return DocumentSource::GetNextResult::makeEOF(); + return processFirstDocForExplicitRange(val, bounds, doc); }}, _range.getBounds()); } @@ -625,24 +773,65 @@ DocumentSource::GetNextResult DocumentSourceInternalDensify::doGetNext() { // The densify field is not present, let document pass unmodified. return nextDoc; } - Value val = getDensifyValue(currentDoc, _field); + Value val = getDensifyValue(currentDoc); return stdx::visit( visit_helper::Overloaded{ [&](Full full) { - _current = *_current; - return handleNeedGenFull(currentDoc, val); + if (_partitionExpr) { + // Keep track of '_globalMax' for later. The latest document from the + // source is always the max. + _globalMax = val; + // If we haven't seen this partition before, densify between + // '_globalMin' and this value. + auto partitionVal = getDensifyPartition(currentDoc); + auto foundPartitionVal = _partitionTable.find(partitionVal); + if (foundPartitionVal == _partitionTable.end()) { + // _current represents the last value seen. We want to generate + // _globalMin, so pretend we've seen the value before that. + _current = + subtractValues(stdx::get<Value>(*_globalMin), _range.getStep()); + // Insert the new partition into the table. + setPartitionValue(currentDoc); + return handleNeedGen(currentDoc, val); + } + // Otherwise densify between the last seen value and this one. + _current = foundPartitionVal->second; + } + return handleNeedGen(currentDoc, val); }, [&](Partition partition) { - // TODO SERVER-57340 and SERVER-57342 - tasserted(5734003, "Type of densify should not be kPartition"); - return DocumentSource::GetNextResult::makeEOF(); + // If we haven't seen this partition before, add it to the table then + // return. + auto partitionVal = getDensifyPartition(currentDoc); + auto foundPartitionVal = _partitionTable.find(partitionVal); + if (foundPartitionVal == _partitionTable.end()) { + setPartitionValue(currentDoc); + return nextDoc; + } + // Reset current to be the last value in this partition. + _current = foundPartitionVal->second; + return handleNeedGen(currentDoc, val); }, [&](DateBounds bounds) { MONGO_UNREACHABLE; return DocumentSource::GetNextResult::makeEOF(); }, [&](NumericBounds bounds) { + if (_partitionExpr) { + // If we haven't seen this partition before, add it to the table then + // check where it is in the range. + auto partitionVal = getDensifyPartition(currentDoc); + auto foundPartitionVal = _partitionTable.find(partitionVal); + if (foundPartitionVal == _partitionTable.end()) { + setPartitionValue(currentDoc); + // This partition has seen no values. + _current = boost::none; + return processFirstDocForExplicitRange(val, bounds, currentDoc); + } + // Otherwise reset current to be the last value in this partition. + _current = foundPartitionVal->second; + } return handleNeedGenExplicit(nextDoc.getDocument(), val, bounds); }}, _range.getBounds()); @@ -659,34 +848,58 @@ DocumentSource::GetNextResult DocumentSourceInternalDensify::doGetNext() { [&](Full full) { if (_docGenerator->done()) { _docGenerator = boost::none; - _densifyState = DensifyState::kNeedGen; + if (_eof && _partitionExpr) { + _densifyState = DensifyState::kFinishingDensify; + } else { + _densifyState = DensifyState::kNeedGen; + } + } + // The generator's final document may not be on the step. + auto genDensifyVal = getDensifyValue(generatedDoc); + if (compareToNextStep(genDensifyVal) == 0) { + _current = genDensifyVal; + setPartitionValue(generatedDoc); } - _current = getDensifyValue(generatedDoc, _field); return GetNextResult(std::move(generatedDoc)); }, [&](Partition partition) { - // TODO SERVER-57340 and SERVER-57342 - tasserted(5734004, "Type of densify should not be kPartition"); - return DocumentSource::GetNextResult::makeEOF(); + if (_docGenerator->done()) { + _docGenerator = boost::none; + _densifyState = DensifyState::kNeedGen; + } + // The generator's final document may not be on the step. + auto genDensifyVal = getDensifyValue(generatedDoc); + if (compareToNextStep(genDensifyVal) == 0) { + _current = genDensifyVal; + setPartitionValue(generatedDoc); + } + return GetNextResult(std::move(generatedDoc)); }, [&](DateBounds bounds) { MONGO_UNREACHABLE; return DocumentSource::GetNextResult::makeEOF(); }, [&](NumericBounds bounds) { - auto val = getDensifyValue(generatedDoc, _field); + auto val = getDensifyValue(generatedDoc); // Only want to update the rangeMin if the value - current is divisible by // the step. auto rem = valOffsetFromStep(val, stdx::get<Value>(*_current), _range.getStep()); if (compareValues(rem == Value(0))) { _current = val; + setPartitionValue(generatedDoc); } resetDocGen(bounds); return GetNextResult(std::move(generatedDoc)); }}, _range.getBounds()); } + case DensifyState::kFinishingDensify: { + tassert(5734402, + "Densify expected to have already hit EOF in FinishingDensify state", + _eof); + return finishDensifyingPartitionedInput(); + } case DensifyState::kDensifyDone: { // In the full range, this should only return EOF. // In the explicit range we finish densifying over the range and any remaining documents diff --git a/src/mongo/db/pipeline/document_source_densify.h b/src/mongo/db/pipeline/document_source_densify.h index df0f4d0edd3..9ef885eb848 100644 --- a/src/mongo/db/pipeline/document_source_densify.h +++ b/src/mongo/db/pipeline/document_source_densify.h @@ -30,9 +30,11 @@ #pragma once #include "mongo/db/exec/document_value/value.h" +#include "mongo/db/exec/document_value/value_comparator.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_densify_gen.h" #include "mongo/db/pipeline/expression.h" +#include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/field_path.h" #include "mongo/db/query/datetime/date_time_support.h" #include "mongo/util/time_support.h" @@ -140,23 +142,8 @@ public: : DocumentSource(kStageName, pExpCtx), _field(std::move(field)), _partitions(std::move(partitions)), - _range(std::move(range)) { - _current = stdx::visit( - visit_helper::Overloaded{ - [&](RangeStatement::Full full) -> boost::optional<DensifyValueType> { - return boost::none; - }, - [&](RangeStatement::Partition partition) -> boost::optional<DensifyValueType> { - return boost::none; - }, - [&](RangeStatement::DateBounds bounds) -> boost::optional<DensifyValueType> { - return DensifyValueType(bounds.first); - }, - [&](RangeStatement::NumericBounds bounds) -> boost::optional<DensifyValueType> { - return DensifyValueType(bounds.first); - }}, - _range.getBounds()); - }; + _range(std::move(range)), + _partitionTable(pExpCtx->getValueComparator().makeUnorderedValueMap<Value>()){}; class DocGenerator { public: @@ -241,19 +228,40 @@ private: kAbove, }; + Value getDensifyValue(const Document& doc) { + Value val = doc.getNestedField(_field); + uassert(5733201, "Densify field type must be numeric", val.numeric()); + return val; + } + + Value getDensifyPartition(const Document& doc) { + auto part = _partitionExpr->evaluate(doc, &pExpCtx->variables); + return part; + } + + /** + * Returns <0 for below step, 0 for equal to step, >0 for greater than step. + */ + int compareToNextStep(const Value& val); + bool compareValues(Value::DeferredComparison deferredComparison) { return pExpCtx->getValueComparator().evaluate(deferredComparison); } + /** + * Returns <0 if 'lhs' is less than 'rhs', 0 if 'lhs' is equal to 'rhs', and >0 if 'lhs' is + * greater than 'rhs'. + */ int compareValues(const Value& lhs, const Value& rhs) { return pExpCtx->getValueComparator().compare(lhs, rhs); } /** * Decides whether or not to build a DocGen and return the first document generated or return - * the current doc if the rangeMin + step is greater than rangeMax. + * the current doc if the rangeMin + step is greater than rangeMax. Used for both 'full' and + * 'partition' bounds. */ - DocumentSource::GetNextResult handleNeedGenFull(Document currentDoc, Value max); + DocumentSource::GetNextResult handleNeedGen(Document currentDoc, Value max); /** * Checks where the current doc's value lies compared to the range and creates the correct @@ -267,11 +275,18 @@ private: * Takes care of when an EOF has been hit for the explicit case. It checks if we have finished * densifying over the range, and if so changes the state to be kDensify done. Otherwise it * builds a new generator that will finish densifying over the range and changes the state to - * kHaveGen. + * kHaveGen. Only used if the input is not partitioned. */ DocumentSource::GetNextResult densifyAfterEOF(RangeStatement::NumericBounds); /** + * Decide what to do for the first document in a given partition for explicit range. Either + * generate documents between the minimum and the value, or just return it. + */ + DocumentSource::GetNextResult processFirstDocForExplicitRange( + Value val, RangeStatement::NumericBounds bounds, Document doc); + + /** * Creates a document generator based on the value passed in, the current _current, and the * NumericBounds. Once created, the state changes to kHaveGenerator and the first document from * the generator is returned. @@ -295,6 +310,15 @@ private: DocumentSource::GetNextResult handleSourceExhausted(); /** + * Handles building a document generator once we've seen an EOF for partitioned input. Min will + * be the last seen value in the partition unless it is less than the optional 'minOverride'. + * Helper is to share code between visit functions. + */ + DocumentSource::GetNextResult finishDensifyingPartitionedInput(); + DocumentSource::GetNextResult finishDensifyingPartitionedInputHelper( + Value max, boost::optional<Value> minOverride = boost::none); + + /** * Checks if the current document generator is done. If it is and we have finished densifying, * it changes the state to be kDensifyDone. If there is more to densify, the state becomes * kNeedGen. The generator is also deleted. @@ -305,17 +329,43 @@ private: Value diff = uassertStatusOK(ExpressionSubtract::apply(val, sub)); return uassertStatusOK(ExpressionMod::apply(diff, step)); } + + /** + * Set up the state for densifying over partitions. + */ + void initializePartitionState(Document initialDoc); + + /** + * Helper to set the value in the partition table. + */ + void setPartitionValue(Document doc) { + if (_partitionExpr) { + _partitionTable[getDensifyPartition(doc)] = getDensifyValue(doc); + } + } boost::optional<DocGenerator> _docGenerator = boost::none; /** - * The minimum value that the document generator will create, therefore the next generated - * document will have this value. This is also used as last seen value by the explicit case. + * The last value seen or generated by the stage that is also in line with the step. */ boost::optional<DensifyValueType> _current = boost::none; + // Used to keep track of the bounds for densification in the full case. + boost::optional<DensifyValueType> _globalMin = boost::none; + boost::optional<DensifyValueType> _globalMax = boost::none; + + // Expression to be used to compare partitions. + boost::intrusive_ptr<ExpressionObject> _partitionExpr; + bool _eof = false; - enum class DensifyState { kUninitializedOrBelowRange, kNeedGen, kHaveGenerator, kDensifyDone }; + enum class DensifyState { + kUninitializedOrBelowRange, + kNeedGen, + kHaveGenerator, + kFinishingDensify, + kDensifyDone + }; enum class TypeOfDensify { kFull, @@ -328,5 +378,7 @@ private: FieldPath _field; std::list<FieldPath> _partitions; RangeStatement _range; + // Store of the value we've seen for each partition. + ValueUnorderedMap<Value> _partitionTable; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_densify_test.cpp b/src/mongo/db/pipeline/document_source_densify_test.cpp index e62e2fc4873..309421c4acf 100644 --- a/src/mongo/db/pipeline/document_source_densify_test.cpp +++ b/src/mongo/db/pipeline/document_source_densify_test.cpp @@ -47,6 +47,7 @@ using Full = RangeStatement::Full; using GenClass = DocumentSourceInternalDensify::DocGenerator; using DensifyFullNumericTest = AggregationContextFixture; using DensifyExplicitNumericTest = AggregationContextFixture; +using DensifyPartitionNumericTest = AggregationContextFixture; using DensifyCloneTest = AggregationContextFixture; MONGO_INITIALIZER_GENERAL(turnOnDensifyFlag, @@ -1173,6 +1174,42 @@ TEST_F(DensifyExplicitNumericTest, DensificationForNumericValuesErrorsIfFieldIsN ASSERT_THROWS_CODE(densify.getNext(), AssertionException, 5733201); } +TEST_F(DensifyExplicitNumericTest, DensifiesOnImmediateEOF) { + auto densify = DocumentSourceInternalDensify( + getExpCtx(), + "a", + std::list<FieldPath>(), + RangeStatement(Value(1), NumericBounds(Value(0), Value(1)), boost::none)); + auto source = DocumentSourceMock::createForTest({}, getExpCtx()); + densify.setSource(source.get()); + auto next = densify.getNext(); + ASSERT(next.isAdvanced()); + ASSERT_EQUALS(0, next.getDocument().getField("a").getDouble()); + next = densify.getNext(); + ASSERT(next.isAdvanced()); + ASSERT_EQUALS(1, next.getDocument().getField("a").getDouble()); + next = densify.getNext(); + ASSERT_FALSE(next.isAdvanced()); +} + +TEST_F(DensifyFullNumericTest, DensifiesOnImmediateEOF) { + auto densify = DocumentSourceInternalDensify( + getExpCtx(), "a", std::list<FieldPath>(), RangeStatement(Value(3), Full(), boost::none)); + auto source = DocumentSourceMock::createForTest({}, getExpCtx()); + densify.setSource(source.get()); + auto next = densify.getNext(); + ASSERT_FALSE(next.isAdvanced()); +} + +TEST_F(DensifyPartitionNumericTest, DensifiesOnImmediateEOF) { + auto densify = DocumentSourceInternalDensify( + getExpCtx(), "a", std::list<FieldPath>(), RangeStatement(Value(3), Full(), boost::none)); + auto source = DocumentSourceMock::createForTest({}, getExpCtx()); + densify.setSource(source.get()); + auto next = densify.getNext(); + ASSERT_FALSE(next.isAdvanced()); +} + TEST_F(DensifyCloneTest, InternalDesnifyCanBeCloned) { std::list<boost::intrusive_ptr<DocumentSource>> sources; @@ -1184,5 +1221,6 @@ TEST_F(DensifyCloneTest, InternalDesnifyCanBeCloned) { auto pipe = Pipeline::create(sources, getExpCtx()); auto clonedPipe = pipe->clone(); } + } // namespace } // namespace mongo |