summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorCharlie Swanson <charlie.swanson@mongodb.com>2015-08-07 16:10:23 -0400
committerCharlie Swanson <charlie.swanson@mongodb.com>2015-08-27 13:01:51 -0400
commitd88cb83e39b4cdddc9bdcf98f667f6cabdadc8ec (patch)
tree6a3ee70ea056ddab3a5801a783d009cd925724de /src
parent843fd8ae34d06ed73f824c124cd2d9ef7d23b814 (diff)
downloadmongo-d88cb83e39b4cdddc9bdcf98f667f6cabdadc8ec.tar.gz
SERVER-19182 Integrate storage engine optimizations into $sample stage
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/pipeline/SConscript1
-rw-r--r--src/mongo/db/pipeline/document.cpp8
-rw-r--r--src/mongo/db/pipeline/document.h4
-rw-r--r--src/mongo/db/pipeline/document_internal.h6
-rw-r--r--src/mongo/db/pipeline/document_source.h61
-rw-r--r--src/mongo/db/pipeline/document_source_sample.cpp5
-rw-r--r--src/mongo/db/pipeline/document_source_sample_from_random_cursor.cpp142
-rw-r--r--src/mongo/db/pipeline/document_source_test.cpp177
-rw-r--r--src/mongo/db/pipeline/document_value_test.cpp14
-rw-r--r--src/mongo/db/pipeline/expression.cpp3
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp158
-rw-r--r--src/mongo/db/pipeline/pipeline_d.h15
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp3
-rw-r--r--src/mongo/platform/random.cpp10
-rw-r--r--src/mongo/platform/random.h5
-rw-r--r--src/mongo/platform/random_test.cpp46
16 files changed, 583 insertions, 75 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index d307503afc8..c9b43aeb967 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -108,6 +108,7 @@ docSourceEnv.Library(
'document_source_project.cpp',
'document_source_redact.cpp',
'document_source_sample.cpp',
+ 'document_source_sample_from_random_cursor.cpp',
'document_source_skip.cpp',
'document_source_sort.cpp',
'document_source_unwind.cpp',
diff --git a/src/mongo/db/pipeline/document.cpp b/src/mongo/db/pipeline/document.cpp
index a0dd7dc8287..ca6a6f73bde 100644
--- a/src/mongo/db/pipeline/document.cpp
+++ b/src/mongo/db/pipeline/document.cpp
@@ -253,7 +253,7 @@ BSONObj Document::toBsonWithMetaData() const {
if (hasTextScore())
bb.append(metaFieldTextScore, getTextScore());
if (hasRandMetaField())
- bb.append(metaFieldRandVal, static_cast<long long>(getRandMetaField()));
+ bb.append(metaFieldRandVal, getRandMetaField());
return bb.obj();
}
@@ -269,7 +269,7 @@ Document Document::fromBsonWithMetaData(const BSONObj& bson) {
md.setTextScore(elem.Double());
continue;
} else if (fieldName == metaFieldRandVal) {
- md.setRandMetaField(static_cast<int64_t>(elem.numberLong()));
+ md.setRandMetaField(elem.Double());
continue;
}
}
@@ -438,7 +438,7 @@ void Document::serializeForSorter(BufBuilder& buf) const {
}
if (hasRandMetaField()) {
buf.appendNum(char(DocumentStorage::MetaType::RAND_VAL + 1));
- buf.appendNum(static_cast<long long>(getRandMetaField()));
+ buf.appendNum(getRandMetaField());
}
buf.appendNum(char(0));
}
@@ -455,7 +455,7 @@ Document Document::deserializeForSorter(BufReader& buf, const SorterDeserializeS
if (marker == char(DocumentStorage::MetaType::TEXT_SCORE) + 1) {
doc.setTextScore(buf.read<double>());
} else if (marker == char(DocumentStorage::MetaType::RAND_VAL) + 1) {
- doc.setRandMetaField(buf.read<int64_t>());
+ doc.setRandMetaField(buf.read<double>());
} else {
uasserted(28744, "Unrecognized marker, unable to deserialize buffer");
}
diff --git a/src/mongo/db/pipeline/document.h b/src/mongo/db/pipeline/document.h
index c6a100c954b..bc5f49bee3c 100644
--- a/src/mongo/db/pipeline/document.h
+++ b/src/mongo/db/pipeline/document.h
@@ -205,7 +205,7 @@ public:
bool hasRandMetaField() const {
return storage().hasRandMetaField();
}
- int64_t getRandMetaField() const {
+ double getRandMetaField() const {
return storage().getRandMetaField();
}
@@ -429,7 +429,7 @@ public:
storage().setTextScore(score);
}
- void setRandMetaField(int64_t val) {
+ void setRandMetaField(double val) {
storage().setRandMetaField(val);
}
diff --git a/src/mongo/db/pipeline/document_internal.h b/src/mongo/db/pipeline/document_internal.h
index 1c0d1a13566..67b46a6b3a3 100644
--- a/src/mongo/db/pipeline/document_internal.h
+++ b/src/mongo/db/pipeline/document_internal.h
@@ -297,10 +297,10 @@ public:
bool hasRandMetaField() const {
return _metaFields.test(MetaType::RAND_VAL);
}
- int64_t getRandMetaField() const {
+ double getRandMetaField() const {
return _randVal;
}
- void setRandMetaField(int64_t val) {
+ void setRandMetaField(double val) {
_metaFields.set(MetaType::RAND_VAL);
_randVal = val;
}
@@ -385,7 +385,7 @@ private:
std::bitset<MetaType::NUM_FIELDS> _metaFields;
double _textScore;
- int64_t _randVal;
+ double _randVal;
// When adding a field, make sure to update clone() method
};
}
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index 4adfccb5b17..d2a6727a911 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -62,6 +62,7 @@ class ExpressionFieldPath;
class ExpressionObject;
class DocumentSourceLimit;
class PlanExecutor;
+class RecordCursor;
/**
* Registers a DocumentSource to have the name 'key'. When a stage with name '$key' is found,
@@ -917,21 +918,77 @@ public:
const char* getSourceName() const final;
Value serialize(bool explain = false) const final;
+ GetDepsReturn getDependencies(DepsTracker* deps) const final {
+ return SEE_NEXT;
+ }
+
boost::intrusive_ptr<DocumentSource> getShardSource() final;
boost::intrusive_ptr<DocumentSource> getMergeSource() final;
+ long long getSampleSize() const {
+ return _size;
+ }
+
static boost::intrusive_ptr<DocumentSource> createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx);
private:
explicit DocumentSourceSample(const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
long long _size;
- // When no storage engine optimizations are available, $sample uses a $sort stage to randomly
- // sort the documents.
+ // Uses a $sort stage to randomly sort the documents.
boost::intrusive_ptr<DocumentSourceSort> _sortStage;
};
+/**
+ * This class is not a registered stage, it is only used as an optimized replacement for $sample
+ * when the storage engine allows us to use a random cursor.
+ */
+class DocumentSourceSampleFromRandomCursor final : public DocumentSource {
+public:
+ boost::optional<Document> getNext() final;
+ const char* getSourceName() const final;
+ Value serialize(bool explain = false) const final;
+ GetDepsReturn getDependencies(DepsTracker* deps) const final;
+
+ static boost::intrusive_ptr<DocumentSourceSampleFromRandomCursor> create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ long long size,
+ std::string idField,
+ long long collectionSize);
+
+private:
+ DocumentSourceSampleFromRandomCursor(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ long long size,
+ std::string idField,
+ long long collectionSize);
+
+ /**
+ * Keep asking for documents from the random cursor until it yields a new document. Errors if a
+ * a document is encountered without a value for '_idField', or if the random cursor keeps
+ * returning duplicate elements.
+ */
+ boost::optional<Document> getNextNonDuplicateDocument();
+
+ long long _size;
+
+ // The field to use as the id of a document. Usually '_id', but 'ts' for the oplog.
+ std::string _idField;
+
+ // Keeps track of the documents that have been returned, since a random cursor is allowed to
+ // return duplicates.
+ ValueSet _seenDocs;
+
+ // The approximate number of documents in the collection (includes orphans).
+ const long long _nDocsInColl;
+
+ // The value to be assigned to the randMetaField of outcoming documents. Each call to getNext()
+ // will decrement this value by an amount scaled by _nDocsInColl as an attempt to appear as if
+ // the documents were produced by a top-k random sort.
+ double _randMetaFieldVal = 1.0;
+};
+
class DocumentSourceLimit final : public DocumentSource, public SplittableDocumentSource {
public:
// virtuals from DocumentSource
diff --git a/src/mongo/db/pipeline/document_source_sample.cpp b/src/mongo/db/pipeline/document_source_sample.cpp
index 146a27c1241..6a7f96f5ed5 100644
--- a/src/mongo/db/pipeline/document_source_sample.cpp
+++ b/src/mongo/db/pipeline/document_source_sample.cpp
@@ -30,8 +30,6 @@
#include "mongo/db/pipeline/document_source.h"
-#include <vector>
-
#include "mongo/db/client.h"
#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/expression.h"
@@ -61,8 +59,7 @@ boost::optional<Document> DocumentSourceSample::getNext() {
PseudoRandom& prng = pExpCtx->opCtx->getClient()->getPrng();
while (boost::optional<Document> next = pSource->getNext()) {
MutableDocument doc(std::move(*next));
- // Add random metadata field.
- doc.setRandMetaField(prng.nextInt64());
+ doc.setRandMetaField(prng.nextCanonicalDouble());
_sortStage->loadDocument(doc.freeze());
}
_sortStage->loadingDone();
diff --git a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.cpp b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.cpp
new file mode 100644
index 00000000000..802a5d05aab
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.cpp
@@ -0,0 +1,142 @@
+/**
+ * Copyright (C) 2015 MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/document_source.h"
+
+#include <boost/math/distributions/beta.hpp>
+
+#include "mongo/db/client.h"
+#include "mongo/db/pipeline/document.h"
+#include "mongo/db/pipeline/expression.h"
+#include "mongo/db/pipeline/expression_context.h"
+#include "mongo/db/pipeline/value.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+using boost::intrusive_ptr;
+
+DocumentSourceSampleFromRandomCursor::DocumentSourceSampleFromRandomCursor(
+ const intrusive_ptr<ExpressionContext>& pExpCtx,
+ long long size,
+ std::string idField,
+ long long nDocsInCollection)
+ : DocumentSource(pExpCtx),
+ _size(size),
+ _idField(std::move(idField)),
+ _nDocsInColl(nDocsInCollection) {}
+
+const char* DocumentSourceSampleFromRandomCursor::getSourceName() const {
+ return "$sampleFromRandomCursor";
+}
+
+namespace {
+/**
+ * Select a random value drawn according to the distribution Beta(alpha=1, beta=N). The kth smallest
+ * value of a sample of size N from a Uniform(0, 1) distribution has a Beta(k, N + 1 - k)
+ * distribution, so the return value represents the smallest value from such a sample. This is also
+ * the expected distance between the values drawn from a uniform distribution, which is how it is
+ * being used here.
+ */
+double smallestFromSampleOfUniform(PseudoRandom* prng, size_t N) {
+ boost::math::beta_distribution<double> betaDist(1.0, static_cast<double>(N));
+ double p = prng->nextCanonicalDouble();
+ return boost::math::quantile(betaDist, p);
+}
+} // namespace
+
+boost::optional<Document> DocumentSourceSampleFromRandomCursor::getNext() {
+ pExpCtx->checkForInterrupt();
+
+ if (_seenDocs.size() >= static_cast<size_t>(_size))
+ return {};
+
+ auto doc = getNextNonDuplicateDocument();
+ if (!doc)
+ return {};
+
+ // Assign it a random value to enable merging by random value, attempting to avoid bias in that
+ // process.
+ auto& prng = pExpCtx->opCtx->getClient()->getPrng();
+ _randMetaFieldVal -= smallestFromSampleOfUniform(&prng, _nDocsInColl);
+
+ MutableDocument md(std::move(*doc));
+ md.setRandMetaField(_randMetaFieldVal);
+ return md.freeze();
+}
+
+boost::optional<Document> DocumentSourceSampleFromRandomCursor::getNextNonDuplicateDocument() {
+ // We may get duplicate documents back from the random cursor, and should not return duplicate
+ // documents, so keep trying until we get a new one.
+ const int kMaxAttempts = 100;
+ for (int i = 0; i < kMaxAttempts; ++i) {
+ auto doc = pSource->getNext();
+ if (!doc)
+ return doc;
+
+ auto idField = (*doc)[_idField];
+ uassert(
+ 28793,
+ str::stream()
+ << "The optimized $sample stage requires all documents have a " << _idField
+ << " field in order to de-duplicate results, but encountered a document without a "
+ << _idField << " field: " << (*doc).toString(),
+ !idField.missing());
+
+ if (_seenDocs.insert(std::move(idField)).second) {
+ return doc;
+ }
+ LOG(1) << "$sample encountered duplicate document: " << (*doc).toString() << std::endl;
+ }
+ uasserted(28799,
+ str::stream() << "$sample stage could not find a non-duplicate document after "
+ << kMaxAttempts << " while using a random cursor. This is likely a "
+ "sporadic failure, please try again.");
+}
+
+Value DocumentSourceSampleFromRandomCursor::serialize(bool explain) const {
+ return Value(DOC(getSourceName() << DOC("size" << _size)));
+}
+
+DocumentSource::GetDepsReturn DocumentSourceSampleFromRandomCursor::getDependencies(
+ DepsTracker* deps) const {
+ deps->fields.insert(_idField);
+ return SEE_NEXT;
+}
+
+intrusive_ptr<DocumentSourceSampleFromRandomCursor> DocumentSourceSampleFromRandomCursor::create(
+ const intrusive_ptr<ExpressionContext>& expCtx,
+ long long size,
+ std::string idField,
+ long long nDocsInCollection) {
+ return new DocumentSourceSampleFromRandomCursor(expCtx, size, idField, nDocsInCollection);
+}
+} // mongo
diff --git a/src/mongo/db/pipeline/document_source_test.cpp b/src/mongo/db/pipeline/document_source_test.cpp
index 2d9e3afa535..3fe97dec7b0 100644
--- a/src/mongo/db/pipeline/document_source_test.cpp
+++ b/src/mongo/db/pipeline/document_source_test.cpp
@@ -1037,7 +1037,7 @@ public:
SampleBasics() : _mock(DocumentSourceMock::create()) {}
protected:
- void createSample(long long size) {
+ virtual void createSample(long long size) {
BSONObj spec = BSON("$sample" << BSON("size" << size));
BSONElement specElement = spec.firstElement();
_sample = DocumentSourceSample::createFromBson(specElement, ctx());
@@ -1045,7 +1045,7 @@ protected:
checkBsonRepresentation(spec);
}
- DocumentSourceSample* sample() {
+ DocumentSource* sample() {
return static_cast<DocumentSourceSample*>(_sample.get());
}
@@ -1094,19 +1094,20 @@ protected:
ASSERT(!_sample->getNext());
}
+protected:
+ intrusive_ptr<DocumentSource> _sample;
+ intrusive_ptr<DocumentSourceMock> _mock;
+
private:
/**
* Check that the BSON representation generated by the souce matches the BSON it was
* created with.
*/
void checkBsonRepresentation(const BSONObj& spec) {
- Value serialized = sample()->serialize(false);
+ Value serialized = static_cast<DocumentSourceSample*>(sample())->serialize(false);
auto generatedSpec = serialized.getDocument().toBson();
ASSERT_EQUALS(spec, generatedSpec);
}
-
- intrusive_ptr<DocumentSource> _sample;
- intrusive_ptr<DocumentSourceMock> _mock;
};
/**
@@ -1192,6 +1193,166 @@ TEST_F(InvalidSampleSpec, MissingSize) {
ASSERT_THROWS_CODE(createSample(createSpec(BSONObj())), UserException, 28749);
}
+namespace DocumentSourceSampleFromRandomCursor {
+using mongo::DocumentSourceSampleFromRandomCursor;
+
+class SampleFromRandomCursorBasics : public SampleBasics {
+public:
+ void createSample(long long size) override {
+ _sample = DocumentSourceSampleFromRandomCursor::create(ctx(), size, "_id", 100);
+ sample()->setSource(_mock.get());
+ }
+};
+
+/**
+ * A sample of size zero should not return any results.
+ */
+TEST_F(SampleFromRandomCursorBasics, ZeroSize) {
+ loadDocuments(2);
+ checkResults(0, 0);
+}
+
+/**
+ * When sampling with a size smaller than the number of documents our source stage can produce,
+ * there should be no more than the sample size output.
+ */
+TEST_F(SampleFromRandomCursorBasics, SourceExhaustedBeforeSample) {
+ loadDocuments(5);
+ checkResults(10, 5);
+}
+
+/**
+ * When the source stage runs out of documents, the $sampleFromRandomCursors stage should be
+ * exhausted.
+ */
+TEST_F(SampleFromRandomCursorBasics, SampleExhaustedBeforeSource) {
+ loadDocuments(10);
+ checkResults(5, 5);
+}
+
+/**
+ * The $sampleFromRandomCursor stage should not modify the contents of the documents.
+ */
+TEST_F(SampleFromRandomCursorBasics, DocsUnmodified) {
+ createSample(1);
+ source()->queue.push_back(DOC("_id" << 1 << "b" << DOC("c" << 2)));
+ auto next = sample()->getNext();
+ ASSERT_TRUE(bool(next));
+ Document doc = *next;
+ ASSERT_EQUALS(1, doc["_id"].getInt());
+ ASSERT_EQUALS(2, doc["b"]["c"].getInt());
+ ASSERT_TRUE(doc.hasRandMetaField());
+ assertExhausted();
+}
+
+/**
+ * The $sampleFromRandomCursor stage should ignore duplicate documents.
+ */
+TEST_F(SampleFromRandomCursorBasics, IgnoreDuplicates) {
+ createSample(2);
+ source()->queue.push_back(DOC("_id" << 1));
+ source()->queue.push_back(DOC("_id" << 1)); // Duplicate, should ignore.
+ source()->queue.push_back(DOC("_id" << 2));
+
+ auto next = sample()->getNext();
+ ASSERT_TRUE(bool(next));
+ Document doc = *next;
+ ASSERT_EQUALS(1, doc["_id"].getInt());
+ ASSERT_TRUE(doc.hasRandMetaField());
+ double doc1Meta = doc.getRandMetaField();
+
+ // Should ignore the duplicate {_id: 1}, and return {_id: 2}.
+ next = sample()->getNext();
+ ASSERT_TRUE(bool(next));
+ doc = *next;
+ ASSERT_EQUALS(2, doc["_id"].getInt());
+ ASSERT_TRUE(doc.hasRandMetaField());
+ double doc2Meta = doc.getRandMetaField();
+ ASSERT_GTE(doc1Meta, doc2Meta);
+
+ // Both stages should be exhausted.
+ ASSERT_FALSE(source()->getNext());
+ assertExhausted();
+}
+
+/**
+ * The $sampleFromRandomCursor stage should error if it receives too many duplicate documents.
+ */
+TEST_F(SampleFromRandomCursorBasics, TooManyDups) {
+ createSample(2);
+ for (int i = 0; i < 1000; i++) {
+ source()->queue.push_back(DOC("_id" << 1));
+ }
+
+ // First should be successful, it's not a duplicate.
+ ASSERT_TRUE(bool(sample()->getNext()));
+
+ // The rest are duplicates, should error.
+ ASSERT_THROWS_CODE(sample()->getNext(), UserException, 28799);
+}
+
+/**
+ * The $sampleFromRandomCursor stage should error if it receives a document without an _id.
+ */
+TEST_F(SampleFromRandomCursorBasics, MissingIdField) {
+ // Once with only a bad document.
+ createSample(2); // _idField is '_id'.
+ source()->queue.push_back(DOC("non_id" << 2));
+ ASSERT_THROWS_CODE(sample()->getNext(), UserException, 28793);
+
+ // Again, with some regular documents before a bad one.
+ createSample(2); // _idField is '_id'.
+ source()->queue.push_back(DOC("_id" << 1));
+ source()->queue.push_back(DOC("_id" << 1));
+ source()->queue.push_back(DOC("non_id" << 2));
+
+ // First should be successful.
+ ASSERT_TRUE(bool(sample()->getNext()));
+
+ ASSERT_THROWS_CODE(sample()->getNext(), UserException, 28793);
+}
+
+/**
+ * The $sampleFromRandomCursor stage should set the random meta value in a way that mimics the
+ * non-optimized case.
+ *
+ * This fails currently, but should be re-enabled once SERVER-20121 is resolved.
+ */
+#if 0
+TEST_F(SampleFromRandomCursorBasics, MimicNonOptimized) {
+ // Compute the average random meta value on the each doc returned.
+ double firstTotal = 0.0;
+ double secondTotal = 0.0;
+ int nTrials = 10000;
+ for (int i = 0; i < nTrials; i++) {
+ // Sample 2 out of 3 documents.
+ _sample = DocumentSourceSampleFromRandomCursor::create(ctx(), 2, "_id", 3);
+ sample()->setSource(_mock.get());
+
+ source()->queue.push_back(DOC("_id" << 1));
+ source()->queue.push_back(DOC("_id" << 2));
+
+ auto doc = sample()->getNext();
+ ASSERT_TRUE(bool(doc));
+ ASSERT_TRUE((*doc).hasRandMetaField());
+ firstTotal += (*doc).getRandMetaField();
+
+ doc = sample()->getNext();
+ ASSERT_TRUE(bool(doc));
+ ASSERT_TRUE((*doc).hasRandMetaField());
+ secondTotal += (*doc).getRandMetaField();
+ }
+ // The average random meta value of the first document should be about 0.75.
+ ASSERT_GTE(firstTotal / nTrials, 0.74);
+ ASSERT_LTE(firstTotal / nTrials, 0.76);
+
+ // The average random meta value of the second document should be about 0.5.
+ ASSERT_GTE(secondTotal / nTrials, 0.49);
+ ASSERT_LTE(secondTotal / nTrials, 0.51);
+}
+#endif
+} // namespace DocumentSourceSampleFromRandomCursor
+
} // namespace DocumentSourceSample
namespace DocumentSourceSort {
@@ -1548,10 +1709,10 @@ class RandMeta : public CheckResultsBase {
std::deque<Document> inputData() {
MutableDocument first;
first["_id"] = Value(0);
- first.setRandMetaField(10);
+ first.setRandMetaField(0.01);
MutableDocument second;
second["_id"] = Value(1);
- second.setRandMetaField(20);
+ second.setRandMetaField(0.02);
return {first.freeze(), second.freeze()};
}
diff --git a/src/mongo/db/pipeline/document_value_test.cpp b/src/mongo/db/pipeline/document_value_test.cpp
index 6b2269948e6..cf7e9b00cc2 100644
--- a/src/mongo/db/pipeline/document_value_test.cpp
+++ b/src/mongo/db/pipeline/document_value_test.cpp
@@ -452,18 +452,18 @@ TEST(MetaFields, RandValBasics) {
// Setting the random value field should work as expected.
MutableDocument docBuilder;
- docBuilder.setRandMetaField(1);
+ docBuilder.setRandMetaField(1.0);
Document doc = docBuilder.freeze();
ASSERT_TRUE(doc.hasRandMetaField());
ASSERT_EQ(1, doc.getRandMetaField());
// Setting the random value twice should keep the second value.
MutableDocument docBuilder2;
- docBuilder2.setRandMetaField(1);
- docBuilder2.setRandMetaField(2);
+ docBuilder2.setRandMetaField(1.0);
+ docBuilder2.setRandMetaField(2.0);
Document doc2 = docBuilder2.freeze();
ASSERT_TRUE(doc2.hasRandMetaField());
- ASSERT_EQ(2, doc2.getRandMetaField());
+ ASSERT_EQ(2.0, doc2.getRandMetaField());
}
class SerializationTest : public unittest::Test {
@@ -493,7 +493,7 @@ protected:
TEST_F(SerializationTest, MetaSerializationNoVals) {
MutableDocument docBuilder;
docBuilder.setTextScore(10.0);
- docBuilder.setRandMetaField(20);
+ docBuilder.setRandMetaField(20.0);
assertRoundTrips(docBuilder.freeze());
}
@@ -501,14 +501,14 @@ TEST_F(SerializationTest, MetaSerializationWithVals) {
// Same as above test, but add a non-meta field as well.
MutableDocument docBuilder(DOC("foo" << 10));
docBuilder.setTextScore(10.0);
- docBuilder.setRandMetaField(20);
+ docBuilder.setRandMetaField(20.0);
assertRoundTrips(docBuilder.freeze());
}
TEST(MetaFields, ToAndFromBson) {
MutableDocument docBuilder;
docBuilder.setTextScore(10.0);
- docBuilder.setRandMetaField(20);
+ docBuilder.setRandMetaField(20.0);
Document doc = docBuilder.freeze();
BSONObj obj = doc.toBsonWithMetaData();
ASSERT_EQ(10.0, obj[Document::metaFieldTextScore].Double());
diff --git a/src/mongo/db/pipeline/expression.cpp b/src/mongo/db/pipeline/expression.cpp
index a91f475cd1f..f15564538f0 100644
--- a/src/mongo/db/pipeline/expression.cpp
+++ b/src/mongo/db/pipeline/expression.cpp
@@ -1908,8 +1908,7 @@ Value ExpressionMeta::evaluateInternal(Variables* vars) const {
case MetaType::TEXT_SCORE:
return root.hasTextScore() ? Value(root.getTextScore()) : Value();
case MetaType::RAND_VAL:
- return root.hasRandMetaField() ? Value(static_cast<long long>(root.getRandMetaField()))
- : Value();
+ return root.hasRandMetaField() ? Value(root.getRandMetaField()) : Value();
}
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 884a942ca36..7ea11b1b486 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -34,6 +34,10 @@
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/catalog/document_validation.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/exec/multi_iterator.h"
+#include "mongo/db/exec/working_set.h"
+#include "mongo/db/exec/shard_filter.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/pipeline/document_source.h"
@@ -41,9 +45,11 @@
#include "mongo/db/query/get_executor.h"
#include "mongo/db/query/query_planner.h"
#include "mongo/db/service_context.h"
+#include "mongo/db/storage/record_store.h"
#include "mongo/db/s/sharded_connection_info.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/s/chunk_version.h"
+#include "mongo/stdx/memory.h"
namespace mongo {
@@ -91,9 +97,46 @@ private:
intrusive_ptr<ExpressionContext> _ctx;
DBDirectClient _client;
};
+
+/**
+ * Returns a PlanExecutor which uses a random cursor to sample documents if successful. Returns {}
+ * if the storage engine doesn't support random cursors, or if 'sampleSize' is a large enough
+ * percentage of the collection.
+ */
+shared_ptr<PlanExecutor> createRandomCursorExecutor(Collection* collection,
+ OperationContext* txn,
+ long long sampleSize,
+ long long numRecords) {
+ double kMaxSampleRatioForRandCursor = 0.05;
+ if (sampleSize > numRecords * kMaxSampleRatioForRandCursor || numRecords <= 100)
+ return {};
+
+ // Attempt to get a random cursor from the storage engine.
+ auto randCursor = collection->getRecordStore()->getRandomCursor(txn);
+
+ if (!randCursor)
+ return {};
+
+ auto ws = stdx::make_unique<WorkingSet>();
+ auto stage = stdx::make_unique<MultiIteratorStage>(txn, ws.get(), collection);
+ stage->addIterator(std::move(randCursor));
+
+ // If we're in a sharded environment, we need to filter out documents we don't own.
+ if (ShardingState::get(getGlobalServiceContext())
+ ->needCollectionMetadata(txn->getClient(), txn->getNS())) {
+ auto shardFilterStage = stdx::make_unique<ShardFilterStage>(
+ txn,
+ ShardingState::get(getGlobalServiceContext())->getCollectionMetadata(txn->getNS()),
+ ws.get(),
+ stage.release());
+ return uassertStatusOK(PlanExecutor::make(
+ txn, std::move(ws), std::move(shardFilterStage), collection, PlanExecutor::YIELD_AUTO));
+ }
+
+ return uassertStatusOK(PlanExecutor::make(
+ txn, std::move(ws), std::move(stage), collection, PlanExecutor::YIELD_AUTO));
}
-namespace {
StatusWith<std::unique_ptr<PlanExecutor>> attemptToGetExecutor(
OperationContext* txn,
Collection* collection,
@@ -126,9 +169,6 @@ shared_ptr<PlanExecutor> PipelineD::prepareCursorSource(
Collection* collection,
const intrusive_ptr<Pipeline>& pPipeline,
const intrusive_ptr<ExpressionContext>& pExpCtx) {
- // get the full "namespace" name
- const string& fullName = pExpCtx->ns.ns();
-
// We will be modifying the source vector as we go.
Pipeline::SourceContainer& sources = pPipeline->sources;
@@ -141,18 +181,37 @@ shared_ptr<PlanExecutor> PipelineD::prepareCursorSource(
}
}
- if (!sources.empty() && sources.front()->isValidInitialSource()) {
- if (dynamic_cast<DocumentSourceMergeCursors*>(sources.front().get())) {
- // Enable the hooks for setting up authentication on the subsequent internal
- // connections we are going to create. This would normally have been done
- // when SetShardVersion was called, but since SetShardVersion is never called
- // on secondaries, this is needed.
- ShardedConnectionInfo::addHook();
+ if (!sources.empty()) {
+ if (sources.front()->isValidInitialSource()) {
+ if (dynamic_cast<DocumentSourceMergeCursors*>(sources.front().get())) {
+ // Enable the hooks for setting up authentication on the subsequent internal
+ // connections we are going to create. This would normally have been done
+ // when SetShardVersion was called, but since SetShardVersion is never called
+ // on secondaries, this is needed.
+ ShardedConnectionInfo::addHook();
+ }
+ return std::shared_ptr<PlanExecutor>(); // don't need a cursor
+ }
+
+ // Optimize an initial $sample stage if possible.
+ if (auto sampleStage = dynamic_cast<DocumentSourceSample*>(sources.front().get())) {
+ const long long sampleSize = sampleStage->getSampleSize();
+ const long long numRecords = collection->getRecordStore()->numRecords(txn);
+ auto exec = createRandomCursorExecutor(collection, txn, sampleSize, numRecords);
+ if (exec) {
+ // Replace $sample stage with $sampleFromRandomCursor stage.
+ sources.pop_front();
+ std::string idString = collection->ns().isOplog() ? "ts" : "_id";
+ sources.emplace_front(DocumentSourceSampleFromRandomCursor::create(
+ pExpCtx, sampleSize, idString, numRecords));
+
+ const BSONObj initialQuery;
+ return addCursorSource(
+ pPipeline, pExpCtx, exec, pPipeline->getDependencies(initialQuery));
+ }
}
- return std::shared_ptr<PlanExecutor>(); // don't need a cursor
}
-
// Look for an initial match. This works whether we got an initial query or not.
// If not, it results in a "{}" query, which will be what we want in that case.
const BSONObj queryObj = pPipeline->getInitialQuery();
@@ -188,35 +247,7 @@ shared_ptr<PlanExecutor> PipelineD::prepareCursorSource(
auto exec = prepareExecutor(
txn, collection, pPipeline, pExpCtx, sortStage, deps, queryObj, &sortObj, &projForQuery);
- // DocumentSourceCursor expects a yielding PlanExecutor that has had its state saved. We
- // deregister the PlanExecutor so that it can be registered with ClientCursor.
- exec->deregisterExec();
- exec->saveState();
-
- // Put the PlanExecutor into a DocumentSourceCursor and add it to the front of the pipeline.
- intrusive_ptr<DocumentSourceCursor> pSource =
- DocumentSourceCursor::create(fullName, exec, pExpCtx);
-
- // Note the query, sort, and projection for explain.
- pSource->setQuery(queryObj);
- pSource->setSort(sortObj);
- if (!projForQuery.isEmpty()) {
- pSource->setProjection(projForQuery, boost::none);
- } else {
- // There may be fewer dependencies now if the sort was covered.
- if (!sortObj.isEmpty()) {
- deps = pPipeline->getDependencies(queryObj);
- }
-
- pSource->setProjection(deps.toProjection(), deps.toParsedDeps());
- }
-
- while (!sources.empty() && pSource->coalesce(sources.front())) {
- sources.pop_front();
- }
-
- pPipeline->addInitialSource(pSource);
- return exec;
+ return addCursorSource(pPipeline, pExpCtx, exec, deps, queryObj, sortObj, projForQuery);
}
std::shared_ptr<PlanExecutor> PipelineD::prepareExecutor(
@@ -311,4 +342,47 @@ std::shared_ptr<PlanExecutor> PipelineD::prepareExecutor(
txn, collection, expCtx, queryObj, *projectionObj, *sortObj, plannerOpts));
}
+shared_ptr<PlanExecutor> PipelineD::addCursorSource(const intrusive_ptr<Pipeline>& pipeline,
+ const intrusive_ptr<ExpressionContext>& expCtx,
+ shared_ptr<PlanExecutor> exec,
+ DepsTracker deps,
+ const BSONObj& queryObj,
+ const BSONObj& sortObj,
+ const BSONObj& projectionObj) {
+ // Get the full "namespace" name.
+ const string& fullName = expCtx->ns.ns();
+
+ // Put the PlanExecutor into a DocumentSourceCursor and add it to the front of the pipeline.
+ intrusive_ptr<DocumentSourceCursor> pSource =
+ DocumentSourceCursor::create(fullName, exec, expCtx);
+
+ // Note the query, sort, and projection for explain.
+ pSource->setQuery(queryObj);
+ pSource->setSort(sortObj);
+
+ if (!projectionObj.isEmpty()) {
+ pSource->setProjection(projectionObj, boost::none);
+ } else {
+ // There may be fewer dependencies now if the sort was covered.
+ if (!sortObj.isEmpty()) {
+ deps = pipeline->getDependencies(queryObj);
+ }
+
+ pSource->setProjection(deps.toProjection(), deps.toParsedDeps());
+ }
+
+ while (!pipeline->sources.empty() && pSource->coalesce(pipeline->sources.front())) {
+ pipeline->sources.pop_front();
+ }
+
+ pipeline->addInitialSource(pSource);
+
+ // DocumentSourceCursor expects a yielding PlanExecutor that has had its state saved. We
+ // deregister the PlanExecutor so that it can be registered with ClientCursor.
+ exec->deregisterExec();
+ exec->saveState();
+
+ return exec;
+}
+
} // namespace mongo
diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h
index df9cc14f342..a9993c6331b 100644
--- a/src/mongo/db/pipeline/pipeline_d.h
+++ b/src/mongo/db/pipeline/pipeline_d.h
@@ -31,6 +31,8 @@
#include <boost/intrusive_ptr.hpp>
#include <memory>
+#include "mongo/bson/bsonobj.h"
+
namespace mongo {
class Collection;
class DocumentSourceCursor;
@@ -102,6 +104,19 @@ private:
const BSONObj& queryObj,
BSONObj* sortObj,
BSONObj* projectionObj);
+
+ /**
+ * Creates a DocumentSourceCursor from the given PlanExecutor and adds it to the front of the
+ * Pipeline.
+ */
+ static std::shared_ptr<PlanExecutor> addCursorSource(
+ const boost::intrusive_ptr<Pipeline>& pipeline,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ std::shared_ptr<PlanExecutor> exec,
+ DepsTracker deps,
+ const BSONObj& queryObj = BSONObj(),
+ const BSONObj& sortObj = BSONObj(),
+ const BSONObj& projectionObj = BSONObj());
};
} // namespace mongo
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
index 393516736d5..ce56af2a184 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
@@ -309,7 +309,8 @@ public:
}
~RandomCursor() {
- detachFromOperationContext();
+ if (_cursor)
+ detachFromOperationContext();
}
boost::optional<Record> next() final {
diff --git a/src/mongo/platform/random.cpp b/src/mongo/platform/random.cpp
index b7fd984c411..ea641460037 100644
--- a/src/mongo/platform/random.cpp
+++ b/src/mongo/platform/random.cpp
@@ -40,6 +40,7 @@
#include <cstdlib>
#include <iostream>
#include <fstream>
+#include <limits>
#include "mongo/platform/basic.h"
@@ -93,6 +94,15 @@ int64_t PseudoRandom::nextInt64() {
return (a << 32) | b;
}
+double PseudoRandom::nextCanonicalDouble() {
+ double result;
+ do {
+ auto generated = static_cast<uint64_t>(nextInt64());
+ result = static_cast<double>(generated) / std::numeric_limits<uint64_t>::max();
+ } while (result == 1.0);
+ return result;
+}
+
// --- SecureRandom ----
SecureRandom::~SecureRandom() {}
diff --git a/src/mongo/platform/random.h b/src/mongo/platform/random.h
index 29fe7e1c32c..8d43284b9a1 100644
--- a/src/mongo/platform/random.h
+++ b/src/mongo/platform/random.h
@@ -49,6 +49,11 @@ public:
int64_t nextInt64();
/**
+ * Returns a random number in the range [0, 1).
+ */
+ double nextCanonicalDouble();
+
+ /**
* @return a number between 0 and max
*/
int32_t nextInt32(int32_t max) {
diff --git a/src/mongo/platform/random_test.cpp b/src/mongo/platform/random_test.cpp
index 3aecfb2e98b..c4e214f1af9 100644
--- a/src/mongo/platform/random_test.cpp
+++ b/src/mongo/platform/random_test.cpp
@@ -98,6 +98,52 @@ TEST(RandomTest, R2) {
ASSERT_EQUALS(100U, s.size());
}
+/**
+ * Test that if two PsuedoRandom's have the same seed, then subsequent calls to
+ * nextCanonicalDouble() will return the same value.
+ */
+TEST(RandomTest, NextCanonicalSameSeed) {
+ PseudoRandom a(12);
+ PseudoRandom b(12);
+ for (int i = 0; i < 100; i++) {
+ ASSERT_EQUALS(a.nextCanonicalDouble(), b.nextCanonicalDouble());
+ }
+}
+
+/**
+ * Test that if two PsuedoRandom's have different seeds, then nextCanonicalDouble() will return
+ * different values.
+ */
+TEST(RandomTest, NextCanonicalDifferentSeeds) {
+ PseudoRandom a(12);
+ PseudoRandom b(11);
+ ASSERT_NOT_EQUALS(a.nextCanonicalDouble(), b.nextCanonicalDouble());
+}
+
+/**
+ * Test that nextCanonicalDouble() avoids returning a value soon after it has previously returned
+ * that value.
+ */
+TEST(RandomTest, NextCanonicalDistinctValues) {
+ PseudoRandom a(11);
+ std::set<double> s;
+ for (int i = 0; i < 100; i++) {
+ s.insert(a.nextCanonicalDouble());
+ }
+ ASSERT_EQUALS(100U, s.size());
+}
+
+/**
+ * Test that nextCanonicalDouble() always returns values between 0 and 1.
+ */
+TEST(RandomTest, NextCanonicalWithinRange) {
+ PseudoRandom prng(10);
+ for (int i = 0; i < 100; i++) {
+ double next = prng.nextCanonicalDouble();
+ ASSERT_LTE(0.0, next);
+ ASSERT_LT(next, 1.0);
+ }
+}
TEST(RandomTest, Secure1) {
SecureRandom* a = SecureRandom::create();