Diffstat (limited to 'src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/unpack_bucket_exec_test.cpp')
-rw-r--r-- | src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/unpack_bucket_exec_test.cpp | 275
1 file changed, 0 insertions, 275 deletions
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/unpack_bucket_exec_test.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/unpack_bucket_exec_test.cpp
index 92e43593e32..9c0ca61a175 100644
--- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/unpack_bucket_exec_test.cpp
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/unpack_bucket_exec_test.cpp
@@ -812,280 +812,5 @@ TEST_F(InternalUnpackBucketExecTest, ParserRejectsBothIncludeAndExcludeParameter
         AssertionException,
         5408000);
 }
-
-TEST_F(InternalUnpackBucketExecTest, BucketUnpackerExtractSingleMeasurement) {
-    auto expCtx = getExpCtx();
-
-    std::set<std::string> fields{
-        "_id", kUserDefinedMetaName.toString(), kUserDefinedTimeName.toString(), "a", "b"};
-    auto spec = BucketSpec{
-        kUserDefinedTimeName.toString(), kUserDefinedMetaName.toString(), std::move(fields)};
-    auto unpacker = BucketUnpacker{std::move(spec), BucketUnpacker::Behavior::kInclude, true, true};
-
-    auto d1 = dateFromISOString("2020-02-17T00:00:00.000Z").getValue();
-    auto d2 = dateFromISOString("2020-02-17T01:00:00.000Z").getValue();
-    auto d3 = dateFromISOString("2020-02-17T02:00:00.000Z").getValue();
-    auto bucket = BSON("meta" << BSON("m1" << 999 << "m2" << 9999) << "data"
-                              << BSON("_id" << BSON("0" << 1 << "1" << 2 << "2" << 3) << "time"
-                                            << BSON("0" << d1 << "1" << d2 << "2" << d3) << "a"
-                                            << BSON("0" << 1 << "1" << 2 << "2" << 3) << "b"
-                                            << BSON("1" << 1 << "2" << 2)));
-
-    unpacker.reset(std::move(bucket));
-
-    auto next = unpacker.extractSingleMeasurement(0);
-    auto expected = Document{
-        {"myMeta", Document{{"m1", 999}, {"m2", 9999}}}, {"_id", 1}, {"time", d1}, {"a", 1}};
-    ASSERT_DOCUMENT_EQ(next, expected);
-
-    next = unpacker.extractSingleMeasurement(2);
-    expected = Document{{"myMeta", Document{{"m1", 999}, {"m2", 9999}}},
-                        {"_id", 3},
-                        {"time", d3},
-                        {"a", 3},
-                        {"b", 2}};
-    ASSERT_DOCUMENT_EQ(next, expected);
-
-    next = unpacker.extractSingleMeasurement(1);
-    expected = Document{{"myMeta", Document{{"m1", 999}, {"m2", 9999}}},
-                        {"_id", 2},
-                        {"time", d2},
-                        {"a", 2},
-                        {"b", 1}};
-    ASSERT_DOCUMENT_EQ(next, expected);
-
-    // Can we extract the middle element again?
-    next = unpacker.extractSingleMeasurement(1);
-    ASSERT_DOCUMENT_EQ(next, expected);
-}
-
-TEST_F(InternalUnpackBucketExecTest, BucketUnpackerExtractSingleMeasurementSparse) {
-    auto expCtx = getExpCtx();
-
-    std::set<std::string> fields{
-        "_id", kUserDefinedMetaName.toString(), kUserDefinedTimeName.toString(), "a", "b"};
-    auto spec = BucketSpec{
-        kUserDefinedTimeName.toString(), kUserDefinedMetaName.toString(), std::move(fields)};
-    auto unpacker = BucketUnpacker{std::move(spec), BucketUnpacker::Behavior::kInclude, true, true};
-
-    auto d1 = dateFromISOString("2020-02-17T00:00:00.000Z").getValue();
-    auto d2 = dateFromISOString("2020-02-17T01:00:00.000Z").getValue();
-    auto bucket = BSON("meta" << BSON("m1" << 999 << "m2" << 9999) << "data"
-                              << BSON("_id" << BSON("0" << 1 << "1" << 2) << "time"
-                                            << BSON("0" << d1 << "1" << d2) << "a" << BSON("0" << 1)
-                                            << "b" << BSON("1" << 1)));
-
-    unpacker.reset(std::move(bucket));
-    auto next = unpacker.extractSingleMeasurement(1);
-    auto expected = Document{
-        {"myMeta", Document{{"m1", 999}, {"m2", 9999}}}, {"_id", 2}, {"time", d2}, {"b", 1}};
-    ASSERT_DOCUMENT_EQ(next, expected);
-
-    // Can we extract the same element again?
-    next = unpacker.extractSingleMeasurement(1);
-    ASSERT_DOCUMENT_EQ(next, expected);
-
-    next = unpacker.extractSingleMeasurement(0);
-    expected = Document{
-        {"myMeta", Document{{"m1", 999}, {"m2", 9999}}}, {"_id", 1}, {"time", d1}, {"a", 1}};
-    ASSERT_DOCUMENT_EQ(next, expected);
-
-    // Can we extract the same element twice in a row?
-    next = unpacker.extractSingleMeasurement(0);
-    ASSERT_DOCUMENT_EQ(next, expected);
-
-    next = unpacker.extractSingleMeasurement(0);
-    ASSERT_DOCUMENT_EQ(next, expected);
-}
-
-class InternalUnpackBucketRandomSampleTest : public AggregationContextFixture {
-protected:
-    BSONObj makeIncludeAllSpec() {
-        return BSON("$_internalUnpackBucket"
-                    << BSON("include" << BSON_ARRAY("_id"
-                                                    << "time" << kUserDefinedMetaName << "a"
-                                                    << "b")
-                                      << timeseries::kTimeFieldName << kUserDefinedTimeName
-                                      << timeseries::kMetaFieldName << kUserDefinedMetaName));
-    }
-
-    boost::intrusive_ptr<DocumentSource> makeUnpackStage(const BSONObj& spec,
-                                                         long long nSample,
-                                                         int bucketMaxCount) {
-        auto ds =
-            DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), getExpCtx());
-        auto unpack = dynamic_cast<DocumentSourceInternalUnpackBucket*>(ds.get());
-        unpack->setSampleParameters(nSample, bucketMaxCount);
-        return unpack;
-    }
-
-    boost::intrusive_ptr<DocumentSource> makeInternalUnpackBucketSample(int nSample,
-                                                                        int nBuckets,
-                                                                        int nMeasurements) {
-        auto spec = makeIncludeAllSpec();
-        generateBuckets(nBuckets, nMeasurements);
-        auto ds =
-            DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), getExpCtx());
-        auto unpack = dynamic_cast<DocumentSourceInternalUnpackBucket*>(ds.get());
-        unpack->setSampleParameters(nSample, 1000);
-        return unpack;
-    }
-
-    boost::intrusive_ptr<DocumentSource> prepareMock() {
-        auto mock = DocumentSourceMock::createForTest(getExpCtx());
-        for (auto&& b : _buckets) {
-            mock->push_back(DocumentSource::GetNextResult{std::move(b)});
-        }
-        return mock;
-    }
-
-    Document makeBucketPart(int nMeasurements, std::function<Value(int)> gen) {
-        auto doc = MutableDocument{};
-        for (auto i = 0; i < nMeasurements; ++i) {
-            doc.addField(std::to_string(i), gen(i));
-        }
-        return doc.freeze();
-    }
-
-    void generateBuckets(int nBuckets, int nMeasurements) {
-        auto& prng = getExpCtx()->opCtx->getClient()->getPrng();
-        std::vector<Document> buckets;
-        for (auto m = 0; m < nBuckets; m++) {
-            auto idDoc = makeBucketPart(nMeasurements, [](int i) { return Value{OID::gen()}; });
-            auto timeDoc = makeBucketPart(nMeasurements, [](int i) { return Value{Date_t{}}; });
-            auto aCol = makeBucketPart(nMeasurements,
-                                       [&](int i) { return Value{prng.nextCanonicalDouble()}; });
-            buckets.push_back({Document{
-                {"_id", Value{OID::gen()}},
-                {"meta", Document{{"m1", m}, {"m2", m + 1}}},
-                {"data",
-                 Document{{"_id", idDoc}, {"time", std::move(timeDoc)}, {"a", std::move(aCol)}}}}});
-        }
-
-        _buckets = std::move(buckets);
-    }
-
-private:
-    std::vector<Document> _buckets;
-};
-
-TEST_F(InternalUnpackBucketRandomSampleTest, SampleHasExpectedStatProperties) {
-    auto unpack = makeInternalUnpackBucketSample(100, 1000, 1000);
-    auto mock = prepareMock();
-    unpack->setSource(mock.get());
-
-    auto next = unpack->getNext();
-    ASSERT_TRUE(next.isAdvanced());
-
-    auto avg = 0.0;
-    auto nSampled = 0;
-    while (next.isAdvanced()) {
-        avg += next.getDocument()["a"].getDouble();
-        next = unpack->getNext();
-        nSampled++;
-    }
-    avg /= nSampled;
-    ASSERT_EQ(nSampled, 100);
-
-    // The average for the uniform distribution on [0, 1) is ~0.5, and the stdev is sqrt(1/12).
-    // We will check if the avg is between +/- 2*sqrt(1/12).
-    auto stddev = std::sqrt(1.0 / 12.0);
-    ASSERT_GT(avg, 0.5 - 2 * stddev);
-    ASSERT_LT(avg, 0.5 + 2 * stddev);
-}
-
-TEST_F(InternalUnpackBucketRandomSampleTest, SampleIgnoresDuplicates) {
-    auto spec = BSON("$_internalUnpackBucket"
-                     << BSON("include" << BSON_ARRAY("_id"
-                                                     << "time" << kUserDefinedMetaName << "a"
-                                                     << "b")
-                                       << timeseries::kTimeFieldName << kUserDefinedTimeName
-                                       << timeseries::kMetaFieldName << kUserDefinedMetaName));
-
-    // Make an unpack bucket stage initialized with a sample size of 2 and bucketMaxCount of 1.
-    auto unpack = makeUnpackStage(spec, 2, 1);
-
-    // Fill mock with duplicate buckets to simulate random sampling the same buckets over and over
-    // again until the 'kMaxAttempts' are reached in 'doGetNext'.
-    auto mock = DocumentSourceMock::createForTest(getExpCtx());
-    for (auto i = 0; i < 101; ++i) {
-        mock->push_back(Document{{"_id", Value{OID::createFromString("000000000000000000000001")}},
-                                 {"meta", Document{{"m1", 1}, {"m2", 2}}},
-                                 {"data",
-                                  Document{{"_id", Document{{"0", 1}}},
-                                           {"time", Document{{"0", Date_t::now()}}},
-                                           {"a", Document{{"0", 1}}}}}});
-    }
-    unpack->setSource(mock.get());
-
-    // The sample size is 2 and there's only one unique measurement in the mock. The second
-    // 'getNext' call should spin until the it reaches 'kMaxAttempts' of tries and then throw.
-    ASSERT_TRUE(unpack->getNext().isAdvanced());
-    ASSERT_THROWS_CODE(unpack->getNext(), AssertionException, 5422103);
-}
-
-namespace {
-/**
- * Manually computes the timestamp object size for n timestamps.
- */
-auto expectedTimestampObjSize(int32_t rowKeyOffset, int32_t n) {
-    BSONObjBuilder bob;
-    for (auto i = 0; i < n; ++i) {
-        bob.appendDate(std::to_string(i + rowKeyOffset), Date_t::now());
-    }
-    return bob.done().objsize();
-}
-}  // namespace
-
-TEST_F(InternalUnpackBucketExecTest, ComputeMeasurementCountLowerBoundsAreCorrect) {
-    // The last table entry is a sentinel for an upper bound on the interval that covers measurement
-    // counts up to 16 MB.
-    const auto maxTableEntry = BucketUnpacker::kTimestampObjSizeTable.size() - 1;
-
-    // Test the case when the target size hits a table entry which represents the lower bound of an
-    // interval.
-    for (size_t index = 0; index < maxTableEntry; ++index) {
-        auto interval = BucketUnpacker::kTimestampObjSizeTable[index];
-        ASSERT_EQ(interval.first, BucketUnpacker::computeMeasurementCount(interval.second));
-    }
-}
-
-TEST_F(InternalUnpackBucketExecTest, ComputeMeasurementCountUpperBoundsAreCorrect) {
-    const auto maxTableEntry = BucketUnpacker::kTimestampObjSizeTable.size() - 1;
-
-    // The lower bound sizes of each interval in the kTimestampObjSizeTable are hardcoded. Use this
-    // fact and walk the table backwards to check the correctness of the S_i'th interval's upper
-    // bound by using the lower bound size for the S_i+1 interval and subtracting the BSONObj size
-    // containing one timestamp with the appropriate rowKey.
-    std::pair<int, int> currentInterval;
-    auto currentIntervalSize = 0;
-    auto currentIntervalCount = 0;
-    auto size = 0;
-    for (size_t index = maxTableEntry; index > 0; --index) {
-        currentInterval = BucketUnpacker::kTimestampObjSizeTable[index];
-        currentIntervalSize = currentInterval.second;
-        currentIntervalCount = currentInterval.first;
-        auto rowKey = currentIntervalCount - 1;
-        size = expectedTimestampObjSize(rowKey, 1);
-        // We need to add back the kMinBSONLength since it's subtracted out.
-        ASSERT_EQ(currentIntervalCount - 1,
-                  BucketUnpacker::computeMeasurementCount(currentIntervalSize - size +
-                                                          BSONObj::kMinBSONLength));
-    }
-}
-
-TEST_F(InternalUnpackBucketExecTest, ComputeMeasurementCountAllPointsInSmallerIntervals) {
-    // Test all values for some of the smaller intervals up to 100 measurements.
-    for (auto bucketCount = 0; bucketCount < 25; ++bucketCount) {
-        auto size = expectedTimestampObjSize(0, bucketCount);
-        ASSERT_EQ(bucketCount, BucketUnpacker::computeMeasurementCount(size));
-    }
-}
-
-TEST_F(InternalUnpackBucketExecTest, ComputeMeasurementCountInLargerIntervals) {
-    ASSERT_EQ(2222, BucketUnpacker::computeMeasurementCount(30003));
-    ASSERT_EQ(11111, BucketUnpacker::computeMeasurementCount(155560));
-    ASSERT_EQ(449998, BucketUnpacker::computeMeasurementCount(7088863));
-}
 }  // namespace
 }  // namespace mongo
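Note on the removed SampleHasExpectedStatProperties test: the acceptance band it asserts is deliberately loose. Assuming the 100 sampled 'a' values are i.i.d. uniform on [0, 1), the expected mean is 0.5 with population stdev sqrt(1/12) ~= 0.289, so the standard error of the sample average is sqrt(1/12)/sqrt(100) ~= 0.029. The asserted window of +/- 2*sqrt(1/12) ~= +/- 0.577 is therefore roughly 20 standard errors wide, which makes a spurious failure practically impossible.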
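The constants asserted in the removed ComputeMeasurementCountInLargerIntervals test follow directly from the BSON wire format, which the deleted expectedTimestampObjSize helper relied on. The standalone sketch below (not MongoDB code; timestampObjSize is a hypothetical name for illustration) reproduces the arithmetic: a BSON Date element costs 1 type byte, plus the decimal row key and its NUL terminator, plus 8 bytes of milliseconds, and the enclosing object adds a 4-byte length prefix and a 1-byte EOO terminator, the 5 bytes that BSONObj::kMinBSONLength accounts for.

#include <cstdint>
#include <iostream>
#include <string>

// Size in bytes of a BSON object holding Date elements keyed "0".."n-1":
// 4 (length prefix) + sum of (1 type + keylen + 1 NUL + 8 millis) + 1 (EOO).
int64_t timestampObjSize(int64_t n) {
    int64_t size = 5;  // length prefix + EOO, i.e. BSONObj::kMinBSONLength
    for (int64_t i = 0; i < n; ++i) {
        size += 1 + static_cast<int64_t>(std::to_string(i).size()) + 1 + 8;
    }
    return size;
}

int main() {
    // Each line reproduces a size/count pair from the deleted assertions.
    std::cout << timestampObjSize(2222) << '\n';    // prints 30003
    std::cout << timestampObjSize(11111) << '\n';   // prints 155560
    std::cout << timestampObjSize(449998) << '\n';  // prints 7088863
    return 0;
}

For example, with n = 2222 the keys "0".."9" cost 11 bytes each, "10".."99" cost 12, "100".."999" cost 13, and "1000".."2221" cost 14, giving 110 + 1080 + 11700 + 17108 + 5 = 30003 bytes; computeMeasurementCount inverts that mapping, taking 30003 back to 2222 measurements.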