diff options
author | Charlie Swanson <charlie.swanson@mongodb.com> | 2015-08-07 16:10:23 -0400 |
---|---|---|
committer | Charlie Swanson <charlie.swanson@mongodb.com> | 2015-08-27 13:01:51 -0400 |
commit | d88cb83e39b4cdddc9bdcf98f667f6cabdadc8ec (patch) | |
tree | 6a3ee70ea056ddab3a5801a783d009cd925724de /src | |
parent | 843fd8ae34d06ed73f824c124cd2d9ef7d23b814 (diff) | |
download | mongo-d88cb83e39b4cdddc9bdcf98f667f6cabdadc8ec.tar.gz |
SERVER-19182 Integrate storage engine optimizations into $sample stage
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/pipeline/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document.h | 4 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_internal.h | 6 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source.h | 61 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_sample.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_sample_from_random_cursor.cpp | 142 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_test.cpp | 177 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_value_test.cpp | 14 | ||||
-rw-r--r-- | src/mongo/db/pipeline/expression.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.cpp | 158 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.h | 15 | ||||
-rw-r--r-- | src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp | 3 | ||||
-rw-r--r-- | src/mongo/platform/random.cpp | 10 | ||||
-rw-r--r-- | src/mongo/platform/random.h | 5 | ||||
-rw-r--r-- | src/mongo/platform/random_test.cpp | 46 |
16 files changed, 583 insertions, 75 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index d307503afc8..c9b43aeb967 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -108,6 +108,7 @@ docSourceEnv.Library( 'document_source_project.cpp', 'document_source_redact.cpp', 'document_source_sample.cpp', + 'document_source_sample_from_random_cursor.cpp', 'document_source_skip.cpp', 'document_source_sort.cpp', 'document_source_unwind.cpp', diff --git a/src/mongo/db/pipeline/document.cpp b/src/mongo/db/pipeline/document.cpp index a0dd7dc8287..ca6a6f73bde 100644 --- a/src/mongo/db/pipeline/document.cpp +++ b/src/mongo/db/pipeline/document.cpp @@ -253,7 +253,7 @@ BSONObj Document::toBsonWithMetaData() const { if (hasTextScore()) bb.append(metaFieldTextScore, getTextScore()); if (hasRandMetaField()) - bb.append(metaFieldRandVal, static_cast<long long>(getRandMetaField())); + bb.append(metaFieldRandVal, getRandMetaField()); return bb.obj(); } @@ -269,7 +269,7 @@ Document Document::fromBsonWithMetaData(const BSONObj& bson) { md.setTextScore(elem.Double()); continue; } else if (fieldName == metaFieldRandVal) { - md.setRandMetaField(static_cast<int64_t>(elem.numberLong())); + md.setRandMetaField(elem.Double()); continue; } } @@ -438,7 +438,7 @@ void Document::serializeForSorter(BufBuilder& buf) const { } if (hasRandMetaField()) { buf.appendNum(char(DocumentStorage::MetaType::RAND_VAL + 1)); - buf.appendNum(static_cast<long long>(getRandMetaField())); + buf.appendNum(getRandMetaField()); } buf.appendNum(char(0)); } @@ -455,7 +455,7 @@ Document Document::deserializeForSorter(BufReader& buf, const SorterDeserializeS if (marker == char(DocumentStorage::MetaType::TEXT_SCORE) + 1) { doc.setTextScore(buf.read<double>()); } else if (marker == char(DocumentStorage::MetaType::RAND_VAL) + 1) { - doc.setRandMetaField(buf.read<int64_t>()); + doc.setRandMetaField(buf.read<double>()); } else { uasserted(28744, "Unrecognized marker, unable to deserialize buffer"); } diff --git a/src/mongo/db/pipeline/document.h b/src/mongo/db/pipeline/document.h index c6a100c954b..bc5f49bee3c 100644 --- a/src/mongo/db/pipeline/document.h +++ b/src/mongo/db/pipeline/document.h @@ -205,7 +205,7 @@ public: bool hasRandMetaField() const { return storage().hasRandMetaField(); } - int64_t getRandMetaField() const { + double getRandMetaField() const { return storage().getRandMetaField(); } @@ -429,7 +429,7 @@ public: storage().setTextScore(score); } - void setRandMetaField(int64_t val) { + void setRandMetaField(double val) { storage().setRandMetaField(val); } diff --git a/src/mongo/db/pipeline/document_internal.h b/src/mongo/db/pipeline/document_internal.h index 1c0d1a13566..67b46a6b3a3 100644 --- a/src/mongo/db/pipeline/document_internal.h +++ b/src/mongo/db/pipeline/document_internal.h @@ -297,10 +297,10 @@ public: bool hasRandMetaField() const { return _metaFields.test(MetaType::RAND_VAL); } - int64_t getRandMetaField() const { + double getRandMetaField() const { return _randVal; } - void setRandMetaField(int64_t val) { + void setRandMetaField(double val) { _metaFields.set(MetaType::RAND_VAL); _randVal = val; } @@ -385,7 +385,7 @@ private: std::bitset<MetaType::NUM_FIELDS> _metaFields; double _textScore; - int64_t _randVal; + double _randVal; // When adding a field, make sure to update clone() method }; } diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index 4adfccb5b17..d2a6727a911 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -62,6 +62,7 @@ class ExpressionFieldPath; class ExpressionObject; class DocumentSourceLimit; class PlanExecutor; +class RecordCursor; /** * Registers a DocumentSource to have the name 'key'. When a stage with name '$key' is found, @@ -917,21 +918,77 @@ public: const char* getSourceName() const final; Value serialize(bool explain = false) const final; + GetDepsReturn getDependencies(DepsTracker* deps) const final { + return SEE_NEXT; + } + boost::intrusive_ptr<DocumentSource> getShardSource() final; boost::intrusive_ptr<DocumentSource> getMergeSource() final; + long long getSampleSize() const { + return _size; + } + static boost::intrusive_ptr<DocumentSource> createFromBson( BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx); private: explicit DocumentSourceSample(const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + long long _size; - // When no storage engine optimizations are available, $sample uses a $sort stage to randomly - // sort the documents. + // Uses a $sort stage to randomly sort the documents. boost::intrusive_ptr<DocumentSourceSort> _sortStage; }; +/** + * This class is not a registered stage, it is only used as an optimized replacement for $sample + * when the storage engine allows us to use a random cursor. + */ +class DocumentSourceSampleFromRandomCursor final : public DocumentSource { +public: + boost::optional<Document> getNext() final; + const char* getSourceName() const final; + Value serialize(bool explain = false) const final; + GetDepsReturn getDependencies(DepsTracker* deps) const final; + + static boost::intrusive_ptr<DocumentSourceSampleFromRandomCursor> create( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + long long size, + std::string idField, + long long collectionSize); + +private: + DocumentSourceSampleFromRandomCursor(const boost::intrusive_ptr<ExpressionContext>& expCtx, + long long size, + std::string idField, + long long collectionSize); + + /** + * Keep asking for documents from the random cursor until it yields a new document. Errors if a + * a document is encountered without a value for '_idField', or if the random cursor keeps + * returning duplicate elements. + */ + boost::optional<Document> getNextNonDuplicateDocument(); + + long long _size; + + // The field to use as the id of a document. Usually '_id', but 'ts' for the oplog. + std::string _idField; + + // Keeps track of the documents that have been returned, since a random cursor is allowed to + // return duplicates. + ValueSet _seenDocs; + + // The approximate number of documents in the collection (includes orphans). + const long long _nDocsInColl; + + // The value to be assigned to the randMetaField of outcoming documents. Each call to getNext() + // will decrement this value by an amount scaled by _nDocsInColl as an attempt to appear as if + // the documents were produced by a top-k random sort. + double _randMetaFieldVal = 1.0; +}; + class DocumentSourceLimit final : public DocumentSource, public SplittableDocumentSource { public: // virtuals from DocumentSource diff --git a/src/mongo/db/pipeline/document_source_sample.cpp b/src/mongo/db/pipeline/document_source_sample.cpp index 146a27c1241..6a7f96f5ed5 100644 --- a/src/mongo/db/pipeline/document_source_sample.cpp +++ b/src/mongo/db/pipeline/document_source_sample.cpp @@ -30,8 +30,6 @@ #include "mongo/db/pipeline/document_source.h" -#include <vector> - #include "mongo/db/client.h" #include "mongo/db/pipeline/document.h" #include "mongo/db/pipeline/expression.h" @@ -61,8 +59,7 @@ boost::optional<Document> DocumentSourceSample::getNext() { PseudoRandom& prng = pExpCtx->opCtx->getClient()->getPrng(); while (boost::optional<Document> next = pSource->getNext()) { MutableDocument doc(std::move(*next)); - // Add random metadata field. - doc.setRandMetaField(prng.nextInt64()); + doc.setRandMetaField(prng.nextCanonicalDouble()); _sortStage->loadDocument(doc.freeze()); } _sortStage->loadingDone(); diff --git a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.cpp b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.cpp new file mode 100644 index 00000000000..802a5d05aab --- /dev/null +++ b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.cpp @@ -0,0 +1,142 @@ +/** + * Copyright (C) 2015 MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/document_source.h" + +#include <boost/math/distributions/beta.hpp> + +#include "mongo/db/client.h" +#include "mongo/db/pipeline/document.h" +#include "mongo/db/pipeline/expression.h" +#include "mongo/db/pipeline/expression_context.h" +#include "mongo/db/pipeline/value.h" +#include "mongo/util/log.h" + +namespace mongo { +using boost::intrusive_ptr; + +DocumentSourceSampleFromRandomCursor::DocumentSourceSampleFromRandomCursor( + const intrusive_ptr<ExpressionContext>& pExpCtx, + long long size, + std::string idField, + long long nDocsInCollection) + : DocumentSource(pExpCtx), + _size(size), + _idField(std::move(idField)), + _nDocsInColl(nDocsInCollection) {} + +const char* DocumentSourceSampleFromRandomCursor::getSourceName() const { + return "$sampleFromRandomCursor"; +} + +namespace { +/** + * Select a random value drawn according to the distribution Beta(alpha=1, beta=N). The kth smallest + * value of a sample of size N from a Uniform(0, 1) distribution has a Beta(k, N + 1 - k) + * distribution, so the return value represents the smallest value from such a sample. This is also + * the expected distance between the values drawn from a uniform distribution, which is how it is + * being used here. + */ +double smallestFromSampleOfUniform(PseudoRandom* prng, size_t N) { + boost::math::beta_distribution<double> betaDist(1.0, static_cast<double>(N)); + double p = prng->nextCanonicalDouble(); + return boost::math::quantile(betaDist, p); +} +} // namespace + +boost::optional<Document> DocumentSourceSampleFromRandomCursor::getNext() { + pExpCtx->checkForInterrupt(); + + if (_seenDocs.size() >= static_cast<size_t>(_size)) + return {}; + + auto doc = getNextNonDuplicateDocument(); + if (!doc) + return {}; + + // Assign it a random value to enable merging by random value, attempting to avoid bias in that + // process. + auto& prng = pExpCtx->opCtx->getClient()->getPrng(); + _randMetaFieldVal -= smallestFromSampleOfUniform(&prng, _nDocsInColl); + + MutableDocument md(std::move(*doc)); + md.setRandMetaField(_randMetaFieldVal); + return md.freeze(); +} + +boost::optional<Document> DocumentSourceSampleFromRandomCursor::getNextNonDuplicateDocument() { + // We may get duplicate documents back from the random cursor, and should not return duplicate + // documents, so keep trying until we get a new one. + const int kMaxAttempts = 100; + for (int i = 0; i < kMaxAttempts; ++i) { + auto doc = pSource->getNext(); + if (!doc) + return doc; + + auto idField = (*doc)[_idField]; + uassert( + 28793, + str::stream() + << "The optimized $sample stage requires all documents have a " << _idField + << " field in order to de-duplicate results, but encountered a document without a " + << _idField << " field: " << (*doc).toString(), + !idField.missing()); + + if (_seenDocs.insert(std::move(idField)).second) { + return doc; + } + LOG(1) << "$sample encountered duplicate document: " << (*doc).toString() << std::endl; + } + uasserted(28799, + str::stream() << "$sample stage could not find a non-duplicate document after " + << kMaxAttempts << " while using a random cursor. This is likely a " + "sporadic failure, please try again."); +} + +Value DocumentSourceSampleFromRandomCursor::serialize(bool explain) const { + return Value(DOC(getSourceName() << DOC("size" << _size))); +} + +DocumentSource::GetDepsReturn DocumentSourceSampleFromRandomCursor::getDependencies( + DepsTracker* deps) const { + deps->fields.insert(_idField); + return SEE_NEXT; +} + +intrusive_ptr<DocumentSourceSampleFromRandomCursor> DocumentSourceSampleFromRandomCursor::create( + const intrusive_ptr<ExpressionContext>& expCtx, + long long size, + std::string idField, + long long nDocsInCollection) { + return new DocumentSourceSampleFromRandomCursor(expCtx, size, idField, nDocsInCollection); +} +} // mongo diff --git a/src/mongo/db/pipeline/document_source_test.cpp b/src/mongo/db/pipeline/document_source_test.cpp index 2d9e3afa535..3fe97dec7b0 100644 --- a/src/mongo/db/pipeline/document_source_test.cpp +++ b/src/mongo/db/pipeline/document_source_test.cpp @@ -1037,7 +1037,7 @@ public: SampleBasics() : _mock(DocumentSourceMock::create()) {} protected: - void createSample(long long size) { + virtual void createSample(long long size) { BSONObj spec = BSON("$sample" << BSON("size" << size)); BSONElement specElement = spec.firstElement(); _sample = DocumentSourceSample::createFromBson(specElement, ctx()); @@ -1045,7 +1045,7 @@ protected: checkBsonRepresentation(spec); } - DocumentSourceSample* sample() { + DocumentSource* sample() { return static_cast<DocumentSourceSample*>(_sample.get()); } @@ -1094,19 +1094,20 @@ protected: ASSERT(!_sample->getNext()); } +protected: + intrusive_ptr<DocumentSource> _sample; + intrusive_ptr<DocumentSourceMock> _mock; + private: /** * Check that the BSON representation generated by the souce matches the BSON it was * created with. */ void checkBsonRepresentation(const BSONObj& spec) { - Value serialized = sample()->serialize(false); + Value serialized = static_cast<DocumentSourceSample*>(sample())->serialize(false); auto generatedSpec = serialized.getDocument().toBson(); ASSERT_EQUALS(spec, generatedSpec); } - - intrusive_ptr<DocumentSource> _sample; - intrusive_ptr<DocumentSourceMock> _mock; }; /** @@ -1192,6 +1193,166 @@ TEST_F(InvalidSampleSpec, MissingSize) { ASSERT_THROWS_CODE(createSample(createSpec(BSONObj())), UserException, 28749); } +namespace DocumentSourceSampleFromRandomCursor { +using mongo::DocumentSourceSampleFromRandomCursor; + +class SampleFromRandomCursorBasics : public SampleBasics { +public: + void createSample(long long size) override { + _sample = DocumentSourceSampleFromRandomCursor::create(ctx(), size, "_id", 100); + sample()->setSource(_mock.get()); + } +}; + +/** + * A sample of size zero should not return any results. + */ +TEST_F(SampleFromRandomCursorBasics, ZeroSize) { + loadDocuments(2); + checkResults(0, 0); +} + +/** + * When sampling with a size smaller than the number of documents our source stage can produce, + * there should be no more than the sample size output. + */ +TEST_F(SampleFromRandomCursorBasics, SourceExhaustedBeforeSample) { + loadDocuments(5); + checkResults(10, 5); +} + +/** + * When the source stage runs out of documents, the $sampleFromRandomCursors stage should be + * exhausted. + */ +TEST_F(SampleFromRandomCursorBasics, SampleExhaustedBeforeSource) { + loadDocuments(10); + checkResults(5, 5); +} + +/** + * The $sampleFromRandomCursor stage should not modify the contents of the documents. + */ +TEST_F(SampleFromRandomCursorBasics, DocsUnmodified) { + createSample(1); + source()->queue.push_back(DOC("_id" << 1 << "b" << DOC("c" << 2))); + auto next = sample()->getNext(); + ASSERT_TRUE(bool(next)); + Document doc = *next; + ASSERT_EQUALS(1, doc["_id"].getInt()); + ASSERT_EQUALS(2, doc["b"]["c"].getInt()); + ASSERT_TRUE(doc.hasRandMetaField()); + assertExhausted(); +} + +/** + * The $sampleFromRandomCursor stage should ignore duplicate documents. + */ +TEST_F(SampleFromRandomCursorBasics, IgnoreDuplicates) { + createSample(2); + source()->queue.push_back(DOC("_id" << 1)); + source()->queue.push_back(DOC("_id" << 1)); // Duplicate, should ignore. + source()->queue.push_back(DOC("_id" << 2)); + + auto next = sample()->getNext(); + ASSERT_TRUE(bool(next)); + Document doc = *next; + ASSERT_EQUALS(1, doc["_id"].getInt()); + ASSERT_TRUE(doc.hasRandMetaField()); + double doc1Meta = doc.getRandMetaField(); + + // Should ignore the duplicate {_id: 1}, and return {_id: 2}. + next = sample()->getNext(); + ASSERT_TRUE(bool(next)); + doc = *next; + ASSERT_EQUALS(2, doc["_id"].getInt()); + ASSERT_TRUE(doc.hasRandMetaField()); + double doc2Meta = doc.getRandMetaField(); + ASSERT_GTE(doc1Meta, doc2Meta); + + // Both stages should be exhausted. + ASSERT_FALSE(source()->getNext()); + assertExhausted(); +} + +/** + * The $sampleFromRandomCursor stage should error if it receives too many duplicate documents. + */ +TEST_F(SampleFromRandomCursorBasics, TooManyDups) { + createSample(2); + for (int i = 0; i < 1000; i++) { + source()->queue.push_back(DOC("_id" << 1)); + } + + // First should be successful, it's not a duplicate. + ASSERT_TRUE(bool(sample()->getNext())); + + // The rest are duplicates, should error. + ASSERT_THROWS_CODE(sample()->getNext(), UserException, 28799); +} + +/** + * The $sampleFromRandomCursor stage should error if it receives a document without an _id. + */ +TEST_F(SampleFromRandomCursorBasics, MissingIdField) { + // Once with only a bad document. + createSample(2); // _idField is '_id'. + source()->queue.push_back(DOC("non_id" << 2)); + ASSERT_THROWS_CODE(sample()->getNext(), UserException, 28793); + + // Again, with some regular documents before a bad one. + createSample(2); // _idField is '_id'. + source()->queue.push_back(DOC("_id" << 1)); + source()->queue.push_back(DOC("_id" << 1)); + source()->queue.push_back(DOC("non_id" << 2)); + + // First should be successful. + ASSERT_TRUE(bool(sample()->getNext())); + + ASSERT_THROWS_CODE(sample()->getNext(), UserException, 28793); +} + +/** + * The $sampleFromRandomCursor stage should set the random meta value in a way that mimics the + * non-optimized case. + * + * This fails currently, but should be re-enabled once SERVER-20121 is resolved. + */ +#if 0 +TEST_F(SampleFromRandomCursorBasics, MimicNonOptimized) { + // Compute the average random meta value on the each doc returned. + double firstTotal = 0.0; + double secondTotal = 0.0; + int nTrials = 10000; + for (int i = 0; i < nTrials; i++) { + // Sample 2 out of 3 documents. + _sample = DocumentSourceSampleFromRandomCursor::create(ctx(), 2, "_id", 3); + sample()->setSource(_mock.get()); + + source()->queue.push_back(DOC("_id" << 1)); + source()->queue.push_back(DOC("_id" << 2)); + + auto doc = sample()->getNext(); + ASSERT_TRUE(bool(doc)); + ASSERT_TRUE((*doc).hasRandMetaField()); + firstTotal += (*doc).getRandMetaField(); + + doc = sample()->getNext(); + ASSERT_TRUE(bool(doc)); + ASSERT_TRUE((*doc).hasRandMetaField()); + secondTotal += (*doc).getRandMetaField(); + } + // The average random meta value of the first document should be about 0.75. + ASSERT_GTE(firstTotal / nTrials, 0.74); + ASSERT_LTE(firstTotal / nTrials, 0.76); + + // The average random meta value of the second document should be about 0.5. + ASSERT_GTE(secondTotal / nTrials, 0.49); + ASSERT_LTE(secondTotal / nTrials, 0.51); +} +#endif +} // namespace DocumentSourceSampleFromRandomCursor + } // namespace DocumentSourceSample namespace DocumentSourceSort { @@ -1548,10 +1709,10 @@ class RandMeta : public CheckResultsBase { std::deque<Document> inputData() { MutableDocument first; first["_id"] = Value(0); - first.setRandMetaField(10); + first.setRandMetaField(0.01); MutableDocument second; second["_id"] = Value(1); - second.setRandMetaField(20); + second.setRandMetaField(0.02); return {first.freeze(), second.freeze()}; } diff --git a/src/mongo/db/pipeline/document_value_test.cpp b/src/mongo/db/pipeline/document_value_test.cpp index 6b2269948e6..cf7e9b00cc2 100644 --- a/src/mongo/db/pipeline/document_value_test.cpp +++ b/src/mongo/db/pipeline/document_value_test.cpp @@ -452,18 +452,18 @@ TEST(MetaFields, RandValBasics) { // Setting the random value field should work as expected. MutableDocument docBuilder; - docBuilder.setRandMetaField(1); + docBuilder.setRandMetaField(1.0); Document doc = docBuilder.freeze(); ASSERT_TRUE(doc.hasRandMetaField()); ASSERT_EQ(1, doc.getRandMetaField()); // Setting the random value twice should keep the second value. MutableDocument docBuilder2; - docBuilder2.setRandMetaField(1); - docBuilder2.setRandMetaField(2); + docBuilder2.setRandMetaField(1.0); + docBuilder2.setRandMetaField(2.0); Document doc2 = docBuilder2.freeze(); ASSERT_TRUE(doc2.hasRandMetaField()); - ASSERT_EQ(2, doc2.getRandMetaField()); + ASSERT_EQ(2.0, doc2.getRandMetaField()); } class SerializationTest : public unittest::Test { @@ -493,7 +493,7 @@ protected: TEST_F(SerializationTest, MetaSerializationNoVals) { MutableDocument docBuilder; docBuilder.setTextScore(10.0); - docBuilder.setRandMetaField(20); + docBuilder.setRandMetaField(20.0); assertRoundTrips(docBuilder.freeze()); } @@ -501,14 +501,14 @@ TEST_F(SerializationTest, MetaSerializationWithVals) { // Same as above test, but add a non-meta field as well. MutableDocument docBuilder(DOC("foo" << 10)); docBuilder.setTextScore(10.0); - docBuilder.setRandMetaField(20); + docBuilder.setRandMetaField(20.0); assertRoundTrips(docBuilder.freeze()); } TEST(MetaFields, ToAndFromBson) { MutableDocument docBuilder; docBuilder.setTextScore(10.0); - docBuilder.setRandMetaField(20); + docBuilder.setRandMetaField(20.0); Document doc = docBuilder.freeze(); BSONObj obj = doc.toBsonWithMetaData(); ASSERT_EQ(10.0, obj[Document::metaFieldTextScore].Double()); diff --git a/src/mongo/db/pipeline/expression.cpp b/src/mongo/db/pipeline/expression.cpp index a91f475cd1f..f15564538f0 100644 --- a/src/mongo/db/pipeline/expression.cpp +++ b/src/mongo/db/pipeline/expression.cpp @@ -1908,8 +1908,7 @@ Value ExpressionMeta::evaluateInternal(Variables* vars) const { case MetaType::TEXT_SCORE: return root.hasTextScore() ? Value(root.getTextScore()) : Value(); case MetaType::RAND_VAL: - return root.hasRandMetaField() ? Value(static_cast<long long>(root.getRandMetaField())) - : Value(); + return root.hasRandMetaField() ? Value(root.getRandMetaField()) : Value(); } MONGO_UNREACHABLE; } diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 884a942ca36..7ea11b1b486 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -34,6 +34,10 @@ #include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/database.h" #include "mongo/db/catalog/document_validation.h" +#include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/exec/multi_iterator.h" +#include "mongo/db/exec/working_set.h" +#include "mongo/db/exec/shard_filter.h" #include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/pipeline/document_source.h" @@ -41,9 +45,11 @@ #include "mongo/db/query/get_executor.h" #include "mongo/db/query/query_planner.h" #include "mongo/db/service_context.h" +#include "mongo/db/storage/record_store.h" #include "mongo/db/s/sharded_connection_info.h" #include "mongo/db/s/sharding_state.h" #include "mongo/s/chunk_version.h" +#include "mongo/stdx/memory.h" namespace mongo { @@ -91,9 +97,46 @@ private: intrusive_ptr<ExpressionContext> _ctx; DBDirectClient _client; }; + +/** + * Returns a PlanExecutor which uses a random cursor to sample documents if successful. Returns {} + * if the storage engine doesn't support random cursors, or if 'sampleSize' is a large enough + * percentage of the collection. + */ +shared_ptr<PlanExecutor> createRandomCursorExecutor(Collection* collection, + OperationContext* txn, + long long sampleSize, + long long numRecords) { + double kMaxSampleRatioForRandCursor = 0.05; + if (sampleSize > numRecords * kMaxSampleRatioForRandCursor || numRecords <= 100) + return {}; + + // Attempt to get a random cursor from the storage engine. + auto randCursor = collection->getRecordStore()->getRandomCursor(txn); + + if (!randCursor) + return {}; + + auto ws = stdx::make_unique<WorkingSet>(); + auto stage = stdx::make_unique<MultiIteratorStage>(txn, ws.get(), collection); + stage->addIterator(std::move(randCursor)); + + // If we're in a sharded environment, we need to filter out documents we don't own. + if (ShardingState::get(getGlobalServiceContext()) + ->needCollectionMetadata(txn->getClient(), txn->getNS())) { + auto shardFilterStage = stdx::make_unique<ShardFilterStage>( + txn, + ShardingState::get(getGlobalServiceContext())->getCollectionMetadata(txn->getNS()), + ws.get(), + stage.release()); + return uassertStatusOK(PlanExecutor::make( + txn, std::move(ws), std::move(shardFilterStage), collection, PlanExecutor::YIELD_AUTO)); + } + + return uassertStatusOK(PlanExecutor::make( + txn, std::move(ws), std::move(stage), collection, PlanExecutor::YIELD_AUTO)); } -namespace { StatusWith<std::unique_ptr<PlanExecutor>> attemptToGetExecutor( OperationContext* txn, Collection* collection, @@ -126,9 +169,6 @@ shared_ptr<PlanExecutor> PipelineD::prepareCursorSource( Collection* collection, const intrusive_ptr<Pipeline>& pPipeline, const intrusive_ptr<ExpressionContext>& pExpCtx) { - // get the full "namespace" name - const string& fullName = pExpCtx->ns.ns(); - // We will be modifying the source vector as we go. Pipeline::SourceContainer& sources = pPipeline->sources; @@ -141,18 +181,37 @@ shared_ptr<PlanExecutor> PipelineD::prepareCursorSource( } } - if (!sources.empty() && sources.front()->isValidInitialSource()) { - if (dynamic_cast<DocumentSourceMergeCursors*>(sources.front().get())) { - // Enable the hooks for setting up authentication on the subsequent internal - // connections we are going to create. This would normally have been done - // when SetShardVersion was called, but since SetShardVersion is never called - // on secondaries, this is needed. - ShardedConnectionInfo::addHook(); + if (!sources.empty()) { + if (sources.front()->isValidInitialSource()) { + if (dynamic_cast<DocumentSourceMergeCursors*>(sources.front().get())) { + // Enable the hooks for setting up authentication on the subsequent internal + // connections we are going to create. This would normally have been done + // when SetShardVersion was called, but since SetShardVersion is never called + // on secondaries, this is needed. + ShardedConnectionInfo::addHook(); + } + return std::shared_ptr<PlanExecutor>(); // don't need a cursor + } + + // Optimize an initial $sample stage if possible. + if (auto sampleStage = dynamic_cast<DocumentSourceSample*>(sources.front().get())) { + const long long sampleSize = sampleStage->getSampleSize(); + const long long numRecords = collection->getRecordStore()->numRecords(txn); + auto exec = createRandomCursorExecutor(collection, txn, sampleSize, numRecords); + if (exec) { + // Replace $sample stage with $sampleFromRandomCursor stage. + sources.pop_front(); + std::string idString = collection->ns().isOplog() ? "ts" : "_id"; + sources.emplace_front(DocumentSourceSampleFromRandomCursor::create( + pExpCtx, sampleSize, idString, numRecords)); + + const BSONObj initialQuery; + return addCursorSource( + pPipeline, pExpCtx, exec, pPipeline->getDependencies(initialQuery)); + } } - return std::shared_ptr<PlanExecutor>(); // don't need a cursor } - // Look for an initial match. This works whether we got an initial query or not. // If not, it results in a "{}" query, which will be what we want in that case. const BSONObj queryObj = pPipeline->getInitialQuery(); @@ -188,35 +247,7 @@ shared_ptr<PlanExecutor> PipelineD::prepareCursorSource( auto exec = prepareExecutor( txn, collection, pPipeline, pExpCtx, sortStage, deps, queryObj, &sortObj, &projForQuery); - // DocumentSourceCursor expects a yielding PlanExecutor that has had its state saved. We - // deregister the PlanExecutor so that it can be registered with ClientCursor. - exec->deregisterExec(); - exec->saveState(); - - // Put the PlanExecutor into a DocumentSourceCursor and add it to the front of the pipeline. - intrusive_ptr<DocumentSourceCursor> pSource = - DocumentSourceCursor::create(fullName, exec, pExpCtx); - - // Note the query, sort, and projection for explain. - pSource->setQuery(queryObj); - pSource->setSort(sortObj); - if (!projForQuery.isEmpty()) { - pSource->setProjection(projForQuery, boost::none); - } else { - // There may be fewer dependencies now if the sort was covered. - if (!sortObj.isEmpty()) { - deps = pPipeline->getDependencies(queryObj); - } - - pSource->setProjection(deps.toProjection(), deps.toParsedDeps()); - } - - while (!sources.empty() && pSource->coalesce(sources.front())) { - sources.pop_front(); - } - - pPipeline->addInitialSource(pSource); - return exec; + return addCursorSource(pPipeline, pExpCtx, exec, deps, queryObj, sortObj, projForQuery); } std::shared_ptr<PlanExecutor> PipelineD::prepareExecutor( @@ -311,4 +342,47 @@ std::shared_ptr<PlanExecutor> PipelineD::prepareExecutor( txn, collection, expCtx, queryObj, *projectionObj, *sortObj, plannerOpts)); } +shared_ptr<PlanExecutor> PipelineD::addCursorSource(const intrusive_ptr<Pipeline>& pipeline, + const intrusive_ptr<ExpressionContext>& expCtx, + shared_ptr<PlanExecutor> exec, + DepsTracker deps, + const BSONObj& queryObj, + const BSONObj& sortObj, + const BSONObj& projectionObj) { + // Get the full "namespace" name. + const string& fullName = expCtx->ns.ns(); + + // Put the PlanExecutor into a DocumentSourceCursor and add it to the front of the pipeline. + intrusive_ptr<DocumentSourceCursor> pSource = + DocumentSourceCursor::create(fullName, exec, expCtx); + + // Note the query, sort, and projection for explain. + pSource->setQuery(queryObj); + pSource->setSort(sortObj); + + if (!projectionObj.isEmpty()) { + pSource->setProjection(projectionObj, boost::none); + } else { + // There may be fewer dependencies now if the sort was covered. + if (!sortObj.isEmpty()) { + deps = pipeline->getDependencies(queryObj); + } + + pSource->setProjection(deps.toProjection(), deps.toParsedDeps()); + } + + while (!pipeline->sources.empty() && pSource->coalesce(pipeline->sources.front())) { + pipeline->sources.pop_front(); + } + + pipeline->addInitialSource(pSource); + + // DocumentSourceCursor expects a yielding PlanExecutor that has had its state saved. We + // deregister the PlanExecutor so that it can be registered with ClientCursor. + exec->deregisterExec(); + exec->saveState(); + + return exec; +} + } // namespace mongo diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h index df9cc14f342..a9993c6331b 100644 --- a/src/mongo/db/pipeline/pipeline_d.h +++ b/src/mongo/db/pipeline/pipeline_d.h @@ -31,6 +31,8 @@ #include <boost/intrusive_ptr.hpp> #include <memory> +#include "mongo/bson/bsonobj.h" + namespace mongo { class Collection; class DocumentSourceCursor; @@ -102,6 +104,19 @@ private: const BSONObj& queryObj, BSONObj* sortObj, BSONObj* projectionObj); + + /** + * Creates a DocumentSourceCursor from the given PlanExecutor and adds it to the front of the + * Pipeline. + */ + static std::shared_ptr<PlanExecutor> addCursorSource( + const boost::intrusive_ptr<Pipeline>& pipeline, + const boost::intrusive_ptr<ExpressionContext>& expCtx, + std::shared_ptr<PlanExecutor> exec, + DepsTracker deps, + const BSONObj& queryObj = BSONObj(), + const BSONObj& sortObj = BSONObj(), + const BSONObj& projectionObj = BSONObj()); }; } // namespace mongo diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp index 393516736d5..ce56af2a184 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp @@ -309,7 +309,8 @@ public: } ~RandomCursor() { - detachFromOperationContext(); + if (_cursor) + detachFromOperationContext(); } boost::optional<Record> next() final { diff --git a/src/mongo/platform/random.cpp b/src/mongo/platform/random.cpp index b7fd984c411..ea641460037 100644 --- a/src/mongo/platform/random.cpp +++ b/src/mongo/platform/random.cpp @@ -40,6 +40,7 @@ #include <cstdlib> #include <iostream> #include <fstream> +#include <limits> #include "mongo/platform/basic.h" @@ -93,6 +94,15 @@ int64_t PseudoRandom::nextInt64() { return (a << 32) | b; } +double PseudoRandom::nextCanonicalDouble() { + double result; + do { + auto generated = static_cast<uint64_t>(nextInt64()); + result = static_cast<double>(generated) / std::numeric_limits<uint64_t>::max(); + } while (result == 1.0); + return result; +} + // --- SecureRandom ---- SecureRandom::~SecureRandom() {} diff --git a/src/mongo/platform/random.h b/src/mongo/platform/random.h index 29fe7e1c32c..8d43284b9a1 100644 --- a/src/mongo/platform/random.h +++ b/src/mongo/platform/random.h @@ -49,6 +49,11 @@ public: int64_t nextInt64(); /** + * Returns a random number in the range [0, 1). + */ + double nextCanonicalDouble(); + + /** * @return a number between 0 and max */ int32_t nextInt32(int32_t max) { diff --git a/src/mongo/platform/random_test.cpp b/src/mongo/platform/random_test.cpp index 3aecfb2e98b..c4e214f1af9 100644 --- a/src/mongo/platform/random_test.cpp +++ b/src/mongo/platform/random_test.cpp @@ -98,6 +98,52 @@ TEST(RandomTest, R2) { ASSERT_EQUALS(100U, s.size()); } +/** + * Test that if two PsuedoRandom's have the same seed, then subsequent calls to + * nextCanonicalDouble() will return the same value. + */ +TEST(RandomTest, NextCanonicalSameSeed) { + PseudoRandom a(12); + PseudoRandom b(12); + for (int i = 0; i < 100; i++) { + ASSERT_EQUALS(a.nextCanonicalDouble(), b.nextCanonicalDouble()); + } +} + +/** + * Test that if two PsuedoRandom's have different seeds, then nextCanonicalDouble() will return + * different values. + */ +TEST(RandomTest, NextCanonicalDifferentSeeds) { + PseudoRandom a(12); + PseudoRandom b(11); + ASSERT_NOT_EQUALS(a.nextCanonicalDouble(), b.nextCanonicalDouble()); +} + +/** + * Test that nextCanonicalDouble() avoids returning a value soon after it has previously returned + * that value. + */ +TEST(RandomTest, NextCanonicalDistinctValues) { + PseudoRandom a(11); + std::set<double> s; + for (int i = 0; i < 100; i++) { + s.insert(a.nextCanonicalDouble()); + } + ASSERT_EQUALS(100U, s.size()); +} + +/** + * Test that nextCanonicalDouble() always returns values between 0 and 1. + */ +TEST(RandomTest, NextCanonicalWithinRange) { + PseudoRandom prng(10); + for (int i = 0; i < 100; i++) { + double next = prng.nextCanonicalDouble(); + ASSERT_LTE(0.0, next); + ASSERT_LT(next, 1.0); + } +} TEST(RandomTest, Secure1) { SecureRandom* a = SecureRandom::create(); |